use async_trait::async_trait; use futures::StreamExt; use log::{error, info}; use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::sync::mpsc; use super::LLMProvider; // Kimi K2.5 (moonshotai) API Client // NVIDIA endpoint with special chat_template_kwargs for thinking mode #[derive(Debug, Clone, Serialize, Deserialize)] pub struct KimiMessage { pub role: String, #[serde(skip_serializing_if = "Option::is_none")] pub content: Option, #[serde(default)] pub tool_calls: Option>, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct KimiRequest { pub model: String, pub messages: Vec, #[serde(skip_serializing_if = "Option::is_none")] pub stream: Option, #[serde(skip_serializing_if = "Option::is_none")] pub max_tokens: Option, #[serde(skip_serializing_if = "Option::is_none")] pub temperature: Option, #[serde(skip_serializing_if = "Option::is_none")] pub top_p: Option, #[serde(skip_serializing_if = "Option::is_none")] pub tools: Option>, #[serde(skip_serializing_if = "Option::is_none")] pub tool_choice: Option, #[serde(rename = "chat_template_kwargs", skip_serializing_if = "Option::is_none")] pub chat_template_kwargs: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct KimiChatTemplateKwargs { pub thinking: bool, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct KimiResponseChoice { #[serde(default)] pub index: u32, pub message: KimiMessage, #[serde(default)] pub finish_reason: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct KimiResponse { pub id: String, pub object: String, pub created: u64, pub model: String, pub choices: Vec, #[serde(default)] pub usage: Option, } // Streaming structures #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct KimiStreamDelta { #[serde(default)] pub content: Option, #[serde(default)] pub role: Option, #[serde(default)] pub tool_calls: Option>, #[serde(default)] pub reasoning_content: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct KimiStreamChoice { #[serde(default)] pub index: u32, #[serde(default)] pub delta: KimiStreamDelta, #[serde(default)] pub finish_reason: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct KimiStreamChunk { pub id: String, pub object: String, pub created: u64, pub model: String, pub choices: Vec, #[serde(default)] pub usage: Option, } #[derive(Debug)] pub struct KimiClient { client: reqwest::Client, base_url: String, } impl KimiClient { pub fn new(base_url: String) -> Self { let base = base_url.trim_end_matches('/').to_string(); Self { client: reqwest::Client::new(), base_url: base, } } fn build_url(&self) -> String { if self.base_url.contains("/chat/completions") { self.base_url.clone() } else { format!("{}/chat/completions", self.base_url) } } fn sanitize_utf8(input: &str) -> String { input.chars() .filter(|c| { let cp = *c as u32; !(0xD800..=0xDBFF).contains(&cp) && !(0xDC00..=0xDFFF).contains(&cp) }) .collect() } } #[async_trait] impl LLMProvider for KimiClient { async fn generate( &self, prompt: &str, _config: &Value, model: &str, key: &str, ) -> Result> { let messages = vec![KimiMessage { role: "user".to_string(), content: Some(prompt.to_string()), tool_calls: None, }]; let model_name = if model == "kimi-k2.5" || model == "kimi-k2" { "moonshotai/kimi-k2.5" } else { model }; let request = KimiRequest { model: model_name.to_string(), messages, stream: Some(false), max_tokens: None, temperature: Some(1.0), top_p: Some(1.0), tools: None, tool_choice: None, chat_template_kwargs: Some(KimiChatTemplateKwargs { thinking: true, }), }; let url = self.build_url(); info!("Kimi non-streaming request to: {}", url); let response = self .client .post(&url) .header("Authorization", format!("Bearer {}", key)) .header("Content-Type", "application/json") .header("Accept", "application/json") .json(&request) .send() .await?; if !response.status().is_success() { let error_text = response.text().await.unwrap_or_default(); error!("Kimi API error: {}", error_text); return Err(format!("Kimi API error: {}", error_text).into()); } let kimi_response: KimiResponse = response.json().await?; let content = kimi_response .choices .first() .and_then(|c| c.message.content.clone()) .unwrap_or_default(); Ok(content) } async fn generate_stream( &self, prompt: &str, config: &Value, tx: mpsc::Sender, model: &str, key: &str, tools: Option<&Vec>, ) -> Result<(), Box> { let messages = if let Some(msgs) = config.as_array() { msgs.iter() .filter_map(|m| { let role = m.get("role")?.as_str()?; let content = m.get("content")?.as_str()?; let sanitized = Self::sanitize_utf8(content); Some(KimiMessage { role: role.to_string(), content: Some(sanitized), tool_calls: None, }) }) .collect::>() } else { vec![KimiMessage { role: "user".to_string(), content: Some(Self::sanitize_utf8(prompt)), tool_calls: None, }] }; if messages.is_empty() { return Err("No valid messages in request".into()); } let model_name = if model == "kimi-k2.5" || model == "kimi-k2" { "moonshotai/kimi-k2.5" } else { model }; let tool_choice = if tools.is_some() { Some(serde_json::json!("auto")) } else { None }; let request = KimiRequest { model: model_name.to_string(), messages, stream: Some(true), max_tokens: None, temperature: Some(1.0), top_p: Some(1.0), tools: tools.cloned(), tool_choice, chat_template_kwargs: Some(KimiChatTemplateKwargs { thinking: true, }), }; let url = self.build_url(); info!("Kimi streaming request to: {}", url); let response = self .client .post(&url) .header("Authorization", format!("Bearer {}", key)) .header("Content-Type", "application/json") .header("Accept", "text/event-stream") .json(&request) .send() .await?; if !response.status().is_success() { let error_text = response.text().await.unwrap_or_default(); error!("Kimi streaming error: {}", error_text); return Err(format!("Kimi streaming error: {}", error_text).into()); } let mut stream = response.bytes_stream(); let mut buffer = Vec::new(); while let Some(chunk_result) = stream.next().await { let chunk = chunk_result.map_err(|e| format!("Stream error: {}", e))?; buffer.extend_from_slice(&chunk); let data = String::from_utf8_lossy(&buffer); for line in data.lines() { let line = line.trim(); if line.is_empty() { continue; } if line == "data: [DONE]" { std::mem::drop(tx.send(String::new())); return Ok(()); } if let Some(json_str) = line.strip_prefix("data: ") { let json_str = json_str.trim(); if let Ok(chunk_data) = serde_json::from_str::(json_str) { if let Some(choices) = chunk_data.get("choices").and_then(|c| c.as_array()) { for choice in choices { if let Some(delta) = choice.get("delta") { if let Some(tool_calls) = delta.get("tool_calls").and_then(|t| t.as_array()) { for tool_call in tool_calls { let tool_call_json = serde_json::json!({ "type": "tool_call", "content": tool_call }).to_string(); let _ = tx.send(tool_call_json).await; } } // Kimi K2.5 sends text via reasoning_content (thinking mode) // content may be null; we accept both fields let text = delta.get("content").and_then(|c| c.as_str()) .or_else(|| delta.get("reasoning_content").and_then(|c| c.as_str())); if let Some(text) = text { if !text.is_empty() { let _ = tx.send(text.to_string()).await; } } } if let Some(reason) = choice.get("finish_reason").and_then(|r| r.as_str()) { if !reason.is_empty() { info!("Kimi stream finished: {}", reason); std::mem::drop(tx.send(String::new())); return Ok(()); } } } } } } } if let Some(last_newline) = data.rfind('\n') { buffer = buffer[last_newline + 1..].to_vec(); } } std::mem::drop(tx.send(String::new())); Ok(()) } async fn cancel_job( &self, _session_id: &str, ) -> Result<(), Box> { info!("Kimi cancel requested for session {} (no-op)", _session_id); Ok(()) } }