From f48fa6d5f0f46c8947bb700938c828eff3e01d18 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Sun, 12 Apr 2026 07:47:13 -0300 Subject: [PATCH] Add fail_count/last_failed_at to FileState for indexing retries - Skip re-indexing files that failed 3+ times within 1 hour - Update file_states on indexing success (indexed=true, fail_count=0) - Update file_states on indexing failure (fail_count++, last_failed_at=now) - Don't skip KB indexing when embedding server not marked ready yet - Embedding server health will be detected via wait_for_server() in kb_indexer - Remove drive_monitor bypass of embedding check - let kb_indexer handle it --- src/drive/drive_monitor/mod.rs | 78 +++++++++++++++++++++++++++++++--- 1 file changed, 73 insertions(+), 5 deletions(-) diff --git a/src/drive/drive_monitor/mod.rs b/src/drive/drive_monitor/mod.rs index 7b3303cb..2a7bc61f 100644 --- a/src/drive/drive_monitor/mod.rs +++ b/src/drive/drive_monitor/mod.rs @@ -10,6 +10,7 @@ use crate::core::shared::state::AppState; #[cfg(feature = "drive")] use aws_sdk_s3::Client; +use chrono::{DateTime, Utc}; use log::{debug, error, info, trace, warn}; use std::collections::HashMap; #[cfg(any(feature = "research", feature = "llm"))] use tokio::fs as tokio_fs; @@ -26,12 +27,18 @@ use tokio::fs as tokio_fs; const MAX_BACKOFF_SECS: u64 = 300; const INITIAL_BACKOFF_SECS: u64 = 30; +const RETRY_BACKOFF_SECS: i64 = 3600; +const MAX_FAIL_COUNT: u32 = 3; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FileState { pub etag: String, #[serde(default)] pub indexed: bool, + #[serde(default)] + pub last_failed_at: Option<DateTime<Utc>>, + #[serde(default)] + pub fail_count: u32, } #[derive(Debug, Clone)] @@ -504,6 +511,8 @@ impl DriveMonitor { let file_state = FileState { etag: obj.e_tag().unwrap_or_default().to_string(), indexed: false, + last_failed_at: None, + fail_count: 0, }; current_files.insert(path, file_state); } @@ -644,7 +653,7 @@ impl DriveMonitor { } } let mut states = self.file_states.write().await; - 
states.insert(prompt_state_key, FileState { etag, indexed: false }); + states.insert(prompt_state_key, FileState { etag, indexed: false, last_failed_at: None, fail_count: 0 }); drop(states); let file_states_clone = Arc::clone(&self.file_states); let work_root_clone = self.work_root.clone(); @@ -1200,6 +1209,8 @@ impl DriveMonitor { let file_state = FileState { etag: obj.e_tag().unwrap_or_default().to_string(), indexed: false, + last_failed_at: None, + fail_count: 0, }; current_files.insert(path.clone(), file_state); } @@ -1220,6 +1231,20 @@ impl DriveMonitor { .unwrap_or(false); if is_new || is_modified { + if let Some(prev_state) = file_states.get(path) { + if prev_state.fail_count >= MAX_FAIL_COUNT { + let elapsed = Utc::now() + .signed_duration_since(prev_state.last_failed_at.unwrap_or(Utc::now())); + if elapsed.num_seconds() < RETRY_BACKOFF_SECS { + trace!( + "Skipping {} - fail_count={} (last failed {}s ago, max {}s backoff)", + path, prev_state.fail_count, elapsed.num_seconds(), RETRY_BACKOFF_SECS + ); + continue; + } + } + } + if path.to_lowercase().ends_with(".pdf") { pdf_files_found += 1; trace!( @@ -1270,8 +1295,7 @@ impl DriveMonitor { #[cfg(any(feature = "research", feature = "llm"))] { if !is_embedding_server_ready() { - trace!("Embedding server not ready, deferring KB indexing for {}", kb_folder_path.display()); - continue; + info!("Embedding server not yet marked ready, KB indexing will wait for it"); } // Create a unique key for this KB folder to track indexing state @@ -1300,6 +1324,11 @@ impl DriveMonitor { let _files_being_indexed = Arc::clone(&self.files_being_indexed); let file_key = Arc::clone(&self.files_being_indexed); let kb_key_owned = kb_key.clone(); + let file_states = Arc::clone(&self.file_states); + let work_root = self.work_root.clone(); + let bucket_name = self.bucket_name.clone(); + let gbkb_prefix_owned = gbkb_prefix.clone(); + let bot_name_for_spawn = bot_name.to_string(); tokio::spawn(async move { trace!( @@ -1319,6 +1348,43 @@ 
impl DriveMonitor { indexing_set.remove(&kb_key_owned); } + let kb_prefix = format!("{}/", gbkb_prefix_owned); + let kb_folder_name = kb_folder_owned + .strip_prefix(&work_root) + .ok() + .and_then(|p| p.to_str()) + .unwrap_or("") + .trim_start_matches(&format!("{}/", bot_name_for_spawn)) + .trim_start_matches(&kb_prefix) + .to_string(); + + { + let mut states = file_states.write().await; + for (path, state) in states.iter_mut() { + if path.starts_with(&format!("{}{}/", kb_prefix, kb_folder_name)) { + match &result { + Ok(Ok(_)) => { + state.indexed = true; + state.fail_count = 0; + state.last_failed_at = None; + } + Ok(Err(_)) | Err(_) => { + state.fail_count = state.fail_count.saturating_add(1); + state.last_failed_at = Some(chrono::Utc::now()); + } + } + } + } + let states_clone = Arc::clone(&file_states); + let work_root_clone = work_root.clone(); + let bucket_name_clone = bucket_name.clone(); + tokio::spawn(async move { + if let Err(e) = Self::save_file_states_static(&states_clone, &work_root_clone, &bucket_name_clone).await { + warn!("Failed to save file states after indexing update: {}", e); + } + }); + } + match result { Ok(Ok(_)) => { debug!( @@ -1375,8 +1441,10 @@ impl DriveMonitor { } for (path, mut state) in current_files { if let Some(previous) = file_states.get(&path) { - if previous.indexed && state.etag == previous.etag { - state.indexed = true; + if state.etag == previous.etag { + state.indexed = previous.indexed; + state.fail_count = previous.fail_count; + state.last_failed_at = previous.last_failed_at; } } file_states.insert(path, state);