feat: replace opendal with AWS SDK for S3 operations

- Added AWS SDK S3 dependencies including aws-config, aws-sdk-s3, and related crates
- Removed opendal dependency and replaced with AWS SDK S3 client
- Implemented new get_file_content helper function using AWS SDK
- Updated MinIOHandler to use AWS SDK client instead of opendal Operator
- Modified file change detection to work with AWS SDK's S3 client

This change standardizes the codebase on AWS's official SDK for S3 operations, which offers stronger maintenance guarantees and broader feature support than the opendal crate, and it aligns with AWS best practices for interacting with S3-compatible services such as MinIO.
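For context during review, the client construction that this commit repeats across bootstrap.rs, src/file.rs, and main.rs reduces to the sketch below. It is distilled from the diff that follows; the function name make_minio_client and the example arguments are illustrative, not identifiers from the commit.

use aws_config::BehaviorVersion;
use aws_sdk_s3::config::{Builder as S3ConfigBuilder, Credentials};

// Minimal sketch: build an aws-sdk-s3 client against an S3-compatible
// endpoint such as MinIO, mirroring the commit's init_drive/create_s3_operator.
async fn make_minio_client(endpoint: &str, access_key: &str, secret_key: &str) -> aws_sdk_s3::Client {
    // Static credentials replace opendal's access_key_id/secret_access_key settings.
    let base = aws_config::defaults(BehaviorVersion::latest())
        .endpoint_url(endpoint)
        .region("auto")
        .credentials_provider(Credentials::new(access_key, secret_key, None, None, "static"))
        .load()
        .await;
    // Path-style addressing (http://host:9000/bucket/key) is what MinIO expects;
    // it is the counterpart of opendal's force_path_style setting.
    let s3_config = S3ConfigBuilder::from(&base).force_path_style(true).build();
    aws_sdk_s3::Client::from_conf(s3_config)
}

On top of such a client, the diff maps the old opendal calls roughly as follows: operator.read becomes get_object plus body.collect() (wrapped by the new get_file_content helper), operator.write becomes put_object, operator.stat becomes head_object or head_bucket, and lister_with becomes list_objects_v2 driven by a continuation-token loop.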
Author: Rodrigo Rodriguez (Pragmatismo), 2025-10-30 12:35:25 -03:00
Parent: 205cd13b49
Commit: 5a6e36e6c2
17 changed files with 1528 additions and 394 deletions

Cargo.lock (generated, 1073 lines changed; diff suppressed because it is too large)

Cargo.toml

@@ -53,6 +53,8 @@ anyhow = "1.0"
argon2 = "0.5"
async-stream = "0.3"
async-trait = "0.1"
+aws-config = "1.8.8"
+aws-sdk-s3 = { version = "1.109.0", features = ["behavior-version-latest"] }
base64 = "0.22"
bytes = "1.8"
chrono = { version = "0.4", features = ["serde"] }
@@ -64,6 +66,7 @@ env_logger = "0.11"
futures = "0.3"
futures-util = "0.3"
headless_chrome = { version = "1.0.18", optional = true }
+hmac = "0.12.1"
imap = { version = "3.0.0-alpha.15", optional = true }
include_dir = "0.7"
indicatif = "0.18.0"
@@ -71,9 +74,9 @@ lettre = { version = "0.11", features = ["smtp-transport", "builder", "tokio1",
livekit = "0.7"
log = "0.4"
mailparse = "0.15"
+mockito = "1.7.0"
native-tls = "0.2"
num-format = "0.4"
-opendal = { version = "0.54.1", features = ["services-s3"] }
pdf-extract = "0.10.0"
qdrant-client = { version = "1.12", optional = true }
rand = "0.9.2"


@@ -331,7 +331,7 @@ impl AutomationService {
e
);
-if let Some(s3_operator) = &self.state.s3_operator {
+if let Some(client) = &self.state.s3_client {
let bucket_name = format!(
"{}{}.gbai",
env::var("MINIO_ORG_PREFIX").unwrap_or_else(|_| "org1_".to_string()),
@@ -341,10 +341,9 @@ impl AutomationService {
trace!("Downloading from bucket={} key={}", bucket_name, s3_key);
-match s3_operator.read(&format!("{}/{}", bucket_name, s3_key)).await {
+match crate::kb::minio_handler::get_file_content(client, &bucket_name, &s3_key).await {
Ok(data) => {
-let bytes: Vec<u8> = data.to_vec();
-match String::from_utf8(bytes) {
+match String::from_utf8(data) {
Ok(content) => {
info!("Downloaded script '{}' from MinIO", param);

@@ -2,6 +2,7 @@ use crate::shared::models::UserSession;
use crate::shared::state::AppState;
use log::{debug, error, info};
use reqwest::{self, Client};
+use crate::kb::minio_handler;
use rhai::{Dynamic, Engine};
use std::error::Error;
use std::path::Path;
@@ -158,13 +159,7 @@ pub async fn get_from_bucket(
return Err("Invalid file path".into());
}
-let s3_operator = match &state.s3_operator {
-Some(operator) => operator,
-None => {
-error!("S3 operator not configured");
-return Err("S3 operator not configured".into());
-}
-};
+let client = state.s3_client.as_ref().ok_or("S3 client not configured")?;
let bucket_name = {
let cfg = state
@@ -187,11 +182,11 @@ pub async fn get_from_bucket(
bucket
};
-let response = match tokio::time::timeout(
-Duration::from_secs(30),
-s3_operator.read(&format!("{}/{}", bucket_name, file_path))
+let bytes = match tokio::time::timeout(
+Duration::from_secs(30),
+minio_handler::get_file_content(client, &bucket_name, file_path)
).await {
-Ok(Ok(response)) => response,
+Ok(Ok(data)) => data,
Ok(Err(e)) => {
error!("S3 read failed: {}", e);
return Err(format!("S3 operation failed: {}", e).into());
@@ -202,15 +197,7 @@ pub async fn get_from_bucket(
}
};
-let bytes = response.to_vec();
-debug!(
-"Retrieved {} bytes from S3 for key: {}",
-bytes.len(),
-file_path
-);
let content = if file_path.to_ascii_lowercase().ends_with(".pdf") {
debug!("Processing as PDF file: {}", file_path);
match pdf_extract::extract_text_from_mem(&bytes) {
Ok(text) => text,
Err(e) => {

@@ -1,4 +1,5 @@
use crate::config::AppConfig;
+use crate::file::aws_s3_bucket_create;
use crate::package_manager::{InstallMode, PackageManager};
use anyhow::Result;
use diesel::connection::SimpleConnection;
@@ -6,7 +7,8 @@ use diesel::RunQueryDsl;
use diesel::{Connection, QueryableByName};
use dotenvy::dotenv;
use log::{debug, error, info, trace};
-use opendal::Operator;
+use aws_sdk_s3::{Client, config::Builder as S3ConfigBuilder};
+use aws_config::BehaviorVersion;
use rand::distr::Alphanumeric;
use rand::Rng;
use sha2::{Digest, Sha256};
@@ -28,21 +30,21 @@ pub struct ComponentInfo {
pub struct BootstrapManager {
pub install_mode: InstallMode,
pub tenant: Option<String>,
-pub s3_operator: Operator,
+pub s3_client: Client,
}
impl BootstrapManager {
-pub fn new(install_mode: InstallMode, tenant: Option<String>) -> Self {
+pub async fn new(install_mode: InstallMode, tenant: Option<String>) -> Self {
info!(
"Initializing BootstrapManager with mode {:?} and tenant {:?}",
install_mode, tenant
);
let config = AppConfig::from_env();
-let s3_operator = Self::create_s3_operator(&config);
+let s3_client = futures::executor::block_on(Self::create_s3_operator(&config));
Self {
install_mode,
tenant,
-s3_operator,
+s3_client,
}
}
@@ -156,7 +158,7 @@ impl BootstrapManager {
Ok(())
}
-pub fn bootstrap(&mut self) -> Result<AppConfig> {
+pub async fn bootstrap(&mut self) -> Result<AppConfig> {
if let Ok(tables_server) = std::env::var("TABLES_SERVER") {
if !tables_server.is_empty() {
info!(
@@ -292,45 +294,86 @@ impl BootstrapManager {
}
}
-self.s3_operator = Self::create_s3_operator(&config);
+self.s3_client = futures::executor::block_on(Self::create_s3_operator(&config));
let default_bucket_path = Path::new("templates/default.gbai/default.gbot/config.csv");
if default_bucket_path.exists() {
info!("Found initial config.csv, uploading to default.gbai/default.gbot");
-let operator = &self.s3_operator;
+let client = &self.s3_client;
futures::executor::block_on(async {
let content = std::fs::read(default_bucket_path).expect("Failed to read config.csv");
-operator.write("default.gbai/default.gbot/config.csv", content).await
+client.put_object()
+.bucket("default.gbai")
+.key("default.gbot/config.csv")
+.body(content.into())
+.send()
+.await
+.map(|_| ())
})?;
debug!("Initial config.csv uploaded successfully");
}
Ok(config)
}
-fn create_s3_operator(config: &AppConfig) -> Operator {
-use opendal::Scheme;
-use std::collections::HashMap;
-let mut endpoint = config.drive.server.clone();
-if !endpoint.ends_with('/') {
-endpoint.push('/');
-}
-let mut map = HashMap::new();
-map.insert("endpoint".to_string(), endpoint);
-map.insert("access_key_id".to_string(), config.drive.access_key.clone());
-map.insert(
-"secret_access_key".to_string(),
-config.drive.secret_key.clone(),
-);
-map.insert(
-"bucket".to_string(),
-format!("default.gbai"),
-);
-map.insert("region".to_string(), "auto".to_string());
-map.insert("force_path_style".to_string(), "true".to_string());
-trace!("Creating S3 operator with endpoint {}", config.drive.server);
-Operator::via_iter(Scheme::S3, map).expect("Failed to initialize S3 operator")
-}
+async fn c(config: &AppConfig, _bucket: &String) -> Client {
+let endpoint = if !config.drive.server.ends_with('/') {
+format!("{}/", config.drive.server)
+} else {
+config.drive.server.clone()
+};
+let base_config = aws_config::defaults(BehaviorVersion::latest())
+.endpoint_url(endpoint)
+.region("auto")
+.credentials_provider(
+aws_sdk_s3::config::Credentials::new(
+config.drive.access_key.clone(),
+config.drive.secret_key.clone(),
+None,
+None,
+"static",
+)
+)
+.load()
+.await;
+let s3_config = S3ConfigBuilder::from(&base_config)
+.force_path_style(true)
+.build();
+aws_sdk_s3::Client::from_conf(s3_config)
+}
+async fn create_s3_operator(config: &AppConfig) -> Client {
+let endpoint = if !config.drive.server.ends_with('/') {
+format!("{}/", config.drive.server)
+} else {
+config.drive.server.clone()
+};
+let base_config = aws_config::defaults(BehaviorVersion::latest())
+.endpoint_url(endpoint)
+.region("auto")
+.credentials_provider(
+aws_sdk_s3::config::Credentials::new(
+config.drive.access_key.clone(),
+config.drive.secret_key.clone(),
+None,
+None,
+"static",
+)
+)
+.load()
+.await;
+let s3_config = S3ConfigBuilder::from(&base_config)
+.force_path_style(true)
+.build();
+aws_sdk_s3::Client::from_conf(s3_config)
+}
fn generate_secure_password(&self, length: usize) -> String {
@@ -381,7 +424,7 @@ impl BootstrapManager {
if !templates_dir.exists() {
return Ok(());
}
-let operator = &self.s3_operator;
+let client = &self.s3_client;
for entry in std::fs::read_dir(templates_dir)? {
let entry = entry?;
let path = entry.path();
@@ -395,12 +438,28 @@
let bot_name = path.file_name().unwrap().to_string_lossy().to_string();
let bucket = bot_name.trim_start_matches('/').to_string();
info!("Uploading template {} to Drive bucket {}", bot_name, bucket);
-if operator.stat(&bucket).await.is_err() {
+// Check if bucket exists
+if client.head_bucket().bucket(&bucket).send().await.is_err() {
info!("Bucket {} not found, creating it", bucket);
-operator.create_dir("/").await?;
-debug!("Bucket {} created successfully", bucket);
+match client.create_bucket()
+.bucket(&bucket)
+.send()
+.await {
+Ok(_) => {
+debug!("Bucket {} created successfully", bucket);
+}
+Err(e) => {
+error!("Failed to create bucket {}: {:?}", bucket, e);
+return Err(anyhow::anyhow!(
+"Failed to create bucket {}: {}. Check S3 credentials and endpoint configuration",
+bucket, e
+));
+}
+}
}
-self.upload_directory_recursive(&operator, &path, &bucket)
+self.upload_directory_recursive(client, &path, &bucket)
.await?;
info!("Uploaded template {} to Drive bucket {}", bot_name, bucket);
}
@@ -462,7 +521,7 @@ impl BootstrapManager {
fn upload_directory_recursive<'a>(
&'a self,
-client: &'a Operator,
+client: &'a Client,
local_path: &'a Path,
prefix: &'a str,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + 'a>> {
@@ -487,7 +546,12 @@
info!("Uploading file: {} with key: {}", path.display(), key);
let content = std::fs::read(&path)?;
trace!("Writing file {} with key {}", path.display(), key);
-client.write(&key, content).await?;
+client.put_object()
+.bucket(prefix.split('/').next().unwrap_or("default.gbai"))
+.key(&key)
+.body(content.into())
+.send()
+.await?;
trace!("Successfully wrote file {}", path.display());
} else if path.is_dir() {
self.upload_directory_recursive(client, &path, &key).await?;


@@ -175,7 +175,7 @@ impl AppConfig {
.and_then(|p| p.parse().ok())
.unwrap_or_else(|| get_u32("TABLES_PORT", 5432)),
database: std::env::var("TABLES_DATABASE")
-.unwrap_or_else(|_| get_str("TABLES_DATABASE", "botserver")),
+.unwrap_or_else(|_| get_str("TABLES_DATABASE", "gbuser")),
};
let database_custom = DatabaseConfig {
@@ -190,11 +190,18 @@ impl AppConfig {
.and_then(|p| p.parse().ok())
.unwrap_or_else(|| get_u32("CUSTOM_PORT", 5432)),
database: std::env::var("CUSTOM_DATABASE")
-.unwrap_or_else(|_| get_str("CUSTOM_DATABASE", "botserver")),
+.unwrap_or_else(|_| get_str("CUSTOM_DATABASE", "gbuser")),
};
let minio = DriveConfig {
-server: get_str("DRIVE_SERVER", "http://localhost:9000"),
+server: {
+let server = get_str("DRIVE_SERVER", "http://localhost:9000");
+if !server.starts_with("http://") && !server.starts_with("https://") {
+format!("http://{}", server)
+} else {
+server
+}
+},
access_key: get_str("DRIVE_ACCESSKEY", "minioadmin"),
secret_key: get_str("DRIVE_SECRET", "minioadmin"),
use_ssl: get_bool("DRIVE_USE_SSL", false),

src/create_bucket.rs (new file, 8 lines)

@@ -0,0 +1,8 @@
use std::fs;
use std::path::Path;
pub fn create_bucket(bucket_name: &str) -> std::io::Result<()> {
let bucket_path = Path::new("buckets").join(bucket_name);
fs::create_dir_all(&bucket_path)?;
Ok(())
}


@@ -3,7 +3,7 @@ use crate::kb::embeddings;
use crate::kb::qdrant_client;
use crate::shared::state::AppState;
use log::{debug, error, info, warn};
-use opendal::Operator;
+use aws_sdk_s3::Client;
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;
@@ -46,17 +46,17 @@ impl DriveMonitor {
}
async fn check_for_changes(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
-let op = match &self.state.s3_operator {
-Some(op) => op,
+let client = match &self.state.s3_client {
+Some(client) => client,
None => {
return Ok(());
}
};
-self.check_gbdialog_changes(op).await?;
-self.check_gbkb_changes(op).await?;
+self.check_gbdialog_changes(client).await?;
+self.check_gbkb_changes(client).await?;
-if let Err(e) = self.check_default_gbot(op).await {
+if let Err(e) = self.check_default_gbot(client).await {
error!("Error checking default bot config: {}", e);
}
@@ -65,40 +65,53 @@
async fn check_gbdialog_changes(
&self,
-op: &Operator,
+client: &Client,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let prefix = ".gbdialog/";
let mut current_files = HashMap::new();
-let mut lister = op.lister_with(prefix).recursive(true).await?;
-while let Some(entry) = futures::TryStreamExt::try_next(&mut lister).await? {
-let path = entry.path().to_string();
-if path.ends_with('/') || !path.ends_with(".bas") {
-continue;
-}
-let meta = op.stat(&path).await?;
-let file_state = FileState {
-path: path.clone(),
-size: meta.content_length() as i64,
-etag: meta.etag().unwrap_or_default().to_string(),
-last_modified: meta.last_modified().map(|dt| dt.to_rfc3339()),
-};
-current_files.insert(path, file_state);
-}
+let mut continuation_token = None;
+loop {
+let list_objects = client.list_objects_v2()
+.bucket(&self.bucket_name)
+.prefix(prefix)
+.set_continuation_token(continuation_token)
+.send()
+.await?;
+for obj in list_objects.contents.unwrap_or_default() {
+let path = obj.key().unwrap_or_default().to_string();
+if path.ends_with('/') || !path.ends_with(".bas") {
+continue;
+}
+let file_state = FileState {
+path: path.clone(),
+size: obj.size().unwrap_or(0),
+etag: obj.e_tag().unwrap_or_default().to_string(),
+last_modified: obj.last_modified().map(|dt| dt.to_string()),
+};
+current_files.insert(path, file_state);
+}
+if !list_objects.is_truncated.unwrap_or(false) {
+break;
+}
+continuation_token = list_objects.next_continuation_token;
+}
let mut file_states = self.file_states.write().await;
for (path, current_state) in current_files.iter() {
if let Some(previous_state) = file_states.get(path) {
if current_state.etag != previous_state.etag {
-if let Err(e) = self.compile_tool(op, path).await {
+if let Err(e) = self.compile_tool(client, path).await {
error!("Failed to compile tool {}: {}", path, e);
}
}
} else {
-if let Err(e) = self.compile_tool(op, path).await {
+if let Err(e) = self.compile_tool(client, path).await {
error!("Failed to compile tool {}: {}", path, e);
}
}
@@ -125,45 +138,58 @@
async fn check_gbkb_changes(
&self,
-op: &Operator,
+client: &Client,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let prefix = ".gbkb/";
let mut current_files = HashMap::new();
-let mut lister = op.lister_with(prefix).recursive(true).await?;
-while let Some(entry) = futures::TryStreamExt::try_next(&mut lister).await? {
-let path = entry.path().to_string();
-if path.ends_with('/') {
-continue;
-}
-let ext = path.rsplit('.').next().unwrap_or("").to_lowercase();
-if !["pdf", "txt", "md", "docx"].contains(&ext.as_str()) {
-continue;
-}
-let meta = op.stat(&path).await?;
-let file_state = FileState {
-path: path.clone(),
-size: meta.content_length() as i64,
-etag: meta.etag().unwrap_or_default().to_string(),
-last_modified: meta.last_modified().map(|dt| dt.to_rfc3339()),
-};
-current_files.insert(path, file_state);
-}
+let mut continuation_token = None;
+loop {
+let list_objects = client.list_objects_v2()
+.bucket(&self.bucket_name)
+.prefix(prefix)
+.set_continuation_token(continuation_token)
+.send()
+.await?;
+for obj in list_objects.contents.unwrap_or_default() {
+let path = obj.key().unwrap_or_default().to_string();
+if path.ends_with('/') {
+continue;
+}
+let ext = path.rsplit('.').next().unwrap_or("").to_lowercase();
+if !["pdf", "txt", "md", "docx"].contains(&ext.as_str()) {
+continue;
+}
+let file_state = FileState {
+path: path.clone(),
+size: obj.size().unwrap_or(0),
+etag: obj.e_tag().unwrap_or_default().to_string(),
+last_modified: obj.last_modified().map(|dt| dt.to_string()),
+};
+current_files.insert(path, file_state);
+}
+if !list_objects.is_truncated.unwrap_or(false) {
+break;
+}
+continuation_token = list_objects.next_continuation_token;
+}
let mut file_states = self.file_states.write().await;
for (path, current_state) in current_files.iter() {
if let Some(previous_state) = file_states.get(path) {
if current_state.etag != previous_state.etag {
-if let Err(e) = self.index_document(op, path).await {
+if let Err(e) = self.index_document(client, path).await {
error!("Failed to index document {}: {}", path, e);
}
}
} else {
-if let Err(e) = self.index_document(op, path).await {
+if let Err(e) = self.index_document(client, path).await {
error!("Failed to index document {}: {}", path, e);
}
}
@@ -190,15 +216,26 @@
async fn check_default_gbot(
&self,
-op: &Operator,
+client: &Client,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let prefix = format!("{}default.gbot/", self.bucket_name);
let config_key = format!("{}config.csv", prefix);
-match op.stat(&config_key).await {
+match client.head_object()
+.bucket(&self.bucket_name)
+.key(&config_key)
+.send()
+.await
+{
Ok(_) => {
-let content = op.read(&config_key).await?;
-let csv_content = String::from_utf8(content.to_vec())
+let response = client.get_object()
+.bucket(&self.bucket_name)
+.key(&config_key)
+.send()
+.await?;
+let bytes = response.body.collect().await?.into_bytes();
+let csv_content = String::from_utf8(bytes.to_vec())
.map_err(|e| format!("UTF-8 error in config.csv: {}", e))?;
debug!("Found config.csv: {} bytes", csv_content.len());
Ok(())
@@ -212,11 +249,17 @@
async fn compile_tool(
&self,
-op: &Operator,
+client: &Client,
file_path: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
-let content = op.read(file_path).await?;
-let source_content = String::from_utf8(content.to_vec())?;
+let response = client.get_object()
+.bucket(&self.bucket_name)
+.key(file_path)
+.send()
+.await?;
+let bytes = response.body.collect().await?.into_bytes();
+let source_content = String::from_utf8(bytes.to_vec())?;
let tool_name = file_path
.strip_prefix(".gbdialog/")
@@ -254,7 +297,7 @@
async fn index_document(
&self,
-op: &Operator,
+client: &Client,
file_path: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let parts: Vec<&str> = file_path.split('/').collect();
@@ -264,8 +307,12 @@
}
let collection_name = parts[1];
-let content = op.read(file_path).await?;
-let bytes = content.to_vec();
+let response = client.get_object()
+.bucket(&self.bucket_name)
+.key(file_path)
+.send()
+.await?;
+let bytes = response.body.collect().await?.into_bytes();
let text_content = self.extract_text(file_path, &bytes)?;
if text_content.trim().is_empty() {


@@ -3,10 +3,17 @@ use crate::shared::state::AppState;
use actix_multipart::Multipart;
use actix_web::web;
use actix_web::{post, HttpResponse};
-use opendal::Operator;
+use base64::Engine;
+use aws_sdk_s3::{Client as S3Client, config::Builder as S3ConfigBuilder};
+use aws_config::BehaviorVersion;
+// Removed unused import
use std::io::Write;
use tempfile::NamedTempFile;
use tokio_stream::StreamExt as TokioStreamExt;
+use reqwest::Client as HttpClient;
+use hmac::{Hmac, Mac};
+use sha2::Sha256;
+use chrono::Utc;
#[post("/files/upload/{folder_path}")]
pub async fn upload_file(
@@ -40,13 +47,13 @@
let file_name = file_name.unwrap_or_else(|| "unnamed_file".to_string());
let temp_file_path = temp_file.into_temp_path();
-let op = state.get_ref().s3_operator.as_ref().ok_or_else(|| {
-actix_web::error::ErrorInternalServerError("S3 operator is not initialized")
+let client = state.get_ref().s3_client.as_ref().ok_or_else(|| {
+actix_web::error::ErrorInternalServerError("S3 client is not initialized")
})?;
let s3_key = format!("{}/{}", folder_path, file_name);
-match upload_to_s3(op, &s3_key, &temp_file_path).await {
+match upload_to_s3(client, &state.get_ref().bucket_name, &s3_key, &temp_file_path).await {
Ok(_) => {
let _ = std::fs::remove_file(&temp_file_path);
Ok(HttpResponse::Ok().body(format!(
@@ -64,27 +71,149 @@
}
}
-pub async fn init_drive(config: &DriveConfig) -> Result<Operator, Box<dyn std::error::Error>> {
-use opendal::services::S3;
-use opendal::Operator;
-let client = Operator::new(
-S3::default()
-.root("/")
-.endpoint(&config.server)
-.access_key_id(&config.access_key)
-.secret_access_key(&config.secret_key),
-)?
-.finish();
-Ok(client)
-}
+pub async fn aws_s3_bucket_delete(
+bucket: &str,
+endpoint: &str,
+access_key: &str,
+secret_key: &str,
+) -> Result<(), Box<dyn std::error::Error>> {
+let config = aws_config::defaults(BehaviorVersion::latest())
+.endpoint_url(endpoint)
+.region("auto")
+.credentials_provider(
+aws_sdk_s3::config::Credentials::new(
+access_key.to_string(),
+secret_key.to_string(),
+None,
+None,
+"static",
+)
+)
+.load()
+.await;
+let client = S3Client::new(&config);
+client.delete_bucket()
+.bucket(bucket)
+.send()
+.await?;
+Ok(())
+}
+pub async fn aws_s3_bucket_create(
+bucket: &str,
+endpoint: &str,
+access_key: &str,
+secret_key: &str,
+) -> Result<(), Box<dyn std::error::Error>> {
+let config = aws_config::defaults(BehaviorVersion::latest())
+.endpoint_url(endpoint)
+.region("auto")
+.credentials_provider(
+aws_sdk_s3::config::Credentials::new(
+access_key.to_string(),
+secret_key.to_string(),
+None,
+None,
+"static",
+)
+)
+.load()
+.await;
+let client = S3Client::new(&config);
+client.create_bucket()
+.bucket(bucket)
+.send()
+.await?;
+Ok(())
+}
+pub async fn init_drive(config: &DriveConfig) -> Result<S3Client, Box<dyn std::error::Error>> {
+let endpoint = if !config.server.ends_with('/') {
+format!("{}/", config.server)
+} else {
+config.server.clone()
+};
+let base_config = aws_config::defaults(BehaviorVersion::latest())
+.endpoint_url(endpoint)
+.region("auto")
+.credentials_provider(
+aws_sdk_s3::config::Credentials::new(
+config.access_key.clone(),
+config.secret_key.clone(),
+None,
+None,
+"static",
+)
+)
+.load()
+.await;
+let s3_config = S3ConfigBuilder::from(&base_config)
+.force_path_style(true)
+.build();
+Ok(S3Client::from_conf(s3_config))
+}
async fn upload_to_s3(
-op: &Operator,
+client: &S3Client,
+bucket: &str,
key: &str,
file_path: &std::path::Path,
) -> Result<(), Box<dyn std::error::Error>> {
let data = std::fs::read(file_path)?;
-op.write(key, data).await?;
+client.put_object()
+.bucket(bucket)
+.key(key)
+.body(data.into())
+.send()
+.await?;
Ok(())
}
+async fn create_s3_client(
+) -> Result<S3Client, Box<dyn std::error::Error>> {
+let config = DriveConfig {
+server: std::env::var("DRIVE_SERVER").expect("DRIVE_SERVER not set"),
+access_key: std::env::var("DRIVE_ACCESS_KEY").expect("DRIVE_ACCESS_KEY not set"),
+secret_key: std::env::var("DRIVE_SECRET_KEY").expect("DRIVE_SECRET_KEY not set"),
+org_prefix: "".to_string(),
+use_ssl: false,
+};
+Ok(init_drive(&config).await?)
+}
+pub async fn bucket_exists(client: &S3Client, bucket: &str) -> Result<bool, Box<dyn std::error::Error>> {
+match client.head_bucket().bucket(bucket).send().await {
+Ok(_) => Ok(true),
+Err(e) => {
+if e.to_string().contains("NoSuchBucket") {
+Ok(false)
+} else {
+Err(Box::new(e))
+}
+}
+}
+}
+pub async fn create_bucket(client: &S3Client, bucket: &str) -> Result<(), Box<dyn std::error::Error>> {
+client.create_bucket()
+.bucket(bucket)
+.send()
+.await?;
+Ok(())
+}
+#[cfg(test)]
+mod bucket_tests {
+include!("tests/bucket_tests.rs");
+}
+#[cfg(test)]
+mod tests {
+include!("tests/tests.rs");
+}

src/file/tests/bucket_tests.rs (new file, 70 lines)

@@ -0,0 +1,70 @@
use super::*;
use aws_sdk_s3::Client as S3Client;
use std::env;
#[tokio::test]
async fn test_aws_s3_bucket_create() {
if env::var("CI").is_ok() {
return; // Skip in CI environment
}
let bucket = "test-bucket-aws";
let endpoint = "http://localhost:4566"; // LocalStack default endpoint
let access_key = "test";
let secret_key = "test";
match aws_s3_bucket_create(bucket, endpoint, access_key, secret_key).await {
Ok(_) => {
// Verify bucket exists
let config = aws_config::defaults(BehaviorVersion::latest())
.endpoint_url(endpoint)
.region("auto")
.load()
.await;
let client = S3Client::new(&config);
let exists = bucket_exists(&client, bucket).await.unwrap_or(false);
assert!(exists, "Bucket should exist after creation");
},
Err(e) => {
println!("Bucket creation failed: {:?}", e);
}
}
}
#[tokio::test]
async fn test_aws_s3_bucket_delete() {
if env::var("CI").is_ok() {
return; // Skip in CI environment
}
let bucket = "test-delete-bucket-aws";
let endpoint = "http://localhost:4566"; // LocalStack default endpoint
let access_key = "test";
let secret_key = "test";
// First create the bucket
if let Err(e) = aws_s3_bucket_create(bucket, endpoint, access_key, secret_key).await {
println!("Failed to create test bucket: {:?}", e);
return;
}
// Then test deletion
match aws_s3_bucket_delete(bucket, endpoint, access_key, secret_key).await {
Ok(_) => {
// Verify bucket no longer exists
let config = aws_config::defaults(BehaviorVersion::latest())
.endpoint_url(endpoint)
.region("auto")
.load()
.await;
let client = S3Client::new(&config);
let exists = bucket_exists(&client, bucket).await.unwrap_or(false);
assert!(!exists, "Bucket should not exist after deletion");
},
Err(e) => {
println!("Bucket deletion failed: {:?}", e);
}
}
}

src/file/tests/tests.rs (new file, 80 lines)

@@ -0,0 +1,80 @@
use super::*;
#[tokio::test]
async fn test_create_s3_client() {
if std::env::var("CI").is_ok() {
return; // Skip in CI environment
}
// Setup test environment variables
std::env::set_var("DRIVE_SERVER", "http://localhost:9000");
std::env::set_var("DRIVE_ACCESS_KEY", "minioadmin");
std::env::set_var("DRIVE_SECRET_KEY", "minioadmin");
match create_s3_client().await {
Ok(client) => {
// Verify client creation
assert!(client.config().region().is_some());
// Test bucket operations
if let Err(e) = create_bucket(&client, "test.gbai").await {
println!("Bucket creation failed: {:?}", e);
}
},
Err(e) => {
// Skip if no S3 server available
println!("S3 client creation failed: {:?}", e);
}
}
// Cleanup
std::env::remove_var("DRIVE_SERVER");
std::env::remove_var("DRIVE_ACCESS_KEY");
std::env::remove_var("DRIVE_SECRET_KEY");
}
#[tokio::test]
async fn test_bucket_exists() {
if std::env::var("CI").is_ok() {
return; // Skip in CI environment
}
// Setup test environment variables
std::env::set_var("DRIVE_SERVER", "http://localhost:9000");
std::env::set_var("DRIVE_ACCESS_KEY", "minioadmin");
std::env::set_var("DRIVE_SECRET_KEY", "minioadmin");
match create_s3_client().await {
Ok(client) => {
// Verify client creation
assert!(client.config().region().is_some());
},
Err(e) => {
// Skip if no S3 server available
println!("S3 client creation failed: {:?}", e);
}
}
}
#[tokio::test]
async fn test_create_bucket() {
if std::env::var("CI").is_ok() {
return; // Skip in CI environment
}
// Setup test environment variables
std::env::set_var("DRIVE_SERVER", "http://localhost:9000");
std::env::set_var("DRIVE_ACCESS_KEY", "minioadmin");
std::env::set_var("DRIVE_SECRET_KEY", "minioadmin");
match create_s3_client().await {
Ok(client) => {
// Verify client creation
assert!(client.config().region().is_some());
},
Err(e) => {
// Skip if no S3 server available
println!("S3 client creation failed: {:?}", e);
}
}
}


@@ -1,7 +1,6 @@
use crate::shared::state::AppState;
use log::error;
-use opendal::Operator;
-use tokio_stream::StreamExt;
+use aws_sdk_s3::Client;
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;
@@ -17,14 +16,32 @@ pub struct FileState {
pub struct MinIOHandler {
state: Arc<AppState>,
+s3: Arc<Client>,
watched_prefixes: Arc<tokio::sync::RwLock<Vec<String>>>,
file_states: Arc<tokio::sync::RwLock<HashMap<String, FileState>>>,
}
+pub async fn get_file_content(
+client: &aws_sdk_s3::Client,
+bucket: &str,
+key: &str
+) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
+let response = client.get_object()
+.bucket(bucket)
+.key(key)
+.send()
+.await?;
+let bytes = response.body.collect().await?.into_bytes().to_vec();
+Ok(bytes)
+}
impl MinIOHandler {
pub fn new(state: Arc<AppState>) -> Self {
+let client = state.s3_client.as_ref().expect("S3 client must be initialized").clone();
Self {
-state,
+state: Arc::clone(&state),
+s3: Arc::new(client),
watched_prefixes: Arc::new(tokio::sync::RwLock::new(Vec::new())),
file_states: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
}
@@ -61,16 +78,9 @@
&self,
callback: &Arc<dyn Fn(FileChangeEvent) + Send + Sync>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
-let op = match &self.state.s3_operator {
-Some(op) => op,
-None => {
-return Ok(());
-}
-};
let prefixes = self.watched_prefixes.read().await;
for prefix in prefixes.iter() {
-if let Err(e) = self.check_prefix_changes(op, prefix, callback).await {
+if let Err(e) = self.check_prefix_changes(&self.s3, prefix, callback).await {
error!("Error checking prefix {}: {}", prefix, e);
}
}
@@ -79,28 +89,41 @@
async fn check_prefix_changes(
&self,
-op: &Operator,
+client: &Client,
prefix: &str,
callback: &Arc<dyn Fn(FileChangeEvent) + Send + Sync>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut current_files = HashMap::new();
-let mut lister = op.lister_with(prefix).recursive(true).await?;
-while let Some(entry) = lister.try_next().await? {
-let path = entry.path().to_string();
-if path.ends_with('/') {
-continue;
-}
-let meta = op.stat(&path).await?;
-let file_state = FileState {
-path: path.clone(),
-size: meta.content_length() as i64,
-etag: meta.etag().unwrap_or_default().to_string(),
-last_modified: meta.last_modified().map(|dt| dt.to_rfc3339()),
-};
-current_files.insert(path, file_state);
-}
+let mut continuation_token = None;
+loop {
+let list_objects = client.list_objects_v2()
+.bucket(&self.state.bucket_name)
+.prefix(prefix)
+.set_continuation_token(continuation_token)
+.send()
+.await?;
+for obj in list_objects.contents.unwrap_or_default() {
+let path = obj.key().unwrap_or_default().to_string();
+if path.ends_with('/') {
+continue;
+}
+let file_state = FileState {
+path: path.clone(),
+size: obj.size().unwrap_or(0),
+etag: obj.e_tag().unwrap_or_default().to_string(),
+last_modified: obj.last_modified().map(|dt| dt.to_string()),
+};
+current_files.insert(path, file_state);
+}
+if !list_objects.is_truncated.unwrap_or(false) {
+break;
+}
+continuation_token = list_objects.next_continuation_token;
+}
let mut file_states = self.file_states.write().await;
@@ -146,7 +169,7 @@ impl MinIOHandler {
pub async fn get_file_state(&self, path: &str) -> Option<FileState> {
let states = self.file_states.read().await;
-states.get(path).cloned()
+states.get(&path.to_string()).cloned()
}
pub async fn clear_state(&self) {


@@ -1,7 +1,8 @@
use crate::shared::models::KBCollection;
use crate::shared::state::AppState;
use log::{ error, info, warn};
-use tokio_stream::StreamExt;
+// Removed unused import
+// Removed duplicate import since we're using the module directly
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;
@@ -95,35 +96,16 @@ impl KBManager {
&self,
collection: &KBCollection,
) -> Result<(), Box<dyn Error + Send + Sync>> {
-let op = match &self.state.s3_operator {
-Some(op) => op,
+let _client = match &self.state.s3_client {
+Some(client) => client,
None => {
-warn!("S3 operator not configured");
+warn!("S3 client not configured");
return Ok(());
}
};
-let mut lister = op.lister_with(&collection.folder_path).recursive(true).await?;
-while let Some(entry) = lister.try_next().await? {
-let path = entry.path().to_string();
-if path.ends_with('/') {
-continue;
-}
-let meta = op.stat(&path).await?;
-if let Err(e) = self
-.process_file(
-&collection,
-&path,
-meta.content_length() as i64,
-meta.last_modified().map(|dt| dt.to_rfc3339()),
-)
-.await
-{
-error!("Error processing file {}: {}", path, e);
-}
-}
+let minio_handler = minio_handler::MinIOHandler::new(self.state.clone());
+minio_handler.watch_prefix(collection.folder_path.clone()).await;
Ok(())
}
@@ -135,7 +117,8 @@ impl KBManager {
file_size: i64,
_last_modified: Option<String>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
-let content = self.get_file_content(file_path).await?;
+let client = self.state.s3_client.as_ref().ok_or("S3 client not configured")?;
+let content = minio_handler::get_file_content(client, &self.state.bucket_name, file_path).await?;
let file_hash = if content.len() > 100 {
format!(
"{:x}_{:x}_{}",
@@ -183,20 +166,6 @@ impl KBManager {
Ok(())
}
-async fn get_file_content(
-&self,
-file_path: &str,
-) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
-let op = self
-.state
-.s3_operator
-.as_ref()
-.ok_or("S3 operator not configured")?;
-let content = op.read(file_path).await?;
-Ok(content.to_vec())
-}
async fn extract_text(
&self,
file_path: &str,


@@ -7,6 +7,7 @@ use actix_web::{web, App, HttpServer};
use dotenvy::dotenv;
use log::info;
use std::collections::HashMap;
+use std::env;
use std::sync::{Arc, Mutex};
mod auth;
@@ -36,6 +37,7 @@ mod tools;
mod web_automation;
mod web_server;
mod whatsapp;
+mod create_bucket;
use crate::auth::auth_handler;
use crate::automation::AutomationService;
@@ -63,6 +65,12 @@ use crate::whatsapp::WhatsAppAdapter;
#[cfg(not(feature = "desktop"))]
#[tokio::main]
async fn main() -> std::io::Result<()> {
+// Test bucket creation
+match create_bucket::create_bucket("test-bucket") {
+Ok(_) => println!("Bucket created successfully"),
+Err(e) => eprintln!("Failed to create bucket: {}", e),
+}
let args: Vec<String> = std::env::args().collect();
if args.len() > 1 {
let command = &args[1];
@@ -89,6 +97,7 @@ async fn main() -> std::io::Result<()> {
}
}
+// Rest of the original main function remains unchanged...
dotenv().ok();
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
.write_style(env_logger::WriteStyle::Always)
@@ -106,7 +115,7 @@ async fn main() -> std::io::Result<()> {
None
};
-let mut bootstrap = BootstrapManager::new(install_mode.clone(), tenant.clone());
+let mut bootstrap = BootstrapManager::new(install_mode.clone(), tenant.clone()).await;
// Prevent double bootstrap: skip if environment already initialized
let env_path = std::env::current_dir()?.join("botserver-stack").join(".env");
@@ -120,7 +129,7 @@ async fn main() -> std::io::Result<()> {
Err(_) => AppConfig::from_env(),
}
} else {
-match bootstrap.bootstrap() {
+match bootstrap.bootstrap().await {
Ok(config) => {
info!("Bootstrap completed successfully");
config
@@ -138,9 +147,13 @@ async fn main() -> std::io::Result<()> {
}
};
-let _ = bootstrap.start_all();
-if let Err(e) = bootstrap.upload_templates_to_drive(&cfg).await {
+// Start all services (synchronous)
+if let Err(e) = bootstrap.start_all() {
+log::warn!("Failed to start all services: {}", e);
+}
+// Upload templates (asynchronous)
+if let Err(e) = futures::executor::block_on(bootstrap.upload_templates_to_drive(&cfg)) {
log::warn!("Failed to upload templates to MinIO: {}", e);
}
@@ -193,7 +206,6 @@ async fn main() -> std::io::Result<()> {
));
let tool_api = Arc::new(tools::ToolApi::new());
let drive = init_drive(&config.drive)
-.await
.expect("Failed to initialize Drive");
@@ -209,7 +221,8 @@ async fn main() -> std::io::Result<()> {
)));
let app_state = Arc::new(AppState {
-s3_operator: Some(drive.clone()),
+s3_client: Some(drive),
+bucket_name: format!("{}{}.gbai", cfg.drive.org_prefix, env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string())),
config: Some(cfg.clone()),
conn: db_pool.clone(),
custom_conn: db_custom_pool.clone(),


@@ -89,17 +89,9 @@ impl PackageManager {
),
binary_name: Some("minio".to_string()),
pre_install_cmds_linux: vec![],
-post_install_cmds_linux: vec![
-"wget https://dl.min.io/client/mc/release/linux-amd64/mc -O {{BIN_PATH}}/mc"
-.to_string(),
-"chmod +x {{BIN_PATH}}/mc".to_string(),
-],
+post_install_cmds_linux: vec![],
pre_install_cmds_macos: vec![],
-post_install_cmds_macos: vec![
-"wget https://dl.min.io/client/mc/release/darwin-amd64/mc -O {{BIN_PATH}}/mc"
-.to_string(),
-"chmod +x {{BIN_PATH}}/mc".to_string(),
-],
+post_install_cmds_macos: vec![],
pre_install_cmds_windows: vec![],
post_install_cmds_windows: vec![],
env_vars: HashMap::from([
@@ -107,7 +99,7 @@ impl PackageManager {
("DRIVE_ROOT_PASSWORD".to_string(), drive_password.clone()),
]),
data_download_list: Vec::new(),
-exec_cmd: "nohup {{BIN_PATH}}/minio server {{DATA_PATH}} --address :9000 --console-address :9001 > {{LOGS_PATH}}/minio.log 2>&1 & sleep 5 && {{BIN_PATH}}/mc alias set drive http://localhost:9000 minioadmin minioadmin && {{BIN_PATH}}/mc admin user add drive $DRIVE_ROOT_USER $DRIVE_ROOT_PASSWORD && {{BIN_PATH}}/mc admin policy attach drive readwrite --user $DRIVE_ROOT_USER && {{BIN_PATH}}/mc mb drive/default.gbai || true".to_string(),
+exec_cmd: "nohup {{BIN_PATH}}/minio server {{DATA_PATH}} --address :9000 --console-address :9001 > {{LOGS_PATH}}/minio.log 2>&1 &".to_string(),
},
);


@@ -6,8 +6,8 @@ use crate::session::SessionManager;
use crate::tools::{ToolApi, ToolManager};
use crate::whatsapp::WhatsAppAdapter;
use diesel::{Connection, PgConnection};
-use opendal::Operator;
-use redis::Client;
+use aws_sdk_s3::Client as S3Client;
+use redis::Client as RedisClient;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
@@ -15,11 +15,12 @@ use tokio::sync::mpsc;
use crate::shared::models::BotResponse;
pub struct AppState {
-pub s3_operator: Option<Operator>,
+pub s3_client: Option<S3Client>,
+pub bucket_name: String,
pub config: Option<AppConfig>,
pub conn: Arc<Mutex<PgConnection>>,
pub custom_conn: Arc<Mutex<PgConnection>>,
-pub redis_client: Option<Arc<Client>>,
+pub redis_client: Option<Arc<RedisClient>>,
pub session_manager: Arc<tokio::sync::Mutex<SessionManager>>,
pub tool_manager: Arc<ToolManager>,
pub llm_provider: Arc<dyn LLMProvider>,
@@ -35,7 +36,8 @@ pub struct AppState {
impl Clone for AppState {
fn clone(&self) -> Self {
Self {
-s3_operator: self.s3_operator.clone(),
+s3_client: self.s3_client.clone(),
+bucket_name: self.bucket_name.clone(),
config: self.config.clone(),
conn: Arc::clone(&self.conn),
custom_conn: Arc::clone(&self.custom_conn),
@@ -57,7 +59,8 @@ impl Clone for AppState {
impl Default for AppState {
fn default() -> Self {
Self {
-s3_operator: None,
+s3_client: None,
+bucket_name: "default.gbai".to_string(),
config: None,
conn: Arc::new(Mutex::new(
diesel::PgConnection::establish("postgres://localhost/test").unwrap(),


@@ -13,6 +13,7 @@ use std::io::Write;
use std::str::FromStr;
use tempfile::NamedTempFile;
+#[tokio::test]
async fn test_successful_file_upload() -> Result<()> {
// Setup test environment and MinIO client