diff --git a/src/attendance/llm_assist.rs b/src/attendance/llm_assist.rs new file mode 100644 index 000000000..8a9969f0f --- /dev/null +++ b/src/attendance/llm_assist.rs @@ -0,0 +1,2090 @@ +//! LLM-Assisted Attendant Features +//! +//! Provides AI-powered assistance to human attendants during customer conversations. +//! These features help attendants respond faster, more professionally, and with better context. +//! +//! ## Features +//! +//! 1. **Real-time Tips** (`attendant-llm-tips`) - Contextual tips when customer messages arrive +//! 2. **Message Polish** (`attendant-polish-message`) - Improve grammar/tone before sending +//! 3. **Smart Replies** (`attendant-smart-replies`) - Generate 3 contextual reply suggestions +//! 4. **Auto Summary** (`attendant-auto-summary`) - Summarize conversation when attendant joins +//! 5. **Sentiment Analysis** (`attendant-sentiment-analysis`) - Real-time emotional state tracking +//! +//! ## Config.csv Properties +//! +//! ```csv +//! name,value +//! attendant-llm-tips,true +//! attendant-polish-message,true +//! attendant-smart-replies,true +//! attendant-auto-summary,true +//! attendant-sentiment-analysis,true +//! ``` +//! +//! ## WhatsApp Attendant Commands +//! +//! Attendants on WhatsApp can use these commands: +//! - `/queue` - View current queue +//! - `/take` - Take next conversation +//! - `/status [online|busy|away|offline]` - Set status +//! - `/transfer @name` - Transfer current conversation +//! - `/resolve` - Mark conversation as resolved +//! - `/tips` - Get tips for current conversation +//! - `/polish ` - Polish a message before sending +//! - `/replies` - Get smart reply suggestions +//! 
- `/summary` - Get conversation summary + +use crate::core::config::ConfigManager; +use crate::shared::models::UserSession; +use crate::shared::state::AppState; +use axum::{ + extract::{Path, State}, + http::StatusCode, + response::IntoResponse, + Json, +}; +use chrono::Utc; +use diesel::prelude::*; +use log::{debug, error, info, warn}; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; +use std::sync::Arc; +use uuid::Uuid; + +// ============================================================================ +// Configuration +// ============================================================================ + +/// LLM Assist configuration loaded from config.csv +#[derive(Debug, Clone, Default)] +pub struct LlmAssistConfig { + /// Enable real-time tips when customer messages arrive + pub tips_enabled: bool, + /// Enable message polishing before sending + pub polish_enabled: bool, + /// Enable smart reply generation + pub smart_replies_enabled: bool, + /// Enable auto-summary when attendant takes conversation + pub auto_summary_enabled: bool, + /// Enable LLM-powered sentiment analysis + pub sentiment_enabled: bool, + /// Bot's system prompt for context + pub bot_system_prompt: Option, + /// Bot's description for context + pub bot_description: Option, +} + +impl LlmAssistConfig { + /// Load configuration from config.csv + pub fn from_config(bot_id: Uuid, work_path: &str) -> Self { + let config_path = PathBuf::from(work_path) + .join(format!("{}.gbai", bot_id)) + .join("config.csv"); + + let alt_path = PathBuf::from(work_path).join("config.csv"); + + let path = if config_path.exists() { + config_path + } else if alt_path.exists() { + alt_path + } else { + return Self::default(); + }; + + let mut config = Self::default(); + + if let Ok(content) = std::fs::read_to_string(&path) { + for line in content.lines() { + let line_lower = line.to_lowercase(); + let parts: Vec<&str> = line.split(',').map(|s| s.trim()).collect(); + + if parts.len() < 2 { + continue; + } + + 
let key = parts[0].to_lowercase(); + let value = parts[1]; + + match key.as_str() { + "attendant-llm-tips" => { + config.tips_enabled = value.to_lowercase() == "true"; + } + "attendant-polish-message" => { + config.polish_enabled = value.to_lowercase() == "true"; + } + "attendant-smart-replies" => { + config.smart_replies_enabled = value.to_lowercase() == "true"; + } + "attendant-auto-summary" => { + config.auto_summary_enabled = value.to_lowercase() == "true"; + } + "attendant-sentiment-analysis" => { + config.sentiment_enabled = value.to_lowercase() == "true"; + } + "bot-description" | "bot_description" => { + config.bot_description = Some(value.to_string()); + } + "bot-system-prompt" | "system-prompt" => { + config.bot_system_prompt = Some(value.to_string()); + } + _ => {} + } + } + } + + info!( + "LLM Assist config loaded: tips={}, polish={}, replies={}, summary={}, sentiment={}", + config.tips_enabled, + config.polish_enabled, + config.smart_replies_enabled, + config.auto_summary_enabled, + config.sentiment_enabled + ); + + config + } + + /// Check if any LLM assist feature is enabled + pub fn any_enabled(&self) -> bool { + self.tips_enabled + || self.polish_enabled + || self.smart_replies_enabled + || self.auto_summary_enabled + || self.sentiment_enabled + } +} + +// ============================================================================ +// Request/Response Types +// ============================================================================ + +/// Request for generating tips based on customer message +#[derive(Debug, Deserialize)] +pub struct TipRequest { + pub session_id: Uuid, + pub customer_message: String, + /// Recent conversation history for context + #[serde(default)] + pub history: Vec, +} + +/// Request for polishing an attendant's message +#[derive(Debug, Deserialize)] +pub struct PolishRequest { + pub session_id: Uuid, + pub message: String, + /// Desired tone: professional, friendly, empathetic, formal + #[serde(default = "default_tone")] 
+ pub tone: String, +} + +fn default_tone() -> String { + "professional".to_string() +} + +/// Request for smart reply suggestions +#[derive(Debug, Deserialize)] +pub struct SmartRepliesRequest { + pub session_id: Uuid, + #[serde(default)] + pub history: Vec, +} + +/// Request for conversation summary +#[derive(Debug, Deserialize)] +pub struct SummaryRequest { + pub session_id: Uuid, +} + +/// Request for sentiment analysis +#[derive(Debug, Deserialize)] +pub struct SentimentRequest { + pub session_id: Uuid, + pub message: String, + #[serde(default)] + pub history: Vec, +} + +/// Conversation message for context +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ConversationMessage { + pub role: String, // "customer", "attendant", "bot" + pub content: String, + pub timestamp: Option, +} + +/// Response with tips for the attendant +#[derive(Debug, Serialize)] +pub struct TipResponse { + pub success: bool, + pub tips: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +/// Individual tip for attendant +#[derive(Debug, Clone, Serialize)] +pub struct AttendantTip { + pub tip_type: TipType, + pub content: String, + pub confidence: f32, + pub priority: i32, // 1 = high, 2 = medium, 3 = low +} + +/// Types of tips +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum TipType { + /// Customer intent detected + Intent, + /// Suggested action to take + Action, + /// Warning about sentiment/escalation + Warning, + /// Relevant knowledge base info + Knowledge, + /// Customer history insight + History, + /// General helpful tip + General, +} + +/// Response with polished message +#[derive(Debug, Serialize)] +pub struct PolishResponse { + pub success: bool, + pub original: String, + pub polished: String, + pub changes: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +/// Response with smart reply suggestions +#[derive(Debug, Serialize)] +pub struct 
SmartRepliesResponse { + pub success: bool, + pub replies: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +/// Individual smart reply suggestion +#[derive(Debug, Clone, Serialize)] +pub struct SmartReply { + pub text: String, + pub tone: String, + pub confidence: f32, + pub category: String, // "greeting", "answer", "follow_up", "closing" +} + +/// Response with conversation summary +#[derive(Debug, Serialize)] +pub struct SummaryResponse { + pub success: bool, + pub summary: ConversationSummary, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +/// Conversation summary +#[derive(Debug, Clone, Serialize, Default)] +pub struct ConversationSummary { + pub brief: String, + pub key_points: Vec, + pub customer_needs: Vec, + pub unresolved_issues: Vec, + pub sentiment_trend: String, + pub recommended_action: String, + pub message_count: i32, + pub duration_minutes: i32, +} + +/// Response with sentiment analysis +#[derive(Debug, Serialize)] +pub struct SentimentResponse { + pub success: bool, + pub sentiment: SentimentAnalysis, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +/// Sentiment analysis result +#[derive(Debug, Clone, Serialize, Default)] +pub struct SentimentAnalysis { + pub overall: String, // positive, neutral, negative + pub score: f32, // -1.0 to 1.0 + pub emotions: Vec, // detected emotions + pub escalation_risk: String, // low, medium, high + pub urgency: String, // low, normal, high, urgent + pub emoji: String, // emoji representation +} + +/// Detected emotion +#[derive(Debug, Clone, Serialize)] +pub struct Emotion { + pub name: String, + pub intensity: f32, // 0.0 to 1.0 +} + +// ============================================================================ +// LLM Integration +// ============================================================================ + +/// Execute LLM generation with the bot's context +async fn execute_llm_with_context( + state: &Arc, 
+ bot_id: Uuid, + system_prompt: &str, + user_prompt: &str, +) -> Result> { + let config_manager = ConfigManager::new(state.conn.clone()); + + let model = config_manager + .get_config(&bot_id, "llm-model", None) + .unwrap_or_else(|_| { + config_manager + .get_config(&Uuid::nil(), "llm-model", None) + .unwrap_or_default() + }); + + let key = config_manager + .get_config(&bot_id, "llm-key", None) + .unwrap_or_else(|_| { + config_manager + .get_config(&Uuid::nil(), "llm-key", None) + .unwrap_or_default() + }); + + // Build messages with system prompt + let messages = serde_json::json!([ + { + "role": "system", + "content": system_prompt + }, + { + "role": "user", + "content": user_prompt + } + ]); + + let response = state + .llm_provider + .generate(user_prompt, &messages, &model, &key) + .await?; + + // Process response through model handler + let handler = crate::llm::llm_models::get_handler(&model); + let processed = handler.process_content(&response); + + Ok(processed) +} + +/// Get the bot's system prompt from config or start.bas +fn get_bot_system_prompt(bot_id: Uuid, work_path: &str) -> String { + // Try config first + let config = LlmAssistConfig::from_config(bot_id, work_path); + if let Some(prompt) = config.bot_system_prompt { + return prompt; + } + + // Try to read from start.bas header comments + let start_bas_path = PathBuf::from(work_path) + .join(format!("{}.gbai", bot_id)) + .join(format!("{}.gbdialog", bot_id)) + .join("start.bas"); + + if let Ok(content) = std::fs::read_to_string(&start_bas_path) { + // Extract description from REM/comments at start + let mut description_lines = Vec::new(); + for line in content.lines() { + let trimmed = line.trim(); + if trimmed.starts_with("REM ") || trimmed.starts_with("' ") { + let comment = trimmed.trim_start_matches("REM ").trim_start_matches("' "); + description_lines.push(comment); + } else if !trimmed.is_empty() { + break; + } + } + if !description_lines.is_empty() { + return description_lines.join(" "); + } 
+ } + + // Default professional assistant prompt + "You are a professional customer service assistant. Be helpful, empathetic, and solution-oriented. Maintain a friendly but professional tone.".to_string() +} + +// ============================================================================ +// API Handlers +// ============================================================================ + +/// POST /api/attendance/llm/tips +/// Generate contextual tips for the attendant based on customer message +pub async fn generate_tips( + State(state): State>, + Json(request): Json, +) -> impl IntoResponse { + info!("Generating tips for session {}", request.session_id); + + // Get session and bot info + let session_result = get_session(&state, request.session_id).await; + let session = match session_result { + Ok(s) => s, + Err(e) => { + return ( + StatusCode::NOT_FOUND, + Json(TipResponse { + success: false, + tips: vec![], + error: Some(e), + }), + ) + } + }; + + // Check if tips are enabled + let work_path = std::env::var("WORK_PATH").unwrap_or_else(|_| "./work".to_string()); + let config = LlmAssistConfig::from_config(session.bot_id, &work_path); + + if !config.tips_enabled { + return ( + StatusCode::OK, + Json(TipResponse { + success: true, + tips: vec![], + error: Some("Tips feature is disabled".to_string()), + }), + ); + } + + // Build context from history + let history_context = request + .history + .iter() + .map(|m| format!("{}: {}", m.role, m.content)) + .collect::>() + .join("\n"); + + let bot_prompt = get_bot_system_prompt(session.bot_id, &work_path); + + let system_prompt = format!( + r#"You are an AI assistant helping a human customer service attendant. +The bot they are replacing has this personality: {} + +Your job is to provide helpful tips to the attendant based on the customer's message. + +Analyze the customer message and provide 2-4 actionable tips. 
For each tip, classify it as: +- intent: What the customer wants +- action: Suggested action for attendant +- warning: Sentiment or escalation concern +- knowledge: Relevant info they should know +- history: Insight from conversation history +- general: General helpful advice + +Respond in JSON format: +{{ + "tips": [ + {{"type": "intent", "content": "...", "confidence": 0.9, "priority": 1}}, + {{"type": "action", "content": "...", "confidence": 0.8, "priority": 2}} + ] +}}"#, + bot_prompt + ); + + let user_prompt = format!( + r#"Conversation history: +{} + +Latest customer message: "{}" + +Provide tips for the attendant."#, + history_context, request.customer_message + ); + + match execute_llm_with_context(&state, session.bot_id, &system_prompt, &user_prompt).await { + Ok(response) => { + // Parse JSON response + let tips = parse_tips_response(&response); + ( + StatusCode::OK, + Json(TipResponse { + success: true, + tips, + error: None, + }), + ) + } + Err(e) => { + error!("LLM error generating tips: {}", e); + // Return fallback tips + ( + StatusCode::OK, + Json(TipResponse { + success: true, + tips: generate_fallback_tips(&request.customer_message), + error: Some(format!("LLM unavailable, using fallback: {}", e)), + }), + ) + } + } +} + +/// POST /api/attendance/llm/polish +/// Polish an attendant's message for better grammar and tone +pub async fn polish_message( + State(state): State>, + Json(request): Json, +) -> impl IntoResponse { + info!("Polishing message for session {}", request.session_id); + + let session_result = get_session(&state, request.session_id).await; + let session = match session_result { + Ok(s) => s, + Err(e) => { + return ( + StatusCode::NOT_FOUND, + Json(PolishResponse { + success: false, + original: request.message.clone(), + polished: request.message.clone(), + changes: vec![], + error: Some(e), + }), + ) + } + }; + + let work_path = std::env::var("WORK_PATH").unwrap_or_else(|_| "./work".to_string()); + let config = 
LlmAssistConfig::from_config(session.bot_id, &work_path); + + if !config.polish_enabled { + return ( + StatusCode::OK, + Json(PolishResponse { + success: true, + original: request.message.clone(), + polished: request.message.clone(), + changes: vec![], + error: Some("Polish feature is disabled".to_string()), + }), + ); + } + + let bot_prompt = get_bot_system_prompt(session.bot_id, &work_path); + + let system_prompt = format!( + r#"You are a professional editor helping a customer service attendant. +The service has this tone: {} + +Your job is to polish the attendant's message to be more {} while: +1. Fixing grammar and spelling errors +2. Improving clarity and flow +3. Maintaining the original meaning +4. Keeping it natural (not robotic) + +Respond in JSON format: +{{ + "polished": "The improved message", + "changes": ["Changed X to Y", "Fixed grammar in..."] +}}"#, + bot_prompt, request.tone + ); + + let user_prompt = format!( + r#"Polish this message with a {} tone: + +"{}""#, + request.tone, request.message + ); + + match execute_llm_with_context(&state, session.bot_id, &system_prompt, &user_prompt).await { + Ok(response) => { + let (polished, changes) = parse_polish_response(&response, &request.message); + ( + StatusCode::OK, + Json(PolishResponse { + success: true, + original: request.message.clone(), + polished, + changes, + error: None, + }), + ) + } + Err(e) => { + error!("LLM error polishing message: {}", e); + ( + StatusCode::OK, + Json(PolishResponse { + success: false, + original: request.message.clone(), + polished: request.message.clone(), + changes: vec![], + error: Some(format!("LLM error: {}", e)), + }), + ) + } + } +} + +/// POST /api/attendance/llm/smart-replies +/// Generate smart reply suggestions based on conversation +pub async fn generate_smart_replies( + State(state): State>, + Json(request): Json, +) -> impl IntoResponse { + info!( + "Generating smart replies for session {}", + request.session_id + ); + + let session_result = 
get_session(&state, request.session_id).await; + let session = match session_result { + Ok(s) => s, + Err(e) => { + return ( + StatusCode::NOT_FOUND, + Json(SmartRepliesResponse { + success: false, + replies: vec![], + error: Some(e), + }), + ) + } + }; + + let work_path = std::env::var("WORK_PATH").unwrap_or_else(|_| "./work".to_string()); + let config = LlmAssistConfig::from_config(session.bot_id, &work_path); + + if !config.smart_replies_enabled { + return ( + StatusCode::OK, + Json(SmartRepliesResponse { + success: true, + replies: vec![], + error: Some("Smart replies feature is disabled".to_string()), + }), + ); + } + + let history_context = request + .history + .iter() + .map(|m| format!("{}: {}", m.role, m.content)) + .collect::>() + .join("\n"); + + let bot_prompt = get_bot_system_prompt(session.bot_id, &work_path); + + let system_prompt = format!( + r#"You are an AI assistant helping a customer service attendant craft responses. +The service has this personality: {} + +Generate exactly 3 reply suggestions that: +1. Are contextually appropriate +2. Sound natural and human (not robotic) +3. Vary in approach (one empathetic, one solution-focused, one follow-up) +4. 
Are ready to send (no placeholders like [name]) + +Respond in JSON format: +{{ + "replies": [ + {{"text": "...", "tone": "empathetic", "confidence": 0.9, "category": "answer"}}, + {{"text": "...", "tone": "professional", "confidence": 0.85, "category": "solution"}}, + {{"text": "...", "tone": "friendly", "confidence": 0.8, "category": "follow_up"}} + ] +}}"#, + bot_prompt + ); + + let user_prompt = format!( + r#"Conversation: +{} + +Generate 3 reply options for the attendant."#, + history_context + ); + + match execute_llm_with_context(&state, session.bot_id, &system_prompt, &user_prompt).await { + Ok(response) => { + let replies = parse_smart_replies_response(&response); + ( + StatusCode::OK, + Json(SmartRepliesResponse { + success: true, + replies, + error: None, + }), + ) + } + Err(e) => { + error!("LLM error generating smart replies: {}", e); + ( + StatusCode::OK, + Json(SmartRepliesResponse { + success: true, + replies: generate_fallback_replies(), + error: Some(format!("LLM unavailable, using fallback: {}", e)), + }), + ) + } + } +} + +/// GET /api/attendance/llm/summary/{session_id} +/// Generate a summary of the conversation +pub async fn generate_summary( + State(state): State>, + Path(session_id): Path, +) -> impl IntoResponse { + info!("Generating summary for session {}", session_id); + + let session_result = get_session(&state, session_id).await; + let session = match session_result { + Ok(s) => s, + Err(e) => { + return ( + StatusCode::NOT_FOUND, + Json(SummaryResponse { + success: false, + summary: ConversationSummary::default(), + error: Some(e), + }), + ) + } + }; + + let work_path = std::env::var("WORK_PATH").unwrap_or_else(|_| "./work".to_string()); + let config = LlmAssistConfig::from_config(session.bot_id, &work_path); + + if !config.auto_summary_enabled { + return ( + StatusCode::OK, + Json(SummaryResponse { + success: true, + summary: ConversationSummary::default(), + error: Some("Auto-summary feature is disabled".to_string()), + }), + ); + } 
+ + // Load conversation history from database + let history = load_conversation_history(&state, session_id).await; + + if history.is_empty() { + return ( + StatusCode::OK, + Json(SummaryResponse { + success: true, + summary: ConversationSummary { + brief: "No messages in conversation yet".to_string(), + ..Default::default() + }, + error: None, + }), + ); + } + + let history_text = history + .iter() + .map(|m| format!("{}: {}", m.role, m.content)) + .collect::>() + .join("\n"); + + let bot_prompt = get_bot_system_prompt(session.bot_id, &work_path); + + let system_prompt = format!( + r#"You are an AI assistant helping a customer service attendant understand a conversation. +The bot/service personality is: {} + +Analyze the conversation and provide a comprehensive summary. + +Respond in JSON format: +{{ + "brief": "One sentence summary", + "key_points": ["Point 1", "Point 2"], + "customer_needs": ["Need 1", "Need 2"], + "unresolved_issues": ["Issue 1"], + "sentiment_trend": "improving/stable/declining", + "recommended_action": "What the attendant should do next" +}}"#, + bot_prompt + ); + + let user_prompt = format!( + r#"Summarize this conversation: + +{}"#, + history_text + ); + + match execute_llm_with_context(&state, session.bot_id, &system_prompt, &user_prompt).await { + Ok(response) => { + let mut summary = parse_summary_response(&response); + summary.message_count = history.len() as i32; + + // Calculate duration if we have timestamps + if let (Some(first_ts), Some(last_ts)) = ( + history.first().and_then(|m| m.timestamp.as_ref()), + history.last().and_then(|m| m.timestamp.as_ref()), + ) { + if let (Ok(first), Ok(last)) = ( + chrono::DateTime::parse_from_rfc3339(first_ts), + chrono::DateTime::parse_from_rfc3339(last_ts), + ) { + summary.duration_minutes = (last - first).num_minutes() as i32; + } + } + + ( + StatusCode::OK, + Json(SummaryResponse { + success: true, + summary, + error: None, + }), + ) + } + Err(e) => { + error!("LLM error generating summary: 
{}", e); + ( + StatusCode::OK, + Json(SummaryResponse { + success: false, + summary: ConversationSummary { + brief: format!("Conversation with {} messages", history.len()), + message_count: history.len() as i32, + ..Default::default() + }, + error: Some(format!("LLM error: {}", e)), + }), + ) + } + } +} + +/// POST /api/attendance/llm/sentiment +/// Analyze sentiment of customer message +pub async fn analyze_sentiment( + State(state): State>, + Json(request): Json, +) -> impl IntoResponse { + info!("Analyzing sentiment for session {}", request.session_id); + + let session_result = get_session(&state, request.session_id).await; + let session = match session_result { + Ok(s) => s, + Err(e) => { + return ( + StatusCode::NOT_FOUND, + Json(SentimentResponse { + success: false, + sentiment: SentimentAnalysis::default(), + error: Some(e), + }), + ) + } + }; + + let work_path = std::env::var("WORK_PATH").unwrap_or_else(|_| "./work".to_string()); + let config = LlmAssistConfig::from_config(session.bot_id, &work_path); + + if !config.sentiment_enabled { + // Fall back to keyword-based analysis + let sentiment = analyze_sentiment_keywords(&request.message); + return ( + StatusCode::OK, + Json(SentimentResponse { + success: true, + sentiment, + error: Some("LLM sentiment disabled, using keyword analysis".to_string()), + }), + ); + } + + let history_context = request + .history + .iter() + .take(5) // Last 5 messages for context + .map(|m| format!("{}: {}", m.role, m.content)) + .collect::>() + .join("\n"); + + let system_prompt = r#"You are a sentiment analysis expert. Analyze the customer's emotional state. + +Consider: +1. Overall sentiment (positive/neutral/negative) +2. Specific emotions present +3. Risk of escalation +4. 
Urgency level + +Respond in JSON format: +{ + "overall": "positive|neutral|negative", + "score": 0.5, + "emotions": [{"name": "frustration", "intensity": 0.7}], + "escalation_risk": "low|medium|high", + "urgency": "low|normal|high|urgent", + "emoji": "😐" +}"#; + + let user_prompt = format!( + r#"Recent conversation: +{} + +Current message to analyze: "{}" + +Analyze the customer's sentiment."#, + history_context, request.message + ); + + match execute_llm_with_context(&state, session.bot_id, system_prompt, &user_prompt).await { + Ok(response) => { + let sentiment = parse_sentiment_response(&response); + ( + StatusCode::OK, + Json(SentimentResponse { + success: true, + sentiment, + error: None, + }), + ) + } + Err(e) => { + error!("LLM error analyzing sentiment: {}", e); + let sentiment = analyze_sentiment_keywords(&request.message); + ( + StatusCode::OK, + Json(SentimentResponse { + success: true, + sentiment, + error: Some(format!("LLM unavailable, using fallback: {}", e)), + }), + ) + } + } +} + +/// GET /api/attendance/llm/config/{bot_id} +/// Get LLM assist configuration for a bot +pub async fn get_llm_config( + State(_state): State>, + Path(bot_id): Path, +) -> impl IntoResponse { + let work_path = std::env::var("WORK_PATH").unwrap_or_else(|_| "./work".to_string()); + let config = LlmAssistConfig::from_config(bot_id, &work_path); + + ( + StatusCode::OK, + Json(serde_json::json!({ + "tips_enabled": config.tips_enabled, + "polish_enabled": config.polish_enabled, + "smart_replies_enabled": config.smart_replies_enabled, + "auto_summary_enabled": config.auto_summary_enabled, + "sentiment_enabled": config.sentiment_enabled, + "any_enabled": config.any_enabled() + })), + ) +} + +// ============================================================================ +// WhatsApp Attendant Commands +// ============================================================================ + +/// Process WhatsApp command from attendant +pub async fn process_attendant_command( + state: 
&Arc, + attendant_phone: &str, + command: &str, + current_session: Option, +) -> Result { + let parts: Vec<&str> = command.trim().split_whitespace().collect(); + if parts.is_empty() { + return Err("Empty command".to_string()); + } + + let cmd = parts[0].to_lowercase(); + let args: Vec<&str> = parts[1..].to_vec(); + + match cmd.as_str() { + "/queue" | "/fila" => handle_queue_command(state).await, + "/take" | "/pegar" => handle_take_command(state, attendant_phone).await, + "/status" => handle_status_command(state, attendant_phone, args).await, + "/transfer" | "/transferir" => handle_transfer_command(state, current_session, args).await, + "/resolve" | "/resolver" => handle_resolve_command(state, current_session).await, + "/tips" | "/dicas" => handle_tips_command(state, current_session).await, + "/polish" | "/polir" => { + let message = args.join(" "); + handle_polish_command(state, current_session, &message).await + } + "/replies" | "/respostas" => handle_replies_command(state, current_session).await, + "/summary" | "/resumo" => handle_summary_command(state, current_session).await, + "/help" | "/ajuda" => Ok(get_help_text()), + _ => Err(format!( + "Unknown command: {}. 
Type /help for available commands.", + cmd + )), + } +} + +async fn handle_queue_command(state: &Arc) -> Result { + // Get queue items + let conn = state.conn.clone(); + let result = tokio::task::spawn_blocking(move || { + let mut db_conn = conn.get().map_err(|e| e.to_string())?; + + use crate::shared::models::schema::user_sessions; + + let sessions: Vec = user_sessions::table + .filter( + user_sessions::context_data + .retrieve_as_text("needs_human") + .eq("true"), + ) + .filter( + user_sessions::context_data + .retrieve_as_text("status") + .ne("resolved"), + ) + .order(user_sessions::updated_at.desc()) + .limit(10) + .load(&mut db_conn) + .map_err(|e| e.to_string())?; + + Ok::, String>(sessions) + }) + .await + .map_err(|e| e.to_string())??; + + if result.is_empty() { + return Ok("šŸ“‹ *Queue is empty*\nNo conversations waiting for attention.".to_string()); + } + + let mut response = format!("šŸ“‹ *Queue* ({} waiting)\n\n", result.len()); + + for (i, session) in result.iter().enumerate() { + let name = session + .context_data + .get("name") + .and_then(|v| v.as_str()) + .unwrap_or("Unknown"); + let channel = session + .context_data + .get("channel") + .and_then(|v| v.as_str()) + .unwrap_or("web"); + let status = session + .context_data + .get("status") + .and_then(|v| v.as_str()) + .unwrap_or("waiting"); + + response.push_str(&format!( + "{}. 
*{}* ({})\n Status: {} | ID: {}\n\n", + i + 1, + name, + channel, + status, + &session.id.to_string()[..8] + )); + } + + response.push_str("Type `/take` to take the next conversation."); + + Ok(response) +} + +async fn handle_take_command( + state: &Arc, + attendant_phone: &str, +) -> Result { + let conn = state.conn.clone(); + let phone = attendant_phone.to_string(); + + let result = tokio::task::spawn_blocking(move || { + let mut db_conn = conn.get().map_err(|e| e.to_string())?; + + use crate::shared::models::schema::user_sessions; + + // Find next waiting session + let session: Option = user_sessions::table + .filter( + user_sessions::context_data + .retrieve_as_text("needs_human") + .eq("true"), + ) + .filter( + user_sessions::context_data + .retrieve_as_text("status") + .eq("waiting"), + ) + .order(user_sessions::updated_at.asc()) + .first(&mut db_conn) + .optional() + .map_err(|e| e.to_string())?; + + if let Some(session) = session { + // Assign to attendant + let mut ctx = session.context_data.clone(); + ctx["assigned_to_phone"] = serde_json::json!(phone); + ctx["status"] = serde_json::json!("assigned"); + ctx["assigned_at"] = serde_json::json!(Utc::now().to_rfc3339()); + + diesel::update(user_sessions::table.filter(user_sessions::id.eq(session.id))) + .set(user_sessions::context_data.eq(&ctx)) + .execute(&mut db_conn) + .map_err(|e| e.to_string())?; + + let name = session + .context_data + .get("name") + .and_then(|v| v.as_str()) + .unwrap_or("Unknown"); + + Ok(format!( + "āœ… *Conversation assigned*\n\nCustomer: *{}*\nSession: {}\n\nYou can now respond to this customer. 
Their messages will be forwarded to you.", + name, + &session.id.to_string()[..8] + )) + } else { + Ok("šŸ“­ No conversations waiting in queue.".to_string()) + } + }) + .await + .map_err(|e| e.to_string())??; + + Ok(result) +} + +async fn handle_status_command( + _state: &Arc, + _attendant_phone: &str, + args: Vec<&str>, +) -> Result { + if args.is_empty() { + return Ok( + "šŸ“Š *Status Options*\n\n`/status online` - Available\n`/status busy` - In conversation\n`/status away` - Temporarily away\n`/status offline` - Not available" + .to_string(), + ); + } + + let status = args[0].to_lowercase(); + let (emoji, text) = match status.as_str() { + "online" => ("🟢", "Online - Available for conversations"), + "busy" => ("🟔", "Busy - Handling conversations"), + "away" => ("🟠", "Away - Temporarily unavailable"), + "offline" => ("⚫", "Offline - Not available"), + _ => { + return Err(format!( + "Invalid status: {}. Use online, busy, away, or offline.", + status + )) + } + }; + + // TODO: Update attendant status in database + + Ok(format!("{} Status set to *{}*", emoji, text)) +} + +async fn handle_transfer_command( + _state: &Arc, + current_session: Option, + args: Vec<&str>, +) -> Result { + let session_id = current_session.ok_or("No active conversation to transfer")?; + + if args.is_empty() { + return Err("Usage: `/transfer @attendant_name` or `/transfer department`".to_string()); + } + + let target = args.join(" "); + + // TODO: Implement actual transfer logic + + Ok(format!( + "šŸ”„ *Transfer initiated*\n\nSession {} is being transferred to {}.\nThe new attendant will be notified.", + &session_id.to_string()[..8], + target + )) +} + +async fn handle_resolve_command( + state: &Arc, + current_session: Option, +) -> Result { + let session_id = current_session.ok_or("No active conversation to resolve")?; + + let conn = state.conn.clone(); + tokio::task::spawn_blocking(move || { + let mut db_conn = conn.get().map_err(|e| e.to_string())?; + + use 
crate::shared::models::schema::user_sessions; + + let session: UserSession = user_sessions::table + .find(session_id) + .first(&mut db_conn) + .map_err(|e| e.to_string())?; + + let mut ctx = session.context_data.clone(); + ctx["status"] = serde_json::json!("resolved"); + ctx["needs_human"] = serde_json::json!(false); + ctx["resolved_at"] = serde_json::json!(Utc::now().to_rfc3339()); + + diesel::update(user_sessions::table.filter(user_sessions::id.eq(session_id))) + .set(user_sessions::context_data.eq(&ctx)) + .execute(&mut db_conn) + .map_err(|e| e.to_string())?; + + Ok::<(), String>(()) + }) + .await + .map_err(|e| e.to_string())??; + + Ok(format!( + "āœ… *Conversation resolved*\n\nSession {} has been marked as resolved. The customer will be returned to bot mode.", + &session_id.to_string()[..8] + )) +} + +async fn handle_tips_command( + state: &Arc, + current_session: Option, +) -> Result { + let session_id = current_session.ok_or("No active conversation. Use /take first.")?; + + // Get recent messages and generate tips + let history = load_conversation_history(state, session_id).await; + + if history.is_empty() { + return Ok( + "šŸ’” No messages yet. 
Tips will appear when the customer sends a message.".to_string(), + ); + } + + let last_customer_msg = history + .iter() + .rev() + .find(|m| m.role == "customer") + .map(|m| m.content.clone()) + .unwrap_or_default(); + + let request = TipRequest { + session_id, + customer_message: last_customer_msg, + history, + }; + + // Generate tips + let response = generate_tips(State(state.clone()), Json(request)).await; + let (_, Json(tip_response)) = response.into_response().into_parts(); + + if tip_response.tips.is_empty() { + return Ok("šŸ’” No specific tips for this conversation yet.".to_string()); + } + + let mut result = "šŸ’” *Tips for this conversation*\n\n".to_string(); + + for tip in tip_response.tips { + let emoji = match tip.tip_type { + TipType::Intent => "šŸŽÆ", + TipType::Action => "āœ…", + TipType::Warning => "āš ļø", + TipType::Knowledge => "šŸ“š", + TipType::History => "šŸ“œ", + TipType::General => "šŸ’”", + }; + result.push_str(&format!("{} {}\n\n", emoji, tip.content)); + } + + Ok(result) +} + +async fn handle_polish_command( + state: &Arc, + current_session: Option, + message: &str, +) -> Result { + let session_id = current_session.ok_or("No active conversation")?; + + if message.is_empty() { + return Err("Usage: `/polish Your message here`".to_string()); + } + + let request = PolishRequest { + session_id, + message: message.to_string(), + tone: "professional".to_string(), + }; + + let response = polish_message(State(state.clone()), Json(request)).await; + let (_, Json(polish_response)) = response.into_response().into_parts(); + + if !polish_response.success { + return Err(polish_response + .error + .unwrap_or("Failed to polish message".to_string())); + } + + let mut result = "✨ *Polished message*\n\n".to_string(); + result.push_str(&format!("_{}_\n\n", polish_response.polished)); + + if !polish_response.changes.is_empty() { + result.push_str("Changes:\n"); + for change in polish_response.changes { + result.push_str(&format!("• {}\n", change)); + } + } 
+ + result.push_str("\n_Copy and send, or edit as needed._"); + + Ok(result) +} + +async fn handle_replies_command( + state: &Arc, + current_session: Option, +) -> Result { + let session_id = current_session.ok_or("No active conversation")?; + + let history = load_conversation_history(state, session_id).await; + + let request = SmartRepliesRequest { + session_id, + history, + }; + + let response = generate_smart_replies(State(state.clone()), Json(request)).await; + let (_, Json(replies_response)) = response.into_response().into_parts(); + + if replies_response.replies.is_empty() { + return Ok("šŸ’¬ No reply suggestions available.".to_string()); + } + + let mut result = "šŸ’¬ *Suggested replies*\n\n".to_string(); + + for (i, reply) in replies_response.replies.iter().enumerate() { + result.push_str(&format!( + "*{}. {}*\n_{}_\n\n", + i + 1, + reply.tone.to_uppercase(), + reply.text + )); + } + + result.push_str("_Copy any reply or use as inspiration._"); + + Ok(result) +} + +async fn handle_summary_command( + state: &Arc, + current_session: Option, +) -> Result { + let session_id = current_session.ok_or("No active conversation")?; + + let response = generate_summary(State(state.clone()), Path(session_id)).await; + let (_, Json(summary_response)) = response.into_response().into_parts(); + + if !summary_response.success { + return Err(summary_response + .error + .unwrap_or("Failed to generate summary".to_string())); + } + + let summary = summary_response.summary; + + let mut result = "šŸ“ *Conversation Summary*\n\n".to_string(); + result.push_str(&format!("{}\n\n", summary.brief)); + + if !summary.key_points.is_empty() { + result.push_str("*Key Points:*\n"); + for point in &summary.key_points { + result.push_str(&format!("• {}\n", point)); + } + result.push('\n'); + } + + if !summary.customer_needs.is_empty() { + result.push_str("*Customer Needs:*\n"); + for need in &summary.customer_needs { + result.push_str(&format!("• {}\n", need)); + } + result.push('\n'); + } + + 
if !summary.unresolved_issues.is_empty() { + result.push_str("*Unresolved:*\n"); + for issue in &summary.unresolved_issues { + result.push_str(&format!("• {}\n", issue)); + } + result.push('\n'); + } + + result.push_str(&format!( + "šŸ“Š {} messages | {} minutes | Sentiment: {}", + summary.message_count, summary.duration_minutes, summary.sentiment_trend + )); + + if !summary.recommended_action.is_empty() { + result.push_str(&format!( + "\n\nšŸ’” *Recommended:* {}", + summary.recommended_action + )); + } + + Ok(result) +} + +fn get_help_text() -> String { + r#"šŸ¤– *Attendant Commands* + +*Queue Management:* +`/queue` - View waiting conversations +`/take` - Take next conversation +`/transfer @name` - Transfer conversation +`/resolve` - Mark as resolved +`/status [online|busy|away|offline]` + +*AI Assistance:* +`/tips` - Get tips for current conversation +`/polish ` - Improve your message +`/replies` - Get smart reply suggestions +`/summary` - Get conversation summary + +*Other:* +`/help` - Show this help + +_Portuguese: /fila, /pegar, /transferir, /resolver, /dicas, /polir, /respostas, /resumo, /ajuda_"# + .to_string() +} + +// ============================================================================ +// Helper Functions +// ============================================================================ + +/// Get session from database +async fn get_session(state: &Arc, session_id: Uuid) -> Result { + let conn = state.conn.clone(); + + tokio::task::spawn_blocking(move || { + let mut db_conn = conn.get().map_err(|e| format!("DB error: {}", e))?; + + use crate::shared::models::schema::user_sessions; + + user_sessions::table + .find(session_id) + .first::(&mut db_conn) + .map_err(|e| format!("Session not found: {}", e)) + }) + .await + .map_err(|e| format!("Task error: {}", e))? 
+} + +/// Load conversation history from database +async fn load_conversation_history( + state: &Arc, + session_id: Uuid, +) -> Vec { + let conn = state.conn.clone(); + + let result = tokio::task::spawn_blocking(move || { + let mut db_conn = match conn.get() { + Ok(c) => c, + Err(_) => return Vec::new(), + }; + + use crate::shared::models::schema::message_history; + + let messages: Vec<(String, i32, chrono::NaiveDateTime)> = message_history::table + .filter(message_history::session_id.eq(session_id)) + .select(( + message_history::content_encrypted, + message_history::role, + message_history::created_at, + )) + .order(message_history::created_at.asc()) + .limit(50) + .load(&mut db_conn) + .unwrap_or_default(); + + messages + .into_iter() + .map(|(content, role, timestamp)| ConversationMessage { + role: match role { + 0 => "customer".to_string(), + 1 => "bot".to_string(), + 2 => "attendant".to_string(), + _ => "system".to_string(), + }, + content, + timestamp: Some(timestamp.and_utc().to_rfc3339()), + }) + .collect() + }) + .await + .unwrap_or_default(); + + result +} + +/// Parse tips from LLM JSON response +fn parse_tips_response(response: &str) -> Vec { + // Try to extract JSON from response + let json_str = extract_json(response); + + if let Ok(parsed) = serde_json::from_str::(&json_str) { + if let Some(tips_array) = parsed.get("tips").and_then(|t| t.as_array()) { + return tips_array + .iter() + .filter_map(|tip| { + let tip_type = match tip + .get("type") + .and_then(|t| t.as_str()) + .unwrap_or("general") + { + "intent" => TipType::Intent, + "action" => TipType::Action, + "warning" => TipType::Warning, + "knowledge" => TipType::Knowledge, + "history" => TipType::History, + _ => TipType::General, + }; + + Some(AttendantTip { + tip_type, + content: tip.get("content").and_then(|c| c.as_str())?.to_string(), + confidence: tip + .get("confidence") + .and_then(|c| c.as_f64()) + .unwrap_or(0.8) as f32, + priority: tip.get("priority").and_then(|p| 
p.as_i64()).unwrap_or(2) as i32, + }) + }) + .collect(); + } + } + + // Fallback: treat entire response as a single tip + if !response.trim().is_empty() { + vec![AttendantTip { + tip_type: TipType::General, + content: response.trim().to_string(), + confidence: 0.7, + priority: 2, + }] + } else { + Vec::new() + } +} + +/// Parse polish response from LLM JSON +fn parse_polish_response(response: &str, original: &str) -> (String, Vec) { + let json_str = extract_json(response); + + if let Ok(parsed) = serde_json::from_str::(&json_str) { + let polished = parsed + .get("polished") + .and_then(|p| p.as_str()) + .unwrap_or(original) + .to_string(); + + let changes = parsed + .get("changes") + .and_then(|c| c.as_array()) + .map(|arr| { + arr.iter() + .filter_map(|v| v.as_str().map(String::from)) + .collect() + }) + .unwrap_or_default(); + + return (polished, changes); + } + + // Fallback: use response as polished message + ( + response.trim().to_string(), + vec!["Message improved".to_string()], + ) +} + +/// Parse smart replies from LLM JSON +fn parse_smart_replies_response(response: &str) -> Vec { + let json_str = extract_json(response); + + if let Ok(parsed) = serde_json::from_str::(&json_str) { + if let Some(replies_array) = parsed.get("replies").and_then(|r| r.as_array()) { + return replies_array + .iter() + .filter_map(|reply| { + Some(SmartReply { + text: reply.get("text").and_then(|t| t.as_str())?.to_string(), + tone: reply + .get("tone") + .and_then(|t| t.as_str()) + .unwrap_or("professional") + .to_string(), + confidence: reply + .get("confidence") + .and_then(|c| c.as_f64()) + .unwrap_or(0.8) as f32, + category: reply + .get("category") + .and_then(|c| c.as_str()) + .unwrap_or("answer") + .to_string(), + }) + }) + .collect(); + } + } + + generate_fallback_replies() +} + +/// Parse summary from LLM JSON +fn parse_summary_response(response: &str) -> ConversationSummary { + let json_str = extract_json(response); + + if let Ok(parsed) = 
serde_json::from_str::(&json_str) { + return ConversationSummary { + brief: parsed + .get("brief") + .and_then(|b| b.as_str()) + .unwrap_or("Conversation summary") + .to_string(), + key_points: parsed + .get("key_points") + .and_then(|k| k.as_array()) + .map(|arr| { + arr.iter() + .filter_map(|v| v.as_str().map(String::from)) + .collect() + }) + .unwrap_or_default(), + customer_needs: parsed + .get("customer_needs") + .and_then(|c| c.as_array()) + .map(|arr| { + arr.iter() + .filter_map(|v| v.as_str().map(String::from)) + .collect() + }) + .unwrap_or_default(), + unresolved_issues: parsed + .get("unresolved_issues") + .and_then(|u| u.as_array()) + .map(|arr| { + arr.iter() + .filter_map(|v| v.as_str().map(String::from)) + .collect() + }) + .unwrap_or_default(), + sentiment_trend: parsed + .get("sentiment_trend") + .and_then(|s| s.as_str()) + .unwrap_or("stable") + .to_string(), + recommended_action: parsed + .get("recommended_action") + .and_then(|r| r.as_str()) + .unwrap_or("") + .to_string(), + ..Default::default() + }; + } + + ConversationSummary { + brief: response.trim().to_string(), + ..Default::default() + } +} + +/// Parse sentiment from LLM JSON +fn parse_sentiment_response(response: &str) -> SentimentAnalysis { + let json_str = extract_json(response); + + if let Ok(parsed) = serde_json::from_str::(&json_str) { + let emotions = parsed + .get("emotions") + .and_then(|e| e.as_array()) + .map(|arr| { + arr.iter() + .filter_map(|e| { + Some(Emotion { + name: e.get("name").and_then(|n| n.as_str())?.to_string(), + intensity: e.get("intensity").and_then(|i| i.as_f64()).unwrap_or(0.5) + as f32, + }) + }) + .collect() + }) + .unwrap_or_default(); + + return SentimentAnalysis { + overall: parsed + .get("overall") + .and_then(|o| o.as_str()) + .unwrap_or("neutral") + .to_string(), + score: parsed.get("score").and_then(|s| s.as_f64()).unwrap_or(0.0) as f32, + emotions, + escalation_risk: parsed + .get("escalation_risk") + .and_then(|e| e.as_str()) + .unwrap_or("low") 
+ .to_string(), + urgency: parsed + .get("urgency") + .and_then(|u| u.as_str()) + .unwrap_or("normal") + .to_string(), + emoji: parsed + .get("emoji") + .and_then(|e| e.as_str()) + .unwrap_or("😐") + .to_string(), + }; + } + + SentimentAnalysis::default() +} + +/// Extract JSON from a response that might have other text +fn extract_json(response: &str) -> String { + // Look for JSON object + if let Some(start) = response.find('{') { + if let Some(end) = response.rfind('}') { + if end > start { + return response[start..=end].to_string(); + } + } + } + + // Look for JSON array + if let Some(start) = response.find('[') { + if let Some(end) = response.rfind(']') { + if end > start { + return response[start..=end].to_string(); + } + } + } + + response.to_string() +} + +/// Generate fallback tips using keyword analysis +fn generate_fallback_tips(message: &str) -> Vec { + let msg_lower = message.to_lowercase(); + let mut tips = Vec::new(); + + // Urgency detection + if msg_lower.contains("urgent") + || msg_lower.contains("asap") + || msg_lower.contains("immediately") + || msg_lower.contains("emergency") + { + tips.push(AttendantTip { + tip_type: TipType::Warning, + content: "Customer indicates urgency - prioritize quick response".to_string(), + confidence: 0.9, + priority: 1, + }); + } + + // Frustration detection + if msg_lower.contains("frustrated") + || msg_lower.contains("angry") + || msg_lower.contains("ridiculous") + || msg_lower.contains("unacceptable") + { + tips.push(AttendantTip { + tip_type: TipType::Warning, + content: "Customer may be frustrated - use empathetic language".to_string(), + confidence: 0.85, + priority: 1, + }); + } + + // Question detection + if message.contains('?') { + tips.push(AttendantTip { + tip_type: TipType::Intent, + content: "Customer is asking a question - provide clear, direct answer".to_string(), + confidence: 0.8, + priority: 2, + }); + } + + // Complaint detection + if msg_lower.contains("problem") + || msg_lower.contains("issue") 
+ || msg_lower.contains("not working") + || msg_lower.contains("broken") + { + tips.push(AttendantTip { + tip_type: TipType::Action, + content: "Customer reporting an issue - acknowledge and gather details".to_string(), + confidence: 0.8, + priority: 2, + }); + } + + // Thanks/positive detection + if msg_lower.contains("thank") + || msg_lower.contains("great") + || msg_lower.contains("perfect") + || msg_lower.contains("awesome") + { + tips.push(AttendantTip { + tip_type: TipType::General, + content: "Customer is expressing satisfaction - good opportunity to close or upsell" + .to_string(), + confidence: 0.85, + priority: 3, + }); + } + + // Default tip if none matched + if tips.is_empty() { + tips.push(AttendantTip { + tip_type: TipType::General, + content: "Read the message carefully and respond helpfully".to_string(), + confidence: 0.5, + priority: 3, + }); + } + + tips +} + +/// Generate fallback smart replies +fn generate_fallback_replies() -> Vec { + vec![ + SmartReply { + text: "Thank you for reaching out! I'd be happy to help you with that. Could you provide me with a bit more detail?".to_string(), + tone: "friendly".to_string(), + confidence: 0.7, + category: "greeting".to_string(), + }, + SmartReply { + text: "I understand your concern. 
Let me look into this for you right away.".to_string(), + tone: "empathetic".to_string(), + confidence: 0.7, + category: "acknowledgment".to_string(), + }, + SmartReply { + text: "Is there anything else I can help you with today?".to_string(), + tone: "professional".to_string(), + confidence: 0.7, + category: "follow_up".to_string(), + }, + ] +} + +/// Analyze sentiment using keyword matching (fallback when LLM unavailable) +fn analyze_sentiment_keywords(message: &str) -> SentimentAnalysis { + let msg_lower = message.to_lowercase(); + + let positive_words = [ + "thank", + "great", + "perfect", + "awesome", + "excellent", + "good", + "happy", + "love", + "appreciate", + "wonderful", + "fantastic", + "amazing", + "helpful", + ]; + let negative_words = [ + "angry", + "frustrated", + "terrible", + "awful", + "horrible", + "worst", + "hate", + "disappointed", + "unacceptable", + "ridiculous", + "stupid", + "problem", + "issue", + "broken", + "failed", + "error", + ]; + let urgent_words = [ + "urgent", + "asap", + "immediately", + "emergency", + "now", + "critical", + ]; + + let positive_count = positive_words + .iter() + .filter(|w| msg_lower.contains(*w)) + .count(); + let negative_count = negative_words + .iter() + .filter(|w| msg_lower.contains(*w)) + .count(); + let urgent_count = urgent_words + .iter() + .filter(|w| msg_lower.contains(*w)) + .count(); + + let score = if positive_count > negative_count { + 0.3 + (positive_count as f32 * 0.2).min(0.7) + } else if negative_count > positive_count { + -0.3 - (negative_count as f32 * 0.2).min(0.7) + } else { + 0.0 + }; + + let overall = if score > 0.2 { + "positive" + } else if score < -0.2 { + "negative" + } else { + "neutral" + }; + + let escalation_risk = if negative_count >= 3 { + "high" + } else if negative_count >= 1 { + "medium" + } else { + "low" + }; + + let urgency = if urgent_count >= 2 { + "urgent" + } else if urgent_count >= 1 { + "high" + } else { + "normal" + }; + + let emoji = match overall { + "positive" 
=> "😊", + "negative" => "😟", + _ => "😐", + }; + + let mut emotions = Vec::new(); + if negative_count > 0 { + emotions.push(Emotion { + name: "frustration".to_string(), + intensity: (negative_count as f32 * 0.3).min(1.0), + }); + } + if positive_count > 0 { + emotions.push(Emotion { + name: "satisfaction".to_string(), + intensity: (positive_count as f32 * 0.3).min(1.0), + }); + } + if urgent_count > 0 { + emotions.push(Emotion { + name: "anxiety".to_string(), + intensity: (urgent_count as f32 * 0.4).min(1.0), + }); + } + + SentimentAnalysis { + overall: overall.to_string(), + score, + emotions, + escalation_risk: escalation_risk.to_string(), + urgency: urgency.to_string(), + emoji: emoji.to_string(), + } +} + +// ============================================================================ +// Tests +// ============================================================================ + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_config_defaults() { + let config = LlmAssistConfig::default(); + assert!(!config.tips_enabled); + assert!(!config.polish_enabled); + assert!(!config.any_enabled()); + } + + #[test] + fn test_fallback_tips_urgent() { + let tips = generate_fallback_tips("This is URGENT! I need help immediately!"); + assert!(!tips.is_empty()); + assert!(tips.iter().any(|t| matches!(t.tip_type, TipType::Warning))); + } + + #[test] + fn test_fallback_tips_question() { + let tips = generate_fallback_tips("How do I reset my password?"); + assert!(!tips.is_empty()); + assert!(tips.iter().any(|t| matches!(t.tip_type, TipType::Intent))); + } + + #[test] + fn test_sentiment_positive() { + let sentiment = analyze_sentiment_keywords("Thank you so much! This is great!"); + assert_eq!(sentiment.overall, "positive"); + assert!(sentiment.score > 0.0); + assert_eq!(sentiment.escalation_risk, "low"); + } + + #[test] + fn test_sentiment_negative() { + let sentiment = + analyze_sentiment_keywords("This is terrible! 
I'm very frustrated with this problem."); + assert_eq!(sentiment.overall, "negative"); + assert!(sentiment.score < 0.0); + assert!(sentiment.escalation_risk == "medium" || sentiment.escalation_risk == "high"); + } + + #[test] + fn test_sentiment_urgent() { + let sentiment = analyze_sentiment_keywords("I need help ASAP! This is urgent!"); + assert!(sentiment.urgency == "high" || sentiment.urgency == "urgent"); + } + + #[test] + fn test_extract_json() { + let response = "Here is the result: {\"key\": \"value\"} and some more text."; + let json = extract_json(response); + assert_eq!(json, "{\"key\": \"value\"}"); + } + + #[test] + fn test_fallback_replies() { + let replies = generate_fallback_replies(); + assert_eq!(replies.len(), 3); + assert!(replies.iter().any(|r| r.category == "greeting")); + assert!(replies.iter().any(|r| r.category == "follow_up")); + } + + #[test] + fn test_help_text() { + let help = get_help_text(); + assert!(help.contains("/queue")); + assert!(help.contains("/tips")); + assert!(help.contains("/polish")); + } +} diff --git a/src/attendance/mod.rs b/src/attendance/mod.rs index 47526166b..6eae9a636 100644 --- a/src/attendance/mod.rs +++ b/src/attendance/mod.rs @@ -7,6 +7,19 @@ //! - **Queue System**: Human handoff for conversations that need human attention //! - **Keyword Services**: Check-in/out, break/resume tracking via keywords //! - **Drive Integration**: S3 storage for attendance records +//! - **WebSocket**: Real-time notifications for attendants +//! - **LLM Assist**: AI-powered tips, polishing, smart replies, summaries, and sentiment analysis +//! +//! ## LLM Assist Features (config.csv) +//! +//! ```csv +//! name,value +//! attendant-llm-tips,true +//! attendant-polish-message,true +//! attendant-smart-replies,true +//! attendant-auto-summary,true +//! attendant-sentiment-analysis,true +//! ``` //! //! ## Usage //! 
@@ -18,6 +31,7 @@ pub mod drive; pub mod keyword_services; +pub mod llm_assist; pub mod queue; // Re-export main types for convenience @@ -26,17 +40,40 @@ pub use keyword_services::{ AttendanceCommand, AttendanceRecord, AttendanceResponse, AttendanceService, KeywordConfig, KeywordParser, ParsedCommand, }; +pub use llm_assist::{ + AttendantTip, ConversationMessage, ConversationSummary, LlmAssistConfig, PolishRequest, + PolishResponse, SentimentAnalysis, SentimentResponse, SmartRepliesRequest, + SmartRepliesResponse, SmartReply, SummaryRequest, SummaryResponse, TipRequest, TipResponse, + TipType, +}; pub use queue::{ AssignRequest, AttendantStats, AttendantStatus, QueueFilters, QueueItem, QueueStatus, TransferRequest, }; -use crate::shared::state::AppState; +use crate::core::bot::channels::whatsapp::WhatsAppAdapter; +use crate::core::bot::channels::ChannelAdapter; +use crate::shared::models::{BotResponse, UserSession}; +use crate::shared::state::{AppState, AttendantNotification}; use axum::{ + extract::{ + ws::{Message, WebSocket, WebSocketUpgrade}, + Path, Query, State, + }, + http::StatusCode, + response::IntoResponse, routing::{get, post}, - Router, + Json, Router, }; +use chrono::Utc; +use diesel::prelude::*; +use futures::{SinkExt, StreamExt}; +use log::{debug, error, info, warn}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::sync::Arc; +use tokio::sync::broadcast; +use uuid::Uuid; /// Configure attendance routes pub fn configure_attendance_routes() -> Router> { @@ -50,10 +87,570 @@ pub fn configure_attendance_routes() -> Router> { post(queue::transfer_conversation), ) .route( - "/api/attendance/resolve/:session_id", + "/api/attendance/resolve/{session_id}", post(queue::resolve_conversation), ) .route("/api/attendance/insights", get(queue::get_insights)) + // Attendant response endpoint + .route("/api/attendance/respond", post(attendant_respond)) + // WebSocket for real-time notifications + .route("/ws/attendant", 
get(attendant_websocket_handler)) + // LLM Assist endpoints - AI-powered attendant assistance + .route("/api/attendance/llm/tips", post(llm_assist::generate_tips)) + .route( + "/api/attendance/llm/polish", + post(llm_assist::polish_message), + ) + .route( + "/api/attendance/llm/smart-replies", + post(llm_assist::generate_smart_replies), + ) + .route( + "/api/attendance/llm/summary/{session_id}", + get(llm_assist::generate_summary), + ) + .route( + "/api/attendance/llm/sentiment", + post(llm_assist::analyze_sentiment), + ) + .route( + "/api/attendance/llm/config/{bot_id}", + get(llm_assist::get_llm_config), + ) +} + +/// Attendant response request +#[derive(Debug, Deserialize)] +pub struct AttendantRespondRequest { + pub session_id: String, + pub message: String, + pub attendant_id: String, +} + +/// Attendant response result +#[derive(Debug, Serialize)] +pub struct AttendantRespondResponse { + pub success: bool, + pub message: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +/// Handle attendant response - routes back to the customer's channel +pub async fn attendant_respond( + State(state): State>, + Json(request): Json, +) -> impl IntoResponse { + info!( + "Attendant {} responding to session {}", + request.attendant_id, request.session_id + ); + + let session_id = match Uuid::parse_str(&request.session_id) { + Ok(id) => id, + Err(_) => { + return ( + StatusCode::BAD_REQUEST, + Json(AttendantRespondResponse { + success: false, + message: "Invalid session ID".to_string(), + error: Some("Could not parse session ID as UUID".to_string()), + }), + ) + } + }; + + // Get session details + let conn = state.conn.clone(); + let session_result = tokio::task::spawn_blocking(move || { + let mut db_conn = conn.get().ok()?; + use crate::shared::models::schema::user_sessions; + user_sessions::table + .find(session_id) + .first::(&mut db_conn) + .ok() + }) + .await + .ok() + .flatten(); + + let session = match session_result { + Some(s) => s, 
+ None => { + return ( + StatusCode::NOT_FOUND, + Json(AttendantRespondResponse { + success: false, + message: "Session not found".to_string(), + error: Some("No session with that ID exists".to_string()), + }), + ) + } + }; + + // Get channel from session context + let channel = session + .context_data + .get("channel") + .and_then(|v| v.as_str()) + .unwrap_or("web"); + + // Get recipient (phone number for WhatsApp, user_id for web) + let recipient = session + .context_data + .get("phone") + .and_then(|v| v.as_str()) + .unwrap_or(""); + + // Save attendant message to history + if let Err(e) = save_message_to_history(&state, &session, &request.message, "attendant").await { + error!("Failed to save attendant message: {}", e); + } + + // Send to appropriate channel + match channel { + "whatsapp" => { + if recipient.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(AttendantRespondResponse { + success: false, + message: "No phone number found".to_string(), + error: Some("Session has no phone number in context".to_string()), + }), + ); + } + + let adapter = WhatsAppAdapter::new(state.conn.clone(), session.bot_id); + let response = BotResponse { + bot_id: session.bot_id.to_string(), + session_id: session.id.to_string(), + user_id: recipient.to_string(), + channel: "whatsapp".to_string(), + content: request.message.clone(), + message_type: crate::shared::models::message_types::MessageType::BOT_RESPONSE, + stream_token: None, + is_complete: true, + suggestions: vec![], + context_name: None, + }; + + match adapter.send_message(response).await { + Ok(_) => { + // Notify other attendants about the response + broadcast_attendant_action(&state, &session, &request, "attendant_response") + .await; + + ( + StatusCode::OK, + Json(AttendantRespondResponse { + success: true, + message: "Response sent to WhatsApp".to_string(), + error: None, + }), + ) + } + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(AttendantRespondResponse { + success: false, + message: "Failed 
to send WhatsApp message".to_string(), + error: Some(e.to_string()), + }), + ), + } + } + "web" | _ => { + // For web and other channels, send via WebSocket if connected + let sent = if let Some(tx) = state + .response_channels + .lock() + .await + .get(&session.id.to_string()) + { + let response = BotResponse { + bot_id: session.bot_id.to_string(), + session_id: session.id.to_string(), + user_id: session.user_id.to_string(), + channel: channel.to_string(), + content: request.message.clone(), + message_type: crate::shared::models::message_types::MessageType::BOT_RESPONSE, + stream_token: None, + is_complete: true, + suggestions: vec![], + context_name: None, + }; + tx.send(response).await.is_ok() + } else { + false + }; + + // Notify other attendants + broadcast_attendant_action(&state, &session, &request, "attendant_response").await; + + if sent { + ( + StatusCode::OK, + Json(AttendantRespondResponse { + success: true, + message: "Response sent via WebSocket".to_string(), + error: None, + }), + ) + } else { + // Message saved but couldn't be delivered in real-time + ( + StatusCode::OK, + Json(AttendantRespondResponse { + success: true, + message: "Response saved (customer not connected)".to_string(), + error: None, + }), + ) + } + } + } +} + +/// Save message to conversation history +async fn save_message_to_history( + state: &Arc, + session: &UserSession, + content: &str, + sender: &str, +) -> Result<(), Box> { + let conn = state.conn.clone(); + let session_id = session.id; + let content_clone = content.to_string(); + let sender_clone = sender.to_string(); + + tokio::task::spawn_blocking(move || { + let mut db_conn = conn.get().map_err(|e| format!("DB error: {}", e))?; + + use crate::shared::models::schema::message_history; + + diesel::insert_into(message_history::table) + .values(( + message_history::id.eq(Uuid::new_v4()), + message_history::session_id.eq(session_id), + message_history::role.eq(sender_clone), + message_history::content.eq(content_clone), + 
message_history::created_at.eq(diesel::dsl::now), + )) + .execute(&mut db_conn) + .map_err(|e| format!("Insert error: {}", e))?; + + Ok::<(), String>(()) + }) + .await + .map_err(|e| format!("Task error: {}", e))??; + + Ok(()) +} + +/// Broadcast attendant action to other connected attendants +async fn broadcast_attendant_action( + state: &Arc, + session: &UserSession, + request: &AttendantRespondRequest, + action_type: &str, +) { + if let Some(broadcast_tx) = state.attendant_broadcast.as_ref() { + let notification = AttendantNotification { + notification_type: action_type.to_string(), + session_id: session.id.to_string(), + user_id: session.user_id.to_string(), + user_name: session + .context_data + .get("name") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + user_phone: session + .context_data + .get("phone") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + channel: session + .context_data + .get("channel") + .and_then(|v| v.as_str()) + .unwrap_or("web") + .to_string(), + content: request.message.clone(), + timestamp: Utc::now().to_rfc3339(), + assigned_to: Some(request.attendant_id.clone()), + priority: 0, + }; + + if let Err(e) = broadcast_tx.send(notification) { + debug!("No attendants listening for broadcast: {}", e); + } + } +} + +/// WebSocket handler for attendant real-time notifications +pub async fn attendant_websocket_handler( + ws: WebSocketUpgrade, + State(state): State>, + Query(params): Query>, +) -> impl IntoResponse { + let attendant_id = params.get("attendant_id").cloned(); + + if attendant_id.is_none() { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({ "error": "attendant_id is required" })), + ) + .into_response(); + } + + let attendant_id = attendant_id.unwrap(); + info!( + "Attendant WebSocket connection request from: {}", + attendant_id + ); + + ws.on_upgrade(move |socket| handle_attendant_websocket(socket, state, attendant_id)) + .into_response() +} + +/// Handle attendant WebSocket connection +async fn 
handle_attendant_websocket(socket: WebSocket, state: Arc, attendant_id: String) { + let (mut sender, mut receiver) = socket.split(); + + info!("Attendant WebSocket connected: {}", attendant_id); + + // Send welcome message + let welcome = serde_json::json!({ + "type": "connected", + "attendant_id": attendant_id, + "message": "Connected to attendant notification service", + "timestamp": Utc::now().to_rfc3339() + }); + + if let Ok(welcome_str) = serde_json::to_string(&welcome) { + if sender + .send(Message::Text(welcome_str.into())) + .await + .is_err() + { + error!("Failed to send welcome message to attendant"); + return; + } + } + + // Subscribe to broadcast channel + let mut broadcast_rx = if let Some(broadcast_tx) = state.attendant_broadcast.as_ref() { + broadcast_tx.subscribe() + } else { + warn!("No broadcast channel available for attendants"); + return; + }; + + // Task to forward broadcast messages to WebSocket + let attendant_id_clone = attendant_id.clone(); + let mut send_task = tokio::spawn(async move { + loop { + match broadcast_rx.recv().await { + Ok(notification) => { + // Check if this notification is relevant to this attendant + // Send all notifications for now (can filter by assigned_to later) + let should_send = notification.assigned_to.is_none() + || notification.assigned_to.as_ref() == Some(&attendant_id_clone); + + if should_send { + if let Ok(json_str) = serde_json::to_string(¬ification) { + debug!( + "Sending notification to attendant {}: {}", + attendant_id_clone, notification.notification_type + ); + if sender.send(Message::Text(json_str.into())).await.is_err() { + error!("Failed to send notification to attendant WebSocket"); + break; + } + } + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!( + "Attendant {} lagged behind by {} messages", + attendant_id_clone, n + ); + } + Err(broadcast::error::RecvError::Closed) => { + info!("Broadcast channel closed"); + break; + } + } + } + }); + + // Task to handle incoming messages from 
attendant (e.g., status updates, typing indicators) + let state_clone = state.clone(); + let attendant_id_for_recv = attendant_id.clone(); + let mut recv_task = tokio::spawn(async move { + while let Some(Ok(msg)) = receiver.next().await { + match msg { + Message::Text(text) => { + debug!( + "Received message from attendant {}: {}", + attendant_id_for_recv, text + ); + + // Parse and handle attendant messages + if let Ok(parsed) = serde_json::from_str::(&text) { + handle_attendant_message(&state_clone, &attendant_id_for_recv, parsed) + .await; + } + } + Message::Ping(data) => { + debug!("Received ping from attendant {}", attendant_id_for_recv); + // Pong is automatically sent by axum + } + Message::Close(_) => { + info!( + "Attendant {} WebSocket close requested", + attendant_id_for_recv + ); + break; + } + _ => {} + } + } + }); + + // Wait for either task to complete + tokio::select! { + _ = (&mut send_task) => { + recv_task.abort(); + } + _ = (&mut recv_task) => { + send_task.abort(); + } + } + + info!("Attendant WebSocket disconnected: {}", attendant_id); +} + +/// Handle incoming messages from attendant WebSocket +async fn handle_attendant_message( + state: &Arc, + attendant_id: &str, + message: serde_json::Value, +) { + let msg_type = message + .get("type") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + + match msg_type { + "status_update" => { + // Update attendant status (online, busy, away, offline) + if let Some(status) = message.get("status").and_then(|v| v.as_str()) { + info!("Attendant {} status update: {}", attendant_id, status); + // Could update in database or broadcast to other attendants + } + } + "typing" => { + // Broadcast typing indicator to customer + if let Some(session_id) = message.get("session_id").and_then(|v| v.as_str()) { + debug!( + "Attendant {} typing in session {}", + attendant_id, session_id + ); + // Could broadcast to customer's WebSocket + } + } + "read" => { + // Mark messages as read + if let Some(session_id) = 
message.get("session_id").and_then(|v| v.as_str()) { + debug!( + "Attendant {} marked session {} as read", + attendant_id, session_id + ); + } + } + "respond" => { + // Handle response message (alternative to REST API) + if let (Some(session_id), Some(content)) = ( + message.get("session_id").and_then(|v| v.as_str()), + message.get("content").and_then(|v| v.as_str()), + ) { + info!( + "Attendant {} responding to {} via WebSocket", + attendant_id, session_id + ); + + // Process response similar to REST endpoint + let request = AttendantRespondRequest { + session_id: session_id.to_string(), + message: content.to_string(), + attendant_id: attendant_id.to_string(), + }; + + // Get session and send response + if let Ok(uuid) = Uuid::parse_str(session_id) { + let conn = state.conn.clone(); + if let Some(session) = tokio::task::spawn_blocking(move || { + let mut db_conn = conn.get().ok()?; + use crate::shared::models::schema::user_sessions; + user_sessions::table + .find(uuid) + .first::(&mut db_conn) + .ok() + }) + .await + .ok() + .flatten() + { + // Save to history + let _ = + save_message_to_history(state, &session, content, "attendant").await; + + // Send to channel + let channel = session + .context_data + .get("channel") + .and_then(|v| v.as_str()) + .unwrap_or("web"); + + if channel == "whatsapp" { + if let Some(phone) = + session.context_data.get("phone").and_then(|v| v.as_str()) + { + let adapter = + WhatsAppAdapter::new(state.conn.clone(), session.bot_id); + let response = BotResponse { + bot_id: session.bot_id.to_string(), + session_id: session.id.to_string(), + user_id: phone.to_string(), + channel: "whatsapp".to_string(), + content: content.to_string(), + message_type: + crate::shared::models::message_types::MessageType::BOT_RESPONSE, + stream_token: None, + is_complete: true, + suggestions: vec![], + context_name: None, + }; + let _ = adapter.send_message(response).await; + } + } + + // Broadcast to other attendants + broadcast_attendant_action(state, 
&session, &request, "attendant_response") + .await; + } + } + } + } + _ => { + debug!( + "Unknown message type from attendant {}: {}", + attendant_id, msg_type + ); + } + } } #[cfg(test)] @@ -66,4 +663,17 @@ mod tests { let _config = KeywordConfig::default(); let _parser = KeywordParser::new(); } + + #[test] + fn test_respond_request_parse() { + let json = r#"{ + "session_id": "123e4567-e89b-12d3-a456-426614174000", + "message": "Hello, how can I help?", + "attendant_id": "att-001" + }"#; + + let request: AttendantRespondRequest = serde_json::from_str(json).unwrap(); + assert_eq!(request.attendant_id, "att-001"); + assert_eq!(request.message, "Hello, how can I help?"); + } } diff --git a/src/attendance/queue.rs b/src/attendance/queue.rs index 528a67a6e..47d3f9aed 100644 --- a/src/attendance/queue.rs +++ b/src/attendance/queue.rs @@ -97,22 +97,42 @@ pub struct QueueFilters { pub assigned_to: Option, } -/// Check if bot has transfer enabled in config.csv +/// Check if CRM/transfer is enabled in config.csv +/// Supports both `crm-enabled = true` and legacy `transfer = true` async fn is_transfer_enabled(bot_id: Uuid, work_path: &str) -> bool { let config_path = PathBuf::from(work_path) .join(format!("{}.gbai", bot_id)) .join("config.csv"); if !config_path.exists() { + // Try alternate path without UUID prefix + let alt_path = PathBuf::from(work_path).join("config.csv"); + if alt_path.exists() { + return check_config_for_crm_enabled(&alt_path); + } warn!("Config file not found: {:?}", config_path); return false; } - match std::fs::read_to_string(&config_path) { + check_config_for_crm_enabled(&config_path) +} + +/// Helper to check config file for CRM/transfer settings +fn check_config_for_crm_enabled(config_path: &PathBuf) -> bool { + match std::fs::read_to_string(config_path) { Ok(content) => { for line in content.lines() { - if line.to_lowercase().contains("transfer") && line.to_lowercase().contains("true") + let line_lower = line.to_lowercase(); + // Check for 
crm-enabled = true or crm_enabled = true (primary) + if (line_lower.contains("crm-enabled") || line_lower.contains("crm_enabled")) + && line_lower.contains("true") { + info!("CRM enabled via crm-enabled setting"); + return true; + } + // Also support legacy transfer = true for backward compatibility + if line_lower.contains("transfer") && line_lower.contains("true") { + info!("CRM enabled via legacy transfer setting"); return true; } } diff --git a/src/basic/keywords/mod.rs b/src/basic/keywords/mod.rs index 7188b4f1a..22ac6ef19 100644 --- a/src/basic/keywords/mod.rs +++ b/src/basic/keywords/mod.rs @@ -59,6 +59,7 @@ pub mod social_media; pub mod string_functions; pub mod switch_case; pub mod table_definition; +pub mod transfer_to_human; pub mod universal_messaging; pub mod use_kb; pub mod use_tool; diff --git a/src/basic/keywords/transfer_to_human.rs b/src/basic/keywords/transfer_to_human.rs new file mode 100644 index 000000000..4f072b459 --- /dev/null +++ b/src/basic/keywords/transfer_to_human.rs @@ -0,0 +1,835 @@ +//! Transfer to Human Keyword +//! +//! Provides the TRANSFER TO HUMAN keyword for bot-to-human handoff in conversations. +//! This is a critical feature for hybrid bot/human support workflows. +//! +//! ## Features +//! +//! - Transfer to any available attendant +//! - Transfer to specific person by name or alias +//! - Transfer to specific department +//! - Priority-based queue placement +//! - Context passing for seamless handoff +//! +//! ## Configuration +//! +//! Requires `crm-enabled = true` in the bot's config.csv file. +//! Attendants are configured in attendant.csv in the bot's .gbai folder. +//! +//! ## Usage in BASIC +//! +//! ```basic +//! ' Transfer to any available human +//! TRANSFER TO HUMAN +//! +//! ' Transfer to specific person +//! TRANSFER TO HUMAN "John Smith" +//! +//! ' Transfer to department +//! TRANSFER TO HUMAN department: "sales" +//! +//! ' Transfer with priority and context +//! 
TRANSFER TO HUMAN "support", "high", "Customer needs help with billing" +//! ``` +//! +//! ## As LLM Tool +//! +//! This keyword is also registered as an LLM tool, allowing the AI to +//! automatically transfer conversations when appropriate. + +use crate::shared::models::UserSession; +use crate::shared::state::AppState; +use chrono::Utc; +use diesel::prelude::*; +use log::{debug, error, info, warn}; +use rhai::{Dynamic, Engine, Map}; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; +use std::sync::Arc; +use uuid::Uuid; + +/// Transfer request structure +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TransferToHumanRequest { + /// Optional name or alias of the person to transfer to + pub name: Option, + /// Optional department to transfer to + pub department: Option, + /// Priority level: "normal", "high", "urgent" + pub priority: Option, + /// Reason for the transfer (passed to attendant) + pub reason: Option, + /// Additional context from the conversation + pub context: Option, +} + +/// Transfer result +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TransferResult { + pub success: bool, + pub status: TransferStatus, + pub queue_position: Option, + pub assigned_to: Option, + pub assigned_to_name: Option, + pub estimated_wait_seconds: Option, + pub message: String, +} + +/// Transfer status +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum TransferStatus { + /// Queued for next available attendant + Queued, + /// Assigned to specific attendant + Assigned, + /// Attendant is online and ready + Connected, + /// No attendants available + NoAttendants, + /// CRM not enabled + CrmDisabled, + /// Specified attendant not found + AttendantNotFound, + /// Error during transfer + Error, +} + +/// Attendant information from CSV +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Attendant { + pub id: String, + pub name: String, + pub channel: String, + pub preferences: String, + 
pub department: Option<String>,
    pub aliases: Vec<String>,
    pub status: AttendantStatus,
}

