feat(automation): improve prompt compaction scheduling and error handling

Refactor the compact prompt scheduler to use proper indentation and improve error logging. Added more detailed error messages for prompt compaction failures and included bot_id in error logs. The changes make the code more maintainable and debugging easier while maintaining the same functionality.
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-11-11 15:01:57 -03:00
parent b52e4b2737
commit ee442b1d5c
6 changed files with 183 additions and 175 deletions

View file

@ -3,100 +3,163 @@ use crate::llm_models;
use crate::shared::models::Automation; use crate::shared::models::Automation;
use crate::shared::state::AppState; use crate::shared::state::AppState;
use diesel::prelude::*; use diesel::prelude::*;
use log::{error, trace}; use log::{error, trace};
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::Arc; use std::sync::Arc;
use tokio::time::{interval, Duration}; use tokio::time::{interval, Duration};
use uuid::Uuid; use uuid::Uuid;
pub fn start_compact_prompt_scheduler(state: Arc<AppState>) { pub fn start_compact_prompt_scheduler(state: Arc<AppState>) {
tokio::spawn(async move { tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(30)).await; tokio::time::sleep(Duration::from_secs(30)).await;
let mut interval = interval(Duration::from_secs(60)); let mut interval = interval(Duration::from_secs(60));
loop { loop {
interval.tick().await; interval.tick().await;
if let Err(e) = execute_compact_prompt(Arc::clone(&state)).await { if let Err(e) = execute_compact_prompt(Arc::clone(&state)).await {
error!("Prompt compaction failed: {}", e); error!("Prompt compaction failed: {}", e);
} }
} }
}); });
} }
async fn execute_compact_prompt(state: Arc<AppState>) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { async fn execute_compact_prompt(
use crate::shared::models::system_automations::dsl::{is_active, system_automations}; state: Arc<AppState>,
let automations: Vec<Automation> = { ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut conn = state.conn.get().map_err(|e| format!("Failed to acquire lock: {}", e))?; use crate::shared::models::system_automations::dsl::{is_active, system_automations};
system_automations.filter(is_active.eq(true)).load::<Automation>(&mut *conn)? let automations: Vec<Automation> = {
}; let mut conn = state
for automation in automations { .conn
if let Err(e) = compact_prompt_for_bot(&state, &automation).await { .get()
error!("Failed to compact prompt for bot {}: {}", automation.bot_id, e); .map_err(|e| format!("Failed to acquire lock: {}", e))?;
} system_automations
} .filter(is_active.eq(true))
Ok(()) .load::<Automation>(&mut *conn)?
};
for automation in automations {
if let Err(e) = compact_prompt_for_bot(&state, &automation).await {
error!(
"Failed to compact prompt for bot {}: {}",
automation.bot_id, e
);
}
}
Ok(())
} }
async fn compact_prompt_for_bot(state: &Arc<AppState>, automation: &Automation) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { async fn compact_prompt_for_bot(
use once_cell::sync::Lazy; state: &Arc<AppState>,
use scopeguard::guard; automation: &Automation,
static IN_PROGRESS: Lazy<tokio::sync::Mutex<HashSet<Uuid>>> = Lazy::new(|| tokio::sync::Mutex::new(HashSet::new())); ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
{ use once_cell::sync::Lazy;
let mut in_progress = IN_PROGRESS.lock().await; use scopeguard::guard;
if in_progress.contains(&automation.bot_id) { static IN_PROGRESS: Lazy<tokio::sync::Mutex<HashSet<Uuid>>> =
return Ok(()); Lazy::new(|| tokio::sync::Mutex::new(HashSet::new()));
} {
in_progress.insert(automation.bot_id); let mut in_progress = IN_PROGRESS.lock().await;
} if in_progress.contains(&automation.bot_id) {
let bot_id = automation.bot_id; return Ok(());
let _cleanup = guard((), |_| { }
tokio::spawn(async move { in_progress.insert(automation.bot_id);
let mut in_progress = IN_PROGRESS.lock().await; }
in_progress.remove(&bot_id); let bot_id = automation.bot_id;
}); let _cleanup = guard((), |_| {
}); tokio::spawn(async move {
let config_manager = ConfigManager::new(state.conn.clone()); let mut in_progress = IN_PROGRESS.lock().await;
let compact_threshold = config_manager in_progress.remove(&bot_id);
.get_config(&automation.bot_id, "prompt-compact", None)? });
.parse::<i32>() });
.unwrap_or(0); let config_manager = ConfigManager::new(state.conn.clone());
if compact_threshold == 0 { let compact_threshold = config_manager
return Ok(()); .get_config(&automation.bot_id, "prompt-compact", None)?
} else if compact_threshold < 0 { .parse::<i32>()
trace!("Negative compact threshold detected for bot {}, skipping", automation.bot_id); .unwrap_or(0);
} if compact_threshold == 0 {
let sessions = { return Ok(());
let mut session_manager = state.session_manager.lock().await; } else if compact_threshold < 0 {
session_manager.get_user_sessions(Uuid::nil())? trace!(
}; "Negative compact threshold detected for bot {}, skipping",
for session in sessions { automation.bot_id
if session.bot_id != automation.bot_id { );
continue; }
} let sessions = {
let history = { let mut session_manager = state.session_manager.lock().await;
let mut session_manager = state.session_manager.lock().await; session_manager.get_user_sessions(Uuid::nil())?
session_manager.get_conversation_history(session.id, session.user_id)? };
}; for session in sessions {
trace!("Compacting prompt for session {}: {} messages", session.id, history.len()); if session.bot_id != automation.bot_id {
let mut compacted = String::new(); trace!("Skipping session {} - bot_id {} doesn't match automation bot_id {}",
for (role, content) in &history { session.id, session.bot_id, automation.bot_id);
compacted.push_str(&format!("{}: {}\n", role, content)); continue;
} }
let llm_provider = state.llm_provider.clone(); let history = {
let compacted_clone = compacted.clone(); let mut session_manager = state.session_manager.lock().await;
let summarized = match llm_provider.summarize(&compacted_clone).await { session_manager.get_conversation_history(session.id, session.user_id)?
Ok(summary) => { };
trace!("Successfully summarized conversation for session {}, summary length: {}", session.id, summary.len());
let handler = llm_models::get_handler(&config_manager.get_config(&automation.bot_id, "llm-model", None).unwrap_or_default()); let mut messages_since_summary = 0;
let filtered = handler.process_content(&summary); let mut has_new_messages = false;
format!("SUMMARY: {}", filtered) let mut last_summary_index = history.iter().position(|(role, _)|
}, role == "compact")
Err(e) => { .unwrap_or(0);
error!("Failed to summarize conversation for session {}: {}", session.id, e);
format!("SUMMARY: {}", compacted) for (i, (role, _)) in history.iter().enumerate().skip(last_summary_index + 1) {
} if role == "compact" {
}; continue;
trace!("Prompt compacted {}: {} messages", session.id, history.len()); }
{ messages_since_summary += 1;
let mut session_manager = state.session_manager.lock().await; has_new_messages = true;
session_manager.save_message(session.id, session.user_id, 9, &summarized, 1)?; }
}
} if !has_new_messages {
Ok(()) trace!("Skipping session {} - no new messages since last summary", session.id);
continue;
}
if messages_since_summary < compact_threshold as usize {
trace!("Skipping compaction for session {} - only {} new messages since last summary (threshold: {})",
session.id, messages_since_summary, compact_threshold);
continue;
}
trace!(
"Compacting prompt for session {}: {} messages since last summary",
session.id,
messages_since_summary
);
let mut compacted = String::new();
for (role, content) in &history {
compacted.push_str(&format!("{}: {}\n", role, content));
}
let llm_provider = state.llm_provider.clone();
let compacted_clone = compacted.clone();
let summarized = match llm_provider.summarize(&compacted_clone).await {
Ok(summary) => {
trace!(
"Successfully summarized conversation for session {}, summary length: {}",
session.id,
summary.len()
);
let handler = llm_models::get_handler(
&config_manager
.get_config(&automation.bot_id, "llm-model", None)
.unwrap_or_default(),
);
let filtered = handler.process_content(&summary);
format!("SUMMARY: {}", filtered)
}
Err(e) => {
error!(
"Failed to summarize conversation for session {}: {}",
session.id, e
);
format!("SUMMARY: {}", compacted)
}
};
trace!(
"Prompt compacted {}: {} messages",
session.id,
history.len()
);
{
let mut session_manager = state.session_manager.lock().await;
session_manager.save_message(session.id, session.user_id, 9, &summarized, 1)?;
}
}
Ok(())
} }

