feat: msg push

This commit is contained in:
手瓜一十雪
2024-11-14 20:18:19 +08:00
parent 61b58f6dfb
commit d9a67de683
15 changed files with 345 additions and 245 deletions

View File

@@ -14,7 +14,7 @@ import {
RawMessage,
SendStatusType,
} from '@/core';
import { OB11Config, OB11ConfigLoader } from '@/onebot/config';
import { OB11ConfigLoader } from '@/onebot/config';
import {
OB11ActiveHttpAdapter,
OB11ActiveWebSocketAdapter,
@@ -45,6 +45,8 @@ import { OB11GroupRecallNoticeEvent } from '@/onebot/event/notice/OB11GroupRecal
import { LRUCache } from '@/common/lru-cache';
import { NodeIKernelRecentContactListener } from '@/core/listeners/NodeIKernelRecentContactListener';
import { BotOfflineEvent } from './event/notice/BotOfflineEvent';
import { defaultOnebotConfig, mergeOnebotConfigs, OnebotConfig } from './config/config';
import { OB11Message } from './types';
//OneBot实现类
export class NapCatOneBot11Adapter {
@@ -61,7 +63,8 @@ export class NapCatOneBot11Adapter {
constructor(core: NapCatCore, context: InstanceContext, pathWrapper: NapCatPathWrapper) {
this.core = core;
this.context = context;
this.configLoader = new OB11ConfigLoader(core, pathWrapper.configPath);
this.configLoader = new OB11ConfigLoader(core, pathWrapper.configPath,);
this.configLoader.save(mergeOnebotConfigs(defaultOnebotConfig, this.configLoader.configData));
this.apis = {
GroupApi: new OneBotGroupApi(this, core),
UserApi: new OneBotUserApi(this, core),
@@ -72,65 +75,77 @@ export class NapCatOneBot11Adapter {
this.actions = createActionMap(this, core);
this.networkManager = new OB11NetworkManager();
}
async creatOneBotLog(ob11Config: OnebotConfig) {
let log = `[network] 配置加载\n`;
for (const key of ob11Config.network.httpServers) {
log += `HTTP服务: ${key.host}:${key.port}, : ${key.enable ? '已启动' : '未启动'}\n`;
}
for (const key of ob11Config.network.httpClients) {
log += `HTTP上报服务: ${key.url}, : ${key.enable ? '已启动' : '未启动'}\n`;
}
for (const key of ob11Config.network.websocketServers) {
log += `WebSocket服务: ${key.host}:${key.port}, : ${key.enable ? '已启动' : '未启动'}\n`;
}
for (const key of ob11Config.network.websocketClients) {
log += `WebSocket反向服务: ${key.url}, : ${key.enable ? '已启动' : '未启动'}\n`;
}
return log;
}
async InitOneBot() {
const selfInfo = this.core.selfInfo;
const ob11Config = this.configLoader.configData;
const serviceInfo = `
HTTP服务 ${ob11Config.http.enable ? '已启动' : '未启动'}, ${ob11Config.http.host}:${ob11Config.http.port}
HTTP上报服务 ${ob11Config.http.enablePost ? '已启动' : '未启动'}, 上报地址: ${ob11Config.http.postUrls}
WebSocket服务 ${ob11Config.ws.enable ? '已启动' : '未启动'}, ${ob11Config.ws.host}:${ob11Config.ws.port}
WebSocket反向服务 ${ob11Config.reverseWs.enable ? '已启动' : '未启动'}, 反向地址: ${ob11Config.reverseWs.urls}`;
this.core.apis.UserApi.getUserDetailInfo(selfInfo.uid).then(user => {
selfInfo.nick = user.nick;
this.context.logger.setLogSelfInfo(selfInfo);
}).catch(this.context.logger.logError.bind(this.context.logger));
let serviceInfo = await this.creatOneBotLog(ob11Config);
this.context.logger.log(`[Notice] [OneBot11] ${serviceInfo}`);
//创建NetWork服务
if (ob11Config.http.enable) {
this.networkManager.registerAdapter(new OB11PassiveHttpAdapter(
ob11Config.http.port, ob11Config.token, this.core, this.actions,
));
// //创建NetWork服务
for (const key of ob11Config.network.httpServers) {
if (key.enable) {
this.networkManager.registerAdapter(new OB11PassiveHttpAdapter(
key.name, key.port, key.token, this.core, this.actions,
));
}
}
if (ob11Config.http.enablePost) {
ob11Config.http.postUrls.forEach(url => {
for (const key of ob11Config.network.httpClients) {
if (key.enable) {
this.networkManager.registerAdapter(new OB11ActiveHttpAdapter(
url, ob11Config.http.secret, this.core, this,
key.name, key.url, key.token, this.core, this,
));
});
}
}
if (ob11Config.ws.enable) {
const OBPassiveWebSocketAdapter = new OB11PassiveWebSocketAdapter(
ob11Config.ws.host, ob11Config.ws.port, ob11Config.heartInterval, ob11Config.token, this.core, this.actions,
);
this.networkManager.registerAdapter(OBPassiveWebSocketAdapter);
for (const key of ob11Config.network.websocketServers) {
if (key.enable) {
this.networkManager.registerAdapter(new OB11PassiveWebSocketAdapter(
key.name, key.host, key.port, key.heartInterval, key.token, this.core, this.actions,
));
}
}
if (ob11Config.reverseWs.enable) {
ob11Config.reverseWs.urls.forEach(url => {
for (const key of ob11Config.network.websocketClients) {
if (key.enable) {
this.networkManager.registerAdapter(new OB11ActiveWebSocketAdapter(
url, 5000, ob11Config.heartInterval, ob11Config.token, this.core, this.actions,
key.name, key.url, 5000, key.heartInterval, key.token, this.core, this.actions,
));
});
}
}
await this.networkManager.openAllAdapters();
this.initMsgListener();
this.initBuddyListener();
this.initGroupListener();
//this.initRecentContactListener();
await WebUiDataRuntime.setQQLoginUin(selfInfo.uin.toString());
await WebUiDataRuntime.setQQLoginStatus(true);
await WebUiDataRuntime.setOnOB11ConfigChanged(async (newConfig: OB11Config) => {
const prev = this.configLoader.configData;
this.configLoader.save(newConfig);
this.context.logger.log(`OneBot11 配置更改:${JSON.stringify(prev)} -> ${JSON.stringify(newConfig)}`);
await this.reloadNetwork(prev, newConfig);
});
// await WebUiDataRuntime.setOnOB11ConfigChanged(async (newConfig: OB11Config) => {
// const prev = this.configLoader.configData;
// this.configLoader.save(newConfig);
// this.context.logger.log(`OneBot11 配置更改:${JSON.stringify(prev)} -> ${JSON.stringify(newConfig)}`);
// await this.reloadNetwork(prev, newConfig);
// });
}
initRecentContactListener() {
@@ -144,88 +159,88 @@ export class NapCatOneBot11Adapter {
};
}
private async reloadNetwork(prev: OB11Config, now: OB11Config) {
const serviceInfo = `
HTTP服务 ${now.http.enable ? '已启动' : '未启动'}, ${now.http.host}:${now.http.port}
HTTP上报服务 ${now.http.enablePost ? '已启动' : '未启动'}, 上报地址: ${now.http.postUrls}
WebSocket服务 ${now.ws.enable ? '已启动' : '未启动'}, ${now.ws.host}:${now.ws.port}
WebSocket反向服务 ${now.reverseWs.enable ? '已启动' : '未启动'}, 反向地址: ${now.reverseWs.urls}`;
this.context.logger.log(`[Notice] [OneBot11] 热重载 ${serviceInfo}`);
// private async reloadNetwork(prev: OB11Config, now: OB11Config) {
// const serviceInfo = `
// HTTP服务 ${now.http.enable ? '已启动' : '未启动'}, ${now.http.host}:${now.http.port}
// HTTP上报服务 ${now.http.enablePost ? '已启动' : '未启动'}, 上报地址: ${now.http.postUrls}
// WebSocket服务 ${now.ws.enable ? '已启动' : '未启动'}, ${now.ws.host}:${now.ws.port}
// WebSocket反向服务 ${now.reverseWs.enable ? '已启动' : '未启动'}, 反向地址: ${now.reverseWs.urls}`;
// this.context.logger.log(`[Notice] [OneBot11] 热重载 ${serviceInfo}`);
// check difference in passive http (Http)
if (prev.http.enable !== now.http.enable) {
if (now.http.enable) {
await this.networkManager.registerAdapterAndOpen(new OB11PassiveHttpAdapter(
now.http.port, now.token, this.core, this.actions,
));
} else {
await this.networkManager.closeAdapterByPredicate(adapter => adapter instanceof OB11PassiveHttpAdapter);
}
}
// // check difference in passive http (Http)
// if (prev.http.enable !== now.http.enable) {
// if (now.http.enable) {
// await this.networkManager.registerAdapterAndOpen(new OB11PassiveHttpAdapter(
// now.http.port, now.token, this.core, this.actions,
// ));
// } else {
// await this.networkManager.closeAdapterByPredicate(adapter => adapter instanceof OB11PassiveHttpAdapter);
// }
// }
// check difference in active http (HttpPost)
if (prev.http.enablePost !== now.http.enablePost) {
if (now.http.enablePost) {
now.http.postUrls.forEach(url => {
this.networkManager.registerAdapterAndOpen(new OB11ActiveHttpAdapter(
url, now.http.secret, this.core, this,
));
});
} else {
await this.networkManager.closeAdapterByPredicate(adapter => adapter instanceof OB11ActiveHttpAdapter);
}
} else if (now.http.enablePost) {
const { added, removed } = this.findDifference<string>(prev.http.postUrls, now.http.postUrls);
await this.networkManager.closeAdapterByPredicate(
adapter => adapter instanceof OB11ActiveHttpAdapter && removed.includes(adapter.url),
);
for (const url of added) {
await this.networkManager.registerAdapterAndOpen(new OB11ActiveHttpAdapter(
url, now.http.secret, this.core, this,
));
}
}
// // check difference in active http (HttpPost)
// if (prev.http.enablePost !== now.http.enablePost) {
// if (now.http.enablePost) {
// now.http.postUrls.forEach(url => {
// this.networkManager.registerAdapterAndOpen(new OB11ActiveHttpAdapter(
// url, now.http.secret, this.core, this,
// ));
// });
// } else {
// await this.networkManager.closeAdapterByPredicate(adapter => adapter instanceof OB11ActiveHttpAdapter);
// }
// } else if (now.http.enablePost) {
// const { added, removed } = this.findDifference<string>(prev.http.postUrls, now.http.postUrls);
// await this.networkManager.closeAdapterByPredicate(
// adapter => adapter instanceof OB11ActiveHttpAdapter && removed.includes(adapter.url),
// );
// for (const url of added) {
// await this.networkManager.registerAdapterAndOpen(new OB11ActiveHttpAdapter(
// url, now.http.secret, this.core, this,
// ));
// }
// }
// check difference in passive websocket (Ws)
if (prev.ws.enable !== now.ws.enable) {
if (now.ws.enable) {
await this.networkManager.registerAdapterAndOpen(new OB11PassiveWebSocketAdapter(
now.ws.host, now.ws.port, now.heartInterval, now.token, this.core, this.actions,
));
} else {
await this.networkManager.closeAdapterByPredicate(
adapter => adapter instanceof OB11PassiveWebSocketAdapter,
);
}
}
// // check difference in passive websocket (Ws)
// if (prev.ws.enable !== now.ws.enable) {
// if (now.ws.enable) {
// await this.networkManager.registerAdapterAndOpen(new OB11PassiveWebSocketAdapter(
// now.ws.host, now.ws.port, now.heartInterval, now.token, this.core, this.actions,
// ));
// } else {
// await this.networkManager.closeAdapterByPredicate(
// adapter => adapter instanceof OB11PassiveWebSocketAdapter,
// );
// }
// }
// check difference in active websocket (ReverseWs)
if (prev.reverseWs.enable !== now.reverseWs.enable) {
if (now.reverseWs.enable) {
now.reverseWs.urls.forEach(url => {
this.networkManager.registerAdapterAndOpen(new OB11ActiveWebSocketAdapter(
url, 5000, now.heartInterval, now.token, this.core, this.actions,
));
});
} else {
await this.networkManager.closeAdapterByPredicate(
adapter => adapter instanceof OB11ActiveWebSocketAdapter,
);
}
} else if (now.reverseWs.enable) {
const { added, removed } = this.findDifference<string>(prev.reverseWs.urls, now.reverseWs.urls);
await this.networkManager.closeAdapterByPredicate(
adapter => adapter instanceof OB11ActiveWebSocketAdapter && removed.includes(adapter.url),
);
for (const url of added) {
await this.networkManager.registerAdapterAndOpen(new OB11ActiveWebSocketAdapter(
url, 5000, now.heartInterval, now.token, this.core, this.actions,
));
}
}
// // check difference in active websocket (ReverseWs)
// if (prev.reverseWs.enable !== now.reverseWs.enable) {
// if (now.reverseWs.enable) {
// now.reverseWs.urls.forEach(url => {
// this.networkManager.registerAdapterAndOpen(new OB11ActiveWebSocketAdapter(
// url, 5000, now.heartInterval, now.token, this.core, this.actions,
// ));
// });
// } else {
// await this.networkManager.closeAdapterByPredicate(
// adapter => adapter instanceof OB11ActiveWebSocketAdapter,
// );
// }
// } else if (now.reverseWs.enable) {
// const { added, removed } = this.findDifference<string>(prev.reverseWs.urls, now.reverseWs.urls);
// await this.networkManager.closeAdapterByPredicate(
// adapter => adapter instanceof OB11ActiveWebSocketAdapter && removed.includes(adapter.url),
// );
// for (const url of added) {
// await this.networkManager.registerAdapterAndOpen(new OB11ActiveWebSocketAdapter(
// url, 5000, now.heartInterval, now.token, this.core, this.actions,
// ));
// }
// }
}
// }
private findDifference<T>(prev: T[], now: T[]): { added: T[], removed: T[] } {
const added = now.filter(item => !prev.includes(item));
@@ -285,21 +300,12 @@ export class NapCatOneBot11Adapter {
if (msg.sendStatus == SendStatusType.KSEND_STATUS_SUCCESS && msgIdSend.get(msg.msgId) == 0) {
msgIdSend.put(msg.msgId, 1);
// 完成后再post
this.apis.MsgApi.parseMessage(msg)
.then((ob11Msg) => {
if (!ob11Msg) return;
ob11Msg.target_id = parseInt(msg.peerUin);
if (this.configLoader.configData.reportSelfMessage) {
msg.id = MessageUnique.createUniqueMsgId({
chatType: msg.chatType,
peerUid: msg.peerUid,
guildId: '',
}, msg.msgId);
this.emitMsg(msg);
} else {
// logOB11Message(this.core, ob11Msg);
}
});
msg.id = MessageUnique.createUniqueMsgId({
chatType: msg.chatType,
peerUid: msg.peerUid,
guildId: '',
}, msg.msgId);
this.emitMsg(msg, true);
}
}
};
@@ -491,56 +497,52 @@ export class NapCatOneBot11Adapter {
);
}
private async emitMsg(message: RawMessage, parseEvent: boolean = true) {
const { debug, reportSelfMessage, messagePostFormat } = this.configLoader.configData;
private async emitMsg(message: RawMessage, selfMsg: boolean = true) {
let network = Object.values(this.configLoader.configData.network) as Array<typeof this.configLoader.configData.network[keyof typeof this.configLoader.configData.network]>;
this.context.logger.logDebug('收到新消息 RawMessage', message);
this.apis.MsgApi.parseMessage(message, messagePostFormat).then((ob11Msg) => {
this.apis.MsgApi.parseMessageV2(message).then((ob11Msg) => {
if (!ob11Msg) return;
const isSelfMsg = ob11Msg.stringMsg.user_id.toString() == this.core.selfInfo.uin || ob11Msg.arrayMsg.user_id.toString() == this.core.selfInfo.uin;
this.context.logger.logDebug('转化为 OB11Message', ob11Msg);
if (debug) {
ob11Msg.raw = message;
} else if (ob11Msg.message.length === 0) {
let msgMap: Map<string, OB11Message> = new Map();
let enable_client: string[] = [];
network.flat().filter(e => e.enable).map(e => {
enable_client.push(e.name);
if (e.messagePostFormat == 'string') {
msgMap.set(e.name, structuredClone(ob11Msg.stringMsg));
} else {
msgMap.set(e.name, structuredClone(ob11Msg.arrayMsg));
}
if (isSelfMsg) {
ob11Msg.stringMsg.target_id = parseInt(message.peerUin);
ob11Msg.arrayMsg.target_id = parseInt(message.peerUin);
}
});
let debug_network = network.flat().filter(e => e.enable && e.debug);
if (debug_network.length > 0) {
for (const adapter of debug_network) {
if (adapter.name) {
const msg = msgMap.get(adapter.name);
if (msg) {
msg.raw = message;
}
}
}
} else if (ob11Msg.stringMsg.message.length === 0 || ob11Msg.arrayMsg.message.length == 0) {
return;
}
const isSelfMsg = ob11Msg.user_id.toString() == this.core.selfInfo.uin;
if (isSelfMsg && !reportSelfMessage) {
return;
}
let notreportSelf_network = network.flat().filter(e => e.enable && !e.reportSelfMessage);
if (isSelfMsg) {
ob11Msg.target_id = parseInt(message.peerUin);
for (const adapter of notreportSelf_network) {
msgMap.delete(adapter.name);
}
}
// if (ob11Msg.raw_message.startsWith('!set')) {
// this.core.apis.UserApi.getUidByUinV2(ob11Msg.user_id.toString()).then(uid => {
// if(uid){
// this.core.apis.PacketApi.sendSetSpecialTittlePacket(message.peerUin, uid, '测试');
// console.log('set', message.peerUin, uid);
// }
// });
// }
// if (ob11Msg.raw_message.startsWith('!status')) {
// console.log('status', message.peerUin, message.senderUin);
// let delMsg: string[] = [];
// let peer = {
// peerUid: message.peerUin,
// chatType: 2,
// };
// this.core.apis.PacketApi.sendStatusPacket(+message.senderUin).then(async e => {
// if (e) {
// const { sendElements } = await this.apis.MsgApi.createSendElements([{
// type: OB11MessageDataType.text,
// data: {
// text: 'status ' + JSON.stringify(e, null, 2),
// }
// }], peer)
// this.apis.MsgApi.sendMsgWithOb11UniqueId(peer, sendElements, delMsg)
// }
// })
// }
this.networkManager.emitEvent(ob11Msg);
this.networkManager.emitEventByNames(msgMap);
}).catch(e => this.context.logger.logError.bind(this.context.logger)('constructMessage error: ', e));
this.apis.GroupApi.parseGroupEvent(message).then(groupEvent => {