diff --git a/src/basic/keywords/add_suggestion.rs b/src/basic/keywords/add_suggestion.rs index 632a4bb6..aeb5ca86 100644 --- a/src/basic/keywords/add_suggestion.rs +++ b/src/basic/keywords/add_suggestion.rs @@ -16,49 +16,44 @@ pub fn add_suggestion_keyword(state: Arc, user_session: UserSession, e info!("ADD_SUGGESTION command executed: context='{}', text='{}'", context_name, button_text); if let Some(cache_client) = &cache { - let cache_client = cache_client.clone(); let redis_key = format!("suggestions:{}:{}", user_session.user_id, user_session.id); let suggestion = json!({ "context": context_name, "text": button_text }); - tokio::spawn(async move { - let mut conn = match cache_client.get_multiplexed_async_connection().await { - Ok(conn) => conn, - Err(e) => { - error!("Failed to connect to cache: {}", e); - return; - } - }; - - // Append suggestion to Redis list - RPUSH returns the new length as i64 - let result: Result = redis::cmd("RPUSH") - .arg(&redis_key) - .arg(suggestion.to_string()) - .query_async(&mut conn) - .await; - - match result { - Ok(length) => { - debug!("Suggestion added successfully to Redis key {}, new length: {}", redis_key, length); - - // Also register context as inactive initially - let active_key = format!("active_context:{}:{}", user_session.user_id, user_session.id); - let hset_result: Result = redis::cmd("HSET") - .arg(&active_key) - .arg(&context_name) - .arg("inactive") - .query_async(&mut conn) - .await; - - match hset_result { - Ok(fields_added) => { - debug!("Context state set to inactive for {}, fields added: {}", context_name, fields_added) - }, - Err(e) => error!("Failed to set context state: {}", e), - } - } - Err(e) => error!("Failed to add suggestion to Redis: {}", e), + let mut conn = match cache_client.get_connection() { + Ok(conn) => conn, + Err(e) => { + error!("Failed to connect to cache: {}", e); + return Ok(Dynamic::UNIT); } - }); + }; + + // Append suggestion to Redis list - RPUSH returns the new length as i64 + let result: Result = redis::cmd("RPUSH") + .arg(&redis_key) + .arg(suggestion.to_string()) + .query(&mut conn); + + match result { + Ok(length) => { + debug!("Suggestion added successfully to Redis key {}, new length: {}", redis_key, length); + + // Also register context as inactive initially + let active_key = format!("active_context:{}:{}", user_session.user_id, user_session.id); + let hset_result: Result = redis::cmd("HSET") + .arg(&active_key) + .arg(&context_name) + .arg("inactive") + .query(&mut conn); + + match hset_result { + Ok(fields_added) => { + debug!("Context state set to inactive for {}, fields added: {}", context_name, fields_added) + }, + Err(e) => error!("Failed to set context state: {}", e), + } + } + Err(e) => error!("Failed to add suggestion to Redis: {}", e), + } } else { debug!("No Redis client configured; suggestion will not persist"); } @@ -66,4 +61,4 @@ pub fn add_suggestion_keyword(state: Arc, user_session: UserSession, e Ok(Dynamic::UNIT) }) .unwrap(); -} \ No newline at end of file +} diff --git a/src/basic/keywords/hear_talk.rs b/src/basic/keywords/hear_talk.rs index 53c045bf..258b949a 100644 --- a/src/basic/keywords/hear_talk.rs +++ b/src/basic/keywords/hear_talk.rs @@ -59,74 +59,99 @@ pub fn hear_keyword(state: Arc, user: UserSession, engine: &mut Engine .unwrap(); } +pub async fn execute_talk(state: Arc, user: UserSession, message: String) -> Result> { + info!("Executing TALK with message: {}", message); + debug!("TALK: Sending message: {}", message); + + let mut suggestions = Vec::new(); + + if let Some(redis_client) = &state.cache { + let mut conn: redis::aio::MultiplexedConnection = redis_client.get_multiplexed_async_connection().await?; + + let redis_key = format!("suggestions:{}:{}", user.user_id, user.id); + debug!("Loading suggestions from Redis key: {}", redis_key); + let suggestions_json: Result, _> = redis::cmd("LRANGE") + .arg(redis_key.as_str()) + .arg(0) + .arg(-1) + .query_async(&mut conn) + .await; + + match suggestions_json { + Ok(suggestions_json) => { + debug!("Found suggestions in Redis: {:?}", suggestions_json); + suggestions = suggestions_json.into_iter() + .filter_map(|s| serde_json::from_str(&s).ok()) + .collect(); + debug!("Parsed suggestions: {:?}", suggestions); + } + Err(e) => { + error!("Failed to load suggestions from Redis: {}", e); + } + } + } + + let response = BotResponse { + bot_id: "default_bot".to_string(), + user_id: "default_user".to_string(), + session_id: user.id.to_string(), + channel: "web".to_string(), + content: format!("I heard: {}", message), + message_type: 1, + stream_token: None, + is_complete: true, + suggestions, + context_name: None, + }; + + let user_id = user.id.to_string(); + let response_clone = response.clone(); + + match state.response_channels.try_lock() { + Ok(response_channels) => { + if let Some(tx) = response_channels.get(&user_id) { + if let Err(e) = tx.try_send(response_clone) { + error!("Failed to send TALK message via WebSocket: {}", e); + } else { + debug!("TALK message sent successfully via WebSocket"); + } + } else { + debug!("No WebSocket connection found for session {}, sending via web adapter", user_id); + let web_adapter = Arc::clone(&state.web_adapter); + tokio::spawn(async move { + if let Err(e) = web_adapter.send_message_to_session(&user_id, response_clone).await { + error!("Failed to send TALK message via web adapter: {}", e); + } else { + debug!("TALK message sent successfully via web adapter"); + } + }); + } + } + Err(_) => { + error!("Failed to acquire lock on response_channels for TALK command"); + } + } + + Ok(response) +} + pub fn talk_keyword(state: Arc, user: UserSession, engine: &mut Engine) { let state_clone = Arc::clone(&state); let user_clone = user.clone(); engine .register_custom_syntax(&["TALK", "$expr$"], true, move |context, inputs| { - // Evaluate the expression that produces the message text. let message = context.eval_expression_tree(&inputs[0])?.to_string(); - info!("TALK command executed: {}", message); - debug!("TALK: Sending message: {}", message); + let state_for_talk = Arc::clone(&state_clone); + let user_for_talk = user_clone.clone(); - // Build the bot response that will be sent back to the client. - let bot_id = "default_bot".to_string(); - let response = BotResponse { - bot_id, - user_id: user_clone.user_id.to_string(), - session_id: user_clone.id.to_string(), - channel: "web".to_string(), - content: message, - message_type: 1, - stream_token: None, - is_complete: true, - suggestions: Vec::new(), - }; - - let user_id = user_clone.id.to_string(); - - // Try to acquire the lock on the response_channels map. The map is protected - // by an async `tokio::sync::Mutex`, so we use `try_lock` to avoid awaiting - // inside this non‑async closure. - match state_clone.response_channels.try_lock() { - Ok(response_channels) => { - if let Some(tx) = response_channels.get(&user_id) { - // Use `try_send` to avoid blocking the runtime. - if let Err(e) = tx.try_send(response.clone()) { - error!("Failed to send TALK message via WebSocket: {}", e); - } else { - debug!("TALK message sent successfully via WebSocket"); - } - } else { - debug!( - "No WebSocket connection found for session {}, sending via web adapter", - user_id - ); - // The web adapter method is async (`send_message_to_session`), so we - // spawn a detached task to perform the send without blocking. - let web_adapter = Arc::clone(&state_clone.web_adapter); - let resp_clone = response.clone(); - let sess_id = user_id.clone(); - tokio::spawn(async move { - if let Err(e) = web_adapter - .send_message_to_session(&sess_id, resp_clone) - .await - { - error!("Failed to send TALK message via web adapter: {}", e); - } else { - debug!("TALK message sent successfully via web adapter"); - } - }); - } + tokio::spawn(async move { + if let Err(e) = execute_talk(state_for_talk, user_for_talk, message).await { + error!("Error executing TALK command: {}", e); } - Err(_) => { - error!("Failed to acquire lock on response_channels for TALK command"); - } - } + }); Ok(Dynamic::UNIT) }) .unwrap(); } - diff --git a/src/bot/mod.rs b/src/bot/mod.rs index e94cf1f8..2e09821d 100644 --- a/src/bot/mod.rs +++ b/src/bot/mod.rs @@ -248,6 +248,7 @@ impl BotOrchestrator { stream_token: None, is_complete: true, suggestions: Vec::new(), + context_name: None, }; if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) { @@ -279,6 +280,7 @@ impl BotOrchestrator { stream_token: None, is_complete: true, suggestions: Vec::new(), + context_name: None, }; if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) { @@ -293,6 +295,47 @@ impl BotOrchestrator { Ok(()) } + pub async fn handle_context_change( + &self, + user_id: &str, + bot_id: &str, + session_id: &str, + channel: &str, + context_name: &str, + ) -> Result<(), Box> { + info!( + "Changing context for session {} to {}", + session_id, context_name + ); + + let mut session_manager = self.state.session_manager.lock().await; + session_manager.update_session_context( + &Uuid::parse_str(session_id)?, + &Uuid::parse_str(user_id)?, + context_name.to_string() + ).await?; + + // Send confirmation back to client + let confirmation = BotResponse { + bot_id: bot_id.to_string(), + user_id: user_id.to_string(), + session_id: session_id.to_string(), + channel: channel.to_string(), + content: "Context changed".to_string(), + message_type: 5, + stream_token: None, + is_complete: true, + suggestions: Vec::new(), + context_name: Some(context_name.to_string()), + }; + + if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) { + adapter.send_message(confirmation).await?; + } + + Ok(()) + } + pub async fn process_message( &self, message: UserMessage, @@ -354,6 +397,7 @@ impl BotOrchestrator { stream_token: None, is_complete: true, suggestions: Vec::new(), + context_name: None, }; adapter.send_message(ack_response).await?; } @@ -388,27 +432,43 @@ impl BotOrchestrator { session_manager.save_message(session.id, user_id, 2, &response_content, 1)?; } - let bot_response = BotResponse { - bot_id: message.bot_id, - user_id: message.user_id, - session_id: message.session_id.clone(), - channel: message.channel.clone(), - content: response_content, - message_type: 1, - stream_token: None, - is_complete: true, - suggestions: Vec::new(), - }; - - if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) { - adapter.send_message(bot_response).await?; - } else { - warn!( - "No channel adapter found for message channel: {}", - message.channel - ); + // Handle context change messages (type 4) first + if message.message_type == 4 { + if let Some(context_name) = &message.context_name { + return self.handle_context_change( + &message.user_id, + &message.bot_id, + &message.session_id, + &message.channel, + context_name + ).await; + } } + // Create regular response +let channel = message.channel.clone(); +let bot_response = BotResponse { + bot_id: message.bot_id, + user_id: message.user_id, + session_id: message.session_id, + channel: channel.clone(), + content: response_content, + message_type: 1, + stream_token: None, + is_complete: true, + suggestions: Vec::new(), + context_name: None, +}; + +if let Some(adapter) = self.state.channels.lock().unwrap().get(&channel) { + adapter.send_message(bot_response).await?; +} else { + warn!( + "No channel adapter found for message channel: {}", + channel + ); +} + Ok(()) } @@ -676,6 +736,7 @@ impl BotOrchestrator { stream_token: None, is_complete: true, suggestions: Vec::new(), + context_name: None, }; response_tx.send(thinking_response).await?; } @@ -753,6 +814,7 @@ impl BotOrchestrator { stream_token: None, is_complete: false, suggestions: suggestions.clone(), + context_name: None, }; if response_tx.send(partial).await.is_err() { @@ -781,6 +843,7 @@ impl BotOrchestrator { stream_token: None, is_complete: true, suggestions, + context_name: None, }; response_tx.send(final_msg).await?; @@ -914,6 +977,7 @@ impl BotOrchestrator { stream_token: None, is_complete: true, suggestions: Vec::new(), + context_name: None, }; adapter.send_message(warn_response).await } else { @@ -1206,6 +1270,7 @@ async fn websocket_handler( message_type: 1, media_url: None, timestamp: Utc::now(), + context_name: None, }; if let Err(e) = orchestrator.stream_response(user_message, tx.clone()).await { diff --git a/src/session/mod.rs b/src/session/mod.rs index 17eb7b5d..53fd427c 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -215,6 +215,22 @@ impl SessionManager { Ok(()) } + pub async fn update_session_context( + &mut self, + session_id: &Uuid, + _user_id: &Uuid, + context_name: String, + ) -> Result<(), Box> { + use crate::shared::models::schema::user_sessions::dsl::*; + use diesel::prelude::*; + + diesel::update(user_sessions.filter(id.eq(session_id).and(user_id.eq(user_id)))) + .set(context_data.eq(serde_json::json!({ "current_context": context_name }))) + .execute(&mut self.conn)?; + + Ok(()) + } + pub async fn get_session_context( &self, session_id: &Uuid, @@ -400,13 +416,12 @@ async fn start_session( path: web::Path, ) -> Result { let session_id = path.into_inner(); - let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(); match Uuid::parse_str(&session_id) { Ok(session_uuid) => { let mut session_manager = data.session_manager.lock().await; match session_manager.get_session_by_id(session_uuid) { - Ok(Some(session)) => { + Ok(Some(_session)) => { session_manager.mark_waiting(session_uuid); Ok(HttpResponse::Ok().json(serde_json::json!({ "status": "started", diff --git a/src/shared/models.rs b/src/shared/models.rs index 7ef9d34e..635e4a2c 100644 --- a/src/shared/models.rs +++ b/src/shared/models.rs @@ -1,4 +1,4 @@ -use chrono::Utc; +use chrono::{DateTime, Utc}; use diesel::prelude::*; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -115,14 +115,14 @@ pub struct UserMessage { pub content: String, pub message_type: i32, pub media_url: Option, - pub timestamp: chrono::DateTime, + pub timestamp: DateTime, + pub context_name: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Suggestion { pub text: String, // The button text that will be sent as message - pub context_name: String, // The context name to set when clicked - pub is_suggestion: bool, // Flag to identify suggestion clicks + pub context: String, // The context name to set when clicked } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -136,6 +136,7 @@ pub struct BotResponse { pub stream_token: Option, pub is_complete: bool, pub suggestions: Vec, + pub context_name: Option, } #[derive(Debug, Deserialize)] diff --git a/src/whatsapp/mod.rs b/src/whatsapp/mod.rs index 84a51c32..6ff033d0 100644 --- a/src/whatsapp/mod.rs +++ b/src/whatsapp/mod.rs @@ -154,16 +154,17 @@ impl WhatsAppAdapter { if let Some(text) = msg.text { let session_id = self.get_session_id(&msg.from).await; - let user_message = crate::shared::models::UserMessage { - bot_id: "default_bot".to_string(), - user_id: msg.from.clone(), - session_id: session_id.clone(), - channel: "whatsapp".to_string(), - content: text.body, - message_type: 1, - media_url: None, - timestamp: chrono::Utc::now(), - }; + let user_message = crate::shared::models::UserMessage { + bot_id: "default_bot".to_string(), + user_id: msg.from.clone(), + session_id: session_id, + channel: "whatsapp".to_string(), + content: text.body, + message_type: 1, + media_url: None, + timestamp: chrono::Utc::now(), + context_name: None, + }; user_messages.push(user_message); } diff --git a/web/html/index.html b/web/html/index.html index 9e2492e3..a17dbb0a 100644 --- a/web/html/index.html +++ b/web/html/index.html @@ -880,7 +880,25 @@