Add resource health management and enhance message parsing

Introduces a ResourceManager for health checking and retry logic in src/common/health.ts. Updates OneBot message parsing to support disabling URL fetching and multi-message parsing via new payload options. File, image, video, and ptt URL retrievals now use resource health management for improved reliability. Also refactors packet API to allow configurable timeout for FetchRkey.
#1220
This commit is contained in:
手瓜一十雪 2025-09-02 21:19:49 +08:00
parent c7af0384fb
commit 6200097f7c
6 changed files with 404 additions and 26 deletions

328
src/common/health.ts Normal file
View File

@ -0,0 +1,328 @@
export interface ResourceConfig<T extends any[], R> {
/** 资源获取函数 */
resourceFn: (...args: T) => Promise<R>;
/** 失败后禁用时间(毫秒),默认 30 秒 */
disableTime?: number;
/** 最大重试次数,默认 3 次 */
maxRetries?: number;
/** 主动测试间隔(毫秒),默认 60 秒 */
healthCheckInterval?: number;
/** 最大健康检查失败次数,超过后永久禁用,默认 5 次 */
maxHealthCheckFailures?: number;
/** 资源名称(用于日志) */
name?: string;
/** 测试参数(用于健康检查) */
testArgs?: T;
/** 健康检查函数,如果提供则优先使用此函数进行健康检查 */
healthCheckFn?: (...args: T) => Promise<boolean>;
}
interface ResourceState<T extends any[], R> {
config: ResourceConfig<T, R>;
isEnabled: boolean;
disableUntil: number;
currentRetries: number;
healthCheckFailureCount: number;
isPermanentlyDisabled: boolean;
lastError?: Error;
lastHealthCheckTime: number;
registrationKey: string;
}
export class ResourceManager {
private resources = new Map<string, ResourceState<any, any>>();
private destroyed = false;
private healthCheckTimer?: NodeJS.Timeout;
private readonly HEALTH_CHECK_TASK_INTERVAL = 5000; // 5秒执行一次健康检查任务
constructor() {
this.startHealthCheckTask();
}
/**
*
*/
async register<T extends any[], R>(
key: string,
config: ResourceConfig<T, R>,
...args: T
): Promise<R> {
if (this.destroyed) {
throw new Error('ResourceManager has been destroyed');
}
const registrationKey = this.generateRegistrationKey(key, config);
// 检查是否已经注册
if (this.resources.has(key)) {
const existingState = this.resources.get(key)!;
// 如果是相同的配置,直接调用
if (existingState.registrationKey === registrationKey) {
return this.callResource<T, R>(key, ...args);
}
// 配置不同,清理旧的并重新注册
this.unregister(key);
}
// 创建新的资源状态
const state: ResourceState<T, R> = {
config: {
disableTime: 30000,
maxRetries: 3,
healthCheckInterval: 60000,
maxHealthCheckFailures: 5,
name: key,
...config
},
isEnabled: true,
disableUntil: 0,
currentRetries: 0,
healthCheckFailureCount: 0,
isPermanentlyDisabled: false,
lastHealthCheckTime: 0,
registrationKey
};
this.resources.set(key, state);
// 注册即调用
return await this.callResource<T, R>(key, ...args);
}
/**
*
*/
async callResource<T extends any[], R>(key: string, ...args: T): Promise<R> {
const state = this.resources.get(key) as ResourceState<T, R> | undefined;
if (!state) {
throw new Error(`Resource ${key} not registered`);
}
if (state.isPermanentlyDisabled) {
throw new Error(`Resource ${key} is permanently disabled due to repeated health check failures`);
}
if (!this.isResourceAvailable(key)) {
const disableUntilDate = new Date(state.disableUntil).toISOString();
throw new Error(`Resource ${key} is currently disabled until ${disableUntilDate}`);
}
try {
const result = await state.config.resourceFn(...args);
this.onResourceSuccess(state);
return result;
} catch (error) {
this.onResourceFailure(state, error as Error);
throw error;
}
}
/**
*
*/
isResourceAvailable(key: string): boolean {
const state = this.resources.get(key);
if (!state) {
return false;
}
if (state.isPermanentlyDisabled || !state.isEnabled) {
return false;
}
return Date.now() >= state.disableUntil;
}
/**
*
*/
unregister(key: string): boolean {
return this.resources.delete(key);
}
/**
*
*/
destroy(): void {
if (this.destroyed) {
return;
}
this.stopHealthCheckTask();
this.resources.clear();
this.destroyed = true;
}
private generateRegistrationKey<T extends any[], R>(key: string, config: ResourceConfig<T, R>): string {
const configStr = JSON.stringify({
name: config.name,
disableTime: config.disableTime,
maxRetries: config.maxRetries,
healthCheckInterval: config.healthCheckInterval,
maxHealthCheckFailures: config.maxHealthCheckFailures,
functionStr: config.resourceFn.toString(),
healthCheckFnStr: config.healthCheckFn?.toString()
});
return `${key}_${this.simpleHash(configStr)}`;
}
private simpleHash(str: string): string {
let hash = 0;
for (let i = 0; i < str.length; i++) {
const char = str.charCodeAt(i);
hash = ((hash << 5) - hash) + char;
hash = hash & hash; // Convert to 32bit integer
}
return Math.abs(hash).toString(36);
}
private onResourceSuccess<T extends any[], R>(state: ResourceState<T, R>): void {
state.currentRetries = 0;
state.disableUntil = 0;
state.healthCheckFailureCount = 0;
state.lastError = undefined;
}
private onResourceFailure<T extends any[], R>(state: ResourceState<T, R>, error: Error): void {
state.currentRetries++;
state.lastError = error;
// 如果重试次数达到上限,禁用资源
if (state.currentRetries >= state.config.maxRetries!) {
state.disableUntil = Date.now() + state.config.disableTime!;
state.currentRetries = 0;
}
}
private startHealthCheckTask(): void {
if (this.healthCheckTimer) {
return;
}
this.healthCheckTimer = setInterval(() => {
this.runHealthCheckTask();
}, this.HEALTH_CHECK_TASK_INTERVAL);
}
private stopHealthCheckTask(): void {
if (this.healthCheckTimer) {
clearInterval(this.healthCheckTimer);
this.healthCheckTimer = undefined;
}
}
private async runHealthCheckTask(): Promise<void> {
if (this.destroyed) {
return;
}
const now = Date.now();
for (const [key, state] of this.resources) {
// 跳过永久禁用或可用的资源
if (state.isPermanentlyDisabled || this.isResourceAvailable(key)) {
continue;
}
// 跳过还在禁用期内的资源
if (now < state.disableUntil) {
continue;
}
// 检查是否需要进行健康检查(根据间隔时间)
const lastHealthCheck = state.lastHealthCheckTime || 0;
const healthCheckInterval = state.config.healthCheckInterval!;
if (now - lastHealthCheck < healthCheckInterval) {
continue;
}
// 执行健康检查
await this.performHealthCheck(state);
}
}
private async performHealthCheck<T extends any[], R>(state: ResourceState<T, R>): Promise<void> {
state.lastHealthCheckTime = Date.now();
try {
let healthCheckResult: boolean;
// 如果有专门的健康检查函数,使用它
if (state.config.healthCheckFn) {
const testArgs = state.config.testArgs || [] as unknown as T;
healthCheckResult = await state.config.healthCheckFn(...testArgs);
} else {
// 否则使用原始函数进行检查
const testArgs = state.config.testArgs || [] as unknown as T;
await state.config.resourceFn(...testArgs);
healthCheckResult = true;
}
if (healthCheckResult) {
// 健康检查成功,重新启用
state.isEnabled = true;
state.disableUntil = 0;
state.currentRetries = 0;
state.healthCheckFailureCount = 0;
state.lastError = undefined;
} else {
throw new Error('Health check function returned false');
}
} catch (error) {
// 健康检查失败,增加失败计数
state.healthCheckFailureCount++;
state.lastError = error as Error;
// 检查是否达到最大健康检查失败次数
if (state.healthCheckFailureCount >= state.config.maxHealthCheckFailures!) {
// 永久禁用资源
state.isPermanentlyDisabled = true;
state.disableUntil = 0;
} else {
// 继续禁用一段时间
state.disableUntil = Date.now() + state.config.disableTime!;
}
}
}
}
// 创建全局实例
export const resourceManager = new ResourceManager();
// 便捷函数
export async function registerResource<T extends any[], R>(
key: string,
config: ResourceConfig<T, R>,
...args: T
): Promise<R> {
return resourceManager.register(key, config, ...args);
}
// 使用示例:
/*
await registerResource(
'api-with-health-check',
{
resourceFn: async (id: string) => {
const response = await fetch(`https://api.example.com/data/${id}`);
return response.json();
},
healthCheckFn: async (id: string) => {
try {
const response = await fetch(`https://api.example.com/health`);
return response.ok;
} catch {
return false;
}
},
testArgs: ['health-check-id'],
healthCheckInterval: 30000,
maxHealthCheckFailures: 3
},
'user123'
);
*/

