feat: stateful thinking-tag stripping for Kimi, Minimax, and DeepSeek streams
All checks were successful
BotServer CI/CD / build (push) Successful in 3m42s

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-04-14 16:15:31 -03:00
parent 8ccc4e1c5e
commit ba3e2675ef
5 changed files with 92 additions and 5 deletions

View file

@ -282,6 +282,8 @@ impl LLMProvider for KimiClient {
info!("[Kimi] Connection established, starting stream");
let handler = crate::llm::llm_models::get_handler(model);
let mut stream_state = String::new();
let mut stream = response.bytes_stream();
let mut total_content_chars: usize = 0;
let mut chunk_count: usize = 0;
@ -327,10 +329,13 @@ impl LLMProvider for KimiClient {
// Kimi K2.5: content has the answer, reasoning/reasoning_content is thinking
if let Some(text) = delta.get("content").and_then(|c| c.as_str()) {
if !text.is_empty() {
total_content_chars += text.len();
if tx.send(text.to_string()).await.is_err() {
info!("[Kimi] Channel closed, stopping stream after {} content chars", total_content_chars);
return Ok(());
let processed = handler.process_content_streaming(text, &mut stream_state);
if !processed.is_empty() {
total_content_chars += processed.len();
if tx.send(processed).await.is_err() {
info!("[Kimi] Channel closed, stopping stream after {} content chars", total_content_chars);
return Ok(());
}
}
}
}

View file

@ -38,6 +38,42 @@ impl ModelHandler for DeepseekR3Handler {
strip_think_tags(content)
}
fn process_content_streaming(&self, chunk: &str, state: &mut String) -> String {
let old_len = state.len();
state.push_str(chunk);
let mut clean_current = String::new();
let mut in_think = false;
let mut current_pos = 0;
let full_text = state.as_str();
while current_pos < full_text.len() {
if !in_think {
if full_text[current_pos..].starts_with("<think>") {
in_think = true;
current_pos += 7;
} else {
let c = full_text[current_pos..].chars().next().unwrap();
if current_pos >= old_len {
clean_current.push(c);
}
current_pos += c.len_utf8();
}
} else {
if full_text[current_pos..].starts_with("</think>") {
in_think = false;
current_pos += 8;
} else {
let c = full_text[current_pos..].chars().next().unwrap();
current_pos += c.len_utf8();
}
}
}
clean_current
}
fn has_analysis_markers(&self, buffer: &str) -> bool {
    // True once the buffered output contains the model's thinking marker.
    const THINK_OPEN: &str = "<think>";
    buffer.contains(THINK_OPEN)
}

View file

@ -69,6 +69,48 @@ impl ModelHandler for MinimaxHandler {
strip_think_tags(content)
}
fn process_content_streaming(&self, chunk: &str, state: &mut String) -> String {
    // Incrementally strips thinking/analysis spans from a streamed response.
    // Minimax may delimit thinking with <think>...</think> or with the
    // Chinese analysis markers below.
    //
    // `state` layout: an optional leading '\u{1}' sentinel meaning "currently
    // inside a thinking span", followed by any text held back because it
    // might be the start of a marker split across a chunk boundary.
    //
    // Fixes three defects of the previous version:
    // 1. A marker split across chunks no longer leaks its first bytes.
    // 2. Processed text is drained from `state` (was an O(n²) full rescan
    //    per chunk).
    // 3. The cursor advances by the matched marker's actual byte length
    //    instead of hard-coded constants ("/分析)" is 10 bytes, not 13),
    //    which previously could land off a char boundary and panic.
    const IN_THINK_MARK: char = '\u{1}';
    const OPEN_MARKERS: [&str; 3] = ["<think>", "(分析)", "【分析】"];
    const CLOSE_MARKERS: [&str; 3] = ["</think>", "/分析)", "【/分析】"];

    let mut in_think = state.starts_with(IN_THINK_MARK);
    let mut text = std::mem::take(state);
    if in_think {
        text.remove(0); // drop the sentinel; re-added below if still inside
    }
    text.push_str(chunk);

    let mut emitted = String::new();
    let mut pos = 0;
    while pos < text.len() {
        let rest = &text[pos..];
        // Only one marker set is relevant at a time.
        let markers: &[&str] = if in_think { &CLOSE_MARKERS } else { &OPEN_MARKERS };
        if let Some(marker) = markers.iter().find(|m| rest.starts_with(*m)) {
            in_think = !in_think;
            pos += marker.len(); // advance by the actual UTF-8 length
        } else if markers.iter().any(|m| m.starts_with(rest)) {
            // The entire remaining text is a proper prefix of some marker;
            // hold it back until the next chunk disambiguates it.
            // NOTE(review): a genuine trailing partial at end-of-stream stays
            // buffered — a caller needing a final flush must drain `state`.
            break;
        } else {
            // `pos` is always on a char boundary, so this cannot panic.
            let c = rest.chars().next().unwrap();
            if !in_think {
                emitted.push(c);
            }
            pos += c.len_utf8();
        }
    }

    // Persist only the held-back tail plus the in-think flag.
    state.clear();
    if in_think {
        state.push(IN_THINK_MARK);
    }
    state.push_str(&text[pos..]);
    emitted
}
fn has_analysis_markers(&self, buffer: &str) -> bool {
    // True once any known analysis/thinking marker appears in the buffer.
    ["(分析)", "<think>", "【分析】"]
        .iter()
        .any(|marker| buffer.contains(*marker))
}

View file

@ -6,6 +6,9 @@ pub mod minimax;
/// Per-model hooks for detecting and stripping "thinking"/analysis output
/// from LLM responses. Implementations must be shareable across threads.
pub trait ModelHandler: Send + Sync {
    /// Returns true when the model's analysis/thinking phase appears
    /// complete for the buffered text accumulated so far.
    fn is_analysis_complete(&self, buffer: &str) -> bool;
    /// Cleans a complete (non-streamed) response, e.g. stripping think tags.
    fn process_content(&self, content: &str) -> String;
    /// Streaming variant: cleans one chunk, using `_state_buffer` to carry
    /// parser state (such as partially received tags) across chunks.
    /// The default implementation is stateless and simply applies
    /// `process_content` to each chunk independently.
    fn process_content_streaming(&self, content: &str, _state_buffer: &mut String) -> String {
        self.process_content(content)
    }
    /// Returns true if the buffer contains any model-specific analysis marker.
    fn has_analysis_markers(&self, buffer: &str) -> bool;
}

View file

@ -432,6 +432,7 @@ impl LLMProvider for OpenAIClient {
}
let handler = get_handler(model);
let mut stream_state = String::new(); // State for the handler to track thinking tags across chunks
let mut stream = response.bytes_stream();
while let Some(chunk_result) = stream.next().await {
@ -441,7 +442,7 @@ impl LLMProvider for OpenAIClient {
if line.starts_with("data: ") && !line.contains("[DONE]") {
if let Ok(data) = serde_json::from_str::<Value>(&line[6..]) {
if let Some(content) = data["choices"][0]["delta"]["content"].as_str() {
let processed = handler.process_content(content);
let processed = handler.process_content_streaming(content, &mut stream_state);
if !processed.is_empty() {
let _ = tx.send(processed).await;
}