- GET ketyowrd for buckets.

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-10-15 12:45:15 -03:00
parent a293c0e083
commit f401c170d4
13 changed files with 668 additions and 295 deletions

21
.vscode/launch.json vendored
View file

@ -7,15 +7,11 @@
{
"type": "lldb",
"request": "launch",
"name": "Debug executable 'gbserver'",
"name": "Debug executable 'botserver'",
"cargo": {
"args": [
"build",
"--bin=gbserver",
"--package=gbserver"
],
"args": ["build", "--bin=botserver", "--package=botserver"],
"filter": {
"name": "gbserver",
"name": "botserver",
"kind": "bin"
}
},
@ -25,16 +21,11 @@
{
"type": "lldb",
"request": "launch",
"name": "Debug unit tests in executable 'gbserver'",
"name": "Debug unit tests in executable 'botserver'",
"cargo": {
"args": [
"test",
"--no-run",
"--bin=gbserver",
"--package=gbserver"
],
"args": ["test", "--no-run", "--bin=botserver", "--package=botserver"],
"filter": {
"name": "gbserver",
"name": "botserver",
"kind": "bin"
}
},

186
Cargo.lock generated
View file

@ -267,6 +267,15 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa"
[[package]]
name = "adobe-cmap-parser"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae8abfa9a4688de8fc9f42b3f013b6fffec18ed8a554f5f113577e0b9b3212a3"
dependencies = [
"pom",
]
[[package]]
name = "aead"
version = "0.5.2"
@ -979,6 +988,15 @@ dependencies = [
"generic-array",
]
[[package]]
name = "block-padding"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93"
dependencies = [
"generic-array",
]
[[package]]
name = "bmrng"
version = "0.5.2"
@ -1020,6 +1038,7 @@ dependencies = [
"mailparse",
"native-tls",
"num-format",
"pdf-extract",
"qdrant-client",
"rand 0.9.2",
"redis",
@ -1073,6 +1092,12 @@ version = "3.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43"
[[package]]
name = "bytecount"
version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "175812e0be2bccb6abe50bb8d566126198344f707e304f45c648fd8f2cc0365e"
[[package]]
name = "byteorder"
version = "1.5.0"
@ -1133,6 +1158,15 @@ dependencies = [
"pkg-config",
]
[[package]]
name = "cbc"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6"
dependencies = [
"cipher",
]
[[package]]
name = "cc"
version = "1.2.41"
@ -1160,6 +1194,12 @@ dependencies = [
"nom 7.1.3",
]
[[package]]
name = "cff-parser"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31f5b6e9141c036f3ff4ce7b2f7e432b0f00dee416ddcd4f17741d189ddc2e9d"
[[package]]
name = "cfg-if"
version = "1.0.3"
@ -1820,6 +1860,15 @@ version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813"
[[package]]
name = "ecb"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a8bfa975b1aec2145850fcaa1c6fe269a16578c44705a532ae3edc92b8881c7"
dependencies = [
"cipher",
]
[[package]]
name = "ecdsa"
version = "0.14.8"
@ -1928,6 +1977,15 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "euclid"
version = "0.20.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bb7ef65b3777a325d1eeefefab5b6d4959da54747e33bd6258e789640f307ad"
dependencies = [
"num-traits",
]
[[package]]
name = "fastrand"
version = "2.3.0"
@ -2714,6 +2772,7 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01"
dependencies = [
"block-padding",
"generic-array",
]
@ -3086,6 +3145,34 @@ version = "0.4.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432"
[[package]]
name = "lopdf"
version = "0.38.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7184fdea2bc3cd272a1acec4030c321a8f9875e877b3f92a53f2f6033fdc289"
dependencies = [
"aes",
"bitflags 2.9.4",
"cbc",
"ecb",
"encoding_rs",
"flate2",
"getrandom 0.3.3",
"indexmap 2.11.4",
"itoa",
"log",
"md-5",
"nom 8.0.0",
"nom_locate",
"rand 0.9.2",
"rangemap",
"sha2",
"stringprep",
"thiserror 2.0.17",
"ttf-parser",
"weezl",
]
[[package]]
name = "lru"
version = "0.12.5"
@ -3231,6 +3318,17 @@ dependencies = [
"memchr",
]
[[package]]
name = "nom_locate"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b577e2d69827c4740cba2b52efaad1c4cc7c73042860b199710b3575c68438d"
dependencies = [
"bytecount",
"memchr",
"nom 8.0.0",
]
[[package]]
name = "nu-ansi-term"
version = "0.50.3"
@ -3509,6 +3607,23 @@ dependencies = [
"hmac",
]
[[package]]
name = "pdf-extract"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28ba1758a3d3f361459645780e09570b573fc3c82637449e9963174c813a98"
dependencies = [
"adobe-cmap-parser",
"cff-parser",
"encoding_rs",
"euclid",
"log",
"lopdf",
"postscript",
"type1-encoding-parser",
"unicode-normalization",
]
[[package]]
name = "percent-encoding"
version = "2.3.2"
@ -3585,6 +3700,12 @@ dependencies = [
"universal-hash",
]
[[package]]
name = "pom"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60f6ce597ecdcc9a098e7fddacb1065093a3d66446fa16c675e7e71d1b5c28e6"
[[package]]
name = "portable-atomic"
version = "1.11.1"
@ -3600,6 +3721,12 @@ dependencies = [
"portable-atomic",
]
[[package]]
name = "postscript"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78451badbdaebaf17f053fd9152b3ffb33b516104eacb45e7864aaa9c712f306"
[[package]]
name = "potential_utf"
version = "0.1.3"
@ -3917,6 +4044,12 @@ dependencies = [
"getrandom 0.3.3",
]
[[package]]
name = "rangemap"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f93e7e49bb0bf967717f7bd674458b3d6b0c5f48ec7e3038166026a69fc22223"
[[package]]
name = "redis"
version = "0.27.6"
@ -4590,6 +4723,17 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "stringprep"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1"
dependencies = [
"unicode-bidi",
"unicode-normalization",
"unicode-properties",
]
[[package]]
name = "strsim"
version = "0.11.1"
@ -5083,6 +5227,12 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "ttf-parser"
version = "0.25.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2df906b07856748fa3f6e0ad0cbaa047052d4a7dd609e231c4f72cee8c36f31"
[[package]]
name = "tungstenite"
version = "0.20.1"
@ -5119,18 +5269,48 @@ dependencies = [
"utf-8",
]
[[package]]
name = "type1-encoding-parser"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3d6cc09e1a99c7e01f2afe4953789311a1c50baebbdac5b477ecf78e2e92a5b"
dependencies = [
"pom",
]
[[package]]
name = "typenum"
version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb"
[[package]]
name = "unicode-bidi"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5"
[[package]]
name = "unicode-ident"
version = "1.0.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f63a545481291138910575129486daeaf8ac54aee4387fe7906919f7830c7d9d"
[[package]]
name = "unicode-normalization"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956"
dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-properties"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0"
[[package]]
name = "unicode-width"
version = "0.2.2"
@ -5443,6 +5623,12 @@ dependencies = [
"zip 0.6.6",
]
[[package]]
name = "weezl"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a751b3277700db47d3e574514de2eced5e54dc8a5436a3bf7a0b248b2cee16f3"
[[package]]
name = "which"
version = "8.0.0"

View file

@ -59,3 +59,4 @@ time = "0.3.44"
aws-sdk-s3 = "1.108.0"
headless_chrome = { version = "1.0.18", optional = true }
rand = "0.9.2"
pdf-extract = "0.10.0"

View file

@ -2,6 +2,7 @@ MOST IMPORTANT CODE GENERATION RULES:
- No placeholders, never comment/uncomment code, no explanations, no filler text.
- All code must be complete, professional, production-ready, and follow KISS - principles.
- NEVER return placeholders of any kind, neither commented code, only REAL PRODUCTION GRADE code.
- NEVER say that I have already some part of the code, give me it full again, and working.
- Always increment logging with (all-in-one-line) info!, debug!, trace! to give birth to the console.
- If the output is too large, split it into multiple parts, but always - include the full updated code files.
- Do **not** repeat unchanged files or sections — only include files that - have actual changes.

View file

@ -1,58 +1,147 @@
use crate::shared::models::UserSession;
use crate::shared::state::AppState;
use log::info;
use log::{error, info};
use reqwest::{self, Client};
use rhai::{Dynamic, Engine};
use std::error::Error;
use std::path::Path;
use std::sync::Arc;
pub fn get_keyword(state: &AppState, _user: UserSession, engine: &mut Engine) {
let state_clone = state.clone();
pub fn get_keyword(state: Arc<AppState>, _user: UserSession, engine: &mut Engine) {
let state_clone = Arc::clone(&state);
engine
.register_custom_syntax(&["GET", "$expr$"], false, move |context, inputs| {
// Evaluate the URL expression
let url = context.eval_expression_tree(&inputs[0])?;
let url_str = url.to_string();
if url_str.contains("..") {
return Err("URL contains invalid path traversal sequences like '..'.".into());
info!("GET command executed: {}", url_str);
// Enhanced security check for path traversal
if !is_safe_path(&url_str) {
return Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
"URL contains invalid or unsafe path sequences".into(),
rhai::Position::NONE,
)));
}
let state_for_async = state_clone.clone();
let state_for_async = Arc::clone(&state_clone);
let url_for_async = url_str.clone();
if url_str.starts_with("https://") {
info!("HTTPS GET request: {}", url_for_async);
// Create a channel to communicate the result back
let (tx, rx) = tokio::sync::oneshot::channel();
let fut = execute_get(&url_for_async);
let result =
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(fut))
.map_err(|e| format!("HTTP request failed: {}", e))?;
// Spawn the async task without blocking
tokio::spawn(async move {
log::trace!("Async task started for GET operation: {}", url_for_async);
Ok(Dynamic::from(result))
let result = if url_for_async.starts_with("https://")
|| url_for_async.starts_with("http://")
{
info!("HTTP(S) GET request: {}", url_for_async);
execute_get(&url_for_async).await
} else {
info!("Local file GET request from bucket: {}", url_for_async);
get_from_bucket(&state_for_async, &url_for_async).await
};
let fut = get_from_bucket(&state_for_async, &url_for_async);
let result =
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(fut))
.map_err(|e| format!("Bucket GET failed: {}", e))?;
// Send the result back through the channel
let _ = tx.send(result);
});
// Block on receiving the result from the channel.
// This is acceptable because we're in a custom syntax handler.
let result = match futures::executor::block_on(rx) {
Ok(inner) => inner.map_err(|e| {
Box::new(rhai::EvalAltResult::ErrorRuntime(
e.to_string().into(),
rhai::Position::NONE,
))
})?,
Err(_) => {
return Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
"Failed to receive result from async task".into(),
rhai::Position::NONE,
)));
}
};
Ok(Dynamic::from(result))
}
})
.unwrap();
}
/// Enhanced security check for path traversal and unsafe paths
fn is_safe_path(path: &str) -> bool {
// Allow full URLs
if path.starts_with("https://") || path.starts_with("http://") {
return true;
}
// Check for various path traversal patterns
if path.contains("..") {
return false;
}
// Reject absolute paths (starting with /)
if path.starts_with('/') {
return false;
}
// Reject Windows-style absolute paths
if path.len() >= 2 && path.chars().nth(1) == Some(':') {
return false;
}
// Normalize and validate the path doesn't escape
if let Ok(normalized) = Path::new(path).canonicalize() {
// If canonicalize succeeds, verify it doesn't contain parent directory references
if normalized.to_string_lossy().contains("..") {
return false;
}
}
true
}
pub async fn execute_get(url: &str) -> Result<String, Box<dyn Error + Send + Sync>> {
info!("Starting execute_get with URL: {}", url);
log::trace!("Starting execute_get with URL: {}", url);
// Build secure HTTP client (removed danger_accept_invalid_certs)
let client = Client::builder()
.danger_accept_invalid_certs(true)
.build()?;
.timeout(std::time::Duration::from_secs(30))
.build()
.map_err(|e| {
error!("Failed to build HTTP client: {}", e);
e
})?;
let response = client.get(url).send().await?;
let content = response.text().await?;
let response = client.get(url).send().await.map_err(|e| {
error!("HTTP request failed for URL {}: {}", url, e);
e
})?;
// Check response status
if !response.status().is_success() {
let status = response.status();
error!(
"HTTP request returned non-success status for URL {}: {}",
url, status
);
return Err(format!("HTTP request failed with status: {}", status).into());
}
let content = response.text().await.map_err(|e| {
error!("Failed to read response text for URL {}: {}", url, e);
e
})?;
log::trace!(
"Successfully executed GET request for URL: {}, content length: {}",
url,
content.len()
);
Ok(content)
}
@ -60,25 +149,102 @@ pub async fn get_from_bucket(
state: &AppState,
file_path: &str,
) -> Result<String, Box<dyn Error + Send + Sync>> {
info!("Getting file from bucket: {}", file_path);
log::trace!("Getting file from bucket: {}", file_path);
if let Some(s3_client) = &state.s3_client {
let bucket_name = std::env::var("DRIVE_ORG_PREFIX")
.map(|v| format!("{}-default", v))
.unwrap_or_else(|_| "org-default".to_string());
// Additional validation for file path
if !is_safe_path(file_path) {
error!("Unsafe file path detected: {}", file_path);
return Err("Invalid file path".into());
}
// Ensure the S3 client is configured
let s3_client = match &state.s3_client {
Some(client) => client,
None => {
error!(
"S3 client not configured when trying to get file: {}",
file_path
);
return Err("S3 client not configured".into());
}
};
// Resolve the bucket name safely, handling missing configuration values
let bucket_name = {
let cfg = state
.config
.as_ref()
.ok_or_else(|| -> Box<dyn Error + Send + Sync> {
"App configuration missing".into()
})?;
let org_prefix = &cfg.minio.org_prefix;
// Validate org_prefix doesn't contain suspicious characters
if org_prefix.contains("..") || org_prefix.contains('/') {
error!("Invalid org_prefix in configuration: {}", org_prefix);
return Err("Invalid organization prefix in configuration".into());
}
format!("{}.default.gbai", org_prefix)
};
log::trace!("Using bucket: {} for file: {}", bucket_name, file_path);
// Perform the S3 GetObject request
let response = s3_client
.get_object()
.bucket(&bucket_name)
.key(file_path)
.send()
.await?;
.await
.map_err(|e| {
error!(
"S3 get_object failed for bucket {} key {}: {}",
bucket_name, file_path, e
);
e
})?;
let data = response.body.collect().await?;
let content = String::from_utf8(data.into_bytes().to_vec())?;
// Collect the body bytes
let data = response.body.collect().await.map_err(|e| {
error!(
"Failed to collect S3 response body for bucket {} key {}: {}",
bucket_name, file_path, e
);
e
})?;
Ok(content)
// Handle PDF files specially; otherwise treat as UTF8 text
let bytes = data.into_bytes().to_vec();
let content = if file_path.to_ascii_lowercase().ends_with(".pdf") {
// Extract text from PDF using the `pdf_extract` crate
match pdf_extract::extract_text_from_mem(&bytes) {
Ok(text) => text,
Err(e) => {
error!(
"Failed to extract text from PDF for bucket {} key {}: {}",
bucket_name, file_path, e
);
return Err(format!("PDF extraction failed: {}", e).into());
}
}
} else {
Err("S3 client not configured".into())
}
// Convert bytes to a UTF8 String
String::from_utf8(bytes).map_err(|e| {
error!(
"Failed to convert S3 response to UTF-8 for bucket {} key {}: {}",
bucket_name, file_path, e
);
e
})?
};
log::trace!(
"Successfully retrieved file from bucket: {}, content length: {}",
file_path,
content.len()
);
Ok(content)
}

