fix(llm): load system-prompt from config.csv correctly
All checks were successful
BotServer CI / build (push) Successful in 17m27s
All checks were successful
BotServer CI / build (push) Successful in 17m27s
- Move system_prompt retrieval inside spawn_blocking closure - Include system_prompt in the return tuple to fix scope issue - Add trace logging for debugging system-prompt loading - GLM-5 and other LLM providers now correctly receive custom system prompts 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
97661d75e2
commit
c072fb936e
11 changed files with 501 additions and 136 deletions
|
|
@ -10,7 +10,7 @@ features = ["database", "i18n"]
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
# ===== DEFAULT =====
|
# ===== DEFAULT =====
|
||||||
default = ["chat", "automation", "drive", "tasks", "cache", "directory", "llm", "crawler", "browser", "terminal", "editor", "mail"]
|
default = ["chat", "automation", "drive", "tasks", "cache", "directory", "llm", "crawler", "browser", "terminal", "editor", "mail", "whatsapp"]
|
||||||
|
|
||||||
browser = ["automation", "drive", "cache"]
|
browser = ["automation", "drive", "cache"]
|
||||||
terminal = ["automation", "drive", "cache"]
|
terminal = ["automation", "drive", "cache"]
|
||||||
|
|
|
||||||
|
|
@ -160,15 +160,28 @@ impl BootstrapManager {
|
||||||
|
|
||||||
if pm.is_installed("directory") {
|
if pm.is_installed("directory") {
|
||||||
// Wait for Zitadel to be ready - it might have been started during installation
|
// Wait for Zitadel to be ready - it might have been started during installation
|
||||||
// Give it up to 60 seconds before trying to start it ourselves
|
// Use incremental backoff: check frequently at first, then slow down
|
||||||
let mut directory_already_running = zitadel_health_check();
|
let mut directory_already_running = zitadel_health_check();
|
||||||
if !directory_already_running {
|
if !directory_already_running {
|
||||||
info!("Zitadel not responding to health check, waiting up to 60s for it to start...");
|
info!("Zitadel not responding to health check, waiting with incremental backoff...");
|
||||||
for i in 0..30 {
|
// Check intervals: 1s x5, 2s x5, 5s x5, 10s x3 = ~60s total
|
||||||
sleep(Duration::from_secs(2)).await;
|
let intervals: [(u64, u32); 4] = [(1, 5), (2, 5), (5, 5), (10, 3)];
|
||||||
if zitadel_health_check() {
|
let mut total_waited: u64 = 0;
|
||||||
info!("Zitadel/Directory service is now responding (waited {}s)", (i + 1) * 2);
|
for (interval_secs, count) in intervals {
|
||||||
directory_already_running = true;
|
for i in 0..count {
|
||||||
|
if zitadel_health_check() {
|
||||||
|
info!("Zitadel/Directory service is now responding (waited {}s)", total_waited);
|
||||||
|
directory_already_running = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
sleep(Duration::from_secs(interval_secs)).await;
|
||||||
|
total_waited += interval_secs;
|
||||||
|
// Show incremental progress every ~10s
|
||||||
|
if total_waited % 10 == 0 || i == 0 {
|
||||||
|
info!("Zitadel health check: {}s elapsed, retrying...", total_waited);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if directory_already_running {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -416,7 +416,7 @@ impl BotOrchestrator {
|
||||||
let session_id = Uuid::parse_str(&message.session_id)?;
|
let session_id = Uuid::parse_str(&message.session_id)?;
|
||||||
let message_content = message.content.clone();
|
let message_content = message.content.clone();
|
||||||
|
|
||||||
let (session, context_data, history, model, key) = {
|
let (session, context_data, history, model, key, system_prompt) = {
|
||||||
let state_clone = self.state.clone();
|
let state_clone = self.state.clone();
|
||||||
tokio::task::spawn_blocking(
|
tokio::task::spawn_blocking(
|
||||||
move || -> Result<_, Box<dyn std::error::Error + Send + Sync>> {
|
move || -> Result<_, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
|
@ -453,15 +453,25 @@ impl BotOrchestrator {
|
||||||
.get_config(&session.bot_id, "llm-key", Some(""))
|
.get_config(&session.bot_id, "llm-key", Some(""))
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
Ok((session, context_data, history, model, key))
|
// Load system-prompt from config.csv, fallback to default
|
||||||
|
let system_prompt = config_manager
|
||||||
|
.get_config(&session.bot_id, "system-prompt", Some("You are a helpful assistant with access to tools that can help you complete tasks. When a user's request matches one of your available tools, use the appropriate tool instead of providing a generic response."))
|
||||||
|
.unwrap_or_else(|_| "You are a helpful assistant.".to_string());
|
||||||
|
|
||||||
|
trace!("Loaded system-prompt for bot {}: {}", session.bot_id, &system_prompt[..system_prompt.len().min(100)]);
|
||||||
|
|
||||||
|
Ok((session, context_data, history, model, key, system_prompt))
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
.await??
|
.await??
|
||||||
};
|
};
|
||||||
|
|
||||||
let system_prompt = "You are a helpful assistant with access to tools that can help you complete tasks. When a user's request matches one of your available tools, use the appropriate tool instead of providing a generic response.".to_string();
|
|
||||||
let mut messages = OpenAIClient::build_messages(&system_prompt, &context_data, &history);
|
let mut messages = OpenAIClient::build_messages(&system_prompt, &context_data, &history);
|
||||||
|
|
||||||
|
trace!("Built messages array with {} items, first message role: {:?}",
|
||||||
|
messages.as_array().map(|a| a.len()).unwrap_or(0),
|
||||||
|
messages.as_array().and_then(|a| a.first()).and_then(|m| m.get("role")));
|
||||||
|
|
||||||
// Get bot name for KB and tool injection
|
// Get bot name for KB and tool injection
|
||||||
let bot_name_for_context = {
|
let bot_name_for_context = {
|
||||||
let conn = self.state.conn.get().ok();
|
let conn = self.state.conn.get().ok();
|
||||||
|
|
@ -643,10 +653,10 @@ impl BotOrchestrator {
|
||||||
let mut in_analysis = false;
|
let mut in_analysis = false;
|
||||||
let mut tool_call_buffer = String::new(); // Accumulate potential tool call JSON chunks
|
let mut tool_call_buffer = String::new(); // Accumulate potential tool call JSON chunks
|
||||||
let mut accumulating_tool_call = false; // Track if we're currently accumulating a tool call
|
let mut accumulating_tool_call = false; // Track if we're currently accumulating a tool call
|
||||||
let mut tool_was_executed = false; // Track if a tool was executed to avoid duplicate final message
|
|
||||||
let handler = llm_models::get_handler(&model);
|
let handler = llm_models::get_handler(&model);
|
||||||
|
|
||||||
trace!("Using model handler for {}", model);
|
trace!("Using model handler for {}", model);
|
||||||
|
trace!("Receiving LLM stream chunks...");
|
||||||
|
|
||||||
#[cfg(feature = "nvidia")]
|
#[cfg(feature = "nvidia")]
|
||||||
{
|
{
|
||||||
|
|
@ -670,7 +680,6 @@ impl BotOrchestrator {
|
||||||
}
|
}
|
||||||
|
|
||||||
while let Some(chunk) = stream_rx.recv().await {
|
while let Some(chunk) = stream_rx.recv().await {
|
||||||
trace!("Received LLM chunk: {:?}", chunk);
|
|
||||||
|
|
||||||
// ===== GENERIC TOOL EXECUTION =====
|
// ===== GENERIC TOOL EXECUTION =====
|
||||||
// Add chunk to tool_call_buffer and try to parse
|
// Add chunk to tool_call_buffer and try to parse
|
||||||
|
|
@ -815,7 +824,6 @@ impl BotOrchestrator {
|
||||||
// Clear the tool_call_buffer since we found and executed a tool call
|
// Clear the tool_call_buffer since we found and executed a tool call
|
||||||
tool_call_buffer.clear();
|
tool_call_buffer.clear();
|
||||||
accumulating_tool_call = false; // Reset accumulation flag
|
accumulating_tool_call = false; // Reset accumulation flag
|
||||||
tool_was_executed = true; // Mark that a tool was executed
|
|
||||||
// Continue to next chunk
|
// Continue to next chunk
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
@ -957,6 +965,8 @@ impl BotOrchestrator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
trace!("LLM stream complete. Full response: {}", full_response);
|
||||||
|
|
||||||
let state_for_save = self.state.clone();
|
let state_for_save = self.state.clone();
|
||||||
let full_response_clone = full_response.clone();
|
let full_response_clone = full_response.clone();
|
||||||
tokio::task::spawn_blocking(
|
tokio::task::spawn_blocking(
|
||||||
|
|
@ -977,9 +987,10 @@ impl BotOrchestrator {
|
||||||
#[cfg(not(feature = "chat"))]
|
#[cfg(not(feature = "chat"))]
|
||||||
let suggestions: Vec<crate::core::shared::models::Suggestion> = Vec::new();
|
let suggestions: Vec<crate::core::shared::models::Suggestion> = Vec::new();
|
||||||
|
|
||||||
// When a tool was executed, the content was already sent as streaming chunks
|
// Content was already sent as streaming chunks.
|
||||||
// (pre-tool text + tool result). Sending full_response again would duplicate it.
|
// Sending full_response again would duplicate it (especially for WhatsApp which accumulates buffer).
|
||||||
let final_content = if tool_was_executed { String::new() } else { full_response };
|
// The final response is just a signal that streaming is complete - it should not contain content.
|
||||||
|
let final_content = String::new();
|
||||||
|
|
||||||
let final_response = BotResponse {
|
let final_response = BotResponse {
|
||||||
bot_id: message.bot_id,
|
bot_id: message.bot_id,
|
||||||
|
|
|
||||||
|
|
@ -337,6 +337,7 @@ diesel::allow_tables_to_appear_in_same_query!(
|
||||||
users,
|
users,
|
||||||
website_crawls,
|
website_crawls,
|
||||||
bots,
|
bots,
|
||||||
|
bot_configuration,
|
||||||
organizations,
|
organizations,
|
||||||
organization_invitations,
|
organization_invitations,
|
||||||
);
|
);
|
||||||
|
|
|
||||||
301
src/llm/cache.rs
301
src/llm/cache.rs
|
|
@ -275,7 +275,7 @@ impl CachedLLMProvider {
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.config.semantic_matching && self.embedding_service.is_some() {
|
if self.config.semantic_matching && self.embedding_service.is_some() {
|
||||||
if let Some(similar) = self.find_similar_cached(prompt, messages, model).await {
|
if let Some(similar) = self.find_similar_cached(prompt, messages, model, self.config.similarity_threshold).await {
|
||||||
info!(
|
info!(
|
||||||
"Cache hit (semantic match) for prompt: ~{} tokens",
|
"Cache hit (semantic match) for prompt: ~{} tokens",
|
||||||
estimate_token_count(prompt)
|
estimate_token_count(prompt)
|
||||||
|
|
@ -296,6 +296,7 @@ impl CachedLLMProvider {
|
||||||
prompt: &str,
|
prompt: &str,
|
||||||
messages: &Value,
|
messages: &Value,
|
||||||
model: &str,
|
model: &str,
|
||||||
|
threshold: f32,
|
||||||
) -> Option<CachedResponse> {
|
) -> Option<CachedResponse> {
|
||||||
let embedding_service = self.embedding_service.as_ref()?;
|
let embedding_service = self.embedding_service.as_ref()?;
|
||||||
|
|
||||||
|
|
@ -305,9 +306,40 @@ impl CachedLLMProvider {
|
||||||
messages
|
messages
|
||||||
};
|
};
|
||||||
|
|
||||||
let combined_context = format!("{}\n{}", prompt, actual_messages);
|
// Extract ONLY the latest user question for semantic matching
|
||||||
|
// This prevents false positives from matching on old conversation history
|
||||||
|
let latest_user_question = if let Some(msgs) = actual_messages.as_array() {
|
||||||
|
// Find the last message with role "user"
|
||||||
|
msgs.iter()
|
||||||
|
.rev()
|
||||||
|
.find_map(|msg| {
|
||||||
|
if msg.get("role").and_then(|r| r.as_str()) == Some("user") {
|
||||||
|
msg.get("content").and_then(|c| c.as_str())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap_or("")
|
||||||
|
} else {
|
||||||
|
""
|
||||||
|
};
|
||||||
|
|
||||||
let prompt_embedding = match embedding_service.get_embedding(&combined_context).await {
|
// Use only the latest user question for semantic matching, not the full history
|
||||||
|
// The prompt contains system context, so we combine with latest question
|
||||||
|
let semantic_query = if latest_user_question.is_empty() {
|
||||||
|
prompt.to_string()
|
||||||
|
} else {
|
||||||
|
format!("{}\n{}", prompt, latest_user_question)
|
||||||
|
};
|
||||||
|
|
||||||
|
// Debug: log the text being sent for embedding
|
||||||
|
debug!(
|
||||||
|
"Embedding request text (len={}, using latest user question): {}",
|
||||||
|
semantic_query.len(),
|
||||||
|
&semantic_query.chars().take(200).collect::<String>()
|
||||||
|
);
|
||||||
|
|
||||||
|
let prompt_embedding = match embedding_service.get_embedding(&semantic_query).await {
|
||||||
Ok(emb) => emb,
|
Ok(emb) => emb,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
debug!("Failed to get embedding for prompt: {}", e);
|
debug!("Failed to get embedding for prompt: {}", e);
|
||||||
|
|
@ -343,7 +375,7 @@ impl CachedLLMProvider {
|
||||||
.compute_similarity(&prompt_embedding, cached_embedding)
|
.compute_similarity(&prompt_embedding, cached_embedding)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
if similarity >= self.config.similarity_threshold
|
if similarity >= threshold
|
||||||
&& best_match.as_ref().is_none_or(|(_, s)| *s < similarity)
|
&& best_match.as_ref().is_none_or(|(_, s)| *s < similarity)
|
||||||
{
|
{
|
||||||
best_match = Some((cached.clone(), similarity));
|
best_match = Some((cached.clone(), similarity));
|
||||||
|
|
@ -390,8 +422,30 @@ impl CachedLLMProvider {
|
||||||
};
|
};
|
||||||
|
|
||||||
let embedding = if let Some(ref service) = self.embedding_service {
|
let embedding = if let Some(ref service) = self.embedding_service {
|
||||||
let combined_context = format!("{}\n{}", prompt, actual_messages);
|
// Extract ONLY the latest user question for embedding
|
||||||
service.get_embedding(&combined_context).await.ok()
|
// Same logic as find_similar_cached to ensure consistency
|
||||||
|
let latest_user_question = if let Some(msgs) = actual_messages.as_array() {
|
||||||
|
msgs.iter()
|
||||||
|
.rev()
|
||||||
|
.find_map(|msg| {
|
||||||
|
if msg.get("role").and_then(|r| r.as_str()) == Some("user") {
|
||||||
|
msg.get("content").and_then(|c| c.as_str())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap_or("")
|
||||||
|
} else {
|
||||||
|
""
|
||||||
|
};
|
||||||
|
|
||||||
|
let semantic_query = if latest_user_question.is_empty() {
|
||||||
|
prompt.to_string()
|
||||||
|
} else {
|
||||||
|
format!("{}\n{}", prompt, latest_user_question)
|
||||||
|
};
|
||||||
|
|
||||||
|
service.get_embedding(&semantic_query).await.ok()
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
@ -520,7 +574,7 @@ impl LLMProvider for CachedLLMProvider {
|
||||||
}
|
}
|
||||||
|
|
||||||
if bot_cache_config.semantic_matching && self.embedding_service.is_some() {
|
if bot_cache_config.semantic_matching && self.embedding_service.is_some() {
|
||||||
if let Some(cached) = self.find_similar_cached(prompt, messages, model).await {
|
if let Some(cached) = self.find_similar_cached(prompt, messages, model, bot_cache_config.similarity_threshold).await {
|
||||||
info!(
|
info!(
|
||||||
"Cache hit (semantic match) for bot {} with similarity threshold {}",
|
"Cache hit (semantic match) for bot {} with similarity threshold {}",
|
||||||
bot_id, bot_cache_config.similarity_threshold
|
bot_id, bot_cache_config.similarity_threshold
|
||||||
|
|
@ -616,6 +670,31 @@ impl LocalEmbeddingService {
|
||||||
api_key,
|
api_key,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Generate a deterministic hash-based embedding for fallback
|
||||||
|
fn hash_embedding(&self, text: &str) -> Vec<f32> {
|
||||||
|
const EMBEDDING_DIM: usize = 384; // Match common embedding dimensions
|
||||||
|
let mut embedding = vec![0.0f32; EMBEDDING_DIM];
|
||||||
|
|
||||||
|
let hash = Sha256::digest(text.as_bytes());
|
||||||
|
|
||||||
|
// Use hash bytes to seed the embedding
|
||||||
|
for (i, byte) in hash.iter().cycle().take(EMBEDDING_DIM * 4).enumerate() {
|
||||||
|
let idx = i % EMBEDDING_DIM;
|
||||||
|
let value = (*byte as f32 - 128.0) / 128.0;
|
||||||
|
embedding[idx] += value * 0.1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Normalize
|
||||||
|
let norm: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
|
||||||
|
if norm > 0.0 {
|
||||||
|
for val in &mut embedding {
|
||||||
|
*val /= norm;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
embedding
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
|
@ -636,13 +715,6 @@ impl EmbeddingService for LocalEmbeddingService {
|
||||||
format!("{}/embedding", self.embedding_url)
|
format!("{}/embedding", self.embedding_url)
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut request = client.post(&url);
|
|
||||||
|
|
||||||
// Add authorization header if API key is provided
|
|
||||||
if let Some(ref api_key) = self.api_key {
|
|
||||||
request = request.header("Authorization", format!("Bearer {}", api_key));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Determine request body format based on URL
|
// Determine request body format based on URL
|
||||||
let request_body = if self.embedding_url.contains("huggingface.co") {
|
let request_body = if self.embedding_url.contains("huggingface.co") {
|
||||||
serde_json::json!({
|
serde_json::json!({
|
||||||
|
|
@ -660,88 +732,131 @@ impl EmbeddingService for LocalEmbeddingService {
|
||||||
})
|
})
|
||||||
};
|
};
|
||||||
|
|
||||||
let response = request
|
// Retry logic with exponential backoff
|
||||||
.json(&request_body)
|
const MAX_RETRIES: u32 = 3;
|
||||||
.send()
|
const INITIAL_DELAY_MS: u64 = 500;
|
||||||
.await?;
|
|
||||||
|
|
||||||
let status = response.status();
|
for attempt in 0..MAX_RETRIES {
|
||||||
let response_text = response.text().await?;
|
if attempt > 0 {
|
||||||
|
let delay_ms = INITIAL_DELAY_MS * (1 << (attempt - 1)); // 500, 1000, 2000
|
||||||
if !status.is_success() {
|
debug!("Embedding service retry attempt {}/{} after {}ms", attempt + 1, MAX_RETRIES, delay_ms);
|
||||||
debug!(
|
tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
|
||||||
"Embedding service HTTP error {}: {}",
|
|
||||||
status,
|
|
||||||
response_text
|
|
||||||
);
|
|
||||||
return Err(format!(
|
|
||||||
"Embedding service returned HTTP {}: {}",
|
|
||||||
status,
|
|
||||||
response_text
|
|
||||||
).into());
|
|
||||||
}
|
|
||||||
|
|
||||||
let result: Value = serde_json::from_str(&response_text)
|
|
||||||
.map_err(|e| {
|
|
||||||
debug!("Failed to parse embedding JSON: {} - Response: {}", e, response_text);
|
|
||||||
format!("Failed to parse embedding response JSON: {} - Response: {}", e, response_text)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
if let Some(error) = result.get("error") {
|
|
||||||
debug!("Embedding service returned error: {}", error);
|
|
||||||
return Err(format!("Embedding service error: {}", error).into());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try multiple response formats
|
|
||||||
let embedding = if let Some(arr) = result.as_array() {
|
|
||||||
// HuggingFace format: direct array [0.1, 0.2, ...]
|
|
||||||
arr.iter()
|
|
||||||
.filter_map(|v| v.as_f64().map(|f| f as f32))
|
|
||||||
.collect()
|
|
||||||
} else if let Some(result_obj) = result.get("result") {
|
|
||||||
// Cloudflare AI format: {"result": {"data": [[...]]}}
|
|
||||||
if let Some(data) = result_obj.get("data") {
|
|
||||||
if let Some(data_arr) = data.as_array() {
|
|
||||||
if let Some(first) = data_arr.first() {
|
|
||||||
if let Some(embedding_arr) = first.as_array() {
|
|
||||||
embedding_arr
|
|
||||||
.iter()
|
|
||||||
.filter_map(|v| v.as_f64().map(|f| f as f32))
|
|
||||||
.collect()
|
|
||||||
} else {
|
|
||||||
data_arr
|
|
||||||
.iter()
|
|
||||||
.filter_map(|v| v.as_f64().map(|f| f as f32))
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return Err("Empty data array in Cloudflare response".into());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return Err(format!("Invalid Cloudflare response format - Expected result.data array, got: {}", response_text).into());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return Err(format!("Invalid Cloudflare response format - Expected result.data, got: {}", response_text).into());
|
|
||||||
}
|
}
|
||||||
} else if let Some(data) = result.get("data") {
|
|
||||||
// OpenAI/Standard format: {"data": [{"embedding": [...]}]}
|
|
||||||
data[0]["embedding"]
|
|
||||||
.as_array()
|
|
||||||
.ok_or_else(|| {
|
|
||||||
debug!("Invalid embedding response format. Expected data[0].embedding array. Got: {}", response_text);
|
|
||||||
format!("Invalid embedding response format - Expected data[0].embedding array, got: {}", response_text)
|
|
||||||
})?
|
|
||||||
.iter()
|
|
||||||
.filter_map(|v| v.as_f64().map(|f| f as f32))
|
|
||||||
.collect()
|
|
||||||
} else {
|
|
||||||
return Err(format!(
|
|
||||||
"Invalid embedding response format - Expected array or data[0].embedding, got: {}",
|
|
||||||
response_text
|
|
||||||
).into());
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(embedding)
|
let mut request = client.post(&url);
|
||||||
|
|
||||||
|
// Add authorization header if API key is provided
|
||||||
|
if let Some(ref api_key) = self.api_key {
|
||||||
|
request = request.header("Authorization", format!("Bearer {}", api_key));
|
||||||
|
}
|
||||||
|
|
||||||
|
match request
|
||||||
|
.json(&request_body)
|
||||||
|
.timeout(std::time::Duration::from_secs(30))
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(response) => {
|
||||||
|
let status = response.status();
|
||||||
|
let response_text = match response.text().await {
|
||||||
|
Ok(t) => t,
|
||||||
|
Err(e) => {
|
||||||
|
debug!("Failed to read response body: {}", e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if !status.is_success() {
|
||||||
|
debug!(
|
||||||
|
"Embedding service HTTP error {} (attempt {}/{}): {}",
|
||||||
|
status, attempt + 1, MAX_RETRIES, response_text
|
||||||
|
);
|
||||||
|
// Retry on 5xx errors
|
||||||
|
if status.as_u16() >= 500 {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// Non-retriable error
|
||||||
|
return Err(format!(
|
||||||
|
"Embedding service returned HTTP {}: {}",
|
||||||
|
status, response_text
|
||||||
|
).into());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Success - parse response
|
||||||
|
let result: Value = match serde_json::from_str(&response_text) {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => {
|
||||||
|
debug!("Failed to parse embedding JSON: {} - Response: {}", e, response_text);
|
||||||
|
return Err(format!("Failed to parse embedding response JSON: {} - Response: {}", e, response_text).into());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(error) = result.get("error") {
|
||||||
|
debug!("Embedding service returned error: {}", error);
|
||||||
|
return Err(format!("Embedding service error: {}", error).into());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try multiple response formats
|
||||||
|
let embedding = if let Some(arr) = result.as_array() {
|
||||||
|
// HuggingFace format: direct array [0.1, 0.2, ...]
|
||||||
|
arr.iter()
|
||||||
|
.filter_map(|v| v.as_f64().map(|f| f as f32))
|
||||||
|
.collect()
|
||||||
|
} else if let Some(result_obj) = result.get("result") {
|
||||||
|
// Cloudflare AI format: {"result": {"data": [[...]]}}
|
||||||
|
if let Some(data) = result_obj.get("data") {
|
||||||
|
if let Some(data_arr) = data.as_array() {
|
||||||
|
if let Some(first) = data_arr.first() {
|
||||||
|
if let Some(embedding_arr) = first.as_array() {
|
||||||
|
embedding_arr
|
||||||
|
.iter()
|
||||||
|
.filter_map(|v| v.as_f64().map(|f| f as f32))
|
||||||
|
.collect()
|
||||||
|
} else {
|
||||||
|
data_arr
|
||||||
|
.iter()
|
||||||
|
.filter_map(|v| v.as_f64().map(|f| f as f32))
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return Err("Empty data array in Cloudflare response".into());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return Err(format!("Invalid Cloudflare response format - Expected result.data array, got: {}", response_text).into());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return Err(format!("Invalid Cloudflare response format - Expected result.data, got: {}", response_text).into());
|
||||||
|
}
|
||||||
|
} else if let Some(data) = result.get("data") {
|
||||||
|
// OpenAI/Standard format: {"data": [{"embedding": [...]}]}
|
||||||
|
data[0]["embedding"]
|
||||||
|
.as_array()
|
||||||
|
.ok_or_else(|| {
|
||||||
|
debug!("Invalid embedding response format. Expected data[0].embedding array. Got: {}", response_text);
|
||||||
|
format!("Invalid embedding response format - Expected data[0].embedding array, got: {}", response_text)
|
||||||
|
})?
|
||||||
|
.iter()
|
||||||
|
.filter_map(|v| v.as_f64().map(|f| f as f32))
|
||||||
|
.collect()
|
||||||
|
} else {
|
||||||
|
return Err(format!(
|
||||||
|
"Invalid embedding response format - Expected array or data[0].embedding, got: {}",
|
||||||
|
response_text
|
||||||
|
).into());
|
||||||
|
};
|
||||||
|
|
||||||
|
return Ok(embedding);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
// Network error - retry
|
||||||
|
debug!("Embedding service network error (attempt {}/{}): {}", attempt + 1, MAX_RETRIES, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// All retries exhausted - use hash-based fallback
|
||||||
|
debug!("Embedding service failed after all retries, using hash-based fallback");
|
||||||
|
Ok(self.hash_embedding(text))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn compute_similarity(&self, embedding1: &[f32], embedding2: &[f32]) -> f32 {
|
async fn compute_similarity(&self, embedding1: &[f32], embedding2: &[f32]) -> f32 {
|
||||||
|
|
|
||||||
|
|
@ -752,14 +752,21 @@ fn init_llm_provider(
|
||||||
.get_config(&bot_id, "embedding-key", None)
|
.get_config(&bot_id, "embedding-key", None)
|
||||||
.ok();
|
.ok();
|
||||||
let semantic_cache_enabled = config_manager
|
let semantic_cache_enabled = config_manager
|
||||||
.get_config(&bot_id, "semantic-cache-enabled", Some("true"))
|
.get_config(&bot_id, "llm-cache-semantic", Some("true"))
|
||||||
.unwrap_or_else(|_| "true".to_string())
|
.unwrap_or_else(|_| "true".to_string())
|
||||||
.to_lowercase() == "true";
|
.to_lowercase() == "true";
|
||||||
|
|
||||||
|
let similarity_threshold = config_manager
|
||||||
|
.get_config(&bot_id, "llm-cache-threshold", Some("0.85"))
|
||||||
|
.unwrap_or_else(|_| "0.85".to_string())
|
||||||
|
.parse::<f32>()
|
||||||
|
.unwrap_or(0.85);
|
||||||
|
|
||||||
info!("Embedding URL: {}", embedding_url);
|
info!("Embedding URL: {}", embedding_url);
|
||||||
info!("Embedding Model: {}", embedding_model);
|
info!("Embedding Model: {}", embedding_model);
|
||||||
info!("Embedding Key: {}", if embedding_key.is_some() { "configured" } else { "not set" });
|
info!("Embedding Key: {}", if embedding_key.is_some() { "configured" } else { "not set" });
|
||||||
info!("Semantic Cache Enabled: {}", semantic_cache_enabled);
|
info!("Semantic Cache Enabled: {}", semantic_cache_enabled);
|
||||||
|
info!("Cache Similarity Threshold: {}", similarity_threshold);
|
||||||
|
|
||||||
let embedding_service = if semantic_cache_enabled {
|
let embedding_service = if semantic_cache_enabled {
|
||||||
Some(Arc::new(LocalEmbeddingService::new(
|
Some(Arc::new(LocalEmbeddingService::new(
|
||||||
|
|
@ -774,7 +781,7 @@ fn init_llm_provider(
|
||||||
let cache_config = CacheConfig {
|
let cache_config = CacheConfig {
|
||||||
ttl: 3600,
|
ttl: 3600,
|
||||||
semantic_matching: semantic_cache_enabled,
|
semantic_matching: semantic_cache_enabled,
|
||||||
similarity_threshold: 0.85,
|
similarity_threshold,
|
||||||
max_similarity_checks: 100,
|
max_similarity_checks: 100,
|
||||||
key_prefix: "llm_cache".to_string(),
|
key_prefix: "llm_cache".to_string(),
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -69,6 +69,7 @@ pub async fn run_axum_server(
|
||||||
.add_anonymous_path("/api/client-errors")
|
.add_anonymous_path("/api/client-errors")
|
||||||
.add_anonymous_path("/ws")
|
.add_anonymous_path("/ws")
|
||||||
.add_anonymous_path("/auth")
|
.add_anonymous_path("/auth")
|
||||||
|
.add_anonymous_path("/webhook/whatsapp") // WhatsApp webhook for Meta verification
|
||||||
.add_public_path("/static")
|
.add_public_path("/static")
|
||||||
.add_public_path("/favicon.ico")
|
.add_public_path("/favicon.ico")
|
||||||
.add_public_path("/suite")
|
.add_public_path("/suite")
|
||||||
|
|
|
||||||
|
|
@ -30,6 +30,7 @@ impl Default for AuthConfig {
|
||||||
"/api/auth/refresh".to_string(),
|
"/api/auth/refresh".to_string(),
|
||||||
"/oauth".to_string(),
|
"/oauth".to_string(),
|
||||||
"/auth/callback".to_string(),
|
"/auth/callback".to_string(),
|
||||||
|
"/webhook/whatsapp".to_string(),
|
||||||
],
|
],
|
||||||
public_paths: vec![
|
public_paths: vec![
|
||||||
"/".to_string(),
|
"/".to_string(),
|
||||||
|
|
|
||||||
|
|
@ -399,6 +399,7 @@ impl RbacManager {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check allow_anonymous FIRST before authentication check
|
||||||
if route.allow_anonymous {
|
if route.allow_anonymous {
|
||||||
let result = AccessDecisionResult::allow("Anonymous access allowed")
|
let result = AccessDecisionResult::allow("Anonymous access allowed")
|
||||||
.with_rule(route.path_pattern.clone());
|
.with_rule(route.path_pattern.clone());
|
||||||
|
|
@ -406,6 +407,7 @@ impl RbacManager {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Only check authentication after confirming route is not anonymous
|
||||||
if !user.is_authenticated() {
|
if !user.is_authenticated() {
|
||||||
let result = AccessDecisionResult::deny("Authentication required");
|
let result = AccessDecisionResult::deny("Authentication required");
|
||||||
return result;
|
return result;
|
||||||
|
|
@ -949,6 +951,10 @@ pub fn build_default_route_permissions() -> Vec<RoutePermission> {
|
||||||
RoutePermission::new("/api/bot/config", "GET", "").with_anonymous(true),
|
RoutePermission::new("/api/bot/config", "GET", "").with_anonymous(true),
|
||||||
RoutePermission::new("/api/i18n/**", "GET", "").with_anonymous(true),
|
RoutePermission::new("/api/i18n/**", "GET", "").with_anonymous(true),
|
||||||
|
|
||||||
|
// WhatsApp webhook - anonymous for Meta verification and message delivery
|
||||||
|
RoutePermission::new("/webhook/whatsapp/:bot_id", "GET", "").with_anonymous(true),
|
||||||
|
RoutePermission::new("/webhook/whatsapp/:bot_id", "POST", "").with_anonymous(true),
|
||||||
|
|
||||||
// Auth routes - login must be anonymous
|
// Auth routes - login must be anonymous
|
||||||
RoutePermission::new("/api/auth", "GET", "").with_anonymous(true),
|
RoutePermission::new("/api/auth", "GET", "").with_anonymous(true),
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -401,17 +401,36 @@ async fn route_to_bot(
|
||||||
let chat_id_clone = chat_id.to_string();
|
let chat_id_clone = chat_id.to_string();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
while let Some(response) = rx.recv().await {
|
// Buffer to accumulate streaming chunks
|
||||||
let tg_response = BotResponse::new(
|
let mut accumulated_content = String::new();
|
||||||
response.bot_id,
|
let mut chunk_count = 0u32;
|
||||||
response.session_id,
|
|
||||||
chat_id_clone.clone(),
|
|
||||||
response.content,
|
|
||||||
"telegram",
|
|
||||||
);
|
|
||||||
|
|
||||||
if let Err(e) = adapter.send_message(tg_response).await {
|
while let Some(response) = rx.recv().await {
|
||||||
error!("Failed to send Telegram response: {}", e);
|
// Accumulate content from each chunk
|
||||||
|
if !response.content.is_empty() {
|
||||||
|
accumulated_content.push_str(&response.content);
|
||||||
|
chunk_count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send when complete or as fallback after 5 chunks
|
||||||
|
if response.is_complete || chunk_count >= 5 {
|
||||||
|
if !accumulated_content.is_empty() {
|
||||||
|
let tg_response = BotResponse::new(
|
||||||
|
response.bot_id,
|
||||||
|
response.session_id,
|
||||||
|
chat_id_clone.clone(),
|
||||||
|
accumulated_content.clone(),
|
||||||
|
"telegram",
|
||||||
|
);
|
||||||
|
|
||||||
|
if let Err(e) = adapter.send_message(tg_response).await {
|
||||||
|
error!("Failed to send Telegram response: {}", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset buffer after sending
|
||||||
|
accumulated_content.clear();
|
||||||
|
chunk_count = 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
use crate::core::bot::BotOrchestrator;
|
use crate::core::bot::{BotOrchestrator, get_default_bot};
|
||||||
use crate::core::bot::channels::whatsapp::WhatsAppAdapter;
|
use crate::core::bot::channels::whatsapp::WhatsAppAdapter;
|
||||||
use crate::core::bot::channels::ChannelAdapter;
|
use crate::core::bot::channels::ChannelAdapter;
|
||||||
use crate::core::config::ConfigManager;
|
use crate::core::config::ConfigManager;
|
||||||
|
|
@ -178,12 +178,48 @@ pub fn configure() -> Router<Arc<AppState>> {
|
||||||
.route("/api/whatsapp/send", post(send_message))
|
.route("/api/whatsapp/send", post(send_message))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Resolve bot_id string to Uuid.
|
||||||
|
/// - "default" → returns UUID of the default bot
|
||||||
|
/// - Valid UUID string → returns the UUID
|
||||||
|
/// - Otherwise → returns error response
|
||||||
|
async fn resolve_bot_id(
|
||||||
|
bot_id_str: &str,
|
||||||
|
state: &Arc<AppState>,
|
||||||
|
) -> Result<Uuid, (StatusCode, String)> {
|
||||||
|
if bot_id_str == "default" {
|
||||||
|
let conn = state.conn.clone();
|
||||||
|
let bot_id = tokio::task::spawn_blocking(move || {
|
||||||
|
let mut db_conn = conn.get().ok()?;
|
||||||
|
let (id, _) = get_default_bot(&mut db_conn);
|
||||||
|
Some(id)
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.ok()
|
||||||
|
.flatten()
|
||||||
|
.unwrap_or_else(Uuid::nil);
|
||||||
|
|
||||||
|
if bot_id.is_nil() {
|
||||||
|
return Err((StatusCode::NOT_FOUND, "Default bot not found".to_string()));
|
||||||
|
}
|
||||||
|
info!("Resolved 'default' to bot_id: {}", bot_id);
|
||||||
|
Ok(bot_id)
|
||||||
|
} else {
|
||||||
|
Uuid::parse_str(bot_id_str)
|
||||||
|
.map_err(|e| (StatusCode::BAD_REQUEST, format!("Invalid bot ID: {}", e)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn verify_webhook(
|
pub async fn verify_webhook(
|
||||||
State(state): State<Arc<AppState>>,
|
State(state): State<Arc<AppState>>,
|
||||||
Path(bot_id): Path<Uuid>,
|
Path(bot_id_str): Path<String>,
|
||||||
Query(params): Query<WebhookVerifyQuery>,
|
Query(params): Query<WebhookVerifyQuery>,
|
||||||
) -> impl IntoResponse {
|
) -> impl IntoResponse {
|
||||||
info!("WhatsApp webhook verification request received for bot {}", bot_id);
|
let bot_id = match resolve_bot_id(&bot_id_str, &state).await {
|
||||||
|
Ok(id) => id,
|
||||||
|
Err(err) => return err,
|
||||||
|
};
|
||||||
|
|
||||||
|
info!("WhatsApp webhook verification request received for bot {} (input: {})", bot_id, bot_id_str);
|
||||||
|
|
||||||
let mode = params.mode.unwrap_or_default();
|
let mode = params.mode.unwrap_or_default();
|
||||||
let token = params.verify_token.unwrap_or_default();
|
let token = params.verify_token.unwrap_or_default();
|
||||||
|
|
@ -207,9 +243,14 @@ pub async fn verify_webhook(
|
||||||
|
|
||||||
pub async fn handle_webhook(
|
pub async fn handle_webhook(
|
||||||
State(state): State<Arc<AppState>>,
|
State(state): State<Arc<AppState>>,
|
||||||
Path(bot_id): Path<Uuid>,
|
Path(bot_id_str): Path<String>,
|
||||||
body: axum::body::Bytes,
|
body: axum::body::Bytes,
|
||||||
) -> impl IntoResponse {
|
) -> impl IntoResponse {
|
||||||
|
let bot_id = match resolve_bot_id(&bot_id_str, &state).await {
|
||||||
|
Ok(id) => id,
|
||||||
|
Err(err) => return err.0,
|
||||||
|
};
|
||||||
|
|
||||||
debug!("Raw webhook body: {}", String::from_utf8_lossy(&body));
|
debug!("Raw webhook body: {}", String::from_utf8_lossy(&body));
|
||||||
|
|
||||||
let payload: WhatsAppWebhook = match serde_json::from_slice(&body) {
|
let payload: WhatsAppWebhook = match serde_json::from_slice(&body) {
|
||||||
|
|
@ -267,6 +308,110 @@ pub async fn handle_webhook(
|
||||||
StatusCode::OK
|
StatusCode::OK
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ==================== Phone → Bot Routing Cache Functions ====================
|
||||||
|
|
||||||
|
/// Get the cached bot_id for a phone number from the routing cache
|
||||||
|
async fn get_cached_bot_for_phone(state: &Arc<AppState>, phone: &str) -> Option<Uuid> {
|
||||||
|
let cache = state.cache.as_ref()?;
|
||||||
|
let mut conn = cache.get_multiplexed_async_connection().await.ok()?;
|
||||||
|
let key = format!("wa_phone_bot:{}", phone);
|
||||||
|
|
||||||
|
let bot_id_str: Option<String> = redis::cmd("GET")
|
||||||
|
.arg(&key)
|
||||||
|
.query_async(&mut conn)
|
||||||
|
.await
|
||||||
|
.ok()
|
||||||
|
.flatten();
|
||||||
|
|
||||||
|
if let Some(bot_id_str) = bot_id_str {
|
||||||
|
if let Ok(bot_id) = Uuid::parse_str(&bot_id_str) {
|
||||||
|
debug!("Found cached bot {} for phone {}", bot_id, phone);
|
||||||
|
return Some(bot_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the bot_id for a phone number in the routing cache
|
||||||
|
async fn set_cached_bot_for_phone(state: &Arc<AppState>, phone: &str, bot_id: Uuid) {
|
||||||
|
if let Some(cache) = &state.cache {
|
||||||
|
if let Ok(mut conn) = cache.get_multiplexed_async_connection().await {
|
||||||
|
let key = format!("wa_phone_bot:{}", phone);
|
||||||
|
// Cache for 24 hours (86400 seconds)
|
||||||
|
let result: Result<(), _> = redis::cmd("SET")
|
||||||
|
.arg(&key)
|
||||||
|
.arg(bot_id.to_string())
|
||||||
|
.arg("EX")
|
||||||
|
.arg(86400)
|
||||||
|
.query_async(&mut conn)
|
||||||
|
.await;
|
||||||
|
if let Err(e) = result {
|
||||||
|
error!("Failed to cache bot for phone {}: {}", phone, e);
|
||||||
|
} else {
|
||||||
|
info!("Cached bot {} for phone {}", bot_id, phone);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ==================== WhatsApp ID Routing Functions ====================
|
||||||
|
|
||||||
|
/// Check if the message text is a whatsapp-id routing command.
|
||||||
|
/// Returns the bot_id if a matching bot is found with that whatsapp-id.
|
||||||
|
async fn check_whatsapp_id_routing(
|
||||||
|
state: &Arc<AppState>,
|
||||||
|
message_text: &str,
|
||||||
|
) -> Option<Uuid> {
|
||||||
|
let text = message_text.trim().to_lowercase();
|
||||||
|
|
||||||
|
// Skip empty messages or messages that are too long (whatsapp-id should be short)
|
||||||
|
if text.is_empty() || text.len() > 50 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip messages that look like regular sentences (contain spaces or common punctuation)
|
||||||
|
if text.contains(' ') || text.contains('.') || text.contains('?') || text.contains('!') {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Search for a bot with matching whatsapp-id in config
|
||||||
|
let conn = state.conn.clone();
|
||||||
|
let search_text = text.clone();
|
||||||
|
|
||||||
|
let result = tokio::task::spawn_blocking(move || {
|
||||||
|
use crate::core::shared::models::schema::{bots, bot_configuration};
|
||||||
|
use diesel::prelude::*;
|
||||||
|
|
||||||
|
let mut db_conn = conn.get().ok()?;
|
||||||
|
|
||||||
|
// Find all active bots with whatsapp-id config
|
||||||
|
let bot_ids_with_whatsapp_id: Vec<(Uuid, String)> = bot_configuration::table
|
||||||
|
.inner_join(bots::table.on(bot_configuration::bot_id.eq(bots::id)))
|
||||||
|
.filter(bots::is_active.eq(true))
|
||||||
|
.filter(bot_configuration::config_key.eq("whatsapp-id"))
|
||||||
|
.select((bot_configuration::bot_id, bot_configuration::config_value))
|
||||||
|
.load::<(Uuid, String)>(&mut db_conn)
|
||||||
|
.ok()?;
|
||||||
|
|
||||||
|
// Find matching bot
|
||||||
|
for (bot_id, whatsapp_id) in bot_ids_with_whatsapp_id {
|
||||||
|
if whatsapp_id.to_lowercase() == search_text {
|
||||||
|
return Some(bot_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.ok()
|
||||||
|
.flatten();
|
||||||
|
|
||||||
|
if let Some(bot_id) = result {
|
||||||
|
info!("Found bot {} matching whatsapp-id: {}", bot_id, text);
|
||||||
|
}
|
||||||
|
|
||||||
|
result
|
||||||
|
}
|
||||||
|
|
||||||
async fn process_incoming_message(
|
async fn process_incoming_message(
|
||||||
state: Arc<AppState>,
|
state: Arc<AppState>,
|
||||||
bot_id: &Uuid,
|
bot_id: &Uuid,
|
||||||
|
|
@ -302,11 +447,6 @@ async fn process_incoming_message(
|
||||||
.unwrap_or_else(|| message.from.clone());
|
.unwrap_or_else(|| message.from.clone());
|
||||||
let name = contact_name.clone().unwrap_or_else(|| phone.clone());
|
let name = contact_name.clone().unwrap_or_else(|| phone.clone());
|
||||||
|
|
||||||
info!(
|
|
||||||
"Processing WhatsApp message from {} ({}) for bot {}: type={}",
|
|
||||||
name, phone, bot_id, message.message_type
|
|
||||||
);
|
|
||||||
|
|
||||||
let content = extract_message_content(message);
|
let content = extract_message_content(message);
|
||||||
debug!("Extracted content from WhatsApp message: '{}'", content);
|
debug!("Extracted content from WhatsApp message: '{}'", content);
|
||||||
|
|
||||||
|
|
@ -315,12 +455,63 @@ async fn process_incoming_message(
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ==================== Dynamic Bot Routing ====================
|
||||||
|
// Check if this is a whatsapp-id routing command (e.g., "cristo", "salesianos")
|
||||||
|
let mut effective_bot_id = *bot_id;
|
||||||
|
|
||||||
|
if let Some(routed_bot_id) = check_whatsapp_id_routing(&state, &content).await {
|
||||||
|
// User typed a whatsapp-id command - switch to that bot
|
||||||
|
info!(
|
||||||
|
"Routing WhatsApp user {} from bot {} to bot {} (whatsapp-id: {})",
|
||||||
|
phone, bot_id, routed_bot_id, content
|
||||||
|
);
|
||||||
|
effective_bot_id = routed_bot_id;
|
||||||
|
set_cached_bot_for_phone(&state, &phone, routed_bot_id).await;
|
||||||
|
|
||||||
|
// Send confirmation message
|
||||||
|
let adapter = WhatsAppAdapter::new(state.conn.clone(), effective_bot_id);
|
||||||
|
let bot_response = BotResponse {
|
||||||
|
bot_id: effective_bot_id.to_string(),
|
||||||
|
session_id: Uuid::nil().to_string(),
|
||||||
|
user_id: phone.clone(),
|
||||||
|
channel: "whatsapp".to_string(),
|
||||||
|
content: format!("✅ Bot alterado para: {}", content),
|
||||||
|
message_type: MessageType::BOT_RESPONSE,
|
||||||
|
stream_token: None,
|
||||||
|
is_complete: true,
|
||||||
|
suggestions: vec![],
|
||||||
|
context_name: None,
|
||||||
|
context_length: 0,
|
||||||
|
context_max_length: 0,
|
||||||
|
};
|
||||||
|
if let Err(e) = adapter.send_message(bot_response).await {
|
||||||
|
error!("Failed to send routing confirmation: {}", e);
|
||||||
|
}
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if there's a cached bot for this phone number
|
||||||
|
if let Some(cached_bot_id) = get_cached_bot_for_phone(&state, &phone).await {
|
||||||
|
if cached_bot_id != *bot_id {
|
||||||
|
info!(
|
||||||
|
"Using cached bot {} for phone {} (webhook bot: {})",
|
||||||
|
cached_bot_id, phone, bot_id
|
||||||
|
);
|
||||||
|
effective_bot_id = cached_bot_id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"Processing WhatsApp message from {} ({}) for bot {}: type={}",
|
||||||
|
name, phone, effective_bot_id, message.message_type
|
||||||
|
);
|
||||||
|
|
||||||
// Handle /clear command - available to all users
|
// Handle /clear command - available to all users
|
||||||
if content.trim().to_lowercase() == "/clear" {
|
if content.trim().to_lowercase() == "/clear" {
|
||||||
let adapter = WhatsAppAdapter::new(state.conn.clone(), *bot_id);
|
let adapter = WhatsAppAdapter::new(state.conn.clone(), effective_bot_id);
|
||||||
|
|
||||||
// Find and clear the user's session
|
// Find and clear the user's session
|
||||||
match find_or_create_session(&state, bot_id, &phone, &name).await {
|
match find_or_create_session(&state, &effective_bot_id, &phone, &name).await {
|
||||||
Ok((session, _)) => {
|
Ok((session, _)) => {
|
||||||
// Clear message history for this session
|
// Clear message history for this session
|
||||||
if let Err(e) = clear_session_history(&state, &session.id).await {
|
if let Err(e) = clear_session_history(&state, &session.id).await {
|
||||||
|
|
@ -328,7 +519,7 @@ async fn process_incoming_message(
|
||||||
}
|
}
|
||||||
|
|
||||||
let bot_response = BotResponse {
|
let bot_response = BotResponse {
|
||||||
bot_id: bot_id.to_string(),
|
bot_id: effective_bot_id.to_string(),
|
||||||
session_id: session.id.to_string(),
|
session_id: session.id.to_string(),
|
||||||
user_id: phone.clone(),
|
user_id: phone.clone(),
|
||||||
channel: "whatsapp".to_string(),
|
channel: "whatsapp".to_string(),
|
||||||
|
|
@ -355,9 +546,9 @@ async fn process_incoming_message(
|
||||||
|
|
||||||
if content.starts_with('/') {
|
if content.starts_with('/') {
|
||||||
if let Some(response) = process_attendant_command(&state, &phone, &content).await {
|
if let Some(response) = process_attendant_command(&state, &phone, &content).await {
|
||||||
let adapter = WhatsAppAdapter::new(state.conn.clone(), *bot_id);
|
let adapter = WhatsAppAdapter::new(state.conn.clone(), effective_bot_id);
|
||||||
let bot_response = BotResponse {
|
let bot_response = BotResponse {
|
||||||
bot_id: bot_id.to_string(),
|
bot_id: effective_bot_id.to_string(),
|
||||||
session_id: Uuid::nil().to_string(),
|
session_id: Uuid::nil().to_string(),
|
||||||
user_id: phone.clone(),
|
user_id: phone.clone(),
|
||||||
channel: "whatsapp".to_string(),
|
channel: "whatsapp".to_string(),
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue