From 1053c86a73bd4b238d6436d3bcf342b53f43d68c Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Tue, 10 Mar 2026 17:19:17 -0300 Subject: [PATCH] fix: whatsapp dynamic routing and openai tool call accumulation --- src/basic/keywords/hearing/talk.rs | 68 ++++++++++++++++++++++++------ src/llm/cache.rs | 4 +- src/llm/mod.rs | 51 +++++++++++++++++++--- src/main_module/bootstrap.rs | 4 +- src/whatsapp/mod.rs | 32 +++++++++++++- 5 files changed, 134 insertions(+), 25 deletions(-) diff --git a/src/basic/keywords/hearing/talk.rs b/src/basic/keywords/hearing/talk.rs index 1fc24faa..f1555084 100644 --- a/src/basic/keywords/hearing/talk.rs +++ b/src/basic/keywords/hearing/talk.rs @@ -34,11 +34,29 @@ pub async fn execute_talk( } } + let channel = user_session + .context_data + .get("channel") + .and_then(|v| v.as_str()) + .unwrap_or("web") + .to_string(); + + let target_user_id = if channel == "whatsapp" { + user_session + .context_data + .get("phone") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string() + } else { + user_session.user_id.to_string() + }; + let response = BotResponse { bot_id: user_session.bot_id.to_string(), - user_id: user_session.user_id.to_string(), + user_id: target_user_id.clone(), session_id: user_session.id.to_string(), - channel: "web".to_string(), + channel: channel.clone(), content: message, message_type: MessageType::BOT_RESPONSE, stream_token: None, @@ -49,20 +67,42 @@ pub async fn execute_talk( context_max_length: 0, }; - let user_id = user_session.id.to_string(); let response_clone = response.clone(); - let web_adapter = Arc::clone(&state.web_adapter); - tokio::spawn(async move { - if let Err(e) = web_adapter - .send_message_to_session(&user_id, response_clone) - .await - { - error!("Failed to send TALK message via web adapter: {}", e); - } else { - trace!("TALK message sent via web adapter"); - } - }); + if channel == "whatsapp" { + use crate::core::bot::channels::ChannelAdapter; + use crate::core::bot::channels::whatsapp::WhatsAppAdapter; + + // WhatsApp expects the phone number as the target + let mut wa_response = response_clone; + wa_response.user_id = target_user_id; + + let pool = state.conn.clone(); + let bot_id = user_session.bot_id; + + tokio::spawn(async move { + let adapter = WhatsAppAdapter::new(pool, bot_id); + if let Err(e) = adapter.send_message(wa_response).await { + error!("Failed to send TALK message via whatsapp adapter: {}", e); + } else { + trace!("TALK message sent via whatsapp adapter"); + } + }); + } else { + let user_id = user_session.id.to_string(); + let web_adapter = Arc::clone(&state.web_adapter); + + tokio::spawn(async move { + if let Err(e) = web_adapter + .send_message_to_session(&user_id, response_clone) + .await + { + error!("Failed to send TALK message via web adapter: {}", e); + } else { + trace!("TALK message sent via web adapter"); + } + }); + } Ok(response) } diff --git a/src/llm/cache.rs b/src/llm/cache.rs index dd4147f9..14403f17 100644 --- a/src/llm/cache.rs +++ b/src/llm/cache.rs @@ -206,8 +206,8 @@ impl CachedLLMProvider { .unwrap_or(self.config.ttl); let semantic_enabled = config_manager - .get_config(&bot_uuid, "llm-cache-semantic", Some("true")) - .unwrap_or_else(|_| "true".to_string()) + .get_config(&bot_uuid, "llm-cache-semantic", Some("false")) + .unwrap_or_else(|_| "false".to_string()) .to_lowercase() == "true"; diff --git a/src/llm/mod.rs b/src/llm/mod.rs index 6b449acb..05ca01f1 100644 --- a/src/llm/mod.rs +++ b/src/llm/mod.rs @@ -423,6 +423,9 @@ impl LLMProvider for OpenAIClient { let handler = get_handler(model); let mut stream = response.bytes_stream(); + + // Accumulate tool calls here because OpenAI streams them in fragments + let mut active_tool_calls: Vec = Vec::new(); while let Some(chunk_result) = stream.next().await { let chunk = chunk_result?; @@ -439,12 +442,37 @@ impl LLMProvider for OpenAIClient { // Handle standard OpenAI tool_calls if let Some(tool_calls) = data["choices"][0]["delta"]["tool_calls"].as_array() { - for tool_call in tool_calls { - // We send the tool_call object as a JSON string so stream_response - // can buffer it and parse it using ToolExecutor::parse_tool_call - if let Some(func) = tool_call.get("function") { - if let Some(args) = func.get("arguments").and_then(|a| a.as_str()) { - let _ = tx.send(args.to_string()).await; + for tool_delta in tool_calls { + if let Some(index) = tool_delta["index"].as_u64() { + let idx = index as usize; + if active_tool_calls.len() <= idx { + active_tool_calls.resize(idx + 1, serde_json::json!({ + "id": "", + "type": "function", + "function": { + "name": "", + "arguments": "" + } + })); + } + + let current = &mut active_tool_calls[idx]; + + if let Some(id) = tool_delta["id"].as_str() { + current["id"] = serde_json::Value::String(id.to_string()); + } + + if let Some(func) = tool_delta.get("function") { + if let Some(name) = func.get("name").and_then(|n| n.as_str()) { + current["function"]["name"] = serde_json::Value::String(name.to_string()); + } + if let Some(args) = func.get("arguments").and_then(|a| a.as_str()) { + if let Some(existing_args) = current["function"]["arguments"].as_str() { + let mut new_args = existing_args.to_string(); + new_args.push_str(args); + current["function"]["arguments"] = serde_json::Value::String(new_args); + } + } } } } @@ -453,6 +481,17 @@ impl LLMProvider for OpenAIClient { } } } + + // Send accumulated tool calls when stream finishes + for tool_call in active_tool_calls { + if !tool_call["function"]["name"].as_str().unwrap_or("").is_empty() { + let tool_call_json = serde_json::json!({ + "type": "tool_call", + "content": tool_call + }).to_string(); + let _ = tx.send(tool_call_json).await; + } + } Ok(()) } diff --git a/src/main_module/bootstrap.rs b/src/main_module/bootstrap.rs index d2cbea14..b2005647 100644 --- a/src/main_module/bootstrap.rs +++ b/src/main_module/bootstrap.rs @@ -755,8 +755,8 @@ fn init_llm_provider( .get_config(&bot_id, "embedding-key", None) .ok(); let semantic_cache_enabled = config_manager - .get_config(&bot_id, "llm-cache-semantic", Some("true")) - .unwrap_or_else(|_| "true".to_string()) + .get_config(&bot_id, "llm-cache-semantic", Some("false")) + .unwrap_or_else(|_| "false".to_string()) .to_lowercase() == "true"; let similarity_threshold = config_manager diff --git a/src/whatsapp/mod.rs b/src/whatsapp/mod.rs index f4d2bf71..79d2098d 100644 --- a/src/whatsapp/mod.rs +++ b/src/whatsapp/mod.rs @@ -178,6 +178,8 @@ pub fn configure() -> Router> { .route("/api/whatsapp/send", post(send_message)) } + + /// Resolve bot_id string to Uuid. /// - "default" → returns UUID of the default bot /// - Valid UUID string → returns the UUID @@ -525,11 +527,32 @@ async fn process_incoming_message( effective_bot_id = routed_bot_id; set_cached_bot_for_phone(&state, &phone, routed_bot_id).await; + // Get or create session for the new bot + let (session, is_new) = match find_or_create_session(&state, &effective_bot_id, &phone, &name).await { + Ok(s) => s, + Err(e) => { + error!("Failed to create session for routed bot: {}", e); + return Ok(()); + } + }; + + // Clear start.bas execution flag for new bot's session + if let Some(cache) = &state.cache { + if let Ok(mut conn) = cache.get_multiplexed_async_connection().await { + let key = format!("start_bas_executed:{}", session.id); + let _: Result<(), redis::RedisError> = redis::cmd("DEL") + .arg(&key) + .query_async(&mut conn) + .await; + debug!("Cleared start.bas flag {} after bot switch to {}", key, routed_bot_id); + } + } + // Send confirmation message let adapter = WhatsAppAdapter::new(state.conn.clone(), effective_bot_id); let bot_response = BotResponse { bot_id: effective_bot_id.to_string(), - session_id: Uuid::nil().to_string(), + session_id: session.id.to_string(), user_id: phone.clone(), channel: "whatsapp".to_string(), content: format!("✅ Bot alterado para: {}", content), @@ -544,6 +567,13 @@ async fn process_incoming_message( if let Err(e) = adapter.send_message(bot_response).await { error!("Failed to send routing confirmation: {}", e); } + + // Execute start.bas immediately by calling route_to_bot + info!("Executing start.bas for bot '{}' via route_to_bot", routed_bot_id); + if let Err(e) = route_to_bot(&state, &session, "", is_new).await { + error!("Failed to execute start.bas for bot switch: {}", e); + } + return Ok(()); }