diff --git a/src/automation/compact_prompt.rs b/src/automation/compact_prompt.rs index 4a38fd0d..bdcf2adc 100644 --- a/src/automation/compact_prompt.rs +++ b/src/automation/compact_prompt.rs @@ -3,100 +3,163 @@ use crate::llm_models; use crate::shared::models::Automation; use crate::shared::state::AppState; use diesel::prelude::*; -use log::{error, trace}; +use log::{error, trace}; use std::collections::HashSet; use std::sync::Arc; use tokio::time::{interval, Duration}; use uuid::Uuid; pub fn start_compact_prompt_scheduler(state: Arc) { - tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(30)).await; - let mut interval = interval(Duration::from_secs(60)); - loop { - interval.tick().await; - if let Err(e) = execute_compact_prompt(Arc::clone(&state)).await { - error!("Prompt compaction failed: {}", e); - } - } - }); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(30)).await; + let mut interval = interval(Duration::from_secs(60)); + loop { + interval.tick().await; + if let Err(e) = execute_compact_prompt(Arc::clone(&state)).await { + error!("Prompt compaction failed: {}", e); + } + } + }); } -async fn execute_compact_prompt(state: Arc) -> Result<(), Box> { - use crate::shared::models::system_automations::dsl::{is_active, system_automations}; - let automations: Vec = { - let mut conn = state.conn.get().map_err(|e| format!("Failed to acquire lock: {}", e))?; - system_automations.filter(is_active.eq(true)).load::(&mut *conn)? - }; - for automation in automations { - if let Err(e) = compact_prompt_for_bot(&state, &automation).await { - error!("Failed to compact prompt for bot {}: {}", automation.bot_id, e); - } - } - Ok(()) +async fn execute_compact_prompt( + state: Arc, +) -> Result<(), Box> { + use crate::shared::models::system_automations::dsl::{is_active, system_automations}; + let automations: Vec = { + let mut conn = state + .conn + .get() + .map_err(|e| format!("Failed to acquire lock: {}", e))?; + system_automations + .filter(is_active.eq(true)) + .load::(&mut *conn)? + }; + for automation in automations { + if let Err(e) = compact_prompt_for_bot(&state, &automation).await { + error!( + "Failed to compact prompt for bot {}: {}", + automation.bot_id, e + ); + } + } + Ok(()) } -async fn compact_prompt_for_bot(state: &Arc, automation: &Automation) -> Result<(), Box> { - use once_cell::sync::Lazy; - use scopeguard::guard; - static IN_PROGRESS: Lazy>> = Lazy::new(|| tokio::sync::Mutex::new(HashSet::new())); - { - let mut in_progress = IN_PROGRESS.lock().await; - if in_progress.contains(&automation.bot_id) { - return Ok(()); - } - in_progress.insert(automation.bot_id); - } - let bot_id = automation.bot_id; - let _cleanup = guard((), |_| { - tokio::spawn(async move { - let mut in_progress = IN_PROGRESS.lock().await; - in_progress.remove(&bot_id); - }); - }); - let config_manager = ConfigManager::new(state.conn.clone()); - let compact_threshold = config_manager - .get_config(&automation.bot_id, "prompt-compact", None)? - .parse::() - .unwrap_or(0); - if compact_threshold == 0 { - return Ok(()); - } else if compact_threshold < 0 { - trace!("Negative compact threshold detected for bot {}, skipping", automation.bot_id); - } - let sessions = { - let mut session_manager = state.session_manager.lock().await; - session_manager.get_user_sessions(Uuid::nil())? - }; - for session in sessions { - if session.bot_id != automation.bot_id { - continue; - } - let history = { - let mut session_manager = state.session_manager.lock().await; - session_manager.get_conversation_history(session.id, session.user_id)? - }; - trace!("Compacting prompt for session {}: {} messages", session.id, history.len()); - let mut compacted = String::new(); - for (role, content) in &history { - compacted.push_str(&format!("{}: {}\n", role, content)); - } - let llm_provider = state.llm_provider.clone(); - let compacted_clone = compacted.clone(); - let summarized = match llm_provider.summarize(&compacted_clone).await { - Ok(summary) => { - trace!("Successfully summarized conversation for session {}, summary length: {}", session.id, summary.len()); - let handler = llm_models::get_handler(&config_manager.get_config(&automation.bot_id, "llm-model", None).unwrap_or_default()); - let filtered = handler.process_content(&summary); - format!("SUMMARY: {}", filtered) - }, - Err(e) => { - error!("Failed to summarize conversation for session {}: {}", session.id, e); - format!("SUMMARY: {}", compacted) - } - }; - trace!("Prompt compacted {}: {} messages", session.id, history.len()); - { - let mut session_manager = state.session_manager.lock().await; - session_manager.save_message(session.id, session.user_id, 9, &summarized, 1)?; - } - } - Ok(()) +async fn compact_prompt_for_bot( + state: &Arc, + automation: &Automation, +) -> Result<(), Box> { + use once_cell::sync::Lazy; + use scopeguard::guard; + static IN_PROGRESS: Lazy>> = + Lazy::new(|| tokio::sync::Mutex::new(HashSet::new())); + { + let mut in_progress = IN_PROGRESS.lock().await; + if in_progress.contains(&automation.bot_id) { + return Ok(()); + } + in_progress.insert(automation.bot_id); + } + let bot_id = automation.bot_id; + let _cleanup = guard((), |_| { + tokio::spawn(async move { + let mut in_progress = IN_PROGRESS.lock().await; + in_progress.remove(&bot_id); + }); + }); + let config_manager = ConfigManager::new(state.conn.clone()); + let compact_threshold = config_manager + .get_config(&automation.bot_id, "prompt-compact", None)? + .parse::() + .unwrap_or(0); + if compact_threshold == 0 { + return Ok(()); + } else if compact_threshold < 0 { + trace!( + "Negative compact threshold detected for bot {}, skipping", + automation.bot_id + ); + } + let sessions = { + let mut session_manager = state.session_manager.lock().await; + session_manager.get_user_sessions(Uuid::nil())? + }; + for session in sessions { + if session.bot_id != automation.bot_id { + trace!("Skipping session {} - bot_id {} doesn't match automation bot_id {}", + session.id, session.bot_id, automation.bot_id); + continue; + } + let history = { + let mut session_manager = state.session_manager.lock().await; + session_manager.get_conversation_history(session.id, session.user_id)? + }; + + let mut messages_since_summary = 0; + let mut has_new_messages = false; + let mut last_summary_index = history.iter().position(|(role, _)| + role == "compact") + .unwrap_or(0); + + for (i, (role, _)) in history.iter().enumerate().skip(last_summary_index + 1) { + if role == "compact" { + continue; + } + messages_since_summary += 1; + has_new_messages = true; + } + + if !has_new_messages { + trace!("Skipping session {} - no new messages since last summary", session.id); + continue; + } + if messages_since_summary < compact_threshold as usize { + trace!("Skipping compaction for session {} - only {} new messages since last summary (threshold: {})", + session.id, messages_since_summary, compact_threshold); + continue; + } + + trace!( + "Compacting prompt for session {}: {} messages since last summary", + session.id, + messages_since_summary + ); + let mut compacted = String::new(); + for (role, content) in &history { + compacted.push_str(&format!("{}: {}\n", role, content)); + } + let llm_provider = state.llm_provider.clone(); + let compacted_clone = compacted.clone(); + let summarized = match llm_provider.summarize(&compacted_clone).await { + Ok(summary) => { + trace!( + "Successfully summarized conversation for session {}, summary length: {}", + session.id, + summary.len() + ); + let handler = llm_models::get_handler( + &config_manager + .get_config(&automation.bot_id, "llm-model", None) + .unwrap_or_default(), + ); + let filtered = handler.process_content(&summary); + format!("SUMMARY: {}", filtered) + } + Err(e) => { + error!( + "Failed to summarize conversation for session {}: {}", + session.id, e + ); + format!("SUMMARY: {}", compacted) + } + }; + trace!( + "Prompt compacted {}: {} messages", + session.id, + history.len() + ); + { + let mut session_manager = state.session_manager.lock().await; + session_manager.save_message(session.id, session.user_id, 9, &summarized, 1)?; + } + } + Ok(()) } diff --git a/src/bot/mod.rs b/src/bot/mod.rs index 5595ab06..b750a16d 100644 --- a/src/bot/mod.rs +++ b/src/bot/mod.rs @@ -416,7 +416,7 @@ impl BotOrchestrator { // Skip all messages before the most recent compacted message (type 9) if let Some(last_compacted_index) = history .iter() - .rposition(|(role, content)| role == "COMPACTED" || content.starts_with("SUMMARY:")) + .rposition(|(role, _content)| role == "compact") { history = history.split_off(last_compacted_index); } diff --git a/src/config/mod.rs b/src/config/mod.rs index 0f3d492a..c9b7d202 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -7,18 +7,9 @@ use uuid::Uuid; pub struct AppConfig { pub drive: DriveConfig, pub server: ServerConfig, - pub database: DatabaseConfig, pub site_path: String, } #[derive(Clone)] -pub struct DatabaseConfig { - pub username: String, - pub password: String, - pub server: String, - pub port: u32, - pub database: String, -} -#[derive(Clone)] pub struct DriveConfig { pub server: String, pub access_key: String, @@ -29,18 +20,6 @@ pub struct ServerConfig { pub host: String, pub port: u16, } -impl AppConfig { - pub fn database_url(&self) -> String { - format!( - "postgres://{}:{}@{}:{}/{}", - self.database.username, - self.database.password, - self.database.server, - self.database.port, - self.database.database - ) - } -} impl AppConfig { pub fn from_database(pool: &DbPool) -> Result { use crate::shared::models::schema::bot_configuration::dsl::*; @@ -77,7 +56,7 @@ impl AppConfig { .first::(&mut conn) .unwrap_or_else(|_| default.to_string()) }; - let get_u32 = |key: &str, default: u32| -> u32 { + let _get_u32 = |key: &str, default: u32| -> u32 { config_map .get(key) .and_then(|v| v.3.parse().ok()) @@ -95,28 +74,6 @@ impl AppConfig { .map(|v| v.3.to_lowercase() == "true") .unwrap_or(default) }; - let database = DatabaseConfig { - username: match std::env::var("TABLES_USERNAME") { - Ok(v) => v, - Err(_) => get_str("TABLES_USERNAME", "gbuser"), - }, - password: match std::env::var("TABLES_PASSWORD") { - Ok(v) => v, - Err(_) => get_str("TABLES_PASSWORD", ""), - }, - server: match std::env::var("TABLES_SERVER") { - Ok(v) => v, - Err(_) => get_str("TABLES_SERVER", "localhost"), - }, - port: std::env::var("TABLES_PORT") - .ok() - .and_then(|p| p.parse().ok()) - .unwrap_or_else(|| get_u32("TABLES_PORT", 5432)), - database: match std::env::var("TABLES_DATABASE") { - Ok(v) => v, - Err(_) => get_str("TABLES_DATABASE", "botserver"), - }, - }; let drive = DriveConfig { server: std::env::var("DRIVE_SERVER").unwrap(), @@ -129,7 +86,6 @@ impl AppConfig { host: get_str("SERVER_HOST", "127.0.0.1"), port: get_u16("SERVER_PORT", 8080), }, - database, site_path: { ConfigManager::new(pool.clone()) .get_config(&Uuid::nil(), "SITES_ROOT", Some("./botserver-stack/sites"))? @@ -138,16 +94,6 @@ impl AppConfig { }) } pub fn from_env() -> Result { - let database_url = std::env::var("DATABASE_URL").unwrap(); - let (db_username, db_password, db_server, db_port, db_name) = - crate::shared::utils::parse_database_url(&database_url); - let database = DatabaseConfig { - username: db_username, - password: db_password, - server: db_server, - port: db_port, - database: db_name, - }; let minio = DriveConfig { server: std::env::var("DRIVE_SERVER").unwrap(), access_key: std::env::var("DRIVE_ACCESSKEY").unwrap(), @@ -162,7 +108,6 @@ impl AppConfig { .and_then(|p| p.parse().ok()) .unwrap_or(8080), }, - database, site_path: { let pool = create_conn()?; ConfigManager::new(pool).get_config( diff --git a/src/main.rs b/src/main.rs index 10b64706..865e1b1f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -41,11 +41,11 @@ use crate::email::{ }; use crate::file::upload_file; use crate::meet::{voice_start, voice_stop}; -use crate::shared::utils::create_s3_operator; use crate::package_manager::InstallMode; use crate::session::{create_session, get_session_history, get_sessions, start_session}; use crate::shared::state::AppState; use crate::shared::utils::create_conn; +use crate::shared::utils::create_s3_operator; use crate::web_server::{bot_index, index, static_files}; #[derive(Debug, Clone)] pub enum BootstrapProgress { @@ -60,6 +60,13 @@ pub enum BootstrapProgress { } #[tokio::main] async fn main() -> std::io::Result<()> { + dotenv().ok(); + println!( + "Starting {} {}...", + std::env::var("PLATFORM_NAME").unwrap_or("General Bots".to_string()), + env!("CARGO_PKG_VERSION") + ); + use crate::llm::local::ensure_llama_servers_running; use botserver::config::ConfigManager; let args: Vec = std::env::args().collect(); @@ -149,26 +156,20 @@ async fn main() -> std::io::Result<()> { let mut bootstrap = BootstrapManager::new(install_mode.clone(), tenant.clone()).await; let env_path = std::env::current_dir().unwrap().join(".env"); let cfg = if env_path.exists() { - - - progress_tx_clone - .send(BootstrapProgress::StartingComponent( - "all services".to_string(), - )) - .ok(); - bootstrap.start_all().map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; - + progress_tx_clone + .send(BootstrapProgress::StartingComponent( + "all services".to_string(), + )) + .ok(); + bootstrap + .start_all() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + progress_tx_clone .send(BootstrapProgress::ConnectingDatabase) .ok(); match create_conn() { Ok(pool) => { - let mut conn = pool.get().map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::ConnectionRefused, - format!("Database connection failed: {}", e), - ) - })?; AppConfig::from_database(&pool) .unwrap_or_else(|_| AppConfig::from_env().expect("Failed to load config")) } @@ -177,12 +178,14 @@ async fn main() -> std::io::Result<()> { } else { bootstrap.bootstrap().await; - progress_tx_clone - .send(BootstrapProgress::StartingComponent( - "all services".to_string(), - )) - .ok(); - bootstrap.start_all().map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + progress_tx_clone + .send(BootstrapProgress::StartingComponent( + "all services".to_string(), + )) + .ok(); + bootstrap + .start_all() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; match create_conn() { Ok(pool) => AppConfig::from_database(&pool) @@ -190,7 +193,7 @@ async fn main() -> std::io::Result<()> { Err(_) => AppConfig::from_env().expect("Failed to load config from env"), } }; - + progress_tx_clone .send(BootstrapProgress::UploadingTemplates) .ok(); diff --git a/src/session/mod.rs b/src/session/mod.rs index 00b380f9..802733a2 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -288,6 +288,7 @@ impl SessionManager { 1 => "user".to_string(), 2 => "assistant".to_string(), 3 => "system".to_string(), + 9 => "compact".to_string(), _ => "unknown".to_string(), }; history.push((role_str, content)); diff --git a/src/ui_tree/file_tree.rs b/src/ui_tree/file_tree.rs index 27affa31..296e697e 100644 --- a/src/ui_tree/file_tree.rs +++ b/src/ui_tree/file_tree.rs @@ -5,8 +5,8 @@ use crate::shared::state::AppState; #[derive(Debug, Clone)] pub enum TreeNode { Bucket { name: String }, - Folder { bucket: String, path: String, name: String }, - File { bucket: String, path: String, name: String }, + Folder { bucket: String, path: String }, + File { bucket: String, path: String }, } pub struct FileTree { @@ -100,7 +100,6 @@ impl FileTree { self.items.push(("⬆️ .. (go back)".to_string(), TreeNode::Folder { bucket: bucket.to_string(), path: "..".to_string(), - name: "..".to_string(), })); if let Some(drive) = &self.app_state.drive { @@ -178,7 +177,6 @@ impl FileTree { self.items.push((display, TreeNode::Folder { bucket: bucket.to_string(), path: full_path, - name: folder_name, })); } @@ -201,7 +199,6 @@ impl FileTree { self.items.push((display, TreeNode::File { bucket: bucket.to_string(), path: full_path, - name, })); } } @@ -210,7 +207,6 @@ impl FileTree { self.items.push(("(empty folder)".to_string(), TreeNode::Folder { bucket: bucket.to_string(), path: String::new(), - name: String::new(), })); }