refactor(bot): limit lock scope for history and tidy progress output

- Reordered imports for clarity (chrono and tokio::time::Instant).
- Fixed comment indentation around compact automation note.
- Refactored session history retrieval to acquire the mutex only briefly, then process compacted message skipping and history limiting outside the lock.
- Added explanatory comments for the new lock handling logic.
- Cleaned up token progress calculation and display formatting, improving readability of GPU/CPU/TOKENS bars.
- Minor formatting adjustments throughout the file.
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-11-07 16:43:49 -03:00
parent 120d06a0db
commit 9e1370c04f

View file

@@ -6,16 +6,16 @@ use crate::shared::models::{BotResponse, Suggestion, UserMessage, UserSession};
use crate::shared::state::AppState; use crate::shared::state::AppState;
use actix_web::{web, HttpRequest, HttpResponse, Result}; use actix_web::{web, HttpRequest, HttpResponse, Result};
use actix_ws::Message as WsMessage; use actix_ws::Message as WsMessage;
use chrono::{Utc}; use chrono::Utc;
use diesel::PgConnection; use diesel::PgConnection;
use log::{error, info, trace, warn}; use log::{error, info, trace, warn};
use serde_json; use serde_json;
use tokio::time::Instant;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::Mutex as AsyncMutex; use tokio::sync::Mutex as AsyncMutex;
use tokio::time::Instant;
use uuid::Uuid; use uuid::Uuid;
pub fn get_default_bot(conn: &mut PgConnection) -> (Uuid, String) { pub fn get_default_bot(conn: &mut PgConnection) -> (Uuid, String) {
@@ -53,7 +53,7 @@ impl BotOrchestrator
}; };
// Spawn internal automation to run compact prompt every minute if enabled // Spawn internal automation to run compact prompt every minute if enabled
// Compact automation disabled to avoid Send issues in background task // Compact automation disabled to avoid Send issues in background task
orchestrator orchestrator
} }
@@ -407,21 +407,25 @@ impl BotOrchestrator
.unwrap_or(-1) .unwrap_or(-1)
}; };
let mut sm = self.state.session_manager.lock().await; // Acquire lock briefly for history retrieval with configurable limit
let mut history = sm.get_conversation_history(session.id, user_id)?; let history = {
let mut sm = self.state.session_manager.lock().await;
let mut history = sm.get_conversation_history(session.id, user_id)?;
// Skip all messages before the most recent compacted message (type 9) // Skip all messages before the most recent compacted message (type 9)
if let Some(last_compacted_index) = history.iter().rposition(|(role, content)| { if let Some(last_compacted_index) = history
role == "COMPACTED" || content.starts_with("SUMMARY:") .iter()
}) { .rposition(|(role, content)| role == "COMPACTED" || content.starts_with("SUMMARY:"))
history = history.split_off(last_compacted_index); {
} history = history.split_off(last_compacted_index);
}
// Apply history limit if configured if history_limit > 0 && history.len() > history_limit as usize {
if history_limit > 0 && history.len() > history_limit as usize { let start = history.len() - history_limit as usize;
let start = history.len() - history_limit as usize; history.drain(0..start);
history.drain(0..start); }
} history
};
let mut prompt = String::new(); let mut prompt = String::new();
if !system_prompt.is_empty() { if !system_prompt.is_empty() {
@@ -560,24 +564,26 @@ impl BotOrchestrator
// Update progress if interval elapsed // Update progress if interval elapsed
if last_progress_update.elapsed() >= progress_interval { if last_progress_update.elapsed() >= progress_interval {
let current_tokens = initial_tokens + crate::shared::utils::estimate_token_count(&full_response); let current_tokens =
initial_tokens + crate::shared::utils::estimate_token_count(&full_response);
if let Ok(metrics) = get_system_metrics(current_tokens, max_context_size) { if let Ok(metrics) = get_system_metrics(current_tokens, max_context_size) {
let gpu_bar = "".repeat((metrics.gpu_usage.unwrap_or(0.0) / 5.0).round() as usize); let gpu_bar =
let cpu_bar = "".repeat((metrics.cpu_usage / 5.0).round() as usize); "".repeat((metrics.gpu_usage.unwrap_or(0.0) / 5.0).round() as usize);
let token_ratio = current_tokens as f64 / max_context_size.max(1) as f64; let cpu_bar = "".repeat((metrics.cpu_usage / 5.0).round() as usize);
let token_bar = "".repeat((token_ratio * 20.0).round() as usize); let token_ratio = current_tokens as f64 / max_context_size.max(1) as f64;
use std::io::{self, Write}; let token_bar = "".repeat((token_ratio * 20.0).round() as usize);
print!( use std::io::{self, Write};
"\rGPU [{:<20}] {:.1}% | CPU [{:<20}] {:.1}% | TOKENS [{:<20}] {}/{}", print!(
gpu_bar, "\rGPU [{:<20}] {:.1}% | CPU [{:<20}] {:.1}% | TOKENS [{:<20}] {}/{}",
metrics.gpu_usage.unwrap_or(0.0), gpu_bar,
cpu_bar, metrics.gpu_usage.unwrap_or(0.0),
metrics.cpu_usage, cpu_bar,
token_bar, metrics.cpu_usage,
current_tokens, token_bar,
max_context_size current_tokens,
); max_context_size
io::stdout().flush().unwrap(); );
io::stdout().flush().unwrap();
} }
last_progress_update = Instant::now(); last_progress_update = Instant::now();
} }
@@ -603,35 +609,42 @@ io::stdout().flush().unwrap();
} }
} }
trace!( trace!(
"Stream processing completed, {} chunks processed", "Stream processing completed, {} chunks processed",
chunk_count chunk_count
); );
// Sum tokens from all p.push context builds before submission // Sum tokens from all p.push context builds before submission
let total_tokens = crate::shared::utils::estimate_token_count(&prompt) let total_tokens = crate::shared::utils::estimate_token_count(&prompt)
+ crate::shared::utils::estimate_token_count(&context_data) + crate::shared::utils::estimate_token_count(&context_data)
+ crate::shared::utils::estimate_token_count(&full_response); + crate::shared::utils::estimate_token_count(&full_response);
info!("Total tokens (context + prompt + response): {}", total_tokens); info!(
"Total tokens (context + prompt + response): {}",
total_tokens
);
// Trigger compact prompt if enabled // Trigger compact prompt if enabled
let config_manager = ConfigManager::new(Arc::clone(&self.state.conn)); let config_manager = ConfigManager::new(Arc::clone(&self.state.conn));
let compact_enabled = config_manager let compact_enabled = config_manager
.get_config(&Uuid::parse_str(&message.bot_id).unwrap_or_default(), "prompt-compact", None) .get_config(
.unwrap_or_default() &Uuid::parse_str(&message.bot_id).unwrap_or_default(),
.parse::<i32>() "prompt-compact",
.unwrap_or(0); None,
if compact_enabled > 0 { )
let state = self.state.clone(); .unwrap_or_default()
tokio::task::spawn_blocking(move || { .parse::<i32>()
loop { .unwrap_or(0);
if let Err(e) = tokio::runtime::Handle::current().block_on(crate::automation::execute_compact_prompt(state.clone())) { if compact_enabled > 0 {
error!("Failed to execute compact prompt: {}", e); let state = self.state.clone();
tokio::task::spawn_blocking(move || loop {
if let Err(e) = tokio::runtime::Handle::current()
.block_on(crate::automation::execute_compact_prompt(state.clone()))
{
error!("Failed to execute compact prompt: {}", e);
}
std::thread::sleep(Duration::from_secs(60));
});
} }
std::thread::sleep(Duration::from_secs(60));
}
});
}
// Save final message with short lock scope // Save final message with short lock scope
{ {
@@ -1311,8 +1324,3 @@ async fn send_warning_handler(
Ok(HttpResponse::Ok().json(serde_json::json!({"status": "warning_sent"}))) Ok(HttpResponse::Ok().json(serde_json::json!({"status": "warning_sent"})))
} }
);
}
Ok(HttpResponse::Ok().json(serde_json::json!({"status": "warning_sent"})))
}