Bootstrap started! 6.1.0

- Add rss and scraper dependencies for web data keywords
- Add SMS keyword with priority support (low, normal, high, urgent)
- Add web_data keywords: RSS, SCRAPE, SCRAPE_ALL, SCRAPE_TABLE, SCRAPE_LINKS, SCRAPE_IMAGES
- Add ai_tools keywords: TRANSLATE, OCR, SENTIMENT, CLASSIFY
- Improve LLM server health check with better diagnostics and increased timeout
- Fix compilation errors and warnings
- Register SMS keywords in BASIC engine
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-12-10 18:22:02 -03:00
parent b6d3e0a2d5
commit 9b124156ad
13 changed files with 1594 additions and 50 deletions

273
Cargo.lock generated
View file

@ -349,6 +349,19 @@ dependencies = [
"syn 2.0.111",
]
[[package]]
name = "atom_syndication"
version = "0.12.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2f68d23e2cb4fd958c705b91a6b4c80ceeaf27a9e11651272a8389d5ce1a4a3"
dependencies = [
"chrono",
"derive_builder 0.20.2",
"diligent-date-parser",
"never",
"quick-xml 0.37.5",
]
[[package]]
name = "atomic"
version = "0.6.1"
@ -1137,11 +1150,13 @@ dependencies = [
"reqwest",
"rhai",
"ring",
"rss",
"rust_xlsxwriter",
"rustls 0.23.35",
"rustls-native-certs 0.6.3",
"rustls-pemfile 2.2.0",
"scopeguard",
"scraper",
"serde",
"serde_json",
"sha2",
@ -1254,7 +1269,7 @@ dependencies = [
"codepage",
"encoding_rs",
"log",
"quick-xml",
"quick-xml 0.31.0",
"serde",
"zip 2.4.2",
]
@ -1650,7 +1665,7 @@ checksum = "d8b9f2e4c67f833b660cdb0a3523065869fb35570177239812ed4c905aeff87b"
dependencies = [
"bitflags 2.10.0",
"crossterm_winapi",
"derive_more",
"derive_more 2.1.0",
"document-features",
"mio",
"parking_lot",
@ -1718,6 +1733,29 @@ dependencies = [
"phf",
]
[[package]]
name = "cssparser"
version = "0.34.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7c66d1cd8ed61bf80b38432613a7a2f09401ab8d0501110655f8b341484a3e3"
dependencies = [
"cssparser-macros",
"dtoa-short",
"itoa",
"phf",
"smallvec",
]
[[package]]
name = "cssparser-macros"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13b588ba4ac1a99f7f2964d24b3d896ddc6bf847ee3855dbd4366f058cfcd331"
dependencies = [
"quote",
"syn 2.0.111",
]
[[package]]
name = "csv"
version = "1.4.0"
@ -2053,6 +2091,17 @@ dependencies = [
"syn 2.0.111",
]
[[package]]
name = "derive_more"
version = "0.99.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6edb4b64a43d977b8e99788fe3a04d483834fba1215a7e02caa415b626497f7f"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.111",
]
[[package]]
name = "derive_more"
version = "2.1.0"
@ -2137,6 +2186,15 @@ dependencies = [
"subtle",
]
[[package]]
name = "diligent-date-parser"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8ede7d79366f419921e2e2f67889c12125726692a313bffb474bd5f37a581e9"
dependencies = [
"chrono",
]
[[package]]
name = "displaydoc"
version = "0.2.5"
@ -2196,6 +2254,21 @@ dependencies = [
"syn 2.0.111",
]
[[package]]
name = "dtoa"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6add3b8cff394282be81f3fc1a0605db594ed69890078ca6e2cab1c408bcf04"
[[package]]
name = "dtoa-short"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd1511a7b6a56299bd043a9c167a6d2bfb37bf84a6dfceaba651168adfb43c87"
dependencies = [
"dtoa",
]
[[package]]
name = "dunce"
version = "1.0.5"
@ -2223,6 +2296,12 @@ dependencies = [
"signature",
]
[[package]]
name = "ego-tree"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2972feb8dffe7bc8c5463b1dacda1b0dfbed3710e50f977d965429692d74cd8"
[[package]]
name = "either"
version = "1.15.0"
@ -2522,6 +2601,16 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "futf"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df420e2e84819663797d1ec6544b13c5be84629e7bb00dc960d6917db2987843"
dependencies = [
"mac",
"new_debug_unreachable",
]
[[package]]
name = "futures"
version = "0.3.31"
@ -2617,6 +2706,15 @@ dependencies = [
"slab",
]
[[package]]
name = "fxhash"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
dependencies = [
"byteorder",
]
[[package]]
name = "generic-array"
version = "0.14.7"
@ -2627,6 +2725,15 @@ dependencies = [
"version_check",
]
[[package]]
name = "getopts"
version = "0.2.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfe4fbac503b8d1f88e6676011885f34b7174f46e59956bba534ba83abded4df"
dependencies = [
"unicode-width",
]
[[package]]
name = "getrandom"
version = "0.2.16"
@ -2824,6 +2931,18 @@ dependencies = [
"windows-link 0.2.1",
]
[[package]]
name = "html5ever"
version = "0.29.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b7410cae13cbc75623c98ac4cbfd1f0bedddf3227afc24f370cf0f50a44a11c"
dependencies = [
"log",
"mac",
"markup5ever",
"match_token",
]
[[package]]
name = "http"
version = "0.2.12"
@ -3757,6 +3876,12 @@ dependencies = [
"pkg-config",
]
[[package]]
name = "mac"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4"
[[package]]
name = "mac_address"
version = "1.1.8"
@ -3778,6 +3903,31 @@ dependencies = [
"quoted_printable",
]
[[package]]
name = "markup5ever"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7a7213d12e1864c0f002f52c2923d4556935a43dec5e71355c2760e0f6e7a18"
dependencies = [
"log",
"phf",
"phf_codegen",
"string_cache",
"string_cache_codegen",
"tendril",
]
[[package]]
name = "match_token"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88a9689d8d44bf9964484516275f5cd4c9b59457a6940c1d5d0ecbb94510a36b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.111",
]
[[package]]
name = "matchit"
version = "0.7.3"
@ -3951,6 +4101,18 @@ dependencies = [
"tempfile",
]
[[package]]
name = "never"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c96aba5aa877601bb3f6dd6a63a969e1f82e60646e81e71b14496995e9853c91"
[[package]]
name = "new_debug_unreachable"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086"
[[package]]
name = "nix"
version = "0.29.0"
@ -4666,6 +4828,12 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "precomputed-hash"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c"
[[package]]
name = "prettyplease"
version = "0.2.37"
@ -4846,6 +5014,16 @@ dependencies = [
"memchr",
]
[[package]]
name = "quick-xml"
version = "0.37.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb"
dependencies = [
"encoding_rs",
"memchr",
]
[[package]]
name = "quinn"
version = "0.11.9"
@ -5276,6 +5454,18 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "rss"
version = "2.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2107738f003660f0a91f56fd3e3bd3ab5d918b2ddaf1e1ec2136fb1c46f71bf"
dependencies = [
"atom_syndication",
"derive_builder 0.20.2",
"never",
"quick-xml 0.37.5",
]
[[package]]
name = "rust_xlsxwriter"
version = "0.79.4"
@ -5509,6 +5699,21 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "scraper"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc3d051b884f40e309de6c149734eab57aa8cc1347992710dc80bcc1c2194c15"
dependencies = [
"cssparser",
"ego-tree",
"getopts",
"html5ever",
"precomputed-hash",
"selectors",
"tendril",
]
[[package]]
name = "scratch"
version = "1.0.9"
@ -5575,6 +5780,25 @@ dependencies = [
"libc",
]
[[package]]
name = "selectors"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd568a4c9bb598e291a08244a5c1f5a8a6650bee243b5b0f8dbb3d9cc1d87fe8"
dependencies = [
"bitflags 2.10.0",
"cssparser",
"derive_more 0.99.20",
"fxhash",
"log",
"new_debug_unreachable",
"phf",
"phf_codegen",
"precomputed-hash",
"servo_arc",
"smallvec",
]
[[package]]
name = "semver"
version = "1.0.27"
@ -5665,6 +5889,15 @@ dependencies = [
"serde",
]
[[package]]
name = "servo_arc"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "170fb83ab34de17dc69aa7c67482b22218ddb85da56546f9bd6b929e32a05930"
dependencies = [
"stable_deref_trait",
]
[[package]]
name = "sha1"
version = "0.10.6"
@ -5877,6 +6110,31 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "string_cache"
version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf776ba3fa74f83bf4b63c3dcbbf82173db2632ed8452cb2d891d33f459de70f"
dependencies = [
"new_debug_unreachable",
"parking_lot",
"phf_shared",
"precomputed-hash",
"serde",
]
[[package]]
name = "string_cache_codegen"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c711928715f1fe0fe509c53b43e993a9a557babc2d0a3567d0a3006f1ac931a0"
dependencies = [
"phf_generator",
"phf_shared",
"proc-macro2",
"quote",
]
[[package]]
name = "stringprep"
version = "0.1.5"
@ -6040,6 +6298,17 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "tendril"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d24a120c5fc464a3458240ee02c299ebcb9d67b5249c8848b09d639dca8d7bb0"
dependencies = [
"futf",
"mac",
"utf-8",
]
[[package]]
name = "termcolor"
version = "1.4.1"

View file

@ -230,6 +230,12 @@ figment = { version = "0.10", features = ["toml", "env", "json"] }
# Rate limiting
governor = "0.10"
# RSS feed parsing
rss = "2.0"
# HTML parsing/web scraping
scraper = "0.22"
[dev-dependencies]
mockito = "1.7.0"
tempfile = "3"

View file

@ -1,7 +1,7 @@
{
"base_url": "http://localhost:8080",
"default_org": {
"id": "350450804827619342",
"id": "350485067862114318",
"name": "default",
"domain": "default.localhost"
},
@ -13,8 +13,8 @@
"first_name": "Admin",
"last_name": "User"
},
"admin_token": "zBxbF2StNdIRTSPJI1q3ND7NFNeFQK5qoB8aS69bQSN3bwLvVi3jxTVFnEVDGur4kaltPgc",
"admin_token": "UC3h1KMCk9gqQJLBQHdb_ra0nxqABNfGobIesD1q1tAddGiMwQPHGfXgR6H1DLmyyrdy1WU",
"project_id": "",
"client_id": "350450809542082574",
"client_secret": "bgUNSAXTzOwaZnyatVgWceBTLtQrySQc8BGrJb7sT1hMOeiKAtWwD7638fg7biRq"
"client_id": "350485068449382414",
"client_secret": "wll0Tiy8cZoliqOAdFbtLspgDM2lDA2pAoH8LBVQx5iESOJNcwoyDBU83ly5C5YK"
}

View file

@ -0,0 +1,367 @@
use crate::shared::models::UserSession;
use crate::shared::state::AppState;
use log::{debug, trace};
use rhai::{Dynamic, Engine, EvalAltResult, Map, Position};
use std::sync::Arc;
use std::time::Duration;
/// Registers all AI-tool keywords on the given script engine.
///
/// Delegates, in order, to the `TRANSLATE`, `OCR`, `SENTIMENT` and `CLASSIFY`
/// registration helpers. Each helper gets its own handle to the shared state
/// and its own copy of the user session; the final call consumes the originals.
pub fn register_ai_tools_keywords(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
    register_translate_keyword(Arc::clone(&state), user.clone(), engine);
    register_ocr_keyword(Arc::clone(&state), user.clone(), engine);
    register_sentiment_keyword(Arc::clone(&state), user.clone(), engine);
    register_classify_keyword(state, user, engine);
}
/// Registers the `TRANSLATE text, target_lang` custom syntax.
///
/// Both operands are evaluated and stringified, then the translation request
/// runs on a dedicated thread with its own tokio runtime while the script
/// thread waits up to 60 seconds for the answer.
///
/// Script-visible errors:
/// - `TRANSLATE failed: …` when the request (or runtime construction) fails,
/// - `TRANSLATE timed out` when no answer arrives within 60 seconds,
/// - `TRANSLATE thread failed: …` when the worker goes away without answering.
///
/// The state and session arguments are currently unused.
fn register_translate_keyword(_state: Arc<AppState>, _user: UserSession, engine: &mut Engine) {
    engine
        .register_custom_syntax(
            &["TRANSLATE", "$expr$", ",", "$expr$"],
            false,
            move |context, inputs| {
                let text = context.eval_expression_tree(&inputs[0])?.to_string();
                let target_lang = context.eval_expression_tree(&inputs[1])?.to_string();
                trace!("TRANSLATE to {}", target_lang);

                let (tx, rx) = std::sync::mpsc::channel();
                std::thread::spawn(move || {
                    // Report a runtime construction failure through the channel
                    // instead of panicking the worker thread.
                    let result: Result<String, Box<dyn std::error::Error + Send + Sync>> =
                        match tokio::runtime::Runtime::new() {
                            Ok(rt) => rt.block_on(translate_text(&text, &target_lang)),
                            Err(e) => {
                                Err(format!("failed to build tokio runtime: {}", e).into())
                            }
                        };
                    // The receiver may already have given up; nothing to do then.
                    let _ = tx.send(result);
                });

                match rx.recv_timeout(Duration::from_secs(60)) {
                    Ok(Ok(result)) => Ok(Dynamic::from(result)),
                    Ok(Err(e)) => Err(Box::new(EvalAltResult::ErrorRuntime(
                        format!("TRANSLATE failed: {}", e).into(),
                        Position::NONE,
                    ))),
                    Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
                        Err(Box::new(EvalAltResult::ErrorRuntime(
                            "TRANSLATE timed out".into(),
                            Position::NONE,
                        )))
                    }
                    // Disconnected: the worker dropped the sender without answering.
                    Err(e) => Err(Box::new(EvalAltResult::ErrorRuntime(
                        format!("TRANSLATE thread failed: {}", e).into(),
                        Position::NONE,
                    ))),
                }
            },
        )
        .unwrap();
    debug!("Registered TRANSLATE keyword");
}
/// Registers the `OCR image_path` custom syntax.
///
/// The operand is evaluated and stringified, then the OCR request runs on a
/// dedicated thread with its own tokio runtime while the script thread waits
/// up to 60 seconds for the recognised text.
///
/// Script-visible errors:
/// - `OCR failed: …` when the request (or runtime construction) fails,
/// - `OCR timed out` when no answer arrives within 60 seconds,
/// - `OCR thread failed: …` when the worker goes away without answering.
///
/// The state and session arguments are currently unused.
fn register_ocr_keyword(_state: Arc<AppState>, _user: UserSession, engine: &mut Engine) {
    engine
        .register_custom_syntax(&["OCR", "$expr$"], false, move |context, inputs| {
            let image_path = context.eval_expression_tree(&inputs[0])?.to_string();
            trace!("OCR {}", image_path);

            let (tx, rx) = std::sync::mpsc::channel();
            std::thread::spawn(move || {
                // Report a runtime construction failure through the channel
                // instead of panicking the worker thread.
                let result: Result<String, Box<dyn std::error::Error + Send + Sync>> =
                    match tokio::runtime::Runtime::new() {
                        Ok(rt) => rt.block_on(perform_ocr(&image_path)),
                        Err(e) => Err(format!("failed to build tokio runtime: {}", e).into()),
                    };
                // The receiver may already have given up; nothing to do then.
                let _ = tx.send(result);
            });

            match rx.recv_timeout(Duration::from_secs(60)) {
                Ok(Ok(result)) => Ok(Dynamic::from(result)),
                Ok(Err(e)) => Err(Box::new(EvalAltResult::ErrorRuntime(
                    format!("OCR failed: {}", e).into(),
                    Position::NONE,
                ))),
                Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
                    Err(Box::new(EvalAltResult::ErrorRuntime(
                        "OCR timed out".into(),
                        Position::NONE,
                    )))
                }
                // Disconnected: the worker dropped the sender without answering.
                Err(e) => Err(Box::new(EvalAltResult::ErrorRuntime(
                    format!("OCR thread failed: {}", e).into(),
                    Position::NONE,
                ))),
            }
        })
        .unwrap();
    debug!("Registered OCR keyword");
}
/// Registers the `SENTIMENT text` custom syntax and the `SENTIMENT_QUICK` function.
///
/// `SENTIMENT` runs the LLM-backed analysis on a dedicated thread with its own
/// tokio runtime and waits up to 30 seconds. If no answer arrives in time, or
/// the worker goes away without answering (for example because the runtime
/// could not be built), the keyword-based `analyze_sentiment_quick` result is
/// returned instead. An error reported by the analysis itself surfaces to the
/// script as `SENTIMENT failed: …`.
///
/// `SENTIMENT_QUICK(text)` exposes the keyword-based analysis directly.
///
/// The state and session arguments are currently unused.
fn register_sentiment_keyword(_state: Arc<AppState>, _user: UserSession, engine: &mut Engine) {
    engine
        .register_custom_syntax(&["SENTIMENT", "$expr$"], false, move |context, inputs| {
            let text = context.eval_expression_tree(&inputs[0])?.to_string();
            trace!("SENTIMENT analysis");

            let (tx, rx) = std::sync::mpsc::channel();
            // The worker needs its own copy; `text` is kept for the fallback below.
            let text_for_llm = text.clone();
            std::thread::spawn(move || match tokio::runtime::Runtime::new() {
                Ok(rt) => {
                    let outcome = rt.block_on(analyze_sentiment(&text_for_llm));
                    // The receiver may already have fallen back; nothing to do then.
                    let _ = tx.send(outcome);
                }
                Err(e) => {
                    // Do not panic: dropping `tx` unsent makes the receiver see a
                    // disconnect, which triggers the keyword-based fallback.
                    debug!("SENTIMENT: could not build tokio runtime: {}", e);
                }
            });

            match rx.recv_timeout(Duration::from_secs(30)) {
                Ok(Ok(result)) => Ok(result),
                Ok(Err(e)) => Err(Box::new(EvalAltResult::ErrorRuntime(
                    format!("SENTIMENT failed: {}", e).into(),
                    Position::NONE,
                ))),
                // Timeout or disconnected worker: best-effort local analysis.
                Err(_) => Ok(analyze_sentiment_quick(&text)),
            }
        })
        .unwrap();

    engine.register_fn("SENTIMENT_QUICK", |text: &str| -> Dynamic {
        analyze_sentiment_quick(text)
    });

    debug!("Registered SENTIMENT keyword");
}
fn register_classify_keyword(_state: Arc<AppState>, _user: UserSession, engine: &mut Engine) {
engine
.register_custom_syntax(
&["CLASSIFY", "$expr$", ",", "$expr$"],
false,
move |context, inputs| {
let text = context.eval_expression_tree(&inputs[0])?.to_string();
let categories = context.eval_expression_tree(&inputs[1])?;
trace!("CLASSIFY into categories");
let cat_list: Vec<String> = if categories.is_array() {
categories
.into_array()
.unwrap_or_default()
.into_iter()
.filter_map(|c| c.into_string().ok())
.collect()
} else {
categories
.into_string()
.unwrap_or_default()
.split(',')
.map(|s| s.trim().to_string())
.collect()
};
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let result = rt.block_on(async { classify_text(&text, &cat_list).await });
let _ = tx.send(result);
});
match rx.recv_timeout(Duration::from_secs(30)) {
Ok(Ok(result)) => Ok(result),
Ok(Err(e)) => Err(Box::new(EvalAltResult::ErrorRuntime(
format!("CLASSIFY failed: {}", e).into(),
Position::NONE,
))),
Err(_) => Err(Box::new(EvalAltResult::ErrorRuntime(
"CLASSIFY timed out".into(),
Position::NONE,
))),
}
},
)
.unwrap();
debug!("Registered CLASSIFY keyword");
}
/// Asks the LLM chat-completions endpoint to translate `text` into `target_lang`.
///
/// The server base URL comes from the `LLM_URL` environment variable and
/// defaults to `http://localhost:8081`. Returns the trimmed content of the
/// first choice; when the response carries no such string, the input text is
/// returned unchanged.
///
/// # Errors
/// Propagates failures from sending the request or decoding the JSON body.
async fn translate_text(
    text: &str,
    target_lang: &str,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
    let base_url =
        std::env::var("LLM_URL").unwrap_or_else(|_| "http://localhost:8081".to_string());
    let endpoint = format!("{}/v1/chat/completions", base_url);
    let instruction = format!(
        "Translate to {}. Return ONLY the translation:\n\n{}",
        target_lang, text
    );
    let payload = serde_json::json!({
        "model": "default",
        "messages": [{"role": "user", "content": instruction}],
        "temperature": 0.1
    });

    let reply = reqwest::Client::new()
        .post(endpoint)
        .json(&payload)
        .send()
        .await?;
    let body = reply.json::<serde_json::Value>().await?;

    let translated = match body["choices"][0]["message"]["content"].as_str() {
        Some(content) => content.trim().to_string(),
        // No usable answer: hand back the untranslated input.
        None => text.to_string(),
    };
    Ok(translated)
}
/// Sends an image to the botmodels `/ocr` endpoint and returns the recognised text.
///
/// `image_path` is treated as a URL only when it starts with `http://` or
/// `https://`; anything else is read from the local filesystem (so a local
/// path that merely begins with the letters "http" is no longer mistaken for
/// a URL). The service base URL comes from the `BOTMODELS_URL` environment
/// variable and defaults to `http://localhost:8001`.
///
/// Returns the `text` field of the JSON response, or an empty string when the
/// field is absent or not a string.
///
/// # Errors
/// Propagates failures from downloading the image (including non-success HTTP
/// statuses), reading the local file, sending the OCR request, or decoding the
/// JSON body.
async fn perform_ocr(image_path: &str) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
    let botmodels_url =
        std::env::var("BOTMODELS_URL").unwrap_or_else(|_| "http://localhost:8001".to_string());
    let client = reqwest::Client::new();

    let is_remote = image_path.starts_with("http://") || image_path.starts_with("https://");
    let image_data = if is_remote {
        client
            .get(image_path)
            .send()
            .await?
            // A 4xx/5xx error page must not be forwarded to OCR as image bytes.
            .error_for_status()?
            .bytes()
            .await?
            .to_vec()
    } else {
        std::fs::read(image_path)?
    };

    let response = client
        .post(format!("{}/ocr", botmodels_url))
        .header("Content-Type", "application/octet-stream")
        .body(image_data)
        .send()
        .await?
        .json::<serde_json::Value>()
        .await?;

    Ok(response["text"]
        .as_str()
        .map(|recognised| recognised.to_string())
        .unwrap_or_default())
}
/// Runs LLM-backed sentiment analysis on `text`.
///
/// The server base URL comes from the `LLM_URL` environment variable and
/// defaults to `http://localhost:8081`. On success the result is a map with:
/// - `sentiment`: string, `"neutral"` when the model omits it,
/// - `score`: integer, `0` when the model omits it; a floating-point score
///   (e.g. `75.0`) is rounded rather than silently replaced by `0`,
/// - `urgent`: boolean, `false` when the model omits it.
///
/// When the response has no content string, or the content is not valid JSON,
/// the keyword-based `analyze_sentiment_quick` result is returned instead.
///
/// # Errors
/// Propagates failures from sending the request or decoding the JSON body.
async fn analyze_sentiment(
    text: &str,
) -> Result<Dynamic, Box<dyn std::error::Error + Send + Sync>> {
    let llm_url = std::env::var("LLM_URL").unwrap_or_else(|_| "http://localhost:8081".to_string());
    let prompt = format!(
        r#"Analyze sentiment. Return JSON only:
{{"sentiment":"positive/negative/neutral","score":-100 to 100,"urgent":true/false}}
Text: {}"#,
        text
    );
    let client = reqwest::Client::new();
    let response = client
        .post(format!("{}/v1/chat/completions", llm_url))
        .json(&serde_json::json!({
            "model": "default",
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.1
        }))
        .send()
        .await?
        .json::<serde_json::Value>()
        .await?;

    if let Some(content) = response["choices"][0]["message"]["content"].as_str() {
        if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(content) {
            // `as_i64` is `None` for JSON floats, so fall back to rounding the
            // floating-point value before giving up on the score.
            let score = parsed["score"]
                .as_i64()
                .or_else(|| parsed["score"].as_f64().map(|f| f.round() as i64))
                .unwrap_or(0);

            let mut result = Map::new();
            result.insert(
                "sentiment".into(),
                Dynamic::from(
                    parsed["sentiment"]
                        .as_str()
                        .unwrap_or("neutral")
                        .to_string(),
                ),
            );
            result.insert("score".into(), Dynamic::from(score));
            result.insert(
                "urgent".into(),
                Dynamic::from(parsed["urgent"].as_bool().unwrap_or(false)),
            );
            return Ok(Dynamic::from(result));
        }
    }

    // No parseable model answer: use the local keyword heuristic.
    Ok(analyze_sentiment_quick(text))
}
/// Keyword-based sentiment heuristic that needs no external service.
///
/// The text is lowercased and scanned for fixed positive, negative and urgent
/// keywords using substring matching (so "thankful" matches "thank"). Returns
/// a map with:
/// - `sentiment`: `"positive"`, `"negative"` or `"neutral"`, depending on
///   which keyword list has more hits,
/// - `score`: `(pos - neg) * 100 / (pos + neg + 1)` using integer division,
/// - `urgent`: whether any urgent keyword occurs.
fn analyze_sentiment_quick(text: &str) -> Dynamic {
    // "thanks" is intentionally absent: matching is by substring, so "thank"
    // already covers it and listing both would count a single word twice.
    const POSITIVE: &[&str] = &[
        "good",
        "great",
        "excellent",
        "amazing",
        "wonderful",
        "love",
        "happy",
        "thank",
        "awesome",
        "perfect",
        "best",
    ];
    const NEGATIVE: &[&str] = &[
        "bad",
        "terrible",
        "awful",
        "horrible",
        "hate",
        "angry",
        "frustrated",
        "disappointed",
        "worst",
        "broken",
        "fail",
        "problem",
    ];
    const URGENT: &[&str] = &[
        "urgent",
        "asap",
        "immediately",
        "emergency",
        "critical",
        "help",
    ];

    let text_lower = text.to_lowercase();
    let count_hits =
        |words: &[&str]| words.iter().filter(|w| text_lower.contains(**w)).count() as i64;

    let pos_count = count_hits(POSITIVE);
    let neg_count = count_hits(NEGATIVE);
    let is_urgent = URGENT.iter().any(|w| text_lower.contains(*w));

    let sentiment = if pos_count > neg_count {
        "positive"
    } else if neg_count > pos_count {
        "negative"
    } else {
        "neutral"
    };

    // The `+ 1` keeps the denominator at least 1, so no extra guard is needed.
    let score = (pos_count - neg_count) * 100 / (pos_count + neg_count + 1);

    let mut result = Map::new();
    result.insert("sentiment".into(), Dynamic::from(sentiment.to_string()));
    result.insert("score".into(), Dynamic::from(score));
    result.insert("urgent".into(), Dynamic::from(is_urgent));
    Dynamic::from(result)
}
/// Asks the LLM chat-completions endpoint to pick one of `categories` for `text`.
///
/// The server base URL comes from the `LLM_URL` environment variable and
/// defaults to `http://localhost:8081`. Returns a map with:
/// - `category`: the model's choice; when the model's JSON lacks it, the first
///   entry of `categories` (or `"unknown"` if the list is empty),
/// - `confidence`: the model's value, or `0.5` when the model's JSON lacks it.
///
/// When the response has no content string, or the content is not valid JSON,
/// the result is the first category (or `"unknown"`) with confidence `0.0`.
///
/// # Errors
/// Propagates failures from sending the request or decoding the JSON body.
async fn classify_text(
    text: &str,
    categories: &[String],
) -> Result<Dynamic, Box<dyn std::error::Error + Send + Sync>> {
    let base_url =
        std::env::var("LLM_URL").unwrap_or_else(|_| "http://localhost:8081".to_string());
    let endpoint = format!("{}/v1/chat/completions", base_url);
    let prompt = format!(
        r#"Classify into one of: {}
Return JSON: {{"category":"chosen_category","confidence":0.0-1.0}}
Text: {}"#,
        categories.join(", "),
        text
    );
    let payload = serde_json::json!({
        "model": "default",
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.1
    });

    let reply = reqwest::Client::new()
        .post(endpoint)
        .json(&payload)
        .send()
        .await?;
    let body = reply.json::<serde_json::Value>().await?;

    // Used whenever the model gives no usable category.
    let fallback_category = categories
        .first()
        .map(String::as_str)
        .unwrap_or("unknown");

    let model_answer = body["choices"][0]["message"]["content"]
        .as_str()
        .and_then(|content| serde_json::from_str::<serde_json::Value>(content).ok());

    let (category, confidence) = match model_answer {
        Some(parsed) => (
            parsed["category"]
                .as_str()
                .unwrap_or(fallback_category)
                .to_string(),
            parsed["confidence"].as_f64().unwrap_or(0.5),
        ),
        None => (fallback_category.to_string(), 0.0_f64),
    };

    let mut result = Map::new();
    result.insert("category".into(), Dynamic::from(category));
    result.insert("confidence".into(), Dynamic::from(confidence));
    Ok(Dynamic::from(result))
}

