From 1a4b56e312e5eb35ca301a3bb05931ada058afd6 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Thu, 6 Nov 2025 18:02:04 -0300 Subject: [PATCH] feat(automation): add compaction lock and improve prompt summarization Added `once_cell` and `scopeguard` dependencies to implement thread-safe compaction lock mechanism. Modified `compact_prompt_for_bot` to: - Prevent concurrent compaction for the same bot using a global lock - Add proper tracing and error handling - Improve summarization with content filtering - Clean up locks automatically using scopeguard - Remove redundant threshold check and compact entire history The changes ensure thread safety during prompt compaction and provide better observability through tracing. --- Cargo.lock | 2 + Cargo.toml | 2 + src/automation/compact_prompt.rs | 66 +++++++++++++++++++++++++++----- src/session/mod.rs | 8 ++++ 4 files changed, 69 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 309de8df..f2bdbc2b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1125,6 +1125,7 @@ dependencies = [ "mockito", "native-tls", "num-format", + "once_cell", "pdf-extract", "qdrant-client", "rand 0.9.2", @@ -1132,6 +1133,7 @@ dependencies = [ "regex", "reqwest", "rhai", + "scopeguard", "serde", "serde_json", "sha2", diff --git a/Cargo.toml b/Cargo.toml index 3c75c06b..80754733 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,8 @@ email = ["imap"] desktop = [] [dependencies] +scopeguard = "1.2.0" +once_cell = "1.18.0" actix-cors = "0.7" actix-multipart = "0.7" actix-web = "4.9" diff --git a/src/automation/compact_prompt.rs b/src/automation/compact_prompt.rs index 51b0441f..ee8cd54b 100644 --- a/src/automation/compact_prompt.rs +++ b/src/automation/compact_prompt.rs @@ -1,8 +1,10 @@ use crate::config::ConfigManager; +use crate::llm_models; use crate::shared::models::Automation; use crate::shared::state::AppState; use diesel::prelude::*; -use log::{error, info}; +use log::{error, info, trace}; +use std::collections::HashSet; use std::sync::Arc; use tokio::time::{interval, Duration}; use uuid::Uuid; @@ -51,6 +53,32 @@ async fn compact_prompt_for_bot( state: &Arc, automation: &Automation, ) -> Result<(), Box> { + // Skip if already compacting this bot + use once_cell::sync::Lazy; + use scopeguard::guard; + static IN_PROGRESS: Lazy>> = Lazy::new(|| { + tokio::sync::Mutex::new(HashSet::new()) + }); + + { + let mut in_progress = IN_PROGRESS.lock().await; + if in_progress.contains(&automation.bot_id) { + trace!("Skipping compaction for bot {} - already in progress", automation.bot_id); + return Ok(()); + } + in_progress.insert(automation.bot_id); + } + + // Ensure cleanup happens when function exits + let bot_id = automation.bot_id; + let _cleanup = guard((), |_| { + tokio::spawn(async move { + let mut in_progress = IN_PROGRESS.lock().await; + in_progress.remove(&bot_id); + trace!("Released compaction lock for bot {}", bot_id); + }); + }); + info!("Executing prompt compaction for bot: {}", automation.bot_id); let config_manager = ConfigManager::new(Arc::clone(&state.conn)); @@ -80,15 +108,15 @@ async fn compact_prompt_for_bot( session_manager.get_conversation_history(session.id, session.user_id)? }; - if history.len() > compact_threshold { info!( "Compacting prompt for session {}: {} messages", session.id, history.len() ); + // Compact entire conversation history when threshold is reached let mut compacted = String::new(); - for (role, content) in &history[..history.len() - compact_threshold] { + for (role, content) in &history { compacted.push_str(&format!("{}: {}\n", role, content)); } @@ -96,11 +124,26 @@ async fn compact_prompt_for_bot( let llm_provider = state.llm_provider.clone(); let compacted_clone = compacted.clone(); - // Run LLM summarization - let summarized = match llm_provider.generate(&compacted_clone, &serde_json::Value::Null).await { - Ok(summary) => format!("SUMMARY: {}", summary), + // Run LLM summarization with proper tracing and filtering + trace!("Starting summarization for session {}", session.id); + let summarized = match llm_provider.summarize(&compacted_clone).await { + Ok(summary) => { + trace!("Successfully summarized session {} ({} chars)", + session.id, summary.len()); + // Use handler to filter content + let handler = llm_models::get_handler( + &config_manager.get_config( + &automation.bot_id, + "llm-model", + None + ).unwrap_or_default() + ); + let filtered = handler.process_content(&summary); + format!("SUMMARY: {}", filtered) + }, Err(e) => { - error!("Failed to summarize conversation: {}", e); + error!("Failed to summarize conversation for session {}: {}", session.id, e); + trace!("Using fallback summary for session {}", session.id); format!("SUMMARY: {}", compacted) // Fallback } }; @@ -110,12 +153,17 @@ async fn compact_prompt_for_bot( history.len() ); - // Save with minimal lock time + // Remove all old messages and save only the summary { let mut session_manager = state.session_manager.lock().await; + // First delete all existing messages for this session + if let Err(e) = session_manager.clear_messages(session.id) { + error!("Failed to clear messages for session {}: {}", session.id, e); + return Err(e); + } + // Then save just the summary session_manager.save_message(session.id, session.user_id, 3, &summarized, 1)?; } - } } Ok(()) diff --git a/src/session/mod.rs b/src/session/mod.rs index 7ace2d22..93d70796 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -174,6 +174,14 @@ impl SessionManager { Ok(inserted) } + pub fn clear_messages(&mut self, session_id: Uuid) -> Result<(), Box> { + use crate::shared::models::message_history::dsl::*; + + diesel::delete(message_history.filter(session_id.eq(session_id))) + .execute(&mut self.conn)?; + Ok(()) + } + pub fn save_message( &mut self, sess_id: Uuid,