Compare commits

...

4 commits

Author SHA1 Message Date
c072fb936e fix(llm): load system-prompt from config.csv correctly
All checks were successful
BotServer CI / build (push) Successful in 17m27s
- Move system_prompt retrieval inside spawn_blocking closure
- Include system_prompt in the return tuple to fix scope issue
- Add trace logging for debugging system-prompt loading
- GLM-5 and other LLM providers now correctly receive custom system prompts

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-03-09 11:55:05 -03:00
97661d75e2 feat(whatsapp): isolate lists as single messages and remove code blocks
- Split list detection into numbered and bullet list items
- Add looks_like_list_start() to detect when list is beginning
- Add looks_like_list_end() to detect when list has ended
- Add split_text_before_list() to separate text before list
- Add split_list_from_text() to separate list from text after
- Update streaming logic to send lists as isolated messages
- Add code block removal (triple backticks and inline backticks)
- Add comprehensive unit tests for list detection functions

Resolves: Lists being mixed with other text in WhatsApp messages
Resolves: JavaScript/C# code leaking into WhatsApp messages
2026-03-08 14:52:59 -03:00
c5d69f9752 feat(whatsapp): Add /clear command for session history
- Add /clear command handler to allow users to clear their conversation history
- Implement clear_session_history() function using diesel delete
- Remove dead code (unused list processing functions)
- Add message deduplication using Redis cache

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-03-07 18:58:00 -03:00
85b4653899 Fix WhatsApp streaming to send complete messages
- Accumulate all content before sending (no chunking)
- Only send when is_final = true
- Fixes list (li/ul) handling - lists sent as one complete message

- Improves WhatsApp user experience by sending complete formatted responses
- Removes complex chunked logic in favor of simplicity

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
2026-03-06 18:52:53 -03:00
12 changed files with 1531 additions and 178 deletions

View file

@ -10,7 +10,7 @@ features = ["database", "i18n"]
[features]
# ===== DEFAULT =====
default = ["chat", "automation", "drive", "tasks", "cache", "directory", "llm", "crawler", "browser", "terminal", "editor", "mail"]
default = ["chat", "automation", "drive", "tasks", "cache", "directory", "llm", "crawler", "browser", "terminal", "editor", "mail", "whatsapp"]
browser = ["automation", "drive", "cache"]
terminal = ["automation", "drive", "cache"]

View file

