From cfeb0241731acb7180a2e54ff6cc6a0b32bde157 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Wed, 15 Oct 2025 13:13:27 -0300 Subject: [PATCH] Refactor GET keyword with blocking execution MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace async task spawning with `block_in_place` to simplify GET handling - Add detailed safety checks for file paths and organization prefixes - Introduce timeout and keep‑alive settings for HTTP client - Improve S3 bucket access with existence check, timeouts, and richer logging - Switch tracing logs to debug and add warning logs where appropriate - Update announcement template to retrieve a PDF, generate a resume via LLM, and set context for subsequent queries. --- src/basic/keywords/get.rs | 233 ++++++++++++------ .../annoucements.gbdialog/start.bas | 8 +- 2 files changed, 163 insertions(+), 78 deletions(-) diff --git a/src/basic/keywords/get.rs b/src/basic/keywords/get.rs index 383d6a6a..ff582b12 100644 --- a/src/basic/keywords/get.rs +++ b/src/basic/keywords/get.rs @@ -1,11 +1,12 @@ use crate::shared::models::UserSession; use crate::shared::state::AppState; -use log::{error, info}; +use log::{debug, error, info, warn}; use reqwest::{self, Client}; use rhai::{Dynamic, Engine}; use std::error::Error; use std::path::Path; use std::sync::Arc; +use std::time::Duration; pub fn get_keyword(state: Arc, _user: UserSession, engine: &mut Engine) { let state_clone = Arc::clone(&state); @@ -26,48 +27,41 @@ pub fn get_keyword(state: Arc, _user: UserSession, engine: &mut Engine ))); } - let state_for_async = Arc::clone(&state_clone); - let url_for_async = url_str.clone(); + let state_for_blocking = Arc::clone(&state_clone); + let url_for_blocking = url_str.clone(); - // Create a channel to communicate the result back - let (tx, rx) = tokio::sync::oneshot::channel(); + // Use spawn_blocking for synchronous execution of async operations + let result = tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(async { + debug!("Starting GET operation: {}", url_for_blocking); - // Spawn the async task without blocking - tokio::spawn(async move { - log::trace!("Async task started for GET operation: {}", url_for_async); + let result = if url_for_blocking.starts_with("https://") + || url_for_blocking.starts_with("http://") + { + info!("HTTP(S) GET request: {}", url_for_blocking); + execute_get(&url_for_blocking).await + } else { + info!("Local file GET request from bucket: {}", url_for_blocking); + get_from_bucket(&state_for_blocking, &url_for_blocking).await + }; - let result = if url_for_async.starts_with("https://") - || url_for_async.starts_with("http://") - { - info!("HTTP(S) GET request: {}", url_for_async); - execute_get(&url_for_async).await - } else { - info!("Local file GET request from bucket: {}", url_for_async); - get_from_bucket(&state_for_async, &url_for_async).await - }; + debug!( + "GET operation completed for: {}, success: {}", + url_for_blocking, + result.is_ok() + ); - // Send the result back through the channel - let _ = tx.send(result); + result + }) }); - // Block on receiving the result from the channel. - // This is acceptable because we're in a custom syntax handler. - let result = match futures::executor::block_on(rx) { - Ok(inner) => inner.map_err(|e| { - Box::new(rhai::EvalAltResult::ErrorRuntime( - e.to_string().into(), - rhai::Position::NONE, - )) - })?, - Err(_) => { - return Err(Box::new(rhai::EvalAltResult::ErrorRuntime( - "Failed to receive result from async task".into(), - rhai::Position::NONE, - ))); - } - }; - - Ok(Dynamic::from(result)) + match result { + Ok(content) => Ok(Dynamic::from(content)), + Err(e) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime( + e.to_string().into(), + rhai::Position::NONE, + ))), + } }) .unwrap(); } @@ -94,10 +88,20 @@ fn is_safe_path(path: &str) -> bool { return false; } - // Normalize and validate the path doesn't escape - if let Ok(normalized) = Path::new(path).canonicalize() { - // If canonicalize succeeds, verify it doesn't contain parent directory references - if normalized.to_string_lossy().contains("..") { + // Additional checks for suspicious patterns + if path.contains("//") || path.contains("~") || path.contains("*") || path.contains("?") { + return false; + } + + // For local file paths, ensure they don't try to escape + if !path.starts_with("http") { + let path_obj = Path::new(path); + if path_obj.components().count() + != path_obj + .components() + .filter(|c| matches!(c, std::path::Component::Normal(_))) + .count() + { return false; } } @@ -106,11 +110,13 @@ fn is_safe_path(path: &str) -> bool { } pub async fn execute_get(url: &str) -> Result> { - log::trace!("Starting execute_get with URL: {}", url); + debug!("Starting execute_get with URL: {}", url); - // Build secure HTTP client (removed danger_accept_invalid_certs) + // Build secure HTTP client with reasonable timeouts let client = Client::builder() - .timeout(std::time::Duration::from_secs(30)) + .timeout(Duration::from_secs(30)) + .connect_timeout(Duration::from_secs(10)) + .tcp_keepalive(Duration::from_secs(30)) .build() .map_err(|e| { error!("Failed to build HTTP client: {}", e); @@ -125,11 +131,16 @@ pub async fn execute_get(url: &str) -> Result Result Result> { - log::trace!("Getting file from bucket: {}", file_path); + debug!("Getting file from bucket: {}", file_path); // Additional validation for file path if !is_safe_path(file_path) { @@ -159,7 +170,10 @@ pub async fn get_from_bucket( // Ensure the S3 client is configured let s3_client = match &state.s3_client { - Some(client) => client, + Some(client) => { + debug!("S3 client is available"); + client + } None => { error!( "S3 client not configured when trying to get file: {}", @@ -175,53 +189,110 @@ pub async fn get_from_bucket( .config .as_ref() .ok_or_else(|| -> Box { + error!("App configuration missing"); "App configuration missing".into() })?; let org_prefix = &cfg.minio.org_prefix; // Validate org_prefix doesn't contain suspicious characters - if org_prefix.contains("..") || org_prefix.contains('/') { + if org_prefix.contains("..") || org_prefix.contains('/') || org_prefix.contains('\\') { error!("Invalid org_prefix in configuration: {}", org_prefix); return Err("Invalid organization prefix in configuration".into()); } - format!("{}.default.gbai", org_prefix) + let bucket = format!("{}default.gbai", org_prefix); + debug!("Resolved bucket name: {}", bucket); + bucket }; - log::trace!("Using bucket: {} for file: {}", bucket_name, file_path); + debug!("Using bucket: {} for file: {}", bucket_name, file_path); - // Perform the S3 GetObject request - let response = s3_client + // Check if bucket exists first (optional but helpful for debugging) + match s3_client.head_bucket().bucket(&bucket_name).send().await { + Ok(_) => debug!("Bucket exists: {}", bucket_name), + Err(e) => { + error!( + "Bucket does not exist or inaccessible: {} - {}", + bucket_name, e + ); + return Err(format!("Bucket inaccessible: {}", e).into()); + } + } + + // Perform the S3 GetObject request with timeout + let get_object_future = s3_client .get_object() .bucket(&bucket_name) .key(file_path) - .send() - .await - .map_err(|e| { + .send(); + + let response = match tokio::time::timeout(Duration::from_secs(30), get_object_future).await { + Ok(Ok(response)) => { + debug!("S3 GetObject successful for key: {}", file_path); + response + } + Ok(Err(e)) => { error!( "S3 get_object failed for bucket {} key {}: {}", bucket_name, file_path, e ); - e - })?; + return Err(format!("S3 operation failed: {}", e).into()); + } + Err(_) => { + error!( + "S3 get_object timed out for bucket {} key {}", + bucket_name, file_path + ); + return Err("S3 operation timed out".into()); + } + }; - // Collect the body bytes - let data = response.body.collect().await.map_err(|e| { - error!( - "Failed to collect S3 response body for bucket {} key {}: {}", - bucket_name, file_path, e - ); - e - })?; + // Collect the body bytes with timeout + let body_future = response.body.collect(); + let data = match tokio::time::timeout(Duration::from_secs(30), body_future).await { + Ok(Ok(data)) => { + debug!( + "Successfully collected S3 response body for key: {}", + file_path + ); + data + } + Ok(Err(e)) => { + error!( + "Failed to collect S3 response body for bucket {} key {}: {}", + bucket_name, file_path, e + ); + return Err(format!("Failed to read S3 response: {}", e).into()); + } + Err(_) => { + error!( + "Timeout collecting S3 response body for bucket {} key {}", + bucket_name, file_path + ); + return Err("Timeout reading S3 response body".into()); + } + }; // Handle PDF files specially; otherwise treat as UTF‑8 text let bytes = data.into_bytes().to_vec(); + debug!( + "Retrieved {} bytes from S3 for key: {}", + bytes.len(), + file_path + ); let content = if file_path.to_ascii_lowercase().ends_with(".pdf") { + debug!("Processing as PDF file: {}", file_path); // Extract text from PDF using the `pdf_extract` crate match pdf_extract::extract_text_from_mem(&bytes) { - Ok(text) => text, + Ok(text) => { + debug!( + "Successfully extracted text from PDF, length: {}", + text.len() + ); + text + } Err(e) => { error!( "Failed to extract text from PDF for bucket {} key {}: {}", @@ -231,17 +302,25 @@ pub async fn get_from_bucket( } } } else { + debug!("Processing as text file: {}", file_path); // Convert bytes to a UTF‑8 String - String::from_utf8(bytes).map_err(|e| { - error!( - "Failed to convert S3 response to UTF-8 for bucket {} key {}: {}", - bucket_name, file_path, e - ); - e - })? + match String::from_utf8(bytes) { + Ok(text) => { + debug!("Successfully converted to UTF-8, length: {}", text.len()); + text + } + Err(e) => { + error!( + "Failed to convert S3 response to UTF-8 for bucket {} key {}: {}", + bucket_name, file_path, e + ); + // If it's not valid UTF-8, return as base64 or error + return Err("File content is not valid UTF-8 text".into()); + } + } }; - log::trace!( + info!( "Successfully retrieved file from bucket: {}, content length: {}", file_path, content.len() diff --git a/templates/annoucements.gbai/annoucements.gbdialog/start.bas b/templates/annoucements.gbai/annoucements.gbdialog/start.bas index ee04adb1..b5ea80d6 100644 --- a/templates/annoucements.gbai/annoucements.gbdialog/start.bas +++ b/templates/annoucements.gbai/annoucements.gbdialog/start.bas @@ -1 +1,7 @@ -TALK "Olá, estou preparando um resumo para você." +TALK "Olá, pode me perguntar sobre qualquer coisa..." + +text = GET "default.gbdrive/default.pdf" +resume = LLM "Say Hello and present a a resume from " + text +TALK resume + +SET_CONTEXT "Este é o documento que você deve usar para responder dúvidas: " + text