diff --git a/src/drive/drive_monitor/mod.rs b/src/drive/drive_monitor/mod.rs index 7aa5d102..05a84dbd 100644 --- a/src/drive/drive_monitor/mod.rs +++ b/src/drive/drive_monitor/mod.rs @@ -55,6 +55,7 @@ pub struct DriveMonitor { kb_manager: Arc, work_root: PathBuf, is_processing: Arc, + scanning: Arc, consecutive_failures: Arc, #[cfg(any(feature = "research", feature = "llm"))] files_being_indexed: Arc>>, @@ -89,6 +90,7 @@ impl DriveMonitor { kb_manager, work_root, is_processing: Arc::new(AtomicBool::new(false)), + scanning: Arc::new(AtomicBool::new(false)), consecutive_failures: Arc::new(AtomicU32::new(0)), #[cfg(any(feature = "research", feature = "llm"))] files_being_indexed: Arc::new(TokioRwLock::new(HashSet::new())), @@ -1457,6 +1459,16 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()), &self, client: &Client, ) -> Result<(), Box> { + // Prevent concurrent scans - if already scanning, skip this tick + if self + .scanning + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_err() + { + trace!("[GBKB] Scan already in progress for bot {}, skipping", self.bot_id); + return Ok(()); + } + debug!("[GBKB] check_gbkb_changes ENTER for bot {} (prefix: {})", self.bot_id, self.bucket_name); let bot_name = self .bucket_name @@ -1796,6 +1808,7 @@ let file_state = FileState { debug!("[GBKB] check_gbkb_changes EXIT for bot {}", self.bot_id); trace!("check_gbkb_changes EXIT"); + self.scanning.store(false, Ordering::Release); Ok(()) } diff --git a/src/llm/mod.rs b/src/llm/mod.rs index 0182610a..9b9d514c 100644 --- a/src/llm/mod.rs +++ b/src/llm/mod.rs @@ -448,55 +448,76 @@ impl LLMProvider for OpenAIClient { let handler = get_handler(model); let mut stream = response.bytes_stream(); - + // Accumulate tool calls here because OpenAI streams them in fragments let mut active_tool_calls: Vec = Vec::new(); - while let Some(chunk_result) = stream.next().await { - let chunk = chunk_result?; - let chunk_str = String::from_utf8_lossy(&chunk); - for line in chunk_str.lines() { -if line.starts_with("data: ") && !line.contains("[DONE]") { - if let Ok(data) = serde_json::from_str::(&line[6..]) { - let content = data["choices"][0]["delta"]["content"].as_str(); - if let Some(content) = content { - let processed = handler.process_content(content); - if !processed.is_empty() { - let _ = tx.send(processed).await; - } - } + // Add timeout to stream reads - if Kimi/Nvidia stops responding, fail gracefully + const STREAM_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60); - // Handle standard OpenAI tool_calls - if let Some(tool_calls) = data["choices"][0]["delta"]["tool_calls"].as_array() { - for tool_delta in tool_calls { - if let Some(index) = tool_delta["index"].as_u64() { - let idx = index as usize; - if active_tool_calls.len() <= idx { - active_tool_calls.resize(idx + 1, serde_json::json!({ - "id": "", - "type": "function", - "function": { - "name": "", - "arguments": "" + loop { + let chunk_opt = match tokio::time::timeout( + STREAM_TIMEOUT, + stream.next(), + ).await { + Ok(opt) => opt, + Err(_) => { + // Timeout - LLM stopped sending data + log::warn!("[LLM] Stream timed out after {}s for model {}", + STREAM_TIMEOUT.as_secs(), model); + let _ = tx.send(format!("[ERROR] LLM response timed out after {} seconds.", + STREAM_TIMEOUT.as_secs())).await; + break; + } + }; + + match chunk_opt { + Some(Ok(chunk)) => { + let chunk_str = String::from_utf8_lossy(&chunk); + for line in chunk_str.lines() { + if line.starts_with("data: ") && !line.contains("[DONE]") { + if let Ok(data) = serde_json::from_str::(&line[6..]) { + let content = data["choices"][0]["delta"]["content"].as_str(); + if let Some(content) = content { + let processed = handler.process_content(content); + if !processed.is_empty() { + let _ = tx.send(processed).await; + } + } + + // Handle standard OpenAI tool_calls + if let Some(tool_calls) = data["choices"][0]["delta"]["tool_calls"].as_array() { + for tool_delta in tool_calls { + if let Some(index) = tool_delta["index"].as_u64() { + let idx = index as usize; + if active_tool_calls.len() <= idx { + active_tool_calls.resize(idx + 1, serde_json::json!({ + "id": "", + "type": "function", + "function": { + "name": "", + "arguments": "" + } + })); } - })); - } - - let current = &mut active_tool_calls[idx]; - - if let Some(id) = tool_delta["id"].as_str() { - current["id"] = serde_json::Value::String(id.to_string()); - } - - if let Some(func) = tool_delta.get("function") { - if let Some(name) = func.get("name").and_then(|n| n.as_str()) { - current["function"]["name"] = serde_json::Value::String(name.to_string()); - } - if let Some(args) = func.get("arguments").and_then(|a| a.as_str()) { - if let Some(existing_args) = current["function"]["arguments"].as_str() { - let mut new_args = existing_args.to_string(); - new_args.push_str(args); - current["function"]["arguments"] = serde_json::Value::String(new_args); + + let current = &mut active_tool_calls[idx]; + + if let Some(id) = tool_delta["id"].as_str() { + current["id"] = serde_json::Value::String(id.to_string()); + } + + if let Some(func) = tool_delta.get("function") { + if let Some(name) = func.get("name").and_then(|n| n.as_str()) { + current["function"]["name"] = serde_json::Value::String(name.to_string()); + } + if let Some(args) = func.get("arguments").and_then(|a| a.as_str()) { + if let Some(existing_args) = current["function"]["arguments"].as_str() { + let mut new_args = existing_args.to_string(); + new_args.push_str(args); + current["function"]["arguments"] = serde_json::Value::String(new_args); + } + } } } } @@ -505,6 +526,14 @@ if line.starts_with("data: ") && !line.contains("[DONE]") { } } } + Some(Err(e)) => { + log::error!("[LLM] Stream error: {}", e); + break; + } + None => { + // Stream ended + break; + } } }