mirror of
https://github.com/NapNeko/NapCatQQ.git
synced 2025-12-24 09:00:06 +08:00
* feat: pnpm new * Refactor build and release workflows, update dependencies Switch build scripts and workflows from npm to pnpm, update build and artifact paths, and simplify release workflow by removing version detection and changelog steps. Add new dependencies (silk-wasm, express, ws, node-pty-prebuilt-multiarch), update exports in package.json files, and add vite config for napcat-framework. Also, rename manifest.json for framework package and fix static asset copying in shell build config.
239 lines
8.0 KiB
Python
239 lines
8.0 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
NapCat OneBot WebSocket 文件流上传测试脚本
|
||
用于测试 UploadFileStream 接口的一次性分片上传功能
|
||
"""
|
||
|
||
import asyncio
|
||
import json
|
||
import base64
|
||
import hashlib
|
||
import os
|
||
import uuid
|
||
from typing import List, Optional
|
||
import websockets
|
||
import argparse
|
||
from pathlib import Path
|
||
|
||
class OneBotUploadTester:
    """Test client that uploads a file to a OneBot WebSocket endpoint in chunks.

    The file is split into chunks, every chunk is sent as an
    ``upload_file_stream`` action, and a final completion request asks the
    server to assemble the uploaded file.
    """

    def __init__(self, ws_url: str = "ws://localhost:3001", access_token: Optional[str] = None):
        """Store the connection settings; no connection is opened here.

        Args:
            ws_url: WebSocket URL of the OneBot endpoint.
            access_token: Optional token, sent as a ``Bearer`` Authorization
                header when connecting.
        """
        self.ws_url = ws_url
        self.access_token = access_token
        # Assigned by connect(); None while disconnected.
        self.websocket = None

    async def connect(self):
        """Open the WebSocket connection to the OneBot endpoint."""
        headers = {}
        if self.access_token:
            headers["Authorization"] = f"Bearer {self.access_token}"

        print(f"连接到 {self.ws_url}")
        self.websocket = await websockets.connect(self.ws_url, additional_headers=headers)
        print("WebSocket 连接成功")

    async def disconnect(self):
        """Close the WebSocket connection if one is open."""
        if self.websocket is None:
            return
        try:
            await self.websocket.close()
        finally:
            # Drop the reference so a closed socket is never reused and a
            # repeated disconnect() is a no-op.
            self.websocket = None
        print("WebSocket 连接已断开")

    def calculate_file_chunks(self, file_path: str, chunk_size: int = 64) -> tuple[List[bytes], str, int]:
        """Read a file into fixed-size chunks and compute its SHA-256 digest.

        Args:
            file_path: Path of the file to read.
            chunk_size: Chunk size in bytes (passed directly to ``read()``).
                NOTE(review): the default is 64 *bytes*; earlier documentation
                described it as 64KB. Pass ``64 * 1024`` explicitly for
                64 KiB chunks.

        Returns:
            A ``(chunks, sha256_hash, total_size)`` tuple: the list of chunk
            byte strings in file order, the hex SHA-256 digest of the whole
            file, and the total number of bytes read.

        Raises:
            ValueError: If ``chunk_size`` is not positive. A size of 0 would
                silently yield no chunks, and a negative size would read the
                entire file as a single chunk.
        """
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be a positive number of bytes, got {chunk_size}")

        chunks = []
        hasher = hashlib.sha256()
        total_size = 0

        with open(file_path, 'rb') as f:
            while chunk := f.read(chunk_size):
                chunks.append(chunk)
                hasher.update(chunk)
                total_size += len(chunk)

        sha256_hash = hasher.hexdigest()
        print("文件分析完成:")
        print(f" - 文件大小: {total_size} 字节")
        print(f" - 分片数量: {len(chunks)}")
        print(f" - SHA256: {sha256_hash}")

        return chunks, sha256_hash, total_size

    async def send_action(self, action: str, params: dict, echo: Optional[str] = None) -> dict:
        """Send a OneBot action request and wait for the matching response.

        Args:
            action: Name of the action to invoke.
            params: Parameters sent with the action.
            echo: Correlation id for the request; a random UUID is generated
                when it is omitted or empty.

        Returns:
            The decoded response whose ``echo`` field equals the request's.

        Raises:
            RuntimeError: If called before ``connect()``.
        """
        if self.websocket is None:
            raise RuntimeError("WebSocket is not connected; call connect() first")

        if not echo:
            echo = str(uuid.uuid4())

        message = {
            "action": action,
            "params": params,
            "echo": echo,
        }

        print(f"发送请求: {action}")
        await self.websocket.send(json.dumps(message))

        # Wait for the response. The socket may also deliver unrelated
        # messages; those are logged and skipped until our echo shows up.
        while True:
            response = await self.websocket.recv()
            data = json.loads(response)

            if data.get("echo") == echo:
                return data
            print(f"收到其他消息: {data}")

    async def upload_file_stream_batch(self, file_path: str, chunk_size: int = 64 ) -> str:
        """Upload a file by sending all of its chunks, then requesting a merge.

        Args:
            file_path: Path of the file to upload.
            chunk_size: Chunk size in bytes (default 64 bytes).

        Returns:
            The ``file_path`` value reported by the server for the uploaded
            file.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
            Exception: If a chunk request or the completion request does not
                return status ``ok``, or the final result is not
                ``file_complete``.
        """
        source = Path(file_path)
        if not source.exists():
            raise FileNotFoundError(f"文件不存在: {source}")

        # The whole file is read into memory before anything is sent.
        chunks, sha256_hash, total_size = self.calculate_file_chunks(str(source), chunk_size)
        stream_id = str(uuid.uuid4())

        print(f"\n开始上传文件: {source.name}")
        print(f"流ID: {stream_id}")

        # NOTE(review): for an empty file no chunk request is sent before the
        # completion request below; confirm the server accepts that.
        total_chunks = len(chunks)

        for chunk_index, chunk_data in enumerate(chunks):
            # JSON cannot carry raw bytes, so each chunk travels as base64 text.
            chunk_base64 = base64.b64encode(chunk_data).decode('utf-8')

            params = {
                "stream_id": stream_id,
                "chunk_data": chunk_base64,
                "chunk_index": chunk_index,
                "total_chunks": total_chunks,
                "file_size": total_size,
                "expected_sha256": sha256_hash,
                "filename": source.name,
                "file_retention": 30 * 1000,
            }

            response = await self.send_action("upload_file_stream", params)

            if response.get("status") != "ok":
                raise Exception(f"上传分片 {chunk_index} 失败: {response}")

            # Progress counters reported by the server; a null payload is
            # treated like a missing one.
            stream_data = response.get("data") or {}
            print(f"分片 {chunk_index + 1}/{total_chunks} 上传成功 "
                  f"(接收: {stream_data.get('received_chunks', 0)}/{stream_data.get('total_chunks', 0)})")

        # Tell the server that every chunk has been sent.
        print("\n所有分片发送完成,请求文件合并...")
        complete_params = {
            "stream_id": stream_id,
            "is_complete": True,
        }

        response = await self.send_action("upload_file_stream", complete_params)

        if response.get("status") != "ok":
            raise Exception(f"文件合并失败: {response}")

        result = response.get("data") or {}

        if result.get("status") != "file_complete":
            raise Exception(f"文件状态异常: {result}")

        print("✅ 文件上传成功!")
        print(f" - 文件路径: {result.get('file_path')}")
        print(f" - 文件大小: {result.get('file_size')} 字节")
        print(f" - SHA256: {result.get('sha256')}")
        return result.get('file_path')

    async def test_upload(self, file_path: str, chunk_size: int = 64 ):
        """Connect, upload one file, and always disconnect afterwards.

        Args:
            file_path: Path of the file to upload.
            chunk_size: Chunk size in bytes (default 64 bytes).

        Raises:
            Exception: Any failure is printed and then re-raised.
        """
        try:
            await self.connect()

            uploaded_path = await self.upload_file_stream_batch(file_path, chunk_size)

            print(f"\n🎉 测试完成! 上传后的文件路径: {uploaded_path}")

        except Exception as e:
            print(f"❌ 测试失败: {e}")
            raise
        finally:
            await self.disconnect()
|
||
|
||
def create_test_file(file_path: str, size_mb: float = 1):
    """Create a test file of the requested size filled with a repeating marker.

    Args:
        file_path: Where to write the file; an existing file is overwritten.
        size_mb: Target size in units of 1024 * 1024 bytes; the byte count is
            truncated to an integer.
    """
    pattern = b"NapCat Upload Test Data - " * 100
    remaining = int(size_mb * 1024 * 1024)

    with open(file_path, 'wb') as out:
        # Count down instead of up: each pass writes one full pattern, or
        # whatever prefix of it is still needed to hit the target size.
        while remaining > 0:
            piece = pattern[:remaining]
            out.write(piece)
            remaining -= len(piece)

    print(f"创建测试文件: {file_path} ({size_mb}MB)")
|
||
|
||
async def main():
    """Parse the command-line options and run a single upload test.

    When ``--create-test`` is given, a test file named
    ``test_upload_file.bin`` is generated first and is uploaded unless
    ``--file`` names another file. Without any file to upload, a usage hint
    is printed and nothing else happens.
    """
    cli = argparse.ArgumentParser(description="NapCat OneBot 文件流上传测试")
    cli.add_argument("--url", default="ws://localhost:3001", help="WebSocket URL")
    cli.add_argument("--token", help="访问令牌")
    cli.add_argument("--file", help="要上传的文件路径")
    cli.add_argument("--chunk-size", type=int, default=64 * 1024, help="分片大小(字节)")
    cli.add_argument("--create-test", type=float, help="创建测试文件(MB)")
    options = cli.parse_args()

    upload_target = options.file

    # Optionally generate a test file; an explicit --file still takes priority.
    if options.create_test:
        generated = "test_upload_file.bin"
        create_test_file(generated, options.create_test)
        upload_target = upload_target or generated

    if not upload_target:
        print("请指定要上传的文件路径,或使用 --create-test 创建测试文件")
        return

    # Build the tester and run the upload.
    uploader = OneBotUploadTester(options.url, options.token)
    await uploader.test_upload(upload_target, options.chunk_size)
|
||
|
||
if __name__ == "__main__":
    # Show an install hint when the websockets dependency is missing.
    # NOTE(review): websockets is also imported at the top of this file, so a
    # missing package raises ImportError there before this guard is reached;
    # the hint only appears if that top-level import is removed or made lazy.
    try:
        import websockets
    except ImportError:
        print("请先安装依赖: pip install websockets")
        # exit() is a helper injected by the site module and may be missing
        # (e.g. under ``python -S``); raising SystemExit always works.
        raise SystemExit(1)

    asyncio.run(main())