diff --git a/.vscode/launch.json b/.vscode/launch.json index 8866d81..5911f65 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -1,19 +1,6 @@ { "version": "0.2.0", "configurations": [ - { - "type": "lldb", - "request": "launch", - "name": "Cargo test", - "cargo": { - "args": [ - "test", - "--no-run", - "--lib" - ] - }, - "args": [] - }, { "type": "lldb", "request": "launch", diff --git a/gb-api/src/main.rs b/gb-api/src/main.rs index 9e0944a..661f234 100644 --- a/gb-api/src/main.rs +++ b/gb-api/src/main.rs @@ -1,7 +1,7 @@ use gb_core::{Error, Result}; use tracing::{info, error}; -use axum::Router; use std::net::SocketAddr; +use gb_messaging::MessageProcessor; #[tokio::main] async fn main() -> Result<()> { @@ -16,21 +16,13 @@ async fn main() -> Result<()> { } async fn initialize_bot_server() -> Result { - info!("Initializing General Bots..."); + info!("Initializing General Bots server..."); - // Initialize database connections - let db_pool = initialize_database().await?; + // Initialize the MessageProcessor + let message_processor = MessageProcessor::new(); - // Initialize Redis - let redis_client = initialize_redis().await?; - - // Build the Axum router with our routes - let app = axum::Router::new() - .with_state(AppState { - db: db_pool, - redis: redis_client, - }) - // Add your route handlers here + // Build the Axum router using our router module + let app = gb_api::create_router(message_processor) .layer(tower_http::trace::TraceLayer::new_for_http()); Ok(app) @@ -77,8 +69,8 @@ struct AppState { } -async fn start_server(app: Router) -> Result<()> { - let addr = SocketAddr::from(([0, 0, 0, 0], 3000)); +async fn start_server(app: axum::Router) -> Result<()> { + let addr = SocketAddr::from(([0, 0, 0, 0], 3001)); info!("Starting server on {}", addr); match tokio::net::TcpListener::bind(addr).await { @@ -91,6 +83,7 @@ async fn start_server(app: Router) -> Result<()> { Err(e) => { error!("Failed to bind to address: {}", e); Err(Error::internal(format!("Failed to bind to address: {}", e))) - } + } + } } \ No newline at end of file diff --git a/gb-api/src/router.rs b/gb-api/src/router.rs index 507f8e9..c7f9d03 100644 --- a/gb-api/src/router.rs +++ b/gb-api/src/router.rs @@ -70,16 +70,18 @@ async fn send_message( State(state): State>, Json(message): Json, ) -> Result> { + // Clone the message before using it in envelope let envelope = MessageEnvelope { id: Uuid::new_v4(), - message, + message: message.clone(), // Clone here metadata: HashMap::new(), }; let mut processor = state.message_processor.lock().await; - processor.process_messages().await - .map_err(|e| Error::internal(format!("Failed to process message: {}", e)))?; - + processor.add_message(message) // Use original message here + .await + .map_err(|e| Error::internal(format!("Failed to add message: {}", e)))?; + Ok(Json(MessageId(envelope.id))) } diff --git a/gb-core/src/models.rs b/gb-core/src/models.rs index 8ca2bc6..7739c86 100644 --- a/gb-core/src/models.rs +++ b/gb-core/src/models.rs @@ -60,8 +60,8 @@ pub struct Message { pub kind: String, pub content: String, pub metadata: JsonValue, - pub created_at: DateTime, - pub shard_key: i32, + pub created_at: Option>, + pub shard_key: Option, } #[derive(Debug, Serialize, Deserialize)] diff --git a/gb-messaging/gb-migrations/Cargo.toml b/gb-messaging/gb-migrations/Cargo.toml deleted file mode 100644 index dc6aaea..0000000 --- a/gb-messaging/gb-migrations/Cargo.toml +++ /dev/null @@ -1,22 +0,0 @@ -[package] -name = "gb-migrations" -version = { workspace = true } -edition = { workspace = true } -authors = { workspace = true } -license = { workspace = true } - -[[bin]] -name = "migrations" -path = "src/bin/migrations.rs" - -[dependencies] -tokio= { workspace = true } -sqlx= { workspace = true } -tracing= { workspace = true } -uuid= { workspace = true } -chrono= { workspace = true } -serde_json= { workspace = true } -gb-core = { path = "../gb-core" } - -[dev-dependencies] -rstest= { workspace = true } \ No newline at end of file diff --git a/gb-messaging/gb-migrations/src/bin/migrations.rs b/gb-messaging/gb-migrations/src/bin/migrations.rs deleted file mode 100644 index 237bf81..0000000 --- a/gb-messaging/gb-migrations/src/bin/migrations.rs +++ /dev/null @@ -1,19 +0,0 @@ -use sqlx::PgPool; -use gb_migrations::run_migrations; - -#[tokio::main] -async fn main() -> Result<(), sqlx::Error> { - let database_url = std::env::var("DATABASE_URL") - .expect("DATABASE_URL must be set"); - - println!("Creating database connection pool..."); - let pool = PgPool::connect(&database_url) - .await - .expect("Failed to create pool"); - - println!("Running migrations..."); - run_migrations(&pool).await?; - - println!("Migrations completed successfully!"); - Ok(()) -} \ No newline at end of file diff --git a/gb-messaging/gb-migrations/src/lib.rs b/gb-messaging/gb-migrations/src/lib.rs deleted file mode 100644 index 59a8faa..0000000 --- a/gb-messaging/gb-migrations/src/lib.rs +++ /dev/null @@ -1,144 +0,0 @@ -use sqlx::PgPool; -use tracing::info; - -pub async fn run_migrations(pool: &PgPool) -> Result<(), sqlx::Error> { - info!("Running database migrations"); - - // Create tables - let table_queries = [ - // Customers table - r#"CREATE TABLE IF NOT EXISTS customers ( - id UUID PRIMARY KEY, - name VARCHAR(255) NOT NULL, - subscription_tier VARCHAR(50) NOT NULL, - status VARCHAR(50) NOT NULL, - max_instances INTEGER NOT NULL, - metadata JSONB NOT NULL DEFAULT '{}', - created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP - )"#, - - // Instances table - r#"CREATE TABLE IF NOT EXISTS instances ( - id UUID PRIMARY KEY, - customer_id UUID NOT NULL REFERENCES customers(id), - name VARCHAR(255) NOT NULL, - status VARCHAR(50) NOT NULL, - shard_id INTEGER NOT NULL, - region VARCHAR(50) NOT NULL, - config JSONB NOT NULL DEFAULT '{}', - created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP - )"#, - - // Rooms table - r#"CREATE TABLE IF NOT EXISTS rooms ( - id UUID PRIMARY KEY, - customer_id UUID NOT NULL REFERENCES customers(id), - instance_id UUID NOT NULL REFERENCES instances(id), - name VARCHAR(255) NOT NULL, - kind VARCHAR(50) NOT NULL, - status VARCHAR(50) NOT NULL, - config JSONB NOT NULL DEFAULT '{}', - created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP - )"#, - - // Messages table - r#"CREATE TABLE IF NOT EXISTS messages ( - id UUID PRIMARY KEY, - customer_id UUID NOT NULL REFERENCES customers(id), - instance_id UUID NOT NULL REFERENCES instances(id), - conversation_id UUID NOT NULL, - sender_id UUID NOT NULL, - kind VARCHAR(50) NOT NULL, - content TEXT NOT NULL, - metadata JSONB NOT NULL DEFAULT '{}', - created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, - shard_key INTEGER NOT NULL - )"#, - - // Users table - r#"CREATE TABLE IF NOT EXISTS users ( - id UUID PRIMARY KEY, - customer_id UUID NOT NULL REFERENCES customers(id), - instance_id UUID NOT NULL REFERENCES instances(id), - name VARCHAR(255) NOT NULL, - email VARCHAR(255) NOT NULL UNIQUE, - status VARCHAR(50) NOT NULL, - metadata JSONB NOT NULL DEFAULT '{}', - created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP - )"#, - - // Tracks table - r#"CREATE TABLE IF NOT EXISTS tracks ( - id UUID PRIMARY KEY, - room_id UUID NOT NULL REFERENCES rooms(id), - user_id UUID NOT NULL REFERENCES users(id), - kind VARCHAR(50) NOT NULL, - status VARCHAR(50) NOT NULL, - metadata JSONB NOT NULL DEFAULT '{}', - created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP - )"#, - - // Subscriptions table - r#"CREATE TABLE IF NOT EXISTS subscriptions ( - id UUID PRIMARY KEY, - track_id UUID NOT NULL REFERENCES tracks(id), - user_id UUID NOT NULL REFERENCES users(id), - status VARCHAR(50) NOT NULL, - metadata JSONB NOT NULL DEFAULT '{}', - created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP - )"#, - ]; - - // Create indexes - let index_queries = [ - "CREATE INDEX IF NOT EXISTS idx_instances_customer_id ON instances(customer_id)", - "CREATE INDEX IF NOT EXISTS idx_rooms_instance_id ON rooms(instance_id)", - "CREATE INDEX IF NOT EXISTS idx_messages_conversation_id ON messages(conversation_id)", - "CREATE INDEX IF NOT EXISTS idx_messages_shard_key ON messages(shard_key)", - "CREATE INDEX IF NOT EXISTS idx_tracks_room_id ON tracks(room_id)", - "CREATE INDEX IF NOT EXISTS idx_subscriptions_track_id ON subscriptions(track_id)", - "CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)", - ]; - - // Execute table creation queries - for query in table_queries { - sqlx::query(query) - .execute(pool) - .await?; - } - - // Execute index creation queries - for query in index_queries { - sqlx::query(query) - .execute(pool) - .await?; - } - - info!("Migrations completed successfully"); - Ok(()) -} - -#[cfg(test)] -mod tests { - use super::*; - use sqlx::postgres::{PgPoolOptions, PgPool}; - use rstest::*; - - async fn create_test_pool() -> PgPool { - let database_url = std::env::var("DATABASE_URL") - .unwrap_or_else(|_| "postgres://postgres:postgres@localhost/gb_test".to_string()); - - PgPoolOptions::new() - .max_connections(5) - .connect(&database_url) - .await - .expect("Failed to create test pool") - } - - #[rstest] - #[tokio::test] - async fn test_migrations() { - let pool = create_test_pool().await; - assert!(run_migrations(&pool).await.is_ok()); - } -} diff --git a/gb-messaging/src/processor.rs b/gb-messaging/src/processor.rs index ccb83fc..84546ac 100644 --- a/gb-messaging/src/processor.rs +++ b/gb-messaging/src/processor.rs @@ -1,11 +1,14 @@ -use gb_core::Result; +use gb_core::{Result, models::*}; // This will import both Message and MessageId -use tracing::{error, instrument}; -use std::sync::Arc; -use tokio::sync::broadcast; +use gb_core::Error; +use uuid::Uuid; use std::collections::HashMap; - +use tracing::instrument; use crate::MessageEnvelope; +use tokio::sync::broadcast; // Add this import +use std::sync::Arc; +use tracing::{error, info}; // Add error and info macros here + pub struct MessageProcessor { tx: broadcast::Sender, @@ -51,6 +54,25 @@ impl MessageProcessor { .insert(kind.to_string(), Box::new(handler)); } + #[instrument(skip(self))] + pub async fn add_message(&mut self, message: Message) -> Result { + let envelope = MessageEnvelope { + id: Uuid::new_v4(), + message, + metadata: HashMap::new(), + }; + + self.tx.send(envelope.clone()) + .map_err(|e| Error::internal(format!("Failed to queue message: {}", e)))?; + + // Start processing immediately + if let Some(handler) = self.handlers.get(&envelope.message.kind) { + handler(envelope.clone()) + .map_err(|e| Error::internal(format!("Handler error: {}", e)))?; + } + + Ok(MessageId(envelope.id)) + } #[instrument(skip(self))] pub async fn process_messages(&mut self) -> Result<()> { while let Ok(envelope) = self.rx.recv().await { @@ -58,6 +80,7 @@ impl MessageProcessor { if let Err(e) = handler(envelope.clone()) { error!("Handler error for message {}: {}", envelope.id, e); } + tracing::info!("Processing message: {:?}", &envelope.message.id); } else { error!("No handler registered for message kind: {}", envelope.message.kind); } diff --git a/processor.rs b/processor.rs new file mode 100644 index 0000000..e69de29