diff --git a/.vscode/launch.json b/.vscode/launch.json index 0c6b0aa8..ef0fdff7 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -14,7 +14,8 @@ }, "args": [], "env": { - "RUST_LOG": "info" + "RUST_LOG": "debug,actix_server=off,hyper_util=off,aws_smithy_runtime=off,aws_smithy_runtime_api=off,tracing=off,aws_sdk_s3=off" + }, "cwd": "${workspaceFolder}" }, diff --git a/src/automation/mod.rs b/src/automation/mod.rs index 2c30ffef..377dc9e2 100644 --- a/src/automation/mod.rs +++ b/src/automation/mod.rs @@ -3,8 +3,7 @@ use crate::shared::models::{Automation, TriggerKind}; use crate::shared::state::AppState; use chrono::{DateTime, Datelike, Timelike, Utc}; use diesel::prelude::*; -use log::{error, info, trace, warn}; -use std::env; +use log::{debug, error, info, trace, warn}; use std::path::Path; use std::sync::Arc; use tokio::time::Duration; @@ -176,14 +175,14 @@ impl AutomationService { ); for automation in automations { if let Some(TriggerKind::Scheduled) = TriggerKind::from_i32(automation.kind) { - trace!( + debug!( "Evaluating schedule pattern={:?} for automation {}", automation.schedule, automation.id ); if let Some(pattern) = &automation.schedule { if Self::should_run_cron(pattern, now.timestamp()) { - trace!( + debug!( "Pattern matched; executing automation {} param='{}'", automation.id, automation.param @@ -191,7 +190,7 @@ impl AutomationService { self.execute_action(&automation.param).await; self.update_last_triggered(automation.id).await; } else { - trace!("Pattern did not match for automation {}", automation.id); + debug!("Pattern did not match for automation {}", automation.id); } } } @@ -278,8 +277,7 @@ impl AutomationService { async fn execute_action(&self, param: &str) { trace!("Starting execute_action with param='{}'", param); - let bot_id_string = env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string()); - let bot_id = Uuid::parse_str(&bot_id_string).unwrap_or_else(|_| Uuid::new_v4()); + let (bot_id, _) = crate::bot::get_default_bot(&mut self.state.conn.lock().unwrap()); trace!("Resolved bot_id={} for param='{}'", bot_id, param); let redis_key = format!("job:running:{}:{}", bot_id, param); @@ -316,7 +314,34 @@ impl AutomationService { } } - let full_path = Path::new(&self.scripts_dir).join(param); + // Get bot name from database + let bot_name = { + use crate::shared::models::bots; + let mut conn = self.state.conn.lock().unwrap(); + match bots::table + .filter(bots::id.eq(bot_id)) + .select(bots::name) + .first::(&mut *conn) + .optional() + { + Ok(Some(name)) => name, + Ok(None) => { + warn!("No bot found with id {}, using default name", bot_id); + crate::bot::get_default_bot(&mut self.state.conn.lock().unwrap()).1 + } + Err(e) => { + error!("Failed to query bot name: {}", e); + crate::bot::get_default_bot(&mut self.state.conn.lock().unwrap()).1 + } + } + }; + + let path_str = format!("./work/{}.gbai/{}.gbdialog/{}", + bot_name, + bot_name, + param + ); + let full_path = Path::new(&path_str); trace!("Resolved full path: {}", full_path.display()); let script_content = match tokio::fs::read_to_string(&full_path).await { @@ -333,9 +358,8 @@ impl AutomationService { if let Some(client) = &self.state.drive { let bucket_name = format!( - "{}{}.gbai", - env::var("MINIO_ORG_PREFIX").unwrap_or_else(|_| "org1_".to_string()), - env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string()) + "{}.gbai", + crate::bot::get_default_bot(&mut self.state.conn.lock().unwrap()).0.to_string() ); let s3_key = format!(".gbdialog/{}", param); diff --git a/src/basic/keywords/get.rs b/src/basic/keywords/get.rs index c48a6c63..cb15029b 100644 --- a/src/basic/keywords/get.rs +++ b/src/basic/keywords/get.rs @@ -162,22 +162,8 @@ pub async fn get_from_bucket( let client = state.drive.as_ref().ok_or("S3 client not configured")?; let bucket_name = { - let cfg = state - .config - .as_ref() - .ok_or_else(|| -> Box { - error!("App configuration missing"); - "App configuration missing".into() - })?; - let org_prefix = &cfg.drive.org_prefix; - - if org_prefix.contains("..") || org_prefix.contains('/') || org_prefix.contains('\\') { - error!("Invalid org_prefix: {}", org_prefix); - return Err("Invalid organization prefix".into()); - } - - let bucket = format!("{}default.gbai", org_prefix); + let bucket = format!("default.gbai"); debug!("Resolved bucket name: {}", bucket); bucket }; diff --git a/src/bot/mod.rs b/src/bot/mod.rs index 09746b96..00a9e8dd 100644 --- a/src/bot/mod.rs +++ b/src/bot/mod.rs @@ -1,4 +1,5 @@ use crate::channels::ChannelAdapter; +use crate::config::ConfigManager; use crate::context::langcache::get_langcache_client; use crate::drive_monitor::DriveMonitor; use crate::kb::embeddings::generate_embeddings; @@ -16,7 +17,28 @@ use std::sync::Arc; use tokio::sync::mpsc; use tokio::sync::Mutex as AsyncMutex; use uuid::Uuid; -use crate::config::ConfigManager; + +pub fn get_default_bot(conn: &mut PgConnection) -> (Uuid, String) { + use crate::shared::models::schema::bots::dsl::*; + use diesel::prelude::*; + + match bots + .filter(is_active.eq(true)) + .select((id, name)) + .first::<(Uuid, String)>(conn) + .optional() + { + Ok(Some((bot_id, bot_name))) => (bot_id, bot_name), + Ok(None) => { + warn!("No active bots found, using nil UUID"); + (Uuid::nil(), "default".to_string()) + } + Err(e) => { + error!("Failed to query default bot: {}", e); + (Uuid::nil(), "default".to_string()) + } + } +} pub struct BotOrchestrator { pub state: Arc, @@ -108,13 +130,11 @@ impl BotOrchestrator { pub async fn create_bot( &self, - bot_guid: &str, + bot_name: &str, ) -> Result<(), Box> { - let bucket_name = format!( - "{}{}.gbai", - self.state.config.as_ref().unwrap().drive.org_prefix, - bot_guid - ); + // TODO: Move logic to here after duplication refactor + + let bucket_name = format!("{}.gbai", bot_name); crate::create_bucket::create_bucket(&bucket_name)?; Ok(()) } @@ -273,8 +293,9 @@ impl BotOrchestrator { "Sending direct message to session {}: '{}'", session_id, content ); + let (bot_id, _) = get_default_bot(&mut self.state.conn.lock().unwrap()); let bot_response = BotResponse { - bot_id: "default_bot".to_string(), + bot_id: bot_id.to_string(), user_id: "default_user".to_string(), session_id: session_id.to_string(), channel: channel.to_string(), @@ -312,13 +333,15 @@ impl BotOrchestrator { "Changing context for session {} to {}", session_id, context_name ); - + let mut session_manager = self.state.session_manager.lock().await; - session_manager.update_session_context( - &Uuid::parse_str(session_id)?, - &Uuid::parse_str(user_id)?, - context_name.to_string() - ).await?; + session_manager + .update_session_context( + &Uuid::parse_str(session_id)?, + &Uuid::parse_str(user_id)?, + context_name.to_string(), + ) + .await?; // Send confirmation back to client let confirmation = BotResponse { @@ -444,25 +467,31 @@ impl BotOrchestrator { // Handle context change messages (type 4) first if message.message_type == 4 { if let Some(context_name) = &message.context_name { - return self.handle_context_change( - &message.user_id, - &message.bot_id, - &message.session_id, - &message.channel, - context_name - ).await; + return self + .handle_context_change( + &message.user_id, + &message.bot_id, + &message.session_id, + &message.channel, + context_name, + ) + .await; } } // Create regular response -let channel = message.channel.clone(); + let channel = message.channel.clone(); let config_manager = ConfigManager::new(Arc::clone(&self.state.conn)); let max_context_size = config_manager - .get_config(&Uuid::parse_str(&message.bot_id).unwrap_or_default(), "llm-server-ctx-size", None) + .get_config( + &Uuid::parse_str(&message.bot_id).unwrap_or_default(), + "llm-server-ctx-size", + None, + ) .unwrap_or_default() .parse::() .unwrap_or(0); - + let current_context_length = 0usize; let bot_response = BotResponse { @@ -480,14 +509,11 @@ let channel = message.channel.clone(); context_max_length: max_context_size, }; -if let Some(adapter) = self.state.channels.lock().unwrap().get(&channel) { - adapter.send_message(bot_response).await?; -} else { - warn!( - "No channel adapter found for message channel: {}", - channel - ); -} + if let Some(adapter) = self.state.channels.lock().unwrap().get(&channel) { + adapter.send_message(bot_response).await?; + } else { + warn!("No channel adapter found for message channel: {}", channel); + } Ok(()) } @@ -654,15 +680,6 @@ if let Some(adapter) = self.state.channels.lock().unwrap().get(&channel) { e })?; - let _bot_id = if let Ok(bot_guid) = std::env::var("BOT_GUID") { - Uuid::parse_str(&bot_guid).map_err(|e| { - warn!("Invalid BOT_GUID from env: {}", e); - e - })? - } else { - warn!("BOT_GUID not set in environment, using nil UUID"); - Uuid::nil() - }; let session = { let mut sm = self.state.session_manager.lock().await; @@ -859,11 +876,15 @@ if let Some(adapter) = self.state.channels.lock().unwrap().get(&channel) { let config_manager = ConfigManager::new(Arc::clone(&self.state.conn)); let max_context_size = config_manager - .get_config(&Uuid::parse_str(&message.bot_id).unwrap_or_default(), "llm-server-ctx-size", None) + .get_config( + &Uuid::parse_str(&message.bot_id).unwrap_or_default(), + "llm-server-ctx-size", + None, + ) .unwrap_or_default() .parse::() .unwrap_or(0); - + let current_context_length = 0usize; let final_msg = BotResponse { @@ -933,7 +954,6 @@ if let Some(adapter) = self.state.channels.lock().unwrap().get(&channel) { })? }; - let start_script_path = format!("./work/{}.gbai/{}.gbdialog/start.ast", bot_name, bot_name); let start_script = match std::fs::read_to_string(&start_script_path) { @@ -1086,32 +1106,10 @@ pub fn bot_from_url( } } - // Fall back to first available bot - match bots - .filter(is_active.eq(true)) - .select((id, name)) - .first::<(Uuid, String)>(db_conn) - .optional() - { - Ok(Some((first_bot_id, first_bot_name))) => { - log::info!( - "Using first available bot: {} ({})", - first_bot_id, - first_bot_name - ); - Ok((first_bot_id, first_bot_name)) - } - Ok(None) => { - error!("No active bots found in database"); - Err(HttpResponse::ServiceUnavailable() - .json(serde_json::json!({"error": "No bots available"}))) - } - Err(e) => { - error!("Failed to query bots: {}", e); - Err(HttpResponse::InternalServerError() - .json(serde_json::json!({"error": "Failed to query bots"}))) - } - } + // Fall back to default bot + let (bot_id, bot_name) = get_default_bot(db_conn); + log::info!("Using default bot: {} ({})", bot_id, bot_name); + Ok((bot_id, bot_name)) } impl Default for BotOrchestrator { @@ -1388,7 +1386,6 @@ async fn websocket_handler( Ok(res) } - #[actix_web::post("/api/warn")] async fn send_warning_handler( data: web::Data, diff --git a/src/config/mod.rs b/src/config/mod.rs index 033eaed3..9a804573 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -44,7 +44,6 @@ pub struct DriveConfig { pub access_key: String, pub secret_key: String, pub use_ssl: bool, - pub org_prefix: String, } #[derive(Clone)] @@ -196,7 +195,6 @@ impl AppConfig { access_key: get_str("DRIVE_ACCESSKEY", "minioadmin"), secret_key: get_str("DRIVE_SECRET", "minioadmin"), use_ssl: get_bool("DRIVE_USE_SSL", false), - org_prefix: get_str("DRIVE_ORG_PREFIX", "pragmatismo-"), }; let email = EmailConfig { @@ -276,10 +274,7 @@ impl AppConfig { use_ssl: std::env::var("DRIVE_USE_SSL") .unwrap_or_else(|_| "false".to_string()) .parse() - .unwrap_or(false), - org_prefix: std::env::var("DRIVE_ORG_PREFIX") - .unwrap_or_else(|_| "pragmatismo-".to_string()), - }; + .unwrap_or(false) }; let email = EmailConfig { from: std::env::var("EMAIL_FROM").unwrap_or_else(|_| "noreply@example.com".to_string()), @@ -370,7 +365,6 @@ fn write_drive_config_to_env(drive: &DriveConfig) -> std::io::Result<()> { writeln!(file, "DRIVE_ACCESSKEY={}", drive.access_key)?; writeln!(file, "DRIVE_SECRET={}", drive.secret_key)?; writeln!(file, "DRIVE_USE_SSL={}", drive.use_ssl)?; - writeln!(file, "DRIVE_ORG_PREFIX={}", drive.org_prefix)?; Ok(()) } diff --git a/src/drive_monitor/mod.rs b/src/drive_monitor/mod.rs index 9ec871db..5154e7d1 100644 --- a/src/drive_monitor/mod.rs +++ b/src/drive_monitor/mod.rs @@ -266,7 +266,7 @@ impl DriveMonitor { .await { Ok(head_res) => { - debug!( + trace!( "HeadObject successful for {}, metadata: {:?}", path, head_res ); @@ -283,10 +283,10 @@ impl DriveMonitor { ); let bytes = response.body.collect().await?.into_bytes(); - debug!("Collected {} bytes for {}", bytes.len(), path); + trace!("Collected {} bytes for {}", bytes.len(), path); let csv_content = String::from_utf8(bytes.to_vec()) .map_err(|e| format!("UTF-8 error in {}: {}", path, e))?; - debug!("Found {}: {} bytes", path, csv_content.len()); + trace!("Found {}: {} bytes", path, csv_content.len()); // Restart LLaMA servers only if llm- properties changed let llm_lines: Vec<_> = csv_content diff --git a/src/file/mod.rs b/src/file/mod.rs index fc886489..414b88a5 100644 --- a/src/file/mod.rs +++ b/src/file/mod.rs @@ -176,7 +176,6 @@ async fn create_s3_client( server: std::env::var("DRIVE_SERVER").expect("DRIVE_SERVER not set"), access_key: std::env::var("DRIVE_ACCESS_KEY").expect("DRIVE_ACCESS_KEY not set"), secret_key: std::env::var("DRIVE_SECRET_KEY").expect("DRIVE_SECRET_KEY not set"), - org_prefix: "".to_string(), use_ssl: false, }; Ok(init_drive(&config).await?)