- Migration to Apache OpenDAL.

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-10-28 14:00:52 -03:00
parent 46c4239fdb
commit f29a3c1259
9 changed files with 355 additions and 878 deletions
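The change replaces every aws-sdk-s3 call path (get_object, put_object, list_objects_v2, head_object) with a single opendal Operator. A minimal sketch of that pattern, not taken from this commit: the endpoint, bucket, and credentials below are placeholders, and the mutable-builder style matches the opendal usage in the diffs that follow.

use opendal::services::S3;
use opendal::Operator;

async fn build_operator() -> opendal::Result<Operator> {
    // Placeholder endpoint and credentials; the real values come from DriveConfig.
    let mut builder = S3::default();
    builder.root("/");
    builder.endpoint("http://localhost:9000");
    builder.bucket("org1-default.gbai");
    builder.access_key_id("minioadmin");
    builder.secret_access_key("minioadmin");
    Ok(Operator::new(builder)?.finish())
}

async fn read_object(op: &Operator, path: &str) -> opendal::Result<Vec<u8>> {
    // Replaces get_object().bucket(..).key(..).send() followed by body.collect().
    let data = op.read(path).await?;
    Ok(data.to_vec())
}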

View file

@ -331,8 +331,7 @@ impl AutomationService {
e
);
// Try to download from MinIO
if let Some(s3_client) = &self.state.s3_client {
if let Some(s3_operator) = &self.state.s3_operator {
let bucket_name = format!(
"{}{}.gbai",
env::var("MINIO_ORG_PREFIX").unwrap_or_else(|_| "org1_".to_string()),
@ -342,47 +341,26 @@ impl AutomationService {
trace!("Downloading from bucket={} key={}", bucket_name, s3_key);
match s3_client
.get_object()
.bucket(&bucket_name)
.key(&s3_key)
.send()
.await
{
Ok(response) => {
match response.body.collect().await {
Ok(data) => {
match String::from_utf8(data.into_bytes().to_vec()) {
Ok(content) => {
info!("Downloaded script '{}' from MinIO", param);
match s3_operator.read(&format!("{}/{}", bucket_name, s3_key)).await {
Ok(data) => {
let bytes: Vec<u8> = data.to_vec();
match String::from_utf8(bytes) {
Ok(content) => {
info!("Downloaded script '{}' from MinIO", param);
// Save to local cache
if let Err(e) =
std::fs::create_dir_all(&self.scripts_dir)
{
warn!("Failed to create scripts directory: {}", e);
} else if let Err(e) =
tokio::fs::write(&full_path, &content).await
{
warn!("Failed to cache script locally: {}", e);
} else {
trace!("Cached script to {}", full_path.display());
}
content
}
Err(e) => {
error!("Failed to decode script {}: {}", param, e);
self.cleanup_job_flag(&bot_id, param).await;
return;
}
// Save to local cache
if let Err(e) = std::fs::create_dir_all(&self.scripts_dir) {
warn!("Failed to create scripts directory: {}", e);
} else if let Err(e) = tokio::fs::write(&full_path, &content).await {
warn!("Failed to cache script locally: {}", e);
} else {
trace!("Cached script to {}", full_path.display());
}
content
}
Err(e) => {
error!(
"Failed to read script body from MinIO {}: {}",
param, e
);
error!("Failed to decode script {}: {}", param, e);
self.cleanup_job_flag(&bot_id, param).await;
return;
}

View file

@ -160,11 +160,11 @@ pub async fn get_from_bucket(
return Err("Invalid file path".into());
}
let s3_client = match &state.s3_client {
Some(client) => client,
let s3_operator = match &state.s3_operator {
Some(operator) => operator,
None => {
error!("S3 client not configured");
return Err("S3 client not configured".into());
error!("S3 operator not configured");
return Err("S3 operator not configured".into());
}
};
@ -189,19 +189,11 @@ pub async fn get_from_bucket(
bucket
};
match s3_client.head_bucket().bucket(&bucket_name).send().await {
Ok(_) => debug!("Bucket exists: {}", bucket_name),
Err(e) => {
error!("Bucket inaccessible: {} - {}", bucket_name, e);
return Err(format!("Bucket inaccessible: {}", e).into());
}
}
let get_object_future = s3_client
.get_object()
.bucket(&bucket_name)
.key(file_path)
.send();
let object_path = format!("{}/{}", bucket_name, file_path);
let get_object_future = s3_operator.read(&object_path);
let response = match tokio::time::timeout(Duration::from_secs(30), get_object_future).await {
Ok(Ok(response)) => response,
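Because the old and new call sites are interleaved above, the resulting read is easier to see in isolation. A sketch of the timeout-wrapped OpenDAL read, assuming the object key is joined as bucket_name/file_path the same way the automation service does in this commit:

use std::time::Duration;
use opendal::Operator;

async fn read_with_timeout(
    op: &Operator,
    bucket_name: &str,
    file_path: &str,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    let key = format!("{}/{}", bucket_name, file_path);
    // Keep the same 30-second bound the handler uses above.
    let data = tokio::time::timeout(Duration::from_secs(30), op.read(&key)).await??;
    Ok(data.to_vec())
}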

View file

@ -1,18 +1,19 @@
use crate::config::AppConfig;
use crate::package_manager::{ InstallMode, PackageManager };
use crate::package_manager::{InstallMode, PackageManager};
use anyhow::Result;
use csv;
use diesel::connection::SimpleConnection;
use diesel::Connection;
use dotenvy::dotenv;
use log::{ info, trace, error };
use aws_sdk_s3::Client as S3Client;
use csv;
use diesel::RunQueryDsl;
use dotenvy::dotenv;
use log::{error, info, trace};
use opendal::services::S3;
use opendal::{Operator, OperatorBuilder};
use rand::distr::Alphanumeric;
use sha2::{ Digest, Sha256 };
use sha2::{Digest, Sha256};
use std::io::{self, Write};
use std::path::Path;
use std::process::Command;
use std::io::{ self, Write };
pub struct ComponentInfo {
pub name: &'static str,
@ -26,10 +27,9 @@ pub struct BootstrapManager {
impl BootstrapManager {
pub fn new(install_mode: InstallMode, tenant: Option<String>) -> Self {
trace!(
info!(
"Initializing BootstrapManager with mode {:?} and tenant {:?}",
install_mode,
tenant
install_mode, tenant
);
Self {
install_mode,
@ -40,87 +40,135 @@ impl BootstrapManager {
pub fn start_all(&mut self) -> Result<()> {
let pm = PackageManager::new(self.install_mode.clone(), self.tenant.clone())?;
let components = vec![
ComponentInfo { name: "tables", termination_command: "pg_ctl" },
ComponentInfo { name: "cache", termination_command: "valkey-server" },
ComponentInfo { name: "drive", termination_command: "minio" },
ComponentInfo { name: "llm", termination_command: "llama-server" },
ComponentInfo { name: "email", termination_command: "stalwart" },
ComponentInfo { name: "proxy", termination_command: "caddy" },
ComponentInfo { name: "directory", termination_command: "zitadel" },
ComponentInfo { name: "alm", termination_command: "forgejo" },
ComponentInfo { name: "alm_ci", termination_command: "forgejo-runner" },
ComponentInfo { name: "dns", termination_command: "coredns" },
ComponentInfo { name: "webmail", termination_command: "php" },
ComponentInfo { name: "meeting", termination_command: "livekit-server" },
ComponentInfo { name: "table_editor", termination_command: "nocodb" },
ComponentInfo { name: "doc_editor", termination_command: "coolwsd" },
ComponentInfo { name: "desktop", termination_command: "xrdp" },
ComponentInfo { name: "devtools", termination_command: "" },
ComponentInfo { name: "bot", termination_command: "" },
ComponentInfo { name: "system", termination_command: "" },
ComponentInfo { name: "vector_db", termination_command: "qdrant" },
ComponentInfo { name: "host", termination_command: "" }
ComponentInfo {
name: "tables",
termination_command: "pg_ctl",
},
ComponentInfo {
name: "cache",
termination_command: "valkey-server",
},
ComponentInfo {
name: "drive",
termination_command: "minio",
},
ComponentInfo {
name: "llm",
termination_command: "llama-server",
},
ComponentInfo {
name: "email",
termination_command: "stalwart",
},
ComponentInfo {
name: "proxy",
termination_command: "caddy",
},
ComponentInfo {
name: "directory",
termination_command: "zitadel",
},
ComponentInfo {
name: "alm",
termination_command: "forgejo",
},
ComponentInfo {
name: "alm_ci",
termination_command: "forgejo-runner",
},
ComponentInfo {
name: "dns",
termination_command: "coredns",
},
ComponentInfo {
name: "webmail",
termination_command: "php",
},
ComponentInfo {
name: "meeting",
termination_command: "livekit-server",
},
ComponentInfo {
name: "table_editor",
termination_command: "nocodb",
},
ComponentInfo {
name: "doc_editor",
termination_command: "coolwsd",
},
ComponentInfo {
name: "desktop",
termination_command: "xrdp",
},
ComponentInfo {
name: "devtools",
termination_command: "",
},
ComponentInfo {
name: "bot",
termination_command: "",
},
ComponentInfo {
name: "system",
termination_command: "",
},
ComponentInfo {
name: "vector_db",
termination_command: "qdrant",
},
ComponentInfo {
name: "host",
termination_command: "",
},
];
for component in components {
if pm.is_installed(component.name) {
trace!("Starting component: {}", component.name);
pm.start(component.name)?;
} else {
trace!("Component {} not installed, skipping start", component.name);
if let Err(e) = self.update_bot_config(component.name) {
error!(
"Failed to update bot config after installing {}: {}",
component.name,
e
component.name, e
);
}
}
}
Ok(())
}
pub fn bootstrap(&mut self) -> Result<AppConfig> {
// Check for legacy mode - if TABLES_SERVER is present, skip bootstrap
if let Ok(tables_server) = std::env::var("TABLES_SERVER") {
if !tables_server.is_empty() {
trace!(
info!(
"Legacy mode detected (TABLES_SERVER present), skipping bootstrap installation"
);
info!("Running in legacy mode with existing database configuration");
// Try to connect to the database and load config
let database_url = std::env::var("DATABASE_URL").unwrap_or_else(|_| {
let username = std::env
::var("TABLES_USERNAME")
.unwrap_or_else(|_| "postgres".to_string());
let password = std::env
::var("TABLES_PASSWORD")
.unwrap_or_else(|_| "postgres".to_string());
let server = std::env
::var("TABLES_SERVER")
.unwrap_or_else(|_| "localhost".to_string());
let username =
std::env::var("TABLES_USERNAME").unwrap_or_else(|_| "postgres".to_string());
let password =
std::env::var("TABLES_PASSWORD").unwrap_or_else(|_| "postgres".to_string());
let server =
std::env::var("TABLES_SERVER").unwrap_or_else(|_| "localhost".to_string());
let port = std::env::var("TABLES_PORT").unwrap_or_else(|_| "5432".to_string());
let database = std::env
::var("TABLES_DATABASE")
.unwrap_or_else(|_| "gbserver".to_string());
format!("postgres://{}:{}@{}:{}/{}", username, password, server, port, database)
let database =
std::env::var("TABLES_DATABASE").unwrap_or_else(|_| "gbserver".to_string());
format!(
"postgres://{}:{}@{}:{}/{}",
username, password, server, port, database
)
});
match diesel::PgConnection::establish(&database_url) {
Ok(mut conn) => {
info!("Successfully connected to legacy database, loading configuration");
// Apply migrations
if let Err(e) = self.apply_migrations(&mut conn) {
log::warn!("Failed to apply migrations: {}", e);
}
return Ok(AppConfig::from_database(&mut conn));
}
Err(e) => {
log::warn!("Failed to connect to legacy database: {}", e);
info!("Using environment variables as fallback");
return Ok(AppConfig::from_env());
}
}
@ -133,16 +181,17 @@ impl BootstrapManager {
for component in required_components {
if !pm.is_installed(component) {
// Determine termination command from package manager component config
let termination_cmd = pm.components
let termination_cmd = pm
.components
.get(component)
.and_then(|cfg| cfg.binary_name.clone())
.unwrap_or_else(|| component.to_string());
// If a termination command is defined, check for leftover running process
if !termination_cmd.is_empty() {
let check = Command::new("pgrep").arg("-f").arg(&termination_cmd).output();
let check = Command::new("pgrep")
.arg("-f")
.arg(&termination_cmd)
.output();
if let Ok(output) = check {
if !output.stdout.is_empty() {
println!("Component '{}' appears to be already running from a previous install.", component);
@ -157,7 +206,10 @@ impl BootstrapManager {
.status();
println!("Terminated existing '{}' process.", component);
} else {
println!("Skipping start of '{}' as it is already running.", component);
println!(
"Skipping start of '{}' as it is already running.",
component
);
continue;
}
}
@ -167,29 +219,20 @@ impl BootstrapManager {
if component == "tables" {
let db_password = self.generate_secure_password(16);
let farm_password = self.generate_secure_password(32);
let env_contents = format!(
"FARM_PASSWORD={}\nDATABASE_URL=postgres://gbuser:{}@localhost:5432/botserver",
farm_password,
db_password
farm_password, db_password
);
std::fs
::write(".env", &env_contents)
std::fs::write(".env", &env_contents)
.map_err(|e| anyhow::anyhow!("Failed to write .env file: {}", e))?;
dotenv().ok();
trace!("Generated database credentials and wrote to .env file");
}
trace!("Installing required component: {}", component);
futures::executor::block_on(pm.install(component))?;
if component == "tables" {
trace!("Component {} installed successfully", component);
let database_url = std::env::var("DATABASE_URL").unwrap();
let mut conn = diesel::PgConnection
::establish(&database_url)
let mut conn = diesel::PgConnection::establish(&database_url)
.map_err(|e| anyhow::anyhow!("Failed to connect to database: {}", e))?;
let migration_dir = include_dir::include_dir!("./migrations");
@ -197,27 +240,21 @@ impl BootstrapManager {
.files()
.filter_map(|file| {
let path = file.path();
trace!("Found file: {:?}", path);
if path.extension()? == "sql" {
trace!(" -> SQL file included");
Some(file)
} else {
trace!(" -> Not a SQL file, skipping");
None
}
})
.collect();
trace!("Total migration files found: {}", migration_files.len());
migration_files.sort_by_key(|f| f.path());
for migration_file in migration_files {
let migration = migration_file
.contents_utf8()
.ok_or_else(|| anyhow::anyhow!("Migration file is not valid UTF-8"))?;
trace!("Executing migration: {}", migration_file.path().display());
// Use batch_execute to handle multiple statements including those with dollar-quoted strings
if let Err(e) = conn.batch_execute(migration) {
log::error!(
"Failed to execute migration {}: {}",
@ -226,14 +263,13 @@ impl BootstrapManager {
);
return Err(e.into());
}
trace!(
info!(
"Successfully executed migration: {}",
migration_file.path().display()
);
}
config = AppConfig::from_database(&mut conn);
info!("Database migrations completed and configuration loaded");
}
}
}
@ -242,12 +278,9 @@ impl BootstrapManager {
}
fn generate_secure_password(&self, length: usize) -> String {
// Ensure the Rng trait is in scope for `sample`
use rand::Rng;
let mut rng = rand::rng();
std::iter
::repeat_with(|| rng.sample(Alphanumeric) as char)
std::iter::repeat_with(|| rng.sample(Alphanumeric) as char)
.take(length)
.collect()
}
@ -259,175 +292,58 @@ impl BootstrapManager {
format!("{:x}", hasher.finalize())
}
/// Update the bot configuration after a component is installed.
/// This reads the existing `config.csv` from the default bot bucket,
/// fixes values based on the installed component, and
/// writes the updated CSV back to the bucket. It also upserts the
/// key/value pairs into the `bot_config` table.
fn update_bot_config(&self, component: &str) -> Result<()> {
// Determine bucket name: DRIVE_ORG_PREFIX + "default.gbai"
let org_prefix = std::env
::var("DRIVE_ORG_PREFIX")
.unwrap_or_else(|_| "pragmatismo-".to_string());
let bucket_name = format!("{}default.gbai", org_prefix);
let config_key = "default.gbot/config.csv";
// Build S3 client using default SDK config (compatible with S3Client)
let s3_client = S3Client::from_conf(aws_sdk_s3::Config::builder().build());
// Attempt to download existing config.csv
let existing_csv = match
futures::executor::block_on(
s3_client.get_object().bucket(&bucket_name).key(config_key).send()
)
{
Ok(resp) => {
let data = futures::executor::block_on(resp.body.collect())?;
String::from_utf8(data.into_bytes().to_vec()).unwrap_or_default()
}
Err(_) => String::new(), // No existing file, start fresh
};
// Parse CSV into a map
let mut config_map: std::collections::HashMap<
String,
String
> = std::collections::HashMap::new();
if !existing_csv.is_empty() {
let mut rdr = csv::ReaderBuilder
::new()
.has_headers(false)
.from_reader(existing_csv.as_bytes());
for result in rdr.records() {
if let Ok(record) = result {
if record.len() >= 2 {
config_map.insert(record[0].to_string(), record[1].to_string());
}
}
}
}
// Update configuration based on the installed component
config_map.insert(component.to_string(), "true".to_string());
// Serialize back to CSV
let mut wtr = csv::WriterBuilder
::new()
.has_headers(false)
.from_writer(vec![]);
for (k, v) in &config_map {
wtr.write_record(&[k, v])?;
}
wtr.flush()?;
let csv_bytes = wtr.into_inner()?;
// Upload updated CSV to S3
futures::executor::block_on(
s3_client
.put_object()
.bucket(&bucket_name)
.key(config_key)
.body(csv_bytes.clone().into())
.send()
)?;
// Upsert into bot_config table
let database_url = std::env
::var("DATABASE_URL")
let database_url = std::env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://gbuser:@localhost:5432/botserver".to_string());
let mut conn = diesel::pg::PgConnection::establish(&database_url)?;
for (k, v) in config_map {
diesel
::sql_query(
"INSERT INTO bot_config (key, value) VALUES ($1, $2) \
ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value"
)
.bind::<diesel::sql_types::Text, _>(&k)
.bind::<diesel::sql_types::Text, _>(&v)
.execute(&mut conn)?;
for (k, v) in vec![(component.to_string(), "true".to_string())] {
diesel::sql_query(
"INSERT INTO bot_config (key, value) VALUES ($1, $2) \
ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value",
)
.bind::<diesel::sql_types::Text, _>(&k)
.bind::<diesel::sql_types::Text, _>(&v)
.execute(&mut conn)?;
}
Ok(())
}
pub async fn upload_templates_to_minio(&self, config: &AppConfig) -> Result<()> {
use aws_sdk_s3::config::Credentials;
use aws_sdk_s3::config::Region;
info!("Uploading template bots to MinIO and creating bot entries...");
// First, create bot entries in database for each template
let database_url = std::env::var("DATABASE_URL").unwrap_or_else(|_| config.database_url());
let mut conn = diesel::PgConnection::establish(&database_url)?;
self.create_bots_from_templates(&mut conn)?;
let creds = Credentials::new(
&config.minio.access_key,
&config.minio.secret_key,
None,
None,
"minio"
);
let builder = S3::default();
builder
.root("/")
.endpoint(&config.minio.server)
.access_key_id(&config.minio.access_key)
.secret_access_key(&config.minio.secret_key);
let s3_config = aws_sdk_s3::Config
::builder()
.credentials_provider(creds)
.endpoint_url(&config.minio.server)
.region(Region::new("us-east-1"))
.force_path_style(true)
.behavior_version(aws_sdk_s3::config::BehaviorVersion::latest())
.build();
// if !config.minio.use_ssl {
// builder.disable_ssl_verification(true);
// }
let client = aws_sdk_s3::Client::from_conf(s3_config);
let client = Operator::new(builder)?.finish();
// Upload templates from templates/ directory
let templates_dir = Path::new("templates");
if !templates_dir.exists() {
trace!("Templates directory not found, skipping upload");
return Ok(());
}
// Walk through each .gbai folder in templates/
for entry in std::fs::read_dir(templates_dir)? {
let entry = entry?;
let path = entry.path();
if
path.is_dir() &&
path
.extension()
.map(|e| e == "gbai")
.unwrap_or(false)
{
if path.is_dir() && path.extension().map(|e| e == "gbai").unwrap_or(false) {
let bot_name = path.file_name().unwrap().to_string_lossy().to_string();
let bucket_name = format!("{}{}", config.minio.org_prefix, bot_name);
trace!("Creating bucket: {}", bucket_name);
// Create bucket if it doesn't exist
match client.create_bucket().bucket(&bucket_name).send().await {
Ok(_) => info!("Created bucket: {}", bucket_name),
Err(e) => {
let err_str = e.to_string();
if
err_str.contains("BucketAlreadyOwnedByYou") ||
err_str.contains("BucketAlreadyExists")
{
trace!("Bucket {} already exists", bucket_name);
} else {
log::warn!("Failed to create bucket {}: {}", bucket_name, e);
}
}
}
// Upload all files recursively
self.upload_directory_recursive(&client, &path, &bucket_name, "").await?;
info!("Uploaded template bot: {}", bot_name);
self.upload_directory_recursive(&client, &path, &bot_name, "")
.await?;
}
}
info!("Template bots uploaded successfully");
Ok(())
}
@ -435,31 +351,17 @@ impl BootstrapManager {
use crate::shared::models::schema::bots;
use diesel::prelude::*;
info!("Creating bot entries from template folders...");
let templates_dir = Path::new("templates");
if !templates_dir.exists() {
trace!("Templates directory not found, skipping bot creation");
return Ok(());
}
// Walk through each .gbai folder in templates/
for entry in std::fs::read_dir(templates_dir)? {
let entry = entry?;
let path = entry.path();
if
path.is_dir() &&
path
.extension()
.map(|e| e == "gbai")
.unwrap_or(false)
{
if path.is_dir() && path.extension().map(|e| e == "gbai").unwrap_or(false) {
let bot_folder = path.file_name().unwrap().to_string_lossy().to_string();
// Remove .gbai extension to get bot name
let bot_name = bot_folder.trim_end_matches(".gbai");
// Format the name nicely (capitalize first letter of each word)
let formatted_name = bot_name
.split('_')
.map(|word| {
@ -474,7 +376,6 @@ impl BootstrapManager {
.collect::<Vec<_>>()
.join(" ");
// Check if bot already exists
let existing: Option<String> = bots::table
.filter(bots::name.eq(&formatted_name))
.select(bots::name)
@ -482,39 +383,30 @@ impl BootstrapManager {
.optional()?;
if existing.is_none() {
// Insert new bot
diesel
::sql_query(
"INSERT INTO bots (id, name, description, llm_provider, llm_config, context_provider, context_config, is_active) \
diesel::sql_query(
"INSERT INTO bots (id, name, description, llm_provider, llm_config, context_provider, context_config, is_active) \
VALUES (gen_random_uuid(), $1, $2, 'openai', '{\"model\": \"gpt-4\", \"temperature\": 0.7}', 'database', '{}', true)"
)
.bind::<diesel::sql_types::Text, _>(&formatted_name)
.bind::<diesel::sql_types::Text, _>(
format!("Bot for {} template", bot_name)
)
.execute(conn)?;
info!("Created bot entry: {}", formatted_name);
)
.bind::<diesel::sql_types::Text, _>(&formatted_name)
.bind::<diesel::sql_types::Text, _>(format!("Bot for {} template", bot_name))
.execute(conn)?;
} else {
trace!("Bot already exists: {}", formatted_name);
trace!("Bot {} already exists", formatted_name);
}
}
}
info!("Bot creation from templates completed");
Ok(())
}
fn upload_directory_recursive<'a>(
&'a self,
client: &'a aws_sdk_s3::Client,
client: &'a Operator,
local_path: &'a Path,
bucket: &'a str,
prefix: &'a str
prefix: &'a str,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + 'a>> {
Box::pin(async move {
use aws_sdk_s3::primitives::ByteStream;
for entry in std::fs::read_dir(local_path)? {
let entry = entry?;
let path = entry.path();
@ -526,39 +418,30 @@ impl BootstrapManager {
};
if path.is_file() {
trace!(
info!(
"Uploading file: {} to bucket: {} with key: {}",
path.display(),
bucket,
key
);
let body = ByteStream::from_path(&path).await?;
client.put_object().bucket(bucket).key(&key).body(body).send().await?;
trace!("Uploaded: {}", key);
let content = std::fs::read(&path)?;
client.write(&key, content).await?;
} else if path.is_dir() {
self.upload_directory_recursive(client, &path, bucket, &key).await?;
self.upload_directory_recursive(client, &path, bucket, &key)
.await?;
}
}
Ok(())
})
}
fn apply_migrations(&self, conn: &mut diesel::PgConnection) -> Result<()> {
info!("Applying database migrations...");
let migrations_dir = std::path::Path::new("migrations");
if !migrations_dir.exists() {
trace!("No migrations directory found, skipping");
return Ok(());
}
// Get all .sql files sorted
let mut sql_files: Vec<_> = std::fs
::read_dir(migrations_dir)?
let mut sql_files: Vec<_> = std::fs::read_dir(migrations_dir)?
.filter_map(|entry| entry.ok())
.filter(|entry| {
entry
@ -575,26 +458,19 @@ impl BootstrapManager {
for entry in sql_files {
let path = entry.path();
let filename = path.file_name().unwrap().to_string_lossy();
trace!("Reading migration: {}", filename);
match std::fs::read_to_string(&path) {
Ok(sql) => {
trace!("Applying migration: {}", filename);
match conn.batch_execute(&sql) {
Ok(_) => info!("Applied migration: {}", filename),
Err(e) => {
// Ignore errors for already applied migrations
trace!("Migration {} result: {}", filename, e);
}
Ok(sql) => match conn.batch_execute(&sql) {
Err(e) => {
log::warn!("Migration {} failed: {}", filename, e);
}
}
_ => {}
},
Err(e) => {
log::warn!("Failed to read migration {}: {}", filename, e);
}
}
}
info!("Migrations check completed");
Ok(())
}
}
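The doc comment on update_bot_config describes a config.csv round-trip, but the new body keeps only the database upsert. A sketch of what that read/update/write could look like against the operator, assuming it is rooted at the default bot bucket; the helper name is hypothetical and not part of this commit.

use std::collections::HashMap;
use opendal::Operator;

async fn upsert_component_flag(op: &Operator, component: &str) -> anyhow::Result<()> {
    let config_key = "default.gbot/config.csv";
    // Read the existing CSV if present, otherwise start fresh.
    let existing = match op.read(config_key).await {
        Ok(data) => String::from_utf8(data.to_vec()).unwrap_or_default(),
        Err(_) => String::new(),
    };
    let mut config_map: HashMap<String, String> = HashMap::new();
    let mut rdr = csv::ReaderBuilder::new()
        .has_headers(false)
        .from_reader(existing.as_bytes());
    for record in rdr.records().flatten() {
        if record.len() >= 2 {
            config_map.insert(record[0].to_string(), record[1].to_string());
        }
    }
    config_map.insert(component.to_string(), "true".to_string());
    // Serialize the map and write the updated CSV back to the bucket.
    let mut wtr = csv::WriterBuilder::new().has_headers(false).from_writer(vec![]);
    for (k, v) in &config_map {
        wtr.write_record([k, v])?;
    }
    wtr.flush()?;
    op.write(config_key, wtr.into_inner()?).await?;
    Ok(())
}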

View file

@ -2,14 +2,13 @@ use crate::basic::compiler::BasicCompiler;
use crate::kb::embeddings;
use crate::kb::qdrant_client;
use crate::shared::state::AppState;
use aws_sdk_s3::Client as S3Client;
use log::{debug, error, info, warn};
use opendal::Operator;
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;
use tokio::time::{interval, Duration};
/// Tracks file state for change detection
#[derive(Debug, Clone)]
pub struct FileState {
pub path: String,
@ -18,7 +17,6 @@ pub struct FileState {
pub last_modified: Option<String>,
}
/// Drive monitor that watches for changes and triggers compilation/indexing
pub struct DriveMonitor {
state: Arc<AppState>,
bucket_name: String,
@ -34,18 +32,12 @@ impl DriveMonitor {
}
}
/// Start the drive monitoring service
pub fn spawn(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
info!(
"Drive Monitor service started for bucket: {}",
self.bucket_name
);
let mut tick = interval(Duration::from_secs(30)); // Check every 30 seconds
info!("Drive Monitor service started for bucket: {}", self.bucket_name);
let mut tick = interval(Duration::from_secs(30));
loop {
tick.tick().await;
if let Err(e) = self.check_for_changes().await {
error!("Error checking for drive changes: {}", e);
}
@ -53,101 +45,66 @@ impl DriveMonitor {
})
}
/// Check for file changes in the drive
async fn check_for_changes(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
let s3_client = match &self.state.s3_client {
Some(client) => client,
let op = match &self.state.s3_operator {
Some(op) => op,
None => {
debug!("S3 client not configured");
return Ok(());
}
};
// Check .gbdialog folder for BASIC tools
self.check_gbdialog_changes(s3_client).await?;
self.check_gbdialog_changes(op).await?;
self.check_gbkb_changes(op).await?;
// Check .gbkb folder for KB documents
self.check_gbkb_changes(s3_client).await?;
// Check for default bot configuration in the drive bucket
if let Err(e) = self.check_default_gbot(s3_client).await {
if let Err(e) = self.check_default_gbot(op).await {
error!("Error checking default bot config: {}", e);
}
Ok(())
}
/// Check .gbdialog folder for BASIC tool changes
async fn check_gbdialog_changes(
&self,
s3_client: &S3Client,
op: &Operator,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let prefix = ".gbdialog/";
debug!("Checking {} folder for changes", prefix);
let mut continuation_token: Option<String> = None;
let mut current_files = HashMap::new();
loop {
let mut list_request = s3_client
.list_objects_v2()
.bucket(&self.bucket_name)
.prefix(prefix);
let mut lister = op.lister_with(prefix).recursive(true).await?;
while let Some(entry) = lister.next().await {
let entry = entry?;
let path = entry.path().to_string();
if let Some(token) = continuation_token {
list_request = list_request.continuation_token(token);
if path.ends_with('/') || !path.ends_with(".bas") {
continue;
}
let list_result = list_request.send().await?;
if let Some(contents) = list_result.contents {
for object in contents {
if let Some(key) = object.key {
// Skip directories and non-.bas files
if key.ends_with('/') || !key.ends_with(".bas") {
continue;
}
let file_state = FileState {
path: key.clone(),
size: object.size.unwrap_or(0),
etag: object.e_tag.unwrap_or_default(),
last_modified: object.last_modified.map(|dt| dt.to_string()),
};
current_files.insert(key, file_state);
}
}
}
if list_result.is_truncated.unwrap_or(false) {
continuation_token = list_result.next_continuation_token;
} else {
break;
}
let meta = entry.metadata().await?;
let file_state = FileState {
path: path.clone(),
size: meta.content_length() as i64,
etag: meta.etag().unwrap_or_default().to_string(),
last_modified: meta.last_modified().map(|dt| dt.to_rfc3339()),
};
current_files.insert(path, file_state);
}
// Compare with previous state and handle changes
let mut file_states = self.file_states.write().await;
for (path, current_state) in current_files.iter() {
if let Some(previous_state) = file_states.get(path) {
// File exists, check if modified
if current_state.etag != previous_state.etag {
info!("BASIC tool modified: {}", path);
if let Err(e) = self.compile_tool(s3_client, path).await {
if let Err(e) = self.compile_tool(op, path).await {
error!("Failed to compile tool {}: {}", path, e);
}
}
} else {
// New file
info!("New BASIC tool detected: {}", path);
if let Err(e) = self.compile_tool(s3_client, path).await {
if let Err(e) = self.compile_tool(op, path).await {
error!("Failed to compile tool {}: {}", path, e);
}
}
}
// Check for deleted files
let previous_paths: Vec<String> = file_states
.keys()
.filter(|k| k.starts_with(prefix))
@ -156,13 +113,10 @@ impl DriveMonitor {
for path in previous_paths {
if !current_files.contains_key(&path) {
info!("BASIC tool deleted: {}", path);
// TODO: Mark tool as inactive in database
file_states.remove(&path);
}
}
// Update state with current files
for (path, state) in current_files {
file_states.insert(path, state);
}
@ -170,84 +124,53 @@ impl DriveMonitor {
Ok(())
}
/// Check .gbkb folder for KB document changes
async fn check_gbkb_changes(
&self,
s3_client: &S3Client,
op: &Operator,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let prefix = ".gbkb/";
debug!("Checking {} folder for changes", prefix);
let mut continuation_token: Option<String> = None;
let mut current_files = HashMap::new();
loop {
let mut list_request = s3_client
.list_objects_v2()
.bucket(&self.bucket_name)
.prefix(prefix);
let mut lister = op.lister_with(prefix).recursive(true).await?;
while let Some(entry) = lister.next().await {
let entry = entry?;
let path = entry.path().to_string();
if let Some(token) = continuation_token {
list_request = list_request.continuation_token(token);
if path.ends_with('/') {
continue;
}
let list_result = list_request.send().await?;
if let Some(contents) = list_result.contents {
for object in contents {
if let Some(key) = object.key {
// Skip directories
if key.ends_with('/') {
continue;
}
// Only process supported file types
let ext = key.rsplit('.').next().unwrap_or("").to_lowercase();
if !["pdf", "txt", "md", "docx"].contains(&ext.as_str()) {
continue;
}
let file_state = FileState {
path: key.clone(),
size: object.size.unwrap_or(0),
etag: object.e_tag.unwrap_or_default(),
last_modified: object.last_modified.map(|dt| dt.to_string()),
};
current_files.insert(key, file_state);
}
}
let ext = path.rsplit('.').next().unwrap_or("").to_lowercase();
if !["pdf", "txt", "md", "docx"].contains(&ext.as_str()) {
continue;
}
if list_result.is_truncated.unwrap_or(false) {
continuation_token = list_result.next_continuation_token;
} else {
break;
}
let meta = entry.metadata().await?;
let file_state = FileState {
path: path.clone(),
size: meta.content_length() as i64,
etag: meta.etag().unwrap_or_default().to_string(),
last_modified: meta.last_modified().map(|dt| dt.to_rfc3339()),
};
current_files.insert(path, file_state);
}
// Compare with previous state and handle changes
let mut file_states = self.file_states.write().await;
for (path, current_state) in current_files.iter() {
if let Some(previous_state) = file_states.get(path) {
// File exists, check if modified
if current_state.etag != previous_state.etag {
info!("KB document modified: {}", path);
if let Err(e) = self.index_document(s3_client, path).await {
if let Err(e) = self.index_document(op, path).await {
error!("Failed to index document {}: {}", path, e);
}
}
} else {
// New file
info!("New KB document detected: {}", path);
if let Err(e) = self.index_document(s3_client, path).await {
if let Err(e) = self.index_document(op, path).await {
error!("Failed to index document {}: {}", path, e);
}
}
}
// Check for deleted files
let previous_paths: Vec<String> = file_states
.keys()
.filter(|k| k.starts_with(prefix))
@ -256,13 +179,10 @@ impl DriveMonitor {
for path in previous_paths {
if !current_files.contains_key(&path) {
info!("KB document deleted: {}", path);
// TODO: Delete from Qdrant and mark in database
file_states.remove(&path);
}
}
// Update state with current files
for (path, state) in current_files {
file_states.insert(path, state);
}
@ -270,76 +190,36 @@ impl DriveMonitor {
Ok(())
}
/// Check for default bot configuration in the drive bucket
async fn check_default_gbot(
&self,
s3_client: &S3Client,
op: &Operator,
) -> Result<(), Box<dyn Error + Send + Sync>> {
// The default bot configuration is expected at:
// <bucket>/<DRIVE_ORG_PREFIX>default.gbai/default.gbot/config.csv
// Construct the expected key prefix
let prefix = format!("{}default.gbot/", self.bucket_name);
let config_key = format!("{}config.csv", prefix);
debug!("Checking for default bot config at key: {}", config_key);
// Attempt to get the object metadata to see if it exists
let head_req = s3_client
.head_object()
.bucket(&self.bucket_name)
.key(&config_key)
.send()
.await;
match head_req {
match op.stat(&config_key).await {
Ok(_) => {
info!("Default bot config found, downloading {}", config_key);
// Download the CSV file
let get_resp = s3_client
.get_object()
.bucket(&self.bucket_name)
.key(&config_key)
.send()
.await?;
let data = get_resp.body.collect().await?;
let csv_content = String::from_utf8(data.into_bytes().to_vec())
let content = op.read(&config_key).await?;
let csv_content = String::from_utf8(content.to_vec())
.map_err(|e| format!("UTF-8 error in config.csv: {}", e))?;
// Log the retrieved configuration (in a real implementation this would be parsed
// and used to populate the bot_config table, respecting overrides from .gbot files)
info!("Retrieved default bot config CSV:\n{}", csv_content);
// TODO: Parse CSV and upsert into bot_config table with appropriate precedence
debug!("Found config.csv: {} bytes", csv_content.len());
Ok(())
}
Err(e) => {
// If the object does not exist, simply ignore
debug!("Default bot config not present: {}", e);
debug!("Config file not found or inaccessible: {}", e);
Ok(())
}
}
}
/// Compile a BASIC tool file
async fn compile_tool(
&self,
s3_client: &S3Client,
op: &Operator,
file_path: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
info!("Compiling BASIC tool: {}", file_path);
let content = op.read(file_path).await?;
let source_content = String::from_utf8(content.to_vec())?;
// Download source from S3
let get_response = s3_client
.get_object()
.bucket(&self.bucket_name)
.key(file_path)
.send()
.await?;
let data = get_response.body.collect().await?;
let source_content = String::from_utf8(data.into_bytes().to_vec())?;
// Extract tool name
let tool_name = file_path
.strip_prefix(".gbdialog/")
.unwrap_or(file_path)
@ -347,10 +227,6 @@ impl DriveMonitor {
.unwrap_or(file_path)
.to_string();
// Calculate file hash for change detection
let _file_hash = format!("{:x}", source_content.len());
// Create work directory using bot from bucket name
let bot_name = self
.bucket_name
.strip_suffix(".gbai")
@ -358,46 +234,31 @@ impl DriveMonitor {
let work_dir = format!("./work/{}.gbai/.gbdialog", bot_name);
std::fs::create_dir_all(&work_dir)?;
// Write source to local file
let local_source_path = format!("{}/{}.bas", work_dir, tool_name);
std::fs::write(&local_source_path, &source_content)?;
// Compile using BasicCompiler
let compiler = BasicCompiler::new(Arc::clone(&self.state));
let result = compiler.compile_file(&local_source_path, &work_dir)?;
info!("Tool compiled successfully: {}", tool_name);
info!(" AST: {}", result.ast_path);
// Save to database
if let Some(mcp_tool) = result.mcp_tool {
info!(
" MCP tool definition generated with {} parameters",
"MCP tool definition generated with {} parameters",
mcp_tool.input_schema.properties.len()
);
}
if result.openai_tool.is_some() {
info!(" OpenAI tool definition generated");
debug!("OpenAI tool definition generated");
}
// TODO: Insert/update in basic_tools table
// INSERT INTO basic_tools (id, bot_id, tool_name, file_path, ast_path, file_hash,
// mcp_json, tool_json, compiled_at, is_active, created_at, updated_at)
// VALUES (...) ON CONFLICT (bot_id, tool_name) DO UPDATE SET ...
Ok(())
}
/// Index a KB document
async fn index_document(
&self,
s3_client: &S3Client,
op: &Operator,
file_path: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
info!("Indexing KB document: {}", file_path);
// Extract collection name from path (.gbkb/collection_name/file.pdf)
let parts: Vec<&str> = file_path.split('/').collect();
if parts.len() < 3 {
warn!("Invalid KB path structure: {}", file_path);
@ -405,21 +266,10 @@ impl DriveMonitor {
}
let collection_name = parts[1];
let content = op.read(file_path).await?;
let bytes = content.to_vec();
// Download file from S3
let get_response = s3_client
.get_object()
.bucket(&self.bucket_name)
.key(file_path)
.send()
.await?;
let data = get_response.body.collect().await?;
let bytes = data.into_bytes().to_vec();
// Extract text based on file type
let text_content = self.extract_text(file_path, &bytes)?;
if text_content.trim().is_empty() {
warn!("No text extracted from: {}", file_path);
return Ok(());
@ -431,35 +281,21 @@ impl DriveMonitor {
file_path
);
// Create Qdrant collection name
let qdrant_collection = format!("kb_default_{}", collection_name);
// Ensure collection exists
qdrant_client::ensure_collection_exists(&self.state, &qdrant_collection).await?;
// Index document
embeddings::index_document(&self.state, &qdrant_collection, file_path, &text_content)
.await?;
info!("Document indexed successfully: {}", file_path);
// TODO: Insert/update in kb_documents table
// INSERT INTO kb_documents (id, bot_id, user_id, collection_name, file_path, file_size,
// file_hash, first_published_at, last_modified_at, indexed_at,
// metadata, created_at, updated_at)
// VALUES (...) ON CONFLICT (...) DO UPDATE SET ...
Ok(())
}
/// Extract text from various file types
fn extract_text(
&self,
file_path: &str,
content: &[u8],
) -> Result<String, Box<dyn Error + Send + Sync>> {
let path_lower = file_path.to_ascii_lowercase();
if path_lower.ends_with(".pdf") {
match pdf_extract::extract_text_from_mem(content) {
Ok(text) => Ok(text),
@ -472,16 +308,13 @@ impl DriveMonitor {
String::from_utf8(content.to_vec())
.map_err(|e| format!("UTF-8 decoding failed: {}", e).into())
} else {
// Try as plain text
String::from_utf8(content.to_vec())
.map_err(|e| format!("Unsupported file format or UTF-8 error: {}", e).into())
}
}
/// Clear all tracked file states
pub async fn clear_state(&self) {
let mut states = self.file_states.write().await;
states.clear();
info!("Cleared all file states");
}
}
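The continuation-token pagination against list_objects_v2 is replaced by OpenDAL's lister in both check_gbdialog_changes and check_gbkb_changes above. A condensed sketch of that listing pattern, assuming futures::StreamExt is in scope for next() on the lister:

use futures::StreamExt;
use opendal::Operator;

async fn list_basic_tools(op: &Operator) -> opendal::Result<Vec<String>> {
    let mut paths = Vec::new();
    // recursive(true) walks nested folders, replacing continuation-token paging.
    let mut lister = op.lister_with(".gbdialog/").recursive(true).await?;
    while let Some(entry) = lister.next().await {
        let entry = entry?;
        let path = entry.path().to_string();
        // Skip directory placeholders and anything that is not a .bas tool.
        if path.ends_with('/') || !path.ends_with(".bas") {
            continue;
        }
        paths.push(path);
    }
    Ok(paths)
}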

View file

@ -1,11 +1,11 @@
use actix_multipart::Multipart;
use actix_web::web;
use actix_web::{post, HttpResponse};
use aws_sdk_s3::{Client, Error as S3Error};
use log::{error, info};
use opendal::Operator;
use std::io::Write;
use tempfile::NamedTempFile;
use tokio_stream::StreamExt as TokioStreamExt;
use crate::config::DriveConfig;
use crate::shared::state::AppState;
@ -16,15 +16,11 @@ pub async fn upload_file(
state: web::Data<AppState>,
) -> Result<HttpResponse, actix_web::Error> {
let folder_path = folder_path.into_inner();
// Create a temporary file that will hold the uploaded data
let mut temp_file = NamedTempFile::new().map_err(|e| {
actix_web::error::ErrorInternalServerError(format!("Failed to create temp file: {}", e))
})?;
let mut file_name: Option<String> = None;
// Process multipart form data
while let Some(mut field) = payload.try_next().await? {
if let Some(disposition) = field.content_disposition() {
if let Some(name) = disposition.get_filename() {
@ -32,7 +28,6 @@ pub async fn upload_file(
}
}
// Write each chunk of the field to the temporary file
while let Some(chunk) = field.try_next().await? {
temp_file.write_all(&chunk).map_err(|e| {
actix_web::error::ErrorInternalServerError(format!(
@ -43,44 +38,24 @@ pub async fn upload_file(
}
}
// Use a fallback name if the client didn't supply one
let file_name = file_name.unwrap_or_else(|| "unnamed_file".to_string());
// Convert the NamedTempFile into a TempPath so we can get a stable path
let temp_file_path = temp_file.into_temp_path();
// Retrieve the bucket name from configuration, handling the case where it is missing
let bucket_name = match &state.get_ref().config {
Some(cfg) => cfg.s3_bucket.clone(),
None => {
// Clean up the temp file before returning the error
let _ = std::fs::remove_file(&temp_file_path);
return Err(actix_web::error::ErrorInternalServerError(
"S3 bucket configuration is missing",
));
}
};
// Build the S3 object key (folder + filename)
let s3_key = format!("{}/{}", folder_path, file_name);
// Retrieve a reference to the S3 client, handling the case where it is missing
let s3_client = state.get_ref().s3_client.as_ref().ok_or_else(|| {
actix_web::error::ErrorInternalServerError("S3 client is not initialized")
let op = state.get_ref().s3_operator.as_ref().ok_or_else(|| {
actix_web::error::ErrorInternalServerError("S3 operator is not initialized")
})?;
// Perform the upload
match upload_to_s3(s3_client, &bucket_name, &s3_key, &temp_file_path).await {
let s3_key = format!("{}/{}", folder_path, file_name);
match upload_to_s3(op, &s3_key, &temp_file_path).await {
Ok(_) => {
// Remove the temporary file now that the upload succeeded
let _ = std::fs::remove_file(&temp_file_path);
Ok(HttpResponse::Ok().body(format!(
"Uploaded file '{}' to folder '{}' in S3 bucket '{}'",
file_name, folder_path, bucket_name
"Uploaded file '{}' to folder '{}'",
file_name, folder_path
)))
}
Err(e) => {
// Ensure the temporary file is cleaned up even on failure
let _ = std::fs::remove_file(&temp_file_path);
Err(actix_web::error::ErrorInternalServerError(format!(
"Failed to upload file to S3: {}",
@ -90,61 +65,34 @@ pub async fn upload_file(
}
}
// Helper function to get S3 client
pub async fn init_drive(cfg: &DriveConfig) -> Result<Client, Box<dyn std::error::Error>> {
// Build static credentials from the Drive configuration.
let credentials = aws_sdk_s3::config::Credentials::new(
cfg.access_key.clone(),
cfg.secret_key.clone(),
None,
None,
"static",
);
pub async fn init_drive(cfg: &DriveConfig) -> Result<Operator, Box<dyn std::error::Error>> {
use opendal::services::S3;
use opendal::Operator;
// Construct the endpoint URL, respecting the SSL flag.
let scheme = if cfg.use_ssl { "https" } else { "http" };
let endpoint = format!("{}://{}", scheme, cfg.server);
let mut builder = S3::default();
// MinIO requires path-style addressing.
let s3_config = aws_sdk_s3::config::Builder::new()
// Set the behavior version to the latest to satisfy the SDK requirement.
.behavior_version(aws_sdk_s3::config::BehaviorVersion::latest())
.region(aws_sdk_s3::config::Region::new("us-east-1"))
.endpoint_url(endpoint)
.credentials_provider(credentials)
.force_path_style(true)
.build();
builder.root("/");
builder.endpoint(&cfg.server);
builder.access_key_id(&cfg.access_key);
builder.secret_access_key(&cfg.secret_key);
Ok(Client::from_conf(s3_config))
if cfg.server.contains("minio") || cfg.server.contains("localhost") {
builder.enable_virtual_host_style();
}
let op = Operator::new(builder)?.finish();
info!("OpenDAL S3 operator initialized for bucket: {}", cfg.bucket);
Ok(op)
}
// Helper function to upload file to S3
async fn upload_to_s3(
client: &Client,
bucket: &str,
op: &Operator,
key: &str,
file_path: &std::path::Path,
) -> Result<(), S3Error> {
// Convert the file at `file_path` into a ByteStream, mapping any I/O error
// into the appropriate `SdkError` type expected by the function signature.
let body = aws_sdk_s3::primitives::ByteStream::from_path(file_path)
.await
.map_err(|e| {
aws_sdk_s3::error::SdkError::<
aws_sdk_s3::operation::put_object::PutObjectError,
aws_sdk_s3::primitives::ByteStream,
>::construction_failure(e)
})?;
// Perform the actual upload to S3.
client
.put_object()
.bucket(bucket)
.key(key)
.body(body)
.send()
.await
.map(|_| ())?; // Convert the successful output to `()`.
) -> Result<(), opendal::Error> {
let data = std::fs::read(file_path)?;
op.write(key, data).await?;
Ok(())
}
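A sketch of an async variant of the upload helper, reading the temp file with tokio::fs and letting anyhow handle the error conversions (anyhow is already a dependency of this crate); the key and path come from the caller exactly as above:

use opendal::Operator;

async fn upload_to_drive(
    op: &Operator,
    key: &str,
    file_path: &std::path::Path,
) -> anyhow::Result<()> {
    // Read the uploaded temp file without blocking the async executor.
    let data = tokio::fs::read(file_path).await?;
    // Operator::write creates or overwrites the object at `key`.
    op.write(key, data).await?;
    Ok(())
}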

View file

@ -1,12 +1,11 @@
use crate::shared::state::AppState;
use aws_sdk_s3::Client as S3Client;
use log::{debug, error, info};
use opendal::Operator;
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;
use tokio::time::{interval, Duration};
/// MinIO file state tracker
#[derive(Debug, Clone)]
pub struct FileState {
pub path: String,
@ -15,52 +14,41 @@ pub struct FileState {
pub last_modified: Option<String>,
}
/// MinIO handler that monitors bucket changes
pub struct MinIOHandler {
state: Arc<AppState>,
bucket_name: String,
watched_prefixes: Arc<tokio::sync::RwLock<Vec<String>>>,
file_states: Arc<tokio::sync::RwLock<HashMap<String, FileState>>>,
}
impl MinIOHandler {
pub fn new(state: Arc<AppState>, bucket_name: String) -> Self {
pub fn new(state: Arc<AppState>) -> Self {
Self {
state,
bucket_name,
watched_prefixes: Arc::new(tokio::sync::RwLock::new(Vec::new())),
file_states: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
}
}
/// Add a prefix to watch (e.g., ".gbkb/", ".gbdialog/")
pub async fn watch_prefix(&self, prefix: String) {
let mut prefixes = self.watched_prefixes.write().await;
if !prefixes.contains(&prefix) {
prefixes.push(prefix.clone());
info!("Now watching MinIO prefix: {}", prefix);
}
}
/// Remove a prefix from watch list
pub async fn unwatch_prefix(&self, prefix: &str) {
let mut prefixes = self.watched_prefixes.write().await;
prefixes.retain(|p| p != prefix);
info!("Stopped watching MinIO prefix: {}", prefix);
}
/// Start the monitoring service
pub fn spawn(
self: Arc<Self>,
change_callback: Arc<dyn Fn(FileChangeEvent) + Send + Sync>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
info!("MinIO Handler service started");
let mut tick = interval(Duration::from_secs(15)); // Check every 15 seconds
let mut tick = interval(Duration::from_secs(15));
loop {
tick.tick().await;
if let Err(e) = self.check_for_changes(&change_callback).await {
error!("Error checking for MinIO changes: {}", e);
}
@ -68,93 +56,59 @@ impl MinIOHandler {
})
}
/// Check for file changes in watched prefixes
async fn check_for_changes(
&self,
callback: &Arc<dyn Fn(FileChangeEvent) + Send + Sync>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let s3_client = match &self.state.s3_client {
Some(client) => client,
let op = match &self.state.s3_operator {
Some(op) => op,
None => {
debug!("S3 client not configured");
return Ok(());
}
};
let prefixes = self.watched_prefixes.read().await;
for prefix in prefixes.iter() {
debug!("Checking prefix: {}", prefix);
if let Err(e) = self.check_prefix_changes(s3_client, prefix, callback).await {
if let Err(e) = self.check_prefix_changes(op, prefix, callback).await {
error!("Error checking prefix {}: {}", prefix, e);
}
}
Ok(())
}
/// Check changes in a specific prefix
async fn check_prefix_changes(
&self,
s3_client: &S3Client,
op: &Operator,
prefix: &str,
callback: &Arc<dyn Fn(FileChangeEvent) + Send + Sync>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
// List all objects with the prefix
let mut continuation_token: Option<String> = None;
let mut current_files = HashMap::new();
loop {
let mut list_request = s3_client
.list_objects_v2()
.bucket(&self.bucket_name)
.prefix(prefix);
let mut lister = op.lister_with(prefix).recursive(true).await?;
while let Some(entry) = lister.next().await {
let entry = entry?;
let path = entry.path().to_string();
if let Some(token) = continuation_token {
list_request = list_request.continuation_token(token);
if path.ends_with('/') {
continue;
}
let list_result = list_request.send().await?;
if let Some(contents) = list_result.contents {
for object in contents {
if let Some(key) = object.key {
// Skip directories
if key.ends_with('/') {
continue;
}
let file_state = FileState {
path: key.clone(),
size: object.size.unwrap_or(0),
etag: object.e_tag.unwrap_or_default(),
last_modified: object.last_modified.map(|dt| dt.to_string()),
};
current_files.insert(key, file_state);
}
}
}
if list_result.is_truncated.unwrap_or(false) {
continuation_token = list_result.next_continuation_token;
} else {
break;
}
let meta = entry.metadata().await?;
let file_state = FileState {
path: path.clone(),
size: meta.content_length() as i64,
etag: meta.etag().unwrap_or_default().to_string(),
last_modified: meta.last_modified().map(|dt| dt.to_rfc3339()),
};
current_files.insert(path, file_state);
}
// Compare with previous state
let mut file_states = self.file_states.write().await;
// Check for new or modified files
for (path, current_state) in current_files.iter() {
if let Some(previous_state) = file_states.get(path) {
// File exists, check if modified
if current_state.etag != previous_state.etag
|| current_state.size != previous_state.size
{
info!("File modified: {}", path);
callback(FileChangeEvent::Modified {
path: path.clone(),
size: current_state.size,
@ -162,8 +116,6 @@ impl MinIOHandler {
});
}
} else {
// New file
info!("File created: {}", path);
callback(FileChangeEvent::Created {
path: path.clone(),
size: current_state.size,
@ -172,7 +124,6 @@ impl MinIOHandler {
}
}
// Check for deleted files
let previous_paths: Vec<String> = file_states
.keys()
.filter(|k| k.starts_with(prefix))
@ -181,13 +132,11 @@ impl MinIOHandler {
for path in previous_paths {
if !current_files.contains_key(&path) {
info!("File deleted: {}", path);
callback(FileChangeEvent::Deleted { path: path.clone() });
file_states.remove(&path);
}
}
// Update state with current files
for (path, state) in current_files {
file_states.insert(path, state);
}
@ -195,20 +144,16 @@ impl MinIOHandler {
Ok(())
}
/// Get current state of a file
pub async fn get_file_state(&self, path: &str) -> Option<FileState> {
let states = self.file_states.read().await;
states.get(path).cloned()
}
/// Clear all tracked file states
pub async fn clear_state(&self) {
let mut states = self.file_states.write().await;
states.clear();
info!("Cleared all file states");
}
/// Get all tracked files for a prefix
pub async fn get_files_by_prefix(&self, prefix: &str) -> Vec<FileState> {
let states = self.file_states.read().await;
states
@ -219,7 +164,6 @@ impl MinIOHandler {
}
}
/// File change event types
#[derive(Debug, Clone)]
pub enum FileChangeEvent {
Created {
@ -266,7 +210,6 @@ mod tests {
size: 100,
etag: "abc123".to_string(),
};
assert_eq!(event.path(), "test.txt");
assert_eq!(event.event_type(), "created");
}
@ -286,7 +229,6 @@ mod tests {
let deleted = FileChangeEvent::Deleted {
path: "file3.txt".to_string(),
};
assert_eq!(created.event_type(), "created");
assert_eq!(modified.event_type(), "modified");
assert_eq!(deleted.event_type(), "deleted");
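For context, a sketch of how the handler could be wired from this module, assuming an AppState that already carries the s3_operator; the function name and watched prefixes are illustrative only, not part of this commit.

use std::sync::Arc;
use crate::shared::state::AppState;

async fn start_minio_watch(state: Arc<AppState>) -> tokio::task::JoinHandle<()> {
    let handler = Arc::new(MinIOHandler::new(state));
    handler.watch_prefix(".gbkb/".to_string()).await;
    handler.watch_prefix(".gbdialog/".to_string()).await;
    // The callback receives Created / Modified / Deleted events on each poll cycle.
    handler.spawn(Arc::new(|event| {
        log::info!("drive change detected: {:?}", event);
    }))
}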

View file

@ -1,6 +1,7 @@
use crate::shared::models::KBCollection;
use crate::shared::state::AppState;
use log::{debug, error, info, warn};
use opendal::Operator;
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;
@ -10,7 +11,6 @@ pub mod embeddings;
pub mod minio_handler;
pub mod qdrant_client;
/// Represents a change in a KB file
#[derive(Debug, Clone)]
pub enum FileChangeEvent {
Created(String),
@ -18,7 +18,6 @@ pub enum FileChangeEvent {
Deleted(String),
}
/// KB Manager service that coordinates MinIO monitoring and Qdrant indexing
pub struct KBManager {
state: Arc<AppState>,
watched_collections: Arc<tokio::sync::RwLock<HashMap<String, KBCollection>>>,
@ -32,7 +31,6 @@ impl KBManager {
}
}
/// Start watching a KB collection folder
pub async fn add_collection(
&self,
bot_id: String,
@ -47,7 +45,6 @@ impl KBManager {
collection_name, qdrant_collection
);
// Create Qdrant collection if it doesn't exist
qdrant_client::ensure_collection_exists(&self.state, &qdrant_collection).await?;
let now = chrono::Utc::now().to_rfc3339();
@ -67,30 +64,23 @@ impl KBManager {
let mut collections = self.watched_collections.write().await;
collections.insert(collection_name.to_string(), collection);
info!("KB collection added successfully: {}", collection_name);
Ok(())
}
/// Remove a KB collection
pub async fn remove_collection(
&self,
collection_name: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut collections = self.watched_collections.write().await;
collections.remove(collection_name);
info!("KB collection removed: {}", collection_name);
Ok(())
}
/// Start the KB monitoring service
pub fn spawn(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
info!("KB Manager service started");
let mut tick = interval(Duration::from_secs(30));
loop {
tick.tick().await;
let collections = self.watched_collections.read().await;
for (name, collection) in collections.iter() {
if let Err(e) = self.check_collection_updates(collection).await {
@ -101,67 +91,44 @@ impl KBManager {
})
}
/// Check for updates in a collection
async fn check_collection_updates(
&self,
collection: &KBCollection,
) -> Result<(), Box<dyn Error + Send + Sync>> {
debug!("Checking updates for collection: {}", collection.name);
let s3_client = match &self.state.s3_client {
Some(client) => client,
let op = match &self.state.s3_operator {
Some(op) => op,
None => {
warn!("S3 client not configured");
warn!("S3 operator not configured");
return Ok(());
}
};
let config = match &self.state.config {
Some(cfg) => cfg,
None => {
error!("App configuration missing");
return Err("App configuration missing".into());
let mut lister = op.lister_with(&collection.folder_path).recursive(true).await?;
while let Some(entry) = lister.next().await {
let entry = entry?;
let path = entry.path().to_string();
if path.ends_with('/') {
continue;
}
};
let bucket_name = format!("{}default.gbai", config.minio.org_prefix);
// List objects in the collection folder
let list_result = s3_client
.list_objects_v2()
.bucket(&bucket_name)
.prefix(&collection.folder_path)
.send()
.await?;
if let Some(contents) = list_result.contents {
for object in contents {
if let Some(key) = object.key {
// Skip directories
if key.ends_with('/') {
continue;
}
// Check if file needs indexing
if let Err(e) = self
.process_file(
&collection,
&key,
object.size.unwrap_or(0),
object.last_modified.map(|dt| dt.to_string()),
)
.await
{
error!("Error processing file {}: {}", key, e);
}
}
let meta = entry.metadata().await?;
if let Err(e) = self
.process_file(
&collection,
&path,
meta.content_length() as i64,
meta.last_modified().map(|dt| dt.to_rfc3339()),
)
.await
{
error!("Error processing file {}: {}", path, e);
}
}
Ok(())
}
/// Process a single file (check if changed and index if needed)
async fn process_file(
&self,
collection: &KBCollection,
@ -169,9 +136,7 @@ impl KBManager {
file_size: i64,
_last_modified: Option<String>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
// Get file content hash
let content = self.get_file_content(file_path).await?;
// Simple hash using length and first/last bytes for change detection
let file_hash = if content.len() > 100 {
format!(
"{:x}_{:x}_{}",
@ -183,24 +148,16 @@ impl KBManager {
format!("{:x}", content.len())
};
// Check if file is already indexed with same hash
if self
.is_file_indexed(collection.bot_id.clone(), file_path, &file_hash)
.await?
{
debug!("File already indexed: {}", file_path);
return Ok(());
}
info!(
"Indexing file: {} to collection {}",
file_path, collection.name
);
// Extract text based on file type
info!("Indexing file: {} to collection {}", file_path, collection.name);
let text_content = self.extract_text(file_path, &content).await?;
// Generate embeddings and store in Qdrant
embeddings::index_document(
&self.state,
&collection.qdrant_collection,
@ -209,7 +166,6 @@ impl KBManager {
)
.await?;
// Save metadata to database
let metadata = serde_json::json!({
"file_type": self.get_file_type(file_path),
"last_modified": _last_modified,
@ -225,48 +181,29 @@ impl KBManager {
)
.await?;
info!("File indexed successfully: {}", file_path);
Ok(())
}
/// Get file content from MinIO
async fn get_file_content(
&self,
file_path: &str,
) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
let s3_client = self
let op = self
.state
.s3_client
.s3_operator
.as_ref()
.ok_or("S3 client not configured")?;
.ok_or("S3 operator not configured")?;
let config = self
.state
.config
.as_ref()
.ok_or("App configuration missing")?;
let bucket_name = format!("{}default.gbai", config.minio.org_prefix);
let response = s3_client
.get_object()
.bucket(&bucket_name)
.key(file_path)
.send()
.await?;
let data = response.body.collect().await?;
Ok(data.into_bytes().to_vec())
let content = op.read(file_path).await?;
Ok(content.to_vec())
}
/// Extract text from various file types
async fn extract_text(
&self,
file_path: &str,
content: &[u8],
) -> Result<String, Box<dyn Error + Send + Sync>> {
let path_lower = file_path.to_ascii_lowercase();
if path_lower.ends_with(".pdf") {
match pdf_extract::extract_text_from_mem(content) {
Ok(text) => Ok(text),
@ -279,29 +216,23 @@ impl KBManager {
String::from_utf8(content.to_vec())
.map_err(|e| format!("UTF-8 decoding failed: {}", e).into())
} else if path_lower.ends_with(".docx") {
// TODO: Add DOCX support
warn!("DOCX format not yet supported: {}", file_path);
Err("DOCX format not supported".into())
} else {
// Try as plain text
String::from_utf8(content.to_vec())
.map_err(|e| format!("Unsupported file format or UTF-8 error: {}", e).into())
}
}
/// Check if file is already indexed
async fn is_file_indexed(
&self,
_bot_id: String,
_file_path: &str,
_file_hash: &str,
) -> Result<bool, Box<dyn Error + Send + Sync>> {
// TODO: Query database to check if file with same hash exists
// For now, return false to always reindex
Ok(false)
}
/// Save document metadata to database
async fn save_document_metadata(
&self,
_bot_id: String,
@ -311,7 +242,6 @@ impl KBManager {
file_hash: &str,
_metadata: serde_json::Value,
) -> Result<(), Box<dyn Error + Send + Sync>> {
// TODO: Save to database using Diesel
info!(
"Saving metadata for {}: size={}, hash={}",
file_path, file_size, file_hash
@ -319,7 +249,6 @@ impl KBManager {
Ok(())
}
/// Get file type from path
fn get_file_type(&self, file_path: &str) -> String {
file_path
.rsplit('.')

View file

@ -1,5 +1,6 @@
#![allow(dead_code)]
#![cfg_attr(feature = "desktop", windows_subsystem = "windows")]
use actix_cors::Cors;
use actix_web::middleware::Logger;
use actix_web::{web, App, HttpServer};
@ -19,10 +20,8 @@ mod context;
mod drive_monitor;
#[cfg(feature = "email")]
mod email;
#[cfg(feature = "desktop")]
mod ui;
mod file;
mod kb;
mod llm;
@ -65,7 +64,6 @@ use crate::whatsapp::WhatsAppAdapter;
#[tokio::main]
async fn main() -> std::io::Result<()> {
let args: Vec<String> = std::env::args().collect();
if args.len() > 1 {
let command = &args[1];
match command.as_str() {
@ -93,10 +91,8 @@ async fn main() -> std::io::Result<()> {
dotenv().ok();
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
.write_style(env_logger::WriteStyle::Always)
.init();
info!("Starting BotServer bootstrap process");
.write_style(env_logger::WriteStyle::Always)
.init();
let install_mode = if args.contains(&"--container".to_string()) {
InstallMode::Container
@ -113,19 +109,17 @@ async fn main() -> std::io::Result<()> {
let mut bootstrap = BootstrapManager::new(install_mode.clone(), tenant.clone());
let cfg = match bootstrap.bootstrap() {
Ok(config) => {
info!("Bootstrap completed successfully, configuration loaded from database");
info!("Bootstrap completed successfully");
config
}
Err(e) => {
log::error!("Bootstrap failed: {}", e);
info!("Attempting to load configuration from database");
match diesel::Connection::establish(
&std::env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://gbuser:@localhost:5432/botserver".to_string()),
) {
Ok(mut conn) => AppConfig::from_database(&mut conn),
Err(_) => {
info!("Database not available, using environment variables as fallback");
AppConfig::from_env()
}
}
@ -133,15 +127,11 @@ async fn main() -> std::io::Result<()> {
};
let _ = bootstrap.start_all();
// Upload template bots to MinIO on first startup
if let Err(e) = bootstrap.upload_templates_to_minio(&cfg).await {
log::warn!("Failed to upload templates to MinIO: {}", e);
}
let config = std::sync::Arc::new(cfg.clone());
info!("Establishing database connection to {}", cfg.database_url());
let db_pool = match diesel::Connection::establish(&cfg.database_url()) {
Ok(conn) => Arc::new(Mutex::new(conn)),
Err(e) => {
@ -154,8 +144,6 @@ async fn main() -> std::io::Result<()> {
};
let db_custom_pool = db_pool.clone();
info!("Initializing LLM server at {}", cfg.ai.endpoint);
ensure_llama_servers_running()
.await
.expect("Failed to initialize LLM local server");
@ -176,7 +164,6 @@ async fn main() -> std::io::Result<()> {
"empty".to_string(),
Some(cfg.ai.endpoint.clone()),
));
let web_adapter = Arc::new(WebChannelAdapter::new());
let voice_adapter = Arc::new(VoiceAdapter::new(
"https://livekit.example.com".to_string(),
@ -190,7 +177,6 @@ async fn main() -> std::io::Result<()> {
));
let tool_api = Arc::new(tools::ToolApi::new());
info!("Initializing drive at {}", cfg.minio.server);
let drive = init_drive(&config.minio)
.await
.expect("Failed to initialize Drive");
@ -199,13 +185,14 @@ async fn main() -> std::io::Result<()> {
diesel::Connection::establish(&cfg.database_url()).unwrap(),
redis_client.clone(),
)));
let auth_service = Arc::new(tokio::sync::Mutex::new(auth::AuthService::new(
diesel::Connection::establish(&cfg.database_url()).unwrap(),
redis_client.clone(),
)));
let app_state = Arc::new(AppState {
s3_client: Some(drive.clone()),
s3_operator: Some(drive.clone()),
config: Some(cfg.clone()),
conn: db_pool.clone(),
custom_conn: db_custom_pool.clone(),
@ -229,23 +216,17 @@ async fn main() -> std::io::Result<()> {
tool_api: tool_api.clone(),
});
info!(
"Starting HTTP server on {}:{}",
config.server.host, config.server.port
);
info!("Starting HTTP server on {}:{}", config.server.host, config.server.port);
let worker_count = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4);
// Spawn AutomationService in a LocalSet on a separate thread
let automation_state = app_state.clone();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create runtime for automation");
let local = tokio::task::LocalSet::new();
local.block_on(&rt, async move {
let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string());
@ -267,8 +248,8 @@ async fn main() -> std::io::Result<()> {
.allow_any_method()
.allow_any_header()
.max_age(3600);
let app_state_clone = app_state.clone();
let app_state_clone = app_state.clone();
let mut app = App::new()
.wrap(cors)
.wrap(Logger::default())

View file

@ -6,28 +6,26 @@ use crate::session::SessionManager;
use crate::tools::{ToolApi, ToolManager};
use crate::whatsapp::WhatsAppAdapter;
use diesel::{Connection, PgConnection};
use opendal::Operator;
use redis::Client;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::sync::mpsc;
use crate::shared::models::BotResponse;
pub struct AppState {
pub s3_client: Option<aws_sdk_s3::Client>,
pub s3_operator: Option<Operator>,
pub config: Option<AppConfig>,
pub conn: Arc<Mutex<PgConnection>>,
pub custom_conn: Arc<Mutex<PgConnection>>,
pub redis_client: Option<Arc<Client>>,
pub session_manager: Arc<tokio::sync::Mutex<SessionManager>>,
pub tool_manager: Arc<ToolManager>,
pub llm_provider: Arc<dyn LLMProvider>,
pub auth_service: Arc<tokio::sync::Mutex<AuthService>>,
pub channels: Arc<Mutex<HashMap<String, Arc<dyn ChannelAdapter>>>>,
pub response_channels: Arc<tokio::sync::Mutex<HashMap<String, mpsc::Sender<BotResponse>>>>,
pub web_adapter: Arc<WebChannelAdapter>,
pub voice_adapter: Arc<VoiceAdapter>,
pub whatsapp_adapter: Arc<WhatsAppAdapter>,
@ -37,7 +35,7 @@ pub struct AppState {
impl Clone for AppState {
fn clone(&self) -> Self {
Self {
s3_client: self.s3_client.clone(),
s3_operator: self.s3_operator.clone(),
config: self.config.clone(),
conn: Arc::clone(&self.conn),
custom_conn: Arc::clone(&self.custom_conn),
@ -59,7 +57,7 @@ impl Clone for AppState {
impl Default for AppState {
fn default() -> Self {
Self {
s3_client: None,
s3_operator: None,
config: None,
conn: Arc::new(Mutex::new(
diesel::PgConnection::establish("postgres://localhost/test").unwrap(),