Refactor process management with unified API

Introduces a new process-api.ts module to abstract process management for both Electron and Node.js environments. Refactors napcat.ts to use this unified API, improving clarity and maintainability of worker/master process logic, restart handling, and environment detection. Removes unused import from base.ts.
This commit is contained in:
手瓜一十雪 2026-01-17 15:02:54 +08:00
parent 1c7ac42a46
commit 3a880e389b
3 changed files with 402 additions and 150 deletions

View File

@ -29,7 +29,6 @@ import { napCatVersion } from 'napcat-common/src/version';
import { NodeIO3MiscListener } from 'napcat-core/listeners/NodeIO3MiscListener';
import { sleep } from 'napcat-common/src/helper';
import { FFmpegService } from '@/napcat-core/helper/ffmpeg/ffmpeg';
import { connectToNamedPipe } from './pipe';
import { NativePacketHandler } from 'napcat-core/packet/handler/client';
import { logSubscription, LogWrapper } from '@/napcat-core/helper/log';
import { proxiedListenerOf } from '@/napcat-core/helper/proxy-handler';

View File

@ -2,6 +2,8 @@ import { NCoreInitShell } from './base';
import { NapCatPathWrapper } from '@/napcat-common/src/path';
import { LogWrapper } from '@/napcat-core/helper/log';
import { connectToNamedPipe } from './pipe';
import { WebUiDataRuntime } from '@/napcat-webui-backend/src/helper/Data';
import { createProcessManager, type IProcessManager, type IWorkerProcess } from './process-api';
import path from 'path';
import { fileURLToPath } from 'url';
@ -9,121 +11,161 @@ import { fileURLToPath } from 'url';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
// 扩展 Process 类型以支持 parentPort
declare global {
namespace NodeJS {
interface Process {
parentPort?: {
on (event: 'message', listener: (e: { data: any; }) => void): void;
postMessage (message: any): void;
};
}
}
}
// 判断是否为子进程(通过环境变量)
const isWorkerProcess = process.env['NAPCAT_WORKER_PROCESS'] === '1';
// 判断是否禁用多进程模式
const isMultiProcessDisabled = process.env['NAPCAT_DISABLE_MULTI_PROCESS'] === '1';
// 只在主进程中导入 utilityProcess
let utilityProcess: any;
if (!isWorkerProcess && !isMultiProcessDisabled) {
// @ts-ignore - electron 运行时存在但类型声明可能缺失
const electron = await import('electron');
utilityProcess = electron.utilityProcess;
}
// 环境变量配置
const ENV = {
isWorkerProcess: process.env['NAPCAT_WORKER_PROCESS'] === '1',
isMultiProcessDisabled: process.env['NAPCAT_DISABLE_MULTI_PROCESS'] === '1',
isPipeDisabled: process.env['NAPCAT_DISABLE_PIPE'] === '1',
} as const;
// 初始化日志
const pathWrapper = new NapCatPathWrapper();
const logger = new LogWrapper(pathWrapper.logsPath);
// 存储当前的 worker 进程引用
let currentWorker: any = null;
// 进程管理器和当前 Worker 进程引用
let processManager: IProcessManager | null = null;
let currentWorker: IWorkerProcess | null = null;
let isElectron = false;
// 重启 worker 进程的函数
export async function restartWorker () {
logger.log('[NapCat] [UtilityProcess] 正在重启Worker进程...');
if (currentWorker) {
const workerPid = currentWorker.pid;
logger.log(`[NapCat] [UtilityProcess] 准备关闭Worker进程PID: ${workerPid}`);
// 发送关闭信号
currentWorker.postMessage({ type: 'shutdown' });
// 等待进程退出,最多等待 3 秒
await new Promise<void>((resolve) => {
const timeout = setTimeout(() => {
logger.logWarn('[NapCat] [UtilityProcess] Worker进程未在 3 秒内退出,尝试强制终止');
currentWorker.kill();
resolve();
}, 3000);
currentWorker.once('exit', () => {
clearTimeout(timeout);
logger.log('[NapCat] [UtilityProcess] Worker进程已正常退出');
resolve();
});
});
// 检查进程是否真的被杀掉了
if (workerPid) {
logger.log(`[NapCat] [UtilityProcess] 检查进程 ${workerPid} 是否已终止...`);
try {
// 尝试发送信号 0 来检查进程是否存在
process.kill(workerPid, 0);
// 如果没有抛出异常,说明进程还在运行
logger.logWarn(`[NapCat] [UtilityProcess] 进程 ${workerPid} 仍在运行,强制杀掉`);
try {
// 使用 SIGKILL 强制终止进程
process.kill(workerPid, 'SIGKILL');
logger.log(`[NapCat] [UtilityProcess] 已强制终止进程 ${workerPid}`);
} catch (killError) {
logger.logError(`[NapCat] [UtilityProcess] 强制终止进程失败:`, killError);
}
} catch (e) {
// 抛出异常说明进程不存在,已经被成功杀掉
logger.log(`[NapCat] [UtilityProcess] 进程 ${workerPid} 已确认终止`);
}
}
// 进程结束后等待 3 秒再启动新进程
logger.log('[NapCat] [UtilityProcess] Worker进程已关闭等待 3 秒后启动新进程...');
await new Promise(resolve => setTimeout(resolve, 3000));
}
// 启动新的 worker 进程
await startWorker();
logger.log('[NapCat] [UtilityProcess] Worker进程重启完成');
/**
*
*/
function getProcessTypeName (): string {
return isElectron ? 'UtilityProcess' : 'Fork';
}
async function startWorker () {
// 创建 utility 进程
// 根据实际构建产物确定文件扩展名
const workerScript = __filename.endsWith('.mjs')
/**
* Worker
*/
function getWorkerScriptPath (): string {
return __filename.endsWith('.mjs')
? path.join(__dirname, 'napcat.mjs')
: path.join(__dirname, 'napcat.js');
}
const child = utilityProcess.fork(workerScript, [], {
/**
*
*/
function isProcessAlive (pid: number): boolean {
try {
process.kill(pid, 0);
return true;
} catch {
return false;
}
}
/**
*
*/
function forceKillProcess (pid: number): void {
try {
process.kill(pid, 'SIGKILL');
logger.log(`[NapCat] [Process] 已强制终止进程 ${pid}`);
} catch (error) {
logger.logError(`[NapCat] [Process] 强制终止进程失败:`, error);
}
}
/**
* Worker
*/
export async function restartWorker (): Promise<void> {
logger.log('[NapCat] [Process] 正在重启Worker进程...');
if (!currentWorker) {
logger.logWarn('[NapCat] [Process] 没有运行中的Worker进程');
await startWorker();
return;
}
const workerPid = currentWorker.pid;
logger.log(`[NapCat] [Process] 准备关闭Worker进程PID: ${workerPid}`);
// 1. 通知旧进程准备重启(旧进程会自行退出)
currentWorker.postMessage({ type: 'restart-prepare' });
// 2. 等待进程退出(最多 5 秒,给更多时间让进程自行清理)
await new Promise<void>((resolve) => {
const timeout = setTimeout(() => {
logger.logWarn('[NapCat] [Process] Worker进程未在 5 秒内退出,尝试发送强制关闭信号');
currentWorker?.postMessage({ type: 'shutdown' });
// 再等待 2 秒
setTimeout(() => {
logger.logWarn('[NapCat] [Process] Worker进程仍未退出尝试 kill');
currentWorker?.kill();
resolve();
}, 2000);
}, 5000);
currentWorker?.once('exit', () => {
clearTimeout(timeout);
logger.log('[NapCat] [Process] Worker进程已正常退出');
resolve();
});
});
// 3. 二次确认进程是否真的被终止(兜底检查)
if (workerPid) {
logger.log(`[NapCat] [Process] 检查进程 ${workerPid} 是否已终止...`);
if (isProcessAlive(workerPid)) {
logger.logWarn(`[NapCat] [Process] 进程 ${workerPid} 仍在运行,尝试强制杀掉(兜底)`);
forceKillProcess(workerPid);
// 等待 1 秒后再次检查
await new Promise(resolve => setTimeout(resolve, 1000));
if (isProcessAlive(workerPid)) {
logger.logError(`[NapCat] [Process] 进程 ${workerPid} 无法终止,可能需要手动处理`);
} else {
logger.log(`[NapCat] [Process] 进程 ${workerPid} 已被强制终止`);
}
} else {
logger.log(`[NapCat] [Process] 进程 ${workerPid} 已确认终止`);
}
}
// 4. 等待 3 秒后启动新进程
logger.log('[NapCat] [Process] Worker进程已关闭等待 3 秒后启动新进程...');
await new Promise(resolve => setTimeout(resolve, 3000));
// 5. 启动新进程
await startWorker();
logger.log('[NapCat] [Process] Worker进程重启完成');
}
/**
* Worker
*/
async function startWorker (): Promise<void> {
if (!processManager) {
throw new Error('进程管理器未初始化');
}
const workerScript = getWorkerScriptPath();
const processType = getProcessTypeName();
const child = processManager.createWorker(workerScript, [], {
env: {
...process.env,
NAPCAT_WORKER_PROCESS: '1',
},
stdio: 'pipe',
stdio: isElectron ? 'pipe' : ['inherit', 'pipe', 'pipe', 'ipc'],
});
currentWorker = child;
logger.log('[NapCat] [UtilityProcess] 已创建Worker进程PID:', child.pid);
logger.log(`[NapCat] [${processType}] 已创建Worker进程PID: ${child.pid}`);
// 监听子进程标准输出 - 直接原始输出
// 监听标准输出(直接转发)
if (child.stdout) {
child.stdout.on('data', (data: Buffer) => {
process.stdout.write(data);
});
}
// 监听子进程标准错误 - 直接原始输出
// 监听标准错误(直接转发)
if (child.stderr) {
child.stderr.on('data', (data: Buffer) => {
process.stderr.write(data);
@ -131,57 +173,61 @@ async function startWorker () {
}
// 监听子进程消息
child.on('message', (msg: any) => {
logger.log('[NapCat] [UtilityProcess] 收到Worker消息:', msg);
child.on('message', (msg: unknown) => {
logger.log(`[NapCat] [${processType}] 收到Worker消息:`, msg);
// 处理重启请求
if (msg?.type === 'restart') {
logger.log('[NapCat] [UtilityProcess] 收到重启请求正在重启Worker进程...');
if (typeof msg === 'object' && msg !== null && 'type' in msg && msg.type === 'restart') {
logger.log(`[NapCat] [${processType}] 收到重启请求正在重启Worker进程...`);
restartWorker().catch(e => {
logger.logError('[NapCat] [UtilityProcess] 重启Worker进程失败:', e);
logger.logError(`[NapCat] [${processType}] 重启Worker进程失败:`, e);
});
}
});
// 监听子进程退出
child.on('exit', (code: number) => {
if (code !== 0) {
logger.logError(`[NapCat] [UtilityProcess] Worker进程退出退出码: ${code}`);
child.on('exit', (code: unknown) => {
const exitCode = typeof code === 'number' ? code : 0;
if (exitCode !== 0) {
logger.logError(`[NapCat] [${processType}] Worker进程退出退出码: ${exitCode}`);
} else {
logger.log('[NapCat] [UtilityProcess] Worker进程正常退出');
logger.log(`[NapCat] [${processType}] Worker进程正常退出`);
}
// 可选:自动重启工作进程
// logger.log('[NapCat] [UtilityProcess] 正在重启Worker进程...');
// setTimeout(() => restartWorker(), 1000);
});
// 监听子进程生成
child.on('spawn', () => {
logger.log('[NapCat] [UtilityProcess] Worker进程已生成');
logger.log(`[NapCat] [${processType}] Worker进程已生成`);
});
}
async function startMasterProcess () {
logger.log('[NapCat] [UtilityProcess] Master进程启动PID:', process.pid);
/**
* Master
*/
async function startMasterProcess (): Promise<void> {
const processType = getProcessTypeName();
logger.log(`[NapCat] [${processType}] Master进程启动PID: ${process.pid}`);
// 连接命名管道,用于输出子进程内容(可通过环境变量禁用)
if (process.env['NAPCAT_DISABLE_PIPE'] !== '1') {
await connectToNamedPipe(logger).catch(e => logger.logError('命名管道连接失败', e));
// 连接命名管道(可通过环境变量禁用)
if (!ENV.isPipeDisabled) {
await connectToNamedPipe(logger).catch(e =>
logger.logError('命名管道连接失败', e)
);
} else {
logger.log('[NapCat] [UtilityProcess] 命名管道已禁用 (NAPCAT_DISABLE_PIPE=1)');
logger.log(`[NapCat] [${processType}] 命名管道已禁用 (NAPCAT_DISABLE_PIPE=1)`);
}
// 启动 worker 进程
// 启动 Worker 进程
await startWorker();
// 优雅关闭处理
const shutdown = (signal: string) => {
logger.log(`[NapCat] [UtilityProcess] 收到${signal}信号,正在关闭...`);
logger.log(`[NapCat] [Process] 收到${signal}信号,正在关闭...`);
if (currentWorker) {
currentWorker.postMessage({ type: 'shutdown' });
setTimeout(() => {
currentWorker.kill();
currentWorker?.kill();
process.exit(0);
}, 1000);
} else {
@ -193,57 +239,86 @@ async function startMasterProcess () {
process.on('SIGTERM', () => shutdown('SIGTERM'));
}
async function startWorkerProcess () {
logger.log('[NapCat] [UtilityProcess] Worker进程启动PID:', process.pid);
/**
* Worker
*/
async function startWorkerProcess (): Promise<void> {
if (!processManager) {
throw new Error('进程管理器未初始化');
}
const processType = getProcessTypeName();
logger.log(`[NapCat] [${processType}] Worker进程启动PID: ${process.pid}`);
// 监听来自父进程的消息
process.parentPort?.on('message', (e: { data: any; }) => {
const msg = e.data;
if (msg?.type === 'shutdown') {
logger.log('[NapCat] [UtilityProcess] 收到关闭信号,正在退出...');
process.exit(0);
processManager.onParentMessage((msg: unknown) => {
if (typeof msg === 'object' && msg !== null && 'type' in msg) {
if (msg.type === 'restart-prepare') {
// 收到重启准备信号,主动退出
logger.log(`[NapCat] [${processType}] 收到重启准备信号,正在主动退出...`);
// 给一点时间让日志输出
setTimeout(() => {
process.exit(0);
}, 100);
} else if (msg.type === 'shutdown') {
// 收到强制关闭信号
logger.log(`[NapCat] [${processType}] 收到关闭信号,正在退出...`);
process.exit(0);
}
}
});
// 注册重启进程函数到 WebUI在 Worker 进程中)
const { WebUiDataRuntime } = await import('@/napcat-webui-backend/src/helper/Data');
// 注册重启进程函数到 WebUI
WebUiDataRuntime.setRestartProcessCall(async () => {
try {
// 向父进程发送重启请求
if (process.parentPort) {
process.parentPort.postMessage({ type: 'restart' });
const success = processManager!.sendToParent({ type: 'restart' });
if (success) {
return { result: true, message: '进程重启请求已发送' };
} else {
return { result: false, message: '无法与主进程通信' };
}
} catch (e) {
logger.logError('[NapCat] [UtilityProcess] 发送重启请求失败:', e);
return { result: false, message: '发送重启请求失败: ' + (e as Error).message };
logger.logError('[NapCat] [Process] 发送重启请求失败:', e);
return {
result: false,
message: '发送重启请求失败: ' + (e as Error).message
};
}
});
// 在子进程中启动NapCat核心
// 启动 NapCat 核心
await NCoreInitShell();
}
// 主入口
if (isMultiProcessDisabled) {
// 禁用多进程模式,直接启动 NCoreInitShell
logger.log('[NapCat] [SingleProcess] 多进程模式已禁用,直接启动核心');
NCoreInitShell().catch((e: Error) => {
logger.logError('[NapCat] [SingleProcess] 启动失败:', e);
process.exit(1);
});
} else if (isWorkerProcess) {
// Worker进程
startWorkerProcess().catch((e: Error) => {
logger.logError('[NapCat] [UtilityProcess] Worker进程启动失败:', e);
process.exit(1);
});
} else {
// Master进程
startMasterProcess().catch((e: Error) => {
logger.logError('[NapCat] [UtilityProcess] Master进程启动失败:', e);
process.exit(1);
});
/**
*
*/
async function main (): Promise<void> {
// 单进程模式:直接启动核心
if (ENV.isMultiProcessDisabled) {
logger.log('[NapCat] [SingleProcess] 多进程模式已禁用,直接启动核心');
await NCoreInitShell();
return;
}
// 多进程模式:初始化进程管理器
const result = await createProcessManager();
processManager = result.manager;
isElectron = result.isElectron;
logger.log(`[NapCat] [Process] 检测到 ${isElectron ? 'Electron' : 'Node.js'} 环境`);
// 根据进程类型启动
if (ENV.isWorkerProcess) {
await startWorkerProcess();
} else {
await startMasterProcess();
}
}
// 启动应用
main().catch((e: Error) => {
logger.logError('[NapCat] [Process] 启动失败:', e);
process.exit(1);
});

View File

@ -0,0 +1,178 @@
import type { Readable } from 'stream';
import type { fork as forkType } from 'child_process';
// 扩展 Process 类型以支持 parentPort
declare global {
namespace NodeJS {
interface Process {
parentPort?: {
on (event: 'message', listener: (e: { data: unknown; }) => void): void;
postMessage (message: unknown): void;
};
}
}
}
/**
*
*/
export interface IWorkerProcess {
readonly pid: number | undefined;
readonly stdout: Readable | null;
readonly stderr: Readable | null;
postMessage (message: unknown): void;
kill (): boolean;
on (event: string, listener: (...args: unknown[]) => void): void;
once (event: string, listener: (...args: unknown[]) => void): void;
}
/**
*
*/
export interface ProcessOptions {
env: NodeJS.ProcessEnv;
stdio: 'pipe' | 'ignore' | 'inherit' | Array<'pipe' | 'ignore' | 'inherit' | 'ipc'>;
}
/**
*
*/
export interface IProcessManager {
createWorker (modulePath: string, args: string[], options: ProcessOptions): IWorkerProcess;
onParentMessage (handler: (message: unknown) => void): void;
sendToParent (message: unknown): boolean;
}
/**
* Electron utilityProcess
*/
class ElectronProcessManager implements IProcessManager {
private utilityProcess: {
fork (modulePath: string, args: string[], options: unknown): unknown;
};
constructor (utilityProcess: { fork (modulePath: string, args: string[], options: unknown): unknown; }) {
this.utilityProcess = utilityProcess;
}
createWorker (modulePath: string, args: string[], options: ProcessOptions): IWorkerProcess {
const child: any = this.utilityProcess.fork(modulePath, args, options);
return {
pid: child.pid as number | undefined,
stdout: child.stdout as Readable | null,
stderr: child.stderr as Readable | null,
postMessage (message: unknown): void {
child.postMessage(message);
},
kill (): boolean {
return child.kill() as boolean;
},
on (event: string, listener: (...args: unknown[]) => void): void {
child.on(event, listener);
},
once (event: string, listener: (...args: unknown[]) => void): void {
child.once(event, listener);
},
};
}
onParentMessage (handler: (message: unknown) => void): void {
if (process.parentPort) {
process.parentPort.on('message', (e: { data: unknown; }) => {
handler(e.data);
});
}
}
sendToParent (message: unknown): boolean {
if (process.parentPort) {
process.parentPort.postMessage(message);
return true;
}
return false;
}
}
/**
* Node.js child_process
*/
class NodeProcessManager implements IProcessManager {
private forkFn: typeof forkType;
constructor (forkFn: typeof forkType) {
this.forkFn = forkFn;
}
createWorker (modulePath: string, args: string[], options: ProcessOptions): IWorkerProcess {
const child = this.forkFn(modulePath, args, options as any);
return {
pid: child.pid,
stdout: child.stdout,
stderr: child.stderr,
postMessage (message: unknown): void {
if (child.send) {
child.send(message as any);
}
},
kill (): boolean {
return child.kill();
},
on (event: string, listener: (...args: unknown[]) => void): void {
child.on(event, listener);
},
once (event: string, listener: (...args: unknown[]) => void): void {
child.once(event, listener);
},
};
}
onParentMessage (handler: (message: unknown) => void): void {
process.on('message', (message: unknown) => {
handler(message);
});
}
sendToParent (message: unknown): boolean {
if (process.send) {
process.send(message as any);
return true;
}
return false;
}
}
/**
*
*/
export async function createProcessManager (): Promise<{
manager: IProcessManager;
isElectron: boolean;
}> {
const isElectron = typeof process.versions['electron'] !== 'undefined';
if (isElectron) {
// @ts-ignore - electron 运行时存在但类型声明可能缺失
const electron = await import('electron');
return {
manager: new ElectronProcessManager(electron.utilityProcess),
isElectron: true,
};
} else {
const { fork } = await import('child_process');
return {
manager: new NodeProcessManager(fork),
isElectron: false,
};
}
}