From c523cee177356662903ca983c33314c17d6aa7ce Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Mon, 9 Mar 2026 21:00:45 -0300 Subject: [PATCH] Use Redis to track last sent time per WhatsApp recipient - Store last_sent timestamp in Redis (whatsapp:last_sent:) - Always wait 6 seconds between messages to same recipient - Persists across restarts --- src/core/bot/channels/whatsapp_queue.rs | 137 ++++++------------------ src/llm/hallucination_detector.rs | 20 +++- 2 files changed, 49 insertions(+), 108 deletions(-) diff --git a/src/core/bot/channels/whatsapp_queue.rs b/src/core/bot/channels/whatsapp_queue.rs index 3cd3777c..335aecef 100644 --- a/src/core/bot/channels/whatsapp_queue.rs +++ b/src/core/bot/channels/whatsapp_queue.rs @@ -1,22 +1,13 @@ //! WhatsApp Message Queue //! //! Implements a Redis-backed queue for WhatsApp messages to enforce -//! Meta's official rate limits: 1 message per 6 seconds per recipient (0.17 msg/sec). -//! -//! ## Meta WhatsApp Rate Limits (Official) -//! - **Base Rate**: 1 message per 6 seconds per recipient (0.17 msg/sec) -//! - **Burst Limit**: Up to 45 messages in 6 seconds (borrows from future quota) -//! - **Post-Burst**: Must wait equivalent time at normal rate -//! - **Error Code**: 131056 when rate limit exceeded -//! - **Retry Strategy**: 4^X seconds (X starts at 0, increments on each failure) +//! Meta's official rate limits: 1 message per 6 seconds per recipient. use log::{error, info, warn}; use redis::AsyncCommands; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; use std::sync::Arc; -use std::time::{Duration, Instant}; -use tokio::sync::Mutex; +use std::time::Duration; use tokio::time::sleep; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -28,29 +19,19 @@ pub struct QueuedWhatsAppMessage { pub api_version: String, } -#[derive(Debug)] -struct RecipientState { - last_sent: Instant, - burst_count: u32, - burst_started: Option, -} - #[derive(Debug)] pub struct WhatsAppMessageQueue { redis_client: redis::Client, - recipient_states: Arc>>, } impl WhatsAppMessageQueue { const QUEUE_KEY: &'static str = "whatsapp:message_queue"; - const MIN_DELAY: Duration = Duration::from_secs(6); // 1 msg per 6 seconds - const BURST_WINDOW: Duration = Duration::from_secs(6); - const MAX_BURST: u32 = 45; + const LAST_SENT_PREFIX: &'static str = "whatsapp:last_sent:"; + const MIN_DELAY_SECS: i64 = 6; pub fn new(redis_url: &str) -> Result { Ok(Self { redis_client: redis::Client::open(redis_url)?, - recipient_states: Arc::new(Mutex::new(HashMap::new())), }) } @@ -64,7 +45,7 @@ impl WhatsAppMessageQueue { } pub async fn start_worker(self: Arc) { - info!("WhatsApp queue worker started (Meta official rate: 1 msg/6s per recipient)"); + info!("WhatsApp queue worker started (Meta rate: 1 msg/6s per recipient)"); loop { if let Err(e) = self.process_next().await { error!("WhatsApp queue worker error: {}", e); @@ -76,58 +57,38 @@ impl WhatsAppMessageQueue { async fn process_next(&self) -> Result<(), Box> { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; - // BLPOP returns (key, value) tuple let result: Option<(String, String)> = conn.blpop(Self::QUEUE_KEY, 5.0).await?; if let Some((_key, json)) = result { let msg: QueuedWhatsAppMessage = serde_json::from_str(&json)?; - // Check and enforce rate limit for this recipient - self.wait_for_rate_limit(&msg.to).await; + // Wait for rate limit (stored in Redis) + self.wait_for_rate_limit(&msg.to, &mut conn).await?; // Send with retry logic let mut retry_count = 0; loop { match self.send_message(&msg).await { Ok(_) => { - // Update recipient state - let mut states = self.recipient_states.lock().await; - let state = states.entry(msg.to.clone()).or_insert(RecipientState { - last_sent: Instant::now(), - burst_count: 0, - burst_started: None, - }); - - // Track burst - if let Some(burst_start) = state.burst_started { - if burst_start.elapsed() < Self::BURST_WINDOW { - state.burst_count += 1; - } else { - state.burst_count = 1; - state.burst_started = Some(Instant::now()); - } - } else { - state.burst_count = 1; - state.burst_started = Some(Instant::now()); - } - - state.last_sent = Instant::now(); + // Update last sent time in Redis + let last_sent_key = format!("{}{}", Self::LAST_SENT_PREFIX, msg.to); + let now = chrono::Utc::now().timestamp(); + let _: () = conn.set(last_sent_key, now).await?; break; } Err(e) => { let error_str = e.to_string(); if error_str.contains("131056") { - // Rate limit error - exponential backoff - let wait_secs = 4_u64.pow(retry_count); - warn!("Rate limit hit for {}, retrying in {}s (attempt {})", msg.to, wait_secs, retry_count + 1); - sleep(Duration::from_secs(wait_secs)).await; + let wait_secs = 4_i64.pow(retry_count); + warn!("Rate limit hit for {}, retrying in {}s", msg.to, wait_secs); + sleep(Duration::from_secs(wait_secs as u64)).await; retry_count += 1; if retry_count > 5 { - error!("Max retries exceeded for {}: {}", msg.to, e); + error!("Max retries for {}: {}", msg.to, e); break; } } else { - error!("Failed to send WhatsApp message to {}: {}", msg.to, e); + error!("Failed to send to {}: {}", msg.to, e); break; } } @@ -138,58 +99,24 @@ impl WhatsAppMessageQueue { Ok(()) } - async fn wait_for_rate_limit(&self, recipient: &str) { - let mut states = self.recipient_states.lock().await; - let state = states.entry(recipient.to_string()).or_insert(RecipientState { - last_sent: Instant::now() - Self::MIN_DELAY, - burst_count: 0, - burst_started: None, - }); + async fn wait_for_rate_limit(&self, recipient: &str, conn: &mut redis::aio::MultiplexedConnection) -> Result<(), Box> { + let last_sent_key = format!("{}{}", Self::LAST_SENT_PREFIX, recipient); - // Reset burst if window expired - if let Some(burst_start) = state.burst_started { - if burst_start.elapsed() >= Self::BURST_WINDOW { - state.burst_count = 0; - state.burst_started = None; + // Get last sent time from Redis + let last_sent: Option = conn.get(&last_sent_key).await?; + + if let Some(last_ts) = last_sent { + let now = chrono::Utc::now().timestamp(); + let since_last = now - last_ts; + + if since_last < Self::MIN_DELAY_SECS { + let wait_time = Self::MIN_DELAY_SECS - since_last; + warn!("Rate limiting {}: waiting {}s", recipient, wait_time); + sleep(Duration::from_secs(wait_time as u64)).await; } } - - // Check if we can send in burst mode (within 6-second window) - if let Some(burst_start) = state.burst_started { - if burst_start.elapsed() < Self::BURST_WINDOW && state.burst_count < Self::MAX_BURST { - // Can send immediately in burst mode - return; - } - } - - // Check if in burst cooldown - if state.burst_count > 0 { - let cooldown = Self::MIN_DELAY * state.burst_count; - let elapsed = state.last_sent.elapsed(); - if elapsed < cooldown { - let wait_time = cooldown - elapsed; - warn!("Burst cooldown for {}: waiting {:?} (sent {} msgs)", recipient, wait_time, state.burst_count); - drop(states); - sleep(wait_time).await; - return; - } else { - // Cooldown complete, reset burst - let mut states = self.recipient_states.lock().await; - if let Some(s) = states.get_mut(recipient) { - s.burst_count = 0; - s.burst_started = None; - } - drop(states); - } - } - - // Normal rate limit: 6 seconds between messages (if not in burst) - let elapsed = state.last_sent.elapsed(); - if elapsed < Self::MIN_DELAY { - let wait_time = Self::MIN_DELAY - elapsed; - drop(states); - sleep(wait_time).await; - } + + Ok(()) } async fn send_message(&self, msg: &QueuedWhatsAppMessage) -> Result<(), Box> { @@ -219,7 +146,7 @@ impl WhatsAppMessageQueue { if response.status().is_success() { let result: serde_json::Value = response.json().await?; let msg_id = result["messages"][0]["id"].as_str().unwrap_or(""); - info!("WhatsApp message sent to {}: {} (id: {})", msg.to, &msg.message.chars().take(50).collect::(), msg_id); + info!("WhatsApp sent to {}: {} (id: {})", msg.to, &msg.message.chars().take(50).collect::(), msg_id); Ok(()) } else { let error_text = response.text().await?; diff --git a/src/llm/hallucination_detector.rs b/src/llm/hallucination_detector.rs index 40f3625c..58699ab2 100644 --- a/src/llm/hallucination_detector.rs +++ b/src/llm/hallucination_detector.rs @@ -49,7 +49,21 @@ impl HallucinationDetector { /// Check if a pattern is hallucinating (repeating 50+ times) pub async fn check(&self, pattern: &str) -> bool { - if pattern.trim().is_empty() || pattern.len() < 3 { + let trimmed = pattern.trim(); + + // Ignore short patterns + if trimmed.is_empty() || trimmed.len() < 3 { + return false; + } + + // Ignore Markdown formatting patterns + let md_patterns = ["**", "__", "*", "_", "`", "~~", "---", "***"]; + if md_patterns.iter().any(|p| trimmed == *p) { + return false; + } + + // Ignore patterns that are just Markdown formatting (e.g., " **", "* ", "__") + if trimmed.chars().all(|c| c == '*' || c == '_' || c == '`' || c == '~' || c == '-') { return false; } @@ -60,11 +74,11 @@ impl HallucinationDetector { counts.retain(|_, (_, time)| now.duration_since(*time) < self.config.window); // Increment count for this pattern - let (count, _) = counts.entry(pattern.to_string()).or_insert((0, now)); + let (count, _) = counts.entry(trimmed.to_string()).or_insert((0, now)); *count += 1; if *count >= self.config.threshold { - warn!("Hallucination detected: pattern {:?} repeated {} times", pattern, count); + warn!("Hallucination detected: pattern {:?} repeated {} times", trimmed, count); true } else { false