From debe29a6dcad660efd9af9ca387d99ee841a0537 Mon Sep 17 00:00:00 2001 From: Rodrigo Rodriguez Date: Thu, 30 Apr 2026 16:45:37 -0300 Subject: [PATCH] fix: start.bas redundant execution with atomic SET NX guard --- botserver/src/core/bot/mod.rs | 45 ++++++++++++++++++----------------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/botserver/src/core/bot/mod.rs b/botserver/src/core/bot/mod.rs index f129f3dd..00011f57 100644 --- a/botserver/src/core/bot/mod.rs +++ b/botserver/src/core/bot/mod.rs @@ -772,23 +772,37 @@ let system_prompt = if !message.active_switchers.is_empty() { { // Execute start.bas on first message only (use Redis flag to prevent re-execution) let actual_session_id = session.id.to_string(); + let start_bas_flag_key = format!("start_bas_executed:{}", actual_session_id); // Check Redis flag - only execute start.bas once per session + // We use SET NX to atomically check and set the flag let should_execute_start_bas = if let Some(cache) = &self.state.cache { if let Ok(mut conn) = cache.get_multiplexed_async_connection().await { - let key = format!("start_bas_executed:{}", actual_session_id); - let already_executed: Option = redis::cmd("GET") - .arg(&key) + let was_set: Option = redis::cmd("SET") + .arg(&start_bas_flag_key) + .arg("1") + .arg("NX") + .arg("EX") + .arg(86400) // 24h TTL .query_async(&mut conn) .await - .ok() - .flatten(); - already_executed.is_none() + .ok(); + + // If SET NX returned "OK" (Some("OK")), it means it was the first time + // If it returned nil (None), it means it was already set + let is_first_time = was_set.is_some(); + + if !is_first_time { + trace!("start.bas already executed for session {}, skipping in stream_response", actual_session_id); + } + is_first_time } else { - true // No cache connection, execute as fallback + warn!("Failed to get Redis connection for start.bas guard, skipping to avoid redundant execution"); + false } } else { - true // No cache configured, execute as fallback + trace!("No cache configured, skipping start.bas guard in stream_response"); + false }; if should_execute_start_bas { @@ -845,20 +859,7 @@ let system_prompt = if !message.active_switchers.is_empty() { match result { Ok(Ok(())) => { - trace!("start.bas completed successfully for session {}", actual_session_id); - // Set Redis flag so start.bas does not run again for this session - if let Some(cache) = &self.state.cache { - if let Ok(mut conn) = cache.get_multiplexed_async_connection().await { - let key = format!("start_bas_executed:{}", actual_session_id); - let _: Result<(), redis::RedisError> = redis::cmd("SET") - .arg(&key) - .arg("1") - .arg("EX") - .arg(86400) // 24h TTL - .query_async(&mut conn) - .await; - } - } + info!("start.bas completed successfully for session {}", actual_session_id); } Ok(Err(e)) => { error!("start.bas error for session {}: {}", actual_session_id, e);