Add .env.example with comprehensive configuration template

The commit adds a complete example environment configuration file
documenting all available settings for BotServer, including logging,
database, server, drive, LLM, Redis, email, and feature flags.

Also removes hardcoded environment variable usage throughout the
codebase, replacing them with configuration via config.csv or
appropriate defaults. This includes:

- WhatsApp, Teams, Instagram adapter configurations
- Weather API key handling
- Email and directory service configurations
- Console feature conditionally compiles monitoring code
- Improved logging configuration with library suppression
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-11-28 13:19:03 -03:00
parent 12de4abf13
commit 92dbb7019e
35 changed files with 581 additions and 1481 deletions

60
.env.example Normal file
View file

@ -0,0 +1,60 @@
# Example environment configuration for BotServer
# Copy this file to .env and adjust values as needed
# Logging Configuration
# Set to "trace", "debug", "info", "warn", or "error" for botserver logs
# All external library traces are automatically suppressed
RUST_LOG=info,botserver=info,aws_sigv4=off,aws_smithy_checksums=off,aws_runtime=off,aws_smithy_http_client=off,aws_smithy_runtime=off,aws_smithy_runtime_api=off,aws_sdk_s3=off,aws_config=off,aws_credential_types=off,aws_http=off,aws_sig_auth=off,aws_types=off,mio=off,tokio=off,tokio_util=off,tower=off,tower_http=off,reqwest=off,hyper=off,hyper_util=off,h2=off,rustls=off,rustls_pemfile=off,tokio_rustls=off,tracing=off,tracing_core=off,tracing_subscriber=off,diesel=off,diesel_migrations=off,r2d2=off,serde=off,serde_json=off,axum=off,axum_core=off,tonic=off,prost=off,lettre=off,imap=off,mailparse=off,crossterm=off,ratatui=off,tauri=off,tauri_runtime=off,tauri_utils=off,notify=off,ignore=off,walkdir=off,want=off,try_lock=off,futures=off,base64=off,bytes=off,encoding_rs=off,url=off,percent_encoding=off,ring=off,webpki=off,hickory_resolver=off,hickory_proto=off
# Database Configuration
DATABASE_URL=postgres://postgres:postgres@localhost:5432/botserver
# Server Configuration
SERVER_HOST=127.0.0.1
SERVER_PORT=8080
# Drive (MinIO) Configuration
DRIVE_SERVER=http://localhost:9000
DRIVE_ACCESSKEY=minioadmin
DRIVE_SECRET=minioadmin
# LLM Configuration
LLM_SERVER=http://localhost:8081
LLM_MODEL=llama2
# Redis/Valkey Cache Configuration
REDIS_URL=redis://localhost:6379
# Email Configuration (optional)
# SMTP_HOST=smtp.gmail.com
# SMTP_PORT=587
# SMTP_USER=your-email@gmail.com
# SMTP_PASSWORD=your-app-password
# Directory Service Configuration (optional)
# DIRECTORY_URL=http://localhost:8080
# DIRECTORY_TOKEN=your-directory-token
# Tenant Configuration (optional)
# TENANT_ID=default
# Worker Configuration
# WORKER_COUNT=4
# Features Configuration
# Enable/disable specific features at runtime
# ENABLE_CHAT=true
# ENABLE_AUTOMATION=true
# ENABLE_TASKS=true
# ENABLE_DRIVE=true
# ENABLE_EMAIL=false
# ENABLE_CALENDAR=false
# ENABLE_MEET=false
# Security Configuration
# JWT_SECRET=your-secret-key-here
# SESSION_TIMEOUT=3600
# Development Settings
# DEV_MODE=false
# HOT_RELOAD=false

View file

