feat(automation): increase schedule field size and improve task checking

- Increased schedule field size from bpchar(12) to bpchar(20) in database schema
- Reduced task checking interval from 60s to 5s for more responsive automation
- Improved error handling for schedule parsing and execution
- Added proper error logging for automation failures
- Changed automation execution to use bot_id instead of nil UUID
- Enhanced HEAR keyword functionality (partial diff shown)
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-11-12 15:04:04 -03:00
parent a9af37e385
commit 7de29e6189
4 changed files with 1886 additions and 1374 deletions

View file

@ -58,7 +58,7 @@ CREATE TABLE public.system_automations (
bot_id uuid NOT NULL, bot_id uuid NOT NULL,
kind int4 NOT NULL, kind int4 NOT NULL,
"target" varchar(32) NULL, "target" varchar(32) NULL,
schedule bpchar(12) NULL, schedule bpchar(20) NULL,
param varchar(32) NOT NULL, param varchar(32) NOT NULL,
is_active bool DEFAULT true NOT NULL, is_active bool DEFAULT true NOT NULL,
last_triggered timestamptz NULL, last_triggered timestamptz NULL,

View file

@ -18,7 +18,7 @@ impl AutomationService {
Self { state } Self { state }
} }
pub async fn spawn(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { pub async fn spawn(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut ticker = interval(Duration::from_secs(60)); let mut ticker = interval(Duration::from_secs(5));
loop { loop {
ticker.tick().await; ticker.tick().await;
if let Err(e) = self.check_scheduled_tasks().await { if let Err(e) = self.check_scheduled_tasks().await {
@ -41,23 +41,33 @@ impl AutomationService {
.load::<Automation>(&mut conn)?; .load::<Automation>(&mut conn)?;
for automation in automations { for automation in automations {
if let Some(schedule_str) = &automation.schedule { if let Some(schedule_str) = &automation.schedule {
if let Ok(parsed_schedule) = Schedule::from_str(schedule_str) { match Schedule::from_str(schedule_str.trim()) {
let now = Utc::now(); Ok(parsed_schedule) => {
let next_run = parsed_schedule.upcoming(Utc).next(); let now = Utc::now();
if let Some(next_time) = next_run { let next_run = parsed_schedule.upcoming(Utc).next();
let time_until_next = next_time - now; if let Some(next_time) = next_run {
if time_until_next.num_minutes() < 1 { let time_until_next = next_time - now;
if let Some(last_triggered) = automation.last_triggered { if time_until_next.num_minutes() < 1 {
if (now - last_triggered).num_minutes() < 1 { if let Some(last_triggered) = automation.last_triggered {
continue; if (now - last_triggered).num_minutes() < 1 {
continue;
}
}
if let Err(e) = self.execute_automation(&automation).await {
error!("Error executing automation {}: {}", automation.id, e);
}
if let Err(e) = diesel::update(system_automations.filter(id.eq(automation.id)))
.set(lt_column.eq(Some(now)))
.execute(&mut conn)
{
error!("Error updating last_triggered for automation {}: {}", automation.id, e);
} }
} }
self.execute_automation(&automation).await?;
diesel::update(system_automations.filter(id.eq(automation.id)))
.set(lt_column.eq(Some(now)))
.execute(&mut conn)?;
} }
} }
Err(e) => {
error!("Error parsing schedule for automation {} ({}): {}", automation.id, schedule_str, e);
}
} }
} }
} }
@ -91,7 +101,7 @@ impl AutomationService {
}; };
let session = { let session = {
let mut sm = self.state.session_manager.lock().await; let mut sm = self.state.session_manager.lock().await;
let admin_user = uuid::Uuid::nil(); let admin_user = automation.bot_id;
sm.get_or_create_user_session(admin_user, automation.bot_id, "Automation")? sm.get_or_create_user_session(admin_user, automation.bot_id, "Automation")?
.ok_or("Failed to create session")? .ok_or("Failed to create session")?
}; };

View file

@ -4,105 +4,138 @@ use log::{error, trace};
use rhai::{Dynamic, Engine, EvalAltResult}; use rhai::{Dynamic, Engine, EvalAltResult};
use std::sync::Arc; use std::sync::Arc;
pub fn hear_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) { pub fn hear_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
let session_id = user.id; let session_id = user.id;
let state_clone = Arc::clone(&state); let state_clone = Arc::clone(&state);
engine engine
.register_custom_syntax(&["HEAR", "$ident$"], true, move |_context, inputs| { .register_custom_syntax(&["HEAR", "$ident$"], true, move |_context, inputs| {
let variable_name = inputs[0].get_string_value().expect("Expected identifier as string").to_string(); let variable_name = inputs[0]
trace!("HEAR command waiting for user input to store in variable: {}", variable_name); .get_string_value()
let state_for_spawn = Arc::clone(&state_clone); .expect("Expected identifier as string")
let session_id_clone = session_id; .to_string();
let var_name_clone = variable_name.clone(); trace!(
tokio::spawn(async move { "HEAR command waiting for user input to store in variable: {}",
trace!("HEAR: Setting session {} to wait for input for variable '{}'", session_id_clone, var_name_clone); variable_name
let mut session_manager = state_for_spawn.session_manager.lock().await; );
session_manager.mark_waiting(session_id_clone); let state_for_spawn = Arc::clone(&state_clone);
if let Some(redis_client) = &state_for_spawn.cache { let session_id_clone = session_id;
let mut conn = match redis_client.get_multiplexed_async_connection().await { let var_name_clone = variable_name.clone();
Ok(conn) => conn, tokio::spawn(async move {
Err(e) => { trace!(
error!("Failed to connect to cache: {}", e); "HEAR: Setting session {} to wait for input for variable '{}'",
return; session_id_clone,
} var_name_clone
}; );
let key = format!("hear:{}:{}", session_id_clone, var_name_clone); let mut session_manager = state_for_spawn.session_manager.lock().await;
let _: Result<(), _> = redis::cmd("SET").arg(&key).arg("waiting").query_async(&mut conn).await; session_manager.mark_waiting(session_id_clone);
} if let Some(redis_client) = &state_for_spawn.cache {
}); let mut conn = match redis_client.get_multiplexed_async_connection().await {
Err(Box::new(EvalAltResult::ErrorRuntime("Waiting for user input".into(), rhai::Position::NONE))) Ok(conn) => conn,
}) Err(e) => {
.unwrap(); error!("Failed to connect to cache: {}", e);
return;
}
};
let key = format!("hear:{}:{}", session_id_clone, var_name_clone);
let _: Result<(), _> = redis::cmd("SET")
.arg(&key)
.arg("waiting")
.query_async(&mut conn)
.await;
}
});
Err(Box::new(EvalAltResult::ErrorRuntime(
"Waiting for user input".into(),
rhai::Position::NONE,
)))
})
.unwrap();
} }
pub async fn execute_talk(state: Arc<AppState>, user_session: UserSession, message: String) -> Result<BotResponse, Box<dyn std::error::Error>> { pub async fn execute_talk(
let mut suggestions = Vec::new(); state: Arc<AppState>,
if let Some(redis_client) = &state.cache { user_session: UserSession,
let mut conn: redis::aio::MultiplexedConnection = redis_client.get_multiplexed_async_connection().await?; message: String,
let redis_key = format!("suggestions:{}:{}", user_session.user_id, user_session.id); ) -> Result<BotResponse, Box<dyn std::error::Error>> {
let suggestions_json: Result<Vec<String>, _> = redis::cmd("LRANGE").arg(redis_key.as_str()).arg(0).arg(-1).query_async(&mut conn).await; let mut suggestions = Vec::new();
match suggestions_json { if let Some(redis_client) = &state.cache {
Ok(suggestions_json) => { let mut conn: redis::aio::MultiplexedConnection =
suggestions = suggestions_json.into_iter().filter_map(|s| serde_json::from_str(&s).ok()).collect(); redis_client.get_multiplexed_async_connection().await?;
} let redis_key = format!("suggestions:{}:{}", user_session.user_id, user_session.id);
Err(e) => { let suggestions_json: Result<Vec<String>, _> = redis::cmd("LRANGE")
error!("Failed to load suggestions from Redis: {}", e); .arg(redis_key.as_str())
} .arg(0)
} .arg(-1)
} .query_async(&mut conn)
let response = BotResponse { .await;
bot_id: user_session.bot_id.to_string(), match suggestions_json {
user_id: user_session.user_id.to_string(), Ok(suggestions_json) => {
session_id: user_session.id.to_string(), suggestions = suggestions_json
channel: "web".to_string(), .into_iter()
content: message, .filter_map(|s| serde_json::from_str(&s).ok())
message_type: 1, .collect();
stream_token: None, }
is_complete: true, Err(e) => {
suggestions, error!("Failed to load suggestions from Redis: {}", e);
context_name: None, }
context_length: 0, }
context_max_length: 0, }
}; let response = BotResponse {
let user_id = user_session.id.to_string(); bot_id: user_session.bot_id.to_string(),
let response_clone = response.clone(); user_id: user_session.user_id.to_string(),
match state.response_channels.try_lock() { session_id: user_session.id.to_string(),
Ok(response_channels) => { channel: "web".to_string(),
if let Some(tx) = response_channels.get(&user_id) { content: message,
if let Err(e) = tx.try_send(response_clone) { message_type: 1,
error!("Failed to send TALK message via WebSocket: {}", e); stream_token: None,
} else { is_complete: true,
trace!("TALK message sent via WebSocket"); suggestions,
} context_name: None,
} else { context_length: 0,
let web_adapter = Arc::clone(&state.web_adapter); context_max_length: 0,
tokio::spawn(async move { };
if let Err(e) = web_adapter.send_message_to_session(&user_id, response_clone).await { let user_id = user_session.id.to_string();
error!("Failed to send TALK message via web adapter: {}", e); let response_clone = response.clone();
} else { match state.response_channels.try_lock() {
trace!("TALK message sent via web adapter"); Ok(response_channels) => {
} if let Some(tx) = response_channels.get(&user_id) {
}); if let Err(e) = tx.try_send(response_clone) {
} error!("Failed to send TALK message via WebSocket: {}", e);
} } else {
Err(_) => { trace!("TALK message sent via WebSocket");
error!("Failed to acquire lock on response_channels for TALK command"); }
} } else {
} let web_adapter = Arc::clone(&state.web_adapter);
Ok(response) tokio::spawn(async move {
if let Err(e) = web_adapter
.send_message_to_session(&user_id, response_clone)
.await
{
error!("Failed to send TALK message via web adapter: {}", e);
} else {
trace!("TALK message sent via web adapter");
}
});
}
}
Err(_) => {
error!("Failed to acquire lock on response_channels for TALK command");
}
}
Ok(response)
} }
pub fn talk_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) { pub fn talk_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
let state_clone = Arc::clone(&state); let state_clone = Arc::clone(&state);
let user_clone = user.clone(); let user_clone = user.clone();
engine engine
.register_custom_syntax(&["TALK", "$expr$"], true, move |context, inputs| { .register_custom_syntax(&["TALK", "$expr$"], true, move |context, inputs| {
let message = context.eval_expression_tree(&inputs[0])?.to_string(); let message = context.eval_expression_tree(&inputs[0])?.to_string();
let state_for_talk = Arc::clone(&state_clone); let state_for_talk = Arc::clone(&state_clone);
let user_for_talk = user_clone.clone(); let user_for_talk = user_clone.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = execute_talk(state_for_talk, user_for_talk, message).await { if let Err(e) = execute_talk(state_for_talk, user_for_talk, message).await {
error!("Error executing TALK command: {}", e); error!("Error executing TALK command: {}", e);
} }
}); });
Ok(Dynamic::UNIT) Ok(Dynamic::UNIT)
}) })
.unwrap(); .unwrap();
} }

File diff suppressed because it is too large Load diff