From 3159d04414103b3afdc3da109898a7910269a1e7 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Tue, 14 Apr 2026 08:59:10 -0300 Subject: [PATCH] fix: spawn LLM response in separate task to prevent recv_task blocking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously the recv_task awaited stream_response() directly, which froze the entire WebSocket message receiver while the LLM ran (30s+). This meant a second user message couldn't be processed until the first LLM call finished — head-of-line blocking that effectively locked the session for the duration of the call. Now stream_response runs in its own tokio::spawn, keeping recv_task free to handle new messages immediately. Also fixed borrow/lifetime issue by cloning the response channel sender out of the lock scope. Co-authored-by: Qwen-Coder --- src/core/bot/mod.rs | 75 +++++++++++++++++++++++---------------------- 1 file changed, 39 insertions(+), 36 deletions(-) diff --git a/src/core/bot/mod.rs b/src/core/bot/mod.rs index 6292c054..23b3bad2 100644 --- a/src/core/bot/mod.rs +++ b/src/core/bot/mod.rs @@ -1604,39 +1604,37 @@ let mut send_task = tokio::spawn(async move { info!("Processing message for session {}", session_id); if let Ok(user_msg) = serde_json::from_str::<UserMessage>(&text) { - let orchestrator = BotOrchestrator::new(state_clone.clone()); - if let Some(tx_clone) = state_clone - .response_channels - .lock() - .await - .get(&session_id.to_string()) - { - // Ensure session exists - create if not - let session_result = { - let mut sm = state_clone.session_manager.lock().await; - sm.get_session_by_id(session_id) - }; + // Get session first, outside any lock scope + let session_result = { + let mut sm = state_clone.session_manager.lock().await; + sm.get_session_by_id(session_id) + }; - let session = match session_result { - Ok(Some(sess)) => sess, - Ok(None) => { - // Use session manager to create session (will generate new UUID) - let mut sm = state_clone.session_manager.lock().await; - match 
sm.create_session(user_id, bot_id, "WebSocket Chat") { - Ok(new_session) => new_session, - Err(e) => { - error!("Failed to create session: {}", e); - continue; - } + let session = match session_result { + Ok(Some(sess)) => sess, + Ok(None) => { + let mut sm = state_clone.session_manager.lock().await; + match sm.create_session(user_id, bot_id, "WebSocket Chat") { + Ok(new_session) => new_session, + Err(e) => { + error!("Failed to create session: {}", e); + continue; } } - Err(e) => { - error!("Error getting session: {}", e); - continue; - } - }; + } + Err(e) => { + error!("Error getting session: {}", e); + continue; + } + }; - // Use bot_id from WebSocket connection instead of from message + // Get response channel sender out of lock scope + let tx_opt = { + let channels = state_clone.response_channels.lock().await; + channels.get(&session_id.to_string()).cloned() + }; + + if let Some(tx_clone) = tx_opt { let corrected_msg = UserMessage { bot_id: bot_id.to_string(), user_id: session.user_id.to_string(), @@ -1644,13 +1642,18 @@ let mut send_task = tokio::spawn(async move { ..user_msg }; info!("Calling orchestrator for session {}", session_id); - - if let Err(e) = orchestrator - .stream_response(corrected_msg, tx_clone.clone()) - .await - { - error!("Failed to stream response: {}", e); - } + + // Spawn LLM in its own task so recv_task stays free to handle + // new messages — prevents one hung LLM from locking the session. + let orch = BotOrchestrator::new(state_clone.clone()); + tokio::spawn(async move { + if let Err(e) = orch + .stream_response(corrected_msg, tx_clone) + .await + { + error!("Failed to stream response: {}", e); + } + }); } else { warn!("Response channel NOT found for session: {}", session_id); }