LXD proxy and container improvements
Some checks failed
BotServer CI / build (push) Failing after 7m5s

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-03-15 15:50:02 -03:00
parent eb586cf4f9
commit ef426b7a50
21 changed files with 2245 additions and 210 deletions

View file

@ -15,7 +15,7 @@ CREATE TABLE crm_deal_segments (
-- Insert default segments (from gb.rob data) -- Insert default segments (from gb.rob data)
INSERT INTO crm_deal_segments (org_id, bot_id, name) INSERT INTO crm_deal_segments (org_id, bot_id, name)
SELECT org_id, id FROM bots LIMIT 1; SELECT org_id, id, 'Default' FROM bots LIMIT 1;
-- 2. Create main deals table -- 2. Create main deals table
CREATE TABLE crm_deals ( CREATE TABLE crm_deals (
@ -70,7 +70,7 @@ CREATE TABLE crm_deals (
); );
-- 3. Add deal_id to crm_activities (for history migration) -- 3. Add deal_id to crm_activities (for history migration)
ALTER TABLE crm_activities ADD COLUMN deal_id uuid REFERENCES crm_deals(id); -- ALTER TABLE crm_activities ADD COLUMN deal_id uuid REFERENCES crm_deals(id);
-- 4. Create indexes -- 4. Create indexes
CREATE INDEX idx_crm_deals_org_bot ON crm_deals(org_id, bot_id); CREATE INDEX idx_crm_deals_org_bot ON crm_deals(org_id, bot_id);

View file

@ -1,6 +1,6 @@
use axum::{ use axum::{
extract::{ extract::{
query::Query, Query,
State, State,
WebSocketUpgrade, WebSocketUpgrade,
}, },
@ -9,6 +9,7 @@ use axum::{
Json, Router, Json, Router,
}; };
use axum::extract::ws::{Message, WebSocket}; use axum::extract::ws::{Message, WebSocket};
use futures_util::{SinkExt, StreamExt};
use log::{error, info, warn}; use log::{error, info, warn};
use std::{ use std::{
collections::HashMap, collections::HashMap,
@ -17,12 +18,13 @@ use std::{
}; };
use tokio::{ use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
process::{Child, ChildStdin, Command}, process::{Child, ChildStdin},
sync::{mpsc, Mutex, RwLock}, sync::{mpsc, Mutex, RwLock},
}; };
use crate::core::shared::state::AppState; use crate::core::shared::state::AppState;
use crate::core::urls::ApiUrls; use crate::core::urls::ApiUrls;
use crate::security::command_guard::SafeCommand;
pub fn configure_terminal_routes() -> Router<Arc<AppState>> { pub fn configure_terminal_routes() -> Router<Arc<AppState>> {
Router::new() Router::new()
@ -47,6 +49,7 @@ pub struct TerminalSession {
process: Option<Child>, process: Option<Child>,
stdin: Option<Arc<Mutex<ChildStdin>>>, stdin: Option<Arc<Mutex<ChildStdin>>>,
output_tx: mpsc::Sender<TerminalOutput>, output_tx: mpsc::Sender<TerminalOutput>,
output_rx: Option<mpsc::Receiver<TerminalOutput>>,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -63,7 +66,7 @@ impl TerminalSession {
session_id.chars().take(12).collect::<String>() session_id.chars().take(12).collect::<String>()
); );
let (output_tx, _) = mpsc::channel(100); let (output_tx, output_rx) = mpsc::channel(100);
Self { Self {
session_id: session_id.to_string(), session_id: session_id.to_string(),
@ -71,11 +74,12 @@ impl TerminalSession {
process: None, process: None,
stdin: None, stdin: None,
output_tx, output_tx,
output_rx: Some(output_rx),
} }
} }
pub fn output_receiver(&self) -> mpsc::Receiver<TerminalOutput> { pub fn take_output_receiver(&mut self) -> Option<mpsc::Receiver<TerminalOutput>> {
self.output_tx.clone().receiver() self.output_rx.take()
} }
pub async fn start(&mut self) -> Result<(), String> { pub async fn start(&mut self) -> Result<(), String> {
@ -85,10 +89,9 @@ impl TerminalSession {
info!("Starting LXC container: {}", self.container_name); info!("Starting LXC container: {}", self.container_name);
let launch_output = Command::new("lxc") let safe_cmd = SafeCommand::new("lxc").map_err(|e| format!("{}", e))?;
.args(["launch", "ubuntu:22.04", &self.container_name, "-e"]) let safe_cmd = safe_cmd.args(&["launch", "ubuntu:22.04", &self.container_name, "-e"]).map_err(|e| format!("{}", e))?;
.output() let launch_output = safe_cmd.execute_async().await
.await
.map_err(|e| format!("Failed to launch container: {}", e))?; .map_err(|e| format!("Failed to launch container: {}", e))?;
if !launch_output.status.success() { if !launch_output.status.success() {
@ -102,7 +105,10 @@ impl TerminalSession {
info!("Starting bash shell in container: {}", self.container_name); info!("Starting bash shell in container: {}", self.container_name);
let mut child = Command::new("lxc") // SafeCommand doesn't support async piped I/O for interactive terminals.
// Security: container_name is validated (alphanumeric + dash only), commands run
// inside an isolated LXC container, not on the host.
let mut child = tokio::process::Command::new("lxc")
.args(["exec", &self.container_name, "--", "bash", "-l"]) .args(["exec", &self.container_name, "--", "bash", "-l"])
.stdin(Stdio::piped()) .stdin(Stdio::piped())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
@ -175,15 +181,13 @@ impl TerminalSession {
let _ = child.kill().await; let _ = child.kill().await;
} }
let _ = Command::new("lxc") let safe_cmd = SafeCommand::new("lxc").map_err(|e| format!("{}", e))?;
.args(["stop", &self.container_name, "-f"]) let _ = safe_cmd.args(&["stop", &self.container_name, "-f"]).map_err(|e| format!("{}", e))?
.output() .execute_async().await;
.await;
let _ = Command::new("lxc") let safe_cmd = SafeCommand::new("lxc").map_err(|e| format!("{}", e))?;
.args(["delete", &self.container_name, "-f"]) let _ = safe_cmd.args(&["delete", &self.container_name, "-f"]).map_err(|e| format!("{}", e))?
.output() .execute_async().await;
.await;
info!("Container {} destroyed", self.container_name); info!("Container {} destroyed", self.container_name);
Ok(()) Ok(())
@ -191,7 +195,7 @@ impl TerminalSession {
} }
pub struct TerminalManager { pub struct TerminalManager {
sessions: RwLock<HashMap<String, TerminalSession>>, sessions: RwLock<HashMap<String, Arc<Mutex<TerminalSession>>>>,
} }
impl TerminalManager { impl TerminalManager {
@ -218,41 +222,46 @@ impl TerminalManager {
created_at: chrono::Utc::now().to_rfc3339(), created_at: chrono::Utc::now().to_rfc3339(),
}; };
sessions.insert(session_id.to_string(), session); sessions.insert(session_id.to_string(), Arc::new(Mutex::new(session)));
Ok(info) Ok(info)
} }
pub async fn get_session(&self, session_id: &str) -> Option<TerminalSession> { pub async fn get_session(&self, session_id: &str) -> Option<Arc<Mutex<TerminalSession>>> {
let sessions = self.sessions.read().await; let sessions = self.sessions.read().await;
sessions.get(session_id).cloned() sessions.get(session_id).cloned()
} }
pub async fn kill_session(&self, session_id: &str) -> Result<(), String> { pub async fn kill_session(&self, session_id: &str) -> Result<(), String> {
let mut sessions = self.sessions.write().await; let mut sessions = self.sessions.write().await;
if let Some(mut session) = sessions.remove(session_id) { if let Some(session) = sessions.remove(session_id) {
session.kill().await?; let mut s = session.lock().await;
s.kill().await?;
} }
Ok(()) Ok(())
} }
pub async fn list_sessions(&self) -> Vec<TerminalInfo> { pub async fn list_sessions(&self) -> Vec<TerminalInfo> {
let sessions = self.sessions.read().await; let sessions = self.sessions.read().await;
sessions let mut result = Vec::new();
.values() for s in sessions.values() {
.map(|s| TerminalInfo { let session = s.lock().await;
session_id: s.session_id.clone(), result.push(TerminalInfo {
container_name: s.container_name.clone(), session_id: session.session_id.clone(),
container_name: session.container_name.clone(),
status: "running".to_string(), status: "running".to_string(),
created_at: chrono::Utc::now().to_rfc3339(), created_at: chrono::Utc::now().to_rfc3339(),
}) });
.collect() }
result
} }
} }
impl Default for TerminalManager { impl Default for TerminalManager {
fn default() -> Self { fn default() -> Self {
Self::new() Self {
sessions: RwLock::new(HashMap::new()),
}
} }
} }
@ -271,7 +280,7 @@ pub async fn terminal_ws(
let timestamp = SystemTime::now() let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)
.map_err(|e| format!("Time error: {}", e)) .map_err(|e| format!("Time error: {}", e))
.unwrap_or_else(|_| std::time::Duration::ZERO) .unwrap_or(std::time::Duration::ZERO)
.as_millis(); .as_millis();
format!("term-{}", timestamp) format!("term-{}", timestamp)
}); });
@ -316,14 +325,25 @@ async fn handle_terminal_ws(
} }
}; };
let Some(mut session) = session else { let Some(session_arc) = session else {
error!("Failed to get session after creation"); error!("Failed to get session after creation");
return; return;
}; };
let output_rx = session.output_receiver(); let output_rx = {
let session_id_clone = session_id.clone(); let mut session = session_arc.lock().await;
let terminal_manager_clone = terminal_manager.clone(); match session.take_output_receiver() {
Some(rx) => rx,
None => {
error!("Failed to take output receiver");
return;
}
}
};
let _session_id_clone = session_id.clone();
let _terminal_manager_clone = terminal_manager.clone();
let _session_arc_for_send = session_arc.clone();
let _session_arc_for_recv = session_arc.clone();
let mut send_task = tokio::spawn(async move { let mut send_task = tokio::spawn(async move {
let mut rx = output_rx; let mut rx = output_rx;
@ -351,10 +371,10 @@ async fn handle_terminal_ws(
let session_id_clone2 = session_id.clone(); let session_id_clone2 = session_id.clone();
let terminal_manager_clone2 = terminal_manager.clone(); let terminal_manager_clone2 = terminal_manager.clone();
let mut recv_task = tokio::spawn(async move { let mut recv_task = tokio::spawn(async move {
while let Some(msg) = receiver.recv().await { while let Some(msg) = receiver.next().await {
match msg { match msg {
Ok(Message::Text(text)) => { Ok(Message::Text(text)) => {
if let Some(session) = terminal_manager_clone2.get_session(&session_id_clone2).await { if let Some(session_arc) = terminal_manager_clone2.get_session(&session_id_clone2).await {
let trimmed = text.trim(); let trimmed = text.trim();
if trimmed.is_empty() { if trimmed.is_empty() {
continue; continue;
@ -372,18 +392,22 @@ async fn handle_terminal_ws(
parts[1].parse::<u16>(), parts[1].parse::<u16>(),
parts[2].parse::<u16>(), parts[2].parse::<u16>(),
) { ) {
let session = session_arc.lock().await;
let _ = session.resize(cols, rows).await; let _ = session.resize(cols, rows).await;
} }
} }
continue; continue;
} }
{
let session = session_arc.lock().await;
if let Err(e) = session.send_command(trimmed).await { if let Err(e) = session.send_command(trimmed).await {
error!("Failed to send command: {}", e); error!("Failed to send command: {}", e);
} }
} }
} }
Ok(WsMessage::Close(_)) => break, }
Ok(Message::Close(_)) => break,
Err(e) => { Err(e) => {
error!("WebSocket error: {}", e); error!("WebSocket error: {}", e);
break; break;
@ -423,7 +447,7 @@ pub async fn create_terminal(
let timestamp = SystemTime::now() let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)
.map_err(|e| format!("Time error: {}", e)) .map_err(|e| format!("Time error: {}", e))
.unwrap_or_else(|_| std::time::Duration::ZERO) .unwrap_or(std::time::Duration::ZERO)
.as_millis(); .as_millis();
format!("term-{}", timestamp) format!("term-{}", timestamp)
}); });

View file

@ -1,8 +1,9 @@
use crate::security::command_guard::SafeCommand;
use log::{info, warn}; use log::{info, warn};
use std::process::Stdio; use std::process::Stdio;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStdin, Command}; use tokio::process::{Child, ChildStdin};
use tokio::sync::{mpsc, Mutex}; use tokio::sync::{mpsc, Mutex};
#[derive(Debug)] #[derive(Debug)]
@ -24,10 +25,9 @@ impl ContainerSession {
// Launch the container (this might take a moment if the image isn't cached locally) // Launch the container (this might take a moment if the image isn't cached locally)
info!("Launching LXC container: {}", container_name); info!("Launching LXC container: {}", container_name);
let launch_status = Command::new("lxc") let safe_cmd = SafeCommand::new("lxc").map_err(|e| format!("{}", e))?;
.args(["launch", "ubuntu:22.04", &container_name]) let safe_cmd = safe_cmd.args(&["launch", "ubuntu:22.04", &container_name]).map_err(|e| format!("{}", e))?;
.output() let launch_status = safe_cmd.execute_async().await
.await
.map_err(|e| format!("Failed to execute lxc launch: {}", e))?; .map_err(|e| format!("Failed to execute lxc launch: {}", e))?;
if !launch_status.status.success() { if !launch_status.status.success() {
@ -49,7 +49,10 @@ impl ContainerSession {
pub async fn start_terminal(&mut self, tx: mpsc::Sender<TerminalOutput>) -> Result<(), String> { pub async fn start_terminal(&mut self, tx: mpsc::Sender<TerminalOutput>) -> Result<(), String> {
info!("Starting terminal session in container: {}", self.container_name); info!("Starting terminal session in container: {}", self.container_name);
let mut child = Command::new("lxc") // SafeCommand doesn't support async piped I/O, so we use tokio::process::Command directly.
// Security: container_name is derived from session_id (not user input), and commands run
// inside an isolated LXC container, not on the host.
let mut child = tokio::process::Command::new("lxc")
.args(["exec", &self.container_name, "--", "bash"]) .args(["exec", &self.container_name, "--", "bash"])
.stdin(Stdio::piped()) .stdin(Stdio::piped())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
@ -114,10 +117,9 @@ impl ContainerSession {
} }
// Clean up container // Clean up container
let status = Command::new("lxc") let safe_cmd = SafeCommand::new("lxc").map_err(|e| format!("{}", e))?;
.args(["delete", &self.container_name, "--force"]) let safe_cmd = safe_cmd.args(&["delete", &self.container_name, "--force"]).map_err(|e| format!("{}", e))?;
.output() let status = safe_cmd.execute_async().await
.await
.map_err(|e| format!("Failed to delete container: {}", e))?; .map_err(|e| format!("Failed to delete container: {}", e))?;
if !status.status.success() { if !status.status.success() {

View file

@ -5,6 +5,8 @@ use std::process::{Child, ChildStdin, ChildStdout, Stdio};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{Mutex, RwLock}; use tokio::sync::{Mutex, RwLock};
use crate::security::command_guard::SafeCommand;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PythonFaceDetection { pub struct PythonFaceDetection {
pub face_id: String, pub face_id: String,
@ -195,15 +197,20 @@ impl PythonFaceBridge {
return Ok(()); return Ok(());
} }
let mut child = std::process::Command::new(&self.config.python_path) let python_cmd = std::path::Path::new(&self.config.python_path)
.arg(&self.config.script_path) .file_name()
.arg("--model") .and_then(|n| n.to_str())
.arg(self.config.model.as_str()) .unwrap_or("python3");
.arg(if self.config.gpu_enabled { "--gpu" } else { "--cpu" })
.stdin(Stdio::piped()) let mut command = SafeCommand::new(python_cmd).map_err(|e| PythonBridgeError::ProcessSpawnFailed(e.to_string()))?;
.stdout(Stdio::piped()) command = command.arg(&self.config.script_path).map_err(|e| PythonBridgeError::ProcessSpawnFailed(e.to_string()))?;
.stderr(Stdio::piped()) command = command.arg("--model").map_err(|e| PythonBridgeError::ProcessSpawnFailed(e.to_string()))?;
.spawn() command = command.arg(self.config.model.as_str()).map_err(|e| PythonBridgeError::ProcessSpawnFailed(e.to_string()))?;
command = command.arg(if self.config.gpu_enabled { "--gpu" } else { "--cpu" }).map_err(|e| PythonBridgeError::ProcessSpawnFailed(e.to_string()))?;
command = command.stdout(Stdio::piped());
command = command.stderr(Stdio::piped());
let mut child = command.spawn()
.map_err(|e| PythonBridgeError::ProcessSpawnFailed(e.to_string()))?; .map_err(|e| PythonBridgeError::ProcessSpawnFailed(e.to_string()))?;
let stdin = child let stdin = child

View file

@ -10,7 +10,7 @@ use std::sync::Arc;
use uuid::Uuid; use uuid::Uuid;
use crate::core::bot::get_default_bot; use crate::core::bot::get_default_bot;
use crate::contacts::crm::{CrmAccount, CrmContact, CrmLead, CrmOpportunity}; use crate::contacts::crm::{CrmAccount, CrmContact, CrmDeal, CrmLead, CrmOpportunity};
use crate::core::shared::schema::{crm_accounts, crm_contacts, crm_leads, crm_opportunities}; use crate::core::shared::schema::{crm_accounts, crm_contacts, crm_leads, crm_opportunities};
use crate::core::shared::state::AppState; use crate::core::shared::state::AppState;
@ -51,6 +51,7 @@ pub fn configure_crm_routes() -> Router<Arc<AppState>> {
.route("/api/ui/crm/leads", get(handle_crm_leads)) .route("/api/ui/crm/leads", get(handle_crm_leads))
.route("/api/ui/crm/leads/:id", get(handle_lead_detail)) .route("/api/ui/crm/leads/:id", get(handle_lead_detail))
.route("/api/ui/crm/opportunities", get(handle_crm_opportunities)) .route("/api/ui/crm/opportunities", get(handle_crm_opportunities))
.route("/api/ui/crm/deals", get(handle_crm_deals))
.route("/api/ui/crm/contacts", get(handle_crm_contacts)) .route("/api/ui/crm/contacts", get(handle_crm_contacts))
.route("/api/ui/crm/accounts", get(handle_crm_accounts)) .route("/api/ui/crm/accounts", get(handle_crm_accounts))
.route("/api/ui/crm/search", get(handle_crm_search)) .route("/api/ui/crm/search", get(handle_crm_search))
@ -344,6 +345,67 @@ async fn handle_crm_opportunities(State(state): State<Arc<AppState>>) -> impl In
Html(html) Html(html)
} }
async fn handle_crm_deals(State(state): State<Arc<AppState>>) -> impl IntoResponse {
use crate::core::shared::schema::crm_deals;
let Ok(mut conn) = state.conn.get() else {
return Html(render_empty_table("deals", "💰", "No deals yet", "Create your first deal"));
};
let (org_id, bot_id) = get_bot_context(&state);
let deals: Vec<CrmDeal> = crm_deals::table
.filter(crm_deals::org_id.eq(org_id))
.filter(crm_deals::bot_id.eq(bot_id))
.order(crm_deals::created_at.desc())
.limit(50)
.load(&mut conn)
.unwrap_or_default();
if deals.is_empty() {
return Html(render_empty_table("deals", "💰", "No deals yet", "Create your first deal"));
}
let mut html = String::new();
for deal in deals {
let value_str = deal
.value
.map(|v| format!("${}", v))
.unwrap_or_else(|| "-".to_string());
let expected_close = deal
.expected_close_date
.map(|d: chrono::NaiveDate| d.to_string())
.unwrap_or_else(|| "-".to_string());
let stage = deal.stage.as_deref().unwrap_or("new");
let probability = deal.probability;
html.push_str(&format!(
"<tr class=\"deal-row\" data-id=\"{}\">
<td><input type=\"checkbox\" class=\"row-select\" value=\"{}\"></td>
<td class=\"deal-title\">{}</td>
<td>{}</td>
<td><span class=\"stage-badge stage-{}\">{}</span></td>
<td>{}%</td>
<td>{}</td>
<td class=\"actions\">
<button class=\"btn-icon\" hx-get=\"/api/crm/deals/{}\" hx-target=\"#detail-panel\" title=\"View\">👁</button>
</td>
</tr>",
deal.id,
deal.id,
html_escape(&deal.title.clone().unwrap_or_default()),
value_str,
stage,
stage,
probability,
expected_close,
deal.id
));
}
Html(html)
}
async fn handle_crm_contacts(State(state): State<Arc<AppState>>) -> impl IntoResponse { async fn handle_crm_contacts(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let Ok(mut conn) = state.conn.get() else { let Ok(mut conn) = state.conn.get() else {
return Html(render_empty_table("contacts", "👥", "No contacts yet", "Add contacts to your CRM")); return Html(render_empty_table("contacts", "👥", "No contacts yet", "Add contacts to your CRM"));

View file

@ -3,9 +3,9 @@ use crate::core::bootstrap::bootstrap_types::{BootstrapManager, BootstrapProgres
use crate::core::bootstrap::bootstrap_utils::{cache_health_check, safe_pkill, vault_health_check, vector_db_health_check, zitadel_health_check}; use crate::core::bootstrap::bootstrap_utils::{cache_health_check, safe_pkill, vault_health_check, vector_db_health_check, zitadel_health_check};
use crate::core::config::AppConfig; use crate::core::config::AppConfig;
use crate::core::package_manager::{InstallMode, PackageManager}; use crate::core::package_manager::{InstallMode, PackageManager};
use crate::security::command_guard::SafeCommand;
use log::{info, warn}; use log::{info, warn};
use std::path::PathBuf; use std::path::PathBuf;
use std::process::Command;
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
impl BootstrapManager { impl BootstrapManager {
@ -252,17 +252,24 @@ impl BootstrapManager {
} }
// Caddy is the web server // Caddy is the web server
match Command::new("caddy") let caddy_cmd = SafeCommand::new("caddy")
.arg("validate") .and_then(|c| c.arg("validate"))
.arg("--config") .and_then(|c| c.arg("--config"))
.arg("/etc/caddy/Caddyfile") .and_then(|c| c.arg("/etc/caddy/Caddyfile"));
.output()
{ match caddy_cmd {
Ok(cmd) => {
match cmd.execute() {
Ok(_) => info!("Caddy configuration is valid"), Ok(_) => info!("Caddy configuration is valid"),
Err(e) => { Err(e) => {
warn!("Caddy configuration error: {:?}", e); warn!("Caddy configuration error: {:?}", e);
} }
} }
}
Err(e) => {
warn!("Failed to create caddy command: {:?}", e);
}
}
info!("Bootstrap process completed!"); info!("Bootstrap process completed!");
Ok(()) Ok(())

View file

@ -1,6 +1,6 @@
// Bootstrap utility functions // Bootstrap utility functions
use crate::security::command_guard::SafeCommand;
use log::{debug, info, warn}; use log::{debug, info, warn};
use std::process::Command;
/// Get list of processes to kill /// Get list of processes to kill
pub fn get_processes_to_kill() -> Vec<(&'static str, Vec<&'static str>)> { pub fn get_processes_to_kill() -> Vec<(&'static str, Vec<&'static str>)> {
@ -36,7 +36,9 @@ pub fn safe_pkill(pattern: &[&str], extra_args: &[&str]) {
let mut args: Vec<&str> = extra_args.to_vec(); let mut args: Vec<&str> = extra_args.to_vec();
args.extend(pattern); args.extend(pattern);
let result = Command::new("pkill").args(&args).output(); let result = SafeCommand::new("pkill")
.and_then(|c| c.args(&args))
.and_then(|c| c.execute());
match result { match result {
Ok(output) => { Ok(output) => {
@ -50,10 +52,10 @@ pub fn safe_pkill(pattern: &[&str], extra_args: &[&str]) {
/// Grep for process safely /// Grep for process safely
pub fn safe_pgrep(pattern: &str) -> String { pub fn safe_pgrep(pattern: &str) -> String {
match Command::new("pgrep") match SafeCommand::new("pgrep")
.arg("-a") .and_then(|c| c.arg("-a"))
.arg(pattern) .and_then(|c| c.arg(pattern))
.output() .and_then(|c| c.execute())
{ {
Ok(output) => String::from_utf8_lossy(&output.stdout).to_string(), Ok(output) => String::from_utf8_lossy(&output.stdout).to_string(),
Err(e) => { Err(e) => {
@ -65,18 +67,15 @@ pub fn safe_pgrep(pattern: &str) -> String {
/// Execute curl command safely /// Execute curl command safely
pub fn safe_curl(url: &str) -> String { pub fn safe_curl(url: &str) -> String {
format!( format!("curl -f -s --connect-timeout 5 {}", url)
"curl -f -s --connect-timeout 5 {}",
url
)
} }
/// Execute shell command safely /// Execute shell command safely
pub fn safe_sh_command(command: &str) -> String { pub fn safe_sh_command(command: &str) -> String {
match Command::new("sh") match SafeCommand::new("sh")
.arg("-c") .and_then(|c| c.arg("-c"))
.arg(command) .and_then(|c| c.arg(command))
.output() .and_then(|c| c.execute())
{ {
Ok(output) => String::from_utf8_lossy(&output.stdout).to_string(), Ok(output) => String::from_utf8_lossy(&output.stdout).to_string(),
Err(e) => { Err(e) => {
@ -88,17 +87,14 @@ pub fn safe_sh_command(command: &str) -> String {
/// Check if vault is healthy /// Check if vault is healthy
pub fn vault_health_check() -> bool { pub fn vault_health_check() -> bool {
// Check if vault server is responding
// For now, always return false
false false
} }
/// Check if Valkey/Redis cache is healthy /// Check if Valkey/Redis cache is healthy
pub fn cache_health_check() -> bool { pub fn cache_health_check() -> bool {
// Try valkey-cli first (preferred for Valkey installations) if let Ok(output) = SafeCommand::new("valkey-cli")
if let Ok(output) = Command::new("valkey-cli") .and_then(|c| c.args(&["-h", "127.0.0.1", "-p", "6379", "ping"]))
.args(["-h", "127.0.0.1", "-p", "6379", "ping"]) .and_then(|c| c.execute())
.output()
{ {
if output.status.success() { if output.status.success() {
let response = String::from_utf8_lossy(&output.stdout); let response = String::from_utf8_lossy(&output.stdout);
@ -108,10 +104,9 @@ pub fn cache_health_check() -> bool {
} }
} }
// Try redis-cli as fallback (for Redis installations) if let Ok(output) = SafeCommand::new("redis-cli")
if let Ok(output) = Command::new("redis-cli") .and_then(|c| c.args(&["-h", "127.0.0.1", "-p", "6379", "ping"]))
.args(["-h", "127.0.0.1", "-p", "6379", "ping"]) .and_then(|c| c.execute())
.output()
{ {
if output.status.success() { if output.status.success() {
let response = String::from_utf8_lossy(&output.stdout); let response = String::from_utf8_lossy(&output.stdout);
@ -121,25 +116,24 @@ pub fn cache_health_check() -> bool {
} }
} }
// If CLI tools are not available, try TCP connection test using nc (netcat) match SafeCommand::new("nc")
// nc -z tests if port is open without sending data .and_then(|c| c.args(&["-z", "-w", "1", "127.0.0.1", "6379"]))
match Command::new("nc") .and_then(|c| c.execute())
.args(["-z", "-w", "1", "127.0.0.1", "6379"])
.output()
{ {
Ok(output) => output.status.success(), Ok(output) => output.status.success(),
Err(_) => { Err(_) => {
// Final fallback: try /dev/tcp with actual PING test match SafeCommand::new("bash")
match Command::new("bash") .and_then(|c| c.arg("-c"))
.arg("-c") .and_then(|c| {
.arg( c.arg(
"exec 3<>/dev/tcp/127.0.0.1/6379 2>/dev/null && \ "exec 3<>/dev/tcp/127.0.0.1/6379 2>/dev/null && \
echo -e 'PING\r\n' >&3 && \ echo -e 'PING\r\n' >&3 && \
read -t 1 response <&3 && \ read -t 1 response <&3 && \
[[ \"$response\" == *PONG* ]] && \ [[ \"$response\" == *PONG* ]] && \
exec 3>&-", exec 3>&-",
) )
.output() })
.and_then(|c| c.execute())
{ {
Ok(output) => output.status.success(), Ok(output) => output.status.success(),
Err(_) => false, Err(_) => false,
@ -150,20 +144,17 @@ pub fn cache_health_check() -> bool {
/// Check if Qdrant vector database is healthy /// Check if Qdrant vector database is healthy
pub fn vector_db_health_check() -> bool { pub fn vector_db_health_check() -> bool {
// Qdrant has a /healthz endpoint, use curl to check
// Try both HTTP and HTTPS
let urls = [ let urls = [
"http://localhost:6333/healthz", "http://localhost:6333/healthz",
"https://localhost:6333/healthz", "https://localhost:6333/healthz",
]; ];
for url in &urls { for url in &urls {
if let Ok(output) = Command::new("curl") if let Ok(output) = SafeCommand::new("curl")
.args(["-f", "-s", "--connect-timeout", "2", "-k", url]) .and_then(|c| c.args(&["-f", "-s", "--connect-timeout", "2", "-k", url]))
.output() .and_then(|c| c.execute())
{ {
if output.status.success() { if output.status.success() {
// Qdrant healthz returns "OK" or JSON with status
let response = String::from_utf8_lossy(&output.stdout); let response = String::from_utf8_lossy(&output.stdout);
if response.contains("OK") || response.contains("\"status\":\"ok\"") { if response.contains("OK") || response.contains("\"status\":\"ok\"") {
return true; return true;
@ -172,10 +163,9 @@ pub fn vector_db_health_check() -> bool {
} }
} }
// Fallback: just check if port 6333 is listening match SafeCommand::new("nc")
match Command::new("nc") .and_then(|c| c.args(&["-z", "-w", "1", "127.0.0.1", "6333"]))
.args(["-z", "-w", "1", "127.0.0.1", "6333"]) .and_then(|c| c.execute())
.output()
{ {
Ok(output) => output.status.success(), Ok(output) => output.status.success(),
Err(_) => false, Err(_) => false,
@ -184,15 +174,12 @@ pub fn vector_db_health_check() -> bool {
/// Get current user safely /// Get current user safely
pub fn safe_fuser() -> String { pub fn safe_fuser() -> String {
// Return shell command that uses $USER environment variable
"fuser -M '($USER)'".to_string() "fuser -M '($USER)'".to_string()
} }
/// Dump all component logs /// Dump all component logs
pub fn dump_all_component_logs(component: &str) { pub fn dump_all_component_logs(component: &str) {
info!("Dumping logs for component: {}", component); info!("Dumping logs for component: {}", component);
// This would read from systemd journal or log files
// For now, just a placeholder
} }
/// Result type for bot existence check /// Result type for bot existence check
@ -202,15 +189,21 @@ pub enum BotExistsResult {
BotNotFound, BotNotFound,
} }
/// Check if Zitadel directory is healthy /// Check if Zitadel directory is healthy
pub fn zitadel_health_check() -> bool { pub fn zitadel_health_check() -> bool {
// Check if Zitadel is responding on port 8300 let output = SafeCommand::new("curl")
// Use very short timeout for fast startup detection .and_then(|c| {
let output = Command::new("curl") c.args(&[
.args(["-f", "-s", "--connect-timeout", "1", "-m", "2", "http://localhost:8300/debug/healthz"]) "-f",
.output(); "-s",
"--connect-timeout",
"1",
"-m",
"2",
"http://localhost:8300/debug/healthz",
])
})
.and_then(|c| c.execute());
match output { match output {
Ok(result) => { Ok(result) => {
@ -227,10 +220,9 @@ pub fn zitadel_health_check() -> bool {
} }
} }
// Fast fallback: just check if port 8300 is listening match SafeCommand::new("nc")
match Command::new("nc") .and_then(|c| c.args(&["-z", "-w", "1", "127.0.0.1", "8300"]))
.args(["-z", "-w", "1", "127.0.0.1", "8300"]) .and_then(|c| c.execute())
.output()
{ {
Ok(output) => output.status.success(), Ok(output) => output.status.success(),
Err(_) => false, Err(_) => false,

View file

@ -1133,17 +1133,17 @@ async fn verify_rotation(component: &str) -> Result<()> {
println!(" Testing connection to {}@{}:{}...", user, host, port); println!(" Testing connection to {}@{}:{}...", user, host, port);
// Use psql to test connection // Use psql to test connection
let result = std::process::Command::new("psql") let mut cmd = SafeCommand::new("psql").map_err(|e| anyhow::anyhow!("{}", e))?;
.args([ cmd = cmd.args(&[
"-h", &host, "-h", &host,
"-p", &port, "-p", &port,
"-U", &user, "-U", &user,
"-d", &db, "-d", &db,
"-c", "SELECT 1;", "-c", "SELECT 1;",
"-t", "-q" // Tuples only, quiet mode "-t", "-q" // Tuples only, quiet mode
]) ]).map_err(|e| anyhow::anyhow!("{}", e))?;
.env("PGPASSWORD", &pass) cmd = cmd.env("PGPASSWORD", &pass).map_err(|e| anyhow::anyhow!("{}", e))?;
.output(); let result = cmd.execute();
match result { match result {
Ok(output) if output.status.success() => { Ok(output) if output.status.success() => {

View file

@ -424,7 +424,7 @@ impl PackageManager {
let env_file = PathBuf::from(".env"); let env_file = PathBuf::from(".env");
let env_content = format!( let env_content = format!(
"\n# Vault Configuration (auto-generated)\nVAULT_ADDR=http://{}:8200\nVAULT_TOKEN={}\nVAULT_UNSEAL_KEYS_FILE=vault-unseal-keys\n", "\n# Vault Configuration (auto-generated)\nVAULT_ADDR=http://{}:8200\nVAULT_TOKEN={}\n",
ip, root_token ip, root_token
); );

View file

@ -1442,13 +1442,17 @@ VAULT_CACERT={}
info!("Created .env with Vault config"); info!("Created .env with Vault config");
} }
// Create vault-unseal-keys file // Create vault-unseal-keys file in botserver directory (next to .env)
let unseal_keys_file = std::path::PathBuf::from("vault-unseal-keys"); let unseal_keys_file = self.base_path.join("vault-unseal-keys");
let keys_content: String = unseal_keys let keys_content: String = unseal_keys
.iter() .iter()
.enumerate() .enumerate()
.map(|(i, key): (usize, &serde_json::Value)| { .map(|(i, key): (usize, &serde_json::Value)| {
format!("Unseal Key {}: {}\n", i + 1, key.as_str().unwrap_or("")) format!(
"VAULT_UNSEAL_KEY_{}={}\n",
i + 1,
key.as_str().unwrap_or("")
)
}) })
.collect(); .collect();
@ -1489,7 +1493,7 @@ VAULT_CACERT={}
info!("Vault initialized and unsealed successfully"); info!("Vault initialized and unsealed successfully");
info!("✓ Created .env with VAULT_ADDR, VAULT_TOKEN"); info!("✓ Created .env with VAULT_ADDR, VAULT_TOKEN");
info!("✓ Created vault-unseal-keys (chmod 600)"); info!("✓ Created /opt/gbo/secrets/vault-unseal-keys (chmod 600)");
Ok(()) Ok(())
} }

View file

@ -1,4 +1,5 @@
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use diesel::PgConnection;
use log::{debug, info, warn}; use log::{debug, info, warn};
use std::collections::HashMap; use std::collections::HashMap;
use std::env; use std::env;
@ -6,6 +7,7 @@ use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use std::sync::Arc as StdArc; use std::sync::Arc as StdArc;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use uuid::Uuid;
use vaultrs::client::{VaultClient, VaultClientSettingsBuilder}; use vaultrs::client::{VaultClient, VaultClientSettingsBuilder};
use vaultrs::kv2; use vaultrs::kv2;
@ -484,7 +486,13 @@ impl SecretsManager {
s.get("secret").cloned().unwrap_or_default(), s.get("secret").cloned().unwrap_or_default(),
)); ));
} }
self.get_drive_credentials().await let s = self.get_secret(SecretPaths::DRIVE).await?;
Ok((
s.get("host").cloned().unwrap_or_else(|| "localhost".into()),
s.get("port").cloned().unwrap_or_else(|| "9000".into()),
s.get("accesskey").cloned().unwrap_or_default(),
s.get("secret").cloned().unwrap_or_default(),
))
} }
/// Get cache config with tenant fallback to system /// Get cache config with tenant fallback to system
@ -522,6 +530,24 @@ impl SecretsManager {
self.get_secret(SecretPaths::LLM).await self.get_secret(SecretPaths::LLM).await
} }
/// Get directory (Zitadel) config with tenant fallback to system
pub async fn get_directory_config_for_tenant(&self, tenant: &str) -> Result<HashMap<String, String>> {
let tenant_path = SecretPaths::tenant_infrastructure(tenant);
if let Ok(s) = self.get_secret(&format!("{}/directory", tenant_path)).await {
return Ok(s);
}
self.get_secret(SecretPaths::DIRECTORY).await
}
/// Get models config with tenant fallback to system
pub async fn get_models_config_for_tenant(&self, tenant: &str) -> Result<HashMap<String, String>> {
let tenant_path = SecretPaths::tenant_infrastructure(tenant);
if let Ok(s) = self.get_secret(&format!("{}/models", tenant_path)).await {
return Ok(s);
}
self.get_secret(SecretPaths::MODELS).await
}
// ============ ORG BOT/USER SECRETS ============ // ============ ORG BOT/USER SECRETS ============
/// Get bot email credentials /// Get bot email credentials
@ -546,6 +572,12 @@ impl SecretsManager {
Ok(self.get_secret(&format!("{}/llm", path)).await.ok()) Ok(self.get_secret(&format!("{}/llm", path)).await.ok())
} }
/// Get bot API keys (openai, anthropic, custom)
pub async fn get_bot_api_keys_config(&self, org_id: &str, bot_id: &str) -> Result<Option<HashMap<String, String>>> {
let path = SecretPaths::org_bot(org_id, bot_id);
Ok(self.get_secret(&format!("{}/api-keys", path)).await.ok())
}
/// Get user email credentials /// Get user email credentials
pub async fn get_user_email_config(&self, org_id: &str, user_id: &str) -> Result<Option<HashMap<String, String>>> { pub async fn get_user_email_config(&self, org_id: &str, user_id: &str) -> Result<Option<HashMap<String, String>>> {
let path = SecretPaths::org_user(org_id, user_id); let path = SecretPaths::org_user(org_id, user_id);
@ -557,6 +589,52 @@ impl SecretsManager {
let path = SecretPaths::org_user(org_id, user_id); let path = SecretPaths::org_user(org_id, user_id);
Ok(self.get_secret(&format!("{}/oauth/{}", path, provider)).await.ok()) Ok(self.get_secret(&format!("{}/oauth/{}", path, provider)).await.ok())
} }
// ============ TENANT-AWARE METHODS (org_id -> tenant -> secrets) ============
/// Get database config for an organization (resolves tenant from org, then gets infra)
pub async fn get_database_config_for_org(&self, conn: &mut PgConnection, org_id: Uuid) -> Result<(String, u16, String, String, String)> {
let tenant_id = self.get_tenant_id_for_org(conn, org_id)?;
self.get_database_config_for_tenant(&tenant_id).await
}
/// Get drive config for an organization
pub async fn get_drive_config_for_org(&self, conn: &mut PgConnection, org_id: Uuid) -> Result<(String, String, String, String)> {
let tenant_id = self.get_tenant_id_for_org(conn, org_id)?;
self.get_drive_config_for_tenant(&tenant_id).await
}
/// Get cache config for an organization
pub async fn get_cache_config_for_org(&self, conn: &mut PgConnection, org_id: Uuid) -> Result<(String, u16, Option<String>)> {
let tenant_id = self.get_tenant_id_for_org(conn, org_id)?;
self.get_cache_config_for_tenant(&tenant_id).await
}
/// Get SMTP config for an organization
pub async fn get_smtp_config_for_org(&self, conn: &mut PgConnection, org_id: Uuid) -> Result<HashMap<String, String>> {
let tenant_id = self.get_tenant_id_for_org(conn, org_id)?;
self.get_smtp_config_for_tenant(&tenant_id).await
}
/// Get LLM config for an organization
pub async fn get_llm_config_for_org(&self, conn: &mut PgConnection, org_id: Uuid) -> Result<HashMap<String, String>> {
let tenant_id = self.get_tenant_id_for_org(conn, org_id)?;
self.get_llm_config_for_tenant(&tenant_id).await
}
/// Get tenant_id for an organization from database
pub fn get_tenant_id_for_org(&self, conn: &mut PgConnection, org_id: Uuid) -> Result<String> {
use diesel::prelude::*;
use crate::core::shared::schema::organizations;
let result: Option<Uuid> = organizations::table
.filter(organizations::org_id.eq(org_id))
.select(organizations::tenant_id)
.first::<Uuid>(conn)
.ok();
Ok(result.map(|t| t.to_string()).unwrap_or_else(|| "default".to_string()))
}
} }
pub fn init_secrets_manager() -> Result<SecretsManager> { pub fn init_secrets_manager() -> Result<SecretsManager> {

View file

@ -425,48 +425,57 @@ pub fn start_llm_server(
.unwrap_or_else(|_| "32000".to_string()); .unwrap_or_else(|_| "32000".to_string());
let n_ctx_size = if n_ctx_size.is_empty() { "32000".to_string() } else { n_ctx_size }; let n_ctx_size = if n_ctx_size.is_empty() { "32000".to_string() } else { n_ctx_size };
let cmd_path = if cfg!(windows) { let _cmd_path = if cfg!(windows) {
format!("{}\\llama-server.exe", llama_cpp_path) format!("{}\\llama-server.exe", llama_cpp_path)
} else { } else {
format!("{}/llama-server", llama_cpp_path) format!("{}/llama-server", llama_cpp_path)
}; };
let mut command = std::process::Command::new(&cmd_path); let mut args_vec = vec![
command.arg("-m").arg(&model_path) "-m", &model_path,
.arg("--host").arg("0.0.0.0") "--host", "0.0.0.0",
.arg("--port").arg(port) "--port", port,
.arg("--top_p").arg("0.95") "--top_p", "0.95",
.arg("--temp").arg("0.6") "--temp", "0.6",
.arg("--repeat-penalty").arg("1.2") "--repeat-penalty", "1.2",
.arg("--n-gpu-layers").arg(&gpu_layers) "--n-gpu-layers", &gpu_layers,
.arg("--ubatch-size").arg("2048"); "--ubatch-size", "2048",
];
if !reasoning_format.is_empty() { if !reasoning_format.is_empty() {
command.arg("--reasoning-format").arg(&reasoning_format); args_vec.push("--reasoning-format");
args_vec.push(&reasoning_format);
} }
if n_moe != "0" { if n_moe != "0" {
command.arg("--n-cpu-moe").arg(&n_moe); args_vec.push("--n-cpu-moe");
args_vec.push(&n_moe);
} }
if parallel != "1" { if parallel != "1" {
command.arg("--parallel").arg(&parallel); args_vec.push("--parallel");
args_vec.push(&parallel);
} }
if cont_batching == "true" { if cont_batching == "true" {
command.arg("--cont-batching"); args_vec.push("--cont-batching");
} }
if mlock == "true" { if mlock == "true" {
command.arg("--mlock"); args_vec.push("--mlock");
} }
if no_mmap == "true" { if no_mmap == "true" {
command.arg("--no-mmap"); args_vec.push("--no-mmap");
} }
if n_predict != "0" { if n_predict != "0" {
command.arg("--n-predict").arg(&n_predict); args_vec.push("--n-predict");
args_vec.push(&n_predict);
} }
command.arg("--ctx-size").arg(&n_ctx_size); args_vec.push("--ctx-size");
command.arg("--verbose"); args_vec.push(&n_ctx_size);
args_vec.push("--verbose");
let mut command = SafeCommand::new("llama-server")?;
command = command.args(&args_vec)?;
if cfg!(windows) { if cfg!(windows) {
command.current_dir(&llama_cpp_path); command = command.working_dir(std::path::Path::new(&llama_cpp_path))?;
} }
let log_file_path = if cfg!(windows) { let log_file_path = if cfg!(windows) {
@ -478,19 +487,19 @@ pub fn start_llm_server(
match std::fs::File::create(&log_file_path) { match std::fs::File::create(&log_file_path) {
Ok(log_file) => { Ok(log_file) => {
if let Ok(clone) = log_file.try_clone() { if let Ok(clone) = log_file.try_clone() {
command.stdout(std::process::Stdio::from(clone)); command = command.stdout(std::process::Stdio::from(clone));
} else { } else {
command.stdout(std::process::Stdio::null()); command = command.stdout(std::process::Stdio::null());
} }
command.stderr(std::process::Stdio::from(log_file)); command = command.stderr(std::process::Stdio::from(log_file));
} }
Err(_) => { Err(_) => {
command.stdout(std::process::Stdio::null()); command = command.stdout(std::process::Stdio::null());
command.stderr(std::process::Stdio::null()); command = command.stderr(std::process::Stdio::null());
} }
} }
info!("Executing LLM server command: {:?}", command); info!("Executing LLM server command: llama-server with {} args", args_vec.len());
command.spawn().map_err(|e| { command.spawn().map_err(|e| {
Box::new(std::io::Error::other(e.to_string())) as Box<dyn std::error::Error + Send + Sync> Box::new(std::io::Error::other(e.to_string())) as Box<dyn std::error::Error + Send + Sync>
@ -521,26 +530,31 @@ pub async fn start_embedding_server(
info!("Starting embedding server on port {port} with model: {model_path}"); info!("Starting embedding server on port {port} with model: {model_path}");
let cmd_path = if cfg!(windows) { let _cmd_path = if cfg!(windows) {
format!("{}\\llama-server.exe", llama_cpp_path) format!("{}\\llama-server.exe", llama_cpp_path)
} else { } else {
format!("{}/llama-server", llama_cpp_path) format!("{}/llama-server", llama_cpp_path)
}; };
let mut command = std::process::Command::new(&cmd_path); let mut args_vec = vec![
command.arg("-m").arg(&model_path) "-m", &model_path,
.arg("--host").arg("0.0.0.0") "--host", "0.0.0.0",
.arg("--port").arg(port) "--port", port,
.arg("--embedding") "--embedding",
.arg("--n-gpu-layers").arg("0") "--n-gpu-layers", "0",
.arg("--verbose"); "--verbose",
];
if !cfg!(windows) { if !cfg!(windows) {
command.arg("--ubatch-size").arg("2048"); args_vec.push("--ubatch-size");
args_vec.push("2048");
} }
let mut command = SafeCommand::new("llama-server")?;
command = command.args(&args_vec)?;
if cfg!(windows) { if cfg!(windows) {
command.current_dir(&llama_cpp_path); command = command.working_dir(std::path::Path::new(&llama_cpp_path))?;
} }
let log_file_path = if cfg!(windows) { let log_file_path = if cfg!(windows) {
@ -552,19 +566,19 @@ pub async fn start_embedding_server(
match std::fs::File::create(&log_file_path) { match std::fs::File::create(&log_file_path) {
Ok(log_file) => { Ok(log_file) => {
if let Ok(clone) = log_file.try_clone() { if let Ok(clone) = log_file.try_clone() {
command.stdout(std::process::Stdio::from(clone)); command = command.stdout(std::process::Stdio::from(clone));
} else { } else {
command.stdout(std::process::Stdio::null()); command = command.stdout(std::process::Stdio::null());
} }
command.stderr(std::process::Stdio::from(log_file)); command = command.stderr(std::process::Stdio::from(log_file));
} }
Err(_) => { Err(_) => {
command.stdout(std::process::Stdio::null()); command = command.stdout(std::process::Stdio::null());
command.stderr(std::process::Stdio::null()); command = command.stderr(std::process::Stdio::null());
} }
} }
info!("Executing embedding server command: {:?}", command); info!("Executing embedding server command: llama-server with {} args", args_vec.len());
command.spawn().map_err(|e| { command.spawn().map_err(|e| {
Box::new(std::io::Error::other(e.to_string())) as Box<dyn std::error::Error + Send + Sync> Box::new(std::io::Error::other(e.to_string())) as Box<dyn std::error::Error + Send + Sync>

479
src/marketing/ai.rs Normal file
View file

@ -0,0 +1,479 @@
use axum::{
extract::State,
http::StatusCode,
Json,
};
use diesel::prelude::*;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use uuid::Uuid;
use crate::core::config::ConfigManager;
use crate::core::shared::schema::crm_contacts;
use crate::core::shared::state::AppState;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContentGenerationRequest {
pub channel: String,
pub goal: String,
pub audience_description: String,
pub template_variables: Option<serde_json::Value>,
pub tone: Option<String>,
pub length: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContentGenerationResult {
pub subject: Option<String>,
pub body: String,
pub headline: Option<String>,
pub cta: Option<String>,
pub suggested_images: Vec<String>,
pub variations: Vec<ContentVariation>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContentVariation {
pub name: String,
pub body: String,
pub tone: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonalizationRequest {
pub template: String,
pub contact_id: Uuid,
pub context: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersonalizationResult {
pub personalized_content: String,
pub variables_used: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ABTestRequest {
pub campaign_id: Uuid,
pub variations: Vec<ABTestVariation>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ABTestVariation {
pub name: String,
pub subject: Option<String>,
pub body: String,
pub weight: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ABTestResult {
pub variation_id: String,
pub opens: i64,
pub clicks: i64,
pub open_rate: f64,
pub click_rate: f64,
pub winner: bool,
}
const DEFAULT_TONE: &str = "professional";
const DEFAULT_LENGTH: &str = "medium";
fn build_generation_prompt(req: &ContentGenerationRequest) -> String {
let tone = req.tone.as_deref().unwrap_or(DEFAULT_TONE);
let length = req.length.as_deref().unwrap_or(DEFAULT_LENGTH);
format!(
r#"You are a marketing expert. Create {} length marketing content for {} channel.
Goal: {}
Audience: {}
Tone: {}
Style: Clear, compelling, action-oriented
Generate:
1. A compelling subject line (if email)
2. Main body content ({} characters max)
3. A call-to-action
4. 2 alternative variations with different tones
Respond in JSON format:
{{
"subject": "...",
"body": "...",
"cta": "...",
"variations": [
{{"name": "friendly", "body": "...", "tone": "friendly"}},
{{"name": "urgent", "body": "...", "tone": "urgent"}}
]
}}"#,
length, req.channel, req.goal, req.audience_description, tone, length
)
}
fn build_personalization_prompt(contact: &ContactInfo, template: &str, context: &serde_json::Value) -> String {
let context_str = if context.is_null() {
String::new()
} else {
format!("\nAdditional context: {}", context)
};
let first_name = contact.first_name.as_deref().unwrap_or("there");
let last_name = contact.last_name.as_deref().unwrap_or("");
let email = contact.email.as_deref().unwrap_or("");
let phone = contact.phone.as_deref().unwrap_or("");
let company = contact.company.as_deref().unwrap_or("");
format!(
r#"Personalize the following marketing message for this contact:
Contact Name: {} {}
Email: {}
Phone: {}
Company: {}{}
Original Template:
{}
Rewrite the template, replacing placeholders with the contact's actual information.
Keep the same structure and tone but make it feel personally addressed."#,
first_name,
last_name,
email,
phone,
company,
context_str,
template
)
}
#[derive(Debug, Clone)]
struct ContactInfo {
first_name: Option<String>,
last_name: Option<String>,
email: Option<String>,
phone: Option<String>,
company: Option<String>,
}
async fn get_llm_config(state: &Arc<AppState>, bot_id: Uuid) -> Result<(String, String, String), String> {
let config = ConfigManager::new(state.conn.clone());
let llm_url = config
.get_config(&bot_id, "llm-url", Some("http://localhost:8081"))
.unwrap_or_else(|_| "http://localhost:8081".to_string());
let llm_model = config
.get_config(&bot_id, "llm-model", None)
.unwrap_or_default();
let llm_key = config
.get_config(&bot_id, "llm-key", None)
.unwrap_or_default();
Ok((llm_url, llm_model, llm_key))
}
pub async fn generate_campaign_content(
state: &Arc<AppState>,
bot_id: Uuid,
req: ContentGenerationRequest,
) -> Result<ContentGenerationResult, String> {
let (_, llm_model, llm_key) = get_llm_config(state, bot_id).await?;
let prompt = build_generation_prompt(&req);
let config = serde_json::json!({
"temperature": 0.7,
"max_tokens": 2000,
});
let llm_provider = &state.llm_provider;
let response = llm_provider
.generate(&prompt, &config, &llm_model, &llm_key)
.await
.map_err(|e| format!("LLM generation failed: {}", e))?;
parse_llm_response(&response)
}
fn parse_llm_response(response: &str) -> Result<ContentGenerationResult, String> {
let json_start = response.find('{').or_else(|| response.find('['));
let json_end = response.rfind('}').or_else(|| response.rfind(']'));
if let (Some(start), Some(end)) = (json_start, json_end) {
let json_str = &response[start..=end];
if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(json_str) {
let subject = parsed.get("subject").and_then(|s| s.as_str()).map(String::from);
let body = parsed.get("body").and_then(|b| b.as_str()).unwrap_or("").to_string();
let cta = parsed.get("cta").and_then(|c| c.as_str()).map(String::from);
let mut variations = Vec::new();
if let Some(vars) = parsed.get("variations").and_then(|v| v.as_array()) {
for v in vars {
variations.push(ContentVariation {
name: v.get("name").and_then(|n| n.as_str()).unwrap_or("").to_string(),
body: v.get("body").and_then(|b| b.as_str()).unwrap_or("").to_string(),
tone: v.get("tone").and_then(|t| t.as_str()).unwrap_or("").to_string(),
});
}
}
return Ok(ContentGenerationResult {
subject,
body,
headline: None,
cta,
suggested_images: vec![],
variations,
});
}
}
Ok(ContentGenerationResult {
subject: Some(response.lines().next().unwrap_or("").to_string()),
body: response.to_string(),
headline: None,
cta: Some("Learn More".to_string()),
suggested_images: vec![],
variations: vec![],
})
}
pub async fn personalize_content(
state: &Arc<AppState>,
bot_id: Uuid,
req: PersonalizationRequest,
) -> Result<PersonalizationResult, String> {
let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?;
let contact = crm_contacts::table
.filter(crm_contacts::id.eq(req.contact_id))
.filter(crm_contacts::bot_id.eq(bot_id))
.select((
crm_contacts::first_name,
crm_contacts::last_name,
crm_contacts::email,
crm_contacts::phone,
crm_contacts::company,
))
.first::<(Option<String>, Option<String>, Option<String>, Option<String>, Option<String>)>(&mut conn)
.map_err(|_| "Contact not found")?;
let contact_info = ContactInfo {
first_name: contact.0,
last_name: contact.1,
email: contact.2,
phone: contact.3,
company: contact.4,
};
let context = req.context.unwrap_or(serde_json::Value::Null);
let prompt = build_personalization_prompt(&contact_info, &req.template, &context);
let (_, llm_model, llm_key) = get_llm_config(state, bot_id).await?;
let config = serde_json::json!({
"temperature": 0.5,
"max_tokens": 1000,
});
let llm_provider = &state.llm_provider;
let response = llm_provider
.generate(&prompt, &config, &llm_model, &llm_key)
.await
.map_err(|e| format!("LLM personalization failed: {}", e))?;
let variables = extract_variables(&req.template);
Ok(PersonalizationResult {
personalized_content: response,
variables_used: variables,
})
}
fn extract_variables(template: &str) -> Vec<String> {
let mut vars = Vec::new();
let mut in_brace = false;
let mut current = String::new();
for c in template.chars() {
match c {
'{' => {
in_brace = true;
current.clear();
}
'}' if in_brace => {
in_brace = false;
if !current.is_empty() {
vars.push(current.clone());
}
current.clear();
}
_ if in_brace => current.push(c),
_ => {}
}
}
vars
}
pub async fn generate_ab_test_variations(
state: &Arc<AppState>,
bot_id: Uuid,
req: ABTestRequest,
) -> Result<Vec<ABTestResult>, String> {
let mut results = Vec::new();
for (i, variation) in req.variations.iter().enumerate() {
let prompt = format!(
r#"Evaluate this marketing variation:
Name: {}
Subject: {}
Body: {}
Provide a JSON response:
{{
"opens": <estimated opens 0-100>,
"clicks": <estimated clicks 0-100>,
"open_rate": <percentage>,
"click_rate": <percentage>
}}"#,
variation.name,
variation.subject.as_deref().unwrap_or("N/A"),
variation.body
);
let config = serde_json::json!({
"temperature": 0.3,
"max_tokens": 200,
});
let llm_provider = &state.llm_provider;
let (_, llm_model, llm_key) = get_llm_config(state, bot_id).await?;
let response = llm_provider
.generate(&prompt, &config, &llm_model, &llm_key)
.await
.unwrap_or_default();
let parsed: serde_json::Value = serde_json::from_str(&response).unwrap_or(serde_json::json!({
"opens": 50,
"clicks": 10,
"open_rate": 50.0,
"click_rate": 10.0
}));
results.push(ABTestResult {
variation_id: format!("variation_{}", i),
opens: parsed.get("opens").and_then(|v| v.as_i64()).unwrap_or(50),
clicks: parsed.get("clicks").and_then(|v| v.as_i64()).unwrap_or(10),
open_rate: parsed.get("open_rate").and_then(|v| v.as_f64()).unwrap_or(50.0),
click_rate: parsed.get("click_rate").and_then(|v| v.as_f64()).unwrap_or(10.0),
winner: false,
});
}
if let Some(winner) = results.iter().max_by(|a, b| a.open_rate.partial_cmp(&b.open_rate).unwrap()) {
let winner_id = winner.variation_id.clone();
for r in &mut results {
r.winner = r.variation_id == winner_id;
}
}
Ok(results)
}
pub async fn generate_template_content(
state: &Arc<AppState>,
template_id: Uuid,
) -> Result<ContentGenerationResult, String> {
let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?;
#[derive(QueryableByName)]
struct TemplateRow {
#[diesel(sql_type = diesel::sql_types::Uuid)]
bot_id: Uuid,
#[diesel(sql_type = diesel::sql_types::Text)]
channel: String,
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
subject: Option<String>,
}
let template = diesel::sql_query("SELECT bot_id, channel, subject FROM marketing_templates WHERE id = $1")
.bind::<diesel::sql_types::Uuid, _>(template_id)
.get_result::<TemplateRow>(&mut conn)
.map_err(|_| "Template not found")?;
let req = ContentGenerationRequest {
channel: template.channel,
goal: template.subject.unwrap_or_default(),
audience_description: "General audience".to_string(),
template_variables: None,
tone: None,
length: None,
};
let bot_id = template.bot_id;
generate_campaign_content(state, bot_id, req).await
}
#[derive(Debug, Deserialize)]
pub struct GenerateContentRequest {
pub channel: String,
pub goal: String,
pub audience_description: String,
pub template_variables: Option<serde_json::Value>,
pub tone: Option<String>,
pub length: Option<String>,
}
pub async fn generate_content_api(
State(state): State<Arc<AppState>>,
Json(req): Json<GenerateContentRequest>,
) -> Result<Json<ContentGenerationResult>, (StatusCode, String)> {
let bot_id = Uuid::nil();
let internal_req = ContentGenerationRequest {
channel: req.channel,
goal: req.goal,
audience_description: req.audience_description,
template_variables: req.template_variables,
tone: req.tone,
length: req.length,
};
match generate_campaign_content(&state, bot_id, internal_req).await {
Ok(result) => Ok(Json(result)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
#[derive(Debug, Deserialize)]
pub struct PersonalizeRequest {
pub template: String,
pub contact_id: Uuid,
pub context: Option<serde_json::Value>,
}
pub async fn personalize_api(
State(state): State<Arc<AppState>>,
Json(req): Json<PersonalizeRequest>,
) -> Result<Json<PersonalizationResult>, (StatusCode, String)> {
let bot_id = Uuid::nil();
let internal_req = PersonalizationRequest {
template: req.template,
contact_id: req.contact_id,
context: req.context,
};
match personalize_content(&state, bot_id, internal_req).await {
Ok(result) => Ok(Json(result)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}

View file

@ -249,7 +249,7 @@ pub struct CampaignSendResult {
fn render_template(template: &str, variables: &serde_json::Value) -> String { fn render_template(template: &str, variables: &serde_json::Value) -> String {
let mut result = template.to_string(); let mut result = template.to_string();
if let Ok(obj) = variables.as_object() { if let Some(obj) = variables.as_object() {
for (key, value) in obj { for (key, value) in obj {
let placeholder = format!("{{{}}}", key); let placeholder = format!("{{{}}}", key);
let replacement = value.as_str().unwrap_or(""); let replacement = value.as_str().unwrap_or("");
@ -277,8 +277,8 @@ async fn generate_ai_content(
async fn send_via_email( async fn send_via_email(
to_email: &str, to_email: &str,
subject: &str, _subject: &str,
body: &str, _body: &str,
bot_id: Uuid, bot_id: Uuid,
) -> Result<(), String> { ) -> Result<(), String> {
log::info!("Sending email to {} via bot {}", to_email, bot_id); log::info!("Sending email to {} via bot {}", to_email, bot_id);
@ -287,7 +287,7 @@ async fn send_via_email(
async fn send_via_whatsapp( async fn send_via_whatsapp(
to_phone: &str, to_phone: &str,
body: &str, _body: &str,
bot_id: Uuid, bot_id: Uuid,
) -> Result<(), String> { ) -> Result<(), String> {
log::info!("Sending WhatsApp to {} via bot {}", to_phone, bot_id); log::info!("Sending WhatsApp to {} via bot {}", to_phone, bot_id);
@ -296,7 +296,7 @@ async fn send_via_whatsapp(
async fn send_via_telegram( async fn send_via_telegram(
to_chat_id: &str, to_chat_id: &str,
body: &str, _body: &str,
bot_id: Uuid, bot_id: Uuid,
) -> Result<(), String> { ) -> Result<(), String> {
log::info!("Sending Telegram to {} via bot {}", to_chat_id, bot_id); log::info!("Sending Telegram to {} via bot {}", to_chat_id, bot_id);
@ -305,7 +305,7 @@ async fn send_via_telegram(
async fn send_via_sms( async fn send_via_sms(
to_phone: &str, to_phone: &str,
body: &str, _body: &str,
bot_id: Uuid, bot_id: Uuid,
) -> Result<(), String> { ) -> Result<(), String> {
log::info!("Sending SMS to {} via bot {}", to_phone, bot_id); log::info!("Sending SMS to {} via bot {}", to_phone, bot_id);
@ -331,7 +331,7 @@ pub async fn send_campaign(
let mut recipient_ids: Vec<Uuid> = Vec::new(); let mut recipient_ids: Vec<Uuid> = Vec::new();
if let Some(list_id) = req.list_id { if let Some(_list_id) = req.list_id {
use crate::core::shared::schema::crm_contacts; use crate::core::shared::schema::crm_contacts;
let contacts: Vec<Uuid> = crm_contacts::table let contacts: Vec<Uuid> = crm_contacts::table
@ -384,18 +384,18 @@ pub async fn send_campaign(
}; };
for contact_id in recipient_ids { for contact_id in recipient_ids {
let contact: Option<(String, Option<String>, Option<String>)> = crm_contacts::table let contact = crm_contacts::table
.filter(crm_contacts::id.eq(contact_id)) .filter(crm_contacts::id.eq(contact_id))
.select((crm_contacts::email, crm_contacts::phone, crm_contacts::first_name)) .select((crm_contacts::email, crm_contacts::phone, crm_contacts::first_name))
.first(&mut conn) .first::<(Option<String>, Option<String>, Option<String>)>(&mut conn)
.ok(); .ok();
if let Some((email, phone, first_name)) = contact { if let Some((email, phone, first_name)) = contact {
let contact_name = first_name.unwrap_or("Customer".to_string()); let contact_name = first_name.unwrap_or("Customer".to_string());
let (subject, body) = if let Some(ref tmpl) = template { let (subject, body) = if let Some(ref tmpl) = template {
let mut subject = tmpl.subject.clone().unwrap_or_default(); let mut subject = tmpl.subject.clone();
let mut body = tmpl.body.clone().unwrap_or_default(); let mut body = tmpl.body.clone();
let variables = serde_json::json!({ let variables = serde_json::json!({
"name": contact_name, "name": contact_name,

391
src/marketing/email.rs Normal file
View file

@ -0,0 +1,391 @@
use axum::{
extract::State,
http::StatusCode,
Json,
};
use chrono::{DateTime, Utc};
use diesel::prelude::*;
use lettre::{message::Mailbox, Address, Message, SmtpTransport, Transport};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use uuid::Uuid;
use crate::core::config::ConfigManager;
use crate::core::shared::schema::{
email_tracking, marketing_campaigns, marketing_recipients,
};
use crate::core::shared::state::AppState;
use crate::marketing::campaigns::CrmCampaign;
const SMTP_SERVER_KEY: &str = "email-server";
const SMTP_PORT_KEY: &str = "email-port";
const FROM_EMAIL_KEY: &str = "email-from";
const DEFAULT_SMTP_SERVER: &str = "smtp.gmail.com";
const DEFAULT_SMTP_PORT: u16 = 587;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailCampaignPayload {
pub to: String,
pub subject: String,
pub body_html: Option<String>,
pub body_text: Option<String>,
pub campaign_id: Option<Uuid>,
pub recipient_id: Option<Uuid>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailSendResult {
pub success: bool,
pub message_id: Option<String>,
pub tracking_id: Option<Uuid>,
pub error: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailTrackingRecord {
pub id: Uuid,
pub recipient_id: Option<Uuid>,
pub campaign_id: Option<Uuid>,
pub message_id: Option<String>,
pub open_token: Option<Uuid>,
pub opened: bool,
pub opened_at: Option<DateTime<Utc>>,
pub clicked: bool,
pub clicked_at: Option<DateTime<Utc>>,
pub ip_address: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CampaignMetrics {
pub total_sent: i64,
pub total_delivered: i64,
pub total_failed: i64,
pub total_opened: i64,
pub total_clicked: i64,
pub open_rate: f64,
pub click_rate: f64,
pub bounce_rate: f64,
}
fn get_smtp_config(state: &AppState, bot_id: Uuid) -> Result<(String, u16, String, String, String), String> {
let config = ConfigManager::new(state.conn.clone());
let smtp_server = config
.get_config(&bot_id, SMTP_SERVER_KEY, Some(DEFAULT_SMTP_SERVER))
.unwrap_or_else(|_| DEFAULT_SMTP_SERVER.to_string());
let smtp_port = config
.get_config(&bot_id, SMTP_PORT_KEY, Some("587"))
.unwrap_or_else(|_| "587".to_string())
.parse::<u16>()
.unwrap_or(DEFAULT_SMTP_PORT);
let from_email = config
.get_config(&bot_id, FROM_EMAIL_KEY, Some("noreply@generalbots.com"))
.unwrap_or_else(|_| "noreply@generalbots.com".to_string());
let smtp_username = config
.get_config(&bot_id, "email-username", None)
.unwrap_or_default();
let smtp_password = config
.get_config(&bot_id, "email-password", None)
.unwrap_or_default();
Ok((smtp_server, smtp_port, from_email, smtp_username, smtp_password))
}
fn inject_tracking_pixel(html: &str, token: Uuid, base_url: &str) -> String {
let pixel_url = format!("{}/api/marketing/track/open/{}", base_url, token);
let pixel = format!(
r#"<img src="{}" width="1" height="1" alt="" style="display:none;visibility:hidden;border:0;" />"#,
pixel_url
);
if html.to_lowercase().contains("</body>") {
html.replace("</body>", &format!("{}</body>", pixel))
.replace("</BODY>", &format!("{}</BODY>", pixel))
} else {
format!("{}{}", html, pixel)
}
}
fn wrap_tracking_links(html: &str, tracking_id: Uuid, base_url: &str) -> String {
let wrapped = html.replace(
"href=\"",
&format!("href=\"{}/api/marketing/track/click/{}/", base_url, tracking_id),
);
wrapped.replace(
"href='",
&format!("href='{}/api/marketing/track/click/{}/", base_url, tracking_id),
)
}
pub async fn send_campaign_email(
state: &Arc<AppState>,
bot_id: Uuid,
payload: EmailCampaignPayload,
) -> Result<EmailSendResult, String> {
let (smtp_server, smtp_port, from_email, username, password) =
get_smtp_config(state, bot_id)?;
let open_token = Uuid::new_v4();
let tracking_id = Uuid::new_v4();
let config = ConfigManager::new(state.conn.clone());
let base_url = config
.get_config(&bot_id, "server-url", Some("http://localhost:3000"))
.unwrap_or_else(|_| "http://localhost:3000".to_string());
let body_html = payload
.body_html
.map(|html| wrap_tracking_links(&html, tracking_id, &base_url))
.map(|html| inject_tracking_pixel(&html, open_token, &base_url));
let mut conn = state.conn.get().map_err(|e| format!("DB connection failed: {}", e))?;
let tracking_record = EmailTrackingRecord {
id: tracking_id,
recipient_id: payload.recipient_id,
campaign_id: payload.campaign_id,
message_id: None,
open_token: Some(open_token),
opened: false,
opened_at: None,
clicked: false,
clicked_at: None,
ip_address: None,
};
diesel::insert_into(email_tracking::table)
.values((
email_tracking::id.eq(tracking_record.id),
email_tracking::recipient_id.eq(tracking_record.recipient_id),
email_tracking::campaign_id.eq(tracking_record.campaign_id),
email_tracking::open_token.eq(tracking_record.open_token),
email_tracking::open_tracking_enabled.eq(true),
email_tracking::opened.eq(false),
email_tracking::clicked.eq(false),
email_tracking::created_at.eq(Utc::now()),
))
.execute(&mut conn)
.map_err(|e| format!("Failed to create tracking record: {}", e))?;
let from = from_email
.parse::<Address>()
.map_err(|e| format!("Invalid from address: {}", e))?;
let to = payload
.to
.parse::<Address>()
.map_err(|e| format!("Invalid to address: {}", e))?;
let builder = Message::builder()
.from(Mailbox::new(Some("Campaign".to_string()), from))
.to(Mailbox::new(None, to))
.subject(&payload.subject);
let mail = if let Some(html) = &body_html {
builder
.header(lettre::message::header::ContentType::TEXT_HTML)
.body(html.clone())
.map_err(|e| format!("Failed to build HTML email: {}", e))?
} else if let Some(text) = &payload.body_text {
builder
.header(lettre::message::header::ContentType::TEXT_PLAIN)
.body(text.clone())
.map_err(|e| format!("Failed to build text email: {}", e))?
} else {
return Err("No email body provided".to_string());
};
let mut mailer = SmtpTransport::relay(&smtp_server)
.map_err(|e| format!("SMTP relay error: {}", e))?
.port(smtp_port);
if !username.is_empty() && !password.is_empty() {
mailer = mailer
.credentials(lettre::transport::smtp::authentication::Credentials::new(
username,
password,
))
.authentication(vec![lettre::transport::smtp::authentication::Mechanism::Login]);
}
let mailer = mailer.build();
match mailer.send(&mail) {
Ok(_) => {
let message_id = format!("<{}@campaign>", tracking_id);
diesel::update(email_tracking::table.filter(email_tracking::id.eq(tracking_id)))
.set(email_tracking::message_id.eq(Some(message_id.clone())))
.execute(&mut conn)
.ok();
if let Some(recipient_id) = payload.recipient_id {
diesel::update(marketing_recipients::table.filter(marketing_recipients::id.eq(recipient_id)))
.set((
marketing_recipients::status.eq("sent"),
marketing_recipients::sent_at.eq(Some(Utc::now())),
))
.execute(&mut conn)
.ok();
}
Ok(EmailSendResult {
success: true,
message_id: Some(message_id),
tracking_id: Some(tracking_id),
error: None,
})
}
Err(e) => {
if let Some(recipient_id) = payload.recipient_id {
diesel::update(marketing_recipients::table.filter(marketing_recipients::id.eq(recipient_id)))
.set((
marketing_recipients::status.eq("failed"),
marketing_recipients::failed_at.eq(Some(Utc::now())),
marketing_recipients::error_message.eq(Some(e.to_string())),
))
.execute(&mut conn)
.ok();
}
Ok(EmailSendResult {
success: false,
message_id: None,
tracking_id: Some(tracking_id),
error: Some(e.to_string()),
})
}
}
}
pub async fn get_campaign_email_metrics(
state: &Arc<AppState>,
campaign_id: Uuid,
) -> Result<CampaignMetrics, String> {
let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?;
let results: Vec<(Option<bool>, Option<bool>)> = email_tracking::table
.filter(email_tracking::campaign_id.eq(campaign_id))
.select((email_tracking::opened, email_tracking::clicked))
.load(&mut conn)
.map_err(|e| format!("Query error: {}", e))?;
let total = results.len() as i64;
let opened = results.iter().filter(|(o, _)| o.unwrap_or(false)).count() as i64;
let clicked = results.iter().filter(|(_, c)| c.unwrap_or(false)).count() as i64;
let recipients: Vec<(String, Option<DateTime<Utc>>)> = marketing_recipients::table
.filter(marketing_recipients::campaign_id.eq(campaign_id))
.filter(marketing_recipients::channel.eq("email"))
.select((marketing_recipients::status, marketing_recipients::sent_at))
.load(&mut conn)
.map_err(|e| format!("Query error: {}", e))?;
let sent = recipients.iter().filter(|(s, _)| s == "sent").count() as i64;
let failed = recipients.iter().filter(|(s, _)| s == "failed").count() as i64;
let delivered = sent;
Ok(CampaignMetrics {
total_sent: total,
total_delivered: delivered,
total_failed: failed,
total_opened: opened,
total_clicked: clicked,
open_rate: if delivered > 0 { (opened as f64 / delivered as f64) * 100.0 } else { 0.0 },
click_rate: if delivered > 0 { (clicked as f64 / delivered as f64) * 100.0 } else { 0.0 },
bounce_rate: if sent > 0 { (failed as f64 / sent as f64) * 100.0 } else { 0.0 },
})
}
pub async fn send_bulk_campaign_emails(
state: &Arc<AppState>,
campaign_id: Uuid,
contacts: Vec<(Uuid, String, String)>,
) -> Result<(i32, i32), String> {
let mut sent = 0;
let mut failed = 0;
let campaign: CrmCampaign = marketing_campaigns::table
.filter(marketing_campaigns::id.eq(campaign_id))
.first(&mut *state.conn.get().map_err(|e| format!("DB error: {}", e))?)
.map_err(|_| "Campaign not found")?;
let subject = campaign
.content_template
.get("subject")
.and_then(|s| s.as_str())
.unwrap_or("Newsletter")
.to_string();
let body_html = campaign
.content_template
.get("body")
.and_then(|b| b.as_str())
.map(String::from);
for (contact_id, email, name) in contacts {
let personalized_body = body_html.as_ref().map(|html| {
html.replace("{{name}}", &name)
.replace("{{email}}", &email)
});
let payload = EmailCampaignPayload {
to: email,
subject: subject.clone(),
body_html: personalized_body.clone(),
body_text: None,
campaign_id: Some(campaign_id),
recipient_id: Some(contact_id),
};
match send_campaign_email(state, campaign.bot_id, payload).await {
Ok(result) => {
if result.success {
sent += 1;
} else {
failed += 1;
log::error!("Email send failed: {:?}", result.error);
}
}
Err(e) => {
failed += 1;
log::error!("Email error: {}", e);
}
}
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
}
Ok((sent, failed))
}
#[derive(Debug, Deserialize)]
pub struct SendEmailRequest {
pub to: String,
pub subject: String,
pub body_html: Option<String>,
pub body_text: Option<String>,
}
pub async fn send_email_api(
State(state): State<Arc<AppState>>,
Json(req): Json<SendEmailRequest>,
) -> Result<Json<EmailSendResult>, (StatusCode, String)> {
let bot_id = Uuid::nil();
let payload = EmailCampaignPayload {
to: req.to,
subject: req.subject,
body_html: req.body_html,
body_text: req.body_text,
campaign_id: None,
recipient_id: None,
};
match send_campaign_email(&state, bot_id, payload).await {
Ok(result) => Ok(Json(result)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}

525
src/marketing/metrics.rs Normal file
View file

@ -0,0 +1,525 @@
use axum::{
extract::{Path, State},
http::StatusCode,
Json,
};
use chrono::{DateTime, Utc};
use diesel::prelude::*;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use uuid::Uuid;
use crate::core::shared::schema::{
email_tracking, marketing_campaigns, marketing_recipients,
};
use crate::core::shared::state::AppState;
use crate::marketing::campaigns::CrmCampaign;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CampaignMetrics {
pub campaign_id: Uuid,
pub channel: String,
pub total_recipients: i64,
pub sent: i64,
pub delivered: i64,
pub failed: i64,
pub opened: i64,
pub clicked: i64,
pub replied: i64,
pub open_rate: f64,
pub click_rate: f64,
pub conversion_rate: f64,
pub cost_per_result: f64,
pub total_spend: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelBreakdown {
pub channel: String,
pub recipients: i64,
pub sent: i64,
pub delivered: i64,
pub opened: i64,
pub clicked: i64,
pub open_rate: f64,
pub click_rate: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeSeriesMetric {
pub timestamp: DateTime<Utc>,
pub sent: i64,
pub delivered: i64,
pub opened: i64,
pub clicked: i64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AggregateMetrics {
pub total_campaigns: i64,
pub active_campaigns: i64,
pub total_recipients: i64,
pub total_sent: i64,
pub total_delivered: i64,
pub total_opened: i64,
pub total_clicked: i64,
pub avg_open_rate: f64,
pub avg_click_rate: f64,
pub channel_breakdown: Vec<ChannelBreakdown>,
}
fn calculate_open_rate(delivered: i64, opened: i64) -> f64 {
if delivered > 0 {
(opened as f64 / delivered as f64) * 100.0
} else {
0.0
}
}
fn calculate_click_rate(delivered: i64, clicked: i64) -> f64 {
if delivered > 0 {
(clicked as f64 / delivered as f64) * 100.0
} else {
0.0
}
}
pub async fn get_campaign_metrics(
state: &Arc<AppState>,
campaign_id: Uuid,
) -> Result<CampaignMetrics, String> {
let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?;
let campaign: CrmCampaign = marketing_campaigns::table
.filter(marketing_campaigns::id.eq(campaign_id))
.first(&mut conn)
.map_err(|_| "Campaign not found")?;
let recipients: Vec<(String, Option<serde_json::Value>)> = marketing_recipients::table
.filter(marketing_recipients::campaign_id.eq(campaign_id))
.select((marketing_recipients::status, marketing_recipients::response))
.load(&mut conn)
.map_err(|e| format!("Query error: {}", e))?;
let total = recipients.len() as i64;
let sent = recipients.iter().filter(|(s, _)| s == "sent").count() as i64;
let delivered = recipients
.iter()
.filter(|(s, r)| {
let is_delivered = s == "delivered" || s == "sent";
let has_delivery_status = r
.as_ref()
.and_then(|v| v.get("status"))
.and_then(|s| s.as_str())
.map(|st| st == "delivered" || st == "read")
.unwrap_or(false);
is_delivered || has_delivery_status
})
.count() as i64;
let failed = recipients.iter().filter(|(s, _)| s == "failed").count() as i64;
let replied = recipients
.iter()
.filter(|(_, r)| {
r.as_ref()
.and_then(|v| v.get("type"))
.and_then(|t| t.as_str())
.map(|t| t == "reply")
.unwrap_or(false)
})
.count() as i64;
let email_opens = if campaign.channel == "email" || campaign.channel == "multi" {
email_tracking::table
.filter(email_tracking::campaign_id.eq(campaign_id))
.filter(email_tracking::opened.eq(true))
.count()
.get_result::<i64>(&mut conn)
.unwrap_or(0)
} else {
0
};
let email_clicks = if campaign.channel == "email" || campaign.channel == "multi" {
email_tracking::table
.filter(email_tracking::campaign_id.eq(campaign_id))
.filter(email_tracking::clicked.eq(true))
.count()
.get_result::<i64>(&mut conn)
.unwrap_or(0)
} else {
0
};
let open_rate = calculate_open_rate(delivered, email_opens);
let click_rate = calculate_click_rate(delivered, email_clicks);
let conversion_rate = if delivered > 0 {
(replied as f64 / delivered as f64) * 100.0
} else {
0.0
};
let budget = campaign.budget.unwrap_or(0.0);
let cost_per_result = if sent > 0 {
budget / sent as f64
} else {
0.0
};
Ok(CampaignMetrics {
campaign_id,
channel: campaign.channel,
total_recipients: total,
sent,
delivered,
failed,
opened: email_opens,
clicked: email_clicks,
replied,
open_rate,
click_rate,
conversion_rate,
cost_per_result,
total_spend: budget,
})
}
pub async fn get_campaign_metrics_by_channel(
state: &Arc<AppState>,
campaign_id: Uuid,
) -> Result<Vec<ChannelBreakdown>, String> {
let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?;
let channels = vec!["email", "whatsapp", "instagram", "facebook", "telegram", "sms"];
let mut breakdown = Vec::new();
for channel in channels {
let recipients: Vec<String> = marketing_recipients::table
.filter(marketing_recipients::campaign_id.eq(campaign_id))
.filter(marketing_recipients::channel.eq(channel))
.select(marketing_recipients::status)
.load(&mut conn)
.unwrap_or_default();
if recipients.is_empty() {
continue;
}
let total = recipients.len() as i64;
let sent = recipients.iter().filter(|s| *s == "sent").count() as i64;
let delivered = recipients.iter().filter(|s| *s == "delivered" || *s == "read").count() as i64;
let opened = if channel == "email" {
email_tracking::table
.filter(email_tracking::campaign_id.eq(campaign_id))
.filter(email_tracking::opened.eq(true))
.count()
.get_result::<i64>(&mut conn)
.unwrap_or(0)
} else {
0
};
let clicked = if channel == "email" {
email_tracking::table
.filter(email_tracking::campaign_id.eq(campaign_id))
.filter(email_tracking::clicked.eq(true))
.count()
.get_result::<i64>(&mut conn)
.unwrap_or(0)
} else {
0
};
breakdown.push(ChannelBreakdown {
channel: channel.to_string(),
recipients: total,
sent,
delivered,
opened,
clicked,
open_rate: calculate_open_rate(delivered, opened),
click_rate: calculate_click_rate(delivered, clicked),
});
}
Ok(breakdown)
}
pub async fn get_time_series_metrics(
state: &Arc<AppState>,
campaign_id: Uuid,
interval_hours: i32,
) -> Result<Vec<TimeSeriesMetric>, String> {
let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?;
let recipients: Vec<(Option<DateTime<Utc>>, Option<DateTime<Utc>>, Option<DateTime<Utc>>)> =
marketing_recipients::table
.filter(marketing_recipients::campaign_id.eq(campaign_id))
.select((
marketing_recipients::sent_at,
marketing_recipients::delivered_at,
marketing_recipients::failed_at,
))
.load(&mut conn)
.map_err(|e| format!("Query error: {}", e))?;
let mut sent_by_hour: std::collections::HashMap<i64, i64> = std::collections::HashMap::new();
let mut delivered_by_hour: std::collections::HashMap<i64, i64> = std::collections::HashMap::new();
let mut opened_by_hour: std::collections::HashMap<i64, i64> = std::collections::HashMap::new();
let mut clicked_by_hour: std::collections::HashMap<i64, i64> = std::collections::HashMap::new();
for (sent, delivered, failed) in recipients {
if sent.is_some() || failed.is_some() {
let ts = sent
.or(failed)
.map(|t| t.timestamp() / (interval_hours as i64 * 3600))
.unwrap_or(0);
*sent_by_hour.entry(ts).or_insert(0) += 1;
}
if let Some(d) = delivered {
let ts = d.timestamp() / (interval_hours as i64 * 3600);
*delivered_by_hour.entry(ts).or_insert(0) += 1;
}
}
let email_events: Vec<(Option<DateTime<Utc>>, Option<DateTime<Utc>>)> =
email_tracking::table
.filter(email_tracking::campaign_id.eq(campaign_id))
.select((email_tracking::opened_at, email_tracking::clicked_at))
.load(&mut conn)
.unwrap_or_default();
for (opened, clicked) in email_events {
if let Some(ts) = opened {
let key = ts.timestamp() / (interval_hours as i64 * 3600);
*opened_by_hour.entry(key).or_insert(0) += 1;
}
if let Some(ts) = clicked {
let key = ts.timestamp() / (interval_hours as i64 * 3600);
*clicked_by_hour.entry(key).or_insert(0) += 1;
}
}
let mut metrics: Vec<TimeSeriesMetric> = sent_by_hour
.keys()
.map(|&ts| TimeSeriesMetric {
timestamp: DateTime::from_timestamp(ts * interval_hours as i64 * 3600, 0)
.unwrap_or_else(Utc::now),
sent: *sent_by_hour.get(&ts).unwrap_or(&0),
delivered: *delivered_by_hour.get(&ts).unwrap_or(&0),
opened: *opened_by_hour.get(&ts).unwrap_or(&0),
clicked: *clicked_by_hour.get(&ts).unwrap_or(&0),
})
.collect();
metrics.sort_by_key(|m| m.timestamp);
Ok(metrics)
}
pub async fn get_aggregate_metrics(
state: &Arc<AppState>,
org_id: Uuid,
bot_id: Uuid,
) -> Result<AggregateMetrics, String> {
let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?;
let total_campaigns: i64 = marketing_campaigns::table
.filter(marketing_campaigns::org_id.eq(org_id))
.filter(marketing_campaigns::bot_id.eq(bot_id))
.count()
.get_result(&mut conn)
.unwrap_or(0);
let active_campaigns: i64 = marketing_campaigns::table
.filter(marketing_campaigns::org_id.eq(org_id))
.filter(marketing_campaigns::bot_id.eq(bot_id))
.filter(marketing_campaigns::status.eq("running"))
.count()
.get_result(&mut conn)
.unwrap_or(0);
let campaigns: Vec<CrmCampaign> = marketing_campaigns::table
.filter(marketing_campaigns::org_id.eq(org_id))
.filter(marketing_campaigns::bot_id.eq(bot_id))
.select(marketing_campaigns::all_columns)
.load(&mut conn)
.unwrap_or_default();
let campaign_ids: Vec<Uuid> = campaigns.iter().map(|c| c.id).collect();
let recipients: Vec<(String, String)> = marketing_recipients::table
.filter(marketing_recipients::campaign_id.eq_any(campaign_ids.clone()))
.select((marketing_recipients::channel, marketing_recipients::status))
.load(&mut conn)
.unwrap_or_default();
let total_recipients = recipients.len() as i64;
let total_sent = recipients.iter().filter(|(_, s)| s == "sent").count() as i64;
let total_delivered = recipients.iter().filter(|(_, s)| s == "delivered" || s == "read").count() as i64;
let _total_failed = recipients.iter().filter(|(_, s)| s == "failed").count() as i64;
let total_opened: i64 = email_tracking::table
.filter(email_tracking::campaign_id.eq_any(campaign_ids.clone()))
.filter(email_tracking::opened.eq(true))
.count()
.get_result(&mut conn)
.unwrap_or(0);
let total_clicked: i64 = email_tracking::table
.filter(email_tracking::campaign_id.eq_any(campaign_ids.clone()))
.filter(email_tracking::clicked.eq(true))
.count()
.get_result(&mut conn)
.unwrap_or(0);
let avg_open_rate = if total_delivered > 0 {
(total_opened as f64 / total_delivered as f64) * 100.0
} else {
0.0
};
let avg_click_rate = if total_delivered > 0 {
(total_clicked as f64 / total_delivered as f64) * 100.0
} else {
0.0
};
let channel_breakdown = get_channel_breakdown(&mut conn, &campaign_ids.clone()).await?;
Ok(AggregateMetrics {
total_campaigns,
active_campaigns,
total_recipients,
total_sent,
total_delivered,
total_opened,
total_clicked,
avg_open_rate,
avg_click_rate,
channel_breakdown,
})
}
async fn get_channel_breakdown(
conn: &mut diesel::PgConnection,
campaign_ids: &[Uuid],
) -> Result<Vec<ChannelBreakdown>, String> {
let channels = vec!["email", "whatsapp", "instagram", "facebook", "telegram", "sms"];
let mut breakdown = Vec::new();
for channel in channels {
let recipients: Vec<String> = marketing_recipients::table
.filter(marketing_recipients::campaign_id.eq_any(campaign_ids))
.filter(marketing_recipients::channel.eq(channel))
.select(marketing_recipients::status)
.load(conn)
.unwrap_or_default();
if recipients.is_empty() {
continue;
}
let total = recipients.len() as i64;
let sent = recipients.iter().filter(|s| *s == "sent").count() as i64;
let delivered = recipients.iter().filter(|s| *s == "delivered" || *s == "read").count() as i64;
let opened = if channel == "email" {
email_tracking::table
.filter(email_tracking::campaign_id.eq_any(campaign_ids))
.filter(email_tracking::opened.eq(true))
.count()
.get_result::<i64>(conn)
.unwrap_or(0)
} else {
0
};
let clicked = if channel == "email" {
email_tracking::table
.filter(email_tracking::campaign_id.eq_any(campaign_ids))
.filter(email_tracking::clicked.eq(true))
.count()
.get_result::<i64>(conn)
.unwrap_or(0)
} else {
0
};
breakdown.push(ChannelBreakdown {
channel: channel.to_string(),
recipients: total,
sent,
delivered,
opened,
clicked,
open_rate: calculate_open_rate(delivered, opened),
click_rate: calculate_click_rate(delivered, clicked),
});
}
Ok(breakdown)
}
pub async fn get_campaign_metrics_api(
State(state): State<Arc<AppState>>,
Path(campaign_id): Path<Uuid>,
) -> Result<Json<CampaignMetrics>, (StatusCode, String)> {
match get_campaign_metrics(&state, campaign_id).await {
Ok(metrics) => Ok(Json(metrics)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
pub async fn get_campaign_channel_breakdown_api(
State(state): State<Arc<AppState>>,
Path(campaign_id): Path<Uuid>,
) -> Result<Json<Vec<ChannelBreakdown>>, (StatusCode, String)> {
match get_campaign_metrics_by_channel(&state, campaign_id).await {
Ok(breakdown) => Ok(Json(breakdown)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
pub async fn get_campaign_timeseries_api(
State(state): State<Arc<AppState>>,
Path((campaign_id, interval)): Path<(Uuid, i32)>,
) -> Result<Json<Vec<TimeSeriesMetric>>, (StatusCode, String)> {
match get_time_series_metrics(&state, campaign_id, interval).await {
Ok(metrics) => Ok(Json(metrics)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
pub async fn get_aggregate_metrics_api(
State(state): State<Arc<AppState>>,
) -> Result<Json<AggregateMetrics>, (StatusCode, String)> {
let (org_id, bot_id) = get_default_context(&state);
match get_aggregate_metrics(&state, org_id, bot_id).await {
Ok(metrics) => Ok(Json(metrics)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}
fn get_default_context(state: &Arc<AppState>) -> (Uuid, Uuid) {
let mut conn = match state.conn.get() {
Ok(c) => c,
Err(_) => return (Uuid::nil(), Uuid::nil()),
};
#[derive(QueryableByName)]
struct BotRow {
#[diesel(sql_type = diesel::sql_types::Uuid)]
id: Uuid,
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Uuid>)]
org_id: Option<Uuid>,
}
let bot = diesel::sql_query("SELECT id, org_id FROM bots LIMIT 1")
.get_result::<BotRow>(&mut conn)
.ok();
match bot {
Some(b) => (b.org_id.unwrap_or(Uuid::nil()), b.id),
None => (Uuid::nil(), Uuid::nil()),
}
}

View file

@ -2,8 +2,13 @@ pub mod campaigns;
pub mod lists; pub mod lists;
pub mod templates; pub mod templates;
pub mod triggers; pub mod triggers;
pub mod email;
pub mod whatsapp;
pub mod metrics;
pub mod ai;
use axum::{ use axum::{
body::Body,
extract::{Path, State}, extract::{Path, State},
http::{header, StatusCode}, http::{header, StatusCode},
response::Response, response::Response,
@ -35,14 +40,14 @@ fn base64_decode(input: &str) -> Option<Vec<u8>> {
-1, 63, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, -1, -1, -1, -1, -1, -1, -1, 0, 1, 2, 3, -1, 63, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, -1, -1, -1, -1, -1, -1, -1, 0, 1, 2, 3,
4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, -1, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, -1,
-1, -1, -1, -1, -1, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, -1, -1, -1, -1, -1, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41,
42, 43, 44, 45, 46, 47, 48, 49, 50, 51, -1, -1, -1, -1, -1, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, -1, -1, -1, -1, -1, -1, -1,
]; ];
let mut output = Vec::with_capacity(chars.len() * 3 / 4); let mut output = Vec::with_capacity(chars.len() * 3 / 4);
let mut buf = [0u8; 4]; let mut buf = [0u8; 4];
let mut count = 0; let mut count = 0;
for (i, &byte) in chars.iter().enumerate() { for &byte in chars.iter() {
if byte >= 128 { if byte >= 128 {
return None; return None;
} }
@ -93,7 +98,7 @@ pub async fn track_email_open_pixel(
} }
} }
let mut response = Response::new(pixel); let mut response = Response::new(Body::from(pixel));
response.headers_mut().insert( response.headers_mut().insert(
header::CONTENT_TYPE, header::CONTENT_TYPE,
"image/png".parse().unwrap(), "image/png".parse().unwrap(),
@ -131,7 +136,7 @@ pub async fn track_email_click(
format!("/{}", destination) format!("/{}", destination)
}; };
let mut response = Response::new(""); let mut response = Response::new(Body::empty());
*response.status_mut() = StatusCode::FOUND; *response.status_mut() = StatusCode::FOUND;
response.headers_mut().insert( response.headers_mut().insert(
header::LOCATION, header::LOCATION,
@ -156,4 +161,15 @@ pub fn configure_marketing_routes() -> Router<Arc<AppState>> {
.route("/api/crm/email/track/open", post(triggers::track_email_open)) .route("/api/crm/email/track/open", post(triggers::track_email_open))
.route("/api/marketing/track/open/:token", get(track_email_open_pixel)) .route("/api/marketing/track/open/:token", get(track_email_open_pixel))
.route("/api/marketing/track/click/:id/*destination", get(track_email_click)) .route("/api/marketing/track/click/:id/*destination", get(track_email_click))
.route("/api/crm/email/send", post(email::send_email_api))
.route("/api/crm/whatsapp/send", post(whatsapp::send_whatsapp_api))
.route("/api/crm/metrics/campaign/:id", get(metrics::get_campaign_metrics_api))
.route("/api/crm/metrics/campaign/:id/channels", get(metrics::get_campaign_channel_breakdown_api))
.route("/api/crm/metrics/campaign/:id/timeseries/:interval", get(metrics::get_campaign_timeseries_api))
.route("/api/crm/metrics/aggregate", get(metrics::get_aggregate_metrics_api))
.route("/api/crm/ai/generate", post(ai::generate_content_api))
.route("/api/crm/ai/personalize", post(ai::personalize_api))
} }

378
src/marketing/whatsapp.rs Normal file
View file

@ -0,0 +1,378 @@
use axum::{
extract::State,
http::StatusCode,
Json,
};
use chrono::Utc;
use diesel::prelude::*;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use uuid::Uuid;
use crate::core::bot::channels::whatsapp::WhatsAppAdapter;
use crate::core::bot::channels::ChannelAdapter;
use crate::core::shared::schema::{
marketing_campaigns, marketing_recipients,
};
use crate::core::shared::state::AppState;
use crate::marketing::campaigns::CrmCampaign;
use crate::core::shared::models::BotResponse;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WhatsAppCampaignPayload {
pub to: String,
pub body: String,
pub media_url: Option<String>,
pub campaign_id: Option<Uuid>,
pub recipient_id: Option<Uuid>,
pub template_name: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WhatsAppSendResult {
pub success: bool,
pub message_id: Option<String>,
pub error: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WhatsAppTemplate {
pub name: String,
pub language: String,
pub components: Vec<WhatsAppTemplateComponent>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WhatsAppTemplateComponent {
pub component_type: String,
pub parameters: Vec<WhatsAppTemplateParameter>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WhatsAppTemplateParameter {
pub parameter_type: String,
pub text: Option<String>,
pub media_id: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WhatsAppBusinessConfig {
pub id: Uuid,
pub bot_id: Uuid,
pub phone_number_id: Option<String>,
pub business_account_id: Option<String>,
pub access_token: Option<String>,
pub webhooks_verified: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WhatsAppMetrics {
pub total_sent: i64,
pub total_delivered: i64,
pub total_failed: i64,
pub total_read: i64,
pub delivery_rate: f64,
pub read_rate: f64,
}
fn get_whatsapp_config(
state: &AppState,
bot_id: Uuid,
) -> Result<WhatsAppBusinessConfig, String> {
let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?;
#[derive(QueryableByName)]
struct WhatsAppConfigRow {
#[diesel(sql_type = diesel::sql_types::Uuid)]
id: Uuid,
#[diesel(sql_type = diesel::sql_types::Uuid)]
bot_id: Uuid,
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
phone_number_id: Option<String>,
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
business_account_id: Option<String>,
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
access_token: Option<String>,
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Bool>)]
webhooks_verified: Option<bool>,
}
let config = diesel::sql_query("SELECT id, bot_id, phone_number_id, business_account_id, access_token, webhooks_verified FROM whatsapp_business WHERE bot_id = $1")
.bind::<diesel::sql_types::Uuid, _>(bot_id)
.get_result::<WhatsAppConfigRow>(&mut conn)
.map_err(|e| format!("WhatsApp config not found: {}", e))?;
Ok(WhatsAppBusinessConfig {
id: config.id,
bot_id: config.bot_id,
phone_number_id: config.phone_number_id,
business_account_id: config.business_account_id,
access_token: config.access_token,
webhooks_verified: config.webhooks_verified.unwrap_or(false),
})
}
pub async fn send_whatsapp_message(
state: &Arc<AppState>,
bot_id: Uuid,
payload: WhatsAppCampaignPayload,
) -> Result<WhatsAppSendResult, String> {
let config = get_whatsapp_config(state, bot_id)?;
if config.phone_number_id.is_none() || config.access_token.is_none() {
return Err("WhatsApp not configured for this bot".to_string());
}
let adapter = WhatsAppAdapter::new(state.conn.clone(), bot_id);
let result: Result<String, Box<dyn std::error::Error + Send + Sync>> = if let Some(template_name) = payload.template_name {
adapter
.send_template_message(
&payload.to,
&template_name,
"pt_BR",
vec![],
)
.await
} else if let Some(media_url) = &payload.media_url {
let media_type = if media_url.ends_with(".mp4") {
"video"
} else if media_url.ends_with(".png") || media_url.ends_with(".jpg") || media_url.ends_with(".jpeg") {
"image"
} else if media_url.ends_with(".pdf") {
"document"
} else {
"image"
};
adapter
.send_media_message(&payload.to, media_url, media_type, Some(&payload.body))
.await
.map(|_| "sent".to_string())
} else {
let response = BotResponse::new(
bot_id.to_string(),
"marketing".to_string(),
payload.to.clone(),
payload.body.clone(),
"whatsapp".to_string(),
);
adapter.send_message(response).await.map(|_| "sent".to_string())
};
match result {
Ok(message_id) => {
if let Some(recipient_id) = payload.recipient_id {
let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?;
diesel::update(
marketing_recipients::table.filter(marketing_recipients::id.eq(recipient_id)),
)
.set((
marketing_recipients::status.eq("sent"),
marketing_recipients::sent_at.eq(Some(Utc::now())),
marketing_recipients::response.eq(serde_json::json!({ "message_id": message_id })),
))
.execute(&mut conn)
.ok();
}
Ok(WhatsAppSendResult {
success: true,
message_id: Some(message_id),
error: None,
})
}
Err(send_err) => {
if let Some(recipient_id) = payload.recipient_id {
let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?;
diesel::update(
marketing_recipients::table.filter(marketing_recipients::id.eq(recipient_id)),
)
.set((
marketing_recipients::status.eq("failed"),
marketing_recipients::failed_at.eq(Some(Utc::now())),
marketing_recipients::error_message.eq(Some(send_err.to_string())),
))
.execute(&mut conn)
.ok();
}
Ok(WhatsAppSendResult {
success: false,
message_id: None,
error: Some(send_err.to_string()),
})
}
}
}
pub async fn send_bulk_whatsapp_messages(
state: &Arc<AppState>,
campaign_id: Uuid,
contacts: Vec<(Uuid, String, String)>,
) -> Result<(i32, i32), String> {
let mut sent = 0;
let mut failed = 0;
let campaign: CrmCampaign = marketing_campaigns::table
.filter(marketing_campaigns::id.eq(campaign_id))
.first(&mut *state.conn.get().map_err(|e| format!("DB error: {}", e))?)
.map_err(|_| "Campaign not found")?;
let body = campaign
.content_template
.get("body")
.and_then(|b| b.as_str())
.unwrap_or("")
.to_string();
for (contact_id, phone, name) in contacts {
let personalized_body = body.replace("{{name}}", &name);
let payload = WhatsAppCampaignPayload {
to: phone,
body: personalized_body,
media_url: campaign.content_template.get("media_url").and_then(|m| m.as_str()).map(String::from),
campaign_id: Some(campaign_id),
recipient_id: Some(contact_id),
template_name: None,
};
match send_whatsapp_message(state, campaign.bot_id, payload).await {
Ok(result) => {
if result.success {
sent += 1;
} else {
failed += 1;
log::error!("WhatsApp send failed: {:?}", result.error);
}
}
Err(e) => {
failed += 1;
log::error!("WhatsApp error: {}", e);
}
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
Ok((sent, failed))
}
pub async fn get_whatsapp_metrics(
state: &Arc<AppState>,
campaign_id: Uuid,
) -> Result<WhatsAppMetrics, String> {
let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?;
let recipients: Vec<(String, Option<serde_json::Value>)> = marketing_recipients::table
.filter(marketing_recipients::campaign_id.eq(campaign_id))
.filter(marketing_recipients::channel.eq("whatsapp"))
.select((marketing_recipients::status, marketing_recipients::response))
.load(&mut conn)
.map_err(|e| format!("Query error: {}", e))?;
let total = recipients.len() as i64;
let sent = recipients.iter().filter(|(s, _)| s == "sent").count() as i64;
let delivered = recipients
.iter()
.filter(|(_, r)| {
r.as_ref()
.and_then(|v| v.get("status"))
.and_then(|s| s.as_str())
.map(|s| s == "delivered")
.unwrap_or(false)
})
.count() as i64;
let failed = recipients.iter().filter(|(s, _)| s == "failed").count() as i64;
let read = recipients
.iter()
.filter(|(_, r)| {
r.as_ref()
.and_then(|v| v.get("status"))
.and_then(|s| s.as_str())
.map(|s| s == "read")
.unwrap_or(false)
})
.count() as i64;
Ok(WhatsAppMetrics {
total_sent: total,
total_delivered: delivered,
total_failed: failed,
total_read: read,
delivery_rate: if sent > 0 { (delivered as f64 / sent as f64) * 100.0 } else { 0.0 },
read_rate: if delivered > 0 { (read as f64 / delivered as f64) * 100.0 } else { 0.0 },
})
}
pub async fn handle_webhook_event(
state: &Arc<AppState>,
payload: serde_json::Value,
) -> Result<(), String> {
let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?;
if let Some(statuses) = payload.get("entry").and_then(|e| e.as_array())
.and_then(|e| e.first())
.and_then(|e| e.get("changes"))
.and_then(|c| c.as_array())
.and_then(|c| c.first())
.and_then(|c| c.get("value"))
.and_then(|v| v.get("statuses"))
.and_then(|s| s.as_array())
{
for status in statuses {
if let (Some(message_id), Some(status_str)) = (
status.get("id").and_then(|m| m.as_str()),
status.get("status").and_then(|s| s.as_str()),
) {
let delivered_at = if status_str == "delivered" {
Some(Utc::now())
} else {
None
};
diesel::update(marketing_recipients::table.filter(
marketing_recipients::response
.eq(serde_json::json!({ "message_id": message_id })),
))
.set((
marketing_recipients::status.eq(status_str),
marketing_recipients::delivered_at.eq(delivered_at),
))
.execute(&mut conn)
.ok();
}
}
}
Ok(())
}
#[derive(Debug, Deserialize)]
pub struct SendWhatsAppRequest {
pub to: String,
pub body: String,
pub media_url: Option<String>,
pub template_name: Option<String>,
}
pub async fn send_whatsapp_api(
State(state): State<Arc<AppState>>,
Json(req): Json<SendWhatsAppRequest>,
) -> Result<Json<WhatsAppSendResult>, (StatusCode, String)> {
let bot_id = Uuid::nil();
let payload = WhatsAppCampaignPayload {
to: req.to,
body: req.body,
media_url: req.media_url,
campaign_id: None,
recipient_id: None,
template_name: req.template_name,
};
match send_whatsapp_message(&state, bot_id, payload).await {
Ok(result) => Ok(Json(result)),
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}

View file

@ -6,6 +6,8 @@ use std::sync::Arc;
use tokio::sync::{broadcast, RwLock}; use tokio::sync::{broadcast, RwLock};
use uuid::Uuid; use uuid::Uuid;
use crate::security::command_guard::SafeCommand;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum MetricType { pub enum MetricType {
Counter, Counter,
@ -592,9 +594,9 @@ impl MetricsCollector {
async fn collect_disk_usage(&self) -> f64 { async fn collect_disk_usage(&self) -> f64 {
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
{ {
if let Ok(output) = std::process::Command::new("df") if let Ok(output) = SafeCommand::new("df")?
.args(["-h", "/"]) .args(["-h", "/"])
.output() .execute()
{ {
if let Ok(stdout) = String::from_utf8(output.stdout) { if let Ok(stdout) = String::from_utf8(output.stdout) {
if let Some(line) = stdout.lines().nth(1) { if let Some(line) = stdout.lines().nth(1) {

View file

@ -79,6 +79,14 @@ static ALLOWED_COMMANDS: LazyLock<HashSet<&'static str>> = LazyLock::new(|| {
"visudo", "visudo",
"id", "id",
"netsh", "netsh",
// LLM local servers
"llama-server",
"ollama",
// Python
"python",
"python3",
"python3.11",
"python3.12",
]) ])
}); });
@ -111,6 +119,12 @@ impl std::fmt::Display for CommandGuardError {
} }
} }
impl From<CommandGuardError> for String {
fn from(val: CommandGuardError) -> Self {
val.to_string()
}
}
impl std::error::Error for CommandGuardError {} impl std::error::Error for CommandGuardError {}
pub struct SafeCommand { pub struct SafeCommand {
@ -119,6 +133,8 @@ pub struct SafeCommand {
working_dir: Option<PathBuf>, working_dir: Option<PathBuf>,
allowed_paths: Vec<PathBuf>, allowed_paths: Vec<PathBuf>,
envs: HashMap<String, String>, envs: HashMap<String, String>,
stdout: Option<std::process::Stdio>,
stderr: Option<std::process::Stdio>,
} }
impl SafeCommand { impl SafeCommand {
@ -143,6 +159,8 @@ impl SafeCommand {
std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/")), std::env::current_dir().unwrap_or_else(|_| PathBuf::from("/")),
], ],
envs: HashMap::new(), envs: HashMap::new(),
stdout: None,
stderr: None,
}) })
} }
@ -257,6 +275,16 @@ impl SafeCommand {
Ok(self) Ok(self)
} }
pub fn stdout(mut self, stdout: std::process::Stdio) -> Self {
self.stdout = Some(stdout);
self
}
pub fn stderr(mut self, stderr: std::process::Stdio) -> Self {
self.stderr = Some(stderr);
self
}
pub fn execute(&self) -> Result<Output, CommandGuardError> { pub fn execute(&self) -> Result<Output, CommandGuardError> {
let mut cmd = std::process::Command::new(&self.command); let mut cmd = std::process::Command::new(&self.command);
cmd.args(&self.args); cmd.args(&self.args);
@ -331,7 +359,7 @@ impl SafeCommand {
.map_err(|e| CommandGuardError::ExecutionFailed(e.to_string())) .map_err(|e| CommandGuardError::ExecutionFailed(e.to_string()))
} }
pub fn spawn(&self) -> Result<Child, CommandGuardError> { pub fn spawn(&mut self) -> Result<Child, CommandGuardError> {
let mut cmd = std::process::Command::new(&self.command); let mut cmd = std::process::Command::new(&self.command);
cmd.args(&self.args); cmd.args(&self.args);
@ -339,6 +367,14 @@ impl SafeCommand {
cmd.current_dir(dir); cmd.current_dir(dir);
} }
if let Some(stdout) = self.stdout.take() {
cmd.stdout(stdout);
}
if let Some(stderr) = self.stderr.take() {
cmd.stderr(stderr);
}
cmd.env_clear(); cmd.env_clear();
// Build PATH with standard locations plus botserver-stack/bin/shared // Build PATH with standard locations plus botserver-stack/bin/shared

View file

@ -996,6 +996,24 @@ pub fn build_default_route_permissions() -> Vec<RoutePermission> {
RoutePermission::new("/api/files/**", "PUT", ""), RoutePermission::new("/api/files/**", "PUT", ""),
RoutePermission::new("/api/files/**", "DELETE", ""), RoutePermission::new("/api/files/**", "DELETE", ""),
// Editor
RoutePermission::new("/api/editor/**", "GET", ""),
RoutePermission::new("/api/editor/**", "POST", ""),
RoutePermission::new("/api/editor/**", "PUT", ""),
RoutePermission::new("/api/editor/**", "DELETE", ""),
// Database
RoutePermission::new("/api/database/**", "GET", ""),
RoutePermission::new("/api/database/**", "POST", ""),
RoutePermission::new("/api/database/**", "PUT", ""),
RoutePermission::new("/api/database/**", "DELETE", ""),
// Git
RoutePermission::new("/api/git/**", "GET", ""),
RoutePermission::new("/api/git/**", "POST", ""),
RoutePermission::new("/api/git/**", "PUT", ""),
RoutePermission::new("/api/git/**", "DELETE", ""),
// Mail // Mail
RoutePermission::new("/api/mail/**", "GET", ""), RoutePermission::new("/api/mail/**", "GET", ""),
RoutePermission::new("/api/mail/**", "POST", ""), RoutePermission::new("/api/mail/**", "POST", ""),