feat(automation): refactor prompt compaction logic

- Remove unused imports and redundant session progress tracking
- Reorder session progress check to after initial validation
- Replace `summarize` with `generate` for LLM interaction
- Add more detailed logging for summarization process
- Improve error handling and fallback behavior
- Move session cleanup guard to end of processing
- Update log levels for better observability (trace -> info for key events)

The changes streamline the prompt compaction flow and improve reliability while maintaining the same core functionality.
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-11-11 19:23:04 -03:00
parent 415448088b
commit 349bdd7984
3 changed files with 32 additions and 41 deletions

View file

@ -1,9 +1,7 @@
use crate::config::ConfigManager; use crate::config::ConfigManager;
use crate::llm_models; use crate::llm_models;
use crate::shared::models::Automation;
use crate::shared::state::AppState; use crate::shared::state::AppState;
use diesel::prelude::*; use log::{error, info, trace};
use log::{error, trace};
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::Arc; use std::sync::Arc;
use tokio::time::{interval, Duration}; use tokio::time::{interval, Duration};
@ -33,18 +31,6 @@ async fn compact_prompt_for_bots(
session_manager.get_user_sessions(Uuid::nil())? session_manager.get_user_sessions(Uuid::nil())?
}; };
for session in sessions { for session in sessions {
{
let mut session_in_progress = SESSION_IN_PROGRESS.lock().await;
if session_in_progress.contains(&session.id) {
trace!(
"Skipping session {} - compaction already in progress",
session.id
);
continue;
}
session_in_progress.insert(session.id);
}
let config_manager = ConfigManager::new(state.conn.clone()); let config_manager = ConfigManager::new(state.conn.clone());
let compact_threshold = config_manager let compact_threshold = config_manager
.get_config(&session.bot_id, "prompt-compact", None)? .get_config(&session.bot_id, "prompt-compact", None)?
@ -60,12 +46,6 @@ async fn compact_prompt_for_bots(
); );
} }
let session_id = session.id; let session_id = session.id;
let _session_cleanup = guard((), |_| {
tokio::spawn(async move {
let mut in_progress = SESSION_IN_PROGRESS.lock().await;
in_progress.remove(&session_id);
});
});
let history = { let history = {
let mut session_manager = state.session_manager.lock().await; let mut session_manager = state.session_manager.lock().await;
session_manager.get_conversation_history(session.id, session.user_id)? session_manager.get_conversation_history(session.id, session.user_id)?
@ -103,11 +83,24 @@ async fn compact_prompt_for_bots(
continue; continue;
} }
{
let mut session_in_progress = SESSION_IN_PROGRESS.lock().await;
if session_in_progress.contains(&session.id) {
trace!(
"Skipping session {} - compaction already in progress",
session.id
);
continue;
}
session_in_progress.insert(session.id);
}
trace!( trace!(
"Compacting prompt for session {}: {} messages since last summary", "Compacting prompt for session {}: {} messages since last summary",
session.id, session.id,
messages_since_summary messages_since_summary
); );
let mut compacted = String::new(); let mut compacted = String::new();
// Include messages from start_index onward // Include messages from start_index onward
@ -120,19 +113,21 @@ async fn compact_prompt_for_bots(
compacted.push_str(&format!("{}: {}\n", role, content)); compacted.push_str(&format!("{}: {}\n", role, content));
} }
let llm_provider = state.llm_provider.clone(); let llm_provider = state.llm_provider.clone();
let compacted_clone = compacted.clone(); trace!("Starting summarization for session {}", session.id);
let summarized = match llm_provider.summarize(&compacted_clone).await { let summarized = match llm_provider.generate(&compacted, &serde_json::Value::Null).await {
Ok(summary) => { Ok(summary) => {
trace!( trace!(
"Successfully summarized conversation for session {}, summary length: {}", "Successfully summarized session {} ({} chars)",
session.id, session.id,
summary.len() summary.len()
); );
// Use handler to filter <think> content
let handler = llm_models::get_handler( let handler = llm_models::get_handler(
&config_manager config_manager
.get_config(&session.bot_id, "llm-model", None) .get_config(&session.bot_id, "llm-model", None)
.unwrap_or_default(), .unwrap().as_str(),
); );
let filtered = handler.process_content(&summary); let filtered = handler.process_content(&summary);
format!("SUMMARY: {}", filtered) format!("SUMMARY: {}", filtered)
} }
@ -141,10 +136,11 @@ async fn compact_prompt_for_bots(
"Failed to summarize conversation for session {}: {}", "Failed to summarize conversation for session {}: {}",
session.id, e session.id, e
); );
format!("SUMMARY: {}", compacted) trace!("Using fallback summary for session {}", session.id);
format!("SUMMARY: {}", compacted) // Fallback
} }
}; };
trace!( info!(
"Prompt compacted {}: {} messages", "Prompt compacted {}: {} messages",
session.id, session.id,
history.len() history.len()
@ -153,6 +149,13 @@ async fn compact_prompt_for_bots(
let mut session_manager = state.session_manager.lock().await; let mut session_manager = state.session_manager.lock().await;
session_manager.save_message(session.id, session.user_id, 9, &summarized, 1)?; session_manager.save_message(session.id, session.user_id, 9, &summarized, 1)?;
} }
let _session_cleanup = guard((), |_| {
tokio::spawn(async move {
let mut in_progress = SESSION_IN_PROGRESS.lock().await;
in_progress.remove(&session_id);
});
});
} }
Ok(()) Ok(())
} }

View file

@ -371,7 +371,7 @@ impl BotOrchestrator {
for (role, content) in &history { for (role, content) in &history {
prompt.push_str(&format!("{}:{}\n", role, content)); prompt.push_str(&format!("{}:{}\n", role, content));
} }
prompt.push_str(&format!("Human: {}\nBot:", message.content)); prompt.push_str(&format!("\nbot:"));
trace!( trace!(
"Stream prompt constructed with {} history entries", "Stream prompt constructed with {} history entries",
history.len() history.len()

View file

@ -171,18 +171,6 @@ impl SessionManager {
msg_type: i32, msg_type: i32,
) -> Result<(), Box<dyn Error + Send + Sync>> { ) -> Result<(), Box<dyn Error + Send + Sync>> {
use crate::shared::models::message_history::dsl::*; use crate::shared::models::message_history::dsl::*;
let exists = message_history
.filter(session_id.eq(sess_id))
.filter(user_id.eq(uid))
.filter(role.eq(ro))
.filter(content_encrypted.eq(content))
.filter(message_type.eq(msg_type))
.select(id)
.first::<Uuid>(&mut self.conn)
.optional()?;
if exists.is_some() {
return Ok(());
}
let next_index = message_history let next_index = message_history
.filter(session_id.eq(sess_id)) .filter(session_id.eq(sess_id))
.count() .count()