diff --git a/src/core/bot/channels/mod.rs b/src/core/bot/channels/mod.rs index d5a54d7d..3de30ac2 100644 --- a/src/core/bot/channels/mod.rs +++ b/src/core/bot/channels/mod.rs @@ -2,6 +2,7 @@ pub mod instagram; pub mod teams; pub mod telegram; pub mod whatsapp; +pub mod whatsapp_queue; pub mod whatsapp_rate_limiter; use crate::core::shared::models::BotResponse; diff --git a/src/core/bot/channels/whatsapp.rs b/src/core/bot/channels/whatsapp.rs index bd0dd046..ab139c63 100644 --- a/src/core/bot/channels/whatsapp.rs +++ b/src/core/bot/channels/whatsapp.rs @@ -4,13 +4,14 @@ use serde::{Deserialize, Serialize}; use uuid::Uuid; use crate::core::bot::channels::ChannelAdapter; -use crate::core::bot::channels::whatsapp_rate_limiter::WhatsAppRateLimiter; +use crate::core::bot::channels::whatsapp_queue::{QueuedWhatsAppMessage, WhatsAppMessageQueue}; use crate::core::config::ConfigManager; use crate::core::shared::models::BotResponse; use crate::core::shared::utils::DbPool; +use std::sync::Arc; -/// Global rate limiter for WhatsApp API (shared across all adapters) -static WHATSAPP_RATE_LIMITER: std::sync::OnceLock = std::sync::OnceLock::new(); +/// Global WhatsApp message queue (shared across all adapters) +static WHATSAPP_QUEUE: std::sync::OnceLock> = std::sync::OnceLock::new(); #[derive(Debug, Clone)] pub struct WhatsAppAdapter { @@ -19,12 +20,12 @@ pub struct WhatsAppAdapter { webhook_verify_token: String, _business_account_id: String, api_version: String, - rate_limiter: &'static WhatsAppRateLimiter, + queue: &'static Arc, } impl WhatsAppAdapter { pub fn new(pool: DbPool, bot_id: Uuid) -> Self { - let config_manager = ConfigManager::new(pool); + let config_manager = ConfigManager::new(pool.clone()); let api_key = config_manager .get_config(&bot_id, "whatsapp-api-key", None) @@ -46,17 +47,10 @@ impl WhatsAppAdapter { .get_config(&bot_id, "whatsapp-api-version", Some("v17.0")) .unwrap_or_else(|_| "v17.0".to_string()); - // Get rate limit tier from config (default to Tier 1 for safety) - let tier_str = config_manager - .get_config(&bot_id, "whatsapp-rate-tier", None) - .unwrap_or_else(|_| "1".to_string()); - let tier = match tier_str.as_str() { - "1" | "tier1" => super::whatsapp_rate_limiter::WhatsAppTier::Tier1, - "2" | "tier2" => super::whatsapp_rate_limiter::WhatsAppTier::Tier2, - "3" | "tier3" => super::whatsapp_rate_limiter::WhatsAppTier::Tier3, - "4" | "tier4" => super::whatsapp_rate_limiter::WhatsAppTier::Tier4, - _ => super::whatsapp_rate_limiter::WhatsAppTier::Tier1, - }; + // Get Redis URL from config + let redis_url = config_manager + .get_config(&bot_id, "redis-url", Some("redis://127.0.0.1:6379")) + .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); Self { api_key, @@ -64,8 +58,18 @@ impl WhatsAppAdapter { webhook_verify_token: verify_token, _business_account_id: business_account_id, api_version, - rate_limiter: WHATSAPP_RATE_LIMITER.get_or_init(|| { - super::whatsapp_rate_limiter::WhatsAppRateLimiter::from_tier(tier) + queue: WHATSAPP_QUEUE.get_or_init(|| { + let queue = WhatsAppMessageQueue::new(&redis_url) + .unwrap_or_else(|e| { + error!("Failed to create WhatsApp queue: {}", e); + panic!("WhatsApp queue initialization failed"); + }); + let queue = Arc::new(queue); + let worker_queue = Arc::clone(&queue); + tokio::spawn(async move { + worker_queue.start_worker().await; + }); + queue }), } } @@ -134,43 +138,20 @@ impl WhatsAppAdapter { to: &str, message: &str, ) -> Result> { - // Wait for rate limiter before making API call - self.rate_limiter.acquire().await; + // Enqueue message instead of sending directly + let queued_msg = QueuedWhatsAppMessage { + to: to.to_string(), + message: message.to_string(), + api_key: self.api_key.clone(), + phone_number_id: self.phone_number_id.clone(), + api_version: self.api_version.clone(), + }; - let client = reqwest::Client::new(); + self.queue.enqueue(queued_msg).await + .map_err(|e| format!("Failed to enqueue WhatsApp message: {}", e))?; - let url = format!( - "https://graph.facebook.com/{}/{}/messages", - self.api_version, self.phone_number_id - ); - - let payload = serde_json::json!({ - "messaging_product": "whatsapp", - "to": to, - "type": "text", - "text": { - "body": message - } - }); - - let response = client - .post(&url) - .header("Authorization", format!("Bearer {}", self.api_key)) - .header("Content-Type", "application/json") - .json(&payload) - .send() - .await?; - - if response.status().is_success() { - let result: serde_json::Value = response.json().await?; - Ok(result["messages"][0]["id"] - .as_str() - .unwrap_or("") - .to_string()) - } else { - let error_text = response.text().await?; - Err(format!("WhatsApp API error: {}", error_text).into()) - } + info!("WhatsApp message enqueued for {}: {}", to, &message.chars().take(50).collect::()); + Ok("queued".to_string()) } pub async fn send_template_message( @@ -668,11 +649,7 @@ impl ChannelAdapter for WhatsAppAdapter { "WhatsApp message part {}/{} sent to {}: {} (message_id: {})", i + 1, parts.len(), response.user_id, &part.chars().take(50).collect::(), message_id ); - - // Use rate limiter to wait before sending next message - if i < parts.len() - 1 { - self.rate_limiter.acquire().await; - } + // Rate limiting is now handled inside send_whatsapp_message (per-recipient) } } diff --git a/src/core/bot/channels/whatsapp_queue.rs b/src/core/bot/channels/whatsapp_queue.rs new file mode 100644 index 00000000..e1a877bf --- /dev/null +++ b/src/core/bot/channels/whatsapp_queue.rs @@ -0,0 +1,233 @@ +//! WhatsApp Message Queue +//! +//! Implements a Redis-backed queue for WhatsApp messages to enforce +//! Meta's official rate limits: 1 message per 6 seconds per recipient (0.17 msg/sec). +//! +//! ## Meta WhatsApp Rate Limits (Official) +//! - **Base Rate**: 1 message per 6 seconds per recipient (0.17 msg/sec) +//! - **Burst Limit**: Up to 45 messages in 6 seconds (borrows from future quota) +//! - **Post-Burst**: Must wait equivalent time at normal rate +//! - **Error Code**: 131056 when rate limit exceeded +//! - **Retry Strategy**: 4^X seconds (X starts at 0, increments on each failure) + +use log::{error, info, warn}; +use redis::AsyncCommands; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::Mutex; +use tokio::time::sleep; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct QueuedWhatsAppMessage { + pub to: String, + pub message: String, + pub api_key: String, + pub phone_number_id: String, + pub api_version: String, +} + +#[derive(Debug)] +struct RecipientState { + last_sent: Instant, + burst_count: u32, + burst_started: Option, +} + +pub struct WhatsAppMessageQueue { + redis_client: redis::Client, + recipient_states: Arc>>, +} + +impl WhatsAppMessageQueue { + const QUEUE_KEY: &'static str = "whatsapp:message_queue"; + const MIN_DELAY: Duration = Duration::from_secs(6); // 1 msg per 6 seconds + const BURST_WINDOW: Duration = Duration::from_secs(6); + const MAX_BURST: u32 = 45; + + pub fn new(redis_url: &str) -> Result { + Ok(Self { + redis_client: redis::Client::open(redis_url)?, + recipient_states: Arc::new(Mutex::new(HashMap::new())), + }) + } + + pub async fn enqueue(&self, msg: QueuedWhatsAppMessage) -> Result<(), redis::RedisError> { + let mut conn = self.redis_client.get_multiplexed_async_connection().await?; + let json = serde_json::to_string(&msg).map_err(|e| { + redis::RedisError::from((redis::ErrorKind::TypeError, "JSON serialization failed", e.to_string())) + })?; + conn.rpush(Self::QUEUE_KEY, json).await?; + Ok(()) + } + + pub async fn start_worker(self: Arc) { + info!("WhatsApp queue worker started (Meta official rate: 1 msg/6s per recipient)"); + loop { + if let Err(e) = self.process_next().await { + error!("WhatsApp queue worker error: {}", e); + sleep(Duration::from_secs(1)).await; + } + } + } + + async fn process_next(&self) -> Result<(), Box> { + let mut conn = self.redis_client.get_multiplexed_async_connection().await?; + + let result: Option = conn.blpop(Self::QUEUE_KEY, 5.0).await?; + + if let Some(json) = result { + let msg: QueuedWhatsAppMessage = serde_json::from_str(&json)?; + + // Check and enforce rate limit for this recipient + self.wait_for_rate_limit(&msg.to).await; + + // Send with retry logic + let mut retry_count = 0; + loop { + match self.send_message(&msg).await { + Ok(_) => { + // Update recipient state + let mut states = self.recipient_states.lock().await; + let state = states.entry(msg.to.clone()).or_insert(RecipientState { + last_sent: Instant::now(), + burst_count: 0, + burst_started: None, + }); + + // Track burst + if let Some(burst_start) = state.burst_started { + if burst_start.elapsed() < Self::BURST_WINDOW { + state.burst_count += 1; + } else { + state.burst_count = 1; + state.burst_started = Some(Instant::now()); + } + } else { + state.burst_count = 1; + state.burst_started = Some(Instant::now()); + } + + state.last_sent = Instant::now(); + break; + } + Err(e) => { + let error_str = e.to_string(); + if error_str.contains("131056") { + // Rate limit error - exponential backoff + let wait_secs = 4_u64.pow(retry_count); + warn!("Rate limit hit for {}, retrying in {}s (attempt {})", msg.to, wait_secs, retry_count + 1); + sleep(Duration::from_secs(wait_secs)).await; + retry_count += 1; + if retry_count > 5 { + error!("Max retries exceeded for {}: {}", msg.to, e); + break; + } + } else { + error!("Failed to send WhatsApp message to {}: {}", msg.to, e); + break; + } + } + } + } + } + + Ok(()) + } + + async fn wait_for_rate_limit(&self, recipient: &str) { + let mut states = self.recipient_states.lock().await; + let state = states.entry(recipient.to_string()).or_insert(RecipientState { + last_sent: Instant::now() - Self::MIN_DELAY, + burst_count: 0, + burst_started: None, + }); + + let now = Instant::now(); + + // Reset burst if window expired + if let Some(burst_start) = state.burst_started { + if burst_start.elapsed() >= Self::BURST_WINDOW { + state.burst_count = 0; + state.burst_started = None; + } + } + + // Check if we can send in burst mode (within 6-second window) + if let Some(burst_start) = state.burst_started { + if burst_start.elapsed() < Self::BURST_WINDOW && state.burst_count < Self::MAX_BURST { + // Can send immediately in burst mode + return; + } + } + + // Check if in burst cooldown + if state.burst_count > 0 { + let cooldown = Self::MIN_DELAY * state.burst_count; + let elapsed = state.last_sent.elapsed(); + if elapsed < cooldown { + let wait_time = cooldown - elapsed; + warn!("Burst cooldown for {}: waiting {:?} (sent {} msgs)", recipient, wait_time, state.burst_count); + drop(states); + sleep(wait_time).await; + return; + } else { + // Cooldown complete, reset burst + let mut states = self.recipient_states.lock().await; + if let Some(s) = states.get_mut(recipient) { + s.burst_count = 0; + s.burst_started = None; + } + drop(states); + } + } + + // Normal rate limit: 6 seconds between messages (if not in burst) + let elapsed = state.last_sent.elapsed(); + if elapsed < Self::MIN_DELAY { + let wait_time = Self::MIN_DELAY - elapsed; + drop(states); + sleep(wait_time).await; + } + } + + async fn send_message(&self, msg: &QueuedWhatsAppMessage) -> Result<(), Box> { + let client = reqwest::Client::new(); + let url = format!( + "https://graph.facebook.com/{}/{}/messages", + msg.api_version, msg.phone_number_id + ); + + let payload = serde_json::json!({ + "messaging_product": "whatsapp", + "to": msg.to, + "type": "text", + "text": { + "body": msg.message + } + }); + + let response = client + .post(&url) + .header("Authorization", format!("Bearer {}", msg.api_key)) + .header("Content-Type", "application/json") + .json(&payload) + .send() + .await?; + + if response.status().is_success() { + let result: serde_json::Value = response.json().await?; + let msg_id = result["messages"][0]["id"].as_str().unwrap_or(""); + info!("WhatsApp message sent to {}: {} (id: {})", msg.to, &msg.message.chars().take(50).collect::(), msg_id); + Ok(()) + } else { + let error_text = response.text().await?; + Err(format!("WhatsApp API error: {}", error_text).into()) + } + } +} + +#[cfg(test)] +#[path = "whatsapp_queue_tests.rs"] +mod whatsapp_queue_tests; diff --git a/src/core/bot/channels/whatsapp_queue_tests.rs b/src/core/bot/channels/whatsapp_queue_tests.rs new file mode 100644 index 00000000..0064a7da --- /dev/null +++ b/src/core/bot/channels/whatsapp_queue_tests.rs @@ -0,0 +1,121 @@ +#[cfg(test)] +mod tests { + use super::super::whatsapp_queue::*; + use std::sync::Arc; + use tokio::time::Duration; + + #[tokio::test] + async fn test_queue_enqueue() { + let queue = WhatsAppMessageQueue::new("redis://127.0.0.1:6379").unwrap(); + + let msg = QueuedWhatsAppMessage { + to: "+5511999999999".to_string(), + message: "Test message".to_string(), + api_key: "test_key".to_string(), + phone_number_id: "123456".to_string(), + api_version: "v17.0".to_string(), + }; + + let result = queue.enqueue(msg).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_per_recipient_rate_limit() { + let queue = Arc::new(WhatsAppMessageQueue::new("redis://127.0.0.1:6379").unwrap()); + + let msg1 = QueuedWhatsAppMessage { + to: "+5511999999999".to_string(), + message: "Message 1".to_string(), + api_key: "test_key".to_string(), + phone_number_id: "123456".to_string(), + api_version: "v17.0".to_string(), + }; + + let msg2 = QueuedWhatsAppMessage { + to: "+5511999999999".to_string(), + message: "Message 2".to_string(), + api_key: "test_key".to_string(), + phone_number_id: "123456".to_string(), + api_version: "v17.0".to_string(), + }; + + queue.enqueue(msg1).await.unwrap(); + queue.enqueue(msg2).await.unwrap(); + + let start = std::time::Instant::now(); + + // Process both messages + let _ = queue.process_next().await; + let _ = queue.process_next().await; + + let elapsed = start.elapsed(); + + // Should take at least 6 seconds due to Meta rate limiting + assert!(elapsed >= Duration::from_secs(6)); + } + + #[tokio::test] + async fn test_different_recipients_no_delay() { + let queue = Arc::new(WhatsAppMessageQueue::new("redis://127.0.0.1:6379").unwrap()); + + let msg1 = QueuedWhatsAppMessage { + to: "+5511999999999".to_string(), + message: "Message 1".to_string(), + api_key: "test_key".to_string(), + phone_number_id: "123456".to_string(), + api_version: "v17.0".to_string(), + }; + + let msg2 = QueuedWhatsAppMessage { + to: "+5511888888888".to_string(), + message: "Message 2".to_string(), + api_key: "test_key".to_string(), + phone_number_id: "123456".to_string(), + api_version: "v17.0".to_string(), + }; + + queue.enqueue(msg1).await.unwrap(); + queue.enqueue(msg2).await.unwrap(); + + let start = std::time::Instant::now(); + + // Process both messages (different recipients) + let _ = queue.process_next().await; + let _ = queue.process_next().await; + + let elapsed = start.elapsed(); + + // Should be fast since different recipients + assert!(elapsed < Duration::from_millis(500)); + } + + #[tokio::test] + async fn test_burst_mode_within_window() { + let queue = Arc::new(WhatsAppMessageQueue::new("redis://127.0.0.1:6379").unwrap()); + + // Send 3 messages to same recipient in quick succession + for i in 1..=3 { + let msg = QueuedWhatsAppMessage { + to: "+5511999999999".to_string(), + message: format!("Burst message {}", i), + api_key: "test_key".to_string(), + phone_number_id: "123456".to_string(), + api_version: "v17.0".to_string(), + }; + queue.enqueue(msg).await.unwrap(); + } + + let start = std::time::Instant::now(); + + // Process all 3 messages + let _ = queue.process_next().await; + let _ = queue.process_next().await; + let _ = queue.process_next().await; + + let elapsed = start.elapsed(); + + // Should complete in burst mode (< 6 seconds, not 18 seconds) + assert!(elapsed < Duration::from_secs(6)); + } +} diff --git a/src/core/bot/channels/whatsapp_rate_limiter.rs b/src/core/bot/channels/whatsapp_rate_limiter.rs index 8c8bfecf..9418b175 100644 --- a/src/core/bot/channels/whatsapp_rate_limiter.rs +++ b/src/core/bot/channels/whatsapp_rate_limiter.rs @@ -19,9 +19,11 @@ use governor::{ state::{InMemoryState, NotKeyed}, Quota, RateLimiter, }; +use std::collections::HashMap; use std::num::NonZeroU32; use std::sync::Arc; use std::time::Duration; +use tokio::sync::Mutex; use tokio::time::sleep; /// WhatsApp throughput tier levels (matches Meta's tiers) @@ -146,11 +148,13 @@ type Limiter = RateLimiter, config: WhatsAppRateLimitConfig, min_delay: Duration, + per_recipient_limiters: Arc>>>, } impl WhatsAppRateLimiter { @@ -181,6 +185,7 @@ impl WhatsAppRateLimiter { limiter: Arc::new(RateLimiter::direct(quota)), config, min_delay, + per_recipient_limiters: Arc::new(Mutex::new(HashMap::new())), } } @@ -212,6 +217,38 @@ impl WhatsAppRateLimiter { } } + /// Wait until a message can be sent to a specific recipient (async) + /// + /// Enforces 1 message per second per phone number (Meta requirement). + pub async fn acquire_for_recipient(&self, phone_number: &str) { + if !self.config.enabled { + return; + } + + // Get or create per-recipient limiter (1 msg/sec) + let recipient_limiter = { + let mut limiters = self.per_recipient_limiters.lock().await; + limiters + .entry(phone_number.to_string()) + .or_insert_with(|| { + let quota = Quota::per_second(NonZeroU32::new(1).unwrap()); + Arc::new(RateLimiter::direct(quota)) + }) + .clone() + }; + + // Wait for recipient-specific rate limit + loop { + if recipient_limiter.check().is_ok() { + break; + } + sleep(Duration::from_millis(100)).await; + } + + // Also wait for global rate limit + self.acquire().await; + } + /// Try to acquire with timeout /// /// Returns true if acquired, false if timed out @@ -262,6 +299,7 @@ impl Clone for WhatsAppRateLimiter { limiter: Arc::clone(&self.limiter), config: self.config.clone(), min_delay: self.min_delay, + per_recipient_limiters: Arc::clone(&self.per_recipient_limiters), } } } diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index 7535d714..f5d986f3 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -473,6 +473,13 @@ impl ConfigManager { let mut conn = self .get_conn() .map_err(|e| format!("Failed to acquire connection: {}", e))?; + + // Delete all existing config keys for this bot to ensure clean sync + diesel::sql_query("DELETE FROM bot_configuration WHERE bot_id = $1") + .bind::(bot_id) + .execute(&mut conn) + .map_err(|e| format!("Failed to delete existing config: {}", e))?; + let mut updated = 0; for line in content.lines().skip(1) { let parts: Vec<&str> = line.split(',').collect(); @@ -480,13 +487,13 @@ impl ConfigManager { let key = parts[0].trim(); let value = parts[1].trim(); let new_id: uuid::Uuid = uuid::Uuid::new_v4(); - diesel::sql_query("INSERT INTO bot_configuration (id, bot_id, config_key, config_value, config_type) VALUES ($1, $2, $3, $4, 'string') ON CONFLICT (bot_id, config_key) DO UPDATE SET config_value = EXCLUDED.config_value, updated_at = NOW()") + diesel::sql_query("INSERT INTO bot_configuration (id, bot_id, config_key, config_value, config_type) VALUES ($1, $2, $3, $4, 'string')") .bind::(new_id) .bind::(bot_id) .bind::(key) .bind::(value) .execute(&mut conn) - .map_err(|e| format!("Failed to update config: {}", e))?; + .map_err(|e| format!("Failed to insert config: {}", e))?; updated += 1; } }