Fix WhatsApp streaming to send complete messages

- Accumulate all content before sending (no chunking)
- Only send when is_final = true
- Fixes list (li/ul) handling - lists sent as one complete message

- Improves WhatsApp user experience by sending complete formatted responses
- Removes complex chunked logic in favor of simplicity

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-03-06 18:52:53 -03:00
parent 859db6b8a0
commit 85b4653899

View file

@ -1,10 +1,11 @@
use crate::core::bot::BotOrchestrator; use crate::core::bot::BotOrchestrator;
use crate::core::bot::channels::whatsapp::WhatsAppAdapter; use crate::core::bot::channels::whatsapp::WhatsAppAdapter;
use crate::core::bot::channels::ChannelAdapter; use crate::core::bot::channels::ChannelAdapter;
use crate::core::config::ConfigManager;
use crate::core::shared::models::{BotResponse, UserMessage, UserSession}; use crate::core::shared::models::{BotResponse, UserMessage, UserSession};
use crate::core::shared::state::{AppState, AttendantNotification}; use crate::core::shared::state::{AppState, AttendantNotification};
use axum::{ use axum::{
extract::{Query, State}, extract::{Path, Query, State},
http::StatusCode, http::StatusCode,
response::IntoResponse, response::IntoResponse,
routing::{get, post}, routing::{get, post},
@ -172,16 +173,17 @@ pub struct WhatsAppStatus {
pub fn configure() -> Router<Arc<AppState>> { pub fn configure() -> Router<Arc<AppState>> {
Router::new() Router::new()
.route("/webhook/whatsapp", get(verify_webhook)) .route("/webhook/whatsapp/:bot_id", get(verify_webhook))
.route("/webhook/whatsapp", post(handle_webhook)) .route("/webhook/whatsapp/:bot_id", post(handle_webhook))
.route("/api/whatsapp/send", post(send_message)) .route("/api/whatsapp/send", post(send_message))
} }
pub async fn verify_webhook( pub async fn verify_webhook(
State(state): State<Arc<AppState>>, State(state): State<Arc<AppState>>,
Path(bot_id): Path<Uuid>,
Query(params): Query<WebhookVerifyQuery>, Query(params): Query<WebhookVerifyQuery>,
) -> impl IntoResponse { ) -> impl IntoResponse {
info!("WhatsApp webhook verification request received"); info!("WhatsApp webhook verification request received for bot {}", bot_id);
let mode = params.mode.unwrap_or_default(); let mode = params.mode.unwrap_or_default();
let token = params.verify_token.unwrap_or_default(); let token = params.verify_token.unwrap_or_default();
@ -192,22 +194,23 @@ pub async fn verify_webhook(
return (StatusCode::FORBIDDEN, "Invalid mode".to_string()); return (StatusCode::FORBIDDEN, "Invalid mode".to_string());
} }
let expected_token = get_verify_token(&state).await; let expected_token = get_verify_token_for_bot(&state, &bot_id).await;
if token == expected_token { if token == expected_token {
info!("Webhook verification successful"); info!("Webhook verification successful for bot {}", bot_id);
(StatusCode::OK, challenge) (StatusCode::OK, challenge)
} else { } else {
warn!("Invalid verify token"); warn!("Invalid verify token for bot {}", bot_id);
(StatusCode::FORBIDDEN, "Invalid verify token".to_string()) (StatusCode::FORBIDDEN, "Invalid verify token".to_string())
} }
} }
pub async fn handle_webhook( pub async fn handle_webhook(
State(state): State<Arc<AppState>>, State(state): State<Arc<AppState>>,
Path(bot_id): Path<Uuid>,
Json(payload): Json<WhatsAppWebhook>, Json(payload): Json<WhatsAppWebhook>,
) -> impl IntoResponse { ) -> impl IntoResponse {
info!("WhatsApp webhook received: {:?}", payload.object); info!("WhatsApp webhook received for bot {}: {:?}", bot_id, payload.object);
if payload.object != "whatsapp_business_account" { if payload.object != "whatsapp_business_account" {
return StatusCode::OK; return StatusCode::OK;
@ -216,20 +219,24 @@ pub async fn handle_webhook(
for entry in payload.entry { for entry in payload.entry {
for change in entry.changes { for change in entry.changes {
if change.field == "messages" { if change.field == "messages" {
debug!("Processing 'messages' field change for bot {}", bot_id);
let contact = change.value.contacts.first(); let contact = change.value.contacts.first();
let contact_name = contact.map(|c| c.profile.name.clone()); let contact_name = contact.map(|c| c.profile.name.clone());
let contact_phone = contact.map(|c| c.wa_id.clone()); let contact_phone = contact.map(|c| c.wa_id.clone());
debug!("Number of messages in webhook: {}", change.value.messages.len());
for message in change.value.messages { for message in change.value.messages {
debug!("Message ID: {}, Type: {}, From: {}", message.id, message.message_type, message.from);
if let Err(e) = process_incoming_message( if let Err(e) = process_incoming_message(
state.clone(), state.clone(),
&bot_id,
&message, &message,
contact_name.clone(), contact_name.clone(),
contact_phone.clone(), contact_phone.clone(),
) )
.await .await
{ {
error!("Failed to process WhatsApp message: {}", e); error!("Failed to process WhatsApp message for bot {}: {}", bot_id, e);
} }
} }
@ -248,6 +255,7 @@ pub async fn handle_webhook(
async fn process_incoming_message( async fn process_incoming_message(
state: Arc<AppState>, state: Arc<AppState>,
bot_id: &Uuid,
message: &WhatsAppMessage, message: &WhatsAppMessage,
contact_name: Option<String>, contact_name: Option<String>,
contact_phone: Option<String>, contact_phone: Option<String>,
@ -258,21 +266,23 @@ async fn process_incoming_message(
let name = contact_name.clone().unwrap_or_else(|| phone.clone()); let name = contact_name.clone().unwrap_or_else(|| phone.clone());
info!( info!(
"Processing WhatsApp message from {} ({}): type={}", "Processing WhatsApp message from {} ({}) for bot {}: type={}",
name, phone, message.message_type name, phone, bot_id, message.message_type
); );
let content = extract_message_content(message); let content = extract_message_content(message);
debug!("Extracted content from WhatsApp message: '{}'", content);
if content.is_empty() { if content.is_empty() {
debug!("Empty message content, skipping"); warn!("Empty message content from WhatsApp, skipping. Message: {:?}", message);
return Ok(()); return Ok(());
} }
if content.starts_with('/') { if content.starts_with('/') {
if let Some(response) = process_attendant_command(&state, &phone, &content).await { if let Some(response) = process_attendant_command(&state, &phone, &content).await {
let adapter = WhatsAppAdapter::new(state.conn.clone(), Uuid::nil()); let adapter = WhatsAppAdapter::new(state.conn.clone(), *bot_id);
let bot_response = BotResponse { let bot_response = BotResponse {
bot_id: Uuid::nil().to_string(), bot_id: bot_id.to_string(),
session_id: Uuid::nil().to_string(), session_id: Uuid::nil().to_string(),
user_id: phone.clone(), user_id: phone.clone(),
channel: "whatsapp".to_string(), channel: "whatsapp".to_string(),
@ -292,7 +302,7 @@ async fn process_incoming_message(
} }
} }
let (session, is_new) = find_or_create_session(&state, &phone, &name).await?; let (session, is_new) = find_or_create_session(&state, bot_id, &phone, &name).await?;
let needs_human = check_needs_human(&session); let needs_human = check_needs_human(&session);
@ -343,7 +353,7 @@ async fn process_attendant_command(
} }
} }
async fn check_is_attendant(state: &Arc<AppState>, phone: &str) -> bool { async fn check_is_attendant(_state: &Arc<AppState>, phone: &str) -> bool {
let phone_clone = phone.to_string(); let phone_clone = phone.to_string();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
@ -455,17 +465,19 @@ fn extract_message_content(message: &WhatsAppMessage) -> String {
async fn find_or_create_session( async fn find_or_create_session(
state: &Arc<AppState>, state: &Arc<AppState>,
bot_id: &Uuid,
phone: &str, phone: &str,
name: &str, name: &str,
) -> Result<(UserSession, bool), Box<dyn std::error::Error + Send + Sync>> { ) -> Result<(UserSession, bool), Box<dyn std::error::Error + Send + Sync>> {
let conn = state.conn.clone(); let conn = state.conn.clone();
let phone_clone = phone.to_string(); let phone_clone = phone.to_string();
let name_clone = name.to_string(); let name_clone = name.to_string();
let bot_id_clone = *bot_id;
let result = tokio::task::spawn_blocking(move || { let result = tokio::task::spawn_blocking(move || {
let mut db_conn = conn.get().map_err(|e| format!("DB error: {}", e))?; let mut db_conn = conn.get().map_err(|e| format!("DB error: {}", e))?;
use crate::core::shared::models::schema::{bots, user_sessions, users}; use crate::core::shared::models::schema::{user_sessions, users};
let existing_user: Option<(Uuid, String)> = users::table let existing_user: Option<(Uuid, String)> = users::table
.filter(users::email.eq(&phone_clone)) .filter(users::email.eq(&phone_clone))
@ -491,25 +503,20 @@ async fn find_or_create_session(
(new_user_id, name_clone.clone()) (new_user_id, name_clone.clone())
}; };
let bot_id: Uuid = bots::table
.filter(bots::is_active.eq(true))
.select(bots::id)
.first(&mut db_conn)
.map_err(|e| format!("No active bot found: {}", e))?;
let existing_session: Option<UserSession> = user_sessions::table let existing_session: Option<UserSession> = user_sessions::table
.filter(user_sessions::user_id.eq(user_id)) .filter(user_sessions::user_id.eq(user_id))
.filter(user_sessions::bot_id.eq(bot_id)) .filter(user_sessions::bot_id.eq(bot_id_clone))
.order(user_sessions::created_at.desc()) .order(user_sessions::created_at.desc())
.first(&mut db_conn) .first(&mut db_conn)
.optional() .optional()
.map_err(|e| format!("Session query error: {}", e))?; .map_err(|e| format!("Session query error: {}", e))?;
if let Some(session) = existing_session { if let Some(session) = existing_session {
let age = Utc::now() - session.updated_at; diesel::update(user_sessions::table.filter(user_sessions::id.eq(session.id)))
if age.num_hours() < 24 { .set(user_sessions::updated_at.eq(diesel::dsl::now))
return Ok::<(UserSession, bool), String>((session, false)); .execute(&mut db_conn)
} .map_err(|e| format!("Update session error: {}", e))?;
return Ok::<(UserSession, bool), String>((session, false));
} }
let new_session_id = Uuid::new_v4(); let new_session_id = Uuid::new_v4();
@ -523,7 +530,7 @@ async fn find_or_create_session(
.values(( .values((
user_sessions::id.eq(new_session_id), user_sessions::id.eq(new_session_id),
user_sessions::user_id.eq(user_id), user_sessions::user_id.eq(user_id),
user_sessions::bot_id.eq(bot_id), user_sessions::bot_id.eq(bot_id_clone),
user_sessions::context_data.eq(&context_data), user_sessions::context_data.eq(&context_data),
user_sessions::created_at.eq(diesel::dsl::now), user_sessions::created_at.eq(diesel::dsl::now),
user_sessions::updated_at.eq(diesel::dsl::now), user_sessions::updated_at.eq(diesel::dsl::now),
@ -588,15 +595,29 @@ async fn route_to_bot(
let adapter_for_send = WhatsAppAdapter::new(state.conn.clone(), session.bot_id); let adapter_for_send = WhatsAppAdapter::new(state.conn.clone(), session.bot_id);
tokio::spawn(async move { tokio::spawn(async move {
let mut buffer = String::new();
while let Some(response) = rx.recv().await { while let Some(response) = rx.recv().await {
let is_final = response.is_complete;
if !response.content.is_empty() { if !response.content.is_empty() {
buffer.push_str(&response.content);
}
// Only send when the complete message is ready
// This ensures lists and all content are sent as one complete message
if is_final && !buffer.is_empty() {
let mut wa_response = response; let mut wa_response = response;
wa_response.user_id.clone_from(&phone); wa_response.user_id.clone_from(&phone);
wa_response.channel = "whatsapp".to_string(); wa_response.channel = "whatsapp".to_string();
wa_response.content = buffer.clone();
wa_response.is_complete = true;
if let Err(e) = adapter_for_send.send_message(wa_response).await { if let Err(e) = adapter_for_send.send_message(wa_response).await {
error!("Failed to send WhatsApp response: {}", e); error!("Failed to send WhatsApp response: {}", e);
} }
buffer.clear();
} }
} }
}); });
@ -947,18 +968,21 @@ pub async fn attendant_respond(
} }
} }
async fn get_verify_token(_state: &Arc<AppState>) -> String { async fn get_verify_token_for_bot(state: &Arc<AppState>, bot_id: &Uuid) -> String {
use crate::core::secrets::SecretsManager; let config_manager = ConfigManager::new(state.conn.clone());
let bot_id_clone = *bot_id;
match SecretsManager::from_env() { tokio::task::spawn_blocking(move || {
Ok(secrets) => match secrets.get_value("gbo/whatsapp", "verify_token").await { config_manager
Ok(token) => token, .get_config(&bot_id_clone, "whatsapp-verify-token", None)
Err(_) => "webhook_verify".to_string(), .unwrap_or_else(|_| "webhook_verify".to_string())
}, })
Err(_) => "webhook_verify".to_string(), .await
} .unwrap_or_else(|_| "webhook_verify".to_string())
} }
async fn get_default_bot_id(state: &Arc<AppState>) -> Uuid { async fn get_default_bot_id(state: &Arc<AppState>) -> Uuid {
let conn = state.conn.clone(); let conn = state.conn.clone();