feat(compiler): clean up stale schedules on script recompilation

Added tracking of previously scheduled scripts using a `HashSet` and initialized it in `BasicCompiler::new`. Updated `compile_file` and `preprocess_basic` to require mutable access, allowing schedule cleanup before processing. Implemented logic to delete existing scheduled automations for a script using Diesel queries, ensuring old schedules are removed when a script is recompiled without a `SET_SCHEDULE`. Added necessary Diesel imports and `TriggerKind` reference. This prevents duplicate or orphaned scheduled tasks.
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-11-05 10:15:36 -03:00
parent 72f7081acb
commit 7c69284f07
8 changed files with 71 additions and 186 deletions

View file

@ -3,6 +3,11 @@ use crate::basic::keywords::set_schedule::execute_set_schedule;
use log::{debug, info, warn};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use diesel::QueryDsl;
use diesel::ExpressionMethods;
use std::collections::HashSet;
use diesel::RunQueryDsl;
use crate::shared::models::TriggerKind;
use std::error::Error;
use std::fs;
use std::path::Path;
@ -89,16 +94,21 @@ pub struct OpenAIProperty {
pub struct BasicCompiler {
state: Arc<AppState>,
bot_id: uuid::Uuid,
previous_schedules: HashSet<String>, // Tracks script names with SET_SCHEDULE
}
impl BasicCompiler {
pub fn new(state: Arc<AppState>, bot_id: uuid::Uuid) -> Self {
Self { state, bot_id }
Self {
state,
bot_id,
previous_schedules: HashSet::new(),
}
}
/// Compile a BASIC file to AST and generate tool definitions
pub fn compile_file(
&self,
&mut self,
source_path: &str,
output_dir: &str,
) -> Result<CompilationResult, Box<dyn Error + Send + Sync>> {
@ -288,10 +298,10 @@ impl BasicCompiler {
"integer" | "int" | "number" => "integer".to_string(),
"float" | "double" | "decimal" => "number".to_string(),
"boolean" | "bool" => "boolean".to_string(),
"date" | "datetime" => "string".to_string(), // Dates as strings
"date" | "datetime" => "string".to_string(),
"array" | "list" => "array".to_string(),
"object" | "map" => "object".to_string(),
_ => "string".to_string(), // Default to string
_ => "string".to_string(),
}
}
@ -367,33 +377,41 @@ impl BasicCompiler {
}
/// Preprocess BASIC script (basic transformations)
fn preprocess_basic(&self, source: &str, source_path: &str, bot_id: uuid::Uuid) -> Result<String, Box<dyn Error + Send + Sync>> {
fn preprocess_basic(&mut self, source: &str, source_path: &str, bot_id: uuid::Uuid) -> Result<String, Box<dyn Error + Send + Sync>> {
let bot_uuid = bot_id;
let mut result = String::new();
let mut has_schedule = false;
let script_name = Path::new(source_path)
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("unknown")
.to_string();
// Remove any existing schedule for this script before processing
{
let mut conn = self.state.conn.lock().unwrap();
use crate::shared::models::system_automations::dsl::*;
diesel::delete(system_automations
.filter(bot_id.eq(bot_uuid))
.filter(kind.eq(TriggerKind::Scheduled as i32))
.filter(param.eq(&script_name))
)
.execute(&mut *conn)
.ok();
}
for line in source.lines() {
let trimmed = line.trim();
// Skip empty lines and comments
if trimmed.is_empty() || trimmed.starts_with("//") || trimmed.starts_with("REM") {
continue;
}
// Handle SET_SCHEDULE keyword during preprocessing
if trimmed.starts_with("SET_SCHEDULE") {
// Expected format: SET_SCHEDULE "cron_expression"
// Extract the quoted cron expression
has_schedule = true;
let parts: Vec<&str> = trimmed.split('"').collect();
if parts.len() >= 3 {
let cron = parts[1];
// Get the current script's name (file stem)
use std::path::Path;
let script_name = Path::new(source_path)
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("unknown")
.to_string();
let mut conn = self.state.conn.lock().unwrap();
if let Err(e) = execute_set_schedule(&mut *conn, cron, &script_name, bot_id) {
log::error!("Failed to schedule SET_SCHEDULE during preprocessing: {}", e);
@ -404,7 +422,6 @@ impl BasicCompiler {
continue;
}
// Skip PARAM and DESCRIPTION lines (metadata)
if trimmed.starts_with("PARAM ") || trimmed.starts_with("DESCRIPTION ") {
continue;
}
@ -413,6 +430,25 @@ impl BasicCompiler {
result.push('\n');
}
if self.previous_schedules.contains(&script_name) && !has_schedule {
let mut conn = self.state.conn.lock().unwrap();
use crate::shared::models::system_automations::dsl::*;
diesel::delete(system_automations
.filter(bot_id.eq(bot_uuid))
.filter(kind.eq(TriggerKind::Scheduled as i32))
.filter(param.eq(&script_name))
)
.execute(&mut *conn)
.map_err(|e| log::error!("Failed to remove schedule for {}: {}", script_name, e))
.ok();
}
if has_schedule {
self.previous_schedules.insert(script_name);
} else {
self.previous_schedules.remove(&script_name);
}
Ok(result)
}
}

View file

@ -206,24 +206,6 @@ pub fn get_session_tools(
.load::<String>(conn)
}
/// Remove a tool association from a session
pub fn remove_session_tool(
conn: &mut PgConnection,
session_id: &Uuid,
tool_name: &str,
) -> Result<usize, diesel::result::Error> {
use crate::shared::models::schema::session_tool_associations;
let session_id_str = session_id.to_string();
diesel::delete(
session_tool_associations::table
.filter(session_tool_associations::session_id.eq(&session_id_str))
.filter(session_tool_associations::tool_name.eq(tool_name)),
)
.execute(conn)
}
/// Clear all tool associations for a session
pub fn clear_session_tools(
conn: &mut PgConnection,

View file

@ -14,10 +14,9 @@ pub mod list_tools;
pub mod llm_keyword;
pub mod on;
pub mod print;
pub mod remove_tool;
pub mod set;
pub mod set_kb;
pub mod set_schedule;
pub mod set_kb;
pub mod wait;
pub mod add_suggestion;
pub mod set_user;

View file

@ -1,138 +0,0 @@
use crate::basic::keywords::add_tool::remove_session_tool;
use crate::shared::models::UserSession;
use crate::shared::state::AppState;
use log::{error, info};
use rhai::{Dynamic, Engine};
use std::sync::Arc;
/// Registers the `REMOVE_TOOL` custom keyword with the Rhai engine.
///
/// Syntax: `REMOVE_TOOL <expr>`, where the expression evaluates to a tool
/// path (e.g. ".gbdialog/enrollment.bas"). The handler normalizes the path
/// to a bare tool name and removes that tool's association from the
/// current user session, returning a user-facing status message to the
/// script.
///
/// The registered closure is synchronous (Rhai callback), so the async
/// database work is bridged via a dedicated OS thread that owns its own
/// Tokio runtime and reports back over an mpsc channel.
pub fn remove_tool_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
// Clone shared state/session so the 'move' closure below can own them.
let state_clone = Arc::clone(&state);
let user_clone = user.clone();
engine
.register_custom_syntax(&["REMOVE_TOOL", "$expr$"], false, move |context, inputs| {
// Evaluate the single expression argument and strip surrounding quotes.
let tool_path = context.eval_expression_tree(&inputs[0])?;
let tool_path_str = tool_path.to_string().trim_matches('"').to_string();
info!(
"REMOVE_TOOL command executed: {} for session: {}",
tool_path_str, user_clone.id
);
// Extract tool name from path (e.g., "enrollment.bas" -> "enrollment")
// Each strip_* falls back to the original string when the affix is absent.
let tool_name = tool_path_str
.strip_prefix(".gbdialog/")
.unwrap_or(&tool_path_str)
.strip_suffix(".bas")
.unwrap_or(&tool_path_str)
.to_string();
// Validate tool name
if tool_name.is_empty() {
return Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
"Invalid tool name".into(),
rhai::Position::NONE,
)));
}
// Per-invocation clones: the spawned thread takes ownership of these.
let state_for_task = Arc::clone(&state_clone);
let user_for_task = user_clone.clone();
let tool_name_for_task = tool_name.clone();
// Spawn async task to remove tool association from session
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
// Build a fresh runtime inside the thread: this closure may run on a
// thread that is not inside (or must not block) an existing runtime.
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build();
let send_err = if let Ok(rt) = rt {
let result = rt.block_on(async move {
disassociate_tool_from_session(
&state_for_task,
&user_for_task,
&tool_name_for_task,
)
.await
});
tx.send(result).err()
} else {
// Runtime construction failed; surface that as the task result.
tx.send(Err("Failed to build tokio runtime".to_string()))
.err()
};
// A send error only means the receiver timed out and hung up; log it.
if send_err.is_some() {
error!("Failed to send result from thread");
}
});
// Block the script for at most 10s awaiting the DB result, then map
// every outcome (success, task error, timeout, channel error) to Rhai.
match rx.recv_timeout(std::time::Duration::from_secs(10)) {
Ok(Ok(message)) => {
info!("REMOVE_TOOL completed: {}", message);
Ok(Dynamic::from(message))
}
Ok(Err(e)) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
e.into(),
rhai::Position::NONE,
))),
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
"REMOVE_TOOL timed out".into(),
rhai::Position::NONE,
)))
}
Err(e) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
format!("REMOVE_TOOL failed: {}", e).into(),
rhai::Position::NONE,
))),
}
})
.unwrap();
}
/// Remove a tool association from the current session.
///
/// Returns a user-facing message on success — one wording when a row was
/// actually deleted, another when the tool was not associated — or an
/// error string when the connection lock or the delete query fails.
async fn disassociate_tool_from_session(
    state: &AppState,
    user: &UserSession,
    tool_name: &str,
) -> Result<String, String> {
    // Acquire the shared database connection; a poisoned lock is logged
    // and reported to the caller as a connection error.
    let mut conn = state.conn.lock().map_err(|e| {
        error!("Failed to acquire database lock: {}", e);
        format!("Database connection error: {}", e)
    })?;
    // Match directly on the delete result: zero rows means the tool was
    // never attached to this session, any positive count means removal.
    match remove_session_tool(&mut *conn, &user.id, tool_name) {
        Ok(0) => {
            info!(
                "Tool '{}' was not associated with session '{}'",
                tool_name, user.id
            );
            Ok(format!(
                "Tool '{}' was not active in this conversation",
                tool_name
            ))
        }
        Ok(_) => {
            info!(
                "Tool '{}' removed from session '{}' (user: {}, bot: {})",
                tool_name, user.id, user.user_id, user.bot_id
            );
            Ok(format!(
                "Tool '{}' has been removed from this conversation",
                tool_name
            ))
        }
        Err(e) => {
            error!(
                "Failed to remove tool '{}' from session '{}': {}",
                tool_name, user.id, e
            );
            Err(format!("Failed to remove tool from session: {}", e))
        }
    }
}

