fix: start.bas redundant execution with atomic SET NX guard
All checks were successful
BotServer CI / build (push) Successful in 42s
All checks were successful
BotServer CI / build (push) Successful in 42s
This commit is contained in:
parent
5c54f7bc72
commit
debe29a6dc
1 changed files with 23 additions and 22 deletions
|
|
@ -772,23 +772,37 @@ let system_prompt = if !message.active_switchers.is_empty() {
|
||||||
{
|
{
|
||||||
// Execute start.bas on first message only (use Redis flag to prevent re-execution)
|
// Execute start.bas on first message only (use Redis flag to prevent re-execution)
|
||||||
let actual_session_id = session.id.to_string();
|
let actual_session_id = session.id.to_string();
|
||||||
|
let start_bas_flag_key = format!("start_bas_executed:{}", actual_session_id);
|
||||||
|
|
||||||
// Check Redis flag - only execute start.bas once per session
|
// Check Redis flag - only execute start.bas once per session
|
||||||
|
// We use SET NX to atomically check and set the flag
|
||||||
let should_execute_start_bas = if let Some(cache) = &self.state.cache {
|
let should_execute_start_bas = if let Some(cache) = &self.state.cache {
|
||||||
if let Ok(mut conn) = cache.get_multiplexed_async_connection().await {
|
if let Ok(mut conn) = cache.get_multiplexed_async_connection().await {
|
||||||
let key = format!("start_bas_executed:{}", actual_session_id);
|
let was_set: Option<String> = redis::cmd("SET")
|
||||||
let already_executed: Option<String> = redis::cmd("GET")
|
.arg(&start_bas_flag_key)
|
||||||
.arg(&key)
|
.arg("1")
|
||||||
|
.arg("NX")
|
||||||
|
.arg("EX")
|
||||||
|
.arg(86400) // 24h TTL
|
||||||
.query_async(&mut conn)
|
.query_async(&mut conn)
|
||||||
.await
|
.await
|
||||||
.ok()
|
.ok();
|
||||||
.flatten();
|
|
||||||
already_executed.is_none()
|
// If SET NX returned "OK" (Some("OK")), it means it was the first time
|
||||||
|
// If it returned nil (None), it means it was already set
|
||||||
|
let is_first_time = was_set.is_some();
|
||||||
|
|
||||||
|
if !is_first_time {
|
||||||
|
trace!("start.bas already executed for session {}, skipping in stream_response", actual_session_id);
|
||||||
|
}
|
||||||
|
is_first_time
|
||||||
} else {
|
} else {
|
||||||
true // No cache connection, execute as fallback
|
warn!("Failed to get Redis connection for start.bas guard, skipping to avoid redundant execution");
|
||||||
|
false
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
true // No cache configured, execute as fallback
|
trace!("No cache configured, skipping start.bas guard in stream_response");
|
||||||
|
false
|
||||||
};
|
};
|
||||||
|
|
||||||
if should_execute_start_bas {
|
if should_execute_start_bas {
|
||||||
|
|
@ -845,20 +859,7 @@ let system_prompt = if !message.active_switchers.is_empty() {
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
Ok(Ok(())) => {
|
Ok(Ok(())) => {
|
||||||
trace!("start.bas completed successfully for session {}", actual_session_id);
|
info!("start.bas completed successfully for session {}", actual_session_id);
|
||||||
// Set Redis flag so start.bas does not run again for this session
|
|
||||||
if let Some(cache) = &self.state.cache {
|
|
||||||
if let Ok(mut conn) = cache.get_multiplexed_async_connection().await {
|
|
||||||
let key = format!("start_bas_executed:{}", actual_session_id);
|
|
||||||
let _: Result<(), redis::RedisError> = redis::cmd("SET")
|
|
||||||
.arg(&key)
|
|
||||||
.arg("1")
|
|
||||||
.arg("EX")
|
|
||||||
.arg(86400) // 24h TTL
|
|
||||||
.query_async(&mut conn)
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
error!("start.bas error for session {}: {}", actual_session_id, e);
|
error!("start.bas error for session {}: {}", actual_session_id, e);
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue