From 6200097f7c0d09094115089ec91015163dae0f7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=89=8B=E7=93=9C=E4=B8=80=E5=8D=81=E9=9B=AA?= Date: Tue, 2 Sep 2025 21:19:49 +0800 Subject: [PATCH] Add resource health management and enhance message parsing Introduces a ResourceManager for health checking and retry logic in src/common/health.ts. Updates OneBot message parsing to support disabling URL fetching and multi-message parsing via new payload options. File, image, video, and ptt URL retrievals now use resource health management for improved reliability. Also refactors packet API to allow configurable timeout for FetchRkey. #1220 --- src/common/health.ts | 328 ++++++++++++++++++ src/core/apis/packet.ts | 2 +- src/core/packet/context/operationContext.ts | 4 +- .../action/go-cqhttp/GetFriendMsgHistory.ts | 6 +- .../action/go-cqhttp/GetGroupMsgHistory.ts | 6 +- src/onebot/api/msg.ts | 84 ++++- 6 files changed, 404 insertions(+), 26 deletions(-) create mode 100644 src/common/health.ts diff --git a/src/common/health.ts b/src/common/health.ts new file mode 100644 index 00000000..166abaeb --- /dev/null +++ b/src/common/health.ts @@ -0,0 +1,328 @@ +export interface ResourceConfig { + /** 资源获取函数 */ + resourceFn: (...args: T) => Promise; + /** 失败后禁用时间(毫秒),默认 30 秒 */ + disableTime?: number; + /** 最大重试次数,默认 3 次 */ + maxRetries?: number; + /** 主动测试间隔(毫秒),默认 60 秒 */ + healthCheckInterval?: number; + /** 最大健康检查失败次数,超过后永久禁用,默认 5 次 */ + maxHealthCheckFailures?: number; + /** 资源名称(用于日志) */ + name?: string; + /** 测试参数(用于健康检查) */ + testArgs?: T; + /** 健康检查函数,如果提供则优先使用此函数进行健康检查 */ + healthCheckFn?: (...args: T) => Promise; +} + +interface ResourceState { + config: ResourceConfig; + isEnabled: boolean; + disableUntil: number; + currentRetries: number; + healthCheckFailureCount: number; + isPermanentlyDisabled: boolean; + lastError?: Error; + lastHealthCheckTime: number; + registrationKey: string; +} + +export class ResourceManager { + private resources = new Map>(); + private destroyed = false; + private healthCheckTimer?: NodeJS.Timeout; + private readonly HEALTH_CHECK_TASK_INTERVAL = 5000; // 5秒执行一次健康检查任务 + + constructor() { + this.startHealthCheckTask(); + } + + /** + * 注册资源(注册即调用,重复注册只实际注册一次) + */ + async register( + key: string, + config: ResourceConfig, + ...args: T + ): Promise { + if (this.destroyed) { + throw new Error('ResourceManager has been destroyed'); + } + + const registrationKey = this.generateRegistrationKey(key, config); + + // 检查是否已经注册 + if (this.resources.has(key)) { + const existingState = this.resources.get(key)!; + + // 如果是相同的配置,直接调用 + if (existingState.registrationKey === registrationKey) { + return this.callResource(key, ...args); + } + + // 配置不同,清理旧的并重新注册 + this.unregister(key); + } + + // 创建新的资源状态 + const state: ResourceState = { + config: { + disableTime: 30000, + maxRetries: 3, + healthCheckInterval: 60000, + maxHealthCheckFailures: 5, + name: key, + ...config + }, + isEnabled: true, + disableUntil: 0, + currentRetries: 0, + healthCheckFailureCount: 0, + isPermanentlyDisabled: false, + lastHealthCheckTime: 0, + registrationKey + }; + + this.resources.set(key, state); + + // 注册即调用 + return await this.callResource(key, ...args); + } + + /** + * 调用资源 + */ + async callResource(key: string, ...args: T): Promise { + const state = this.resources.get(key) as ResourceState | undefined; + if (!state) { + throw new Error(`Resource ${key} not registered`); + } + + if (state.isPermanentlyDisabled) { + throw new Error(`Resource ${key} is permanently disabled due to repeated health check failures`); + } + + if (!this.isResourceAvailable(key)) { + const disableUntilDate = new Date(state.disableUntil).toISOString(); + throw new Error(`Resource ${key} is currently disabled until ${disableUntilDate}`); + } + + try { + const result = await state.config.resourceFn(...args); + this.onResourceSuccess(state); + return result; + } catch (error) { + this.onResourceFailure(state, error as Error); + throw error; + } + } + + /** + * 检查资源是否可用 + */ + isResourceAvailable(key: string): boolean { + const state = this.resources.get(key); + if (!state) { + return false; + } + + if (state.isPermanentlyDisabled || !state.isEnabled) { + return false; + } + + return Date.now() >= state.disableUntil; + } + + /** + * 注销资源 + */ + unregister(key: string): boolean { + return this.resources.delete(key); + } + + /** + * 销毁管理器,清理所有资源 + */ + destroy(): void { + if (this.destroyed) { + return; + } + + this.stopHealthCheckTask(); + this.resources.clear(); + this.destroyed = true; + } + + private generateRegistrationKey(key: string, config: ResourceConfig): string { + const configStr = JSON.stringify({ + name: config.name, + disableTime: config.disableTime, + maxRetries: config.maxRetries, + healthCheckInterval: config.healthCheckInterval, + maxHealthCheckFailures: config.maxHealthCheckFailures, + functionStr: config.resourceFn.toString(), + healthCheckFnStr: config.healthCheckFn?.toString() + }); + + return `${key}_${this.simpleHash(configStr)}`; + } + + private simpleHash(str: string): string { + let hash = 0; + for (let i = 0; i < str.length; i++) { + const char = str.charCodeAt(i); + hash = ((hash << 5) - hash) + char; + hash = hash & hash; // Convert to 32bit integer + } + return Math.abs(hash).toString(36); + } + + private onResourceSuccess(state: ResourceState): void { + state.currentRetries = 0; + state.disableUntil = 0; + state.healthCheckFailureCount = 0; + state.lastError = undefined; + } + + private onResourceFailure(state: ResourceState, error: Error): void { + state.currentRetries++; + state.lastError = error; + + // 如果重试次数达到上限,禁用资源 + if (state.currentRetries >= state.config.maxRetries!) { + state.disableUntil = Date.now() + state.config.disableTime!; + state.currentRetries = 0; + } + } + + private startHealthCheckTask(): void { + if (this.healthCheckTimer) { + return; + } + + this.healthCheckTimer = setInterval(() => { + this.runHealthCheckTask(); + }, this.HEALTH_CHECK_TASK_INTERVAL); + } + + private stopHealthCheckTask(): void { + if (this.healthCheckTimer) { + clearInterval(this.healthCheckTimer); + this.healthCheckTimer = undefined; + } + } + + private async runHealthCheckTask(): Promise { + if (this.destroyed) { + return; + } + + const now = Date.now(); + + for (const [key, state] of this.resources) { + // 跳过永久禁用或可用的资源 + if (state.isPermanentlyDisabled || this.isResourceAvailable(key)) { + continue; + } + + // 跳过还在禁用期内的资源 + if (now < state.disableUntil) { + continue; + } + + // 检查是否需要进行健康检查(根据间隔时间) + const lastHealthCheck = state.lastHealthCheckTime || 0; + const healthCheckInterval = state.config.healthCheckInterval!; + + if (now - lastHealthCheck < healthCheckInterval) { + continue; + } + + // 执行健康检查 + await this.performHealthCheck(state); + } + } + + private async performHealthCheck(state: ResourceState): Promise { + state.lastHealthCheckTime = Date.now(); + + try { + let healthCheckResult: boolean; + + // 如果有专门的健康检查函数,使用它 + if (state.config.healthCheckFn) { + const testArgs = state.config.testArgs || [] as unknown as T; + healthCheckResult = await state.config.healthCheckFn(...testArgs); + } else { + // 否则使用原始函数进行检查 + const testArgs = state.config.testArgs || [] as unknown as T; + await state.config.resourceFn(...testArgs); + healthCheckResult = true; + } + + if (healthCheckResult) { + // 健康检查成功,重新启用 + state.isEnabled = true; + state.disableUntil = 0; + state.currentRetries = 0; + state.healthCheckFailureCount = 0; + state.lastError = undefined; + } else { + throw new Error('Health check function returned false'); + } + } catch (error) { + // 健康检查失败,增加失败计数 + state.healthCheckFailureCount++; + state.lastError = error as Error; + + // 检查是否达到最大健康检查失败次数 + if (state.healthCheckFailureCount >= state.config.maxHealthCheckFailures!) { + // 永久禁用资源 + state.isPermanentlyDisabled = true; + state.disableUntil = 0; + } else { + // 继续禁用一段时间 + state.disableUntil = Date.now() + state.config.disableTime!; + } + } + } +} + +// 创建全局实例 +export const resourceManager = new ResourceManager(); + +// 便捷函数 +export async function registerResource( + key: string, + config: ResourceConfig, + ...args: T +): Promise { + return resourceManager.register(key, config, ...args); +} + +// 使用示例: +/* +await registerResource( + 'api-with-health-check', + { + resourceFn: async (id: string) => { + const response = await fetch(`https://api.example.com/data/${id}`); + return response.json(); + }, + healthCheckFn: async (id: string) => { + try { + const response = await fetch(`https://api.example.com/health`); + return response.ok; + } catch { + return false; + } + }, + testArgs: ['health-check-id'], + healthCheckInterval: 30000, + maxHealthCheckFailures: 3 + }, + 'user123' +); +*/ \ No newline at end of file diff --git a/src/core/apis/packet.ts b/src/core/apis/packet.ts index 99f6a6bd..1c9db417 100644 --- a/src/core/apis/packet.ts +++ b/src/core/apis/packet.ts @@ -68,7 +68,7 @@ export class NTQQPacketApi { this.pkt = new PacketClientSession(this.core); await this.pkt.init(process.pid, table.recv, table.send); try { - await this.pkt.operation.FetchRkey(); + await this.pkt.operation.FetchRkey(1500); } catch (error) { this.logger.logError('测试Packet状态异常', error); return false; diff --git a/src/core/packet/context/operationContext.ts b/src/core/packet/context/operationContext.ts index d957f0e5..e3ab7de6 100644 --- a/src/core/packet/context/operationContext.ts +++ b/src/core/packet/context/operationContext.ts @@ -38,9 +38,9 @@ export class PacketOperationContext { const req = trans.SetGroupTodo.build(groupUin, msgSeq); await this.context.client.sendOidbPacket(req, true); } - async FetchRkey() { + async FetchRkey(timeout: number = 10000) { const req = trans.FetchRkey.build(); - const resp = await this.context.client.sendOidbPacket(req, true); + const resp = await this.context.client.sendOidbPacket(req, true, timeout); const res = trans.FetchRkey.parse(resp); return res.data.rkeyList; } diff --git a/src/onebot/action/go-cqhttp/GetFriendMsgHistory.ts b/src/onebot/action/go-cqhttp/GetFriendMsgHistory.ts index 28e32c4e..f071b25c 100644 --- a/src/onebot/action/go-cqhttp/GetFriendMsgHistory.ts +++ b/src/onebot/action/go-cqhttp/GetFriendMsgHistory.ts @@ -14,7 +14,9 @@ const SchemaData = Type.Object({ user_id: Type.String(), message_seq: Type.Optional(Type.String()), count: Type.Number({ default: 20 }), - reverseOrder: Type.Boolean({ default: false }) + reverseOrder: Type.Boolean({ default: false }), + disableGetUrl: Type.Boolean({ default: false }), + parseMultMsg: Type.Boolean({ default: true }) }); @@ -41,7 +43,7 @@ export default class GetFriendMsgHistory extends OneBotAction })); //烘焙消息 const ob11MsgList = (await Promise.all( - msgList.map(msg => this.obContext.apis.MsgApi.parseMessage(msg, config.messagePostFormat))) + msgList.map(msg => this.obContext.apis.MsgApi.parseMessage(msg, config.messagePostFormat, payload.parseMultMsg, payload.disableGetUrl))) ).filter(msg => msg !== undefined); return { 'messages': ob11MsgList }; } diff --git a/src/onebot/action/go-cqhttp/GetGroupMsgHistory.ts b/src/onebot/action/go-cqhttp/GetGroupMsgHistory.ts index 5dcbedb1..1697e364 100644 --- a/src/onebot/action/go-cqhttp/GetGroupMsgHistory.ts +++ b/src/onebot/action/go-cqhttp/GetGroupMsgHistory.ts @@ -14,7 +14,9 @@ const SchemaData = Type.Object({ group_id: Type.String(), message_seq: Type.Optional(Type.String()), count: Type.Number({ default: 20 }), - reverseOrder: Type.Boolean({ default: false }) + reverseOrder: Type.Boolean({ default: false }), + disableGetUrl: Type.Boolean({ default: false }), + parseMultMsg: Type.Boolean({ default: true }), }); @@ -39,7 +41,7 @@ export default class GoCQHTTPGetGroupMsgHistory extends OneBotAction this.obContext.apis.MsgApi.parseMessage(msg, config.messagePostFormat))) + msgList.map(msg => this.obContext.apis.MsgApi.parseMessage(msg, config.messagePostFormat, payload.parseMultMsg, payload.disableGetUrl))) ).filter(msg => msg !== undefined); return { 'messages': ob11MsgList }; } diff --git a/src/onebot/api/msg.ts b/src/onebot/api/msg.ts index 8a329f06..ef202e21 100644 --- a/src/onebot/api/msg.ts +++ b/src/onebot/api/msg.ts @@ -46,6 +46,7 @@ import { GroupChange, GroupChangeInfo, GroupInvite, PushMsgBody } from '@/core/p import { OB11GroupRequestEvent } from '../event/request/OB11GroupRequest'; import { LRUCache } from '@/common/lru-cache'; import { cleanTaskQueue } from '@/common/clean-task'; +import { registerResource } from '@/common/health'; type RawToOb11Converters = { [Key in keyof MessageElement as Key extends `${string}Element` ? Key : never]: ( @@ -69,7 +70,8 @@ export type SendMessageContext = { } export type RecvMessageContext = { - parseMultMsg: boolean + parseMultMsg: boolean, + disableGetUrl: boolean } function keyCanBeParsed(key: string, parser: RawToOb11Converters): key is keyof RawToOb11Converters { @@ -109,7 +111,7 @@ export class OneBotMsgApi { } }, - picElement: async (element, msg, elementWrapper) => { + picElement: async (element, msg, elementWrapper, { disableGetUrl }) => { try { const peer = { chatType: msg.chatType, @@ -129,7 +131,7 @@ export class OneBotMsgApi { summary: element.summary, file: element.fileName, sub_type: element.picSubType, - url: await this.core.apis.FileApi.getImageUrl(element), + url: disableGetUrl ? (element.filePath ?? '') : await this.core.apis.FileApi.getImageUrl(element), file_size: element.fileSize, }, }; @@ -139,7 +141,7 @@ export class OneBotMsgApi { } }, - fileElement: async (element, msg, elementWrapper) => { + fileElement: async (element, msg, elementWrapper, { disableGetUrl }) => { const peer = { chatType: msg.chatType, peerUid: msg.peerUid, @@ -147,10 +149,24 @@ export class OneBotMsgApi { }; FileNapCatOneBotUUID.encode(peer, msg.msgId, elementWrapper.elementId, element.fileUuid, element.fileUuid); FileNapCatOneBotUUID.encode(peer, msg.msgId, elementWrapper.elementId, element.fileUuid, element.fileName); - if (this.core.apis.PacketApi.packetStatus) { + if (this.core.apis.PacketApi.packetStatus && !disableGetUrl) { let url; try { - url = await this.core.apis.FileApi.getFileUrl(msg.chatType, msg.peerUid, element.fileUuid, element.file10MMd5, 1500) + //url = await this.core.apis.FileApi.getFileUrl(msg.chatType, msg.peerUid, element.fileUuid, element.file10MMd5, 1500) + url = await registerResource( + 'file-url-get', + { + resourceFn: async () => { + return await this.core.apis.FileApi.getFileUrl(msg.chatType, msg.peerUid, element.fileUuid, element.file10MMd5, 1500); + }, + healthCheckFn: async () => { + return await this.core.apis.PacketApi.pkt.operation.FetchRkey().then(() => true).catch(() => false); + }, + testArgs: [], + healthCheckInterval: 30000, + maxHealthCheckFailures: 3 + }, + ); } catch (error) { url = ''; } @@ -345,7 +361,7 @@ export class OneBotMsgApi { return null; }, - videoElement: async (element, msg, elementWrapper) => { + videoElement: async (element, msg, elementWrapper, { disableGetUrl }) => { const peer = { chatType: msg.chatType, peerUid: msg.peerUid, @@ -390,10 +406,24 @@ export class OneBotMsgApi { } //开始兜底 - if (!videoDownUrl) { + if (!videoDownUrl && !disableGetUrl) { if (this.core.apis.PacketApi.packetStatus) { try { - videoDownUrl = await this.core.apis.FileApi.getVideoUrlPacket(msg.peerUid, element.fileUuid, 1500); + //videoDownUrl = await this.core.apis.FileApi.getVideoUrlPacket(msg.peerUid, element.fileUuid, 1500); + videoDownUrl = await registerResource( + 'video-url-get', + { + resourceFn: async () => { + return await this.core.apis.FileApi.getVideoUrlPacket(msg.peerUid, element.fileUuid, 1500); + }, + healthCheckFn: async () => { + return await this.core.apis.PacketApi.pkt.operation.FetchRkey().then(() => true).catch(() => false); + }, + testArgs: [], + healthCheckInterval: 30000, + maxHealthCheckFailures: 3 + }, + ); } catch (e) { this.core.context.logger.logError('获取视频url失败', (e as Error).stack); videoDownUrl = element.filePath; @@ -424,7 +454,21 @@ export class OneBotMsgApi { let pttUrl = ''; if (this.core.apis.PacketApi.packetStatus) { try { - pttUrl = await this.core.apis.FileApi.getPttUrl(msg.peerUid, element.fileUuid, 1500); + pttUrl = await registerResource( + 'ptt-url-get', + { + resourceFn: async () => { + return await this.core.apis.FileApi.getPttUrl(msg.peerUid, element.fileUuid, 1500); + }, + healthCheckFn: async () => { + return await this.core.apis.PacketApi.pkt.operation.FetchRkey().then(() => true).catch(() => false); + }, + testArgs: [], + healthCheckInterval: 30000, + maxHealthCheckFailures: 3 + }, + ); + //pttUrl = await this.core.apis.FileApi.getPttUrl(msg.peerUid, element.fileUuid, 1500); } catch (e) { this.core.context.logger.logError('获取语音url失败', (e as Error).stack); pttUrl = element.filePath; @@ -908,17 +952,19 @@ export class OneBotMsgApi { async parseMessage( msg: RawMessage, messagePostFormat: string, - parseMultMsg: boolean = true + parseMultMsg: boolean = true, + disableGetUrl: boolean = false ) { if (messagePostFormat === 'string') { - return (await this.parseMessageV2(msg, parseMultMsg))?.stringMsg; + return (await this.parseMessageV2(msg, parseMultMsg, disableGetUrl))?.stringMsg; } - return (await this.parseMessageV2(msg, parseMultMsg))?.arrayMsg; + return (await this.parseMessageV2(msg, parseMultMsg, disableGetUrl))?.arrayMsg; } async parseMessageV2( msg: RawMessage, - parseMultMsg: boolean = true + parseMultMsg: boolean = true, + disableGetUrl: boolean = false ) { if (msg.senderUin == '0' || msg.senderUin == '') return; if (msg.peerUin == '0' || msg.peerUin == '') return; @@ -939,7 +985,7 @@ export class OneBotMsgApi { return undefined; } - const validSegments = await this.parseMessageSegments(msg, parseMultMsg); + const validSegments = await this.parseMessageSegments(msg, parseMultMsg, disableGetUrl); resMsg.message = validSegments; resMsg.raw_message = validSegments.map(msg => encodeCQCode(msg)).join('').trim(); @@ -1010,7 +1056,7 @@ export class OneBotMsgApi { } } - private async parseMessageSegments(msg: RawMessage, parseMultMsg: boolean): Promise { + private async parseMessageSegments(msg: RawMessage, parseMultMsg: boolean, disableGetUrl: boolean = false): Promise { const msgSegments = await Promise.allSettled(msg.elements.map( async (element) => { for (const key in element) { @@ -1025,7 +1071,7 @@ export class OneBotMsgApi { element[key], msg, element, - { parseMultMsg } + { parseMultMsg, disableGetUrl } ); if (key === 'faceElement' && !parsedElement) { return null; @@ -1179,12 +1225,12 @@ export class OneBotMsgApi { let url = ''; if (mixElement?.picElement && rawMessage) { const tempData = - await this.obContext.apis.MsgApi.rawToOb11Converters.picElement?.(mixElement?.picElement, rawMessage, mixElement, { parseMultMsg: false }) as OB11MessageImage | undefined; + await this.obContext.apis.MsgApi.rawToOb11Converters.picElement?.(mixElement?.picElement, rawMessage, mixElement, { parseMultMsg: false, disableGetUrl: false }) as OB11MessageImage | undefined; url = tempData?.data.url ?? ''; } if (mixElement?.videoElement && rawMessage) { const tempData = - await this.obContext.apis.MsgApi.rawToOb11Converters.videoElement?.(mixElement?.videoElement, rawMessage, mixElement, { parseMultMsg: false }) as OB11MessageVideo | undefined; + await this.obContext.apis.MsgApi.rawToOb11Converters.videoElement?.(mixElement?.videoElement, rawMessage, mixElement, { parseMultMsg: false, disableGetUrl: false }) as OB11MessageVideo | undefined; url = tempData?.data.url ?? ''; } return url !== '' ? url : await this.core.apis.FileApi.downloadMedia(msgId, peer.chatType, peer.peerUid, elementId, '', '');