feat: add bot ID to DriveMonitor and simplify config sync

- Added bot_id parameter to DriveMonitor::new() and pass it through BotOrchestrator
- Modified DriveMonitor to store bot_id as a field
- Simplified ConfigManager::sync_gbot_config() to accept content directly instead of file path
- Removed file reading logic from sync_gbot_config
- Cleaned up logging messages in config sync
- Improved error handling and logging in DriveMonitor's gbot check

These changes better associate monitoring with specific bots and make the config sync more flexible by accepting content directly rather than requiring a file path.
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-11-01 09:38:15 -03:00
parent 96c4283d29
commit 122c839818
3 changed files with 54 additions and 49 deletions

View file

@ -89,7 +89,8 @@ impl BotOrchestrator {
} }
} }
let drive_monitor = Arc::new(DriveMonitor::new(state.clone(), bucket_name)); let bot_id = Uuid::parse_str(&bot_guid)?;
let drive_monitor = Arc::new(DriveMonitor::new(state.clone(), bucket_name, bot_id));
let _handle = drive_monitor.clone().spawn().await; let _handle = drive_monitor.clone().spawn().await;
@ -136,7 +137,8 @@ impl BotOrchestrator {
} }
} }
let drive_monitor = Arc::new(DriveMonitor::new(self.state.clone(), bucket_name)); let bot_id = Uuid::parse_str(&bot_guid)?;
let drive_monitor = Arc::new(DriveMonitor::new(self.state.clone(), bucket_name, bot_id));
let _handle = drive_monitor.clone().spawn().await; let _handle = drive_monitor.clone().spawn().await;

View file

@ -446,13 +446,9 @@ impl ConfigManager {
pub fn sync_gbot_config( pub fn sync_gbot_config(
&self, &self,
bot_id: &uuid::Uuid, bot_id: &uuid::Uuid,
config_path: &str, content: &str,
) -> Result<usize, String> { ) -> Result<usize, String> {
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::fs;
let content = fs::read_to_string(config_path)
.map_err(|e| format!("Failed to read config file: {}", e))?;
let mut hasher = Sha256::new(); let mut hasher = Sha256::new();
hasher.update(content.as_bytes()); hasher.update(content.as_bytes());
@ -483,9 +479,8 @@ impl ConfigManager {
} }
info!( info!(
"Synced {} config values for bot {} from {}", "Synced {} config values for bot {}",
updated, bot_id, config_path updated, bot_id);
);
Ok(updated) Ok(updated)
} }
} }

View file

