fix: add 60s timeout to LLM stream reads and add concurrent scan guard
All checks were successful
BotServer CI/CD / build (push) Successful in 3m53s

- Add a tokio timeout (60s) to SSE stream reads in the OpenAI client
- Prevents an indefinite hang when the Kimi/Nvidia backends stop responding
- Add a `scanning` AtomicBool guard to prevent concurrent check_gbkb_changes calls
- Skip the GBKB scan entirely when all KBs are already indexed in Qdrant

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-04-13 12:58:11 -03:00
parent c1df15eb48
commit 723407cfd6
2 changed files with 86 additions and 44 deletions

View file

@ -55,6 +55,7 @@ pub struct DriveMonitor {
kb_manager: Arc<KnowledgeBaseManager>,
work_root: PathBuf,
is_processing: Arc<AtomicBool>,
scanning: Arc<AtomicBool>,
consecutive_failures: Arc<AtomicU32>,
#[cfg(any(feature = "research", feature = "llm"))]
files_being_indexed: Arc<TokioRwLock<HashSet<String>>>,
@ -89,6 +90,7 @@ impl DriveMonitor {
kb_manager,
work_root,
is_processing: Arc::new(AtomicBool::new(false)),
scanning: Arc::new(AtomicBool::new(false)),
consecutive_failures: Arc::new(AtomicU32::new(0)),
#[cfg(any(feature = "research", feature = "llm"))]
files_being_indexed: Arc::new(TokioRwLock::new(HashSet::new())),
@ -1457,6 +1459,16 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()),
&self,
client: &Client,
) -> Result<(), Box<dyn Error + Send + Sync>> {
// Prevent concurrent scans - if already scanning, skip this tick
if self
.scanning
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
trace!("[GBKB] Scan already in progress for bot {}, skipping", self.bot_id);
return Ok(());
}
debug!("[GBKB] check_gbkb_changes ENTER for bot {} (prefix: {})", self.bot_id, self.bucket_name);
let bot_name = self
.bucket_name
@ -1796,6 +1808,7 @@ let file_state = FileState {
debug!("[GBKB] check_gbkb_changes EXIT for bot {}", self.bot_id);
trace!("check_gbkb_changes EXIT");
self.scanning.store(false, Ordering::Release);
Ok(())
}

View file

@ -448,55 +448,76 @@ impl LLMProvider for OpenAIClient {
let handler = get_handler(model);
let mut stream = response.bytes_stream();
// Accumulate tool calls here because OpenAI streams them in fragments
let mut active_tool_calls: Vec<serde_json::Value> = Vec::new();
while let Some(chunk_result) = stream.next().await {
let chunk = chunk_result?;
let chunk_str = String::from_utf8_lossy(&chunk);
for line in chunk_str.lines() {
if line.starts_with("data: ") && !line.contains("[DONE]") {
if let Ok(data) = serde_json::from_str::<Value>(&line[6..]) {
let content = data["choices"][0]["delta"]["content"].as_str();
if let Some(content) = content {
let processed = handler.process_content(content);
if !processed.is_empty() {
let _ = tx.send(processed).await;
}
}
// Add timeout to stream reads - if Kimi/Nvidia stops responding, fail gracefully
const STREAM_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
// Handle standard OpenAI tool_calls
if let Some(tool_calls) = data["choices"][0]["delta"]["tool_calls"].as_array() {
for tool_delta in tool_calls {
if let Some(index) = tool_delta["index"].as_u64() {
let idx = index as usize;
if active_tool_calls.len() <= idx {
active_tool_calls.resize(idx + 1, serde_json::json!({
"id": "",
"type": "function",
"function": {
"name": "",
"arguments": ""
loop {
let chunk_opt = match tokio::time::timeout(
STREAM_TIMEOUT,
stream.next(),
).await {
Ok(opt) => opt,
Err(_) => {
// Timeout - LLM stopped sending data
log::warn!("[LLM] Stream timed out after {}s for model {}",
STREAM_TIMEOUT.as_secs(), model);
let _ = tx.send(format!("[ERROR] LLM response timed out after {} seconds.",
STREAM_TIMEOUT.as_secs())).await;
break;
}
};
match chunk_opt {
Some(Ok(chunk)) => {
let chunk_str = String::from_utf8_lossy(&chunk);
for line in chunk_str.lines() {
if line.starts_with("data: ") && !line.contains("[DONE]") {
if let Ok(data) = serde_json::from_str::<Value>(&line[6..]) {
let content = data["choices"][0]["delta"]["content"].as_str();
if let Some(content) = content {
let processed = handler.process_content(content);
if !processed.is_empty() {
let _ = tx.send(processed).await;
}
}
// Handle standard OpenAI tool_calls
if let Some(tool_calls) = data["choices"][0]["delta"]["tool_calls"].as_array() {
for tool_delta in tool_calls {
if let Some(index) = tool_delta["index"].as_u64() {
let idx = index as usize;
if active_tool_calls.len() <= idx {
active_tool_calls.resize(idx + 1, serde_json::json!({
"id": "",
"type": "function",
"function": {
"name": "",
"arguments": ""
}
}));
}
}));
}
let current = &mut active_tool_calls[idx];
if let Some(id) = tool_delta["id"].as_str() {
current["id"] = serde_json::Value::String(id.to_string());
}
if let Some(func) = tool_delta.get("function") {
if let Some(name) = func.get("name").and_then(|n| n.as_str()) {
current["function"]["name"] = serde_json::Value::String(name.to_string());
}
if let Some(args) = func.get("arguments").and_then(|a| a.as_str()) {
if let Some(existing_args) = current["function"]["arguments"].as_str() {
let mut new_args = existing_args.to_string();
new_args.push_str(args);
current["function"]["arguments"] = serde_json::Value::String(new_args);
let current = &mut active_tool_calls[idx];
if let Some(id) = tool_delta["id"].as_str() {
current["id"] = serde_json::Value::String(id.to_string());
}
if let Some(func) = tool_delta.get("function") {
if let Some(name) = func.get("name").and_then(|n| n.as_str()) {
current["function"]["name"] = serde_json::Value::String(name.to_string());
}
if let Some(args) = func.get("arguments").and_then(|a| a.as_str()) {
if let Some(existing_args) = current["function"]["arguments"].as_str() {
let mut new_args = existing_args.to_string();
new_args.push_str(args);
current["function"]["arguments"] = serde_json::Value::String(new_args);
}
}
}
}
}
@ -505,6 +526,14 @@ if line.starts_with("data: ") && !line.contains("[DONE]") {
}
}
}
Some(Err(e)) => {
log::error!("[LLM] Stream error: {}", e);
break;
}
None => {
// Stream ended
break;
}
}
}