diff --git a/src/drive/drive_monitor/mod.rs b/src/drive/drive_monitor/mod.rs index ef41e616..89cde8c2 100644 --- a/src/drive/drive_monitor/mod.rs +++ b/src/drive/drive_monitor/mod.rs @@ -123,50 +123,43 @@ impl DriveMonitor { .join("file_states.json") } - /// Load file states from disk to avoid reprocessing unchanged files - async fn load_file_states(&self) -> Result<(), Box> { - let path = self.file_state_path(); - debug!("[DRIVE_MONITOR] Loading file states from {} for bot {}", path.display(), self.bot_id); - if path.exists() { - match tokio_fs::read_to_string(&path).await { - Ok(content) => { - match serde_json::from_str::>(&content) { - Ok(states) => { - let mut file_states = self.file_states.write().await; - let count = states.len(); - *file_states = states; - info!( - "[DRIVE_MONITOR] Loaded {} file states from disk for bot {}", - count, - self.bot_id - ); - } - Err(e) => { - warn!( - "[DRIVE_MONITOR] Failed to parse file states from {}: {}. Starting with empty state.", - path.display(), - e - ); - } - } - } - Err(e) => { - warn!( - "[DRIVE_MONITOR] Failed to read file states from {}: {}. Starting with empty state.", - path.display(), - e - ); - } - } - } else { - debug!( - "[DRIVE_MONITOR] No existing file states found at {} for bot {}. Starting fresh.", - path.display(), + /// Load file states from disk to avoid reprocessing unchanged files + async fn load_file_states(&self) -> Result<(), Box> { + let path = self.file_state_path(); + if path.exists() { + match tokio_fs::read_to_string(&path).await { + Ok(content) => { + match serde_json::from_str::>(&content) { + Ok(states) => { + let mut file_states = self.file_states.write().await; + let count = states.len(); + *file_states = states; + info!( + "Loaded {} file states from disk for bot {}", + count, self.bot_id - ); + ); + } + Err(e) => { + warn!( + "Failed to parse file states from {}: {}. Starting with empty state.",
+ path.display(), + e + ); + } + } } - Ok(()) + Err(e) => { + warn!( + "Failed to read file states from {}: {}. Starting with empty state.", + path.display(), + e + ); + } + } } + Ok(()) + } /// Static helper to save file states (used by background tasks) async fn save_file_states_static( @@ -181,7 +174,7 @@ impl DriveMonitor { if let Some(parent) = path.parent() { if let Err(e) = tokio_fs::create_dir_all(parent).await { warn!( - "[DRIVE_MONITOR] Failed to create directory for file states: {} - {}", + "Failed to create directory for file states: {} - {}", parent.display(), e ); @@ -193,13 +186,13 @@ impl DriveMonitor { Ok(content) => { if let Err(e) = tokio_fs::write(&path, content).await { warn!( - "[DRIVE_MONITOR] Failed to save file states to {}: {}", + "Failed to save file states to {}: {}", path.display(), e ); } else { debug!( - "[DRIVE_MONITOR] Saved {} file states to disk for bucket {}", + "Saved {} file states to disk for bucket {}", states.len(), bucket_name ); @@ -207,7 +200,7 @@ impl DriveMonitor { } Err(e) => { warn!( - "[DRIVE_MONITOR] Failed to serialize file states: {}", + "Failed to serialize file states: {}", e ); } @@ -228,11 +221,11 @@ impl DriveMonitor { { Ok(Ok(_)) => true, Ok(Err(e)) => { - debug!("[DRIVE_MONITOR] Health check failed: {}", e); + debug!("Health check failed: {}", e); false } Err(_) => { - debug!("[DRIVE_MONITOR] Health check timed out"); + debug!("Health check timed out"); false } } @@ -276,16 +269,14 @@ impl DriveMonitor { file_states: Arc>>, is_processing: Arc, ) { - tokio::spawn(async move { - trace!("[KB_PROCESSOR] Starting for bot {} (bucket: {})", bot_name, bot_id); - - // Keep running as long as the DriveMonitor is active - while is_processing.load(std::sync::atomic::Ordering::SeqCst) { - // Get one pending KB folder from the queue - let kb_key = { - let pending = pending_kb_index.write().await; - pending.iter().next().cloned() - }; + tokio::spawn(async move { + // Keep running as long as 
the DriveMonitor is active + while is_processing.load(std::sync::atomic::Ordering::SeqCst) { + // Get one pending KB folder from the queue + let kb_key = { + let pending = pending_kb_index.write().await; + pending.iter().next().cloned() + }; let Some(kb_key) = kb_key else { // Nothing pending, wait and retry @@ -321,7 +312,7 @@ impl DriveMonitor { indexing.insert(kb_key.clone()); } - trace!("[KB_PROCESSOR] Indexing KB: {} for bot: {}", kb_key, bot_name); + trace!("Indexing KB: {} for bot: {}", kb_key, bot_name); // Perform the actual KB indexing let result = tokio::time::timeout( @@ -343,7 +334,7 @@ impl DriveMonitor { match result { Ok(Ok(_)) => { - info!("[KB_PROCESSOR] Successfully indexed KB: {}", kb_key); + info!("Successfully indexed KB: {}", kb_key); { let mut indexed = kb_indexed_folders.write().await; indexed.insert(kb_key.clone()); @@ -358,7 +349,7 @@ match result { } } Ok(Err(e)) => { - warn!("[KB_PROCESSOR] Failed to index KB {}: {}", kb_key, e); + warn!("Failed to index KB {}: {}", kb_key, e); // Update fail count let mut states = file_states.write().await; for (path, state) in states.iter_mut() { @@ -369,7 +360,7 @@ match result { } } Err(_) => { - error!("[KB_PROCESSOR] KB indexing timed out after 120s for {}", kb_key); + error!("KB indexing timed out after 120s for {}", kb_key); let mut states = file_states.write().await; for (path, state) in states.iter_mut() { if path.contains(&format!("{}/", kb_folder_name)) { @@ -381,7 +372,7 @@ match result { } } - trace!("[KB_PROCESSOR] Stopping for bot {}", bot_name); + trace!("Stopping for bot {}", bot_name); }); } @@ -391,14 +382,12 @@ match result { // KB indexing not available in this build } - pub async fn start_monitoring(&self) -> Result<(), Box> { - trace!("start_monitoring ENTER"); - let start_mem = MemoryStats::current(); - trace!( - "[DRIVE_MONITOR] Starting DriveMonitor for bot {}, RSS={}", - self.bot_id, - MemoryStats::format_bytes(start_mem.rss_bytes) - ); + pub async fn start_monitoring(&self) 
-> Result<(), Box> { + let start_mem = MemoryStats::current(); + trace!( + "Starting DriveMonitor for bot {}", + self.bot_id + ); // Check if already processing to prevent duplicate monitoring if self.is_processing.load(std::sync::atomic::Ordering::Acquire) { @@ -409,14 +398,14 @@ match result { // Load file states from disk to avoid reprocessing unchanged files if let Err(e) = self.load_file_states().await { warn!( - "[DRIVE_MONITOR] Failed to load file states for bot {}: {}", + "Failed to load file states for bot {}: {}", self.bot_id, e ); } if !self.check_drive_health().await { warn!( - "[DRIVE_MONITOR] S3/MinIO not available for bucket {}, will retry with backoff", + "S3/MinIO not available for bucket {}, will retry with backoff", self.bucket_name ); } @@ -448,7 +437,7 @@ match result { let after_initial = MemoryStats::current(); trace!( - "[DRIVE_MONITOR] After initial check, RSS={} (delta={})", + "After initial check, RSS={} (delta={})", MemoryStats::format_bytes(after_initial.rss_bytes), MemoryStats::format_bytes(after_initial.rss_bytes.saturating_sub(start_mem.rss_bytes)) ); @@ -457,49 +446,42 @@ match result { self.is_processing.store(true, std::sync::atomic::Ordering::SeqCst); trace!("Forced is_processing to true for periodic monitoring"); - let self_clone = self.clone(); // Don't wrap in Arc::new - that creates a copy - tokio::spawn(async move { - let mut consecutive_processing_failures = 0; - trace!("Starting periodic monitoring loop for bot {}", self_clone.bot_id); + let self_clone = self.clone(); + tokio::spawn(async move { + let mut consecutive_processing_failures = 0; - let is_processing_state = self_clone.is_processing.load(std::sync::atomic::Ordering::SeqCst); - trace!("is_processing state at loop start: {} for bot {}", is_processing_state, self_clone.bot_id); - - while self_clone - .is_processing - .load(std::sync::atomic::Ordering::SeqCst) - { - debug!("[DRIVE_MONITOR] Inside monitoring loop for bot {}", self_clone.bot_id); - 
debug!("[DRIVE_MONITOR] Periodic check starting for bot {}", self_clone.bot_id); + while self_clone + .is_processing + .load(std::sync::atomic::Ordering::SeqCst) + { - // Smart sleep based on fail_count - prevent excessive retries - { - let states = self_clone.file_states.read().await; - let max_fail_count = states.values() - .map(|s| s.fail_count) - .max() - .unwrap_or(0); - - let base_sleep = if max_fail_count >= 3 { - 3600 // 1 hour for fail_count >= 3 - } else if max_fail_count >= 2 { - 900 // 15 min for fail_count >= 2 - } else if max_fail_count >= 1 { - 300 // 5 min for fail_count >= 1 - } else { - 10 // 10 sec default - }; - - if base_sleep > 10 { - debug!("[DRIVE_MONITOR] Sleep {}s based on fail_count={}", base_sleep, max_fail_count); - } - - tokio::time::sleep(Duration::from_secs(base_sleep)).await; - } + // Smart sleep based on fail_count - prevent excessive retries + { + let states = self_clone.file_states.read().await; + let max_fail_count = states.values() + .map(|s| s.fail_count) + .max() + .unwrap_or(0); - debug!("[DRIVE_MONITOR] Checking drive health for bot {}", self_clone.bot_id); - // Skip drive health check - just proceed with monitoring - // if !self_clone.check_drive_health().await { + let base_sleep = if max_fail_count >= 3 { + 3600 + } else if max_fail_count >= 2 { + 900 + } else if max_fail_count >= 1 { + 300 + } else { + 30 + }; + + if base_sleep > 10 { + debug!("Sleep {}s based on fail_count={}", base_sleep, max_fail_count); + } + + drop(states); + tokio::time::sleep(Duration::from_secs(base_sleep)).await; + } + + // Skip drive health check - just proceed with monitoring if false { let failures = self_clone .consecutive_failures @@ -512,9 +494,8 @@ match result { continue; } - debug!("[DRIVE_MONITOR] About to call check_for_changes for bot {}", self_clone.bot_id); - // Add timeout to prevent hanging - match tokio::time::timeout(Duration::from_secs(12), self_clone.check_for_changes()).await { + // Add timeout to prevent hanging + match 
tokio::time::timeout(Duration::from_secs(12), self_clone.check_for_changes()).await { Ok(Ok(_)) => { let prev_failures = self_clone.consecutive_failures.swap(0, Ordering::Relaxed); @@ -559,18 +540,15 @@ match result { Ok(()) } - pub async fn stop_monitoring(&self) -> Result<(), Box> { - trace!("Stopping DriveMonitor for bot {}", self.bot_id); + pub async fn stop_monitoring(&self) -> Result<(), Box> { + self.is_processing + .store(false, std::sync::atomic::Ordering::SeqCst); - self.is_processing - .store(false, std::sync::atomic::Ordering::SeqCst); + self.file_states.write().await.clear(); + self.consecutive_failures.store(0, Ordering::Relaxed); - self.file_states.write().await.clear(); - self.consecutive_failures.store(0, Ordering::Relaxed); - - trace!("DriveMonitor stopped for bot {}", self.bot_id); - Ok(()) - } + Ok(()) + } pub fn spawn(self: Arc) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { trace!( @@ -621,7 +599,7 @@ match result { trace!("check_for_changes ENTER"); let start_mem = MemoryStats::current(); trace!( - "[DRIVE_MONITOR] check_for_changes START, RSS={}", + "check_for_changes START, RSS={}", MemoryStats::format_bytes(start_mem.rss_bytes) ); @@ -636,7 +614,7 @@ match result { trace!("check_for_changes: check_gbdialog_changes done"); let after_dialog = MemoryStats::current(); trace!( - "[DRIVE_MONITOR] After gbdialog, RSS={} (delta={})", + "After gbdialog, RSS={} (delta={})", MemoryStats::format_bytes(after_dialog.rss_bytes), MemoryStats::format_bytes(after_dialog.rss_bytes.saturating_sub(start_mem.rss_bytes)) ); @@ -647,7 +625,7 @@ match result { trace!("check_for_changes: check_gbot done"); let after_gbot = MemoryStats::current(); trace!( - "[DRIVE_MONITOR] After gbot, RSS={} (delta={})", + "After gbot, RSS={} (delta={})", MemoryStats::format_bytes(after_gbot.rss_bytes), MemoryStats::format_bytes(after_gbot.rss_bytes.saturating_sub(after_dialog.rss_bytes)) ); @@ -658,7 +636,7 @@ match result { trace!("check_for_changes: 
check_gbkb_changes done"); let after_gbkb = MemoryStats::current(); trace!( - "[DRIVE_MONITOR] After gbkb, RSS={} (delta={})", + "After gbkb, RSS={} (delta={})", MemoryStats::format_bytes(after_gbkb.rss_bytes), MemoryStats::format_bytes(after_gbkb.rss_bytes.saturating_sub(after_gbot.rss_bytes)) ); @@ -668,7 +646,7 @@ match result { let total_delta = after_gbkb.rss_bytes.saturating_sub(start_mem.rss_bytes); if total_delta > 50 * 1024 * 1024 { warn!( - "[DRIVE_MONITOR] check_for_changes grew by {} - potential leak!", + "check_for_changes grew by {} - potential leak!", MemoryStats::format_bytes(total_delta) ); } @@ -1173,62 +1151,65 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()), trace!("check_gbot EXIT"); Ok(()) } - async fn broadcast_theme_change( - &self, - csv_content: &str, - ) -> Result<(), Box> { - let mut theme_data = serde_json::json!({ - "event": "change_theme", - "data": {} - }); - for line in csv_content.lines() { - let parts: Vec<&str> = line.split(',').collect(); - if parts.len() >= 2 { - let key = parts[0].trim(); - let value = parts[1].trim(); - match key { - "theme-color1" => { - theme_data["data"]["color1"] = serde_json::Value::String(value.to_string()); - } - "theme-color2" => { - theme_data["data"]["color2"] = serde_json::Value::String(value.to_string()); - } - "theme-logo" => { - theme_data["data"]["logo_url"] = - serde_json::Value::String(value.to_string()); - } - "theme-title" => { - theme_data["data"]["title"] = serde_json::Value::String(value.to_string()); - } - "theme-logo-text" => { - theme_data["data"]["logo_text"] = - serde_json::Value::String(value.to_string()); - } - _ => {} - } - } + async fn broadcast_theme_change( + &self, + csv_content: &str, + ) -> Result<(), Box> { + let mut theme_data = serde_json::json!({ + "event": "change_theme", + "data": {} + }); + for line in csv_content.lines() { + let parts: Vec<&str> = line.split(',').collect(); + if parts.len() >= 2 { + let key = parts[0].trim(); + let value = parts[1].trim(); 
+ match key { + "theme-color1" => { + theme_data["data"]["color1"] = serde_json::Value::String(value.to_string()); + } + "theme-color2" => { + theme_data["data"]["color2"] = serde_json::Value::String(value.to_string()); + } + "theme-logo" => { + theme_data["data"]["logo_url"] = + serde_json::Value::String(value.to_string()); + } + "theme-title" => { + theme_data["data"]["title"] = serde_json::Value::String(value.to_string()); + } + "theme-logo-text" => { + theme_data["data"]["logo_text"] = + serde_json::Value::String(value.to_string()); + } + _ => {} } - let response_channels = self.state.response_channels.lock().await; - for (session_id, tx) in response_channels.iter() { - let theme_response = crate::core::shared::models::BotResponse { - bot_id: self.bot_id.to_string(), - user_id: "system".to_string(), - session_id: session_id.clone(), - channel: "web".to_string(), - content: serde_json::to_string(&theme_data)?, - message_type: MessageType::BOT_RESPONSE, - stream_token: None, - is_complete: true, - suggestions: Vec::new(), - context_name: None, - context_length: 0, - context_max_length: 0, - }; - let _ = tx.try_send(theme_response); - } - drop(response_channels); - Ok(()) + } } + // Clone channels to avoid holding lock while sending + let channels: Vec<_> = { + let response_channels = self.state.response_channels.lock().await; + response_channels.iter().map(|(id, tx)| (id.clone(), tx.clone())).collect() + }; + for (session_id, tx) in channels { + let theme_response = crate::core::shared::models::BotResponse { + bot_id: self.bot_id.to_string(), + user_id: "system".to_string(), + session_id, + channel: "web".to_string(), + content: serde_json::to_string(&theme_data)?, + message_type: MessageType::BOT_RESPONSE, + stream_token: None, + is_complete: true, + suggestions: Vec::new(), + context_name: None, + context_length: 0, + context_max_length: 0, + }; + let _ = tx.try_send(theme_response); + } + Ok(()) + } async fn compile_tool( &self, client: &Client, @@ -1479,51 
+1460,48 @@ etag: normalize_etag(obj.e_tag().unwrap_or_default()), .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) .is_err() { - trace!("[GBKB] Scan already in progress for bot {}, skipping", self.bot_id); + trace!("Scan already in progress for bot {}, skipping", self.bot_id); return Ok(()); } - debug!("[GBKB] check_gbkb_changes ENTER for bot {} (prefix: {})", self.bot_id, self.bucket_name); - let bot_name = self - .bucket_name - .strip_suffix(".gbai") - .unwrap_or(&self.bucket_name); + let bot_name = self + .bucket_name + .strip_suffix(".gbai") + .unwrap_or(&self.bucket_name); - let gbkb_prefix = format!("{}.gbkb/", bot_name); - debug!("[GBKB] Listing objects with prefix: {}", gbkb_prefix); - let mut current_files = HashMap::new(); - let mut continuation_token = None; + let gbkb_prefix = format!("{}.gbkb/", bot_name); + let mut current_files = HashMap::new(); + let mut continuation_token = None; - let mut files_processed = 0; - let mut files_to_process = Vec::new(); - let mut pdf_files_found = 0; + let mut files_processed = 0; + let mut files_to_process = Vec::new(); + let mut pdf_files_found = 0; - loop { - let list_objects = match tokio::time::timeout( - Duration::from_secs(30), - client - .list_objects_v2() - .bucket(self.bucket_name.to_lowercase()) - .prefix(&gbkb_prefix) - .set_continuation_token(continuation_token) - .send(), - ) - .await - { - Ok(Ok(list)) => list, - Ok(Err(e)) => { - debug!("[GBKB] Error listing objects: {}", e); - return Err(e.into()); - } - Err(_) => { - log::error!( - "Timeout listing .gbkb objects in bucket {}", - self.bucket_name - ); - return Ok(()); - } - }; - debug!("[GBKB] Listed {} objects in this page", list_objects.contents.as_ref().map(|c| c.len()).unwrap_or(0)); + loop { + let list_objects = match tokio::time::timeout( + Duration::from_secs(30), + client + .list_objects_v2() + .bucket(self.bucket_name.to_lowercase()) + .prefix(&gbkb_prefix) + .set_continuation_token(continuation_token) + .send(), + ) + 
.await + { + Ok(Ok(list)) => list, + Ok(Err(e)) => { + debug!("Error listing objects: {}", e); + return Err(e.into()); + } + Err(_) => { + log::error!( + "Timeout listing .gbkb objects in bucket {}", + self.bucket_name + ); + return Ok(()); + } + }; for obj in list_objects.contents.unwrap_or_default() { let path = obj.key().unwrap_or_default().to_string(); @@ -1554,7 +1532,7 @@ let file_state = FileState { continuation_token = list_objects.next_continuation_token; } - debug!("[GBKB] Found {} files total, acquiring file_states lock...", current_files.len()); + // Check if ALL KBs for this bot are already indexed in Qdrant // If so, only scan for NEW files - skip re-indexing existing ones @@ -1579,8 +1557,7 @@ let file_state = FileState { } } - let mut file_states = self.file_states.write().await; - debug!("[GBKB] file_states lock acquired, processing {} files (all_indexed={}, file_states_count={})", current_files.len(), all_indexed, file_states.len()); + let mut file_states = self.file_states.write().await; // Build set of already-indexed KB folder names for quick lookup let indexed_kb_names: HashSet = { @@ -1591,118 +1568,88 @@ let file_state = FileState { .collect() }; - for (path, current_state) in current_files.iter() { - let is_new = !file_states.contains_key(path); - debug!("[GBKB] DEBUG: path={} in_file_states={}", path, !is_new); + for (path, current_state) in current_files.iter() { + let is_new = !file_states.contains_key(path); // Skip files from already-indexed KB folders that are not new // This prevents re-download loop when file_states fails to load let kb_name_from_path = path.split('/').nth(1).map(|s| s.to_string()); if all_indexed && !is_new { - trace!("[GBKB] Skipping already indexed file: {}", path); + trace!("Skipping already indexed file: {}", path); continue; } // Extra safety: if file_states is empty but KB is indexed, skip non-new files if file_states.is_empty() && all_indexed { if let Some(kb) = &kb_name_from_path { if 
indexed_kb_names.contains(kb) { - trace!("[GBKB] Skipping file from indexed KB (empty file_states): {}", path); + trace!("Skipping file from indexed KB (empty file_states): {}", path); continue; } } } - // Use last_modified as primary change detector (more stable than ETag) - // ETags can change due to metadata updates even when content is identical - let is_modified = if let Some(prev) = file_states.get(path) { - // If last_modified matches, content hasn't changed regardless of ETag - if prev.last_modified == current_state.last_modified { - false - } else { - // Different timestamp - use ETag to confirm content actually changed - prev.etag != current_state.etag - } - } else { - false - }; + // Use only last_modified for change detection - more reliable than ETag + let is_modified = if let Some(prev) = file_states.get(path) { + prev.last_modified != current_state.last_modified + } else { + false + }; - if is_new || is_modified { - debug!("[GBKB] New/modified file: {} (new={}, modified={})", path, is_new, is_modified); + if is_new || is_modified { + #[cfg(any(feature = "research", feature = "llm"))] + { + // Only remove from indexed_folders if KB is actually being re-indexed + let path_parts: Vec<&str> = path.split('/').collect(); + if path_parts.len() >= 2 { + let kb_name = path_parts[1]; + let kb_key = format!("{}_{}", bot_name, kb_name); - #[cfg(any(feature = "research", feature = "llm"))] - { - // Only remove from indexed_folders if KB is actually being re-indexed - // Don't remove if already indexed in Qdrant (skip unnecessary re-queueing) - let path_parts: Vec<&str> = path.split('/').collect(); - if path_parts.len() >= 2 { - let kb_name = path_parts[1]; - let kb_key = format!("{}_{}", bot_name, kb_name); - - let already_indexed = { - let indexed_folders = self.kb_indexed_folders.read().await; - indexed_folders.contains(&kb_key) - }; - - // Only remove and re-queue if NOT already indexed - // This prevents infinite reindexing loops when files haven't really 
changed - if !already_indexed { - let mut indexed_folders = self.kb_indexed_folders.write().await; - if indexed_folders.remove(&kb_key) { - debug!("[GBKB] Removed {} from indexed set due to file change", kb_key); - } - } else { - trace!("[GBKB] KB {} already indexed, skipping re-queue", kb_key); - } - } - } - if let Some(prev_state) = file_states.get(path) { - if prev_state.fail_count >= MAX_FAIL_COUNT { - let elapsed = Utc::now() - .signed_duration_since(prev_state.last_failed_at.unwrap_or(Utc::now())); - if elapsed.num_seconds() < RETRY_BACKOFF_SECS { - trace!( - "Skipping {} - fail_count={} (last failed {}s ago, max {}s backoff)", - path, prev_state.fail_count, elapsed.num_seconds(), RETRY_BACKOFF_SECS - ); - continue; - } - } - } + // Check and remove in one atomic operation + let should_remove = { + let indexed_folders = self.kb_indexed_folders.read().await; + indexed_folders.contains(&kb_key) + }; - if path.to_lowercase().ends_with(".pdf") { - pdf_files_found += 1; - debug!("[GBKB] Detected PDF: {}", path); - } else { - trace!( - "Detected {} in .gbkb: {}", - if is_new { "new file" } else { "change" }, - path - ); - } + // Only remove if NOT already indexed + if !should_remove { + let mut indexed_folders = self.kb_indexed_folders.write().await; + indexed_folders.remove(&kb_key); + } + } + } + if let Some(prev_state) = file_states.get(path) { + if prev_state.fail_count >= MAX_FAIL_COUNT { + let elapsed = Utc::now() + .signed_duration_since(prev_state.last_failed_at.unwrap_or(Utc::now())); + if elapsed.num_seconds() < RETRY_BACKOFF_SECS { + continue; + } + } + } - debug!("[GBKB] Pushing to download queue: {}", path); - files_to_process.push(path.clone()); - files_processed += 1; - debug!("[GBKB] Queue size: {}/10", files_to_process.len()); + if path.to_lowercase().ends_with(".pdf") { + pdf_files_found += 1; + } + + files_to_process.push(path.clone()); + files_processed += 1; // REMOVED: Skip downloads if LLM is actively streaming - was causing deadlocks // 
#[cfg(any(feature = "research", feature = "llm"))] // if is_llm_streaming() { - // debug!("[GBKB] Skipping download - LLM is streaming, will retry later"); + // debug!("Skipping download - LLM is streaming, will retry later"); // files_to_process.clear(); // break; // } - if files_to_process.len() >= 10 { - debug!("[GBKB] Downloading batch of {} files", files_to_process.len()); - for file_path in std::mem::take(&mut files_to_process) { - debug!("[GBKB] Downloading: {}", file_path); - if let Err(e) = self.download_gbkb_file(client, &file_path).await { - log::error!("Failed to download .gbkb file {}: {}", file_path, e); - } - } - tokio::time::sleep(Duration::from_millis(100)).await; - } + if files_to_process.len() >= 10 { + for file_path in std::mem::take(&mut files_to_process) { + if let Err(e) = self.download_gbkb_file(client, &file_path).await { + log::error!("Failed to download .gbkb file {}: {}", file_path, e); + } + } + tokio::time::sleep(Duration::from_millis(100)).await; + } // Queue KB folder for indexing - only if not already indexed and no files changed let path_parts: Vec<&str> = path.split('/').collect(); @@ -1722,14 +1669,10 @@ let file_state = FileState { indexed_folders.contains(&kb_key) }; - if !already_indexed { - let mut pending = self.pending_kb_index.write().await; - if pending.insert(kb_key.clone()) { - debug!("[GBKB] Queued KB {} for indexing (non-blocking)", kb_key); - } - } else { - trace!("[GBKB] KB {} already indexed, skipping", kb_key); - } + if !already_indexed { + let mut pending = self.pending_kb_index.write().await; + pending.insert(kb_key.clone()); + } } } @@ -1744,16 +1687,12 @@ let file_state = FileState { // Download remaining files (less than 10) if !files_to_process.is_empty() { - debug!("[GBKB] Downloading final batch of {} files", files_to_process.len()); - for file_path in std::mem::take(&mut files_to_process) { - debug!("[GBKB] Downloading: {}", file_path); - if let Err(e) = self.download_gbkb_file(client, &file_path).await { 
- log::error!("Failed to download .gbkb file {}: {}", file_path, e); - } - } - } - - debug!("[GBKB] Processed {} files, {} PDFs found", files_processed, pdf_files_found); + for file_path in std::mem::take(&mut files_to_process) { + if let Err(e) = self.download_gbkb_file(client, &file_path).await { + log::error!("Failed to download .gbkb file {}: {}", file_path, e); + } + } + } let paths_to_remove: Vec = file_states .keys() @@ -1848,11 +1787,9 @@ let file_state = FileState { } } - debug!("[GBKB] check_gbkb_changes EXIT for bot {}", self.bot_id); - trace!("check_gbkb_changes EXIT"); - self.scanning.store(false, Ordering::Release); - Ok(()) - } + self.scanning.store(false, Ordering::Release); + Ok(()) + } async fn download_gbkb_file( &self, diff --git a/src/llm/mod.rs b/src/llm/mod.rs index 00629f6e..1e007d5c 100644 --- a/src/llm/mod.rs +++ b/src/llm/mod.rs @@ -227,8 +227,15 @@ impl OpenAIClient { })); } for (role, content) in history { + // Filter out internal roles not valid for OpenAI API + let api_role = match role.as_str() { + "user" | "assistant" | "system" | "developer" | "tool" => role.as_str(), + // Convert internal roles to valid API roles + "episodic" | "compact" => "system", + _ => "system", + }; messages.push(serde_json::json!({ - "role": role, + "role": api_role, "content": Self::sanitize_utf8(content) })); } diff --git a/src/main_module/bootstrap.rs b/src/main_module/bootstrap.rs index d1f692ce..3c720d4e 100644 --- a/src/main_module/bootstrap.rs +++ b/src/main_module/bootstrap.rs @@ -845,8 +845,8 @@ fn init_llm_provider( /// Start background services and monitors pub async fn start_background_services( - app_state: Arc, - pool: &crate::core::shared::utils::DbPool, + app_state: Arc, + _pool: &crate::core::shared::utils::DbPool, ) { use crate::core::shared::memory_monitor::{log_process_memory, start_memory_monitor}; @@ -890,8 +890,9 @@ pub async fn start_background_services( trace!("ensure_llama_servers_running completed"); } - // DISABLED: DriveMonitor 
for testing WebSocket/LLM response in isolation - // start_drive_monitors(app_state.clone(), pool).await; + // Start DriveMonitor for S3/MinIO file watching + #[cfg(feature = "drive")] + start_drive_monitors(app_state.clone(), _pool).await; // Local file monitoring removed - Drive (MinIO) is the only source now // #[cfg(feature = "local-files")] @@ -901,17 +902,17 @@ pub async fn start_background_services( // start_config_watcher(app_state.clone()).await; } -#[cfg(feature = "drive")] -async fn start_drive_monitors( + #[cfg(feature = "drive")] + async fn start_drive_monitors( app_state: Arc, - pool: &crate::core::shared::utils::DbPool, -) { + _pool: &crate::core::shared::utils::DbPool, + ) { use crate::core::shared::memory_monitor::register_thread; use crate::core::shared::models::schema::bots; use diesel::prelude::*; let drive_monitor_state = app_state.clone(); - let pool_clone = pool.clone(); + let pool_clone = _pool.clone(); let state_for_scan = app_state.clone(); tokio::spawn(async move {