This commit is contained in:
linyuchen
2024-04-15 00:09:08 +08:00
parent 0ef3e38d70
commit 356aba762c
218 changed files with 8465 additions and 5 deletions

View File

@@ -0,0 +1,143 @@
import { LifeCycleSubType, OB11LifeCycleEvent } from '../../event/meta/OB11LifeCycleEvent';
import { ActionName } from '../../action/types';
import { OB11Response } from '../../action/OB11Response';
import BaseAction from '../../action/BaseAction';
import { actionMap } from '../../action';
import { postWsEvent, registerWsEventSender, unregisterWsEventSender } from '../postOB11Event';
import { wsReply } from './reply';
import { WebSocket as WebSocketClass } from 'ws';
import { OB11HeartbeatEvent } from '../../event/meta/OB11HeartbeatEvent';
import { log } from '../../../common/utils/log';
import { ob11Config } from '@/onebot11/config';
import { napCatCore } from '@/core';
import { selfInfo } from '@/common/data';
export const rwsList: ReverseWebsocket[] = [];
export class ReverseWebsocket {
public websocket: WebSocketClass | undefined;
public url: string;
private running: boolean = false;
public constructor(url: string) {
this.url = url;
this.running = true;
this.connect();
}
public stop() {
this.running = false;
this.websocket!.close();
}
public onopen() {
wsReply(this.websocket!, new OB11LifeCycleEvent(LifeCycleSubType.CONNECT));
}
public async onmessage(msg: string) {
let receiveData: { action: ActionName | undefined, params: any, echo?: any } = { action: undefined, params: {} };
let echo = null;
try {
receiveData = JSON.parse(msg.toString());
echo = receiveData.echo;
log('收到反向Websocket消息', receiveData);
} catch (e) {
return wsReply(this.websocket!, OB11Response.error('json解析失败请检查数据格式', 1400, echo));
}
const action: BaseAction<any, any> | undefined = actionMap.get(receiveData.action!);
if (!action) {
return wsReply(this.websocket!, OB11Response.error('不支持的api ' + receiveData.action, 1404, echo));
}
try {
const handleResult = await action.websocketHandle(receiveData.params, echo);
wsReply(this.websocket!, handleResult);
} catch (e) {
wsReply(this.websocket!, OB11Response.error(`api处理出错:${e}`, 1200, echo));
}
}
public onclose = () => {
log('反向ws断开', this.url);
unregisterWsEventSender(this.websocket!);
if (this.running) {
this.reconnect();
}
};
public send(msg: string) {
if (this.websocket && this.websocket.readyState == WebSocket.OPEN) {
this.websocket.send(msg);
}
}
private reconnect() {
setTimeout(() => {
this.connect();
}, 3000); // TODO: 重连间隔在配置文件中实现
}
private connect() {
const { token, heartInterval } = ob11Config;
this.websocket = new WebSocketClass(this.url, {
handshakeTimeout: 2000,
perMessageDeflate: false,
headers: {
'X-Self-ID': selfInfo.uin,
'Authorization': `Bearer ${token}`,
'x-client-role': 'Universal', // koishi-adapter-onebot 需要这个字段
}
});
registerWsEventSender(this.websocket);
log('Trying to connect to the websocket server: ' + this.url);
this.websocket.on('open', () => {
log('Connected to the websocket server: ' + this.url);
this.onopen();
});
this.websocket.on('message', async (data) => {
await this.onmessage(data.toString());
});
this.websocket.on('error', log);
const wsClientInterval = setInterval(() => {
postWsEvent(new OB11HeartbeatEvent(!!selfInfo.online, true, heartInterval));
}, heartInterval); // 心跳包
this.websocket.on('close', () => {
clearInterval(wsClientInterval);
log('The websocket connection: ' + this.url + ' closed, trying reconnecting...');
this.onclose();
});
}
}
class OB11ReverseWebsockets {
start() {
for (const url of ob11Config.wsReverseUrls) {
log('开始连接反向ws', url);
new Promise(() => {
try {
rwsList.push(new ReverseWebsocket(url));
} catch (e: any) {
log(e.stack);
}
}).then();
}
}
stop() {
for (const rws of rwsList) {
rws.stop();
}
}
restart() {
this.stop();
this.start();
}
}
export const ob11ReverseWebsockets = new OB11ReverseWebsockets();

