fix: DEADLOCK in check_gbkb_changes - removed nested file_states read lock
All checks were successful
BotServer CI/CD / build (push) Successful in 3m44s
Root cause: file_states.write().await was held while the code tried to acquire file_states.read().await for the KB backoff check. Tokio's RwLock is not reentrant, so this caused a permanent deadlock. Fix: removed the file_states.read() backoff check from this path. The KB processor now only checks the files_being_indexed set and queues the KB into pending_kb_index; backoff is handled by the KB processor itself based on fail_count. This fixes the salesianos DriveMonitor hanging for 5+ minutes every cycle. Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
parent
3322234712
commit
666acb9360
1 changed file with 12 additions and 45 deletions
|
|
@ -1540,8 +1540,10 @@ impl DriveMonitor {
|
|||
);
|
||||
}
|
||||
|
||||
debug!("[GBKB] Pushing to download queue: {}", path);
|
||||
files_to_process.push(path.clone());
|
||||
files_processed += 1;
|
||||
debug!("[GBKB] Queue size: {}/10", files_to_process.len());
|
||||
|
||||
if files_to_process.len() >= 10 {
|
||||
debug!("[GBKB] Downloading batch of {} files", files_to_process.len());
|
||||
|
|
@ -1554,6 +1556,7 @@ impl DriveMonitor {
|
|||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
|
||||
// Queue KB folder for indexing - no read lock needed here
|
||||
let path_parts: Vec<&str> = path.split('/').collect();
|
||||
if path_parts.len() >= 3 {
|
||||
let kb_name = path_parts[1];
|
||||
|
|
@ -1561,52 +1564,16 @@ impl DriveMonitor {
|
|||
|
||||
#[cfg(any(feature = "research", feature = "llm"))]
|
||||
{
|
||||
// Check fail_count for backoff
|
||||
let mut should_skip = false;
|
||||
{
|
||||
let states = self.file_states.read().await;
|
||||
let kb_folder_pattern = format!("{}/", kb_name);
|
||||
let kb_states: Vec<_> = states.iter()
|
||||
.filter(|(p, _)| p.contains(&kb_folder_pattern))
|
||||
.collect();
|
||||
if !kb_states.is_empty() {
|
||||
let max_fail_count = kb_states.iter()
|
||||
.map(|(_, s)| s.fail_count)
|
||||
.max()
|
||||
.unwrap_or(0);
|
||||
if max_fail_count > 0 {
|
||||
let wait_seconds = match max_fail_count {
|
||||
1 => 300,
|
||||
2 => 900,
|
||||
_ => 3600,
|
||||
};
|
||||
if let Some(last_failed) = kb_states.iter()
|
||||
.filter_map(|(_, s)| s.last_failed_at)
|
||||
.max()
|
||||
{
|
||||
let elapsed = chrono::Utc::now() - last_failed;
|
||||
if elapsed.num_seconds() < wait_seconds {
|
||||
trace!("[DRIVE_MONITOR] KB {} in backoff (fail_count={}), skipping",
|
||||
kb_key, max_fail_count);
|
||||
should_skip = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Check if already being indexed (no read lock on file_states needed)
|
||||
let indexing_set = self.files_being_indexed.read().await;
|
||||
let already_indexing = indexing_set.contains(&kb_key);
|
||||
drop(indexing_set);
|
||||
|
||||
if !should_skip {
|
||||
// Check if already being indexed
|
||||
let indexing_set = self.files_being_indexed.read().await;
|
||||
let already_indexing = indexing_set.contains(&kb_key);
|
||||
drop(indexing_set);
|
||||
|
||||
if !already_indexing {
|
||||
// Queue for background KB processor - no blocking!
|
||||
let mut pending = self.pending_kb_index.write().await;
|
||||
if pending.insert(kb_key.clone()) {
|
||||
trace!("[DRIVE_MONITOR] Queued KB {} for indexing (non-blocking)", kb_key);
|
||||
}
|
||||
if !already_indexing {
|
||||
// Queue for background KB processor - no blocking!
|
||||
let mut pending = self.pending_kb_index.write().await;
|
||||
if pending.insert(kb_key.clone()) {
|
||||
debug!("[GBKB] Queued KB {} for indexing (non-blocking)", kb_key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue