diff --git a/src/bot/mod.rs b/src/bot/mod.rs index 0aeb2f0e..6ccc610e 100644 --- a/src/bot/mod.rs +++ b/src/bot/mod.rs @@ -89,7 +89,8 @@ impl BotOrchestrator { } } - let drive_monitor = Arc::new(DriveMonitor::new(state.clone(), bucket_name)); + let bot_id = Uuid::parse_str(&bot_guid)?; + let drive_monitor = Arc::new(DriveMonitor::new(state.clone(), bucket_name, bot_id)); let _handle = drive_monitor.clone().spawn().await; @@ -136,7 +137,8 @@ impl BotOrchestrator { } } - let drive_monitor = Arc::new(DriveMonitor::new(self.state.clone(), bucket_name)); + let bot_id = Uuid::parse_str(&bot_guid)?; + let drive_monitor = Arc::new(DriveMonitor::new(self.state.clone(), bucket_name, bot_id)); let _handle = drive_monitor.clone().spawn().await; diff --git a/src/config/mod.rs b/src/config/mod.rs index 1bed62d4..fcec5d8d 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -446,13 +446,9 @@ impl ConfigManager { pub fn sync_gbot_config( &self, bot_id: &uuid::Uuid, - config_path: &str, + content: &str, ) -> Result { use sha2::{Digest, Sha256}; - use std::fs; - - let content = fs::read_to_string(config_path) - .map_err(|e| format!("Failed to read config file: {}", e))?; let mut hasher = Sha256::new(); hasher.update(content.as_bytes()); @@ -483,9 +479,8 @@ impl ConfigManager { } info!( - "Synced {} config values for bot {} from {}", - updated, bot_id, config_path - ); + "Synced {} config values for bot {}", + updated, bot_id); Ok(updated) } } diff --git a/src/drive_monitor/mod.rs b/src/drive_monitor/mod.rs index 3b4ab605..a1539005 100644 --- a/src/drive_monitor/mod.rs +++ b/src/drive_monitor/mod.rs @@ -22,14 +22,16 @@ pub struct DriveMonitor { state: Arc, bucket_name: String, file_states: Arc>>, + bot_id: uuid::Uuid, } impl DriveMonitor { - pub fn new(state: Arc, bucket_name: String) -> Self { + pub fn new(state: Arc, bucket_name: String, bot_id: uuid::Uuid) -> Self { Self { state, bucket_name, file_states: Arc::new(tokio::sync::RwLock::new(HashMap::new())), + bot_id, } } @@ -171,8 +173,6 @@ impl DriveMonitor { continue; } - - if path.ends_with('/') { continue; } @@ -231,11 +231,7 @@ impl DriveMonitor { Ok(()) } - async fn check_gbot( - &self, - client: &Client, - ) -> Result<(), Box> { - let prefix = ".gbot/"; + async fn check_gbot(&self, client: &Client) -> Result<(), Box> { let mut continuation_token = None; loop { @@ -249,11 +245,11 @@ impl DriveMonitor { for obj in list_objects.contents.unwrap_or_default() { let path = obj.key().unwrap_or_default().to_string(); let path_parts: Vec<&str> = path.split('/').collect(); - + if path_parts.len() < 2 || !path_parts[0].ends_with(".gbot") { continue; } - + if !path.ends_with("config.csv") { continue; } @@ -267,14 +263,21 @@ impl DriveMonitor { .await { Ok(head_res) => { - debug!("HeadObject successful for {}, metadata: {:?}", path, head_res); + debug!( + "HeadObject successful for {}, metadata: {:?}", + path, head_res + ); let response = client .get_object() .bucket(&self.bucket_name) .key(&path) .send() .await?; - debug!("GetObject successful for {}, content length: {}", path, response.content_length().unwrap_or(0)); + debug!( + "GetObject successful for {}, content length: {}", + path, + response.content_length().unwrap_or(0) + ); let bytes = response.body.collect().await?.into_bytes(); debug!("Collected {} bytes for {}", bytes.len(), path); @@ -282,20 +285,15 @@ impl DriveMonitor { .map_err(|e| format!("UTF-8 error in {}: {}", path, e))?; debug!("Found {}: {} bytes", path, csv_content.len()); - // Parse config.csv and update bot configuration - let bot_id = path.split('/') - .nth(1) - .and_then(|s| s.strip_suffix(".gbot")) - .and_then(|s| uuid::Uuid::parse_str(s).ok()) - .unwrap_or(uuid::Uuid::nil()); - - if bot_id != uuid::Uuid::nil() { - let config_manager = ConfigManager::new(Arc::clone(&self.state.conn)); - if let Err(e) = config_manager.sync_gbot_config(&bot_id, &csv_content) { - error!("Failed to sync config for bot {} {}: {}", path, bot_id, e); - } else { - info!("Successfully synced config for bot {}", bot_id); - } + let config_manager = ConfigManager::new(Arc::clone(&self.state.conn)); + if let Err(e) = config_manager.sync_gbot_config(&self.bot_id, &csv_content) + { + error!( + "Failed to sync config for bot {} {}: {}", + path, self.bot_id, e + ); + } else { + info!("Successfully synced config for bot {}", self.bot_id); } } Err(e) => { @@ -318,24 +316,34 @@ impl DriveMonitor { client: &Client, file_path: &str, ) -> Result<(), Box> { - debug!("Fetching object from S3: bucket={}, key={}", &self.bucket_name, file_path); + debug!( + "Fetching object from S3: bucket={}, key={}", + &self.bucket_name, file_path + ); let response = match client .get_object() .bucket(&self.bucket_name) .key(file_path) .send() - .await { - Ok(res) => { - debug!("Successfully fetched object from S3: bucket={}, key={}, size={}", - &self.bucket_name, file_path, res.content_length().unwrap_or(0)); - res - } - Err(e) => { - error!("Failed to fetch object from S3: bucket={}, key={}, error={:?}", - &self.bucket_name, file_path, e); - return Err(e.into()); - } - }; + .await + { + Ok(res) => { + debug!( + "Successfully fetched object from S3: bucket={}, key={}, size={}", + &self.bucket_name, + file_path, + res.content_length().unwrap_or(0) + ); + res + } + Err(e) => { + error!( + "Failed to fetch object from S3: bucket={}, key={}, error={:?}", + &self.bucket_name, file_path, e + ); + return Err(e.into()); + } + }; let bytes = response.body.collect().await?.into_bytes(); let source_content = String::from_utf8(bytes.to_vec())?;