Refactor TALK keyword to use try_send

Remove unnecessary async spawn in TALK handling and use `try_send` on
the WebSocket channel. Acquire `response_channels` with `try_lock` and
spawn an async task only when falling back to the web adapter. Clean up
debug logs and add missing `env` import. Also delete an extra blank line
in the announcement start script.
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-10-15 22:39:04 -03:00
parent 648e7f48f9
commit 1fdf76b530
2 changed files with 52 additions and 42 deletions

View file

@ -2,6 +2,7 @@ use crate::shared::models::{BotResponse, UserSession};
use crate::shared::state::AppState; use crate::shared::state::AppState;
use log::{debug, error, info}; use log::{debug, error, info};
use rhai::{Dynamic, Engine, EvalAltResult}; use rhai::{Dynamic, Engine, EvalAltResult};
use std::env;
use std::sync::Arc; use std::sync::Arc;
use uuid::Uuid; use uuid::Uuid;
@ -66,60 +67,70 @@ pub fn talk_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine
engine engine
.register_custom_syntax(&["TALK", "$expr$"], true, move |context, inputs| { .register_custom_syntax(&["TALK", "$expr$"], true, move |context, inputs| {
// Evaluate the expression that produces the message text.
let message = context.eval_expression_tree(&inputs[0])?.to_string(); let message = context.eval_expression_tree(&inputs[0])?.to_string();
info!("TALK command executed: {}", message); info!("TALK command executed: {}", message);
debug!("TALK: Sending message: {}", message);
let state_for_spawn = Arc::clone(&state_clone); // Build the bot response that will be sent back to the client.
let user_clone_spawn = user_clone.clone(); let bot_id = env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string());
let message_clone = message.clone(); let response = BotResponse {
bot_id,
user_id: user_clone.user_id.to_string(),
session_id: user_clone.id.to_string(),
channel: "web".to_string(),
content: message,
message_type: 1,
stream_token: None,
is_complete: true,
};
tokio::spawn(async move { let user_id = user_clone.id.to_string();
debug!("TALK: Sending message via WebSocket: {}", message_clone);
let bot_id = // Try to acquire the lock on the response_channels map. The map is protected
std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string()); // by an async `tokio::sync::Mutex`, so we use `try_lock` to avoid awaiting
// inside this nonasync closure.
let response = BotResponse { match state_clone.response_channels.try_lock() {
bot_id: bot_id, Ok(mut response_channels) => {
user_id: user_clone_spawn.user_id.to_string(), if let Some(tx) = response_channels.get(&user_id) {
session_id: user_clone_spawn.id.to_string(), // Use `try_send` to avoid blocking the runtime.
channel: "web".to_string(), if let Err(e) = tx.try_send(response.clone()) {
content: message_clone, error!("Failed to send TALK message via WebSocket: {}", e);
message_type: 1, } else {
stream_token: None, debug!("TALK message sent successfully via WebSocket");
is_complete: true, }
};
let response_channels = state_for_spawn.response_channels.lock().await;
if let Some(tx) = response_channels.get(&user_clone_spawn.id.to_string()) {
if let Err(e) = tx.send(response).await {
error!("Failed to send TALK message via WebSocket: {}", e);
} else { } else {
debug!("TALK message sent successfully via WebSocket"); debug!(
} "No WebSocket connection found for session {}, sending via web adapter",
} else { user_id
debug!( );
"No WebSocket connection found for session {}, sending via web adapter", // The web adapter method is async (`send_message_to_session`), so we
user_clone_spawn.id // spawn a detached task to perform the send without blocking.
); let web_adapter = Arc::clone(&state_clone.web_adapter);
let resp_clone = response.clone();
if let Err(e) = state_for_spawn let sess_id = user_id.clone();
.web_adapter tokio::spawn(async move {
.send_message_to_session(&user_clone_spawn.id.to_string(), response) if let Err(e) = web_adapter
.await .send_message_to_session(&sess_id, resp_clone)
{ .await
error!("Failed to send TALK message via web adapter: {}", e); {
} else { error!("Failed to send TALK message via web adapter: {}", e);
debug!("TALK message sent successfully via web adapter"); } else {
debug!("TALK message sent successfully via web adapter");
}
});
} }
} }
}); Err(_) => {
error!("Failed to acquire lock on response_channels for TALK command");
}
}
Ok(Dynamic::UNIT) Ok(Dynamic::UNIT)
}) })
.unwrap(); .unwrap();
} }
pub fn set_user_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) { pub fn set_user_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
let state_clone = Arc::clone(&state); let state_clone = Arc::clone(&state);
let user_clone = user.clone(); let user_clone = user.clone();

View file

@ -1,5 +1,4 @@
TALK "Olá, pode me perguntar sobre qualquer coisa..." TALK "Olá, pode me perguntar sobre qualquer coisa..."
let text = GET "default.gbdrive/default.pdf" let text = GET "default.gbdrive/default.pdf"
let resume = LLM "Say Hello and present a a resume from " + text let resume = LLM "Say Hello and present a a resume from " + text
TALK resume TALK resume