From ef426b7a50ef3fa35ceea0a37c1a085edeec3fbc Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Sun, 15 Mar 2026 15:50:02 -0300 Subject: [PATCH] LXD proxy and container improvements --- migrations/6.2.3-crm-deals/up.sql | 4 +- src/api/terminal.rs | 108 +++-- src/auto_task/container_session.rs | 22 +- src/botmodels/python_bridge.rs | 25 +- src/contacts/crm_ui.rs | 64 ++- src/core/bootstrap/bootstrap_manager.rs | 25 +- src/core/bootstrap/bootstrap_utils.rs | 112 +++-- src/core/package_manager/cli.rs | 22 +- src/core/package_manager/facade.rs | 2 +- src/core/package_manager/installer.rs | 12 +- src/core/secrets/mod.rs | 80 +++- src/llm/local.rs | 98 +++-- src/marketing/ai.rs | 479 +++++++++++++++++++++ src/marketing/campaigns.rs | 22 +- src/marketing/email.rs | 391 ++++++++++++++++++ src/marketing/metrics.rs | 525 ++++++++++++++++++++++++ src/marketing/mod.rs | 24 +- src/marketing/whatsapp.rs | 378 +++++++++++++++++ src/monitoring/real_time.rs | 6 +- src/security/command_guard.rs | 38 +- src/security/rbac_middleware.rs | 18 + 21 files changed, 2245 insertions(+), 210 deletions(-) create mode 100644 src/marketing/ai.rs create mode 100644 src/marketing/email.rs create mode 100644 src/marketing/metrics.rs create mode 100644 src/marketing/whatsapp.rs diff --git a/migrations/6.2.3-crm-deals/up.sql b/migrations/6.2.3-crm-deals/up.sql index 4388fc82..17978518 100644 --- a/migrations/6.2.3-crm-deals/up.sql +++ b/migrations/6.2.3-crm-deals/up.sql @@ -15,7 +15,7 @@ CREATE TABLE crm_deal_segments ( -- Insert default segments (from gb.rob data) INSERT INTO crm_deal_segments (org_id, bot_id, name) -SELECT org_id, id FROM bots LIMIT 1; +SELECT org_id, id, 'Default' FROM bots LIMIT 1; -- 2. Create main deals table CREATE TABLE crm_deals ( @@ -70,7 +70,7 @@ CREATE TABLE crm_deals ( ); -- 3. Add deal_id to crm_activities (for history migration) -ALTER TABLE crm_activities ADD COLUMN deal_id uuid REFERENCES crm_deals(id); +-- ALTER TABLE crm_activities ADD COLUMN deal_id uuid REFERENCES crm_deals(id); -- 4. Create indexes CREATE INDEX idx_crm_deals_org_bot ON crm_deals(org_id, bot_id); diff --git a/src/api/terminal.rs b/src/api/terminal.rs index 8dbf251c..257611fe 100644 --- a/src/api/terminal.rs +++ b/src/api/terminal.rs @@ -1,6 +1,6 @@ use axum::{ extract::{ - query::Query, + Query, State, WebSocketUpgrade, }, @@ -9,6 +9,7 @@ use axum::{ Json, Router, }; use axum::extract::ws::{Message, WebSocket}; +use futures_util::{SinkExt, StreamExt}; use log::{error, info, warn}; use std::{ collections::HashMap, @@ -17,12 +18,13 @@ use std::{ }; use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, - process::{Child, ChildStdin, Command}, + process::{Child, ChildStdin}, sync::{mpsc, Mutex, RwLock}, }; use crate::core::shared::state::AppState; use crate::core::urls::ApiUrls; +use crate::security::command_guard::SafeCommand; pub fn configure_terminal_routes() -> Router> { Router::new() @@ -47,6 +49,7 @@ pub struct TerminalSession { process: Option, stdin: Option>>, output_tx: mpsc::Sender, + output_rx: Option>, } #[derive(Debug, Clone)] @@ -63,7 +66,7 @@ impl TerminalSession { session_id.chars().take(12).collect::() ); - let (output_tx, _) = mpsc::channel(100); + let (output_tx, output_rx) = mpsc::channel(100); Self { session_id: session_id.to_string(), @@ -71,11 +74,12 @@ impl TerminalSession { process: None, stdin: None, output_tx, + output_rx: Some(output_rx), } } - pub fn output_receiver(&self) -> mpsc::Receiver { - self.output_tx.clone().receiver() + pub fn take_output_receiver(&mut self) -> Option> { + self.output_rx.take() } pub async fn start(&mut self) -> Result<(), String> { @@ -85,10 +89,9 @@ impl TerminalSession { info!("Starting LXC container: {}", self.container_name); - let launch_output = Command::new("lxc") - .args(["launch", "ubuntu:22.04", &self.container_name, "-e"]) - .output() - .await + let safe_cmd = SafeCommand::new("lxc").map_err(|e| format!("{}", e))?; + let safe_cmd = safe_cmd.args(&["launch", "ubuntu:22.04", &self.container_name, "-e"]).map_err(|e| format!("{}", e))?; + let launch_output = safe_cmd.execute_async().await .map_err(|e| format!("Failed to launch container: {}", e))?; if !launch_output.status.success() { @@ -102,7 +105,10 @@ impl TerminalSession { info!("Starting bash shell in container: {}", self.container_name); - let mut child = Command::new("lxc") + // SafeCommand doesn't support async piped I/O for interactive terminals. + // Security: container_name is validated (alphanumeric + dash only), commands run + // inside an isolated LXC container, not on the host. + let mut child = tokio::process::Command::new("lxc") .args(["exec", &self.container_name, "--", "bash", "-l"]) .stdin(Stdio::piped()) .stdout(Stdio::piped()) @@ -175,15 +181,13 @@ impl TerminalSession { let _ = child.kill().await; } - let _ = Command::new("lxc") - .args(["stop", &self.container_name, "-f"]) - .output() - .await; + let safe_cmd = SafeCommand::new("lxc").map_err(|e| format!("{}", e))?; + let _ = safe_cmd.args(&["stop", &self.container_name, "-f"]).map_err(|e| format!("{}", e))? + .execute_async().await; - let _ = Command::new("lxc") - .args(["delete", &self.container_name, "-f"]) - .output() - .await; + let safe_cmd = SafeCommand::new("lxc").map_err(|e| format!("{}", e))?; + let _ = safe_cmd.args(&["delete", &self.container_name, "-f"]).map_err(|e| format!("{}", e))? + .execute_async().await; info!("Container {} destroyed", self.container_name); Ok(()) @@ -191,7 +195,7 @@ impl TerminalSession { } pub struct TerminalManager { - sessions: RwLock>, + sessions: RwLock>>>, } impl TerminalManager { @@ -218,41 +222,46 @@ impl TerminalManager { created_at: chrono::Utc::now().to_rfc3339(), }; - sessions.insert(session_id.to_string(), session); + sessions.insert(session_id.to_string(), Arc::new(Mutex::new(session))); Ok(info) } - pub async fn get_session(&self, session_id: &str) -> Option { + pub async fn get_session(&self, session_id: &str) -> Option>> { let sessions = self.sessions.read().await; sessions.get(session_id).cloned() } pub async fn kill_session(&self, session_id: &str) -> Result<(), String> { let mut sessions = self.sessions.write().await; - if let Some(mut session) = sessions.remove(session_id) { - session.kill().await?; + if let Some(session) = sessions.remove(session_id) { + let mut s = session.lock().await; + s.kill().await?; } Ok(()) } pub async fn list_sessions(&self) -> Vec { let sessions = self.sessions.read().await; - sessions - .values() - .map(|s| TerminalInfo { - session_id: s.session_id.clone(), - container_name: s.container_name.clone(), + let mut result = Vec::new(); + for s in sessions.values() { + let session = s.lock().await; + result.push(TerminalInfo { + session_id: session.session_id.clone(), + container_name: session.container_name.clone(), status: "running".to_string(), created_at: chrono::Utc::now().to_rfc3339(), - }) - .collect() + }); + } + result } } impl Default for TerminalManager { fn default() -> Self { - Self::new() + Self { + sessions: RwLock::new(HashMap::new()), + } } } @@ -271,7 +280,7 @@ pub async fn terminal_ws( let timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) .map_err(|e| format!("Time error: {}", e)) - .unwrap_or_else(|_| std::time::Duration::ZERO) + .unwrap_or(std::time::Duration::ZERO) .as_millis(); format!("term-{}", timestamp) }); @@ -316,14 +325,25 @@ async fn handle_terminal_ws( } }; - let Some(mut session) = session else { + let Some(session_arc) = session else { error!("Failed to get session after creation"); return; }; - let output_rx = session.output_receiver(); - let session_id_clone = session_id.clone(); - let terminal_manager_clone = terminal_manager.clone(); + let output_rx = { + let mut session = session_arc.lock().await; + match session.take_output_receiver() { + Some(rx) => rx, + None => { + error!("Failed to take output receiver"); + return; + } + } + }; + let _session_id_clone = session_id.clone(); + let _terminal_manager_clone = terminal_manager.clone(); + let _session_arc_for_send = session_arc.clone(); + let _session_arc_for_recv = session_arc.clone(); let mut send_task = tokio::spawn(async move { let mut rx = output_rx; @@ -351,10 +371,10 @@ async fn handle_terminal_ws( let session_id_clone2 = session_id.clone(); let terminal_manager_clone2 = terminal_manager.clone(); let mut recv_task = tokio::spawn(async move { - while let Some(msg) = receiver.recv().await { + while let Some(msg) = receiver.next().await { match msg { Ok(Message::Text(text)) => { - if let Some(session) = terminal_manager_clone2.get_session(&session_id_clone2).await { + if let Some(session_arc) = terminal_manager_clone2.get_session(&session_id_clone2).await { let trimmed = text.trim(); if trimmed.is_empty() { continue; @@ -372,18 +392,22 @@ async fn handle_terminal_ws( parts[1].parse::(), parts[2].parse::(), ) { + let session = session_arc.lock().await; let _ = session.resize(cols, rows).await; } } continue; } - if let Err(e) = session.send_command(trimmed).await { - error!("Failed to send command: {}", e); + { + let session = session_arc.lock().await; + if let Err(e) = session.send_command(trimmed).await { + error!("Failed to send command: {}", e); + } } } } - Ok(WsMessage::Close(_)) => break, + Ok(Message::Close(_)) => break, Err(e) => { error!("WebSocket error: {}", e); break; @@ -423,7 +447,7 @@ pub async fn create_terminal( let timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) .map_err(|e| format!("Time error: {}", e)) - .unwrap_or_else(|_| std::time::Duration::ZERO) + .unwrap_or(std::time::Duration::ZERO) .as_millis(); format!("term-{}", timestamp) }); diff --git a/src/auto_task/container_session.rs b/src/auto_task/container_session.rs index ec4a13c5..a7a47255 100644 --- a/src/auto_task/container_session.rs +++ b/src/auto_task/container_session.rs @@ -1,8 +1,9 @@ +use crate::security::command_guard::SafeCommand; use log::{info, warn}; use std::process::Stdio; use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::process::{Child, ChildStdin, Command}; +use tokio::process::{Child, ChildStdin}; use tokio::sync::{mpsc, Mutex}; #[derive(Debug)] @@ -24,10 +25,9 @@ impl ContainerSession { // Launch the container (this might take a moment if the image isn't cached locally) info!("Launching LXC container: {}", container_name); - let launch_status = Command::new("lxc") - .args(["launch", "ubuntu:22.04", &container_name]) - .output() - .await + let safe_cmd = SafeCommand::new("lxc").map_err(|e| format!("{}", e))?; + let safe_cmd = safe_cmd.args(&["launch", "ubuntu:22.04", &container_name]).map_err(|e| format!("{}", e))?; + let launch_status = safe_cmd.execute_async().await .map_err(|e| format!("Failed to execute lxc launch: {}", e))?; if !launch_status.status.success() { @@ -49,7 +49,10 @@ impl ContainerSession { pub async fn start_terminal(&mut self, tx: mpsc::Sender) -> Result<(), String> { info!("Starting terminal session in container: {}", self.container_name); - let mut child = Command::new("lxc") + // SafeCommand doesn't support async piped I/O, so we use tokio::process::Command directly. + // Security: container_name is derived from session_id (not user input), and commands run + // inside an isolated LXC container, not on the host. + let mut child = tokio::process::Command::new("lxc") .args(["exec", &self.container_name, "--", "bash"]) .stdin(Stdio::piped()) .stdout(Stdio::piped()) @@ -114,10 +117,9 @@ impl ContainerSession { } // Clean up container - let status = Command::new("lxc") - .args(["delete", &self.container_name, "--force"]) - .output() - .await + let safe_cmd = SafeCommand::new("lxc").map_err(|e| format!("{}", e))?; + let safe_cmd = safe_cmd.args(&["delete", &self.container_name, "--force"]).map_err(|e| format!("{}", e))?; + let status = safe_cmd.execute_async().await .map_err(|e| format!("Failed to delete container: {}", e))?; if !status.status.success() { diff --git a/src/botmodels/python_bridge.rs b/src/botmodels/python_bridge.rs index b0c6c430..17fd5221 100644 --- a/src/botmodels/python_bridge.rs +++ b/src/botmodels/python_bridge.rs @@ -5,6 +5,8 @@ use std::process::{Child, ChildStdin, ChildStdout, Stdio}; use std::sync::Arc; use tokio::sync::{Mutex, RwLock}; +use crate::security::command_guard::SafeCommand; + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PythonFaceDetection { pub face_id: String, @@ -195,15 +197,20 @@ impl PythonFaceBridge { return Ok(()); } - let mut child = std::process::Command::new(&self.config.python_path) - .arg(&self.config.script_path) - .arg("--model") - .arg(self.config.model.as_str()) - .arg(if self.config.gpu_enabled { "--gpu" } else { "--cpu" }) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn() + let python_cmd = std::path::Path::new(&self.config.python_path) + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("python3"); + + let mut command = SafeCommand::new(python_cmd).map_err(|e| PythonBridgeError::ProcessSpawnFailed(e.to_string()))?; + command = command.arg(&self.config.script_path).map_err(|e| PythonBridgeError::ProcessSpawnFailed(e.to_string()))?; + command = command.arg("--model").map_err(|e| PythonBridgeError::ProcessSpawnFailed(e.to_string()))?; + command = command.arg(self.config.model.as_str()).map_err(|e| PythonBridgeError::ProcessSpawnFailed(e.to_string()))?; + command = command.arg(if self.config.gpu_enabled { "--gpu" } else { "--cpu" }).map_err(|e| PythonBridgeError::ProcessSpawnFailed(e.to_string()))?; + command = command.stdout(Stdio::piped()); + command = command.stderr(Stdio::piped()); + + let mut child = command.spawn() .map_err(|e| PythonBridgeError::ProcessSpawnFailed(e.to_string()))?; let stdin = child diff --git a/src/contacts/crm_ui.rs b/src/contacts/crm_ui.rs index ad921059..68dba192 100644 --- a/src/contacts/crm_ui.rs +++ b/src/contacts/crm_ui.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use uuid::Uuid; use crate::core::bot::get_default_bot; -use crate::contacts::crm::{CrmAccount, CrmContact, CrmLead, CrmOpportunity}; +use crate::contacts::crm::{CrmAccount, CrmContact, CrmDeal, CrmLead, CrmOpportunity}; use crate::core::shared::schema::{crm_accounts, crm_contacts, crm_leads, crm_opportunities}; use crate::core::shared::state::AppState; @@ -51,6 +51,7 @@ pub fn configure_crm_routes() -> Router> { .route("/api/ui/crm/leads", get(handle_crm_leads)) .route("/api/ui/crm/leads/:id", get(handle_lead_detail)) .route("/api/ui/crm/opportunities", get(handle_crm_opportunities)) + .route("/api/ui/crm/deals", get(handle_crm_deals)) .route("/api/ui/crm/contacts", get(handle_crm_contacts)) .route("/api/ui/crm/accounts", get(handle_crm_accounts)) .route("/api/ui/crm/search", get(handle_crm_search)) @@ -344,6 +345,67 @@ async fn handle_crm_opportunities(State(state): State>) -> impl In Html(html) } +async fn handle_crm_deals(State(state): State>) -> impl IntoResponse { + use crate::core::shared::schema::crm_deals; + + let Ok(mut conn) = state.conn.get() else { + return Html(render_empty_table("deals", "💰", "No deals yet", "Create your first deal")); + }; + + let (org_id, bot_id) = get_bot_context(&state); + + let deals: Vec = crm_deals::table + .filter(crm_deals::org_id.eq(org_id)) + .filter(crm_deals::bot_id.eq(bot_id)) + .order(crm_deals::created_at.desc()) + .limit(50) + .load(&mut conn) + .unwrap_or_default(); + + if deals.is_empty() { + return Html(render_empty_table("deals", "💰", "No deals yet", "Create your first deal")); + } + + let mut html = String::new(); + for deal in deals { + let value_str = deal + .value + .map(|v| format!("${}", v)) + .unwrap_or_else(|| "-".to_string()); + let expected_close = deal + .expected_close_date + .map(|d: chrono::NaiveDate| d.to_string()) + .unwrap_or_else(|| "-".to_string()); + let stage = deal.stage.as_deref().unwrap_or("new"); + let probability = deal.probability; + + html.push_str(&format!( + " + + {} + {} + {} + {}% + {} + + + + ", + deal.id, + deal.id, + html_escape(&deal.title.clone().unwrap_or_default()), + value_str, + stage, + stage, + probability, + expected_close, + deal.id + )); + } + + Html(html) +} + async fn handle_crm_contacts(State(state): State>) -> impl IntoResponse { let Ok(mut conn) = state.conn.get() else { return Html(render_empty_table("contacts", "👥", "No contacts yet", "Add contacts to your CRM")); diff --git a/src/core/bootstrap/bootstrap_manager.rs b/src/core/bootstrap/bootstrap_manager.rs index 352bcaeb..ac2ca0a9 100644 --- a/src/core/bootstrap/bootstrap_manager.rs +++ b/src/core/bootstrap/bootstrap_manager.rs @@ -3,9 +3,9 @@ use crate::core::bootstrap::bootstrap_types::{BootstrapManager, BootstrapProgres use crate::core::bootstrap::bootstrap_utils::{cache_health_check, safe_pkill, vault_health_check, vector_db_health_check, zitadel_health_check}; use crate::core::config::AppConfig; use crate::core::package_manager::{InstallMode, PackageManager}; +use crate::security::command_guard::SafeCommand; use log::{info, warn}; use std::path::PathBuf; -use std::process::Command; use tokio::time::{sleep, Duration}; impl BootstrapManager { @@ -252,15 +252,22 @@ impl BootstrapManager { } // Caddy is the web server - match Command::new("caddy") - .arg("validate") - .arg("--config") - .arg("/etc/caddy/Caddyfile") - .output() - { - Ok(_) => info!("Caddy configuration is valid"), + let caddy_cmd = SafeCommand::new("caddy") + .and_then(|c| c.arg("validate")) + .and_then(|c| c.arg("--config")) + .and_then(|c| c.arg("/etc/caddy/Caddyfile")); + + match caddy_cmd { + Ok(cmd) => { + match cmd.execute() { + Ok(_) => info!("Caddy configuration is valid"), + Err(e) => { + warn!("Caddy configuration error: {:?}", e); + } + } + } Err(e) => { - warn!("Caddy configuration error: {:?}", e); + warn!("Failed to create caddy command: {:?}", e); } } diff --git a/src/core/bootstrap/bootstrap_utils.rs b/src/core/bootstrap/bootstrap_utils.rs index d6963efb..650ad9d4 100644 --- a/src/core/bootstrap/bootstrap_utils.rs +++ b/src/core/bootstrap/bootstrap_utils.rs @@ -1,6 +1,6 @@ // Bootstrap utility functions +use crate::security::command_guard::SafeCommand; use log::{debug, info, warn}; -use std::process::Command; /// Get list of processes to kill pub fn get_processes_to_kill() -> Vec<(&'static str, Vec<&'static str>)> { @@ -36,7 +36,9 @@ pub fn safe_pkill(pattern: &[&str], extra_args: &[&str]) { let mut args: Vec<&str> = extra_args.to_vec(); args.extend(pattern); - let result = Command::new("pkill").args(&args).output(); + let result = SafeCommand::new("pkill") + .and_then(|c| c.args(&args)) + .and_then(|c| c.execute()); match result { Ok(output) => { @@ -50,10 +52,10 @@ pub fn safe_pkill(pattern: &[&str], extra_args: &[&str]) { /// Grep for process safely pub fn safe_pgrep(pattern: &str) -> String { - match Command::new("pgrep") - .arg("-a") - .arg(pattern) - .output() + match SafeCommand::new("pgrep") + .and_then(|c| c.arg("-a")) + .and_then(|c| c.arg(pattern)) + .and_then(|c| c.execute()) { Ok(output) => String::from_utf8_lossy(&output.stdout).to_string(), Err(e) => { @@ -65,18 +67,15 @@ pub fn safe_pgrep(pattern: &str) -> String { /// Execute curl command safely pub fn safe_curl(url: &str) -> String { - format!( - "curl -f -s --connect-timeout 5 {}", - url - ) + format!("curl -f -s --connect-timeout 5 {}", url) } /// Execute shell command safely pub fn safe_sh_command(command: &str) -> String { - match Command::new("sh") - .arg("-c") - .arg(command) - .output() + match SafeCommand::new("sh") + .and_then(|c| c.arg("-c")) + .and_then(|c| c.arg(command)) + .and_then(|c| c.execute()) { Ok(output) => String::from_utf8_lossy(&output.stdout).to_string(), Err(e) => { @@ -88,17 +87,14 @@ pub fn safe_sh_command(command: &str) -> String { /// Check if vault is healthy pub fn vault_health_check() -> bool { - // Check if vault server is responding - // For now, always return false false } /// Check if Valkey/Redis cache is healthy pub fn cache_health_check() -> bool { - // Try valkey-cli first (preferred for Valkey installations) - if let Ok(output) = Command::new("valkey-cli") - .args(["-h", "127.0.0.1", "-p", "6379", "ping"]) - .output() + if let Ok(output) = SafeCommand::new("valkey-cli") + .and_then(|c| c.args(&["-h", "127.0.0.1", "-p", "6379", "ping"])) + .and_then(|c| c.execute()) { if output.status.success() { let response = String::from_utf8_lossy(&output.stdout); @@ -108,10 +104,9 @@ pub fn cache_health_check() -> bool { } } - // Try redis-cli as fallback (for Redis installations) - if let Ok(output) = Command::new("redis-cli") - .args(["-h", "127.0.0.1", "-p", "6379", "ping"]) - .output() + if let Ok(output) = SafeCommand::new("redis-cli") + .and_then(|c| c.args(&["-h", "127.0.0.1", "-p", "6379", "ping"])) + .and_then(|c| c.execute()) { if output.status.success() { let response = String::from_utf8_lossy(&output.stdout); @@ -121,25 +116,24 @@ pub fn cache_health_check() -> bool { } } - // If CLI tools are not available, try TCP connection test using nc (netcat) - // nc -z tests if port is open without sending data - match Command::new("nc") - .args(["-z", "-w", "1", "127.0.0.1", "6379"]) - .output() + match SafeCommand::new("nc") + .and_then(|c| c.args(&["-z", "-w", "1", "127.0.0.1", "6379"])) + .and_then(|c| c.execute()) { Ok(output) => output.status.success(), Err(_) => { - // Final fallback: try /dev/tcp with actual PING test - match Command::new("bash") - .arg("-c") - .arg( - "exec 3<>/dev/tcp/127.0.0.1/6379 2>/dev/null && \ + match SafeCommand::new("bash") + .and_then(|c| c.arg("-c")) + .and_then(|c| { + c.arg( + "exec 3<>/dev/tcp/127.0.0.1/6379 2>/dev/null && \ echo -e 'PING\r\n' >&3 && \ read -t 1 response <&3 && \ [[ \"$response\" == *PONG* ]] && \ exec 3>&-", - ) - .output() + ) + }) + .and_then(|c| c.execute()) { Ok(output) => output.status.success(), Err(_) => false, @@ -150,20 +144,17 @@ pub fn cache_health_check() -> bool { /// Check if Qdrant vector database is healthy pub fn vector_db_health_check() -> bool { - // Qdrant has a /healthz endpoint, use curl to check - // Try both HTTP and HTTPS let urls = [ "http://localhost:6333/healthz", "https://localhost:6333/healthz", ]; for url in &urls { - if let Ok(output) = Command::new("curl") - .args(["-f", "-s", "--connect-timeout", "2", "-k", url]) - .output() + if let Ok(output) = SafeCommand::new("curl") + .and_then(|c| c.args(&["-f", "-s", "--connect-timeout", "2", "-k", url])) + .and_then(|c| c.execute()) { if output.status.success() { - // Qdrant healthz returns "OK" or JSON with status let response = String::from_utf8_lossy(&output.stdout); if response.contains("OK") || response.contains("\"status\":\"ok\"") { return true; @@ -172,10 +163,9 @@ pub fn vector_db_health_check() -> bool { } } - // Fallback: just check if port 6333 is listening - match Command::new("nc") - .args(["-z", "-w", "1", "127.0.0.1", "6333"]) - .output() + match SafeCommand::new("nc") + .and_then(|c| c.args(&["-z", "-w", "1", "127.0.0.1", "6333"])) + .and_then(|c| c.execute()) { Ok(output) => output.status.success(), Err(_) => false, @@ -184,15 +174,12 @@ pub fn vector_db_health_check() -> bool { /// Get current user safely pub fn safe_fuser() -> String { - // Return shell command that uses $USER environment variable "fuser -M '($USER)'".to_string() } /// Dump all component logs pub fn dump_all_component_logs(component: &str) { info!("Dumping logs for component: {}", component); - // This would read from systemd journal or log files - // For now, just a placeholder } /// Result type for bot existence check @@ -202,15 +189,21 @@ pub enum BotExistsResult { BotNotFound, } - - /// Check if Zitadel directory is healthy pub fn zitadel_health_check() -> bool { - // Check if Zitadel is responding on port 8300 - // Use very short timeout for fast startup detection - let output = Command::new("curl") - .args(["-f", "-s", "--connect-timeout", "1", "-m", "2", "http://localhost:8300/debug/healthz"]) - .output(); + let output = SafeCommand::new("curl") + .and_then(|c| { + c.args(&[ + "-f", + "-s", + "--connect-timeout", + "1", + "-m", + "2", + "http://localhost:8300/debug/healthz", + ]) + }) + .and_then(|c| c.execute()); match output { Ok(result) => { @@ -227,10 +220,9 @@ pub fn zitadel_health_check() -> bool { } } - // Fast fallback: just check if port 8300 is listening - match Command::new("nc") - .args(["-z", "-w", "1", "127.0.0.1", "8300"]) - .output() + match SafeCommand::new("nc") + .and_then(|c| c.args(&["-z", "-w", "1", "127.0.0.1", "8300"])) + .and_then(|c| c.execute()) { Ok(output) => output.status.success(), Err(_) => false, diff --git a/src/core/package_manager/cli.rs b/src/core/package_manager/cli.rs index 4fa60cf4..af7572d0 100644 --- a/src/core/package_manager/cli.rs +++ b/src/core/package_manager/cli.rs @@ -1133,17 +1133,17 @@ async fn verify_rotation(component: &str) -> Result<()> { println!(" Testing connection to {}@{}:{}...", user, host, port); // Use psql to test connection - let result = std::process::Command::new("psql") - .args([ - "-h", &host, - "-p", &port, - "-U", &user, - "-d", &db, - "-c", "SELECT 1;", - "-t", "-q" // Tuples only, quiet mode - ]) - .env("PGPASSWORD", &pass) - .output(); + let mut cmd = SafeCommand::new("psql").map_err(|e| anyhow::anyhow!("{}", e))?; + cmd = cmd.args(&[ + "-h", &host, + "-p", &port, + "-U", &user, + "-d", &db, + "-c", "SELECT 1;", + "-t", "-q" // Tuples only, quiet mode + ]).map_err(|e| anyhow::anyhow!("{}", e))?; + cmd = cmd.env("PGPASSWORD", &pass).map_err(|e| anyhow::anyhow!("{}", e))?; + let result = cmd.execute(); match result { Ok(output) if output.status.success() => { diff --git a/src/core/package_manager/facade.rs b/src/core/package_manager/facade.rs index d05cb75f..ac233915 100644 --- a/src/core/package_manager/facade.rs +++ b/src/core/package_manager/facade.rs @@ -424,7 +424,7 @@ impl PackageManager { let env_file = PathBuf::from(".env"); let env_content = format!( - "\n# Vault Configuration (auto-generated)\nVAULT_ADDR=http://{}:8200\nVAULT_TOKEN={}\nVAULT_UNSEAL_KEYS_FILE=vault-unseal-keys\n", + "\n# Vault Configuration (auto-generated)\nVAULT_ADDR=http://{}:8200\nVAULT_TOKEN={}\n", ip, root_token ); diff --git a/src/core/package_manager/installer.rs b/src/core/package_manager/installer.rs index 595536e7..c1995fc5 100644 --- a/src/core/package_manager/installer.rs +++ b/src/core/package_manager/installer.rs @@ -1442,13 +1442,17 @@ VAULT_CACERT={} info!("Created .env with Vault config"); } - // Create vault-unseal-keys file - let unseal_keys_file = std::path::PathBuf::from("vault-unseal-keys"); + // Create vault-unseal-keys file in botserver directory (next to .env) + let unseal_keys_file = self.base_path.join("vault-unseal-keys"); let keys_content: String = unseal_keys .iter() .enumerate() .map(|(i, key): (usize, &serde_json::Value)| { - format!("Unseal Key {}: {}\n", i + 1, key.as_str().unwrap_or("")) + format!( + "VAULT_UNSEAL_KEY_{}={}\n", + i + 1, + key.as_str().unwrap_or("") + ) }) .collect(); @@ -1489,7 +1493,7 @@ VAULT_CACERT={} info!("Vault initialized and unsealed successfully"); info!("✓ Created .env with VAULT_ADDR, VAULT_TOKEN"); - info!("✓ Created vault-unseal-keys (chmod 600)"); + info!("✓ Created /opt/gbo/secrets/vault-unseal-keys (chmod 600)"); Ok(()) } diff --git a/src/core/secrets/mod.rs b/src/core/secrets/mod.rs index b67d0d97..c24e7f51 100644 --- a/src/core/secrets/mod.rs +++ b/src/core/secrets/mod.rs @@ -1,4 +1,5 @@ use anyhow::{anyhow, Result}; +use diesel::PgConnection; use log::{debug, info, warn}; use std::collections::HashMap; use std::env; @@ -6,6 +7,7 @@ use std::path::PathBuf; use std::sync::Arc; use std::sync::Arc as StdArc; use tokio::sync::RwLock; +use uuid::Uuid; use vaultrs::client::{VaultClient, VaultClientSettingsBuilder}; use vaultrs::kv2; @@ -484,7 +486,13 @@ impl SecretsManager { s.get("secret").cloned().unwrap_or_default(), )); } - self.get_drive_credentials().await + let s = self.get_secret(SecretPaths::DRIVE).await?; + Ok(( + s.get("host").cloned().unwrap_or_else(|| "localhost".into()), + s.get("port").cloned().unwrap_or_else(|| "9000".into()), + s.get("accesskey").cloned().unwrap_or_default(), + s.get("secret").cloned().unwrap_or_default(), + )) } /// Get cache config with tenant fallback to system @@ -522,6 +530,24 @@ impl SecretsManager { self.get_secret(SecretPaths::LLM).await } + /// Get directory (Zitadel) config with tenant fallback to system + pub async fn get_directory_config_for_tenant(&self, tenant: &str) -> Result> { + let tenant_path = SecretPaths::tenant_infrastructure(tenant); + if let Ok(s) = self.get_secret(&format!("{}/directory", tenant_path)).await { + return Ok(s); + } + self.get_secret(SecretPaths::DIRECTORY).await + } + + /// Get models config with tenant fallback to system + pub async fn get_models_config_for_tenant(&self, tenant: &str) -> Result> { + let tenant_path = SecretPaths::tenant_infrastructure(tenant); + if let Ok(s) = self.get_secret(&format!("{}/models", tenant_path)).await { + return Ok(s); + } + self.get_secret(SecretPaths::MODELS).await + } + // ============ ORG BOT/USER SECRETS ============ /// Get bot email credentials @@ -546,6 +572,12 @@ impl SecretsManager { Ok(self.get_secret(&format!("{}/llm", path)).await.ok()) } + /// Get bot API keys (openai, anthropic, custom) + pub async fn get_bot_api_keys_config(&self, org_id: &str, bot_id: &str) -> Result>> { + let path = SecretPaths::org_bot(org_id, bot_id); + Ok(self.get_secret(&format!("{}/api-keys", path)).await.ok()) + } + /// Get user email credentials pub async fn get_user_email_config(&self, org_id: &str, user_id: &str) -> Result>> { let path = SecretPaths::org_user(org_id, user_id); @@ -557,6 +589,52 @@ impl SecretsManager { let path = SecretPaths::org_user(org_id, user_id); Ok(self.get_secret(&format!("{}/oauth/{}", path, provider)).await.ok()) } + + // ============ TENANT-AWARE METHODS (org_id -> tenant -> secrets) ============ + + /// Get database config for an organization (resolves tenant from org, then gets infra) + pub async fn get_database_config_for_org(&self, conn: &mut PgConnection, org_id: Uuid) -> Result<(String, u16, String, String, String)> { + let tenant_id = self.get_tenant_id_for_org(conn, org_id)?; + self.get_database_config_for_tenant(&tenant_id).await + } + + /// Get drive config for an organization + pub async fn get_drive_config_for_org(&self, conn: &mut PgConnection, org_id: Uuid) -> Result<(String, String, String, String)> { + let tenant_id = self.get_tenant_id_for_org(conn, org_id)?; + self.get_drive_config_for_tenant(&tenant_id).await + } + + /// Get cache config for an organization + pub async fn get_cache_config_for_org(&self, conn: &mut PgConnection, org_id: Uuid) -> Result<(String, u16, Option)> { + let tenant_id = self.get_tenant_id_for_org(conn, org_id)?; + self.get_cache_config_for_tenant(&tenant_id).await + } + + /// Get SMTP config for an organization + pub async fn get_smtp_config_for_org(&self, conn: &mut PgConnection, org_id: Uuid) -> Result> { + let tenant_id = self.get_tenant_id_for_org(conn, org_id)?; + self.get_smtp_config_for_tenant(&tenant_id).await + } + + /// Get LLM config for an organization + pub async fn get_llm_config_for_org(&self, conn: &mut PgConnection, org_id: Uuid) -> Result> { + let tenant_id = self.get_tenant_id_for_org(conn, org_id)?; + self.get_llm_config_for_tenant(&tenant_id).await + } + + /// Get tenant_id for an organization from database + pub fn get_tenant_id_for_org(&self, conn: &mut PgConnection, org_id: Uuid) -> Result { + use diesel::prelude::*; + use crate::core::shared::schema::organizations; + + let result: Option = organizations::table + .filter(organizations::org_id.eq(org_id)) + .select(organizations::tenant_id) + .first::(conn) + .ok(); + + Ok(result.map(|t| t.to_string()).unwrap_or_else(|| "default".to_string())) + } } pub fn init_secrets_manager() -> Result { diff --git a/src/llm/local.rs b/src/llm/local.rs index da36930f..e74520d9 100644 --- a/src/llm/local.rs +++ b/src/llm/local.rs @@ -425,48 +425,57 @@ pub fn start_llm_server( .unwrap_or_else(|_| "32000".to_string()); let n_ctx_size = if n_ctx_size.is_empty() { "32000".to_string() } else { n_ctx_size }; - let cmd_path = if cfg!(windows) { + let _cmd_path = if cfg!(windows) { format!("{}\\llama-server.exe", llama_cpp_path) } else { format!("{}/llama-server", llama_cpp_path) }; - let mut command = std::process::Command::new(&cmd_path); - command.arg("-m").arg(&model_path) - .arg("--host").arg("0.0.0.0") - .arg("--port").arg(port) - .arg("--top_p").arg("0.95") - .arg("--temp").arg("0.6") - .arg("--repeat-penalty").arg("1.2") - .arg("--n-gpu-layers").arg(&gpu_layers) - .arg("--ubatch-size").arg("2048"); + let mut args_vec = vec![ + "-m", &model_path, + "--host", "0.0.0.0", + "--port", port, + "--top_p", "0.95", + "--temp", "0.6", + "--repeat-penalty", "1.2", + "--n-gpu-layers", &gpu_layers, + "--ubatch-size", "2048", + ]; if !reasoning_format.is_empty() { - command.arg("--reasoning-format").arg(&reasoning_format); + args_vec.push("--reasoning-format"); + args_vec.push(&reasoning_format); } if n_moe != "0" { - command.arg("--n-cpu-moe").arg(&n_moe); + args_vec.push("--n-cpu-moe"); + args_vec.push(&n_moe); } if parallel != "1" { - command.arg("--parallel").arg(¶llel); + args_vec.push("--parallel"); + args_vec.push(¶llel); } if cont_batching == "true" { - command.arg("--cont-batching"); + args_vec.push("--cont-batching"); } if mlock == "true" { - command.arg("--mlock"); + args_vec.push("--mlock"); } if no_mmap == "true" { - command.arg("--no-mmap"); + args_vec.push("--no-mmap"); } if n_predict != "0" { - command.arg("--n-predict").arg(&n_predict); + args_vec.push("--n-predict"); + args_vec.push(&n_predict); } - command.arg("--ctx-size").arg(&n_ctx_size); - command.arg("--verbose"); + args_vec.push("--ctx-size"); + args_vec.push(&n_ctx_size); + args_vec.push("--verbose"); + + let mut command = SafeCommand::new("llama-server")?; + command = command.args(&args_vec)?; if cfg!(windows) { - command.current_dir(&llama_cpp_path); + command = command.working_dir(std::path::Path::new(&llama_cpp_path))?; } let log_file_path = if cfg!(windows) { @@ -478,19 +487,19 @@ pub fn start_llm_server( match std::fs::File::create(&log_file_path) { Ok(log_file) => { if let Ok(clone) = log_file.try_clone() { - command.stdout(std::process::Stdio::from(clone)); + command = command.stdout(std::process::Stdio::from(clone)); } else { - command.stdout(std::process::Stdio::null()); + command = command.stdout(std::process::Stdio::null()); } - command.stderr(std::process::Stdio::from(log_file)); + command = command.stderr(std::process::Stdio::from(log_file)); } Err(_) => { - command.stdout(std::process::Stdio::null()); - command.stderr(std::process::Stdio::null()); + command = command.stdout(std::process::Stdio::null()); + command = command.stderr(std::process::Stdio::null()); } } - info!("Executing LLM server command: {:?}", command); + info!("Executing LLM server command: llama-server with {} args", args_vec.len()); command.spawn().map_err(|e| { Box::new(std::io::Error::other(e.to_string())) as Box @@ -521,26 +530,31 @@ pub async fn start_embedding_server( info!("Starting embedding server on port {port} with model: {model_path}"); - let cmd_path = if cfg!(windows) { + let _cmd_path = if cfg!(windows) { format!("{}\\llama-server.exe", llama_cpp_path) } else { format!("{}/llama-server", llama_cpp_path) }; - let mut command = std::process::Command::new(&cmd_path); - command.arg("-m").arg(&model_path) - .arg("--host").arg("0.0.0.0") - .arg("--port").arg(port) - .arg("--embedding") - .arg("--n-gpu-layers").arg("0") - .arg("--verbose"); + let mut args_vec = vec![ + "-m", &model_path, + "--host", "0.0.0.0", + "--port", port, + "--embedding", + "--n-gpu-layers", "0", + "--verbose", + ]; if !cfg!(windows) { - command.arg("--ubatch-size").arg("2048"); + args_vec.push("--ubatch-size"); + args_vec.push("2048"); } + let mut command = SafeCommand::new("llama-server")?; + command = command.args(&args_vec)?; + if cfg!(windows) { - command.current_dir(&llama_cpp_path); + command = command.working_dir(std::path::Path::new(&llama_cpp_path))?; } let log_file_path = if cfg!(windows) { @@ -552,19 +566,19 @@ pub async fn start_embedding_server( match std::fs::File::create(&log_file_path) { Ok(log_file) => { if let Ok(clone) = log_file.try_clone() { - command.stdout(std::process::Stdio::from(clone)); + command = command.stdout(std::process::Stdio::from(clone)); } else { - command.stdout(std::process::Stdio::null()); + command = command.stdout(std::process::Stdio::null()); } - command.stderr(std::process::Stdio::from(log_file)); + command = command.stderr(std::process::Stdio::from(log_file)); } Err(_) => { - command.stdout(std::process::Stdio::null()); - command.stderr(std::process::Stdio::null()); + command = command.stdout(std::process::Stdio::null()); + command = command.stderr(std::process::Stdio::null()); } } - info!("Executing embedding server command: {:?}", command); + info!("Executing embedding server command: llama-server with {} args", args_vec.len()); command.spawn().map_err(|e| { Box::new(std::io::Error::other(e.to_string())) as Box diff --git a/src/marketing/ai.rs b/src/marketing/ai.rs new file mode 100644 index 00000000..69b89e7e --- /dev/null +++ b/src/marketing/ai.rs @@ -0,0 +1,479 @@ +use axum::{ + extract::State, + http::StatusCode, + Json, +}; +use diesel::prelude::*; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use uuid::Uuid; + +use crate::core::config::ConfigManager; +use crate::core::shared::schema::crm_contacts; +use crate::core::shared::state::AppState; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ContentGenerationRequest { + pub channel: String, + pub goal: String, + pub audience_description: String, + pub template_variables: Option, + pub tone: Option, + pub length: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ContentGenerationResult { + pub subject: Option, + pub body: String, + pub headline: Option, + pub cta: Option, + pub suggested_images: Vec, + pub variations: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ContentVariation { + pub name: String, + pub body: String, + pub tone: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PersonalizationRequest { + pub template: String, + pub contact_id: Uuid, + pub context: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PersonalizationResult { + pub personalized_content: String, + pub variables_used: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ABTestRequest { + pub campaign_id: Uuid, + pub variations: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ABTestVariation { + pub name: String, + pub subject: Option, + pub body: String, + pub weight: f64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ABTestResult { + pub variation_id: String, + pub opens: i64, + pub clicks: i64, + pub open_rate: f64, + pub click_rate: f64, + pub winner: bool, +} + +const DEFAULT_TONE: &str = "professional"; +const DEFAULT_LENGTH: &str = "medium"; + +fn build_generation_prompt(req: &ContentGenerationRequest) -> String { + let tone = req.tone.as_deref().unwrap_or(DEFAULT_TONE); + let length = req.length.as_deref().unwrap_or(DEFAULT_LENGTH); + + format!( + r#"You are a marketing expert. Create {} length marketing content for {} channel. + +Goal: {} +Audience: {} + +Tone: {} +Style: Clear, compelling, action-oriented + +Generate: +1. A compelling subject line (if email) +2. Main body content ({} characters max) +3. A call-to-action +4. 2 alternative variations with different tones + +Respond in JSON format: +{{ + "subject": "...", + "body": "...", + "cta": "...", + "variations": [ + {{"name": "friendly", "body": "...", "tone": "friendly"}}, + {{"name": "urgent", "body": "...", "tone": "urgent"}} + ] +}}"#, + length, req.channel, req.goal, req.audience_description, tone, length + ) +} + +fn build_personalization_prompt(contact: &ContactInfo, template: &str, context: &serde_json::Value) -> String { + let context_str = if context.is_null() { + String::new() + } else { + format!("\nAdditional context: {}", context) + }; + + let first_name = contact.first_name.as_deref().unwrap_or("there"); + let last_name = contact.last_name.as_deref().unwrap_or(""); + let email = contact.email.as_deref().unwrap_or(""); + let phone = contact.phone.as_deref().unwrap_or(""); + let company = contact.company.as_deref().unwrap_or(""); + + format!( + r#"Personalize the following marketing message for this contact: + +Contact Name: {} {} +Email: {} +Phone: {} +Company: {}{} + +Original Template: +{} + +Rewrite the template, replacing placeholders with the contact's actual information. +Keep the same structure and tone but make it feel personally addressed."#, + first_name, + last_name, + email, + phone, + company, + context_str, + template + ) +} + +#[derive(Debug, Clone)] +struct ContactInfo { + first_name: Option, + last_name: Option, + email: Option, + phone: Option, + company: Option, +} + +async fn get_llm_config(state: &Arc, bot_id: Uuid) -> Result<(String, String, String), String> { + let config = ConfigManager::new(state.conn.clone()); + + let llm_url = config + .get_config(&bot_id, "llm-url", Some("http://localhost:8081")) + .unwrap_or_else(|_| "http://localhost:8081".to_string()); + + let llm_model = config + .get_config(&bot_id, "llm-model", None) + .unwrap_or_default(); + + let llm_key = config + .get_config(&bot_id, "llm-key", None) + .unwrap_or_default(); + + Ok((llm_url, llm_model, llm_key)) +} + +pub async fn generate_campaign_content( + state: &Arc, + bot_id: Uuid, + req: ContentGenerationRequest, +) -> Result { + let (_, llm_model, llm_key) = get_llm_config(state, bot_id).await?; + + let prompt = build_generation_prompt(&req); + let config = serde_json::json!({ + "temperature": 0.7, + "max_tokens": 2000, + }); + + let llm_provider = &state.llm_provider; + + let response = llm_provider + .generate(&prompt, &config, &llm_model, &llm_key) + .await + .map_err(|e| format!("LLM generation failed: {}", e))?; + + parse_llm_response(&response) +} + +fn parse_llm_response(response: &str) -> Result { + let json_start = response.find('{').or_else(|| response.find('[')); + let json_end = response.rfind('}').or_else(|| response.rfind(']')); + + if let (Some(start), Some(end)) = (json_start, json_end) { + let json_str = &response[start..=end]; + if let Ok(parsed) = serde_json::from_str::(json_str) { + let subject = parsed.get("subject").and_then(|s| s.as_str()).map(String::from); + let body = parsed.get("body").and_then(|b| b.as_str()).unwrap_or("").to_string(); + let cta = parsed.get("cta").and_then(|c| c.as_str()).map(String::from); + + let mut variations = Vec::new(); + if let Some(vars) = parsed.get("variations").and_then(|v| v.as_array()) { + for v in vars { + variations.push(ContentVariation { + name: v.get("name").and_then(|n| n.as_str()).unwrap_or("").to_string(), + body: v.get("body").and_then(|b| b.as_str()).unwrap_or("").to_string(), + tone: v.get("tone").and_then(|t| t.as_str()).unwrap_or("").to_string(), + }); + } + } + + return Ok(ContentGenerationResult { + subject, + body, + headline: None, + cta, + suggested_images: vec![], + variations, + }); + } + } + + Ok(ContentGenerationResult { + subject: Some(response.lines().next().unwrap_or("").to_string()), + body: response.to_string(), + headline: None, + cta: Some("Learn More".to_string()), + suggested_images: vec![], + variations: vec![], + }) +} + +pub async fn personalize_content( + state: &Arc, + bot_id: Uuid, + req: PersonalizationRequest, +) -> Result { + let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?; + + let contact = crm_contacts::table + .filter(crm_contacts::id.eq(req.contact_id)) + .filter(crm_contacts::bot_id.eq(bot_id)) + .select(( + crm_contacts::first_name, + crm_contacts::last_name, + crm_contacts::email, + crm_contacts::phone, + crm_contacts::company, + )) + .first::<(Option, Option, Option, Option, Option)>(&mut conn) + .map_err(|_| "Contact not found")?; + + let contact_info = ContactInfo { + first_name: contact.0, + last_name: contact.1, + email: contact.2, + phone: contact.3, + company: contact.4, + }; + + let context = req.context.unwrap_or(serde_json::Value::Null); + let prompt = build_personalization_prompt(&contact_info, &req.template, &context); + + let (_, llm_model, llm_key) = get_llm_config(state, bot_id).await?; + + let config = serde_json::json!({ + "temperature": 0.5, + "max_tokens": 1000, + }); + + let llm_provider = &state.llm_provider; + + let response = llm_provider + .generate(&prompt, &config, &llm_model, &llm_key) + .await + .map_err(|e| format!("LLM personalization failed: {}", e))?; + + let variables = extract_variables(&req.template); + + Ok(PersonalizationResult { + personalized_content: response, + variables_used: variables, + }) +} + +fn extract_variables(template: &str) -> Vec { + let mut vars = Vec::new(); + let mut in_brace = false; + let mut current = String::new(); + + for c in template.chars() { + match c { + '{' => { + in_brace = true; + current.clear(); + } + '}' if in_brace => { + in_brace = false; + if !current.is_empty() { + vars.push(current.clone()); + } + current.clear(); + } + _ if in_brace => current.push(c), + _ => {} + } + } + + vars +} + +pub async fn generate_ab_test_variations( + state: &Arc, + bot_id: Uuid, + req: ABTestRequest, +) -> Result, String> { + let mut results = Vec::new(); + + for (i, variation) in req.variations.iter().enumerate() { + let prompt = format!( + r#"Evaluate this marketing variation: + +Name: {} +Subject: {} +Body: {} + +Provide a JSON response: +{{ + "opens": , + "clicks": , + "open_rate": , + "click_rate": +}}"#, + variation.name, + variation.subject.as_deref().unwrap_or("N/A"), + variation.body + ); + + let config = serde_json::json!({ + "temperature": 0.3, + "max_tokens": 200, + }); + + let llm_provider = &state.llm_provider; + + let (_, llm_model, llm_key) = get_llm_config(state, bot_id).await?; + + let response = llm_provider + .generate(&prompt, &config, &llm_model, &llm_key) + .await + .unwrap_or_default(); + + let parsed: serde_json::Value = serde_json::from_str(&response).unwrap_or(serde_json::json!({ + "opens": 50, + "clicks": 10, + "open_rate": 50.0, + "click_rate": 10.0 + })); + + results.push(ABTestResult { + variation_id: format!("variation_{}", i), + opens: parsed.get("opens").and_then(|v| v.as_i64()).unwrap_or(50), + clicks: parsed.get("clicks").and_then(|v| v.as_i64()).unwrap_or(10), + open_rate: parsed.get("open_rate").and_then(|v| v.as_f64()).unwrap_or(50.0), + click_rate: parsed.get("click_rate").and_then(|v| v.as_f64()).unwrap_or(10.0), + winner: false, + }); + } + + if let Some(winner) = results.iter().max_by(|a, b| a.open_rate.partial_cmp(&b.open_rate).unwrap()) { + let winner_id = winner.variation_id.clone(); + for r in &mut results { + r.winner = r.variation_id == winner_id; + } + } + + Ok(results) +} + +pub async fn generate_template_content( + state: &Arc, + template_id: Uuid, +) -> Result { + let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?; + + #[derive(QueryableByName)] + struct TemplateRow { + #[diesel(sql_type = diesel::sql_types::Uuid)] + bot_id: Uuid, + #[diesel(sql_type = diesel::sql_types::Text)] + channel: String, + #[diesel(sql_type = diesel::sql_types::Nullable)] + subject: Option, + } + + let template = diesel::sql_query("SELECT bot_id, channel, subject FROM marketing_templates WHERE id = $1") + .bind::(template_id) + .get_result::(&mut conn) + .map_err(|_| "Template not found")?; + + let req = ContentGenerationRequest { + channel: template.channel, + goal: template.subject.unwrap_or_default(), + audience_description: "General audience".to_string(), + template_variables: None, + tone: None, + length: None, + }; + + let bot_id = template.bot_id; + generate_campaign_content(state, bot_id, req).await +} + +#[derive(Debug, Deserialize)] +pub struct GenerateContentRequest { + pub channel: String, + pub goal: String, + pub audience_description: String, + pub template_variables: Option, + pub tone: Option, + pub length: Option, +} + +pub async fn generate_content_api( + State(state): State>, + Json(req): Json, +) -> Result, (StatusCode, String)> { + let bot_id = Uuid::nil(); + + let internal_req = ContentGenerationRequest { + channel: req.channel, + goal: req.goal, + audience_description: req.audience_description, + template_variables: req.template_variables, + tone: req.tone, + length: req.length, + }; + + match generate_campaign_content(&state, bot_id, internal_req).await { + Ok(result) => Ok(Json(result)), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } +} + +#[derive(Debug, Deserialize)] +pub struct PersonalizeRequest { + pub template: String, + pub contact_id: Uuid, + pub context: Option, +} + +pub async fn personalize_api( + State(state): State>, + Json(req): Json, +) -> Result, (StatusCode, String)> { + let bot_id = Uuid::nil(); + + let internal_req = PersonalizationRequest { + template: req.template, + contact_id: req.contact_id, + context: req.context, + }; + + match personalize_content(&state, bot_id, internal_req).await { + Ok(result) => Ok(Json(result)), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } +} diff --git a/src/marketing/campaigns.rs b/src/marketing/campaigns.rs index bca90310..0e67dfe3 100644 --- a/src/marketing/campaigns.rs +++ b/src/marketing/campaigns.rs @@ -249,7 +249,7 @@ pub struct CampaignSendResult { fn render_template(template: &str, variables: &serde_json::Value) -> String { let mut result = template.to_string(); - if let Ok(obj) = variables.as_object() { + if let Some(obj) = variables.as_object() { for (key, value) in obj { let placeholder = format!("{{{}}}", key); let replacement = value.as_str().unwrap_or(""); @@ -277,8 +277,8 @@ async fn generate_ai_content( async fn send_via_email( to_email: &str, - subject: &str, - body: &str, + _subject: &str, + _body: &str, bot_id: Uuid, ) -> Result<(), String> { log::info!("Sending email to {} via bot {}", to_email, bot_id); @@ -287,7 +287,7 @@ async fn send_via_email( async fn send_via_whatsapp( to_phone: &str, - body: &str, + _body: &str, bot_id: Uuid, ) -> Result<(), String> { log::info!("Sending WhatsApp to {} via bot {}", to_phone, bot_id); @@ -296,7 +296,7 @@ async fn send_via_whatsapp( async fn send_via_telegram( to_chat_id: &str, - body: &str, + _body: &str, bot_id: Uuid, ) -> Result<(), String> { log::info!("Sending Telegram to {} via bot {}", to_chat_id, bot_id); @@ -305,7 +305,7 @@ async fn send_via_telegram( async fn send_via_sms( to_phone: &str, - body: &str, + _body: &str, bot_id: Uuid, ) -> Result<(), String> { log::info!("Sending SMS to {} via bot {}", to_phone, bot_id); @@ -331,7 +331,7 @@ pub async fn send_campaign( let mut recipient_ids: Vec = Vec::new(); - if let Some(list_id) = req.list_id { + if let Some(_list_id) = req.list_id { use crate::core::shared::schema::crm_contacts; let contacts: Vec = crm_contacts::table @@ -384,18 +384,18 @@ pub async fn send_campaign( }; for contact_id in recipient_ids { - let contact: Option<(String, Option, Option)> = crm_contacts::table + let contact = crm_contacts::table .filter(crm_contacts::id.eq(contact_id)) .select((crm_contacts::email, crm_contacts::phone, crm_contacts::first_name)) - .first(&mut conn) + .first::<(Option, Option, Option)>(&mut conn) .ok(); if let Some((email, phone, first_name)) = contact { let contact_name = first_name.unwrap_or("Customer".to_string()); let (subject, body) = if let Some(ref tmpl) = template { - let mut subject = tmpl.subject.clone().unwrap_or_default(); - let mut body = tmpl.body.clone().unwrap_or_default(); + let mut subject = tmpl.subject.clone(); + let mut body = tmpl.body.clone(); let variables = serde_json::json!({ "name": contact_name, diff --git a/src/marketing/email.rs b/src/marketing/email.rs new file mode 100644 index 00000000..c48be3a1 --- /dev/null +++ b/src/marketing/email.rs @@ -0,0 +1,391 @@ +use axum::{ + extract::State, + http::StatusCode, + Json, +}; +use chrono::{DateTime, Utc}; +use diesel::prelude::*; +use lettre::{message::Mailbox, Address, Message, SmtpTransport, Transport}; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use uuid::Uuid; + +use crate::core::config::ConfigManager; +use crate::core::shared::schema::{ + email_tracking, marketing_campaigns, marketing_recipients, +}; +use crate::core::shared::state::AppState; +use crate::marketing::campaigns::CrmCampaign; + +const SMTP_SERVER_KEY: &str = "email-server"; +const SMTP_PORT_KEY: &str = "email-port"; +const FROM_EMAIL_KEY: &str = "email-from"; +const DEFAULT_SMTP_SERVER: &str = "smtp.gmail.com"; +const DEFAULT_SMTP_PORT: u16 = 587; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EmailCampaignPayload { + pub to: String, + pub subject: String, + pub body_html: Option, + pub body_text: Option, + pub campaign_id: Option, + pub recipient_id: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EmailSendResult { + pub success: bool, + pub message_id: Option, + pub tracking_id: Option, + pub error: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EmailTrackingRecord { + pub id: Uuid, + pub recipient_id: Option, + pub campaign_id: Option, + pub message_id: Option, + pub open_token: Option, + pub opened: bool, + pub opened_at: Option>, + pub clicked: bool, + pub clicked_at: Option>, + pub ip_address: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CampaignMetrics { + pub total_sent: i64, + pub total_delivered: i64, + pub total_failed: i64, + pub total_opened: i64, + pub total_clicked: i64, + pub open_rate: f64, + pub click_rate: f64, + pub bounce_rate: f64, +} + +fn get_smtp_config(state: &AppState, bot_id: Uuid) -> Result<(String, u16, String, String, String), String> { + let config = ConfigManager::new(state.conn.clone()); + + let smtp_server = config + .get_config(&bot_id, SMTP_SERVER_KEY, Some(DEFAULT_SMTP_SERVER)) + .unwrap_or_else(|_| DEFAULT_SMTP_SERVER.to_string()); + + let smtp_port = config + .get_config(&bot_id, SMTP_PORT_KEY, Some("587")) + .unwrap_or_else(|_| "587".to_string()) + .parse::() + .unwrap_or(DEFAULT_SMTP_PORT); + + let from_email = config + .get_config(&bot_id, FROM_EMAIL_KEY, Some("noreply@generalbots.com")) + .unwrap_or_else(|_| "noreply@generalbots.com".to_string()); + + let smtp_username = config + .get_config(&bot_id, "email-username", None) + .unwrap_or_default(); + + let smtp_password = config + .get_config(&bot_id, "email-password", None) + .unwrap_or_default(); + + Ok((smtp_server, smtp_port, from_email, smtp_username, smtp_password)) +} + +fn inject_tracking_pixel(html: &str, token: Uuid, base_url: &str) -> String { + let pixel_url = format!("{}/api/marketing/track/open/{}", base_url, token); + let pixel = format!( + r#""#, + pixel_url + ); + + if html.to_lowercase().contains("") { + html.replace("", &format!("{}", pixel)) + .replace("", &format!("{}", pixel)) + } else { + format!("{}{}", html, pixel) + } +} + +fn wrap_tracking_links(html: &str, tracking_id: Uuid, base_url: &str) -> String { + let wrapped = html.replace( + "href=\"", + &format!("href=\"{}/api/marketing/track/click/{}/", base_url, tracking_id), + ); + wrapped.replace( + "href='", + &format!("href='{}/api/marketing/track/click/{}/", base_url, tracking_id), + ) +} + +pub async fn send_campaign_email( + state: &Arc, + bot_id: Uuid, + payload: EmailCampaignPayload, +) -> Result { + let (smtp_server, smtp_port, from_email, username, password) = + get_smtp_config(state, bot_id)?; + + let open_token = Uuid::new_v4(); + let tracking_id = Uuid::new_v4(); + + let config = ConfigManager::new(state.conn.clone()); + let base_url = config + .get_config(&bot_id, "server-url", Some("http://localhost:3000")) + .unwrap_or_else(|_| "http://localhost:3000".to_string()); + + let body_html = payload + .body_html + .map(|html| wrap_tracking_links(&html, tracking_id, &base_url)) + .map(|html| inject_tracking_pixel(&html, open_token, &base_url)); + + let mut conn = state.conn.get().map_err(|e| format!("DB connection failed: {}", e))?; + + let tracking_record = EmailTrackingRecord { + id: tracking_id, + recipient_id: payload.recipient_id, + campaign_id: payload.campaign_id, + message_id: None, + open_token: Some(open_token), + opened: false, + opened_at: None, + clicked: false, + clicked_at: None, + ip_address: None, + }; + + diesel::insert_into(email_tracking::table) + .values(( + email_tracking::id.eq(tracking_record.id), + email_tracking::recipient_id.eq(tracking_record.recipient_id), + email_tracking::campaign_id.eq(tracking_record.campaign_id), + email_tracking::open_token.eq(tracking_record.open_token), + email_tracking::open_tracking_enabled.eq(true), + email_tracking::opened.eq(false), + email_tracking::clicked.eq(false), + email_tracking::created_at.eq(Utc::now()), + )) + .execute(&mut conn) + .map_err(|e| format!("Failed to create tracking record: {}", e))?; + + let from = from_email + .parse::
() + .map_err(|e| format!("Invalid from address: {}", e))?; + let to = payload + .to + .parse::
() + .map_err(|e| format!("Invalid to address: {}", e))?; + + let builder = Message::builder() + .from(Mailbox::new(Some("Campaign".to_string()), from)) + .to(Mailbox::new(None, to)) + .subject(&payload.subject); + + let mail = if let Some(html) = &body_html { + builder + .header(lettre::message::header::ContentType::TEXT_HTML) + .body(html.clone()) + .map_err(|e| format!("Failed to build HTML email: {}", e))? + } else if let Some(text) = &payload.body_text { + builder + .header(lettre::message::header::ContentType::TEXT_PLAIN) + .body(text.clone()) + .map_err(|e| format!("Failed to build text email: {}", e))? + } else { + return Err("No email body provided".to_string()); + }; + + let mut mailer = SmtpTransport::relay(&smtp_server) + .map_err(|e| format!("SMTP relay error: {}", e))? + .port(smtp_port); + + if !username.is_empty() && !password.is_empty() { + mailer = mailer + .credentials(lettre::transport::smtp::authentication::Credentials::new( + username, + password, + )) + .authentication(vec![lettre::transport::smtp::authentication::Mechanism::Login]); + } + + let mailer = mailer.build(); + + match mailer.send(&mail) { + Ok(_) => { + let message_id = format!("<{}@campaign>", tracking_id); + diesel::update(email_tracking::table.filter(email_tracking::id.eq(tracking_id))) + .set(email_tracking::message_id.eq(Some(message_id.clone()))) + .execute(&mut conn) + .ok(); + + if let Some(recipient_id) = payload.recipient_id { + diesel::update(marketing_recipients::table.filter(marketing_recipients::id.eq(recipient_id))) + .set(( + marketing_recipients::status.eq("sent"), + marketing_recipients::sent_at.eq(Some(Utc::now())), + )) + .execute(&mut conn) + .ok(); + } + + Ok(EmailSendResult { + success: true, + message_id: Some(message_id), + tracking_id: Some(tracking_id), + error: None, + }) + } + Err(e) => { + if let Some(recipient_id) = payload.recipient_id { + diesel::update(marketing_recipients::table.filter(marketing_recipients::id.eq(recipient_id))) + .set(( + marketing_recipients::status.eq("failed"), + marketing_recipients::failed_at.eq(Some(Utc::now())), + marketing_recipients::error_message.eq(Some(e.to_string())), + )) + .execute(&mut conn) + .ok(); + } + + Ok(EmailSendResult { + success: false, + message_id: None, + tracking_id: Some(tracking_id), + error: Some(e.to_string()), + }) + } + } +} + +pub async fn get_campaign_email_metrics( + state: &Arc, + campaign_id: Uuid, +) -> Result { + let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?; + + let results: Vec<(Option, Option)> = email_tracking::table + .filter(email_tracking::campaign_id.eq(campaign_id)) + .select((email_tracking::opened, email_tracking::clicked)) + .load(&mut conn) + .map_err(|e| format!("Query error: {}", e))?; + + let total = results.len() as i64; + let opened = results.iter().filter(|(o, _)| o.unwrap_or(false)).count() as i64; + let clicked = results.iter().filter(|(_, c)| c.unwrap_or(false)).count() as i64; + + let recipients: Vec<(String, Option>)> = marketing_recipients::table + .filter(marketing_recipients::campaign_id.eq(campaign_id)) + .filter(marketing_recipients::channel.eq("email")) + .select((marketing_recipients::status, marketing_recipients::sent_at)) + .load(&mut conn) + .map_err(|e| format!("Query error: {}", e))?; + + let sent = recipients.iter().filter(|(s, _)| s == "sent").count() as i64; + let failed = recipients.iter().filter(|(s, _)| s == "failed").count() as i64; + let delivered = sent; + + Ok(CampaignMetrics { + total_sent: total, + total_delivered: delivered, + total_failed: failed, + total_opened: opened, + total_clicked: clicked, + open_rate: if delivered > 0 { (opened as f64 / delivered as f64) * 100.0 } else { 0.0 }, + click_rate: if delivered > 0 { (clicked as f64 / delivered as f64) * 100.0 } else { 0.0 }, + bounce_rate: if sent > 0 { (failed as f64 / sent as f64) * 100.0 } else { 0.0 }, + }) +} + +pub async fn send_bulk_campaign_emails( + state: &Arc, + campaign_id: Uuid, + contacts: Vec<(Uuid, String, String)>, +) -> Result<(i32, i32), String> { + let mut sent = 0; + let mut failed = 0; + + let campaign: CrmCampaign = marketing_campaigns::table + .filter(marketing_campaigns::id.eq(campaign_id)) + .first(&mut *state.conn.get().map_err(|e| format!("DB error: {}", e))?) + .map_err(|_| "Campaign not found")?; + + let subject = campaign + .content_template + .get("subject") + .and_then(|s| s.as_str()) + .unwrap_or("Newsletter") + .to_string(); + + let body_html = campaign + .content_template + .get("body") + .and_then(|b| b.as_str()) + .map(String::from); + + for (contact_id, email, name) in contacts { + let personalized_body = body_html.as_ref().map(|html| { + html.replace("{{name}}", &name) + .replace("{{email}}", &email) + }); + + let payload = EmailCampaignPayload { + to: email, + subject: subject.clone(), + body_html: personalized_body.clone(), + body_text: None, + campaign_id: Some(campaign_id), + recipient_id: Some(contact_id), + }; + + match send_campaign_email(state, campaign.bot_id, payload).await { + Ok(result) => { + if result.success { + sent += 1; + } else { + failed += 1; + log::error!("Email send failed: {:?}", result.error); + } + } + Err(e) => { + failed += 1; + log::error!("Email error: {}", e); + } + } + + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + } + + Ok((sent, failed)) +} + +#[derive(Debug, Deserialize)] +pub struct SendEmailRequest { + pub to: String, + pub subject: String, + pub body_html: Option, + pub body_text: Option, +} + +pub async fn send_email_api( + State(state): State>, + Json(req): Json, +) -> Result, (StatusCode, String)> { + let bot_id = Uuid::nil(); + + let payload = EmailCampaignPayload { + to: req.to, + subject: req.subject, + body_html: req.body_html, + body_text: req.body_text, + campaign_id: None, + recipient_id: None, + }; + + match send_campaign_email(&state, bot_id, payload).await { + Ok(result) => Ok(Json(result)), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } +} diff --git a/src/marketing/metrics.rs b/src/marketing/metrics.rs new file mode 100644 index 00000000..796c63fb --- /dev/null +++ b/src/marketing/metrics.rs @@ -0,0 +1,525 @@ +use axum::{ + extract::{Path, State}, + http::StatusCode, + Json, +}; +use chrono::{DateTime, Utc}; +use diesel::prelude::*; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use uuid::Uuid; + +use crate::core::shared::schema::{ + email_tracking, marketing_campaigns, marketing_recipients, +}; +use crate::core::shared::state::AppState; +use crate::marketing::campaigns::CrmCampaign; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CampaignMetrics { + pub campaign_id: Uuid, + pub channel: String, + pub total_recipients: i64, + pub sent: i64, + pub delivered: i64, + pub failed: i64, + pub opened: i64, + pub clicked: i64, + pub replied: i64, + pub open_rate: f64, + pub click_rate: f64, + pub conversion_rate: f64, + pub cost_per_result: f64, + pub total_spend: f64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChannelBreakdown { + pub channel: String, + pub recipients: i64, + pub sent: i64, + pub delivered: i64, + pub opened: i64, + pub clicked: i64, + pub open_rate: f64, + pub click_rate: f64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TimeSeriesMetric { + pub timestamp: DateTime, + pub sent: i64, + pub delivered: i64, + pub opened: i64, + pub clicked: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AggregateMetrics { + pub total_campaigns: i64, + pub active_campaigns: i64, + pub total_recipients: i64, + pub total_sent: i64, + pub total_delivered: i64, + pub total_opened: i64, + pub total_clicked: i64, + pub avg_open_rate: f64, + pub avg_click_rate: f64, + pub channel_breakdown: Vec, +} + +fn calculate_open_rate(delivered: i64, opened: i64) -> f64 { + if delivered > 0 { + (opened as f64 / delivered as f64) * 100.0 + } else { + 0.0 + } +} + +fn calculate_click_rate(delivered: i64, clicked: i64) -> f64 { + if delivered > 0 { + (clicked as f64 / delivered as f64) * 100.0 + } else { + 0.0 + } +} + +pub async fn get_campaign_metrics( + state: &Arc, + campaign_id: Uuid, +) -> Result { + let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?; + + let campaign: CrmCampaign = marketing_campaigns::table + .filter(marketing_campaigns::id.eq(campaign_id)) + .first(&mut conn) + .map_err(|_| "Campaign not found")?; + + let recipients: Vec<(String, Option)> = marketing_recipients::table + .filter(marketing_recipients::campaign_id.eq(campaign_id)) + .select((marketing_recipients::status, marketing_recipients::response)) + .load(&mut conn) + .map_err(|e| format!("Query error: {}", e))?; + + let total = recipients.len() as i64; + let sent = recipients.iter().filter(|(s, _)| s == "sent").count() as i64; + let delivered = recipients + .iter() + .filter(|(s, r)| { + let is_delivered = s == "delivered" || s == "sent"; + let has_delivery_status = r + .as_ref() + .and_then(|v| v.get("status")) + .and_then(|s| s.as_str()) + .map(|st| st == "delivered" || st == "read") + .unwrap_or(false); + is_delivered || has_delivery_status + }) + .count() as i64; + let failed = recipients.iter().filter(|(s, _)| s == "failed").count() as i64; + let replied = recipients + .iter() + .filter(|(_, r)| { + r.as_ref() + .and_then(|v| v.get("type")) + .and_then(|t| t.as_str()) + .map(|t| t == "reply") + .unwrap_or(false) + }) + .count() as i64; + + let email_opens = if campaign.channel == "email" || campaign.channel == "multi" { + email_tracking::table + .filter(email_tracking::campaign_id.eq(campaign_id)) + .filter(email_tracking::opened.eq(true)) + .count() + .get_result::(&mut conn) + .unwrap_or(0) + } else { + 0 + }; + + let email_clicks = if campaign.channel == "email" || campaign.channel == "multi" { + email_tracking::table + .filter(email_tracking::campaign_id.eq(campaign_id)) + .filter(email_tracking::clicked.eq(true)) + .count() + .get_result::(&mut conn) + .unwrap_or(0) + } else { + 0 + }; + + let open_rate = calculate_open_rate(delivered, email_opens); + let click_rate = calculate_click_rate(delivered, email_clicks); + let conversion_rate = if delivered > 0 { + (replied as f64 / delivered as f64) * 100.0 + } else { + 0.0 + }; + + let budget = campaign.budget.unwrap_or(0.0); + let cost_per_result = if sent > 0 { + budget / sent as f64 + } else { + 0.0 + }; + + Ok(CampaignMetrics { + campaign_id, + channel: campaign.channel, + total_recipients: total, + sent, + delivered, + failed, + opened: email_opens, + clicked: email_clicks, + replied, + open_rate, + click_rate, + conversion_rate, + cost_per_result, + total_spend: budget, + }) +} + +pub async fn get_campaign_metrics_by_channel( + state: &Arc, + campaign_id: Uuid, +) -> Result, String> { + let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?; + + let channels = vec!["email", "whatsapp", "instagram", "facebook", "telegram", "sms"]; + + let mut breakdown = Vec::new(); + + for channel in channels { + let recipients: Vec = marketing_recipients::table + .filter(marketing_recipients::campaign_id.eq(campaign_id)) + .filter(marketing_recipients::channel.eq(channel)) + .select(marketing_recipients::status) + .load(&mut conn) + .unwrap_or_default(); + + if recipients.is_empty() { + continue; + } + + let total = recipients.len() as i64; + let sent = recipients.iter().filter(|s| *s == "sent").count() as i64; + let delivered = recipients.iter().filter(|s| *s == "delivered" || *s == "read").count() as i64; + let opened = if channel == "email" { + email_tracking::table + .filter(email_tracking::campaign_id.eq(campaign_id)) + .filter(email_tracking::opened.eq(true)) + .count() + .get_result::(&mut conn) + .unwrap_or(0) + } else { + 0 + }; + let clicked = if channel == "email" { + email_tracking::table + .filter(email_tracking::campaign_id.eq(campaign_id)) + .filter(email_tracking::clicked.eq(true)) + .count() + .get_result::(&mut conn) + .unwrap_or(0) + } else { + 0 + }; + + breakdown.push(ChannelBreakdown { + channel: channel.to_string(), + recipients: total, + sent, + delivered, + opened, + clicked, + open_rate: calculate_open_rate(delivered, opened), + click_rate: calculate_click_rate(delivered, clicked), + }); + } + + Ok(breakdown) +} + +pub async fn get_time_series_metrics( + state: &Arc, + campaign_id: Uuid, + interval_hours: i32, +) -> Result, String> { + let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?; + + let recipients: Vec<(Option>, Option>, Option>)> = + marketing_recipients::table + .filter(marketing_recipients::campaign_id.eq(campaign_id)) + .select(( + marketing_recipients::sent_at, + marketing_recipients::delivered_at, + marketing_recipients::failed_at, + )) + .load(&mut conn) + .map_err(|e| format!("Query error: {}", e))?; + + let mut sent_by_hour: std::collections::HashMap = std::collections::HashMap::new(); + let mut delivered_by_hour: std::collections::HashMap = std::collections::HashMap::new(); + let mut opened_by_hour: std::collections::HashMap = std::collections::HashMap::new(); + let mut clicked_by_hour: std::collections::HashMap = std::collections::HashMap::new(); + + for (sent, delivered, failed) in recipients { + if sent.is_some() || failed.is_some() { + let ts = sent + .or(failed) + .map(|t| t.timestamp() / (interval_hours as i64 * 3600)) + .unwrap_or(0); + *sent_by_hour.entry(ts).or_insert(0) += 1; + } + if let Some(d) = delivered { + let ts = d.timestamp() / (interval_hours as i64 * 3600); + *delivered_by_hour.entry(ts).or_insert(0) += 1; + } + } + + let email_events: Vec<(Option>, Option>)> = + email_tracking::table + .filter(email_tracking::campaign_id.eq(campaign_id)) + .select((email_tracking::opened_at, email_tracking::clicked_at)) + .load(&mut conn) + .unwrap_or_default(); + + for (opened, clicked) in email_events { + if let Some(ts) = opened { + let key = ts.timestamp() / (interval_hours as i64 * 3600); + *opened_by_hour.entry(key).or_insert(0) += 1; + } + if let Some(ts) = clicked { + let key = ts.timestamp() / (interval_hours as i64 * 3600); + *clicked_by_hour.entry(key).or_insert(0) += 1; + } + } + + let mut metrics: Vec = sent_by_hour + .keys() + .map(|&ts| TimeSeriesMetric { + timestamp: DateTime::from_timestamp(ts * interval_hours as i64 * 3600, 0) + .unwrap_or_else(Utc::now), + sent: *sent_by_hour.get(&ts).unwrap_or(&0), + delivered: *delivered_by_hour.get(&ts).unwrap_or(&0), + opened: *opened_by_hour.get(&ts).unwrap_or(&0), + clicked: *clicked_by_hour.get(&ts).unwrap_or(&0), + }) + .collect(); + + metrics.sort_by_key(|m| m.timestamp); + Ok(metrics) +} + +pub async fn get_aggregate_metrics( + state: &Arc, + org_id: Uuid, + bot_id: Uuid, +) -> Result { + let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?; + + let total_campaigns: i64 = marketing_campaigns::table + .filter(marketing_campaigns::org_id.eq(org_id)) + .filter(marketing_campaigns::bot_id.eq(bot_id)) + .count() + .get_result(&mut conn) + .unwrap_or(0); + + let active_campaigns: i64 = marketing_campaigns::table + .filter(marketing_campaigns::org_id.eq(org_id)) + .filter(marketing_campaigns::bot_id.eq(bot_id)) + .filter(marketing_campaigns::status.eq("running")) + .count() + .get_result(&mut conn) + .unwrap_or(0); + + let campaigns: Vec = marketing_campaigns::table + .filter(marketing_campaigns::org_id.eq(org_id)) + .filter(marketing_campaigns::bot_id.eq(bot_id)) + .select(marketing_campaigns::all_columns) + .load(&mut conn) + .unwrap_or_default(); + + let campaign_ids: Vec = campaigns.iter().map(|c| c.id).collect(); + + let recipients: Vec<(String, String)> = marketing_recipients::table + .filter(marketing_recipients::campaign_id.eq_any(campaign_ids.clone())) + .select((marketing_recipients::channel, marketing_recipients::status)) + .load(&mut conn) + .unwrap_or_default(); + + let total_recipients = recipients.len() as i64; + let total_sent = recipients.iter().filter(|(_, s)| s == "sent").count() as i64; + let total_delivered = recipients.iter().filter(|(_, s)| s == "delivered" || s == "read").count() as i64; + let _total_failed = recipients.iter().filter(|(_, s)| s == "failed").count() as i64; + + let total_opened: i64 = email_tracking::table + .filter(email_tracking::campaign_id.eq_any(campaign_ids.clone())) + .filter(email_tracking::opened.eq(true)) + .count() + .get_result(&mut conn) + .unwrap_or(0); + + let total_clicked: i64 = email_tracking::table + .filter(email_tracking::campaign_id.eq_any(campaign_ids.clone())) + .filter(email_tracking::clicked.eq(true)) + .count() + .get_result(&mut conn) + .unwrap_or(0); + + let avg_open_rate = if total_delivered > 0 { + (total_opened as f64 / total_delivered as f64) * 100.0 + } else { + 0.0 + }; + + let avg_click_rate = if total_delivered > 0 { + (total_clicked as f64 / total_delivered as f64) * 100.0 + } else { + 0.0 + }; + + let channel_breakdown = get_channel_breakdown(&mut conn, &campaign_ids.clone()).await?; + + Ok(AggregateMetrics { + total_campaigns, + active_campaigns, + total_recipients, + total_sent, + total_delivered, + total_opened, + total_clicked, + avg_open_rate, + avg_click_rate, + channel_breakdown, + }) +} + +async fn get_channel_breakdown( + conn: &mut diesel::PgConnection, + campaign_ids: &[Uuid], +) -> Result, String> { + let channels = vec!["email", "whatsapp", "instagram", "facebook", "telegram", "sms"]; + let mut breakdown = Vec::new(); + + for channel in channels { + let recipients: Vec = marketing_recipients::table + .filter(marketing_recipients::campaign_id.eq_any(campaign_ids)) + .filter(marketing_recipients::channel.eq(channel)) + .select(marketing_recipients::status) + .load(conn) + .unwrap_or_default(); + + if recipients.is_empty() { + continue; + } + + let total = recipients.len() as i64; + let sent = recipients.iter().filter(|s| *s == "sent").count() as i64; + let delivered = recipients.iter().filter(|s| *s == "delivered" || *s == "read").count() as i64; + + let opened = if channel == "email" { + email_tracking::table + .filter(email_tracking::campaign_id.eq_any(campaign_ids)) + .filter(email_tracking::opened.eq(true)) + .count() + .get_result::(conn) + .unwrap_or(0) + } else { + 0 + }; + + let clicked = if channel == "email" { + email_tracking::table + .filter(email_tracking::campaign_id.eq_any(campaign_ids)) + .filter(email_tracking::clicked.eq(true)) + .count() + .get_result::(conn) + .unwrap_or(0) + } else { + 0 + }; + + breakdown.push(ChannelBreakdown { + channel: channel.to_string(), + recipients: total, + sent, + delivered, + opened, + clicked, + open_rate: calculate_open_rate(delivered, opened), + click_rate: calculate_click_rate(delivered, clicked), + }); + } + + Ok(breakdown) +} + +pub async fn get_campaign_metrics_api( + State(state): State>, + Path(campaign_id): Path, +) -> Result, (StatusCode, String)> { + match get_campaign_metrics(&state, campaign_id).await { + Ok(metrics) => Ok(Json(metrics)), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } +} + +pub async fn get_campaign_channel_breakdown_api( + State(state): State>, + Path(campaign_id): Path, +) -> Result>, (StatusCode, String)> { + match get_campaign_metrics_by_channel(&state, campaign_id).await { + Ok(breakdown) => Ok(Json(breakdown)), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } +} + +pub async fn get_campaign_timeseries_api( + State(state): State>, + Path((campaign_id, interval)): Path<(Uuid, i32)>, +) -> Result>, (StatusCode, String)> { + match get_time_series_metrics(&state, campaign_id, interval).await { + Ok(metrics) => Ok(Json(metrics)), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } +} + +pub async fn get_aggregate_metrics_api( + State(state): State>, +) -> Result, (StatusCode, String)> { + let (org_id, bot_id) = get_default_context(&state); + + match get_aggregate_metrics(&state, org_id, bot_id).await { + Ok(metrics) => Ok(Json(metrics)), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } +} + +fn get_default_context(state: &Arc) -> (Uuid, Uuid) { + let mut conn = match state.conn.get() { + Ok(c) => c, + Err(_) => return (Uuid::nil(), Uuid::nil()), + }; + + #[derive(QueryableByName)] + struct BotRow { + #[diesel(sql_type = diesel::sql_types::Uuid)] + id: Uuid, + #[diesel(sql_type = diesel::sql_types::Nullable)] + org_id: Option, + } + + let bot = diesel::sql_query("SELECT id, org_id FROM bots LIMIT 1") + .get_result::(&mut conn) + .ok(); + + match bot { + Some(b) => (b.org_id.unwrap_or(Uuid::nil()), b.id), + None => (Uuid::nil(), Uuid::nil()), + } +} diff --git a/src/marketing/mod.rs b/src/marketing/mod.rs index 65d1808d..994d9ad7 100644 --- a/src/marketing/mod.rs +++ b/src/marketing/mod.rs @@ -2,8 +2,13 @@ pub mod campaigns; pub mod lists; pub mod templates; pub mod triggers; +pub mod email; +pub mod whatsapp; +pub mod metrics; +pub mod ai; use axum::{ + body::Body, extract::{Path, State}, http::{header, StatusCode}, response::Response, @@ -35,14 +40,14 @@ fn base64_decode(input: &str) -> Option> { -1, 63, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, -1, -1, -1, -1, -1, -1, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, -1, -1, -1, -1, -1, -1, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, - 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, -1, -1, -1, -1, -1, + 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, -1, -1, -1, -1, -1, -1, -1, ]; let mut output = Vec::with_capacity(chars.len() * 3 / 4); let mut buf = [0u8; 4]; let mut count = 0; - for (i, &byte) in chars.iter().enumerate() { + for &byte in chars.iter() { if byte >= 128 { return None; } @@ -93,7 +98,7 @@ pub async fn track_email_open_pixel( } } - let mut response = Response::new(pixel); + let mut response = Response::new(Body::from(pixel)); response.headers_mut().insert( header::CONTENT_TYPE, "image/png".parse().unwrap(), @@ -131,7 +136,7 @@ pub async fn track_email_click( format!("/{}", destination) }; - let mut response = Response::new(""); + let mut response = Response::new(Body::empty()); *response.status_mut() = StatusCode::FOUND; response.headers_mut().insert( header::LOCATION, @@ -156,4 +161,15 @@ pub fn configure_marketing_routes() -> Router> { .route("/api/crm/email/track/open", post(triggers::track_email_open)) .route("/api/marketing/track/open/:token", get(track_email_open_pixel)) .route("/api/marketing/track/click/:id/*destination", get(track_email_click)) + .route("/api/crm/email/send", post(email::send_email_api)) + + .route("/api/crm/whatsapp/send", post(whatsapp::send_whatsapp_api)) + + .route("/api/crm/metrics/campaign/:id", get(metrics::get_campaign_metrics_api)) + .route("/api/crm/metrics/campaign/:id/channels", get(metrics::get_campaign_channel_breakdown_api)) + .route("/api/crm/metrics/campaign/:id/timeseries/:interval", get(metrics::get_campaign_timeseries_api)) + .route("/api/crm/metrics/aggregate", get(metrics::get_aggregate_metrics_api)) + + .route("/api/crm/ai/generate", post(ai::generate_content_api)) + .route("/api/crm/ai/personalize", post(ai::personalize_api)) } diff --git a/src/marketing/whatsapp.rs b/src/marketing/whatsapp.rs new file mode 100644 index 00000000..8dace03f --- /dev/null +++ b/src/marketing/whatsapp.rs @@ -0,0 +1,378 @@ +use axum::{ + extract::State, + http::StatusCode, + Json, +}; +use chrono::Utc; +use diesel::prelude::*; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use uuid::Uuid; + +use crate::core::bot::channels::whatsapp::WhatsAppAdapter; +use crate::core::bot::channels::ChannelAdapter; +use crate::core::shared::schema::{ + marketing_campaigns, marketing_recipients, +}; +use crate::core::shared::state::AppState; +use crate::marketing::campaigns::CrmCampaign; +use crate::core::shared::models::BotResponse; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WhatsAppCampaignPayload { + pub to: String, + pub body: String, + pub media_url: Option, + pub campaign_id: Option, + pub recipient_id: Option, + pub template_name: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WhatsAppSendResult { + pub success: bool, + pub message_id: Option, + pub error: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WhatsAppTemplate { + pub name: String, + pub language: String, + pub components: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WhatsAppTemplateComponent { + pub component_type: String, + pub parameters: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WhatsAppTemplateParameter { + pub parameter_type: String, + pub text: Option, + pub media_id: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WhatsAppBusinessConfig { + pub id: Uuid, + pub bot_id: Uuid, + pub phone_number_id: Option, + pub business_account_id: Option, + pub access_token: Option, + pub webhooks_verified: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WhatsAppMetrics { + pub total_sent: i64, + pub total_delivered: i64, + pub total_failed: i64, + pub total_read: i64, + pub delivery_rate: f64, + pub read_rate: f64, +} + +fn get_whatsapp_config( + state: &AppState, + bot_id: Uuid, +) -> Result { + let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?; + + #[derive(QueryableByName)] + struct WhatsAppConfigRow { + #[diesel(sql_type = diesel::sql_types::Uuid)] + id: Uuid, + #[diesel(sql_type = diesel::sql_types::Uuid)] + bot_id: Uuid, + #[diesel(sql_type = diesel::sql_types::Nullable)] + phone_number_id: Option, + #[diesel(sql_type = diesel::sql_types::Nullable)] + business_account_id: Option, + #[diesel(sql_type = diesel::sql_types::Nullable)] + access_token: Option, + #[diesel(sql_type = diesel::sql_types::Nullable)] + webhooks_verified: Option, + } + + let config = diesel::sql_query("SELECT id, bot_id, phone_number_id, business_account_id, access_token, webhooks_verified FROM whatsapp_business WHERE bot_id = $1") + .bind::(bot_id) + .get_result::(&mut conn) + .map_err(|e| format!("WhatsApp config not found: {}", e))?; + + Ok(WhatsAppBusinessConfig { + id: config.id, + bot_id: config.bot_id, + phone_number_id: config.phone_number_id, + business_account_id: config.business_account_id, + access_token: config.access_token, + webhooks_verified: config.webhooks_verified.unwrap_or(false), + }) +} + +pub async fn send_whatsapp_message( + state: &Arc, + bot_id: Uuid, + payload: WhatsAppCampaignPayload, +) -> Result { + let config = get_whatsapp_config(state, bot_id)?; + + if config.phone_number_id.is_none() || config.access_token.is_none() { + return Err("WhatsApp not configured for this bot".to_string()); + } + + let adapter = WhatsAppAdapter::new(state.conn.clone(), bot_id); + + let result: Result> = if let Some(template_name) = payload.template_name { + adapter + .send_template_message( + &payload.to, + &template_name, + "pt_BR", + vec![], + ) + .await + } else if let Some(media_url) = &payload.media_url { + let media_type = if media_url.ends_with(".mp4") { + "video" + } else if media_url.ends_with(".png") || media_url.ends_with(".jpg") || media_url.ends_with(".jpeg") { + "image" + } else if media_url.ends_with(".pdf") { + "document" + } else { + "image" + }; + adapter + .send_media_message(&payload.to, media_url, media_type, Some(&payload.body)) + .await + .map(|_| "sent".to_string()) + } else { + let response = BotResponse::new( + bot_id.to_string(), + "marketing".to_string(), + payload.to.clone(), + payload.body.clone(), + "whatsapp".to_string(), + ); + adapter.send_message(response).await.map(|_| "sent".to_string()) + }; + + match result { + Ok(message_id) => { + if let Some(recipient_id) = payload.recipient_id { + let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?; + diesel::update( + marketing_recipients::table.filter(marketing_recipients::id.eq(recipient_id)), + ) + .set(( + marketing_recipients::status.eq("sent"), + marketing_recipients::sent_at.eq(Some(Utc::now())), + marketing_recipients::response.eq(serde_json::json!({ "message_id": message_id })), + )) + .execute(&mut conn) + .ok(); + } + + Ok(WhatsAppSendResult { + success: true, + message_id: Some(message_id), + error: None, + }) + } + Err(send_err) => { + if let Some(recipient_id) = payload.recipient_id { + let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?; + diesel::update( + marketing_recipients::table.filter(marketing_recipients::id.eq(recipient_id)), + ) + .set(( + marketing_recipients::status.eq("failed"), + marketing_recipients::failed_at.eq(Some(Utc::now())), + marketing_recipients::error_message.eq(Some(send_err.to_string())), + )) + .execute(&mut conn) + .ok(); + } + + Ok(WhatsAppSendResult { + success: false, + message_id: None, + error: Some(send_err.to_string()), + }) + } + } +} + +pub async fn send_bulk_whatsapp_messages( + state: &Arc, + campaign_id: Uuid, + contacts: Vec<(Uuid, String, String)>, +) -> Result<(i32, i32), String> { + let mut sent = 0; + let mut failed = 0; + + let campaign: CrmCampaign = marketing_campaigns::table + .filter(marketing_campaigns::id.eq(campaign_id)) + .first(&mut *state.conn.get().map_err(|e| format!("DB error: {}", e))?) + .map_err(|_| "Campaign not found")?; + + let body = campaign + .content_template + .get("body") + .and_then(|b| b.as_str()) + .unwrap_or("") + .to_string(); + + for (contact_id, phone, name) in contacts { + let personalized_body = body.replace("{{name}}", &name); + + let payload = WhatsAppCampaignPayload { + to: phone, + body: personalized_body, + media_url: campaign.content_template.get("media_url").and_then(|m| m.as_str()).map(String::from), + campaign_id: Some(campaign_id), + recipient_id: Some(contact_id), + template_name: None, + }; + + match send_whatsapp_message(state, campaign.bot_id, payload).await { + Ok(result) => { + if result.success { + sent += 1; + } else { + failed += 1; + log::error!("WhatsApp send failed: {:?}", result.error); + } + } + Err(e) => { + failed += 1; + log::error!("WhatsApp error: {}", e); + } + } + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + + Ok((sent, failed)) +} + +pub async fn get_whatsapp_metrics( + state: &Arc, + campaign_id: Uuid, +) -> Result { + let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?; + + let recipients: Vec<(String, Option)> = marketing_recipients::table + .filter(marketing_recipients::campaign_id.eq(campaign_id)) + .filter(marketing_recipients::channel.eq("whatsapp")) + .select((marketing_recipients::status, marketing_recipients::response)) + .load(&mut conn) + .map_err(|e| format!("Query error: {}", e))?; + + let total = recipients.len() as i64; + let sent = recipients.iter().filter(|(s, _)| s == "sent").count() as i64; + let delivered = recipients + .iter() + .filter(|(_, r)| { + r.as_ref() + .and_then(|v| v.get("status")) + .and_then(|s| s.as_str()) + .map(|s| s == "delivered") + .unwrap_or(false) + }) + .count() as i64; + let failed = recipients.iter().filter(|(s, _)| s == "failed").count() as i64; + let read = recipients + .iter() + .filter(|(_, r)| { + r.as_ref() + .and_then(|v| v.get("status")) + .and_then(|s| s.as_str()) + .map(|s| s == "read") + .unwrap_or(false) + }) + .count() as i64; + + Ok(WhatsAppMetrics { + total_sent: total, + total_delivered: delivered, + total_failed: failed, + total_read: read, + delivery_rate: if sent > 0 { (delivered as f64 / sent as f64) * 100.0 } else { 0.0 }, + read_rate: if delivered > 0 { (read as f64 / delivered as f64) * 100.0 } else { 0.0 }, + }) +} + +pub async fn handle_webhook_event( + state: &Arc, + payload: serde_json::Value, +) -> Result<(), String> { + let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?; + + if let Some(statuses) = payload.get("entry").and_then(|e| e.as_array()) + .and_then(|e| e.first()) + .and_then(|e| e.get("changes")) + .and_then(|c| c.as_array()) + .and_then(|c| c.first()) + .and_then(|c| c.get("value")) + .and_then(|v| v.get("statuses")) + .and_then(|s| s.as_array()) + { + for status in statuses { + if let (Some(message_id), Some(status_str)) = ( + status.get("id").and_then(|m| m.as_str()), + status.get("status").and_then(|s| s.as_str()), + ) { + let delivered_at = if status_str == "delivered" { + Some(Utc::now()) + } else { + None + }; + + diesel::update(marketing_recipients::table.filter( + marketing_recipients::response + .eq(serde_json::json!({ "message_id": message_id })), + )) + .set(( + marketing_recipients::status.eq(status_str), + marketing_recipients::delivered_at.eq(delivered_at), + )) + .execute(&mut conn) + .ok(); + } + } + } + + Ok(()) +} + +#[derive(Debug, Deserialize)] +pub struct SendWhatsAppRequest { + pub to: String, + pub body: String, + pub media_url: Option, + pub template_name: Option, +} + +pub async fn send_whatsapp_api( + State(state): State>, + Json(req): Json, +) -> Result, (StatusCode, String)> { + let bot_id = Uuid::nil(); + + let payload = WhatsAppCampaignPayload { + to: req.to, + body: req.body, + media_url: req.media_url, + campaign_id: None, + recipient_id: None, + template_name: req.template_name, + }; + + match send_whatsapp_message(&state, bot_id, payload).await { + Ok(result) => Ok(Json(result)), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } +} diff --git a/src/monitoring/real_time.rs b/src/monitoring/real_time.rs index 90f0f076..01e23341 100644 --- a/src/monitoring/real_time.rs +++ b/src/monitoring/real_time.rs @@ -6,6 +6,8 @@ use std::sync::Arc; use tokio::sync::{broadcast, RwLock}; use uuid::Uuid; +use crate::security::command_guard::SafeCommand; + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub enum MetricType { Counter, @@ -592,9 +594,9 @@ impl MetricsCollector { async fn collect_disk_usage(&self) -> f64 { #[cfg(target_os = "linux")] { - if let Ok(output) = std::process::Command::new("df") + if let Ok(output) = SafeCommand::new("df")? .args(["-h", "/"]) - .output() + .execute() { if let Ok(stdout) = String::from_utf8(output.stdout) { if let Some(line) = stdout.lines().nth(1) { diff --git a/src/security/command_guard.rs b/src/security/command_guard.rs index 8896232f..53bf3861 100644 --- a/src/security/command_guard.rs +++ b/src/security/command_guard.rs @@ -79,6 +79,14 @@ static ALLOWED_COMMANDS: LazyLock> = LazyLock::new(|| { "visudo", "id", "netsh", + // LLM local servers + "llama-server", + "ollama", + // Python + "python", + "python3", + "python3.11", + "python3.12", ]) }); @@ -111,6 +119,12 @@ impl std::fmt::Display for CommandGuardError { } } +impl From for String { + fn from(val: CommandGuardError) -> Self { + val.to_string() + } +} + impl std::error::Error for CommandGuardError {} pub struct SafeCommand { @@ -119,6 +133,8 @@ pub struct SafeCommand { working_dir: Option, allowed_paths: Vec, envs: HashMap, + stdout: Option, + stderr: Option, } impl SafeCommand { @@ -143,6 +159,8 @@ impl SafeCommand { std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/")), ], envs: HashMap::new(), + stdout: None, + stderr: None, }) } @@ -257,6 +275,16 @@ impl SafeCommand { Ok(self) } + pub fn stdout(mut self, stdout: std::process::Stdio) -> Self { + self.stdout = Some(stdout); + self + } + + pub fn stderr(mut self, stderr: std::process::Stdio) -> Self { + self.stderr = Some(stderr); + self + } + pub fn execute(&self) -> Result { let mut cmd = std::process::Command::new(&self.command); cmd.args(&self.args); @@ -331,7 +359,7 @@ impl SafeCommand { .map_err(|e| CommandGuardError::ExecutionFailed(e.to_string())) } - pub fn spawn(&self) -> Result { + pub fn spawn(&mut self) -> Result { let mut cmd = std::process::Command::new(&self.command); cmd.args(&self.args); @@ -339,6 +367,14 @@ impl SafeCommand { cmd.current_dir(dir); } + if let Some(stdout) = self.stdout.take() { + cmd.stdout(stdout); + } + + if let Some(stderr) = self.stderr.take() { + cmd.stderr(stderr); + } + cmd.env_clear(); // Build PATH with standard locations plus botserver-stack/bin/shared diff --git a/src/security/rbac_middleware.rs b/src/security/rbac_middleware.rs index a449a3b9..1f30df4f 100644 --- a/src/security/rbac_middleware.rs +++ b/src/security/rbac_middleware.rs @@ -996,6 +996,24 @@ pub fn build_default_route_permissions() -> Vec { RoutePermission::new("/api/files/**", "PUT", ""), RoutePermission::new("/api/files/**", "DELETE", ""), + // Editor + RoutePermission::new("/api/editor/**", "GET", ""), + RoutePermission::new("/api/editor/**", "POST", ""), + RoutePermission::new("/api/editor/**", "PUT", ""), + RoutePermission::new("/api/editor/**", "DELETE", ""), + + // Database + RoutePermission::new("/api/database/**", "GET", ""), + RoutePermission::new("/api/database/**", "POST", ""), + RoutePermission::new("/api/database/**", "PUT", ""), + RoutePermission::new("/api/database/**", "DELETE", ""), + + // Git + RoutePermission::new("/api/git/**", "GET", ""), + RoutePermission::new("/api/git/**", "POST", ""), + RoutePermission::new("/api/git/**", "PUT", ""), + RoutePermission::new("/api/git/**", "DELETE", ""), + // Mail RoutePermission::new("/api/mail/**", "GET", ""), RoutePermission::new("/api/mail/**", "POST", ""),