refactor(onebot): 精简 WebSocket 适配器实现 (#1644)

* refactor(onebot): 移除 `async-mutex` 依赖

* fix(onebot): 避免重复发送 WebSocket pong
This commit is contained in:
ud2
2026-02-21 15:59:12 +08:00
committed by GitHub
parent eb07cdb715
commit 1fc4655ae1
7 changed files with 69 additions and 134 deletions

View File

@@ -13,14 +13,12 @@ import { URL } from 'url';
import { ActionName } from '@/napcat-onebot/action/router'; import { ActionName } from '@/napcat-onebot/action/router';
import { OB11HeartbeatEvent } from '@/napcat-onebot/event/meta/OB11HeartbeatEvent'; import { OB11HeartbeatEvent } from '@/napcat-onebot/event/meta/OB11HeartbeatEvent';
import { OB11LifeCycleEvent, LifeCycleSubType } from '@/napcat-onebot/event/meta/OB11LifeCycleEvent'; import { OB11LifeCycleEvent, LifeCycleSubType } from '@/napcat-onebot/event/meta/OB11LifeCycleEvent';
import { Mutex } from 'async-mutex';
export class OB11HttpServerAdapter extends IOB11NetworkAdapter<HttpServerConfig> { export class OB11HttpServerAdapter extends IOB11NetworkAdapter<HttpServerConfig> {
private app: Express | undefined; private app: Express | undefined;
private server: http.Server | undefined; private server: http.Server | undefined;
private wsServer?: WebSocketServer; private wsServer?: WebSocketServer;
private wsClients: WebSocket[] = []; private wsClients: WebSocket[] = [];
private wsClientsMutex = new Mutex();
private heartbeatIntervalId: NodeJS.Timeout | null = null; private heartbeatIntervalId: NodeJS.Timeout | null = null;
private wsClientWithEvent: WebSocket[] = []; private wsClientWithEvent: WebSocket[] = [];
@@ -30,19 +28,17 @@ export class OB11HttpServerAdapter extends IOB11NetworkAdapter<HttpServerConfig>
override async onEvent<T extends OB11EmitEventContent> (event: T) { override async onEvent<T extends OB11EmitEventContent> (event: T) {
// http server is passive, no need to emit event // http server is passive, no need to emit event
this.wsClientsMutex.runExclusive(async () => { const promises = this.wsClientWithEvent.map((wsClient) => {
const promises = this.wsClientWithEvent.map((wsClient) => { return new Promise<void>((resolve, reject) => {
return new Promise<void>((resolve, reject) => { if (wsClient.readyState === WebSocket.OPEN) {
if (wsClient.readyState === WebSocket.OPEN) { wsClient.send(JSON.stringify(event));
wsClient.send(JSON.stringify(event)); resolve();
resolve(); } else {
} else { reject(new Error('WebSocket is not open'));
reject(new Error('WebSocket is not open')); }
}
});
}); });
await Promise.allSettled(promises);
}); });
await Promise.allSettled(promises);
} }
open () { open () {
@@ -65,13 +61,9 @@ export class OB11HttpServerAdapter extends IOB11NetworkAdapter<HttpServerConfig>
this.server?.close(); this.server?.close();
this.app = undefined; this.app = undefined;
this.stopHeartbeat(); this.stopHeartbeat();
await this.wsClientsMutex.runExclusive(async () => { this.wsClients.forEach((wsClient) => wsClient.close());
this.wsClients.forEach((wsClient) => { this.wsClients = [];
wsClient.close(); this.wsClientWithEvent = [];
});
this.wsClients = [];
this.wsClientWithEvent = [];
});
this.wsServer?.close(); this.wsServer?.close();
} }
@@ -153,36 +145,29 @@ export class OB11HttpServerAdapter extends IOB11NetworkAdapter<HttpServerConfig>
wsClient.on('message', (message) => { wsClient.on('message', (message) => {
this.handleWSMessage(wsClient, message).then().catch(e => this.logger.logError(e)); this.handleWSMessage(wsClient, message).then().catch(e => this.logger.logError(e));
}); });
wsClient.on('ping', () => {
wsClient.pong();
});
wsClient.on('pong', () => { wsClient.on('pong', () => {
// this.logger.logDebug('[OneBot] [HTTP WebSocket] Pong received'); // this.logger.logDebug('[OneBot] [HTTP WebSocket] Pong received');
}); });
wsClient.once('close', () => { wsClient.once('close', () => {
this.wsClientsMutex.runExclusive(async () => { const NormolIndex = this.wsClients.indexOf(wsClient);
const NormolIndex = this.wsClients.indexOf(wsClient); if (NormolIndex !== -1) {
if (NormolIndex !== -1) { this.wsClients.splice(NormolIndex, 1);
this.wsClients.splice(NormolIndex, 1);
}
const EventIndex = this.wsClientWithEvent.indexOf(wsClient);
if (EventIndex !== -1) {
this.wsClientWithEvent.splice(EventIndex, 1);
}
if (this.wsClientWithEvent.length === 0) {
this.stopHeartbeat();
}
});
});
await this.wsClientsMutex.runExclusive(async () => {
if (!isApiConnect) {
this.wsClientWithEvent.push(wsClient);
} }
this.wsClients.push(wsClient); const EventIndex = this.wsClientWithEvent.indexOf(wsClient);
if (this.wsClientWithEvent.length > 0) { if (EventIndex !== -1) {
this.startHeartbeat(); this.wsClientWithEvent.splice(EventIndex, 1);
}
if (this.wsClientWithEvent.length === 0) {
this.stopHeartbeat();
} }
}); });
if (!isApiConnect) {
this.wsClientWithEvent.push(wsClient);
}
this.wsClients.push(wsClient);
if (this.wsClientWithEvent.length > 0) {
this.startHeartbeat();
}
}).on('error', (err) => this.logger.log('[OneBot] [HTTP WebSocket] Server Error:', err.message)); }).on('error', (err) => this.logger.log('[OneBot] [HTTP WebSocket] Server Error:', err.message));
} }
@@ -197,12 +182,10 @@ export class OB11HttpServerAdapter extends IOB11NetworkAdapter<HttpServerConfig>
private startHeartbeat () { private startHeartbeat () {
if (this.heartbeatIntervalId) return; if (this.heartbeatIntervalId) return;
this.heartbeatIntervalId = setInterval(() => { this.heartbeatIntervalId = setInterval(() => {
this.wsClientsMutex.runExclusive(async () => { this.wsClientWithEvent.forEach((wsClient) => {
this.wsClientWithEvent.forEach((wsClient) => { if (wsClient.readyState === WebSocket.OPEN) {
if (wsClient.readyState === WebSocket.OPEN) { wsClient.send(JSON.stringify(new OB11HeartbeatEvent(this.core, 30000, this.core.selfInfo.online ?? true, true)));
wsClient.send(JSON.stringify(new OB11HeartbeatEvent(this.core, 30000, this.core.selfInfo.online ?? true, true))); }
}
});
}); });
}, 30000); }, 30000);
} }

