Fix HEAR: block thread on channel instead of throw+re-run from top
Some checks failed
BotServer CI / build (push) Failing after 6m54s

- Add hear_channels: HashMap<Uuid, SyncSender<String>> to AppState
- HEAR now blocks the spawn_blocking thread via sync_channel recv()
- deliver_hear_input() called at top of stream_response() to unblock
- Script continues from exact HEAR position, no side-effect re-execution
- All three HEAR variants (basic, AS TYPE, AS MENU) use same mechanism
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-03-16 23:00:17 -03:00
parent 35b793d29c
commit dae78479d2
6 changed files with 114 additions and 136 deletions

View file

@ -6,6 +6,6 @@ mod validators;
pub use processing::{process_hear_input, process_audio_to_text, process_qrcode, process_video_description}; pub use processing::{process_hear_input, process_audio_to_text, process_qrcode, process_video_description};
pub use talk::{execute_talk, talk_keyword}; pub use talk::{execute_talk, talk_keyword};
pub use syntax::hear_keyword; pub use syntax::{hear_keyword, deliver_hear_input};
pub use types::{InputType, ValidationResult}; pub use types::{InputType, ValidationResult};
pub use validators::validate_input; pub use validators::validate_input;

View file

@ -1,7 +1,7 @@
use crate::core::shared::models::UserSession; use crate::core::shared::models::UserSession;
use crate::core::shared::state::AppState; use crate::core::shared::state::AppState;
use log::trace; use log::trace;
use rhai::{Engine, EvalAltResult}; use rhai::{Dynamic, Engine, EvalAltResult};
use serde_json::json; use serde_json::json;
use std::sync::Arc; use std::sync::Arc;
@ -9,71 +9,91 @@ use super::types::InputType;
pub fn hear_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) { pub fn hear_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
register_hear_basic(Arc::clone(&state), user.clone(), engine); register_hear_basic(Arc::clone(&state), user.clone(), engine);
register_hear_as_type(Arc::clone(&state), user.clone(), engine); register_hear_as_type(Arc::clone(&state), user.clone(), engine);
register_hear_as_menu(state, user, engine); register_hear_as_menu(state, user, engine);
} }
/// Block the Rhai thread until the user sends input.
/// Registers a sync_channel receiver in AppState::hear_channels keyed by session_id,
/// then blocks. The async message handler calls `deliver_hear_input` to unblock it.
fn hear_block(state: &Arc<AppState>, session_id: uuid::Uuid, variable_name: &str, wait_data: serde_json::Value) -> Result<Dynamic, Box<EvalAltResult>> {
let (tx, rx) = std::sync::mpsc::sync_channel::<String>(1);
// Register the sender so the async handler can find it
if let Ok(mut map) = state.hear_channels.lock() {
map.insert(session_id, tx);
}
// Mark session as waiting and store metadata in Redis (for UI hints like menus)
let state_clone = Arc::clone(state);
let var = variable_name.to_string();
let _ = tokio::runtime::Handle::current().block_on(async move {
{
let mut sm = state_clone.session_manager.lock().await;
sm.mark_waiting(session_id);
}
if let Some(redis) = &state_clone.cache {
if let Ok(mut conn) = redis.get_multiplexed_async_connection().await {
let key = format!("hear:{session_id}:{var}");
let _: Result<(), _> = redis::cmd("SET")
.arg(&key)
.arg(wait_data.to_string())
.arg("EX")
.arg(3600)
.query_async(&mut conn)
.await;
}
}
});
trace!("HEAR {variable_name}: blocking thread, waiting for user input");
// Block the Rhai thread (runs in spawn_blocking, so this is safe)
match rx.recv() {
Ok(value) => {
trace!("HEAR {variable_name}: received '{value}', resuming script");
Ok(value.into())
}
Err(_) => Err(Box::new(EvalAltResult::ErrorRuntime(
"HEAR channel closed".into(),
rhai::Position::NONE,
))),
}
}
/// Called by the async message handler when the user sends a reply.
/// Unblocks the waiting Rhai thread.
pub fn deliver_hear_input(state: &AppState, session_id: uuid::Uuid, value: String) -> bool {
if let Ok(mut map) = state.hear_channels.lock() {
if let Some(tx) = map.remove(&session_id) {
return tx.send(value).is_ok();
}
}
false
}
fn register_hear_basic(state: Arc<AppState>, user: UserSession, engine: &mut Engine) { fn register_hear_basic(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
let session_id = user.id; let session_id = user.id;
let state_clone = Arc::clone(&state); let state_clone = Arc::clone(&state);
engine engine
.register_custom_syntax(["HEAR", "$ident$"], true, move |_context, inputs| { .register_custom_syntax(["HEAR", "$ident$"], true, move |context, inputs| {
let variable_name = inputs[0] let variable_name = inputs[0]
.get_string_value() .get_string_value()
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime( .ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime(
"Expected identifier as string".into(), "Expected identifier".into(),
rhai::Position::NONE, rhai::Position::NONE,
)))? )))?
.to_lowercase(); .to_lowercase();
trace!( let value = hear_block(&state_clone, session_id, &variable_name, json!({
"HEAR command waiting for user input to store in variable: {}", "variable": variable_name,
variable_name "type": "any",
); "waiting": true
}))?;
let state_for_spawn = Arc::clone(&state_clone); context.scope_mut().set_or_push(&variable_name, value.clone());
let session_id_clone = session_id; Ok(value)
tokio::spawn(async move {
trace!(
"HEAR: Setting session {} to wait for input for variable '{}'",
session_id_clone,
variable_name
);
{
let mut session_manager = state_for_spawn.session_manager.lock().await;
session_manager.mark_waiting(session_id_clone);
}
if let Some(redis_client) = &state_for_spawn.cache {
if let Ok(conn) = redis_client.get_multiplexed_async_connection().await {
let mut conn = conn;
let key = format!("hear:{session_id_clone}:{variable_name}");
let wait_data = json!({
"variable": variable_name,
"type": "any",
"waiting": true,
"retry_count": 0
});
let _: Result<(), _> = redis::cmd("SET")
.arg(key)
.arg(wait_data.to_string())
.arg("EX")
.arg(3600)
.query_async(&mut conn)
.await;
}
}
});
Err(Box::new(EvalAltResult::ErrorRuntime(
"Waiting for user input".into(),
rhai::Position::NONE,
)))
}) })
.expect("valid syntax registration"); .expect("valid syntax registration");
} }
@ -86,7 +106,7 @@ fn register_hear_as_type(state: Arc<AppState>, user: UserSession, engine: &mut E
.register_custom_syntax( .register_custom_syntax(
["HEAR", "$ident$", "AS", "$ident$"], ["HEAR", "$ident$", "AS", "$ident$"],
true, true,
move |_context, inputs| { move |context, inputs| {
let variable_name = inputs[0] let variable_name = inputs[0]
.get_string_value() .get_string_value()
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime( .ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime(
@ -102,47 +122,15 @@ fn register_hear_as_type(state: Arc<AppState>, user: UserSession, engine: &mut E
)))? )))?
.to_string(); .to_string();
let _input_type = InputType::parse_type(&type_name); let value = hear_block(&state_clone, session_id, &variable_name, json!({
"variable": variable_name,
"type": type_name.to_lowercase(),
"waiting": true,
"max_retries": 3
}))?;
trace!("HEAR {variable_name} AS {type_name} - waiting for validated input"); context.scope_mut().set_or_push(&variable_name, value.clone());
Ok(value)
let state_for_spawn = Arc::clone(&state_clone);
let session_id_clone = session_id;
let var_name_clone = variable_name;
let type_clone = type_name;
tokio::spawn(async move {
{
let mut session_manager = state_for_spawn.session_manager.lock().await;
session_manager.mark_waiting(session_id_clone);
}
if let Some(redis_client) = &state_for_spawn.cache {
if let Ok(mut conn) = redis_client.get_multiplexed_async_connection().await
{
let key = format!("hear:{session_id_clone}:{var_name_clone}");
let wait_data = json!({
"variable": var_name_clone,
"type": type_clone.to_lowercase(),
"waiting": true,
"retry_count": 0,
"max_retries": 3
});
let _: Result<(), _> = redis::cmd("SET")
.arg(key)
.arg(wait_data.to_string())
.arg("EX")
.arg(3600)
.query_async(&mut conn)
.await;
}
}
});
Err(Box::new(EvalAltResult::ErrorRuntime(
"Waiting for user input".into(),
rhai::Position::NONE,
)))
}, },
) )
.expect("valid syntax registration"); .expect("valid syntax registration");
@ -193,48 +181,17 @@ fn register_hear_as_menu(state: Arc<AppState>, user: UserSession, engine: &mut E
))); )));
} }
trace!("HEAR {} AS MENU with options: {:?}", variable_name, options); // Store suggestions in Redis for UI
let state_for_suggestions = Arc::clone(&state_clone);
let state_for_spawn = Arc::clone(&state_clone); let opts_clone = options.clone();
let session_id_clone = session_id; let _ = tokio::runtime::Handle::current().block_on(async move {
let var_name_clone = variable_name; if let Some(redis) = &state_for_suggestions.cache {
let options_clone = options; if let Ok(mut conn) = redis.get_multiplexed_async_connection().await {
let key = format!("suggestions:{session_id}:{session_id}");
tokio::spawn(async move { for opt in &opts_clone {
{
let mut session_manager = state_for_spawn.session_manager.lock().await;
session_manager.mark_waiting(session_id_clone);
}
if let Some(redis_client) = &state_for_spawn.cache {
if let Ok(mut conn) = redis_client.get_multiplexed_async_connection().await
{
let key = format!("hear:{session_id_clone}:{var_name_clone}");
let wait_data = json!({
"variable": var_name_clone,
"type": "menu",
"options": options_clone,
"waiting": true,
"retry_count": 0
});
let _: Result<(), _> = redis::cmd("SET")
.arg(key)
.arg(wait_data.to_string())
.arg("EX")
.arg(3600)
.query_async(&mut conn)
.await;
let suggestions_key =
format!("suggestions:{session_id_clone}:{session_id_clone}");
for opt in &options_clone {
let suggestion = json!({
"text": opt,
"value": opt
});
let _: Result<(), _> = redis::cmd("RPUSH") let _: Result<(), _> = redis::cmd("RPUSH")
.arg(&suggestions_key) .arg(&key)
.arg(suggestion.to_string()) .arg(json!({"text": opt, "value": opt}).to_string())
.query_async(&mut conn) .query_async(&mut conn)
.await; .await;
} }
@ -242,10 +199,15 @@ fn register_hear_as_menu(state: Arc<AppState>, user: UserSession, engine: &mut E
} }
}); });
Err(Box::new(EvalAltResult::ErrorRuntime( let value = hear_block(&state_clone, session_id, &variable_name, json!({
"Waiting for user input".into(), "variable": variable_name,
rhai::Position::NONE, "type": "menu",
))) "options": options,
"waiting": true
}))?;
context.scope_mut().set_or_push(&variable_name, value.clone());
Ok(value)
}, },
) )
.expect("valid syntax registration"); .expect("valid syntax registration");

