fix: KB processor runs as background task, no longer blocks check_for_changes
All checks were successful
BotServer CI/CD / build (push) Successful in 3m50s

- Added start_kb_processor() method: long-running background task per bot
- check_gbkb_changes now queues KB folders to pending_kb_index (non-blocking)
- KB processor polls pending_kb_index and processes one at a time per bot
- Removed inline tokio::spawn from check_gbkb_changes that was causing 5min timeouts
- Added pending_kb_index field to DriveMonitor struct

This fixes the salesianos DriveMonitor timeout: check_for_changes now completes
in seconds instead of hanging on KB embedding/indexing.

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-04-12 21:28:03 -03:00
parent ad998b52d4
commit 112ac51da3

View file

@ -54,6 +54,8 @@ pub struct DriveMonitor {
consecutive_failures: Arc<AtomicU32>,
#[cfg(any(feature = "research", feature = "llm"))]
files_being_indexed: Arc<TokioRwLock<HashSet<String>>>,
#[cfg(any(feature = "research", feature = "llm"))]
pending_kb_index: Arc<TokioRwLock<HashSet<String>>>,
}
impl DriveMonitor {
fn normalize_config_value(value: &str) -> String {
@ -82,6 +84,8 @@ impl DriveMonitor {
consecutive_failures: Arc::new(AtomicU32::new(0)),
#[cfg(any(feature = "research", feature = "llm"))]
files_being_indexed: Arc::new(TokioRwLock::new(HashSet::new())),
#[cfg(any(feature = "research", feature = "llm"))]
pending_kb_index: Arc::new(TokioRwLock::new(HashSet::new())),
}
}
@ -215,6 +219,125 @@ impl DriveMonitor {
Duration::from_secs(backoff_secs.min(MAX_BACKOFF_SECS))
}
/// Start a long-running background KB processor that handles pending indexing requests.
///
/// Spawned once per bot from `start_monitoring`. The task polls `pending_kb_index`
/// and indexes one KB folder at a time, so `check_for_changes` can enqueue work and
/// return immediately instead of blocking on embedding/indexing. The loop exits when
/// `is_processing` is cleared (i.e. when the DriveMonitor shuts down).
#[cfg(any(feature = "research", feature = "llm"))]
pub fn start_kb_processor(&self) {
    let kb_manager = Arc::clone(&self.kb_manager);
    let bot_id = self.bot_id;
    let bot_name = self
        .bucket_name
        .strip_suffix(".gbai")
        .unwrap_or(&self.bucket_name)
        .to_string();
    let work_root = self.work_root.clone();
    let pending_kb_index = Arc::clone(&self.pending_kb_index);
    let files_being_indexed = Arc::clone(&self.files_being_indexed);
    let file_states = Arc::clone(&self.file_states);
    let is_processing = Arc::clone(&self.is_processing);

    tokio::spawn(async move {
        trace!("[KB_PROCESSOR] Starting for bot {} (bucket: {})", bot_name, bot_id);

        // Keep running as long as the DriveMonitor is active.
        while is_processing.load(std::sync::atomic::Ordering::SeqCst) {
            // Peek one pending KB folder. A read lock is sufficient here;
            // removal happens later under a separate write lock.
            let kb_key = {
                let pending = pending_kb_index.read().await;
                pending.iter().next().cloned()
            };

            let Some(kb_key) = kb_key else {
                // Nothing pending — wait and poll again.
                tokio::time::sleep(Duration::from_secs(5)).await;
                continue;
            };

            // Keys are built as "{bot_name}_{kb_name}". Prefer stripping the
            // known bot prefix: splitting on the first '_' would misparse bot
            // names that themselves contain underscores. Fall back to the old
            // split for keys that do not carry this bot's prefix.
            let kb_folder_name = kb_key
                .strip_prefix(&bot_name)
                .and_then(|rest| rest.strip_prefix('_'))
                .map(str::to_string)
                .or_else(|| kb_key.splitn(2, '_').nth(1).map(str::to_string));
            let Some(kb_folder_name) = kb_folder_name else {
                // Malformed key — drop it so it cannot wedge the queue.
                let mut pending = pending_kb_index.write().await;
                pending.remove(&kb_key);
                continue;
            };

            let kb_folder_path = work_root
                .join(&bot_name)
                .join(format!("{}.gbkb/", bot_name))
                .join(&kb_folder_name);

            // Skip (and dequeue) anything that is already being indexed.
            {
                let indexing = files_being_indexed.read().await;
                if indexing.contains(&kb_key) {
                    let mut pending = pending_kb_index.write().await;
                    pending.remove(&kb_key);
                    continue;
                }
            }

            // Mark as being indexed.
            {
                let mut indexing = files_being_indexed.write().await;
                indexing.insert(kb_key.clone());
            }

            trace!("[KB_PROCESSOR] Indexing KB: {} for bot: {}", kb_key, bot_name);

            // Perform the actual KB indexing, bounded so a hung embed/index
            // call cannot stall the processor forever.
            let result = tokio::time::timeout(
                Duration::from_secs(120),
                kb_manager.handle_gbkb_change(bot_id, &bot_name, kb_folder_path.as_path()),
            ).await;

            // Always clear tracking state, regardless of outcome.
            {
                let mut indexing = files_being_indexed.write().await;
                indexing.remove(&kb_key);
            }
            {
                let mut pending = pending_kb_index.write().await;
                pending.remove(&kb_key);
            }

            // Log per outcome, then apply one shared file-state update.
            let succeeded = match result {
                Ok(Ok(_)) => {
                    trace!("[KB_PROCESSOR] Successfully indexed KB: {}", kb_key);
                    true
                }
                Ok(Err(e)) => {
                    warn!("[KB_PROCESSOR] Failed to index KB {}: {}", kb_key, e);
                    false
                }
                Err(_) => {
                    error!("[KB_PROCESSOR] KB indexing timed out after 120s for {}", kb_key);
                    false
                }
            };

            // Hoist the pattern out of the loop to avoid a per-entry allocation.
            let folder_pattern = format!("{}/", kb_folder_name);
            let now = chrono::Utc::now();
            let mut states = file_states.write().await;
            for (path, state) in states.iter_mut() {
                if path.contains(&folder_pattern) {
                    if succeeded {
                        state.indexed = true;
                        state.fail_count = 0;
                        state.last_failed_at = None;
                    } else {
                        state.fail_count = state.fail_count.saturating_add(1);
                        state.last_failed_at = Some(now);
                    }
                }
            }
        }
        trace!("[KB_PROCESSOR] Stopping for bot {}", bot_name);
    });
}
pub async fn start_monitoring(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
trace!("start_monitoring ENTER");
let start_mem = MemoryStats::current();
@ -248,6 +371,10 @@ impl DriveMonitor {
self.is_processing
.store(true, std::sync::atomic::Ordering::SeqCst);
// Start the background KB processor - one instance per bot
#[cfg(any(feature = "research", feature = "llm"))]
self.start_kb_processor();
trace!("start_monitoring: calling check_for_changes...");
trace!("Calling initial check_for_changes...");
@ -1403,178 +1530,64 @@ impl DriveMonitor {
let path_parts: Vec<&str> = path.split('/').collect();
if path_parts.len() >= 3 {
let kb_name = path_parts[1];
let kb_folder_path = self
.work_root
.join(bot_name)
.join(&gbkb_prefix)
.join(kb_name);
let kb_key = format!("{}_{}", bot_name, kb_name);
#[cfg(any(feature = "research", feature = "llm"))]
{
if !is_embedding_server_ready() {
info!("Embedding server not yet marked ready, KB indexing will wait for it");
}
// Create a unique key for this KB folder to track indexing state
let kb_key = format!("{}_{}", bot_name, kb_name);
// Check fail_count for this KB folder - implement backoff
{
let states = self.file_states.read().await;
let kb_folder_pattern = format!("{}/", kb_name);
// Filter only files in this KB folder
let kb_states: Vec<_> = states.iter()
.filter(|(path, _)| path.contains(&kb_folder_pattern))
.collect();
if kb_states.is_empty() {
// No files in file_states yet for this KB - proceed with indexing
} else {
let max_fail_count = kb_states.iter()
.map(|(_, s)| s.fail_count)
.max()
.unwrap_or(0);
// Backoff: wait longer based on fail count
// fail_count 0: no wait, 1: 5min, 2: 15min, 3+: 1h
if max_fail_count > 0 {
let wait_seconds = match max_fail_count {
1 => 300, // 5 min
2 => 900, // 15 min
_ => 3600, // 1 hour
};
if let Some(last_failed) = kb_states.iter()
.filter_map(|(_, s)| s.last_failed_at)
.max()
// Check fail_count for backoff
let mut should_skip = false;
{
let elapsed = chrono::Utc::now() - last_failed;
if elapsed.num_seconds() < wait_seconds {
trace!("[DRIVE_MONITOR] KB folder {} in backoff (fail_count={}, elapsed={}s < {}s), skipping",
kb_key, max_fail_count, elapsed.num_seconds(), wait_seconds);
continue;
}
}
}
}
}
// Check if this KB folder is already being indexed
{
let indexing_set = self.files_being_indexed.read().await;
if indexing_set.contains(&kb_key) {
debug!("[DRIVE_MONITOR] KB folder {} already being indexed, skipping duplicate task", kb_key);
continue;
}
}
// Mark this KB folder as being indexed
{
let mut indexing_set = self.files_being_indexed.write().await;
indexing_set.insert(kb_key.clone());
}
let kb_manager = Arc::clone(&self.kb_manager);
let bot_id = self.bot_id;
let bot_name_owned = bot_name.to_string();
let kb_name_owned = kb_name.to_string();
let kb_folder_owned = kb_folder_path.clone();
let _files_being_indexed = Arc::clone(&self.files_being_indexed);
let file_key = Arc::clone(&self.files_being_indexed);
let kb_key_owned = kb_key.clone();
let file_states = Arc::clone(&self.file_states);
let work_root = self.work_root.clone();
let bucket_name = self.bucket_name.clone();
let gbkb_prefix_owned = gbkb_prefix.clone();
let bot_name_for_spawn = bot_name.to_string();
tokio::spawn(async move {
trace!(
"Triggering KB indexing for folder: {} (PDF text extraction enabled)",
kb_folder_owned.display()
);
let result = tokio::time::timeout(
Duration::from_secs(60),
kb_manager.handle_gbkb_change(bot_id, &bot_name_owned, &kb_folder_owned),
)
.await;
// Always remove from tracking set when done, regardless of outcome
{
let mut indexing_set = file_key.write().await;
indexing_set.remove(&kb_key_owned);
}
let kb_prefix = format!("{}/", gbkb_prefix_owned);
let kb_folder_name = kb_folder_owned
.strip_prefix(&work_root)
.ok()
.and_then(|p| p.to_str())
.unwrap_or("")
.trim_start_matches(&format!("{}/", bot_name_for_spawn))
.trim_start_matches(&kb_prefix)
.to_string();
{
let mut states = file_states.write().await;
for (path, state) in states.iter_mut() {
if path.starts_with(&format!("{}{}/", kb_prefix, kb_folder_name)) {
match &result {
Ok(Ok(_)) => {
state.indexed = true;
state.fail_count = 0;
state.last_failed_at = None;
}
Ok(Err(_)) | Err(_) => {
state.fail_count = state.fail_count.saturating_add(1);
state.last_failed_at = Some(chrono::Utc::now());
let states = self.file_states.read().await;
let kb_folder_pattern = format!("{}/", kb_name);
let kb_states: Vec<_> = states.iter()
.filter(|(p, _)| p.contains(&kb_folder_pattern))
.collect();
if !kb_states.is_empty() {
let max_fail_count = kb_states.iter()
.map(|(_, s)| s.fail_count)
.max()
.unwrap_or(0);
if max_fail_count > 0 {
let wait_seconds = match max_fail_count {
1 => 300,
2 => 900,
_ => 3600,
};
if let Some(last_failed) = kb_states.iter()
.filter_map(|(_, s)| s.last_failed_at)
.max()
{
let elapsed = chrono::Utc::now() - last_failed;
if elapsed.num_seconds() < wait_seconds {
trace!("[DRIVE_MONITOR] KB {} in backoff (fail_count={}), skipping",
kb_key, max_fail_count);
should_skip = true;
}
}
}
}
}
}
let states_clone = Arc::clone(&file_states);
let work_root_clone = work_root.clone();
let bucket_name_clone = bucket_name.clone();
tokio::spawn(async move {
if let Err(e) = Self::save_file_states_static(&states_clone, &work_root_clone, &bucket_name_clone).await {
warn!("Failed to save file states after indexing update: {}", e);
}
});
}
match result {
Ok(Ok(_)) => {
debug!(
"Successfully processed KB change for {}/{}",
bot_name_owned, kb_name_owned
);
}
Ok(Err(e)) => {
log::error!(
"Failed to process .gbkb change for {}/{}: {}",
bot_name_owned,
kb_name_owned,
e
);
}
Err(_) => {
log::error!(
"KB indexing timed out after 60s for {}/{}",
bot_name_owned, kb_name_owned
);
}
}
});
if !should_skip {
// Check if already being indexed
let indexing_set = self.files_being_indexed.read().await;
let already_indexing = indexing_set.contains(&kb_key);
drop(indexing_set);
if !already_indexing {
// Queue for background KB processor - no blocking!
let mut pending = self.pending_kb_index.write().await;
if pending.insert(kb_key.clone()) {
trace!("[DRIVE_MONITOR] Queued KB {} for indexing (non-blocking)", kb_key);
}
}
}
}
#[cfg(not(any(feature = "research", feature = "llm")))]
{
let _ = kb_folder_path;
debug!(
"KB indexing disabled because research/llm features are not enabled"
);
let _ = kb_name;
debug!("KB indexing disabled (research/llm features not enabled)");
}
}
}