/// Attendant availability status.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum AttendantStatus {
    Online,
    Busy,
    Away,
    Offline,
}

impl Default for AttendantStatus {
    /// Attendants are considered offline until a status update says otherwise.
    fn default() -> Self {
        AttendantStatus::Offline
    }
}

/// Check whether CRM/human-handoff is enabled in the bot's config.csv.
///
/// Looks first for `<bot_id>.gbai/config.csv` under `work_path`, then for a
/// bare `config.csv` directly under `work_path`.
pub fn is_crm_enabled(bot_id: Uuid, work_path: &str) -> bool {
    let config_path = PathBuf::from(work_path)
        .join(format!("{}.gbai", bot_id))
        .join("config.csv");

    if config_path.exists() {
        return check_config_for_crm(&config_path);
    }

    // Also try without the UUID prefix.
    let alt_path = PathBuf::from(work_path).join("config.csv");
    if alt_path.exists() {
        return check_config_for_crm(&alt_path);
    }

    warn!("Config file not found: {:?}", config_path);
    false
}

/// Scan a config file for `crm-enabled`/`crm_enabled` (or the legacy
/// `transfer` key) set to true.
///
/// Each line is parsed as a `key,value` (or `key = value`) pair and the key
/// is compared exactly. The previous substring check enabled CRM for any
/// line merely *containing* both "transfer" and "true" (e.g.
/// `transfer-log,true` or `auto-true-transfer,false` style entries).
fn check_config_for_crm(config_path: &std::path::Path) -> bool {
    let content = match std::fs::read_to_string(config_path) {
        Ok(content) => content,
        Err(e) => {
            error!("Failed to read config file: {}", e);
            return false;
        }
    };

    for line in content.lines() {
        let lower = line.to_lowercase();
        let mut parts = lower.splitn(2, |c| c == ',' || c == '=');
        let key = parts.next().unwrap_or("").trim();
        let value = parts.next().unwrap_or("").trim();

        let is_crm_key = matches!(key, "crm-enabled" | "crm_enabled")
            // Legacy setting kept for backward compatibility.
            || key == "transfer";

        if is_crm_key && value == "true" {
            return true;
        }
    }

    false
}

