diff --git a/Cargo.toml b/Cargo.toml index d6b80223..11ceeb53 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -114,6 +114,7 @@ axum = { workspace = true } axum-server = { workspace = true } base64 = { workspace = true } chrono = { workspace = true, features = ["clock", "std"] } +html2md = "0.2" color-eyre = { workspace = true } diesel = { workspace = true, features = ["postgres", "uuid", "chrono", "serde_json", "r2d2", "numeric", "32-column-tables"] } dirs = { workspace = true } diff --git a/src/core/bot/mod.rs b/src/core/bot/mod.rs index 055af6ae..4cde562e 100644 --- a/src/core/bot/mod.rs +++ b/src/core/bot/mod.rs @@ -22,6 +22,7 @@ use crate::core::shared::models::{BotResponse, UserMessage, UserSession}; use crate::core::shared::state::AppState; #[cfg(feature = "chat")] use crate::basic::keywords::add_suggestion::get_suggestions; +use html2md::parse_html; use axum::extract::ws::{Message, WebSocket}; use axum::{ @@ -1278,23 +1279,29 @@ while let Some(chunk) = stream_rx.recv().await { preview.replace('\n', "\\n")); let full_response_len = full_response.len(); - let history_preview = if full_response.len() > 100 { - format!("{}...", full_response.split_at(100).0) + let is_html = full_response.contains("<") && full_response.contains(">"); + let content_for_save = if is_html { + parse_html(&full_response) } else { full_response.clone() }; - info!("history_save: session_id={} user_id={} full_response_len={} preview={}", - session.id, user_id, full_response_len, history_preview); + let history_preview = if content_for_save.len() > 100 { + format!("{}...", content_for_save.split_at(100).0) + } else { + content_for_save.clone() + }; + info!("history_save: session_id={} user_id={} full_response_len={} is_html={} content_len={} preview={}", + session.id, user_id, full_response_len, is_html, content_for_save.len(), history_preview); let state_for_save = self.state.clone(); - let full_response_for_save = full_response.clone(); + let content_for_save_owned = content_for_save; let 
session_id_for_save = session.id; let user_id_for_save = user_id; let save_result = tokio::task::spawn_blocking( move || -> Result<(), Box<dyn std::error::Error + Send + Sync>> { let mut sm = state_for_save.session_manager.blocking_lock(); - sm.save_message(session_id_for_save, user_id_for_save, 2, &full_response_for_save, 2)?; + sm.save_message(session_id_for_save, user_id_for_save, 2, &content_for_save_owned, 2)?; Ok(()) }, ) diff --git a/src/llm/mod.rs b/src/llm/mod.rs index 43de8d5a..6df3f19e 100644 --- a/src/llm/mod.rs +++ b/src/llm/mod.rs @@ -436,60 +436,62 @@ impl LLMProvider for OpenAIClient { return Err(format!("LLM request failed with status: {}", status).into()); } - let handler = get_handler(model); - let mut stream_state = String::new(); // State for the handler to track thinking tags across chunks - let mut stream = response.bytes_stream(); + let handler = get_handler(model); + let mut stream_state = String::new(); + let mut stream = response.bytes_stream(); + let mut first_bytes: Option<String> = None; + let mut last_bytes: String = String::new(); + let mut total_size: usize = 0; + let mut content_sent: usize = 0; - let mut in_reasoning = false; - while let Some(chunk_result) = stream.next().await { - let chunk = chunk_result?; - let chunk_str = String::from_utf8_lossy(&chunk); - for line in chunk_str.lines() { - if line.starts_with("data: ") && !line.contains("[DONE]") { - if let Ok(data) = serde_json::from_str::<serde_json::Value>(&line[6..]) { - // Handle reasoning_content (GLM 4.7 / deepseek / etc) - if let Some(reasoning) = data["choices"][0]["delta"]["reasoning_content"].as_str() { - if !reasoning.is_empty() { - if !in_reasoning { - in_reasoning = true; - } - // Send thinking tokens to UI via JSON stringified message - let thinking_msg = serde_json::json!({ - "type": "thinking", - "content": reasoning - }).to_string(); - let _ = tx.send(thinking_msg).await; + info!("LLM stream starting for model: {}", model); + + let mut in_reasoning = false; + while let Some(chunk_result) = stream.next().await { + let 
chunk = chunk_result?; + total_size += chunk.len(); + let chunk_str = String::from_utf8_lossy(&chunk); + trace!("LLM chunk raw: {} bytes", chunk.len()); + if first_bytes.is_none() { + first_bytes = Some(chunk_str.chars().take(100).collect()); + } + last_bytes = chunk_str.chars().take(100).collect(); + for line in chunk_str.lines() { + if line.starts_with("data: ") && !line.contains("[DONE]") { + if let Ok(data) = serde_json::from_str::<serde_json::Value>(&line[6..]) { + if let Some(reasoning) = data["choices"][0]["delta"]["reasoning_content"].as_str() { + if !reasoning.is_empty() { + if !in_reasoning { + in_reasoning = true; + } + let thinking_msg = serde_json::json!({ + "type": "thinking", + "content": reasoning + }).to_string(); + let _ = tx.send(thinking_msg).await; + } + } + + if let Some(content) = data["choices"][0]["delta"]["content"].as_str() { + if !content.is_empty() { + if in_reasoning { + in_reasoning = false; + let clear_msg = serde_json::json!({"type": "thinking_clear"}).to_string(); + let _ = tx.send(clear_msg).await; + } + let processed = handler.process_content_streaming(content, &mut stream_state); + if !processed.is_empty() { + content_sent += processed.len(); + let _ = tx.send(processed).await; } } + } - // Handle regular content - if let Some(content) = data["choices"][0]["delta"]["content"].as_str() { - if !content.is_empty() { - if in_reasoning { - in_reasoning = false; - // Send clear signal - let clear_msg = serde_json::json!({ - "type": "thinking_clear" - }).to_string(); - let _ = tx.send(clear_msg).await; - } - - let processed = handler.process_content_streaming(content, &mut stream_state); - if !processed.is_empty() { - let _ = tx.send(processed).await; - } - } - } - - // Handle standard OpenAI tool_calls - if let Some(tool_calls) = data["choices"][0]["delta"]["tool_calls"].as_array() { - for tool_call in tool_calls { - // We send the tool_call object as a JSON string so stream_response - // can buffer it and parse it using ToolExecutor::parse_tool_call - 
if let Some(func) = tool_call.get("function") { - if let Some(args) = func.get("arguments").and_then(|a| a.as_str()) { - let _ = tx.send(args.to_string()).await; - } + if let Some(tool_calls) = data["choices"][0]["delta"]["tool_calls"].as_array() { + for tool_call in tool_calls { + if let Some(func) = tool_call.get("function") { + if let Some(args) = func.get("arguments").and_then(|a| a.as_str()) { + let _ = tx.send(args.to_string()).await; } } } @@ -497,8 +499,12 @@ impl LLMProvider for OpenAIClient { } } } + } - Ok(()) + info!("LLM stream done: size={} bytes, content_sent={}, first={:?}, last={}", + total_size, content_sent, first_bytes, last_bytes); + + Ok(()) } async fn cancel_job(