View file

@ -230,6 +230,67 @@ fn register_utility_functions(engine: &mut Engine) {
(0..count).map(|_| value.clone()).collect()
});
// BATCH - split array into batches of specified size
engine.register_fn("BATCH", |arr: Array, batch_size: i64| -> Array {
let size = batch_size.max(1) as usize;
arr.chunks(size)
.map(|chunk| Dynamic::from(chunk.to_vec()))
.collect()
});
engine.register_fn("batch", |arr: Array, batch_size: i64| -> Array {
let size = batch_size.max(1) as usize;
arr.chunks(size)
.map(|chunk| Dynamic::from(chunk.to_vec()))
.collect()
});
// CHUNK - alias for BATCH
engine.register_fn("CHUNK", |arr: Array, chunk_size: i64| -> Array {
let size = chunk_size.max(1) as usize;
arr.chunks(size)
.map(|chunk| Dynamic::from(chunk.to_vec()))
.collect()
});
engine.register_fn("chunk", |arr: Array, chunk_size: i64| -> Array {
let size = chunk_size.max(1) as usize;
arr.chunks(size)
.map(|chunk| Dynamic::from(chunk.to_vec()))
.collect()
});
// TAKE - take first N elements
engine.register_fn("TAKE", |arr: Array, n: i64| -> Array {
arr.into_iter().take(n.max(0) as usize).collect()
});
engine.register_fn("take", |arr: Array, n: i64| -> Array {
arr.into_iter().take(n.max(0) as usize).collect()
});
// DROP - drop first N elements
engine.register_fn("DROP", |arr: Array, n: i64| -> Array {
arr.into_iter().skip(n.max(0) as usize).collect()
});
engine.register_fn("drop", |arr: Array, n: i64| -> Array {
arr.into_iter().skip(n.max(0) as usize).collect()
});
// HEAD - alias for TAKE
engine.register_fn("HEAD", |arr: Array, n: i64| -> Array {
arr.into_iter().take(n.max(0) as usize).collect()
});
// TAIL - get last N elements
engine.register_fn("TAIL", |arr: Array, n: i64| -> Array {
let len = arr.len();
let skip = len.saturating_sub(n.max(0) as usize);
arr.into_iter().skip(skip).collect()
});
engine.register_fn("tail", |arr: Array, n: i64| -> Array {
let len = arr.len();
let skip = len.saturating_sub(n.max(0) as usize);
arr.into_iter().skip(skip).collect()
});
debug!("Registered array utility functions");
}

