Improve LLM streaming logs: start/end with size/content, first/last bytes; fix html2md function name
Some checks failed
BotServer CI/CD / build (push) Failing after 16m34s
Some checks failed
BotServer CI/CD / build (push) Failing after 16m34s
This commit is contained in:
parent
ae814508d2
commit
e63c187f32
3 changed files with 71 additions and 57 deletions
|
|
@ -114,6 +114,7 @@ axum = { workspace = true }
|
|||
axum-server = { workspace = true }
|
||||
base64 = { workspace = true }
|
||||
chrono = { workspace = true, features = ["clock", "std"] }
|
||||
html2md = "0.2"
|
||||
color-eyre = { workspace = true }
|
||||
diesel = { workspace = true, features = ["postgres", "uuid", "chrono", "serde_json", "r2d2", "numeric", "32-column-tables"] }
|
||||
dirs = { workspace = true }
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ use crate::core::shared::models::{BotResponse, UserMessage, UserSession};
|
|||
use crate::core::shared::state::AppState;
|
||||
#[cfg(feature = "chat")]
|
||||
use crate::basic::keywords::add_suggestion::get_suggestions;
|
||||
use html2md::parse_html;
|
||||
|
||||
use axum::extract::ws::{Message, WebSocket};
|
||||
use axum::{
|
||||
|
|
@ -1278,23 +1279,29 @@ while let Some(chunk) = stream_rx.recv().await {
|
|||
preview.replace('\n', "\\n"));
|
||||
|
||||
let full_response_len = full_response.len();
|
||||
let history_preview = if full_response.len() > 100 {
|
||||
format!("{}...", full_response.split_at(100).0)
|
||||
let is_html = full_response.contains("<") && full_response.contains(">");
|
||||
let content_for_save = if is_html {
|
||||
parse_html(&full_response)
|
||||
} else {
|
||||
full_response.clone()
|
||||
};
|
||||
info!("history_save: session_id={} user_id={} full_response_len={} preview={}",
|
||||
session.id, user_id, full_response_len, history_preview);
|
||||
let history_preview = if content_for_save.len() > 100 {
|
||||
format!("{}...", content_for_save.split_at(100).0)
|
||||
} else {
|
||||
content_for_save.clone()
|
||||
};
|
||||
info!("history_save: session_id={} user_id={} full_response_len={} is_html={} content_len={} preview={}",
|
||||
session.id, user_id, full_response_len, is_html, content_for_save.len(), history_preview);
|
||||
|
||||
let state_for_save = self.state.clone();
|
||||
let full_response_for_save = full_response.clone();
|
||||
let content_for_save_owned = content_for_save;
|
||||
let session_id_for_save = session.id;
|
||||
let user_id_for_save = user_id;
|
||||
|
||||
let save_result = tokio::task::spawn_blocking(
|
||||
move || -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let mut sm = state_for_save.session_manager.blocking_lock();
|
||||
sm.save_message(session_id_for_save, user_id_for_save, 2, &full_response_for_save, 2)?;
|
||||
sm.save_message(session_id_for_save, user_id_for_save, 2, &content_for_save_owned, 2)?;
|
||||
Ok(())
|
||||
},
|
||||
)
|
||||
|
|
|
|||
108
src/llm/mod.rs
108
src/llm/mod.rs
|
|
@ -436,60 +436,62 @@ impl LLMProvider for OpenAIClient {
|
|||
return Err(format!("LLM request failed with status: {}", status).into());
|
||||
}
|
||||
|
||||
let handler = get_handler(model);
|
||||
let mut stream_state = String::new(); // State for the handler to track thinking tags across chunks
|
||||
let mut stream = response.bytes_stream();
|
||||
let handler = get_handler(model);
|
||||
let mut stream_state = String::new();
|
||||
let mut stream = response.bytes_stream();
|
||||
let mut first_bytes: Option<String> = None;
|
||||
let mut last_bytes: String = String::new();
|
||||
let mut total_size: usize = 0;
|
||||
let mut content_sent: usize = 0;
|
||||
|
||||
let mut in_reasoning = false;
|
||||
while let Some(chunk_result) = stream.next().await {
|
||||
let chunk = chunk_result?;
|
||||
let chunk_str = String::from_utf8_lossy(&chunk);
|
||||
for line in chunk_str.lines() {
|
||||
if line.starts_with("data: ") && !line.contains("[DONE]") {
|
||||
if let Ok(data) = serde_json::from_str::<Value>(&line[6..]) {
|
||||
// Handle reasoning_content (GLM 4.7 / deepseek / etc)
|
||||
if let Some(reasoning) = data["choices"][0]["delta"]["reasoning_content"].as_str() {
|
||||
if !reasoning.is_empty() {
|
||||
if !in_reasoning {
|
||||
in_reasoning = true;
|
||||
}
|
||||
// Send thinking tokens to UI via JSON stringified message
|
||||
let thinking_msg = serde_json::json!({
|
||||
"type": "thinking",
|
||||
"content": reasoning
|
||||
}).to_string();
|
||||
let _ = tx.send(thinking_msg).await;
|
||||
info!("LLM stream starting for model: {}", model);
|
||||
|
||||
let mut in_reasoning = false;
|
||||
while let Some(chunk_result) = stream.next().await {
|
||||
let chunk = chunk_result?;
|
||||
total_size += chunk.len();
|
||||
let chunk_str = String::from_utf8_lossy(&chunk);
|
||||
trace!("LLM chunk raw: {} bytes", chunk.len());
|
||||
if first_bytes.is_none() {
|
||||
first_bytes = Some(chunk_str.chars().take(100).collect());
|
||||
}
|
||||
last_bytes = chunk_str.chars().take(100).collect();
|
||||
for line in chunk_str.lines() {
|
||||
if line.starts_with("data: ") && !line.contains("[DONE]") {
|
||||
if let Ok(data) = serde_json::from_str::<Value>(&line[6..]) {
|
||||
if let Some(reasoning) = data["choices"][0]["delta"]["reasoning_content"].as_str() {
|
||||
if !reasoning.is_empty() {
|
||||
if !in_reasoning {
|
||||
in_reasoning = true;
|
||||
}
|
||||
let thinking_msg = serde_json::json!({
|
||||
"type": "thinking",
|
||||
"content": reasoning
|
||||
}).to_string();
|
||||
let _ = tx.send(thinking_msg).await;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(content) = data["choices"][0]["delta"]["content"].as_str() {
|
||||
if !content.is_empty() {
|
||||
if in_reasoning {
|
||||
in_reasoning = false;
|
||||
let clear_msg = serde_json::json!({"type": "thinking_clear"}).to_string();
|
||||
let _ = tx.send(clear_msg).await;
|
||||
}
|
||||
let processed = handler.process_content_streaming(content, &mut stream_state);
|
||||
if !processed.is_empty() {
|
||||
content_sent += processed.len();
|
||||
let _ = tx.send(processed).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle regular content
|
||||
if let Some(content) = data["choices"][0]["delta"]["content"].as_str() {
|
||||
if !content.is_empty() {
|
||||
if in_reasoning {
|
||||
in_reasoning = false;
|
||||
// Send clear signal
|
||||
let clear_msg = serde_json::json!({
|
||||
"type": "thinking_clear"
|
||||
}).to_string();
|
||||
let _ = tx.send(clear_msg).await;
|
||||
}
|
||||
|
||||
let processed = handler.process_content_streaming(content, &mut stream_state);
|
||||
if !processed.is_empty() {
|
||||
let _ = tx.send(processed).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle standard OpenAI tool_calls
|
||||
if let Some(tool_calls) = data["choices"][0]["delta"]["tool_calls"].as_array() {
|
||||
for tool_call in tool_calls {
|
||||
// We send the tool_call object as a JSON string so stream_response
|
||||
// can buffer it and parse it using ToolExecutor::parse_tool_call
|
||||
if let Some(func) = tool_call.get("function") {
|
||||
if let Some(args) = func.get("arguments").and_then(|a| a.as_str()) {
|
||||
let _ = tx.send(args.to_string()).await;
|
||||
}
|
||||
if let Some(tool_calls) = data["choices"][0]["delta"]["tool_calls"].as_array() {
|
||||
for tool_call in tool_calls {
|
||||
if let Some(func) = tool_call.get("function") {
|
||||
if let Some(args) = func.get("arguments").and_then(|a| a.as_str()) {
|
||||
let _ = tx.send(args.to_string()).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -497,8 +499,12 @@ impl LLMProvider for OpenAIClient {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
info!("LLM stream done: size={} bytes, content_sent={}, first={:?}, last={}",
|
||||
total_size, content_sent, first_bytes, last_bytes);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn cancel_job(
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue