From 888bfc859daa298201341665a3446c13d708d1d0 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Sun, 2 Nov 2025 10:45:57 -0300 Subject: [PATCH] feat: refactor Redis operations to synchronous in add_suggestion Changed async Redis operations to synchronous in add_suggestion_keyword function. Removed unnecessary async/await and tokio::spawn since the operations are now blocking. This simplifies the code while maintaining the same functionality of storing suggestions and context state in Redis. Error handling remains robust with proper early returns. --- src/basic/keywords/add_suggestion.rs | 75 +++++++-------- src/basic/keywords/hear_talk.rs | 139 ++++++++++++++++----------- src/bot/mod.rs | 103 ++++++++++++++++---- src/session/mod.rs | 19 +++- src/shared/models.rs | 9 +- src/whatsapp/mod.rs | 21 ++-- web/html/index.html | 46 +++++++-- 7 files changed, 273 insertions(+), 139 deletions(-) diff --git a/src/basic/keywords/add_suggestion.rs b/src/basic/keywords/add_suggestion.rs index 632a4bb6..aeb5ca86 100644 --- a/src/basic/keywords/add_suggestion.rs +++ b/src/basic/keywords/add_suggestion.rs @@ -16,49 +16,44 @@ pub fn add_suggestion_keyword(state: Arc, user_session: UserSession, e info!("ADD_SUGGESTION command executed: context='{}', text='{}'", context_name, button_text); if let Some(cache_client) = &cache { - let cache_client = cache_client.clone(); let redis_key = format!("suggestions:{}:{}", user_session.user_id, user_session.id); let suggestion = json!({ "context": context_name, "text": button_text }); - tokio::spawn(async move { - let mut conn = match cache_client.get_multiplexed_async_connection().await { - Ok(conn) => conn, - Err(e) => { - error!("Failed to connect to cache: {}", e); - return; - } - }; - - // Append suggestion to Redis list - RPUSH returns the new length as i64 - let result: Result = redis::cmd("RPUSH") - .arg(&redis_key) - .arg(suggestion.to_string()) - .query_async(&mut conn) - .await; - - match result { - Ok(length) => { - debug!("Suggestion added successfully to Redis key {}, new length: {}", redis_key, length); - - // Also register context as inactive initially - let active_key = format!("active_context:{}:{}", user_session.user_id, user_session.id); - let hset_result: Result = redis::cmd("HSET") - .arg(&active_key) - .arg(&context_name) - .arg("inactive") - .query_async(&mut conn) - .await; - - match hset_result { - Ok(fields_added) => { - debug!("Context state set to inactive for {}, fields added: {}", context_name, fields_added) - }, - Err(e) => error!("Failed to set context state: {}", e), - } - } - Err(e) => error!("Failed to add suggestion to Redis: {}", e), + let mut conn = match cache_client.get_connection() { + Ok(conn) => conn, + Err(e) => { + error!("Failed to connect to cache: {}", e); + return Ok(Dynamic::UNIT); } - }); + }; + + // Append suggestion to Redis list - RPUSH returns the new length as i64 + let result: Result = redis::cmd("RPUSH") + .arg(&redis_key) + .arg(suggestion.to_string()) + .query(&mut conn); + + match result { + Ok(length) => { + debug!("Suggestion added successfully to Redis key {}, new length: {}", redis_key, length); + + // Also register context as inactive initially + let active_key = format!("active_context:{}:{}", user_session.user_id, user_session.id); + let hset_result: Result = redis::cmd("HSET") + .arg(&active_key) + .arg(&context_name) + .arg("inactive") + .query(&mut conn); + + match hset_result { + Ok(fields_added) => { + debug!("Context state set to inactive for {}, fields added: {}", context_name, fields_added) + }, + Err(e) => error!("Failed to set context state: {}", e), + } + } + Err(e) => error!("Failed to add suggestion to Redis: {}", e), + } } else { debug!("No Redis client configured; suggestion will not persist"); } @@ -66,4 +61,4 @@ pub fn add_suggestion_keyword(state: Arc, user_session: UserSession, e Ok(Dynamic::UNIT) }) .unwrap(); -} \ No newline at end of file +} diff --git a/src/basic/keywords/hear_talk.rs b/src/basic/keywords/hear_talk.rs index 53c045bf..258b949a 100644 --- a/src/basic/keywords/hear_talk.rs +++ b/src/basic/keywords/hear_talk.rs @@ -59,74 +59,99 @@ pub fn hear_keyword(state: Arc, user: UserSession, engine: &mut Engine .unwrap(); } +pub async fn execute_talk(state: Arc, user: UserSession, message: String) -> Result> { + info!("Executing TALK with message: {}", message); + debug!("TALK: Sending message: {}", message); + + let mut suggestions = Vec::new(); + + if let Some(redis_client) = &state.cache { + let mut conn: redis::aio::MultiplexedConnection = redis_client.get_multiplexed_async_connection().await?; + + let redis_key = format!("suggestions:{}:{}", user.user_id, user.id); + debug!("Loading suggestions from Redis key: {}", redis_key); + let suggestions_json: Result, _> = redis::cmd("LRANGE") + .arg(redis_key.as_str()) + .arg(0) + .arg(-1) + .query_async(&mut conn) + .await; + + match suggestions_json { + Ok(suggestions_json) => { + debug!("Found suggestions in Redis: {:?}", suggestions_json); + suggestions = suggestions_json.into_iter() + .filter_map(|s| serde_json::from_str(&s).ok()) + .collect(); + debug!("Parsed suggestions: {:?}", suggestions); + } + Err(e) => { + error!("Failed to load suggestions from Redis: {}", e); + } + } + } + + let response = BotResponse { + bot_id: "default_bot".to_string(), + user_id: "default_user".to_string(), + session_id: user.id.to_string(), + channel: "web".to_string(), + content: format!("I heard: {}", message), + message_type: 1, + stream_token: None, + is_complete: true, + suggestions, + context_name: None, + }; + + let user_id = user.id.to_string(); + let response_clone = response.clone(); + + match state.response_channels.try_lock() { + Ok(response_channels) => { + if let Some(tx) = response_channels.get(&user_id) { + if let Err(e) = tx.try_send(response_clone) { + error!("Failed to send TALK message via WebSocket: {}", e); + } else { + debug!("TALK message sent successfully via WebSocket"); + } + } else { + debug!("No WebSocket connection found for session {}, sending via web adapter", user_id); + let web_adapter = Arc::clone(&state.web_adapter); + tokio::spawn(async move { + if let Err(e) = web_adapter.send_message_to_session(&user_id, response_clone).await { + error!("Failed to send TALK message via web adapter: {}", e); + } else { + debug!("TALK message sent successfully via web adapter"); + } + }); + } + } + Err(_) => { + error!("Failed to acquire lock on response_channels for TALK command"); + } + } + + Ok(response) +} + pub fn talk_keyword(state: Arc, user: UserSession, engine: &mut Engine) { let state_clone = Arc::clone(&state); let user_clone = user.clone(); engine .register_custom_syntax(&["TALK", "$expr$"], true, move |context, inputs| { - // Evaluate the expression that produces the message text. let message = context.eval_expression_tree(&inputs[0])?.to_string(); - info!("TALK command executed: {}", message); - debug!("TALK: Sending message: {}", message); + let state_for_talk = Arc::clone(&state_clone); + let user_for_talk = user_clone.clone(); - // Build the bot response that will be sent back to the client. - let bot_id = "default_bot".to_string(); - let response = BotResponse { - bot_id, - user_id: user_clone.user_id.to_string(), - session_id: user_clone.id.to_string(), - channel: "web".to_string(), - content: message, - message_type: 1, - stream_token: None, - is_complete: true, - suggestions: Vec::new(), - }; - - let user_id = user_clone.id.to_string(); - - // Try to acquire the lock on the response_channels map. The map is protected - // by an async `tokio::sync::Mutex`, so we use `try_lock` to avoid awaiting - // inside this non‑async closure. - match state_clone.response_channels.try_lock() { - Ok(response_channels) => { - if let Some(tx) = response_channels.get(&user_id) { - // Use `try_send` to avoid blocking the runtime. - if let Err(e) = tx.try_send(response.clone()) { - error!("Failed to send TALK message via WebSocket: {}", e); - } else { - debug!("TALK message sent successfully via WebSocket"); - } - } else { - debug!( - "No WebSocket connection found for session {}, sending via web adapter", - user_id - ); - // The web adapter method is async (`send_message_to_session`), so we - // spawn a detached task to perform the send without blocking. - let web_adapter = Arc::clone(&state_clone.web_adapter); - let resp_clone = response.clone(); - let sess_id = user_id.clone(); - tokio::spawn(async move { - if let Err(e) = web_adapter - .send_message_to_session(&sess_id, resp_clone) - .await - { - error!("Failed to send TALK message via web adapter: {}", e); - } else { - debug!("TALK message sent successfully via web adapter"); - } - }); - } + tokio::spawn(async move { + if let Err(e) = execute_talk(state_for_talk, user_for_talk, message).await { + error!("Error executing TALK command: {}", e); } - Err(_) => { - error!("Failed to acquire lock on response_channels for TALK command"); - } - } + }); Ok(Dynamic::UNIT) }) .unwrap(); } - diff --git a/src/bot/mod.rs b/src/bot/mod.rs index e94cf1f8..2e09821d 100644 --- a/src/bot/mod.rs +++ b/src/bot/mod.rs @@ -248,6 +248,7 @@ impl BotOrchestrator { stream_token: None, is_complete: true, suggestions: Vec::new(), + context_name: None, }; if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) { @@ -279,6 +280,7 @@ impl BotOrchestrator { stream_token: None, is_complete: true, suggestions: Vec::new(), + context_name: None, }; if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) { @@ -293,6 +295,47 @@ impl BotOrchestrator { Ok(()) } + pub async fn handle_context_change( + &self, + user_id: &str, + bot_id: &str, + session_id: &str, + channel: &str, + context_name: &str, + ) -> Result<(), Box> { + info!( + "Changing context for session {} to {}", + session_id, context_name + ); + + let mut session_manager = self.state.session_manager.lock().await; + session_manager.update_session_context( + &Uuid::parse_str(session_id)?, + &Uuid::parse_str(user_id)?, + context_name.to_string() + ).await?; + + // Send confirmation back to client + let confirmation = BotResponse { + bot_id: bot_id.to_string(), + user_id: user_id.to_string(), + session_id: session_id.to_string(), + channel: channel.to_string(), + content: "Context changed".to_string(), + message_type: 5, + stream_token: None, + is_complete: true, + suggestions: Vec::new(), + context_name: Some(context_name.to_string()), + }; + + if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) { + adapter.send_message(confirmation).await?; + } + + Ok(()) + } + pub async fn process_message( &self, message: UserMessage, @@ -354,6 +397,7 @@ impl BotOrchestrator { stream_token: None, is_complete: true, suggestions: Vec::new(), + context_name: None, }; adapter.send_message(ack_response).await?; } @@ -388,27 +432,43 @@ impl BotOrchestrator { session_manager.save_message(session.id, user_id, 2, &response_content, 1)?; } - let bot_response = BotResponse { - bot_id: message.bot_id, - user_id: message.user_id, - session_id: message.session_id.clone(), - channel: message.channel.clone(), - content: response_content, - message_type: 1, - stream_token: None, - is_complete: true, - suggestions: Vec::new(), - }; - - if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) { - adapter.send_message(bot_response).await?; - } else { - warn!( - "No channel adapter found for message channel: {}", - message.channel - ); + // Handle context change messages (type 4) first + if message.message_type == 4 { + if let Some(context_name) = &message.context_name { + return self.handle_context_change( + &message.user_id, + &message.bot_id, + &message.session_id, + &message.channel, + context_name + ).await; + } } + // Create regular response +let channel = message.channel.clone(); +let bot_response = BotResponse { + bot_id: message.bot_id, + user_id: message.user_id, + session_id: message.session_id, + channel: channel.clone(), + content: response_content, + message_type: 1, + stream_token: None, + is_complete: true, + suggestions: Vec::new(), + context_name: None, +}; + +if let Some(adapter) = self.state.channels.lock().unwrap().get(&channel) { + adapter.send_message(bot_response).await?; +} else { + warn!( + "No channel adapter found for message channel: {}", + channel + ); +} + Ok(()) } @@ -676,6 +736,7 @@ impl BotOrchestrator { stream_token: None, is_complete: true, suggestions: Vec::new(), + context_name: None, }; response_tx.send(thinking_response).await?; } @@ -753,6 +814,7 @@ impl BotOrchestrator { stream_token: None, is_complete: false, suggestions: suggestions.clone(), + context_name: None, }; if response_tx.send(partial).await.is_err() { @@ -781,6 +843,7 @@ impl BotOrchestrator { stream_token: None, is_complete: true, suggestions, + context_name: None, }; response_tx.send(final_msg).await?; @@ -914,6 +977,7 @@ impl BotOrchestrator { stream_token: None, is_complete: true, suggestions: Vec::new(), + context_name: None, }; adapter.send_message(warn_response).await } else { @@ -1206,6 +1270,7 @@ async fn websocket_handler( message_type: 1, media_url: None, timestamp: Utc::now(), + context_name: None, }; if let Err(e) = orchestrator.stream_response(user_message, tx.clone()).await { diff --git a/src/session/mod.rs b/src/session/mod.rs index 17eb7b5d..53fd427c 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -215,6 +215,22 @@ impl SessionManager { Ok(()) } + pub async fn update_session_context( + &mut self, + session_id: &Uuid, + _user_id: &Uuid, + context_name: String, + ) -> Result<(), Box> { + use crate::shared::models::schema::user_sessions::dsl::*; + use diesel::prelude::*; + + diesel::update(user_sessions.filter(id.eq(session_id).and(user_id.eq(user_id)))) + .set(context_data.eq(serde_json::json!({ "current_context": context_name }))) + .execute(&mut self.conn)?; + + Ok(()) + } + pub async fn get_session_context( &self, session_id: &Uuid, @@ -400,13 +416,12 @@ async fn start_session( path: web::Path, ) -> Result { let session_id = path.into_inner(); - let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(); match Uuid::parse_str(&session_id) { Ok(session_uuid) => { let mut session_manager = data.session_manager.lock().await; match session_manager.get_session_by_id(session_uuid) { - Ok(Some(session)) => { + Ok(Some(_session)) => { session_manager.mark_waiting(session_uuid); Ok(HttpResponse::Ok().json(serde_json::json!({ "status": "started", diff --git a/src/shared/models.rs b/src/shared/models.rs index 7ef9d34e..635e4a2c 100644 --- a/src/shared/models.rs +++ b/src/shared/models.rs @@ -1,4 +1,4 @@ -use chrono::Utc; +use chrono::{DateTime, Utc}; use diesel::prelude::*; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -115,14 +115,14 @@ pub struct UserMessage { pub content: String, pub message_type: i32, pub media_url: Option, - pub timestamp: chrono::DateTime, + pub timestamp: DateTime, + pub context_name: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Suggestion { pub text: String, // The button text that will be sent as message - pub context_name: String, // The context name to set when clicked - pub is_suggestion: bool, // Flag to identify suggestion clicks + pub context: String, // The context name to set when clicked } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -136,6 +136,7 @@ pub struct BotResponse { pub stream_token: Option, pub is_complete: bool, pub suggestions: Vec, + pub context_name: Option, } #[derive(Debug, Deserialize)] diff --git a/src/whatsapp/mod.rs b/src/whatsapp/mod.rs index 84a51c32..6ff033d0 100644 --- a/src/whatsapp/mod.rs +++ b/src/whatsapp/mod.rs @@ -154,16 +154,17 @@ impl WhatsAppAdapter { if let Some(text) = msg.text { let session_id = self.get_session_id(&msg.from).await; - let user_message = crate::shared::models::UserMessage { - bot_id: "default_bot".to_string(), - user_id: msg.from.clone(), - session_id: session_id.clone(), - channel: "whatsapp".to_string(), - content: text.body, - message_type: 1, - media_url: None, - timestamp: chrono::Utc::now(), - }; + let user_message = crate::shared::models::UserMessage { + bot_id: "default_bot".to_string(), + user_id: msg.from.clone(), + session_id: session_id, + channel: "whatsapp".to_string(), + content: text.body, + message_type: 1, + media_url: None, + timestamp: chrono::Utc::now(), + context_name: None, + }; user_messages.push(user_message); } diff --git a/web/html/index.html b/web/html/index.html index 9e2492e3..a17dbb0a 100644 --- a/web/html/index.html +++ b/web/html/index.html @@ -880,7 +880,25 @@