refactor: drive monitor to use postgres instead of json
All checks were successful
BotServer CI/CD / build (push) Successful in 7m19s

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-04-14 15:31:30 -03:00
parent 45d5a444eb
commit c9d5bf361a
2 changed files with 307 additions and 293 deletions

View file

@ -180,4 +180,176 @@ impl DriveFileRepository {
.load(&mut conn)
.unwrap_or_default()
}
/// Remove the tracked state row for one file belonging to `bot_id`.
///
/// Returns `Err` with a stringified pool/diesel error on failure.
pub fn delete_file(&self, bot_id: Uuid, file_path: &str) -> Result<(), String> {
    let mut conn = self.pool.get().map_err(|e| e.to_string())?;
    // Build the filtered target first, then delete it in one statement.
    let target = drive_files::table.filter(
        drive_files::bot_id
            .eq(bot_id)
            .and(drive_files::file_path.eq(file_path)),
    );
    diesel::delete(target)
        .execute(&mut conn)
        .map_err(|e| e.to_string())
        .map(|_| ())
}
/// Fetch every tracked file row for the given bot.
///
/// Pool or query failures degrade to an empty `Vec` instead of erroring.
pub fn get_all_files_for_bot(&self, bot_id: Uuid) -> Vec<DriveFile> {
    self.pool
        .get()
        .map(|mut conn| {
            drive_files::table
                .filter(drive_files::bot_id.eq(bot_id))
                .load(&mut conn)
                .unwrap_or_default()
        })
        .unwrap_or_default()
}
/// Fetch all rows for `bot_id` whose `file_type` column matches exactly.
///
/// Pool or query failures degrade to an empty `Vec` instead of erroring.
pub fn get_files_by_type(&self, bot_id: Uuid, file_type: &str) -> Vec<DriveFile> {
    self.pool
        .get()
        .map(|mut conn| {
            drive_files::table
                .filter(
                    drive_files::bot_id
                        .eq(bot_id)
                        .and(drive_files::file_type.eq(file_type)),
                )
                .load(&mut conn)
                .unwrap_or_default()
        })
        .unwrap_or_default()
}
/// Report whether a state row exists for `file_path` under `bot_id`.
pub fn has_file(&self, bot_id: Uuid, file_path: &str) -> bool {
    matches!(self.get_file_state(bot_id, file_path), Some(_))
}
/// Insert or fully update a file's state row (etag, timestamps, indexed
/// flag, and failure counters) for the `(bot_id, file_path)` key.
///
/// On conflict with that unique key every mutable column is overwritten;
/// `created_at` is only written on first insert.
pub fn upsert_file_full(
    &self,
    bot_id: Uuid,
    file_path: &str,
    file_type: &str,
    etag: Option<String>,
    last_modified: Option<DateTime<Utc>>,
    indexed: bool,
    fail_count: i32,
    last_failed_at: Option<DateTime<Utc>>,
) -> Result<(), String> {
    let mut conn = self.pool.get().map_err(|e| e.to_string())?;
    // One timestamp for both created_at and updated_at keeps the row consistent.
    let stamp = Utc::now();
    let insert = diesel::insert_into(drive_files::table).values((
        drive_files::bot_id.eq(bot_id),
        drive_files::file_path.eq(file_path),
        drive_files::file_type.eq(file_type),
        drive_files::etag.eq(&etag),
        drive_files::last_modified.eq(last_modified),
        drive_files::indexed.eq(indexed),
        drive_files::fail_count.eq(fail_count),
        drive_files::last_failed_at.eq(last_failed_at),
        drive_files::created_at.eq(stamp),
        drive_files::updated_at.eq(stamp),
    ));
    insert
        .on_conflict((drive_files::bot_id, drive_files::file_path))
        .do_update()
        .set((
            drive_files::etag.eq(&etag),
            drive_files::last_modified.eq(last_modified),
            drive_files::indexed.eq(indexed),
            drive_files::fail_count.eq(fail_count),
            drive_files::last_failed_at.eq(last_failed_at),
            drive_files::updated_at.eq(stamp),
        ))
        .execute(&mut conn)
        .map_err(|e| e.to_string())?;
    Ok(())
}
/// Flag every file whose path contains `pattern` as successfully indexed,
/// resetting its failure counter and last-failure timestamp.
///
/// NOTE(review): `pattern` is interpolated into a SQL LIKE expression, so
/// `%`/`_` inside it act as wildcards — callers currently pass plain
/// folder names; confirm before ever passing user-supplied input.
pub fn mark_indexed_by_pattern(&self, bot_id: Uuid, pattern: &str) -> Result<(), String> {
    let mut conn = self.pool.get().map_err(|e| e.to_string())?;
    let scope = drive_files::table.filter(
        drive_files::bot_id
            .eq(bot_id)
            .and(drive_files::file_path.like(format!("%{pattern}%"))),
    );
    diesel::update(scope)
        .set((
            drive_files::indexed.eq(true),
            drive_files::fail_count.eq(0),
            drive_files::last_failed_at.eq(None::<DateTime<Utc>>),
            drive_files::updated_at.eq(Utc::now()),
        ))
        .execute(&mut conn)
        .map_err(|e| e.to_string())
        .map(|_| ())
}
/// Record a failure for every file whose path contains `pattern`:
/// increments `fail_count` in SQL and stamps `last_failed_at` / `updated_at`.
///
/// NOTE(review): like `mark_indexed_by_pattern`, `pattern` is spliced into
/// a LIKE expression without escaping `%`/`_` — fine for folder names,
/// confirm before widening the call sites.
pub fn mark_failed_by_pattern(&self, bot_id: Uuid, pattern: &str) -> Result<(), String> {
    let mut conn = self.pool.get().map_err(|e| e.to_string())?;
    let scope = drive_files::table.filter(
        drive_files::bot_id
            .eq(bot_id)
            .and(drive_files::file_path.like(format!("%{pattern}%"))),
    );
    diesel::update(scope)
        .set((
            // Raw fragment so the increment happens atomically in the DB.
            drive_files::fail_count.eq(sql("fail_count + 1")),
            drive_files::last_failed_at.eq(Some(Utc::now())),
            drive_files::updated_at.eq(Utc::now()),
        ))
        .execute(&mut conn)
        .map_err(|e| e.to_string())
        .map(|_| ())
}
/// List every file for `bot_id` whose path starts with `prefix`.
///
/// Pool or query failures degrade to an empty `Vec` instead of erroring.
pub fn get_files_by_prefix(&self, bot_id: Uuid, prefix: &str) -> Vec<DriveFile> {
    self.pool
        .get()
        .map(|mut conn| {
            drive_files::table
                .filter(
                    drive_files::bot_id
                        .eq(bot_id)
                        .and(drive_files::file_path.like(format!("{prefix}%"))),
                )
                .load(&mut conn)
                .unwrap_or_default()
        })
        .unwrap_or_default()
}
/// Delete every row for `bot_id` whose path starts with `prefix`.
///
/// Returns the number of rows removed, or a stringified error.
pub fn delete_by_prefix(&self, bot_id: Uuid, prefix: &str) -> Result<usize, String> {
    let mut conn = self.pool.get().map_err(|e| e.to_string())?;
    let doomed = drive_files::table.filter(
        drive_files::bot_id
            .eq(bot_id)
            .and(drive_files::file_path.like(format!("{prefix}%"))),
    );
    diesel::delete(doomed)
        .execute(&mut conn)
        .map_err(|e| e.to_string())
}
/// Check if any files exist with the given prefix
pub fn has_files_with_prefix(&self, bot_id: Uuid, prefix: &str) -> bool {
!self.get_files_by_prefix(bot_id, prefix).is_empty()
}
}

