debug: add WebSocket message tracing
Some checks failed
BotServer CI/CD / build (push) Failing after 4m10s

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-04-13 20:46:28 -03:00
parent 517d5435a9
commit 650cb70961
2 changed files with 31 additions and 2 deletions

View file

@ -10,7 +10,7 @@ use tool_executor::ToolExecutor;
use crate::core::config::ConfigManager;
#[cfg(feature = "drive")]
use crate::drive::drive_monitor::DriveMonitor;
use crate::drive::drive_monitor::{DriveMonitor, set_llm_streaming};
#[cfg(feature = "llm")]
use crate::llm::llm_models;
#[cfg(feature = "llm")]
@ -811,13 +811,20 @@ impl BotOrchestrator {
// Clone messages for the async task
let messages_clone = messages.clone();
// Set flag to prevent DriveMonitor PDF downloads during LLM streaming
#[cfg(feature = "drive")]
set_llm_streaming(true);
let stream_tx_clone = stream_tx.clone();
tokio::spawn(async move {
if let Err(e) = llm
.generate_stream("", &messages_clone, stream_tx, &model_clone, &key_clone, tools_for_llm.as_ref())
.generate_stream("", &messages_clone, stream_tx_clone, &model_clone, &key_clone, tools_for_llm.as_ref())
.await
{
error!("LLM streaming error: {}", e);
}
#[cfg(feature = "drive")]
set_llm_streaming(false);
});
let mut full_response = String::new();
@ -1570,6 +1577,7 @@ let mut send_task = tokio::spawn(async move {
while let Some(Ok(msg)) = receiver.next().await {
match msg {
Message::Text(text) => {
debug!("WebSocket received text: {}", text);
if let Ok(user_msg) = serde_json::from_str::<UserMessage>(&text) {
let orchestrator = BotOrchestrator::new(state_clone.clone());
if let Some(tx_clone) = state_clone

View file

@ -23,6 +23,19 @@ use tokio::time::Duration;
use serde::{Deserialize, Serialize};
use tokio::fs as tokio_fs;
#[cfg(any(feature = "research", feature = "llm"))]
static LLM_STREAMING: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
#[cfg(any(feature = "research", feature = "llm"))]
pub fn set_llm_streaming(streaming: bool) {
LLM_STREAMING.store(streaming, Ordering::SeqCst);
}
#[cfg(any(feature = "research", feature = "llm"))]
pub fn is_llm_streaming() -> bool {
LLM_STREAMING.load(Ordering::SeqCst)
}
const MAX_BACKOFF_SECS: u64 = 300;
const INITIAL_BACKOFF_SECS: u64 = 30;
const RETRY_BACKOFF_SECS: i64 = 3600;
@ -1672,6 +1685,14 @@ let file_state = FileState {
files_processed += 1;
debug!("[GBKB] Queue size: {}/10", files_to_process.len());
// Skip downloads if LLM is actively streaming to prevent deadlock
#[cfg(any(feature = "research", feature = "llm"))]
if is_llm_streaming() {
debug!("[GBKB] Skipping download - LLM is streaming, will retry later");
files_to_process.clear();
break;
}
if files_to_process.len() >= 10 {
debug!("[GBKB] Downloading batch of {} files", files_to_process.len());
for file_path in std::mem::take(&mut files_to_process) {