/// Read attendants from attendant.csv in the bot's .gbai folder, falling
/// back to a bare attendant.csv under `work_path`.
pub fn read_attendants(bot_id: Uuid, work_path: &str) -> Vec<Attendant> {
    let attendant_path = PathBuf::from(work_path)
        .join(format!("{}.gbai", bot_id))
        .join("attendant.csv");

    if !attendant_path.exists() {
        // Try alternate path
        let alt_path = PathBuf::from(work_path).join("attendant.csv");
        if alt_path.exists() {
            return parse_attendants_csv(&alt_path);
        }
        warn!("Attendant file not found: {:?}", attendant_path);
        return
Vec::new();
    }

    parse_attendants_csv(&attendant_path)
}

/// Parse attendant.csv into `Attendant` records.
///
/// The first line is treated as a header; column positions for
/// id/name/channel/preferences default to 0..3 when the header does not name
/// them. `department` and `aliases`/`alias` columns are optional. Aliases
/// are `;`-separated and lower-cased for case-insensitive lookup.
fn parse_attendants_csv(path: &std::path::Path) -> Vec<Attendant> {
    let content = match std::fs::read_to_string(path) {
        Ok(content) => content,
        Err(e) => {
            error!("Failed to read attendant file: {}", e);
            return Vec::new();
        }
    };

    let mut lines = content.lines();

    // The header row drives the column positions.
    let headers: Vec<String> = lines
        .next()
        .unwrap_or("")
        .split(',')
        .map(|s| s.trim().to_lowercase())
        .collect();

    let id_idx = headers.iter().position(|h| h == "id").unwrap_or(0);
    let name_idx = headers.iter().position(|h| h == "name").unwrap_or(1);
    let channel_idx = headers.iter().position(|h| h == "channel").unwrap_or(2);
    let pref_idx = headers.iter().position(|h| h == "preferences").unwrap_or(3);
    let dept_idx = headers.iter().position(|h| h == "department");
    let alias_idx = headers.iter().position(|h| h == "aliases" || h == "alias");

    let mut attendants = Vec::new();
    for line in lines {
        if line.trim().is_empty() {
            continue;
        }

        let parts: Vec<&str> = line.split(',').map(|s| s.trim()).collect();
        if parts.len() < 4 {
            continue;
        }

        // An empty department cell means "no department", not Some("").
        let department = dept_idx
            .and_then(|i| parts.get(i))
            .map(|s| s.to_string())
            .filter(|s| !s.is_empty());
        // Likewise, an empty alias cell yields no aliases rather than [""].
        let aliases: Vec<String> = alias_idx
            .and_then(|i| parts.get(i))
            .map(|s| {
                s.split(';')
                    .map(|a| a.trim().to_lowercase())
                    .filter(|a| !a.is_empty())
                    .collect()
            })
            .unwrap_or_default();

        attendants.push(Attendant {
            id: parts.get(id_idx).unwrap_or(&"").to_string(),
            name: parts.get(name_idx).unwrap_or(&"").to_string(),
            channel: parts.get(channel_idx).unwrap_or(&"all").to_string(),
            preferences: parts.get(pref_idx).unwrap_or(&"").to_string(),
            department,
            aliases,
            // Presence in the CSV defaults to online; live status is
            // expected to be refreshed from the database later.
            status: AttendantStatus::Online,
        });
    }

    info!("Loaded {} attendants from CSV", attendants.len());
    attendants
}

