fix: whatsapp dynamic routing and openai tool call accumulation
All checks were successful
BotServer CI / build (push) Successful in 13m40s

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-03-10 17:19:17 -03:00
parent 786d404938
commit 1053c86a73
5 changed files with 134 additions and 25 deletions

View file

@ -34,11 +34,29 @@ pub async fn execute_talk(
} }
} }
let channel = user_session
.context_data
.get("channel")
.and_then(|v| v.as_str())
.unwrap_or("web")
.to_string();
let target_user_id = if channel == "whatsapp" {
user_session
.context_data
.get("phone")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string()
} else {
user_session.user_id.to_string()
};
let response = BotResponse { let response = BotResponse {
bot_id: user_session.bot_id.to_string(), bot_id: user_session.bot_id.to_string(),
user_id: user_session.user_id.to_string(), user_id: target_user_id.clone(),
session_id: user_session.id.to_string(), session_id: user_session.id.to_string(),
channel: "web".to_string(), channel: channel.clone(),
content: message, content: message,
message_type: MessageType::BOT_RESPONSE, message_type: MessageType::BOT_RESPONSE,
stream_token: None, stream_token: None,
@ -49,10 +67,31 @@ pub async fn execute_talk(
context_max_length: 0, context_max_length: 0,
}; };
let user_id = user_session.id.to_string();
let response_clone = response.clone(); let response_clone = response.clone();
if channel == "whatsapp" {
use crate::core::bot::channels::ChannelAdapter;
use crate::core::bot::channels::whatsapp::WhatsAppAdapter;
// WhatsApp expects the phone number as the target
let mut wa_response = response_clone;
wa_response.user_id = target_user_id;
let pool = state.conn.clone();
let bot_id = user_session.bot_id;
tokio::spawn(async move {
let adapter = WhatsAppAdapter::new(pool, bot_id);
if let Err(e) = adapter.send_message(wa_response).await {
error!("Failed to send TALK message via whatsapp adapter: {}", e);
} else {
trace!("TALK message sent via whatsapp adapter");
}
});
} else {
let user_id = user_session.id.to_string();
let web_adapter = Arc::clone(&state.web_adapter); let web_adapter = Arc::clone(&state.web_adapter);
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = web_adapter if let Err(e) = web_adapter
.send_message_to_session(&user_id, response_clone) .send_message_to_session(&user_id, response_clone)
@ -63,6 +102,7 @@ pub async fn execute_talk(
trace!("TALK message sent via web adapter"); trace!("TALK message sent via web adapter");
} }
}); });
}
Ok(response) Ok(response)
} }

View file

@ -206,8 +206,8 @@ impl CachedLLMProvider {
.unwrap_or(self.config.ttl); .unwrap_or(self.config.ttl);
let semantic_enabled = config_manager let semantic_enabled = config_manager
.get_config(&bot_uuid, "llm-cache-semantic", Some("true")) .get_config(&bot_uuid, "llm-cache-semantic", Some("false"))
.unwrap_or_else(|_| "true".to_string()) .unwrap_or_else(|_| "false".to_string())
.to_lowercase() .to_lowercase()
== "true"; == "true";

View file

@ -424,6 +424,9 @@ impl LLMProvider for OpenAIClient {
let handler = get_handler(model); let handler = get_handler(model);
let mut stream = response.bytes_stream(); let mut stream = response.bytes_stream();
// Accumulate tool calls here because OpenAI streams them in fragments
let mut active_tool_calls: Vec<serde_json::Value> = Vec::new();
while let Some(chunk_result) = stream.next().await { while let Some(chunk_result) = stream.next().await {
let chunk = chunk_result?; let chunk = chunk_result?;
let chunk_str = String::from_utf8_lossy(&chunk); let chunk_str = String::from_utf8_lossy(&chunk);
@ -439,12 +442,35 @@ impl LLMProvider for OpenAIClient {
// Handle standard OpenAI tool_calls // Handle standard OpenAI tool_calls
if let Some(tool_calls) = data["choices"][0]["delta"]["tool_calls"].as_array() { if let Some(tool_calls) = data["choices"][0]["delta"]["tool_calls"].as_array() {
for tool_call in tool_calls { for tool_delta in tool_calls {
// We send the tool_call object as a JSON string so stream_response if let Some(index) = tool_delta["index"].as_u64() {
// can buffer it and parse it using ToolExecutor::parse_tool_call let idx = index as usize;
if let Some(func) = tool_call.get("function") { if active_tool_calls.len() <= idx {
active_tool_calls.resize(idx + 1, serde_json::json!({
"id": "",
"type": "function",
"function": {
"name": "",
"arguments": ""
}
}));
}
let current = &mut active_tool_calls[idx];
if let Some(id) = tool_delta["id"].as_str() {
current["id"] = serde_json::Value::String(id.to_string());
}
if let Some(func) = tool_delta.get("function") {
if let Some(name) = func.get("name").and_then(|n| n.as_str()) {
current["function"]["name"] = serde_json::Value::String(name.to_string());
}
if let Some(args) = func.get("arguments").and_then(|a| a.as_str()) { if let Some(args) = func.get("arguments").and_then(|a| a.as_str()) {
let _ = tx.send(args.to_string()).await; if let Some(existing_args) = current["function"]["arguments"].as_str() {
let mut new_args = existing_args.to_string();
new_args.push_str(args);
current["function"]["arguments"] = serde_json::Value::String(new_args);
} }
} }
} }
@ -453,6 +479,19 @@ impl LLMProvider for OpenAIClient {
} }
} }
} }
}
}
// Send accumulated tool calls when stream finishes
for tool_call in active_tool_calls {
if !tool_call["function"]["name"].as_str().unwrap_or("").is_empty() {
let tool_call_json = serde_json::json!({
"type": "tool_call",
"content": tool_call
}).to_string();
let _ = tx.send(tool_call_json).await;
}
}
Ok(()) Ok(())
} }