@ -22,14 +22,16 @@ pub struct DriveMonitor {
state: Arc<AppState>, state: Arc<AppState>,
bucket_name: String, bucket_name: String,
file_states: Arc<tokio::sync::RwLock<HashMap<String, FileState>>>, file_states: Arc<tokio::sync::RwLock<HashMap<String, FileState>>>,
bot_id: uuid::Uuid,
} }
impl DriveMonitor { impl DriveMonitor {
pub fn new(state: Arc<AppState>, bucket_name: String) -> Self { pub fn new(state: Arc<AppState>, bucket_name: String, bot_id: uuid::Uuid) -> Self {
Self { Self {
state, state,
bucket_name, bucket_name,
file_states: Arc::new(tokio::sync::RwLock::new(HashMap::new())), file_states: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
bot_id,
} }
} }
@ -171,8 +173,6 @@ impl DriveMonitor {
continue; continue;
} }
if path.ends_with('/') { if path.ends_with('/') {
continue; continue;
} }
@ -231,11 +231,7 @@ impl DriveMonitor {
Ok(()) Ok(())
} }
async fn check_gbot( async fn check_gbot(&self, client: &Client) -> Result<(), Box<dyn Error + Send + Sync>> {
&self,
client: &Client,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let prefix = ".gbot/";
let mut continuation_token = None; let mut continuation_token = None;
loop { loop {
@ -267,14 +263,21 @@ impl DriveMonitor {
.await .await
{ {
Ok(head_res) => { Ok(head_res) => {
debug!("HeadObject successful for {}, metadata: {:?}", path, head_res); debug!(
"HeadObject successful for {}, metadata: {:?}",
path, head_res
);
let response = client let response = client
.get_object() .get_object()
.bucket(&self.bucket_name) .bucket(&self.bucket_name)
.key(&path) .key(&path)
.send() .send()
.await?; .await?;
debug!("GetObject successful for {}, content length: {}", path, response.content_length().unwrap_or(0)); debug!(
"GetObject successful for {}, content length: {}",
path,
response.content_length().unwrap_or(0)
);
let bytes = response.body.collect().await?.into_bytes(); let bytes = response.body.collect().await?.into_bytes();
debug!("Collected {} bytes for {}", bytes.len(), path); debug!("Collected {} bytes for {}", bytes.len(), path);
@ -282,20 +285,15 @@ impl DriveMonitor {
.map_err(|e| format!("UTF-8 error in {}: {}", path, e))?; .map_err(|e| format!("UTF-8 error in {}: {}", path, e))?;
debug!("Found {}: {} bytes", path, csv_content.len()); debug!("Found {}: {} bytes", path, csv_content.len());
// Parse config.csv and update bot configuration let config_manager = ConfigManager::new(Arc::clone(&self.state.conn));
let bot_id = path.split('/') if let Err(e) = config_manager.sync_gbot_config(&self.bot_id, &csv_content)
.nth(1) {
.and_then(|s| s.strip_suffix(".gbot")) error!(
.and_then(|s| uuid::Uuid::parse_str(s).ok()) "Failed to sync config for bot {} {}: {}",
.unwrap_or(uuid::Uuid::nil()); path, self.bot_id, e
);
if bot_id != uuid::Uuid::nil() { } else {
let config_manager = ConfigManager::new(Arc::clone(&self.state.conn)); info!("Successfully synced config for bot {}", self.bot_id);
if let Err(e) = config_manager.sync_gbot_config(&bot_id, &csv_content) {
error!("Failed to sync config for bot {} {}: {}", path, bot_id, e);
} else {
info!("Successfully synced config for bot {}", bot_id);
}
} }
} }
Err(e) => { Err(e) => {
@ -318,24 +316,34 @@ impl DriveMonitor {
client: &Client, client: &Client,
file_path: &str, file_path: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> { ) -> Result<(), Box<dyn Error + Send + Sync>> {
debug!("Fetching object from S3: bucket={}, key={}", &self.bucket_name, file_path); debug!(
"Fetching object from S3: bucket={}, key={}",
&self.bucket_name, file_path
);
let response = match client let response = match client
.get_object() .get_object()
.bucket(&self.bucket_name) .bucket(&self.bucket_name)
.key(file_path) .key(file_path)
.send() .send()
.await { .await
Ok(res) => { {
debug!("Successfully fetched object from S3: bucket={}, key={}, size={}", Ok(res) => {
&self.bucket_name, file_path, res.content_length().unwrap_or(0)); debug!(
res "Successfully fetched object from S3: bucket={}, key={}, size={}",
} &self.bucket_name,
Err(e) => { file_path,
error!("Failed to fetch object from S3: bucket={}, key={}, error={:?}", res.content_length().unwrap_or(0)
&self.bucket_name, file_path, e); );
return Err(e.into()); res
} }
}; Err(e) => {
error!(
"Failed to fetch object from S3: bucket={}, key={}, error={:?}",
&self.bucket_name, file_path, e
);
return Err(e.into());
}
};
let bytes = response.body.collect().await?.into_bytes(); let bytes = response.body.collect().await?.into_bytes();
let source_content = String::from_utf8(bytes.to_vec())?; let source_content = String::from_utf8(bytes.to_vec())?;