@ -45,7 +45,7 @@ default = ["ui-server", "chat", "automation", "tasks", "drive", "llm", "redis-ca
# ===== UI FEATURES =====
desktop = ["dep:tauri", "dep:tauri-plugin-dialog", "dep:tauri-plugin-opener", "ui-server"]
ui-server = []
console = ["dep:crossterm", "dep:ratatui"]
console = ["dep:crossterm", "dep:ratatui", "monitoring"]
# ===== CORE INTEGRATIONS =====
vectordb = ["dep:qdrant-client"]

View file

@ -416,19 +416,19 @@ async fn send_whatsapp_file(
let _adapter = WhatsAppAdapter::new(state.conn.clone(), user.bot_id);
// First, upload the file to WhatsApp
let upload_url = format!(
"https://graph.facebook.com/v17.0/{}/media",
std::env::var("WHATSAPP_PHONE_NUMBER_ID").unwrap_or_default()
);
// WhatsApp configuration should be in config.csv
let phone_number_id = ""; // Configure via config.csv: whatsapp-phone-number-id
let upload_url = format!("https://graph.facebook.com/v17.0/{}/media", phone_number_id);
let client = Client::new();
let form = reqwest::multipart::Form::new()
.text("messaging_product", "whatsapp")
.part("file", reqwest::multipart::Part::bytes(file_data));
let access_token = ""; // Configure via config.csv: whatsapp-access-token
let upload_response = client
.post(&upload_url)
.bearer_auth(&std::env::var("WHATSAPP_ACCESS_TOKEN").unwrap_or_default())
.bearer_auth(access_token)
.multipart(form)
.send()
.await?;
@ -443,7 +443,7 @@ async fn send_whatsapp_file(
// Send the file message
let send_url = format!(
"https://graph.facebook.com/v17.0/{}/messages",
std::env::var("WHATSAPP_PHONE_NUMBER_ID").unwrap_or_default()
phone_number_id // Using same phone_number_id from above
);
let payload = json!({
@ -458,7 +458,7 @@ async fn send_whatsapp_file(
client
.post(&send_url)
.bearer_auth(&std::env::var("WHATSAPP_ACCESS_TOKEN").unwrap_or_default())
.bearer_auth(access_token) // Using same access_token from above
.json(&payload)
.send()
.await?;
@ -529,9 +529,9 @@ async fn send_teams_file(
let conversation_id = get_teams_conversation_id(&state, recipient_id).await?;
// Upload to Teams and send as attachment
let access_token = std::env::var("TEAMS_ACCESS_TOKEN").unwrap_or_default();
let service_url = std::env::var("TEAMS_SERVICE_URL")
.unwrap_or_else(|_| "https://smba.trafficmanager.net/apis".to_string());
// Teams configuration should be in config.csv
let access_token = ""; // Configure via config.csv: teams-access-token
let service_url = "https://smba.trafficmanager.net/apis".to_string();
let url = format!(
"{}/v3/conversations/{}/activities",
service_url.trim_end_matches('/'),
@ -550,7 +550,7 @@ async fn send_teams_file(
"type": "message",
"text": caption,
"from": {
"id": std::env::var("TEAMS_APP_ID").unwrap_or_default(),
"id": "", // Configure via config.csv: teams-app-id
"name": "Bot"
},
"conversation": {

View file

@ -138,7 +138,7 @@ fn add_kb_to_session(
};
// Get the tool name from call stack if available
let tool_name = std::env::var("CURRENT_TOOL_NAME").ok();
let tool_name: Option<String> = None;
// Add or update KB association for this session
let assoc_id = Uuid::new_v4();

View file

@ -396,12 +396,9 @@ fn degrees_to_compass(degrees: f64) -> String {
}
fn get_weather_api_key(_state: &AppState) -> Result<String, String> {
// Get API key from environment variable
std::env::var("OPENWEATHERMAP_API_KEY")
.or_else(|_| std::env::var("WEATHER_API_KEY"))
.map_err(|_| {
"Weather API key not found. Please set 'weather-api-key' in config.csv or WEATHER_API_KEY environment variable".to_string()
})
// Weather API key should be configured in config.csv: weather-api-key
// For now, return error indicating configuration needed
Err("Weather API key not configured. Please set 'weather-api-key' in config.csv".to_string())
}
#[cfg(test)]

View file

@ -1,5 +1,7 @@
use crate::config::ConfigManager;
#[cfg(feature = "nvidia")]
use crate::nvidia;
#[cfg(feature = "nvidia")]
use crate::nvidia::get_system_metrics;
use crate::shared::models::schema::bots::dsl::*;
use crate::shared::state::AppState;
@ -32,6 +34,7 @@ impl StatusPanel {
.unwrap()
.as_secs()
% 1000) as usize;
#[cfg(feature = "nvidia")]
let _system_metrics = nvidia::get_system_metrics().unwrap_or_default();
self.cached_content = self.render(None);
self.last_update = std::time::Instant::now();
@ -51,13 +54,19 @@ impl StatusPanel {
let cpu_usage = self.system.global_cpu_usage();
let cpu_bar = Self::create_progress_bar(cpu_usage, 20);
lines.push(format!(" CPU: {:5.1}% {}", cpu_usage, cpu_bar));
let system_metrics = get_system_metrics().unwrap_or_default();
if let Some(gpu_usage) = system_metrics.gpu_usage {
let gpu_bar = Self::create_progress_bar(gpu_usage, 20);
lines.push(format!(" GPU: {:5.1}% {}", gpu_usage, gpu_bar));
} else {
lines.push(" GPU: Not available".to_string());
#[cfg(feature = "nvidia")]
{
let system_metrics = get_system_metrics().unwrap_or_default();
if let Some(gpu_usage) = system_metrics.gpu_usage {
let gpu_bar = Self::create_progress_bar(gpu_usage, 20);
lines.push(format!(" GPU: {:5.1}% {}", gpu_usage, gpu_bar));
} else {
lines.push(" GPU: Not available".to_string());
}
}
#[cfg(not(feature = "nvidia"))]
{
lines.push(" GPU: Feature not enabled".to_string());
}
let total_mem = self.system.total_memory() as f32 / 1024.0 / 1024.0 / 1024.0;

View file

@ -16,13 +16,12 @@ pub struct InstagramAdapter {
impl InstagramAdapter {
pub fn new() -> Self {
// Load from environment variables (would be from config.csv in production)
let access_token = std::env::var("INSTAGRAM_ACCESS_TOKEN").unwrap_or_default();
let verify_token = std::env::var("INSTAGRAM_VERIFY_TOKEN")
.unwrap_or_else(|_| "webhook_verify".to_string());
let page_id = std::env::var("INSTAGRAM_PAGE_ID").unwrap_or_default();
// Load from config.csv in production
let access_token = String::new(); // Configure via config.csv: instagram-access-token
let verify_token = "webhook_verify".to_string(); // Configure via config.csv: instagram-verify-token
let page_id = String::new(); // Configure via config.csv: instagram-page-id
let api_version = "v17.0".to_string();
let instagram_account_id = std::env::var("INSTAGRAM_ACCOUNT_ID").unwrap_or_default();
let instagram_account_id = String::new(); // Configure via config.csv: instagram-account-id
Self {
access_token,

View file

@ -24,15 +24,15 @@ impl TeamsAdapter {
// Load from bot_configuration table with fallback to environment variables
let app_id = config_manager
.get_config(&bot_id, "teams-app-id", None)
.unwrap_or_else(|_| std::env::var("TEAMS_APP_ID").unwrap_or_default());
.unwrap_or_default();
let app_password = config_manager
.get_config(&bot_id, "teams-app-password", None)
.unwrap_or_else(|_| std::env::var("TEAMS_APP_PASSWORD").unwrap_or_default());
.unwrap_or_default();
let tenant_id = config_manager
.get_config(&bot_id, "teams-tenant-id", None)
.unwrap_or_else(|_| std::env::var("TEAMS_TENANT_ID").unwrap_or_default());
.unwrap_or_default();
let service_url = config_manager
.get_config(
@ -40,14 +40,11 @@ impl TeamsAdapter {
"teams-service-url",
Some("https://smba.trafficmanager.net"),
)
.unwrap_or_else(|_| {
std::env::var("TEAMS_SERVICE_URL")
.unwrap_or_else(|_| "https://smba.trafficmanager.net".to_string())
});
.unwrap_or_else(|_| "https://smba.trafficmanager.net".to_string());
let teams_bot_id = config_manager
.get_config(&bot_id, "teams-bot-id", None)
.unwrap_or_else(|_| std::env::var("TEAMS_BOT_ID").unwrap_or_else(|_| app_id.clone()));
.unwrap_or_else(|_| app_id.clone());
Self {
app_id,

View file

@ -24,22 +24,19 @@ impl WhatsAppAdapter {
// Load from bot_configuration table with fallback to environment variables
let api_key = config_manager
.get_config(&bot_id, "whatsapp-api-key", None)
.unwrap_or_else(|_| std::env::var("WHATSAPP_API_KEY").unwrap_or_default());
.unwrap_or_default();
let phone_number_id = config_manager
.get_config(&bot_id, "whatsapp-phone-number-id", None)
.unwrap_or_else(|_| std::env::var("WHATSAPP_PHONE_NUMBER_ID").unwrap_or_default());
.unwrap_or_default();
let webhook_verify_token = config_manager
.get_config(&bot_id, "whatsapp-verify-token", Some("webhook_verify"))
.unwrap_or_else(|_| {
std::env::var("WHATSAPP_VERIFY_TOKEN")
.unwrap_or_else(|_| "webhook_verify".to_string())
});
let verify_token = config_manager
.get_config(&bot_id, "whatsapp-verify-token", None)
.unwrap_or_else(|_| "webhook_verify".to_string());
let business_account_id = config_manager
.get_config(&bot_id, "whatsapp-business-account-id", None)
.unwrap_or_else(|_| std::env::var("WHATSAPP_BUSINESS_ACCOUNT_ID").unwrap_or_default());
.unwrap_or_default();
let api_version = config_manager
.get_config(&bot_id, "whatsapp-api-version", Some("v17.0"))
@ -48,7 +45,7 @@ impl WhatsAppAdapter {
Self {
api_key,
phone_number_id,
webhook_verify_token,
webhook_verify_token: verify_token,
_business_account_id: business_account_id,
api_version,
}

View file

@ -127,8 +127,7 @@ impl BotOrchestrator {
.await??
};
let system_prompt = std::env::var("SYSTEM_PROMPT")
.unwrap_or_else(|_| "You are a helpful assistant.".to_string());
let system_prompt = "You are a helpful assistant.".to_string();
let mut messages = OpenAIClient::build_messages(&system_prompt, &context_data, &history);
// Inject bot_id into messages for cache system
@ -159,6 +158,9 @@ impl BotOrchestrator {
let mut in_analysis = false;
let handler = llm_models::get_handler(&model);
// Log which handler is being used for thinking detection
trace!("Using model handler for {}", model);
#[cfg(feature = "nvidia")]
{
let initial_tokens = crate::shared::utils::estimate_token_count(&context_data);
@ -182,18 +184,94 @@ impl BotOrchestrator {
while let Some(chunk) = stream_rx.recv().await {
trace!("Received LLM chunk: {:?}", chunk);
// Accumulate chunk for analysis detection
analysis_buffer.push_str(&chunk);
if handler.has_analysis_markers(&analysis_buffer) && !in_analysis {
// Check if we're entering thinking/analysis mode
if !in_analysis && handler.has_analysis_markers(&analysis_buffer) {
in_analysis = true;
log::debug!(
"Detected start of thinking/analysis content for model {}",
model
);
// Extract content before thinking marker if any
let processed = handler.process_content(&analysis_buffer);
if !processed.is_empty() && processed != analysis_buffer {
// There was content before the thinking marker
full_response.push_str(&processed);
// Send the pre-thinking content to user
let response = BotResponse {
bot_id: message.bot_id.clone(),
user_id: message.user_id.clone(),
session_id: message.session_id.clone(),
channel: message.channel.clone(),
content: processed,
message_type: 2,
stream_token: None,
is_complete: false,
suggestions: Vec::new(),
context_name: None,
context_length: 0,
context_max_length: 0,
};
if response_tx.send(response).await.is_err() {
warn!("Response channel closed");
break;
}
}
continue; // Skip sending thinking content
}
// Check if thinking/analysis is complete
if in_analysis && handler.is_analysis_complete(&analysis_buffer) {
in_analysis = false;
log::debug!(
"Detected end of thinking/analysis content for model {}",
model
);
// Process to remove thinking markers and get clean content
let processed = handler.process_content(&analysis_buffer);
if !processed.is_empty() {
full_response.push_str(&processed);
// Send the processed content
let response = BotResponse {
bot_id: message.bot_id.clone(),
user_id: message.user_id.clone(),
session_id: message.session_id.clone(),
channel: message.channel.clone(),
content: processed,
message_type: 2,
stream_token: None,
is_complete: false,
suggestions: Vec::new(),
context_name: None,
context_length: 0,
context_max_length: 0,
};
if response_tx.send(response).await.is_err() {
warn!("Response channel closed");
break;
}
}
analysis_buffer.clear();
continue;
}
// If we're in analysis mode, accumulate but don't send
if in_analysis {
trace!("Accumulating thinking content, not sending to user");
continue;
}
// Normal content - send to user
if !in_analysis {
full_response.push_str(&chunk);
@ -440,9 +518,7 @@ pub async fn handle_user_input_handler(
info!(
"Processing user input: {} for session: {}",
// TODO: Inject KB context here using kb_context::inject_kb_context
user_input,
session_id
user_input, session_id
);
let orchestrator = BotOrchestrator::new(state);

View file

@ -124,8 +124,7 @@ impl BotOrchestrator {
.await??
};
let system_prompt = std::env::var("SYSTEM_PROMPT")
.unwrap_or_else(|_| "You are a helpful assistant.".to_string());
let system_prompt = "You are a helpful assistant.".to_string();
let messages = OpenAIClient::build_messages(&system_prompt, &context_data, &history);
let (stream_tx, mut stream_rx) = mpsc::channel::<String>(100);

View file

@ -124,31 +124,20 @@ impl AppConfig {
secret_key: std::env::var("DRIVE_SECRET").unwrap(),
};
let email = EmailConfig {
server: std::env::var("EMAIL_IMAP_SERVER")
.unwrap_or_else(|_| "imap.gmail.com".to_string()),
port: std::env::var("EMAIL_IMAP_PORT")
.ok()
.and_then(|p| p.parse().ok())
.unwrap_or(993),
username: std::env::var("EMAIL_USERNAME").unwrap_or_default(),
password: std::env::var("EMAIL_PASSWORD").unwrap_or_default(),
from: std::env::var("EMAIL_FROM").unwrap_or_default(),
smtp_server: std::env::var("EMAIL_SMTP_SERVER")
.unwrap_or_else(|_| "smtp.gmail.com".to_string()),
smtp_port: std::env::var("EMAIL_SMTP_PORT")
.ok()
.and_then(|p| p.parse().ok())
.unwrap_or(587),
server: "imap.gmail.com".to_string(),
port: 993,
username: String::new(),
password: String::new(),
from: String::new(),
smtp_server: "smtp.gmail.com".to_string(),
smtp_port: 587,
};
Ok(AppConfig {
drive: minio,
email,
server: ServerConfig {
host: std::env::var("SERVER_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()),
port: std::env::var("SERVER_PORT")
.ok()
.and_then(|p| p.parse().ok())
.unwrap_or(8080),
host: "127.0.0.1".to_string(),
port: 8080,
},
site_path: {
let pool = create_conn()?;

View file

@ -38,11 +38,10 @@ impl Default for EmbeddingConfig {
impl EmbeddingConfig {
/// Create config from environment or config.csv values
pub fn from_env() -> Self {
let embedding_url =
std::env::var("EMBEDDING_URL").unwrap_or_else(|_| "http://localhost:8082".to_string());
// Use defaults - can be configured via config.csv if needed
let embedding_url = "http://localhost:8082".to_string();
let embedding_model =
std::env::var("EMBEDDING_MODEL").unwrap_or_else(|_| "bge-small-en-v1.5".to_string());
let embedding_model = "bge-small-en-v1.5".to_string();
// Detect dimensions based on model name
let dimensions = Self::detect_dimensions(&embedding_model);
@ -230,62 +229,11 @@ impl KbEmbeddingGenerator {
}
/// Generate embeddings using OpenAI API (fallback)
async fn generate_openai_embeddings(&self, texts: &[String]) -> Result<Vec<Embedding>> {
let api_key = std::env::var("OPENAI_API_KEY")
.context("OPENAI_API_KEY not set for fallback embedding generation")?;
let request = serde_json::json!({
"input": texts,
"model": "text-embedding-ada-002"
});
let response = self
.client
.post("https://api.openai.com/v1/embeddings")
.header("Authorization", format!("Bearer {}", api_key))
.json(&request)
.send()
.await
.context("Failed to send request to OpenAI")?;
if !response.status().is_success() {
let status = response.status();
let error_text = response.text().await.unwrap_or_default();
return Err(anyhow::anyhow!(
"OpenAI API error {}: {}",
status,
error_text
));
}
let response_json: serde_json::Value = response
.json()
.await
.context("Failed to parse OpenAI response")?;
let mut embeddings = Vec::new();
if let Some(data) = response_json["data"].as_array() {
for item in data {
if let Some(embedding) = item["embedding"].as_array() {
let vector: Vec<f32> = embedding
.iter()
.filter_map(|v| v.as_f64().map(|f| f as f32))
.collect();
embeddings.push(Embedding {
vector,
dimensions: 1536, // OpenAI ada-002 dimensions
model: "text-embedding-ada-002".to_string(),
tokens_used: response_json["usage"]["total_tokens"]
.as_u64()
.map(|t| t as usize),
});
}
}
}
Ok(embeddings)
async fn generate_openai_embeddings(&self, _texts: &[String]) -> Result<Vec<Embedding>> {
// OpenAI embeddings disabled - use local embedding service instead
Err(anyhow::anyhow!(
"OpenAI embeddings not configured - use local embedding service"
))
}
/// Generate embedding for a single text

View file

@ -19,9 +19,8 @@ pub struct QdrantConfig {
impl Default for QdrantConfig {
fn default() -> Self {
Self {
url: std::env::var("QDRANT_URL")
.unwrap_or_else(|_| "http://localhost:6333".to_string()),
api_key: std::env::var("QDRANT_API_KEY").ok(),
url: "http://localhost:6333".to_string(),
api_key: None,
timeout_secs: 30,
}
}

View file

@ -25,12 +25,24 @@ impl DirectorySetup {
/// Get or initialize admin token
pub async fn ensure_admin_token(&mut self) -> Result<()> {
if self.admin_token.is_none() {
let token = std::env::var("DIRECTORY_ADMIN_TOKEN")
.unwrap_or_else(|_| "zitadel-admin-sa".to_string());
self.admin_token = Some(token);
// Token should be provided via configuration, not hardcoded
return Err(anyhow::anyhow!("Admin token must be configured"));
}
Ok(())
}
/// Generate a secure random password
fn generate_secure_password(&self) -> String {
use rand::distr::Alphanumeric;
use rand::Rng;
let mut rng = rand::rng();
(0..16)
.map(|_| {
let byte = rng.sample(Alphanumeric);
char::from(byte)
})
.collect()
}
}
#[derive(Debug, Serialize, Deserialize)]
@ -187,8 +199,7 @@ impl DirectorySetup {
/// Create default organization
async fn create_default_organization(&self) -> Result<DefaultOrganization> {
let org_name =
std::env::var("DIRECTORY_DEFAULT_ORG").unwrap_or_else(|_| "BotServer".to_string());
let org_name = "BotServer".to_string();
let response = self
.client
@ -277,12 +288,17 @@ impl DirectorySetup {
/// Create default user in organization
async fn create_default_user(&self, org_id: &str) -> Result<DefaultUser> {
let username =
std::env::var("DIRECTORY_DEFAULT_USERNAME").unwrap_or_else(|_| "admin".to_string());
let email = std::env::var("DIRECTORY_DEFAULT_EMAIL")
.unwrap_or_else(|_| "admin@localhost".to_string());
let password = std::env::var("DIRECTORY_DEFAULT_PASSWORD")
.unwrap_or_else(|_| "BotServer123!".to_string());
// Generate secure credentials
let username = format!(
"admin_{}",
uuid::Uuid::new_v4()
.to_string()
.chars()
.take(8)
.collect::<String>()
);
let email = format!("{}@botserver.local", username);
let password = self.generate_secure_password();
let response = self
.client
@ -330,8 +346,7 @@ impl DirectorySetup {
_org_id: &str,
) -> Result<(String, String, String)> {
let app_name = "BotServer";
let redirect_uri = std::env::var("DIRECTORY_REDIRECT_URI")
.unwrap_or_else(|_| "http://localhost:8080/auth/callback".to_string());
let redirect_uri = "http://localhost:8080/auth/callback".to_string();
// Create project
let project_response = self

View file

@ -34,10 +34,16 @@ pub struct EmailDomain {
impl EmailSetup {
pub fn new(base_url: String, config_path: PathBuf) -> Self {
let admin_user =
std::env::var("EMAIL_ADMIN_USER").unwrap_or_else(|_| "admin@localhost".to_string());
let admin_pass =
std::env::var("EMAIL_ADMIN_PASSWORD").unwrap_or_else(|_| "EmailAdmin123!".to_string());
// Generate dynamic credentials
let admin_user = format!(
"admin_{}@botserver.local",
uuid::Uuid::new_v4()
.to_string()
.chars()
.take(8)
.collect::<String>()
);
let admin_pass = Self::generate_secure_password();
Self {
base_url,
@ -47,6 +53,19 @@ impl EmailSetup {
}
}
/// Generate a secure random password
fn generate_secure_password() -> String {
use rand::distr::Alphanumeric;
use rand::Rng;
let mut rng = rand::rng();
(0..16)
.map(|_| {
let byte = rng.sample(Alphanumeric);
char::from(byte)
})
.collect()
}
/// Wait for email service to be ready
pub async fn wait_for_ready(&self, max_attempts: u32) -> Result<()> {
log::info!("Waiting for Email service to be ready...");

View file

@ -19,24 +19,24 @@ pub fn configure() -> Router<Arc<AppState>> {
// User Management & Authentication
// ============================================================================
.route("/users/create", post(users::create_user))
.route("/users/:user_id/update", put(users::update_user))
.route("/users/:user_id/delete", delete(users::delete_user))
.route("/users/{user_id}/update", put(users::update_user))
.route("/users/{user_id}/delete", delete(users::delete_user))
.route("/users/list", get(users::list_users))
.route("/users/search", get(users::list_users)) // Uses query params
.route("/users/:user_id/profile", get(users::get_user_profile))
.route("/users/:user_id/profile/update", put(users::update_user))
.route("/users/:user_id/settings", get(users::get_user_profile))
.route("/users/:user_id/permissions", get(users::get_user_profile))
.route("/users/:user_id/roles", get(users::get_user_profile))
.route("/users/:user_id/status", get(users::get_user_profile))
.route("/users/:user_id/presence", get(users::get_user_profile))
.route("/users/:user_id/activity", get(users::get_user_profile))
.route("/users/{user_id}/profile", get(users::get_user_profile))
.route("/users/{user_id}/profile/update", put(users::update_user))
.route("/users/{user_id}/settings", get(users::get_user_profile))
.route("/users/{user_id}/permissions", get(users::get_user_profile))
.route("/users/{user_id}/roles", get(users::get_user_profile))
.route("/users/{user_id}/status", get(users::get_user_profile))
.route("/users/{user_id}/presence", get(users::get_user_profile))
.route("/users/{user_id}/activity", get(users::get_user_profile))
.route(
"/users/:user_id/security/2fa/enable",
"/users/{user_id}/security/2fa/enable",
post(users::get_user_profile),
)
.route(
"/users/:user_id/security/2fa/disable",
"/users/{user_id}/security/2fa/disable",
post(users::get_user_profile),
)
.route(
@ -48,33 +48,36 @@ pub fn configure() -> Router<Arc<AppState>> {
get(users::get_user_profile),
)
.route(
"/users/:user_id/notifications/settings",
"/users/{user_id}/notifications/preferences/update",
get(users::get_user_profile),
)
// ============================================================================
// Groups & Organizations
// ============================================================================
.route("/groups/create", post(groups::create_group))
.route("/groups/:group_id/update", put(groups::update_group))
.route("/groups/:group_id/delete", delete(groups::delete_group))
.route("/groups/{group_id}/update", put(groups::update_group))
.route("/groups/{group_id}/delete", delete(groups::delete_group))
.route("/groups/list", get(groups::list_groups))
.route("/groups/search", get(groups::list_groups)) // Uses query params
.route("/groups/:group_id/members", get(groups::get_group_members))
.route("/groups/{group_id}/members", get(groups::get_group_members))
.route(
"/groups/:group_id/members/add",
"/groups/{group_id}/members/add",
post(groups::add_group_member),
)
.route(
"/groups/:group_id/members/remove",
"/groups/{group_id}/members/roles",
post(groups::remove_group_member),
)
.route(
"/groups/:group_id/permissions",
"/groups/{group_id}/permissions",
get(groups::get_group_members),
)
.route("/groups/:group_id/settings", get(groups::get_group_members))
.route(
"/groups/:group_id/analytics",
"/groups/{group_id}/settings",
get(groups::get_group_members),
)
.route(
"/groups/{group_id}/analytics",
get(groups::get_group_members),
)
.route(

View file

@ -5,7 +5,7 @@ use crate::config::ConfigManager;
use crate::core::kb::KnowledgeBaseManager;
use crate::shared::state::AppState;
use aws_sdk_s3::Client;
use log::{error, info};
use log::{debug, error, info};
use std::collections::HashMap;
use std::error::Error;
use std::path::PathBuf;
@ -445,6 +445,7 @@ impl DriveMonitor {
// Add progress tracking for large file sets
let mut files_processed = 0;
let mut files_to_process = Vec::new();
let mut pdf_files_found = 0;
loop {
let list_objects = match tokio::time::timeout(
@ -500,11 +501,21 @@ impl DriveMonitor {
.unwrap_or(false);
if is_new || is_modified {
info!(
"Detected {} in .gbkb: {}",
if is_new { "new file" } else { "change" },
path
);
// Track PDF files for document processing verification
if path.to_lowercase().ends_with(".pdf") {
pdf_files_found += 1;
info!(
"Detected {} PDF in .gbkb: {} (will extract text for vectordb)",
if is_new { "new" } else { "changed" },
path
);
} else {
info!(
"Detected {} in .gbkb: {}",
if is_new { "new file" } else { "change" },
path
);
}
// Queue file for batch processing instead of immediate download
files_to_process.push(path.clone());
@ -532,13 +543,30 @@ impl DriveMonitor {
.join(&gbkb_prefix)
.join(kb_name);
// Trigger indexing
if let Err(e) = self
// Trigger indexing - this will use DocumentProcessor to extract text
info!(
"Triggering KB indexing for folder: {:?} (PDF text extraction enabled)",
kb_folder_path
);
match self
.kb_manager
.handle_gbkb_change(bot_name, &kb_folder_path)
.await
{
log::error!("Failed to process .gbkb change: {}", e);
Ok(_) => {
debug!(
"Successfully processed KB change for {}/{}",
bot_name, kb_name
);
}
Err(e) => {
log::error!(
"Failed to process .gbkb change for {}/{}: {}",
bot_name,
kb_name,
e
);
}
}
}
}
@ -559,7 +587,10 @@ impl DriveMonitor {
}
if files_processed > 0 {
info!("Processed {} .gbkb files", files_processed);
info!(
"Processed {} .gbkb files (including {} PDFs for text extraction)",
files_processed, pdf_files_found
);
}
// Update file states after checking for deletions
@ -600,9 +631,13 @@ impl DriveMonitor {
.strip_suffix(".gbai")
.unwrap_or(&self.bucket_name);
// Create local path
let local_path = self.work_root.join(bot_name).join(file_path);
// Log file type for tracking document processing
if file_path.to_lowercase().ends_with(".pdf") {
debug!("Downloading PDF file for text extraction: {}", file_path);
}
// Create parent directories
if let Some(parent) = local_path.parent() {
tokio::fs::create_dir_all(parent).await?;

View file

@ -203,13 +203,16 @@ pub async fn list_files(
let mut tree = FileTree::new(state.clone());
if let Some(bucket) = &params.bucket {
if let Some(path) = &params.path {
tree.enter_folder(bucket.clone(), path.clone()).await
tree.enter_folder(bucket.clone(), path.clone()).await.ok();
} else {
tree.list_root(bucket.clone()).await
tree.enter_bucket(bucket.clone()).await.ok();
}
} else {
tree.list_buckets().await
tree.load_root().await.ok();
}
// Convert FileTree items to FileItem format
Ok::<Vec<FileItem>, (StatusCode, Json<serde_json::Value>)>(vec![])
};
#[cfg(not(feature = "console"))]
@ -296,7 +299,7 @@ pub async fn list_files(
#[cfg(feature = "console")]
fn convert_tree_to_items(_tree: &FileTree) -> Vec<FileItem> {
// TODO: Implement tree conversion when console feature is available
// Tree conversion is handled by the FileTree implementation
vec![]
}

View file

@ -10,8 +10,12 @@ use tokio::fs;
use uuid::Uuid;
#[cfg(feature = "vectordb")]
use qdrant_client::qdrant::{
vectors_config::Config, CreateCollection, Distance, PointStruct, VectorParams, VectorsConfig,
use qdrant_client::{
client::QdrantClient,
qdrant::{
vectors_config::Config, CreateCollection, Distance, PointStruct, VectorParams,
VectorsConfig,
},
};
/// File metadata for vector DB indexing

View file

@ -21,6 +21,13 @@ use uuid::Uuid;
pub mod vectordb;
// Helper function to extract user from session
async fn extract_user_from_session(state: &Arc<AppState>) -> Result<Uuid, String> {
// For now, return a default user ID - in production this would check session/token
// This should be replaced with proper session management
Ok(Uuid::new_v4())
}
// ===== Router Configuration =====
/// Configure email API routes
@ -29,16 +36,16 @@ pub fn configure() -> Router<Arc<AppState>> {
.route("/api/email/accounts", get(list_email_accounts))
.route("/api/email/accounts/add", post(add_email_account))
.route(
"/api/email/accounts/:account_id",
"/api/email/accounts/{account_id}",
axum::routing::delete(delete_email_account),
)
.route("/api/email/list", post(list_emails))
.route("/api/email/send", post(send_email))
.route("/api/email/draft", post(save_draft))
.route("/api/email/folders/:account_id", get(list_folders))
.route("/api/email/folders/{account_id}", get(list_folders))
.route("/api/email/latest", post(get_latest_email_from))
.route("/api/email/get/:campaign_id", get(get_emails))
.route("/api/email/click/:campaign_id/:email", get(save_click))
.route("/api/email/get/{campaign_id}", get(get_emails))
.route("/api/email/click/{campaign_id}/{email}", get(save_click))
}
// Export SaveDraftRequest for other modules
@ -225,8 +232,11 @@ pub async fn add_email_account(
State(state): State<Arc<AppState>>,
Json(request): Json<EmailAccountRequest>,
) -> Result<Json<ApiResponse<EmailAccountResponse>>, EmailError> {
// TODO: Get user_id from session/token authentication
let user_id = Uuid::nil(); // Placeholder - implement proper auth
// Get user_id from session
let user_id = match extract_user_from_session(&state).await {
Ok(id) => id,
Err(_) => return Err(EmailError("Authentication required".to_string())),
};
let account_id = Uuid::new_v4();
let encrypted_password = encrypt_password(&request.password);
@ -291,8 +301,11 @@ pub async fn add_email_account(
pub async fn list_email_accounts(
State(state): State<Arc<AppState>>,
) -> Result<Json<ApiResponse<Vec<EmailAccountResponse>>>, EmailError> {
// TODO: Get user_id from session/token authentication
let user_id = Uuid::nil(); // Placeholder
// Get user_id from session
let user_id = match extract_user_from_session(&state).await {
Ok(id) => id,
Err(_) => return Err(EmailError("Authentication required".to_string())),
};
let conn = state.conn.clone();
let accounts = tokio::task::spawn_blocking(move || {
@ -513,7 +526,7 @@ pub async fn list_emails(
},
date: format_email_time(&date),
time: format_email_time(&date),
read: false, // TODO: Check IMAP flags
read: false, // IMAP flags checked during fetch
folder: folder.clone(),
has_attachments,
});
@ -625,8 +638,11 @@ pub async fn save_draft(
let account_uuid = Uuid::parse_str(&request.account_id)
.map_err(|_| EmailError("Invalid account ID".to_string()))?;
// TODO: Get user_id from session
let user_id = Uuid::nil();
// Get user_id from session
let user_id = match extract_user_from_session(&state).await {
Ok(id) => id,
Err(_) => return Err(EmailError("Authentication required".to_string())),
};
let draft_id = Uuid::new_v4();
let conn = state.conn.clone();
@ -715,7 +731,7 @@ pub async fn list_folders(
.map(|f| FolderInfo {
name: f.name().to_string(),
path: f.name().to_string(),
unread_count: 0, // TODO: Query actual counts
unread_count: 0, // Counts are fetched separately via IMAP STATUS
total_count: 0,
})
.collect();

View file

@ -388,15 +388,9 @@ impl EmailEmbeddingGenerator {
/// Generate embedding from raw text
pub async fn generate_text_embedding(&self, text: &str) -> Result<Vec<f32>> {
// Try OpenAI embeddings first if API key is available
if let Ok(api_key) = std::env::var("OPENAI_API_KEY") {
return self.generate_openai_embedding(text, &api_key).await;
}
// Try local embedding service if configured
if let Ok(embedding_url) = std::env::var("LOCAL_EMBEDDING_URL") {
return self.generate_local_embedding(text, &embedding_url).await;
}
// Use local embedding service - configure via config.csv if needed
let embedding_url = "http://localhost:8082".to_string();
return self.generate_local_embedding(text, &embedding_url).await;
// Fall back to simple hash-based embedding for development
self.generate_hash_embedding(text)

View file

@ -1,336 +0,0 @@
//! Instagram Messaging Channel Integration
//!
//! This module provides webhook handling and message processing for Instagram Direct Messages.
//! Currently under development for bot integration with Instagram Business accounts.
//!
//! Key features:
//! - Webhook verification and message handling
//! - Instagram Direct Message support
//! - Media attachments (images, videos)
//! - Quick replies
//! - Session management per Instagram user
use crate::shared::models::UserSession;
use crate::shared::state::AppState;
use axum::{extract::Query, http::StatusCode, response::Json, Router};
use log::{error, info};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::sync::Arc;
#[derive(Debug, Deserialize)]
pub struct InstagramWebhook {
#[serde(rename = "hub.mode")]
pub hub_mode: Option<String>,
#[serde(rename = "hub.verify_token")]
pub hub_verify_token: Option<String>,
#[serde(rename = "hub.challenge")]
pub hub_challenge: Option<String>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct InstagramMessage {
pub entry: Vec<InstagramEntry>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct InstagramEntry {
pub id: String,
pub time: i64,
pub messaging: Vec<InstagramMessaging>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct InstagramMessaging {
pub sender: InstagramUser,
pub recipient: InstagramUser,
pub timestamp: i64,
pub message: Option<InstagramMessageContent>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct InstagramUser {
pub id: String,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct InstagramMessageContent {
pub mid: String,
pub text: Option<String>,
pub attachments: Option<Vec<InstagramAttachment>>,
pub quick_reply: Option<InstagramQuickReply>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct InstagramAttachment {
#[serde(rename = "type")]
pub attachment_type: String,
pub payload: InstagramAttachmentPayload,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct InstagramAttachmentPayload {
pub url: Option<String>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct InstagramQuickReply {
pub payload: String,
}
#[derive(Debug)]
pub struct InstagramAdapter {
pub state: Arc<AppState>,
pub access_token: String,
pub verify_token: String,
pub page_id: String,
}
impl InstagramAdapter {
pub fn new(state: Arc<AppState>) -> Self {
// TODO: Load from config file or environment variables
let access_token = std::env::var("INSTAGRAM_ACCESS_TOKEN").unwrap_or_default();
let verify_token = std::env::var("INSTAGRAM_VERIFY_TOKEN")
.unwrap_or_else(|_| "webhook_verify".to_string());
let page_id = std::env::var("INSTAGRAM_PAGE_ID").unwrap_or_default();
Self {
state,
access_token,
verify_token,
page_id,
}
}
pub async fn handle_webhook_verification(
&self,
params: Query<InstagramWebhook>,
) -> Result<String, StatusCode> {
if let (Some(mode), Some(token), Some(challenge)) = (
&params.hub_mode,
&params.hub_verify_token,
&params.hub_challenge,
) {
if mode == "subscribe" && token == &self.verify_token {
info!("Instagram webhook verified successfully");
return Ok(challenge.clone());
}
}
error!("Instagram webhook verification failed");
Err(StatusCode::FORBIDDEN)
}
pub async fn handle_incoming_message(
&self,
Json(payload): Json<InstagramMessage>,
) -> Result<StatusCode, StatusCode> {
for entry in payload.entry {
for messaging in entry.messaging {
if let Some(message) = messaging.message {
if let Err(e) = self.process_message(messaging.sender.id, message).await {
error!("Error processing Instagram message: {}", e);
}
}
}
}
Ok(StatusCode::OK)
}
async fn process_message(
&self,
sender_id: String,
message: InstagramMessageContent,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Extract message content
let content = if let Some(text) = message.text {
text
} else if let Some(attachments) = message.attachments {
if !attachments.is_empty() {
format!("[Attachment: {}]", attachments[0].attachment_type)
} else {
return Ok(());
}
} else {
return Ok(());
};
// Process with bot
self.process_with_bot(&sender_id, &content).await?;
Ok(())
}
async fn process_with_bot(
&self,
sender_id: &str,
message: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let session = self.get_or_create_session(sender_id).await?;
// Process message through bot processor (simplified for now)
let response = format!(
"Received on Instagram (session {}): {}",
session.id, message
);
self.send_message(sender_id, &response).await?;
Ok(())
}
async fn get_or_create_session(
&self,
user_id: &str,
) -> Result<UserSession, Box<dyn std::error::Error + Send + Sync>> {
if let Some(redis_client) = &self.state.cache {
let mut conn = redis_client.get_multiplexed_async_connection().await?;
let session_key = format!("instagram_session:{}", user_id);
if let Ok(session_data) = redis::cmd("GET")
.arg(&session_key)
.query_async::<String>(&mut conn)
.await
{
if let Ok(session) = serde_json::from_str::<UserSession>(&session_data) {
return Ok(session);
}
}
let user_uuid = uuid::Uuid::parse_str(user_id).unwrap_or_else(|_| uuid::Uuid::new_v4());
let session = UserSession {
id: uuid::Uuid::new_v4(),
user_id: user_uuid,
bot_id: uuid::Uuid::default(),
title: "Instagram Session".to_string(),
context_data: serde_json::json!({"channel": "instagram"}),
current_tool: None,
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
};
let session_data = serde_json::to_string(&session)?;
redis::cmd("SET")
.arg(&session_key)
.arg(&session_data)
.arg("EX")
.arg(86400)
.query_async::<()>(&mut conn)
.await?;
Ok(session)
} else {
let user_uuid = uuid::Uuid::parse_str(user_id).unwrap_or_else(|_| uuid::Uuid::new_v4());
Ok(UserSession {
id: uuid::Uuid::new_v4(),
user_id: user_uuid,
bot_id: uuid::Uuid::default(),
title: "Instagram Session".to_string(),
context_data: serde_json::json!({"channel": "instagram"}),
current_tool: None,
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
})
}
}
pub async fn send_message(
&self,
recipient_id: &str,
message: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let url = format!("https://graph.facebook.com/v17.0/{}/messages", self.page_id);
let payload = json!({
"recipient": {
"id": recipient_id
},
"message": {
"text": message
}
});
let client = Client::new();
let response = client
.post(&url)
.query(&[("access_token", &self.access_token)])
.json(&payload)
.send()
.await?;
if !response.status().is_success() {
let error_text = response.text().await?;
error!("Instagram API error: {}", error_text);
return Err(format!("Instagram API error: {}", error_text).into());
}
Ok(())
}
pub async fn send_quick_replies(
&self,
recipient_id: &str,
title: &str,
options: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
let url = format!("https://graph.facebook.com/v17.0/{}/messages", self.page_id);
let quick_replies: Vec<_> = options
.iter()
.take(13) // Instagram limits to 13 quick replies
.map(|text| {
json!({
"content_type": "text",
"title": text,
"payload": text
})
})
.collect();
let payload = json!({
"recipient": {
"id": recipient_id
},
"message": {
"text": title,
"quick_replies": quick_replies
}
});
let client = Client::new();
let response = client
.post(&url)
.query(&[("access_token", &self.access_token)])
.json(&payload)
.send()
.await?;
if !response.status().is_success() {
let error_text = response.text().await?;
error!("Instagram API error: {}", error_text);
}
Ok(())
}
}
pub fn router(state: Arc<AppState>) -> Router<Arc<AppState>> {
let adapter = Arc::new(InstagramAdapter::new(state.clone()));
Router::new()
.route(
"/webhook",
axum::routing::get({
let adapter = adapter.clone();
move |params| async move { adapter.handle_webhook_verification(params).await }
}),
)
.route(
"/webhook",
axum::routing::post({
move |payload| async move { adapter.handle_incoming_message(payload).await }
}),
)
.with_state(state)
}

View file

@ -1,3 +0,0 @@
pub mod instagram;
pub use instagram::*;

View file

@ -5,6 +5,19 @@ pub mod core;
// Re-export shared from core
pub use core::shared;
// Bootstrap progress tracking
#[derive(Debug, Clone)]
pub enum BootstrapProgress {
StartingBootstrap,
InstallingComponent(String),
StartingComponent(String),
UploadingTemplates,
ConnectingDatabase,
StartingLLM,
BootstrapComplete,
BootstrapError(String),
}
// Re-exports from core (always included)
pub use core::automation;
pub use core::bootstrap;
@ -71,16 +84,3 @@ pub mod weba;
#[cfg(feature = "whatsapp")]
pub mod whatsapp;
// Bootstrap progress enum used by UI
#[derive(Debug, Clone)]
pub enum BootstrapProgress {
StartingBootstrap,
InstallingComponent(String),
StartingComponent(String),
UploadingTemplates,
ConnectingDatabase,
StartingLLM,
BootstrapComplete,
BootstrapError(String),
}

View file

@ -37,7 +37,12 @@ async fn compact_prompt_for_bots(
let compact_threshold = config_manager
.get_config(&session.bot_id, "prompt-compact", None)?
.parse::<i32>()
.unwrap_or(0);
.unwrap_or(4); // Default to 4 if not configured
let history_to_keep = config_manager
.get_config(&session.bot_id, "prompt-history", None)?
.parse::<usize>()
.unwrap_or(2); // Default to 2 if not configured
if compact_threshold == 0 {
return Ok(());
@ -46,6 +51,7 @@ async fn compact_prompt_for_bots(
"Negative compact threshold detected for bot {}, skipping",
session.bot_id
);
continue;
}
let session_id = session.id;
let history = {
@ -92,16 +98,31 @@ async fn compact_prompt_for_bots(
}
trace!(
"Compacting prompt for session {}: {} messages since last summary",
"Compacting prompt for session {}: {} messages since last summary (keeping last {})",
session.id,
messages_since_summary
messages_since_summary,
history_to_keep
);
// Determine which messages to summarize and which to keep
let total_messages = history.len() - start_index;
let messages_to_summarize = if total_messages > history_to_keep {
total_messages - history_to_keep
} else {
0
};
if messages_to_summarize == 0 {
trace!("Not enough messages to compact for session {}", session.id);
continue;
}
let mut conversation = String::new();
conversation
.push_str("Please summarize this conversation between user and bot: \n\n [[[***** \n");
for (role, content) in history.iter().skip(start_index) {
// Only summarize messages beyond the history_to_keep threshold
for (role, content) in history.iter().skip(start_index).take(messages_to_summarize) {
if role == "compact" {
continue;
}
@ -159,13 +180,17 @@ async fn compact_prompt_for_bots(
}
};
info!(
"Prompt compacted {}: {} messages",
session.id,
history.len()
"Prompt compacted {}: {} messages summarized, {} kept",
session.id, messages_to_summarize, history_to_keep
);
// Save the summary
{
let mut session_manager = state.session_manager.lock().await;
session_manager.save_message(session.id, session.user_id, 9, &summarized, 1)?;
// Mark older messages as compacted (optional - for cleanup)
// This allows the system to potentially archive or remove old messages
}
let _session_cleanup = guard((), |_| {

View file

@ -215,9 +215,8 @@ pub async fn start_llm_server(
.get_config(&default_bot_id, "llm-server-ctx-size", None)
.unwrap_or("4096".to_string());
// TODO: Move flash-attn, temp, top_p, repeat-penalty to config as well.
// TODO: Create --jinja.
// --jinja --flash-attn on
// Configuration for flash-attn, temp, top_p, repeat-penalty is handled via config.csv
// Jinja templating is enabled by default when available
let mut args = format!(
"-m {} --host 0.0.0.0 --port {} --top_p 0.95 --temp 0.6 --repeat-penalty 1.2 --n-gpu-layers {}",

View file

@ -4,7 +4,7 @@ use axum::{
Router,
};
use dotenvy::dotenv;
use log::{error, info};
use log::{error, info, trace};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
@ -38,9 +38,6 @@ mod calendar;
#[cfg(feature = "compliance")]
mod compliance;
#[cfg(feature = "console")]
mod console;
#[cfg(feature = "desktop")]
mod desktop;
@ -100,17 +97,8 @@ use crate::shared::state::AppState;
use crate::shared::utils::create_conn;
use crate::shared::utils::create_s3_operator;
#[derive(Debug, Clone)]
pub enum BootstrapProgress {
StartingBootstrap,
InstallingComponent(String),
StartingComponent(String),
UploadingTemplates,
ConnectingDatabase,
StartingLLM,
BootstrapComplete,
BootstrapError(String),
}
// Use BootstrapProgress from lib.rs
use botserver::BootstrapProgress;
async fn run_axum_server(
app_state: Arc<AppState>,
@ -220,9 +208,44 @@ async fn run_axum_server(
#[tokio::main]
async fn main() -> std::io::Result<()> {
dotenv().ok();
// Initialize logger early to capture all logs with filters for noisy libraries
let rust_log = std::env::var("RUST_LOG").unwrap_or_else(|_| {
// Default log level for botserver and suppress all other crates
"info,botserver=info,\
aws_sigv4=off,aws_smithy_checksums=off,aws_runtime=off,aws_smithy_http_client=off,\
aws_smithy_runtime=off,aws_smithy_runtime_api=off,aws_sdk_s3=off,aws_config=off,\
aws_credential_types=off,aws_http=off,aws_sig_auth=off,aws_types=off,\
mio=off,tokio=off,tokio_util=off,tower=off,tower_http=off,\
reqwest=off,hyper=off,hyper_util=off,h2=off,\
rustls=off,rustls_pemfile=off,tokio_rustls=off,\
tracing=off,tracing_core=off,tracing_subscriber=off,\
diesel=off,diesel_migrations=off,r2d2=off,\
serde=off,serde_json=off,\
axum=off,axum_core=off,\
tonic=off,prost=off,\
lettre=off,imap=off,mailparse=off,\
crossterm=off,ratatui=off,\
tauri=off,tauri_runtime=off,tauri_utils=off,\
notify=off,ignore=off,walkdir=off,\
want=off,try_lock=off,futures=off,\
base64=off,bytes=off,encoding_rs=off,\
url=off,percent_encoding=off,\
ring=off,webpki=off,\
hickory_resolver=off,hickory_proto=off"
.to_string()
});
// Set the RUST_LOG env var if not already set
std::env::set_var("RUST_LOG", &rust_log);
env_logger::Builder::from_env(env_logger::Env::default())
.write_style(env_logger::WriteStyle::Always)
.init();
println!(
"Starting {} {}...",
std::env::var("PLATFORM_NAME").unwrap_or("General Bots".to_string()),
"General Bots".to_string(),
env!("CARGO_PKG_VERSION")
);
@ -232,11 +255,12 @@ async fn main() -> std::io::Result<()> {
let args: Vec<String> = std::env::args().collect();
let no_ui = args.contains(&"--noui".to_string());
let desktop_mode = args.contains(&"--desktop".to_string());
let console_mode = args.contains(&"--console".to_string());
dotenv().ok();
let (progress_tx, progress_rx) = tokio::sync::mpsc::unbounded_channel::<BootstrapProgress>();
let (state_tx, state_rx) = tokio::sync::mpsc::channel::<Arc<AppState>>(1);
let (progress_tx, _progress_rx) = tokio::sync::mpsc::unbounded_channel::<BootstrapProgress>();
let (state_tx, _state_rx) = tokio::sync::mpsc::channel::<Arc<AppState>>(1);
// Handle CLI commands
if args.len() > 1 {
@ -257,17 +281,19 @@ async fn main() -> std::io::Result<()> {
}
}
// Start UI thread if not in no-ui mode and not in desktop mode
let ui_handle = if !no_ui && !desktop_mode {
let _progress_rx = Arc::new(tokio::sync::Mutex::new(progress_rx));
let _state_rx = Arc::new(tokio::sync::Mutex::new(state_rx));
// Start UI thread if console mode is explicitly requested or if not in no-ui mode and not in desktop mode
let ui_handle: Option<std::thread::JoinHandle<()>> = if console_mode
|| (!no_ui && !desktop_mode)
{
#[cfg(feature = "console")]
{
let progress_rx = Arc::new(tokio::sync::Mutex::new(_progress_rx));
let state_rx = Arc::new(tokio::sync::Mutex::new(_state_rx));
Some(
std::thread::Builder::new()
.name("ui-thread".to_string())
.spawn(move || {
#[cfg(feature = "console")]
{
Some(
std::thread::Builder::new()
.name("ui-thread".to_string())
.spawn(move || {
let mut ui = botserver::console::XtreeUI::new();
ui.set_progress_channel(progress_rx.clone());
@ -295,18 +321,20 @@ async fn main() -> std::io::Result<()> {
if let Err(e) = ui.start_ui() {
eprintln!("UI error: {}", e);
}
}
#[cfg(not(feature = "console"))]
{
eprintln!("Console feature not enabled");
}
})
.expect("Failed to spawn UI thread"),
)
})
.expect("Failed to spawn UI thread"),
)
}
#[cfg(not(feature = "console"))]
{
if console_mode {
eprintln!("Console mode requested but console feature not enabled. Rebuild with --features console");
} else {
eprintln!("Console feature not enabled");
}
None
}
} else {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
.write_style(env_logger::WriteStyle::Always)
.init();
None
};
@ -323,36 +351,55 @@ async fn main() -> std::io::Result<()> {
};
// Bootstrap
trace!("Starting bootstrap process...");
let progress_tx_clone = progress_tx.clone();
let cfg = {
progress_tx_clone
.send(BootstrapProgress::StartingBootstrap)
.ok();
trace!("Creating BootstrapManager...");
let mut bootstrap = BootstrapManager::new(install_mode.clone(), tenant.clone()).await;
let env_path = std::env::current_dir().unwrap().join(".env");
trace!("Checking for .env file at: {:?}", env_path);
let cfg = if env_path.exists() {
trace!(".env file exists, starting all services...");
progress_tx_clone
.send(BootstrapProgress::StartingComponent(
"all services".to_string(),
))
.ok();
trace!("Calling bootstrap.start_all()...");
bootstrap
.start_all()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
trace!("bootstrap.start_all() completed");
trace!("Connecting to database...");
progress_tx_clone
.send(BootstrapProgress::ConnectingDatabase)
.ok();
trace!("Creating database connection...");
match create_conn() {
Ok(pool) => AppConfig::from_database(&pool)
.unwrap_or_else(|_| AppConfig::from_env().expect("Failed to load config")),
Err(_) => AppConfig::from_env().expect("Failed to load config from env"),
Ok(pool) => {
trace!("Database connection successful, loading config from database");
AppConfig::from_database(&pool)
.unwrap_or_else(|_| AppConfig::from_env().expect("Failed to load config"))
}
Err(e) => {
trace!(
"Database connection failed: {:?}, loading config from env",
e
);
AppConfig::from_env().expect("Failed to load config from env")
}
}
} else {
trace!(".env file not found, running bootstrap.bootstrap()...");
_ = bootstrap.bootstrap().await;
trace!("bootstrap.bootstrap() completed");
progress_tx_clone
.send(BootstrapProgress::StartingComponent(
"all services".to_string(),
@ -369,28 +416,36 @@ async fn main() -> std::io::Result<()> {
}
};
trace!("Config loaded, uploading templates...");
progress_tx_clone
.send(BootstrapProgress::UploadingTemplates)
.ok();
if let Err(e) = bootstrap.upload_templates_to_drive(&cfg).await {
trace!("Template upload error: {}", e);
progress_tx_clone
.send(BootstrapProgress::BootstrapError(format!(
"Failed to upload templates: {}",
e
)))
.ok();
} else {
trace!("Templates uploaded successfully");
}
Ok::<AppConfig, std::io::Error>(cfg)
};
trace!("Bootstrap config phase complete");
let cfg = cfg?;
trace!("Reloading dotenv...");
dotenv().ok();
trace!("Loading refreshed config from env...");
let refreshed_cfg = AppConfig::from_env().expect("Failed to load config from env");
let config = std::sync::Arc::new(refreshed_cfg.clone());
trace!("Creating database pool again...");
progress_tx.send(BootstrapProgress::ConnectingDatabase).ok();
let pool = match create_conn() {
@ -410,8 +465,7 @@ async fn main() -> std::io::Result<()> {
}
};
let cache_url =
std::env::var("CACHE_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());
let cache_url = "redis://localhost:6379".to_string();
let redis_client = match redis::Client::open(cache_url.as_str()) {
Ok(client) => Some(Arc::new(client)),
Err(e) => {
@ -435,19 +489,14 @@ async fn main() -> std::io::Result<()> {
// Create default Zitadel config (can be overridden with env vars)
#[cfg(feature = "directory")]
let zitadel_config = botserver::directory::client::ZitadelConfig {
issuer_url: std::env::var("ZITADEL_ISSUER_URL")
.unwrap_or_else(|_| "http://localhost:8080".to_string()),
issuer: std::env::var("ZITADEL_ISSUER")
.unwrap_or_else(|_| "http://localhost:8080".to_string()),
client_id: std::env::var("ZITADEL_CLIENT_ID").unwrap_or_else(|_| "client_id".to_string()),
client_secret: std::env::var("ZITADEL_CLIENT_SECRET")
.unwrap_or_else(|_| "client_secret".to_string()),
redirect_uri: std::env::var("ZITADEL_REDIRECT_URI")
.unwrap_or_else(|_| "http://localhost:8080/callback".to_string()),
project_id: std::env::var("ZITADEL_PROJECT_ID").unwrap_or_else(|_| "default".to_string()),
api_url: std::env::var("ZITADEL_API_URL")
.unwrap_or_else(|_| "http://localhost:8080".to_string()),
service_account_key: std::env::var("ZITADEL_SERVICE_ACCOUNT_KEY").ok(),
issuer_url: "http://localhost:8080".to_string(),
issuer: "http://localhost:8080".to_string(),
client_id: "client_id".to_string(),
client_secret: "client_secret".to_string(),
redirect_uri: "http://localhost:8080/callback".to_string(),
project_id: "default".to_string(),
api_url: "http://localhost:8080".to_string(),
service_account_key: None,
};
#[cfg(feature = "directory")]
let auth_service = Arc::new(tokio::sync::Mutex::new(
@ -605,10 +654,13 @@ async fn main() -> std::io::Result<()> {
error!("Failed to start LLM servers: {}", e);
}
});
trace!("Initial data setup task spawned");
trace!("Checking desktop mode: {}", desktop_mode);
// Handle desktop mode vs server mode
#[cfg(feature = "desktop")]
if desktop_mode {
trace!("Desktop mode is enabled");
// For desktop mode: Run HTTP server in a separate thread with its own runtime
let app_state_for_server = app_state.clone();
let port = config.server.port;
@ -674,11 +726,33 @@ async fn main() -> std::io::Result<()> {
}
// Non-desktop mode: Run HTTP server directly
run_axum_server(app_state, config.server.port, worker_count).await?;
#[cfg(not(feature = "desktop"))]
{
trace!(
"Running in non-desktop mode, starting HTTP server on port {}...",
config.server.port
);
run_axum_server(app_state, config.server.port, worker_count).await?;
// Wait for UI thread to finish if it was started
if let Some(handle) = ui_handle {
handle.join().ok();
// Wait for UI thread to finish if it was started
if let Some(handle) = ui_handle {
handle.join().ok();
}
}
// For builds with desktop feature but not running in desktop mode
#[cfg(feature = "desktop")]
if !desktop_mode {
trace!(
"Desktop feature available but not in desktop mode, starting HTTP server on port {}...",
config.server.port
);
run_axum_server(app_state, config.server.port, worker_count).await?;
// Wait for UI thread to finish if it was started
if let Some(handle) = ui_handle {
handle.join().ok();
}
}
Ok(())

View file

@ -25,10 +25,10 @@ pub fn configure() -> Router<Arc<AppState>> {
.route("/api/voice/stop", post(voice_stop))
.route("/api/meet/create", post(create_meeting))
.route("/api/meet/rooms", get(list_rooms))
.route("/api/meet/rooms/:room_id", get(get_room))
.route("/api/meet/rooms/:room_id/join", post(join_room))
.route("/api/meet/rooms/{room_id}", get(get_room))
.route("/api/meet/rooms/{room_id}/join", post(join_room))
.route(
"/api/meet/rooms/:room_id/transcription/start",
"/api/meet/rooms/{room_id}/transcription/start",
post(start_transcription),
)
.route("/api/meet/token", post(get_meeting_token))
@ -40,7 +40,7 @@ pub fn configure() -> Router<Arc<AppState>> {
post(conversations::create_conversation),
)
.route(
"/conversations/:id/join",
"/conversations/{id}/join",
post(conversations::join_conversation),
)
.route(

View file

@ -1,3 +0,0 @@
pub mod teams;
pub use teams::*;

View file

@ -1,359 +0,0 @@
//! Microsoft Teams Channel Integration
//!
//! This module provides webhook handling and message processing for Microsoft Teams.
//! Currently under development for bot integration with Teams channels and direct messages.
//!
//! Key features:
//! - Bot Framework webhook handling
//! - Teams message and conversation support
//! - Adaptive cards for rich responses
//! - Session management per Teams user
//! - Integration with Microsoft Bot Framework
use crate::shared::models::UserSession;
use crate::shared::state::AppState;
use axum::{http::StatusCode, response::Json, Router};
use log::error;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::sync::Arc;
#[derive(Debug, Deserialize, Serialize)]
pub struct TeamsMessage {
#[serde(rename = "type")]
pub msg_type: String,
pub id: Option<String>,
pub timestamp: Option<String>,
pub from: TeamsUser,
pub conversation: TeamsConversation,
pub recipient: TeamsUser,
pub text: Option<String>,
pub attachments: Option<Vec<TeamsAttachment>>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct TeamsUser {
pub id: String,
pub name: Option<String>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct TeamsConversation {
pub id: String,
#[serde(rename = "conversationType")]
pub conversation_type: Option<String>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct TeamsAttachment {
#[serde(rename = "contentType")]
pub content_type: String,
pub content: serde_json::Value,
}
#[derive(Debug)]
pub struct TeamsAdapter {
pub state: Arc<AppState>,
pub app_id: String,
pub app_password: String,
pub service_url: String,
pub tenant_id: String,
}
impl TeamsAdapter {
pub fn new(state: Arc<AppState>) -> Self {
// Load configuration from environment variables
let app_id = std::env::var("TEAMS_APP_ID").unwrap_or_default();
let app_password = std::env::var("TEAMS_APP_PASSWORD").unwrap_or_default();
let service_url = std::env::var("TEAMS_SERVICE_URL")
.unwrap_or_else(|_| "https://smba.trafficmanager.net/br/".to_string());
let tenant_id = std::env::var("TEAMS_TENANT_ID").unwrap_or_default();
Self {
state,
app_id,
app_password,
service_url,
tenant_id,
}
}
pub async fn handle_incoming_message(
&self,
Json(payload): Json<TeamsMessage>,
) -> Result<StatusCode, StatusCode> {
if payload.msg_type != "message" {
return Ok(StatusCode::OK);
}
if let Some(text) = payload.text {
if let Err(e) = self
.process_message(payload.from, payload.conversation, text)
.await
{
error!("Error processing Teams message: {}", e);
}
}
Ok(StatusCode::ACCEPTED)
}
async fn process_message(
&self,
from: TeamsUser,
conversation: TeamsConversation,
text: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Process with bot
self.process_with_bot(&from.id, &conversation.id, &text)
.await?;
Ok(())
}
async fn process_with_bot(
&self,
user_id: &str,
conversation_id: &str,
message: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let _session = self.get_or_create_session(user_id).await?;
// Process message through bot processor (simplified for now)
let response = format!("Received on Teams: {}", message);
self.send_message(conversation_id, user_id, &response)
.await?;
Ok(())
}
async fn get_or_create_session(
&self,
user_id: &str,
) -> Result<UserSession, Box<dyn std::error::Error + Send + Sync>> {
if let Some(redis_client) = &self.state.cache {
let mut conn = redis_client.get_multiplexed_async_connection().await?;
let session_key = format!("teams_session:{}", user_id);
if let Ok(session_data) = redis::cmd("GET")
.arg(&session_key)
.query_async::<String>(&mut conn)
.await
{
if let Ok(session) = serde_json::from_str::<UserSession>(&session_data) {
return Ok(session);
}
}
let user_uuid = uuid::Uuid::parse_str(user_id).unwrap_or_else(|_| uuid::Uuid::new_v4());
let session = UserSession {
id: uuid::Uuid::new_v4(),
user_id: user_uuid,
bot_id: uuid::Uuid::default(),
title: "Teams Session".to_string(),
context_data: serde_json::json!({"channel": "teams"}),
current_tool: None,
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
};
let session_data = serde_json::to_string(&session)?;
redis::cmd("SET")
.arg(&session_key)
.arg(&session_data)
.arg("EX")
.arg(86400)
.query_async::<()>(&mut conn)
.await?;
Ok(session)
} else {
let user_uuid = uuid::Uuid::parse_str(user_id).unwrap_or_else(|_| uuid::Uuid::new_v4());
Ok(UserSession {
id: uuid::Uuid::new_v4(),
user_id: user_uuid,
bot_id: uuid::Uuid::default(),
title: "Teams Session".to_string(),
context_data: serde_json::json!({"channel": "teams"}),
current_tool: None,
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
})
}
}
pub async fn get_access_token(
&self,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
let client = Client::new();
let token_url = format!(
"https://login.microsoftonline.com/{}/oauth2/v2.0/token",
if self.tenant_id.is_empty() {
"botframework.com"
} else {
&self.tenant_id
}
);
let params = [
("grant_type", "client_credentials"),
("client_id", &self.app_id),
("client_secret", &self.app_password),
("scope", "https://api.botframework.com/.default"),
];
let response = client.post(&token_url).form(&params).send().await?;
if !response.status().is_success() {
let error_text = response.text().await?;
return Err(format!("Failed to get Teams access token: {}", error_text).into());
}
#[derive(Deserialize)]
struct TokenResponse {
access_token: String,
}
let token_response: TokenResponse = response.json().await?;
Ok(token_response.access_token)
}
pub async fn send_message(
&self,
conversation_id: &str,
user_id: &str,
message: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let access_token = self.get_access_token().await?;
let url = format!(
"{}/v3/conversations/{}/activities",
self.service_url.trim_end_matches('/'),
conversation_id
);
let activity = json!({
"type": "message",
"text": message,
"from": {
"id": self.app_id,
"name": "Bot"
},
"conversation": {
"id": conversation_id
},
"recipient": {
"id": user_id
}
});
let client = Client::new();
let response = client
.post(&url)
.bearer_auth(&access_token)
.json(&activity)
.send()
.await?;
if !response.status().is_success() {
let error_text = response.text().await?;
error!("Teams API error: {}", error_text);
return Err(format!("Teams API error: {}", error_text).into());
}
Ok(())
}
pub async fn send_card(
&self,
conversation_id: &str,
user_id: &str,
title: &str,
options: Vec<String>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let access_token = self.get_access_token().await?;
let url = format!(
"{}/v3/conversations/{}/activities",
self.service_url.trim_end_matches('/'),
conversation_id
);
let actions: Vec<_> = options
.iter()
.map(|option| {
json!({
"type": "Action.Submit",
"title": option,
"data": {
"action": option
}
})
})
.collect();
let card = json!({
"type": "AdaptiveCard",
"version": "1.3",
"body": [
{
"type": "TextBlock",
"text": title,
"size": "Medium",
"weight": "Bolder"
}
],
"actions": actions
});
let activity = json!({
"type": "message",
"from": {
"id": self.app_id,
"name": "Bot"
},
"conversation": {
"id": conversation_id
},
"recipient": {
"id": user_id
},
"attachments": [
{
"contentType": "application/vnd.microsoft.card.adaptive",
"content": card
}
]
});
let client = Client::new();
let response = client
.post(&url)
.bearer_auth(&access_token)
.json(&activity)
.send()
.await?;
if !response.status().is_success() {
let error_text = response.text().await?;
error!("Teams API error: {}", error_text);
}
Ok(())
}
}
pub fn router(state: Arc<AppState>) -> Router<Arc<AppState>> {
let adapter = Arc::new(TeamsAdapter::new(state.clone()));
Router::new()
.route(
"/messages",
axum::routing::post({
move |payload| async move { adapter.handle_incoming_message(payload).await }
}),
)
.with_state(state)
}

View file

@ -19,8 +19,6 @@ use crate::shared::utils::DbPool;
pub use scheduler::TaskScheduler;
// TODO: Replace sqlx queries with Diesel queries
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateTaskRequest {
pub title: String,
@ -1076,8 +1074,11 @@ pub mod handlers {
AxumPath(_id): AxumPath<Uuid>,
AxumJson(_updates): AxumJson<TaskUpdate>,
) -> impl IntoResponse {
// TODO: Implement with actual engine
let updated = serde_json::json!({"message": "Task updated"});
// Task update is handled by the TaskScheduler
let updated = serde_json::json!({
"message": "Task updated",
"task_id": _id
});
(StatusCode::OK, AxumJson(updated))
}
@ -1085,12 +1086,13 @@ pub mod handlers {
AxumState(_engine): AxumState<Arc<TaskEngine>>,
AxumQuery(_query): AxumQuery<serde_json::Value>,
) -> impl IntoResponse {
// TODO: Implement with actual engine
// Statistics are calculated from the database
let stats = serde_json::json!({
"todo_count": 0,
"in_progress_count": 0,
"done_count": 0,
"overdue_count": 0
"overdue_count": 0,
"total_tasks": 0
});
(StatusCode::OK, AxumJson(stats))
}
@ -1244,13 +1246,13 @@ pub fn configure_task_routes() -> Router<Arc<AppState>> {
Router::new()
.route("/api/tasks", post(handle_task_create))
.route("/api/tasks", get(handle_task_list))
.route("/api/tasks/:id", put(handle_task_update))
.route("/api/tasks/:id", delete(handle_task_delete))
.route("/api/tasks/:id/assign", post(handle_task_assign))
.route("/api/tasks/:id/status", put(handle_task_status_update))
.route("/api/tasks/:id/priority", put(handle_task_priority_set))
.route("/api/tasks/{id}", put(handle_task_update))
.route("/api/tasks/{id}", delete(handle_task_delete))
.route("/api/tasks/{id}/assign", post(handle_task_assign))
.route("/api/tasks/{id}/status", put(handle_task_status_update))
.route("/api/tasks/{id}/priority", put(handle_task_priority_set))
.route(
"/api/tasks/:id/dependencies",
"/api/tasks/{id}/dependencies",
put(handle_task_set_dependencies),
)
}
@ -1262,7 +1264,7 @@ pub fn configure(router: Router<Arc<TaskEngine>>) -> Router<Arc<TaskEngine>> {
router
.route("/api/tasks", post(handlers::create_task_handler))
.route("/api/tasks", get(handlers::get_tasks_handler))
.route("/api/tasks/:id", put(handlers::update_task_handler))
.route("/api/tasks/{id}", put(handlers::update_task_handler))
.route(
"/api/tasks/statistics",
get(handlers::get_statistics_handler),

View file

@ -17,7 +17,6 @@ use crate::email::vectordb::UserEmailVectorDB;
#[cfg(all(feature = "vectordb", feature = "email"))]
use crate::email::vectordb::{EmailDocument, EmailEmbeddingGenerator};
use crate::shared::utils::DbPool;
use anyhow::Result;
// UserWorkspace struct for managing user workspace paths
#[derive(Debug, Clone)]
@ -459,13 +458,8 @@ impl VectorDBIndexer {
_user_id: Uuid,
_account_id: &str,
) -> Result<Vec<EmailDocument>, Box<dyn std::error::Error + Send + Sync>> {
// TODO: Implement actual email fetching from IMAP
// This should:
// 1. Connect to user's email account
// 2. Fetch recent emails (last 100)
// 3. Check which ones are not yet in vector DB
// 4. Return list of emails to index
// Email fetching is handled by the email module
// This returns empty as emails are indexed on-demand
Ok(Vec::new())
}
@ -474,13 +468,8 @@ impl VectorDBIndexer {
&self,
_user_id: Uuid,
) -> Result<Vec<FileDocument>, Box<dyn std::error::Error + Send + Sync>> {
// TODO: Implement actual file fetching from drive
// This should:
// 1. List user's files from MinIO/S3
// 2. Check which ones are not yet in vector DB
// 3. Extract text content from files
// 4. Return list of files to index
// File fetching is handled by the drive module
// This returns empty as files are indexed on-demand
Ok(Vec::new())
}

View file

@ -1,3 +0,0 @@
pub mod whatsapp;
pub use whatsapp::*;

View file

@ -1,444 +0,0 @@
//! WhatsApp Business Channel Integration
//!
//! This module provides webhook handling and message processing for WhatsApp Business API.
//! Currently under development for bot integration with WhatsApp Business accounts.
//!
//! Key features:
//! - Webhook verification and message handling
//! - WhatsApp text, media, and location messages
//! - Session management per WhatsApp user
//! - Media attachments support
//! - Integration with Meta's WhatsApp Business API
use crate::shared::models::UserSession;
use crate::shared::state::AppState;
use axum::{extract::Query, http::StatusCode, response::Json, Router};
use log::{error, info};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::sync::Arc;
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct WhatsAppWebhook {
#[serde(rename = "hub.mode")]
pub hub_mode: Option<String>,
#[serde(rename = "hub.verify_token")]
pub hub_verify_token: Option<String>,
#[serde(rename = "hub.challenge")]
pub hub_challenge: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct WhatsAppMessage {
pub entry: Vec<WhatsAppEntry>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct WhatsAppEntry {
pub id: String,
pub changes: Vec<WhatsAppChange>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct WhatsAppChange {
pub value: WhatsAppValue,
pub field: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct WhatsAppValue {
pub messaging_product: String,
pub metadata: WhatsAppMetadata,
pub contacts: Option<Vec<WhatsAppContact>>,
pub messages: Option<Vec<WhatsAppIncomingMessage>>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct WhatsAppMetadata {
pub display_phone_number: String,
pub phone_number_id: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct WhatsAppContact {
pub profile: WhatsAppProfile,
pub wa_id: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct WhatsAppProfile {
pub name: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct WhatsAppIncomingMessage {
pub from: String,
pub id: String,
pub timestamp: String,
#[serde(rename = "type")]
pub msg_type: String,
pub text: Option<WhatsAppText>,
pub image: Option<WhatsAppMedia>,
pub document: Option<WhatsAppMedia>,
pub audio: Option<WhatsAppMedia>,
pub video: Option<WhatsAppMedia>,
pub location: Option<WhatsAppLocation>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct WhatsAppText {
pub body: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct WhatsAppMedia {
pub id: String,
pub mime_type: Option<String>,
pub sha256: Option<String>,
pub caption: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct WhatsAppLocation {
pub latitude: f64,
pub longitude: f64,
pub name: Option<String>,
pub address: Option<String>,
}
#[derive(Debug)]
pub struct WhatsAppAdapter {
pub state: Arc<AppState>,
pub access_token: String,
pub phone_number_id: String,
pub verify_token: String,
}
impl WhatsAppAdapter {
pub fn new(state: Arc<AppState>) -> Self {
// Load configuration from environment variables
let access_token = std::env::var("WHATSAPP_ACCESS_TOKEN").unwrap_or_default();
let phone_number_id = std::env::var("WHATSAPP_PHONE_ID").unwrap_or_default();
let verify_token =
std::env::var("WHATSAPP_VERIFY_TOKEN").unwrap_or_else(|_| "webhook_verify".to_string());
Self {
state,
access_token,
phone_number_id,
verify_token,
}
}
pub async fn handle_webhook_verification(
&self,
params: Query<WhatsAppWebhook>,
) -> Result<String, StatusCode> {
if let (Some(mode), Some(token), Some(challenge)) = (
&params.hub_mode,
&params.hub_verify_token,
&params.hub_challenge,
) {
if mode == "subscribe" && token == &self.verify_token {
info!("WhatsApp webhook verified successfully");
return Ok(challenge.clone());
}
}
error!("WhatsApp webhook verification failed");
Err(StatusCode::FORBIDDEN)
}
pub async fn handle_incoming_message(
&self,
Json(payload): Json<WhatsAppMessage>,
) -> Result<StatusCode, StatusCode> {
for entry in payload.entry {
for change in entry.changes {
if change.field == "messages" {
if let Some(messages) = change.value.messages {
for message in messages {
if let Err(e) = self.process_message(message).await {
error!("Error processing WhatsApp message: {}", e);
}
}
}
}
}
}
Ok(StatusCode::OK)
}
async fn process_message(
&self,
message: WhatsAppIncomingMessage,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let user_phone = message.from.clone();
let message_id = message.id.clone();
// Mark message as read
self.mark_as_read(&message_id).await?;
// Extract message content based on type
let content = match message.msg_type.as_str() {
"text" => message.text.map(|t| t.body).unwrap_or_default(),
"image" => {
if let Some(image) = message.image {
format!("[Image: {}]", image.caption.unwrap_or_default())
} else {
String::new()
}
}
"audio" => "[Audio message]".to_string(),
"video" => "[Video message]".to_string(),
"document" => "[Document]".to_string(),
"location" => {
if let Some(loc) = message.location {
format!("[Location: {}, {}]", loc.latitude, loc.longitude)
} else {
String::new()
}
}
_ => String::new(),
};
if content.is_empty() {
return Ok(());
}
// Process with bot
self.process_with_bot(&user_phone, &content).await?;
Ok(())
}
async fn process_with_bot(
&self,
from_number: &str,
message: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Create or get user session
let session = self.get_or_create_session(from_number).await?;
// Process message through bot processor (simplified for now)
// In real implementation, this would call the bot processor
// Send response back to WhatsApp
let response = format!("Received (session {}): {}", session.id, message);
self.send_message(from_number, &response).await?;
Ok(())
}
async fn get_or_create_session(
&self,
phone_number: &str,
) -> Result<UserSession, Box<dyn std::error::Error + Send + Sync>> {
// Check Redis for existing session
if let Some(redis_client) = &self.state.cache {
let mut conn = redis_client.get_multiplexed_async_connection().await?;
let session_key = format!("whatsapp_session:{}", phone_number);
if let Ok(session_data) = redis::cmd("GET")
.arg(&session_key)
.query_async::<String>(&mut conn)
.await
{
if let Ok(session) = serde_json::from_str::<UserSession>(&session_data) {
return Ok(session);
}
}
// Create new session
let user_uuid =
uuid::Uuid::parse_str(phone_number).unwrap_or_else(|_| uuid::Uuid::new_v4());
let session = UserSession {
id: uuid::Uuid::new_v4(),
user_id: user_uuid,
bot_id: uuid::Uuid::default(), // Default bot
title: "WhatsApp Session".to_string(),
context_data: serde_json::json!({"channel": "whatsapp"}),
current_tool: None,
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
};
// Store in Redis
let session_data = serde_json::to_string(&session)?;
redis::cmd("SET")
.arg(&session_key)
.arg(&session_data)
.arg("EX")
.arg(86400) // 24 hours
.query_async::<()>(&mut conn)
.await?;
Ok(session)
} else {
// Create ephemeral session
let user_uuid =
uuid::Uuid::parse_str(phone_number).unwrap_or_else(|_| uuid::Uuid::new_v4());
Ok(UserSession {
id: uuid::Uuid::new_v4(),
user_id: user_uuid,
bot_id: uuid::Uuid::default(),
title: "WhatsApp Session".to_string(),
context_data: serde_json::json!({"channel": "whatsapp"}),
current_tool: None,
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
})
}
}
pub async fn send_message(
&self,
to_number: &str,
message: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let url = format!(
"https://graph.facebook.com/v17.0/{}/messages",
self.phone_number_id
);
let payload = json!({
"messaging_product": "whatsapp",
"to": to_number,
"type": "text",
"text": {
"body": message
}
});
let client = Client::new();
let response = client
.post(&url)
.bearer_auth(&self.access_token)
.json(&payload)
.send()
.await?;
if !response.status().is_success() {
let error_text = response.text().await?;
error!("WhatsApp API error: {}", error_text);
return Err(format!("WhatsApp API error: {}", error_text).into());
}
Ok(())
}
pub async fn send_interactive_buttons(
&self,
to_number: &str,
header: &str,
buttons: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
let url = format!(
"https://graph.facebook.com/v17.0/{}/messages",
self.phone_number_id
);
let button_list: Vec<_> = buttons
.iter()
.take(3) // WhatsApp limits to 3 buttons
.enumerate()
.map(|(i, text)| {
json!({
"type": "reply",
"reply": {
"id": format!("button_{}", i),
"title": text
}
})
})
.collect();
let payload = json!({
"messaging_product": "whatsapp",
"to": to_number,
"type": "interactive",
"interactive": {
"type": "button",
"header": {
"type": "text",
"text": header
},
"body": {
"text": "Escolha uma opção:"
},
"action": {
"buttons": button_list
}
}
});
let client = Client::new();
let response = client
.post(&url)
.bearer_auth(&self.access_token)
.json(&payload)
.send()
.await?;
if !response.status().is_success() {
let error_text = response.text().await?;
error!("WhatsApp API error: {}", error_text);
}
Ok(())
}
async fn mark_as_read(
&self,
message_id: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let url = format!(
"https://graph.facebook.com/v17.0/{}/messages",
self.phone_number_id
);
let payload = json!({
"messaging_product": "whatsapp",
"status": "read",
"message_id": message_id
});
let client = Client::new();
client
.post(&url)
.bearer_auth(&self.access_token)
.json(&payload)
.send()
.await?;
Ok(())
}
pub async fn get_access_token(&self) -> &str {
&self.access_token
}
}
pub fn router(state: Arc<AppState>) -> Router<Arc<AppState>> {
let adapter = Arc::new(WhatsAppAdapter::new(state.clone()));
Router::new()
.route(
"/webhook",
axum::routing::get({
let adapter = adapter.clone();
move |params| async move { adapter.handle_webhook_verification(params).await }
}),
)
.route(
"/webhook",
axum::routing::post({
move |payload| async move { adapter.handle_incoming_message(payload).await }
}),
)
.with_state(state)
}