From 4f383ab4226640d960b3877bc830e69f2296158e Mon Sep 17 00:00:00 2001
From: SuYao
Date: Fri, 6 Jun 2025 15:18:16 +0800
Subject: [PATCH] hotfix: enhance OpenAI stream handling and error management (#6541)

fix: enhance OpenAI stream handling and error management

- Updated the `openAIChunkToTextDelta` function to include error handling with a try-catch block, improving robustness during stream processing.
- Refined the `readableStreamAsyncIterable` function to ensure proper handling of stream completion and errors, including a return method for cleanup.
- Adjusted type definitions for better clarity and consistency in the handling of async iterables.
---
 .../providers/AiProvider/OpenAIProvider.ts | 52 ++++++++++---------
 src/renderer/src/utils/stream.ts           | 32 +++++++++---
 2 files changed, 54 insertions(+), 30 deletions(-)

diff --git a/src/renderer/src/providers/AiProvider/OpenAIProvider.ts b/src/renderer/src/providers/AiProvider/OpenAIProvider.ts
index 945a8b3ac9..1c2a49756f 100644
--- a/src/renderer/src/providers/AiProvider/OpenAIProvider.ts
+++ b/src/renderer/src/providers/AiProvider/OpenAIProvider.ts
@@ -619,32 +619,36 @@ export default class OpenAIProvider extends BaseOpenAIProvider {
     }
     const reasoningTag = getAppropriateTag(model)
     async function* openAIChunkToTextDelta(stream: any): AsyncGenerator {
-      for await (const chunk of stream) {
-        if (window.keyv.get(EVENT_NAMES.CHAT_COMPLETION_PAUSED)) {
-          break
-        }
-
-        if (chunk.choices && chunk.choices.length > 0) {
-          const delta = chunk.choices[0]?.delta
-          if (
-            (delta?.reasoning_content && delta?.reasoning_content !== '\n') ||
-            (delta?.reasoning && delta?.reasoning !== '\n')
-          ) {
-            yield { type: 'reasoning', textDelta: delta.reasoning_content || delta.reasoning }
-          }
-          if (delta?.content) {
-            yield { type: 'text-delta', textDelta: delta.content }
-          }
-          if (delta?.tool_calls && delta?.tool_calls.length > 0) {
-            yield { type: 'tool-calls', delta: delta }
-          }
-
-          const finishReason = chunk?.choices[0]?.finish_reason
-          if (!isEmpty(finishReason)) {
-            yield { type: 'finish', finishReason, usage: chunk.usage, delta, chunk }
+      try {
+        for await (const chunk of stream) {
+          if (window.keyv.get(EVENT_NAMES.CHAT_COMPLETION_PAUSED)) {
             break
           }
+
+          if (chunk.choices && chunk.choices.length > 0) {
+            const delta = chunk.choices[0]?.delta
+            if (
+              (delta?.reasoning_content && delta?.reasoning_content !== '\n') ||
+              (delta?.reasoning && delta?.reasoning !== '\n')
+            ) {
+              yield { type: 'reasoning', textDelta: delta.reasoning_content || delta.reasoning }
+            }
+            if (delta?.content) {
+              yield { type: 'text-delta', textDelta: delta.content }
+            }
+            if (delta?.tool_calls && delta?.tool_calls.length > 0) {
+              yield { type: 'tool-calls', delta: delta }
+            }
+
+            const finishReason = chunk?.choices[0]?.finish_reason
+            if (!isEmpty(finishReason)) {
+              yield { type: 'finish', finishReason, usage: chunk.usage, delta, chunk }
+            }
+          }
         }
+      } catch (error) {
+        console.error('[openAIChunkToTextDelta] error', error)
+        throw error
       }
     }
 
@@ -661,7 +665,7 @@ export default class OpenAIProvider extends BaseOpenAIProvider {
       })
 
       // 3. 消费 processedStream，分发 onChunk
-      for await (const chunk of readableStreamAsyncIterable(processedStream)) {
+      for await (const chunk of readableStreamAsyncIterable(processedStream)) {
         const delta = chunk.type === 'finish' ? chunk.delta : chunk
         const rawChunk = chunk.type === 'finish' ? chunk.chunk : chunk
         switch (chunk.type) {
diff --git a/src/renderer/src/utils/stream.ts b/src/renderer/src/utils/stream.ts
index 77119db8f1..e1d34cc94e 100644
--- a/src/renderer/src/utils/stream.ts
+++ b/src/renderer/src/utils/stream.ts
@@ -1,12 +1,32 @@
-export function readableStreamAsyncIterable<T>(stream: ReadableStream<T>): AsyncIterable<T> {
+/**
+ * Most browsers don't yet have async iterable support for ReadableStream,
+ * and Node has a very different way of reading bytes from its "ReadableStream".
+ *
+ * This polyfill was pulled from https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490
+ */
+export function readableStreamAsyncIterable<T>(stream: any): AsyncIterableIterator<T> {
+  if (stream[Symbol.asyncIterator]) return stream
+
   const reader = stream.getReader()
   return {
-    [Symbol.asyncIterator](): AsyncIterator<T> {
-      return {
-        async next(): Promise<IteratorResult<T>> {
-          return reader.read() as Promise<IteratorResult<T>>
-        }
+    async next() {
+      try {
+        const result = await reader.read()
+        if (result?.done) reader.releaseLock() // release lock when stream becomes closed
+        return result
+      } catch (e) {
+        reader.releaseLock() // release lock when stream becomes errored
+        throw e
       }
+    },
+    async return() {
+      const cancelPromise = reader.cancel()
+      reader.releaseLock()
+      await cancelPromise
+      return { done: true, value: undefined }
+    },
+    [Symbol.asyncIterator]() {
+      return this
     }
   }
 }
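Reviewer note: below is a minimal, hypothetical usage sketch (not part of the patch) of the reworked `readableStreamAsyncIterable`. The `@renderer` import alias and the `firstChunk` helper are assumptions for illustration; the point is that leaving a `for await` loop early now routes through the new `return()` method, which cancels the reader and releases its lock instead of leaving the underlying ReadableStream locked.

```ts
// Hypothetical example, not part of the diff: an early exit from for-await
// triggers the iterator's return(), so the underlying reader is cancelled
// and its lock released.
import { readableStreamAsyncIterable } from '@renderer/utils/stream' // assumed path alias for src/renderer/src

async function firstChunk<T>(stream: ReadableStream<T>): Promise<T | undefined> {
  for await (const chunk of readableStreamAsyncIterable<T>(stream)) {
    return chunk // returning here calls return() -> reader.cancel() + releaseLock()
  }
  return undefined // stream ended without producing a value
}
```

The same cleanup path appears to be what the try/catch in `openAIChunkToTextDelta` and the `next()`/`return()` error handling are protecting when the provider stops consuming `processedStream` early, e.g. after a paused chat completion.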