From c5d30adebe3406f53965139b9823157a55f2c762 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Mon, 13 Apr 2026 15:07:19 -0300 Subject: [PATCH] revert: restore llm/mod.rs to stable April 9 version Co-authored-by: Qwen-Coder --- src/llm/mod.rs | 140 +++++++++++++++++-------------------------------- 1 file changed, 49 insertions(+), 91 deletions(-) diff --git a/src/llm/mod.rs b/src/llm/mod.rs index 72631637..b8811ea3 100644 --- a/src/llm/mod.rs +++ b/src/llm/mod.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; use futures::StreamExt; -use log::{error, info, trace}; +use log::{error, info}; use serde_json::Value; use std::sync::Arc; use tokio::sync::{mpsc, RwLock}; @@ -11,7 +11,6 @@ pub mod episodic_memory; pub mod glm; pub mod hallucination_detector; pub mod llm_models; -#[cfg(feature = "llm")] pub mod local; pub mod observability; pub mod rate_limiter; @@ -290,7 +289,7 @@ impl LLMProvider for OpenAIClient { 128000 // Cerebras gpt-oss models and GPT-4 variants } else if model.contains("gpt-3.5") { 16385 - } else if model == "local" || model.is_empty() { + } else if model.starts_with("http://localhost:808") || model == "local" { 768 // Local llama.cpp server context limit } else { 32768 // Default conservative limit for modern models @@ -379,7 +378,7 @@ impl LLMProvider for OpenAIClient { 128000 // Cerebras gpt-oss models and GPT-4 variants } else if model.contains("gpt-3.5") { 16385 - } else if model == "local" || model.is_empty() { + } else if model.starts_with("http://localhost:808") || model == "local" { 768 // Local llama.cpp server context limit } else { 32768 // Default conservative limit for modern models @@ -413,8 +412,7 @@ impl LLMProvider for OpenAIClient { let mut request_body = serde_json::json!({ "model": model, "messages": messages, - "stream": true, - "max_tokens": 16384 + "stream": true }); // Add tools to the request if provided @@ -449,86 +447,54 @@ impl LLMProvider for OpenAIClient { let handler = get_handler(model); let mut stream = response.bytes_stream(); - + // Accumulate tool calls here because OpenAI streams them in fragments let mut active_tool_calls: Vec = Vec::new(); - // Add timeout to stream reads - if Kimi/Nvidia stops responding, fail gracefully - const STREAM_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60); + while let Some(chunk_result) = stream.next().await { + let chunk = chunk_result?; + let chunk_str = String::from_utf8_lossy(&chunk); + for line in chunk_str.lines() { + if line.starts_with("data: ") && !line.contains("[DONE]") { + if let Ok(data) = serde_json::from_str::(&line[6..]) { + if let Some(content) = data["choices"][0]["delta"]["content"].as_str() { + let processed = handler.process_content(content); + if !processed.is_empty() { + let _ = tx.send(processed).await; + } + } - loop { - let chunk_opt = match tokio::time::timeout( - STREAM_TIMEOUT, - stream.next(), - ).await { - Ok(opt) => opt, - Err(_) => { - // Timeout - LLM stopped sending data - log::warn!("[LLM] Stream timed out after {}s for model {}", - STREAM_TIMEOUT.as_secs(), model); - let _ = tx.send(format!("[ERROR] LLM response timed out after {} seconds.", - STREAM_TIMEOUT.as_secs())).await; - break; - } - }; - - match chunk_opt { - Some(Ok(chunk)) => { - let chunk_str = String::from_utf8_lossy(&chunk); - for line in chunk_str.lines() { - if line.starts_with("data: ") && !line.contains("[DONE]") { - if let Ok(data) = serde_json::from_str::(&line[6..]) { - // Kimi K2.5 and other reasoning models send thinking in "reasoning" field - // Only process "content" (actual response), ignore "reasoning" (thinking) - let content = data["choices"][0]["delta"]["content"].as_str(); - let reasoning = data["choices"][0]["delta"]["reasoning"].as_str(); - - // Log first chunk to help debug reasoning models - if reasoning.is_some() && content.is_none() { - trace!("[LLM] Kimi reasoning chunk (no content yet): {} chars", - reasoning.unwrap_or("").len()); - } - - if let Some(content) = content { - let processed = handler.process_content(content); - if !processed.is_empty() { - let _ = tx.send(processed).await; + // Handle standard OpenAI tool_calls + if let Some(tool_calls) = data["choices"][0]["delta"]["tool_calls"].as_array() { + for tool_delta in tool_calls { + if let Some(index) = tool_delta["index"].as_u64() { + let idx = index as usize; + if active_tool_calls.len() <= idx { + active_tool_calls.resize(idx + 1, serde_json::json!({ + "id": "", + "type": "function", + "function": { + "name": "", + "arguments": "" + } + })); } - } - - // Handle standard OpenAI tool_calls - if let Some(tool_calls) = data["choices"][0]["delta"]["tool_calls"].as_array() { - for tool_delta in tool_calls { - if let Some(index) = tool_delta["index"].as_u64() { - let idx = index as usize; - if active_tool_calls.len() <= idx { - active_tool_calls.resize(idx + 1, serde_json::json!({ - "id": "", - "type": "function", - "function": { - "name": "", - "arguments": "" - } - })); - } - - let current = &mut active_tool_calls[idx]; - - if let Some(id) = tool_delta["id"].as_str() { - current["id"] = serde_json::Value::String(id.to_string()); - } - - if let Some(func) = tool_delta.get("function") { - if let Some(name) = func.get("name").and_then(|n| n.as_str()) { - current["function"]["name"] = serde_json::Value::String(name.to_string()); - } - if let Some(args) = func.get("arguments").and_then(|a| a.as_str()) { - if let Some(existing_args) = current["function"]["arguments"].as_str() { - let mut new_args = existing_args.to_string(); - new_args.push_str(args); - current["function"]["arguments"] = serde_json::Value::String(new_args); - } - } + + let current = &mut active_tool_calls[idx]; + + if let Some(id) = tool_delta["id"].as_str() { + current["id"] = serde_json::Value::String(id.to_string()); + } + + if let Some(func) = tool_delta.get("function") { + if let Some(name) = func.get("name").and_then(|n| n.as_str()) { + current["function"]["name"] = serde_json::Value::String(name.to_string()); + } + if let Some(args) = func.get("arguments").and_then(|a| a.as_str()) { + if let Some(existing_args) = current["function"]["arguments"].as_str() { + let mut new_args = existing_args.to_string(); + new_args.push_str(args); + current["function"]["arguments"] = serde_json::Value::String(new_args); } } } @@ -537,14 +503,6 @@ impl LLMProvider for OpenAIClient { } } } - Some(Err(e)) => { - log::error!("[LLM] Stream error: {}", e); - break; - } - None => { - // Stream ended - break; - } } } @@ -927,10 +885,10 @@ mod tests { fn test_openai_client_new_custom_url() { let client = OpenAIClient::new( "test_key".to_string(), - Some("".to_string()), + Some("http://localhost:9000".to_string()), None, ); - assert_eq!(client.base_url, ""); + assert_eq!(client.base_url, "http://localhost:9000"); } #[test]