@ -160,15 +160,28 @@ impl BootstrapManager {
if pm.is_installed("directory") {
// Wait for Zitadel to be ready - it might have been started during installation
// Give it up to 60 seconds before trying to start it ourselves
// Use incremental backoff: check frequently at first, then slow down
let mut directory_already_running = zitadel_health_check();
if !directory_already_running {
info!("Zitadel not responding to health check, waiting up to 60s for it to start...");
for i in 0..30 {
sleep(Duration::from_secs(2)).await;
if zitadel_health_check() {
info!("Zitadel/Directory service is now responding (waited {}s)", (i + 1) * 2);
directory_already_running = true;
info!("Zitadel not responding to health check, waiting with incremental backoff...");
// Check intervals: 1s x5, 2s x5, 5s x5, 10s x3 = ~60s total
let intervals: [(u64, u32); 4] = [(1, 5), (2, 5), (5, 5), (10, 3)];
let mut total_waited: u64 = 0;
for (interval_secs, count) in intervals {
for i in 0..count {
if zitadel_health_check() {
info!("Zitadel/Directory service is now responding (waited {}s)", total_waited);
directory_already_running = true;
break;
}
sleep(Duration::from_secs(interval_secs)).await;
total_waited += interval_secs;
// Show incremental progress every ~10s
if total_waited % 10 == 0 || i == 0 {
info!("Zitadel health check: {}s elapsed, retrying...", total_waited);
}
}
if directory_already_running {
break;
}
}

View file

@ -50,6 +50,65 @@ impl WhatsAppAdapter {
}
}
/// Sanitize Markdown text for WhatsApp compatibility
/// WhatsApp only supports: *bold*, _italic_, ~strikethrough~, ```monospace```
/// Does NOT support: headers (###), links [text](url), checkboxes, etc.
pub fn sanitize_for_whatsapp(text: &str) -> String {
let mut result = text.to_string();
// Remove Markdown headers (### ## # at start of lines)
result = regex::Regex::new(r"(?m)^#{1,6}\s*")
.map(|re| re.replace_all(&result, "").to_string())
.unwrap_or(result);
// Convert Markdown links [text](url) to "text: url"
result = regex::Regex::new(r"\[([^\]]+)\]\(([^)]+)\)")
.map(|re| re.replace_all(&result, "$1: $2").to_string())
.unwrap_or(result);
// Remove image syntax ![alt](url) - just keep alt text
result = regex::Regex::new(r"!\[([^\]]*)\]\([^)]+\)")
.map(|re| re.replace_all(&result, "$1").to_string())
.unwrap_or(result);
// Remove checkbox syntax [ ] and [x]
result = regex::Regex::new(r"\[[ x]\]")
.map(|re| re.replace_all(&result, "").to_string())
.unwrap_or(result);
// Remove horizontal rules (--- or ***)
result = regex::Regex::new(r"(?m)^[-*]{3,}\s*$")
.map(|re| re.replace_all(&result, "").to_string())
.unwrap_or(result);
// Remove code blocks with triple backticks ```code```
result = regex::Regex::new(r"```[\s\S]*?```")
.map(|re| re.replace_all(&result, "").to_string())
.unwrap_or(result);
// Remove inline code with single backticks `code`
result = regex::Regex::new(r"`[^`]+`")
.map(|re| re.replace_all(&result, "").to_string())
.unwrap_or(result);
// Remove HTML tags if any
result = regex::Regex::new(r"<[^>]+>")
.map(|re| re.replace_all(&result, "").to_string())
.unwrap_or(result);
// Clean up multiple consecutive blank lines
result = regex::Regex::new(r"\n{3,}")
.map(|re| re.replace_all(&result, "\n\n").to_string())
.unwrap_or(result);
// Clean up trailing whitespace on lines
result = regex::Regex::new(r"[ \t]+$")
.map(|re| re.replace_all(&result, "").to_string())
.unwrap_or(result);
result.trim().to_string()
}
async fn send_whatsapp_message(
&self,
to: &str,
@ -368,6 +427,154 @@ impl WhatsAppAdapter {
}
}
/// Smart message splitting for WhatsApp's character limit.
/// Splits at paragraph boundaries, keeping lists together.
/// Groups up to 3 paragraphs per message when possible.
pub fn split_message_smart(&self, content: &str, max_length: usize) -> Vec<String> {
let mut parts = Vec::new();
let mut current_part = String::new();
let mut paragraph_count = 0;
// Split content into blocks (paragraphs or list items)
let lines: Vec<&str> = content.lines().collect();
let mut i = 0;
while i < lines.len() {
let line = lines[i];
let is_list_item = line.trim().starts_with("- ")
|| line.trim().starts_with("* ")
|| line.trim().starts_with("")
|| line.trim().starts_with(|c: char| c.is_numeric());
// Check if this is the start of a list
if is_list_item {
// Flush current part if it has content and adding list would exceed limit
if !current_part.is_empty() {
// If we have 3+ paragraphs, flush
if paragraph_count >= 3 || current_part.len() + line.len() > max_length {
parts.push(current_part.trim().to_string());
current_part = String::new();
paragraph_count = 0;
}
}
// Collect entire list as one block
let mut list_block = String::new();
while i < lines.len() {
let list_line = lines[i];
let is_still_list = list_line.trim().starts_with("- ")
|| list_line.trim().starts_with("* ")
|| list_line.trim().starts_with("")
|| list_line.trim().starts_with(|c: char| c.is_numeric())
|| (list_line.trim().is_empty() && i + 1 < lines.len() && {
let next = lines[i + 1];
next.trim().starts_with("- ")
|| next.trim().starts_with("* ")
|| next.trim().starts_with("")
});
if is_still_list || (list_line.trim().is_empty() && !list_block.is_empty()) {
if list_block.len() + list_line.len() + 1 > max_length {
// List is too long, split it
if !list_block.is_empty() {
if !current_part.is_empty() {
parts.push(current_part.trim().to_string());
current_part = String::new();
}
parts.push(list_block.trim().to_string());
list_block = String::new();
}
}
if !list_line.trim().is_empty() {
if !list_block.is_empty() {
list_block.push('\n');
}
list_block.push_str(list_line);
}
i += 1;
} else {
break;
}
}
if !list_block.is_empty() {
if !current_part.is_empty() && current_part.len() + list_block.len() + 1 <= max_length {
current_part.push('\n');
current_part.push_str(&list_block);
} else {
if !current_part.is_empty() {
parts.push(current_part.trim().to_string());
}
parts.push(list_block.trim().to_string());
current_part = String::new();
paragraph_count = 0;
}
}
continue;
}
// Regular paragraph
if !line.trim().is_empty() {
if !current_part.is_empty() {
current_part.push('\n');
}
current_part.push_str(line);
paragraph_count += 1;
// Flush if we have 3 paragraphs or exceeded max length
if paragraph_count >= 3 || current_part.len() > max_length {
parts.push(current_part.trim().to_string());
current_part = String::new();
paragraph_count = 0;
}
} else if !current_part.is_empty() {
// Empty line marks paragraph end
paragraph_count += 1;
if paragraph_count >= 3 {
parts.push(current_part.trim().to_string());
current_part = String::new();
paragraph_count = 0;
}
}
i += 1;
}
// Don't forget the last part
if !current_part.trim().is_empty() {
parts.push(current_part.trim().to_string());
}
// Handle edge case: if a single part exceeds max_length, force split
let mut final_parts = Vec::new();
for part in parts {
if part.len() <= max_length {
final_parts.push(part);
} else {
// Hard split at max_length, trying to break at word boundary
let mut remaining = part.as_str();
while !remaining.is_empty() {
if remaining.len() <= max_length {
final_parts.push(remaining.to_string());
break;
}
// Find last space before max_length
let split_pos = remaining[..max_length]
.rfind(' ')
.unwrap_or(max_length);
final_parts.push(remaining[..split_pos].to_string());
remaining = remaining[split_pos..].trim();
}
}
}
if final_parts.is_empty() {
final_parts.push(content.to_string());
}
final_parts
}
pub fn verify_webhook(&self, token: &str) -> bool {
token == self.webhook_verify_token
}
@ -405,14 +612,43 @@ impl ChannelAdapter for WhatsAppAdapter {
return Err("WhatsApp not configured".into());
}
let message_id = self
.send_whatsapp_message(&response.user_id, &response.content)
.await?;
// WhatsApp has a 4096 character limit per message
// Split message at paragraph/list boundaries
const MAX_WHATSAPP_LENGTH: usize = 4000; // Leave some buffer
info!(
"WhatsApp message sent to {}: {} (message_id: {})",
response.user_id, response.content, message_id
);
// Sanitize Markdown for WhatsApp compatibility
let sanitized_content = Self::sanitize_for_whatsapp(&response.content);
if sanitized_content.len() <= MAX_WHATSAPP_LENGTH {
// Message fits in one part
let message_id = self
.send_whatsapp_message(&response.user_id, &sanitized_content)
.await?;
info!(
"WhatsApp message sent to {}: {} (message_id: {})",
response.user_id, &sanitized_content.chars().take(100).collect::<String>(), message_id
);
} else {
// Split message at appropriate boundaries
let parts = self.split_message_smart(&sanitized_content, MAX_WHATSAPP_LENGTH);
for (i, part) in parts.iter().enumerate() {
let message_id = self
.send_whatsapp_message(&response.user_id, part)
.await?;
info!(
"WhatsApp message part {}/{} sent to {}: {} (message_id: {})",
i + 1, parts.len(), response.user_id, &part.chars().take(50).collect::<String>(), message_id
);
// Small delay between messages to avoid rate limiting
if i < parts.len() - 1 {
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
}
}
}
Ok(())
}

View file

@ -416,7 +416,7 @@ impl BotOrchestrator {
let session_id = Uuid::parse_str(&message.session_id)?;
let message_content = message.content.clone();
let (session, context_data, history, model, key) = {
let (session, context_data, history, model, key, system_prompt) = {
let state_clone = self.state.clone();
tokio::task::spawn_blocking(
move || -> Result<_, Box<dyn std::error::Error + Send + Sync>> {
@ -453,15 +453,25 @@ impl BotOrchestrator {
.get_config(&session.bot_id, "llm-key", Some(""))
.unwrap_or_default();
Ok((session, context_data, history, model, key))
// Load system-prompt from config.csv, fallback to default
let system_prompt = config_manager
.get_config(&session.bot_id, "system-prompt", Some("You are a helpful assistant with access to tools that can help you complete tasks. When a user's request matches one of your available tools, use the appropriate tool instead of providing a generic response."))
.unwrap_or_else(|_| "You are a helpful assistant.".to_string());
trace!("Loaded system-prompt for bot {}: {}", session.bot_id, &system_prompt[..system_prompt.len().min(100)]);
Ok((session, context_data, history, model, key, system_prompt))
},
)
.await??
};
let system_prompt = "You are a helpful assistant with access to tools that can help you complete tasks. When a user's request matches one of your available tools, use the appropriate tool instead of providing a generic response.".to_string();
let mut messages = OpenAIClient::build_messages(&system_prompt, &context_data, &history);
trace!("Built messages array with {} items, first message role: {:?}",
messages.as_array().map(|a| a.len()).unwrap_or(0),
messages.as_array().and_then(|a| a.first()).and_then(|m| m.get("role")));
// Get bot name for KB and tool injection
let bot_name_for_context = {
let conn = self.state.conn.get().ok();
@ -643,10 +653,10 @@ impl BotOrchestrator {
let mut in_analysis = false;
let mut tool_call_buffer = String::new(); // Accumulate potential tool call JSON chunks
let mut accumulating_tool_call = false; // Track if we're currently accumulating a tool call
let mut tool_was_executed = false; // Track if a tool was executed to avoid duplicate final message
let handler = llm_models::get_handler(&model);
trace!("Using model handler for {}", model);
trace!("Receiving LLM stream chunks...");
#[cfg(feature = "nvidia")]
{
@ -670,7 +680,6 @@ impl BotOrchestrator {
}
while let Some(chunk) = stream_rx.recv().await {
trace!("Received LLM chunk: {:?}", chunk);
// ===== GENERIC TOOL EXECUTION =====
// Add chunk to tool_call_buffer and try to parse
@ -815,7 +824,6 @@ impl BotOrchestrator {
// Clear the tool_call_buffer since we found and executed a tool call
tool_call_buffer.clear();
accumulating_tool_call = false; // Reset accumulation flag
tool_was_executed = true; // Mark that a tool was executed
// Continue to next chunk
continue;
}
@ -957,6 +965,8 @@ impl BotOrchestrator {
}
}
trace!("LLM stream complete. Full response: {}", full_response);
let state_for_save = self.state.clone();
let full_response_clone = full_response.clone();
tokio::task::spawn_blocking(
@ -977,9 +987,10 @@ impl BotOrchestrator {
#[cfg(not(feature = "chat"))]
let suggestions: Vec<crate::core::shared::models::Suggestion> = Vec::new();
// When a tool was executed, the content was already sent as streaming chunks
// (pre-tool text + tool result). Sending full_response again would duplicate it.
let final_content = if tool_was_executed { String::new() } else { full_response };
// Content was already sent as streaming chunks.
// Sending full_response again would duplicate it (especially for WhatsApp which accumulates buffer).
// The final response is just a signal that streaming is complete - it should not contain content.
let final_content = String::new();
let final_response = BotResponse {
bot_id: message.bot_id,

View file

@ -337,6 +337,7 @@ diesel::allow_tables_to_appear_in_same_query!(
users,
website_crawls,
bots,
bot_configuration,
organizations,
organization_invitations,
);

View file

@ -275,7 +275,7 @@ impl CachedLLMProvider {
}
if self.config.semantic_matching && self.embedding_service.is_some() {
if let Some(similar) = self.find_similar_cached(prompt, messages, model).await {
if let Some(similar) = self.find_similar_cached(prompt, messages, model, self.config.similarity_threshold).await {
info!(
"Cache hit (semantic match) for prompt: ~{} tokens",
estimate_token_count(prompt)
@ -296,6 +296,7 @@ impl CachedLLMProvider {
prompt: &str,
messages: &Value,
model: &str,
threshold: f32,
) -> Option<CachedResponse> {
let embedding_service = self.embedding_service.as_ref()?;
@ -305,9 +306,40 @@ impl CachedLLMProvider {
messages
};
let combined_context = format!("{}\n{}", prompt, actual_messages);
// Extract ONLY the latest user question for semantic matching
// This prevents false positives from matching on old conversation history
let latest_user_question = if let Some(msgs) = actual_messages.as_array() {
// Find the last message with role "user"
msgs.iter()
.rev()
.find_map(|msg| {
if msg.get("role").and_then(|r| r.as_str()) == Some("user") {
msg.get("content").and_then(|c| c.as_str())
} else {
None
}
})
.unwrap_or("")
} else {
""
};
let prompt_embedding = match embedding_service.get_embedding(&combined_context).await {
// Use only the latest user question for semantic matching, not the full history
// The prompt contains system context, so we combine with latest question
let semantic_query = if latest_user_question.is_empty() {
prompt.to_string()
} else {
format!("{}\n{}", prompt, latest_user_question)
};
// Debug: log the text being sent for embedding
debug!(
"Embedding request text (len={}, using latest user question): {}",
semantic_query.len(),
&semantic_query.chars().take(200).collect::<String>()
);
let prompt_embedding = match embedding_service.get_embedding(&semantic_query).await {
Ok(emb) => emb,
Err(e) => {
debug!("Failed to get embedding for prompt: {}", e);
@ -343,7 +375,7 @@ impl CachedLLMProvider {
.compute_similarity(&prompt_embedding, cached_embedding)
.await;
if similarity >= self.config.similarity_threshold
if similarity >= threshold
&& best_match.as_ref().is_none_or(|(_, s)| *s < similarity)
{
best_match = Some((cached.clone(), similarity));
@ -390,8 +422,30 @@ impl CachedLLMProvider {
};
let embedding = if let Some(ref service) = self.embedding_service {
let combined_context = format!("{}\n{}", prompt, actual_messages);
service.get_embedding(&combined_context).await.ok()
// Extract ONLY the latest user question for embedding
// Same logic as find_similar_cached to ensure consistency
let latest_user_question = if let Some(msgs) = actual_messages.as_array() {
msgs.iter()
.rev()
.find_map(|msg| {
if msg.get("role").and_then(|r| r.as_str()) == Some("user") {
msg.get("content").and_then(|c| c.as_str())
} else {
None
}
})
.unwrap_or("")
} else {
""
};
let semantic_query = if latest_user_question.is_empty() {
prompt.to_string()
} else {
format!("{}\n{}", prompt, latest_user_question)
};
service.get_embedding(&semantic_query).await.ok()
} else {
None
};
@ -520,7 +574,7 @@ impl LLMProvider for CachedLLMProvider {
}
if bot_cache_config.semantic_matching && self.embedding_service.is_some() {
if let Some(cached) = self.find_similar_cached(prompt, messages, model).await {
if let Some(cached) = self.find_similar_cached(prompt, messages, model, bot_cache_config.similarity_threshold).await {
info!(
"Cache hit (semantic match) for bot {} with similarity threshold {}",
bot_id, bot_cache_config.similarity_threshold
@ -616,6 +670,31 @@ impl LocalEmbeddingService {
api_key,
}
}
/// Generate a deterministic hash-based embedding for fallback
fn hash_embedding(&self, text: &str) -> Vec<f32> {
const EMBEDDING_DIM: usize = 384; // Match common embedding dimensions
let mut embedding = vec![0.0f32; EMBEDDING_DIM];
let hash = Sha256::digest(text.as_bytes());
// Use hash bytes to seed the embedding
for (i, byte) in hash.iter().cycle().take(EMBEDDING_DIM * 4).enumerate() {
let idx = i % EMBEDDING_DIM;
let value = (*byte as f32 - 128.0) / 128.0;
embedding[idx] += value * 0.1;
}
// Normalize
let norm: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
if norm > 0.0 {
for val in &mut embedding {
*val /= norm;
}
}
embedding
}
}
#[async_trait]
@ -636,13 +715,6 @@ impl EmbeddingService for LocalEmbeddingService {
format!("{}/embedding", self.embedding_url)
};
let mut request = client.post(&url);
// Add authorization header if API key is provided
if let Some(ref api_key) = self.api_key {
request = request.header("Authorization", format!("Bearer {}", api_key));
}
// Determine request body format based on URL
let request_body = if self.embedding_url.contains("huggingface.co") {
serde_json::json!({
@ -660,88 +732,131 @@ impl EmbeddingService for LocalEmbeddingService {
})
};
let response = request
.json(&request_body)
.send()
.await?;
// Retry logic with exponential backoff
const MAX_RETRIES: u32 = 3;
const INITIAL_DELAY_MS: u64 = 500;
let status = response.status();
let response_text = response.text().await?;
if !status.is_success() {
debug!(
"Embedding service HTTP error {}: {}",
status,
response_text
);
return Err(format!(
"Embedding service returned HTTP {}: {}",
status,
response_text
).into());
}
let result: Value = serde_json::from_str(&response_text)
.map_err(|e| {
debug!("Failed to parse embedding JSON: {} - Response: {}", e, response_text);
format!("Failed to parse embedding response JSON: {} - Response: {}", e, response_text)
})?;
if let Some(error) = result.get("error") {
debug!("Embedding service returned error: {}", error);
return Err(format!("Embedding service error: {}", error).into());
}
// Try multiple response formats
let embedding = if let Some(arr) = result.as_array() {
// HuggingFace format: direct array [0.1, 0.2, ...]
arr.iter()
.filter_map(|v| v.as_f64().map(|f| f as f32))
.collect()
} else if let Some(result_obj) = result.get("result") {
// Cloudflare AI format: {"result": {"data": [[...]]}}
if let Some(data) = result_obj.get("data") {
if let Some(data_arr) = data.as_array() {
if let Some(first) = data_arr.first() {
if let Some(embedding_arr) = first.as_array() {
embedding_arr
.iter()
.filter_map(|v| v.as_f64().map(|f| f as f32))
.collect()
} else {
data_arr
.iter()
.filter_map(|v| v.as_f64().map(|f| f as f32))
.collect()
}
} else {
return Err("Empty data array in Cloudflare response".into());
}
} else {
return Err(format!("Invalid Cloudflare response format - Expected result.data array, got: {}", response_text).into());
}
} else {
return Err(format!("Invalid Cloudflare response format - Expected result.data, got: {}", response_text).into());
for attempt in 0..MAX_RETRIES {
if attempt > 0 {
let delay_ms = INITIAL_DELAY_MS * (1 << (attempt - 1)); // 500, 1000, 2000
debug!("Embedding service retry attempt {}/{} after {}ms", attempt + 1, MAX_RETRIES, delay_ms);
tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
}
} else if let Some(data) = result.get("data") {
// OpenAI/Standard format: {"data": [{"embedding": [...]}]}
data[0]["embedding"]
.as_array()
.ok_or_else(|| {
debug!("Invalid embedding response format. Expected data[0].embedding array. Got: {}", response_text);
format!("Invalid embedding response format - Expected data[0].embedding array, got: {}", response_text)
})?
.iter()
.filter_map(|v| v.as_f64().map(|f| f as f32))
.collect()
} else {
return Err(format!(
"Invalid embedding response format - Expected array or data[0].embedding, got: {}",
response_text
).into());
};
Ok(embedding)
let mut request = client.post(&url);
// Add authorization header if API key is provided
if let Some(ref api_key) = self.api_key {
request = request.header("Authorization", format!("Bearer {}", api_key));
}
match request
.json(&request_body)
.timeout(std::time::Duration::from_secs(30))
.send()
.await
{
Ok(response) => {
let status = response.status();
let response_text = match response.text().await {
Ok(t) => t,
Err(e) => {
debug!("Failed to read response body: {}", e);
continue;
}
};
if !status.is_success() {
debug!(
"Embedding service HTTP error {} (attempt {}/{}): {}",
status, attempt + 1, MAX_RETRIES, response_text
);
// Retry on 5xx errors
if status.as_u16() >= 500 {
continue;
}
// Non-retriable error
return Err(format!(
"Embedding service returned HTTP {}: {}",
status, response_text
).into());
}
// Success - parse response
let result: Value = match serde_json::from_str(&response_text) {
Ok(r) => r,
Err(e) => {
debug!("Failed to parse embedding JSON: {} - Response: {}", e, response_text);
return Err(format!("Failed to parse embedding response JSON: {} - Response: {}", e, response_text).into());
}
};
if let Some(error) = result.get("error") {
debug!("Embedding service returned error: {}", error);
return Err(format!("Embedding service error: {}", error).into());
}
// Try multiple response formats
let embedding = if let Some(arr) = result.as_array() {
// HuggingFace format: direct array [0.1, 0.2, ...]
arr.iter()
.filter_map(|v| v.as_f64().map(|f| f as f32))
.collect()
} else if let Some(result_obj) = result.get("result") {
// Cloudflare AI format: {"result": {"data": [[...]]}}
if let Some(data) = result_obj.get("data") {
if let Some(data_arr) = data.as_array() {
if let Some(first) = data_arr.first() {
if let Some(embedding_arr) = first.as_array() {
embedding_arr
.iter()
.filter_map(|v| v.as_f64().map(|f| f as f32))
.collect()
} else {
data_arr
.iter()
.filter_map(|v| v.as_f64().map(|f| f as f32))
.collect()
}
} else {
return Err("Empty data array in Cloudflare response".into());
}
} else {
return Err(format!("Invalid Cloudflare response format - Expected result.data array, got: {}", response_text).into());
}
} else {
return Err(format!("Invalid Cloudflare response format - Expected result.data, got: {}", response_text).into());
}
} else if let Some(data) = result.get("data") {
// OpenAI/Standard format: {"data": [{"embedding": [...]}]}
data[0]["embedding"]
.as_array()
.ok_or_else(|| {
debug!("Invalid embedding response format. Expected data[0].embedding array. Got: {}", response_text);
format!("Invalid embedding response format - Expected data[0].embedding array, got: {}", response_text)
})?
.iter()
.filter_map(|v| v.as_f64().map(|f| f as f32))
.collect()
} else {
return Err(format!(
"Invalid embedding response format - Expected array or data[0].embedding, got: {}",
response_text
).into());
};
return Ok(embedding);
}
Err(e) => {
// Network error - retry
debug!("Embedding service network error (attempt {}/{}): {}", attempt + 1, MAX_RETRIES, e);
}
}
}
// All retries exhausted - use hash-based fallback
debug!("Embedding service failed after all retries, using hash-based fallback");
Ok(self.hash_embedding(text))
}
async fn compute_similarity(&self, embedding1: &[f32], embedding2: &[f32]) -> f32 {

View file

@ -752,14 +752,21 @@ fn init_llm_provider(
.get_config(&bot_id, "embedding-key", None)
.ok();
let semantic_cache_enabled = config_manager
.get_config(&bot_id, "semantic-cache-enabled", Some("true"))
.get_config(&bot_id, "llm-cache-semantic", Some("true"))
.unwrap_or_else(|_| "true".to_string())
.to_lowercase() == "true";
let similarity_threshold = config_manager
.get_config(&bot_id, "llm-cache-threshold", Some("0.85"))
.unwrap_or_else(|_| "0.85".to_string())
.parse::<f32>()
.unwrap_or(0.85);
info!("Embedding URL: {}", embedding_url);
info!("Embedding Model: {}", embedding_model);
info!("Embedding Key: {}", if embedding_key.is_some() { "configured" } else { "not set" });
info!("Semantic Cache Enabled: {}", semantic_cache_enabled);
info!("Cache Similarity Threshold: {}", similarity_threshold);
let embedding_service = if semantic_cache_enabled {
Some(Arc::new(LocalEmbeddingService::new(
@ -774,7 +781,7 @@ fn init_llm_provider(
let cache_config = CacheConfig {
ttl: 3600,
semantic_matching: semantic_cache_enabled,
similarity_threshold: 0.85,
similarity_threshold,
max_similarity_checks: 100,
key_prefix: "llm_cache".to_string(),
};

View file

@ -69,6 +69,7 @@ pub async fn run_axum_server(
.add_anonymous_path("/api/client-errors")
.add_anonymous_path("/ws")
.add_anonymous_path("/auth")
.add_anonymous_path("/webhook/whatsapp") // WhatsApp webhook for Meta verification
.add_public_path("/static")
.add_public_path("/favicon.ico")
.add_public_path("/suite")

View file

@ -30,6 +30,7 @@ impl Default for AuthConfig {
"/api/auth/refresh".to_string(),
"/oauth".to_string(),
"/auth/callback".to_string(),
"/webhook/whatsapp".to_string(),
],
public_paths: vec![
"/".to_string(),

View file

@ -399,6 +399,7 @@ impl RbacManager {
continue;
}
// Check allow_anonymous FIRST before authentication check
if route.allow_anonymous {
let result = AccessDecisionResult::allow("Anonymous access allowed")
.with_rule(route.path_pattern.clone());
@ -406,6 +407,7 @@ impl RbacManager {
return result;
}
// Only check authentication after confirming route is not anonymous
if !user.is_authenticated() {
let result = AccessDecisionResult::deny("Authentication required");
return result;
@ -949,6 +951,10 @@ pub fn build_default_route_permissions() -> Vec<RoutePermission> {
RoutePermission::new("/api/bot/config", "GET", "").with_anonymous(true),
RoutePermission::new("/api/i18n/**", "GET", "").with_anonymous(true),
// WhatsApp webhook - anonymous for Meta verification and message delivery
RoutePermission::new("/webhook/whatsapp/:bot_id", "GET", "").with_anonymous(true),
RoutePermission::new("/webhook/whatsapp/:bot_id", "POST", "").with_anonymous(true),
// Auth routes - login must be anonymous
RoutePermission::new("/api/auth", "GET", "").with_anonymous(true),

View file

@ -401,17 +401,36 @@ async fn route_to_bot(
let chat_id_clone = chat_id.to_string();
tokio::spawn(async move {
while let Some(response) = rx.recv().await {
let tg_response = BotResponse::new(
response.bot_id,
response.session_id,
chat_id_clone.clone(),
response.content,
"telegram",
);
// Buffer to accumulate streaming chunks
let mut accumulated_content = String::new();
let mut chunk_count = 0u32;
if let Err(e) = adapter.send_message(tg_response).await {
error!("Failed to send Telegram response: {}", e);
while let Some(response) = rx.recv().await {
// Accumulate content from each chunk
if !response.content.is_empty() {
accumulated_content.push_str(&response.content);
chunk_count += 1;
}
// Send when complete or as fallback after 5 chunks
if response.is_complete || chunk_count >= 5 {
if !accumulated_content.is_empty() {
let tg_response = BotResponse::new(
response.bot_id,
response.session_id,
chat_id_clone.clone(),
accumulated_content.clone(),
"telegram",
);
if let Err(e) = adapter.send_message(tg_response).await {
error!("Failed to send Telegram response: {}", e);
}
// Reset buffer after sending
accumulated_content.clear();
chunk_count = 0;
}
}
}
});

File diff suppressed because it is too large Load diff