View file

@ -3,6 +3,7 @@ pub mod add_bot;
pub mod add_member;
pub mod add_suggestion;
pub mod agent_reflection;
pub mod ai_tools;
pub mod api_tool_generator;
pub mod arrays;
pub mod book;
@ -68,4 +69,5 @@ pub mod user_memory;
pub mod validation;
pub mod wait;
pub mod weather;
pub mod web_data;
pub mod webhook;

View file

@ -59,6 +59,43 @@ pub enum SmsProvider {
Custom(String),
}
/// SMS priority levels, listed from least to most pressing.
///
/// `Eq` is derived alongside `PartialEq` because the enum has no fields, so
/// equality is total.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SmsPriority {
    /// Lowest priority.
    Low,
    /// Regular priority.
    Normal,
    /// Elevated priority.
    High,
    /// Highest priority.
    Urgent,
}
impl Default for SmsPriority {
fn default() -> Self {
SmsPriority::Normal
}
}
impl From<&str> for SmsPriority {
fn from(s: &str) -> Self {
match s.to_lowercase().as_str() {
"low" => SmsPriority::Low,
"high" => SmsPriority::High,
"urgent" | "critical" => SmsPriority::Urgent,
_ => SmsPriority::Normal,
}
}
}
impl std::fmt::Display for SmsPriority {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SmsPriority::Low => write!(f, "low"),
SmsPriority::Normal => write!(f, "normal"),
SmsPriority::High => write!(f, "high"),
SmsPriority::Urgent => write!(f, "urgent"),
}
}
}
impl From<&str> for SmsProvider {
fn from(s: &str) -> Self {
match s.to_lowercase().as_str() {
@ -78,13 +115,15 @@ pub struct SmsSendResult {
pub message_id: Option<String>,
pub provider: String,
pub to: String,
pub priority: String,
pub error: Option<String>,
}
/// Register SMS keywords
pub fn register_sms_keywords(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
register_send_sms_keyword(state.clone(), user.clone(), engine);
register_send_sms_with_provider_keyword(state, user, engine);
register_send_sms_with_third_arg_keyword(state.clone(), user.clone(), engine);
register_send_sms_full_keyword(state, user, engine);
}
/// SEND_SMS phone, message
@ -122,6 +161,7 @@ pub fn register_send_sms_keyword(state: Arc<AppState>, user: UserSession, engine
&phone,
&message,
None,
None,
)
.await
});
@ -145,6 +185,7 @@ pub fn register_send_sms_keyword(state: Arc<AppState>, user: UserSession, engine
);
map.insert("provider".into(), Dynamic::from(result.provider));
map.insert("to".into(), Dynamic::from(result.to));
map.insert("priority".into(), Dynamic::from(result.priority));
if let Some(err) = result.error {
map.insert("error".into(), Dynamic::from(err));
}
@ -170,9 +211,10 @@ pub fn register_send_sms_keyword(state: Arc<AppState>, user: UserSession, engine
.unwrap();
}
/// SEND_SMS phone, message, provider
/// Sends an SMS message using a specific provider
pub fn register_send_sms_with_provider_keyword(
/// SEND_SMS phone, message, priority_or_provider
/// Sends an SMS message with priority (low, normal, high, urgent) OR specific provider
/// Auto-detects if third argument is a priority level or provider name
pub fn register_send_sms_with_third_arg_keyword(
state: Arc<AppState>,
user: UserSession,
engine: &mut Engine,
@ -187,9 +229,26 @@ pub fn register_send_sms_with_provider_keyword(
move |context, inputs| {
let phone = context.eval_expression_tree(&inputs[0])?.to_string();
let message = context.eval_expression_tree(&inputs[1])?.to_string();
let provider = context.eval_expression_tree(&inputs[2])?.to_string();
let third_arg = context.eval_expression_tree(&inputs[2])?.to_string();
trace!("SEND_SMS: Sending SMS to {} via {}", phone, provider);
// Check if third argument is a priority or provider
let is_priority = matches!(
third_arg.to_lowercase().as_str(),
"low" | "normal" | "high" | "urgent" | "critical"
);
let (provider_override, priority_override) = if is_priority {
(None, Some(third_arg.clone()))
} else {
(Some(third_arg.clone()), None)
};
trace!(
"SEND_SMS: Sending SMS to {} (third_arg={}, is_priority={})",
phone,
third_arg,
is_priority
);
let state_for_task = Arc::clone(&state_clone);
let user_for_task = user_clone.clone();
@ -209,7 +268,8 @@ pub fn register_send_sms_with_provider_keyword(
&user_for_task,
&phone,
&message,
Some(&provider),
provider_override.as_deref(),
priority_override.as_deref(),
)
.await
});
@ -233,6 +293,194 @@ pub fn register_send_sms_with_provider_keyword(
);
map.insert("provider".into(), Dynamic::from(result.provider));
map.insert("to".into(), Dynamic::from(result.to));
map.insert("priority".into(), Dynamic::from(result.priority));
if let Some(err) = result.error {
map.insert("error".into(), Dynamic::from(err));
}
Ok(Dynamic::from(map))
}
Ok(Err(e)) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
format!("SEND_SMS failed: {}", e).into(),
rhai::Position::NONE,
))),
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
"SEND_SMS timed out".into(),
rhai::Position::NONE,
)))
}
Err(e) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
format!("SEND_SMS thread failed: {}", e).into(),
rhai::Position::NONE,
))),
}
},
)
.unwrap();
}
/// SEND_SMS phone, message, provider, priority
/// Sends an SMS message using a specific provider with specific priority
pub fn register_send_sms_full_keyword(
state: Arc<AppState>,
user: UserSession,
engine: &mut Engine,
) {
let state_clone = Arc::clone(&state);
let user_clone = user.clone();
engine
.register_custom_syntax(
&["SEND_SMS", "$expr$", ",", "$expr$", ",", "$expr$"],
false,
move |context, inputs| {
let phone = context.eval_expression_tree(&inputs[0])?.to_string();
let message = context.eval_expression_tree(&inputs[1])?.to_string();
let provider = context.eval_expression_tree(&inputs[2])?.to_string();
let priority = context.eval_expression_tree(&inputs[3])?.to_string();
trace!(
"SEND_SMS: Sending SMS to {} via {} with priority {}",
phone,
provider,
priority
);
let state_for_task = Arc::clone(&state_clone);
let user_for_task = user_clone.clone();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build();
let send_err = if let Ok(rt) = rt {
let result = rt.block_on(async move {
execute_send_sms(
&state_for_task,
&user_for_task,
&phone,
&message,
Some(&provider),
Some(&priority),
)
.await
});
tx.send(result).err()
} else {
tx.send(Err("Failed to build tokio runtime".into())).err()
};
if send_err.is_some() {
error!("Failed to send SEND_SMS result from thread");
}
});
match rx.recv_timeout(std::time::Duration::from_secs(30)) {
Ok(Ok(result)) => {
let mut map = rhai::Map::new();
map.insert("success".into(), Dynamic::from(result.success));
map.insert(
"message_id".into(),
Dynamic::from(result.message_id.unwrap_or_default()),
);
map.insert("provider".into(), Dynamic::from(result.provider));
map.insert("to".into(), Dynamic::from(result.to));
map.insert("priority".into(), Dynamic::from(result.priority));
if let Some(err) = result.error {
map.insert("error".into(), Dynamic::from(err));
}
Ok(Dynamic::from(map))
}
Ok(Err(e)) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
format!("SEND_SMS failed: {}", e).into(),
rhai::Position::NONE,
))),
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
"SEND_SMS timed out".into(),
rhai::Position::NONE,
)))
}
Err(e) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
format!("SEND_SMS thread failed: {}", e).into(),
rhai::Position::NONE,
))),
}
},
)
.unwrap();
// Also register the 4-argument syntax: SEND_SMS phone, message, provider, priority
let state_clone2 = Arc::clone(&state);
let user_clone2 = user.clone();
engine
.register_custom_syntax(
&[
"SEND_SMS", "$expr$", ",", "$expr$", ",", "$expr$", ",", "$expr$",
],
false,
move |context, inputs| {
let phone = context.eval_expression_tree(&inputs[0])?.to_string();
let message = context.eval_expression_tree(&inputs[1])?.to_string();
let provider = context.eval_expression_tree(&inputs[2])?.to_string();
let priority = context.eval_expression_tree(&inputs[3])?.to_string();
trace!(
"SEND_SMS: Sending SMS to {} via {} with priority {}",
phone,
provider,
priority
);
let state_for_task = Arc::clone(&state_clone2);
let user_for_task = user_clone2.clone();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build();
let send_err = if let Ok(rt) = rt {
let result = rt.block_on(async move {
execute_send_sms(
&state_for_task,
&user_for_task,
&phone,
&message,
Some(&provider),
Some(&priority),
)
.await
});
tx.send(result).err()
} else {
tx.send(Err("Failed to build tokio runtime".into())).err()
};
if send_err.is_some() {
error!("Failed to send SEND_SMS result from thread");
}
});
match rx.recv_timeout(std::time::Duration::from_secs(30)) {
Ok(Ok(result)) => {
let mut map = rhai::Map::new();
map.insert("success".into(), Dynamic::from(result.success));
map.insert(
"message_id".into(),
Dynamic::from(result.message_id.unwrap_or_default()),
);
map.insert("provider".into(), Dynamic::from(result.provider));
map.insert("to".into(), Dynamic::from(result.to));
map.insert("priority".into(), Dynamic::from(result.priority));
if let Some(err) = result.error {
map.insert("error".into(), Dynamic::from(err));
}
@ -264,6 +512,7 @@ async fn execute_send_sms(
phone: &str,
message: &str,
provider_override: Option<&str>,
priority_override: Option<&str>,
) -> Result<SmsSendResult, Box<dyn std::error::Error + Send + Sync>> {
let config_manager = ConfigManager::new(state.conn.clone());
let bot_id = user.bot_id;
@ -278,28 +527,55 @@ async fn execute_send_sms(
let provider = SmsProvider::from(provider_name.as_str());
// Get priority from override or config
let priority = match priority_override {
Some(p) => SmsPriority::from(p),
None => {
let priority_str = config_manager
.get_config(&bot_id, "sms-default-priority", None)
.unwrap_or_else(|_| "normal".to_string());
SmsPriority::from(priority_str.as_str())
}
};
// Normalize phone number
let normalized_phone = normalize_phone_number(phone);
// Send via appropriate provider
// Log priority for high/urgent messages
if matches!(priority, SmsPriority::High | SmsPriority::Urgent) {
info!(
"High priority SMS to {}: priority={}",
normalized_phone, priority
);
}
// Send via appropriate provider (priority passed for providers that support it)
let result = match provider {
SmsProvider::Twilio => send_via_twilio(state, &bot_id, &normalized_phone, message).await,
SmsProvider::AwsSns => send_via_aws_sns(state, &bot_id, &normalized_phone, message).await,
SmsProvider::Vonage => send_via_vonage(state, &bot_id, &normalized_phone, message).await,
SmsProvider::Twilio => {
send_via_twilio(state, &bot_id, &normalized_phone, message, &priority).await
}
SmsProvider::AwsSns => {
send_via_aws_sns(state, &bot_id, &normalized_phone, message, &priority).await
}
SmsProvider::Vonage => {
send_via_vonage(state, &bot_id, &normalized_phone, message, &priority).await
}
SmsProvider::MessageBird => {
send_via_messagebird(state, &bot_id, &normalized_phone, message).await
send_via_messagebird(state, &bot_id, &normalized_phone, message, &priority).await
}
SmsProvider::Custom(name) => {
send_via_custom_webhook(state, &bot_id, &name, &normalized_phone, message).await
send_via_custom_webhook(state, &bot_id, &name, &normalized_phone, message, &priority)
.await
}
};
match result {
Ok(message_id) => {
info!(
"SMS sent successfully to {} via {}: {}",
"SMS sent successfully to {} via {} (priority={}): {}",
normalized_phone,
provider_name,
priority,
message_id.as_deref().unwrap_or("no-id")
);
Ok(SmsSendResult {
@ -307,6 +583,7 @@ async fn execute_send_sms(
message_id,
provider: provider_name,
to: normalized_phone,
priority: priority.to_string(),
error: None,
})
}
@ -317,6 +594,7 @@ async fn execute_send_sms(
message_id: None,
provider: provider_name,
to: normalized_phone,
priority: priority.to_string(),
error: Some(e.to_string()),
})
}
@ -347,6 +625,7 @@ async fn send_via_twilio(
bot_id: &Uuid,
phone: &str,
message: &str,
priority: &SmsPriority,
) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
let config_manager = ConfigManager::new(state.conn.clone());
@ -368,7 +647,19 @@ async fn send_via_twilio(
account_sid
);
let params = [("To", phone), ("From", &from_number), ("Body", message)];
// Twilio supports priority through StatusCallback and scheduling
// For urgent messages, we can add priority prefix to message if configured
let final_message = match priority {
SmsPriority::Urgent => format!("[URGENT] {}", message),
SmsPriority::High => format!("[HIGH] {}", message),
_ => message.to_string(),
};
let params = [
("To", phone),
("From", from_number.as_str()),
("Body", final_message.as_str()),
];
let response = client
.post(&url)
@ -392,6 +683,7 @@ async fn send_via_aws_sns(
bot_id: &Uuid,
phone: &str,
message: &str,
priority: &SmsPriority,
) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
let config_manager = ConfigManager::new(state.conn.clone());
@ -415,12 +707,22 @@ async fn send_via_aws_sns(
let timestamp = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
let _date = &timestamp[..8];
// AWS SNS supports SMSType: Promotional or Transactional
// Map priority to SMS type (High/Urgent = Transactional for better delivery)
let sms_type = match priority {
SmsPriority::High | SmsPriority::Urgent => "Transactional",
_ => "Promotional",
};
// Build the request parameters
let params = [
("Action", "Publish"),
("PhoneNumber", phone),
("Message", message),
("Version", "2010-03-31"),
("MessageAttributes.entry.1.Name", "AWS.SNS.SMS.SMSType"),
("MessageAttributes.entry.1.Value.DataType", "String"),
("MessageAttributes.entry.1.Value.StringValue", sms_type),
];
// For simplicity, using query string auth (requires proper AWS SigV4 in production)
@ -454,6 +756,7 @@ async fn send_via_vonage(
bot_id: &Uuid,
phone: &str,
message: &str,
priority: &SmsPriority,
) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
let config_manager = ConfigManager::new(state.conn.clone());
@ -471,17 +774,27 @@ async fn send_via_vonage(
let client = reqwest::Client::new();
let payload = serde_json::json!({
// Vonage supports message-class for priority (0 = flash/urgent)
let message_class = match priority {
SmsPriority::Urgent => Some("0"), // Flash message
_ => None,
};
let mut body = serde_json::json!({
"api_key": api_key,
"api_secret": api_secret,
"to": phone.trim_start_matches('+'),
"to": phone,
"from": from_number,
"text": message
});
if let Some(class) = message_class {
body["message-class"] = serde_json::Value::String(class.to_string());
}
let response = client
.post("https://rest.nexmo.com/sms/json")
.json(&payload)
.json(&body)
.send()
.await?;
@ -511,6 +824,7 @@ async fn send_via_messagebird(
bot_id: &Uuid,
phone: &str,
message: &str,
priority: &SmsPriority,
) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
let config_manager = ConfigManager::new(state.conn.clone());
@ -526,16 +840,27 @@ async fn send_via_messagebird(
let client = reqwest::Client::new();
let payload = serde_json::json!({
// MessageBird supports typeDetails.class for priority
let type_details = match priority {
SmsPriority::Urgent => Some(serde_json::json!({"class": 0})), // Flash message
SmsPriority::High => Some(serde_json::json!({"class": 1})),
_ => None,
};
let mut body = serde_json::json!({
"originator": originator,
"recipients": [phone.trim_start_matches('+')],
"recipients": [phone],
"body": message
});
if let Some(details) = type_details {
body["typeDetails"] = details;
}
let response = client
.post("https://rest.messagebird.com/messages")
.header("Authorization", format!("AccessKey {}", api_key))
.json(&payload)
.json(&body)
.send()
.await?;
@ -551,23 +876,24 @@ async fn send_via_messagebird(
async fn send_via_custom_webhook(
state: &AppState,
bot_id: &Uuid,
provider_name: &str,
webhook_name: &str,
phone: &str,
message: &str,
priority: &SmsPriority,
) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
let config_manager = ConfigManager::new(state.conn.clone());
let webhook_url = config_manager
.get_config(bot_id, &format!("{}-webhook-url", provider_name), None)
.get_config(bot_id, &format!("{}-webhook-url", webhook_name), None)
.map_err(|_| {
format!(
"Custom SMS webhook URL not configured. Set {}-webhook-url in config.",
provider_name
webhook_name
)
})?;
let api_key = config_manager
.get_config(bot_id, &format!("{}-api-key", provider_name), None)
.get_config(bot_id, &format!("{}-api-key", webhook_name), None)
.ok();
let client = reqwest::Client::new();
@ -575,7 +901,8 @@ async fn send_via_custom_webhook(
let payload = serde_json::json!({
"to": phone,
"message": message,
"provider": provider_name
"provider": webhook_name,
"priority": priority.to_string()
});
let mut request = client.post(&webhook_url).json(&payload);

View file

@ -0,0 +1,420 @@
use crate::shared::models::UserSession;
use crate::shared::state::AppState;
use log::{debug, trace};
use reqwest::Url;
use rhai::{Array, Dynamic, Engine, EvalAltResult, Map, Position};
use scraper::{Html, Selector};
use std::sync::Arc;
use std::time::Duration;
pub fn register_web_data_keywords(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
register_rss_keyword(state.clone(), user.clone(), engine);
register_scrape_keyword(state.clone(), user.clone(), engine);
register_scrape_all_keyword(state.clone(), user.clone(), engine);
register_scrape_table_keyword(state.clone(), user.clone(), engine);
register_scrape_links_keyword(state.clone(), user.clone(), engine);
register_scrape_images_keyword(state, user, engine);
}
fn register_rss_keyword(_state: Arc<AppState>, _user: UserSession, engine: &mut Engine) {
engine
.register_custom_syntax(&["RSS", "$expr$"], false, move |context, inputs| {
let url = context.eval_expression_tree(&inputs[0])?.to_string();
trace!("RSS {}", url);
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let result = rt.block_on(async { fetch_rss(&url, 100).await });
let _ = tx.send(result);
});
match rx.recv_timeout(Duration::from_secs(30)) {
Ok(Ok(result)) => Ok(Dynamic::from(result)),
Ok(Err(e)) => Err(Box::new(EvalAltResult::ErrorRuntime(
format!("RSS failed: {}", e).into(),
Position::NONE,
))),
Err(_) => Err(Box::new(EvalAltResult::ErrorRuntime(
"RSS timed out".into(),
Position::NONE,
))),
}
})
.unwrap();
engine
.register_custom_syntax(
&["RSS", "$expr$", ",", "$expr$"],
false,
move |context, inputs| {
let url = context.eval_expression_tree(&inputs[0])?.to_string();
let limit = context
.eval_expression_tree(&inputs[1])?
.as_int()
.unwrap_or(10) as usize;
trace!("RSS {} limit {}", url, limit);
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let result = rt.block_on(async { fetch_rss(&url, limit).await });
let _ = tx.send(result);
});
match rx.recv_timeout(Duration::from_secs(30)) {
Ok(Ok(result)) => Ok(Dynamic::from(result)),
Ok(Err(e)) => Err(Box::new(EvalAltResult::ErrorRuntime(
format!("RSS failed: {}", e).into(),
Position::NONE,
))),
Err(_) => Err(Box::new(EvalAltResult::ErrorRuntime(
"RSS timed out".into(),
Position::NONE,
))),
}
},
)
.unwrap();
debug!("Registered RSS keyword");
}
async fn fetch_rss(
url: &str,
limit: usize,
) -> Result<Array, Box<dyn std::error::Error + Send + Sync>> {
let client = reqwest::Client::builder()
.user_agent("BotServer/6.1.0")
.timeout(Duration::from_secs(30))
.build()?;
let content = client.get(url).send().await?.bytes().await?;
let channel = rss::Channel::read_from(&content[..])?;
let mut results = Array::new();
for item in channel.items().iter().take(limit) {
let mut entry = Map::new();
entry.insert(
"title".into(),
Dynamic::from(item.title().unwrap_or("").to_string()),
);
entry.insert(
"link".into(),
Dynamic::from(item.link().unwrap_or("").to_string()),
);
entry.insert(
"description".into(),
Dynamic::from(item.description().unwrap_or("").to_string()),
);
entry.insert(
"pubDate".into(),
Dynamic::from(item.pub_date().unwrap_or("").to_string()),
);
entry.insert(
"author".into(),
Dynamic::from(item.author().unwrap_or("").to_string()),
);
if let Some(guid) = item.guid() {
entry.insert("guid".into(), Dynamic::from(guid.value().to_string()));
}
results.push(Dynamic::from(entry));
}
Ok(results)
}
/// Registers the `SCRAPE url, selector` keyword.
///
/// The keyword evaluates to the string produced by `scrape_first` for the
/// given page and CSS selector. The script receives a runtime error when the
/// scrape fails, when no result arrives within 30 seconds, or when the worker
/// thread dies before replying.
fn register_scrape_keyword(_state: Arc<AppState>, _user: UserSession, engine: &mut Engine) {
    engine
        .register_custom_syntax(
            &["SCRAPE", "$expr$", ",", "$expr$"],
            false,
            move |context, inputs| {
                let url = context.eval_expression_tree(&inputs[0])?.to_string();
                let selector = context.eval_expression_tree(&inputs[1])?.to_string();
                trace!("SCRAPE {} selector {}", url, selector);

                // Drive the async scrape with `block_on` on a dedicated thread
                // that owns its own tokio runtime.
                let (tx, rx) = std::sync::mpsc::channel();
                std::thread::spawn(move || {
                    // Report a runtime start-up failure instead of panicking,
                    // so the script sees the real cause.
                    let result = match tokio::runtime::Runtime::new() {
                        Ok(rt) => rt.block_on(scrape_first(&url, &selector)),
                        Err(e) => Err(format!("failed to start async runtime: {}", e).into()),
                    };
                    // The receiver may already have given up waiting.
                    let _ = tx.send(result);
                });

                let failure = match rx.recv_timeout(Duration::from_secs(30)) {
                    Ok(Ok(result)) => return Ok(Dynamic::from(result)),
                    Ok(Err(e)) => format!("SCRAPE failed: {}", e),
                    Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
                        "SCRAPE timed out".to_string()
                    }
                    // Sender dropped without a reply: the worker panicked.
                    Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
                        "SCRAPE worker thread terminated unexpectedly".to_string()
                    }
                };
                Err(Box::new(EvalAltResult::ErrorRuntime(
                    failure.into(),
                    Position::NONE,
                )))
            },
        )
        .expect("static SCRAPE syntax definition must be valid");
    debug!("Registered SCRAPE keyword");
}
/// Registers the `SCRAPE_ALL url, selector` keyword.
///
/// The keyword evaluates to the array produced by `scrape_all` for the given
/// page and CSS selector. The script receives a runtime error when the scrape
/// fails, when no result arrives within 30 seconds, or when the worker thread
/// dies before replying.
fn register_scrape_all_keyword(_state: Arc<AppState>, _user: UserSession, engine: &mut Engine) {
    engine
        .register_custom_syntax(
            &["SCRAPE_ALL", "$expr$", ",", "$expr$"],
            false,
            move |context, inputs| {
                let url = context.eval_expression_tree(&inputs[0])?.to_string();
                let selector = context.eval_expression_tree(&inputs[1])?.to_string();
                trace!("SCRAPE_ALL {} selector {}", url, selector);

                // Drive the async scrape with `block_on` on a dedicated thread
                // that owns its own tokio runtime.
                let (tx, rx) = std::sync::mpsc::channel();
                std::thread::spawn(move || {
                    // Report a runtime start-up failure instead of panicking,
                    // so the script sees the real cause.
                    let result = match tokio::runtime::Runtime::new() {
                        Ok(rt) => rt.block_on(scrape_all(&url, &selector)),
                        Err(e) => Err(format!("failed to start async runtime: {}", e).into()),
                    };
                    // The receiver may already have given up waiting.
                    let _ = tx.send(result);
                });

                let failure = match rx.recv_timeout(Duration::from_secs(30)) {
                    Ok(Ok(result)) => return Ok(Dynamic::from(result)),
                    Ok(Err(e)) => format!("SCRAPE_ALL failed: {}", e),
                    Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
                        "SCRAPE_ALL timed out".to_string()
                    }
                    // Sender dropped without a reply: the worker panicked.
                    Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
                        "SCRAPE_ALL worker thread terminated unexpectedly".to_string()
                    }
                };
                Err(Box::new(EvalAltResult::ErrorRuntime(
                    failure.into(),
                    Position::NONE,
                )))
            },
        )
        .expect("static SCRAPE_ALL syntax definition must be valid");
    debug!("Registered SCRAPE_ALL keyword");
}
/// Registers the `SCRAPE_TABLE url, selector` keyword.
///
/// The keyword evaluates to the array of row maps produced by `scrape_table`
/// for the given page and table selector. The script receives a runtime error
/// when the scrape fails, when no result arrives within 30 seconds, or when
/// the worker thread dies before replying.
fn register_scrape_table_keyword(_state: Arc<AppState>, _user: UserSession, engine: &mut Engine) {
    engine
        .register_custom_syntax(
            &["SCRAPE_TABLE", "$expr$", ",", "$expr$"],
            false,
            move |context, inputs| {
                let url = context.eval_expression_tree(&inputs[0])?.to_string();
                let selector = context.eval_expression_tree(&inputs[1])?.to_string();
                trace!("SCRAPE_TABLE {} selector {}", url, selector);

                // Drive the async scrape with `block_on` on a dedicated thread
                // that owns its own tokio runtime.
                let (tx, rx) = std::sync::mpsc::channel();
                std::thread::spawn(move || {
                    // Report a runtime start-up failure instead of panicking,
                    // so the script sees the real cause.
                    let result = match tokio::runtime::Runtime::new() {
                        Ok(rt) => rt.block_on(scrape_table(&url, &selector)),
                        Err(e) => Err(format!("failed to start async runtime: {}", e).into()),
                    };
                    // The receiver may already have given up waiting.
                    let _ = tx.send(result);
                });

                let failure = match rx.recv_timeout(Duration::from_secs(30)) {
                    Ok(Ok(result)) => return Ok(Dynamic::from(result)),
                    Ok(Err(e)) => format!("SCRAPE_TABLE failed: {}", e),
                    Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
                        "SCRAPE_TABLE timed out".to_string()
                    }
                    // Sender dropped without a reply: the worker panicked.
                    Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
                        "SCRAPE_TABLE worker thread terminated unexpectedly".to_string()
                    }
                };
                Err(Box::new(EvalAltResult::ErrorRuntime(
                    failure.into(),
                    Position::NONE,
                )))
            },
        )
        .expect("static SCRAPE_TABLE syntax definition must be valid");
    debug!("Registered SCRAPE_TABLE keyword");
}
/// Registers the `SCRAPE_LINKS url` keyword.
///
/// The keyword evaluates to the array of link maps produced by `scrape_links`
/// for the given page. The script receives a runtime error when the scrape
/// fails, when no result arrives within 30 seconds, or when the worker thread
/// dies before replying.
fn register_scrape_links_keyword(_state: Arc<AppState>, _user: UserSession, engine: &mut Engine) {
    engine
        .register_custom_syntax(
            &["SCRAPE_LINKS", "$expr$"],
            false,
            move |context, inputs| {
                let url = context.eval_expression_tree(&inputs[0])?.to_string();
                trace!("SCRAPE_LINKS {}", url);

                // Drive the async scrape with `block_on` on a dedicated thread
                // that owns its own tokio runtime.
                let (tx, rx) = std::sync::mpsc::channel();
                std::thread::spawn(move || {
                    // Report a runtime start-up failure instead of panicking,
                    // so the script sees the real cause.
                    let result = match tokio::runtime::Runtime::new() {
                        Ok(rt) => rt.block_on(scrape_links(&url)),
                        Err(e) => Err(format!("failed to start async runtime: {}", e).into()),
                    };
                    // The receiver may already have given up waiting.
                    let _ = tx.send(result);
                });

                let failure = match rx.recv_timeout(Duration::from_secs(30)) {
                    Ok(Ok(result)) => return Ok(Dynamic::from(result)),
                    Ok(Err(e)) => format!("SCRAPE_LINKS failed: {}", e),
                    Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
                        "SCRAPE_LINKS timed out".to_string()
                    }
                    // Sender dropped without a reply: the worker panicked.
                    Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
                        "SCRAPE_LINKS worker thread terminated unexpectedly".to_string()
                    }
                };
                Err(Box::new(EvalAltResult::ErrorRuntime(
                    failure.into(),
                    Position::NONE,
                )))
            },
        )
        .expect("static SCRAPE_LINKS syntax definition must be valid");
    debug!("Registered SCRAPE_LINKS keyword");
}
/// Registers the `SCRAPE_IMAGES url` keyword.
///
/// The keyword evaluates to the array of image maps produced by
/// `scrape_images` for the given page. The script receives a runtime error
/// when the scrape fails, when no result arrives within 30 seconds, or when
/// the worker thread dies before replying.
fn register_scrape_images_keyword(_state: Arc<AppState>, _user: UserSession, engine: &mut Engine) {
    engine
        .register_custom_syntax(
            &["SCRAPE_IMAGES", "$expr$"],
            false,
            move |context, inputs| {
                let url = context.eval_expression_tree(&inputs[0])?.to_string();
                trace!("SCRAPE_IMAGES {}", url);

                // Drive the async scrape with `block_on` on a dedicated thread
                // that owns its own tokio runtime.
                let (tx, rx) = std::sync::mpsc::channel();
                std::thread::spawn(move || {
                    // Report a runtime start-up failure instead of panicking,
                    // so the script sees the real cause.
                    let result = match tokio::runtime::Runtime::new() {
                        Ok(rt) => rt.block_on(scrape_images(&url)),
                        Err(e) => Err(format!("failed to start async runtime: {}", e).into()),
                    };
                    // The receiver may already have given up waiting.
                    let _ = tx.send(result);
                });

                let failure = match rx.recv_timeout(Duration::from_secs(30)) {
                    Ok(Ok(result)) => return Ok(Dynamic::from(result)),
                    Ok(Err(e)) => format!("SCRAPE_IMAGES failed: {}", e),
                    Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
                        "SCRAPE_IMAGES timed out".to_string()
                    }
                    // Sender dropped without a reply: the worker panicked.
                    Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
                        "SCRAPE_IMAGES worker thread terminated unexpectedly".to_string()
                    }
                };
                Err(Box::new(EvalAltResult::ErrorRuntime(
                    failure.into(),
                    Position::NONE,
                )))
            },
        )
        .expect("static SCRAPE_IMAGES syntax definition must be valid");
    debug!("Registered SCRAPE_IMAGES keyword");
}
/// Downloads `url` and returns the response body as text.
///
/// # Errors
///
/// Fails when the HTTP client cannot be built, the request fails or exceeds
/// the 30-second timeout, the server answers with an error status, or the body
/// cannot be read as text.
async fn fetch_page(url: &str) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
    let client = reqwest::Client::builder()
        .user_agent("Mozilla/5.0 (compatible; BotServer/6.1.0)")
        .timeout(Duration::from_secs(30))
        .build()?;

    // Reject 4xx/5xx responses: scraping an error page would otherwise yield
    // empty or unrelated results with no indication of what went wrong.
    let body = client
        .get(url)
        .send()
        .await?
        .error_for_status()?
        .text()
        .await?;
    Ok(body)
}
/// Fetches `url` and returns the text of the first element matching the CSS
/// `selector`.
///
/// The element's text nodes are joined with single spaces and the result is
/// trimmed. When nothing matches, an empty string is returned.
///
/// # Errors
///
/// Fails when the page cannot be fetched or `selector` is not a valid CSS
/// selector.
async fn scrape_first(
    url: &str,
    selector: &str,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
    let page = fetch_page(url).await?;
    let dom = Html::parse_document(&page);
    let css = Selector::parse(selector).map_err(|e| format!("Invalid selector: {:?}", e))?;

    let text = match dom.select(&css).next() {
        Some(node) => {
            let pieces: Vec<_> = node.text().collect();
            pieces.join(" ").trim().to_string()
        }
        None => String::new(),
    };
    Ok(text)
}
async fn scrape_all(
url: &str,
selector: &str,
) -> Result<Array, Box<dyn std::error::Error + Send + Sync>> {
let html = fetch_page(url).await?;
let document = Html::parse_document(&html);
let sel = Selector::parse(selector).map_err(|e| format!("Invalid selector: {:?}", e))?;
let results: Array = document
.select(&sel)
.map(|el| {
let text = el.text().collect::<Vec<_>>().join(" ").trim().to_string();
Dynamic::from(text)
})
.collect();
Ok(results)
}
async fn scrape_table(
url: &str,
selector: &str,
) -> Result<Array, Box<dyn std::error::Error + Send + Sync>> {
let html = fetch_page(url).await?;
let document = Html::parse_document(&html);
let table_sel = Selector::parse(selector).map_err(|e| format!("Invalid selector: {:?}", e))?;
let tr_sel = Selector::parse("tr").unwrap();
let th_sel = Selector::parse("th").unwrap();
let td_sel = Selector::parse("td").unwrap();
let mut results = Array::new();
let mut headers: Vec<String> = Vec::new();
if let Some(table) = document.select(&table_sel).next() {
for (i, row) in table.select(&tr_sel).enumerate() {
if i == 0 {
headers = row
.select(&th_sel)
.chain(row.select(&td_sel))
.map(|cell| cell.text().collect::<Vec<_>>().join(" ").trim().to_string())
.collect();
if headers.is_empty() {
continue;
}
} else {
let mut row_map = Map::new();
for (j, cell) in row.select(&td_sel).enumerate() {
let key = headers
.get(j)
.cloned()
.unwrap_or_else(|| format!("col{}", j));
let value = cell.text().collect::<Vec<_>>().join(" ").trim().to_string();
row_map.insert(key.into(), Dynamic::from(value));
}
if !row_map.is_empty() {
results.push(Dynamic::from(row_map));
}
}
}
}
Ok(results)
}
/// Fetches `url` and lists every `<a href>` on the page.
///
/// Each entry is a map with `href` (the link resolved against `url`) and
/// `text` (the anchor's text nodes joined with single spaces, trimmed).
/// Links whose `href` cannot be resolved are left out.
///
/// # Errors
///
/// Fails when the page cannot be fetched or `url` cannot be parsed as a URL.
async fn scrape_links(url: &str) -> Result<Array, Box<dyn std::error::Error + Send + Sync>> {
    let page = fetch_page(url).await?;
    let dom = Html::parse_document(&page);
    let anchors = Selector::parse("a[href]").unwrap();
    let base = Url::parse(url)?;

    let links: Array = dom
        .select(&anchors)
        .filter_map(|anchor| {
            let href = anchor.value().attr("href")?;
            // Unresolvable hrefs are dropped rather than reported.
            let resolved = base.join(href).ok()?.to_string();
            if resolved.is_empty() {
                return None;
            }

            let label = anchor.text().collect::<Vec<_>>().join(" ");
            let mut entry = Map::new();
            entry.insert("href".into(), Dynamic::from(resolved));
            entry.insert("text".into(), Dynamic::from(label.trim().to_string()));
            Some(Dynamic::from(entry))
        })
        .collect();
    Ok(links)
}
/// Fetches `url` and lists every `<img src>` on the page.
///
/// Each entry is a map with `src` (the image address resolved against `url`)
/// and `alt` (the `alt` attribute, or an empty string when absent). Images
/// whose `src` cannot be resolved are left out.
///
/// # Errors
///
/// Fails when the page cannot be fetched or `url` cannot be parsed as a URL.
async fn scrape_images(url: &str) -> Result<Array, Box<dyn std::error::Error + Send + Sync>> {
    let page = fetch_page(url).await?;
    let dom = Html::parse_document(&page);
    let images = Selector::parse("img[src]").unwrap();
    let base = Url::parse(url)?;

    let found: Array = dom
        .select(&images)
        .filter_map(|image| {
            let attrs = image.value();
            let src = attrs.attr("src")?;
            // Unresolvable sources are dropped rather than reported.
            let resolved = base.join(src).ok()?.to_string();
            if resolved.is_empty() {
                return None;
            }

            let alt = attrs.attr("alt").unwrap_or("").to_string();
            let mut entry = Map::new();
            entry.insert("src".into(), Dynamic::from(resolved));
            entry.insert("alt".into(), Dynamic::from(alt));
            Some(Dynamic::from(entry))
        })
        .collect();
    Ok(found)
}

