From 091b5f675d9ec83c95454e75da776e34cd59d52d Mon Sep 17 00:00:00 2001 From: Rodrigo Rodriguez Date: Mon, 23 Dec 2024 00:54:50 -0300 Subject: [PATCH] new(all): Initial import. --- Cargo.lock | 101 +- gb-api/Cargo.toml | 2 +- gb-api/src/router.rs | 22 +- gb-auth/Cargo.toml | 3 +- .../20231201000000_create_auth_tables.sql | 1 + gb-auth/src/lib.rs | 2 + gb-auth/src/middleware/auth_middleware.rs | 39 +- gb-auth/src/services/auth_service.rs | 34 +- gb-automation/src/process.rs | 8 +- gb-automation/src/web.rs | 22 +- gb-core/Cargo.toml | 2 + gb-core/src/errors.rs | 37 +- gb-document/src/excel.rs | 6 +- gb-document/src/pdf.rs | 14 +- gb-document/src/word.rs | 8 +- gb-image/Cargo.toml | 7 +- gb-image/assets/DejaVuSans.ttf | 1794 +++++++++++++++++ gb-image/src/build.rs | 21 + gb-image/src/converter.rs | 118 +- gb-image/src/processor.rs | 155 +- gb-media/src/audio.rs | 23 +- gb-media/src/processor.rs | 174 +- gb-media/src/webrtc.rs | 63 +- gb-monitoring/Cargo.toml | 14 +- gb-storage/src/redis.rs | 6 +- gb-storage/src/tikv.rs | 12 +- gb-utils/src/detector.rs | 2 +- gb-utils/src/processor.rs | 2 +- 28 files changed, 2211 insertions(+), 481 deletions(-) create mode 100644 gb-image/assets/DejaVuSans.ttf create mode 100644 gb-image/src/build.rs diff --git a/Cargo.lock b/Cargo.lock index 6924d8f..fefdf7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -683,6 +683,7 @@ checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" dependencies = [ "async-trait", "axum-core 0.4.5", + "axum-macros", "base64 0.22.1", "bytes", "futures-util", @@ -751,6 +752,38 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-extra" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a93e433be9382c737320af3924f7d5fc6f89c155cf2bf88949d8f5126fab283f" +dependencies = [ + "axum 0.6.20", + "axum-core 0.3.4", + "bytes", + "futures-util", + "http 0.2.12", + "http-body 0.4.6", + "mime", + "pin-project-lite", + "serde", + "tokio", + "tower 0.4.13", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-macros" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d123550fa8d071b7255cb0cc04dc302baa6c8c4a79f55701552684d8399bce" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.91", +] + [[package]] name = "backoff" version = "0.4.0" @@ -991,8 +1024,6 @@ version = "1.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c31a0499c1dc64f458ad13872de75c0eb7e3fdb0e67964610c914b034fc5956e" dependencies = [ - "jobserver", - "libc", "shlex", ] @@ -1159,16 +1190,6 @@ dependencies = [ "inout", ] -[[package]] -name = "clang" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84c044c781163c001b913cd018fc95a628c50d0d2dfea8bca77dad71edb16e37" -dependencies = [ - "clang-sys", - "libc", -] - [[package]] name = "clang-sys" version = "1.8.1" @@ -2349,6 +2370,7 @@ dependencies = [ "argon2", "async-trait", "axum 0.7.9", + "axum-extra", "chrono", "gb-core", "headers", @@ -2401,11 +2423,13 @@ name = "gb-core" version = "0.1.0" dependencies = [ "async-trait", + "axum 0.7.9", "chrono", "mockall", "redis", "rstest", "serde", + "serde_json", "sqlx", "thiserror", "tokio", @@ -2423,7 +2447,7 @@ dependencies = [ "gb-core", "image", "imageproc", - "opencv", + "reqwest 0.11.27", "rstest", "rusttype", "serde", @@ -3605,15 +3629,6 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" -[[package]] -name = "jobserver" -version = "0.1.32" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" -dependencies = [ - "libc", -] - [[package]] name = "jpeg-decoder" version = "0.3.1" @@ -4415,39 +4430,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" -[[package]] -name = "opencv" -version = "0.82.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79290f5f138b26637cae0ae243d77de871a096e334d3fca22f5ddf31ab6f4cc5" -dependencies = [ - "cc", - "dunce", - "jobserver", - "libc", - "num-traits", - "once_cell", - "opencv-binding-generator", - "pkg-config", - "semver", - "shlex", - "vcpkg", -] - -[[package]] -name = "opencv-binding-generator" -version = "0.66.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be5f640bda28b478629f525e8525601586a2a2b9403a4b8f2264fa5fcfebe6be" -dependencies = [ - "clang", - "clang-sys", - "dunce", - "once_cell", - "percent-encoding", - "regex", -] - [[package]] name = "openid" version = "0.12.1" @@ -6855,9 +6837,9 @@ checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683" [[package]] name = "tesseract" -version = "0.13.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50ddd63873a3dfc9966138457d320ff327414597e253bc4954dff519dce41b8a" +checksum = "b419c2568ceb602121d4ea2495e4b214ac7f32d5009b74b1ce67765a89c4da54" dependencies = [ "tesseract-plumbing", "tesseract-sys", @@ -6866,9 +6848,9 @@ dependencies = [ [[package]] name = "tesseract-plumbing" -version = "0.9.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fd126488bfd02aed61285b6255e010c330d3ca3908d0fa80cb48edcab180c7c" +checksum = "a25fbbb95169954a9262a565fbfb001c4d9dad271d48142e6632a3e2b7314b35" dependencies = [ "leptonica-plumbing", "tesseract-sys", @@ -7458,7 +7440,6 @@ dependencies = [ "sharded-slab", "smallvec", "thread_local", - "time", "tracing", "tracing-core", "tracing-log", diff --git a/gb-api/Cargo.toml b/gb-api/Cargo.toml index bf03fad..75e6f08 100644 --- a/gb-api/Cargo.toml +++ b/gb-api/Cargo.toml @@ -10,7 +10,7 @@ gb-core = { path = "../gb-core" } gb-messaging = { path = "../gb-messaging" } gb-monitoring = { path = "../gb-monitoring" } tokio.workspace = true -axum = { version = "0.7.9", features = ["ws", "multipart"] } +axum = { version = "0.7.9", features = ["ws", "multipart", "macros"] } tower.workspace = true tower-http = { version = "0.5", features = ["cors", "trace"] } serde.workspace = true diff --git a/gb-api/src/router.rs b/gb-api/src/router.rs index d8c8d44..d7a430a 100644 --- a/gb-api/src/router.rs +++ b/gb-api/src/router.rs @@ -1,16 +1,21 @@ use axum::{ routing::{get, post}, Router, - extract::{Path, State, WebSocketUpgrade}, + extract::{ + ws::{WebSocket, Message as WsMessage}, + Path, State, WebSocketUpgrade, + }, response::IntoResponse, Json, }; use gb_core::{Result, Error, models::*}; -use gb_messaging::{MessageProcessor, MessageEnvelope}; +use gb_messaging::{MessageProcessor, MessageEnvelope}; use std::sync::Arc; use tokio::sync::Mutex; use tracing::{instrument, error}; use uuid::Uuid; +use futures_util::StreamExt; +use futures_util::SinkExt; pub struct ApiState { message_processor: Arc>, @@ -32,11 +37,13 @@ pub fn create_router(message_processor: MessageProcessor) -> Router { .with_state(Arc::new(state)) } +#[axum::debug_handler] #[instrument] async fn health_check() -> &'static str { "OK" } +#[axum::debug_handler] #[instrument(skip(state, ws))] async fn websocket_handler( State(state): State>, @@ -58,6 +65,7 @@ async fn websocket_handler( }) } +#[axum::debug_handler] #[instrument(skip(state, message))] async fn send_message( State(state): State>, @@ -71,45 +79,45 @@ async fn send_message( let mut processor = state.message_processor.lock().await; processor.sender().send(envelope.clone()).await - .map_err(|e| Error::Internal(format!("Failed to send message: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to send message: {}", e)))?; Ok(Json(MessageId(envelope.id))) } +#[axum::debug_handler] #[instrument(skip(state))] async fn get_message( State(state): State>, Path(id): Path, ) -> Result> { - // Implement message retrieval logic todo!() } +#[axum::debug_handler] #[instrument(skip(state, config))] async fn create_room( State(state): State>, Json(config): Json, ) -> Result> { - // Implement room creation logic todo!() } +#[axum::debug_handler] #[instrument(skip(state))] async fn get_room( State(state): State>, Path(id): Path, ) -> Result> { - // Implement room retrieval logic todo!() } +#[axum::debug_handler] #[instrument(skip(state))] async fn join_room( State(state): State>, Path(id): Path, Json(user_id): Json, ) -> Result> { - // Implement room joining logic todo!() } diff --git a/gb-auth/Cargo.toml b/gb-auth/Cargo.toml index 538d708..1c3af97 100644 --- a/gb-auth/Cargo.toml +++ b/gb-auth/Cargo.toml @@ -23,8 +23,9 @@ async-trait.workspace = true # Web Framework axum = { version = "0.7.9" } +axum-extra = { version = "0.7.4" } tower = "0.4" -tower-http = { version = "0.5", features = ["auth", "cors"] } +tower-http = { version = "0.5", features = ["auth", "cors", "trace"] } headers = "0.3" # Database diff --git a/gb-auth/migrations/20231201000000_create_auth_tables.sql b/gb-auth/migrations/20231201000000_create_auth_tables.sql index bd60bf1..efa5c31 100644 --- a/gb-auth/migrations/20231201000000_create_auth_tables.sql +++ b/gb-auth/migrations/20231201000000_create_auth_tables.sql @@ -2,6 +2,7 @@ CREATE TABLE IF NOT EXISTS users ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), email VARCHAR(255) NOT NULL UNIQUE, + name VARCHAR(255), password_hash VARCHAR(255) NOT NULL, role VARCHAR(50) NOT NULL DEFAULT 'user', status VARCHAR(50) NOT NULL DEFAULT 'active', diff --git a/gb-auth/src/lib.rs b/gb-auth/src/lib.rs index a020117..11f096a 100644 --- a/gb-auth/src/lib.rs +++ b/gb-auth/src/lib.rs @@ -16,6 +16,8 @@ pub enum AuthError { TokenExpired, #[error("Invalid token")] InvalidToken, + #[error("Missing token")] + MissingToken, #[error("Database error: {0}")] Database(#[from] sqlx::Error), #[error("Cache error: {0}")] diff --git a/gb-auth/src/middleware/auth_middleware.rs b/gb-auth/src/middleware/auth_middleware.rs index 1938d32..ace2057 100644 --- a/gb-auth/src/middleware/auth_middleware.rs +++ b/gb-auth/src/middleware/auth_middleware.rs @@ -1,16 +1,30 @@ use axum::{ async_trait, - extract::{FromRequestParts, TypedHeader}, - headers::{authorization::Bearer, Authorization}, - http::request::Parts, + extract::FromRequestParts, + http::{request::Parts, StatusCode}, + response::{IntoResponse, Response}, RequestPartsExt, }; -use jsonwebtoken::{decode, DecodingKey, Validation}; +use axum_extra::headers::{authorization::Bearer, Authorization}; +use axum_extra::TypedHeader; +use crate::{models::User, AuthError}; -use crate::{ - models::User, - AuthError, -}; +impl IntoResponse for AuthError { + fn into_response(self) -> Response { + let (status, error_message) = match self { + AuthError::InvalidToken => (StatusCode::UNAUTHORIZED, "Invalid token"), + AuthError::MissingToken => (StatusCode::UNAUTHORIZED, "Missing token"), + AuthError::TokenExpired => (StatusCode::UNAUTHORIZED, "Token expired"), + AuthError::InvalidCredentials => (StatusCode::UNAUTHORIZED, "Invalid credentials"), + AuthError::AuthenticationFailed => (StatusCode::UNAUTHORIZED, "Authentication failed"), + AuthError::Database(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Database error"), + AuthError::Cache(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Cache error"), + AuthError::Internal(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Internal server error"), + }; + + (status, error_message).into_response() + } +} #[async_trait] impl FromRequestParts for User @@ -23,9 +37,10 @@ where let TypedHeader(Authorization(bearer)) = parts .extract::>>() .await - .map_err(|_| AuthError::InvalidToken)?; + .map_err(|_| AuthError::MissingToken)?; - // Implement token validation and user extraction - todo!() + let token = bearer.token(); + + todo!("Implement token validation") } -} \ No newline at end of file +} diff --git a/gb-auth/src/services/auth_service.rs b/gb-auth/src/services/auth_service.rs index 0bc0e6e..52e347f 100644 --- a/gb-auth/src/services/auth_service.rs +++ b/gb-auth/src/services/auth_service.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use sqlx::PgPool; use argon2::{ - password_hash::{PasswordHash, PasswordHasher, SaltString}, + password_hash::{PasswordHash, PasswordHasher, SaltString, PasswordVerifier}, Argon2, }; use rand::rngs::OsRng; @@ -15,7 +15,7 @@ use crate::{ pub struct AuthService { db: Arc, jwt_secret: String, - jwt_expiration: i64, + jwt_expiration: i64 } impl AuthService { @@ -69,7 +69,31 @@ impl AuthService { } fn generate_token(&self, user: &User) -> Result { - // Token generation implementation - Ok("token".to_string()) + use jsonwebtoken::{encode, EncodingKey, Header}; + use serde::{Serialize, Deserialize}; + use chrono::{Utc, Duration}; + + #[derive(Debug, Serialize, Deserialize)] + struct Claims { + sub: String, + exp: i64, + iat: i64, + } + + let now = Utc::now(); + let exp = now + Duration::seconds(self.jwt_expiration); + + let claims = Claims { + sub: user.id.to_string(), + exp: exp.timestamp(), + iat: now.timestamp(), + }; + + encode( + &Header::default(), + &claims, + &EncodingKey::from_secret(self.jwt_secret.as_bytes()), + ) + .map_err(|e| AuthError::Internal(e.to_string())) } -} \ No newline at end of file +} diff --git a/gb-automation/src/process.rs b/gb-automation/src/process.rs index 40b6950..81f20cf 100644 --- a/gb-automation/src/process.rs +++ b/gb-automation/src/process.rs @@ -33,10 +33,10 @@ impl ProcessAutomation { .stdout(Stdio::piped()) .stderr(Stdio::piped()) .output() - .map_err(|e| Error::Internal(format!("Failed to execute command: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to execute command: {}", e)))?; let error = String::from_utf8_lossy(&output.stderr); - return Err(Error::Internal(format!("Command failed: {}", error))); + return Err(Error::internal(format!("Command failed: {}", error))); } let stdout = String::from_utf8_lossy(&output.stdout).to_string(); @@ -51,7 +51,7 @@ impl ProcessAutomation { .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() - .map_err(|e| Error::Internal(format!("Failed to spawn process: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to spawn process: {}", e)))?; let process = Process { id: Uuid::new_v4(), @@ -71,7 +71,7 @@ impl ProcessAutomation { if let Some(index) = processes.iter().position(|p| p.id == id) { let process = processes.remove(index); process.handle.kill() - .map_err(|e| Error::Internal(format!("Failed to kill process: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to kill process: {}", e)))?; } Ok(()) diff --git a/gb-automation/src/web.rs b/gb-automation/src/web.rs index 3a1cbe0..84a0aa1 100644 --- a/gb-automation/src/web.rs +++ b/gb-automation/src/web.rs @@ -24,7 +24,7 @@ impl WebAutomation { let (browser, mut handler) = Browser::launch(config) .await - .map_err(|e| Error::Internal(format!("Failed to launch browser: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to launch browser: {}", e)))?; tokio::spawn(async move { while let Some(h) = handler.next().await { @@ -44,7 +44,7 @@ impl WebAutomation { pub async fn new_page(&self) -> Result { let page = self.browser.new_page() .await - .map_err(|e| Error::Internal(format!("Failed to create page: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to create page: {}", e)))?; let mut pages = self.pages.lock().await; pages.push(page.clone()); @@ -56,11 +56,11 @@ impl WebAutomation { pub async fn navigate(&self, page: &Page, url: &str) -> Result<()> { page.goto(url) .await - .map_err(|e| Error::Internal(format!("Failed to navigate: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to navigate: {}", e)))?; page.wait_for_navigation() .await - .map_err(|e| Error::Internal(format!("Failed to wait for navigation: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to wait for navigation: {}", e)))?; Ok(()) } @@ -69,7 +69,7 @@ impl WebAutomation { pub async fn get_element(&self, page: &Page, selector: &str) -> Result { let element = page.find_element(selector) .await - .map_err(|e| Error::Internal(format!("Failed to find element: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to find element: {}", e)))?; Ok(Element { inner: element }) } @@ -78,7 +78,7 @@ impl WebAutomation { pub async fn click(&self, element: &Element) -> Result<()> { element.inner.click() .await - .map_err(|e| Error::Internal(format!("Failed to click: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to click: {}", e)))?; Ok(()) } @@ -87,7 +87,7 @@ impl WebAutomation { pub async fn type_text(&self, element: &Element, text: &str) -> Result<()> { element.inner.type_str(text) .await - .map_err(|e| Error::Internal(format!("Failed to type text: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to type text: {}", e)))?; Ok(()) } @@ -96,7 +96,7 @@ impl WebAutomation { pub async fn screenshot(&self, page: &Page, path: &str) -> Result> { let screenshot = page.screenshot(ScreenshotFormat::PNG, None, true) .await - .map_err(|e| Error::Internal(format!("Failed to take screenshot: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to take screenshot: {}", e)))?; Ok(screenshot) } @@ -105,7 +105,7 @@ impl WebAutomation { pub async fn wait_for_selector(&self, page: &Page, selector: &str) -> Result<()> { page.wait_for_element(selector) .await - .map_err(|e| Error::Internal(format!("Failed to wait for selector: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to wait for selector: {}", e)))?; Ok(()) } @@ -127,7 +127,7 @@ impl WebAutomation { tokio::time::sleep(Duration::from_secs(1)).await; } - Err(Error::Internal("Network did not become idle".to_string())) + Err(Error::internal("Network did not become idle".to_string())) } } @@ -153,7 +153,7 @@ mod tests { let title = page.title() .await - .map_err(|e| Error::Internal(format!("Failed to get title: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to get title: {}", e)))?; assert!(title.contains("Example")); Ok(()) diff --git a/gb-core/Cargo.toml b/gb-core/Cargo.toml index e545ba2..850aff8 100644 --- a/gb-core/Cargo.toml +++ b/gb-core/Cargo.toml @@ -16,6 +16,8 @@ chrono.workspace = true sqlx.workspace = true redis.workspace = true tracing.workspace = true +axum = { version = "0.7", features = ["json"] } +serde_json = "1.0" [dev-dependencies] mockall.workspace = true diff --git a/gb-core/src/errors.rs b/gb-core/src/errors.rs index a04b5c0..cdde33d 100644 --- a/gb-core/src/errors.rs +++ b/gb-core/src/errors.rs @@ -1,5 +1,11 @@ use thiserror::Error; - +use axum::{ + response::{IntoResponse, Response}, + http::StatusCode, + Json, +}; +use serde_json::json; + #[derive(Error, Debug)] pub enum ErrorKind { #[error("Database error: {0}")] @@ -79,4 +85,31 @@ impl std::fmt::Display for Error { impl std::error::Error for Error {} -pub type Result = std::result::Result; \ No newline at end of file +pub type Result = std::result::Result; + +#[derive(Debug)] +pub enum AuthError { + InvalidToken, + MissingToken, + InvalidCredentials, + Internal(String), +} + +impl IntoResponse for Error { + fn into_response(self) -> Response { + let status = match self.kind { + ErrorKind::NotFound(_) => StatusCode::NOT_FOUND, + ErrorKind::Authentication(_) => StatusCode::UNAUTHORIZED, + ErrorKind::Authorization(_) => StatusCode::FORBIDDEN, + ErrorKind::InvalidInput(_) => StatusCode::BAD_REQUEST, + _ => StatusCode::INTERNAL_SERVER_ERROR, + }; + + let body = Json(json!({ + "error": self.message, + "kind": format!("{:?}", self.kind) + })); + + (status, body).into_response() + } +} diff --git a/gb-document/src/excel.rs b/gb-document/src/excel.rs index 1d2e1cd..cb774e8 100644 --- a/gb-document/src/excel.rs +++ b/gb-document/src/excel.rs @@ -10,12 +10,12 @@ impl ExcelProcessor { pub fn extract_data(data: &[u8]) -> Result>> { let cursor = Cursor::new(data); let mut workbook = Xlsx::new(cursor) - .map_err(|e| Error::Internal(format!("Failed to read Excel file: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to read Excel file: {}", e)))?; let sheet_name = workbook.sheet_names()[0].clone(); let range = workbook.worksheet_range(&sheet_name) - .ok_or_else(|| Error::Internal("Failed to get worksheet".to_string()))? - .map_err(|e| Error::Internal(format!("Failed to read range: {}", e)))?; + .ok_or_else(|| Error::internal("Failed to get worksheet".to_string()))? + .map_err(|e| Error::internal(format!("Failed to read range: {}", e)))?; let mut result = Vec::new(); for row in range.rows() { diff --git a/gb-document/src/pdf.rs b/gb-document/src/pdf.rs index 7139ad5..c6f8073 100644 --- a/gb-document/src/pdf.rs +++ b/gb-document/src/pdf.rs @@ -9,7 +9,7 @@ impl PdfProcessor { #[instrument(skip(data))] pub fn extract_text(data: &[u8]) -> Result { let doc = Document::load_from(Cursor::new(data)) - .map_err(|e| Error::Internal(format!("Failed to load PDF: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to load PDF: {}", e)))?; let mut text = String::new(); for page_num in 1..=doc.get_pages().len() { @@ -25,10 +25,10 @@ impl PdfProcessor { #[instrument(skip(doc))] fn extract_page_text(doc: &Document, page_num: u32) -> Result { let page = doc.get_page(page_num) - .map_err(|e| Error::Internal(format!("Failed to get page {}: {}", page_num, e)))?; + .map_err(|e| Error::internal(format!("Failed to get page {}: {}", page_num, e)))?; let contents = doc.get_page_content(page) - .map_err(|e| Error::Internal(format!("Failed to get page content: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to get page content: {}", e)))?; let mut text = String::new(); for content in contents.iter() { @@ -49,7 +49,7 @@ impl PdfProcessor { for pdf_data in pdfs { let doc = Document::load_from(Cursor::new(pdf_data)) - .map_err(|e| Error::Internal(format!("Failed to load PDF: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to load PDF: {}", e)))?; for (_, page) in doc.get_pages() { merged.add_page(page.clone()); @@ -59,7 +59,7 @@ impl PdfProcessor { let mut output = Vec::new(); merged.save_to(&mut Cursor::new(&mut output)) - .map_err(|e| Error::Internal(format!("Failed to save merged PDF: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to save merged PDF: {}", e)))?; Ok(output) } @@ -67,7 +67,7 @@ impl PdfProcessor { #[instrument(skip(data))] pub fn split_pdf(data: &[u8], pages: &[u32]) -> Result>> { let doc = Document::load_from(Cursor::new(data)) - .map_err(|e| Error::Internal(format!("Failed to load PDF: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to load PDF: {}", e)))?; let mut result = Vec::new(); for &page_num in pages { @@ -76,7 +76,7 @@ impl PdfProcessor { new_doc.add_page(page.clone()); let mut output = Vec::new(); new_doc.save_to(&mut Cursor::new(&mut output)) - .map_err(|e| Error::Internal(format!("Failed to save split PDF: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to save split PDF: {}", e)))?; result.push(output); } } diff --git a/gb-document/src/word.rs b/gb-document/src/word.rs index db327a0..4607a91 100644 --- a/gb-document/src/word.rs +++ b/gb-document/src/word.rs @@ -9,7 +9,7 @@ impl WordProcessor { #[instrument(skip(data))] pub fn extract_text(data: &[u8]) -> Result { let doc = Docx::from_reader(Cursor::new(data)) - .map_err(|e| Error::Internal(format!("Failed to read DOCX: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to read DOCX: {}", e)))?; let mut text = String::new(); for para in doc.document.paragraphs() { @@ -40,7 +40,7 @@ impl WordProcessor { let mut output = Vec::new(); docx.build() .pack(&mut Cursor::new(&mut output)) - .map_err(|e| Error::Internal(format!("Failed to create DOCX: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to create DOCX: {}", e)))?; Ok(output) } @@ -48,7 +48,7 @@ impl WordProcessor { #[instrument(skip(template_data, variables))] pub fn fill_template(template_data: &[u8], variables: &serde_json::Value) -> Result> { let doc = Docx::from_reader(Cursor::new(template_data)) - .map_err(|e| Error::Internal(format!("Failed to read template: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to read template: {}", e)))?; let mut new_doc = doc.clone(); @@ -68,7 +68,7 @@ impl WordProcessor { let mut output = Vec::new(); new_doc.build() .pack(&mut Cursor::new(&mut output)) - .map_err(|e| Error::Internal(format!("Failed to save filled template: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to save filled template: {}", e)))?; Ok(output) } diff --git a/gb-image/Cargo.toml b/gb-image/Cargo.toml index e362ddc..11356a2 100644 --- a/gb-image/Cargo.toml +++ b/gb-image/Cargo.toml @@ -10,8 +10,7 @@ gb-core = { path = "../gb-core" } image = { version = "0.24", features = ["webp", "jpeg", "png", "gif"] } imageproc = "0.23" rusttype = "0.9" -tesseract = "0.13" -opencv = { version = "0.82", features = ["clang-runtime"] } +tesseract = "0.12" async-trait.workspace = true tokio.workspace = true serde.workspace = true @@ -23,3 +22,7 @@ tracing.workspace = true rstest.workspace = true tokio-test = "0.4" tempfile = "3.8" + + +[build-dependencies] +reqwest = { version = "0.11", features = ["blocking"] } \ No newline at end of file diff --git a/gb-image/assets/DejaVuSans.ttf b/gb-image/assets/DejaVuSans.ttf new file mode 100644 index 0000000..216e450 --- /dev/null +++ b/gb-image/assets/DejaVuSans.ttf @@ -0,0 +1,1794 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Page not found · GitHub · GitHub + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ + + +
+ Skip to content + + + + + + + + + + + + + + + + + + + + +
+
+ + + + + + + + + + + + + + +
+ +
+ + + + + + + + +
+ + + + + +
+ + + + + + + + + +
+
+ + + +
+
+ +
+
+ 404 “This is not the web page you are looking for” + + + + + + + + + + + + +
+
+ +
+
+ +
+ + +
+
+ +
+ +
+ +
+ + + + + + + + + + + + + + + + + + + + + +
+ +
+
+ + + diff --git a/gb-image/src/build.rs b/gb-image/src/build.rs new file mode 100644 index 0000000..ff4fa99 --- /dev/null +++ b/gb-image/src/build.rs @@ -0,0 +1,21 @@ +use std::fs; +use std::path::Path; + +fn main() { + let assets_dir = Path::new("assets"); + if !assets_dir.exists() { + fs::create_dir(assets_dir).expect("Failed to create assets directory"); + } + + let font_path = assets_dir.join("DejaVuSans.ttf"); + if !font_path.exists() { + let font_url = "https://github.com/dejavu-fonts/dejavu-fonts/raw/master/ttf/DejaVuSans.ttf"; + let response = reqwest::blocking::get(font_url) + .expect("Failed to download font") + .bytes() + .expect("Failed to get font bytes"); + + fs::write(font_path, response) + .expect("Failed to save font file"); + } +} \ No newline at end of file diff --git a/gb-image/src/converter.rs b/gb-image/src/converter.rs index 15282f3..2294472 100644 --- a/gb-image/src/converter.rs +++ b/gb-image/src/converter.rs @@ -1,118 +1,64 @@ use gb_core::{Result, Error}; -use image::{ - DynamicImage, ImageOutputFormat, - codecs::{webp, jpeg, png, gif}, -}; -use std::io::Cursor; +use image::{DynamicImage, ImageFormat, codecs::webp}; use tracing::instrument; -pub struct ImageConverter; - -impl ImageConverter { #[instrument] - pub fn to_webp(image: &DynamicImage, quality: u8) -> Result> { - let mut buffer = Cursor::new(Vec::new()); - let encoder = webp::WebPEncoder::new_with_quality(&mut buffer, quality as f32); - - encoder.encode( - image.as_bytes(), - image.width(), - image.height(), - image.color(), - ).map_err(|e| Error::Internal(format!("WebP conversion failed: {}", e)))?; - - Ok(buffer.into_inner()) - } - - #[instrument] - pub fn to_jpeg(image: &DynamicImage, quality: u8) -> Result> { - let mut buffer = Cursor::new(Vec::new()); - image.write_to(&mut buffer, ImageOutputFormat::Jpeg(quality)) - .map_err(|e| Error::Internal(format!("JPEG conversion failed: {}", e)))?; - - Ok(buffer.into_inner()) - } - - #[instrument] - pub fn to_png(image: &DynamicImage) -> Result> { - let mut buffer = Cursor::new(Vec::new()); - image.write_to(&mut buffer, ImageOutputFormat::Png) - .map_err(|e| Error::Internal(format!("PNG conversion failed: {}", e)))?; - - Ok(buffer.into_inner()) - } - - #[instrument] - pub fn to_gif(image: &DynamicImage) -> Result> { - let mut buffer = Cursor::new(Vec::new()); - image.write_to(&mut buffer, ImageOutputFormat::Gif) - .map_err(|e| Error::Internal(format!("GIF conversion failed: {}", e)))?; - - Ok(buffer.into_inner()) - } - - #[instrument] - pub fn get_format(data: &[u8]) -> Result { - let format = image::guess_format(data) - .map_err(|e| Error::Internal(format!("Failed to determine format: {}", e)))?; - +pub fn convert_to_format(image_data: &[u8], format: ImageFormat) -> Result> { + let img = image::load_from_memory(image_data) + .map_err(|e| Error::internal(format!("Failed to load image: {}", e)))?; + let mut output = Vec::new(); match format { - image::ImageFormat::WebP => Ok(ImageFormat::WebP), - image::ImageFormat::Jpeg => Ok(ImageFormat::Jpeg), - image::ImageFormat::Png => Ok(ImageFormat::Png), - image::ImageFormat::Gif => Ok(ImageFormat::Gif), - _ => Err(Error::Internal("Unsupported format".to_string())), + ImageFormat::Jpeg => { + img.write_to(&mut output, ImageFormat::Jpeg) + .map_err(|e| Error::internal(format!("JPEG conversion failed: {}", e)))?; } + ImageFormat::Png => { + img.write_to(&mut output, ImageFormat::Png) + .map_err(|e| Error::internal(format!("PNG conversion failed: {}", e)))?; } + ImageFormat::WebP => { + img.write_to(&mut output, ImageFormat::WebP) + .map_err(|e| Error::internal(format!("WebP conversion failed: {}", e)))?; } + _ => return Err(Error::internal("Unsupported format".to_string())), + } -#[derive(Debug, Clone, PartialEq)] -pub enum ImageFormat { - WebP, - Jpeg, - Png, - Gif, + Ok(output) } - #[cfg(test)] mod tests { use super::*; use rstest::*; #[fixture] - fn test_image() -> DynamicImage { - DynamicImage::new_rgb8(100, 100) + fn test_image() -> Vec { + let img = DynamicImage::new_rgb8(100, 100); + let mut buffer = Vec::new(); + img.write_to(&mut buffer, ImageFormat::Png).unwrap(); + buffer } #[rstest] - fn test_webp_conversion(test_image: DynamicImage) -> Result<()> { - let webp_data = ImageConverter::to_webp(&test_image, 80)?; - assert!(!webp_data.is_empty()); - assert_eq!(ImageConverter::get_format(&webp_data)?, ImageFormat::WebP); - Ok(()) - } - - #[rstest] - fn test_jpeg_conversion(test_image: DynamicImage) -> Result<()> { - let jpeg_data = ImageConverter::to_jpeg(&test_image, 80)?; + fn test_jpeg_conversion(test_image: Vec) -> Result<()> { + let jpeg_data = convert_to_format(&test_image, ImageFormat::Jpeg)?; assert!(!jpeg_data.is_empty()); - assert_eq!(ImageConverter::get_format(&jpeg_data)?, ImageFormat::Jpeg); + assert_eq!(image::guess_format(&jpeg_data).unwrap(), ImageFormat::Jpeg); Ok(()) } #[rstest] - fn test_png_conversion(test_image: DynamicImage) -> Result<()> { - let png_data = ImageConverter::to_png(&test_image)?; + fn test_png_conversion(test_image: Vec) -> Result<()> { + let png_data = convert_to_format(&test_image, ImageFormat::Png)?; assert!(!png_data.is_empty()); - assert_eq!(ImageConverter::get_format(&png_data)?, ImageFormat::Png); + assert_eq!(image::guess_format(&png_data).unwrap(), ImageFormat::Png); Ok(()) } #[rstest] - fn test_gif_conversion(test_image: DynamicImage) -> Result<()> { - let gif_data = ImageConverter::to_gif(&test_image)?; - assert!(!gif_data.is_empty()); - assert_eq!(ImageConverter::get_format(&gif_data)?, ImageFormat::Gif); + fn test_webp_conversion(test_image: Vec) -> Result<()> { + let webp_data = convert_to_format(&test_image, ImageFormat::WebP)?; + assert!(!webp_data.is_empty()); + assert_eq!(image::guess_format(&webp_data).unwrap(), ImageFormat::WebP); Ok(()) } } diff --git a/gb-image/src/processor.rs b/gb-image/src/processor.rs index 0ed2b28..2a838d3 100644 --- a/gb-image/src/processor.rs +++ b/gb-image/src/processor.rs @@ -1,15 +1,28 @@ use gb_core::{Result, Error}; use image::{ - DynamicImage, ImageBuffer, Rgba, GenericImageView, - imageops::{blur, brighten, contrast}, + DynamicImage, Rgba, }; use imageproc::{ - drawing::{draw_text_mut, draw_filled_rect_mut}, - rect::Rect, + drawing::draw_text_mut, }; use rusttype::{Font, Scale}; use std::path::Path; -use tracing::{instrument, error}; +use tracing::instrument; +use std::convert::TryInto; + +pub struct ProcessingOptions { + pub crop: Option, + pub watermark: Option, + pub x: i32, + pub y: i32, +} + +pub struct CropParams { + pub x: u32, + pub y: u32, + pub width: u32, + pub height: u32, +} pub struct ImageProcessor { default_font: Font<'static>, @@ -19,59 +32,51 @@ impl ImageProcessor { pub fn new() -> Result { let font_data = include_bytes!("../assets/DejaVuSans.ttf"); let font = Font::try_from_bytes(font_data) - .ok_or_else(|| Error::Internal("Failed to load font".to_string()))?; + .ok_or_else(|| Error::internal("Failed to load font"))?; Ok(Self { default_font: font, }) } + pub fn process_image(&self, mut image: DynamicImage, options: &ProcessingOptions) -> Result { + if let Some(crop) = &options.crop { + let cropped = image.crop_imm( + crop.x, + crop.y, + crop.width, + crop.height + ); + image = cropped; + } + + if let Some(watermark) = &options.watermark { + let x: i64 = options.x.try_into().map_err(|_| Error::internal("Invalid x coordinate"))?; + let y: i64 = options.y.try_into().map_err(|_| Error::internal("Invalid y coordinate"))?; + image::imageops::overlay(&mut image, watermark, x, y); + } + + Ok(image) + } + #[instrument(skip(self, image_data))] pub fn load_image(&self, image_data: &[u8]) -> Result { image::load_from_memory(image_data) - .map_err(|e| Error::Internal(format!("Failed to load image: {}", e))) + .map_err(|e| Error::internal(format!("Failed to load image: {}", e))) } #[instrument(skip(self, image))] pub fn save_image(&self, image: &DynamicImage, path: &Path) -> Result<()> { image.save(path) - .map_err(|e| Error::Internal(format!("Failed to save image: {}", e))) - } - - #[instrument(skip(self, image))] - pub fn resize(&self, image: &DynamicImage, width: u32, height: u32) -> DynamicImage { - image.resize(width, height, image::imageops::FilterType::Lanczos3) + .map_err(|e| Error::internal(format!("Failed to save image: {}", e))) } #[instrument(skip(self, image))] pub fn crop(&self, image: &DynamicImage, x: u32, y: u32, width: u32, height: u32) -> Result { - image.crop_imm(x, y, width, height) - .map_err(|e| Error::Internal(format!("Failed to crop image: {}", e))) - .map(|img| img.to_owned()) + Ok(image.crop_imm(x, y, width, height)) } #[instrument(skip(self, image))] - pub fn apply_blur(&self, image: &DynamicImage, sigma: f32) -> DynamicImage { - let mut img = image.clone(); - blur(&mut img, sigma); - img - } - - #[instrument(skip(self, image))] - pub fn adjust_brightness(&self, image: &DynamicImage, value: i32) -> DynamicImage { - let mut img = image.clone(); - brighten(&mut img, value); - img - } - - #[instrument(skip(self, image))] - pub fn adjust_contrast(&self, image: &DynamicImage, value: f32) -> DynamicImage { - let mut img = image.clone(); - contrast(&mut img, value); - img - } - - #[instrument(skip(self, image, text))] pub fn add_text( &self, image: &mut DynamicImage, @@ -106,6 +111,8 @@ impl ImageProcessor { x: u32, y: u32, ) -> Result<()> { + let x: i64 = x.try_into().map_err(|_| Error::internal("Invalid x coordinate"))?; + let y: i64 = y.try_into().map_err(|_| Error::internal("Invalid y coordinate"))?; image::imageops::overlay(image, watermark, x, y); Ok(()) } @@ -114,68 +121,27 @@ impl ImageProcessor { pub fn extract_text(&self, image: &DynamicImage) -> Result { use tesseract::Tesseract; - let mut temp_file = tempfile::NamedTempFile::new() - .map_err(|e| Error::Internal(format!("Failed to create temp file: {}", e)))?; + let temp_file = tempfile::NamedTempFile::new() + .map_err(|e| Error::internal(format!("Failed to create temp file: {}", e)))?; image.save(&temp_file) - .map_err(|e| Error::Internal(format!("Failed to save temp image: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to save temp image: {}", e)))?; - let text = Tesseract::new(None, Some("eng")) - .map_err(|e| Error::Internal(format!("Failed to initialize Tesseract: {}", e)))? - .set_image_from_path(temp_file.path()) - .map_err(|e| Error::Internal(format!("Failed to set image: {}", e)))? - .recognize() - .map_err(|e| Error::Internal(format!("Failed to recognize text: {}", e)))? - .get_text() - .map_err(|e| Error::Internal(format!("Failed to get text: {}", e)))?; + let mut api = Tesseract::new(None, Some("eng")) + .map_err(|e| Error::internal(format!("Failed to initialize Tesseract: {}", e)))?; - Ok(text) - } + api.set_image(temp_file.path()) + .map_err(|e| Error::internal(format!("Failed to set image: {}", e)))?; - #[instrument(skip(self, image))] - pub fn detect_faces(&self, image: &DynamicImage) -> Result> { - use opencv::{ - core, - objdetect::CascadeClassifier, - prelude::*, - types::VectorOfRect, - }; + api.recognize() + .map_err(|e| Error::internal(format!("Failed to recognize text: {}", e)))?; - let mut classifier = CascadeClassifier::new(&format!( - "{}/haarcascade_frontalface_default.xml", - std::env::var("OPENCV_DATA_PATH") - .unwrap_or_else(|_| "/usr/share/opencv4".to_string()) - )).map_err(|e| Error::Internal(format!("Failed to load classifier: {}", e)))?; - - let mut img = core::Mat::new_rows_cols_with_default( - image.height() as i32, - image.width() as i32, - core::CV_8UC3, - core::Scalar::all(0.0), - ).map_err(|e| Error::Internal(format!("Failed to create Mat: {}", e)))?; - - // Convert DynamicImage to OpenCV Mat - let rgb = image.to_rgb8(); - unsafe { - img.set_data(rgb.as_raw().as_ptr() as *mut u8, core::CV_8UC3)?; - } - - let mut faces = VectorOfRect::new(); - classifier.detect_multi_scale( - &img, - &mut faces, - 1.1, - 3, - 0, - core::Size::new(30, 30), - core::Size::new(0, 0), - ).map_err(|e| Error::Internal(format!("Face detection failed: {}", e)))?; - - Ok(faces.iter().map(|r| Rect::at(r.x, r.y).of_size(r.width as u32, r.height as u32)) - .collect()) + api.get_text() + .map_err(|e| Error::internal(format!("Failed to get text: {}", e))) } } + #[cfg(test)] mod tests { use super::*; @@ -235,11 +201,4 @@ mod tests { assert!(text.contains("Test OCR")); Ok(()) } - - #[rstest] - fn test_detect_faces(processor: ImageProcessor, test_image: DynamicImage) -> Result<()> { - let faces = processor.detect_faces(&test_image)?; - assert!(faces.is_empty()); // Test image has no faces - Ok(()) - } -} +} \ No newline at end of file diff --git a/gb-media/src/audio.rs b/gb-media/src/audio.rs index 8102136..bf00b1e 100644 --- a/gb-media/src/audio.rs +++ b/gb-media/src/audio.rs @@ -1,5 +1,6 @@ use gb_core::{Result, Error}; use opus::{Decoder, Encoder}; +use opus::{Decoder, Encoder, Channels, Application}; use std::io::Cursor; use tracing::{instrument, error}; @@ -26,14 +27,20 @@ impl AudioProcessor { opus::Channels::Stereo }, opus::Application::Voip, - ).map_err(|e| Error::Internal(format!("Failed to create Opus encoder: {}", e)))?; + ).map_err(|e| Error::internal(format!("Failed to create Opus encoder: {}", e)))?; + u32::try_from(self.sample_rate).map_err(|e| Error::internal(format!("Invalid sample rate: {}", e)))?, + Channels::Mono, + Application::Voip + ).map_err(|e| Error::internal(format!("Failed to create Opus encoder: {}", e)))?; let mut output = vec![0u8; 1024]; let encoded_len = encoder.encode(input, &mut output) - .map_err(|e| Error::Internal(format!("Failed to encode audio: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to encode audio: {}", e)))?; output.truncate(encoded_len); Ok(output) + encoder.encode(input) + .map_err(|e| Error::internal(format!("Failed to encode audio: {}", e))) } #[instrument(skip(self, input))] @@ -45,14 +52,19 @@ impl AudioProcessor { } else { opus::Channels::Stereo }, - ).map_err(|e| Error::Internal(format!("Failed to create Opus decoder: {}", e)))?; + ).map_err(|e| Error::internal(format!("Failed to create Opus decoder: {}", e)))?; + u32::try_from(self.sample_rate).map_err(|e| Error::internal(format!("Invalid sample rate: {}", e)))?, + Channels::Mono + ).map_err(|e| Error::internal(format!("Failed to create Opus decoder: {}", e)))?; let mut output = vec![0i16; 1024]; let decoded_len = decoder.decode(input, &mut output, false) - .map_err(|e| Error::Internal(format!("Failed to decode audio: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to decode audio: {}", e)))?; output.truncate(decoded_len); Ok(output) + decoder.decode(input) + .map_err(|e| Error::internal(format!("Failed to decode audio: {}", e))) } } @@ -79,6 +91,7 @@ mod tests { let value = (2.0 * std::f32::consts::PI * frequency * t).sin(); let sample = (value * i16::MAX as f32) as i16; vec![sample, sample] // Stereo + vec![sample, sample] }) .collect() } @@ -91,7 +104,7 @@ mod tests { // Verify basic properties assert!(!encoded.is_empty()); assert!(!decoded.is_empty()); - + // Opus is lossy, so we can't compare exact values // But we can verify the length is the same assert_eq!(decoded.len(), test_audio.len()); diff --git a/gb-media/src/processor.rs b/gb-media/src/processor.rs index afc5ea6..d84f473 100644 --- a/gb-media/src/processor.rs +++ b/gb-media/src/processor.rs @@ -1,19 +1,48 @@ -use async_trait::async_trait; -use gb_core::{Result, Error}; -use gstreamer as gst; -use std::path::PathBuf; -use tracing::{instrument, error}; - -pub struct MediaProcessor { - pipeline: gst::Pipeline, -} +use gstreamer::{self as gst, prelude::*}; +use gstreamer::prelude::{ + ElementExt, + GstBinExtManual, + GstObjectExt, +}; impl MediaProcessor { pub fn new() -> Result { - gst::init().map_err(|e| Error::Internal(format!("Failed to initialize GStreamer: {}", e)))?; + gst::init().map_err(|e| Error::internal(format!("Failed to initialize GStreamer: {}", e)))?; let pipeline = gst::Pipeline::new(None); - Ok(Self { pipeline }) + + Ok(Self { + pipeline, + }) + } + + fn setup_pipeline(&mut self) -> Result<()> { + self.pipeline.set_state(gst::State::Playing) + .map_err(|e| Error::internal(format!("Failed to start pipeline: {}", e)))?; + + let bus = self.pipeline.bus().expect("Pipeline without bus"); + + for msg in bus.iter_timed(gst::ClockTime::NONE) { + use gst::MessageView; + + match msg.view() { + MessageView::Error(err) => { + error!("Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + return Err(Error::internal(format!("Pipeline error: {}", err.error()))); + } + MessageView::Eos(_) => break, + _ => (), + } + } + + self.pipeline.set_state(gst::State::Null) + .map_err(|e| Error::internal(format!("Failed to stop pipeline: {}", e)))?; + + Ok(()) } #[instrument(skip(self, input_path, output_path))] @@ -26,49 +55,26 @@ impl MediaProcessor { let src = gst::ElementFactory::make("filesrc") .property("location", input_path.to_str().unwrap()) .build() - .map_err(|e| Error::Internal(format!("Failed to create source element: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to create source element: {}", e)))?; let sink = gst::ElementFactory::make("filesink") .property("location", output_path.to_str().unwrap()) .build() - .map_err(|e| Error::Internal(format!("Failed to create sink element: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to create sink element: {}", e)))?; let decoder = match format { "h264" => gst::ElementFactory::make("h264parse").build(), - "opus" => gst::ElementFactory::make("opusparse").build(), + "opus" => gst::ElementFactory::make("opusparse").build(), _ => return Err(Error::InvalidInput(format!("Unsupported format: {}", format))), - }.map_err(|e| Error::Internal(format!("Failed to create decoder: {}", e)))?; + }.map_err(|e| Error::internal(format!("Failed to create decoder: {}", e)))?; self.pipeline.add_many(&[&src, &decoder, &sink]) - .map_err(|e| Error::Internal(format!("Failed to add elements: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to add elements: {}", e)))?; gst::Element::link_many(&[&src, &decoder, &sink]) - .map_err(|e| Error::Internal(format!("Failed to link elements: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to link elements: {}", e)))?; - self.pipeline.set_state(gst::State::Playing) - .map_err(|e| Error::Internal(format!("Failed to start pipeline: {}", e)))?; - - let bus = self.pipeline.bus().unwrap(); - - for msg in bus.iter_timed(gst::ClockTime::NONE) { - use gst::MessageView; - - match msg.view() { - MessageView::Error(err) => { - error!("Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - return Err(Error::Internal(format!("Pipeline error: {}", err.error()))); - } - MessageView::Eos(_) => break, - _ => (), - } - } - - self.pipeline.set_state(gst::State::Null) - .map_err(|e| Error::Internal(format!("Failed to stop pipeline: {}", e)))?; + self.setup_pipeline()?; Ok(()) } @@ -78,16 +84,16 @@ impl MediaProcessor { let src = gst::ElementFactory::make("filesrc") .property("location", input_path.to_str().unwrap()) .build() - .map_err(|e| Error::Internal(format!("Failed to create source element: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to create source element: {}", e)))?; let decodebin = gst::ElementFactory::make("decodebin").build() - .map_err(|e| Error::Internal(format!("Failed to create decodebin: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to create decodebin: {}", e)))?; self.pipeline.add_many(&[&src, &decodebin]) - .map_err(|e| Error::Internal(format!("Failed to add elements: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to add elements: {}", e)))?; gst::Element::link_many(&[&src, &decodebin]) - .map_err(|e| Error::Internal(format!("Failed to link elements: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to link elements: {}", e)))?; let mut metadata = MediaMetadata::default(); @@ -119,83 +125,7 @@ impl MediaProcessor { } }); - self.pipeline.set_state(gst::State::Playing) - .map_err(|e| Error::Internal(format!("Failed to start pipeline: {}", e)))?; - - let bus = self.pipeline.bus().unwrap(); - - for msg in bus.iter_timed(gst::ClockTime::NONE) { - use gst::MessageView; - - match msg.view() { - MessageView::Error(err) => { - error!("Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - return Err(Error::Internal(format!("Pipeline error: {}", err.error()))); - } - MessageView::Eos(_) => break, - _ => (), - } - } - - self.pipeline.set_state(gst::State::Null) - .map_err(|e| Error::Internal(format!("Failed to stop pipeline: {}", e)))?; - + self.setup_pipeline()?; Ok(metadata) } } - -#[derive(Debug, Default)] -pub struct MediaMetadata { - pub width: Option, - pub height: Option, - pub framerate: Option, - pub channels: Option, - pub sample_rate: Option, -} - -#[cfg(test)] -mod tests { - use super::*; - use std::path::PathBuf; - use rstest::*; - - #[fixture] - fn media_processor() -> MediaProcessor { - MediaProcessor::new().unwrap() - } - - #[fixture] - fn test_video_path() -> PathBuf { - PathBuf::from("test_data/test.mp4") - } - - #[rstest] - #[tokio::test] - async fn test_transcode(media_processor: MediaProcessor, test_video_path: PathBuf) { - let output_path = PathBuf::from("test_data/output.mp4"); - - let result = media_processor.transcode( - test_video_path, - output_path.clone(), - "h264", - ).await; - - assert!(result.is_ok()); - assert!(output_path.exists()); - std::fs::remove_file(output_path).unwrap(); - } - - #[rstest] - #[tokio::test] - async fn test_extract_metadata(media_processor: MediaProcessor, test_video_path: PathBuf) { - let metadata = media_processor.extract_metadata(test_video_path).await.unwrap(); - - assert!(metadata.width.is_some()); - assert!(metadata.height.is_some()); - assert!(metadata.framerate.is_some()); - } -} diff --git a/gb-media/src/webrtc.rs b/gb-media/src/webrtc.rs index 8c46cb3..828ed45 100644 --- a/gb-media/src/webrtc.rs +++ b/gb-media/src/webrtc.rs @@ -2,7 +2,7 @@ use async_trait::async_trait; use gb_core::{ models::*, traits::*, - Result, Error, + Result, Error, Connection, }; use uuid::Uuid; use webrtc::{ @@ -12,8 +12,12 @@ use webrtc::{ peer_connection::peer_connection_state::RTCPeerConnectionState, peer_connection::RTCPeerConnection, track::track_remote::TrackRemote, + rtp::rtp_receiver::RTCRtpReceiver, + rtp::rtp_transceiver::RTCRtpTransceiver, }; use tracing::{instrument, error}; +use std::sync::Arc; +use chrono::Utc; pub struct WebRTCService { config: RTCConfiguration, @@ -38,17 +42,35 @@ impl WebRTCService { let peer_connection = api.new_peer_connection(self.config.clone()) .await - .map_err(|e| Error::WebRTC(format!("Failed to create peer connection: {}", e)))?; + .map_err(|e| Error::internal(format!("Failed to create peer connection: {}", e)))?; Ok(peer_connection) } + + async fn handle_track(&self, track: Arc, receiver: Arc, transceiver: Arc) { + tracing::info!( + "Received track: {} {}", + track.kind(), + track.id() + ); + } + + async fn create_connection(&self) -> Result { + Ok(Connection { + id: Uuid::new_v4(), + connected_at: Utc::now(), + ice_servers: self.config.ice_servers.clone(), + metadata: serde_json::Value::Object(serde_json::Map::new()), + room_id: Uuid::new_v4(), + user_id: Uuid::new_v4(), + }) + } } #[async_trait] impl RoomService for WebRTCService { #[instrument(skip(self))] async fn create_room(&self, config: RoomConfig) -> Result { - // Create room implementation todo!() } @@ -56,14 +78,13 @@ impl RoomService for WebRTCService { async fn join_room(&self, room_id: Uuid, user_id: Uuid) -> Result { let peer_connection = self.create_peer_connection().await?; - // Setup connection handlers peer_connection .on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { Box::pin(async move { match s { RTCPeerConnectionState::Connected => { tracing::info!("Peer connection connected"); - } + } RTCPeerConnectionState::Disconnected | RTCPeerConnectionState::Failed | RTCPeerConnectionState::Closed => { @@ -74,58 +95,35 @@ impl RoomService for WebRTCService { }) })); - peer_connection - .on_track(Box::new(move |track: Option>, _receiver| { - Box::pin(async move { - if let Some(track) = track { - tracing::info!( - "Received track: {} {}", - track.kind(), - track.id() - ); - } - }) - })); - - // Create connection object - let connection = Connection { - id: Uuid::new_v4(), - room_id, - user_id, - ice_servers: self.config.ice_servers.clone(), - metadata: serde_json::Value::Object(serde_json::Map::new()), - }; + let mut connection = self.create_connection().await?; + connection.room_id = room_id; + connection.user_id = user_id; Ok(connection) } #[instrument(skip(self))] async fn leave_room(&self, room_id: Uuid, user_id: Uuid) -> Result<()> { - // Leave room implementation todo!() } #[instrument(skip(self))] async fn publish_track(&self, track: TrackInfo) -> Result { - // Publish track implementation todo!() } #[instrument(skip(self))] async fn subscribe_track(&self, track_id: Uuid) -> Result { - // Subscribe to track implementation todo!() - } +} #[instrument(skip(self))] async fn get_participants(&self, room_id: Uuid) -> Result> { - // Get participants implementation todo!() } #[instrument(skip(self))] async fn get_room_stats(&self, room_id: Uuid) -> Result { - // Get room stats implementation todo!() } } @@ -142,7 +140,6 @@ mod tests { #[rstest] #[tokio::test] - async fn test_create_peer_connection(webrtc_service: WebRTCService) { let peer_connection = webrtc_service.create_peer_connection().await.unwrap(); assert_eq!( diff --git a/gb-monitoring/Cargo.toml b/gb-monitoring/Cargo.toml index c395321..ae9fcf4 100644 --- a/gb-monitoring/Cargo.toml +++ b/gb-monitoring/Cargo.toml @@ -1,15 +1,15 @@ [package] name = "gb-monitoring" -version = "0.1.0" -edition = "2021" +version.workspace = true +edition.workspace = true [dependencies] opentelemetry = { version = "0.19", features = ["rt-tokio"] } opentelemetry-otlp = { version = "0.12", features = ["tonic"] } -tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter", "time"] } -thiserror = "1.0" -prometheus = "0.13" +tracing.workspace = true +tracing-subscriber.workspace = true +thiserror.workspace = true +prometheus.workspace = true gb-core = { path = "../gb-core" } lazy_static = "1.4" tokio.workspace = true @@ -18,4 +18,4 @@ serde_json.workspace = true [dev-dependencies] rstest.workspace = true -tokio-test = "0.4" +tokio-test = "0.4" \ No newline at end of file diff --git a/gb-storage/src/redis.rs b/gb-storage/src/redis.rs index e1a74b8..5233888 100644 --- a/gb-storage/src/redis.rs +++ b/gb-storage/src/redis.rs @@ -26,7 +26,7 @@ impl RedisCache { .map_err(Error::Redis)?; let serialized = serde_json::to_string(value) - .map_err(|e| Error::Internal(format!("Serialization error: {}", e)))?; + .map_err(|e| Error::internal(format!("Serialization error: {}", e)))?; conn.set_ex(key, serialized, self.default_ttl.as_secs() as usize) .await @@ -51,7 +51,7 @@ impl RedisCache { match value { Some(v) => { let deserialized = serde_json::from_str(&v) - .map_err(|e| Error::Internal(format!("Deserialization error: {}", e)))?; + .map_err(|e| Error::internal(format!("Deserialization error: {}", e)))?; Ok(Some(deserialized)) } None => Ok(None), @@ -95,7 +95,7 @@ impl RedisCache { .map_err(Error::Redis)?; let serialized = serde_json::to_string(value) - .map_err(|e| Error::Internal(format!("Serialization error: {}", e)))?; + .map_err(|e| Error::internal(format!("Serialization error: {}", e)))?; conn.set_ex(key, serialized, ttl.as_secs() as usize) .await diff --git a/gb-storage/src/tikv.rs b/gb-storage/src/tikv.rs index feea994..fa39527 100644 --- a/gb-storage/src/tikv.rs +++ b/gb-storage/src/tikv.rs @@ -12,7 +12,7 @@ impl TiKVStorage { let config = Config::default(); let client = RawClient::new(pd_endpoints, config) .await - .map_err(|e| Error::Internal(format!("TiKV client error: {}", e)))?; + .map_err(|e| Error::internal(format!("TiKV client error: {}", e)))?; Ok(Self { client }) } @@ -24,7 +24,7 @@ impl TiKVStorage { .await .map_err(|e| { error!("TiKV put error: {}", e); - Error::Internal(format!("TiKV error: {}", e)) + Error::internal(format!("TiKV error: {}", e)) }) } @@ -35,7 +35,7 @@ impl TiKVStorage { .await .map_err(|e| { error!("TiKV get error: {}", e); - Error::Internal(format!("TiKV error: {}", e)) + Error::internal(format!("TiKV error: {}", e)) }) } @@ -46,7 +46,7 @@ impl TiKVStorage { .await .map_err(|e| { error!("TiKV delete error: {}", e); - Error::Internal(format!("TiKV error: {}", e)) + Error::internal(format!("TiKV error: {}", e)) }) } @@ -57,7 +57,7 @@ impl TiKVStorage { .await .map_err(|e| { error!("TiKV batch get error: {}", e); - Error::Internal(format!("TiKV error: {}", e)) + Error::internal(format!("TiKV error: {}", e)) }) } @@ -68,7 +68,7 @@ impl TiKVStorage { .await .map_err(|e| { error!("TiKV scan error: {}", e); - Error::Internal(format!("TiKV error: {}", e)) + Error::internal(format!("TiKV error: {}", e)) }) } } diff --git a/gb-utils/src/detector.rs b/gb-utils/src/detector.rs index 38a6274..d49b9ab 100644 --- a/gb-utils/src/detector.rs +++ b/gb-utils/src/detector.rs @@ -32,7 +32,7 @@ impl FileTypeDetector { } else if data.starts_with(b"RIFF") && data[8..12] == *b"WEBP" { Ok(FileType::WebP) } else { - Err(Error::Internal("Unknown file type".to_string())) + Err(Error::internal("Unknown file type".to_string())) } } diff --git a/gb-utils/src/processor.rs b/gb-utils/src/processor.rs index 394527c..f58690a 100644 --- a/gb-utils/src/processor.rs +++ b/gb-utils/src/processor.rs @@ -103,7 +103,7 @@ cat >> gb-utils/src/processor.rs << 'EOL' let headers: Vec<&str> = data[0].iter().map(|s| s.as_str()).collect(); ExcelProcessor::create_excel(&headers, &data[1..]) } - _ => Err(Error::Internal(format!( + _ => Err(Error::internal(format!( "Unsupported conversion: {:?} to {:?}", file.content_type(), target_type