fix: add stream_exit logging to trace early loop termination
All checks were successful
BotServer CI/CD / build (push) Successful in 3m49s

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-04-15 15:24:30 -03:00
parent be94e5c3ad
commit 2c6ed89319

View file

@ -951,11 +951,14 @@ while let Some(chunk) = stream_rx.recv().await {
// Check if cancellation was requested (user sent new message)
match cancel_rx.try_recv() {
Ok(_) => {
info!("Streaming cancelled for session {} - user sent new message", session.id);
info!("stream_exit: Cancelled for session {}", session.id);
break;
}
Err(broadcast::error::TryRecvError::Empty) => {}
Err(broadcast::error::TryRecvError::Closed) => break,
Err(broadcast::error::TryRecvError::Closed) => {
info!("stream_exit: Cancel channel closed for session {}", session.id);
break;
}
Err(broadcast::error::TryRecvError::Lagged(_)) => {}
}
@ -1025,10 +1028,10 @@ while let Some(chunk) = stream_rx.recv().await {
context_max_length: 0,
};
if response_tx.send(response).await.is_err() {
warn!("Response channel closed");
break;
}
if response_tx.send(response).await.is_err() {
warn!("stream_exit: Response channel closed for session {}", session.id);
break;
}
}
// Start accumulating from { onwards
@ -1089,12 +1092,12 @@ while let Some(chunk) = stream_rx.recv().await {
};
if response_tx.send(response).await.is_err() {
warn!("Response channel closed during tool execution");
warn!("stream_exit: Response channel closed during tool execution for session {}", session.id);
break;
}
} else {
error!(
"[TOOL_EXEC] Tool '{}' execution failed: {:?}",
"tool_exec: Tool {} execution failed: {:?}",
tc.tool_name, execution_result.error
);
@ -1121,7 +1124,7 @@ while let Some(chunk) = stream_rx.recv().await {
};
if response_tx.send(response).await.is_err() {
warn!("Response channel closed during tool error");
warn!("stream_exit: Response channel closed during tool error for session {}", session.id);
break;
}
}
@ -1160,7 +1163,7 @@ while let Some(chunk) = stream_rx.recv().await {
accumulating_tool_call = false; // Reset accumulation flag after flush
if response_tx.send(response).await.is_err() {
warn!("Response channel closed");
warn!("stream_exit: Response channel closed for session {}", session.id);
break;
}
}
@ -1208,7 +1211,7 @@ while let Some(chunk) = stream_rx.recv().await {
};
if response_tx.send(thinking_msg).await.is_err() {
warn!("Response channel closed");
warn!("stream_exit: Response channel closed for session {}", session.id);
break;
}
continue;
@ -1247,7 +1250,7 @@ while let Some(chunk) = stream_rx.recv().await {
context_max_length: 0,
};
if response_tx.send(response).await.is_err() {
warn!("Response channel closed");
warn!("stream_exit: Response channel closed for session {}", session.id);
break;
}
}