diff --git a/src/drive/drive_monitor/mod.rs b/src/drive/drive_monitor/mod.rs index d4ba33bf..54a99436 100644 --- a/src/drive/drive_monitor/mod.rs +++ b/src/drive/drive_monitor/mod.rs @@ -54,6 +54,8 @@ pub struct DriveMonitor { consecutive_failures: Arc<AtomicU32>, #[cfg(any(feature = "research", feature = "llm"))] files_being_indexed: Arc<TokioRwLock<HashSet<String>>>, + #[cfg(any(feature = "research", feature = "llm"))] + pending_kb_index: Arc<TokioRwLock<HashSet<String>>>, } impl DriveMonitor { fn normalize_config_value(value: &str) -> String { @@ -82,6 +84,8 @@ impl DriveMonitor { consecutive_failures: Arc::new(AtomicU32::new(0)), #[cfg(any(feature = "research", feature = "llm"))] files_being_indexed: Arc::new(TokioRwLock::new(HashSet::new())), + #[cfg(any(feature = "research", feature = "llm"))] + pending_kb_index: Arc::new(TokioRwLock::new(HashSet::new())), } } @@ -215,6 +219,125 @@ impl DriveMonitor { Duration::from_secs(backoff_secs.min(MAX_BACKOFF_SECS)) } + /// Start a long-running background KB processor that handles pending indexing requests + /// Only one instance runs per bot - this is spawned once from start_monitoring + #[cfg(any(feature = "research", feature = "llm"))] + pub fn start_kb_processor(&self) { + let kb_manager = Arc::clone(&self.kb_manager); + let bot_id = self.bot_id; + let bot_name = self.bucket_name.strip_suffix(".gbai").unwrap_or(&self.bucket_name).to_string(); + let work_root = self.work_root.clone(); + let pending_kb_index = Arc::clone(&self.pending_kb_index); + let files_being_indexed = Arc::clone(&self.files_being_indexed); + let file_states = Arc::clone(&self.file_states); + let is_processing = Arc::clone(&self.is_processing); + + tokio::spawn(async move { + trace!("[KB_PROCESSOR] Starting for bot {} (bucket: {})", bot_name, bot_id); + + // Keep running as long as the DriveMonitor is active + while is_processing.load(std::sync::atomic::Ordering::SeqCst) { + // Get one pending KB folder from the queue + let kb_key = { + let pending = pending_kb_index.write().await; + 
pending.iter().next().cloned() + }; + + let Some(kb_key) = kb_key else { + // Nothing pending, wait and retry + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + }; + + // Parse KB key to get folder name + let parts: Vec<&str> = kb_key.splitn(2, '_').collect(); + if parts.len() < 2 { + let mut pending = pending_kb_index.write().await; + pending.remove(&kb_key); + continue; + } + + let kb_folder_name = parts[1]; + let kb_folder_path = work_root.join(&bot_name).join(format!("{}.gbkb/", bot_name)).join(kb_folder_name); + + // Check if already being indexed + { + let indexing = files_being_indexed.read().await; + if indexing.contains(&kb_key) { + // Already processing, move to next + let mut pending = pending_kb_index.write().await; + pending.remove(&kb_key); + continue; + } + } + + // Mark as being indexed + { + let mut indexing = files_being_indexed.write().await; + indexing.insert(kb_key.clone()); + } + + trace!("[KB_PROCESSOR] Indexing KB: {} for bot: {}", kb_key, bot_name); + + // Perform the actual KB indexing + let result = tokio::time::timeout( + Duration::from_secs(120), + kb_manager.handle_gbkb_change(bot_id, &bot_name, kb_folder_path.as_path()), + ).await; + + // Remove from being indexed + { + let mut indexing = files_being_indexed.write().await; + indexing.remove(&kb_key); + } + + // Remove from pending queue + { + let mut pending = pending_kb_index.write().await; + pending.remove(&kb_key); + } + + match result { + Ok(Ok(_)) => { + trace!("[KB_PROCESSOR] Successfully indexed KB: {}", kb_key); + // Mark files in this KB as indexed + let mut states = file_states.write().await; + for (path, state) in states.iter_mut() { + if path.contains(&format!("{}/", kb_folder_name)) { + state.indexed = true; + state.fail_count = 0; + state.last_failed_at = None; + } + } + } + Ok(Err(e)) => { + warn!("[KB_PROCESSOR] Failed to index KB {}: {}", kb_key, e); + // Update fail count + let mut states = file_states.write().await; + for (path, state) in 
states.iter_mut() { + if path.contains(&format!("{}/", kb_folder_name)) { + state.fail_count = state.fail_count.saturating_add(1); + state.last_failed_at = Some(chrono::Utc::now()); + } + } + } + Err(_) => { + error!("[KB_PROCESSOR] KB indexing timed out after 120s for {}", kb_key); + let mut states = file_states.write().await; + for (path, state) in states.iter_mut() { + if path.contains(&format!("{}/", kb_folder_name)) { + state.fail_count = state.fail_count.saturating_add(1); + state.last_failed_at = Some(chrono::Utc::now()); + } + } + } + } + } + + trace!("[KB_PROCESSOR] Stopping for bot {}", bot_name); + }); + } + pub async fn start_monitoring(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { trace!("start_monitoring ENTER"); let start_mem = MemoryStats::current(); @@ -248,6 +371,10 @@ impl DriveMonitor { self.is_processing .store(true, std::sync::atomic::Ordering::SeqCst); + // Start the background KB processor - one instance per bot + #[cfg(any(feature = "research", feature = "llm"))] + self.start_kb_processor(); + trace!("start_monitoring: calling check_for_changes..."); trace!("Calling initial check_for_changes..."); @@ -1403,178 +1530,64 @@ impl DriveMonitor { let path_parts: Vec<&str> = path.split('/').collect(); if path_parts.len() >= 3 { let kb_name = path_parts[1]; - let kb_folder_path = self - .work_root - .join(bot_name) - .join(&gbkb_prefix) - .join(kb_name); + let kb_key = format!("{}_{}", bot_name, kb_name); #[cfg(any(feature = "research", feature = "llm"))] { - if !is_embedding_server_ready() { - info!("Embedding server not yet marked ready, KB indexing will wait for it"); - } - - // Create a unique key for this KB folder to track indexing state - let kb_key = format!("{}_{}", bot_name, kb_name); - - // Check fail_count for this KB folder - implement backoff - { - let states = self.file_states.read().await; - let kb_folder_pattern = format!("{}/", kb_name); - - // Filter only files in this KB folder - let kb_states: Vec<_> = states.iter() - .filter(|(path, _)| 
path.contains(&kb_folder_pattern)) - .collect(); - - if kb_states.is_empty() { - // No files in file_states yet for this KB - proceed with indexing - } else { - let max_fail_count = kb_states.iter() - .map(|(_, s)| s.fail_count) - .max() - .unwrap_or(0); - - // Backoff: wait longer based on fail count - // fail_count 0: no wait, 1: 5min, 2: 15min, 3+: 1h - if max_fail_count > 0 { - let wait_seconds = match max_fail_count { - 1 => 300, // 5 min - 2 => 900, // 15 min - _ => 3600, // 1 hour - }; - - if let Some(last_failed) = kb_states.iter() - .filter_map(|(_, s)| s.last_failed_at) - .max() + // Check fail_count for backoff + let mut should_skip = false; { - let elapsed = chrono::Utc::now() - last_failed; - if elapsed.num_seconds() < wait_seconds { - trace!("[DRIVE_MONITOR] KB folder {} in backoff (fail_count={}, elapsed={}s < {}s), skipping", - kb_key, max_fail_count, elapsed.num_seconds(), wait_seconds); - continue; - } - } - } - } - } - - // Check if this KB folder is already being indexed - { - let indexing_set = self.files_being_indexed.read().await; - if indexing_set.contains(&kb_key) { - debug!("[DRIVE_MONITOR] KB folder {} already being indexed, skipping duplicate task", kb_key); - continue; - } - } - - // Mark this KB folder as being indexed - { - let mut indexing_set = self.files_being_indexed.write().await; - indexing_set.insert(kb_key.clone()); - } - - let kb_manager = Arc::clone(&self.kb_manager); - let bot_id = self.bot_id; - let bot_name_owned = bot_name.to_string(); - let kb_name_owned = kb_name.to_string(); - let kb_folder_owned = kb_folder_path.clone(); - let _files_being_indexed = Arc::clone(&self.files_being_indexed); - let file_key = Arc::clone(&self.files_being_indexed); - let kb_key_owned = kb_key.clone(); - let file_states = Arc::clone(&self.file_states); - let work_root = self.work_root.clone(); - let bucket_name = self.bucket_name.clone(); - let gbkb_prefix_owned = gbkb_prefix.clone(); - let bot_name_for_spawn = bot_name.to_string(); - - 
tokio::spawn(async move { - trace!( - "Triggering KB indexing for folder: {} (PDF text extraction enabled)", - kb_folder_owned.display() - ); - - let result = tokio::time::timeout( - Duration::from_secs(60), - kb_manager.handle_gbkb_change(bot_id, &bot_name_owned, &kb_folder_owned), - ) - .await; - - // Always remove from tracking set when done, regardless of outcome - { - let mut indexing_set = file_key.write().await; - indexing_set.remove(&kb_key_owned); - } - - let kb_prefix = format!("{}/", gbkb_prefix_owned); - let kb_folder_name = kb_folder_owned - .strip_prefix(&work_root) - .ok() - .and_then(|p| p.to_str()) - .unwrap_or("") - .trim_start_matches(&format!("{}/", bot_name_for_spawn)) - .trim_start_matches(&kb_prefix) - .to_string(); - - { - let mut states = file_states.write().await; - for (path, state) in states.iter_mut() { - if path.starts_with(&format!("{}{}/", kb_prefix, kb_folder_name)) { - match &result { - Ok(Ok(_)) => { - state.indexed = true; - state.fail_count = 0; - state.last_failed_at = None; - } - Ok(Err(_)) | Err(_) => { - state.fail_count = state.fail_count.saturating_add(1); - state.last_failed_at = Some(chrono::Utc::now()); + let states = self.file_states.read().await; + let kb_folder_pattern = format!("{}/", kb_name); + let kb_states: Vec<_> = states.iter() + .filter(|(p, _)| p.contains(&kb_folder_pattern)) + .collect(); + if !kb_states.is_empty() { + let max_fail_count = kb_states.iter() + .map(|(_, s)| s.fail_count) + .max() + .unwrap_or(0); + if max_fail_count > 0 { + let wait_seconds = match max_fail_count { + 1 => 300, + 2 => 900, + _ => 3600, + }; + if let Some(last_failed) = kb_states.iter() + .filter_map(|(_, s)| s.last_failed_at) + .max() + { + let elapsed = chrono::Utc::now() - last_failed; + if elapsed.num_seconds() < wait_seconds { + trace!("[DRIVE_MONITOR] KB {} in backoff (fail_count={}), skipping", + kb_key, max_fail_count); + should_skip = true; + } + } } } } - } - let states_clone = Arc::clone(&file_states); - let 
work_root_clone = work_root.clone(); - let bucket_name_clone = bucket_name.clone(); - tokio::spawn(async move { - if let Err(e) = Self::save_file_states_static(&states_clone, &work_root_clone, &bucket_name_clone).await { - warn!("Failed to save file states after indexing update: {}", e); - } - }); - } - match result { - Ok(Ok(_)) => { - debug!( - "Successfully processed KB change for {}/{}", - bot_name_owned, kb_name_owned - ); - } - Ok(Err(e)) => { - log::error!( - "Failed to process .gbkb change for {}/{}: {}", - bot_name_owned, - kb_name_owned, - e - ); - } - Err(_) => { - log::error!( - "KB indexing timed out after 60s for {}/{}", - bot_name_owned, kb_name_owned - ); - } - } - }); + if !should_skip { + // Check if already being indexed + let indexing_set = self.files_being_indexed.read().await; + let already_indexing = indexing_set.contains(&kb_key); + drop(indexing_set); + + if !already_indexing { + // Queue for background KB processor - no blocking! + let mut pending = self.pending_kb_index.write().await; + if pending.insert(kb_key.clone()) { + trace!("[DRIVE_MONITOR] Queued KB {} for indexing (non-blocking)", kb_key); + } + } + } } #[cfg(not(any(feature = "research", feature = "llm")))] { - let _ = kb_folder_path; - debug!( - "KB indexing disabled because research/llm features are not enabled" - ); + let _ = kb_name; + debug!("KB indexing disabled (research/llm features not enabled)"); } } }