Refactor GET keyword with blocking execution

- Replace async task spawning with `block_in_place` to simplify GET
  handling
- Add detailed safety checks for file paths and organization prefixes
- Introduce request/connect timeouts and TCP keep-alive settings for the HTTP client
- Improve S3 bucket access with existence check, timeouts, and richer
  logging
- Switch tracing logs to debug and add warning logs where appropriate
- Update announcement template to retrieve a PDF, generate a summary of it
  via LLM, and set context for subsequent queries.
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-10-15 13:13:27 -03:00
parent f401c170d4
commit cfeb024173
2 changed files with 163 additions and 78 deletions

View file

@ -1,11 +1,12 @@
use crate::shared::models::UserSession; use crate::shared::models::UserSession;
use crate::shared::state::AppState; use crate::shared::state::AppState;
use log::{error, info}; use log::{debug, error, info, warn};
use reqwest::{self, Client}; use reqwest::{self, Client};
use rhai::{Dynamic, Engine}; use rhai::{Dynamic, Engine};
use std::error::Error; use std::error::Error;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
pub fn get_keyword(state: Arc<AppState>, _user: UserSession, engine: &mut Engine) { pub fn get_keyword(state: Arc<AppState>, _user: UserSession, engine: &mut Engine) {
let state_clone = Arc::clone(&state); let state_clone = Arc::clone(&state);
@ -26,48 +27,41 @@ pub fn get_keyword(state: Arc<AppState>, _user: UserSession, engine: &mut Engine
))); )));
} }
let state_for_async = Arc::clone(&state_clone); let state_for_blocking = Arc::clone(&state_clone);
let url_for_async = url_str.clone(); let url_for_blocking = url_str.clone();
// Create a channel to communicate the result back // Use spawn_blocking for synchronous execution of async operations
let (tx, rx) = tokio::sync::oneshot::channel(); let result = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
debug!("Starting GET operation: {}", url_for_blocking);
// Spawn the async task without blocking let result = if url_for_blocking.starts_with("https://")
tokio::spawn(async move { || url_for_blocking.starts_with("http://")
log::trace!("Async task started for GET operation: {}", url_for_async); {
info!("HTTP(S) GET request: {}", url_for_blocking);
execute_get(&url_for_blocking).await
} else {
info!("Local file GET request from bucket: {}", url_for_blocking);
get_from_bucket(&state_for_blocking, &url_for_blocking).await
};
let result = if url_for_async.starts_with("https://") debug!(
|| url_for_async.starts_with("http://") "GET operation completed for: {}, success: {}",
{ url_for_blocking,
info!("HTTP(S) GET request: {}", url_for_async); result.is_ok()
execute_get(&url_for_async).await );
} else {
info!("Local file GET request from bucket: {}", url_for_async);
get_from_bucket(&state_for_async, &url_for_async).await
};
// Send the result back through the channel result
let _ = tx.send(result); })
}); });
// Block on receiving the result from the channel. match result {
// This is acceptable because we're in a custom syntax handler. Ok(content) => Ok(Dynamic::from(content)),
let result = match futures::executor::block_on(rx) { Err(e) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
Ok(inner) => inner.map_err(|e| { e.to_string().into(),
Box::new(rhai::EvalAltResult::ErrorRuntime( rhai::Position::NONE,
e.to_string().into(), ))),
rhai::Position::NONE, }
))
})?,
Err(_) => {
return Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
"Failed to receive result from async task".into(),
rhai::Position::NONE,
)));
}
};
Ok(Dynamic::from(result))
}) })
.unwrap(); .unwrap();
} }
@ -94,10 +88,20 @@ fn is_safe_path(path: &str) -> bool {
return false; return false;
} }
// Normalize and validate the path doesn't escape // Additional checks for suspicious patterns
if let Ok(normalized) = Path::new(path).canonicalize() { if path.contains("//") || path.contains("~") || path.contains("*") || path.contains("?") {
// If canonicalize succeeds, verify it doesn't contain parent directory references return false;
if normalized.to_string_lossy().contains("..") { }
// For local file paths, ensure they don't try to escape
if !path.starts_with("http") {
let path_obj = Path::new(path);
if path_obj.components().count()
!= path_obj
.components()
.filter(|c| matches!(c, std::path::Component::Normal(_)))
.count()
{
return false; return false;
} }
} }
@ -106,11 +110,13 @@ fn is_safe_path(path: &str) -> bool {
} }
pub async fn execute_get(url: &str) -> Result<String, Box<dyn Error + Send + Sync>> { pub async fn execute_get(url: &str) -> Result<String, Box<dyn Error + Send + Sync>> {
log::trace!("Starting execute_get with URL: {}", url); debug!("Starting execute_get with URL: {}", url);
// Build secure HTTP client (removed danger_accept_invalid_certs) // Build secure HTTP client with reasonable timeouts
let client = Client::builder() let client = Client::builder()
.timeout(std::time::Duration::from_secs(30)) .timeout(Duration::from_secs(30))
.connect_timeout(Duration::from_secs(10))
.tcp_keepalive(Duration::from_secs(30))
.build() .build()
.map_err(|e| { .map_err(|e| {
error!("Failed to build HTTP client: {}", e); error!("Failed to build HTTP client: {}", e);
@ -125,11 +131,16 @@ pub async fn execute_get(url: &str) -> Result<String, Box<dyn Error + Send + Syn
// Check response status // Check response status
if !response.status().is_success() { if !response.status().is_success() {
let status = response.status(); let status = response.status();
let error_body = response.text().await.unwrap_or_default();
error!( error!(
"HTTP request returned non-success status for URL {}: {}", "HTTP request returned non-success status for URL {}: {} - {}",
url, status url, status, error_body
); );
return Err(format!("HTTP request failed with status: {}", status).into()); return Err(format!(
"HTTP request failed with status: {} - {}",
status, error_body
)
.into());
} }
let content = response.text().await.map_err(|e| { let content = response.text().await.map_err(|e| {
@ -137,7 +148,7 @@ pub async fn execute_get(url: &str) -> Result<String, Box<dyn Error + Send + Syn
e e
})?; })?;
log::trace!( debug!(
"Successfully executed GET request for URL: {}, content length: {}", "Successfully executed GET request for URL: {}, content length: {}",
url, url,
content.len() content.len()
@ -149,7 +160,7 @@ pub async fn get_from_bucket(
state: &AppState, state: &AppState,
file_path: &str, file_path: &str,
) -> Result<String, Box<dyn Error + Send + Sync>> { ) -> Result<String, Box<dyn Error + Send + Sync>> {
log::trace!("Getting file from bucket: {}", file_path); debug!("Getting file from bucket: {}", file_path);
// Additional validation for file path // Additional validation for file path
if !is_safe_path(file_path) { if !is_safe_path(file_path) {
@ -159,7 +170,10 @@ pub async fn get_from_bucket(
// Ensure the S3 client is configured // Ensure the S3 client is configured
let s3_client = match &state.s3_client { let s3_client = match &state.s3_client {
Some(client) => client, Some(client) => {
debug!("S3 client is available");
client
}
None => { None => {
error!( error!(
"S3 client not configured when trying to get file: {}", "S3 client not configured when trying to get file: {}",
@ -175,53 +189,110 @@ pub async fn get_from_bucket(
.config .config
.as_ref() .as_ref()
.ok_or_else(|| -> Box<dyn Error + Send + Sync> { .ok_or_else(|| -> Box<dyn Error + Send + Sync> {
error!("App configuration missing");
"App configuration missing".into() "App configuration missing".into()
})?; })?;
let org_prefix = &cfg.minio.org_prefix; let org_prefix = &cfg.minio.org_prefix;
// Validate org_prefix doesn't contain suspicious characters // Validate org_prefix doesn't contain suspicious characters
if org_prefix.contains("..") || org_prefix.contains('/') { if org_prefix.contains("..") || org_prefix.contains('/') || org_prefix.contains('\\') {
error!("Invalid org_prefix in configuration: {}", org_prefix); error!("Invalid org_prefix in configuration: {}", org_prefix);
return Err("Invalid organization prefix in configuration".into()); return Err("Invalid organization prefix in configuration".into());
} }
format!("{}.default.gbai", org_prefix) let bucket = format!("{}default.gbai", org_prefix);
debug!("Resolved bucket name: {}", bucket);
bucket
}; };
log::trace!("Using bucket: {} for file: {}", bucket_name, file_path); debug!("Using bucket: {} for file: {}", bucket_name, file_path);
// Perform the S3 GetObject request // Check if bucket exists first (optional but helpful for debugging)
let response = s3_client match s3_client.head_bucket().bucket(&bucket_name).send().await {
Ok(_) => debug!("Bucket exists: {}", bucket_name),
Err(e) => {
error!(
"Bucket does not exist or inaccessible: {} - {}",
bucket_name, e
);
return Err(format!("Bucket inaccessible: {}", e).into());
}
}
// Perform the S3 GetObject request with timeout
let get_object_future = s3_client
.get_object() .get_object()
.bucket(&bucket_name) .bucket(&bucket_name)
.key(file_path) .key(file_path)
.send() .send();
.await
.map_err(|e| { let response = match tokio::time::timeout(Duration::from_secs(30), get_object_future).await {
Ok(Ok(response)) => {
debug!("S3 GetObject successful for key: {}", file_path);
response
}
Ok(Err(e)) => {
error!( error!(
"S3 get_object failed for bucket {} key {}: {}", "S3 get_object failed for bucket {} key {}: {}",
bucket_name, file_path, e bucket_name, file_path, e
); );
e return Err(format!("S3 operation failed: {}", e).into());
})?; }
Err(_) => {
error!(
"S3 get_object timed out for bucket {} key {}",
bucket_name, file_path
);
return Err("S3 operation timed out".into());
}
};
// Collect the body bytes // Collect the body bytes with timeout
let data = response.body.collect().await.map_err(|e| { let body_future = response.body.collect();
error!( let data = match tokio::time::timeout(Duration::from_secs(30), body_future).await {
"Failed to collect S3 response body for bucket {} key {}: {}", Ok(Ok(data)) => {
bucket_name, file_path, e debug!(
); "Successfully collected S3 response body for key: {}",
e file_path
})?; );
data
}
Ok(Err(e)) => {
error!(
"Failed to collect S3 response body for bucket {} key {}: {}",
bucket_name, file_path, e
);
return Err(format!("Failed to read S3 response: {}", e).into());
}
Err(_) => {
error!(
"Timeout collecting S3 response body for bucket {} key {}",
bucket_name, file_path
);
return Err("Timeout reading S3 response body".into());
}
};
// Handle PDF files specially; otherwise treat as UTF8 text // Handle PDF files specially; otherwise treat as UTF8 text
let bytes = data.into_bytes().to_vec(); let bytes = data.into_bytes().to_vec();
debug!(
"Retrieved {} bytes from S3 for key: {}",
bytes.len(),
file_path
);
let content = if file_path.to_ascii_lowercase().ends_with(".pdf") { let content = if file_path.to_ascii_lowercase().ends_with(".pdf") {
debug!("Processing as PDF file: {}", file_path);
// Extract text from PDF using the `pdf_extract` crate // Extract text from PDF using the `pdf_extract` crate
match pdf_extract::extract_text_from_mem(&bytes) { match pdf_extract::extract_text_from_mem(&bytes) {
Ok(text) => text, Ok(text) => {
debug!(
"Successfully extracted text from PDF, length: {}",
text.len()
);
text
}
Err(e) => { Err(e) => {
error!( error!(
"Failed to extract text from PDF for bucket {} key {}: {}", "Failed to extract text from PDF for bucket {} key {}: {}",
@ -231,17 +302,25 @@ pub async fn get_from_bucket(
} }
} }
} else { } else {
debug!("Processing as text file: {}", file_path);
// Convert bytes to a UTF8 String // Convert bytes to a UTF8 String
String::from_utf8(bytes).map_err(|e| { match String::from_utf8(bytes) {
error!( Ok(text) => {
"Failed to convert S3 response to UTF-8 for bucket {} key {}: {}", debug!("Successfully converted to UTF-8, length: {}", text.len());
bucket_name, file_path, e text
); }
e Err(e) => {
})? error!(
"Failed to convert S3 response to UTF-8 for bucket {} key {}: {}",
bucket_name, file_path, e
);
// If it's not valid UTF-8, return as base64 or error
return Err("File content is not valid UTF-8 text".into());
}
}
}; };
log::trace!( info!(
"Successfully retrieved file from bucket: {}, content length: {}", "Successfully retrieved file from bucket: {}, content length: {}",
file_path, file_path,
content.len() content.len()

View file

@ -1 +1,7 @@
TALK "Olá, estou preparando um resumo para você." TALK "Olá, pode me perguntar sobre qualquer coisa..."
text = GET "default.gbdrive/default.pdf"
resume = LLM "Say Hello and present a a resume from " + text
TALK resume
SET_CONTEXT "Este é o documento que você deve usar para responder dúvidas: " + text