//! Queue Management API for Attendant System //! //! Handles conversation queues, attendant assignment, and real-time updates. //! Reads attendant data from attendant.csv in bot's .gbai folder. use crate::shared::models::UserSession; use crate::shared::state::AppState; use axum::{ extract::{Path, Query, State}, http::StatusCode, response::IntoResponse, Json, }; use chrono::Utc; use diesel::prelude::*; use log::{error, info, warn}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; use uuid::Uuid; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct QueueItem { pub session_id: Uuid, pub user_id: Uuid, pub bot_id: Uuid, pub channel: String, pub user_name: String, pub user_email: Option, pub last_message: String, pub last_message_time: String, pub waiting_time_seconds: i64, pub priority: i32, pub status: QueueStatus, pub assigned_to: Option, pub assigned_to_name: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum QueueStatus { Waiting, Assigned, Active, Resolved, Abandoned, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AttendantStats { pub attendant_id: String, pub attendant_name: String, pub channel: String, pub preferences: String, pub active_conversations: i32, pub total_handled_today: i32, pub avg_response_time_seconds: i32, pub status: AttendantStatus, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AttendantCSV { pub id: String, pub name: String, pub channel: String, pub preferences: String, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum AttendantStatus { Online, Busy, Away, Offline, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AssignRequest { pub session_id: Uuid, pub attendant_id: Uuid, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TransferRequest { pub session_id: Uuid, pub from_attendant_id: Uuid, pub to_attendant_id: Uuid, pub reason: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct QueueFilters { pub channel: Option, pub status: Option, pub assigned_to: Option, } /// Check if bot has transfer enabled in config.csv async fn is_transfer_enabled(bot_id: Uuid, work_path: &str) -> bool { let config_path = PathBuf::from(work_path) .join(format!("{}.gbai", bot_id)) .join("config.csv"); if !config_path.exists() { warn!("Config file not found: {:?}", config_path); return false; } match std::fs::read_to_string(&config_path) { Ok(content) => { for line in content.lines() { if line.to_lowercase().contains("transfer") && line.to_lowercase().contains("true") { return true; } } false } Err(e) => { error!("Failed to read config file: {}", e); false } } } /// Read attendants from attendant.csv async fn read_attendants_csv(bot_id: Uuid, work_path: &str) -> Vec { let attendant_path = PathBuf::from(work_path) .join(format!("{}.gbai", bot_id)) .join("attendant.csv"); if !attendant_path.exists() { warn!("Attendant file not found: {:?}", attendant_path); return Vec::new(); } match std::fs::read_to_string(&attendant_path) { Ok(content) => { let mut attendants = Vec::new(); let mut lines = content.lines(); // Skip header lines.next(); for line in lines { let parts: Vec<&str> = line.split(',').map(|s| s.trim()).collect(); if parts.len() >= 4 { attendants.push(AttendantCSV { id: parts[0].to_string(), name: parts[1].to_string(), channel: parts[2].to_string(), preferences: parts[3].to_string(), }); } } attendants } Err(e) => { error!("Failed to read attendant file: {}", e); Vec::new() } } } /// GET /api/queue/list /// Get all conversations in queue (only if bot has transfer=true) pub async fn list_queue( State(state): State>, Query(filters): Query, ) -> impl IntoResponse { info!("Listing queue items with filters: {:?}", filters); let result = tokio::task::spawn_blocking({ let conn = state.conn.clone(); move || { let mut db_conn = conn .get() .map_err(|e| format!("Failed to get database connection: {}", e))?; use crate::shared::models::schema::user_sessions; use crate::shared::models::schema::users; // Build query - get recent sessions with user info let sessions_data: Vec = user_sessions::table .order(user_sessions::created_at.desc()) .limit(50) .load(&mut db_conn) .map_err(|e| format!("Failed to load sessions: {}", e))?; let mut queue_items = Vec::new(); for session_data in sessions_data { // Get user info separately let user_info: Option<(String, String)> = users::table .filter(users::id.eq(session_data.user_id)) .select((users::username, users::email)) .first(&mut db_conn) .optional() .map_err(|e| format!("Failed to load user: {}", e))?; let (uname, uemail) = user_info.unwrap_or_else(|| { ( format!("user_{}", session_data.user_id), format!("{}@unknown.local", session_data.user_id), ) }); let channel = session_data .context_data .get("channel") .and_then(|c| c.as_str()) .unwrap_or("web") .to_string(); let waiting_time = (Utc::now() - session_data.updated_at).num_seconds(); queue_items.push(QueueItem { session_id: session_data.id, user_id: session_data.user_id, bot_id: session_data.bot_id, channel, user_name: uname, user_email: Some(uemail), last_message: session_data.title.clone(), last_message_time: session_data.updated_at.to_rfc3339(), waiting_time_seconds: waiting_time, priority: if waiting_time > 300 { 2 } else { 1 }, status: QueueStatus::Waiting, assigned_to: None, assigned_to_name: None, }); } Ok::, String>(queue_items) } }) .await; match result { Ok(Ok(queue_items)) => { info!("Found {} queue items", queue_items.len()); (StatusCode::OK, Json(queue_items)) } Ok(Err(e)) => { error!("Queue list error: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(vec![] as Vec), ) } Err(e) => { error!("Task error: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(vec![] as Vec), ) } } } /// GET /api/queue/attendants?bot_id={bot_id} /// Get all attendants from attendant.csv for a bot pub async fn list_attendants( State(state): State>, Query(params): Query>, ) -> impl IntoResponse { info!("Listing attendants"); let bot_id_str = params.get("bot_id").cloned().unwrap_or_default(); let bot_id = match Uuid::parse_str(&bot_id_str) { Ok(id) => id, Err(_) => { // Get default bot let conn = state.conn.clone(); let result = tokio::task::spawn_blocking(move || { let mut db_conn = conn.get().ok()?; use crate::shared::models::schema::bots; bots::table .filter(bots::is_active.eq(true)) .select(bots::id) .first::(&mut db_conn) .ok() }) .await; match result { Ok(Some(id)) => id, _ => { error!("No valid bot_id provided and no default bot found"); return (StatusCode::BAD_REQUEST, Json(vec![] as Vec)); } } } }; // Check if transfer is enabled let work_path = "./work"; if !is_transfer_enabled(bot_id, work_path).await { warn!("Transfer not enabled for bot {}", bot_id); return (StatusCode::OK, Json(vec![] as Vec)); } // Read attendants from CSV let attendant_csvs = read_attendants_csv(bot_id, work_path).await; let attendants: Vec = attendant_csvs .into_iter() .map(|att| AttendantStats { attendant_id: att.id, attendant_name: att.name, channel: att.channel, preferences: att.preferences, active_conversations: 0, total_handled_today: 0, avg_response_time_seconds: 0, status: AttendantStatus::Online, }) .collect(); info!("Found {} attendants from CSV", attendants.len()); (StatusCode::OK, Json(attendants)) } /// POST /api/queue/assign /// Assign conversation to attendant (stores in session context_data) pub async fn assign_conversation( State(state): State>, Json(request): Json, ) -> impl IntoResponse { info!( "Assigning session {} to attendant {}", request.session_id, request.attendant_id ); // Store assignment in session context_data let result = tokio::task::spawn_blocking({ let conn = state.conn.clone(); let session_id = request.session_id; let attendant_id = request.attendant_id; move || { let mut db_conn = conn .get() .map_err(|e| format!("Failed to get database connection: {}", e))?; use crate::shared::models::schema::user_sessions; // Get current session let session: UserSession = user_sessions::table .filter(user_sessions::id.eq(session_id)) .first(&mut db_conn) .map_err(|e| format!("Session not found: {}", e))?; // Update context_data with assignment let mut ctx = session.context_data.clone(); ctx["assigned_to"] = serde_json::json!(attendant_id.to_string()); ctx["assigned_at"] = serde_json::json!(Utc::now().to_rfc3339()); ctx["status"] = serde_json::json!("assigned"); diesel::update(user_sessions::table.filter(user_sessions::id.eq(session_id))) .set(user_sessions::context_data.eq(&ctx)) .execute(&mut db_conn) .map_err(|e| format!("Failed to update session: {}", e))?; Ok::<(), String>(()) } }) .await; match result { Ok(Ok(())) => ( StatusCode::OK, Json(serde_json::json!({ "success": true, "session_id": request.session_id, "attendant_id": request.attendant_id, "assigned_at": Utc::now().to_rfc3339() })), ), Ok(Err(e)) => { error!("Assignment error: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "success": false, "error": e })), ) } Err(e) => { error!("Assignment error: {:?}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "success": false, "error": format!("{:?}", e) })), ) } } } /// POST /api/queue/transfer /// Transfer conversation between attendants pub async fn transfer_conversation( State(state): State>, Json(request): Json, ) -> impl IntoResponse { info!( "Transferring session {} from {} to {}", request.session_id, request.from_attendant_id, request.to_attendant_id ); let result = tokio::task::spawn_blocking({ let conn = state.conn.clone(); let session_id = request.session_id; let to_attendant = request.to_attendant_id; let reason = request.reason.clone(); move || { let mut db_conn = conn .get() .map_err(|e| format!("Failed to get database connection: {}", e))?; use crate::shared::models::schema::user_sessions; // Get current session let session: UserSession = user_sessions::table .filter(user_sessions::id.eq(session_id)) .first(&mut db_conn) .map_err(|e| format!("Session not found: {}", e))?; // Update context_data with transfer info let mut ctx = session.context_data.clone(); ctx["assigned_to"] = serde_json::json!(to_attendant.to_string()); ctx["transferred_at"] = serde_json::json!(Utc::now().to_rfc3339()); ctx["transfer_reason"] = serde_json::json!(reason.unwrap_or_default()); ctx["status"] = serde_json::json!("transferred"); diesel::update(user_sessions::table.filter(user_sessions::id.eq(session_id))) .set(( user_sessions::context_data.eq(&ctx), user_sessions::updated_at.eq(Utc::now()), )) .execute(&mut db_conn) .map_err(|e| format!("Failed to update session: {}", e))?; Ok::<(), String>(()) } }) .await; match result { Ok(Ok(())) => ( StatusCode::OK, Json(serde_json::json!({ "success": true, "session_id": request.session_id, "from_attendant": request.from_attendant_id, "to_attendant": request.to_attendant_id, "transferred_at": Utc::now().to_rfc3339() })), ), Ok(Err(e)) => { error!("Transfer error: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "success": false, "error": e })), ) } Err(e) => { error!("Transfer error: {:?}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "success": false, "error": format!("{:?}", e) })), ) } } } /// POST /api/queue/resolve /// Mark conversation as resolved pub async fn resolve_conversation( State(state): State>, Json(payload): Json, ) -> impl IntoResponse { let session_id = payload .get("session_id") .and_then(|v| v.as_str()) .and_then(|s| Uuid::parse_str(s).ok()) .unwrap_or_else(Uuid::nil); info!("Resolving session {}", session_id); let result = tokio::task::spawn_blocking({ let conn = state.conn.clone(); move || { let mut db_conn = conn .get() .map_err(|e| format!("Failed to get database connection: {}", e))?; use crate::shared::models::schema::user_sessions; // Get current session let session: UserSession = user_sessions::table .filter(user_sessions::id.eq(session_id)) .first(&mut db_conn) .map_err(|e| format!("Session not found: {}", e))?; // Update context_data to mark as resolved let mut ctx = session.context_data.clone(); ctx["status"] = serde_json::json!("resolved"); ctx["resolved_at"] = serde_json::json!(Utc::now().to_rfc3339()); ctx["resolved"] = serde_json::json!(true); diesel::update(user_sessions::table.filter(user_sessions::id.eq(session_id))) .set(( user_sessions::context_data.eq(&ctx), user_sessions::updated_at.eq(Utc::now()), )) .execute(&mut db_conn) .map_err(|e| format!("Failed to update session: {}", e))?; Ok::<(), String>(()) } }) .await; match result { Ok(Ok(())) => ( StatusCode::OK, Json(serde_json::json!({ "success": true, "session_id": session_id, "resolved_at": Utc::now().to_rfc3339() })), ), Ok(Err(e)) => { error!("Resolve error: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "success": false, "error": e })), ) } Err(e) => { error!("Resolve error: {:?}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "success": false, "error": format!("{:?}", e) })), ) } } } /// GET /api/queue/insights/{session_id} /// Get bot insights for a conversation pub async fn get_insights( State(state): State>, Path(session_id): Path, ) -> impl IntoResponse { info!("Getting insights for session {}", session_id); let result = tokio::task::spawn_blocking({ let conn = state.conn.clone(); move || { let mut db_conn = conn .get() .map_err(|e| format!("Failed to get database connection: {}", e))?; use crate::shared::models::schema::message_history; // Get recent messages let messages: Vec<(String, i32)> = message_history::table .filter(message_history::session_id.eq(session_id)) .select((message_history::content_encrypted, message_history::role)) .order(message_history::created_at.desc()) .limit(10) .load(&mut db_conn) .map_err(|e| format!("Failed to load messages: {}", e))?; // Analyze sentiment and intent (simplified) let user_messages: Vec = messages .iter() .filter(|(_, r)| *r == 0) // User messages .map(|(c, _)| c.clone()) .collect(); let sentiment = if user_messages.iter().any(|m| { m.to_lowercase().contains("urgent") || m.to_lowercase().contains("problem") || m.to_lowercase().contains("issue") }) { "negative" } else if user_messages .iter() .any(|m| m.to_lowercase().contains("thanks") || m.to_lowercase().contains("great")) { "positive" } else { "neutral" }; let suggested_reply = if sentiment == "negative" { "I understand this is frustrating. Let me help you resolve this immediately." } else { "How can I assist you further?" }; Ok::(serde_json::json!({ "session_id": session_id, "sentiment": sentiment, "message_count": messages.len(), "suggested_reply": suggested_reply, "key_topics": ["support", "technical"], "priority": if sentiment == "negative" { "high" } else { "normal" }, "language": "en" })) } }) .await; match result { Ok(Ok(insights)) => (StatusCode::OK, Json(insights)), Ok(Err(e)) => { error!("Insights error: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": e })), ) } Err(e) => { error!("Task error: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ "error": format!("Task error: {}", e) })), ) } } }