View File

@ -68,7 +68,7 @@ export class NTQQPacketApi {
this.pkt = new PacketClientSession(this.core); this.pkt = new PacketClientSession(this.core);
await this.pkt.init(process.pid, table.recv, table.send); await this.pkt.init(process.pid, table.recv, table.send);
try { try {
await this.pkt.operation.FetchRkey(); await this.pkt.operation.FetchRkey(1500);
} catch (error) { } catch (error) {
this.logger.logError('测试Packet状态异常', error); this.logger.logError('测试Packet状态异常', error);
return false; return false;

View File

@ -38,9 +38,9 @@ export class PacketOperationContext {
const req = trans.SetGroupTodo.build(groupUin, msgSeq); const req = trans.SetGroupTodo.build(groupUin, msgSeq);
await this.context.client.sendOidbPacket(req, true); await this.context.client.sendOidbPacket(req, true);
} }
async FetchRkey() { async FetchRkey(timeout: number = 10000) {
const req = trans.FetchRkey.build(); const req = trans.FetchRkey.build();
const resp = await this.context.client.sendOidbPacket(req, true); const resp = await this.context.client.sendOidbPacket(req, true, timeout);
const res = trans.FetchRkey.parse(resp); const res = trans.FetchRkey.parse(resp);
return res.data.rkeyList; return res.data.rkeyList;
} }

View File

@ -14,7 +14,9 @@ const SchemaData = Type.Object({
user_id: Type.String(), user_id: Type.String(),
message_seq: Type.Optional(Type.String()), message_seq: Type.Optional(Type.String()),
count: Type.Number({ default: 20 }), count: Type.Number({ default: 20 }),
reverseOrder: Type.Boolean({ default: false }) reverseOrder: Type.Boolean({ default: false }),
disableGetUrl: Type.Boolean({ default: false }),
parseMultMsg: Type.Boolean({ default: true })
}); });
@ -41,7 +43,7 @@ export default class GetFriendMsgHistory extends OneBotAction<Payload, Response>
})); }));
//烘焙消息 //烘焙消息
const ob11MsgList = (await Promise.all( const ob11MsgList = (await Promise.all(
msgList.map(msg => this.obContext.apis.MsgApi.parseMessage(msg, config.messagePostFormat))) msgList.map(msg => this.obContext.apis.MsgApi.parseMessage(msg, config.messagePostFormat, payload.parseMultMsg, payload.disableGetUrl)))
).filter(msg => msg !== undefined); ).filter(msg => msg !== undefined);
return { 'messages': ob11MsgList }; return { 'messages': ob11MsgList };
} }

