feat(automation): add compaction lock and improve prompt summarization

Added `once_cell` and `scopeguard` dependencies to implement thread-safe compaction lock mechanism. Modified `compact_prompt_for_bot` to:
- Prevent concurrent compaction for the same bot using a global lock
- Add proper tracing and error handling
- Improve summarization with content filtering
- Clean up locks automatically using scopeguard
- Remove redundant threshold check and compact entire history

The changes ensure thread safety during prompt compaction and provide better observability through tracing.
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-11-06 18:02:04 -03:00
parent 4ce06daf75
commit 1a4b56e312
4 changed files with 69 additions and 9 deletions

2
Cargo.lock generated
View file

@ -1125,6 +1125,7 @@ dependencies = [
"mockito", "mockito",
"native-tls", "native-tls",
"num-format", "num-format",
"once_cell",
"pdf-extract", "pdf-extract",
"qdrant-client", "qdrant-client",
"rand 0.9.2", "rand 0.9.2",
@ -1132,6 +1133,7 @@ dependencies = [
"regex", "regex",
"reqwest", "reqwest",
"rhai", "rhai",
"scopeguard",
"serde", "serde",
"serde_json", "serde_json",
"sha2", "sha2",

View file

@ -43,6 +43,8 @@ email = ["imap"]
desktop = [] desktop = []
[dependencies] [dependencies]
scopeguard = "1.2.0"
once_cell = "1.18.0"
actix-cors = "0.7" actix-cors = "0.7"
actix-multipart = "0.7" actix-multipart = "0.7"
actix-web = "4.9" actix-web = "4.9"

View file

@ -1,8 +1,10 @@
use crate::config::ConfigManager; use crate::config::ConfigManager;
use crate::llm_models;
use crate::shared::models::Automation; use crate::shared::models::Automation;
use crate::shared::state::AppState; use crate::shared::state::AppState;
use diesel::prelude::*; use diesel::prelude::*;
use log::{error, info}; use log::{error, info, trace};
use std::collections::HashSet;
use std::sync::Arc; use std::sync::Arc;
use tokio::time::{interval, Duration}; use tokio::time::{interval, Duration};
use uuid::Uuid; use uuid::Uuid;
@ -51,6 +53,32 @@ async fn compact_prompt_for_bot(
state: &Arc<AppState>, state: &Arc<AppState>,
automation: &Automation, automation: &Automation,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Skip if already compacting this bot
use once_cell::sync::Lazy;
use scopeguard::guard;
static IN_PROGRESS: Lazy<tokio::sync::Mutex<HashSet<Uuid>>> = Lazy::new(|| {
tokio::sync::Mutex::new(HashSet::new())
});
{
let mut in_progress = IN_PROGRESS.lock().await;
if in_progress.contains(&automation.bot_id) {
trace!("Skipping compaction for bot {} - already in progress", automation.bot_id);
return Ok(());
}
in_progress.insert(automation.bot_id);
}
// Ensure cleanup happens when function exits
let bot_id = automation.bot_id;
let _cleanup = guard((), |_| {
tokio::spawn(async move {
let mut in_progress = IN_PROGRESS.lock().await;
in_progress.remove(&bot_id);
trace!("Released compaction lock for bot {}", bot_id);
});
});
info!("Executing prompt compaction for bot: {}", automation.bot_id); info!("Executing prompt compaction for bot: {}", automation.bot_id);
let config_manager = ConfigManager::new(Arc::clone(&state.conn)); let config_manager = ConfigManager::new(Arc::clone(&state.conn));
@ -80,15 +108,15 @@ async fn compact_prompt_for_bot(
session_manager.get_conversation_history(session.id, session.user_id)? session_manager.get_conversation_history(session.id, session.user_id)?
}; };
if history.len() > compact_threshold {
info!( info!(
"Compacting prompt for session {}: {} messages", "Compacting prompt for session {}: {} messages",
session.id, session.id,
history.len() history.len()
); );
// Compact entire conversation history when threshold is reached
let mut compacted = String::new(); let mut compacted = String::new();
for (role, content) in &history[..history.len() - compact_threshold] { for (role, content) in &history {
compacted.push_str(&format!("{}: {}\n", role, content)); compacted.push_str(&format!("{}: {}\n", role, content));
} }
@ -96,11 +124,26 @@ async fn compact_prompt_for_bot(
let llm_provider = state.llm_provider.clone(); let llm_provider = state.llm_provider.clone();
let compacted_clone = compacted.clone(); let compacted_clone = compacted.clone();
// Run LLM summarization // Run LLM summarization with proper tracing and filtering
let summarized = match llm_provider.generate(&compacted_clone, &serde_json::Value::Null).await { trace!("Starting summarization for session {}", session.id);
Ok(summary) => format!("SUMMARY: {}", summary), let summarized = match llm_provider.summarize(&compacted_clone).await {
Ok(summary) => {
trace!("Successfully summarized session {} ({} chars)",
session.id, summary.len());
// Use handler to filter <think> content
let handler = llm_models::get_handler(
&config_manager.get_config(
&automation.bot_id,
"llm-model",
None
).unwrap_or_default()
);
let filtered = handler.process_content(&summary);
format!("SUMMARY: {}", filtered)
},
Err(e) => { Err(e) => {
error!("Failed to summarize conversation: {}", e); error!("Failed to summarize conversation for session {}: {}", session.id, e);
trace!("Using fallback summary for session {}", session.id);
format!("SUMMARY: {}", compacted) // Fallback format!("SUMMARY: {}", compacted) // Fallback
} }
}; };
@ -110,12 +153,17 @@ async fn compact_prompt_for_bot(
history.len() history.len()
); );
// Save with minimal lock time // Remove all old messages and save only the summary
{ {
let mut session_manager = state.session_manager.lock().await; let mut session_manager = state.session_manager.lock().await;
// First delete all existing messages for this session
if let Err(e) = session_manager.clear_messages(session.id) {
error!("Failed to clear messages for session {}: {}", session.id, e);
return Err(e);
}
// Then save just the summary
session_manager.save_message(session.id, session.user_id, 3, &summarized, 1)?; session_manager.save_message(session.id, session.user_id, 3, &summarized, 1)?;
} }
}
} }
Ok(()) Ok(())

View file

@ -174,6 +174,14 @@ impl SessionManager {
Ok(inserted) Ok(inserted)
} }
pub fn clear_messages(&mut self, session_id: Uuid) -> Result<(), Box<dyn Error + Send + Sync>> {
use crate::shared::models::message_history::dsl::*;
diesel::delete(message_history.filter(session_id.eq(session_id)))
.execute(&mut self.conn)?;
Ok(())
}
pub fn save_message( pub fn save_message(
&mut self, &mut self,
sess_id: Uuid, sess_id: Uuid,