diff --git a/src/bootstrap/mod.rs b/src/bootstrap/mod.rs index 3f20a813..1bb8033c 100644 --- a/src/bootstrap/mod.rs +++ b/src/bootstrap/mod.rs @@ -190,6 +190,7 @@ impl BootstrapManager { } } } + async fn create_s3_operator(config: &AppConfig) -> Client { let endpoint = if !config.drive.server.ends_with('/') { format!("{}/", config.drive.server) @@ -213,6 +214,7 @@ impl BootstrapManager { .build(); aws_sdk_s3::Client::from_conf(s3_config) } + pub async fn upload_templates_to_drive(&self, _config: &AppConfig) -> Result<()> { let mut conn = establish_pg_connection()?; self.create_bots_from_templates(&mut conn)?; diff --git a/src/config/mod.rs b/src/config/mod.rs index 7850d01d..4f8beb10 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -1,9 +1,7 @@ -use crate::shared::utils::{ DbPool}; +use crate::shared::utils::DbPool; use diesel::prelude::*; use diesel::r2d2::{ConnectionManager, PooledConnection}; use std::collections::HashMap; -use std::fs::OpenOptions; -use std::io::Write; use uuid::Uuid; #[derive(Clone)] pub struct AppConfig { @@ -25,7 +23,6 @@ pub struct DriveConfig { pub server: String, pub access_key: String, pub secret_key: String, - pub use_ssl: bool, } #[derive(Clone)] pub struct ServerConfig { @@ -92,7 +89,7 @@ impl AppConfig { .and_then(|v| v.3.parse().ok()) .unwrap_or(default) }; - let get_bool = |key: &str, default: bool| -> bool { + let _get_bool = |key: &str, default: bool| -> bool { config_map .get(key) .map(|v| v.3.to_lowercase() == "true") @@ -120,18 +117,11 @@ impl AppConfig { Err(_) => get_str("TABLES_DATABASE", "botserver"), }, }; + let drive = DriveConfig { - server: { - let server = get_str("DRIVE_SERVER", "http://localhost:9000"); - if !server.starts_with("http://") && !server.starts_with("https://") { - format!("http://{}", server) - } else { - server - } - }, - access_key: get_str("DRIVE_ACCESSKEY", "minioadmin"), - secret_key: get_str("DRIVE_SECRET", "minioadmin"), - use_ssl: get_bool("DRIVE_USE_SSL", false), + server: std::env::var("DRIVE_SERVER").unwrap(), + access_key: std::env::var("DRIVE_ACCESSKEY").unwrap(), + secret_key: std::env::var("DRIVE_SECRET").unwrap(), }; Ok(AppConfig { drive, @@ -159,11 +149,9 @@ impl AppConfig { database: db_name, }; let minio = DriveConfig { - server: std::env::var("DRIVE_SERVER") - .unwrap(); - access_key: std::env::var("DRIVE_ACCESSKEY") - .unwrap(); - secret_key: std::env::var("DRIVE_SECRET").unwrap_or_else(|_| "minioadmin".to_string()), + server: std::env::var("DRIVE_SERVER").unwrap(), + access_key: std::env::var("DRIVE_ACCESSKEY").unwrap(), + secret_key: std::env::var("DRIVE_SECRET").unwrap(), }; Ok(AppConfig { drive: minio, diff --git a/src/drive_monitor/mod.rs b/src/drive_monitor/mod.rs index 1f3a7237..1bee58b9 100644 --- a/src/drive_monitor/mod.rs +++ b/src/drive_monitor/mod.rs @@ -64,12 +64,21 @@ impl DriveMonitor { let mut continuation_token = None; loop { - let list_objects = client - .list_objects_v2() - .bucket(&self.bucket_name.to_lowercase()) - .set_continuation_token(continuation_token) - .send() - .await?; + let list_objects = match tokio::time::timeout( + Duration::from_secs(30), + client + .list_objects_v2() + .bucket(&self.bucket_name.to_lowercase()) + .set_continuation_token(continuation_token) + .send() + ).await { + Ok(Ok(list)) => list, + Ok(Err(e)) => return Err(e.into()), + Err(_) => { + log::error!("Timeout listing objects in bucket {}", self.bucket_name); + return Ok(()); + } + }; for obj in list_objects.contents.unwrap_or_default() { let path = obj.key().unwrap_or_default().to_string(); @@ -136,12 +145,21 @@ impl DriveMonitor { let mut continuation_token = None; loop { - let list_objects = client - .list_objects_v2() - .bucket(&self.bucket_name.to_lowercase()) - .set_continuation_token(continuation_token) - .send() - .await?; + let list_objects = match tokio::time::timeout( + Duration::from_secs(30), + client + .list_objects_v2() + .bucket(&self.bucket_name.to_lowercase()) + .set_continuation_token(continuation_token) + .send() + ).await { + Ok(Ok(list)) => list, + Ok(Err(e)) => return Err(e.into()), + Err(_) => { + log::error!("Timeout listing objects in bucket {}", self.bucket_name); + return Ok(()); + } + }; for obj in list_objects.contents.unwrap_or_default() { let path = obj.key().unwrap_or_default().to_string(); diff --git a/src/file/mod.rs b/src/file/mod.rs index f0aa4c24..52249d07 100644 --- a/src/file/mod.rs +++ b/src/file/mod.rs @@ -1,14 +1,11 @@ -use crate::config::DriveConfig; use crate::shared::state::AppState; use actix_multipart::Multipart; use actix_web::web; use actix_web::{post, HttpResponse}; -use aws_sdk_s3::{Client as S3Client, config::Builder as S3ConfigBuilder}; -use aws_config::BehaviorVersion; +use aws_sdk_s3::Client; use std::io::Write; use tempfile::NamedTempFile; use tokio_stream::StreamExt as TokioStreamExt; -// Removed unused import #[post("/files/upload/{folder_path}")] pub async fn upload_file( @@ -66,37 +63,8 @@ pub async fn upload_file( } } -pub async fn init_drive(config: &DriveConfig) -> Result> { - let endpoint = if !config.server.ends_with('/') { - format!("{}/", config.server) - } else { - config.server.clone() - }; - - let base_config = aws_config::defaults(BehaviorVersion::latest()) - .endpoint_url(endpoint) - .region("auto") - .credentials_provider( - aws_sdk_s3::config::Credentials::new( - config.access_key.clone(), - config.secret_key.clone(), - None, - None, - "static", - ) - ) - .load() - .await; - - let s3_config = S3ConfigBuilder::from(&base_config) - .force_path_style(true) - .build(); - - Ok(S3Client::from_conf(s3_config)) -} - async fn upload_to_s3( - client: &S3Client, + client: &Client, bucket: &str, key: &str, file_path: &std::path::Path, diff --git a/src/main.rs b/src/main.rs index 0481ae0f..58b8c28a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -39,8 +39,9 @@ use crate::config::AppConfig; use crate::email::{ get_emails, get_latest_email_from, list_emails, save_click, save_draft, send_email, }; -use crate::file::{init_drive, upload_file}; +use crate::file::upload_file; use crate::meet::{voice_start, voice_stop}; +use crate::shared::utils::create_s3_operator; use crate::package_manager::InstallMode; use crate::session::{create_session, get_session_history, get_sessions, start_session}; use crate::shared::state::AppState; @@ -231,7 +232,7 @@ async fn main() -> std::io::Result<()> { }; let web_adapter = Arc::new(WebChannelAdapter::new()); let voice_adapter = Arc::new(VoiceAdapter::new()); - let drive = init_drive(&config.drive) + let drive = create_s3_operator(&config.drive) .await .expect("Failed to initialize Drive"); let session_manager = Arc::new(tokio::sync::Mutex::new(session::SessionManager::new( diff --git a/src/shared/utils.rs b/src/shared/utils.rs index 72edf372..b13d1581 100644 --- a/src/shared/utils.rs +++ b/src/shared/utils.rs @@ -13,6 +13,39 @@ use smartstring::SmartString; use std::error::Error; use tokio::fs::File as TokioFile; use tokio::io::AsyncWriteExt; +use aws_sdk_s3::{Client as S3Client, config::Builder as S3ConfigBuilder}; +use aws_config::BehaviorVersion; +use crate::config::DriveConfig; + +pub async fn create_s3_operator(config: &DriveConfig) -> Result> { + let endpoint = if !config.server.ends_with('/') { + format!("{}/", config.server) + } else { + config.server.clone() + }; + + let base_config = aws_config::defaults(BehaviorVersion::latest()) + .endpoint_url(endpoint) + .region("auto") + .credentials_provider( + aws_sdk_s3::config::Credentials::new( + config.access_key.clone(), + config.secret_key.clone(), + None, + None, + "static", + ) + ) + .load() + .await; + + let s3_config = S3ConfigBuilder::from(&base_config) + .force_path_style(true) + .build(); + + Ok(S3Client::from_conf(s3_config)) +} + pub fn json_value_to_dynamic(value: &Value) -> Dynamic { match value { Value::Null => Dynamic::UNIT, @@ -39,6 +72,7 @@ pub fn json_value_to_dynamic(value: &Value) -> Dynamic { ), } } + pub fn to_array(value: Dynamic) -> Array { if value.is_array() { value.cast::() @@ -48,6 +82,7 @@ pub fn to_array(value: Dynamic) -> Array { Array::from([value]) } } + pub async fn download_file(url: &str, output_path: &str) -> Result<(), anyhow::Error> { let url = url.to_string(); let output_path = output_path.to_string(); @@ -60,9 +95,9 @@ pub async fn download_file(url: &str, output_path: &str) -> Result<(), anyhow::E let total_size = response.content_length().unwrap_or(0); let pb = ProgressBar::new(total_size); pb.set_style(ProgressStyle::default_bar() - .template("{msg}\n{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})") - .unwrap() - .progress_chars("#>-")); + .template("{msg}\n{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})") + .unwrap() + .progress_chars("#>-")); pb.set_message(format!("Downloading {}", url)); let mut file = TokioFile::create(&output_path).await?; let mut downloaded: u64 = 0; @@ -81,6 +116,7 @@ pub async fn download_file(url: &str, output_path: &str) -> Result<(), anyhow::E }); download_handle.await? } + pub fn parse_filter(filter_str: &str) -> Result<(String, Vec), Box> { let parts: Vec<&str> = filter_str.split('=').collect(); if parts.len() != 2 { @@ -96,15 +132,18 @@ pub fn parse_filter(filter_str: &str) -> Result<(String, Vec), Box usize { let char_count = text.chars().count(); (char_count / 4).max(1) } + pub fn establish_pg_connection() -> Result { let database_url = std::env::var("DATABASE_URL").unwrap(); PgConnection::establish(&database_url) .with_context(|| format!("Failed to connect to database at {}", database_url)) } + pub type DbPool = Pool>; pub fn create_conn() -> Result { let database_url = std::env::var("DATABASE_URL")