diff --git a/src/basic/keywords/hearing/mod.rs b/src/basic/keywords/hearing/mod.rs index 076d4850..168b1424 100644 --- a/src/basic/keywords/hearing/mod.rs +++ b/src/basic/keywords/hearing/mod.rs @@ -6,6 +6,6 @@ mod validators; pub use processing::{process_hear_input, process_audio_to_text, process_qrcode, process_video_description}; pub use talk::{execute_talk, talk_keyword}; -pub use syntax::hear_keyword; +pub use syntax::{hear_keyword, deliver_hear_input}; pub use types::{InputType, ValidationResult}; pub use validators::validate_input; diff --git a/src/basic/keywords/hearing/syntax.rs b/src/basic/keywords/hearing/syntax.rs index 5c56f930..43142af2 100644 --- a/src/basic/keywords/hearing/syntax.rs +++ b/src/basic/keywords/hearing/syntax.rs @@ -1,7 +1,7 @@ use crate::core::shared::models::UserSession; use crate::core::shared::state::AppState; use log::trace; -use rhai::{Engine, EvalAltResult}; +use rhai::{Dynamic, Engine, EvalAltResult}; use serde_json::json; use std::sync::Arc; @@ -9,71 +9,91 @@ use super::types::InputType; pub fn hear_keyword(state: Arc, user: UserSession, engine: &mut Engine) { register_hear_basic(Arc::clone(&state), user.clone(), engine); - register_hear_as_type(Arc::clone(&state), user.clone(), engine); - register_hear_as_menu(state, user, engine); } +/// Block the Rhai thread until the user sends input. +/// Registers a sync_channel receiver in AppState::hear_channels keyed by session_id, +/// then blocks. The async message handler calls `deliver_hear_input` to unblock it. +fn hear_block(state: &Arc, session_id: uuid::Uuid, variable_name: &str, wait_data: serde_json::Value) -> Result> { + let (tx, rx) = std::sync::mpsc::sync_channel::(1); + + // Register the sender so the async handler can find it + if let Ok(mut map) = state.hear_channels.lock() { + map.insert(session_id, tx); + } + + // Mark session as waiting and store metadata in Redis (for UI hints like menus) + let state_clone = Arc::clone(state); + let var = variable_name.to_string(); + let _ = tokio::runtime::Handle::current().block_on(async move { + { + let mut sm = state_clone.session_manager.lock().await; + sm.mark_waiting(session_id); + } + if let Some(redis) = &state_clone.cache { + if let Ok(mut conn) = redis.get_multiplexed_async_connection().await { + let key = format!("hear:{session_id}:{var}"); + let _: Result<(), _> = redis::cmd("SET") + .arg(&key) + .arg(wait_data.to_string()) + .arg("EX") + .arg(3600) + .query_async(&mut conn) + .await; + } + } + }); + + trace!("HEAR {variable_name}: blocking thread, waiting for user input"); + + // Block the Rhai thread (runs in spawn_blocking, so this is safe) + match rx.recv() { + Ok(value) => { + trace!("HEAR {variable_name}: received '{value}', resuming script"); + Ok(value.into()) + } + Err(_) => Err(Box::new(EvalAltResult::ErrorRuntime( + "HEAR channel closed".into(), + rhai::Position::NONE, + ))), + } +} + +/// Called by the async message handler when the user sends a reply. +/// Unblocks the waiting Rhai thread. +pub fn deliver_hear_input(state: &AppState, session_id: uuid::Uuid, value: String) -> bool { + if let Ok(mut map) = state.hear_channels.lock() { + if let Some(tx) = map.remove(&session_id) { + return tx.send(value).is_ok(); + } + } + false +} + fn register_hear_basic(state: Arc, user: UserSession, engine: &mut Engine) { let session_id = user.id; let state_clone = Arc::clone(&state); engine - .register_custom_syntax(["HEAR", "$ident$"], true, move |_context, inputs| { + .register_custom_syntax(["HEAR", "$ident$"], true, move |context, inputs| { let variable_name = inputs[0] .get_string_value() .ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime( - "Expected identifier as string".into(), + "Expected identifier".into(), rhai::Position::NONE, )))? .to_lowercase(); - trace!( - "HEAR command waiting for user input to store in variable: {}", - variable_name - ); + let value = hear_block(&state_clone, session_id, &variable_name, json!({ + "variable": variable_name, + "type": "any", + "waiting": true + }))?; - let state_for_spawn = Arc::clone(&state_clone); - let session_id_clone = session_id; - - tokio::spawn(async move { - trace!( - "HEAR: Setting session {} to wait for input for variable '{}'", - session_id_clone, - variable_name - ); - - { - let mut session_manager = state_for_spawn.session_manager.lock().await; - session_manager.mark_waiting(session_id_clone); - } - - if let Some(redis_client) = &state_for_spawn.cache { - if let Ok(conn) = redis_client.get_multiplexed_async_connection().await { - let mut conn = conn; - let key = format!("hear:{session_id_clone}:{variable_name}"); - let wait_data = json!({ - "variable": variable_name, - "type": "any", - "waiting": true, - "retry_count": 0 - }); - let _: Result<(), _> = redis::cmd("SET") - .arg(key) - .arg(wait_data.to_string()) - .arg("EX") - .arg(3600) - .query_async(&mut conn) - .await; - } - } - }); - - Err(Box::new(EvalAltResult::ErrorRuntime( - "Waiting for user input".into(), - rhai::Position::NONE, - ))) + context.scope_mut().set_or_push(&variable_name, value.clone()); + Ok(value) }) .expect("valid syntax registration"); } @@ -86,7 +106,7 @@ fn register_hear_as_type(state: Arc, user: UserSession, engine: &mut E .register_custom_syntax( ["HEAR", "$ident$", "AS", "$ident$"], true, - move |_context, inputs| { + move |context, inputs| { let variable_name = inputs[0] .get_string_value() .ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime( @@ -102,47 +122,15 @@ fn register_hear_as_type(state: Arc, user: UserSession, engine: &mut E )))? .to_string(); - let _input_type = InputType::parse_type(&type_name); + let value = hear_block(&state_clone, session_id, &variable_name, json!({ + "variable": variable_name, + "type": type_name.to_lowercase(), + "waiting": true, + "max_retries": 3 + }))?; - trace!("HEAR {variable_name} AS {type_name} - waiting for validated input"); - - let state_for_spawn = Arc::clone(&state_clone); - let session_id_clone = session_id; - let var_name_clone = variable_name; - let type_clone = type_name; - - tokio::spawn(async move { - { - let mut session_manager = state_for_spawn.session_manager.lock().await; - session_manager.mark_waiting(session_id_clone); - } - - if let Some(redis_client) = &state_for_spawn.cache { - if let Ok(mut conn) = redis_client.get_multiplexed_async_connection().await - { - let key = format!("hear:{session_id_clone}:{var_name_clone}"); - let wait_data = json!({ - "variable": var_name_clone, - "type": type_clone.to_lowercase(), - "waiting": true, - "retry_count": 0, - "max_retries": 3 - }); - let _: Result<(), _> = redis::cmd("SET") - .arg(key) - .arg(wait_data.to_string()) - .arg("EX") - .arg(3600) - .query_async(&mut conn) - .await; - } - } - }); - - Err(Box::new(EvalAltResult::ErrorRuntime( - "Waiting for user input".into(), - rhai::Position::NONE, - ))) + context.scope_mut().set_or_push(&variable_name, value.clone()); + Ok(value) }, ) .expect("valid syntax registration"); @@ -193,48 +181,17 @@ fn register_hear_as_menu(state: Arc, user: UserSession, engine: &mut E ))); } - trace!("HEAR {} AS MENU with options: {:?}", variable_name, options); - - let state_for_spawn = Arc::clone(&state_clone); - let session_id_clone = session_id; - let var_name_clone = variable_name; - let options_clone = options; - - tokio::spawn(async move { - { - let mut session_manager = state_for_spawn.session_manager.lock().await; - session_manager.mark_waiting(session_id_clone); - } - - if let Some(redis_client) = &state_for_spawn.cache { - if let Ok(mut conn) = redis_client.get_multiplexed_async_connection().await - { - let key = format!("hear:{session_id_clone}:{var_name_clone}"); - let wait_data = json!({ - "variable": var_name_clone, - "type": "menu", - "options": options_clone, - "waiting": true, - "retry_count": 0 - }); - let _: Result<(), _> = redis::cmd("SET") - .arg(key) - .arg(wait_data.to_string()) - .arg("EX") - .arg(3600) - .query_async(&mut conn) - .await; - - let suggestions_key = - format!("suggestions:{session_id_clone}:{session_id_clone}"); - for opt in &options_clone { - let suggestion = json!({ - "text": opt, - "value": opt - }); + // Store suggestions in Redis for UI + let state_for_suggestions = Arc::clone(&state_clone); + let opts_clone = options.clone(); + let _ = tokio::runtime::Handle::current().block_on(async move { + if let Some(redis) = &state_for_suggestions.cache { + if let Ok(mut conn) = redis.get_multiplexed_async_connection().await { + let key = format!("suggestions:{session_id}:{session_id}"); + for opt in &opts_clone { let _: Result<(), _> = redis::cmd("RPUSH") - .arg(&suggestions_key) - .arg(suggestion.to_string()) + .arg(&key) + .arg(json!({"text": opt, "value": opt}).to_string()) .query_async(&mut conn) .await; } @@ -242,10 +199,15 @@ fn register_hear_as_menu(state: Arc, user: UserSession, engine: &mut E } }); - Err(Box::new(EvalAltResult::ErrorRuntime( - "Waiting for user input".into(), - rhai::Position::NONE, - ))) + let value = hear_block(&state_clone, session_id, &variable_name, json!({ + "variable": variable_name, + "type": "menu", + "options": options, + "waiting": true + }))?; + + context.scope_mut().set_or_push(&variable_name, value.clone()); + Ok(value) }, ) .expect("valid syntax registration"); diff --git a/src/core/bot/mod.rs b/src/core/bot/mod.rs index 3f9f6cfa..4527a61a 100644 --- a/src/core/bot/mod.rs +++ b/src/core/bot/mod.rs @@ -448,6 +448,17 @@ impl BotOrchestrator { let session_id = Uuid::parse_str(&message.session_id)?; let message_content = message.content.clone(); + // If a HEAR is blocking the script thread for this session, deliver the input + // directly and return — the script continues from where it paused. + if crate::basic::keywords::hearing::syntax::deliver_hear_input( + &self.state, + session_id, + message_content.clone(), + ) { + trace!("HEAR: delivered input to blocking script for session {session_id}"); + return Ok(()); + } + let (session, context_data, history, model, key, system_prompt, bot_llm_url, explicit_llm_provider) = { let state_clone = self.state.clone(); tokio::task::spawn_blocking( diff --git a/src/core/shared/state.rs b/src/core/shared/state.rs index 1c69f131..a6e6534c 100644 --- a/src/core/shared/state.rs +++ b/src/core/shared/state.rs @@ -376,6 +376,8 @@ pub struct AppState { pub auth_service: Arc>, pub channels: Arc>>>, pub response_channels: Arc>>>, + /// Blocking channels for HEAR: session_id → sender. Rhai thread blocks on receiver. + pub hear_channels: Arc>>>, pub web_adapter: Arc, pub voice_adapter: Arc, #[cfg(any(feature = "research", feature = "llm"))] @@ -636,6 +638,7 @@ impl Default for AppState { auth_service: Arc::new(tokio::sync::Mutex::new(create_mock_auth_service())), channels: Arc::new(tokio::sync::Mutex::new(HashMap::new())), response_channels: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + hear_channels: Arc::new(std::sync::Mutex::new(HashMap::new())), web_adapter: Arc::new(WebChannelAdapter::new()), voice_adapter: Arc::new(VoiceAdapter::new()), #[cfg(any(feature = "research", feature = "llm"))] diff --git a/src/core/shared/test_utils.rs b/src/core/shared/test_utils.rs index f24445ea..a2bde527 100644 --- a/src/core/shared/test_utils.rs +++ b/src/core/shared/test_utils.rs @@ -216,6 +216,7 @@ impl TestAppStateBuilder { auth_service: Arc::new(tokio::sync::Mutex::new(create_mock_auth_service())), channels: Arc::new(tokio::sync::Mutex::new(HashMap::new())), response_channels: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + hear_channels: Arc::new(std::sync::Mutex::new(HashMap::new())), web_adapter: Arc::new(WebChannelAdapter::new()), voice_adapter: Arc::new(VoiceAdapter::new()), #[cfg(any(feature = "research", feature = "llm"))] diff --git a/src/main_module/bootstrap.rs b/src/main_module/bootstrap.rs index 65669742..826ba7e1 100644 --- a/src/main_module/bootstrap.rs +++ b/src/main_module/bootstrap.rs @@ -576,6 +576,7 @@ pub async fn create_app_state( map })), response_channels: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + hear_channels: Arc::new(std::sync::Mutex::new(HashMap::new())), web_adapter: web_adapter.clone(), voice_adapter: voice_adapter.clone(), #[cfg(any(feature = "research", feature = "llm"))]