Add trace logging to AutomationService and increase timeout values in LLM commands
parent 3761707c5c
commit a716f69702
3 changed files with 165 additions and 55 deletions
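The trace! calls added below only produce output when the process logger is configured at trace level. A minimal sketch of such a setup, assuming the env_logger crate (the logger actually used by this service is not shown in this diff):

    // Hypothetical setup: env_logger is an assumption, not something this commit adds.
    use env_logger::Env;
    use log::trace;

    fn main() {
        // Honour RUST_LOG if set; otherwise default the filter to trace level.
        env_logger::Builder::from_env(Env::default().default_filter_or("trace")).init();
        trace!("trace-level logging is now visible");
    }

With env_logger in place, running the service with RUST_LOG=trace enables the new messages without touching the code.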
@@ -3,7 +3,7 @@ use crate::shared::models::{Automation, TriggerKind};
 use crate::shared::state::AppState;
 use chrono::{DateTime, Datelike, Timelike, Utc};
 use diesel::prelude::*;
-use log::{error, info, warn};
+use log::{error, info, trace, warn};
 use std::env;
 use std::path::Path;
 use std::sync::Arc;
@@ -17,6 +17,7 @@ pub struct AutomationService {

 impl AutomationService {
     pub fn new(state: Arc<AppState>, scripts_dir: &str) -> Self {
+        trace!("Creating AutomationService with scripts_dir='{}'", scripts_dir);
         Self {
             state,
             scripts_dir: scripts_dir.to_string(),
@@ -24,6 +25,7 @@ impl AutomationService {
     }

     pub fn spawn(self) -> tokio::task::JoinHandle<()> {
+        trace!("Spawning AutomationService background task");
         let service = Arc::new(self);
         tokio::task::spawn_local({
             let service = service.clone();
@@ -32,9 +34,11 @@ impl AutomationService {
                 let mut last_check = Utc::now();
                 loop {
                     interval.tick().await;
+                    trace!("Automation cycle tick started; last_check={}", last_check);
                     if let Err(e) = service.run_cycle(&mut last_check).await {
                         error!("Automation cycle error: {}", e);
                     }
+                    trace!("Automation cycle tick completed");
                 }
             }
         })
@@ -44,45 +48,75 @@ impl AutomationService {
         &self,
         last_check: &mut DateTime<Utc>,
     ) -> Result<(), Box<dyn std::error::Error>> {
+        trace!("Running automation cycle; last_check={}", last_check);
         let automations = self.load_active_automations().await?;
+        trace!("Loaded {} active automations", automations.len());
         self.check_table_changes(&automations, *last_check).await;
         self.process_schedules(&automations).await;
         *last_check = Utc::now();
+        trace!("Automation cycle finished; new last_check={}", last_check);
         Ok(())
     }

     async fn load_active_automations(&self) -> Result<Vec<Automation>, diesel::result::Error> {
+        trace!("Loading active automations from database");
         use crate::shared::models::system_automations::dsl::*;
         let mut conn = self.state.conn.lock().unwrap();
-        system_automations
+        let result = system_automations
             .filter(is_active.eq(true))
-            .load::<Automation>(&mut *conn)
-            .map_err(Into::into)
+            .load::<Automation>(&mut *conn);
+        trace!("Database query for active automations completed");
+        result.map_err(Into::into)
     }

     async fn check_table_changes(&self, automations: &[Automation], since: DateTime<Utc>) {
+        trace!("Checking table changes since={}", since);
         for automation in automations {
-            let trigger_kind = match crate::shared::models::TriggerKind::from_i32(automation.kind) {
+            trace!(
+                "Checking automation id={} kind={} target={:?}",
+                automation.id,
+                automation.kind,
+                automation.target
+            );
+
+            let trigger_kind = match TriggerKind::from_i32(automation.kind) {
                 Some(k) => k,
-                None => continue,
+                None => {
+                    trace!("Skipping automation {}: invalid TriggerKind", automation.id);
+                    continue;
+                }
             };

             if !matches!(
                 trigger_kind,
                 TriggerKind::TableUpdate | TriggerKind::TableInsert | TriggerKind::TableDelete
             ) {
+                trace!(
+                    "Skipping automation {}: trigger_kind {:?} not table-related",
+                    automation.id,
+                    trigger_kind
+                );
                 continue;
             }

             let table = match &automation.target {
                 Some(t) => t,
-                None => continue,
+                None => {
+                    trace!("Skipping automation {}: no table target", automation.id);
+                    continue;
+                }
             };

             let column = match trigger_kind {
                 TriggerKind::TableInsert => "created_at",
                 _ => "updated_at",
             };
+            trace!(
+                "Building query for table='{}' column='{}' trigger_kind={:?}",
+                table,
+                column,
+                trigger_kind
+            );

             let query = format!(
                 "SELECT COUNT(*) as count FROM {} WHERE {} > $1",
@@ -104,11 +138,23 @@ impl AutomationService {
             match count_result {
                 Ok(result) if result.count > 0 => {
+                    trace!(
+                        "Detected {} change(s) in table='{}'; triggering automation {}",
+                        result.count,
+                        table,
+                        automation.id
+                    );
                     drop(conn_guard);
                     self.execute_action(&automation.param).await;
                     self.update_last_triggered(automation.id).await;
                 }
-                Ok(_result) => {}
+                Ok(result) => {
+                    trace!(
+                        "No changes detected for automation {} (count={})",
+                        automation.id,
+                        result.count
+                    );
+                }
                 Err(e) => {
                     error!("Error checking changes for table '{}': {}", table, e);
                 }
@@ -118,12 +164,31 @@ impl AutomationService {

     async fn process_schedules(&self, automations: &[Automation]) {
         let now = Utc::now();
+        trace!(
+            "Processing scheduled automations at UTC={}",
+            now.format("%Y-%m-%d %H:%M:%S")
+        );
         for automation in automations {
             if let Some(TriggerKind::Scheduled) = TriggerKind::from_i32(automation.kind) {
+                trace!(
+                    "Evaluating schedule pattern={:?} for automation {}",
+                    automation.schedule,
+                    automation.id
+                );
                 if let Some(pattern) = &automation.schedule {
                     if Self::should_run_cron(pattern, now.timestamp()) {
+                        trace!(
+                            "Pattern matched; executing automation {} param='{}'",
+                            automation.id,
+                            automation.param
+                        );
                         self.execute_action(&automation.param).await;
                         self.update_last_triggered(automation.id).await;
-                    }
+                    } else {
+                        trace!(
+                            "Pattern did not match for automation {}",
+                            automation.id
+                        );
+                    }
                 }
             }
@@ -131,6 +196,7 @@ impl AutomationService {
     }

     async fn update_last_triggered(&self, automation_id: Uuid) {
+        trace!("Updating last_triggered for automation_id={}", automation_id);
         use crate::shared::models::system_automations::dsl::*;
         let mut conn = self.state.conn.lock().unwrap();
         let now = Utc::now();
@@ -142,30 +208,45 @@ impl AutomationService {
                 "Failed to update last_triggered for automation {}: {}",
                 automation_id, e
             );
-        }
+        } else {
+            trace!("Successfully updated last_triggered for {}", automation_id);
+        }
     }

     fn should_run_cron(pattern: &str, timestamp: i64) -> bool {
+        trace!("Evaluating cron pattern='{}' at timestamp={}", pattern, timestamp);
         let parts: Vec<&str> = pattern.split_whitespace().collect();
         if parts.len() != 5 {
+            trace!("Invalid cron pattern '{}'", pattern);
             return false;
         }
         let dt = match DateTime::<Utc>::from_timestamp(timestamp, 0) {
             Some(dt) => dt,
-            None => return false,
+            None => {
+                trace!("Invalid timestamp={}", timestamp);
+                return false;
+            }
         };
         let minute = dt.minute() as i32;
         let hour = dt.hour() as i32;
         let day = dt.day() as i32;
         let month = dt.month() as i32;
         let weekday = dt.weekday().num_days_from_monday() as i32;
-        [minute, hour, day, month, weekday]
+        let match_result = [minute, hour, day, month, weekday]
             .iter()
             .enumerate()
-            .all(|(i, &val)| Self::cron_part_matches(parts[i], val))
+            .all(|(i, &val)| Self::cron_part_matches(parts[i], val));
+        trace!(
+            "Cron pattern='{}' result={} at {}",
+            pattern,
+            match_result,
+            dt
+        );
+        match_result
     }

     fn cron_part_matches(part: &str, value: i32) -> bool {
+        trace!("Checking cron part '{}' against value={}", part, value);
         if part == "*" {
             return true;
         }
@@ -183,46 +264,38 @@ impl AutomationService {
     }

     async fn execute_action(&self, param: &str) {
-        // Get bot_id early to use in Redis key
+        trace!("Starting execute_action with param='{}'", param);
         let bot_id_string = env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string());
         let bot_id = Uuid::parse_str(&bot_id_string).unwrap_or_else(|_| Uuid::new_v4());
+        trace!("Resolved bot_id={} for param='{}'", bot_id, param);

-        // Check if this job is already running for this bot
-        let is_bas_file = param.ends_with(".bas");
         let redis_key = format!("job:running:{}:{}", bot_id, param);
+        trace!("Redis key for job tracking: {}", redis_key);

-        // Try to check if job is running using Redis
         if let Some(redis_client) = &self.state.redis_client {
             match redis_client.get_multiplexed_async_connection().await {
                 Ok(mut conn) => {
-                    // Check if key exists
+                    trace!("Connected to Redis; checking if job '{}' is running", param);
                     let is_running: Result<bool, redis::RedisError> = redis::cmd("EXISTS")
                         .arg(&redis_key)
                         .query_async(&mut conn)
                         .await;

                     if let Ok(true) = is_running {
-                        if is_bas_file {
-                            warn!(
-                                "⚠️ Job '{}' is already running for bot '{}', skipping execution to allow only one .bas execution per bot",
-                                param, bot_id
-                            );
-                        } else {
-                            info!(
-                                "Job '{}' is already running for bot '{}', skipping execution",
-                                param, bot_id
-                            );
-                        }
+                        warn!(
+                            "Job '{}' is already running for bot '{}'; skipping execution",
+                            param, bot_id
+                        );
                         return;
                     }

-                    // Mark job as running (set with 300 second expiry as safety)
                     let _: Result<(), redis::RedisError> = redis::cmd("SETEX")
                         .arg(&redis_key)
-                        .arg(300) // 5 minutes expiry
+                        .arg(300)
                         .arg("1")
                         .query_async(&mut conn)
                         .await;
+                    trace!("Job '{}' marked as running in Redis", param);
                 }
                 Err(e) => {
                     warn!("Failed to connect to Redis for job tracking: {}", e);
@@ -231,16 +304,20 @@ impl AutomationService {
         }

         let full_path = Path::new(&self.scripts_dir).join(param);
+        trace!("Resolved full path: {}", full_path.display());

         let script_content = match tokio::fs::read_to_string(&full_path).await {
-            Ok(content) => content,
+            Ok(content) => {
+                trace!("Script '{}' read successfully", param);
+                content
+            }
             Err(e) => {
                 error!("Failed to read script {}: {}", full_path.display(), e);
                 // Clean up running flag on error
                 self.cleanup_job_flag(&bot_id, param).await;
                 return;
             }
         };
         info!("Executing action with param: {} for bot: {}", param, bot_id);

         let user_session = crate::shared::models::UserSession {
             id: Uuid::new_v4(),
             user_id: Uuid::new_v4(),
@@ -252,30 +329,46 @@ impl AutomationService {
             created_at: Utc::now(),
             updated_at: Utc::now(),
         };
+        trace!(
+            "Created temporary UserSession id={} for bot_id={}",
+            user_session.id,
+            bot_id
+        );

         let script_service = ScriptService::new(Arc::clone(&self.state), user_session);
         let ast = match script_service.compile(&script_content) {
-            Ok(ast) => ast,
+            Ok(ast) => {
+                trace!("Compilation successful for script '{}'", param);
+                ast
+            }
             Err(e) => {
-                error!("Error compiling script: {}", e);
+                error!("Error compiling script '{}': {}", param, e);
                 self.cleanup_job_flag(&bot_id, param).await;
                 return;
             }
         };

+        trace!("Running compiled script '{}'", param);
         match script_service.run(&ast) {
-            Ok(_result) => {
-                info!("Script executed successfully");
+            Ok(_) => {
+                info!("Script '{}' executed successfully", param);
             }
             Err(e) => {
-                error!("Error executing script: {}", e);
+                error!("Error executing script '{}': {}", param, e);
             }
         }

         // Clean up running flag after execution
+        trace!("Cleaning up Redis flag for job '{}'", param);
         self.cleanup_job_flag(&bot_id, param).await;
+        trace!("Finished execute_action for '{}'", param);
     }

     async fn cleanup_job_flag(&self, bot_id: &Uuid, param: &str) {
+        trace!(
+            "Cleaning up Redis flag for bot_id={} param='{}'",
+            bot_id,
+            param
+        );
         let redis_key = format!("job:running:{}:{}", bot_id, param);

         if let Some(redis_client) = &self.state.redis_client {
@@ -285,6 +378,7 @@ impl AutomationService {
                         .arg(&redis_key)
                         .query_async(&mut conn)
                         .await;
+                    trace!("Removed Redis key '{}'", redis_key);
                 }
                 Err(e) => {
                     warn!("Failed to connect to Redis for cleanup: {}", e);
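execute_action deduplicates runs per bot by checking EXISTS on a job:running:{bot_id}:{param} key, marking it with SETEX (300-second safety expiry), and deleting it in cleanup_job_flag. A standalone sketch of the same single-flight idea, collapsed into one atomic SET NX EX call (the function name here is illustrative, not part of the service):

    use redis::Client;

    // Returns Ok(true) if this caller claimed the job slot, Ok(false) if it was already held.
    // Sketch only: the key layout and 300-second expiry mirror the service above.
    async fn try_claim_job(client: &Client, bot_id: &str, param: &str) -> redis::RedisResult<bool> {
        let mut conn = client.get_multiplexed_async_connection().await?;
        let key = format!("job:running:{}:{}", bot_id, param);
        // SET key 1 NX EX 300 succeeds only when the key does not exist yet.
        let claimed: Option<String> = redis::cmd("SET")
            .arg(&key)
            .arg("1")
            .arg("NX")
            .arg("EX")
            .arg(300)
            .query_async(&mut conn)
            .await?;
        Ok(claimed.is_some())
    }

The atomic form closes the small window between the EXISTS check and the SETEX write; the DEL-based cleanup stays unchanged.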
@@ -45,7 +45,7 @@ pub fn llm_keyword(state: Arc<AppState>, _user: UserSession, engine: &mut Engine
         }
     });

-    match rx.recv_timeout(Duration::from_secs(180)) {
+    match rx.recv_timeout(Duration::from_secs(500)) {
         Ok(Ok(result)) => Ok(Dynamic::from(result)),
         Ok(Err(e)) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
             e.to_string().into(),
@@ -189,14 +189,22 @@ async fn start_llm_server(
     std::env::set_var("OMP_PROC_BIND", "close");

     // "cd {} && numactl --interleave=all ./llama-server -m {} --host 0.0.0.0 --port {} --threads 20 --threads-batch 40 --temp 0.7 --parallel 1 --repeat-penalty 1.1 --ctx-size 8192 --batch-size 8192 -n 4096 --mlock --no-mmap --flash-attn --no-kv-offload --no-mmap &",
+    if cfg!(windows) {
+        let mut cmd = tokio::process::Command::new("cmd");
+        cmd.arg("/C").arg(format!(
+            "cd {} && .\\llama-server.exe -m {} --host 0.0.0.0 --port {} --top_p 0.95 --temp 0.6 --flash-attn on --ctx-size 4096 --repeat-penalty 1.2 -ngl 20 ",
+            llama_cpp_path, model_path, port
+        ));
+        cmd.spawn()?;
+    } else {
+        let mut cmd = tokio::process::Command::new("sh");
+        cmd.arg("-c").arg(format!(
+            "cd {} && ./llama-server -m {} --host 0.0.0.0 --port {} --top_p 0.95 --temp 0.6 --flash-attn on --ctx-size 4096 --repeat-penalty 1.2 -ngl 20 &",
+            llama_cpp_path, model_path, port
+        ));
+        cmd.spawn()?;
+    }

-    let mut cmd = tokio::process::Command::new("sh");
-    cmd.arg("-c").arg(format!(
-        "cd {} && ./llama-server -m {} --host 0.0.0.0 --port {} --top_p 0.95 --temp 0.6 --flash-attn on --ctx-size 4096 --repeat-penalty 1.2 -ngl 20 &",
-        llama_cpp_path, model_path, port
-    ));
-
-    cmd.spawn()?;
     Ok(())
 }
@@ -207,16 +215,24 @@ async fn start_embedding_server(
 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
     let port = url.split(':').last().unwrap_or("8082");

-    let mut cmd = tokio::process::Command::new("sh");
-    cmd.arg("-c").arg(format!(
-        "cd {} && ./llama-server -m {} --host 0.0.0.0 --port {} --embedding --n-gpu-layers 99 &",
-        llama_cpp_path, model_path, port
-    ));
-
-    cmd.spawn()?;
+    if cfg!(windows) {
+        let mut cmd = tokio::process::Command::new("cmd");
+        cmd.arg("/c").arg(format!(
+            "cd {} && .\\llama-server.exe -m {} --host 0.0.0.0 --port {} --embedding --n-gpu-layers 99",
+            llama_cpp_path, model_path, port
+        ));
+        cmd.spawn()?;
+    } else {
+        let mut cmd = tokio::process::Command::new("sh");
+        cmd.arg("-c").arg(format!(
+            "cd {} && ./llama-server -m {} --host 0.0.0.0 --port {} --embedding --n-gpu-layers 99 &",
+            llama_cpp_path, model_path, port
+        ));
+        cmd.spawn()?;
+    }

     Ok(())
 }

 async fn is_server_running(url: &str) -> bool {
     let client = reqwest::Client::new();
     match client.get(&format!("{}/health", url)).send().await {
@@ -275,7 +291,7 @@ pub async fn chat_completions_local(

     // Send request to llama.cpp server
     let client = Client::builder()
-        .timeout(Duration::from_secs(180)) // 2 minute timeout
+        .timeout(Duration::from_secs(500)) // 500-second timeout
         .build()
         .map_err(|e| {
             error!("Error creating HTTP client: {}", e);
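Both timeout bumps use the same 500-second figure: the channel wait in llm_keyword and the reqwest client in chat_completions_local. A hedged sketch of lifting that value into one shared constant so the two call sites cannot drift (the constant name is hypothetical):

    use std::time::Duration;

    // Hypothetical shared constant; the code currently repeats the literal 500 at each site.
    pub const LLM_REQUEST_TIMEOUT: Duration = Duration::from_secs(500);

    // llm_keyword:              rx.recv_timeout(LLM_REQUEST_TIMEOUT)
    // chat_completions_local:   Client::builder().timeout(LLM_REQUEST_TIMEOUT)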