feat(automation): improve logging and bot path resolution
- Updated RUST_LOG environment variable in launch.json to include more detailed debug logging configuration - Changed trace! logs to debug! in AutomationService for better visibility - Replaced environment variable usage with get_default_bot helper function - Improved bot path resolution by using bot name from database - Added error handling for bot name query - Simplified S3 bucket name generation using get_default_bot - Removed unused imports and environment variable dependencies
This commit is contained in:
parent
6244c99854
commit
4bb3664dfd
7 changed files with 110 additions and 109 deletions
3
.vscode/launch.json
vendored
3
.vscode/launch.json
vendored
|
|
@ -14,7 +14,8 @@
|
|||
},
|
||||
"args": [],
|
||||
"env": {
|
||||
"RUST_LOG": "info"
|
||||
"RUST_LOG": "debug,actix_server=off,hyper_util=off,aws_smithy_runtime=off,aws_smithy_runtime_api=off,tracing=off,aws_sdk_s3=off"
|
||||
|
||||
},
|
||||
"cwd": "${workspaceFolder}"
|
||||
},
|
||||
|
|
|
|||
|
|
@ -3,8 +3,7 @@ use crate::shared::models::{Automation, TriggerKind};
|
|||
use crate::shared::state::AppState;
|
||||
use chrono::{DateTime, Datelike, Timelike, Utc};
|
||||
use diesel::prelude::*;
|
||||
use log::{error, info, trace, warn};
|
||||
use std::env;
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use tokio::time::Duration;
|
||||
|
|
@ -176,14 +175,14 @@ impl AutomationService {
|
|||
);
|
||||
for automation in automations {
|
||||
if let Some(TriggerKind::Scheduled) = TriggerKind::from_i32(automation.kind) {
|
||||
trace!(
|
||||
debug!(
|
||||
"Evaluating schedule pattern={:?} for automation {}",
|
||||
automation.schedule,
|
||||
automation.id
|
||||
);
|
||||
if let Some(pattern) = &automation.schedule {
|
||||
if Self::should_run_cron(pattern, now.timestamp()) {
|
||||
trace!(
|
||||
debug!(
|
||||
"Pattern matched; executing automation {} param='{}'",
|
||||
automation.id,
|
||||
automation.param
|
||||
|
|
@ -191,7 +190,7 @@ impl AutomationService {
|
|||
self.execute_action(&automation.param).await;
|
||||
self.update_last_triggered(automation.id).await;
|
||||
} else {
|
||||
trace!("Pattern did not match for automation {}", automation.id);
|
||||
debug!("Pattern did not match for automation {}", automation.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -278,8 +277,7 @@ impl AutomationService {
|
|||
|
||||
async fn execute_action(&self, param: &str) {
|
||||
trace!("Starting execute_action with param='{}'", param);
|
||||
let bot_id_string = env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string());
|
||||
let bot_id = Uuid::parse_str(&bot_id_string).unwrap_or_else(|_| Uuid::new_v4());
|
||||
let (bot_id, _) = crate::bot::get_default_bot(&mut self.state.conn.lock().unwrap());
|
||||
trace!("Resolved bot_id={} for param='{}'", bot_id, param);
|
||||
|
||||
let redis_key = format!("job:running:{}:{}", bot_id, param);
|
||||
|
|
@ -316,7 +314,34 @@ impl AutomationService {
|
|||
}
|
||||
}
|
||||
|
||||
let full_path = Path::new(&self.scripts_dir).join(param);
|
||||
// Get bot name from database
|
||||
let bot_name = {
|
||||
use crate::shared::models::bots;
|
||||
let mut conn = self.state.conn.lock().unwrap();
|
||||
match bots::table
|
||||
.filter(bots::id.eq(bot_id))
|
||||
.select(bots::name)
|
||||
.first::<String>(&mut *conn)
|
||||
.optional()
|
||||
{
|
||||
Ok(Some(name)) => name,
|
||||
Ok(None) => {
|
||||
warn!("No bot found with id {}, using default name", bot_id);
|
||||
crate::bot::get_default_bot(&mut self.state.conn.lock().unwrap()).1
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to query bot name: {}", e);
|
||||
crate::bot::get_default_bot(&mut self.state.conn.lock().unwrap()).1
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let path_str = format!("./work/{}.gbai/{}.gbdialog/{}",
|
||||
bot_name,
|
||||
bot_name,
|
||||
param
|
||||
);
|
||||
let full_path = Path::new(&path_str);
|
||||
trace!("Resolved full path: {}", full_path.display());
|
||||
|
||||
let script_content = match tokio::fs::read_to_string(&full_path).await {
|
||||
|
|
@ -333,9 +358,8 @@ impl AutomationService {
|
|||
|
||||
if let Some(client) = &self.state.drive {
|
||||
let bucket_name = format!(
|
||||
"{}{}.gbai",
|
||||
env::var("MINIO_ORG_PREFIX").unwrap_or_else(|_| "org1_".to_string()),
|
||||
env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string())
|
||||
"{}.gbai",
|
||||
crate::bot::get_default_bot(&mut self.state.conn.lock().unwrap()).0.to_string()
|
||||
);
|
||||
let s3_key = format!(".gbdialog/{}", param);
|
||||
|
||||
|
|
|
|||
|
|
@ -162,22 +162,8 @@ pub async fn get_from_bucket(
|
|||
let client = state.drive.as_ref().ok_or("S3 client not configured")?;
|
||||
|
||||
let bucket_name = {
|
||||
let cfg = state
|
||||
.config
|
||||
.as_ref()
|
||||
.ok_or_else(|| -> Box<dyn Error + Send + Sync> {
|
||||
error!("App configuration missing");
|
||||
"App configuration missing".into()
|
||||
})?;
|
||||
|
||||
let org_prefix = &cfg.drive.org_prefix;
|
||||
|
||||
if org_prefix.contains("..") || org_prefix.contains('/') || org_prefix.contains('\\') {
|
||||
error!("Invalid org_prefix: {}", org_prefix);
|
||||
return Err("Invalid organization prefix".into());
|
||||
}
|
||||
|
||||
let bucket = format!("{}default.gbai", org_prefix);
|
||||
let bucket = format!("default.gbai");
|
||||
debug!("Resolved bucket name: {}", bucket);
|
||||
bucket
|
||||
};
|
||||
|
|
|
|||
139
src/bot/mod.rs
139
src/bot/mod.rs
|
|
@ -1,4 +1,5 @@
|
|||
use crate::channels::ChannelAdapter;
|
||||
use crate::config::ConfigManager;
|
||||
use crate::context::langcache::get_langcache_client;
|
||||
use crate::drive_monitor::DriveMonitor;
|
||||
use crate::kb::embeddings::generate_embeddings;
|
||||
|
|
@ -16,7 +17,28 @@ use std::sync::Arc;
|
|||
use tokio::sync::mpsc;
|
||||
use tokio::sync::Mutex as AsyncMutex;
|
||||
use uuid::Uuid;
|
||||
use crate::config::ConfigManager;
|
||||
|
||||
pub fn get_default_bot(conn: &mut PgConnection) -> (Uuid, String) {
|
||||
use crate::shared::models::schema::bots::dsl::*;
|
||||
use diesel::prelude::*;
|
||||
|
||||
match bots
|
||||
.filter(is_active.eq(true))
|
||||
.select((id, name))
|
||||
.first::<(Uuid, String)>(conn)
|
||||
.optional()
|
||||
{
|
||||
Ok(Some((bot_id, bot_name))) => (bot_id, bot_name),
|
||||
Ok(None) => {
|
||||
warn!("No active bots found, using nil UUID");
|
||||
(Uuid::nil(), "default".to_string())
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to query default bot: {}", e);
|
||||
(Uuid::nil(), "default".to_string())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BotOrchestrator {
|
||||
pub state: Arc<AppState>,
|
||||
|
|
@ -108,13 +130,11 @@ impl BotOrchestrator {
|
|||
|
||||
pub async fn create_bot(
|
||||
&self,
|
||||
bot_guid: &str,
|
||||
bot_name: &str,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let bucket_name = format!(
|
||||
"{}{}.gbai",
|
||||
self.state.config.as_ref().unwrap().drive.org_prefix,
|
||||
bot_guid
|
||||
);
|
||||
// TODO: Move logic to here after duplication refactor
|
||||
|
||||
let bucket_name = format!("{}.gbai", bot_name);
|
||||
crate::create_bucket::create_bucket(&bucket_name)?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -273,8 +293,9 @@ impl BotOrchestrator {
|
|||
"Sending direct message to session {}: '{}'",
|
||||
session_id, content
|
||||
);
|
||||
let (bot_id, _) = get_default_bot(&mut self.state.conn.lock().unwrap());
|
||||
let bot_response = BotResponse {
|
||||
bot_id: "default_bot".to_string(),
|
||||
bot_id: bot_id.to_string(),
|
||||
user_id: "default_user".to_string(),
|
||||
session_id: session_id.to_string(),
|
||||
channel: channel.to_string(),
|
||||
|
|
@ -312,13 +333,15 @@ impl BotOrchestrator {
|
|||
"Changing context for session {} to {}",
|
||||
session_id, context_name
|
||||
);
|
||||
|
||||
|
||||
let mut session_manager = self.state.session_manager.lock().await;
|
||||
session_manager.update_session_context(
|
||||
&Uuid::parse_str(session_id)?,
|
||||
&Uuid::parse_str(user_id)?,
|
||||
context_name.to_string()
|
||||
).await?;
|
||||
session_manager
|
||||
.update_session_context(
|
||||
&Uuid::parse_str(session_id)?,
|
||||
&Uuid::parse_str(user_id)?,
|
||||
context_name.to_string(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Send confirmation back to client
|
||||
let confirmation = BotResponse {
|
||||
|
|
@ -444,25 +467,31 @@ impl BotOrchestrator {
|
|||
// Handle context change messages (type 4) first
|
||||
if message.message_type == 4 {
|
||||
if let Some(context_name) = &message.context_name {
|
||||
return self.handle_context_change(
|
||||
&message.user_id,
|
||||
&message.bot_id,
|
||||
&message.session_id,
|
||||
&message.channel,
|
||||
context_name
|
||||
).await;
|
||||
return self
|
||||
.handle_context_change(
|
||||
&message.user_id,
|
||||
&message.bot_id,
|
||||
&message.session_id,
|
||||
&message.channel,
|
||||
context_name,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
// Create regular response
|
||||
let channel = message.channel.clone();
|
||||
let channel = message.channel.clone();
|
||||
let config_manager = ConfigManager::new(Arc::clone(&self.state.conn));
|
||||
let max_context_size = config_manager
|
||||
.get_config(&Uuid::parse_str(&message.bot_id).unwrap_or_default(), "llm-server-ctx-size", None)
|
||||
.get_config(
|
||||
&Uuid::parse_str(&message.bot_id).unwrap_or_default(),
|
||||
"llm-server-ctx-size",
|
||||
None,
|
||||
)
|
||||
.unwrap_or_default()
|
||||
.parse::<usize>()
|
||||
.unwrap_or(0);
|
||||
|
||||
|
||||
let current_context_length = 0usize;
|
||||
|
||||
let bot_response = BotResponse {
|
||||
|
|
@ -480,14 +509,11 @@ let channel = message.channel.clone();
|
|||
context_max_length: max_context_size,
|
||||
};
|
||||
|
||||
if let Some(adapter) = self.state.channels.lock().unwrap().get(&channel) {
|
||||
adapter.send_message(bot_response).await?;
|
||||
} else {
|
||||
warn!(
|
||||
"No channel adapter found for message channel: {}",
|
||||
channel
|
||||
);
|
||||
}
|
||||
if let Some(adapter) = self.state.channels.lock().unwrap().get(&channel) {
|
||||
adapter.send_message(bot_response).await?;
|
||||
} else {
|
||||
warn!("No channel adapter found for message channel: {}", channel);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -654,15 +680,6 @@ if let Some(adapter) = self.state.channels.lock().unwrap().get(&channel) {
|
|||
e
|
||||
})?;
|
||||
|
||||
let _bot_id = if let Ok(bot_guid) = std::env::var("BOT_GUID") {
|
||||
Uuid::parse_str(&bot_guid).map_err(|e| {
|
||||
warn!("Invalid BOT_GUID from env: {}", e);
|
||||
e
|
||||
})?
|
||||
} else {
|
||||
warn!("BOT_GUID not set in environment, using nil UUID");
|
||||
Uuid::nil()
|
||||
};
|
||||
|
||||
let session = {
|
||||
let mut sm = self.state.session_manager.lock().await;
|
||||
|
|
@ -859,11 +876,15 @@ if let Some(adapter) = self.state.channels.lock().unwrap().get(&channel) {
|
|||
|
||||
let config_manager = ConfigManager::new(Arc::clone(&self.state.conn));
|
||||
let max_context_size = config_manager
|
||||
.get_config(&Uuid::parse_str(&message.bot_id).unwrap_or_default(), "llm-server-ctx-size", None)
|
||||
.get_config(
|
||||
&Uuid::parse_str(&message.bot_id).unwrap_or_default(),
|
||||
"llm-server-ctx-size",
|
||||
None,
|
||||
)
|
||||
.unwrap_or_default()
|
||||
.parse::<usize>()
|
||||
.unwrap_or(0);
|
||||
|
||||
|
||||
let current_context_length = 0usize;
|
||||
|
||||
let final_msg = BotResponse {
|
||||
|
|
@ -933,7 +954,6 @@ if let Some(adapter) = self.state.channels.lock().unwrap().get(&channel) {
|
|||
})?
|
||||
};
|
||||
|
||||
|
||||
let start_script_path = format!("./work/{}.gbai/{}.gbdialog/start.ast", bot_name, bot_name);
|
||||
|
||||
let start_script = match std::fs::read_to_string(&start_script_path) {
|
||||
|
|
@ -1086,32 +1106,10 @@ pub fn bot_from_url(
|
|||
}
|
||||
}
|
||||
|
||||
// Fall back to first available bot
|
||||
match bots
|
||||
.filter(is_active.eq(true))
|
||||
.select((id, name))
|
||||
.first::<(Uuid, String)>(db_conn)
|
||||
.optional()
|
||||
{
|
||||
Ok(Some((first_bot_id, first_bot_name))) => {
|
||||
log::info!(
|
||||
"Using first available bot: {} ({})",
|
||||
first_bot_id,
|
||||
first_bot_name
|
||||
);
|
||||
Ok((first_bot_id, first_bot_name))
|
||||
}
|
||||
Ok(None) => {
|
||||
error!("No active bots found in database");
|
||||
Err(HttpResponse::ServiceUnavailable()
|
||||
.json(serde_json::json!({"error": "No bots available"})))
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to query bots: {}", e);
|
||||
Err(HttpResponse::InternalServerError()
|
||||
.json(serde_json::json!({"error": "Failed to query bots"})))
|
||||
}
|
||||
}
|
||||
// Fall back to default bot
|
||||
let (bot_id, bot_name) = get_default_bot(db_conn);
|
||||
log::info!("Using default bot: {} ({})", bot_id, bot_name);
|
||||
Ok((bot_id, bot_name))
|
||||
}
|
||||
|
||||
impl Default for BotOrchestrator {
|
||||
|
|
@ -1388,7 +1386,6 @@ async fn websocket_handler(
|
|||
Ok(res)
|
||||
}
|
||||
|
||||
|
||||
#[actix_web::post("/api/warn")]
|
||||
async fn send_warning_handler(
|
||||
data: web::Data<AppState>,
|
||||
|
|
|
|||
|
|
@ -44,7 +44,6 @@ pub struct DriveConfig {
|
|||
pub access_key: String,
|
||||
pub secret_key: String,
|
||||
pub use_ssl: bool,
|
||||
pub org_prefix: String,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
|
@ -196,7 +195,6 @@ impl AppConfig {
|
|||
access_key: get_str("DRIVE_ACCESSKEY", "minioadmin"),
|
||||
secret_key: get_str("DRIVE_SECRET", "minioadmin"),
|
||||
use_ssl: get_bool("DRIVE_USE_SSL", false),
|
||||
org_prefix: get_str("DRIVE_ORG_PREFIX", "pragmatismo-"),
|
||||
};
|
||||
|
||||
let email = EmailConfig {
|
||||
|
|
@ -276,10 +274,7 @@ impl AppConfig {
|
|||
use_ssl: std::env::var("DRIVE_USE_SSL")
|
||||
.unwrap_or_else(|_| "false".to_string())
|
||||
.parse()
|
||||
.unwrap_or(false),
|
||||
org_prefix: std::env::var("DRIVE_ORG_PREFIX")
|
||||
.unwrap_or_else(|_| "pragmatismo-".to_string()),
|
||||
};
|
||||
.unwrap_or(false) };
|
||||
|
||||
let email = EmailConfig {
|
||||
from: std::env::var("EMAIL_FROM").unwrap_or_else(|_| "noreply@example.com".to_string()),
|
||||
|
|
@ -370,7 +365,6 @@ fn write_drive_config_to_env(drive: &DriveConfig) -> std::io::Result<()> {
|
|||
writeln!(file, "DRIVE_ACCESSKEY={}", drive.access_key)?;
|
||||
writeln!(file, "DRIVE_SECRET={}", drive.secret_key)?;
|
||||
writeln!(file, "DRIVE_USE_SSL={}", drive.use_ssl)?;
|
||||
writeln!(file, "DRIVE_ORG_PREFIX={}", drive.org_prefix)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -266,7 +266,7 @@ impl DriveMonitor {
|
|||
.await
|
||||
{
|
||||
Ok(head_res) => {
|
||||
debug!(
|
||||
trace!(
|
||||
"HeadObject successful for {}, metadata: {:?}",
|
||||
path, head_res
|
||||
);
|
||||
|
|
@ -283,10 +283,10 @@ impl DriveMonitor {
|
|||
);
|
||||
|
||||
let bytes = response.body.collect().await?.into_bytes();
|
||||
debug!("Collected {} bytes for {}", bytes.len(), path);
|
||||
trace!("Collected {} bytes for {}", bytes.len(), path);
|
||||
let csv_content = String::from_utf8(bytes.to_vec())
|
||||
.map_err(|e| format!("UTF-8 error in {}: {}", path, e))?;
|
||||
debug!("Found {}: {} bytes", path, csv_content.len());
|
||||
trace!("Found {}: {} bytes", path, csv_content.len());
|
||||
|
||||
// Restart LLaMA servers only if llm- properties changed
|
||||
let llm_lines: Vec<_> = csv_content
|
||||
|
|
|
|||
|
|
@ -176,7 +176,6 @@ async fn create_s3_client(
|
|||
server: std::env::var("DRIVE_SERVER").expect("DRIVE_SERVER not set"),
|
||||
access_key: std::env::var("DRIVE_ACCESS_KEY").expect("DRIVE_ACCESS_KEY not set"),
|
||||
secret_key: std::env::var("DRIVE_SECRET_KEY").expect("DRIVE_SECRET_KEY not set"),
|
||||
org_prefix: "".to_string(),
|
||||
use_ssl: false,
|
||||
};
|
||||
Ok(init_drive(&config).await?)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue