diff --git a/src/automation/compact_prompt.rs b/src/automation/compact_prompt.rs
index 6a1635c3..55a54030 100644
--- a/src/automation/compact_prompt.rs
+++ b/src/automation/compact_prompt.rs
@@ -62,7 +62,7 @@ async fn compact_prompt_for_bots(
     // Calculate start index: if there's a summary, start after it; otherwise start from 0
     let start_index = last_summary_index.map(|idx| idx + 1).unwrap_or(0);
-    for (i, (role, _)) in history.iter().enumerate().skip(start_index) {
+    for (_i, (role, _)) in history.iter().enumerate().skip(start_index) {
         if role == "compact" {
             continue;
         }
@@ -101,21 +101,26 @@ async fn compact_prompt_for_bots(
         messages_since_summary
     );
-    let mut compacted = "Please summarize the following conversation between a human and an AI assistant:\n".to_string();
+    let mut messages = Vec::new();
+    messages.push(serde_json::json!({
+        "role": "system",
+        "content": "Please summarize the following conversation between a user and a bot"
+    }));
-    // Include messages from start_index onward
-    let messages_to_include = history.iter().skip(start_index);
-
-    for (role, content) in messages_to_include {
+    for (role, content) in history.iter().skip(start_index) {
         if role == "compact" {
             continue;
         }
-        compacted.push_str(&format!("{}: {}\n", role, content));
+        messages.push(serde_json::json!({
+            "role": role,
+            "content": content
+        }));
     }
+
     let llm_provider = state.llm_provider.clone();
     trace!("Starting summarization for session {}", session.id);
     let mut filtered = String::new();
-    let summarized = match llm_provider.generate(&compacted, &serde_json::Value::Null).await {
+    let summarized = match llm_provider.generate("", &serde_json::Value::Array(messages)).await {
         Ok(summary) => {
             trace!(
                 "Successfully summarized session {} ({} chars)",
@@ -138,7 +143,7 @@ async fn compact_prompt_for_bots(
                 session.id, e
             );
             trace!("Using fallback summary for session {}", session.id);
-            format!("SUMMARY: {}", compacted) // Fallback
+            format!("SUMMARY: {}", filtered) // Fallback
         }
     };
     info!(
@@ -148,7 +153,7 @@
     );
     {
         let mut session_manager = state.session_manager.lock().await;
-        session_manager.save_message(session.id, session.user_id, 9, &filtered, 1)?;
+        session_manager.save_message(session.id, session.user_id, 9, &summarized, 1)?;
     }
     let _session_cleanup = guard((), |_| {
diff --git a/src/automation/mod.rs b/src/automation/mod.rs
index dfd67e18..163c4318 100644
--- a/src/automation/mod.rs
+++ b/src/automation/mod.rs
@@ -109,63 +109,4 @@ impl AutomationService {
         }
         Ok(())
     }
-    async fn execute_compact_prompt(
-        &self,
-        automation: &Automation,
-    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
-        let config_manager = ConfigManager::new(self.state.conn.clone());
-        let compact_threshold = config_manager
-            .get_config(&automation.bot_id, "prompt-compact", None)?
-            .parse::<usize>()
-            .unwrap_or(0);
-        if compact_threshold == 0 {
-            return Ok(());
-        }
-        let mut session_manager = self.state.session_manager.lock().await;
-        let sessions = session_manager.get_user_sessions(uuid::Uuid::nil())?;
-        for session in sessions {
-            if session.bot_id != automation.bot_id {
-                continue;
-            }
-            let history = session_manager.get_conversation_history(session.id, session.user_id)?;
-            if history.len() > compact_threshold {
-                trace!(
-                    "Compacting prompt for session {}: {} messages",
-                    session.id,
-                    history.len()
-                );
-                let mut compacted = String::new();
-                for (role, content) in &history[..history.len() - compact_threshold] {
-                    compacted.push_str(&format!("{}: {}\n", role, content));
-                }
-                let summarized = format!("SUMMARY: {}", compacted);
-                session_manager.save_message(session.id, session.user_id, 3, &summarized, 1)?;
-            }
-        }
-        Ok(())
-    }
-}
-pub async fn execute_compact_prompt(
-    state: Arc<AppState>,
-) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
-    use crate::shared::models::system_automations::dsl::{is_active, system_automations};
-    use diesel::prelude::*;
-    let state_clone = state.clone();
-    let service = AutomationService::new(state_clone);
-    let mut conn = state
-        .conn
-        .get()
-        .map_err(|e| format!("Failed to acquire database connection: {}", e))?;
-    let automations: Vec<Automation> = system_automations
-        .filter(is_active.eq(true))
-        .load::<Automation>(&mut conn)?;
-    for automation in automations {
-        if let Err(e) = service.execute_compact_prompt(&automation).await {
-            error!(
-                "Failed to compact prompt for bot {}: {}",
-                automation.bot_id, e
-            );
-        }
-    }
-    Ok(())
 }
diff --git a/src/bot/mod.rs b/src/bot/mod.rs
index 51b0a025..b9f05371 100644
--- a/src/bot/mod.rs
+++ b/src/bot/mod.rs
@@ -1,6 +1,7 @@
 mod ui;
 use crate::config::ConfigManager;
 use crate::drive_monitor::DriveMonitor;
+use crate::llm::OpenAIClient;
 use crate::llm_models;
 use crate::nvidia::get_system_metrics;
 use crate::bot::ui::BotUI;
@@ -361,22 +362,12 @@ impl BotOrchestrator {
             }
             history
         };
-        let mut prompt = String::new();
-        if !system_prompt.is_empty() {
-            prompt.push_str(&format!("SYSTEM: *** {} *** \n", system_prompt));
-        }
-        if !context_data.is_empty() {
-            prompt.push_str(&format!("CONTEXT: *** {} *** \n", context_data));
-        }
-        for (role, content) in &history {
-            prompt.push_str(&format!("{}:{}\n", role, content));
-        }
-        prompt.push_str(&format!("\nbot:"));
+        let messages = OpenAIClient::build_messages(&system_prompt, &context_data, &history);
         trace!(
-            "Stream prompt constructed with {} history entries",
+            "Stream messages constructed with {} history entries",
             history.len()
         );
-        trace!("LLM prompt: [{}]", prompt);
+        trace!("LLM messages: {:?}", messages);
         let (stream_tx, mut stream_rx) = mpsc::channel::<String>(100);
         let llm = self.state.llm_provider.clone();
         if message.channel == "web" {
@@ -406,10 +397,9 @@ impl BotOrchestrator {
             };
             response_tx.send(thinking_response).await?;
         }
-        let prompt_clone = prompt.clone();
         tokio::spawn(async move {
             if let Err(e) = llm
-                .generate_stream(&prompt_clone, &serde_json::Value::Null, stream_tx)
+                .generate_stream("", &messages, stream_tx)
                 .await
             {
                 error!("LLM streaming error: {}", e);
@@ -422,7 +412,7 @@ impl BotOrchestrator {
         let mut first_word_received = false;
         let mut last_progress_update = Instant::now();
         let progress_interval = Duration::from_secs(1);
-        let initial_tokens = crate::shared::utils::estimate_token_count(&prompt);
+        let initial_tokens = crate::shared::utils::estimate_token_count(&message.content);
         let config_manager = ConfigManager::new(self.state.conn.clone());
         let max_context_size = config_manager
             .get_config(
@@ -482,8 +472,6 @@ impl BotOrchestrator {
                         let _cpu_bar = "█".repeat((metrics.cpu_usage / 5.0).round() as usize);
                         let token_ratio = current_tokens as f64 / max_context_size.max(1) as f64;
                         let _token_bar = "█".repeat((token_ratio * 20.0).round() as usize);
-                        let mut ui = BotUI::new().unwrap();
-                        ui.render_progress(current_tokens, max_context_size).unwrap();
                     }
                     last_progress_update = Instant::now();
                 }
@@ -510,7 +498,7 @@
             "Stream processing completed, {} chunks processed",
             chunk_count
         );
-        let total_tokens = crate::shared::utils::estimate_token_count(&prompt)
+        let total_tokens = crate::shared::utils::estimate_token_count(&message.content)
             + crate::shared::utils::estimate_token_count(&context_data)
             + crate::shared::utils::estimate_token_count(&full_response);
         info!(
@@ -518,26 +506,6 @@
             total_tokens
         );
         let config_manager = ConfigManager::new( self.state.conn.clone());
-        let compact_enabled = config_manager
-            .get_config(
-                &Uuid::parse_str(&message.bot_id).unwrap_or_default(),
-                "prompt-compact",
-                None,
-            )
-            .unwrap_or_default()
-            .parse::<usize>()
-            .unwrap_or(0);
-        if compact_enabled > 0 {
-            let state = self.state.clone();
-            tokio::task::spawn_blocking(move || loop {
-                if let Err(e) = tokio::runtime::Handle::current()
-                    .block_on(crate::automation::execute_compact_prompt(state.clone()))
-                {
-                    error!("Failed to execute compact prompt: {}", e);
-                }
-                std::thread::sleep(Duration::from_secs(60));
-            });
-        }
         {
             let mut sm = self.state.session_manager.lock().await;
             sm.save_message(session.id, user_id, 2, &full_response, 1)?;
@@ -668,8 +636,6 @@ impl BotOrchestrator {
             "Sending warning to session {} on channel {}: {}",
             session_id, channel, message
         );
-        let mut ui = BotUI::new().unwrap();
-        ui.render_warning(message).unwrap();
         Ok(())
     }
     pub async fn trigger_auto_welcome(
diff --git a/src/bot/ui.rs b/src/bot/ui.rs
index 219eb8c4..f1bb73e6 100644
--- a/src/bot/ui.rs
+++ b/src/bot/ui.rs
@@ -17,43 +17,7 @@ impl BotUI {
         let terminal = Terminal::new(backend)?;
         Ok(Self { terminal })
     }
-    pub fn render_progress(&mut self, current_tokens: usize, max_context_size: usize) -> io::Result<()> {
-        let metrics = get_system_metrics(current_tokens, max_context_size).unwrap_or_default();
-        let gpu_usage = metrics.gpu_usage.unwrap_or(0.0);
-        let cpu_usage = metrics.cpu_usage;
-        let token_ratio = current_tokens as f64 / max_context_size.max(1) as f64;
-        self.terminal.draw(|f| {
-            let chunks = Layout::default()
-                .direction(Direction::Vertical)
-                .constraints([
-                    Constraint::Length(3),
-                    Constraint::Length(3),
-                    Constraint::Length(3),
-                    Constraint::Min(0),
-                ])
-                .split(f.area());
-            let gpu_gauge = Gauge::default()
-                .block(Block::default().title("GPU Usage").borders(Borders::ALL))
-                .gauge_style(Style::default().fg(Color::Green).add_modifier(Modifier::BOLD))
-                .ratio(gpu_usage as f64 / 100.0)
-                .label(format!("{:.1}%", gpu_usage));
-            let cpu_gauge = Gauge::default()
-                .block(Block::default().title("CPU Usage").borders(Borders::ALL))
-                .gauge_style(Style::default().fg(Color::Yellow).add_modifier(Modifier::BOLD))
-                .ratio(cpu_usage as f64 / 100.0)
-                .label(format!("{:.1}%", cpu_usage));
-            let token_gauge = Gauge::default()
-                .block(Block::default().title("Token Progress").borders(Borders::ALL))
-                .gauge_style(Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD))
-                .ratio(token_ratio)
-                .label(format!("{}/{}", current_tokens, max_context_size));
-            f.render_widget(gpu_gauge, chunks[0]);
-            f.render_widget(cpu_gauge, chunks[1]);
-            f.render_widget(token_gauge,
-                chunks[2]);
-        })?;
-        Ok(())
-    }
-    pub fn render_warning(&mut self, message: &str) -> io::Result<()> {
+    fn render_warning(&mut self, message: &str) -> io::Result<()> {
         self.terminal.draw(|f| {
             let block = Block::default()
                 .title("⚠️ NVIDIA Warning")
diff --git a/src/llm/local.rs b/src/llm/local.rs
index 79778301..69a00d9f 100644
--- a/src/llm/local.rs
+++ b/src/llm/local.rs
@@ -204,8 +204,8 @@ pub async fn start_llm_server(
     } else {
         let mut cmd = tokio::process::Command::new("sh");
         cmd.arg("-c").arg(format!(
-            "cd {} && ./llama-server {} --verbose >llm-stdout.log 2>&1 &",
-            llama_cpp_path, args
+            "cd {} && ./llama-server {} --verbose >../../../../logs/llm/stdout.log 2>&1 &",
+            llama_cpp_path, args
         ));
         info!("Executing LLM server command: cd {} && ./llama-server {} --verbose", llama_cpp_path, args);
         cmd.spawn()?;
diff --git a/src/llm/mod.rs b/src/llm/mod.rs
index abdd405b..3c861019 100644
--- a/src/llm/mod.rs
+++ b/src/llm/mod.rs
@@ -1,5 +1,6 @@
 use async_trait::async_trait;
 use futures::StreamExt;
+use log::info;
 use serde_json::Value;
 use tokio::sync::mpsc;
 pub mod local;
@@ -39,18 +40,20 @@ impl LLMProvider for OpenAIClient {
     async fn generate(
         &self,
         prompt: &str,
-        _config: &Value,
+        messages: &Value,
     ) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
-        let messages = self.parse_messages(prompt);
-
+        let default_messages = serde_json::json!([{"role": "user", "content": prompt}]);
         let response = self
             .client
             .post(&format!("{}/v1/chat/completions/", self.base_url))
             .header("Authorization", format!("Bearer {}", self.api_key))
             .json(&serde_json::json!({
                 "model": "gpt-3.5-turbo",
-                "messages": messages,
-                "max_tokens": 1000
+                "messages": if messages.is_array() && !messages.as_array().unwrap().is_empty() {
+                    messages
+                } else {
+                    &default_messages
+                }
             }))
             .send()
             .await?;
@@ -69,18 +72,22 @@
     async fn generate_stream(
         &self,
         prompt: &str,
-        _config: &Value,
+        messages: &Value,
         tx: mpsc::Sender<String>,
     ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
-        let messages = self.parse_messages(prompt);
-
+        let default_messages = serde_json::json!([{"role": "user", "content": prompt}]);
         let response = self
             .client
             .post(&format!("{}/v1/chat/completions", self.base_url))
             .header("Authorization", format!("Bearer {}", self.api_key))
             .json(&serde_json::json!({
                 "model": "gpt-3.5-turbo",
-                "messages": messages,
+                "messages": if messages.is_array() && !messages.as_array().unwrap().is_empty() {
+                    info!("Using provided messages: {:?}", messages);
+                    messages
+                } else {
+                    &default_messages
+                },
                 "stream": true
             }))
             .send()
             .await
@@ -120,50 +127,26 @@ impl OpenAIClient {
     }
 }
-    fn parse_messages(&self, prompt: &str) -> Vec<Value> {
+    pub fn build_messages(system_prompt: &str, context_data: &str, history: &[(String, String)]) -> Value {
         let mut messages = Vec::new();
-        let mut current_role = None;
-        let mut current_content = String::new();
-
-        for line in prompt.lines() {
-            if let Some(role_end) = line.find(':') {
-                let role_part = &line[..role_end].trim().to_lowercase();
-                let role = match role_part.as_str() {
-                    "human" => "user",
-                    "bot" => "assistant",
-                    "compact" => "system",
-                    _ => continue
-                };
-
-                if let Some(r) = current_role.take() {
-                    if !current_content.is_empty() {
-                        messages.push(serde_json::json!({
-                            "role": r,
-                            "content": current_content.trim()
-                        }));
-                    }
-                }
-                current_role = Some(role);
-                current_content = line[role_end + 1..].trim_start().to_string();
-                continue;
-            }
-
-            if let Some(_) = current_role {
-                if !current_content.is_empty() {
-                    current_content.push('\n');
-                }
-                current_content.push_str(line);
-            }
+        if !system_prompt.is_empty() {
+            messages.push(serde_json::json!({
+                "role": "system",
+                "content": system_prompt
+            }));
         }
-
-        if let Some(role) = current_role {
-            if !current_content.is_empty() {
-                messages.push(serde_json::json!({
-                    "role": role,
-                    "content": current_content.trim()
-                }));
-            }
+        if !context_data.is_empty() {
+            messages.push(serde_json::json!({
+                "role": "system",
+                "content": context_data
+            }));
         }
-        messages
+        for (role, content) in history {
+            messages.push(serde_json::json!({
+                "role": role,
+                "content": content
+            }));
+        }
+        serde_json::Value::Array(messages)
     }
 }
diff --git a/src/session/mod.rs b/src/session/mod.rs
index c4ae248c..89eaca1f 100644
--- a/src/session/mod.rs
+++ b/src/session/mod.rs
@@ -273,7 +273,7 @@ impl SessionManager {
         let mut history: Vec<(String, String)> = Vec::new();
         for (other_role, content) in messages {
             let role_str = match other_role {
-                1 => "human".to_string(),
+                1 => "user".to_string(),
                 2 => "bot".to_string(),
                 3 => "system".to_string(),
                 9 => "compact".to_string(),
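Notes for reviewers follow; the sketches are illustrative and are not part of the patch.

The compact task now ships the transcript to the model as structured chat messages instead of one flattened `role: content` string. A minimal, self-contained sketch of that construction, assuming only `serde_json` (the session and provider plumbing from the patch is stubbed out, and the sample history is invented):

```rust
use serde_json::{json, Value};

/// Mirrors what compact_prompt_for_bots now builds: one system instruction
/// followed by every turn since the last summary, with "compact" rows
/// (earlier summaries) skipped.
fn compact_messages(history: &[(String, String)], last_summary_index: Option<usize>) -> Value {
    // Resume after the last summary if there is one; otherwise start from 0.
    let start_index = last_summary_index.map(|idx| idx + 1).unwrap_or(0);
    let mut messages = vec![json!({
        "role": "system",
        "content": "Please summarize the following conversation between a user and a bot"
    })];
    for (role, content) in history.iter().skip(start_index) {
        if role == "compact" {
            continue; // never feed an old summary back into the summarizer
        }
        messages.push(json!({ "role": role, "content": content }));
    }
    Value::Array(messages)
}

fn main() {
    let history = vec![
        ("user".to_string(), "hi".to_string()),
        ("bot".to_string(), "hello!".to_string()),
        ("compact".to_string(), "SUMMARY: greeting".to_string()),
        ("user".to_string(), "what changed?".to_string()),
    ];
    // The summary sits at index 2, so only the final turn is re-summarized.
    let msgs = compact_messages(&history, Some(2));
    assert_eq!(msgs.as_array().unwrap().len(), 2); // system + 1 turn
    println!("{}", serde_json::to_string_pretty(&msgs).unwrap());
}
```

The resulting array is then handed to the provider as `generate("", &Value::Array(messages))`, with the prompt argument left empty.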
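The new `OpenAIClient::build_messages` helper replaces the hand-rolled `SYSTEM: *** ... ***` prompt in the streaming path. A usage sketch follows (the helper is reproduced from the patch so the example is self-contained; the sample values are invented). Note that history roles are forwarded exactly as stored, so a `bot` row reaches the endpoint with role `bot`:

```rust
use serde_json::{json, Value};

// Copy of the helper added in src/llm/mod.rs, inlined here so the
// example compiles on its own.
fn build_messages(system_prompt: &str, context_data: &str, history: &[(String, String)]) -> Value {
    let mut messages = Vec::new();
    if !system_prompt.is_empty() {
        messages.push(json!({ "role": "system", "content": system_prompt }));
    }
    if !context_data.is_empty() {
        messages.push(json!({ "role": "system", "content": context_data }));
    }
    for (role, content) in history {
        messages.push(json!({ "role": role, "content": content }));
    }
    Value::Array(messages)
}

fn main() {
    let history = vec![
        ("user".to_string(), "What changed?".to_string()),
        ("bot".to_string(), "Prompts became chat messages.".to_string()),
    ];
    let msgs = build_messages("You are a helpful bot.", "", &history);
    // => [{"role":"system",...}, {"role":"user",...}, {"role":"bot",...}];
    // an empty context_data contributes no message at all.
    assert_eq!(msgs.as_array().unwrap().len(), 3);
    println!("{}", serde_json::to_string_pretty(&msgs).unwrap());
}
```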
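Finally, `generate` and `generate_stream` now share one selection rule for the request payload. The merged code inlines the rule in the request JSON; the free function below is a hedged restatement of it (the helper name is illustrative, not part of the patch):

```rust
use serde_json::{json, Value};

/// What the client sends as "messages": the caller-supplied value when it is
/// a non-empty JSON array, otherwise the bare prompt wrapped as a single
/// user message.
fn effective_messages(prompt: &str, messages: &Value) -> Value {
    match messages.as_array() {
        Some(arr) if !arr.is_empty() => messages.clone(),
        _ => json!([{ "role": "user", "content": prompt }]),
    }
}

fn main() {
    // Legacy call sites that still pass a bare prompt keep working:
    assert_eq!(
        effective_messages("hi", &Value::Null),
        json!([{ "role": "user", "content": "hi" }])
    );
    // New call sites pass a messages array and an empty prompt, as the
    // compact task and the streaming path now do:
    let msgs = json!([{ "role": "system", "content": "summarize this" }]);
    assert_eq!(effective_messages("", &msgs), msgs);
}
```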