View file

@ -52,7 +52,7 @@ impl ScriptService {
last_keyword(&mut engine);
format_keyword(&mut engine);
llm_keyword(&state, user.clone(), &mut engine);
get_keyword(&state, user.clone(), &mut engine);
get_keyword(state.clone(), user.clone(), &mut engine);
set_keyword(&state, user.clone(), &mut engine);
wait_keyword(&state, user.clone(), &mut engine);
print_keyword(&state, user.clone(), &mut engine);

View file

@ -862,7 +862,7 @@ async fn auth_handler(
data: web::Data<AppState>,
web::Query(params): web::Query<HashMap<String, String>>,
) -> Result<HttpResponse> {
let token = params.get("token").cloned().unwrap_or_default();
let _token = params.get("token").cloned().unwrap_or_default();
let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap();
let bot_id = if let Ok(bot_guid) = std::env::var("BOT_GUID") {
match Uuid::parse_str(&bot_guid) {

View file

@ -27,7 +27,7 @@ pub struct DriveConfig {
pub access_key: String,
pub secret_key: String,
pub use_ssl: bool,
pub bucket: String,
pub org_prefix: String,
}
#[derive(Clone)]
@ -107,7 +107,7 @@ impl AppConfig {
.unwrap_or_else(|_| "false".to_string())
.parse()
.unwrap_or(false),
bucket: env::var("DRIVE_ORG_PREFIX").unwrap_or_else(|_| "botserver".to_string()),
org_prefix: env::var("DRIVE_ORG_PREFIX").unwrap_or_else(|_| "botserver".to_string()),
};
let email = EmailConfig {

View file

@ -6,6 +6,7 @@ use std::io::Write;
use tempfile::NamedTempFile;
use tokio_stream::StreamExt as TokioStreamExt;
use crate::config::DriveConfig;
use crate::shared::state::AppState;
#[post("/files/upload/{folder_path}")]
@ -49,7 +50,7 @@ pub async fn upload_file(
let temp_file_path = temp_file.into_temp_path();
// Retrieve the bucket name from configuration, handling the case where it is missing
let bucket_name = match &state.config {
let bucket_name = match &state.get_ref().config {
Some(cfg) => cfg.s3_bucket.clone(),
None => {
// Clean up the temp file before returning the error
@ -63,9 +64,13 @@ pub async fn upload_file(
// Build the S3 object key (folder + filename)
let s3_key = format!("{}/{}", folder_path, file_name);
// Retrieve a reference to the S3 client, handling the case where it is missing
let s3_client = state.get_ref().s3_client.as_ref().ok_or_else(|| {
actix_web::error::ErrorInternalServerError("S3 client is not initialized")
})?;
// Perform the upload
let s3_client = get_s3_client(&state).await;
match upload_to_s3(&s3_client, &bucket_name, &s3_key, &temp_file_path).await {
match upload_to_s3(s3_client, &bucket_name, &s3_key, &temp_file_path).await {
Ok(_) => {
// Remove the temporary file now that the upload succeeded
let _ = std::fs::remove_file(&temp_file_path);
@ -86,8 +91,7 @@ pub async fn upload_file(
}
// Helper function to get S3 client
async fn get_s3_client(state: &AppState) -> Client {
if let Some(cfg) = &state.config.as_ref().and_then(|c| Some(&c.minio)) {
pub async fn init_drive(cfg: &DriveConfig) -> Result<Client, Box<dyn std::error::Error>> {
// Build static credentials from the Drive configuration.
let credentials = aws_sdk_s3::config::Credentials::new(
cfg.access_key.clone(),
@ -103,16 +107,15 @@ async fn get_s3_client(state: &AppState) -> Client {
// MinIO requires pathstyle addressing.
let s3_config = aws_sdk_s3::config::Builder::new()
// Set the behavior version to the latest to satisfy the SDK requirement.
.behavior_version(aws_sdk_s3::config::BehaviorVersion::latest())
.region(aws_sdk_s3::config::Region::new("us-east-1"))
.endpoint_url(endpoint)
.credentials_provider(credentials)
.force_path_style(true)
.build();
Client::from_conf(s3_config)
} else {
panic!("MinIO configuration is missing in application state");
}
Ok(Client::from_conf(s3_config))
}
// Helper function to upload file to S3
@ -122,15 +125,14 @@ async fn upload_to_s3(
key: &str,
file_path: &std::path::Path,
) -> Result<(), S3Error> {
// Convert the file at `file_path` into a `ByteStream`. Any I/O error is
// turned into a constructionfailure `SdkError` so that the functions
// `Result` type (`Result<(), S3Error>`) stays consistent.
// Convert the file at `file_path` into a ByteStream, mapping any I/O error
// into the appropriate `SdkError` type expected by the function signature.
let body = aws_sdk_s3::primitives::ByteStream::from_path(file_path)
.await
.map_err(|e| {
aws_sdk_s3::error::SdkError::<
aws_sdk_s3::operation::put_object::PutObjectError,
aws_sdk_s3::operation::put_object::PutObjectOutput,
aws_sdk_s3::primitives::ByteStream,
>::construction_failure(e)
})?;
@ -141,7 +143,8 @@ async fn upload_to_s3(
.key(key)
.body(body)
.send()
.await?;
.await
.map(|_| ())?; // Convert the successful output to `()`.
Ok(())
}

View file

@ -63,7 +63,7 @@ pub async fn ensure_llama_servers_running() -> Result<(), Box<dyn std::error::Er
}
// Get configuration from environment variables
let llm_url = env::var("LLM_URL").unwrap_or_else(|_| "http://48.217.66.81:8080".to_string());
let llm_url = env::var("LLM_URL").unwrap_or_else(|_| "http://localhost:8081".to_string());
let embedding_url =
env::var("EMBEDDING_URL").unwrap_or_else(|_| "http://localhost:8082".to_string());
let llama_cpp_path = env::var("LLM_CPP_PATH").unwrap_or_else(|_| "~/llama.cpp".to_string());
@ -259,7 +259,7 @@ pub async fn chat_completions_local(
dotenv().ok().unwrap();
// Get llama.cpp server URL
let llama_url = env::var("LLM_URL").unwrap_or_else(|_| "http://48.217.66.81:8080".to_string());
let llama_url = env::var("LLM_URL").unwrap_or_else(|_| "http://localhost:8081".to_string());
// Convert OpenAI format to llama.cpp format
let prompt = messages_to_prompt(&req_body.messages);

View file

@ -34,7 +34,7 @@ use crate::config::AppConfig;
use crate::email::{
get_emails, get_latest_email_from, list_emails, save_click, save_draft, send_email,
};
use crate::file::upload_file;
use crate::file::{init_drive, upload_file};
use crate::llm_legacy::llm_local::{
chat_completions_local, embeddings_local, ensure_llama_servers_running,
};
@ -42,13 +42,21 @@ use crate::shared::state::AppState;
use crate::whatsapp::WhatsAppAdapter;
#[actix_web::main]
async fn main() -> std::io::Result<()> {
async fn main() -> std::io::Result<()> {// Load environment variables from a .env file, if present.
dotenv().ok();
let llama_url =
std::env::var("LLM_URL").unwrap_or_else(|_| "http://localhost:8081".to_string());
// Initialise logger with environmentbased log level (default to "info").
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
// Load application configuration.
let cfg = AppConfig::from_env();
let config = std::sync::Arc::new(cfg.clone());
// ----------------------------------------------------------------------
// Database connections
// ----------------------------------------------------------------------
let db_pool = match diesel::Connection::establish(&cfg.database_url()) {
Ok(conn) => {
info!("Connected to main database successfully");
@ -63,7 +71,8 @@ async fn main() -> std::io::Result<()> {
}
};
let custom_db_url = format!(
// Placeholder for a second/custom database currently just reusing the main pool.
let _custom_db_url = format!(
"postgres://{}:{}@{}:{}/{}",
cfg.database_custom.username,
cfg.database_custom.password,
@ -71,26 +80,18 @@ async fn main() -> std::io::Result<()> {
cfg.database_custom.port,
cfg.database_custom.database
);
let db_custom_pool = db_pool.clone();
// match diesel::Connection::establish(&custom_db_url) {
// Ok(conn) => {
// info!("Connected to custom database successfully");
// Arc::new(Mutex::new(conn))
// }
// Err(e2) => {
// log::error!("Failed to connect to custom database: {}", e2);
// return Err(std::io::Error::new(
// std::io::ErrorKind::ConnectionRefused,
// format!("Custom Database connection failed: {}", e2),
// ));
// }
// };
// ----------------------------------------------------------------------
// LLM local server initialisation
// ----------------------------------------------------------------------
ensure_llama_servers_running()
.await
.expect("Failed to initialize LLM local server.");
// ----------------------------------------------------------------------
// Redis client (optional)
// ----------------------------------------------------------------------
let redis_client = match redis::Client::open("redis://127.0.0.1/") {
Ok(client) => {
info!("Connected to Redis successfully");
@ -102,14 +103,18 @@ async fn main() -> std::io::Result<()> {
}
};
// ----------------------------------------------------------------------
// Tooling and LLM provider
// ----------------------------------------------------------------------
let tool_manager = Arc::new(tools::ToolManager::new());
let llama_url =
std::env::var("LLM_URL").unwrap_or_else(|_| "http://48.217.66.81:8080".to_string());
let llm_provider = Arc::new(crate::llm::OpenAIClient::new(
"empty".to_string(),
Some(llama_url.clone()),
));
// ----------------------------------------------------------------------
// Channel adapters
// ----------------------------------------------------------------------
let web_adapter = Arc::new(WebChannelAdapter::new());
let voice_adapter = Arc::new(VoiceAdapter::new(
"https://livekit.example.com".to_string(),
@ -123,6 +128,16 @@ async fn main() -> std::io::Result<()> {
));
let tool_api = Arc::new(tools::ToolApi::new());
// ----------------------------------------------------------------------
// S3 / MinIO client
// ----------------------------------------------------------------------
let drive = init_drive(&config.minio)
.await
.expect("Failed to initialize Drive");
// ----------------------------------------------------------------------
// Session and authentication services
// ----------------------------------------------------------------------
let session_manager = Arc::new(tokio::sync::Mutex::new(session::SessionManager::new(
diesel::Connection::establish(&cfg.database_url()).unwrap(),
redis_client.clone(),
@ -133,8 +148,12 @@ async fn main() -> std::io::Result<()> {
redis_client.clone(),
)));
// ----------------------------------------------------------------------
// Global application state
// ----------------------------------------------------------------------
let app_state = Arc::new(AppState {
s3_client: None,
// `s3_client` expects an `Option<aws_sdk_s3::Client>`.
s3_client: Some(drive.clone()),
config: Some(cfg.clone()),
conn: db_pool.clone(),
custom_conn: db_custom_pool.clone(),
@ -158,12 +177,22 @@ async fn main() -> std::io::Result<()> {
tool_api: tool_api.clone(),
});
// ----------------------------------------------------------------------
// Start HTTP server (multithreaded)
// ----------------------------------------------------------------------
info!(
"Starting server on {}:{}",
config.server.host, config.server.port
);
// Determine the number of worker threads default to the number of logical CPUs,
// fallback to 4 if the information cannot be retrieved.
let worker_count = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4);
HttpServer::new(move || {
// CORS configuration allow any origin/method/header (adjust for production).
let cors = Cors::default()
.allow_any_origin()
.allow_any_method()
@ -177,6 +206,7 @@ async fn main() -> std::io::Result<()> {
.wrap(Logger::new("HTTP REQUEST: %a %{User-Agent}i"))
.app_data(web::Data::from(app_state_clone));
// Register all route handlers / services.
app = app
.service(upload_file)
.service(index)
@ -207,6 +237,7 @@ async fn main() -> std::io::Result<()> {
app
})
.workers(worker_count) // Enable multithreaded handling
.bind((config.server.host.clone(), config.server.port))?
.run()
.await

View file

@ -75,7 +75,7 @@ impl Default for AppState {
tool_manager: Arc::new(ToolManager::new()),
llm_provider: Arc::new(crate::llm::OpenAIClient::new(
"empty".to_string(),
Some("http://48.217.66.81:8080".to_string()),
Some("http://localhost:8081".to_string()),
)),
auth_service: Arc::new(tokio::sync::Mutex::new(AuthService::new(
diesel::PgConnection::establish("postgres://localhost/test").unwrap(),

View file

@ -1,7 +1 @@
TALK "Olá, estou preparando um resumo para você."
text = GET "default.gbdrive/default.pdf"
resume = LLM "Say Hello and present a a resume from " + text
SET_CONTEXT "Este é o documento que você deve usar para responder dúvidas: O céu é azul."
TALK resume