feat: Add GUID prefix to Qdrant collection names for KB security isolation
Some checks failed
BotServer CI / build (push) Failing after 1s

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-03-19 19:51:28 -03:00
parent 3db53dfd42
commit 3bb115266b
13 changed files with 292 additions and 25 deletions

View file

@ -0,0 +1,159 @@
use crate::core::shared::models::UserSession;
use crate::core::shared::state::AppState;
use diesel::prelude::*;
use diesel::sql_types::*;
use log::error;
use rhai::{Dynamic, Engine};
use serde_json::Value;
use std::sync::Arc;
// Row shape for the `information_schema.columns` lookup performed by
// `get_table_columns`: one column name per returned row.
#[derive(Debug, QueryableByName)]
struct ColumnRow {
    #[diesel(sql_type = Text)]
    column_name: String,
}
/// Registers the custom `DETECT <table>` keyword with the Rhai engine.
///
/// At script runtime the keyword evaluates its single argument to a table
/// name and runs anomaly detection on that table (see
/// `detect_anomalies_in_table`). Because Rhai evaluation is synchronous,
/// the async detection work is pushed onto a dedicated OS thread that owns
/// its own Tokio runtime; the result travels back over an mpsc channel
/// with a 300-second timeout.
pub fn detect_keyword(state: Arc<AppState>, _user: UserSession, engine: &mut Engine) {
    let state_clone = Arc::clone(&state);
    engine
        .register_custom_syntax(["DETECT", "$expr$"], false, move |context, inputs| {
            // `$expr$` is the table-name expression written after DETECT.
            let first_input = inputs.first().ok_or_else(|| {
                Box::new(rhai::EvalAltResult::ErrorRuntime(
                    "DETECT requires a table name".into(),
                    rhai::Position::NONE,
                ))
            })?;
            let table_name = context.eval_expression_tree(first_input)?.to_string();
            let state_for_thread = Arc::clone(&state_clone);
            let (tx, rx) = std::sync::mpsc::channel();
            std::thread::spawn(move || {
                // Build a private runtime on this thread: `block_on` must not
                // be called from inside an existing runtime's worker thread.
                let rt = tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(2)
                    .enable_all()
                    .build();
                let send_err = if let Ok(rt) = rt {
                    let result = rt.block_on(async move {
                        detect_anomalies_in_table(state_for_thread, &table_name).await
                    });
                    tx.send(result).err()
                } else {
                    tx.send(Err("failed to build tokio runtime".into())).err()
                };
                // `send` only fails when the receiver timed out and dropped.
                if send_err.is_some() {
                    error!("Failed to send DETECT thread result");
                }
            });
            // Block the script (not an async executor) until the worker
            // finishes or the 300 s budget is exhausted.
            match rx.recv_timeout(std::time::Duration::from_secs(300)) {
                Ok(Ok(result)) => Ok(Dynamic::from(result)),
                Ok(Err(e)) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
                    e.to_string().into(),
                    rhai::Position::NONE,
                ))),
                Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
                    Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
                        "Detection timed out".into(),
                        rhai::Position::NONE,
                    )))
                }
                Err(e) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
                    format!("DETECT thread failed: {e}").into(),
                    rhai::Position::NONE,
                ))),
            }
        })
        .expect("valid syntax registration");
}
/// Runs anomaly detection over up to 500 rows of `table_name`.
///
/// Steps:
/// 1. Read the table's column list from `information_schema`.
/// 2. Pick the most likely numeric value column by name heuristics.
/// 3. Serialize the rows to JSON and POST them to the BotModels
///    `/api/anomaly/detect` endpoint.
///
/// Returns the BotModels JSON response rendered as a string, or an error
/// when the table name is invalid, the table is empty, or the service call
/// fails.
async fn detect_anomalies_in_table(
    state: Arc<AppState>,
    table_name: &str,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
    // SECURITY: `table_name` originates from user script input and is
    // spliced into SQL text below. Restrict it to a bare identifier
    // (ASCII alphanumerics and underscores) to rule out SQL injection.
    if table_name.is_empty()
        || !table_name
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || c == '_')
    {
        return Err(format!("Invalid table name: {}", table_name).into());
    }
    let columns = get_table_columns(&state, table_name)?;
    let value_field = find_numeric_field(&columns);
    #[derive(QueryableByName)]
    struct JsonRow {
        #[diesel(sql_type = Text)]
        data: String,
    }
    // Quote each column name so identifiers containing uppercase letters or
    // reserved words survive; embedded double quotes are doubled per SQL
    // quoting rules. (Unquoted identifiers would be folded to lowercase.)
    let column_list = columns
        .iter()
        .map(|c| format!("\"{}\"", c.replace('"', "\"\"")))
        .collect::<Vec<_>>()
        .join(", ");
    let query = format!(
        "SELECT row_to_json(t)::text as data FROM (SELECT {} FROM {} LIMIT 500) t",
        column_list, table_name
    );
    let rows: Vec<JsonRow> = diesel::sql_query(&query).load(&mut state.conn.get()?)?;
    // Rows whose JSON fails to parse are skipped (best-effort sampling).
    let records: Vec<Value> = rows
        .into_iter()
        .filter_map(|row| serde_json::from_str(&row.data).ok())
        .collect();
    if records.is_empty() {
        return Err(format!("No data found in table {}", table_name).into());
    }
    // BotModels service endpoint and API key, overridable via environment.
    let botmodels_host =
        std::env::var("BOTMODELS_HOST").unwrap_or_else(|_| "http://localhost:8085".to_string());
    let botmodels_key =
        std::env::var("BOTMODELS_API_KEY").unwrap_or_else(|_| "starter".to_string());
    let client = reqwest::Client::new();
    let response = client
        .post(format!("{}/api/anomaly/detect", botmodels_host))
        .header("X-API-Key", &botmodels_key)
        .json(&serde_json::json!({
            "data": records,
            "value_field": value_field
        }))
        .send()
        .await?;
    if !response.status().is_success() {
        let error_text = response.text().await?;
        return Err(format!("BotModels error: {}", error_text).into());
    }
    let result: Value = response.json().await?;
    Ok(result.to_string())
}
fn get_table_columns(
state: &Arc<AppState>,
table_name: &str,
) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> {
let query = format!(
"SELECT column_name FROM information_schema.columns WHERE table_name = '{}' ORDER BY ordinal_position",
table_name
);
let rows: Vec<ColumnRow> = diesel::sql_query(&query)
.load(&mut state.conn.get()?)?;
Ok(rows.into_iter().map(|r| r.column_name).collect())
}
/// Heuristically selects the column most likely to hold numeric
/// measurements.
///
/// Scans `columns` in order and returns the first whose lowercased name
/// contains any known numeric keyword (English and Portuguese terms such
/// as "salary"/"salario", "price"/"preco", "temp", ...). Falls back to the
/// first column, or the literal "value" when the list is empty.
fn find_numeric_field(columns: &[String]) -> String {
    const NUMERIC_KEYWORDS: &[&str] = &[
        "salario", "salary", "valor", "value", "amount", "preco", "price",
        "temperatura", "temp", "pressao", "pressure", "quantidade",
        "quantity", "decimal", "numerico", "numeric", "base", "liquido",
        "bruto",
    ];
    columns
        .iter()
        .find(|col| {
            let lowered = col.to_lowercase();
            NUMERIC_KEYWORDS.iter().any(|kw| lowered.contains(kw))
        })
        .or_else(|| columns.first())
        .cloned()
        .unwrap_or_else(|| "value".to_string())
}

