fix: unified atomic redis guard for start.bas in both websocket and stream flows
All checks were successful
BotServer CI / build (push) Successful in 47s

This commit is contained in:
Rodrigo Rodriguez 2026-04-30 17:08:38 -03:00
parent e9b1a3078c
commit d2314ce44b

View file

@ -772,7 +772,8 @@ let system_prompt = if !message.active_switchers.is_empty() {
{
// Execute start.bas on first message only (use Redis flag to prevent re-execution)
let actual_session_id = session.id.to_string();
let start_bas_flag_key = format!("start_bas_executed:{}", actual_session_id);
let bot_id_str = session.bot_id.to_string();
let start_bas_flag_key = format!("start_bas_executed:{}:{}", bot_id_str, actual_session_id);
// Check Redis flag - only execute start.bas once per session
// We use SET NX to atomically check and set the flag
@ -797,12 +798,16 @@ let system_prompt = if !message.active_switchers.is_empty() {
}
is_first_time
} else {
// Fallback: if Redis is down, we check if it's already initialized in memory if possible,
// but here we default to false to be safe and avoid double execution in most cases
warn!("Failed to get Redis connection for start.bas guard, skipping to avoid redundant execution");
false
}
} else {
trace!("No cache configured, skipping start.bas guard in stream_response");
false
// If no cache, we can't easily track globally, so we run it (legacy behavior)
// or we could track in-memory. For now, let's allow it to run once per instance.
trace!("No cache configured, allowing start.bas execution in stream_response");
true
};
if should_execute_start_bas {
@ -1814,6 +1819,34 @@ async fn handle_websocket(
};
if let Ok(Some(mut session)) = session_result {
// Unified guard for start.bas execution (shared with stream_response)
let actual_session_id = session.id.to_string();
let start_bas_flag_key = format!("start_bas_executed:{}:{}", bot_id_str, actual_session_id);
let should_execute = if let Some(cache) = &state_for_start.cache {
if let Ok(mut conn) = cache.get_multiplexed_async_connection().await {
let was_set: Option<String> = redis::cmd("SET")
.arg(&start_bas_flag_key)
.arg("1")
.arg("NX")
.arg("EX")
.arg(86400)
.query_async(&mut conn)
.await
.ok();
was_set.is_some()
} else {
false // Safe fallback
}
} else {
true // No cache, allow execution
};
if !should_execute {
trace!("start.bas already executed for session {}, skipping in handle_websocket", actual_session_id);
return;
}
info!("start.bas: Found session {} for websocket session {}", session.id, session_id);
// Save session ID before session is moved into closure
@ -1848,19 +1881,14 @@ async fn handle_websocket(
match result {
Ok(Ok(())) => {
info!("start.bas executed successfully for bot {}", bot_name);
// Set Redis flag so start.bas does not run again in stream_response
if let Some(cache) = &state_for_redis.cache {
if let Ok(mut conn) = cache.get_multiplexed_async_connection().await {
let key = format!("start_bas_executed:{}", session_id_for_redis);
let _: Result<(), redis::RedisError> = redis::cmd("SET")
.arg(&key)
.arg("1")
.arg("EX")
.arg(86400) // 24h TTL
.query_async(&mut conn)
.await;
}
}
}
Ok(Err(e)) => {
error!("start.bas error for bot {}: {}", bot_name, e);
}
Err(e) => {
error!("start.bas task error for bot {}: {}", bot_name, e);
}
}
// Fetch suggestions and switchers from Redis and send to frontend
// Use session_id_for_redis (DB session) not session_id_str (WebSocket session) for Redis key consistency