View File

@@ -85,9 +85,6 @@ export class OB11WebSocketClientAdapter extends IOB11NetworkAdapter<WebsocketCli
}, },
}); });
this.connection.on('ping', () => {
this.connection?.pong();
});
this.connection.on('pong', () => { this.connection.on('pong', () => {
// this.logger.logDebug('[OneBot] [WebSocket Client] 收到pong'); // this.logger.logDebug('[OneBot] [WebSocket Client] 收到pong');
}); });

View File

@@ -1,7 +1,6 @@
import { OB11EmitEventContent, OB11NetworkReloadType } from './index'; import { OB11EmitEventContent, OB11NetworkReloadType } from './index';
import { URL } from 'url'; import { URL } from 'url';
import { RawData, WebSocket, WebSocketServer } from 'ws'; import { RawData, WebSocket, WebSocketServer } from 'ws';
import { Mutex } from 'async-mutex';
import { OB11Response } from '@/napcat-onebot/action/OneBotAction'; import { OB11Response } from '@/napcat-onebot/action/OneBotAction';
import { ActionName } from '@/napcat-onebot/action/router'; import { ActionName } from '@/napcat-onebot/action/router';
import { NapCatCore } from 'napcat-core'; import { NapCatCore } from 'napcat-core';
@@ -17,7 +16,6 @@ import json5 from 'json5';
export class OB11WebSocketServerAdapter extends IOB11NetworkAdapter<WebsocketServerConfig> { export class OB11WebSocketServerAdapter extends IOB11NetworkAdapter<WebsocketServerConfig> {
wsServer?: WebSocketServer; wsServer?: WebSocketServer;
wsClients: WebSocket[] = []; wsClients: WebSocket[] = [];
wsClientsMutex = new Mutex();
private heartbeatIntervalId: NodeJS.Timeout | null = null; private heartbeatIntervalId: NodeJS.Timeout | null = null;
wsClientWithEvent: WebSocket[] = []; wsClientWithEvent: WebSocket[] = [];
@@ -58,36 +56,29 @@ export class OB11WebSocketServerAdapter extends IOB11NetworkAdapter<WebsocketSer
wsClient.on('message', (message) => { wsClient.on('message', (message) => {
this.handleMessage(wsClient, message).then().catch(e => this.logger.logError(e)); this.handleMessage(wsClient, message).then().catch(e => this.logger.logError(e));
}); });
wsClient.on('ping', () => {
wsClient.pong();
});
wsClient.on('pong', () => { wsClient.on('pong', () => {
// this.logger.logDebug('[OneBot] [WebSocket Server] Pong received'); // this.logger.logDebug('[OneBot] [WebSocket Server] Pong received');
}); });
wsClient.once('close', () => { wsClient.once('close', () => {
this.wsClientsMutex.runExclusive(async () => { const NormolIndex = this.wsClients.indexOf(wsClient);
const NormolIndex = this.wsClients.indexOf(wsClient); if (NormolIndex !== -1) {
if (NormolIndex !== -1) { this.wsClients.splice(NormolIndex, 1);
this.wsClients.splice(NormolIndex, 1);
}
const EventIndex = this.wsClientWithEvent.indexOf(wsClient);
if (EventIndex !== -1) {
this.wsClientWithEvent.splice(EventIndex, 1);
}
if (this.wsClientWithEvent.length === 0) {
this.stopHeartbeat();
}
});
});
await this.wsClientsMutex.runExclusive(async () => {
if (!isApiConnect) {
this.wsClientWithEvent.push(wsClient);
} }
this.wsClients.push(wsClient); const EventIndex = this.wsClientWithEvent.indexOf(wsClient);
if (this.wsClientWithEvent.length > 0) { if (EventIndex !== -1) {
this.startHeartbeat(); this.wsClientWithEvent.splice(EventIndex, 1);
}
if (this.wsClientWithEvent.length === 0) {
this.stopHeartbeat();
} }
}); });
if (!isApiConnect) {
this.wsClientWithEvent.push(wsClient);
}
this.wsClients.push(wsClient);
if (this.wsClientWithEvent.length > 0) {
this.startHeartbeat();
}
}).on('error', (err) => this.logger.log('[OneBot] [WebSocket Server] Server Error:', err.message)); }).on('error', (err) => this.logger.log('[OneBot] [WebSocket Server] Server Error:', err.message));
} }
@@ -100,19 +91,17 @@ export class OB11WebSocketServerAdapter extends IOB11NetworkAdapter<WebsocketSer
} }
async onEvent<T extends OB11EmitEventContent> (event: T) { async onEvent<T extends OB11EmitEventContent> (event: T) {
this.wsClientsMutex.runExclusive(async () => { const promises = this.wsClientWithEvent.map((wsClient) => {
const promises = this.wsClientWithEvent.map((wsClient) => { return new Promise<void>((resolve, reject) => {
return new Promise<void>((resolve, reject) => { if (wsClient.readyState === WebSocket.OPEN) {
if (wsClient.readyState === WebSocket.OPEN) { wsClient.send(JSON.stringify(event));
wsClient.send(JSON.stringify(event)); resolve();
resolve(); } else {
} else { reject(new Error('WebSocket is not open'));
reject(new Error('WebSocket is not open')); }
}
});
}); });
await Promise.allSettled(promises);
}); });
await Promise.allSettled(promises);
} }
open () { open () {
@@ -136,24 +125,18 @@ export class OB11WebSocketServerAdapter extends IOB11NetworkAdapter<WebsocketSer
} }
}); });
this.stopHeartbeat(); this.stopHeartbeat();
await this.wsClientsMutex.runExclusive(async () => { this.wsClients.forEach((wsClient) => wsClient.close());
this.wsClients.forEach((wsClient) => { this.wsClients = [];
wsClient.close(); this.wsClientWithEvent = [];
});
this.wsClients = [];
this.wsClientWithEvent = [];
});
} }
private startHeartbeat () { private startHeartbeat () {
if (this.heartbeatIntervalId || this.config.heartInterval <= 0) return; if (this.heartbeatIntervalId || this.config.heartInterval <= 0) return;
this.heartbeatIntervalId = setInterval(() => { this.heartbeatIntervalId = setInterval(() => {
this.wsClientsMutex.runExclusive(async () => { this.wsClientWithEvent.forEach((wsClient) => {
this.wsClientWithEvent.forEach((wsClient) => { if (wsClient.readyState === WebSocket.OPEN) {
if (wsClient.readyState === WebSocket.OPEN) { wsClient.send(JSON.stringify(new OB11HeartbeatEvent(this.core, this.config.heartInterval, this.core.selfInfo.online ?? true, true)));
wsClient.send(JSON.stringify(new OB11HeartbeatEvent(this.core, this.config.heartInterval, this.core.selfInfo.online ?? true, true))); }
}
});
}); });
}, this.config.heartInterval); }, this.config.heartInterval);
} }

