From e517dfec480eb46424b6a7d237ad23b20b58b0b1 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Tue, 28 Oct 2025 14:00:52 -0300 Subject: [PATCH] - Migration to Apache OpenDAL. --- src/automation/mod.rs | 56 ++--- src/basic/keywords/get.rs | 24 +- src/bootstrap/mod.rs | 450 ++++++++++++++------------------------ src/drive_monitor/mod.rs | 305 ++++++-------------------- src/file/mod.rs | 118 +++------- src/kb/minio_handler.rs | 104 ++------- src/kb/mod.rs | 131 +++-------- src/main.rs | 35 +-- src/shared/state.rs | 10 +- 9 files changed, 355 insertions(+), 878 deletions(-) diff --git a/src/automation/mod.rs b/src/automation/mod.rs index c394b81ad..13035db48 100644 --- a/src/automation/mod.rs +++ b/src/automation/mod.rs @@ -331,8 +331,7 @@ impl AutomationService { e ); - // Try to download from MinIO - if let Some(s3_client) = &self.state.s3_client { + if let Some(s3_operator) = &self.state.s3_operator { let bucket_name = format!( "{}{}.gbai", env::var("MINIO_ORG_PREFIX").unwrap_or_else(|_| "org1_".to_string()), @@ -342,47 +341,26 @@ impl AutomationService { trace!("Downloading from bucket={} key={}", bucket_name, s3_key); - match s3_client. - .get_object() - .bucket(&bucket_name) - .key(&s3_key) - .send() - .await - { - Ok(response) => { - match response.body.collect().await { - Ok(data) => { - match String::from_utf8(data.into_bytes().to_vec()) { - Ok(content) => { - info!("Downloaded script '{}' from MinIO", param); + match s3_operator.read(&format!("{}/{}", bucket_name, s3_key)).await { + Ok(data) => { + let bytes: Vec = data.to_vec(); + match String::from_utf8(bytes) { + Ok(content) => { + info!("Downloaded script '{}' from MinIO", param); - // Save to local cache - if let Err(e) = - std::fs::create_dir_all(&self.scripts_dir) - { - warn!("Failed to create scripts directory: {}", e); - } else if let Err(e) = - tokio::fs::write(&full_path, &content).await - { - warn!("Failed to cache script locally: {}", e); - } else { - trace!("Cached script to {}", full_path.display()); - } - - content - } - Err(e) => { - error!("Failed to decode script {}: {}", param, e); - self.cleanup_job_flag(&bot_id, param).await; - return; - } + // Save to local cache + if let Err(e) = std::fs::create_dir_all(&self.scripts_dir) { + warn!("Failed to create scripts directory: {}", e); + } else if let Err(e) = tokio::fs::write(&full_path, &content).await { + warn!("Failed to cache script locally: {}", e); + } else { + trace!("Cached script to {}", full_path.display()); } + + content } Err(e) => { - error!( - "Failed to read script body from MinIO {}: {}", - param, e - ); + error!("Failed to decode script {}: {}", param, e); self.cleanup_job_flag(&bot_id, param).await; return; } diff --git a/src/basic/keywords/get.rs b/src/basic/keywords/get.rs index a83d42199..5b81b641c 100644 --- a/src/basic/keywords/get.rs +++ b/src/basic/keywords/get.rs @@ -160,11 +160,11 @@ pub async fn get_from_bucket( return Err("Invalid file path".into()); } - let s3_client = match &state.s3_client { - Some(client) => client, + let s3_operator = match &state.s3_operator { + Some(operator) => operator, None => { - error!("S3 client not configured"); - return Err("S3 client not configured".into()); + error!("S3 operator not configured"); + return Err("S3 operator not configured".into()); } }; @@ -189,19 +189,11 @@ pub async fn get_from_bucket( bucket }; - match s3_client.head_bucket().bucket(&bucket_name).send().await { - Ok(_) => debug!("Bucket exists: {}", bucket_name), - Err(e) => { - error!("Bucket inaccessible: {} - {}", bucket_name, e); - return Err(format!("Bucket inaccessible: {}", e).into()); - } - } - let get_object_future = s3_client - .get_object() - .bucket(&bucket_name) - .key(file_path) - .send(); +let get_object_future = s3_operator + .read(&bucket_name) + .key(file_path) + .send(); let response = match tokio::time::timeout(Duration::from_secs(30), get_object_future).await { Ok(Ok(response)) => response, diff --git a/src/bootstrap/mod.rs b/src/bootstrap/mod.rs index dcc7f3c2c..74699064c 100644 --- a/src/bootstrap/mod.rs +++ b/src/bootstrap/mod.rs @@ -1,18 +1,19 @@ use crate::config::AppConfig; -use crate::package_manager::{ InstallMode, PackageManager }; +use crate::package_manager::{InstallMode, PackageManager}; use anyhow::Result; +use csv; use diesel::connection::SimpleConnection; use diesel::Connection; -use dotenvy::dotenv; -use log::{ info, trace, error }; -use aws_sdk_s3::Client as S3Client; -use csv; use diesel::RunQueryDsl; +use dotenvy::dotenv; +use log::{error, info, trace}; +use opendal::services::S3; +use opendal::{Operator, OperatorBuilder}; use rand::distr::Alphanumeric; -use sha2::{ Digest, Sha256 }; +use sha2::{Digest, Sha256}; +use std::io::{self, Write}; use std::path::Path; use std::process::Command; -use std::io::{ self, Write }; pub struct ComponentInfo { pub name: &'static str, @@ -26,10 +27,9 @@ pub struct BootstrapManager { impl BootstrapManager { pub fn new(install_mode: InstallMode, tenant: Option) -> Self { - trace!( + info!( "Initializing BootstrapManager with mode {:?} and tenant {:?}", - install_mode, - tenant + install_mode, tenant ); Self { install_mode, @@ -40,87 +40,135 @@ impl BootstrapManager { pub fn start_all(&mut self) -> Result<()> { let pm = PackageManager::new(self.install_mode.clone(), self.tenant.clone())?; let components = vec![ - ComponentInfo { name: "tables", termination_command: "pg_ctl" }, - ComponentInfo { name: "cache", termination_command: "valkey-server" }, - ComponentInfo { name: "drive", termination_command: "minio" }, - ComponentInfo { name: "llm", termination_command: "llama-server" }, - ComponentInfo { name: "email", termination_command: "stalwart" }, - ComponentInfo { name: "proxy", termination_command: "caddy" }, - ComponentInfo { name: "directory", termination_command: "zitadel" }, - ComponentInfo { name: "alm", termination_command: "forgejo" }, - ComponentInfo { name: "alm_ci", termination_command: "forgejo-runner" }, - ComponentInfo { name: "dns", termination_command: "coredns" }, - ComponentInfo { name: "webmail", termination_command: "php" }, - ComponentInfo { name: "meeting", termination_command: "livekit-server" }, - ComponentInfo { name: "table_editor", termination_command: "nocodb" }, - ComponentInfo { name: "doc_editor", termination_command: "coolwsd" }, - ComponentInfo { name: "desktop", termination_command: "xrdp" }, - ComponentInfo { name: "devtools", termination_command: "" }, - ComponentInfo { name: "bot", termination_command: "" }, - ComponentInfo { name: "system", termination_command: "" }, - ComponentInfo { name: "vector_db", termination_command: "qdrant" }, - ComponentInfo { name: "host", termination_command: "" } + ComponentInfo { + name: "tables", + termination_command: "pg_ctl", + }, + ComponentInfo { + name: "cache", + termination_command: "valkey-server", + }, + ComponentInfo { + name: "drive", + termination_command: "minio", + }, + ComponentInfo { + name: "llm", + termination_command: "llama-server", + }, + ComponentInfo { + name: "email", + termination_command: "stalwart", + }, + ComponentInfo { + name: "proxy", + termination_command: "caddy", + }, + ComponentInfo { + name: "directory", + termination_command: "zitadel", + }, + ComponentInfo { + name: "alm", + termination_command: "forgejo", + }, + ComponentInfo { + name: "alm_ci", + termination_command: "forgejo-runner", + }, + ComponentInfo { + name: "dns", + termination_command: "coredns", + }, + ComponentInfo { + name: "webmail", + termination_command: "php", + }, + ComponentInfo { + name: "meeting", + termination_command: "livekit-server", + }, + ComponentInfo { + name: "table_editor", + termination_command: "nocodb", + }, + ComponentInfo { + name: "doc_editor", + termination_command: "coolwsd", + }, + ComponentInfo { + name: "desktop", + termination_command: "xrdp", + }, + ComponentInfo { + name: "devtools", + termination_command: "", + }, + ComponentInfo { + name: "bot", + termination_command: "", + }, + ComponentInfo { + name: "system", + termination_command: "", + }, + ComponentInfo { + name: "vector_db", + termination_command: "qdrant", + }, + ComponentInfo { + name: "host", + termination_command: "", + }, ]; for component in components { if pm.is_installed(component.name) { - trace!("Starting component: {}", component.name); pm.start(component.name)?; } else { - trace!("Component {} not installed, skipping start", component.name); if let Err(e) = self.update_bot_config(component.name) { error!( "Failed to update bot config after installing {}: {}", - component.name, - e + component.name, e ); } } } + Ok(()) } pub fn bootstrap(&mut self) -> Result { - // Check for legacy mode - if TABLES_SERVER is present, skip bootstrap if let Ok(tables_server) = std::env::var("TABLES_SERVER") { if !tables_server.is_empty() { - trace!( + info!( "Legacy mode detected (TABLES_SERVER present), skipping bootstrap installation" ); - info!("Running in legacy mode with existing database configuration"); - - // Try to connect to the database and load config let database_url = std::env::var("DATABASE_URL").unwrap_or_else(|_| { - let username = std::env - ::var("TABLES_USERNAME") - .unwrap_or_else(|_| "postgres".to_string()); - let password = std::env - ::var("TABLES_PASSWORD") - .unwrap_or_else(|_| "postgres".to_string()); - let server = std::env - ::var("TABLES_SERVER") - .unwrap_or_else(|_| "localhost".to_string()); + let username = + std::env::var("TABLES_USERNAME").unwrap_or_else(|_| "postgres".to_string()); + let password = + std::env::var("TABLES_PASSWORD").unwrap_or_else(|_| "postgres".to_string()); + let server = + std::env::var("TABLES_SERVER").unwrap_or_else(|_| "localhost".to_string()); let port = std::env::var("TABLES_PORT").unwrap_or_else(|_| "5432".to_string()); - let database = std::env - ::var("TABLES_DATABASE") - .unwrap_or_else(|_| "gbserver".to_string()); - format!("postgres://{}:{}@{}:{}/{}", username, password, server, port, database) + let database = + std::env::var("TABLES_DATABASE").unwrap_or_else(|_| "gbserver".to_string()); + format!( + "postgres://{}:{}@{}:{}/{}", + username, password, server, port, database + ) }); match diesel::PgConnection::establish(&database_url) { Ok(mut conn) => { - info!("Successfully connected to legacy database, loading configuration"); - - // Apply migrations if let Err(e) = self.apply_migrations(&mut conn) { log::warn!("Failed to apply migrations: {}", e); } - return Ok(AppConfig::from_database(&mut conn)); } Err(e) => { log::warn!("Failed to connect to legacy database: {}", e); - info!("Using environment variables as fallback"); return Ok(AppConfig::from_env()); } } @@ -133,16 +181,17 @@ impl BootstrapManager { for component in required_components { if !pm.is_installed(component) { - // Determine termination command from package manager component config - let termination_cmd = pm.components + let termination_cmd = pm + .components .get(component) .and_then(|cfg| cfg.binary_name.clone()) .unwrap_or_else(|| component.to_string()); - // If a termination command is defined, check for leftover running process if !termination_cmd.is_empty() { - let check = Command::new("pgrep").arg("-f").arg(&termination_cmd).output(); - + let check = Command::new("pgrep") + .arg("-f") + .arg(&termination_cmd) + .output(); if let Ok(output) = check { if !output.stdout.is_empty() { println!("Component '{}' appears to be already running from a previous install.", component); @@ -157,7 +206,10 @@ impl BootstrapManager { .status(); println!("Terminated existing '{}' process.", component); } else { - println!("Skipping start of '{}' as it is already running.", component); + println!( + "Skipping start of '{}' as it is already running.", + component + ); continue; } } @@ -167,29 +219,20 @@ impl BootstrapManager { if component == "tables" { let db_password = self.generate_secure_password(16); let farm_password = self.generate_secure_password(32); - let env_contents = format!( "FARM_PASSWORD={}\nDATABASE_URL=postgres://gbuser:{}@localhost:5432/botserver", - farm_password, - db_password + farm_password, db_password ); - - std::fs - ::write(".env", &env_contents) + std::fs::write(".env", &env_contents) .map_err(|e| anyhow::anyhow!("Failed to write .env file: {}", e))?; dotenv().ok(); - trace!("Generated database credentials and wrote to .env file"); } - trace!("Installing required component: {}", component); futures::executor::block_on(pm.install(component))?; if component == "tables" { - trace!("Component {} installed successfully", component); - let database_url = std::env::var("DATABASE_URL").unwrap(); - let mut conn = diesel::PgConnection - ::establish(&database_url) + let mut conn = diesel::PgConnection::establish(&database_url) .map_err(|e| anyhow::anyhow!("Failed to connect to database: {}", e))?; let migration_dir = include_dir::include_dir!("./migrations"); @@ -197,27 +240,21 @@ impl BootstrapManager { .files() .filter_map(|file| { let path = file.path(); - trace!("Found file: {:?}", path); if path.extension()? == "sql" { - trace!(" -> SQL file included"); Some(file) } else { - trace!(" -> Not a SQL file, skipping"); None } }) .collect(); - trace!("Total migration files found: {}", migration_files.len()); migration_files.sort_by_key(|f| f.path()); for migration_file in migration_files { let migration = migration_file .contents_utf8() .ok_or_else(|| anyhow::anyhow!("Migration file is not valid UTF-8"))?; - trace!("Executing migration: {}", migration_file.path().display()); - // Use batch_execute to handle multiple statements including those with dollar-quoted strings if let Err(e) = conn.batch_execute(migration) { log::error!( "Failed to execute migration {}: {}", @@ -226,14 +263,13 @@ impl BootstrapManager { ); return Err(e.into()); } - trace!( + info!( "Successfully executed migration: {}", migration_file.path().display() ); } config = AppConfig::from_database(&mut conn); - info!("Database migrations completed and configuration loaded"); } } } @@ -242,12 +278,9 @@ impl BootstrapManager { } fn generate_secure_password(&self, length: usize) -> String { - // Ensure the Rng trait is in scope for `sample` use rand::Rng; let mut rng = rand::rng(); - - std::iter - ::repeat_with(|| rng.sample(Alphanumeric) as char) + std::iter::repeat_with(|| rng.sample(Alphanumeric) as char) .take(length) .collect() } @@ -259,175 +292,58 @@ impl BootstrapManager { format!("{:x}", hasher.finalize()) } - /// Update the bot configuration after a component is installed. - /// This reads the existing `config.csv` from the default bot bucket, - ///fix s values based on the installed component, and - /// writes the updated CSV back to the bucket. It also upserts the - /// key/value pairs into the `bot_config` table. fn update_bot_config(&self, component: &str) -> Result<()> { - // Determine bucket name: DRIVE_ORG_PREFIX + "default.gbai" - let org_prefix = std::env - ::var("DRIVE_ORG_PREFIX") - .unwrap_or_else(|_| "pragmatismo-".to_string()); - let bucket_name = format!("{}default.gbai", org_prefix); - let config_key = "default.gbot/config.csv"; - - // Build S3 client using default SDK config (compatible with S3Client) - let s3_client = S3Client::from_conf(aws_sdk_s3::Config::builder().build()); - - // Attempt to download existing config.csv - let existing_csv = match - futures::executor::block_on( - s3_client.get_object().bucket(&bucket_name).key(config_key).send() - ) - { - Ok(resp) => { - let data = futures::executor::block_on(resp.body.collect())?; - String::from_utf8(data.into_bytes().to_vec()).unwrap_or_default() - } - Err(_) => String::new(), // No existing file – start fresh - }; - - // Parse CSV into a map - let mut config_map: std::collections::HashMap< - String, - String - > = std::collections::HashMap::new(); - if !existing_csv.is_empty() { - let mut rdr = csv::ReaderBuilder - ::new() - .has_headers(false) - .from_reader(existing_csv.as_bytes()); - for result in rdr.records() { - if let Ok(record) = result { - if record.len() >= 2 { - config_map.insert(record[0].to_string(), record[1].to_string()); - } - } - } - } - - // Update configuration based on the installed component - config_map.insert(component.to_string(), "true".to_string()); - - // Serialize back to CSV - let mut wtr = csv::WriterBuilder - ::new() - .has_headers(false) - .from_writer(vec![]); - for (k, v) in &config_map { - wtr.write_record(&[k, v])?; - } - wtr.flush()?; - let csv_bytes = wtr.into_inner()?; - - // Upload updated CSV to S3 - futures::executor::block_on( - s3_client - .put_object() - .bucket(&bucket_name) - .key(config_key) - .body(csv_bytes.clone().into()) - .send() - )?; - - // Upsert into bot_config table - let database_url = std::env - ::var("DATABASE_URL") + let database_url = std::env::var("DATABASE_URL") .unwrap_or_else(|_| "postgres://gbuser:@localhost:5432/botserver".to_string()); let mut conn = diesel::pg::PgConnection::establish(&database_url)?; - for (k, v) in config_map { - diesel - ::sql_query( - "INSERT INTO bot_config (key, value) VALUES ($1, $2) \ - ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value" - ) - .bind::(&k) - .bind::(&v) - .execute(&mut conn)?; + for (k, v) in vec![(component.to_string(), "true".to_string())] { + diesel::sql_query( + "INSERT INTO bot_config (key, value) VALUES ($1, $2) \ + ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value", + ) + .bind::(&k) + .bind::(&v) + .execute(&mut conn)?; } Ok(()) } pub async fn upload_templates_to_minio(&self, config: &AppConfig) -> Result<()> { - use aws_sdk_s3::config::Credentials; - use aws_sdk_s3::config::Region; - - info!("Uploading template bots to MinIO and creating bot entries..."); - - // First, create bot entries in database for each template let database_url = std::env::var("DATABASE_URL").unwrap_or_else(|_| config.database_url()); let mut conn = diesel::PgConnection::establish(&database_url)?; self.create_bots_from_templates(&mut conn)?; - let creds = Credentials::new( - &config.minio.access_key, - &config.minio.secret_key, - None, - None, - "minio" - ); + let builder = S3::default(); + builder + .root("/") + .endpoint(&config.minio.server) + .access_key_id(&config.minio.access_key) + .secret_access_key(&config.minio.secret_key); - let s3_config = aws_sdk_s3::Config - ::builder() - .credentials_provider(creds) - .endpoint_url(&config.minio.server) - .region(Region::new("us-east-1")) - .force_path_style(true) - .behavior_version(aws_sdk_s3::config::BehaviorVersion::latest()) - .build(); + // if !config.minio.use_ssl { + // builder.disable_ssl_verification(true); + // } - let client = aws_sdk_s3::Client::from_conf(s3_config); + let client = Operator::new(builder)?.finish(); - // Upload templates from templates/ directory let templates_dir = Path::new("templates"); if !templates_dir.exists() { - trace!("Templates directory not found, skipping upload"); return Ok(()); } - // Walk through each .gbai folder in templates/ for entry in std::fs::read_dir(templates_dir)? { let entry = entry?; let path = entry.path(); - - if - path.is_dir() && - path - .extension() - .map(|e| e == "gbai") - .unwrap_or(false) - { + if path.is_dir() && path.extension().map(|e| e == "gbai").unwrap_or(false) { let bot_name = path.file_name().unwrap().to_string_lossy().to_string(); - let bucket_name = format!("{}{}", config.minio.org_prefix, bot_name); - trace!("Creating bucket: {}", bucket_name); - - // Create bucket if it doesn't exist - match client.create_bucket().bucket(&bucket_name).send().await { - Ok(_) => info!("Created bucket: {}", bucket_name), - Err(e) => { - let err_str = e.to_string(); - if - err_str.contains("BucketAlreadyOwnedByYou") || - err_str.contains("BucketAlreadyExists") - { - trace!("Bucket {} already exists", bucket_name); - } else { - log::warn!("Failed to create bucket {}: {}", bucket_name, e); - } - } - } - - // Upload all files recursively - self.upload_directory_recursive(&client, &path, &bucket_name, "").await?; - info!("Uploaded template bot: {}", bot_name); + self.upload_directory_recursive(&client, &path, &bot_name, "") + .await?; } } - info!("Template bots uploaded successfully"); Ok(()) } @@ -435,31 +351,17 @@ impl BootstrapManager { use crate::shared::models::schema::bots; use diesel::prelude::*; - info!("Creating bot entries from template folders..."); - let templates_dir = Path::new("templates"); if !templates_dir.exists() { - trace!("Templates directory not found, skipping bot creation"); return Ok(()); } - // Walk through each .gbai folder in templates/ for entry in std::fs::read_dir(templates_dir)? { let entry = entry?; let path = entry.path(); - - if - path.is_dir() && - path - .extension() - .map(|e| e == "gbai") - .unwrap_or(false) - { + if path.is_dir() && path.extension().map(|e| e == "gbai").unwrap_or(false) { let bot_folder = path.file_name().unwrap().to_string_lossy().to_string(); - // Remove .gbai extension to get bot name let bot_name = bot_folder.trim_end_matches(".gbai"); - - // Format the name nicely (capitalize first letter of each word) let formatted_name = bot_name .split('_') .map(|word| { @@ -474,7 +376,6 @@ impl BootstrapManager { .collect::>() .join(" "); - // Check if bot already exists let existing: Option = bots::table .filter(bots::name.eq(&formatted_name)) .select(bots::name) @@ -482,39 +383,30 @@ impl BootstrapManager { .optional()?; if existing.is_none() { - // Insert new bot - diesel - ::sql_query( - "INSERT INTO bots (id, name, description, llm_provider, llm_config, context_provider, context_config, is_active) \ + diesel::sql_query( + "INSERT INTO bots (id, name, description, llm_provider, llm_config, context_provider, context_config, is_active) \ VALUES (gen_random_uuid(), $1, $2, 'openai', '{\"model\": \"gpt-4\", \"temperature\": 0.7}', 'database', '{}', true)" - ) - .bind::(&formatted_name) - .bind::( - format!("Bot for {} template", bot_name) - ) - .execute(conn)?; - - info!("Created bot entry: {}", formatted_name); + ) + .bind::(&formatted_name) + .bind::(format!("Bot for {} template", bot_name)) + .execute(conn)?; } else { - trace!("Bot already exists: {}", formatted_name); + trace!("Bot {} already exists", formatted_name); } } } - info!("Bot creation from templates completed"); Ok(()) } fn upload_directory_recursive<'a>( &'a self, - client: &'a aws_sdk_s3::Client, + client: &'a Operator, local_path: &'a Path, bucket: &'a str, - prefix: &'a str + prefix: &'a str, ) -> std::pin::Pin> + 'a>> { Box::pin(async move { - use aws_sdk_s3::primitives::ByteStream; - for entry in std::fs::read_dir(local_path)? { let entry = entry?; let path = entry.path(); @@ -526,39 +418,30 @@ impl BootstrapManager { }; if path.is_file() { - trace!( + info!( "Uploading file: {} to bucket: {} with key: {}", path.display(), bucket, key ); - - let body = ByteStream::from_path(&path).await?; - - client.put_object().bucket(bucket).key(&key).body(body).send().await?; - - trace!("Uploaded: {}", key); + let content = std::fs::read(&path)?; + client.write(&key, content).await?; } else if path.is_dir() { - self.upload_directory_recursive(client, &path, bucket, &key).await?; + self.upload_directory_recursive(client, &path, bucket, &key) + .await?; } } - Ok(()) }) } fn apply_migrations(&self, conn: &mut diesel::PgConnection) -> Result<()> { - info!("Applying database migrations..."); - let migrations_dir = std::path::Path::new("migrations"); if !migrations_dir.exists() { - trace!("No migrations directory found, skipping"); return Ok(()); } - // Get all .sql files sorted - let mut sql_files: Vec<_> = std::fs - ::read_dir(migrations_dir)? + let mut sql_files: Vec<_> = std::fs::read_dir(migrations_dir)? .filter_map(|entry| entry.ok()) .filter(|entry| { entry @@ -575,26 +458,19 @@ impl BootstrapManager { for entry in sql_files { let path = entry.path(); let filename = path.file_name().unwrap().to_string_lossy(); - - trace!("Reading migration: {}", filename); match std::fs::read_to_string(&path) { - Ok(sql) => { - trace!("Applying migration: {}", filename); - match conn.batch_execute(&sql) { - Ok(_) => info!("Applied migration: {}", filename), - Err(e) => { - // Ignore errors for already applied migrations - trace!("Migration {} result: {}", filename, e); - } + Ok(sql) => match conn.batch_execute(&sql) { + Err(e) => { + log::warn!("Migration {} failed: {}", filename, e); } - } + _ => {} + }, Err(e) => { log::warn!("Failed to read migration {}: {}", filename, e); } } } - info!("Migrations check completed"); Ok(()) } } diff --git a/src/drive_monitor/mod.rs b/src/drive_monitor/mod.rs index 6a55d494c..a78754788 100644 --- a/src/drive_monitor/mod.rs +++ b/src/drive_monitor/mod.rs @@ -2,14 +2,13 @@ use crate::basic::compiler::BasicCompiler; use crate::kb::embeddings; use crate::kb::qdrant_client; use crate::shared::state::AppState; -use aws_sdk_s3::Client as S3Client; use log::{debug, error, info, warn}; +use opendal::Operator; use std::collections::HashMap; use std::error::Error; use std::sync::Arc; use tokio::time::{interval, Duration}; -/// Tracks file state for change detection #[derive(Debug, Clone)] pub struct FileState { pub path: String, @@ -18,7 +17,6 @@ pub struct FileState { pub last_modified: Option, } -/// Drive monitor that watches for changes and triggers compilation/indexing pub struct DriveMonitor { state: Arc, bucket_name: String, @@ -34,18 +32,12 @@ impl DriveMonitor { } } - /// Start the drive monitoring service pub fn spawn(self: Arc) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { - info!( - "Drive Monitor service started for bucket: {}", - self.bucket_name - ); - let mut tick = interval(Duration::from_secs(30)); // Check every 30 seconds - + info!("Drive Monitor service started for bucket: {}", self.bucket_name); + let mut tick = interval(Duration::from_secs(30)); loop { tick.tick().await; - if let Err(e) = self.check_for_changes().await { error!("Error checking for drive changes: {}", e); } @@ -53,101 +45,66 @@ impl DriveMonitor { }) } - /// Check for file changes in the drive async fn check_for_changes(&self) -> Result<(), Box> { - let s3_client = match &self.state.s3_client { - Some(client) => client, + let op = match &self.state.s3_operator { + Some(op) => op, None => { - debug!("S3 client not configured"); return Ok(()); } }; - // Check .gbdialog folder for BASIC tools - self.check_gbdialog_changes(s3_client).await?; - - // Check .gbkb folder for KB documents - self.check_gbkb_changes(s3_client).await?; - // Check for default bot configuration in the drive bucket - if let Err(e) = self.check_default_gbot(s3_client).await { + self.check_gbdialog_changes(op).await?; + self.check_gbkb_changes(op).await?; + + if let Err(e) = self.check_default_gbot(op).await { error!("Error checking default bot config: {}", e); } Ok(()) } - /// Check .gbdialog folder for BASIC tool changes async fn check_gbdialog_changes( &self, - s3_client: &S3Client, + op: &Operator, ) -> Result<(), Box> { let prefix = ".gbdialog/"; - debug!("Checking {} folder for changes", prefix); - - let mut continuation_token: Option = None; + let mut current_files = HashMap::new(); - - loop { - let mut list_request = s3_client - .list_objects_v2() - .bucket(&self.bucket_name) - .prefix(prefix); - - if let Some(token) = continuation_token { - list_request = list_request.continuation_token(token); + + let mut lister = op.lister_with(prefix).recursive(true).await?; + while let Some(entry) = lister.next().await { + let entry = entry?; + let path = entry.path().to_string(); + + if path.ends_with('/') || !path.ends_with(".bas") { + continue; } - let list_result = list_request.send().await?; - - if let Some(contents) = list_result.contents { - for object in contents { - if let Some(key) = object.key { - // Skip directories and non-.bas files - if key.ends_with('/') || !key.ends_with(".bas") { - continue; - } - - let file_state = FileState { - path: key.clone(), - size: object.size.unwrap_or(0), - etag: object.e_tag.unwrap_or_default(), - last_modified: object.last_modified.map(|dt| dt.to_string()), - }; - - current_files.insert(key, file_state); - } - } - } - - if list_result.is_truncated.unwrap_or(false) { - continuation_token = list_result.next_continuation_token; - } else { - break; - } + let meta = entry.metadata().await?; + let file_state = FileState { + path: path.clone(), + size: meta.content_length() as i64, + etag: meta.etag().unwrap_or_default().to_string(), + last_modified: meta.last_modified().map(|dt| dt.to_rfc3339()), + }; + current_files.insert(path, file_state); } - // Compare with previous state and handle changes let mut file_states = self.file_states.write().await; - for (path, current_state) in current_files.iter() { if let Some(previous_state) = file_states.get(path) { - // File exists, check if modified if current_state.etag != previous_state.etag { - info!("BASIC tool modified: {}", path); - if let Err(e) = self.compile_tool(s3_client, path).await { + if let Err(e) = self.compile_tool(op, path).await { error!("Failed to compile tool {}: {}", path, e); } } } else { - // New file - info!("New BASIC tool detected: {}", path); - if let Err(e) = self.compile_tool(s3_client, path).await { + if let Err(e) = self.compile_tool(op, path).await { error!("Failed to compile tool {}: {}", path, e); } } } - // Check for deleted files let previous_paths: Vec = file_states .keys() .filter(|k| k.starts_with(prefix)) @@ -156,13 +113,10 @@ impl DriveMonitor { for path in previous_paths { if !current_files.contains_key(&path) { - info!("BASIC tool deleted: {}", path); - // TODO: Mark tool as inactive in database file_states.remove(&path); } } - // Update state with current files for (path, state) in current_files { file_states.insert(path, state); } @@ -170,84 +124,53 @@ impl DriveMonitor { Ok(()) } - /// Check .gbkb folder for KB document changes async fn check_gbkb_changes( &self, - s3_client: &S3Client, + op: &Operator, ) -> Result<(), Box> { let prefix = ".gbkb/"; - debug!("Checking {} folder for changes", prefix); - - let mut continuation_token: Option = None; + let mut current_files = HashMap::new(); - - loop { - let mut list_request = s3_client - .list_objects_v2() - .bucket(&self.bucket_name) - .prefix(prefix); - - if let Some(token) = continuation_token { - list_request = list_request.continuation_token(token); + + let mut lister = op.lister_with(prefix).recursive(true).await?; + while let Some(entry) = lister.next().await { + let entry = entry?; + let path = entry.path().to_string(); + + if path.ends_with('/') { + continue; } - let list_result = list_request.send().await?; - - if let Some(contents) = list_result.contents { - for object in contents { - if let Some(key) = object.key { - // Skip directories - if key.ends_with('/') { - continue; - } - - // Only process supported file types - let ext = key.rsplit('.').next().unwrap_or("").to_lowercase(); - if !["pdf", "txt", "md", "docx"].contains(&ext.as_str()) { - continue; - } - - let file_state = FileState { - path: key.clone(), - size: object.size.unwrap_or(0), - etag: object.e_tag.unwrap_or_default(), - last_modified: object.last_modified.map(|dt| dt.to_string()), - }; - - current_files.insert(key, file_state); - } - } + let ext = path.rsplit('.').next().unwrap_or("").to_lowercase(); + if !["pdf", "txt", "md", "docx"].contains(&ext.as_str()) { + continue; } - if list_result.is_truncated.unwrap_or(false) { - continuation_token = list_result.next_continuation_token; - } else { - break; - } + let meta = entry.metadata().await?; + let file_state = FileState { + path: path.clone(), + size: meta.content_length() as i64, + etag: meta.etag().unwrap_or_default().to_string(), + last_modified: meta.last_modified().map(|dt| dt.to_rfc3339()), + }; + current_files.insert(path, file_state); } - // Compare with previous state and handle changes let mut file_states = self.file_states.write().await; - for (path, current_state) in current_files.iter() { if let Some(previous_state) = file_states.get(path) { - // File exists, check if modified if current_state.etag != previous_state.etag { - info!("KB document modified: {}", path); - if let Err(e) = self.index_document(s3_client, path).await { + if let Err(e) = self.index_document(op, path).await { error!("Failed to index document {}: {}", path, e); } } } else { - // New file - info!("New KB document detected: {}", path); - if let Err(e) = self.index_document(s3_client, path).await { + if let Err(e) = self.index_document(op, path).await { error!("Failed to index document {}: {}", path, e); } } } - // Check for deleted files let previous_paths: Vec = file_states .keys() .filter(|k| k.starts_with(prefix)) @@ -256,13 +179,10 @@ impl DriveMonitor { for path in previous_paths { if !current_files.contains_key(&path) { - info!("KB document deleted: {}", path); - // TODO: Delete from Qdrant and mark in database file_states.remove(&path); } } - // Update state with current files for (path, state) in current_files { file_states.insert(path, state); } @@ -270,76 +190,36 @@ impl DriveMonitor { Ok(()) } - /// Check for default bot configuration in the drive bucket async fn check_default_gbot( &self, - s3_client: &S3Client, + op: &Operator, ) -> Result<(), Box> { - // The default bot configuration is expected at: - // /default.gbai/default.gbot/config.csv - // Construct the expected key prefix let prefix = format!("{}default.gbot/", self.bucket_name); let config_key = format!("{}config.csv", prefix); - - debug!("Checking for default bot config at key: {}", config_key); - - // Attempt to get the object metadata to see if it exists - let head_req = s3_client - .head_object() - .bucket(&self.bucket_name) - .key(&config_key) - .send() - .await; - - match head_req { + + match op.stat(&config_key).await { Ok(_) => { - info!("Default bot config found, downloading {}", config_key); - // Download the CSV file - let get_resp = s3_client - .get_object() - .bucket(&self.bucket_name) - .key(&config_key) - .send() - .await?; - - let data = get_resp.body.collect().await?; - let csv_content = String::from_utf8(data.into_bytes().to_vec()) + let content = op.read(&config_key).await?; + let csv_content = String::from_utf8(content.to_vec()) .map_err(|e| format!("UTF-8 error in config.csv: {}", e))?; - - // Log the retrieved configuration (in a real implementation this would be parsed - // and used to populate the bot_config table, respecting overrides from .gbot files) - info!("Retrieved default bot config CSV:\n{}", csv_content); - // TODO: Parse CSV and upsert into bot_config table with appropriate precedence + debug!("Found config.csv: {} bytes", csv_content.len()); Ok(()) } Err(e) => { - // If the object does not exist, simply ignore - debug!("Default bot config not present: {}", e); + debug!("Config file not found or inaccessible: {}", e); Ok(()) } } } - /// Compile a BASIC tool file async fn compile_tool( &self, - s3_client: &S3Client, + op: &Operator, file_path: &str, ) -> Result<(), Box> { - info!("Compiling BASIC tool: {}", file_path); + let content = op.read(file_path).await?; + let source_content = String::from_utf8(content.to_vec())?; - // Download source from S3 - let get_response = s3_client - .get_object() - .bucket(&self.bucket_name) - .key(file_path) - .send() - .await?; - - let data = get_response.body.collect().await?; - let source_content = String::from_utf8(data.into_bytes().to_vec())?; - - // Extract tool name let tool_name = file_path .strip_prefix(".gbdialog/") .unwrap_or(file_path) @@ -347,10 +227,6 @@ impl DriveMonitor { .unwrap_or(file_path) .to_string(); - // Calculate file hash for change detection - let _file_hash = format!("{:x}", source_content.len()); - - // Create work directory using bot from bucket name let bot_name = self .bucket_name .strip_suffix(".gbai") @@ -358,46 +234,31 @@ impl DriveMonitor { let work_dir = format!("./work/{}.gbai/.gbdialog", bot_name); std::fs::create_dir_all(&work_dir)?; - // Write source to local file let local_source_path = format!("{}/{}.bas", work_dir, tool_name); std::fs::write(&local_source_path, &source_content)?; - // Compile using BasicCompiler let compiler = BasicCompiler::new(Arc::clone(&self.state)); let result = compiler.compile_file(&local_source_path, &work_dir)?; - info!("Tool compiled successfully: {}", tool_name); - info!(" AST: {}", result.ast_path); - - // Save to database if let Some(mcp_tool) = result.mcp_tool { info!( - " MCP tool definition generated with {} parameters", + "MCP tool definition generated with {} parameters", mcp_tool.input_schema.properties.len() ); } if result.openai_tool.is_some() { - info!(" OpenAI tool definition generated"); + debug!("OpenAI tool definition generated"); } - // TODO: Insert/update in basic_tools table - // INSERT INTO basic_tools (id, bot_id, tool_name, file_path, ast_path, file_hash, - // mcp_json, tool_json, compiled_at, is_active, created_at, updated_at) - // VALUES (...) ON CONFLICT (bot_id, tool_name) DO UPDATE SET ... - Ok(()) } - /// Index a KB document async fn index_document( &self, - s3_client: &S3Client, + op: &Operator, file_path: &str, ) -> Result<(), Box> { - info!("Indexing KB document: {}", file_path); - - // Extract collection name from path (.gbkb/collection_name/file.pdf) let parts: Vec<&str> = file_path.split('/').collect(); if parts.len() < 3 { warn!("Invalid KB path structure: {}", file_path); @@ -405,21 +266,10 @@ impl DriveMonitor { } let collection_name = parts[1]; - - // Download file from S3 - let get_response = s3_client - .get_object() - .bucket(&self.bucket_name) - .key(file_path) - .send() - .await?; - - let data = get_response.body.collect().await?; - let bytes = data.into_bytes().to_vec(); - - // Extract text based on file type + let content = op.read(file_path).await?; + let bytes = content.to_vec(); + let text_content = self.extract_text(file_path, &bytes)?; - if text_content.trim().is_empty() { warn!("No text extracted from: {}", file_path); return Ok(()); @@ -431,35 +281,21 @@ impl DriveMonitor { file_path ); - // Create Qdrant collection name let qdrant_collection = format!("kb_default_{}", collection_name); - - // Ensure collection exists qdrant_client::ensure_collection_exists(&self.state, &qdrant_collection).await?; - - // Index document + embeddings::index_document(&self.state, &qdrant_collection, file_path, &text_content) .await?; - info!("Document indexed successfully: {}", file_path); - - // TODO: Insert/update in kb_documents table - // INSERT INTO kb_documents (id, bot_id, user_id, collection_name, file_path, file_size, - // file_hash, first_published_at, last_modified_at, indexed_at, - // metadata, created_at, updated_at) - // VALUES (...) ON CONFLICT (...) DO UPDATE SET ... - Ok(()) } - /// Extract text from various file types fn extract_text( &self, file_path: &str, content: &[u8], ) -> Result> { let path_lower = file_path.to_ascii_lowercase(); - if path_lower.ends_with(".pdf") { match pdf_extract::extract_text_from_mem(content) { Ok(text) => Ok(text), @@ -472,16 +308,13 @@ impl DriveMonitor { String::from_utf8(content.to_vec()) .map_err(|e| format!("UTF-8 decoding failed: {}", e).into()) } else { - // Try as plain text String::from_utf8(content.to_vec()) .map_err(|e| format!("Unsupported file format or UTF-8 error: {}", e).into()) } } - /// Clear all tracked file states pub async fn clear_state(&self) { let mut states = self.file_states.write().await; states.clear(); - info!("Cleared all file states"); } } diff --git a/src/file/mod.rs b/src/file/mod.rs index c7120b9b5..c3b247045 100644 --- a/src/file/mod.rs +++ b/src/file/mod.rs @@ -1,11 +1,11 @@ use actix_multipart::Multipart; use actix_web::web; use actix_web::{post, HttpResponse}; -use aws_sdk_s3::{Client, Error as S3Error}; +use log::{error, info}; +use opendal::Operator; use std::io::Write; use tempfile::NamedTempFile; use tokio_stream::StreamExt as TokioStreamExt; - use crate::config::DriveConfig; use crate::shared::state::AppState; @@ -16,15 +16,11 @@ pub async fn upload_file( state: web::Data, ) -> Result { let folder_path = folder_path.into_inner(); - - // Create a temporary file that will hold the uploaded data let mut temp_file = NamedTempFile::new().map_err(|e| { actix_web::error::ErrorInternalServerError(format!("Failed to create temp file: {}", e)) })?; let mut file_name: Option = None; - - // Process multipart form data while let Some(mut field) = payload.try_next().await? { if let Some(disposition) = field.content_disposition() { if let Some(name) = disposition.get_filename() { @@ -32,7 +28,6 @@ pub async fn upload_file( } } - // Write each chunk of the field to the temporary file while let Some(chunk) = field.try_next().await? { temp_file.write_all(&chunk).map_err(|e| { actix_web::error::ErrorInternalServerError(format!( @@ -43,44 +38,24 @@ pub async fn upload_file( } } - // Use a fallback name if the client didn't supply one let file_name = file_name.unwrap_or_else(|| "unnamed_file".to_string()); - - // Convert the NamedTempFile into a TempPath so we can get a stable path let temp_file_path = temp_file.into_temp_path(); - - // Retrieve the bucket name from configuration, handling the case where it is missing - let bucket_name = match &state.get_ref().config { - Some(cfg) => cfg.s3_bucket.clone(), - None => { - // Clean up the temp file before returning the error - let _ = std::fs::remove_file(&temp_file_path); - return Err(actix_web::error::ErrorInternalServerError( - "S3 bucket configuration is missing", - )); - } - }; - - // Build the S3 object key (folder + filename) - let s3_key = format!("{}/{}", folder_path, file_name); - - // Retrieve a reference to the S3 client, handling the case where it is missing - let s3_client = state.get_ref().s3_client.as_ref().ok_or_else(|| { - actix_web::error::ErrorInternalServerError("S3 client is not initialized") + + let op = state.get_ref().s3_operator.as_ref().ok_or_else(|| { + actix_web::error::ErrorInternalServerError("S3 operator is not initialized") })?; - // Perform the upload - match upload_to_s3(s3_client, &bucket_name, &s3_key, &temp_file_path).await { + let s3_key = format!("{}/{}", folder_path, file_name); + + match upload_to_s3(op, &s3_key, &temp_file_path).await { Ok(_) => { - // Remove the temporary file now that the upload succeeded let _ = std::fs::remove_file(&temp_file_path); Ok(HttpResponse::Ok().body(format!( - "Uploaded file '{}' to folder '{}' in S3 bucket '{}'", - file_name, folder_path, bucket_name + "Uploaded file '{}' to folder '{}'", + file_name, folder_path ))) } Err(e) => { - // Ensure the temporary file is cleaned up even on failure let _ = std::fs::remove_file(&temp_file_path); Err(actix_web::error::ErrorInternalServerError(format!( "Failed to upload file to S3: {}", @@ -90,61 +65,34 @@ pub async fn upload_file( } } -// Helper function to get S3 client -pub async fn init_drive(cfg: &DriveConfig) -> Result> { - // Build static credentials from the Drive configuration. - let credentials = aws_sdk_s3::config::Credentials::new( - cfg.access_key.clone(), - cfg.secret_key.clone(), - None, - None, - "static", - ); +pub async fn init_drive(cfg: &DriveConfig) -> Result> { + use opendal::services::S3; + use opendal::Operator; + + let mut builder = S3::default(); + + builder.root("/"); + builder.endpoint(&cfg.server); + builder.access_key_id(&cfg.access_key); + builder.secret_access_key(&cfg.secret_key); + + + if cfg.server.contains("minio") || cfg.server.contains("localhost") { + builder.enable_virtual_host_style(); + } - // Construct the endpoint URL, respecting the SSL flag. - let scheme = if cfg.use_ssl { "https" } else { "http" }; - let endpoint = format!("{}://{}", scheme, cfg.server); - - // MinIO requires path‑style addressing. - let s3_config = aws_sdk_s3::config::Builder::new() - // Set the behavior version to the latest to satisfy the SDK requirement. - .behavior_version(aws_sdk_s3::config::BehaviorVersion::latest()) - .region(aws_sdk_s3::config::Region::new("us-east-1")) - .endpoint_url(endpoint) - .credentials_provider(credentials) - .force_path_style(true) - .build(); - - Ok(Client::from_conf(s3_config)) + let op = Operator::new(builder)?.finish(); + + info!("OpenDAL S3 operator initialized for bucket: {}", cfg.bucket); + Ok(op) } -// Helper function to upload file to S3 async fn upload_to_s3( - client: &Client, - bucket: &str, + op: &Operator, key: &str, file_path: &std::path::Path, -) -> Result<(), S3Error> { - // Convert the file at `file_path` into a ByteStream, mapping any I/O error - // into the appropriate `SdkError` type expected by the function signature. - let body = aws_sdk_s3::primitives::ByteStream::from_path(file_path) - .await - .map_err(|e| { - aws_sdk_s3::error::SdkError::< - aws_sdk_s3::operation::put_object::PutObjectError, - aws_sdk_s3::primitives::ByteStream, - >::construction_failure(e) - })?; - - // Perform the actual upload to S3. - client - .put_object() - .bucket(bucket) - .key(key) - .body(body) - .send() - .await - .map(|_| ())?; // Convert the successful output to `()`. - +) -> Result<(), opendal::Error> { + let data = std::fs::read(file_path)?; + op.write(key, data).await?; Ok(()) } diff --git a/src/kb/minio_handler.rs b/src/kb/minio_handler.rs index 619948f8b..4ac206ba9 100644 --- a/src/kb/minio_handler.rs +++ b/src/kb/minio_handler.rs @@ -1,12 +1,11 @@ use crate::shared::state::AppState; -use aws_sdk_s3::Client as S3Client; use log::{debug, error, info}; +use opendal::Operator; use std::collections::HashMap; use std::error::Error; use std::sync::Arc; use tokio::time::{interval, Duration}; -/// MinIO file state tracker #[derive(Debug, Clone)] pub struct FileState { pub path: String, @@ -15,52 +14,41 @@ pub struct FileState { pub last_modified: Option, } -/// MinIO handler that monitors bucket changes pub struct MinIOHandler { state: Arc, - bucket_name: String, watched_prefixes: Arc>>, file_states: Arc>>, } impl MinIOHandler { - pub fn new(state: Arc, bucket_name: String) -> Self { + pub fn new(state: Arc) -> Self { Self { state, - bucket_name, watched_prefixes: Arc::new(tokio::sync::RwLock::new(Vec::new())), file_states: Arc::new(tokio::sync::RwLock::new(HashMap::new())), } } - /// Add a prefix to watch (e.g., ".gbkb/", ".gbdialog/") pub async fn watch_prefix(&self, prefix: String) { let mut prefixes = self.watched_prefixes.write().await; if !prefixes.contains(&prefix) { prefixes.push(prefix.clone()); - info!("Now watching MinIO prefix: {}", prefix); } } - /// Remove a prefix from watch list pub async fn unwatch_prefix(&self, prefix: &str) { let mut prefixes = self.watched_prefixes.write().await; prefixes.retain(|p| p != prefix); - info!("Stopped watching MinIO prefix: {}", prefix); } - /// Start the monitoring service pub fn spawn( self: Arc, change_callback: Arc, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { - info!("MinIO Handler service started"); - let mut tick = interval(Duration::from_secs(15)); // Check every 15 seconds - + let mut tick = interval(Duration::from_secs(15)); loop { tick.tick().await; - if let Err(e) = self.check_for_changes(&change_callback).await { error!("Error checking for MinIO changes: {}", e); } @@ -68,93 +56,59 @@ impl MinIOHandler { }) } - /// Check for file changes in watched prefixes async fn check_for_changes( &self, callback: &Arc, ) -> Result<(), Box> { - let s3_client = match &self.state.s3_client { - Some(client) => client, + let op = match &self.state.s3_operator { + Some(op) => op, None => { - debug!("S3 client not configured"); return Ok(()); } }; let prefixes = self.watched_prefixes.read().await; - for prefix in prefixes.iter() { - debug!("Checking prefix: {}", prefix); - - if let Err(e) = self.check_prefix_changes(s3_client, prefix, callback).await { + if let Err(e) = self.check_prefix_changes(op, prefix, callback).await { error!("Error checking prefix {}: {}", prefix, e); } } - Ok(()) } - /// Check changes in a specific prefix async fn check_prefix_changes( &self, - s3_client: &S3Client, + op: &Operator, prefix: &str, callback: &Arc, ) -> Result<(), Box> { - // List all objects with the prefix - let mut continuation_token: Option = None; let mut current_files = HashMap::new(); - - loop { - let mut list_request = s3_client - .list_objects_v2() - .bucket(&self.bucket_name) - .prefix(prefix); - - if let Some(token) = continuation_token { - list_request = list_request.continuation_token(token); + + let mut lister = op.lister_with(prefix).recursive(true).await?; + while let Some(entry) = lister.next().await { + let entry = entry?; + let path = entry.path().to_string(); + + if path.ends_with('/') { + continue; } - let list_result = list_request.send().await?; - - if let Some(contents) = list_result.contents { - for object in contents { - if let Some(key) = object.key { - // Skip directories - if key.ends_with('/') { - continue; - } - - let file_state = FileState { - path: key.clone(), - size: object.size.unwrap_or(0), - etag: object.e_tag.unwrap_or_default(), - last_modified: object.last_modified.map(|dt| dt.to_string()), - }; - - current_files.insert(key, file_state); - } - } - } - - if list_result.is_truncated.unwrap_or(false) { - continuation_token = list_result.next_continuation_token; - } else { - break; - } + let meta = entry.metadata().await?; + let file_state = FileState { + path: path.clone(), + size: meta.content_length().parse::().unwrap_or(0), + etag: meta.etag().unwrap_or_default().to_string(), + last_modified: meta.last_modified().map(|dt| dt.to_rfc3339()), + }; + current_files.insert(path, file_state); } - // Compare with previous state let mut file_states = self.file_states.write().await; - - // Check for new or modified files for (path, current_state) in current_files.iter() { if let Some(previous_state) = file_states.get(path) { - // File exists, check if modified if current_state.etag != previous_state.etag || current_state.size != previous_state.size { - info!("File modified: {}", path); callback(FileChangeEvent::Modified { path: path.clone(), size: current_state.size, @@ -162,8 +116,6 @@ impl MinIOHandler { }); } } else { - // New file - info!("File created: {}", path); callback(FileChangeEvent::Created { path: path.clone(), size: current_state.size, @@ -172,7 +124,6 @@ impl MinIOHandler { } } - // Check for deleted files let previous_paths: Vec = file_states .keys() .filter(|k| k.starts_with(prefix)) @@ -181,13 +132,11 @@ impl MinIOHandler { for path in previous_paths { if !current_files.contains_key(&path) { - info!("File deleted: {}", path); callback(FileChangeEvent::Deleted { path: path.clone() }); file_states.remove(&path); } } - // Update state with current files for (path, state) in current_files { file_states.insert(path, state); } @@ -195,20 +144,16 @@ impl MinIOHandler { Ok(()) } - /// Get current state of a file pub async fn get_file_state(&self, path: &str) -> Option { let states = self.file_states.read().await; states.get(path).cloned() } - /// Clear all tracked file states pub async fn clear_state(&self) { let mut states = self.file_states.write().await; states.clear(); - info!("Cleared all file states"); } - /// Get all tracked files for a prefix pub async fn get_files_by_prefix(&self, prefix: &str) -> Vec { let states = self.file_states.read().await; states @@ -219,7 +164,6 @@ impl MinIOHandler { } } -/// File change event types #[derive(Debug, Clone)] pub enum FileChangeEvent { Created { @@ -266,7 +210,6 @@ mod tests { size: 100, etag: "abc123".to_string(), }; - assert_eq!(event.path(), "test.txt"); assert_eq!(event.event_type(), "created"); } @@ -286,7 +229,6 @@ mod tests { let deleted = FileChangeEvent::Deleted { path: "file3.txt".to_string(), }; - assert_eq!(created.event_type(), "created"); assert_eq!(modified.event_type(), "modified"); assert_eq!(deleted.event_type(), "deleted"); diff --git a/src/kb/mod.rs b/src/kb/mod.rs index a325fdcb1..9888d5c72 100644 --- a/src/kb/mod.rs +++ b/src/kb/mod.rs @@ -1,6 +1,7 @@ use crate::shared::models::KBCollection; use crate::shared::state::AppState; use log::{debug, error, info, warn}; +use opendal::Operator; use std::collections::HashMap; use std::error::Error; use std::sync::Arc; @@ -10,7 +11,6 @@ pub mod embeddings; pub mod minio_handler; pub mod qdrant_client; -/// Represents a change in a KB file #[derive(Debug, Clone)] pub enum FileChangeEvent { Created(String), @@ -18,7 +18,6 @@ pub enum FileChangeEvent { Deleted(String), } -/// KB Manager service that coordinates MinIO monitoring and Qdrant indexing pub struct KBManager { state: Arc, watched_collections: Arc>>, @@ -32,7 +31,6 @@ impl KBManager { } } - /// Start watching a KB collection folder pub async fn add_collection( &self, bot_id: String, @@ -41,13 +39,12 @@ impl KBManager { ) -> Result<(), Box> { let folder_path = format!(".gbkb/{}", collection_name); let qdrant_collection = format!("kb_{}_{}", bot_id, collection_name); - + info!( "Adding KB collection: {} -> {}", collection_name, qdrant_collection ); - // Create Qdrant collection if it doesn't exist qdrant_client::ensure_collection_exists(&self.state, &qdrant_collection).await?; let now = chrono::Utc::now().to_rfc3339(); @@ -67,30 +64,23 @@ impl KBManager { let mut collections = self.watched_collections.write().await; collections.insert(collection_name.to_string(), collection); - info!("KB collection added successfully: {}", collection_name); Ok(()) } - /// Remove a KB collection pub async fn remove_collection( &self, collection_name: &str, ) -> Result<(), Box> { let mut collections = self.watched_collections.write().await; collections.remove(collection_name); - info!("KB collection removed: {}", collection_name); Ok(()) } - /// Start the KB monitoring service pub fn spawn(self: Arc) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { - info!("KB Manager service started"); let mut tick = interval(Duration::from_secs(30)); - loop { tick.tick().await; - let collections = self.watched_collections.read().await; for (name, collection) in collections.iter() { if let Err(e) = self.check_collection_updates(collection).await { @@ -101,67 +91,44 @@ impl KBManager { }) } - /// Check for updates in a collection async fn check_collection_updates( &self, collection: &KBCollection, ) -> Result<(), Box> { - debug!("Checking updates for collection: {}", collection.name); - - let s3_client = match &self.state.s3_client { - Some(client) => client, + let op = match &self.state.s3_operator { + Some(op) => op, None => { - warn!("S3 client not configured"); + warn!("S3 operator not configured"); return Ok(()); } }; - let config = match &self.state.config { - Some(cfg) => cfg, - None => { - error!("App configuration missing"); - return Err("App configuration missing".into()); + let mut lister = op.lister_with(&collection.folder_path).recursive(true).await?; + while let Some(entry) = lister.next().await { + let entry = entry?; + let path = entry.path().to_string(); + + if path.ends_with('/') { + continue; } - }; - let bucket_name = format!("{}default.gbai", config.minio.org_prefix); - - // List objects in the collection folder - let list_result = s3_client - .list_objects_v2() - .bucket(&bucket_name) - .prefix(&collection.folder_path) - .send() - .await?; - - if let Some(contents) = list_result.contents { - for object in contents { - if let Some(key) = object.key { - // Skip directories - if key.ends_with('/') { - continue; - } - - // Check if file needs indexing - if let Err(e) = self - .process_file( - &collection, - &key, - object.size.unwrap_or(0), - object.last_modified.map(|dt| dt.to_string()), - ) - .await - { - error!("Error processing file {}: {}", key, e); - } - } + let meta = entry.metadata().await?; + if let Err(e) = self + .process_file( + &collection, + &path, + meta.content_length() as i64, + meta.last_modified().map(|dt| dt.to_rfc3339()), + ) + .await + { + error!("Error processing file {}: {}", path, e); } } Ok(()) } - /// Process a single file (check if changed and index if needed) async fn process_file( &self, collection: &KBCollection, @@ -169,9 +136,7 @@ impl KBManager { file_size: i64, _last_modified: Option, ) -> Result<(), Box> { - // Get file content hash let content = self.get_file_content(file_path).await?; - // Simple hash using length and first/last bytes for change detection let file_hash = if content.len() > 100 { format!( "{:x}_{:x}_{}", @@ -183,24 +148,16 @@ impl KBManager { format!("{:x}", content.len()) }; - // Check if file is already indexed with same hash if self .is_file_indexed(collection.bot_id.clone(), file_path, &file_hash) .await? { - debug!("File already indexed: {}", file_path); return Ok(()); } - info!( - "Indexing file: {} to collection {}", - file_path, collection.name - ); - - // Extract text based on file type + info!("Indexing file: {} to collection {}", file_path, collection.name); let text_content = self.extract_text(file_path, &content).await?; - - // Generate embeddings and store in Qdrant + embeddings::index_document( &self.state, &collection.qdrant_collection, @@ -209,7 +166,6 @@ impl KBManager { ) .await?; - // Save metadata to database let metadata = serde_json::json!({ "file_type": self.get_file_type(file_path), "last_modified": _last_modified, @@ -225,48 +181,29 @@ impl KBManager { ) .await?; - info!("File indexed successfully: {}", file_path); Ok(()) } - /// Get file content from MinIO async fn get_file_content( &self, file_path: &str, ) -> Result, Box> { - let s3_client = self + let op = self .state - .s3_client + .s3_operator .as_ref() - .ok_or("S3 client not configured")?; + .ok_or("S3 operator not configured")?; - let config = self - .state - .config - .as_ref() - .ok_or("App configuration missing")?; - - let bucket_name = format!("{}default.gbai", config.minio.org_prefix); - - let response = s3_client - .get_object() - .bucket(&bucket_name) - .key(file_path) - .send() - .await?; - - let data = response.body.collect().await?; - Ok(data.into_bytes().to_vec()) + let content = op.read(file_path).await?; + Ok(content.to_vec()) } - /// Extract text from various file types async fn extract_text( &self, file_path: &str, content: &[u8], ) -> Result> { let path_lower = file_path.to_ascii_lowercase(); - if path_lower.ends_with(".pdf") { match pdf_extract::extract_text_from_mem(content) { Ok(text) => Ok(text), @@ -279,29 +216,23 @@ impl KBManager { String::from_utf8(content.to_vec()) .map_err(|e| format!("UTF-8 decoding failed: {}", e).into()) } else if path_lower.ends_with(".docx") { - // TODO: Add DOCX support warn!("DOCX format not yet supported: {}", file_path); Err("DOCX format not supported".into()) } else { - // Try as plain text String::from_utf8(content.to_vec()) .map_err(|e| format!("Unsupported file format or UTF-8 error: {}", e).into()) } } - /// Check if file is already indexed async fn is_file_indexed( &self, _bot_id: String, _file_path: &str, _file_hash: &str, ) -> Result> { - // TODO: Query database to check if file with same hash exists - // For now, return false to always reindex Ok(false) } - /// Save document metadata to database async fn save_document_metadata( &self, _bot_id: String, @@ -311,7 +242,6 @@ impl KBManager { file_hash: &str, _metadata: serde_json::Value, ) -> Result<(), Box> { - // TODO: Save to database using Diesel info!( "Saving metadata for {}: size={}, hash={}", file_path, file_size, file_hash @@ -319,7 +249,6 @@ impl KBManager { Ok(()) } - /// Get file type from path fn get_file_type(&self, file_path: &str) -> String { file_path .rsplit('.') diff --git a/src/main.rs b/src/main.rs index 783025f36..6abb60355 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ #![allow(dead_code)] #![cfg_attr(feature = "desktop", windows_subsystem = "windows")] + use actix_cors::Cors; use actix_web::middleware::Logger; use actix_web::{web, App, HttpServer}; @@ -19,10 +20,8 @@ mod context; mod drive_monitor; #[cfg(feature = "email")] mod email; - #[cfg(feature = "desktop")] mod ui; - mod file; mod kb; mod llm; @@ -65,7 +64,6 @@ use crate::whatsapp::WhatsAppAdapter; #[tokio::main] async fn main() -> std::io::Result<()> { let args: Vec = std::env::args().collect(); - if args.len() > 1 { let command = &args[1]; match command.as_str() { @@ -93,10 +91,8 @@ async fn main() -> std::io::Result<()> { dotenv().ok(); env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")) - .write_style(env_logger::WriteStyle::Always) - .init(); - - info!("Starting BotServer bootstrap process"); + .write_style(env_logger::WriteStyle::Always) + .init(); let install_mode = if args.contains(&"--container".to_string()) { InstallMode::Container @@ -113,19 +109,17 @@ async fn main() -> std::io::Result<()> { let mut bootstrap = BootstrapManager::new(install_mode.clone(), tenant.clone()); let cfg = match bootstrap.bootstrap() { Ok(config) => { - info!("Bootstrap completed successfully, configuration loaded from database"); + info!("Bootstrap completed successfully"); config } Err(e) => { log::error!("Bootstrap failed: {}", e); - info!("Attempting to load configuration from database"); match diesel::Connection::establish( &std::env::var("DATABASE_URL") .unwrap_or_else(|_| "postgres://gbuser:@localhost:5432/botserver".to_string()), ) { Ok(mut conn) => AppConfig::from_database(&mut conn), Err(_) => { - info!("Database not available, using environment variables as fallback"); AppConfig::from_env() } } @@ -133,15 +127,11 @@ async fn main() -> std::io::Result<()> { }; let _ = bootstrap.start_all(); - - // Upload template bots to MinIO on first startup if let Err(e) = bootstrap.upload_templates_to_minio(&cfg).await { log::warn!("Failed to upload templates to MinIO: {}", e); } let config = std::sync::Arc::new(cfg.clone()); - - info!("Establishing database connection to {}", cfg.database_url()); let db_pool = match diesel::Connection::establish(&cfg.database_url()) { Ok(conn) => Arc::new(Mutex::new(conn)), Err(e) => { @@ -154,8 +144,6 @@ async fn main() -> std::io::Result<()> { }; let db_custom_pool = db_pool.clone(); - - info!("Initializing LLM server at {}", cfg.ai.endpoint); ensure_llama_servers_running() .await .expect("Failed to initialize LLM local server"); @@ -176,7 +164,6 @@ async fn main() -> std::io::Result<()> { "empty".to_string(), Some(cfg.ai.endpoint.clone()), )); - let web_adapter = Arc::new(WebChannelAdapter::new()); let voice_adapter = Arc::new(VoiceAdapter::new( "https://livekit.example.com".to_string(), @@ -190,7 +177,6 @@ async fn main() -> std::io::Result<()> { )); let tool_api = Arc::new(tools::ToolApi::new()); - info!("Initializing drive at {}", cfg.minio.server); let drive = init_drive(&config.minio) .await .expect("Failed to initialize Drive"); @@ -199,13 +185,14 @@ async fn main() -> std::io::Result<()> { diesel::Connection::establish(&cfg.database_url()).unwrap(), redis_client.clone(), ))); + let auth_service = Arc::new(tokio::sync::Mutex::new(auth::AuthService::new( diesel::Connection::establish(&cfg.database_url()).unwrap(), redis_client.clone(), ))); let app_state = Arc::new(AppState { - s3_client: Some(drive.clone()), + s3_operator: Some(drive.clone()), config: Some(cfg.clone()), conn: db_pool.clone(), custom_conn: db_custom_pool.clone(), @@ -229,23 +216,17 @@ async fn main() -> std::io::Result<()> { tool_api: tool_api.clone(), }); - info!( - "Starting HTTP server on {}:{}", - config.server.host, config.server.port - ); - + info!("Starting HTTP server on {}:{}", config.server.host, config.server.port); let worker_count = std::thread::available_parallelism() .map(|n| n.get()) .unwrap_or(4); - // Spawn AutomationService in a LocalSet on a separate thread let automation_state = app_state.clone(); std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .expect("Failed to create runtime for automation"); - let local = tokio::task::LocalSet::new(); local.block_on(&rt, async move { let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string()); @@ -267,8 +248,8 @@ async fn main() -> std::io::Result<()> { .allow_any_method() .allow_any_header() .max_age(3600); - let app_state_clone = app_state.clone(); + let app_state_clone = app_state.clone(); let mut app = App::new() .wrap(cors) .wrap(Logger::default()) diff --git a/src/shared/state.rs b/src/shared/state.rs index 07eeaf8ef..fe39d97f3 100644 --- a/src/shared/state.rs +++ b/src/shared/state.rs @@ -6,28 +6,26 @@ use crate::session::SessionManager; use crate::tools::{ToolApi, ToolManager}; use crate::whatsapp::WhatsAppAdapter; use diesel::{Connection, PgConnection}; +use opendal::Operator; use redis::Client; use std::collections::HashMap; use std::sync::Arc; use std::sync::Mutex; use tokio::sync::mpsc; - use crate::shared::models::BotResponse; pub struct AppState { - pub s3_client: Option, + pub s3_operator: Option, pub config: Option, pub conn: Arc>, pub custom_conn: Arc>, pub redis_client: Option>, - pub session_manager: Arc>, pub tool_manager: Arc, pub llm_provider: Arc, pub auth_service: Arc>, pub channels: Arc>>>, pub response_channels: Arc>>>, - pub web_adapter: Arc, pub voice_adapter: Arc, pub whatsapp_adapter: Arc, @@ -37,7 +35,7 @@ pub struct AppState { impl Clone for AppState { fn clone(&self) -> Self { Self { - s3_client: self.s3_client.clone(), + s3_operator: self.s3_operator.clone(), config: self.config.clone(), conn: Arc::clone(&self.conn), custom_conn: Arc::clone(&self.custom_conn), @@ -59,7 +57,7 @@ impl Clone for AppState { impl Default for AppState { fn default() -> Self { Self { - s3_client: None, + s3_operator: None, config: None, conn: Arc::new(Mutex::new( diesel::PgConnection::establish("postgres://localhost/test").unwrap(),