feat(s3): add timeout handling and clean up drive config

- Added 30-second timeout for S3 bucket listing operations in DriveMonitor
- Removed unused `use_ssl` flag from DriveConfig and cleaned up imports
- Improved error handling with proper logging for timeout scenarios
- Fixed syntax in AppConfig initialization (added missing commas)
- Added proper spacing between methods in BootstrapManager
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-11-11 10:34:06 -03:00
parent fd45f4e0dd
commit fd73d207cc
6 changed files with 88 additions and 72 deletions

View file

@ -190,6 +190,7 @@ impl BootstrapManager {
} }
} }
} }
async fn create_s3_operator(config: &AppConfig) -> Client { async fn create_s3_operator(config: &AppConfig) -> Client {
let endpoint = if !config.drive.server.ends_with('/') { let endpoint = if !config.drive.server.ends_with('/') {
format!("{}/", config.drive.server) format!("{}/", config.drive.server)
@ -213,6 +214,7 @@ impl BootstrapManager {
.build(); .build();
aws_sdk_s3::Client::from_conf(s3_config) aws_sdk_s3::Client::from_conf(s3_config)
} }
pub async fn upload_templates_to_drive(&self, _config: &AppConfig) -> Result<()> { pub async fn upload_templates_to_drive(&self, _config: &AppConfig) -> Result<()> {
let mut conn = establish_pg_connection()?; let mut conn = establish_pg_connection()?;
self.create_bots_from_templates(&mut conn)?; self.create_bots_from_templates(&mut conn)?;

View file

@ -1,9 +1,7 @@
use crate::shared::utils::{ DbPool}; use crate::shared::utils::DbPool;
use diesel::prelude::*; use diesel::prelude::*;
use diesel::r2d2::{ConnectionManager, PooledConnection}; use diesel::r2d2::{ConnectionManager, PooledConnection};
use std::collections::HashMap; use std::collections::HashMap;
use std::fs::OpenOptions;
use std::io::Write;
use uuid::Uuid; use uuid::Uuid;
#[derive(Clone)] #[derive(Clone)]
pub struct AppConfig { pub struct AppConfig {
@ -25,7 +23,6 @@ pub struct DriveConfig {
pub server: String, pub server: String,
pub access_key: String, pub access_key: String,
pub secret_key: String, pub secret_key: String,
pub use_ssl: bool,
} }
#[derive(Clone)] #[derive(Clone)]
pub struct ServerConfig { pub struct ServerConfig {
@ -92,7 +89,7 @@ impl AppConfig {
.and_then(|v| v.3.parse().ok()) .and_then(|v| v.3.parse().ok())
.unwrap_or(default) .unwrap_or(default)
}; };
let get_bool = |key: &str, default: bool| -> bool { let _get_bool = |key: &str, default: bool| -> bool {
config_map config_map
.get(key) .get(key)
.map(|v| v.3.to_lowercase() == "true") .map(|v| v.3.to_lowercase() == "true")
@ -120,18 +117,11 @@ impl AppConfig {
Err(_) => get_str("TABLES_DATABASE", "botserver"), Err(_) => get_str("TABLES_DATABASE", "botserver"),
}, },
}; };
let drive = DriveConfig { let drive = DriveConfig {
server: { server: std::env::var("DRIVE_SERVER").unwrap(),
let server = get_str("DRIVE_SERVER", "http://localhost:9000"); access_key: std::env::var("DRIVE_ACCESSKEY").unwrap(),
if !server.starts_with("http://") && !server.starts_with("https://") { secret_key: std::env::var("DRIVE_SECRET").unwrap(),
format!("http://{}", server)
} else {
server
}
},
access_key: get_str("DRIVE_ACCESSKEY", "minioadmin"),
secret_key: get_str("DRIVE_SECRET", "minioadmin"),
use_ssl: get_bool("DRIVE_USE_SSL", false),
}; };
Ok(AppConfig { Ok(AppConfig {
drive, drive,
@ -159,11 +149,9 @@ impl AppConfig {
database: db_name, database: db_name,
}; };
let minio = DriveConfig { let minio = DriveConfig {
server: std::env::var("DRIVE_SERVER") server: std::env::var("DRIVE_SERVER").unwrap(),
.unwrap(); access_key: std::env::var("DRIVE_ACCESSKEY").unwrap(),
access_key: std::env::var("DRIVE_ACCESSKEY") secret_key: std::env::var("DRIVE_SECRET").unwrap(),
.unwrap();
secret_key: std::env::var("DRIVE_SECRET").unwrap_or_else(|_| "minioadmin".to_string()),
}; };
Ok(AppConfig { Ok(AppConfig {
drive: minio, drive: minio,

View file

@ -64,12 +64,21 @@ impl DriveMonitor {
let mut continuation_token = None; let mut continuation_token = None;
loop { loop {
let list_objects = client let list_objects = match tokio::time::timeout(
.list_objects_v2() Duration::from_secs(30),
.bucket(&self.bucket_name.to_lowercase()) client
.set_continuation_token(continuation_token) .list_objects_v2()
.send() .bucket(&self.bucket_name.to_lowercase())
.await?; .set_continuation_token(continuation_token)
.send()
).await {
Ok(Ok(list)) => list,
Ok(Err(e)) => return Err(e.into()),
Err(_) => {
log::error!("Timeout listing objects in bucket {}", self.bucket_name);
return Ok(());
}
};
for obj in list_objects.contents.unwrap_or_default() { for obj in list_objects.contents.unwrap_or_default() {
let path = obj.key().unwrap_or_default().to_string(); let path = obj.key().unwrap_or_default().to_string();
@ -136,12 +145,21 @@ impl DriveMonitor {
let mut continuation_token = None; let mut continuation_token = None;
loop { loop {
let list_objects = client let list_objects = match tokio::time::timeout(
.list_objects_v2() Duration::from_secs(30),
.bucket(&self.bucket_name.to_lowercase()) client
.set_continuation_token(continuation_token) .list_objects_v2()
.send() .bucket(&self.bucket_name.to_lowercase())
.await?; .set_continuation_token(continuation_token)
.send()
).await {
Ok(Ok(list)) => list,
Ok(Err(e)) => return Err(e.into()),
Err(_) => {
log::error!("Timeout listing objects in bucket {}", self.bucket_name);
return Ok(());
}
};
for obj in list_objects.contents.unwrap_or_default() { for obj in list_objects.contents.unwrap_or_default() {
let path = obj.key().unwrap_or_default().to_string(); let path = obj.key().unwrap_or_default().to_string();

View file

@ -1,14 +1,11 @@
use crate::config::DriveConfig;
use crate::shared::state::AppState; use crate::shared::state::AppState;
use actix_multipart::Multipart; use actix_multipart::Multipart;
use actix_web::web; use actix_web::web;
use actix_web::{post, HttpResponse}; use actix_web::{post, HttpResponse};
use aws_sdk_s3::{Client as S3Client, config::Builder as S3ConfigBuilder}; use aws_sdk_s3::Client;
use aws_config::BehaviorVersion;
use std::io::Write; use std::io::Write;
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
use tokio_stream::StreamExt as TokioStreamExt; use tokio_stream::StreamExt as TokioStreamExt;
// Removed unused import
#[post("/files/upload/{folder_path}")] #[post("/files/upload/{folder_path}")]
pub async fn upload_file( pub async fn upload_file(
@ -66,37 +63,8 @@ pub async fn upload_file(
} }
} }
pub async fn init_drive(config: &DriveConfig) -> Result<S3Client, Box<dyn std::error::Error>> {
let endpoint = if !config.server.ends_with('/') {
format!("{}/", config.server)
} else {
config.server.clone()
};
let base_config = aws_config::defaults(BehaviorVersion::latest())
.endpoint_url(endpoint)
.region("auto")
.credentials_provider(
aws_sdk_s3::config::Credentials::new(
config.access_key.clone(),
config.secret_key.clone(),
None,
None,
"static",
)
)
.load()
.await;
let s3_config = S3ConfigBuilder::from(&base_config)
.force_path_style(true)
.build();
Ok(S3Client::from_conf(s3_config))
}
async fn upload_to_s3( async fn upload_to_s3(
client: &S3Client, client: &Client,
bucket: &str, bucket: &str,
key: &str, key: &str,
file_path: &std::path::Path, file_path: &std::path::Path,

View file

@ -39,8 +39,9 @@ use crate::config::AppConfig;
use crate::email::{ use crate::email::{
get_emails, get_latest_email_from, list_emails, save_click, save_draft, send_email, get_emails, get_latest_email_from, list_emails, save_click, save_draft, send_email,
}; };
use crate::file::{init_drive, upload_file}; use crate::file::upload_file;
use crate::meet::{voice_start, voice_stop}; use crate::meet::{voice_start, voice_stop};
use crate::shared::utils::create_s3_operator;
use crate::package_manager::InstallMode; use crate::package_manager::InstallMode;
use crate::session::{create_session, get_session_history, get_sessions, start_session}; use crate::session::{create_session, get_session_history, get_sessions, start_session};
use crate::shared::state::AppState; use crate::shared::state::AppState;
@ -231,7 +232,7 @@ async fn main() -> std::io::Result<()> {
}; };
let web_adapter = Arc::new(WebChannelAdapter::new()); let web_adapter = Arc::new(WebChannelAdapter::new());
let voice_adapter = Arc::new(VoiceAdapter::new()); let voice_adapter = Arc::new(VoiceAdapter::new());
let drive = init_drive(&config.drive) let drive = create_s3_operator(&config.drive)
.await .await
.expect("Failed to initialize Drive"); .expect("Failed to initialize Drive");
let session_manager = Arc::new(tokio::sync::Mutex::new(session::SessionManager::new( let session_manager = Arc::new(tokio::sync::Mutex::new(session::SessionManager::new(

View file

@ -13,6 +13,39 @@ use smartstring::SmartString;
use std::error::Error; use std::error::Error;
use tokio::fs::File as TokioFile; use tokio::fs::File as TokioFile;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use aws_sdk_s3::{Client as S3Client, config::Builder as S3ConfigBuilder};
use aws_config::BehaviorVersion;
use crate::config::DriveConfig;
pub async fn create_s3_operator(config: &DriveConfig) -> Result<S3Client, Box<dyn std::error::Error>> {
let endpoint = if !config.server.ends_with('/') {
format!("{}/", config.server)
} else {
config.server.clone()
};
let base_config = aws_config::defaults(BehaviorVersion::latest())
.endpoint_url(endpoint)
.region("auto")
.credentials_provider(
aws_sdk_s3::config::Credentials::new(
config.access_key.clone(),
config.secret_key.clone(),
None,
None,
"static",
)
)
.load()
.await;
let s3_config = S3ConfigBuilder::from(&base_config)
.force_path_style(true)
.build();
Ok(S3Client::from_conf(s3_config))
}
pub fn json_value_to_dynamic(value: &Value) -> Dynamic { pub fn json_value_to_dynamic(value: &Value) -> Dynamic {
match value { match value {
Value::Null => Dynamic::UNIT, Value::Null => Dynamic::UNIT,
@ -39,6 +72,7 @@ pub fn json_value_to_dynamic(value: &Value) -> Dynamic {
), ),
} }
} }
pub fn to_array(value: Dynamic) -> Array { pub fn to_array(value: Dynamic) -> Array {
if value.is_array() { if value.is_array() {
value.cast::<Array>() value.cast::<Array>()
@ -48,6 +82,7 @@ pub fn to_array(value: Dynamic) -> Array {
Array::from([value]) Array::from([value])
} }
} }
pub async fn download_file(url: &str, output_path: &str) -> Result<(), anyhow::Error> { pub async fn download_file(url: &str, output_path: &str) -> Result<(), anyhow::Error> {
let url = url.to_string(); let url = url.to_string();
let output_path = output_path.to_string(); let output_path = output_path.to_string();
@ -60,9 +95,9 @@ pub async fn download_file(url: &str, output_path: &str) -> Result<(), anyhow::E
let total_size = response.content_length().unwrap_or(0); let total_size = response.content_length().unwrap_or(0);
let pb = ProgressBar::new(total_size); let pb = ProgressBar::new(total_size);
pb.set_style(ProgressStyle::default_bar() pb.set_style(ProgressStyle::default_bar()
.template("{msg}\n{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})") .template("{msg}\n{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})")
.unwrap() .unwrap()
.progress_chars("#>-")); .progress_chars("#>-"));
pb.set_message(format!("Downloading {}", url)); pb.set_message(format!("Downloading {}", url));
let mut file = TokioFile::create(&output_path).await?; let mut file = TokioFile::create(&output_path).await?;
let mut downloaded: u64 = 0; let mut downloaded: u64 = 0;
@ -81,6 +116,7 @@ pub async fn download_file(url: &str, output_path: &str) -> Result<(), anyhow::E
}); });
download_handle.await? download_handle.await?
} }
pub fn parse_filter(filter_str: &str) -> Result<(String, Vec<String>), Box<dyn Error>> { pub fn parse_filter(filter_str: &str) -> Result<(String, Vec<String>), Box<dyn Error>> {
let parts: Vec<&str> = filter_str.split('=').collect(); let parts: Vec<&str> = filter_str.split('=').collect();
if parts.len() != 2 { if parts.len() != 2 {
@ -96,15 +132,18 @@ pub fn parse_filter(filter_str: &str) -> Result<(String, Vec<String>), Box<dyn E
} }
Ok((format!("{} = $1", column), vec![value.to_string()])) Ok((format!("{} = $1", column), vec![value.to_string()]))
} }
pub fn estimate_token_count(text: &str) -> usize { pub fn estimate_token_count(text: &str) -> usize {
let char_count = text.chars().count(); let char_count = text.chars().count();
(char_count / 4).max(1) (char_count / 4).max(1)
} }
pub fn establish_pg_connection() -> Result<PgConnection> { pub fn establish_pg_connection() -> Result<PgConnection> {
let database_url = std::env::var("DATABASE_URL").unwrap(); let database_url = std::env::var("DATABASE_URL").unwrap();
PgConnection::establish(&database_url) PgConnection::establish(&database_url)
.with_context(|| format!("Failed to connect to database at {}", database_url)) .with_context(|| format!("Failed to connect to database at {}", database_url))
} }
pub type DbPool = Pool<ConnectionManager<PgConnection>>; pub type DbPool = Pool<ConnectionManager<PgConnection>>;
pub fn create_conn() -> Result<DbPool, r2d2::Error> { pub fn create_conn() -> Result<DbPool, r2d2::Error> {
let database_url = std::env::var("DATABASE_URL") let database_url = std::env::var("DATABASE_URL")