From 42d50014a1f05ea6b8441fa11afcf02a13a8099a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=89=8B=E7=93=9C=E4=B8=80=E5=8D=81=E9=9B=AA?= Date: Sat, 20 Sep 2025 15:55:37 +0800 Subject: [PATCH] Refactor event handling to use async/await across adapters Updated all network adapters' onEvent methods to be asynchronous and return Promises, ensuring consistent async event emission and handling. Adjusted related methods and event emission logic to properly await asynchronous operations, improving reliability for streaming, plugin, HTTP, and WebSocket event flows. Also improved error handling and messaging in stream and WebSocket actions. --- .../action/stream/DownloadFileStream.ts | 4 +- .../action/stream/TestStreamDownload.ts | 9 +++-- src/onebot/network/adapter.ts | 2 +- src/onebot/network/http-client.ts | 2 +- src/onebot/network/http-server-sse.ts | 15 +++++++- src/onebot/network/http-server.ts | 11 ++++-- src/onebot/network/index.ts | 12 +++--- src/onebot/network/plugin-manger.ts | 12 ++++-- src/onebot/network/plugin.ts | 2 +- src/onebot/network/websocket-client.ts | 29 +++++++++------ src/onebot/network/websocket-server.ts | 37 +++++++++++++------ 11 files changed, 89 insertions(+), 46 deletions(-) diff --git a/src/onebot/action/stream/DownloadFileStream.ts b/src/onebot/action/stream/DownloadFileStream.ts index 10db2991..2c0095ef 100644 --- a/src/onebot/action/stream/DownloadFileStream.ts +++ b/src/onebot/action/stream/DownloadFileStream.ts @@ -87,7 +87,7 @@ export class DownloadFileStream extends OneBotAction; @@ -17,13 +17,16 @@ export class TestDownloadStream extends OneBotAction ${i + 1}`, data_type: 'data_chunk' }); await new Promise(resolve => setTimeout(resolve, 100)); } + if( _payload.error ){ + throw new Error('This is a test error'); + } return { type: StreamStatus.Response, data_type: 'data_complete', - data: '流传输完成' + data: 'Stream transmission complete' }; } } diff --git a/src/onebot/network/adapter.ts b/src/onebot/network/adapter.ts index 6cb3def3..173df954 100644 --- a/src/onebot/network/adapter.ts +++ b/src/onebot/network/adapter.ts @@ -23,7 +23,7 @@ export abstract class IOB11NetworkAdapter { this.logger = core.context.logger; } - abstract onEvent(event: T): void; + abstract onEvent(event: T): Promise; abstract open(): void | Promise; diff --git a/src/onebot/network/http-client.ts b/src/onebot/network/http-client.ts index 91911227..45d4298d 100644 --- a/src/onebot/network/http-client.ts +++ b/src/onebot/network/http-client.ts @@ -16,7 +16,7 @@ export class OB11HttpClientAdapter extends IOB11NetworkAdapter super(name, config, core, obContext, actions); } - onEvent(event: T) { + async onEvent(event: T) { this.emitEventAsync(event).catch(e => this.logger.logError('[OneBot] [Http Client] 新消息事件HTTP上报返回快速操作失败', e)); } diff --git a/src/onebot/network/http-server-sse.ts b/src/onebot/network/http-server-sse.ts index da3181fc..2368b99d 100644 --- a/src/onebot/network/http-server-sse.ts +++ b/src/onebot/network/http-server-sse.ts @@ -23,11 +23,22 @@ export class OB11HttpSSEServerAdapter extends OB11HttpServerAdapter { req.on('close', () => { this.sseClients = this.sseClients.filter((client) => client !== res); }); + } - override onEvent(event: T) { + override async onEvent(event: T) { + let promises: Promise[] = []; this.sseClients.forEach((res) => { - res.write(`data: ${JSON.stringify(event)}\n\n`); + promises.push(new Promise((resolve, reject) => { + res.write(`data: ${JSON.stringify(event)}\n\n`, (err) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + })); }); + await Promise.allSettled(promises); } } diff --git a/src/onebot/network/http-server.ts b/src/onebot/network/http-server.ts index e6a49323..75f9c96a 100644 --- a/src/onebot/network/http-server.ts +++ b/src/onebot/network/http-server.ts @@ -20,7 +20,7 @@ export class OB11HttpServerAdapter extends IOB11NetworkAdapter } // eslint-disable-next-line @typescript-eslint/no-unused-vars - override onEvent(_event: T) { + override async onEvent(_event: T) { // http server is passive, no need to emit event } @@ -126,9 +126,14 @@ export class OB11HttpServerAdapter extends IOB11NetworkAdapter try { const result = await action.handle(payload, this.name, this.config, { send: request_sse ? async (data: object) => { - this.onEvent({ ...OB11Response.ok(data, real_echo, true) } as unknown as OB11EmitEventContent); + await this.onEvent({ ...OB11Response.ok(data, real_echo, true) } as unknown as OB11EmitEventContent); } : async (data: object) => { - res.write(JSON.stringify({ ...OB11Response.ok(data, real_echo, true) }) + "\r\n\r\n"); + let newPromise = new Promise((resolve, _reject) => { + res.write(JSON.stringify({ ...OB11Response.ok(data, real_echo, true) }) + "\r\n\r\n", () => { + resolve(); + }); + }); + return newPromise; } }, real_echo); if (useStream) { diff --git a/src/onebot/network/index.ts b/src/onebot/network/index.ts index 13dcf325..0faff7da 100644 --- a/src/onebot/network/index.ts +++ b/src/onebot/network/index.ts @@ -20,9 +20,9 @@ export class OB11NetworkManager { } async emitEvent(event: OB11EmitEventContent) { - return Promise.all(Array.from(this.adapters.values()).map(adapter => { + return Promise.all(Array.from(this.adapters.values()).map(async adapter => { if (adapter.isEnable) { - return adapter.onEvent(event); + return await adapter.onEvent(event); } })); } @@ -32,19 +32,19 @@ export class OB11NetworkManager { } async emitEventByName(names: string[], event: OB11EmitEventContent) { - return Promise.all(names.map(name => { + return Promise.all(names.map(async name => { const adapter = this.adapters.get(name); if (adapter && adapter.isEnable) { - return adapter.onEvent(event); + return await adapter.onEvent(event); } })); } async emitEventByNames(map: Map) { - return Promise.all(Array.from(map.entries()).map(([name, event]) => { + return Promise.all(Array.from(map.entries()).map(async ([name, event]) => { const adapter = this.adapters.get(name); if (adapter && adapter.isEnable) { - return adapter.onEvent(event); + return await adapter.onEvent(event); } })); } diff --git a/src/onebot/network/plugin-manger.ts b/src/onebot/network/plugin-manger.ts index 72e9ea77..fcd29723 100644 --- a/src/onebot/network/plugin-manger.ts +++ b/src/onebot/network/plugin-manger.ts @@ -251,14 +251,20 @@ export class OB11PluginMangerAdapter extends IOB11NetworkAdapter { this.logger.log(`[Plugin Adapter] Unloaded plugin: ${pluginName}`); } - onEvent(event: T) { + async onEvent(event: T) { if (!this.isEnable) { return; } // 遍历所有已加载的插件,调用它们的事件处理方法 - for (const [, plugin] of this.loadedPlugins) { - this.callPluginEventHandler(plugin, event); + try { + await Promise.allSettled( + Array.from(this.loadedPlugins.values()).map((plugin) => + this.callPluginEventHandler(plugin, event) + ) + ); + } catch (error) { + this.logger.logError('[Plugin Adapter] Error handling event:', error); } } diff --git a/src/onebot/network/plugin.ts b/src/onebot/network/plugin.ts index 2d04cd58..c805edb8 100644 --- a/src/onebot/network/plugin.ts +++ b/src/onebot/network/plugin.ts @@ -251,7 +251,7 @@ export class OB11PluginAdapter extends IOB11NetworkAdapter { this.logger.log(`[Plugin Adapter] Unloaded plugin: ${pluginName}`); } - onEvent(event: T) { + async onEvent(event: T) { if (!this.isEnable) { return; } diff --git a/src/onebot/network/websocket-client.ts b/src/onebot/network/websocket-client.ts index 34b83f2f..9d9a750a 100644 --- a/src/onebot/network/websocket-client.ts +++ b/src/onebot/network/websocket-client.ts @@ -19,7 +19,7 @@ export class OB11WebSocketClientAdapter extends IOB11NetworkAdapter(event: T) { + async onEvent(event: T) { if (this.connection && this.connection.readyState === WebSocket.OPEN) { this.connection.send(JSON.stringify(event)); } @@ -62,10 +62,15 @@ export class OB11WebSocketClientAdapter extends IOB11NetworkAdapter(data: T) { - if (this.connection && this.connection.readyState === WebSocket.OPEN) { - this.connection.send(JSON.stringify(data)); - } + private async checkStateAndReply(data: T) { + return new Promise((resolve, reject) => { + if (this.connection && this.connection.readyState === WebSocket.OPEN) { + this.connection.send(JSON.stringify(data)); + resolve(); + } else { + reject(new Error('WebSocket is not open')); + } + }); } private async tryConnect() { @@ -92,7 +97,7 @@ export class OB11WebSocketClientAdapter extends IOB11NetworkAdapter { try { - this.connectEvent(this.core); + this.connectEvent(this.core).catch(e => this.logger.logError('[OneBot] [WebSocket Client] 发送连接生命周期失败', e)); } catch (e) { this.logger.logError('[OneBot] [WebSocket Client] 发送连接生命周期失败', e); } @@ -123,9 +128,9 @@ export class OB11WebSocketClientAdapter extends IOB11NetworkAdapter(new OB11LifeCycleEvent(core, LifeCycleSubType.CONNECT)); + await this.checkStateAndReply(new OB11LifeCycleEvent(core, LifeCycleSubType.CONNECT)); } catch (e) { this.logger.logError('[OneBot] [WebSocket Client] 发送生命周期失败', e); } @@ -140,7 +145,7 @@ export class OB11WebSocketClientAdapter extends IOB11NetworkAdapter(OB11Response.error('json解析失败,请检查数据格式', 1400, echo)); + await this.checkStateAndReply(OB11Response.error('json解析失败,请检查数据格式', 1400, echo)); return; } receiveData.params = (receiveData?.params) ? receiveData.params : {};// 兼容类型验证 @@ -148,15 +153,15 @@ export class OB11WebSocketClientAdapter extends IOB11NetworkAdapter(OB11Response.error('不支持的Api ' + receiveData.action, 1404, echo)); + await this.checkStateAndReply(OB11Response.error('不支持的Api ' + receiveData.action, 1404, echo)); return; } const retdata = await action.websocketHandle(receiveData.params, echo ?? '', this.name, this.config, { send: async (data: object) => { - this.checkStateAndReply({ ...OB11Response.ok(data, echo ?? '', true) }); + await this.checkStateAndReply({ ...OB11Response.ok(data, echo ?? '', true) }); } }); - this.checkStateAndReply({ ...retdata }); + await this.checkStateAndReply({ ...retdata }); } async reload(newConfig: WebsocketClientConfig) { const wasEnabled = this.isEnable; diff --git a/src/onebot/network/websocket-server.ts b/src/onebot/network/websocket-server.ts index b5bb9e9e..7fc6d2b4 100644 --- a/src/onebot/network/websocket-server.ts +++ b/src/onebot/network/websocket-server.ts @@ -83,17 +83,25 @@ export class OB11WebSocketServerAdapter extends IOB11NetworkAdapter(new OB11LifeCycleEvent(core, LifeCycleSubType.CONNECT), wsClient); + this.checkStateAndReply(new OB11LifeCycleEvent(core, LifeCycleSubType.CONNECT), wsClient).catch(e => this.logger.logError('[OneBot] [WebSocket Server] 发送生命周期失败', e)); } catch (e) { this.logger.logError('[OneBot] [WebSocket Server] 发送生命周期失败', e); } } - onEvent(event: T) { + async onEvent(event: T) { this.wsClientsMutex.runExclusive(async () => { - this.wsClientWithEvent.forEach((wsClient) => { - wsClient.send(JSON.stringify(event)); + let promises = this.wsClientWithEvent.map((wsClient) => { + return new Promise((resolve, reject) => { + if (wsClient.readyState === WebSocket.OPEN) { + wsClient.send(JSON.stringify(event)); + resolve(); + } else { + reject(new Error('WebSocket is not open')); + } + }); }); + await Promise.allSettled(promises); }); } @@ -160,10 +168,15 @@ export class OB11WebSocketServerAdapter extends IOB11NetworkAdapter(data: T, wsClient: WebSocket) { - if (wsClient.readyState === WebSocket.OPEN) { - wsClient.send(JSON.stringify(data)); - } + private async checkStateAndReply(data: T, wsClient: WebSocket) { + return await new Promise((resolve, reject) => { + if (wsClient.readyState === WebSocket.OPEN) { + wsClient.send(JSON.stringify(data)); + resolve(); + } else { + reject(new Error('WebSocket is not open')); + } + }); } private async handleMessage(wsClient: WebSocket, message: RawData) { @@ -175,7 +188,7 @@ export class OB11WebSocketServerAdapter extends IOB11NetworkAdapter(OB11Response.error('json解析失败,请检查数据格式', 1400, echo), wsClient); + await this.checkStateAndReply(OB11Response.error('json解析失败,请检查数据格式', 1400, echo), wsClient); return; } receiveData.params = (receiveData?.params) ? receiveData.params : {};//兼容类型验证 不然类型校验爆炸 @@ -183,15 +196,15 @@ export class OB11WebSocketServerAdapter extends IOB11NetworkAdapter(OB11Response.error('不支持的API ' + receiveData.action, 1404, echo), wsClient); + await this.checkStateAndReply(OB11Response.error('不支持的API ' + receiveData.action, 1404, echo), wsClient); return; } const retdata = await action.websocketHandle(receiveData.params, echo ?? '', this.name, this.config, { send: async (data: object) => { - this.checkStateAndReply({ ...OB11Response.ok(data, echo ?? '', true) }, wsClient); + await this.checkStateAndReply({ ...OB11Response.ok(data, echo ?? '', true) }, wsClient); } }); - this.checkStateAndReply({ ...retdata }, wsClient); + await this.checkStateAndReply({ ...retdata }, wsClient); } async reload(newConfig: WebsocketServerConfig) {