From d2314ce44be89bc1d3ae45c6c653f450096dd8ef Mon Sep 17 00:00:00 2001 From: Rodrigo Rodriguez Date: Thu, 30 Apr 2026 17:08:38 -0300 Subject: [PATCH] fix: unified atomic redis guard for start.bas in both websocket and stream flows --- botserver/src/core/bot/mod.rs | 60 +++++++++++++++++++++++++---------- 1 file changed, 44 insertions(+), 16 deletions(-) diff --git a/botserver/src/core/bot/mod.rs b/botserver/src/core/bot/mod.rs index 0697095d..86b5b4d6 100644 --- a/botserver/src/core/bot/mod.rs +++ b/botserver/src/core/bot/mod.rs @@ -772,7 +772,8 @@ let system_prompt = if !message.active_switchers.is_empty() { { // Execute start.bas on first message only (use Redis flag to prevent re-execution) let actual_session_id = session.id.to_string(); - let start_bas_flag_key = format!("start_bas_executed:{}", actual_session_id); + let bot_id_str = session.bot_id.to_string(); + let start_bas_flag_key = format!("start_bas_executed:{}:{}", bot_id_str, actual_session_id); // Check Redis flag - only execute start.bas once per session // We use SET NX to atomically check and set the flag @@ -797,12 +798,16 @@ let system_prompt = if !message.active_switchers.is_empty() { } is_first_time } else { + // Fallback: if Redis is down, we check if it's already initialized in memory if possible, + // but here we default to false to be safe and avoid double execution in most cases warn!("Failed to get Redis connection for start.bas guard, skipping to avoid redundant execution"); false } } else { - trace!("No cache configured, skipping start.bas guard in stream_response"); - false + // If no cache, we can't easily track globally, so we run it (legacy behavior) + // or we could track in-memory. For now, let's allow it to run once per instance. + trace!("No cache configured, allowing start.bas execution in stream_response"); + true }; if should_execute_start_bas { @@ -1814,6 +1819,34 @@ async fn handle_websocket( }; if let Ok(Some(mut session)) = session_result { + // Unified guard for start.bas execution (shared with stream_response) + let actual_session_id = session.id.to_string(); + let start_bas_flag_key = format!("start_bas_executed:{}:{}", bot_id_str, actual_session_id); + + let should_execute = if let Some(cache) = &state_for_start.cache { + if let Ok(mut conn) = cache.get_multiplexed_async_connection().await { + let was_set: Option = redis::cmd("SET") + .arg(&start_bas_flag_key) + .arg("1") + .arg("NX") + .arg("EX") + .arg(86400) + .query_async(&mut conn) + .await + .ok(); + was_set.is_some() + } else { + false // Safe fallback + } + } else { + true // No cache, allow execution + }; + + if !should_execute { + trace!("start.bas already executed for session {}, skipping in handle_websocket", actual_session_id); + return; + } + info!("start.bas: Found session {} for websocket session {}", session.id, session_id); // Save session ID before session is moved into closure @@ -1848,19 +1881,14 @@ async fn handle_websocket( match result { Ok(Ok(())) => { info!("start.bas executed successfully for bot {}", bot_name); - // Set Redis flag so start.bas does not run again in stream_response - if let Some(cache) = &state_for_redis.cache { - if let Ok(mut conn) = cache.get_multiplexed_async_connection().await { - let key = format!("start_bas_executed:{}", session_id_for_redis); - let _: Result<(), redis::RedisError> = redis::cmd("SET") - .arg(&key) - .arg("1") - .arg("EX") - .arg(86400) // 24h TTL - .query_async(&mut conn) - .await; - } - } + } + Ok(Err(e)) => { + error!("start.bas error for bot {}: {}", bot_name, e); + } + Err(e) => { + error!("start.bas task error for bot {}: {}", bot_name, e); + } + } // Fetch suggestions and switchers from Redis and send to frontend // Use session_id_for_redis (DB session) not session_id_str (WebSocket session) for Redis key consistency