feat(automation): refactor compact prompt logic and remove unused code

Refactored the compact_prompt_for_bots function to use structured JSON messages instead of plain-text formatting. Removed the unused execute_compact_prompt method and related code from the automation service, as that functionality is now handled elsewhere. The changes include:
- Using serde_json to structure messages for the LLM (a short sketch follows this list)
- Improved error handling, with a plain-text fallback summary when the LLM call fails
- Cleaned up obsolete compact-prompt execution code
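
For context, a minimal sketch of the new message shape. The helper name to_chat_messages and its standalone signature are illustrative only; in the commit this logic lives inside compact_prompt_for_bots and OpenAIClient::build_messages, and history is assumed to be (role, content) String pairs as in the session manager.

use serde_json::{json, Value};

// Illustrative helper (hypothetical name): build an OpenAI-style chat message
// array instead of a flat "role: content" prompt string.
fn to_chat_messages(system_prompt: &str, history: &[(String, String)]) -> Value {
    let mut messages = Vec::new();
    if !system_prompt.is_empty() {
        messages.push(json!({ "role": "system", "content": system_prompt }));
    }
    for (role, content) in history {
        if role == "compact" {
            continue; // skip previously stored summaries
        }
        messages.push(json!({ "role": role, "content": content }));
    }
    Value::Array(messages)
}

If the LLM call fails, the code below still falls back to a plain "SUMMARY: ..." string rather than dropping the history.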
Rodrigo Rodriguez (Pragmatismo) 2025-11-11 22:31:19 -03:00
parent be87cc82b5
commit b8ba0a7d41
7 changed files with 60 additions and 201 deletions

View file

@@ -62,7 +62,7 @@ async fn compact_prompt_for_bots(
     // Calculate start index: if there's a summary, start after it; otherwise start from 0
     let start_index = last_summary_index.map(|idx| idx + 1).unwrap_or(0);
-    for (i, (role, _)) in history.iter().enumerate().skip(start_index) {
+    for (_i, (role, _)) in history.iter().enumerate().skip(start_index) {
         if role == "compact" {
             continue;
         }
@@ -101,21 +101,26 @@ async fn compact_prompt_for_bots(
         messages_since_summary
     );
-    let mut compacted = "Please summarize the following conversation between a human and an AI assistant:\n".to_string();
-    // Include messages from start_index onward
-    let messages_to_include = history.iter().skip(start_index);
-    for (role, content) in messages_to_include {
+    let mut messages = Vec::new();
+    messages.push(serde_json::json!({
+        "role": "system",
+        "content": "Please summarize the following conversation between a user and a bot"
+    }));
+    for (role, content) in history.iter().skip(start_index) {
         if role == "compact" {
             continue;
         }
-        compacted.push_str(&format!("{}: {}\n", role, content));
+        messages.push(serde_json::json!({
+            "role": role,
+            "content": content
+        }));
     }
     let llm_provider = state.llm_provider.clone();
     trace!("Starting summarization for session {}", session.id);
     let mut filtered = String::new();
-    let summarized = match llm_provider.generate(&compacted, &serde_json::Value::Null).await {
+    let summarized = match llm_provider.generate("", &serde_json::Value::Array(messages)).await {
         Ok(summary) => {
             trace!(
                 "Successfully summarized session {} ({} chars)",
@@ -138,7 +143,7 @@ async fn compact_prompt_for_bots(
                 session.id, e
             );
             trace!("Using fallback summary for session {}", session.id);
-            format!("SUMMARY: {}", compacted) // Fallback
+            format!("SUMMARY: {}", filtered) // Fallback
         }
     };
     info!(
@@ -148,7 +153,7 @@ async fn compact_prompt_for_bots(
     );
     {
         let mut session_manager = state.session_manager.lock().await;
-        session_manager.save_message(session.id, session.user_id, 9, &filtered, 1)?;
+        session_manager.save_message(session.id, session.user_id, 9, &summarized, 1)?;
     }
     let _session_cleanup = guard((), |_| {

View file

@@ -109,63 +109,4 @@ impl AutomationService {
         }
         Ok(())
     }
-    async fn execute_compact_prompt(
-        &self,
-        automation: &Automation,
-    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
-        let config_manager = ConfigManager::new(self.state.conn.clone());
-        let compact_threshold = config_manager
-            .get_config(&automation.bot_id, "prompt-compact", None)?
-            .parse::<usize>()
-            .unwrap_or(0);
-        if compact_threshold == 0 {
-            return Ok(());
-        }
-        let mut session_manager = self.state.session_manager.lock().await;
-        let sessions = session_manager.get_user_sessions(uuid::Uuid::nil())?;
-        for session in sessions {
-            if session.bot_id != automation.bot_id {
-                continue;
-            }
-            let history = session_manager.get_conversation_history(session.id, session.user_id)?;
-            if history.len() > compact_threshold {
-                trace!(
-                    "Compacting prompt for session {}: {} messages",
-                    session.id,
-                    history.len()
-                );
-                let mut compacted = String::new();
-                for (role, content) in &history[..history.len() - compact_threshold] {
-                    compacted.push_str(&format!("{}: {}\n", role, content));
-                }
-                let summarized = format!("SUMMARY: {}", compacted);
-                session_manager.save_message(session.id, session.user_id, 3, &summarized, 1)?;
-            }
-        }
-        Ok(())
-    }
-}
-pub async fn execute_compact_prompt(
-    state: Arc<crate::shared::state::AppState>,
-) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
-    use crate::shared::models::system_automations::dsl::{is_active, system_automations};
-    use diesel::prelude::*;
-    let state_clone = state.clone();
-    let service = AutomationService::new(state_clone);
-    let mut conn = state
-        .conn
-        .get()
-        .map_err(|e| format!("Failed to acquire database connection: {}", e))?;
-    let automations: Vec<crate::shared::models::Automation> = system_automations
-        .filter(is_active.eq(true))
-        .load::<crate::shared::models::Automation>(&mut conn)?;
-    for automation in automations {
-        if let Err(e) = service.execute_compact_prompt(&automation).await {
-            error!(
-                "Failed to compact prompt for bot {}: {}",
-                automation.bot_id, e
-            );
-        }
-    }
-    Ok(())
 }

View file

@@ -1,6 +1,7 @@
 mod ui;
 use crate::config::ConfigManager;
 use crate::drive_monitor::DriveMonitor;
+use crate::llm::OpenAIClient;
 use crate::llm_models;
 use crate::nvidia::get_system_metrics;
 use crate::bot::ui::BotUI;
@@ -361,22 +362,12 @@ impl BotOrchestrator {
             }
             history
         };
-        let mut prompt = String::new();
-        if !system_prompt.is_empty() {
-            prompt.push_str(&format!("SYSTEM: *** {} *** \n", system_prompt));
-        }
-        if !context_data.is_empty() {
-            prompt.push_str(&format!("CONTEXT: *** {} *** \n", context_data));
-        }
-        for (role, content) in &history {
-            prompt.push_str(&format!("{}:{}\n", role, content));
-        }
-        prompt.push_str(&format!("\nbot:"));
+        let messages = OpenAIClient::build_messages(&system_prompt, &context_data, &history);
         trace!(
-            "Stream prompt constructed with {} history entries",
+            "Stream messages constructed with {} history entries",
             history.len()
         );
-        trace!("LLM prompt: [{}]", prompt);
+        trace!("LLM messages: {:?}", messages);
         let (stream_tx, mut stream_rx) = mpsc::channel::<String>(100);
         let llm = self.state.llm_provider.clone();
         if message.channel == "web" {
@@ -406,10 +397,9 @@ impl BotOrchestrator {
             };
             response_tx.send(thinking_response).await?;
         }
-        let prompt_clone = prompt.clone();
         tokio::spawn(async move {
             if let Err(e) = llm
-                .generate_stream(&prompt_clone, &serde_json::Value::Null, stream_tx)
+                .generate_stream("", &messages, stream_tx)
                 .await
             {
                 error!("LLM streaming error: {}", e);
@@ -422,7 +412,7 @@ impl BotOrchestrator {
         let mut first_word_received = false;
         let mut last_progress_update = Instant::now();
         let progress_interval = Duration::from_secs(1);
-        let initial_tokens = crate::shared::utils::estimate_token_count(&prompt);
+        let initial_tokens = crate::shared::utils::estimate_token_count(&message.content);
         let config_manager = ConfigManager::new(self.state.conn.clone());
         let max_context_size = config_manager
             .get_config(
@@ -482,8 +472,6 @@ impl BotOrchestrator {
                 let _cpu_bar = "".repeat((metrics.cpu_usage / 5.0).round() as usize);
                 let token_ratio = current_tokens as f64 / max_context_size.max(1) as f64;
                 let _token_bar = "".repeat((token_ratio * 20.0).round() as usize);
-                let mut ui = BotUI::new().unwrap();
-                ui.render_progress(current_tokens, max_context_size).unwrap();
             }
             last_progress_update = Instant::now();
         }
@@ -510,7 +498,7 @@ impl BotOrchestrator {
             "Stream processing completed, {} chunks processed",
             chunk_count
         );
-        let total_tokens = crate::shared::utils::estimate_token_count(&prompt)
+        let total_tokens = crate::shared::utils::estimate_token_count(&message.content)
             + crate::shared::utils::estimate_token_count(&context_data)
             + crate::shared::utils::estimate_token_count(&full_response);
         info!(
@@ -518,26 +506,6 @@ impl BotOrchestrator {
             total_tokens
         );
         let config_manager = ConfigManager::new( self.state.conn.clone());
-        let compact_enabled = config_manager
-            .get_config(
-                &Uuid::parse_str(&message.bot_id).unwrap_or_default(),
-                "prompt-compact",
-                None,
-            )
-            .unwrap_or_default()
-            .parse::<i32>()
-            .unwrap_or(0);
-        if compact_enabled > 0 {
-            let state = self.state.clone();
-            tokio::task::spawn_blocking(move || loop {
-                if let Err(e) = tokio::runtime::Handle::current()
-                    .block_on(crate::automation::execute_compact_prompt(state.clone()))
-                {
-                    error!("Failed to execute compact prompt: {}", e);
-                }
-                std::thread::sleep(Duration::from_secs(60));
-            });
-        }
         {
             let mut sm = self.state.session_manager.lock().await;
             sm.save_message(session.id, user_id, 2, &full_response, 1)?;
@@ -668,8 +636,6 @@ impl BotOrchestrator {
             "Sending warning to session {} on channel {}: {}",
             session_id, channel, message
         );
-        let mut ui = BotUI::new().unwrap();
-        ui.render_warning(message).unwrap();
         Ok(())
     }
     pub async fn trigger_auto_welcome(

View file

@@ -17,43 +17,7 @@ impl BotUI {
         let terminal = Terminal::new(backend)?;
         Ok(Self { terminal })
     }
-    pub fn render_progress(&mut self, current_tokens: usize, max_context_size: usize) -> io::Result<()> {
-        let metrics = get_system_metrics(current_tokens, max_context_size).unwrap_or_default();
-        let gpu_usage = metrics.gpu_usage.unwrap_or(0.0);
-        let cpu_usage = metrics.cpu_usage;
-        let token_ratio = current_tokens as f64 / max_context_size.max(1) as f64;
-        self.terminal.draw(|f| {
-            let chunks = Layout::default()
-                .direction(Direction::Vertical)
-                .constraints([
-                    Constraint::Length(3),
-                    Constraint::Length(3),
-                    Constraint::Length(3),
-                    Constraint::Min(0),
-                ])
-                .split(f.area());
-            let gpu_gauge = Gauge::default()
-                .block(Block::default().title("GPU Usage").borders(Borders::ALL))
-                .gauge_style(Style::default().fg(Color::Green).add_modifier(Modifier::BOLD))
-                .ratio(gpu_usage as f64 / 100.0)
-                .label(format!("{:.1}%", gpu_usage));
-            let cpu_gauge = Gauge::default()
-                .block(Block::default().title("CPU Usage").borders(Borders::ALL))
-                .gauge_style(Style::default().fg(Color::Yellow).add_modifier(Modifier::BOLD))
-                .ratio(cpu_usage as f64 / 100.0)
-                .label(format!("{:.1}%", cpu_usage));
-            let token_gauge = Gauge::default()
-                .block(Block::default().title("Token Progress").borders(Borders::ALL))
-                .gauge_style(Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD))
-                .ratio(token_ratio)
-                .label(format!("{}/{}", current_tokens, max_context_size));
-            f.render_widget(gpu_gauge, chunks[0]);
-            f.render_widget(cpu_gauge, chunks[1]);
-            f.render_widget(token_gauge, chunks[2]);
-        })?;
-        Ok(())
-    }
-    pub fn render_warning(&mut self, message: &str) -> io::Result<()> {
+    fn render_warning(&mut self, message: &str) -> io::Result<()> {
         self.terminal.draw(|f| {
             let block = Block::default()
                 .title("⚠️ NVIDIA Warning")

View file

@@ -204,8 +204,8 @@ pub async fn start_llm_server(
     } else {
         let mut cmd = tokio::process::Command::new("sh");
         cmd.arg("-c").arg(format!(
-            "cd {} && ./llama-server {} --verbose >llm-stdout.log 2>&1 &",
+            "cd {} && ./llama-server {} --verbose >../../../../logs/llm/stdout.log 2>&1 &",
             llama_cpp_path, args
         ));
         info!("Executing LLM server command: cd {} && ./llama-server {} --verbose", llama_cpp_path, args);
         cmd.spawn()?;

View file

@ -1,5 +1,6 @@
use async_trait::async_trait; use async_trait::async_trait;
use futures::StreamExt; use futures::StreamExt;
use log::info;
use serde_json::Value; use serde_json::Value;
use tokio::sync::mpsc; use tokio::sync::mpsc;
pub mod local; pub mod local;
@@ -39,18 +40,20 @@ impl LLMProvider for OpenAIClient {
     async fn generate(
         &self,
         prompt: &str,
-        _config: &Value,
+        messages: &Value,
     ) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
-        let messages = self.parse_messages(prompt);
+        let default_messages = serde_json::json!([{"role": "user", "content": prompt}]);
         let response = self
             .client
             .post(&format!("{}/v1/chat/completions/", self.base_url))
             .header("Authorization", format!("Bearer {}", self.api_key))
             .json(&serde_json::json!({
                 "model": "gpt-3.5-turbo",
-                "messages": messages,
-                "max_tokens": 1000
+                "messages": if messages.is_array() && !messages.as_array().unwrap().is_empty() {
+                    messages
+                } else {
+                    &default_messages
+                }
             }))
             .send()
             .await?;
@@ -69,18 +72,22 @@ impl LLMProvider for OpenAIClient {
     async fn generate_stream(
         &self,
         prompt: &str,
-        _config: &Value,
+        messages: &Value,
         tx: mpsc::Sender<String>,
     ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
-        let messages = self.parse_messages(prompt);
+        let default_messages = serde_json::json!([{"role": "user", "content": prompt}]);
         let response = self
             .client
             .post(&format!("{}/v1/chat/completions", self.base_url))
             .header("Authorization", format!("Bearer {}", self.api_key))
             .json(&serde_json::json!({
                 "model": "gpt-3.5-turbo",
-                "messages": messages,
+                "messages": if messages.is_array() && !messages.as_array().unwrap().is_empty() {
+                    info!("Using provided messages: {:?}", messages);
+                    messages
+                } else {
+                    &default_messages
+                },
                 "stream": true
             }))
             .send()
@@ -120,50 +127,26 @@ impl OpenAIClient {
         }
     }
-    fn parse_messages(&self, prompt: &str) -> Vec<Value> {
-        let mut messages = Vec::new();
-        let mut current_role = None;
-        let mut current_content = String::new();
-        for line in prompt.lines() {
-            if let Some(role_end) = line.find(':') {
-                let role_part = &line[..role_end].trim().to_lowercase();
-                let role = match role_part.as_str() {
-                    "human" => "user",
-                    "bot" => "assistant",
-                    "compact" => "system",
-                    _ => continue
-                };
-                if let Some(r) = current_role.take() {
-                    if !current_content.is_empty() {
-                        messages.push(serde_json::json!({
-                            "role": r,
-                            "content": current_content.trim()
-                        }));
-                    }
-                }
-                current_role = Some(role);
-                current_content = line[role_end + 1..].trim_start().to_string();
-                continue;
-            }
-            if let Some(_) = current_role {
-                if !current_content.is_empty() {
-                    current_content.push('\n');
-                }
-                current_content.push_str(line);
-            }
-        }
-        if let Some(role) = current_role {
-            if !current_content.is_empty() {
-                messages.push(serde_json::json!({
-                    "role": role,
-                    "content": current_content.trim()
-                }));
-            }
-        }
-        messages
-    }
+    pub fn build_messages(system_prompt: &str, context_data: &str, history: &[(String, String)]) -> Value {
+        let mut messages = Vec::new();
+        if !system_prompt.is_empty() {
+            messages.push(serde_json::json!({
+                "role": "system",
+                "content": system_prompt
+            }));
+        }
+        if !context_data.is_empty() {
+            messages.push(serde_json::json!({
+                "role": "system",
+                "content": context_data
+            }));
+        }
+        for (role, content) in history {
+            messages.push(serde_json::json!({
+                "role": role,
+                "content": content
+            }));
+        }
+        serde_json::Value::Array(messages)
+    }
 }

View file

@@ -273,7 +273,7 @@ impl SessionManager {
         let mut history: Vec<(String, String)> = Vec::new();
         for (other_role, content) in messages {
             let role_str = match other_role {
-                1 => "human".to_string(),
+                1 => "user".to_string(),
                 2 => "bot".to_string(),
                 3 => "system".to_string(),
                 9 => "compact".to_string(),