From ad4aca21ffec3de5db510317aa1e366a15c50499 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Tue, 10 Mar 2026 21:18:14 -0300 Subject: [PATCH] feat: implement WhatsApp bursting rate limit and consume activation tokens --- src/basic/keywords/on_change.rs | 1 + src/core/bot/channels/whatsapp_queue.rs | 87 +++++++++++-------- src/core/bot/channels/whatsapp_queue_tests.rs | 38 +------- 3 files changed, 56 insertions(+), 70 deletions(-) diff --git a/src/basic/keywords/on_change.rs b/src/basic/keywords/on_change.rs index 4b4f15e6..314306a0 100644 --- a/src/basic/keywords/on_change.rs +++ b/src/basic/keywords/on_change.rs @@ -176,6 +176,7 @@ pub fn fetch_folder_changes( #[cfg(test)] mod tests { use super::*; + use std::path::Path; fn apply_filters(events: Vec, filters: &Option) -> Vec { let Some(ref filters) = filters else { diff --git a/src/core/bot/channels/whatsapp_queue.rs b/src/core/bot/channels/whatsapp_queue.rs index 335aecef..aa56ebeb 100644 --- a/src/core/bot/channels/whatsapp_queue.rs +++ b/src/core/bot/channels/whatsapp_queue.rs @@ -1,7 +1,8 @@ //! WhatsApp Message Queue //! -//! Implements a Redis-backed queue for WhatsApp messages to enforce -//! Meta's official rate limits: 1 message per 6 seconds per recipient. +//! Enforces Meta's bursting rules (up to 45 msgs in a burst window) and +//! handles cooling off (steady state 1 msg/6s) and rate limit errors (131056) +//! with exponential backoff 4^X. use log::{error, info, warn}; use redis::AsyncCommands; @@ -26,8 +27,9 @@ pub struct WhatsAppMessageQueue { impl WhatsAppMessageQueue { const QUEUE_KEY: &'static str = "whatsapp:message_queue"; - const LAST_SENT_PREFIX: &'static str = "whatsapp:last_sent:"; - const MIN_DELAY_SECS: i64 = 6; + const TFT_PREFIX: &'static str = "whatsapp:tft:"; // Theoretical Finish Time + const BURST_CAPACITY: i64 = 45; + const RATE_SECS: i64 = 6; pub fn new(redis_url: &str) -> Result { Ok(Self { @@ -45,7 +47,7 @@ impl WhatsAppMessageQueue { } pub async fn start_worker(self: Arc) { - info!("WhatsApp queue worker started (Meta rate: 1 msg/6s per recipient)"); + info!("WhatsApp queue worker started (Burst: up to 45 msgs in 6s per recipient)"); loop { if let Err(e) = self.process_next().await { error!("WhatsApp queue worker error: {}", e); @@ -57,38 +59,32 @@ impl WhatsAppMessageQueue { async fn process_next(&self) -> Result<(), Box> { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; - let result: Option<(String, String)> = conn.blpop(Self::QUEUE_KEY, 5.0).await?; + let result: Option<(String, String)> = conn.blpop::<&str, Option<(String, String)>>(Self::QUEUE_KEY, 5.0).await?; if let Some((_key, json)) = result { let msg: QueuedWhatsAppMessage = serde_json::from_str(&json)?; - // Wait for rate limit (stored in Redis) + // 1. Proactive Rate Limiting (Burst 45, steady state 1/6s) self.wait_for_rate_limit(&msg.to, &mut conn).await?; - // Send with retry logic - let mut retry_count = 0; + // 2. Reactive Retry Logic (4^X for error 131056) + let mut x = 0; loop { match self.send_message(&msg).await { - Ok(_) => { - // Update last sent time in Redis - let last_sent_key = format!("{}{}", Self::LAST_SENT_PREFIX, msg.to); - let now = chrono::Utc::now().timestamp(); - let _: () = conn.set(last_sent_key, now).await?; - break; - } + Ok(_) => break, Err(e) => { let error_str = e.to_string(); if error_str.contains("131056") { - let wait_secs = 4_i64.pow(retry_count); - warn!("Rate limit hit for {}, retrying in {}s", msg.to, wait_secs); - sleep(Duration::from_secs(wait_secs as u64)).await; - retry_count += 1; - if retry_count > 5 { - error!("Max retries for {}: {}", msg.to, e); + let wait_secs = 4_u64.pow(x as u32); + warn!("WhatsApp 131056 rate limit for {}: retrying in 4^{} = {}s", msg.to, x, wait_secs); + sleep(Duration::from_secs(wait_secs)).await; + x += 1; + if x > 5 { + error!("Max retries (4^5) exceeded for {}: {}", msg.to, e); break; } } else { - error!("Failed to send to {}: {}", msg.to, e); + error!("WhatsApp send failure for {}: {}", msg.to, e); break; } } @@ -99,26 +95,47 @@ impl WhatsAppMessageQueue { Ok(()) } + /// Implements Virtual Clock / Leaky Bucket for Meta's Bursting Rules. + /// Capacity: 45 messages (represented as 45 * 6s = 270s of "debt"). + /// Steady rate: 1 message per 6 seconds. async fn wait_for_rate_limit(&self, recipient: &str, conn: &mut redis::aio::MultiplexedConnection) -> Result<(), Box> { - let last_sent_key = format!("{}{}", Self::LAST_SENT_PREFIX, recipient); + let tft_key = format!("{}{}", Self::TFT_PREFIX, recipient); - // Get last sent time from Redis - let last_sent: Option = conn.get(&last_sent_key).await?; + let now = chrono::Utc::now().timestamp(); + let tft: i64 = conn.get::<_, Option>(&tft_key).await?.unwrap_or(0); - if let Some(last_ts) = last_sent { - let now = chrono::Utc::now().timestamp(); - let since_last = now - last_ts; - - if since_last < Self::MIN_DELAY_SECS { - let wait_time = Self::MIN_DELAY_SECS - since_last; - warn!("Rate limiting {}: waiting {}s", recipient, wait_time); - sleep(Duration::from_secs(wait_time as u64)).await; + // Max "borrowing" is 45 messages * 6s = 270s + let max_debt_secs = Self::BURST_CAPACITY * Self::RATE_SECS; + + let mut wait_secs = 0; + let mut new_tft = if tft > now { + // Recipient is in debt + let debt = tft - now; + if debt + Self::RATE_SECS > max_debt_secs { + // Next message would exceed burst capacity + wait_secs = (debt + Self::RATE_SECS) - max_debt_secs; + tft + Self::RATE_SECS + } else { + tft + Self::RATE_SECS } + } else { + // Recipient has no active debt + now + Self::RATE_SECS + }; + + if wait_secs > 0 { + warn!("Burst capacity exhausted for {}: waiting {}s cooling off", recipient, wait_secs); + sleep(Duration::from_secs(wait_secs as u64)).await; + // Advance TFT if we waited (now has changed) + new_tft = chrono::Utc::now().timestamp() + (new_tft - (now + wait_secs as i64)); } + // Store the new Theoretical Finish Time with TTL to clean up Redis + let _: () = conn.set(&tft_key, new_tft).await?; + let _: () = conn.expire(&tft_key, max_debt_secs + 3600).await?; + Ok(()) } - async fn send_message(&self, msg: &QueuedWhatsAppMessage) -> Result<(), Box> { let client = reqwest::Client::new(); let url = format!( diff --git a/src/core/bot/channels/whatsapp_queue_tests.rs b/src/core/bot/channels/whatsapp_queue_tests.rs index 0064a7da..33f96596 100644 --- a/src/core/bot/channels/whatsapp_queue_tests.rs +++ b/src/core/bot/channels/whatsapp_queue_tests.rs @@ -1,6 +1,6 @@ #[cfg(test)] mod tests { - use super::super::whatsapp_queue::*; + use crate::core::bot::channels::whatsapp_queue::*; use std::sync::Arc; use tokio::time::Duration; @@ -20,40 +20,8 @@ mod tests { assert!(result.is_ok()); } - #[tokio::test] - async fn test_per_recipient_rate_limit() { - let queue = Arc::new(WhatsAppMessageQueue::new("redis://127.0.0.1:6379").unwrap()); - - let msg1 = QueuedWhatsAppMessage { - to: "+5511999999999".to_string(), - message: "Message 1".to_string(), - api_key: "test_key".to_string(), - phone_number_id: "123456".to_string(), - api_version: "v17.0".to_string(), - }; - - let msg2 = QueuedWhatsAppMessage { - to: "+5511999999999".to_string(), - message: "Message 2".to_string(), - api_key: "test_key".to_string(), - phone_number_id: "123456".to_string(), - api_version: "v17.0".to_string(), - }; - - queue.enqueue(msg1).await.unwrap(); - queue.enqueue(msg2).await.unwrap(); - - let start = std::time::Instant::now(); - - // Process both messages - let _ = queue.process_next().await; - let _ = queue.process_next().await; - - let elapsed = start.elapsed(); - - // Should take at least 6 seconds due to Meta rate limiting - assert!(elapsed >= Duration::from_secs(6)); - } + // Note: test_per_recipient_rate_limit removed because we now allow per-recipient bursting. + // Throttling is handled reactively via 131056 error code. #[tokio::test] async fn test_different_recipients_no_delay() {