diff --git a/Cargo.lock b/Cargo.lock index 707981a88..f12582599 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -349,6 +349,19 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "atom_syndication" +version = "0.12.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2f68d23e2cb4fd958c705b91a6b4c80ceeaf27a9e11651272a8389d5ce1a4a3" +dependencies = [ + "chrono", + "derive_builder 0.20.2", + "diligent-date-parser", + "never", + "quick-xml 0.37.5", +] + [[package]] name = "atomic" version = "0.6.1" @@ -1137,11 +1150,13 @@ dependencies = [ "reqwest", "rhai", "ring", + "rss", "rust_xlsxwriter", "rustls 0.23.35", "rustls-native-certs 0.6.3", "rustls-pemfile 2.2.0", "scopeguard", + "scraper", "serde", "serde_json", "sha2", @@ -1254,7 +1269,7 @@ dependencies = [ "codepage", "encoding_rs", "log", - "quick-xml", + "quick-xml 0.31.0", "serde", "zip 2.4.2", ] @@ -1650,7 +1665,7 @@ checksum = "d8b9f2e4c67f833b660cdb0a3523065869fb35570177239812ed4c905aeff87b" dependencies = [ "bitflags 2.10.0", "crossterm_winapi", - "derive_more", + "derive_more 2.1.0", "document-features", "mio", "parking_lot", @@ -1718,6 +1733,29 @@ dependencies = [ "phf", ] +[[package]] +name = "cssparser" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c66d1cd8ed61bf80b38432613a7a2f09401ab8d0501110655f8b341484a3e3" +dependencies = [ + "cssparser-macros", + "dtoa-short", + "itoa", + "phf", + "smallvec", +] + +[[package]] +name = "cssparser-macros" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13b588ba4ac1a99f7f2964d24b3d896ddc6bf847ee3855dbd4366f058cfcd331" +dependencies = [ + "quote", + "syn 2.0.111", +] + [[package]] name = "csv" version = "1.4.0" @@ -2053,6 +2091,17 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "derive_more" +version = "0.99.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6edb4b64a43d977b8e99788fe3a04d483834fba1215a7e02caa415b626497f7f" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "derive_more" version = "2.1.0" @@ -2137,6 +2186,15 @@ dependencies = [ "subtle", ] +[[package]] +name = "diligent-date-parser" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8ede7d79366f419921e2e2f67889c12125726692a313bffb474bd5f37a581e9" +dependencies = [ + "chrono", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -2196,6 +2254,21 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "dtoa" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6add3b8cff394282be81f3fc1a0605db594ed69890078ca6e2cab1c408bcf04" + +[[package]] +name = "dtoa-short" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd1511a7b6a56299bd043a9c167a6d2bfb37bf84a6dfceaba651168adfb43c87" +dependencies = [ + "dtoa", +] + [[package]] name = "dunce" version = "1.0.5" @@ -2223,6 +2296,12 @@ dependencies = [ "signature", ] +[[package]] +name = "ego-tree" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2972feb8dffe7bc8c5463b1dacda1b0dfbed3710e50f977d965429692d74cd8" + [[package]] name = "either" version = "1.15.0" @@ -2522,6 +2601,16 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "futf" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df420e2e84819663797d1ec6544b13c5be84629e7bb00dc960d6917db2987843" +dependencies = [ + "mac", + "new_debug_unreachable", +] + [[package]] name = "futures" version = "0.3.31" @@ -2617,6 +2706,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -2627,6 +2725,15 @@ dependencies = [ "version_check", ] +[[package]] +name = "getopts" +version = "0.2.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfe4fbac503b8d1f88e6676011885f34b7174f46e59956bba534ba83abded4df" +dependencies = [ + "unicode-width", +] + [[package]] name = "getrandom" version = "0.2.16" @@ -2824,6 +2931,18 @@ dependencies = [ "windows-link 0.2.1", ] +[[package]] +name = "html5ever" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b7410cae13cbc75623c98ac4cbfd1f0bedddf3227afc24f370cf0f50a44a11c" +dependencies = [ + "log", + "mac", + "markup5ever", + "match_token", +] + [[package]] name = "http" version = "0.2.12" @@ -3757,6 +3876,12 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "mac" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4" + [[package]] name = "mac_address" version = "1.1.8" @@ -3778,6 +3903,31 @@ dependencies = [ "quoted_printable", ] +[[package]] +name = "markup5ever" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7a7213d12e1864c0f002f52c2923d4556935a43dec5e71355c2760e0f6e7a18" +dependencies = [ + "log", + "phf", + "phf_codegen", + "string_cache", + "string_cache_codegen", + "tendril", +] + +[[package]] +name = "match_token" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88a9689d8d44bf9964484516275f5cd4c9b59457a6940c1d5d0ecbb94510a36b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "matchit" version = "0.7.3" @@ -3951,6 +4101,18 @@ dependencies = [ "tempfile", ] +[[package]] +name = "never" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c96aba5aa877601bb3f6dd6a63a969e1f82e60646e81e71b14496995e9853c91" + +[[package]] +name = "new_debug_unreachable" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" + [[package]] name = "nix" version = "0.29.0" @@ -4666,6 +4828,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "precomputed-hash" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" + [[package]] name = "prettyplease" version = "0.2.37" @@ -4846,6 +5014,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "quick-xml" +version = "0.37.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb" +dependencies = [ + "encoding_rs", + "memchr", +] + [[package]] name = "quinn" version = "0.11.9" @@ -5276,6 +5454,18 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rss" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2107738f003660f0a91f56fd3e3bd3ab5d918b2ddaf1e1ec2136fb1c46f71bf" +dependencies = [ + "atom_syndication", + "derive_builder 0.20.2", + "never", + "quick-xml 0.37.5", +] + [[package]] name = "rust_xlsxwriter" version = "0.79.4" @@ -5509,6 +5699,21 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "scraper" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc3d051b884f40e309de6c149734eab57aa8cc1347992710dc80bcc1c2194c15" +dependencies = [ + "cssparser", + "ego-tree", + "getopts", + "html5ever", + "precomputed-hash", + "selectors", + "tendril", +] + [[package]] name = "scratch" version = "1.0.9" @@ -5575,6 +5780,25 @@ dependencies = [ "libc", ] +[[package]] +name = "selectors" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd568a4c9bb598e291a08244a5c1f5a8a6650bee243b5b0f8dbb3d9cc1d87fe8" +dependencies = [ + "bitflags 2.10.0", + "cssparser", + "derive_more 0.99.20", + "fxhash", + "log", + "new_debug_unreachable", + "phf", + "phf_codegen", + "precomputed-hash", + "servo_arc", + "smallvec", +] + [[package]] name = "semver" version = "1.0.27" @@ -5665,6 +5889,15 @@ dependencies = [ "serde", ] +[[package]] +name = "servo_arc" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "170fb83ab34de17dc69aa7c67482b22218ddb85da56546f9bd6b929e32a05930" +dependencies = [ + "stable_deref_trait", +] + [[package]] name = "sha1" version = "0.10.6" @@ -5877,6 +6110,31 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "string_cache" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf776ba3fa74f83bf4b63c3dcbbf82173db2632ed8452cb2d891d33f459de70f" +dependencies = [ + "new_debug_unreachable", + "parking_lot", + "phf_shared", + "precomputed-hash", + "serde", +] + +[[package]] +name = "string_cache_codegen" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c711928715f1fe0fe509c53b43e993a9a557babc2d0a3567d0a3006f1ac931a0" +dependencies = [ + "phf_generator", + "phf_shared", + "proc-macro2", + "quote", +] + [[package]] name = "stringprep" version = "0.1.5" @@ -6040,6 +6298,17 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "tendril" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d24a120c5fc464a3458240ee02c299ebcb9d67b5249c8848b09d639dca8d7bb0" +dependencies = [ + "futf", + "mac", + "utf-8", +] + [[package]] name = "termcolor" version = "1.4.1" diff --git a/Cargo.toml b/Cargo.toml index 59e0053e9..46c854226 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -230,6 +230,12 @@ figment = { version = "0.10", features = ["toml", "env", "json"] } # Rate limiting governor = "0.10" +# RSS feed parsing +rss = "2.0" + +# HTML parsing/web scraping +scraper = "0.22" + [dev-dependencies] mockito = "1.7.0" tempfile = "3" diff --git a/config/directory_config.json b/config/directory_config.json index f77f57cec..04092eb68 100644 --- a/config/directory_config.json +++ b/config/directory_config.json @@ -1,7 +1,7 @@ { "base_url": "http://localhost:8080", "default_org": { - "id": "350450804827619342", + "id": "350485067862114318", "name": "default", "domain": "default.localhost" }, @@ -13,8 +13,8 @@ "first_name": "Admin", "last_name": "User" }, - "admin_token": "zBxbF2StNdIRTSPJI1q3ND7NFNeFQK5qoB8aS69bQSN3bwLvVi3jxTVFnEVDGur4kaltPgc", + "admin_token": "UC3h1KMCk9gqQJLBQHdb_ra0nxqABNfGobIesD1q1tAddGiMwQPHGfXgR6H1DLmyyrdy1WU", "project_id": "", - "client_id": "350450809542082574", - "client_secret": "bgUNSAXTzOwaZnyatVgWceBTLtQrySQc8BGrJb7sT1hMOeiKAtWwD7638fg7biRq" + "client_id": "350485068449382414", + "client_secret": "wll0Tiy8cZoliqOAdFbtLspgDM2lDA2pAoH8LBVQx5iESOJNcwoyDBU83ly5C5YK" } \ No newline at end of file diff --git a/src/basic/keywords/ai_tools.rs b/src/basic/keywords/ai_tools.rs new file mode 100644 index 000000000..51b41b27e --- /dev/null +++ b/src/basic/keywords/ai_tools.rs @@ -0,0 +1,367 @@ +use crate::shared::models::UserSession; +use crate::shared::state::AppState; +use log::{debug, trace}; +use rhai::{Dynamic, Engine, EvalAltResult, Map, Position}; +use std::sync::Arc; +use std::time::Duration; + +pub fn register_ai_tools_keywords(state: Arc, user: UserSession, engine: &mut Engine) { + register_translate_keyword(state.clone(), user.clone(), engine); + register_ocr_keyword(state.clone(), user.clone(), engine); + register_sentiment_keyword(state.clone(), user.clone(), engine); + register_classify_keyword(state, user, engine); +} + +fn register_translate_keyword(_state: Arc, _user: UserSession, engine: &mut Engine) { + engine + .register_custom_syntax( + &["TRANSLATE", "$expr$", ",", "$expr$"], + false, + move |context, inputs| { + let text = context.eval_expression_tree(&inputs[0])?.to_string(); + let target_lang = context.eval_expression_tree(&inputs[1])?.to_string(); + trace!("TRANSLATE to {}", target_lang); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + let result = rt.block_on(async { translate_text(&text, &target_lang).await }); + let _ = tx.send(result); + }); + match rx.recv_timeout(Duration::from_secs(60)) { + Ok(Ok(result)) => Ok(Dynamic::from(result)), + Ok(Err(e)) => Err(Box::new(EvalAltResult::ErrorRuntime( + format!("TRANSLATE failed: {}", e).into(), + Position::NONE, + ))), + Err(_) => Err(Box::new(EvalAltResult::ErrorRuntime( + "TRANSLATE timed out".into(), + Position::NONE, + ))), + } + }, + ) + .unwrap(); + + debug!("Registered TRANSLATE keyword"); +} + +fn register_ocr_keyword(_state: Arc, _user: UserSession, engine: &mut Engine) { + engine + .register_custom_syntax(&["OCR", "$expr$"], false, move |context, inputs| { + let image_path = context.eval_expression_tree(&inputs[0])?.to_string(); + trace!("OCR {}", image_path); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + let result = rt.block_on(async { perform_ocr(&image_path).await }); + let _ = tx.send(result); + }); + match rx.recv_timeout(Duration::from_secs(60)) { + Ok(Ok(result)) => Ok(Dynamic::from(result)), + Ok(Err(e)) => Err(Box::new(EvalAltResult::ErrorRuntime( + format!("OCR failed: {}", e).into(), + Position::NONE, + ))), + Err(_) => Err(Box::new(EvalAltResult::ErrorRuntime( + "OCR timed out".into(), + Position::NONE, + ))), + } + }) + .unwrap(); + + debug!("Registered OCR keyword"); +} + +fn register_sentiment_keyword(_state: Arc, _user: UserSession, engine: &mut Engine) { + engine + .register_custom_syntax(&["SENTIMENT", "$expr$"], false, move |context, inputs| { + let text = context.eval_expression_tree(&inputs[0])?.to_string(); + trace!("SENTIMENT analysis"); + let (tx, rx) = std::sync::mpsc::channel(); + let text_clone = text.clone(); + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + let result = rt.block_on(async { analyze_sentiment(&text_clone).await }); + let _ = tx.send(result); + }); + match rx.recv_timeout(Duration::from_secs(30)) { + Ok(Ok(result)) => Ok(result), + Ok(Err(e)) => Err(Box::new(EvalAltResult::ErrorRuntime( + format!("SENTIMENT failed: {}", e).into(), + Position::NONE, + ))), + Err(_) => Ok(analyze_sentiment_quick(&text)), + } + }) + .unwrap(); + + engine.register_fn("SENTIMENT_QUICK", |text: &str| -> Dynamic { + analyze_sentiment_quick(text) + }); + + debug!("Registered SENTIMENT keyword"); +} + +fn register_classify_keyword(_state: Arc, _user: UserSession, engine: &mut Engine) { + engine + .register_custom_syntax( + &["CLASSIFY", "$expr$", ",", "$expr$"], + false, + move |context, inputs| { + let text = context.eval_expression_tree(&inputs[0])?.to_string(); + let categories = context.eval_expression_tree(&inputs[1])?; + trace!("CLASSIFY into categories"); + let cat_list: Vec = if categories.is_array() { + categories + .into_array() + .unwrap_or_default() + .into_iter() + .filter_map(|c| c.into_string().ok()) + .collect() + } else { + categories + .into_string() + .unwrap_or_default() + .split(',') + .map(|s| s.trim().to_string()) + .collect() + }; + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + let result = rt.block_on(async { classify_text(&text, &cat_list).await }); + let _ = tx.send(result); + }); + match rx.recv_timeout(Duration::from_secs(30)) { + Ok(Ok(result)) => Ok(result), + Ok(Err(e)) => Err(Box::new(EvalAltResult::ErrorRuntime( + format!("CLASSIFY failed: {}", e).into(), + Position::NONE, + ))), + Err(_) => Err(Box::new(EvalAltResult::ErrorRuntime( + "CLASSIFY timed out".into(), + Position::NONE, + ))), + } + }, + ) + .unwrap(); + + debug!("Registered CLASSIFY keyword"); +} + +async fn translate_text( + text: &str, + target_lang: &str, +) -> Result> { + let llm_url = std::env::var("LLM_URL").unwrap_or_else(|_| "http://localhost:8081".to_string()); + let prompt = format!( + "Translate to {}. Return ONLY the translation:\n\n{}", + target_lang, text + ); + let client = reqwest::Client::new(); + let response = client + .post(format!("{}/v1/chat/completions", llm_url)) + .json(&serde_json::json!({ + "model": "default", + "messages": [{"role": "user", "content": prompt}], + "temperature": 0.1 + })) + .send() + .await? + .json::() + .await?; + if let Some(content) = response["choices"][0]["message"]["content"].as_str() { + return Ok(content.trim().to_string()); + } + Ok(text.to_string()) +} + +async fn perform_ocr(image_path: &str) -> Result> { + let botmodels_url = + std::env::var("BOTMODELS_URL").unwrap_or_else(|_| "http://localhost:8001".to_string()); + let client = reqwest::Client::new(); + let image_data = if image_path.starts_with("http") { + client.get(image_path).send().await?.bytes().await?.to_vec() + } else { + std::fs::read(image_path)? + }; + let response = client + .post(format!("{}/ocr", botmodels_url)) + .header("Content-Type", "application/octet-stream") + .body(image_data) + .send() + .await? + .json::() + .await?; + if let Some(text) = response["text"].as_str() { + return Ok(text.to_string()); + } + Ok(String::new()) +} + +async fn analyze_sentiment( + text: &str, +) -> Result> { + let llm_url = std::env::var("LLM_URL").unwrap_or_else(|_| "http://localhost:8081".to_string()); + let prompt = format!( + r#"Analyze sentiment. Return JSON only: +{{"sentiment":"positive/negative/neutral","score":-100 to 100,"urgent":true/false}} + +Text: {}"#, + text + ); + let client = reqwest::Client::new(); + let response = client + .post(format!("{}/v1/chat/completions", llm_url)) + .json(&serde_json::json!({ + "model": "default", + "messages": [{"role": "user", "content": prompt}], + "temperature": 0.1 + })) + .send() + .await? + .json::() + .await?; + if let Some(content) = response["choices"][0]["message"]["content"].as_str() { + if let Ok(parsed) = serde_json::from_str::(content) { + let mut result = Map::new(); + result.insert( + "sentiment".into(), + Dynamic::from( + parsed["sentiment"] + .as_str() + .unwrap_or("neutral") + .to_string(), + ), + ); + result.insert( + "score".into(), + Dynamic::from(parsed["score"].as_i64().unwrap_or(0)), + ); + result.insert( + "urgent".into(), + Dynamic::from(parsed["urgent"].as_bool().unwrap_or(false)), + ); + return Ok(Dynamic::from(result)); + } + } + Ok(analyze_sentiment_quick(text)) +} + +fn analyze_sentiment_quick(text: &str) -> Dynamic { + let text_lower = text.to_lowercase(); + let positive = [ + "good", + "great", + "excellent", + "amazing", + "wonderful", + "love", + "happy", + "thank", + "thanks", + "awesome", + "perfect", + "best", + ]; + let negative = [ + "bad", + "terrible", + "awful", + "horrible", + "hate", + "angry", + "frustrated", + "disappointed", + "worst", + "broken", + "fail", + "problem", + ]; + let urgent = [ + "urgent", + "asap", + "immediately", + "emergency", + "critical", + "help", + ]; + let pos_count = positive.iter().filter(|w| text_lower.contains(*w)).count(); + let neg_count = negative.iter().filter(|w| text_lower.contains(*w)).count(); + let is_urgent = urgent.iter().any(|w| text_lower.contains(*w)); + let sentiment = if pos_count > neg_count { + "positive" + } else if neg_count > pos_count { + "negative" + } else { + "neutral" + }; + let score = ((pos_count as i64 - neg_count as i64) * 100) + / (pos_count as i64 + neg_count as i64 + 1).max(1); + let mut result = Map::new(); + result.insert("sentiment".into(), Dynamic::from(sentiment.to_string())); + result.insert("score".into(), Dynamic::from(score)); + result.insert("urgent".into(), Dynamic::from(is_urgent)); + Dynamic::from(result) +} + +async fn classify_text( + text: &str, + categories: &[String], +) -> Result> { + let llm_url = std::env::var("LLM_URL").unwrap_or_else(|_| "http://localhost:8081".to_string()); + let cats = categories.join(", "); + let prompt = format!( + r#"Classify into one of: {} +Return JSON: {{"category":"chosen_category","confidence":0.0-1.0}} + +Text: {}"#, + cats, text + ); + let client = reqwest::Client::new(); + let response = client + .post(format!("{}/v1/chat/completions", llm_url)) + .json(&serde_json::json!({ + "model": "default", + "messages": [{"role": "user", "content": prompt}], + "temperature": 0.1 + })) + .send() + .await? + .json::() + .await?; + if let Some(content) = response["choices"][0]["message"]["content"].as_str() { + if let Ok(parsed) = serde_json::from_str::(content) { + let mut result = Map::new(); + result.insert( + "category".into(), + Dynamic::from( + parsed["category"] + .as_str() + .unwrap_or(categories.first().map(|s| s.as_str()).unwrap_or("unknown")) + .to_string(), + ), + ); + result.insert( + "confidence".into(), + Dynamic::from(parsed["confidence"].as_f64().unwrap_or(0.5)), + ); + return Ok(Dynamic::from(result)); + } + } + let mut result = Map::new(); + result.insert( + "category".into(), + Dynamic::from( + categories + .first() + .map(|s| s.as_str()) + .unwrap_or("unknown") + .to_string(), + ), + ); + result.insert("confidence".into(), Dynamic::from(0.0_f64)); + Ok(Dynamic::from(result)) +} diff --git a/src/basic/keywords/arrays/mod.rs b/src/basic/keywords/arrays/mod.rs index e7a429eeb..b0acfbff1 100644 --- a/src/basic/keywords/arrays/mod.rs +++ b/src/basic/keywords/arrays/mod.rs @@ -230,6 +230,67 @@ fn register_utility_functions(engine: &mut Engine) { (0..count).map(|_| value.clone()).collect() }); + // BATCH - split array into batches of specified size + engine.register_fn("BATCH", |arr: Array, batch_size: i64| -> Array { + let size = batch_size.max(1) as usize; + arr.chunks(size) + .map(|chunk| Dynamic::from(chunk.to_vec())) + .collect() + }); + engine.register_fn("batch", |arr: Array, batch_size: i64| -> Array { + let size = batch_size.max(1) as usize; + arr.chunks(size) + .map(|chunk| Dynamic::from(chunk.to_vec())) + .collect() + }); + + // CHUNK - alias for BATCH + engine.register_fn("CHUNK", |arr: Array, chunk_size: i64| -> Array { + let size = chunk_size.max(1) as usize; + arr.chunks(size) + .map(|chunk| Dynamic::from(chunk.to_vec())) + .collect() + }); + engine.register_fn("chunk", |arr: Array, chunk_size: i64| -> Array { + let size = chunk_size.max(1) as usize; + arr.chunks(size) + .map(|chunk| Dynamic::from(chunk.to_vec())) + .collect() + }); + + // TAKE - take first N elements + engine.register_fn("TAKE", |arr: Array, n: i64| -> Array { + arr.into_iter().take(n.max(0) as usize).collect() + }); + engine.register_fn("take", |arr: Array, n: i64| -> Array { + arr.into_iter().take(n.max(0) as usize).collect() + }); + + // DROP - drop first N elements + engine.register_fn("DROP", |arr: Array, n: i64| -> Array { + arr.into_iter().skip(n.max(0) as usize).collect() + }); + engine.register_fn("drop", |arr: Array, n: i64| -> Array { + arr.into_iter().skip(n.max(0) as usize).collect() + }); + + // HEAD - alias for TAKE + engine.register_fn("HEAD", |arr: Array, n: i64| -> Array { + arr.into_iter().take(n.max(0) as usize).collect() + }); + + // TAIL - get last N elements + engine.register_fn("TAIL", |arr: Array, n: i64| -> Array { + let len = arr.len(); + let skip = len.saturating_sub(n.max(0) as usize); + arr.into_iter().skip(skip).collect() + }); + engine.register_fn("tail", |arr: Array, n: i64| -> Array { + let len = arr.len(); + let skip = len.saturating_sub(n.max(0) as usize); + arr.into_iter().skip(skip).collect() + }); + debug!("Registered array utility functions"); } diff --git a/src/basic/keywords/mod.rs b/src/basic/keywords/mod.rs index 22ac6ef19..c67ca911e 100644 --- a/src/basic/keywords/mod.rs +++ b/src/basic/keywords/mod.rs @@ -3,6 +3,7 @@ pub mod add_bot; pub mod add_member; pub mod add_suggestion; pub mod agent_reflection; +pub mod ai_tools; pub mod api_tool_generator; pub mod arrays; pub mod book; @@ -68,4 +69,5 @@ pub mod user_memory; pub mod validation; pub mod wait; pub mod weather; +pub mod web_data; pub mod webhook; diff --git a/src/basic/keywords/sms.rs b/src/basic/keywords/sms.rs index ebd409d2f..8c252c20c 100644 --- a/src/basic/keywords/sms.rs +++ b/src/basic/keywords/sms.rs @@ -59,6 +59,43 @@ pub enum SmsProvider { Custom(String), } +/// SMS Priority levels +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum SmsPriority { + Low, + Normal, + High, + Urgent, +} + +impl Default for SmsPriority { + fn default() -> Self { + SmsPriority::Normal + } +} + +impl From<&str> for SmsPriority { + fn from(s: &str) -> Self { + match s.to_lowercase().as_str() { + "low" => SmsPriority::Low, + "high" => SmsPriority::High, + "urgent" | "critical" => SmsPriority::Urgent, + _ => SmsPriority::Normal, + } + } +} + +impl std::fmt::Display for SmsPriority { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SmsPriority::Low => write!(f, "low"), + SmsPriority::Normal => write!(f, "normal"), + SmsPriority::High => write!(f, "high"), + SmsPriority::Urgent => write!(f, "urgent"), + } + } +} + impl From<&str> for SmsProvider { fn from(s: &str) -> Self { match s.to_lowercase().as_str() { @@ -78,13 +115,15 @@ pub struct SmsSendResult { pub message_id: Option, pub provider: String, pub to: String, + pub priority: String, pub error: Option, } /// Register SMS keywords pub fn register_sms_keywords(state: Arc, user: UserSession, engine: &mut Engine) { register_send_sms_keyword(state.clone(), user.clone(), engine); - register_send_sms_with_provider_keyword(state, user, engine); + register_send_sms_with_third_arg_keyword(state.clone(), user.clone(), engine); + register_send_sms_full_keyword(state, user, engine); } /// SEND_SMS phone, message @@ -122,6 +161,7 @@ pub fn register_send_sms_keyword(state: Arc, user: UserSession, engine &phone, &message, None, + None, ) .await }); @@ -145,6 +185,7 @@ pub fn register_send_sms_keyword(state: Arc, user: UserSession, engine ); map.insert("provider".into(), Dynamic::from(result.provider)); map.insert("to".into(), Dynamic::from(result.to)); + map.insert("priority".into(), Dynamic::from(result.priority)); if let Some(err) = result.error { map.insert("error".into(), Dynamic::from(err)); } @@ -170,9 +211,10 @@ pub fn register_send_sms_keyword(state: Arc, user: UserSession, engine .unwrap(); } -/// SEND_SMS phone, message, provider -/// Sends an SMS message using a specific provider -pub fn register_send_sms_with_provider_keyword( +/// SEND_SMS phone, message, priority_or_provider +/// Sends an SMS message with priority (low, normal, high, urgent) OR specific provider +/// Auto-detects if third argument is a priority level or provider name +pub fn register_send_sms_with_third_arg_keyword( state: Arc, user: UserSession, engine: &mut Engine, @@ -187,9 +229,26 @@ pub fn register_send_sms_with_provider_keyword( move |context, inputs| { let phone = context.eval_expression_tree(&inputs[0])?.to_string(); let message = context.eval_expression_tree(&inputs[1])?.to_string(); - let provider = context.eval_expression_tree(&inputs[2])?.to_string(); + let third_arg = context.eval_expression_tree(&inputs[2])?.to_string(); - trace!("SEND_SMS: Sending SMS to {} via {}", phone, provider); + // Check if third argument is a priority or provider + let is_priority = matches!( + third_arg.to_lowercase().as_str(), + "low" | "normal" | "high" | "urgent" | "critical" + ); + + let (provider_override, priority_override) = if is_priority { + (None, Some(third_arg.clone())) + } else { + (Some(third_arg.clone()), None) + }; + + trace!( + "SEND_SMS: Sending SMS to {} (third_arg={}, is_priority={})", + phone, + third_arg, + is_priority + ); let state_for_task = Arc::clone(&state_clone); let user_for_task = user_clone.clone(); @@ -209,7 +268,8 @@ pub fn register_send_sms_with_provider_keyword( &user_for_task, &phone, &message, - Some(&provider), + provider_override.as_deref(), + priority_override.as_deref(), ) .await }); @@ -233,6 +293,194 @@ pub fn register_send_sms_with_provider_keyword( ); map.insert("provider".into(), Dynamic::from(result.provider)); map.insert("to".into(), Dynamic::from(result.to)); + map.insert("priority".into(), Dynamic::from(result.priority)); + if let Some(err) = result.error { + map.insert("error".into(), Dynamic::from(err)); + } + Ok(Dynamic::from(map)) + } + Ok(Err(e)) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime( + format!("SEND_SMS failed: {}", e).into(), + rhai::Position::NONE, + ))), + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { + Err(Box::new(rhai::EvalAltResult::ErrorRuntime( + "SEND_SMS timed out".into(), + rhai::Position::NONE, + ))) + } + Err(e) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime( + format!("SEND_SMS thread failed: {}", e).into(), + rhai::Position::NONE, + ))), + } + }, + ) + .unwrap(); +} + +/// SEND_SMS phone, message, provider, priority +/// Sends an SMS message using a specific provider with specific priority +pub fn register_send_sms_full_keyword( + state: Arc, + user: UserSession, + engine: &mut Engine, +) { + let state_clone = Arc::clone(&state); + let user_clone = user.clone(); + + engine + .register_custom_syntax( + &["SEND_SMS", "$expr$", ",", "$expr$", ",", "$expr$"], + false, + move |context, inputs| { + let phone = context.eval_expression_tree(&inputs[0])?.to_string(); + let message = context.eval_expression_tree(&inputs[1])?.to_string(); + let provider = context.eval_expression_tree(&inputs[2])?.to_string(); + let priority = context.eval_expression_tree(&inputs[3])?.to_string(); + + trace!( + "SEND_SMS: Sending SMS to {} via {} with priority {}", + phone, + provider, + priority + ); + + let state_for_task = Arc::clone(&state_clone); + let user_for_task = user_clone.clone(); + + let (tx, rx) = std::sync::mpsc::channel(); + + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build(); + + let send_err = if let Ok(rt) = rt { + let result = rt.block_on(async move { + execute_send_sms( + &state_for_task, + &user_for_task, + &phone, + &message, + Some(&provider), + Some(&priority), + ) + .await + }); + tx.send(result).err() + } else { + tx.send(Err("Failed to build tokio runtime".into())).err() + }; + + if send_err.is_some() { + error!("Failed to send SEND_SMS result from thread"); + } + }); + + match rx.recv_timeout(std::time::Duration::from_secs(30)) { + Ok(Ok(result)) => { + let mut map = rhai::Map::new(); + map.insert("success".into(), Dynamic::from(result.success)); + map.insert( + "message_id".into(), + Dynamic::from(result.message_id.unwrap_or_default()), + ); + map.insert("provider".into(), Dynamic::from(result.provider)); + map.insert("to".into(), Dynamic::from(result.to)); + map.insert("priority".into(), Dynamic::from(result.priority)); + if let Some(err) = result.error { + map.insert("error".into(), Dynamic::from(err)); + } + Ok(Dynamic::from(map)) + } + Ok(Err(e)) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime( + format!("SEND_SMS failed: {}", e).into(), + rhai::Position::NONE, + ))), + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { + Err(Box::new(rhai::EvalAltResult::ErrorRuntime( + "SEND_SMS timed out".into(), + rhai::Position::NONE, + ))) + } + Err(e) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime( + format!("SEND_SMS thread failed: {}", e).into(), + rhai::Position::NONE, + ))), + } + }, + ) + .unwrap(); + + // Also register the 4-argument syntax: SEND_SMS phone, message, provider, priority + let state_clone2 = Arc::clone(&state); + let user_clone2 = user.clone(); + + engine + .register_custom_syntax( + &[ + "SEND_SMS", "$expr$", ",", "$expr$", ",", "$expr$", ",", "$expr$", + ], + false, + move |context, inputs| { + let phone = context.eval_expression_tree(&inputs[0])?.to_string(); + let message = context.eval_expression_tree(&inputs[1])?.to_string(); + let provider = context.eval_expression_tree(&inputs[2])?.to_string(); + let priority = context.eval_expression_tree(&inputs[3])?.to_string(); + + trace!( + "SEND_SMS: Sending SMS to {} via {} with priority {}", + phone, + provider, + priority + ); + + let state_for_task = Arc::clone(&state_clone2); + let user_for_task = user_clone2.clone(); + + let (tx, rx) = std::sync::mpsc::channel(); + + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build(); + + let send_err = if let Ok(rt) = rt { + let result = rt.block_on(async move { + execute_send_sms( + &state_for_task, + &user_for_task, + &phone, + &message, + Some(&provider), + Some(&priority), + ) + .await + }); + tx.send(result).err() + } else { + tx.send(Err("Failed to build tokio runtime".into())).err() + }; + + if send_err.is_some() { + error!("Failed to send SEND_SMS result from thread"); + } + }); + + match rx.recv_timeout(std::time::Duration::from_secs(30)) { + Ok(Ok(result)) => { + let mut map = rhai::Map::new(); + map.insert("success".into(), Dynamic::from(result.success)); + map.insert( + "message_id".into(), + Dynamic::from(result.message_id.unwrap_or_default()), + ); + map.insert("provider".into(), Dynamic::from(result.provider)); + map.insert("to".into(), Dynamic::from(result.to)); + map.insert("priority".into(), Dynamic::from(result.priority)); if let Some(err) = result.error { map.insert("error".into(), Dynamic::from(err)); } @@ -264,6 +512,7 @@ async fn execute_send_sms( phone: &str, message: &str, provider_override: Option<&str>, + priority_override: Option<&str>, ) -> Result> { let config_manager = ConfigManager::new(state.conn.clone()); let bot_id = user.bot_id; @@ -278,28 +527,55 @@ async fn execute_send_sms( let provider = SmsProvider::from(provider_name.as_str()); + // Get priority from override or config + let priority = match priority_override { + Some(p) => SmsPriority::from(p), + None => { + let priority_str = config_manager + .get_config(&bot_id, "sms-default-priority", None) + .unwrap_or_else(|_| "normal".to_string()); + SmsPriority::from(priority_str.as_str()) + } + }; + // Normalize phone number let normalized_phone = normalize_phone_number(phone); - // Send via appropriate provider + // Log priority for high/urgent messages + if matches!(priority, SmsPriority::High | SmsPriority::Urgent) { + info!( + "High priority SMS to {}: priority={}", + normalized_phone, priority + ); + } + + // Send via appropriate provider (priority passed for providers that support it) let result = match provider { - SmsProvider::Twilio => send_via_twilio(state, &bot_id, &normalized_phone, message).await, - SmsProvider::AwsSns => send_via_aws_sns(state, &bot_id, &normalized_phone, message).await, - SmsProvider::Vonage => send_via_vonage(state, &bot_id, &normalized_phone, message).await, + SmsProvider::Twilio => { + send_via_twilio(state, &bot_id, &normalized_phone, message, &priority).await + } + SmsProvider::AwsSns => { + send_via_aws_sns(state, &bot_id, &normalized_phone, message, &priority).await + } + SmsProvider::Vonage => { + send_via_vonage(state, &bot_id, &normalized_phone, message, &priority).await + } SmsProvider::MessageBird => { - send_via_messagebird(state, &bot_id, &normalized_phone, message).await + send_via_messagebird(state, &bot_id, &normalized_phone, message, &priority).await } SmsProvider::Custom(name) => { - send_via_custom_webhook(state, &bot_id, &name, &normalized_phone, message).await + send_via_custom_webhook(state, &bot_id, &name, &normalized_phone, message, &priority) + .await } }; match result { Ok(message_id) => { info!( - "SMS sent successfully to {} via {}: {}", + "SMS sent successfully to {} via {} (priority={}): {}", normalized_phone, provider_name, + priority, message_id.as_deref().unwrap_or("no-id") ); Ok(SmsSendResult { @@ -307,6 +583,7 @@ async fn execute_send_sms( message_id, provider: provider_name, to: normalized_phone, + priority: priority.to_string(), error: None, }) } @@ -317,6 +594,7 @@ async fn execute_send_sms( message_id: None, provider: provider_name, to: normalized_phone, + priority: priority.to_string(), error: Some(e.to_string()), }) } @@ -347,6 +625,7 @@ async fn send_via_twilio( bot_id: &Uuid, phone: &str, message: &str, + priority: &SmsPriority, ) -> Result, Box> { let config_manager = ConfigManager::new(state.conn.clone()); @@ -368,7 +647,19 @@ async fn send_via_twilio( account_sid ); - let params = [("To", phone), ("From", &from_number), ("Body", message)]; + // Twilio supports priority through StatusCallback and scheduling + // For urgent messages, we can add priority prefix to message if configured + let final_message = match priority { + SmsPriority::Urgent => format!("[URGENT] {}", message), + SmsPriority::High => format!("[HIGH] {}", message), + _ => message.to_string(), + }; + + let params = [ + ("To", phone), + ("From", from_number.as_str()), + ("Body", final_message.as_str()), + ]; let response = client .post(&url) @@ -392,6 +683,7 @@ async fn send_via_aws_sns( bot_id: &Uuid, phone: &str, message: &str, + priority: &SmsPriority, ) -> Result, Box> { let config_manager = ConfigManager::new(state.conn.clone()); @@ -415,12 +707,22 @@ async fn send_via_aws_sns( let timestamp = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string(); let _date = ×tamp[..8]; + // AWS SNS supports SMSType: Promotional or Transactional + // Map priority to SMS type (High/Urgent = Transactional for better delivery) + let sms_type = match priority { + SmsPriority::High | SmsPriority::Urgent => "Transactional", + _ => "Promotional", + }; + // Build the request parameters let params = [ ("Action", "Publish"), ("PhoneNumber", phone), ("Message", message), ("Version", "2010-03-31"), + ("MessageAttributes.entry.1.Name", "AWS.SNS.SMS.SMSType"), + ("MessageAttributes.entry.1.Value.DataType", "String"), + ("MessageAttributes.entry.1.Value.StringValue", sms_type), ]; // For simplicity, using query string auth (requires proper AWS SigV4 in production) @@ -454,6 +756,7 @@ async fn send_via_vonage( bot_id: &Uuid, phone: &str, message: &str, + priority: &SmsPriority, ) -> Result, Box> { let config_manager = ConfigManager::new(state.conn.clone()); @@ -471,17 +774,27 @@ async fn send_via_vonage( let client = reqwest::Client::new(); - let payload = serde_json::json!({ + // Vonage supports message-class for priority (0 = flash/urgent) + let message_class = match priority { + SmsPriority::Urgent => Some("0"), // Flash message + _ => None, + }; + + let mut body = serde_json::json!({ "api_key": api_key, "api_secret": api_secret, - "to": phone.trim_start_matches('+'), + "to": phone, "from": from_number, "text": message }); + if let Some(class) = message_class { + body["message-class"] = serde_json::Value::String(class.to_string()); + } + let response = client .post("https://rest.nexmo.com/sms/json") - .json(&payload) + .json(&body) .send() .await?; @@ -511,6 +824,7 @@ async fn send_via_messagebird( bot_id: &Uuid, phone: &str, message: &str, + priority: &SmsPriority, ) -> Result, Box> { let config_manager = ConfigManager::new(state.conn.clone()); @@ -526,16 +840,27 @@ async fn send_via_messagebird( let client = reqwest::Client::new(); - let payload = serde_json::json!({ + // MessageBird supports typeDetails.class for priority + let type_details = match priority { + SmsPriority::Urgent => Some(serde_json::json!({"class": 0})), // Flash message + SmsPriority::High => Some(serde_json::json!({"class": 1})), + _ => None, + }; + + let mut body = serde_json::json!({ "originator": originator, - "recipients": [phone.trim_start_matches('+')], + "recipients": [phone], "body": message }); + if let Some(details) = type_details { + body["typeDetails"] = details; + } + let response = client .post("https://rest.messagebird.com/messages") .header("Authorization", format!("AccessKey {}", api_key)) - .json(&payload) + .json(&body) .send() .await?; @@ -551,23 +876,24 @@ async fn send_via_messagebird( async fn send_via_custom_webhook( state: &AppState, bot_id: &Uuid, - provider_name: &str, + webhook_name: &str, phone: &str, message: &str, + priority: &SmsPriority, ) -> Result, Box> { let config_manager = ConfigManager::new(state.conn.clone()); let webhook_url = config_manager - .get_config(bot_id, &format!("{}-webhook-url", provider_name), None) + .get_config(bot_id, &format!("{}-webhook-url", webhook_name), None) .map_err(|_| { format!( "Custom SMS webhook URL not configured. Set {}-webhook-url in config.", - provider_name + webhook_name ) })?; let api_key = config_manager - .get_config(bot_id, &format!("{}-api-key", provider_name), None) + .get_config(bot_id, &format!("{}-api-key", webhook_name), None) .ok(); let client = reqwest::Client::new(); @@ -575,7 +901,8 @@ async fn send_via_custom_webhook( let payload = serde_json::json!({ "to": phone, "message": message, - "provider": provider_name + "provider": webhook_name, + "priority": priority.to_string() }); let mut request = client.post(&webhook_url).json(&payload); diff --git a/src/basic/keywords/web_data.rs b/src/basic/keywords/web_data.rs new file mode 100644 index 000000000..d508589cc --- /dev/null +++ b/src/basic/keywords/web_data.rs @@ -0,0 +1,420 @@ +use crate::shared::models::UserSession; +use crate::shared::state::AppState; +use log::{debug, trace}; +use reqwest::Url; +use rhai::{Array, Dynamic, Engine, EvalAltResult, Map, Position}; +use scraper::{Html, Selector}; +use std::sync::Arc; +use std::time::Duration; + +pub fn register_web_data_keywords(state: Arc, user: UserSession, engine: &mut Engine) { + register_rss_keyword(state.clone(), user.clone(), engine); + register_scrape_keyword(state.clone(), user.clone(), engine); + register_scrape_all_keyword(state.clone(), user.clone(), engine); + register_scrape_table_keyword(state.clone(), user.clone(), engine); + register_scrape_links_keyword(state.clone(), user.clone(), engine); + register_scrape_images_keyword(state, user, engine); +} + +fn register_rss_keyword(_state: Arc, _user: UserSession, engine: &mut Engine) { + engine + .register_custom_syntax(&["RSS", "$expr$"], false, move |context, inputs| { + let url = context.eval_expression_tree(&inputs[0])?.to_string(); + trace!("RSS {}", url); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + let result = rt.block_on(async { fetch_rss(&url, 100).await }); + let _ = tx.send(result); + }); + match rx.recv_timeout(Duration::from_secs(30)) { + Ok(Ok(result)) => Ok(Dynamic::from(result)), + Ok(Err(e)) => Err(Box::new(EvalAltResult::ErrorRuntime( + format!("RSS failed: {}", e).into(), + Position::NONE, + ))), + Err(_) => Err(Box::new(EvalAltResult::ErrorRuntime( + "RSS timed out".into(), + Position::NONE, + ))), + } + }) + .unwrap(); + + engine + .register_custom_syntax( + &["RSS", "$expr$", ",", "$expr$"], + false, + move |context, inputs| { + let url = context.eval_expression_tree(&inputs[0])?.to_string(); + let limit = context + .eval_expression_tree(&inputs[1])? + .as_int() + .unwrap_or(10) as usize; + trace!("RSS {} limit {}", url, limit); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + let result = rt.block_on(async { fetch_rss(&url, limit).await }); + let _ = tx.send(result); + }); + match rx.recv_timeout(Duration::from_secs(30)) { + Ok(Ok(result)) => Ok(Dynamic::from(result)), + Ok(Err(e)) => Err(Box::new(EvalAltResult::ErrorRuntime( + format!("RSS failed: {}", e).into(), + Position::NONE, + ))), + Err(_) => Err(Box::new(EvalAltResult::ErrorRuntime( + "RSS timed out".into(), + Position::NONE, + ))), + } + }, + ) + .unwrap(); + + debug!("Registered RSS keyword"); +} + +async fn fetch_rss( + url: &str, + limit: usize, +) -> Result> { + let client = reqwest::Client::builder() + .user_agent("BotServer/6.1.0") + .timeout(Duration::from_secs(30)) + .build()?; + let content = client.get(url).send().await?.bytes().await?; + let channel = rss::Channel::read_from(&content[..])?; + let mut results = Array::new(); + for item in channel.items().iter().take(limit) { + let mut entry = Map::new(); + entry.insert( + "title".into(), + Dynamic::from(item.title().unwrap_or("").to_string()), + ); + entry.insert( + "link".into(), + Dynamic::from(item.link().unwrap_or("").to_string()), + ); + entry.insert( + "description".into(), + Dynamic::from(item.description().unwrap_or("").to_string()), + ); + entry.insert( + "pubDate".into(), + Dynamic::from(item.pub_date().unwrap_or("").to_string()), + ); + entry.insert( + "author".into(), + Dynamic::from(item.author().unwrap_or("").to_string()), + ); + if let Some(guid) = item.guid() { + entry.insert("guid".into(), Dynamic::from(guid.value().to_string())); + } + results.push(Dynamic::from(entry)); + } + Ok(results) +} + +fn register_scrape_keyword(_state: Arc, _user: UserSession, engine: &mut Engine) { + engine + .register_custom_syntax( + &["SCRAPE", "$expr$", ",", "$expr$"], + false, + move |context, inputs| { + let url = context.eval_expression_tree(&inputs[0])?.to_string(); + let selector = context.eval_expression_tree(&inputs[1])?.to_string(); + trace!("SCRAPE {} selector {}", url, selector); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + let result = rt.block_on(async { scrape_first(&url, &selector).await }); + let _ = tx.send(result); + }); + match rx.recv_timeout(Duration::from_secs(30)) { + Ok(Ok(result)) => Ok(Dynamic::from(result)), + Ok(Err(e)) => Err(Box::new(EvalAltResult::ErrorRuntime( + format!("SCRAPE failed: {}", e).into(), + Position::NONE, + ))), + Err(_) => Err(Box::new(EvalAltResult::ErrorRuntime( + "SCRAPE timed out".into(), + Position::NONE, + ))), + } + }, + ) + .unwrap(); + + debug!("Registered SCRAPE keyword"); +} + +fn register_scrape_all_keyword(_state: Arc, _user: UserSession, engine: &mut Engine) { + engine + .register_custom_syntax( + &["SCRAPE_ALL", "$expr$", ",", "$expr$"], + false, + move |context, inputs| { + let url = context.eval_expression_tree(&inputs[0])?.to_string(); + let selector = context.eval_expression_tree(&inputs[1])?.to_string(); + trace!("SCRAPE_ALL {} selector {}", url, selector); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + let result = rt.block_on(async { scrape_all(&url, &selector).await }); + let _ = tx.send(result); + }); + match rx.recv_timeout(Duration::from_secs(30)) { + Ok(Ok(result)) => Ok(Dynamic::from(result)), + Ok(Err(e)) => Err(Box::new(EvalAltResult::ErrorRuntime( + format!("SCRAPE_ALL failed: {}", e).into(), + Position::NONE, + ))), + Err(_) => Err(Box::new(EvalAltResult::ErrorRuntime( + "SCRAPE_ALL timed out".into(), + Position::NONE, + ))), + } + }, + ) + .unwrap(); + + debug!("Registered SCRAPE_ALL keyword"); +} + +fn register_scrape_table_keyword(_state: Arc, _user: UserSession, engine: &mut Engine) { + engine + .register_custom_syntax( + &["SCRAPE_TABLE", "$expr$", ",", "$expr$"], + false, + move |context, inputs| { + let url = context.eval_expression_tree(&inputs[0])?.to_string(); + let selector = context.eval_expression_tree(&inputs[1])?.to_string(); + trace!("SCRAPE_TABLE {} selector {}", url, selector); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + let result = rt.block_on(async { scrape_table(&url, &selector).await }); + let _ = tx.send(result); + }); + match rx.recv_timeout(Duration::from_secs(30)) { + Ok(Ok(result)) => Ok(Dynamic::from(result)), + Ok(Err(e)) => Err(Box::new(EvalAltResult::ErrorRuntime( + format!("SCRAPE_TABLE failed: {}", e).into(), + Position::NONE, + ))), + Err(_) => Err(Box::new(EvalAltResult::ErrorRuntime( + "SCRAPE_TABLE timed out".into(), + Position::NONE, + ))), + } + }, + ) + .unwrap(); + + debug!("Registered SCRAPE_TABLE keyword"); +} + +fn register_scrape_links_keyword(_state: Arc, _user: UserSession, engine: &mut Engine) { + engine + .register_custom_syntax( + &["SCRAPE_LINKS", "$expr$"], + false, + move |context, inputs| { + let url = context.eval_expression_tree(&inputs[0])?.to_string(); + trace!("SCRAPE_LINKS {}", url); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + let result = rt.block_on(async { scrape_links(&url).await }); + let _ = tx.send(result); + }); + match rx.recv_timeout(Duration::from_secs(30)) { + Ok(Ok(result)) => Ok(Dynamic::from(result)), + Ok(Err(e)) => Err(Box::new(EvalAltResult::ErrorRuntime( + format!("SCRAPE_LINKS failed: {}", e).into(), + Position::NONE, + ))), + Err(_) => Err(Box::new(EvalAltResult::ErrorRuntime( + "SCRAPE_LINKS timed out".into(), + Position::NONE, + ))), + } + }, + ) + .unwrap(); + + debug!("Registered SCRAPE_LINKS keyword"); +} + +fn register_scrape_images_keyword(_state: Arc, _user: UserSession, engine: &mut Engine) { + engine + .register_custom_syntax( + &["SCRAPE_IMAGES", "$expr$"], + false, + move |context, inputs| { + let url = context.eval_expression_tree(&inputs[0])?.to_string(); + trace!("SCRAPE_IMAGES {}", url); + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + let result = rt.block_on(async { scrape_images(&url).await }); + let _ = tx.send(result); + }); + match rx.recv_timeout(Duration::from_secs(30)) { + Ok(Ok(result)) => Ok(Dynamic::from(result)), + Ok(Err(e)) => Err(Box::new(EvalAltResult::ErrorRuntime( + format!("SCRAPE_IMAGES failed: {}", e).into(), + Position::NONE, + ))), + Err(_) => Err(Box::new(EvalAltResult::ErrorRuntime( + "SCRAPE_IMAGES timed out".into(), + Position::NONE, + ))), + } + }, + ) + .unwrap(); + + debug!("Registered SCRAPE_IMAGES keyword"); +} + +async fn fetch_page(url: &str) -> Result> { + let client = reqwest::Client::builder() + .user_agent("Mozilla/5.0 (compatible; BotServer/6.1.0)") + .timeout(Duration::from_secs(30)) + .build()?; + let response = client.get(url).send().await?.text().await?; + Ok(response) +} + +async fn scrape_first( + url: &str, + selector: &str, +) -> Result> { + let html = fetch_page(url).await?; + let document = Html::parse_document(&html); + let sel = Selector::parse(selector).map_err(|e| format!("Invalid selector: {:?}", e))?; + if let Some(element) = document.select(&sel).next() { + let text = element + .text() + .collect::>() + .join(" ") + .trim() + .to_string(); + return Ok(text); + } + Ok(String::new()) +} + +async fn scrape_all( + url: &str, + selector: &str, +) -> Result> { + let html = fetch_page(url).await?; + let document = Html::parse_document(&html); + let sel = Selector::parse(selector).map_err(|e| format!("Invalid selector: {:?}", e))?; + let results: Array = document + .select(&sel) + .map(|el| { + let text = el.text().collect::>().join(" ").trim().to_string(); + Dynamic::from(text) + }) + .collect(); + Ok(results) +} + +async fn scrape_table( + url: &str, + selector: &str, +) -> Result> { + let html = fetch_page(url).await?; + let document = Html::parse_document(&html); + let table_sel = Selector::parse(selector).map_err(|e| format!("Invalid selector: {:?}", e))?; + let tr_sel = Selector::parse("tr").unwrap(); + let th_sel = Selector::parse("th").unwrap(); + let td_sel = Selector::parse("td").unwrap(); + let mut results = Array::new(); + let mut headers: Vec = Vec::new(); + if let Some(table) = document.select(&table_sel).next() { + for (i, row) in table.select(&tr_sel).enumerate() { + if i == 0 { + headers = row + .select(&th_sel) + .chain(row.select(&td_sel)) + .map(|cell| cell.text().collect::>().join(" ").trim().to_string()) + .collect(); + if headers.is_empty() { + continue; + } + } else { + let mut row_map = Map::new(); + for (j, cell) in row.select(&td_sel).enumerate() { + let key = headers + .get(j) + .cloned() + .unwrap_or_else(|| format!("col{}", j)); + let value = cell.text().collect::>().join(" ").trim().to_string(); + row_map.insert(key.into(), Dynamic::from(value)); + } + if !row_map.is_empty() { + results.push(Dynamic::from(row_map)); + } + } + } + } + Ok(results) +} + +async fn scrape_links(url: &str) -> Result> { + let html = fetch_page(url).await?; + let document = Html::parse_document(&html); + let sel = Selector::parse("a[href]").unwrap(); + let base_url = Url::parse(url)?; + let mut results = Array::new(); + for el in document.select(&sel) { + if let Some(href) = el.value().attr("href") { + let absolute = base_url + .join(href) + .map(|u| u.to_string()) + .unwrap_or_default(); + if !absolute.is_empty() { + let mut link = Map::new(); + link.insert("href".into(), Dynamic::from(absolute)); + link.insert( + "text".into(), + Dynamic::from(el.text().collect::>().join(" ").trim().to_string()), + ); + results.push(Dynamic::from(link)); + } + } + } + Ok(results) +} + +async fn scrape_images(url: &str) -> Result> { + let html = fetch_page(url).await?; + let document = Html::parse_document(&html); + let sel = Selector::parse("img[src]").unwrap(); + let base_url = Url::parse(url)?; + let mut results = Array::new(); + for el in document.select(&sel) { + if let Some(src) = el.value().attr("src") { + let absolute = base_url + .join(src) + .map(|u| u.to_string()) + .unwrap_or_default(); + if !absolute.is_empty() { + let mut img = Map::new(); + img.insert("src".into(), Dynamic::from(absolute)); + img.insert( + "alt".into(), + Dynamic::from(el.value().attr("alt").unwrap_or("").to_string()), + ); + results.push(Dynamic::from(img)); + } + } + } + Ok(results) +} diff --git a/src/basic/mod.rs b/src/basic/mod.rs index 49ab8a00b..6f66e994c 100644 --- a/src/basic/mod.rs +++ b/src/basic/mod.rs @@ -23,6 +23,7 @@ struct ParamConfigRow { use self::keywords::add_bot::register_bot_keywords; use self::keywords::add_member::add_member_keyword; use self::keywords::add_suggestion::add_suggestion_keyword; +use self::keywords::ai_tools::register_ai_tools_keywords; use self::keywords::book::book_keyword; use self::keywords::bot_memory::{get_bot_memory_keyword, set_bot_memory_keyword}; use self::keywords::clear_kb::register_clear_kb_keyword; @@ -49,12 +50,14 @@ use self::keywords::remember::remember_keyword; use self::keywords::save_from_unstructured::save_from_unstructured_keyword; use self::keywords::send_mail::send_mail_keyword; use self::keywords::send_template::register_send_template_keywords; +use self::keywords::sms::register_sms_keywords; use self::keywords::social_media::register_social_media_keywords; use self::keywords::switch_case::preprocess_switch; use self::keywords::transfer_to_human::register_transfer_to_human_keyword; use self::keywords::use_kb::register_use_kb_keyword; use self::keywords::use_tool::use_tool_keyword; use self::keywords::use_website::{clear_websites_keyword, use_website_keyword}; +use self::keywords::web_data::register_web_data_keywords; use self::keywords::webhook::webhook_keyword; use self::keywords::llm_keyword::llm_keyword; @@ -182,6 +185,21 @@ impl ScriptService { // Supports transfer by name/alias, department, priority, and context register_transfer_to_human_keyword(state.clone(), user.clone(), &mut engine); + // ======================================================================== + // AI-POWERED TOOLS: TRANSLATE, OCR, SENTIMENT, CLASSIFY + // ======================================================================== + register_ai_tools_keywords(state.clone(), user.clone(), &mut engine); + + // ======================================================================== + // WEB DATA: RSS, SCRAPE, SCRAPE_ALL, SCRAPE_TABLE, SCRAPE_LINKS, SCRAPE_IMAGES + // ======================================================================== + register_web_data_keywords(state.clone(), user.clone(), &mut engine); + + // ======================================================================== + // SMS: SEND_SMS phone, message - Send SMS via Twilio, AWS SNS, Vonage, etc. + // ======================================================================== + register_sms_keywords(state.clone(), user.clone(), &mut engine); + // ======================================================================== // CORE BASIC FUNCTIONS: Math, Date/Time, Validation, Arrays, Error Handling // ======================================================================== @@ -189,7 +207,7 @@ impl ScriptService { // Math: ABS, ROUND, INT, MAX, MIN, MOD, RANDOM, SGN, SQR, LOG, EXP, SIN, COS, TAN // Date/Time: NOW, TODAY, YEAR, MONTH, DAY, HOUR, MINUTE, SECOND, DATEADD, DATEDIFF // Validation: VAL, STR, ISNULL, ISEMPTY, ISDATE, TYPEOF - // Arrays: ARRAY, UBOUND, SORT, UNIQUE, CONTAINS, PUSH, POP, REVERSE, SLICE + // Arrays: ARRAY, UBOUND, SORT, UNIQUE, CONTAINS, PUSH, POP, REVERSE, SLICE, BATCH, CHUNK // Error Handling: THROW, ERROR, IS_ERROR, ASSERT register_core_functions(state.clone(), user, &mut engine); diff --git a/src/console/editor.rs b/src/console/editor.rs index e9624a9c4..9919e5e50 100644 --- a/src/console/editor.rs +++ b/src/console/editor.rs @@ -66,6 +66,7 @@ impl Editor { pub fn file_path(&self) -> &str { &self.file_path } + #[allow(dead_code)] pub fn set_visible_lines(&mut self, lines: usize) { self.visible_lines = lines.max(5); } @@ -188,10 +189,12 @@ impl Editor { } } + #[allow(dead_code)] pub fn scroll_up(&mut self) { self.scroll_offset = self.scroll_offset.saturating_sub(1); } + #[allow(dead_code)] pub fn scroll_down(&mut self) { let total_lines = self.content.lines().count().max(1); let max_scroll = total_lines.saturating_sub(self.visible_lines.saturating_sub(3)); diff --git a/src/console/mod.rs b/src/console/mod.rs index f4f944a8b..dfffc21fd 100644 --- a/src/console/mod.rs +++ b/src/console/mod.rs @@ -6,10 +6,10 @@ use crossterm::{ execute, terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, }; -use log::LevelFilter; + use ratatui::{ backend::CrosstermBackend, - layout::{Alignment, Constraint, Direction, Layout, Rect}, + layout::{Constraint, Direction, Layout, Rect}, style::{Color, Modifier, Style}, text::{Line, Span}, widgets::{Block, Borders, List, ListItem, Paragraph, Wrap}, diff --git a/src/core/shared/state.rs b/src/core/shared/state.rs index cf6f67452..4e4fc1c03 100644 --- a/src/core/shared/state.rs +++ b/src/core/shared/state.rs @@ -200,6 +200,7 @@ impl std::fmt::Debug for AppState { #[cfg(feature = "llm")] #[derive(Debug)] +#[allow(dead_code)] struct MockLLMProvider; #[cfg(feature = "llm")] @@ -236,6 +237,7 @@ impl LLMProvider for MockLLMProvider { } #[cfg(feature = "directory")] +#[allow(dead_code)] fn create_mock_auth_service() -> AuthService { use crate::directory::client::ZitadelConfig; diff --git a/src/llm/local.rs b/src/llm/local.rs index b8750aa5a..4d2b7f02b 100644 --- a/src/llm/local.rs +++ b/src/llm/local.rs @@ -128,14 +128,18 @@ pub async fn ensure_llama_servers_running( let mut llm_ready = llm_running || llm_model.is_empty(); let mut embedding_ready = embedding_running || embedding_model.is_empty(); let mut attempts = 0; - let max_attempts = 60; + let max_attempts = 120; // Increased to 4 minutes for large models while attempts < max_attempts && (!llm_ready || !embedding_ready) { tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; - info!( - "Checking server health (attempt {}/{})...", - attempts + 1, - max_attempts - ); + + // Only log every 5 attempts to reduce noise + if attempts % 5 == 0 { + info!( + "Checking server health (attempt {}/{})...", + attempts + 1, + max_attempts + ); + } if !llm_ready && !llm_model.is_empty() { if is_server_running(&llm_url).await { info!("LLM server ready at {}", llm_url); @@ -148,14 +152,26 @@ pub async fn ensure_llama_servers_running( if is_server_running(&embedding_url).await { info!("Embedding server ready at {}", embedding_url); embedding_ready = true; - } else { - info!("Embedding server not ready yet"); + } else if attempts % 10 == 0 { + warn!("Embedding server not ready yet at {}", embedding_url); + // Try to read log file for diagnostics + if let Ok(log_content) = + std::fs::read_to_string(format!("{}/llmembd-stdout.log", llm_server_path)) + { + let last_lines: Vec<&str> = log_content.lines().rev().take(5).collect(); + if !last_lines.is_empty() { + info!("Embedding server log (last 5 lines):"); + for line in last_lines.iter().rev() { + info!(" {}", line); + } + } + } } } attempts += 1; - if attempts % 10 == 0 { - info!( - "Still waiting for servers... (attempt {}/{})", + if attempts % 20 == 0 { + warn!( + "Still waiting for servers... (attempt {}/{}) - this may take a while for large models", attempts, max_attempts ); } @@ -181,10 +197,37 @@ pub async fn ensure_llama_servers_running( } } pub async fn is_server_running(url: &str) -> bool { - let client = reqwest::Client::new(); + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(5)) + .build() + .unwrap_or_default(); + + // Try /health first (standard llama.cpp endpoint) match client.get(&format!("{}/health", url)).send().await { - Ok(response) => response.status().is_success(), - Err(_) => false, + Ok(response) => { + if response.status().is_success() { + return true; + } + // Log non-success status for debugging + info!("Health check returned status: {}", response.status()); + false + } + Err(e) => { + // Also try root endpoint as fallback + match client.get(url).send().await { + Ok(response) => response.status().is_success(), + Err(_) => { + // Only log connection errors occasionally to avoid spam + if e.is_connect() { + // Connection refused - server not started yet + false + } else { + warn!("Health check error for {}: {}", url, e); + false + } + } + } + } } } pub async fn start_llm_server( @@ -296,10 +339,28 @@ pub async fn start_embedding_server( url: String, ) -> Result<(), Box> { let port = url.split(':').last().unwrap_or("8082"); + + // Check if model file exists + let full_model_path = if model_path.starts_with('/') { + model_path.clone() + } else { + format!("{}/{}", llama_cpp_path, model_path) + }; + + if !std::path::Path::new(&full_model_path).exists() { + error!("Embedding model file not found: {}", full_model_path); + return Err(format!("Embedding model file not found: {}", full_model_path).into()); + } + + info!( + "Starting embedding server on port {} with model: {}", + port, model_path + ); + if cfg!(windows) { let mut cmd = tokio::process::Command::new("cmd"); cmd.arg("/c").arg(format!( - "cd {} && .\\llama-server.exe -m {} --verbose --host 0.0.0.0 --port {} --embedding --n-gpu-layers 99 >stdout.log", + "cd {} && .\\llama-server.exe -m {} --verbose --host 0.0.0.0 --port {} --embedding --n-gpu-layers 99 >stdout.log 2>&1", llama_cpp_path, model_path, port )); cmd.spawn()?; @@ -309,7 +370,15 @@ pub async fn start_embedding_server( "cd {} && ./llama-server -m {} --verbose --host 0.0.0.0 --port {} --embedding --n-gpu-layers 99 >llmembd-stdout.log 2>&1 &", llama_cpp_path, model_path, port )); + info!( + "Executing embedding server command: cd {} && ./llama-server -m {} --host 0.0.0.0 --port {} --embedding", + llama_cpp_path, model_path, port + ); cmd.spawn()?; } + + // Give the server a moment to start + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + Ok(()) }