diff --git a/Cargo.toml b/Cargo.toml index 53d6453d..e3381fd7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ features = ["database", "i18n"] [features] # ===== DEFAULT ===== -default = ["chat", "automation", "drive", "tasks", "cache", "directory", "llm", "crawler", "browser", "terminal", "editor", "mail"] +default = ["chat", "automation", "drive", "tasks", "cache", "directory", "llm", "crawler", "browser", "terminal", "editor", "mail", "whatsapp"] browser = ["automation", "drive", "cache"] terminal = ["automation", "drive", "cache"] diff --git a/src/core/bootstrap/bootstrap_manager.rs b/src/core/bootstrap/bootstrap_manager.rs index 9ed74998..856c87f2 100644 --- a/src/core/bootstrap/bootstrap_manager.rs +++ b/src/core/bootstrap/bootstrap_manager.rs @@ -160,15 +160,28 @@ impl BootstrapManager { if pm.is_installed("directory") { // Wait for Zitadel to be ready - it might have been started during installation - // Give it up to 60 seconds before trying to start it ourselves + // Use incremental backoff: check frequently at first, then slow down let mut directory_already_running = zitadel_health_check(); if !directory_already_running { - info!("Zitadel not responding to health check, waiting up to 60s for it to start..."); - for i in 0..30 { - sleep(Duration::from_secs(2)).await; - if zitadel_health_check() { - info!("Zitadel/Directory service is now responding (waited {}s)", (i + 1) * 2); - directory_already_running = true; + info!("Zitadel not responding to health check, waiting with incremental backoff..."); + // Check intervals: 1s x5, 2s x5, 5s x5, 10s x3 = ~70s total + let intervals: [(u64, u32); 4] = [(1, 5), (2, 5), (5, 5), (10, 3)]; + let mut total_waited: u64 = 0; + for (interval_secs, count) in intervals { + for i in 0..count { + if zitadel_health_check() { + info!("Zitadel/Directory service is now responding (waited {}s)", total_waited); + directory_already_running = true; + break; + } + sleep(Duration::from_secs(interval_secs)).await; 
total_waited += interval_secs; + // Show incremental progress every ~10s + if total_waited % 10 == 0 || i == 0 { + info!("Zitadel health check: {}s elapsed, retrying...", total_waited); + } + } + if directory_already_running { break; } } diff --git a/src/core/bot/mod.rs b/src/core/bot/mod.rs index fbe821a1..63245408 100644 --- a/src/core/bot/mod.rs +++ b/src/core/bot/mod.rs @@ -416,7 +416,7 @@ impl BotOrchestrator { let session_id = Uuid::parse_str(&message.session_id)?; let message_content = message.content.clone(); - let (session, context_data, history, model, key) = { + let (session, context_data, history, model, key, system_prompt) = { let state_clone = self.state.clone(); tokio::task::spawn_blocking( move || -> Result<_, Box> { @@ -453,15 +453,25 @@ impl BotOrchestrator { .get_config(&session.bot_id, "llm-key", Some("")) .unwrap_or_default(); - Ok((session, context_data, history, model, key)) + // Load system-prompt from config.csv, fallback to default + let system_prompt = config_manager + .get_config(&session.bot_id, "system-prompt", Some("You are a helpful assistant with access to tools that can help you complete tasks. When a user's request matches one of your available tools, use the appropriate tool instead of providing a generic response.")) + .unwrap_or_else(|_| "You are a helpful assistant.".to_string()); + + trace!("Loaded system-prompt for bot {}: {}", session.bot_id, &system_prompt[..system_prompt.len().min(100)]); + + Ok((session, context_data, history, model, key, system_prompt)) }, ) .await?? }; - let system_prompt = "You are a helpful assistant with access to tools that can help you complete tasks. 
When a user's request matches one of your available tools, use the appropriate tool instead of providing a generic response.".to_string(); let mut messages = OpenAIClient::build_messages(&system_prompt, &context_data, &history); + trace!("Built messages array with {} items, first message role: {:?}", + messages.as_array().map(|a| a.len()).unwrap_or(0), + messages.as_array().and_then(|a| a.first()).and_then(|m| m.get("role"))); + // Get bot name for KB and tool injection let bot_name_for_context = { let conn = self.state.conn.get().ok(); @@ -643,10 +653,10 @@ impl BotOrchestrator { let mut in_analysis = false; let mut tool_call_buffer = String::new(); // Accumulate potential tool call JSON chunks let mut accumulating_tool_call = false; // Track if we're currently accumulating a tool call - let mut tool_was_executed = false; // Track if a tool was executed to avoid duplicate final message let handler = llm_models::get_handler(&model); trace!("Using model handler for {}", model); + trace!("Receiving LLM stream chunks..."); #[cfg(feature = "nvidia")] { @@ -670,7 +680,6 @@ impl BotOrchestrator { } while let Some(chunk) = stream_rx.recv().await { - trace!("Received LLM chunk: {:?}", chunk); // ===== GENERIC TOOL EXECUTION ===== // Add chunk to tool_call_buffer and try to parse @@ -815,7 +824,6 @@ impl BotOrchestrator { // Clear the tool_call_buffer since we found and executed a tool call tool_call_buffer.clear(); accumulating_tool_call = false; // Reset accumulation flag - tool_was_executed = true; // Mark that a tool was executed // Continue to next chunk continue; } @@ -957,6 +965,8 @@ impl BotOrchestrator { } } + trace!("LLM stream complete. 
Full response: {}", full_response); + let state_for_save = self.state.clone(); let full_response_clone = full_response.clone(); tokio::task::spawn_blocking( @@ -977,9 +987,10 @@ impl BotOrchestrator { #[cfg(not(feature = "chat"))] let suggestions: Vec = Vec::new(); - // When a tool was executed, the content was already sent as streaming chunks - // (pre-tool text + tool result). Sending full_response again would duplicate it. - let final_content = if tool_was_executed { String::new() } else { full_response }; + // Content was already sent as streaming chunks. + // Sending full_response again would duplicate it (especially for WhatsApp which accumulates buffer). + // The final response is just a signal that streaming is complete - it should not contain content. + let final_content = String::new(); let final_response = BotResponse { bot_id: message.bot_id, diff --git a/src/core/shared/schema/core.rs b/src/core/shared/schema/core.rs index 14bd2d62..04bc446a 100644 --- a/src/core/shared/schema/core.rs +++ b/src/core/shared/schema/core.rs @@ -337,6 +337,7 @@ diesel::allow_tables_to_appear_in_same_query!( users, website_crawls, bots, + bot_configuration, organizations, organization_invitations, ); diff --git a/src/llm/cache.rs b/src/llm/cache.rs index 5e3c94e7..dd4147f9 100644 --- a/src/llm/cache.rs +++ b/src/llm/cache.rs @@ -275,7 +275,7 @@ impl CachedLLMProvider { } if self.config.semantic_matching && self.embedding_service.is_some() { - if let Some(similar) = self.find_similar_cached(prompt, messages, model).await { + if let Some(similar) = self.find_similar_cached(prompt, messages, model, self.config.similarity_threshold).await { info!( "Cache hit (semantic match) for prompt: ~{} tokens", estimate_token_count(prompt) @@ -296,6 +296,7 @@ impl CachedLLMProvider { prompt: &str, messages: &Value, model: &str, + threshold: f32, ) -> Option { let embedding_service = self.embedding_service.as_ref()?; @@ -305,9 +306,40 @@ impl CachedLLMProvider { messages }; - let 
combined_context = format!("{}\n{}", prompt, actual_messages); + // Extract ONLY the latest user question for semantic matching + // This prevents false positives from matching on old conversation history + let latest_user_question = if let Some(msgs) = actual_messages.as_array() { + // Find the last message with role "user" + msgs.iter() + .rev() + .find_map(|msg| { + if msg.get("role").and_then(|r| r.as_str()) == Some("user") { + msg.get("content").and_then(|c| c.as_str()) + } else { + None + } + }) + .unwrap_or("") + } else { + "" + }; - let prompt_embedding = match embedding_service.get_embedding(&combined_context).await { + // Use only the latest user question for semantic matching, not the full history + // The prompt contains system context, so we combine with latest question + let semantic_query = if latest_user_question.is_empty() { + prompt.to_string() + } else { + format!("{}\n{}", prompt, latest_user_question) + }; + + // Debug: log the text being sent for embedding + debug!( + "Embedding request text (len={}, using latest user question): {}", + semantic_query.len(), + &semantic_query.chars().take(200).collect::() + ); + + let prompt_embedding = match embedding_service.get_embedding(&semantic_query).await { Ok(emb) => emb, Err(e) => { debug!("Failed to get embedding for prompt: {}", e); @@ -343,7 +375,7 @@ impl CachedLLMProvider { .compute_similarity(&prompt_embedding, cached_embedding) .await; - if similarity >= self.config.similarity_threshold + if similarity >= threshold && best_match.as_ref().is_none_or(|(_, s)| *s < similarity) { best_match = Some((cached.clone(), similarity)); @@ -390,8 +422,30 @@ impl CachedLLMProvider { }; let embedding = if let Some(ref service) = self.embedding_service { - let combined_context = format!("{}\n{}", prompt, actual_messages); - service.get_embedding(&combined_context).await.ok() + // Extract ONLY the latest user question for embedding + // Same logic as find_similar_cached to ensure consistency + let 
latest_user_question = if let Some(msgs) = actual_messages.as_array() { + msgs.iter() + .rev() + .find_map(|msg| { + if msg.get("role").and_then(|r| r.as_str()) == Some("user") { + msg.get("content").and_then(|c| c.as_str()) + } else { + None + } + }) + .unwrap_or("") + } else { + "" + }; + + let semantic_query = if latest_user_question.is_empty() { + prompt.to_string() + } else { + format!("{}\n{}", prompt, latest_user_question) + }; + + service.get_embedding(&semantic_query).await.ok() } else { None }; @@ -520,7 +574,7 @@ impl LLMProvider for CachedLLMProvider { } if bot_cache_config.semantic_matching && self.embedding_service.is_some() { - if let Some(cached) = self.find_similar_cached(prompt, messages, model).await { + if let Some(cached) = self.find_similar_cached(prompt, messages, model, bot_cache_config.similarity_threshold).await { info!( "Cache hit (semantic match) for bot {} with similarity threshold {}", bot_id, bot_cache_config.similarity_threshold @@ -616,6 +670,31 @@ impl LocalEmbeddingService { api_key, } } + + /// Generate a deterministic hash-based embedding for fallback + fn hash_embedding(&self, text: &str) -> Vec { + const EMBEDDING_DIM: usize = 384; // Match common embedding dimensions + let mut embedding = vec![0.0f32; EMBEDDING_DIM]; + + let hash = Sha256::digest(text.as_bytes()); + + // Use hash bytes to seed the embedding + for (i, byte) in hash.iter().cycle().take(EMBEDDING_DIM * 4).enumerate() { + let idx = i % EMBEDDING_DIM; + let value = (*byte as f32 - 128.0) / 128.0; + embedding[idx] += value * 0.1; + } + + // Normalize + let norm: f32 = embedding.iter().map(|x| x * x).sum::().sqrt(); + if norm > 0.0 { + for val in &mut embedding { + *val /= norm; + } + } + + embedding + } } #[async_trait] @@ -636,13 +715,6 @@ impl EmbeddingService for LocalEmbeddingService { format!("{}/embedding", self.embedding_url) }; - let mut request = client.post(&url); - - // Add authorization header if API key is provided - if let Some(ref api_key) = 
self.api_key { - request = request.header("Authorization", format!("Bearer {}", api_key)); - } - // Determine request body format based on URL let request_body = if self.embedding_url.contains("huggingface.co") { serde_json::json!({ @@ -660,88 +732,131 @@ impl EmbeddingService for LocalEmbeddingService { }) }; - let response = request - .json(&request_body) - .send() - .await?; + // Retry logic with exponential backoff + const MAX_RETRIES: u32 = 3; + const INITIAL_DELAY_MS: u64 = 500; - let status = response.status(); - let response_text = response.text().await?; - - if !status.is_success() { - debug!( - "Embedding service HTTP error {}: {}", - status, - response_text - ); - return Err(format!( - "Embedding service returned HTTP {}: {}", - status, - response_text - ).into()); - } - - let result: Value = serde_json::from_str(&response_text) - .map_err(|e| { - debug!("Failed to parse embedding JSON: {} - Response: {}", e, response_text); - format!("Failed to parse embedding response JSON: {} - Response: {}", e, response_text) - })?; - - if let Some(error) = result.get("error") { - debug!("Embedding service returned error: {}", error); - return Err(format!("Embedding service error: {}", error).into()); - } - - // Try multiple response formats - let embedding = if let Some(arr) = result.as_array() { - // HuggingFace format: direct array [0.1, 0.2, ...] 
- arr.iter() - .filter_map(|v| v.as_f64().map(|f| f as f32)) - .collect() - } else if let Some(result_obj) = result.get("result") { - // Cloudflare AI format: {"result": {"data": [[...]]}} - if let Some(data) = result_obj.get("data") { - if let Some(data_arr) = data.as_array() { - if let Some(first) = data_arr.first() { - if let Some(embedding_arr) = first.as_array() { - embedding_arr - .iter() - .filter_map(|v| v.as_f64().map(|f| f as f32)) - .collect() - } else { - data_arr - .iter() - .filter_map(|v| v.as_f64().map(|f| f as f32)) - .collect() - } - } else { - return Err("Empty data array in Cloudflare response".into()); - } - } else { - return Err(format!("Invalid Cloudflare response format - Expected result.data array, got: {}", response_text).into()); - } - } else { - return Err(format!("Invalid Cloudflare response format - Expected result.data, got: {}", response_text).into()); + for attempt in 0..MAX_RETRIES { + if attempt > 0 { + let delay_ms = INITIAL_DELAY_MS * (1 << (attempt - 1)); // 500, 1000 + debug!("Embedding service retry attempt {}/{} after {}ms", attempt + 1, MAX_RETRIES, delay_ms); + tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await; } - } else if let Some(data) = result.get("data") { - // OpenAI/Standard format: {"data": [{"embedding": [...]}]} - data[0]["embedding"] - .as_array() - .ok_or_else(|| { - debug!("Invalid embedding response format. Expected data[0].embedding array. Got: {}", response_text); - format!("Invalid embedding response format - Expected data[0].embedding array, got: {}", response_text) - })? 
- .iter() - .filter_map(|v| v.as_f64().map(|f| f as f32)) - .collect() - } else { - return Err(format!( - "Invalid embedding response format - Expected array or data[0].embedding, got: {}", - response_text - ).into()); - }; - Ok(embedding) + let mut request = client.post(&url); + + // Add authorization header if API key is provided + if let Some(ref api_key) = self.api_key { + request = request.header("Authorization", format!("Bearer {}", api_key)); + } + + match request + .json(&request_body) + .timeout(std::time::Duration::from_secs(30)) + .send() + .await + { + Ok(response) => { + let status = response.status(); + let response_text = match response.text().await { + Ok(t) => t, + Err(e) => { + debug!("Failed to read response body: {}", e); + continue; + } + }; + + if !status.is_success() { + debug!( + "Embedding service HTTP error {} (attempt {}/{}): {}", + status, attempt + 1, MAX_RETRIES, response_text + ); + // Retry on 5xx errors + if status.as_u16() >= 500 { + continue; + } + // Non-retriable error + return Err(format!( + "Embedding service returned HTTP {}: {}", + status, response_text + ).into()); + } + + // Success - parse response + let result: Value = match serde_json::from_str(&response_text) { + Ok(r) => r, + Err(e) => { + debug!("Failed to parse embedding JSON: {} - Response: {}", e, response_text); + return Err(format!("Failed to parse embedding response JSON: {} - Response: {}", e, response_text).into()); + } + }; + + if let Some(error) = result.get("error") { + debug!("Embedding service returned error: {}", error); + return Err(format!("Embedding service error: {}", error).into()); + } + + // Try multiple response formats + let embedding = if let Some(arr) = result.as_array() { + // HuggingFace format: direct array [0.1, 0.2, ...] 
+ arr.iter() + .filter_map(|v| v.as_f64().map(|f| f as f32)) + .collect() + } else if let Some(result_obj) = result.get("result") { + // Cloudflare AI format: {"result": {"data": [[...]]}} + if let Some(data) = result_obj.get("data") { + if let Some(data_arr) = data.as_array() { + if let Some(first) = data_arr.first() { + if let Some(embedding_arr) = first.as_array() { + embedding_arr + .iter() + .filter_map(|v| v.as_f64().map(|f| f as f32)) + .collect() + } else { + data_arr + .iter() + .filter_map(|v| v.as_f64().map(|f| f as f32)) + .collect() + } + } else { + return Err("Empty data array in Cloudflare response".into()); + } + } else { + return Err(format!("Invalid Cloudflare response format - Expected result.data array, got: {}", response_text).into()); + } + } else { + return Err(format!("Invalid Cloudflare response format - Expected result.data, got: {}", response_text).into()); + } + } else if let Some(data) = result.get("data") { + // OpenAI/Standard format: {"data": [{"embedding": [...]}]} + data[0]["embedding"] + .as_array() + .ok_or_else(|| { + debug!("Invalid embedding response format. Expected data[0].embedding array. Got: {}", response_text); + format!("Invalid embedding response format - Expected data[0].embedding array, got: {}", response_text) + })? 
+ .iter() + .filter_map(|v| v.as_f64().map(|f| f as f32)) + .collect() + } else { + return Err(format!( + "Invalid embedding response format - Expected array or data[0].embedding, got: {}", + response_text + ).into()); + }; + + return Ok(embedding); + } + Err(e) => { + // Network error - retry + debug!("Embedding service network error (attempt {}/{}): {}", attempt + 1, MAX_RETRIES, e); + } + } + } + + // All retries exhausted - use hash-based fallback + debug!("Embedding service failed after all retries, using hash-based fallback"); + Ok(self.hash_embedding(text)) } async fn compute_similarity(&self, embedding1: &[f32], embedding2: &[f32]) -> f32 { diff --git a/src/main_module/bootstrap.rs b/src/main_module/bootstrap.rs index c1973703..cf948c0b 100644 --- a/src/main_module/bootstrap.rs +++ b/src/main_module/bootstrap.rs @@ -752,14 +752,21 @@ fn init_llm_provider( .get_config(&bot_id, "embedding-key", None) .ok(); let semantic_cache_enabled = config_manager - .get_config(&bot_id, "semantic-cache-enabled", Some("true")) + .get_config(&bot_id, "llm-cache-semantic", Some("true")) .unwrap_or_else(|_| "true".to_string()) .to_lowercase() == "true"; + let similarity_threshold = config_manager + .get_config(&bot_id, "llm-cache-threshold", Some("0.85")) + .unwrap_or_else(|_| "0.85".to_string()) + .parse::() + .unwrap_or(0.85); + info!("Embedding URL: {}", embedding_url); info!("Embedding Model: {}", embedding_model); info!("Embedding Key: {}", if embedding_key.is_some() { "configured" } else { "not set" }); info!("Semantic Cache Enabled: {}", semantic_cache_enabled); + info!("Cache Similarity Threshold: {}", similarity_threshold); let embedding_service = if semantic_cache_enabled { Some(Arc::new(LocalEmbeddingService::new( @@ -774,7 +781,7 @@ fn init_llm_provider( let cache_config = CacheConfig { ttl: 3600, semantic_matching: semantic_cache_enabled, - similarity_threshold: 0.85, + similarity_threshold, max_similarity_checks: 100, key_prefix: "llm_cache".to_string(), }; diff 
--git a/src/main_module/server.rs b/src/main_module/server.rs index 6e98274c..64bb106c 100644 --- a/src/main_module/server.rs +++ b/src/main_module/server.rs @@ -69,6 +69,7 @@ pub async fn run_axum_server( .add_anonymous_path("/api/client-errors") .add_anonymous_path("/ws") .add_anonymous_path("/auth") + .add_anonymous_path("/webhook/whatsapp") // WhatsApp webhook for Meta verification .add_public_path("/static") .add_public_path("/favicon.ico") .add_public_path("/suite") diff --git a/src/security/auth_api/config.rs b/src/security/auth_api/config.rs index 578c5aa4..2c75ea1e 100644 --- a/src/security/auth_api/config.rs +++ b/src/security/auth_api/config.rs @@ -30,6 +30,7 @@ impl Default for AuthConfig { "/api/auth/refresh".to_string(), "/oauth".to_string(), "/auth/callback".to_string(), + "/webhook/whatsapp".to_string(), ], public_paths: vec![ "/".to_string(), diff --git a/src/security/rbac_middleware.rs b/src/security/rbac_middleware.rs index 21c7ebf4..5dc02a2a 100644 --- a/src/security/rbac_middleware.rs +++ b/src/security/rbac_middleware.rs @@ -399,6 +399,7 @@ impl RbacManager { continue; } + // Check allow_anonymous FIRST before authentication check if route.allow_anonymous { let result = AccessDecisionResult::allow("Anonymous access allowed") .with_rule(route.path_pattern.clone()); @@ -406,6 +407,7 @@ impl RbacManager { return result; } + // Only check authentication after confirming route is not anonymous if !user.is_authenticated() { let result = AccessDecisionResult::deny("Authentication required"); return result; @@ -949,6 +951,10 @@ pub fn build_default_route_permissions() -> Vec { RoutePermission::new("/api/bot/config", "GET", "").with_anonymous(true), RoutePermission::new("/api/i18n/**", "GET", "").with_anonymous(true), + // WhatsApp webhook - anonymous for Meta verification and message delivery + RoutePermission::new("/webhook/whatsapp/:bot_id", "GET", "").with_anonymous(true), + RoutePermission::new("/webhook/whatsapp/:bot_id", "POST", 
"").with_anonymous(true), + // Auth routes - login must be anonymous RoutePermission::new("/api/auth", "GET", "").with_anonymous(true), diff --git a/src/telegram/mod.rs b/src/telegram/mod.rs index fb89d96c..afd46e1a 100644 --- a/src/telegram/mod.rs +++ b/src/telegram/mod.rs @@ -401,17 +401,36 @@ async fn route_to_bot( let chat_id_clone = chat_id.to_string(); tokio::spawn(async move { - while let Some(response) = rx.recv().await { - let tg_response = BotResponse::new( - response.bot_id, - response.session_id, - chat_id_clone.clone(), - response.content, - "telegram", - ); + // Buffer to accumulate streaming chunks + let mut accumulated_content = String::new(); + let mut chunk_count = 0u32; - if let Err(e) = adapter.send_message(tg_response).await { - error!("Failed to send Telegram response: {}", e); + while let Some(response) = rx.recv().await { + // Accumulate content from each chunk + if !response.content.is_empty() { + accumulated_content.push_str(&response.content); + chunk_count += 1; + } + + // Send when complete or as fallback after 5 chunks + if response.is_complete || chunk_count >= 5 { + if !accumulated_content.is_empty() { + let tg_response = BotResponse::new( + response.bot_id, + response.session_id, + chat_id_clone.clone(), + accumulated_content.clone(), + "telegram", + ); + + if let Err(e) = adapter.send_message(tg_response).await { + error!("Failed to send Telegram response: {}", e); + } + + // Reset buffer after sending + accumulated_content.clear(); + chunk_count = 0; + } } } }); diff --git a/src/whatsapp/mod.rs b/src/whatsapp/mod.rs index 4dba7746..e9057f0b 100644 --- a/src/whatsapp/mod.rs +++ b/src/whatsapp/mod.rs @@ -1,4 +1,4 @@ -use crate::core::bot::BotOrchestrator; +use crate::core::bot::{BotOrchestrator, get_default_bot}; use crate::core::bot::channels::whatsapp::WhatsAppAdapter; use crate::core::bot::channels::ChannelAdapter; use crate::core::config::ConfigManager; @@ -178,12 +178,48 @@ pub fn configure() -> Router> { 
.route("/api/whatsapp/send", post(send_message)) } +/// Resolve bot_id string to Uuid. +/// - "default" → returns UUID of the default bot +/// - Valid UUID string → returns the UUID +/// - Otherwise → returns error response +async fn resolve_bot_id( + bot_id_str: &str, + state: &Arc, +) -> Result { + if bot_id_str == "default" { + let conn = state.conn.clone(); + let bot_id = tokio::task::spawn_blocking(move || { + let mut db_conn = conn.get().ok()?; + let (id, _) = get_default_bot(&mut db_conn); + Some(id) + }) + .await + .ok() + .flatten() + .unwrap_or_else(Uuid::nil); + + if bot_id.is_nil() { + return Err((StatusCode::NOT_FOUND, "Default bot not found".to_string())); + } + info!("Resolved 'default' to bot_id: {}", bot_id); + Ok(bot_id) + } else { + Uuid::parse_str(bot_id_str) + .map_err(|e| (StatusCode::BAD_REQUEST, format!("Invalid bot ID: {}", e))) + } +} + pub async fn verify_webhook( State(state): State>, - Path(bot_id): Path, + Path(bot_id_str): Path, Query(params): Query, ) -> impl IntoResponse { - info!("WhatsApp webhook verification request received for bot {}", bot_id); + let bot_id = match resolve_bot_id(&bot_id_str, &state).await { + Ok(id) => id, + Err(err) => return err, + }; + + info!("WhatsApp webhook verification request received for bot {} (input: {})", bot_id, bot_id_str); let mode = params.mode.unwrap_or_default(); let token = params.verify_token.unwrap_or_default(); @@ -207,9 +243,14 @@ pub async fn verify_webhook( pub async fn handle_webhook( State(state): State>, - Path(bot_id): Path, + Path(bot_id_str): Path, body: axum::body::Bytes, ) -> impl IntoResponse { + let bot_id = match resolve_bot_id(&bot_id_str, &state).await { + Ok(id) => id, + Err(err) => return err.0, + }; + debug!("Raw webhook body: {}", String::from_utf8_lossy(&body)); let payload: WhatsAppWebhook = match serde_json::from_slice(&body) { @@ -267,6 +308,110 @@ pub async fn handle_webhook( StatusCode::OK } +// ==================== Phone → Bot Routing Cache Functions 
==================== + +/// Get the cached bot_id for a phone number from the routing cache +async fn get_cached_bot_for_phone(state: &Arc, phone: &str) -> Option { + let cache = state.cache.as_ref()?; + let mut conn = cache.get_multiplexed_async_connection().await.ok()?; + let key = format!("wa_phone_bot:{}", phone); + + let bot_id_str: Option = redis::cmd("GET") + .arg(&key) + .query_async(&mut conn) + .await + .ok() + .flatten(); + + if let Some(bot_id_str) = bot_id_str { + if let Ok(bot_id) = Uuid::parse_str(&bot_id_str) { + debug!("Found cached bot {} for phone {}", bot_id, phone); + return Some(bot_id); + } + } + None +} + +/// Set the bot_id for a phone number in the routing cache +async fn set_cached_bot_for_phone(state: &Arc, phone: &str, bot_id: Uuid) { + if let Some(cache) = &state.cache { + if let Ok(mut conn) = cache.get_multiplexed_async_connection().await { + let key = format!("wa_phone_bot:{}", phone); + // Cache for 24 hours (86400 seconds) + let result: Result<(), _> = redis::cmd("SET") + .arg(&key) + .arg(bot_id.to_string()) + .arg("EX") + .arg(86400) + .query_async(&mut conn) + .await; + if let Err(e) = result { + error!("Failed to cache bot for phone {}: {}", phone, e); + } else { + info!("Cached bot {} for phone {}", bot_id, phone); + } + } + } +} + +// ==================== WhatsApp ID Routing Functions ==================== + +/// Check if the message text is a whatsapp-id routing command. +/// Returns the bot_id if a matching bot is found with that whatsapp-id. 
+async fn check_whatsapp_id_routing( + state: &Arc, + message_text: &str, +) -> Option { + let text = message_text.trim().to_lowercase(); + + // Skip empty messages or messages that are too long (whatsapp-id should be short) + if text.is_empty() || text.len() > 50 { + return None; + } + + // Skip messages that look like regular sentences (contain spaces or common punctuation) + if text.contains(' ') || text.contains('.') || text.contains('?') || text.contains('!') { + return None; + } + + // Search for a bot with matching whatsapp-id in config + let conn = state.conn.clone(); + let search_text = text.clone(); + + let result = tokio::task::spawn_blocking(move || { + use crate::core::shared::models::schema::{bots, bot_configuration}; + use diesel::prelude::*; + + let mut db_conn = conn.get().ok()?; + + // Find all active bots with whatsapp-id config + let bot_ids_with_whatsapp_id: Vec<(Uuid, String)> = bot_configuration::table + .inner_join(bots::table.on(bot_configuration::bot_id.eq(bots::id))) + .filter(bots::is_active.eq(true)) + .filter(bot_configuration::config_key.eq("whatsapp-id")) + .select((bot_configuration::bot_id, bot_configuration::config_value)) + .load::<(Uuid, String)>(&mut db_conn) + .ok()?; + + // Find matching bot + for (bot_id, whatsapp_id) in bot_ids_with_whatsapp_id { + if whatsapp_id.to_lowercase() == search_text { + return Some(bot_id); + } + } + None + }) + .await + .ok() + .flatten(); + + if let Some(bot_id) = result { + info!("Found bot {} matching whatsapp-id: {}", bot_id, text); + } + + result +} + async fn process_incoming_message( state: Arc, bot_id: &Uuid, @@ -302,11 +447,6 @@ async fn process_incoming_message( .unwrap_or_else(|| message.from.clone()); let name = contact_name.clone().unwrap_or_else(|| phone.clone()); - info!( - "Processing WhatsApp message from {} ({}) for bot {}: type={}", - name, phone, bot_id, message.message_type - ); - let content = extract_message_content(message); debug!("Extracted content from WhatsApp message: 
'{}'", content); @@ -315,12 +455,63 @@ async fn process_incoming_message( return Ok(()); } + // ==================== Dynamic Bot Routing ==================== + // Check if this is a whatsapp-id routing command (e.g., "cristo", "salesianos") + let mut effective_bot_id = *bot_id; + + if let Some(routed_bot_id) = check_whatsapp_id_routing(&state, &content).await { + // User typed a whatsapp-id command - switch to that bot + info!( + "Routing WhatsApp user {} from bot {} to bot {} (whatsapp-id: {})", + phone, bot_id, routed_bot_id, content + ); + effective_bot_id = routed_bot_id; + set_cached_bot_for_phone(&state, &phone, routed_bot_id).await; + + // Send confirmation message + let adapter = WhatsAppAdapter::new(state.conn.clone(), effective_bot_id); + let bot_response = BotResponse { + bot_id: effective_bot_id.to_string(), + session_id: Uuid::nil().to_string(), + user_id: phone.clone(), + channel: "whatsapp".to_string(), + content: format!("✅ Bot alterado para: {}", content), + message_type: MessageType::BOT_RESPONSE, + stream_token: None, + is_complete: true, + suggestions: vec![], + context_name: None, + context_length: 0, + context_max_length: 0, + }; + if let Err(e) = adapter.send_message(bot_response).await { + error!("Failed to send routing confirmation: {}", e); + } + return Ok(()); + } + + // Check if there's a cached bot for this phone number + if let Some(cached_bot_id) = get_cached_bot_for_phone(&state, &phone).await { + if cached_bot_id != *bot_id { + info!( + "Using cached bot {} for phone {} (webhook bot: {})", + cached_bot_id, phone, bot_id + ); + effective_bot_id = cached_bot_id; + } + } + + info!( + "Processing WhatsApp message from {} ({}) for bot {}: type={}", + name, phone, effective_bot_id, message.message_type + ); + // Handle /clear command - available to all users if content.trim().to_lowercase() == "/clear" { - let adapter = WhatsAppAdapter::new(state.conn.clone(), *bot_id); + let adapter = WhatsAppAdapter::new(state.conn.clone(), 
effective_bot_id); // Find and clear the user's session - match find_or_create_session(&state, bot_id, &phone, &name).await { + match find_or_create_session(&state, &effective_bot_id, &phone, &name).await { Ok((session, _)) => { // Clear message history for this session if let Err(e) = clear_session_history(&state, &session.id).await { @@ -328,7 +519,7 @@ async fn process_incoming_message( } let bot_response = BotResponse { - bot_id: bot_id.to_string(), + bot_id: effective_bot_id.to_string(), session_id: session.id.to_string(), user_id: phone.clone(), channel: "whatsapp".to_string(), @@ -355,9 +546,9 @@ async fn process_incoming_message( if content.starts_with('/') { if let Some(response) = process_attendant_command(&state, &phone, &content).await { - let adapter = WhatsAppAdapter::new(state.conn.clone(), *bot_id); + let adapter = WhatsAppAdapter::new(state.conn.clone(), effective_bot_id); let bot_response = BotResponse { - bot_id: bot_id.to_string(), + bot_id: effective_bot_id.to_string(), session_id: Uuid::nil().to_string(), user_id: phone.clone(), channel: "whatsapp".to_string(),