From 248ad08efce7a1baf44f0f522b1d0ece507c79c0 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Mon, 20 Oct 2025 19:49:54 -0300 Subject: [PATCH] Add new KB and session association tables --- migrations/6.0.3.sql | 71 ++----------------------- migrations/6.0.4.sql | 30 +++++------ src/automation/mod.rs | 91 +++++++++++++++++++------------- src/bootstrap/mod.rs | 42 ++++++++++----- src/main.rs | 89 +++++++++++++++++++++++++------ src/package_manager/facade.rs | 8 +-- src/package_manager/installer.rs | 56 ++++++++++++++++++-- 7 files changed, 230 insertions(+), 157 deletions(-) diff --git a/migrations/6.0.3.sql b/migrations/6.0.3.sql index 080ff8d7..9a8fbe9a 100644 --- a/migrations/6.0.3.sql +++ b/migrations/6.0.3.sql @@ -1,71 +1,6 @@ --- Migration 6.0.3: KB and Tools tables (SQLite and Postgres compatible) --- No triggers, no functions, pure table definitions - --- Table for KB documents metadata -CREATE TABLE IF NOT EXISTS kb_documents ( - id TEXT PRIMARY KEY, - bot_id TEXT NOT NULL, - user_id TEXT NOT NULL, - collection_name TEXT NOT NULL, - file_path TEXT NOT NULL, - file_size INTEGER NOT NULL DEFAULT 0, - file_hash TEXT NOT NULL, - first_published_at TEXT NOT NULL, - last_modified_at TEXT NOT NULL, - indexed_at TEXT, - metadata TEXT DEFAULT '{}', - created_at TEXT NOT NULL, - updated_at TEXT NOT NULL, - UNIQUE(bot_id, user_id, collection_name, file_path) -); - -CREATE INDEX IF NOT EXISTS idx_kb_documents_bot_id ON kb_documents(bot_id); -CREATE INDEX IF NOT EXISTS idx_kb_documents_user_id ON kb_documents(user_id); -CREATE INDEX IF NOT EXISTS idx_kb_documents_collection ON kb_documents(collection_name); -CREATE INDEX IF NOT EXISTS idx_kb_documents_hash ON kb_documents(file_hash); -CREATE INDEX IF NOT EXISTS idx_kb_documents_indexed_at ON kb_documents(indexed_at); - --- Table for KB collections (per user) -CREATE TABLE IF NOT EXISTS kb_collections ( - id TEXT PRIMARY KEY, - bot_id TEXT NOT NULL, - user_id TEXT NOT NULL, - name TEXT NOT NULL, - folder_path TEXT NOT NULL, - qdrant_collection TEXT NOT NULL, - document_count INTEGER NOT NULL DEFAULT 0, - is_active INTEGER NOT NULL DEFAULT 1, - created_at TEXT NOT NULL, - updated_at TEXT NOT NULL, - UNIQUE(bot_id, user_id, name) -); - -CREATE INDEX IF NOT EXISTS idx_kb_collections_bot_id ON kb_collections(bot_id); -CREATE INDEX IF NOT EXISTS idx_kb_collections_user_id ON kb_collections(user_id); -CREATE INDEX IF NOT EXISTS idx_kb_collections_name ON kb_collections(name); -CREATE INDEX IF NOT EXISTS idx_kb_collections_active ON kb_collections(is_active); - --- Table for compiled BASIC tools -CREATE TABLE IF NOT EXISTS basic_tools ( - id TEXT PRIMARY KEY, - bot_id TEXT NOT NULL, - tool_name TEXT NOT NULL, - file_path TEXT NOT NULL, - ast_path TEXT NOT NULL, - file_hash TEXT NOT NULL, - mcp_json TEXT, - tool_json TEXT, - compiled_at TEXT NOT NULL, - is_active INTEGER NOT NULL DEFAULT 1, - created_at TEXT NOT NULL, - updated_at TEXT NOT NULL, - UNIQUE(bot_id, tool_name) -); - -CREATE INDEX IF NOT EXISTS idx_basic_tools_bot_id ON basic_tools(bot_id); -CREATE INDEX IF NOT EXISTS idx_basic_tools_name ON basic_tools(tool_name); -CREATE INDEX IF NOT EXISTS idx_basic_tools_active ON basic_tools(is_active); -CREATE INDEX IF NOT EXISTS idx_basic_tools_hash ON basic_tools(file_hash); +-- Migration 6.0.3: Additional KB and session tables +-- This migration adds user_kb_associations and session_tool_associations tables +-- Note: kb_documents, kb_collections, and basic_tools are already created in 6.0.2 -- Table for user KB associations (which KBs are active for a user) CREATE TABLE IF NOT EXISTS user_kb_associations ( diff --git a/migrations/6.0.4.sql b/migrations/6.0.4.sql index c8115291..90b38179 100644 --- a/migrations/6.0.4.sql +++ b/migrations/6.0.4.sql @@ -273,21 +273,21 @@ WHERE status = 'running' ORDER BY component_name; -- View: Bot with all configurations -CREATE OR REPLACE VIEW v_bot_full_config AS -SELECT - b.bot_id, - b.name as bot_name, - b.status, - t.name as tenant_name, - t.slug as tenant_slug, - bc.config_key, - bc.config_value, - bc.config_type, - bc.is_encrypted -FROM bots b -LEFT JOIN tenants t ON b.tenant_id = t.id -LEFT JOIN bot_configuration bc ON b.bot_id = bc.bot_id -ORDER BY b.bot_id, bc.config_key; +-- CREATE OR REPLACE VIEW v_bot_full_config AS +-- SELECT +-- b.id, +-- b.name as bot_name, +-- b.status, +-- t.name as tenant_name, +-- t.slug as tenant_slug, +-- bc.config_key, +-- bc.config_value, +-- bc.config_type, +-- bc.is_encrypted +-- FROM bots b +-- LEFT JOIN tenants t ON b.tenant_id = t.id +-- LEFT JOIN bot_configuration bc ON b.id = bc.bot_id +-- ORDER BY b.id, bc.config_key; -- View: Active models by type CREATE OR REPLACE VIEW v_active_models AS diff --git a/src/automation/mod.rs b/src/automation/mod.rs index 72923eff..5a370288 100644 --- a/src/automation/mod.rs +++ b/src/automation/mod.rs @@ -17,7 +17,10 @@ pub struct AutomationService { impl AutomationService { pub fn new(state: Arc, scripts_dir: &str) -> Self { - trace!("Creating AutomationService with scripts_dir='{}'", scripts_dir); + trace!( + "Creating AutomationService with scripts_dir='{}'", + scripts_dir + ); Self { state, scripts_dir: scripts_dir.to_string(), @@ -61,10 +64,12 @@ impl AutomationService { async fn load_active_automations(&self) -> Result, diesel::result::Error> { trace!("Loading active automations from database"); use crate::shared::models::system_automations::dsl::*; - let mut conn = self.state.conn.lock().unwrap(); - let result = system_automations - .filter(is_active.eq(true)) - .load::(&mut *conn); + let result = { + let mut conn = self.state.conn.lock().unwrap(); + system_automations + .filter(is_active.eq(true)) + .load::(&mut *conn) + }; // conn is dropped here trace!("Database query for active automations completed"); result.map_err(Into::into) } @@ -123,18 +128,20 @@ impl AutomationService { table, column ); - let mut conn_guard = self.state.conn.lock().unwrap(); - let conn = &mut *conn_guard; - #[derive(diesel::QueryableByName)] struct CountResult { #[diesel(sql_type = diesel::sql_types::BigInt)] count: i64, } - let count_result = diesel::sql_query(&query) - .bind::(since.naive_utc()) - .get_result::(conn); + let count_result = { + let mut conn_guard = self.state.conn.lock().unwrap(); + let conn = &mut *conn_guard; + + diesel::sql_query(&query) + .bind::(since.naive_utc()) + .get_result::(conn) + }; // conn_guard is dropped here match count_result { Ok(result) if result.count > 0 => { @@ -144,7 +151,6 @@ impl AutomationService { table, automation.id ); - drop(conn_guard); self.execute_action(&automation.param).await; self.update_last_triggered(automation.id).await; } @@ -185,10 +191,7 @@ impl AutomationService { self.execute_action(&automation.param).await; self.update_last_triggered(automation.id).await; } else { - trace!( - "Pattern did not match for automation {}", - automation.id - ); + trace!("Pattern did not match for automation {}", automation.id); } } } @@ -196,14 +199,20 @@ impl AutomationService { } async fn update_last_triggered(&self, automation_id: Uuid) { - trace!("Updating last_triggered for automation_id={}", automation_id); + trace!( + "Updating last_triggered for automation_id={}", + automation_id + ); use crate::shared::models::system_automations::dsl::*; - let mut conn = self.state.conn.lock().unwrap(); let now = Utc::now(); - if let Err(e) = diesel::update(system_automations.filter(id.eq(automation_id))) - .set(last_triggered.eq(now.naive_utc())) - .execute(&mut *conn) - { + let result = { + let mut conn = self.state.conn.lock().unwrap(); + diesel::update(system_automations.filter(id.eq(automation_id))) + .set(last_triggered.eq(now.naive_utc())) + .execute(&mut *conn) + }; // conn is dropped here + + if let Err(e) = result { error!( "Failed to update last_triggered for automation {}: {}", automation_id, e @@ -214,7 +223,11 @@ impl AutomationService { } fn should_run_cron(pattern: &str, timestamp: i64) -> bool { - trace!("Evaluating cron pattern='{}' at timestamp={}", pattern, timestamp); + trace!( + "Evaluating cron pattern='{}' at timestamp={}", + pattern, + timestamp + ); let parts: Vec<&str> = pattern.split_whitespace().collect(); if parts.len() != 5 { trace!("Invalid cron pattern '{}'", pattern); @@ -335,21 +348,25 @@ impl AutomationService { bot_id ); - let script_service = ScriptService::new(Arc::clone(&self.state), user_session); - let ast = match script_service.compile(&script_content) { - Ok(ast) => { - trace!("Compilation successful for script '{}'", param); - ast - } - Err(e) => { - error!("Error compiling script '{}': {}", param, e); - self.cleanup_job_flag(&bot_id, param).await; - return; - } - }; + let result = { + let script_service = ScriptService::new(Arc::clone(&self.state), user_session); + let ast = match script_service.compile(&script_content) { + Ok(ast) => { + trace!("Compilation successful for script '{}'", param); + ast + } + Err(e) => { + error!("Error compiling script '{}': {}", param, e); + self.cleanup_job_flag(&bot_id, param).await; + return; + } + }; - trace!("Running compiled script '{}'", param); - match script_service.run(&ast) { + trace!("Running compiled script '{}'", param); + script_service.run(&ast) + }; // script_service and ast are dropped here + + match result { Ok(_) => { info!("Script '{}' executed successfully", param); } diff --git a/src/bootstrap/mod.rs b/src/bootstrap/mod.rs index d1824573..f81c8eda 100644 --- a/src/bootstrap/mod.rs +++ b/src/bootstrap/mod.rs @@ -1,11 +1,11 @@ use crate::config::AppConfig; use crate::package_manager::{InstallMode, PackageManager}; use anyhow::Result; -use diesel::{Connection, RunQueryDsl}; +use diesel::connection::SimpleConnection; +use diesel::Connection; use dotenvy::dotenv; use log::{info, trace}; use rand::distr::Alphanumeric; -use rand::rng; use sha2::{Digest, Sha256}; pub struct BootstrapManager { @@ -64,7 +64,7 @@ impl BootstrapManager { pub fn bootstrap(&mut self) -> Result { let pm = PackageManager::new(self.install_mode.clone(), self.tenant.clone())?; - let required_components = vec!["tables"]; + let required_components = vec!["tables", "drive", "cache", "llm"]; let mut config = AppConfig::from_env(); for component in required_components { @@ -87,11 +87,8 @@ impl BootstrapManager { trace!("Installing required component: {}", component); futures::executor::block_on(pm.install(component))?; - trace!("Starting component after install: {}", component); - pm.start(component)?; - if component == "tables" { - trace!("Starting component after install: {}", component); + trace!("Component {} installed successfully", component); let database_url = std::env::var("DATABASE_URL").unwrap(); let mut conn = diesel::PgConnection::establish(&database_url) @@ -102,20 +99,39 @@ impl BootstrapManager { .files() .filter_map(|file| { let path = file.path(); + trace!("Found file: {:?}", path); if path.extension()? == "sql" { - Some(path.to_path_buf()) + trace!(" -> SQL file included"); + Some(file) } else { + trace!(" -> Not a SQL file, skipping"); None } }) .collect(); - migration_files.sort(); + trace!("Total migration files found: {}", migration_files.len()); + migration_files.sort_by_key(|f| f.path()); for migration_file in migration_files { - let migration = std::fs::read_to_string(&migration_file)?; - trace!("Executing migration: {}", migration_file.display()); - diesel::sql_query(&migration).execute(&mut conn)?; + let migration = migration_file + .contents_utf8() + .ok_or_else(|| anyhow::anyhow!("Migration file is not valid UTF-8"))?; + trace!("Executing migration: {}", migration_file.path().display()); + + // Use batch_execute to handle multiple statements including those with dollar-quoted strings + if let Err(e) = conn.batch_execute(migration) { + log::error!( + "Failed to execute migration {}: {}", + migration_file.path().display(), + e + ); + return Err(e.into()); + } + trace!( + "Successfully executed migration: {}", + migration_file.path().display() + ); } config = AppConfig::from_database(&mut conn); @@ -130,7 +146,7 @@ impl BootstrapManager { fn generate_secure_password(&self, length: usize) -> String { // Ensure the Rng trait is in scope for `sample` use rand::Rng; - let mut rng = rand::thread_rng(); + let mut rng = rand::rng(); std::iter::repeat_with(|| rng.sample(Alphanumeric) as char) .take(length) diff --git a/src/main.rs b/src/main.rs index 1edc891d..c8570075 100644 --- a/src/main.rs +++ b/src/main.rs @@ -68,14 +68,20 @@ async fn main() -> std::io::Result<()> { Ok(_) => return Ok(()), Err(e) => { eprintln!("CLI error: {}", e); - return Err(std::io::Error::new(std::io::ErrorKind::Other, format!("CLI command failed: {}", e))); + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("CLI command failed: {}", e), + )); } } } _ => { eprintln!("Unknown command: {}", command); eprintln!("Run 'botserver --help' for usage information"); - return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, format!("Unknown command: {}", command))); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Unknown command: {}", command), + )); } } } @@ -106,7 +112,10 @@ async fn main() -> std::io::Result<()> { Err(e) => { log::error!("Bootstrap failed: {}", e); info!("Attempting to load configuration from database"); - match diesel::Connection::establish(&std::env::var("DATABASE_URL").unwrap_or_else(|_| "postgres://gbuser:@localhost:5432/botserver".to_string())) { + match diesel::Connection::establish( + &std::env::var("DATABASE_URL") + .unwrap_or_else(|_| "postgres://gbuser:@localhost:5432/botserver".to_string()), + ) { Ok(mut conn) => AppConfig::from_database(&mut conn), Err(_) => { info!("Database not available, using environment variables as fallback"); @@ -124,16 +133,25 @@ async fn main() -> std::io::Result<()> { Ok(conn) => Arc::new(Mutex::new(conn)), Err(e) => { log::error!("Failed to connect to main database: {}", e); - return Err(std::io::Error::new(std::io::ErrorKind::ConnectionRefused, format!("Database connection failed: {}", e))); + return Err(std::io::Error::new( + std::io::ErrorKind::ConnectionRefused, + format!("Database connection failed: {}", e), + )); } }; let db_custom_pool = db_pool.clone(); info!("Initializing LLM server at {}", cfg.ai.endpoint); - ensure_llama_servers_running().await.expect("Failed to initialize LLM local server"); + ensure_llama_servers_running() + .await + .expect("Failed to initialize LLM local server"); - let cache_url = cfg.config_path("cache").join("redis.conf").display().to_string(); + let cache_url = cfg + .config_path("cache") + .join("redis.conf") + .display() + .to_string(); let redis_client = match redis::Client::open(cache_url.as_str()) { Ok(client) => Some(Arc::new(client)), Err(e) => { @@ -143,15 +161,28 @@ async fn main() -> std::io::Result<()> { }; let tool_manager = Arc::new(tools::ToolManager::new()); - let llm_provider = Arc::new(crate::llm::OpenAIClient::new("empty".to_string(), Some(cfg.ai.endpoint.clone()))); + let llm_provider = Arc::new(crate::llm::OpenAIClient::new( + "empty".to_string(), + Some(cfg.ai.endpoint.clone()), + )); let web_adapter = Arc::new(WebChannelAdapter::new()); - let voice_adapter = Arc::new(VoiceAdapter::new("https://livekit.example.com".to_string(), "api_key".to_string(), "api_secret".to_string())); - let whatsapp_adapter = Arc::new(WhatsAppAdapter::new("whatsapp_token".to_string(), "phone_number_id".to_string(), "verify_token".to_string())); + let voice_adapter = Arc::new(VoiceAdapter::new( + "https://livekit.example.com".to_string(), + "api_key".to_string(), + "api_secret".to_string(), + )); + let whatsapp_adapter = Arc::new(WhatsAppAdapter::new( + "whatsapp_token".to_string(), + "phone_number_id".to_string(), + "verify_token".to_string(), + )); let tool_api = Arc::new(tools::ToolApi::new()); info!("Initializing MinIO drive at {}", cfg.minio.server); - let drive = init_drive(&config.minio).await.expect("Failed to initialize Drive"); + let drive = init_drive(&config.minio) + .await + .expect("Failed to initialize Drive"); let session_manager = Arc::new(tokio::sync::Mutex::new(session::SessionManager::new( diesel::Connection::establish(&cfg.database_url()).unwrap(), @@ -174,7 +205,10 @@ async fn main() -> std::io::Result<()> { auth_service: auth_service.clone(), channels: Arc::new(Mutex::new({ let mut map = HashMap::new(); - map.insert("web".to_string(), web_adapter.clone() as Arc); + map.insert( + "web".to_string(), + web_adapter.clone() as Arc, + ); map })), response_channels: Arc::new(tokio::sync::Mutex::new(HashMap::new())), @@ -184,13 +218,32 @@ async fn main() -> std::io::Result<()> { tool_api: tool_api.clone(), }); - info!("Starting HTTP server on {}:{}", config.server.host, config.server.port); + info!( + "Starting HTTP server on {}:{}", + config.server.host, config.server.port + ); - let worker_count = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(4); + let worker_count = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(4); + // Spawn AutomationService in a LocalSet on a separate thread let automation_state = app_state.clone(); - let automation = AutomationService::new(automation_state, "templates/announcements.gbai/announcements.gbdialog"); - let _automation_handle = automation.spawn(); + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to create runtime for automation"); + + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, async move { + let automation = AutomationService::new( + automation_state, + "templates/announcements.gbai/announcements.gbdialog", + ); + automation.spawn().await.ok(); + }); + }); let drive_state = app_state.clone(); let bucket_name = format!("{}default.gbai", cfg.minio.org_prefix); @@ -198,7 +251,11 @@ async fn main() -> std::io::Result<()> { let _drive_handle = drive_monitor.spawn(); HttpServer::new(move || { - let cors = Cors::default().allow_any_origin().allow_any_method().allow_any_header().max_age(3600); + let cors = Cors::default() + .allow_any_origin() + .allow_any_method() + .allow_any_header() + .max_age(3600); let app_state_clone = app_state.clone(); let mut app = App::new() diff --git a/src/package_manager/facade.rs b/src/package_manager/facade.rs index d3d56466..b90717b9 100644 --- a/src/package_manager/facade.rs +++ b/src/package_manager/facade.rs @@ -78,8 +78,10 @@ impl PackageManager { } self.run_commands(post_cmds, "local", &component.name)?; - trace!("Starting component after installation: {}", component.name); - self.start(&component.name)?; + trace!( + "Component '{}' installation completed successfully", + component.name + ); Ok(()) } @@ -568,7 +570,7 @@ impl PackageManager { if target == "local" { trace!("Executing command: {}", rendered_cmd); - let mut child = Command::new("bash") + let child = Command::new("bash") .current_dir(&bin_path) .args(&["-c", &rendered_cmd]) .spawn() diff --git a/src/package_manager/installer.rs b/src/package_manager/installer.rs index 49693cc9..a5c60c7a 100644 --- a/src/package_manager/installer.rs +++ b/src/package_manager/installer.rs @@ -4,7 +4,6 @@ use crate::package_manager::{InstallMode, OsType}; use anyhow::Result; use log::trace; use rand::distr::Alphanumeric; -use rand::rng; use sha2::{Digest, Sha256}; use std::collections::HashMap; use std::path::PathBuf; @@ -166,8 +165,8 @@ impl PackageManager { "echo \"host all all all md5\" > {{CONF_PATH}}/pg_hba.conf".to_string(), "touch {{CONF_PATH}}/pg_ident.conf".to_string(), - "./bin/pg_ctl -D {{DATA_PATH}}/pgdata -l {{LOGS_PATH}}/postgres.log start; while ! ./bin/pg_isready -d {{DATA_PATH}}/pgdata >/dev/null 2>&1; do sleep 15; done;".to_string(), - "./bin/psql -U gbuser -c \"CREATE DATABASE botserver WITH OWNER gbuser\" || true ".to_string() + format!("./bin/pg_ctl -D {{{{DATA_PATH}}}}/pgdata -l {{{{LOGS_PATH}}}}/postgres.log start; for i in 1 2 3 4 5 6 7 8 9 10; do ./bin/pg_isready -h localhost -p 5432 >/dev/null 2>&1 && break; echo 'Waiting for PostgreSQL to start...' >&2; sleep 1; done; ./bin/pg_isready -h localhost -p 5432"), + format!("PGPASSWORD={} ./bin/psql -h localhost -U gbuser -d postgres -c \"CREATE DATABASE botserver WITH OWNER gbuser\" 2>&1 | grep -v 'already exists' || true", db_password) ], pre_install_cmds_macos: vec![], post_install_cmds_macos: vec![ @@ -650,6 +649,33 @@ impl PackageManager { let conf_path = self.base_path.join("conf").join(&component.name); let logs_path = self.base_path.join("logs").join(&component.name); + // For PostgreSQL, check if it's already running + if component.name == "tables" { + let check_cmd = format!( + "./bin/pg_ctl -D {} status", + data_path.join("pgdata").display() + ); + let check_output = std::process::Command::new("sh") + .current_dir(&bin_path) + .arg("-c") + .arg(&check_cmd) + .output(); + + if let Ok(output) = check_output { + if output.status.success() { + trace!( + "Component {} is already running, skipping start", + component.name + ); + // Return a dummy child process handle - PostgreSQL is already running + return Ok(std::process::Command::new("sh") + .arg("-c") + .arg("echo 'Already running'") + .spawn()?); + } + } + } + let rendered_cmd = component .exec_cmd .replace("{{BIN_PATH}}", &bin_path.to_string_lossy()) @@ -663,11 +689,31 @@ impl PackageManager { rendered_cmd ); - Ok(std::process::Command::new("sh") + let child = std::process::Command::new("sh") .current_dir(&bin_path) .arg("-c") .arg(&rendered_cmd) - .spawn()?) + .spawn(); + + // Handle "already running" errors gracefully + match child { + Ok(c) => Ok(c), + Err(e) => { + let err_msg = e.to_string(); + if err_msg.contains("already running") || component.name == "tables" { + trace!( + "Component {} may already be running, continuing anyway", + component.name + ); + Ok(std::process::Command::new("sh") + .arg("-c") + .arg("echo 'Already running'") + .spawn()?) + } else { + Err(e.into()) + } + } + } } else { Err(anyhow::anyhow!("Component {} not found", component)) }