new(all): Initial import.

Rodrigo Rodriguez 2024-12-23 00:20:59 -03:00
parent 9dd2950c25
commit 8aa9a4abe1
34 changed files with 728 additions and 1616 deletions

Cargo.lock (generated; 876 changed lines; diff suppressed because it is too large)


@@ -1,4 +1,5 @@
 [workspace]
+resolver="2"
 members = [
     "gb-core",       # Core domain models and traits
     "gb-api",        # API layer and server implementation
@@ -12,7 +13,6 @@ members = [
     #"gb-cloud",     # Cloud provider integrations
     #"gb-vm",        # Virtual machine and BASIC compiler
     "gb-automation", # Web and process automation
-    "gb-nlp",        # Natural language processing
     "gb-image",      # Image processing capabilities
 ]
@@ -72,7 +72,7 @@ azure_identity = "0.15"
 google-cloud-storage = "0.16"

 # Monitoring and metrics
-prometheus = "0.13"
+prometheus = "0.13.0"
 opentelemetry = { version = "0.20", features = ["rt-tokio"] }
 tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["env-filter"] }


@@ -24,4 +24,4 @@ pub async fn logout() -> Result<()> {
 pub async fn refresh_token() -> Result<Json<LoginResponse>> {
     todo!()
 }


@@ -0,0 +1,3 @@
+mod auth_handler;
+
+pub use auth_handler::*;


@@ -28,4 +28,4 @@ where
         // Implement token validation and user extraction
         todo!()
     }
 }


@@ -0,0 +1,3 @@
+mod auth_middleware;
+
+pub use auth_middleware::*;


@@ -0,0 +1,3 @@
+mod user;
+
+pub use user::*;


@@ -42,4 +42,4 @@ pub struct LoginResponse {
     pub refresh_token: String,
     pub token_type: String,
     pub expires_in: i64,
 }


@@ -1,14 +1,16 @@
-use std::sync::Arc;
-use sqlx::PgPool;
-use argon2::{
-    password_hash::{PasswordHash, PasswordHasher, SaltString},
-    Argon2,
-};
-use rand::rngs::OsRng;
 use crate::{
     models::{LoginRequest, LoginResponse, User},
-    Result, AuthError,
+    AuthError,
+    Result,
 };
+use argon2::{
+    password_hash::{rand_core::OsRng, SaltString},
+    Argon2, PasswordHash, PasswordHasher, PasswordVerifier,
+};
+use jsonwebtoken::{encode, EncodingKey, Header};
+use sqlx::PgPool;
+use std::sync::Arc;

 pub struct AuthService {
     db: Arc<PgPool>,
@@ -70,4 +72,4 @@ impl AuthService {
         // Token generation implementation
         Ok("token".to_string())
     }
 }


@@ -0,0 +1,3 @@
+pub mod auth_service;
+
+pub use auth_service::*;

gb-auth/src/utils/mod.rs (new file)

@@ -0,0 +1 @@
+// Auth utilities module


@@ -6,6 +6,7 @@ authors.workspace = true
 license.workspace = true

 [dependencies]
+tokio-tungstenite = "0.18"
 async-trait.workspace = true
 serde.workspace = true
 uuid.workspace = true


@@ -1,54 +1,82 @@
 use thiserror::Error;
+use redis::RedisError;

 #[derive(Error, Debug)]
-pub enum Error {
+pub enum ErrorKind {
     #[error("Database error: {0}")]
-    Database(#[from] sqlx::Error),
+    Database(String),
     #[error("Redis error: {0}")]
-    Redis(#[from] redis::RedisError),
+    Redis(String),
     #[error("Kafka error: {0}")]
     Kafka(String),
-    #[error("WebRTC error: {0}")]
-    WebRTC(String),
     #[error("Invalid input: {0}")]
     InvalidInput(String),
     #[error("Not found: {0}")]
     NotFound(String),
-    #[error("Unauthorized: {0}")]
-    Unauthorized(String),
-    #[error("Rate limited: {0}")]
-    RateLimited(String),
-    #[error("Resource quota exceeded: {0}")]
-    QuotaExceeded(String),
+    #[error("Authentication error: {0}")]
+    Authentication(String),
+    #[error("Authorization error: {0}")]
+    Authorization(String),
     #[error("Internal error: {0}")]
     Internal(String),
+    #[error("External service error: {0}")]
+    ExternalService(String),
+    #[error("WebSocket error: {0}")]
+    WebSocket(String),
+    #[error("Messaging error: {0}")]
+    Messaging(String),
 }

-pub type Result<T> = std::result::Result<T, Error>;
+#[derive(Debug)]
+pub struct Error {
+    pub kind: ErrorKind,
+    pub message: String,
+}

-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_error_display() {
-        let err = Error::NotFound("User".to_string());
-        assert_eq!(err.to_string(), "Not found: User");
-
-        let err = Error::Unauthorized("Invalid token".to_string());
-        assert_eq!(err.to_string(), "Unauthorized: Invalid token");
-
-        let err = Error::QuotaExceeded("Max instances reached".to_string());
-        assert_eq!(err.to_string(), "Resource quota exceeded: Max instances reached");
-    }
-}
+impl Error {
+    pub fn new(kind: ErrorKind, message: impl Into<String>) -> Self {
+        Self {
+            kind,
+            message: message.into(),
+        }
+    }
+
+    pub fn internal<T: std::fmt::Display>(msg: T) -> Self {
+        Self::new(ErrorKind::Internal(msg.to_string()), msg.to_string())
+    }
+
+    pub fn redis<T: std::fmt::Display>(msg: T) -> Self {
+        Self::new(ErrorKind::Redis(msg.to_string()), msg.to_string())
+    }
+
+    pub fn kafka<T: std::fmt::Display>(msg: T) -> Self {
+        Self::new(ErrorKind::Kafka(msg.to_string()), msg.to_string())
+    }
+
+    pub fn database<T: std::fmt::Display>(msg: T) -> Self {
+        Self::new(ErrorKind::Database(msg.to_string()), msg.to_string())
+    }
+
+    pub fn websocket<T: std::fmt::Display>(msg: T) -> Self {
+        Self::new(ErrorKind::WebSocket(msg.to_string()), msg.to_string())
+    }
+}
+
+impl std::fmt::Display for Error {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}: {}", self.kind, self.message)
+    }
+}
+
+impl std::error::Error for Error {}
+
+pub type Result<T> = std::result::Result<T, Error>;
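
One consequence of the new design worth noting (annotation, not part of the diff): the convenience constructors copy the same text into both the ErrorKind payload and the message field, and Display prints "{kind}: {message}", so the message is rendered twice. A minimal sketch of the API exactly as committed:

// Sketch only: exercising gb_core's new Error/ErrorKind from this commit.
use gb_core::{Error, ErrorKind, Result};

fn lookup() -> Result<String> {
    Err(Error::database("connection refused"))
}

fn main() {
    if let Err(e) = lookup() {
        // Prints "Database error: connection refused: connection refused",
        // since the helper stored the same text in both kind and message.
        println!("{}", e);
        assert!(matches!(e.kind, ErrorKind::Database(_)));
    }
}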


@@ -1,8 +1,10 @@
+pub mod errors;
 pub mod models;
 pub mod traits;
-pub mod errors;

-pub use errors::{Error, Result};
+pub use errors::{Error, ErrorKind, Result};
 pub use models::*;
 pub use traits::*;


@@ -1,17 +1,19 @@
 use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
-use sqlx::types::JsonValue;
 use uuid::Uuid;

 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct Customer {
-    pub id: Uuid,
-    pub name: String,
-    pub subscription_tier: String,
-    pub status: String,
-    pub max_instances: i32,
-    pub metadata: JsonValue,
-    pub shard_key: i32,
-    pub created_at: DateTime<Utc>,
+pub struct Message {
+    pub id: Uuid,
+    pub conversation_id: Uuid,
+    pub sender_id: Uuid,
+    pub content: String,
+    pub status: String,
+    pub message_type: String,
+    pub kind: String, // Add this field
+    pub created_at: DateTime<Utc>,
+    pub updated_at: DateTime<Utc>,
 }

 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -19,95 +21,138 @@ pub struct Instance {
     pub id: Uuid,
     pub customer_id: Uuid,
     pub name: String,
-    pub status: String,
     pub shard_id: i32,
-    pub region: String,
-    pub config: JsonValue,
     pub created_at: DateTime<Utc>,
+    pub updated_at: DateTime<Utc>,
 }

 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct Room {
     pub id: Uuid,
-    pub customer_id: Uuid,
     pub instance_id: Uuid,
     pub name: String,
-    pub kind: String,
-    pub status: String,
-    pub config: JsonValue,
+    pub is_active: bool,
     pub created_at: DateTime<Utc>,
+    pub updated_at: DateTime<Utc>,
 }

-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct Message {
-    pub id: Uuid,
-    pub customer_id: Uuid,
-    pub instance_id: Uuid,
-    pub conversation_id: Uuid,
-    pub sender_id: Uuid,
-    pub kind: String,
-    pub content: String,
-    pub metadata: JsonValue,
-    pub created_at: DateTime<Utc>,
-    pub shard_key: i32,
-}
-
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct Track {
     pub id: Uuid,
     pub room_id: Uuid,
     pub user_id: Uuid,
-    pub kind: String,
-    pub status: String,
-    pub metadata: JsonValue,
+    pub media_type: String,
+    pub created_at: DateTime<Utc>,
+    pub updated_at: DateTime<Utc>,
 }

 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct User {
     pub id: Uuid,
-    pub customer_id: Uuid,
     pub instance_id: Uuid,
+    pub email: String,
     pub name: String,
-    pub email: String,
-    pub status: String,
-    pub metadata: JsonValue,
     pub created_at: DateTime<Utc>,
+    pub updated_at: DateTime<Utc>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Customer {
+    pub id: Uuid,
+    pub name: String,
+    pub email: String,
+    pub created_at: DateTime<Utc>,
+    pub updated_at: DateTime<Utc>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RoomConfig {
+    pub instance_id: Uuid,
+    pub name: String,
+    pub max_participants: u32,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Connection {
+    pub id: Uuid,
+    pub room_id: Uuid,
+    pub user_id: Uuid,
+    pub connected_at: DateTime<Utc>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct TrackInfo {
+    pub room_id: Uuid,
+    pub user_id: Uuid,
+    pub media_type: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Subscription {
+    pub id: Uuid,
+    pub track_id: Uuid,
+    pub subscriber_id: Uuid,
+    pub created_at: DateTime<Utc>,
 }

-impl Customer {
-    pub fn new(
-        name: String,
-        subscription_tier: String,
-        max_instances: i32,
-    ) -> Self {
-        Self {
-            id: Uuid::new_v4(),
-            name,
-            subscription_tier,
-            status: "active".to_string(),
-            max_instances,
-            metadata: HashMap::new(),
-            created_at: Utc::now()
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use rstest::*;
-
-    #[rstest]
-    fn test_customer_creation() {
-        let customer = Customer::new(
-            "Test Corp".to_string(),
-            "enterprise".to_string(),
-            10,
-        );
-
-        assert_eq!(customer.name, "Test Corp");
-        assert_eq!(customer.subscription_tier, "enterprise");
-        assert_eq!(customer.max_instances, 10);
-        assert_eq!(customer.status, "active");
-    }
-}
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Participant {
+    pub user_id: Uuid,
+    pub room_id: Uuid,
+    pub joined_at: DateTime<Utc>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RoomStats {
+    pub participant_count: u32,
+    pub track_count: u32,
+    pub duration: i64,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MessageId(pub Uuid);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MessageFilter {
+    pub conversation_id: Option<Uuid>,
+    pub sender_id: Option<Uuid>,
+    pub from_date: Option<DateTime<Utc>>,
+    pub to_date: Option<DateTime<Utc>>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Status {
+    pub code: String,
+    pub timestamp: DateTime<Utc>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct SearchQuery {
+    pub query: String,
+    pub conversation_id: Option<Uuid>,
+    pub from_date: Option<DateTime<Utc>>,
+    pub to_date: Option<DateTime<Utc>>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct FileUpload {
+    pub content: Vec<u8>,
+    pub filename: String,
+    pub content_type: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct FileInfo {
+    pub id: Uuid,
+    pub filename: String,
+    pub content_type: String,
+    pub size: u64,
+    pub created_at: DateTime<Utc>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct FileContent {
+    pub content: Vec<u8>,
+    pub content_type: String,
+}


@@ -1,99 +1,76 @@
-use async_trait::async_trait;
-use chrono::{DateTime, Utc};
-use serde::{Deserialize, Serialize};
-use serde::{Map, Value as JsonValue};
+use std::future::Future;
 use uuid::Uuid;
-use crate::{models::*, Result};
+use crate::errors::Result;
+use crate::models::{
+    Customer, Instance, Room, Track, User, Message, Connection,
+    TrackInfo, Subscription, Participant, RoomStats, MessageId,
+    MessageFilter, Status, SearchQuery, FileUpload, FileInfo,
+    FileContent, RoomConfig
+};

-#[async_trait]
 pub trait CustomerRepository: Send + Sync {
-    async fn create(&self, customer: &Customer) -> Result<Customer>;
-    async fn get(&self, id: Uuid) -> Result<Customer>;
-    async fn update(&self, customer: &Customer) -> Result<Customer>;
-    async fn delete(&self, id: Uuid) -> Result<()>;
+    fn create(&self, customer: &Customer) -> impl Future<Output = Result<Customer>> + Send;
+    fn get(&self, id: Uuid) -> impl Future<Output = Result<Customer>> + Send;
+    fn update(&self, customer: &Customer) -> impl Future<Output = Result<Customer>> + Send;
+    fn delete(&self, id: Uuid) -> impl Future<Output = Result<()>> + Send;
 }

-#[async_trait]
 pub trait InstanceRepository: Send + Sync {
-    async fn create(&self, instance: &Instance) -> Result<Instance>;
-    async fn get(&self, id: Uuid) -> Result<Instance>;
-    async fn get_by_customer(&self, customer_id: Uuid) -> Result<Vec<Instance>>;
-    async fn update(&self, instance: &Instance) -> Result<Instance>;
-    async fn delete(&self, id: Uuid) -> Result<()>;
-    async fn get_by_shard(&self, shard_id: i32) -> Result<Vec<Instance>>;
+    fn create(&self, instance: &Instance) -> impl Future<Output = Result<Instance>> + Send;
+    fn get(&self, id: Uuid) -> impl Future<Output = Result<Instance>> + Send;
+    fn get_by_customer(&self, customer_id: Uuid) -> impl Future<Output = Result<Vec<Instance>>> + Send;
+    fn update(&self, instance: &Instance) -> impl Future<Output = Result<Instance>> + Send;
+    fn delete(&self, id: Uuid) -> impl Future<Output = Result<()>> + Send;
+    fn get_by_shard(&self, shard_id: i32) -> impl Future<Output = Result<Vec<Instance>>> + Send;
 }

-#[async_trait]
 pub trait RoomRepository: Send + Sync {
-    async fn create(&self, room: &Room) -> Result<Room>;
-    async fn get(&self, id: Uuid) -> Result<Room>;
-    async fn get_by_instance(&self, instance_id: Uuid) -> Result<Vec<Room>>;
-    async fn update(&self, room: &Room) -> Result<Room>;
-    async fn delete(&self, id: Uuid) -> Result<()>;
-    async fn get_active_rooms(&self, instance_id: Uuid) -> Result<Vec<Room>>;
+    fn create(&self, room: &Room) -> impl Future<Output = Result<Room>> + Send;
+    fn get(&self, id: Uuid) -> impl Future<Output = Result<Room>> + Send;
+    fn get_by_instance(&self, instance_id: Uuid) -> impl Future<Output = Result<Vec<Room>>> + Send;
+    fn update(&self, room: &Room) -> impl Future<Output = Result<Room>> + Send;
+    fn delete(&self, id: Uuid) -> impl Future<Output = Result<()>> + Send;
+    fn get_active_rooms(&self, instance_id: Uuid) -> impl Future<Output = Result<Vec<Room>>> + Send;
 }

-#[async_trait]
-pub trait MessageRepository: Send + Sync {
-    async fn create(&self, message: &Message) -> Result<Message>;
-    async fn get(&self, id: Uuid) -> Result<Message>;
-    async fn get_by_conversation(&self, conversation_id: Uuid) -> Result<Vec<Message>>;
-    async fn update_status(&self, id: Uuid, status: String) -> Result<()>;
-    async fn delete(&self, id: Uuid) -> Result<()>;
-    async fn get_by_shard(&self, shard_key: i32) -> Result<Vec<Message>>;
-}
-
-#[async_trait]
 pub trait TrackRepository: Send + Sync {
-    async fn create(&self, track: &Track) -> Result<Track>;
-    async fn get(&self, id: Uuid) -> Result<Track>;
-    async fn get_by_room(&self, room_id: Uuid) -> Result<Vec<Track>>;
-    async fn update(&self, track: &Track) -> Result<Track>;
-    async fn delete(&self, id: Uuid) -> Result<()>;
+    fn create(&self, track: &Track) -> impl Future<Output = Result<Track>> + Send;
+    fn get(&self, id: Uuid) -> impl Future<Output = Result<Track>> + Send;
+    fn get_by_room(&self, room_id: Uuid) -> impl Future<Output = Result<Vec<Track>>> + Send;
+    fn update(&self, track: &Track) -> impl Future<Output = Result<Track>> + Send;
+    fn delete(&self, id: Uuid) -> impl Future<Output = Result<()>> + Send;
 }

-#[async_trait]
 pub trait UserRepository: Send + Sync {
-    async fn create(&self, user: &User) -> Result<User>;
-    async fn get(&self, id: Uuid) -> Result<User>;
-    async fn get_by_email(&self, email: &str) -> Result<User>;
-    async fn get_by_instance(&self, instance_id: Uuid) -> Result<Vec<User>>;
-    async fn update(&self, user: &User) -> Result<User>;
-    async fn delete(&self, id: Uuid) -> Result<()>;
+    fn create(&self, user: &User) -> impl Future<Output = Result<User>> + Send;
+    fn get(&self, id: Uuid) -> impl Future<Output = Result<User>> + Send;
+    fn get_by_email(&self, email: &str) -> impl Future<Output = Result<User>> + Send;
+    fn get_by_instance(&self, instance_id: Uuid) -> impl Future<Output = Result<Vec<User>>> + Send;
+    fn update(&self, user: &User) -> impl Future<Output = Result<User>> + Send;
+    fn delete(&self, id: Uuid) -> impl Future<Output = Result<()>> + Send;
 }

-#[async_trait]
 pub trait RoomService: Send + Sync {
-    async fn create_room(&self, config: RoomConfig) -> Result<Room>;
-    async fn join_room(&self, room_id: Uuid, user_id: Uuid) -> Result<Connection>;
-    async fn leave_room(&self, room_id: Uuid, user_id: Uuid) -> Result<()>;
-    async fn publish_track(&self, track: TrackInfo) -> Result<Track>;
-    async fn subscribe_track(&self, track_id: Uuid) -> Result<Subscription>;
-    async fn get_participants(&self, room_id: Uuid) -> Result<Vec<Participant>>;
-    async fn get_room_stats(&self, room_id: Uuid) -> Result<RoomStats>;
+    fn create_room(&self, config: RoomConfig) -> impl Future<Output = Result<Room>> + Send;
+    fn join_room(&self, room_id: Uuid, user_id: Uuid) -> impl Future<Output = Result<Connection>> + Send;
+    fn leave_room(&self, room_id: Uuid, user_id: Uuid) -> impl Future<Output = Result<()>> + Send;
+    fn publish_track(&self, track: TrackInfo) -> impl Future<Output = Result<Track>> + Send;
+    fn subscribe_track(&self, track_id: Uuid) -> impl Future<Output = Result<Subscription>> + Send;
+    fn get_participants(&self, room_id: Uuid) -> impl Future<Output = Result<Vec<Participant>>> + Send;
+    fn get_room_stats(&self, room_id: Uuid) -> impl Future<Output = Result<RoomStats>> + Send;
 }

-#[async_trait]
 pub trait MessageService: Send + Sync {
-    async fn send_message(&self, message: Message) -> Result<MessageId>;
-    async fn get_messages(&self, filter: MessageFilter) -> Result<Vec<Message>>;
-    async fn update_status(&self, message_id: Uuid, status: Status) -> Result<()>;
-    async fn delete_messages(&self, filter: MessageFilter) -> Result<()>;
-    async fn search_messages(&self, query: SearchQuery) -> Result<Vec<Message>>;
+    fn send_message(&self, message: Message) -> impl Future<Output = Result<MessageId>> + Send;
+    fn get_messages(&self, filter: MessageFilter) -> impl Future<Output = Result<Vec<Message>>> + Send;
+    fn update_status(&self, message_id: Uuid, status: Status) -> impl Future<Output = Result<()>> + Send;
+    fn delete_messages(&self, filter: MessageFilter) -> impl Future<Output = Result<()>> + Send;
+    fn search_messages(&self, query: SearchQuery) -> impl Future<Output = Result<Vec<Message>>> + Send;
 }

-#[async_trait]
-pub trait StorageService: Send + Sync {
-    async fn save_file(&self, file: FileUpload) -> Result<FileInfo>;
-    async fn get_file(&self, file_id: Uuid) -> Result<FileContent>;
-    async fn delete_file(&self, file_id: Uuid) -> Result<()>;
-    async fn list_files(&self, prefix: &str) -> Result<Vec<FileInfo>>;
-}
-
-#[async_trait]
-pub trait MetricsService: Send + Sync {
-    async fn record_metric(&self, metric: Metric) -> Result<()>;
-    async fn get_metrics(&self, query: MetricsQuery) -> Result<Vec<MetricValue>>;
-    async fn create_dashboard(&self, config: DashboardConfig) -> Result<Dashboard>;
-    async fn get_dashboard(&self, id: Uuid) -> Result<Dashboard>;
+pub trait FileService: Send + Sync {
+    fn save_file(&self, file: FileUpload) -> impl Future<Output = Result<FileInfo>> + Send;
+    fn get_file(&self, file_id: Uuid) -> impl Future<Output = Result<FileContent>> + Send;
+    fn delete_file(&self, file_id: Uuid) -> impl Future<Output = Result<()>> + Send;
+    fn list_files(&self, prefix: &str) -> impl Future<Output = Result<Vec<FileInfo>>> + Send;
 }
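
Because the repository traits now return impl Future + Send instead of using #[async_trait], an implementation can use plain async fn in the impl (stable since Rust 1.75), provided the resulting futures are Send. A sketch under that assumption (not part of the commit); the InMemoryCustomers type is invented for illustration:

// Sketch only: a hypothetical implementor of the CustomerRepository trait above.
use std::collections::HashMap;
use std::sync::Mutex;
use uuid::Uuid;
use gb_core::{models::Customer, CustomerRepository, Error, ErrorKind, Result};

struct InMemoryCustomers {
    store: Mutex<HashMap<Uuid, Customer>>,
}

impl CustomerRepository for InMemoryCustomers {
    // `async fn` here satisfies `fn ... -> impl Future<Output = ...> + Send`
    // because each lock guard is dropped before any await, keeping the future Send.
    async fn create(&self, customer: &Customer) -> Result<Customer> {
        self.store.lock().unwrap().insert(customer.id, customer.clone());
        Ok(customer.clone())
    }

    async fn get(&self, id: Uuid) -> Result<Customer> {
        self.store.lock().unwrap().get(&id).cloned().ok_or_else(|| {
            Error::new(ErrorKind::NotFound(id.to_string()), "customer not found")
        })
    }

    async fn update(&self, customer: &Customer) -> Result<Customer> {
        self.create(customer).await
    }

    async fn delete(&self, id: Uuid) -> Result<()> {
        self.store.lock().unwrap().remove(&id);
        Ok(())
    }
}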


@@ -11,7 +11,7 @@ image = { version = "0.24", features = ["webp", "jpeg", "png", "gif"] }
 imageproc = "0.23"
 rusttype = "0.9"
 tesseract = "0.13"
-opencv = { version = "0.84", features = ["clang-runtime"] }
+opencv = { version = "0.82", features = ["clang-runtime"] }
 async-trait.workspace = true
 tokio.workspace = true
 serde.workspace = true


@@ -16,8 +16,10 @@ uuid.workspace = true
 async-trait.workspace = true
 tracing.workspace = true
 futures.workspace = true
+futures-util = "0.3"
 lapin = "2.3"
+tokio-tungstenite = { version = "0.20", features = ["native-tls"] }

 [dev-dependencies]
 rstest.workspace = true
 tokio-test = "0.4"


@@ -1,35 +1,27 @@
-use async_trait::async_trait;
-use gb_core::{Result, Error, models::Message};
-use rdkafka::{
-    producer::{FutureProducer, FutureRecord},
-    consumer::{StreamConsumer, Consumer},
-    ClientConfig, Message as KafkaMessage,
-};
-use serde::{de::DeserializeOwned, Serialize};
+use gb_core::{Result, Error};
+use rdkafka::producer::{FutureProducer, FutureRecord};
+use rdkafka::consumer::{StreamConsumer, Consumer};
+use rdkafka::ClientConfig;
 use std::time::Duration;
-use tracing::{instrument, error, info};
-use uuid::Uuid;
+use tracing::{instrument, error};
+use serde::Serialize;

-pub struct KafkaBroker {
+pub struct Kafka {
     producer: FutureProducer,
     consumer: StreamConsumer,
 }

-impl KafkaBroker {
-    pub fn new(brokers: &str, group_id: &str) -> Result<Self> {
-        let producer: FutureProducer = ClientConfig::new()
+impl Kafka {
+    pub async fn new(brokers: &str) -> Result<Self> {
+        let producer = ClientConfig::new()
             .set("bootstrap.servers", brokers)
-            .set("message.timeout.ms", "5000")
             .create()
-            .map_err(|e| Error::Kafka(format!("Failed to create producer: {}", e)))?;
+            .map_err(|e| Error::kafka(format!("Failed to create producer: {}", e)))?;

-        let consumer: StreamConsumer = ClientConfig::new()
+        let consumer = ClientConfig::new()
             .set("bootstrap.servers", brokers)
-            .set("group.id", group_id)
-            .set("enable.auto.commit", "true")
-            .set("auto.offset.reset", "earliest")
+            .set("group.id", "my-group")
             .create()
-            .map_err(|e| Error::Kafka(format!("Failed to create consumer: {}", e)))?;
+            .map_err(|e| Error::kafka(format!("Failed to create consumer: {}", e)))?;

         Ok(Self {
             producer,
@@ -37,60 +29,37 @@ impl KafkaBroker {
         })
     }

-    #[instrument(skip(self, value))]
-    pub async fn publish<T: Serialize>(&self, topic: &str, key: &str, value: &T) -> Result<()> {
-        let payload = serde_json::to_string(value)
-            .map_err(|e| Error::Internal(format!("Serialization error: {}", e)))?;
+    pub async fn publish<T: Serialize>(&self, topic: &str, message: &T) -> Result<()> {
+        let payload = serde_json::to_string(message)
+            .map_err(|e| Error::internal(format!("Serialization error: {}", e)))?;

         self.producer
             .send(
                 FutureRecord::to(topic)
-                    .key(key)
-                    .payload(&payload),
-                Duration::from_secs(5),
+                    .payload(payload.as_bytes())
+                    .key(""),
+                Duration::from_secs(0),
             )
             .await
-            .map_err(|(e, _)| Error::Kafka(format!("Failed to send message: {}", e)))?;
+            .map_err(|(e, _)| Error::kafka(format!("Failed to send message: {}", e)))?;

         Ok(())
     }

-    #[instrument(skip(self, handler))]
-    pub async fn subscribe<T, F, Fut>(&self, topics: &[&str], handler: F) -> Result<()>
-    where
-        T: DeserializeOwned,
-        F: Fn(T) -> Fut,
-        Fut: std::future::Future<Output = Result<()>>,
-    {
+    pub async fn subscribe(&self, topic: &str) -> Result<()> {
         self.consumer
-            .subscribe(topics)
-            .map_err(|e| Error::Kafka(format!("Failed to subscribe: {}", e)))?;
+            .subscribe(&[topic])
+            .map_err(|e| Error::kafka(format!("Failed to subscribe: {}", e)))?;

-        loop {
-            match self.consumer.recv().await {
-                Ok(msg) => {
-                    if let Some(payload) = msg.payload() {
-                        match serde_json::from_slice::<T>(payload) {
-                            Ok(value) => {
-                                if let Err(e) = handler(value).await {
-                                    error!("Handler error: {}", e);
-                                }
-                            }
-                            Err(e) => error!("Deserialization error: {}", e),
-                        }
-                    }
-                }
-                Err(e) => error!("Consumer error: {}", e),
-            }
-        }
+        Ok(())
     }
 }

 #[cfg(test)]
 mod tests {
     use super::*;
     use rstest::*;
     use serde::{Deserialize, Serialize};
+    use uuid::Uuid;

     #[derive(Debug, Serialize, Deserialize, PartialEq)]
     struct TestMessage {
@@ -99,11 +68,8 @@ mod tests {
     }

     #[fixture]
-    fn kafka_broker() -> KafkaBroker {
-        KafkaBroker::new(
-            "localhost:9092",
-            "test-group",
-        ).unwrap()
+    async fn kafka_broker() -> Kafka {
+        Kafka::new("localhost:9092").await.unwrap()
     }

     #[fixture]
@@ -116,29 +82,15 @@ mod tests {
     #[rstest]
     #[tokio::test]
-    async fn test_publish_subscribe(
-        kafka_broker: KafkaBroker,
-        test_message: TestMessage,
-    ) {
+    async fn test_publish_subscribe(#[future] kafka_broker: Kafka, test_message: TestMessage) {
         let topic = "test-topic";
-        let key = test_message.id.to_string();

-        // Publish message
-        kafka_broker.publish(topic, &key, &test_message)
+        kafka_broker.publish(topic, &test_message)
             .await
             .unwrap();

-        // Subscribe and verify
-        let handler = |msg: TestMessage| async move {
-            assert_eq!(msg, test_message);
-            Ok(())
-        };
-
-        // Run subscription for a short time
-        tokio::spawn(async move {
-            kafka_broker.subscribe(&[topic], handler).await.unwrap();
-        });
+        kafka_broker.subscribe(topic)
+            .await
+            .unwrap();

         tokio::time::sleep(Duration::from_secs(1)).await;
     }
 }
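
For orientation (not part of the commit), a sketch of how the reworked Kafka type is driven; the broker address and topic are placeholders, and note that new() now hardcodes group.id to "my-group" and publish() always sends an empty key:

// Sketch only: using gb_messaging::Kafka as defined in this diff.
use gb_messaging::Kafka;
use serde::Serialize;

#[derive(Serialize)]
struct Ping {
    seq: u64,
}

#[tokio::main]
async fn main() -> gb_core::Result<()> {
    let kafka = Kafka::new("localhost:9092").await?;
    kafka.subscribe("pings").await?; // registers the consumer subscription only
    kafka.publish("pings", &Ping { seq: 1 }).await?; // JSON payload, empty key
    Ok(())
}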


@@ -1,14 +1,14 @@
-pub mod kafka;
-pub mod redis_pubsub;
-pub mod rabbitmq;
-pub mod websocket;
-pub mod processor;
+mod kafka;
+mod rabbitmq;
+mod redis_pubsub;
+mod websocket;
+mod processor;

-pub use kafka::KafkaBroker;
-pub use redis_pubsub::RedisPubSub;
+pub use kafka::Kafka;
 pub use rabbitmq::RabbitMQ;
+pub use redis_pubsub::RedisPubSub;
 pub use websocket::WebSocketClient;
-pub use processor::{MessageProcessor, MessageEnvelope};
+pub use processor::MessageProcessor;

 #[cfg(test)]
 mod tests {
@@ -26,7 +26,6 @@ mod tests {
     #[tokio::test]
     async fn test_messaging_integration() {
-        // Initialize message brokers
         let kafka = KafkaBroker::new(
             "localhost:9092",
             "test-group",
@@ -40,37 +39,31 @@ mod tests {
             .await
             .unwrap();

-        let websocket = WebSocketClient::connect("ws://localhost:8080")
+        let mut websocket = WebSocketClient::connect("ws://localhost:8080")
             .await
             .unwrap();

-        // Create test message
         let test_message = TestMessage {
             id: Uuid::new_v4(),
             content: "integration test".to_string(),
         };

-        // Test Kafka
         kafka.publish("test-topic", &test_message.id.to_string(), &test_message)
             .await
             .unwrap();

-        // Test Redis PubSub
         redis.publish("test-channel", &test_message)
             .await
             .unwrap();

-        // Test RabbitMQ
         rabbitmq.publish("", "test.key", &test_message)
             .await
             .unwrap();

-        // Test WebSocket
-        websocket.send(&test_message)
+        websocket.send_message(serde_json::to_string(&test_message).unwrap())
             .await
             .unwrap();

-        // Test Message Processor
         let mut processor = MessageProcessor::new(100);
         processor.register_handler("test", |envelope| {


@@ -1,22 +1,31 @@
-use async_trait::async_trait;
 use gb_core::{Result, Error};
 use lapin::{
     options::*,
     types::FieldTable,
     Connection, ConnectionProperties,
-    Channel, Consumer,
-    message::Delivery,
+    Channel,
+    BasicProperties,
 };
 use serde::{de::DeserializeOwned, Serialize};
 use std::sync::Arc;
 use tokio::sync::Mutex;
 use tracing::{instrument, error};
+use futures::StreamExt;

 pub struct RabbitMQ {
-    connection: Connection,
+    connection: Arc<Connection>,
     channel: Arc<Mutex<Channel>>,
 }

+impl Clone for RabbitMQ {
+    fn clone(&self) -> Self {
+        Self {
+            connection: self.connection.clone(),
+            channel: self.channel.clone(),
+        }
+    }
+}
+
 impl RabbitMQ {
     pub async fn new(url: &str) -> Result<Self> {
         let connection = Connection::connect(
@@ -24,14 +33,14 @@ impl RabbitMQ {
             ConnectionProperties::default(),
         )
         .await
-        .map_err(|e| Error::Internal(format!("RabbitMQ connection error: {}", e)))?;
+        .map_err(|e| Error::internal(format!("RabbitMQ connection error: {}", e)))?;

         let channel = connection.create_channel()
             .await
-            .map_err(|e| Error::Internal(format!("RabbitMQ channel error: {}", e)))?;
+            .map_err(|e| Error::internal(format!("RabbitMQ channel error: {}", e)))?;

         Ok(Self {
-            connection,
+            connection: Arc::new(connection),
             channel: Arc::new(Mutex::new(channel)),
         })
     }
@@ -44,7 +53,7 @@ impl RabbitMQ {
         message: &T,
     ) -> Result<()> {
         let payload = serde_json::to_string(message)
-            .map_err(|e| Error::Internal(format!("Serialization error: {}", e)))?;
+            .map_err(|e| Error::internal(format!("Serialization error: {}", e)))?;

         let channel = self.channel.lock().await;
@@ -56,7 +65,7 @@ impl RabbitMQ {
                 BasicProperties::default(),
             )
             .await
-            .map_err(|e| Error::Internal(format!("RabbitMQ publish error: {}", e)))?;
+            .map_err(|e| Error::internal(format!("RabbitMQ publish error: {}", e)))?;

         Ok(())
     }
@@ -80,7 +89,7 @@ impl RabbitMQ {
                 FieldTable::default(),
             )
             .await
-            .map_err(|e| Error::Internal(format!("RabbitMQ queue declare error: {}", e)))?;
+            .map_err(|e| Error::internal(format!("RabbitMQ queue declare error: {}", e)))?;

         let mut consumer = channel.basic_consume(
             queue,
@@ -89,7 +98,7 @@ impl RabbitMQ {
                 FieldTable::default(),
             )
             .await
-            .map_err(|e| Error::Internal(format!("RabbitMQ consume error: {}", e)))?;
+            .map_err(|e| Error::internal(format!("RabbitMQ consume error: {}", e)))?;

         while let Some(delivery) = consumer.next().await {
             match delivery {
@@ -122,8 +131,9 @@ mod tests {
     use rstest::*;
     use serde::{Deserialize, Serialize};
     use uuid::Uuid;
+    use std::time::Duration;

-    #[derive(Debug, Serialize, Deserialize, PartialEq)]
+    #[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
     struct TestMessage {
         id: Uuid,
         content: String,
@@ -150,31 +160,29 @@ mod tests {
         rabbitmq: RabbitMQ,
         test_message: TestMessage,
     ) {
-        let queue = "test-queue";
-        let routing_key = "test.key";
+        let queue = "test_queue";
+        let routing_key = "test_routing_key";

-        // Subscribe first
         let rabbitmq_clone = rabbitmq.clone();
         let test_message_clone = test_message.clone();

         let handle = tokio::spawn(async move {
             let handler = |msg: TestMessage| async move {
                 assert_eq!(msg, test_message_clone);
                 Ok(())
             };

             rabbitmq_clone.subscribe(queue, handler).await.unwrap();
         });

-        // Give subscription time to establish
         tokio::time::sleep(Duration::from_millis(100)).await;

-        // Publish message
         rabbitmq.publish("", routing_key, &test_message)
             .await
             .unwrap();

-        // Wait for handler to process
         tokio::time::sleep(Duration::from_secs(1)).await;

         handle.abort();
     }
 }
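
A usage note (not part of the commit): the manual Clone impl exists so one connection can be shared between a publishing task and a consuming task. A sketch along the lines of the test above; the URL, queue, and routing key are placeholders, and the subscribe signature is inferred from the test, so treat it as approximate:

// Sketch only: sharing one RabbitMQ handle across tasks via Clone.
use gb_messaging::RabbitMQ;

#[tokio::main]
async fn main() -> gb_core::Result<()> {
    let mq = RabbitMQ::new("amqp://127.0.0.1:5672").await?;

    let consumer = mq.clone(); // clones the Arc'd connection and channel
    tokio::spawn(async move {
        let handler = |msg: serde_json::Value| async move {
            println!("received: {}", msg);
            Ok(())
        };
        let _ = consumer.subscribe("jobs", handler).await;
    });

    mq.publish("", "jobs", &serde_json::json!({ "hello": "world" })).await?;
    Ok(())
}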


@@ -1,80 +1,53 @@
 use async_trait::async_trait;
 use gb_core::{Result, Error};
-use redis::{
-    aio::MultiplexedConnection,
-    AsyncCommands, Client,
-};
+use redis::{Client, AsyncCommands};
 use serde::{de::DeserializeOwned, Serialize};
 use std::sync::Arc;
-use tokio::sync::Mutex;
-use tracing::{instrument, error};
+use tracing::instrument;

 pub struct RedisPubSub {
-    client: Client,
-    conn: Arc<Mutex<MultiplexedConnection>>,
+    client: Arc<Client>,
+}
+
+impl Clone for RedisPubSub {
+    fn clone(&self) -> Self {
+        Self {
+            client: self.client.clone(),
+        }
+    }
 }

 impl RedisPubSub {
     pub async fn new(url: &str) -> Result<Self> {
         let client = Client::open(url)
-            .map_err(|e| Error::Redis(e))?;
+            .map_err(|e| Error::redis(e.to_string()))?;

-        let conn = client.get_multiplexed_async_connection()
+        // Test connection
+        client.get_async_connection()
             .await
-            .map_err(|e| Error::Redis(e))?;
+            .map_err(|e| Error::redis(e.to_string()))?;

         Ok(Self {
-            client,
-            conn: Arc::new(Mutex::new(conn)),
+            client: Arc::new(client),
         })
     }

-    #[instrument(skip(self, message))]
-    pub async fn publish<T: Serialize>(&self, channel: &str, message: &T) -> Result<()> {
-        let payload = serde_json::to_string(message)
-            .map_err(|e| Error::Internal(format!("Serialization error: {}", e)))?;
+    #[instrument(skip(self, payload))]
+    pub async fn publish<T>(&self, channel: &str, payload: &T) -> Result<()>
+    where
+        T: Serialize + std::fmt::Debug,
+    {
+        let mut conn = self.client.get_async_connection()
+            .await
+            .map_err(|e| Error::redis(e.to_string()))?;
+
+        let payload = serde_json::to_string(payload)
+            .map_err(|e| Error::redis(e.to_string()))?;

-        let mut conn = self.conn.lock().await;
         conn.publish(channel, payload)
             .await
-            .map_err(|e| Error::Redis(e))?;
-
-        Ok(())
-    }
-
-    #[instrument(skip(self, handler))]
-    pub async fn subscribe<T, F, Fut>(&self, channels: &[&str], handler: F) -> Result<()>
-    where
-        T: DeserializeOwned,
-        F: Fn(T) -> Fut,
-        Fut: std::future::Future<Output = Result<()>>,
-    {
-        let mut pubsub = self.client.get_async_connection()
-            .await
-            .map_err(|e| Error::Redis(e))?
-            .into_pubsub();
-
-        for channel in channels {
-            pubsub.subscribe(*channel)
-                .await
-                .map_err(|e| Error::Redis(e))?;
-        }
-
-        let mut stream = pubsub.on_message();
-        while let Some(msg) = stream.next().await {
-            let payload: String = msg.get_payload()
-                .map_err(|e| Error::Redis(e))?;
-
-            match serde_json::from_str::<T>(&payload) {
-                Ok(value) => {
-                    if let Err(e) = handler(value).await {
-                        error!("Handler error: {}", e);
-                    }
-                }
-                Err(e) => error!("Deserialization error: {}", e),
-            }
-        }
+            .map_err(|e| Error::redis(e.to_string()))?;

         Ok(())
     }
@@ -86,6 +59,7 @@ mod tests {
     use rstest::*;
     use serde::{Deserialize, Serialize};
     use uuid::Uuid;
+    use std::time::Duration;

     #[derive(Debug, Serialize, Deserialize, PartialEq)]
     struct TestMessage {
@@ -116,7 +90,6 @@
     ) {
         let channel = "test-channel";

-        // Subscribe first
         let pubsub_clone = redis_pubsub.clone();
         let test_message_clone = test_message.clone();
@@ -129,15 +102,12 @@
             pubsub_clone.subscribe(&[channel], handler).await.unwrap();
         });

-        // Give subscription time to establish
         tokio::time::sleep(Duration::from_millis(100)).await;

-        // Publish message
         redis_pubsub.publish(channel, &test_message)
             .await
             .unwrap();

-        // Wait for handler to process
         tokio::time::sleep(Duration::from_secs(1)).await;

         handle.abort();
     }


@@ -1,80 +1,33 @@
+use futures_util::SinkExt;
+use tokio_tungstenite::{connect_async, tungstenite::Message};
 use gb_core::{Result, Error};
-use futures::{
-    stream::{SplitSink, SplitStream},
-    SinkExt, StreamExt,
-};
-use serde::{de::DeserializeOwned, Serialize};
-use std::sync::Arc;
-use tokio::{
-    net::TcpStream,
-    sync::Mutex,
-};
-use tokio_tungstenite::{
-    connect_async,
-    tungstenite::Message,
-    WebSocketStream,
-};
-use tracing::{instrument, error};

 pub struct WebSocketClient {
-    write: Arc<Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>>,
-    read: Arc<Mutex<SplitStream<WebSocketStream<TcpStream>>>>,
+    stream: tokio_tungstenite::WebSocketStream<
+        tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>
+    >,
 }

 impl WebSocketClient {
+    fn to_gb_error(error: tokio_tungstenite::tungstenite::Error) -> Error {
+        Error::new(
+            gb_core::ErrorKind::WebSocket(error.to_string()),
+            error.to_string()
+        )
+    }
+
     pub async fn connect(url: &str) -> Result<Self> {
-        let (ws_stream, _) = connect_async(url)
-            .await
-            .map_err(|e| Error::Internal(format!("WebSocket connection error: {}", e)))?;
-
-        let (write, read) = ws_stream.split();
+        let (ws_stream, _) = connect_async(url).await.map_err(Self::to_gb_error)?;

         Ok(Self {
-            write: Arc::new(Mutex::new(write)),
-            read: Arc::new(Mutex::new(read)),
+            stream: ws_stream,
         })
     }

-    #[instrument(skip(self, message))]
-    pub async fn send<T: Serialize>(&self, message: &T) -> Result<()> {
-        let payload = serde_json::to_string(message)
-            .map_err(|e| Error::Internal(format!("Serialization error: {}", e)))?;
-
-        let mut write = self.write.lock().await;
-        write.send(Message::Text(payload))
+    pub async fn send_message(&mut self, payload: String) -> Result<()> {
+        self.stream
+            .send(Message::Text(payload))
             .await
-            .map_err(|e| Error::Internal(format!("WebSocket send error: {}", e)))?;
-
-        Ok(())
-    }
-
-    #[instrument(skip(self, handler))]
-    pub async fn receive<T, F, Fut>(&self, handler: F) -> Result<()>
-    where
-        T: DeserializeOwned,
-        F: Fn(T) -> Fut,
-        Fut: std::future::Future<Output = Result<()>>,
-    {
-        let mut read = self.read.lock().await;
-        while let Some(message) = read.next().await {
-            match message {
-                Ok(Message::Text(payload)) => {
-                    match serde_json::from_str::<T>(&payload) {
-                        Ok(value) => {
-                            if let Err(e) = handler(value).await {
-                                error!("Handler error: {}", e);
-                            }
-                        }
-                        Err(e) => error!("Deserialization error: {}", e),
-                    }
-                }
-                Ok(Message::Close(_)) => break,
-                Err(e) => error!("WebSocket receive error: {}", e),
-                _ => continue,
-            }
-        }
+            .map_err(Self::to_gb_error)?;

         Ok(())
     }
 }
@@ -84,6 +37,7 @@ mod tests {
     use super::*;
     use rstest::*;
     use serde::{Deserialize, Serialize};
+    use std::time::Duration;
     use tokio::net::TcpListener;
     use uuid::Uuid;
@@ -96,15 +50,12 @@ mod tests {
     async fn create_test_server() -> String {
         let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
         let addr = listener.local_addr().unwrap();

         tokio::spawn(async move {
             while let Ok((stream, _)) = listener.accept().await {
-                let ws_stream = tokio_tungstenite::accept_async(stream)
-                    .await
-                    .unwrap();
+                let ws_stream = tokio_tungstenite::accept_async(stream).await.unwrap();
                 let (mut write, mut read) = ws_stream.split();

                 while let Some(Ok(msg)) = read.next().await {
                     if let Message::Text(_) = msg {
                         write.send(msg).await.unwrap();
@@ -126,30 +77,10 @@ mod tests {
     #[rstest]
     #[tokio::test]
-    async fn test_websocket_client(test_message: TestMessage) {
+    async fn test_websocket(test_message: TestMessage) {
         let server_url = create_test_server().await;
-        let client = WebSocketClient::connect(&server_url).await.unwrap();
-
-        let test_message_clone = test_message.clone();
-
-        // Start receiving messages
-        let client_clone = client.clone();
-        let handle = tokio::spawn(async move {
-            let handler = |msg: TestMessage| async move {
-                assert_eq!(msg, test_message_clone);
-                Ok(())
-            };
-            client_clone.receive(handler).await.unwrap();
-        });
-
-        // Give receiver time to start
+        let mut client = WebSocket::new(&server_url).await.unwrap();
+
         tokio::time::sleep(Duration::from_millis(100)).await;

-        // Send test message
         client.send(&test_message).await.unwrap();
-
-        // Wait for message to be processed
-        tokio::time::sleep(Duration::from_secs(1)).await;
-
-        handle.abort();
     }
 }
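
Worth flagging (commentary, not part of the diff): the rewritten test constructs WebSocket::new(...) and calls client.send(...), but the type this file defines is WebSocketClient with connect and send_message, so the test as committed does not appear to compile. The API as actually defined above would be used like this sketch:

// Sketch only: driving the new WebSocketClient API; the URL is a placeholder.
use gb_messaging::WebSocketClient;

async fn demo() -> gb_core::Result<()> {
    let mut client = WebSocketClient::connect("ws://localhost:8080").await?;
    client.send_message("{\"hello\":\"world\"}".to_string()).await?;
    Ok(())
}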


@@ -1,17 +1,18 @@
 [package]
 name = "gb-monitoring"
-version.workspace = true
-edition.workspace = true
-authors.workspace = true
-license.workspace = true
+version = "0.1.0"
+edition = "2021"

 [dependencies]
+opentelemetry = { version = "0.19", features = ["rt-tokio"] }
+opentelemetry-otlp = { version = "0.12", features = ["tonic"] }
+tracing = "0.1"
+tracing-subscriber = { version = "0.3", features = ["env-filter", "time"] }
+thiserror = "1.0"
+prometheus = "0.13"
 gb-core = { path = "../gb-core" }
+lazy_static = "1.4"
 tokio.workspace = true
-tracing.workspace = true
-tracing-subscriber.workspace = true
-prometheus.workspace = true
-opentelemetry.workspace = true
 serde.workspace = true
 serde_json.workspace = true


@@ -1,11 +1,12 @@
-pub mod metrics;
-pub mod logging;
-pub mod telemetry;
+mod logging;
+mod metrics;
+mod telemetry;

-pub use metrics::Metrics;
 pub use logging::init_logging;
+pub use metrics::Metrics;
 pub use telemetry::Telemetry;

 #[cfg(test)]
 mod tests {
     use super::*;


@@ -1,31 +1,28 @@
-use tracing::{subscriber::set_global_default, Subscriber};
+use tracing::subscriber::set_global_default;
 use tracing_subscriber::{
-    fmt::{format::FmtSpan, time::ChronoUtc},
+    fmt::{format::FmtSpan, time},
+    EnvFilter,
     layer::SubscriberExt,
-    EnvFilter, Registry,
+    Registry,
 };

-pub fn init_logging() -> Result<(), Box<dyn std::error::Error>> {
+pub fn init_logging(service_name: &str) {
     let env_filter = EnvFilter::try_from_default_env()
         .unwrap_or_else(|_| EnvFilter::new("info"));

     let formatting_layer = tracing_subscriber::fmt::layer()
-        .with_timer(ChronoUtc::rfc3339())
-        .with_thread_ids(true)
-        .with_thread_names(true)
+        .with_timer(time::time())
         .with_target(true)
-        .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
+        .with_thread_ids(true)
+        .with_span_events(FmtSpan::CLOSE)
         .with_file(true)
-        .with_line_number(true)
-        .json();
+        .with_line_number(true);

     let subscriber = Registry::default()
         .with(env_filter)
         .with(formatting_layer);

-    set_global_default(subscriber)?;
-
-    Ok(())
+    set_global_default(subscriber).expect("Failed to set tracing subscriber");
 }

 #[cfg(test)]
@@ -37,7 +34,6 @@ mod tests {
     fn test_logging_initialization() {
         assert!(init_logging().is_ok());

-        // Test logging
         info!("Test log message");
     }
 }
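
A side note (not part of the diff): the new init_logging takes a service_name and returns (), and the parameter is not yet referenced in the body, while the unchanged test still asserts init_logging().is_ok(). Sketch of the new signature in use; "gb-api" is a placeholder:

// Sketch only: calling the reworked logger at startup.
fn main() {
    gb_monitoring::init_logging("gb-api"); // panics via expect() if a subscriber is already set
    tracing::info!("service started");
}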


@@ -1,146 +1,80 @@
-use gb_core::{Result, Error};
-use prometheus::{
-    Counter, Gauge, Histogram, HistogramOpts, IntCounter, Registry,
-    opts, register_counter, register_gauge, register_histogram,
-};
-use std::sync::Arc;
-use tracing::{instrument, error};
+use prometheus::{IntCounter, IntGauge, Histogram, Registry};

-#[derive(Clone)]
 pub struct Metrics {
-    registry: Arc<Registry>,
-    active_connections: Gauge,
-    message_count: IntCounter,
-    request_duration: Histogram,
-    active_rooms: Gauge,
-    media_bandwidth: Gauge,
+    registry: Registry,
+    message_counter: IntCounter,
+    active_connections: IntGauge,
+    message_processing_time: Histogram,
 }

 impl Metrics {
-    pub fn new() -> Result<Self> {
+    pub fn new() -> Self {
         let registry = Registry::new();

-        let active_connections = register_gauge!(
-            opts!("gb_active_connections", "Number of active connections"),
-            registry
-        ).map_err(|e| Error::Internal(format!("Failed to create metric: {}", e)))?;
+        let message_counter = IntCounter::new(
+            "message_total",
+            "Total number of messages processed"
+        ).unwrap();

-        let message_count = register_counter!(
-            opts!("gb_message_count", "Total number of messages processed"),
-            registry
-        ).map_err(|e| Error::Internal(format!("Failed to create metric: {}", e)))?;
+        let active_connections = IntGauge::new(
+            "active_connections",
+            "Number of active connections"
+        ).unwrap();

-        let request_duration = register_histogram!(
-            HistogramOpts::new(
-                "gb_request_duration",
-                "Request duration in seconds"
-            ),
-            registry
-        ).map_err(|e| Error::Internal(format!("Failed to create metric: {}", e)))?;
+        let message_processing_time = Histogram::with_opts(
+            prometheus::HistogramOpts::new(
+                "message_processing_seconds",
+                "Time spent processing messages"
+            ).buckets(vec![0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0])
+        ).unwrap();

-        let active_rooms = register_gauge!(
-            opts!("gb_active_rooms", "Number of active rooms"),
-            registry
-        ).map_err(|e| Error::Internal(format!("Failed to create metric: {}", e)))?;
+        registry.register(Box::new(message_counter.clone())).unwrap();
+        registry.register(Box::new(active_connections.clone())).unwrap();
+        registry.register(Box::new(message_processing_time.clone())).unwrap();

-        let media_bandwidth = register_gauge!(
-            opts!("gb_media_bandwidth", "Current media bandwidth usage in bytes/sec"),
-            registry
-        ).map_err(|e| Error::Internal(format!("Failed to create metric: {}", e)))?;
-
-        Ok(Self {
-            registry: Arc::new(registry),
+        Self {
+            registry,
+            message_counter,
             active_connections,
-            message_count,
-            request_duration,
-            active_rooms,
-            media_bandwidth,
-        })
+            message_processing_time,
+        }
     }

-    #[instrument(skip(self))]
-    pub fn increment_connections(&self) {
-        self.active_connections.inc();
+    pub fn increment_message_count(&self) {
+        self.message_counter.inc();
     }

-    #[instrument(skip(self))]
-    pub fn decrement_connections(&self) {
-        self.active_connections.dec();
+    pub fn observe_processing_time(&self, duration_seconds: f64) {
+        self.message_processing_time.observe(duration_seconds);
     }

-    #[instrument(skip(self))]
-    pub fn increment_messages(&self) {
-        self.message_count.inc();
+    pub fn set_active_connections(&self, count: i64) {
+        self.active_connections.set(count);
     }

-    #[instrument(skip(self))]
-    pub fn observe_request_duration(&self, duration: f64) {
-        self.request_duration.observe(duration);
-    }
-
-    #[instrument(skip(self))]
-    pub fn set_active_rooms(&self, count: i64) {
-        self.active_rooms.set(count as f64);
-    }
-
-    #[instrument(skip(self))]
-    pub fn set_media_bandwidth(&self, bytes_per_sec: f64) {
-        self.media_bandwidth.set(bytes_per_sec);
-    }
-
-    pub fn registry(&self) -> Arc<Registry> {
-        self.registry.clone()
+    pub fn registry(&self) -> &Registry {
+        &self.registry
     }
 }

 #[cfg(test)]
 mod tests {
     use super::*;
-    use prometheus::core::{Collector, Desc};

     #[test]
-    fn test_metrics_creation() {
-        let metrics = Metrics::new().unwrap();
+    fn test_metrics() {
+        let metrics = Metrics::new();

-        // Test increment connections
-        metrics.increment_connections();
-        assert_eq!(
-            metrics.active_connections.get(),
-            1.0
-        );
+        metrics.increment_message_count();
+        assert_eq!(metrics.message_counter.get(), 1);

-        // Test decrement connections
-        metrics.decrement_connections();
-        assert_eq!(
-            metrics.active_connections.get(),
-            0.0
-        );
+        metrics.set_active_connections(10);
+        assert_eq!(metrics.active_connections.get(), 10);

-        // Test message count
-        metrics.increment_messages();
-        assert_eq!(
-            metrics.message_count.get(),
-            1
-        );
+        metrics.observe_processing_time(0.5);

-        // Test request duration
-        metrics.observe_request_duration(0.5);
         let mut buffer = Vec::new();
-        metrics.request_duration.encode(&mut buffer).unwrap();
+        let encoder = prometheus::TextEncoder::new();
+        encoder.encode(&metrics.registry().gather(), &mut buffer).unwrap();
         assert!(!buffer.is_empty());
-
-        // Test active rooms
-        metrics.set_active_rooms(10);
-        assert_eq!(
-            metrics.active_rooms.get(),
-            10.0
-        );
-
-        // Test media bandwidth
-        metrics.set_media_bandwidth(1024.0);
-        assert_eq!(
-            metrics.media_bandwidth.get(),
-            1024.0
-        );
     }
 }
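
As a usage note (not part of the commit), the new Metrics type owns its Registry, so a scrape endpoint only needs to encode registry().gather(); a minimal sketch with the HTTP wiring left out:

// Sketch only: rendering the Prometheus text format from the new Metrics type.
use gb_monitoring::Metrics;
use prometheus::{Encoder, TextEncoder};

fn render_metrics(metrics: &Metrics) -> String {
    let mut buffer = Vec::new();
    let encoder = TextEncoder::new();
    // gather() snapshots every collector registered in Metrics::new().
    encoder
        .encode(&metrics.registry().gather(), &mut buffer)
        .unwrap();
    String::from_utf8(buffer).unwrap()
}

fn main() {
    let metrics = Metrics::new();
    metrics.increment_message_count();
    // Serve this string at /metrics from the HTTP framework of your choice.
    println!("{}", render_metrics(&metrics));
}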


@@ -1,40 +1,48 @@
 use opentelemetry::{
-    runtime::Tokio,
     sdk::{trace, Resource},
+    runtime::Tokio,
     KeyValue,
 };
-use std::time::Duration;
-use tracing::error;
+use opentelemetry_otlp::{Protocol, WithExportConfig};
+use thiserror::Error;
+
+#[derive(Error, Debug)]
+pub enum TelemetryError {
+    #[error("Failed to initialize tracer: {0}")]
+    Init(String),
+}

 pub struct Telemetry {
-    tracer: opentelemetry::sdk::trace::Tracer,
+    tracer: trace::Tracer,
 }

 impl Telemetry {
-    pub fn new(service_name: &str) -> Result<Self, Box<dyn std::error::Error>> {
-        let tracer = opentelemetry_otlp::new_pipeline()
-            .tracing()
-            .with_exporter(
-                opentelemetry_otlp::new_exporter()
-                    .tonic()
-                    .with_endpoint("http://localhost:4317")
-                    .with_timeout(Duration::from_secs(3))
-            )
-            .with_trace_config(
-                trace::config()
-                    .with_resource(Resource::new(vec![KeyValue::new(
-                        "service.name",
-                        service_name.to_string(),
-                    )]))
-                    .with_sampler(trace::Sampler::AlwaysOn)
-            )
-            .install_batch(Tokio)?;
+    pub async fn new(service_name: &str) -> Result<Self, TelemetryError> {
+        let tracer = Self::init_tracer(service_name)
+            .await
+            .map_err(|e| TelemetryError::Init(e.to_string()))?;

         Ok(Self { tracer })
     }

-    pub fn tracer(&self) -> &opentelemetry::sdk::trace::Tracer {
-        &self.tracer
+    async fn init_tracer(service_name: &str) -> Result<trace::Tracer, TelemetryError> {
+        let exporter = opentelemetry_otlp::new_exporter()
+            .tonic()
+            .with_protocol(Protocol::Grpc);
+
+        let resource = Resource::new(vec![
+            KeyValue::new("service.name", service_name.to_string()),
+        ]);
+
+        let config = trace::config().with_resource(resource);
+
+        let tracer = opentelemetry_otlp::new_pipeline()
+            .tracing()
+            .with_exporter(exporter)
+            .with_trace_config(config)
+            .install_batch(Tokio)
+            .map_err(|e| TelemetryError::Init(e.to_string()))?;
+
+        Ok(tracer)
     }
 }
@@ -48,9 +56,9 @@ impl Drop for Telemetry {
 mod tests {
     use super::*;

-    #[test]
-    fn test_telemetry_creation() {
-        let telemetry = Telemetry::new("test-service");
+    #[tokio::test]
+    async fn test_telemetry_creation() {
+        let telemetry = Telemetry::new("test-service").await;
         assert!(telemetry.is_ok());
     }
 }


@@ -1,23 +0,0 @@
[package]
name = "gb-nlp"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
[dependencies]
gb-core = { path = "../gb-core" }
rust-bert = "0.21"
tokenizers = "0.15"
whatlang = "0.16"
async-trait.workspace = true
tokio.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
tracing.workspace = true
tch = "0.13"
[dev-dependencies]
rstest.workspace = true
tokio-test = "0.4"


@@ -1,76 +0,0 @@
use gb_core::{Result, Error};
use tracing::instrument;
use whatlang::{Lang, Script, Detector, detect};
pub struct LanguageDetector {
detector: Detector,
}
impl LanguageDetector {
pub fn new() -> Self {
Self {
detector: Detector::new(),
}
}
#[instrument(skip(self, text))]
pub fn detect_language(&self, text: &str) -> Result<DetectedLanguage> {
let info = detect(text)
.ok_or_else(|| Error::Internal("Failed to detect language".to_string()))?;
Ok(DetectedLanguage {
lang: info.lang(),
script: info.script(),
confidence: info.confidence(),
})
}
#[instrument(skip(self, text))]
pub fn is_language(&self, text: &str, lang: Lang) -> bool {
if let Some(info) = detect(text) {
info.lang() == lang
} else {
false
}
}
}
#[derive(Debug, Clone)]
pub struct DetectedLanguage {
pub lang: Lang,
pub script: Script,
pub confidence: f64,
}
#[cfg(test)]
mod tests {
use super::*;
use rstest::*;
#[fixture]
fn detector() -> LanguageDetector {
LanguageDetector::new()
}
#[rstest]
fn test_detect_english(detector: LanguageDetector) {
let text = "Hello, this is a test sentence in English.";
let result = detector.detect_language(text).unwrap();
assert_eq!(result.lang, Lang::Eng);
assert!(result.confidence > 0.9);
}
#[rstest]
fn test_detect_spanish(detector: LanguageDetector) {
let text = "Hola, esta es una prueba en español.";
let result = detector.detect_language(text).unwrap();
assert_eq!(result.lang, Lang::Spa);
assert!(result.confidence > 0.9);
}
#[rstest]
fn test_is_language(detector: LanguageDetector) {
let text = "Hello world";
assert!(detector.is_language(text, Lang::Eng));
}
}


@@ -1,42 +0,0 @@
pub mod lang;
pub mod text;
pub use lang::{LanguageDetector, DetectedLanguage};
pub use text::{TextProcessor, Sentiment, Entity, Answer};
#[cfg(test)]
mod tests {
use super::*;
use gb_core::Result;
#[tokio::test]
async fn test_nlp_integration() -> Result<()> {
// Initialize NLP components
let lang_detector = LanguageDetector::new();
let text_processor = TextProcessor::new().await?;
// Test language detection
let text = "This is a test sentence in English.";
let lang = lang_detector.detect_language(text)?;
assert_eq!(lang.lang, whatlang::Lang::Eng);
// Test sentiment analysis
let sentiment = text_processor.analyze_sentiment(text).await?;
assert!(sentiment.score > 0.0);
// Test entity extraction
let text = "OpenAI released GPT-4 in March 2023.";
let entities = text_processor.extract_entities(text).await?;
// Test summarization
let text = "Artificial intelligence has made significant advances in recent years. Machine learning models can now perform tasks that were once thought to be exclusive to humans. This has led to both excitement and concern about the future of AI.";
let summary = text_processor.summarize(text).await?;
// Test question answering
let context = "Rust is a systems programming language focused on safety and performance.";
let question = "What is Rust?";
let answer = text_processor.answer_question(context, question).await?;
Ok(())
}
}


@@ -1,168 +0,0 @@
use gb_core::{Result, Error};
use rust_bert::pipelines::{
sentiment::SentimentModel,
ner::NERModel,
summarization::SummarizationModel,
question_answering::{QaModel, QuestionAnsweringModel},
};
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{instrument, error};
pub struct TextProcessor {
sentiment_model: Arc<Mutex<SentimentModel>>,
ner_model: Arc<Mutex<NERModel>>,
summarization_model: Arc<Mutex<SummarizationModel>>,
qa_model: Arc<Mutex<QuestionAnsweringModel>>,
}
impl TextProcessor {
#[instrument]
pub async fn new() -> Result<Self> {
let sentiment_model = SentimentModel::new(Default::default())
.map_err(|e| Error::Internal(format!("Failed to load sentiment model: {}", e)))?;
let ner_model = NERModel::new(Default::default())
.map_err(|e| Error::Internal(format!("Failed to load NER model: {}", e)))?;
let summarization_model = SummarizationModel::new(Default::default())
.map_err(|e| Error::Internal(format!("Failed to load summarization model: {}", e)))?;
let qa_model = QuestionAnsweringModel::new(Default::default())
.map_err(|e| Error::Internal(format!("Failed to load QA model: {}", e)))?;
Ok(Self {
sentiment_model: Arc::new(Mutex::new(sentiment_model)),
ner_model: Arc::new(Mutex::new(ner_model)),
summarization_model: Arc::new(Mutex::new(summarization_model)),
qa_model: Arc::new(Mutex::new(qa_model)),
})
}
#[instrument(skip(self, text))]
pub async fn analyze_sentiment(&self, text: &str) -> Result<Sentiment> {
let model = self.sentiment_model.lock().await;
let output = model.predict(&[text])
.map_err(|e| Error::Internal(format!("Sentiment analysis failed: {}", e)))?;
Ok(Sentiment {
score: output[0].score,
label: output[0].label.clone(),
})
}
#[instrument(skip(self, text))]
pub async fn extract_entities(&self, text: &str) -> Result<Vec<Entity>> {
let model = self.ner_model.lock().await;
let output = model.predict(&[text])
.map_err(|e| Error::Internal(format!("Entity extraction failed: {}", e)))?;
Ok(output[0].iter().map(|entity| Entity {
text: entity.word.clone(),
label: entity.entity.clone(),
score: entity.score,
}).collect())
}
#[instrument(skip(self, text))]
pub async fn summarize(&self, text: &str) -> Result<String> {
let model = self.summarization_model.lock().await;
let output = model.summarize(&[text])
.map_err(|e| Error::Internal(format!("Summarization failed: {}", e)))?;
Ok(output[0].clone())
}
#[instrument(skip(self, context, question))]
pub async fn answer_question(&self, context: &str, question: &str) -> Result<Answer> {
let model = self.qa_model.lock().await;
let output = model.predict(&[QaModel {
context,
question,
}])
.map_err(|e| Error::Internal(format!("Question answering failed: {}", e)))?;
Ok(Answer {
text: output[0].answer.clone(),
score: output[0].score,
start: output[0].start,
end: output[0].end,
})
}
}
#[derive(Debug, Clone)]
pub struct Sentiment {
pub score: f64,
pub label: String,
}
#[derive(Debug, Clone)]
pub struct Entity {
pub text: String,
pub label: String,
pub score: f64,
}
#[derive(Debug, Clone)]
pub struct Answer {
pub text: String,
pub score: f64,
pub start: usize,
pub end: usize,
}
#[cfg(test)]
mod tests {
use super::*;
use rstest::*;
#[fixture]
async fn processor() -> TextProcessor {
TextProcessor::new().await.unwrap()
}
#[rstest]
#[tokio::test]
async fn test_sentiment_analysis(processor: TextProcessor) -> Result<()> {
let text = "I love this product! It's amazing!";
let sentiment = processor.analyze_sentiment(text).await?;
assert!(sentiment.score > 0.5);
assert_eq!(sentiment.label, "positive");
Ok(())
}
#[rstest]
#[tokio::test]
async fn test_entity_extraction(processor: TextProcessor) -> Result<()> {
let text = "John Smith works at Microsoft in Seattle.";
let entities = processor.extract_entities(text).await?;
assert!(entities.iter().any(|e| e.text == "John Smith" && e.label == "PERSON"));
assert!(entities.iter().any(|e| e.text == "Microsoft" && e.label == "ORG"));
assert!(entities.iter().any(|e| e.text == "Seattle" && e.label == "LOC"));
Ok(())
}
#[rstest]
#[tokio::test]
async fn test_summarization(processor: TextProcessor) -> Result<()> {
let text = "The quick brown fox jumps over the lazy dog. This is a classic pangram that contains every letter of the English alphabet. It has been used for typing practice and font displays for many years.";
let summary = processor.summarize(text).await?;
assert!(summary.len() < text.len());
Ok(())
}
#[rstest]
#[tokio::test]
async fn test_question_answering(processor: TextProcessor) -> Result<()> {
let context = "The capital of France is Paris. It is known as the City of Light.";
let question = "What is the capital of France?";
let answer = processor.answer_question(context, question).await?;
assert_eq!(answer.text, "Paris");
assert!(answer.score > 0.8);
Ok(())
}
}


@@ -34,7 +34,7 @@ sqlx.workspace = true
 redis.workspace = true

 # Metrics & Monitoring
-prometheus = { version = "0.13", features = ["process"] }
+prometheus = { version = "0.13.0", features = ["process"] }
 tracing.workspace = true
 opentelemetry.workspace = true


@@ -6,10 +6,6 @@ set -e
 sudo apt-get install -y apt-transport-https ca-certificates curl software-properties-common gnupg

-# Update package lists
-echo "Updating package lists..."
-
 echo "Repository fixes completed!"

 # Install system dependencies
@@ -89,9 +85,9 @@ sudo systemctl start postgresql
 sudo systemctl enable postgresql

 # Create database and user (with error handling)
 sudo -u postgres psql -c "CREATE DATABASE generalbots;" 2>/dev/null || echo "Database might already exist"
 sudo -u postgres psql -c "CREATE USER gbuser WITH PASSWORD 'gbpassword';" 2>/dev/null || echo "User might already exist"
 sudo -u postgres psql -c "GRANT ALL PRIVILEGES ON DATABASE generalbots TO gbuser;" 2>/dev/null || echo "Privileges might already be granted"

 # Start Redis
 echo "Starting Redis service..."
@@ -110,4 +106,7 @@ echo -e "\nService Status:"
 echo "PostgreSQL status:"
 sudo systemctl status postgresql --no-pager

 echo -e "\nRedis status:"
 sudo systemctl status redis-server --no-pager
+
+sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 8771ADB0816950D8 && sudo apt-get update && sudo apt-get install -y libglib2.0-dev build-essential pkg-config
+sudo apt-get install -y libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev libgstreamer-plugins-bad1.0-dev gstreamer1.0-plugins-base gstreamer1.0-plugins-good gstreamer1.0-plugins-bad gstreamer1.0-plugins-ugly gstreamer1.0-libav gstreamer1.0-tools gstreamer1.0-x gstreamer1.0-alsa gstreamer1.0-gl gstreamer1.0-gtk3 gstreamer1.0-qt5 gstreamer1.0-pulseaudio && export PKG_CONFIG_PATH=/usr/lib/x86_64-linux-gnu/pkgconfig:/usr/lib/pkgconfig:/usr/share/pkgconfig:$PKG_CONFIG_PATH && echo 'export PKG_CONFIG_PATH=/usr/lib/x86_64-linux-gnu/pkgconfig:/usr/lib/pkgconfig:/usr/share/pkgconfig:$PKG_CONFIG_PATH' >> ~/.bashrc && source ~/.bashrc