View File

@@ -26,7 +26,6 @@
"express": "^5.0.0", "express": "^5.0.0",
"ws": "^8.18.3", "ws": "^8.18.3",
"file-type": "^21.0.0", "file-type": "^21.0.0",
"async-mutex": "^0.5.0",
"napcat-protobuf": "workspace:*", "napcat-protobuf": "workspace:*",
"json5": "^2.2.3", "json5": "^2.2.3",
"napcat-core": "workspace:*", "napcat-core": "workspace:*",

View File

@@ -61,20 +61,6 @@ declare module 'yaml' {
export const stringify: (...args: any[]) => any; export const stringify: (...args: any[]) => any;
} }
declare module 'async-mutex' {
export class Mutex {
acquire (): Promise<() => void>;
runExclusive<T> (callback: () => T | Promise<T>): Promise<T>;
}
export class Semaphore {
acquire (): Promise<[() => void, number]>;
runExclusive<T> (callback: () => T | Promise<T>): Promise<T>;
release (): void;
}
const _async_mutex_default: { Mutex: typeof Mutex; Semaphore: typeof Semaphore; };
export default _async_mutex_default;
}
declare module 'napcat-protobuf' { declare module 'napcat-protobuf' {
export class NapProtoMsg<T = any> { export class NapProtoMsg<T = any> {
constructor (schema: any); constructor (schema: any);

View File

@@ -35,9 +35,6 @@ const EXTERNAL_TYPE_REPLACEMENTS = {
'ValidateFunction<T>': 'any', 'ValidateFunction<T>': 'any',
// inversify // inversify
'Container': 'any', 'Container': 'any',
// async-mutex
'Mutex': 'any',
'Semaphore': 'any',
// napcat-protobuf // napcat-protobuf
'NapProtoDecodeStructType': 'any', 'NapProtoDecodeStructType': 'any',
'NapProtoEncodeStructType': 'any', 'NapProtoEncodeStructType': 'any',
@@ -90,15 +87,15 @@ function replaceExternalTypes (content) {
// 使用类型上下文的模式匹配 // 使用类型上下文的模式匹配
const typeContextPatterns = [ const typeContextPatterns = [
// : Type // : Type
/:\s*(WebSocket|WebSocketServer|RawData|Ajv|AnySchema|ValidateFunction|Container|Mutex|Semaphore|NapProtoDecodeStructType|NapProtoEncodeStructType|Express|Request|Response|NextFunction)(?=\s*[;,)\]\}|&]|$)/g, /:\s*(WebSocket|WebSocketServer|RawData|Ajv|AnySchema|ValidateFunction|Container|NapProtoDecodeStructType|NapProtoEncodeStructType|Express|Request|Response|NextFunction)(?=\s*[;,)\]\}|&]|$)/g,
// <Type> // <Type>
/<(WebSocket|WebSocketServer|RawData|Ajv|AnySchema|ValidateFunction|Container|Mutex|Semaphore|NapProtoDecodeStructType|NapProtoEncodeStructType|Express|Request|Response|NextFunction)>/g, /<(WebSocket|WebSocketServer|RawData|Ajv|AnySchema|ValidateFunction|Container|NapProtoDecodeStructType|NapProtoEncodeStructType|Express|Request|Response|NextFunction)>/g,
// Type[] // Type[]
/(WebSocket|WebSocketServer|RawData|Ajv|AnySchema|ValidateFunction|Container|Mutex|Semaphore|NapProtoDecodeStructType|NapProtoEncodeStructType|Express|Request|Response|NextFunction)\[\]/g, /(WebSocket|WebSocketServer|RawData|Ajv|AnySchema|ValidateFunction|Container|NapProtoDecodeStructType|NapProtoEncodeStructType|Express|Request|Response|NextFunction)\[\]/g,
// extends Type // extends Type
/extends\s+(WebSocket|WebSocketServer|RawData|Ajv|AnySchema|ValidateFunction|Container|Mutex|Semaphore|NapProtoDecodeStructType|NapProtoEncodeStructType|Express|Request|Response|NextFunction)(?=\s*[{,])/g, /extends\s+(WebSocket|WebSocketServer|RawData|Ajv|AnySchema|ValidateFunction|Container|NapProtoDecodeStructType|NapProtoEncodeStructType|Express|Request|Response|NextFunction)(?=\s*[{,])/g,
// implements Type // implements Type
/implements\s+(WebSocket|WebSocketServer|RawData|Ajv|AnySchema|ValidateFunction|Container|Mutex|Semaphore|NapProtoDecodeStructType|NapProtoEncodeStructType|Express|Request|Response|NextFunction)(?=\s*[{,])/g, /implements\s+(WebSocket|WebSocketServer|RawData|Ajv|AnySchema|ValidateFunction|Container|NapProtoDecodeStructType|NapProtoEncodeStructType|Express|Request|Response|NextFunction)(?=\s*[{,])/g,
]; ];
for (const pattern of typeContextPatterns) { for (const pattern of typeContextPatterns) {

10
pnpm-lock.yaml generated
View File

@@ -176,9 +176,6 @@ importers:
ajv: ajv:
specifier: ^8.13.0 specifier: ^8.13.0
version: 8.17.1 version: 8.17.1
async-mutex:
specifier: ^0.5.0
version: 0.5.0
cors: cors:
specifier: ^2.8.5 specifier: ^2.8.5
version: 2.8.5 version: 2.8.5
@@ -3563,9 +3560,6 @@ packages:
resolution: {integrity: sha512-hsU18Ae8CDTR6Kgu9DYf0EbCr/a5iGL0rytQDobUcdpYOKokk8LEjVphnXkDkgpi0wYVsqrXuP0bZxJaTqdgoA==} resolution: {integrity: sha512-hsU18Ae8CDTR6Kgu9DYf0EbCr/a5iGL0rytQDobUcdpYOKokk8LEjVphnXkDkgpi0wYVsqrXuP0bZxJaTqdgoA==}
engines: {node: '>= 0.4'} engines: {node: '>= 0.4'}
async-mutex@0.5.0:
resolution: {integrity: sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==}
async@3.2.6: async@3.2.6:
resolution: {integrity: sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA==} resolution: {integrity: sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA==}
@@ -10443,10 +10437,6 @@ snapshots:
async-function@1.0.0: {} async-function@1.0.0: {}
async-mutex@0.5.0:
dependencies:
tslib: 2.8.1
async@3.2.6: {} async@3.2.6: {}
asynckit@0.4.0: {} asynckit@0.4.0: {}