View file

@ -49,6 +49,7 @@ pub mod llm_keyword;
#[cfg(feature = "llm")] #[cfg(feature = "llm")]
pub mod llm_macros; pub mod llm_macros;
pub mod math; pub mod math;
pub mod detect;
#[cfg(feature = "automation")] #[cfg(feature = "automation")]
pub mod mcp_client; pub mod mcp_client;
#[cfg(feature = "automation")] #[cfg(feature = "automation")]
@ -207,6 +208,7 @@ pub fn get_all_keywords() -> Vec<String> {
"CLEAR TOOLS".to_string(), "CLEAR TOOLS".to_string(),
"CREATE SITE".to_string(), "CREATE SITE".to_string(),
"CREATE TASK".to_string(), "CREATE TASK".to_string(),
"DETECT".to_string(),
"USE TOOL".to_string(), "USE TOOL".to_string(),
"AGGREGATE".to_string(), "AGGREGATE".to_string(),
"DELETE".to_string(), "DELETE".to_string(),

View file

@ -39,6 +39,7 @@ pub fn register_think_kb_keyword(
); );
let session_id = session_clone.id; let session_id = session_clone.id;
let bot_id = session_clone.bot_id;
let bot_name = session_clone.bot_name.clone(); let bot_name = session_clone.bot_name.clone();
let kb_manager = match &state_clone.kb_manager { let kb_manager = match &state_clone.kb_manager {
Some(manager) => Arc::clone(manager), Some(manager) => Arc::clone(manager),
@ -52,7 +53,7 @@ pub fn register_think_kb_keyword(
// Execute KB search in blocking thread // Execute KB search in blocking thread
let result = std::thread::spawn(move || { let result = std::thread::spawn(move || {
tokio::runtime::Handle::current().block_on(async { tokio::runtime::Handle::current().block_on(async {
think_kb_search(kb_manager, db_pool, session_id, &bot_name, &query).await think_kb_search(kb_manager, db_pool, session_id, bot_id, &bot_name, &query).await
}) })
}) })
.join(); .join();
@ -92,6 +93,7 @@ async fn think_kb_search(
kb_manager: Arc<KnowledgeBaseManager>, kb_manager: Arc<KnowledgeBaseManager>,
db_pool: crate::core::shared::utils::DbPool, db_pool: crate::core::shared::utils::DbPool,
session_id: uuid::Uuid, session_id: uuid::Uuid,
bot_id: uuid::Uuid,
bot_name: &str, bot_name: &str,
query: &str, query: &str,
) -> Result<serde_json::Value, String> { ) -> Result<serde_json::Value, String> {
@ -99,7 +101,7 @@ async fn think_kb_search(
// Search active KBs with reasonable limits // Search active KBs with reasonable limits
let kb_contexts = context_manager let kb_contexts = context_manager
.search_active_kbs(session_id, bot_name, query, 10, 2000) .search_active_kbs(session_id, bot_id, bot_name, query, 10, 2000)
.await .await
.map_err(|e| format!("KB search failed: {}", e))?; .map_err(|e| format!("KB search failed: {}", e))?;

View file

@ -103,14 +103,14 @@ fn add_kb_to_session(
(kb_result.folder_path, kb_result.qdrant_collection) (kb_result.folder_path, kb_result.qdrant_collection)
} else { } else {
let default_path = format!("work/{}/{}.gbkb/{}", bot_name, bot_name, kb_name); let default_path = format!("work/{}/{}.gbkb/{}", bot_name, bot_name, kb_name);
let default_collection = format!("{}_{}", bot_name, kb_name); let kb_id = Uuid::new_v4();
let default_collection = format!("{}_{}_{}", bot_name, kb_id, kb_name);
warn!( warn!(
"KB '{}' not found in kb_collections for bot {}. Using default path: {}", "KB '{}' not found in kb_collections for bot {}. Using default path: {}",
kb_name, bot_name, default_path kb_name, bot_name, default_path
); );
let kb_id = Uuid::new_v4();
diesel::sql_query( diesel::sql_query(
"INSERT INTO kb_collections (id, bot_id, name, folder_path, qdrant_collection, document_count) "INSERT INTO kb_collections (id, bot_id, name, folder_path, qdrant_collection, document_count)
VALUES ($1, $2, $3, $4, $5, 0) VALUES ($1, $2, $3, $4, $5, 0)

View file

@ -432,9 +432,11 @@ fn associate_website_with_session_refresh(
.get_result(&mut conn) .get_result(&mut conn)
.map_err(|e| format!("Failed to get bot name: {}", e))?; .map_err(|e| format!("Failed to get bot name: {}", e))?;
let bot_id_short = user.bot_id.to_string().chars().take(8).collect::<String>();
let collection_name = format!( let collection_name = format!(
"{}_website_{}", "{}_{}_website_{}",
bot_name_result.name, bot_name_result.name,
bot_id_short,
sanitize_url_for_collection(url) sanitize_url_for_collection(url)
); );

View file

@ -55,6 +55,7 @@ use self::keywords::web_data::register_web_data_keywords;
use self::keywords::webhook::webhook_keyword; use self::keywords::webhook::webhook_keyword;
#[cfg(feature = "llm")] #[cfg(feature = "llm")]
use self::keywords::llm_keyword::llm_keyword; use self::keywords::llm_keyword::llm_keyword;
use self::keywords::detect::detect_keyword;
use self::keywords::on::on_keyword; use self::keywords::on::on_keyword;
use self::keywords::print::print_keyword; use self::keywords::print::print_keyword;
use self::keywords::set::set_keyword; use self::keywords::set::set_keyword;
@ -146,6 +147,7 @@ impl ScriptService {
format_keyword(&mut engine); format_keyword(&mut engine);
#[cfg(feature = "llm")] #[cfg(feature = "llm")]
llm_keyword(state.clone(), user.clone(), &mut engine); llm_keyword(state.clone(), user.clone(), &mut engine);
detect_keyword(state.clone(), user.clone(), &mut engine);
get_keyword(state.clone(), user.clone(), &mut engine); get_keyword(state.clone(), user.clone(), &mut engine);
set_keyword(&state, user.clone(), &mut engine); set_keyword(&state, user.clone(), &mut engine);
wait_keyword(&state, user.clone(), &mut engine); wait_keyword(&state, user.clone(), &mut engine);

View file

@ -123,6 +123,7 @@ impl KbContextManager {
pub async fn search_active_kbs( pub async fn search_active_kbs(
&self, &self,
session_id: Uuid, session_id: Uuid,
bot_id: Uuid,
bot_name: &str, bot_name: &str,
query: &str, query: &str,
max_results_per_kb: usize, max_results_per_kb: usize,
@ -153,6 +154,7 @@ impl KbContextManager {
match self match self
.search_single_kb( .search_single_kb(
bot_id,
bot_name, bot_name,
&kb_assoc.kb_name, &kb_assoc.kb_name,
query, query,
@ -352,6 +354,7 @@ impl KbContextManager {
async fn search_single_kb( async fn search_single_kb(
&self, &self,
bot_id: Uuid,
bot_name: &str, bot_name: &str,
kb_name: &str, kb_name: &str,
query: &str, query: &str,
@ -362,7 +365,7 @@ impl KbContextManager {
let search_results = self let search_results = self
.kb_manager .kb_manager
.search(bot_name, kb_name, query, max_results) .search(bot_id, bot_name, kb_name, query, max_results)
.await?; .await?;
let mut kb_search_results = Vec::new(); let mut kb_search_results = Vec::new();
@ -468,6 +471,7 @@ pub async fn inject_kb_context(
kb_manager: Arc<KnowledgeBaseManager>, kb_manager: Arc<KnowledgeBaseManager>,
db_pool: DbPool, db_pool: DbPool,
session_id: Uuid, session_id: Uuid,
bot_id: Uuid,
bot_name: &str, bot_name: &str,
user_query: &str, user_query: &str,
messages: &mut serde_json::Value, messages: &mut serde_json::Value,
@ -476,7 +480,7 @@ pub async fn inject_kb_context(
let context_manager = KbContextManager::new(kb_manager.clone(), db_pool.clone()); let context_manager = KbContextManager::new(kb_manager.clone(), db_pool.clone());
let kb_contexts = context_manager let kb_contexts = context_manager
.search_active_kbs(session_id, bot_name, user_query, 5, max_context_tokens / 2) .search_active_kbs(session_id, bot_id, bot_name, user_query, 5, max_context_tokens / 2)
.await?; .await?;
let website_contexts = context_manager let website_contexts = context_manager

View file

@ -692,6 +692,7 @@ impl BotOrchestrator {
kb_manager.clone(), kb_manager.clone(),
self.state.conn.clone(), self.state.conn.clone(),
session_id, session_id,
session.bot_id,
&bot_name_for_context, &bot_name_for_context,
&message_content, &message_content,
&mut messages, &mut messages,

View file

@ -116,6 +116,7 @@ impl KbIndexer {
pub async fn index_kb_folder( pub async fn index_kb_folder(
&self, &self,
bot_id: Uuid,
bot_name: &str, bot_name: &str,
kb_name: &str, kb_name: &str,
kb_path: &Path, kb_path: &Path,
@ -145,12 +146,13 @@ impl KbIndexer {
self.qdrant_config.url self.qdrant_config.url
); );
return Err(anyhow::anyhow!( return Err(anyhow::anyhow!(
"Qdrant vector database not available at {}. Start the vector_db service to enable KB indexing.", "Qdrant vector database is not available at {}. Start the vector_db service to enable KB indexing.",
self.qdrant_config.url self.qdrant_config.url
)); ));
} }
let collection_name = format!("{}_{}", bot_name, kb_name); let bot_id_short = bot_id.to_string().chars().take(8).collect::<String>();
let collection_name = format!("{}_{}_{}", bot_name, bot_id_short, kb_name);
self.ensure_collection_exists(&collection_name).await?; self.ensure_collection_exists(&collection_name).await?;
@ -746,7 +748,7 @@ impl KbFolderMonitor {
Self { indexer, work_root } Self { indexer, work_root }
} }
pub async fn process_gbkb_folder(&self, bot_name: &str, kb_folder: &Path) -> Result<()> { pub async fn process_gbkb_folder(&self, bot_id: Uuid, bot_name: &str, kb_folder: &Path) -> Result<()> {
let kb_name = kb_folder let kb_name = kb_folder
.file_name() .file_name()
.and_then(|n| n.to_str()) .and_then(|n| n.to_str())
@ -762,7 +764,7 @@ impl KbFolderMonitor {
let result = self let result = self
.indexer .indexer
.index_kb_folder(bot_name, kb_name, &local_path) .index_kb_folder(bot_id, bot_name, kb_name, &local_path)
.await?; .await?;
info!( info!(

View file

@ -18,6 +18,7 @@ use log::{error, info, warn};
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use uuid::Uuid;
#[derive(Debug)] #[derive(Debug)]
pub struct KnowledgeBaseManager { pub struct KnowledgeBaseManager {
@ -48,6 +49,7 @@ impl KnowledgeBaseManager {
pub async fn index_kb_folder( pub async fn index_kb_folder(
&self, &self,
bot_id: Uuid,
bot_name: &str, bot_name: &str,
kb_name: &str, kb_name: &str,
kb_path: &Path, kb_path: &Path,
@ -61,7 +63,7 @@ impl KnowledgeBaseManager {
let result = self let result = self
.indexer .indexer
.index_kb_folder(bot_name, kb_name, kb_path) .index_kb_folder(bot_id, bot_name, kb_name, kb_path)
.await?; .await?;
info!( info!(
@ -74,12 +76,14 @@ impl KnowledgeBaseManager {
pub async fn search( pub async fn search(
&self, &self,
bot_id: Uuid,
bot_name: &str, bot_name: &str,
kb_name: &str, kb_name: &str,
query: &str, query: &str,
limit: usize, limit: usize,
) -> Result<Vec<SearchResult>> { ) -> Result<Vec<SearchResult>> {
let collection_name = format!("{}_{}", bot_name, kb_name); let bot_id_short = bot_id.to_string().chars().take(8).collect::<String>();
let collection_name = format!("{}_{}_{}", bot_name, bot_id_short, kb_name);
self.indexer.search(&collection_name, query, limit).await self.indexer.search(&collection_name, query, limit).await
} }
@ -96,7 +100,7 @@ impl KnowledgeBaseManager {
self.processor.process_document(file_path).await self.processor.process_document(file_path).await
} }
pub async fn handle_gbkb_change(&self, bot_name: &str, kb_folder: &Path) -> Result<()> { pub async fn handle_gbkb_change(&self, bot_id: Uuid, bot_name: &str, kb_folder: &Path) -> Result<()> {
info!( info!(
"Handling .gbkb folder change for bot {} at {}", "Handling .gbkb folder change for bot {} at {}",
bot_name, bot_name,
@ -104,11 +108,12 @@ impl KnowledgeBaseManager {
); );
let monitor = self.monitor.read().await; let monitor = self.monitor.read().await;
monitor.process_gbkb_folder(bot_name, kb_folder).await monitor.process_gbkb_folder(bot_id, bot_name, kb_folder).await
} }
pub async fn clear_kb(&self, bot_name: &str, kb_name: &str) -> Result<()> { pub async fn clear_kb(&self, bot_id: Uuid, bot_name: &str, kb_name: &str) -> Result<()> {
let collection_name = format!("{}_{}", bot_name, kb_name); let bot_id_short = bot_id.to_string().chars().take(8).collect::<String>();
let collection_name = format!("{}_{}_{}", bot_name, bot_id_short, kb_name);
warn!("Clearing knowledge base collection: {}", collection_name); warn!("Clearing knowledge base collection: {}", collection_name);
@ -124,8 +129,9 @@ impl KnowledgeBaseManager {
} }
} }
pub async fn get_kb_stats(&self, bot_name: &str, kb_name: &str) -> Result<KbStatistics> { pub async fn get_kb_stats(&self, bot_id: Uuid, bot_name: &str, kb_name: &str) -> Result<KbStatistics> {
let collection_name = format!("{}_{}", bot_name, kb_name); let bot_id_short = bot_id.to_string().chars().take(8).collect::<String>();
let collection_name = format!("{}_{}_{}", bot_name, bot_id_short, kb_name);
let collection_info = self.indexer.get_collection_info(&collection_name).await?; let collection_info = self.indexer.get_collection_info(&collection_name).await?;
@ -168,6 +174,7 @@ impl DriveMonitorIntegration {
pub async fn on_gbkb_folder_changed( pub async fn on_gbkb_folder_changed(
&self, &self,
bot_id: Uuid,
bot_name: &str, bot_name: &str,
folder_path: &Path, folder_path: &Path,
change_type: ChangeType, change_type: ChangeType,
@ -180,12 +187,12 @@ impl DriveMonitorIntegration {
folder_path.display() folder_path.display()
); );
self.kb_manager self.kb_manager
.handle_gbkb_change(bot_name, folder_path) .handle_gbkb_change(bot_id, bot_name, folder_path)
.await .await
} }
ChangeType::Deleted => { ChangeType::Deleted => {
if let Some(kb_name) = folder_path.file_name().and_then(|n| n.to_str()) { if let Some(kb_name) = folder_path.file_name().and_then(|n| n.to_str()) {
self.kb_manager.clear_kb(bot_name, kb_name).await self.kb_manager.clear_kb(bot_id, bot_name, kb_name).await
} else { } else {
Err(anyhow::anyhow!("Invalid KB folder path")) Err(anyhow::anyhow!("Invalid KB folder path"))
} }

View file

@ -281,7 +281,7 @@ impl WebsiteCrawlerService {
// Process this batch immediately to free memory // Process this batch immediately to free memory
if batch_idx == 0 || (batch_idx + 1) % 2 == 0 { if batch_idx == 0 || (batch_idx + 1) % 2 == 0 {
// Index every 2 batches to prevent memory buildup // Index every 2 batches to prevent memory buildup
match bot_indexer.index_kb_folder(&bot_name, &kb_name, &work_path).await { match bot_indexer.index_kb_folder(website.bot_id, &bot_name, &kb_name, &work_path).await {
Ok(result) => trace!("Indexed batch {} successfully: {} docs, {} chunks", Ok(result) => trace!("Indexed batch {} successfully: {} docs, {} chunks",
batch_idx + 1, result.documents_processed, result.chunks_indexed), batch_idx + 1, result.documents_processed, result.chunks_indexed),
Err(e) => warn!("Failed to index batch {}: {}", batch_idx + 1, e), Err(e) => warn!("Failed to index batch {}: {}", batch_idx + 1, e),
@ -294,7 +294,7 @@ impl WebsiteCrawlerService {
// Final indexing for any remaining content // Final indexing for any remaining content
bot_indexer bot_indexer
.index_kb_folder(&bot_name, &kb_name, &work_path) .index_kb_folder(website.bot_id, &bot_name, &kb_name, &work_path)
.await?; .await?;
config.calculate_next_crawl(); config.calculate_next_crawl();

View file

@ -1171,6 +1171,7 @@ impl DriveMonitor {
} }
let kb_manager = Arc::clone(&self.kb_manager); let kb_manager = Arc::clone(&self.kb_manager);
let bot_id = self.bot_id;
let bot_name_owned = bot_name.to_string(); let bot_name_owned = bot_name.to_string();
let kb_name_owned = kb_name.to_string(); let kb_name_owned = kb_name.to_string();
let kb_folder_owned = kb_folder_path.clone(); let kb_folder_owned = kb_folder_path.clone();
@ -1185,7 +1186,7 @@ impl DriveMonitor {
let result = tokio::time::timeout( let result = tokio::time::timeout(
Duration::from_secs(KB_INDEXING_TIMEOUT_SECS), Duration::from_secs(KB_INDEXING_TIMEOUT_SECS),
kb_manager.handle_gbkb_change(&bot_name_owned, &kb_folder_owned), kb_manager.handle_gbkb_change(bot_id, &bot_name_owned, &kb_folder_owned),
) )
.await; .await;
@ -1274,7 +1275,7 @@ impl DriveMonitor {
let kb_prefix = format!("{}{}/", gbkb_prefix, kb_name); let kb_prefix = format!("{}{}/", gbkb_prefix, kb_name);
if !file_states.keys().any(|k| k.starts_with(&kb_prefix)) { if !file_states.keys().any(|k| k.starts_with(&kb_prefix)) {
#[cfg(any(feature = "research", feature = "llm"))] #[cfg(any(feature = "research", feature = "llm"))]
if let Err(e) = self.kb_manager.clear_kb(bot_name, kb_name).await { if let Err(e) = self.kb_manager.clear_kb(self.bot_id, bot_name, kb_name).await {
log::error!("Failed to clear KB {}: {}", kb_name, e); log::error!("Failed to clear KB {}: {}", kb_name, e);
} }

View file

@ -1,4 +1,5 @@
use crate::basic::compiler::BasicCompiler; use crate::basic::compiler::BasicCompiler;
use crate::core::kb::{EmbeddingConfig, KnowledgeBaseManager};
use crate::core::shared::state::AppState; use crate::core::shared::state::AppState;
use diesel::prelude::*; use diesel::prelude::*;
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
@ -26,6 +27,8 @@ pub struct LocalFileMonitor {
work_root: PathBuf, work_root: PathBuf,
file_states: Arc<RwLock<HashMap<String, LocalFileState>>>, file_states: Arc<RwLock<HashMap<String, LocalFileState>>>,
is_processing: Arc<AtomicBool>, is_processing: Arc<AtomicBool>,
#[cfg(any(feature = "research", feature = "llm"))]
kb_manager: Option<Arc<KnowledgeBaseManager>>,
} }
impl LocalFileMonitor { impl LocalFileMonitor {
@ -38,6 +41,15 @@ impl LocalFileMonitor {
// Use /opt/gbo/data as the base directory for source files // Use /opt/gbo/data as the base directory for source files
let data_dir = PathBuf::from("/opt/gbo/data"); let data_dir = PathBuf::from("/opt/gbo/data");
#[cfg(any(feature = "research", feature = "llm"))]
let kb_manager = match &state.kb_manager {
Some(km) => Some(Arc::clone(km)),
None => {
debug!("KB manager not available in LocalFileMonitor");
None
}
};
trace!("Initializing with data_dir: {:?}, work_root: {:?}", data_dir, work_root); trace!("Initializing with data_dir: {:?}, work_root: {:?}", data_dir, work_root);
Self { Self {
@ -46,6 +58,8 @@ impl LocalFileMonitor {
work_root, work_root,
file_states: Arc::new(RwLock::new(HashMap::new())), file_states: Arc::new(RwLock::new(HashMap::new())),
is_processing: Arc::new(AtomicBool::new(false)), is_processing: Arc::new(AtomicBool::new(false)),
#[cfg(any(feature = "research", feature = "llm"))]
kb_manager,
} }
} }
@ -191,6 +205,75 @@ impl LocalFileMonitor {
if gbdialog_path.exists() { if gbdialog_path.exists() {
self.compile_gbdialog(bot_name, &gbdialog_path).await?; self.compile_gbdialog(bot_name, &gbdialog_path).await?;
} }
// Index .gbkb folders
#[cfg(any(feature = "research", feature = "llm"))]
{
if let Some(ref kb_manager) = self.kb_manager {
let gbkb_path = path.join(format!("{}.gbkb", bot_name));
if gbkb_path.exists() {
if let Err(e) = self.index_gbkb_folder(bot_name, &gbkb_path, kb_manager).await {
error!("Failed to index .gbkb folder {:?}: {}", gbkb_path, e);
}
}
}
}
}
}
Ok(())
}
#[cfg(any(feature = "research", feature = "llm"))]
async fn index_gbkb_folder(
&self,
bot_name: &str,
gbkb_path: &Path,
_kb_manager: &Arc<KnowledgeBaseManager>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
info!("Indexing .gbkb folder for bot {}: {:?}", bot_name, gbkb_path);
// Get bot_id from database
let bot_id = {
use crate::core::shared::models::schema::bots::dsl::*;
let mut conn = self.state.conn.get()
.map_err(|e| format!("Failed to get DB connection: {}", e))?;
bots.filter(name.eq(bot_name))
.select(id)
.first::<Uuid>(&mut *conn)
.map_err(|e| format!("Failed to get bot_id for '{}': {}", bot_name, e))?
};
// Load bot-specific embedding config from database
let embedding_config = EmbeddingConfig::from_bot_config(&self.state.conn, &bot_id);
info!("Using embedding config for bot '{}': URL={}, model={}",
bot_name, embedding_config.embedding_url, embedding_config.embedding_model);
// Create a temporary KbIndexer with the bot-specific config
let qdrant_config = crate::core::kb::QdrantConfig::default();
let indexer = crate::core::kb::KbIndexer::new(embedding_config, qdrant_config);
// Index each KB folder inside .gbkb (e.g., carta, proc)
let entries = tokio::fs::read_dir(gbkb_path).await?;
let mut entries = entries;
while let Some(entry) = entries.next_entry().await? {
let kb_folder_path = entry.path();
if kb_folder_path.is_dir() {
if let Some(kb_name) = kb_folder_path.file_name().and_then(|n| n.to_str()) {
info!("Indexing KB '{}' for bot '{}'", kb_name, bot_name);
if let Err(e) = indexer.index_kb_folder(
bot_id,
bot_name,
kb_name,
&kb_folder_path,
).await {
error!("Failed to index KB '{}' for bot '{}': {}", kb_name, bot_name, e);
}
}
} }
} }
@ -327,6 +410,8 @@ impl Clone for LocalFileMonitor {
work_root: self.work_root.clone(), work_root: self.work_root.clone(),
file_states: Arc::clone(&self.file_states), file_states: Arc::clone(&self.file_states),
is_processing: Arc::clone(&self.is_processing), is_processing: Arc::clone(&self.is_processing),
#[cfg(any(feature = "research", feature = "llm"))]
kb_manager: self.kb_manager.clone(),
} }
} }
} }