diff --git a/src/main.rs b/src/main.rs index 1b24b49..9f32e61 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,25 +14,31 @@ use sqlx::PgPool; //use services:: find::*; mod services; -#[actix_web::main] +#[tokio::main(flavor = "multi_thread")] + async fn main() -> std::io::Result<()> { dotenv().ok(); let config = AppConfig::from_env(); + let db_url = config.database_url(); + let db_custom_url = config.database_custom_url(); + let db = PgPool::connect(&db_url).await.unwrap(); + let db_custom = PgPool::connect(&db_custom_url).await.unwrap(); + let minio_client = init_minio(&config) + .await + .expect("Failed to initialize Minio"); -// let table_str = "rob"; -// let filter_str = "ACTION=EMUL1"; - -// match execute_find(table_str, filter_str) { -// Ok(result) => println!("{}", result), -// Err(e) => eprintln!("Error: {}", e), -// } + let app_state = web::Data::new(AppState { + db: db.into(), + db_custom: db_custom.into(), + config: Some(config.clone()), + minio_client: minio_client.into(), + }); - - let script_service = ScriptService::new(); + let script_service = ScriptService::new(&app_state.clone()); let script = r#" - let items = FIND "rob", "ACTION=EMUL1" + let items = FIND "gb.rob", "ACTION=EMUL1" FOR EACH item IN items let text = GET "example.com" PRINT item.name @@ -49,18 +55,6 @@ async fn main() -> std::io::Result<()> { } - let db_url = config.database_url(); - let db = PgPool::connect(&db_url).await.unwrap(); - - let minio_client = init_minio(&config) - .await - .expect("Failed to initialize Minio"); - - let app_state = web::Data::new(AppState { - db: db.into(), - config: Some(config.clone()), - minio_client: minio_client.into(), - }); // Start HTTP server HttpServer::new(move || { diff --git a/src/services.rs b/src/services.rs index 7eb6360..c10bab9 100644 --- a/src/services.rs +++ b/src/services.rs @@ -2,6 +2,7 @@ pub mod config; pub mod state; pub mod email; +pub mod find; pub mod file; pub mod llm; pub mod script; \ No newline at end of file diff --git a/src/services/config.rs b/src/services/config.rs index 8fa151d..c4d48b9 100644 --- a/src/services/config.rs +++ b/src/services/config.rs @@ -5,6 +5,8 @@ pub struct AppConfig { pub minio: MinioConfig, pub server: ServerConfig, pub database: DatabaseConfig, + pub database_custom: DatabaseConfig, + pub email: EmailConfig, pub ai: AIConfig, } @@ -67,6 +69,18 @@ impl AppConfig { ) } + pub fn database_custom_url(&self) -> String { + format!( + "postgres://{}:{}@{}:{}/{}", + self.database_custom.username, + self.database_custom.password, + self.database_custom.server, + self.database_custom.port, + self.database_custom.database + ) + } + + pub fn from_env() -> Self { let database = DatabaseConfig { username: env::var("TABLES_USERNAME").unwrap_or_else(|_| "user".to_string()), @@ -79,6 +93,17 @@ impl AppConfig { database: env::var("TABLES_DATABASE").unwrap_or_else(|_| "db".to_string()), }; + let database_custom = DatabaseConfig { + username: env::var("CUSTOM_USERNAME").unwrap_or_else(|_| "user".to_string()), + password: env::var("CUSTOM_PASSWORD").unwrap_or_else(|_| "pass".to_string()), + server: env::var("CUSTOM_SERVER").unwrap_or_else(|_| "localhost".to_string()), + port: env::var("CUSTOM_PORT") + .ok() + .and_then(|p| p.parse().ok()) + .unwrap_or(5432), + database: env::var("CUSTOM_DATABASE").unwrap_or_else(|_| "db".to_string()), + }; + let minio = MinioConfig { server: env::var("DRIVE_SERVER").expect("DRIVE_SERVER not set"), access_key: env::var("DRIVE_ACCESSKEY").expect("DRIVE_ACCESSKEY not set"), @@ -125,6 +150,7 @@ impl AppConfig { .unwrap_or(8080), }, database, + database_custom, email, ai, } diff --git a/src/services/file.rs b/src/services/file.rs index e5c8f73..22d9235 100644 --- a/src/services/file.rs +++ b/src/services/file.rs @@ -1,4 +1,5 @@ + use actix_web::{ web}; use actix_multipart::Multipart; diff --git a/src/services/find.md b/src/services/find.md deleted file mode 100644 index cdc21cd..0000000 --- a/src/services/find.md +++ /dev/null @@ -1,232 +0,0 @@ -- 100KB Email box - -i need a vb to russt watx -first wkeywords; -the keyword find for example will call the websiervics speciifid. -you now ? this should be compatible with languageserer so can be debuger -in code. user rust minamalistc to do this very inline and minimize code verbosity -it will be integatd in antoher proejt so i nee a sevicelayer -to call it from a schduler i need the compiler and the rnner of the ob -use localhost:5858 to call websiervis from the wasm executable -this is will attend - - FOR EACH item in ARRAY - next item - - FIND filter - - json = FIND "tablename", "field=value" - /tables/find - - SET "tablename", "key=value", "value" - /tables/set - - text = GET "domain.com" - /webauto/get_text - - text = GET WEBSITE "CASAS BAHIA" 'casasbahia.com via duckduckgo. - /webauto/get_website - - CREATE SITE "sitename",company, website, template, prompt - - /sites/create - copy template from temapltes folder - add llm outpt page.tsx - add listener - - CREATE DRAFT to, subject, body - /email/create_draft - - - - - - - - - - -use serde_json::{json, Value}; -use sqlx::{postgres::PgRow, PgPool, Row}; -use sqlx::Column; // Required for .name() method -use sqlx::TypeInfo; // Required for .type_info() method -use std::error::Error; -use sqlx::postgres::PgPoolOptions; - -// Main async function to execute the FIND command -pub async fn execute_find( - pool: &PgPool, - table_str: &str, - filter_str: &str, -) -> Result> { - // Parse the filter string into SQL WHERE clause and parameters - let (where_clause, params) = parse_filter(filter_str)?; - - // Build the SQL query with proper parameter binding - let query = format!("SELECT * FROM {} WHERE {}", table_str, where_clause); - - // Execute query and collect results - let rows = match params.len() { - 0 => sqlx::query(&query).fetch_all(pool).await?, - 1 => sqlx::query(&query).bind(¶ms[0]).fetch_all(pool).await?, - _ => return Err("Only single parameter filters supported in this example".into()), - }; - - // Convert rows to JSON values - let mut results = Vec::new(); - for row in rows { - results.push(row_to_json(row)?); - } - - // Return the structured result - Ok(json!({ - "command": "find", - "table": table_str, - "filter": filter_str, - "results": results - })) -} - -fn row_to_json(row: PgRow) -> Result> { - let mut result = serde_json::Map::new(); - for (i, column) in row.columns().iter().enumerate() { - let column_name = column.name(); - let value: Value = match column.type_info().name() { - "int4" | "int8" => { - match row.try_get::(i) { - Ok(v) => json!(v), - Err(_) => Value::Null, - } - }, - "float4" | "float8" => { - match row.try_get::(i) { - Ok(v) => json!(v), - Err(_) => Value::Null, - } - }, - "text" | "varchar" => { - match row.try_get::(i) { - Ok(v) => json!(v), - Err(_) => Value::Null, - } - }, - "bool" => { - match row.try_get::(i) { - Ok(v) => json!(v), - Err(_) => Value::Null, - } - }, - "json" | "jsonb" => { - match row.try_get::(i) { - Ok(v) => v, - Err(_) => Value::Null, - } - }, - _ => Value::Null, - }; - result.insert(column_name.to_string(), value); - } - Ok(Value::Object(result)) -} -// Helper function to parse the filter string into SQL WHERE clause and parameters -fn parse_filter(filter_str: &str) -> Result<(String, Vec), Box> { - let parts: Vec<&str> = filter_str.split('=').collect(); - if parts.len() != 2 { - return Err("Invalid filter format. Expected 'KEY=VALUE'".into()); - } - - let column = parts[0].trim(); - let value = parts[1].trim(); - - // Validate column name to prevent SQL injection - if !column.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') { - return Err("Invalid column name in filter".into()); - } - - // Return the parameterized query part and the value separately - Ok((format!("{} = $1", column), vec![value.to_string()])) -} - -// Database connection setup -pub async fn create_pool(database_url: &str) -> Result> { - let pool = PgPoolOptions::new() - .max_connections(5) - .connect(database_url) - .await?; - - // Test the connection - sqlx::query("SELECT 1").execute(&pool).await?; - - Ok(pool) -} - -#[cfg(test)] -mod tests { - use super::*; - use sqlx::postgres::PgPoolOptions; - use dotenv::dotenv; - use std::env; - - async fn setup_test_db() -> PgPool { - dotenv().ok(); - let database_url = env::var("DATABASE_URL") - .expect("DATABASE_URL must be set in .env for tests"); - - let pool = PgPoolOptions::new() - .max_connections(1) - .connect(&database_url) - .await - .unwrap(); - - // Create a test table - sqlx::query( - r#" - DROP TABLE IF EXISTS rob; - CREATE TABLE rob ( - id SERIAL PRIMARY KEY, - action TEXT, - name TEXT, - is_active BOOLEAN, - metadata JSONB - ) - "# - ) - .execute(&pool) - .await - .unwrap(); - - // Insert test data - sqlx::query( - r#" - INSERT INTO rob (action, name, is_active, metadata) VALUES - ('EMUL1', 'Robot1', true, '{"version": 1}'), - ('EMUL2', 'Robot2', false, '{"version": 2}'), - ('EMUL1', 'Robot3', true, null) - "# - ) - .execute(&pool) - .await - .unwrap(); - - pool - } - - #[tokio::test] - async fn test_execute_find() { - let pool = setup_test_db().await; - let result = execute_find(&pool, "rob", "action=EMUL1") - .await - .unwrap(); - - let results = result["results"].as_array().unwrap(); - assert_eq!(results.len(), 2); - assert_eq!(results[0]["action"], "EMUL1"); - assert_eq!(results[1]["action"], "EMUL1"); - - // Test JSON field - assert_eq!(results[0]["metadata"]["version"], 1); - - // Test boolean field - assert_eq!(results[0]["is_active"], true); - } -} \ No newline at end of file diff --git a/src/services/find.rs b/src/services/find.rs new file mode 100644 index 0000000..988f0e0 --- /dev/null +++ b/src/services/find.rs @@ -0,0 +1,158 @@ +use serde_json::{json, Value}; +use sqlx::postgres::PgPoolOptions; +use sqlx::Column; // Required for .name() method +use sqlx::TypeInfo; // Required for .type_info() method +use sqlx::{postgres::PgRow, PgPool, Row}; +use std::error::Error; +use std::time::Duration; + + +pub async fn execute_find( + pool: &PgPool, + table_str: &str, + filter_str: &str, +) -> Result { // Changed to String error like your Actix code + println!("Starting execute_find with table: {}, filter: {}", table_str, filter_str); + + let (where_clause, params) = parse_filter(filter_str) + .map_err(|e| e.to_string())?; + + let query = format!("SELECT * FROM {} WHERE {} LIMIT 10", table_str, where_clause); + println!("Executing query: {}", query); + + // Use the same simple pattern as your Actix code - no timeout wrapper + let rows = sqlx::query(&query) + .bind(¶ms[0]) // Simplified like your working code + .fetch_all(pool) + .await + .map_err(|e| { + eprintln!("SQL execution error: {}", e); + e.to_string() + })?; + + println!("Query successful, got {} rows", rows.len()); + + let mut results = Vec::new(); + for row in rows { + results.push(row_to_json(row).map_err(|e| e.to_string())?); + } + + Ok(json!({ + "command": "find", + "table": table_str, + "filter": filter_str, + "results": results + })) +} + +fn row_to_json(row: PgRow) -> Result> { + let mut result = serde_json::Map::new(); + let columns = row.columns(); + println!("Processing {} columns", columns.len()); + + for (i, column) in columns.iter().enumerate() { + let column_name = column.name(); + let type_name = column.type_info().name(); + println!( + "Processing column {}: {} (type: {})", + i, column_name, type_name + ); + + let value: Value = match type_name { + "INT4" | "INT8" | "int4" | "int8" => match row.try_get::(i) { + Ok(v) => { + println!("Got int64 value: {}", v); + json!(v) + } + Err(e) => { + println!("Failed to get int64, trying i32: {}", e); + match row.try_get::(i) { + Ok(v) => json!(v as i64), + Err(_) => Value::Null, + } + } + }, + "FLOAT4" | "FLOAT8" | "float4" | "float8" => match row.try_get::(i) { + Ok(v) => { + println!("Got float64 value: {}", v); + json!(v) + } + Err(e) => { + println!("Failed to get float64, trying f32: {}", e); + match row.try_get::(i) { + Ok(v) => json!(v as f64), + Err(_) => Value::Null, + } + } + }, + "TEXT" | "VARCHAR" | "text" | "varchar" => match row.try_get::(i) { + Ok(v) => { + println!("Got string value: {}", v); + json!(v) + } + Err(e) => { + println!("Failed to get string: {}", e); + Value::Null + } + }, + "BOOL" | "bool" => match row.try_get::(i) { + Ok(v) => { + println!("Got bool value: {}", v); + json!(v) + } + Err(e) => { + println!("Failed to get bool: {}", e); + Value::Null + } + }, + "JSON" | "JSONB" | "json" | "jsonb" => match row.try_get::(i) { + Ok(v) => { + println!("Got JSON value: {:?}", v); + v + } + Err(e) => { + println!("Failed to get JSON, trying as string: {}", e); + match row.try_get::(i) { + Ok(s) => match serde_json::from_str(&s) { + Ok(v) => v, + Err(_) => json!(s), + }, + Err(_) => Value::Null, + } + } + }, + _ => { + println!("Unknown type {}, trying as string", type_name); + match row.try_get::(i) { + Ok(v) => json!(v), + Err(_) => Value::Null, + } + } + }; + result.insert(column_name.to_string(), value); + } + println!("Finished processing row, got {} fields", result.len()); + Ok(Value::Object(result)) +} + +// Helper function to parse the filter string into SQL WHERE clause and parameters +fn parse_filter(filter_str: &str) -> Result<(String, Vec), Box> { + let parts: Vec<&str> = filter_str.split('=').collect(); + if parts.len() != 2 { + return Err("Invalid filter format. Expected 'KEY=VALUE'".into()); + } + + let column = parts[0].trim(); + let value = parts[1].trim(); + + // Validate column name to prevent SQL injection + if !column + .chars() + .all(|c| c.is_ascii_alphanumeric() || c == '_') + { + return Err("Invalid column name in filter".into()); + } + + // Return the parameterized query part and the value separately + Ok((format!("{} = $1", column), vec![value.to_string()])) +} diff --git a/src/services/llm-email.md b/src/services/llm-email.md index 3c9f0dd..8fc69f8 100644 --- a/src/services/llm-email.md +++ b/src/services/llm-email.md @@ -52,7 +52,7 @@ pub async fn chat( // Define available tools based on context let tools = get_available_tools(&request.context); - + // Build the prompt with context and available tools let system_prompt = build_system_prompt(&request.context, &tools); let user_message = format!("{}\n\nUser input: {}", system_prompt, request.input); diff --git a/src/services/script.rs b/src/services/script.rs index 97307dc..09002a2 100644 --- a/src/services/script.rs +++ b/src/services/script.rs @@ -1,11 +1,14 @@ -use smartstring::SmartString; use anyhow::Error; use rhai::module_resolvers::StaticModuleResolver; use rhai::{Array, Dynamic, Engine, FnPtr, Scope}; use rhai::{EvalAltResult, ImmutableString, LexError, ParseError, ParseErrorType, Position}; use serde_json::{json, Value}; +use smartstring::SmartString; use std::collections::HashMap; +use crate::services::find::execute_find; +use crate::services::state::AppState; + pub struct ScriptService { engine: Engine, module_resolver: StaticModuleResolver, @@ -53,7 +56,7 @@ fn to_array(value: Dynamic) -> Array { } impl ScriptService { - pub fn new() -> Self { + pub fn new(state: &AppState) -> Self { let mut engine = Engine::new(); let module_resolver = StaticModuleResolver::new(); @@ -140,50 +143,41 @@ impl ScriptService { .unwrap(); // FIND command: FIND "table", "filter" + // Clone the database reference outside the closure to avoid lifetime issues + let db = state.db_custom.clone(); + engine - .register_custom_syntax( - &["FIND", "$expr$", ",", "$expr$"], - false, // Expression, not statement - |context, inputs| { + .register_custom_syntax(&["FIND", "$expr$", ",", "$expr$"], false, { + let db = db.clone(); + + move |context, inputs| { let table_name = context.eval_expression_tree(&inputs[0])?; let filter = context.eval_expression_tree(&inputs[1])?; + let binding = db.as_ref().unwrap(); - let table_str = table_name.to_string(); - let filter_str = filter.to_string(); + // Use the current async context instead of creating a new runtime + let binding2 = table_name.to_string(); + let binding3 = filter.to_string(); + let fut = execute_find( + binding, + &binding2, + &binding3, + ); - use serde_json::json; + // Use tokio::task::block_in_place + tokio::runtime::Handle::current().block_on + let result = tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(fut) + }) + .map_err(|e| format!("DB error: {}", e))?; - let result = json!({ - "command": "find", - "table": table_str, - "filter": filter_str, - "results": [ - { - "id": 1, - "name": "dummy1" - }, - { - "id": 2, - "name": "dummy2" - } - ] - }); - - if let serde_json::Value::Object(ref obj) = result { - if let Some(results_value) = obj.get("results") { - let dynamic_results = json_value_to_dynamic(results_value); - - // Now you can work with it as Dynamic - let array = to_array(dynamic_results); - Ok(Dynamic::from(array)) - } else { - Err("No results found".into()) - } + if let Some(results) = result.get("results") { + let array = to_array(json_value_to_dynamic(results)); + Ok(Dynamic::from(array)) } else { - Err("Invalid result format".into()) + Err("No results".into()) } - }, - ) + } + }) .unwrap(); // SET command: SET "table", "key", "value" @@ -425,81 +419,3 @@ impl ScriptService { self.run(&ast) } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_basic_script_without_semicolons() { - let service = ScriptService::new(); - - // Test BASIC-style script without semicolons - let script = r#" -json = FIND "users", "name=John" -SET "users", "name=John", "age=30" -text = GET "example.com" -CREATE SITE "mysite", "My Company", "mycompany.com", "basic", "Create a professional site" -CREATE DRAFT "client@example.com", "Project Update", "Here's the latest update..." -PRINT "Script completed successfully" - "#; - - let result = service.execute_basic_script(script); - assert!(result.is_ok()); - } - - #[test] - fn test_preprocessing() { - let service = ScriptService::new(); - - let script = r#" -json = FIND "users", "name=John" -SET "users", "name=John", "age=30" -let x = 42 -PRINT x -if x > 10 { - PRINT "Large number" -} - "#; - - let processed = service.preprocess_basic_script(script); - - // Should add semicolons to regular statements but not custom commands - assert!(processed.contains("let x = 42;")); - assert!(processed.contains("json = FIND")); - assert!(!processed.contains("SET \"users\"")); - assert!(!processed.contains("PRINT \"Large number\";")); // Inside block shouldn't get semicolon - } - - #[test] - fn test_individual_commands() { - let service = ScriptService::new(); - - let commands = vec![ - r#"SET "users", "name=John", "age=30""#, - r#"CREATE SITE "mysite", "My Company", "mycompany.com", "basic", "Create a professional site""#, - r#"CREATE DRAFT "client@example.com", "Project Update", "Here's the latest update...""#, - r#"PRINT "Hello, World!""#, - ]; - - for cmd in commands { - let result = service.execute_basic_script(cmd); - assert!(result.is_ok(), "Command '{}' failed", cmd); - } - } - - #[test] - fn test_block_statements() { - let service = ScriptService::new(); - - let script = r#" -if true { - PRINT "Inside block" - PRINT "Another statement" -} - "#; - - let result = service.execute_basic_script(script); - assert!(result.is_ok()); - } -} diff --git a/src/services/state.rs b/src/services/state.rs index 2ff306f..4ab156a 100644 --- a/src/services/state.rs +++ b/src/services/state.rs @@ -8,6 +8,6 @@ pub struct AppState { pub minio_client: Option, pub config: Option, pub db: Option, - +pub db_custom: Option, }