/// Find an attendant by name, alias, id, or department, falling back to the
/// first online attendant when no constraint was given.
pub fn find_attendant<'a>(
    attendants: &'a [Attendant],
    name: Option<&str>,
    department: Option<&str>,
) -> Option<&'a Attendant> {
    if
let Some(search_name) = name { + let search_lower = search_name.to_lowercase(); + + // First try exact name match + if let Some(att) = attendants + .iter() + .find(|a| a.name.to_lowercase() == search_lower) + { + return Some(att); + } + + // Try partial name match + if let Some(att) = attendants + .iter() + .find(|a| a.name.to_lowercase().contains(&search_lower)) + { + return Some(att); + } + + // Try alias match + if let Some(att) = attendants + .iter() + .find(|a| a.aliases.contains(&search_lower)) + { + return Some(att); + } + + // Try ID match + if let Some(att) = attendants + .iter() + .find(|a| a.id.to_lowercase() == search_lower) + { + return Some(att); + } + } + + if let Some(dept) = department { + let dept_lower = dept.to_lowercase(); + + // Find first online attendant in department + if let Some(att) = attendants.iter().find(|a| { + a.department + .as_ref() + .map(|d| d.to_lowercase() == dept_lower) + .unwrap_or(false) + && a.status == AttendantStatus::Online + }) { + return Some(att); + } + + // Try preferences match for department + if let Some(att) = attendants.iter().find(|a| { + a.preferences.to_lowercase().contains(&dept_lower) + && a.status == AttendantStatus::Online + }) { + return Some(att); + } + } + + // Return first online attendant if no specific match + attendants + .iter() + .find(|a| a.status == AttendantStatus::Online) +} + +/// Priority to integer for queue ordering +fn priority_to_int(priority: Option<&str>) -> i32 { + match priority.map(|p| p.to_lowercase()).as_deref() { + Some("urgent") => 3, + Some("high") => 2, + Some("normal") | None => 1, + Some("low") => 0, + _ => 1, + } +} + +/// Execute the transfer to human +pub async fn execute_transfer( + state: Arc, + session: &UserSession, + request: TransferToHumanRequest, +) -> TransferResult { + let work_path = "./work"; + let bot_id = session.bot_id; + + // Check if CRM is enabled + if !is_crm_enabled(bot_id, work_path) { + return TransferResult { + success: false, + status: 
TransferStatus::CrmDisabled,
            queue_position: None,
            assigned_to: None,
            assigned_to_name: None,
            estimated_wait_seconds: None,
            message: "CRM features are not enabled. Add 'crm-enabled,true' to config.csv"
                .to_string(),
        };
    }

    // Load attendants
    let attendants = read_attendants(bot_id, work_path);
    if attendants.is_empty() {
        return failure_result(
            TransferStatus::NoAttendants,
            "No attendants configured. Create attendant.csv in your .gbai folder".to_string(),
        );
    }

    // Find matching attendant
    let attendant = find_attendant(
        &attendants,
        request.name.as_deref(),
        request.department.as_deref(),
    );

    // A specific person was requested but could not be matched: fail with the
    // list of known attendants instead of routing to somebody else.
    if request.name.is_some() && attendant.is_none() {
        return failure_result(
            TransferStatus::AttendantNotFound,
            format!(
                "Attendant '{}' not found. Available attendants: {}",
                request.name.as_deref().unwrap_or_default(),
                attendants
                    .iter()
                    .map(|a| a.name.as_str())
                    .collect::<Vec<_>>()
                    .join(", ")
            ),
        );
    }

    // Record the handoff request on the session so attendant tooling can see
    // it; "assigned" when a concrete attendant was found, otherwise "queued".
    let priority = priority_to_int(request.priority.as_deref());
    let transfer_context = serde_json::json!({
        "transfer_requested_at": Utc::now().to_rfc3339(),
        "transfer_priority": priority,
        "transfer_reason": request.reason.clone().unwrap_or_default(),
        "transfer_context": request.context.clone().unwrap_or_default(),
        "transfer_to_name": request.name.clone(),
        "transfer_to_department": request.department.clone(),
        "needs_human": true,
        // Option<&Attendant> is Copy, so plain .map() does not consume it.
        "assigned_to": attendant.map(|a| a.id.clone()),
        "assigned_to_name": attendant.map(|a| a.name.clone()),
        "status": if attendant.is_some() { "assigned" } else { "queued" },
    });

    // Persist on a blocking thread: diesel is synchronous.
    let session_id = session.id;
    let conn = state.conn.clone();
    let ctx_data = transfer_context.clone();

    let update_result = tokio::task::spawn_blocking(move || {
        let mut db_conn = conn
            .get()
            .map_err(|e| format!("DB connection error: {}", e))?;

        use crate::shared::models::schema::user_sessions;

        diesel::update(user_sessions::table.filter(user_sessions::id.eq(session_id)))
            .set(user_sessions::context_data.eq(ctx_data))
            .execute(&mut db_conn)
            .map_err(|e| format!("Failed to update session: {}", e))
    })
    .await;

    match update_result {
        Ok(Ok(_)) => {
            if let Some(att) = attendant {
                info!(
                    "Transfer: Session {} assigned to {} ({})",
                    session.id, att.name, att.id
                );
                TransferResult {
                    success: true,
                    status: TransferStatus::Assigned,
                    queue_position: Some(1),
                    assigned_to: Some(att.id.clone()),
                    assigned_to_name: Some(att.name.clone()),
                    estimated_wait_seconds: Some(30),
                    message: format!(
                        "You have been connected to {}. They will be with you shortly.",
                        att.name
                    ),
                }
            } else {
                info!(
                    "Transfer: Session {} queued for next available attendant",
                    session.id
                );
                TransferResult {
                    success: true,
                    status: TransferStatus::Queued,
                    queue_position: Some(1), // TODO: Calculate actual position
                    assigned_to: None,
                    assigned_to_name: None,
                    estimated_wait_seconds: Some(120),
                    message: "You have been added to the queue. The next available attendant will assist you.".to_string(),
                }
            }
        }
        Ok(Err(e)) => {
            error!("Transfer failed: {}", e);
            failure_result(TransferStatus::Error, format!("Transfer failed: {}", e))
        }
        Err(e) => {
            error!("Transfer task failed: {:?}", e);
            failure_result(
                TransferStatus::Error,
                format!("Transfer task failed: {:?}", e),
            )
        }
    }
}

