feat: implement WhatsApp bursting rate limit and consume activation tokens
All checks were successful
BotServer CI / build (push) Successful in 11m4s
All checks were successful
BotServer CI / build (push) Successful in 11m4s
This commit is contained in:
parent
e98de24fe6
commit
ad4aca21ff
3 changed files with 56 additions and 70 deletions
|
|
@ -176,6 +176,7 @@ pub fn fetch_folder_changes(
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use std::path::Path;
|
||||||
|
|
||||||
fn apply_filters(events: Vec<FolderChangeEvent>, filters: &Option<FileFilters>) -> Vec<FolderChangeEvent> {
|
fn apply_filters(events: Vec<FolderChangeEvent>, filters: &Option<FileFilters>) -> Vec<FolderChangeEvent> {
|
||||||
let Some(ref filters) = filters else {
|
let Some(ref filters) = filters else {
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,8 @@
|
||||||
//! WhatsApp Message Queue
|
//! WhatsApp Message Queue
|
||||||
//!
|
//!
|
||||||
//! Implements a Redis-backed queue for WhatsApp messages to enforce
|
//! Enforces Meta's bursting rules (up to 45 msgs in a burst window) and
|
||||||
//! Meta's official rate limits: 1 message per 6 seconds per recipient.
|
//! handles cooling off (steady state 1 msg/6s) and rate limit errors (131056)
|
||||||
|
//! with exponential backoff 4^X.
|
||||||
|
|
||||||
use log::{error, info, warn};
|
use log::{error, info, warn};
|
||||||
use redis::AsyncCommands;
|
use redis::AsyncCommands;
|
||||||
|
|
@ -26,8 +27,9 @@ pub struct WhatsAppMessageQueue {
|
||||||
|
|
||||||
impl WhatsAppMessageQueue {
|
impl WhatsAppMessageQueue {
|
||||||
const QUEUE_KEY: &'static str = "whatsapp:message_queue";
|
const QUEUE_KEY: &'static str = "whatsapp:message_queue";
|
||||||
const LAST_SENT_PREFIX: &'static str = "whatsapp:last_sent:";
|
const TFT_PREFIX: &'static str = "whatsapp:tft:"; // Theoretical Finish Time
|
||||||
const MIN_DELAY_SECS: i64 = 6;
|
const BURST_CAPACITY: i64 = 45;
|
||||||
|
const RATE_SECS: i64 = 6;
|
||||||
|
|
||||||
pub fn new(redis_url: &str) -> Result<Self, redis::RedisError> {
|
pub fn new(redis_url: &str) -> Result<Self, redis::RedisError> {
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
|
|
@ -45,7 +47,7 @@ impl WhatsAppMessageQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start_worker(self: Arc<Self>) {
|
pub async fn start_worker(self: Arc<Self>) {
|
||||||
info!("WhatsApp queue worker started (Meta rate: 1 msg/6s per recipient)");
|
info!("WhatsApp queue worker started (Burst: up to 45 msgs in 6s per recipient)");
|
||||||
loop {
|
loop {
|
||||||
if let Err(e) = self.process_next().await {
|
if let Err(e) = self.process_next().await {
|
||||||
error!("WhatsApp queue worker error: {}", e);
|
error!("WhatsApp queue worker error: {}", e);
|
||||||
|
|
@ -57,38 +59,32 @@ impl WhatsAppMessageQueue {
|
||||||
async fn process_next(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
async fn process_next(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
|
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
|
||||||
|
|
||||||
let result: Option<(String, String)> = conn.blpop(Self::QUEUE_KEY, 5.0).await?;
|
let result: Option<(String, String)> = conn.blpop::<&str, Option<(String, String)>>(Self::QUEUE_KEY, 5.0).await?;
|
||||||
|
|
||||||
if let Some((_key, json)) = result {
|
if let Some((_key, json)) = result {
|
||||||
let msg: QueuedWhatsAppMessage = serde_json::from_str(&json)?;
|
let msg: QueuedWhatsAppMessage = serde_json::from_str(&json)?;
|
||||||
|
|
||||||
// Wait for rate limit (stored in Redis)
|
// 1. Proactive Rate Limiting (Burst 45, steady state 1/6s)
|
||||||
self.wait_for_rate_limit(&msg.to, &mut conn).await?;
|
self.wait_for_rate_limit(&msg.to, &mut conn).await?;
|
||||||
|
|
||||||
// Send with retry logic
|
// 2. Reactive Retry Logic (4^X for error 131056)
|
||||||
let mut retry_count = 0;
|
let mut x = 0;
|
||||||
loop {
|
loop {
|
||||||
match self.send_message(&msg).await {
|
match self.send_message(&msg).await {
|
||||||
Ok(_) => {
|
Ok(_) => break,
|
||||||
// Update last sent time in Redis
|
|
||||||
let last_sent_key = format!("{}{}", Self::LAST_SENT_PREFIX, msg.to);
|
|
||||||
let now = chrono::Utc::now().timestamp();
|
|
||||||
let _: () = conn.set(last_sent_key, now).await?;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let error_str = e.to_string();
|
let error_str = e.to_string();
|
||||||
if error_str.contains("131056") {
|
if error_str.contains("131056") {
|
||||||
let wait_secs = 4_i64.pow(retry_count);
|
let wait_secs = 4_u64.pow(x as u32);
|
||||||
warn!("Rate limit hit for {}, retrying in {}s", msg.to, wait_secs);
|
warn!("WhatsApp 131056 rate limit for {}: retrying in 4^{} = {}s", msg.to, x, wait_secs);
|
||||||
sleep(Duration::from_secs(wait_secs as u64)).await;
|
sleep(Duration::from_secs(wait_secs)).await;
|
||||||
retry_count += 1;
|
x += 1;
|
||||||
if retry_count > 5 {
|
if x > 5 {
|
||||||
error!("Max retries for {}: {}", msg.to, e);
|
error!("Max retries (4^5) exceeded for {}: {}", msg.to, e);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
error!("Failed to send to {}: {}", msg.to, e);
|
error!("WhatsApp send failure for {}: {}", msg.to, e);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -99,26 +95,47 @@ impl WhatsAppMessageQueue {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Implements Virtual Clock / Leaky Bucket for Meta's Bursting Rules.
|
||||||
|
/// Capacity: 45 messages (represented as 45 * 6s = 270s of "debt").
|
||||||
|
/// Steady rate: 1 message per 6 seconds.
|
||||||
async fn wait_for_rate_limit(&self, recipient: &str, conn: &mut redis::aio::MultiplexedConnection) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
async fn wait_for_rate_limit(&self, recipient: &str, conn: &mut redis::aio::MultiplexedConnection) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
let last_sent_key = format!("{}{}", Self::LAST_SENT_PREFIX, recipient);
|
let tft_key = format!("{}{}", Self::TFT_PREFIX, recipient);
|
||||||
|
|
||||||
// Get last sent time from Redis
|
let now = chrono::Utc::now().timestamp();
|
||||||
let last_sent: Option<i64> = conn.get(&last_sent_key).await?;
|
let tft: i64 = conn.get::<_, Option<i64>>(&tft_key).await?.unwrap_or(0);
|
||||||
|
|
||||||
if let Some(last_ts) = last_sent {
|
// Max "borrowing" is 45 messages * 6s = 270s
|
||||||
let now = chrono::Utc::now().timestamp();
|
let max_debt_secs = Self::BURST_CAPACITY * Self::RATE_SECS;
|
||||||
let since_last = now - last_ts;
|
|
||||||
|
|
||||||
if since_last < Self::MIN_DELAY_SECS {
|
let mut wait_secs = 0;
|
||||||
let wait_time = Self::MIN_DELAY_SECS - since_last;
|
let mut new_tft = if tft > now {
|
||||||
warn!("Rate limiting {}: waiting {}s", recipient, wait_time);
|
// Recipient is in debt
|
||||||
sleep(Duration::from_secs(wait_time as u64)).await;
|
let debt = tft - now;
|
||||||
|
if debt + Self::RATE_SECS > max_debt_secs {
|
||||||
|
// Next message would exceed burst capacity
|
||||||
|
wait_secs = (debt + Self::RATE_SECS) - max_debt_secs;
|
||||||
|
tft + Self::RATE_SECS
|
||||||
|
} else {
|
||||||
|
tft + Self::RATE_SECS
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// Recipient has no active debt
|
||||||
|
now + Self::RATE_SECS
|
||||||
|
};
|
||||||
|
|
||||||
|
if wait_secs > 0 {
|
||||||
|
warn!("Burst capacity exhausted for {}: waiting {}s cooling off", recipient, wait_secs);
|
||||||
|
sleep(Duration::from_secs(wait_secs as u64)).await;
|
||||||
|
// Advance TFT if we waited (now has changed)
|
||||||
|
new_tft = chrono::Utc::now().timestamp() + (new_tft - (now + wait_secs as i64));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Store the new Theoretical Finish Time with TTL to clean up Redis
|
||||||
|
let _: () = conn.set(&tft_key, new_tft).await?;
|
||||||
|
let _: () = conn.expire(&tft_key, max_debt_secs + 3600).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_message(&self, msg: &QueuedWhatsAppMessage) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
async fn send_message(&self, msg: &QueuedWhatsAppMessage) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
let client = reqwest::Client::new();
|
let client = reqwest::Client::new();
|
||||||
let url = format!(
|
let url = format!(
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::super::whatsapp_queue::*;
|
use crate::core::bot::channels::whatsapp_queue::*;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::time::Duration;
|
use tokio::time::Duration;
|
||||||
|
|
||||||
|
|
@ -20,40 +20,8 @@ mod tests {
|
||||||
assert!(result.is_ok());
|
assert!(result.is_ok());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
// Note: test_per_recipient_rate_limit removed because we now allow per-recipient bursting.
|
||||||
async fn test_per_recipient_rate_limit() {
|
// Throttling is handled reactively via 131056 error code.
|
||||||
let queue = Arc::new(WhatsAppMessageQueue::new("redis://127.0.0.1:6379").unwrap());
|
|
||||||
|
|
||||||
let msg1 = QueuedWhatsAppMessage {
|
|
||||||
to: "+5511999999999".to_string(),
|
|
||||||
message: "Message 1".to_string(),
|
|
||||||
api_key: "test_key".to_string(),
|
|
||||||
phone_number_id: "123456".to_string(),
|
|
||||||
api_version: "v17.0".to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let msg2 = QueuedWhatsAppMessage {
|
|
||||||
to: "+5511999999999".to_string(),
|
|
||||||
message: "Message 2".to_string(),
|
|
||||||
api_key: "test_key".to_string(),
|
|
||||||
phone_number_id: "123456".to_string(),
|
|
||||||
api_version: "v17.0".to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
queue.enqueue(msg1).await.unwrap();
|
|
||||||
queue.enqueue(msg2).await.unwrap();
|
|
||||||
|
|
||||||
let start = std::time::Instant::now();
|
|
||||||
|
|
||||||
// Process both messages
|
|
||||||
let _ = queue.process_next().await;
|
|
||||||
let _ = queue.process_next().await;
|
|
||||||
|
|
||||||
let elapsed = start.elapsed();
|
|
||||||
|
|
||||||
// Should take at least 6 seconds due to Meta rate limiting
|
|
||||||
assert!(elapsed >= Duration::from_secs(6));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_different_recipients_no_delay() {
|
async fn test_different_recipients_no_delay() {
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue