feat(attendance): Add LLM-assisted attendant features

- Real-time tips when customer messages arrive
- Message polishing with one click
- Smart reply generation (3 contextual suggestions)
- Auto-summary when attendant takes conversation
- LLM-powered sentiment analysis with escalation warnings

WhatsApp Attendant Commands:
- /queue, /take, /status, /transfer, /resolve
- /tips, /polish, /replies, /summary, /help
- Portuguese versions: /fila, /pegar, /dicas, /polir, /respostas, /resumo

Config options (config.csv):
- attendant-llm-tips
- attendant-polish-message
- attendant-smart-replies
- attendant-auto-summary
- attendant-sentiment-analysis

API Endpoints:
- POST /api/attendance/llm/tips
- POST /api/attendance/llm/polish
- POST /api/attendance/llm/smart-replies
- GET /api/attendance/llm/summary/{session_id}
- POST /api/attendance/llm/sentiment
- GET /api/attendance/llm/config/{bot_id}

Uses bot's system prompt for consistency between bot and human-assisted responses.
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-12-05 13:47:15 -03:00
parent 415c7cce77
commit bde3244ce9
9 changed files with 4725 additions and 7 deletions

2090
src/attendance/llm_assist.rs Normal file

File diff suppressed because it is too large Load diff

View file

