diff --git a/src/bot/mod.rs b/src/bot/mod.rs index 88848dee..f4b90e72 100644 --- a/src/bot/mod.rs +++ b/src/bot/mod.rs @@ -3,22 +3,17 @@ use crate::shared::models::{BotResponse, UserMessage, UserSession}; use crate::shared::state::AppState; use actix_web::{web, HttpRequest, HttpResponse, Result}; use actix_ws::Message as WsMessage; -use futures::TryFutureExt; use log::{debug, error, info, warn}; use chrono::Utc; use serde_json; use std::collections::HashMap; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use tokio::sync::mpsc; use crate::kb::embeddings::generate_embeddings; use uuid::Uuid; - use crate::kb::qdrant_client::{ensure_collection_exists, get_qdrant_client, QdrantPoint}; -use crate::context::langcache::{get_langcache_client}; - - +use crate::context::langcache::get_langcache_client; use crate::drive_monitor::DriveMonitor; - use tokio::sync::Mutex as AsyncMutex; pub struct BotOrchestrator { @@ -27,7 +22,6 @@ pub struct BotOrchestrator { } impl BotOrchestrator { - /// Creates a new BotOrchestrator instance pub fn new(state: Arc) -> Self { Self { state, @@ -35,10 +29,7 @@ impl BotOrchestrator { } } - /// Mounts all available bots from the database table pub async fn mount_all_bots(&self) -> Result<(), Box> { - info!("Mounting all available bots from database"); - use crate::shared::models::schema::bots::dsl::*; use diesel::prelude::*; @@ -53,54 +44,89 @@ impl BotOrchestrator { })?; for bot_guid in active_bots { - if let Err(e) = self.mount_bot(&bot_guid.to_string()).await { - error!("Failed to mount bot {}: {}", bot_guid, e); - // Continue mounting other bots even if one fails - continue; - } + let state_clone = self.state.clone(); + let mounted_bots_clone = self.mounted_bots.clone(); + let bot_guid_str = bot_guid.to_string(); + + tokio::spawn(async move { + if let Err(e) = Self::mount_bot_task(state_clone, mounted_bots_clone, bot_guid_str.clone()).await { + error!("Failed to mount bot {}: {}", bot_guid_str, e); + } + }); } Ok(()) } - /// Creates a new bot with its storage bucket - pub async fn create_bot(&self, bot_guid: &str) -> Result<(), Box> { - let bucket_name = format!("{}{}.gbai", self.state.config.as_ref().unwrap().drive.org_prefix, bot_guid); - // Generate a new GUID if needed - - // Create bucket in storage - crate::create_bucket::create_bucket(&bucket_name)?; + async fn mount_bot_task( + state: Arc, + mounted_bots: Arc>>>, + bot_guid: String, + ) -> Result<(), Box> { + use diesel::prelude::*; + use crate::shared::models::schema::bots::dsl::*; - // TODO: Add bot to database + let bot_name: String = { + let mut db_conn = state.conn.lock().unwrap(); + bots + .filter(id.eq(Uuid::parse_str(&bot_guid)?)) + .select(name) + .first(&mut *db_conn) + .map_err(|e| { + error!("Failed to query bot name for {}: {}", bot_guid, e); + e + })? + }; + + let bucket_name = format!("{}.gbai", bot_name); + + { + let mounted = mounted_bots.lock().await; + if mounted.contains_key(&bot_guid) { + warn!("Bot {} is already mounted", bot_guid); + return Ok(()); + } + } + + let drive_monitor = Arc::new(DriveMonitor::new(state.clone(), bucket_name)); + + let _handle = drive_monitor.clone().spawn().await; + + { + let mut mounted = mounted_bots.lock().await; + mounted.insert(bot_guid.clone(), drive_monitor); + } + + info!("Bot {} mounted successfully", bot_guid); + Ok(()) + } + + pub async fn create_bot(&self, bot_guid: &str) -> Result<(), Box> { + let bucket_name = format!("{}{}.gbai", self.state.config.as_ref().unwrap().drive.org_prefix, bot_guid); + crate::create_bucket::create_bucket(&bucket_name)?; Ok(()) } - /// Mounts a bot by activating its resources (drive monitor, etc) pub async fn mount_bot(&self, bot_guid: &str) -> Result<(), Box> { - // Remove .gbai suffix if present to normalize bot GUID - let bot_guid = bot_guid.strip_suffix(".gbai").unwrap_or(bot_guid); - info!("Mounting bot: {}", bot_guid); - let bot_guid = bot_guid.to_string(); // Ensure we have an owned String + let bot_guid = bot_guid.strip_suffix(".gbai").unwrap_or(bot_guid).to_string(); - let config = self.state.config.as_ref().ok_or("AppConfig not initialized")?; - // Use bot_guid directly without appending .gbai since it's now part of the ID use diesel::prelude::*; -use crate::shared::models::schema::bots::dsl::*; + use crate::shared::models::schema::bots::dsl::*; -let mut db_conn = self.state.conn.lock().unwrap(); -let bot_name: String = bots - .filter(id.eq(Uuid::parse_str(&bot_guid)?)) - .select(name) - .first(&mut *db_conn) - .map_err(|e| { - error!("Failed to query bot name for {}: {}", bot_guid, e); - e - })?; + let bot_name: String = { + let mut db_conn = self.state.conn.lock().unwrap(); + bots + .filter(id.eq(Uuid::parse_str(&bot_guid)?)) + .select(name) + .first(&mut *db_conn) + .map_err(|e| { + error!("Failed to query bot name for {}: {}", bot_guid, e); + e + })? + }; + + let bucket_name = format!("{}.gbai", bot_name); -let bucket_name = format!("{}.gbai", bot_name); - - - // Check if bot is already mounted { let mounted_bots = self.mounted_bots.lock().await; if mounted_bots.contains_key(&bot_guid) { @@ -109,26 +135,15 @@ let bucket_name = format!("{}.gbai", bot_name); } } - // Initialize and spawn drive monitor asynchronously let drive_monitor = Arc::new(DriveMonitor::new(self.state.clone(), bucket_name)); - let drive_monitor_clone = drive_monitor.clone(); - // Clone bot_guid to avoid moving it into the async block - let bot_guid_clone = bot_guid.clone(); - tokio::spawn(async move { - if let Err(e) = drive_monitor_clone.spawn().await { - error!("Failed to spawn drive monitor for bot {}: {}", bot_guid_clone, e); - } - }); + + let _handle = drive_monitor.clone().spawn().await; - // Track mounted bot - let guid = bot_guid.clone(); - let drive_monitor_clone = drive_monitor.clone(); { let mut mounted_bots = self.mounted_bots.lock().await; - mounted_bots.insert(guid, drive_monitor_clone); + mounted_bots.insert(bot_guid.clone(), drive_monitor); } - info!("Successfully mounted bot: {}", bot_guid); Ok(()) } @@ -137,10 +152,7 @@ let bucket_name = format!("{}.gbai", bot_name); session_id: Uuid, user_input: &str, ) -> Result, Box> { - info!( - "Handling user input for session {}: '{}'", - session_id, user_input - ); + info!("Handling user input for session {}: '{}'", session_id, user_input); let mut session_manager = self.state.session_manager.lock().await; session_manager.provide_input(session_id, user_input.to_string())?; Ok(None) @@ -181,10 +193,7 @@ let bucket_name = format!("{}.gbai", bot_name); bot_id: &str, mode: i32, ) -> Result<(), Box> { - info!( - "Setting answer mode for user {} with bot {} to mode {}", - user_id, bot_id, mode - ); + info!("Setting answer mode for user {} with bot {} to mode {}", user_id, bot_id, mode); let mut session_manager = self.state.session_manager.lock().await; session_manager.update_answer_mode(user_id, bot_id, mode)?; Ok(()) @@ -199,10 +208,7 @@ let bucket_name = format!("{}.gbai", bot_name); event_type: &str, data: serde_json::Value, ) -> Result<(), Box> { - info!( - "Sending event '{}' to session {} on channel {}", - event_type, session_id, channel - ); + info!("Sending event '{}' to session {} on channel {}", event_type, session_id, channel); let event_response = BotResponse { bot_id: bot_id.to_string(), user_id: user_id.to_string(), @@ -220,8 +226,9 @@ let bucket_name = format!("{}.gbai", bot_name); if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) { adapter.send_message(event_response).await?; } else { - warn!("No channel adapter found for channel 1: {}", channel); + warn!("No channel adapter found for channel: {}", channel); } + Ok(()) } @@ -231,10 +238,7 @@ let bucket_name = format!("{}.gbai", bot_name); channel: &str, content: &str, ) -> Result<(), Box> { - info!( - "Sending direct message to session {}: '{}'", - session_id, content - ); + info!("Sending direct message to session {}: '{}'", session_id, content); let bot_response = BotResponse { bot_id: "default_bot".to_string(), user_id: "default_user".to_string(), @@ -249,8 +253,9 @@ let bucket_name = format!("{}.gbai", bot_name); if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) { adapter.send_message(bot_response).await?; } else { - warn!("No channel adapter found for channel 2: {}", channel); + warn!("No channel adapter found for direct message on channel: {}", channel); } + Ok(()) } @@ -258,53 +263,35 @@ let bucket_name = format!("{}.gbai", bot_name); &self, message: UserMessage, ) -> Result<(), Box> { - info!( - "Processing message from channel: {}, user: {}, session: {}", - message.channel, message.user_id, message.session_id - ); - debug!( - "Message content: '{}', type: {}", - message.content, message.message_type - ); + info!("Processing message from channel: {}, user: {}, session: {}", message.channel, message.user_id, message.session_id); + debug!("Message content: '{}', type: {}", message.content, message.message_type); let user_id = Uuid::parse_str(&message.user_id).map_err(|e| { error!("Invalid user ID provided: {}", e); e })?; - let bot_id = Uuid::nil(); // Using nil UUID for default bot - // Default to announcements bot - + let bot_id = Uuid::nil(); let session = { let mut sm = self.state.session_manager.lock().await; let session_id = Uuid::parse_str(&message.session_id).map_err(|e| { error!("Invalid session ID: {}", e); e })?; + match sm.get_session_by_id(session_id)? { Some(session) => session, None => { - error!( - "Failed to create session for user {} with bot {}", - user_id, bot_id - ); + error!("Failed to create session for user {} with bot {}", user_id, bot_id); return Err("Failed to create session".into()); } } }; if self.is_waiting_for_input(session.id).await { - debug!( - "Session {} is waiting for input, processing as variable input", - session.id - ); - if let Some(variable_name) = - self.handle_user_input(session.id, &message.content).await? - { - debug!( - "Stored user input in variable '{}' for session {}", - variable_name, session.id - ); + debug!("Session {} is waiting for input, processing as variable input", session.id); + if let Some(variable_name) = self.handle_user_input(session.id, &message.content).await? { + info!("Stored user input in variable '{}' for session {}", variable_name, session.id); if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) { let ack_response = BotResponse { bot_id: message.bot_id.clone(), @@ -363,10 +350,7 @@ let bucket_name = format!("{}.gbai", bot_name); if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) { adapter.send_message(bot_response).await?; } else { - warn!( - "No channel adapter found for channel 3: {}", - message.channel - ); + warn!("No channel adapter found for message channel: {}", message.channel); } Ok(()) @@ -398,7 +382,6 @@ let bucket_name = format!("{}.gbai", bot_name); session_manager.get_conversation_history(session.id, session.user_id)? }; - // Prompt compactor: keep only last 10 entries let recent_history = if history.len() > 10 { &history[history.len() - 10..] } else { @@ -408,31 +391,23 @@ let bucket_name = format!("{}.gbai", bot_name); for (role, content) in recent_history { prompt.push_str(&format!("{}: {}\n", role, content)); } + prompt.push_str(&format!("User: {}\nAssistant:", message.content)); - // Determine which cache backend to use let use_langcache = std::env::var("LLM_CACHE") .unwrap_or_else(|_| "false".to_string()) .eq_ignore_ascii_case("true"); if use_langcache { - // Ensure LangCache collection exists ensure_collection_exists(&self.state, "semantic_cache").await?; - - // Get LangCache client let langcache_client = get_langcache_client()?; - - // Isolate the user question (ignore conversation history) let isolated_question = message.content.trim().to_string(); - - // Generate embedding for the isolated question let question_embeddings = generate_embeddings(vec![isolated_question.clone()]).await?; let question_embedding = question_embeddings .get(0) .ok_or_else(|| "Failed to generate embedding for question")? .clone(); - // Search for similar question in LangCache let search_results = langcache_client .search("semantic_cache", question_embedding.clone(), 1) .await?; @@ -444,13 +419,11 @@ let bucket_name = format!("{}.gbai", bot_name); } } - // Generate response via LLM provider using full prompt (including history) let response = self.state .llm_provider .generate(&prompt, &serde_json::Value::Null) .await?; - // Store isolated question and response in LangCache let point = QdrantPoint { id: uuid::Uuid::new_v4().to_string(), vector: question_embedding, @@ -460,26 +433,21 @@ let bucket_name = format!("{}.gbai", bot_name); "response": response }), }; + langcache_client .upsert_points("semantic_cache", vec![point]) .await?; Ok(response) } else { - // Ensure semantic cache collection exists ensure_collection_exists(&self.state, "semantic_cache").await?; - - // Get Qdrant client let qdrant_client = get_qdrant_client(&self.state)?; - - // Generate embedding for the prompt let embeddings = generate_embeddings(vec![prompt.clone()]).await?; let embedding = embeddings .get(0) .ok_or_else(|| "Failed to generate embedding")? .clone(); - // Search for similar prompt in Qdrant let search_results = qdrant_client .search("semantic_cache", embedding.clone(), 1) .await?; @@ -492,13 +460,11 @@ let bucket_name = format!("{}.gbai", bot_name); } } - // Generate response via LLM provider let response = self.state .llm_provider .generate(&prompt, &serde_json::Value::Null) .await?; - // Store prompt and response in Qdrant let point = QdrantPoint { id: uuid::Uuid::new_v4().to_string(), vector: embedding, @@ -507,14 +473,13 @@ let bucket_name = format!("{}.gbai", bot_name); "response": response }), }; + qdrant_client .upsert_points("semantic_cache", vec![point]) .await?; Ok(response) } - - } pub async fn stream_response( @@ -522,10 +487,7 @@ let bucket_name = format!("{}.gbai", bot_name); message: UserMessage, response_tx: mpsc::Sender, ) -> Result<(), Box> { - info!( - "Streaming response for user: {}, session: {}", - message.user_id, message.session_id - ); + info!("Streaming response for user: {}, session: {}", message.user_id, message.session_id); let user_id = Uuid::parse_str(&message.user_id).map_err(|e| { error!("Invalid user ID: {}", e); @@ -548,6 +510,7 @@ let bucket_name = format!("{}.gbai", bot_name); error!("Invalid session ID: {}", e); e })?; + match sm.get_session_by_id(session_id)? { Some(sess) => sess, None => { @@ -600,12 +563,9 @@ let bucket_name = format!("{}.gbai", bot_name); for (role, content) in &history { p.push_str(&format!("{}: {}\n", role, content)); } - p.push_str(&format!("User: {}\nAssistant:", message.content)); - debug!( - "Stream prompt constructed with {} history entries", - history.len() - ); + p.push_str(&format!("User: {}\nAssistant:", message.content)); + info!("Stream prompt constructed with {} history entries", history.len()); p }; @@ -656,22 +616,20 @@ let bucket_name = format!("{}.gbai", bot_name); if !first_word_received && !chunk.trim().is_empty() { first_word_received = true; - debug!("First word received in stream: '{}'", chunk); } analysis_buffer.push_str(&chunk); + if analysis_buffer.contains("**") && !in_analysis { in_analysis = true; } if in_analysis { if analysis_buffer.ends_with("final") { - debug!( - "Analysis section completed, buffer length: {}", - analysis_buffer.len() - ); + info!("Analysis section completed, buffer length: {}", analysis_buffer.len()); in_analysis = false; analysis_buffer.clear(); + if message.channel == "web" { let orchestrator = BotOrchestrator::new(Arc::clone(&self.state)); orchestrator @@ -695,6 +653,7 @@ let bucket_name = format!("{}.gbai", bot_name); } full_response.push_str(&chunk); + let partial = BotResponse { bot_id: message.bot_id.clone(), user_id: message.user_id.clone(), @@ -712,10 +671,7 @@ let bucket_name = format!("{}.gbai", bot_name); } } - debug!( - "Stream processing completed, {} chunks processed", - chunk_count - ); + info!("Stream processing completed, {} chunks processed", chunk_count); { let mut sm = self.state.session_manager.lock().await; @@ -732,8 +688,8 @@ let bucket_name = format!("{}.gbai", bot_name); stream_token: None, is_complete: true, }; - response_tx.send(final_msg).await?; + response_tx.send(final_msg).await?; Ok(()) } @@ -751,10 +707,7 @@ let bucket_name = format!("{}.gbai", bot_name); session_id: Uuid, user_id: Uuid, ) -> Result, Box> { - info!( - "Getting conversation history for session {} user {}", - session_id, user_id - ); + info!("Getting conversation history for session {} user {}", session_id, user_id); let mut session_manager = self.state.session_manager.lock().await; let history = session_manager.get_conversation_history(session_id, user_id)?; Ok(history) @@ -765,12 +718,11 @@ let bucket_name = format!("{}.gbai", bot_name); state: Arc, token: Option, ) -> Result> { - info!( - "Running start script for session: {} with token: {:?}", - session.id, token - ); - let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| String::from("default_bot")); + info!("Running start script for session: {} with token: {:?}", session.id, token); + + let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| String::from("default_bot")); let start_script_path = format!("./{}.gbai/.gbdialog/start.bas", bot_guid); + let start_script = match std::fs::read_to_string(&start_script_path) { Ok(content) => content, Err(_) => { @@ -778,10 +730,8 @@ let bucket_name = format!("{}.gbai", bot_name); return Ok(true); } }; - debug!( - "Start script content for session {}: {}", - session.id, start_script - ); + + info!("Start script content for session {}: {}", session.id, start_script); let session_clone = session.clone(); let state_clone = state.clone(); @@ -794,17 +744,11 @@ let bucket_name = format!("{}.gbai", bot_name); .and_then(|ast| script_service.run(&ast)) { Ok(result) => { - info!( - "Start script executed successfully for session {}, result: {}", - session_clone.id, result - ); + info!("Start script executed successfully for session {}, result: {}", session_clone.id, result); Ok(true) } Err(e) => { - error!( - "Failed to run start script for session {}: {}", - session_clone.id, e - ); + error!("Failed to run start script for session {}: {}", session_clone.id, e); Ok(false) } } @@ -816,10 +760,8 @@ let bucket_name = format!("{}.gbai", bot_name); channel: &str, message: &str, ) -> Result<(), Box> { - warn!( - "Sending warning to session {} on channel {}: {}", - session_id, channel, message - ); + warn!("Sending warning to session {} on channel {}: {}", session_id, channel, message); + if channel == "web" { self.send_event( "system", @@ -847,10 +789,7 @@ let bucket_name = format!("{}.gbai", bot_name); }; adapter.send_message(warn_response).await } else { - warn!( - "No channel adapter found for warning on channel: {}", - channel - ); + warn!("No channel adapter found for warning on channel: {}", channel); Ok(()) } } @@ -863,10 +802,8 @@ let bucket_name = format!("{}.gbai", bot_name); _bot_id: &str, token: Option, ) -> Result> { - info!( - "Triggering auto welcome for user: {}, session: {}, token: {:?}", - user_id, session_id, token - ); + info!("Triggering auto welcome for user: {}, session: {}, token: {:?}", user_id, session_id, token); + let session_uuid = Uuid::parse_str(session_id).map_err(|e| { error!("Invalid session ID: {}", e); e @@ -884,24 +821,9 @@ let bucket_name = format!("{}.gbai", bot_name); }; let result = Self::run_start_script(&session, Arc::clone(&self.state), token).await?; - info!( - "Auto welcome completed for session: {} with result: {}", - session_id, result - ); + info!("Auto welcome completed for session: {} with result: {}", session_id, result); Ok(result) } - - async fn get_web_response_channel( - &self, - session_id: &str, - ) -> Result, Box> { - let response_channels = self.state.response_channels.lock().await; - if let Some(tx) = response_channels.get(session_id) { - Ok(tx.clone()) - } else { - Err("No response channel found for session".into()) - } - } } impl Default for BotOrchestrator { @@ -927,7 +849,6 @@ async fn websocket_handler( .unwrap_or_else(|| Uuid::new_v4().to_string()) .replace("undefined", &Uuid::new_v4().to_string()); - // Ensure user exists in database before proceeding let user_id = { let user_uuid = Uuid::parse_str(&user_id_string).unwrap_or_else(|_| Uuid::new_v4()); let mut sm = data.session_manager.lock().await; @@ -942,8 +863,8 @@ async fn websocket_handler( let (res, mut session, mut msg_stream) = actix_ws::handle(&req, stream)?; let (tx, mut rx) = mpsc::channel::(100); - let orchestrator = BotOrchestrator::new(Arc::clone(&data)); + let orchestrator = BotOrchestrator::new(Arc::clone(&data)); orchestrator .register_response_channel(session_id.clone(), tx.clone()) .await; @@ -956,7 +877,6 @@ async fn websocket_handler( .add_connection(session_id.clone(), tx.clone()) .await; - // Get first available bot from database let bot_id = { use crate::shared::models::schema::bots::dsl::*; use diesel::prelude::*; @@ -998,16 +918,13 @@ async fn websocket_handler( .await .ok(); - info!( - "WebSocket connection established for session: {}, user: {}", - session_id, user_id - ); + info!("WebSocket connection established for session: {}, user: {}", session_id, user_id); - // Trigger auto welcome (start.bas) let orchestrator_clone = BotOrchestrator::new(Arc::clone(&data)); let user_id_welcome = user_id.clone(); let session_id_welcome = session_id.clone(); let bot_id_welcome = bot_id.clone(); + actix_web::rt::spawn(async move { if let Err(e) = orchestrator_clone .trigger_auto_welcome(&session_id_welcome, &user_id_welcome, &bot_id_welcome, None) @@ -1023,10 +940,7 @@ async fn websocket_handler( let user_id_clone = user_id.clone(); actix_web::rt::spawn(async move { - info!( - "Starting WebSocket sender for session {}", - session_id_clone1 - ); + info!("Starting WebSocket sender for session {}", session_id_clone1); let mut message_count = 0; while let Some(msg) = rx.recv().await { message_count += 1; @@ -1037,23 +951,17 @@ async fn websocket_handler( } } } - info!( - "WebSocket sender terminated for session {}, sent {} messages", - session_id_clone1, message_count - ); + info!("WebSocket sender terminated for session {}, sent {} messages", session_id_clone1, message_count); }); actix_web::rt::spawn(async move { - info!( - "Starting WebSocket receiver for session {}", - session_id_clone2 - ); + info!("Starting WebSocket receiver for session {}", session_id_clone2); let mut message_count = 0; while let Some(Ok(msg)) = msg_stream.recv().await { match msg { WsMessage::Text(text) => { message_count += 1; - // Get first available bot from database + let bot_id = { use crate::shared::models::schema::bots::dsl::*; use diesel::prelude::*; @@ -1077,42 +985,35 @@ async fn websocket_handler( } }; - // Parse the text as JSON to extract the content field let json_value: serde_json::Value = match serde_json::from_str(&text) { Ok(value) => value, Err(e) => { error!("Error parsing JSON message {}: {}", message_count, e); - continue; // Skip processing this message + continue; } }; - // Extract content from JSON, fallback to original text if content field doesn't exist let content = json_value["content"] .as_str() .map(|s| s.to_string()) .unwrap(); let user_message = UserMessage { - bot_id: bot_id, + bot_id, user_id: user_id_clone.clone(), session_id: session_id_clone2.clone(), channel: "web".to_string(), - content: content, + content, message_type: 1, media_url: None, timestamp: Utc::now(), }; if let Err(e) = orchestrator.stream_response(user_message, tx.clone()).await { - error!( - "Error processing WebSocket message {}: {}", - message_count, e - ); + error!("Error processing WebSocket message {}: {}", message_count, e); } } - WsMessage::Close(_) => { - // Get first available bot from database let bot_id = { use crate::shared::models::schema::bots::dsl::*; use diesel::prelude::*; @@ -1135,6 +1036,7 @@ async fn websocket_handler( } } }; + orchestrator .send_event( &user_id_clone, @@ -1146,6 +1048,7 @@ async fn websocket_handler( ) .await .ok(); + web_adapter.remove_connection(&session_id_clone2).await; orchestrator .unregister_response_channel(&session_id_clone2) @@ -1155,16 +1058,10 @@ async fn websocket_handler( _ => {} } } - info!( - "WebSocket receiver terminated for session {}, processed {} messages", - session_id_clone2, message_count - ); + info!("WebSocket receiver terminated for session {}, processed {} messages", session_id_clone2, message_count); }); - info!( - "WebSocket handler setup completed for session {}", - session_id - ); + info!("WebSocket handler setup completed for session {}", session_id); Ok(res) } @@ -1177,6 +1074,7 @@ async fn start_session( .get("session_id") .and_then(|s| s.as_str()) .unwrap_or(""); + let token = info .get("token") .and_then(|t| t.as_str()) @@ -1210,12 +1108,10 @@ async fn start_session( }; let result = BotOrchestrator::run_start_script(&session, Arc::clone(&data), token).await; + match result { Ok(true) => { - info!( - "Start script completed successfully for session: {}", - session_id - ); + info!("Start script completed successfully for session: {}", session_id); Ok(HttpResponse::Ok().json(serde_json::json!({ "status": "started", "session_id": session.id, @@ -1231,10 +1127,7 @@ async fn start_session( }))) } Err(e) => { - error!( - "Error running start script for session {}: {}", - session_id, e - ); + error!("Error running start script for session {}: {}", session_id, e); Ok(HttpResponse::InternalServerError() .json(serde_json::json!({"error": e.to_string()}))) } @@ -1249,14 +1142,12 @@ async fn send_warning_handler( let default_session = "default".to_string(); let default_channel = "web".to_string(); let default_message = "Warning!".to_string(); + let session_id = info.get("session_id").unwrap_or(&default_session); let channel = info.get("channel").unwrap_or(&default_channel); let message = info.get("message").unwrap_or(&default_message); - info!( - "Sending warning via API - session: {}, channel: {}", - session_id, channel - ); + info!("Sending warning via API - session: {}, channel: {}", session_id, channel); let orchestrator = BotOrchestrator::new(Arc::clone(&data)); if let Err(e) = orchestrator