2024-12-25 16:25:09 -03:00
|
|
|
use gb_core::Result;
|
|
|
|
|
|
|
|
use tracing::{error, instrument};
|
|
|
|
use std::sync::Arc;
|
|
|
|
use tokio::sync::broadcast;
|
|
|
|
use std::collections::HashMap;
|
2024-12-22 20:56:52 -03:00
|
|
|
|
2024-12-25 16:25:09 -03:00
|
|
|
use crate::MessageEnvelope;
|
2024-12-22 20:56:52 -03:00
|
|
|
|
|
|
|
pub struct MessageProcessor {
|
2024-12-25 16:25:09 -03:00
|
|
|
tx: broadcast::Sender<MessageEnvelope>,
|
|
|
|
rx: broadcast::Receiver<MessageEnvelope>,
|
|
|
|
handlers: Arc<HashMap<String, Box<dyn Fn(MessageEnvelope) -> Result<()> + Send + Sync>>>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Clone for MessageProcessor {
|
|
|
|
fn clone(&self) -> Self {
|
|
|
|
MessageProcessor {
|
|
|
|
tx: self.tx.clone(),
|
|
|
|
rx: self.tx.subscribe(),
|
|
|
|
handlers: Arc::clone(&self.handlers),
|
|
|
|
}
|
|
|
|
}
|
2024-12-22 20:56:52 -03:00
|
|
|
}
|
|
|
|
|
|
|
|
impl MessageProcessor {
|
2024-12-25 16:25:09 -03:00
|
|
|
pub fn new() -> Self {
|
|
|
|
Self::new_with_buffer_size(100)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn new_with_buffer_size(buffer_size: usize) -> Self {
|
|
|
|
let (tx, rx) = broadcast::channel(buffer_size);
|
2024-12-22 20:56:52 -03:00
|
|
|
Self {
|
|
|
|
tx,
|
|
|
|
rx,
|
2024-12-25 16:25:09 -03:00
|
|
|
handlers: Arc::new(HashMap::new()),
|
2024-12-22 20:56:52 -03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-12-25 16:25:09 -03:00
|
|
|
pub fn sender(&self) -> broadcast::Sender<MessageEnvelope> {
|
2024-12-22 20:56:52 -03:00
|
|
|
self.tx.clone()
|
|
|
|
}
|
|
|
|
|
|
|
|
#[instrument(skip(self, handler))]
|
2024-12-25 16:25:09 -03:00
|
|
|
pub fn register_handler<F>(&mut self, kind: &str, handler: F)
|
2024-12-22 20:56:52 -03:00
|
|
|
where
|
|
|
|
F: Fn(MessageEnvelope) -> Result<()> + Send + Sync + 'static,
|
|
|
|
{
|
2024-12-25 16:25:09 -03:00
|
|
|
Arc::get_mut(&mut self.handlers)
|
|
|
|
.expect("Cannot modify handlers")
|
|
|
|
.insert(kind.to_string(), Box::new(handler));
|
2024-12-22 20:56:52 -03:00
|
|
|
}
|
|
|
|
|
|
|
|
#[instrument(skip(self))]
|
|
|
|
pub async fn process_messages(&mut self) -> Result<()> {
|
2024-12-25 16:25:09 -03:00
|
|
|
while let Ok(envelope) = self.rx.recv().await {
|
2024-12-22 20:56:52 -03:00
|
|
|
if let Some(handler) = self.handlers.get(&envelope.message.kind) {
|
|
|
|
if let Err(e) = handler(envelope.clone()) {
|
|
|
|
error!("Handler error for message {}: {}", envelope.id, e);
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
error!("No handler registered for message kind: {}", envelope.message.kind);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
mod tests {
|
|
|
|
use super::*;
|
2024-12-25 16:25:09 -03:00
|
|
|
use gb_core::models::Message;
|
2024-12-22 20:56:52 -03:00
|
|
|
use rstest::*;
|
2024-12-26 10:09:39 -03:00
|
|
|
use uuid::Uuid;
|
2024-12-25 16:25:09 -03:00
|
|
|
use std::{sync::Arc, time::Duration};
|
2024-12-22 20:56:52 -03:00
|
|
|
use tokio::sync::Mutex;
|
|
|
|
|
|
|
|
#[fixture]
|
|
|
|
fn test_message() -> Message {
|
|
|
|
Message {
|
|
|
|
id: Uuid::new_v4(),
|
|
|
|
customer_id: Uuid::new_v4(),
|
|
|
|
instance_id: Uuid::new_v4(),
|
|
|
|
conversation_id: Uuid::new_v4(),
|
|
|
|
sender_id: Uuid::new_v4(),
|
|
|
|
kind: "test".to_string(),
|
|
|
|
content: "test content".to_string(),
|
|
|
|
metadata: serde_json::Value::Object(serde_json::Map::new()),
|
|
|
|
created_at: chrono::Utc::now(),
|
|
|
|
shard_key: 0,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[rstest]
|
|
|
|
#[tokio::test]
|
|
|
|
async fn test_message_processor(test_message: Message) {
|
2024-12-25 16:25:09 -03:00
|
|
|
let mut processor = MessageProcessor::new();
|
2024-12-22 20:56:52 -03:00
|
|
|
let processed = Arc::new(Mutex::new(false));
|
|
|
|
let processed_clone = processed.clone();
|
|
|
|
|
|
|
|
processor.register_handler("test", move |envelope| {
|
|
|
|
assert_eq!(envelope.message.content, "test content");
|
|
|
|
let mut processed = processed_clone.blocking_lock();
|
|
|
|
*processed = true;
|
|
|
|
Ok(())
|
|
|
|
});
|
|
|
|
|
|
|
|
let mut processor_clone = processor.clone();
|
|
|
|
let handle = tokio::spawn(async move {
|
|
|
|
processor_clone.process_messages().await.unwrap();
|
|
|
|
});
|
|
|
|
|
|
|
|
let envelope = MessageEnvelope {
|
|
|
|
id: Uuid::new_v4(),
|
|
|
|
message: test_message,
|
|
|
|
metadata: HashMap::new(),
|
|
|
|
};
|
|
|
|
|
2024-12-25 16:25:09 -03:00
|
|
|
processor.sender().send(envelope).unwrap();
|
2024-12-22 20:56:52 -03:00
|
|
|
|
|
|
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
|
|
|
|
|
|
|
assert!(*processed.lock().await);
|
|
|
|
|
|
|
|
handle.abort();
|
|
|
|
}
|
|
|
|
}
|