From 2a042d400b93deecd73c5502b78b16ef735772ce Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Sat, 4 Apr 2026 07:35:03 -0300 Subject: [PATCH] fix: Replace Handle::try_current().block_on() with thread::spawn pattern - Fixes panic: Cannot start a runtime from within a runtime - kb_statistics.rs: Wrap all async calls in std::thread::spawn - post_to.rs: Replace Handle::try_current with thread::spawn + mpsc - Removes dead Handle::try_current checks from sync functions - Follows AGENTS.md pattern for async-from-sync callbacks --- src/basic/keywords/kb_statistics.rs | 106 +++++++++++++++------------- src/basic/keywords/post_to.rs | 66 +++++++++++------ 2 files changed, 99 insertions(+), 73 deletions(-) diff --git a/src/basic/keywords/kb_statistics.rs b/src/basic/keywords/kb_statistics.rs index 68752ecd..0a6f77a8 100644 --- a/src/basic/keywords/kb_statistics.rs +++ b/src/basic/keywords/kb_statistics.rs @@ -45,14 +45,20 @@ pub fn kb_statistics_keyword(state: Arc, user: UserSession, engine: &m user.user_id ); - let rt = tokio::runtime::Handle::try_current(); - let Ok(runtime) = rt else { - error!("KB STATISTICS: No tokio runtime available"); - return Dynamic::UNIT; - }; + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build(); + let result = if let Ok(rt) = rt { + rt.block_on(async { get_kb_statistics(&state, &user).await }) + } else { + Err("Failed to create runtime".into()) + }; + let _ = tx.send(result); + }); - let result = runtime - .block_on(async { get_kb_statistics(&state, &user).await }); + let result = rx.recv().unwrap_or(Err("Channel error".into())); match result { Ok(stats) => match serde_json::to_value(&stats) { @@ -85,16 +91,21 @@ pub fn kb_statistics_keyword(state: Arc, user: UserSession, engine: &m user.user_id ); - let rt = tokio::runtime::Handle::try_current(); - if rt.is_err() { - error!("KB COLLECTION STATS: No tokio runtime available"); - return Dynamic::UNIT; - } - let collection = collection_name.to_string(); - let result = rt - .expect("valid syntax registration") - .block_on(async { get_collection_statistics(&state, &collection).await }); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build(); + let result = if let Ok(rt) = rt { + rt.block_on(async { get_collection_statistics(&state, &collection).await }) + } else { + Err("Failed to create runtime".into()) + }; + let _ = tx.send(result); + }); + + let result = rx.recv().unwrap_or(Err("Channel error".into())); match result { Ok(stats) => match serde_json::to_value(&stats) { @@ -125,15 +136,7 @@ pub fn kb_statistics_keyword(state: Arc, user: UserSession, engine: &m user.user_id ); - let rt = tokio::runtime::Handle::try_current(); - if rt.is_err() { - error!("KB DOCUMENTS COUNT: No tokio runtime available"); - return 0; - } - - let result = get_documents_count(&state, &user); - - result.unwrap_or(0) + get_documents_count(&state, &user).unwrap_or(0) }); let state_clone4 = Arc::clone(&state); @@ -150,15 +153,7 @@ pub fn kb_statistics_keyword(state: Arc, user: UserSession, engine: &m user.user_id ); - let rt = tokio::runtime::Handle::try_current(); - if rt.is_err() { - error!("KB DOCUMENTS ADDED SINCE: No tokio runtime available"); - return 0; - } - - let result = get_documents_added_since(&state, &user, days); - - result.unwrap_or(0) + get_documents_added_since(&state, &user, days).unwrap_or(0) }); let state_clone5 = Arc::clone(&state); @@ -174,15 +169,20 @@ pub fn kb_statistics_keyword(state: Arc, user: UserSession, engine: &m user.user_id ); - let rt = tokio::runtime::Handle::try_current(); - if rt.is_err() { - error!("KB LIST COLLECTIONS: No tokio runtime available"); - return Dynamic::UNIT; - } + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build(); + let result = if let Ok(rt) = rt { + rt.block_on(async { list_collections(&state, &user).await }) + } else { + Err("Failed to create runtime".into()) + }; + let _ = tx.send(result); + }); - let result = rt - .expect("valid syntax registration") - .block_on(async { list_collections(&state, &user).await }); + let result = rx.recv().unwrap_or(Err("Channel error".into())); match result { Ok(collections) => { @@ -209,16 +209,20 @@ pub fn kb_statistics_keyword(state: Arc, user: UserSession, engine: &m user.user_id ); - let rt = tokio::runtime::Handle::try_current(); - if rt.is_err() { - error!("KB STORAGE SIZE: No tokio runtime available"); - return 0.0; - } - - let result = rt - .expect("valid syntax registration") - .block_on(async { get_storage_size(&state, &user).await }); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build(); + let result = if let Ok(rt) = rt { + rt.block_on(async { get_storage_size(&state, &user).await }) + } else { + Err("Failed to create runtime".into()) + }; + let _ = tx.send(result); + }); + let result = rx.recv().unwrap_or(Err("Channel error".into())); result.unwrap_or(0.0) }); } diff --git a/src/basic/keywords/post_to.rs b/src/basic/keywords/post_to.rs index 8548df9a..d11ca5c1 100644 --- a/src/basic/keywords/post_to.rs +++ b/src/basic/keywords/post_to.rs @@ -77,14 +77,21 @@ fn post_to_impl( content = content.with_video(vid); } - let rt = tokio::runtime::Handle::try_current().map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("No async runtime available: {}", e).into(), - rhai::Position::NONE, - )) - })?; + let cm = channel_manager.clone(); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build(); + let result = if let Ok(rt) = rt { + rt.block_on(async { cm.post_to(&account_name, &content).await }) + } else { + Err("Failed to create runtime".into()) + }; + let _ = tx.send(result); + }); - let result = rt.block_on(async { channel_manager.post_to(&account_name, &content).await }); + let result = rx.recv().unwrap_or(Err("Channel error".into())); match result { Ok(post_result) => { @@ -140,15 +147,22 @@ fn post_to_multiple_impl( content = content.with_video(vid); } - let rt = tokio::runtime::Handle::try_current().map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("No async runtime available: {}", e).into(), - rhai::Position::NONE, - )) - })?; + let cm = channel_manager.clone(); + let names = account_names.clone(); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build(); + let results = if let Ok(rt) = rt { + rt.block_on(async { cm.post_to_multiple(&names, &content).await }) + } else { + Vec::new() + }; + let _ = tx.send(results); + }); - let results = - rt.block_on(async { channel_manager.post_to_multiple(&account_names, &content).await }); + let results = rx.recv().unwrap_or_default(); let mut total = 0; let mut successful = 0; @@ -266,14 +280,22 @@ fn post_to_advanced_impl( } } - let rt = tokio::runtime::Handle::try_current().map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("No async runtime available: {}", e).into(), - rhai::Position::NONE, - )) - })?; + let cm = channel_manager.clone(); + let channel_str = channel.clone(); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build(); + let result = if let Ok(rt) = rt { + rt.block_on(async { cm.post_to(&channel_str, &content).await }) + } else { + Err("Failed to create runtime".into()) + }; + let _ = tx.send(result); + }); - let result = rt.block_on(async { channel_manager.post_to(&channel, &content).await }); + let result = rx.recv().unwrap_or(Err("Channel error".into())); match result { Ok(post_result) => {