Use Redis to track last sent time per WhatsApp recipient
All checks were successful
BotServer CI / build (push) Successful in 13m37s
All checks were successful
BotServer CI / build (push) Successful in 13m37s
- Store last_sent timestamp in Redis (whatsapp:last_sent:<phone>) - Always wait 6 seconds between messages to same recipient - Persists across restarts
This commit is contained in:
parent
4bda4ba897
commit
c523cee177
2 changed files with 49 additions and 108 deletions
|
|
@ -1,22 +1,13 @@
|
|||
//! WhatsApp Message Queue
|
||||
//!
|
||||
//! Implements a Redis-backed queue for WhatsApp messages to enforce
|
||||
//! Meta's official rate limits: 1 message per 6 seconds per recipient (0.17 msg/sec).
|
||||
//!
|
||||
//! ## Meta WhatsApp Rate Limits (Official)
|
||||
//! - **Base Rate**: 1 message per 6 seconds per recipient (0.17 msg/sec)
|
||||
//! - **Burst Limit**: Up to 45 messages in 6 seconds (borrows from future quota)
|
||||
//! - **Post-Burst**: Must wait equivalent time at normal rate
|
||||
//! - **Error Code**: 131056 when rate limit exceeded
|
||||
//! - **Retry Strategy**: 4^X seconds (X starts at 0, increments on each failure)
|
||||
//! Meta's official rate limits: 1 message per 6 seconds per recipient.
|
||||
|
||||
use log::{error, info, warn};
|
||||
use redis::AsyncCommands;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use tokio::time::sleep;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
|
@ -28,29 +19,19 @@ pub struct QueuedWhatsAppMessage {
|
|||
pub api_version: String,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct RecipientState {
|
||||
last_sent: Instant,
|
||||
burst_count: u32,
|
||||
burst_started: Option<Instant>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct WhatsAppMessageQueue {
|
||||
redis_client: redis::Client,
|
||||
recipient_states: Arc<Mutex<HashMap<String, RecipientState>>>,
|
||||
}
|
||||
|
||||
impl WhatsAppMessageQueue {
|
||||
const QUEUE_KEY: &'static str = "whatsapp:message_queue";
|
||||
const MIN_DELAY: Duration = Duration::from_secs(6); // 1 msg per 6 seconds
|
||||
const BURST_WINDOW: Duration = Duration::from_secs(6);
|
||||
const MAX_BURST: u32 = 45;
|
||||
const LAST_SENT_PREFIX: &'static str = "whatsapp:last_sent:";
|
||||
const MIN_DELAY_SECS: i64 = 6;
|
||||
|
||||
pub fn new(redis_url: &str) -> Result<Self, redis::RedisError> {
|
||||
Ok(Self {
|
||||
redis_client: redis::Client::open(redis_url)?,
|
||||
recipient_states: Arc::new(Mutex::new(HashMap::new())),
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -64,7 +45,7 @@ impl WhatsAppMessageQueue {
|
|||
}
|
||||
|
||||
pub async fn start_worker(self: Arc<Self>) {
|
||||
info!("WhatsApp queue worker started (Meta official rate: 1 msg/6s per recipient)");
|
||||
info!("WhatsApp queue worker started (Meta rate: 1 msg/6s per recipient)");
|
||||
loop {
|
||||
if let Err(e) = self.process_next().await {
|
||||
error!("WhatsApp queue worker error: {}", e);
|
||||
|
|
@ -76,58 +57,38 @@ impl WhatsAppMessageQueue {
|
|||
async fn process_next(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
|
||||
|
||||
// BLPOP returns (key, value) tuple
|
||||
let result: Option<(String, String)> = conn.blpop(Self::QUEUE_KEY, 5.0).await?;
|
||||
|
||||
if let Some((_key, json)) = result {
|
||||
let msg: QueuedWhatsAppMessage = serde_json::from_str(&json)?;
|
||||
|
||||
// Check and enforce rate limit for this recipient
|
||||
self.wait_for_rate_limit(&msg.to).await;
|
||||
// Wait for rate limit (stored in Redis)
|
||||
self.wait_for_rate_limit(&msg.to, &mut conn).await?;
|
||||
|
||||
// Send with retry logic
|
||||
let mut retry_count = 0;
|
||||
loop {
|
||||
match self.send_message(&msg).await {
|
||||
Ok(_) => {
|
||||
// Update recipient state
|
||||
let mut states = self.recipient_states.lock().await;
|
||||
let state = states.entry(msg.to.clone()).or_insert(RecipientState {
|
||||
last_sent: Instant::now(),
|
||||
burst_count: 0,
|
||||
burst_started: None,
|
||||
});
|
||||
|
||||
// Track burst
|
||||
if let Some(burst_start) = state.burst_started {
|
||||
if burst_start.elapsed() < Self::BURST_WINDOW {
|
||||
state.burst_count += 1;
|
||||
} else {
|
||||
state.burst_count = 1;
|
||||
state.burst_started = Some(Instant::now());
|
||||
}
|
||||
} else {
|
||||
state.burst_count = 1;
|
||||
state.burst_started = Some(Instant::now());
|
||||
}
|
||||
|
||||
state.last_sent = Instant::now();
|
||||
// Update last sent time in Redis
|
||||
let last_sent_key = format!("{}{}", Self::LAST_SENT_PREFIX, msg.to);
|
||||
let now = chrono::Utc::now().timestamp();
|
||||
let _: () = conn.set(last_sent_key, now).await?;
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
let error_str = e.to_string();
|
||||
if error_str.contains("131056") {
|
||||
// Rate limit error - exponential backoff
|
||||
let wait_secs = 4_u64.pow(retry_count);
|
||||
warn!("Rate limit hit for {}, retrying in {}s (attempt {})", msg.to, wait_secs, retry_count + 1);
|
||||
sleep(Duration::from_secs(wait_secs)).await;
|
||||
let wait_secs = 4_i64.pow(retry_count);
|
||||
warn!("Rate limit hit for {}, retrying in {}s", msg.to, wait_secs);
|
||||
sleep(Duration::from_secs(wait_secs as u64)).await;
|
||||
retry_count += 1;
|
||||
if retry_count > 5 {
|
||||
error!("Max retries exceeded for {}: {}", msg.to, e);
|
||||
error!("Max retries for {}: {}", msg.to, e);
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
error!("Failed to send WhatsApp message to {}: {}", msg.to, e);
|
||||
error!("Failed to send to {}: {}", msg.to, e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
@ -138,58 +99,24 @@ impl WhatsAppMessageQueue {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn wait_for_rate_limit(&self, recipient: &str) {
|
||||
let mut states = self.recipient_states.lock().await;
|
||||
let state = states.entry(recipient.to_string()).or_insert(RecipientState {
|
||||
last_sent: Instant::now() - Self::MIN_DELAY,
|
||||
burst_count: 0,
|
||||
burst_started: None,
|
||||
});
|
||||
async fn wait_for_rate_limit(&self, recipient: &str, conn: &mut redis::aio::MultiplexedConnection) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let last_sent_key = format!("{}{}", Self::LAST_SENT_PREFIX, recipient);
|
||||
|
||||
// Reset burst if window expired
|
||||
if let Some(burst_start) = state.burst_started {
|
||||
if burst_start.elapsed() >= Self::BURST_WINDOW {
|
||||
state.burst_count = 0;
|
||||
state.burst_started = None;
|
||||
// Get last sent time from Redis
|
||||
let last_sent: Option<i64> = conn.get(&last_sent_key).await?;
|
||||
|
||||
if let Some(last_ts) = last_sent {
|
||||
let now = chrono::Utc::now().timestamp();
|
||||
let since_last = now - last_ts;
|
||||
|
||||
if since_last < Self::MIN_DELAY_SECS {
|
||||
let wait_time = Self::MIN_DELAY_SECS - since_last;
|
||||
warn!("Rate limiting {}: waiting {}s", recipient, wait_time);
|
||||
sleep(Duration::from_secs(wait_time as u64)).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Check if we can send in burst mode (within 6-second window)
|
||||
if let Some(burst_start) = state.burst_started {
|
||||
if burst_start.elapsed() < Self::BURST_WINDOW && state.burst_count < Self::MAX_BURST {
|
||||
// Can send immediately in burst mode
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Check if in burst cooldown
|
||||
if state.burst_count > 0 {
|
||||
let cooldown = Self::MIN_DELAY * state.burst_count;
|
||||
let elapsed = state.last_sent.elapsed();
|
||||
if elapsed < cooldown {
|
||||
let wait_time = cooldown - elapsed;
|
||||
warn!("Burst cooldown for {}: waiting {:?} (sent {} msgs)", recipient, wait_time, state.burst_count);
|
||||
drop(states);
|
||||
sleep(wait_time).await;
|
||||
return;
|
||||
} else {
|
||||
// Cooldown complete, reset burst
|
||||
let mut states = self.recipient_states.lock().await;
|
||||
if let Some(s) = states.get_mut(recipient) {
|
||||
s.burst_count = 0;
|
||||
s.burst_started = None;
|
||||
}
|
||||
drop(states);
|
||||
}
|
||||
}
|
||||
|
||||
// Normal rate limit: 6 seconds between messages (if not in burst)
|
||||
let elapsed = state.last_sent.elapsed();
|
||||
if elapsed < Self::MIN_DELAY {
|
||||
let wait_time = Self::MIN_DELAY - elapsed;
|
||||
drop(states);
|
||||
sleep(wait_time).await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_message(&self, msg: &QueuedWhatsAppMessage) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
|
|
@ -219,7 +146,7 @@ impl WhatsAppMessageQueue {
|
|||
if response.status().is_success() {
|
||||
let result: serde_json::Value = response.json().await?;
|
||||
let msg_id = result["messages"][0]["id"].as_str().unwrap_or("");
|
||||
info!("WhatsApp message sent to {}: {} (id: {})", msg.to, &msg.message.chars().take(50).collect::<String>(), msg_id);
|
||||
info!("WhatsApp sent to {}: {} (id: {})", msg.to, &msg.message.chars().take(50).collect::<String>(), msg_id);
|
||||
Ok(())
|
||||
} else {
|
||||
let error_text = response.text().await?;
|
||||
|
|
|
|||
|
|
@ -49,7 +49,21 @@ impl HallucinationDetector {
|
|||
|
||||
/// Check if a pattern is hallucinating (repeating 50+ times)
|
||||
pub async fn check(&self, pattern: &str) -> bool {
|
||||
if pattern.trim().is_empty() || pattern.len() < 3 {
|
||||
let trimmed = pattern.trim();
|
||||
|
||||
// Ignore short patterns
|
||||
if trimmed.is_empty() || trimmed.len() < 3 {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Ignore Markdown formatting patterns
|
||||
let md_patterns = ["**", "__", "*", "_", "`", "~~", "---", "***"];
|
||||
if md_patterns.iter().any(|p| trimmed == *p) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Ignore patterns that are just Markdown formatting (e.g., " **", "* ", "__")
|
||||
if trimmed.chars().all(|c| c == '*' || c == '_' || c == '`' || c == '~' || c == '-') {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
@ -60,11 +74,11 @@ impl HallucinationDetector {
|
|||
counts.retain(|_, (_, time)| now.duration_since(*time) < self.config.window);
|
||||
|
||||
// Increment count for this pattern
|
||||
let (count, _) = counts.entry(pattern.to_string()).or_insert((0, now));
|
||||
let (count, _) = counts.entry(trimmed.to_string()).or_insert((0, now));
|
||||
*count += 1;
|
||||
|
||||
if *count >= self.config.threshold {
|
||||
warn!("Hallucination detected: pattern {:?} repeated {} times", pattern, count);
|
||||
warn!("Hallucination detected: pattern {:?} repeated {} times", trimmed, count);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue