mirror of
https://github.com/NapNeko/NapCatQQ.git
synced 2025-12-19 13:10:16 +08:00
Refactor event handling to use async/await across adapters
Updated all network adapters' onEvent methods to be asynchronous and return Promises, ensuring consistent async event emission and handling. Adjusted related methods and event emission logic to properly await asynchronous operations, improving reliability for streaming, plugin, HTTP, and WebSocket event flows. Also improved error handling and messaging in stream and WebSocket actions.
This commit is contained in:
parent
a36ae315b0
commit
42d50014a1
@ -87,7 +87,7 @@ export class DownloadFileStream extends OneBotAction<Payload, StreamPacket<Downl
|
||||
const totalSize = fileSize || stats.size;
|
||||
|
||||
// 发送文件信息
|
||||
req.send({
|
||||
await req.send({
|
||||
type: StreamStatus.Stream,
|
||||
data_type: 'file_info',
|
||||
file_name: fileName,
|
||||
@ -104,7 +104,7 @@ export class DownloadFileStream extends OneBotAction<Payload, StreamPacket<Downl
|
||||
const base64Chunk = chunk.toString('base64');
|
||||
bytesRead += chunk.length;
|
||||
|
||||
req.send({
|
||||
await req.send({
|
||||
type: StreamStatus.Stream,
|
||||
data_type: 'file_chunk',
|
||||
index: chunkIndex,
|
||||
|
||||
@ -5,7 +5,7 @@ import { NetworkAdapterConfig } from '@/onebot/config/config';
|
||||
import { StreamPacket, StreamStatus } from './StreamBasic';
|
||||
|
||||
const SchemaData = Type.Object({
|
||||
|
||||
error: Type.Optional(Type.Boolean({ default: false }))
|
||||
});
|
||||
|
||||
type Payload = Static<typeof SchemaData>;
|
||||
@ -17,13 +17,16 @@ export class TestDownloadStream extends OneBotAction<Payload, StreamPacket<{ dat
|
||||
|
||||
async _handle(_payload: Payload, _adaptername: string, _config: NetworkAdapterConfig, req: OneBotRequestToolkit) {
|
||||
for (let i = 0; i < 10; i++) {
|
||||
req.send({ type: StreamStatus.Stream, data: `这是第 ${i + 1} 片流数据`, data_type: 'data_chunk' });
|
||||
await req.send({ type: StreamStatus.Stream, data: `Index-> ${i + 1}`, data_type: 'data_chunk' });
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
}
|
||||
if( _payload.error ){
|
||||
throw new Error('This is a test error');
|
||||
}
|
||||
return {
|
||||
type: StreamStatus.Response,
|
||||
data_type: 'data_complete',
|
||||
data: '流传输完成'
|
||||
data: 'Stream transmission complete'
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@ -23,7 +23,7 @@ export abstract class IOB11NetworkAdapter<CT extends NetworkAdapterConfig> {
|
||||
this.logger = core.context.logger;
|
||||
}
|
||||
|
||||
abstract onEvent<T extends OB11EmitEventContent>(event: T): void;
|
||||
abstract onEvent<T extends OB11EmitEventContent>(event: T): Promise<void>;
|
||||
|
||||
abstract open(): void | Promise<void>;
|
||||
|
||||
|
||||
@ -16,7 +16,7 @@ export class OB11HttpClientAdapter extends IOB11NetworkAdapter<HttpClientConfig>
|
||||
super(name, config, core, obContext, actions);
|
||||
}
|
||||
|
||||
onEvent<T extends OB11EmitEventContent>(event: T) {
|
||||
async onEvent<T extends OB11EmitEventContent>(event: T) {
|
||||
this.emitEventAsync(event).catch(e => this.logger.logError('[OneBot] [Http Client] 新消息事件HTTP上报返回快速操作失败', e));
|
||||
}
|
||||
|
||||
|
||||
@ -23,11 +23,22 @@ export class OB11HttpSSEServerAdapter extends OB11HttpServerAdapter {
|
||||
req.on('close', () => {
|
||||
this.sseClients = this.sseClients.filter((client) => client !== res);
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
override onEvent<T extends OB11EmitEventContent>(event: T) {
|
||||
override async onEvent<T extends OB11EmitEventContent>(event: T) {
|
||||
let promises: Promise<void>[] = [];
|
||||
this.sseClients.forEach((res) => {
|
||||
res.write(`data: ${JSON.stringify(event)}\n\n`);
|
||||
promises.push(new Promise<void>((resolve, reject) => {
|
||||
res.write(`data: ${JSON.stringify(event)}\n\n`, (err) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
}));
|
||||
});
|
||||
await Promise.allSettled(promises);
|
||||
}
|
||||
}
|
||||
|
||||
@ -20,7 +20,7 @@ export class OB11HttpServerAdapter extends IOB11NetworkAdapter<HttpServerConfig>
|
||||
}
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||
override onEvent<T extends OB11EmitEventContent>(_event: T) {
|
||||
override async onEvent<T extends OB11EmitEventContent>(_event: T) {
|
||||
// http server is passive, no need to emit event
|
||||
}
|
||||
|
||||
@ -126,9 +126,14 @@ export class OB11HttpServerAdapter extends IOB11NetworkAdapter<HttpServerConfig>
|
||||
try {
|
||||
const result = await action.handle(payload, this.name, this.config, {
|
||||
send: request_sse ? async (data: object) => {
|
||||
this.onEvent({ ...OB11Response.ok(data, real_echo, true) } as unknown as OB11EmitEventContent);
|
||||
await this.onEvent({ ...OB11Response.ok(data, real_echo, true) } as unknown as OB11EmitEventContent);
|
||||
} : async (data: object) => {
|
||||
res.write(JSON.stringify({ ...OB11Response.ok(data, real_echo, true) }) + "\r\n\r\n");
|
||||
let newPromise = new Promise<void>((resolve, _reject) => {
|
||||
res.write(JSON.stringify({ ...OB11Response.ok(data, real_echo, true) }) + "\r\n\r\n", () => {
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
return newPromise;
|
||||
}
|
||||
}, real_echo);
|
||||
if (useStream) {
|
||||
|
||||
@ -20,9 +20,9 @@ export class OB11NetworkManager {
|
||||
}
|
||||
|
||||
async emitEvent(event: OB11EmitEventContent) {
|
||||
return Promise.all(Array.from(this.adapters.values()).map(adapter => {
|
||||
return Promise.all(Array.from(this.adapters.values()).map(async adapter => {
|
||||
if (adapter.isEnable) {
|
||||
return adapter.onEvent(event);
|
||||
return await adapter.onEvent(event);
|
||||
}
|
||||
}));
|
||||
}
|
||||
@ -32,19 +32,19 @@ export class OB11NetworkManager {
|
||||
}
|
||||
|
||||
async emitEventByName(names: string[], event: OB11EmitEventContent) {
|
||||
return Promise.all(names.map(name => {
|
||||
return Promise.all(names.map(async name => {
|
||||
const adapter = this.adapters.get(name);
|
||||
if (adapter && adapter.isEnable) {
|
||||
return adapter.onEvent(event);
|
||||
return await adapter.onEvent(event);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
async emitEventByNames(map: Map<string, OB11EmitEventContent>) {
|
||||
return Promise.all(Array.from(map.entries()).map(([name, event]) => {
|
||||
return Promise.all(Array.from(map.entries()).map(async ([name, event]) => {
|
||||
const adapter = this.adapters.get(name);
|
||||
if (adapter && adapter.isEnable) {
|
||||
return adapter.onEvent(event);
|
||||
return await adapter.onEvent(event);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@ -251,14 +251,20 @@ export class OB11PluginMangerAdapter extends IOB11NetworkAdapter<PluginConfig> {
|
||||
this.logger.log(`[Plugin Adapter] Unloaded plugin: ${pluginName}`);
|
||||
}
|
||||
|
||||
onEvent<T extends OB11EmitEventContent>(event: T) {
|
||||
async onEvent<T extends OB11EmitEventContent>(event: T) {
|
||||
if (!this.isEnable) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 遍历所有已加载的插件,调用它们的事件处理方法
|
||||
for (const [, plugin] of this.loadedPlugins) {
|
||||
this.callPluginEventHandler(plugin, event);
|
||||
try {
|
||||
await Promise.allSettled(
|
||||
Array.from(this.loadedPlugins.values()).map((plugin) =>
|
||||
this.callPluginEventHandler(plugin, event)
|
||||
)
|
||||
);
|
||||
} catch (error) {
|
||||
this.logger.logError('[Plugin Adapter] Error handling event:', error);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -251,7 +251,7 @@ export class OB11PluginAdapter extends IOB11NetworkAdapter<PluginConfig> {
|
||||
this.logger.log(`[Plugin Adapter] Unloaded plugin: ${pluginName}`);
|
||||
}
|
||||
|
||||
onEvent<T extends OB11EmitEventContent>(event: T) {
|
||||
async onEvent<T extends OB11EmitEventContent>(event: T) {
|
||||
if (!this.isEnable) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -19,7 +19,7 @@ export class OB11WebSocketClientAdapter extends IOB11NetworkAdapter<WebsocketCli
|
||||
super(name, config, core, obContext, actions);
|
||||
}
|
||||
|
||||
onEvent<T extends OB11EmitEventContent>(event: T) {
|
||||
async onEvent<T extends OB11EmitEventContent>(event: T) {
|
||||
if (this.connection && this.connection.readyState === WebSocket.OPEN) {
|
||||
this.connection.send(JSON.stringify(event));
|
||||
}
|
||||
@ -62,10 +62,15 @@ export class OB11WebSocketClientAdapter extends IOB11NetworkAdapter<WebsocketCli
|
||||
}
|
||||
}
|
||||
|
||||
private checkStateAndReply<T>(data: T) {
|
||||
if (this.connection && this.connection.readyState === WebSocket.OPEN) {
|
||||
this.connection.send(JSON.stringify(data));
|
||||
}
|
||||
private async checkStateAndReply<T>(data: T) {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
if (this.connection && this.connection.readyState === WebSocket.OPEN) {
|
||||
this.connection.send(JSON.stringify(data));
|
||||
resolve();
|
||||
} else {
|
||||
reject(new Error('WebSocket is not open'));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private async tryConnect() {
|
||||
@ -92,7 +97,7 @@ export class OB11WebSocketClientAdapter extends IOB11NetworkAdapter<WebsocketCli
|
||||
});
|
||||
this.connection.on('open', () => {
|
||||
try {
|
||||
this.connectEvent(this.core);
|
||||
this.connectEvent(this.core).catch(e => this.logger.logError('[OneBot] [WebSocket Client] 发送连接生命周期失败', e));
|
||||
} catch (e) {
|
||||
this.logger.logError('[OneBot] [WebSocket Client] 发送连接生命周期失败', e);
|
||||
}
|
||||
@ -123,9 +128,9 @@ export class OB11WebSocketClientAdapter extends IOB11NetworkAdapter<WebsocketCli
|
||||
}
|
||||
}
|
||||
|
||||
connectEvent(core: NapCatCore) {
|
||||
async connectEvent(core: NapCatCore) {
|
||||
try {
|
||||
this.checkStateAndReply<unknown>(new OB11LifeCycleEvent(core, LifeCycleSubType.CONNECT));
|
||||
await this.checkStateAndReply<unknown>(new OB11LifeCycleEvent(core, LifeCycleSubType.CONNECT));
|
||||
} catch (e) {
|
||||
this.logger.logError('[OneBot] [WebSocket Client] 发送生命周期失败', e);
|
||||
}
|
||||
@ -140,7 +145,7 @@ export class OB11WebSocketClientAdapter extends IOB11NetworkAdapter<WebsocketCli
|
||||
echo = receiveData.echo;
|
||||
this.logger.logDebug('[OneBot] [WebSocket Client] 收到正向Websocket消息', receiveData);
|
||||
} catch {
|
||||
this.checkStateAndReply<unknown>(OB11Response.error('json解析失败,请检查数据格式', 1400, echo));
|
||||
await this.checkStateAndReply<unknown>(OB11Response.error('json解析失败,请检查数据格式', 1400, echo));
|
||||
return;
|
||||
}
|
||||
receiveData.params = (receiveData?.params) ? receiveData.params : {};// 兼容类型验证
|
||||
@ -148,15 +153,15 @@ export class OB11WebSocketClientAdapter extends IOB11NetworkAdapter<WebsocketCli
|
||||
const action = this.actions.get(receiveData.action as any);
|
||||
if (!action) {
|
||||
this.logger.logError('[OneBot] [WebSocket Client] 发生错误', '不支持的Api ' + receiveData.action);
|
||||
this.checkStateAndReply<unknown>(OB11Response.error('不支持的Api ' + receiveData.action, 1404, echo));
|
||||
await this.checkStateAndReply<unknown>(OB11Response.error('不支持的Api ' + receiveData.action, 1404, echo));
|
||||
return;
|
||||
}
|
||||
const retdata = await action.websocketHandle(receiveData.params, echo ?? '', this.name, this.config, {
|
||||
send: async (data: object) => {
|
||||
this.checkStateAndReply<unknown>({ ...OB11Response.ok(data, echo ?? '', true) });
|
||||
await this.checkStateAndReply<unknown>({ ...OB11Response.ok(data, echo ?? '', true) });
|
||||
}
|
||||
});
|
||||
this.checkStateAndReply<unknown>({ ...retdata });
|
||||
await this.checkStateAndReply<unknown>({ ...retdata });
|
||||
}
|
||||
async reload(newConfig: WebsocketClientConfig) {
|
||||
const wasEnabled = this.isEnable;
|
||||
|
||||
@ -83,17 +83,25 @@ export class OB11WebSocketServerAdapter extends IOB11NetworkAdapter<WebsocketSer
|
||||
}
|
||||
connectEvent(core: NapCatCore, wsClient: WebSocket) {
|
||||
try {
|
||||
this.checkStateAndReply<unknown>(new OB11LifeCycleEvent(core, LifeCycleSubType.CONNECT), wsClient);
|
||||
this.checkStateAndReply<unknown>(new OB11LifeCycleEvent(core, LifeCycleSubType.CONNECT), wsClient).catch(e => this.logger.logError('[OneBot] [WebSocket Server] 发送生命周期失败', e));
|
||||
} catch (e) {
|
||||
this.logger.logError('[OneBot] [WebSocket Server] 发送生命周期失败', e);
|
||||
}
|
||||
}
|
||||
|
||||
onEvent<T extends OB11EmitEventContent>(event: T) {
|
||||
async onEvent<T extends OB11EmitEventContent>(event: T) {
|
||||
this.wsClientsMutex.runExclusive(async () => {
|
||||
this.wsClientWithEvent.forEach((wsClient) => {
|
||||
wsClient.send(JSON.stringify(event));
|
||||
let promises = this.wsClientWithEvent.map((wsClient) => {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
if (wsClient.readyState === WebSocket.OPEN) {
|
||||
wsClient.send(JSON.stringify(event));
|
||||
resolve();
|
||||
} else {
|
||||
reject(new Error('WebSocket is not open'));
|
||||
}
|
||||
});
|
||||
});
|
||||
await Promise.allSettled(promises);
|
||||
});
|
||||
}
|
||||
|
||||
@ -160,10 +168,15 @@ export class OB11WebSocketServerAdapter extends IOB11NetworkAdapter<WebsocketSer
|
||||
return false;
|
||||
}
|
||||
|
||||
private checkStateAndReply<T>(data: T, wsClient: WebSocket) {
|
||||
if (wsClient.readyState === WebSocket.OPEN) {
|
||||
wsClient.send(JSON.stringify(data));
|
||||
}
|
||||
private async checkStateAndReply<T>(data: T, wsClient: WebSocket) {
|
||||
return await new Promise<void>((resolve, reject) => {
|
||||
if (wsClient.readyState === WebSocket.OPEN) {
|
||||
wsClient.send(JSON.stringify(data));
|
||||
resolve();
|
||||
} else {
|
||||
reject(new Error('WebSocket is not open'));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private async handleMessage(wsClient: WebSocket, message: RawData) {
|
||||
@ -175,7 +188,7 @@ export class OB11WebSocketServerAdapter extends IOB11NetworkAdapter<WebsocketSer
|
||||
echo = receiveData.echo;
|
||||
//this.logger.logDebug('收到正向Websocket消息', receiveData);
|
||||
} catch {
|
||||
this.checkStateAndReply<unknown>(OB11Response.error('json解析失败,请检查数据格式', 1400, echo), wsClient);
|
||||
await this.checkStateAndReply<unknown>(OB11Response.error('json解析失败,请检查数据格式', 1400, echo), wsClient);
|
||||
return;
|
||||
}
|
||||
receiveData.params = (receiveData?.params) ? receiveData.params : {};//兼容类型验证 不然类型校验爆炸
|
||||
@ -183,15 +196,15 @@ export class OB11WebSocketServerAdapter extends IOB11NetworkAdapter<WebsocketSer
|
||||
const action = this.actions.get(receiveData.action as any);
|
||||
if (!action) {
|
||||
this.logger.logError('[OneBot] [WebSocket Client] 发生错误', '不支持的API ' + receiveData.action);
|
||||
this.checkStateAndReply<unknown>(OB11Response.error('不支持的API ' + receiveData.action, 1404, echo), wsClient);
|
||||
await this.checkStateAndReply<unknown>(OB11Response.error('不支持的API ' + receiveData.action, 1404, echo), wsClient);
|
||||
return;
|
||||
}
|
||||
const retdata = await action.websocketHandle(receiveData.params, echo ?? '', this.name, this.config, {
|
||||
send: async (data: object) => {
|
||||
this.checkStateAndReply<unknown>({ ...OB11Response.ok(data, echo ?? '', true) }, wsClient);
|
||||
await this.checkStateAndReply<unknown>({ ...OB11Response.ok(data, echo ?? '', true) }, wsClient);
|
||||
}
|
||||
});
|
||||
this.checkStateAndReply<unknown>({ ...retdata }, wsClient);
|
||||
await this.checkStateAndReply<unknown>({ ...retdata }, wsClient);
|
||||
}
|
||||
|
||||
async reload(newConfig: WebsocketServerConfig) {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user