Refactor main function to support multiple database connections and enhance AppState structure; add custom database configuration

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-07-19 00:45:40 -03:00
parent 0f0ea3e137
commit e17e0e36a0
9 changed files with 237 additions and 373 deletions

View file

@ -14,25 +14,31 @@ use sqlx::PgPool;
//use services:: find::*;
mod services;
#[actix_web::main]
#[tokio::main(flavor = "multi_thread")]
async fn main() -> std::io::Result<()> {
dotenv().ok();
let config = AppConfig::from_env();
let db_url = config.database_url();
let db_custom_url = config.database_custom_url();
let db = PgPool::connect(&db_url).await.unwrap();
let db_custom = PgPool::connect(&db_custom_url).await.unwrap();
let minio_client = init_minio(&config)
.await
.expect("Failed to initialize Minio");
// let table_str = "rob";
// let filter_str = "ACTION=EMUL1";
// match execute_find(table_str, filter_str) {
// Ok(result) => println!("{}", result),
// Err(e) => eprintln!("Error: {}", e),
// }
let app_state = web::Data::new(AppState {
db: db.into(),
db_custom: db_custom.into(),
config: Some(config.clone()),
minio_client: minio_client.into(),
});
let script_service = ScriptService::new();
let script_service = ScriptService::new(&app_state.clone());
let script = r#"
let items = FIND "rob", "ACTION=EMUL1"
let items = FIND "gb.rob", "ACTION=EMUL1"
FOR EACH item IN items
let text = GET "example.com"
PRINT item.name
@ -49,18 +55,6 @@ async fn main() -> std::io::Result<()> {
}
let db_url = config.database_url();
let db = PgPool::connect(&db_url).await.unwrap();
let minio_client = init_minio(&config)
.await
.expect("Failed to initialize Minio");
let app_state = web::Data::new(AppState {
db: db.into(),
config: Some(config.clone()),
minio_client: minio_client.into(),
});
// Start HTTP server
HttpServer::new(move || {

View file

@ -2,6 +2,7 @@ pub mod config;
pub mod state;
pub mod email;
pub mod find;
pub mod file;
pub mod llm;
pub mod script;

View file

@ -5,6 +5,8 @@ pub struct AppConfig {
pub minio: MinioConfig,
pub server: ServerConfig,
pub database: DatabaseConfig,
pub database_custom: DatabaseConfig,
pub email: EmailConfig,
pub ai: AIConfig,
}
@ -67,6 +69,18 @@ impl AppConfig {
)
}
pub fn database_custom_url(&self) -> String {
format!(
"postgres://{}:{}@{}:{}/{}",
self.database_custom.username,
self.database_custom.password,
self.database_custom.server,
self.database_custom.port,
self.database_custom.database
)
}
pub fn from_env() -> Self {
let database = DatabaseConfig {
username: env::var("TABLES_USERNAME").unwrap_or_else(|_| "user".to_string()),
@ -79,6 +93,17 @@ impl AppConfig {
database: env::var("TABLES_DATABASE").unwrap_or_else(|_| "db".to_string()),
};
let database_custom = DatabaseConfig {
username: env::var("CUSTOM_USERNAME").unwrap_or_else(|_| "user".to_string()),
password: env::var("CUSTOM_PASSWORD").unwrap_or_else(|_| "pass".to_string()),
server: env::var("CUSTOM_SERVER").unwrap_or_else(|_| "localhost".to_string()),
port: env::var("CUSTOM_PORT")
.ok()
.and_then(|p| p.parse().ok())
.unwrap_or(5432),
database: env::var("CUSTOM_DATABASE").unwrap_or_else(|_| "db".to_string()),
};
let minio = MinioConfig {
server: env::var("DRIVE_SERVER").expect("DRIVE_SERVER not set"),
access_key: env::var("DRIVE_ACCESSKEY").expect("DRIVE_ACCESSKEY not set"),
@ -125,6 +150,7 @@ impl AppConfig {
.unwrap_or(8080),
},
database,
database_custom,
email,
ai,
}

View file

@ -1,4 +1,5 @@
use actix_web::{ web};
use actix_multipart::Multipart;

View file

@ -1,232 +0,0 @@
- 100KB Email box
i need a vb to russt watx
first wkeywords;
the keyword find for example will call the websiervics speciifid.
you now ? this should be compatible with languageserer so can be debuger
in code. user rust minamalistc to do this very inline and minimize code verbosity
it will be integatd in antoher proejt so i nee a sevicelayer
to call it from a schduler i need the compiler and the rnner of the ob
use localhost:5858 to call websiervis from the wasm executable
this is will attend
FOR EACH item in ARRAY
next item
FIND filter
json = FIND "tablename", "field=value"
/tables/find
SET "tablename", "key=value", "value"
/tables/set
text = GET "domain.com"
/webauto/get_text
text = GET WEBSITE "CASAS BAHIA" 'casasbahia.com via duckduckgo.
/webauto/get_website
CREATE SITE "sitename",company, website, template, prompt
/sites/create
copy template from temapltes folder
add llm outpt page.tsx
add listener
CREATE DRAFT to, subject, body
/email/create_draft
use serde_json::{json, Value};
use sqlx::{postgres::PgRow, PgPool, Row};
use sqlx::Column; // Required for .name() method
use sqlx::TypeInfo; // Required for .type_info() method
use std::error::Error;
use sqlx::postgres::PgPoolOptions;
// Main async function to execute the FIND command
pub async fn execute_find(
pool: &PgPool,
table_str: &str,
filter_str: &str,
) -> Result<Value, Box<dyn Error>> {
// Parse the filter string into SQL WHERE clause and parameters
let (where_clause, params) = parse_filter(filter_str)?;
// Build the SQL query with proper parameter binding
let query = format!("SELECT * FROM {} WHERE {}", table_str, where_clause);
// Execute query and collect results
let rows = match params.len() {
0 => sqlx::query(&query).fetch_all(pool).await?,
1 => sqlx::query(&query).bind(&params[0]).fetch_all(pool).await?,
_ => return Err("Only single parameter filters supported in this example".into()),
};
// Convert rows to JSON values
let mut results = Vec::new();
for row in rows {
results.push(row_to_json(row)?);
}
// Return the structured result
Ok(json!({
"command": "find",
"table": table_str,
"filter": filter_str,
"results": results
}))
}
fn row_to_json(row: PgRow) -> Result<Value, Box<dyn Error>> {
let mut result = serde_json::Map::new();
for (i, column) in row.columns().iter().enumerate() {
let column_name = column.name();
let value: Value = match column.type_info().name() {
"int4" | "int8" => {
match row.try_get::<i64, _>(i) {
Ok(v) => json!(v),
Err(_) => Value::Null,
}
},
"float4" | "float8" => {
match row.try_get::<f64, _>(i) {
Ok(v) => json!(v),
Err(_) => Value::Null,
}
},
"text" | "varchar" => {
match row.try_get::<String, _>(i) {
Ok(v) => json!(v),
Err(_) => Value::Null,
}
},
"bool" => {
match row.try_get::<bool, _>(i) {
Ok(v) => json!(v),
Err(_) => Value::Null,
}
},
"json" | "jsonb" => {
match row.try_get::<Value, _>(i) {
Ok(v) => v,
Err(_) => Value::Null,
}
},
_ => Value::Null,
};
result.insert(column_name.to_string(), value);
}
Ok(Value::Object(result))
}
// Helper function to parse the filter string into SQL WHERE clause and parameters
fn parse_filter(filter_str: &str) -> Result<(String, Vec<String>), Box<dyn Error>> {
let parts: Vec<&str> = filter_str.split('=').collect();
if parts.len() != 2 {
return Err("Invalid filter format. Expected 'KEY=VALUE'".into());
}
let column = parts[0].trim();
let value = parts[1].trim();
// Validate column name to prevent SQL injection
if !column.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
return Err("Invalid column name in filter".into());
}
// Return the parameterized query part and the value separately
Ok((format!("{} = $1", column), vec![value.to_string()]))
}
// Database connection setup
pub async fn create_pool(database_url: &str) -> Result<PgPool, Box<dyn Error>> {
let pool = PgPoolOptions::new()
.max_connections(5)
.connect(database_url)
.await?;
// Test the connection
sqlx::query("SELECT 1").execute(&pool).await?;
Ok(pool)
}
#[cfg(test)]
mod tests {
use super::*;
use sqlx::postgres::PgPoolOptions;
use dotenv::dotenv;
use std::env;
async fn setup_test_db() -> PgPool {
dotenv().ok();
let database_url = env::var("DATABASE_URL")
.expect("DATABASE_URL must be set in .env for tests");
let pool = PgPoolOptions::new()
.max_connections(1)
.connect(&database_url)
.await
.unwrap();
// Create a test table
sqlx::query(
r#"
DROP TABLE IF EXISTS rob;
CREATE TABLE rob (
id SERIAL PRIMARY KEY,
action TEXT,
name TEXT,
is_active BOOLEAN,
metadata JSONB
)
"#
)
.execute(&pool)
.await
.unwrap();
// Insert test data
sqlx::query(
r#"
INSERT INTO rob (action, name, is_active, metadata) VALUES
('EMUL1', 'Robot1', true, '{"version": 1}'),
('EMUL2', 'Robot2', false, '{"version": 2}'),
('EMUL1', 'Robot3', true, null)
"#
)
.execute(&pool)
.await
.unwrap();
pool
}
#[tokio::test]
async fn test_execute_find() {
let pool = setup_test_db().await;
let result = execute_find(&pool, "rob", "action=EMUL1")
.await
.unwrap();
let results = result["results"].as_array().unwrap();
assert_eq!(results.len(), 2);
assert_eq!(results[0]["action"], "EMUL1");
assert_eq!(results[1]["action"], "EMUL1");
// Test JSON field
assert_eq!(results[0]["metadata"]["version"], 1);
// Test boolean field
assert_eq!(results[0]["is_active"], true);
}
}

158
src/services/find.rs Normal file
View file

@ -0,0 +1,158 @@
use serde_json::{json, Value};
use sqlx::postgres::PgPoolOptions;
use sqlx::Column; // Required for .name() method
use sqlx::TypeInfo; // Required for .type_info() method
use sqlx::{postgres::PgRow, PgPool, Row};
use std::error::Error;
use std::time::Duration;
pub async fn execute_find(
pool: &PgPool,
table_str: &str,
filter_str: &str,
) -> Result<Value, String> { // Changed to String error like your Actix code
println!("Starting execute_find with table: {}, filter: {}", table_str, filter_str);
let (where_clause, params) = parse_filter(filter_str)
.map_err(|e| e.to_string())?;
let query = format!("SELECT * FROM {} WHERE {} LIMIT 10", table_str, where_clause);
println!("Executing query: {}", query);
// Use the same simple pattern as your Actix code - no timeout wrapper
let rows = sqlx::query(&query)
.bind(&params[0]) // Simplified like your working code
.fetch_all(pool)
.await
.map_err(|e| {
eprintln!("SQL execution error: {}", e);
e.to_string()
})?;
println!("Query successful, got {} rows", rows.len());
let mut results = Vec::new();
for row in rows {
results.push(row_to_json(row).map_err(|e| e.to_string())?);
}
Ok(json!({
"command": "find",
"table": table_str,
"filter": filter_str,
"results": results
}))
}
fn row_to_json(row: PgRow) -> Result<Value, Box<dyn Error>> {
let mut result = serde_json::Map::new();
let columns = row.columns();
println!("Processing {} columns", columns.len());
for (i, column) in columns.iter().enumerate() {
let column_name = column.name();
let type_name = column.type_info().name();
println!(
"Processing column {}: {} (type: {})",
i, column_name, type_name
);
let value: Value = match type_name {
"INT4" | "INT8" | "int4" | "int8" => match row.try_get::<i64, _>(i) {
Ok(v) => {
println!("Got int64 value: {}", v);
json!(v)
}
Err(e) => {
println!("Failed to get int64, trying i32: {}", e);
match row.try_get::<i32, _>(i) {
Ok(v) => json!(v as i64),
Err(_) => Value::Null,
}
}
},
"FLOAT4" | "FLOAT8" | "float4" | "float8" => match row.try_get::<f64, _>(i) {
Ok(v) => {
println!("Got float64 value: {}", v);
json!(v)
}
Err(e) => {
println!("Failed to get float64, trying f32: {}", e);
match row.try_get::<f32, _>(i) {
Ok(v) => json!(v as f64),
Err(_) => Value::Null,
}
}
},
"TEXT" | "VARCHAR" | "text" | "varchar" => match row.try_get::<String, _>(i) {
Ok(v) => {
println!("Got string value: {}", v);
json!(v)
}
Err(e) => {
println!("Failed to get string: {}", e);
Value::Null
}
},
"BOOL" | "bool" => match row.try_get::<bool, _>(i) {
Ok(v) => {
println!("Got bool value: {}", v);
json!(v)
}
Err(e) => {
println!("Failed to get bool: {}", e);
Value::Null
}
},
"JSON" | "JSONB" | "json" | "jsonb" => match row.try_get::<Value, _>(i) {
Ok(v) => {
println!("Got JSON value: {:?}", v);
v
}
Err(e) => {
println!("Failed to get JSON, trying as string: {}", e);
match row.try_get::<String, _>(i) {
Ok(s) => match serde_json::from_str(&s) {
Ok(v) => v,
Err(_) => json!(s),
},
Err(_) => Value::Null,
}
}
},
_ => {
println!("Unknown type {}, trying as string", type_name);
match row.try_get::<String, _>(i) {
Ok(v) => json!(v),
Err(_) => Value::Null,
}
}
};
result.insert(column_name.to_string(), value);
}
println!("Finished processing row, got {} fields", result.len());
Ok(Value::Object(result))
}
// Helper function to parse the filter string into SQL WHERE clause and parameters
fn parse_filter(filter_str: &str) -> Result<(String, Vec<String>), Box<dyn Error>> {
let parts: Vec<&str> = filter_str.split('=').collect();
if parts.len() != 2 {
return Err("Invalid filter format. Expected 'KEY=VALUE'".into());
}
let column = parts[0].trim();
let value = parts[1].trim();
// Validate column name to prevent SQL injection
if !column
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '_')
{
return Err("Invalid column name in filter".into());
}
// Return the parameterized query part and the value separately
Ok((format!("{} = $1", column), vec![value.to_string()]))
}

View file

@ -52,7 +52,7 @@ pub async fn chat(
// Define available tools based on context
let tools = get_available_tools(&request.context);
// Build the prompt with context and available tools
let system_prompt = build_system_prompt(&request.context, &tools);
let user_message = format!("{}\n\nUser input: {}", system_prompt, request.input);

View file

@ -1,11 +1,14 @@
use smartstring::SmartString;
use anyhow::Error;
use rhai::module_resolvers::StaticModuleResolver;
use rhai::{Array, Dynamic, Engine, FnPtr, Scope};
use rhai::{EvalAltResult, ImmutableString, LexError, ParseError, ParseErrorType, Position};
use serde_json::{json, Value};
use smartstring::SmartString;
use std::collections::HashMap;
use crate::services::find::execute_find;
use crate::services::state::AppState;
pub struct ScriptService {
engine: Engine,
module_resolver: StaticModuleResolver,
@ -53,7 +56,7 @@ fn to_array(value: Dynamic) -> Array {
}
impl ScriptService {
pub fn new() -> Self {
pub fn new(state: &AppState) -> Self {
let mut engine = Engine::new();
let module_resolver = StaticModuleResolver::new();
@ -140,50 +143,41 @@ impl ScriptService {
.unwrap();
// FIND command: FIND "table", "filter"
// Clone the database reference outside the closure to avoid lifetime issues
let db = state.db_custom.clone();
engine
.register_custom_syntax(
&["FIND", "$expr$", ",", "$expr$"],
false, // Expression, not statement
|context, inputs| {
.register_custom_syntax(&["FIND", "$expr$", ",", "$expr$"], false, {
let db = db.clone();
move |context, inputs| {
let table_name = context.eval_expression_tree(&inputs[0])?;
let filter = context.eval_expression_tree(&inputs[1])?;
let binding = db.as_ref().unwrap();
let table_str = table_name.to_string();
let filter_str = filter.to_string();
// Use the current async context instead of creating a new runtime
let binding2 = table_name.to_string();
let binding3 = filter.to_string();
let fut = execute_find(
binding,
&binding2,
&binding3,
);
use serde_json::json;
// Use tokio::task::block_in_place + tokio::runtime::Handle::current().block_on
let result = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(fut)
})
.map_err(|e| format!("DB error: {}", e))?;
let result = json!({
"command": "find",
"table": table_str,
"filter": filter_str,
"results": [
{
"id": 1,
"name": "dummy1"
},
{
"id": 2,
"name": "dummy2"
}
]
});
if let serde_json::Value::Object(ref obj) = result {
if let Some(results_value) = obj.get("results") {
let dynamic_results = json_value_to_dynamic(results_value);
// Now you can work with it as Dynamic
let array = to_array(dynamic_results);
Ok(Dynamic::from(array))
} else {
Err("No results found".into())
}
if let Some(results) = result.get("results") {
let array = to_array(json_value_to_dynamic(results));
Ok(Dynamic::from(array))
} else {
Err("Invalid result format".into())
Err("No results".into())
}
},
)
}
})
.unwrap();
// SET command: SET "table", "key", "value"
@ -425,81 +419,3 @@ impl ScriptService {
self.run(&ast)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_basic_script_without_semicolons() {
let service = ScriptService::new();
// Test BASIC-style script without semicolons
let script = r#"
json = FIND "users", "name=John"
SET "users", "name=John", "age=30"
text = GET "example.com"
CREATE SITE "mysite", "My Company", "mycompany.com", "basic", "Create a professional site"
CREATE DRAFT "client@example.com", "Project Update", "Here's the latest update..."
PRINT "Script completed successfully"
"#;
let result = service.execute_basic_script(script);
assert!(result.is_ok());
}
#[test]
fn test_preprocessing() {
let service = ScriptService::new();
let script = r#"
json = FIND "users", "name=John"
SET "users", "name=John", "age=30"
let x = 42
PRINT x
if x > 10 {
PRINT "Large number"
}
"#;
let processed = service.preprocess_basic_script(script);
// Should add semicolons to regular statements but not custom commands
assert!(processed.contains("let x = 42;"));
assert!(processed.contains("json = FIND"));
assert!(!processed.contains("SET \"users\""));
assert!(!processed.contains("PRINT \"Large number\";")); // Inside block shouldn't get semicolon
}
#[test]
fn test_individual_commands() {
let service = ScriptService::new();
let commands = vec![
r#"SET "users", "name=John", "age=30""#,
r#"CREATE SITE "mysite", "My Company", "mycompany.com", "basic", "Create a professional site""#,
r#"CREATE DRAFT "client@example.com", "Project Update", "Here's the latest update...""#,
r#"PRINT "Hello, World!""#,
];
for cmd in commands {
let result = service.execute_basic_script(cmd);
assert!(result.is_ok(), "Command '{}' failed", cmd);
}
}
#[test]
fn test_block_statements() {
let service = ScriptService::new();
let script = r#"
if true {
PRINT "Inside block"
PRINT "Another statement"
}
"#;
let result = service.execute_basic_script(script);
assert!(result.is_ok());
}
}

View file

@ -8,6 +8,6 @@ pub struct AppState {
pub minio_client: Option<Client>,
pub config: Option<AppConfig>,
pub db: Option<sqlx::PgPool>,
pub db_custom: Option<sqlx::PgPool>,
}