Refactor BotOrchestrator for async bot mounting
* Introduced async mounting
This commit is contained in:
parent
f85b19efda
commit
4f12608668
1 changed files with 140 additions and 249 deletions
389
src/bot/mod.rs
389
src/bot/mod.rs
|
|
@ -3,22 +3,17 @@ use crate::shared::models::{BotResponse, UserMessage, UserSession};
|
||||||
use crate::shared::state::AppState;
|
use crate::shared::state::AppState;
|
||||||
use actix_web::{web, HttpRequest, HttpResponse, Result};
|
use actix_web::{web, HttpRequest, HttpResponse, Result};
|
||||||
use actix_ws::Message as WsMessage;
|
use actix_ws::Message as WsMessage;
|
||||||
use futures::TryFutureExt;
|
|
||||||
use log::{debug, error, info, warn};
|
use log::{debug, error, info, warn};
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use serde_json;
|
use serde_json;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::Arc;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use crate::kb::embeddings::generate_embeddings;
|
use crate::kb::embeddings::generate_embeddings;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::kb::qdrant_client::{ensure_collection_exists, get_qdrant_client, QdrantPoint};
|
use crate::kb::qdrant_client::{ensure_collection_exists, get_qdrant_client, QdrantPoint};
|
||||||
use crate::context::langcache::{get_langcache_client};
|
use crate::context::langcache::get_langcache_client;
|
||||||
|
|
||||||
|
|
||||||
use crate::drive_monitor::DriveMonitor;
|
use crate::drive_monitor::DriveMonitor;
|
||||||
|
|
||||||
use tokio::sync::Mutex as AsyncMutex;
|
use tokio::sync::Mutex as AsyncMutex;
|
||||||
|
|
||||||
pub struct BotOrchestrator {
|
pub struct BotOrchestrator {
|
||||||
|
|
@ -27,7 +22,6 @@ pub struct BotOrchestrator {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BotOrchestrator {
|
impl BotOrchestrator {
|
||||||
/// Creates a new BotOrchestrator instance
|
|
||||||
pub fn new(state: Arc<AppState>) -> Self {
|
pub fn new(state: Arc<AppState>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
state,
|
state,
|
||||||
|
|
@ -35,10 +29,7 @@ impl BotOrchestrator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mounts all available bots from the database table
|
|
||||||
pub async fn mount_all_bots(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
pub async fn mount_all_bots(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
info!("Mounting all available bots from database");
|
|
||||||
|
|
||||||
use crate::shared::models::schema::bots::dsl::*;
|
use crate::shared::models::schema::bots::dsl::*;
|
||||||
use diesel::prelude::*;
|
use diesel::prelude::*;
|
||||||
|
|
||||||
|
|
@ -53,54 +44,89 @@ impl BotOrchestrator {
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
for bot_guid in active_bots {
|
for bot_guid in active_bots {
|
||||||
if let Err(e) = self.mount_bot(&bot_guid.to_string()).await {
|
let state_clone = self.state.clone();
|
||||||
error!("Failed to mount bot {}: {}", bot_guid, e);
|
let mounted_bots_clone = self.mounted_bots.clone();
|
||||||
// Continue mounting other bots even if one fails
|
let bot_guid_str = bot_guid.to_string();
|
||||||
continue;
|
|
||||||
}
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = Self::mount_bot_task(state_clone, mounted_bots_clone, bot_guid_str.clone()).await {
|
||||||
|
error!("Failed to mount bot {}: {}", bot_guid_str, e);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates a new bot with its storage bucket
|
async fn mount_bot_task(
|
||||||
pub async fn create_bot(&self, bot_guid: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
state: Arc<AppState>,
|
||||||
let bucket_name = format!("{}{}.gbai", self.state.config.as_ref().unwrap().drive.org_prefix, bot_guid);
|
mounted_bots: Arc<AsyncMutex<HashMap<String, Arc<DriveMonitor>>>>,
|
||||||
// Generate a new GUID if needed
|
bot_guid: String,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
// Create bucket in storage
|
use diesel::prelude::*;
|
||||||
crate::create_bucket::create_bucket(&bucket_name)?;
|
use crate::shared::models::schema::bots::dsl::*;
|
||||||
|
|
||||||
// TODO: Add bot to database
|
let bot_name: String = {
|
||||||
|
let mut db_conn = state.conn.lock().unwrap();
|
||||||
|
bots
|
||||||
|
.filter(id.eq(Uuid::parse_str(&bot_guid)?))
|
||||||
|
.select(name)
|
||||||
|
.first(&mut *db_conn)
|
||||||
|
.map_err(|e| {
|
||||||
|
error!("Failed to query bot name for {}: {}", bot_guid, e);
|
||||||
|
e
|
||||||
|
})?
|
||||||
|
};
|
||||||
|
|
||||||
|
let bucket_name = format!("{}.gbai", bot_name);
|
||||||
|
|
||||||
|
{
|
||||||
|
let mounted = mounted_bots.lock().await;
|
||||||
|
if mounted.contains_key(&bot_guid) {
|
||||||
|
warn!("Bot {} is already mounted", bot_guid);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let drive_monitor = Arc::new(DriveMonitor::new(state.clone(), bucket_name));
|
||||||
|
|
||||||
|
let _handle = drive_monitor.clone().spawn().await;
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut mounted = mounted_bots.lock().await;
|
||||||
|
mounted.insert(bot_guid.clone(), drive_monitor);
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("Bot {} mounted successfully", bot_guid);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn create_bot(&self, bot_guid: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
let bucket_name = format!("{}{}.gbai", self.state.config.as_ref().unwrap().drive.org_prefix, bot_guid);
|
||||||
|
crate::create_bucket::create_bucket(&bucket_name)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mounts a bot by activating its resources (drive monitor, etc)
|
|
||||||
pub async fn mount_bot(&self, bot_guid: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
pub async fn mount_bot(&self, bot_guid: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
// Remove .gbai suffix if present to normalize bot GUID
|
let bot_guid = bot_guid.strip_suffix(".gbai").unwrap_or(bot_guid).to_string();
|
||||||
let bot_guid = bot_guid.strip_suffix(".gbai").unwrap_or(bot_guid);
|
|
||||||
info!("Mounting bot: {}", bot_guid);
|
|
||||||
let bot_guid = bot_guid.to_string(); // Ensure we have an owned String
|
|
||||||
|
|
||||||
let config = self.state.config.as_ref().ok_or("AppConfig not initialized")?;
|
|
||||||
// Use bot_guid directly without appending .gbai since it's now part of the ID
|
|
||||||
use diesel::prelude::*;
|
use diesel::prelude::*;
|
||||||
use crate::shared::models::schema::bots::dsl::*;
|
use crate::shared::models::schema::bots::dsl::*;
|
||||||
|
|
||||||
let mut db_conn = self.state.conn.lock().unwrap();
|
let bot_name: String = {
|
||||||
let bot_name: String = bots
|
let mut db_conn = self.state.conn.lock().unwrap();
|
||||||
.filter(id.eq(Uuid::parse_str(&bot_guid)?))
|
bots
|
||||||
.select(name)
|
.filter(id.eq(Uuid::parse_str(&bot_guid)?))
|
||||||
.first(&mut *db_conn)
|
.select(name)
|
||||||
.map_err(|e| {
|
.first(&mut *db_conn)
|
||||||
error!("Failed to query bot name for {}: {}", bot_guid, e);
|
.map_err(|e| {
|
||||||
e
|
error!("Failed to query bot name for {}: {}", bot_guid, e);
|
||||||
})?;
|
e
|
||||||
|
})?
|
||||||
|
};
|
||||||
|
|
||||||
|
let bucket_name = format!("{}.gbai", bot_name);
|
||||||
|
|
||||||
let bucket_name = format!("{}.gbai", bot_name);
|
|
||||||
|
|
||||||
|
|
||||||
// Check if bot is already mounted
|
|
||||||
{
|
{
|
||||||
let mounted_bots = self.mounted_bots.lock().await;
|
let mounted_bots = self.mounted_bots.lock().await;
|
||||||
if mounted_bots.contains_key(&bot_guid) {
|
if mounted_bots.contains_key(&bot_guid) {
|
||||||
|
|
@ -109,26 +135,15 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize and spawn drive monitor asynchronously
|
|
||||||
let drive_monitor = Arc::new(DriveMonitor::new(self.state.clone(), bucket_name));
|
let drive_monitor = Arc::new(DriveMonitor::new(self.state.clone(), bucket_name));
|
||||||
let drive_monitor_clone = drive_monitor.clone();
|
|
||||||
// Clone bot_guid to avoid moving it into the async block
|
let _handle = drive_monitor.clone().spawn().await;
|
||||||
let bot_guid_clone = bot_guid.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Err(e) = drive_monitor_clone.spawn().await {
|
|
||||||
error!("Failed to spawn drive monitor for bot {}: {}", bot_guid_clone, e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Track mounted bot
|
|
||||||
let guid = bot_guid.clone();
|
|
||||||
let drive_monitor_clone = drive_monitor.clone();
|
|
||||||
{
|
{
|
||||||
let mut mounted_bots = self.mounted_bots.lock().await;
|
let mut mounted_bots = self.mounted_bots.lock().await;
|
||||||
mounted_bots.insert(guid, drive_monitor_clone);
|
mounted_bots.insert(bot_guid.clone(), drive_monitor);
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Successfully mounted bot: {}", bot_guid);
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -137,10 +152,7 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
session_id: Uuid,
|
session_id: Uuid,
|
||||||
user_input: &str,
|
user_input: &str,
|
||||||
) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
info!(
|
info!("Handling user input for session {}: '{}'", session_id, user_input);
|
||||||
"Handling user input for session {}: '{}'",
|
|
||||||
session_id, user_input
|
|
||||||
);
|
|
||||||
let mut session_manager = self.state.session_manager.lock().await;
|
let mut session_manager = self.state.session_manager.lock().await;
|
||||||
session_manager.provide_input(session_id, user_input.to_string())?;
|
session_manager.provide_input(session_id, user_input.to_string())?;
|
||||||
Ok(None)
|
Ok(None)
|
||||||
|
|
@ -181,10 +193,7 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
bot_id: &str,
|
bot_id: &str,
|
||||||
mode: i32,
|
mode: i32,
|
||||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
info!(
|
info!("Setting answer mode for user {} with bot {} to mode {}", user_id, bot_id, mode);
|
||||||
"Setting answer mode for user {} with bot {} to mode {}",
|
|
||||||
user_id, bot_id, mode
|
|
||||||
);
|
|
||||||
let mut session_manager = self.state.session_manager.lock().await;
|
let mut session_manager = self.state.session_manager.lock().await;
|
||||||
session_manager.update_answer_mode(user_id, bot_id, mode)?;
|
session_manager.update_answer_mode(user_id, bot_id, mode)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
@ -199,10 +208,7 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
event_type: &str,
|
event_type: &str,
|
||||||
data: serde_json::Value,
|
data: serde_json::Value,
|
||||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
info!(
|
info!("Sending event '{}' to session {} on channel {}", event_type, session_id, channel);
|
||||||
"Sending event '{}' to session {} on channel {}",
|
|
||||||
event_type, session_id, channel
|
|
||||||
);
|
|
||||||
let event_response = BotResponse {
|
let event_response = BotResponse {
|
||||||
bot_id: bot_id.to_string(),
|
bot_id: bot_id.to_string(),
|
||||||
user_id: user_id.to_string(),
|
user_id: user_id.to_string(),
|
||||||
|
|
@ -220,8 +226,9 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) {
|
if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) {
|
||||||
adapter.send_message(event_response).await?;
|
adapter.send_message(event_response).await?;
|
||||||
} else {
|
} else {
|
||||||
warn!("No channel adapter found for channel 1: {}", channel);
|
warn!("No channel adapter found for channel: {}", channel);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -231,10 +238,7 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
channel: &str,
|
channel: &str,
|
||||||
content: &str,
|
content: &str,
|
||||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
info!(
|
info!("Sending direct message to session {}: '{}'", session_id, content);
|
||||||
"Sending direct message to session {}: '{}'",
|
|
||||||
session_id, content
|
|
||||||
);
|
|
||||||
let bot_response = BotResponse {
|
let bot_response = BotResponse {
|
||||||
bot_id: "default_bot".to_string(),
|
bot_id: "default_bot".to_string(),
|
||||||
user_id: "default_user".to_string(),
|
user_id: "default_user".to_string(),
|
||||||
|
|
@ -249,8 +253,9 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) {
|
if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) {
|
||||||
adapter.send_message(bot_response).await?;
|
adapter.send_message(bot_response).await?;
|
||||||
} else {
|
} else {
|
||||||
warn!("No channel adapter found for channel 2: {}", channel);
|
warn!("No channel adapter found for direct message on channel: {}", channel);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -258,53 +263,35 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
&self,
|
&self,
|
||||||
message: UserMessage,
|
message: UserMessage,
|
||||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
info!(
|
info!("Processing message from channel: {}, user: {}, session: {}", message.channel, message.user_id, message.session_id);
|
||||||
"Processing message from channel: {}, user: {}, session: {}",
|
debug!("Message content: '{}', type: {}", message.content, message.message_type);
|
||||||
message.channel, message.user_id, message.session_id
|
|
||||||
);
|
|
||||||
debug!(
|
|
||||||
"Message content: '{}', type: {}",
|
|
||||||
message.content, message.message_type
|
|
||||||
);
|
|
||||||
|
|
||||||
let user_id = Uuid::parse_str(&message.user_id).map_err(|e| {
|
let user_id = Uuid::parse_str(&message.user_id).map_err(|e| {
|
||||||
error!("Invalid user ID provided: {}", e);
|
error!("Invalid user ID provided: {}", e);
|
||||||
e
|
e
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let bot_id = Uuid::nil(); // Using nil UUID for default bot
|
let bot_id = Uuid::nil();
|
||||||
// Default to announcements bot
|
|
||||||
|
|
||||||
let session = {
|
let session = {
|
||||||
let mut sm = self.state.session_manager.lock().await;
|
let mut sm = self.state.session_manager.lock().await;
|
||||||
let session_id = Uuid::parse_str(&message.session_id).map_err(|e| {
|
let session_id = Uuid::parse_str(&message.session_id).map_err(|e| {
|
||||||
error!("Invalid session ID: {}", e);
|
error!("Invalid session ID: {}", e);
|
||||||
e
|
e
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
match sm.get_session_by_id(session_id)? {
|
match sm.get_session_by_id(session_id)? {
|
||||||
Some(session) => session,
|
Some(session) => session,
|
||||||
None => {
|
None => {
|
||||||
error!(
|
error!("Failed to create session for user {} with bot {}", user_id, bot_id);
|
||||||
"Failed to create session for user {} with bot {}",
|
|
||||||
user_id, bot_id
|
|
||||||
);
|
|
||||||
return Err("Failed to create session".into());
|
return Err("Failed to create session".into());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if self.is_waiting_for_input(session.id).await {
|
if self.is_waiting_for_input(session.id).await {
|
||||||
debug!(
|
debug!("Session {} is waiting for input, processing as variable input", session.id);
|
||||||
"Session {} is waiting for input, processing as variable input",
|
if let Some(variable_name) = self.handle_user_input(session.id, &message.content).await? {
|
||||||
session.id
|
info!("Stored user input in variable '{}' for session {}", variable_name, session.id);
|
||||||
);
|
|
||||||
if let Some(variable_name) =
|
|
||||||
self.handle_user_input(session.id, &message.content).await?
|
|
||||||
{
|
|
||||||
debug!(
|
|
||||||
"Stored user input in variable '{}' for session {}",
|
|
||||||
variable_name, session.id
|
|
||||||
);
|
|
||||||
if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) {
|
if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) {
|
||||||
let ack_response = BotResponse {
|
let ack_response = BotResponse {
|
||||||
bot_id: message.bot_id.clone(),
|
bot_id: message.bot_id.clone(),
|
||||||
|
|
@ -363,10 +350,7 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) {
|
if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) {
|
||||||
adapter.send_message(bot_response).await?;
|
adapter.send_message(bot_response).await?;
|
||||||
} else {
|
} else {
|
||||||
warn!(
|
warn!("No channel adapter found for message channel: {}", message.channel);
|
||||||
"No channel adapter found for channel 3: {}",
|
|
||||||
message.channel
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
@ -398,7 +382,6 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
session_manager.get_conversation_history(session.id, session.user_id)?
|
session_manager.get_conversation_history(session.id, session.user_id)?
|
||||||
};
|
};
|
||||||
|
|
||||||
// Prompt compactor: keep only last 10 entries
|
|
||||||
let recent_history = if history.len() > 10 {
|
let recent_history = if history.len() > 10 {
|
||||||
&history[history.len() - 10..]
|
&history[history.len() - 10..]
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -408,31 +391,23 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
for (role, content) in recent_history {
|
for (role, content) in recent_history {
|
||||||
prompt.push_str(&format!("{}: {}\n", role, content));
|
prompt.push_str(&format!("{}: {}\n", role, content));
|
||||||
}
|
}
|
||||||
|
|
||||||
prompt.push_str(&format!("User: {}\nAssistant:", message.content));
|
prompt.push_str(&format!("User: {}\nAssistant:", message.content));
|
||||||
|
|
||||||
// Determine which cache backend to use
|
|
||||||
let use_langcache = std::env::var("LLM_CACHE")
|
let use_langcache = std::env::var("LLM_CACHE")
|
||||||
.unwrap_or_else(|_| "false".to_string())
|
.unwrap_or_else(|_| "false".to_string())
|
||||||
.eq_ignore_ascii_case("true");
|
.eq_ignore_ascii_case("true");
|
||||||
|
|
||||||
if use_langcache {
|
if use_langcache {
|
||||||
// Ensure LangCache collection exists
|
|
||||||
ensure_collection_exists(&self.state, "semantic_cache").await?;
|
ensure_collection_exists(&self.state, "semantic_cache").await?;
|
||||||
|
|
||||||
// Get LangCache client
|
|
||||||
let langcache_client = get_langcache_client()?;
|
let langcache_client = get_langcache_client()?;
|
||||||
|
|
||||||
// Isolate the user question (ignore conversation history)
|
|
||||||
let isolated_question = message.content.trim().to_string();
|
let isolated_question = message.content.trim().to_string();
|
||||||
|
|
||||||
// Generate embedding for the isolated question
|
|
||||||
let question_embeddings = generate_embeddings(vec![isolated_question.clone()]).await?;
|
let question_embeddings = generate_embeddings(vec![isolated_question.clone()]).await?;
|
||||||
let question_embedding = question_embeddings
|
let question_embedding = question_embeddings
|
||||||
.get(0)
|
.get(0)
|
||||||
.ok_or_else(|| "Failed to generate embedding for question")?
|
.ok_or_else(|| "Failed to generate embedding for question")?
|
||||||
.clone();
|
.clone();
|
||||||
|
|
||||||
// Search for similar question in LangCache
|
|
||||||
let search_results = langcache_client
|
let search_results = langcache_client
|
||||||
.search("semantic_cache", question_embedding.clone(), 1)
|
.search("semantic_cache", question_embedding.clone(), 1)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
@ -444,13 +419,11 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate response via LLM provider using full prompt (including history)
|
|
||||||
let response = self.state
|
let response = self.state
|
||||||
.llm_provider
|
.llm_provider
|
||||||
.generate(&prompt, &serde_json::Value::Null)
|
.generate(&prompt, &serde_json::Value::Null)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// Store isolated question and response in LangCache
|
|
||||||
let point = QdrantPoint {
|
let point = QdrantPoint {
|
||||||
id: uuid::Uuid::new_v4().to_string(),
|
id: uuid::Uuid::new_v4().to_string(),
|
||||||
vector: question_embedding,
|
vector: question_embedding,
|
||||||
|
|
@ -460,26 +433,21 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
"response": response
|
"response": response
|
||||||
}),
|
}),
|
||||||
};
|
};
|
||||||
|
|
||||||
langcache_client
|
langcache_client
|
||||||
.upsert_points("semantic_cache", vec![point])
|
.upsert_points("semantic_cache", vec![point])
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(response)
|
Ok(response)
|
||||||
} else {
|
} else {
|
||||||
// Ensure semantic cache collection exists
|
|
||||||
ensure_collection_exists(&self.state, "semantic_cache").await?;
|
ensure_collection_exists(&self.state, "semantic_cache").await?;
|
||||||
|
|
||||||
// Get Qdrant client
|
|
||||||
let qdrant_client = get_qdrant_client(&self.state)?;
|
let qdrant_client = get_qdrant_client(&self.state)?;
|
||||||
|
|
||||||
// Generate embedding for the prompt
|
|
||||||
let embeddings = generate_embeddings(vec![prompt.clone()]).await?;
|
let embeddings = generate_embeddings(vec![prompt.clone()]).await?;
|
||||||
let embedding = embeddings
|
let embedding = embeddings
|
||||||
.get(0)
|
.get(0)
|
||||||
.ok_or_else(|| "Failed to generate embedding")?
|
.ok_or_else(|| "Failed to generate embedding")?
|
||||||
.clone();
|
.clone();
|
||||||
|
|
||||||
// Search for similar prompt in Qdrant
|
|
||||||
let search_results = qdrant_client
|
let search_results = qdrant_client
|
||||||
.search("semantic_cache", embedding.clone(), 1)
|
.search("semantic_cache", embedding.clone(), 1)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
@ -492,13 +460,11 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate response via LLM provider
|
|
||||||
let response = self.state
|
let response = self.state
|
||||||
.llm_provider
|
.llm_provider
|
||||||
.generate(&prompt, &serde_json::Value::Null)
|
.generate(&prompt, &serde_json::Value::Null)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// Store prompt and response in Qdrant
|
|
||||||
let point = QdrantPoint {
|
let point = QdrantPoint {
|
||||||
id: uuid::Uuid::new_v4().to_string(),
|
id: uuid::Uuid::new_v4().to_string(),
|
||||||
vector: embedding,
|
vector: embedding,
|
||||||
|
|
@ -507,14 +473,13 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
"response": response
|
"response": response
|
||||||
}),
|
}),
|
||||||
};
|
};
|
||||||
|
|
||||||
qdrant_client
|
qdrant_client
|
||||||
.upsert_points("semantic_cache", vec![point])
|
.upsert_points("semantic_cache", vec![point])
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn stream_response(
|
pub async fn stream_response(
|
||||||
|
|
@ -522,10 +487,7 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
message: UserMessage,
|
message: UserMessage,
|
||||||
response_tx: mpsc::Sender<BotResponse>,
|
response_tx: mpsc::Sender<BotResponse>,
|
||||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
info!(
|
info!("Streaming response for user: {}, session: {}", message.user_id, message.session_id);
|
||||||
"Streaming response for user: {}, session: {}",
|
|
||||||
message.user_id, message.session_id
|
|
||||||
);
|
|
||||||
|
|
||||||
let user_id = Uuid::parse_str(&message.user_id).map_err(|e| {
|
let user_id = Uuid::parse_str(&message.user_id).map_err(|e| {
|
||||||
error!("Invalid user ID: {}", e);
|
error!("Invalid user ID: {}", e);
|
||||||
|
|
@ -548,6 +510,7 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
error!("Invalid session ID: {}", e);
|
error!("Invalid session ID: {}", e);
|
||||||
e
|
e
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
match sm.get_session_by_id(session_id)? {
|
match sm.get_session_by_id(session_id)? {
|
||||||
Some(sess) => sess,
|
Some(sess) => sess,
|
||||||
None => {
|
None => {
|
||||||
|
|
@ -600,12 +563,9 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
for (role, content) in &history {
|
for (role, content) in &history {
|
||||||
p.push_str(&format!("{}: {}\n", role, content));
|
p.push_str(&format!("{}: {}\n", role, content));
|
||||||
}
|
}
|
||||||
p.push_str(&format!("User: {}\nAssistant:", message.content));
|
|
||||||
|
|
||||||
debug!(
|
p.push_str(&format!("User: {}\nAssistant:", message.content));
|
||||||
"Stream prompt constructed with {} history entries",
|
info!("Stream prompt constructed with {} history entries", history.len());
|
||||||
history.len()
|
|
||||||
);
|
|
||||||
p
|
p
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -656,22 +616,20 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
|
|
||||||
if !first_word_received && !chunk.trim().is_empty() {
|
if !first_word_received && !chunk.trim().is_empty() {
|
||||||
first_word_received = true;
|
first_word_received = true;
|
||||||
debug!("First word received in stream: '{}'", chunk);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
analysis_buffer.push_str(&chunk);
|
analysis_buffer.push_str(&chunk);
|
||||||
|
|
||||||
if analysis_buffer.contains("**") && !in_analysis {
|
if analysis_buffer.contains("**") && !in_analysis {
|
||||||
in_analysis = true;
|
in_analysis = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if in_analysis {
|
if in_analysis {
|
||||||
if analysis_buffer.ends_with("final") {
|
if analysis_buffer.ends_with("final") {
|
||||||
debug!(
|
info!("Analysis section completed, buffer length: {}", analysis_buffer.len());
|
||||||
"Analysis section completed, buffer length: {}",
|
|
||||||
analysis_buffer.len()
|
|
||||||
);
|
|
||||||
in_analysis = false;
|
in_analysis = false;
|
||||||
analysis_buffer.clear();
|
analysis_buffer.clear();
|
||||||
|
|
||||||
if message.channel == "web" {
|
if message.channel == "web" {
|
||||||
let orchestrator = BotOrchestrator::new(Arc::clone(&self.state));
|
let orchestrator = BotOrchestrator::new(Arc::clone(&self.state));
|
||||||
orchestrator
|
orchestrator
|
||||||
|
|
@ -695,6 +653,7 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
full_response.push_str(&chunk);
|
full_response.push_str(&chunk);
|
||||||
|
|
||||||
let partial = BotResponse {
|
let partial = BotResponse {
|
||||||
bot_id: message.bot_id.clone(),
|
bot_id: message.bot_id.clone(),
|
||||||
user_id: message.user_id.clone(),
|
user_id: message.user_id.clone(),
|
||||||
|
|
@ -712,10 +671,7 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!(
|
info!("Stream processing completed, {} chunks processed", chunk_count);
|
||||||
"Stream processing completed, {} chunks processed",
|
|
||||||
chunk_count
|
|
||||||
);
|
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut sm = self.state.session_manager.lock().await;
|
let mut sm = self.state.session_manager.lock().await;
|
||||||
|
|
@ -732,8 +688,8 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
stream_token: None,
|
stream_token: None,
|
||||||
is_complete: true,
|
is_complete: true,
|
||||||
};
|
};
|
||||||
response_tx.send(final_msg).await?;
|
|
||||||
|
|
||||||
|
response_tx.send(final_msg).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -751,10 +707,7 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
session_id: Uuid,
|
session_id: Uuid,
|
||||||
user_id: Uuid,
|
user_id: Uuid,
|
||||||
) -> Result<Vec<(String, String)>, Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<Vec<(String, String)>, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
info!(
|
info!("Getting conversation history for session {} user {}", session_id, user_id);
|
||||||
"Getting conversation history for session {} user {}",
|
|
||||||
session_id, user_id
|
|
||||||
);
|
|
||||||
let mut session_manager = self.state.session_manager.lock().await;
|
let mut session_manager = self.state.session_manager.lock().await;
|
||||||
let history = session_manager.get_conversation_history(session_id, user_id)?;
|
let history = session_manager.get_conversation_history(session_id, user_id)?;
|
||||||
Ok(history)
|
Ok(history)
|
||||||
|
|
@ -765,12 +718,11 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
state: Arc<AppState>,
|
state: Arc<AppState>,
|
||||||
token: Option<String>,
|
token: Option<String>,
|
||||||
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
info!(
|
info!("Running start script for session: {} with token: {:?}", session.id, token);
|
||||||
"Running start script for session: {} with token: {:?}",
|
|
||||||
session.id, token
|
let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| String::from("default_bot"));
|
||||||
);
|
|
||||||
let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| String::from("default_bot"));
|
|
||||||
let start_script_path = format!("./{}.gbai/.gbdialog/start.bas", bot_guid);
|
let start_script_path = format!("./{}.gbai/.gbdialog/start.bas", bot_guid);
|
||||||
|
|
||||||
let start_script = match std::fs::read_to_string(&start_script_path) {
|
let start_script = match std::fs::read_to_string(&start_script_path) {
|
||||||
Ok(content) => content,
|
Ok(content) => content,
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
|
|
@ -778,10 +730,8 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
return Ok(true);
|
return Ok(true);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
debug!(
|
|
||||||
"Start script content for session {}: {}",
|
info!("Start script content for session {}: {}", session.id, start_script);
|
||||||
session.id, start_script
|
|
||||||
);
|
|
||||||
|
|
||||||
let session_clone = session.clone();
|
let session_clone = session.clone();
|
||||||
let state_clone = state.clone();
|
let state_clone = state.clone();
|
||||||
|
|
@ -794,17 +744,11 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
.and_then(|ast| script_service.run(&ast))
|
.and_then(|ast| script_service.run(&ast))
|
||||||
{
|
{
|
||||||
Ok(result) => {
|
Ok(result) => {
|
||||||
info!(
|
info!("Start script executed successfully for session {}, result: {}", session_clone.id, result);
|
||||||
"Start script executed successfully for session {}, result: {}",
|
|
||||||
session_clone.id, result
|
|
||||||
);
|
|
||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!(
|
error!("Failed to run start script for session {}: {}", session_clone.id, e);
|
||||||
"Failed to run start script for session {}: {}",
|
|
||||||
session_clone.id, e
|
|
||||||
);
|
|
||||||
Ok(false)
|
Ok(false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -816,10 +760,8 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
channel: &str,
|
channel: &str,
|
||||||
message: &str,
|
message: &str,
|
||||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
warn!(
|
warn!("Sending warning to session {} on channel {}: {}", session_id, channel, message);
|
||||||
"Sending warning to session {} on channel {}: {}",
|
|
||||||
session_id, channel, message
|
|
||||||
);
|
|
||||||
if channel == "web" {
|
if channel == "web" {
|
||||||
self.send_event(
|
self.send_event(
|
||||||
"system",
|
"system",
|
||||||
|
|
@ -847,10 +789,7 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
};
|
};
|
||||||
adapter.send_message(warn_response).await
|
adapter.send_message(warn_response).await
|
||||||
} else {
|
} else {
|
||||||
warn!(
|
warn!("No channel adapter found for warning on channel: {}", channel);
|
||||||
"No channel adapter found for warning on channel: {}",
|
|
||||||
channel
|
|
||||||
);
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -863,10 +802,8 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
_bot_id: &str,
|
_bot_id: &str,
|
||||||
token: Option<String>,
|
token: Option<String>,
|
||||||
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
info!(
|
info!("Triggering auto welcome for user: {}, session: {}, token: {:?}", user_id, session_id, token);
|
||||||
"Triggering auto welcome for user: {}, session: {}, token: {:?}",
|
|
||||||
user_id, session_id, token
|
|
||||||
);
|
|
||||||
let session_uuid = Uuid::parse_str(session_id).map_err(|e| {
|
let session_uuid = Uuid::parse_str(session_id).map_err(|e| {
|
||||||
error!("Invalid session ID: {}", e);
|
error!("Invalid session ID: {}", e);
|
||||||
e
|
e
|
||||||
|
|
@ -884,24 +821,9 @@ let bucket_name = format!("{}.gbai", bot_name);
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = Self::run_start_script(&session, Arc::clone(&self.state), token).await?;
|
let result = Self::run_start_script(&session, Arc::clone(&self.state), token).await?;
|
||||||
info!(
|
info!("Auto welcome completed for session: {} with result: {}", session_id, result);
|
||||||
"Auto welcome completed for session: {} with result: {}",
|
|
||||||
session_id, result
|
|
||||||
);
|
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_web_response_channel(
|
|
||||||
&self,
|
|
||||||
session_id: &str,
|
|
||||||
) -> Result<mpsc::Sender<BotResponse>, Box<dyn std::error::Error + Send + Sync>> {
|
|
||||||
let response_channels = self.state.response_channels.lock().await;
|
|
||||||
if let Some(tx) = response_channels.get(session_id) {
|
|
||||||
Ok(tx.clone())
|
|
||||||
} else {
|
|
||||||
Err("No response channel found for session".into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for BotOrchestrator {
|
impl Default for BotOrchestrator {
|
||||||
|
|
@ -927,7 +849,6 @@ async fn websocket_handler(
|
||||||
.unwrap_or_else(|| Uuid::new_v4().to_string())
|
.unwrap_or_else(|| Uuid::new_v4().to_string())
|
||||||
.replace("undefined", &Uuid::new_v4().to_string());
|
.replace("undefined", &Uuid::new_v4().to_string());
|
||||||
|
|
||||||
// Ensure user exists in database before proceeding
|
|
||||||
let user_id = {
|
let user_id = {
|
||||||
let user_uuid = Uuid::parse_str(&user_id_string).unwrap_or_else(|_| Uuid::new_v4());
|
let user_uuid = Uuid::parse_str(&user_id_string).unwrap_or_else(|_| Uuid::new_v4());
|
||||||
let mut sm = data.session_manager.lock().await;
|
let mut sm = data.session_manager.lock().await;
|
||||||
|
|
@ -942,8 +863,8 @@ async fn websocket_handler(
|
||||||
|
|
||||||
let (res, mut session, mut msg_stream) = actix_ws::handle(&req, stream)?;
|
let (res, mut session, mut msg_stream) = actix_ws::handle(&req, stream)?;
|
||||||
let (tx, mut rx) = mpsc::channel::<BotResponse>(100);
|
let (tx, mut rx) = mpsc::channel::<BotResponse>(100);
|
||||||
let orchestrator = BotOrchestrator::new(Arc::clone(&data));
|
|
||||||
|
|
||||||
|
let orchestrator = BotOrchestrator::new(Arc::clone(&data));
|
||||||
orchestrator
|
orchestrator
|
||||||
.register_response_channel(session_id.clone(), tx.clone())
|
.register_response_channel(session_id.clone(), tx.clone())
|
||||||
.await;
|
.await;
|
||||||
|
|
@ -956,7 +877,6 @@ async fn websocket_handler(
|
||||||
.add_connection(session_id.clone(), tx.clone())
|
.add_connection(session_id.clone(), tx.clone())
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
// Get first available bot from database
|
|
||||||
let bot_id = {
|
let bot_id = {
|
||||||
use crate::shared::models::schema::bots::dsl::*;
|
use crate::shared::models::schema::bots::dsl::*;
|
||||||
use diesel::prelude::*;
|
use diesel::prelude::*;
|
||||||
|
|
@ -998,16 +918,13 @@ async fn websocket_handler(
|
||||||
.await
|
.await
|
||||||
.ok();
|
.ok();
|
||||||
|
|
||||||
info!(
|
info!("WebSocket connection established for session: {}, user: {}", session_id, user_id);
|
||||||
"WebSocket connection established for session: {}, user: {}",
|
|
||||||
session_id, user_id
|
|
||||||
);
|
|
||||||
|
|
||||||
// Trigger auto welcome (start.bas)
|
|
||||||
let orchestrator_clone = BotOrchestrator::new(Arc::clone(&data));
|
let orchestrator_clone = BotOrchestrator::new(Arc::clone(&data));
|
||||||
let user_id_welcome = user_id.clone();
|
let user_id_welcome = user_id.clone();
|
||||||
let session_id_welcome = session_id.clone();
|
let session_id_welcome = session_id.clone();
|
||||||
let bot_id_welcome = bot_id.clone();
|
let bot_id_welcome = bot_id.clone();
|
||||||
|
|
||||||
actix_web::rt::spawn(async move {
|
actix_web::rt::spawn(async move {
|
||||||
if let Err(e) = orchestrator_clone
|
if let Err(e) = orchestrator_clone
|
||||||
.trigger_auto_welcome(&session_id_welcome, &user_id_welcome, &bot_id_welcome, None)
|
.trigger_auto_welcome(&session_id_welcome, &user_id_welcome, &bot_id_welcome, None)
|
||||||
|
|
@ -1023,10 +940,7 @@ async fn websocket_handler(
|
||||||
let user_id_clone = user_id.clone();
|
let user_id_clone = user_id.clone();
|
||||||
|
|
||||||
actix_web::rt::spawn(async move {
|
actix_web::rt::spawn(async move {
|
||||||
info!(
|
info!("Starting WebSocket sender for session {}", session_id_clone1);
|
||||||
"Starting WebSocket sender for session {}",
|
|
||||||
session_id_clone1
|
|
||||||
);
|
|
||||||
let mut message_count = 0;
|
let mut message_count = 0;
|
||||||
while let Some(msg) = rx.recv().await {
|
while let Some(msg) = rx.recv().await {
|
||||||
message_count += 1;
|
message_count += 1;
|
||||||
|
|
@ -1037,23 +951,17 @@ async fn websocket_handler(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
info!(
|
info!("WebSocket sender terminated for session {}, sent {} messages", session_id_clone1, message_count);
|
||||||
"WebSocket sender terminated for session {}, sent {} messages",
|
|
||||||
session_id_clone1, message_count
|
|
||||||
);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
actix_web::rt::spawn(async move {
|
actix_web::rt::spawn(async move {
|
||||||
info!(
|
info!("Starting WebSocket receiver for session {}", session_id_clone2);
|
||||||
"Starting WebSocket receiver for session {}",
|
|
||||||
session_id_clone2
|
|
||||||
);
|
|
||||||
let mut message_count = 0;
|
let mut message_count = 0;
|
||||||
while let Some(Ok(msg)) = msg_stream.recv().await {
|
while let Some(Ok(msg)) = msg_stream.recv().await {
|
||||||
match msg {
|
match msg {
|
||||||
WsMessage::Text(text) => {
|
WsMessage::Text(text) => {
|
||||||
message_count += 1;
|
message_count += 1;
|
||||||
// Get first available bot from database
|
|
||||||
let bot_id = {
|
let bot_id = {
|
||||||
use crate::shared::models::schema::bots::dsl::*;
|
use crate::shared::models::schema::bots::dsl::*;
|
||||||
use diesel::prelude::*;
|
use diesel::prelude::*;
|
||||||
|
|
@ -1077,42 +985,35 @@ async fn websocket_handler(
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Parse the text as JSON to extract the content field
|
|
||||||
let json_value: serde_json::Value = match serde_json::from_str(&text) {
|
let json_value: serde_json::Value = match serde_json::from_str(&text) {
|
||||||
Ok(value) => value,
|
Ok(value) => value,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Error parsing JSON message {}: {}", message_count, e);
|
error!("Error parsing JSON message {}: {}", message_count, e);
|
||||||
continue; // Skip processing this message
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Extract content from JSON, fallback to original text if content field doesn't exist
|
|
||||||
let content = json_value["content"]
|
let content = json_value["content"]
|
||||||
.as_str()
|
.as_str()
|
||||||
.map(|s| s.to_string())
|
.map(|s| s.to_string())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let user_message = UserMessage {
|
let user_message = UserMessage {
|
||||||
bot_id: bot_id,
|
bot_id,
|
||||||
user_id: user_id_clone.clone(),
|
user_id: user_id_clone.clone(),
|
||||||
session_id: session_id_clone2.clone(),
|
session_id: session_id_clone2.clone(),
|
||||||
channel: "web".to_string(),
|
channel: "web".to_string(),
|
||||||
content: content,
|
content,
|
||||||
message_type: 1,
|
message_type: 1,
|
||||||
media_url: None,
|
media_url: None,
|
||||||
timestamp: Utc::now(),
|
timestamp: Utc::now(),
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Err(e) = orchestrator.stream_response(user_message, tx.clone()).await {
|
if let Err(e) = orchestrator.stream_response(user_message, tx.clone()).await {
|
||||||
error!(
|
error!("Error processing WebSocket message {}: {}", message_count, e);
|
||||||
"Error processing WebSocket message {}: {}",
|
|
||||||
message_count, e
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
WsMessage::Close(_) => {
|
WsMessage::Close(_) => {
|
||||||
// Get first available bot from database
|
|
||||||
let bot_id = {
|
let bot_id = {
|
||||||
use crate::shared::models::schema::bots::dsl::*;
|
use crate::shared::models::schema::bots::dsl::*;
|
||||||
use diesel::prelude::*;
|
use diesel::prelude::*;
|
||||||
|
|
@ -1135,6 +1036,7 @@ async fn websocket_handler(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
orchestrator
|
orchestrator
|
||||||
.send_event(
|
.send_event(
|
||||||
&user_id_clone,
|
&user_id_clone,
|
||||||
|
|
@ -1146,6 +1048,7 @@ async fn websocket_handler(
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.ok();
|
.ok();
|
||||||
|
|
||||||
web_adapter.remove_connection(&session_id_clone2).await;
|
web_adapter.remove_connection(&session_id_clone2).await;
|
||||||
orchestrator
|
orchestrator
|
||||||
.unregister_response_channel(&session_id_clone2)
|
.unregister_response_channel(&session_id_clone2)
|
||||||
|
|
@ -1155,16 +1058,10 @@ async fn websocket_handler(
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
info!(
|
info!("WebSocket receiver terminated for session {}, processed {} messages", session_id_clone2, message_count);
|
||||||
"WebSocket receiver terminated for session {}, processed {} messages",
|
|
||||||
session_id_clone2, message_count
|
|
||||||
);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
info!(
|
info!("WebSocket handler setup completed for session {}", session_id);
|
||||||
"WebSocket handler setup completed for session {}",
|
|
||||||
session_id
|
|
||||||
);
|
|
||||||
Ok(res)
|
Ok(res)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1177,6 +1074,7 @@ async fn start_session(
|
||||||
.get("session_id")
|
.get("session_id")
|
||||||
.and_then(|s| s.as_str())
|
.and_then(|s| s.as_str())
|
||||||
.unwrap_or("");
|
.unwrap_or("");
|
||||||
|
|
||||||
let token = info
|
let token = info
|
||||||
.get("token")
|
.get("token")
|
||||||
.and_then(|t| t.as_str())
|
.and_then(|t| t.as_str())
|
||||||
|
|
@ -1210,12 +1108,10 @@ async fn start_session(
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = BotOrchestrator::run_start_script(&session, Arc::clone(&data), token).await;
|
let result = BotOrchestrator::run_start_script(&session, Arc::clone(&data), token).await;
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
Ok(true) => {
|
Ok(true) => {
|
||||||
info!(
|
info!("Start script completed successfully for session: {}", session_id);
|
||||||
"Start script completed successfully for session: {}",
|
|
||||||
session_id
|
|
||||||
);
|
|
||||||
Ok(HttpResponse::Ok().json(serde_json::json!({
|
Ok(HttpResponse::Ok().json(serde_json::json!({
|
||||||
"status": "started",
|
"status": "started",
|
||||||
"session_id": session.id,
|
"session_id": session.id,
|
||||||
|
|
@ -1231,10 +1127,7 @@ async fn start_session(
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!(
|
error!("Error running start script for session {}: {}", session_id, e);
|
||||||
"Error running start script for session {}: {}",
|
|
||||||
session_id, e
|
|
||||||
);
|
|
||||||
Ok(HttpResponse::InternalServerError()
|
Ok(HttpResponse::InternalServerError()
|
||||||
.json(serde_json::json!({"error": e.to_string()})))
|
.json(serde_json::json!({"error": e.to_string()})))
|
||||||
}
|
}
|
||||||
|
|
@ -1249,14 +1142,12 @@ async fn send_warning_handler(
|
||||||
let default_session = "default".to_string();
|
let default_session = "default".to_string();
|
||||||
let default_channel = "web".to_string();
|
let default_channel = "web".to_string();
|
||||||
let default_message = "Warning!".to_string();
|
let default_message = "Warning!".to_string();
|
||||||
|
|
||||||
let session_id = info.get("session_id").unwrap_or(&default_session);
|
let session_id = info.get("session_id").unwrap_or(&default_session);
|
||||||
let channel = info.get("channel").unwrap_or(&default_channel);
|
let channel = info.get("channel").unwrap_or(&default_channel);
|
||||||
let message = info.get("message").unwrap_or(&default_message);
|
let message = info.get("message").unwrap_or(&default_message);
|
||||||
|
|
||||||
info!(
|
info!("Sending warning via API - session: {}, channel: {}", session_id, channel);
|
||||||
"Sending warning via API - session: {}, channel: {}",
|
|
||||||
session_id, channel
|
|
||||||
);
|
|
||||||
|
|
||||||
let orchestrator = BotOrchestrator::new(Arc::clone(&data));
|
let orchestrator = BotOrchestrator::new(Arc::clone(&data));
|
||||||
if let Err(e) = orchestrator
|
if let Err(e) = orchestrator
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue