feat(automation): improve prompt compaction scheduling and error handling

Refactor the compact prompt scheduler to use proper indentation and improve error logging. Added more detailed error messages for prompt compaction failures and included bot_id in error logs. The changes make the code more maintainable and debugging easier while maintaining the same functionality.
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-11-11 15:01:57 -03:00
parent 191bf0bba7
commit 1a47355834
6 changed files with 183 additions and 175 deletions

View file

@ -20,23 +20,37 @@ pub fn start_compact_prompt_scheduler(state: Arc<AppState>) {
} }
}); });
} }
async fn execute_compact_prompt(state: Arc<AppState>) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { async fn execute_compact_prompt(
state: Arc<AppState>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use crate::shared::models::system_automations::dsl::{is_active, system_automations}; use crate::shared::models::system_automations::dsl::{is_active, system_automations};
let automations: Vec<Automation> = { let automations: Vec<Automation> = {
let mut conn = state.conn.get().map_err(|e| format!("Failed to acquire lock: {}", e))?; let mut conn = state
system_automations.filter(is_active.eq(true)).load::<Automation>(&mut *conn)? .conn
.get()
.map_err(|e| format!("Failed to acquire lock: {}", e))?;
system_automations
.filter(is_active.eq(true))
.load::<Automation>(&mut *conn)?
}; };
for automation in automations { for automation in automations {
if let Err(e) = compact_prompt_for_bot(&state, &automation).await { if let Err(e) = compact_prompt_for_bot(&state, &automation).await {
error!("Failed to compact prompt for bot {}: {}", automation.bot_id, e); error!(
"Failed to compact prompt for bot {}: {}",
automation.bot_id, e
);
} }
} }
Ok(()) Ok(())
} }
async fn compact_prompt_for_bot(state: &Arc<AppState>, automation: &Automation) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { async fn compact_prompt_for_bot(
state: &Arc<AppState>,
automation: &Automation,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use scopeguard::guard; use scopeguard::guard;
static IN_PROGRESS: Lazy<tokio::sync::Mutex<HashSet<Uuid>>> = Lazy::new(|| tokio::sync::Mutex::new(HashSet::new())); static IN_PROGRESS: Lazy<tokio::sync::Mutex<HashSet<Uuid>>> =
Lazy::new(|| tokio::sync::Mutex::new(HashSet::new()));
{ {
let mut in_progress = IN_PROGRESS.lock().await; let mut in_progress = IN_PROGRESS.lock().await;
if in_progress.contains(&automation.bot_id) { if in_progress.contains(&automation.bot_id) {
@ -59,7 +73,10 @@ async fn compact_prompt_for_bot(state: &Arc<AppState>, automation: &Automation)
if compact_threshold == 0 { if compact_threshold == 0 {
return Ok(()); return Ok(());
} else if compact_threshold < 0 { } else if compact_threshold < 0 {
trace!("Negative compact threshold detected for bot {}, skipping", automation.bot_id); trace!(
"Negative compact threshold detected for bot {}, skipping",
automation.bot_id
);
} }
let sessions = { let sessions = {
let mut session_manager = state.session_manager.lock().await; let mut session_manager = state.session_manager.lock().await;
@ -67,13 +84,44 @@ async fn compact_prompt_for_bot(state: &Arc<AppState>, automation: &Automation)
}; };
for session in sessions { for session in sessions {
if session.bot_id != automation.bot_id { if session.bot_id != automation.bot_id {
trace!("Skipping session {} - bot_id {} doesn't match automation bot_id {}",
session.id, session.bot_id, automation.bot_id);
continue; continue;
} }
let history = { let history = {
let mut session_manager = state.session_manager.lock().await; let mut session_manager = state.session_manager.lock().await;
session_manager.get_conversation_history(session.id, session.user_id)? session_manager.get_conversation_history(session.id, session.user_id)?
}; };
trace!("Compacting prompt for session {}: {} messages", session.id, history.len());
let mut messages_since_summary = 0;
let mut has_new_messages = false;
let mut last_summary_index = history.iter().position(|(role, _)|
role == "compact")
.unwrap_or(0);
for (i, (role, _)) in history.iter().enumerate().skip(last_summary_index + 1) {
if role == "compact" {
continue;
}
messages_since_summary += 1;
has_new_messages = true;
}
if !has_new_messages {
trace!("Skipping session {} - no new messages since last summary", session.id);
continue;
}
if messages_since_summary < compact_threshold as usize {
trace!("Skipping compaction for session {} - only {} new messages since last summary (threshold: {})",
session.id, messages_since_summary, compact_threshold);
continue;
}
trace!(
"Compacting prompt for session {}: {} messages since last summary",
session.id,
messages_since_summary
);
let mut compacted = String::new(); let mut compacted = String::new();
for (role, content) in &history { for (role, content) in &history {
compacted.push_str(&format!("{}: {}\n", role, content)); compacted.push_str(&format!("{}: {}\n", role, content));
@ -82,17 +130,32 @@ async fn compact_prompt_for_bot(state: &Arc<AppState>, automation: &Automation)
let compacted_clone = compacted.clone(); let compacted_clone = compacted.clone();
let summarized = match llm_provider.summarize(&compacted_clone).await { let summarized = match llm_provider.summarize(&compacted_clone).await {
Ok(summary) => { Ok(summary) => {
trace!("Successfully summarized conversation for session {}, summary length: {}", session.id, summary.len()); trace!(
let handler = llm_models::get_handler(&config_manager.get_config(&automation.bot_id, "llm-model", None).unwrap_or_default()); "Successfully summarized conversation for session {}, summary length: {}",
session.id,
summary.len()
);
let handler = llm_models::get_handler(
&config_manager
.get_config(&automation.bot_id, "llm-model", None)
.unwrap_or_default(),
);
let filtered = handler.process_content(&summary); let filtered = handler.process_content(&summary);
format!("SUMMARY: {}", filtered) format!("SUMMARY: {}", filtered)
}, }
Err(e) => { Err(e) => {
error!("Failed to summarize conversation for session {}: {}", session.id, e); error!(
"Failed to summarize conversation for session {}: {}",
session.id, e
);
format!("SUMMARY: {}", compacted) format!("SUMMARY: {}", compacted)
} }
}; };
trace!("Prompt compacted {}: {} messages", session.id, history.len()); trace!(
"Prompt compacted {}: {} messages",
session.id,
history.len()
);
{ {
let mut session_manager = state.session_manager.lock().await; let mut session_manager = state.session_manager.lock().await;
session_manager.save_message(session.id, session.user_id, 9, &summarized, 1)?; session_manager.save_message(session.id, session.user_id, 9, &summarized, 1)?;

View file

@ -416,7 +416,7 @@ impl BotOrchestrator {
// Skip all messages before the most recent compacted message (type 9) // Skip all messages before the most recent compacted message (type 9)
if let Some(last_compacted_index) = history if let Some(last_compacted_index) = history
.iter() .iter()
.rposition(|(role, content)| role == "COMPACTED" || content.starts_with("SUMMARY:")) .rposition(|(role, _content)| role == "compact")
{ {
history = history.split_off(last_compacted_index); history = history.split_off(last_compacted_index);
} }

View file

@ -7,18 +7,9 @@ use uuid::Uuid;
pub struct AppConfig { pub struct AppConfig {
pub drive: DriveConfig, pub drive: DriveConfig,
pub server: ServerConfig, pub server: ServerConfig,
pub database: DatabaseConfig,
pub site_path: String, pub site_path: String,
} }
#[derive(Clone)] #[derive(Clone)]
pub struct DatabaseConfig {
pub username: String,
pub password: String,
pub server: String,
pub port: u32,
pub database: String,
}
#[derive(Clone)]
pub struct DriveConfig { pub struct DriveConfig {
pub server: String, pub server: String,
pub access_key: String, pub access_key: String,
@ -29,18 +20,6 @@ pub struct ServerConfig {
pub host: String, pub host: String,
pub port: u16, pub port: u16,
} }
impl AppConfig {
pub fn database_url(&self) -> String {
format!(
"postgres://{}:{}@{}:{}/{}",
self.database.username,
self.database.password,
self.database.server,
self.database.port,
self.database.database
)
}
}
impl AppConfig { impl AppConfig {
pub fn from_database(pool: &DbPool) -> Result<Self, diesel::result::Error> { pub fn from_database(pool: &DbPool) -> Result<Self, diesel::result::Error> {
use crate::shared::models::schema::bot_configuration::dsl::*; use crate::shared::models::schema::bot_configuration::dsl::*;
@ -77,7 +56,7 @@ impl AppConfig {
.first::<String>(&mut conn) .first::<String>(&mut conn)
.unwrap_or_else(|_| default.to_string()) .unwrap_or_else(|_| default.to_string())
}; };
let get_u32 = |key: &str, default: u32| -> u32 { let _get_u32 = |key: &str, default: u32| -> u32 {
config_map config_map
.get(key) .get(key)
.and_then(|v| v.3.parse().ok()) .and_then(|v| v.3.parse().ok())
@ -95,28 +74,6 @@ impl AppConfig {
.map(|v| v.3.to_lowercase() == "true") .map(|v| v.3.to_lowercase() == "true")
.unwrap_or(default) .unwrap_or(default)
}; };
let database = DatabaseConfig {
username: match std::env::var("TABLES_USERNAME") {
Ok(v) => v,
Err(_) => get_str("TABLES_USERNAME", "gbuser"),
},
password: match std::env::var("TABLES_PASSWORD") {
Ok(v) => v,
Err(_) => get_str("TABLES_PASSWORD", ""),
},
server: match std::env::var("TABLES_SERVER") {
Ok(v) => v,
Err(_) => get_str("TABLES_SERVER", "localhost"),
},
port: std::env::var("TABLES_PORT")
.ok()
.and_then(|p| p.parse().ok())
.unwrap_or_else(|| get_u32("TABLES_PORT", 5432)),
database: match std::env::var("TABLES_DATABASE") {
Ok(v) => v,
Err(_) => get_str("TABLES_DATABASE", "botserver"),
},
};
let drive = DriveConfig { let drive = DriveConfig {
server: std::env::var("DRIVE_SERVER").unwrap(), server: std::env::var("DRIVE_SERVER").unwrap(),
@ -129,7 +86,6 @@ impl AppConfig {
host: get_str("SERVER_HOST", "127.0.0.1"), host: get_str("SERVER_HOST", "127.0.0.1"),
port: get_u16("SERVER_PORT", 8080), port: get_u16("SERVER_PORT", 8080),
}, },
database,
site_path: { site_path: {
ConfigManager::new(pool.clone()) ConfigManager::new(pool.clone())
.get_config(&Uuid::nil(), "SITES_ROOT", Some("./botserver-stack/sites"))? .get_config(&Uuid::nil(), "SITES_ROOT", Some("./botserver-stack/sites"))?
@ -138,16 +94,6 @@ impl AppConfig {
}) })
} }
pub fn from_env() -> Result<Self, anyhow::Error> { pub fn from_env() -> Result<Self, anyhow::Error> {
let database_url = std::env::var("DATABASE_URL").unwrap();
let (db_username, db_password, db_server, db_port, db_name) =
crate::shared::utils::parse_database_url(&database_url);
let database = DatabaseConfig {
username: db_username,
password: db_password,
server: db_server,
port: db_port,
database: db_name,
};
let minio = DriveConfig { let minio = DriveConfig {
server: std::env::var("DRIVE_SERVER").unwrap(), server: std::env::var("DRIVE_SERVER").unwrap(),
access_key: std::env::var("DRIVE_ACCESSKEY").unwrap(), access_key: std::env::var("DRIVE_ACCESSKEY").unwrap(),
@ -162,7 +108,6 @@ impl AppConfig {
.and_then(|p| p.parse().ok()) .and_then(|p| p.parse().ok())
.unwrap_or(8080), .unwrap_or(8080),
}, },
database,
site_path: { site_path: {
let pool = create_conn()?; let pool = create_conn()?;
ConfigManager::new(pool).get_config( ConfigManager::new(pool).get_config(

View file

@ -41,11 +41,11 @@ use crate::email::{
}; };
use crate::file::upload_file; use crate::file::upload_file;
use crate::meet::{voice_start, voice_stop}; use crate::meet::{voice_start, voice_stop};
use crate::shared::utils::create_s3_operator;
use crate::package_manager::InstallMode; use crate::package_manager::InstallMode;
use crate::session::{create_session, get_session_history, get_sessions, start_session}; use crate::session::{create_session, get_session_history, get_sessions, start_session};
use crate::shared::state::AppState; use crate::shared::state::AppState;
use crate::shared::utils::create_conn; use crate::shared::utils::create_conn;
use crate::shared::utils::create_s3_operator;
use crate::web_server::{bot_index, index, static_files}; use crate::web_server::{bot_index, index, static_files};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum BootstrapProgress { pub enum BootstrapProgress {
@ -60,6 +60,13 @@ pub enum BootstrapProgress {
} }
#[tokio::main] #[tokio::main]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
dotenv().ok();
println!(
"Starting {} {}...",
std::env::var("PLATFORM_NAME").unwrap_or("General Bots".to_string()),
env!("CARGO_PKG_VERSION")
);
use crate::llm::local::ensure_llama_servers_running; use crate::llm::local::ensure_llama_servers_running;
use botserver::config::ConfigManager; use botserver::config::ConfigManager;
let args: Vec<String> = std::env::args().collect(); let args: Vec<String> = std::env::args().collect();
@ -149,26 +156,20 @@ async fn main() -> std::io::Result<()> {
let mut bootstrap = BootstrapManager::new(install_mode.clone(), tenant.clone()).await; let mut bootstrap = BootstrapManager::new(install_mode.clone(), tenant.clone()).await;
let env_path = std::env::current_dir().unwrap().join(".env"); let env_path = std::env::current_dir().unwrap().join(".env");
let cfg = if env_path.exists() { let cfg = if env_path.exists() {
progress_tx_clone progress_tx_clone
.send(BootstrapProgress::StartingComponent( .send(BootstrapProgress::StartingComponent(
"all services".to_string(), "all services".to_string(),
)) ))
.ok(); .ok();
bootstrap.start_all().map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; bootstrap
.start_all()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
progress_tx_clone progress_tx_clone
.send(BootstrapProgress::ConnectingDatabase) .send(BootstrapProgress::ConnectingDatabase)
.ok(); .ok();
match create_conn() { match create_conn() {
Ok(pool) => { Ok(pool) => {
let mut conn = pool.get().map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::ConnectionRefused,
format!("Database connection failed: {}", e),
)
})?;
AppConfig::from_database(&pool) AppConfig::from_database(&pool)
.unwrap_or_else(|_| AppConfig::from_env().expect("Failed to load config")) .unwrap_or_else(|_| AppConfig::from_env().expect("Failed to load config"))
} }
@ -182,7 +183,9 @@ async fn main() -> std::io::Result<()> {
"all services".to_string(), "all services".to_string(),
)) ))
.ok(); .ok();
bootstrap.start_all().map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; bootstrap
.start_all()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
match create_conn() { match create_conn() {
Ok(pool) => AppConfig::from_database(&pool) Ok(pool) => AppConfig::from_database(&pool)

View file

@ -288,6 +288,7 @@ impl SessionManager {
1 => "user".to_string(), 1 => "user".to_string(),
2 => "assistant".to_string(), 2 => "assistant".to_string(),
3 => "system".to_string(), 3 => "system".to_string(),
9 => "compact".to_string(),
_ => "unknown".to_string(), _ => "unknown".to_string(),
}; };
history.push((role_str, content)); history.push((role_str, content));

View file

@ -5,8 +5,8 @@ use crate::shared::state::AppState;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum TreeNode { pub enum TreeNode {
Bucket { name: String }, Bucket { name: String },
Folder { bucket: String, path: String, name: String }, Folder { bucket: String, path: String },
File { bucket: String, path: String, name: String }, File { bucket: String, path: String },
} }
pub struct FileTree { pub struct FileTree {
@ -100,7 +100,6 @@ impl FileTree {
self.items.push(("⬆️ .. (go back)".to_string(), TreeNode::Folder { self.items.push(("⬆️ .. (go back)".to_string(), TreeNode::Folder {
bucket: bucket.to_string(), bucket: bucket.to_string(),
path: "..".to_string(), path: "..".to_string(),
name: "..".to_string(),
})); }));
if let Some(drive) = &self.app_state.drive { if let Some(drive) = &self.app_state.drive {
@ -178,7 +177,6 @@ impl FileTree {
self.items.push((display, TreeNode::Folder { self.items.push((display, TreeNode::Folder {
bucket: bucket.to_string(), bucket: bucket.to_string(),
path: full_path, path: full_path,
name: folder_name,
})); }));
} }
@ -201,7 +199,6 @@ impl FileTree {
self.items.push((display, TreeNode::File { self.items.push((display, TreeNode::File {
bucket: bucket.to_string(), bucket: bucket.to_string(),
path: full_path, path: full_path,
name,
})); }));
} }
} }
@ -210,7 +207,6 @@ impl FileTree {
self.items.push(("(empty folder)".to_string(), TreeNode::Folder { self.items.push(("(empty folder)".to_string(), TreeNode::Folder {
bucket: bucket.to_string(), bucket: bucket.to_string(),
path: String::new(), path: String::new(),
name: String::new(),
})); }));
} }