/// Build a failed `TransferResult` carrying only a status and a message.
fn failure_result(status: TransferStatus, message: String) -> TransferResult {
    TransferResult {
        success: false,
        status,
        queue_position: None,
        assigned_to: None,
        assigned_to_name: None,
        estimated_wait_seconds: None,
        message,
    }
}

impl TransferResult {
    /// Convert this result into a Rhai map so BASIC scripts can inspect
    /// `success`, `status`, `message`, and the optional assignment fields.
    ///
    /// NOTE(review): `status` is the Debug form lower-cased, so multi-word
    /// variants render as e.g. "crmdisabled", not the serde snake_case
    /// "crm_disabled" — confirm scripts expect this spelling.
    pub fn to_dynamic(&self) -> Dynamic {
        let mut map = Map::new();
        map.insert("success".into(), Dynamic::from(self.success));
        map.insert(
            "status".into(),
            Dynamic::from(format!("{:?}", self.status).to_lowercase()),
        );
        map.insert("message".into(), Dynamic::from(self.message.clone()));

        // Optional fields are only present in the map when set.
        if let Some(pos) = self.queue_position {
            map.insert("queue_position".into(), Dynamic::from(pos));
        }
        if let Some(ref id) = self.assigned_to {
            map.insert("assigned_to".into(), Dynamic::from(id.clone()));
        }
        if let Some(ref name) = self.assigned_to_name {
            map.insert("assigned_to_name".into(), Dynamic::from(name.clone()));
        }
        if let Some(wait) = self.estimated_wait_seconds {
            map.insert("estimated_wait_seconds".into(), Dynamic::from(wait));
        }

        Dynamic::from(map)
    }
}

