refactor(middleware): add extractReasoningMiddleware for enhanced text processing (#5637)

SuYao 2025-05-07 22:09:12 +08:00 committed by GitHub
parent 787ffc3390
commit 12f4500c47
6 changed files with 355 additions and 342 deletions

@ -0,0 +1,118 @@
// Modified from https://github.com/vercel/ai/blob/845080d80b8538bb9c7e527d2213acb5f33ac9c2/packages/ai/core/middleware/extract-reasoning-middleware.ts
import { getPotentialStartIndex } from '../utils/getPotentialIndex'
export interface ExtractReasoningMiddlewareOptions {
openingTag: string
closingTag: string
separator?: string
enableReasoning?: boolean
}
function escapeRegExp(str: string) {
return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
}
// Supports a generic type T; defaults to T = { type: string; textDelta: string }
export function extractReasoningMiddleware<T extends { type: string } = { type: string; textDelta: string }>({
openingTag,
closingTag,
separator = '\n',
enableReasoning
}: ExtractReasoningMiddlewareOptions) {
const openingTagEscaped = escapeRegExp(openingTag)
const closingTagEscaped = escapeRegExp(closingTag)
return {
wrapGenerate: async ({ doGenerate }: { doGenerate: () => Promise<{ text: string } & Record<string, any>> }) => {
const { text: rawText, ...rest } = await doGenerate()
if (rawText == null) {
return { text: rawText, ...rest }
}
const text = rawText
const regexp = new RegExp(`${openingTagEscaped}(.*?)${closingTagEscaped}`, 'gs')
const matches = Array.from(text.matchAll(regexp))
if (!matches.length) {
return { text, ...rest }
}
const reasoning = matches.map((match: RegExpMatchArray) => match[1]).join(separator)
let textWithoutReasoning = text
for (let i = matches.length - 1; i >= 0; i--) {
const match = matches[i] as RegExpMatchArray
const beforeMatch = textWithoutReasoning.slice(0, match.index as number)
const afterMatch = textWithoutReasoning.slice((match.index as number) + match[0].length)
textWithoutReasoning =
beforeMatch + (beforeMatch.length > 0 && afterMatch.length > 0 ? separator : '') + afterMatch
}
return { ...rest, text: textWithoutReasoning, reasoning }
},
wrapStream: async ({
doStream
}: {
doStream: () => Promise<{ stream: ReadableStream<T> } & Record<string, any>>
}) => {
const { stream, ...rest } = await doStream()
if (!enableReasoning) {
return {
stream,
...rest
}
}
let isFirstReasoning = true
let isFirstText = true
let afterSwitch = false
let isReasoning = false
let buffer = ''
return {
stream: stream.pipeThrough(
new TransformStream<T, T>({
transform: (chunk, controller) => {
if (chunk.type !== 'text-delta') {
controller.enqueue(chunk)
return
}
// @ts-expect-error: textDelta only exists on text-delta/reasoning chunks
buffer += chunk.textDelta
function publish(text: string) {
if (text.length > 0) {
const prefix = afterSwitch && (isReasoning ? !isFirstReasoning : !isFirstText) ? separator : ''
controller.enqueue({
...chunk,
type: isReasoning ? 'reasoning' : 'text-delta',
textDelta: prefix + text
})
afterSwitch = false
if (isReasoning) {
isFirstReasoning = false
} else {
isFirstText = false
}
}
}
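// Scan the buffer for the next tag boundary: emit text before any potential tag start, and keep a partial tag in the buffer until it is confirmed or ruled out.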
while (true) {
const nextTag = isReasoning ? closingTag : openingTag
const startIndex = getPotentialStartIndex(buffer, nextTag)
if (startIndex == null) {
publish(buffer)
buffer = ''
break
}
publish(buffer.slice(0, startIndex))
const foundFullMatch = startIndex + nextTag.length <= buffer.length
if (foundFullMatch) {
buffer = buffer.slice(startIndex + nextTag.length)
isReasoning = !isReasoning
afterSwitch = true
} else {
buffer = buffer.slice(startIndex)
break
}
}
}
})
),
...rest
}
}
}
}
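
For reference, a minimal usage sketch of the non-streaming path (the tag values and model output below are hypothetical, not part of this commit):

// Hypothetical usage: split <think>…</think> reasoning out of a finished completion.
const middleware = extractReasoningMiddleware({ openingTag: '<think>', closingTag: '</think>' })
const { text, reasoning } = await middleware.wrapGenerate({
  doGenerate: async () => ({ text: '<think>check the units first</think>The answer is 42.' })
})
// text === 'The answer is 42.'
// reasoning === 'check the units first'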

@ -15,7 +15,6 @@ import type { Message } from '@renderer/types/newMessage'
import { delay, isJSON, parseJSON } from '@renderer/utils'
import { addAbortController, removeAbortController } from '@renderer/utils/abortController'
import { formatApiHost } from '@renderer/utils/api'
import { glmZeroPreviewProcessor, thinkTagProcessor, ThoughtProcessor } from '@renderer/utils/formats'
import { getMainTextContent } from '@renderer/utils/messageUtils/find'
import { isEmpty } from 'lodash'
import type OpenAI from 'openai'
@ -230,51 +229,4 @@ export default abstract class BaseProvider {
cleanup
}
}
/**
* Finds the appropriate thinking processor for a given text chunk and model.
* Returns the processor if found, otherwise undefined.
*/
protected findThinkingProcessor(chunkText: string, model: Model | undefined): ThoughtProcessor | undefined {
if (!model) return undefined
const processors: ThoughtProcessor[] = [thinkTagProcessor, glmZeroPreviewProcessor]
return processors.find((p) => p.canProcess(chunkText, model))
}
/**
* Returns a function closure that handles incremental reasoning text for a specific stream.
* The returned function processes a chunk, emits THINKING_DELTA for new reasoning,
* and returns the associated content part.
*/
protected handleThinkingTags() {
let memoizedReasoning = ''
// Returns a function that handles a single chunk potentially containing thinking tags
return (chunkText: string, processor: ThoughtProcessor, onChunk: (chunk: any) => void): string => {
// Returns the processed content part
const { reasoning, content } = processor.process(chunkText)
let deltaReasoning = ''
if (reasoning && reasoning.trim()) {
// Check if the new reasoning starts with the previous one
if (reasoning.startsWith(memoizedReasoning)) {
deltaReasoning = reasoning.substring(memoizedReasoning.length)
} else {
// If not a continuation, send the whole new reasoning
deltaReasoning = reasoning
// console.warn("Thinking content did not start with previous memoized version. Sending full content.")
}
memoizedReasoning = reasoning // Update memoized state
} else {
// If no reasoning, reset memoized state? Let's reset.
memoizedReasoning = ''
}
if (deltaReasoning) {
onChunk({ type: ChunkType.THINKING_DELTA, text: deltaReasoning })
}
return content // Return the content part for TEXT_DELTA emission
}
}
}

@ -18,6 +18,7 @@ import {
} from '@renderer/config/models'
import { getStoreSetting } from '@renderer/hooks/useSettings'
import i18n from '@renderer/i18n'
import { extractReasoningMiddleware } from '@renderer/middlewares/extractReasoningMiddleware'
import { getAssistantSettings, getDefaultModel, getTopNamingModel } from '@renderer/services/AssistantService'
import { EVENT_NAMES } from '@renderer/services/EventService'
import {
@ -51,6 +52,7 @@ import {
import { mcpToolCallResponseToOpenAICompatibleMessage, parseAndCallTools } from '@renderer/utils/mcp-tools'
import { findFileBlocks, findImageBlocks, getMainTextContent } from '@renderer/utils/messageUtils/find'
import { buildSystemPrompt } from '@renderer/utils/prompt'
import { asyncGeneratorToReadableStream, readableStreamAsyncIterable } from '@renderer/utils/stream'
import { isEmpty, takeRight } from 'lodash'
import OpenAI, { AzureOpenAI } from 'openai'
import {
@ -62,6 +64,11 @@ import {
import { CompletionsParams } from '.'
import OpenAIProvider from './OpenAIProvider'
// 1. Define the union type for chunks flowing through the reasoning middleware
export type OpenAIStreamChunk =
| { type: 'reasoning' | 'text-delta'; textDelta: string }
| { type: 'finish'; finishReason: any; usage: any; delta: any; chunk: any }
export default class OpenAICompatibleProvider extends OpenAIProvider {
constructor(provider: Provider) {
super(provider)
@ -331,6 +338,10 @@ export default class OpenAICompatibleProvider extends OpenAIProvider {
const { contextCount, maxTokens, streamOutput } = getAssistantSettings(assistant)
const isEnabledWebSearch = assistant.enableWebSearch || !!assistant.webSearchProviderId
messages = addImageFileToContents(messages)
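// Enable reasoning extraction when the model supports thinking tokens or reasoning effort and a reasoning_effort is set, or when it is a reasoning model missing at least one of those controls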
const enableReasoning =
((isSupportedThinkingTokenModel(model) || isSupportedReasoningEffortModel(model)) &&
assistant.settings?.reasoning_effort !== undefined) ||
(isReasoningModel(model) && (!isSupportedThinkingTokenModel(model) || !isSupportedReasoningEffortModel(model)))
let systemMessage = { role: 'system', content: assistant.prompt || '' }
if (isSupportedReasoningEffortOpenAIModel(model)) {
systemMessage = {
@ -357,52 +368,7 @@ export default class OpenAICompatibleProvider extends OpenAIProvider {
return streamOutput
}
let hasReasoningContent = false
let lastChunk = ''
const isReasoningJustDone = (
delta: OpenAI.Chat.Completions.ChatCompletionChunk.Choice.Delta & {
reasoning_content?: string
reasoning?: string
thinking?: string
}
) => {
if (!delta?.content) {
return false
}
// Check whether the current chunk combined with the previous chunk forms the ###Response marker
const combinedChunks = lastChunk + delta.content
lastChunk = delta.content
// Detect the end of thinking
if (combinedChunks.includes('###Response') || delta.content === '</think>') {
return true
}
// If reasoning_content or reasoning is present, the model is still thinking
if (delta?.reasoning_content || delta?.reasoning || delta?.thinking) {
hasReasoningContent = true
}
// If reasoning_content or reasoning appeared earlier and plain content appears now, thinking has ended
return !!(hasReasoningContent && delta.content)
}
let time_first_token_millsec = 0
let time_first_token_millsec_delta = 0
let time_first_content_millsec = 0
const start_time_millsec = new Date().getTime()
console.log(
`completions start_time_millsec ${new Date(start_time_millsec).toLocaleString(undefined, {
year: 'numeric',
month: 'numeric',
day: 'numeric',
hour: 'numeric',
minute: 'numeric',
second: 'numeric',
fractionalSecondDigits: 3
})}`
)
const lastUserMessage = _messages.findLast((m) => m.role === 'user')
const { abortController, cleanup, signalPromise } = this.createAbortController(lastUserMessage?.id, true)
const { signal } = abortController
@ -436,9 +402,7 @@ export default class OpenAICompatibleProvider extends OpenAIProvider {
} as ChatCompletionMessageParam)
toolResults.forEach((ts) => reqMessages.push(ts as ChatCompletionMessageParam))
console.debug('[tool] reqMessages before processing', model.id, reqMessages)
reqMessages = processReqMessages(model, reqMessages)
console.debug('[tool] reqMessages', model.id, reqMessages)
const newStream = await this.sdk.chat.completions
// @ts-ignore key is not typed
.create(
@ -499,193 +463,192 @@ export default class OpenAICompatibleProvider extends OpenAIProvider {
// let isThinkingInContent: ThoughtProcessor | undefined = undefined
// const processThinkingChunk = this.handleThinkingTags()
let isFirstChunk = true
let isFirstThinkingChunk = true
for await (const chunk of stream) {
if (window.keyv.get(EVENT_NAMES.CHAT_COMPLETION_PAUSED)) {
break
}
let time_first_token_millsec = 0
let time_first_token_millsec_delta = 0
let time_first_content_millsec = 0
let time_thinking_start = 0
const delta = chunk.choices[0]?.delta
const finishReason = chunk.choices[0]?.finish_reason
// --- Incremental onChunk calls ---
// 1. Reasoning Content
const reasoningContent = delta?.reasoning_content || delta?.reasoning
const currentTime = new Date().getTime() // Get current time for each chunk
if (time_first_token_millsec === 0 && isFirstThinkingChunk && reasoningContent) {
// Record the time of the first token
time_first_token_millsec = currentTime
// Record the time delta to the first token
time_first_token_millsec_delta = currentTime - start_time_millsec
console.log(
`completions time_first_token_millsec ${new Date(currentTime).toLocaleString(undefined, {
year: 'numeric',
month: 'numeric',
day: 'numeric',
hour: 'numeric',
minute: 'numeric',
second: 'numeric',
fractionalSecondDigits: 3
})}`
)
isFirstThinkingChunk = false
}
if (reasoningContent) {
thinkingContent += reasoningContent
hasReasoningContent = true // Keep track if reasoning occurred
// Calculate thinking time as time elapsed since start until this chunk
const thinking_time = currentTime - time_first_token_millsec
onChunk({ type: ChunkType.THINKING_DELTA, text: reasoningContent, thinking_millsec: thinking_time })
}
if (isReasoningJustDone(delta) && time_first_content_millsec === 0) {
time_first_content_millsec = currentTime
final_time_thinking_millsec_delta = time_first_content_millsec - time_first_token_millsec
onChunk({
type: ChunkType.THINKING_COMPLETE,
text: thinkingContent,
thinking_millsec: final_time_thinking_millsec_delta
})
thinkingContent = ''
isFirstThinkingChunk = true
hasReasoningContent = false
}
// 2. Text Content
if (delta?.content) {
if (isEnabledWebSearch) {
if (delta?.annotations) {
delta.content = convertLinks(delta.content || '', isFirstChunk)
} else if (assistant.model?.provider === 'openrouter') {
delta.content = convertLinksToOpenRouter(delta.content || '', isFirstChunk)
} else if (isZhipuModel(assistant.model)) {
delta.content = convertLinksToZhipu(delta.content || '', isFirstChunk)
} else if (isHunyuanSearchModel(assistant.model)) {
delta.content = convertLinksToHunyuan(
delta.content || '',
chunk.search_info.search_results || [],
isFirstChunk
)
}
}
// No thinking content preceded this text
if (isFirstChunk && time_first_token_millsec === 0 && time_first_token_millsec_delta === 0) {
isFirstChunk = false
time_first_token_millsec = currentTime
time_first_token_millsec_delta = time_first_token_millsec - start_time_millsec
}
content += delta.content // Still accumulate for processToolUses
// isThinkingInContent = this.findThinkingProcessor(content, model)
// if (isThinkingInContent) {
// processThinkingChunk(content, isThinkingInContent, onChunk)
onChunk({ type: ChunkType.TEXT_DELTA, text: delta.content })
// } else {
// }
}
// console.log('delta?.finish_reason', delta?.finish_reason)
if (!isEmpty(finishReason) || delta?.annotations) {
onChunk({ type: ChunkType.TEXT_COMPLETE, text: content })
final_time_completion_millsec_delta = currentTime - start_time_millsec
console.log(
`completions final_time_completion_millsec ${new Date(currentTime).toLocaleString(undefined, {
year: 'numeric',
month: 'numeric',
day: 'numeric',
hour: 'numeric',
minute: 'numeric',
second: 'numeric',
fractionalSecondDigits: 3
})}`
)
// 6. Usage (If provided per chunk) - Capture the last known usage
if (chunk.usage) {
// console.log('chunk.usage', chunk.usage)
lastUsage = chunk.usage // Update with the latest usage info
// Send incremental usage update if needed by UI (optional, keep if useful)
// onChunk({ type: 'block_in_progress', response: { usage: chunk.usage } })
// 1. Initialize the middleware
const reasoningTags = [
{ openingTag: '<think>', closingTag: '</think>', separator: '\n' },
{ openingTag: '###Thinking', closingTag: '###Response', separator: '\n' }
]
const getAppropriateTag = (model: Model) => {
if (model.id.includes('qwen3')) return reasoningTags[0]
return reasoningTags[0]
}
const reasoningTag = getAppropriateTag(model)
async function* openAIChunkToTextDelta(stream: any): AsyncGenerator<OpenAIStreamChunk> {
for await (const chunk of stream) {
if (window.keyv.get(EVENT_NAMES.CHAT_COMPLETION_PAUSED)) {
break
}
// 3. Web Search
if (delta?.annotations) {
onChunk({
type: ChunkType.LLM_WEB_SEARCH_COMPLETE,
llm_web_search: {
results: delta.annotations,
source: WebSearchSource.OPENAI_COMPATIBLE
}
} as LLMWebSearchCompleteChunk)
const delta = chunk.choices[0]?.delta
if (delta?.reasoning_content || delta?.reasoning) {
yield { type: 'reasoning', textDelta: delta.reasoning_content || delta.reasoning }
}
if (delta?.content) {
yield { type: 'text-delta', textDelta: delta.content }
}
if (assistant.model?.provider === 'perplexity') {
const citations = chunk.citations
if (citations) {
onChunk({
type: ChunkType.LLM_WEB_SEARCH_COMPLETE,
llm_web_search: {
results: citations,
source: WebSearchSource.PERPLEXITY
}
} as LLMWebSearchCompleteChunk)
}
}
if (isEnabledWebSearch && isZhipuModel(model) && finishReason === 'stop' && chunk?.web_search) {
onChunk({
type: ChunkType.LLM_WEB_SEARCH_COMPLETE,
llm_web_search: {
results: chunk.web_search,
source: WebSearchSource.ZHIPU
}
} as LLMWebSearchCompleteChunk)
}
if (isEnabledWebSearch && isHunyuanSearchModel(model) && chunk?.search_info?.search_results) {
onChunk({
type: ChunkType.LLM_WEB_SEARCH_COMPLETE,
llm_web_search: {
results: chunk.search_info.search_results,
source: WebSearchSource.HUNYUAN
}
} as LLMWebSearchCompleteChunk)
const finishReason = chunk.choices[0]?.finish_reason
if (!isEmpty(finishReason)) {
yield { type: 'finish', finishReason, usage: chunk.usage, delta, chunk }
break
}
}
}
// --- End of Incremental onChunk calls ---
} // End of for await loop
// Call processToolUses AFTER the loop finishes processing the main stream content
// Note: parseAndCallTools inside processToolUses should handle its own onChunk for tool responses
await processToolUses(content, idx)
// Send the final block_complete chunk with accumulated data
onChunk({
type: ChunkType.BLOCK_COMPLETE,
response: {
// Use the enhanced usage object
usage: lastUsage,
metrics: {
// Get completion tokens from the last usage object if available
completion_tokens: lastUsage?.completion_tokens,
time_completion_millsec: final_time_completion_millsec_delta,
time_first_token_millsec: time_first_token_millsec_delta,
time_thinking_millsec: final_time_thinking_millsec_delta
}
}
// 2. Apply the middleware
const { stream: processedStream } = await extractReasoningMiddleware<OpenAIStreamChunk>({
openingTag: reasoningTag?.openingTag,
closingTag: reasoningTag?.closingTag,
separator: reasoningTag?.separator,
enableReasoning
}).wrapStream({
doStream: async () => ({
stream: asyncGeneratorToReadableStream(openAIChunkToTextDelta(stream))
})
})
// FIXME: temporary workaround, reset timestamps and thinking content
time_first_token_millsec = 0
time_first_content_millsec = 0
// 3. Consume processedStream and dispatch onChunk
for await (const chunk of readableStreamAsyncIterable(processedStream)) {
const currentTime = new Date().getTime()
const delta = chunk.type === 'finish' ? chunk.delta : chunk
const rawChunk = chunk.type === 'finish' ? chunk.chunk : chunk
// OpenAI stream typically doesn't provide a final summary chunk easily.
// We are sending per-chunk usage if available.
switch (chunk.type) {
case 'reasoning': {
if (time_thinking_start === 0) {
time_thinking_start = currentTime
time_first_token_millsec = currentTime
time_first_token_millsec_delta = currentTime - start_time_millsec
}
thinkingContent += chunk.textDelta
const thinking_time = currentTime - time_thinking_start
onChunk({ type: ChunkType.THINKING_DELTA, text: chunk.textDelta, thinking_millsec: thinking_time })
break
}
case 'text-delta': {
let textDelta = chunk.textDelta
if (assistant.enableWebSearch && delta) {
const originalDelta = rawChunk?.choices?.[0]?.delta
if (originalDelta?.annotations) {
textDelta = convertLinks(textDelta, isFirstChunk)
} else if (assistant.model?.provider === 'openrouter') {
textDelta = convertLinksToOpenRouter(textDelta, isFirstChunk)
} else if (isZhipuModel(assistant.model)) {
textDelta = convertLinksToZhipu(textDelta, isFirstChunk)
} else if (isHunyuanSearchModel(assistant.model)) {
const searchResults = rawChunk?.search_info?.search_results || []
textDelta = convertLinksToHunyuan(textDelta, searchResults, isFirstChunk)
}
}
if (isFirstChunk) {
isFirstChunk = false
if (time_first_token_millsec === 0) {
time_first_token_millsec = currentTime
time_first_token_millsec_delta = currentTime - start_time_millsec
}
}
content += textDelta
if (time_thinking_start > 0 && time_first_content_millsec === 0) {
time_first_content_millsec = currentTime
final_time_thinking_millsec_delta = time_first_content_millsec - time_thinking_start
onChunk({
type: ChunkType.THINKING_COMPLETE,
text: thinkingContent,
thinking_millsec: final_time_thinking_millsec_delta
})
}
onChunk({ type: ChunkType.TEXT_DELTA, text: textDelta })
break
}
case 'finish': {
const finishReason = chunk.finishReason
const usage = chunk.usage
const originalFinishDelta = chunk.delta
const originalFinishRawChunk = chunk.chunk
if (!isEmpty(finishReason)) {
onChunk({ type: ChunkType.TEXT_COMPLETE, text: content })
final_time_completion_millsec_delta = currentTime - start_time_millsec
if (usage) {
lastUsage = usage
}
if (originalFinishDelta?.annotations) {
onChunk({
type: ChunkType.LLM_WEB_SEARCH_COMPLETE,
llm_web_search: {
results: originalFinishDelta.annotations,
source: WebSearchSource.OPENAI
}
} as LLMWebSearchCompleteChunk)
}
if (assistant.model?.provider === 'perplexity') {
const citations = originalFinishRawChunk.citations
if (citations) {
onChunk({
type: ChunkType.LLM_WEB_SEARCH_COMPLETE,
llm_web_search: {
results: citations,
source: WebSearchSource.PERPLEXITY
}
} as LLMWebSearchCompleteChunk)
}
}
if (
isEnabledWebSearch &&
isZhipuModel(model) &&
finishReason === 'stop' &&
originalFinishRawChunk?.web_search
) {
onChunk({
type: ChunkType.LLM_WEB_SEARCH_COMPLETE,
llm_web_search: {
results: originalFinishRawChunk.web_search,
source: WebSearchSource.ZHIPU
}
} as LLMWebSearchCompleteChunk)
}
if (
isEnabledWebSearch &&
isHunyuanSearchModel(model) &&
originalFinishRawChunk?.search_info?.search_results
) {
onChunk({
type: ChunkType.LLM_WEB_SEARCH_COMPLETE,
llm_web_search: {
results: originalFinishRawChunk.search_info.search_results,
source: WebSearchSource.HUNYUAN
}
} as LLMWebSearchCompleteChunk)
}
}
await processToolUses(content, idx)
onChunk({
type: ChunkType.BLOCK_COMPLETE,
response: {
usage: lastUsage,
metrics: {
completion_tokens: lastUsage?.completion_tokens,
time_completion_millsec: final_time_completion_millsec_delta,
time_first_token_millsec: time_first_token_millsec_delta,
time_thinking_millsec: final_time_thinking_millsec_delta
}
}
})
break
}
}
}
}
console.debug('[completions] reqMessages before processing', model.id, reqMessages)
reqMessages = processReqMessages(model, reqMessages)
console.debug('[completions] reqMessages', model.id, reqMessages)
// Wait for the API to return the stream
onChunk({ type: ChunkType.LLM_RESPONSE_CREATED })
const stream = await this.sdk.chat.completions
@ -835,7 +798,6 @@ export default class OpenAICompatibleProvider extends OpenAIProvider {
await this.checkIsCopilot()
console.debug('[summaries] reqMessages', model.id, [systemMessage, userMessage])
// @ts-ignore key is not typed
const response = await this.sdk.chat.completions.create({
model: model.id,
@ -877,10 +839,8 @@ export default class OpenAICompatibleProvider extends OpenAIProvider {
role: 'user',
content: userMessageContent
}
console.debug('[summaryForSearch] reqMessages', model.id, [systemMessage, userMessage])
const lastUserMessage = messages[messages.length - 1]
console.log('lastUserMessage?.id', lastUserMessage?.id)
const { abortController, cleanup } = this.createAbortController(lastUserMessage?.id)
const { signal } = abortController
@ -989,7 +949,6 @@ export default class OpenAICompatibleProvider extends OpenAIProvider {
try {
await this.checkIsCopilot()
console.debug('[checkModel] body', model.id, body)
if (!stream) {
const response = await this.sdk.chat.completions.create(body as ChatCompletionCreateParamsNonStreaming)
if (!response?.choices[0].message) {

@ -1,4 +1,3 @@
import { Model } from '@renderer/types'
import type { Message } from '@renderer/types/newMessage'
import { findImageBlocks, getMainTextContent } from './messageUtils/find'
@ -99,75 +98,6 @@ export function removeSvgEmptyLines(text: string): string {
// return content
// }
export interface ThoughtProcessor {
canProcess: (content: string, model?: Model) => boolean
process: (content: string) => { reasoning: string; content: string }
}
export const glmZeroPreviewProcessor: ThoughtProcessor = {
canProcess: (content: string, model?: Model) => {
if (!model) return false
const modelId = model.id || ''
const modelName = model.name || ''
const isGLMZeroPreview =
modelId.toLowerCase().includes('glm-zero-preview') || modelName.toLowerCase().includes('glm-zero-preview')
return isGLMZeroPreview && content.includes('###Thinking')
},
process: (content: string) => {
const parts = content.split('###')
const thinkingMatch = parts.find((part) => part.trim().startsWith('Thinking'))
const responseMatch = parts.find((part) => part.trim().startsWith('Response'))
return {
reasoning: thinkingMatch ? thinkingMatch.replace('Thinking', '').trim() : '',
content: responseMatch ? responseMatch.replace('Response', '').trim() : ''
}
}
}
export const thinkTagProcessor: ThoughtProcessor = {
canProcess: (content: string, model?: Model) => {
if (!model) return false
return content.startsWith('<think>') || content.includes('</think>')
},
process: (content: string) => {
// Handle properly closed think tags
const thinkPattern = /^<think>(.*?)<\/think>/s
const matches = content.match(thinkPattern)
if (matches) {
return {
reasoning: matches[1].trim(),
content: content.replace(thinkPattern, '').trim()
}
}
// Handle the case where only the closing tag is present
if (content.includes('</think>') && !content.startsWith('<think>')) {
const parts = content.split('</think>')
return {
reasoning: parts[0].trim(),
content: parts.slice(1).join('</think>').trim()
}
}
// Handle the case where only the opening tag is present
if (content.startsWith('<think>')) {
return {
reasoning: content.slice(7).trim(), // skip the '<think>' tag
content: ''
}
}
return {
reasoning: '',
content
}
}
}
export function withGenerateImage(message: Message): { content: string; images?: string[] } {
const originalContent = getMainTextContent(message)
const imagePattern = new RegExp(`!\\[[^\\]]*\\]\\((.*?)\\s*("(?:.*[^"])")?\\s*\\)`)

@ -0,0 +1,29 @@
// Copied from https://github.com/vercel/ai/blob/main/packages/ai/core/util/get-potential-start-index.ts
/**
* Returns the index of the start of the searchedText in the text, or null if it
* is not found.
*/
export function getPotentialStartIndex(text: string, searchedText: string): number | null {
// Return null immediately if searchedText is empty.
if (searchedText.length === 0) {
return null
}
// Check if the searchedText exists as a direct substring of text.
const directIndex = text.indexOf(searchedText)
if (directIndex !== -1) {
return directIndex
}
// Otherwise, look for the largest suffix of "text" that matches
// a prefix of "searchedText". We go from the end of text inward.
for (let i = text.length - 1; i >= 0; i--) {
const suffix = text.substring(i)
if (searchedText.startsWith(suffix)) {
return i
}
}
return null
}
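
Illustrative calls (inputs assumed, not part of this commit) showing why the suffix scan matters for streaming: a partially received tag still yields an index, so the middleware can hold it back instead of emitting it as text.

getPotentialStartIndex('The answer<think>', '<think>') // 10: full tag found
getPotentialStartIndex('The answer<thi', '<think>') // 10: partial tag, keep buffering
getPotentialStartIndex('The answer', '<think>') // null: safe to flush the whole buffer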

@ -0,0 +1,25 @@
export function readableStreamAsyncIterable<T>(stream: ReadableStream<T>): AsyncIterable<T> {
const reader = stream.getReader()
return {
[Symbol.asyncIterator](): AsyncIterator<T> {
return {
async next(): Promise<IteratorResult<T>> {
return reader.read() as Promise<IteratorResult<T>>
}
}
}
}
}
export function asyncGeneratorToReadableStream<T>(gen: AsyncGenerator<T>): ReadableStream<T> {
return new ReadableStream<T>({
async pull(controller) {
const { value, done } = await gen.next()
if (done) {
controller.close()
} else {
controller.enqueue(value)
}
}
})
}
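
A small round-trip sketch (hypothetical chunks, not part of this commit) of how these two helpers bridge an async generator and the Web Streams API around the middleware:

async function* demoChunks() {
  yield { type: 'text-delta', textDelta: 'Hello ' }
  yield { type: 'text-delta', textDelta: 'world' }
}
const readable = asyncGeneratorToReadableStream(demoChunks())
for await (const chunk of readableStreamAsyncIterable(readable)) {
  console.log(chunk.textDelta) // 'Hello ', then 'world'
}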