refactor(bot): reorganize imports, improve logging and formatting
- Reordered and deduplicated `use` statements, adding missing imports for `langcache`, `DriveMonitor`, `generate_embeddings`, `Qdrant` client, and `chrono::Utc`. - Moved Diesel prelude import into the function scope where it is used to limit its visibility. - Refactored async task spawning to handle errors more clearly and consistently. - Enhanced string formatting for bucket names and log messages, introducing multiline `warn!` and `error!` calls for better readability. - Applied consistent code style (spacing, line breaks) across the module to improve maintainability.
This commit is contained in:
parent
4e781b1815
commit
d6fcc346fc
3 changed files with 273 additions and 122 deletions
283
src/bot/mod.rs
283
src/bot/mod.rs
|
|
@ -1,21 +1,21 @@
|
||||||
use crate::channels::ChannelAdapter;
|
use crate::channels::ChannelAdapter;
|
||||||
|
use crate::context::langcache::get_langcache_client;
|
||||||
|
use crate::drive_monitor::DriveMonitor;
|
||||||
|
use crate::kb::embeddings::generate_embeddings;
|
||||||
|
use crate::kb::qdrant_client::{ensure_collection_exists, get_qdrant_client, QdrantPoint};
|
||||||
use crate::shared::models::{BotResponse, UserMessage, UserSession};
|
use crate::shared::models::{BotResponse, UserMessage, UserSession};
|
||||||
use crate::shared::state::AppState;
|
use crate::shared::state::AppState;
|
||||||
use actix_web::{web, HttpRequest, HttpResponse, Result};
|
use actix_web::{web, HttpRequest, HttpResponse, Result};
|
||||||
use actix_ws::Message as WsMessage;
|
use actix_ws::Message as WsMessage;
|
||||||
|
use chrono::Utc;
|
||||||
use diesel::PgConnection;
|
use diesel::PgConnection;
|
||||||
use log::{debug, error, info, warn};
|
use log::{debug, error, info, warn};
|
||||||
use chrono::Utc;
|
|
||||||
use serde_json;
|
use serde_json;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use crate::kb::embeddings::generate_embeddings;
|
|
||||||
use uuid::Uuid;
|
|
||||||
use crate::kb::qdrant_client::{ensure_collection_exists, get_qdrant_client, QdrantPoint};
|
|
||||||
use crate::context::langcache::get_langcache_client;
|
|
||||||
use crate::drive_monitor::DriveMonitor;
|
|
||||||
use tokio::sync::Mutex as AsyncMutex;
|
use tokio::sync::Mutex as AsyncMutex;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
pub struct BotOrchestrator {
|
pub struct BotOrchestrator {
|
||||||
pub state: Arc<AppState>,
|
pub state: Arc<AppState>,
|
||||||
|
|
@ -50,7 +50,10 @@ impl BotOrchestrator {
|
||||||
let bot_guid_str = bot_guid.to_string();
|
let bot_guid_str = bot_guid.to_string();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = Self::mount_bot_task(state_clone, mounted_bots_clone, bot_guid_str.clone()).await {
|
if let Err(e) =
|
||||||
|
Self::mount_bot_task(state_clone, mounted_bots_clone, bot_guid_str.clone())
|
||||||
|
.await
|
||||||
|
{
|
||||||
error!("Failed to mount bot {}: {}", bot_guid_str, e);
|
error!("Failed to mount bot {}: {}", bot_guid_str, e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
@ -64,13 +67,12 @@ impl BotOrchestrator {
|
||||||
mounted_bots: Arc<AsyncMutex<HashMap<String, Arc<DriveMonitor>>>>,
|
mounted_bots: Arc<AsyncMutex<HashMap<String, Arc<DriveMonitor>>>>,
|
||||||
bot_guid: String,
|
bot_guid: String,
|
||||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
use diesel::prelude::*;
|
|
||||||
use crate::shared::models::schema::bots::dsl::*;
|
use crate::shared::models::schema::bots::dsl::*;
|
||||||
|
use diesel::prelude::*;
|
||||||
|
|
||||||
let bot_name: String = {
|
let bot_name: String = {
|
||||||
let mut db_conn = state.conn.lock().unwrap();
|
let mut db_conn = state.conn.lock().unwrap();
|
||||||
bots
|
bots.filter(id.eq(Uuid::parse_str(&bot_guid)?))
|
||||||
.filter(id.eq(Uuid::parse_str(&bot_guid)?))
|
|
||||||
.select(name)
|
.select(name)
|
||||||
.first(&mut *db_conn)
|
.first(&mut *db_conn)
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
|
|
@ -91,7 +93,7 @@ impl BotOrchestrator {
|
||||||
|
|
||||||
let bot_id = Uuid::parse_str(&bot_guid)?;
|
let bot_id = Uuid::parse_str(&bot_guid)?;
|
||||||
let drive_monitor = Arc::new(DriveMonitor::new(state.clone(), bucket_name, bot_id));
|
let drive_monitor = Arc::new(DriveMonitor::new(state.clone(), bucket_name, bot_id));
|
||||||
|
|
||||||
let _handle = drive_monitor.clone().spawn().await;
|
let _handle = drive_monitor.clone().spawn().await;
|
||||||
|
|
||||||
{
|
{
|
||||||
|
|
@ -103,22 +105,34 @@ impl BotOrchestrator {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn create_bot(&self, bot_guid: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
pub async fn create_bot(
|
||||||
let bucket_name = format!("{}{}.gbai", self.state.config.as_ref().unwrap().drive.org_prefix, bot_guid);
|
&self,
|
||||||
|
bot_guid: &str,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
let bucket_name = format!(
|
||||||
|
"{}{}.gbai",
|
||||||
|
self.state.config.as_ref().unwrap().drive.org_prefix,
|
||||||
|
bot_guid
|
||||||
|
);
|
||||||
crate::create_bucket::create_bucket(&bucket_name)?;
|
crate::create_bucket::create_bucket(&bucket_name)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn mount_bot(&self, bot_guid: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
pub async fn mount_bot(
|
||||||
let bot_guid = bot_guid.strip_suffix(".gbai").unwrap_or(bot_guid).to_string();
|
&self,
|
||||||
|
bot_guid: &str,
|
||||||
use diesel::prelude::*;
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
let bot_guid = bot_guid
|
||||||
|
.strip_suffix(".gbai")
|
||||||
|
.unwrap_or(bot_guid)
|
||||||
|
.to_string();
|
||||||
|
|
||||||
use crate::shared::models::schema::bots::dsl::*;
|
use crate::shared::models::schema::bots::dsl::*;
|
||||||
|
use diesel::prelude::*;
|
||||||
|
|
||||||
let bot_name: String = {
|
let bot_name: String = {
|
||||||
let mut db_conn = self.state.conn.lock().unwrap();
|
let mut db_conn = self.state.conn.lock().unwrap();
|
||||||
bots
|
bots.filter(id.eq(Uuid::parse_str(&bot_guid)?))
|
||||||
.filter(id.eq(Uuid::parse_str(&bot_guid)?))
|
|
||||||
.select(name)
|
.select(name)
|
||||||
.first(&mut *db_conn)
|
.first(&mut *db_conn)
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
|
|
@ -139,7 +153,7 @@ impl BotOrchestrator {
|
||||||
|
|
||||||
let bot_id = Uuid::parse_str(&bot_guid)?;
|
let bot_id = Uuid::parse_str(&bot_guid)?;
|
||||||
let drive_monitor = Arc::new(DriveMonitor::new(self.state.clone(), bucket_name, bot_id));
|
let drive_monitor = Arc::new(DriveMonitor::new(self.state.clone(), bucket_name, bot_id));
|
||||||
|
|
||||||
let _handle = drive_monitor.clone().spawn().await;
|
let _handle = drive_monitor.clone().spawn().await;
|
||||||
|
|
||||||
{
|
{
|
||||||
|
|
@ -155,7 +169,10 @@ impl BotOrchestrator {
|
||||||
session_id: Uuid,
|
session_id: Uuid,
|
||||||
user_input: &str,
|
user_input: &str,
|
||||||
) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
info!("Handling user input for session {}: '{}'", session_id, user_input);
|
info!(
|
||||||
|
"Handling user input for session {}: '{}'",
|
||||||
|
session_id, user_input
|
||||||
|
);
|
||||||
let mut session_manager = self.state.session_manager.lock().await;
|
let mut session_manager = self.state.session_manager.lock().await;
|
||||||
session_manager.provide_input(session_id, user_input.to_string())?;
|
session_manager.provide_input(session_id, user_input.to_string())?;
|
||||||
Ok(None)
|
Ok(None)
|
||||||
|
|
@ -196,7 +213,10 @@ impl BotOrchestrator {
|
||||||
bot_id: &str,
|
bot_id: &str,
|
||||||
mode: i32,
|
mode: i32,
|
||||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
info!("Setting answer mode for user {} with bot {} to mode {}", user_id, bot_id, mode);
|
info!(
|
||||||
|
"Setting answer mode for user {} with bot {} to mode {}",
|
||||||
|
user_id, bot_id, mode
|
||||||
|
);
|
||||||
let mut session_manager = self.state.session_manager.lock().await;
|
let mut session_manager = self.state.session_manager.lock().await;
|
||||||
session_manager.update_answer_mode(user_id, bot_id, mode)?;
|
session_manager.update_answer_mode(user_id, bot_id, mode)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
@ -211,21 +231,24 @@ impl BotOrchestrator {
|
||||||
event_type: &str,
|
event_type: &str,
|
||||||
data: serde_json::Value,
|
data: serde_json::Value,
|
||||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
info!("Sending event '{}' to session {} on channel {}", event_type, session_id, channel);
|
info!(
|
||||||
let event_response = BotResponse {
|
"Sending event '{}' to session {} on channel {}",
|
||||||
bot_id: bot_id.to_string(),
|
event_type, session_id, channel
|
||||||
user_id: user_id.to_string(),
|
);
|
||||||
session_id: session_id.to_string(),
|
let event_response = BotResponse {
|
||||||
channel: channel.to_string(),
|
bot_id: bot_id.to_string(),
|
||||||
content: serde_json::to_string(&serde_json::json!({
|
user_id: user_id.to_string(),
|
||||||
"event": event_type,
|
session_id: session_id.to_string(),
|
||||||
"data": data
|
channel: channel.to_string(),
|
||||||
}))?,
|
content: serde_json::to_string(&serde_json::json!({
|
||||||
message_type: 2,
|
"event": event_type,
|
||||||
stream_token: None,
|
"data": data
|
||||||
is_complete: true,
|
}))?,
|
||||||
suggestions: Vec::new(),
|
message_type: 2,
|
||||||
};
|
stream_token: None,
|
||||||
|
is_complete: true,
|
||||||
|
suggestions: Vec::new(),
|
||||||
|
};
|
||||||
|
|
||||||
if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) {
|
if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) {
|
||||||
adapter.send_message(event_response).await?;
|
adapter.send_message(event_response).await?;
|
||||||
|
|
@ -242,7 +265,10 @@ impl BotOrchestrator {
|
||||||
channel: &str,
|
channel: &str,
|
||||||
content: &str,
|
content: &str,
|
||||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
info!("Sending direct message to session {}: '{}'", session_id, content);
|
info!(
|
||||||
|
"Sending direct message to session {}: '{}'",
|
||||||
|
session_id, content
|
||||||
|
);
|
||||||
let bot_response = BotResponse {
|
let bot_response = BotResponse {
|
||||||
bot_id: "default_bot".to_string(),
|
bot_id: "default_bot".to_string(),
|
||||||
user_id: "default_user".to_string(),
|
user_id: "default_user".to_string(),
|
||||||
|
|
@ -258,7 +284,10 @@ impl BotOrchestrator {
|
||||||
if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) {
|
if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) {
|
||||||
adapter.send_message(bot_response).await?;
|
adapter.send_message(bot_response).await?;
|
||||||
} else {
|
} else {
|
||||||
warn!("No channel adapter found for direct message on channel: {}", channel);
|
warn!(
|
||||||
|
"No channel adapter found for direct message on channel: {}",
|
||||||
|
channel
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
@ -268,8 +297,14 @@ impl BotOrchestrator {
|
||||||
&self,
|
&self,
|
||||||
message: UserMessage,
|
message: UserMessage,
|
||||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
info!("Processing message from channel: {}, user: {}, session: {}", message.channel, message.user_id, message.session_id);
|
info!(
|
||||||
debug!("Message content: '{}', type: {}", message.content, message.message_type);
|
"Processing message from channel: {}, user: {}, session: {}",
|
||||||
|
message.channel, message.user_id, message.session_id
|
||||||
|
);
|
||||||
|
debug!(
|
||||||
|
"Message content: '{}', type: {}",
|
||||||
|
message.content, message.message_type
|
||||||
|
);
|
||||||
|
|
||||||
let user_id = Uuid::parse_str(&message.user_id).map_err(|e| {
|
let user_id = Uuid::parse_str(&message.user_id).map_err(|e| {
|
||||||
error!("Invalid user ID provided: {}", e);
|
error!("Invalid user ID provided: {}", e);
|
||||||
|
|
@ -287,16 +322,27 @@ impl BotOrchestrator {
|
||||||
match sm.get_session_by_id(session_id)? {
|
match sm.get_session_by_id(session_id)? {
|
||||||
Some(session) => session,
|
Some(session) => session,
|
||||||
None => {
|
None => {
|
||||||
error!("Failed to create session for user {} with bot {}", user_id, bot_id);
|
error!(
|
||||||
|
"Failed to create session for user {} with bot {}",
|
||||||
|
user_id, bot_id
|
||||||
|
);
|
||||||
return Err("Failed to create session".into());
|
return Err("Failed to create session".into());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if self.is_waiting_for_input(session.id).await {
|
if self.is_waiting_for_input(session.id).await {
|
||||||
debug!("Session {} is waiting for input, processing as variable input", session.id);
|
debug!(
|
||||||
if let Some(variable_name) = self.handle_user_input(session.id, &message.content).await? {
|
"Session {} is waiting for input, processing as variable input",
|
||||||
info!("Stored user input in variable '{}' for session {}", variable_name, session.id);
|
session.id
|
||||||
|
);
|
||||||
|
if let Some(variable_name) =
|
||||||
|
self.handle_user_input(session.id, &message.content).await?
|
||||||
|
{
|
||||||
|
info!(
|
||||||
|
"Stored user input in variable '{}' for session {}",
|
||||||
|
variable_name, session.id
|
||||||
|
);
|
||||||
if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) {
|
if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) {
|
||||||
let ack_response = BotResponse {
|
let ack_response = BotResponse {
|
||||||
bot_id: message.bot_id.clone(),
|
bot_id: message.bot_id.clone(),
|
||||||
|
|
@ -357,7 +403,10 @@ impl BotOrchestrator {
|
||||||
if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) {
|
if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) {
|
||||||
adapter.send_message(bot_response).await?;
|
adapter.send_message(bot_response).await?;
|
||||||
} else {
|
} else {
|
||||||
warn!("No channel adapter found for message channel: {}", message.channel);
|
warn!(
|
||||||
|
"No channel adapter found for message channel: {}",
|
||||||
|
message.channel
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
@ -426,7 +475,8 @@ impl BotOrchestrator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let response = self.state
|
let response = self
|
||||||
|
.state
|
||||||
.llm_provider
|
.llm_provider
|
||||||
.generate(&prompt, &serde_json::Value::Null)
|
.generate(&prompt, &serde_json::Value::Null)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
@ -467,7 +517,8 @@ impl BotOrchestrator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let response = self.state
|
let response = self
|
||||||
|
.state
|
||||||
.llm_provider
|
.llm_provider
|
||||||
.generate(&prompt, &serde_json::Value::Null)
|
.generate(&prompt, &serde_json::Value::Null)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
@ -494,7 +545,10 @@ impl BotOrchestrator {
|
||||||
message: UserMessage,
|
message: UserMessage,
|
||||||
response_tx: mpsc::Sender<BotResponse>,
|
response_tx: mpsc::Sender<BotResponse>,
|
||||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
info!("Streaming response for user: {}, session: {}", message.user_id, message.session_id);
|
info!(
|
||||||
|
"Streaming response for user: {}, session: {}",
|
||||||
|
message.user_id, message.session_id
|
||||||
|
);
|
||||||
|
|
||||||
let user_id = Uuid::parse_str(&message.user_id).map_err(|e| {
|
let user_id = Uuid::parse_str(&message.user_id).map_err(|e| {
|
||||||
error!("Invalid user ID: {}", e);
|
error!("Invalid user ID: {}", e);
|
||||||
|
|
@ -572,7 +626,10 @@ impl BotOrchestrator {
|
||||||
}
|
}
|
||||||
|
|
||||||
p.push_str(&format!("User: {}\nAssistant:", message.content));
|
p.push_str(&format!("User: {}\nAssistant:", message.content));
|
||||||
info!("Stream prompt constructed with {} history entries", history.len());
|
info!(
|
||||||
|
"Stream prompt constructed with {} history entries",
|
||||||
|
history.len()
|
||||||
|
);
|
||||||
p
|
p
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -628,13 +685,18 @@ impl BotOrchestrator {
|
||||||
|
|
||||||
analysis_buffer.push_str(&chunk);
|
analysis_buffer.push_str(&chunk);
|
||||||
|
|
||||||
if analysis_buffer.contains("**") && !in_analysis {
|
if (analysis_buffer.contains("**") || analysis_buffer.contains("<think>"))
|
||||||
|
&& !in_analysis
|
||||||
|
{
|
||||||
in_analysis = true;
|
in_analysis = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if in_analysis {
|
if in_analysis {
|
||||||
if analysis_buffer.ends_with("final") {
|
if analysis_buffer.ends_with("final") || analysis_buffer.ends_with("</think>") {
|
||||||
info!("Analysis section completed, buffer length: {}", analysis_buffer.len());
|
info!(
|
||||||
|
"Analysis section completed, buffer length: {}",
|
||||||
|
analysis_buffer.len()
|
||||||
|
);
|
||||||
in_analysis = false;
|
in_analysis = false;
|
||||||
analysis_buffer.clear();
|
analysis_buffer.clear();
|
||||||
|
|
||||||
|
|
@ -680,7 +742,10 @@ impl BotOrchestrator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Stream processing completed, {} chunks processed", chunk_count);
|
info!(
|
||||||
|
"Stream processing completed, {} chunks processed",
|
||||||
|
chunk_count
|
||||||
|
);
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut sm = self.state.session_manager.lock().await;
|
let mut sm = self.state.session_manager.lock().await;
|
||||||
|
|
@ -717,7 +782,10 @@ impl BotOrchestrator {
|
||||||
session_id: Uuid,
|
session_id: Uuid,
|
||||||
user_id: Uuid,
|
user_id: Uuid,
|
||||||
) -> Result<Vec<(String, String)>, Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<Vec<(String, String)>, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
info!("Getting conversation history for session {} user {}", session_id, user_id);
|
info!(
|
||||||
|
"Getting conversation history for session {} user {}",
|
||||||
|
session_id, user_id
|
||||||
|
);
|
||||||
let mut session_manager = self.state.session_manager.lock().await;
|
let mut session_manager = self.state.session_manager.lock().await;
|
||||||
let history = session_manager.get_conversation_history(session_id, user_id)?;
|
let history = session_manager.get_conversation_history(session_id, user_id)?;
|
||||||
Ok(history)
|
Ok(history)
|
||||||
|
|
@ -728,7 +796,10 @@ impl BotOrchestrator {
|
||||||
state: Arc<AppState>,
|
state: Arc<AppState>,
|
||||||
token: Option<String>,
|
token: Option<String>,
|
||||||
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
info!("Running start script for session: {} with token: {:?}", session.id, token);
|
info!(
|
||||||
|
"Running start script for session: {} with token: {:?}",
|
||||||
|
session.id, token
|
||||||
|
);
|
||||||
|
|
||||||
let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| String::from("default_bot"));
|
let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| String::from("default_bot"));
|
||||||
let start_script_path = format!("./{}.gbai/.gbdialog/start.bas", bot_guid);
|
let start_script_path = format!("./{}.gbai/.gbdialog/start.bas", bot_guid);
|
||||||
|
|
@ -741,7 +812,10 @@ impl BotOrchestrator {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
info!("Start script content for session {}: {}", session.id, start_script);
|
info!(
|
||||||
|
"Start script content for session {}: {}",
|
||||||
|
session.id, start_script
|
||||||
|
);
|
||||||
|
|
||||||
let session_clone = session.clone();
|
let session_clone = session.clone();
|
||||||
let state_clone = state.clone();
|
let state_clone = state.clone();
|
||||||
|
|
@ -754,11 +828,17 @@ impl BotOrchestrator {
|
||||||
.and_then(|ast| script_service.run(&ast))
|
.and_then(|ast| script_service.run(&ast))
|
||||||
{
|
{
|
||||||
Ok(result) => {
|
Ok(result) => {
|
||||||
info!("Start script executed successfully for session {}, result: {}", session_clone.id, result);
|
info!(
|
||||||
|
"Start script executed successfully for session {}, result: {}",
|
||||||
|
session_clone.id, result
|
||||||
|
);
|
||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Failed to run start script for session {}: {}", session_clone.id, e);
|
error!(
|
||||||
|
"Failed to run start script for session {}: {}",
|
||||||
|
session_clone.id, e
|
||||||
|
);
|
||||||
Ok(false)
|
Ok(false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -770,7 +850,10 @@ impl BotOrchestrator {
|
||||||
channel: &str,
|
channel: &str,
|
||||||
message: &str,
|
message: &str,
|
||||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
warn!("Sending warning to session {} on channel {}: {}", session_id, channel, message);
|
warn!(
|
||||||
|
"Sending warning to session {} on channel {}: {}",
|
||||||
|
session_id, channel, message
|
||||||
|
);
|
||||||
|
|
||||||
if channel == "web" {
|
if channel == "web" {
|
||||||
self.send_event(
|
self.send_event(
|
||||||
|
|
@ -800,7 +883,10 @@ impl BotOrchestrator {
|
||||||
};
|
};
|
||||||
adapter.send_message(warn_response).await
|
adapter.send_message(warn_response).await
|
||||||
} else {
|
} else {
|
||||||
warn!("No channel adapter found for warning on channel: {}", channel);
|
warn!(
|
||||||
|
"No channel adapter found for warning on channel: {}",
|
||||||
|
channel
|
||||||
|
);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -813,7 +899,10 @@ impl BotOrchestrator {
|
||||||
_bot_id: &str,
|
_bot_id: &str,
|
||||||
token: Option<String>,
|
token: Option<String>,
|
||||||
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
info!("Triggering auto welcome for user: {}, session: {}, token: {:?}", user_id, session_id, token);
|
info!(
|
||||||
|
"Triggering auto welcome for user: {}, session: {}, token: {:?}",
|
||||||
|
user_id, session_id, token
|
||||||
|
);
|
||||||
|
|
||||||
let session_uuid = Uuid::parse_str(session_id).map_err(|e| {
|
let session_uuid = Uuid::parse_str(session_id).map_err(|e| {
|
||||||
error!("Invalid session ID: {}", e);
|
error!("Invalid session ID: {}", e);
|
||||||
|
|
@ -832,14 +921,17 @@ impl BotOrchestrator {
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = Self::run_start_script(&session, Arc::clone(&self.state), token).await?;
|
let result = Self::run_start_script(&session, Arc::clone(&self.state), token).await?;
|
||||||
info!("Auto welcome completed for session: {} with result: {}", session_id, result);
|
info!(
|
||||||
|
"Auto welcome completed for session: {} with result: {}",
|
||||||
|
session_id, result
|
||||||
|
);
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn bot_from_url(
|
pub fn bot_from_url(
|
||||||
db_conn: &mut PgConnection,
|
db_conn: &mut PgConnection,
|
||||||
path: &str
|
path: &str,
|
||||||
) -> Result<(Uuid, String), HttpResponse> {
|
) -> Result<(Uuid, String), HttpResponse> {
|
||||||
use crate::shared::models::schema::bots::dsl::*;
|
use crate::shared::models::schema::bots::dsl::*;
|
||||||
use diesel::prelude::*;
|
use diesel::prelude::*;
|
||||||
|
|
@ -867,7 +959,11 @@ pub fn bot_from_url(
|
||||||
.optional()
|
.optional()
|
||||||
{
|
{
|
||||||
Ok(Some((first_bot_id, first_bot_name))) => {
|
Ok(Some((first_bot_id, first_bot_name))) => {
|
||||||
log::info!("Using first available bot: {} ({})", first_bot_id, first_bot_name);
|
log::info!(
|
||||||
|
"Using first available bot: {} ({})",
|
||||||
|
first_bot_id,
|
||||||
|
first_bot_name
|
||||||
|
);
|
||||||
Ok((first_bot_id, first_bot_name))
|
Ok((first_bot_id, first_bot_name))
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
|
|
@ -975,7 +1071,10 @@ async fn websocket_handler(
|
||||||
.await
|
.await
|
||||||
.ok();
|
.ok();
|
||||||
|
|
||||||
info!("WebSocket connection established for session: {}, user: {}", session_id, user_id);
|
info!(
|
||||||
|
"WebSocket connection established for session: {}, user: {}",
|
||||||
|
session_id, user_id
|
||||||
|
);
|
||||||
|
|
||||||
let orchestrator_clone = BotOrchestrator::new(Arc::clone(&data));
|
let orchestrator_clone = BotOrchestrator::new(Arc::clone(&data));
|
||||||
let user_id_welcome = user_id.clone();
|
let user_id_welcome = user_id.clone();
|
||||||
|
|
@ -997,7 +1096,10 @@ async fn websocket_handler(
|
||||||
let user_id_clone = user_id.clone();
|
let user_id_clone = user_id.clone();
|
||||||
|
|
||||||
actix_web::rt::spawn(async move {
|
actix_web::rt::spawn(async move {
|
||||||
info!("Starting WebSocket sender for session {}", session_id_clone1);
|
info!(
|
||||||
|
"Starting WebSocket sender for session {}",
|
||||||
|
session_id_clone1
|
||||||
|
);
|
||||||
let mut message_count = 0;
|
let mut message_count = 0;
|
||||||
while let Some(msg) = rx.recv().await {
|
while let Some(msg) = rx.recv().await {
|
||||||
message_count += 1;
|
message_count += 1;
|
||||||
|
|
@ -1008,11 +1110,17 @@ async fn websocket_handler(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
info!("WebSocket sender terminated for session {}, sent {} messages", session_id_clone1, message_count);
|
info!(
|
||||||
|
"WebSocket sender terminated for session {}, sent {} messages",
|
||||||
|
session_id_clone1, message_count
|
||||||
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
actix_web::rt::spawn(async move {
|
actix_web::rt::spawn(async move {
|
||||||
info!("Starting WebSocket receiver for session {}", session_id_clone2);
|
info!(
|
||||||
|
"Starting WebSocket receiver for session {}",
|
||||||
|
session_id_clone2
|
||||||
|
);
|
||||||
let mut message_count = 0;
|
let mut message_count = 0;
|
||||||
while let Some(Ok(msg)) = msg_stream.recv().await {
|
while let Some(Ok(msg)) = msg_stream.recv().await {
|
||||||
match msg {
|
match msg {
|
||||||
|
|
@ -1067,12 +1175,18 @@ async fn websocket_handler(
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Err(e) = orchestrator.stream_response(user_message, tx.clone()).await {
|
if let Err(e) = orchestrator.stream_response(user_message, tx.clone()).await {
|
||||||
error!("Error processing WebSocket message {}: {}", message_count, e);
|
error!(
|
||||||
|
"Error processing WebSocket message {}: {}",
|
||||||
|
message_count, e
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
WsMessage::Close(reason) => {
|
WsMessage::Close(reason) => {
|
||||||
debug!("WebSocket closing for session {} - reason: {:?}", session_id_clone2, reason);
|
debug!(
|
||||||
|
"WebSocket closing for session {} - reason: {:?}",
|
||||||
|
session_id_clone2, reason
|
||||||
|
);
|
||||||
|
|
||||||
let bot_id = {
|
let bot_id = {
|
||||||
use crate::shared::models::schema::bots::dsl::*;
|
use crate::shared::models::schema::bots::dsl::*;
|
||||||
use diesel::prelude::*;
|
use diesel::prelude::*;
|
||||||
|
|
@ -1125,10 +1239,16 @@ async fn websocket_handler(
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
info!("WebSocket receiver terminated for session {}, processed {} messages", session_id_clone2, message_count);
|
info!(
|
||||||
|
"WebSocket receiver terminated for session {}, processed {} messages",
|
||||||
|
session_id_clone2, message_count
|
||||||
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
info!("WebSocket handler setup completed for session {}", session_id);
|
info!(
|
||||||
|
"WebSocket handler setup completed for session {}",
|
||||||
|
session_id
|
||||||
|
);
|
||||||
Ok(res)
|
Ok(res)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1178,7 +1298,10 @@ async fn start_session(
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
Ok(true) => {
|
Ok(true) => {
|
||||||
info!("Start script completed successfully for session: {}", session_id);
|
info!(
|
||||||
|
"Start script completed successfully for session: {}",
|
||||||
|
session_id
|
||||||
|
);
|
||||||
Ok(HttpResponse::Ok().json(serde_json::json!({
|
Ok(HttpResponse::Ok().json(serde_json::json!({
|
||||||
"status": "started",
|
"status": "started",
|
||||||
"session_id": session.id,
|
"session_id": session.id,
|
||||||
|
|
@ -1194,7 +1317,10 @@ async fn start_session(
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Error running start script for session {}: {}", session_id, e);
|
error!(
|
||||||
|
"Error running start script for session {}: {}",
|
||||||
|
session_id, e
|
||||||
|
);
|
||||||
Ok(HttpResponse::InternalServerError()
|
Ok(HttpResponse::InternalServerError()
|
||||||
.json(serde_json::json!({"error": e.to_string()})))
|
.json(serde_json::json!({"error": e.to_string()})))
|
||||||
}
|
}
|
||||||
|
|
@ -1214,7 +1340,10 @@ async fn send_warning_handler(
|
||||||
let channel = info.get("channel").unwrap_or(&default_channel);
|
let channel = info.get("channel").unwrap_or(&default_channel);
|
||||||
let message = info.get("message").unwrap_or(&default_message);
|
let message = info.get("message").unwrap_or(&default_message);
|
||||||
|
|
||||||
info!("Sending warning via API - session: {}, channel: {}", session_id, channel);
|
info!(
|
||||||
|
"Sending warning via API - session: {}, channel: {}",
|
||||||
|
session_id, channel
|
||||||
|
);
|
||||||
|
|
||||||
let orchestrator = BotOrchestrator::new(Arc::clone(&data));
|
let orchestrator = BotOrchestrator::new(Arc::clone(&data));
|
||||||
if let Err(e) = orchestrator
|
if let Err(e) = orchestrator
|
||||||
|
|
|
||||||
|
|
@ -232,13 +232,11 @@ impl DriveMonitor {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn check_gbot(&self, client: &Client) -> Result<(), Box<dyn Error + Send + Sync>> {
|
async fn check_gbot(&self, client: &Client) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||||
|
|
||||||
let config_manager = ConfigManager::new(Arc::clone(&self.state.conn));
|
let config_manager = ConfigManager::new(Arc::clone(&self.state.conn));
|
||||||
|
|
||||||
let mut continuation_token = None;
|
let mut continuation_token = None;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
|
||||||
let list_objects = client
|
let list_objects = client
|
||||||
.list_objects_v2()
|
.list_objects_v2()
|
||||||
.bucket(&self.bucket_name.to_lowercase())
|
.bucket(&self.bucket_name.to_lowercase())
|
||||||
|
|
@ -276,61 +274,66 @@ impl DriveMonitor {
|
||||||
.bucket(&self.bucket_name)
|
.bucket(&self.bucket_name)
|
||||||
.key(&path)
|
.key(&path)
|
||||||
.send()
|
.send()
|
||||||
.await?;
|
.await?;
|
||||||
debug!(
|
debug!(
|
||||||
"GetObject successful for {}, content length: {}",
|
"GetObject successful for {}, content length: {}",
|
||||||
path,
|
path,
|
||||||
response.content_length().unwrap_or(0)
|
response.content_length().unwrap_or(0)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
||||||
let bytes = response.body.collect().await?.into_bytes();
|
let bytes = response.body.collect().await?.into_bytes();
|
||||||
debug!("Collected {} bytes for {}", bytes.len(), path);
|
debug!("Collected {} bytes for {}", bytes.len(), path);
|
||||||
let csv_content = String::from_utf8(bytes.to_vec())
|
let csv_content = String::from_utf8(bytes.to_vec())
|
||||||
.map_err(|e| format!("UTF-8 error in {}: {}", path, e))?;
|
.map_err(|e| format!("UTF-8 error in {}: {}", path, e))?;
|
||||||
debug!("Found {}: {} bytes", path, csv_content.len());
|
debug!("Found {}: {} bytes", path, csv_content.len());
|
||||||
|
|
||||||
|
// Restart LLaMA servers only if llm- properties changed
|
||||||
|
let llm_lines: Vec<_> = csv_content
|
||||||
|
.lines()
|
||||||
|
.filter(|line| line.trim_start().starts_with("llm-"))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if !llm_lines.is_empty() {
|
||||||
|
use crate::llm_legacy::llm_local::ensure_llama_servers_running;
|
||||||
|
let mut restart_needed = false;
|
||||||
|
|
||||||
// Restart LLaMA servers only if llm- properties changed
|
for line in llm_lines {
|
||||||
let llm_lines: Vec<_> = csv_content
|
let parts: Vec<&str> = line.split(',').collect();
|
||||||
.lines()
|
if parts.len() >= 2 {
|
||||||
.filter(|line| line.trim_start().starts_with("llm-"))
|
let key = parts[0].trim();
|
||||||
.collect();
|
let new_value = parts[1].trim();
|
||||||
|
match config_manager.get_config(&self.bot_id, key, None) {
|
||||||
if !llm_lines.is_empty() {
|
Ok(old_value) => {
|
||||||
use crate::llm_legacy::llm_local::ensure_llama_servers_running;
|
if old_value != new_value {
|
||||||
let mut restart_needed = false;
|
info!(
|
||||||
|
"Detected change in {} (old: {}, new: {})",
|
||||||
for line in llm_lines {
|
key, old_value, new_value
|
||||||
let parts: Vec<&str> = line.split(',').collect();
|
);
|
||||||
if parts.len() >= 2 {
|
|
||||||
let key = parts[0].trim();
|
|
||||||
let new_value = parts[1].trim();
|
|
||||||
match config_manager.get_config(&self.bot_id, key, None) {
|
|
||||||
Ok(old_value) => {
|
|
||||||
if old_value != new_value {
|
|
||||||
info!("Detected change in {} (old: {}, new: {})", key, old_value, new_value);
|
|
||||||
restart_needed = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(_) => {
|
|
||||||
info!("New llm- property detected: {}", key);
|
|
||||||
restart_needed = true;
|
restart_needed = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Err(_) => {
|
||||||
|
info!("New llm- property detected: {}", key);
|
||||||
|
restart_needed = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if restart_needed {
|
_ = config_manager.sync_gbot_config(&self.bot_id, &csv_content);
|
||||||
info!("Detected llm- configuration change, restarting LLaMA servers...");
|
if restart_needed {
|
||||||
if let Err(e) = ensure_llama_servers_running(&self.state).await {
|
info!("Detected llm- configuration change, restarting LLaMA servers...");
|
||||||
error!("Failed to restart LLaMA servers after llm- config change: {}", e);
|
if let Err(e) = ensure_llama_servers_running(&self.state).await {
|
||||||
}
|
error!("Failed to restart LLaMA servers after llm- config change: {}", e);
|
||||||
} else {
|
|
||||||
info!("No llm- property changes detected; skipping LLaMA server restart.");
|
|
||||||
}
|
}
|
||||||
config_manager.sync_gbot_config(&self.bot_id, &csv_content);
|
} else {
|
||||||
|
info!("No llm- property changes detected; skipping LLaMA server restart.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
_ = config_manager.sync_gbot_config(&self.bot_id, &csv_content);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ use log::{error, info};
|
||||||
use reqwest::Client;
|
use reqwest::Client;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::env;
|
use std::env;
|
||||||
|
use std::sync::Arc;
|
||||||
use tokio::time::{sleep, Duration};
|
use tokio::time::{sleep, Duration};
|
||||||
|
|
||||||
use crate::config::ConfigManager;
|
use crate::config::ConfigManager;
|
||||||
|
|
@ -59,7 +60,7 @@ struct LlamaCppResponse {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn ensure_llama_servers_running(
|
pub async fn ensure_llama_servers_running(
|
||||||
app_state: &AppState,
|
app_state: &Arc<AppState>,
|
||||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
let conn = app_state.conn.clone();
|
let conn = app_state.conn.clone();
|
||||||
let config_manager = ConfigManager::new(conn.clone());
|
let config_manager = ConfigManager::new(conn.clone());
|
||||||
|
|
@ -120,6 +121,7 @@ pub async fn ensure_llama_servers_running(
|
||||||
if !llm_running && !llm_model.is_empty() {
|
if !llm_running && !llm_model.is_empty() {
|
||||||
info!("🔄 Starting LLM server...");
|
info!("🔄 Starting LLM server...");
|
||||||
tasks.push(tokio::spawn(start_llm_server(
|
tasks.push(tokio::spawn(start_llm_server(
|
||||||
|
Arc::clone(app_state),
|
||||||
llm_server_path.clone(),
|
llm_server_path.clone(),
|
||||||
llm_model.clone(),
|
llm_model.clone(),
|
||||||
llm_url.clone(),
|
llm_url.clone(),
|
||||||
|
|
@ -206,6 +208,7 @@ pub async fn ensure_llama_servers_running(
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn start_llm_server(
|
async fn start_llm_server(
|
||||||
|
app_state: Arc<AppState>,
|
||||||
llama_cpp_path: String,
|
llama_cpp_path: String,
|
||||||
model_path: String,
|
model_path: String,
|
||||||
url: String,
|
url: String,
|
||||||
|
|
@ -217,14 +220,30 @@ async fn start_llm_server(
|
||||||
std::env::set_var("OMP_PROC_BIND", "close");
|
std::env::set_var("OMP_PROC_BIND", "close");
|
||||||
|
|
||||||
// "cd {} && numactl --interleave=all ./llama-server -m {} --host 0.0.0.0 --port {} --threads 20 --threads-batch 40 --temp 0.7 --parallel 1 --repeat-penalty 1.1 --ctx-size 8192 --batch-size 8192 -n 4096 --mlock --no-mmap --flash-attn --no-kv-offload --no-mmap &",
|
// "cd {} && numactl --interleave=all ./llama-server -m {} --host 0.0.0.0 --port {} --threads 20 --threads-batch 40 --temp 0.7 --parallel 1 --repeat-penalty 1.1 --ctx-size 8192 --batch-size 8192 -n 4096 --mlock --no-mmap --flash-attn --no-kv-offload --no-mmap &",
|
||||||
// Read config values with defaults
|
// Read config values with defaults
|
||||||
let n_moe = env::var("LLM_SERVER_N_MOE").unwrap_or("4".to_string());
|
|
||||||
let ctx_size = env::var("LLM_SERVER_CTX_SIZE").unwrap_or("4096".to_string());
|
|
||||||
let parallel = env::var("LLM_SERVER_PARALLEL").unwrap_or("1".to_string());
|
|
||||||
let cont_batching = env::var("LLM_SERVER_CONT_BATCHING").unwrap_or("true".to_string());
|
|
||||||
let mlock = env::var("LLM_SERVER_MLOCK").unwrap_or("true".to_string());
|
|
||||||
let no_mmap = env::var("LLM_SERVER_NO_MMAP").unwrap_or("true".to_string());
|
let conn = app_state.conn.clone();
|
||||||
let gpu_layers = env::var("LLM_SERVER_GPU_LAYERS").unwrap_or("20".to_string());
|
let config_manager = ConfigManager::new(conn.clone());
|
||||||
|
|
||||||
|
let default_bot_id = {
|
||||||
|
let mut conn = conn.lock().unwrap();
|
||||||
|
bots.filter(name.eq("default"))
|
||||||
|
.select(id)
|
||||||
|
.first::<uuid::Uuid>(&mut *conn)
|
||||||
|
.unwrap_or_else(|_| uuid::Uuid::nil())
|
||||||
|
};
|
||||||
|
|
||||||
|
let n_moe = config_manager.get_config(&default_bot_id, "llm-server-n-moe", None).unwrap_or("4".to_string());
|
||||||
|
let ctx_size = config_manager.get_config(&default_bot_id, "llm-server-ctx-size", None).unwrap_or("4096".to_string());
|
||||||
|
let parallel = config_manager.get_config(&default_bot_id, "llm-server-parallel", None).unwrap_or("1".to_string());
|
||||||
|
let cont_batching = config_manager.get_config(&default_bot_id, "llm-server-cont-batching", None).unwrap_or("true".to_string());
|
||||||
|
let mlock = config_manager.get_config(&default_bot_id, "llm-server-mlock", None).unwrap_or("true".to_string());
|
||||||
|
let no_mmap = config_manager.get_config(&default_bot_id, "llm-server-no-mmap", None).unwrap_or("true".to_string());
|
||||||
|
let gpu_layers = config_manager.get_config(&default_bot_id, "llm-server-gpu-layers", None).unwrap_or("20".to_string());
|
||||||
|
|
||||||
// Build command arguments dynamically
|
// Build command arguments dynamically
|
||||||
let mut args = format!(
|
let mut args = format!(
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue