gbserver/gb-messaging/src/processor.rs

151 lines
4.5 KiB
Rust
Raw Normal View History

2024-12-26 16:16:03 -03:00
use gb_core::{Result, models::*}; // This will import both Message and MessageId
2024-12-25 16:25:09 -03:00
2024-12-26 16:16:03 -03:00
use gb_core::Error;
use uuid::Uuid;
2024-12-25 16:25:09 -03:00
use std::collections::HashMap;
2024-12-26 16:16:03 -03:00
use tracing::instrument;
2024-12-25 16:25:09 -03:00
use crate::MessageEnvelope;
2024-12-26 16:16:03 -03:00
use tokio::sync::broadcast; // Add this import
use std::sync::Arc;
use tracing::{error, info}; // Add error and info macros here
2024-12-22 20:56:52 -03:00
pub struct MessageProcessor {
2024-12-25 16:25:09 -03:00
tx: broadcast::Sender<MessageEnvelope>,
rx: broadcast::Receiver<MessageEnvelope>,
handlers: Arc<HashMap<String, Box<dyn Fn(MessageEnvelope) -> Result<()> + Send + Sync>>>,
}
impl Clone for MessageProcessor {
fn clone(&self) -> Self {
MessageProcessor {
tx: self.tx.clone(),
rx: self.tx.subscribe(),
handlers: Arc::clone(&self.handlers),
}
}
2024-12-22 20:56:52 -03:00
}
impl MessageProcessor {
2024-12-25 16:25:09 -03:00
pub fn new() -> Self {
Self::new_with_buffer_size(100)
}
pub fn new_with_buffer_size(buffer_size: usize) -> Self {
let (tx, rx) = broadcast::channel(buffer_size);
2024-12-22 20:56:52 -03:00
Self {
tx,
rx,
2024-12-25 16:25:09 -03:00
handlers: Arc::new(HashMap::new()),
2024-12-22 20:56:52 -03:00
}
}
2024-12-25 16:25:09 -03:00
pub fn sender(&self) -> broadcast::Sender<MessageEnvelope> {
2024-12-22 20:56:52 -03:00
self.tx.clone()
}
#[instrument(skip(self, handler))]
2024-12-25 16:25:09 -03:00
pub fn register_handler<F>(&mut self, kind: &str, handler: F)
2024-12-22 20:56:52 -03:00
where
F: Fn(MessageEnvelope) -> Result<()> + Send + Sync + 'static,
{
2024-12-25 16:25:09 -03:00
Arc::get_mut(&mut self.handlers)
.expect("Cannot modify handlers")
.insert(kind.to_string(), Box::new(handler));
2024-12-22 20:56:52 -03:00
}
2024-12-26 16:16:03 -03:00
#[instrument(skip(self))]
pub async fn add_message(&mut self, message: Message) -> Result<MessageId> {
let envelope = MessageEnvelope {
id: Uuid::new_v4(),
message,
metadata: HashMap::new(),
};
self.tx.send(envelope.clone())
.map_err(|e| Error::internal(format!("Failed to queue message: {}", e)))?;
// Start processing immediately
if let Some(handler) = self.handlers.get(&envelope.message.kind) {
handler(envelope.clone())
.map_err(|e| Error::internal(format!("Handler error: {}", e)))?;
}
Ok(MessageId(envelope.id))
}
2024-12-22 20:56:52 -03:00
#[instrument(skip(self))]
pub async fn process_messages(&mut self) -> Result<()> {
2024-12-25 16:25:09 -03:00
while let Ok(envelope) = self.rx.recv().await {
2024-12-22 20:56:52 -03:00
if let Some(handler) = self.handlers.get(&envelope.message.kind) {
if let Err(e) = handler(envelope.clone()) {
error!("Handler error for message {}: {}", envelope.id, e);
}
2024-12-26 16:16:03 -03:00
tracing::info!("Processing message: {:?}", &envelope.message.id);
2024-12-22 20:56:52 -03:00
} else {
error!("No handler registered for message kind: {}", envelope.message.kind);
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use gb_core::models::Message;
    use rstest::*;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::{sync::Arc, time::Duration};
    use uuid::Uuid;

    /// Builds a message of kind `"test"` with random ids and fixed content.
    #[fixture]
    fn test_message() -> Message {
        Message {
            id: Uuid::new_v4(),
            customer_id: Uuid::new_v4(),
            instance_id: Uuid::new_v4(),
            conversation_id: Uuid::new_v4(),
            sender_id: Uuid::new_v4(),
            kind: "test".to_string(),
            content: "test content".to_string(),
            metadata: serde_json::Value::Object(serde_json::Map::new()),
            created_at: chrono::Utc::now(),
            shard_key: 0,
        }
    }

    /// An envelope published through `sender()` must reach the handler
    /// registered for its kind via a cloned processor's `process_messages`.
    #[rstest]
    #[tokio::test]
    async fn test_message_processor(test_message: Message) {
        let mut processor = MessageProcessor::new();

        // The handler runs inside the async task driving `process_messages`,
        // where `tokio::sync::Mutex::blocking_lock` would panic. An atomic
        // flag needs no locking and is safe to set from any context.
        let processed = Arc::new(AtomicBool::new(false));
        let processed_clone = Arc::clone(&processed);

        // Handlers must be registered before the processor is cloned.
        processor.register_handler("test", move |envelope| {
            assert_eq!(envelope.message.content, "test content");
            processed_clone.store(true, Ordering::SeqCst);
            Ok(())
        });

        let mut processor_clone = processor.clone();
        let handle = tokio::spawn(async move {
            processor_clone.process_messages().await.unwrap();
        });

        let envelope = MessageEnvelope {
            id: Uuid::new_v4(),
            message: test_message,
            metadata: HashMap::new(),
        };

        processor.sender().send(envelope).unwrap();

        // Poll for the flag with an upper bound instead of a fixed sleep, so
        // the test finishes as soon as the handler has run.
        let wait_for_handler = async {
            while !processed.load(Ordering::SeqCst) {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        };
        let outcome = tokio::time::timeout(Duration::from_secs(1), wait_for_handler).await;

        // Stop the background loop before asserting so it never outlives the test.
        handle.abort();
        assert!(outcome.is_ok(), "handler was not invoked within one second");
        assert!(processed.load(Ordering::SeqCst));
    }
}