From ab1f2df476368297fa3c902a173086501b09dceb Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Tue, 17 Mar 2026 00:00:36 -0300 Subject: [PATCH] Read Drive config from Vault at runtime with fallback defaults --- src/basic/keywords/hearing/syntax.rs | 4 +- src/core/bot/mod.rs | 2 +- src/core/config/mod.rs | 40 ++++++- src/core/secrets/mod.rs | 166 ++++++++++++++++++++++++++- src/core/shared/state.rs | 1 + src/email/integration.rs | 2 +- src/llm/observability.rs | 14 +-- 7 files changed, 207 insertions(+), 22 deletions(-) diff --git a/src/basic/keywords/hearing/syntax.rs b/src/basic/keywords/hearing/syntax.rs index 8ddabbde..227acefc 100644 --- a/src/basic/keywords/hearing/syntax.rs +++ b/src/basic/keywords/hearing/syntax.rs @@ -28,7 +28,7 @@ fn hear_block(state: &Arc, session_id: uuid::Uuid, variable_name: &str // Mark session as waiting and store metadata in Redis (for UI hints like menus) let state_clone = Arc::clone(state); let var = variable_name.to_string(); - let _ = tokio::runtime::Handle::current().block_on(async move { + tokio::runtime::Handle::current().block_on(async move { { let mut sm = state_clone.session_manager.lock().await; sm.mark_waiting(session_id); @@ -207,7 +207,7 @@ fn register_hear_as_menu(state: Arc, user: UserSession, engine: &mut E // Store suggestions in Redis for UI let state_for_suggestions = Arc::clone(&state_clone); let opts_clone = options.clone(); - let _ = tokio::runtime::Handle::current().block_on(async move { + tokio::runtime::Handle::current().block_on(async move { if let Some(redis) = &state_for_suggestions.cache { if let Ok(mut conn) = redis.get_multiplexed_async_connection().await { let key = format!("suggestions:{session_id}:{session_id}"); diff --git a/src/core/bot/mod.rs b/src/core/bot/mod.rs index 4527a61a..b3be621e 100644 --- a/src/core/bot/mod.rs +++ b/src/core/bot/mod.rs @@ -450,7 +450,7 @@ impl BotOrchestrator { // If a HEAR is blocking the script thread for this session, deliver the input // directly and return — the script continues from where it paused. - if crate::basic::keywords::hearing::syntax::deliver_hear_input( + if crate::basic::keywords::hearing::deliver_hear_input( &self.state, session_id, message_content.clone(), diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index b842eaf4..e26480ce 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -9,6 +9,7 @@ pub use model_routing_config::{ModelRoutingConfig, RoutingStrategy, TaskType}; pub use sse_config::SseConfig; pub use user_memory_config::UserMemoryConfig; +use crate::core::secrets::SecretsManager; use crate::core::shared::utils::DbPool; use diesel::prelude::*; use diesel::r2d2::{ConnectionManager, PooledConnection}; @@ -277,10 +278,24 @@ impl AppConfig { .and_then(|v| v.parse().ok()) .unwrap_or(default) }; + + // Read from Vault with fallback to defaults + let secrets = SecretsManager::from_env().ok(); + let (drive_server, drive_access, drive_secret) = secrets + .as_ref() + .map(|s| s.get_drive_config()) + .unwrap_or_else(|| { + ( + crate::core::urls::InternalUrls::DRIVE.to_string(), + "minioadmin".to_string(), + "minioadmin".to_string(), + ) + }); + let drive = DriveConfig { - server: crate::core::urls::InternalUrls::DRIVE.to_string(), - access_key: String::new(), - secret_key: String::new(), + server: drive_server, + access_key: drive_access, + secret_key: drive_secret, }; let email = EmailConfig { server: get_str("EMAIL_IMAP_SERVER", "imap.gmail.com"), @@ -315,10 +330,23 @@ impl AppConfig { }) } pub fn from_env() -> Result { + // Read from Vault with fallback to defaults + let secrets = SecretsManager::from_env().ok(); + let (drive_server, drive_access, drive_secret) = secrets + .as_ref() + .map(|s| s.get_drive_config()) + .unwrap_or_else(|| { + ( + crate::core::urls::InternalUrls::DRIVE.to_string(), + "minioadmin".to_string(), + "minioadmin".to_string(), + ) + }); + let minio = DriveConfig { - server: crate::core::urls::InternalUrls::DRIVE.to_string(), - access_key: String::new(), - secret_key: String::new(), + server: drive_server, + access_key: drive_access, + secret_key: drive_secret, }; let email = EmailConfig { server: "imap.gmail.com".to_string(), diff --git a/src/core/secrets/mod.rs b/src/core/secrets/mod.rs index c24e7f51..d28c705f 100644 --- a/src/core/secrets/mod.rs +++ b/src/core/secrets/mod.rs @@ -197,6 +197,54 @@ impl SecretsManager { .ok_or_else(|| anyhow!("Key '{}' not found in '{}'", key, path)) } + pub fn get_value_blocking(&self, path: &str, key: &str, default: &str) -> String { + // Try to get synchronously using blocking call + if let Ok(runtime) = tokio::runtime::Handle::try_current() { + if let Ok(secrets) = runtime.block_on(self.get_secret(path)) { + if let Some(value) = secrets.get(key) { + return value.clone(); + } + } + } + // Fallback to env defaults + if let Ok(secrets) = Self::get_from_env(path) { + if let Some(value) = secrets.get(key) { + return value.clone(); + } + } + default.to_string() + } + + pub fn get_drive_config(&self) -> (String, String, String) { + if let Ok(runtime) = tokio::runtime::Handle::try_current() { + if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::DRIVE)) { + return ( + secrets.get("host").cloned().unwrap_or_else(|| "localhost:9100".into()), + secrets.get("accesskey").cloned().unwrap_or_else(|| "minioadmin".into()), + secrets.get("secret").cloned().unwrap_or_else(|| "minioadmin".into()), + ); + } + } + // Fallback + ("localhost:9100".to_string(), "minioadmin".to_string(), "minioadmin".to_string()) + } + + pub fn get_database_config_sync(&self) -> (String, u16, String, String, String) { + if let Ok(runtime) = tokio::runtime::Handle::try_current() { + if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::TABLES)) { + return ( + secrets.get("host").cloned().unwrap_or_else(|| "localhost".into()), + secrets.get("port").and_then(|p| p.parse().ok()).unwrap_or(5432), + secrets.get("database").cloned().unwrap_or_else(|| "botserver".into()), + secrets.get("username").cloned().unwrap_or_else(|| "gbuser".into()), + secrets.get("password").cloned().unwrap_or_default(), + ); + } + } + // Fallback + ("localhost".to_string(), 5432, "botserver".to_string(), "gbuser".to_string(), "changeme".to_string()) + } + pub async fn get_drive_credentials(&self) -> Result<(String, String)> { let s = self.get_secret(SecretPaths::DRIVE).await?; Ok(( @@ -297,8 +345,122 @@ impl SecretsManager { self.get_value(SecretPaths::ENCRYPTION, "master_key").await } - pub async fn get_jwt_secret(&self) -> Result { - self.get_value(SecretPaths::JWT, "secret").await + pub fn get_cache_config(&self) -> (String, u16, Option) { + if let Ok(runtime) = tokio::runtime::Handle::try_current() { + if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::CACHE)) { + return ( + secrets.get("host").cloned().unwrap_or_else(|| "localhost".into()), + secrets.get("port").and_then(|p| p.parse().ok()).unwrap_or(6379), + secrets.get("password").cloned(), + ); + } + } + ("localhost".to_string(), 6379, None) + } + + pub fn get_directory_config_sync(&self) -> (String, String, String, String) { + if let Ok(runtime) = tokio::runtime::Handle::try_current() { + if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::DIRECTORY)) { + return ( + secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:9000".into()), + secrets.get("project_id").cloned().unwrap_or_default(), + secrets.get("client_id").cloned().unwrap_or_default(), + secrets.get("client_secret").cloned().unwrap_or_default(), + ); + } + } + ("http://localhost:9000".to_string(), String::new(), String::new(), String::new()) + } + + pub fn get_email_config(&self) -> (String, u16, String, String, String) { + if let Ok(runtime) = tokio::runtime::Handle::try_current() { + if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::EMAIL)) { + return ( + secrets.get("smtp_host").cloned().unwrap_or_else(|| "smtp.gmail.com".into()), + secrets.get("smtp_port").and_then(|p| p.parse().ok()).unwrap_or(587), + secrets.get("smtp_user").cloned().unwrap_or_default(), + secrets.get("smtp_password").cloned().unwrap_or_default(), + secrets.get("smtp_from").cloned().unwrap_or_default(), + ); + } + } + ("smtp.gmail.com".to_string(), 587, String::new(), String::new(), String::new()) + } + + pub fn get_llm_config(&self) -> (String, String, Option, Option, String) { + if let Ok(runtime) = tokio::runtime::Handle::try_current() { + if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::LLM)) { + return ( + secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:8081".into()), + secrets.get("model").cloned().unwrap_or_else(|| "gpt-4".into()), + secrets.get("openai_key").cloned(), + secrets.get("anthropic_key").cloned(), + secrets.get("ollama_url").cloned().unwrap_or_else(|| "http://localhost:11434".into()), + ); + } + } + ("http://localhost:8081".to_string(), "gpt-4".to_string(), None, None, "http://localhost:11434".to_string()) + } + + pub fn get_meet_config(&self) -> (String, String, String) { + if let Ok(runtime) = tokio::runtime::Handle::try_current() { + if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::MEET)) { + return ( + secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:7880".into()), + secrets.get("app_id").cloned().unwrap_or_default(), + secrets.get("app_secret").cloned().unwrap_or_default(), + ); + } + } + ("http://localhost:7880".to_string(), String::new(), String::new()) + } + + pub fn get_vectordb_config_sync(&self) -> (String, Option) { + if let Ok(runtime) = tokio::runtime::Handle::try_current() { + if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::VECTORDB)) { + return ( + secrets.get("url").cloned().unwrap_or_else(|| "https://localhost:6334".into()), + secrets.get("api_key").cloned(), + ); + } + } + ("https://localhost:6334".to_string(), None) + } + + pub fn get_observability_config_sync(&self) -> (String, String, String, String) { + if let Ok(runtime) = tokio::runtime::Handle::try_current() { + if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::OBSERVABILITY)) { + return ( + secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:8086".into()), + secrets.get("org").cloned().unwrap_or_else(|| "system".into()), + secrets.get("bucket").cloned().unwrap_or_else(|| "metrics".into()), + secrets.get("token").cloned().unwrap_or_default(), + ); + } + } + ("http://localhost:8086".to_string(), "system".to_string(), "metrics".to_string(), String::new()) + } + + pub fn get_alm_config(&self) -> (String, String, String) { + if let Ok(runtime) = tokio::runtime::Handle::try_current() { + if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::ALM)) { + return ( + secrets.get("url").cloned().unwrap_or_else(|| "http://localhost:9000".into()), + secrets.get("token").cloned().unwrap_or_default(), + secrets.get("default_org").cloned().unwrap_or_default(), + ); + } + } + ("http://localhost:9000".to_string(), String::new(), String::new()) + } + + pub fn get_jwt_secret_sync(&self) -> String { + if let Ok(runtime) = tokio::runtime::Handle::try_current() { + if let Ok(secrets) = runtime.block_on(self.get_secret(SecretPaths::JWT)) { + return secrets.get("secret").cloned().unwrap_or_default(); + } + } + String::new() } pub async fn put_secret(&self, path: &str, data: HashMap) -> Result<()> { diff --git a/src/core/shared/state.rs b/src/core/shared/state.rs index a6e6534c..2fcfd005 100644 --- a/src/core/shared/state.rs +++ b/src/core/shared/state.rs @@ -426,6 +426,7 @@ impl Clone for AppState { kb_manager: self.kb_manager.clone(), channels: Arc::clone(&self.channels), response_channels: Arc::clone(&self.response_channels), + hear_channels: Arc::clone(&self.hear_channels), web_adapter: Arc::clone(&self.web_adapter), voice_adapter: Arc::clone(&self.voice_adapter), #[cfg(feature = "tasks")] diff --git a/src/email/integration.rs b/src/email/integration.rs index 88c0c857..817a6309 100644 --- a/src/email/integration.rs +++ b/src/email/integration.rs @@ -59,7 +59,7 @@ pub async fn extract_lead_from_email( .split('@') .nth(1) .and_then(|d| d.split('.').next()) - .map(|c| capitalize(c)); + .map(capitalize); Ok(Json(LeadExtractionResponse { first_name, diff --git a/src/llm/observability.rs b/src/llm/observability.rs index b10e75de..d57d78d7 100644 --- a/src/llm/observability.rs +++ b/src/llm/observability.rs @@ -45,7 +45,9 @@ pub struct LLMRequestMetrics { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] #[serde(rename_all = "lowercase")] +#[derive(Default)] pub enum RequestType { + #[default] Chat, Completion, Embedding, @@ -56,11 +58,6 @@ pub enum RequestType { AudioGeneration, } -impl Default for RequestType { - fn default() -> Self { - Self::Chat - } -} impl std::fmt::Display for RequestType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -225,17 +222,14 @@ pub enum TraceEventType { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] +#[derive(Default)] pub enum TraceStatus { Ok, Error, + #[default] InProgress, } -impl Default for TraceStatus { - fn default() -> Self { - Self::InProgress - } -} #[derive(Debug, Clone)] pub struct ObservabilityConfig {