Implement email, meeting, proxy, and webmail services with LXC containers

- Added email service setup script to configure Stalwart Mail in a container.
- Created meeting service script to install and configure LiveKit with TURN server.
- Developed proxy service script to set up Caddy as a reverse proxy.
- Implemented webmail service script to deploy Roundcube with PHP support.
- Established systemd service files for each service to manage their lifecycle (a hedged unit sketch follows this list).
- Configured persistent storage for logs, data, and configuration for all services.
- Added integration tests for email listing and file upload functionalities.
- Updated prompt guidelines for consistent directory structure and user management.
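
As a hedged illustration of those service files (the unit name, user, and paths are assumptions extrapolated from the /opt/gbo layout used elsewhere in this commit, not taken verbatim from the scripts), a systemd unit for the proxy service might look like:

[Unit]
Description=General Bots proxy (Caddy reverse proxy)
After=network.target

[Service]
User=gbuser
ExecStart=/opt/gbo/bin/caddy run --config /opt/gbo/conf/Caddyfile
Restart=on-failure

[Install]
WantedBy=multi-user.target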
Author: Rodrigo Rodriguez (Pragmatismo), 2025-06-19 23:16:57 -03:00
Parent: 6d771bf264 · Commit: c75095505b
169 changed files with 798 additions and 16352 deletions

@@ -1,30 +0,0 @@
# The run config is used for both run mode and debug mode
[[configs]]
# the name of this task
name = "task"
# the type of the debugger. If not set, it can't be debugged but can still be run
# type = "lldb"
# the program to run, e.g. "${workspace}\\target\\debug\\check.exe"
program = ""
# the program arguments, e.g. args = ["arg1", "arg2"], optional
# args = []
# current working directory, optional
# cwd = "${workspace}"
# environment variables, optional
# [configs.env]
# VAR1 = "VAL1"
# VAR2 = "VAL2"
# task to run before the run/debug session is started, optional
# [configs.prelaunch]
# program = "cargo"
# args = [
# "build",
# ]
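
For reference, a filled-in variant of this run config; the binary name and prelaunch task are assumptions for a typical cargo workspace, not part of the original file:

[[configs]]
name = "gbserver"
type = "lldb"
program = "${workspace}/target/debug/gbserver"
cwd = "${workspace}"

[configs.prelaunch]
program = "cargo"
args = ["build"]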

Cargo.lock (generated): 7860 lines changed; file diff suppressed because it is too large.

@@ -1,143 +1,22 @@
[workspace]
resolver = "2"
members = [
"gb-core",
"gb-server",
"gb-media",
"gb-messaging",
"gb-storage",
"gb-monitoring",
"gb-auth",
"gb-testing",
"gb-migrations",
"gb-cloud",
"gb-vm",
"gb-automation",
"gb-image",
"gb-utils",
"gb-document",
"gb-file",
"gb-llm",
"gb-calendar", "gb-infra",
]
[workspace.package]
[package]
name = "gbserver"
version = "0.1.0"
edition = "2021"
authors = ["General Bots Maintainers"]
authors = ["Your Name <your.email@example.com>"]
description = "File server with MinIO integration"
license = "MIT"
repository = ""
[workspace.dependencies]
# Core async runtime and utilities
tokio = { version = "1.34", features = ["full"] }
tokio-tungstenite = { version = "0.24.0", features = ["native-tls"] }
tungstenite = "0.20"
tokio-test = "0.4"
tokio-stream = "0.1.17"
async-trait = "0.1"
futures = "0.3"
futures-util = "0.3" # Add futures-util here
parking_lot = "0.12"
bytes = "1.0"
[dependencies]
actix-web = "4"
actix-multipart = "0.6"
tokio = { version = "1", features = ["full"] }
log = "0.4"
env_logger = "0.10"
ctrlc = "3.2"
# Web framework and servers
axum = { version = "0.7.9", features = ["ws", "multipart"] }
tower = "0.4"
tower-http = { version = "0.5", features = ["cors", "trace", "fs"] }
hyper = { version = "1.1", features = ["full"] }
hyper-util = { version = "0.1" }
tonic = { version = "0.10", features = ["tls", "transport"] }
actix-multipart = "0.4"
# Database and storage
sqlx = { version = "0.7", features = ["runtime-tokio-native-tls", "postgres", "mysql", "sqlite", "uuid", "time", "json"] }
redis = { version = "0.24", features = ["tokio-comp", "connection-manager"] }
tikv-client = "0.3"
sea-orm = { version = "0.12", features = ["sqlx-postgres", "runtime-tokio-native-tls", "macros"] }
# Message queues
rdkafka = { version = "0.36", features = ["cmake-build", "ssl"] }
lapin = "2.3"
# Drive, Serialization and data formats
minio = { git = "https://github.com/minio/minio-rs", branch = "master" }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["fmt"] }
dotenv = "0.15"
tempfile = "3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
protobuf = "3.3"
prost = "0.12"
csv = "1.3"
# WebRTC and media processing
webrtc = "0.9"
gstreamer = "0.21"
opus = "0.3"
image = "0.24"
# Authentication and security
zitadel = {version = "5.5.1", features = ["api-common", "api-auth-v1", "zitadel-auth-v1", "credentials", "interceptors"]}
jsonwebtoken = "9.2"
argon2 = "0.5"
ring = "0.17"
reqwest = { version = "0.11", features = ["json", "stream", "blocking"] }
# Cloud services
aws-sdk-core = "1.1"
azure_core = "0.15"
azure_identity = "0.15"
google-cloud-storage = "0.16"
# Monitoring and metrics
prometheus = "0.13.0"
opentelemetry = { version = "0.20", features = ["rt-tokio"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
# Testing
criterion = "0.5"
mockall = "0.12"
fake = { version = "2.9", features = ["derive"] }
rstest = "0.18"
# Utilities
actix-web = "4.0.1"
uuid = { version = "1.6", features = ["serde", "v4"] }
chrono = { version = "0.4", features = ["serde"] }
thiserror = "1.0"
anyhow = "1.0"
regex = "1.10"
url = "2.5"
rand = "0.8"
base64 = "0.21"
semver = "1.0"
walkdir = "2.4"
tempfile = "3.9"
dotenv = "0.15"
lettre = "0.10"
sanitize-filename = "0.3"
# Web assembly
wasm-bindgen = "0.2"
js-sys = "0.3"
web-sys = { version = "0.3", features = ["WebSocket", "WebRtcPeerConnection"] }
# Natural language processing
rust-bert = "0.21"
tokenizers = "0.15"
whatlang = "0.16"
# PDF and document processing
pdf = "0.8"
docx = "1.1"
zip = "0.6"
tar = "0.4"
flate2 = "1.0"
[workspace.metadata]
msrv = "1.70.0"
minio = { git = "https://github.com/minio/minio-rs", branch = "master" }
tokio-stream = "0.1.17"

@@ -1,12 +0,0 @@
#!/bin/bash
set -e
echo "Deploying General Bots platform..."
# Create DB.
cargo run -p gb-migrations --bin migrations
echo "Deployment completed successfully!"
echo "Please wait for all pods to be ready..."

@@ -1,58 +0,0 @@
[package]
name = "gb-auth"
version = { workspace = true }
edition = { workspace = true }
authors = { workspace = true }
license = { workspace = true }
[dependencies]
gb-core = { path = "../gb-core" }
# Authentication & Security
jsonwebtoken = { workspace = true }
argon2 = "0.5"
rand = { version = "0.8", features = ["std"] }
oauth2 = "4.4"
openid = "0.12"
tokio-openssl = "0.6"
ring = "0.17"
# Async Runtime
tokio= { workspace = true }
async-trait= { workspace = true }
# Database
sqlx = { version = "0.7", features = ["runtime-tokio-native-tls", "postgres", "uuid", "chrono", "json"] }
redis = { version = "0.24", features = ["tokio-comp", "json"] }
# Serialization
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# Error Handling
thiserror = "1.0"
# Logging & Metrics
tracing= { workspace = true }
# Utils
chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1.6", features = ["serde", "v4"] }
validator = { version = "0.16", features = ["derive"] }
# Web Framework
axum = { version = "0.7.9" }
axum-extra = { version = "0.7" } # Add headers feature
tower = "0.4"
tower-http = { version = "0.5", features = ["auth", "cors", "trace"] }
headers = "0.3"
tokio-stream = { workspace = true }
[dev-dependencies]
rstest = "0.18"
tokio-test = "0.4"
mockall = "0.12"
axum-extra = { version = "0.7" }
sqlx = { version = "0.7", features = ["runtime-tokio-native-tls", "postgres", "uuid", "chrono", "json"] }

@@ -1,23 +0,0 @@
jwt:
secret: your_jwt_secret_key_here
expiration: 3600 # 1 hour in seconds
password:
min_length: 8
require_uppercase: true
require_lowercase: true
require_numbers: true
require_special: true
oauth:
providers:
google:
client_id: your_google_client_id
client_secret: your_google_client_secret
github:
client_id: your_github_client_id
client_secret: your_github_client_secret
redis:
url: redis://localhost:6379
session_ttl: 86400 # 24 hours in seconds
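
A minimal sketch of loading this file into typed settings; only the jwt and password sections are modeled here, and serde_yaml is an assumed dependency:

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct AuthSettings {
    jwt: JwtSettings,
    password: PasswordPolicy,
}

#[derive(Debug, Deserialize)]
struct JwtSettings {
    secret: String,
    expiration: u64,
}

#[derive(Debug, Deserialize)]
struct PasswordPolicy {
    min_length: u8,
    require_uppercase: bool,
    require_lowercase: bool,
    require_numbers: bool,
    require_special: bool,
}

fn load(yaml: &str) -> Result<AuthSettings, serde_yaml::Error> {
    serde_yaml::from_str(yaml)
}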

@@ -1,26 +0,0 @@
use gb_core::Error as CoreError;
use redis::RedisError;
use sqlx::Error as SqlxError;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum AuthError {
#[error("Invalid token")]
InvalidToken,
#[error("Invalid credentials")]
InvalidCredentials,
#[error("Database error: {0}")]
Database(#[from] SqlxError),
#[error("Redis error: {0}")]
Redis(#[from] RedisError),
#[error("Internal error: {0}")]
Internal(String),
}
impl From<CoreError> for AuthError {
fn from(err: CoreError) -> Self {
match err {
CoreError { .. } => AuthError::Internal(err.to_string()),
}
}
}

@@ -1,24 +0,0 @@
use gb_core::Error as CoreError;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum AuthError {
#[error("Invalid token")]
InvalidToken,
#[error("Invalid credentials")]
InvalidCredentials,
#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
#[error("Redis error: {0}")]
Redis(#[from] redis::RedisError),
#[error("Internal error: {0}")]
Internal(String),
}
impl From<CoreError> for AuthError {
fn from(err: CoreError) -> Self {
match err {
CoreError { .. } => AuthError::Internal(err.to_string()),
}
}
}

@@ -1,2 +0,0 @@
mod auth_handler;

@@ -1,7 +0,0 @@
mod error;
pub mod handlers;
pub mod models;
pub mod services; // Make services module public
pub mod middleware;
pub use error::AuthError;

@@ -1,33 +0,0 @@
use axum::{
http::{Request, Response},
middleware::Next,
body::Body,
};
use headers::{Authorization, authorization::Bearer};
use jsonwebtoken::{decode, DecodingKey, Validation};
use serde::{Serialize, Deserialize};
use crate::AuthError;
#[derive(Debug, Serialize, Deserialize)]
struct Claims {
sub: String,
exp: i64,
}
pub async fn auth_middleware(
auth: Authorization<Bearer>,
request: Request<Body>,
next: Next,
) -> Result<Response<Body>, AuthError> {
let token = auth.token();
let key = DecodingKey::from_secret(b"secret");
let validation = Validation::default();
match decode::<Claims>(token, &key, &validation) {
Ok(_claims) => {
let response = next.run(request).await;
Ok(response)
}
Err(_) => Err(AuthError::InvalidToken),
}
}
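
A hedged wiring sketch for axum 0.7 using middleware::from_fn; header extraction is done manually here instead of the typed-header extractor above, and the protected handler is illustrative:

use axum::{
    body::Body,
    http::{Request, StatusCode},
    middleware::{self, Next},
    response::Response,
    routing::get,
    Router,
};

async fn require_bearer(request: Request<Body>, next: Next) -> Result<Response, StatusCode> {
    // Reject requests without a Bearer token; real validation would call
    // decode::<Claims>() as in auth_middleware above.
    let has_bearer = request
        .headers()
        .get("Authorization")
        .and_then(|v| v.to_str().ok())
        .map(|v| v.starts_with("Bearer "))
        .unwrap_or(false);
    if !has_bearer {
        return Err(StatusCode::UNAUTHORIZED);
    }
    Ok(next.run(request).await)
}

async fn protected() -> &'static str {
    "ok"
}

fn app() -> Router {
    Router::new()
        .route("/protected", get(protected))
        .layer(middleware::from_fn(require_bearer))
}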

@@ -1,3 +0,0 @@
mod auth_middleware;
pub use auth_middleware::*;

@@ -1,15 +0,0 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize)]
pub struct LoginRequest {
pub email: String,
pub password: String,
}
#[derive(Debug, Serialize)]
pub struct LoginResponse {
pub access_token: String,
pub refresh_token: String,
pub token_type: String,
pub expires_in: i64,
}

@@ -1,4 +0,0 @@
mod auth;
pub mod user;
pub use auth::{LoginRequest, LoginResponse};

@@ -1,48 +0,0 @@
use serde::{Serialize, Deserialize};
use uuid::Uuid;
use chrono::{DateTime, Utc};
#[derive(Debug, Serialize, Deserialize)]
pub enum UserRole {
Admin,
User,
Guest,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum UserStatus {
Active,
Inactive,
Suspended,
}
impl From<String> for UserRole {
fn from(s: String) -> Self {
match s.to_lowercase().as_str() {
"admin" => UserRole::Admin,
"guest" => UserRole::Guest,
_ => UserRole::User,
}
}
}
impl From<String> for UserStatus {
fn from(s: String) -> Self {
match s.to_lowercase().as_str() {
"inactive" => UserStatus::Inactive,
"suspended" => UserStatus::Suspended,
_ => UserStatus::Active,
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct DbUser {
pub id: Uuid,
pub email: String,
pub password_hash: String,
pub role: UserRole,
pub status: UserStatus,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}

@@ -1 +0,0 @@
// Auth utilities module

@@ -1,28 +0,0 @@
[package]
name = "gb-automation"
version = { workspace = true }
edition = { workspace = true }
authors = { workspace = true }
license = { workspace = true }
[dependencies]
gb-core = { path = "../gb-core" }
image = { version = "0.24", features = ["webp", "jpeg", "png", "gif"] }
chromiumoxide = { version = "0.5", features = ["tokio-runtime"] }
futures-util = "0.3"
async-trait= { workspace = true }
tokio= { workspace = true }
serde= { workspace = true }
serde_json= { workspace = true }
thiserror= { workspace = true }
tracing= { workspace = true }
uuid= { workspace = true }
regex = "1.10"
fantoccini = "0.19"
headless_chrome = "1.0"
async-recursion = "1.0"
[dev-dependencies]
rstest= { workspace = true }
tokio-test = "0.4"
mock_instant = "0.2"

@@ -1,4 +0,0 @@
mod web;
pub use chromiumoxide::element::Element;
pub use web::WebAutomation;

@@ -1,127 +0,0 @@
use std::{
path::{Path, PathBuf},
process::{Child, Command, Stdio},
};
use tokio::sync::Mutex;
use tracing::{error, instrument};
use uuid::Uuid;
use gb_core::{Error, Result};
#[derive(Debug)]
struct Process {
id: Uuid,
handle: Child,
}
pub struct ProcessAutomation {
working_dir: PathBuf,
processes: Mutex<Vec<Process>>,
}
impl ProcessAutomation {
pub fn new(working_dir: impl AsRef<Path>) -> Self {
Self {
working_dir: working_dir.as_ref().to_path_buf(),
processes: Mutex::new(Vec::new()),
}
}
#[instrument(skip(self, command))]
pub async fn execute(&self, command: &str, args: &[&str]) -> Result<String> {
let output = Command::new(command)
.args(args)
.current_dir(&self.working_dir)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.output()
.map_err(|e| Error::internal(format!("Failed to execute command: {}", e)))?;
if !output.status.success() {
let error = String::from_utf8_lossy(&output.stderr);
return Err(Error::internal(format!("Command failed: {}", error)));
}
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
Ok(stdout)
}
pub async fn spawn(&self, command: &str, args: &[&str]) -> Result<Uuid> {
let child = Command::new(command)
.args(args)
.current_dir(&self.working_dir)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(|e| Error::internal(format!("Failed to spawn process: {}", e)))?;
let id = Uuid::new_v4();
let mut processes = self.processes.lock().await;
processes.push(Process { id, handle: child });
Ok(id)
}
pub async fn kill(&self, id: Uuid) -> Result<()> {
let mut processes = self.processes.lock().await;
if let Some(index) = processes.iter().position(|p| p.id == id) {
let mut process = processes.remove(index);
process.handle.kill()
.map_err(|e| Error::internal(format!("Failed to kill process: {}", e)))?;
}
Ok(())
}
pub async fn cleanup(&self) -> Result<()> {
let mut processes = self.processes.lock().await;
for process in processes.iter_mut() {
if let Err(e) = process.handle.kill() {
error!("Failed to kill process {}: {}", process.id, e);
}
}
processes.clear();
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
fn automation() -> ProcessAutomation {
let dir = tempdir().unwrap();
ProcessAutomation::new(dir.path())
}
#[tokio::test]
async fn test_execute() -> Result<()> {
let automation = automation();
let output = automation.execute("echo", &["Hello, World!"]).await?;
assert!(output.contains("Hello, World!"));
Ok(())
}
#[tokio::test]
async fn test_spawn_and_kill() -> Result<()> {
let automation = automation();
let id = automation.spawn("sleep", &["1"]).await?;
automation.kill(id).await?;
Ok(())
}
#[tokio::test]
async fn test_cleanup() -> Result<()> {
let automation = automation();
automation.spawn("sleep", &["1"]).await?;
automation.spawn("sleep", &["2"]).await?;
automation.cleanup().await?;
let processes = automation.processes.lock().await;
assert!(processes.is_empty());
Ok(())
}
}

@@ -1,61 +0,0 @@
use chromiumoxide::{Browser, Element};
use chromiumoxide::page::Page;
use chromiumoxide::browser::BrowserConfig;
use futures_util::StreamExt;
use gb_core::{Error, Result};
use std::time::Duration;
pub struct WebAutomation {
browser: Browser,
}
impl WebAutomation {
pub async fn new() -> Result<Self> {
let config = BrowserConfig::builder()
.build()
.map_err(|e| Error::internal(e.to_string()))?;
let (browser, handler) = Browser::launch(config)
.await
.map_err(|e| Error::internal(e.to_string()))?;
// Spawn the handler in the background
tokio::spawn(async move {
handler.for_each(|_| async {}).await;
});
Ok(Self { browser })
}
pub async fn new_page(&self) -> Result<Page> {
self.browser
.new_page("about:blank")
.await
.map_err(|e| Error::internal(e.to_string()))
}
pub async fn navigate(&self, page: &Page, url: &str) -> Result<()> {
page.goto(url)
.await
.map_err(|e| Error::internal(e.to_string()))?;
Ok(())
}
pub async fn take_screenshot(&self, page: &Page) -> Result<Vec<u8>> {
let params = chromiumoxide::page::ScreenshotParams::builder().build();
page.screenshot(params)
.await
.map_err(|e| Error::internal(e.to_string()))
}
pub async fn find_element(&self, page: &Page, selector: &str, timeout: Duration) -> Result<Element> {
tokio::time::timeout(
timeout,
page.find_element(selector)
)
.await
.map_err(|_| Error::internal("Timeout waiting for element"))?
.map_err(|e| Error::internal(e.to_string()))
}
}
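
A hedged usage sketch (imports as at the top of this file); it assumes a Tokio runtime and a local Chromium binary that chromiumoxide can launch, and the URL is a placeholder:

async fn capture(url: &str) -> gb_core::Result<Vec<u8>> {
    let automation = WebAutomation::new().await?;
    let page = automation.new_page().await?;
    automation.navigate(&page, url).await?;
    // Wait up to five seconds for the document body before capturing.
    automation.find_element(&page, "body", Duration::from_secs(5)).await?;
    automation.take_screenshot(&page).await
}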

@@ -1,9 +0,0 @@
[package]
name = "gb-calendar"
version = { workspace = true }
edition = { workspace = true }
authors = { workspace = true }
license = { workspace = true }
[dependencies]
gb-core = { path = "../gb-core" }

@@ -1,19 +0,0 @@
[package]
name = "gb-cloud"
version = { workspace = true }
edition = { workspace = true }
authors = { workspace = true }
license = { workspace = true }
[dependencies]
gb-core = { path = "../gb-core" }
async-trait= { workspace = true }
tokio= { workspace = true }
serde = { version = "1.0", features = ["derive"] }
thiserror= { workspace = true }
tracing= { workspace = true }
[dev-dependencies]
rstest= { workspace = true }
tokio-test = "0.4"
tempfile = "3.8"

@@ -1,33 +0,0 @@
[package]
name = "gb-core"
version = { workspace = true }
edition = { workspace = true }
authors = { workspace = true }
license = { workspace = true }
[dependencies]
tokio-tungstenite = { workspace = true }
async-trait= { workspace = true }
serde= { workspace = true }
uuid= { workspace = true }
tokio= { workspace = true }
thiserror= { workspace = true }
chrono= { workspace = true }
tracing= { workspace = true }
axum = { version = "0.7", features = ["json"] }
serde_json = "1.0"
sqlx = { workspace = true }
redis = { workspace = true }
minio = { workspace = true }
zitadel = { workspace = true }
rdkafka = { workspace = true }
tonic = { workspace = true }
actix-web ={ workspace = true }
anyhow = { workspace = true }
jsonwebtoken = { workspace = true }
lettre= { workspace = true }
[dev-dependencies]
mockall= { workspace = true }
rstest= { workspace = true }
tokio-test = { workspace = true }

@@ -1,114 +0,0 @@
use serde::Deserialize;
use std::env;
#[derive(Clone, Debug, Deserialize)]
pub struct AppConfig {
pub server: ServerConfig,
pub database: DatabaseConfig,
pub redis: RedisConfig,
pub kafka: KafkaConfig,
// pub zitadel: ZitadelConfig,
pub minio: MinioConfig,
pub email: EmailConfig,
}
#[derive(Clone, Debug, Deserialize)]
pub struct ServerConfig {
pub host: String,
pub port: u16,
}
#[derive(Clone, Debug, Deserialize)]
pub struct DatabaseConfig {
pub url: String,
pub max_connections: u32,
}
#[derive(Clone, Debug, Deserialize)]
pub struct RedisConfig {
pub url: String,
}
#[derive(Clone, Debug, Deserialize)]
pub struct KafkaConfig {
pub brokers: String,
}
#[derive(Clone, Debug, Deserialize)]
pub struct ZitadelConfig {
pub domain: String,
pub client_id: String,
pub client_secret: String,
pub access_token: String,
}
#[derive(Clone, Debug, Deserialize)]
pub struct MinioConfig {
pub endpoint: String,
pub access_key: String,
pub secret_key: String,
pub use_ssl: bool,
pub bucket: String,
}
#[derive(Clone, Debug, Deserialize)]
pub struct EmailConfig {
pub smtp_server: String,
pub smtp_port: u16,
pub username: String,
pub password: String,
pub from_email: String,
}
impl AppConfig {
pub fn from_env() -> Self {
Self {
server: ServerConfig {
host: env::var("SERVER_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()),
port: env::var("SERVER_PORT")
.unwrap_or_else(|_| "8080".to_string())
.parse()
.expect("Invalid SERVER_PORT"),
},
database: DatabaseConfig {
url: env::var("DATABASE_URL").expect("DATABASE_URL must be set"),
max_connections: env::var("DATABASE_MAX_CONNECTIONS")
.unwrap_or_else(|_| "5".to_string())
.parse()
.expect("Invalid DATABASE_MAX_CONNECTIONS"),
},
redis: RedisConfig {
url: env::var("REDIS_URL").expect("REDIS_URL must be set"),
},
kafka: KafkaConfig {
brokers: env::var("KAFKA_BROKERS").expect("KAFKA_BROKERS must be set"),
},
// zitadel: ZitadelConfig {
// domain: env::var("ZITADEL_DOMAIN").expect("ZITADEL_DOMAIN must be set"),
// client_id: env::var("ZITADEL_CLIENT_ID").expect("ZITADEL_CLIENT_ID must be set"),
// client_secret: env::var("ZITADEL_CLIENT_SECRET")
// .expect("ZITADEL_CLIENT_SECRET must be set"),
// },
minio: MinioConfig {
endpoint: env::var("MINIO_ENDPOINT").expect("MINIO_ENDPOINT must be set"),
access_key: env::var("MINIO_ACCESS_KEY").expect("MINIO_ACCESS_KEY must be set"),
secret_key: env::var("MINIO_SECRET_KEY").expect("MINIO_SECRET_KEY must be set"),
use_ssl: env::var("MINIO_USE_SSL")
.unwrap_or_else(|_| "false".to_string())
.parse()
.expect("Invalid MINIO_USE_SSL"),
bucket: env::var("MINIO_BUCKET").expect("MINIO_BUCKET must be set"),
},
email: EmailConfig {
smtp_server: env::var("EMAIL_SMTP_SERVER").expect("EMAIL_SMTP_SERVER must be set"),
smtp_port: env::var("EMAIL_SMTP_PORT")
.unwrap_or_else(|_| "587".to_string())
.parse()
.expect("Invalid EMAIL_SMTP_PORT"),
username: env::var("EMAIL_USERNAME").expect("EMAIL_USERNAME must be set"),
password: env::var("EMAIL_PASSWORD").expect("EMAIL_PASSWORD must be set"),
from_email: env::var("EMAIL_FROM").expect("EMAIL_FROM must be set"),
},
}
}
}
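
A hedged bootstrap sketch for local experimentation; every value below is a placeholder, and in deployment these would come from the environment or an .env file:

fn demo_config() -> AppConfig {
    std::env::set_var("DATABASE_URL", "postgres://gb:gb@localhost/gbserver");
    std::env::set_var("REDIS_URL", "redis://localhost:6379");
    std::env::set_var("KAFKA_BROKERS", "localhost:9092");
    std::env::set_var("MINIO_ENDPOINT", "localhost:9000");
    std::env::set_var("MINIO_ACCESS_KEY", "minioadmin");
    std::env::set_var("MINIO_SECRET_KEY", "minioadmin");
    std::env::set_var("MINIO_BUCKET", "gb");
    std::env::set_var("EMAIL_SMTP_SERVER", "smtp.example.com");
    std::env::set_var("EMAIL_USERNAME", "bot@example.com");
    std::env::set_var("EMAIL_PASSWORD", "secret");
    std::env::set_var("EMAIL_FROM", "bot@example.com");
    AppConfig::from_env()
}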

@@ -1,69 +0,0 @@
use crate::config::AppConfig;
use anyhow::Result;
use minio::s3::client::{Client as MinioClient, ClientBuilder as MinioClientBuilder};
use minio::s3::creds::StaticProvider;
use minio::s3::http::BaseUrl;
use rdkafka::producer::FutureProducer;
use rdkafka::ClientConfig;
use redis::aio::ConnectionManager as RedisConnectionManager;
use sqlx::postgres::{PgPool, PgPoolOptions};
use std::str::FromStr;
// use zitadel::api::clients::ClientBuilder;
// use zitadel::api::interceptors::AccessTokenInterceptor;
// use zitadel::api::zitadel::auth::v1::auth_service_client::AuthServiceClient;
pub async fn init_postgres(config: &AppConfig) -> Result<PgPool> {
let pool = PgPoolOptions::new()
.max_connections(config.database.max_connections)
.connect(&config.database.url)
.await
.map_err(|e| anyhow::anyhow!(e))?;
Ok(pool)
}
pub async fn init_redis(config: &AppConfig) -> Result<RedisConnectionManager> {
let client = redis::Client::open(config.redis.url.as_str())?;
let connection_manager = RedisConnectionManager::new(client).await?;
Ok(connection_manager)
}
pub async fn init_kafka(config: &AppConfig) -> Result<FutureProducer> {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", &config.kafka.brokers)
.set("message.timeout.ms", "5000")
.create()?;
Ok(producer)
}
pub async fn init_zitadel(
_config: &AppConfig,
) -> Result<
(),
Box<dyn std::error::Error>>
{
// TODO: https://github.com/smartive/zitadel-rust/blob/be389ca08c7f82d36fc1bcc36d2d9eb8666b22cd/examples/fetch_profile_with_service_account.rs#L18
Ok(())
}
pub async fn init_minio(
config: &AppConfig,
) -> Result<MinioClient, Box<dyn std::error::Error + Send + Sync>> {
// Construct the base URL
let base_url = format!("https://{}", config.minio.endpoint);
let base_url = BaseUrl::from_str(&base_url)?;
// Create credentials provider
let credentials = StaticProvider::new(&config.minio.access_key, &config.minio.secret_key, None);
// Build the MinIO client
let client = MinioClientBuilder::new(base_url.clone())
.provider(Some(Box::new(credentials)))
//.secure(config.minio.use_ssl)
.build()?;
Ok(client)
}
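
A hedged startup sketch tying the initializers together into the shared AppState from crate::models; error types are unified through anyhow for brevity:

use crate::models::AppState;

pub async fn bootstrap(config: &AppConfig) -> anyhow::Result<AppState> {
    let db_pool = init_postgres(config).await?;
    let _redis = init_redis(config).await?;
    let _kafka = init_kafka(config).await?;
    let minio_client = init_minio(config)
        .await
        .map_err(|e| anyhow::anyhow!(e))?;
    Ok(AppState {
        minio_client: Some(minio_client),
        config: Some(config.clone()),
        db_pool: Some(db_pool),
    })
}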

@@ -1,123 +0,0 @@
use thiserror::Error;
use axum::{
response::{IntoResponse, Response},
http::StatusCode,
Json,
};
use serde_json::json;
#[derive(Error, Debug)]
pub enum ErrorKind {
#[error("Database error: {0}")]
Database(String),
#[error("Configuration error: {0}")]
Configuration(String),
#[error("Redis error: {0}")]
Redis(String),
#[error("Kafka error: {0}")]
Kafka(String),
#[error("Invalid input: {0}")]
InvalidInput(String),
#[error("Not found: {0}")]
NotFound(String),
#[error("Authentication error: {0}")]
Authentication(String),
#[error("Authorization error: {0}")]
Authorization(String),
#[error("Internal error: {0}")]
Internal(String),
#[error("External service error: {0}")]
ExternalService(String),
#[error("WebSocket error: {0}")]
WebSocket(String),
#[error("Messaging error: {0}")]
Messaging(String),
#[error("API HTTP Server error: {0}")]
Server(String),
}
#[derive(Debug)]
pub struct Error {
pub kind: ErrorKind,
pub message: String,
}
impl Error {
pub fn new(kind: ErrorKind, message: impl Into<String>) -> Self {
Self {
kind,
message: message.into(),
}
}
pub fn internal<T: std::fmt::Display>(msg: T) -> Self {
Self::new(ErrorKind::Internal(msg.to_string()), msg.to_string())
}
pub fn redis<T: std::fmt::Display>(msg: T) -> Self {
Self::new(ErrorKind::Redis(msg.to_string()), msg.to_string())
}
pub fn kafka<T: std::fmt::Display>(msg: T) -> Self {
Self::new(ErrorKind::Kafka(msg.to_string()), msg.to_string())
}
pub fn database<T: std::fmt::Display>(msg: T) -> Self {
Self::new(ErrorKind::Database(msg.to_string()), msg.to_string())
}
pub fn websocket<T: std::fmt::Display>(msg: T) -> Self {
Self::new(ErrorKind::WebSocket(msg.to_string()), msg.to_string())
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}: {}", self.kind, self.message)
}
}
impl std::error::Error for Error {}
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug)]
pub enum AuthError {
InvalidToken,
MissingToken,
InvalidCredentials,
Internal(String),
}
impl IntoResponse for Error {
fn into_response(self) -> Response {
let status = match self.kind {
ErrorKind::NotFound(_) => StatusCode::NOT_FOUND,
ErrorKind::Authentication(_) => StatusCode::UNAUTHORIZED,
ErrorKind::Authorization(_) => StatusCode::FORBIDDEN,
ErrorKind::InvalidInput(_) => StatusCode::BAD_REQUEST,
_ => StatusCode::INTERNAL_SERVER_ERROR,
};
let body = Json(json!({
"error": self.message,
"kind": format!("{:?}", self.kind)
}));
(status, body).into_response()
}
}
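
A hedged handler sketch (the route is illustrative) showing how the IntoResponse impl above lets handlers return Error directly; Json is the axum type already imported in this file:

async fn get_item() -> Result<Json<&'static str>, Error> {
    Err(Error::new(ErrorKind::NotFound("item".to_string()), "item not found"))
}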

@@ -1,9 +0,0 @@
pub mod db;
pub mod errors;
pub mod models;
pub mod traits;
pub mod config;
pub mod utils;
pub use errors::{Error, ErrorKind, Result};
pub use utils::{create_response, extract_user_id};

@@ -1,396 +0,0 @@
use chrono::{DateTime, Utc};
use minio::s3::client::Client as MinioClient;
use rdkafka::producer::FutureProducer;
use redis::aio::ConnectionManager as RedisConnectionManager;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use uuid::Uuid;
//use zitadel::api::zitadel::auth::v1::auth_service_client::AuthServiceClient;
use serde_json::Value as JsonValue;
use std::str::FromStr;
use crate::config::AppConfig;
#[derive(Debug)]
pub struct CoreError(pub String);
// Add these near the top with other type definitions
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CustomerStatus {
Active,
Inactive,
Suspended,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SubscriptionTier {
Free,
Pro,
Enterprise,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Instance {
pub id: Uuid,
pub customer_id: Uuid,
pub name: String,
pub status: String,
pub shard_id: i32,
pub region: String,
pub config: JsonValue,
pub created_at: DateTime<Utc>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Room {
pub id: Uuid,
pub instance_id: Uuid,
pub name: String,
pub kind: String,
pub status: String,
pub config: JsonValue,
pub created_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
pub id: Uuid,
pub customer_id: Uuid,
pub instance_id: Uuid,
pub conversation_id: Uuid,
pub sender_id: Uuid,
pub kind: String,
pub content: String,
pub metadata: JsonValue,
pub created_at: Option<DateTime<Utc>>,
pub shard_key: Option<i32>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct MessageFilter {
pub conversation_id: Option<Uuid>,
pub sender_id: Option<Uuid>,
pub from_date: Option<DateTime<Utc>>,
pub to_date: Option<DateTime<Utc>>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct SearchQuery {
pub query: String,
pub conversation_id: Option<Uuid>,
pub from_date: Option<DateTime<Utc>>,
pub to_date: Option<DateTime<Utc>>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct FileUpload {
pub content: Vec<u8>,
pub filename: String,
pub content_type: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct FileContent {
pub content: Vec<u8>,
pub content_type: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Status {
pub code: String,
pub timestamp: DateTime<Utc>,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum UserStatus {
Active,
Inactive,
Suspended,
}
impl FromStr for UserStatus {
type Err = CoreError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"active" => Ok(UserStatus::Active),
"inactive" => Ok(UserStatus::Inactive),
"suspended" => Ok(UserStatus::Suspended),
_ => Ok(UserStatus::Inactive),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Track {
pub id: Uuid,
pub room_id: Uuid,
pub user_id: Uuid,
pub media_type: String,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct User {
pub id: Uuid,
pub customer_id: Uuid,
pub instance_id: Uuid,
pub name: String,
pub email: String,
pub password_hash: String,
pub status: UserStatus,
pub metadata: JsonValue,
pub created_at: DateTime<Utc>,
}
// Update the Customer struct to include these fields
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Customer {
pub id: Uuid,
pub name: String,
pub max_instances: u32,
pub email: String,
pub status: CustomerStatus, // Add this field
pub subscription_tier: SubscriptionTier, // Add this field
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
impl Customer {
pub fn new(
name: String,
email: String,
subscription_tier: SubscriptionTier,
max_instances: u32,
) -> Self {
Customer {
id: Uuid::new_v4(),
name,
email,
max_instances,
subscription_tier,
status: CustomerStatus::Active, // Default to Active
created_at: Utc::now(),
updated_at: Utc::now(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoomConfig {
pub instance_id: Uuid,
pub name: String,
pub max_participants: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Connection {
pub id: Uuid,
pub room_id: Uuid,
pub user_id: Uuid,
pub connected_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrackInfo {
pub room_id: Uuid,
pub user_id: Uuid,
pub media_type: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Subscription {
pub id: Uuid,
pub track_id: Uuid,
pub subscriber_id: Uuid,
pub created_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Participant {
pub user_id: Uuid,
pub room_id: Uuid,
pub joined_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoomStats {
pub participant_count: u32,
pub track_count: u32,
pub duration: i64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageId(pub Uuid);
#[derive(Debug, Serialize, Deserialize)]
pub struct FileInfo {
pub id: Uuid,
pub filename: String,
pub content_type: String,
pub size: usize,
pub url: String,
pub created_at: DateTime<Utc>,
}
// App state shared across all handlers
pub struct AppState {
pub minio_client: Option<MinioClient>,
pub config: Option<AppConfig>,
pub db_pool: Option<PgPool>,
}
// File models
#[derive(Debug, Serialize, Deserialize)]
pub struct File {
pub id: Uuid,
pub user_id: Uuid,
pub folder_id: Option<Uuid>,
pub name: String,
pub path: String,
pub mime_type: String,
pub size: i64,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Folder {
pub id: Uuid,
pub user_id: Uuid,
pub parent_id: Option<Uuid>,
pub name: String,
pub path: String,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
// Conversation models
#[derive(Debug, Serialize, Deserialize)]
pub struct Conversation {
pub id: Uuid,
pub name: String,
pub created_by: Uuid,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ConversationMember {
pub conversation_id: Uuid,
pub user_id: Uuid,
pub joined_at: DateTime<Utc>,
}
// Calendar models
#[derive(Debug, Serialize, Deserialize)]
pub struct CalendarEvent {
pub id: Uuid,
pub title: String,
pub description: Option<String>,
pub location: Option<String>,
pub start_time: DateTime<Utc>,
pub end_time: DateTime<Utc>,
pub user_id: Uuid,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
// Task models
#[derive(Debug, Serialize, Deserialize)]
pub struct Task {
pub id: Uuid,
pub title: String,
pub description: Option<String>,
pub due_date: Option<DateTime<Utc>>,
pub status: TaskStatus,
pub priority: TaskPriority,
pub user_id: Uuid,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum TaskStatus {
Pending,
InProgress,
Completed,
Cancelled,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum TaskPriority {
Low,
Medium,
High,
Urgent,
}
// Response models
#[derive(Debug, Serialize)]
pub struct ApiResponse<T> {
pub success: bool,
pub message: Option<String>,
pub data: Option<T>,
}
// Error models
#[derive(Debug, thiserror::Error)]
pub enum AppError {
#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
#[error("Redis error: {0}")]
Redis(#[from] redis::RedisError),
#[error("Kafka error: {0}")]
Kafka(String),
#[error("Zitadel error: {0}")]
Zitadel(#[from] tonic::Status),
#[error("Minio error: {0}")]
Minio(String),
#[error("Validation error: {0}")]
Validation(String),
#[error("Not found: {0}")]
NotFound(String),
#[error("Unauthorized: {0}")]
Unauthorized(String),
#[error("Forbidden: {0}")]
Forbidden(String),
#[error("Internal server error: {0}")]
Internal(String),
}
impl actix_web::ResponseError for AppError {
fn error_response(&self) -> actix_web::HttpResponse {
let (status, error_message) = match self {
AppError::Validation(_) => (actix_web::http::StatusCode::BAD_REQUEST, self.to_string()),
AppError::NotFound(_) => (actix_web::http::StatusCode::NOT_FOUND, self.to_string()),
AppError::Unauthorized(_) => {
(actix_web::http::StatusCode::UNAUTHORIZED, self.to_string())
}
AppError::Forbidden(_) => (actix_web::http::StatusCode::FORBIDDEN, self.to_string()),
_ => (
actix_web::http::StatusCode::INTERNAL_SERVER_ERROR,
"Internal server error".to_string(),
),
};
actix_web::HttpResponse::build(status).json(ApiResponse::<()> {
success: false,
message: Some(error_message),
data: None,
})
}
}
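
A hedged actix-web handler sketch (the lookup is illustrative) relying on the ResponseError impl above to render the error as a JSON ApiResponse:

async fn get_user() -> Result<actix_web::HttpResponse, AppError> {
    Err(AppError::NotFound("user 42".to_string()))
}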

@@ -1,155 +0,0 @@
use actix_web::{HttpRequest, HttpResponse};
use chrono::{DateTime, Utc};
use jsonwebtoken::{decode, encode, DecodingKey, EncodingKey, Header, Validation};
use lettre::message::header::ContentType;
use lettre::transport::smtp::authentication::Credentials;
use lettre::{Message, SmtpTransport, Transport};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;
use serde::{Deserialize, Serialize};
use std::time::{Duration, SystemTime};
use uuid::Uuid;
use crate::config::AppConfig;
use crate::models::{ApiResponse, AppError, User};
// JWT Claims
#[derive(Debug, Serialize, Deserialize)]
pub struct Claims {
pub sub: String, // subject (user ID)
pub exp: usize, // expiration time
pub iat: usize, // issued at
pub email: String, // user email
pub username: String, // username
}
// Generate JWT token
pub fn generate_jwt(user: &User, secret: &str) -> Result<String, AppError> {
let expiration = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as usize + 86400; // 24 hours
let issued_at = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as usize;
let claims = Claims {
sub: user.id.to_string(),
exp: expiration,
iat: issued_at,
email: user.email.clone(),
username: user.email.clone(),
};
encode(
&Header::default(),
&claims,
&EncodingKey::from_secret(secret.as_bytes()),
)
.map_err(|e| AppError::Internal(format!("Failed to generate JWT: {}", e)))
}
// Validate JWT token
pub fn validate_jwt(token: &str, secret: &str) -> Result<Claims, AppError> {
let validation = Validation::default();
decode::<Claims>(
token,
&DecodingKey::from_secret(secret.as_bytes()),
&validation,
)
.map(|data| data.claims)
.map_err(|e| AppError::Unauthorized(format!("Invalid token: {}", e)))
}
// Extract user ID from request
pub fn extract_user_id(req: &HttpRequest) -> Result<Uuid, AppError> {
let auth_header = req
.headers()
.get("Authorization")
.ok_or_else(|| AppError::Unauthorized("Missing Authorization header".to_string()))?
.to_str()
.map_err(|_| AppError::Unauthorized("Invalid Authorization header".to_string()))?;
if !auth_header.starts_with("Bearer ") {
return Err(AppError::Unauthorized("Invalid Authorization header format".to_string()));
}
let token = &auth_header[7..];
let claims = validate_jwt(token, "your-secret-key")?;
Uuid::parse_str(&claims.sub)
.map_err(|_| AppError::Unauthorized("Invalid user ID in token".to_string()))
}
// Send email
pub async fn send_email(
config: &AppConfig,
to_email: &str,
subject: &str,
body: &str,
) -> Result<(), AppError> {
let email = Message::builder()
.from(config.email.from_email.parse().unwrap())
.to(to_email.parse().unwrap())
.subject(subject)
.header(ContentType::TEXT_PLAIN)
.body(body.to_string())
.map_err(|e| AppError::Internal(format!("Failed to create email: {}", e)))?;
let creds = Credentials::new(
config.email.username.clone(),
config.email.password.clone(),
);
// Open a remote connection to the SMTP server
let mailer = SmtpTransport::relay(&config.email.smtp_server)
.unwrap()
.credentials(creds)
.build();
// Send the email
mailer.send(&email)
.map_err(|e| AppError::Internal(format!("Failed to send email: {}", e)))?;
Ok(())
}
// Send message to Kafka
pub async fn send_to_kafka(
producer: &FutureProducer,
topic: &str,
key: &str,
payload: &str,
) -> Result<(), AppError> {
producer
.send(
FutureRecord::to(topic)
.key(key)
.payload(payload),
Timeout::After(Duration::from_secs(5)),
)
.await
.map_err(|(e, _)| AppError::Kafka(format!("Failed to send message to Kafka: {}", e)))?;
Ok(())
}
// Format datetime for JSON responses
pub fn format_datetime(dt: DateTime<Utc>) -> String {
dt.to_rfc3339()
}
// Create a standard API response
pub fn create_response<T: Serialize>(
data: T,
message: Option<String>,
) -> HttpResponse {
HttpResponse::Ok().json(ApiResponse {
success: true,
message,
data: Some(data),
})
}
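
A hedged round-trip sketch for the JWT helpers above; the secret is a placeholder, and user is any User populated as in crate::models:

fn jwt_round_trip(user: &User) -> Result<(), AppError> {
    let secret = "change-me";
    let token = generate_jwt(user, secret)?;
    let claims = validate_jwt(&token, secret)?;
    assert_eq!(claims.sub, user.id.to_string());
    Ok(())
}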

@@ -1,25 +0,0 @@
[package]
name = "gb-document"
version = { workspace = true }
edition = { workspace = true }
authors = { workspace = true }
license = { workspace = true }
[dependencies]
gb-core = { path = "../gb-core" }
lopdf = "0.31"
docx-rs = "0.4"
calamine = "0.21"
async-trait= { workspace = true }
tokio= { workspace = true }
serde= { workspace = true }
serde_json= { workspace = true }
thiserror= { workspace = true }
tracing= { workspace = true }
encoding_rs = "0.8"
zip = "0.6"
[dev-dependencies]
rstest= { workspace = true }
tokio-test = "0.4"
tempfile = "3.8"

@@ -1,35 +0,0 @@
use gb_core::{Result, Error};
use calamine::{Reader, Xlsx, RangeDeserializerBuilder};
use std::io::Cursor;
use tracing::{instrument, error};
pub struct ExcelProcessor;
impl ExcelProcessor {
#[instrument(skip(data))]
pub fn extract_data(data: &[u8]) -> Result<Vec<Vec<String>>> {
let cursor = Cursor::new(data);
let mut workbook = Xlsx::new(cursor)
.map_err(|e| Error::internal(format!("Failed to read Excel file: {}", e)))?;
let sheet_name = workbook.sheet_names()[0].clone();
let range = workbook.worksheet_range(&sheet_name)
.ok_or_else(|| Error::internal("Failed to get worksheet".to_string()))?
.map_err(|e| Error::internal(format!("Failed to read range: {}", e)))?;
let mut result = Vec::new();
for row in range.rows() {
let row_data: Vec<String> = row.iter()
.map(|cell| cell.to_string())
.collect();
result.push(row_data);
}
Ok(result)
}
#[instrument(skip(headers, data))]
pub fn create_excel(headers: &[&str], data: &[Vec<String>]) -> Result<Vec<u8>> {
todo!("Implement Excel creation using a suitable library");
}
}
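
A hedged usage sketch for extract_data; "report.xlsx" is a placeholder path:

fn dump_first_sheet() -> Result<()> {
    let bytes = std::fs::read("report.xlsx")
        .map_err(|e| Error::internal(format!("Failed to read file: {}", e)))?;
    for row in ExcelProcessor::extract_data(&bytes)? {
        println!("{}", row.join("\t"));
    }
    Ok(())
}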

@@ -1,127 +0,0 @@
use gb_core::{Result, Error};
use lopdf::{Document, Object, StringFormat};
use std::io::Cursor;
use tracing::{instrument, error};
pub struct PdfProcessor;
impl PdfProcessor {
#[instrument(skip(data))]
pub fn extract_text(data: &[u8]) -> Result<String> {
let doc = Document::load_from(Cursor::new(data))
.map_err(|e| Error::internal(format!("Failed to load PDF: {}", e)))?;
let mut text = String::new();
for page_num in 1..=doc.get_pages().len() as u32 {
if let Ok(page_text) = Self::extract_page_text(&doc, page_num) {
text.push_str(&page_text);
text.push('\n');
}
}
Ok(text)
}
#[instrument(skip(doc))]
fn extract_page_text(doc: &Document, page_num: u32) -> Result<String> {
let page = doc.get_page(page_num)
.map_err(|e| Error::internal(format!("Failed to get page {}: {}", page_num, e)))?;
let contents = doc.get_page_content(page)
.map_err(|e| Error::internal(format!("Failed to get page content: {}", e)))?;
let mut text = String::new();
for content in contents.iter() {
if let Ok(Object::String(s, StringFormat::Literal)) = content {
if let Ok(decoded) = String::from_utf8(s.clone()) {
text.push_str(&decoded);
}
}
}
Ok(text)
}
#[instrument(skip(pdfs))]
pub fn merge_pdfs(pdfs: Vec<&[u8]>) -> Result<Vec<u8>> {
let mut merged = Document::new();
let mut current_page = 1;
for pdf_data in pdfs {
let doc = Document::load_from(Cursor::new(pdf_data))
.map_err(|e| Error::internal(format!("Failed to load PDF: {}", e)))?;
for (_, page) in doc.get_pages() {
merged.add_page(page.clone());
current_page += 1;
}
}
let mut output = Vec::new();
merged.save_to(&mut Cursor::new(&mut output))
.map_err(|e| Error::internal(format!("Failed to save merged PDF: {}", e)))?;
Ok(output)
}
#[instrument(skip(data))]
pub fn split_pdf(data: &[u8], pages: &[u32]) -> Result<Vec<Vec<u8>>> {
let doc = Document::load_from(Cursor::new(data))
.map_err(|e| Error::internal(format!("Failed to load PDF: {}", e)))?;
let mut result = Vec::new();
for &page_num in pages {
let mut new_doc = Document::new();
if let Ok(page) = doc.get_page(page_num) {
new_doc.add_page(page.clone());
let mut output = Vec::new();
new_doc.save_to(&mut Cursor::new(&mut output))
.map_err(|e| Error::internal(format!("Failed to save split PDF: {}", e)))?;
result.push(output);
}
}
Ok(result)
}
}
#[cfg(test)]
mod tests {
use super::*;
use rstest::*;
fn create_test_pdf() -> Vec<u8> {
let mut doc = Document::new();
doc.add_page(lopdf::dictionary! {
"Type" => "Page",
"Contents" => Object::String(b"BT /F1 12 Tf 72 712 Td (Test Page) Tj ET".to_vec(), StringFormat::Literal),
});
let mut output = Vec::new();
doc.save_to(&mut Cursor::new(&mut output)).unwrap();
output
}
#[rstest]
fn test_extract_text() -> Result<()> {
let pdf_data = create_test_pdf();
let text = PdfProcessor::extract_text(&pdf_data)?;
assert!(text.contains("Test Page"));
Ok(())
}
#[rstest]
fn test_merge_pdfs() -> Result<()> {
let pdf1 = create_test_pdf();
let pdf2 = create_test_pdf();
let merged = PdfProcessor::merge_pdfs(vec![&pdf1, &pdf2])?;
assert!(!merged.is_empty());
Ok(())
}
#[rstest]
fn test_split_pdf() -> Result<()> {
let pdf_data = create_test_pdf();
let split = PdfProcessor::split_pdf(&pdf_data, &[1])?;
assert_eq!(split.len(), 1);
Ok(())
}
}

@@ -1,105 +0,0 @@
use gb_core::{Result, Error};
use docx_rs::{Docx, Paragraph, Run, RunText};
use std::io::{Cursor, Read};
use tracing::{instrument, error};
pub struct WordProcessor;
impl WordProcessor {
#[instrument(skip(data))]
pub fn extract_text(data: &[u8]) -> Result<String> {
let doc = Docx::from_reader(Cursor::new(data))
.map_err(|e| Error::internal(format!("Failed to read DOCX: {}", e)))?;
let mut text = String::new();
for para in doc.document.paragraphs() {
for run in para.runs() {
if let Some(text_content) = run.text() {
text.push_str(text_content);
}
text.push(' ');
}
text.push('\n');
}
Ok(text)
}
#[instrument(skip(content))]
pub fn create_document(content: &str) -> Result<Vec<u8>> {
let mut docx = Docx::new();
for line in content.lines() {
let paragraph = Paragraph::new()
.add_run(
Run::new().add_text(RunText::new(line))
);
docx = docx.add_paragraph(paragraph);
}
let mut output = Vec::new();
docx.build()
.pack(&mut Cursor::new(&mut output))
.map_err(|e| Error::internal(format!("Failed to create DOCX: {}", e)))?;
Ok(output)
}
#[instrument(skip(template_data, variables))]
pub fn fill_template(template_data: &[u8], variables: &serde_json::Value) -> Result<Vec<u8>> {
let doc = Docx::from_reader(Cursor::new(template_data))
.map_err(|e| Error::internal(format!("Failed to read template: {}", e)))?;
let mut new_doc = doc.clone();
for para in new_doc.document.paragraphs_mut() {
for run in para.runs_mut() {
if let Some(text) = run.text_mut() {
let mut new_text = text.clone();
for (key, value) in variables.as_object().unwrap() {
let placeholder = format!("{{{}}}", key);
new_text = new_text.replace(&placeholder, value.as_str().unwrap_or(""));
}
*text = new_text;
}
}
}
let mut output = Vec::new();
new_doc.build()
.pack(&mut Cursor::new(&mut output))
.map_err(|e| Error::internal(format!("Failed to save filled template: {}", e)))?;
Ok(output)
}
}
#[cfg(test)]
mod tests {
use super::*;
use rstest::*;
use serde_json::json;
#[rstest]
fn test_create_document() -> Result<()> {
let content = "Test document\nSecond line";
let doc_data = WordProcessor::create_document(content)?;
let extracted_text = WordProcessor::extract_text(&doc_data)?;
assert!(extracted_text.contains("Test document"));
assert!(extracted_text.contains("Second line"));
Ok(())
}
#[rstest]
fn test_fill_template() -> Result<()> {
let template = WordProcessor::create_document("Hello, {name}!")?;
let variables = json!({
"name": "World"
});
let filled = WordProcessor::fill_template(&template, &variables)?;
let text = WordProcessor::extract_text(&filled)?;
assert!(text.contains("Hello, World!"));
Ok(())
}
}

@@ -1,32 +0,0 @@
[package]
name = "gb-file"
version = { workspace = true }
edition = { workspace = true }
authors = { workspace = true }
license = { workspace = true }
[dependencies]
gb-core = { path = "../gb-core" }
async-trait= { workspace = true }
tokio= { workspace = true }
serde = { workspace = true , features = ["derive"] }
serde_json ={ workspace = true }
thiserror= { workspace = true }
tracing= { workspace = true }
futures ={ workspace = true }
uuid = { workspace = true }
jsonwebtoken = { workspace = true }
lettre= { workspace = true }
minio = { workspace = true }
actix-web ={ workspace = true }
actix-multipart ={ workspace = true }
sanitize-filename = { workspace = true }
tempfile = { workspace = true }
log = { workspace = true }
env_logger = { workspace = true }
tokio-stream = { workspace = true }
[dev-dependencies]
rstest= { workspace = true }
tokio-test = "0.4"
tempfile = "3.8"

@@ -1 +0,0 @@
pub mod handlers;

@@ -1,27 +0,0 @@
[package]
name = "gb-image"
version = { workspace = true }
edition = { workspace = true }
authors = { workspace = true }
license = { workspace = true }
[dependencies]
gb-core = { path = "../gb-core" }
image = { version = "0.24", features = ["webp", "jpeg", "png", "gif"] }
imageproc = "0.23"
rusttype = "0.9"
async-trait= { workspace = true }
tokio= { workspace = true }
serde= { workspace = true }
serde_json= { workspace = true }
thiserror= { workspace = true }
tracing= { workspace = true }
tempfile = "3.8"
[dev-dependencies]
rstest= { workspace = true }
tokio-test = "0.4"
[build-dependencies]
reqwest = { version = "0.11", features = ["blocking"] }

File diff suppressed because one or more lines are too long

@@ -1,21 +0,0 @@
use std::fs;
use std::path::Path;
fn main() {
let assets_dir = Path::new("assets");
if !assets_dir.exists() {
fs::create_dir(assets_dir).expect("Failed to create assets directory");
}
let font_path = assets_dir.join("DejaVuSans.ttf");
if !font_path.exists() {
let font_url = "https://github.com/dejavu-fonts/dejavu-fonts/raw/master/ttf/DejaVuSans.ttf";
let response = reqwest::blocking::get(font_url)
.expect("Failed to download font")
.bytes()
.expect("Failed to get font bytes");
fs::write(font_path, response)
.expect("Failed to save font file");
}
}

@@ -1,74 +0,0 @@
use std::io::Cursor;
use gb_core::{Result, Error};
use image::{ImageOutputFormat, DynamicImage};
use tracing::instrument;
pub struct ImageConverter;
impl ImageConverter {
#[instrument]
pub fn to_jpeg(img: &DynamicImage, quality: u8) -> Result<Vec<u8>> {
let mut buffer = Cursor::new(Vec::new());
img.write_to(&mut buffer, ImageOutputFormat::Jpeg(quality))
.map_err(|e| Error::internal(format!("JPEG conversion failed: {}", e)))?;
Ok(buffer.into_inner())
}
#[instrument]
pub fn to_png(img: &DynamicImage) -> Result<Vec<u8>> {
let mut buffer = Cursor::new(Vec::new());
img.write_to(&mut buffer, ImageOutputFormat::Png)
.map_err(|e| Error::internal(format!("PNG conversion failed: {}", e)))?;
Ok(buffer.into_inner())
}
#[instrument]
pub fn to_webp(img: &DynamicImage, _quality: u8) -> Result<Vec<u8>> {
let mut buffer = Cursor::new(Vec::new());
// image 0.24 encodes lossless WebP here; the quality parameter is
// accepted for API symmetry but not used by ImageOutputFormat::WebP.
img.write_to(&mut buffer, ImageOutputFormat::WebP)
.map_err(|e| Error::internal(format!("WebP conversion failed: {}", e)))?;
Ok(buffer.into_inner())
}
#[instrument]
pub fn to_gif(img: &DynamicImage) -> Result<Vec<u8>> {
let mut buffer = Cursor::new(Vec::new());
img.write_to(&mut buffer, ImageOutputFormat::Gif)
.map_err(|e| Error::internal(format!("GIF conversion failed: {}", e)))?;
Ok(buffer.into_inner())
}
}
#[cfg(test)]
mod tests {
use super::*;
use rstest::*;
#[fixture]
fn test_image() -> DynamicImage {
DynamicImage::new_rgb8(100, 100)
}
#[rstest]
fn test_jpeg_conversion(test_image: DynamicImage) -> Result<()> {
let jpeg_data = ImageConverter::to_jpeg(&test_image, 80)?;
assert!(!jpeg_data.is_empty());
assert_eq!(image::guess_format(&jpeg_data).unwrap(), image::ImageFormat::Jpeg);
Ok(())
}
#[rstest]
fn test_png_conversion(test_image: DynamicImage) -> Result<()> {
let png_data = ImageConverter::to_png(&test_image)?;
assert!(!png_data.is_empty());
assert_eq!(image::guess_format(&png_data).unwrap(), image::ImageFormat::Png);
Ok(())
}
#[rstest]
fn test_webp_conversion(test_image: DynamicImage) -> Result<()> {
let webp_data = ImageConverter::to_webp(&test_image, 80)?;
assert!(!webp_data.is_empty());
Ok(())
}
}

@@ -1,54 +0,0 @@
pub mod processor;
pub mod converter;
pub use processor::ImageProcessor;
pub use converter::ImageConverter;
// Remove the ImageFormat re-export since it's private in the image crate
pub use image::ImageFormat;
#[cfg(test)]
mod tests {
use super::*;
use gb_core::Result;
use image::{DynamicImage, Rgba};
#[tokio::test]
async fn test_image_processing_integration() -> Result<()> {
// Initialize components
let processor = ImageProcessor::new();
// Create test image (RGBA, since add_text needs a mutable RGBA buffer)
let mut image = DynamicImage::new_rgba8(200, 200);
// Test image processing operations
let resized = processor.resize(&image, 100, 100);
assert_eq!(resized.width(), 100);
assert_eq!(resized.height(), 100);
let cropped = processor.crop(&image, 50, 50, 100, 100)?;
assert_eq!(cropped.width(), 100);
assert_eq!(cropped.height(), 100);
let _blurred = processor.apply_blur(&image, 1.0);
let _brightened = processor.adjust_brightness(&image, 10);
let _contrasted = processor.adjust_contrast(&image, 1.2);
// Test text addition
processor.add_text(
&mut image,
"Integration Test",
10,
10,
24.0,
Rgba([0, 0, 0, 255]),
)?;
// Test format conversion
let _webp_data = ImageConverter::to_webp(&image, 80)?;
let _jpeg_data = ImageConverter::to_jpeg(&image, 80)?;
let _png_data = ImageConverter::to_png(&image)?;
let _gif_data = ImageConverter::to_gif(&image)?;
Ok(())
}
}

@@ -1,74 +0,0 @@
use gb_core::{Error, Result};
use image::{DynamicImage, Rgba};
use imageproc::drawing::draw_text_mut;
use rusttype::{Font, Scale};
use std::fs;
pub struct ImageProcessor;
impl ImageProcessor {
pub fn new() -> Self {
Self
}
pub fn resize(&self, image: &DynamicImage, width: u32, height: u32) -> DynamicImage {
image.resize(width, height, image::imageops::FilterType::Lanczos3)
}
pub fn crop(&self, image: &DynamicImage, x: u32, y: u32, width: u32, height: u32) -> Result<DynamicImage> {
if x + width > image.width() || y + height > image.height() {
return Err(Error::internal("Crop dimensions exceed image bounds".to_string()));
}
Ok(image.crop_imm(x, y, width, height))
}
pub fn apply_blur(&self, image: &DynamicImage, sigma: f32) -> DynamicImage {
image.blur(sigma)
}
pub fn adjust_brightness(&self, image: &DynamicImage, value: i32) -> DynamicImage {
image.brighten(value)
}
pub fn adjust_contrast(&self, image: &DynamicImage, value: f32) -> DynamicImage {
image.adjust_contrast(value)
}
pub fn add_text(
&self,
image: &mut DynamicImage,
text: &str,
x: i32,
y: i32,
size: f32,
color: Rgba<u8>,
) -> Result<()> {
// Load the font file from assets (downloaded in build.rs)
let font_data = fs::read("assets/DejaVuSans.ttf")
.map_err(|e| Error::internal(format!("Failed to load font: {}", e)))?;
let font = Font::try_from_vec(font_data)
.ok_or_else(|| Error::internal("Failed to parse font data".to_string()))?;
let scale = Scale::uniform(size);
let image_buffer = image.as_mut_rgba8()
.ok_or_else(|| Error::internal("Failed to convert image to RGBA".to_string()))?;
draw_text_mut(
image_buffer,
color,
x,
y,
scale,
&font,
text
);
Ok(())
}
}

@@ -1,18 +0,0 @@
[package]
name = "gb-infra"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
[dependencies]
dotenv = { workspace = true }
ctrlc = { workspace = true }
tokio = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
reqwest = { workspace = true }
flate2 = { workspace = true }
tar = { workspace = true }
zip = { workspace = true }

@@ -1,20 +0,0 @@
# Backup
## Fastest way to transfer files between servers over TCP/IP
rsync -avz --progress --bwlimit=0 -e "ssh -p 22 -T -c aes128-gcm@openssh.com -o Compression=no -o IPQoS=throughput" gbbackup@host.com.br:/opt/gbo/backup /home/user/Desktop
# Security
apt update && apt install -y fail2ban
systemctl enable fail2ban
apt update && apt install -y fail2ban iptables-persistent
systemctl enable fail2ban
systemctl enable netfilter-persistent
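A minimal /etc/fail2ban/jail.local sketch; the values below are assumptions to tune per host:
[sshd]
enabled = true
maxretry = 5
findtime = 10m
bantime = 1h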
# Add
DocuSeal (self-hosted DocuSign alternative): https://www.docuseal.com/on-premises

Binary image file removed (148 KiB; not shown).

@@ -1,78 +0,0 @@
flowchart TB
%% Style definitions
classDef users fill:#FF9900,stroke:#FF6600,stroke-width:2px,color:white,font-weight:bold
classDef identity fill:#4285F4,stroke:#2956B2,stroke-width:2px,color:white,font-weight:bold
classDef content fill:#0F9D58,stroke:#0B8043,stroke-width:2px,color:white,font-weight:bold
classDef communication fill:#DB4437,stroke:#B31412,stroke-width:2px,color:white,font-weight:bold
classDef ai fill:#9C27B0,stroke:#7B1FA2,stroke-width:2px,color:white,font-weight:bold
classDef bot fill:#FB8C00,stroke:#EF6C00,stroke-width:2px,color:white,font-weight:bold
%% Main user node
User((👤 Users))
subgraph "Identity & Access Management"
Zitadel["🔐 Identity Provider<br>(Zitadel)"]
Stalwart["✉️ Mail Server<br>(Stalwart)"]
end
subgraph "Content & Collaboration"
Forgejo["📊 ALM Server<br>(Forgejo)"]
Static["🌐 Static Site Generator<br>(Website Static)"]
Minio["💾 Object Storage<br>(MinIO)"]
end
subgraph "Communication & Delivery"
Caddy["🔄 Reverse Proxy<br>(Caddy)"]
LiveKit["💬 Real-time Communication<br>(LiveKit)"]
end
subgraph "AI & Integration Layer"
LLM["🧠 LLM Engine<br>(Nomic LLM)"]
subgraph "Bot Framework"
BotFramework["🤖 Bot Framework"]
subgraph "Bot Capabilities"
MCP["📨 Message Control Protocol<br>(MCP)"]
GET["🔗 GET Web Service Calls"]
BASIC["⚙️ BASIC Engine"]
end
end
end
%% Connection lines with colors
User --> |"User Access"| Caddy
Caddy --> |"Auth"| Zitadel
Caddy --> |"Code & Issues"| Forgejo
Caddy --> |"Content"| Static
Caddy --> |"Real-time"| LiveKit
Caddy --> |"AI & Bots"| BotFramework
Zitadel --> |"SSO"| Forgejo
Zitadel --> |"Auth"| LiveKit
Zitadel --> |"Identity"| BotFramework
Forgejo --> |"Store"| Minio
Static --> |"Assets"| Minio
BotFramework --> MCP
BotFramework --> GET
BotFramework --> BASIC
BotFramework --> |"NLP"| LLM
Stalwart --> |"Email"| BotFramework
LiveKit --> |"Messaging"| BotFramework
%% Integration flows - dashed lines with colors
MCP -.-> |"Message Routing"| Stalwart
GET -.-> |"API Calls"| Forgejo
BASIC -.-> |"Scripting"| Minio
LLM -.-> |"Content Generation"| Static
%% Apply styles
class User users
class Zitadel,Stalwart identity
class Forgejo,Static,Minio content
class Caddy,LiveKit communication
class LLM ai
class BotFramework,MCP,GET,BASIC bot

@@ -1,9 +0,0 @@
pub mod manager;
pub mod setup;
pub mod services {
pub mod minio;
pub mod nginx;
pub mod postgresql;
pub mod stalwart;
pub mod zitadel;
}

@@ -1,60 +0,0 @@
use crate::services::{zitadel, stalwart, minio, postgresql, nginx};
use dotenv::dotenv;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
pub struct ServiceManager {
services: Vec<Box<dyn Service>>,
}
impl ServiceManager {
pub fn new() -> Self {
dotenv().ok();
ServiceManager {
services: vec![
Box::new(zitadel::Zitadel::new()),
Box::new(stalwart::Stalwart::new()),
Box::new(minio::MinIO::new()),
Box::new(postgresql::PostgreSQL::new()),
Box::new(nginx::NGINX::new()),
],
}
}
pub fn start(&mut self) {
for service in &mut self.services {
service.start().unwrap();
}
}
pub fn stop(&mut self) {
for service in &mut self.services {
service.stop().unwrap();
}
}
pub fn run(&mut self) {
self.start();
let running = Arc::new(Mutex::new(true));
let running_clone = Arc::clone(&running);
ctrlc::set_handler(move || {
println!("Exiting service manager...");
let mut running = running_clone.lock().unwrap();
*running = false;
})
.expect("Failed to set Ctrl+C handler.");
while *running.lock().unwrap() {
thread::sleep(Duration::from_secs(1));
}
self.stop();
}
}
pub trait Service {
fn start(&mut self) -> Result<(), String>;
fn stop(&mut self) -> Result<(), String>;
}
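
For reference, the intended entry point is minimal; a sketch (the real binary may wire in more setup before calling run):

```rust
fn main() {
    let mut manager = ServiceManager::new();
    manager.run(); // blocks until Ctrl+C, then stops every service in order
}
```

Each service below follows broadly the same pattern: resolve its environment variables, spawn the binary under /opt/gbo/bin, and kill the child on stop.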

View file

@ -1,54 +0,0 @@
use crate::manager::Service;
use std::env;
use std::process::Command;
use std::collections::HashMap;
use dotenv::dotenv;
pub struct MinIO {
env_vars: HashMap<String, String>,
process: Option<std::process::Child>,
}
impl MinIO {
pub fn new() -> Self {
dotenv().ok();
let env_vars = vec![
"MINIO_ROOT_USER",
"MINIO_ROOT_PASSWORD",
"MINIO_VOLUMES",
"MINIO_ADDRESS",
]
.into_iter()
.filter_map(|key| env::var(key).ok().map(|value| (key.to_string(), value)))
.collect();
MinIO {
env_vars,
process: None,
}
}
}
impl Service for MinIO {
fn start(&mut self) -> Result<(), String> {
if self.process.is_some() {
return Err("MinIO is already running.".to_string());
}
let mut command = Command::new("/opt/gbo/bin/minio");
for (key, value) in &self.env_vars {
command.env(key, value);
}
self.process = Some(command.spawn().map_err(|e| e.to_string())?);
Ok(())
}
fn stop(&mut self) -> Result<(), String> {
if let Some(mut child) = self.process.take() {
child.kill().map_err(|e| e.to_string())?;
child.wait().map_err(|e| e.to_string())?;
}
Ok(())
}
}

View file

@ -1,83 +0,0 @@
use crate::manager::Service;
use std::env;
use std::process::Command;
use std::collections::HashMap;
use dotenv::dotenv;
pub struct NGINX {
env_vars: HashMap<String, String>,
process: Option<std::process::Child>,
}
impl NGINX {
pub fn new() -> Self {
dotenv().ok();
let env_vars = vec![
"NGINX_ERROR_LOG",
"NGINX_ACCESS_LOG",
]
.into_iter()
.filter_map(|key| env::var(key).ok().map(|value| (key.to_string(), value)))
.collect();
NGINX {
env_vars,
process: None,
}
}
}
impl Service for NGINX {
fn start(&mut self) -> Result<(), String> {
if self.process.is_some() {
return Err("NGINX is already running.".to_string());
}
// Configure NGINX logs, failing with a clear error instead of panicking
let error_log = self.env_vars.get("NGINX_ERROR_LOG")
.ok_or_else(|| "NGINX_ERROR_LOG is not set".to_string())?;
let access_log = self.env_vars.get("NGINX_ACCESS_LOG")
.ok_or_else(|| "NGINX_ACCESS_LOG is not set".to_string())?;
// Update NGINX configuration
let nginx_conf = format!(
r#"
error_log {} debug;
access_log {};
events {{}}
http {{
server {{
listen 80;
server_name localhost;
location / {{
root /var/www/html;
}}
}}
}}
"#,
error_log, access_log
);
// Write the configuration to /etc/nginx/nginx.conf
std::fs::write("/etc/nginx/nginx.conf", nginx_conf).map_err(|e| e.to_string())?;
// Start NGINX
let mut command = Command::new("nginx");
self.process = Some(command.spawn().map_err(|e| e.to_string())?);
Ok(())
}
fn stop(&mut self) -> Result<(), String> {
if let Some(mut child) = self.process.take() {
child.kill().map_err(|e| e.to_string())?;
child.wait().map_err(|e| e.to_string())?;
}
// Stop NGINX
Command::new("nginx")
.arg("-s")
.arg("stop")
.status()
.map_err(|e| e.to_string())?;
Ok(())
}
}

View file

@ -1,89 +0,0 @@
use crate::manager::Service;
use std::env;
use std::process::Command;
use std::collections::HashMap;
use dotenv::dotenv;
pub struct PostgreSQL {
env_vars: HashMap<String, String>,
process: Option<std::process::Child>,
}
impl PostgreSQL {
pub fn new() -> Self {
dotenv().ok();
let env_vars = vec![
"POSTGRES_DATA_DIR",
"POSTGRES_PORT",
"POSTGRES_USER",
"POSTGRES_PASSWORD",
]
.into_iter()
.filter_map(|key| env::var(key).ok().map(|value| (key.to_string(), value)))
.collect();
PostgreSQL {
env_vars,
process: None,
}
}
}
impl Service for PostgreSQL {
fn start(&mut self) -> Result<(), String> {
if self.process.is_some() {
return Err("PostgreSQL is already running.".to_string());
}
// Initialize PostgreSQL data directory if it doesn't exist
let data_dir = self.env_vars.get("POSTGRES_DATA_DIR")
.ok_or_else(|| "POSTGRES_DATA_DIR is not set".to_string())?;
if !std::path::Path::new(data_dir).exists() {
Command::new("sudo")
.arg("-u")
.arg("postgres")
.arg("/usr/lib/postgresql/14/bin/initdb")
.arg("-D")
.arg(data_dir)
.status()
.map_err(|e| e.to_string())?;
}
// Start PostgreSQL
let mut command = Command::new("sudo");
command
.arg("-u")
.arg("postgres")
.arg("/usr/lib/postgresql/14/bin/pg_ctl")
.arg("start")
.arg("-D")
.arg(data_dir);
for (key, value) in &self.env_vars {
command.env(key, value);
}
self.process = Some(command.spawn().map_err(|e| e.to_string())?);
Ok(())
}
fn stop(&mut self) -> Result<(), String> {
if let Some(mut child) = self.process.take() {
child.kill().map_err(|e| e.to_string())?;
child.wait().map_err(|e| e.to_string())?;
}
// Stop PostgreSQL
let data_dir = self.env_vars.get("POSTGRES_DATA_DIR")
.ok_or_else(|| "POSTGRES_DATA_DIR is not set".to_string())?;
Command::new("sudo")
.arg("-u")
.arg("postgres")
.arg("/usr/lib/postgresql/14/bin/pg_ctl")
.arg("stop")
.arg("-D")
.arg(data_dir)
.status()
.map_err(|e| e.to_string())?;
Ok(())
}
}

View file

@ -1,58 +0,0 @@
use crate::manager::Service;
use std::env;
use std::process::Command;
use std::collections::HashMap;
use dotenv::dotenv;
pub struct Stalwart {
env_vars: HashMap<String, String>,
process: Option<std::process::Child>,
}
impl Stalwart {
pub fn new() -> Self {
dotenv().ok();
let env_vars = vec![
"STALWART_LOG_LEVEL",
"STALWART_OAUTH_PROVIDER",
"STALWART_OAUTH_CLIENT_ID",
"STALWART_OAUTH_CLIENT_SECRET",
"STALWART_OAUTH_AUTHORIZATION_ENDPOINT",
"STALWART_OAUTH_TOKEN_ENDPOINT",
"STALWART_OAUTH_USERINFO_ENDPOINT",
"STALWART_OAUTH_SCOPE",
]
.into_iter()
.filter_map(|key| env::var(key).ok().map(|value| (key.to_string(), value)))
.collect();
Stalwart {
env_vars,
process: None,
}
}
}
impl Service for Stalwart {
fn start(&mut self) -> Result<(), String> {
if self.process.is_some() {
return Err("Stalwart Mail is already running.".to_string());
}
let mut command = Command::new("/opt/gbo/bin/stalwart");
for (key, value) in &self.env_vars {
command.env(key, value);
}
self.process = Some(command.spawn().map_err(|e| e.to_string())?);
Ok(())
}
fn stop(&mut self) -> Result<(), String> {
if let Some(mut child) = self.process.take() {
child.kill().map_err(|e| e.to_string())?;
child.wait().map_err(|e| e.to_string())?;
}
Ok(())
}
}

View file

@ -1,63 +0,0 @@
use crate::manager::Service;
use std::env;
use std::process::Command;
use std::collections::HashMap;
use dotenv::dotenv;
pub struct Zitadel {
env_vars: HashMap<String, String>,
process: Option<std::process::Child>,
}
impl Zitadel {
pub fn new() -> Self {
dotenv().ok();
let env_vars = vec![
"ZITADEL_DEFAULTINSTANCE_INSTANCENAME",
"ZITADEL_DEFAULTINSTANCE_ORG_NAME",
"ZITADEL_DATABASE_POSTGRES_HOST",
"ZITADEL_DATABASE_POSTGRES_PORT",
"ZITADEL_DATABASE_POSTGRES_DATABASE",
"ZITADEL_DATABASE_POSTGRES_USER_USERNAME",
"ZITADEL_DATABASE_POSTGRES_USER_PASSWORD",
"ZITADEL_DATABASE_POSTGRES_ADMIN_SSL_MODE",
"ZITADEL_DATABASE_POSTGRES_USER_SSL_MODE",
"ZITADEL_DATABASE_POSTGRES_ADMIN_USERNAME",
"ZITADEL_DATABASE_POSTGRES_ADMIN_PASSWORD",
"ZITADEL_EXTERNALSECURE",
"ZITADEL_MASTERKEY",
]
.into_iter()
.filter_map(|key| env::var(key).ok().map(|value| (key.to_string(), value)))
.collect();
Zitadel {
env_vars,
process: None,
}
}
}
impl Service for Zitadel {
fn start(&mut self) -> Result<(), String> {
if self.process.is_some() {
return Err("Zitadel is already running.".to_string());
}
let mut command = Command::new("/opt/gbo/bin/zitadel");
for (key, value) in &self.env_vars {
command.env(key, value);
}
self.process = Some(command.spawn().map_err(|e| e.to_string())?);
Ok(())
}
fn stop(&mut self) -> Result<(), String> {
if let Some(mut child) = self.process.take() {
child.kill().map_err(|e| e.to_string())?;
child.wait().map_err(|e| e.to_string())?;
}
Ok(())
}
}

View file

@ -1,276 +0,0 @@
use std::{
fs::{self, File},
io,
path::Path,
process::Command,
};
use reqwest::blocking::Client;
use flate2::read::GzDecoder;
use tar::Archive;
use zip::ZipArchive;
// TODO: https://helpcenter.onlyoffice.com/docs/installation/docs-community-install-ubuntu.aspx
const INSTALL_DIR: &str = "/opt/gbo";
const TEMP_DIR: &str = "/tmp/gbotemp";
#[derive(Debug)]
struct Component {
name: &'static str,
bin_dir: &'static str,
download_url: Option<&'static str>,
archive_type: ArchiveType,
binaries: Vec<&'static str>,
config_files: Vec<ConfigFile>,
}
#[derive(Debug)]
struct ConfigFile {
src_url: Option<&'static str>,
src_path: Option<&'static str>,
dest_name: &'static str,
}
#[derive(Debug)]
enum ArchiveType {
TarGz,
Zip,
Binary,
}
pub fn do_it() -> io::Result<()> {
// Define all components
let components = [
// Directory (Zitadel)
Component {
name: "zitadel",
bin_dir: "directory",
download_url: Some("https://github.com/zitadel/zitadel/releases/latest/download/zitadel_Linux_x86_64.tar.gz"),
archive_type: ArchiveType::TarGz,
binaries: vec!["zitadel"],
config_files: vec![ConfigFile {
src_url: None,
src_path: Some("src/config/directory/zitadel.yaml"),
dest_name: "zitadel.yaml",
}],
},
// Mail (Stalwart)
Component {
name: "stalwart-mail",
bin_dir: "mail",
download_url: Some("https://github.com/stalwartlabs/mail-server/releases/latest/download/stalwart-linux-x86_64.tar.gz"),
archive_type: ArchiveType::TarGz,
binaries: vec!["stalwart-mail"],
config_files: vec![ConfigFile {
src_url: Some("https://raw.githubusercontent.com/stalwartlabs/mail-server/main/resources/config/config.toml"),
src_path: None,
dest_name: "config.toml",
}],
},
// Tabular (PostgreSQL)
Component {
name: "postgresql",
bin_dir: "tabular",
download_url: Some("https://get.enterprisedb.com/postgresql/postgresql-14.10-1-linux-x64-binaries.tar.gz"),
archive_type: ArchiveType::TarGz,
binaries: vec!["postgres", "pg_ctl", "psql", "pg_dump", "pg_restore"],
config_files: vec![],
},
// Object (MinIO)
Component {
name: "minio",
bin_dir: "object",
download_url: Some("https://dl.min.io/server/minio/release/linux-amd64/minio"),
archive_type: ArchiveType::Binary,
binaries: vec!["minio"],
config_files: vec![],
},
// Webserver (Caddy)
Component {
name: "caddy",
bin_dir: "webserver",
download_url: Some("https://github.com/caddyserver/caddy/releases/latest/download/caddy_linux_amd64.tar.gz"),
archive_type: ArchiveType::TarGz,
binaries: vec!["caddy"],
config_files: vec![ConfigFile {
src_url: None,
src_path: Some("src/config/webserver/Caddyfile"),
dest_name: "Caddyfile",
}],
},
];
// Create directories
create_directories()?;
// Install dependencies
install_dependencies()?;
// Create HTTP client
let client = Client::new();
// Process all components
for component in components.iter() {
install_component(&component, &client)?;
}
// Clean up temp directory
fs::remove_dir_all(TEMP_DIR)?;
println!("All binaries downloaded to {}", INSTALL_DIR);
println!("Use the start-stop script to manually control all components");
Ok(())
}
fn create_directories() -> io::Result<()> {
println!("Creating directories...");
// Main directories
fs::create_dir_all(INSTALL_DIR)?;
Command::new("chmod").args(["777", INSTALL_DIR]).status()?;
fs::create_dir_all(TEMP_DIR)?;
// Component directories
let dirs = [
"bin/bot", "bin/mail", "bin/tabular", "bin/object",
"bin/directory", "bin/alm", "bin/webserver", "bin/meeting",
"config/bot", "config/mail", "config/tabular", "config/object",
"config/directory", "config/alm", "config/webserver", "config/meeting",
"data/bot", "data/mail", "data/tabular", "data/object",
"data/directory", "data/alm", "data/webserver", "data/meeting",
"logs", "certs"
];
for dir in dirs {
fs::create_dir_all(format!("{}/{}", INSTALL_DIR, dir))?;
}
Ok(())
}
fn install_dependencies() -> io::Result<()> {
println!("Installing system dependencies...");
Command::new("apt-get").args(["update"]).status()?;
Command::new("apt-get").args(["install", "-y",
"apt-transport-https", "ca-certificates", "curl",
"software-properties-common", "gnupg", "wget",
"unzip", "tar", "postgresql-client", "redis-tools"
]).status()?;
Ok(())
}
fn install_component(component: &Component, client: &Client) -> io::Result<()> {
println!("Installing {}...", component.name);
if let Some(url) = component.download_url {
let temp_path = format!("{}/{}", TEMP_DIR, component.name);
let target_dir = format!("{}/bin/{}", INSTALL_DIR, component.bin_dir);
// Download the file
download_file(client, url, &temp_path)?;
match component.archive_type {
ArchiveType::TarGz => {
// Extract tar.gz archive
let tar_gz = File::open(&temp_path)?;
let tar = GzDecoder::new(tar_gz);
let mut archive = Archive::new(tar);
archive.unpack(TEMP_DIR)?;
// Move binaries to target directory
for binary in &component.binaries {
let src = format!("{}/{}", TEMP_DIR, binary);
let dest = format!("{}/{}", target_dir, binary);
if Path::new(&src).exists() {
fs::rename(&src, &dest)?;
set_executable(&dest)?;
} else {
// For PostgreSQL which has binaries in pgsql/bin/
let pg_src = format!("{}/pgsql/bin/{}", TEMP_DIR, binary);
if Path::new(&pg_src).exists() {
fs::rename(&pg_src, &dest)?;
set_executable(&dest)?;
}
}
}
},
ArchiveType::Zip => {
// Extract zip archive
let file = File::open(&temp_path)?;
let mut archive = ZipArchive::new(file)?;
archive.extract(TEMP_DIR)?;
// Move binaries to target directory
for binary in &component.binaries {
let src = format!("{}/{}", TEMP_DIR, binary);
let dest = format!("{}/{}", target_dir, binary);
if Path::new(&src).exists() {
fs::rename(&src, &dest)?;
set_executable(&dest)?;
}
}
},
ArchiveType::Binary => {
// Single binary - just move to target location
let dest = format!("{}/{}", target_dir, component.name);
fs::rename(&temp_path, &dest)?;
set_executable(&dest)?;
},
}
// Clean up downloaded file
fs::remove_file(temp_path)?;
}
// Handle config files
for config in &component.config_files {
let config_dir = format!("{}/config/{}", INSTALL_DIR, component.bin_dir);
let dest_path = format!("{}/{}", config_dir, config.dest_name);
if let Some(url) = config.src_url {
// Download config from URL
download_file(client, url, &dest_path)?;
} else if let Some(src_path) = config.src_path {
// Copy config from local source (placeholder)
println!("Would copy config from {} to {}", src_path, dest_path);
// fs::copy(src_path, dest_path)?;
}
}
println!("{} installed successfully!", component.name);
Ok(())
}
fn download_file(client: &Client, url: &str, dest_path: &str) -> io::Result<()> {
println!("Downloading {} to {}", url, dest_path);
let mut response = client.get(url)
.send()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
if !response.status().is_success() {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("Failed to download file: HTTP {}", response.status())
));
}
let mut dest_file = File::create(dest_path)?;
response.copy_to(&mut dest_file)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
Ok(())
}
fn set_executable(path: &str) -> io::Result<()> {
Command::new("chmod")
.args(["+x", path])
.status()?;
Ok(())
}
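
A sketch of how the installer is meant to be driven (assuming it runs with root privileges, since it writes under /opt and invokes apt-get):

```rust
fn main() -> std::io::Result<()> {
    // Downloads each component, unpacks its binaries under /opt/gbo/bin/<area>,
    // and stages config files under /opt/gbo/config/<area>.
    do_it()
}
```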

View file

@ -1,5 +0,0 @@
printf "%-20s %-10s %-10s %-10s %-6s %s\n" "CONTAINER" "USED" "AVAIL" "TOTAL" "USE%" "MOUNT"
for container in $(lxc list -c n --format csv); do
disk_info=$(lxc exec "$container" -- df -h / --output=used,avail,size,pcent | tail -n 1)
printf "%-20s %s\n" "$container" "$disk_info"
done

View file

@ -1,16 +0,0 @@
[package]
name = "gb-llm"
version = { workspace = true }
edition = { workspace = true }
authors = { workspace = true }
license = { workspace = true }
[dependencies]
gb-core = { path = "../gb-core" }
async-trait= { workspace = true }
tokio= { workspace = true }
thiserror= { workspace = true }
tracing= { workspace = true }
[dev-dependencies]
rstest= { workspace = true }

View file

View file

@ -1,23 +0,0 @@
[package]
name = "gb-media"
version = { workspace = true }
edition = { workspace = true }
authors = { workspace = true }
license = { workspace = true }
[dependencies]
gb-core = { path = "../gb-core" }
tokio= { workspace = true }
webrtc= { workspace = true }
gstreamer= { workspace = true }
opus= { workspace = true }
tracing= { workspace = true }
async-trait= { workspace = true }
serde= { workspace = true }
uuid= { workspace = true }
anyhow= { workspace = true }
[dev-dependencies]
rstest= { workspace = true }
mockall= { workspace = true }
tokio-test = "0.4"

View file

@ -1,56 +0,0 @@
use gb_core::{Result, Error};
use opus::{Encoder, Decoder, Application, Channels};
pub struct AudioProcessor {
encoder: Encoder,
decoder: Decoder,
sample_rate: u32,
channels: Channels,
}
impl AudioProcessor {
pub fn new(sample_rate: u32, channels: Channels) -> Result<Self> {
let encoder = Encoder::new(
sample_rate,
channels,
Application::Audio
).map_err(|e| Error::internal(format!("Failed to create Opus encoder: {}", e)))?;
let decoder = Decoder::new(
sample_rate,
channels
).map_err(|e| Error::internal(format!("Failed to create Opus decoder: {}", e)))?;
Ok(Self {
encoder,
decoder,
sample_rate,
channels,
})
}
pub fn encode(&mut self, input: &[i16]) -> Result<Vec<u8>> {
// 4000 bytes is the recommended maximum size of an Opus packet
let mut output = vec![0u8; 4000];
let encoded_size = self.encoder.encode(
input,
&mut output
).map_err(|e| Error::internal(format!("Failed to encode audio: {}", e)))?;
output.truncate(encoded_size);
Ok(output)
}
pub fn decode(&mut self, input: &[u8]) -> Result<Vec<i16>> {
let channel_count = match self.channels {
Channels::Mono => 1,
Channels::Stereo => 2,
};
// Room for one 20 ms frame: (sample_rate / 50) samples per channel
let max_size = (self.sample_rate as usize / 50) * channel_count;
let mut output = vec![0i16; max_size];
let decoded_samples = self.decoder.decode(
input,
&mut output,
false
).map_err(|e| Error::internal(format!("Failed to decode audio: {}", e)))?;
// The decoder reports samples per channel; the buffer is interleaved
output.truncate(decoded_samples * channel_count);
Ok(output)
}
}
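
A round-trip sketch under the simplest assumptions (48 kHz mono and one 20 ms frame, i.e. 960 samples; Opus only accepts a fixed set of frame durations):

```rust
use opus::Channels;

fn demo() -> gb_core::Result<()> {
    let mut processor = AudioProcessor::new(48000, Channels::Mono)?;
    let pcm = vec![0i16; 960];                // 20 ms of silence at 48 kHz mono
    let packet = processor.encode(&pcm)?;     // compressed Opus packet
    let decoded = processor.decode(&packet)?; // back to PCM
    assert_eq!(decoded.len(), pcm.len());
    Ok(())
}
```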

View file

@ -1,5 +0,0 @@
mod processor;
mod webrtc;
pub use processor::MediaProcessor;
pub use webrtc::WebRTCService;

View file

@ -1,82 +0,0 @@
use gb_core::{Result, Error};
use gstreamer as gst;
use gstreamer::prelude::*;
use std::path::PathBuf;
use tracing::{error, instrument};
pub struct MediaProcessor {
pipeline: gst::Pipeline,
}
impl MediaProcessor {
pub fn new() -> Result<Self> {
gst::init().map_err(|e| Error::internal(format!("Failed to initialize GStreamer: {}", e)))?;
let pipeline = gst::Pipeline::new();
Ok(Self { pipeline })
}
fn setup_pipeline(&mut self) -> Result<()> {
self.pipeline.set_state(gst::State::Playing)
.map_err(|e| Error::internal(format!("Failed to start pipeline: {}", e)))?;
Ok(())
}
fn process_messages(&self) -> Result<()> {
let bus = self.pipeline.bus().unwrap();
while let Some(msg) = bus.timed_pop(gst::ClockTime::from_seconds(1)) {
match msg.view() {
gst::MessageView::Error(err) => {
error!("Error from {:?}: {} ({:?})",
err.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
return Err(Error::internal(format!("Pipeline error: {}", err.error())));
}
gst::MessageView::Eos(_) => break,
_ => ()
}
}
self.pipeline.set_state(gst::State::Null)
.map_err(|e| Error::internal(format!("Failed to stop pipeline: {}", e)))?;
Ok(())
}
#[instrument(skip(self, input_path, output_path))]
pub async fn transcode(
&mut self,
input_path: PathBuf,
output_path: PathBuf,
format: &str
) -> Result<()> {
let source = gst::ElementFactory::make("filesrc")
.build()
.map_err(|e| Error::internal(format!("Failed to create source element: {}", e)))?;
source.set_property("location", input_path.to_str().unwrap());
let sink = gst::ElementFactory::make("filesink")
.build()
.map_err(|e| Error::internal(format!("Failed to create sink element: {}", e)))?;
sink.set_property("location", output_path.to_str().unwrap());
// NOTE: demuxers expose their output pads dynamically (via "pad-added"),
// so a production pipeline would link demuxer -> sink in that callback.
let decoder = match format.to_lowercase().as_str() {
"mp4" => gst::ElementFactory::make("qtdemux").build(),
"webm" => gst::ElementFactory::make("matroskademux").build(),
_ => return Err(Error::internal(format!("Unsupported format: {}", format)))
}.map_err(|e| Error::internal(format!("Failed to create decoder: {}", e)))?;
self.pipeline.add_many(&[&source, &decoder, &sink])
.map_err(|e| Error::internal(format!("Failed to add elements: {}", e)))?;
gst::Element::link_many(&[&source, &decoder, &sink])
.map_err(|e| Error::internal(format!("Failed to link elements: {}", e)))?;
self.setup_pipeline()?;
self.process_messages()
}
}
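
A sketch of the intended call, assuming GStreamer and the matching demuxer plugins are installed on the host:

```rust
use std::path::PathBuf;

async fn demo() -> gb_core::Result<()> {
    let mut processor = MediaProcessor::new()?;
    processor.transcode(
        PathBuf::from("input.mp4"),
        PathBuf::from("output.mp4"),
        "mp4",
    ).await
}
```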

View file

@ -1,85 +0,0 @@
use gb_core::{Result, Error};
use webrtc::{
api::{API, APIBuilder},
peer_connection::{
RTCPeerConnection,
configuration::RTCConfiguration,
},
track::{
track_local::TrackLocal,
track_remote::TrackRemote,
},
};
use tokio::sync::mpsc;
use tracing::instrument;
use std::sync::Arc;
pub struct WebRTCService {
api: Arc<API>,
peer_connections: Vec<Arc<RTCPeerConnection>>,
}
impl WebRTCService {
pub fn new() -> Result<Self> {
let api = APIBuilder::new().build();
Ok(Self {
api: Arc::new(api),
peer_connections: Vec::new(),
})
}
pub async fn create_peer_connection(&mut self) -> Result<Arc<RTCPeerConnection>> {
let config = RTCConfiguration::default();
let peer_connection = self.api.new_peer_connection(config)
.await
.map_err(|e| Error::internal(format!("Failed to create peer connection: {}", e)))?;
let pc_arc = Arc::new(peer_connection);
self.peer_connections.push(pc_arc.clone());
Ok(pc_arc)
}
pub async fn add_track(
&self,
pc: &RTCPeerConnection,
track: Arc<dyn TrackLocal + Send + Sync>,
) -> Result<()> {
pc.add_track(track)
.await
.map_err(|e| Error::internal(format!("Failed to add track: {}", e)))?;
Ok(())
}
// NOTE: this loop runs until the track channel closes, so callers
// typically spawn it on a dedicated task.
pub async fn on_track<F>(&self, pc: &RTCPeerConnection, mut callback: F)
where
F: FnMut(Arc<TrackRemote>) + Send + 'static,
{
let (tx, mut rx) = mpsc::channel(100);
pc.on_track(Box::new(move |track, _, _| {
let track_clone = track.clone();
let tx = tx.clone();
Box::pin(async move {
let _ = tx.send(track_clone).await;
})
}));
while let Some(track) = rx.recv().await {
callback(track);
}
}
#[instrument(skip(self))]
pub async fn close(&mut self) -> Result<()> {
for pc in self.peer_connections.iter() {
pc.close().await
.map_err(|e| Error::internal(format!("Failed to close peer connection: {}", e)))?;
}
self.peer_connections.clear();
Ok(())
}
}
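
A sketch of the offer side, assuming a Tokio runtime; note that RTCConfiguration::default() carries no ICE servers, so this only negotiates between directly reachable peers:

```rust
use gb_core::{Error, Result};

async fn demo() -> Result<()> {
    let mut service = WebRTCService::new()?;
    let pc = service.create_peer_connection().await?;
    let offer = pc.create_offer(None).await
        .map_err(|e| Error::internal(format!("Failed to create offer: {}", e)))?;
    pc.set_local_description(offer).await
        .map_err(|e| Error::internal(format!("Failed to set local description: {}", e)))?;
    // The resulting SDP would then travel to the remote peer over signaling.
    service.close().await
}
```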

View file

@ -1,26 +0,0 @@
[package]
name = "gb-messaging"
version = { workspace = true }
edition = { workspace = true }
authors = { workspace = true }
license = { workspace = true }
[dependencies]
gb-core = { path = "../gb-core" }
tokio= { workspace = true }
rdkafka= { workspace = true }
redis= { workspace = true }
serde= { workspace = true }
serde_json= { workspace = true }
uuid= { workspace = true }
async-trait= { workspace = true }
tracing= { workspace = true }
futures= { workspace = true }
futures-util = "0.3"
chrono = { version = "0.4", features = ["serde"] }
lapin = "2.3"
tokio-tungstenite = { workspace= true, features = ["native-tls"] }
[dev-dependencies]
rstest= { workspace = true }
tokio-test = "0.4"

View file

@ -1,53 +0,0 @@
use gb_core::Error;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::config::ClientConfig;
use std::time::Duration;
use serde::Serialize;
#[allow(dead_code)]
pub struct KafkaBroker {
producer: FutureProducer,
// Stored for reconnection logic
broker_address: String,
// Stored for consumer group management
group_id: String,
}
impl KafkaBroker {
pub fn new(broker_address: &str, group_id: &str) -> Self {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", broker_address)
.set("message.timeout.ms", "5000")
.create()
.expect("Producer creation failed");
Self {
producer,
broker_address: broker_address.to_string(),
group_id: group_id.to_string(),
}
}
pub async fn publish<T: Serialize>(
&self,
topic: &str,
key: &str,
message: &T,
) -> Result<(), Error> {
let payload = serde_json::to_string(message)
.map_err(|e| Error::internal(format!("Serialization failed: {}", e)))?;
self.producer
.send(
FutureRecord::to(topic)
.key(key)
.payload(&payload),
Duration::from_secs(5),
)
.await
.map_err(|(e, _)| Error::internal(format!("Failed to publish message: {}", e)))?;
Ok(())
}
}
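
Publishing is a fire-and-forget call with a 5-second send timeout; a sketch (broker address, topic, and payload type are illustrative):

```rust
use serde::Serialize;

#[derive(Serialize)]
struct Ping { seq: u32 }

async fn demo() -> Result<(), gb_core::Error> {
    let broker = KafkaBroker::new("localhost:9092", "example-group");
    broker.publish("health-checks", "ping-1", &Ping { seq: 1 }).await
}
```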

View file

@ -1,112 +0,0 @@
use gb_core::{Result, Error};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::consumer::{StreamConsumer, Consumer};
use rdkafka::ClientConfig;
use std::time::Duration;
use serde::Serialize;
#[allow(dead_code)]
pub struct Kafka {
broker_address: String,
group_id: String,
producer: FutureProducer,
consumer: StreamConsumer,
}
impl Kafka {
pub async fn new(broker_address: &str, group_id: &str) -> Result<Self> {
let producer = ClientConfig::new()
.set("bootstrap.servers", broker_address)
.create()
.map_err(|e| Error::kafka(format!("Failed to create producer: {}", e)))?;
let consumer = ClientConfig::new()
.set("bootstrap.servers", broker_address)
.set("group.id", group_id)
.create()
.map_err(|e| Error::kafka(format!("Failed to create consumer: {}", e)))?;
Ok(Self {
broker_address: broker_address.to_string(),
group_id: group_id.to_string(),
producer,
consumer,
})
}
pub async fn publish<T: Serialize>(&self, topic: &str, message: &T) -> Result<()> {
let payload = serde_json::to_string(message)
.map_err(|e| Error::internal(format!("Serialization error: {}", e)))?;
self.producer
.send(
FutureRecord::to(topic)
.payload(payload.as_bytes())
.key(""),
Duration::from_secs(0),
)
.await
.map_err(|(e, _)| Error::kafka(format!("Failed to send message: {}", e)))?;
Ok(())
}
pub async fn subscribe(&self, topic: &str) -> Result<()> {
self.consumer
.subscribe(&[topic])
.map_err(|e| Error::kafka(format!("Failed to subscribe: {}", e)))?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use rstest::*;
use tokio;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct TestMessage {
id: Uuid,
content: String,
}
#[fixture]
fn test_message() -> TestMessage {
TestMessage {
id: Uuid::new_v4(),
content: "test message".to_string(),
}
}
#[fixture]
async fn kafka() -> Kafka {
Kafka::new(
"localhost:9092",
"test-group",
).await.unwrap()
}
#[rstest]
#[tokio::test]
async fn test_publish_subscribe(
#[future] kafka: Kafka,
test_message: TestMessage
) {
let topic = "test-topic";
let kafka = kafka.await;
kafka.publish(topic, &test_message)
.await
.unwrap();
kafka.subscribe(topic)
.await
.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
}
}

View file

@ -1,88 +0,0 @@
mod kafka;
mod redis_pubsub;
mod websocket;
mod processor;
pub mod models;
pub use kafka::Kafka;
pub use redis_pubsub::RedisPubSub;
pub use websocket::WebSocketClient;
pub use processor::MessageProcessor;
pub use models::MessageEnvelope;
mod broker;
pub use broker::KafkaBroker;
#[cfg(test)]
mod tests {
use super::*;
use gb_core::models::Message;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use std::sync::Arc;
use redis::Client;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct TestMessage {
id: Uuid,
content: String,
}
#[tokio::test]
async fn test_messaging_integration() {
let kafka = KafkaBroker::new(
"localhost:9092",
"test-group",
);
let redis_client = Client::open("redis://localhost")
.expect("Failed to create Redis client");
let redis = RedisPubSub::new(Arc::new(redis_client));
let mut websocket = WebSocketClient::connect("ws://localhost:8080")
.await
.unwrap();
let test_message = TestMessage {
id: Uuid::new_v4(),
content: "integration test".to_string(),
};
kafka.publish("test-topic", &test_message.id.to_string(), &test_message)
.await
.unwrap();
redis.publish("test-channel", &test_message)
.await
.unwrap();
websocket.send_message(&serde_json::to_string(&test_message).unwrap())
.await
.unwrap();
let mut processor = MessageProcessor::new();
processor.register_handler("test", |envelope| {
println!("Processed message: {}", envelope.message.content);
Ok(())
});
let message = Message {
id: Uuid::new_v4(),
customer_id: Uuid::new_v4(),
instance_id: Uuid::new_v4(),
conversation_id: Uuid::new_v4(),
sender_id: Uuid::new_v4(),
kind: "test".to_string(),
content: "test content".to_string(),
metadata: serde_json::Value::Object(serde_json::Map::new()),
created_at: Some(chrono::Utc::now()),
shard_key: Some(0),
};
let envelope = MessageEnvelope {
id: Uuid::new_v4(),
message,
metadata: std::collections::HashMap::new(),
};
processor.sender().send(envelope).unwrap();
}
}

View file

@ -1,11 +0,0 @@
use gb_core::models::Message;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageEnvelope {
pub id: Uuid,
pub message: Message,
pub metadata: HashMap<String, String>,
}

View file

@ -1,150 +0,0 @@
use crate::MessageEnvelope;
use gb_core::{Error, Result, models::*}; // brings in both Message and MessageId
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{error, instrument};
use uuid::Uuid;
pub struct MessageProcessor {
tx: broadcast::Sender<MessageEnvelope>,
rx: broadcast::Receiver<MessageEnvelope>,
handlers: Arc<HashMap<String, Box<dyn Fn(MessageEnvelope) -> Result<()> + Send + Sync>>>,
}
impl Clone for MessageProcessor {
fn clone(&self) -> Self {
MessageProcessor {
tx: self.tx.clone(),
rx: self.tx.subscribe(),
handlers: Arc::clone(&self.handlers),
}
}
}
impl MessageProcessor {
pub fn new() -> Self {
Self::new_with_buffer_size(100)
}
pub fn new_with_buffer_size(buffer_size: usize) -> Self {
let (tx, rx) = broadcast::channel(buffer_size);
Self {
tx,
rx,
handlers: Arc::new(HashMap::new()),
}
}
pub fn sender(&self) -> broadcast::Sender<MessageEnvelope> {
self.tx.clone()
}
#[instrument(skip(self, handler))]
pub fn register_handler<F>(&mut self, kind: &str, handler: F)
where
F: Fn(MessageEnvelope) -> Result<()> + Send + Sync + 'static,
{
// Arc::get_mut only succeeds while this processor holds the sole reference,
// so handlers must be registered before the processor is cloned.
Arc::get_mut(&mut self.handlers)
.expect("handlers must be registered before the processor is cloned")
.insert(kind.to_string(), Box::new(handler));
}
#[instrument(skip(self))]
pub async fn add_message(&mut self, message: Message) -> Result<MessageId> {
let envelope = MessageEnvelope {
id: Uuid::new_v4(),
message,
metadata: HashMap::new(),
};
self.tx.send(envelope.clone())
.map_err(|e| Error::internal(format!("Failed to queue message: {}", e)))?;
// Start processing immediately
if let Some(handler) = self.handlers.get(&envelope.message.kind) {
handler(envelope.clone())
.map_err(|e| Error::internal(format!("Handler error: {}", e)))?;
}
Ok(MessageId(envelope.id))
}
#[instrument(skip(self))]
pub async fn process_messages(&mut self) -> Result<()> {
while let Ok(envelope) = self.rx.recv().await {
if let Some(handler) = self.handlers.get(&envelope.message.kind) {
if let Err(e) = handler(envelope.clone()) {
error!("Handler error for message {}: {}", envelope.id, e);
}
tracing::info!("Processing message: {:?}", &envelope.message.id);
} else {
error!("No handler registered for message kind: {}", envelope.message.kind);
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use gb_core::models::Message;
use rstest::*;
use uuid::Uuid;
use std::{sync::Arc, time::Duration};
use tokio::sync::Mutex;
#[fixture]
fn test_message() -> Message {
Message {
id: Uuid::new_v4(),
customer_id: Uuid::new_v4(),
instance_id: Uuid::new_v4(),
conversation_id: Uuid::new_v4(),
sender_id: Uuid::new_v4(),
kind: "test".to_string(),
content: "test content".to_string(),
metadata: serde_json::Value::Object(serde_json::Map::new()),
created_at: Some(chrono::Utc::now()),
shard_key: Some(0),
}
}
#[rstest]
#[tokio::test]
async fn test_message_processor(test_message: Message) {
let mut processor = MessageProcessor::new();
let processed = Arc::new(Mutex::new(false));
let processed_clone = processed.clone();
processor.register_handler("test", move |envelope| {
assert_eq!(envelope.message.content, "test content");
// blocking_lock() would panic inside the async runtime that drives
// process_messages, so use the non-blocking variant here.
let mut processed = processed_clone.try_lock().expect("test lock should be uncontended");
*processed = true;
Ok(())
});
let mut processor_clone = processor.clone();
let handle = tokio::spawn(async move {
processor_clone.process_messages().await.unwrap();
});
let envelope = MessageEnvelope {
id: Uuid::new_v4(),
message: test_message,
metadata: HashMap::new(),
};
processor.sender().send(envelope).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
assert!(*processed.lock().await);
handle.abort();
}
}

View file

@ -1,123 +0,0 @@
use gb_core::{Result, Error};
use redis::{Client, AsyncCommands};
use serde::Serialize;
use std::sync::Arc;
use tracing::instrument;
use futures_util::StreamExt;
#[derive(Clone)]
pub struct RedisPubSub {
client: Arc<Client>,
}
impl RedisPubSub {
pub fn new(client: Arc<Client>) -> Self {
Self { client }
}
#[instrument(skip(self, payload), err)]
pub async fn publish<T: Serialize>(&self, channel: &str, payload: &T) -> Result<()> {
let mut conn = self.client
.get_async_connection()
.await
.map_err(|e| Error::internal(e.to_string()))?;
let serialized = serde_json::to_string(payload)
.map_err(|e| Error::internal(e.to_string()))?;
conn.publish::<_, _, i32>(channel, serialized)
.await
.map_err(|e| Error::internal(e.to_string()))?;
Ok(())
}
#[instrument(skip(self, handler), err)]
pub async fn subscribe<F>(&self, channels: &[&str], mut handler: F) -> Result<()>
where
F: FnMut(String, String) + Send + 'static,
{
let mut pubsub = self.client
.get_async_connection()
.await
.map_err(|e| Error::internal(e.to_string()))?
.into_pubsub();
for channel in channels {
pubsub.subscribe(*channel)
.await
.map_err(|e| Error::internal(e.to_string()))?;
}
let mut stream = pubsub.on_message();
while let Some(msg) = stream.next().await {
let channel = msg.get_channel_name().to_string();
let payload: String = msg.get_payload()
.map_err(|e| Error::internal(e.to_string()))?;
handler(channel, payload);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use redis::Client;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use tokio::sync::mpsc;
use std::time::Duration;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct TestMessage {
id: Uuid,
content: String,
}
async fn setup() -> (RedisPubSub, TestMessage) {
let client = Arc::new(Client::open("redis://127.0.0.1/").unwrap());
let redis_pubsub = RedisPubSub::new(client);
let test_message = TestMessage {
id: Uuid::new_v4(),
content: "test message".to_string(),
};
(redis_pubsub, test_message)
}
#[tokio::test]
async fn test_publish_subscribe() {
let (redis_pubsub, test_message) = setup().await;
let channel = "test_channel";
let (tx, mut rx) = mpsc::channel(1);
let pubsub_clone = redis_pubsub.clone();
tokio::spawn(async move {
let handler = move |_channel: String, payload: String| {
let received: TestMessage = serde_json::from_str(&payload).unwrap();
tx.try_send(received).unwrap();
};
pubsub_clone.subscribe(&[channel], handler).await.unwrap();
});
// Give the subscriber time to connect
tokio::time::sleep(Duration::from_millis(100)).await;
redis_pubsub.publish(channel, &test_message).await.unwrap();
let received = tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.unwrap()
.unwrap();
assert_eq!(received, test_message);
}
}

View file

@ -1,88 +0,0 @@
use futures_util::SinkExt;
use tokio_tungstenite::{connect_async, tungstenite::Message};
use gb_core::{Result, Error};
pub struct WebSocketClient {
stream: tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>
>,
}
impl WebSocketClient {
fn to_gb_error(error: tokio_tungstenite::tungstenite::Error) -> Error {
Error::new(
gb_core::ErrorKind::WebSocket(error.to_string()),
error.to_string()
)
}
pub async fn connect(url: &str) -> Result<Self> {
let (ws_stream, _) = connect_async(url).await.map_err(Self::to_gb_error)?;
Ok(Self {
stream: ws_stream,
})
}
pub async fn send_message(&mut self, payload: &str) -> Result<()> {
self.stream
.send(Message::Text(payload.to_string()))
.await
.map_err(Self::to_gb_error)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::StreamExt;
use rstest::*;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tokio::net::TcpListener;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct TestMessage {
id: Uuid,
content: String,
}
async fn create_test_server() -> String {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
while let Ok((stream, _)) = listener.accept().await {
let ws_stream = tokio_tungstenite::accept_async(stream).await.unwrap();
let (mut write, mut read) = ws_stream.split();
while let Some(Ok(msg)) = read.next().await {
if let Message::Text(_) = msg {
write.send(msg).await.unwrap();
}
}
}
});
format!("ws://{}", addr)
}
#[fixture]
fn test_message() -> TestMessage {
TestMessage {
id: Uuid::new_v4(),
content: "test message".to_string(),
}
}
#[rstest]
#[tokio::test]
async fn test_websocket(test_message: TestMessage) {
let server_url = create_test_server().await;
let mut client = WebSocketClient::connect(&server_url).await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
client.send_message(&serde_json::to_string(&test_message).unwrap()).await.unwrap();
}
}

View file

@ -1,37 +0,0 @@
-- Create users table
CREATE TABLE IF NOT EXISTS users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
email VARCHAR(255) NOT NULL UNIQUE,
name VARCHAR(255),
password_hash VARCHAR(255) NOT NULL,
role VARCHAR(50) NOT NULL DEFAULT 'user',
status VARCHAR(50) NOT NULL DEFAULT 'active',
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- Create sessions table
CREATE TABLE IF NOT EXISTS sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
refresh_token VARCHAR(255) NOT NULL UNIQUE,
expires_at TIMESTAMP WITH TIME ZONE NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- Create indexes
CREATE INDEX IF NOT EXISTS idx_users_email ON users(email);
CREATE INDEX IF NOT EXISTS idx_sessions_user_id ON sessions(user_id);
CREATE INDEX IF NOT EXISTS idx_sessions_refresh_token ON sessions(refresh_token);
-- Add password_hash column to users table
ALTER TABLE users
ADD COLUMN IF NOT EXISTS password_hash VARCHAR(255) NOT NULL DEFAULT '';
-- Rename the legacy password column only if it exists; fresh installs already use password_hash
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.columns
WHERE table_name = 'users' AND column_name = 'password') THEN
ALTER TABLE users RENAME COLUMN password TO password_hash;
END IF;
END $$;
-- Add metadata column to instances table
ALTER TABLE instances
ADD COLUMN IF NOT EXISTS metadata JSONB NOT NULL DEFAULT '{}';

View file

@ -1,22 +0,0 @@
[package]
name = "gb-migrations"
version = { workspace = true }
edition = { workspace = true }
authors = { workspace = true }
license = { workspace = true }
[[bin]]
name = "migrations"
path = "src/bin/migrations.rs"
[dependencies]
tokio= { workspace = true }
sqlx= { workspace = true }
tracing= { workspace = true }
uuid= { workspace = true }
chrono= { workspace = true }
serde_json= { workspace = true }
gb-core = { path = "../gb-core" }
[dev-dependencies]
rstest= { workspace = true }

View file

@ -1,19 +0,0 @@
use sqlx::PgPool;
use gb_migrations::run_migrations;
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
let database_url = std::env::var("DATABASE_URL")
.expect("DATABASE_URL must be set");
println!("Creating database connection pool...");
let pool = PgPool::connect(&database_url)
.await
.expect("Failed to create pool");
println!("Running migrations...");
run_migrations(&pool).await?;
println!("Migrations completed successfully!");
Ok(())
}

View file

@ -1,144 +0,0 @@
use sqlx::PgPool;
use tracing::info;
pub async fn run_migrations(pool: &PgPool) -> Result<(), sqlx::Error> {
info!("Running database migrations");
// Create tables
let table_queries = [
// Customers table
r#"CREATE TABLE IF NOT EXISTS customers (
id UUID PRIMARY KEY,
name VARCHAR(255) NOT NULL,
subscription_tier VARCHAR(50) NOT NULL,
status VARCHAR(50) NOT NULL,
max_instances INTEGER NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
)"#,
// Instances table
r#"CREATE TABLE IF NOT EXISTS instances (
id UUID PRIMARY KEY,
customer_id UUID NOT NULL REFERENCES customers(id),
name VARCHAR(255) NOT NULL,
status VARCHAR(50) NOT NULL,
shard_id INTEGER NOT NULL,
region VARCHAR(50) NOT NULL,
config JSONB NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
)"#,
// Rooms table
r#"CREATE TABLE IF NOT EXISTS rooms (
id UUID PRIMARY KEY,
customer_id UUID NOT NULL REFERENCES customers(id),
instance_id UUID NOT NULL REFERENCES instances(id),
name VARCHAR(255) NOT NULL,
kind VARCHAR(50) NOT NULL,
status VARCHAR(50) NOT NULL,
config JSONB NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
)"#,
// Messages table
r#"CREATE TABLE IF NOT EXISTS messages (
id UUID PRIMARY KEY,
customer_id UUID NOT NULL REFERENCES customers(id),
instance_id UUID NOT NULL REFERENCES instances(id),
conversation_id UUID NOT NULL,
sender_id UUID NOT NULL,
kind VARCHAR(50) NOT NULL,
content TEXT NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
shard_key INTEGER NOT NULL
)"#,
// Users table
r#"CREATE TABLE IF NOT EXISTS users (
id UUID PRIMARY KEY,
customer_id UUID NOT NULL REFERENCES customers(id),
instance_id UUID NOT NULL REFERENCES instances(id),
name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE,
status VARCHAR(50) NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
)"#,
// Tracks table
r#"CREATE TABLE IF NOT EXISTS tracks (
id UUID PRIMARY KEY,
room_id UUID NOT NULL REFERENCES rooms(id),
user_id UUID NOT NULL REFERENCES users(id),
kind VARCHAR(50) NOT NULL,
status VARCHAR(50) NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
)"#,
// Subscriptions table
r#"CREATE TABLE IF NOT EXISTS subscriptions (
id UUID PRIMARY KEY,
track_id UUID NOT NULL REFERENCES tracks(id),
user_id UUID NOT NULL REFERENCES users(id),
status VARCHAR(50) NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
)"#,
];
// Create indexes
let index_queries = [
"CREATE INDEX IF NOT EXISTS idx_instances_customer_id ON instances(customer_id)",
"CREATE INDEX IF NOT EXISTS idx_rooms_instance_id ON rooms(instance_id)",
"CREATE INDEX IF NOT EXISTS idx_messages_conversation_id ON messages(conversation_id)",
"CREATE INDEX IF NOT EXISTS idx_messages_shard_key ON messages(shard_key)",
"CREATE INDEX IF NOT EXISTS idx_tracks_room_id ON tracks(room_id)",
"CREATE INDEX IF NOT EXISTS idx_subscriptions_track_id ON subscriptions(track_id)",
"CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)",
];
// Execute table creation queries
for query in table_queries {
sqlx::query(query)
.execute(pool)
.await?;
}
// Execute index creation queries
for query in index_queries {
sqlx::query(query)
.execute(pool)
.await?;
}
info!("Migrations completed successfully");
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use sqlx::postgres::{PgPoolOptions, PgPool};
use rstest::*;
async fn create_test_pool() -> PgPool {
let database_url = std::env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://postgres:postgres@localhost/gb_test".to_string());
PgPoolOptions::new()
.max_connections(5)
.connect(&database_url)
.await
.expect("Failed to create test pool")
}
#[rstest]
#[tokio::test]
async fn test_migrations() {
let pool = create_test_pool().await;
assert!(run_migrations(&pool).await.is_ok());
}
}

View file

@ -1,21 +0,0 @@
[package]
name = "gb-monitoring"
version= { workspace = true }
edition= { workspace = true }
[dependencies]
opentelemetry = { version = "0.19", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.12", features = ["tonic"] }
tracing= { workspace = true }
tracing-subscriber= { workspace = true }
thiserror= { workspace = true }
prometheus= { workspace = true }
gb-core = { path = "../gb-core" }
lazy_static = "1.4"
tokio= { workspace = true }
serde= { workspace = true }
serde_json= { workspace = true }
[dev-dependencies]
rstest= { workspace = true }
tokio-test = "0.4"

View file

@ -1,40 +0,0 @@
mod logging;
mod metrics;
mod telemetry;
pub use logging::init_logging;
pub use metrics::Metrics;
pub use telemetry::Telemetry;
#[cfg(test)]
mod tests {
use super::*;
use tracing::info;
#[tokio::test]
async fn test_monitoring_integration() {
// Initialize logging
init_logging("gb").unwrap();
// Initialize metrics
let metrics = Metrics::new();
// Initialize telemetry
Telemetry::new("test-service").await.unwrap();
// Test logging with metrics
info!(
active_connections = metrics.active_connections.get() as i64,
"System initialized"
);
// Simulate some activity
metrics.set_active_connections(1);
metrics.increment_message_count();
metrics.observe_processing_time(0.1);
// Verify metrics
assert_eq!(metrics.active_connections.get(), 1);
}
}

View file

@ -1,40 +0,0 @@
use tracing::subscriber::set_global_default;
use tracing_subscriber::{
fmt::{format::FmtSpan, time},
EnvFilter,
layer::SubscriberExt,
Registry,
};
pub fn init_logging(_service_name: &str) -> Result<(), Box<dyn std::error::Error>> {
let env_filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("info"));
let formatting_layer = tracing_subscriber::fmt::layer()
.with_timer(time::time())
.with_target(true)
.with_thread_ids(true)
.with_span_events(FmtSpan::CLOSE)
.with_file(true)
.with_line_number(true);
let subscriber = Registry::default()
.with(env_filter)
.with(formatting_layer);
set_global_default(subscriber)?; // Use ? instead of expect
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use tracing::info;
#[test]
fn test_logging_initialization() {
// Setting the global subscriber fails if one is already installed;
// ignore the result for this smoke test.
let _ = init_logging("gb");
info!("Test log message");
}
}
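
Typical startup wiring, assuming nothing else has installed a global subscriber (the filter comes from RUST_LOG, defaulting to info):

```rust
fn main() -> Result<(), Box<dyn std::error::Error>> {
    init_logging("gb")?;
    tracing::info!("service starting");
    Ok(())
}
```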

View file

@ -1,82 +0,0 @@
use prometheus::{IntCounter, IntGauge, Histogram, Registry};
pub struct Metrics {
registry: Registry,
message_counter: IntCounter,
pub active_connections: IntGauge,
message_processing_time: Histogram,
}
impl Metrics {
pub fn new() -> Self {
let registry = Registry::new();
let message_counter = IntCounter::new(
"message_total",
"Total number of messages processed"
).unwrap();
let active_connections = IntGauge::new(
"active_connections",
"Number of active connections"
).unwrap();
let message_processing_time = Histogram::with_opts(
prometheus::HistogramOpts::new(
"message_processing_seconds",
"Time spent processing messages"
).buckets(vec![0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0])
).unwrap();
registry.register(Box::new(message_counter.clone())).unwrap();
registry.register(Box::new(active_connections.clone())).unwrap();
registry.register(Box::new(message_processing_time.clone())).unwrap();
Self {
registry,
message_counter,
active_connections,
message_processing_time,
}
}
pub fn increment_message_count(&self) {
self.message_counter.inc();
}
pub fn observe_processing_time(&self, duration_seconds: f64) {
self.message_processing_time.observe(duration_seconds);
}
pub fn set_active_connections(&self, count: i64) {
self.active_connections.set(count);
}
pub fn registry(&self) -> &Registry {
&self.registry
}
}
#[cfg(test)]
mod tests {
use prometheus::Encoder as _;
use super::*;
#[test]
fn test_metrics() {
let metrics = Metrics::new();
metrics.increment_message_count();
assert_eq!(metrics.message_counter.get(), 1);
metrics.set_active_connections(10);
assert_eq!(metrics.active_connections.get(), 10);
metrics.observe_processing_time(0.5);
let mut buffer = Vec::new();
let encoder = prometheus::TextEncoder::new();
encoder.encode(&metrics.registry().gather(), &mut buffer).unwrap();
assert!(!buffer.is_empty());
}
}

View file

@ -1,65 +0,0 @@
use opentelemetry::{
sdk::{trace, Resource},
runtime::Tokio,
KeyValue,
};
use opentelemetry_otlp::{Protocol, WithExportConfig};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum TelemetryError {
#[error("Failed to initialize tracer: {0}")]
Init(String),
}
#[allow(dead_code)]
pub struct Telemetry {
tracer: trace::Tracer
}
impl Telemetry {
pub async fn new(service_name: &str) -> Result<Self, TelemetryError> {
let tracer = Self::init_tracer(service_name)
.await
.map_err(|e| TelemetryError::Init(e.to_string()))?;
Ok(Self { tracer })
}
async fn init_tracer(service_name: &str) -> Result<trace::Tracer, TelemetryError> {
let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_protocol(Protocol::Grpc);
let resource = Resource::new(vec![
KeyValue::new("service.name", service_name.to_string()),
]);
let config = trace::config().with_resource(resource);
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(exporter)
.with_trace_config(config)
.install_batch(Tokio)
.map_err(|e| TelemetryError::Init(e.to_string()))?;
Ok(tracer)
}
}
impl Drop for Telemetry {
fn drop(&mut self) {
opentelemetry::global::shutdown_tracer_provider();
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_telemetry_creation() {
let telemetry = Telemetry::new("test-service").await;
assert!(telemetry.is_ok());
}
}

View file

@ -1,37 +0,0 @@
[package]
name = "gb-server"
version = { workspace = true }
edition = { workspace = true }
authors = { workspace = true }
license = { workspace = true }
[dependencies]
gb-core = { path = "../gb-core" }
gb-messaging = { path = "../gb-messaging" }
gb-monitoring = { path = "../gb-monitoring" }
gb-file = { path = "../gb-file" }
tokio = { workspace = true, features = ["full", "macros", "rt-multi-thread"] }
axum = { workspace = true, features = ["ws", "multipart", "macros"] }
serde = { workspace = true }
serde_json = { workspace = true }
uuid = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
async-trait = { workspace = true }
futures-util = { workspace = true, features = ["sink"] } # Now valid, as futures-util is in workspace.dependencies
chrono = { workspace = true, features = ["serde"] }
tokio-stream = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres", "chrono", "uuid"] }
redis = { workspace = true, features = ["tokio-comp"] }
hyper = { workspace = true, features = ["server"] }
hyper-util = { workspace = true }
tower = { workspace = true }
tower-http = { workspace = true, features = ["cors", "trace"] }
actix-web = { workspace = true }
dotenv = { workspace = true }
log = { workspace = true }
vector = "0.4.1"
[dev-dependencies]
rstest = { workspace = true }
tokio-test = { workspace = true }

View file

View file

@ -1,72 +0,0 @@
use log::info;
use actix_web::{middleware, web, App, HttpServer};
use gb_core::models;
use tracing_subscriber::fmt::format::FmtSpan;
use dotenv::dotenv;
use gb_core::config::AppConfig;
use gb_core::db::{init_minio, init_postgres};
use gb_file::handlers::upload_file;
#[actix_web::main]
async fn main() -> std::io::Result<()> {
dotenv().ok();
// Initialize tracing
tracing_subscriber::fmt()
.with_span_events(FmtSpan::CLOSE)
.init();
// TODO: forward logs to a Vector agent (e.g. 127.0.0.1:9000) once a
// log::Log implementation for it is in place; until then, tracing handles output.
info!("Starting gb-server");
// Load configuration
let config = AppConfig::from_env();
// TODO: /gbo/bin/storage$ ./minio server ../../data/storage/
// Initialize databases and services
let db_pool = init_postgres(&config).await.expect("Failed to connect to PostgreSQL");
let minio_client = init_minio(&config).await.expect("Failed to initialize Minio");
let app_state = web::Data::new(models::AppState {
config: Some(config.clone()),
db_pool: Some(db_pool),
minio_client: Some(minio_client),
});
// Start HTTP server
HttpServer::new(move || {
App::new()
.wrap(middleware::Logger::default())
.wrap(middleware::Compress::default())
.app_data(app_state.clone())
.service(upload_file)
})
.bind((config.server.host.clone(), config.server.port))?
.run()
.await
}

View file

@ -1,24 +0,0 @@
[package]
name = "gb-storage"
version = { workspace = true }
edition = { workspace = true }
authors = { workspace = true }
license = { workspace = true }
[dependencies]
gb-core = { path = "../gb-core" }
tokio= { workspace = true }
sqlx= { workspace = true }
redis= { workspace = true }
tracing= { workspace = true }
async-trait= { workspace = true }
serde= { workspace = true }
serde_json= { workspace = true }
uuid= { workspace = true }
chrono= { workspace = true }
tikv-client = "0.1"
[dev-dependencies]
rstest= { workspace = true }
mockall= { workspace = true }
tokio-test = "0.4"

View file

@ -1,7 +0,0 @@
mod postgres;
mod redis;
mod tikv;
pub use postgres::PostgresCustomerRepository;
pub use redis::RedisStorage;
pub use tikv::TiKVStorage;

View file

@ -1,21 +0,0 @@
use gb_core::models::{CustomerStatus, SubscriptionTier};
impl From<SubscriptionTier> for String {
fn from(tier: SubscriptionTier) -> Self {
match tier {
SubscriptionTier::Free => "free".to_string(),
SubscriptionTier::Pro => "pro".to_string(),
SubscriptionTier::Enterprise => "enterprise".to_string(),
}
}
}
impl From<CustomerStatus> for String {
fn from(status: CustomerStatus) -> Self {
match status {
CustomerStatus::Active => "active".to_string(),
CustomerStatus::Inactive => "inactive".to_string(),
CustomerStatus::Suspended => "suspended".to_string(),
}
}
}
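
These conversions let enum values be bound directly as SQL text parameters, e.g.:

```rust
fn demo() {
    let tier: String = SubscriptionTier::Pro.into();
    assert_eq!(tier, "pro");
    let status: String = CustomerStatus::Active.into();
    assert_eq!(status, "active");
}
```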

View file

@ -1,229 +0,0 @@
use gb_core::{
Result, Error,
models::{Customer, CustomerStatus, SubscriptionTier},
};
use sqlx::{PgPool, Row, postgres::PgRow};
use std::sync::Arc;
use chrono::Utc;
#[allow(dead_code)]
#[async_trait::async_trait]
pub trait CustomerRepository: Send + Sync {
async fn create(&self, customer: Customer) -> Result<Customer>;
async fn get_customer_by_id(&self, id: &str) -> Result<Option<Customer>>;
async fn update(&self, customer: Customer) -> Result<Customer>;
async fn delete(&self, id: &str) -> Result<()>;
}
trait ToDbString {
fn to_db_string(&self) -> String;
}
trait FromDbString: Sized {
fn from_db_string(s: &str) -> Result<Self>;
}
impl ToDbString for SubscriptionTier {
fn to_db_string(&self) -> String {
match self {
SubscriptionTier::Free => "free".to_string(),
SubscriptionTier::Pro => "pro".to_string(),
SubscriptionTier::Enterprise => "enterprise".to_string(),
}
}
}
impl ToDbString for CustomerStatus {
fn to_db_string(&self) -> String {
match self {
CustomerStatus::Active => "active".to_string(),
CustomerStatus::Inactive => "inactive".to_string(),
CustomerStatus::Suspended => "suspended".to_string(),
}
}
}
impl FromDbString for SubscriptionTier {
fn from_db_string(s: &str) -> Result<Self> {
match s {
"free" => Ok(SubscriptionTier::Free),
"pro" => Ok(SubscriptionTier::Pro),
"enterprise" => Ok(SubscriptionTier::Enterprise),
_ => Err(Error::internal(format!("Invalid subscription tier: {}", s))),
}
}
}
impl FromDbString for CustomerStatus {
fn from_db_string(s: &str) -> Result<Self> {
match s {
"active" => Ok(CustomerStatus::Active),
"inactive" => Ok(CustomerStatus::Inactive),
"suspended" => Ok(CustomerStatus::Suspended),
_ => Err(Error::internal(format!("Invalid customer status: {}", s))),
}
}
}
pub struct PostgresCustomerRepository {
pool: Arc<PgPool>,
}
#[async_trait::async_trait]
impl CustomerRepository for PostgresCustomerRepository {
async fn create(&self, customer: Customer) -> Result<Customer> {
let subscription_tier = customer.subscription_tier.to_db_string();
let status = customer.status.to_db_string();
let row = sqlx::query(
r#"
INSERT INTO customers (
id, name, email,
subscription_tier, status,
created_at, updated_at,
max_instances
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING *
"#
)
.bind(&customer.id)
.bind(&customer.name)
.bind(&customer.email)
.bind(&subscription_tier)
.bind(&status)
.bind(&customer.created_at)
.bind(&customer.updated_at)
.bind(customer.max_instances as i32)
.fetch_one(&*self.pool)
.await
.map_err(|e| Error::internal(format!("Database error: {}", e)))?;
Self::row_to_customer(&row).await
}
async fn get_customer_by_id(&self, id: &str) -> Result<Option<Customer>> {
let maybe_row = sqlx::query(
"SELECT * FROM customers WHERE id = $1"
)
.bind(id)
.fetch_optional(&*self.pool)
.await
.map_err(|e| Error::internal(format!("Database error: {}", e)))?;
if let Some(row) = maybe_row {
Ok(Some(Self::row_to_customer(&row).await?))
} else {
Ok(None)
}
}
async fn update(&self, customer: Customer) -> Result<Customer> {
let subscription_tier = customer.subscription_tier.to_db_string();
let status = customer.status.to_db_string();
let row = sqlx::query(
r#"
UPDATE customers
SET name = $2,
email = $3,
subscription_tier = $4,
status = $5,
updated_at = $6,
max_instances = $7
WHERE id = $1
RETURNING *
"#
)
.bind(&customer.id)
.bind(&customer.name)
.bind(&customer.email)
.bind(&subscription_tier)
.bind(&status)
.bind(Utc::now())
.bind(customer.max_instances as i32)
.fetch_one(&*self.pool)
.await
.map_err(|e| Error::internal(format!("Database error: {}", e)))?;
Self::row_to_customer(&row).await
}
async fn delete(&self, id: &str) -> Result<()> {
sqlx::query("DELETE FROM customers WHERE id = $1")
.bind(id)
.execute(&*self.pool)
.await
.map_err(|e| Error::internal(format!("Database error: {}", e)))?;
Ok(())
}
}
impl PostgresCustomerRepository {
pub fn new(pool: Arc<PgPool>) -> Self {
Self { pool }
}
async fn row_to_customer(row: &PgRow) -> Result<Customer> {
Ok(Customer {
id: row.try_get("id").map_err(|e| Error::internal(e.to_string()))?,
name: row.try_get("name").map_err(|e| Error::internal(e.to_string()))?,
email: row.try_get("email").map_err(|e| Error::internal(e.to_string()))?,
subscription_tier: SubscriptionTier::from_db_string(
row.try_get("subscription_tier").map_err(|e| Error::internal(e.to_string()))?
)?,
status: CustomerStatus::from_db_string(
row.try_get("status").map_err(|e| Error::internal(e.to_string()))?
)?,
created_at: row.try_get("created_at").map_err(|e| Error::internal(e.to_string()))?,
updated_at: row.try_get("updated_at").map_err(|e| Error::internal(e.to_string()))?,
max_instances: {
let value: i32 = row.try_get("max_instances")
.map_err(|e| Error::internal(e.to_string()))?;
if value < 0 {
                    return Err(Error::internal("max_instances cannot be negative".to_string()));
}
value as u32
},
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use uuid::Uuid;
#[allow(dead_code)]
fn create_test_customer() -> Customer {
Customer {
id: Uuid::new_v4(),
name: "Test Customer".to_string(),
email: "test@example.com".to_string(),
subscription_tier: SubscriptionTier::Free,
status: CustomerStatus::Active,
created_at: Utc::now(),
updated_at: Utc::now(),
max_instances: 1,
}
}
    // Database-backed tests go here. Example (requires a test database;
    // `setup_test_db` is a placeholder helper, not defined in this crate):
/*
#[sqlx::test]
async fn test_create_customer() {
let pool = setup_test_db().await;
let repo = PostgresCustomerRepository::new(Arc::new(pool));
let customer = create_test_customer();
let created = repo.create(customer.clone()).await.unwrap();
assert_eq!(created.id, customer.id);
assert_eq!(created.name, customer.name);
// ... more assertions
}
*/
}
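A minimal wiring sketch for this repository (a sketch, assuming a reachable
Postgres and a DATABASE_URL environment variable; error handling elided):

    let pool = sqlx::PgPool::connect(&std::env::var("DATABASE_URL")?).await?;
    let repo = PostgresCustomerRepository::new(std::sync::Arc::new(pool));
    let maybe_customer = repo.get_customer_by_id("some-uuid").await?;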


@ -1,79 +0,0 @@
use gb_core::{Result, Error};
use redis::{Client, Commands};
use serde::{de::DeserializeOwned, Serialize};
use std::time::Duration;
use tracing::instrument;
pub struct RedisStorage {
client: Client,
}
impl RedisStorage {
pub fn new(url: &str) -> Result<Self> {
let client = Client::open(url)
.map_err(|e| Error::internal(format!("Redis error: {}", e)))?;
Ok(Self { client })
}
#[allow(dependency_on_unit_never_type_fallback)]
#[instrument(skip(self))]
pub async fn set<T: Serialize + std::fmt::Debug>(&self, key: &str, value: &T) -> Result<()> {
let mut conn = self.client.get_connection()
.map_err(|e| Error::internal(format!("Redis error: {}", e)))?;
let serialized = serde_json::to_string(value)
.map_err(|e| Error::internal(format!("Serialization error: {}", e)))?;
conn.set(key, serialized)
.map_err(|e| Error::internal(format!("Redis error: {}", e)))?;
Ok(())
}
#[instrument(skip(self))]
pub async fn get<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
let mut conn = self.client.get_connection()
.map_err(|e| Error::internal(format!("Redis error: {}", e)))?;
let value: Option<String> = conn.get(key)
.map_err(|e| Error::internal(format!("Redis error: {}", e)))?;
match value {
Some(v) => {
let deserialized = serde_json::from_str(&v)
.map_err(|e| Error::internal(format!("Deserialization error: {}", e)))?;
Ok(Some(deserialized))
}
None => Ok(None)
}
}
#[instrument(skip(self))]
pub async fn delete(&self, key: &str) -> Result<bool> {
let mut conn = self.client.get_connection()
.map_err(|e| Error::internal(format!("Redis error: {}", e)))?;
conn.del(key)
.map_err(|e| Error::internal(format!("Redis error: {}", e)))
}
#[allow(dependency_on_unit_never_type_fallback)]
#[instrument(skip(self))]
pub async fn set_with_ttl<T: Serialize + std::fmt::Debug>(&self, key: &str, value: &T, ttl: Duration) -> Result<()> {
let mut conn = self.client.get_connection()
.map_err(|e| Error::internal(format!("Redis error: {}", e)))?;
let serialized = serde_json::to_string(value)
.map_err(|e| Error::internal(format!("Serialization error: {}", e)))?;
redis::pipe()
.set(key, serialized)
.expire(key, ttl.as_secs() as i64)
.query(&mut conn)
.map_err(|e| Error::internal(format!("Redis error: {}", e)))?;
Ok(())
}
}
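A usage sketch for RedisStorage (assuming a local Redis at
redis://127.0.0.1/; any Serialize + DeserializeOwned type works as the value):

    let storage = RedisStorage::new("redis://127.0.0.1/")?;
    storage.set("customer:1", &"hello".to_string()).await?;
    let cached: Option<String> = storage.get("customer:1").await?;
    storage.set_with_ttl("session:1", &"token".to_string(),
        std::time::Duration::from_secs(3600)).await?;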


@ -1,73 +0,0 @@
use gb_core::{Result, Error};
use tikv_client::{RawClient, Config, KvPair};
use tracing::{error, instrument};
pub struct TiKVStorage {
client: RawClient,
}
impl TiKVStorage {
pub async fn new(pd_endpoints: Vec<String>) -> Result<Self> {
        let _config = Config::default(); // currently unused; RawClient::new takes the PD endpoints directly
let client = RawClient::new(pd_endpoints)
.await
.map_err(|e| Error::internal(format!("TiKV error: {}", e)))?;
Ok(Self { client })
}
#[instrument(skip(self))]
pub async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
self.client
.get(key.to_vec())
.await
.map_err(|e| {
error!("TiKV get error: {}", e);
Error::internal(format!("TiKV error: {}", e))
})
}
#[instrument(skip(self))]
pub async fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
self.client
.put(key.to_vec(), value.to_vec())
.await
.map_err(|e| {
error!("TiKV put error: {}", e);
Error::internal(format!("TiKV error: {}", e))
})
}
#[instrument(skip(self))]
pub async fn delete(&self, key: &[u8]) -> Result<()> {
self.client
.delete(key.to_vec())
.await
.map_err(|e| {
error!("TiKV delete error: {}", e);
Error::internal(format!("TiKV error: {}", e))
})
}
#[instrument(skip(self))]
pub async fn batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KvPair>> {
self.client
.batch_get(keys)
.await
.map_err(|e| {
error!("TiKV batch get error: {}", e);
Error::internal(format!("TiKV error: {}", e))
})
}
#[instrument(skip(self))]
pub async fn scan(&self, start: &[u8], end: &[u8], limit: u32) -> Result<Vec<KvPair>> {
self.client
.scan(start.to_vec()..end.to_vec(), limit)
.await
.map_err(|e| {
error!("TiKV scan error: {}", e);
Error::internal(format!("TiKV error: {}", e))
})
}
}
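A usage sketch for TiKVStorage (assuming a PD endpoint on 127.0.0.1:2379;
keys and values are raw bytes):

    let storage = TiKVStorage::new(vec!["127.0.0.1:2379".to_string()]).await?;
    storage.put(b"k1", b"v1").await?;
    let value = storage.get(b"k1").await?;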


@ -1,76 +0,0 @@
[package]
name = "gb-testing"
version = { workspace = true }
edition = { workspace = true }
authors = { workspace = true }
license = { workspace = true }
# Define features
[features]
default = ["integration"] # Integration tests enabled by default
integration = [] # Feature for integration tests
load = [] # Feature for load tests
stress = [] # Feature for stress tests
chaos = [] # Feature for chaos tests
[dependencies]
gb-core = { path = "../gb-core" }
gb-auth = { path = "../gb-auth" }
gb-server = { path = "../gb-server" }
gb-file = { path = "../gb-file" }
anyhow = { workspace = true }
# Testing frameworks
goose = "0.17" # Load testing
criterion = { workspace = true, features = ["async_futures"] }
# Async Runtime
tokio = { workspace = true }
tokio-stream = { workspace = true }
async-trait = { workspace = true }
# HTTP Client
reqwest = { workspace = true, features = ["json", "stream"] }
hyper = { workspace = true, features = ["full"] }
# WebSocket Testing
tokio-tungstenite = { workspace = true }
tungstenite = { workspace = true }
# Database
sqlx = { workspace = true }
redis = { workspace = true }
# Metrics & Monitoring
prometheus = { workspace = true, features = ["process"] }
tracing = { workspace = true }
opentelemetry = { workspace = true }
# Serialization
serde = { workspace = true }
serde_json = { workspace = true }
# Utils
futures = { workspace = true }
rand = { workspace = true }
fake = { workspace = true, features = ["derive"] }
chrono = { workspace = true, features = ["serde"] }
uuid = { workspace = true, features = ["v4"] }
minio = { workspace = true }
actix-web = { workspace = true }
actix-multipart = { workspace = true }
sanitize-filename = { workspace = true }
tempfile = { workspace = true }
bytes = { workspace = true }
[dev-dependencies]
rstest = { workspace = true }
wiremock = "0.5"
assert_cmd = "2.0"
predicates = "3.0"
jmap-client = "0.1" # Check for the latest version on crates.io
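The feature flags select a suite at test time, matching the run script
further below, e.g.:

    cargo test -p gb-testing --test '*' --features load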


@ -1 +0,0 @@
cargo test --package gb-testing --test integration_email_list -- test_successful_email_list --exact --show-output


@ -1,57 +0,0 @@
load_test:
users: 100
duration: 300 # seconds
ramp_up: 60 # seconds
scenarios:
- auth
- api
- webrtc
performance_test:
iterations: 1000
warmup_iterations: 100
sample_size: 100
threads: 8
scenarios:
- api_latency
- database_queries
- media_processing
stress_test:
duration: 1800 # 30 minutes
concurrent_users: 1000
scenarios:
- continuous_requests
- websocket_connections
- file_uploads
chaos_test:
duration: 3600 # 1 hour
interval: 300 # 5 minutes between actions
actions:
- kill_random_pod
- network_partition
- resource_exhaustion
- disk_pressure
metrics:
prometheus:
enabled: true
port: 9090
grafana:
enabled: true
port: 3000
jaeger:
enabled: true
port: 16686
reports:
formats:
- json
- html
- pdf
output_dir: "./test-reports"
retain_days: 30
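One way to consume this file from the harness (a sketch; assumes a
serde_yaml dependency, and the file path is illustrative; only the
load_test section is modeled here):

    #[derive(serde::Deserialize)]
    struct LoadTestConfig { users: u32, duration: u64, ramp_up: u64, scenarios: Vec<String> }

    #[derive(serde::Deserialize)]
    struct TestConfig { load_test: LoadTestConfig }

    let cfg: TestConfig = serde_yaml::from_str(&std::fs::read_to_string("tests.yaml")?)?;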


@ -1,26 +0,0 @@
#!/bin/bash
set -e
echo "Running gb-testing test suite..."
# Run integration tests
echo "Running integration tests..."
cargo test --test '*' --features integration
# Run load tests
echo "Running load tests..."
cargo test --test '*' --features load
# Run performance benchmarks
#echo "Running performance benchmarks..."
#cargo bench
# Run stress tests
#echo "Running stress tests..."
#cargo test --test '*' --features stress
# Run chaos tests
#echo "Running chaos tests..."
#cargo test --test '*' --features chaos
echo "All tests completed!"


@ -1,20 +0,0 @@
pub struct ChaosTest;
impl ChaosTest {
pub async fn new() -> anyhow::Result<Self> {
// Initialize the ChaosTest struct
        Ok(ChaosTest)
}
pub async fn network_partition(&self) -> anyhow::Result<()> {
// Network partition test implementation
Ok(())
}
pub async fn resource_exhaustion(&self) -> anyhow::Result<()> {
// Resource exhaustion test implementation
Ok(())
}
}
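A minimal invocation sketch (the actions themselves are still stubs):

    let chaos = ChaosTest::new().await?;
    chaos.network_partition().await?;
    chaos.resource_exhaustion().await?;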


@ -1,13 +0,0 @@
use async_trait::async_trait;
use sqlx::PgPool;
pub struct IntegrationTest {
pub db_pool: PgPool,
}
#[async_trait]
pub trait IntegrationTestCase {
async fn setup(&mut self) -> anyhow::Result<()>;
async fn execute(&self) -> anyhow::Result<()>;
async fn teardown(&mut self) -> anyhow::Result<()>;
}
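A minimal driver sketch for IntegrationTestCase (a hypothetical helper, not
part of the crate) that guarantees teardown runs even when execute fails:

    pub async fn run_case<T: IntegrationTestCase>(case: &mut T) -> anyhow::Result<()> {
        case.setup().await?;
        let result = case.execute().await;
        case.teardown().await?;
        result
    }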

Some files were not shown because too many files have changed in this diff.