feat: unp4k update

This commit is contained in:
xkeyC
2025-12-10 21:04:47 +08:00
parent c172b623d7
commit 23e909e330
7 changed files with 114 additions and 111 deletions

View File

@@ -7,7 +7,9 @@ use anyhow::{bail, Context, Result};
use bytes::Bytes;
use flutter_rust_bridge::frb;
use librqbit::{
AddTorrent, AddTorrentOptions, AddTorrentResponse, ManagedTorrent, Session, SessionOptions, SessionPersistenceConfig, TorrentStats, TorrentStatsState, WebSeedConfig, api::TorrentIdOrHash, dht::PersistentDhtConfig, limits::LimitsConfig
api::TorrentIdOrHash, dht::PersistentDhtConfig, limits::LimitsConfig, AddTorrent,
AddTorrentOptions, AddTorrentResponse, ManagedTorrent, Session, SessionOptions,
SessionPersistenceConfig, TorrentStats, TorrentStatsState, WebSeedConfig,
};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
@@ -72,7 +74,7 @@ pub struct DownloadGlobalStat {
}
/// Initialize the download manager session with persistence enabled
///
///
/// Parameters:
/// - working_dir: The directory to store session data (persistence, DHT, etc.)
/// - default_download_dir: The default directory to store downloads
@@ -90,7 +92,7 @@ pub async fn downloader_init(
}
let _lock = SESSION_INIT_LOCK.lock().await;
// Double check after acquiring lock
if SESSION.read().is_some() {
return Ok(());
@@ -99,15 +101,15 @@ pub async fn downloader_init(
// Working directory for persistence and session data
let working_folder = PathBuf::from(&working_dir);
std::fs::create_dir_all(&working_folder)?;
// Default download folder
let output_folder = PathBuf::from(&default_download_dir);
std::fs::create_dir_all(&output_folder)?;
// Create persistence folder for session state in working directory
let persistence_folder = working_folder.join("rqbit-session");
std::fs::create_dir_all(&persistence_folder)?;
// DHT persistence file in working directory
let dht_persistence_file = working_folder.join("dht.json");
@@ -131,7 +133,7 @@ pub async fn downloader_init(
upload_bps: upload_limit_bps.and_then(NonZeroU32::new),
download_bps: download_limit_bps.and_then(NonZeroU32::new),
},
webseed_config: Some(WebSeedConfig{
webseed_config: Some(WebSeedConfig {
max_concurrent_per_source: 32,
max_total_concurrent: 64,
request_timeout_secs: 30,
@@ -162,21 +164,21 @@ pub fn downloader_is_initialized() -> bool {
/// Check if there are pending tasks to restore from session file (without starting the downloader)
/// This reads the session.json file directly to check if there are any torrents saved.
///
///
/// Parameters:
/// - working_dir: The directory where session data is stored (same as passed to downloader_init)
///
///
/// Returns: true if there are tasks to restore, false otherwise
#[frb(sync)]
pub fn downloader_has_pending_session_tasks(working_dir: String) -> bool {
let session_file = PathBuf::from(&working_dir)
.join("rqbit-session")
.join("session.json");
if !session_file.exists() {
return false;
}
// Try to read and parse the session file
match std::fs::read_to_string(&session_file) {
Ok(content) => {
@@ -200,7 +202,8 @@ pub fn downloader_has_pending_session_tasks(working_dir: String) -> bool {
/// Helper function to get session
fn get_session() -> Result<Arc<Session>> {
SESSION.read()
SESSION
.read()
.clone()
.context("Downloader not initialized. Call downloader_init first.")
}
@@ -321,7 +324,10 @@ pub async fn downloader_add_url(
.context("Failed to download torrent file")?;
if !response.status().is_success() {
bail!("Failed to download torrent file: HTTP {}", response.status());
bail!(
"Failed to download torrent file: HTTP {}",
response.status()
);
}
let bytes = response
@@ -345,7 +351,10 @@ pub async fn downloader_pause(task_id: usize) -> Result<()> {
};
if let Some(handle) = handle {
session.pause(&handle).await.context("Failed to pause torrent")?;
session
.pause(&handle)
.await
.context("Failed to pause torrent")?;
Ok(())
} else {
bail!("Task not found: {}", task_id)
@@ -362,7 +371,10 @@ pub async fn downloader_resume(task_id: usize) -> Result<()> {
};
if let Some(handle) = handle {
session.unpause(&handle).await.context("Failed to resume torrent")?;
session
.unpause(&handle)
.await
.context("Failed to resume torrent")?;
Ok(())
} else {
bail!("Task not found: {}", task_id)
@@ -427,7 +439,9 @@ pub async fn downloader_get_task_info(task_id: usize) -> Result<DownloadTaskInfo
let (download_speed, upload_speed, num_peers) = if let Some(live) = &stats.live {
let down = (live.download_speed.mbps * 1024.0 * 1024.0) as u64;
let up = (live.upload_speed.mbps * 1024.0 * 1024.0) as u64;
let peers = (live.snapshot.peer_stats.queued + live.snapshot.peer_stats.connecting + live.snapshot.peer_stats.live) as usize;
let peers = (live.snapshot.peer_stats.queued
+ live.snapshot.peer_stats.connecting
+ live.snapshot.peer_stats.live) as usize;
(down, up, peers)
} else {
(0, 0, 0)
@@ -455,7 +469,7 @@ fn get_task_status(stats: &TorrentStats) -> DownloadTaskStatus {
if stats.error.is_some() {
return DownloadTaskStatus::Error;
}
if stats.finished {
return DownloadTaskStatus::Finished;
}
@@ -508,7 +522,9 @@ pub async fn downloader_get_all_tasks() -> Result<Vec<DownloadTaskInfo>> {
let (download_speed, upload_speed, num_peers) = if let Some(live) = &stats.live {
let down = (live.download_speed.mbps * 1024.0 * 1024.0) as u64;
let up = (live.upload_speed.mbps * 1024.0 * 1024.0) as u64;
let peers = (live.snapshot.peer_stats.queued + live.snapshot.peer_stats.connecting + live.snapshot.peer_stats.live) as usize;
let peers = (live.snapshot.peer_stats.queued
+ live.snapshot.peer_stats.connecting
+ live.snapshot.peer_stats.live) as usize;
(down, up, peers)
} else {
(0, 0, 0)
@@ -536,7 +552,7 @@ pub async fn downloader_get_all_tasks() -> Result<Vec<DownloadTaskInfo>> {
// Merge cached completed tasks with IDs based on cache index (10000 + index)
let mut result = tasks.into_inner();
let completed_tasks_cache = COMPLETED_TASKS_CACHE.read();
for (cache_index, task) in completed_tasks_cache.iter().enumerate() {
let mut task_with_id = task.clone();
// Assign ID based on cache index: 10000, 10001, 10002, etc.
@@ -552,11 +568,11 @@ pub async fn downloader_get_global_stats() -> Result<DownloadGlobalStat> {
let tasks = downloader_get_all_tasks().await?;
let mut stat = DownloadGlobalStat::default();
for task in &tasks {
stat.download_speed += task.download_speed;
stat.upload_speed += task.upload_speed;
match task.status {
DownloadTaskStatus::Live => stat.num_active += 1,
DownloadTaskStatus::Paused | DownloadTaskStatus::Checking => stat.num_waiting += 1,
@@ -568,20 +584,23 @@ pub async fn downloader_get_global_stats() -> Result<DownloadGlobalStat> {
}
/// Check if a task with given name exists
///
///
/// Parameters:
/// - name: Task name to search for
/// - downloading_only: If true, only search in active/waiting tasks. If false, include completed tasks (default: true)
pub async fn downloader_is_name_in_task(name: String, downloading_only: Option<bool>) -> bool {
let downloading_only = downloading_only.unwrap_or(true);
if let Ok(tasks) = downloader_get_all_tasks().await {
for task in tasks {
// If downloading_only is true, skip finished and error tasks
if downloading_only && (task.status == DownloadTaskStatus::Finished || task.status == DownloadTaskStatus::Error) {
if downloading_only
&& (task.status == DownloadTaskStatus::Finished
|| task.status == DownloadTaskStatus::Error)
{
continue;
}
if task.name.contains(&name) {
return true;
}
@@ -595,11 +614,11 @@ pub async fn downloader_pause_all() -> Result<()> {
let session = get_session()?;
let handles: Vec<_> = TORRENT_HANDLES.read().values().cloned().collect();
for handle in handles {
let _ = session.pause(&handle).await;
}
Ok(())
}
@@ -608,11 +627,11 @@ pub async fn downloader_resume_all() -> Result<()> {
let session = get_session()?;
let handles: Vec<_> = TORRENT_HANDLES.read().values().cloned().collect();
for handle in handles {
let _ = session.unpause(&handle).await;
}
Ok(())
}
@@ -632,11 +651,11 @@ pub async fn downloader_shutdown() -> Result<()> {
let mut guard = SESSION.write();
guard.take()
};
if let Some(session) = session_opt {
session.stop().await;
}
TORRENT_HANDLES.write().clear();
// Clear completed tasks cache on shutdown
COMPLETED_TASKS_CACHE.write().clear();
@@ -657,7 +676,7 @@ pub fn downloader_clear_completed_tasks_cache() {
}
/// Update global speed limits
/// Note: rqbit Session doesn't support runtime limit changes,
/// Note: rqbit Session doesn't support runtime limit changes,
/// this function is a placeholder that returns an error.
/// Speed limits should be set during downloader_init.
pub async fn downloader_update_speed_limits(
@@ -676,7 +695,7 @@ pub async fn downloader_remove_completed_tasks() -> Result<u32> {
let tasks = downloader_get_all_tasks().await?;
let mut removed_count = 0u32;
for task in tasks {
if task.status == DownloadTaskStatus::Finished {
// Only process active tasks (id < 10000)
@@ -684,7 +703,11 @@ pub async fn downloader_remove_completed_tasks() -> Result<u32> {
let has_handle = TORRENT_HANDLES.read().contains_key(&task.id);
if has_handle {
// Use TorrentIdOrHash::Id for deletion
if session.delete(TorrentIdOrHash::Id(task.id), false).await.is_ok() {
if session
.delete(TorrentIdOrHash::Id(task.id), false)
.await
.is_ok()
{
// Cache the task - it will get ID based on cache length
COMPLETED_TASKS_CACHE.write().push(task.clone());
TORRENT_HANDLES.write().remove(&task.id);
@@ -694,7 +717,7 @@ pub async fn downloader_remove_completed_tasks() -> Result<u32> {
}
}
}
Ok(removed_count)
}
@@ -702,8 +725,9 @@ pub async fn downloader_remove_completed_tasks() -> Result<u32> {
pub async fn downloader_has_active_tasks() -> bool {
if let Ok(tasks) = downloader_get_all_tasks().await {
for task in tasks {
if task.status != DownloadTaskStatus::Finished
&& task.status != DownloadTaskStatus::Error {
if task.status != DownloadTaskStatus::Finished
&& task.status != DownloadTaskStatus::Error
{
return true;
}
}

View File

@@ -3,7 +3,7 @@ use flutter_rust_bridge::frb;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use unp4k::{P4kEntry, P4kFile};
use unp4k::{CryXmlReader, P4kEntry, P4kFile};
/// P4K 文件项信息
#[frb(dart_metadata=("freezed"))]
@@ -32,7 +32,11 @@ fn dos_datetime_to_millis(date: u16, time: u16) -> i64 {
let days_since_epoch = {
let mut days = 0i64;
for y in 1970..year {
days += if (y % 4 == 0 && y % 100 != 0) || (y % 400 == 0) { 366 } else { 365 };
days += if (y % 4 == 0 && y % 100 != 0) || (y % 400 == 0) {
366
} else {
365
};
}
let days_in_months = [0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334];
if month >= 1 && month <= 12 {
@@ -45,7 +49,8 @@ fn dos_datetime_to_millis(date: u16, time: u16) -> i64 {
days
};
(days_since_epoch * 86400 + (hour as i64) * 3600 + (minute as i64) * 60 + (second as i64)) * 1000
(days_since_epoch * 86400 + (hour as i64) * 3600 + (minute as i64) * 60 + (second as i64))
* 1000
}
// 全局 P4K 读取器实例(用于保持状态)
@@ -132,6 +137,32 @@ pub async fn p4k_get_all_files() -> Result<Vec<P4kFileItem>> {
pub async fn p4k_extract_to_memory(file_path: String) -> Result<Vec<u8>> {
// 确保文件列表已加载
tokio::task::spawn_blocking(|| ensure_files_loaded()).await??;
// 获取文件 entry 的克隆
let entry = p4k_get_entry(file_path).await?;
// 在后台线程执行阻塞的提取操作
let data = tokio::task::spawn_blocking(move || {
let mut reader = GLOBAL_P4K_READER.lock().unwrap();
if reader.is_none() {
return Err(anyhow!("P4K reader not initialized"));
}
let data = reader.as_mut().unwrap().extract_entry(&entry)?;
if (entry.name.ends_with(".xml") || entry.name.ends_with(".mtl"))
&& CryXmlReader::is_cryxml(&data)
{
let cry_xml_string = CryXmlReader::parse(&data)?;
return Ok(cry_xml_string.into_bytes());
}
Ok::<_, anyhow::Error>(data)
})
.await??;
Ok(data)
}
async fn p4k_get_entry(file_path: String) -> Result<P4kEntry> {
// 确保文件列表已加载
tokio::task::spawn_blocking(|| ensure_files_loaded()).await??;
// 规范化路径
let normalized_path = if file_path.starts_with("\\") {
@@ -149,34 +180,27 @@ pub async fn p4k_extract_to_memory(file_path: String) -> Result<Vec<u8>> {
.clone()
};
// 在后台线程执行阻塞的提取操作
let data = tokio::task::spawn_blocking(move || {
let mut reader = GLOBAL_P4K_READER.lock().unwrap();
if reader.is_none() {
return Err(anyhow!("P4K reader not initialized"));
}
let data = reader.as_mut().unwrap().extract_entry(&entry)?;
Ok::<_, anyhow::Error>(data)
})
.await??;
Ok(data)
Ok(entry)
}
/// 提取文件到磁盘
pub async fn p4k_extract_to_disk(file_path: String, output_path: String) -> Result<()> {
let data = p4k_extract_to_memory(file_path).await?;
let entry = p4k_get_entry(file_path).await?;
// 在后台线程执行阻塞的文件写入操作
tokio::task::spawn_blocking(move || {
let output = PathBuf::from(&output_path);
// 创建父目录
if let Some(parent) = output.parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::write(output, data)?;
let mut reader_guard = GLOBAL_P4K_READER.lock().unwrap();
let reader = reader_guard
.as_mut()
.ok_or_else(|| anyhow!("P4K reader not initialized"))?;
unp4k::p4k_utils::extract_single_file(reader, &entry, &output, true)?;
Ok::<_, anyhow::Error>(())
})
.await??;