View file

@ -416,7 +416,7 @@ impl BotOrchestrator {
// Skip all messages before the most recent compacted message (type 9) // Skip all messages before the most recent compacted message (type 9)
if let Some(last_compacted_index) = history if let Some(last_compacted_index) = history
.iter() .iter()
.rposition(|(role, content)| role == "COMPACTED" || content.starts_with("SUMMARY:")) .rposition(|(role, _content)| role == "compact")
{ {
history = history.split_off(last_compacted_index); history = history.split_off(last_compacted_index);
} }

View file

@ -7,18 +7,9 @@ use uuid::Uuid;
pub struct AppConfig { pub struct AppConfig {
pub drive: DriveConfig, pub drive: DriveConfig,
pub server: ServerConfig, pub server: ServerConfig,
pub database: DatabaseConfig,
pub site_path: String, pub site_path: String,
} }
#[derive(Clone)] #[derive(Clone)]
pub struct DatabaseConfig {
pub username: String,
pub password: String,
pub server: String,
pub port: u32,
pub database: String,
}
#[derive(Clone)]
pub struct DriveConfig { pub struct DriveConfig {
pub server: String, pub server: String,
pub access_key: String, pub access_key: String,
@ -29,18 +20,6 @@ pub struct ServerConfig {
pub host: String, pub host: String,
pub port: u16, pub port: u16,
} }
impl AppConfig {
pub fn database_url(&self) -> String {
format!(
"postgres://{}:{}@{}:{}/{}",
self.database.username,
self.database.password,
self.database.server,
self.database.port,
self.database.database
)
}
}
impl AppConfig { impl AppConfig {
pub fn from_database(pool: &DbPool) -> Result<Self, diesel::result::Error> { pub fn from_database(pool: &DbPool) -> Result<Self, diesel::result::Error> {
use crate::shared::models::schema::bot_configuration::dsl::*; use crate::shared::models::schema::bot_configuration::dsl::*;
@ -77,7 +56,7 @@ impl AppConfig {
.first::<String>(&mut conn) .first::<String>(&mut conn)
.unwrap_or_else(|_| default.to_string()) .unwrap_or_else(|_| default.to_string())
}; };
let get_u32 = |key: &str, default: u32| -> u32 { let _get_u32 = |key: &str, default: u32| -> u32 {
config_map config_map
.get(key) .get(key)
.and_then(|v| v.3.parse().ok()) .and_then(|v| v.3.parse().ok())
@ -95,28 +74,6 @@ impl AppConfig {
.map(|v| v.3.to_lowercase() == "true") .map(|v| v.3.to_lowercase() == "true")
.unwrap_or(default) .unwrap_or(default)
}; };
let database = DatabaseConfig {
username: match std::env::var("TABLES_USERNAME") {
Ok(v) => v,
Err(_) => get_str("TABLES_USERNAME", "gbuser"),
},
password: match std::env::var("TABLES_PASSWORD") {
Ok(v) => v,
Err(_) => get_str("TABLES_PASSWORD", ""),
},
server: match std::env::var("TABLES_SERVER") {
Ok(v) => v,
Err(_) => get_str("TABLES_SERVER", "localhost"),
},
port: std::env::var("TABLES_PORT")
.ok()
.and_then(|p| p.parse().ok())
.unwrap_or_else(|| get_u32("TABLES_PORT", 5432)),
database: match std::env::var("TABLES_DATABASE") {
Ok(v) => v,
Err(_) => get_str("TABLES_DATABASE", "botserver"),
},
};
let drive = DriveConfig { let drive = DriveConfig {
server: std::env::var("DRIVE_SERVER").unwrap(), server: std::env::var("DRIVE_SERVER").unwrap(),
@ -129,7 +86,6 @@ impl AppConfig {
host: get_str("SERVER_HOST", "127.0.0.1"), host: get_str("SERVER_HOST", "127.0.0.1"),
port: get_u16("SERVER_PORT", 8080), port: get_u16("SERVER_PORT", 8080),
}, },
database,
site_path: { site_path: {
ConfigManager::new(pool.clone()) ConfigManager::new(pool.clone())
.get_config(&Uuid::nil(), "SITES_ROOT", Some("./botserver-stack/sites"))? .get_config(&Uuid::nil(), "SITES_ROOT", Some("./botserver-stack/sites"))?
@ -138,16 +94,6 @@ impl AppConfig {
}) })
} }
pub fn from_env() -> Result<Self, anyhow::Error> { pub fn from_env() -> Result<Self, anyhow::Error> {
let database_url = std::env::var("DATABASE_URL").unwrap();
let (db_username, db_password, db_server, db_port, db_name) =
crate::shared::utils::parse_database_url(&database_url);
let database = DatabaseConfig {
username: db_username,
password: db_password,
server: db_server,
port: db_port,
database: db_name,
};
let minio = DriveConfig { let minio = DriveConfig {
server: std::env::var("DRIVE_SERVER").unwrap(), server: std::env::var("DRIVE_SERVER").unwrap(),
access_key: std::env::var("DRIVE_ACCESSKEY").unwrap(), access_key: std::env::var("DRIVE_ACCESSKEY").unwrap(),
@ -162,7 +108,6 @@ impl AppConfig {
.and_then(|p| p.parse().ok()) .and_then(|p| p.parse().ok())
.unwrap_or(8080), .unwrap_or(8080),
}, },
database,
site_path: { site_path: {
let pool = create_conn()?; let pool = create_conn()?;
ConfigManager::new(pool).get_config( ConfigManager::new(pool).get_config(

View file

@ -41,11 +41,11 @@ use crate::email::{
}; };
use crate::file::upload_file; use crate::file::upload_file;
use crate::meet::{voice_start, voice_stop}; use crate::meet::{voice_start, voice_stop};
use crate::shared::utils::create_s3_operator;
use crate::package_manager::InstallMode; use crate::package_manager::InstallMode;
use crate::session::{create_session, get_session_history, get_sessions, start_session}; use crate::session::{create_session, get_session_history, get_sessions, start_session};
use crate::shared::state::AppState; use crate::shared::state::AppState;
use crate::shared::utils::create_conn; use crate::shared::utils::create_conn;
use crate::shared::utils::create_s3_operator;
use crate::web_server::{bot_index, index, static_files}; use crate::web_server::{bot_index, index, static_files};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum BootstrapProgress { pub enum BootstrapProgress {
@ -60,6 +60,13 @@ pub enum BootstrapProgress {
} }
#[tokio::main] #[tokio::main]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
dotenv().ok();
println!(
"Starting {} {}...",
std::env::var("PLATFORM_NAME").unwrap_or("General Bots".to_string()),
env!("CARGO_PKG_VERSION")
);
use crate::llm::local::ensure_llama_servers_running; use crate::llm::local::ensure_llama_servers_running;
use botserver::config::ConfigManager; use botserver::config::ConfigManager;
let args: Vec<String> = std::env::args().collect(); let args: Vec<String> = std::env::args().collect();
@ -149,26 +156,20 @@ async fn main() -> std::io::Result<()> {
let mut bootstrap = BootstrapManager::new(install_mode.clone(), tenant.clone()).await; let mut bootstrap = BootstrapManager::new(install_mode.clone(), tenant.clone()).await;
let env_path = std::env::current_dir().unwrap().join(".env"); let env_path = std::env::current_dir().unwrap().join(".env");
let cfg = if env_path.exists() { let cfg = if env_path.exists() {
progress_tx_clone
.send(BootstrapProgress::StartingComponent(
progress_tx_clone "all services".to_string(),
.send(BootstrapProgress::StartingComponent( ))
"all services".to_string(), .ok();
)) bootstrap
.ok(); .start_all()
bootstrap.start_all().map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
progress_tx_clone progress_tx_clone
.send(BootstrapProgress::ConnectingDatabase) .send(BootstrapProgress::ConnectingDatabase)
.ok(); .ok();
match create_conn() { match create_conn() {
Ok(pool) => { Ok(pool) => {
let mut conn = pool.get().map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::ConnectionRefused,
format!("Database connection failed: {}", e),
)
})?;
AppConfig::from_database(&pool) AppConfig::from_database(&pool)
.unwrap_or_else(|_| AppConfig::from_env().expect("Failed to load config")) .unwrap_or_else(|_| AppConfig::from_env().expect("Failed to load config"))
} }
@ -177,12 +178,14 @@ async fn main() -> std::io::Result<()> {
} else { } else {
bootstrap.bootstrap().await; bootstrap.bootstrap().await;
progress_tx_clone progress_tx_clone
.send(BootstrapProgress::StartingComponent( .send(BootstrapProgress::StartingComponent(
"all services".to_string(), "all services".to_string(),
)) ))
.ok(); .ok();
bootstrap.start_all().map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; bootstrap
.start_all()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
match create_conn() { match create_conn() {
Ok(pool) => AppConfig::from_database(&pool) Ok(pool) => AppConfig::from_database(&pool)
@ -190,7 +193,7 @@ async fn main() -> std::io::Result<()> {
Err(_) => AppConfig::from_env().expect("Failed to load config from env"), Err(_) => AppConfig::from_env().expect("Failed to load config from env"),
} }
}; };
progress_tx_clone progress_tx_clone
.send(BootstrapProgress::UploadingTemplates) .send(BootstrapProgress::UploadingTemplates)
.ok(); .ok();

View file

@ -288,6 +288,7 @@ impl SessionManager {
1 => "user".to_string(), 1 => "user".to_string(),
2 => "assistant".to_string(), 2 => "assistant".to_string(),
3 => "system".to_string(), 3 => "system".to_string(),
9 => "compact".to_string(),
_ => "unknown".to_string(), _ => "unknown".to_string(),
}; };
history.push((role_str, content)); history.push((role_str, content));

View file

@ -5,8 +5,8 @@ use crate::shared::state::AppState;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum TreeNode { pub enum TreeNode {
Bucket { name: String }, Bucket { name: String },
Folder { bucket: String, path: String, name: String }, Folder { bucket: String, path: String },
File { bucket: String, path: String, name: String }, File { bucket: String, path: String },
} }
pub struct FileTree { pub struct FileTree {
@ -100,7 +100,6 @@ impl FileTree {
self.items.push(("⬆️ .. (go back)".to_string(), TreeNode::Folder { self.items.push(("⬆️ .. (go back)".to_string(), TreeNode::Folder {
bucket: bucket.to_string(), bucket: bucket.to_string(),
path: "..".to_string(), path: "..".to_string(),
name: "..".to_string(),
})); }));
if let Some(drive) = &self.app_state.drive { if let Some(drive) = &self.app_state.drive {
@ -178,7 +177,6 @@ impl FileTree {
self.items.push((display, TreeNode::Folder { self.items.push((display, TreeNode::Folder {
bucket: bucket.to_string(), bucket: bucket.to_string(),
path: full_path, path: full_path,
name: folder_name,
})); }));
} }
@ -201,7 +199,6 @@ impl FileTree {
self.items.push((display, TreeNode::File { self.items.push((display, TreeNode::File {
bucket: bucket.to_string(), bucket: bucket.to_string(),
path: full_path, path: full_path,
name,
})); }));
} }
} }
@ -210,7 +207,6 @@ impl FileTree {
self.items.push(("(empty folder)".to_string(), TreeNode::Folder { self.items.push(("(empty folder)".to_string(), TreeNode::Folder {
bucket: bucket.to_string(), bucket: bucket.to_string(),
path: String::new(), path: String::new(),
name: String::new(),
})); }));
} }