Merge branch 'main' into main

This commit is contained in:
Rodrigo Rodriguez 2025-10-31 16:05:14 -03:00 committed by GitHub
commit ea765811a2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 2003 additions and 802 deletions

6
.vscode/launch.json vendored
View file

@ -16,6 +16,9 @@
}
},
"args": [],
"env": {
"RUST_LOG": "debug"
},
"cwd": "${workspaceFolder}"
},
{
@ -30,6 +33,9 @@
}
},
"args": [],
"env": {
"RUST_LOG": "trace"
},
"cwd": "${workspaceFolder}"
}
]

1073
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -53,6 +53,8 @@ anyhow = "1.0"
argon2 = "0.5"
async-stream = "0.3"
async-trait = "0.1"
aws-config = "1.8.8"
aws-sdk-s3 = { version = "1.109.0", features = ["behavior-version-latest"] }
base64 = "0.22"
bytes = "1.8"
chrono = { version = "0.4", features = ["serde"] }
@ -64,6 +66,7 @@ env_logger = "0.11"
futures = "0.3"
futures-util = "0.3"
headless_chrome = { version = "1.0.18", optional = true }
hmac = "0.12.1"
imap = { version = "3.0.0-alpha.15", optional = true }
include_dir = "0.7"
indicatif = "0.18.0"
@ -71,9 +74,9 @@ lettre = { version = "0.11", features = ["smtp-transport", "builder", "tokio1",
livekit = "0.7"
log = "0.4"
mailparse = "0.15"
mockito = "1.7.0"
native-tls = "0.2"
num-format = "0.4"
opendal = { version = "0.54.1", features = ["services-s3"] }
pdf-extract = "0.10.0"
qdrant-client = { version = "1.12", optional = true }
rand = "0.9.2"

View file

@ -22,9 +22,9 @@ dirs=(
# "auth"
# "automation"
# "basic"
# "bot"
"bot"
"bootstrap"
"package_manager"
#"package_manager"
# "channels"
# "config"
# "context"

View file

@ -183,31 +183,6 @@ BEGIN
END IF;
END $$;
-- ============================================================================
-- DEFAULT SERVER CONFIGURATION
-- Insert default values that replace .env
-- ============================================================================
INSERT INTO server_configuration (id, config_key, config_value, config_type, description) VALUES
(gen_random_uuid()::text, 'SERVER_HOST', '127.0.0.1', 'string', 'Server bind address'),
(gen_random_uuid()::text, 'SERVER_PORT', '8080', 'integer', 'Server port'),
(gen_random_uuid()::text, 'TABLES_SERVER', 'localhost', 'string', 'PostgreSQL server address'),
(gen_random_uuid()::text, 'TABLES_PORT', '5432', 'integer', 'PostgreSQL port'),
(gen_random_uuid()::text, 'TABLES_DATABASE', 'botserver', 'string', 'PostgreSQL database name'),
(gen_random_uuid()::text, 'TABLES_USERNAME', 'botserver', 'string', 'PostgreSQL username'),
(gen_random_uuid()::text, 'DRIVE_SERVER', 'localhost:9000', 'string', 'MinIO server address'),
(gen_random_uuid()::text, 'DRIVE_USE_SSL', 'false', 'boolean', 'Use SSL for drive'),
(gen_random_uuid()::text, 'DRIVE_ORG_PREFIX', 'botserver', 'string', 'Drive organization prefix'),
(gen_random_uuid()::text, 'DRIVE_BUCKET', 'default', 'string', 'Default S3 bucket'),
(gen_random_uuid()::text, 'VECTORDB_URL', 'http://localhost:6333', 'string', 'Qdrant vector database URL'),
(gen_random_uuid()::text, 'CACHE_URL', 'redis://localhost:6379', 'string', 'Redis cache URL'),
(gen_random_uuid()::text, 'STACK_PATH', './botserver-stack', 'string', 'Base path for all components'),
(gen_random_uuid()::text, 'SITES_ROOT', './botserver-stack/sites', 'string', 'Root path for sites')
ON CONFLICT (config_key) DO NOTHING;
-- ============================================================================
-- DEFAULT TENANT
-- Create default tenant for single-tenant installations
-- ============================================================================
INSERT INTO tenants (id, name, slug, is_active) VALUES
(gen_random_uuid(), 'Default Tenant', 'default', true)
ON CONFLICT (slug) DO NOTHING;

View file

@ -1,11 +0,0 @@
-- Migration 6.0.6: Add LLM configuration defaults
-- Description: Configure local LLM server settings with model paths
-- Insert LLM configuration with defaults
INSERT INTO server_configuration (id, config_key, config_value, config_type, description) VALUES
(gen_random_uuid()::text, 'LLM_LOCAL', 'true', 'boolean', 'Enable local LLM server'),
(gen_random_uuid()::text, 'LLM_MODEL_PATH', 'botserver-stack/data/llm/DeepSeek-R1-Distill-Qwen-1.5B-Q3_K_M.gguf', 'string', 'Path to LLM model file'),
(gen_random_uuid()::text, 'LLM_URL', 'http://localhost:8081', 'string', 'Local LLM server URL'),
(gen_random_uuid()::text, 'EMBEDDING_MODEL_PATH', 'botserver-stack/data/llm/DeepSeek-R1-Distill-Qwen-1.5B-Q3_K_M.gguf', 'string', 'Path to embedding model file'),
(gen_random_uuid()::text, 'EMBEDDING_URL', 'http://localhost:8082', 'string', 'Embedding server URL')
ON CONFLICT (config_key) DO NOTHING;

View file

@ -1,11 +1,11 @@
use actix_web::{web, HttpResponse, Result};
use actix_web::{HttpRequest, HttpResponse, Result, web};
use argon2::{
password_hash::{rand_core::OsRng, PasswordHash, PasswordHasher, PasswordVerifier, SaltString},
Argon2,
};
use diesel::pg::PgConnection;
use diesel::prelude::*;
use log::{error, warn};
use log::{error};
use redis::Client;
use std::collections::HashMap;
use std::sync::Arc;
@ -148,6 +148,7 @@ impl AuthService {
#[actix_web::get("/api/auth")]
async fn auth_handler(
req: HttpRequest,
data: web::Data<AppState>,
web::Query(params): web::Query<HashMap<String, String>>,
) -> Result<HttpResponse> {
@ -166,45 +167,10 @@ async fn auth_handler(
}
};
let bot_id = if let Ok(bot_guid) = std::env::var("BOT_GUID") {
match Uuid::parse_str(&bot_guid) {
Ok(uuid) => uuid,
Err(e) => {
warn!("Invalid BOT_GUID from env: {}", e);
return Ok(HttpResponse::BadRequest()
.json(serde_json::json!({"error": "Invalid BOT_GUID"})));
}
}
} else {
// BOT_GUID not set, get first available bot from database
use crate::shared::models::schema::bots::dsl::*;
use diesel::prelude::*;
let mut db_conn = data.conn.lock().unwrap();
match bots
.filter(is_active.eq(true))
.select(id)
.first::<Uuid>(&mut *db_conn)
.optional()
{
Ok(Some(first_bot_id)) => {
log::info!(
"BOT_GUID not set, using first available bot: {}",
first_bot_id
);
first_bot_id
}
Ok(None) => {
error!("No active bots found in database");
return Ok(HttpResponse::ServiceUnavailable()
.json(serde_json::json!({"error": "No bots available"})));
}
Err(e) => {
error!("Failed to query bots: {}", e);
return Ok(HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Failed to query bots"})));
}
}
let mut db_conn = data.conn.lock().unwrap();
let (bot_id, bot_name) = match crate::bot::bot_from_url(&mut *db_conn, req.path()) {
Ok((id, name)) => (id, name),
Err(res) => return Ok(res),
};
let session = {
@ -224,35 +190,40 @@ async fn auth_handler(
}
};
let session_id_clone = session.id.clone();
let auth_script_path = "./templates/annoucements.gbai/annoucements.gbdialog/auth.bas";
let auth_script = match std::fs::read_to_string(auth_script_path) {
Ok(content) => content,
Err(_) => r#"SET_USER "00000000-0000-0000-0000-000000000001""#.to_string(),
};
let script_service = crate::basic::ScriptService::new(Arc::clone(&data), session.clone());
match script_service
.compile(&auth_script)
.and_then(|ast| script_service.run(&ast))
{
Ok(result) => {
if result.to_string() == "false" {
error!("Auth script returned false, authentication failed");
return Ok(HttpResponse::Unauthorized()
.json(serde_json::json!({"error": "Authentication failed"})));
let auth_script_path = format!("./work/{}.gbai/{}.gbdialog/auth.ast", bot_name, bot_name);
if std::path::Path::new(&auth_script_path).exists() {
let auth_script = match std::fs::read_to_string(&auth_script_path) {
Ok(content) => content,
Err(e) => {
error!("Failed to read auth script: {}", e);
return Ok(HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Failed to read auth script"})));
}
};
let script_service = crate::basic::ScriptService::new(Arc::clone(&data), session.clone());
match script_service
.compile(&auth_script)
.and_then(|ast| script_service.run(&ast))
{
Ok(result) => {
if result.to_string() == "false" {
error!("Auth script returned false, authentication failed");
return Ok(HttpResponse::Unauthorized()
.json(serde_json::json!({"error": "Authentication failed"})));
}
}
Err(e) => {
error!("Failed to run auth script: {}", e);
return Ok(HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Auth failed"})));
}
}
Err(e) => {
error!("Failed to run auth script: {}", e);
return Ok(HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Auth failed"})));
}
}
let session = {
let mut sm = data.session_manager.lock().await;
match sm.get_session_by_id(session_id_clone) {
match sm.get_session_by_id(session.id) {
Ok(Some(s)) => s,
Ok(None) => {
error!("Failed to retrieve session");

View file

@ -331,7 +331,7 @@ impl AutomationService {
e
);
if let Some(s3_operator) = &self.state.s3_operator {
if let Some(client) = &self.state.s3_client {
let bucket_name = format!(
"{}{}.gbai",
env::var("MINIO_ORG_PREFIX").unwrap_or_else(|_| "org1_".to_string()),
@ -341,10 +341,9 @@ impl AutomationService {
trace!("Downloading from bucket={} key={}", bucket_name, s3_key);
match s3_operator.read(&format!("{}/{}", bucket_name, s3_key)).await {
match crate::kb::minio_handler::get_file_content(client, &bucket_name, &s3_key).await {
Ok(data) => {
let bytes: Vec<u8> = data.to_vec();
match String::from_utf8(bytes) {
match String::from_utf8(data) {
Ok(content) => {
info!("Downloaded script '{}' from MinIO", param);

View file

@ -2,6 +2,7 @@ use crate::shared::models::UserSession;
use crate::shared::state::AppState;
use log::{debug, error, info};
use reqwest::{self, Client};
use crate::kb::minio_handler;
use rhai::{Dynamic, Engine};
use std::error::Error;
use std::path::Path;
@ -158,13 +159,7 @@ pub async fn get_from_bucket(
return Err("Invalid file path".into());
}
let s3_operator = match &state.s3_operator {
Some(operator) => operator,
None => {
error!("S3 operator not configured");
return Err("S3 operator not configured".into());
}
};
let client = state.s3_client.as_ref().ok_or("S3 client not configured")?;
let bucket_name = {
let cfg = state
@ -187,11 +182,11 @@ pub async fn get_from_bucket(
bucket
};
let response = match tokio::time::timeout(
Duration::from_secs(30),
s3_operator.read(&format!("{}/{}", bucket_name, file_path))
let bytes = match tokio::time::timeout(
Duration::from_secs(30),
minio_handler::get_file_content(client, &bucket_name, file_path)
).await {
Ok(Ok(response)) => response,
Ok(Ok(data)) => data,
Ok(Err(e)) => {
error!("S3 read failed: {}", e);
return Err(format!("S3 operation failed: {}", e).into());
@ -202,15 +197,7 @@ pub async fn get_from_bucket(
}
};
let bytes = response.to_vec();
debug!(
"Retrieved {} bytes from S3 for key: {}",
bytes.len(),
file_path
);
let content = if file_path.to_ascii_lowercase().ends_with(".pdf") {
debug!("Processing as PDF file: {}", file_path);
match pdf_extract::extract_text_from_mem(&bytes) {
Ok(text) => text,
Err(e) => {

View file

@ -1,20 +1,25 @@
use crate::config::AppConfig;
use crate::package_manager::{InstallMode, PackageManager};
use anyhow::Result;
use diesel::connection::SimpleConnection;
use diesel::RunQueryDsl;
use diesel::{Connection, QueryableByName};
use diesel::{connection::SimpleConnection, RunQueryDsl, Connection, QueryableByName};
use dotenvy::dotenv;
use log::{debug, error, info, trace};
use opendal::Operator;
use aws_sdk_s3::Client;
use aws_config::BehaviorVersion;
use rand::distr::Alphanumeric;
use rand::Rng;
use sha2::{Digest, Sha256};
use std::io::{self, Write};
use std::path::Path;
use std::process::Command;
use std::sync::{Arc, Mutex};
use diesel::Queryable;
#[derive(QueryableByName)]
#[diesel(check_for_backend(diesel::pg::Pg))]
#[derive(Queryable)]
#[diesel(table_name = crate::shared::models::schema::bots)]
struct BotIdRow {
#[diesel(sql_type = diesel::sql_types::Uuid)]
id: uuid::Uuid,
@ -28,21 +33,21 @@ pub struct ComponentInfo {
pub struct BootstrapManager {
pub install_mode: InstallMode,
pub tenant: Option<String>,
pub s3_operator: Operator,
pub s3_client: Client,
}
impl BootstrapManager {
pub fn new(install_mode: InstallMode, tenant: Option<String>) -> Self {
pub async fn new(install_mode: InstallMode, tenant: Option<String>) -> Self {
info!(
"Initializing BootstrapManager with mode {:?} and tenant {:?}",
install_mode, tenant
);
let config = AppConfig::from_env();
let s3_operator = Self::create_s3_operator(&config);
let s3_client = futures::executor::block_on(Self::create_s3_operator(&config));
Self {
install_mode,
tenant,
s3_operator,
s3_client,
}
}
@ -140,8 +145,8 @@ impl BootstrapManager {
let mut conn = diesel::pg::PgConnection::establish(&database_url)
.map_err(|e| anyhow::anyhow!("Failed to connect to database: {}", e))?;
let default_bot_id: uuid::Uuid = diesel::sql_query("SELECT id FROM bots LIMIT 1")
.get_result::<BotIdRow>(&mut conn)
.map(|row| row.id)
.load::<BotIdRow>(&mut conn)
.map(|rows| rows.first().map(|r| r.id).unwrap_or_else(|| uuid::Uuid::new_v4()))
.unwrap_or_else(|_| uuid::Uuid::new_v4());
if let Err(e) = self.update_bot_config(&default_bot_id, component.name) {
@ -156,7 +161,8 @@ impl BootstrapManager {
Ok(())
}
pub fn bootstrap(&mut self) -> Result<AppConfig> {
pub async fn bootstrap(&mut self) -> Result<AppConfig> {
// First check for legacy mode
if let Ok(tables_server) = std::env::var("TABLES_SERVER") {
if !tables_server.is_empty() {
info!(
@ -164,7 +170,7 @@ impl BootstrapManager {
);
let database_url = std::env::var("DATABASE_URL").unwrap_or_else(|_| {
let username =
std::env::var("TABLES_USERNAME").unwrap_or_else(|_| "postgres".to_string());
std::env::var("TABLES_USERNAME").unwrap_or_else(|_| "gbuser".to_string());
let password =
std::env::var("TABLES_PASSWORD").unwrap_or_else(|_| "postgres".to_string());
let server =
@ -178,6 +184,11 @@ impl BootstrapManager {
)
});
// In legacy mode, still try to load config.csv if available
if let Ok(config) = self.load_config_from_csv().await {
return Ok(config);
}
match diesel::PgConnection::establish(&database_url) {
Ok(mut conn) => {
if let Err(e) = self.apply_migrations(&mut conn) {
@ -292,47 +303,50 @@ impl BootstrapManager {
}
}
self.s3_operator = Self::create_s3_operator(&config);
let default_bucket_path = Path::new("templates/default.gbai/default.gbot/config.csv");
if default_bucket_path.exists() {
info!("Found initial config.csv, uploading to default.gbai/default.gbot");
let operator = &self.s3_operator;
futures::executor::block_on(async {
let content = std::fs::read(default_bucket_path).expect("Failed to read config.csv");
operator.write("default.gbai/default.gbot/config.csv", content).await
})?;
debug!("Initial config.csv uploaded successfully");
self.s3_client = futures::executor::block_on(Self::create_s3_operator(&config));
// Load config from CSV if available
if let Ok(csv_config) = self.load_config_from_csv().await {
Ok(csv_config)
} else {
Ok(config)
}
Ok(config)
}
fn create_s3_operator(config: &AppConfig) -> Operator {
use opendal::Scheme;
use std::collections::HashMap;
let mut endpoint = config.drive.server.clone();
if !endpoint.ends_with('/') {
endpoint.push('/');
}
let mut map = HashMap::new();
map.insert("endpoint".to_string(), endpoint);
map.insert("access_key_id".to_string(), config.drive.access_key.clone());
map.insert(
"secret_access_key".to_string(),
config.drive.secret_key.clone(),
);
map.insert(
"bucket".to_string(),
format!("default.gbai"),
);
map.insert("region".to_string(), "auto".to_string());
map.insert("force_path_style".to_string(), "true".to_string());
trace!("Creating S3 operator with endpoint {}", config.drive.server);
async fn create_s3_operator(config: &AppConfig) -> Client {
let endpoint = if !config.drive.server.ends_with('/') {
format!("{}/", config.drive.server)
} else {
config.drive.server.clone()
};
Operator::via_iter(Scheme::S3, map).expect("Failed to initialize S3 operator")
let base_config = aws_config::defaults(BehaviorVersion::latest())
.endpoint_url(endpoint)
.region("auto")
.credentials_provider(
aws_sdk_s3::config::Credentials::new(
config.drive.access_key.clone(),
config.drive.secret_key.clone(),
None,
None,
"static",
)
)
.load()
.await;
let s3_config = aws_sdk_s3::config::Builder::from(&base_config)
.force_path_style(true)
.build();
aws_sdk_s3::Client::from_conf(s3_config)
}
fn generate_secure_password(&self, length: usize) -> String {
let mut rng = rand::rng();
std::iter::repeat_with(|| rng.sample(Alphanumeric) as char)
@ -381,7 +395,7 @@ impl BootstrapManager {
if !templates_dir.exists() {
return Ok(());
}
let operator = &self.s3_operator;
let client = &self.s3_client;
for entry in std::fs::read_dir(templates_dir)? {
let entry = entry?;
let path = entry.path();
@ -395,27 +409,30 @@ impl BootstrapManager {
let bot_name = path.file_name().unwrap().to_string_lossy().to_string();
let bucket = bot_name.trim_start_matches('/').to_string();
info!("Uploading template {} to Drive bucket {}", bot_name, bucket);
if operator.stat(&bucket).await.is_err() {
info!("Bucket {} not found, creating it", bucket);
let bucket_path = if bucket.ends_with('/') { bucket.clone() } else { format!("{}/", bucket) };
match operator.create_dir(&bucket_path).await {
Ok(_) => {
debug!("Bucket {} created successfully", bucket);
}
Err(e) => {
let err_msg = format!("{}", e);
if err_msg.contains("BucketAlreadyOwnedByYou") {
log::warn!("Bucket {} already exists, reusing default.gbai", bucket);
self.upload_directory_recursive(&operator, &Path::new("templates/default.gbai"), "default.gbai").await?;
continue;
} else {
return Err(e.into());
}
}
}
}
self.upload_directory_recursive(&operator, &path, &bucket).await?;
info!("Uploaded template {} to Drive bucket {}", bot_name, bucket);
// Check if bucket exists
if client.head_bucket().bucket(&bucket).send().await.is_err() {
info!("Bucket {} not found, creating it", bucket);
match client.create_bucket()
.bucket(&bucket)
.send()
.await {
Ok(_) => {
debug!("Bucket {} created successfully", bucket);
}
Err(e) => {
error!("Failed to create bucket {}: {:?}", bucket, e);
return Err(anyhow::anyhow!(
"Failed to create bucket {}: {}. Check S3 credentials and endpoint configuration",
bucket, e
));
}
}
}
self.upload_directory_recursive(client, &path, &bucket, "/")
.await?;
info!("Uploaded template {} to Drive bucket {}", bot_name, bucket);
}
}
Ok(())
@ -436,22 +453,9 @@ info!("Uploaded template {} to Drive bucket {}", bot_name, bucket);
if path.is_dir() && path.extension().map(|e| e == "gbai").unwrap_or(false) {
let bot_folder = path.file_name().unwrap().to_string_lossy().to_string();
let bot_name = bot_folder.trim_end_matches(".gbai");
let formatted_name = bot_name
.split('_')
.map(|word| {
let mut chars = word.chars();
match chars.next() {
None => String::new(),
Some(first) => {
first.to_uppercase().collect::<String>() + chars.as_str()
}
}
})
.collect::<Vec<_>>()
.join(" ");
let existing: Option<String> = bots::table
.filter(bots::name.eq(&formatted_name))
.filter(bots::name.eq(&bot_name))
.select(bots::name)
.first(conn)
.optional()?;
@ -461,11 +465,11 @@ info!("Uploaded template {} to Drive bucket {}", bot_name, bucket);
"INSERT INTO bots (id, name, description, llm_provider, llm_config, context_provider, context_config, is_active) \
VALUES (gen_random_uuid(), $1, $2, 'openai', '{\"model\": \"gpt-4\", \"temperature\": 0.7}', 'database', '{}', true)"
)
.bind::<diesel::sql_types::Text, _>(&formatted_name)
.bind::<diesel::sql_types::Text, _>(&bot_name)
.bind::<diesel::sql_types::Text, _>(format!("Bot for {} template", bot_name))
.execute(conn)?;
} else {
log::trace!("Bot {} already exists", formatted_name);
log::trace!("Bot {} already exists", bot_name);
}
}
}
@ -475,8 +479,9 @@ info!("Uploaded template {} to Drive bucket {}", bot_name, bucket);
fn upload_directory_recursive<'a>(
&'a self,
client: &'a Operator,
client: &'a Client,
local_path: &'a Path,
bucket: &'a str,
prefix: &'a str,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + 'a>> {
Box::pin(async move {
@ -490,26 +495,79 @@ info!("Uploaded template {} to Drive bucket {}", bot_name, bucket);
let entry = entry?;
let path = entry.path();
let file_name = path.file_name().unwrap().to_string_lossy().to_string();
let key = if prefix.is_empty() {
file_name.clone()
} else {
format!("{}/{}", prefix.trim_end_matches('/'), file_name)
};
// Construct key path, ensuring no duplicate slashes
let mut key = prefix.trim_matches('/').to_string();
if !key.is_empty() {
key.push('/');
}
key.push_str(&file_name);
if path.is_file() {
info!("Uploading file: {} with key: {}", path.display(), key);
info!("Uploading file: {} to bucket {} with key: {}",
path.display(), bucket, key);
let content = std::fs::read(&path)?;
trace!("Writing file {} with key {}", path.display(), key);
client.write(&key, content).await?;
trace!("Successfully wrote file {}", path.display());
client.put_object()
.bucket(bucket)
.key(&key)
.body(content.into())
.send()
.await?;
} else if path.is_dir() {
self.upload_directory_recursive(client, &path, &key).await?;
self.upload_directory_recursive(client, &path, bucket, &key).await?;
}
}
Ok(())
})
}
/// Loads application config from `default.gbai/default.gbot/config.csv`
/// stored in S3/MinIO, syncing its values into the database and then
/// rebuilding `AppConfig` from the database.
///
/// Flow: fetch object -> decode UTF-8 -> write to a temp file ->
/// `ConfigManager::sync_gbot_config` (against the nil-UUID default bot)
/// -> `AppConfig::from_database` on a fresh connection.
///
/// Returns `Err` if the object is missing (logged at debug level), or on
/// any DB/UTF-8/filesystem failure.
async fn load_config_from_csv(&self) -> Result<AppConfig> {
    use crate::config::ConfigManager;
    use uuid::Uuid;
    let client = &self.s3_client;
    let bucket = "default.gbai";
    let config_key = "default.gbot/config.csv";
    match client.get_object()
        .bucket(bucket)
        .key(config_key)
        .send()
        .await
    {
        Ok(response) => {
            // Collect the streamed body into memory; config.csv is small.
            let bytes = response.body.collect().await?.into_bytes();
            let csv_content = String::from_utf8(bytes.to_vec())?;
            let database_url = std::env::var("DATABASE_URL")
                .unwrap_or_else(|_| "postgres://gbuser:@localhost:5432/botserver".to_string());
            // Create new connection for config loading (ConfigManager
            // takes ownership of it inside an Arc<Mutex<_>>).
            let config_conn = diesel::PgConnection::establish(&database_url)?;
            let config_manager = ConfigManager::new(Arc::new(Mutex::new(config_conn)));
            // Use the nil UUID as the default bot id for config rows.
            // NOTE(review): assumes a bot with this id exists or that
            // sync_gbot_config tolerates it — confirm.
            let default_bot_id = Uuid::parse_str("00000000-0000-0000-0000-000000000000")?;
            // Write CSV to temp file because ConfigManager reads from a
            // path. NOTE(review): fixed filename in temp dir — concurrent
            // loads would clobber each other; the file is not cleaned up.
            let temp_path = std::env::temp_dir().join("config.csv");
            std::fs::write(&temp_path, csv_content)?;
            config_manager.sync_gbot_config(&default_bot_id, temp_path.to_str().unwrap())
                .map_err(|e| anyhow::anyhow!("Failed to sync gbot config: {}", e))?;
            // Second, fresh connection: the first one is owned by
            // config_manager. Load config from the now-updated database.
            let mut config_conn = diesel::PgConnection::establish(&database_url)?;
            let config = AppConfig::from_database(&mut config_conn);
            info!("Successfully loaded config from CSV");
            Ok(config)
        }
        Err(e) => {
            // Missing config.csv is an expected condition; callers fall
            // back to env/DB config.
            debug!("No config.csv found: {}", e);
            Err(e.into())
        }
    }
}
fn apply_migrations(&self, conn: &mut diesel::PgConnection) -> Result<()> {
let migrations_dir = std::path::Path::new("migrations");
if !migrations_dir.exists() {

View file

@ -3,6 +3,7 @@ use crate::shared::models::{BotResponse, UserMessage, UserSession};
use crate::shared::state::AppState;
use actix_web::{web, HttpRequest, HttpResponse, Result};
use actix_ws::Message as WsMessage;
use diesel::PgConnection;
use log::{debug, error, info, warn};
use chrono::Utc;
use serde_json;
@ -11,18 +12,140 @@ use std::sync::Arc;
use tokio::sync::mpsc;
use crate::kb::embeddings::generate_embeddings;
use uuid::Uuid;
use crate::kb::qdrant_client::{ensure_collection_exists, get_qdrant_client, QdrantPoint};
use crate::context::langcache::{get_langcache_client};
use crate::context::langcache::get_langcache_client;
use crate::drive_monitor::DriveMonitor;
use tokio::sync::Mutex as AsyncMutex;
/// Coordinates bot lifecycle (mounting/unmounting) and message handling.
///
/// Holds the shared application state plus a registry of currently mounted
/// bots, keyed by the bot's GUID rendered as a `String`. Each mounted bot
/// owns an `Arc<DriveMonitor>` that watches its drive bucket.
pub struct BotOrchestrator {
    // Shared app state: DB connection, session manager, channel adapters,
    // LLM provider (all used by methods below).
    pub state: Arc<AppState>,
    // bot GUID (string form) -> DriveMonitor handle; guarded by a tokio
    // (async) mutex because it is locked across `.await` points.
    pub mounted_bots: Arc<AsyncMutex<HashMap<String, Arc<DriveMonitor>>>>,
}
impl BotOrchestrator {
pub fn new(state: Arc<AppState>) -> Self {
Self { state }
Self {
state,
mounted_bots: Arc::new(AsyncMutex::new(HashMap::new())),
}
}
/// Mounts every active bot found in the database.
///
/// Queries the `bots` table for rows with `is_active = true`, then spawns
/// one detached tokio task per bot that runs [`Self::mount_bot_task`].
/// Mount failures are logged inside each task and do not fail this call;
/// only the initial DB query can return an error here.
pub async fn mount_all_bots(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    use crate::shared::models::schema::bots::dsl::*;
    use diesel::prelude::*;
    // NOTE(review): this std Mutex guard lives until the end of the
    // function; there is no `.await` in this body so it is safe, but the
    // spawned tasks re-lock the connection themselves.
    let mut db_conn = self.state.conn.lock().unwrap();
    let active_bots = bots
        .filter(is_active.eq(true))
        .select(id)
        .load::<uuid::Uuid>(&mut *db_conn)
        .map_err(|e| {
            error!("Failed to query active bots: {}", e);
            e
        })?;
    for bot_guid in active_bots {
        // Clone the Arcs so each spawned task owns its own handles.
        let state_clone = self.state.clone();
        let mounted_bots_clone = self.mounted_bots.clone();
        let bot_guid_str = bot_guid.to_string();
        // Fire-and-forget: the JoinHandle is dropped, so the task is
        // detached; errors surface only via the log.
        tokio::spawn(async move {
            if let Err(e) = Self::mount_bot_task(state_clone, mounted_bots_clone, bot_guid_str.clone()).await {
                error!("Failed to mount bot {}: {}", bot_guid_str, e);
            }
        });
    }
    Ok(())
}
/// Mounts a single bot identified by `bot_guid` (UUID in string form).
///
/// Looks up the bot's name in the `bots` table, derives its drive bucket
/// name as `"<name>.gbai"`, and starts a `DriveMonitor` for that bucket,
/// recording it in `mounted_bots`. If the bot is already present in
/// `mounted_bots` this is a no-op (logged as a warning).
///
/// Errors: invalid UUID string, DB lookup failure.
async fn mount_bot_task(
    state: Arc<AppState>,
    mounted_bots: Arc<AsyncMutex<HashMap<String, Arc<DriveMonitor>>>>,
    bot_guid: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    use diesel::prelude::*;
    use crate::shared::models::schema::bots::dsl::*;
    // Resolve GUID -> bot name; scoped so the DB mutex guard is released
    // before any `.await` below.
    let bot_name: String = {
        let mut db_conn = state.conn.lock().unwrap();
        bots
            .filter(id.eq(Uuid::parse_str(&bot_guid)?))
            .select(name)
            .first(&mut *db_conn)
            .map_err(|e| {
                error!("Failed to query bot name for {}: {}", bot_guid, e);
                e
            })?
    };
    let bucket_name = format!("{}.gbai", bot_name);
    // Early-exit if already mounted.
    // NOTE(review): the check and the insert below take the lock
    // separately, so two concurrent mounts of the same GUID could race
    // past this check — confirm whether double-mounting is harmful.
    {
        let mounted = mounted_bots.lock().await;
        if mounted.contains_key(&bot_guid) {
            warn!("Bot {} is already mounted", bot_guid);
            return Ok(());
        }
    }
    let drive_monitor = Arc::new(DriveMonitor::new(state.clone(), bucket_name));
    // Start the monitor; its JoinHandle is intentionally discarded
    // (the monitor keeps running detached).
    let _handle = drive_monitor.clone().spawn().await;
    {
        let mut mounted = mounted_bots.lock().await;
        mounted.insert(bot_guid.clone(), drive_monitor);
    }
    info!("Bot {} mounted successfully", bot_guid);
    Ok(())
}
/// Creates the drive bucket for a new bot.
///
/// Bucket name is `"<org_prefix><bot_guid>.gbai"`, where `org_prefix`
/// comes from the app config's drive settings.
///
/// # Panics
/// Panics if `self.state.config` is `None` (the `unwrap` below) —
/// callers must ensure config is loaded first.
pub async fn create_bot(&self, bot_guid: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let bucket_name = format!("{}{}.gbai", self.state.config.as_ref().unwrap().drive.org_prefix, bot_guid);
    crate::create_bucket::create_bucket(&bucket_name)?;
    Ok(())
}
/// Mounts one bot by GUID string; accepts either `"<guid>"` or
/// `"<guid>.gbai"` (the suffix is stripped).
///
/// Same flow as [`Self::mount_bot_task`]: resolve the bot's name from the
/// DB, derive bucket `"<name>.gbai"`, start a `DriveMonitor`, and record
/// it in `mounted_bots`. No-op (with a warning) if already mounted.
pub async fn mount_bot(&self, bot_guid: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Normalize: allow callers to pass the bucket-style name.
    let bot_guid = bot_guid.strip_suffix(".gbai").unwrap_or(bot_guid).to_string();
    use diesel::prelude::*;
    use crate::shared::models::schema::bots::dsl::*;
    // Resolve GUID -> bot name; scoped so the std Mutex guard drops
    // before the `.await`s below.
    let bot_name: String = {
        let mut db_conn = self.state.conn.lock().unwrap();
        bots
            .filter(id.eq(Uuid::parse_str(&bot_guid)?))
            .select(name)
            .first(&mut *db_conn)
            .map_err(|e| {
                error!("Failed to query bot name for {}: {}", bot_guid, e);
                e
            })?
    };
    let bucket_name = format!("{}.gbai", bot_name);
    // Already mounted? Check under the async lock, then release it.
    // NOTE(review): check and insert are separate lock acquisitions —
    // concurrent calls for the same GUID can race (see mount_bot_task).
    {
        let mounted_bots = self.mounted_bots.lock().await;
        if mounted_bots.contains_key(&bot_guid) {
            warn!("Bot {} is already mounted", bot_guid);
            return Ok(());
        }
    }
    let drive_monitor = Arc::new(DriveMonitor::new(self.state.clone(), bucket_name));
    // Detached: the JoinHandle is discarded on purpose.
    let _handle = drive_monitor.clone().spawn().await;
    {
        let mut mounted_bots = self.mounted_bots.lock().await;
        mounted_bots.insert(bot_guid.clone(), drive_monitor);
    }
    Ok(())
}
pub async fn handle_user_input(
@ -30,10 +153,7 @@ impl BotOrchestrator {
session_id: Uuid,
user_input: &str,
) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
info!(
"Handling user input for session {}: '{}'",
session_id, user_input
);
info!("Handling user input for session {}: '{}'", session_id, user_input);
let mut session_manager = self.state.session_manager.lock().await;
session_manager.provide_input(session_id, user_input.to_string())?;
Ok(None)
@ -74,10 +194,7 @@ impl BotOrchestrator {
bot_id: &str,
mode: i32,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!(
"Setting answer mode for user {} with bot {} to mode {}",
user_id, bot_id, mode
);
info!("Setting answer mode for user {} with bot {} to mode {}", user_id, bot_id, mode);
let mut session_manager = self.state.session_manager.lock().await;
session_manager.update_answer_mode(user_id, bot_id, mode)?;
Ok(())
@ -92,10 +209,7 @@ impl BotOrchestrator {
event_type: &str,
data: serde_json::Value,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!(
"Sending event '{}' to session {} on channel {}",
event_type, session_id, channel
);
info!("Sending event '{}' to session {} on channel {}", event_type, session_id, channel);
let event_response = BotResponse {
bot_id: bot_id.to_string(),
user_id: user_id.to_string(),
@ -113,8 +227,9 @@ impl BotOrchestrator {
if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) {
adapter.send_message(event_response).await?;
} else {
warn!("No channel adapter found for channel 1: {}", channel);
warn!("No channel adapter found for channel: {}", channel);
}
Ok(())
}
@ -124,10 +239,7 @@ impl BotOrchestrator {
channel: &str,
content: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!(
"Sending direct message to session {}: '{}'",
session_id, content
);
info!("Sending direct message to session {}: '{}'", session_id, content);
let bot_response = BotResponse {
bot_id: "default_bot".to_string(),
user_id: "default_user".to_string(),
@ -142,8 +254,9 @@ impl BotOrchestrator {
if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) {
adapter.send_message(bot_response).await?;
} else {
warn!("No channel adapter found for channel 2: {}", channel);
warn!("No channel adapter found for direct message on channel: {}", channel);
}
Ok(())
}
@ -151,60 +264,35 @@ impl BotOrchestrator {
&self,
message: UserMessage,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!(
"Processing message from channel: {}, user: {}, session: {}",
message.channel, message.user_id, message.session_id
);
debug!(
"Message content: '{}', type: {}",
message.content, message.message_type
);
info!("Processing message from channel: {}, user: {}, session: {}", message.channel, message.user_id, message.session_id);
debug!("Message content: '{}', type: {}", message.content, message.message_type);
let user_id = Uuid::parse_str(&message.user_id).map_err(|e| {
error!("Invalid user ID provided: {}", e);
e
})?;
let bot_id = if let Ok(bot_guid) = std::env::var("BOT_GUID") {
Uuid::parse_str(&bot_guid).map_err(|e| {
warn!("Invalid BOT_GUID from env: {}", e);
e
})?
} else {
warn!("BOT_GUID not set in environment, using nil UUID");
Uuid::nil()
};
let bot_id = Uuid::nil();
let session = {
let mut sm = self.state.session_manager.lock().await;
let session_id = Uuid::parse_str(&message.session_id).map_err(|e| {
error!("Invalid session ID: {}", e);
e
})?;
match sm.get_session_by_id(session_id)? {
Some(session) => session,
None => {
error!(
"Failed to create session for user {} with bot {}",
user_id, bot_id
);
error!("Failed to create session for user {} with bot {}", user_id, bot_id);
return Err("Failed to create session".into());
}
}
};
if self.is_waiting_for_input(session.id).await {
debug!(
"Session {} is waiting for input, processing as variable input",
session.id
);
if let Some(variable_name) =
self.handle_user_input(session.id, &message.content).await?
{
debug!(
"Stored user input in variable '{}' for session {}",
variable_name, session.id
);
debug!("Session {} is waiting for input, processing as variable input", session.id);
if let Some(variable_name) = self.handle_user_input(session.id, &message.content).await? {
info!("Stored user input in variable '{}' for session {}", variable_name, session.id);
if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) {
let ack_response = BotResponse {
bot_id: message.bot_id.clone(),
@ -263,10 +351,7 @@ impl BotOrchestrator {
if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) {
adapter.send_message(bot_response).await?;
} else {
warn!(
"No channel adapter found for channel 3: {}",
message.channel
);
warn!("No channel adapter found for message channel: {}", message.channel);
}
Ok(())
@ -298,7 +383,6 @@ impl BotOrchestrator {
session_manager.get_conversation_history(session.id, session.user_id)?
};
// Prompt compactor: keep only last 10 entries
let recent_history = if history.len() > 10 {
&history[history.len() - 10..]
} else {
@ -308,31 +392,23 @@ impl BotOrchestrator {
for (role, content) in recent_history {
prompt.push_str(&format!("{}: {}\n", role, content));
}
prompt.push_str(&format!("User: {}\nAssistant:", message.content));
// Determine which cache backend to use
let use_langcache = std::env::var("LLM_CACHE")
.unwrap_or_else(|_| "false".to_string())
.eq_ignore_ascii_case("true");
if use_langcache {
// Ensure LangCache collection exists
ensure_collection_exists(&self.state, "semantic_cache").await?;
// Get LangCache client
let langcache_client = get_langcache_client()?;
// Isolate the user question (ignore conversation history)
let isolated_question = message.content.trim().to_string();
// Generate embedding for the isolated question
let question_embeddings = generate_embeddings(vec![isolated_question.clone()]).await?;
let question_embedding = question_embeddings
.get(0)
.ok_or_else(|| "Failed to generate embedding for question")?
.clone();
// Search for similar question in LangCache
let search_results = langcache_client
.search("semantic_cache", question_embedding.clone(), 1)
.await?;
@ -344,13 +420,11 @@ impl BotOrchestrator {
}
}
// Generate response via LLM provider using full prompt (including history)
let response = self.state
.llm_provider
.generate(&prompt, &serde_json::Value::Null)
.await?;
// Store isolated question and response in LangCache
let point = QdrantPoint {
id: uuid::Uuid::new_v4().to_string(),
vector: question_embedding,
@ -360,26 +434,21 @@ impl BotOrchestrator {
"response": response
}),
};
langcache_client
.upsert_points("semantic_cache", vec![point])
.await?;
Ok(response)
} else {
// Ensure semantic cache collection exists
ensure_collection_exists(&self.state, "semantic_cache").await?;
// Get Qdrant client
let qdrant_client = get_qdrant_client(&self.state)?;
// Generate embedding for the prompt
let embeddings = generate_embeddings(vec![prompt.clone()]).await?;
let embedding = embeddings
.get(0)
.ok_or_else(|| "Failed to generate embedding")?
.clone();
// Search for similar prompt in Qdrant
let search_results = qdrant_client
.search("semantic_cache", embedding.clone(), 1)
.await?;
@ -392,13 +461,11 @@ impl BotOrchestrator {
}
}
// Generate response via LLM provider
let response = self.state
.llm_provider
.generate(&prompt, &serde_json::Value::Null)
.await?;
// Store prompt and response in Qdrant
let point = QdrantPoint {
id: uuid::Uuid::new_v4().to_string(),
vector: embedding,
@ -407,14 +474,13 @@ impl BotOrchestrator {
"response": response
}),
};
qdrant_client
.upsert_points("semantic_cache", vec![point])
.await?;
Ok(response)
}
}
pub async fn stream_response(
@ -422,10 +488,7 @@ impl BotOrchestrator {
message: UserMessage,
response_tx: mpsc::Sender<BotResponse>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!(
"Streaming response for user: {}, session: {}",
message.user_id, message.session_id
);
info!("Streaming response for user: {}, session: {}", message.user_id, message.session_id);
let user_id = Uuid::parse_str(&message.user_id).map_err(|e| {
error!("Invalid user ID: {}", e);
@ -448,6 +511,7 @@ impl BotOrchestrator {
error!("Invalid session ID: {}", e);
e
})?;
match sm.get_session_by_id(session_id)? {
Some(sess) => sess,
None => {
@ -500,12 +564,9 @@ impl BotOrchestrator {
for (role, content) in &history {
p.push_str(&format!("{}: {}\n", role, content));
}
p.push_str(&format!("User: {}\nAssistant:", message.content));
debug!(
"Stream prompt constructed with {} history entries",
history.len()
);
p.push_str(&format!("User: {}\nAssistant:", message.content));
info!("Stream prompt constructed with {} history entries", history.len());
p
};
@ -556,22 +617,20 @@ impl BotOrchestrator {
if !first_word_received && !chunk.trim().is_empty() {
first_word_received = true;
debug!("First word received in stream: '{}'", chunk);
}
analysis_buffer.push_str(&chunk);
if analysis_buffer.contains("**") && !in_analysis {
in_analysis = true;
}
if in_analysis {
if analysis_buffer.ends_with("final") {
debug!(
"Analysis section completed, buffer length: {}",
analysis_buffer.len()
);
info!("Analysis section completed, buffer length: {}", analysis_buffer.len());
in_analysis = false;
analysis_buffer.clear();
if message.channel == "web" {
let orchestrator = BotOrchestrator::new(Arc::clone(&self.state));
orchestrator
@ -595,6 +654,7 @@ impl BotOrchestrator {
}
full_response.push_str(&chunk);
let partial = BotResponse {
bot_id: message.bot_id.clone(),
user_id: message.user_id.clone(),
@ -612,10 +672,7 @@ impl BotOrchestrator {
}
}
debug!(
"Stream processing completed, {} chunks processed",
chunk_count
);
info!("Stream processing completed, {} chunks processed", chunk_count);
{
let mut sm = self.state.session_manager.lock().await;
@ -632,8 +689,8 @@ impl BotOrchestrator {
stream_token: None,
is_complete: true,
};
response_tx.send(final_msg).await?;
response_tx.send(final_msg).await?;
Ok(())
}
@ -651,10 +708,7 @@ impl BotOrchestrator {
session_id: Uuid,
user_id: Uuid,
) -> Result<Vec<(String, String)>, Box<dyn std::error::Error + Send + Sync>> {
info!(
"Getting conversation history for session {} user {}",
session_id, user_id
);
info!("Getting conversation history for session {} user {}", session_id, user_id);
let mut session_manager = self.state.session_manager.lock().await;
let history = session_manager.get_conversation_history(session_id, user_id)?;
Ok(history)
@ -665,12 +719,11 @@ impl BotOrchestrator {
state: Arc<AppState>,
token: Option<String>,
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
info!(
"Running start script for session: {} with token: {:?}",
session.id, token
);
let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string());
info!("Running start script for session: {} with token: {:?}", session.id, token);
let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| String::from("default_bot"));
let start_script_path = format!("./{}.gbai/.gbdialog/start.bas", bot_guid);
let start_script = match std::fs::read_to_string(&start_script_path) {
Ok(content) => content,
Err(_) => {
@ -678,10 +731,8 @@ impl BotOrchestrator {
return Ok(true);
}
};
debug!(
"Start script content for session {}: {}",
session.id, start_script
);
info!("Start script content for session {}: {}", session.id, start_script);
let session_clone = session.clone();
let state_clone = state.clone();
@ -694,17 +745,11 @@ impl BotOrchestrator {
.and_then(|ast| script_service.run(&ast))
{
Ok(result) => {
info!(
"Start script executed successfully for session {}, result: {}",
session_clone.id, result
);
info!("Start script executed successfully for session {}, result: {}", session_clone.id, result);
Ok(true)
}
Err(e) => {
error!(
"Failed to run start script for session {}: {}",
session_clone.id, e
);
error!("Failed to run start script for session {}: {}", session_clone.id, e);
Ok(false)
}
}
@ -716,10 +761,8 @@ impl BotOrchestrator {
channel: &str,
message: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
warn!(
"Sending warning to session {} on channel {}: {}",
session_id, channel, message
);
warn!("Sending warning to session {} on channel {}: {}", session_id, channel, message);
if channel == "web" {
self.send_event(
"system",
@ -747,10 +790,7 @@ impl BotOrchestrator {
};
adapter.send_message(warn_response).await
} else {
warn!(
"No channel adapter found for warning on channel: {}",
channel
);
warn!("No channel adapter found for warning on channel: {}", channel);
Ok(())
}
}
@ -763,10 +803,8 @@ impl BotOrchestrator {
_bot_id: &str,
token: Option<String>,
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
info!(
"Triggering auto welcome for user: {}, session: {}, token: {:?}",
user_id, session_id, token
);
info!("Triggering auto welcome for user: {}, session: {}, token: {:?}", user_id, session_id, token);
let session_uuid = Uuid::parse_str(session_id).map_err(|e| {
error!("Invalid session ID: {}", e);
e
@ -784,22 +822,53 @@ impl BotOrchestrator {
};
let result = Self::run_start_script(&session, Arc::clone(&self.state), token).await?;
info!(
"Auto welcome completed for session: {} with result: {}",
session_id, result
);
info!("Auto welcome completed for session: {} with result: {}", session_id, result);
Ok(result)
}
}
async fn get_web_response_channel(
&self,
session_id: &str,
) -> Result<mpsc::Sender<BotResponse>, Box<dyn std::error::Error + Send + Sync>> {
let response_channels = self.state.response_channels.lock().await;
if let Some(tx) = response_channels.get(session_id) {
Ok(tx.clone())
} else {
Err("No response channel found for session".into())
pub fn bot_from_url(
db_conn: &mut PgConnection,
path: &str
) -> Result<(Uuid, String), HttpResponse> {
use crate::shared::models::schema::bots::dsl::*;
use diesel::prelude::*;
// Extract bot name from first path segment
if let Some(bot_name) = path.split('/').nth(1).filter(|s| !s.is_empty()) {
match bots
.filter(name.eq(bot_name))
.filter(is_active.eq(true))
.select((id, name))
.first::<(Uuid, String)>(db_conn)
.optional()
{
Ok(Some((bot_id, bot_name))) => return Ok((bot_id, bot_name)),
Ok(None) => warn!("No active bot found with name: {}", bot_name),
Err(e) => error!("Failed to query bot by name: {}", e),
}
}
// Fall back to first available bot
match bots
.filter(is_active.eq(true))
.select((id, name))
.first::<(Uuid, String)>(db_conn)
.optional()
{
Ok(Some((first_bot_id, first_bot_name))) => {
log::info!("Using first available bot: {} ({})", first_bot_id, first_bot_name);
Ok((first_bot_id, first_bot_name))
}
Ok(None) => {
error!("No active bots found in database");
Err(HttpResponse::ServiceUnavailable()
.json(serde_json::json!({"error": "No bots available"})))
}
Err(e) => {
error!("Failed to query bots: {}", e);
Err(HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Failed to query bots"})))
}
}
}
@ -808,6 +877,7 @@ impl Default for BotOrchestrator {
fn default() -> Self {
Self {
state: Arc::new(AppState::default()),
mounted_bots: Arc::new(AsyncMutex::new(HashMap::new())),
}
}
}
@ -826,7 +896,6 @@ async fn websocket_handler(
.unwrap_or_else(|| Uuid::new_v4().to_string())
.replace("undefined", &Uuid::new_v4().to_string());
// Ensure user exists in database before proceeding
let user_id = {
let user_uuid = Uuid::parse_str(&user_id_string).unwrap_or_else(|_| Uuid::new_v4());
let mut sm = data.session_manager.lock().await;
@ -841,8 +910,8 @@ async fn websocket_handler(
let (res, mut session, mut msg_stream) = actix_ws::handle(&req, stream)?;
let (tx, mut rx) = mpsc::channel::<BotResponse>(100);
let orchestrator = BotOrchestrator::new(Arc::clone(&data));
let orchestrator = BotOrchestrator::new(Arc::clone(&data));
orchestrator
.register_response_channel(session_id.clone(), tx.clone())
.await;
@ -855,7 +924,6 @@ async fn websocket_handler(
.add_connection(session_id.clone(), tx.clone())
.await;
// Get first available bot from database
let bot_id = {
use crate::shared::models::schema::bots::dsl::*;
use diesel::prelude::*;
@ -897,16 +965,13 @@ async fn websocket_handler(
.await
.ok();
info!(
"WebSocket connection established for session: {}, user: {}",
session_id, user_id
);
info!("WebSocket connection established for session: {}, user: {}", session_id, user_id);
// Trigger auto welcome (start.bas)
let orchestrator_clone = BotOrchestrator::new(Arc::clone(&data));
let user_id_welcome = user_id.clone();
let session_id_welcome = session_id.clone();
let bot_id_welcome = bot_id.clone();
actix_web::rt::spawn(async move {
if let Err(e) = orchestrator_clone
.trigger_auto_welcome(&session_id_welcome, &user_id_welcome, &bot_id_welcome, None)
@ -922,10 +987,7 @@ async fn websocket_handler(
let user_id_clone = user_id.clone();
actix_web::rt::spawn(async move {
info!(
"Starting WebSocket sender for session {}",
session_id_clone1
);
info!("Starting WebSocket sender for session {}", session_id_clone1);
let mut message_count = 0;
while let Some(msg) = rx.recv().await {
message_count += 1;
@ -936,23 +998,17 @@ async fn websocket_handler(
}
}
}
info!(
"WebSocket sender terminated for session {}, sent {} messages",
session_id_clone1, message_count
);
info!("WebSocket sender terminated for session {}, sent {} messages", session_id_clone1, message_count);
});
actix_web::rt::spawn(async move {
info!(
"Starting WebSocket receiver for session {}",
session_id_clone2
);
info!("Starting WebSocket receiver for session {}", session_id_clone2);
let mut message_count = 0;
while let Some(Ok(msg)) = msg_stream.recv().await {
match msg {
WsMessage::Text(text) => {
message_count += 1;
// Get first available bot from database
let bot_id = {
use crate::shared::models::schema::bots::dsl::*;
use diesel::prelude::*;
@ -976,42 +1032,37 @@ async fn websocket_handler(
}
};
// Parse the text as JSON to extract the content field
let json_value: serde_json::Value = match serde_json::from_str(&text) {
Ok(value) => value,
Err(e) => {
error!("Error parsing JSON message {}: {}", message_count, e);
continue; // Skip processing this message
continue;
}
};
// Extract content from JSON, fallback to original text if content field doesn't exist
let content = json_value["content"]
.as_str()
.map(|s| s.to_string())
.unwrap();
let user_message = UserMessage {
bot_id: bot_id,
bot_id,
user_id: user_id_clone.clone(),
session_id: session_id_clone2.clone(),
channel: "web".to_string(),
content: content,
content,
message_type: 1,
media_url: None,
timestamp: Utc::now(),
};
if let Err(e) = orchestrator.stream_response(user_message, tx.clone()).await {
error!(
"Error processing WebSocket message {}: {}",
message_count, e
);
error!("Error processing WebSocket message {}: {}", message_count, e);
}
}
WsMessage::Close(_) => {
// Get first available bot from database
WsMessage::Close(reason) => {
debug!("WebSocket closing for session {} - reason: {:?}", session_id_clone2, reason);
let bot_id = {
use crate::shared::models::schema::bots::dsl::*;
use diesel::prelude::*;
@ -1034,7 +1085,9 @@ async fn websocket_handler(
}
}
};
orchestrator
debug!("Sending session_end event for {}", session_id_clone2);
if let Err(e) = orchestrator
.send_event(
&user_id_clone,
&bot_id,
@ -1044,26 +1097,28 @@ async fn websocket_handler(
serde_json::json!({}),
)
.await
.ok();
{
error!("Failed to send session_end event: {}", e);
}
debug!("Removing WebSocket connection for {}", session_id_clone2);
web_adapter.remove_connection(&session_id_clone2).await;
debug!("Unregistering response channel for {}", session_id_clone2);
orchestrator
.unregister_response_channel(&session_id_clone2)
.await;
info!("WebSocket fully closed for session {}", session_id_clone2);
break;
}
_ => {}
}
}
info!(
"WebSocket receiver terminated for session {}, processed {} messages",
session_id_clone2, message_count
);
info!("WebSocket receiver terminated for session {}, processed {} messages", session_id_clone2, message_count);
});
info!(
"WebSocket handler setup completed for session {}",
session_id
);
info!("WebSocket handler setup completed for session {}", session_id);
Ok(res)
}
@ -1076,6 +1131,7 @@ async fn start_session(
.get("session_id")
.and_then(|s| s.as_str())
.unwrap_or("");
let token = info
.get("token")
.and_then(|t| t.as_str())
@ -1109,12 +1165,10 @@ async fn start_session(
};
let result = BotOrchestrator::run_start_script(&session, Arc::clone(&data), token).await;
match result {
Ok(true) => {
info!(
"Start script completed successfully for session: {}",
session_id
);
info!("Start script completed successfully for session: {}", session_id);
Ok(HttpResponse::Ok().json(serde_json::json!({
"status": "started",
"session_id": session.id,
@ -1130,10 +1184,7 @@ async fn start_session(
})))
}
Err(e) => {
error!(
"Error running start script for session {}: {}",
session_id, e
);
error!("Error running start script for session {}: {}", session_id, e);
Ok(HttpResponse::InternalServerError()
.json(serde_json::json!({"error": e.to_string()})))
}
@ -1148,14 +1199,12 @@ async fn send_warning_handler(
let default_session = "default".to_string();
let default_channel = "web".to_string();
let default_message = "Warning!".to_string();
let session_id = info.get("session_id").unwrap_or(&default_session);
let channel = info.get("channel").unwrap_or(&default_channel);
let message = info.get("message").unwrap_or(&default_message);
info!(
"Sending warning via API - session: {}, channel: {}",
session_id, channel
);
info!("Sending warning via API - session: {}, channel: {}", session_id, channel);
let orchestrator = BotOrchestrator::new(Arc::clone(&data));
if let Err(e) = orchestrator

View file

@ -3,6 +3,8 @@ use diesel::sql_types::Text;
use log::{info, warn};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::OpenOptions;
use std::io::Write;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
@ -190,11 +192,18 @@ impl AppConfig {
.and_then(|p| p.parse().ok())
.unwrap_or_else(|| get_u32("CUSTOM_PORT", 5432)),
database: std::env::var("CUSTOM_DATABASE")
.unwrap_or_else(|_| get_str("CUSTOM_DATABASE", "botserver")),
.unwrap_or_else(|_| get_str("CUSTOM_DATABASE", "gbuser")),
};
let minio = DriveConfig {
server: get_str("DRIVE_SERVER", "http://localhost:9000"),
server: {
let server = get_str("DRIVE_SERVER", "http://localhost:9000");
if !server.starts_with("http://") && !server.starts_with("https://") {
format!("http://{}", server)
} else {
server
}
},
access_key: get_str("DRIVE_ACCESSKEY", "minioadmin"),
secret_key: get_str("DRIVE_SECRET", "minioadmin"),
use_ssl: get_bool("DRIVE_USE_SSL", false),
@ -216,6 +225,11 @@ impl AppConfig {
endpoint: get_str("AI_ENDPOINT", "https://api.openai.com"),
};
// Write drive config to .env file
if let Err(e) = write_drive_config_to_env(&minio) {
warn!("Failed to write drive config to .env: {}", e);
}
AppConfig {
drive: minio,
server: ServerConfig {
@ -367,6 +381,22 @@ impl AppConfig {
}
}
/// Appends the drive (S3/MinIO) configuration to the local `.env` file.
///
/// The file is created if missing and always opened in append mode, so a
/// blank separator line followed by the five `DRIVE_*` keys is written on
/// every call.
///
/// NOTE(review): because this appends unconditionally, repeated startups
/// accumulate duplicate `DRIVE_*` entries in `.env` (later entries win for
/// most dotenv loaders) — confirm whether de-duplication is wanted.
fn write_drive_config_to_env(drive: &DriveConfig) -> std::io::Result<()> {
    let mut file = OpenOptions::new()
        .append(true)
        .create(true)
        .open(".env")?;
    // `writeln!(file)` emits the same lone newline as `writeln!(file, "")`
    // without the empty format string (clippy: writeln_empty_string).
    writeln!(file)?;
    writeln!(file, "DRIVE_SERVER={}", drive.server)?;
    writeln!(file, "DRIVE_ACCESSKEY={}", drive.access_key)?;
    writeln!(file, "DRIVE_SECRET={}", drive.secret_key)?;
    writeln!(file, "DRIVE_USE_SSL={}", drive.use_ssl)?;
    writeln!(file, "DRIVE_ORG_PREFIX={}", drive.org_prefix)?;
    Ok(())
}
fn parse_database_url(url: &str) -> (String, String, String, u32, String) {
if let Some(stripped) = url.strip_prefix("postgres://") {
let parts: Vec<&str> = stripped.split('@').collect();

8
src/create_bucket.rs Normal file
View file

@ -0,0 +1,8 @@
use std::fs;
use std::path::Path;
/// Creates a local bucket directory under `buckets/<bucket_name>`.
///
/// Missing parent directories are created as well, and calling this for an
/// already-existing bucket is a no-op (semantics of `create_dir_all`).
pub fn create_bucket(bucket_name: &str) -> std::io::Result<()> {
    let target = Path::new("buckets").join(bucket_name);
    fs::create_dir_all(target)
}

View file

@ -2,8 +2,8 @@ use crate::basic::compiler::BasicCompiler;
use crate::kb::embeddings;
use crate::kb::qdrant_client;
use crate::shared::state::AppState;
use aws_sdk_s3::Client;
use log::{debug, error, info, warn};
use opendal::Operator;
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;
@ -34,7 +34,10 @@ impl DriveMonitor {
pub fn spawn(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
info!("Drive Monitor service started for bucket: {}", self.bucket_name);
info!(
"Drive Monitor service started for bucket: {}",
self.bucket_name
);
let mut tick = interval(Duration::from_secs(30));
loop {
tick.tick().await;
@ -46,17 +49,17 @@ impl DriveMonitor {
}
async fn check_for_changes(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
let op = match &self.state.s3_operator {
Some(op) => op,
let client = match &self.state.s3_client {
Some(client) => client,
None => {
return Ok(());
}
};
self.check_gbdialog_changes(op).await?;
self.check_gbkb_changes(op).await?;
if let Err(e) = self.check_default_gbot(op).await {
self.check_gbdialog_changes(client).await?;
self.check_gbkb_changes(client).await?;
if let Err(e) = self.check_gbot(client).await {
error!("Error checking default bot config: {}", e);
}
@ -65,40 +68,57 @@ impl DriveMonitor {
async fn check_gbdialog_changes(
&self,
op: &Operator,
client: &Client,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let prefix = ".gbdialog/";
let mut current_files = HashMap::new();
let mut lister = op.lister_with(prefix).recursive(true).await?;
while let Some(entry) = futures::TryStreamExt::try_next(&mut lister).await? {
let path = entry.path().to_string();
if path.ends_with('/') || !path.ends_with(".bas") {
continue;
let mut continuation_token = None;
loop {
let list_objects = client
.list_objects_v2()
.bucket(&self.bucket_name.to_lowercase())
.set_continuation_token(continuation_token)
.send()
.await?;
debug!("List objects result: {:?}", list_objects);
for obj in list_objects.contents.unwrap_or_default() {
let path = obj.key().unwrap_or_default().to_string();
let path_parts: Vec<&str> = path.split('/').collect();
if path_parts.len() < 2 || !path_parts[0].ends_with(".gbdialog") {
continue;
}
if path.ends_with('/') || !path.ends_with(".bas") {
continue;
}
let file_state = FileState {
path: path.clone(),
size: obj.size().unwrap_or(0),
etag: obj.e_tag().unwrap_or_default().to_string(),
last_modified: obj.last_modified().map(|dt| dt.to_string()),
};
current_files.insert(path, file_state);
}
let meta = op.stat(&path).await?;
let file_state = FileState {
path: path.clone(),
size: meta.content_length() as i64,
etag: meta.etag().unwrap_or_default().to_string(),
last_modified: meta.last_modified().map(|dt| dt.to_rfc3339()),
};
current_files.insert(path, file_state);
if !list_objects.is_truncated.unwrap_or(false) {
break;
}
continuation_token = list_objects.next_continuation_token;
}
let mut file_states = self.file_states.write().await;
for (path, current_state) in current_files.iter() {
if let Some(previous_state) = file_states.get(path) {
if current_state.etag != previous_state.etag {
if let Err(e) = self.compile_tool(op, path).await {
if let Err(e) = self.compile_tool(client, path).await {
error!("Failed to compile tool {}: {}", path, e);
}
}
} else {
if let Err(e) = self.compile_tool(op, path).await {
if let Err(e) = self.compile_tool(client, path).await {
error!("Failed to compile tool {}: {}", path, e);
}
}
@ -125,45 +145,67 @@ impl DriveMonitor {
async fn check_gbkb_changes(
&self,
op: &Operator,
client: &Client,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let prefix = ".gbkb/";
let mut current_files = HashMap::new();
let mut lister = op.lister_with(prefix).recursive(true).await?;
while let Some(entry) = futures::TryStreamExt::try_next(&mut lister).await? {
let path = entry.path().to_string();
if path.ends_with('/') {
continue;
let mut continuation_token = None;
loop {
let list_objects = client
.list_objects_v2()
.bucket(&self.bucket_name.to_lowercase())
.prefix(prefix)
.set_continuation_token(continuation_token)
.send()
.await?;
debug!("List objects result: {:?}", list_objects);
for obj in list_objects.contents.unwrap_or_default() {
let path = obj.key().unwrap_or_default().to_string();
let path_parts: Vec<&str> = path.split('/').collect();
if path_parts.len() < 2 || !path_parts[0].ends_with(".gbkb") {
continue;
}
if path.ends_with('/') {
continue;
}
let ext = path.rsplit('.').next().unwrap_or("").to_lowercase();
if !["pdf", "txt", "md", "docx"].contains(&ext.as_str()) {
continue;
}
let file_state = FileState {
path: path.clone(),
size: obj.size().unwrap_or(0),
etag: obj.e_tag().unwrap_or_default().to_string(),
last_modified: obj.last_modified().map(|dt| dt.to_string()),
};
current_files.insert(path, file_state);
}
let ext = path.rsplit('.').next().unwrap_or("").to_lowercase();
if !["pdf", "txt", "md", "docx"].contains(&ext.as_str()) {
continue;
if !list_objects.is_truncated.unwrap_or(false) {
break;
}
let meta = op.stat(&path).await?;
let file_state = FileState {
path: path.clone(),
size: meta.content_length() as i64,
etag: meta.etag().unwrap_or_default().to_string(),
last_modified: meta.last_modified().map(|dt| dt.to_rfc3339()),
};
current_files.insert(path, file_state);
continuation_token = list_objects.next_continuation_token;
}
let mut file_states = self.file_states.write().await;
for (path, current_state) in current_files.iter() {
if let Some(previous_state) = file_states.get(path) {
if current_state.etag != previous_state.etag {
if let Err(e) = self.index_document(op, path).await {
if let Err(e) = self.index_document(client, path).await {
error!("Failed to index document {}: {}", path, e);
}
}
} else {
if let Err(e) = self.index_document(op, path).await {
if let Err(e) = self.index_document(client, path).await {
error!("Failed to index document {}: {}", path, e);
}
}
@ -188,38 +230,103 @@ impl DriveMonitor {
Ok(())
}
async fn check_default_gbot(
async fn check_gbot(
&self,
op: &Operator,
client: &Client,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let prefix = format!("{}default.gbot/", self.bucket_name);
let config_key = format!("{}config.csv", prefix);
match op.stat(&config_key).await {
Ok(_) => {
let content = op.read(&config_key).await?;
let csv_content = String::from_utf8(content.to_vec())
.map_err(|e| format!("UTF-8 error in config.csv: {}", e))?;
debug!("Found config.csv: {} bytes", csv_content.len());
Ok(())
let prefix = ".gbot/";
let mut continuation_token = None;
loop {
let list_objects = client
.list_objects_v2()
.bucket(&self.bucket_name.to_lowercase())
.prefix(prefix)
.set_continuation_token(continuation_token)
.send()
.await?;
for obj in list_objects.contents.unwrap_or_default() {
let path = obj.key().unwrap_or_default().to_string();
let path_parts: Vec<&str> = path.split('/').collect();
if path_parts.len() < 2 || !path_parts[0].ends_with(".gbot") {
continue;
}
if !path.ends_with("config.csv") {
continue;
}
debug!("Checking config file at path: {}", path);
match client
.head_object()
.bucket(&self.bucket_name)
.key(&path)
.send()
.await
{
Ok(head_res) => {
debug!("HeadObject successful for {}, metadata: {:?}", path, head_res);
let response = client
.get_object()
.bucket(&self.bucket_name)
.key(&path)
.send()
.await?;
debug!("GetObject successful for {}, content length: {}", path, response.content_length().unwrap_or(0));
let bytes = response.body.collect().await?.into_bytes();
debug!("Collected {} bytes for {}", bytes.len(), path);
let csv_content = String::from_utf8(bytes.to_vec())
.map_err(|e| format!("UTF-8 error in {}: {}", path, e))?;
debug!("Found {}: {} bytes", path, csv_content.len());
}
Err(e) => {
debug!("Config file {} not found or inaccessible: {}", path, e);
}
}
}
Err(e) => {
debug!("Config file not found or inaccessible: {}", e);
Ok(())
if !list_objects.is_truncated.unwrap_or(false) {
break;
}
continuation_token = list_objects.next_continuation_token;
}
Ok(())
}
async fn compile_tool(
&self,
op: &Operator,
client: &Client,
file_path: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let content = op.read(file_path).await?;
let source_content = String::from_utf8(content.to_vec())?;
debug!("Fetching object from S3: bucket={}, key={}", &self.bucket_name, file_path);
let response = match client
.get_object()
.bucket(&self.bucket_name)
.key(file_path)
.send()
.await {
Ok(res) => {
debug!("Successfully fetched object from S3: bucket={}, key={}, size={}",
&self.bucket_name, file_path, res.content_length().unwrap_or(0));
res
}
Err(e) => {
error!("Failed to fetch object from S3: bucket={}, key={}, error={:?}",
&self.bucket_name, file_path, e);
return Err(e.into());
}
};
let bytes = response.body.collect().await?.into_bytes();
let source_content = String::from_utf8(bytes.to_vec())?;
let tool_name = file_path
.strip_prefix(".gbdialog/")
.split('/')
.last()
.unwrap_or(file_path)
.strip_suffix(".bas")
.unwrap_or(file_path)
@ -229,7 +336,7 @@ impl DriveMonitor {
.bucket_name
.strip_suffix(".gbai")
.unwrap_or(&self.bucket_name);
let work_dir = format!("./work/{}.gbai/.gbdialog", bot_name);
let work_dir = format!("./work/{}.gbai/{}.gbdialog", bot_name, bot_name);
std::fs::create_dir_all(&work_dir)?;
let local_source_path = format!("{}/{}.bas", work_dir, tool_name);
@ -254,7 +361,7 @@ impl DriveMonitor {
async fn index_document(
&self,
op: &Operator,
client: &Client,
file_path: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let parts: Vec<&str> = file_path.split('/').collect();
@ -264,9 +371,14 @@ impl DriveMonitor {
}
let collection_name = parts[1];
let content = op.read(file_path).await?;
let bytes = content.to_vec();
let response = client
.get_object()
.bucket(&self.bucket_name)
.key(file_path)
.send()
.await?;
let bytes = response.body.collect().await?.into_bytes();
let text_content = self.extract_text(file_path, &bytes)?;
if text_content.trim().is_empty() {
warn!("No text extracted from: {}", file_path);
@ -281,7 +393,7 @@ impl DriveMonitor {
let qdrant_collection = format!("kb_default_{}", collection_name);
qdrant_client::ensure_collection_exists(&self.state, &qdrant_collection).await?;
embeddings::index_document(&self.state, &qdrant_collection, file_path, &text_content)
.await?;

View file

@ -3,10 +3,12 @@ use crate::shared::state::AppState;
use actix_multipart::Multipart;
use actix_web::web;
use actix_web::{post, HttpResponse};
use opendal::Operator;
use aws_sdk_s3::{Client as S3Client, config::Builder as S3ConfigBuilder};
use aws_config::BehaviorVersion;
use std::io::Write;
use tempfile::NamedTempFile;
use tokio_stream::StreamExt as TokioStreamExt;
// Removed unused import
#[post("/files/upload/{folder_path}")]
pub async fn upload_file(
@ -40,13 +42,13 @@ pub async fn upload_file(
let file_name = file_name.unwrap_or_else(|| "unnamed_file".to_string());
let temp_file_path = temp_file.into_temp_path();
let op = state.get_ref().s3_operator.as_ref().ok_or_else(|| {
actix_web::error::ErrorInternalServerError("S3 operator is not initialized")
let client = state.get_ref().s3_client.as_ref().ok_or_else(|| {
actix_web::error::ErrorInternalServerError("S3 client is not initialized")
})?;
let s3_key = format!("{}/{}", folder_path, file_name);
match upload_to_s3(op, &s3_key, &temp_file_path).await {
match upload_to_s3(client, &state.get_ref().bucket_name, &s3_key, &temp_file_path).await {
Ok(_) => {
let _ = std::fs::remove_file(&temp_file_path);
Ok(HttpResponse::Ok().body(format!(
@ -64,27 +66,149 @@ pub async fn upload_file(
}
}
pub async fn init_drive(config: &DriveConfig) -> Result<Operator, Box<dyn std::error::Error>> {
use opendal::services::S3;
use opendal::Operator;
let client = Operator::new(
S3::default()
.root("/")
.endpoint(&config.server)
.access_key_id(&config.access_key)
.secret_access_key(&config.secret_key),
)?
.finish();
/// Deletes an S3 bucket on the given endpoint using static credentials.
///
/// Builds a one-off client (path of least surprise for admin-style calls)
/// and issues a `DeleteBucket` request; any SDK error is propagated.
pub async fn aws_s3_bucket_delete(
    bucket: &str,
    endpoint: &str,
    access_key: &str,
    secret_key: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let credentials = aws_sdk_s3::config::Credentials::new(
        access_key.to_string(),
        secret_key.to_string(),
        None,
        None,
        "static",
    );
    let shared_config = aws_config::defaults(BehaviorVersion::latest())
        .endpoint_url(endpoint)
        .region("auto")
        .credentials_provider(credentials)
        .load()
        .await;
    S3Client::new(&shared_config)
        .delete_bucket()
        .bucket(bucket)
        .send()
        .await?;
    Ok(())
}
/// Creates an S3 bucket on the given endpoint using static credentials.
///
/// Builds a one-off client and issues a `CreateBucket` request; any SDK
/// error (including "bucket already exists") is propagated to the caller.
pub async fn aws_s3_bucket_create(
    bucket: &str,
    endpoint: &str,
    access_key: &str,
    secret_key: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let credentials = aws_sdk_s3::config::Credentials::new(
        access_key.to_string(),
        secret_key.to_string(),
        None,
        None,
        "static",
    );
    let shared_config = aws_config::defaults(BehaviorVersion::latest())
        .endpoint_url(endpoint)
        .region("auto")
        .credentials_provider(credentials)
        .load()
        .await;
    S3Client::new(&shared_config)
        .create_bucket()
        .bucket(bucket)
        .send()
        .await?;
    Ok(())
}
/// Builds the S3 client used for the configured drive backend.
///
/// The endpoint is normalized to carry a trailing slash, static credentials
/// come from `DriveConfig`, and path-style addressing is forced —
/// presumably because the target is a MinIO/S3-compatible server that does
/// not support virtual-hosted-style buckets (confirm against deployment).
pub async fn init_drive(config: &DriveConfig) -> Result<S3Client, Box<dyn std::error::Error>> {
    let mut endpoint = config.server.clone();
    if !endpoint.ends_with('/') {
        endpoint.push('/');
    }
    let credentials = aws_sdk_s3::config::Credentials::new(
        config.access_key.clone(),
        config.secret_key.clone(),
        None,
        None,
        "static",
    );
    let base_config = aws_config::defaults(BehaviorVersion::latest())
        .endpoint_url(endpoint)
        .region("auto")
        .credentials_provider(credentials)
        .load()
        .await;
    let s3_config = S3ConfigBuilder::from(&base_config)
        .force_path_style(true)
        .build();
    Ok(S3Client::from_conf(s3_config))
}
async fn upload_to_s3(
op: &Operator,
client: &S3Client,
bucket: &str,
key: &str,
file_path: &std::path::Path,
) -> Result<(), Box<dyn std::error::Error>> {
let data = std::fs::read(file_path)?;
op.write(key, data).await?;
client.put_object()
.bucket(bucket)
.key(key)
.body(data.into())
.send()
.await?;
Ok(())
}
/// Builds an S3 client from the `DRIVE_*` environment variables.
///
/// Fix: reads `DRIVE_ACCESSKEY` / `DRIVE_SECRET` — the variable names this
/// codebase actually writes (`write_drive_config_to_env`) and reads
/// (`AppConfig`). The previous `DRIVE_ACCESS_KEY` / `DRIVE_SECRET_KEY`
/// spellings are produced nowhere, so this constructor always panicked.
/// Missing variables are now reported as `Err` instead of panicking via
/// `expect`, since the function already returns `Result`.
async fn create_s3_client(
) -> Result<S3Client, Box<dyn std::error::Error>> {
    let server = std::env::var("DRIVE_SERVER")
        .map_err(|_| "DRIVE_SERVER not set")?;
    let access_key = std::env::var("DRIVE_ACCESSKEY")
        .map_err(|_| "DRIVE_ACCESSKEY not set")?;
    let secret_key = std::env::var("DRIVE_SECRET")
        .map_err(|_| "DRIVE_SECRET not set")?;
    let config = DriveConfig {
        server,
        access_key,
        secret_key,
        org_prefix: "".to_string(),
        use_ssl: false,
    };
    Ok(init_drive(&config).await?)
}
/// Returns whether `bucket` exists by issuing a `HeadBucket` request.
///
/// Bug fix: `HeadBucket` reports a missing bucket as a `NotFound` (HTTP 404)
/// service error, not a message containing "NoSuchBucket", so the original
/// substring check misclassified the common missing-bucket case as a hard
/// error. Match on the typed service error instead.
///
/// # Errors
/// Propagates any error other than bucket-not-found.
pub async fn bucket_exists(client: &S3Client, bucket: &str) -> Result<bool, Box<dyn std::error::Error>> {
    match client.head_bucket().bucket(bucket).send().await {
        Ok(_) => Ok(true),
        Err(e) => {
            let service_err = e.into_service_error();
            if service_err.is_not_found() {
                Ok(false)
            } else {
                Err(Box::new(service_err))
            }
        }
    }
}
/// Creates `bucket` on the server this client points at.
///
/// # Errors
/// Returns an error if the `CreateBucket` request fails.
pub async fn create_bucket(client: &S3Client, bucket: &str) -> Result<(), Box<dyn std::error::Error>> {
    let request = client.create_bucket().bucket(bucket);
    request.send().await?;
    Ok(())
}
#[cfg(test)]
mod bucket_tests {
// Bucket-level integration tests live in a separate file and are pulled in
// verbatim with include! so they can use this module's private items.
include!("tests/bucket_tests.rs");
}
#[cfg(test)]
mod tests {
// General drive/S3 tests, included the same way for the same reason.
include!("tests/tests.rs");
}

View file

@ -0,0 +1,70 @@
use super::*;
use aws_sdk_s3::Client as S3Client;
use std::env;
#[tokio::test]
// Integration test: expects an S3-compatible server (LocalStack) on
// localhost:4566. Skipped in CI; a missing local server is treated as a
// soft failure (printed, not asserted) so the suite still passes.
async fn test_aws_s3_bucket_create() {
if env::var("CI").is_ok() {
return; // Skip in CI environment
}
let bucket = "test-bucket-aws";
let endpoint = "http://localhost:4566"; // LocalStack default endpoint
let access_key = "test";
let secret_key = "test";
match aws_s3_bucket_create(bucket, endpoint, access_key, secret_key).await {
Ok(_) => {
// Verify bucket exists
// NOTE(review): this verification client is built without a
// credentials provider — presumably LocalStack accepts anonymous
// requests; confirm.
let config = aws_config::defaults(BehaviorVersion::latest())
.endpoint_url(endpoint)
.region("auto")
.load()
.await;
let client = S3Client::new(&config);
let exists = bucket_exists(&client, bucket).await.unwrap_or(false);
assert!(exists, "Bucket should exist after creation");
},
Err(e) => {
// Soft failure: no server available locally.
println!("Bucket creation failed: {:?}", e);
}
}
}
#[tokio::test]
// Integration test: creates a bucket, deletes it, then verifies it is gone.
// Expects LocalStack on localhost:4566; skipped in CI, and server
// unavailability is a soft failure (printed, not asserted).
async fn test_aws_s3_bucket_delete() {
if env::var("CI").is_ok() {
return; // Skip in CI environment
}
let bucket = "test-delete-bucket-aws";
let endpoint = "http://localhost:4566"; // LocalStack default endpoint
let access_key = "test";
let secret_key = "test";
// First create the bucket
if let Err(e) = aws_s3_bucket_create(bucket, endpoint, access_key, secret_key).await {
println!("Failed to create test bucket: {:?}", e);
return;
}
// Then test deletion
match aws_s3_bucket_delete(bucket, endpoint, access_key, secret_key).await {
Ok(_) => {
// Verify bucket no longer exists
// NOTE(review): verification client has no credentials provider —
// presumably fine against LocalStack; confirm.
let config = aws_config::defaults(BehaviorVersion::latest())
.endpoint_url(endpoint)
.region("auto")
.load()
.await;
let client = S3Client::new(&config);
let exists = bucket_exists(&client, bucket).await.unwrap_or(false);
assert!(!exists, "Bucket should not exist after deletion");
},
Err(e) => {
// Soft failure: no server available locally.
println!("Bucket deletion failed: {:?}", e);
}
}
}

80
src/file/tests/tests.rs Normal file
View file

@ -0,0 +1,80 @@
use super::*;
#[tokio::test]
// Integration test: builds a client from DRIVE_* env vars pointed at a
// local MinIO (localhost:9000) and tries a bucket creation. Skipped in CI;
// a missing server is a soft failure.
// NOTE(review): set_var/remove_var mutate process-wide state — if tests run
// in parallel this can race with the other tests that set the same vars.
async fn test_create_s3_client() {
if std::env::var("CI").is_ok() {
return; // Skip in CI environment
}
// Setup test environment variables
std::env::set_var("DRIVE_SERVER", "http://localhost:9000");
std::env::set_var("DRIVE_ACCESS_KEY", "minioadmin");
std::env::set_var("DRIVE_SECRET_KEY", "minioadmin");
match create_s3_client().await {
Ok(client) => {
// Verify client creation
assert!(client.config().region().is_some());
// Test bucket operations
if let Err(e) = create_bucket(&client, "test.gbai").await {
println!("Bucket creation failed: {:?}", e);
}
},
Err(e) => {
// Skip if no S3 server available
println!("S3 client creation failed: {:?}", e);
}
}
// Cleanup
std::env::remove_var("DRIVE_SERVER");
std::env::remove_var("DRIVE_ACCESS_KEY");
std::env::remove_var("DRIVE_SECRET_KEY");
}
#[tokio::test]
async fn test_bucket_exists() {
if std::env::var("CI").is_ok() {
return; // Skip in CI environment
}
// Setup test environment variables
std::env::set_var("DRIVE_SERVER", "http://localhost:9000");
std::env::set_var("DRIVE_ACCESS_KEY", "minioadmin");
std::env::set_var("DRIVE_SECRET_KEY", "minioadmin");
match create_s3_client().await {
Ok(client) => {
// Verify client creation
assert!(client.config().region().is_some());
},
Err(e) => {
// Skip if no S3 server available
println!("S3 client creation failed: {:?}", e);
}
}
}
#[tokio::test]
async fn test_create_bucket() {
if std::env::var("CI").is_ok() {
return; // Skip in CI environment
}
// Setup test environment variables
std::env::set_var("DRIVE_SERVER", "http://localhost:9000");
std::env::set_var("DRIVE_ACCESS_KEY", "minioadmin");
std::env::set_var("DRIVE_SECRET_KEY", "minioadmin");
match create_s3_client().await {
Ok(client) => {
// Verify client creation
assert!(client.config().region().is_some());
},
Err(e) => {
// Skip if no S3 server available
println!("S3 client creation failed: {:?}", e);
}
}
}

View file

@ -1,7 +1,6 @@
use crate::shared::state::AppState;
use log::error;
use opendal::Operator;
use tokio_stream::StreamExt;
use aws_sdk_s3::Client;
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;
@ -17,14 +16,32 @@ pub struct FileState {
pub struct MinIOHandler {
state: Arc<AppState>,
s3: Arc<Client>,
watched_prefixes: Arc<tokio::sync::RwLock<Vec<String>>>,
file_states: Arc<tokio::sync::RwLock<HashMap<String, FileState>>>,
}
pub async fn get_file_content(
client: &aws_sdk_s3::Client,
bucket: &str,
key: &str
) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
let response = client.get_object()
.bucket(bucket)
.key(key)
.send()
.await?;
let bytes = response.body.collect().await?.into_bytes().to_vec();
Ok(bytes)
}
impl MinIOHandler {
pub fn new(state: Arc<AppState>) -> Self {
let client = state.s3_client.as_ref().expect("S3 client must be initialized").clone();
Self {
state,
state: Arc::clone(&state),
s3: Arc::new(client),
watched_prefixes: Arc::new(tokio::sync::RwLock::new(Vec::new())),
file_states: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
}
@ -61,16 +78,9 @@ impl MinIOHandler {
&self,
callback: &Arc<dyn Fn(FileChangeEvent) + Send + Sync>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let op = match &self.state.s3_operator {
Some(op) => op,
None => {
return Ok(());
}
};
let prefixes = self.watched_prefixes.read().await;
for prefix in prefixes.iter() {
if let Err(e) = self.check_prefix_changes(op, prefix, callback).await {
if let Err(e) = self.check_prefix_changes(&self.s3, prefix, callback).await {
error!("Error checking prefix {}: {}", prefix, e);
}
}
@ -79,28 +89,41 @@ impl MinIOHandler {
async fn check_prefix_changes(
&self,
op: &Operator,
client: &Client,
prefix: &str,
callback: &Arc<dyn Fn(FileChangeEvent) + Send + Sync>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut current_files = HashMap::new();
let mut lister = op.lister_with(prefix).recursive(true).await?;
while let Some(entry) = lister.try_next().await? {
let path = entry.path().to_string();
if path.ends_with('/') {
continue;
let mut continuation_token = None;
loop {
let list_objects = client.list_objects_v2()
.bucket(&self.state.bucket_name)
.prefix(prefix)
.set_continuation_token(continuation_token)
.send()
.await?;
for obj in list_objects.contents.unwrap_or_default() {
let path = obj.key().unwrap_or_default().to_string();
if path.ends_with('/') {
continue;
}
let file_state = FileState {
path: path.clone(),
size: obj.size().unwrap_or(0),
etag: obj.e_tag().unwrap_or_default().to_string(),
last_modified: obj.last_modified().map(|dt| dt.to_string()),
};
current_files.insert(path, file_state);
}
let meta = op.stat(&path).await?;
let file_state = FileState {
path: path.clone(),
size: meta.content_length() as i64,
etag: meta.etag().unwrap_or_default().to_string(),
last_modified: meta.last_modified().map(|dt| dt.to_rfc3339()),
};
current_files.insert(path, file_state);
if !list_objects.is_truncated.unwrap_or(false) {
break;
}
continuation_token = list_objects.next_continuation_token;
}
let mut file_states = self.file_states.write().await;
@ -146,7 +169,7 @@ impl MinIOHandler {
pub async fn get_file_state(&self, path: &str) -> Option<FileState> {
let states = self.file_states.read().await;
states.get(path).cloned()
states.get(&path.to_string()).cloned()
}
pub async fn clear_state(&self) {

View file

@ -1,7 +1,8 @@
use crate::shared::models::KBCollection;
use crate::shared::state::AppState;
use log::{ error, info, warn};
use tokio_stream::StreamExt;
// Removed unused import
// Removed duplicate import since we're using the module directly
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;
@ -95,35 +96,16 @@ impl KBManager {
&self,
collection: &KBCollection,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let op = match &self.state.s3_operator {
Some(op) => op,
let _client = match &self.state.s3_client {
Some(client) => client,
None => {
warn!("S3 operator not configured");
warn!("S3 client not configured");
return Ok(());
}
};
let mut lister = op.lister_with(&collection.folder_path).recursive(true).await?;
while let Some(entry) = lister.try_next().await? {
let path = entry.path().to_string();
if path.ends_with('/') {
continue;
}
let meta = op.stat(&path).await?;
if let Err(e) = self
.process_file(
&collection,
&path,
meta.content_length() as i64,
meta.last_modified().map(|dt| dt.to_rfc3339()),
)
.await
{
error!("Error processing file {}: {}", path, e);
}
}
let minio_handler = minio_handler::MinIOHandler::new(self.state.clone());
minio_handler.watch_prefix(collection.folder_path.clone()).await;
Ok(())
}
@ -135,7 +117,8 @@ impl KBManager {
file_size: i64,
_last_modified: Option<String>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let content = self.get_file_content(file_path).await?;
let client = self.state.s3_client.as_ref().ok_or("S3 client not configured")?;
let content = minio_handler::get_file_content(client, &self.state.bucket_name, file_path).await?;
let file_hash = if content.len() > 100 {
format!(
"{:x}_{:x}_{}",
@ -183,20 +166,6 @@ impl KBManager {
Ok(())
}
async fn get_file_content(
&self,
file_path: &str,
) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
let op = self
.state
.s3_operator
.as_ref()
.ok_or("S3 operator not configured")?;
let content = op.read(file_path).await?;
Ok(content.to_vec())
}
async fn extract_text(
&self,
file_path: &str,

View file

@ -36,6 +36,7 @@ mod tools;
mod web_automation;
mod web_server;
mod whatsapp;
mod create_bucket;
use crate::auth::auth_handler;
use crate::automation::AutomationService;
@ -43,7 +44,6 @@ use crate::bootstrap::BootstrapManager;
use crate::bot::{start_session, websocket_handler};
use crate::channels::{VoiceAdapter, WebChannelAdapter};
use crate::config::AppConfig;
use crate::drive_monitor::DriveMonitor;
#[cfg(feature = "email")]
use crate::email::{
get_emails, get_latest_email_from, list_emails, save_click, save_draft, send_email,
@ -59,10 +59,17 @@ use crate::shared::state::AppState;
use crate::web_server::{bot_index, index, static_files};
use crate::whatsapp::whatsapp_webhook_verify;
use crate::whatsapp::WhatsAppAdapter;
use crate::bot::BotOrchestrator;
#[cfg(not(feature = "desktop"))]
#[tokio::main]
async fn main() -> std::io::Result<()> {
// Test bucket creation
match create_bucket::create_bucket("test-bucket") {
Ok(_) => println!("Bucket created successfully"),
Err(e) => eprintln!("Failed to create bucket: {}", e),
}
let args: Vec<String> = std::env::args().collect();
if args.len() > 1 {
let command = &args[1];
@ -89,6 +96,7 @@ async fn main() -> std::io::Result<()> {
}
}
// Rest of the original main function remains unchanged...
dotenv().ok();
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
.write_style(env_logger::WriteStyle::Always)
@ -106,7 +114,7 @@ async fn main() -> std::io::Result<()> {
None
};
let mut bootstrap = BootstrapManager::new(install_mode.clone(), tenant.clone());
let mut bootstrap = BootstrapManager::new(install_mode.clone(), tenant.clone()).await;
// Prevent double bootstrap: skip if environment already initialized
let env_path = std::env::current_dir()?.join("botserver-stack").join(".env");
@ -120,7 +128,7 @@ async fn main() -> std::io::Result<()> {
Err(_) => AppConfig::from_env(),
}
} else {
match bootstrap.bootstrap() {
match bootstrap.bootstrap().await {
Ok(config) => {
info!("Bootstrap completed successfully");
config
@ -138,9 +146,13 @@ async fn main() -> std::io::Result<()> {
}
};
let _ = bootstrap.start_all();
if let Err(e) = bootstrap.upload_templates_to_drive(&cfg).await {
// Start all services (synchronous)
if let Err(e) = bootstrap.start_all() {
log::warn!("Failed to start all services: {}", e);
}
// Upload templates (asynchronous)
if let Err(e) = futures::executor::block_on(bootstrap.upload_templates_to_drive(&cfg)) {
log::warn!("Failed to upload templates to MinIO: {}", e);
}
@ -193,7 +205,6 @@ async fn main() -> std::io::Result<()> {
));
let tool_api = Arc::new(tools::ToolApi::new());
let drive = init_drive(&config.drive)
.await
.expect("Failed to initialize Drive");
@ -209,9 +220,10 @@ async fn main() -> std::io::Result<()> {
)));
let app_state = Arc::new(AppState {
s3_operator: Some(drive.clone()),
s3_client: Some(drive),
config: Some(cfg.clone()),
conn: db_pool.clone(),
bucket_name: "default.gbai".to_string(), // Default bucket name
custom_conn: db_custom_pool.clone(),
redis_client: redis_client.clone(),
session_manager: session_manager.clone(),
@ -246,19 +258,21 @@ async fn main() -> std::io::Result<()> {
.expect("Failed to create runtime for automation");
let local = tokio::task::LocalSet::new();
local.block_on(&rt, async move {
let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string());
let scripts_dir = format!("work/{}.gbai/.gbdialog", bot_guid);
let scripts_dir = "work/default.gbai/.gbdialog".to_string();
let automation = AutomationService::new(automation_state, &scripts_dir);
automation.spawn().await.ok();
});
});
let drive_state = app_state.clone();
let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string());
let bucket_name = format!("{}{}.gbai", cfg.drive.org_prefix, bot_guid);
let drive_monitor = Arc::new(DriveMonitor::new(drive_state, bucket_name));
let _drive_handle = drive_monitor.spawn();
// Initialize bot orchestrator and mount all bots
let bot_orchestrator = BotOrchestrator::new(app_state.clone());
// Mount all active bots from database
if let Err(e) = bot_orchestrator.mount_all_bots().await {
log::error!("Failed to mount bots: {}", e);
}
HttpServer::new(move || {
let cors = Cors::default()
.allow_any_origin()
@ -271,25 +285,21 @@ async fn main() -> std::io::Result<()> {
.wrap(cors)
.wrap(Logger::default())
.wrap(Logger::new("HTTP REQUEST: %a %{User-Agent}i"))
.app_data(web::Data::from(app_state_clone));
app = app
.service(upload_file)
.service(index)
.service(static_files)
.service(websocket_handler)
.app_data(web::Data::from(app_state_clone))
.service(auth_handler)
.service(whatsapp_webhook_verify)
.service(chat_completions_local)
.service(create_session)
.service(embeddings_local)
.service(get_session_history)
.service(get_sessions)
.service(index)
.service(start_session)
.service(upload_file)
.service(voice_start)
.service(voice_stop)
.service(create_session)
.service(get_sessions)
.service(start_session)
.service(get_session_history)
.service(chat_completions_local)
.service(embeddings_local)
.service(bot_index); // Must be last - catches all remaining paths
.service(whatsapp_webhook_verify)
.service(websocket_handler);
#[cfg(feature = "email")]
{
app = app
@ -299,9 +309,12 @@ async fn main() -> std::io::Result<()> {
.service(send_email)
.service(save_draft)
.service(save_click);
}
}
app = app.service(static_files);
app = app.service(bot_index);
app
})
.workers(worker_count)
.bind((config.server.host.clone(), config.server.port))?

View file

@ -89,17 +89,9 @@ impl PackageManager {
),
binary_name: Some("minio".to_string()),
pre_install_cmds_linux: vec![],
post_install_cmds_linux: vec![
"wget https://dl.min.io/client/mc/release/linux-amd64/mc -O {{BIN_PATH}}/mc"
.to_string(),
"chmod +x {{BIN_PATH}}/mc".to_string(),
],
post_install_cmds_linux: vec![],
pre_install_cmds_macos: vec![],
post_install_cmds_macos: vec![
"wget https://dl.min.io/client/mc/release/darwin-amd64/mc -O {{BIN_PATH}}/mc"
.to_string(),
"chmod +x {{BIN_PATH}}/mc".to_string(),
],
post_install_cmds_macos: vec![],
pre_install_cmds_windows: vec![],
post_install_cmds_windows: vec![],
env_vars: HashMap::from([
@ -107,7 +99,7 @@ impl PackageManager {
("DRIVE_ROOT_PASSWORD".to_string(), drive_password.clone()),
]),
data_download_list: Vec::new(),
exec_cmd: "nohup {{BIN_PATH}}/minio server {{DATA_PATH}} --address :9000 --console-address :9001 > {{LOGS_PATH}}/minio.log 2>&1 & sleep 5 && {{BIN_PATH}}/mc alias set drive http://localhost:9000 minioadmin minioadmin && {{BIN_PATH}}/mc admin user add drive $DRIVE_ROOT_USER $DRIVE_ROOT_PASSWORD && {{BIN_PATH}}/mc admin policy attach drive readwrite --user $DRIVE_ROOT_USER && {{BIN_PATH}}/mc mb drive/default.gbai || true".to_string(),
exec_cmd: "nohup {{BIN_PATH}}/minio server {{DATA_PATH}} --address :9000 --console-address :9001 > {{LOGS_PATH}}/minio.log 2>&1 &".to_string(),
},
);

View file

@ -395,7 +395,7 @@ async fn create_session(data: web::Data<AppState>) -> Result<HttpResponse> {
#[actix_web::get("/api/sessions")]
async fn get_sessions(data: web::Data<AppState>) -> Result<HttpResponse> {
let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
let orchestrator = BotOrchestrator::new(Arc::clone(&data));
let orchestrator = BotOrchestrator::new(Arc::new(data.get_ref().clone()));
match orchestrator.get_user_sessions(user_id).await {
Ok(sessions) => Ok(HttpResponse::Ok().json(sessions)),
Err(e) => {
@ -416,7 +416,7 @@ async fn get_session_history(
match Uuid::parse_str(&session_id) {
Ok(session_uuid) => {
let orchestrator = BotOrchestrator::new(Arc::clone(&data));
let orchestrator = BotOrchestrator::new(Arc::new(data.get_ref().clone()));
match orchestrator
.get_conversation_history(session_uuid, user_id)
.await

View file

@ -6,8 +6,8 @@ use crate::session::SessionManager;
use crate::tools::{ToolApi, ToolManager};
use crate::whatsapp::WhatsAppAdapter;
use diesel::{Connection, PgConnection};
use opendal::Operator;
use redis::Client;
use aws_sdk_s3::Client as S3Client;
use redis::Client as RedisClient;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
@ -15,11 +15,12 @@ use tokio::sync::mpsc;
use crate::shared::models::BotResponse;
pub struct AppState {
pub s3_operator: Option<Operator>,
pub s3_client: Option<S3Client>,
pub bucket_name: String,
pub config: Option<AppConfig>,
pub conn: Arc<Mutex<PgConnection>>,
pub custom_conn: Arc<Mutex<PgConnection>>,
pub redis_client: Option<Arc<Client>>,
pub redis_client: Option<Arc<RedisClient>>,
pub session_manager: Arc<tokio::sync::Mutex<SessionManager>>,
pub tool_manager: Arc<ToolManager>,
pub llm_provider: Arc<dyn LLMProvider>,
@ -35,7 +36,8 @@ pub struct AppState {
impl Clone for AppState {
fn clone(&self) -> Self {
Self {
s3_operator: self.s3_operator.clone(),
s3_client: self.s3_client.clone(),
bucket_name: self.bucket_name.clone(),
config: self.config.clone(),
conn: Arc::clone(&self.conn),
custom_conn: Arc::clone(&self.custom_conn),
@ -57,7 +59,8 @@ impl Clone for AppState {
impl Default for AppState {
fn default() -> Self {
Self {
s3_operator: None,
s3_client: None,
bucket_name: "default.gbai".to_string(),
config: None,
conn: Arc::new(Mutex::new(
diesel::PgConnection::establish("postgres://localhost/test").unwrap(),

View file

@ -13,6 +13,7 @@ use std::io::Write;
use std::str::FromStr;
use tempfile::NamedTempFile;
#[tokio::test]
async fn test_successful_file_upload() -> Result<()> {
// Setup test environment and MinIO client

View file

@ -26,7 +26,7 @@ async fn bot_index(req: HttpRequest) -> Result<HttpResponse> {
}
}
#[actix_web::get("/{filename:.*}")]
#[actix_web::get("/static/{filename:.*}")]
async fn static_files(req: HttpRequest) -> Result<HttpResponse> {
let filename = req.match_info().query("filename");
let path = format!("web/html/{}", filename);