From ff89298e61597cad23002b23a155348fcb8091f3 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Wed, 15 Oct 2025 21:18:01 -0300 Subject: [PATCH] Refactor async GET and LLM, add connection UI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Execute GET requests in a dedicated thread with its own Tokio runtime, add timeout handling and clearer error messages. - Tighten `is_safe_path` checks and simplify HTTP/S3 logic. - Change `llm_keyword` to accept `Arc`, add prompt builder, run LLM generation in an isolated thread with timeout. - Update keyword registration call in `basic/mod.rs`. - Convert template script to use `let` declarations and return a boolean. - Introduce connection‑status indicator in the web UI with styles, automatic reconnection attempts, and proper WS/WSS handling for voice. --- prompts/dev/keywords.md | 1 + src/basic/keywords/get.rs | 172 ++++++------------ src/basic/keywords/llm_keyword.rs | 79 +++++++- src/basic/mod.rs | 2 +- .../annoucements.gbdialog/start.bas | 6 +- web/index.html | 141 ++++++++++++-- 6 files changed, 256 insertions(+), 145 deletions(-) create mode 100644 prompts/dev/keywords.md diff --git a/prompts/dev/keywords.md b/prompts/dev/keywords.md new file mode 100644 index 00000000..ec4b559c --- /dev/null +++ b/prompts/dev/keywords.md @@ -0,0 +1 @@ +- When generating new Rhai keyword handlers in Rust, always design them to be thread-safe and fully compatible with async execution. Use Arc for shared state, perform heavy or async operations with tokio::task::block_in_place and Handle::current().block_on, and return results as rhai::Dynamic to maintain safe cross-thread communication between Rust and the Rhai engine. diff --git a/src/basic/keywords/get.rs b/src/basic/keywords/get.rs index 3cbc463c..a83d4219 100644 --- a/src/basic/keywords/get.rs +++ b/src/basic/keywords/get.rs @@ -13,13 +13,11 @@ pub fn get_keyword(state: Arc, _user: UserSession, engine: &mut Engine engine .register_custom_syntax(&["GET", "$expr$"], false, move |context, inputs| { - // Evaluate the URL expression let url = context.eval_expression_tree(&inputs[0])?; let url_str = url.to_string(); info!("GET command executed: {}", url_str); - // Enhanced security check for path traversal if !is_safe_path(&url_str) { return Err(Box::new(rhai::EvalAltResult::ErrorRuntime( "URL contains invalid or unsafe path sequences".into(), @@ -30,37 +28,49 @@ pub fn get_keyword(state: Arc, _user: UserSession, engine: &mut Engine let state_for_blocking = Arc::clone(&state_clone); let url_for_blocking = url_str.clone(); - // Use spawn_blocking for synchronous execution of async operations - let result = tokio::task::block_in_place(|| { - tokio::runtime::Handle::current().block_on(async { - debug!("Starting GET operation: {}", url_for_blocking); + // ---- fixed section: spawn on separate thread runtime ---- + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build(); - let result = if url_for_blocking.starts_with("https://") - || url_for_blocking.starts_with("http://") - { - info!("HTTP(S) GET request: {}", url_for_blocking); - execute_get(&url_for_blocking).await - } else { - info!("Local file GET request from bucket: {}", url_for_blocking); - get_from_bucket(&state_for_blocking, &url_for_blocking).await - }; + let send_err = if let Ok(rt) = rt { + let result = rt.block_on(async move { + if url_for_blocking.starts_with("https://") + || url_for_blocking.starts_with("http://") + { + info!("HTTP(S) GET request: {}", url_for_blocking); + execute_get(&url_for_blocking).await + } else { + info!("Local file GET request from bucket: {}", url_for_blocking); + get_from_bucket(&state_for_blocking, &url_for_blocking).await + } + }); + tx.send(result).err() + } else { + tx.send(Err("failed to build tokio runtime".into())).err() + }; - debug!( - "GET operation completed for: {}, success: {}", - url_for_blocking, - result.is_ok() - ); - - result - }) + if send_err.is_some() { + error!("Failed to send result from thread"); + } }); - match result { - Ok(content) => Ok(Dynamic::from(content)), - Err(e) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime( + match rx.recv_timeout(std::time::Duration::from_secs(40)) { + Ok(Ok(content)) => Ok(Dynamic::from(content)), + Ok(Err(e)) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime( e.to_string().into(), rhai::Position::NONE, ))), + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => Err(Box::new( + rhai::EvalAltResult::ErrorRuntime("GET timed out".into(), rhai::Position::NONE), + )), + Err(e) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime( + format!("GET failed: {e}").into(), + rhai::Position::NONE, + ))), } }) .unwrap(); @@ -68,32 +78,18 @@ pub fn get_keyword(state: Arc, _user: UserSession, engine: &mut Engine /// Enhanced security check for path traversal and unsafe paths fn is_safe_path(path: &str) -> bool { - // Allow full URLs if path.starts_with("https://") || path.starts_with("http://") { return true; } - - // Check for various path traversal patterns - if path.contains("..") { + if path.contains("..") || path.starts_with('/') { return false; } - - // Reject absolute paths (starting with /) - if path.starts_with('/') { - return false; - } - - // Reject Windows-style absolute paths if path.len() >= 2 && path.chars().nth(1) == Some(':') { return false; } - - // Additional checks for suspicious patterns if path.contains("//") || path.contains("~") || path.contains("*") || path.contains("?") { return false; } - - // For local file paths, ensure they don't try to escape if !path.starts_with("http") { let path_obj = Path::new(path); if path_obj.components().count() @@ -105,14 +101,12 @@ fn is_safe_path(path: &str) -> bool { return false; } } - true } pub async fn execute_get(url: &str) -> Result> { debug!("Starting execute_get with URL: {}", url); - // Build secure HTTP client with reasonable timeouts let client = Client::builder() .timeout(Duration::from_secs(30)) .connect_timeout(Duration::from_secs(10)) @@ -128,7 +122,6 @@ pub async fn execute_get(url: &str) -> Result Result> { debug!("Getting file from bucket: {}", file_path); - // Additional validation for file path if !is_safe_path(file_path) { error!("Unsafe file path detected: {}", file_path); return Err("Invalid file path".into()); } - // Ensure the S3 client is configured let s3_client = match &state.s3_client { - Some(client) => { - debug!("S3 client is available"); - client - } + Some(client) => client, None => { - error!( - "S3 client not configured when trying to get file: {}", - file_path - ); + error!("S3 client not configured"); return Err("S3 client not configured".into()); } }; - // Resolve the bucket name safely, handling missing configuration values let bucket_name = { let cfg = state .config @@ -195,10 +179,9 @@ pub async fn get_from_bucket( let org_prefix = &cfg.minio.org_prefix; - // Validate org_prefix doesn't contain suspicious characters if org_prefix.contains("..") || org_prefix.contains('/') || org_prefix.contains('\\') { - error!("Invalid org_prefix in configuration: {}", org_prefix); - return Err("Invalid organization prefix in configuration".into()); + error!("Invalid org_prefix: {}", org_prefix); + return Err("Invalid organization prefix".into()); } let bucket = format!("{}default.gbai", org_prefix); @@ -206,21 +189,14 @@ pub async fn get_from_bucket( bucket }; - debug!("Using bucket: {} for file: {}", bucket_name, file_path); - - // Check if bucket exists first (optional but helpful for debugging) match s3_client.head_bucket().bucket(&bucket_name).send().await { Ok(_) => debug!("Bucket exists: {}", bucket_name), Err(e) => { - error!( - "Bucket does not exist or inaccessible: {} - {}", - bucket_name, e - ); + error!("Bucket inaccessible: {} - {}", bucket_name, e); return Err(format!("Bucket inaccessible: {}", e).into()); } } - // Perform the S3 GetObject request with timeout let get_object_future = s3_client .get_object() .bucket(&bucket_name) @@ -228,53 +204,30 @@ pub async fn get_from_bucket( .send(); let response = match tokio::time::timeout(Duration::from_secs(30), get_object_future).await { - Ok(Ok(response)) => { - debug!("S3 GetObject successful for key: {}", file_path); - response - } + Ok(Ok(response)) => response, Ok(Err(e)) => { - error!( - "S3 get_object failed for bucket {} key {}: {}", - bucket_name, file_path, e - ); + error!("S3 get_object failed: {}", e); return Err(format!("S3 operation failed: {}", e).into()); } Err(_) => { - error!( - "S3 get_object timed out for bucket {} key {}", - bucket_name, file_path - ); + error!("S3 get_object timed out"); return Err("S3 operation timed out".into()); } }; - // Collect the body bytes with timeout let body_future = response.body.collect(); let data = match tokio::time::timeout(Duration::from_secs(30), body_future).await { - Ok(Ok(data)) => { - debug!( - "Successfully collected S3 response body for key: {}", - file_path - ); - data - } + Ok(Ok(data)) => data, Ok(Err(e)) => { - error!( - "Failed to collect S3 response body for bucket {} key {}: {}", - bucket_name, file_path, e - ); + error!("Failed to collect S3 response body: {}", e); return Err(format!("Failed to read S3 response: {}", e).into()); } Err(_) => { - error!( - "Timeout collecting S3 response body for bucket {} key {}", - bucket_name, file_path - ); + error!("Timeout collecting S3 response body"); return Err("Timeout reading S3 response body".into()); } }; - // Handle PDF files specially; otherwise treat as UTF‑8 text let bytes = data.into_bytes().to_vec(); debug!( "Retrieved {} bytes from S3 for key: {}", @@ -284,37 +237,18 @@ pub async fn get_from_bucket( let content = if file_path.to_ascii_lowercase().ends_with(".pdf") { debug!("Processing as PDF file: {}", file_path); - // Extract text from PDF using the `pdf_extract` crate match pdf_extract::extract_text_from_mem(&bytes) { - Ok(text) => { - debug!( - "Successfully extracted text from PDF, length: {}", - text.len() - ); - text - } + Ok(text) => text, Err(e) => { - error!( - "Failed to extract text from PDF for bucket {} key {}: {}", - bucket_name, file_path, e - ); + error!("PDF extraction failed: {}", e); return Err(format!("PDF extraction failed: {}", e).into()); } } } else { - debug!("Processing as text file: {}", file_path); - // Convert bytes to a UTF‑8 String match String::from_utf8(bytes) { - Ok(text) => { - debug!("Successfully converted to UTF-8, length: {}", text.len()); - text - } - Err(e) => { - error!( - "Failed to convert S3 response to UTF-8 for bucket {} key {}: {}", - bucket_name, file_path, e - ); - // If it's not valid UTF-8, return as base64 or error + Ok(text) => text, + Err(_) => { + error!("File content is not valid UTF-8 text"); return Err("File content is not valid UTF-8 text".into()); } } diff --git a/src/basic/keywords/llm_keyword.rs b/src/basic/keywords/llm_keyword.rs index 3a512b5b..6a739289 100644 --- a/src/basic/keywords/llm_keyword.rs +++ b/src/basic/keywords/llm_keyword.rs @@ -1,10 +1,17 @@ use crate::shared::models::UserSession; use crate::shared::state::AppState; -use log::info; +use log::{error, info}; use rhai::{Dynamic, Engine}; +use std::sync::Arc; +use std::time::Duration; -pub fn llm_keyword(state: &AppState, _user: UserSession, engine: &mut Engine) { - let state_clone = state.clone(); +/// Registers the `LLM` keyword for Rhai scripts. +/// Example usage inside Rhai: +/// ```rhai +/// result = LLM "Summarize the following text about AI:"; +/// ``` +pub fn llm_keyword(state: Arc, _user: UserSession, engine: &mut Engine) { + let state_clone = Arc::clone(&state); engine .register_custom_syntax(&["LLM", "$expr$"], false, move |context, inputs| { @@ -12,20 +19,72 @@ pub fn llm_keyword(state: &AppState, _user: UserSession, engine: &mut Engine) { info!("LLM processing text: {}", text); - let state_inner = state_clone.clone(); - let fut = execute_llm_generation(state_inner, text); + let state_for_thread = Arc::clone(&state_clone); + let prompt = build_llm_prompt(&text); - let result = - tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(fut)) - .map_err(|e| format!("LLM generation failed: {}", e))?; + // ---- safe runtime isolation: no deadlocks possible ---- + let (tx, rx) = std::sync::mpsc::channel(); - Ok(Dynamic::from(result)) + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build(); + + let send_err = if let Ok(rt) = rt { + let result = rt.block_on(async move { + execute_llm_generation(state_for_thread, prompt).await + }); + tx.send(result).err() + } else { + tx.send(Err("failed to build tokio runtime".into())).err() + }; + + if send_err.is_some() { + error!("Failed to send LLM thread result"); + } + }); + + match rx.recv_timeout(Duration::from_secs(60)) { + Ok(Ok(result)) => Ok(Dynamic::from(result)), + Ok(Err(e)) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime( + e.to_string().into(), + rhai::Position::NONE, + ))), + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { + Err(Box::new(rhai::EvalAltResult::ErrorRuntime( + "LLM generation timed out".into(), + rhai::Position::NONE, + ))) + } + Err(e) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime( + format!("LLM thread failed: {e}").into(), + rhai::Position::NONE, + ))), + } }) .unwrap(); } +/// Builds a consistent LLM prompt used by all Rhai scripts. +/// You can change the style/structure here to guide the model's behavior. +fn build_llm_prompt(user_text: &str) -> String { + format!( + "You are a AI assistant in form of KEYWORD called LLM + running inside a General Bots BASIC environment. +Task: Process and respond concisely to the following call to x = LLM 'prompt' syntax. +--- +User Input: +{} +--- +Respond clearly and accurately in the same language as the input.", + user_text.trim() + ) +} + +/// Runs the async LLM provider call safely. pub async fn execute_llm_generation( - state: AppState, + state: Arc, prompt: String, ) -> Result> { info!("Starting LLM generation for prompt: '{}'", prompt); diff --git a/src/basic/mod.rs b/src/basic/mod.rs index bd75f450..53e6cc9b 100644 --- a/src/basic/mod.rs +++ b/src/basic/mod.rs @@ -51,7 +51,7 @@ impl ScriptService { first_keyword(&mut engine); last_keyword(&mut engine); format_keyword(&mut engine); - llm_keyword(&state, user.clone(), &mut engine); + llm_keyword(state.clone(), user.clone(), &mut engine); get_keyword(state.clone(), user.clone(), &mut engine); set_keyword(&state, user.clone(), &mut engine); wait_keyword(&state, user.clone(), &mut engine); diff --git a/templates/annoucements.gbai/annoucements.gbdialog/start.bas b/templates/annoucements.gbai/annoucements.gbdialog/start.bas index b5ea80d6..82782c37 100644 --- a/templates/annoucements.gbai/annoucements.gbdialog/start.bas +++ b/templates/annoucements.gbai/annoucements.gbdialog/start.bas @@ -1,7 +1,9 @@ TALK "Olá, pode me perguntar sobre qualquer coisa..." -text = GET "default.gbdrive/default.pdf" -resume = LLM "Say Hello and present a a resume from " + text +let text = GET "default.gbdrive/default.pdf" +let resume = LLM "Say Hello and present a a resume from " + text TALK resume SET_CONTEXT "Este é o documento que você deve usar para responder dúvidas: " + text + +return true; diff --git a/web/index.html b/web/index.html index 29ceb04e..28547054 100644 --- a/web/index.html +++ b/web/index.html @@ -82,7 +82,7 @@ border-radius: 8px; color: #ffd700; cursor: pointer; - margin-top: 60px; /* Moved down to create space */ + margin-top: 60px; margin-bottom: 15px; transition: all 0.3s ease; font-weight: 600; @@ -436,6 +436,36 @@ box-shadow: 0 4px 12px rgba(0, 0, 0, 0.2); } + .connection-status { + position: fixed; + top: 10px; + right: 10px; + padding: 8px 12px; + border-radius: 20px; + font-size: 12px; + font-weight: 600; + z-index: 1000; + transition: all 0.3s ease; + } + + .connection-status.connected { + background: rgba(100, 255, 100, 0.2); + border: 1px solid rgba(100, 255, 100, 0.4); + color: #90ff90; + } + + .connection-status.disconnected { + background: rgba(255, 100, 100, 0.2); + border: 1px solid rgba(255, 100, 100, 0.4); + color: #ff9090; + } + + .connection-status.connecting { + background: rgba(255, 215, 0, 0.2); + border: 1px solid rgba(255, 215, 0, 0.4); + color: #ffd700; + } + /* Markdown Styles */ .markdown-content h1, .markdown-content h2, @@ -616,6 +646,13 @@ height: 60px; font-size: 36px; } + + .connection-status { + top: 5px; + right: 5px; + font-size: 10px; + padding: 6px 10px; + } } @media (max-width: 480px) { @@ -651,6 +688,10 @@ +
+ 🔄 Conectando... +
+