hotfix: enhance OpenAI stream handling and error management (#6541)

fix: enhance OpenAI stream handling and error management

- Updated the `openAIChunkToTextDelta` function to include error handling with a try-catch block, improving robustness during stream processing.
- Refined the `readableStreamAsyncIterable` function to ensure proper handling of stream completion and errors, including a return method for cleanup.
- Adjusted type definitions for better clarity and consistency in the handling of async iterables.
This commit is contained in:
SuYao 2025-06-06 15:18:16 +08:00 committed by GitHub
parent 8efa7d25f8
commit bc5cc4bf02
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 54 additions and 30 deletions

View File

@ -619,32 +619,36 @@ export default class OpenAIProvider extends BaseOpenAIProvider {
}
const reasoningTag = getAppropriateTag(model)
async function* openAIChunkToTextDelta(stream: any): AsyncGenerator<OpenAIStreamChunk> {
for await (const chunk of stream) {
if (window.keyv.get(EVENT_NAMES.CHAT_COMPLETION_PAUSED)) {
break
}
if (chunk.choices && chunk.choices.length > 0) {
const delta = chunk.choices[0]?.delta
if (
(delta?.reasoning_content && delta?.reasoning_content !== '\n') ||
(delta?.reasoning && delta?.reasoning !== '\n')
) {
yield { type: 'reasoning', textDelta: delta.reasoning_content || delta.reasoning }
}
if (delta?.content) {
yield { type: 'text-delta', textDelta: delta.content }
}
if (delta?.tool_calls && delta?.tool_calls.length > 0) {
yield { type: 'tool-calls', delta: delta }
}
const finishReason = chunk?.choices[0]?.finish_reason
if (!isEmpty(finishReason)) {
yield { type: 'finish', finishReason, usage: chunk.usage, delta, chunk }
try {
for await (const chunk of stream) {
if (window.keyv.get(EVENT_NAMES.CHAT_COMPLETION_PAUSED)) {
break
}
if (chunk.choices && chunk.choices.length > 0) {
const delta = chunk.choices[0]?.delta
if (
(delta?.reasoning_content && delta?.reasoning_content !== '\n') ||
(delta?.reasoning && delta?.reasoning !== '\n')
) {
yield { type: 'reasoning', textDelta: delta.reasoning_content || delta.reasoning }
}
if (delta?.content) {
yield { type: 'text-delta', textDelta: delta.content }
}
if (delta?.tool_calls && delta?.tool_calls.length > 0) {
yield { type: 'tool-calls', delta: delta }
}
const finishReason = chunk?.choices[0]?.finish_reason
if (!isEmpty(finishReason)) {
yield { type: 'finish', finishReason, usage: chunk.usage, delta, chunk }
}
}
}
} catch (error) {
console.error('[openAIChunkToTextDelta] error', error)
throw error
}
}
@ -661,7 +665,7 @@ export default class OpenAIProvider extends BaseOpenAIProvider {
})
// 3. 消费 processedStream分发 onChunk
for await (const chunk of readableStreamAsyncIterable(processedStream)) {
for await (const chunk of readableStreamAsyncIterable<OpenAIStreamChunk>(processedStream)) {
const delta = chunk.type === 'finish' ? chunk.delta : chunk
const rawChunk = chunk.type === 'finish' ? chunk.chunk : chunk
switch (chunk.type) {

View File

@ -1,12 +1,32 @@
export function readableStreamAsyncIterable<T>(stream: ReadableStream<T>): AsyncIterable<T> {
/**
* Most browsers don't yet have async iterable support for ReadableStream,
* and Node has a very different way of reading bytes from its "ReadableStream".
*
* This polyfill was pulled from https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490
*/
export function readableStreamAsyncIterable<T>(stream: any): AsyncIterableIterator<T> {
if (stream[Symbol.asyncIterator]) return stream
const reader = stream.getReader()
return {
[Symbol.asyncIterator](): AsyncIterator<T> {
return {
async next(): Promise<IteratorResult<T>> {
return reader.read() as Promise<IteratorResult<T>>
}
async next() {
try {
const result = await reader.read()
if (result?.done) reader.releaseLock() // release lock when stream becomes closed
return result
} catch (e) {
reader.releaseLock() // release lock when stream becomes errored
throw e
}
},
async return() {
const cancelPromise = reader.cancel()
reader.releaseLock()
await cancelPromise
return { done: true, value: undefined }
},
[Symbol.asyncIterator]() {
return this
}
}
}