@ -7,6 +7,19 @@
//! - **Queue System**: Human handoff for conversations that need human attention
//! - **Keyword Services**: Check-in/out, break/resume tracking via keywords
//! - **Drive Integration**: S3 storage for attendance records
//! - **WebSocket**: Real-time notifications for attendants
//! - **LLM Assist**: AI-powered tips, polishing, smart replies, summaries, and sentiment analysis
//!
//! ## LLM Assist Features (config.csv)
//!
//! ```csv
//! name,value
//! attendant-llm-tips,true
//! attendant-polish-message,true
//! attendant-smart-replies,true
//! attendant-auto-summary,true
//! attendant-sentiment-analysis,true
//! ```
//!
//! ## Usage
//!
@ -18,6 +31,7 @@
pub mod drive;
pub mod keyword_services;
pub mod llm_assist;
pub mod queue;
// Re-export main types for convenience
@ -26,17 +40,40 @@ pub use keyword_services::{
AttendanceCommand, AttendanceRecord, AttendanceResponse, AttendanceService, KeywordConfig,
KeywordParser, ParsedCommand,
};
pub use llm_assist::{
AttendantTip, ConversationMessage, ConversationSummary, LlmAssistConfig, PolishRequest,
PolishResponse, SentimentAnalysis, SentimentResponse, SmartRepliesRequest,
SmartRepliesResponse, SmartReply, SummaryRequest, SummaryResponse, TipRequest, TipResponse,
TipType,
};
pub use queue::{
AssignRequest, AttendantStats, AttendantStatus, QueueFilters, QueueItem, QueueStatus,
TransferRequest,
};
use crate::shared::state::AppState;
use crate::core::bot::channels::whatsapp::WhatsAppAdapter;
use crate::core::bot::channels::ChannelAdapter;
use crate::shared::models::{BotResponse, UserSession};
use crate::shared::state::{AppState, AttendantNotification};
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
Path, Query, State,
},
http::StatusCode,
response::IntoResponse,
routing::{get, post},
Router,
Json, Router,
};
use chrono::Utc;
use diesel::prelude::*;
use futures::{SinkExt, StreamExt};
use log::{debug, error, info, warn};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::broadcast;
use uuid::Uuid;
/// Configure attendance routes
pub fn configure_attendance_routes() -> Router<Arc<AppState>> {
@ -50,10 +87,570 @@ pub fn configure_attendance_routes() -> Router<Arc<AppState>> {
post(queue::transfer_conversation),
)
.route(
"/api/attendance/resolve/:session_id",
"/api/attendance/resolve/{session_id}",
post(queue::resolve_conversation),
)
.route("/api/attendance/insights", get(queue::get_insights))
// Attendant response endpoint
.route("/api/attendance/respond", post(attendant_respond))
// WebSocket for real-time notifications
.route("/ws/attendant", get(attendant_websocket_handler))
// LLM Assist endpoints - AI-powered attendant assistance
.route("/api/attendance/llm/tips", post(llm_assist::generate_tips))
.route(
"/api/attendance/llm/polish",
post(llm_assist::polish_message),
)
.route(
"/api/attendance/llm/smart-replies",
post(llm_assist::generate_smart_replies),
)
.route(
"/api/attendance/llm/summary/{session_id}",
get(llm_assist::generate_summary),
)
.route(
"/api/attendance/llm/sentiment",
post(llm_assist::analyze_sentiment),
)
.route(
"/api/attendance/llm/config/{bot_id}",
get(llm_assist::get_llm_config),
)
}
/// Attendant response request
#[derive(Debug, Deserialize)]
pub struct AttendantRespondRequest {
pub session_id: String,
pub message: String,
pub attendant_id: String,
}
/// Attendant response result
#[derive(Debug, Serialize)]
pub struct AttendantRespondResponse {
pub success: bool,
pub message: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
/// Handle attendant response - routes back to the customer's channel
pub async fn attendant_respond(
State(state): State<Arc<AppState>>,
Json(request): Json<AttendantRespondRequest>,
) -> impl IntoResponse {
info!(
"Attendant {} responding to session {}",
request.attendant_id, request.session_id
);
let session_id = match Uuid::parse_str(&request.session_id) {
Ok(id) => id,
Err(_) => {
return (
StatusCode::BAD_REQUEST,
Json(AttendantRespondResponse {
success: false,
message: "Invalid session ID".to_string(),
error: Some("Could not parse session ID as UUID".to_string()),
}),
)
}
};
// Get session details
let conn = state.conn.clone();
let session_result = tokio::task::spawn_blocking(move || {
let mut db_conn = conn.get().ok()?;
use crate::shared::models::schema::user_sessions;
user_sessions::table
.find(session_id)
.first::<UserSession>(&mut db_conn)
.ok()
})
.await
.ok()
.flatten();
let session = match session_result {
Some(s) => s,
None => {
return (
StatusCode::NOT_FOUND,
Json(AttendantRespondResponse {
success: false,
message: "Session not found".to_string(),
error: Some("No session with that ID exists".to_string()),
}),
)
}
};
// Get channel from session context
let channel = session
.context_data
.get("channel")
.and_then(|v| v.as_str())
.unwrap_or("web");
// Get recipient (phone number for WhatsApp, user_id for web)
let recipient = session
.context_data
.get("phone")
.and_then(|v| v.as_str())
.unwrap_or("");
// Save attendant message to history
if let Err(e) = save_message_to_history(&state, &session, &request.message, "attendant").await {
error!("Failed to save attendant message: {}", e);
}
// Send to appropriate channel
match channel {
"whatsapp" => {
if recipient.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(AttendantRespondResponse {
success: false,
message: "No phone number found".to_string(),
error: Some("Session has no phone number in context".to_string()),
}),
);
}
let adapter = WhatsAppAdapter::new(state.conn.clone(), session.bot_id);
let response = BotResponse {
bot_id: session.bot_id.to_string(),
session_id: session.id.to_string(),
user_id: recipient.to_string(),
channel: "whatsapp".to_string(),
content: request.message.clone(),
message_type: crate::shared::models::message_types::MessageType::BOT_RESPONSE,
stream_token: None,
is_complete: true,
suggestions: vec![],
context_name: None,
};
match adapter.send_message(response).await {
Ok(_) => {
// Notify other attendants about the response
broadcast_attendant_action(&state, &session, &request, "attendant_response")
.await;
(
StatusCode::OK,
Json(AttendantRespondResponse {
success: true,
message: "Response sent to WhatsApp".to_string(),
error: None,
}),
)
}
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(AttendantRespondResponse {
success: false,
message: "Failed to send WhatsApp message".to_string(),
error: Some(e.to_string()),
}),
),
}
}
"web" | _ => {
// For web and other channels, send via WebSocket if connected
let sent = if let Some(tx) = state
.response_channels
.lock()
.await
.get(&session.id.to_string())
{
let response = BotResponse {
bot_id: session.bot_id.to_string(),
session_id: session.id.to_string(),
user_id: session.user_id.to_string(),
channel: channel.to_string(),
content: request.message.clone(),
message_type: crate::shared::models::message_types::MessageType::BOT_RESPONSE,
stream_token: None,
is_complete: true,
suggestions: vec![],
context_name: None,
};
tx.send(response).await.is_ok()
} else {
false
};
// Notify other attendants
broadcast_attendant_action(&state, &session, &request, "attendant_response").await;
if sent {
(
StatusCode::OK,
Json(AttendantRespondResponse {
success: true,
message: "Response sent via WebSocket".to_string(),
error: None,
}),
)
} else {
// Message saved but couldn't be delivered in real-time
(
StatusCode::OK,
Json(AttendantRespondResponse {
success: true,
message: "Response saved (customer not connected)".to_string(),
error: None,
}),
)
}
}
}
}
/// Save message to conversation history
async fn save_message_to_history(
state: &Arc<AppState>,
session: &UserSession,
content: &str,
sender: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let conn = state.conn.clone();
let session_id = session.id;
let content_clone = content.to_string();
let sender_clone = sender.to_string();
tokio::task::spawn_blocking(move || {
let mut db_conn = conn.get().map_err(|e| format!("DB error: {}", e))?;
use crate::shared::models::schema::message_history;
diesel::insert_into(message_history::table)
.values((
message_history::id.eq(Uuid::new_v4()),
message_history::session_id.eq(session_id),
message_history::role.eq(sender_clone),
message_history::content.eq(content_clone),
message_history::created_at.eq(diesel::dsl::now),
))
.execute(&mut db_conn)
.map_err(|e| format!("Insert error: {}", e))?;
Ok::<(), String>(())
})
.await
.map_err(|e| format!("Task error: {}", e))??;
Ok(())
}
/// Broadcast attendant action to other connected attendants
async fn broadcast_attendant_action(
state: &Arc<AppState>,
session: &UserSession,
request: &AttendantRespondRequest,
action_type: &str,
) {
if let Some(broadcast_tx) = state.attendant_broadcast.as_ref() {
let notification = AttendantNotification {
notification_type: action_type.to_string(),
session_id: session.id.to_string(),
user_id: session.user_id.to_string(),
user_name: session
.context_data
.get("name")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
user_phone: session
.context_data
.get("phone")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
channel: session
.context_data
.get("channel")
.and_then(|v| v.as_str())
.unwrap_or("web")
.to_string(),
content: request.message.clone(),
timestamp: Utc::now().to_rfc3339(),
assigned_to: Some(request.attendant_id.clone()),
priority: 0,
};
if let Err(e) = broadcast_tx.send(notification) {
debug!("No attendants listening for broadcast: {}", e);
}
}
}
/// WebSocket handler for attendant real-time notifications
pub async fn attendant_websocket_handler(
ws: WebSocketUpgrade,
State(state): State<Arc<AppState>>,
Query(params): Query<HashMap<String, String>>,
) -> impl IntoResponse {
let attendant_id = params.get("attendant_id").cloned();
if attendant_id.is_none() {
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({ "error": "attendant_id is required" })),
)
.into_response();
}
let attendant_id = attendant_id.unwrap();
info!(
"Attendant WebSocket connection request from: {}",
attendant_id
);
ws.on_upgrade(move |socket| handle_attendant_websocket(socket, state, attendant_id))
.into_response()
}
/// Handle attendant WebSocket connection
async fn handle_attendant_websocket(socket: WebSocket, state: Arc<AppState>, attendant_id: String) {
let (mut sender, mut receiver) = socket.split();
info!("Attendant WebSocket connected: {}", attendant_id);
// Send welcome message
let welcome = serde_json::json!({
"type": "connected",
"attendant_id": attendant_id,
"message": "Connected to attendant notification service",
"timestamp": Utc::now().to_rfc3339()
});
if let Ok(welcome_str) = serde_json::to_string(&welcome) {
if sender
.send(Message::Text(welcome_str.into()))
.await
.is_err()
{
error!("Failed to send welcome message to attendant");
return;
}
}
// Subscribe to broadcast channel
let mut broadcast_rx = if let Some(broadcast_tx) = state.attendant_broadcast.as_ref() {
broadcast_tx.subscribe()
} else {
warn!("No broadcast channel available for attendants");
return;
};
// Task to forward broadcast messages to WebSocket
let attendant_id_clone = attendant_id.clone();
let mut send_task = tokio::spawn(async move {
loop {
match broadcast_rx.recv().await {
Ok(notification) => {
// Check if this notification is relevant to this attendant
// Send all notifications for now (can filter by assigned_to later)
let should_send = notification.assigned_to.is_none()
|| notification.assigned_to.as_ref() == Some(&attendant_id_clone);
if should_send {
if let Ok(json_str) = serde_json::to_string(&notification) {
debug!(
"Sending notification to attendant {}: {}",
attendant_id_clone, notification.notification_type
);
if sender.send(Message::Text(json_str.into())).await.is_err() {
error!("Failed to send notification to attendant WebSocket");
break;
}
}
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!(
"Attendant {} lagged behind by {} messages",
attendant_id_clone, n
);
}
Err(broadcast::error::RecvError::Closed) => {
info!("Broadcast channel closed");
break;
}
}
}
});
// Task to handle incoming messages from attendant (e.g., status updates, typing indicators)
let state_clone = state.clone();
let attendant_id_for_recv = attendant_id.clone();
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(msg)) = receiver.next().await {
match msg {
Message::Text(text) => {
debug!(
"Received message from attendant {}: {}",
attendant_id_for_recv, text
);
// Parse and handle attendant messages
if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&text) {
handle_attendant_message(&state_clone, &attendant_id_for_recv, parsed)
.await;
}
}
Message::Ping(data) => {
debug!("Received ping from attendant {}", attendant_id_for_recv);
// Pong is automatically sent by axum
}
Message::Close(_) => {
info!(
"Attendant {} WebSocket close requested",
attendant_id_for_recv
);
break;
}
_ => {}
}
}
});
// Wait for either task to complete
tokio::select! {
_ = (&mut send_task) => {
recv_task.abort();
}
_ = (&mut recv_task) => {
send_task.abort();
}
}
info!("Attendant WebSocket disconnected: {}", attendant_id);
}
/// Handle incoming messages from attendant WebSocket
async fn handle_attendant_message(
state: &Arc<AppState>,
attendant_id: &str,
message: serde_json::Value,
) {
let msg_type = message
.get("type")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
match msg_type {
"status_update" => {
// Update attendant status (online, busy, away, offline)
if let Some(status) = message.get("status").and_then(|v| v.as_str()) {
info!("Attendant {} status update: {}", attendant_id, status);
// Could update in database or broadcast to other attendants
}
}
"typing" => {
// Broadcast typing indicator to customer
if let Some(session_id) = message.get("session_id").and_then(|v| v.as_str()) {
debug!(
"Attendant {} typing in session {}",
attendant_id, session_id
);
// Could broadcast to customer's WebSocket
}
}
"read" => {
// Mark messages as read
if let Some(session_id) = message.get("session_id").and_then(|v| v.as_str()) {
debug!(
"Attendant {} marked session {} as read",
attendant_id, session_id
);
}
}
"respond" => {
// Handle response message (alternative to REST API)
if let (Some(session_id), Some(content)) = (
message.get("session_id").and_then(|v| v.as_str()),
message.get("content").and_then(|v| v.as_str()),
) {
info!(
"Attendant {} responding to {} via WebSocket",
attendant_id, session_id
);
// Process response similar to REST endpoint
let request = AttendantRespondRequest {
session_id: session_id.to_string(),
message: content.to_string(),
attendant_id: attendant_id.to_string(),
};
// Get session and send response
if let Ok(uuid) = Uuid::parse_str(session_id) {
let conn = state.conn.clone();
if let Some(session) = tokio::task::spawn_blocking(move || {
let mut db_conn = conn.get().ok()?;
use crate::shared::models::schema::user_sessions;
user_sessions::table
.find(uuid)
.first::<UserSession>(&mut db_conn)
.ok()
})
.await
.ok()
.flatten()
{
// Save to history
let _ =
save_message_to_history(state, &session, content, "attendant").await;
// Send to channel
let channel = session
.context_data
.get("channel")
.and_then(|v| v.as_str())
.unwrap_or("web");
if channel == "whatsapp" {
if let Some(phone) =
session.context_data.get("phone").and_then(|v| v.as_str())
{
let adapter =
WhatsAppAdapter::new(state.conn.clone(), session.bot_id);
let response = BotResponse {
bot_id: session.bot_id.to_string(),
session_id: session.id.to_string(),
user_id: phone.to_string(),
channel: "whatsapp".to_string(),
content: content.to_string(),
message_type:
crate::shared::models::message_types::MessageType::BOT_RESPONSE,
stream_token: None,
is_complete: true,
suggestions: vec![],
context_name: None,
};
let _ = adapter.send_message(response).await;
}
}
// Broadcast to other attendants
broadcast_attendant_action(state, &session, &request, "attendant_response")
.await;
}
}
}
}
_ => {
debug!(
"Unknown message type from attendant {}: {}",
attendant_id, msg_type
);
}
}
}
#[cfg(test)]
@ -66,4 +663,17 @@ mod tests {
let _config = KeywordConfig::default();
let _parser = KeywordParser::new();
}
#[test]
fn test_respond_request_parse() {
let json = r#"{
"session_id": "123e4567-e89b-12d3-a456-426614174000",
"message": "Hello, how can I help?",
"attendant_id": "att-001"
}"#;
let request: AttendantRespondRequest = serde_json::from_str(json).unwrap();
assert_eq!(request.attendant_id, "att-001");
assert_eq!(request.message, "Hello, how can I help?");
}
}

