From 7c69284f0776328d0bcceca38d76d55e9de9a8cd Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Wed, 5 Nov 2025 10:15:36 -0300 Subject: [PATCH] feat(compiler): clean up stale schedules on script recompilation Added tracking of previously scheduled scripts using a `HashSet` and initialized it in `BasicCompiler::new`. Updated `compile_file` and `preprocess_basic` to require mutable access, allowing schedule cleanup before processing. Implemented logic to delete existing scheduled automations for a script using Diesel queries, ensuring old schedules are removed when a script is recompiled without a `SET_SCHEDULE`. Added necessary Diesel imports and `TriggerKind` reference. This prevents duplicate or orphaned scheduled tasks. --- src/basic/compiler/mod.rs | 74 ++++++++++++---- src/basic/keywords/add_tool.rs | 18 ---- src/basic/keywords/mod.rs | 3 +- src/basic/keywords/remove_tool.rs | 138 ------------------------------ src/basic/mod.rs | 2 - src/bootstrap/mod.rs | 16 +++- src/config/mod.rs | 4 +- src/drive_monitor/mod.rs | 2 +- 8 files changed, 71 insertions(+), 186 deletions(-) delete mode 100644 src/basic/keywords/remove_tool.rs diff --git a/src/basic/compiler/mod.rs b/src/basic/compiler/mod.rs index d2093735..0492d60a 100644 --- a/src/basic/compiler/mod.rs +++ b/src/basic/compiler/mod.rs @@ -3,6 +3,11 @@ use crate::basic::keywords::set_schedule::execute_set_schedule; use log::{debug, info, warn}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use diesel::QueryDsl; +use diesel::ExpressionMethods; +use std::collections::HashSet; +use diesel::RunQueryDsl; +use crate::shared::models::TriggerKind; use std::error::Error; use std::fs; use std::path::Path; @@ -89,16 +94,21 @@ pub struct OpenAIProperty { pub struct BasicCompiler { state: Arc, bot_id: uuid::Uuid, + previous_schedules: HashSet, // Tracks script names with SET_SCHEDULE } impl BasicCompiler { pub fn new(state: Arc, bot_id: uuid::Uuid) -> Self { - Self { state, bot_id } + Self { + state, + bot_id, + previous_schedules: HashSet::new(), + } } /// Compile a BASIC file to AST and generate tool definitions pub fn compile_file( - &self, + &mut self, source_path: &str, output_dir: &str, ) -> Result> { @@ -288,10 +298,10 @@ impl BasicCompiler { "integer" | "int" | "number" => "integer".to_string(), "float" | "double" | "decimal" => "number".to_string(), "boolean" | "bool" => "boolean".to_string(), - "date" | "datetime" => "string".to_string(), // Dates as strings + "date" | "datetime" => "string".to_string(), "array" | "list" => "array".to_string(), "object" | "map" => "object".to_string(), - _ => "string".to_string(), // Default to string + _ => "string".to_string(), } } @@ -367,33 +377,41 @@ impl BasicCompiler { } /// Preprocess BASIC script (basic transformations) - fn preprocess_basic(&self, source: &str, source_path: &str, bot_id: uuid::Uuid) -> Result> { + fn preprocess_basic(&mut self, source: &str, source_path: &str, bot_id: uuid::Uuid) -> Result> { + let bot_uuid = bot_id; let mut result = String::new(); + let mut has_schedule = false; + let script_name = Path::new(source_path) + .file_stem() + .and_then(|s| s.to_str()) + .unwrap_or("unknown") + .to_string(); + + // Remove any existing schedule for this script before processing + { + let mut conn = self.state.conn.lock().unwrap(); + use crate::shared::models::system_automations::dsl::*; + diesel::delete(system_automations + .filter(bot_id.eq(bot_uuid)) + .filter(kind.eq(TriggerKind::Scheduled as i32)) + .filter(param.eq(&script_name)) + ) + .execute(&mut *conn) + .ok(); + } for line in source.lines() { let trimmed = line.trim(); - // Skip empty lines and comments if trimmed.is_empty() || trimmed.starts_with("//") || trimmed.starts_with("REM") { continue; } - // Handle SET_SCHEDULE keyword during preprocessing if trimmed.starts_with("SET_SCHEDULE") { - // Expected format: SET_SCHEDULE "cron_expression" - // Extract the quoted cron expression + has_schedule = true; let parts: Vec<&str> = trimmed.split('"').collect(); if parts.len() >= 3 { let cron = parts[1]; - - // Get the current script's name (file stem) - use std::path::Path; - let script_name = Path::new(source_path) - .file_stem() - .and_then(|s| s.to_str()) - .unwrap_or("unknown") - .to_string(); - let mut conn = self.state.conn.lock().unwrap(); if let Err(e) = execute_set_schedule(&mut *conn, cron, &script_name, bot_id) { log::error!("Failed to schedule SET_SCHEDULE during preprocessing: {}", e); @@ -404,7 +422,6 @@ impl BasicCompiler { continue; } - // Skip PARAM and DESCRIPTION lines (metadata) if trimmed.starts_with("PARAM ") || trimmed.starts_with("DESCRIPTION ") { continue; } @@ -413,6 +430,25 @@ impl BasicCompiler { result.push('\n'); } + if self.previous_schedules.contains(&script_name) && !has_schedule { + let mut conn = self.state.conn.lock().unwrap(); + use crate::shared::models::system_automations::dsl::*; + diesel::delete(system_automations + .filter(bot_id.eq(bot_uuid)) + .filter(kind.eq(TriggerKind::Scheduled as i32)) + .filter(param.eq(&script_name)) + ) + .execute(&mut *conn) + .map_err(|e| log::error!("Failed to remove schedule for {}: {}", script_name, e)) + .ok(); + } + + if has_schedule { + self.previous_schedules.insert(script_name); + } else { + self.previous_schedules.remove(&script_name); + } + Ok(result) } } diff --git a/src/basic/keywords/add_tool.rs b/src/basic/keywords/add_tool.rs index 9f5cd04b..4695b650 100644 --- a/src/basic/keywords/add_tool.rs +++ b/src/basic/keywords/add_tool.rs @@ -206,24 +206,6 @@ pub fn get_session_tools( .load::(conn) } -/// Remove a tool association from a session -pub fn remove_session_tool( - conn: &mut PgConnection, - session_id: &Uuid, - tool_name: &str, -) -> Result { - use crate::shared::models::schema::session_tool_associations; - - let session_id_str = session_id.to_string(); - - diesel::delete( - session_tool_associations::table - .filter(session_tool_associations::session_id.eq(&session_id_str)) - .filter(session_tool_associations::tool_name.eq(tool_name)), - ) - .execute(conn) -} - /// Clear all tool associations for a session pub fn clear_session_tools( conn: &mut PgConnection, diff --git a/src/basic/keywords/mod.rs b/src/basic/keywords/mod.rs index 783ec4d9..0dbafddc 100644 --- a/src/basic/keywords/mod.rs +++ b/src/basic/keywords/mod.rs @@ -14,10 +14,9 @@ pub mod list_tools; pub mod llm_keyword; pub mod on; pub mod print; -pub mod remove_tool; pub mod set; -pub mod set_kb; pub mod set_schedule; +pub mod set_kb; pub mod wait; pub mod add_suggestion; pub mod set_user; diff --git a/src/basic/keywords/remove_tool.rs b/src/basic/keywords/remove_tool.rs deleted file mode 100644 index f5db7a8e..00000000 --- a/src/basic/keywords/remove_tool.rs +++ /dev/null @@ -1,138 +0,0 @@ -use crate::basic::keywords::add_tool::remove_session_tool; -use crate::shared::models::UserSession; -use crate::shared::state::AppState; -use log::{error, info}; -use rhai::{Dynamic, Engine}; -use std::sync::Arc; - -pub fn remove_tool_keyword(state: Arc, user: UserSession, engine: &mut Engine) { - let state_clone = Arc::clone(&state); - let user_clone = user.clone(); - - engine - .register_custom_syntax(&["REMOVE_TOOL", "$expr$"], false, move |context, inputs| { - let tool_path = context.eval_expression_tree(&inputs[0])?; - let tool_path_str = tool_path.to_string().trim_matches('"').to_string(); - - info!( - "REMOVE_TOOL command executed: {} for session: {}", - tool_path_str, user_clone.id - ); - - // Extract tool name from path (e.g., "enrollment.bas" -> "enrollment") - let tool_name = tool_path_str - .strip_prefix(".gbdialog/") - .unwrap_or(&tool_path_str) - .strip_suffix(".bas") - .unwrap_or(&tool_path_str) - .to_string(); - - // Validate tool name - if tool_name.is_empty() { - return Err(Box::new(rhai::EvalAltResult::ErrorRuntime( - "Invalid tool name".into(), - rhai::Position::NONE, - ))); - } - - let state_for_task = Arc::clone(&state_clone); - let user_for_task = user_clone.clone(); - let tool_name_for_task = tool_name.clone(); - - // Spawn async task to remove tool association from session - let (tx, rx) = std::sync::mpsc::channel(); - std::thread::spawn(move || { - let rt = tokio::runtime::Builder::new_multi_thread() - .worker_threads(2) - .enable_all() - .build(); - - let send_err = if let Ok(rt) = rt { - let result = rt.block_on(async move { - disassociate_tool_from_session( - &state_for_task, - &user_for_task, - &tool_name_for_task, - ) - .await - }); - tx.send(result).err() - } else { - tx.send(Err("Failed to build tokio runtime".to_string())) - .err() - }; - - if send_err.is_some() { - error!("Failed to send result from thread"); - } - }); - - match rx.recv_timeout(std::time::Duration::from_secs(10)) { - Ok(Ok(message)) => { - info!("REMOVE_TOOL completed: {}", message); - Ok(Dynamic::from(message)) - } - Ok(Err(e)) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime( - e.into(), - rhai::Position::NONE, - ))), - Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { - Err(Box::new(rhai::EvalAltResult::ErrorRuntime( - "REMOVE_TOOL timed out".into(), - rhai::Position::NONE, - ))) - } - Err(e) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime( - format!("REMOVE_TOOL failed: {}", e).into(), - rhai::Position::NONE, - ))), - } - }) - .unwrap(); -} - -/// Remove a tool association from the current session -async fn disassociate_tool_from_session( - state: &AppState, - user: &UserSession, - tool_name: &str, -) -> Result { - let mut conn = state.conn.lock().map_err(|e| { - error!("Failed to acquire database lock: {}", e); - format!("Database connection error: {}", e) - })?; - - // Remove the tool association - let delete_result = remove_session_tool(&mut *conn, &user.id, tool_name); - - match delete_result { - Ok(rows_affected) => { - if rows_affected > 0 { - info!( - "Tool '{}' removed from session '{}' (user: {}, bot: {})", - tool_name, user.id, user.user_id, user.bot_id - ); - Ok(format!( - "Tool '{}' has been removed from this conversation", - tool_name - )) - } else { - info!( - "Tool '{}' was not associated with session '{}'", - tool_name, user.id - ); - Ok(format!( - "Tool '{}' was not active in this conversation", - tool_name - )) - } - } - Err(e) => { - error!( - "Failed to remove tool '{}' from session '{}': {}", - tool_name, user.id, e - ); - Err(format!("Failed to remove tool from session: {}", e)) - } - } -} diff --git a/src/basic/mod.rs b/src/basic/mod.rs index 0b6d2c9d..8b790244 100644 --- a/src/basic/mod.rs +++ b/src/basic/mod.rs @@ -26,7 +26,6 @@ use self::keywords::list_tools::list_tools_keyword; use self::keywords::llm_keyword::llm_keyword; use self::keywords::on::on_keyword; use self::keywords::print::print_keyword; -use self::keywords::remove_tool::remove_tool_keyword; use self::keywords::set::set_keyword; use self::keywords::set_kb::{add_kb_keyword, set_kb_keyword}; use self::keywords::wait::wait_keyword; @@ -76,7 +75,6 @@ impl ScriptService { set_kb_keyword(state.clone(), user.clone(), &mut engine); add_kb_keyword(state.clone(), user.clone(), &mut engine); add_tool_keyword(state.clone(), user.clone(), &mut engine); - remove_tool_keyword(state.clone(), user.clone(), &mut engine); clear_tools_keyword(state.clone(), user.clone(), &mut engine); list_tools_keyword(state.clone(), user.clone(), &mut engine); add_website_keyword(state.clone(), user.clone(), &mut engine); diff --git a/src/bootstrap/mod.rs b/src/bootstrap/mod.rs index 57216731..3c768c3b 100644 --- a/src/bootstrap/mod.rs +++ b/src/bootstrap/mod.rs @@ -1,4 +1,4 @@ -use crate::config::AppConfig; +use crate::config::{AppConfig, write_drive_config_to_env}; use crate::package_manager::{InstallMode, PackageManager}; use crate::shared::utils::establish_pg_connection; use anyhow::Result; @@ -279,11 +279,19 @@ impl BootstrapManager { self.s3_client = futures::executor::block_on(Self::create_s3_operator(&config)); // Load config from CSV if available - if let Ok(csv_config) = self.load_config_from_csv().await { - Ok(csv_config) + let final_config = if let Ok(csv_config) = self.load_config_from_csv().await { + csv_config } else { - Ok(config) + config + }; + + // Write drive config to .env file if not already present (first bootstrap) + if std::env::var("DRIVE_SERVER").is_err() { + write_drive_config_to_env(&final_config.drive) + .map_err(|e| anyhow::anyhow!("Failed to write drive config to .env: {}", e))?; } + + Ok(final_config) } diff --git a/src/config/mod.rs b/src/config/mod.rs index c8b72c12..30cd5936 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -1,7 +1,7 @@ use diesel::prelude::*; use diesel::pg::PgConnection; use uuid::Uuid; -use log::{info, trace, warn}; +use log::{info, trace}; // removed unused serde import use std::collections::HashMap; use std::fs::OpenOptions; @@ -197,7 +197,7 @@ impl AppConfig { } -fn write_drive_config_to_env(drive: &DriveConfig) -> std::io::Result<()> { +pub fn write_drive_config_to_env(drive: &DriveConfig) -> std::io::Result<()> { let mut file = OpenOptions::new() .append(true) .create(true) diff --git a/src/drive_monitor/mod.rs b/src/drive_monitor/mod.rs index 01e23779..672d1fc8 100644 --- a/src/drive_monitor/mod.rs +++ b/src/drive_monitor/mod.rs @@ -339,7 +339,7 @@ impl DriveMonitor { let local_source_path = format!("{}/{}.bas", work_dir, tool_name); std::fs::write(&local_source_path, &source_content)?; - let compiler = BasicCompiler::new(Arc::clone(&self.state), self.bot_id); + let mut compiler = BasicCompiler::new(Arc::clone(&self.state), self.bot_id); let result = compiler.compile_file(&local_source_path, &work_dir)?; if let Some(mcp_tool) = result.mcp_tool {