From 122c839818ea437b376e1060011b290fc4b33068 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Sat, 1 Nov 2025 09:38:15 -0300 Subject: [PATCH] feat: add bot ID to DriveMonitor and simplify config sync - Added bot_id parameter to DriveMonitor::new() and pass it through BotOrchestrator - Modified DriveMonitor to store bot_id as a field - Simplified ConfigManager::sync_gbot_config() to accept content directly instead of file path - Removed file reading logic from sync_gbot_config - Cleaned up logging messages in config sync - Improved error handling and logging in DriveMonitor's gbot check These changes better associate monitoring with specific bots and make the config sync more flexible by accepting content directly rather than requiring a file path. --- src/bot/mod.rs | 6 ++- src/config/mod.rs | 11 ++--- src/drive_monitor/mod.rs | 86 ++++++++++++++++++++++------------------ 3 files changed, 54 insertions(+), 49 deletions(-) diff --git a/src/bot/mod.rs b/src/bot/mod.rs index 0aeb2f0e..6ccc610e 100644 --- a/src/bot/mod.rs +++ b/src/bot/mod.rs @@ -89,7 +89,8 @@ impl BotOrchestrator { } } - let drive_monitor = Arc::new(DriveMonitor::new(state.clone(), bucket_name)); + let bot_id = Uuid::parse_str(&bot_guid)?; + let drive_monitor = Arc::new(DriveMonitor::new(state.clone(), bucket_name, bot_id)); let _handle = drive_monitor.clone().spawn().await; @@ -136,7 +137,8 @@ impl BotOrchestrator { } } - let drive_monitor = Arc::new(DriveMonitor::new(self.state.clone(), bucket_name)); + let bot_id = Uuid::parse_str(&bot_guid)?; + let drive_monitor = Arc::new(DriveMonitor::new(self.state.clone(), bucket_name, bot_id)); let _handle = drive_monitor.clone().spawn().await; diff --git a/src/config/mod.rs b/src/config/mod.rs index 1bed62d4..fcec5d8d 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -446,13 +446,9 @@ impl ConfigManager { pub fn sync_gbot_config( &self, bot_id: &uuid::Uuid, - config_path: &str, + content: &str, ) -> Result { use sha2::{Digest, Sha256}; - use std::fs; - - let content = fs::read_to_string(config_path) - .map_err(|e| format!("Failed to read config file: {}", e))?; let mut hasher = Sha256::new(); hasher.update(content.as_bytes()); @@ -483,9 +479,8 @@ impl ConfigManager { } info!( - "Synced {} config values for bot {} from {}", - updated, bot_id, config_path - ); + "Synced {} config values for bot {}", + updated, bot_id); Ok(updated) } } diff --git a/src/drive_monitor/mod.rs b/src/drive_monitor/mod.rs index 3b4ab605..a1539005 100644 --- a/src/drive_monitor/mod.rs +++ b/src/drive_monitor/mod.rs @@ -22,14 +22,16 @@ pub struct DriveMonitor { state: Arc, bucket_name: String, file_states: Arc>>, + bot_id: uuid::Uuid, } impl DriveMonitor { - pub fn new(state: Arc, bucket_name: String) -> Self { + pub fn new(state: Arc, bucket_name: String, bot_id: uuid::Uuid) -> Self { Self { state, bucket_name, file_states: Arc::new(tokio::sync::RwLock::new(HashMap::new())), + bot_id, } } @@ -171,8 +173,6 @@ impl DriveMonitor { continue; } - - if path.ends_with('/') { continue; } @@ -231,11 +231,7 @@ impl DriveMonitor { Ok(()) } - async fn check_gbot( - &self, - client: &Client, - ) -> Result<(), Box> { - let prefix = ".gbot/"; + async fn check_gbot(&self, client: &Client) -> Result<(), Box> { let mut continuation_token = None; loop { @@ -249,11 +245,11 @@ impl DriveMonitor { for obj in list_objects.contents.unwrap_or_default() { let path = obj.key().unwrap_or_default().to_string(); let path_parts: Vec<&str> = path.split('/').collect(); - + if path_parts.len() < 2 || !path_parts[0].ends_with(".gbot") { continue; } - + if !path.ends_with("config.csv") { continue; } @@ -267,14 +263,21 @@ impl DriveMonitor { .await { Ok(head_res) => { - debug!("HeadObject successful for {}, metadata: {:?}", path, head_res); + debug!( + "HeadObject successful for {}, metadata: {:?}", + path, head_res + ); let response = client .get_object() .bucket(&self.bucket_name) .key(&path) .send() .await?; - debug!("GetObject successful for {}, content length: {}", path, response.content_length().unwrap_or(0)); + debug!( + "GetObject successful for {}, content length: {}", + path, + response.content_length().unwrap_or(0) + ); let bytes = response.body.collect().await?.into_bytes(); debug!("Collected {} bytes for {}", bytes.len(), path); @@ -282,20 +285,15 @@ impl DriveMonitor { .map_err(|e| format!("UTF-8 error in {}: {}", path, e))?; debug!("Found {}: {} bytes", path, csv_content.len()); - // Parse config.csv and update bot configuration - let bot_id = path.split('/') - .nth(1) - .and_then(|s| s.strip_suffix(".gbot")) - .and_then(|s| uuid::Uuid::parse_str(s).ok()) - .unwrap_or(uuid::Uuid::nil()); - - if bot_id != uuid::Uuid::nil() { - let config_manager = ConfigManager::new(Arc::clone(&self.state.conn)); - if let Err(e) = config_manager.sync_gbot_config(&bot_id, &csv_content) { - error!("Failed to sync config for bot {} {}: {}", path, bot_id, e); - } else { - info!("Successfully synced config for bot {}", bot_id); - } + let config_manager = ConfigManager::new(Arc::clone(&self.state.conn)); + if let Err(e) = config_manager.sync_gbot_config(&self.bot_id, &csv_content) + { + error!( + "Failed to sync config for bot {} {}: {}", + path, self.bot_id, e + ); + } else { + info!("Successfully synced config for bot {}", self.bot_id); } } Err(e) => { @@ -318,24 +316,34 @@ impl DriveMonitor { client: &Client, file_path: &str, ) -> Result<(), Box> { - debug!("Fetching object from S3: bucket={}, key={}", &self.bucket_name, file_path); + debug!( + "Fetching object from S3: bucket={}, key={}", + &self.bucket_name, file_path + ); let response = match client .get_object() .bucket(&self.bucket_name) .key(file_path) .send() - .await { - Ok(res) => { - debug!("Successfully fetched object from S3: bucket={}, key={}, size={}", - &self.bucket_name, file_path, res.content_length().unwrap_or(0)); - res - } - Err(e) => { - error!("Failed to fetch object from S3: bucket={}, key={}, error={:?}", - &self.bucket_name, file_path, e); - return Err(e.into()); - } - }; + .await + { + Ok(res) => { + debug!( + "Successfully fetched object from S3: bucket={}, key={}, size={}", + &self.bucket_name, + file_path, + res.content_length().unwrap_or(0) + ); + res + } + Err(e) => { + error!( + "Failed to fetch object from S3: bucket={}, key={}, error={:?}", + &self.bucket_name, file_path, e + ); + return Err(e.into()); + } + }; let bytes = response.body.collect().await?.into_bytes(); let source_content = String::from_utf8(bytes.to_vec())?;