View file

@ -20,10 +20,7 @@ use std::sync::Arc;
#[cfg(any(feature = "research", feature = "llm"))]
use tokio::sync::RwLock as TokioRwLock;
use tokio::time::Duration;
use serde::{Deserialize, Serialize};
use tokio::fs as tokio_fs;
#[cfg(any(feature = "research", feature = "llm"))]
use crate::drive::drive_files::DriveFileRepository;
#[cfg(any(feature = "research", feature = "llm"))]
@ -42,30 +39,16 @@ pub fn is_llm_streaming() -> bool {
const MAX_BACKOFF_SECS: u64 = 300;
const INITIAL_BACKOFF_SECS: u64 = 30;
const RETRY_BACKOFF_SECS: i64 = 3600;
const MAX_FAIL_COUNT: u32 = 3;
const MAX_FAIL_COUNT: i32 = 3;
/// Strip any leading/trailing double quotes from an ETag value so that
/// quoted and unquoted forms of the same tag compare equal.
fn normalize_etag(etag: &str) -> String {
    etag.trim_start_matches('"')
        .trim_end_matches('"')
        .to_owned()
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileState {
pub etag: String,
#[serde(default)]
pub indexed: bool,
#[serde(default)]
pub last_failed_at: Option<DateTime<Utc>>,
#[serde(default)]
pub fail_count: u32,
#[serde(default)]
pub last_modified: Option<String>,
}
#[derive(Debug, Clone)]
pub struct DriveMonitor {
state: Arc<AppState>,
bucket_name: String,
file_states: Arc<tokio::sync::RwLock<HashMap<String, FileState>>>,
bot_id: uuid::Uuid,
#[cfg(any(feature = "research", feature = "llm"))]
kb_manager: Arc<KnowledgeBaseManager>,
@ -81,7 +64,7 @@ pub struct DriveMonitor {
kb_indexed_folders: Arc<TokioRwLock<HashSet<String>>>,
#[cfg(not(any(feature = "research", feature = "llm")))]
_pending_kb_index: Arc<TokioRwLock<HashSet<String>>>,
// Database-backed file state repository
// Database-backed file state repository (replaces JSON file_states)
file_repo: Arc<DriveFileRepository>,
}
impl DriveMonitor {
@ -94,7 +77,7 @@ impl DriveMonitor {
}
}
pub fn new(state: Arc<AppState>, bucket_name: String, bot_id: uuid::Uuid) -> Self {
pub fn new(state: Arc<AppState>, bucket_name: String, bot_id: uuid::Uuid) -> Self {
let work_root = PathBuf::from(crate::core::shared::utils::get_work_path());
#[cfg(any(feature = "research", feature = "llm"))]
let kb_manager = Arc::new(KnowledgeBaseManager::with_bot_config(work_root.clone(), state.conn.clone(), bot_id));
@ -105,7 +88,6 @@ impl DriveMonitor {
Self {
state,
bucket_name,
file_states: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
bot_id,
#[cfg(any(feature = "research", feature = "llm"))]
kb_manager,
@ -125,98 +107,6 @@ impl DriveMonitor {
}
}
/// Get the path to the file states JSON file for this bot
fn file_state_path(&self) -> PathBuf {
self.work_root
.join(&self.bucket_name)
.join("file_states.json")
}
/// Load file states from disk to avoid reprocessing unchanged files
async fn load_file_states(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
let path = self.file_state_path();
if path.exists() {
match tokio_fs::read_to_string(&path).await {
Ok(content) => {
match serde_json::from_str::<HashMap<String, FileState>>(&content) {
Ok(states) => {
let mut file_states = self.file_states.write().await;
let count = states.len();
*file_states = states;
info!(
"Loaded {} file states from disk for bot {}",
count,
self.bot_id
);
}
Err(e) => {
warn!(
"Failed to parse file states from {}: {}. Starting with empty state.",
path.display(),
e
);
}
}
}
Err(e) => {
warn!(
"Failed to read file states from {}: {}. Starting with empty state.",
path.display(),
e
);
}
}
}
Ok(())
}
/// Static helper to save file states (used by background tasks)
async fn save_file_states_static(
file_states: &Arc<tokio::sync::RwLock<HashMap<String, FileState>>>,
work_root: &PathBuf,
bucket_name: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let path = work_root
.join(bucket_name)
.join("file_states.json");
if let Some(parent) = path.parent() {
if let Err(e) = tokio_fs::create_dir_all(parent).await {
warn!(
"Failed to create directory for file states: {} - {}",
parent.display(),
e
);
}
}
let states = file_states.read().await;
match serde_json::to_string_pretty(&*states) {
Ok(content) => {
if let Err(e) = tokio_fs::write(&path, content).await {
warn!(
"Failed to save file states to {}: {}",
path.display(),
e
);
} else {
debug!(
"Saved {} file states to disk for bucket {}",
states.len(),
bucket_name
);
}
}
Err(e) => {
warn!(
"Failed to serialize file states: {}",
e
);
}
}
Ok(())
}
async fn check_drive_health(&self) -> bool {
let Some(client) = &self.state.drive else {
return false;
@ -261,7 +151,7 @@ impl DriveMonitor {
Arc::clone(&self.pending_kb_index),
Arc::clone(&self.files_being_indexed),
Arc::clone(&self.kb_indexed_folders),
Arc::clone(&self.file_states),
Arc::clone(&self.file_repo),
Arc::clone(&self.is_processing),
);
}
@ -275,7 +165,7 @@ impl DriveMonitor {
pending_kb_index: Arc<TokioRwLock<HashSet<String>>>,
files_being_indexed: Arc<TokioRwLock<HashSet<String>>>,
kb_indexed_folders: Arc<TokioRwLock<HashSet<String>>>,
file_states: Arc<tokio::sync::RwLock<HashMap<String, FileState>>>,
file_repo: Arc<DriveFileRepository>,
is_processing: Arc<AtomicBool>,
) {
tokio::spawn(async move {
@ -348,34 +238,23 @@ match result {
let mut indexed = kb_indexed_folders.write().await;
indexed.insert(kb_key.clone());
}
let mut states = file_states.write().await;
for (path, state) in states.iter_mut() {
if path.contains(&format!("{}/", kb_folder_name)) {
state.indexed = true;
state.fail_count = 0;
state.last_failed_at = None;
}
let pattern = format!("{}/", kb_folder_name);
if let Err(e) = file_repo.mark_indexed_by_pattern(bot_id, &pattern) {
warn!("Failed to mark files indexed for {}: {}", kb_key, e);
}
}
Ok(Err(e)) => {
warn!("Failed to index KB {}: {}", kb_key, e);
// Update fail count
let mut states = file_states.write().await;
for (path, state) in states.iter_mut() {
if path.contains(&format!("{}/", kb_folder_name)) {
state.fail_count = state.fail_count.saturating_add(1);
state.last_failed_at = Some(chrono::Utc::now());
}
let pattern = format!("{}/", kb_folder_name);
if let Err(e) = file_repo.mark_failed_by_pattern(bot_id, &pattern) {
warn!("Failed to mark files failed for {}: {}", kb_key, e);
}
}
Err(_) => {
error!("KB indexing timed out after 120s for {}", kb_key);
let mut states = file_states.write().await;
for (path, state) in states.iter_mut() {
if path.contains(&format!("{}/", kb_folder_name)) {
state.fail_count = state.fail_count.saturating_add(1);
state.last_failed_at = Some(chrono::Utc::now());
}
let pattern = format!("{}/", kb_folder_name);
if let Err(e) = file_repo.mark_failed_by_pattern(bot_id, &pattern) {
warn!("Failed to mark files failed for {}: {}", kb_key, e);
}
}
}
@ -404,13 +283,7 @@ match result {
return Ok(());
}
// Load file states from disk to avoid reprocessing unchanged files
if let Err(e) = self.load_file_states().await {
warn!(
"Failed to load file states for bot {}: {}",
self.bot_id, e
);
}
// File states are now loaded from DB on demand - no need to load from disk
if !self.check_drive_health().await {
warn!(
@ -466,11 +339,7 @@ match result {
// Smart sleep based on fail_count - prevent excessive retries
{
let states = self_clone.file_states.read().await;
let max_fail_count = states.values()
.map(|s| s.fail_count)
.max()
.unwrap_or(0);
let max_fail_count = self_clone.file_repo.get_max_fail_count(self_clone.bot_id);
let base_sleep = if max_fail_count >= 3 {
3600
@ -486,8 +355,7 @@ match result {
debug!("Sleep {}s based on fail_count={}", base_sleep, max_fail_count);
}
drop(states);
tokio::time::sleep(Duration::from_secs(base_sleep)).await;
tokio::time::sleep(Duration::from_secs(base_sleep as u64)).await;
}
// Skip drive health check - just proceed with monitoring
@ -553,7 +421,6 @@ match result {
self.is_processing
.store(false, std::sync::atomic::Ordering::SeqCst);
self.file_states.write().await.clear();
self.consecutive_failures.store(0, Ordering::Relaxed);
Ok(())
@ -698,14 +565,11 @@ match result {
if path.ends_with('/') || !path.to_ascii_lowercase().ends_with(".bas") {
continue;
}
let file_state = FileState {
etag: normalize_etag(obj.e_tag().unwrap_or_default()),
indexed: false,
last_failed_at: None,
fail_count: 0,
last_modified: obj.last_modified().map(|dt| dt.to_string()),
};
current_files.insert(path, file_state);
let etag = normalize_etag(obj.e_tag().unwrap_or_default());
let last_modified = obj.last_modified().and_then(|dt| {
DateTime::parse_from_rfc3339(&dt.to_string()).ok().map(|d| d.with_timezone(&Utc))
});
current_files.insert(path, (etag, last_modified));
}
if !list_objects.is_truncated.unwrap_or(false) {
break;
@ -713,14 +577,12 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()),
continuation_token = list_objects.next_continuation_token;
}
// First pass: identify which files need compilation
// We must do this BEFORE acquiring the write lock to avoid deadlock
let files_to_compile: Vec<String> = {
let file_states = self.file_states.read().await;
current_files
.iter()
.filter_map(|(path, current_state)| {
if let Some(previous_state) = file_states.get(path) {
if current_state.etag != previous_state.etag {
.filter_map(|(path, (current_etag, _))| {
if let Some(prev) = self.file_repo.get_file_state(self.bot_id, path) {
if prev.etag.as_deref() != Some(current_etag.as_str()) {
Some(path.clone())
} else {
None
@ -746,16 +608,10 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()),
}
}
// Now acquire write lock to merge current_files into file_states
let mut file_states = self.file_states.write().await;
// Remove files that no longer exist (deleted from MinIO)
let previous_paths: Vec<String> = file_states
.keys()
.cloned()
.collect();
for path in previous_paths {
if !current_files.contains_key(&path) {
let previous_files = self.file_repo.get_files_by_type(self.bot_id, "gbdialog");
for prev_file in &previous_files {
if !current_files.contains_key(&prev_file.file_path) {
// Delete the compiled .ast file from disk
let bot_name = self
.bucket_name
@ -763,7 +619,7 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()),
.unwrap_or(&self.bucket_name);
let ast_path = self.work_root
.join(format!("{}.gbai/{}.gbdialog", bot_name, bot_name))
.join(PathBuf::from(&path).file_name().unwrap_or_default().to_str().unwrap_or(""))
.join(PathBuf::from(&prev_file.file_path).file_name().unwrap_or_default().to_str().unwrap_or(""))
.with_extension("ast");
if ast_path.exists() {
@ -785,49 +641,53 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()),
}
}
file_states.remove(&path);
if let Err(e) = self.file_repo.delete_file(self.bot_id, &prev_file.file_path) {
warn!("Failed to delete file state for {}: {}", prev_file.file_path, e);
}
}
}
// Merge current_files into file_states
// Merge current_files into DB via file_repo
// For each file in current_files:
// - If compilation succeeded: set indexed=true
// - If compilation failed: preserve previous indexed status, increment fail_count
// - If unchanged: preserve existing state (including indexed status)
// - If new and not compiled: add with default state (indexed=false)
for (path, mut new_state) in current_files {
if successful_compilations.contains(&path) {
for (path, (etag, last_modified)) in &current_files {
if successful_compilations.contains(path) {
// Compilation succeeded - mark as indexed
new_state.indexed = true;
new_state.fail_count = 0;
new_state.last_failed_at = None;
} else if let Some(prev_state) = file_states.get(&path) {
if prev_state.etag == new_state.etag {
// File unchanged - preserve all previous state
new_state.indexed = prev_state.indexed;
new_state.fail_count = prev_state.fail_count;
new_state.last_failed_at = prev_state.last_failed_at;
if let Err(e) = self.file_repo.upsert_file_full(
self.bot_id, path, "gbdialog",
Some(etag.clone()), *last_modified,
true, 0, None,
) {
warn!("Failed to upsert file {}: {}", path, e);
}
} else if let Some(prev) = self.file_repo.get_file_state(self.bot_id, path) {
let etag_unchanged = prev.etag.as_deref() == Some(etag.as_str());
if etag_unchanged {
// File unchanged - preserve all previous state (already in DB)
} else {
// File changed but compilation failed - preserve previous state
// Keep previous indexed status, increment fail_count
new_state.indexed = prev_state.indexed;
new_state.fail_count = prev_state.fail_count + 1;
new_state.last_failed_at = Some(chrono::Utc::now());
// File changed but compilation failed - increment fail_count
if let Err(e) = self.file_repo.upsert_file_full(
self.bot_id, path, "gbdialog",
Some(etag.clone()), *last_modified,
prev.indexed, prev.fail_count + 1, Some(Utc::now()),
) {
warn!("Failed to upsert file {}: {}", path, e);
}
}
} else {
// New file where compilation failed: indexed=false
if let Err(e) = self.file_repo.upsert_file_full(
self.bot_id, path, "gbdialog",
Some(etag.clone()), *last_modified,
false, 0, None,
) {
warn!("Failed to upsert file {}: {}", path, e);
}
}
// For new files where compilation failed: indexed remains false
file_states.insert(path, new_state);
}
// Save file states to disk in background to avoid blocking
// Use static helper to avoid double Arc (fixes "dispatch failure" error)
let file_states_clone = Arc::clone(&self.file_states);
let work_root_clone = self.work_root.clone();
let bucket_name_clone = self.bucket_name.clone();
tokio::spawn(async move {
if let Err(e) = Self::save_file_states_static(&file_states_clone, &work_root_clone, &bucket_name_clone).await {
warn!("Failed to save file states: {}", e);
}
});
Ok(())
}
async fn check_gbot(&self, client: &Client) -> Result<(), Box<dyn Error + Send + Sync>> {
@ -885,12 +745,9 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()),
// Check etag to avoid re-downloading unchanged prompt files
let etag = normalize_etag(obj.e_tag().unwrap_or_default());
let prompt_state_key = format!("__prompt__{}", path);
let should_download = {
let states = self.file_states.read().await;
match states.get(&prompt_state_key) {
Some(prev) => prev.etag != etag,
None => true,
}
let should_download = match self.file_repo.get_file_state(self.bot_id, &prompt_state_key) {
Some(prev) => prev.etag.as_deref() != Some(&etag),
None => true,
};
if should_download {
match client.get_object().bucket(&self.bucket_name).key(&path).send().await {
@ -922,17 +779,12 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()),
log::error!("Failed to download prompt file {}: {}", path, e);
}
}
let mut states = self.file_states.write().await;
states.insert(prompt_state_key, FileState { etag, indexed: false, last_failed_at: None, fail_count: 0, last_modified: None });
drop(states);
let file_states_clone = Arc::clone(&self.file_states);
let work_root_clone = self.work_root.clone();
let bucket_name_clone = self.bucket_name.clone();
tokio::spawn(async move {
if let Err(e) = Self::save_file_states_static(&file_states_clone, &work_root_clone, &bucket_name_clone).await {
warn!("Failed to save file states after prompt update: {}", e);
}
});
if let Err(e) = self.file_repo.upsert_file_full(
self.bot_id, &prompt_state_key, "gbot-prompt",
Some(etag), None, false, 0, None,
) {
warn!("Failed to save prompt file state: {}", e);
}
} else {
trace!("Prompt file {} unchanged (etag match), skipping download", path);
}
@ -943,20 +795,17 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()),
let etag = normalize_etag(obj.e_tag().unwrap_or_default());
let last_modified = obj.last_modified().map(|dt| dt.to_string());
let config_state_key = format!("__config__{}", path);
let should_sync = {
let states = self.file_states.read().await;
match states.get(&config_state_key) {
Some(prev) => {
let etag_changed = prev.etag != etag;
let mod_changed = match (&prev.last_modified, &last_modified) {
(Some(prev_dt), Some(new_dt)) => prev_dt != new_dt,
(None, Some(_)) => true,
_ => false,
};
etag_changed || mod_changed
}
None => true,
let should_sync = match self.file_repo.get_file_state(self.bot_id, &config_state_key) {
Some(prev) => {
let etag_changed = prev.etag.as_deref() != Some(&etag);
let mod_changed = match (&prev.last_modified, &last_modified) {
(Some(prev_dt), Some(new_dt)) => prev_dt.to_string() != new_dt.to_string(),
(None, Some(_)) => true,
_ => false,
};
etag_changed || mod_changed
}
None => true,
};
debug!("check_gbot: config.csv should_sync={} (etag={}, last_modified={:?})", should_sync, etag, last_modified);
if should_sync {
@ -1090,18 +939,16 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()),
self.broadcast_theme_change(&csv_content).await?;
}
// Update file_states with config.csv ETag and last_modified
let mut states = self.file_states.write().await;
states.insert(config_state_key, FileState { etag, indexed: false, last_failed_at: None, fail_count: 0, last_modified });
drop(states);
let file_states_clone = Arc::clone(&self.file_states);
let work_root_clone = self.work_root.clone();
let bucket_name_clone = self.bucket_name.clone();
tokio::spawn(async move {
if let Err(e) = Self::save_file_states_static(&file_states_clone, &work_root_clone, &bucket_name_clone).await {
warn!("Failed to save file states after config update: {}", e);
}
// Update file state in DB for config.csv
let last_mod_dt = last_modified.as_ref().and_then(|s| {
DateTime::parse_from_rfc3339(s).ok().map(|d| d.with_timezone(&Utc))
});
if let Err(e) = self.file_repo.upsert_file_full(
self.bot_id, &config_state_key, "gbot-config",
Some(etag), last_mod_dt, false, 0, None,
) {
warn!("Failed to save config file state: {}", e);
}
// Check for system-prompt-file and download it
let prompt_file_line = csv_content
@ -1525,14 +1372,11 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()),
continue;
}
let file_state = FileState {
etag: normalize_etag(obj.e_tag().unwrap_or_default()),
indexed: false,
last_failed_at: None,
fail_count: 0,
last_modified: obj.last_modified().map(|dt| dt.to_string()),
};
current_files.insert(path.clone(), file_state);
let etag = normalize_etag(obj.e_tag().unwrap_or_default());
let last_modified = obj.last_modified().and_then(|dt| {
DateTime::parse_from_rfc3339(&dt.to_string()).ok().map(|d| d.with_timezone(&Utc))
});
current_files.insert(path.clone(), (etag, last_modified));
}
if !list_objects.is_truncated.unwrap_or(false) {
@ -1566,8 +1410,6 @@ let file_state = FileState {
}
}
let mut file_states = self.file_states.write().await;
// Build set of already-indexed KB folder names for quick lookup
let indexed_kb_names: HashSet<String> = {
let indexed = self.kb_indexed_folders.read().await;
@ -1577,29 +1419,30 @@ let file_state = FileState {
.collect()
};
for (path, current_state) in current_files.iter() {
let is_new = !file_states.contains_key(path);
for (path, (_, current_last_modified)) in current_files.iter() {
let prev_state = self.file_repo.get_file_state(self.bot_id, path);
let is_new = prev_state.is_none();
// Skip files from already-indexed KB folders that are not new
// This prevents re-download loop when file_states fails to load
// This prevents re-download loop when DB is loaded fresh
let kb_name_from_path = path.split('/').nth(1).map(|s| s.to_string());
if all_indexed && !is_new {
trace!("Skipping already indexed file: {}", path);
continue;
}
// Extra safety: if file_states is empty but KB is indexed, skip non-new files
if file_states.is_empty() && all_indexed {
// Extra safety: if the KB folder is indexed, skip non-new files
if all_indexed {
if let Some(kb) = &kb_name_from_path {
if indexed_kb_names.contains(kb) {
trace!("Skipping file from indexed KB (empty file_states): {}", path);
trace!("Skipping file from indexed KB: {}", path);
continue;
}
}
}
// Use only last_modified for change detection - more reliable than ETag
let is_modified = if let Some(prev) = file_states.get(path) {
prev.last_modified != current_state.last_modified
let is_modified = if let Some(prev) = &prev_state {
prev.last_modified != *current_last_modified
} else {
false
};
@ -1626,10 +1469,10 @@ let file_state = FileState {
}
}
}
if let Some(prev_state) = file_states.get(path) {
if prev_state.fail_count >= MAX_FAIL_COUNT {
if let Some(prev) = &prev_state {
if prev.fail_count >= MAX_FAIL_COUNT {
let elapsed = Utc::now()
.signed_duration_since(prev_state.last_failed_at.unwrap_or(Utc::now()));
.signed_duration_since(prev.last_failed_at.unwrap_or(Utc::now()));
if elapsed.num_seconds() < RETRY_BACKOFF_SECS {
continue;
}
@ -1703,10 +1546,11 @@ let file_state = FileState {
}
}
let paths_to_remove: Vec<String> = file_states
.keys()
.filter(|path| path.starts_with(&gbkb_prefix) && !current_files.contains_key(*path))
.cloned()
// Find files deleted from MinIO
let previous_gbkb = self.file_repo.get_files_by_prefix(self.bot_id, &gbkb_prefix);
let paths_to_remove: Vec<String> = previous_gbkb.iter()
.filter(|f| !current_files.contains_key(&f.file_path))
.map(|f| f.file_path.clone())
.collect();
if files_processed > 0 {
@ -1715,36 +1559,34 @@ let file_state = FileState {
files_processed, pdf_files_found
);
}
for (path, mut state) in current_files {
// Preserve indexed status and fail history when file hasn't actually changed
// Use last_modified as the primary check (more stable than ETag)
if let Some(previous) = file_states.get(&path) {
let content_unchanged = previous.last_modified == state.last_modified
|| (previous.etag == state.etag && previous.last_modified.is_some() && state.last_modified.is_some());
// Persist each current file to the DB, preserving state when unchanged
for (path, (etag, last_modified)) in &current_files {
if let Some(previous) = self.file_repo.get_file_state(self.bot_id, path) {
let content_unchanged = previous.last_modified == *last_modified
|| (previous.etag.as_deref() == Some(etag.as_str())
&& previous.last_modified.is_some()
&& last_modified.is_some());
if content_unchanged {
state.indexed = previous.indexed;
state.fail_count = previous.fail_count;
state.last_failed_at = previous.last_failed_at;
// Unchanged - leave existing DB row as-is
continue;
}
}
file_states.insert(path, state);
}
// Save file states to disk in background to avoid blocking
// Use static helper to avoid double Arc (fixes "dispatch failure" error)
let file_states_clone = Arc::clone(&self.file_states);
let work_root_clone = self.work_root.clone();
let bucket_name_clone = self.bucket_name.clone();
tokio::spawn(async move {
if let Err(e) = Self::save_file_states_static(&file_states_clone, &work_root_clone, &bucket_name_clone).await {
warn!("Failed to save file states: {}", e);
// New or changed file — upsert with default state (indexed=false)
if let Err(e) = self.file_repo.upsert_file_full(
self.bot_id, path, "gbkb",
Some(etag.clone()), *last_modified,
false, 0, None,
) {
warn!("Failed to upsert gbkb file {}: {}", path, e);
}
});
}
for path in paths_to_remove {
trace!("Detected deletion in .gbkb: {}", path);
file_states.remove(&path);
if let Err(e) = self.file_repo.delete_file(self.bot_id, &path) {
warn!("Failed to delete gbkb file state {}: {}", path, e);
}
// Delete the downloaded file from disk
let bot_name = self
@ -1766,7 +1608,7 @@ let file_state = FileState {
let kb_name = path_parts[1];
let kb_prefix = format!("{}{}/", gbkb_prefix, kb_name);
if !file_states.keys().any(|k| k.starts_with(&kb_prefix)) {
if !self.file_repo.has_files_with_prefix(self.bot_id, &kb_prefix) {
#[cfg(any(feature = "research", feature = "llm"))]
{
if let Err(e) = self.kb_manager.clear_kb(self.bot_id, bot_name, kb_name).await {