View file

@ -97,22 +97,42 @@ pub struct QueueFilters {
pub assigned_to: Option<Uuid>,
}
/// Check if bot has transfer enabled in config.csv
/// Check if CRM/transfer is enabled in config.csv
/// Supports both `crm-enabled = true` and legacy `transfer = true`
async fn is_transfer_enabled(bot_id: Uuid, work_path: &str) -> bool {
let config_path = PathBuf::from(work_path)
.join(format!("{}.gbai", bot_id))
.join("config.csv");
if !config_path.exists() {
// Try alternate path without UUID prefix
let alt_path = PathBuf::from(work_path).join("config.csv");
if alt_path.exists() {
return check_config_for_crm_enabled(&alt_path);
}
warn!("Config file not found: {:?}", config_path);
return false;
}
match std::fs::read_to_string(&config_path) {
check_config_for_crm_enabled(&config_path)
}
/// Helper to check config file for CRM/transfer settings
fn check_config_for_crm_enabled(config_path: &PathBuf) -> bool {
match std::fs::read_to_string(config_path) {
Ok(content) => {
for line in content.lines() {
if line.to_lowercase().contains("transfer") && line.to_lowercase().contains("true")
let line_lower = line.to_lowercase();
// Check for crm-enabled = true or crm_enabled = true (primary)
if (line_lower.contains("crm-enabled") || line_lower.contains("crm_enabled"))
&& line_lower.contains("true")
{
info!("CRM enabled via crm-enabled setting");
return true;
}
// Also support legacy transfer = true for backward compatibility
if line_lower.contains("transfer") && line_lower.contains("true") {
info!("CRM enabled via legacy transfer setting");
return true;
}
}

View file

@ -59,6 +59,7 @@ pub mod social_media;
pub mod string_functions;
pub mod switch_case;
pub mod table_definition;
pub mod transfer_to_human;
pub mod universal_messaging;
pub mod use_kb;
pub mod use_tool;

View file

@ -0,0 +1,835 @@
//! Transfer to Human Keyword
//!
//! Provides the TRANSFER TO HUMAN keyword for bot-to-human handoff in conversations.
//! This is a critical feature for hybrid bot/human support workflows.
//!
//! ## Features
//!
//! - Transfer to any available attendant
//! - Transfer to specific person by name or alias
//! - Transfer to specific department
//! - Priority-based queue placement
//! - Context passing for seamless handoff
//!
//! ## Configuration
//!
//! Requires `crm-enabled = true` in the bot's config.csv file.
//! Attendants are configured in attendant.csv in the bot's .gbai folder.
//!
//! ## Usage in BASIC
//!
//! ```basic
//! ' Transfer to any available human
//! TRANSFER TO HUMAN
//!
//! ' Transfer to specific person
//! TRANSFER TO HUMAN "John Smith"
//!
//! ' Transfer to department
//! TRANSFER TO HUMAN department: "sales"
//!
//! ' Transfer with priority and context
//! TRANSFER TO HUMAN "support", "high", "Customer needs help with billing"
//! ```
//!
//! ## As LLM Tool
//!
//! This keyword is also registered as an LLM tool, allowing the AI to
//! automatically transfer conversations when appropriate.
use crate::shared::models::UserSession;
use crate::shared::state::AppState;
use chrono::Utc;
use diesel::prelude::*;
use log::{debug, error, info, warn};
use rhai::{Dynamic, Engine, Map};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::Arc;
use uuid::Uuid;
/// Transfer request structure
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransferToHumanRequest {
/// Optional name or alias of the person to transfer to
pub name: Option<String>,
/// Optional department to transfer to
pub department: Option<String>,
/// Priority level: "normal", "high", "urgent"
pub priority: Option<String>,
/// Reason for the transfer (passed to attendant)
pub reason: Option<String>,
/// Additional context from the conversation
pub context: Option<String>,
}
/// Transfer result
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransferResult {
pub success: bool,
pub status: TransferStatus,
pub queue_position: Option<i32>,
pub assigned_to: Option<String>,
pub assigned_to_name: Option<String>,
pub estimated_wait_seconds: Option<i32>,
pub message: String,
}
/// Transfer status
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TransferStatus {
/// Queued for next available attendant
Queued,
/// Assigned to specific attendant
Assigned,
/// Attendant is online and ready
Connected,
/// No attendants available
NoAttendants,
/// CRM not enabled
CrmDisabled,
/// Specified attendant not found
AttendantNotFound,
/// Error during transfer
Error,
}
/// Attendant information from CSV
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Attendant {
pub id: String,
pub name: String,
pub channel: String,
pub preferences: String,
pub department: Option<String>,
pub aliases: Vec<String>,
pub status: AttendantStatus,
}
/// Attendant status
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum AttendantStatus {
Online,
Busy,
Away,
Offline,
}
impl Default for AttendantStatus {
fn default() -> Self {
AttendantStatus::Offline
}
}
/// Check if CRM is enabled in bot's config.csv
pub fn is_crm_enabled(bot_id: Uuid, work_path: &str) -> bool {
let config_path = PathBuf::from(work_path)
.join(format!("{}.gbai", bot_id))
.join("config.csv");
if !config_path.exists() {
// Also try without UUID prefix
let alt_path = PathBuf::from(work_path).join("config.csv");
if alt_path.exists() {
return check_config_for_crm(&alt_path);
}
warn!("Config file not found: {:?}", config_path);
return false;
}
check_config_for_crm(&config_path)
}
fn check_config_for_crm(config_path: &PathBuf) -> bool {
match std::fs::read_to_string(config_path) {
Ok(content) => {
for line in content.lines() {
let line_lower = line.to_lowercase();
// Check for crm-enabled = true or crm_enabled = true
if (line_lower.contains("crm-enabled") || line_lower.contains("crm_enabled"))
&& line_lower.contains("true")
{
return true;
}
// Also support legacy transfer = true
if line_lower.contains("transfer") && line_lower.contains("true") {
return true;
}
}
false
}
Err(e) => {
error!("Failed to read config file: {}", e);
false
}
}
}
/// Read attendants from attendant.csv
pub fn read_attendants(bot_id: Uuid, work_path: &str) -> Vec<Attendant> {
let attendant_path = PathBuf::from(work_path)
.join(format!("{}.gbai", bot_id))
.join("attendant.csv");
if !attendant_path.exists() {
// Try alternate path
let alt_path = PathBuf::from(work_path).join("attendant.csv");
if alt_path.exists() {
return parse_attendants_csv(&alt_path);
}
warn!("Attendant file not found: {:?}", attendant_path);
return Vec::new();
}
parse_attendants_csv(&attendant_path)
}
fn parse_attendants_csv(path: &PathBuf) -> Vec<Attendant> {
match std::fs::read_to_string(path) {
Ok(content) => {
let mut attendants = Vec::new();
let mut lines = content.lines();
// Skip header
let header = lines.next().unwrap_or("");
let headers: Vec<String> = header.split(',').map(|s| s.trim().to_lowercase()).collect();
// Find column indices
let id_idx = headers.iter().position(|h| h == "id").unwrap_or(0);
let name_idx = headers.iter().position(|h| h == "name").unwrap_or(1);
let channel_idx = headers.iter().position(|h| h == "channel").unwrap_or(2);
let pref_idx = headers.iter().position(|h| h == "preferences").unwrap_or(3);
let dept_idx = headers.iter().position(|h| h == "department");
let alias_idx = headers.iter().position(|h| h == "aliases" || h == "alias");
for line in lines {
if line.trim().is_empty() {
continue;
}
let parts: Vec<&str> = line.split(',').map(|s| s.trim()).collect();
if parts.len() >= 4 {
let department = dept_idx.and_then(|i| parts.get(i).map(|s| s.to_string()));
let aliases = alias_idx
.and_then(|i| parts.get(i))
.map(|s| s.split(';').map(|a| a.trim().to_lowercase()).collect())
.unwrap_or_default();
attendants.push(Attendant {
id: parts.get(id_idx).unwrap_or(&"").to_string(),
name: parts.get(name_idx).unwrap_or(&"").to_string(),
channel: parts.get(channel_idx).unwrap_or(&"all").to_string(),
preferences: parts.get(pref_idx).unwrap_or(&"").to_string(),
department,
aliases,
status: AttendantStatus::Online, // Default to online, will be updated from DB
});
}
}
info!("Loaded {} attendants from CSV", attendants.len());
attendants
}
Err(e) => {
error!("Failed to read attendant file: {}", e);
Vec::new()
}
}
}
/// Find attendant by name, alias, or department
pub fn find_attendant<'a>(
attendants: &'a [Attendant],
name: Option<&str>,
department: Option<&str>,
) -> Option<&'a Attendant> {
if let Some(search_name) = name {
let search_lower = search_name.to_lowercase();
// First try exact name match
if let Some(att) = attendants
.iter()
.find(|a| a.name.to_lowercase() == search_lower)
{
return Some(att);
}
// Try partial name match
if let Some(att) = attendants
.iter()
.find(|a| a.name.to_lowercase().contains(&search_lower))
{
return Some(att);
}
// Try alias match
if let Some(att) = attendants
.iter()
.find(|a| a.aliases.contains(&search_lower))
{
return Some(att);
}
// Try ID match
if let Some(att) = attendants
.iter()
.find(|a| a.id.to_lowercase() == search_lower)
{
return Some(att);
}
}
if let Some(dept) = department {
let dept_lower = dept.to_lowercase();
// Find first online attendant in department
if let Some(att) = attendants.iter().find(|a| {
a.department
.as_ref()
.map(|d| d.to_lowercase() == dept_lower)
.unwrap_or(false)
&& a.status == AttendantStatus::Online
}) {
return Some(att);
}
// Try preferences match for department
if let Some(att) = attendants.iter().find(|a| {
a.preferences.to_lowercase().contains(&dept_lower)
&& a.status == AttendantStatus::Online
}) {
return Some(att);
}
}
// Return first online attendant if no specific match
attendants
.iter()
.find(|a| a.status == AttendantStatus::Online)
}
/// Priority to integer for queue ordering
fn priority_to_int(priority: Option<&str>) -> i32 {
match priority.map(|p| p.to_lowercase()).as_deref() {
Some("urgent") => 3,
Some("high") => 2,
Some("normal") | None => 1,
Some("low") => 0,
_ => 1,
}
}
/// Execute the transfer to human
pub async fn execute_transfer(
state: Arc<AppState>,
session: &UserSession,
request: TransferToHumanRequest,
) -> TransferResult {
let work_path = "./work";
let bot_id = session.bot_id;
// Check if CRM is enabled
if !is_crm_enabled(bot_id, work_path) {
return TransferResult {
success: false,
status: TransferStatus::CrmDisabled,
queue_position: None,
assigned_to: None,
assigned_to_name: None,
estimated_wait_seconds: None,
message: "CRM features are not enabled. Add 'crm-enabled,true' to config.csv"
.to_string(),
};
}
// Load attendants
let attendants = read_attendants(bot_id, work_path);
if attendants.is_empty() {
return TransferResult {
success: false,
status: TransferStatus::NoAttendants,
queue_position: None,
assigned_to: None,
assigned_to_name: None,
estimated_wait_seconds: None,
message: "No attendants configured. Create attendant.csv in your .gbai folder"
.to_string(),
};
}
// Find matching attendant
let attendant = find_attendant(
&attendants,
request.name.as_deref(),
request.department.as_deref(),
);
// If specific name was requested but not found
if request.name.is_some() && attendant.is_none() {
return TransferResult {
success: false,
status: TransferStatus::AttendantNotFound,
queue_position: None,
assigned_to: None,
assigned_to_name: None,
estimated_wait_seconds: None,
message: format!(
"Attendant '{}' not found. Available attendants: {}",
request.name.as_ref().unwrap(),
attendants
.iter()
.map(|a| a.name.as_str())
.collect::<Vec<_>>()
.join(", ")
),
};
}
// Update session to mark as needing human attention
let priority = priority_to_int(request.priority.as_deref());
let transfer_context = serde_json::json!({
"transfer_requested_at": Utc::now().to_rfc3339(),
"transfer_priority": priority,
"transfer_reason": request.reason.clone().unwrap_or_default(),
"transfer_context": request.context.clone().unwrap_or_default(),
"transfer_to_name": request.name.clone(),
"transfer_to_department": request.department.clone(),
"needs_human": true,
"assigned_to": attendant.as_ref().map(|a| a.id.clone()),
"assigned_to_name": attendant.as_ref().map(|a| a.name.clone()),
"status": if attendant.is_some() { "assigned" } else { "queued" },
});
// Update session in database
let session_id = session.id;
let conn = state.conn.clone();
let ctx_data = transfer_context.clone();
let update_result = tokio::task::spawn_blocking(move || {
let mut db_conn = conn
.get()
.map_err(|e| format!("DB connection error: {}", e))?;
use crate::shared::models::schema::user_sessions;
diesel::update(user_sessions::table.filter(user_sessions::id.eq(session_id)))
.set(user_sessions::context_data.eq(ctx_data))
.execute(&mut db_conn)
.map_err(|e| format!("Failed to update session: {}", e))
})
.await;
match update_result {
Ok(Ok(_)) => {
if let Some(att) = attendant {
info!(
"Transfer: Session {} assigned to {} ({})",
session.id, att.name, att.id
);
TransferResult {
success: true,
status: TransferStatus::Assigned,
queue_position: Some(1),
assigned_to: Some(att.id.clone()),
assigned_to_name: Some(att.name.clone()),
estimated_wait_seconds: Some(30),
message: format!(
"You have been connected to {}. They will be with you shortly.",
att.name
),
}
} else {
info!(
"Transfer: Session {} queued for next available attendant",
session.id
);
TransferResult {
success: true,
status: TransferStatus::Queued,
queue_position: Some(1), // TODO: Calculate actual position
assigned_to: None,
assigned_to_name: None,
estimated_wait_seconds: Some(120),
message: "You have been added to the queue. The next available attendant will assist you.".to_string(),
}
}
}
Ok(Err(e)) => {
error!("Transfer failed: {}", e);
TransferResult {
success: false,
status: TransferStatus::Error,
queue_position: None,
assigned_to: None,
assigned_to_name: None,
estimated_wait_seconds: None,
message: format!("Transfer failed: {}", e),
}
}
Err(e) => {
error!("Transfer task failed: {:?}", e);
TransferResult {
success: false,
status: TransferStatus::Error,
queue_position: None,
assigned_to: None,
assigned_to_name: None,
estimated_wait_seconds: None,
message: format!("Transfer task failed: {:?}", e),
}
}
}
}
/// Convert TransferResult to Rhai Dynamic
impl TransferResult {
pub fn to_dynamic(&self) -> Dynamic {
let mut map = Map::new();
map.insert("success".into(), Dynamic::from(self.success));
map.insert(
"status".into(),
Dynamic::from(format!("{:?}", self.status).to_lowercase()),
);
map.insert("message".into(), Dynamic::from(self.message.clone()));
if let Some(pos) = self.queue_position {
map.insert("queue_position".into(), Dynamic::from(pos));
}
if let Some(ref id) = self.assigned_to {
map.insert("assigned_to".into(), Dynamic::from(id.clone()));
}
if let Some(ref name) = self.assigned_to_name {
map.insert("assigned_to_name".into(), Dynamic::from(name.clone()));
}
if let Some(wait) = self.estimated_wait_seconds {
map.insert("estimated_wait_seconds".into(), Dynamic::from(wait));
}
Dynamic::from(map)
}
}
/// Register the TRANSFER TO HUMAN keyword with the Rhai engine
pub fn register_transfer_to_human_keyword(
state: Arc<AppState>,
user: UserSession,
engine: &mut Engine,
) {
// TRANSFER TO HUMAN (no arguments - any available)
let state_clone = state.clone();
let user_clone = user.clone();
engine.register_fn("transfer_to_human", move || -> Dynamic {
let state = state_clone.clone();
let session = user_clone.clone();
let rt = tokio::runtime::Handle::current();
let result = rt.block_on(async {
execute_transfer(
state,
&session,
TransferToHumanRequest {
name: None,
department: None,
priority: None,
reason: None,
context: None,
},
)
.await
});
result.to_dynamic()
});
// TRANSFER TO HUMAN "name"
let state_clone = state.clone();
let user_clone = user.clone();
engine.register_fn("transfer_to_human", move |name: &str| -> Dynamic {
let state = state_clone.clone();
let session = user_clone.clone();
let name_str = name.to_string();
let rt = tokio::runtime::Handle::current();
let result = rt.block_on(async {
execute_transfer(
state,
&session,
TransferToHumanRequest {
name: Some(name_str),
department: None,
priority: None,
reason: None,
context: None,
},
)
.await
});
result.to_dynamic()
});
// TRANSFER TO HUMAN "department", "priority"
let state_clone = state.clone();
let user_clone = user.clone();
engine.register_fn(
"transfer_to_human",
move |department: &str, priority: &str| -> Dynamic {
let state = state_clone.clone();
let session = user_clone.clone();
let dept = department.to_string();
let prio = priority.to_string();
let rt = tokio::runtime::Handle::current();
let result = rt.block_on(async {
execute_transfer(
state,
&session,
TransferToHumanRequest {
name: None,
department: Some(dept),
priority: Some(prio),
reason: None,
context: None,
},
)
.await
});
result.to_dynamic()
},
);
// TRANSFER TO HUMAN "department", "priority", "reason"
let state_clone = state.clone();
let user_clone = user.clone();
engine.register_fn(
"transfer_to_human",
move |department: &str, priority: &str, reason: &str| -> Dynamic {
let state = state_clone.clone();
let session = user_clone.clone();
let dept = department.to_string();
let prio = priority.to_string();
let rsn = reason.to_string();
let rt = tokio::runtime::Handle::current();
let result = rt.block_on(async {
execute_transfer(
state,
&session,
TransferToHumanRequest {
name: None,
department: Some(dept),
priority: Some(prio),
reason: Some(rsn),
context: None,
},
)
.await
});
result.to_dynamic()
},
);
// TRANSFER TO HUMAN with Map (for named parameters)
let state_clone = state.clone();
let user_clone = user.clone();
engine.register_fn("transfer_to_human_ex", move |params: Map| -> Dynamic {
let state = state_clone.clone();
let session = user_clone.clone();
let name = params
.get("name")
.and_then(|v| v.clone().try_cast::<String>());
let department = params
.get("department")
.and_then(|v| v.clone().try_cast::<String>());
let priority = params
.get("priority")
.and_then(|v| v.clone().try_cast::<String>());
let reason = params
.get("reason")
.and_then(|v| v.clone().try_cast::<String>());
let context = params
.get("context")
.and_then(|v| v.clone().try_cast::<String>());
let rt = tokio::runtime::Handle::current();
let result = rt.block_on(async {
execute_transfer(
state,
&session,
TransferToHumanRequest {
name,
department,
priority,
reason,
context,
},
)
.await
});
result.to_dynamic()
});
debug!("Registered TRANSFER TO HUMAN keywords");
}
/// Tool schema for LLM integration
pub const TRANSFER_TO_HUMAN_TOOL_SCHEMA: &str = r#"{
"name": "transfer_to_human",
"description": "Transfer the conversation to a human attendant. Use this when the customer explicitly asks to speak with a person, when the issue is too complex for automated handling, or when emotional support is needed.",
"parameters": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "If someone wants to talk to somebody specific, provide their name or alias. Leave empty for any available attendant."
},
"department": {
"type": "string",
"description": "Department to transfer to: sales, support, technical, billing, etc.",
"enum": ["sales", "support", "technical", "billing", "general"]
},
"priority": {
"type": "string",
"description": "Priority level for the transfer",
"enum": ["normal", "high", "urgent"],
"default": "normal"
},
"reason": {
"type": "string",
"description": "Brief reason for the transfer to help the attendant understand the context"
}
},
"required": []
}
}"#;
/// Get the tool definition for registration with LLM
pub fn get_tool_definition() -> serde_json::Value {
serde_json::json!({
"type": "function",
"function": {
"name": "transfer_to_human",
"description": "Transfer the conversation to a human attendant. Use this when the customer explicitly asks to speak with a person, when the issue is too complex for automated handling, or when emotional support is needed.",
"parameters": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "If someone wants to talk to somebody specific, provide their name or alias. Leave empty for any available attendant."
},
"department": {
"type": "string",
"description": "Department to transfer to: sales, support, technical, billing, etc."
},
"priority": {
"type": "string",
"description": "Priority level for the transfer: normal, high, or urgent",
"default": "normal"
},
"reason": {
"type": "string",
"description": "Brief reason for the transfer to help the attendant understand the context"
}
},
"required": []
}
}
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_priority_to_int() {
assert_eq!(priority_to_int(Some("urgent")), 3);
assert_eq!(priority_to_int(Some("high")), 2);
assert_eq!(priority_to_int(Some("normal")), 1);
assert_eq!(priority_to_int(Some("low")), 0);
assert_eq!(priority_to_int(None), 1);
}
#[test]
fn test_find_attendant_by_name() {
let attendants = vec![
Attendant {
id: "att-001".to_string(),
name: "John Smith".to_string(),
channel: "all".to_string(),
preferences: "sales".to_string(),
department: Some("commercial".to_string()),
aliases: vec!["johnny".to_string(), "js".to_string()],
status: AttendantStatus::Online,
},
Attendant {
id: "att-002".to_string(),
name: "Jane Doe".to_string(),
channel: "web".to_string(),
preferences: "support".to_string(),
department: Some("customer-service".to_string()),
aliases: vec![],
status: AttendantStatus::Online,
},
];
// Find by exact name
let found = find_attendant(&attendants, Some("John Smith"), None);
assert!(found.is_some());
assert_eq!(found.unwrap().id, "att-001");
// Find by partial name
let found = find_attendant(&attendants, Some("john"), None);
assert!(found.is_some());
assert_eq!(found.unwrap().id, "att-001");
// Find by alias
let found = find_attendant(&attendants, Some("johnny"), None);
assert!(found.is_some());
assert_eq!(found.unwrap().id, "att-001");
// Find by department
let found = find_attendant(&attendants, None, Some("customer-service"));
assert!(found.is_some());
assert_eq!(found.unwrap().id, "att-002");
}
#[test]
fn test_transfer_result_to_dynamic() {
let result = TransferResult {
success: true,
status: TransferStatus::Assigned,
queue_position: Some(1),
assigned_to: Some("att-001".to_string()),
assigned_to_name: Some("John Smith".to_string()),
estimated_wait_seconds: Some(30),
message: "Connected to John".to_string(),
};
let dynamic = result.to_dynamic();
let map = dynamic.try_cast::<Map>().unwrap();
assert_eq!(
map.get("success")
.unwrap()
.clone()
.try_cast::<bool>()
.unwrap(),
true
);
assert_eq!(
map.get("assigned_to_name")
.unwrap()
.clone()
.try_cast::<String>()
.unwrap(),
"John Smith"
);
}
}

