gbserver/gb-api/src/router.rs

180 lines
4.8 KiB
Rust
Raw Normal View History

2024-12-22 20:56:52 -03:00
use axum::{
routing::{get, post},
Router,
2024-12-23 00:54:50 -03:00
extract::{
ws::{WebSocket, Message as WsMessage},
Path, State, WebSocketUpgrade,
},
2024-12-22 20:56:52 -03:00
response::IntoResponse,
Json,
};
use gb_core::{Result, Error, models::*};
2024-12-23 00:54:50 -03:00
use gb_messaging::{MessageProcessor, MessageEnvelope};
2024-12-22 20:56:52 -03:00
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{instrument, error};
use uuid::Uuid;
2024-12-23 00:54:50 -03:00
use futures_util::StreamExt;
use futures_util::SinkExt;
2024-12-22 20:56:52 -03:00
pub struct ApiState {
message_processor: Arc<Mutex<MessageProcessor>>,
}
pub fn create_router(message_processor: MessageProcessor) -> Router {
let state = ApiState {
message_processor: Arc::new(Mutex::new(message_processor)),
};
Router::new()
.route("/health", get(health_check))
.route("/ws", get(websocket_handler))
.route("/messages", post(send_message))
.route("/messages/:id", get(get_message))
.route("/rooms", post(create_room))
.route("/rooms/:id", get(get_room))
.route("/rooms/:id/join", post(join_room))
.with_state(Arc::new(state))
}
2024-12-23 00:54:50 -03:00
#[axum::debug_handler]
2024-12-22 20:56:52 -03:00
#[instrument]
async fn health_check() -> &'static str {
"OK"
}
2024-12-23 00:54:50 -03:00
#[axum::debug_handler]
2024-12-22 20:56:52 -03:00
#[instrument(skip(state, ws))]
async fn websocket_handler(
State(state): State<Arc<ApiState>>,
ws: WebSocketUpgrade,
) -> impl IntoResponse {
ws.on_upgrade(|socket| async move {
let (mut sender, mut receiver) = socket.split();
while let Some(Ok(msg)) = receiver.next().await {
if let Ok(text) = msg.to_text() {
if let Ok(envelope) = serde_json::from_str::<MessageEnvelope>(text) {
let mut processor = state.message_processor.lock().await;
if let Err(e) = processor.sender().send(envelope).await {
error!("Failed to process WebSocket message: {}", e);
}
}
}
}
})
}
2024-12-23 00:54:50 -03:00
#[axum::debug_handler]
2024-12-22 20:56:52 -03:00
#[instrument(skip(state, message))]
async fn send_message(
State(state): State<Arc<ApiState>>,
Json(message): Json<Message>,
) -> Result<Json<MessageId>> {
let envelope = MessageEnvelope {
id: Uuid::new_v4(),
message,
metadata: std::collections::HashMap::new(),
};
let mut processor = state.message_processor.lock().await;
processor.sender().send(envelope.clone()).await
2024-12-23 00:54:50 -03:00
.map_err(|e| Error::internal(format!("Failed to send message: {}", e)))?;
2024-12-22 20:56:52 -03:00
Ok(Json(MessageId(envelope.id)))
}
2024-12-23 00:54:50 -03:00
#[axum::debug_handler]
2024-12-22 20:56:52 -03:00
#[instrument(skip(state))]
async fn get_message(
State(state): State<Arc<ApiState>>,
Path(id): Path<Uuid>,
) -> Result<Json<Message>> {
todo!()
}
2024-12-23 00:54:50 -03:00
#[axum::debug_handler]
2024-12-22 20:56:52 -03:00
#[instrument(skip(state, config))]
async fn create_room(
State(state): State<Arc<ApiState>>,
Json(config): Json<RoomConfig>,
) -> Result<Json<Room>> {
todo!()
}
2024-12-23 00:54:50 -03:00
#[axum::debug_handler]
2024-12-22 20:56:52 -03:00
#[instrument(skip(state))]
async fn get_room(
State(state): State<Arc<ApiState>>,
Path(id): Path<Uuid>,
) -> Result<Json<Room>> {
todo!()
}
2024-12-23 00:54:50 -03:00
#[axum::debug_handler]
2024-12-22 20:56:52 -03:00
#[instrument(skip(state))]
async fn join_room(
State(state): State<Arc<ApiState>>,
Path(id): Path<Uuid>,
Json(user_id): Json<Uuid>,
) -> Result<Json<Connection>> {
todo!()
}
#[cfg(test)]
mod tests {
use super::*;
use axum::http::StatusCode;
use axum::body::Body;
use tower::ServiceExt;
#[tokio::test]
async fn test_health_check() {
let app = create_router(MessageProcessor::new(100));
let response = app
.oneshot(
axum::http::Request::builder()
.uri("/health")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
#[tokio::test]
async fn test_send_message() {
let app = create_router(MessageProcessor::new(100));
let message = Message {
id: Uuid::new_v4(),
customer_id: Uuid::new_v4(),
instance_id: Uuid::new_v4(),
conversation_id: Uuid::new_v4(),
sender_id: Uuid::new_v4(),
kind: "test".to_string(),
content: "test message".to_string(),
metadata: serde_json::Value::Object(serde_json::Map::new()),
created_at: chrono::Utc::now(),
shard_key: 0,
};
let response = app
.oneshot(
axum::http::Request::builder()
.method("POST")
.uri("/messages")
.header("content-type", "application/json")
.body(Body::from(serde_json::to_string(&message).unwrap()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
}