use crate::basic::ScriptService;
use crate::shared::models::{Automation, TriggerKind};
use crate::shared::state::AppState;
use chrono::{DateTime, Datelike, Timelike, Utc};
use diesel::prelude::*;
use log::{error, info, trace, warn};
use std::env;
use std::path::Path;
use std::sync::Arc;
use tokio::time::Duration;
use uuid::Uuid;

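/// Background service that periodically polls the `system_automations` table
/// and fires automations: table-change triggers (insert/update/delete) and
/// cron-style scheduled triggers, each executing an associated script.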
pub struct AutomationService {
    /// Shared application state (database connection, Redis and S3/MinIO clients).
    state: Arc<AppState>,
    /// Local directory where automation scripts are stored and cached.
    scripts_dir: String,
}

impl AutomationService {
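    /// Creates the service with shared application state and the local scripts directory.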
    pub fn new(state: Arc<AppState>, scripts_dir: &str) -> Self {
        trace!(
            "Creating AutomationService with scripts_dir='{}'",
            scripts_dir
        );
        Self {
            state,
            scripts_dir: scripts_dir.to_string(),
        }
    }

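    /// Spawns the polling loop on the current thread's `LocalSet` and returns its
    /// `JoinHandle`. The loop ticks every 5 seconds and runs one automation cycle per tick.
    ///
    /// Note: `tokio::task::spawn_local` requires the caller to be running inside a
    /// `tokio::task::LocalSet`. A minimal usage sketch (the `state` value and the
    /// scripts path below are placeholders, not part of this module):
    ///
    /// ```ignore
    /// let local = tokio::task::LocalSet::new();
    /// local
    ///     .run_until(async {
    ///         let handle = AutomationService::new(state, "./scripts").spawn();
    ///         // keep the LocalSet alive for as long as the service should run
    ///         handle.await.ok();
    ///     })
    ///     .await;
    /// ```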
    pub fn spawn(self) -> tokio::task::JoinHandle<()> {
        trace!("Spawning AutomationService background task");
        let service = Arc::new(self);

        tokio::task::spawn_local({
            let service = service.clone();
            async move {
                let mut interval = tokio::time::interval(Duration::from_secs(5));
                let mut last_check = Utc::now();

                loop {
                    interval.tick().await;
                    trace!("Automation cycle tick started; last_check={}", last_check);
                    if let Err(e) = service.run_cycle(&mut last_check).await {
                        error!("Automation cycle error: {}", e);
                    }
                    trace!("Automation cycle tick completed");
                }
            }
        })
    }

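    /// Runs a single automation cycle: loads active automations, checks table-change
    /// triggers against `last_check`, processes schedules, then advances `last_check`.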
    async fn run_cycle(
        &self,
        last_check: &mut DateTime<Utc>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        trace!("Running automation cycle; last_check={}", last_check);
        let automations = self.load_active_automations().await?;
        trace!("Loaded {} active automations", automations.len());
        self.check_table_changes(&automations, *last_check).await;
        self.process_schedules(&automations).await;
        *last_check = Utc::now();
        trace!("Automation cycle finished; new last_check={}", last_check);
        Ok(())
    }

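    /// Loads all automations flagged `is_active` from the `system_automations` table.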
    async fn load_active_automations(&self) -> Result<Vec<Automation>, diesel::result::Error> {
        trace!("Loading active automations from database");
        use crate::shared::models::system_automations::dsl::*;
        let result = {
            let mut conn = self.state.conn.lock().unwrap();
            system_automations
                .filter(is_active.eq(true))
                .load::<Automation>(&mut *conn)
        }; // conn is dropped here
        trace!("Database query for active automations completed");
        result.map_err(Into::into)
    }

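    /// Checks each table-triggered automation for rows created or updated since the
    /// last cycle and executes the automation's script when changes are found.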
    async fn check_table_changes(&self, automations: &[Automation], since: DateTime<Utc>) {
        trace!("Checking table changes since={}", since);
        for automation in automations {
            trace!(
                "Checking automation id={} kind={} target={:?}",
                automation.id,
                automation.kind,
                automation.target
            );

            let trigger_kind = match TriggerKind::from_i32(automation.kind) {
                Some(k) => k,
                None => {
                    trace!("Skipping automation {}: invalid TriggerKind", automation.id);
                    continue;
                }
            };

            if !matches!(
                trigger_kind,
                TriggerKind::TableUpdate | TriggerKind::TableInsert | TriggerKind::TableDelete
            ) {
                trace!(
                    "Skipping automation {}: trigger_kind {:?} not table-related",
                    automation.id,
                    trigger_kind
                );
                continue;
            }

            let table = match &automation.target {
                Some(t) => t,
                None => {
                    trace!("Skipping automation {}: no table target", automation.id);
                    continue;
                }
            };

            let column = match trigger_kind {
                TriggerKind::TableInsert => "created_at",
                _ => "updated_at",
            };
            trace!(
                "Building query for table='{}' column='{}' trigger_kind={:?}",
                table,
                column,
                trigger_kind
            );

            let query = format!(
                "SELECT COUNT(*) as count FROM {} WHERE {} > $1",
                table, column
            );

            #[derive(diesel::QueryableByName)]
            struct CountResult {
                #[diesel(sql_type = diesel::sql_types::BigInt)]
                count: i64,
            }

            let count_result = {
                let mut conn_guard = self.state.conn.lock().unwrap();
                let conn = &mut *conn_guard;

                diesel::sql_query(&query)
                    .bind::<diesel::sql_types::Timestamp, _>(since.naive_utc())
                    .get_result::<CountResult>(conn)
            }; // conn_guard is dropped here

            match count_result {
                Ok(result) if result.count > 0 => {
                    trace!(
                        "Detected {} change(s) in table='{}'; triggering automation {}",
                        result.count,
                        table,
                        automation.id
                    );
                    self.execute_action(&automation.param).await;
                    self.update_last_triggered(automation.id).await;
                }
                Ok(result) => {
                    trace!(
                        "No changes detected for automation {} (count={})",
                        automation.id,
                        result.count
                    );
                }
                Err(e) => {
                    error!("Error checking changes for table '{}': {}", table, e);
                }
            }
        }
    }

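    /// Evaluates scheduled automations against the current UTC time and executes those
    /// whose cron pattern matches. The loop ticks every 5 seconds while the cron matcher
    /// works at minute granularity, so a matching minute may be evaluated more than once;
    /// the Redis flag set in `execute_action` prevents concurrent runs of the same script,
    /// but short scripts can still be re-executed within the same matching minute.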
    async fn process_schedules(&self, automations: &[Automation]) {
        let now = Utc::now();
        trace!(
            "Processing scheduled automations at UTC={}",
            now.format("%Y-%m-%d %H:%M:%S")
        );
        for automation in automations {
            if let Some(TriggerKind::Scheduled) = TriggerKind::from_i32(automation.kind) {
                trace!(
                    "Evaluating schedule pattern={:?} for automation {}",
                    automation.schedule,
                    automation.id
                );
                if let Some(pattern) = &automation.schedule {
                    if Self::should_run_cron(pattern, now.timestamp()) {
                        trace!(
                            "Pattern matched; executing automation {} param='{}'",
                            automation.id,
                            automation.param
                        );
                        self.execute_action(&automation.param).await;
                        self.update_last_triggered(automation.id).await;
                    } else {
                        trace!("Pattern did not match for automation {}", automation.id);
                    }
                }
            }
        }
    }

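    /// Stamps `last_triggered` with the current UTC time for the given automation.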
    async fn update_last_triggered(&self, automation_id: Uuid) {
        trace!(
            "Updating last_triggered for automation_id={}",
            automation_id
        );
        use crate::shared::models::system_automations::dsl::*;

        let now = Utc::now();
        let result = {
            let mut conn = self.state.conn.lock().unwrap();
            diesel::update(system_automations.filter(id.eq(automation_id)))
                .set(last_triggered.eq(now.naive_utc()))
                .execute(&mut *conn)
        }; // conn is dropped here

        if let Err(e) = result {
            error!(
                "Failed to update last_triggered for automation {}: {}",
                automation_id, e
            );
        } else {
            trace!("Successfully updated last_triggered for {}", automation_id);
        }
    }

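    /// Minimal matcher for five-field cron patterns (`minute hour day month weekday`).
    /// Supports `*`, literal numbers, and `*/step`; ranges and lists are not handled.
    /// Weekday is 0 = Monday (chrono's `num_days_from_monday`), which differs from
    /// standard cron where 0 = Sunday. For example, `"*/5 * * * *"` matches whenever
    /// the minute is divisible by 5.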
    fn should_run_cron(pattern: &str, timestamp: i64) -> bool {
        trace!(
            "Evaluating cron pattern='{}' at timestamp={}",
            pattern,
            timestamp
        );
        let parts: Vec<&str> = pattern.split_whitespace().collect();
        if parts.len() != 5 {
            trace!("Invalid cron pattern '{}'", pattern);
            return false;
        }
        let dt = match DateTime::<Utc>::from_timestamp(timestamp, 0) {
            Some(dt) => dt,
            None => {
                trace!("Invalid timestamp={}", timestamp);
                return false;
            }
        };
        let minute = dt.minute() as i32;
        let hour = dt.hour() as i32;
        let day = dt.day() as i32;
        let month = dt.month() as i32;
        let weekday = dt.weekday().num_days_from_monday() as i32;
        let match_result = [minute, hour, day, month, weekday]
            .iter()
            .enumerate()
            .all(|(i, &val)| Self::cron_part_matches(parts[i], val));

        trace!(
            "Cron pattern='{}' result={} at {}",
            pattern,
            match_result,
            dt
        );

        match_result
    }

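    /// Matches a single cron field against a value: `*` matches anything, `*/n` matches
    /// multiples of `n`, and a bare number matches exactly.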
    fn cron_part_matches(part: &str, value: i32) -> bool {
        trace!("Checking cron part '{}' against value={}", part, value);
        if part == "*" {
            return true;
        }
        if part.contains('/') {
            let parts: Vec<&str> = part.split('/').collect();
            if parts.len() != 2 {
                return false;
            }
            let step: i32 = parts[1].parse().unwrap_or(1);
            // Guard against a zero or negative step (e.g. "*/0"), which would
            // otherwise panic on the modulo below.
            if step <= 0 {
                return false;
            }
            if parts[0] == "*" {
                return value % step == 0;
            }
        }
        part.parse::<i32>().map_or(false, |num| num == value)
    }

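    /// Executes the script named by `param`: marks the job as running in Redis (skipping
    /// execution if a flag for the same bot and script already exists), reads the script
    /// from the local scripts directory or downloads and caches it from MinIO, compiles
    /// and runs it through `ScriptService`, and finally clears the Redis flag.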
    async fn execute_action(&self, param: &str) {
        trace!("Starting execute_action with param='{}'", param);
        let bot_id_string = env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string());
        let bot_id = Uuid::parse_str(&bot_id_string).unwrap_or_else(|_| Uuid::new_v4());
        trace!("Resolved bot_id={} for param='{}'", bot_id, param);

        let redis_key = format!("job:running:{}:{}", bot_id, param);
        trace!("Redis key for job tracking: {}", redis_key);

        if let Some(redis_client) = &self.state.redis_client {
            match redis_client.get_multiplexed_async_connection().await {
                Ok(mut conn) => {
                    trace!("Connected to Redis; checking if job '{}' is running", param);
                    let is_running: Result<bool, redis::RedisError> = redis::cmd("EXISTS")
                        .arg(&redis_key)
                        .query_async(&mut conn)
                        .await;

                    if let Ok(true) = is_running {
                        warn!(
                            "Job '{}' is already running for bot '{}'; skipping execution",
                            param, bot_id
                        );
                        return;
                    }

                    // Mark the job as running; SETEX gives the flag a 300-second TTL.
                    let _: Result<(), redis::RedisError> = redis::cmd("SETEX")
                        .arg(&redis_key)
                        .arg(300)
                        .arg("1")
                        .query_async(&mut conn)
                        .await;
                    trace!("Job '{}' marked as running in Redis", param);
                }
                Err(e) => {
                    warn!("Failed to connect to Redis for job tracking: {}", e);
                }
            }
        }

        let full_path = Path::new(&self.scripts_dir).join(param);
        trace!("Resolved full path: {}", full_path.display());

        let script_content = match tokio::fs::read_to_string(&full_path).await {
            Ok(content) => {
                trace!("Script '{}' read successfully", param);
                content
            }
            Err(e) => {
                warn!(
                    "Script not found locally at {}, attempting to download from MinIO: {}",
                    full_path.display(),
                    e
                );

                // Try to download from MinIO
                if let Some(s3_client) = &self.state.s3_client {
                    let bucket_name = format!(
                        "{}{}.gbai",
                        env::var("MINIO_ORG_PREFIX").unwrap_or_else(|_| "org1_".to_string()),
                        env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string())
                    );
                    let s3_key = format!(".gbdialog/{}", param);

                    trace!("Downloading from bucket={} key={}", bucket_name, s3_key);

                    match s3_client
                        .get_object()
                        .bucket(&bucket_name)
                        .key(&s3_key)
                        .send()
                        .await
                    {
                        Ok(response) => {
                            match response.body.collect().await {
                                Ok(data) => {
                                    match String::from_utf8(data.into_bytes().to_vec()) {
                                        Ok(content) => {
                                            info!("Downloaded script '{}' from MinIO", param);

                                            // Save to local cache
                                            if let Err(e) =
                                                std::fs::create_dir_all(&self.scripts_dir)
                                            {
                                                warn!("Failed to create scripts directory: {}", e);
                                            } else if let Err(e) =
                                                tokio::fs::write(&full_path, &content).await
                                            {
                                                warn!("Failed to cache script locally: {}", e);
                                            } else {
                                                trace!("Cached script to {}", full_path.display());
                                            }

                                            content
                                        }
                                        Err(e) => {
                                            error!("Failed to decode script {}: {}", param, e);
                                            self.cleanup_job_flag(&bot_id, param).await;
                                            return;
                                        }
                                    }
                                }
                                Err(e) => {
                                    error!(
                                        "Failed to read script body from MinIO {}: {}",
                                        param, e
                                    );
                                    self.cleanup_job_flag(&bot_id, param).await;
                                    return;
                                }
                            }
                        }
                        Err(e) => {
                            error!("Failed to download script {} from MinIO: {}", param, e);
                            self.cleanup_job_flag(&bot_id, param).await;
                            return;
                        }
                    }
                } else {
                    error!("S3 client not available, cannot download script {}", param);
                    self.cleanup_job_flag(&bot_id, param).await;
                    return;
                }
            }
        };

        let user_session = crate::shared::models::UserSession {
            id: Uuid::new_v4(),
            user_id: Uuid::new_v4(),
            bot_id,
            title: "Automation".to_string(),
            answer_mode: 0,
            current_tool: None,
            context_data: serde_json::Value::Null,
            created_at: Utc::now(),
            updated_at: Utc::now(),
        };
        trace!(
            "Created temporary UserSession id={} for bot_id={}",
            user_session.id,
            bot_id
        );

        let result = {
            let script_service = ScriptService::new(Arc::clone(&self.state), user_session);
            let ast = match script_service.compile(&script_content) {
                Ok(ast) => {
                    trace!("Compilation successful for script '{}'", param);
                    ast
                }
                Err(e) => {
                    error!("Error compiling script '{}': {}", param, e);
                    self.cleanup_job_flag(&bot_id, param).await;
                    return;
                }
            };

            trace!("Running compiled script '{}'", param);
            script_service.run(&ast)
        }; // script_service and ast are dropped here

        match result {
            Ok(_) => {
                info!("Script '{}' executed successfully", param);
            }
            Err(e) => {
                error!("Error executing script '{}': {}", param, e);
            }
        }

        trace!("Cleaning up Redis flag for job '{}'", param);
        self.cleanup_job_flag(&bot_id, param).await;
        trace!("Finished execute_action for '{}'", param);
    }

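    /// Removes the Redis "job running" flag for the given bot and script.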
    async fn cleanup_job_flag(&self, bot_id: &Uuid, param: &str) {
        trace!(
            "Cleaning up Redis flag for bot_id={} param='{}'",
            bot_id,
            param
        );
        let redis_key = format!("job:running:{}:{}", bot_id, param);

        if let Some(redis_client) = &self.state.redis_client {
            match redis_client.get_multiplexed_async_connection().await {
                Ok(mut conn) => {
                    let _: Result<(), redis::RedisError> = redis::cmd("DEL")
                        .arg(&redis_key)
                        .query_async(&mut conn)
                        .await;
                    trace!("Removed Redis key '{}'", redis_key);
                }
                Err(e) => {
                    warn!("Failed to connect to Redis for cleanup: {}", e);
                }
            }
        }
    }
}