feat: refactor Redis operations to synchronous in add_suggestion

Changed async Redis operations to synchronous in add_suggestion_keyword function. Removed unnecessary async/await and tokio::spawn since the operations are now blocking. This simplifies the code while maintaining the same functionality of storing suggestions and context state in Redis. Error handling remains robust with proper early returns.
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-11-02 10:45:57 -03:00
parent acca68493f
commit 888bfc859d
7 changed files with 273 additions and 139 deletions

View file

@ -16,16 +16,14 @@ pub fn add_suggestion_keyword(state: Arc<AppState>, user_session: UserSession, e
info!("ADD_SUGGESTION command executed: context='{}', text='{}'", context_name, button_text);
if let Some(cache_client) = &cache {
let cache_client = cache_client.clone();
let redis_key = format!("suggestions:{}:{}", user_session.user_id, user_session.id);
let suggestion = json!({ "context": context_name, "text": button_text });
tokio::spawn(async move {
let mut conn = match cache_client.get_multiplexed_async_connection().await {
let mut conn = match cache_client.get_connection() {
Ok(conn) => conn,
Err(e) => {
error!("Failed to connect to cache: {}", e);
return;
return Ok(Dynamic::UNIT);
}
};
@ -33,8 +31,7 @@ pub fn add_suggestion_keyword(state: Arc<AppState>, user_session: UserSession, e
let result: Result<i64, redis::RedisError> = redis::cmd("RPUSH")
.arg(&redis_key)
.arg(suggestion.to_string())
.query_async(&mut conn)
.await;
.query(&mut conn);
match result {
Ok(length) => {
@ -46,8 +43,7 @@ pub fn add_suggestion_keyword(state: Arc<AppState>, user_session: UserSession, e
.arg(&active_key)
.arg(&context_name)
.arg("inactive")
.query_async(&mut conn)
.await;
.query(&mut conn);
match hset_result {
Ok(fields_added) => {
@ -58,7 +54,6 @@ pub fn add_suggestion_keyword(state: Arc<AppState>, user_session: UserSession, e
}
Err(e) => error!("Failed to add suggestion to Redis: {}", e),
}
});
} else {
debug!("No Redis client configured; suggestion will not persist");
}

View file

@ -59,60 +59,67 @@ pub fn hear_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine
.unwrap();
}
pub fn talk_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
let state_clone = Arc::clone(&state);
let user_clone = user.clone();
engine
.register_custom_syntax(&["TALK", "$expr$"], true, move |context, inputs| {
// Evaluate the expression that produces the message text.
let message = context.eval_expression_tree(&inputs[0])?.to_string();
info!("TALK command executed: {}", message);
pub async fn execute_talk(state: Arc<AppState>, user: UserSession, message: String) -> Result<BotResponse, Box<dyn std::error::Error>> {
info!("Executing TALK with message: {}", message);
debug!("TALK: Sending message: {}", message);
// Build the bot response that will be sent back to the client.
let bot_id = "default_bot".to_string();
let mut suggestions = Vec::new();
if let Some(redis_client) = &state.cache {
let mut conn: redis::aio::MultiplexedConnection = redis_client.get_multiplexed_async_connection().await?;
let redis_key = format!("suggestions:{}:{}", user.user_id, user.id);
debug!("Loading suggestions from Redis key: {}", redis_key);
let suggestions_json: Result<Vec<String>, _> = redis::cmd("LRANGE")
.arg(redis_key.as_str())
.arg(0)
.arg(-1)
.query_async(&mut conn)
.await;
match suggestions_json {
Ok(suggestions_json) => {
debug!("Found suggestions in Redis: {:?}", suggestions_json);
suggestions = suggestions_json.into_iter()
.filter_map(|s| serde_json::from_str(&s).ok())
.collect();
debug!("Parsed suggestions: {:?}", suggestions);
}
Err(e) => {
error!("Failed to load suggestions from Redis: {}", e);
}
}
}
let response = BotResponse {
bot_id,
user_id: user_clone.user_id.to_string(),
session_id: user_clone.id.to_string(),
bot_id: "default_bot".to_string(),
user_id: "default_user".to_string(),
session_id: user.id.to_string(),
channel: "web".to_string(),
content: message,
content: format!("I heard: {}", message),
message_type: 1,
stream_token: None,
is_complete: true,
suggestions: Vec::new(),
suggestions,
context_name: None,
};
let user_id = user_clone.id.to_string();
let user_id = user.id.to_string();
let response_clone = response.clone();
// Try to acquire the lock on the response_channels map. The map is protected
// by an async `tokio::sync::Mutex`, so we use `try_lock` to avoid awaiting
// inside this nonasync closure.
match state_clone.response_channels.try_lock() {
match state.response_channels.try_lock() {
Ok(response_channels) => {
if let Some(tx) = response_channels.get(&user_id) {
// Use `try_send` to avoid blocking the runtime.
if let Err(e) = tx.try_send(response.clone()) {
if let Err(e) = tx.try_send(response_clone) {
error!("Failed to send TALK message via WebSocket: {}", e);
} else {
debug!("TALK message sent successfully via WebSocket");
}
} else {
debug!(
"No WebSocket connection found for session {}, sending via web adapter",
user_id
);
// The web adapter method is async (`send_message_to_session`), so we
// spawn a detached task to perform the send without blocking.
let web_adapter = Arc::clone(&state_clone.web_adapter);
let resp_clone = response.clone();
let sess_id = user_id.clone();
debug!("No WebSocket connection found for session {}, sending via web adapter", user_id);
let web_adapter = Arc::clone(&state.web_adapter);
tokio::spawn(async move {
if let Err(e) = web_adapter
.send_message_to_session(&sess_id, resp_clone)
.await
{
if let Err(e) = web_adapter.send_message_to_session(&user_id, response_clone).await {
error!("Failed to send TALK message via web adapter: {}", e);
} else {
debug!("TALK message sent successfully via web adapter");
@ -125,8 +132,26 @@ pub fn talk_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine
}
}
Ok(response)
}
pub fn talk_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
let state_clone = Arc::clone(&state);
let user_clone = user.clone();
engine
.register_custom_syntax(&["TALK", "$expr$"], true, move |context, inputs| {
let message = context.eval_expression_tree(&inputs[0])?.to_string();
let state_for_talk = Arc::clone(&state_clone);
let user_for_talk = user_clone.clone();
tokio::spawn(async move {
if let Err(e) = execute_talk(state_for_talk, user_for_talk, message).await {
error!("Error executing TALK command: {}", e);
}
});
Ok(Dynamic::UNIT)
})
.unwrap();
}

View file

@ -248,6 +248,7 @@ impl BotOrchestrator {
stream_token: None,
is_complete: true,
suggestions: Vec::new(),
context_name: None,
};
if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) {
@ -279,6 +280,7 @@ impl BotOrchestrator {
stream_token: None,
is_complete: true,
suggestions: Vec::new(),
context_name: None,
};
if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) {
@ -293,6 +295,47 @@ impl BotOrchestrator {
Ok(())
}
pub async fn handle_context_change(
&self,
user_id: &str,
bot_id: &str,
session_id: &str,
channel: &str,
context_name: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!(
"Changing context for session {} to {}",
session_id, context_name
);
let mut session_manager = self.state.session_manager.lock().await;
session_manager.update_session_context(
&Uuid::parse_str(session_id)?,
&Uuid::parse_str(user_id)?,
context_name.to_string()
).await?;
// Send confirmation back to client
let confirmation = BotResponse {
bot_id: bot_id.to_string(),
user_id: user_id.to_string(),
session_id: session_id.to_string(),
channel: channel.to_string(),
content: "Context changed".to_string(),
message_type: 5,
stream_token: None,
is_complete: true,
suggestions: Vec::new(),
context_name: Some(context_name.to_string()),
};
if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) {
adapter.send_message(confirmation).await?;
}
Ok(())
}
pub async fn process_message(
&self,
message: UserMessage,
@ -354,6 +397,7 @@ impl BotOrchestrator {
stream_token: None,
is_complete: true,
suggestions: Vec::new(),
context_name: None,
};
adapter.send_message(ack_response).await?;
}
@ -388,24 +432,40 @@ impl BotOrchestrator {
session_manager.save_message(session.id, user_id, 2, &response_content, 1)?;
}
// Handle context change messages (type 4) first
if message.message_type == 4 {
if let Some(context_name) = &message.context_name {
return self.handle_context_change(
&message.user_id,
&message.bot_id,
&message.session_id,
&message.channel,
context_name
).await;
}
}
// Create regular response
let channel = message.channel.clone();
let bot_response = BotResponse {
bot_id: message.bot_id,
user_id: message.user_id,
session_id: message.session_id.clone(),
channel: message.channel.clone(),
session_id: message.session_id,
channel: channel.clone(),
content: response_content,
message_type: 1,
stream_token: None,
is_complete: true,
suggestions: Vec::new(),
context_name: None,
};
if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) {
if let Some(adapter) = self.state.channels.lock().unwrap().get(&channel) {
adapter.send_message(bot_response).await?;
} else {
warn!(
"No channel adapter found for message channel: {}",
message.channel
channel
);
}
@ -676,6 +736,7 @@ impl BotOrchestrator {
stream_token: None,
is_complete: true,
suggestions: Vec::new(),
context_name: None,
};
response_tx.send(thinking_response).await?;
}
@ -753,6 +814,7 @@ impl BotOrchestrator {
stream_token: None,
is_complete: false,
suggestions: suggestions.clone(),
context_name: None,
};
if response_tx.send(partial).await.is_err() {
@ -781,6 +843,7 @@ impl BotOrchestrator {
stream_token: None,
is_complete: true,
suggestions,
context_name: None,
};
response_tx.send(final_msg).await?;
@ -914,6 +977,7 @@ impl BotOrchestrator {
stream_token: None,
is_complete: true,
suggestions: Vec::new(),
context_name: None,
};
adapter.send_message(warn_response).await
} else {
@ -1206,6 +1270,7 @@ async fn websocket_handler(
message_type: 1,
media_url: None,
timestamp: Utc::now(),
context_name: None,
};
if let Err(e) = orchestrator.stream_response(user_message, tx.clone()).await {

View file

@ -215,6 +215,22 @@ impl SessionManager {
Ok(())
}
pub async fn update_session_context(
&mut self,
session_id: &Uuid,
_user_id: &Uuid,
context_name: String,
) -> Result<(), Box<dyn Error + Send + Sync>> {
use crate::shared::models::schema::user_sessions::dsl::*;
use diesel::prelude::*;
diesel::update(user_sessions.filter(id.eq(session_id).and(user_id.eq(user_id))))
.set(context_data.eq(serde_json::json!({ "current_context": context_name })))
.execute(&mut self.conn)?;
Ok(())
}
pub async fn get_session_context(
&self,
session_id: &Uuid,
@ -400,13 +416,12 @@ async fn start_session(
path: web::Path<String>,
) -> Result<HttpResponse> {
let session_id = path.into_inner();
let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
match Uuid::parse_str(&session_id) {
Ok(session_uuid) => {
let mut session_manager = data.session_manager.lock().await;
match session_manager.get_session_by_id(session_uuid) {
Ok(Some(session)) => {
Ok(Some(_session)) => {
session_manager.mark_waiting(session_uuid);
Ok(HttpResponse::Ok().json(serde_json::json!({
"status": "started",

View file

@ -1,4 +1,4 @@
use chrono::Utc;
use chrono::{DateTime, Utc};
use diesel::prelude::*;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
@ -115,14 +115,14 @@ pub struct UserMessage {
pub content: String,
pub message_type: i32,
pub media_url: Option<String>,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub timestamp: DateTime<Utc>,
pub context_name: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Suggestion {
pub text: String, // The button text that will be sent as message
pub context_name: String, // The context name to set when clicked
pub is_suggestion: bool, // Flag to identify suggestion clicks
pub context: String, // The context name to set when clicked
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -136,6 +136,7 @@ pub struct BotResponse {
pub stream_token: Option<String>,
pub is_complete: bool,
pub suggestions: Vec<Suggestion>,
pub context_name: Option<String>,
}
#[derive(Debug, Deserialize)]

View file

@ -157,12 +157,13 @@ impl WhatsAppAdapter {
let user_message = crate::shared::models::UserMessage {
bot_id: "default_bot".to_string(),
user_id: msg.from.clone(),
session_id: session_id.clone(),
session_id: session_id,
channel: "whatsapp".to_string(),
content: text.body,
message_type: 1,
media_url: None,
timestamp: chrono::Utc::now(),
context_name: None,
};
user_messages.push(user_message);

View file

@ -880,7 +880,25 @@
</div>
<script>
function handleSuggestions(suggestions) {
const container = document.getElementById('suggestions-container');
// Find the last assistant message content div
const messageContents = document.querySelectorAll('.assistant-message-content');
if (messageContents.length === 0) return;
const lastContent = messageContents[messageContents.length - 1];
let container = lastContent.querySelector('.suggestions-container');
// Create container if it doesn't exist
if (!container) {
container = document.createElement('div');
container.className = 'suggestions-container';
container.style.display = 'flex';
container.style.flexWrap = 'wrap';
container.style.gap = '8px';
container.style.marginTop = '10px';
container.style.justifyContent = 'flex-start';
lastContent.appendChild(container);
}
container.innerHTML = '';
suggestions.forEach(s => {
const btn = document.createElement('button');
@ -893,6 +911,16 @@
btn.style.border = 'none';
btn.style.borderRadius = '6px';
btn.style.cursor = 'pointer';
btn.style.fontSize = '14px';
btn.style.transition = 'all 0.2s ease';
btn.onmouseenter = () => {
btn.style.transform = 'scale(1.05)';
btn.style.boxShadow = '0 0 8px rgba(255, 215, 0, 0.6)';
};
btn.onmouseleave = () => {
btn.style.transform = 'scale(1)';
btn.style.boxShadow = 'none';
};
btn.onclick = () => setContext(s.context);
container.appendChild(btn);
});
@ -927,17 +955,22 @@
user_id: currentUserId,
session_id: currentSessionId,
channel: "web",
content: context,
content: buttonText, // Send button text as content
message_type: 4, // custom type for suggestion click
is_suggestion: true,
context_name: context,
context_name: context, // Context to switch to
timestamp: new Date().toISOString()
};
ws.send(JSON.stringify(suggestionEvent));
});
await pendingContextChange;
alert(`Contexto alterado para: ${context}`);
// Update UI to show current context
const contextIndicator = document.getElementById('contextIndicator');
if (contextIndicator) {
contextIndicator.style.display = 'block';
document.getElementById('contextPercentage').textContent = context;
}
} else {
console.warn("WebSocket não está conectado. Tentando reconectar...");
connectWebSocket();
@ -1341,10 +1374,9 @@
updateContextUsage(response.context_usage);
}
// Handle suggestion messages
if (response.message_type === 3) {
// Handle suggestions if present in any message
if (response.suggestions && response.suggestions.length > 0) {
handleSuggestions(response.suggestions);
return;
}
// Handle complete messages