View file

@ -755,8 +755,8 @@ fn init_llm_provider(
.get_config(&bot_id, "embedding-key", None) .get_config(&bot_id, "embedding-key", None)
.ok(); .ok();
let semantic_cache_enabled = config_manager let semantic_cache_enabled = config_manager
.get_config(&bot_id, "llm-cache-semantic", Some("true")) .get_config(&bot_id, "llm-cache-semantic", Some("false"))
.unwrap_or_else(|_| "true".to_string()) .unwrap_or_else(|_| "false".to_string())
.to_lowercase() == "true"; .to_lowercase() == "true";
let similarity_threshold = config_manager let similarity_threshold = config_manager

View file

@ -178,6 +178,8 @@ pub fn configure() -> Router<Arc<AppState>> {
.route("/api/whatsapp/send", post(send_message)) .route("/api/whatsapp/send", post(send_message))
} }
/// Resolve bot_id string to Uuid. /// Resolve bot_id string to Uuid.
/// - "default" → returns UUID of the default bot /// - "default" → returns UUID of the default bot
/// - Valid UUID string → returns the UUID /// - Valid UUID string → returns the UUID
@ -525,11 +527,32 @@ async fn process_incoming_message(
effective_bot_id = routed_bot_id; effective_bot_id = routed_bot_id;
set_cached_bot_for_phone(&state, &phone, routed_bot_id).await; set_cached_bot_for_phone(&state, &phone, routed_bot_id).await;
// Get or create session for the new bot
let (session, is_new) = match find_or_create_session(&state, &effective_bot_id, &phone, &name).await {
Ok(s) => s,
Err(e) => {
error!("Failed to create session for routed bot: {}", e);
return Ok(());
}
};
// Clear start.bas execution flag for new bot's session
if let Some(cache) = &state.cache {
if let Ok(mut conn) = cache.get_multiplexed_async_connection().await {
let key = format!("start_bas_executed:{}", session.id);
let _: Result<(), redis::RedisError> = redis::cmd("DEL")
.arg(&key)
.query_async(&mut conn)
.await;
debug!("Cleared start.bas flag {} after bot switch to {}", key, routed_bot_id);
}
}
// Send confirmation message // Send confirmation message
let adapter = WhatsAppAdapter::new(state.conn.clone(), effective_bot_id); let adapter = WhatsAppAdapter::new(state.conn.clone(), effective_bot_id);
let bot_response = BotResponse { let bot_response = BotResponse {
bot_id: effective_bot_id.to_string(), bot_id: effective_bot_id.to_string(),
session_id: Uuid::nil().to_string(), session_id: session.id.to_string(),
user_id: phone.clone(), user_id: phone.clone(),
channel: "whatsapp".to_string(), channel: "whatsapp".to_string(),
content: format!("✅ Bot alterado para: {}", content), content: format!("✅ Bot alterado para: {}", content),
@ -544,6 +567,13 @@ async fn process_incoming_message(
if let Err(e) = adapter.send_message(bot_response).await { if let Err(e) = adapter.send_message(bot_response).await {
error!("Failed to send routing confirmation: {}", e); error!("Failed to send routing confirmation: {}", e);
} }
// Execute start.bas immediately by calling route_to_bot
info!("Executing start.bas for bot '{}' via route_to_bot", routed_bot_id);
if let Err(e) = route_to_bot(&state, &session, "", is_new).await {
error!("Failed to execute start.bas for bot switch: {}", e);
}
return Ok(()); return Ok(());
} }