feat(bot, bootstrap): add bot mounting and improve DB handling

Introduce bot mounting logic in `BotOrchestrator` to load and manage active bots from the database. Enhance `BootstrapManager` by refining Diesel query usage and connection handling for better reliability and maintainability.
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-10-30 18:48:16 -03:00
parent d803ffd24e
commit f85b19efda
6 changed files with 140 additions and 37 deletions

View file

@ -22,9 +22,9 @@ dirs=(
# "auth"
# "automation"
# "basic"
# "bot"
"bot"
"bootstrap"
"package_manager"
#"package_manager"
# "channels"
# "config"
# "context"

View file

@ -1,7 +1,7 @@
use crate::config::AppConfig;
use crate::package_manager::{InstallMode, PackageManager};
use anyhow::{Result, Context};
use diesel::{connection::SimpleConnection, RunQueryDsl, Connection, QueryableByName};
use anyhow::Result;
use diesel::{connection::SimpleConnection, RunQueryDsl, Connection, QueryableByName, Selectable};
use dotenvy::dotenv;
use log::{debug, error, info, trace};
use aws_sdk_s3::Client;
@ -14,8 +14,14 @@ use std::path::Path;
use std::process::Command;
use std::sync::{Arc, Mutex};
use uuid::Uuid;
use diesel::SelectableHelper;
use diesel::Queryable;
#[derive(QueryableByName)]
#[diesel(check_for_backend(diesel::pg::Pg))]
#[derive(Queryable)]
#[diesel(table_name = crate::shared::models::schema::bots)]
struct BotIdRow {
#[diesel(sql_type = diesel::sql_types::Uuid)]
id: uuid::Uuid,
@ -141,8 +147,8 @@ impl BootstrapManager {
let mut conn = diesel::pg::PgConnection::establish(&database_url)
.map_err(|e| anyhow::anyhow!("Failed to connect to database: {}", e))?;
let default_bot_id: uuid::Uuid = diesel::sql_query("SELECT id FROM bots LIMIT 1")
.get_result::<BotIdRow>(&mut conn)
.map(|row| row.id)
.load::<BotIdRow>(&mut conn)
.map(|rows| rows.first().map(|r| r.id).unwrap_or_else(|| uuid::Uuid::new_v4()))
.unwrap_or_else(|_| uuid::Uuid::new_v4());
if let Err(e) = self.update_bot_config(&default_bot_id, component.name) {
@ -551,7 +557,7 @@ impl BootstrapManager {
let database_url = std::env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://gbuser:@localhost:5432/botserver".to_string());
// Create new connection for config loading
let mut config_conn = diesel::PgConnection::establish(&database_url)?;
let config_conn = diesel::PgConnection::establish(&database_url)?;
let config_manager = ConfigManager::new(Arc::new(Mutex::new(config_conn)));
// Use default bot ID or create one if needed

View file

@ -3,11 +3,12 @@ use crate::shared::models::{BotResponse, UserMessage, UserSession};
use crate::shared::state::AppState;
use actix_web::{web, HttpRequest, HttpResponse, Result};
use actix_ws::Message as WsMessage;
use futures::TryFutureExt;
use log::{debug, error, info, warn};
use chrono::Utc;
use serde_json;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
use crate::kb::embeddings::generate_embeddings;
use uuid::Uuid;
@ -16,13 +17,119 @@ use crate::kb::qdrant_client::{ensure_collection_exists, get_qdrant_client, Qdra
use crate::context::langcache::{get_langcache_client};
use crate::drive_monitor::DriveMonitor;
use tokio::sync::Mutex as AsyncMutex;
pub struct BotOrchestrator {
pub state: Arc<AppState>,
pub mounted_bots: Arc<AsyncMutex<HashMap<String, Arc<DriveMonitor>>>>,
}
impl BotOrchestrator {
/// Creates a new BotOrchestrator instance
pub fn new(state: Arc<AppState>) -> Self {
Self { state }
Self {
state,
mounted_bots: Arc::new(AsyncMutex::new(HashMap::new())),
}
}
/// Mounts all available bots from the database table
pub async fn mount_all_bots(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!("Mounting all available bots from database");
use crate::shared::models::schema::bots::dsl::*;
use diesel::prelude::*;
let mut db_conn = self.state.conn.lock().unwrap();
let active_bots = bots
.filter(is_active.eq(true))
.select(id)
.load::<uuid::Uuid>(&mut *db_conn)
.map_err(|e| {
error!("Failed to query active bots: {}", e);
e
})?;
for bot_guid in active_bots {
if let Err(e) = self.mount_bot(&bot_guid.to_string()).await {
error!("Failed to mount bot {}: {}", bot_guid, e);
// Continue mounting other bots even if one fails
continue;
}
}
Ok(())
}
/// Creates a new bot with its storage bucket
pub async fn create_bot(&self, bot_guid: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let bucket_name = format!("{}{}.gbai", self.state.config.as_ref().unwrap().drive.org_prefix, bot_guid);
// Generate a new GUID if needed
// Create bucket in storage
crate::create_bucket::create_bucket(&bucket_name)?;
// TODO: Add bot to database
Ok(())
}
/// Mounts a bot by activating its resources (drive monitor, etc)
pub async fn mount_bot(&self, bot_guid: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Remove .gbai suffix if present to normalize bot GUID
let bot_guid = bot_guid.strip_suffix(".gbai").unwrap_or(bot_guid);
info!("Mounting bot: {}", bot_guid);
let bot_guid = bot_guid.to_string(); // Ensure we have an owned String
let config = self.state.config.as_ref().ok_or("AppConfig not initialized")?;
// Use bot_guid directly without appending .gbai since it's now part of the ID
use diesel::prelude::*;
use crate::shared::models::schema::bots::dsl::*;
let mut db_conn = self.state.conn.lock().unwrap();
let bot_name: String = bots
.filter(id.eq(Uuid::parse_str(&bot_guid)?))
.select(name)
.first(&mut *db_conn)
.map_err(|e| {
error!("Failed to query bot name for {}: {}", bot_guid, e);
e
})?;
let bucket_name = format!("{}.gbai", bot_name);
// Check if bot is already mounted
{
let mounted_bots = self.mounted_bots.lock().await;
if mounted_bots.contains_key(&bot_guid) {
warn!("Bot {} is already mounted", bot_guid);
return Ok(());
}
}
// Initialize and spawn drive monitor asynchronously
let drive_monitor = Arc::new(DriveMonitor::new(self.state.clone(), bucket_name));
let drive_monitor_clone = drive_monitor.clone();
// Clone bot_guid to avoid moving it into the async block
let bot_guid_clone = bot_guid.clone();
tokio::spawn(async move {
if let Err(e) = drive_monitor_clone.spawn().await {
error!("Failed to spawn drive monitor for bot {}: {}", bot_guid_clone, e);
}
});
// Track mounted bot
let guid = bot_guid.clone();
let drive_monitor_clone = drive_monitor.clone();
{
let mut mounted_bots = self.mounted_bots.lock().await;
mounted_bots.insert(guid, drive_monitor_clone);
}
info!("Successfully mounted bot: {}", bot_guid);
Ok(())
}
pub async fn handle_user_input(
@ -165,15 +272,8 @@ impl BotOrchestrator {
e
})?;
let bot_id = if let Ok(bot_guid) = std::env::var("BOT_GUID") {
Uuid::parse_str(&bot_guid).map_err(|e| {
warn!("Invalid BOT_GUID from env: {}", e);
e
})?
} else {
warn!("BOT_GUID not set in environment, using nil UUID");
Uuid::nil()
};
let bot_id = Uuid::nil(); // Using nil UUID for default bot
// Default to announcements bot
let session = {
let mut sm = self.state.session_manager.lock().await;
@ -669,7 +769,7 @@ impl BotOrchestrator {
"Running start script for session: {} with token: {:?}",
session.id, token
);
let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string());
let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| String::from("default_bot"));
let start_script_path = format!("./{}.gbai/.gbdialog/start.bas", bot_guid);
let start_script = match std::fs::read_to_string(&start_script_path) {
Ok(content) => content,
@ -808,6 +908,7 @@ impl Default for BotOrchestrator {
fn default() -> Self {
Self {
state: Arc::new(AppState::default()),
mounted_bots: Arc::new(AsyncMutex::new(HashMap::new())),
}
}
}

View file

@ -3,17 +3,12 @@ use crate::shared::state::AppState;
use actix_multipart::Multipart;
use actix_web::web;
use actix_web::{post, HttpResponse};
use base64::Engine;
use aws_sdk_s3::{Client as S3Client, config::Builder as S3ConfigBuilder};
use aws_config::BehaviorVersion;
// Removed unused import
use std::io::Write;
use tempfile::NamedTempFile;
use tokio_stream::StreamExt as TokioStreamExt;
use reqwest::Client as HttpClient;
use hmac::{Hmac, Mac};
use sha2::Sha256;
use chrono::Utc;
// Removed unused import
#[post("/files/upload/{folder_path}")]
pub async fn upload_file(

View file

@ -7,7 +7,6 @@ use actix_web::{web, App, HttpServer};
use dotenvy::dotenv;
use log::info;
use std::collections::HashMap;
use std::env;
use std::sync::{Arc, Mutex};
mod auth;
@ -45,7 +44,6 @@ use crate::bootstrap::BootstrapManager;
use crate::bot::{start_session, websocket_handler};
use crate::channels::{VoiceAdapter, WebChannelAdapter};
use crate::config::AppConfig;
use crate::drive_monitor::DriveMonitor;
#[cfg(feature = "email")]
use crate::email::{
get_emails, get_latest_email_from, list_emails, save_click, save_draft, send_email,
@ -61,6 +59,7 @@ use crate::shared::state::AppState;
use crate::web_server::{bot_index, index, static_files};
use crate::whatsapp::whatsapp_webhook_verify;
use crate::whatsapp::WhatsAppAdapter;
use crate::bot::BotOrchestrator;
#[cfg(not(feature = "desktop"))]
#[tokio::main]
@ -222,9 +221,9 @@ async fn main() -> std::io::Result<()> {
let app_state = Arc::new(AppState {
s3_client: Some(drive),
bucket_name: format!("{}{}.gbai", cfg.drive.org_prefix, env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string())),
config: Some(cfg.clone()),
conn: db_pool.clone(),
bucket_name: "default.gbai".to_string(), // Default bucket name
custom_conn: db_custom_pool.clone(),
redis_client: redis_client.clone(),
session_manager: session_manager.clone(),
@ -259,19 +258,21 @@ async fn main() -> std::io::Result<()> {
.expect("Failed to create runtime for automation");
let local = tokio::task::LocalSet::new();
local.block_on(&rt, async move {
let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string());
let scripts_dir = format!("work/{}.gbai/.gbdialog", bot_guid);
let scripts_dir = "work/default.gbai/.gbdialog".to_string();
let automation = AutomationService::new(automation_state, &scripts_dir);
automation.spawn().await.ok();
});
});
let drive_state = app_state.clone();
let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string());
let bucket_name = format!("{}{}.gbai", cfg.drive.org_prefix, bot_guid);
let drive_monitor = Arc::new(DriveMonitor::new(drive_state, bucket_name));
let _drive_handle = drive_monitor.spawn();
// Initialize bot orchestrator and mount all bots
let bot_orchestrator = BotOrchestrator::new(app_state.clone());
// Mount all active bots from database
if let Err(e) = bot_orchestrator.mount_all_bots().await {
log::error!("Failed to mount bots: {}", e);
}
HttpServer::new(move || {
let cors = Cors::default()
.allow_any_origin()

View file

@ -395,7 +395,7 @@ async fn create_session(data: web::Data<AppState>) -> Result<HttpResponse> {
#[actix_web::get("/api/sessions")]
async fn get_sessions(data: web::Data<AppState>) -> Result<HttpResponse> {
let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
let orchestrator = BotOrchestrator::new(Arc::clone(&data));
let orchestrator = BotOrchestrator::new(Arc::new(data.get_ref().clone()));
match orchestrator.get_user_sessions(user_id).await {
Ok(sessions) => Ok(HttpResponse::Ok().json(sessions)),
Err(e) => {
@ -416,7 +416,7 @@ async fn get_session_history(
match Uuid::parse_str(&session_id) {
Ok(session_uuid) => {
let orchestrator = BotOrchestrator::new(Arc::clone(&data));
let orchestrator = BotOrchestrator::new(Arc::new(data.get_ref().clone()));
match orchestrator
.get_conversation_history(session_uuid, user_id)
.await