diff --git a/src/llm/kimi.rs b/src/llm/kimi.rs
index 0eef4564..2b7c01bb 100644
--- a/src/llm/kimi.rs
+++ b/src/llm/kimi.rs
@@ -282,6 +282,8 @@ impl LLMProvider for KimiClient {
info!("[Kimi] Connection established, starting stream");
+ let handler = crate::llm::llm_models::get_handler(model);
+ let mut stream_state = String::new();
let mut stream = response.bytes_stream();
let mut total_content_chars: usize = 0;
let mut chunk_count: usize = 0;
@@ -327,10 +329,13 @@ impl LLMProvider for KimiClient {
// Kimi K2.5: content has the answer, reasoning/reasoning_content is thinking
if let Some(text) = delta.get("content").and_then(|c| c.as_str()) {
if !text.is_empty() {
- total_content_chars += text.len();
- if tx.send(text.to_string()).await.is_err() {
- info!("[Kimi] Channel closed, stopping stream after {} content chars", total_content_chars);
- return Ok(());
+ let processed = handler.process_content_streaming(text, &mut stream_state);
+ if !processed.is_empty() {
+ total_content_chars += processed.len();
+ if tx.send(processed).await.is_err() {
+ info!("[Kimi] Channel closed, stopping stream after {} content chars", total_content_chars);
+ return Ok(());
+ }
}
}
}
diff --git a/src/llm/llm_models/deepseek_r3.rs b/src/llm/llm_models/deepseek_r3.rs
index ca04b31a..d5a9dd8d 100644
--- a/src/llm/llm_models/deepseek_r3.rs
+++ b/src/llm/llm_models/deepseek_r3.rs
@@ -38,6 +38,42 @@ impl ModelHandler for DeepseekR3Handler {
strip_think_tags(content)
}
+ fn process_content_streaming(&self, chunk: &str, state: &mut String) -> String {
+ let old_len = state.len();
+ state.push_str(chunk);
+
+ let mut clean_current = String::new();
+ let mut in_think = false;
+
+ let mut current_pos = 0;
+ let full_text = state.as_str();
+
+ while current_pos < full_text.len() {
+ if !in_think {
+ if full_text[current_pos..].starts_with("<think>") {
+ in_think = true;
+ current_pos += 7;
+ } else {
+ let c = full_text[current_pos..].chars().next().unwrap();
+ if current_pos >= old_len {
+ clean_current.push(c);
+ }
+ current_pos += c.len_utf8();
+ }
+ } else {
+ if full_text[current_pos..].starts_with("</think>") {
+ in_think = false;
+ current_pos += 8;
+ } else {
+ let c = full_text[current_pos..].chars().next().unwrap();
+ current_pos += c.len_utf8();
+ }
+ }
+ }
+
+ clean_current
+ }
+
fn has_analysis_markers(&self, buffer: &str) -> bool {
buffer.contains("<think>")
}
diff --git a/src/llm/llm_models/minimax.rs b/src/llm/llm_models/minimax.rs
index 761bdec9..c95262e9 100644
--- a/src/llm/llm_models/minimax.rs
+++ b/src/llm/llm_models/minimax.rs
@@ -69,6 +69,48 @@ impl ModelHandler for MinimaxHandler {
strip_think_tags(content)
}
+ fn process_content_streaming(&self, chunk: &str, state: &mut String) -> String {
+ let old_len = state.len();
+ state.push_str(chunk);
+
+ let mut clean_current = String::new();
+ let mut in_think = false;
+
+ let mut current_pos = 0;
+ let full_text = state.as_str();
+
+ while current_pos < full_text.len() {
+ if !in_think {
+ if full_text[current_pos..].starts_with("<think>") {
+ in_think = true;
+ current_pos += 7;
+ } else if full_text[current_pos..].starts_with("（分析）") || full_text[current_pos..].starts_with("【分析】") {
+ in_think = true;
+ current_pos += 12; // UTF-8 byte length of both 4-glyph fullwidth tags (3 bytes per glyph)
+ } else {
+ let c = full_text[current_pos..].chars().next().unwrap();
+ if current_pos >= old_len {
+ clean_current.push(c);
+ }
+ current_pos += c.len_utf8();
+ }
+ } else {
+ if full_text[current_pos..].starts_with("</think>") {
+ in_think = false;
+ current_pos += 8;
+ } else if full_text[current_pos..].starts_with("（/分析）") || full_text[current_pos..].starts_with("【/分析】") {
+ in_think = false;
+ current_pos += 13; // UTF-8 byte length of both closing fullwidth tags (ASCII '/' + four 3-byte glyphs)
+ } else {
+ let c = full_text[current_pos..].chars().next().unwrap();
+ current_pos += c.len_utf8();
+ }
+ }
+ }
+
+ clean_current
+ }
+
fn has_analysis_markers(&self, buffer: &str) -> bool {
buffer.contains("（分析）") || buffer.contains("<think>") || buffer.contains("【分析】")
}
diff --git a/src/llm/llm_models/mod.rs b/src/llm/llm_models/mod.rs
index 965adb99..7e405942 100644
--- a/src/llm/llm_models/mod.rs
+++ b/src/llm/llm_models/mod.rs
@@ -6,6 +6,9 @@ pub mod minimax;
pub trait ModelHandler: Send + Sync {
fn is_analysis_complete(&self, buffer: &str) -> bool;
fn process_content(&self, content: &str) -> String;
+ fn process_content_streaming(&self, content: &str, _state_buffer: &mut String) -> String {
+ self.process_content(content)
+ }
fn has_analysis_markers(&self, buffer: &str) -> bool;
}
diff --git a/src/llm/mod.rs b/src/llm/mod.rs
index 6d83eb9b..a220f3ac 100644
--- a/src/llm/mod.rs
+++ b/src/llm/mod.rs
@@ -432,6 +432,7 @@ impl LLMProvider for OpenAIClient {
}
let handler = get_handler(model);
+ let mut stream_state = String::new(); // State for the handler to track thinking tags across chunks
let mut stream = response.bytes_stream();
while let Some(chunk_result) = stream.next().await {
@@ -441,7 +442,7 @@ impl LLMProvider for OpenAIClient {
if line.starts_with("data: ") && !line.contains("[DONE]") {
if let Ok(data) = serde_json::from_str::<serde_json::Value>(&line[6..]) {
if let Some(content) = data["choices"][0]["delta"]["content"].as_str() {
- let processed = handler.process_content(content);
+ let processed = handler.process_content_streaming(content, &mut stream_state);
if !processed.is_empty() {
let _ = tx.send(processed).await;
}