- Logging review.
Some checks failed
GBCI / build (push) Has been cancelled

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-09-17 20:11:56 -03:00
parent f10522bf76
commit e5dfcf2659
25 changed files with 298 additions and 281 deletions

4
Cargo.lock generated
View file

@ -2705,9 +2705,9 @@ dependencies = [
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.27" version = "0.4.28"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432"
dependencies = [ dependencies = [
"value-bag", "value-bag",
] ]

View file

@ -34,7 +34,7 @@ lettre = { version = "0.10", features = [
"tokio1", "tokio1",
"tokio1-native-tls", "tokio1-native-tls",
] } ] }
log = "0.4" log = "0.4.28"
mailparse = "0.13" mailparse = "0.13"
minio = { git = "https://github.com/minio/minio-rs", branch = "master" } minio = { git = "https://github.com/minio/minio-rs", branch = "master" }
native-tls = "0.2" native-tls = "0.2"

View file

@ -1,4 +1,5 @@
use actix_web::middleware::Logger; use actix_web::middleware::Logger;
use log::info;
use std::sync::Arc; use std::sync::Arc;
use actix_web::{web, App, HttpServer}; use actix_web::{web, App, HttpServer};
@ -8,11 +9,12 @@ use services::{config::*, file::*};
use sqlx::PgPool; use sqlx::PgPool;
use crate::services::automation::AutomationService; use crate::services::automation::AutomationService;
use crate::services::email::{get_emails, list_emails, save_click, send_email}; use crate::services::email::{
get_emails, get_latest_email_from, list_emails, save_click, save_draft, send_email,
};
use crate::services::llm::{chat, chat_stream}; use crate::services::llm::{chat, chat_stream};
use crate::services::llm_local::ensure_llama_servers_running; use crate::services::llm_local::ensure_llama_servers_running;
use crate::services::llm_local::{chat_completions_local, embeddings_local}; use crate::services::llm_local::{chat_completions_local, embeddings_local};
use crate::services::llm_provider::chat_completions;
use crate::services::web_automation::{initialize_browser_pool, BrowserPool}; use crate::services::web_automation::{initialize_browser_pool, BrowserPool};
mod models; mod models;
@ -22,6 +24,7 @@ mod services;
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
dotenv().ok(); dotenv().ok();
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
info!("Starting General Bots 6.0...");
let config = AppConfig::from_env(); let config = AppConfig::from_env();
let db_url = config.database_url(); let db_url = config.database_url();
@ -84,7 +87,9 @@ async fn main() -> std::io::Result<()> {
.service(chat_stream) .service(chat_stream)
.service(chat_completions_local) .service(chat_completions_local)
.service(chat) .service(chat)
.service(save_draft)
.service(embeddings_local) .service(embeddings_local)
.service(get_latest_email_from)
}) })
.bind((config.server.host.clone(), config.server.port))? .bind((config.server.host.clone(), config.server.port))?
.run() .run()

View file

