diff --git a/src/whatsapp/mod.rs b/src/whatsapp/mod.rs index 3a4f527c..9dd3b2e3 100644 --- a/src/whatsapp/mod.rs +++ b/src/whatsapp/mod.rs @@ -208,18 +208,32 @@ pub async fn verify_webhook( pub async fn handle_webhook( State(state): State>, Path(bot_id): Path, - Json(payload): Json, + body: axum::body::Bytes, ) -> impl IntoResponse { + debug!("Raw webhook body: {}", String::from_utf8_lossy(&body)); + + let payload: WhatsAppWebhook = match serde_json::from_slice(&body) { + Ok(p) => p, + Err(e) => { + error!("Failed to deserialize WhatsApp webhook: {}", e); + return StatusCode::BAD_REQUEST; + } + }; + info!("WhatsApp webhook received for bot {}: {:?}", bot_id, payload.object); + debug!("Webhook entry count: {}", payload.entry.len()); if payload.object != "whatsapp_business_account" { return StatusCode::OK; } for entry in payload.entry { + debug!("Entry changes count: {}", entry.changes.len()); for change in entry.changes { + debug!("Change field: {}", change.field); if change.field == "messages" { debug!("Processing 'messages' field change for bot {}", bot_id); + debug!("Contacts count: {}", change.value.contacts.len()); let contact = change.value.contacts.first(); let contact_name = contact.map(|c| c.profile.name.clone()); let contact_phone = contact.map(|c| c.wa_id.clone()); @@ -260,6 +274,29 @@ async fn process_incoming_message( contact_name: Option, contact_phone: Option, ) -> Result<(), Box> { + // Deduplicate messages using cache to prevent processing the same message twice + // WhatsApp may retry webhook delivery, causing duplicate processing + let message_id_key = format!("wa_msg_processed:{}", message.id); + if let Some(cache) = &state.cache { + if let Ok(mut conn) = cache.get_multiplexed_async_connection().await { + // SETNX returns true (1) if key was set (first time), false (0) if key existed (duplicate) + let is_new_message: bool = redis::cmd("SET") + .arg(&message_id_key) + .arg("1") + .arg("NX") // Only set if not exists + .arg("EX") + .arg("300") // 5 minutes TTL + .query_async(&mut conn) + .await + .unwrap_or(false); + + if !is_new_message { + info!("Skipping duplicate WhatsApp message ID: {}", message.id); + return Ok(()); + } + } + } + let phone = contact_phone .clone() .unwrap_or_else(|| message.from.clone()); @@ -278,6 +315,44 @@ async fn process_incoming_message( return Ok(()); } + // Handle /clear command - available to all users + if content.trim().to_lowercase() == "/clear" { + let adapter = WhatsAppAdapter::new(state.conn.clone(), *bot_id); + + // Find and clear the user's session + match find_or_create_session(&state, bot_id, &phone, &name).await { + Ok((session, _)) => { + // Clear message history for this session + if let Err(e) = clear_session_history(&state, &session.id).await { + error!("Failed to clear session history: {}", e); + } + + let bot_response = BotResponse { + bot_id: bot_id.to_string(), + session_id: session.id.to_string(), + user_id: phone.clone(), + channel: "whatsapp".to_string(), + content: "🧹 Histórico de conversa limpo! Posso ajudar com algo novo?".to_string(), + message_type: MessageType::BOT_RESPONSE, + stream_token: None, + is_complete: true, + suggestions: vec![], + context_name: None, + context_length: 0, + context_max_length: 0, + }; + if let Err(e) = adapter.send_message(bot_response).await { + error!("Failed to send clear confirmation: {}", e); + } + info!("Cleared conversation history for WhatsApp user {}", phone); + } + Err(e) => { + error!("Failed to get session for /clear: {}", e); + } + } + return Ok(()); + } + if content.starts_with('/') { if let Some(response) = process_attendant_command(&state, &phone, &content).await { let adapter = WhatsAppAdapter::new(state.conn.clone(), *bot_id); @@ -551,6 +626,31 @@ async fn find_or_create_session( Ok(result) } +async fn clear_session_history( + state: &Arc, + session_id: &Uuid, +) -> Result<(), Box> { + let conn = state.conn.clone(); + let session_id_copy = *session_id; + + tokio::task::spawn_blocking(move || { + let mut db_conn = conn.get().map_err(|e| format!("DB error: {}", e))?; + + use crate::core::shared::models::schema::message_history::dsl::*; + + diesel::delete(message_history.filter(session_id.eq(session_id_copy))) + .execute(&mut db_conn) + .map_err(|e| format!("Delete messages error: {}", e))?; + + info!("Cleared message history for session {}", session_id_copy); + Ok::<(), String>(()) + }) + .await + .map_err(|e| format!("Task error: {}", e))??; + + Ok(()) +} + fn check_needs_human(session: &UserSession) -> bool { if let Some(needs_human) = session.context_data.get("needs_human") { return needs_human.as_bool().unwrap_or(false); @@ -596,6 +696,55 @@ async fn route_to_bot( tokio::spawn(async move { let mut buffer = String::new(); + const MAX_WHATSAPP_LENGTH: usize = 4000; + const MIN_FLUSH_PARAGRAPHS: usize = 3; + + /// Check if a line is a list item + fn is_list_item(line: &str) -> bool { + let trimmed = line.trim(); + trimmed.starts_with("- ") + || trimmed.starts_with("* ") + || trimmed.starts_with("• ") + || trimmed.chars().next().map(|c| c.is_numeric()).unwrap_or(false) + } + + /// Check if buffer contains a list (any line starting with list marker) + fn contains_list(text: &str) -> bool { + text.lines().any(is_list_item) + } + + /// Send a WhatsApp message part + async fn send_part( + adapter: &crate::core::bot::channels::whatsapp::WhatsAppAdapter, + phone: &str, + content: String, + is_final: bool, + ) { + if content.trim().is_empty() { + return; + } + let wa_response = crate::core::shared::models::BotResponse { + bot_id: String::new(), + user_id: phone.to_string(), + session_id: String::new(), + channel: "whatsapp".to_string(), + content, + message_type: crate::core::shared::models::MessageType::BOT_RESPONSE, + stream_token: None, + is_complete: is_final, + suggestions: vec![], + context_name: None, + context_length: 0, + context_max_length: 0, + }; + + if let Err(e) = adapter.send_message(wa_response).await { + log::error!("Failed to send WhatsApp response part: {}", e); + } + + // Small delay between parts to avoid rate limiting + tokio::time::sleep(tokio::time::Duration::from_millis(300)).await; + } while let Some(response) = rx.recv().await { let is_final = response.is_complete; @@ -604,20 +753,61 @@ async fn route_to_bot( buffer.push_str(&response.content); } - // Only send when the complete message is ready - // This ensures lists and all content are sent as one complete message - if is_final && !buffer.is_empty() { - let mut wa_response = response; - wa_response.user_id.clone_from(&phone); - wa_response.channel = "whatsapp".to_string(); - wa_response.content = buffer.clone(); - wa_response.is_complete = true; + // SIMPLE LOGIC: + // 1. If buffer contains a list, ONLY flush when is_final or too long + // 2. If no list, use normal paragraph-based flushing - if let Err(e) = adapter_for_send.send_message(wa_response).await { - error!("Failed to send WhatsApp response: {}", e); + let has_list = contains_list(&buffer); + + debug!( + "WA stream: is_final={}, has_list={}, buffer_len={}, buffer_preview={:?}", + is_final, has_list, buffer.len(), &buffer.chars().take(100).collect::() + ); + + if has_list { + // With lists: only flush when final or too long + // This ensures the ENTIRE list is sent as one message + if is_final || buffer.len() >= MAX_WHATSAPP_LENGTH { + info!("WA sending list message, len={}", buffer.len()); + if buffer.len() > MAX_WHATSAPP_LENGTH { + let parts = adapter_for_send.split_message_smart(&buffer, MAX_WHATSAPP_LENGTH); + for part in parts { + send_part(&adapter_for_send, &phone, part, is_final).await; + } + } else { + send_part(&adapter_for_send, &phone, buffer.clone(), is_final).await; + } + buffer.clear(); + } else { + debug!("WA waiting for more list content (buffer len={})", buffer.len()); } + // Otherwise: wait for more content (don't flush mid-list) + } else { + // No list: use normal paragraph-based flushing + let paragraph_count = buffer + .split("\n\n") + .filter(|p| !p.trim().is_empty()) + .count(); - buffer.clear(); + let ends_with_paragraph = buffer.ends_with("\n\n") || + (buffer.ends_with('\n') && buffer.len() > 1 && !buffer[..buffer.len()-1].ends_with('\n')); + + let should_flush = buffer.len() >= MAX_WHATSAPP_LENGTH || + (paragraph_count >= MIN_FLUSH_PARAGRAPHS && ends_with_paragraph) || + (is_final && !buffer.is_empty()); + + if should_flush { + info!("WA sending non-list message, len={}, paragraphs={}", buffer.len(), paragraph_count); + if buffer.len() > MAX_WHATSAPP_LENGTH { + let parts = adapter_for_send.split_message_smart(&buffer, MAX_WHATSAPP_LENGTH); + for part in parts { + send_part(&adapter_for_send, &phone, part, is_final).await; + } + } else { + send_part(&adapter_for_send, &phone, buffer.clone(), is_final).await; + } + buffer.clear(); + } } } });