feat(automation): add unique constraint and refactor action execution

- Added UNIQUE constraint on system_automations (bot_id, kind, param) to prevent duplicate automations
- Refactored execute_action to accept the full Automation struct instead of just the param string (see the sketch after this list)
- Simplified bot name resolution by using automation.bot_id directly
- Improved error handling in action execution with proper error propagation
- Removed redundant bot name lookup logic by leveraging existing bot_id
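
For reference, a minimal sketch of the execute_action refactor described above. The Automation fields, the unit AutomationService type, and the tokio/uuid usage are simplified stand-ins for illustration only, not the project's actual code; the real changes are in the diffs below.

use uuid::Uuid;

struct Automation {
    id: Uuid,
    bot_id: Uuid,
    param: String,
}

struct AutomationService;

impl AutomationService {
    // After the refactor: the whole Automation is passed in, bot_id comes
    // straight from it (no default-bot lookup), and failures propagate to
    // the caller instead of being swallowed inside the method.
    async fn execute_action(
        &self,
        automation: &Automation,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let bot_id = automation.bot_id;
        let param = &automation.param;
        println!("running '{}' for bot {}", param, bot_id);
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let svc = AutomationService;
    let automation = Automation {
        id: Uuid::new_v4(),
        bot_id: Uuid::new_v4(),
        param: "daily-report.bas".into(),
    };
    // The scheduler loop logs the error rather than aborting the tick:
    if let Err(e) = svc.execute_action(&automation).await {
        eprintln!("Error executing automation {}: {}", automation.id, e);
    }
}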
Author: Rodrigo Rodriguez (Pragmatismo) 2025-11-02 20:57:53 -03:00
parent d2ee695d8b
commit 1d7d0e10c0
9 changed files with 284 additions and 501 deletions

migrations/6.0.10.sql Normal file
View file

@@ -0,0 +1,15 @@
-- Migration 6.0.10: Add unique constraint for system_automations upsert
-- Description: Creates a unique constraint matching the ON CONFLICT target in set_schedule.rs
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint
WHERE conname = 'system_automations_bot_kind_param_unique'
) THEN
ALTER TABLE public.system_automations
ADD CONSTRAINT system_automations_bot_kind_param_unique
UNIQUE (bot_id, kind, param);
END IF;
END
$$;
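
As a point of reference, a minimal Diesel-side sketch of the upsert this constraint is meant to back (assuming Diesel with the postgres and uuid features). The table! definition and the kind discriminant below are illustrative stand-ins, not the project's generated schema.

use diesel::prelude::*;
use uuid::Uuid;

diesel::table! {
    system_automations (id) {
        id -> Uuid,
        bot_id -> Uuid,
        kind -> Int4,
        param -> Text,
        schedule -> Text,
    }
}

// Insert a schedule row, or update the existing one for the same
// (bot_id, kind, param) triple instead of failing with a duplicate-key error.
pub fn upsert_schedule(
    conn: &mut PgConnection,
    bot: Uuid,
    cron: &str,
    script: &str,
) -> QueryResult<usize> {
    use system_automations::dsl::*;

    diesel::insert_into(system_automations)
        .values((
            id.eq(Uuid::new_v4()),
            bot_id.eq(bot),
            kind.eq(0), // assumed discriminant, e.g. TriggerKind::Scheduled as i32
            param.eq(script),
            schedule.eq(cron),
        ))
        // The ON CONFLICT target must match an existing unique constraint or
        // index; UNIQUE (bot_id, kind, param) from this migration is that target.
        .on_conflict((bot_id, kind, param))
        .do_update()
        .set(schedule.eq(cron))
        .execute(conn)
}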

View file

@@ -9,3 +9,8 @@ ADD COLUMN IF NOT EXISTS bot_id UUID NOT NULL;
 -- Create an index on bot_id for faster lookups
 CREATE INDEX IF NOT EXISTS idx_system_automations_bot_id
     ON public.system_automations (bot_id);
+
+ALTER TABLE public.system_automations
+    ADD CONSTRAINT IF NOT EXISTS system_automations_bot_kind_param_unique
+    UNIQUE (bot_id, kind, param);

View file

@@ -1,8 +1,9 @@
+use crate::shared::models::schema::bots::dsl::*;
+use diesel::prelude::*;
 use crate::basic::ScriptService;
 use crate::shared::models::{Automation, TriggerKind};
 use crate::shared::state::AppState;
 use chrono::{DateTime, Datelike, Timelike, Utc};
-use diesel::prelude::*;
 use log::{debug, error, info, trace, warn};
 use std::path::Path;
 use std::sync::Arc;
@@ -150,7 +151,9 @@ impl AutomationService {
                         table,
                         automation.id
                     );
-                    self.execute_action(&automation.param).await;
+                    if let Err(e) = self.execute_action(automation).await {
+                        error!("Error executing automation {}: {}", automation.id, e);
+                    }
                     self.update_last_triggered(automation.id).await;
                 }
                 Ok(result) => {
@@ -187,7 +190,9 @@ impl AutomationService {
                         automation.id,
                         automation.param
                     );
-                    self.execute_action(&automation.param).await;
+                    if let Err(e) = self.execute_action(automation).await {
+                        error!("Error executing automation {}: {}", automation.id, e);
+                    }
                     self.update_last_triggered(automation.id).await;
                 } else {
                     trace!("Pattern did not match for automation {}", automation.id);
@@ -275,10 +280,10 @@ impl AutomationService {
         part.parse::<i32>().map_or(false, |num| num == value)
     }

-    async fn execute_action(&self, param: &str) {
-        trace!("Starting execute_action with param='{}'", param);
-        let (bot_id, _) = crate::bot::get_default_bot(&mut self.state.conn.lock().unwrap());
-        trace!("Resolved bot_id={} for param='{}'", bot_id, param);
+    async fn execute_action(&self, automation: &Automation) -> Result<(), Box<dyn std::error::Error>> {
+        let bot_id = automation.bot_id;
+        let param = &automation.param;
+        trace!("Starting execute_action for bot_id={} param='{}'", bot_id, param);

         let redis_key = format!("job:running:{}:{}", bot_id, param);
         trace!("Redis key for job tracking: {}", redis_key);
@@ -297,7 +302,6 @@ impl AutomationService {
                 "Job '{}' is already running for bot '{}'; skipping execution",
                 param, bot_id
             );
-            return;
         }

         let _: Result<(), redis::RedisError> = redis::cmd("SETEX")
@@ -314,26 +318,15 @@ impl AutomationService {
             }
         }

-        // Get bot name from database
-        let bot_name = {
-            use crate::shared::models::bots;
-            let mut conn = self.state.conn.lock().unwrap();
-            match bots::table
-                .filter(bots::id.eq(bot_id))
-                .select(bots::name)
-                .first::<String>(&mut *conn)
-                .optional()
-            {
-                Ok(Some(name)) => name,
-                Ok(None) => {
-                    warn!("No bot found with id {}, using default name", bot_id);
-                    crate::bot::get_default_bot(&mut self.state.conn.lock().unwrap()).1
-                }
-                Err(e) => {
-                    error!("Failed to query bot name: {}", e);
-                    crate::bot::get_default_bot(&mut self.state.conn.lock().unwrap()).1
-                }
-            }
-        };
+        let bot_name: String = {
+            let mut db_conn = self.state.conn.lock().unwrap();
+            bots.filter(id.eq(&bot_id))
+                .select(name)
+                .first(&mut *db_conn)
+                .map_err(|e| {
+                    error!("Failed to query bot name for {}: {}", bot_id, e);
+                    e
+                })?
+        };

         let script_name = param.strip_suffix(".bas").unwrap_or(param);
@@ -358,7 +351,7 @@ impl AutomationService {
                 e
             );
             self.cleanup_job_flag(&bot_id, param).await;
-            return;
+            return Ok(());
         }
     };
@@ -389,7 +382,7 @@ impl AutomationService {
            Err(e) => {
                error!("Error compiling script '{}': {}", param, e);
                self.cleanup_job_flag(&bot_id, param).await;
-                return;
+                return Ok(());
            }
        };
@@ -409,6 +402,7 @@ impl AutomationService {
         trace!("Cleaning up Redis flag for job '{}'", param);
         self.cleanup_job_flag(&bot_id, param).await;
         trace!("Finished execute_action for '{}'", param);
+        Ok(())
     }

     async fn cleanup_job_flag(&self, bot_id: &Uuid, param: &str) {

View file

@@ -1,4 +1,5 @@
 use crate::shared::state::AppState;
+use crate::basic::keywords::set_schedule::execute_set_schedule;
 use log::{debug, info, warn};
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
@@ -88,11 +89,12 @@ pub struct OpenAIProperty {
 /// BASIC Compiler
 pub struct BasicCompiler {
     state: Arc<AppState>,
+    bot_id: uuid::Uuid,
 }

 impl BasicCompiler {
-    pub fn new(state: Arc<AppState>) -> Self {
-        Self { state }
+    pub fn new(state: Arc<AppState>, bot_id: uuid::Uuid) -> Self {
+        Self { state, bot_id }
     }

     /// Compile a BASIC file to AST and generate tool definitions
@@ -121,7 +123,7 @@ impl BasicCompiler {
         // Generate AST (using Rhai compilation would happen here)
         // For now, we'll store the preprocessed script
-        let ast_content = self.preprocess_basic(&source_content)?;
+        let ast_content = self.preprocess_basic(&source_content, source_path, self.bot_id)?;

         fs::write(&ast_path, &ast_content)
             .map_err(|e| format!("Failed to write AST file: {}", e))?;
@@ -368,7 +370,7 @@ impl BasicCompiler {
     }

     /// Preprocess BASIC script (basic transformations)
-    fn preprocess_basic(&self, source: &str) -> Result<String, Box<dyn Error + Send + Sync>> {
+    fn preprocess_basic(&self, source: &str, source_path: &str, bot_id: uuid::Uuid) -> Result<String, Box<dyn Error + Send + Sync>> {
         let mut result = String::new();

         for line in source.lines() {
@@ -379,6 +381,32 @@ impl BasicCompiler {
                 continue;
             }

+            // Handle SET_SCHEDULE keyword during preprocessing
+            if trimmed.starts_with("SET_SCHEDULE") {
+                // Expected format: SET_SCHEDULE "cron_expression"
+                // Extract the quoted cron expression
+                let parts: Vec<&str> = trimmed.split('"').collect();
+                if parts.len() >= 3 {
+                    let cron = parts[1];
+
+                    // Get the current script's name (file stem)
+                    use std::path::Path;
+                    let script_name = Path::new(source_path)
+                        .file_stem()
+                        .and_then(|s| s.to_str())
+                        .unwrap_or("unknown")
+                        .to_string();
+
+                    let mut conn = self.state.conn.lock().unwrap();
+                    if let Err(e) = execute_set_schedule(&mut *conn, cron, &script_name, bot_id) {
+                        log::error!("Failed to schedule SET_SCHEDULE during preprocessing: {}", e);
+                    }
+                } else {
+                    log::warn!("Malformed SET_SCHEDULE line ignored: {}", trimmed);
+                }
+                continue;
+            }
+
             // Skip PARAM and DESCRIPTION lines (metadata)
             if trimmed.starts_with("PARAM ") || trimmed.starts_with("DESCRIPTION ") {
                 continue;
@@ -407,7 +435,7 @@ mod tests {
     #[test]
     fn test_normalize_type() {
-        let compiler = BasicCompiler::new(Arc::new(AppState::default()));
+        let compiler = BasicCompiler::new(Arc::new(AppState::default()), uuid::Uuid::nil());

         assert_eq!(compiler.normalize_type("string"), "string");
         assert_eq!(compiler.normalize_type("integer"), "integer");
@@ -418,7 +446,7 @@ mod tests {
     #[test]
     fn test_parse_param_line() {
-        let compiler = BasicCompiler::new(Arc::new(AppState::default()));
+        let compiler = BasicCompiler::new(Arc::new(AppState::default()), uuid::Uuid::nil());
         let line = r#"PARAM name AS string LIKE "John Doe" DESCRIPTION "User's full name""#;
         let result = compiler.parse_param_line(line).unwrap();

View file

@@ -1,15 +1,17 @@
+use crate::shared::models::schema::bots::dsl::*;
+use diesel::prelude::*;
+use crate::kb::minio_handler;
 use crate::shared::models::UserSession;
 use crate::shared::state::AppState;
-use log::{debug, error, info};
+use log::{debug, error, info, trace};
 use reqwest::{self, Client};
-use crate::kb::minio_handler;
 use rhai::{Dynamic, Engine};
 use std::error::Error;
 use std::path::Path;
 use std::sync::Arc;
 use std::time::Duration;

-pub fn get_keyword(state: Arc<AppState>, _user: UserSession, engine: &mut Engine) {
+pub fn get_keyword(state: Arc<AppState>, user_session: UserSession, engine: &mut Engine) {
     let state_clone = Arc::clone(&state);

     engine
@@ -45,7 +47,9 @@ pub fn get_keyword(state: Arc<AppState>, _user: UserSession, engine: &mut Engine
                     execute_get(&url_for_blocking).await
                 } else {
                     info!("Local file GET request from bucket: {}", url_for_blocking);
-                    get_from_bucket(&state_for_blocking, &url_for_blocking).await
+                    get_from_bucket(&state_for_blocking, &url_for_blocking,
+                        user_session.bot_id)
+                    .await
                 }
             });
             tx.send(result).err()
@@ -151,6 +155,7 @@ pub async fn execute_get(url: &str) -> Result<String, Box<dyn Error + Send + Syn
 pub async fn get_from_bucket(
     state: &AppState,
     file_path: &str,
+    bot_id: uuid::Uuid,
 ) -> Result<String, Box<dyn Error + Send + Sync>> {
     debug!("Getting file from bucket: {}", file_path);
@@ -160,26 +165,37 @@ pub async fn get_from_bucket(
     }

     let client = state.drive.as_ref().ok_or("S3 client not configured")?;

+    let bot_name: String = {
+        let mut db_conn = state.conn.lock().unwrap();
+        bots.filter(id.eq(&bot_id))
+            .select(name)
+            .first(&mut *db_conn)
+            .map_err(|e| {
+                error!("Failed to query bot name for {}: {}", bot_id, e);
+                e
+            })?
+    };
+
     let bucket_name = {
-        let bucket = format!("default.gbai");
-        debug!("Resolved bucket name: {}", bucket);
+        let bucket = format!("{}.gbai", bot_name);
+        trace!("Resolved GET bucket name: {}", bucket);
         bucket
     };

     let bytes = match tokio::time::timeout(
         Duration::from_secs(30),
-        minio_handler::get_file_content(client, &bucket_name, file_path)
-    ).await {
+        minio_handler::get_file_content(client, &bucket_name, file_path),
+    )
+    .await
+    {
         Ok(Ok(data)) => data,
         Ok(Err(e)) => {
-            error!("S3 read failed: {}", e);
+            error!("drive read failed: {}", e);
             return Err(format!("S3 operation failed: {}", e).into());
         }
         Err(_) => {
-            error!("S3 read timed out");
-            return Err("S3 operation timed out".into());
+            error!("drive read timed out");
+            return Err("drive operation timed out".into());
         }
     };

View file

@@ -1,36 +1,10 @@
 use diesel::prelude::*;
 use log::info;
-use rhai::Dynamic;
-use rhai::Engine;
 use serde_json::{json, Value};
 use uuid::Uuid;

 use crate::shared::models::TriggerKind;
-use crate::shared::models::UserSession;
-use crate::shared::state::AppState;
-
-pub fn set_schedule_keyword(state: &AppState, user: UserSession, engine: &mut Engine) {
-    let state_clone = state.clone();
-    engine
-        .register_custom_syntax(&["SET_SCHEDULE", "$string$", "$string$"], true, {
-            move |context, inputs| {
-                let cron = context.eval_expression_tree(&inputs[0])?.to_string();
-                let script_name = context.eval_expression_tree(&inputs[1])?.to_string();
-                let mut conn = state_clone.conn.lock().unwrap();
-                let result = execute_set_schedule(&mut *conn, &cron, &script_name, user.bot_id)
-                    .map_err(|e| format!("DB error: {}", e))?;
-                if let Some(rows_affected) = result.get("rows_affected") {
-                    Ok(Dynamic::from(rows_affected.as_i64().unwrap_or(0)))
-                } else {
-                    Err("No rows affected".into())
-                }
-            }
-        })
-        .unwrap();
-}

 pub fn execute_set_schedule(
     conn: &mut diesel::PgConnection,
@@ -39,7 +13,7 @@ pub fn execute_set_schedule(
     bot_uuid: Uuid,
 ) -> Result<Value, Box<dyn std::error::Error>> {
     info!(
-        "Starting execute_set_schedule with cron: {}, script: {}, bot_id: {:?}",
+        "Scheduling SET SCHEDULE cron: {}, script: {}, bot_id: {:?}",
         cron, script_name, bot_uuid
     );
@@ -55,7 +29,7 @@ pub fn execute_set_schedule(
     let result = diesel::insert_into(system_automations)
         .values(&new_automation)
-        .on_conflict((bot_id, param))
+        .on_conflict((bot_id, kind, param))
         .do_update()
         .set((
             schedule.eq(cron),

View file

@@ -28,7 +28,6 @@ use self::keywords::print::print_keyword;
 use self::keywords::remove_tool::remove_tool_keyword;
 use self::keywords::set::set_keyword;
 use self::keywords::set_kb::{add_kb_keyword, set_kb_keyword};
-use self::keywords::set_schedule::set_schedule_keyword;
 use self::keywords::wait::wait_keyword;

 use self::keywords::add_suggestion::add_suggestion_keyword;
@@ -68,7 +67,6 @@ impl ScriptService {
         wait_keyword(&state, user.clone(), &mut engine);
         print_keyword(&state, user.clone(), &mut engine);
         on_keyword(&state, user.clone(), &mut engine);
-        set_schedule_keyword(&state, user.clone(), &mut engine);
         hear_keyword(state.clone(), user.clone(), &mut engine);
         talk_keyword(state.clone(), user.clone(), &mut engine);
         set_context_keyword(state.clone(), user.clone(), &mut engine);

View file

@@ -404,7 +404,7 @@ impl DriveMonitor {
         let local_source_path = format!("{}/{}.bas", work_dir, tool_name);
         std::fs::write(&local_source_path, &source_content)?;

-        let compiler = BasicCompiler::new(Arc::clone(&self.state));
+        let compiler = BasicCompiler::new(Arc::clone(&self.state), self.bot_id);
         let result = compiler.compile_file(&local_source_path, &work_dir)?;

         if let Some(mcp_tool) = result.mcp_tool {

File diff suppressed because it is too large