From 9e1370c04fd7b58eb9b56c6382b0020edb0aeee3 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Fri, 7 Nov 2025 16:43:49 -0300 Subject: [PATCH] refactor(bot): limit lock scope for history and tidy progress output - Reordered imports for clarity (chrono and tokio::time::Instant). - Fixed comment indentation around compact automation note. - Refactored session history retrieval into a scoped block so the session manager mutex is held only while the history is fetched, trimmed to the most recent compacted message and limited; the lock is released when that block ends, before the prompt is built. - Added explanatory comments for the new lock handling logic. - Cleaned up token progress calculation and display formatting, improving readability of GPU/CPU/TOKENS bars. - Minor formatting adjustments throughout the file. --- src/bot/mod.rs | 136 ++++++++++++++++++++++++++----------------------- 1 file changed, 72 insertions(+), 64 deletions(-) diff --git a/src/bot/mod.rs b/src/bot/mod.rs index f985b55d..bc47981f 100644 --- a/src/bot/mod.rs +++ b/src/bot/mod.rs @@ -6,16 +6,16 @@ use crate::shared::models::{BotResponse, Suggestion, UserMessage, UserSession}; use crate::shared::state::AppState; use actix_web::{web, HttpRequest, HttpResponse, Result}; use actix_ws::Message as WsMessage; -use chrono::{Utc}; +use chrono::Utc; use diesel::PgConnection; use log::{error, info, trace, warn}; use serde_json; -use tokio::time::Instant; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; use tokio::sync::Mutex as AsyncMutex; +use tokio::time::Instant; use uuid::Uuid; pub fn get_default_bot(conn: &mut PgConnection) -> (Uuid, String) { @@ -53,7 +53,7 @@ impl BotOrchestrator { }; // Spawn internal automation to run compact prompt every minute if enabled - // Compact automation disabled to avoid Send issues in background task + // Compact automation disabled to avoid Send issues in background task orchestrator } @@ -407,21 +407,25 @@ impl BotOrchestrator { .unwrap_or(-1) }; - let mut sm = self.state.session_manager.lock().await; 
- let mut history = sm.get_conversation_history(session.id, user_id)?; + // Acquire lock briefly for history retrieval with configurable limit + let history = { + let mut sm = self.state.session_manager.lock().await; + let mut history = sm.get_conversation_history(session.id, user_id)?; - // Skip all messages before the most recent compacted message (type 9) - if let Some(last_compacted_index) = history.iter().rposition(|(role, content)| { - role == "COMPACTED" || content.starts_with("SUMMARY:") - }) { - history = history.split_off(last_compacted_index); - } + // Skip all messages before the most recent compacted message (type 9) + if let Some(last_compacted_index) = history + .iter() + .rposition(|(role, content)| role == "COMPACTED" || content.starts_with("SUMMARY:")) + { + history = history.split_off(last_compacted_index); + } - // Apply history limit if configured - if history_limit > 0 && history.len() > history_limit as usize { - let start = history.len() - history_limit as usize; - history.drain(0..start); - } + if history_limit > 0 && history.len() > history_limit as usize { + let start = history.len() - history_limit as usize; + history.drain(0..start); + } + history + }; let mut prompt = String::new(); if !system_prompt.is_empty() { @@ -560,24 +564,26 @@ impl BotOrchestrator { // Update progress if interval elapsed if last_progress_update.elapsed() >= progress_interval { - let current_tokens = initial_tokens + crate::shared::utils::estimate_token_count(&full_response); + let current_tokens = + initial_tokens + crate::shared::utils::estimate_token_count(&full_response); if let Ok(metrics) = get_system_metrics(current_tokens, max_context_size) { -let gpu_bar = "█".repeat((metrics.gpu_usage.unwrap_or(0.0) / 5.0).round() as usize); -let cpu_bar = "█".repeat((metrics.cpu_usage / 5.0).round() as usize); -let token_ratio = current_tokens as f64 / max_context_size.max(1) as f64; -let token_bar = "█".repeat((token_ratio * 20.0).round() as usize); -use 
std::io::{self, Write}; -print!( - "\rGPU [{:<20}] {:.1}% | CPU [{:<20}] {:.1}% | TOKENS [{:<20}] {}/{}", - gpu_bar, - metrics.gpu_usage.unwrap_or(0.0), - cpu_bar, - metrics.cpu_usage, - token_bar, - current_tokens, - max_context_size -); -io::stdout().flush().unwrap(); + let gpu_bar = + "█".repeat((metrics.gpu_usage.unwrap_or(0.0) / 5.0).round() as usize); + let cpu_bar = "█".repeat((metrics.cpu_usage / 5.0).round() as usize); + let token_ratio = current_tokens as f64 / max_context_size.max(1) as f64; + let token_bar = "█".repeat((token_ratio * 20.0).round() as usize); + use std::io::{self, Write}; + print!( + "\rGPU [{:<20}] {:.1}% | CPU [{:<20}] {:.1}% | TOKENS [{:<20}] {}/{}", + gpu_bar, + metrics.gpu_usage.unwrap_or(0.0), + cpu_bar, + metrics.cpu_usage, + token_bar, + current_tokens, + max_context_size + ); + io::stdout().flush().unwrap(); } last_progress_update = Instant::now(); } @@ -603,35 +609,42 @@ io::stdout().flush().unwrap(); } } -trace!( - "Stream processing completed, {} chunks processed", - chunk_count -); + trace!( + "Stream processing completed, {} chunks processed", + chunk_count + ); -// Sum tokens from all p.push context builds before submission -let total_tokens = crate::shared::utils::estimate_token_count(&prompt) - + crate::shared::utils::estimate_token_count(&context_data) - + crate::shared::utils::estimate_token_count(&full_response); -info!("Total tokens (context + prompt + response): {}", total_tokens); + // Sum tokens from all p.push context builds before submission + let total_tokens = crate::shared::utils::estimate_token_count(&prompt) + + crate::shared::utils::estimate_token_count(&context_data) + + crate::shared::utils::estimate_token_count(&full_response); + info!( + "Total tokens (context + prompt + response): {}", + total_tokens + ); -// Trigger compact prompt if enabled -let config_manager = ConfigManager::new(Arc::clone(&self.state.conn)); -let compact_enabled = config_manager - 
.get_config(&Uuid::parse_str(&message.bot_id).unwrap_or_default(), "prompt-compact", None) - .unwrap_or_default() - .parse::() - .unwrap_or(0); -if compact_enabled > 0 { -let state = self.state.clone(); -tokio::task::spawn_blocking(move || { - loop { - if let Err(e) = tokio::runtime::Handle::current().block_on(crate::automation::execute_compact_prompt(state.clone())) { - error!("Failed to execute compact prompt: {}", e); + // Trigger compact prompt if enabled + let config_manager = ConfigManager::new(Arc::clone(&self.state.conn)); + let compact_enabled = config_manager + .get_config( + &Uuid::parse_str(&message.bot_id).unwrap_or_default(), + "prompt-compact", + None, + ) + .unwrap_or_default() + .parse::() + .unwrap_or(0); + if compact_enabled > 0 { + let state = self.state.clone(); + tokio::task::spawn_blocking(move || loop { + if let Err(e) = tokio::runtime::Handle::current() + .block_on(crate::automation::execute_compact_prompt(state.clone())) + { + error!("Failed to execute compact prompt: {}", e); + } + std::thread::sleep(Duration::from_secs(60)); + }); } - std::thread::sleep(Duration::from_secs(60)); - } -}); -} // Save final message with short lock scope { @@ -1311,8 +1324,3 @@ async fn send_warning_handler( Ok(HttpResponse::Ok().json(serde_json::json!({"status": "warning_sent"}))) } - ); - } - - Ok(HttpResponse::Ok().json(serde_json::json!({"status": "warning_sent"}))) -}