@ -4,10 +4,10 @@ use crate::services::state::AppState;
use chrono::Datelike; use chrono::Datelike;
use chrono::Timelike; use chrono::Timelike;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use log::{error, info};
use std::path::Path; use std::path::Path;
use tokio::time::Duration; use tokio::time::Duration;
use uuid::Uuid; use uuid::Uuid;
pub struct AutomationService { pub struct AutomationService {
state: AppState, // Use web::Data directly state: AppState, // Use web::Data directly
scripts_dir: String, scripts_dir: String,
@ -30,7 +30,7 @@ impl AutomationService {
interval.tick().await; interval.tick().await;
if let Err(e) = self.run_cycle(&mut last_check).await { if let Err(e) = self.run_cycle(&mut last_check).await {
eprintln!("Automation cycle error: {}", e); error!("Automation cycle error: {}", e);
} }
} }
}) })
@ -94,7 +94,7 @@ impl AutomationService {
} }
} }
Err(e) => { Err(e) => {
eprintln!("Error checking changes for table {}: {}", table, e); error!("Error checking changes for table {}: {}", table, e);
} }
} }
} }
@ -128,8 +128,9 @@ impl AutomationService {
) )
.execute(pool) .execute(pool)
.await .await
{ {
eprintln!( error!(
"Failed to update last_triggered for automation {}: {}", "Failed to update last_triggered for automation {}: {}",
automation_id, e automation_id, e
); );
@ -177,20 +178,20 @@ impl AutomationService {
let full_path = Path::new(&self.scripts_dir).join(param); let full_path = Path::new(&self.scripts_dir).join(param);
match tokio::fs::read_to_string(&full_path).await { match tokio::fs::read_to_string(&full_path).await {
Ok(script_content) => { Ok(script_content) => {
println!("Executing action with param: {}", param); info!("Executing action with param: {}", param);
let script_service = ScriptService::new(&self.state.clone()); let script_service = ScriptService::new(&self.state.clone());
match script_service.compile(&script_content) { match script_service.compile(&script_content) {
Ok(ast) => match script_service.run(&ast) { Ok(ast) => match script_service.run(&ast) {
Ok(result) => println!("Script executed successfully: {:?}", result), Ok(result) => info!("Script executed successfully: {:?}", result),
Err(e) => eprintln!("Error executing script: {}", e), Err(e) => error!("Error executing script: {}", e),
}, },
Err(e) => eprintln!("Error compiling script: {}", e), Err(e) => error!("Error compiling script: {}", e),
} }
} }
Err(e) => { Err(e) => {
eprintln!("Failed to execute action {}: {}", full_path.display(), e); error!("Failed to execute action {}: {}", full_path.display(), e);
} }
} }
} }

View file

@ -1,4 +1,6 @@
use crate::services::{config::EmailConfig, state::AppState}; use crate::services::{config::EmailConfig, state::AppState};
use log::info;
use actix_web::error::ErrorInternalServerError; use actix_web::error::ErrorInternalServerError;
use actix_web::http::header::ContentType; use actix_web::http::header::ContentType;
use actix_web::{web, HttpResponse, Result}; use actix_web::{web, HttpResponse, Result};
@ -263,7 +265,7 @@ pub async fn save_email_draft(
Ok(chrono::Utc::now().timestamp().to_string()) Ok(chrono::Utc::now().timestamp().to_string())
} }
pub async fn fetch_latest_email_from_sender( async fn fetch_latest_email_from_sender(
email_config: &EmailConfig, email_config: &EmailConfig,
from_email: &str, from_email: &str,
) -> Result<String, Box<dyn std::error::Error>> { ) -> Result<String, Box<dyn std::error::Error>> {
@ -479,9 +481,9 @@ pub async fn send_email(
) -> Result<HttpResponse, actix_web::Error> { ) -> Result<HttpResponse, actix_web::Error> {
let (to, subject, body) = payload.into_inner(); let (to, subject, body) = payload.into_inner();
println!("To: {}", to); info!("To: {}", to);
println!("Subject: {}", subject); info!("Subject: {}", subject);
println!("Body: {}", body); info!("Body: {}", body);
// Send via SMTP // Send via SMTP
internal_send_email(&state.config.clone().unwrap().email, &to, &subject, &body).await; internal_send_email(&state.config.clone().unwrap().email, &to, &subject, &body).await;

View file

@ -1,9 +1,11 @@
use log::info;
use rhai::Dynamic; use rhai::Dynamic;
use rhai::Engine; use rhai::Engine;
use std::error::Error; use std::error::Error;
use std::fs; use std::fs;
use std::path::{ PathBuf};
use std::io::Read; use std::io::Read;
use std::path::PathBuf;
use crate::services::state::AppState; use crate::services::state::AppState;
use crate::services::utils; use crate::services::utils;
@ -12,10 +14,7 @@ pub fn create_site_keyword(state: &AppState, engine: &mut Engine) {
let state_clone = state.clone(); let state_clone = state.clone();
engine engine
.register_custom_syntax( .register_custom_syntax(
&[ &["CREATE_SITE", "$expr$", ",", "$expr$", ",", "$expr$"],
"CREATE_SITE", "$expr$", ",", "$expr$", ",", "$expr$",
],
true, true,
move |context, inputs| { move |context, inputs| {
if inputs.len() < 3 { if inputs.len() < 3 {
@ -25,9 +24,13 @@ pub fn create_site_keyword(state: &AppState, engine: &mut Engine) {
let alias = context.eval_expression_tree(&inputs[0])?; let alias = context.eval_expression_tree(&inputs[0])?;
let template_dir = context.eval_expression_tree(&inputs[1])?; let template_dir = context.eval_expression_tree(&inputs[1])?;
let prompt = context.eval_expression_tree(&inputs[2])?; let prompt = context.eval_expression_tree(&inputs[2])?;
let config = state_clone.config.as_ref().expect("Config must be initialized").clone(); let config = state_clone
.config
.as_ref()
.expect("Config must be initialized")
.clone();
let fut = create_site(&config, alias, template_dir, prompt); let fut = create_site(&config, alias, template_dir, prompt);
let result = let result =
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(fut)) tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(fut))
@ -55,16 +58,17 @@ async fn create_site(
// Process all HTML files in template directory // Process all HTML files in template directory
let mut combined_content = String::new(); let mut combined_content = String::new();
for entry in fs::read_dir(&template_path).map_err(|e| e.to_string())? { for entry in fs::read_dir(&template_path).map_err(|e| e.to_string())? {
let entry = entry.map_err(|e| e.to_string())?; let entry = entry.map_err(|e| e.to_string())?;
let path = entry.path(); let path = entry.path();
if path.extension().map_or(false, |ext| ext == "html") { if path.extension().map_or(false, |ext| ext == "html") {
let mut file = fs::File::open(&path).map_err(|e| e.to_string())?; let mut file = fs::File::open(&path).map_err(|e| e.to_string())?;
let mut contents = String::new(); let mut contents = String::new();
file.read_to_string(&mut contents).map_err(|e| e.to_string())?; file.read_to_string(&mut contents)
.map_err(|e| e.to_string())?;
combined_content.push_str(&contents); combined_content.push_str(&contents);
combined_content.push_str("\n\n--- TEMPLATE SEPARATOR ---\n\n"); combined_content.push_str("\n\n--- TEMPLATE SEPARATOR ---\n\n");
} }
@ -78,13 +82,13 @@ async fn create_site(
); );
// Call LLM with the combined prompt // Call LLM with the combined prompt
println!("Asking LLM to create site."); info!("Asking LLM to create site.");
let llm_result = utils::call_llm(&full_prompt, &config.ai).await?; let llm_result = utils::call_llm(&full_prompt, &config.ai).await?;
// Write the generated HTML file // Write the generated HTML file
let index_path = alias_path.join("index.html"); let index_path = alias_path.join("index.html");
fs::write(index_path, llm_result).map_err(|e| e.to_string())?; fs::write(index_path, llm_result).map_err(|e| e.to_string())?;
println!("Site created at: {}", alias_path.display()); info!("Site created at: {}", alias_path.display());
Ok(alias_path.to_string_lossy().into_owned()) Ok(alias_path.to_string_lossy().into_owned())
} }

View file

@ -1,14 +1,14 @@
use log::{error, info};
use rhai::Dynamic; use rhai::Dynamic;
use rhai::Engine; use rhai::Engine;
use serde_json::{json, Value}; use serde_json::{json, Value};
use sqlx::{PgPool}; use sqlx::PgPool;
use crate::services::state::AppState; use crate::services::state::AppState;
use crate::services::utils; use crate::services::utils;
use crate::services::utils::row_to_json; use crate::services::utils::row_to_json;
use crate::services::utils::to_array; use crate::services::utils::to_array;
pub fn find_keyword(state: &AppState, engine: &mut Engine) { pub fn find_keyword(state: &AppState, engine: &mut Engine) {
let db = state.db_custom.clone(); let db = state.db_custom.clone();
@ -48,7 +48,7 @@ pub async fn execute_find(
filter_str: &str, filter_str: &str,
) -> Result<Value, String> { ) -> Result<Value, String> {
// Changed to String error like your Actix code // Changed to String error like your Actix code
println!( info!(
"Starting execute_find with table: {}, filter: {}", "Starting execute_find with table: {}, filter: {}",
table_str, filter_str table_str, filter_str
); );
@ -59,7 +59,7 @@ pub async fn execute_find(
"SELECT * FROM {} WHERE {} LIMIT 10", "SELECT * FROM {} WHERE {} LIMIT 10",
table_str, where_clause table_str, where_clause
); );
println!("Executing query: {}", query); info!("Executing query: {}", query);
// Use the same simple pattern as your Actix code - no timeout wrapper // Use the same simple pattern as your Actix code - no timeout wrapper
let rows = sqlx::query(&query) let rows = sqlx::query(&query)
@ -67,11 +67,11 @@ pub async fn execute_find(
.fetch_all(pool) .fetch_all(pool)
.await .await
.map_err(|e| { .map_err(|e| {
eprintln!("SQL execution error: {}", e); error!("SQL execution error: {}", e);
e.to_string() e.to_string()
})?; })?;
println!("Query successful, got {} rows", rows.len()); info!("Query successful, got {} rows", rows.len());
let mut results = Vec::new(); let mut results = Vec::new();
for row in rows { for row in rows {
@ -85,4 +85,3 @@ pub async fn execute_find(
"results": results "results": results
})) }))
} }

View file

@ -1,9 +1,9 @@
use crate::services::state::AppState;
use log::info;
use rhai::Dynamic; use rhai::Dynamic;
use rhai::Engine; use rhai::Engine;
use crate::services::state::AppState;
pub fn for_keyword(_state: &AppState, engine: &mut Engine) { pub fn for_keyword(_state: &AppState, engine: &mut Engine) {
engine engine
.register_custom_syntax(&["EXIT", "FOR"], false, |_context, _inputs| { .register_custom_syntax(&["EXIT", "FOR"], false, |_context, _inputs| {
Err("EXIT FOR".into()) Err("EXIT FOR".into())
@ -34,7 +34,7 @@ pub fn for_keyword(_state: &AppState, engine: &mut Engine) {
let collection = context.eval_expression_tree(&inputs[1])?; let collection = context.eval_expression_tree(&inputs[1])?;
// Debug: Print the collection type // Debug: Print the collection type
println!("Collection type: {}", collection.type_name()); info!("Collection type: {}", collection.type_name());
let ccc = collection.clone(); let ccc = collection.clone();
// Convert to array - with proper error handling // Convert to array - with proper error handling
let array = match collection.into_array() { let array = match collection.into_array() {

View file

@ -1,3 +1,5 @@
use log::info;
use crate::services::state::AppState; use crate::services::state::AppState;
use reqwest::{self, Client}; use reqwest::{self, Client};
use rhai::{Dynamic, Engine}; use rhai::{Dynamic, Engine};
@ -31,7 +33,7 @@ pub fn get_keyword(_state: &AppState, engine: &mut Engine) {
}; };
if modified_url.starts_with("https://") { if modified_url.starts_with("https://") {
println!("HTTPS GET request: {}", modified_url); info!("HTTPS GET request: {}", modified_url);
let fut = execute_get(&modified_url); let fut = execute_get(&modified_url);
let result = let result =
@ -57,7 +59,7 @@ pub fn get_keyword(_state: &AppState, engine: &mut Engine) {
} }
pub async fn execute_get(url: &str) -> Result<String, Box<dyn Error + Send + Sync>> { pub async fn execute_get(url: &str) -> Result<String, Box<dyn Error + Send + Sync>> {
println!("Starting execute_get with URL: {}", url); info!("Starting execute_get with URL: {}", url);
// Create a client that ignores invalid certificates // Create a client that ignores invalid certificates
let client = Client::builder() let client = Client::builder()

View file

@ -1,4 +1,5 @@
use crate::services::{state::AppState, web_automation::BrowserPool}; use crate::services::{state::AppState, web_automation::BrowserPool};
use log::info;
use rhai::{Dynamic, Engine}; use rhai::{Dynamic, Engine};
use std::error::Error; use std::error::Error;
use std::sync::Arc; use std::sync::Arc;
@ -15,18 +16,11 @@ pub fn get_website_keyword(state: &AppState, engine: &mut Engine) {
false, false,
move |context, inputs| { move |context, inputs| {
let search_term = context.eval_expression_tree(&inputs[0])?.to_string(); let search_term = context.eval_expression_tree(&inputs[0])?.to_string();
println!( info!("GET WEBSITE executed - Search: '{}'", search_term);
"GET WEBSITE executed - Search: '{}'",
search_term
);
let browser_pool_clone = browser_pool.clone(); let browser_pool_clone = browser_pool.clone();
let fut = execute_headless_browser_search( let fut = execute_headless_browser_search(browser_pool_clone, &search_term);
browser_pool_clone,
&search_term
);
let result = let result =
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(fut)) tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(fut))
@ -40,26 +34,22 @@ pub fn get_website_keyword(state: &AppState, engine: &mut Engine) {
pub async fn execute_headless_browser_search( pub async fn execute_headless_browser_search(
browser_pool: Arc<BrowserPool>, // Adjust path as needed browser_pool: Arc<BrowserPool>, // Adjust path as needed
search_term: &str) -> Result<String, Box<dyn Error + Send + Sync>> { search_term: &str,
println!( ) -> Result<String, Box<dyn Error + Send + Sync>> {
"Starting headless browser search: '{}' ", info!("Starting headless browser search: '{}' ", search_term);
search_term
);
let search_term = search_term.to_string(); let search_term = search_term.to_string();
let result = browser_pool let result = browser_pool
.with_browser(|driver| { .with_browser(|driver| Box::pin(async move { perform_search(driver, &search_term).await }))
Box::pin(async move { perform_search(driver, &search_term).await })
})
.await?; .await?;
Ok(result) Ok(result)
} }
async fn perform_search( async fn perform_search(
driver: WebDriver, driver: WebDriver,
search_term: &str) -> Result<String, Box<dyn Error + Send + Sync>> { search_term: &str,
) -> Result<String, Box<dyn Error + Send + Sync>> {
// Navigate to DuckDuckGo // Navigate to DuckDuckGo
driver.goto("https://duckduckgo.com").await?; driver.goto("https://duckduckgo.com").await?;
@ -96,13 +86,13 @@ async fn extract_search_results(
// Modern DuckDuckGo (as seen in the HTML) // Modern DuckDuckGo (as seen in the HTML)
"a[data-testid='result-title-a']", // Primary result links "a[data-testid='result-title-a']", // Primary result links
"a[data-testid='result-extras-url-link']", // URL links in results "a[data-testid='result-extras-url-link']", // URL links in results
"a.eVNpHGjtxRBq_gLOfGDr", // Class-based selector for result titles "a.eVNpHGjtxRBq_gLOfGDr", // Class-based selector for result titles
"a.Rn_JXVtoPVAFyGkcaXyK", // Class-based selector for URL links "a.Rn_JXVtoPVAFyGkcaXyK", // Class-based selector for URL links
".ikg2IXiCD14iVX7AdZo1 a", // Heading container links ".ikg2IXiCD14iVX7AdZo1 a", // Heading container links
".OQ_6vPwNhCeusNiEDcGp a", // URL container links ".OQ_6vPwNhCeusNiEDcGp a", // URL container links
// Fallback selectors // Fallback selectors
".result__a", // Classic DuckDuckGo ".result__a", // Classic DuckDuckGo
"a.result-link", // Alternative "a.result-link", // Alternative
".result a[href]", // Generic result links ".result a[href]", // Generic result links
]; ];
@ -111,11 +101,11 @@ async fn extract_search_results(
for element in elements { for element in elements {
if let Ok(Some(href)) = element.attr("href").await { if let Ok(Some(href)) = element.attr("href").await {
// Filter out internal and non-http links // Filter out internal and non-http links
if href.starts_with("http") if href.starts_with("http")
&& !href.contains("duckduckgo.com") && !href.contains("duckduckgo.com")
&& !href.contains("duck.co") && !href.contains("duck.co")
&& !results.contains(&href) { && !results.contains(&href)
{
// Get the display URL for verification // Get the display URL for verification
let display_url = if let Ok(text) = element.text().await { let display_url = if let Ok(text) = element.text().await {
text.trim().to_string() text.trim().to_string()
@ -140,4 +130,4 @@ async fn extract_search_results(
results.dedup(); results.dedup();
Ok(results) Ok(results)
} }

View file

@ -1,28 +1,30 @@
use rhai::{Dynamic, Engine}; use log::info;
use crate::services::{state::AppState, utils::call_llm}; use crate::services::{state::AppState, utils::call_llm};
use rhai::{Dynamic, Engine};
pub fn llm_keyword(state: &AppState, engine: &mut Engine) { pub fn llm_keyword(state: &AppState, engine: &mut Engine) {
let ai_config = state.config.clone().unwrap().ai.clone(); let ai_config = state.config.clone().unwrap().ai.clone();
engine.register_custom_syntax( engine
&["LLM", "$expr$"], // Syntax: LLM "text to process" .register_custom_syntax(
false, // Expression, not statement &["LLM", "$expr$"], // Syntax: LLM "text to process"
move |context, inputs| { false, // Expression, not statement
let text = context.eval_expression_tree(&inputs[0])?; move |context, inputs| {
let text_str = text.to_string(); let text = context.eval_expression_tree(&inputs[0])?;
let text_str = text.to_string();
println!("LLM processing text: {}", text_str); info!("LLM processing text: {}", text_str);
// Use the same pattern as GET
let fut = call_llm( // Use the same pattern as GET
&text_str, &ai_config);
let result = tokio::task::block_in_place(|| { let fut = call_llm(&text_str, &ai_config);
tokio::runtime::Handle::current().block_on(fut) let result =
}).map_err(|e| format!("LLM call failed: {}", e))?; tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(fut))
.map_err(|e| format!("LLM call failed: {}", e))?;
Ok(Dynamic::from(result))
} Ok(Dynamic::from(result))
).unwrap(); },
)
.unwrap();
} }

View file

@ -1,3 +1,4 @@
use log::{error, info};
use rhai::Dynamic; use rhai::Dynamic;
use rhai::Engine; use rhai::Engine;
use serde_json::{json, Value}; use serde_json::{json, Value};
@ -54,7 +55,7 @@ pub async fn execute_on_trigger(
table: &str, table: &str,
script_name: &str, script_name: &str,
) -> Result<Value, String> { ) -> Result<Value, String> {
println!( info!(
"Starting execute_on_trigger with kind: {:?}, table: {}, script_name: {}", "Starting execute_on_trigger with kind: {:?}, table: {}, script_name: {}",
kind, table, script_name kind, table, script_name
); );
@ -71,7 +72,7 @@ pub async fn execute_on_trigger(
.execute(pool) .execute(pool)
.await .await
.map_err(|e| { .map_err(|e| {
eprintln!("SQL execution error: {}", e); error!("SQL execution error: {}", e);
e.to_string() e.to_string()
})?; })?;

View file

@ -1,10 +1,10 @@
use log::info;
use rhai::Dynamic; use rhai::Dynamic;
use rhai::Engine; use rhai::Engine;
use crate::services::state::AppState; use crate::services::state::AppState;
pub fn print_keyword(_state: &AppState, engine: &mut Engine) { pub fn print_keyword(_state: &AppState, engine: &mut Engine) {
// PRINT command // PRINT command
engine engine
.register_custom_syntax( .register_custom_syntax(
@ -12,7 +12,7 @@ pub fn print_keyword(_state: &AppState, engine: &mut Engine) {
true, // Statement true, // Statement
|context, inputs| { |context, inputs| {
let value = context.eval_expression_tree(&inputs[0])?; let value = context.eval_expression_tree(&inputs[0])?;
println!("{}", value); info!("{}", value);
Ok(Dynamic::UNIT) Ok(Dynamic::UNIT)
}, },
) )

View file

@ -49,7 +49,7 @@ pub async fn execute_{keyword_name}(
pool: &PgPool, pool: &PgPool,
{params_with_types} {params_with_types}
) -> Result<Value, Box<dyn std::error::Error>> { ) -> Result<Value, Box<dyn std::error::Error>> {
println!("Executing {keyword_name} with: {debug_params}"); info!("Executing {keyword_name} with: {debug_params}");
let result = sqlx::query( let result = sqlx::query(
"{sql_query_with_i32_enum}" "{sql_query_with_i32_enum}"
@ -113,7 +113,7 @@ pub async fn execute_set_schedule(
cron: &str, cron: &str,
script_name: &str, script_name: &str,
) -> Result<Value, Box<dyn std::error::Error>> { ) -> Result<Value, Box<dyn std::error::Error>> {
println!("Executing schedule: {}, {}", cron, script_name); info!("Executing schedule: {}, {}", cron, script_name);
let result = sqlx::query( let result = sqlx::query(
"INSERT INTO system_automations "INSERT INTO system_automations

View file

@ -1,3 +1,4 @@
use log::{error, info};
use rhai::Dynamic; use rhai::Dynamic;
use rhai::Engine; use rhai::Engine;
use serde_json::{json, Value}; use serde_json::{json, Value};
@ -47,7 +48,7 @@ pub async fn execute_set(
filter_str: &str, filter_str: &str,
updates_str: &str, updates_str: &str,
) -> Result<Value, String> { ) -> Result<Value, String> {
println!( info!(
"Starting execute_set with table: {}, filter: {}, updates: {}", "Starting execute_set with table: {}, filter: {}, updates: {}",
table_str, filter_str, updates_str table_str, filter_str, updates_str
); );
@ -65,7 +66,7 @@ pub async fn execute_set(
"UPDATE {} SET {} WHERE {}", "UPDATE {} SET {} WHERE {}",
table_str, set_clause, where_clause table_str, set_clause, where_clause
); );
println!("Executing query: {}", query); info!("Executing query: {}", query);
// Build query with proper parameter binding // Build query with proper parameter binding
let mut query = sqlx::query(&query); let mut query = sqlx::query(&query);
@ -81,7 +82,7 @@ pub async fn execute_set(
} }
let result = query.execute(pool).await.map_err(|e| { let result = query.execute(pool).await.map_err(|e| {
eprintln!("SQL execution error: {}", e); error!("SQL execution error: {}", e);
e.to_string() e.to_string()
})?; })?;

View file

@ -1,3 +1,4 @@
use log::info;
use rhai::Dynamic; use rhai::Dynamic;
use rhai::Engine; use rhai::Engine;
use serde_json::{json, Value}; use serde_json::{json, Value};
@ -39,7 +40,7 @@ pub async fn execute_set_schedule(
cron: &str, cron: &str,
script_name: &str, script_name: &str,
) -> Result<Value, Box<dyn std::error::Error>> { ) -> Result<Value, Box<dyn std::error::Error>> {
println!( info!(
"Starting execute_set_schedule with cron: {}, script_name: {}", "Starting execute_set_schedule with cron: {}, script_name: {}",
cron, script_name cron, script_name
); );

View file

@ -1,39 +1,46 @@
use rhai::{Dynamic, Engine};
use crate::services::state::AppState; use crate::services::state::AppState;
use log::info;
use rhai::{Dynamic, Engine};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
pub fn wait_keyword(_state: &AppState, engine: &mut Engine) { pub fn wait_keyword(_state: &AppState, engine: &mut Engine) {
engine.register_custom_syntax( engine
&["WAIT", "$expr$"], .register_custom_syntax(
false, // Expression, not statement &["WAIT", "$expr$"],
move |context, inputs| { false, // Expression, not statement
let seconds = context.eval_expression_tree(&inputs[0])?; move |context, inputs| {
let seconds = context.eval_expression_tree(&inputs[0])?;
// Convert to number (handle both int and float)
let duration_secs = if seconds.is::<i64>() { // Convert to number (handle both int and float)
seconds.cast::<i64>() as f64 let duration_secs = if seconds.is::<i64>() {
} else if seconds.is::<f64>() { seconds.cast::<i64>() as f64
seconds.cast::<f64>() } else if seconds.is::<f64>() {
} else { seconds.cast::<f64>()
return Err(format!("WAIT expects a number, got: {}", seconds).into()); } else {
}; return Err(format!("WAIT expects a number, got: {}", seconds).into());
};
if duration_secs < 0.0 {
return Err("WAIT duration cannot be negative".into()); if duration_secs < 0.0 {
} return Err("WAIT duration cannot be negative".into());
}
// Cap maximum wait time to prevent abuse (e.g., 5 minutes max)
let capped_duration = if duration_secs > 300.0 { 300.0 } else { duration_secs }; // Cap maximum wait time to prevent abuse (e.g., 5 minutes max)
let capped_duration = if duration_secs > 300.0 {
println!("WAIT {} seconds (thread sleep)", capped_duration); 300.0
} else {
// Use thread::sleep to block only the current thread, not the entire server duration_secs
let duration = Duration::from_secs_f64(capped_duration); };
thread::sleep(duration);
info!("WAIT {} seconds (thread sleep)", capped_duration);
println!("WAIT completed after {} seconds", capped_duration);
Ok(Dynamic::from(format!("Waited {} seconds", capped_duration))) // Use thread::sleep to block only the current thread, not the entire server
} let duration = Duration::from_secs_f64(capped_duration);
).unwrap(); thread::sleep(duration);
}
info!("WAIT completed after {} seconds", capped_duration);
Ok(Dynamic::from(format!("Waited {} seconds", capped_duration)))
},
)
.unwrap();
}

View file

@ -49,18 +49,18 @@ pub async fn chat(
) -> Result<HttpResponse> { ) -> Result<HttpResponse> {
let azure_config = from_config(&state.config.clone().unwrap().ai); let azure_config = from_config(&state.config.clone().unwrap().ai);
let open_ai = OpenAI::new(azure_config); let open_ai = OpenAI::new(azure_config);
// Define available tools based on context // Define available tools based on context
let tools = get_available_tools(&request.context); let tools = get_available_tools(&request.context);
// Build the prompt with context and available tools // Build the prompt with context and available tools
let system_prompt = build_system_prompt(&request.context, &tools); let system_prompt = build_system_prompt(&request.context, &tools);
let user_message = format!("{}\n\nUser input: {}", system_prompt, request.input); let user_message = format!("{}\n\nUser input: {}", system_prompt, request.input);
let response = match open_ai.invoke(&user_message).await { let response = match open_ai.invoke(&user_message).await {
Ok(res) => res, Ok(res) => res,
Err(err) => { Err(err) => {
eprintln!("Error invoking API: {}", err); error!("Error invoking API: {}", err);
return Err(actix_web::error::ErrorInternalServerError( return Err(actix_web::error::ErrorInternalServerError(
"Failed to invoke OpenAI API", "Failed to invoke OpenAI API",
)); ));
@ -80,7 +80,7 @@ pub async fn chat(
fn get_available_tools(context: &Option<AppContext>) -> Vec<ToolDefinition> { fn get_available_tools(context: &Option<AppContext>) -> Vec<ToolDefinition> {
let mut tools = Vec::new(); let mut tools = Vec::new();
if let Some(ctx) = context { if let Some(ctx) = context {
if let Some(view_type) = &ctx.view_type { if let Some(view_type) = &ctx.view_type {
match view_type.as_str() { match view_type.as_str() {
@ -99,7 +99,7 @@ fn get_available_tools(context: &Option<AppContext>) -> Vec<ToolDefinition> {
"required": ["content"] "required": ["content"]
}), }),
}); });
tools.push(ToolDefinition { tools.push(ToolDefinition {
name: "forwardEmail".to_string(), name: "forwardEmail".to_string(),
description: "Forward the current email to specified recipients".to_string(), description: "Forward the current email to specified recipients".to_string(),
@ -124,13 +124,13 @@ fn get_available_tools(context: &Option<AppContext>) -> Vec<ToolDefinition> {
} }
} }
} }
tools tools
} }
fn build_system_prompt(context: &Option<AppContext>, tools: &[ToolDefinition]) -> String { fn build_system_prompt(context: &Option<AppContext>, tools: &[ToolDefinition]) -> String {
let mut prompt = String::new(); let mut prompt = String::new();
if let Some(ctx) = context { if let Some(ctx) = context {
if let Some(view_type) = &ctx.view_type { if let Some(view_type) = &ctx.view_type {
match view_type.as_str() { match view_type.as_str() {
@ -143,11 +143,11 @@ fn build_system_prompt(context: &Option<AppContext>, tools: &[ToolDefinition]) -
Labels: {:?}\n\n", Labels: {:?}\n\n",
email_ctx.subject, email_ctx.id, email_ctx.labels email_ctx.subject, email_ctx.id, email_ctx.labels
)); ));
if let Some(from) = &email_ctx.from { if let Some(from) = &email_ctx.from {
prompt.push_str(&format!("From: {}\n", from)); prompt.push_str(&format!("From: {}\n", from));
} }
if let Some(body) = &email_ctx.body { if let Some(body) = &email_ctx.body {
prompt.push_str(&format!("Body: {}\n", body)); prompt.push_str(&format!("Body: {}\n", body));
} }
@ -157,7 +157,7 @@ fn build_system_prompt(context: &Option<AppContext>, tools: &[ToolDefinition]) -
} }
} }
} }
if !tools.is_empty() { if !tools.is_empty() {
prompt.push_str("\nAvailable tools:\n"); prompt.push_str("\nAvailable tools:\n");
for tool in tools { for tool in tools {
@ -166,7 +166,7 @@ fn build_system_prompt(context: &Option<AppContext>, tools: &[ToolDefinition]) -
tool.name, tool.description, tool.parameters tool.name, tool.description, tool.parameters
)); ));
} }
prompt.push_str( prompt.push_str(
"If you need to use a tool, respond with:\n\ "If you need to use a tool, respond with:\n\
TOOL_CALL: tool_name\n\ TOOL_CALL: tool_name\n\
@ -175,7 +175,7 @@ fn build_system_prompt(context: &Option<AppContext>, tools: &[ToolDefinition]) -
Otherwise, just provide a normal response.\n" Otherwise, just provide a normal response.\n"
); );
} }
prompt prompt
} }
@ -183,19 +183,19 @@ fn parse_tool_calls(response: &str) -> Option<Vec<ToolCall>> {
if !response.contains("TOOL_CALL:") { if !response.contains("TOOL_CALL:") {
return None; return None;
} }
let mut tool_calls = Vec::new(); let mut tool_calls = Vec::new();
let lines: Vec<&str> = response.lines().collect(); let lines: Vec<&str> = response.lines().collect();
let mut i = 0; let mut i = 0;
while i < lines.len() { while i < lines.len() {
if lines[i].starts_with("TOOL_CALL:") { if lines[i].starts_with("TOOL_CALL:") {
let tool_name = lines[i].replace("TOOL_CALL:", "").trim().to_string(); let tool_name = lines[i].replace("TOOL_CALL:", "").trim().to_string();
// Look for parameters in the next line // Look for parameters in the next line
if i + 1 < lines.len() && lines[i + 1].starts_with("PARAMETERS:") { if i + 1 < lines.len() && lines[i + 1].starts_with("PARAMETERS:") {
let params_str = lines[i + 1].replace("PARAMETERS:", "").trim(); let params_str = lines[i + 1].replace("PARAMETERS:", "").trim();
if let Ok(parameters) = serde_json::from_str::<serde_json::Value>(params_str) { if let Ok(parameters) = serde_json::from_str::<serde_json::Value>(params_str) {
tool_calls.push(ToolCall { tool_calls.push(ToolCall {
tool_name, tool_name,
@ -206,10 +206,10 @@ fn parse_tool_calls(response: &str) -> Option<Vec<ToolCall>> {
} }
i += 1; i += 1;
} }
if tool_calls.is_empty() { if tool_calls.is_empty() {
None None
} else { } else {
Some(tool_calls) Some(tool_calls)
} }
} }

View file

@ -1,3 +1,5 @@
use log::error;
use actix_web::{ use actix_web::{
web::{self, Bytes}, web::{self, Bytes},
HttpResponse, Responder, HttpResponse, Responder,
@ -8,7 +10,7 @@ use langchain_rust::{
chain::{Chain, LLMChainBuilder}, chain::{Chain, LLMChainBuilder},
fmt_message, fmt_template, fmt_message, fmt_template,
language_models::llm::LLM, language_models::llm::LLM,
llm::{openai::OpenAI}, llm::openai::OpenAI,
message_formatter, message_formatter,
prompt::HumanMessagePromptTemplate, prompt::HumanMessagePromptTemplate,
prompt_args, prompt_args,
@ -16,7 +18,7 @@ use langchain_rust::{
template_fstring, template_fstring,
}; };
use crate::services::{ state::AppState, utils::azure_from_config}; use crate::services::{state::AppState, utils::azure_from_config};
#[derive(serde::Deserialize)] #[derive(serde::Deserialize)]
struct ChatRequest { struct ChatRequest {
@ -71,7 +73,7 @@ pub async fn chat(
let response_text = match open_ai.invoke(&prompt).await { let response_text = match open_ai.invoke(&prompt).await {
Ok(res) => res, Ok(res) => res,
Err(err) => { Err(err) => {
eprintln!("Error invoking API: {}", err); error!("Error invoking API: {}", err);
return Err(actix_web::error::ErrorInternalServerError( return Err(actix_web::error::ErrorInternalServerError(
"Failed to invoke OpenAI API", "Failed to invoke OpenAI API",
)); ));

View file

@ -1,10 +1,9 @@
use actix_web::{post, web, HttpRequest, HttpResponse, Result}; use actix_web::{post, web, HttpRequest, HttpResponse, Result};
use dotenv::dotenv; use dotenv::dotenv;
use log::{error, info};
use reqwest::Client; use reqwest::Client;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::env; use std::env;
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
// OpenAI-compatible request/response structures // OpenAI-compatible request/response structures
@ -59,7 +58,7 @@ pub async fn ensure_llama_servers_running() -> Result<(), Box<dyn std::error::Er
let llm_local = env::var("LLM_LOCAL").unwrap_or_else(|_| "false".to_string()); let llm_local = env::var("LLM_LOCAL").unwrap_or_else(|_| "false".to_string());
if llm_local.to_lowercase() != "true" { if llm_local.to_lowercase() != "true" {
println!(" LLM_LOCAL is not enabled, skipping local server startup"); info!(" LLM_LOCAL is not enabled, skipping local server startup");
return Ok(()); return Ok(());
} }
@ -71,19 +70,19 @@ pub async fn ensure_llama_servers_running() -> Result<(), Box<dyn std::error::Er
let llm_model_path = env::var("LLM_MODEL_PATH").unwrap_or_else(|_| "".to_string()); let llm_model_path = env::var("LLM_MODEL_PATH").unwrap_or_else(|_| "".to_string());
let embedding_model_path = env::var("EMBEDDING_MODEL_PATH").unwrap_or_else(|_| "".to_string()); let embedding_model_path = env::var("EMBEDDING_MODEL_PATH").unwrap_or_else(|_| "".to_string());
println!("🚀 Starting local llama.cpp servers..."); info!("🚀 Starting local llama.cpp servers...");
println!("📋 Configuration:"); info!("📋 Configuration:");
println!(" LLM URL: {}", llm_url); info!(" LLM URL: {}", llm_url);
println!(" Embedding URL: {}", embedding_url); info!(" Embedding URL: {}", embedding_url);
println!(" LLM Model: {}", llm_model_path); info!(" LLM Model: {}", llm_model_path);
println!(" Embedding Model: {}", embedding_model_path); info!(" Embedding Model: {}", embedding_model_path);
// Check if servers are already running // Check if servers are already running
let llm_running = is_server_running(&llm_url).await; let llm_running = is_server_running(&llm_url).await;
let embedding_running = is_server_running(&embedding_url).await; let embedding_running = is_server_running(&embedding_url).await;
if llm_running && embedding_running { if llm_running && embedding_running {
println!("✅ Both LLM and Embedding servers are already running"); info!("✅ Both LLM and Embedding servers are already running");
return Ok(()); return Ok(());
} }
@ -91,25 +90,25 @@ pub async fn ensure_llama_servers_running() -> Result<(), Box<dyn std::error::Er
let mut tasks = vec![]; let mut tasks = vec![];
if !llm_running && !llm_model_path.is_empty() { if !llm_running && !llm_model_path.is_empty() {
println!("🔄 Starting LLM server..."); info!("🔄 Starting LLM server...");
tasks.push(tokio::spawn(start_llm_server( tasks.push(tokio::spawn(start_llm_server(
llama_cpp_path.clone(), llama_cpp_path.clone(),
llm_model_path.clone(), llm_model_path.clone(),
llm_url.clone(), llm_url.clone(),
))); )));
} else if llm_model_path.is_empty() { } else if llm_model_path.is_empty() {
println!("⚠️ LLM_MODEL_PATH not set, skipping LLM server"); info!("⚠️ LLM_MODEL_PATH not set, skipping LLM server");
} }
if !embedding_running && !embedding_model_path.is_empty() { if !embedding_running && !embedding_model_path.is_empty() {
println!("🔄 Starting Embedding server..."); info!("🔄 Starting Embedding server...");
tasks.push(tokio::spawn(start_embedding_server( tasks.push(tokio::spawn(start_embedding_server(
llama_cpp_path.clone(), llama_cpp_path.clone(),
embedding_model_path.clone(), embedding_model_path.clone(),
embedding_url.clone(), embedding_url.clone(),
))); )));
} else if embedding_model_path.is_empty() { } else if embedding_model_path.is_empty() {
println!("⚠️ EMBEDDING_MODEL_PATH not set, skipping Embedding server"); info!("⚠️ EMBEDDING_MODEL_PATH not set, skipping Embedding server");
} }
// Wait for all server startup tasks // Wait for all server startup tasks
@ -118,7 +117,7 @@ pub async fn ensure_llama_servers_running() -> Result<(), Box<dyn std::error::Er
} }
// Wait for servers to be ready with verbose logging // Wait for servers to be ready with verbose logging
println!("⏳ Waiting for servers to become ready..."); info!("⏳ Waiting for servers to become ready...");
let mut llm_ready = llm_running || llm_model_path.is_empty(); let mut llm_ready = llm_running || llm_model_path.is_empty();
let mut embedding_ready = embedding_running || embedding_model_path.is_empty(); let mut embedding_ready = embedding_running || embedding_model_path.is_empty();
@ -129,7 +128,7 @@ pub async fn ensure_llama_servers_running() -> Result<(), Box<dyn std::error::Er
while attempts < max_attempts && (!llm_ready || !embedding_ready) { while attempts < max_attempts && (!llm_ready || !embedding_ready) {
sleep(Duration::from_secs(2)).await; sleep(Duration::from_secs(2)).await;
println!( info!(
"🔍 Checking server health (attempt {}/{})...", "🔍 Checking server health (attempt {}/{})...",
attempts + 1, attempts + 1,
max_attempts max_attempts
@ -137,26 +136,26 @@ pub async fn ensure_llama_servers_running() -> Result<(), Box<dyn std::error::Er
if !llm_ready && !llm_model_path.is_empty() { if !llm_ready && !llm_model_path.is_empty() {
if is_server_running(&llm_url).await { if is_server_running(&llm_url).await {
println!(" ✅ LLM server ready at {}", llm_url); info!(" ✅ LLM server ready at {}", llm_url);
llm_ready = true; llm_ready = true;
} else { } else {
println!(" ❌ LLM server not ready yet"); info!(" ❌ LLM server not ready yet");
} }
} }
if !embedding_ready && !embedding_model_path.is_empty() { if !embedding_ready && !embedding_model_path.is_empty() {
if is_server_running(&embedding_url).await { if is_server_running(&embedding_url).await {
println!(" ✅ Embedding server ready at {}", embedding_url); info!(" ✅ Embedding server ready at {}", embedding_url);
embedding_ready = true; embedding_ready = true;
} else { } else {
println!(" ❌ Embedding server not ready yet"); info!(" ❌ Embedding server not ready yet");
} }
} }
attempts += 1; attempts += 1;
if attempts % 10 == 0 { if attempts % 10 == 0 {
println!( info!(
"⏰ Still waiting for servers... (attempt {}/{})", "⏰ Still waiting for servers... (attempt {}/{})",
attempts, max_attempts attempts, max_attempts
); );
@ -164,7 +163,7 @@ pub async fn ensure_llama_servers_running() -> Result<(), Box<dyn std::error::Er
} }
if llm_ready && embedding_ready { if llm_ready && embedding_ready {
println!("🎉 All llama.cpp servers are ready and responding!"); info!("🎉 All llama.cpp servers are ready and responding!");
Ok(()) Ok(())
} else { } else {
let mut error_msg = "❌ Servers failed to start within timeout:".to_string(); let mut error_msg = "❌ Servers failed to start within timeout:".to_string();
@ -279,7 +278,7 @@ pub async fn chat_completions_local(
.timeout(Duration::from_secs(120)) // 2 minute timeout .timeout(Duration::from_secs(120)) // 2 minute timeout
.build() .build()
.map_err(|e| { .map_err(|e| {
eprintln!("Error creating HTTP client: {}", e); error!("Error creating HTTP client: {}", e);
actix_web::error::ErrorInternalServerError("Failed to create HTTP client") actix_web::error::ErrorInternalServerError("Failed to create HTTP client")
})?; })?;
@ -290,7 +289,7 @@ pub async fn chat_completions_local(
.send() .send()
.await .await
.map_err(|e| { .map_err(|e| {
eprintln!("Error calling llama.cpp server: {}", e); error!("Error calling llama.cpp server: {}", e);
actix_web::error::ErrorInternalServerError("Failed to call llama.cpp server") actix_web::error::ErrorInternalServerError("Failed to call llama.cpp server")
})?; })?;
@ -298,7 +297,7 @@ pub async fn chat_completions_local(
if status.is_success() { if status.is_success() {
let llama_response: LlamaCppResponse = response.json().await.map_err(|e| { let llama_response: LlamaCppResponse = response.json().await.map_err(|e| {
eprintln!("Error parsing llama.cpp response: {}", e); error!("Error parsing llama.cpp response: {}", e);
actix_web::error::ErrorInternalServerError("Failed to parse llama.cpp response") actix_web::error::ErrorInternalServerError("Failed to parse llama.cpp response")
})?; })?;
@ -331,7 +330,7 @@ pub async fn chat_completions_local(
.await .await
.unwrap_or_else(|_| "Unknown error".to_string()); .unwrap_or_else(|_| "Unknown error".to_string());
eprintln!("Llama.cpp server error ({}): {}", status, error_text); error!("Llama.cpp server error ({}): {}", status, error_text);
let actix_status = actix_web::http::StatusCode::from_u16(status.as_u16()) let actix_status = actix_web::http::StatusCode::from_u16(status.as_u16())
.unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR); .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR);
@ -352,7 +351,7 @@ pub struct EmbeddingRequest {
pub input: Vec<String>, pub input: Vec<String>,
pub model: String, pub model: String,
#[serde(default)] #[serde(default)]
pub encoding_format: Option<String>, pub _encoding_format: Option<String>,
} }
// Custom deserializer to handle both string and array inputs // Custom deserializer to handle both string and array inputs
@ -432,7 +431,7 @@ struct LlamaCppEmbeddingRequest {
// FIXED: Handle the stupid nested array format // FIXED: Handle the stupid nested array format
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
struct LlamaCppEmbeddingResponseItem { struct LlamaCppEmbeddingResponseItem {
pub index: usize, pub _index: usize,
pub embedding: Vec<Vec<f32>>, // This is the fucked up part - embedding is an array of arrays pub embedding: Vec<Vec<f32>>, // This is the fucked up part - embedding is an array of arrays
} }
@ -452,7 +451,7 @@ pub async fn embeddings_local(
.timeout(Duration::from_secs(120)) .timeout(Duration::from_secs(120))
.build() .build()
.map_err(|e| { .map_err(|e| {
eprintln!("Error creating HTTP client: {}", e); error!("Error creating HTTP client: {}", e);
actix_web::error::ErrorInternalServerError("Failed to create HTTP client") actix_web::error::ErrorInternalServerError("Failed to create HTTP client")
})?; })?;
@ -472,7 +471,7 @@ pub async fn embeddings_local(
.send() .send()
.await .await
.map_err(|e| { .map_err(|e| {
eprintln!("Error calling llama.cpp server for embedding: {}", e); error!("Error calling llama.cpp server for embedding: {}", e);
actix_web::error::ErrorInternalServerError( actix_web::error::ErrorInternalServerError(
"Failed to call llama.cpp server for embedding", "Failed to call llama.cpp server for embedding",
) )
@ -483,15 +482,15 @@ pub async fn embeddings_local(
if status.is_success() { if status.is_success() {
// First, get the raw response text for debugging // First, get the raw response text for debugging
let raw_response = response.text().await.map_err(|e| { let raw_response = response.text().await.map_err(|e| {
eprintln!("Error reading response text: {}", e); error!("Error reading response text: {}", e);
actix_web::error::ErrorInternalServerError("Failed to read response") actix_web::error::ErrorInternalServerError("Failed to read response")
})?; })?;
// Parse the response as a vector of items with nested arrays // Parse the response as a vector of items with nested arrays
let llama_response: Vec<LlamaCppEmbeddingResponseItem> = let llama_response: Vec<LlamaCppEmbeddingResponseItem> =
serde_json::from_str(&raw_response).map_err(|e| { serde_json::from_str(&raw_response).map_err(|e| {
eprintln!("Error parsing llama.cpp embedding response: {}", e); error!("Error parsing llama.cpp embedding response: {}", e);
eprintln!("Raw response: {}", raw_response); error!("Raw response: {}", raw_response);
actix_web::error::ErrorInternalServerError( actix_web::error::ErrorInternalServerError(
"Failed to parse llama.cpp embedding response", "Failed to parse llama.cpp embedding response",
) )
@ -517,7 +516,7 @@ pub async fn embeddings_local(
index, index,
}); });
} else { } else {
eprintln!("No embedding data returned for input: {}", input_text); error!("No embedding data returned for input: {}", input_text);
return Ok(HttpResponse::InternalServerError().json(serde_json::json!({ return Ok(HttpResponse::InternalServerError().json(serde_json::json!({
"error": { "error": {
"message": format!("No embedding data returned for input {}", index), "message": format!("No embedding data returned for input {}", index),
@ -531,7 +530,7 @@ pub async fn embeddings_local(
.await .await
.unwrap_or_else(|_| "Unknown error".to_string()); .unwrap_or_else(|_| "Unknown error".to_string());
eprintln!("Llama.cpp server error ({}): {}", status, error_text); error!("Llama.cpp server error ({}): {}", status, error_text);
let actix_status = actix_web::http::StatusCode::from_u16(status.as_u16()) let actix_status = actix_web::http::StatusCode::from_u16(status.as_u16())
.unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR); .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR);

View file

@ -1,3 +1,5 @@
use log::info;
use actix_web::{post, web, HttpRequest, HttpResponse, Result}; use actix_web::{post, web, HttpRequest, HttpResponse, Result};
use dotenv::dotenv; use dotenv::dotenv;
use regex::Regex; use regex::Regex;
@ -38,9 +40,9 @@ struct Choice {
async fn chat_completions(body: web::Bytes, _req: HttpRequest) -> Result<HttpResponse> { async fn chat_completions(body: web::Bytes, _req: HttpRequest) -> Result<HttpResponse> {
// Always log raw POST data // Always log raw POST data
if let Ok(body_str) = std::str::from_utf8(&body) { if let Ok(body_str) = std::str::from_utf8(&body) {
println!("POST Data: {}", body_str); info!("POST Data: {}", body_str);
} else { } else {
println!("POST Data (binary): {:?}", body); info!("POST Data (binary): {:?}", body);
} }
dotenv().ok(); dotenv().ok();
@ -72,7 +74,7 @@ async fn chat_completions(body: web::Bytes, _req: HttpRequest) -> Result<HttpRes
); );
let body_str = std::str::from_utf8(&body).unwrap_or(""); let body_str = std::str::from_utf8(&body).unwrap_or("");
println!("Original POST Data: {}", body_str); info!("Original POST Data: {}", body_str);
// Remove the problematic params // Remove the problematic params
let re = let re =
@ -80,7 +82,7 @@ async fn chat_completions(body: web::Bytes, _req: HttpRequest) -> Result<HttpRes
let cleaned = re.replace_all(body_str, ""); let cleaned = re.replace_all(body_str, "");
let cleaned_body = web::Bytes::from(cleaned.to_string()); let cleaned_body = web::Bytes::from(cleaned.to_string());
println!("Cleaned POST Data: {}", cleaned); info!("Cleaned POST Data: {}", cleaned);
// Send request to Azure // Send request to Azure
let client = Client::new(); let client = Client::new();
@ -100,7 +102,7 @@ async fn chat_completions(body: web::Bytes, _req: HttpRequest) -> Result<HttpRes
.map_err(actix_web::error::ErrorInternalServerError)?; .map_err(actix_web::error::ErrorInternalServerError)?;
// Log the raw response // Log the raw response
println!("Raw Azure response: {}", raw_response); info!("Raw Azure response: {}", raw_response);
if status.is_success() { if status.is_success() {
Ok(HttpResponse::Ok().body(raw_response)) Ok(HttpResponse::Ok().body(raw_response))

View file

@ -12,6 +12,7 @@ use crate::services::keywords::set::set_keyword;
use crate::services::keywords::set_schedule::set_schedule_keyword; use crate::services::keywords::set_schedule::set_schedule_keyword;
use crate::services::keywords::wait::wait_keyword; use crate::services::keywords::wait::wait_keyword;
use crate::services::state::AppState; use crate::services::state::AppState;
use log::info;
use rhai::{Dynamic, Engine, EvalAltResult}; use rhai::{Dynamic, Engine, EvalAltResult};
pub struct ScriptService { pub struct ScriptService {
@ -134,7 +135,7 @@ impl ScriptService {
/// Preprocesses BASIC-style script to handle semicolon-free syntax /// Preprocesses BASIC-style script to handle semicolon-free syntax
pub fn compile(&self, script: &str) -> Result<rhai::AST, Box<EvalAltResult>> { pub fn compile(&self, script: &str) -> Result<rhai::AST, Box<EvalAltResult>> {
let processed_script = self.preprocess_basic_script(script); let processed_script = self.preprocess_basic_script(script);
println!("Processed Script:\n{}", processed_script); info!("Processed Script:\n{}", processed_script);
match self.engine.compile(&processed_script) { match self.engine.compile(&processed_script) {
Ok(ast) => Ok(ast), Ok(ast) => Ok(ast),
Err(parse_error) => Err(Box::new(EvalAltResult::from(parse_error))), Err(parse_error) => Err(Box::new(EvalAltResult::from(parse_error))),

View file

@ -1,6 +1,7 @@
use crate::services::config::AIConfig; use crate::services::config::AIConfig;
use langchain_rust::llm::OpenAI; use langchain_rust::llm::OpenAI;
use langchain_rust::{language_models::llm::LLM, llm::AzureConfig}; use langchain_rust::{language_models::llm::LLM, llm::AzureConfig};
use log::error;
use log::{debug, warn}; use log::{debug, warn};
use rhai::{Array, Dynamic}; use rhai::{Array, Dynamic};
use serde_json::{json, Value}; use serde_json::{json, Value};
@ -22,7 +23,7 @@ use tokio::io::AsyncWriteExt;
pub fn azure_from_config(config: &AIConfig) -> AzureConfig { pub fn azure_from_config(config: &AIConfig) -> AzureConfig {
AzureConfig::new() AzureConfig::new()
.with_api_base(&config.endpoint) .with_api_base(&config.endpoint)
.with_api_key(&config.key) .with_api_key(&config.key)
.with_api_version(&config.version) .with_api_version(&config.version)
.with_deployment_id(&config.instance) .with_deployment_id(&config.instance)
@ -42,7 +43,7 @@ pub async fn call_llm(
match open_ai.invoke(&prompt).await { match open_ai.invoke(&prompt).await {
Ok(response_text) => Ok(response_text), Ok(response_text) => Ok(response_text),
Err(err) => { Err(err) => {
eprintln!("Error invoking LLM API: {}", err); error!("Error invoking LLM API: {}", err);
Err(Box::new(std::io::Error::new( Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other, std::io::ErrorKind::Other,
"Failed to invoke LLM API", "Failed to invoke LLM API",

View file

@ -1,5 +1,6 @@
// wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb // wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb
// sudo dpkg -i google-chrome-stable_current_amd64.deb // sudo dpkg -i google-chrome-stable_current_amd64.deb
use log::info;
use crate::services::utils; use crate::services::utils;
@ -93,29 +94,26 @@ impl BrowserSetup {
Err("Brave browser not found. Please install Brave first.".into()) Err("Brave browser not found. Please install Brave first.".into())
} }
async fn setup_chromedriver() -> Result<String, Box<dyn std::error::Error>> { async fn setup_chromedriver() -> Result<String, Box<dyn std::error::Error>> {
// Create chromedriver directory in executable's parent directory // Create chromedriver directory in executable's parent directory
let mut chromedriver_dir = env::current_exe()? let mut chromedriver_dir = env::current_exe()?.parent().unwrap().to_path_buf();
.parent() chromedriver_dir.push("chromedriver");
.unwrap()
.to_path_buf();
chromedriver_dir.push("chromedriver");
// Ensure the directory exists
if !chromedriver_dir.exists() {
fs::create_dir(&chromedriver_dir).await?;
}
// Determine the final chromedriver path // Ensure the directory exists
let chromedriver_path = if cfg!(target_os = "windows") { if !chromedriver_dir.exists() {
chromedriver_dir.join("chromedriver.exe") fs::create_dir(&chromedriver_dir).await?;
} else { }
chromedriver_dir.join("chromedriver")
};
// Check if chromedriver exists // Determine the final chromedriver path
if fs::metadata(&chromedriver_path).await.is_err() { let chromedriver_path = if cfg!(target_os = "windows") {
let (download_url, platform) = match (cfg!(target_os = "windows"), cfg!(target_arch = "x86_64")) { chromedriver_dir.join("chromedriver.exe")
} else {
chromedriver_dir.join("chromedriver")
};
// Check if chromedriver exists
if fs::metadata(&chromedriver_path).await.is_err() {
let (download_url, platform) = match (cfg!(target_os = "windows"), cfg!(target_arch = "x86_64")) {
(true, true) => ( (true, true) => (
"https://storage.googleapis.com/chrome-for-testing-public/138.0.7204.183/win64/chromedriver-win64.zip", "https://storage.googleapis.com/chrome-for-testing-public/138.0.7204.183/win64/chromedriver-win64.zip",
"win64", "win64",
@ -138,69 +136,69 @@ async fn setup_chromedriver() -> Result<String, Box<dyn std::error::Error>> {
), ),
_ => return Err("Unsupported platform".into()), _ => return Err("Unsupported platform".into()),
}; };
let mut zip_path = std::env::temp_dir();
zip_path.push("chromedriver.zip");
println!("Downloading chromedriver for {}...", platform);
// Download the zip file let mut zip_path = std::env::temp_dir();
utils::download_file(download_url, &zip_path.to_str().unwrap()).await?; zip_path.push("chromedriver.zip");
info!("Downloading chromedriver for {}...", platform);
// Extract the zip to a temporary directory first // Download the zip file
let mut temp_extract_dir = std::env::temp_dir(); utils::download_file(download_url, &zip_path.to_str().unwrap()).await?;
temp_extract_dir.push("chromedriver_extract");
let temp_extract_dir1 = temp_extract_dir.clone();
// Clean up any previous extraction
let _ = fs::remove_dir_all(&temp_extract_dir).await;
fs::create_dir(&temp_extract_dir).await?;
utils::extract_zip_recursive(&zip_path, &temp_extract_dir)?; // Extract the zip to a temporary directory first
let mut temp_extract_dir = std::env::temp_dir();
temp_extract_dir.push("chromedriver_extract");
let temp_extract_dir1 = temp_extract_dir.clone();
// Chrome for Testing zips contain a platform-specific directory // Clean up any previous extraction
// Find the chromedriver binary in the extracted structure let _ = fs::remove_dir_all(&temp_extract_dir).await;
let mut extracted_binary_path = temp_extract_dir; fs::create_dir(&temp_extract_dir).await?;
extracted_binary_path.push(format!("chromedriver-{}", platform));
extracted_binary_path.push(if cfg!(target_os = "windows") {
"chromedriver.exe"
} else {
"chromedriver"
});
// Try to move the file, fall back to copy if cross-device utils::extract_zip_recursive(&zip_path, &temp_extract_dir)?;
match fs::rename(&extracted_binary_path, &chromedriver_path).await {
Ok(_) => (), // Chrome for Testing zips contain a platform-specific directory
Err(e) if e.kind() == std::io::ErrorKind::CrossesDevices => { // Find the chromedriver binary in the extracted structure
// Cross-device move failed, use copy instead let mut extracted_binary_path = temp_extract_dir;
fs::copy(&extracted_binary_path, &chromedriver_path).await?; extracted_binary_path.push(format!("chromedriver-{}", platform));
// Set permissions on the copied file extracted_binary_path.push(if cfg!(target_os = "windows") {
#[cfg(unix)] "chromedriver.exe"
{ } else {
use std::os::unix::fs::PermissionsExt; "chromedriver"
let mut perms = fs::metadata(&chromedriver_path).await?.permissions(); });
perms.set_mode(0o755);
fs::set_permissions(&chromedriver_path, perms).await?; // Try to move the file, fall back to copy if cross-device
match fs::rename(&extracted_binary_path, &chromedriver_path).await {
Ok(_) => (),
Err(e) if e.kind() == std::io::ErrorKind::CrossesDevices => {
// Cross-device move failed, use copy instead
fs::copy(&extracted_binary_path, &chromedriver_path).await?;
// Set permissions on the copied file
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = fs::metadata(&chromedriver_path).await?.permissions();
perms.set_mode(0o755);
fs::set_permissions(&chromedriver_path, perms).await?;
}
} }
}, Err(e) => return Err(e.into()),
Err(e) => return Err(e.into()), }
// Clean up
let _ = fs::remove_file(&zip_path).await;
let _ = fs::remove_dir_all(temp_extract_dir1).await;
// Set executable permissions (if not already set during copy)
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = fs::metadata(&chromedriver_path).await?.permissions();
perms.set_mode(0o755);
fs::set_permissions(&chromedriver_path, perms).await?;
}
} }
// Clean up Ok(chromedriver_path.to_string_lossy().to_string())
let _ = fs::remove_file(&zip_path).await;
let _ = fs::remove_dir_all(temp_extract_dir1).await;
// Set executable permissions (if not already set during copy)
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = fs::metadata(&chromedriver_path).await?.permissions();
perms.set_mode(0o755);
fs::set_permissions(&chromedriver_path, perms).await?;
}
} }
Ok(chromedriver_path.to_string_lossy().to_string())
}
} }
// Modified BrowserPool initialization // Modified BrowserPool initialization

View file

@ -16,7 +16,6 @@ use std::str::FromStr;
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
#[tokio::test] #[tokio::test]
async fn test_successful_file_listing() -> Result<(), Box<dyn std::error::Error>> { async fn test_successful_file_listing() -> Result<(), Box<dyn std::error::Error>> {
@ -61,7 +60,7 @@ async fn test_successful_file_listing() -> Result<(), Box<dyn std::error::Error>
let app_state = web::Data::new(AppState { let app_state = web::Data::new(AppState {
minio_client: Some(minio_client.clone()), minio_client: Some(minio_client.clone()),
config: None, config: None,
db_pool: None db_pool: None,
}); });
let app = test::init_service(App::new().app_data(app_state.clone()).service(list_file)).await; let app = test::init_service(App::new().app_data(app_state.clone()).service(list_file)).await;
@ -97,10 +96,10 @@ async fn test_successful_file_listing() -> Result<(), Box<dyn std::error::Error>
match result { match result {
Ok(resp) => { Ok(resp) => {
for item in resp.contents { for item in resp.contents {
println!("{:?}", item); info!("{:?}", item);
} }
} }
Err(e) => println!("Error: {:?}", e), Err(e) => info!("Error: {:?}", e),
} }
} }