feat: AdaptiveConcurrencyController

This commit is contained in:
xkeyC
2025-12-05 22:06:55 +08:00
parent 3c60b5a2c1
commit 8898569067
13 changed files with 387 additions and 62 deletions

22
rust/Cargo.lock generated
View File

@@ -2994,7 +2994,7 @@ dependencies = [
[[package]]
name = "librqbit"
version = "9.0.0-beta.1"
source = "git+https://github.com/StarCitizenToolBox/rqbit?tag=webseed-v0.0.2#7a9b4d7db84b7b9cccc424e294610cc800a9baa4"
source = "git+https://github.com/StarCitizenToolBox/rqbit?rev=f8c0b0927904e1d8b0e28e708bd69fd8069d413a#f8c0b0927904e1d8b0e28e708bd69fd8069d413a"
dependencies = [
"anyhow",
"arc-swap",
@@ -3058,7 +3058,7 @@ dependencies = [
[[package]]
name = "librqbit-bencode"
version = "3.1.0"
source = "git+https://github.com/StarCitizenToolBox/rqbit?tag=webseed-v0.0.2#7a9b4d7db84b7b9cccc424e294610cc800a9baa4"
source = "git+https://github.com/StarCitizenToolBox/rqbit?rev=f8c0b0927904e1d8b0e28e708bd69fd8069d413a#f8c0b0927904e1d8b0e28e708bd69fd8069d413a"
dependencies = [
"anyhow",
"arrayvec",
@@ -3074,7 +3074,7 @@ dependencies = [
[[package]]
name = "librqbit-buffers"
version = "4.2.0"
source = "git+https://github.com/StarCitizenToolBox/rqbit?tag=webseed-v0.0.2#7a9b4d7db84b7b9cccc424e294610cc800a9baa4"
source = "git+https://github.com/StarCitizenToolBox/rqbit?rev=f8c0b0927904e1d8b0e28e708bd69fd8069d413a#f8c0b0927904e1d8b0e28e708bd69fd8069d413a"
dependencies = [
"bytes",
"librqbit-clone-to-owned",
@@ -3085,7 +3085,7 @@ dependencies = [
[[package]]
name = "librqbit-clone-to-owned"
version = "3.0.1"
source = "git+https://github.com/StarCitizenToolBox/rqbit?tag=webseed-v0.0.2#7a9b4d7db84b7b9cccc424e294610cc800a9baa4"
source = "git+https://github.com/StarCitizenToolBox/rqbit?rev=f8c0b0927904e1d8b0e28e708bd69fd8069d413a#f8c0b0927904e1d8b0e28e708bd69fd8069d413a"
dependencies = [
"bytes",
]
@@ -3093,7 +3093,7 @@ dependencies = [
[[package]]
name = "librqbit-core"
version = "5.0.0"
source = "git+https://github.com/StarCitizenToolBox/rqbit?tag=webseed-v0.0.2#7a9b4d7db84b7b9cccc424e294610cc800a9baa4"
source = "git+https://github.com/StarCitizenToolBox/rqbit?rev=f8c0b0927904e1d8b0e28e708bd69fd8069d413a#f8c0b0927904e1d8b0e28e708bd69fd8069d413a"
dependencies = [
"anyhow",
"bytes",
@@ -3122,7 +3122,7 @@ dependencies = [
[[package]]
name = "librqbit-dht"
version = "5.3.0"
source = "git+https://github.com/StarCitizenToolBox/rqbit?tag=webseed-v0.0.2#7a9b4d7db84b7b9cccc424e294610cc800a9baa4"
source = "git+https://github.com/StarCitizenToolBox/rqbit?rev=f8c0b0927904e1d8b0e28e708bd69fd8069d413a#f8c0b0927904e1d8b0e28e708bd69fd8069d413a"
dependencies = [
"anyhow",
"backon",
@@ -3169,7 +3169,7 @@ dependencies = [
[[package]]
name = "librqbit-lsd"
version = "0.1.0"
source = "git+https://github.com/StarCitizenToolBox/rqbit?tag=webseed-v0.0.2#7a9b4d7db84b7b9cccc424e294610cc800a9baa4"
source = "git+https://github.com/StarCitizenToolBox/rqbit?rev=f8c0b0927904e1d8b0e28e708bd69fd8069d413a#f8c0b0927904e1d8b0e28e708bd69fd8069d413a"
dependencies = [
"anyhow",
"atoi",
@@ -3189,7 +3189,7 @@ dependencies = [
[[package]]
name = "librqbit-peer-protocol"
version = "4.3.0"
source = "git+https://github.com/StarCitizenToolBox/rqbit?tag=webseed-v0.0.2#7a9b4d7db84b7b9cccc424e294610cc800a9baa4"
source = "git+https://github.com/StarCitizenToolBox/rqbit?rev=f8c0b0927904e1d8b0e28e708bd69fd8069d413a#f8c0b0927904e1d8b0e28e708bd69fd8069d413a"
dependencies = [
"anyhow",
"bitvec",
@@ -3209,7 +3209,7 @@ dependencies = [
[[package]]
name = "librqbit-sha1-wrapper"
version = "4.1.0"
source = "git+https://github.com/StarCitizenToolBox/rqbit?tag=webseed-v0.0.2#7a9b4d7db84b7b9cccc424e294610cc800a9baa4"
source = "git+https://github.com/StarCitizenToolBox/rqbit?rev=f8c0b0927904e1d8b0e28e708bd69fd8069d413a#f8c0b0927904e1d8b0e28e708bd69fd8069d413a"
dependencies = [
"assert_cfg",
"aws-lc-rs",
@@ -3218,7 +3218,7 @@ dependencies = [
[[package]]
name = "librqbit-tracker-comms"
version = "3.0.0"
source = "git+https://github.com/StarCitizenToolBox/rqbit?tag=webseed-v0.0.2#7a9b4d7db84b7b9cccc424e294610cc800a9baa4"
source = "git+https://github.com/StarCitizenToolBox/rqbit?rev=f8c0b0927904e1d8b0e28e708bd69fd8069d413a#f8c0b0927904e1d8b0e28e708bd69fd8069d413a"
dependencies = [
"anyhow",
"async-stream",
@@ -3246,7 +3246,7 @@ dependencies = [
[[package]]
name = "librqbit-upnp"
version = "1.0.0"
source = "git+https://github.com/StarCitizenToolBox/rqbit?tag=webseed-v0.0.2#7a9b4d7db84b7b9cccc424e294610cc800a9baa4"
source = "git+https://github.com/StarCitizenToolBox/rqbit?rev=f8c0b0927904e1d8b0e28e708bd69fd8069d413a#f8c0b0927904e1d8b0e28e708bd69fd8069d413a"
dependencies = [
"anyhow",
"bstr",

View File

@@ -35,7 +35,7 @@ unp4k_rs = { git = "https://github.com/StarCitizenToolBox/unp4k_rs", tag = "V0.0
uuid = { version = "1.19.0", features = ["v4"] }
parking_lot = "0.12.5"
crossbeam-channel = "0.5.15"
librqbit = { git = "https://github.com/StarCitizenToolBox/rqbit", tag = "webseed-v0.0.2" }
librqbit = { git = "https://github.com/StarCitizenToolBox/rqbit", rev = "f8c0b0927904e1d8b0e28e708bd69fd8069d413a" }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
bytes = "1.10"

View File

@@ -26,9 +26,9 @@ static SESSION_INIT_LOCK: once_cell::sync::Lazy<Mutex<()>> =
static TORRENT_HANDLES: once_cell::sync::Lazy<RwLock<HashMap<usize, ManagedTorrentHandle>>> =
once_cell::sync::Lazy::new(|| RwLock::new(HashMap::new()));
// Store output folders for each task
static TASK_OUTPUT_FOLDERS: once_cell::sync::Lazy<RwLock<HashMap<usize, String>>> =
once_cell::sync::Lazy::new(|| RwLock::new(HashMap::new()));
// Store completed tasks info (in-memory cache, cleared on restart)
static COMPLETED_TASKS_CACHE: once_cell::sync::Lazy<RwLock<Vec<DownloadTaskInfo>>> =
once_cell::sync::Lazy::new(|| RwLock::new(Vec::new()));
/// Download task status
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
@@ -132,12 +132,15 @@ pub async fn downloader_init(
},
webseed_config: Some(WebSeedConfig{
max_concurrent_per_source: 32,
max_total_concurrent: 128,
max_total_concurrent: 64,
request_timeout_secs: 30,
prefer_for_large_gaps: true,
min_gap_for_webseed: 10,
max_errors_before_disable: 10,
disable_cooldown_secs: 600,
adaptive_increase_threshold: 5,
adaptive_decrease_threshold: 10,
..Default::default()
}),
..Default::default()
},
@@ -156,6 +159,44 @@ pub fn downloader_is_initialized() -> bool {
SESSION.read().is_some()
}
/// Check if there are pending tasks to restore from session file (without starting the downloader)
/// This reads the session.json file directly to check if there are any torrents saved.
///
/// Parameters:
/// - working_dir: The directory where session data is stored (same as passed to downloader_init)
///
/// Returns: true if there are tasks to restore, false otherwise
#[frb(sync)]
pub fn downloader_has_pending_session_tasks(working_dir: String) -> bool {
let session_file = PathBuf::from(&working_dir)
.join("rqbit-session")
.join("session.json");
if !session_file.exists() {
return false;
}
// Try to read and parse the session file
match std::fs::read_to_string(&session_file) {
Ok(content) => {
// Parse as JSON to check if there are any torrents
// The structure is: { "torrents": { "0": {...}, "1": {...} } }
match serde_json::from_str::<serde_json::Value>(&content) {
Ok(json) => {
if let Some(torrents) = json.get("torrents") {
if let Some(obj) = torrents.as_object() {
return !obj.is_empty();
}
}
false
}
Err(_) => false,
}
}
Err(_) => false,
}
}
/// Helper function to get session
fn get_session() -> Result<Arc<Session>> {
SESSION.read()
@@ -195,17 +236,10 @@ pub async fn downloader_add_torrent(
match response {
AddTorrentResponse::Added(id, handle) => {
// Store output folder
if let Some(folder) = output_folder.clone() {
TASK_OUTPUT_FOLDERS.write().insert(id, folder);
}
TORRENT_HANDLES.write().insert(id, handle);
Ok(id)
}
AddTorrentResponse::AlreadyManaged(id, handle) => {
if let Some(folder) = output_folder.clone() {
TASK_OUTPUT_FOLDERS.write().insert(id, folder);
}
TORRENT_HANDLES.write().insert(id, handle);
Ok(id)
}
@@ -251,16 +285,10 @@ pub async fn downloader_add_magnet(
match response {
AddTorrentResponse::Added(id, handle) => {
if let Some(folder) = output_folder.clone() {
TASK_OUTPUT_FOLDERS.write().insert(id, folder);
}
TORRENT_HANDLES.write().insert(id, handle);
Ok(id)
}
AddTorrentResponse::AlreadyManaged(id, handle) => {
if let Some(folder) = output_folder.clone() {
TASK_OUTPUT_FOLDERS.write().insert(id, folder);
}
TORRENT_HANDLES.write().insert(id, handle);
Ok(id)
}
@@ -364,11 +392,13 @@ pub async fn downloader_get_task_info(task_id: usize) -> Result<DownloadTaskInfo
if let Some(handle) = handle {
let stats = handle.stats();
let name = handle.name().unwrap_or_else(|| format!("Task {}", task_id));
let output_folder = TASK_OUTPUT_FOLDERS
.read()
.get(&task_id)
.cloned()
.unwrap_or_default();
// Get output_folder from handle's shared options
let output_folder = handle
.shared()
.options
.output_folder
.to_string_lossy()
.into_owned();
let status = get_task_status(&stats);
let progress = if stats.total_bytes > 0 {
@@ -440,11 +470,13 @@ pub async fn downloader_get_all_tasks() -> Result<Vec<DownloadTaskInfo>> {
for (id, handle) in torrents {
let stats = handle.stats();
let name = handle.name().unwrap_or_else(|| format!("Task {}", id));
let output_folder = TASK_OUTPUT_FOLDERS
.read()
.get(&id)
.cloned()
.unwrap_or_default();
// Get output_folder from handle's shared options
let output_folder = handle
.shared()
.options
.output_folder
.to_string_lossy()
.into_owned();
let status = get_task_status(&stats);
let progress = if stats.total_bytes > 0 {
@@ -552,7 +584,6 @@ pub async fn downloader_stop() -> Result<()> {
session.stop().await;
}
TORRENT_HANDLES.write().clear();
TASK_OUTPUT_FOLDERS.write().clear();
Ok(())
}
@@ -568,10 +599,24 @@ pub async fn downloader_shutdown() -> Result<()> {
}
TORRENT_HANDLES.write().clear();
TASK_OUTPUT_FOLDERS.write().clear();
// Clear completed tasks cache on shutdown
COMPLETED_TASKS_CACHE.write().clear();
Ok(())
}
/// Get all completed tasks from cache (tasks removed by downloader_remove_completed_tasks)
/// This cache is cleared when the downloader is shutdown/restarted
#[frb(sync)]
pub fn downloader_get_completed_tasks_cache() -> Vec<DownloadTaskInfo> {
COMPLETED_TASKS_CACHE.read().clone()
}
/// Clear the completed tasks cache manually
#[frb(sync)]
pub fn downloader_clear_completed_tasks_cache() {
COMPLETED_TASKS_CACHE.write().clear();
}
/// Update global speed limits
/// Note: rqbit Session doesn't support runtime limit changes,
/// this function is a placeholder that returns an error.
@@ -586,6 +631,7 @@ pub async fn downloader_update_speed_limits(
}
/// Remove all completed tasks (equivalent to aria2's --seed-time=0 behavior)
/// Removed tasks are cached in memory and can be queried via downloader_get_completed_tasks_cache
pub async fn downloader_remove_completed_tasks() -> Result<u32> {
let session = get_session()?;
@@ -599,8 +645,10 @@ pub async fn downloader_remove_completed_tasks() -> Result<u32> {
if has_handle {
// Use TorrentIdOrHash::Id for deletion (TorrentId is just usize)
if session.delete(TorrentIdOrHash::Id(task.id), false).await.is_ok() {
// Save task info to cache before removing
COMPLETED_TASKS_CACHE.write().push(task.clone());
TORRENT_HANDLES.write().remove(&task.id);
TASK_OUTPUT_FOLDERS.write().remove(&task.id);
removed_count += 1;
}
}

View File

@@ -37,7 +37,7 @@ flutter_rust_bridge::frb_generated_boilerplate!(
default_rust_auto_opaque = RustAutoOpaqueNom,
);
pub(crate) const FLUTTER_RUST_BRIDGE_CODEGEN_VERSION: &str = "2.11.1";
pub(crate) const FLUTTER_RUST_BRIDGE_CODEGEN_CONTENT_HASH: i32 = -641930410;
pub(crate) const FLUTTER_RUST_BRIDGE_CODEGEN_CONTENT_HASH: i32 = -1482626931;
// Section: executor
@@ -304,6 +304,24 @@ fn wire__crate__api__downloader_api__downloader_add_url_impl(
},
)
}
fn wire__crate__api__downloader_api__downloader_clear_completed_tasks_cache_impl(
) -> flutter_rust_bridge::for_generated::WireSyncRust2DartDco {
FLUTTER_RUST_BRIDGE_HANDLER.wrap_sync::<flutter_rust_bridge::for_generated::DcoCodec, _>(
flutter_rust_bridge::for_generated::TaskInfo {
debug_name: "downloader_clear_completed_tasks_cache",
port: None,
mode: flutter_rust_bridge::for_generated::FfiCallMode::Sync,
},
move || {
transform_result_dco::<_, _, ()>((move || {
let output_ok = Result::<_, ()>::Ok({
crate::api::downloader_api::downloader_clear_completed_tasks_cache();
})?;
Ok(output_ok)
})())
},
)
}
fn wire__crate__api__downloader_api__downloader_get_all_tasks_impl(
port_: flutter_rust_bridge::for_generated::MessagePort,
) {
@@ -327,6 +345,24 @@ fn wire__crate__api__downloader_api__downloader_get_all_tasks_impl(
},
)
}
fn wire__crate__api__downloader_api__downloader_get_completed_tasks_cache_impl(
) -> flutter_rust_bridge::for_generated::WireSyncRust2DartDco {
FLUTTER_RUST_BRIDGE_HANDLER.wrap_sync::<flutter_rust_bridge::for_generated::DcoCodec, _>(
flutter_rust_bridge::for_generated::TaskInfo {
debug_name: "downloader_get_completed_tasks_cache",
port: None,
mode: flutter_rust_bridge::for_generated::FfiCallMode::Sync,
},
move || {
transform_result_dco::<_, _, ()>((move || {
let output_ok = Result::<_, ()>::Ok(
crate::api::downloader_api::downloader_get_completed_tasks_cache(),
)?;
Ok(output_ok)
})())
},
)
}
fn wire__crate__api__downloader_api__downloader_get_global_stats_impl(
port_: flutter_rust_bridge::for_generated::MessagePort,
) {
@@ -400,6 +436,28 @@ fn wire__crate__api__downloader_api__downloader_has_active_tasks_impl(
},
)
}
fn wire__crate__api__downloader_api__downloader_has_pending_session_tasks_impl(
working_dir: impl CstDecode<String>,
) -> flutter_rust_bridge::for_generated::WireSyncRust2DartDco {
FLUTTER_RUST_BRIDGE_HANDLER.wrap_sync::<flutter_rust_bridge::for_generated::DcoCodec, _>(
flutter_rust_bridge::for_generated::TaskInfo {
debug_name: "downloader_has_pending_session_tasks",
port: None,
mode: flutter_rust_bridge::for_generated::FfiCallMode::Sync,
},
move || {
let api_working_dir = working_dir.cst_decode();
transform_result_dco::<_, _, ()>((move || {
let output_ok = Result::<_, ()>::Ok(
crate::api::downloader_api::downloader_has_pending_session_tasks(
api_working_dir,
),
)?;
Ok(output_ok)
})())
},
)
}
fn wire__crate__api__downloader_api__downloader_init_impl(
port_: flutter_rust_bridge::for_generated::MessagePort,
working_dir: impl CstDecode<String>,
@@ -1204,7 +1262,7 @@ fn wire__crate__api__win32_api__remove_nvme_patch_impl(
}
fn wire__crate__api__win32_api__resolve_shortcut_impl(
port_: flutter_rust_bridge::for_generated::MessagePort,
_lnk_path: impl CstDecode<String>,
lnk_path: impl CstDecode<String>,
) {
FLUTTER_RUST_BRIDGE_HANDLER.wrap_normal::<flutter_rust_bridge::for_generated::DcoCodec, _, _>(
flutter_rust_bridge::for_generated::TaskInfo {
@@ -1213,11 +1271,11 @@ fn wire__crate__api__win32_api__resolve_shortcut_impl(
mode: flutter_rust_bridge::for_generated::FfiCallMode::Normal,
},
move || {
let api__lnk_path = _lnk_path.cst_decode();
let api_lnk_path = lnk_path.cst_decode();
move |context| {
transform_result_dco::<_, _, flutter_rust_bridge::for_generated::anyhow::Error>(
(move || {
let output_ok = crate::api::win32_api::resolve_shortcut(api__lnk_path)?;
let output_ok = crate::api::win32_api::resolve_shortcut(api_lnk_path)?;
Ok(output_ok)
})(),
)
@@ -4067,6 +4125,12 @@ mod io {
)
}
#[unsafe(no_mangle)]
pub extern "C" fn frbgen_starcitizen_doctor_wire__crate__api__downloader_api__downloader_clear_completed_tasks_cache(
) -> flutter_rust_bridge::for_generated::WireSyncRust2DartDco {
wire__crate__api__downloader_api__downloader_clear_completed_tasks_cache_impl()
}
#[unsafe(no_mangle)]
pub extern "C" fn frbgen_starcitizen_doctor_wire__crate__api__downloader_api__downloader_get_all_tasks(
port_: i64,
@@ -4074,6 +4138,12 @@ mod io {
wire__crate__api__downloader_api__downloader_get_all_tasks_impl(port_)
}
#[unsafe(no_mangle)]
pub extern "C" fn frbgen_starcitizen_doctor_wire__crate__api__downloader_api__downloader_get_completed_tasks_cache(
) -> flutter_rust_bridge::for_generated::WireSyncRust2DartDco {
wire__crate__api__downloader_api__downloader_get_completed_tasks_cache_impl()
}
#[unsafe(no_mangle)]
pub extern "C" fn frbgen_starcitizen_doctor_wire__crate__api__downloader_api__downloader_get_global_stats(
port_: i64,
@@ -4096,6 +4166,13 @@ mod io {
wire__crate__api__downloader_api__downloader_has_active_tasks_impl(port_)
}
#[unsafe(no_mangle)]
pub extern "C" fn frbgen_starcitizen_doctor_wire__crate__api__downloader_api__downloader_has_pending_session_tasks(
working_dir: *mut wire_cst_list_prim_u_8_strict,
) -> flutter_rust_bridge::for_generated::WireSyncRust2DartDco {
wire__crate__api__downloader_api__downloader_has_pending_session_tasks_impl(working_dir)
}
#[unsafe(no_mangle)]
pub extern "C" fn frbgen_starcitizen_doctor_wire__crate__api__downloader_api__downloader_init(
port_: i64,
@@ -4378,9 +4455,9 @@ mod io {
#[unsafe(no_mangle)]
pub extern "C" fn frbgen_starcitizen_doctor_wire__crate__api__win32_api__resolve_shortcut(
port_: i64,
_lnk_path: *mut wire_cst_list_prim_u_8_strict,
lnk_path: *mut wire_cst_list_prim_u_8_strict,
) {
wire__crate__api__win32_api__resolve_shortcut_impl(port_, _lnk_path)
wire__crate__api__win32_api__resolve_shortcut_impl(port_, lnk_path)
}
#[unsafe(no_mangle)]