use crate::basic::compiler::BasicCompiler;
use crate::config::ConfigManager;
use crate::core::kb::KnowledgeBaseManager;
use crate::shared::message_types::MessageType;
use crate::shared::state::AppState;
use aws_sdk_s3::Client;
use log::{debug, error, info};
use std::collections::HashMap;
use std::error::Error;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::time::{interval, Duration};

/// Last-seen state of an object in the Drive bucket, used to detect changes.
#[derive(Debug, Clone)]
pub struct FileState {
    pub etag: String,
}

/// Polls an S3-compatible Drive bucket and reacts to changes in a bot's
/// `.gbdialog` (BASIC tools), `.gbot` (configuration) and `.gbkb`
/// (knowledge base) folders.
#[derive(Debug, Clone)]
pub struct DriveMonitor {
    state: Arc<AppState>,
    bucket_name: String,
    /// Object key -> last observed ETag.
    file_states: Arc<tokio::sync::RwLock<HashMap<String, FileState>>>,
    bot_id: uuid::Uuid,
    kb_manager: Arc<KnowledgeBaseManager>,
    work_root: PathBuf,
    /// Doubles as a "monitor is running" flag (`start_monitoring` /
    /// `stop_monitoring`) and a re-entrancy guard (`spawn`).
    is_processing: Arc<AtomicBool>,
}

impl DriveMonitor {
    pub fn new(state: Arc<AppState>, bucket_name: String, bot_id: uuid::Uuid) -> Self {
        let work_root = PathBuf::from("work");
        let kb_manager = Arc::new(KnowledgeBaseManager::new(work_root.clone()));
        Self {
            state,
            bucket_name,
            file_states: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
            bot_id,
            kb_manager,
            work_root,
            is_processing: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Runs one synchronization pass immediately, then polls every 30 seconds
    /// until `stop_monitoring` clears the flag.
    pub async fn start_monitoring(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
        info!("Starting DriveMonitor for bot {}", self.bot_id);
        self.is_processing.store(true, Ordering::SeqCst);
        self.check_for_changes().await?;

        let self_clone = Arc::new(self.clone());
        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(30));
            while self_clone.is_processing.load(Ordering::SeqCst) {
                interval.tick().await;
                if let Err(e) = self_clone.check_for_changes().await {
                    error!("Error during sync for bot {}: {}", self_clone.bot_id, e);
                }
            }
        });

        info!("DriveMonitor started for bot {}", self.bot_id);
        Ok(())
    }

    pub async fn stop_monitoring(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
        info!("Stopping DriveMonitor for bot {}", self.bot_id);
        self.is_processing.store(false, Ordering::SeqCst);
        self.file_states.write().await.clear();
        info!("DriveMonitor stopped for bot {}", self.bot_id);
        Ok(())
    }

    /// Alternative entry point: spawns a detached task that polls every
    /// 90 seconds, skipping a tick if the previous pass is still running.
    pub fn spawn(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            info!(
                "Drive Monitor service started for bucket: {}",
                self.bucket_name
            );
            let mut tick = interval(Duration::from_secs(90));
            loop {
                tick.tick().await;
                if self.is_processing.load(Ordering::Acquire) {
                    log::warn!(
                        "Drive monitor is still processing previous changes, skipping this tick"
                    );
                    continue;
                }
                self.is_processing.store(true, Ordering::Release);
                if let Err(e) = self.check_for_changes().await {
                    log::error!("Error checking for drive changes: {}", e);
                }
                self.is_processing.store(false, Ordering::Release);
            }
        })
    }

    async fn check_for_changes(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
        // No Drive client configured; nothing to do.
        let Some(client) = &self.state.drive else {
            return Ok(());
        };
        self.check_gbdialog_changes(client).await?;
        self.check_gbot(client).await?;
        self.check_gbkb_changes(client).await?;
        Ok(())
    }

    /// Detects new or modified `.bas` sources under the bot's `.gbdialog`
    /// folder and recompiles them.
    async fn check_gbdialog_changes(
        &self,
        client: &Client,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        let mut current_files = HashMap::new();
        let mut continuation_token = None;

        loop {
            let list_objects = match tokio::time::timeout(
                Duration::from_secs(30),
                client
                    .list_objects_v2()
                    .bucket(self.bucket_name.to_lowercase())
                    .set_continuation_token(continuation_token)
                    .send(),
            )
            .await
            {
                Ok(Ok(list)) => list,
                Ok(Err(e)) => return Err(e.into()),
                Err(_) => {
                    log::error!("Timeout listing objects in bucket {}", self.bucket_name);
                    return Ok(());
                }
            };

            for obj in list_objects.contents.unwrap_or_default() {
                let path = obj.key().unwrap_or_default().to_string();
                let path_parts: Vec<&str> = path.split('/').collect();
                // Only `.bas` files inside a `*.gbdialog/` folder are tools.
                if path_parts.len() < 2 || !path_parts[0].ends_with(".gbdialog") {
                    continue;
                }
                if path.ends_with('/') || !path.to_ascii_lowercase().ends_with(".bas") {
                    continue;
                }
                let file_state = FileState {
                    etag: obj.e_tag().unwrap_or_default().to_string(),
                };
                current_files.insert(path, file_state);
            }

            if !list_objects.is_truncated.unwrap_or(false) {
                break;
            }
            continuation_token = list_objects.next_continuation_token;
        }

        let mut file_states = self.file_states.write().await;

        // Recompile anything that is new or whose ETag changed.
        for (path, current_state) in current_files.iter() {
            if let Some(previous_state) = file_states.get(path) {
                if current_state.etag != previous_state.etag {
                    if let Err(e) = self.compile_tool(client, path).await {
                        log::error!("Failed to compile tool {}: {}", path, e);
                    }
                }
            } else if let Err(e) = self.compile_tool(client, path).await {
                log::error!("Failed to compile tool {}: {}", path, e);
            }
        }

        // Drop state entries for `.gbdialog` files that no longer exist in
        // the bucket. Match on the first path segment so the filter agrees
        // with the detection logic above.
        let previous_paths: Vec<String> = file_states
            .keys()
            .filter(|k| {
                k.split('/')
                    .next()
                    .map_or(false, |seg| seg.ends_with(".gbdialog"))
            })
            .cloned()
            .collect();
        for path in previous_paths {
            if !current_files.contains_key(&path) {
                file_states.remove(&path);
            }
        }

        for (path, state) in current_files {
            file_states.insert(path, state);
        }
        Ok(())
    }

    /// Watches the bot's `config.csv` inside the `.gbot` folder, syncs it to
    /// the database and restarts local LLM servers when `llm-*` keys change.
    async fn check_gbot(&self, client: &Client) -> Result<(), Box<dyn Error + Send + Sync>> {
        let config_manager = ConfigManager::new(self.state.conn.clone());
        let mut continuation_token = None;

        loop {
            let list_objects = match tokio::time::timeout(
                Duration::from_secs(30),
                client
                    .list_objects_v2()
                    .bucket(self.bucket_name.to_lowercase())
                    .set_continuation_token(continuation_token)
                    .send(),
            )
            .await
            {
                Ok(Ok(list)) => list,
                Ok(Err(e)) => return Err(e.into()),
                Err(_) => {
                    log::error!("Timeout listing objects in bucket {}", self.bucket_name);
                    return Ok(());
                }
            };

            for obj in list_objects.contents.unwrap_or_default() {
                let path = obj.key().unwrap_or_default().to_string();
                let path_parts: Vec<&str> = path.split('/').collect();
                if path_parts.len() < 2 || !path_parts[0].ends_with(".gbot") {
                    continue;
                }
                let path_lower = path.to_ascii_lowercase();
                if !path_lower.ends_with("config.csv") {
                    continue;
                }

                match client
                    .head_object()
                    .bucket(&self.bucket_name)
                    .key(&path)
                    .send()
                    .await
                {
                    Ok(_head_res) => {
                        let response = client
                            .get_object()
                            .bucket(&self.bucket_name)
                            .key(&path)
                            .send()
                            .await?;
                        let bytes = response.body.collect().await?.into_bytes();
                        let csv_content = String::from_utf8(bytes.to_vec())
                            .map_err(|e| format!("UTF-8 error in {}: {}", path, e))?;

                        let llm_lines: Vec<_> = csv_content
                            .lines()
                            .filter(|line| line.trim_start().starts_with("llm-"))
                            .collect();

                        if llm_lines.is_empty() {
                            let _ = config_manager.sync_gbot_config(&self.bot_id, &csv_content);
                        } else {
                            use crate::llm::local::ensure_llama_servers_running;

                            // Restart local LLM servers only if an `llm-*`
                            // value is new or actually changed.
                            let mut restart_needed = false;
                            for line in llm_lines {
                                let parts: Vec<&str> = line.split(',').collect();
                                if parts.len() >= 2 {
                                    let key = parts[0].trim();
                                    let new_value = parts[1].trim();
                                    match config_manager.get_config(&self.bot_id, key, None) {
                                        Ok(old_value) => {
                                            if old_value != new_value {
                                                info!(
                                                    "Detected change in {} (old: {}, new: {})",
                                                    key, old_value, new_value
                                                );
                                                restart_needed = true;
                                            }
                                        }
                                        Err(_) => {
                                            restart_needed = true;
                                        }
                                    }
                                }
                            }

                            let _ = config_manager.sync_gbot_config(&self.bot_id, &csv_content);

                            if restart_needed {
                                if let Err(e) =
                                    ensure_llama_servers_running(Arc::clone(&self.state)).await
                                {
                                    log::error!(
                                        "Failed to restart LLaMA servers after llm- config change: {}",
                                        e
                                    );
                                }
                            }
                        }

                        if csv_content.lines().any(|line| line.starts_with("theme-")) {
                            self.broadcast_theme_change(&csv_content).await?;
                        }
                    }
                    Err(e) => {
                        log::error!("Config file {} not found or inaccessible: {}", path, e);
                    }
                }
            }

            if !list_objects.is_truncated.unwrap_or(false) {
                break;
            }
            continuation_token = list_objects.next_continuation_token;
        }
        Ok(())
    }

    /// Pushes a `change_theme` event to every connected web session when
    /// `theme-*` keys appear in `config.csv`.
    async fn broadcast_theme_change(
        &self,
        csv_content: &str,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        let mut theme_data = serde_json::json!({
            "event": "change_theme",
            "data": {}
        });

        for line in csv_content.lines() {
            let parts: Vec<&str> = line.split(',').collect();
            if parts.len() >= 2 {
                let key = parts[0].trim();
                let value = parts[1].trim();
                match key {
                    "theme-color1" => {
                        theme_data["data"]["color1"] =
                            serde_json::Value::String(value.to_string());
                    }
                    "theme-color2" => {
                        theme_data["data"]["color2"] =
                            serde_json::Value::String(value.to_string());
                    }
                    "theme-logo" => {
                        theme_data["data"]["logo_url"] =
                            serde_json::Value::String(value.to_string());
                    }
                    "theme-title" => {
                        theme_data["data"]["title"] =
                            serde_json::Value::String(value.to_string());
                    }
                    "theme-logo-text" => {
                        theme_data["data"]["logo_text"] =
                            serde_json::Value::String(value.to_string());
                    }
                    _ => {}
                }
            }
        }

        let response_channels = self.state.response_channels.lock().await;
        for (session_id, tx) in response_channels.iter() {
            let theme_response = crate::shared::models::BotResponse {
                bot_id: self.bot_id.to_string(),
                user_id: "system".to_string(),
                session_id: session_id.clone(),
                channel: "web".to_string(),
                content: serde_json::to_string(&theme_data)?,
                message_type: MessageType::BOT_RESPONSE,
                stream_token: None,
                is_complete: true,
                suggestions: Vec::new(),
                context_name: None,
                context_length: 0,
                context_max_length: 0,
            };
            // Best effort: drop the event if the session's channel is full.
            let _ = tx.try_send(theme_response);
        }
        drop(response_channels);
        Ok(())
    }

    /// Downloads a `.bas` source from Drive and compiles it into a tool on a
    /// blocking thread.
    async fn compile_tool(
        &self,
        client: &Client,
        file_path: &str,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        info!(
            "Fetching object from Drive: bucket={}, key={}",
            &self.bucket_name, file_path
        );
        let response = match client
            .get_object()
            .bucket(&self.bucket_name)
            .key(file_path)
            .send()
            .await
        {
            Ok(res) => {
                info!(
                    "Successfully fetched object from Drive: bucket={}, key={}, size={}",
                    &self.bucket_name,
                    file_path,
                    res.content_length().unwrap_or(0)
                );
                res
            }
            Err(e) => {
                log::error!(
                    "Failed to fetch object from Drive: bucket={}, key={}, error={:?}",
                    &self.bucket_name,
                    file_path,
                    e
                );
                return Err(e.into());
            }
        };

        let bytes = response.body.collect().await?.into_bytes();
        let source_content = String::from_utf8(bytes.to_vec())?;

        // Tool name is the file name without its `.bas` extension.
        let file_name = file_path.rsplit('/').next().unwrap_or(file_path);
        let tool_name = file_name
            .strip_suffix(".bas")
            .unwrap_or(file_name)
            .to_string();
        let bot_name = self
            .bucket_name
            .strip_suffix(".gbai")
            .unwrap_or(&self.bucket_name);
        let work_dir = format!("./work/{}.gbai/{}.gbdialog", bot_name, bot_name);

        let state_clone = Arc::clone(&self.state);
        let work_dir_clone = work_dir.clone();
        let tool_name_clone = tool_name.clone();
        let source_content_clone = source_content.clone();
        let bot_id = self.bot_id;

        // Compilation does synchronous file I/O, so run it off the async
        // runtime.
        tokio::task::spawn_blocking(move || {
            std::fs::create_dir_all(&work_dir_clone)?;
            let local_source_path = format!("{}/{}.bas", work_dir_clone, tool_name_clone);
            std::fs::write(&local_source_path, &source_content_clone)?;

            let mut compiler = BasicCompiler::new(state_clone, bot_id);
            let result = compiler.compile_file(&local_source_path, &work_dir_clone)?;
            if let Some(mcp_tool) = result.mcp_tool {
                info!(
                    "MCP tool definition generated with {} parameters",
                    mcp_tool.input_schema.properties.len()
                );
            }
            Ok::<(), Box<dyn Error + Send + Sync>>(())
        })
        .await??;

        Ok(())
    }

    /// Mirrors the bot's `.gbkb` folder into the local work directory and
    /// triggers knowledge-base (re)indexing for changed folders.
    async fn check_gbkb_changes(
        &self,
        client: &Client,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        let bot_name = self
            .bucket_name
            .strip_suffix(".gbai")
            .unwrap_or(&self.bucket_name);
        let gbkb_prefix = format!("{}.gbkb/", bot_name);

        let mut current_files = HashMap::new();
        let mut continuation_token = None;
        let mut files_processed = 0;
        let mut files_to_process = Vec::new();
        let mut pdf_files_found = 0;

        loop {
            let list_objects = match tokio::time::timeout(
                Duration::from_secs(30),
                client
                    .list_objects_v2()
                    .bucket(self.bucket_name.to_lowercase())
                    .prefix(&gbkb_prefix)
                    .set_continuation_token(continuation_token)
                    .send(),
            )
            .await
            {
                Ok(Ok(list)) => list,
                Ok(Err(e)) => return Err(e.into()),
                Err(_) => {
                    log::error!(
                        "Timeout listing .gbkb objects in bucket {}",
                        self.bucket_name
                    );
                    return Ok(());
                }
            };

            for obj in list_objects.contents.unwrap_or_default() {
                let path = obj.key().unwrap_or_default().to_string();
                if path.ends_with('/') {
                    continue;
                }
                let file_state = FileState {
                    etag: obj.e_tag().unwrap_or_default().to_string(),
                };
                current_files.insert(path.clone(), file_state);
            }

            if !list_objects.is_truncated.unwrap_or(false) {
                break;
            }
            continuation_token = list_objects.next_continuation_token;
        }

        let mut file_states = self.file_states.write().await;

        for (path, current_state) in current_files.iter() {
            let is_new = !file_states.contains_key(path);
            let is_modified = file_states
                .get(path)
                .map(|prev| prev.etag != current_state.etag)
                .unwrap_or(false);

            if is_new || is_modified {
                if path.to_lowercase().ends_with(".pdf") {
                    pdf_files_found += 1;
                    info!(
                        "Detected {} PDF in .gbkb: {} (will extract text for vectordb)",
                        if is_new { "new" } else { "changed" },
                        path
                    );
                } else {
                    info!(
                        "Detected {} in .gbkb: {}",
                        if is_new { "new file" } else { "change" },
                        path
                    );
                }

                files_to_process.push(path.clone());
                files_processed += 1;

                // Download in batches of 10 with a short pause to avoid
                // hammering the Drive API; any remainder is drained after
                // the loop.
                if files_to_process.len() >= 10 {
                    for file_path in std::mem::take(&mut files_to_process) {
                        if let Err(e) = self.download_gbkb_file(client, &file_path).await {
                            log::error!("Failed to download .gbkb file {}: {}", file_path, e);
                        }
                    }
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }

                // Trigger (re)indexing for the KB folder this file belongs to.
                let path_parts: Vec<&str> = path.split('/').collect();
                if path_parts.len() >= 2 {
                    let kb_name = path_parts[1];
                    let kb_folder_path = self
                        .work_root
                        .join(bot_name)
                        .join(&gbkb_prefix)
                        .join(kb_name);
                    info!(
                        "Triggering KB indexing for folder: {} (PDF text extraction enabled)",
                        kb_folder_path.display()
                    );
                    match self
                        .kb_manager
                        .handle_gbkb_change(bot_name, &kb_folder_path)
                        .await
                    {
                        Ok(_) => {
                            debug!(
                                "Successfully processed KB change for {}/{}",
                                bot_name, kb_name
                            );
                        }
                        Err(e) => {
                            log::error!(
                                "Failed to process .gbkb change for {}/{}: {}",
                                bot_name, kb_name, e
                            );
                        }
                    }
                }
            }
        }

        let paths_to_remove: Vec<String> = file_states
            .keys()
            .filter(|path| path.starts_with(&gbkb_prefix) && !current_files.contains_key(*path))
            .cloned()
            .collect();

        // Drain any files left over from the last (partial) batch.
        for file_path in files_to_process {
            if let Err(e) = self.download_gbkb_file(client, &file_path).await {
                log::error!("Failed to download .gbkb file {}: {}", file_path, e);
            }
        }

        if files_processed > 0 {
            info!(
                "Processed {} .gbkb files (including {} PDFs for text extraction)",
                files_processed, pdf_files_found
            );
        }

        for (path, state) in current_files {
            file_states.insert(path, state);
        }

        for path in paths_to_remove {
            info!("Detected deletion in .gbkb: {}", path);
            file_states.remove(&path);

            // If the whole KB folder is now empty, clear its index too.
            let path_parts: Vec<&str> = path.split('/').collect();
            if path_parts.len() >= 2 {
                let kb_name = path_parts[1];
                let kb_prefix = format!("{}{}/", gbkb_prefix, kb_name);
                if !file_states.keys().any(|k| k.starts_with(&kb_prefix)) {
                    if let Err(e) = self.kb_manager.clear_kb(bot_name, kb_name).await {
                        log::error!("Failed to clear KB {}: {}", kb_name, e);
                    }
                }
            }
        }
        Ok(())
    }

    /// Downloads a single `.gbkb` object into the local work directory,
    /// creating parent directories as needed.
    async fn download_gbkb_file(
        &self,
        client: &Client,
        file_path: &str,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        let bot_name = self
            .bucket_name
            .strip_suffix(".gbai")
            .unwrap_or(&self.bucket_name);
        let local_path = self.work_root.join(bot_name).join(file_path);

        if file_path.to_lowercase().ends_with(".pdf") {
            debug!("Downloading PDF file for text extraction: {}", file_path);
        }

        if let Some(parent) = local_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }

        let response = client
            .get_object()
            .bucket(&self.bucket_name)
            .key(file_path)
            .send()
            .await?;
        let bytes = response.body.collect().await?.into_bytes();
        tokio::fs::write(&local_path, bytes).await?;

        info!(
            "Downloaded .gbkb file {} to {}",
            file_path,
            local_path.display()
        );
        Ok(())
    }
}
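
// Usage sketch (illustrative only: `build_app_state` and the bucket name
// below are assumptions, not part of this module):
//
//     let state: Arc<AppState> = build_app_state().await?;
//     let monitor = Arc::new(DriveMonitor::new(
//         state,
//         "mybot.gbai".to_string(),
//         uuid::Uuid::new_v4(),
//     ));
//     // Detached loop that polls every 90 seconds:
//     let _handle = Arc::clone(&monitor).spawn();
//     // Or the start/stop variant that polls every 30 seconds:
//     // monitor.start_monitoring().await?;
//     // monitor.stop_monitoring().await?;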