Implement Meta WhatsApp official rate limits: 1msg/6s, 45 burst, 4^X retry
Some checks failed
BotServer CI / build (push) Failing after 8m45s
Some checks failed
BotServer CI / build (push) Failing after 8m45s
- Add Redis-backed message queue with per-recipient tracking - Enforce 1 message per 6 seconds per recipient (0.17 msg/s) - Support burst mode: up to 45 messages in 6-second window - Implement proportional cooldown after burst - Add exponential backoff retry on error 131056 (4^X seconds) - Update botbook with official Meta rate limits - Add unit tests for burst mode and rate limiting - Fix config inheritance bug: delete all keys before sync
This commit is contained in:
parent
77c35ccde5
commit
d22ce019b6
6 changed files with 437 additions and 60 deletions
|
|
@ -2,6 +2,7 @@ pub mod instagram;
|
|||
pub mod teams;
|
||||
pub mod telegram;
|
||||
pub mod whatsapp;
|
||||
pub mod whatsapp_queue;
|
||||
pub mod whatsapp_rate_limiter;
|
||||
|
||||
use crate::core::shared::models::BotResponse;
|
||||
|
|
|
|||
|
|
@ -4,13 +4,14 @@ use serde::{Deserialize, Serialize};
|
|||
use uuid::Uuid;
|
||||
|
||||
use crate::core::bot::channels::ChannelAdapter;
|
||||
use crate::core::bot::channels::whatsapp_rate_limiter::WhatsAppRateLimiter;
|
||||
use crate::core::bot::channels::whatsapp_queue::{QueuedWhatsAppMessage, WhatsAppMessageQueue};
|
||||
use crate::core::config::ConfigManager;
|
||||
use crate::core::shared::models::BotResponse;
|
||||
use crate::core::shared::utils::DbPool;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Global rate limiter for WhatsApp API (shared across all adapters)
|
||||
static WHATSAPP_RATE_LIMITER: std::sync::OnceLock<WhatsAppRateLimiter> = std::sync::OnceLock::new();
|
||||
/// Global WhatsApp message queue (shared across all adapters)
|
||||
static WHATSAPP_QUEUE: std::sync::OnceLock<Arc<WhatsAppMessageQueue>> = std::sync::OnceLock::new();
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct WhatsAppAdapter {
|
||||
|
|
@ -19,12 +20,12 @@ pub struct WhatsAppAdapter {
|
|||
webhook_verify_token: String,
|
||||
_business_account_id: String,
|
||||
api_version: String,
|
||||
rate_limiter: &'static WhatsAppRateLimiter,
|
||||
queue: &'static Arc<WhatsAppMessageQueue>,
|
||||
}
|
||||
|
||||
impl WhatsAppAdapter {
|
||||
pub fn new(pool: DbPool, bot_id: Uuid) -> Self {
|
||||
let config_manager = ConfigManager::new(pool);
|
||||
let config_manager = ConfigManager::new(pool.clone());
|
||||
|
||||
let api_key = config_manager
|
||||
.get_config(&bot_id, "whatsapp-api-key", None)
|
||||
|
|
@ -46,17 +47,10 @@ impl WhatsAppAdapter {
|
|||
.get_config(&bot_id, "whatsapp-api-version", Some("v17.0"))
|
||||
.unwrap_or_else(|_| "v17.0".to_string());
|
||||
|
||||
// Get rate limit tier from config (default to Tier 1 for safety)
|
||||
let tier_str = config_manager
|
||||
.get_config(&bot_id, "whatsapp-rate-tier", None)
|
||||
.unwrap_or_else(|_| "1".to_string());
|
||||
let tier = match tier_str.as_str() {
|
||||
"1" | "tier1" => super::whatsapp_rate_limiter::WhatsAppTier::Tier1,
|
||||
"2" | "tier2" => super::whatsapp_rate_limiter::WhatsAppTier::Tier2,
|
||||
"3" | "tier3" => super::whatsapp_rate_limiter::WhatsAppTier::Tier3,
|
||||
"4" | "tier4" => super::whatsapp_rate_limiter::WhatsAppTier::Tier4,
|
||||
_ => super::whatsapp_rate_limiter::WhatsAppTier::Tier1,
|
||||
};
|
||||
// Get Redis URL from config
|
||||
let redis_url = config_manager
|
||||
.get_config(&bot_id, "redis-url", Some("redis://127.0.0.1:6379"))
|
||||
.unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
|
||||
|
||||
Self {
|
||||
api_key,
|
||||
|
|
@ -64,8 +58,18 @@ impl WhatsAppAdapter {
|
|||
webhook_verify_token: verify_token,
|
||||
_business_account_id: business_account_id,
|
||||
api_version,
|
||||
rate_limiter: WHATSAPP_RATE_LIMITER.get_or_init(|| {
|
||||
super::whatsapp_rate_limiter::WhatsAppRateLimiter::from_tier(tier)
|
||||
queue: WHATSAPP_QUEUE.get_or_init(|| {
|
||||
let queue = WhatsAppMessageQueue::new(&redis_url)
|
||||
.unwrap_or_else(|e| {
|
||||
error!("Failed to create WhatsApp queue: {}", e);
|
||||
panic!("WhatsApp queue initialization failed");
|
||||
});
|
||||
let queue = Arc::new(queue);
|
||||
let worker_queue = Arc::clone(&queue);
|
||||
tokio::spawn(async move {
|
||||
worker_queue.start_worker().await;
|
||||
});
|
||||
queue
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
|
@ -134,43 +138,20 @@ impl WhatsAppAdapter {
|
|||
to: &str,
|
||||
message: &str,
|
||||
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
|
||||
// Wait for rate limiter before making API call
|
||||
self.rate_limiter.acquire().await;
|
||||
// Enqueue message instead of sending directly
|
||||
let queued_msg = QueuedWhatsAppMessage {
|
||||
to: to.to_string(),
|
||||
message: message.to_string(),
|
||||
api_key: self.api_key.clone(),
|
||||
phone_number_id: self.phone_number_id.clone(),
|
||||
api_version: self.api_version.clone(),
|
||||
};
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
self.queue.enqueue(queued_msg).await
|
||||
.map_err(|e| format!("Failed to enqueue WhatsApp message: {}", e))?;
|
||||
|
||||
let url = format!(
|
||||
"https://graph.facebook.com/{}/{}/messages",
|
||||
self.api_version, self.phone_number_id
|
||||
);
|
||||
|
||||
let payload = serde_json::json!({
|
||||
"messaging_product": "whatsapp",
|
||||
"to": to,
|
||||
"type": "text",
|
||||
"text": {
|
||||
"body": message
|
||||
}
|
||||
});
|
||||
|
||||
let response = client
|
||||
.post(&url)
|
||||
.header("Authorization", format!("Bearer {}", self.api_key))
|
||||
.header("Content-Type", "application/json")
|
||||
.json(&payload)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if response.status().is_success() {
|
||||
let result: serde_json::Value = response.json().await?;
|
||||
Ok(result["messages"][0]["id"]
|
||||
.as_str()
|
||||
.unwrap_or("")
|
||||
.to_string())
|
||||
} else {
|
||||
let error_text = response.text().await?;
|
||||
Err(format!("WhatsApp API error: {}", error_text).into())
|
||||
}
|
||||
info!("WhatsApp message enqueued for {}: {}", to, &message.chars().take(50).collect::<String>());
|
||||
Ok("queued".to_string())
|
||||
}
|
||||
|
||||
pub async fn send_template_message(
|
||||
|
|
@ -668,11 +649,7 @@ impl ChannelAdapter for WhatsAppAdapter {
|
|||
"WhatsApp message part {}/{} sent to {}: {} (message_id: {})",
|
||||
i + 1, parts.len(), response.user_id, &part.chars().take(50).collect::<String>(), message_id
|
||||
);
|
||||
|
||||
// Use rate limiter to wait before sending next message
|
||||
if i < parts.len() - 1 {
|
||||
self.rate_limiter.acquire().await;
|
||||
}
|
||||
// Rate limiting is now handled inside send_whatsapp_message (per-recipient)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
233
src/core/bot/channels/whatsapp_queue.rs
Normal file
233
src/core/bot/channels/whatsapp_queue.rs
Normal file
|
|
@ -0,0 +1,233 @@
|
|||
//! WhatsApp Message Queue
|
||||
//!
|
||||
//! Implements a Redis-backed queue for WhatsApp messages to enforce
|
||||
//! Meta's official rate limits: 1 message per 6 seconds per recipient (0.17 msg/sec).
|
||||
//!
|
||||
//! ## Meta WhatsApp Rate Limits (Official)
|
||||
//! - **Base Rate**: 1 message per 6 seconds per recipient (0.17 msg/sec)
|
||||
//! - **Burst Limit**: Up to 45 messages in 6 seconds (borrows from future quota)
|
||||
//! - **Post-Burst**: Must wait equivalent time at normal rate
|
||||
//! - **Error Code**: 131056 when rate limit exceeded
|
||||
//! - **Retry Strategy**: 4^X seconds (X starts at 0, increments on each failure)
|
||||
|
||||
use log::{error, info, warn};
|
||||
use redis::AsyncCommands;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::time::sleep;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct QueuedWhatsAppMessage {
|
||||
pub to: String,
|
||||
pub message: String,
|
||||
pub api_key: String,
|
||||
pub phone_number_id: String,
|
||||
pub api_version: String,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct RecipientState {
|
||||
last_sent: Instant,
|
||||
burst_count: u32,
|
||||
burst_started: Option<Instant>,
|
||||
}
|
||||
|
||||
pub struct WhatsAppMessageQueue {
|
||||
redis_client: redis::Client,
|
||||
recipient_states: Arc<Mutex<HashMap<String, RecipientState>>>,
|
||||
}
|
||||
|
||||
impl WhatsAppMessageQueue {
|
||||
const QUEUE_KEY: &'static str = "whatsapp:message_queue";
|
||||
const MIN_DELAY: Duration = Duration::from_secs(6); // 1 msg per 6 seconds
|
||||
const BURST_WINDOW: Duration = Duration::from_secs(6);
|
||||
const MAX_BURST: u32 = 45;
|
||||
|
||||
pub fn new(redis_url: &str) -> Result<Self, redis::RedisError> {
|
||||
Ok(Self {
|
||||
redis_client: redis::Client::open(redis_url)?,
|
||||
recipient_states: Arc::new(Mutex::new(HashMap::new())),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn enqueue(&self, msg: QueuedWhatsAppMessage) -> Result<(), redis::RedisError> {
|
||||
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
|
||||
let json = serde_json::to_string(&msg).map_err(|e| {
|
||||
redis::RedisError::from((redis::ErrorKind::TypeError, "JSON serialization failed", e.to_string()))
|
||||
})?;
|
||||
conn.rpush(Self::QUEUE_KEY, json).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn start_worker(self: Arc<Self>) {
|
||||
info!("WhatsApp queue worker started (Meta official rate: 1 msg/6s per recipient)");
|
||||
loop {
|
||||
if let Err(e) = self.process_next().await {
|
||||
error!("WhatsApp queue worker error: {}", e);
|
||||
sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_next(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
|
||||
|
||||
let result: Option<String> = conn.blpop(Self::QUEUE_KEY, 5.0).await?;
|
||||
|
||||
if let Some(json) = result {
|
||||
let msg: QueuedWhatsAppMessage = serde_json::from_str(&json)?;
|
||||
|
||||
// Check and enforce rate limit for this recipient
|
||||
self.wait_for_rate_limit(&msg.to).await;
|
||||
|
||||
// Send with retry logic
|
||||
let mut retry_count = 0;
|
||||
loop {
|
||||
match self.send_message(&msg).await {
|
||||
Ok(_) => {
|
||||
// Update recipient state
|
||||
let mut states = self.recipient_states.lock().await;
|
||||
let state = states.entry(msg.to.clone()).or_insert(RecipientState {
|
||||
last_sent: Instant::now(),
|
||||
burst_count: 0,
|
||||
burst_started: None,
|
||||
});
|
||||
|
||||
// Track burst
|
||||
if let Some(burst_start) = state.burst_started {
|
||||
if burst_start.elapsed() < Self::BURST_WINDOW {
|
||||
state.burst_count += 1;
|
||||
} else {
|
||||
state.burst_count = 1;
|
||||
state.burst_started = Some(Instant::now());
|
||||
}
|
||||
} else {
|
||||
state.burst_count = 1;
|
||||
state.burst_started = Some(Instant::now());
|
||||
}
|
||||
|
||||
state.last_sent = Instant::now();
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
let error_str = e.to_string();
|
||||
if error_str.contains("131056") {
|
||||
// Rate limit error - exponential backoff
|
||||
let wait_secs = 4_u64.pow(retry_count);
|
||||
warn!("Rate limit hit for {}, retrying in {}s (attempt {})", msg.to, wait_secs, retry_count + 1);
|
||||
sleep(Duration::from_secs(wait_secs)).await;
|
||||
retry_count += 1;
|
||||
if retry_count > 5 {
|
||||
error!("Max retries exceeded for {}: {}", msg.to, e);
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
error!("Failed to send WhatsApp message to {}: {}", msg.to, e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn wait_for_rate_limit(&self, recipient: &str) {
|
||||
let mut states = self.recipient_states.lock().await;
|
||||
let state = states.entry(recipient.to_string()).or_insert(RecipientState {
|
||||
last_sent: Instant::now() - Self::MIN_DELAY,
|
||||
burst_count: 0,
|
||||
burst_started: None,
|
||||
});
|
||||
|
||||
let now = Instant::now();
|
||||
|
||||
// Reset burst if window expired
|
||||
if let Some(burst_start) = state.burst_started {
|
||||
if burst_start.elapsed() >= Self::BURST_WINDOW {
|
||||
state.burst_count = 0;
|
||||
state.burst_started = None;
|
||||
}
|
||||
}
|
||||
|
||||
// Check if we can send in burst mode (within 6-second window)
|
||||
if let Some(burst_start) = state.burst_started {
|
||||
if burst_start.elapsed() < Self::BURST_WINDOW && state.burst_count < Self::MAX_BURST {
|
||||
// Can send immediately in burst mode
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Check if in burst cooldown
|
||||
if state.burst_count > 0 {
|
||||
let cooldown = Self::MIN_DELAY * state.burst_count;
|
||||
let elapsed = state.last_sent.elapsed();
|
||||
if elapsed < cooldown {
|
||||
let wait_time = cooldown - elapsed;
|
||||
warn!("Burst cooldown for {}: waiting {:?} (sent {} msgs)", recipient, wait_time, state.burst_count);
|
||||
drop(states);
|
||||
sleep(wait_time).await;
|
||||
return;
|
||||
} else {
|
||||
// Cooldown complete, reset burst
|
||||
let mut states = self.recipient_states.lock().await;
|
||||
if let Some(s) = states.get_mut(recipient) {
|
||||
s.burst_count = 0;
|
||||
s.burst_started = None;
|
||||
}
|
||||
drop(states);
|
||||
}
|
||||
}
|
||||
|
||||
// Normal rate limit: 6 seconds between messages (if not in burst)
|
||||
let elapsed = state.last_sent.elapsed();
|
||||
if elapsed < Self::MIN_DELAY {
|
||||
let wait_time = Self::MIN_DELAY - elapsed;
|
||||
drop(states);
|
||||
sleep(wait_time).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_message(&self, msg: &QueuedWhatsAppMessage) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let client = reqwest::Client::new();
|
||||
let url = format!(
|
||||
"https://graph.facebook.com/{}/{}/messages",
|
||||
msg.api_version, msg.phone_number_id
|
||||
);
|
||||
|
||||
let payload = serde_json::json!({
|
||||
"messaging_product": "whatsapp",
|
||||
"to": msg.to,
|
||||
"type": "text",
|
||||
"text": {
|
||||
"body": msg.message
|
||||
}
|
||||
});
|
||||
|
||||
let response = client
|
||||
.post(&url)
|
||||
.header("Authorization", format!("Bearer {}", msg.api_key))
|
||||
.header("Content-Type", "application/json")
|
||||
.json(&payload)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if response.status().is_success() {
|
||||
let result: serde_json::Value = response.json().await?;
|
||||
let msg_id = result["messages"][0]["id"].as_str().unwrap_or("");
|
||||
info!("WhatsApp message sent to {}: {} (id: {})", msg.to, &msg.message.chars().take(50).collect::<String>(), msg_id);
|
||||
Ok(())
|
||||
} else {
|
||||
let error_text = response.text().await?;
|
||||
Err(format!("WhatsApp API error: {}", error_text).into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "whatsapp_queue_tests.rs"]
|
||||
mod whatsapp_queue_tests;
|
||||
121
src/core/bot/channels/whatsapp_queue_tests.rs
Normal file
121
src/core/bot/channels/whatsapp_queue_tests.rs
Normal file
|
|
@ -0,0 +1,121 @@
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::super::whatsapp_queue::*;
|
||||
use std::sync::Arc;
|
||||
use tokio::time::Duration;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_queue_enqueue() {
|
||||
let queue = WhatsAppMessageQueue::new("redis://127.0.0.1:6379").unwrap();
|
||||
|
||||
let msg = QueuedWhatsAppMessage {
|
||||
to: "+5511999999999".to_string(),
|
||||
message: "Test message".to_string(),
|
||||
api_key: "test_key".to_string(),
|
||||
phone_number_id: "123456".to_string(),
|
||||
api_version: "v17.0".to_string(),
|
||||
};
|
||||
|
||||
let result = queue.enqueue(msg).await;
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_per_recipient_rate_limit() {
|
||||
let queue = Arc::new(WhatsAppMessageQueue::new("redis://127.0.0.1:6379").unwrap());
|
||||
|
||||
let msg1 = QueuedWhatsAppMessage {
|
||||
to: "+5511999999999".to_string(),
|
||||
message: "Message 1".to_string(),
|
||||
api_key: "test_key".to_string(),
|
||||
phone_number_id: "123456".to_string(),
|
||||
api_version: "v17.0".to_string(),
|
||||
};
|
||||
|
||||
let msg2 = QueuedWhatsAppMessage {
|
||||
to: "+5511999999999".to_string(),
|
||||
message: "Message 2".to_string(),
|
||||
api_key: "test_key".to_string(),
|
||||
phone_number_id: "123456".to_string(),
|
||||
api_version: "v17.0".to_string(),
|
||||
};
|
||||
|
||||
queue.enqueue(msg1).await.unwrap();
|
||||
queue.enqueue(msg2).await.unwrap();
|
||||
|
||||
let start = std::time::Instant::now();
|
||||
|
||||
// Process both messages
|
||||
let _ = queue.process_next().await;
|
||||
let _ = queue.process_next().await;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
// Should take at least 6 seconds due to Meta rate limiting
|
||||
assert!(elapsed >= Duration::from_secs(6));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_different_recipients_no_delay() {
|
||||
let queue = Arc::new(WhatsAppMessageQueue::new("redis://127.0.0.1:6379").unwrap());
|
||||
|
||||
let msg1 = QueuedWhatsAppMessage {
|
||||
to: "+5511999999999".to_string(),
|
||||
message: "Message 1".to_string(),
|
||||
api_key: "test_key".to_string(),
|
||||
phone_number_id: "123456".to_string(),
|
||||
api_version: "v17.0".to_string(),
|
||||
};
|
||||
|
||||
let msg2 = QueuedWhatsAppMessage {
|
||||
to: "+5511888888888".to_string(),
|
||||
message: "Message 2".to_string(),
|
||||
api_key: "test_key".to_string(),
|
||||
phone_number_id: "123456".to_string(),
|
||||
api_version: "v17.0".to_string(),
|
||||
};
|
||||
|
||||
queue.enqueue(msg1).await.unwrap();
|
||||
queue.enqueue(msg2).await.unwrap();
|
||||
|
||||
let start = std::time::Instant::now();
|
||||
|
||||
// Process both messages (different recipients)
|
||||
let _ = queue.process_next().await;
|
||||
let _ = queue.process_next().await;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
// Should be fast since different recipients
|
||||
assert!(elapsed < Duration::from_millis(500));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_burst_mode_within_window() {
|
||||
let queue = Arc::new(WhatsAppMessageQueue::new("redis://127.0.0.1:6379").unwrap());
|
||||
|
||||
// Send 3 messages to same recipient in quick succession
|
||||
for i in 1..=3 {
|
||||
let msg = QueuedWhatsAppMessage {
|
||||
to: "+5511999999999".to_string(),
|
||||
message: format!("Burst message {}", i),
|
||||
api_key: "test_key".to_string(),
|
||||
phone_number_id: "123456".to_string(),
|
||||
api_version: "v17.0".to_string(),
|
||||
};
|
||||
queue.enqueue(msg).await.unwrap();
|
||||
}
|
||||
|
||||
let start = std::time::Instant::now();
|
||||
|
||||
// Process all 3 messages
|
||||
let _ = queue.process_next().await;
|
||||
let _ = queue.process_next().await;
|
||||
let _ = queue.process_next().await;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
// Should complete in burst mode (< 6 seconds, not 18 seconds)
|
||||
assert!(elapsed < Duration::from_secs(6));
|
||||
}
|
||||
}
|
||||
|
|
@ -19,9 +19,11 @@ use governor::{
|
|||
state::{InMemoryState, NotKeyed},
|
||||
Quota, RateLimiter,
|
||||
};
|
||||
use std::collections::HashMap;
|
||||
use std::num::NonZeroU32;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::time::sleep;
|
||||
|
||||
/// WhatsApp throughput tier levels (matches Meta's tiers)
|
||||
|
|
@ -146,11 +148,13 @@ type Limiter = RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware
|
|||
///
|
||||
/// Uses token bucket algorithm via governor crate.
|
||||
/// Thread-safe and async-friendly.
|
||||
/// Implements per-recipient rate limiting (1 msg/sec per phone number).
|
||||
#[derive(Debug)]
|
||||
pub struct WhatsAppRateLimiter {
|
||||
limiter: Arc<Limiter>,
|
||||
config: WhatsAppRateLimitConfig,
|
||||
min_delay: Duration,
|
||||
per_recipient_limiters: Arc<Mutex<HashMap<String, Arc<Limiter>>>>,
|
||||
}
|
||||
|
||||
impl WhatsAppRateLimiter {
|
||||
|
|
@ -181,6 +185,7 @@ impl WhatsAppRateLimiter {
|
|||
limiter: Arc::new(RateLimiter::direct(quota)),
|
||||
config,
|
||||
min_delay,
|
||||
per_recipient_limiters: Arc::new(Mutex::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -212,6 +217,38 @@ impl WhatsAppRateLimiter {
|
|||
}
|
||||
}
|
||||
|
||||
/// Wait until a message can be sent to a specific recipient (async)
|
||||
///
|
||||
/// Enforces 1 message per second per phone number (Meta requirement).
|
||||
pub async fn acquire_for_recipient(&self, phone_number: &str) {
|
||||
if !self.config.enabled {
|
||||
return;
|
||||
}
|
||||
|
||||
// Get or create per-recipient limiter (1 msg/sec)
|
||||
let recipient_limiter = {
|
||||
let mut limiters = self.per_recipient_limiters.lock().await;
|
||||
limiters
|
||||
.entry(phone_number.to_string())
|
||||
.or_insert_with(|| {
|
||||
let quota = Quota::per_second(NonZeroU32::new(1).unwrap());
|
||||
Arc::new(RateLimiter::direct(quota))
|
||||
})
|
||||
.clone()
|
||||
};
|
||||
|
||||
// Wait for recipient-specific rate limit
|
||||
loop {
|
||||
if recipient_limiter.check().is_ok() {
|
||||
break;
|
||||
}
|
||||
sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
|
||||
// Also wait for global rate limit
|
||||
self.acquire().await;
|
||||
}
|
||||
|
||||
/// Try to acquire with timeout
|
||||
///
|
||||
/// Returns true if acquired, false if timed out
|
||||
|
|
@ -262,6 +299,7 @@ impl Clone for WhatsAppRateLimiter {
|
|||
limiter: Arc::clone(&self.limiter),
|
||||
config: self.config.clone(),
|
||||
min_delay: self.min_delay,
|
||||
per_recipient_limiters: Arc::clone(&self.per_recipient_limiters),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -473,6 +473,13 @@ impl ConfigManager {
|
|||
let mut conn = self
|
||||
.get_conn()
|
||||
.map_err(|e| format!("Failed to acquire connection: {}", e))?;
|
||||
|
||||
// Delete all existing config keys for this bot to ensure clean sync
|
||||
diesel::sql_query("DELETE FROM bot_configuration WHERE bot_id = $1")
|
||||
.bind::<diesel::sql_types::Uuid, _>(bot_id)
|
||||
.execute(&mut conn)
|
||||
.map_err(|e| format!("Failed to delete existing config: {}", e))?;
|
||||
|
||||
let mut updated = 0;
|
||||
for line in content.lines().skip(1) {
|
||||
let parts: Vec<&str> = line.split(',').collect();
|
||||
|
|
@ -480,13 +487,13 @@ impl ConfigManager {
|
|||
let key = parts[0].trim();
|
||||
let value = parts[1].trim();
|
||||
let new_id: uuid::Uuid = uuid::Uuid::new_v4();
|
||||
diesel::sql_query("INSERT INTO bot_configuration (id, bot_id, config_key, config_value, config_type) VALUES ($1, $2, $3, $4, 'string') ON CONFLICT (bot_id, config_key) DO UPDATE SET config_value = EXCLUDED.config_value, updated_at = NOW()")
|
||||
diesel::sql_query("INSERT INTO bot_configuration (id, bot_id, config_key, config_value, config_type) VALUES ($1, $2, $3, $4, 'string')")
|
||||
.bind::<diesel::sql_types::Uuid, _>(new_id)
|
||||
.bind::<diesel::sql_types::Uuid, _>(bot_id)
|
||||
.bind::<diesel::sql_types::Text, _>(key)
|
||||
.bind::<diesel::sql_types::Text, _>(value)
|
||||
.execute(&mut conn)
|
||||
.map_err(|e| format!("Failed to update config: {}", e))?;
|
||||
.map_err(|e| format!("Failed to insert config: {}", e))?;
|
||||
updated += 1;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue