From f401c170d4913fe794dcbd79a48f88e56ac11eae Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Wed, 15 Oct 2025 12:45:15 -0300 Subject: [PATCH] - GET keyword for buckets. --- .vscode/launch.json | 75 ++-- Cargo.lock | 186 +++++++++ Cargo.toml | 1 + prompts/dev/shared.md | 1 + src/basic/keywords/get.rs | 256 ++++++++++--- src/basic/mod.rs | 2 +- src/bot/mod.rs | 2 +- src/config/mod.rs | 4 +- src/file/mod.rs | 67 ++-- src/llm_legacy/llm_local.rs | 4 +- src/main.rs | 357 ++++++++++-------- src/shared/state.rs | 2 +- .../annoucements.gbdialog/start.bas | 6 - 13 files changed, 668 insertions(+), 295 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 56451705..bf24a3b2 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -1,45 +1,36 @@ { - // Use IntelliSense to learn about possible attributes. - // Hover to view descriptions of existing attributes. - // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 - "version": "0.2.0", - "configurations": [ - { - "type": "lldb", - "request": "launch", - "name": "Debug executable 'gbserver'", - "cargo": { - "args": [ - "build", - "--bin=gbserver", - "--package=gbserver" - ], - "filter": { - "name": "gbserver", - "kind": "bin" - } - }, - "args": [], - "cwd": "${workspaceFolder}" - }, - { - "type": "lldb", - "request": "launch", - "name": "Debug unit tests in executable 'gbserver'", - "cargo": { - "args": [ - "test", - "--no-run", - "--bin=gbserver", - "--package=gbserver" - ], - "filter": { - "name": "gbserver", - "kind": "bin" - } - }, - "args": [], - "cwd": "${workspaceFolder}" + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. 
+ // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "lldb", + "request": "launch", + "name": "Debug executable 'botserver'", + "cargo": { + "args": ["build", "--bin=botserver", "--package=botserver"], + "filter": { + "name": "botserver", + "kind": "bin" } - ] + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in executable 'botserver'", + "cargo": { + "args": ["test", "--no-run", "--bin=botserver", "--package=botserver"], + "filter": { + "name": "botserver", + "kind": "bin" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + } + ] } diff --git a/Cargo.lock b/Cargo.lock index 788c45e9..3dfd3ef0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -267,6 +267,15 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" +[[package]] +name = "adobe-cmap-parser" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae8abfa9a4688de8fc9f42b3f013b6fffec18ed8a554f5f113577e0b9b3212a3" +dependencies = [ + "pom", +] + [[package]] name = "aead" version = "0.5.2" @@ -979,6 +988,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-padding" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" +dependencies = [ + "generic-array", +] + [[package]] name = "bmrng" version = "0.5.2" @@ -1020,6 +1038,7 @@ dependencies = [ "mailparse", "native-tls", "num-format", + "pdf-extract", "qdrant-client", "rand 0.9.2", "redis", @@ -1073,6 +1092,12 @@ version = "3.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" +[[package]] +name = "bytecount" +version = "0.6.9" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "175812e0be2bccb6abe50bb8d566126198344f707e304f45c648fd8f2cc0365e" + [[package]] name = "byteorder" version = "1.5.0" @@ -1133,6 +1158,15 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher", +] + [[package]] name = "cc" version = "1.2.41" @@ -1160,6 +1194,12 @@ dependencies = [ "nom 7.1.3", ] +[[package]] +name = "cff-parser" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31f5b6e9141c036f3ff4ce7b2f7e432b0f00dee416ddcd4f17741d189ddc2e9d" + [[package]] name = "cfg-if" version = "1.0.3" @@ -1820,6 +1860,15 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" +[[package]] +name = "ecb" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a8bfa975b1aec2145850fcaa1c6fe269a16578c44705a532ae3edc92b8881c7" +dependencies = [ + "cipher", +] + [[package]] name = "ecdsa" version = "0.14.8" @@ -1928,6 +1977,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "euclid" +version = "0.20.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bb7ef65b3777a325d1eeefefab5b6d4959da54747e33bd6258e789640f307ad" +dependencies = [ + "num-traits", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -2714,6 +2772,7 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01" dependencies = [ + "block-padding", "generic-array", ] @@ -3086,6 +3145,34 @@ version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" +[[package]] +name = "lopdf" +version = "0.38.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7184fdea2bc3cd272a1acec4030c321a8f9875e877b3f92a53f2f6033fdc289" +dependencies = [ + "aes", + "bitflags 2.9.4", + "cbc", + "ecb", + "encoding_rs", + "flate2", + "getrandom 0.3.3", + "indexmap 2.11.4", + "itoa", + "log", + "md-5", + "nom 8.0.0", + "nom_locate", + "rand 0.9.2", + "rangemap", + "sha2", + "stringprep", + "thiserror 2.0.17", + "ttf-parser", + "weezl", +] + [[package]] name = "lru" version = "0.12.5" @@ -3231,6 +3318,17 @@ dependencies = [ "memchr", ] +[[package]] +name = "nom_locate" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b577e2d69827c4740cba2b52efaad1c4cc7c73042860b199710b3575c68438d" +dependencies = [ + "bytecount", + "memchr", + "nom 8.0.0", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -3509,6 +3607,23 @@ dependencies = [ "hmac", ] +[[package]] +name = "pdf-extract" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28ba1758a3d3f361459645780e09570b573fc3c82637449e9963174c813a98" +dependencies = [ + "adobe-cmap-parser", + "cff-parser", + "encoding_rs", + "euclid", + "log", + "lopdf", + "postscript", + "type1-encoding-parser", + "unicode-normalization", +] + [[package]] name = "percent-encoding" version = "2.3.2" @@ -3585,6 +3700,12 @@ dependencies = [ "universal-hash", ] +[[package]] +name = "pom" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60f6ce597ecdcc9a098e7fddacb1065093a3d66446fa16c675e7e71d1b5c28e6" + [[package]] name = "portable-atomic" version = "1.11.1" @@ -3600,6 +3721,12 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "postscript" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"78451badbdaebaf17f053fd9152b3ffb33b516104eacb45e7864aaa9c712f306" + [[package]] name = "potential_utf" version = "0.1.3" @@ -3917,6 +4044,12 @@ dependencies = [ "getrandom 0.3.3", ] +[[package]] +name = "rangemap" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f93e7e49bb0bf967717f7bd674458b3d6b0c5f48ec7e3038166026a69fc22223" + [[package]] name = "redis" version = "0.27.6" @@ -4590,6 +4723,17 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "stringprep" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", + "unicode-properties", +] + [[package]] name = "strsim" version = "0.11.1" @@ -5083,6 +5227,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "ttf-parser" +version = "0.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2df906b07856748fa3f6e0ad0cbaa047052d4a7dd609e231c4f72cee8c36f31" + [[package]] name = "tungstenite" version = "0.20.1" @@ -5119,18 +5269,48 @@ dependencies = [ "utf-8", ] +[[package]] +name = "type1-encoding-parser" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3d6cc09e1a99c7e01f2afe4953789311a1c50baebbdac5b477ecf78e2e92a5b" +dependencies = [ + "pom", +] + [[package]] name = "typenum" version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" +[[package]] +name = "unicode-bidi" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" + [[package]] name = "unicode-ident" version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f63a545481291138910575129486daeaf8ac54aee4387fe7906919f7830c7d9d" +[[package]] +name = "unicode-normalization" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956" +dependencies = [ + "tinyvec", +] + +[[package]] +name = "unicode-properties" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" + [[package]] name = "unicode-width" version = "0.2.2" @@ -5443,6 +5623,12 @@ dependencies = [ "zip 0.6.6", ] +[[package]] +name = "weezl" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a751b3277700db47d3e574514de2eced5e54dc8a5436a3bf7a0b248b2cee16f3" + [[package]] name = "which" version = "8.0.0" diff --git a/Cargo.toml b/Cargo.toml index e57b074c..16e2c4e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,3 +59,4 @@ time = "0.3.44" aws-sdk-s3 = "1.108.0" headless_chrome = { version = "1.0.18", optional = true } rand = "0.9.2" +pdf-extract = "0.10.0" diff --git a/prompts/dev/shared.md b/prompts/dev/shared.md index 2984d187..a94fee75 100644 --- a/prompts/dev/shared.md +++ b/prompts/dev/shared.md @@ -2,6 +2,7 @@ MOST IMPORTANT CODE GENERATION RULES: - No placeholders, never comment/uncomment code, no explanations, no filler text. - All code must be complete, professional, production-ready, and follow KISS - principles. - NEVER return placeholders of any kind, neither commented code, only REAL PRODUCTION GRADE code. +- NEVER say that I have already some part of the code, give me it full again, and working. - Always increment logging with (all-in-one-line) info!, debug!, trace! 
to give birth to the console. - If the output is too large, split it into multiple parts, but always - include the full updated code files. - Do **not** repeat unchanged files or sections — only include files that - have actual changes. diff --git a/src/basic/keywords/get.rs b/src/basic/keywords/get.rs index 021fd4d0..383d6a6a 100644 --- a/src/basic/keywords/get.rs +++ b/src/basic/keywords/get.rs @@ -1,58 +1,147 @@ use crate::shared::models::UserSession; use crate::shared::state::AppState; -use log::info; +use log::{error, info}; use reqwest::{self, Client}; use rhai::{Dynamic, Engine}; use std::error::Error; +use std::path::Path; +use std::sync::Arc; -pub fn get_keyword(state: &AppState, _user: UserSession, engine: &mut Engine) { - let state_clone = state.clone(); +pub fn get_keyword(state: Arc, _user: UserSession, engine: &mut Engine) { + let state_clone = Arc::clone(&state); engine .register_custom_syntax(&["GET", "$expr$"], false, move |context, inputs| { + // Evaluate the URL expression let url = context.eval_expression_tree(&inputs[0])?; let url_str = url.to_string(); - if url_str.contains("..") { - return Err("URL contains invalid path traversal sequences like '..'.".into()); + info!("GET command executed: {}", url_str); + + // Enhanced security check for path traversal + if !is_safe_path(&url_str) { + return Err(Box::new(rhai::EvalAltResult::ErrorRuntime( + "URL contains invalid or unsafe path sequences".into(), + rhai::Position::NONE, + ))); } - let state_for_async = state_clone.clone(); + let state_for_async = Arc::clone(&state_clone); let url_for_async = url_str.clone(); - if url_str.starts_with("https://") { - info!("HTTPS GET request: {}", url_for_async); + // Create a channel to communicate the result back + let (tx, rx) = tokio::sync::oneshot::channel(); - let fut = execute_get(&url_for_async); - let result = - tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(fut)) - .map_err(|e| format!("HTTP request failed: {}", e))?; + // 
Spawn the async task without blocking + tokio::spawn(async move { + log::trace!("Async task started for GET operation: {}", url_for_async); - Ok(Dynamic::from(result)) - } else { - info!("Local file GET request from bucket: {}", url_for_async); + let result = if url_for_async.starts_with("https://") + || url_for_async.starts_with("http://") + { + info!("HTTP(S) GET request: {}", url_for_async); + execute_get(&url_for_async).await + } else { + info!("Local file GET request from bucket: {}", url_for_async); + get_from_bucket(&state_for_async, &url_for_async).await + }; - let fut = get_from_bucket(&state_for_async, &url_for_async); - let result = - tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(fut)) - .map_err(|e| format!("Bucket GET failed: {}", e))?; + // Send the result back through the channel + let _ = tx.send(result); + }); - Ok(Dynamic::from(result)) - } + // Block on receiving the result from the channel. + // This is acceptable because we're in a custom syntax handler. 
+ let result = match futures::executor::block_on(rx) { + Ok(inner) => inner.map_err(|e| { + Box::new(rhai::EvalAltResult::ErrorRuntime( + e.to_string().into(), + rhai::Position::NONE, + )) + })?, + Err(_) => { + return Err(Box::new(rhai::EvalAltResult::ErrorRuntime( + "Failed to receive result from async task".into(), + rhai::Position::NONE, + ))); + } + }; + + Ok(Dynamic::from(result)) }) .unwrap(); } +/// Enhanced security check for path traversal and unsafe paths +fn is_safe_path(path: &str) -> bool { + // Allow full URLs + if path.starts_with("https://") || path.starts_with("http://") { + return true; + } + + // Check for various path traversal patterns + if path.contains("..") { + return false; + } + + // Reject absolute paths (starting with /) + if path.starts_with('/') { + return false; + } + + // Reject Windows-style absolute paths + if path.len() >= 2 && path.chars().nth(1) == Some(':') { + return false; + } + + // Normalize and validate the path doesn't escape + if let Ok(normalized) = Path::new(path).canonicalize() { + // If canonicalize succeeds, verify it doesn't contain parent directory references + if normalized.to_string_lossy().contains("..") { + return false; + } + } + + true +} + pub async fn execute_get(url: &str) -> Result> { - info!("Starting execute_get with URL: {}", url); + log::trace!("Starting execute_get with URL: {}", url); + // Build secure HTTP client (removed danger_accept_invalid_certs) let client = Client::builder() - .danger_accept_invalid_certs(true) - .build()?; + .timeout(std::time::Duration::from_secs(30)) + .build() + .map_err(|e| { + error!("Failed to build HTTP client: {}", e); + e + })?; - let response = client.get(url).send().await?; - let content = response.text().await?; + let response = client.get(url).send().await.map_err(|e| { + error!("HTTP request failed for URL {}: {}", url, e); + e + })?; + // Check response status + if !response.status().is_success() { + let status = response.status(); + error!( + "HTTP request 
returned non-success status for URL {}: {}", + url, status + ); + return Err(format!("HTTP request failed with status: {}", status).into()); + } + + let content = response.text().await.map_err(|e| { + error!("Failed to read response text for URL {}: {}", url, e); + e + })?; + + log::trace!( + "Successfully executed GET request for URL: {}, content length: {}", + url, + content.len() + ); Ok(content) } @@ -60,25 +149,102 @@ pub async fn get_from_bucket( state: &AppState, file_path: &str, ) -> Result> { - info!("Getting file from bucket: {}", file_path); + log::trace!("Getting file from bucket: {}", file_path); - if let Some(s3_client) = &state.s3_client { - let bucket_name = std::env::var("DRIVE_ORG_PREFIX") - .map(|v| format!("{}-default", v)) - .unwrap_or_else(|_| "org-default".to_string()); - - let response = s3_client - .get_object() - .bucket(&bucket_name) - .key(file_path) - .send() - .await?; - - let data = response.body.collect().await?; - let content = String::from_utf8(data.into_bytes().to_vec())?; - - Ok(content) - } else { - Err("S3 client not configured".into()) + // Additional validation for file path + if !is_safe_path(file_path) { + error!("Unsafe file path detected: {}", file_path); + return Err("Invalid file path".into()); } + + // Ensure the S3 client is configured + let s3_client = match &state.s3_client { + Some(client) => client, + None => { + error!( + "S3 client not configured when trying to get file: {}", + file_path + ); + return Err("S3 client not configured".into()); + } + }; + + // Resolve the bucket name safely, handling missing configuration values + let bucket_name = { + let cfg = state + .config + .as_ref() + .ok_or_else(|| -> Box { + "App configuration missing".into() + })?; + + let org_prefix = &cfg.minio.org_prefix; + + // Validate org_prefix doesn't contain suspicious characters + if org_prefix.contains("..") || org_prefix.contains('/') { + error!("Invalid org_prefix in configuration: {}", org_prefix); + return Err("Invalid 
organization prefix in configuration".into()); + } + + format!("{}.default.gbai", org_prefix) + }; + + log::trace!("Using bucket: {} for file: {}", bucket_name, file_path); + + // Perform the S3 GetObject request + let response = s3_client + .get_object() + .bucket(&bucket_name) + .key(file_path) + .send() + .await + .map_err(|e| { + error!( + "S3 get_object failed for bucket {} key {}: {}", + bucket_name, file_path, e + ); + e + })?; + + // Collect the body bytes + let data = response.body.collect().await.map_err(|e| { + error!( + "Failed to collect S3 response body for bucket {} key {}: {}", + bucket_name, file_path, e + ); + e + })?; + + // Handle PDF files specially; otherwise treat as UTF‑8 text + let bytes = data.into_bytes().to_vec(); + + let content = if file_path.to_ascii_lowercase().ends_with(".pdf") { + // Extract text from PDF using the `pdf_extract` crate + match pdf_extract::extract_text_from_mem(&bytes) { + Ok(text) => text, + Err(e) => { + error!( + "Failed to extract text from PDF for bucket {} key {}: {}", + bucket_name, file_path, e + ); + return Err(format!("PDF extraction failed: {}", e).into()); + } + } + } else { + // Convert bytes to a UTF‑8 String + String::from_utf8(bytes).map_err(|e| { + error!( + "Failed to convert S3 response to UTF-8 for bucket {} key {}: {}", + bucket_name, file_path, e + ); + e + })? 
+ }; + + log::trace!( + "Successfully retrieved file from bucket: {}, content length: {}", + file_path, + content.len() + ); + Ok(content) } diff --git a/src/basic/mod.rs b/src/basic/mod.rs index c9ac133b..bd75f450 100644 --- a/src/basic/mod.rs +++ b/src/basic/mod.rs @@ -52,7 +52,7 @@ impl ScriptService { last_keyword(&mut engine); format_keyword(&mut engine); llm_keyword(&state, user.clone(), &mut engine); - get_keyword(&state, user.clone(), &mut engine); + get_keyword(state.clone(), user.clone(), &mut engine); set_keyword(&state, user.clone(), &mut engine); wait_keyword(&state, user.clone(), &mut engine); print_keyword(&state, user.clone(), &mut engine); diff --git a/src/bot/mod.rs b/src/bot/mod.rs index fb3f67e3..12964648 100644 --- a/src/bot/mod.rs +++ b/src/bot/mod.rs @@ -862,7 +862,7 @@ async fn auth_handler( data: web::Data, web::Query(params): web::Query>, ) -> Result { - let token = params.get("token").cloned().unwrap_or_default(); + let _token = params.get("token").cloned().unwrap_or_default(); let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap(); let bot_id = if let Ok(bot_guid) = std::env::var("BOT_GUID") { match Uuid::parse_str(&bot_guid) { diff --git a/src/config/mod.rs b/src/config/mod.rs index 705c4973..7f6892b6 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -27,7 +27,7 @@ pub struct DriveConfig { pub access_key: String, pub secret_key: String, pub use_ssl: bool, - pub bucket: String, + pub org_prefix: String, } #[derive(Clone)] @@ -107,7 +107,7 @@ impl AppConfig { .unwrap_or_else(|_| "false".to_string()) .parse() .unwrap_or(false), - bucket: env::var("DRIVE_ORG_PREFIX").unwrap_or_else(|_| "botserver".to_string()), + org_prefix: env::var("DRIVE_ORG_PREFIX").unwrap_or_else(|_| "botserver".to_string()), }; let email = EmailConfig { diff --git a/src/file/mod.rs b/src/file/mod.rs index f90c910d..c7120b9b 100644 --- a/src/file/mod.rs +++ b/src/file/mod.rs @@ -6,6 +6,7 @@ use std::io::Write; use 
tempfile::NamedTempFile; use tokio_stream::StreamExt as TokioStreamExt; +use crate::config::DriveConfig; use crate::shared::state::AppState; #[post("/files/upload/{folder_path}")] @@ -49,7 +50,7 @@ pub async fn upload_file( let temp_file_path = temp_file.into_temp_path(); // Retrieve the bucket name from configuration, handling the case where it is missing - let bucket_name = match &state.config { + let bucket_name = match &state.get_ref().config { Some(cfg) => cfg.s3_bucket.clone(), None => { // Clean up the temp file before returning the error @@ -63,9 +64,13 @@ pub async fn upload_file( // Build the S3 object key (folder + filename) let s3_key = format!("{}/{}", folder_path, file_name); + // Retrieve a reference to the S3 client, handling the case where it is missing + let s3_client = state.get_ref().s3_client.as_ref().ok_or_else(|| { + actix_web::error::ErrorInternalServerError("S3 client is not initialized") + })?; + // Perform the upload - let s3_client = get_s3_client(&state).await; - match upload_to_s3(&s3_client, &bucket_name, &s3_key, &temp_file_path).await { + match upload_to_s3(s3_client, &bucket_name, &s3_key, &temp_file_path).await { Ok(_) => { // Remove the temporary file now that the upload succeeded let _ = std::fs::remove_file(&temp_file_path); @@ -86,33 +91,31 @@ pub async fn upload_file( } // Helper function to get S3 client -async fn get_s3_client(state: &AppState) -> Client { - if let Some(cfg) = &state.config.as_ref().and_then(|c| Some(&c.minio)) { - // Build static credentials from the Drive configuration. - let credentials = aws_sdk_s3::config::Credentials::new( - cfg.access_key.clone(), - cfg.secret_key.clone(), - None, - None, - "static", - ); +pub async fn init_drive(cfg: &DriveConfig) -> Result> { + // Build static credentials from the Drive configuration. 
+ let credentials = aws_sdk_s3::config::Credentials::new( + cfg.access_key.clone(), + cfg.secret_key.clone(), + None, + None, + "static", + ); - // Construct the endpoint URL, respecting the SSL flag. - let scheme = if cfg.use_ssl { "https" } else { "http" }; - let endpoint = format!("{}://{}", scheme, cfg.server); + // Construct the endpoint URL, respecting the SSL flag. + let scheme = if cfg.use_ssl { "https" } else { "http" }; + let endpoint = format!("{}://{}", scheme, cfg.server); - // MinIO requires path‑style addressing. - let s3_config = aws_sdk_s3::config::Builder::new() - .region(aws_sdk_s3::config::Region::new("us-east-1")) - .endpoint_url(endpoint) - .credentials_provider(credentials) - .force_path_style(true) - .build(); + // MinIO requires path‑style addressing. + let s3_config = aws_sdk_s3::config::Builder::new() + // Set the behavior version to the latest to satisfy the SDK requirement. + .behavior_version(aws_sdk_s3::config::BehaviorVersion::latest()) + .region(aws_sdk_s3::config::Region::new("us-east-1")) + .endpoint_url(endpoint) + .credentials_provider(credentials) + .force_path_style(true) + .build(); - Client::from_conf(s3_config) - } else { - panic!("MinIO configuration is missing in application state"); - } + Ok(Client::from_conf(s3_config)) } // Helper function to upload file to S3 @@ -122,15 +125,14 @@ async fn upload_to_s3( key: &str, file_path: &std::path::Path, ) -> Result<(), S3Error> { - // Convert the file at `file_path` into a `ByteStream`. Any I/O error is - // turned into a construction‑failure `SdkError` so that the function’s - // `Result` type (`Result<(), S3Error>`) stays consistent. + // Convert the file at `file_path` into a ByteStream, mapping any I/O error + // into the appropriate `SdkError` type expected by the function signature. 
let body = aws_sdk_s3::primitives::ByteStream::from_path(file_path) .await .map_err(|e| { aws_sdk_s3::error::SdkError::< aws_sdk_s3::operation::put_object::PutObjectError, - aws_sdk_s3::operation::put_object::PutObjectOutput, + aws_sdk_s3::primitives::ByteStream, >::construction_failure(e) })?; @@ -141,7 +143,8 @@ async fn upload_to_s3( .key(key) .body(body) .send() - .await?; + .await + .map(|_| ())?; // Convert the successful output to `()`. Ok(()) } diff --git a/src/llm_legacy/llm_local.rs b/src/llm_legacy/llm_local.rs index 73e4b7c2..d97fd51e 100644 --- a/src/llm_legacy/llm_local.rs +++ b/src/llm_legacy/llm_local.rs @@ -63,7 +63,7 @@ pub async fn ensure_llama_servers_running() -> Result<(), Box std::io::Result<()> { - dotenv().ok(); - env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); +async fn main() -> std::io::Result<()> {// Load environment variables from a .env file, if present. +dotenv().ok(); +let llama_url = + std::env::var("LLM_URL").unwrap_or_else(|_| "http://localhost:8081".to_string()); - let cfg = AppConfig::from_env(); - let config = std::sync::Arc::new(cfg.clone()); +// Initialise logger with environment‑based log level (default to "info"). +env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); - let db_pool = match diesel::Connection::establish(&cfg.database_url()) { - Ok(conn) => { - info!("Connected to main database successfully"); - Arc::new(Mutex::new(conn)) - } - Err(e) => { - log::error!("Failed to connect to main database: {}", e); - return Err(std::io::Error::new( - std::io::ErrorKind::ConnectionRefused, - format!("Database connection failed: {}", e), - )); - } - }; +// Load application configuration. 
+let cfg = AppConfig::from_env(); +let config = std::sync::Arc::new(cfg.clone()); - let custom_db_url = format!( - "postgres://{}:{}@{}:{}/{}", - cfg.database_custom.username, - cfg.database_custom.password, - cfg.database_custom.server, - cfg.database_custom.port, - cfg.database_custom.database - ); +// ---------------------------------------------------------------------- +// Database connections +// ---------------------------------------------------------------------- +let db_pool = match diesel::Connection::establish(&cfg.database_url()) { + Ok(conn) => { + info!("Connected to main database successfully"); + Arc::new(Mutex::new(conn)) + } + Err(e) => { + log::error!("Failed to connect to main database: {}", e); + return Err(std::io::Error::new( + std::io::ErrorKind::ConnectionRefused, + format!("Database connection failed: {}", e), + )); + } +}; - let db_custom_pool = db_pool.clone(); - // match diesel::Connection::establish(&custom_db_url) { - // Ok(conn) => { - // info!("Connected to custom database successfully"); - // Arc::new(Mutex::new(conn)) - // } - // Err(e2) => { - // log::error!("Failed to connect to custom database: {}", e2); - // return Err(std::io::Error::new( - // std::io::ErrorKind::ConnectionRefused, - // format!("Custom Database connection failed: {}", e2), - // )); - // } - // }; +// Placeholder for a second/custom database – currently just re‑using the main pool. 
+let _custom_db_url = format!( + "postgres://{}:{}@{}:{}/{}", + cfg.database_custom.username, + cfg.database_custom.password, + cfg.database_custom.server, + cfg.database_custom.port, + cfg.database_custom.database +); +let db_custom_pool = db_pool.clone(); - ensure_llama_servers_running() - .await - .expect("Failed to initialize LLM local server."); - - let redis_client = match redis::Client::open("redis://127.0.0.1/") { - Ok(client) => { - info!("Connected to Redis successfully"); - Some(Arc::new(client)) - } - Err(e) => { - log::warn!("Failed to connect to Redis: {}", e); - None - } - }; - - let tool_manager = Arc::new(tools::ToolManager::new()); - let llama_url = - std::env::var("LLM_URL").unwrap_or_else(|_| "http://48.217.66.81:8080".to_string()); - let llm_provider = Arc::new(crate::llm::OpenAIClient::new( - "empty".to_string(), - Some(llama_url.clone()), - )); - - let web_adapter = Arc::new(WebChannelAdapter::new()); - let voice_adapter = Arc::new(VoiceAdapter::new( - "https://livekit.example.com".to_string(), - "api_key".to_string(), - "api_secret".to_string(), - )); - let whatsapp_adapter = Arc::new(WhatsAppAdapter::new( - "whatsapp_token".to_string(), - "phone_number_id".to_string(), - "verify_token".to_string(), - )); - let tool_api = Arc::new(tools::ToolApi::new()); - - let session_manager = Arc::new(tokio::sync::Mutex::new(session::SessionManager::new( - diesel::Connection::establish(&cfg.database_url()).unwrap(), - redis_client.clone(), - ))); - - let auth_service = Arc::new(tokio::sync::Mutex::new(auth::AuthService::new( - diesel::Connection::establish(&cfg.database_url()).unwrap(), - redis_client.clone(), - ))); - - let app_state = Arc::new(AppState { - s3_client: None, - config: Some(cfg.clone()), - conn: db_pool.clone(), - custom_conn: db_custom_pool.clone(), - redis_client: redis_client.clone(), - session_manager: session_manager.clone(), - tool_manager: tool_manager.clone(), - llm_provider: llm_provider.clone(), - auth_service: 
auth_service.clone(), - channels: Arc::new(Mutex::new({ - let mut map = HashMap::new(); - map.insert( - "web".to_string(), - web_adapter.clone() as Arc, - ); - map - })), - response_channels: Arc::new(tokio::sync::Mutex::new(HashMap::new())), - web_adapter: web_adapter.clone(), - voice_adapter: voice_adapter.clone(), - whatsapp_adapter: whatsapp_adapter.clone(), - tool_api: tool_api.clone(), - }); - - info!( - "Starting server on {}:{}", - config.server.host, config.server.port - ); - - HttpServer::new(move || { - let cors = Cors::default() - .allow_any_origin() - .allow_any_method() - .allow_any_header() - .max_age(3600); - - let app_state_clone = app_state.clone(); - let mut app = App::new() - .wrap(cors) - .wrap(Logger::default()) - .wrap(Logger::new("HTTP REQUEST: %a %{User-Agent}i")) - .app_data(web::Data::from(app_state_clone)); - - app = app - .service(upload_file) - .service(index) - .service(static_files) - .service(websocket_handler) - .service(auth_handler) - .service(whatsapp_webhook_verify) - .service(voice_start) - .service(voice_stop) - .service(create_session) - .service(get_sessions) - .service(start_session) - .service(get_session_history) - .service(set_mode_handler) - .service(chat_completions_local) - .service(embeddings_local); - - #[cfg(feature = "email")] - { - app = app - .service(get_latest_email_from) - .service(get_emails) - .service(list_emails) - .service(send_email) - .service(save_draft) - .service(save_click); - } - - app - }) - .bind((config.server.host.clone(), config.server.port))? 
- .run() +// ---------------------------------------------------------------------- +// LLM local server initialisation +// ---------------------------------------------------------------------- +ensure_llama_servers_running() .await + .expect("Failed to initialize LLM local server."); + +// ---------------------------------------------------------------------- +// Redis client (optional) +// ---------------------------------------------------------------------- +let redis_client = match redis::Client::open("redis://127.0.0.1/") { + Ok(client) => { + info!("Connected to Redis successfully"); + Some(Arc::new(client)) + } + Err(e) => { + log::warn!("Failed to connect to Redis: {}", e); + None + } +}; + +// ---------------------------------------------------------------------- +// Tooling and LLM provider +// ---------------------------------------------------------------------- +let tool_manager = Arc::new(tools::ToolManager::new()); +let llm_provider = Arc::new(crate::llm::OpenAIClient::new( + "empty".to_string(), + Some(llama_url.clone()), +)); + +// ---------------------------------------------------------------------- +// Channel adapters +// ---------------------------------------------------------------------- +let web_adapter = Arc::new(WebChannelAdapter::new()); +let voice_adapter = Arc::new(VoiceAdapter::new( + "https://livekit.example.com".to_string(), + "api_key".to_string(), + "api_secret".to_string(), +)); +let whatsapp_adapter = Arc::new(WhatsAppAdapter::new( + "whatsapp_token".to_string(), + "phone_number_id".to_string(), + "verify_token".to_string(), +)); +let tool_api = Arc::new(tools::ToolApi::new()); + +// ---------------------------------------------------------------------- +// S3 / MinIO client +// ---------------------------------------------------------------------- +let drive = init_drive(&config.minio) + .await + .expect("Failed to initialize Drive"); + +// ---------------------------------------------------------------------- +// Session 
and authentication services +// ---------------------------------------------------------------------- +let session_manager = Arc::new(tokio::sync::Mutex::new(session::SessionManager::new( + diesel::Connection::establish(&cfg.database_url()).unwrap(), + redis_client.clone(), +))); + +let auth_service = Arc::new(tokio::sync::Mutex::new(auth::AuthService::new( + diesel::Connection::establish(&cfg.database_url()).unwrap(), + redis_client.clone(), +))); + +// ---------------------------------------------------------------------- +// Global application state +// ---------------------------------------------------------------------- +let app_state = Arc::new(AppState { + // `s3_client` expects an `Option`. + s3_client: Some(drive.clone()), + config: Some(cfg.clone()), + conn: db_pool.clone(), + custom_conn: db_custom_pool.clone(), + redis_client: redis_client.clone(), + session_manager: session_manager.clone(), + tool_manager: tool_manager.clone(), + llm_provider: llm_provider.clone(), + auth_service: auth_service.clone(), + channels: Arc::new(Mutex::new({ + let mut map = HashMap::new(); + map.insert( + "web".to_string(), + web_adapter.clone() as Arc, + ); + map + })), + response_channels: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + web_adapter: web_adapter.clone(), + voice_adapter: voice_adapter.clone(), + whatsapp_adapter: whatsapp_adapter.clone(), + tool_api: tool_api.clone(), +}); + +// ---------------------------------------------------------------------- +// Start HTTP server (multithreaded) +// ---------------------------------------------------------------------- +info!( + "Starting server on {}:{}", + config.server.host, config.server.port +); + +// Determine the number of worker threads – default to the number of logical CPUs, +// fallback to 4 if the information cannot be retrieved. 
+let worker_count = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(4); + +HttpServer::new(move || { + // CORS configuration – allow any origin/method/header (adjust for production). + let cors = Cors::default() + .allow_any_origin() + .allow_any_method() + .allow_any_header() + .max_age(3600); + + let app_state_clone = app_state.clone(); + let mut app = App::new() + .wrap(cors) + .wrap(Logger::default()) + .wrap(Logger::new("HTTP REQUEST: %a %{User-Agent}i")) + .app_data(web::Data::from(app_state_clone)); + + // Register all route handlers / services. + app = app + .service(upload_file) + .service(index) + .service(static_files) + .service(websocket_handler) + .service(auth_handler) + .service(whatsapp_webhook_verify) + .service(voice_start) + .service(voice_stop) + .service(create_session) + .service(get_sessions) + .service(start_session) + .service(get_session_history) + .service(set_mode_handler) + .service(chat_completions_local) + .service(embeddings_local); + + #[cfg(feature = "email")] + { + app = app + .service(get_latest_email_from) + .service(get_emails) + .service(list_emails) + .service(send_email) + .service(save_draft) + .service(save_click); + } + + app +}) +.workers(worker_count) // Enable multithreaded handling +.bind((config.server.host.clone(), config.server.port))? 
+.run() +.await } diff --git a/src/shared/state.rs b/src/shared/state.rs index cc803d4d..07eeaf8e 100644 --- a/src/shared/state.rs +++ b/src/shared/state.rs @@ -75,7 +75,7 @@ impl Default for AppState { tool_manager: Arc::new(ToolManager::new()), llm_provider: Arc::new(crate::llm::OpenAIClient::new( "empty".to_string(), - Some("http://48.217.66.81:8080".to_string()), + Some("http://localhost:8081".to_string()), )), auth_service: Arc::new(tokio::sync::Mutex::new(AuthService::new( diesel::PgConnection::establish("postgres://localhost/test").unwrap(), diff --git a/templates/annoucements.gbai/annoucements.gbdialog/start.bas b/templates/annoucements.gbai/annoucements.gbdialog/start.bas index 5fc0fd3f..ee04adb1 100644 --- a/templates/annoucements.gbai/annoucements.gbdialog/start.bas +++ b/templates/annoucements.gbai/annoucements.gbdialog/start.bas @@ -1,7 +1 @@ TALK "Olá, estou preparando um resumo para você." - -text = GET "default.gbdrive/default.pdf" -resume = LLM "Say Hello and present a a resume from " + text - -SET_CONTEXT "Este é o documento que você deve usar para responder dúvidas: O céu é azul." -TALK resume