diff --git a/packages/napcat-shell/base.ts b/packages/napcat-shell/base.ts index aacc9ddf..1e4c4f62 100644 --- a/packages/napcat-shell/base.ts +++ b/packages/napcat-shell/base.ts @@ -29,7 +29,6 @@ import { napCatVersion } from 'napcat-common/src/version'; import { NodeIO3MiscListener } from 'napcat-core/listeners/NodeIO3MiscListener'; import { sleep } from 'napcat-common/src/helper'; import { FFmpegService } from '@/napcat-core/helper/ffmpeg/ffmpeg'; -import { connectToNamedPipe } from './pipe'; import { NativePacketHandler } from 'napcat-core/packet/handler/client'; import { logSubscription, LogWrapper } from '@/napcat-core/helper/log'; import { proxiedListenerOf } from '@/napcat-core/helper/proxy-handler'; diff --git a/packages/napcat-shell/napcat.ts b/packages/napcat-shell/napcat.ts index ae105ec8..54cc4876 100644 --- a/packages/napcat-shell/napcat.ts +++ b/packages/napcat-shell/napcat.ts @@ -2,6 +2,8 @@ import { NCoreInitShell } from './base'; import { NapCatPathWrapper } from '@/napcat-common/src/path'; import { LogWrapper } from '@/napcat-core/helper/log'; import { connectToNamedPipe } from './pipe'; +import { WebUiDataRuntime } from '@/napcat-webui-backend/src/helper/Data'; +import { createProcessManager, type IProcessManager, type IWorkerProcess } from './process-api'; import path from 'path'; import { fileURLToPath } from 'url'; @@ -9,121 +11,161 @@ import { fileURLToPath } from 'url'; const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); -// 扩展 Process 类型以支持 parentPort -declare global { - namespace NodeJS { - interface Process { - parentPort?: { - on (event: 'message', listener: (e: { data: any; }) => void): void; - postMessage (message: any): void; - }; - } - } -} - -// 判断是否为子进程(通过环境变量) -const isWorkerProcess = process.env['NAPCAT_WORKER_PROCESS'] === '1'; - -// 判断是否禁用多进程模式 -const isMultiProcessDisabled = process.env['NAPCAT_DISABLE_MULTI_PROCESS'] === '1'; - -// 只在主进程中导入 utilityProcess -let utilityProcess: any; -if (!isWorkerProcess && !isMultiProcessDisabled) { - // @ts-ignore - electron 运行时存在但类型声明可能缺失 - const electron = await import('electron'); - utilityProcess = electron.utilityProcess; -} +// 环境变量配置 +const ENV = { + isWorkerProcess: process.env['NAPCAT_WORKER_PROCESS'] === '1', + isMultiProcessDisabled: process.env['NAPCAT_DISABLE_MULTI_PROCESS'] === '1', + isPipeDisabled: process.env['NAPCAT_DISABLE_PIPE'] === '1', +} as const; +// 初始化日志 const pathWrapper = new NapCatPathWrapper(); const logger = new LogWrapper(pathWrapper.logsPath); -// 存储当前的 worker 进程引用 -let currentWorker: any = null; +// 进程管理器和当前 Worker 进程引用 +let processManager: IProcessManager | null = null; +let currentWorker: IWorkerProcess | null = null; +let isElectron = false; -// 重启 worker 进程的函数 -export async function restartWorker () { - logger.log('[NapCat] [UtilityProcess] 正在重启Worker进程...'); - - if (currentWorker) { - const workerPid = currentWorker.pid; - logger.log(`[NapCat] [UtilityProcess] 准备关闭Worker进程,PID: ${workerPid}`); - - // 发送关闭信号 - currentWorker.postMessage({ type: 'shutdown' }); - - // 等待进程退出,最多等待 3 秒 - await new Promise((resolve) => { - const timeout = setTimeout(() => { - logger.logWarn('[NapCat] [UtilityProcess] Worker进程未在 3 秒内退出,尝试强制终止'); - currentWorker.kill(); - resolve(); - }, 3000); - - currentWorker.once('exit', () => { - clearTimeout(timeout); - logger.log('[NapCat] [UtilityProcess] Worker进程已正常退出'); - resolve(); - }); - }); - - // 检查进程是否真的被杀掉了 - if (workerPid) { - logger.log(`[NapCat] [UtilityProcess] 检查进程 ${workerPid} 是否已终止...`); - try { - // 尝试发送信号 0 来检查进程是否存在 - process.kill(workerPid, 0); - // 如果没有抛出异常,说明进程还在运行 - logger.logWarn(`[NapCat] [UtilityProcess] 进程 ${workerPid} 仍在运行,强制杀掉`); - try { - // 使用 SIGKILL 强制终止进程 - process.kill(workerPid, 'SIGKILL'); - logger.log(`[NapCat] [UtilityProcess] 已强制终止进程 ${workerPid}`); - } catch (killError) { - logger.logError(`[NapCat] [UtilityProcess] 强制终止进程失败:`, killError); - } - } catch (e) { - // 抛出异常说明进程不存在,已经被成功杀掉 - logger.log(`[NapCat] [UtilityProcess] 进程 ${workerPid} 已确认终止`); - } - } - - // 进程结束后等待 3 秒再启动新进程 - logger.log('[NapCat] [UtilityProcess] Worker进程已关闭,等待 3 秒后启动新进程...'); - await new Promise(resolve => setTimeout(resolve, 3000)); - } - - // 启动新的 worker 进程 - await startWorker(); - logger.log('[NapCat] [UtilityProcess] Worker进程重启完成'); +/** + * 获取进程类型名称(用于日志) + */ +function getProcessTypeName (): string { + return isElectron ? 'UtilityProcess' : 'Fork'; } -async function startWorker () { - // 创建 utility 进程 - // 根据实际构建产物确定文件扩展名 - const workerScript = __filename.endsWith('.mjs') +/** + * 获取 Worker 脚本路径 + */ +function getWorkerScriptPath (): string { + return __filename.endsWith('.mjs') ? path.join(__dirname, 'napcat.mjs') : path.join(__dirname, 'napcat.js'); +} - const child = utilityProcess.fork(workerScript, [], { +/** + * 检查进程是否存在 + */ +function isProcessAlive (pid: number): boolean { + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } +} + +/** + * 强制终止进程 + */ +function forceKillProcess (pid: number): void { + try { + process.kill(pid, 'SIGKILL'); + logger.log(`[NapCat] [Process] 已强制终止进程 ${pid}`); + } catch (error) { + logger.logError(`[NapCat] [Process] 强制终止进程失败:`, error); + } +} + +/** + * 重启 Worker 进程 + */ +export async function restartWorker (): Promise { + logger.log('[NapCat] [Process] 正在重启Worker进程...'); + + if (!currentWorker) { + logger.logWarn('[NapCat] [Process] 没有运行中的Worker进程'); + await startWorker(); + return; + } + + const workerPid = currentWorker.pid; + logger.log(`[NapCat] [Process] 准备关闭Worker进程,PID: ${workerPid}`); + + // 1. 通知旧进程准备重启(旧进程会自行退出) + currentWorker.postMessage({ type: 'restart-prepare' }); + + // 2. 等待进程退出(最多 5 秒,给更多时间让进程自行清理) + await new Promise((resolve) => { + const timeout = setTimeout(() => { + logger.logWarn('[NapCat] [Process] Worker进程未在 5 秒内退出,尝试发送强制关闭信号'); + currentWorker?.postMessage({ type: 'shutdown' }); + + // 再等待 2 秒 + setTimeout(() => { + logger.logWarn('[NapCat] [Process] Worker进程仍未退出,尝试 kill'); + currentWorker?.kill(); + resolve(); + }, 2000); + }, 5000); + + currentWorker?.once('exit', () => { + clearTimeout(timeout); + logger.log('[NapCat] [Process] Worker进程已正常退出'); + resolve(); + }); + }); + + // 3. 二次确认进程是否真的被终止(兜底检查) + if (workerPid) { + logger.log(`[NapCat] [Process] 检查进程 ${workerPid} 是否已终止...`); + + if (isProcessAlive(workerPid)) { + logger.logWarn(`[NapCat] [Process] 进程 ${workerPid} 仍在运行,尝试强制杀掉(兜底)`); + forceKillProcess(workerPid); + + // 等待 1 秒后再次检查 + await new Promise(resolve => setTimeout(resolve, 1000)); + + if (isProcessAlive(workerPid)) { + logger.logError(`[NapCat] [Process] 进程 ${workerPid} 无法终止,可能需要手动处理`); + } else { + logger.log(`[NapCat] [Process] 进程 ${workerPid} 已被强制终止`); + } + } else { + logger.log(`[NapCat] [Process] 进程 ${workerPid} 已确认终止`); + } + } + + // 4. 等待 3 秒后启动新进程 + logger.log('[NapCat] [Process] Worker进程已关闭,等待 3 秒后启动新进程...'); + await new Promise(resolve => setTimeout(resolve, 3000)); + + // 5. 启动新进程 + await startWorker(); + logger.log('[NapCat] [Process] Worker进程重启完成'); +} + +/** + * 启动 Worker 进程 + */ +async function startWorker (): Promise { + if (!processManager) { + throw new Error('进程管理器未初始化'); + } + + const workerScript = getWorkerScriptPath(); + const processType = getProcessTypeName(); + + const child = processManager.createWorker(workerScript, [], { env: { ...process.env, NAPCAT_WORKER_PROCESS: '1', }, - stdio: 'pipe', + stdio: isElectron ? 'pipe' : ['inherit', 'pipe', 'pipe', 'ipc'], }); currentWorker = child; - logger.log('[NapCat] [UtilityProcess] 已创建Worker进程,PID:', child.pid); + logger.log(`[NapCat] [${processType}] 已创建Worker进程,PID: ${child.pid}`); - // 监听子进程标准输出 - 直接原始输出 + // 监听标准输出(直接转发) if (child.stdout) { child.stdout.on('data', (data: Buffer) => { process.stdout.write(data); }); } - // 监听子进程标准错误 - 直接原始输出 + // 监听标准错误(直接转发) if (child.stderr) { child.stderr.on('data', (data: Buffer) => { process.stderr.write(data); @@ -131,57 +173,61 @@ async function startWorker () { } // 监听子进程消息 - child.on('message', (msg: any) => { - logger.log('[NapCat] [UtilityProcess] 收到Worker消息:', msg); + child.on('message', (msg: unknown) => { + logger.log(`[NapCat] [${processType}] 收到Worker消息:`, msg); // 处理重启请求 - if (msg?.type === 'restart') { - logger.log('[NapCat] [UtilityProcess] 收到重启请求,正在重启Worker进程...'); + if (typeof msg === 'object' && msg !== null && 'type' in msg && msg.type === 'restart') { + logger.log(`[NapCat] [${processType}] 收到重启请求,正在重启Worker进程...`); restartWorker().catch(e => { - logger.logError('[NapCat] [UtilityProcess] 重启Worker进程失败:', e); + logger.logError(`[NapCat] [${processType}] 重启Worker进程失败:`, e); }); } }); // 监听子进程退出 - child.on('exit', (code: number) => { - if (code !== 0) { - logger.logError(`[NapCat] [UtilityProcess] Worker进程退出,退出码: ${code}`); + child.on('exit', (code: unknown) => { + const exitCode = typeof code === 'number' ? code : 0; + if (exitCode !== 0) { + logger.logError(`[NapCat] [${processType}] Worker进程退出,退出码: ${exitCode}`); } else { - logger.log('[NapCat] [UtilityProcess] Worker进程正常退出'); + logger.log(`[NapCat] [${processType}] Worker进程正常退出`); } - - // 可选:自动重启工作进程 - // logger.log('[NapCat] [UtilityProcess] 正在重启Worker进程...'); - // setTimeout(() => restartWorker(), 1000); }); // 监听子进程生成 child.on('spawn', () => { - logger.log('[NapCat] [UtilityProcess] Worker进程已生成'); + logger.log(`[NapCat] [${processType}] Worker进程已生成`); }); } -async function startMasterProcess () { - logger.log('[NapCat] [UtilityProcess] Master进程启动,PID:', process.pid); +/** + * 启动 Master 进程 + */ +async function startMasterProcess (): Promise { + const processType = getProcessTypeName(); + logger.log(`[NapCat] [${processType}] Master进程启动,PID: ${process.pid}`); - // 连接命名管道,用于输出子进程内容(可通过环境变量禁用) - if (process.env['NAPCAT_DISABLE_PIPE'] !== '1') { - await connectToNamedPipe(logger).catch(e => logger.logError('命名管道连接失败', e)); + // 连接命名管道(可通过环境变量禁用) + if (!ENV.isPipeDisabled) { + await connectToNamedPipe(logger).catch(e => + logger.logError('命名管道连接失败', e) + ); } else { - logger.log('[NapCat] [UtilityProcess] 命名管道已禁用 (NAPCAT_DISABLE_PIPE=1)'); + logger.log(`[NapCat] [${processType}] 命名管道已禁用 (NAPCAT_DISABLE_PIPE=1)`); } - // 启动 worker 进程 + // 启动 Worker 进程 await startWorker(); // 优雅关闭处理 const shutdown = (signal: string) => { - logger.log(`[NapCat] [UtilityProcess] 收到${signal}信号,正在关闭...`); + logger.log(`[NapCat] [Process] 收到${signal}信号,正在关闭...`); + if (currentWorker) { currentWorker.postMessage({ type: 'shutdown' }); setTimeout(() => { - currentWorker.kill(); + currentWorker?.kill(); process.exit(0); }, 1000); } else { @@ -193,57 +239,86 @@ async function startMasterProcess () { process.on('SIGTERM', () => shutdown('SIGTERM')); } -async function startWorkerProcess () { - logger.log('[NapCat] [UtilityProcess] Worker进程启动,PID:', process.pid); +/** + * 启动 Worker 进程(子进程入口) + */ +async function startWorkerProcess (): Promise { + if (!processManager) { + throw new Error('进程管理器未初始化'); + } + + const processType = getProcessTypeName(); + logger.log(`[NapCat] [${processType}] Worker进程启动,PID: ${process.pid}`); // 监听来自父进程的消息 - process.parentPort?.on('message', (e: { data: any; }) => { - const msg = e.data; - if (msg?.type === 'shutdown') { - logger.log('[NapCat] [UtilityProcess] 收到关闭信号,正在退出...'); - process.exit(0); + processManager.onParentMessage((msg: unknown) => { + if (typeof msg === 'object' && msg !== null && 'type' in msg) { + if (msg.type === 'restart-prepare') { + // 收到重启准备信号,主动退出 + logger.log(`[NapCat] [${processType}] 收到重启准备信号,正在主动退出...`); + // 给一点时间让日志输出 + setTimeout(() => { + process.exit(0); + }, 100); + } else if (msg.type === 'shutdown') { + // 收到强制关闭信号 + logger.log(`[NapCat] [${processType}] 收到关闭信号,正在退出...`); + process.exit(0); + } } }); - // 注册重启进程函数到 WebUI(在 Worker 进程中) - const { WebUiDataRuntime } = await import('@/napcat-webui-backend/src/helper/Data'); + // 注册重启进程函数到 WebUI WebUiDataRuntime.setRestartProcessCall(async () => { try { - // 向父进程发送重启请求 - if (process.parentPort) { - process.parentPort.postMessage({ type: 'restart' }); + const success = processManager!.sendToParent({ type: 'restart' }); + + if (success) { return { result: true, message: '进程重启请求已发送' }; } else { return { result: false, message: '无法与主进程通信' }; } } catch (e) { - logger.logError('[NapCat] [UtilityProcess] 发送重启请求失败:', e); - return { result: false, message: '发送重启请求失败: ' + (e as Error).message }; + logger.logError('[NapCat] [Process] 发送重启请求失败:', e); + return { + result: false, + message: '发送重启请求失败: ' + (e as Error).message + }; } }); - // 在子进程中启动NapCat核心 + // 启动 NapCat 核心 await NCoreInitShell(); } -// 主入口 -if (isMultiProcessDisabled) { - // 禁用多进程模式,直接启动 NCoreInitShell - logger.log('[NapCat] [SingleProcess] 多进程模式已禁用,直接启动核心'); - NCoreInitShell().catch((e: Error) => { - logger.logError('[NapCat] [SingleProcess] 启动失败:', e); - process.exit(1); - }); -} else if (isWorkerProcess) { - // Worker进程 - startWorkerProcess().catch((e: Error) => { - logger.logError('[NapCat] [UtilityProcess] Worker进程启动失败:', e); - process.exit(1); - }); -} else { - // Master进程 - startMasterProcess().catch((e: Error) => { - logger.logError('[NapCat] [UtilityProcess] Master进程启动失败:', e); - process.exit(1); - }); +/** + * 主入口 + */ +async function main (): Promise { + // 单进程模式:直接启动核心 + if (ENV.isMultiProcessDisabled) { + logger.log('[NapCat] [SingleProcess] 多进程模式已禁用,直接启动核心'); + await NCoreInitShell(); + return; + } + + // 多进程模式:初始化进程管理器 + const result = await createProcessManager(); + processManager = result.manager; + isElectron = result.isElectron; + + logger.log(`[NapCat] [Process] 检测到 ${isElectron ? 'Electron' : 'Node.js'} 环境`); + + // 根据进程类型启动 + if (ENV.isWorkerProcess) { + await startWorkerProcess(); + } else { + await startMasterProcess(); + } } + +// 启动应用 +main().catch((e: Error) => { + logger.logError('[NapCat] [Process] 启动失败:', e); + process.exit(1); +}); diff --git a/packages/napcat-shell/process-api.ts b/packages/napcat-shell/process-api.ts new file mode 100644 index 00000000..c4316b7c --- /dev/null +++ b/packages/napcat-shell/process-api.ts @@ -0,0 +1,178 @@ +import type { Readable } from 'stream'; +import type { fork as forkType } from 'child_process'; + +// 扩展 Process 类型以支持 parentPort +declare global { + namespace NodeJS { + interface Process { + parentPort?: { + on (event: 'message', listener: (e: { data: unknown; }) => void): void; + postMessage (message: unknown): void; + }; + } + } +} + +/** + * 统一的进程接口 + */ +export interface IWorkerProcess { + readonly pid: number | undefined; + readonly stdout: Readable | null; + readonly stderr: Readable | null; + + postMessage (message: unknown): void; + kill (): boolean; + on (event: string, listener: (...args: unknown[]) => void): void; + once (event: string, listener: (...args: unknown[]) => void): void; +} + +/** + * 进程创建选项 + */ +export interface ProcessOptions { + env: NodeJS.ProcessEnv; + stdio: 'pipe' | 'ignore' | 'inherit' | Array<'pipe' | 'ignore' | 'inherit' | 'ipc'>; +} + +/** + * 进程管理器接口 + */ +export interface IProcessManager { + createWorker (modulePath: string, args: string[], options: ProcessOptions): IWorkerProcess; + onParentMessage (handler: (message: unknown) => void): void; + sendToParent (message: unknown): boolean; +} + +/** + * Electron utilityProcess 包装器 + */ +class ElectronProcessManager implements IProcessManager { + private utilityProcess: { + fork (modulePath: string, args: string[], options: unknown): unknown; + }; + + constructor (utilityProcess: { fork (modulePath: string, args: string[], options: unknown): unknown; }) { + this.utilityProcess = utilityProcess; + } + + createWorker (modulePath: string, args: string[], options: ProcessOptions): IWorkerProcess { + const child: any = this.utilityProcess.fork(modulePath, args, options); + + return { + pid: child.pid as number | undefined, + stdout: child.stdout as Readable | null, + stderr: child.stderr as Readable | null, + + postMessage (message: unknown): void { + child.postMessage(message); + }, + + kill (): boolean { + return child.kill() as boolean; + }, + + on (event: string, listener: (...args: unknown[]) => void): void { + child.on(event, listener); + }, + + once (event: string, listener: (...args: unknown[]) => void): void { + child.once(event, listener); + }, + }; + } + + onParentMessage (handler: (message: unknown) => void): void { + if (process.parentPort) { + process.parentPort.on('message', (e: { data: unknown; }) => { + handler(e.data); + }); + } + } + + sendToParent (message: unknown): boolean { + if (process.parentPort) { + process.parentPort.postMessage(message); + return true; + } + return false; + } +} + +/** + * Node.js child_process 包装器 + */ +class NodeProcessManager implements IProcessManager { + private forkFn: typeof forkType; + + constructor (forkFn: typeof forkType) { + this.forkFn = forkFn; + } + + createWorker (modulePath: string, args: string[], options: ProcessOptions): IWorkerProcess { + const child = this.forkFn(modulePath, args, options as any); + + return { + pid: child.pid, + stdout: child.stdout, + stderr: child.stderr, + + postMessage (message: unknown): void { + if (child.send) { + child.send(message as any); + } + }, + + kill (): boolean { + return child.kill(); + }, + + on (event: string, listener: (...args: unknown[]) => void): void { + child.on(event, listener); + }, + + once (event: string, listener: (...args: unknown[]) => void): void { + child.once(event, listener); + }, + }; + } + + onParentMessage (handler: (message: unknown) => void): void { + process.on('message', (message: unknown) => { + handler(message); + }); + } + + sendToParent (message: unknown): boolean { + if (process.send) { + process.send(message as any); + return true; + } + return false; + } +} + +/** + * 检测运行环境并创建对应的进程管理器 + */ +export async function createProcessManager (): Promise<{ + manager: IProcessManager; + isElectron: boolean; +}> { + const isElectron = typeof process.versions['electron'] !== 'undefined'; + + if (isElectron) { + // @ts-ignore - electron 运行时存在但类型声明可能缺失 + const electron = await import('electron'); + return { + manager: new ElectronProcessManager(electron.utilityProcess), + isElectron: true, + }; + } else { + const { fork } = await import('child_process'); + return { + manager: new NodeProcessManager(fork), + isElectron: false, + }; + } +}