View file

@ -448,6 +448,17 @@ impl BotOrchestrator {
let session_id = Uuid::parse_str(&message.session_id)?; let session_id = Uuid::parse_str(&message.session_id)?;
let message_content = message.content.clone(); let message_content = message.content.clone();
// If a HEAR is blocking the script thread for this session, deliver the input
// directly and return — the script continues from where it paused.
if crate::basic::keywords::hearing::syntax::deliver_hear_input(
&self.state,
session_id,
message_content.clone(),
) {
trace!("HEAR: delivered input to blocking script for session {session_id}");
return Ok(());
}
let (session, context_data, history, model, key, system_prompt, bot_llm_url, explicit_llm_provider) = { let (session, context_data, history, model, key, system_prompt, bot_llm_url, explicit_llm_provider) = {
let state_clone = self.state.clone(); let state_clone = self.state.clone();
tokio::task::spawn_blocking( tokio::task::spawn_blocking(

View file

@ -376,6 +376,8 @@ pub struct AppState {
pub auth_service: Arc<tokio::sync::Mutex<AuthService>>, pub auth_service: Arc<tokio::sync::Mutex<AuthService>>,
pub channels: Arc<tokio::sync::Mutex<HashMap<String, Arc<dyn ChannelAdapter>>>>, pub channels: Arc<tokio::sync::Mutex<HashMap<String, Arc<dyn ChannelAdapter>>>>,
pub response_channels: Arc<tokio::sync::Mutex<HashMap<String, mpsc::Sender<BotResponse>>>>, pub response_channels: Arc<tokio::sync::Mutex<HashMap<String, mpsc::Sender<BotResponse>>>>,
/// Blocking channels for HEAR: session_id → sender. Rhai thread blocks on receiver.
pub hear_channels: Arc<std::sync::Mutex<HashMap<uuid::Uuid, std::sync::mpsc::SyncSender<String>>>>,
pub web_adapter: Arc<WebChannelAdapter>, pub web_adapter: Arc<WebChannelAdapter>,
pub voice_adapter: Arc<VoiceAdapter>, pub voice_adapter: Arc<VoiceAdapter>,
#[cfg(any(feature = "research", feature = "llm"))] #[cfg(any(feature = "research", feature = "llm"))]
@ -636,6 +638,7 @@ impl Default for AppState {
auth_service: Arc::new(tokio::sync::Mutex::new(create_mock_auth_service())), auth_service: Arc::new(tokio::sync::Mutex::new(create_mock_auth_service())),
channels: Arc::new(tokio::sync::Mutex::new(HashMap::new())), channels: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
response_channels: Arc::new(tokio::sync::Mutex::new(HashMap::new())), response_channels: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
hear_channels: Arc::new(std::sync::Mutex::new(HashMap::new())),
web_adapter: Arc::new(WebChannelAdapter::new()), web_adapter: Arc::new(WebChannelAdapter::new()),
voice_adapter: Arc::new(VoiceAdapter::new()), voice_adapter: Arc::new(VoiceAdapter::new()),
#[cfg(any(feature = "research", feature = "llm"))] #[cfg(any(feature = "research", feature = "llm"))]

View file

@ -216,6 +216,7 @@ impl TestAppStateBuilder {
auth_service: Arc::new(tokio::sync::Mutex::new(create_mock_auth_service())), auth_service: Arc::new(tokio::sync::Mutex::new(create_mock_auth_service())),
channels: Arc::new(tokio::sync::Mutex::new(HashMap::new())), channels: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
response_channels: Arc::new(tokio::sync::Mutex::new(HashMap::new())), response_channels: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
hear_channels: Arc::new(std::sync::Mutex::new(HashMap::new())),
web_adapter: Arc::new(WebChannelAdapter::new()), web_adapter: Arc::new(WebChannelAdapter::new()),
voice_adapter: Arc::new(VoiceAdapter::new()), voice_adapter: Arc::new(VoiceAdapter::new()),
#[cfg(any(feature = "research", feature = "llm"))] #[cfg(any(feature = "research", feature = "llm"))]

View file

@ -576,6 +576,7 @@ pub async fn create_app_state(
map map
})), })),
response_channels: Arc::new(tokio::sync::Mutex::new(HashMap::new())), response_channels: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
hear_channels: Arc::new(std::sync::Mutex::new(HashMap::new())),
web_adapter: web_adapter.clone(), web_adapter: web_adapter.clone(),
voice_adapter: voice_adapter.clone(), voice_adapter: voice_adapter.clone(),
#[cfg(any(feature = "research", feature = "llm"))] #[cfg(any(feature = "research", feature = "llm"))]