diff --git a/src/onebot/action/stream/DownloadFileStream.ts b/src/onebot/action/stream/DownloadFileStream.ts index 10db2991..2c0095ef 100644 --- a/src/onebot/action/stream/DownloadFileStream.ts +++ b/src/onebot/action/stream/DownloadFileStream.ts @@ -87,7 +87,7 @@ export class DownloadFileStream extends OneBotAction; @@ -17,13 +17,16 @@ export class TestDownloadStream extends OneBotAction ${i + 1}`, data_type: 'data_chunk' }); await new Promise(resolve => setTimeout(resolve, 100)); } + if( _payload.error ){ + throw new Error('This is a test error'); + } return { type: StreamStatus.Response, data_type: 'data_complete', - data: '流传输完成' + data: 'Stream transmission complete' }; } } diff --git a/src/onebot/network/adapter.ts b/src/onebot/network/adapter.ts index 6cb3def3..173df954 100644 --- a/src/onebot/network/adapter.ts +++ b/src/onebot/network/adapter.ts @@ -23,7 +23,7 @@ export abstract class IOB11NetworkAdapter { this.logger = core.context.logger; } - abstract onEvent(event: T): void; + abstract onEvent(event: T): Promise; abstract open(): void | Promise; diff --git a/src/onebot/network/http-client.ts b/src/onebot/network/http-client.ts index 91911227..45d4298d 100644 --- a/src/onebot/network/http-client.ts +++ b/src/onebot/network/http-client.ts @@ -16,7 +16,7 @@ export class OB11HttpClientAdapter extends IOB11NetworkAdapter super(name, config, core, obContext, actions); } - onEvent(event: T) { + async onEvent(event: T) { this.emitEventAsync(event).catch(e => this.logger.logError('[OneBot] [Http Client] 新消息事件HTTP上报返回快速操作失败', e)); } diff --git a/src/onebot/network/http-server-sse.ts b/src/onebot/network/http-server-sse.ts index da3181fc..2368b99d 100644 --- a/src/onebot/network/http-server-sse.ts +++ b/src/onebot/network/http-server-sse.ts @@ -23,11 +23,22 @@ export class OB11HttpSSEServerAdapter extends OB11HttpServerAdapter { req.on('close', () => { this.sseClients = this.sseClients.filter((client) => client !== res); }); + } - override onEvent(event: T) { + override async onEvent(event: T) { + let promises: Promise[] = []; this.sseClients.forEach((res) => { - res.write(`data: ${JSON.stringify(event)}\n\n`); + promises.push(new Promise((resolve, reject) => { + res.write(`data: ${JSON.stringify(event)}\n\n`, (err) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + })); }); + await Promise.allSettled(promises); } } diff --git a/src/onebot/network/http-server.ts b/src/onebot/network/http-server.ts index e6a49323..75f9c96a 100644 --- a/src/onebot/network/http-server.ts +++ b/src/onebot/network/http-server.ts @@ -20,7 +20,7 @@ export class OB11HttpServerAdapter extends IOB11NetworkAdapter } // eslint-disable-next-line @typescript-eslint/no-unused-vars - override onEvent(_event: T) { + override async onEvent(_event: T) { // http server is passive, no need to emit event } @@ -126,9 +126,14 @@ export class OB11HttpServerAdapter extends IOB11NetworkAdapter try { const result = await action.handle(payload, this.name, this.config, { send: request_sse ? async (data: object) => { - this.onEvent({ ...OB11Response.ok(data, real_echo, true) } as unknown as OB11EmitEventContent); + await this.onEvent({ ...OB11Response.ok(data, real_echo, true) } as unknown as OB11EmitEventContent); } : async (data: object) => { - res.write(JSON.stringify({ ...OB11Response.ok(data, real_echo, true) }) + "\r\n\r\n"); + let newPromise = new Promise((resolve, _reject) => { + res.write(JSON.stringify({ ...OB11Response.ok(data, real_echo, true) }) + "\r\n\r\n", () => { + resolve(); + }); + }); + return newPromise; } }, real_echo); if (useStream) { diff --git a/src/onebot/network/index.ts b/src/onebot/network/index.ts index 13dcf325..0faff7da 100644 --- a/src/onebot/network/index.ts +++ b/src/onebot/network/index.ts @@ -20,9 +20,9 @@ export class OB11NetworkManager { } async emitEvent(event: OB11EmitEventContent) { - return Promise.all(Array.from(this.adapters.values()).map(adapter => { + return Promise.all(Array.from(this.adapters.values()).map(async adapter => { if (adapter.isEnable) { - return adapter.onEvent(event); + return await adapter.onEvent(event); } })); } @@ -32,19 +32,19 @@ export class OB11NetworkManager { } async emitEventByName(names: string[], event: OB11EmitEventContent) { - return Promise.all(names.map(name => { + return Promise.all(names.map(async name => { const adapter = this.adapters.get(name); if (adapter && adapter.isEnable) { - return adapter.onEvent(event); + return await adapter.onEvent(event); } })); } async emitEventByNames(map: Map) { - return Promise.all(Array.from(map.entries()).map(([name, event]) => { + return Promise.all(Array.from(map.entries()).map(async ([name, event]) => { const adapter = this.adapters.get(name); if (adapter && adapter.isEnable) { - return adapter.onEvent(event); + return await adapter.onEvent(event); } })); } diff --git a/src/onebot/network/plugin-manger.ts b/src/onebot/network/plugin-manger.ts index 72e9ea77..fcd29723 100644 --- a/src/onebot/network/plugin-manger.ts +++ b/src/onebot/network/plugin-manger.ts @@ -251,14 +251,20 @@ export class OB11PluginMangerAdapter extends IOB11NetworkAdapter { this.logger.log(`[Plugin Adapter] Unloaded plugin: ${pluginName}`); } - onEvent(event: T) { + async onEvent(event: T) { if (!this.isEnable) { return; } // 遍历所有已加载的插件,调用它们的事件处理方法 - for (const [, plugin] of this.loadedPlugins) { - this.callPluginEventHandler(plugin, event); + try { + await Promise.allSettled( + Array.from(this.loadedPlugins.values()).map((plugin) => + this.callPluginEventHandler(plugin, event) + ) + ); + } catch (error) { + this.logger.logError('[Plugin Adapter] Error handling event:', error); } } diff --git a/src/onebot/network/plugin.ts b/src/onebot/network/plugin.ts index 2d04cd58..c805edb8 100644 --- a/src/onebot/network/plugin.ts +++ b/src/onebot/network/plugin.ts @@ -251,7 +251,7 @@ export class OB11PluginAdapter extends IOB11NetworkAdapter { this.logger.log(`[Plugin Adapter] Unloaded plugin: ${pluginName}`); } - onEvent(event: T) { + async onEvent(event: T) { if (!this.isEnable) { return; } diff --git a/src/onebot/network/websocket-client.ts b/src/onebot/network/websocket-client.ts index 34b83f2f..9d9a750a 100644 --- a/src/onebot/network/websocket-client.ts +++ b/src/onebot/network/websocket-client.ts @@ -19,7 +19,7 @@ export class OB11WebSocketClientAdapter extends IOB11NetworkAdapter(event: T) { + async onEvent(event: T) { if (this.connection && this.connection.readyState === WebSocket.OPEN) { this.connection.send(JSON.stringify(event)); } @@ -62,10 +62,15 @@ export class OB11WebSocketClientAdapter extends IOB11NetworkAdapter(data: T) { - if (this.connection && this.connection.readyState === WebSocket.OPEN) { - this.connection.send(JSON.stringify(data)); - } + private async checkStateAndReply(data: T) { + return new Promise((resolve, reject) => { + if (this.connection && this.connection.readyState === WebSocket.OPEN) { + this.connection.send(JSON.stringify(data)); + resolve(); + } else { + reject(new Error('WebSocket is not open')); + } + }); } private async tryConnect() { @@ -92,7 +97,7 @@ export class OB11WebSocketClientAdapter extends IOB11NetworkAdapter { try { - this.connectEvent(this.core); + this.connectEvent(this.core).catch(e => this.logger.logError('[OneBot] [WebSocket Client] 发送连接生命周期失败', e)); } catch (e) { this.logger.logError('[OneBot] [WebSocket Client] 发送连接生命周期失败', e); } @@ -123,9 +128,9 @@ export class OB11WebSocketClientAdapter extends IOB11NetworkAdapter(new OB11LifeCycleEvent(core, LifeCycleSubType.CONNECT)); + await this.checkStateAndReply(new OB11LifeCycleEvent(core, LifeCycleSubType.CONNECT)); } catch (e) { this.logger.logError('[OneBot] [WebSocket Client] 发送生命周期失败', e); } @@ -140,7 +145,7 @@ export class OB11WebSocketClientAdapter extends IOB11NetworkAdapter(OB11Response.error('json解析失败,请检查数据格式', 1400, echo)); + await this.checkStateAndReply(OB11Response.error('json解析失败,请检查数据格式', 1400, echo)); return; } receiveData.params = (receiveData?.params) ? receiveData.params : {};// 兼容类型验证 @@ -148,15 +153,15 @@ export class OB11WebSocketClientAdapter extends IOB11NetworkAdapter(OB11Response.error('不支持的Api ' + receiveData.action, 1404, echo)); + await this.checkStateAndReply(OB11Response.error('不支持的Api ' + receiveData.action, 1404, echo)); return; } const retdata = await action.websocketHandle(receiveData.params, echo ?? '', this.name, this.config, { send: async (data: object) => { - this.checkStateAndReply({ ...OB11Response.ok(data, echo ?? '', true) }); + await this.checkStateAndReply({ ...OB11Response.ok(data, echo ?? '', true) }); } }); - this.checkStateAndReply({ ...retdata }); + await this.checkStateAndReply({ ...retdata }); } async reload(newConfig: WebsocketClientConfig) { const wasEnabled = this.isEnable; diff --git a/src/onebot/network/websocket-server.ts b/src/onebot/network/websocket-server.ts index b5bb9e9e..7fc6d2b4 100644 --- a/src/onebot/network/websocket-server.ts +++ b/src/onebot/network/websocket-server.ts @@ -83,17 +83,25 @@ export class OB11WebSocketServerAdapter extends IOB11NetworkAdapter(new OB11LifeCycleEvent(core, LifeCycleSubType.CONNECT), wsClient); + this.checkStateAndReply(new OB11LifeCycleEvent(core, LifeCycleSubType.CONNECT), wsClient).catch(e => this.logger.logError('[OneBot] [WebSocket Server] 发送生命周期失败', e)); } catch (e) { this.logger.logError('[OneBot] [WebSocket Server] 发送生命周期失败', e); } } - onEvent(event: T) { + async onEvent(event: T) { this.wsClientsMutex.runExclusive(async () => { - this.wsClientWithEvent.forEach((wsClient) => { - wsClient.send(JSON.stringify(event)); + let promises = this.wsClientWithEvent.map((wsClient) => { + return new Promise((resolve, reject) => { + if (wsClient.readyState === WebSocket.OPEN) { + wsClient.send(JSON.stringify(event)); + resolve(); + } else { + reject(new Error('WebSocket is not open')); + } + }); }); + await Promise.allSettled(promises); }); } @@ -160,10 +168,15 @@ export class OB11WebSocketServerAdapter extends IOB11NetworkAdapter(data: T, wsClient: WebSocket) { - if (wsClient.readyState === WebSocket.OPEN) { - wsClient.send(JSON.stringify(data)); - } + private async checkStateAndReply(data: T, wsClient: WebSocket) { + return await new Promise((resolve, reject) => { + if (wsClient.readyState === WebSocket.OPEN) { + wsClient.send(JSON.stringify(data)); + resolve(); + } else { + reject(new Error('WebSocket is not open')); + } + }); } private async handleMessage(wsClient: WebSocket, message: RawData) { @@ -175,7 +188,7 @@ export class OB11WebSocketServerAdapter extends IOB11NetworkAdapter(OB11Response.error('json解析失败,请检查数据格式', 1400, echo), wsClient); + await this.checkStateAndReply(OB11Response.error('json解析失败,请检查数据格式', 1400, echo), wsClient); return; } receiveData.params = (receiveData?.params) ? receiveData.params : {};//兼容类型验证 不然类型校验爆炸 @@ -183,15 +196,15 @@ export class OB11WebSocketServerAdapter extends IOB11NetworkAdapter(OB11Response.error('不支持的API ' + receiveData.action, 1404, echo), wsClient); + await this.checkStateAndReply(OB11Response.error('不支持的API ' + receiveData.action, 1404, echo), wsClient); return; } const retdata = await action.websocketHandle(receiveData.params, echo ?? '', this.name, this.config, { send: async (data: object) => { - this.checkStateAndReply({ ...OB11Response.ok(data, echo ?? '', true) }, wsClient); + await this.checkStateAndReply({ ...OB11Response.ok(data, echo ?? '', true) }, wsClient); } }); - this.checkStateAndReply({ ...retdata }, wsClient); + await this.checkStateAndReply({ ...retdata }, wsClient); } async reload(newConfig: WebsocketServerConfig) {