From 3bb115266b80bd9154c75d7692f95373e7452130 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Thu, 19 Mar 2026 19:51:28 -0300 Subject: [PATCH] feat: Add GUID prefix to Qdrant collection names for KB security isolation --- src/basic/keywords/detect.rs | 159 +++++++++++++++++++++++++ src/basic/keywords/mod.rs | 2 + src/basic/keywords/think_kb.rs | 6 +- src/basic/keywords/use_kb.rs | 4 +- src/basic/keywords/use_website.rs | 4 +- src/basic/mod.rs | 2 + src/core/bot/kb_context.rs | 8 +- src/core/bot/mod.rs | 1 + src/core/kb/kb_indexer.rs | 10 +- src/core/kb/mod.rs | 27 +++-- src/core/kb/website_crawler_service.rs | 4 +- src/drive/drive_monitor/mod.rs | 5 +- src/drive/local_file_monitor.rs | 85 +++++++++++++ 13 files changed, 292 insertions(+), 25 deletions(-) create mode 100644 src/basic/keywords/detect.rs diff --git a/src/basic/keywords/detect.rs b/src/basic/keywords/detect.rs new file mode 100644 index 00000000..fe959202 --- /dev/null +++ b/src/basic/keywords/detect.rs @@ -0,0 +1,159 @@ +use crate::core::shared::models::UserSession; +use crate::core::shared::state::AppState; +use diesel::prelude::*; +use diesel::sql_types::*; +use log::error; +use rhai::{Dynamic, Engine}; +use serde_json::Value; +use std::sync::Arc; + +#[derive(Debug, QueryableByName)] +struct ColumnRow { + #[diesel(sql_type = Text)] + column_name: String, +} + +pub fn detect_keyword(state: Arc, _user: UserSession, engine: &mut Engine) { + let state_clone = Arc::clone(&state); + + engine + .register_custom_syntax(["DETECT", "$expr$"], false, move |context, inputs| { + let first_input = inputs.first().ok_or_else(|| { + Box::new(rhai::EvalAltResult::ErrorRuntime( + "DETECT requires a table name".into(), + rhai::Position::NONE, + )) + })?; + let table_name = context.eval_expression_tree(first_input)?.to_string(); + + let state_for_thread = Arc::clone(&state_clone); + let (tx, rx) = std::sync::mpsc::channel(); + + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build(); + + let send_err = if let Ok(rt) = rt { + let result = rt.block_on(async move { + detect_anomalies_in_table(state_for_thread, &table_name).await + }); + tx.send(result).err() + } else { + tx.send(Err("failed to build tokio runtime".into())).err() + }; + + if send_err.is_some() { + error!("Failed to send DETECT thread result"); + } + }); + + match rx.recv_timeout(std::time::Duration::from_secs(300)) { + Ok(Ok(result)) => Ok(Dynamic::from(result)), + Ok(Err(e)) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime( + e.to_string().into(), + rhai::Position::NONE, + ))), + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { + Err(Box::new(rhai::EvalAltResult::ErrorRuntime( + "Detection timed out".into(), + rhai::Position::NONE, + ))) + } + Err(e) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime( + format!("DETECT thread failed: {e}").into(), + rhai::Position::NONE, + ))), + } + }) + .expect("valid syntax registration"); +} + +async fn detect_anomalies_in_table( + state: Arc, + table_name: &str, +) -> Result> { + let columns = get_table_columns(&state, table_name)?; + let value_field = find_numeric_field(&columns); + + #[derive(QueryableByName)] + struct JsonRow { + #[diesel(sql_type = Text)] + data: String, + } + + let column_list = columns.join(", "); + let query = format!( + "SELECT row_to_json(t)::text as data FROM (SELECT {} FROM {} LIMIT 500) t", + column_list, table_name + ); + + let rows: Vec = diesel::sql_query(&query) + .load(&mut state.conn.get()?)?; + + let records: Vec = rows + .into_iter() + .filter_map(|row| serde_json::from_str(&row.data).ok()) + .collect(); + + if records.is_empty() { + return Err(format!("No data found in table {}", table_name).into()); + } + + let botmodels_host = + std::env::var("BOTMODELS_HOST").unwrap_or_else(|_| "http://localhost:8085".to_string()); + let botmodels_key = + std::env::var("BOTMODELS_API_KEY").unwrap_or_else(|_| "starter".to_string()); + + let client = reqwest::Client::new(); + let response = client + .post(format!("{}/api/anomaly/detect", botmodels_host)) + .header("X-API-Key", &botmodels_key) + .json(&serde_json::json!({ + "data": records, + "value_field": value_field + })) + .send() + .await?; + + if !response.status().is_success() { + let error_text = response.text().await?; + return Err(format!("BotModels error: {}", error_text).into()); + } + + let result: Value = response.json().await?; + Ok(result.to_string()) +} + +fn get_table_columns( + state: &Arc, + table_name: &str, +) -> Result, Box> { + let query = format!( + "SELECT column_name FROM information_schema.columns WHERE table_name = '{}' ORDER BY ordinal_position", + table_name + ); + + let rows: Vec = diesel::sql_query(&query) + .load(&mut state.conn.get()?)?; + + Ok(rows.into_iter().map(|r| r.column_name).collect()) +} + +fn find_numeric_field(columns: &[String]) -> String { + let numeric_keywords = ["salario", "salary", "valor", "value", "amount", "preco", "price", + "temperatura", "temp", "pressao", "pressure", "quantidade", "quantity", + "decimal", "numerico", "numeric", "base", "liquido", "bruto"]; + + for col in columns { + let col_lower = col.to_lowercase(); + for keyword in &numeric_keywords { + if col_lower.contains(keyword) { + return col.clone(); + } + } + } + + columns.first().cloned().unwrap_or_else(|| "value".to_string()) +} diff --git a/src/basic/keywords/mod.rs b/src/basic/keywords/mod.rs index d563b404..0c56f1ef 100644 --- a/src/basic/keywords/mod.rs +++ b/src/basic/keywords/mod.rs @@ -49,6 +49,7 @@ pub mod llm_keyword; #[cfg(feature = "llm")] pub mod llm_macros; pub mod math; +pub mod detect; #[cfg(feature = "automation")] pub mod mcp_client; #[cfg(feature = "automation")] @@ -207,6 +208,7 @@ pub fn get_all_keywords() -> Vec { "CLEAR TOOLS".to_string(), "CREATE SITE".to_string(), "CREATE TASK".to_string(), + "DETECT".to_string(), "USE TOOL".to_string(), "AGGREGATE".to_string(), "DELETE".to_string(), diff --git a/src/basic/keywords/think_kb.rs b/src/basic/keywords/think_kb.rs index d84b9ac7..fc8290c0 100644 --- a/src/basic/keywords/think_kb.rs +++ b/src/basic/keywords/think_kb.rs @@ -39,6 +39,7 @@ pub fn register_think_kb_keyword( ); let session_id = session_clone.id; + let bot_id = session_clone.bot_id; let bot_name = session_clone.bot_name.clone(); let kb_manager = match &state_clone.kb_manager { Some(manager) => Arc::clone(manager), @@ -52,7 +53,7 @@ pub fn register_think_kb_keyword( // Execute KB search in blocking thread let result = std::thread::spawn(move || { tokio::runtime::Handle::current().block_on(async { - think_kb_search(kb_manager, db_pool, session_id, &bot_name, &query).await + think_kb_search(kb_manager, db_pool, session_id, bot_id, &bot_name, &query).await }) }) .join(); @@ -92,6 +93,7 @@ async fn think_kb_search( kb_manager: Arc, db_pool: crate::core::shared::utils::DbPool, session_id: uuid::Uuid, + bot_id: uuid::Uuid, bot_name: &str, query: &str, ) -> Result { @@ -99,7 +101,7 @@ async fn think_kb_search( // Search active KBs with reasonable limits let kb_contexts = context_manager - .search_active_kbs(session_id, bot_name, query, 10, 2000) + .search_active_kbs(session_id, bot_id, bot_name, query, 10, 2000) .await .map_err(|e| format!("KB search failed: {}", e))?; diff --git a/src/basic/keywords/use_kb.rs b/src/basic/keywords/use_kb.rs index 51acd8a5..78260ca6 100644 --- a/src/basic/keywords/use_kb.rs +++ b/src/basic/keywords/use_kb.rs @@ -103,14 +103,14 @@ fn add_kb_to_session( (kb_result.folder_path, kb_result.qdrant_collection) } else { let default_path = format!("work/{}/{}.gbkb/{}", bot_name, bot_name, kb_name); - let default_collection = format!("{}_{}", bot_name, kb_name); + let kb_id = Uuid::new_v4(); + let default_collection = format!("{}_{}_{}", bot_name, kb_id, kb_name); warn!( "KB '{}' not found in kb_collections for bot {}. Using default path: {}", kb_name, bot_name, default_path ); - let kb_id = Uuid::new_v4(); diesel::sql_query( "INSERT INTO kb_collections (id, bot_id, name, folder_path, qdrant_collection, document_count) VALUES ($1, $2, $3, $4, $5, 0) diff --git a/src/basic/keywords/use_website.rs b/src/basic/keywords/use_website.rs index cbf9993e..2d0d1779 100644 --- a/src/basic/keywords/use_website.rs +++ b/src/basic/keywords/use_website.rs @@ -432,9 +432,11 @@ fn associate_website_with_session_refresh( .get_result(&mut conn) .map_err(|e| format!("Failed to get bot name: {}", e))?; + let bot_id_short = user.bot_id.to_string().chars().take(8).collect::(); let collection_name = format!( - "{}_website_{}", + "{}_{}_website_{}", bot_name_result.name, + bot_id_short, sanitize_url_for_collection(url) ); diff --git a/src/basic/mod.rs b/src/basic/mod.rs index 93925035..9cb12885 100644 --- a/src/basic/mod.rs +++ b/src/basic/mod.rs @@ -55,6 +55,7 @@ use self::keywords::web_data::register_web_data_keywords; use self::keywords::webhook::webhook_keyword; #[cfg(feature = "llm")] use self::keywords::llm_keyword::llm_keyword; +use self::keywords::detect::detect_keyword; use self::keywords::on::on_keyword; use self::keywords::print::print_keyword; use self::keywords::set::set_keyword; @@ -146,6 +147,7 @@ impl ScriptService { format_keyword(&mut engine); #[cfg(feature = "llm")] llm_keyword(state.clone(), user.clone(), &mut engine); + detect_keyword(state.clone(), user.clone(), &mut engine); get_keyword(state.clone(), user.clone(), &mut engine); set_keyword(&state, user.clone(), &mut engine); wait_keyword(&state, user.clone(), &mut engine); diff --git a/src/core/bot/kb_context.rs b/src/core/bot/kb_context.rs index 0cf12ce9..81d82e0a 100644 --- a/src/core/bot/kb_context.rs +++ b/src/core/bot/kb_context.rs @@ -123,6 +123,7 @@ impl KbContextManager { pub async fn search_active_kbs( &self, session_id: Uuid, + bot_id: Uuid, bot_name: &str, query: &str, max_results_per_kb: usize, @@ -153,6 +154,7 @@ impl KbContextManager { match self .search_single_kb( + bot_id, bot_name, &kb_assoc.kb_name, query, @@ -352,6 +354,7 @@ impl KbContextManager { async fn search_single_kb( &self, + bot_id: Uuid, bot_name: &str, kb_name: &str, query: &str, @@ -362,7 +365,7 @@ impl KbContextManager { let search_results = self .kb_manager - .search(bot_name, kb_name, query, max_results) + .search(bot_id, bot_name, kb_name, query, max_results) .await?; let mut kb_search_results = Vec::new(); @@ -468,6 +471,7 @@ pub async fn inject_kb_context( kb_manager: Arc, db_pool: DbPool, session_id: Uuid, + bot_id: Uuid, bot_name: &str, user_query: &str, messages: &mut serde_json::Value, @@ -476,7 +480,7 @@ pub async fn inject_kb_context( let context_manager = KbContextManager::new(kb_manager.clone(), db_pool.clone()); let kb_contexts = context_manager - .search_active_kbs(session_id, bot_name, user_query, 5, max_context_tokens / 2) + .search_active_kbs(session_id, bot_id, bot_name, user_query, 5, max_context_tokens / 2) .await?; let website_contexts = context_manager diff --git a/src/core/bot/mod.rs b/src/core/bot/mod.rs index 4c063a04..6645d0de 100644 --- a/src/core/bot/mod.rs +++ b/src/core/bot/mod.rs @@ -692,6 +692,7 @@ impl BotOrchestrator { kb_manager.clone(), self.state.conn.clone(), session_id, + session.bot_id, &bot_name_for_context, &message_content, &mut messages, diff --git a/src/core/kb/kb_indexer.rs b/src/core/kb/kb_indexer.rs index b4d6bce6..036253fa 100644 --- a/src/core/kb/kb_indexer.rs +++ b/src/core/kb/kb_indexer.rs @@ -116,6 +116,7 @@ impl KbIndexer { pub async fn index_kb_folder( &self, + bot_id: Uuid, bot_name: &str, kb_name: &str, kb_path: &Path, @@ -145,12 +146,13 @@ impl KbIndexer { self.qdrant_config.url ); return Err(anyhow::anyhow!( - "Qdrant vector database not available at {}. Start the vector_db service to enable KB indexing.", + "Qdrant vector database is not available at {}. Start the vector_db service to enable KB indexing.", self.qdrant_config.url )); } - let collection_name = format!("{}_{}", bot_name, kb_name); + let bot_id_short = bot_id.to_string().chars().take(8).collect::(); + let collection_name = format!("{}_{}_{}", bot_name, bot_id_short, kb_name); self.ensure_collection_exists(&collection_name).await?; @@ -746,7 +748,7 @@ impl KbFolderMonitor { Self { indexer, work_root } } - pub async fn process_gbkb_folder(&self, bot_name: &str, kb_folder: &Path) -> Result<()> { + pub async fn process_gbkb_folder(&self, bot_id: Uuid, bot_name: &str, kb_folder: &Path) -> Result<()> { let kb_name = kb_folder .file_name() .and_then(|n| n.to_str()) @@ -762,7 +764,7 @@ impl KbFolderMonitor { let result = self .indexer - .index_kb_folder(bot_name, kb_name, &local_path) + .index_kb_folder(bot_id, bot_name, kb_name, &local_path) .await?; info!( diff --git a/src/core/kb/mod.rs b/src/core/kb/mod.rs index 9d12ff7e..3f515fc5 100644 --- a/src/core/kb/mod.rs +++ b/src/core/kb/mod.rs @@ -18,6 +18,7 @@ use log::{error, info, warn}; use std::path::Path; use std::sync::Arc; use tokio::sync::RwLock; +use uuid::Uuid; #[derive(Debug)] pub struct KnowledgeBaseManager { @@ -48,6 +49,7 @@ impl KnowledgeBaseManager { pub async fn index_kb_folder( &self, + bot_id: Uuid, bot_name: &str, kb_name: &str, kb_path: &Path, @@ -61,7 +63,7 @@ impl KnowledgeBaseManager { let result = self .indexer - .index_kb_folder(bot_name, kb_name, kb_path) + .index_kb_folder(bot_id, bot_name, kb_name, kb_path) .await?; info!( @@ -74,12 +76,14 @@ impl KnowledgeBaseManager { pub async fn search( &self, + bot_id: Uuid, bot_name: &str, kb_name: &str, query: &str, limit: usize, ) -> Result> { - let collection_name = format!("{}_{}", bot_name, kb_name); + let bot_id_short = bot_id.to_string().chars().take(8).collect::(); + let collection_name = format!("{}_{}_{}", bot_name, bot_id_short, kb_name); self.indexer.search(&collection_name, query, limit).await } @@ -96,7 +100,7 @@ impl KnowledgeBaseManager { self.processor.process_document(file_path).await } - pub async fn handle_gbkb_change(&self, bot_name: &str, kb_folder: &Path) -> Result<()> { + pub async fn handle_gbkb_change(&self, bot_id: Uuid, bot_name: &str, kb_folder: &Path) -> Result<()> { info!( "Handling .gbkb folder change for bot {} at {}", bot_name, @@ -104,11 +108,12 @@ impl KnowledgeBaseManager { ); let monitor = self.monitor.read().await; - monitor.process_gbkb_folder(bot_name, kb_folder).await + monitor.process_gbkb_folder(bot_id, bot_name, kb_folder).await } - pub async fn clear_kb(&self, bot_name: &str, kb_name: &str) -> Result<()> { - let collection_name = format!("{}_{}", bot_name, kb_name); + pub async fn clear_kb(&self, bot_id: Uuid, bot_name: &str, kb_name: &str) -> Result<()> { + let bot_id_short = bot_id.to_string().chars().take(8).collect::(); + let collection_name = format!("{}_{}_{}", bot_name, bot_id_short, kb_name); warn!("Clearing knowledge base collection: {}", collection_name); @@ -124,8 +129,9 @@ impl KnowledgeBaseManager { } } - pub async fn get_kb_stats(&self, bot_name: &str, kb_name: &str) -> Result { - let collection_name = format!("{}_{}", bot_name, kb_name); + pub async fn get_kb_stats(&self, bot_id: Uuid, bot_name: &str, kb_name: &str) -> Result { + let bot_id_short = bot_id.to_string().chars().take(8).collect::(); + let collection_name = format!("{}_{}_{}", bot_name, bot_id_short, kb_name); let collection_info = self.indexer.get_collection_info(&collection_name).await?; @@ -168,6 +174,7 @@ impl DriveMonitorIntegration { pub async fn on_gbkb_folder_changed( &self, + bot_id: Uuid, bot_name: &str, folder_path: &Path, change_type: ChangeType, @@ -180,12 +187,12 @@ impl DriveMonitorIntegration { folder_path.display() ); self.kb_manager - .handle_gbkb_change(bot_name, folder_path) + .handle_gbkb_change(bot_id, bot_name, folder_path) .await } ChangeType::Deleted => { if let Some(kb_name) = folder_path.file_name().and_then(|n| n.to_str()) { - self.kb_manager.clear_kb(bot_name, kb_name).await + self.kb_manager.clear_kb(bot_id, bot_name, kb_name).await } else { Err(anyhow::anyhow!("Invalid KB folder path")) } diff --git a/src/core/kb/website_crawler_service.rs b/src/core/kb/website_crawler_service.rs index 352cb638..b58026d3 100644 --- a/src/core/kb/website_crawler_service.rs +++ b/src/core/kb/website_crawler_service.rs @@ -281,7 +281,7 @@ impl WebsiteCrawlerService { // Process this batch immediately to free memory if batch_idx == 0 || (batch_idx + 1) % 2 == 0 { // Index every 2 batches to prevent memory buildup - match bot_indexer.index_kb_folder(&bot_name, &kb_name, &work_path).await { + match bot_indexer.index_kb_folder(website.bot_id, &bot_name, &kb_name, &work_path).await { Ok(result) => trace!("Indexed batch {} successfully: {} docs, {} chunks", batch_idx + 1, result.documents_processed, result.chunks_indexed), Err(e) => warn!("Failed to index batch {}: {}", batch_idx + 1, e), @@ -294,7 +294,7 @@ impl WebsiteCrawlerService { // Final indexing for any remaining content bot_indexer - .index_kb_folder(&bot_name, &kb_name, &work_path) + .index_kb_folder(website.bot_id, &bot_name, &kb_name, &work_path) .await?; config.calculate_next_crawl(); diff --git a/src/drive/drive_monitor/mod.rs b/src/drive/drive_monitor/mod.rs index 999c67cb..38f2b0df 100644 --- a/src/drive/drive_monitor/mod.rs +++ b/src/drive/drive_monitor/mod.rs @@ -1171,6 +1171,7 @@ impl DriveMonitor { } let kb_manager = Arc::clone(&self.kb_manager); + let bot_id = self.bot_id; let bot_name_owned = bot_name.to_string(); let kb_name_owned = kb_name.to_string(); let kb_folder_owned = kb_folder_path.clone(); @@ -1185,7 +1186,7 @@ impl DriveMonitor { let result = tokio::time::timeout( Duration::from_secs(KB_INDEXING_TIMEOUT_SECS), - kb_manager.handle_gbkb_change(&bot_name_owned, &kb_folder_owned), + kb_manager.handle_gbkb_change(bot_id, &bot_name_owned, &kb_folder_owned), ) .await; @@ -1274,7 +1275,7 @@ impl DriveMonitor { let kb_prefix = format!("{}{}/", gbkb_prefix, kb_name); if !file_states.keys().any(|k| k.starts_with(&kb_prefix)) { #[cfg(any(feature = "research", feature = "llm"))] - if let Err(e) = self.kb_manager.clear_kb(bot_name, kb_name).await { + if let Err(e) = self.kb_manager.clear_kb(self.bot_id, bot_name, kb_name).await { log::error!("Failed to clear KB {}: {}", kb_name, e); } diff --git a/src/drive/local_file_monitor.rs b/src/drive/local_file_monitor.rs index a5a7c2d1..96f1bbc9 100644 --- a/src/drive/local_file_monitor.rs +++ b/src/drive/local_file_monitor.rs @@ -1,4 +1,5 @@ use crate::basic::compiler::BasicCompiler; +use crate::core::kb::{EmbeddingConfig, KnowledgeBaseManager}; use crate::core::shared::state::AppState; use diesel::prelude::*; use log::{debug, error, info, trace, warn}; @@ -26,6 +27,8 @@ pub struct LocalFileMonitor { work_root: PathBuf, file_states: Arc>>, is_processing: Arc, + #[cfg(any(feature = "research", feature = "llm"))] + kb_manager: Option>, } impl LocalFileMonitor { @@ -38,6 +41,15 @@ impl LocalFileMonitor { // Use /opt/gbo/data as the base directory for source files let data_dir = PathBuf::from("/opt/gbo/data"); + #[cfg(any(feature = "research", feature = "llm"))] + let kb_manager = match &state.kb_manager { + Some(km) => Some(Arc::clone(km)), + None => { + debug!("KB manager not available in LocalFileMonitor"); + None + } + }; + trace!("Initializing with data_dir: {:?}, work_root: {:?}", data_dir, work_root); Self { @@ -46,6 +58,8 @@ impl LocalFileMonitor { work_root, file_states: Arc::new(RwLock::new(HashMap::new())), is_processing: Arc::new(AtomicBool::new(false)), + #[cfg(any(feature = "research", feature = "llm"))] + kb_manager, } } @@ -191,6 +205,75 @@ impl LocalFileMonitor { if gbdialog_path.exists() { self.compile_gbdialog(bot_name, &gbdialog_path).await?; } + + // Index .gbkb folders + #[cfg(any(feature = "research", feature = "llm"))] + { + if let Some(ref kb_manager) = self.kb_manager { + let gbkb_path = path.join(format!("{}.gbkb", bot_name)); + if gbkb_path.exists() { + if let Err(e) = self.index_gbkb_folder(bot_name, &gbkb_path, kb_manager).await { + error!("Failed to index .gbkb folder {:?}: {}", gbkb_path, e); + } + } + } + } + } + } + + Ok(()) + } + + #[cfg(any(feature = "research", feature = "llm"))] + async fn index_gbkb_folder( + &self, + bot_name: &str, + gbkb_path: &Path, + _kb_manager: &Arc, + ) -> Result<(), Box> { + info!("Indexing .gbkb folder for bot {}: {:?}", bot_name, gbkb_path); + + // Get bot_id from database + let bot_id = { + use crate::core::shared::models::schema::bots::dsl::*; + let mut conn = self.state.conn.get() + .map_err(|e| format!("Failed to get DB connection: {}", e))?; + + bots.filter(name.eq(bot_name)) + .select(id) + .first::(&mut *conn) + .map_err(|e| format!("Failed to get bot_id for '{}': {}", bot_name, e))? + }; + + // Load bot-specific embedding config from database + let embedding_config = EmbeddingConfig::from_bot_config(&self.state.conn, &bot_id); + info!("Using embedding config for bot '{}': URL={}, model={}", + bot_name, embedding_config.embedding_url, embedding_config.embedding_model); + + // Create a temporary KbIndexer with the bot-specific config + let qdrant_config = crate::core::kb::QdrantConfig::default(); + let indexer = crate::core::kb::KbIndexer::new(embedding_config, qdrant_config); + + // Index each KB folder inside .gbkb (e.g., carta, proc) + let entries = tokio::fs::read_dir(gbkb_path).await?; + let mut entries = entries; + + while let Some(entry) = entries.next_entry().await? { + let kb_folder_path = entry.path(); + + if kb_folder_path.is_dir() { + if let Some(kb_name) = kb_folder_path.file_name().and_then(|n| n.to_str()) { + info!("Indexing KB '{}' for bot '{}'", kb_name, bot_name); + + if let Err(e) = indexer.index_kb_folder( + bot_id, + bot_name, + kb_name, + &kb_folder_path, + ).await { + error!("Failed to index KB '{}' for bot '{}': {}", kb_name, bot_name, e); + } + } } } @@ -327,6 +410,8 @@ impl Clone for LocalFileMonitor { work_root: self.work_root.clone(), file_states: Arc::clone(&self.file_states), is_processing: Arc::clone(&self.is_processing), + #[cfg(any(feature = "research", feature = "llm"))] + kb_manager: self.kb_manager.clone(), } } }