/// Register the TRANSFER TO HUMAN keyword with the Rhai engine
+pub fn register_transfer_to_human_keyword( + state: Arc, + user: UserSession, + engine: &mut Engine, +) { + // TRANSFER TO HUMAN (no arguments - any available) + let state_clone = state.clone(); + let user_clone = user.clone(); + engine.register_fn("transfer_to_human", move || -> Dynamic { + let state = state_clone.clone(); + let session = user_clone.clone(); + + let rt = tokio::runtime::Handle::current(); + let result = rt.block_on(async { + execute_transfer( + state, + &session, + TransferToHumanRequest { + name: None, + department: None, + priority: None, + reason: None, + context: None, + }, + ) + .await + }); + + result.to_dynamic() + }); + + // TRANSFER TO HUMAN "name" + let state_clone = state.clone(); + let user_clone = user.clone(); + engine.register_fn("transfer_to_human", move |name: &str| -> Dynamic { + let state = state_clone.clone(); + let session = user_clone.clone(); + let name_str = name.to_string(); + + let rt = tokio::runtime::Handle::current(); + let result = rt.block_on(async { + execute_transfer( + state, + &session, + TransferToHumanRequest { + name: Some(name_str), + department: None, + priority: None, + reason: None, + context: None, + }, + ) + .await + }); + + result.to_dynamic() + }); + + // TRANSFER TO HUMAN "department", "priority" + let state_clone = state.clone(); + let user_clone = user.clone(); + engine.register_fn( + "transfer_to_human", + move |department: &str, priority: &str| -> Dynamic { + let state = state_clone.clone(); + let session = user_clone.clone(); + let dept = department.to_string(); + let prio = priority.to_string(); + + let rt = tokio::runtime::Handle::current(); + let result = rt.block_on(async { + execute_transfer( + state, + &session, + TransferToHumanRequest { + name: None, + department: Some(dept), + priority: Some(prio), + reason: None, + context: None, + }, + ) + .await + }); + + result.to_dynamic() + }, + ); + + // TRANSFER TO HUMAN "department", "priority", "reason" + let state_clone = state.clone(); + let 
user_clone = user.clone(); + engine.register_fn( + "transfer_to_human", + move |department: &str, priority: &str, reason: &str| -> Dynamic { + let state = state_clone.clone(); + let session = user_clone.clone(); + let dept = department.to_string(); + let prio = priority.to_string(); + let rsn = reason.to_string(); + + let rt = tokio::runtime::Handle::current(); + let result = rt.block_on(async { + execute_transfer( + state, + &session, + TransferToHumanRequest { + name: None, + department: Some(dept), + priority: Some(prio), + reason: Some(rsn), + context: None, + }, + ) + .await + }); + + result.to_dynamic() + }, + ); + + // TRANSFER TO HUMAN with Map (for named parameters) + let state_clone = state.clone(); + let user_clone = user.clone(); + engine.register_fn("transfer_to_human_ex", move |params: Map| -> Dynamic { + let state = state_clone.clone(); + let session = user_clone.clone(); + + let name = params + .get("name") + .and_then(|v| v.clone().try_cast::()); + let department = params + .get("department") + .and_then(|v| v.clone().try_cast::()); + let priority = params + .get("priority") + .and_then(|v| v.clone().try_cast::()); + let reason = params + .get("reason") + .and_then(|v| v.clone().try_cast::()); + let context = params + .get("context") + .and_then(|v| v.clone().try_cast::()); + + let rt = tokio::runtime::Handle::current(); + let result = rt.block_on(async { + execute_transfer( + state, + &session, + TransferToHumanRequest { + name, + department, + priority, + reason, + context, + }, + ) + .await + }); + + result.to_dynamic() + }); + + debug!("Registered TRANSFER TO HUMAN keywords"); +} + +/// Tool schema for LLM integration +pub const TRANSFER_TO_HUMAN_TOOL_SCHEMA: &str = r#"{ + "name": "transfer_to_human", + "description": "Transfer the conversation to a human attendant. 
Use this when the customer explicitly asks to speak with a person, when the issue is too complex for automated handling, or when emotional support is needed.", + "parameters": { + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "If someone wants to talk to somebody specific, provide their name or alias. Leave empty for any available attendant." + }, + "department": { + "type": "string", + "description": "Department to transfer to: sales, support, technical, billing, etc.", + "enum": ["sales", "support", "technical", "billing", "general"] + }, + "priority": { + "type": "string", + "description": "Priority level for the transfer", + "enum": ["normal", "high", "urgent"], + "default": "normal" + }, + "reason": { + "type": "string", + "description": "Brief reason for the transfer to help the attendant understand the context" + } + }, + "required": [] + } +}"#; + +/// Get the tool definition for registration with LLM +pub fn get_tool_definition() -> serde_json::Value { + serde_json::json!({ + "type": "function", + "function": { + "name": "transfer_to_human", + "description": "Transfer the conversation to a human attendant. Use this when the customer explicitly asks to speak with a person, when the issue is too complex for automated handling, or when emotional support is needed.", + "parameters": { + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "If someone wants to talk to somebody specific, provide their name or alias. Leave empty for any available attendant." + }, + "department": { + "type": "string", + "description": "Department to transfer to: sales, support, technical, billing, etc." 
+ }, + "priority": { + "type": "string", + "description": "Priority level for the transfer: normal, high, or urgent", + "default": "normal" + }, + "reason": { + "type": "string", + "description": "Brief reason for the transfer to help the attendant understand the context" + } + }, + "required": [] + } + } + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_priority_to_int() { + assert_eq!(priority_to_int(Some("urgent")), 3); + assert_eq!(priority_to_int(Some("high")), 2); + assert_eq!(priority_to_int(Some("normal")), 1); + assert_eq!(priority_to_int(Some("low")), 0); + assert_eq!(priority_to_int(None), 1); + } + + #[test] + fn test_find_attendant_by_name() { + let attendants = vec![ + Attendant { + id: "att-001".to_string(), + name: "John Smith".to_string(), + channel: "all".to_string(), + preferences: "sales".to_string(), + department: Some("commercial".to_string()), + aliases: vec!["johnny".to_string(), "js".to_string()], + status: AttendantStatus::Online, + }, + Attendant { + id: "att-002".to_string(), + name: "Jane Doe".to_string(), + channel: "web".to_string(), + preferences: "support".to_string(), + department: Some("customer-service".to_string()), + aliases: vec![], + status: AttendantStatus::Online, + }, + ]; + + // Find by exact name + let found = find_attendant(&attendants, Some("John Smith"), None); + assert!(found.is_some()); + assert_eq!(found.unwrap().id, "att-001"); + + // Find by partial name + let found = find_attendant(&attendants, Some("john"), None); + assert!(found.is_some()); + assert_eq!(found.unwrap().id, "att-001"); + + // Find by alias + let found = find_attendant(&attendants, Some("johnny"), None); + assert!(found.is_some()); + assert_eq!(found.unwrap().id, "att-001"); + + // Find by department + let found = find_attendant(&attendants, None, Some("customer-service")); + assert!(found.is_some()); + assert_eq!(found.unwrap().id, "att-002"); + } + + #[test] + fn test_transfer_result_to_dynamic() { + let result = 
TransferResult { + success: true, + status: TransferStatus::Assigned, + queue_position: Some(1), + assigned_to: Some("att-001".to_string()), + assigned_to_name: Some("John Smith".to_string()), + estimated_wait_seconds: Some(30), + message: "Connected to John".to_string(), + }; + + let dynamic = result.to_dynamic(); + let map = dynamic.try_cast::().unwrap(); + + assert_eq!( + map.get("success") + .unwrap() + .clone() + .try_cast::() + .unwrap(), + true + ); + assert_eq!( + map.get("assigned_to_name") + .unwrap() + .clone() + .try_cast::() + .unwrap(), + "John Smith" + ); + } +} diff --git a/src/basic/mod.rs b/src/basic/mod.rs index 81b0ab865..49ab8a00b 100644 --- a/src/basic/mod.rs +++ b/src/basic/mod.rs @@ -51,6 +51,7 @@ use self::keywords::send_mail::send_mail_keyword; use self::keywords::send_template::register_send_template_keywords; use self::keywords::social_media::register_social_media_keywords; use self::keywords::switch_case::preprocess_switch; +use self::keywords::transfer_to_human::register_transfer_to_human_keyword; use self::keywords::use_kb::register_use_kb_keyword; use self::keywords::use_tool::use_tool_keyword; use self::keywords::use_website::{clear_websites_keyword, use_website_keyword}; @@ -173,6 +174,14 @@ impl ScriptService { // Lead Scoring: SCORE LEAD, GET LEAD SCORE, QUALIFY LEAD, AI SCORE LEAD register_lead_scoring_keywords(state.clone(), user.clone(), &mut engine); + // ======================================================================== + // CRM & HUMAN HANDOFF + // ======================================================================== + + // TRANSFER TO HUMAN: Bot-to-human handoff for hybrid support workflows + // Supports transfer by name/alias, department, priority, and context + register_transfer_to_human_keyword(state.clone(), user.clone(), &mut engine); + // ======================================================================== // CORE BASIC FUNCTIONS: Math, Date/Time, Validation, Arrays, Error Handling // 
======================================================================== diff --git a/src/core/shared/state.rs b/src/core/shared/state.rs index 7320cd499..02ee1a34c 100644 --- a/src/core/shared/state.rs +++ b/src/core/shared/state.rs @@ -17,7 +17,23 @@ use redis::Client as RedisClient; use std::any::{Any, TypeId}; use std::collections::HashMap; use std::sync::Arc; -use tokio::sync::mpsc; +use tokio::sync::{broadcast, mpsc}; + +/// Notification sent to attendants via WebSocket/broadcast +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct AttendantNotification { + #[serde(rename = "type")] + pub notification_type: String, + pub session_id: String, + pub user_id: String, + pub user_name: Option, + pub user_phone: Option, + pub channel: String, + pub content: String, + pub timestamp: String, + pub assigned_to: Option, + pub priority: i32, +} /// Type-erased extension storage for AppState #[derive(Default)] @@ -106,6 +122,9 @@ pub struct AppState { pub task_engine: Arc, /// Type-erased extension storage for web handlers and other components pub extensions: Extensions, + /// Broadcast channel for attendant notifications (human handoff) + /// Used to notify attendants of new messages from customers + pub attendant_broadcast: Option>, } impl Clone for AppState { fn clone(&self) -> Self { @@ -133,6 +152,7 @@ impl Clone for AppState { voice_adapter: Arc::clone(&self.voice_adapter), task_engine: Arc::clone(&self.task_engine), extensions: self.extensions.clone(), + attendant_broadcast: self.attendant_broadcast.clone(), } } } diff --git a/src/main.rs b/src/main.rs index a66ecc454..da1414694 100644 --- a/src/main.rs +++ b/src/main.rs @@ -183,6 +183,18 @@ async fn run_axum_server( api_router = api_router.merge(botserver::sources::configure_sources_routes()); api_router = api_router.merge(botserver::designer::configure_designer_routes()); + // Add WhatsApp webhook routes if feature is enabled + #[cfg(feature = "whatsapp")] + { + api_router = 
api_router.merge(crate::whatsapp::configure()); + } + + // Add attendance/CRM routes for human handoff + #[cfg(feature = "attendance")] + { + api_router = api_router.merge(crate::attendance::configure_attendance_routes()); + } + // Add OAuth authentication routes api_router = api_router.merge(crate::core::oauth::routes::configure()); @@ -616,6 +628,11 @@ async fn main() -> std::io::Result<()> { // Initialize TaskScheduler (will be set after AppState creation) let task_scheduler = None; + // Create broadcast channel for attendant notifications (human handoff) + let (attendant_tx, _attendant_rx) = tokio::sync::broadcast::channel::< + botserver::core::shared::state::AttendantNotification, + >(1000); + let app_state = Arc::new(AppState { drive: Some(drive.clone()), s3_client: Some(drive), @@ -644,6 +661,7 @@ async fn main() -> std::io::Result<()> { kb_manager: Some(kb_manager.clone()), task_engine: task_engine, extensions: botserver::core::shared::state::Extensions::new(), + attendant_broadcast: Some(attendant_tx), }); // Initialize TaskScheduler with the AppState diff --git a/src/whatsapp/mod.rs b/src/whatsapp/mod.rs new file mode 100644 index 000000000..b1fda6dc7 --- /dev/null +++ b/src/whatsapp/mod.rs @@ -0,0 +1,1115 @@ +//! WhatsApp Integration Module +//! +//! Handles incoming WhatsApp webhooks and routes messages to bot or human attendants. +//! Supports the full message flow: +//! 1. WhatsApp Cloud API webhook verification +//! 2. Incoming message processing +//! 3. Bot response or human handoff +//! 4. Attendant response routing back to WhatsApp +//! +//! ## Configuration +//! +//! Add to your bot's config.csv: +//! ```csv +//! whatsapp-api-key,your_access_token +//! whatsapp-phone-number-id,your_phone_number_id +//! whatsapp-verify-token,your_webhook_verify_token +//! whatsapp-business-account-id,your_business_account_id +//! 
``` + +use crate::core::bot::channels::whatsapp::WhatsAppAdapter; +use crate::core::bot::channels::ChannelAdapter; +use crate::core::bot::orchestrator::BotOrchestrator; +use crate::shared::models::{BotResponse, UserMessage, UserSession}; +use crate::shared::state::AppState; +use axum::{ + extract::{Query, State}, + http::StatusCode, + response::IntoResponse, + routing::{get, post}, + Json, Router, +}; +use chrono::Utc; +use diesel::prelude::*; +use log::{debug, error, info, warn}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::broadcast; +use uuid::Uuid; + +/// WebSocket broadcast channel for attendant notifications +pub type AttendantBroadcast = broadcast::Sender; + +/// Notification sent to attendants via WebSocket +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AttendantNotification { + #[serde(rename = "type")] + pub notification_type: String, + pub session_id: String, + pub user_id: String, + pub user_name: Option, + pub user_phone: Option, + pub channel: String, + pub content: String, + pub timestamp: String, + pub assigned_to: Option, + pub priority: i32, +} + +/// WhatsApp webhook verification query parameters +#[derive(Debug, Deserialize)] +pub struct WebhookVerifyQuery { + #[serde(rename = "hub.mode")] + pub mode: Option, + #[serde(rename = "hub.verify_token")] + pub verify_token: Option, + #[serde(rename = "hub.challenge")] + pub challenge: Option, +} + +/// WhatsApp webhook payload +#[derive(Debug, Deserialize)] +pub struct WhatsAppWebhook { + pub object: String, + #[serde(default)] + pub entry: Vec, +} + +#[derive(Debug, Deserialize)] +pub struct WhatsAppEntry { + pub id: String, + #[serde(default)] + pub changes: Vec, +} + +#[derive(Debug, Deserialize)] +pub struct WhatsAppChange { + pub field: String, + pub value: WhatsAppValue, +} + +#[derive(Debug, Deserialize)] +pub struct WhatsAppValue { + pub messaging_product: String, + #[serde(default)] + pub metadata: WhatsAppMetadata, 
+ #[serde(default)] + pub contacts: Vec, + #[serde(default)] + pub messages: Vec, + #[serde(default)] + pub statuses: Vec, +} + +#[derive(Debug, Default, Deserialize)] +pub struct WhatsAppMetadata { + pub display_phone_number: Option, + pub phone_number_id: Option, +} + +#[derive(Debug, Deserialize)] +pub struct WhatsAppContact { + pub wa_id: String, + pub profile: WhatsAppProfile, +} + +#[derive(Debug, Deserialize)] +pub struct WhatsAppProfile { + pub name: String, +} + +#[derive(Debug, Deserialize)] +pub struct WhatsAppMessage { + pub id: String, + pub from: String, + pub timestamp: String, + #[serde(rename = "type")] + pub message_type: String, + #[serde(default)] + pub text: Option, + #[serde(default)] + pub image: Option, + #[serde(default)] + pub audio: Option, + #[serde(default)] + pub video: Option, + #[serde(default)] + pub document: Option, + #[serde(default)] + pub location: Option, + #[serde(default)] + pub interactive: Option, + #[serde(default)] + pub button: Option, +} + +#[derive(Debug, Deserialize)] +pub struct WhatsAppText { + pub body: String, +} + +#[derive(Debug, Deserialize)] +pub struct WhatsAppMedia { + pub id: String, + #[serde(default)] + pub mime_type: Option, + #[serde(default)] + pub sha256: Option, + #[serde(default)] + pub caption: Option, +} + +#[derive(Debug, Deserialize)] +pub struct WhatsAppLocation { + pub latitude: f64, + pub longitude: f64, + #[serde(default)] + pub name: Option, + #[serde(default)] + pub address: Option, +} + +#[derive(Debug, Deserialize)] +pub struct WhatsAppInteractive { + #[serde(rename = "type")] + pub interactive_type: String, + #[serde(default)] + pub button_reply: Option, + #[serde(default)] + pub list_reply: Option, +} + +#[derive(Debug, Deserialize)] +pub struct WhatsAppButtonReply { + pub id: String, + pub title: String, +} + +#[derive(Debug, Deserialize)] +pub struct WhatsAppListReply { + pub id: String, + pub title: String, + #[serde(default)] + pub description: Option, +} + +#[derive(Debug, 
Deserialize)] +pub struct WhatsAppButton { + pub payload: String, + pub text: String, +} + +#[derive(Debug, Deserialize)] +pub struct WhatsAppStatus { + pub id: String, + pub status: String, + pub timestamp: String, + pub recipient_id: String, +} + +/// Configure WhatsApp routes +pub fn configure() -> Router> { + Router::new() + .route("/webhook/whatsapp", get(verify_webhook)) + .route("/webhook/whatsapp", post(handle_webhook)) + .route("/api/whatsapp/send", post(send_message)) + .route("/api/attendance/respond", post(attendant_respond)) +} + +/// Verify WhatsApp webhook (GET request) +pub async fn verify_webhook( + State(state): State>, + Query(params): Query, +) -> impl IntoResponse { + info!("WhatsApp webhook verification request received"); + + let mode = params.mode.unwrap_or_default(); + let token = params.verify_token.unwrap_or_default(); + let challenge = params.challenge.unwrap_or_default(); + + if mode != "subscribe" { + warn!("Invalid webhook mode: {}", mode); + return (StatusCode::FORBIDDEN, "Invalid mode".to_string()); + } + + // Get verify token from config + let expected_token = get_verify_token(&state).await; + + if token == expected_token { + info!("Webhook verification successful"); + (StatusCode::OK, challenge) + } else { + warn!("Invalid verify token"); + (StatusCode::FORBIDDEN, "Invalid verify token".to_string()) + } +} + +/// Handle incoming WhatsApp webhook (POST request) +pub async fn handle_webhook( + State(state): State>, + Json(payload): Json, +) -> impl IntoResponse { + info!("WhatsApp webhook received: {:?}", payload.object); + + if payload.object != "whatsapp_business_account" { + return StatusCode::OK; + } + + for entry in payload.entry { + for change in entry.changes { + if change.field == "messages" { + // Get contact info + let contact = change.value.contacts.first(); + let contact_name = contact.map(|c| c.profile.name.clone()); + let contact_phone = contact.map(|c| c.wa_id.clone()); + + // Process messages + for message in 
change.value.messages { + if let Err(e) = process_incoming_message( + state.clone(), + &message, + contact_name.clone(), + contact_phone.clone(), + ) + .await + { + error!("Failed to process WhatsApp message: {}", e); + } + } + + // Process status updates + for status in change.value.statuses { + debug!( + "Message {} status: {} for {}", + status.id, status.status, status.recipient_id + ); + } + } + } + } + + StatusCode::OK +} + +/// Process an incoming WhatsApp message +async fn process_incoming_message( + state: Arc, + message: &WhatsAppMessage, + contact_name: Option, + contact_phone: Option, +) -> Result<(), Box> { + let phone = contact_phone + .clone() + .unwrap_or_else(|| message.from.clone()); + let name = contact_name.clone().unwrap_or_else(|| phone.clone()); + + info!( + "Processing WhatsApp message from {} ({}): type={}", + name, phone, message.message_type + ); + + // Extract message content + let content = extract_message_content(message); + if content.is_empty() { + debug!("Empty message content, skipping"); + return Ok(()); + } + + // Check if this is an attendant command (starts with /) + if content.starts_with('/') { + if let Some(response) = process_attendant_command(&state, &phone, &content).await { + // Send response back to the attendant + let adapter = WhatsAppAdapter::new(state.conn.clone(), Uuid::nil()); + let bot_response = BotResponse { + bot_id: Uuid::nil().to_string(), + session_id: Uuid::nil().to_string(), + user_id: phone.clone(), + channel: "whatsapp".to_string(), + content: response, + message_type: crate::shared::models::message_types::MessageType::BOT_RESPONSE, + stream_token: None, + is_complete: true, + suggestions: vec![], + context_name: None, + }; + if let Err(e) = adapter.send_message(bot_response).await { + error!("Failed to send attendant command response: {}", e); + } + return Ok(()); + } + } + + // Find or create session for this user + let (session, is_new) = find_or_create_session(&state, &phone, &name).await?; + + // 
Check if session needs human attention (transferred to human) + let needs_human = check_needs_human(&session); + + if needs_human { + // Route to human attendant + route_to_attendant(&state, &session, &content, &name, &phone).await?; + } else { + // Route to bot + route_to_bot(&state, &session, &content, is_new).await?; + } + + Ok(()) +} + +/// Process attendant commands from WhatsApp +/// Returns Some(response) if this is an attendant command, None otherwise +async fn process_attendant_command( + state: &Arc, + phone: &str, + content: &str, +) -> Option { + // Check if this phone number belongs to an attendant + let is_attendant = check_is_attendant(state, phone).await; + + if !is_attendant { + return None; + } + + // Get current session the attendant is handling (if any) + let current_session = get_attendant_active_session(state, phone).await; + + // Process the command using llm_assist module + match crate::attendance::llm_assist::process_attendant_command( + state, + phone, + content, + current_session, + ) + .await + { + Ok(response) => Some(response), + Err(e) => Some(format!("āŒ Error: {}", e)), + } +} + +/// Check if a phone number belongs to a registered attendant +async fn check_is_attendant(state: &Arc, phone: &str) -> bool { + let conn = state.conn.clone(); + let phone_clone = phone.to_string(); + + tokio::task::spawn_blocking(move || { + // Try to find attendant by phone in bot_configuration or a dedicated table + // For now, check if phone is in attendant.csv context + let work_path = std::env::var("WORK_PATH").unwrap_or_else(|_| "./work".to_string()); + + // Read attendant.csv files from all bots + if let Ok(entries) = std::fs::read_dir(&work_path) { + for entry in entries.flatten() { + let path = entry.path(); + if path.is_dir() && path.to_string_lossy().ends_with(".gbai") { + let attendant_path = path.join("attendant.csv"); + if attendant_path.exists() { + if let Ok(content) = std::fs::read_to_string(&attendant_path) { + for line in 
content.lines().skip(1) { + // Check if phone is in this line (could be in channel or preferences) + if line.to_lowercase().contains(&phone_clone.to_lowercase()) { + return true; + } + } + } + } + } + } + } + false + }) + .await + .unwrap_or(false) +} + +/// Get the active session an attendant is currently handling +async fn get_attendant_active_session(state: &Arc, phone: &str) -> Option { + let conn = state.conn.clone(); + let phone_clone = phone.to_string(); + + tokio::task::spawn_blocking(move || { + let mut db_conn = conn.get().ok()?; + + use crate::shared::models::schema::user_sessions; + + // Find session assigned to this attendant phone + let session: Option = user_sessions::table + .filter( + user_sessions::context_data + .retrieve_as_text("assigned_to_phone") + .eq(&phone_clone), + ) + .filter( + user_sessions::context_data + .retrieve_as_text("status") + .ne("resolved"), + ) + .order(user_sessions::updated_at.desc()) + .first(&mut db_conn) + .ok(); + + session.map(|s| s.id) + }) + .await + .ok() + .flatten() +} + +/// Extract text content from different message types +fn extract_message_content(message: &WhatsAppMessage) -> String { + match message.message_type.as_str() { + "text" => message + .text + .as_ref() + .map(|t| t.body.clone()) + .unwrap_or_default(), + "interactive" => { + if let Some(interactive) = &message.interactive { + match interactive.interactive_type.as_str() { + "button_reply" => interactive + .button_reply + .as_ref() + .map(|b| b.title.clone()) + .unwrap_or_default(), + "list_reply" => interactive + .list_reply + .as_ref() + .map(|l| l.title.clone()) + .unwrap_or_default(), + _ => String::new(), + } + } else { + String::new() + } + } + "button" => message + .button + .as_ref() + .map(|b| b.text.clone()) + .unwrap_or_default(), + "image" | "audio" | "video" | "document" => { + format!("[{} message]", message.message_type) + } + "location" => { + if let Some(loc) = &message.location { + format!( + "šŸ“ Location: {}, {} ({})", + 
loc.latitude, + loc.longitude, + loc.name.as_deref().unwrap_or("Unknown") + ) + } else { + "[Location]".to_string() + } + } + _ => String::new(), + } +} + +/// Find existing session or create new one for WhatsApp user +async fn find_or_create_session( + state: &Arc, + phone: &str, + name: &str, +) -> Result<(UserSession, bool), Box> { + let conn = state.conn.clone(); + let phone_clone = phone.to_string(); + let name_clone = name.to_string(); + + let result = tokio::task::spawn_blocking(move || { + let mut db_conn = conn.get().map_err(|e| format!("DB error: {}", e))?; + + use crate::shared::models::schema::{bots, user_sessions, users}; + + // Find user by phone (stored in email field or context) + let existing_user: Option<(Uuid, String)> = users::table + .filter(users::email.eq(&phone_clone)) + .select((users::id, users::username)) + .first(&mut db_conn) + .optional() + .map_err(|e| format!("Query error: {}", e))?; + + let (user_id, _username) = if let Some((id, uname)) = existing_user { + (id, uname) + } else { + // Create new user + let new_user_id = Uuid::new_v4(); + diesel::insert_into(users::table) + .values(( + users::id.eq(new_user_id), + users::username.eq(&name_clone), + users::email.eq(&phone_clone), + users::password_hash.eq("whatsapp_user"), + users::created_at.eq(diesel::dsl::now), + )) + .execute(&mut db_conn) + .map_err(|e| format!("Insert user error: {}", e))?; + (new_user_id, name_clone.clone()) + }; + + // Get default bot + let bot_id: Uuid = bots::table + .filter(bots::is_active.eq(true)) + .select(bots::id) + .first(&mut db_conn) + .map_err(|e| format!("No active bot found: {}", e))?; + + // Find active session for this user + let existing_session: Option = user_sessions::table + .filter(user_sessions::user_id.eq(user_id)) + .filter(user_sessions::bot_id.eq(bot_id)) + .order(user_sessions::created_at.desc()) + .first(&mut db_conn) + .optional() + .map_err(|e| format!("Session query error: {}", e))?; + + if let Some(session) = existing_session { 
+ // Check if session is recent (within 24 hours) + let age = Utc::now() - session.updated_at; + if age.num_hours() < 24 { + return Ok((session, false)); + } + } + + // Create new session + let new_session_id = Uuid::new_v4(); + let context_data = serde_json::json!({ + "channel": "whatsapp", + "phone": phone_clone, + "name": name_clone, + }); + + diesel::insert_into(user_sessions::table) + .values(( + user_sessions::id.eq(new_session_id), + user_sessions::user_id.eq(user_id), + user_sessions::bot_id.eq(bot_id), + user_sessions::context_data.eq(&context_data), + user_sessions::created_at.eq(diesel::dsl::now), + user_sessions::updated_at.eq(diesel::dsl::now), + )) + .execute(&mut db_conn) + .map_err(|e| format!("Create session error: {}", e))?; + + let new_session: UserSession = user_sessions::table + .find(new_session_id) + .first(&mut db_conn) + .map_err(|e| format!("Load session error: {}", e))?; + + Ok((new_session, true)) + }) + .await + .map_err(|e| format!("Task error: {}", e))??; + + Ok(result) +} + +/// Check if session needs human attention +fn check_needs_human(session: &UserSession) -> bool { + if let Some(needs_human) = session.context_data.get("needs_human") { + return needs_human.as_bool().unwrap_or(false); + } + false +} + +/// Route message to bot for processing +async fn route_to_bot( + state: &Arc, + session: &UserSession, + content: &str, + is_new: bool, +) -> Result<(), Box> { + info!("Routing WhatsApp message to bot for session {}", session.id); + + let user_message = UserMessage { + session_id: session.id.to_string(), + content: content.to_string(), + user_id: session.user_id.to_string(), + }; + + // Get WhatsApp adapter for sending responses + let adapter = WhatsAppAdapter::new(state.conn.clone(), session.bot_id); + + // Process through bot orchestrator + let orchestrator = BotOrchestrator::new(state.clone()); + + // Create response channel + let (tx, mut rx) = tokio::sync::mpsc::channel::(100); + + // Spawn task to collect responses + let 
phone = session + .context_data + .get("phone") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + + let adapter_for_send = WhatsAppAdapter::new(state.conn.clone(), session.bot_id); + + tokio::spawn(async move { + while let Some(response) = rx.recv().await { + if !response.content.is_empty() { + // Send response to WhatsApp + let mut wa_response = response.clone(); + wa_response.user_id = phone.clone(); + wa_response.channel = "whatsapp".to_string(); + + if let Err(e) = adapter_for_send.send_message(wa_response).await { + error!("Failed to send WhatsApp response: {}", e); + } + } + } + }); + + // Process message + if let Err(e) = orchestrator + .process_message(user_message, Some(tx), is_new) + .await + { + error!("Bot processing error: {}", e); + + // Send error message back + let error_response = BotResponse { + bot_id: session.bot_id.to_string(), + session_id: session.id.to_string(), + user_id: phone.clone(), + channel: "whatsapp".to_string(), + content: "Sorry, I encountered an error processing your message. Please try again." 
+ .to_string(), + message_type: crate::shared::models::message_types::MessageType::BOT_RESPONSE, + stream_token: None, + is_complete: true, + suggestions: vec![], + context_name: None, + }; + + if let Err(e) = adapter.send_message(error_response).await { + error!("Failed to send error response: {}", e); + } + } + + Ok(()) +} + +/// Route message to human attendant +async fn route_to_attendant( + state: &Arc, + session: &UserSession, + content: &str, + user_name: &str, + user_phone: &str, +) -> Result<(), Box> { + info!( + "Routing WhatsApp message to attendant for session {}", + session.id + ); + + // Get assigned attendant info + let assigned_to = session + .context_data + .get("assigned_to") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + + let priority = session + .context_data + .get("transfer_priority") + .and_then(|v| v.as_i64()) + .unwrap_or(1) as i32; + + // Save message to history + save_message_to_history(state, session, content, "customer").await?; + + // Create notification for attendants + let notification = AttendantNotification { + notification_type: "new_message".to_string(), + session_id: session.id.to_string(), + user_id: session.user_id.to_string(), + user_name: Some(user_name.to_string()), + user_phone: Some(user_phone.to_string()), + channel: "whatsapp".to_string(), + content: content.to_string(), + timestamp: Utc::now().to_rfc3339(), + assigned_to, + priority, + }; + + // Broadcast to attendant WebSocket connections + if let Some(broadcast_tx) = state.attendant_broadcast.as_ref() { + if let Err(e) = broadcast_tx.send(notification.clone()) { + debug!("No attendants listening: {}", e); + } else { + info!("Notification sent to attendants"); + } + } + + // Also update queue status + update_queue_item(state, session, content).await?; + + Ok(()) +} + +/// Save message to conversation history +async fn save_message_to_history( + state: &Arc, + session: &UserSession, + content: &str, + sender: &str, +) -> Result<(), Box> { + let conn = 
state.conn.clone(); + let session_id = session.id; + let content_clone = content.to_string(); + let sender_clone = sender.to_string(); + + tokio::task::spawn_blocking(move || { + let mut db_conn = conn.get().map_err(|e| format!("DB error: {}", e))?; + + use crate::shared::models::schema::message_history; + + diesel::insert_into(message_history::table) + .values(( + message_history::id.eq(Uuid::new_v4()), + message_history::session_id.eq(session_id), + message_history::role.eq(sender_clone), + message_history::content.eq(content_clone), + message_history::created_at.eq(diesel::dsl::now), + )) + .execute(&mut db_conn) + .map_err(|e| format!("Insert error: {}", e))?; + + Ok::<(), String>(()) + }) + .await + .map_err(|e| format!("Task error: {}", e))??; + + Ok(()) +} + +/// Update queue item with latest message +async fn update_queue_item( + state: &Arc, + session: &UserSession, + last_message: &str, +) -> Result<(), Box> { + let conn = state.conn.clone(); + let session_id = session.id; + let last_msg = last_message.to_string(); + + tokio::task::spawn_blocking(move || { + let mut db_conn = conn.get().map_err(|e| format!("DB error: {}", e))?; + + use crate::shared::models::schema::user_sessions; + + // Update session's context_data with last message + let current: UserSession = user_sessions::table + .find(session_id) + .first(&mut db_conn) + .map_err(|e| format!("Find error: {}", e))?; + + let mut ctx = current.context_data.clone(); + ctx["last_message"] = serde_json::json!(last_msg); + ctx["last_message_time"] = serde_json::json!(Utc::now().to_rfc3339()); + + diesel::update(user_sessions::table.filter(user_sessions::id.eq(session_id))) + .set(( + user_sessions::context_data.eq(&ctx), + user_sessions::updated_at.eq(diesel::dsl::now), + )) + .execute(&mut db_conn) + .map_err(|e| format!("Update error: {}", e))?; + + Ok::<(), String>(()) + }) + .await + .map_err(|e| format!("Task error: {}", e))??; + + Ok(()) +} + +/// Send request body +#[derive(Debug, Deserialize)] 
+pub struct SendMessageRequest { + pub to: String, + pub message: String, + #[serde(default)] + pub template: Option, +} + +/// Send a message to WhatsApp +pub async fn send_message( + State(state): State>, + Json(request): Json, +) -> impl IntoResponse { + info!("Sending WhatsApp message to {}", request.to); + + // Get default bot for adapter config + let bot_id = get_default_bot_id(&state).await; + let adapter = WhatsAppAdapter::new(state.conn.clone(), bot_id); + + let response = BotResponse { + bot_id: bot_id.to_string(), + session_id: Uuid::new_v4().to_string(), + user_id: request.to.clone(), + channel: "whatsapp".to_string(), + content: request.message.clone(), + message_type: crate::shared::models::message_types::MessageType::EXTERNAL, + stream_token: None, + is_complete: true, + suggestions: vec![], + context_name: None, + }; + + match adapter.send_message(response).await { + Ok(_) => ( + StatusCode::OK, + Json(serde_json::json!({ + "success": true, + "message": "Message sent" + })), + ), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ + "success": false, + "error": e.to_string() + })), + ), + } +} + +/// Attendant response request +#[derive(Debug, Deserialize)] +pub struct AttendantRespondRequest { + pub session_id: String, + pub message: String, + pub attendant_id: String, +} + +/// Handle attendant response - routes back to WhatsApp +pub async fn attendant_respond( + State(state): State>, + Json(request): Json, +) -> impl IntoResponse { + info!( + "Attendant {} responding to session {}", + request.attendant_id, request.session_id + ); + + let session_id = match Uuid::parse_str(&request.session_id) { + Ok(id) => id, + Err(_) => { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({ + "success": false, + "error": "Invalid session ID" + })), + ) + } + }; + + // Get session details + let conn = state.conn.clone(); + let session_result = tokio::task::spawn_blocking(move || { + let mut db_conn = conn.get().ok()?; + use 
crate::shared::models::schema::user_sessions; + user_sessions::table + .find(session_id) + .first::(&mut db_conn) + .ok() + }) + .await + .ok() + .flatten(); + + let session = match session_result { + Some(s) => s, + None => { + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({ + "success": false, + "error": "Session not found" + })), + ) + } + }; + + // Get channel from session + let channel = session + .context_data + .get("channel") + .and_then(|v| v.as_str()) + .unwrap_or("web"); + + // Get recipient + let recipient = session + .context_data + .get("phone") + .and_then(|v| v.as_str()) + .unwrap_or(""); + + if recipient.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({ + "success": false, + "error": "No recipient found for session" + })), + ); + } + + // Save attendant message to history + if let Err(e) = save_message_to_history(&state, &session, &request.message, "attendant").await { + error!("Failed to save attendant message: {}", e); + } + + // Send to appropriate channel + match channel { + "whatsapp" => { + let adapter = WhatsAppAdapter::new(state.conn.clone(), session.bot_id); + let response = BotResponse { + bot_id: session.bot_id.to_string(), + session_id: session.id.to_string(), + user_id: recipient.to_string(), + channel: "whatsapp".to_string(), + content: request.message.clone(), + message_type: crate::shared::models::message_types::MessageType::BOT_RESPONSE, + stream_token: None, + is_complete: true, + suggestions: vec![], + context_name: None, + }; + + match adapter.send_message(response).await { + Ok(_) => ( + StatusCode::OK, + Json(serde_json::json!({ + "success": true, + "message": "Response sent to WhatsApp" + })), + ), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ + "success": false, + "error": e.to_string() + })), + ), + } + } + _ => { + // For web and other channels, broadcast via WebSocket + if let Some(broadcast_tx) = state.attendant_broadcast.as_ref() { + let 
notification = AttendantNotification { + notification_type: "attendant_response".to_string(), + session_id: session.id.to_string(), + user_id: session.user_id.to_string(), + user_name: None, + user_phone: None, + channel: channel.to_string(), + content: request.message.clone(), + timestamp: Utc::now().to_rfc3339(), + assigned_to: Some(request.attendant_id.clone()), + priority: 0, + }; + + let _ = broadcast_tx.send(notification); + } + + ( + StatusCode::OK, + Json(serde_json::json!({ + "success": true, + "message": "Response sent" + })), + ) + } + } +} + +/// Get verify token from config +async fn get_verify_token(state: &Arc) -> String { + let bot_id = get_default_bot_id(state).await; + let adapter = WhatsAppAdapter::new(state.conn.clone(), bot_id); + + // The verify token is stored in the adapter's config + // For now return a default - in production this should come from config + std::env::var("WHATSAPP_VERIFY_TOKEN").unwrap_or_else(|_| "webhook_verify".to_string()) +} + +/// Get default bot ID +async fn get_default_bot_id(state: &Arc) -> Uuid { + let conn = state.conn.clone(); + + tokio::task::spawn_blocking(move || { + let mut db_conn = conn.get().ok()?; + use crate::shared::models::schema::bots; + bots::table + .filter(bots::is_active.eq(true)) + .select(bots::id) + .first::(&mut db_conn) + .ok() + }) + .await + .ok() + .flatten() + .unwrap_or_else(Uuid::nil) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_extract_text_message() { + let message = WhatsAppMessage { + id: "msg123".to_string(), + from: "+1234567890".to_string(), + timestamp: "1234567890".to_string(), + message_type: "text".to_string(), + text: Some(WhatsAppText { + body: "Hello, world!".to_string(), + }), + image: None, + audio: None, + video: None, + document: None, + location: None, + interactive: None, + button: None, + }; + + let content = extract_message_content(&message); + assert_eq!(content, "Hello, world!"); + } + + #[test] + fn test_extract_interactive_button() { + 
let message = WhatsAppMessage { + id: "msg123".to_string(), + from: "+1234567890".to_string(), + timestamp: "1234567890".to_string(), + message_type: "interactive".to_string(), + text: None, + image: None, + audio: None, + video: None, + document: None, + location: None, + interactive: Some(WhatsAppInteractive { + interactive_type: "button_reply".to_string(), + button_reply: Some(WhatsAppButtonReply { + id: "btn1".to_string(), + title: "Yes".to_string(), + }), + list_reply: None, + }), + button: None, + }; + + let content = extract_message_content(&message); + assert_eq!(content, "Yes"); + } +}