fix: use blocking HTTP for Vault email config to avoid runtime nesting
Some checks failed
BotServer CI/CD / build (push) Failing after 2s
Some checks failed
BotServer CI/CD / build (push) Failing after 2s
This commit is contained in:
parent
7a5f858d86
commit
c90c5dc039
2 changed files with 127 additions and 27 deletions
|
|
@ -235,3 +235,4 @@ bigdecimal = { workspace = true }
|
||||||
|
|
||||||
[lints]
|
[lints]
|
||||||
workspace = true
|
workspace = true
|
||||||
|
ureq = { version = "2", features = ["json"] }
|
||||||
|
|
|
||||||
|
|
@ -868,28 +868,12 @@ impl SecretsManager {
|
||||||
pub fn get_email_config_for_bot_sync(&self, bot_id: &Uuid) -> (String, u16, String, String, String) {
|
pub fn get_email_config_for_bot_sync(&self, bot_id: &Uuid) -> (String, u16, String, String, String) {
|
||||||
let bot_path = format!("gbo/bots/{}/email", bot_id);
|
let bot_path = format!("gbo/bots/{}/email", bot_id);
|
||||||
let default_path = "gbo/bots/default/email".to_string();
|
let default_path = "gbo/bots/default/email".to_string();
|
||||||
let self_owned = self.clone();
|
|
||||||
|
|
||||||
if let Ok(handle) = tokio::runtime::Handle::try_current() {
|
let paths = vec![bot_path, default_path, SecretPaths::EMAIL.to_string()];
|
||||||
let result = handle.block_on(async move {
|
|
||||||
if let Ok(s) = self_owned.get_secret(&bot_path).await {
|
for path in paths {
|
||||||
if !s.is_empty() && s.contains_key("smtp_from") {
|
if let Ok(secrets) = self.get_secret_blocking(&path) {
|
||||||
return Some(s);
|
if !secrets.is_empty() && secrets.contains_key("smtp_from") {
|
||||||
}
|
|
||||||
}
|
|
||||||
if let Ok(s) = self_owned.get_secret(&default_path).await {
|
|
||||||
if !s.is_empty() && s.contains_key("smtp_from") {
|
|
||||||
return Some(s);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if let Ok(s) = self_owned.get_secret(SecretPaths::EMAIL).await {
|
|
||||||
if !s.is_empty() && s.contains_key("smtp_from") {
|
|
||||||
return Some(s);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None
|
|
||||||
});
|
|
||||||
if let Some(secrets) = result {
|
|
||||||
return (
|
return (
|
||||||
secrets.get("smtp_host").cloned().unwrap_or_default(),
|
secrets.get("smtp_host").cloned().unwrap_or_default(),
|
||||||
secrets.get("smtp_port").and_then(|p| p.parse().ok()).unwrap_or(587),
|
secrets.get("smtp_port").and_then(|p| p.parse().ok()).unwrap_or(587),
|
||||||
|
|
@ -899,9 +883,124 @@ impl SecretsManager {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
(String::new(), 587, String::new(), String::new(), String::new())
|
(String::new(), 587, String::new(), String::new(), String::new())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_secret_blocking(&self, path: &str) -> Result<HashMap<String, String>> {
|
||||||
|
if !self.enabled {
|
||||||
|
return Self::get_from_env(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(cached) = self.get_cached_sync(path) {
|
||||||
|
return Ok(cached);
|
||||||
|
}
|
||||||
|
|
||||||
|
let client = self
|
||||||
|
.client
|
||||||
|
.as_ref()
|
||||||
|
.ok_or_else(|| anyhow!("No Vault client"))?;
|
||||||
|
|
||||||
|
let url = format!("{}/v1/secret/data/{}", self.addr, path);
|
||||||
|
let resp = ureq::get(&url)
|
||||||
|
.set("X-Vault-Token", &self.token)
|
||||||
|
.call()
|
||||||
|
.map_err(|e| anyhow!("Vault HTTP error: {}", e))?;
|
||||||
|
|
||||||
|
let body: serde_json::Value = resp.into_json()
|
||||||
|
.map_err(|e| anyhow!("Vault JSON parse error: {}", e))?;
|
||||||
|
|
||||||
|
if let Some(data) = body.get("data").and_then(|d| d.get("data")) {
|
||||||
|
if let Some(map) = data.as_object() {
|
||||||
|
let result: HashMap<String, String> = map.iter()
|
||||||
|
.filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
self.cache_secret_sync(path, result.clone());
|
||||||
|
return Ok(result);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Self::get_from_env(path)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_cached_sync(&self, path: &str) -> Option<HashMap<String, String>> {
|
||||||
|
let cache = self.cache.read().ok()?;
|
||||||
|
let entry = cache.get(path)?;
|
||||||
|
if entry.1.elapsed() < self.cache_ttl {
|
||||||
|
Some(entry.0.clone())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn cache_secret_sync(&self, path: &str, data: HashMap<String, String>) {
|
||||||
|
if self.cache_ttl > 0 {
|
||||||
|
if let Ok(mut cache) = self.cache.write() {
|
||||||
|
cache.insert(path.to_string(), (data, std::time::Instant::now()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
(String::new(), 587, String::new(), String::new(), String::new())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_secret_blocking(&self, path: &str) -> Result<HashMap<String, String>> {
|
||||||
|
if !self.enabled {
|
||||||
|
return Self::get_from_env(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(cached) = self.get_cached_sync(path) {
|
||||||
|
return Ok(cached);
|
||||||
|
}
|
||||||
|
|
||||||
|
let client = self
|
||||||
|
.client
|
||||||
|
.as_ref()
|
||||||
|
.ok_or_else(|| anyhow!("No Vault client"))?;
|
||||||
|
|
||||||
|
let url = format!("{}/v1/secret/data/{}", self.addr, path);
|
||||||
|
let resp = ureq::get(&url)
|
||||||
|
.set("X-Vault-Token", &self.token)
|
||||||
|
.call()
|
||||||
|
.map_err(|e| anyhow!("Vault HTTP error: {}", e))?;
|
||||||
|
|
||||||
|
let body: serde_json::Value = resp.into_json()
|
||||||
|
.map_err(|e| anyhow!("Vault JSON parse error: {}", e))?;
|
||||||
|
|
||||||
|
if let Some(data) = body.get("data").and_then(|d| d.get("data")) {
|
||||||
|
if let Some(map) = data.as_object() {
|
||||||
|
let result: HashMap<String, String> = map.iter()
|
||||||
|
.filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
self.cache_secret_sync(path, result.clone());
|
||||||
|
return Ok(result);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Self::get_from_env(path)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_cached_sync(&self, path: &str) -> Option<HashMap<String, String>> {
|
||||||
|
let cache = self.cache.read().ok()?;
|
||||||
|
let entry = cache.get(path)?;
|
||||||
|
if entry.1.elapsed() < self.cache_ttl {
|
||||||
|
Some(entry.0.clone())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn cache_secret_sync(&self, path: &str, data: HashMap<String, String>) {
|
||||||
|
if self.cache_ttl > 0 {
|
||||||
|
if let Ok(mut cache) = self.cache.write() {
|
||||||
|
cache.insert(path.to_string(), (data, std::time::Instant::now()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ============ TENANT-AWARE METHODS (org_id -> tenant -> secrets) ============
|
// ============ TENANT-AWARE METHODS (org_id -> tenant -> secrets) ============
|
||||||
|
|
||||||
/// Get database config for an organization (resolves tenant from org, then gets infra)
|
/// Get database config for an organization (resolves tenant from org, then gets infra)
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue