Add new KB and session association tables

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-10-20 19:49:54 -03:00
parent de5b651b89
commit 248ad08efc
7 changed files with 230 additions and 157 deletions

View file

@@ -1,71 +1,6 @@
-- Migration 6.0.3: KB and Tools tables (SQLite and Postgres compatible)
-- No triggers, no functions, pure table definitions
-- Table for KB documents metadata
-- Table: kb_documents — one metadata row per knowledge-base file.
-- Only TEXT/INTEGER column types are used, keeping the DDL portable
-- across SQLite and Postgres (timestamps are therefore TEXT —
-- presumably ISO-8601; confirm against the writer).
-- The UNIQUE constraint makes (bot, user, collection, path) the logical
-- key, so the same file cannot be registered twice.
CREATE TABLE IF NOT EXISTS kb_documents (
id TEXT PRIMARY KEY,
bot_id TEXT NOT NULL,
user_id TEXT NOT NULL,
collection_name TEXT NOT NULL,
file_path TEXT NOT NULL,
file_size INTEGER NOT NULL DEFAULT 0,
file_hash TEXT NOT NULL, -- content hash; indexed below for change detection
first_published_at TEXT NOT NULL,
last_modified_at TEXT NOT NULL,
indexed_at TEXT, -- nullable: presumably NULL until the doc is indexed — confirm
metadata TEXT DEFAULT '{}', -- JSON blob stored as TEXT; defaults to empty object
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(bot_id, user_id, collection_name, file_path)
);
-- Single-column indexes for the common lookup filters.
CREATE INDEX IF NOT EXISTS idx_kb_documents_bot_id ON kb_documents(bot_id);
CREATE INDEX IF NOT EXISTS idx_kb_documents_user_id ON kb_documents(user_id);
CREATE INDEX IF NOT EXISTS idx_kb_documents_collection ON kb_documents(collection_name);
CREATE INDEX IF NOT EXISTS idx_kb_documents_hash ON kb_documents(file_hash);
CREATE INDEX IF NOT EXISTS idx_kb_documents_indexed_at ON kb_documents(indexed_at);
-- Table for KB collections (per user)
-- Table: kb_collections — one row per named KB collection owned by a
-- (bot, user) pair; UNIQUE(bot_id, user_id, name) enforces that scoping.
-- Maps the logical collection to its on-disk folder and its Qdrant
-- vector collection.
CREATE TABLE IF NOT EXISTS kb_collections (
id TEXT PRIMARY KEY,
bot_id TEXT NOT NULL,
user_id TEXT NOT NULL,
name TEXT NOT NULL,
folder_path TEXT NOT NULL,
qdrant_collection TEXT NOT NULL, -- name of the backing Qdrant collection
document_count INTEGER NOT NULL DEFAULT 0, -- denormalized counter; keep in sync with kb_documents
is_active INTEGER NOT NULL DEFAULT 1, -- boolean as INTEGER (0/1) for SQLite compatibility
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(bot_id, user_id, name)
);
-- Lookup indexes for the common filter columns.
CREATE INDEX IF NOT EXISTS idx_kb_collections_bot_id ON kb_collections(bot_id);
CREATE INDEX IF NOT EXISTS idx_kb_collections_user_id ON kb_collections(user_id);
CREATE INDEX IF NOT EXISTS idx_kb_collections_name ON kb_collections(name);
CREATE INDEX IF NOT EXISTS idx_kb_collections_active ON kb_collections(is_active);
-- Table for compiled BASIC tools
-- Table: basic_tools — compiled BASIC tools, one row per tool per bot
-- (UNIQUE(bot_id, tool_name)). Stores both the source location and the
-- compiled AST location, plus optional serialized tool descriptors.
CREATE TABLE IF NOT EXISTS basic_tools (
id TEXT PRIMARY KEY,
bot_id TEXT NOT NULL,
tool_name TEXT NOT NULL,
file_path TEXT NOT NULL, -- path to the BASIC source
ast_path TEXT NOT NULL, -- path to the compiled AST artifact
file_hash TEXT NOT NULL, -- source hash; indexed below to detect recompilation needs
mcp_json TEXT, -- nullable: presumably an MCP tool descriptor — confirm with compiler
tool_json TEXT, -- nullable serialized tool definition
compiled_at TEXT NOT NULL,
is_active INTEGER NOT NULL DEFAULT 1, -- boolean as INTEGER (0/1) for SQLite compatibility
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(bot_id, tool_name)
);
-- Lookup indexes for the common filter columns.
CREATE INDEX IF NOT EXISTS idx_basic_tools_bot_id ON basic_tools(bot_id);
CREATE INDEX IF NOT EXISTS idx_basic_tools_name ON basic_tools(tool_name);
CREATE INDEX IF NOT EXISTS idx_basic_tools_active ON basic_tools(is_active);
CREATE INDEX IF NOT EXISTS idx_basic_tools_hash ON basic_tools(file_hash);
-- Migration 6.0.3: Additional KB and session tables
-- This migration adds user_kb_associations and session_tool_associations tables
-- Note: kb_documents, kb_collections, and basic_tools are already created in 6.0.2
-- Table for user KB associations (which KBs are active for a user)
CREATE TABLE IF NOT EXISTS user_kb_associations (

View file

@@ -273,21 +273,21 @@ WHERE status = 'running'
ORDER BY component_name;
-- View: Bot with all configurations.
-- Produces one row per (bot, config key): bots LEFT JOINed to tenants
-- and to bot_configuration, so bots with no tenant or no config keys
-- still appear (with NULLs in the joined columns).
-- NOTE(review): CREATE OR REPLACE VIEW is Postgres syntax; SQLite does
-- not support OR REPLACE for views, so this statement is not portable
-- to SQLite — confirm this file is Postgres-only.
CREATE OR REPLACE VIEW v_bot_full_config AS
SELECT
b.bot_id,
b.name as bot_name,
b.status,
t.name as tenant_name,
t.slug as tenant_slug,
bc.config_key,
bc.config_value,
bc.config_type,
bc.is_encrypted -- flag: value is stored encrypted; consumer must decrypt
FROM bots b
LEFT JOIN tenants t ON b.tenant_id = t.id
LEFT JOIN bot_configuration bc ON b.bot_id = bc.bot_id
ORDER BY b.bot_id, bc.config_key;
-- CREATE OR REPLACE VIEW v_bot_full_config AS
-- SELECT
-- b.id,
-- b.name as bot_name,
-- b.status,
-- t.name as tenant_name,
-- t.slug as tenant_slug,
-- bc.config_key,
-- bc.config_value,
-- bc.config_type,
-- bc.is_encrypted
-- FROM bots b
-- LEFT JOIN tenants t ON b.tenant_id = t.id
-- LEFT JOIN bot_configuration bc ON b.id = bc.bot_id
-- ORDER BY b.id, bc.config_key;
-- View: Active models by type
CREATE OR REPLACE VIEW v_active_models AS

View file

@@ -17,7 +17,10 @@ pub struct AutomationService {
impl AutomationService {
pub fn new(state: Arc<AppState>, scripts_dir: &str) -> Self {
trace!("Creating AutomationService with scripts_dir='{}'", scripts_dir);
trace!(
"Creating AutomationService with scripts_dir='{}'",
scripts_dir
);
Self {
state,
scripts_dir: scripts_dir.to_string(),
@@ -61,10 +64,12 @@ impl AutomationService {
async fn load_active_automations(&self) -> Result<Vec<Automation>, diesel::result::Error> {
trace!("Loading active automations from database");
use crate::shared::models::system_automations::dsl::*;
let mut conn = self.state.conn.lock().unwrap();
let result = system_automations
.filter(is_active.eq(true))
.load::<Automation>(&mut *conn);
let result = {
let mut conn = self.state.conn.lock().unwrap();
system_automations
.filter(is_active.eq(true))
.load::<Automation>(&mut *conn)
}; // conn is dropped here
trace!("Database query for active automations completed");
result.map_err(Into::into)
}
@@ -123,18 +128,20 @@ impl AutomationService {
table, column
);
let mut conn_guard = self.state.conn.lock().unwrap();
let conn = &mut *conn_guard;
#[derive(diesel::QueryableByName)]
struct CountResult {
#[diesel(sql_type = diesel::sql_types::BigInt)]
count: i64,
}
let count_result = diesel::sql_query(&query)
.bind::<diesel::sql_types::Timestamp, _>(since.naive_utc())
.get_result::<CountResult>(conn);
let count_result = {
let mut conn_guard = self.state.conn.lock().unwrap();
let conn = &mut *conn_guard;
diesel::sql_query(&query)
.bind::<diesel::sql_types::Timestamp, _>(since.naive_utc())
.get_result::<CountResult>(conn)
}; // conn_guard is dropped here
match count_result {
Ok(result) if result.count > 0 => {
@@ -144,7 +151,6 @@ impl AutomationService {
table,
automation.id
);
drop(conn_guard);
self.execute_action(&automation.param).await;
self.update_last_triggered(automation.id).await;
}
@@ -185,10 +191,7 @@ impl AutomationService {
self.execute_action(&automation.param).await;
self.update_last_triggered(automation.id).await;
} else {
trace!(
"Pattern did not match for automation {}",
automation.id
);
trace!("Pattern did not match for automation {}", automation.id);
}
}
}
@@ -196,14 +199,20 @@ impl AutomationService {
}
async fn update_last_triggered(&self, automation_id: Uuid) {
trace!("Updating last_triggered for automation_id={}", automation_id);
trace!(
"Updating last_triggered for automation_id={}",
automation_id
);
use crate::shared::models::system_automations::dsl::*;
let mut conn = self.state.conn.lock().unwrap();
let now = Utc::now();
if let Err(e) = diesel::update(system_automations.filter(id.eq(automation_id)))
.set(last_triggered.eq(now.naive_utc()))
.execute(&mut *conn)
{
let result = {
let mut conn = self.state.conn.lock().unwrap();
diesel::update(system_automations.filter(id.eq(automation_id)))
.set(last_triggered.eq(now.naive_utc()))
.execute(&mut *conn)
}; // conn is dropped here
if let Err(e) = result {
error!(
"Failed to update last_triggered for automation {}: {}",
automation_id, e
@@ -214,7 +223,11 @@ impl AutomationService {
}
fn should_run_cron(pattern: &str, timestamp: i64) -> bool {
trace!("Evaluating cron pattern='{}' at timestamp={}", pattern, timestamp);
trace!(
"Evaluating cron pattern='{}' at timestamp={}",
pattern,
timestamp
);
let parts: Vec<&str> = pattern.split_whitespace().collect();
if parts.len() != 5 {
trace!("Invalid cron pattern '{}'", pattern);
@@ -335,21 +348,25 @@ impl AutomationService {
bot_id
);
let script_service = ScriptService::new(Arc::clone(&self.state), user_session);
let ast = match script_service.compile(&script_content) {
Ok(ast) => {
trace!("Compilation successful for script '{}'", param);
ast
}
Err(e) => {
error!("Error compiling script '{}': {}", param, e);
self.cleanup_job_flag(&bot_id, param).await;
return;
}
};
let result = {
let script_service = ScriptService::new(Arc::clone(&self.state), user_session);
let ast = match script_service.compile(&script_content) {
Ok(ast) => {
trace!("Compilation successful for script '{}'", param);
ast
}
Err(e) => {
error!("Error compiling script '{}': {}", param, e);
self.cleanup_job_flag(&bot_id, param).await;
return;
}
};
trace!("Running compiled script '{}'", param);
match script_service.run(&ast) {
trace!("Running compiled script '{}'", param);
script_service.run(&ast)
}; // script_service and ast are dropped here
match result {
Ok(_) => {
info!("Script '{}' executed successfully", param);
}

View file

@@ -1,11 +1,11 @@
use crate::config::AppConfig;
use crate::package_manager::{InstallMode, PackageManager};
use anyhow::Result;
use diesel::{Connection, RunQueryDsl};
use diesel::connection::SimpleConnection;
use diesel::Connection;
use dotenvy::dotenv;
use log::{info, trace};
use rand::distr::Alphanumeric;
use rand::rng;
use sha2::{Digest, Sha256};
pub struct BootstrapManager {
@@ -64,7 +64,7 @@ impl BootstrapManager {
pub fn bootstrap(&mut self) -> Result<AppConfig> {
let pm = PackageManager::new(self.install_mode.clone(), self.tenant.clone())?;
let required_components = vec!["tables"];
let required_components = vec!["tables", "drive", "cache", "llm"];
let mut config = AppConfig::from_env();
for component in required_components {
@@ -87,11 +87,8 @@ impl BootstrapManager {
trace!("Installing required component: {}", component);
futures::executor::block_on(pm.install(component))?;
trace!("Starting component after install: {}", component);
pm.start(component)?;
if component == "tables" {
trace!("Starting component after install: {}", component);
trace!("Component {} installed successfully", component);
let database_url = std::env::var("DATABASE_URL").unwrap();
let mut conn = diesel::PgConnection::establish(&database_url)
@@ -102,20 +99,39 @@ impl BootstrapManager {
.files()
.filter_map(|file| {
let path = file.path();
trace!("Found file: {:?}", path);
if path.extension()? == "sql" {
Some(path.to_path_buf())
trace!(" -> SQL file included");
Some(file)
} else {
trace!(" -> Not a SQL file, skipping");
None
}
})
.collect();
migration_files.sort();
trace!("Total migration files found: {}", migration_files.len());
migration_files.sort_by_key(|f| f.path());
for migration_file in migration_files {
let migration = std::fs::read_to_string(&migration_file)?;
trace!("Executing migration: {}", migration_file.display());
diesel::sql_query(&migration).execute(&mut conn)?;
let migration = migration_file
.contents_utf8()
.ok_or_else(|| anyhow::anyhow!("Migration file is not valid UTF-8"))?;
trace!("Executing migration: {}", migration_file.path().display());
// Use batch_execute to handle multiple statements including those with dollar-quoted strings
if let Err(e) = conn.batch_execute(migration) {
log::error!(
"Failed to execute migration {}: {}",
migration_file.path().display(),
e
);
return Err(e.into());
}
trace!(
"Successfully executed migration: {}",
migration_file.path().display()
);
}
config = AppConfig::from_database(&mut conn);
@@ -130,7 +146,7 @@ impl BootstrapManager {
fn generate_secure_password(&self, length: usize) -> String {
// Ensure the Rng trait is in scope for `sample`
use rand::Rng;
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
std::iter::repeat_with(|| rng.sample(Alphanumeric) as char)
.take(length)

View file

@@ -68,14 +68,20 @@ async fn main() -> std::io::Result<()> {
Ok(_) => return Ok(()),
Err(e) => {
eprintln!("CLI error: {}", e);
return Err(std::io::Error::new(std::io::ErrorKind::Other, format!("CLI command failed: {}", e)));
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("CLI command failed: {}", e),
));
}
}
}
_ => {
eprintln!("Unknown command: {}", command);
eprintln!("Run 'botserver --help' for usage information");
return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, format!("Unknown command: {}", command)));
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!("Unknown command: {}", command),
));
}
}
}
@@ -106,7 +112,10 @@ async fn main() -> std::io::Result<()> {
Err(e) => {
log::error!("Bootstrap failed: {}", e);
info!("Attempting to load configuration from database");
match diesel::Connection::establish(&std::env::var("DATABASE_URL").unwrap_or_else(|_| "postgres://gbuser:@localhost:5432/botserver".to_string())) {
match diesel::Connection::establish(
&std::env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://gbuser:@localhost:5432/botserver".to_string()),
) {
Ok(mut conn) => AppConfig::from_database(&mut conn),
Err(_) => {
info!("Database not available, using environment variables as fallback");
@@ -124,16 +133,25 @@ async fn main() -> std::io::Result<()> {
Ok(conn) => Arc::new(Mutex::new(conn)),
Err(e) => {
log::error!("Failed to connect to main database: {}", e);
return Err(std::io::Error::new(std::io::ErrorKind::ConnectionRefused, format!("Database connection failed: {}", e)));
return Err(std::io::Error::new(
std::io::ErrorKind::ConnectionRefused,
format!("Database connection failed: {}", e),
));
}
};
let db_custom_pool = db_pool.clone();
info!("Initializing LLM server at {}", cfg.ai.endpoint);
ensure_llama_servers_running().await.expect("Failed to initialize LLM local server");
ensure_llama_servers_running()
.await
.expect("Failed to initialize LLM local server");
let cache_url = cfg.config_path("cache").join("redis.conf").display().to_string();
let cache_url = cfg
.config_path("cache")
.join("redis.conf")
.display()
.to_string();
let redis_client = match redis::Client::open(cache_url.as_str()) {
Ok(client) => Some(Arc::new(client)),
Err(e) => {
@@ -143,15 +161,28 @@ async fn main() -> std::io::Result<()> {
};
let tool_manager = Arc::new(tools::ToolManager::new());
let llm_provider = Arc::new(crate::llm::OpenAIClient::new("empty".to_string(), Some(cfg.ai.endpoint.clone())));
let llm_provider = Arc::new(crate::llm::OpenAIClient::new(
"empty".to_string(),
Some(cfg.ai.endpoint.clone()),
));
let web_adapter = Arc::new(WebChannelAdapter::new());
let voice_adapter = Arc::new(VoiceAdapter::new("https://livekit.example.com".to_string(), "api_key".to_string(), "api_secret".to_string()));
let whatsapp_adapter = Arc::new(WhatsAppAdapter::new("whatsapp_token".to_string(), "phone_number_id".to_string(), "verify_token".to_string()));
let voice_adapter = Arc::new(VoiceAdapter::new(
"https://livekit.example.com".to_string(),
"api_key".to_string(),
"api_secret".to_string(),
));
let whatsapp_adapter = Arc::new(WhatsAppAdapter::new(
"whatsapp_token".to_string(),
"phone_number_id".to_string(),
"verify_token".to_string(),
));
let tool_api = Arc::new(tools::ToolApi::new());
info!("Initializing MinIO drive at {}", cfg.minio.server);
let drive = init_drive(&config.minio).await.expect("Failed to initialize Drive");
let drive = init_drive(&config.minio)
.await
.expect("Failed to initialize Drive");
let session_manager = Arc::new(tokio::sync::Mutex::new(session::SessionManager::new(
diesel::Connection::establish(&cfg.database_url()).unwrap(),
@@ -174,7 +205,10 @@ async fn main() -> std::io::Result<()> {
auth_service: auth_service.clone(),
channels: Arc::new(Mutex::new({
let mut map = HashMap::new();
map.insert("web".to_string(), web_adapter.clone() as Arc<dyn crate::channels::ChannelAdapter>);
map.insert(
"web".to_string(),
web_adapter.clone() as Arc<dyn crate::channels::ChannelAdapter>,
);
map
})),
response_channels: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
@@ -184,13 +218,32 @@ async fn main() -> std::io::Result<()> {
tool_api: tool_api.clone(),
});
info!("Starting HTTP server on {}:{}", config.server.host, config.server.port);
info!(
"Starting HTTP server on {}:{}",
config.server.host, config.server.port
);
let worker_count = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(4);
let worker_count = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4);
// Spawn AutomationService in a LocalSet on a separate thread
let automation_state = app_state.clone();
let automation = AutomationService::new(automation_state, "templates/announcements.gbai/announcements.gbdialog");
let _automation_handle = automation.spawn();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create runtime for automation");
let local = tokio::task::LocalSet::new();
local.block_on(&rt, async move {
let automation = AutomationService::new(
automation_state,
"templates/announcements.gbai/announcements.gbdialog",
);
automation.spawn().await.ok();
});
});
let drive_state = app_state.clone();
let bucket_name = format!("{}default.gbai", cfg.minio.org_prefix);
@@ -198,7 +251,11 @@ async fn main() -> std::io::Result<()> {
let _drive_handle = drive_monitor.spawn();
HttpServer::new(move || {
let cors = Cors::default().allow_any_origin().allow_any_method().allow_any_header().max_age(3600);
let cors = Cors::default()
.allow_any_origin()
.allow_any_method()
.allow_any_header()
.max_age(3600);
let app_state_clone = app_state.clone();
let mut app = App::new()

View file

@@ -78,8 +78,10 @@ impl PackageManager {
}
self.run_commands(post_cmds, "local", &component.name)?;
trace!("Starting component after installation: {}", component.name);
self.start(&component.name)?;
trace!(
"Component '{}' installation completed successfully",
component.name
);
Ok(())
}
@@ -568,7 +570,7 @@ impl PackageManager {
if target == "local" {
trace!("Executing command: {}", rendered_cmd);
let mut child = Command::new("bash")
let child = Command::new("bash")
.current_dir(&bin_path)
.args(&["-c", &rendered_cmd])
.spawn()

View file

@@ -4,7 +4,6 @@ use crate::package_manager::{InstallMode, OsType};
use anyhow::Result;
use log::trace;
use rand::distr::Alphanumeric;
use rand::rng;
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::path::PathBuf;
@@ -166,8 +165,8 @@ impl PackageManager {
"echo \"host all all all md5\" > {{CONF_PATH}}/pg_hba.conf".to_string(),
"touch {{CONF_PATH}}/pg_ident.conf".to_string(),
"./bin/pg_ctl -D {{DATA_PATH}}/pgdata -l {{LOGS_PATH}}/postgres.log start; while ! ./bin/pg_isready -d {{DATA_PATH}}/pgdata >/dev/null 2>&1; do sleep 15; done;".to_string(),
"./bin/psql -U gbuser -c \"CREATE DATABASE botserver WITH OWNER gbuser\" || true ".to_string()
format!("./bin/pg_ctl -D {{{{DATA_PATH}}}}/pgdata -l {{{{LOGS_PATH}}}}/postgres.log start; for i in 1 2 3 4 5 6 7 8 9 10; do ./bin/pg_isready -h localhost -p 5432 >/dev/null 2>&1 && break; echo 'Waiting for PostgreSQL to start...' >&2; sleep 1; done; ./bin/pg_isready -h localhost -p 5432"),
format!("PGPASSWORD={} ./bin/psql -h localhost -U gbuser -d postgres -c \"CREATE DATABASE botserver WITH OWNER gbuser\" 2>&1 | grep -v 'already exists' || true", db_password)
],
pre_install_cmds_macos: vec![],
post_install_cmds_macos: vec![
@@ -650,6 +649,33 @@ impl PackageManager {
let conf_path = self.base_path.join("conf").join(&component.name);
let logs_path = self.base_path.join("logs").join(&component.name);
// For PostgreSQL, check if it's already running
if component.name == "tables" {
let check_cmd = format!(
"./bin/pg_ctl -D {} status",
data_path.join("pgdata").display()
);
let check_output = std::process::Command::new("sh")
.current_dir(&bin_path)
.arg("-c")
.arg(&check_cmd)
.output();
if let Ok(output) = check_output {
if output.status.success() {
trace!(
"Component {} is already running, skipping start",
component.name
);
// Return a dummy child process handle - PostgreSQL is already running
return Ok(std::process::Command::new("sh")
.arg("-c")
.arg("echo 'Already running'")
.spawn()?);
}
}
}
let rendered_cmd = component
.exec_cmd
.replace("{{BIN_PATH}}", &bin_path.to_string_lossy())
@@ -663,11 +689,31 @@ impl PackageManager {
rendered_cmd
);
Ok(std::process::Command::new("sh")
let child = std::process::Command::new("sh")
.current_dir(&bin_path)
.arg("-c")
.arg(&rendered_cmd)
.spawn()?)
.spawn();
// Handle "already running" errors gracefully
match child {
Ok(c) => Ok(c),
Err(e) => {
let err_msg = e.to_string();
if err_msg.contains("already running") || component.name == "tables" {
trace!(
"Component {} may already be running, continuing anyway",
component.name
);
Ok(std::process::Command::new("sh")
.arg("-c")
.arg("echo 'Already running'")
.spawn()?)
} else {
Err(e.into())
}
}
}
} else {
Err(anyhow::anyhow!("Component {} not found", component))
}