View file

@ -51,6 +51,7 @@ use self::keywords::send_mail::send_mail_keyword;
use self::keywords::send_template::register_send_template_keywords;
use self::keywords::social_media::register_social_media_keywords;
use self::keywords::switch_case::preprocess_switch;
use self::keywords::transfer_to_human::register_transfer_to_human_keyword;
use self::keywords::use_kb::register_use_kb_keyword;
use self::keywords::use_tool::use_tool_keyword;
use self::keywords::use_website::{clear_websites_keyword, use_website_keyword};
@ -173,6 +174,14 @@ impl ScriptService {
// Lead Scoring: SCORE LEAD, GET LEAD SCORE, QUALIFY LEAD, AI SCORE LEAD
register_lead_scoring_keywords(state.clone(), user.clone(), &mut engine);
// ========================================================================
// CRM & HUMAN HANDOFF
// ========================================================================
// TRANSFER TO HUMAN: Bot-to-human handoff for hybrid support workflows
// Supports transfer by name/alias, department, priority, and context
register_transfer_to_human_keyword(state.clone(), user.clone(), &mut engine);
// ========================================================================
// CORE BASIC FUNCTIONS: Math, Date/Time, Validation, Arrays, Error Handling
// ========================================================================

View file

@ -17,7 +17,23 @@ use redis::Client as RedisClient;
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::{broadcast, mpsc};
/// Notification sent to attendants via WebSocket/broadcast
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct AttendantNotification {
#[serde(rename = "type")]
pub notification_type: String,
pub session_id: String,
pub user_id: String,
pub user_name: Option<String>,
pub user_phone: Option<String>,
pub channel: String,
pub content: String,
pub timestamp: String,
pub assigned_to: Option<String>,
pub priority: i32,
}
/// Type-erased extension storage for AppState
#[derive(Default)]
@ -106,6 +122,9 @@ pub struct AppState {
pub task_engine: Arc<TaskEngine>,
/// Type-erased extension storage for web handlers and other components
pub extensions: Extensions,
/// Broadcast channel for attendant notifications (human handoff)
/// Used to notify attendants of new messages from customers
pub attendant_broadcast: Option<broadcast::Sender<AttendantNotification>>,
}
impl Clone for AppState {
fn clone(&self) -> Self {
@ -133,6 +152,7 @@ impl Clone for AppState {
voice_adapter: Arc::clone(&self.voice_adapter),
task_engine: Arc::clone(&self.task_engine),
extensions: self.extensions.clone(),
attendant_broadcast: self.attendant_broadcast.clone(),
}
}
}

View file

@ -183,6 +183,18 @@ async fn run_axum_server(
api_router = api_router.merge(botserver::sources::configure_sources_routes());
api_router = api_router.merge(botserver::designer::configure_designer_routes());
// Add WhatsApp webhook routes if feature is enabled
#[cfg(feature = "whatsapp")]
{
api_router = api_router.merge(crate::whatsapp::configure());
}
// Add attendance/CRM routes for human handoff
#[cfg(feature = "attendance")]
{
api_router = api_router.merge(crate::attendance::configure_attendance_routes());
}
// Add OAuth authentication routes
api_router = api_router.merge(crate::core::oauth::routes::configure());
@ -616,6 +628,11 @@ async fn main() -> std::io::Result<()> {
// Initialize TaskScheduler (will be set after AppState creation)
let task_scheduler = None;
// Create broadcast channel for attendant notifications (human handoff)
let (attendant_tx, _attendant_rx) = tokio::sync::broadcast::channel::<
botserver::core::shared::state::AttendantNotification,
>(1000);
let app_state = Arc::new(AppState {
drive: Some(drive.clone()),
s3_client: Some(drive),
@ -644,6 +661,7 @@ async fn main() -> std::io::Result<()> {
kb_manager: Some(kb_manager.clone()),
task_engine: task_engine,
extensions: botserver::core::shared::state::Extensions::new(),
attendant_broadcast: Some(attendant_tx),
});
// Initialize TaskScheduler with the AppState

1115
src/whatsapp/mod.rs Normal file

File diff suppressed because it is too large Load diff