diff --git a/Cargo.toml b/Cargo.toml index cd10bf56c..f78476514 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ default = ["chat", "automation", "drive", "tasks", "cache", "directory", "llm", # ===== CORE INFRASTRUCTURE (Can be used standalone) ===== scripting = ["dep:rhai"] automation = ["scripting", "dep:cron"] -drive = ["dep:aws-config", "dep:aws-sdk-s3", "dep:aws-smithy-async", "dep:pdf-extract"] +drive = ["dep:aws-config", "dep:aws-sdk-s3", "dep:aws-smithy-async", "dep:pdf-extract", "dep:notify"] cache = ["dep:redis"] directory = [] crawler = ["drive", "cache"] @@ -215,6 +215,9 @@ rss = { workspace = true } scraper = { workspace = true } walkdir = { workspace = true } +# File system monitoring (for local .gbai monitoring) +notify = { workspace = true, optional = true } + # Embedded static files rust-embed = { workspace = true, optional = true } diff --git a/src/auto_task/app_generator.rs b/src/auto_task/app_generator.rs index 1f6f162c2..0905bdd7f 100644 --- a/src/auto_task/app_generator.rs +++ b/src/auto_task/app_generator.rs @@ -3050,7 +3050,7 @@ NO QUESTIONS. JUST BUILD."# match self .state .llm_provider - .generate_stream(prompt, &llm_config, tx, &model, &key) + .generate_stream(prompt, &llm_config, tx, &model, &key, None) .await { Ok(()) => { diff --git a/src/basic/keywords/model_routing.rs b/src/basic/keywords/model_routing.rs index 8a5b5f6b4..89f2f95b4 100644 --- a/src/basic/keywords/model_routing.rs +++ b/src/basic/keywords/model_routing.rs @@ -424,6 +424,7 @@ fn get_session_model_sync( preference_value: String, } + // 1. Check session preference first (set by USE MODEL) let result: Option = diesel::sql_query( "SELECT preference_value FROM session_preferences \ WHERE session_id = $1 AND preference_key = 'current_model' LIMIT 1", @@ -433,9 +434,72 @@ fn get_session_model_sync( .optional() .map_err(|e| format!("Failed to get session model: {}", e))?; - Ok(result - .map(|r| r.preference_value) - .unwrap_or_else(|| "default".to_string())) + if let Some(pref) = result { + return Ok(pref.preference_value); + } + + // 2. No session preference - get bot's configured model + // Need to get bot_id from session first + #[derive(QueryableByName)] + struct SessionBot { + #[diesel(sql_type = diesel::sql_types::Uuid)] + bot_id: Uuid, + } + + let bot_result: Option = diesel::sql_query( + "SELECT bot_id FROM sessions WHERE id = $1 LIMIT 1", + ) + .bind::(session_id) + .get_result(conn) + .optional() + .map_err(|e| format!("Failed to get session bot: {}", e))?; + + if let Some(session_bot) = bot_result { + let bot_id = session_bot.bot_id; + + // Get bot's llm-model config + #[derive(QueryableByName)] + struct ConfigValue { + #[diesel(sql_type = diesel::sql_types::Text)] + config_value: String, + } + + let bot_model: Option = diesel::sql_query( + "SELECT config_value FROM bot_configuration \ + WHERE bot_id = $1 AND config_key = 'llm-model' LIMIT 1", + ) + .bind::(bot_id) + .get_result(conn) + .optional() + .map_err(|e| format!("Failed to get bot model: {}", e))?; + + if let Some(model) = bot_model { + if !model.config_value.is_empty() && model.config_value != "true" { + return Ok(model.config_value); + } + } + + // 3. Bot has no model configured - fall back to default bot's model + let (default_bot_id, _) = crate::bot::get_default_bot(conn); + + let default_model: Option = diesel::sql_query( + "SELECT config_value FROM bot_configuration \ + WHERE bot_id = $1 AND config_key = 'llm-model' LIMIT 1", + ) + .bind::(default_bot_id) + .get_result(conn) + .optional() + .map_err(|e| format!("Failed to get default bot model: {}", e))?; + + if let Some(model) = default_model { + if !model.config_value.is_empty() && model.config_value != "true" { + return Ok(model.config_value); + } + } + } + + // 4. Ultimate fallback + Ok("llama-3.3:8b".to_string()) } fn list_available_models_sync( diff --git a/src/basic/keywords/use_tool.rs b/src/basic/keywords/use_tool.rs index 9f200c8b7..778bc7df0 100644 --- a/src/basic/keywords/use_tool.rs +++ b/src/basic/keywords/use_tool.rs @@ -1,13 +1,14 @@ use crate::shared::models::UserSession; use crate::shared::state::AppState; use diesel::prelude::*; -use log::{error, trace, warn}; +use log::{error, info, trace, warn}; use rhai::{Dynamic, Engine}; +use std::path::Path; use std::sync::Arc; use uuid::Uuid; pub fn use_tool_keyword(state: Arc, user: UserSession, engine: &mut Engine) { let state_clone = Arc::clone(&state); - let user_clone = user; + let user_clone = user.clone(); engine .register_custom_syntax(["USE", "TOOL", "$expr$"], false, move |context, inputs| { @@ -73,44 +74,152 @@ pub fn use_tool_keyword(state: Arc, user: UserSession, engine: &mut En } }) .expect("valid syntax registration"); + + // Register use_tool(tool_name) function for preprocessor compatibility + let state_clone2 = Arc::clone(&state); + let user_clone2 = user.clone(); + + engine.register_fn("use_tool", move |tool_path: &str| -> Dynamic { + let tool_path_str = tool_path.to_string(); + trace!( + "use_tool function called: {} for session: {}", + tool_path_str, + user_clone2.id + ); + let tool_name = tool_path_str + .strip_prefix(".gbdialog/") + .unwrap_or(&tool_path_str) + .strip_suffix(".bas") + .unwrap_or(&tool_path_str) + .to_string(); + if tool_name.is_empty() { + return Dynamic::from("ERROR: Invalid tool name"); + } + let state_for_task = Arc::clone(&state_clone2); + let user_for_task = user_clone2.clone(); + let tool_name_for_task = tool_name; + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build(); + let send_err = if let Ok(_rt) = rt { + let result = associate_tool_with_session( + &state_for_task, + &user_for_task, + &tool_name_for_task, + ); + tx.send(result).err() + } else { + tx.send(Err("Failed to build tokio runtime".to_string())) + .err() + }; + if send_err.is_some() { + error!("Failed to send result from thread"); + } + }); + match rx.recv_timeout(std::time::Duration::from_secs(10)) { + Ok(Ok(message)) => Dynamic::from(message), + Ok(Err(e)) => Dynamic::from(format!("ERROR: {}", e)), + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { + Dynamic::from("ERROR: use_tool timed out") + } + Err(e) => Dynamic::from(format!("ERROR: use_tool failed: {}", e)), + } + }); + + // Register USE_TOOL(tool_name) function (uppercase variant) + let state_clone3 = Arc::clone(&state); + let user_clone3 = user; + + engine.register_fn("USE_TOOL", move |tool_path: &str| -> Dynamic { + let tool_path_str = tool_path.to_string(); + trace!( + "USE_TOOL function called: {} for session: {}", + tool_path_str, + user_clone3.id + ); + let tool_name = tool_path_str + .strip_prefix(".gbdialog/") + .unwrap_or(&tool_path_str) + .strip_suffix(".bas") + .unwrap_or(&tool_path_str) + .to_string(); + if tool_name.is_empty() { + return Dynamic::from("ERROR: Invalid tool name"); + } + let state_for_task = Arc::clone(&state_clone3); + let user_for_task = user_clone3.clone(); + let tool_name_for_task = tool_name; + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build(); + let send_err = if let Ok(_rt) = rt { + let result = associate_tool_with_session( + &state_for_task, + &user_for_task, + &tool_name_for_task, + ); + tx.send(result).err() + } else { + tx.send(Err("Failed to build tokio runtime".to_string())) + .err() + }; + if send_err.is_some() { + error!("Failed to send result from thread"); + } + }); + match rx.recv_timeout(std::time::Duration::from_secs(10)) { + Ok(Ok(message)) => Dynamic::from(message), + Ok(Err(e)) => Dynamic::from(format!("ERROR: {}", e)), + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { + Dynamic::from("ERROR: USE_TOOL timed out") + } + Err(e) => Dynamic::from(format!("ERROR: USE_TOOL failed: {}", e)), + } + }); } fn associate_tool_with_session( state: &AppState, user: &UserSession, tool_name: &str, ) -> Result { - use crate::shared::models::schema::{basic_tools, session_tool_associations}; - let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?; - let tool_exists: Result = basic_tools::table - .filter(basic_tools::bot_id.eq(user.bot_id.to_string())) - .filter(basic_tools::tool_name.eq(tool_name)) - .filter(basic_tools::is_active.eq(1)) - .select(diesel::dsl::count(basic_tools::id)) - .first::(&mut *conn) - .map(|count| count > 0); - match tool_exists { - Ok(true) => { - trace!( - "Tool '{}' exists and is active for bot '{}'", - tool_name, - user.bot_id - ); - } - Ok(false) => { - warn!( - "Tool '{}' does not exist or is not active for bot '{}'", - tool_name, user.bot_id - ); - return Err(format!( - "Tool '{}' is not available. Make sure the tool file is compiled and active.", - tool_name - )); - } - Err(e) => { - error!("Failed to check tool existence: {}", e); - return Err(format!("Database error while checking tool: {}", e)); - } + use crate::shared::models::schema::session_tool_associations; + + // Check if tool's .mcp.json file exists in work directory + let home_dir = std::env::var("HOME").unwrap_or_else(|_| ".".to_string()); + let gb_dir = format!("{}/gb", home_dir); + + // Get bot name to construct the path + let bot_name = get_bot_name_from_id(state, &user.bot_id)?; + let work_path = Path::new(&gb_dir) + .join("work") + .join(format!("{}.gbai/{}.gbdialog", bot_name, bot_name)); + let mcp_path = work_path.join(format!("{}.mcp.json", tool_name)); + + trace!("Checking for tool .mcp.json at: {:?}", mcp_path); + + if !mcp_path.exists() { + warn!( + "Tool '{}' .mcp.json file not found at {:?}", + tool_name, mcp_path + ); + return Err(format!( + "Tool '{}' is not available. .mcp.json file not found.", + tool_name + )); } + + info!( + "Tool '{}' .mcp.json found, proceeding with session association", + tool_name + ); + + let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?; let association_id = Uuid::new_v4().to_string(); let session_id_str = user.id.to_string(); let added_at = chrono::Utc::now().to_rfc3339(); @@ -186,3 +295,14 @@ pub fn clear_session_tools( ) .execute(conn) } + +fn get_bot_name_from_id(state: &AppState, bot_id: &uuid::Uuid) -> Result { + use crate::shared::models::schema::bots; + let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?; + let bot_name: String = bots::table + .filter(bots::id.eq(bot_id)) + .select(bots::name) + .first(&mut *conn) + .map_err(|e| format!("Failed to get bot name for id {}: {}", bot_id, e))?; + Ok(bot_name) +} diff --git a/src/core/bot/mod.rs b/src/core/bot/mod.rs index fd1d0eb71..b33656ef2 100644 --- a/src/core/bot/mod.rs +++ b/src/core/bot/mod.rs @@ -2,6 +2,8 @@ pub mod kb_context; #[cfg(any(feature = "research", feature = "llm"))] use kb_context::inject_kb_context; +pub mod tool_context; +use tool_context::get_session_tools; #[cfg(feature = "llm")] use crate::core::config::ConfigManager; @@ -82,6 +84,18 @@ pub fn get_default_bot(conn: &mut PgConnection) -> (Uuid, String) { } } +/// Get bot ID by name from database +pub fn get_bot_id_by_name(conn: &mut PgConnection, bot_name: &str) -> Result { + use crate::shared::models::schema::bots::dsl::*; + use diesel::prelude::*; + + bots + .filter(name.eq(bot_name)) + .select(id) + .first::(conn) + .map_err(|e| format!("Bot '{}' not found: {}", bot_name, e)) +} + #[derive(Debug)] pub struct BotOrchestrator { pub state: Arc, @@ -166,6 +180,9 @@ impl BotOrchestrator { let mut bots_mounted = 0; let mut bots_created = 0; + let home_dir = std::env::var("HOME").unwrap_or_else(|_| ".".to_string()); + let data_dir = format!("{}/data", home_dir); + let directories_to_scan: Vec = vec![ self.state .config @@ -175,6 +192,7 @@ impl BotOrchestrator { .into(), "./templates".into(), "../bottemplates".into(), + data_dir.into(), ]; for dir_path in directories_to_scan { @@ -207,7 +225,7 @@ impl BotOrchestrator { &self, dir_path: &std::path::Path, bots_mounted: &mut i32, - _bots_created: &mut i32, + bots_created: &mut i32, ) -> Result<(), Box> { let entries = std::fs::read_dir(dir_path).map_err(|e| format!("Failed to read directory: {}", e))?; @@ -228,10 +246,25 @@ impl BotOrchestrator { *bots_mounted += 1; } Ok(false) => { - info!( - "Bot '{}' does not exist in database, skipping (run import to create)", - bot_name - ); + // Auto-create bots found in ~/data + if dir_path.to_string_lossy().contains("/data") { + info!("Auto-creating bot '{}' from ~/data", bot_name); + match self.create_bot_simple(bot_name) { + Ok(_) => { + info!("Bot '{}' created successfully", bot_name); + *bots_created += 1; + *bots_mounted += 1; + } + Err(e) => { + error!("Failed to create bot '{}': {}", bot_name, e); + } + } + } else { + info!( + "Bot '{}' does not exist in database, skipping (run import to create)", + bot_name + ); + } } Err(e) => { error!("Failed to check if bot '{}' exists: {}", bot_name, e); @@ -271,6 +304,38 @@ impl BotOrchestrator { Ok(exists.exists) } + fn create_bot_simple(&self, bot_name: &str) -> Result<(), Box> { + use diesel::sql_query; + use uuid::Uuid; + + let mut conn = self + .state + .conn + .get() + .map_err(|e| format!("Failed to get database connection: {e}"))?; + + // Check if bot already exists + let exists = self.ensure_bot_exists(bot_name)?; + if exists { + info!("Bot '{}' already exists, skipping creation", bot_name); + return Ok(()); + } + + let bot_id = Uuid::new_v4(); + + sql_query( + "INSERT INTO bots (id, name, llm_provider, context_provider, is_active, created_at, updated_at) + VALUES ($1, $2, 'openai', 'website', true, NOW(), NOW())" + ) + .bind::(bot_id) + .bind::(bot_name) + .execute(&mut conn) + .map_err(|e| format!("Failed to create bot: {e}"))?; + + info!("Created bot '{}' with ID '{}'", bot_name, bot_id); + Ok(()) + } + #[cfg(feature = "llm")] pub async fn stream_response( &self, @@ -285,6 +350,7 @@ impl BotOrchestrator { let user_id = Uuid::parse_str(&message.user_id)?; let session_id = Uuid::parse_str(&message.session_id)?; + let session_id_str = session_id.to_string(); let message_content = message.content.clone(); let (session, context_data, history, model, key) = { @@ -337,30 +403,112 @@ impl BotOrchestrator { .await?? }; - let system_prompt = "You are a helpful assistant.".to_string(); + let system_prompt = "You are a helpful assistant with access to tools that can help you complete tasks. When a user's request matches one of your available tools, use the appropriate tool instead of providing a generic response.".to_string(); let mut messages = OpenAIClient::build_messages(&system_prompt, &context_data, &history); + // Get bot name for KB and tool injection + let bot_name_for_context = { + let conn = self.state.conn.get().ok(); + if let Some(mut db_conn) = conn { + use crate::shared::models::schema::bots::dsl::*; + bots.filter(id.eq(session.bot_id)) + .select(name) + .first::(&mut db_conn) + .unwrap_or_else(|_| "default".to_string()) + } else { + "default".to_string() + } + }; + #[cfg(any(feature = "research", feature = "llm"))] { - if let Some(kb_manager) = self.state.kb_manager.as_ref() { - let bot_name_for_kb = { - let conn = self.state.conn.get().ok(); - if let Some(mut db_conn) = conn { - use crate::shared::models::schema::bots::dsl::*; - bots.filter(id.eq(session.bot_id)) - .select(name) - .first::(&mut db_conn) - .unwrap_or_else(|_| "default".to_string()) - } else { - "default".to_string() - } - }; + // Execute start.bas on first message to load tools + // Check if tools have been loaded for this session + let has_tools_loaded = { + use crate::shared::models::schema::session_tool_associations::dsl::*; + let conn = self.state.conn.get().ok(); + if let Some(mut db_conn) = conn { + let tool_names: Vec = session_tool_associations + .filter(session_id.eq(&session_id_str)) + .select(tool_name) + .limit(1) + .load(&mut db_conn) + .unwrap_or_default(); + !tool_names.is_empty() + } else { + false + } + }; + // If no tools are loaded, execute start.bas + if !has_tools_loaded { + let home_dir = std::env::var("HOME").unwrap_or_else(|_| ".".to_string()); + let data_dir = format!("{}/data", home_dir); + let start_script_path = format!("{}/{}.gbai/{}.gbdialog/start.bas", data_dir, bot_name_for_context, bot_name_for_context); + + info!("[START_BAS] Checking for start.bas at: {}", start_script_path); + + if let Ok(metadata) = tokio::fs::metadata(&start_script_path).await { + if metadata.is_file() { + info!("[START_BAS] Found start.bas, executing for session {}", session_id); + + if let Ok(start_script) = tokio::fs::read_to_string(&start_script_path).await { + let state_clone = self.state.clone(); + let session_id_clone = session_id; + let bot_id_clone = session.bot_id; + let bot_name_clone = bot_name_for_context.clone(); + let bot_name_for_log = bot_name_clone.clone(); + let start_script_clone = start_script.clone(); + + tokio::spawn(async move { + let session_result = state_clone.session_manager.lock().await.get_session_by_id(session_id_clone); + + if let Ok(Some(session)) = session_result { + let result = tokio::task::spawn_blocking(move || { + let mut script_service = crate::basic::ScriptService::new( + state_clone.clone(), + session.clone() + ); + script_service.load_bot_config_params(&state_clone, bot_id_clone); + + match script_service.compile(&start_script_clone) { + Ok(ast) => match script_service.run(&ast) { + Ok(_) => { + info!("[START_BAS] Executed start.bas successfully for bot {}", bot_name_clone); + Ok(()) + }, + Err(e) => Err(format!("Script execution error: {}", e)), + }, + Err(e) => Err(format!("Script compilation error: {}", e)), + } + }).await; + + match result { + Ok(Ok(())) => { + info!("[START_BAS] start.bas completed for bot {}", bot_name_for_log); + } + Ok(Err(e)) => { + error!("[START_BAS] start.bas error for bot {}: {}", bot_name_for_log, e); + } + Err(e) => { + error!("[START_BAS] start.bas task error for bot {}: {}", bot_name_for_log, e); + } + } + } + }); + } + } else { + info!("[START_BAS] start.bas not found for bot {}", bot_name_for_context); + } + } + } + + if let Some(kb_manager) = self.state.kb_manager.as_ref() { if let Err(e) = inject_kb_context( kb_manager.clone(), self.state.conn.clone(), session_id, - &bot_name_for_kb, + &bot_name_for_context, &message_content, &mut messages, 8000, @@ -372,11 +520,44 @@ impl BotOrchestrator { } } + // Add the current user message to the messages array + if let Some(msgs_array) = messages.as_array_mut() { + msgs_array.push(serde_json::json!({ + "role": "user", + "content": message_content + })); + } + + // DEBUG: Log messages before sending to LLM + info!("[LLM_CALL] Messages before LLM: {}", serde_json::to_string_pretty(&messages).unwrap_or_default()); + info!("[LLM_CALL] message_content: '{}'", message_content); + let (stream_tx, mut stream_rx) = mpsc::channel::(100); + info!("[STREAM_SETUP] Channel created, starting LLM stream"); let llm = self.state.llm_provider.clone(); let model_clone = model.clone(); let key_clone = key.clone(); + + // Retrieve session tools for tool calling + let session_tools = get_session_tools(&self.state.conn, &bot_name_for_context, &session_id); + let tools_for_llm = match session_tools { + Ok(tools) => { + if !tools.is_empty() { + info!("[TOOLS] Loaded {} tools for session {}", tools.len(), session_id); + Some(tools) + } else { + info!("[TOOLS] No tools associated with session {}", session_id); + None + } + } + Err(e) => { + warn!("[TOOLS] Failed to load session tools: {}", e); + None + } + }; + + // Clone messages for the async task let messages_clone = messages.clone(); // DEBUG: Log exact values being passed to LLM @@ -389,12 +570,14 @@ impl BotOrchestrator { ); tokio::spawn(async move { + info!("[SPAWN_TASK] LLM stream task started"); if let Err(e) = llm - .generate_stream("", &messages_clone, stream_tx, &model_clone, &key_clone) + .generate_stream("", &messages_clone, stream_tx, &model_clone, &key_clone, tools_for_llm.as_ref()) .await { error!("LLM streaming error: {}", e); } + info!("[SPAWN_TASK] LLM stream task completed"); }); let mut full_response = String::new(); @@ -402,6 +585,9 @@ impl BotOrchestrator { let mut in_analysis = false; let handler = llm_models::get_handler(&model); + info!("[STREAM_START] Entering stream processing loop for model: {}", model); + info!("[STREAM_START] About to enter while loop, stream_rx is valid"); + trace!("Using model handler for {}", model); #[cfg(feature = "nvidia")] @@ -426,6 +612,7 @@ impl BotOrchestrator { } while let Some(chunk) = stream_rx.recv().await { + info!("[STREAM_DEBUG] Received chunk: '{}', len: {}", chunk, chunk.len()); trace!("Received LLM chunk: {:?}", chunk); analysis_buffer.push_str(&chunk); @@ -466,12 +653,13 @@ impl BotOrchestrator { if in_analysis && handler.is_analysis_complete(&analysis_buffer) { in_analysis = false; - log::debug!( - "Detected end of thinking/analysis content for model {}", - model + info!( + "[ANALYSIS] Detected end of thinking for model {}. Buffer: '{}'", + model, analysis_buffer ); let processed = handler.process_content(&analysis_buffer); + info!("[ANALYSIS] Processed content: '{}'", processed); if !processed.is_empty() { full_response.push_str(&processed); @@ -506,6 +694,7 @@ impl BotOrchestrator { } if !in_analysis { + info!("[STREAM_CONTENT] Sending chunk: '{}', len: {}", chunk, chunk.len()); full_response.push_str(&chunk); let response = BotResponse { @@ -530,6 +719,8 @@ impl BotOrchestrator { } } + info!("[STREAM_END] While loop exited. full_response length: {}", full_response.len()); + let state_for_save = self.state.clone(); let full_response_clone = full_response.clone(); tokio::task::spawn_blocking( @@ -607,6 +798,25 @@ impl BotOrchestrator { } } +/// Extract bot name from URL like "http://localhost:3000/bot/cristo" or "/cristo/" +fn extract_bot_from_url(url: &str) -> Option { + // Remove protocol and domain + let path_part = url + .split('/') + .skip_while(|&part| part == "http:" || part == "https:" || part.is_empty()) + .skip_while(|&part| part.contains('.') || part == "localhost" || part == "bot") + .collect::>(); + + // First path segment after /bot/ is the bot name + if let Some(&bot_name) = path_part.first() { + if !bot_name.is_empty() && bot_name != "bot" { + return Some(bot_name.to_string()); + } + } + + None +} + pub async fn websocket_handler( ws: WebSocketUpgrade, State(state): State>, @@ -616,6 +826,8 @@ pub async fn websocket_handler( .get("session_id") .and_then(|s| Uuid::parse_str(s).ok()); let user_id = params.get("user_id").and_then(|s| Uuid::parse_str(s).ok()); + + // Extract bot_name from query params let bot_name = params .get("bot_name") .cloned() @@ -725,7 +937,9 @@ async fn handle_websocket( ); if let Some(bot_name) = bot_name_result { - let start_script_path = format!("./work/{}.gbai/{}.gbdialog/start.bas", bot_name, bot_name); + let home_dir = std::env::var("HOME").unwrap_or_else(|_| ".".to_string()); + let data_dir = format!("{}/data", home_dir); + let start_script_path = format!("{}/{}.gbai/{}.gbdialog/start.bas", data_dir, bot_name, bot_name); info!("Looking for start.bas at: {}", start_script_path); diff --git a/src/core/bot/tool_context.rs b/src/core/bot/tool_context.rs new file mode 100644 index 000000000..0faf62a1a --- /dev/null +++ b/src/core/bot/tool_context.rs @@ -0,0 +1,127 @@ +use diesel::prelude::*; +use log::{debug, info, warn}; +use serde_json::{json, Value}; +use std::path::Path; +use uuid::Uuid; + +use crate::shared::utils::DbPool; + +/// Structure to hold tool information loaded from .mcp.json files +#[derive(Debug, Clone)] +struct ToolInfo { + name: String, + description: String, + parameters: Vec, +} + +#[derive(Debug, Clone)] +struct ToolParameter { + name: String, + param_type: String, + description: String, + required: bool, + example: Option, +} + +/// Loads tools for a bot and returns them formatted for OpenAI API +pub fn get_session_tools( + db_pool: &DbPool, + bot_name: &str, + session_id: &Uuid, +) -> Result, Box> { + use crate::shared::models::schema::{bots, session_tool_associations}; + + // Get bot_id (we use the query to verify the bot exists) + let mut conn = db_pool.get()?; + let _bot_id: Uuid = bots::table + .filter(bots::name.eq(bot_name)) + .select(bots::id) + .first(&mut *conn) + .map_err(|e| format!("Failed to get bot_id for bot '{}': {}", bot_name, e))?; + + // Get tool names associated with this session + let session_id_str = session_id.to_string(); + let tool_names: Vec = session_tool_associations::table + .filter(session_tool_associations::session_id.eq(&session_id_str)) + .select(session_tool_associations::tool_name) + .load::(&mut *conn) + .map_err(|e| format!("Failed to get tools for session: {}", e))?; + + if tool_names.is_empty() { + debug!("No tools associated with session {}", session_id); + return Ok(vec![]); + } + + // Build path to work/{bot_name}.gbai/{bot_name}.gbdialog directory + let home_dir = std::env::var("HOME").unwrap_or_else(|_| ".".to_string()); + let gb_dir = format!("{}/gb", home_dir); + let work_path = Path::new(&gb_dir).join("work").join(format!("{}.gbai/{}.gbdialog", bot_name, bot_name)); + + info!("Loading {} tools for session {} from {:?}", tool_names.len(), session_id, work_path); + + let mut tools = Vec::new(); + + for tool_name in &tool_names { + // Find the .mcp.json file for this tool + let mcp_path = work_path.join(format!("{}.mcp.json", tool_name)); + + if !mcp_path.exists() { + warn!("Tool JSON file not found: {:?}", mcp_path); + continue; + } + + // Read and parse the .mcp.json file + let mcp_content = std::fs::read_to_string(&mcp_path) + .map_err(|e| format!("Failed to read tool file {:?}: {}", mcp_path, e))?; + + let mcp_json: Value = serde_json::from_str(&mcp_content) + .map_err(|e| format!("Failed to parse tool JSON from {:?}: {}", mcp_path, e))?; + + // Extract tool information and format for OpenAI + if let Some(tool) = format_tool_for_openai(&mcp_json, tool_name) { + tools.push(tool); + } + } + + info!("Loaded {} tools for session {}", tools.len(), session_id); + Ok(tools) +} + +/// Formats a tool definition from .mcp.json format to OpenAI tool format +fn format_tool_for_openai(mcp_json: &Value, tool_name: &str) -> Option { + let _name = mcp_json.get("name")?.as_str()?; + let description = mcp_json.get("description")?.as_str()?; + let input_schema = mcp_json.get("input_schema")?; + + let parameters = input_schema.get("properties")?.as_object()?; + let required = input_schema.get("required")?.as_array()?; + + let mut openai_params = serde_json::Map::new(); + + for (param_name, param_info) in parameters { + let param_obj = param_info.as_object()?; + let param_desc = param_obj.get("description")?.as_str().unwrap_or(""); + let param_type = param_obj.get("type")?.as_str().unwrap_or("string"); + + openai_params.insert( + param_name.clone(), + json!({ + "type": param_type, + "description": param_desc + }) + ); + } + + Some(json!({ + "type": "function", + "function": { + "name": tool_name, + "description": description, + "parameters": { + "type": "object", + "properties": openai_params, + "required": required + } + } + })) +} diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index 26dd403d5..1687c94a7 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -2,6 +2,9 @@ pub mod model_routing_config; pub mod sse_config; pub mod user_memory_config; +#[cfg(feature = "drive")] +pub mod watcher; + pub use model_routing_config::{ModelRoutingConfig, RoutingStrategy, TaskType}; pub use sse_config::SseConfig; pub use user_memory_config::UserMemoryConfig; diff --git a/src/core/config/watcher.rs b/src/core/config/watcher.rs new file mode 100644 index 000000000..56d450329 --- /dev/null +++ b/src/core/config/watcher.rs @@ -0,0 +1,212 @@ +// Config file watcher - monitors config.csv files and reloads them when changed +use log::{error, info, warn}; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; +use tokio::sync::RwLock; + +use crate::shared::state::AppState; + +/// Tracks file state to detect changes +#[derive(Debug, Clone)] +struct FileState { + modified: SystemTime, + size: u64, +} + +/// Config file watcher - monitors config.csv files in data directory +pub struct ConfigWatcher { + data_dir: PathBuf, + file_states: Arc>>, + state: Arc, +} + +impl ConfigWatcher { + pub fn new(data_dir: PathBuf, state: Arc) -> Self { + Self { + data_dir, + file_states: Arc::new(RwLock::new(HashMap::new())), + state, + } + } + + /// Start watching for config.csv changes + pub fn spawn(self: Arc) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + info!("Starting config file watcher for: {}", self.data_dir.display()); + + // Initial scan + if let Err(e) = self.scan_configs().await { + error!("Initial config scan failed: {}", e); + } + + // Set up periodic polling (every 5 seconds) + let mut interval = tokio::time::interval(Duration::from_secs(5)); + + loop { + interval.tick().await; + if let Err(e) = self.scan_configs().await { + error!("Config scan failed: {}", e); + } + } + }) + } + + /// Scan all config.csv files in the data directory and reload changed files + async fn scan_configs(&self) -> Result<(), Box> { + // Pattern: data_dir/*.gbai/*.gbot/config.csv + let entries = match tokio::fs::read_dir(&self.data_dir).await { + Ok(e) => e, + Err(e) => { + warn!("Failed to read data directory {}: {}", self.data_dir.display(), e); + return Err(e.into()); + } + }; + + let mut entries = entries; + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + + // Check if it's a .gbai directory + if path.is_dir() && path.extension().and_then(|s| s.to_str()) == Some("gbai") { + let bot_name = path.file_stem() + .and_then(|s| s.to_str()) + .unwrap_or("unknown"); + + // Look for *.gbot/config.csv + let gbot_pattern = path.join(format!("{}.gbot", bot_name)); + let config_path = gbot_pattern.join("config.csv"); + + if config_path.exists() { + if let Err(e) = self.check_and_reload_config(&config_path, bot_name).await { + error!("Failed to check config {:?}: {}", config_path, e); + } + } + } + } + + Ok(()) + } + + /// Check if a config file has changed and reload it + async fn check_and_reload_config( + &self, + config_path: &Path, + bot_name: &str, + ) -> Result<(), Box> { + let metadata = tokio::fs::metadata(config_path).await?; + let modified = metadata.modified()?; + let size = metadata.len(); + + let mut states = self.file_states.write().await; + + // Check if file has changed + let has_changed = match states.get(config_path) { + Some(state) => state.modified != modified || state.size != size, + None => true, + }; + + if has_changed { + info!("Config file changed: {:?}", config_path); + + // Reload the config + match tokio::fs::read_to_string(config_path).await { + Ok(content) => { + let conn = self.state.conn.clone(); + let bot_name_owned = bot_name.to_string(); + let bot_name_for_log = bot_name_owned.clone(); + let bot_name_for_llm = bot_name_owned.clone(); + let content_clone = content.clone(); + + // Sync to database + let sync_result = tokio::task::spawn_blocking(move || { + let mut db_conn = conn.get() + .map_err(|e| format!("Failed to get DB connection: {}", e))?; + + // Get bot_id by name + let bot_id = crate::bot::get_bot_id_by_name(&mut db_conn, &bot_name_owned) + .map_err(|e| format!("Failed to get bot_id for '{}': {}", bot_name_owned, e))?; + + // Use ConfigManager's sync_gbot_config (public method) + crate::core::config::ConfigManager::new(conn) + .sync_gbot_config(&bot_id, &content_clone) + }).await; + + match sync_result { + Ok(Ok(updated)) => { + info!("Reloaded config for bot '{}' ({} entries updated)", bot_name_for_log, updated); + + // Trigger immediate LLM config refresh + if let Some(dynamic_llm) = &self.state.dynamic_llm_provider { + // Get the updated config values + let pool = self.state.conn.clone(); + let llm_config = tokio::task::spawn_blocking(move || { + let mut db_conn = pool.get() + .map_err(|e| format!("DB connection error: {}", e))?; + + let bot_id = crate::bot::get_bot_id_by_name(&mut db_conn, &bot_name_for_llm) + .map_err(|e| format!("Get bot_id error: {}", e))?; + + let config_manager = crate::core::config::ConfigManager::new(pool); + let llm_server = config_manager.get_config(&bot_id, "llm-server", None) + .unwrap_or_default(); + let llm_model = config_manager.get_config(&bot_id, "llm-model", None) + .unwrap_or_default(); + let llm_key = config_manager.get_config(&bot_id, "llm-key", None) + .unwrap_or_default(); + + Ok::<_, String>((llm_server, llm_model, llm_key)) + }).await; + + if let Ok(Ok((llm_server, llm_model, llm_key))) = llm_config { + if !llm_server.is_empty() { + // Handle both local embedded (llm-server=true) and external API endpoints + if llm_server.eq_ignore_ascii_case("true") { + // Local embedded LLM server - trigger local LLM initialization + info!("ConfigWatcher: Local LLM server enabled for bot '{}', model={}", bot_name_for_log, llm_model); + // The local LLM will be initialized by LocalFileMonitor on next check + // Just trigger a config refresh to notify components + } else { + // External LLM API endpoint - parse URL and endpoint path + let (base_url, endpoint_path) = if llm_server.contains("/chat/completions") || llm_server.contains("/v1/") { + // Extract base URL up to the path + if let Some(pos) = llm_server.find("/v1/chat/completions") { + (&llm_server[..pos], Some(&llm_server[pos..])) + } else if let Some(pos) = llm_server.find("/chat/completions") { + (&llm_server[..pos], Some(&llm_server[pos..])) + } else { + (llm_server.as_str(), None) + } + } else { + (llm_server.as_str(), None) + }; + + info!("ConfigWatcher: Refreshing LLM provider with URL={}, model={}, endpoint={:?}", base_url, llm_model, endpoint_path); + dynamic_llm.update_from_config(base_url, Some(llm_model), endpoint_path.map(|s| s.to_string())).await; + } + } + } + } + } + Ok(Err(e)) => { + error!("Failed to reload config for bot '{}': {}", bot_name_for_log, e); + } + Err(e) => { + error!("Task failed for bot '{}': {}", bot_name_for_log, e); + } + } + } + Err(e) => { + error!("Failed to read config file {:?}: {}", config_path, e); + return Err(e.into()); + } + } + + // Update state + states.insert(config_path.to_path_buf(), FileState { modified, size }); + } + + Ok(()) + } +} diff --git a/src/core/kb/embedding_generator.rs b/src/core/kb/embedding_generator.rs index 8b2a6fbd9..7064ac676 100644 --- a/src/core/kb/embedding_generator.rs +++ b/src/core/kb/embedding_generator.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::Semaphore; +use crate::shared::DbPool; use crate::core::shared::memory_monitor::{log_jemalloc_stats, MemoryStats}; use super::document_processor::TextChunk; @@ -50,17 +51,93 @@ impl Default for EmbeddingConfig { impl EmbeddingConfig { pub fn from_env() -> Self { - let embedding_url = "http://localhost:8082".to_string(); - let embedding_model = "bge-small-en-v1.5".to_string(); - let dimensions = Self::detect_dimensions(&embedding_model); + Self::default() + } + + /// Load embedding config from bot's config.csv (similar to llm-url, llm-model) + /// This allows configuring embedding server per-bot in config.csv: + /// embedding-url,http://localhost:8082 + /// embedding-model,bge-small-en-v1.5 + /// embedding-dimensions,384 + /// embedding-batch-size,16 + /// embedding-timeout,60 + pub fn from_bot_config(pool: &DbPool, bot_id: &uuid::Uuid) -> Self { + use crate::shared::models::schema::bot_configuration::dsl::*; + use diesel::prelude::*; + + let embedding_url = match pool.get() { + Ok(mut conn) => bot_configuration + .filter(bot_id.eq(bot_id)) + .filter(config_key.eq("embedding-url")) + .select(config_value) + .first::(&mut conn) + .ok() + .filter(|s| !s.is_empty()), + Err(_) => None, + }.unwrap_or_else(|| "http://localhost:8082".to_string()); + + let embedding_model = match pool.get() { + Ok(mut conn) => bot_configuration + .filter(bot_id.eq(bot_id)) + .filter(config_key.eq("embedding-model")) + .select(config_value) + .first::(&mut conn) + .ok() + .filter(|s| !s.is_empty()), + Err(_) => None, + }.unwrap_or_else(|| "bge-small-en-v1.5".to_string()); + + let dimensions = match pool.get() { + Ok(mut conn) => bot_configuration + .filter(bot_id.eq(bot_id)) + .filter(config_key.eq("embedding-dimensions")) + .select(config_value) + .first::(&mut conn) + .ok() + .and_then(|v| v.parse().ok()), + Err(_) => None, + }.unwrap_or_else(|| Self::detect_dimensions(&embedding_model)); + + let batch_size = match pool.get() { + Ok(mut conn) => bot_configuration + .filter(bot_id.eq(bot_id)) + .filter(config_key.eq("embedding-batch-size")) + .select(config_value) + .first::(&mut conn) + .ok() + .and_then(|v| v.parse().ok()), + Err(_) => None, + }.unwrap_or(16); + + let timeout_seconds = match pool.get() { + Ok(mut conn) => bot_configuration + .filter(bot_id.eq(bot_id)) + .filter(config_key.eq("embedding-timeout")) + .select(config_value) + .first::(&mut conn) + .ok() + .and_then(|v| v.parse().ok()), + Err(_) => None, + }.unwrap_or(60); + + let max_concurrent_requests = match pool.get() { + Ok(mut conn) => bot_configuration + .filter(bot_id.eq(bot_id)) + .filter(config_key.eq("embedding-concurrent")) + .select(config_value) + .first::(&mut conn) + .ok() + .and_then(|v| v.parse().ok()), + Err(_) => None, + }.unwrap_or(1); Self { embedding_url, embedding_model, dimensions, - batch_size: 16, - timeout_seconds: 60, - max_concurrent_requests: 1, + batch_size, + timeout_seconds, + max_concurrent_requests, connect_timeout_seconds: 10, } } diff --git a/src/drive/local_file_monitor.rs b/src/drive/local_file_monitor.rs new file mode 100644 index 000000000..3c2448af1 --- /dev/null +++ b/src/drive/local_file_monitor.rs @@ -0,0 +1,311 @@ +use crate::basic::compiler::BasicCompiler; +use crate::shared::state::AppState; +use log::{debug, error, info, warn}; +use std::collections::HashMap; +use std::error::Error; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::SystemTime; +use tokio::sync::RwLock; +use tokio::time::Duration; +use notify::{RecursiveMode, EventKind, RecommendedWatcher, Watcher}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct LocalFileState { + modified: SystemTime, + size: u64, +} + +pub struct LocalFileMonitor { + state: Arc, + data_dir: PathBuf, + file_states: Arc>>, + is_processing: Arc, +} + +impl LocalFileMonitor { + pub fn new(state: Arc) -> Self { + // Use ~/data as the base directory + let data_dir = PathBuf::from(std::env::var("HOME") + .unwrap_or_else(|_| ".".to_string())) + .join("data"); + + info!("[LOCAL_MONITOR] Initializing with data_dir: {:?}", data_dir); + + Self { + state, + data_dir, + file_states: Arc::new(RwLock::new(HashMap::new())), + is_processing: Arc::new(AtomicBool::new(false)), + } + } + + pub async fn start_monitoring(&self) -> Result<(), Box> { + info!("[LOCAL_MONITOR] Starting local file monitor for ~/data/*.gbai directories"); + + // Create data directory if it doesn't exist + if let Err(e) = tokio::fs::create_dir_all(&self.data_dir).await { + warn!("[LOCAL_MONITOR] Failed to create data directory: {}", e); + } + + // Initial scan of all .gbai directories + self.scan_and_compile_all().await?; + + self.is_processing.store(true, Ordering::SeqCst); + + // Spawn the monitoring loop + let monitor = self.clone(); + tokio::spawn(async move { + monitor.monitoring_loop().await; + }); + + info!("[LOCAL_MONITOR] Local file monitor started"); + Ok(()) + } + + async fn monitoring_loop(&self) { + info!("[LOCAL_MONITOR] Starting monitoring loop"); + + // Try to create a file system watcher + let (tx, mut rx) = tokio::sync::mpsc::channel(100); + + // Use notify crate for file system watching + let tx_clone = tx.clone(); + let mut watcher: RecommendedWatcher = match RecommendedWatcher::new( + move |res| { + if let Ok(event) = res { + let _ = tx_clone.try_send(event); + } + }, + notify::Config::default(), + ) { + Ok(w) => w, + Err(e) => { + error!("[LOCAL_MONITOR] Failed to create watcher: {}. Falling back to polling.", e); + // Fall back to polling if watcher creation fails + self.polling_loop().await; + return; + } + }; + + // Watch the data directory + if let Err(e) = watcher.watch(&self.data_dir, RecursiveMode::Recursive) { + warn!("[LOCAL_MONITOR] Failed to watch directory {:?}: {}. Using polling fallback.", self.data_dir, e); + drop(watcher); + self.polling_loop().await; + return; + } + + info!("[LOCAL_MONITOR] Watching directory: {:?}", self.data_dir); + + while self.is_processing.load(Ordering::SeqCst) { + tokio::time::sleep(Duration::from_secs(5)).await; + + // Process events from the watcher + while let Ok(event) = rx.try_recv() { + match event.kind { + EventKind::Create(_) | EventKind::Modify(_) | EventKind::Any => { + for path in &event.paths { + if self.is_gbdialog_file(path) { + info!("[LOCAL_MONITOR] Detected change: {:?}", path); + if let Err(e) = self.compile_local_file(path).await { + error!("[LOCAL_MONITOR] Failed to compile {:?}: {}", path, e); + } + } + } + } + EventKind::Remove(_) => { + for path in &event.paths { + if self.is_gbdialog_file(path) { + info!("[LOCAL_MONITOR] File removed: {:?}", path); + self.remove_file_state(path).await; + } + } + } + _ => {} + } + } + + // Periodic scan to catch any missed changes + if let Err(e) = self.scan_and_compile_all().await { + error!("[LOCAL_MONITOR] Scan failed: {}", e); + } + } + + info!("[LOCAL_MONITOR] Monitoring loop ended"); + } + + async fn polling_loop(&self) { + info!("[LOCAL_MONITOR] Using polling fallback (checking every 10s)"); + + while self.is_processing.load(Ordering::SeqCst) { + tokio::time::sleep(Duration::from_secs(10)).await; + + if let Err(e) = self.scan_and_compile_all().await { + error!("[LOCAL_MONITOR] Scan failed: {}", e); + } + } + } + + fn is_gbdialog_file(&self, path: &Path) -> bool { + // Check if path is something like ~/data/*.gbai/.gbdialog/*.bas + path.extension() + .and_then(|e| e.to_str()) + .map(|e| e.eq_ignore_ascii_case("bas")) + .unwrap_or(false) + && path.ancestors() + .any(|p| p.ends_with(".gbdialog")) + } + + async fn scan_and_compile_all(&self) -> Result<(), Box> { + debug!("[LOCAL_MONITOR] Scanning ~/data for .gbai directories"); + + let entries = match tokio::fs::read_dir(&self.data_dir).await { + Ok(e) => e, + Err(e) => { + debug!("[LOCAL_MONITOR] Cannot read data directory: {}", e); + return Ok(()); + } + }; + + let mut entries = entries; + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + + // Check if this is a .gbai directory + if path.extension() + .and_then(|e| e.to_str()) + .map(|e| e.eq_ignore_ascii_case("gbai")) + .unwrap_or(false) + { + let bot_name = path.file_stem() + .and_then(|s| s.to_str()) + .unwrap_or("unknown"); + + // Look for .gbdialog folder inside + let gbdialog_path = path.join(".gbdialog"); + if gbdialog_path.exists() { + self.compile_gbdialog(&bot_name, &gbdialog_path).await?; + } + } + } + + Ok(()) + } + + async fn compile_gbdialog(&self, bot_name: &str, gbdialog_path: &Path) -> Result<(), Box> { + debug!("[LOCAL_MONITOR] Processing .gbdialog for bot: {}", bot_name); + + let entries = tokio::fs::read_dir(gbdialog_path).await?; + let mut entries = entries; + + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + + if path.extension() + .and_then(|e| e.to_str()) + .map(|e| e.eq_ignore_ascii_case("bas")) + .unwrap_or(false) + { + let metadata = tokio::fs::metadata(&path).await?; + let modified = metadata.modified()?; + let size = metadata.len(); + + let file_key = path.to_string_lossy().to_string(); + + // Check if file changed + let should_compile = { + let states = self.file_states.read().await; + states.get(&file_key) + .map(|state| state.modified != modified || state.size != size) + .unwrap_or(true) + }; + + if should_compile { + info!("[LOCAL_MONITOR] Compiling: {:?}", path); + if let Err(e) = self.compile_local_file(&path).await { + error!("[LOCAL_MONITOR] Failed to compile {:?}: {}", path, e); + } + + // Update state + let mut states = self.file_states.write().await; + states.insert(file_key, LocalFileState { modified, size }); + } + } + } + + Ok(()) + } + + async fn compile_local_file(&self, file_path: &Path) -> Result<(), Box> { + let tool_name = file_path + .file_stem() + .and_then(|s| s.to_str()) + .unwrap_or("unknown"); + + // Extract bot name from path like ~/data/cristo.gbai/.gbdialog/file.bas + let bot_name = file_path + .ancestors() + .find(|p| p.extension().and_then(|e| e.to_str()).map(|e| e.eq_ignore_ascii_case("gbai")).unwrap_or(false)) + .and_then(|p| p.file_stem()) + .and_then(|s| s.to_str()) + .unwrap_or("unknown"); + + // Create work directory structure + let work_dir = self.data_dir.join(format!("{}.gbai", bot_name)); + + // Read the file content + let source_content = tokio::fs::read_to_string(file_path).await?; + + // Compile the file + let state_clone = Arc::clone(&self.state); + let work_dir_clone = work_dir.clone(); + let tool_name_clone = tool_name.to_string(); + let source_content_clone = source_content.clone(); + let bot_id = uuid::Uuid::new_v4(); // Generate a bot ID or get from somewhere + + tokio::task::spawn_blocking(move || { + std::fs::create_dir_all(&work_dir_clone)?; + let local_source_path = work_dir_clone.join(format!("{}.bas", tool_name_clone)); + std::fs::write(&local_source_path, &source_content_clone)?; + let mut compiler = BasicCompiler::new(state_clone, bot_id); + let result = compiler.compile_file(local_source_path.to_str().unwrap(), work_dir_clone.to_str().unwrap())?; + if let Some(mcp_tool) = result.mcp_tool { + info!( + "[LOCAL_MONITOR] MCP tool generated with {} parameters", + mcp_tool.input_schema.properties.len() + ); + } + Ok::<(), Box>(()) + }) + .await??; + + info!("[LOCAL_MONITOR] Successfully compiled: {:?}", file_path); + Ok(()) + } + + async fn remove_file_state(&self, path: &Path) { + let file_key = path.to_string_lossy().to_string(); + let mut states = self.file_states.write().await; + states.remove(&file_key); + } + + pub async fn stop_monitoring(&self) { + info!("[LOCAL_MONITOR] Stopping local file monitor"); + self.is_processing.store(false, Ordering::SeqCst); + self.file_states.write().await.clear(); + } +} + +impl Clone for LocalFileMonitor { + fn clone(&self) -> Self { + Self { + state: Arc::clone(&self.state), + data_dir: self.data_dir.clone(), + file_states: Arc::clone(&self.file_states), + is_processing: Arc::clone(&self.is_processing), + } + } +} diff --git a/src/drive/mod.rs b/src/drive/mod.rs index 2e6a278fb..b05fe2028 100644 --- a/src/drive/mod.rs +++ b/src/drive/mod.rs @@ -15,6 +15,7 @@ use std::sync::Arc; pub mod document_processing; pub mod drive_monitor; +pub mod local_file_monitor; pub mod vectordb; #[derive(Debug, Serialize, Deserialize)] diff --git a/src/llm/cache.rs b/src/llm/cache.rs index f9d005cf8..47b6cba7a 100644 --- a/src/llm/cache.rs +++ b/src/llm/cache.rs @@ -545,13 +545,14 @@ impl LLMProvider for CachedLLMProvider { tx: mpsc::Sender, model: &str, key: &str, + tools: Option<&Vec>, ) -> Result<(), Box> { let bot_id = "default"; if !self.is_cache_enabled(bot_id).await { trace!("Cache disabled for streaming, bypassing"); return self .provider - .generate_stream(prompt, messages, tx, model, key) + .generate_stream(prompt, messages, tx, model, key, tools) .await; } @@ -581,7 +582,7 @@ impl LLMProvider for CachedLLMProvider { }); self.provider - .generate_stream(prompt, messages, buffer_tx, model, key) + .generate_stream(prompt, messages, buffer_tx, model, key, tools) .await?; let full_response = forward_task.await?; diff --git a/src/llm/claude.rs b/src/llm/claude.rs index 0b3f99c71..6a117eacf 100644 --- a/src/llm/claude.rs +++ b/src/llm/claude.rs @@ -583,6 +583,7 @@ impl ClaudeClient { tx: mpsc::Sender, model: &str, key: &str, + _tools: Option<&Vec>, ) -> Result<(), Box> { let model_name = if model.is_empty() { &self.deployment_name @@ -686,6 +687,7 @@ impl LLMProvider for ClaudeClient { tx: mpsc::Sender, model: &str, key: &str, + _tools: Option<&Vec>, ) -> Result<(), Box> { let mut last_error: Option> = None; @@ -700,7 +702,7 @@ impl LLMProvider for ClaudeClient { } match self - .stream_single_attempt(prompt, messages, tx.clone(), model, key) + .stream_single_attempt(prompt, messages, tx.clone(), model, key, _tools) .await { Ok(()) => { diff --git a/src/llm/glm.rs b/src/llm/glm.rs new file mode 100644 index 000000000..277709d3c --- /dev/null +++ b/src/llm/glm.rs @@ -0,0 +1,371 @@ +use async_trait::async_trait; +use futures::StreamExt; +use log::{error, info}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use tokio::sync::mpsc; + +use super::LLMProvider; + +// GLM / z.ai API Client +// Similar to OpenAI but with different endpoint structure +// For z.ai, base URL already contains version (e.g., /v4), endpoint is just /chat/completions + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GLMMessage { + pub role: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub content: Option, + #[serde(default)] + pub tool_calls: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GLMRequest { + pub model: String, + pub messages: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub stream: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub max_tokens: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub temperature: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub tools: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub tool_choice: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GLMResponseChoice { + #[serde(default)] + pub index: u32, + pub message: GLMMessage, + #[serde(default)] + pub finish_reason: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GLMResponse { + pub id: String, + pub object: String, + pub created: u64, + pub model: String, + pub choices: Vec, + #[serde(default)] + pub usage: Option, +} + +// Streaming structures +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct GLMStreamDelta { + #[serde(default)] + pub content: Option, + #[serde(default)] + pub role: Option, + #[serde(default)] + pub tool_calls: Option>, + #[serde(default)] + pub reasoning_content: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GLMStreamChoice { + #[serde(default)] + pub index: u32, + #[serde(default)] + pub delta: GLMStreamDelta, + #[serde(default)] + pub finish_reason: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GLMStreamChunk { + pub id: String, + pub object: String, + pub created: u64, + pub model: String, + pub choices: Vec, + #[serde(default)] + pub usage: Option, +} + +#[derive(Debug)] +pub struct GLMClient { + client: reqwest::Client, + base_url: String, +} + +impl GLMClient { + pub fn new(base_url: String) -> Self { + // For z.ai GLM API: + // - Base URL typically is: https://api.z.ai/api/coding/paas/v4 + // - Endpoint path is: /chat/completions + // - Full URL becomes: https://api.z.ai/api/coding/paas/v4/chat/completions + + // Remove trailing slash from base_url if present + let base = base_url.trim_end_matches('/').to_string(); + + Self { + client: reqwest::Client::new(), + base_url: base, + } + } + + fn build_url(&self) -> String { + // GLM/z.ai uses /chat/completions (not /v1/chat/completions) + format!("{}/chat/completions", self.base_url) + } +} + +#[async_trait] +impl LLMProvider for GLMClient { + async fn generate( + &self, + prompt: &str, + _config: &Value, + model: &str, + key: &str, + ) -> Result> { + let messages = vec![GLMMessage { + role: "user".to_string(), + content: Some(prompt.to_string()), + tool_calls: None, + }]; + + // Use glm-4.7 instead of glm-4 for z.ai API + let model_name = if model == "glm-4" { "glm-4.7" } else { model }; + + let request = GLMRequest { + model: model_name.to_string(), + messages, + stream: Some(false), + max_tokens: None, + temperature: None, + tools: None, + tool_choice: None, + }; + + let url = self.build_url(); + info!("GLM non-streaming request to: {}", url); + + let response = self + .client + .post(&url) + .header("Authorization", format!("Bearer {}", key)) + .header("Content-Type", "application/json") + .json(&request) + .send() + .await?; + + if !response.status().is_success() { + let error_text = response.text().await.unwrap_or_default(); + error!("GLM API error: {}", error_text); + return Err(format!("GLM API error: {}", error_text).into()); + } + + let glm_response: GLMResponse = response.json().await?; + let content = glm_response + .choices + .first() + .and_then(|c| c.message.content.clone()) + .unwrap_or_default(); + + Ok(content) + } + + async fn generate_stream( + &self, + prompt: &str, + config: &Value, + tx: mpsc::Sender, + model: &str, + key: &str, + tools: Option<&Vec>, + ) -> Result<(), Box> { + // DEBUG: Log what we received + info!("[GLM_DEBUG] config type: {}", config); + info!("[GLM_DEBUG] prompt: '{}'", prompt); + info!("[GLM_DEBUG] config as JSON: {}", serde_json::to_string_pretty(config).unwrap_or_default()); + + // config IS the messages array directly, not nested + let messages = if let Some(msgs) = config.as_array() { + // Convert messages from config format to GLM format + msgs.iter() + .filter_map(|m| { + let role = m.get("role")?.as_str()?; + let content = m.get("content")?.as_str()?; + info!("[GLM_DEBUG] Processing message - role: {}, content: '{}'", role, content); + if !content.is_empty() { + Some(GLMMessage { + role: role.to_string(), + content: Some(content.to_string()), + tool_calls: None, + }) + } else { + info!("[GLM_DEBUG] Skipping empty content message"); + None + } + }) + .collect::>() + } else { + // Fallback to building from prompt + info!("[GLM_DEBUG] No array found, using prompt: '{}'", prompt); + vec![GLMMessage { + role: "user".to_string(), + content: Some(prompt.to_string()), + tool_calls: None, + }] + }; + + // If no messages or all empty, return error + if messages.is_empty() { + return Err("No valid messages in request".into()); + } + + info!("[GLM_DEBUG] Final GLM messages count: {}", messages.len()); + + // Use glm-4.7 for tool calling support + // GLM-4.7 supports standard OpenAI-compatible function calling + let model_name = if model == "glm-4" { "glm-4.7" } else { model }; + + // Set tool_choice to "auto" when tools are present - this tells GLM to automatically decide when to call a tool + let tool_choice = if tools.is_some() { + Some(serde_json::json!("auto")) + } else { + None + }; + + let request = GLMRequest { + model: model_name.to_string(), + messages, + stream: Some(true), + max_tokens: None, + temperature: None, + tools: tools.map(|t| t.clone()), + tool_choice, + }; + + let url = self.build_url(); + info!("GLM streaming request to: {}", url); + + // Log the exact request being sent + let request_json = serde_json::to_string_pretty(&request).unwrap_or_default(); + info!("GLM request body: {}", request_json); + + let response = self + .client + .post(&url) + .header("Authorization", format!("Bearer {}", key)) + .header("Content-Type", "application/json") + .json(&request) + .send() + .await?; + + if !response.status().is_success() { + let error_text = response.text().await.unwrap_or_default(); + error!("GLM streaming error: {}", error_text); + return Err(format!("GLM streaming error: {}", error_text).into()); + } + + let mut stream = response.bytes_stream(); + + let mut buffer = Vec::new(); + while let Some(chunk_result) = stream.next().await { + let chunk = chunk_result.map_err(|e| format!("Stream error: {}", e))?; + + buffer.extend_from_slice(&chunk); + let data = String::from_utf8_lossy(&buffer); + + // Process SSE lines + for line in data.lines() { + let line = line.trim(); + + if line.is_empty() { + continue; + } + + if line == "data: [DONE]" { + let _ = tx.send(String::new()); // Signal end + return Ok(()); + } + + if line.starts_with("data: ") { + let json_str = line[6..].trim(); + info!("[GLM_SSE] Received SSE line ({} chars): {}", json_str.len(), json_str); + if let Ok(chunk_data) = serde_json::from_str::(json_str) { + if let Some(choices) = chunk_data.get("choices").and_then(|c| c.as_array()) { + for choice in choices { + info!("[GLM_SSE] Processing choice"); + if let Some(delta) = choice.get("delta") { + info!("[GLM_SSE] Delta: {}", serde_json::to_string(delta).unwrap_or_default()); + + // Handle tool_calls (GLM-4.7 standard function calling) + if let Some(tool_calls) = delta.get("tool_calls").and_then(|t| t.as_array()) { + for tool_call in tool_calls { + info!("[GLM_SSE] Tool call detected: {}", serde_json::to_string(tool_call).unwrap_or_default()); + // Send tool_calls as JSON for the calling code to process + let tool_call_json = serde_json::json!({ + "type": "tool_call", + "content": tool_call + }).to_string(); + match tx.send(tool_call_json).await { + Ok(_) => {}, + Err(e) => { + error!("[GLM_TX] Failed to send tool_call to channel: {}", e); + } + } + } + } + + // GLM/z.ai returns both reasoning_content (thinking) and content (response) + // We only send the actual content, ignoring reasoning_content + // This makes GLM behave like OpenAI-compatible APIs + if let Some(content) = delta.get("content").and_then(|c| c.as_str()) { + if !content.is_empty() { + info!("[GLM_TX] Sending to channel: '{}'", content); + match tx.send(content.to_string()).await { + Ok(_) => {}, + Err(e) => { + error!("[GLM_TX] Failed to send to channel: {}", e); + } + } + } + } else { + info!("[GLM_SSE] No content field in delta"); + } + } else { + info!("[GLM_SSE] No delta in choice"); + } + if let Some(reason) = choice.get("finish_reason").and_then(|r| r.as_str()) { + if !reason.is_empty() { + info!("GLM stream finished: {}", reason); + let _ = tx.send(String::new()); + return Ok(()); + } + } + } + } + } + } + } + + // Keep unprocessed data in buffer + if let Some(last_newline) = data.rfind('\n') { + buffer = buffer[last_newline + 1..].to_vec(); + } + } + + let _ = tx.send(String::new()); // Signal completion + Ok(()) + } + + async fn cancel_job( + &self, + _session_id: &str, + ) -> Result<(), Box> { + // GLM doesn't have job cancellation + info!("GLM cancel requested for session {} (no-op)", _session_id); + Ok(()) + } +} diff --git a/src/llm/mod.rs b/src/llm/mod.rs index 852d5b716..ce7c1414a 100644 --- a/src/llm/mod.rs +++ b/src/llm/mod.rs @@ -8,11 +8,13 @@ use tokio::sync::{mpsc, RwLock}; pub mod cache; pub mod claude; pub mod episodic_memory; +pub mod glm; pub mod llm_models; pub mod local; pub mod smart_router; pub use claude::ClaudeClient; +pub use glm::GLMClient; pub use llm_models::get_handler; #[async_trait] @@ -32,6 +34,7 @@ pub trait LLMProvider: Send + Sync { tx: mpsc::Sender, model: &str, key: &str, + tools: Option<&Vec>, ) -> Result<(), Box>; async fn cancel_job( @@ -150,10 +153,21 @@ impl OpenAIClient { } } pub fn new(_api_key: String, base_url: Option, endpoint_path: Option) -> Self { + let base = base_url.unwrap_or_else(|| "https://api.openai.com".to_string()); + + // For z.ai API, use different endpoint path + let endpoint = if endpoint_path.is_some() { + endpoint_path.unwrap() + } else if base.contains("z.ai") || base.contains("/v4") { + "/chat/completions".to_string() // z.ai uses /chat/completions, not /v1/chat/completions + } else { + "/v1/chat/completions".to_string() + }; + Self { client: reqwest::Client::new(), - base_url: base_url.unwrap_or_else(|| "https://api.openai.com".to_string()), - endpoint_path: endpoint_path.unwrap_or_else(|| "/v1/chat/completions".to_string()), + base_url: base, + endpoint_path: endpoint, } } @@ -234,6 +248,7 @@ impl LLMProvider for OpenAIClient { info!(" API Key First 8 chars: '{}...'", &key.chars().take(8).collect::()); info!(" API Key Last 8 chars: '...{}'", &key.chars().rev().take(8).collect::()); + // Build the request body (no tools for non-streaming generate) let response = self .client .post(&full_url) @@ -241,7 +256,7 @@ impl LLMProvider for OpenAIClient { .json(&serde_json::json!({ "model": model, "messages": messages, - "stream": true + "stream": false })) .send() .await?; @@ -271,6 +286,7 @@ impl LLMProvider for OpenAIClient { tx: mpsc::Sender, model: &str, key: &str, + tools: Option<&Vec>, ) -> Result<(), Box> { let default_messages = serde_json::json!([{"role": "user", "content": prompt}]); @@ -310,16 +326,30 @@ impl LLMProvider for OpenAIClient { if let Some(msg_array) = messages.as_array() { info!(" Messages: {} messages", msg_array.len()); } + if let Some(tools) = tools { + info!(" Tools: {} tools provided", tools.len()); + } + + // Build the request body - include tools if provided + let mut request_body = serde_json::json!({ + "model": model, + "messages": messages, + "stream": true + }); + + // Add tools to the request if provided + if let Some(tools_value) = tools { + if !tools_value.is_empty() { + request_body["tools"] = serde_json::json!(tools_value); + info!("Added {} tools to LLM request", tools_value.len()); + } + } let response = self .client .post(&full_url) .header("Authorization", &auth_header) - .json(&serde_json::json!({ - "model": model, - "messages": messages, - "stream": true - })) + .json(&request_body) .send() .await?; @@ -371,6 +401,7 @@ pub enum LLMProviderType { OpenAI, Claude, AzureClaude, + GLM, } impl From<&str> for LLMProviderType { @@ -382,6 +413,8 @@ impl From<&str> for LLMProviderType { } else { Self::Claude } + } else if lower.contains("z.ai") || lower.contains("glm") { + Self::GLM } else { Self::OpenAI } @@ -415,6 +448,10 @@ pub fn create_llm_provider( ); std::sync::Arc::new(ClaudeClient::azure(base_url, deployment)) } + LLMProviderType::GLM => { + info!("Creating GLM/z.ai LLM provider with URL: {}", base_url); + std::sync::Arc::new(GLMClient::new(base_url)) + } } } @@ -481,10 +518,11 @@ impl LLMProvider for DynamicLLMProvider { tx: mpsc::Sender, model: &str, key: &str, + tools: Option<&Vec>, ) -> Result<(), Box> { self.get_provider() .await - .generate_stream(prompt, config, tx, model, key) + .generate_stream(prompt, config, tx, model, key, tools) .await } diff --git a/src/main.rs b/src/main.rs index 5117c8db4..fa89425bf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1399,6 +1399,7 @@ async fn main() -> std::io::Result<()> { .get_config(&default_bot_id, "llm-key", Some("")) .unwrap_or_default(); + // LLM endpoint path configuration let llm_endpoint_path = config_manager .get_config( &default_bot_id, @@ -1652,6 +1653,12 @@ async fn main() -> std::io::Result<()> { info!("Found {} active bots to monitor", bots_to_monitor.len()); for (bot_id, bot_name) in bots_to_monitor { + // Skip default bot - it's managed locally via ConfigWatcher + if bot_name == "default" { + info!("Skipping DriveMonitor for 'default' bot - managed via ConfigWatcher"); + continue; + } + let bucket_name = format!("{}.gbai", bot_name); let monitor_state = drive_monitor_state.clone(); let bot_id_clone = bot_id; @@ -1682,6 +1689,46 @@ async fn main() -> std::io::Result<()> { }); } + #[cfg(feature = "drive")] + { + // Start local file monitor for ~/data/*.gbai directories + let local_monitor_state = app_state.clone(); + tokio::spawn(async move { + register_thread("local-file-monitor", "drive"); + trace!("Starting LocalFileMonitor for ~/data/*.gbai directories"); + let monitor = crate::drive::local_file_monitor::LocalFileMonitor::new(local_monitor_state); + if let Err(e) = monitor.start_monitoring().await { + error!("LocalFileMonitor failed: {}", e); + } else { + info!("LocalFileMonitor started - watching ~/data/*.gbai/.gbdialog/*.bas"); + } + }); + } + + #[cfg(feature = "drive")] + { + // Start config file watcher for ~/data/*.gbai/*.gbot/config.csv + let config_watcher_state = app_state.clone(); + tokio::spawn(async move { + register_thread("config-file-watcher", "drive"); + trace!("Starting ConfigWatcher for ~/data/*.gbai/*.gbot/config.csv"); + + // Determine data directory + let data_dir = std::env::var("DATA_DIR") + .or_else(|_| std::env::var("HOME").map(|h| format!("{}/data", h))) + .unwrap_or_else(|_| "./botserver-stack/data".to_string()); + let data_dir = std::path::PathBuf::from(data_dir); + + let watcher = crate::core::config::watcher::ConfigWatcher::new( + data_dir, + config_watcher_state, + ); + Arc::new(watcher).spawn(); + + info!("ConfigWatcher started - watching ~/data/*.gbai/*.gbot/config.csv"); + }); + } + #[cfg(feature = "automation")] { let automation_state = app_state.clone();