diff --git a/add-req.sh b/add-req.sh index 0fce6738..89f9a9f2 100755 --- a/add-req.sh +++ b/add-req.sh @@ -22,9 +22,9 @@ dirs=( # "auth" # "automation" # "basic" - # "bot" + "bot" "bootstrap" - "package_manager" + #"package_manager" # "channels" # "config" # "context" diff --git a/src/bootstrap/mod.rs b/src/bootstrap/mod.rs index 9e48a403..1670fd1b 100644 --- a/src/bootstrap/mod.rs +++ b/src/bootstrap/mod.rs @@ -1,7 +1,7 @@ use crate::config::AppConfig; use crate::package_manager::{InstallMode, PackageManager}; -use anyhow::{Result, Context}; -use diesel::{connection::SimpleConnection, RunQueryDsl, Connection, QueryableByName}; +use anyhow::Result; +use diesel::{connection::SimpleConnection, RunQueryDsl, Connection, QueryableByName, Selectable}; use dotenvy::dotenv; use log::{debug, error, info, trace}; use aws_sdk_s3::Client; @@ -14,8 +14,14 @@ use std::path::Path; use std::process::Command; use std::sync::{Arc, Mutex}; use uuid::Uuid; +use diesel::SelectableHelper; + +use diesel::Queryable; #[derive(QueryableByName)] +#[diesel(check_for_backend(diesel::pg::Pg))] +#[derive(Queryable)] +#[diesel(table_name = crate::shared::models::schema::bots)] struct BotIdRow { #[diesel(sql_type = diesel::sql_types::Uuid)] id: uuid::Uuid, @@ -141,8 +147,8 @@ impl BootstrapManager { let mut conn = diesel::pg::PgConnection::establish(&database_url) .map_err(|e| anyhow::anyhow!("Failed to connect to database: {}", e))?; let default_bot_id: uuid::Uuid = diesel::sql_query("SELECT id FROM bots LIMIT 1") - .get_result::(&mut conn) - .map(|row| row.id) + .load::(&mut conn) + .map(|rows| rows.first().map(|r| r.id).unwrap_or_else(|| uuid::Uuid::new_v4())) .unwrap_or_else(|_| uuid::Uuid::new_v4()); if let Err(e) = self.update_bot_config(&default_bot_id, component.name) { @@ -551,7 +557,7 @@ impl BootstrapManager { let database_url = std::env::var("DATABASE_URL") .unwrap_or_else(|_| "postgres://gbuser:@localhost:5432/botserver".to_string()); // Create new connection for config loading - let mut config_conn = diesel::PgConnection::establish(&database_url)?; + let config_conn = diesel::PgConnection::establish(&database_url)?; let config_manager = ConfigManager::new(Arc::new(Mutex::new(config_conn))); // Use default bot ID or create one if needed diff --git a/src/bot/mod.rs b/src/bot/mod.rs index bd56ba6d..88848dee 100644 --- a/src/bot/mod.rs +++ b/src/bot/mod.rs @@ -3,11 +3,12 @@ use crate::shared::models::{BotResponse, UserMessage, UserSession}; use crate::shared::state::AppState; use actix_web::{web, HttpRequest, HttpResponse, Result}; use actix_ws::Message as WsMessage; +use futures::TryFutureExt; use log::{debug, error, info, warn}; use chrono::Utc; use serde_json; use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use tokio::sync::mpsc; use crate::kb::embeddings::generate_embeddings; use uuid::Uuid; @@ -16,13 +17,119 @@ use crate::kb::qdrant_client::{ensure_collection_exists, get_qdrant_client, Qdra use crate::context::langcache::{get_langcache_client}; +use crate::drive_monitor::DriveMonitor; + +use tokio::sync::Mutex as AsyncMutex; + pub struct BotOrchestrator { pub state: Arc, + pub mounted_bots: Arc>>>, } impl BotOrchestrator { + /// Creates a new BotOrchestrator instance pub fn new(state: Arc) -> Self { - Self { state } + Self { + state, + mounted_bots: Arc::new(AsyncMutex::new(HashMap::new())), + } + } + + /// Mounts all available bots from the database table + pub async fn mount_all_bots(&self) -> Result<(), Box> { + info!("Mounting all available bots from database"); + + use crate::shared::models::schema::bots::dsl::*; + use diesel::prelude::*; + + let mut db_conn = self.state.conn.lock().unwrap(); + let active_bots = bots + .filter(is_active.eq(true)) + .select(id) + .load::(&mut *db_conn) + .map_err(|e| { + error!("Failed to query active bots: {}", e); + e + })?; + + for bot_guid in active_bots { + if let Err(e) = self.mount_bot(&bot_guid.to_string()).await { + error!("Failed to mount bot {}: {}", bot_guid, e); + // Continue mounting other bots even if one fails + continue; + } + } + + Ok(()) + } + + /// Creates a new bot with its storage bucket + pub async fn create_bot(&self, bot_guid: &str) -> Result<(), Box> { + let bucket_name = format!("{}{}.gbai", self.state.config.as_ref().unwrap().drive.org_prefix, bot_guid); + // Generate a new GUID if needed + + // Create bucket in storage + crate::create_bucket::create_bucket(&bucket_name)?; + + // TODO: Add bot to database + Ok(()) + } + + /// Mounts a bot by activating its resources (drive monitor, etc) + pub async fn mount_bot(&self, bot_guid: &str) -> Result<(), Box> { + // Remove .gbai suffix if present to normalize bot GUID + let bot_guid = bot_guid.strip_suffix(".gbai").unwrap_or(bot_guid); + info!("Mounting bot: {}", bot_guid); + let bot_guid = bot_guid.to_string(); // Ensure we have an owned String + + let config = self.state.config.as_ref().ok_or("AppConfig not initialized")?; + // Use bot_guid directly without appending .gbai since it's now part of the ID + use diesel::prelude::*; +use crate::shared::models::schema::bots::dsl::*; + +let mut db_conn = self.state.conn.lock().unwrap(); +let bot_name: String = bots + .filter(id.eq(Uuid::parse_str(&bot_guid)?)) + .select(name) + .first(&mut *db_conn) + .map_err(|e| { + error!("Failed to query bot name for {}: {}", bot_guid, e); + e + })?; + +let bucket_name = format!("{}.gbai", bot_name); + + + // Check if bot is already mounted + { + let mounted_bots = self.mounted_bots.lock().await; + if mounted_bots.contains_key(&bot_guid) { + warn!("Bot {} is already mounted", bot_guid); + return Ok(()); + } + } + + // Initialize and spawn drive monitor asynchronously + let drive_monitor = Arc::new(DriveMonitor::new(self.state.clone(), bucket_name)); + let drive_monitor_clone = drive_monitor.clone(); + // Clone bot_guid to avoid moving it into the async block + let bot_guid_clone = bot_guid.clone(); + tokio::spawn(async move { + if let Err(e) = drive_monitor_clone.spawn().await { + error!("Failed to spawn drive monitor for bot {}: {}", bot_guid_clone, e); + } + }); + + // Track mounted bot + let guid = bot_guid.clone(); + let drive_monitor_clone = drive_monitor.clone(); + { + let mut mounted_bots = self.mounted_bots.lock().await; + mounted_bots.insert(guid, drive_monitor_clone); + } + + info!("Successfully mounted bot: {}", bot_guid); + Ok(()) } pub async fn handle_user_input( @@ -165,15 +272,8 @@ impl BotOrchestrator { e })?; - let bot_id = if let Ok(bot_guid) = std::env::var("BOT_GUID") { - Uuid::parse_str(&bot_guid).map_err(|e| { - warn!("Invalid BOT_GUID from env: {}", e); - e - })? - } else { - warn!("BOT_GUID not set in environment, using nil UUID"); - Uuid::nil() - }; + let bot_id = Uuid::nil(); // Using nil UUID for default bot + // Default to announcements bot let session = { let mut sm = self.state.session_manager.lock().await; @@ -669,7 +769,7 @@ impl BotOrchestrator { "Running start script for session: {} with token: {:?}", session.id, token ); - let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string()); + let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| String::from("default_bot")); let start_script_path = format!("./{}.gbai/.gbdialog/start.bas", bot_guid); let start_script = match std::fs::read_to_string(&start_script_path) { Ok(content) => content, @@ -808,6 +908,7 @@ impl Default for BotOrchestrator { fn default() -> Self { Self { state: Arc::new(AppState::default()), + mounted_bots: Arc::new(AsyncMutex::new(HashMap::new())), } } } diff --git a/src/file/mod.rs b/src/file/mod.rs index 08d74dc6..2f9d2927 100644 --- a/src/file/mod.rs +++ b/src/file/mod.rs @@ -3,17 +3,12 @@ use crate::shared::state::AppState; use actix_multipart::Multipart; use actix_web::web; use actix_web::{post, HttpResponse}; -use base64::Engine; use aws_sdk_s3::{Client as S3Client, config::Builder as S3ConfigBuilder}; use aws_config::BehaviorVersion; -// Removed unused import use std::io::Write; use tempfile::NamedTempFile; use tokio_stream::StreamExt as TokioStreamExt; -use reqwest::Client as HttpClient; -use hmac::{Hmac, Mac}; -use sha2::Sha256; -use chrono::Utc; +// Removed unused import #[post("/files/upload/{folder_path}")] pub async fn upload_file( diff --git a/src/main.rs b/src/main.rs index 3ca7d7de..6e3f1322 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,6 @@ use actix_web::{web, App, HttpServer}; use dotenvy::dotenv; use log::info; use std::collections::HashMap; -use std::env; use std::sync::{Arc, Mutex}; mod auth; @@ -45,7 +44,6 @@ use crate::bootstrap::BootstrapManager; use crate::bot::{start_session, websocket_handler}; use crate::channels::{VoiceAdapter, WebChannelAdapter}; use crate::config::AppConfig; -use crate::drive_monitor::DriveMonitor; #[cfg(feature = "email")] use crate::email::{ get_emails, get_latest_email_from, list_emails, save_click, save_draft, send_email, @@ -61,6 +59,7 @@ use crate::shared::state::AppState; use crate::web_server::{bot_index, index, static_files}; use crate::whatsapp::whatsapp_webhook_verify; use crate::whatsapp::WhatsAppAdapter; +use crate::bot::BotOrchestrator; #[cfg(not(feature = "desktop"))] #[tokio::main] @@ -222,9 +221,9 @@ async fn main() -> std::io::Result<()> { let app_state = Arc::new(AppState { s3_client: Some(drive), - bucket_name: format!("{}{}.gbai", cfg.drive.org_prefix, env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string())), config: Some(cfg.clone()), conn: db_pool.clone(), + bucket_name: "default.gbai".to_string(), // Default bucket name custom_conn: db_custom_pool.clone(), redis_client: redis_client.clone(), session_manager: session_manager.clone(), @@ -259,19 +258,21 @@ async fn main() -> std::io::Result<()> { .expect("Failed to create runtime for automation"); let local = tokio::task::LocalSet::new(); local.block_on(&rt, async move { - let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string()); - let scripts_dir = format!("work/{}.gbai/.gbdialog", bot_guid); + let scripts_dir = "work/default.gbai/.gbdialog".to_string(); let automation = AutomationService::new(automation_state, &scripts_dir); automation.spawn().await.ok(); }); }); - let drive_state = app_state.clone(); - let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string()); - let bucket_name = format!("{}{}.gbai", cfg.drive.org_prefix, bot_guid); - let drive_monitor = Arc::new(DriveMonitor::new(drive_state, bucket_name)); - let _drive_handle = drive_monitor.spawn(); + // Initialize bot orchestrator and mount all bots + let bot_orchestrator = BotOrchestrator::new(app_state.clone()); + + // Mount all active bots from database + if let Err(e) = bot_orchestrator.mount_all_bots().await { + log::error!("Failed to mount bots: {}", e); + } + HttpServer::new(move || { let cors = Cors::default() .allow_any_origin() diff --git a/src/session/mod.rs b/src/session/mod.rs index ee291265..61c99e53 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -395,7 +395,7 @@ async fn create_session(data: web::Data) -> Result { #[actix_web::get("/api/sessions")] async fn get_sessions(data: web::Data) -> Result { let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(); - let orchestrator = BotOrchestrator::new(Arc::clone(&data)); + let orchestrator = BotOrchestrator::new(Arc::new(data.get_ref().clone())); match orchestrator.get_user_sessions(user_id).await { Ok(sessions) => Ok(HttpResponse::Ok().json(sessions)), Err(e) => { @@ -416,7 +416,7 @@ async fn get_session_history( match Uuid::parse_str(&session_id) { Ok(session_uuid) => { - let orchestrator = BotOrchestrator::new(Arc::clone(&data)); + let orchestrator = BotOrchestrator::new(Arc::new(data.get_ref().clone())); match orchestrator .get_conversation_history(session_uuid, user_id) .await