View file

@ -23,6 +23,7 @@ struct ParamConfigRow {
use self::keywords::add_bot::register_bot_keywords;
use self::keywords::add_member::add_member_keyword;
use self::keywords::add_suggestion::add_suggestion_keyword;
use self::keywords::ai_tools::register_ai_tools_keywords;
use self::keywords::book::book_keyword;
use self::keywords::bot_memory::{get_bot_memory_keyword, set_bot_memory_keyword};
use self::keywords::clear_kb::register_clear_kb_keyword;
@ -49,12 +50,14 @@ use self::keywords::remember::remember_keyword;
use self::keywords::save_from_unstructured::save_from_unstructured_keyword;
use self::keywords::send_mail::send_mail_keyword;
use self::keywords::send_template::register_send_template_keywords;
use self::keywords::sms::register_sms_keywords;
use self::keywords::social_media::register_social_media_keywords;
use self::keywords::switch_case::preprocess_switch;
use self::keywords::transfer_to_human::register_transfer_to_human_keyword;
use self::keywords::use_kb::register_use_kb_keyword;
use self::keywords::use_tool::use_tool_keyword;
use self::keywords::use_website::{clear_websites_keyword, use_website_keyword};
use self::keywords::web_data::register_web_data_keywords;
use self::keywords::webhook::webhook_keyword;
use self::keywords::llm_keyword::llm_keyword;
@ -182,6 +185,21 @@ impl ScriptService {
// Supports transfer by name/alias, department, priority, and context
register_transfer_to_human_keyword(state.clone(), user.clone(), &mut engine);
// ========================================================================
// AI-POWERED TOOLS: TRANSLATE, OCR, SENTIMENT, CLASSIFY
// ========================================================================
register_ai_tools_keywords(state.clone(), user.clone(), &mut engine);
// ========================================================================
// WEB DATA: RSS, SCRAPE, SCRAPE_ALL, SCRAPE_TABLE, SCRAPE_LINKS, SCRAPE_IMAGES
// ========================================================================
register_web_data_keywords(state.clone(), user.clone(), &mut engine);
// ========================================================================
// SMS: SEND_SMS phone, message - Send SMS via Twilio, AWS SNS, Vonage, etc.
// ========================================================================
register_sms_keywords(state.clone(), user.clone(), &mut engine);
// ========================================================================
// CORE BASIC FUNCTIONS: Math, Date/Time, Validation, Arrays, Error Handling
// ========================================================================
@ -189,7 +207,7 @@ impl ScriptService {
// Math: ABS, ROUND, INT, MAX, MIN, MOD, RANDOM, SGN, SQR, LOG, EXP, SIN, COS, TAN
// Date/Time: NOW, TODAY, YEAR, MONTH, DAY, HOUR, MINUTE, SECOND, DATEADD, DATEDIFF
// Validation: VAL, STR, ISNULL, ISEMPTY, ISDATE, TYPEOF
// Arrays: ARRAY, UBOUND, SORT, UNIQUE, CONTAINS, PUSH, POP, REVERSE, SLICE
// Arrays: ARRAY, UBOUND, SORT, UNIQUE, CONTAINS, PUSH, POP, REVERSE, SLICE, BATCH, CHUNK
// Error Handling: THROW, ERROR, IS_ERROR, ASSERT
register_core_functions(state.clone(), user, &mut engine);

View file

@ -66,6 +66,7 @@ impl Editor {
pub fn file_path(&self) -> &str {
&self.file_path
}
#[allow(dead_code)]
pub fn set_visible_lines(&mut self, lines: usize) {
self.visible_lines = lines.max(5);
}
@ -188,10 +189,12 @@ impl Editor {
}
}
#[allow(dead_code)]
pub fn scroll_up(&mut self) {
self.scroll_offset = self.scroll_offset.saturating_sub(1);
}
#[allow(dead_code)]
pub fn scroll_down(&mut self) {
let total_lines = self.content.lines().count().max(1);
let max_scroll = total_lines.saturating_sub(self.visible_lines.saturating_sub(3));

View file

@ -6,10 +6,10 @@ use crossterm::{
execute,
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
};
use log::LevelFilter;
use ratatui::{
backend::CrosstermBackend,
layout::{Alignment, Constraint, Direction, Layout, Rect},
layout::{Constraint, Direction, Layout, Rect},
style::{Color, Modifier, Style},
text::{Line, Span},
widgets::{Block, Borders, List, ListItem, Paragraph, Wrap},

View file

@ -200,6 +200,7 @@ impl std::fmt::Debug for AppState {
#[cfg(feature = "llm")]
#[derive(Debug)]
#[allow(dead_code)]
struct MockLLMProvider;
#[cfg(feature = "llm")]
@ -236,6 +237,7 @@ impl LLMProvider for MockLLMProvider {
}
#[cfg(feature = "directory")]
#[allow(dead_code)]
fn create_mock_auth_service() -> AuthService {
use crate::directory::client::ZitadelConfig;

View file

@ -128,14 +128,18 @@ pub async fn ensure_llama_servers_running(
let mut llm_ready = llm_running || llm_model.is_empty();
let mut embedding_ready = embedding_running || embedding_model.is_empty();
let mut attempts = 0;
let max_attempts = 60;
let max_attempts = 120; // Increased to 4 minutes for large models
while attempts < max_attempts && (!llm_ready || !embedding_ready) {
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
info!(
"Checking server health (attempt {}/{})...",
attempts + 1,
max_attempts
);
// Only log every 5 attempts to reduce noise
if attempts % 5 == 0 {
info!(
"Checking server health (attempt {}/{})...",
attempts + 1,
max_attempts
);
}
if !llm_ready && !llm_model.is_empty() {
if is_server_running(&llm_url).await {
info!("LLM server ready at {}", llm_url);
@ -148,14 +152,26 @@ pub async fn ensure_llama_servers_running(
if is_server_running(&embedding_url).await {
info!("Embedding server ready at {}", embedding_url);
embedding_ready = true;
} else {
info!("Embedding server not ready yet");
} else if attempts % 10 == 0 {
warn!("Embedding server not ready yet at {}", embedding_url);
// Try to read log file for diagnostics
if let Ok(log_content) =
std::fs::read_to_string(format!("{}/llmembd-stdout.log", llm_server_path))
{
let last_lines: Vec<&str> = log_content.lines().rev().take(5).collect();
if !last_lines.is_empty() {
info!("Embedding server log (last 5 lines):");
for line in last_lines.iter().rev() {
info!(" {}", line);
}
}
}
}
}
attempts += 1;
if attempts % 10 == 0 {
info!(
"Still waiting for servers... (attempt {}/{})",
if attempts % 20 == 0 {
warn!(
"Still waiting for servers... (attempt {}/{}) - this may take a while for large models",
attempts, max_attempts
);
}
@ -181,10 +197,37 @@ pub async fn ensure_llama_servers_running(
}
}
pub async fn is_server_running(url: &str) -> bool {
let client = reqwest::Client::new();
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(5))
.build()
.unwrap_or_default();
// Try /health first (standard llama.cpp endpoint)
match client.get(&format!("{}/health", url)).send().await {
Ok(response) => response.status().is_success(),
Err(_) => false,
Ok(response) => {
if response.status().is_success() {
return true;
}
// Log non-success status for debugging
info!("Health check returned status: {}", response.status());
false
}
Err(e) => {
// Also try root endpoint as fallback
match client.get(url).send().await {
Ok(response) => response.status().is_success(),
Err(_) => {
// Only log connection errors occasionally to avoid spam
if e.is_connect() {
// Connection refused - server not started yet
false
} else {
warn!("Health check error for {}: {}", url, e);
false
}
}
}
}
}
}
pub async fn start_llm_server(
@ -296,10 +339,28 @@ pub async fn start_embedding_server(
url: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let port = url.split(':').last().unwrap_or("8082");
// Check if model file exists
let full_model_path = if model_path.starts_with('/') {
model_path.clone()
} else {
format!("{}/{}", llama_cpp_path, model_path)
};
if !std::path::Path::new(&full_model_path).exists() {
error!("Embedding model file not found: {}", full_model_path);
return Err(format!("Embedding model file not found: {}", full_model_path).into());
}
info!(
"Starting embedding server on port {} with model: {}",
port, model_path
);
if cfg!(windows) {
let mut cmd = tokio::process::Command::new("cmd");
cmd.arg("/c").arg(format!(
"cd {} && .\\llama-server.exe -m {} --verbose --host 0.0.0.0 --port {} --embedding --n-gpu-layers 99 >stdout.log",
"cd {} && .\\llama-server.exe -m {} --verbose --host 0.0.0.0 --port {} --embedding --n-gpu-layers 99 >stdout.log 2>&1",
llama_cpp_path, model_path, port
));
cmd.spawn()?;
@ -309,7 +370,15 @@ pub async fn start_embedding_server(
"cd {} && ./llama-server -m {} --verbose --host 0.0.0.0 --port {} --embedding --n-gpu-layers 99 >llmembd-stdout.log 2>&1 &",
llama_cpp_path, model_path, port
));
info!(
"Executing embedding server command: cd {} && ./llama-server -m {} --host 0.0.0.0 --port {} --embedding",
llama_cpp_path, model_path, port
);
cmd.spawn()?;
}
// Give the server a moment to start
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
Ok(())
}