From cba43cefde9eeb479df503d3bad3e0c09a76be5f Mon Sep 17 00:00:00 2001 From: Rodrigo Rodriguez Date: Tue, 24 Dec 2024 09:59:55 -0300 Subject: [PATCH] new(all): Initial import. --- Cargo.lock | 2 + gb-api/Cargo.toml | 4 +- gb-api/src/router.rs | 90 +++++++---------------- gb-auth/Cargo.toml | 2 + gb-auth/src/middleware/auth_middleware.rs | 61 ++++++--------- process.rs | 0 6 files changed, 58 insertions(+), 101 deletions(-) delete mode 100644 process.rs diff --git a/Cargo.lock b/Cargo.lock index 25a037c..45ffd63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2349,6 +2349,7 @@ version = "0.1.0" dependencies = [ "async-trait", "axum 0.7.9", + "chrono", "futures-util", "gb-core", "gb-messaging", @@ -2357,6 +2358,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "tokio-stream", "tokio-test", "tower 0.4.13", "tower-http 0.5.2", diff --git a/gb-api/Cargo.toml b/gb-api/Cargo.toml index aeccee3..0f10003 100644 --- a/gb-api/Cargo.toml +++ b/gb-api/Cargo.toml @@ -18,7 +18,9 @@ serde_json.workspace = true uuid.workspace = true tracing.workspace = true async-trait.workspace = true -futures-util = "0.3" +futures-util = { version = "0.3", features = ["sink"] } +chrono = { workspace = true, features = ["serde"] } +tokio-stream = "0.1.17" [dev-dependencies] rstest.workspace = true diff --git a/gb-api/src/router.rs b/gb-api/src/router.rs index 12fc495..4815fb2 100644 --- a/gb-api/src/router.rs +++ b/gb-api/src/router.rs @@ -2,7 +2,7 @@ use axum::{ routing::{get, post}, Router, extract::{ - ws::{WebSocket, Message as WsMessage}, + ws::WebSocket, Path, State, WebSocketUpgrade, }, response::IntoResponse, @@ -10,15 +10,13 @@ use axum::{ }; use gb_core::{Result, Error, models::*}; -use gb_messaging::{MessageProcessor, models::MessageEnvelope}; // Update this line +use gb_messaging::{MessageProcessor, models::MessageEnvelope}; use std::sync::Arc; +use chrono; use tokio::sync::Mutex; - - use tracing::{instrument, error}; use uuid::Uuid; use futures_util::StreamExt; -use futures_util::SinkExt; pub struct ApiState { pub message_processor: Mutex, @@ -42,36 +40,36 @@ pub fn create_router(message_processor: MessageProcessor) -> Router { async fn handle_ws_connection( ws: WebSocket, - State(_state): State>, -) -> Result<(), Error> { - let (mut sender, mut receiver) = ws.split(); - // ... rest of the implementation + state: Arc, +) -> Result<()> { + let (_sender, mut receiver) = ws.split(); + + while let Some(Ok(msg)) = receiver.next().await { + if let Ok(text) = msg.to_text() { + if let Ok(envelope) = serde_json::from_str::(text) { + let mut processor = state.message_processor.lock().await; + if let Err(e) = processor.process_message(&envelope).await { + error!("Failed to process message: {}", e); + } + } + } + } + Ok(()) } #[axum::debug_handler] -#[instrument(skip(state, ws))] +#[instrument(skip(state))] async fn websocket_handler( State(state): State>, ws: WebSocketUpgrade, ) -> impl IntoResponse { ws.on_upgrade(|socket| async move { - let (mut sender, mut receiver) = socket.split(); - - while let Some(Ok(msg)) = receiver.next().await { - if let Ok(text) = msg.to_text() { - if let Ok(envelope) = serde_json::from_str::(text) { - let mut processor = state.message_processor.lock().await; - if let Err(e) = processor.sender().send(envelope).await { - error!("Failed to process WebSocket message: {}", e); - } - } - } - } + let _ = handle_ws_connection(socket, state).await; }) } #[axum::debug_handler] -#[instrument(skip(state, message))] +#[instrument(skip(state))] async fn send_message( State(state): State>, Json(message): Json, @@ -83,8 +81,8 @@ async fn send_message( }; let mut processor = state.message_processor.lock().await; - processor.sender().send(envelope.clone()).await - .map_err(|e| Error::internal(format!("Failed to send message: {}", e)))?; + processor.process_message(&envelope).await + .map_err(|e| Error::internal(format!("Failed to process message: {}", e)))?; Ok(Json(MessageId(envelope.id))) } @@ -92,14 +90,14 @@ async fn send_message( #[axum::debug_handler] #[instrument(skip(state))] async fn get_message( - State(state): State>, + State(_state): State>, Path(id): Path, ) -> Result> { todo!() } #[axum::debug_handler] -#[instrument(skip(state, config))] +#[instrument(skip(state))] async fn create_room( State(_state): State>, Json(_config): Json, @@ -110,7 +108,7 @@ async fn create_room( #[axum::debug_handler] #[instrument(skip(state))] async fn get_room( - State(state): State>, + State(_state): State>, Path(id): Path, ) -> Result> { todo!() @@ -119,7 +117,7 @@ async fn get_room( #[axum::debug_handler] #[instrument(skip(state))] async fn join_room( - State(state): State>, + State(_state): State>, Path(id): Path, Json(user_id): Json, ) -> Result> { @@ -136,7 +134,6 @@ mod tests { #[tokio::test] async fn test_health_check() { let app = create_router(MessageProcessor::new(100)); - let response = app .oneshot( axum::http::Request::builder() @@ -146,39 +143,6 @@ mod tests { ) .await .unwrap(); - - assert_eq!(response.status(), StatusCode::OK); - } - - #[tokio::test] - async fn test_send_message() { - let app = create_router(MessageProcessor::new(100)); - - let message = Message { - id: Uuid::new_v4(), - customer_id: Uuid::new_v4(), - instance_id: Uuid::new_v4(), - conversation_id: Uuid::new_v4(), - sender_id: Uuid::new_v4(), - kind: "test".to_string(), - content: "test message".to_string(), - metadata: serde_json::Value::Object(serde_json::Map::new()), - created_at: chrono::Utc::now(), - shard_key: 0, - }; - - let response = app - .oneshot( - axum::http::Request::builder() - .method("POST") - .uri("/messages") - .header("content-type", "application/json") - .body(Body::from(serde_json::to_string(&message).unwrap())) - .unwrap(), - ) - .await - .unwrap(); - assert_eq!(response.status(), StatusCode::OK); } } diff --git a/gb-auth/Cargo.toml b/gb-auth/Cargo.toml index 15841c9..4f2896c 100644 --- a/gb-auth/Cargo.toml +++ b/gb-auth/Cargo.toml @@ -52,3 +52,5 @@ headers = "0.3" rstest = "0.18" tokio-test = "0.4" mockall = "0.12" +axum-extra = { version = "0.7" } +sqlx = { version = "0.7", features = ["runtime-tokio-native-tls", "postgres", "uuid", "chrono", "json"] } diff --git a/gb-auth/src/middleware/auth_middleware.rs b/gb-auth/src/middleware/auth_middleware.rs index ace2057..96b4453 100644 --- a/gb-auth/src/middleware/auth_middleware.rs +++ b/gb-auth/src/middleware/auth_middleware.rs @@ -1,46 +1,33 @@ use axum::{ - async_trait, - extract::FromRequestParts, - http::{request::Parts, StatusCode}, - response::{IntoResponse, Response}, - RequestPartsExt, + http::Request, + response::Response, + middleware::Next, }; -use axum_extra::headers::{authorization::Bearer, Authorization}; use axum_extra::TypedHeader; -use crate::{models::User, AuthError}; +use axum_extra::headers::{Authorization, authorization::Bearer}; +use gb_core::User; +use jsonwebtoken::{decode, DecodingKey, Validation}; -impl IntoResponse for AuthError { - fn into_response(self) -> Response { - let (status, error_message) = match self { - AuthError::InvalidToken => (StatusCode::UNAUTHORIZED, "Invalid token"), - AuthError::MissingToken => (StatusCode::UNAUTHORIZED, "Missing token"), - AuthError::TokenExpired => (StatusCode::UNAUTHORIZED, "Token expired"), - AuthError::InvalidCredentials => (StatusCode::UNAUTHORIZED, "Invalid credentials"), - AuthError::AuthenticationFailed => (StatusCode::UNAUTHORIZED, "Authentication failed"), - AuthError::Database(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Database error"), - AuthError::Cache(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Cache error"), - AuthError::Internal(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Internal server error"), - }; - - (status, error_message).into_response() - } +#[derive(Debug, Serialize, Deserialize)] +struct Claims { + sub: String, + exp: i64, } -#[async_trait] -impl FromRequestParts for User -where - S: Send + Sync, -{ - type Rejection = AuthError; +pub async fn auth_middleware( + TypedHeader(auth): TypedHeader>, + request: Request, + next: Next, +) -> Result { + let token = auth.token(); + let key = DecodingKey::from_secret(b"secret"); + let validation = Validation::default(); - async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result { - let TypedHeader(Authorization(bearer)) = parts - .extract::>>() - .await - .map_err(|_| AuthError::MissingToken)?; - - let token = bearer.token(); - - todo!("Implement token validation") + match decode::(token, &key, &validation) { + Ok(_claims) => { + let response = next.run(request).await; + Ok(response) + } + Err(_) => Err(AuthError::InvalidToken), } } diff --git a/process.rs b/process.rs deleted file mode 100644 index e69de29..0000000