View File

@ -14,7 +14,9 @@ const SchemaData = Type.Object({
group_id: Type.String(), group_id: Type.String(),
message_seq: Type.Optional(Type.String()), message_seq: Type.Optional(Type.String()),
count: Type.Number({ default: 20 }), count: Type.Number({ default: 20 }),
reverseOrder: Type.Boolean({ default: false }) reverseOrder: Type.Boolean({ default: false }),
disableGetUrl: Type.Boolean({ default: false }),
parseMultMsg: Type.Boolean({ default: true }),
}); });
@ -39,7 +41,7 @@ export default class GoCQHTTPGetGroupMsgHistory extends OneBotAction<Payload, Re
})); }));
//烘焙消息 //烘焙消息
const ob11MsgList = (await Promise.all( const ob11MsgList = (await Promise.all(
msgList.map(msg => this.obContext.apis.MsgApi.parseMessage(msg, config.messagePostFormat))) msgList.map(msg => this.obContext.apis.MsgApi.parseMessage(msg, config.messagePostFormat, payload.parseMultMsg, payload.disableGetUrl)))
).filter(msg => msg !== undefined); ).filter(msg => msg !== undefined);
return { 'messages': ob11MsgList }; return { 'messages': ob11MsgList };
} }

View File

@ -46,6 +46,7 @@ import { GroupChange, GroupChangeInfo, GroupInvite, PushMsgBody } from '@/core/p
import { OB11GroupRequestEvent } from '../event/request/OB11GroupRequest'; import { OB11GroupRequestEvent } from '../event/request/OB11GroupRequest';
import { LRUCache } from '@/common/lru-cache'; import { LRUCache } from '@/common/lru-cache';
import { cleanTaskQueue } from '@/common/clean-task'; import { cleanTaskQueue } from '@/common/clean-task';
import { registerResource } from '@/common/health';
type RawToOb11Converters = { type RawToOb11Converters = {
[Key in keyof MessageElement as Key extends `${string}Element` ? Key : never]: ( [Key in keyof MessageElement as Key extends `${string}Element` ? Key : never]: (
@ -69,7 +70,8 @@ export type SendMessageContext = {
} }
export type RecvMessageContext = { export type RecvMessageContext = {
parseMultMsg: boolean parseMultMsg: boolean,
disableGetUrl: boolean
} }
function keyCanBeParsed(key: string, parser: RawToOb11Converters): key is keyof RawToOb11Converters { function keyCanBeParsed(key: string, parser: RawToOb11Converters): key is keyof RawToOb11Converters {
@ -109,7 +111,7 @@ export class OneBotMsgApi {
} }
}, },
picElement: async (element, msg, elementWrapper) => { picElement: async (element, msg, elementWrapper, { disableGetUrl }) => {
try { try {
const peer = { const peer = {
chatType: msg.chatType, chatType: msg.chatType,
@ -129,7 +131,7 @@ export class OneBotMsgApi {
summary: element.summary, summary: element.summary,
file: element.fileName, file: element.fileName,
sub_type: element.picSubType, sub_type: element.picSubType,
url: await this.core.apis.FileApi.getImageUrl(element), url: disableGetUrl ? (element.filePath ?? '') : await this.core.apis.FileApi.getImageUrl(element),
file_size: element.fileSize, file_size: element.fileSize,
}, },
}; };
@ -139,7 +141,7 @@ export class OneBotMsgApi {
} }
}, },
fileElement: async (element, msg, elementWrapper) => { fileElement: async (element, msg, elementWrapper, { disableGetUrl }) => {
const peer = { const peer = {
chatType: msg.chatType, chatType: msg.chatType,
peerUid: msg.peerUid, peerUid: msg.peerUid,
@ -147,10 +149,24 @@ export class OneBotMsgApi {
}; };
FileNapCatOneBotUUID.encode(peer, msg.msgId, elementWrapper.elementId, element.fileUuid, element.fileUuid); FileNapCatOneBotUUID.encode(peer, msg.msgId, elementWrapper.elementId, element.fileUuid, element.fileUuid);
FileNapCatOneBotUUID.encode(peer, msg.msgId, elementWrapper.elementId, element.fileUuid, element.fileName); FileNapCatOneBotUUID.encode(peer, msg.msgId, elementWrapper.elementId, element.fileUuid, element.fileName);
if (this.core.apis.PacketApi.packetStatus) { if (this.core.apis.PacketApi.packetStatus && !disableGetUrl) {
let url; let url;
try { try {
url = await this.core.apis.FileApi.getFileUrl(msg.chatType, msg.peerUid, element.fileUuid, element.file10MMd5, 1500) //url = await this.core.apis.FileApi.getFileUrl(msg.chatType, msg.peerUid, element.fileUuid, element.file10MMd5, 1500)
url = await registerResource(
'file-url-get',
{
resourceFn: async () => {
return await this.core.apis.FileApi.getFileUrl(msg.chatType, msg.peerUid, element.fileUuid, element.file10MMd5, 1500);
},
healthCheckFn: async () => {
return await this.core.apis.PacketApi.pkt.operation.FetchRkey().then(() => true).catch(() => false);
},
testArgs: [],
healthCheckInterval: 30000,
maxHealthCheckFailures: 3
},
);
} catch (error) { } catch (error) {
url = ''; url = '';
} }
@ -345,7 +361,7 @@ export class OneBotMsgApi {
return null; return null;
}, },
videoElement: async (element, msg, elementWrapper) => { videoElement: async (element, msg, elementWrapper, { disableGetUrl }) => {
const peer = { const peer = {
chatType: msg.chatType, chatType: msg.chatType,
peerUid: msg.peerUid, peerUid: msg.peerUid,
@ -390,10 +406,24 @@ export class OneBotMsgApi {
} }
//开始兜底 //开始兜底
if (!videoDownUrl) { if (!videoDownUrl && !disableGetUrl) {
if (this.core.apis.PacketApi.packetStatus) { if (this.core.apis.PacketApi.packetStatus) {
try { try {
videoDownUrl = await this.core.apis.FileApi.getVideoUrlPacket(msg.peerUid, element.fileUuid, 1500); //videoDownUrl = await this.core.apis.FileApi.getVideoUrlPacket(msg.peerUid, element.fileUuid, 1500);
videoDownUrl = await registerResource(
'video-url-get',
{
resourceFn: async () => {
return await this.core.apis.FileApi.getVideoUrlPacket(msg.peerUid, element.fileUuid, 1500);
},
healthCheckFn: async () => {
return await this.core.apis.PacketApi.pkt.operation.FetchRkey().then(() => true).catch(() => false);
},
testArgs: [],
healthCheckInterval: 30000,
maxHealthCheckFailures: 3
},
);
} catch (e) { } catch (e) {
this.core.context.logger.logError('获取视频url失败', (e as Error).stack); this.core.context.logger.logError('获取视频url失败', (e as Error).stack);
videoDownUrl = element.filePath; videoDownUrl = element.filePath;
@ -424,7 +454,21 @@ export class OneBotMsgApi {
let pttUrl = ''; let pttUrl = '';
if (this.core.apis.PacketApi.packetStatus) { if (this.core.apis.PacketApi.packetStatus) {
try { try {
pttUrl = await this.core.apis.FileApi.getPttUrl(msg.peerUid, element.fileUuid, 1500); pttUrl = await registerResource(
'ptt-url-get',
{
resourceFn: async () => {
return await this.core.apis.FileApi.getPttUrl(msg.peerUid, element.fileUuid, 1500);
},
healthCheckFn: async () => {
return await this.core.apis.PacketApi.pkt.operation.FetchRkey().then(() => true).catch(() => false);
},
testArgs: [],
healthCheckInterval: 30000,
maxHealthCheckFailures: 3
},
);
//pttUrl = await this.core.apis.FileApi.getPttUrl(msg.peerUid, element.fileUuid, 1500);
} catch (e) { } catch (e) {
this.core.context.logger.logError('获取语音url失败', (e as Error).stack); this.core.context.logger.logError('获取语音url失败', (e as Error).stack);
pttUrl = element.filePath; pttUrl = element.filePath;
@ -908,17 +952,19 @@ export class OneBotMsgApi {
async parseMessage( async parseMessage(
msg: RawMessage, msg: RawMessage,
messagePostFormat: string, messagePostFormat: string,
parseMultMsg: boolean = true parseMultMsg: boolean = true,
disableGetUrl: boolean = false
) { ) {
if (messagePostFormat === 'string') { if (messagePostFormat === 'string') {
return (await this.parseMessageV2(msg, parseMultMsg))?.stringMsg; return (await this.parseMessageV2(msg, parseMultMsg, disableGetUrl))?.stringMsg;
} }
return (await this.parseMessageV2(msg, parseMultMsg))?.arrayMsg; return (await this.parseMessageV2(msg, parseMultMsg, disableGetUrl))?.arrayMsg;
} }
async parseMessageV2( async parseMessageV2(
msg: RawMessage, msg: RawMessage,
parseMultMsg: boolean = true parseMultMsg: boolean = true,
disableGetUrl: boolean = false
) { ) {
if (msg.senderUin == '0' || msg.senderUin == '') return; if (msg.senderUin == '0' || msg.senderUin == '') return;
if (msg.peerUin == '0' || msg.peerUin == '') return; if (msg.peerUin == '0' || msg.peerUin == '') return;
@ -939,7 +985,7 @@ export class OneBotMsgApi {
return undefined; return undefined;
} }
const validSegments = await this.parseMessageSegments(msg, parseMultMsg); const validSegments = await this.parseMessageSegments(msg, parseMultMsg, disableGetUrl);
resMsg.message = validSegments; resMsg.message = validSegments;
resMsg.raw_message = validSegments.map(msg => encodeCQCode(msg)).join('').trim(); resMsg.raw_message = validSegments.map(msg => encodeCQCode(msg)).join('').trim();
@ -1010,7 +1056,7 @@ export class OneBotMsgApi {
} }
} }
private async parseMessageSegments(msg: RawMessage, parseMultMsg: boolean): Promise<OB11MessageData[]> { private async parseMessageSegments(msg: RawMessage, parseMultMsg: boolean, disableGetUrl: boolean = false): Promise<OB11MessageData[]> {
const msgSegments = await Promise.allSettled(msg.elements.map( const msgSegments = await Promise.allSettled(msg.elements.map(
async (element) => { async (element) => {
for (const key in element) { for (const key in element) {
@ -1025,7 +1071,7 @@ export class OneBotMsgApi {
element[key], element[key],
msg, msg,
element, element,
{ parseMultMsg } { parseMultMsg, disableGetUrl }
); );
if (key === 'faceElement' && !parsedElement) { if (key === 'faceElement' && !parsedElement) {
return null; return null;
@ -1179,12 +1225,12 @@ export class OneBotMsgApi {
let url = ''; let url = '';
if (mixElement?.picElement && rawMessage) { if (mixElement?.picElement && rawMessage) {
const tempData = const tempData =
await this.obContext.apis.MsgApi.rawToOb11Converters.picElement?.(mixElement?.picElement, rawMessage, mixElement, { parseMultMsg: false }) as OB11MessageImage | undefined; await this.obContext.apis.MsgApi.rawToOb11Converters.picElement?.(mixElement?.picElement, rawMessage, mixElement, { parseMultMsg: false, disableGetUrl: false }) as OB11MessageImage | undefined;
url = tempData?.data.url ?? ''; url = tempData?.data.url ?? '';
} }
if (mixElement?.videoElement && rawMessage) { if (mixElement?.videoElement && rawMessage) {
const tempData = const tempData =
await this.obContext.apis.MsgApi.rawToOb11Converters.videoElement?.(mixElement?.videoElement, rawMessage, mixElement, { parseMultMsg: false }) as OB11MessageVideo | undefined; await this.obContext.apis.MsgApi.rawToOb11Converters.videoElement?.(mixElement?.videoElement, rawMessage, mixElement, { parseMultMsg: false, disableGetUrl: false }) as OB11MessageVideo | undefined;
url = tempData?.data.url ?? ''; url = tempData?.data.url ?? '';
} }
return url !== '' ? url : await this.core.apis.FileApi.downloadMedia(msgId, peer.chatType, peer.peerUid, elementId, '', ''); return url !== '' ? url : await this.core.apis.FileApi.downloadMedia(msgId, peer.chatType, peer.peerUid, elementId, '', '');