diff --git a/Cargo.lock b/Cargo.lock index 3c27df6..4b21d4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4302,6 +4302,7 @@ dependencies = [ "atoi", "byteorder", "bytes", + "chrono", "crc", "crossbeam-queue", "either", @@ -4327,10 +4328,12 @@ dependencies = [ "smallvec", "sqlformat", "thiserror 1.0.69", + "time", "tokio", "tokio-stream", "tracing", "url", + "uuid", "webpki-roots", ] @@ -4384,6 +4387,7 @@ dependencies = [ "bitflags 2.9.1", "byteorder", "bytes", + "chrono", "crc", "digest", "dotenvy", @@ -4411,7 +4415,9 @@ dependencies = [ "sqlx-core", "stringprep", "thiserror 1.0.69", + "time", "tracing", + "uuid", "whoami", ] @@ -4425,6 +4431,7 @@ dependencies = [ "base64 0.21.7", "bitflags 2.9.1", "byteorder", + "chrono", "crc", "dotenvy", "etcetera", @@ -4449,7 +4456,9 @@ dependencies = [ "sqlx-core", "stringprep", "thiserror 1.0.69", + "time", "tracing", + "uuid", "whoami", ] @@ -4460,6 +4469,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b244ef0a8414da0bed4bb1910426e890b19e5e9bccc27ada6b797d05c55ae0aa" dependencies = [ "atoi", + "chrono", "flume", "futures-channel", "futures-core", @@ -4471,9 +4481,11 @@ dependencies = [ "percent-encoding", "serde", "sqlx-core", + "time", "tracing", "url", "urlencoding", + "uuid", ] [[package]] @@ -5192,6 +5204,7 @@ checksum = "f33196643e165781c20a5ead5582283a7dacbb87855d867fbc2df3f81eddc1be" dependencies = [ "getrandom 0.3.3", "js-sys", + "serde", "wasm-bindgen", ] diff --git a/Cargo.toml b/Cargo.toml index 0a0255b..2f2f197 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,10 @@ description = "General Bots Server" license = "AGPL" repository = "https://alm.pragmatismo.com.br/generalbots/gbserver" +[features] +default = [] +local_llm = [] + [dependencies] actix-cors = "0.6" actix-multipart = "0.6" @@ -39,7 +43,13 @@ rhai = "1.22.2" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" smartstring = "1.0" # Use the latest version from crates.io -sqlx = { version = "0.7", 
features = ["runtime-tokio-rustls", "postgres"] } +sqlx = { version = "0.7", features = [ + "time", + "uuid", + "runtime-tokio-rustls", + "postgres", + "chrono", +] } tempfile = "3" tokio = { version = "1", features = ["full"] } tokio-stream = "0.1.17" @@ -48,5 +58,5 @@ tracing-subscriber = { version = "0.3", features = ["fmt"] } scraper = "0.18" urlencoding = "2.1" regex = "1.10" -uuid = { version = "1.0", features = ["v4"] } +uuid = { version = "1.4", features = ["serde", "v4"] } # v4, v7, etc. as needed zip = "4.3.0" diff --git a/src/main.rs b/src/main.rs index 9ce5b03..f6196f3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,20 +4,17 @@ use actix_cors::Cors; use actix_web::http::header; use actix_web::{web, App, HttpServer}; use dotenv::dotenv; - -use reqwest::Client; -use services::config::*; -use services::email::*; -use services::file::*; -use services::llm::*; -use services::script::*; use services::state::*; +use services::{config::*, file::*}; use sqlx::PgPool; -use crate::services::llm_local::ensure_llama_server_running; +use crate::services::automation::AutomationService; +use crate::services::email::{get_emails, list_emails, save_click, send_email}; +use crate::services::llm::{chat, chat_stream}; use crate::services::llm_provider::chat_completions; use crate::services::web_automation::{initialize_browser_pool, BrowserPool}; +mod models; mod services; #[tokio::main(flavor = "multi_thread")] @@ -41,9 +38,14 @@ async fn main() -> std::io::Result<()> { "/usr/bin/brave-browser-beta".to_string(), )); - // ensure_llama_server_running() - // .await - // .expect("Failed to initialize LLM local server."); + #[cfg(feature = "local_llm")] + { + use crate::services::llm_local::ensure_llama_server_running; + + ensure_llama_server_running() + .await + .expect("Failed to initialize LLM local server."); + } initialize_browser_pool() .await @@ -57,17 +59,11 @@ async fn main() -> std::io::Result<()> { browser_pool: browser_pool.clone(), }); - let script_service = 
ScriptService::new(&app_state.clone()); + // Start automation service in background + let automation_state = app_state.get_ref().clone(); // This gets the Arc - const TEXT: &str = include_str!("prompts/business/data-enrichment.bas"); - - match script_service.compile(TEXT) { - Ok(ast) => match script_service.run(&ast) { - Ok(result) => println!("Script executed successfully: {:?}", result), - Err(e) => eprintln!("Error executing script: {}", e), - }, - Err(e) => eprintln!("Error compiling script: {}", e), - } + let automation = AutomationService::new(automation_state, "../../src/scripts"); + let _automation_handle = automation.spawn(); // Start HTTP server HttpServer::new(move || { @@ -77,9 +73,9 @@ async fn main() -> std::io::Result<()> { .allowed_headers(vec![header::AUTHORIZATION, header::ACCEPT]) .allowed_header(header::CONTENT_TYPE) .max_age(3600); - //.wrap(cors) App::new() + .wrap(cors) .app_data(app_state.clone()) .service(upload_file) .service(list_file) diff --git a/src/models.rs b/src/models.rs new file mode 100644 index 0000000..17de93b --- /dev/null +++ b/src/models.rs @@ -0,0 +1 @@ +pub mod automation_model; diff --git a/src/models/automation_model.rs b/src/models/automation_model.rs new file mode 100644 index 0000000..967f977 --- /dev/null +++ b/src/models/automation_model.rs @@ -0,0 +1,35 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use sqlx::FromRow; +use uuid::Uuid; + +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum TriggerKind { + Scheduled = 0, + TableUpdate = 1, + TableInsert = 2, + TableDelete = 3, +} + +impl TriggerKind { + pub fn from_i32(value: i32) -> Option { + match value { + 0 => Some(Self::Scheduled), + 1 => Some(Self::TableUpdate), + 2 => Some(Self::TableInsert), + 3 => Some(Self::TableDelete), + _ => None, + } + } +} + +#[derive(Debug, FromRow, Serialize, Deserialize)] +pub struct Automation { + pub id: Uuid, + pub kind: i32, // Using number for trigger type + pub target: Option, + pub schedule: Option, + 
pub param: String, + pub is_active: bool, + pub last_triggered: Option>, +} diff --git a/src/models/bot_model.rs b/src/models/bot_model.rs new file mode 100644 index 0000000..19affdf --- /dev/null +++ b/src/models/bot_model.rs @@ -0,0 +1,18 @@ +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] +pub struct Bot { + pub bot_id: Uuid, + pub name: String, + pub status: BotStatus, + pub config: serde_json::Value, + pub created_at: chrono::DateTime, + pub updated_at: chrono::DateTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::Type)] +#[serde(rename_all = "snake_case")] +#[sqlx(type_name = "bot_status", rename_all = "snake_case")] +pub enum BotStatus { + Active, + Inactive, + Maintenance, +} diff --git a/src/prompts/business/create-lead-from-draft.bas b/src/prompts/business/create-lead-from-draft.bas new file mode 100644 index 0000000..270386b --- /dev/null +++ b/src/prompts/business/create-lead-from-draft.bas @@ -0,0 +1,5 @@ + +PARAM text as STRING +DESCRIPTION "Called when someone wants to create a customer by pasting unstructured text, like an e-mail answer." + +SAVE_FROM_UNSTRUCTURED "rob", text diff --git a/src/prompts/business/data-enrichment.bas b/src/prompts/business/data-enrichment.bas index 8659500..124cb59 100644 --- a/src/prompts/business/data-enrichment.bas +++ b/src/prompts/business/data-enrichment.bas @@ -1,20 +1,30 @@ -let items = FIND "gb.rob", "ACTION=EMUL1" +let items = FIND "gb.rob", "ACTION=EMUL" FOR EACH item IN items PRINT item.company - let website = WEBSITE OF item.company - PRINT website + + let website = item.website ?? 
"" + if item.website == "" { + website = WEBSITE OF item.company + PRINT website + } let page = GET website - let prompt = "Build the same simulator, but for " + item.company + " using just *content about the company* from its website, so it is possible to create a good and useful emulator in the same langue as the content: " + page + let prompt = "Build the same simulator, keep js, svg, css, assets paths, just change title, keep six cases of six messages each (change and return VALID JSON with a minimum of 6 cases and 6-8 messages each), but for " + item.company + " using just *content about the company* " + item.llm_notes + " from its website, so it is possible to create a good and useful emulator in the same language as the content: " + page let alias = LLM "Return a single word for " + item.company + " like a token, no spaces, no special characters, no numbers, no uppercase letters." - CREATE_SITE alias, "blank", prompt + CREATE_SITE alias, "gb-emulator-base", prompt let to = item.emailcto - let subject = "Simulador " + alias - let name = FIRST(item.Contact) - let body = "Oi, " + name + "! Tudo bem? Estou empolgado, pois criamos o simulador " + alias + " especificamente para vocês!" + "\n\n Acesse o site: https://sites.pragmatismo.com.br/" + alias + "\n\n" + "Para acessar o simulador, clique no link acima ou copie e cole no seu navegador." + "\n\n" + "Para iniciar, clique no ícone de Play." + "\n\n" + "Atenciosamente,\nDário Vieira\n\n" + let subject = "Simulador " + alias + " ficou pronto" + let name = FIRST(item.contact) + let body = "Oi, " + name + ". Tudo bem? Para vocês terem uma ideia do ambiente conversacional em AI e algumas possibilidades, preparamos o " + alias + " especificamente para vocês!" + "\n\n Acesse o site: https://sites.pragmatismo.com.br/" + alias + "\n\n" + "Para acessar o simulador, clique no link acima ou copie e cole no seu navegador." + "\n\n" + "Para iniciar, escolha um dos casos conversacionais." 
+ "\n\n" + "Atenciosamente,\nRodrigo Rodriguez\n\n" + + let body = LLM "Melhora este e-mail: ------ " + body + " ----- mas mantem o link e inclui alguma referência ao histórico com o cliente: " + item.history + CREATE_DRAFT to, subject, body + SET "gb.rob", "id="+ item.id, "ACTION=CALL" + SET "gb.rob", "id="+ item.id, "emulator=true" + WAIT 3000 NEXT item diff --git a/src/prompts/marketing/add-new-idea.bas b/src/prompts/marketing/add-new-idea.bas new file mode 100644 index 0000000..99b6071 --- /dev/null +++ b/src/prompts/marketing/add-new-idea.bas @@ -0,0 +1,5 @@ + +PARAM idea as STRING +DESCRIPTION "Called when someone has an idea and wants to keep it." + +SAVE "marketing_ideas", idea, username diff --git a/src/scripts/containers/llm.sh b/src/scripts/containers/llm.sh index e94476c..edd21cb 100644 --- a/src/scripts/containers/llm.sh +++ b/src/scripts/containers/llm.sh @@ -1,61 +1,10 @@ -#DeepSeek-R1-Distill-Qwen-1.5B-Q3_K_M.gguf -#Phi-3.5-mini-instruct-IQ2_M.gguf -#tinyllama-1.1b-chat-v1.0.Q4_0.gguf - -sudo apt update -sudo apt upgrade -y -sudo apt install -y build-essential cmake git curl wget libcurl4-openssl-dev pkg-config gcc-9 g++-9 - -sudo apt install software-properties-common -sudo add-apt-repository ppa:deadsnakes/ppa -sudo apt install python3.6 python3.6-venv python3.6-dev -wget https://download.pytorch.org/whl/cu110/torch-1.7.1%2Bcu110-cp36-cp36m-linux_x86_64.whl -wget https://download.pytorch.org/whl/cu110/torchvision-0.8.2%2Bcu110-cp36-cp36m-linux_x86_64.whl - - -sudo ubuntu-drivers autoinstall - -sleep 10 - -CUDA_RUN_FILE="cuda_11.0.3_450.51.06_linux.run" -wget https://developer.download.nvidia.com/compute/cuda/11.0.3/local_installers/$CUDA_RUN_FILE -chmod +x $CUDA_RUN_FILE -sudo ./$CUDA_RUN_FILE --silent --toolkit - -echo 'export PATH=/usr/local/cuda-11.0/bin:$PATH' >> ~/.bashrc -echo 'export LD_LIBRARY_PATH=/usr/local/cuda-11.0/lib64:$LD_LIBRARY_PATH' >> ~/.bashrc -source ~/.bashrc - -nvidia-smi -nvcc --version - -python3 -m venv llama_venv
-source llama_venv/bin/activate -pip install --upgrade pip -pip install torch==1.12.1+cu110 torchvision==0.13.1+cu110 torchaudio==0.12.1 --extra-index-url https://download.pytorch.org/whl/cu110 - -cd ~ -git clone https://github.com/ggerganov/llama.cpp.git -cd llama.cpp -rm -rf build -mkdir build -cd build - - -# EDIT FILE: -#ifdef __CUDACC__ - #ifndef __builtin_assume - #define __builtin_assume(x) // empty: ignore it for CUDA compiler - #endif -#endif -# ggml/src/ggml-cuda/fattn-common. -# -cmake -DGGML_CUDA=ON -DCMAKE_CUDA_ARCHITECTURES=35 .. -make -j$(nproc) - -OR wget https://github.com/ggml-org/llama.cpp/releases/download/b6148/llama-b6148-bin-ubuntu-x64.zip - wget https://huggingface.co/TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF/resolve/main/tinyllama-1.1b-chat-v1.0.Q4_0.gguf?download=true + +# DeepSeek-R1-Distill-Qwen-1.5B-Q3_K_M.gguf +# Phi-3.5-mini-instruct-IQ2_M.gguf + +# ./llama-cli -m tinyllama-1.1b-chat-v1.0.Q4_0.gguf --reasoning-budget 0 --reasoning-format none -mli +# ./llama-cli -m DeepSeek-R1-Distill-Qwen-1.5B-Q3_K_M.gguf --system-prompt " Output as JSON: Name 3 colors and their HEX codes. 
Use format: [{\"name\": \"red\", \"hex\": \"#FF0000\"}]" --reasoning-budget 0 --reasoning-format none -mli diff --git a/src/scripts/database/0002.sql b/src/scripts/database/0002.sql new file mode 100644 index 0000000..bace0cb --- /dev/null +++ b/src/scripts/database/0002.sql @@ -0,0 +1,11 @@ + CREATE TABLE system_automations ( + id uuid PRIMARY KEY, + kind NUMBER, + target VARCHAR(32), + schedule CHAR(6), + param VARCHAR(32) NOT NULL, + is_active BOOL NOT NULL DEFAULT TRUE, + last_triggered TIMESTAMPTZ +); + +CREATE INDEX idx_active_automations ON system_automations(kind) WHERE is_active; diff --git a/src/services.rs b/src/services.rs index 57eaa80..567bcc4 100644 --- a/src/services.rs +++ b/src/services.rs @@ -1,3 +1,4 @@ +pub mod automation; pub mod config; pub mod email; pub mod file; diff --git a/src/services/automation.rs b/src/services/automation.rs new file mode 100644 index 0000000..5199405 --- /dev/null +++ b/src/services/automation.rs @@ -0,0 +1,197 @@ +use crate::models::automation_model::{Automation, TriggerKind}; +use crate::services::script::ScriptService; +use crate::services::state::AppState; +use chrono::Datelike; +use chrono::Timelike; +use chrono::{DateTime, Utc}; +use std::path::Path; +use tokio::time::Duration; +use uuid::Uuid; + +pub struct AutomationService { + state: AppState, // Use web::Data directly + scripts_dir: String, +} + +impl AutomationService { + pub fn new(state: AppState, scripts_dir: &str) -> Self { + Self { + state, + scripts_dir: scripts_dir.to_string(), + } + } + + pub fn spawn(self) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(5)); + let mut last_check = Utc::now(); + + loop { + interval.tick().await; + + if let Err(e) = self.run_cycle(&mut last_check).await { + eprintln!("Automation cycle error: {}", e); + } + } + }) + } + + async fn run_cycle( + &self, + last_check: &mut DateTime, + ) -> Result<(), Box> { + let automations = 
self.load_active_automations().await?; + self.check_table_changes(&automations, *last_check).await; + self.process_schedules(&automations).await; + *last_check = Utc::now(); + Ok(()) + } + + async fn load_active_automations(&self) -> Result, sqlx::Error> { + if let Some(pool) = &self.state.db { + sqlx::query_as::<_, Automation>( + r#" + SELECT id, kind, target, schedule, param, is_active, last_triggered + FROM system_automations + WHERE is_active = true + "#, + ) + .fetch_all(pool) + .await + } else { + Err(sqlx::Error::PoolClosed) + } + } + + async fn check_table_changes(&self, automations: &[Automation], since: DateTime) { + if let Some(pool) = &self.state.db { + for automation in automations { + if let Some(trigger_kind) = TriggerKind::from_i32(automation.kind) { + if matches!( + trigger_kind, + TriggerKind::TableUpdate + | TriggerKind::TableInsert + | TriggerKind::TableDelete + ) { + if let Some(table) = &automation.target { + let column = match trigger_kind { + TriggerKind::TableInsert => "created_at", + _ => "updated_at", + }; + + let query = + format!("SELECT COUNT(*) FROM {} WHERE {} > $1", table, column); + + match sqlx::query_scalar::<_, i64>(&query) + .bind(since) + .fetch_one(pool) + .await + { + Ok(count) => { + if count > 0 { + self.execute_action(&automation.param).await; + self.update_last_triggered(automation.id).await; + } + } + Err(e) => { + eprintln!("Error checking changes for table {}: {}", table, e); + } + } + } + } + } + } + } + } + + async fn process_schedules(&self, automations: &[Automation]) { + let now = Utc::now().timestamp(); + + for automation in automations { + if let Some(TriggerKind::Scheduled) = TriggerKind::from_i32(automation.kind) { + if let Some(pattern) = &automation.schedule { + if Self::should_run_cron(pattern, now) { + self.execute_action(&automation.param).await; + self.update_last_triggered(automation.id).await; + } + } + } + } + } + + async fn update_last_triggered(&self, automation_id: Uuid) { + if let Some(pool) = 
&self.state.db { + if let Err(e) = sqlx::query!( + "UPDATE system_automations SET last_triggered = $1 WHERE id = $2", + Utc::now(), + automation_id + ) + .execute(pool) + .await + { + eprintln!( + "Failed to update last_triggered for automation {}: {}", + automation_id, e + ); + } + } + } + + fn should_run_cron(pattern: &str, timestamp: i64) -> bool { + let parts: Vec<&str> = pattern.split_whitespace().collect(); + if parts.len() != 5 { + return false; + } + + let dt = chrono::DateTime::from_timestamp(timestamp, 0).unwrap(); + let minute = dt.minute() as i32; + let hour = dt.hour() as i32; + let day = dt.day() as i32; + let month = dt.month() as i32; + let weekday = dt.weekday().num_days_from_monday() as i32; + + [minute, hour, day, month, weekday] + .iter() + .enumerate() + .all(|(i, &val)| Self::cron_part_matches(parts[i], val)) + } + + fn cron_part_matches(part: &str, value: i32) -> bool { + if part == "*" { + return true; + } + if part.contains('/') { + let parts: Vec<&str> = part.split('/').collect(); + if parts.len() != 2 { + return false; + } + let step: i32 = parts[1].parse().unwrap_or(1); + if parts[0] == "*" { + return value % step == 0; + } + } + part.parse::().map_or(false, |num| num == value) + } + + async fn execute_action(&self, param: &str) { + let full_path = Path::new(&self.scripts_dir).join(param); + match tokio::fs::read_to_string(&full_path).await { + Ok(script_content) => { + println!("Executing action with param: {}", param); + + let script_service = ScriptService::new(&self.state.clone()); + + match script_service.compile(&script_content) { + Ok(ast) => match script_service.run(&ast) { + Ok(result) => println!("Script executed successfully: {:?}", result), + Err(e) => eprintln!("Error executing script: {}", e), + }, + Err(e) => eprintln!("Error compiling script: {}", e), + } + } + Err(e) => { + eprintln!("Failed to execute action {}: {}", full_path.display(), e); + } + } + } +} diff --git a/src/services/bot.md b/src/services/bot.md new file 
mode 100644 index 0000000..2f17c2b --- /dev/null +++ b/src/services/bot.md @@ -0,0 +1,233 @@ + // .service(create_bot) + // .service(get_bot) + // .service(list_bots) + // .service(update_bot) + // .service(delete_bot) + // .service(update_bot_status) + // .service(execute_bot_command) + + + +use crate::services::{config::BotConfig, state::AppState}; +use actix_web::{ + delete, get, post, put, + web::{self, Data, Json, Path}, + HttpResponse, Responder, Result, +}; +use chrono::Utc; +use serde::{Deserialize, Serialize}; +use sqlx::{postgres::PgQueryResult, FromRow, PgPool}; +use uuid::Uuid; + +// 1. Core Data Structures + +// 2. Request/Response DTOs +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateBotRequest { + pub name: String, + pub initial_config: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct UpdateBotRequest { + pub name: Option, + pub status: Option, + pub config: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct BotResponse { + pub bot_id: Uuid, + pub name: String, + pub status: BotStatus, + pub created_at: chrono::DateTime, + pub updated_at: chrono::DateTime, +} + +// 3. Helper Functions +impl From for BotResponse { + fn from(bot: Bot) -> Self { + BotResponse { + bot_id: bot.bot_id, + name: bot.name, + status: bot.status, + created_at: bot.created_at, + updated_at: bot.updated_at, + } + } +} + +async fn find_bot(bot_id: Uuid, pool: &PgPool) -> Result { + sqlx::query_as::<_, Bot>("SELECT * FROM bots WHERE bot_id = $1") + .bind(bot_id) + .fetch_one(pool) + .await +} + +// 4. 
API Endpoints +#[post("/bots/create")] +pub async fn create_bot( + payload: Json, + state: Data, +) -> Result { + let new_bot = sqlx::query_as::<_, Bot>( + r#" + INSERT INTO bots (name, status, config) + VALUES ($1, 'active', $2) + RETURNING * + "#, + ) + .bind(&payload.name) + .bind(&payload.initial_config) + .fetch_one(&state.db) + .await + .map_err(|e| { + log::error!("Failed to create bot: {}", e); + actix_web::error::ErrorInternalServerError("Failed to create bot") + })?; + + Ok(HttpResponse::Created().json(BotResponse::from(new_bot))) +} + +#[get("/bots/{bot_id}")] +pub async fn get_bot( + path: Path, + state: Data, +) -> Result { + let bot_id = path.into_inner(); + let bot = find_bot(bot_id, &state.db).await.map_err(|e| match e { + sqlx::Error::RowNotFound => actix_web::error::ErrorNotFound("Bot not found"), + _ => { + log::error!("Failed to fetch bot: {}", e); + actix_web::error::ErrorInternalServerError("Failed to fetch bot") + } + })?; + + Ok(HttpResponse::Ok().json(BotResponse::from(bot))) +} + +#[get("/bots")] +pub async fn list_bots(state: Data) -> Result { + let bots = sqlx::query_as::<_, Bot>("SELECT * FROM bots ORDER BY created_at DESC") + .fetch_all(&state.db) + .await + .map_err(|e| { + log::error!("Failed to list bots: {}", e); + actix_web::error::ErrorInternalServerError("Failed to list bots") + })?; + + let responses: Vec = bots.into_iter().map(BotResponse::from).collect(); + Ok(HttpResponse::Ok().json(responses)) +} + +#[put("/bots/{bot_id}")] +pub async fn update_bot( + path: Path, + payload: Json, + state: Data, +) -> Result { + let bot_id = path.into_inner(); + + let updated_bot = sqlx::query_as::<_, Bot>( + r#" + UPDATE bots + SET + name = COALESCE($1, name), + status = COALESCE($2, status), + config = COALESCE($3, config), + updated_at = NOW() + WHERE bot_id = $4 + RETURNING * + "#, + ) + .bind(&payload.name) + .bind(&payload.status) + .bind(&payload.config) + .bind(bot_id) + .fetch_one(&state.db) + .await + .map_err(|e| match e { + 
sqlx::Error::RowNotFound => actix_web::error::ErrorNotFound("Bot not found"), + _ => { + log::error!("Failed to update bot: {}", e); + actix_web::error::ErrorInternalServerError("Failed to update bot") + } + })?; + + Ok(HttpResponse::Ok().json(BotResponse::from(updated_bot))) +} + +#[delete("/bots/{bot_id}")] +pub async fn delete_bot( + path: Path, + state: Data, +) -> Result { + let bot_id = path.into_inner(); + + let result = sqlx::query("DELETE FROM bots WHERE bot_id = $1") + .bind(bot_id) + .execute(&state.db) + .await + .map_err(|e| { + log::error!("Failed to delete bot: {}", e); + actix_web::error::ErrorInternalServerError("Failed to delete bot") + })?; + + if result.rows_affected() == 0 { + return Err(actix_web::error::ErrorNotFound("Bot not found")); + } + + Ok(HttpResponse::NoContent().finish()) +} + +#[put("/bots/{bot_id}/status")] +pub async fn update_bot_status( + path: Path, + new_status: Json, + state: Data, +) -> Result { + let bot_id = path.into_inner(); + + let updated_bot = sqlx::query_as::<_, Bot>( + "UPDATE bots SET status = $1, updated_at = NOW() WHERE bot_id = $2 RETURNING *", + ) + .bind(new_status.into_inner()) + .bind(bot_id) + .fetch_one(&state.db) + .await + .map_err(|e| match e { + sqlx::Error::RowNotFound => actix_web::error::ErrorNotFound("Bot not found"), + _ => { + log::error!("Failed to update bot status: {}", e); + actix_web::error::ErrorInternalServerError("Failed to update bot status") + } + })?; + + Ok(HttpResponse::Ok().json(BotResponse::from(updated_bot))) +} + +#[post("/bots/{bot_id}/execute")] +pub async fn execute_bot_command( + path: Path, + command: Json, + state: Data, +) -> Result { + let bot_id = path.into_inner(); + + // Verify bot exists + let _ = find_bot(bot_id, &state.db).await.map_err(|e| match e { + sqlx::Error::RowNotFound => actix_web::error::ErrorNotFound("Bot not found"), + _ => { + log::error!("Failed to fetch bot: {}", e); + actix_web::error::ErrorInternalServerError("Failed to fetch bot") + } + })?; + + 
// Here you would implement your bot execution logic + // For now, we'll just echo back the command + Ok(HttpResponse::Ok().json(json!({ + "bot_id": bot_id, + "command": command, + "result": "Command executed successfully (simulated)" + }))) +} diff --git a/src/services/keywords/create_draft.rs b/src/services/keywords/create_draft.rs index 9ffc348..8cabf82 100644 --- a/src/services/keywords/create_draft.rs +++ b/src/services/keywords/create_draft.rs @@ -1,5 +1,5 @@ +use crate::services::email::save_email_draft; use crate::services::email::{fetch_latest_sent_to, SaveDraftRequest}; -use crate::services::email::{save_email_draft}; use crate::services::state::AppState; use rhai::Dynamic; use rhai::Engine; @@ -38,7 +38,7 @@ async fn execute_create_draft( let get_result = fetch_latest_sent_to(&state.config.clone().unwrap().email, to).await; let email_body = if let Ok(get_result_str) = get_result { if !get_result_str.is_empty() { - let email_separator = "\n\n-------------------------------------------------\n\n"; // Horizontal rule style separator + let email_separator = "\n\n-------------------------------------------------\n\n"; // Horizontal rule style separator reply_text.to_string() + email_separator + get_result_str.as_str() } else { // Fixed: Use reply_text when get_result_str is empty, not empty string @@ -56,9 +56,10 @@ async fn execute_create_draft( text: email_body, }; - let save_result = match save_email_draft(&state.config.clone().unwrap().email, &draft_request).await { - Ok(_) => Ok("Draft saved successfully".to_string()), - Err(e) => Err(e.to_string()), - }; + let save_result = + match save_email_draft(&state.config.clone().unwrap().email, &draft_request).await { + Ok(_) => Ok("Draft saved successfully".to_string()), + Err(e) => Err(e.to_string()), + }; save_result -} \ No newline at end of file +} diff --git a/src/services/keywords/get.rs b/src/services/keywords/get.rs index e01652d..185e736 100644 --- a/src/services/keywords/get.rs +++ 
b/src/services/keywords/get.rs @@ -1,58 +1,65 @@ - use rhai::{Dynamic, Engine}; -use reqwest; use crate::services::state::AppState; -use std::error::Error; +use reqwest::{self, Client}; +use rhai::{Dynamic, Engine}; use scraper::{Html, Selector}; - +use std::error::Error; pub fn get_keyword(_state: &AppState, engine: &mut Engine) { - engine.register_custom_syntax( - &["GET", "$expr$"], - false, // Expression, not statement - move |context, inputs| { - let url = context.eval_expression_tree(&inputs[0])?; - let url_str = url.to_string(); + engine + .register_custom_syntax( + &["GET", "$expr$"], + false, // Expression, not statement + move |context, inputs| { + let url = context.eval_expression_tree(&inputs[0])?; + let url_str = url.to_string(); - if url_str.starts_with("https") { - println!("HTTPS GET request: {}", url_str); - - // Use the same pattern as find_keyword - let fut = execute_get(&url_str); - let result = tokio::task::block_in_place(|| { - tokio::runtime::Handle::current().block_on(fut) - }).map_err(|e| format!("HTTP request failed: {}", e))?; - - Ok(Dynamic::from(result)) - } else { - println!("GET executed: {}", url_str); - Ok(Dynamic::from(format!("Content from {}", url_str))) - } - } - ).unwrap(); + if url_str.starts_with("https") { + println!("HTTPS GET request: {}", url_str); + + // Use the same pattern as find_keyword + let fut = execute_get(&url_str); + let result = tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(fut) + }) + .map_err(|e| format!("HTTP request failed: {}", e))?; + + Ok(Dynamic::from(result)) + } else { + println!("GET executed: {}", url_str); + Ok(Dynamic::from(format!("Content from {}", url_str))) + } + }, + ) + .unwrap(); } pub async fn _execute_get(url: &str) -> Result> { println!("Starting execute_get with URL: {}", url); - + let response = reqwest::get(url).await?; let content = response.text().await?; - + println!("GET request successful, got {} bytes", content.len()); Ok(format!("Secure content 
fetched: {}", content)) } pub async fn execute_get(url: &str) -> Result> { println!("Starting execute_get with URL: {}", url); - - let response = reqwest::get(url).await?; + + // Create a client that ignores invalid certificates + let client = Client::builder() + .danger_accept_invalid_certs(true) + .build()?; + + let response = client.get(url).send().await?; let html_content = response.text().await?; - + // Parse HTML and extract text let document = Html::parse_document(&html_content); let selector = Selector::parse("body").unwrap(); // Focus on body content let body = document.select(&selector).next().unwrap(); let text_content = body.text().collect::>().join(" "); - + // Clean up the text (remove extra whitespace, newlines, etc.) let cleaned_text = text_content .replace('\n', " ") @@ -60,7 +67,10 @@ pub async fn execute_get(url: &str) -> Result>() .join(" "); - - println!("GET request successful, extracted {} characters of text", cleaned_text.len()); + + println!( + "GET request successful, extracted {} characters of text", + cleaned_text.len() + ); Ok(cleaned_text) } diff --git a/src/services/keywords/mod.rs b/src/services/keywords/mod.rs index c999292..a85a9f1 100644 --- a/src/services/keywords/mod.rs +++ b/src/services/keywords/mod.rs @@ -4,8 +4,10 @@ pub mod find; pub mod first; pub mod for_next; pub mod get; -pub mod get_website; +pub mod get_website; pub mod llm_keyword; +pub mod on; pub mod print; pub mod set; -pub mod wait; \ No newline at end of file +pub mod set_schedule; +pub mod wait; diff --git a/src/services/keywords/on.rs b/src/services/keywords/on.rs new file mode 100644 index 0000000..05e21c9 --- /dev/null +++ b/src/services/keywords/on.rs @@ -0,0 +1,85 @@ +use rhai::Dynamic; +use rhai::Engine; +use serde_json::{json, Value}; +use sqlx::PgPool; + +use crate::models::automation_model::TriggerKind; +use crate::services::state::AppState; + +pub fn on_keyword(state: &AppState, engine: &mut Engine) { + let db = state.db_custom.clone(); + + engine + 
.register_custom_syntax( + ["ON", "$ident$", "OF", "$string$"], // Changed $string$ to $ident$ for operation + true, + { + let db = db.clone(); + + move |context, inputs| { + let trigger_type = context.eval_expression_tree(&inputs[0])?.to_string(); + let table = context.eval_expression_tree(&inputs[1])?.to_string(); + let script_name = format!("{}_{}.rhai", table, trigger_type.to_lowercase()); + + // Determine the trigger kind based on the trigger type + let kind = match trigger_type.to_uppercase().as_str() { + "UPDATE" => TriggerKind::TableUpdate, + "INSERT" => TriggerKind::TableInsert, + "DELETE" => TriggerKind::TableDelete, + _ => return Err(format!("Invalid trigger type: {}", trigger_type).into()), + }; + + let binding = db.as_ref().unwrap(); + let fut = execute_on_trigger(binding, kind, &table, &script_name); + + let result = tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(fut) + }) + .map_err(|e| format!("DB error: {}", e))?; + + if let Some(rows_affected) = result.get("rows_affected") { + Ok(Dynamic::from(rows_affected.as_i64().unwrap_or(0))) + } else { + Err("No rows affected".into()) + } + } + }, + ) + .unwrap(); +} + +pub async fn execute_on_trigger( + pool: &PgPool, + kind: TriggerKind, + table: &str, + script_name: &str, +) -> Result { + println!( + "Starting execute_on_trigger with kind: {:?}, table: {}, script_name: {}", + kind, table, script_name + ); + + // Option 1: Use query_with macro if you need to pass enum values + let result = sqlx::query( + "INSERT INTO system_automations + (kind, target, script_name) + VALUES ($1, $2, $3)", + ) + .bind(kind.clone() as i32) // Assuming TriggerKind is #[repr(i32)] + .bind(table) + .bind(script_name) + .execute(pool) + .await + .map_err(|e| { + eprintln!("SQL execution error: {}", e); + e.to_string() + })?; + + Ok(json!({ + "command": "on_trigger", + "trigger_type": format!("{:?}", kind), + "table": table, + "script_name": script_name, + "rows_affected": result.rows_affected() + 
})) +} diff --git a/src/services/keywords/prompt.md b/src/services/keywords/prompt.md new file mode 100644 index 0000000..f6f4f75 --- /dev/null +++ b/src/services/keywords/prompt.md @@ -0,0 +1,150 @@ + +Create a new Rhai custom keyword implementation with these specifications: + +1. DATABASE REQUIREMENTS: +- No enums in database schema (only in Rust code) +- Use direct integer values for enum variants in queries +- Follow existing connection pooling pattern with AppState +- Include proper error handling and logging + +2. RUST IMPLEMENTATION: +- Enum definition (Rust-only, no DB enum): +```rust +#[repr(i32)] +pub enum KeywordAction { + Action1 = 0, + Action2 = 1, + Action3 = 2 +} +``` + +3. KEYWORD TEMPLATE: +```rust +pub fn {keyword_name}_keyword(state: &AppState, engine: &mut Engine) { + let db = state.db_custom.clone(); + + engine.register_custom_syntax( + {syntax_pattern}, + {is_raw}, + { + let db = db.clone(); + move |context, inputs| { + // Input processing + {input_processing} + + let binding = db.as_ref().unwrap(); + let fut = execute_{keyword_name}(binding, {params}); + + let result = tokio::task::block_in_place(|| + tokio::runtime::Handle::current().block_on(fut)) + .map_err(|e| format!("DB error: {}", e))?; + + {result_handling} + } + } + ).unwrap(); +} + +pub async fn execute_{keyword_name}( + pool: &PgPool, + {params_with_types} +) -> Result> { + println!("Executing {keyword_name} with: {debug_params}"); + + let result = sqlx::query( + "{sql_query_with_i32_enum}" + ) + .bind({enum_value} as i32) + {additional_binds} + .execute(pool) + .await?; + + Ok(json!({ + "command": "{keyword_name}", + {result_fields} + "rows_affected": result.rows_affected() + })) +} +``` + +4. 
EXAMPLE IMPLEMENTATION (SET SCHEDULE): +```rust +// Enum (Rust-only) +#[repr(i32)] +pub enum TriggerKind { + Scheduled = 0, + TableUpdate = 1, + TableInsert = 2, + TableDelete = 3 +} + +// Keyword implementation +pub fn set_schedule_keyword(state: &AppState, engine: &mut Engine) { + let db = state.db_custom.clone(); + + engine.register_custom_syntax( + ["SET", "SCHEDULE", "$string$"], + true, + { + let db = db.clone(); + move |context, inputs| { + let cron = context.eval_expression_tree(&inputs[0])?.to_string(); + let script_name = format!("cron_{}.rhai", cron.replace(' ', "_")); + + let binding = db.as_ref().unwrap(); + let fut = execute_set_schedule(binding, &cron, &script_name); + + let result = tokio::task::block_in_place(|| + tokio::runtime::Handle::current().block_on(fut)) + .map_err(|e| format!("DB error: {}", e))?; + + if let Some(rows_affected) = result.get("rows_affected") { + Ok(Dynamic::from(rows_affected.as_i64().unwrap_or(0))) + } else { + Err("No rows affected".into()) + } + } + } + ).unwrap(); +} + +pub async fn execute_set_schedule( + pool: &PgPool, + cron: &str, + script_name: &str, +) -> Result> { + println!("Executing schedule: {}, {}", cron, script_name); + + let result = sqlx::query( + "INSERT INTO system_automations + (kind, schedule, script_name) + VALUES ($1, $2, $3)" + ) + .bind(TriggerKind::Scheduled as i32) + .bind(cron) + .bind(script_name) + .execute(pool) + .await?; + + Ok(json!({ + "command": "set_schedule", + "schedule": cron, + "script_name": script_name, + "rows_affected": result.rows_affected() + })) +} +``` + +5. ADDITIONAL REQUIREMENTS: +- Maintain consistent tokio runtime handling +- Include parameter validation +- Follow existing JSON response format +- Ensure proper script name generation +- Include debug logging for all operations + +6. OUTPUT FORMAT: +Provide complete implementation with: +1. Rust enum definition +2. Keyword registration function +3. Execution function +4. 
Example usage in Rhai diff --git a/src/services/keywords/set.rs b/src/services/keywords/set.rs index 53e9726..6c22012 100644 --- a/src/services/keywords/set.rs +++ b/src/services/keywords/set.rs @@ -1,7 +1,7 @@ use rhai::Dynamic; use rhai::Engine; use serde_json::{json, Value}; -use sqlx::{PgPool}; +use sqlx::PgPool; use std::error::Error; use crate::services::state::AppState; @@ -57,8 +57,9 @@ pub async fn execute_set( let update_params_count = update_values.len(); // Parse filter with proper type handling - let (where_clause, filter_values) = utils::parse_filter_with_offset(filter_str, update_params_count) - .map_err(|e| e.to_string())?; + let (where_clause, filter_values) = + utils::parse_filter_with_offset(filter_str, update_params_count) + .map_err(|e| e.to_string())?; let query = format!( "UPDATE {} SET {} WHERE {}", @@ -68,24 +69,21 @@ pub async fn execute_set( // Build query with proper parameter binding let mut query = sqlx::query(&query); - + // Bind update values for value in update_values { query = bind_value(query, value); } - + // Bind filter values for value in filter_values { query = bind_value(query, value); } - let result = query - .execute(pool) - .await - .map_err(|e| { - eprintln!("SQL execution error: {}", e); - e.to_string() - })?; + let result = query.execute(pool).await.map_err(|e| { + eprintln!("SQL execution error: {}", e); + e.to_string() + })?; Ok(json!({ "command": "set", @@ -96,7 +94,10 @@ pub async fn execute_set( })) } -fn bind_value<'q>(query: sqlx::query::Query<'q, sqlx::Postgres, sqlx::postgres::PgArguments>, value: String) -> sqlx::query::Query<'q, sqlx::Postgres, sqlx::postgres::PgArguments> { +fn bind_value<'q>( + query: sqlx::query::Query<'q, sqlx::Postgres, sqlx::postgres::PgArguments>, + value: String, +) -> sqlx::query::Query<'q, sqlx::Postgres, sqlx::postgres::PgArguments> { if let Ok(int_val) = value.parse::() { query.bind(int_val) } else if let Ok(float_val) = value.parse::() { @@ -114,7 +115,7 @@ fn 
bind_value<'q>(query: sqlx::query::Query<'q, sqlx::Postgres, sqlx::postgres:: fn parse_updates(updates_str: &str) -> Result<(String, Vec), Box> { let mut set_clauses = Vec::new(); let mut params = Vec::new(); - + for (i, update) in updates_str.split(',').enumerate() { let parts: Vec<&str> = update.split('=').collect(); if parts.len() != 2 { @@ -124,7 +125,10 @@ fn parse_updates(updates_str: &str) -> Result<(String, Vec), Box Result<(String, Vec), Box Result> { + println!( + "Starting execute_set_schedule with cron: {}, script_name: {}", + cron, script_name + ); + + let result = sqlx::query( + r#" + INSERT INTO system_automations + (kind, schedule, script_name) + VALUES ($1, $2, $3) + "#, + ) + .bind(TriggerKind::Scheduled as i32) // Cast to i32 + .bind(cron) + .bind(script_name) + .execute(pool) + .await?; + + Ok(json!({ + "command": "set_schedule", + "schedule": cron, + "script_name": script_name, + "rows_affected": result.rows_affected() + })) +} diff --git a/src/services/llm_local.rs b/src/services/llm_local.rs index 3ac0ba0..e2af303 100644 --- a/src/services/llm_local.rs +++ b/src/services/llm_local.rs @@ -3,7 +3,7 @@ use dotenv::dotenv; use reqwest::Client; use serde::{Deserialize, Serialize}; use std::env; -use std::process::{Child, Command, Stdio}; +use std::process::{Command, Stdio}; use std::sync::{Arc, Mutex}; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command as TokioCommand; diff --git a/src/services/prompt.md b/src/services/prompt.md new file mode 100644 index 0000000..002e228 --- /dev/null +++ b/src/services/prompt.md @@ -0,0 +1,57 @@ +Generate a Rust service module following these patterns: + +Core Structure: + +Use actix-web for HTTP endpoints (get, post, etc.) + +Isolate shared resources (DB, clients, config) in AppState + +Split logic into reusable helper functions + +do not create main logic + +Endpoints: + +Follow REST conventions (e.g., POST /{resource}/create) use anotations in methods. 
+ +Use web::Path for route parameters, web::Json for payloads + +Return consistent responses (e.g., HttpResponse::Ok().json(data)) + +Error Handling: + +Wrap fallible operations in Result + +Use map_err to convert errors to actix_web::Error + +Provide clear error messages (e.g., ErrorInternalServerError) + +Async Patterns: + +Use async/await for I/O (DB, external APIs) + +Leverage streams for pagination/large datasets + +Isolate blocking ops in spawn_blocking if needed + +Configuration: + +Load settings (e.g., URLs, credentials) from AppConfig + +Initialize clients (DB, SDKs) at startup (e.g., init_*() helpers) + +Documentation: + +Add brief doc comments for public functions + +Note safety assumptions (e.g., #[post] invariants) +postgres sqlx +Omit domain-specific logic (e.g., file/email details), focusing on the scaffolding." + +Key Features: + +Generic (applies to any service: auth, payments, etc.) + +KISS (avoids over-engineering) + +Copy-paste friendly (clear patterns without verbosity) diff --git a/src/services/script.rs b/src/services/script.rs index 51f8de3..c6939a6 100644 --- a/src/services/script.rs +++ b/src/services/script.rs @@ -1,16 +1,18 @@ -use rhai::{ Dynamic, Engine, EvalAltResult}; -use crate::services::keywords::create_draft::{create_draft_keyword}; +use crate::services::keywords::create_draft::create_draft_keyword; use crate::services::keywords::create_site::create_site_keyword; -use crate::services::keywords::find::{find_keyword}; +use crate::services::keywords::find::find_keyword; +use crate::services::keywords::first::first_keyword; use crate::services::keywords::for_next::for_keyword; use crate::services::keywords::get::get_keyword; use crate::services::keywords::get_website::get_website_keyword; use crate::services::keywords::llm_keyword::llm_keyword; +use crate::services::keywords::on::on_keyword; use crate::services::keywords::print::print_keyword; use crate::services::keywords::set::set_keyword; +use 
crate::services::keywords::set_schedule::set_schedule_keyword; use crate::services::keywords::wait::wait_keyword; -use crate::services::keywords::first::first_keyword; use crate::services::state::AppState; +use rhai::{Dynamic, Engine, EvalAltResult}; pub struct ScriptService { engine: Engine, @@ -19,22 +21,25 @@ pub struct ScriptService { impl ScriptService { pub fn new(state: &AppState) -> Self { let mut engine = Engine::new(); - + // Configure engine for BASIC-like syntax engine.set_allow_anonymous_fn(true); engine.set_allow_looping(true); - + create_draft_keyword(state, &mut engine); create_site_keyword(state, &mut engine); find_keyword(state, &mut engine); - for_keyword(state, &mut engine); - first_keyword(&mut engine); + for_keyword(state, &mut engine); + first_keyword(&mut engine); llm_keyword(state, &mut engine); get_website_keyword(state, &mut engine); get_keyword(state, &mut engine); set_keyword(state, &mut engine); wait_keyword(state, &mut engine); print_keyword(state, &mut engine); + on_keyword(state, &mut engine); + set_schedule_keyword(state, &mut engine); + ScriptService { engine } }