diff --git a/src/common/health.ts b/src/common/health.ts new file mode 100644 index 00000000..166abaeb --- /dev/null +++ b/src/common/health.ts @@ -0,0 +1,328 @@ +export interface ResourceConfig { + /** 资源获取函数 */ + resourceFn: (...args: T) => Promise; + /** 失败后禁用时间(毫秒),默认 30 秒 */ + disableTime?: number; + /** 最大重试次数,默认 3 次 */ + maxRetries?: number; + /** 主动测试间隔(毫秒),默认 60 秒 */ + healthCheckInterval?: number; + /** 最大健康检查失败次数,超过后永久禁用,默认 5 次 */ + maxHealthCheckFailures?: number; + /** 资源名称(用于日志) */ + name?: string; + /** 测试参数(用于健康检查) */ + testArgs?: T; + /** 健康检查函数,如果提供则优先使用此函数进行健康检查 */ + healthCheckFn?: (...args: T) => Promise; +} + +interface ResourceState { + config: ResourceConfig; + isEnabled: boolean; + disableUntil: number; + currentRetries: number; + healthCheckFailureCount: number; + isPermanentlyDisabled: boolean; + lastError?: Error; + lastHealthCheckTime: number; + registrationKey: string; +} + +export class ResourceManager { + private resources = new Map>(); + private destroyed = false; + private healthCheckTimer?: NodeJS.Timeout; + private readonly HEALTH_CHECK_TASK_INTERVAL = 5000; // 5秒执行一次健康检查任务 + + constructor() { + this.startHealthCheckTask(); + } + + /** + * 注册资源(注册即调用,重复注册只实际注册一次) + */ + async register( + key: string, + config: ResourceConfig, + ...args: T + ): Promise { + if (this.destroyed) { + throw new Error('ResourceManager has been destroyed'); + } + + const registrationKey = this.generateRegistrationKey(key, config); + + // 检查是否已经注册 + if (this.resources.has(key)) { + const existingState = this.resources.get(key)!; + + // 如果是相同的配置,直接调用 + if (existingState.registrationKey === registrationKey) { + return this.callResource(key, ...args); + } + + // 配置不同,清理旧的并重新注册 + this.unregister(key); + } + + // 创建新的资源状态 + const state: ResourceState = { + config: { + disableTime: 30000, + maxRetries: 3, + healthCheckInterval: 60000, + maxHealthCheckFailures: 5, + name: key, + ...config + }, + isEnabled: true, + disableUntil: 0, + currentRetries: 0, + healthCheckFailureCount: 0, + isPermanentlyDisabled: false, + lastHealthCheckTime: 0, + registrationKey + }; + + this.resources.set(key, state); + + // 注册即调用 + return await this.callResource(key, ...args); + } + + /** + * 调用资源 + */ + async callResource(key: string, ...args: T): Promise { + const state = this.resources.get(key) as ResourceState | undefined; + if (!state) { + throw new Error(`Resource ${key} not registered`); + } + + if (state.isPermanentlyDisabled) { + throw new Error(`Resource ${key} is permanently disabled due to repeated health check failures`); + } + + if (!this.isResourceAvailable(key)) { + const disableUntilDate = new Date(state.disableUntil).toISOString(); + throw new Error(`Resource ${key} is currently disabled until ${disableUntilDate}`); + } + + try { + const result = await state.config.resourceFn(...args); + this.onResourceSuccess(state); + return result; + } catch (error) { + this.onResourceFailure(state, error as Error); + throw error; + } + } + + /** + * 检查资源是否可用 + */ + isResourceAvailable(key: string): boolean { + const state = this.resources.get(key); + if (!state) { + return false; + } + + if (state.isPermanentlyDisabled || !state.isEnabled) { + return false; + } + + return Date.now() >= state.disableUntil; + } + + /** + * 注销资源 + */ + unregister(key: string): boolean { + return this.resources.delete(key); + } + + /** + * 销毁管理器,清理所有资源 + */ + destroy(): void { + if (this.destroyed) { + return; + } + + this.stopHealthCheckTask(); + this.resources.clear(); + this.destroyed = true; + } + + private generateRegistrationKey(key: string, config: ResourceConfig): string { + const configStr = JSON.stringify({ + name: config.name, + disableTime: config.disableTime, + maxRetries: config.maxRetries, + healthCheckInterval: config.healthCheckInterval, + maxHealthCheckFailures: config.maxHealthCheckFailures, + functionStr: config.resourceFn.toString(), + healthCheckFnStr: config.healthCheckFn?.toString() + }); + + return `${key}_${this.simpleHash(configStr)}`; + } + + private simpleHash(str: string): string { + let hash = 0; + for (let i = 0; i < str.length; i++) { + const char = str.charCodeAt(i); + hash = ((hash << 5) - hash) + char; + hash = hash & hash; // Convert to 32bit integer + } + return Math.abs(hash).toString(36); + } + + private onResourceSuccess(state: ResourceState): void { + state.currentRetries = 0; + state.disableUntil = 0; + state.healthCheckFailureCount = 0; + state.lastError = undefined; + } + + private onResourceFailure(state: ResourceState, error: Error): void { + state.currentRetries++; + state.lastError = error; + + // 如果重试次数达到上限,禁用资源 + if (state.currentRetries >= state.config.maxRetries!) { + state.disableUntil = Date.now() + state.config.disableTime!; + state.currentRetries = 0; + } + } + + private startHealthCheckTask(): void { + if (this.healthCheckTimer) { + return; + } + + this.healthCheckTimer = setInterval(() => { + this.runHealthCheckTask(); + }, this.HEALTH_CHECK_TASK_INTERVAL); + } + + private stopHealthCheckTask(): void { + if (this.healthCheckTimer) { + clearInterval(this.healthCheckTimer); + this.healthCheckTimer = undefined; + } + } + + private async runHealthCheckTask(): Promise { + if (this.destroyed) { + return; + } + + const now = Date.now(); + + for (const [key, state] of this.resources) { + // 跳过永久禁用或可用的资源 + if (state.isPermanentlyDisabled || this.isResourceAvailable(key)) { + continue; + } + + // 跳过还在禁用期内的资源 + if (now < state.disableUntil) { + continue; + } + + // 检查是否需要进行健康检查(根据间隔时间) + const lastHealthCheck = state.lastHealthCheckTime || 0; + const healthCheckInterval = state.config.healthCheckInterval!; + + if (now - lastHealthCheck < healthCheckInterval) { + continue; + } + + // 执行健康检查 + await this.performHealthCheck(state); + } + } + + private async performHealthCheck(state: ResourceState): Promise { + state.lastHealthCheckTime = Date.now(); + + try { + let healthCheckResult: boolean; + + // 如果有专门的健康检查函数,使用它 + if (state.config.healthCheckFn) { + const testArgs = state.config.testArgs || [] as unknown as T; + healthCheckResult = await state.config.healthCheckFn(...testArgs); + } else { + // 否则使用原始函数进行检查 + const testArgs = state.config.testArgs || [] as unknown as T; + await state.config.resourceFn(...testArgs); + healthCheckResult = true; + } + + if (healthCheckResult) { + // 健康检查成功,重新启用 + state.isEnabled = true; + state.disableUntil = 0; + state.currentRetries = 0; + state.healthCheckFailureCount = 0; + state.lastError = undefined; + } else { + throw new Error('Health check function returned false'); + } + } catch (error) { + // 健康检查失败,增加失败计数 + state.healthCheckFailureCount++; + state.lastError = error as Error; + + // 检查是否达到最大健康检查失败次数 + if (state.healthCheckFailureCount >= state.config.maxHealthCheckFailures!) { + // 永久禁用资源 + state.isPermanentlyDisabled = true; + state.disableUntil = 0; + } else { + // 继续禁用一段时间 + state.disableUntil = Date.now() + state.config.disableTime!; + } + } + } +} + +// 创建全局实例 +export const resourceManager = new ResourceManager(); + +// 便捷函数 +export async function registerResource( + key: string, + config: ResourceConfig, + ...args: T +): Promise { + return resourceManager.register(key, config, ...args); +} + +// 使用示例: +/* +await registerResource( + 'api-with-health-check', + { + resourceFn: async (id: string) => { + const response = await fetch(`https://api.example.com/data/${id}`); + return response.json(); + }, + healthCheckFn: async (id: string) => { + try { + const response = await fetch(`https://api.example.com/health`); + return response.ok; + } catch { + return false; + } + }, + testArgs: ['health-check-id'], + healthCheckInterval: 30000, + maxHealthCheckFailures: 3 + }, + 'user123' +); +*/ \ No newline at end of file diff --git a/src/core/apis/packet.ts b/src/core/apis/packet.ts index 99f6a6bd..1c9db417 100644 --- a/src/core/apis/packet.ts +++ b/src/core/apis/packet.ts @@ -68,7 +68,7 @@ export class NTQQPacketApi { this.pkt = new PacketClientSession(this.core); await this.pkt.init(process.pid, table.recv, table.send); try { - await this.pkt.operation.FetchRkey(); + await this.pkt.operation.FetchRkey(1500); } catch (error) { this.logger.logError('测试Packet状态异常', error); return false; diff --git a/src/core/packet/context/operationContext.ts b/src/core/packet/context/operationContext.ts index d957f0e5..e3ab7de6 100644 --- a/src/core/packet/context/operationContext.ts +++ b/src/core/packet/context/operationContext.ts @@ -38,9 +38,9 @@ export class PacketOperationContext { const req = trans.SetGroupTodo.build(groupUin, msgSeq); await this.context.client.sendOidbPacket(req, true); } - async FetchRkey() { + async FetchRkey(timeout: number = 10000) { const req = trans.FetchRkey.build(); - const resp = await this.context.client.sendOidbPacket(req, true); + const resp = await this.context.client.sendOidbPacket(req, true, timeout); const res = trans.FetchRkey.parse(resp); return res.data.rkeyList; } diff --git a/src/onebot/action/go-cqhttp/GetFriendMsgHistory.ts b/src/onebot/action/go-cqhttp/GetFriendMsgHistory.ts index 28e32c4e..f071b25c 100644 --- a/src/onebot/action/go-cqhttp/GetFriendMsgHistory.ts +++ b/src/onebot/action/go-cqhttp/GetFriendMsgHistory.ts @@ -14,7 +14,9 @@ const SchemaData = Type.Object({ user_id: Type.String(), message_seq: Type.Optional(Type.String()), count: Type.Number({ default: 20 }), - reverseOrder: Type.Boolean({ default: false }) + reverseOrder: Type.Boolean({ default: false }), + disableGetUrl: Type.Boolean({ default: false }), + parseMultMsg: Type.Boolean({ default: true }) }); @@ -41,7 +43,7 @@ export default class GetFriendMsgHistory extends OneBotAction })); //烘焙消息 const ob11MsgList = (await Promise.all( - msgList.map(msg => this.obContext.apis.MsgApi.parseMessage(msg, config.messagePostFormat))) + msgList.map(msg => this.obContext.apis.MsgApi.parseMessage(msg, config.messagePostFormat, payload.parseMultMsg, payload.disableGetUrl))) ).filter(msg => msg !== undefined); return { 'messages': ob11MsgList }; } diff --git a/src/onebot/action/go-cqhttp/GetGroupMsgHistory.ts b/src/onebot/action/go-cqhttp/GetGroupMsgHistory.ts index 5dcbedb1..1697e364 100644 --- a/src/onebot/action/go-cqhttp/GetGroupMsgHistory.ts +++ b/src/onebot/action/go-cqhttp/GetGroupMsgHistory.ts @@ -14,7 +14,9 @@ const SchemaData = Type.Object({ group_id: Type.String(), message_seq: Type.Optional(Type.String()), count: Type.Number({ default: 20 }), - reverseOrder: Type.Boolean({ default: false }) + reverseOrder: Type.Boolean({ default: false }), + disableGetUrl: Type.Boolean({ default: false }), + parseMultMsg: Type.Boolean({ default: true }), }); @@ -39,7 +41,7 @@ export default class GoCQHTTPGetGroupMsgHistory extends OneBotAction this.obContext.apis.MsgApi.parseMessage(msg, config.messagePostFormat))) + msgList.map(msg => this.obContext.apis.MsgApi.parseMessage(msg, config.messagePostFormat, payload.parseMultMsg, payload.disableGetUrl))) ).filter(msg => msg !== undefined); return { 'messages': ob11MsgList }; } diff --git a/src/onebot/api/msg.ts b/src/onebot/api/msg.ts index 8a329f06..ef202e21 100644 --- a/src/onebot/api/msg.ts +++ b/src/onebot/api/msg.ts @@ -46,6 +46,7 @@ import { GroupChange, GroupChangeInfo, GroupInvite, PushMsgBody } from '@/core/p import { OB11GroupRequestEvent } from '../event/request/OB11GroupRequest'; import { LRUCache } from '@/common/lru-cache'; import { cleanTaskQueue } from '@/common/clean-task'; +import { registerResource } from '@/common/health'; type RawToOb11Converters = { [Key in keyof MessageElement as Key extends `${string}Element` ? Key : never]: ( @@ -69,7 +70,8 @@ export type SendMessageContext = { } export type RecvMessageContext = { - parseMultMsg: boolean + parseMultMsg: boolean, + disableGetUrl: boolean } function keyCanBeParsed(key: string, parser: RawToOb11Converters): key is keyof RawToOb11Converters { @@ -109,7 +111,7 @@ export class OneBotMsgApi { } }, - picElement: async (element, msg, elementWrapper) => { + picElement: async (element, msg, elementWrapper, { disableGetUrl }) => { try { const peer = { chatType: msg.chatType, @@ -129,7 +131,7 @@ export class OneBotMsgApi { summary: element.summary, file: element.fileName, sub_type: element.picSubType, - url: await this.core.apis.FileApi.getImageUrl(element), + url: disableGetUrl ? (element.filePath ?? '') : await this.core.apis.FileApi.getImageUrl(element), file_size: element.fileSize, }, }; @@ -139,7 +141,7 @@ export class OneBotMsgApi { } }, - fileElement: async (element, msg, elementWrapper) => { + fileElement: async (element, msg, elementWrapper, { disableGetUrl }) => { const peer = { chatType: msg.chatType, peerUid: msg.peerUid, @@ -147,10 +149,24 @@ export class OneBotMsgApi { }; FileNapCatOneBotUUID.encode(peer, msg.msgId, elementWrapper.elementId, element.fileUuid, element.fileUuid); FileNapCatOneBotUUID.encode(peer, msg.msgId, elementWrapper.elementId, element.fileUuid, element.fileName); - if (this.core.apis.PacketApi.packetStatus) { + if (this.core.apis.PacketApi.packetStatus && !disableGetUrl) { let url; try { - url = await this.core.apis.FileApi.getFileUrl(msg.chatType, msg.peerUid, element.fileUuid, element.file10MMd5, 1500) + //url = await this.core.apis.FileApi.getFileUrl(msg.chatType, msg.peerUid, element.fileUuid, element.file10MMd5, 1500) + url = await registerResource( + 'file-url-get', + { + resourceFn: async () => { + return await this.core.apis.FileApi.getFileUrl(msg.chatType, msg.peerUid, element.fileUuid, element.file10MMd5, 1500); + }, + healthCheckFn: async () => { + return await this.core.apis.PacketApi.pkt.operation.FetchRkey().then(() => true).catch(() => false); + }, + testArgs: [], + healthCheckInterval: 30000, + maxHealthCheckFailures: 3 + }, + ); } catch (error) { url = ''; } @@ -345,7 +361,7 @@ export class OneBotMsgApi { return null; }, - videoElement: async (element, msg, elementWrapper) => { + videoElement: async (element, msg, elementWrapper, { disableGetUrl }) => { const peer = { chatType: msg.chatType, peerUid: msg.peerUid, @@ -390,10 +406,24 @@ export class OneBotMsgApi { } //开始兜底 - if (!videoDownUrl) { + if (!videoDownUrl && !disableGetUrl) { if (this.core.apis.PacketApi.packetStatus) { try { - videoDownUrl = await this.core.apis.FileApi.getVideoUrlPacket(msg.peerUid, element.fileUuid, 1500); + //videoDownUrl = await this.core.apis.FileApi.getVideoUrlPacket(msg.peerUid, element.fileUuid, 1500); + videoDownUrl = await registerResource( + 'video-url-get', + { + resourceFn: async () => { + return await this.core.apis.FileApi.getVideoUrlPacket(msg.peerUid, element.fileUuid, 1500); + }, + healthCheckFn: async () => { + return await this.core.apis.PacketApi.pkt.operation.FetchRkey().then(() => true).catch(() => false); + }, + testArgs: [], + healthCheckInterval: 30000, + maxHealthCheckFailures: 3 + }, + ); } catch (e) { this.core.context.logger.logError('获取视频url失败', (e as Error).stack); videoDownUrl = element.filePath; @@ -424,7 +454,21 @@ export class OneBotMsgApi { let pttUrl = ''; if (this.core.apis.PacketApi.packetStatus) { try { - pttUrl = await this.core.apis.FileApi.getPttUrl(msg.peerUid, element.fileUuid, 1500); + pttUrl = await registerResource( + 'ptt-url-get', + { + resourceFn: async () => { + return await this.core.apis.FileApi.getPttUrl(msg.peerUid, element.fileUuid, 1500); + }, + healthCheckFn: async () => { + return await this.core.apis.PacketApi.pkt.operation.FetchRkey().then(() => true).catch(() => false); + }, + testArgs: [], + healthCheckInterval: 30000, + maxHealthCheckFailures: 3 + }, + ); + //pttUrl = await this.core.apis.FileApi.getPttUrl(msg.peerUid, element.fileUuid, 1500); } catch (e) { this.core.context.logger.logError('获取语音url失败', (e as Error).stack); pttUrl = element.filePath; @@ -908,17 +952,19 @@ export class OneBotMsgApi { async parseMessage( msg: RawMessage, messagePostFormat: string, - parseMultMsg: boolean = true + parseMultMsg: boolean = true, + disableGetUrl: boolean = false ) { if (messagePostFormat === 'string') { - return (await this.parseMessageV2(msg, parseMultMsg))?.stringMsg; + return (await this.parseMessageV2(msg, parseMultMsg, disableGetUrl))?.stringMsg; } - return (await this.parseMessageV2(msg, parseMultMsg))?.arrayMsg; + return (await this.parseMessageV2(msg, parseMultMsg, disableGetUrl))?.arrayMsg; } async parseMessageV2( msg: RawMessage, - parseMultMsg: boolean = true + parseMultMsg: boolean = true, + disableGetUrl: boolean = false ) { if (msg.senderUin == '0' || msg.senderUin == '') return; if (msg.peerUin == '0' || msg.peerUin == '') return; @@ -939,7 +985,7 @@ export class OneBotMsgApi { return undefined; } - const validSegments = await this.parseMessageSegments(msg, parseMultMsg); + const validSegments = await this.parseMessageSegments(msg, parseMultMsg, disableGetUrl); resMsg.message = validSegments; resMsg.raw_message = validSegments.map(msg => encodeCQCode(msg)).join('').trim(); @@ -1010,7 +1056,7 @@ export class OneBotMsgApi { } } - private async parseMessageSegments(msg: RawMessage, parseMultMsg: boolean): Promise { + private async parseMessageSegments(msg: RawMessage, parseMultMsg: boolean, disableGetUrl: boolean = false): Promise { const msgSegments = await Promise.allSettled(msg.elements.map( async (element) => { for (const key in element) { @@ -1025,7 +1071,7 @@ export class OneBotMsgApi { element[key], msg, element, - { parseMultMsg } + { parseMultMsg, disableGetUrl } ); if (key === 'faceElement' && !parsedElement) { return null; @@ -1179,12 +1225,12 @@ export class OneBotMsgApi { let url = ''; if (mixElement?.picElement && rawMessage) { const tempData = - await this.obContext.apis.MsgApi.rawToOb11Converters.picElement?.(mixElement?.picElement, rawMessage, mixElement, { parseMultMsg: false }) as OB11MessageImage | undefined; + await this.obContext.apis.MsgApi.rawToOb11Converters.picElement?.(mixElement?.picElement, rawMessage, mixElement, { parseMultMsg: false, disableGetUrl: false }) as OB11MessageImage | undefined; url = tempData?.data.url ?? ''; } if (mixElement?.videoElement && rawMessage) { const tempData = - await this.obContext.apis.MsgApi.rawToOb11Converters.videoElement?.(mixElement?.videoElement, rawMessage, mixElement, { parseMultMsg: false }) as OB11MessageVideo | undefined; + await this.obContext.apis.MsgApi.rawToOb11Converters.videoElement?.(mixElement?.videoElement, rawMessage, mixElement, { parseMultMsg: false, disableGetUrl: false }) as OB11MessageVideo | undefined; url = tempData?.data.url ?? ''; } return url !== '' ? url : await this.core.apis.FileApi.downloadMedia(msgId, peer.chatType, peer.peerUid, elementId, '', '');