From 2c6ed89319cff520120e35002f45dd1b4e2db6e9 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Wed, 15 Apr 2026 15:24:30 -0300 Subject: [PATCH] fix: add stream_exit logging to trace early loop termination --- src/core/bot/mod.rs | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/src/core/bot/mod.rs b/src/core/bot/mod.rs index 440895bc..8a1c2b67 100644 --- a/src/core/bot/mod.rs +++ b/src/core/bot/mod.rs @@ -951,11 +951,14 @@ while let Some(chunk) = stream_rx.recv().await { // Check if cancellation was requested (user sent new message) match cancel_rx.try_recv() { Ok(_) => { - info!("Streaming cancelled for session {} - user sent new message", session.id); + info!("stream_exit: Cancelled for session {}", session.id); break; } Err(broadcast::error::TryRecvError::Empty) => {} - Err(broadcast::error::TryRecvError::Closed) => break, + Err(broadcast::error::TryRecvError::Closed) => { + info!("stream_exit: Cancel channel closed for session {}", session.id); + break; + } Err(broadcast::error::TryRecvError::Lagged(_)) => {} } @@ -1025,10 +1028,10 @@ while let Some(chunk) = stream_rx.recv().await { context_max_length: 0, }; - if response_tx.send(response).await.is_err() { - warn!("Response channel closed"); - break; - } + if response_tx.send(response).await.is_err() { + warn!("stream_exit: Response channel closed for session {}", session.id); + break; + } } // Start accumulating from { onwards @@ -1089,12 +1092,12 @@ while let Some(chunk) = stream_rx.recv().await { }; if response_tx.send(response).await.is_err() { - warn!("Response channel closed during tool execution"); + warn!("stream_exit: Response channel closed during tool execution for session {}", session.id); break; } } else { error!( - "[TOOL_EXEC] Tool '{}' execution failed: {:?}", + "tool_exec: Tool {} execution failed: {:?}", tc.tool_name, execution_result.error ); @@ -1121,7 +1124,7 @@ while let Some(chunk) = stream_rx.recv().await { }; if response_tx.send(response).await.is_err() { - warn!("Response channel closed during tool error"); + warn!("stream_exit: Response channel closed during tool error for session {}", session.id); break; } } @@ -1160,7 +1163,7 @@ while let Some(chunk) = stream_rx.recv().await { accumulating_tool_call = false; // Reset accumulation flag after flush if response_tx.send(response).await.is_err() { - warn!("Response channel closed"); + warn!("stream_exit: Response channel closed for session {}", session.id); break; } } @@ -1208,7 +1211,7 @@ while let Some(chunk) = stream_rx.recv().await { }; if response_tx.send(thinking_msg).await.is_err() { - warn!("Response channel closed"); + warn!("stream_exit: Response channel closed for session {}", session.id); break; } continue; @@ -1247,7 +1250,7 @@ while let Some(chunk) = stream_rx.recv().await { context_max_length: 0, }; if response_tx.send(response).await.is_err() { - warn!("Response channel closed"); + warn!("stream_exit: Response channel closed for session {}", session.id); break; } }