View File

@@ -0,0 +1,76 @@
import { WebSocket } from 'ws';
import { actionMap } from '../../action';
import { OB11Response } from '../../action/OB11Response';
import { postWsEvent, registerWsEventSender, unregisterWsEventSender } from '../postOB11Event';
import { ActionName } from '../../action/types';
import BaseAction from '../../action/BaseAction';
import { LifeCycleSubType, OB11LifeCycleEvent } from '../../event/meta/OB11LifeCycleEvent';
import { OB11HeartbeatEvent } from '../../event/meta/OB11HeartbeatEvent';
import { WebsocketServerBase } from '@/common/server/websocket';
import { IncomingMessage } from 'node:http';
import { wsReply } from './reply';
import { napCatCore } from '@/core';
import { log } from '../../../common/utils/log';
import { ob11Config } from '@/onebot11/config';
import { selfInfo } from '@/common/data';
const heartbeatRunning = false;
class OB11WebsocketServer extends WebsocketServerBase {
authorizeFailed(wsClient: WebSocket) {
wsClient.send(JSON.stringify(OB11Response.res(null, 'failed', 1403, 'token验证失败')));
}
async handleAction(wsClient: WebSocket, actionName: string, params: any, echo?: any) {
const action: BaseAction<any, any> | undefined = actionMap.get(actionName);
if (!action) {
return wsReply(wsClient, OB11Response.error('不支持的api ' + actionName, 1404, echo));
}
try {
const handleResult = await action.websocketHandle(params, echo);
wsReply(wsClient, handleResult);
} catch (e: any) {
wsReply(wsClient, OB11Response.error(`api处理出错:${e.stack}`, 1200, echo));
}
}
onConnect(wsClient: WebSocket, url: string, req: IncomingMessage) {
if (url == '/api' || url == '/api/' || url == '/') {
wsClient.on('message', async (msg) => {
let receiveData: { action: ActionName, params: any, echo?: any } = { action: '', params: {} };
let echo = null;
try {
receiveData = JSON.parse(msg.toString());
echo = receiveData.echo;
log('收到正向Websocket消息', receiveData);
} catch (e) {
return wsReply(wsClient, OB11Response.error('json解析失败请检查数据格式', 1400, echo));
}
this.handleAction(wsClient, receiveData.action, receiveData.params, receiveData.echo).then();
});
}
if (url == '/event' || url == '/event/' || url == '/') {
registerWsEventSender(wsClient);
log('event上报ws客户端已连接');
try {
wsReply(wsClient, new OB11LifeCycleEvent(LifeCycleSubType.CONNECT));
} catch (e) {
log('发送生命周期失败', e);
}
const { heartInterval } = ob11Config;
const wsClientInterval = setInterval(() => {
postWsEvent(new OB11HeartbeatEvent(!!selfInfo.online, true, heartInterval));
}, heartInterval); // 心跳包
wsClient.on('close', () => {
log('event上报ws客户端已断开');
clearInterval(wsClientInterval);
unregisterWsEventSender(wsClient);
});
}
}
}
export const ob11WebsocketServer = new OB11WebsocketServer();

View File

@@ -0,0 +1,19 @@
import { WebSocket as WebSocketClass } from 'ws';
import { OB11Response } from '../../action/OB11Response';
import { PostEventType } from '../postOB11Event';
import { log } from '../../../common/utils/log';
import { isNull } from '../../../common/utils/helper';
export function wsReply(wsClient: WebSocketClass, data: OB11Response | PostEventType) {
try {
const packet = Object.assign({}, data);
if (isNull(packet['echo'])) {
delete packet['echo'];
}
wsClient.send(JSON.stringify(packet));
log('ws 消息上报', wsClient.url || '', data);
} catch (e: any) {
log('websocket 回复失败', e.stack, data);
}
}