View file

@ -26,7 +26,6 @@ use self::keywords::list_tools::list_tools_keyword;
use self::keywords::llm_keyword::llm_keyword;
use self::keywords::on::on_keyword;
use self::keywords::print::print_keyword;
use self::keywords::remove_tool::remove_tool_keyword;
use self::keywords::set::set_keyword;
use self::keywords::set_kb::{add_kb_keyword, set_kb_keyword};
use self::keywords::wait::wait_keyword;
@ -76,7 +75,6 @@ impl ScriptService {
set_kb_keyword(state.clone(), user.clone(), &mut engine);
add_kb_keyword(state.clone(), user.clone(), &mut engine);
add_tool_keyword(state.clone(), user.clone(), &mut engine);
remove_tool_keyword(state.clone(), user.clone(), &mut engine);
clear_tools_keyword(state.clone(), user.clone(), &mut engine);
list_tools_keyword(state.clone(), user.clone(), &mut engine);
add_website_keyword(state.clone(), user.clone(), &mut engine);

View file

@ -1,4 +1,4 @@
use crate::config::AppConfig;
use crate::config::{AppConfig, write_drive_config_to_env};
use crate::package_manager::{InstallMode, PackageManager};
use crate::shared::utils::establish_pg_connection;
use anyhow::Result;
@ -279,11 +279,19 @@ impl BootstrapManager {
self.s3_client = futures::executor::block_on(Self::create_s3_operator(&config));
// Load config from CSV if available
if let Ok(csv_config) = self.load_config_from_csv().await {
Ok(csv_config)
let final_config = if let Ok(csv_config) = self.load_config_from_csv().await {
csv_config
} else {
Ok(config)
config
};
// Write drive config to .env file if not already present (first bootstrap)
if std::env::var("DRIVE_SERVER").is_err() {
write_drive_config_to_env(&final_config.drive)
.map_err(|e| anyhow::anyhow!("Failed to write drive config to .env: {}", e))?;
}
Ok(final_config)
}

View file

@ -1,7 +1,7 @@
use diesel::prelude::*;
use diesel::pg::PgConnection;
use uuid::Uuid;
use log::{info, trace, warn};
use log::{info, trace};
// removed unused serde import
use std::collections::HashMap;
use std::fs::OpenOptions;
@ -197,7 +197,7 @@ impl AppConfig {
}
fn write_drive_config_to_env(drive: &DriveConfig) -> std::io::Result<()> {
pub fn write_drive_config_to_env(drive: &DriveConfig) -> std::io::Result<()> {
let mut file = OpenOptions::new()
.append(true)
.create(true)

View file

@ -339,7 +339,7 @@ impl DriveMonitor {
let local_source_path = format!("{}/{}.bas", work_dir, tool_name);
std::fs::write(&local_source_path, &source_content)?;
let compiler = BasicCompiler::new(Arc::clone(&self.state), self.bot_id);
let mut compiler = BasicCompiler::new(Arc::clone(&self.state), self.bot_id);
let result = compiler.compile_file(&local_source_path, &work_dir)?;
if let Some(mcp_tool) = result.mcp_tool {