christopher 2025-10-29 10:25:45 -03:00
commit 52e2f79395
28 changed files with 1237 additions and 5673 deletions

Cargo.lock (generated): 4898 changed lines
File diff suppressed because it is too large.


@ -37,74 +37,66 @@ license = "AGPL-3.0"
repository = "https://github.com/GeneralBots/BotServer" repository = "https://github.com/GeneralBots/BotServer"
[features] [features]
desktop = ["tauri", "tauri-plugin-opener", "tauri-plugin-dialog"]
default = [ "vectordb"] default = [ "vectordb"]
vectordb = ["qdrant-client"] vectordb = ["qdrant-client"]
email = ["imap"] email = ["imap"]
web_automation = ["headless_chrome"] web_automation = ["headless_chrome"]
webapp = ["tauri", "tauri-plugin-opener", "tauri-plugin-dialog"] desktop = []
[dependencies] [dependencies]
actix-cors = "0.7" actix-cors = "0.7"
aws-config = "0.57.0"
csv = "1.3"
actix-multipart = "0.7" actix-multipart = "0.7"
imap = { version = "3.0.0-alpha.15", optional = true }
actix-web = "4.9" actix-web = "4.9"
actix-ws = "0.3" actix-ws = "0.3"
aes-gcm = "0.10"
anyhow = "1.0" anyhow = "1.0"
argon2 = "0.5"
async-stream = "0.3" async-stream = "0.3"
async-trait = "0.1" async-trait = "0.1"
aes-gcm = "0.10"
argon2 = "0.5"
base64 = "0.22" base64 = "0.22"
bytes = "1.8" bytes = "1.8"
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
csv = "1.3"
diesel = { version = "2.1", features = ["postgres", "uuid", "chrono", "serde_json"] } diesel = { version = "2.1", features = ["postgres", "uuid", "chrono", "serde_json"] }
dotenvy = "0.15" dotenvy = "0.15"
downloader = "0.2" downloader = "0.2"
env_logger = "0.11" env_logger = "0.11"
futures = "0.3" futures = "0.3"
futures-util = "0.3" futures-util = "0.3"
headless_chrome = { version = "1.0.18", optional = true }
imap = { version = "3.0.0-alpha.15", optional = true }
include_dir = "0.7"
indicatif = "0.18.0"
lettre = { version = "0.11", features = ["smtp-transport", "builder", "tokio1", "tokio1-native-tls"] } lettre = { version = "0.11", features = ["smtp-transport", "builder", "tokio1", "tokio1-native-tls"] }
livekit = "0.7" livekit = "0.7"
include_dir = "0.7"
log = "0.4" log = "0.4"
mailparse = "0.15" mailparse = "0.15"
native-tls = "0.2" native-tls = "0.2"
num-format = "0.4" num-format = "0.4"
opendal = { version = "0.54.1", features = ["services-s3"] }
pdf-extract = "0.10.0"
qdrant-client = { version = "1.12", optional = true } qdrant-client = { version = "1.12", optional = true }
rhai = { git = "https://github.com/therealprof/rhai.git", branch = "features/use-web-time" } rand = "0.9.2"
redis = { version = "0.27", features = ["tokio-comp"] } redis = { version = "0.27", features = ["tokio-comp"] }
regex = "1.11" regex = "1.11"
reqwest = { version = "0.12", features = ["json", "stream"] } reqwest = { version = "0.12", features = ["json", "stream"] }
rhai = { git = "https://github.com/therealprof/rhai.git", branch = "features/use-web-time" }
scraper = "0.20"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
sha2 = "0.10.9"
smartstring = "1.0" smartstring = "1.0"
tempfile = "3" tempfile = "3"
time = "0.3.44"
tokio = { version = "1.41", features = ["full"] } tokio = { version = "1.41", features = ["full"] }
tokio-stream = "0.1" tokio-stream = "0.1"
tracing = "0.1" tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["fmt"] } tracing-subscriber = { version = "0.3", features = ["fmt"] }
ureq = "3.1.2"
urlencoding = "2.1" urlencoding = "2.1"
uuid = { version = "1.11", features = ["serde", "v4"] } uuid = { version = "1.11", features = ["serde", "v4"] }
zip = "2.2" zip = "2.2"
time = "0.3.44"
aws-sdk-s3 = { version = "1.108.0", features = ["behavior-version-latest"] }
headless_chrome = { version = "1.0.18", optional = true }
rand = "0.9.2"
pdf-extract = "0.10.0"
scraper = "0.20"
sha2 = "0.10.9"
ureq = "3.1.2"
indicatif = "0.18.0"
tauri = { version = "2", features = ["unstable"], optional = true }
tauri-plugin-opener = { version = "2", optional = true }
tauri-plugin-dialog = { version = "2", optional = true }
[build-dependencies]
tauri-build = { version = "2", features = [] }
[profile.release] [profile.release]
lto = true # Enables Link-Time Optimization lto = true # Enables Link-Time Optimization


@ -19,26 +19,28 @@ for file in "${prompts[@]}"; do
done done
dirs=( dirs=(
"auth" # "auth"
"automation" # "automation"
"basic" # "basic"
"bot" # "bot"
"bootstrap" "bootstrap"
"package_manager" "package_manager"
"channels" # "channels"
"config" # "config"
"context" # "context"
"email" # "email"
"file" "file"
"llm" # "llm"
"llm_legacy" "drive_monitor"
"org" # "llm_legacy"
"session" # "org"
# "session"
#"kb"
"shared" "shared"
"tests" #"tests"
"tools" # "tools"
"web_automation" # "web_automation"
"whatsapp" # "whatsapp"
) )
filter_rust_file() { filter_rust_file() {


@ -1 +1,69 @@
# Architecture
## Auto Bootstrap Process Overview
The Auto Bootstrap process is responsible for initializing and configuring the entire BotServer environment after installation. It ensures that all system components are installed, configured, and started automatically, and that bots are created from predefined templates.
### 1. Bootstrap Initialization
The process begins with the `BootstrapManager`, which is instantiated with an installation mode (`Local` or `Container`) and an optional tenant name. It initializes the `PackageManager`, which detects the operating system and sets up the base installation path (e.g., `/opt/gbo` or `botserver-stack`).
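A minimal sketch of this entry point, based on the signatures visible later in this commit (`BootstrapManager::new`, `bootstrap()`, `start_all()`); the module paths and the wrapping function are illustrative:

```rust
use crate::bootstrap::BootstrapManager; // illustrative paths
use crate::package_manager::InstallMode;
use anyhow::Result;

fn run_bootstrap() -> Result<()> {
    // Instantiate with an installation mode and an optional tenant name.
    let mut manager = BootstrapManager::new(InstallMode::Local, None);

    // bootstrap() installs the required components, applies SQL migrations,
    // and returns the resulting AppConfig.
    let config = manager.bootstrap()?;

    // start_all() starts every component that is already installed.
    manager.start_all()?;

    println!("Bootstrap finished; server at {}:{}", config.server.host, config.server.port);
    Ok(())
}
```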
### 2. Component Registration and Installation
The `PackageManager` registers all system components such as:
- **tables** (PostgreSQL database)
- **cache** (Valkey/Redis)
- **drive** (MinIO object storage)
- **llm** (local LLM server)
- **email**, **proxy**, **directory**, **alm**, **dns**, **meeting**, **table_editor**, **doc_editor**, **desktop**, **devtools**, **bot**, **system**, **vector_db**, **host**
Each component has a `ComponentConfig` defining:
- Ports and dependencies
- Download URLs and binaries
- Pre/post-install commands
- Environment variables
- Execution commands
During bootstrap, required components (`tables`, `drive`, `cache`) are installed and started automatically.
For example:
- The **tables** component generates secure database credentials, writes them to `.env`, and applies SQL migrations to initialize the schema.
- The **drive** component creates secure credentials and stores them encrypted in the database.
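The `ComponentConfig` struct itself lives in the package manager and is not shown in this commit; the sketch below is a hypothetical shape inferred from the fields listed above (only `binary_name` is visible in the diff):

```rust
// Hypothetical shape; field names other than `binary_name` are assumptions.
pub struct ComponentConfig {
    pub name: &'static str,
    pub ports: Vec<u16>,                  // ports the component listens on
    pub dependencies: Vec<&'static str>,  // components that must start first
    pub download_url: Option<String>,     // where the binary archive is fetched from
    pub binary_name: Option<String>,      // used for pgrep/termination checks
    pub pre_install: Vec<String>,         // commands run before installation
    pub post_install: Vec<String>,        // commands run after installation
    pub env_vars: Vec<(String, String)>,  // environment variables to export
    pub exec_command: Option<String>,     // command used to start the component
}
```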
### 3. Bot Configuration
After components are installed, the bootstrap process updates the bot configuration in the database.
The method `update_bot_config()` ensures each component's configuration is linked to a bot record in the `bot_configuration` table.
If no bot exists, a new UUID is generated to associate configuration entries.
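A condensed sketch of that upsert, mirroring the `diesel::sql_query` call introduced in this commit; the free-standing helper name is illustrative:

```rust
use diesel::prelude::*;
use diesel::sql_types::{Text, Uuid as SqlUuid};

// Upserts one component flag for a bot; relies on the UNIQUE (config_key)
// constraint added in migrations/6.0.8.sql.
fn upsert_component_flag(
    conn: &mut PgConnection,
    bot_id: uuid::Uuid,
    component: &str,
) -> QueryResult<usize> {
    let config_key = format!("{}_{}", bot_id, component);
    diesel::sql_query(
        "INSERT INTO bot_configuration (id, bot_id, config_key, config_value, config_type) \
         VALUES ($1, $2, $3, $4, 'string') \
         ON CONFLICT (config_key) \
         DO UPDATE SET config_value = EXCLUDED.config_value, updated_at = NOW()",
    )
    .bind::<SqlUuid, _>(uuid::Uuid::new_v4())
    .bind::<SqlUuid, _>(bot_id)
    .bind::<Text, _>(config_key)
    .bind::<Text, _>("true")
    .execute(conn)
}
```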
### 4. Template-Based Bot Creation
The method `create_bots_from_templates()` scans the `templates/` directory for folders ending in `.gbai` (e.g., `default.gbai`, `announcements.gbai`).
Each `.gbai` folder represents a bot template.
For each template:
- The folder name is converted into a human-readable bot name (e.g., `default.gbai` → “Default”).
- If the bot doesn't exist in the `bots` table, a new record is inserted with:
- Default LLM provider (`openai`)
- Default configuration (`{"model": "gpt-4", "temperature": 0.7}`)
- Context provider (`database`)
- Active status (`true`)
This automatically creates bots from templates during bootstrap.
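A sketch of the folder scan and name formatting behind `create_bots_from_templates()`, condensed from the code in this commit; the free-standing helper is illustrative:

```rust
use std::path::Path;

// Lists the display names that would be created from templates/<name>.gbai folders,
// e.g. "default.gbai" -> "Default".
fn template_bot_names(templates_dir: &Path) -> std::io::Result<Vec<String>> {
    let mut names = Vec::new();
    for entry in std::fs::read_dir(templates_dir)? {
        let path = entry?.path();
        if !(path.is_dir() && path.extension().map(|e| e == "gbai").unwrap_or(false)) {
            continue;
        }
        let folder = path.file_name().unwrap().to_string_lossy().to_string();
        let base = folder.trim_end_matches(".gbai");
        // Capitalize the first letter of each underscore-separated word.
        let formatted = base
            .split('_')
            .map(|word| {
                let mut chars = word.chars();
                match chars.next() {
                    Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(),
                    None => String::new(),
                }
            })
            .collect::<Vec<_>>()
            .join(" ");
        names.push(formatted);
    }
    Ok(names)
}
```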
### 5. Template Upload to MinIO
After bots are created, the method `upload_templates_to_minio()` uploads all template files recursively to a MinIO bucket (S3-compatible storage).
This makes templates accessible for runtime bot operations and ensures persistence across environments.
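A condensed sketch of the recursive upload using the OpenDAL `Operator` this commit switches to. The original returns a boxed future; the `async fn` plus `Box::pin` form below (stable since Rust 1.77) is an equivalent shape, and the key layout is an assumption:

```rust
use opendal::Operator;
use std::path::Path;

// Writes every file under `local_path` to `<bucket>/<prefix>/<file name>`.
async fn upload_dir(op: &Operator, local_path: &Path, bucket: &str, prefix: &str) -> anyhow::Result<()> {
    for entry in std::fs::read_dir(local_path)? {
        let path = entry?.path();
        let name = path.file_name().unwrap().to_string_lossy().to_string();
        let key = if prefix.is_empty() {
            format!("{}/{}", bucket, name)
        } else {
            format!("{}/{}/{}", bucket, prefix, name)
        };
        if path.is_file() {
            let content = std::fs::read(&path)?;
            op.write(&key, content).await?; // one object per file
        } else if path.is_dir() {
            // Recurse into subdirectories (Box::pin keeps the async recursion finite-sized).
            Box::pin(upload_dir(op, &path, bucket, &key)).await?;
        }
    }
    Ok(())
}
```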
### 6. Summary
The Auto Bootstrap process performs the following steps automatically:
1. Detects environment and installation mode.
2. Registers and installs required components.
3. Initializes the database and applies migrations.
4. Updates bot configuration records.
5. Creates bots from `.gbai` templates.
6. Uploads templates to MinIO for storage.
This process ensures that after installation, the system is fully operational with preconfigured bots derived from templates, ready to serve requests immediately.


@ -1 +1,88 @@
# Brave
sudo apt install brave-browser-beta
# Rust
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
source "$HOME/.cargo/env"
git clone https://alm.pragmatismo.com.br/generalbots/gbserver
apt install -y build-essential \
pkg-config \
libssl-dev \
gcc-multilib \
g++-multilib \
clang \
lld \
binutils-dev \
libudev-dev \
libdbus-1-dev \
libva-dev
apt install -y \
curl \
git \
python3 \
python3-pip \
pkg-config \
libssl-dev \
libasound2-dev \
libpulse-dev \
libx11-dev \
libxext-dev \
libxrandr-dev \
libxcomposite-dev \
libxcursor-dev \
libxi-dev \
libxtst-dev \
libnss3-dev \
libnspr4-dev \
libatk-bridge2.0-dev \
libgtk-3-dev \
libudev-dev \
libavcodec-dev \
libavformat-dev \
libavutil-dev \
libswscale-dev \
libevent-dev \
libjsoncpp-dev \
libopus-dev \
libvpx-dev \
libsrtp2-dev \
protobuf-compiler \
ninja-build \
cmake \
clang \
lld
# LLM
ZED for Windows: https://zed.dev/windows
Zed Assistant: Groq + GPT OSS 120B
FIX Manual: DeepSeek | ChatGPT 120B | Claude 4.5 Thinking | Mistral
ADD Manual: Claude/DeepSeek -> DeepSeek
# Install
cargo install cargo-audit
cargo install cargo-edit
apt install -y libpq-dev
apt install -y valkey-cli
# Util
cargo upgrade
cargo audit
valkey-cli -p 6379 monitor
# Prompt add-ons
- Fill the file with info!, trace! and debug! macros.
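For instance, a small illustration of that logging style with the `log` crate macros already used across the codebase (the function and messages are made up):

```rust
use log::{debug, info, trace};

// Illustrative only: annotate the main steps of a routine with the three levels.
fn load_script(path: &str) {
    info!("Loading script from {}", path);            // high-level progress
    trace!("Resolved script path: {}", path);         // fine-grained detail
    debug!("Cache miss, reading {} from disk", path); // diagnostic detail
}
```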


@ -18,26 +18,29 @@ for file in "${prompts[@]}"; do
done done
dirs=( dirs=(
#"auth" # "auth"
#"automation" # "automation"
#"basic" #"basic"
#"bot" # "bot"
"bootstrap" "bootstrap"
#"channels" # "package_manager"
"config" # "channels"
#"context" # "config"
#"email" # "context"
#"file" # "email"
#"llm" # "file"
#"llm_legacy" # "llm"
#"org" "drive_monitor"
"package_manager" # "llm_legacy"
#"session" # "org"
# "session"
"file"
"kb"
"shared" "shared"
#"tests" #"tests"
#"tools" # "tools"
#"web_automation" # "web_automation"
#"whatsapp" # "whatsapp"
) )
for dir in "${dirs[@]}"; do for dir in "${dirs[@]}"; do
find "$PROJECT_ROOT/src/$dir" -name "*.rs" | while read file; do find "$PROJECT_ROOT/src/$dir" -name "*.rs" | while read file; do
@ -51,6 +54,8 @@ done
echo "$PROJECT_ROOT/src/main.rs" >> "$OUTPUT_FILE" echo "$PROJECT_ROOT/src/main.rs" >> "$OUTPUT_FILE"
cat "$PROJECT_ROOT/src/main.rs" >> "$OUTPUT_FILE" cat "$PROJECT_ROOT/src/main.rs" >> "$OUTPUT_FILE"
echo "$PROJECT_ROOT/src/basic/keywords/get.rs" >> "$OUTPUT_FILE"
cat "$PROJECT_ROOT/src/basic/keywords/get.rs" >> "$OUTPUT_FILE"
echo "" >> "$OUTPUT_FILE" echo "" >> "$OUTPUT_FILE"
echo "Compiling..." echo "Compiling..."

migrations/6.0.8.sql (new file): 2 added lines

@ -0,0 +1,2 @@
ALTER TABLE bot_configuration
ADD CONSTRAINT bot_configuration_config_key_unique UNIQUE (config_key);


@ -331,8 +331,7 @@ impl AutomationService {
e e
); );
// Try to download from MinIO if let Some(s3_operator) = &self.state.s3_operator {
if let Some(s3_client) = &self.state.s3_client {
let bucket_name = format!( let bucket_name = format!(
"{}{}.gbai", "{}{}.gbai",
env::var("MINIO_ORG_PREFIX").unwrap_or_else(|_| "org1_".to_string()), env::var("MINIO_ORG_PREFIX").unwrap_or_else(|_| "org1_".to_string()),
@ -342,47 +341,26 @@ impl AutomationService {
trace!("Downloading from bucket={} key={}", bucket_name, s3_key); trace!("Downloading from bucket={} key={}", bucket_name, s3_key);
match s3_client match s3_operator.read(&format!("{}/{}", bucket_name, s3_key)).await {
.get_object() Ok(data) => {
.bucket(&bucket_name) let bytes: Vec<u8> = data.to_vec();
.key(&s3_key) match String::from_utf8(bytes) {
.send() Ok(content) => {
.await info!("Downloaded script '{}' from MinIO", param);
{
Ok(response) => {
match response.body.collect().await {
Ok(data) => {
match String::from_utf8(data.into_bytes().to_vec()) {
Ok(content) => {
info!("Downloaded script '{}' from MinIO", param);
// Save to local cache // Save to local cache
if let Err(e) = if let Err(e) = std::fs::create_dir_all(&self.scripts_dir) {
std::fs::create_dir_all(&self.scripts_dir) warn!("Failed to create scripts directory: {}", e);
{ } else if let Err(e) = tokio::fs::write(&full_path, &content).await {
warn!("Failed to create scripts directory: {}", e); warn!("Failed to cache script locally: {}", e);
} else if let Err(e) = } else {
tokio::fs::write(&full_path, &content).await trace!("Cached script to {}", full_path.display());
{
warn!("Failed to cache script locally: {}", e);
} else {
trace!("Cached script to {}", full_path.display());
}
content
}
Err(e) => {
error!("Failed to decode script {}: {}", param, e);
self.cleanup_job_flag(&bot_id, param).await;
return;
}
} }
content
} }
Err(e) => { Err(e) => {
error!( error!("Failed to decode script {}: {}", param, e);
"Failed to read script body from MinIO {}: {}",
param, e
);
self.cleanup_job_flag(&bot_id, param).await; self.cleanup_job_flag(&bot_id, param).await;
return; return;
} }


@ -28,7 +28,6 @@ pub fn get_keyword(state: Arc<AppState>, _user: UserSession, engine: &mut Engine
let state_for_blocking = Arc::clone(&state_clone); let state_for_blocking = Arc::clone(&state_clone);
let url_for_blocking = url_str.clone(); let url_for_blocking = url_str.clone();
// ---- fixed section: spawn on separate thread runtime ----
let (tx, rx) = std::sync::mpsc::channel(); let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || { std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_multi_thread() let rt = tokio::runtime::Builder::new_multi_thread()
@ -76,7 +75,6 @@ pub fn get_keyword(state: Arc<AppState>, _user: UserSession, engine: &mut Engine
.unwrap(); .unwrap();
} }
/// Enhanced security check for path traversal and unsafe paths
fn is_safe_path(path: &str) -> bool { fn is_safe_path(path: &str) -> bool {
if path.starts_with("https://") || path.starts_with("http://") { if path.starts_with("https://") || path.starts_with("http://") {
return true; return true;
@ -160,11 +158,11 @@ pub async fn get_from_bucket(
return Err("Invalid file path".into()); return Err("Invalid file path".into());
} }
let s3_client = match &state.s3_client { let s3_operator = match &state.s3_operator {
Some(client) => client, Some(operator) => operator,
None => { None => {
error!("S3 client not configured"); error!("S3 operator not configured");
return Err("S3 client not configured".into()); return Err("S3 operator not configured".into());
} }
}; };
@ -177,7 +175,7 @@ pub async fn get_from_bucket(
"App configuration missing".into() "App configuration missing".into()
})?; })?;
let org_prefix = &cfg.minio.org_prefix; let org_prefix = &cfg.drive.org_prefix;
if org_prefix.contains("..") || org_prefix.contains('/') || org_prefix.contains('\\') { if org_prefix.contains("..") || org_prefix.contains('/') || org_prefix.contains('\\') {
error!("Invalid org_prefix: {}", org_prefix); error!("Invalid org_prefix: {}", org_prefix);
@ -189,46 +187,22 @@ pub async fn get_from_bucket(
bucket bucket
}; };
match s3_client.head_bucket().bucket(&bucket_name).send().await { let response = match tokio::time::timeout(
Ok(_) => debug!("Bucket exists: {}", bucket_name), Duration::from_secs(30),
Err(e) => { s3_operator.read(&format!("{}/{}", bucket_name, file_path))
error!("Bucket inaccessible: {} - {}", bucket_name, e); ).await {
return Err(format!("Bucket inaccessible: {}", e).into());
}
}
let get_object_future = s3_client
.get_object()
.bucket(&bucket_name)
.key(file_path)
.send();
let response = match tokio::time::timeout(Duration::from_secs(30), get_object_future).await {
Ok(Ok(response)) => response, Ok(Ok(response)) => response,
Ok(Err(e)) => { Ok(Err(e)) => {
error!("S3 get_object failed: {}", e); error!("S3 read failed: {}", e);
return Err(format!("S3 operation failed: {}", e).into()); return Err(format!("S3 operation failed: {}", e).into());
} }
Err(_) => { Err(_) => {
error!("S3 get_object timed out"); error!("S3 read timed out");
return Err("S3 operation timed out".into()); return Err("S3 operation timed out".into());
} }
}; };
let body_future = response.body.collect(); let bytes = response.to_vec();
let data = match tokio::time::timeout(Duration::from_secs(30), body_future).await {
Ok(Ok(data)) => data,
Ok(Err(e)) => {
error!("Failed to collect S3 response body: {}", e);
return Err(format!("Failed to read S3 response: {}", e).into());
}
Err(_) => {
error!("Timeout collecting S3 response body");
return Err("Timeout reading S3 response body".into());
}
};
let bytes = data.into_bytes().to_vec();
debug!( debug!(
"Retrieved {} bytes from S3 for key: {}", "Retrieved {} bytes from S3 for key: {}",
bytes.len(), bytes.len(),


@ -1,18 +1,24 @@
use crate::config::AppConfig; use crate::config::AppConfig;
use crate::package_manager::{ InstallMode, PackageManager }; use crate::package_manager::{InstallMode, PackageManager};
use anyhow::Result; use anyhow::Result;
use diesel::connection::SimpleConnection; use diesel::connection::SimpleConnection;
use diesel::Connection;
use dotenvy::dotenv;
use log::{ info, trace, error };
use aws_sdk_s3::Client as S3Client;
use csv;
use diesel::RunQueryDsl; use diesel::RunQueryDsl;
use diesel::{Connection, QueryableByName};
use dotenvy::dotenv;
use log::{error, info, trace};
use opendal::Operator;
use rand::distr::Alphanumeric; use rand::distr::Alphanumeric;
use sha2::{ Digest, Sha256 }; use rand::Rng;
use sha2::{Digest, Sha256};
use std::io::{self, Write};
use std::path::Path; use std::path::Path;
use std::process::Command; use std::process::Command;
use std::io::{ self, Write };
#[derive(QueryableByName)]
struct BotIdRow {
#[diesel(sql_type = diesel::sql_types::Uuid)]
id: uuid::Uuid,
}
pub struct ComponentInfo { pub struct ComponentInfo {
pub name: &'static str, pub name: &'static str,
@ -22,105 +28,165 @@ pub struct ComponentInfo {
pub struct BootstrapManager { pub struct BootstrapManager {
pub install_mode: InstallMode, pub install_mode: InstallMode,
pub tenant: Option<String>, pub tenant: Option<String>,
pub s3_operator: Operator,
} }
impl BootstrapManager { impl BootstrapManager {
pub fn new(install_mode: InstallMode, tenant: Option<String>) -> Self { pub fn new(install_mode: InstallMode, tenant: Option<String>) -> Self {
trace!( info!(
"Initializing BootstrapManager with mode {:?} and tenant {:?}", "Initializing BootstrapManager with mode {:?} and tenant {:?}",
install_mode, install_mode, tenant
tenant
); );
let config = AppConfig::from_env();
let s3_operator = Self::create_s3_operator(&config);
Self { Self {
install_mode, install_mode,
tenant, tenant,
s3_operator,
} }
} }
pub fn start_all(&mut self) -> Result<()> { pub fn start_all(&mut self) -> Result<()> {
let pm = PackageManager::new(self.install_mode.clone(), self.tenant.clone())?; let pm = PackageManager::new(self.install_mode.clone(), self.tenant.clone())?;
let components = vec![ let components = vec![
ComponentInfo { name: "tables", termination_command: "pg_ctl" }, ComponentInfo {
ComponentInfo { name: "cache", termination_command: "valkey-server" }, name: "tables",
ComponentInfo { name: "drive", termination_command: "minio" }, termination_command: "pg_ctl",
ComponentInfo { name: "llm", termination_command: "llama-server" }, },
ComponentInfo { name: "email", termination_command: "stalwart" }, ComponentInfo {
ComponentInfo { name: "proxy", termination_command: "caddy" }, name: "cache",
ComponentInfo { name: "directory", termination_command: "zitadel" }, termination_command: "valkey-server",
ComponentInfo { name: "alm", termination_command: "forgejo" }, },
ComponentInfo { name: "alm_ci", termination_command: "forgejo-runner" }, ComponentInfo {
ComponentInfo { name: "dns", termination_command: "coredns" }, name: "drive",
ComponentInfo { name: "webmail", termination_command: "php" }, termination_command: "minio",
ComponentInfo { name: "meeting", termination_command: "livekit-server" }, },
ComponentInfo { name: "table_editor", termination_command: "nocodb" }, ComponentInfo {
ComponentInfo { name: "doc_editor", termination_command: "coolwsd" }, name: "llm",
ComponentInfo { name: "desktop", termination_command: "xrdp" }, termination_command: "llama-server",
ComponentInfo { name: "devtools", termination_command: "" }, },
ComponentInfo { name: "bot", termination_command: "" }, ComponentInfo {
ComponentInfo { name: "system", termination_command: "" }, name: "email",
ComponentInfo { name: "vector_db", termination_command: "qdrant" }, termination_command: "stalwart",
ComponentInfo { name: "host", termination_command: "" } },
ComponentInfo {
name: "proxy",
termination_command: "caddy",
},
ComponentInfo {
name: "directory",
termination_command: "zitadel",
},
ComponentInfo {
name: "alm",
termination_command: "forgejo",
},
ComponentInfo {
name: "alm_ci",
termination_command: "forgejo-runner",
},
ComponentInfo {
name: "dns",
termination_command: "coredns",
},
ComponentInfo {
name: "webmail",
termination_command: "php",
},
ComponentInfo {
name: "meeting",
termination_command: "livekit-server",
},
ComponentInfo {
name: "table_editor",
termination_command: "nocodb",
},
ComponentInfo {
name: "doc_editor",
termination_command: "coolwsd",
},
ComponentInfo {
name: "desktop",
termination_command: "xrdp",
},
ComponentInfo {
name: "devtools",
termination_command: "",
},
ComponentInfo {
name: "bot",
termination_command: "",
},
ComponentInfo {
name: "system",
termination_command: "",
},
ComponentInfo {
name: "vector_db",
termination_command: "qdrant",
},
ComponentInfo {
name: "host",
termination_command: "",
},
]; ];
for component in components { for component in components {
if pm.is_installed(component.name) { if pm.is_installed(component.name) {
trace!("Starting component: {}", component.name);
pm.start(component.name)?; pm.start(component.name)?;
} else { } else {
trace!("Component {} not installed, skipping start", component.name); let database_url = std::env::var("DATABASE_URL")
if let Err(e) = self.update_bot_config(component.name) { .unwrap_or_else(|_| "postgres://gbuser:@localhost:5432/botserver".to_string());
let mut conn = diesel::pg::PgConnection::establish(&database_url)
.map_err(|e| anyhow::anyhow!("Failed to connect to database: {}", e))?;
let default_bot_id: uuid::Uuid = diesel::sql_query("SELECT id FROM bots LIMIT 1")
.get_result::<BotIdRow>(&mut conn)
.map(|row| row.id)
.unwrap_or_else(|_| uuid::Uuid::new_v4());
if let Err(e) = self.update_bot_config(&default_bot_id, component.name) {
error!( error!(
"Failed to update bot config after installing {}: {}", "Failed to update bot config after installing {}: {}",
component.name, component.name, e
e
); );
} }
} }
} }
Ok(()) Ok(())
} }
pub fn bootstrap(&mut self) -> Result<AppConfig> { pub fn bootstrap(&mut self) -> Result<AppConfig> {
// Check for legacy mode - if TABLES_SERVER is present, skip bootstrap
if let Ok(tables_server) = std::env::var("TABLES_SERVER") { if let Ok(tables_server) = std::env::var("TABLES_SERVER") {
if !tables_server.is_empty() { if !tables_server.is_empty() {
trace!( info!(
"Legacy mode detected (TABLES_SERVER present), skipping bootstrap installation" "Legacy mode detected (TABLES_SERVER present), skipping bootstrap installation"
); );
info!("Running in legacy mode with existing database configuration");
// Try to connect to the database and load config
let database_url = std::env::var("DATABASE_URL").unwrap_or_else(|_| { let database_url = std::env::var("DATABASE_URL").unwrap_or_else(|_| {
let username = std::env let username =
::var("TABLES_USERNAME") std::env::var("TABLES_USERNAME").unwrap_or_else(|_| "postgres".to_string());
.unwrap_or_else(|_| "postgres".to_string()); let password =
let password = std::env std::env::var("TABLES_PASSWORD").unwrap_or_else(|_| "postgres".to_string());
::var("TABLES_PASSWORD") let server =
.unwrap_or_else(|_| "postgres".to_string()); std::env::var("TABLES_SERVER").unwrap_or_else(|_| "localhost".to_string());
let server = std::env
::var("TABLES_SERVER")
.unwrap_or_else(|_| "localhost".to_string());
let port = std::env::var("TABLES_PORT").unwrap_or_else(|_| "5432".to_string()); let port = std::env::var("TABLES_PORT").unwrap_or_else(|_| "5432".to_string());
let database = std::env let database =
::var("TABLES_DATABASE") std::env::var("TABLES_DATABASE").unwrap_or_else(|_| "gbserver".to_string());
.unwrap_or_else(|_| "gbserver".to_string()); format!(
format!("postgres://{}:{}@{}:{}/{}", username, password, server, port, database) "postgres://{}:{}@{}:{}/{}",
username, password, server, port, database
)
}); });
match diesel::PgConnection::establish(&database_url) { match diesel::PgConnection::establish(&database_url) {
Ok(mut conn) => { Ok(mut conn) => {
info!("Successfully connected to legacy database, loading configuration");
// Apply migrations
if let Err(e) = self.apply_migrations(&mut conn) { if let Err(e) = self.apply_migrations(&mut conn) {
log::warn!("Failed to apply migrations: {}", e); log::warn!("Failed to apply migrations: {}", e);
} }
return Ok(AppConfig::from_database(&mut conn)); return Ok(AppConfig::from_database(&mut conn));
} }
Err(e) => { Err(e) => {
log::warn!("Failed to connect to legacy database: {}", e); log::warn!("Failed to connect to legacy database: {}", e);
info!("Using environment variables as fallback");
return Ok(AppConfig::from_env()); return Ok(AppConfig::from_env());
} }
} }
@ -133,16 +199,17 @@ impl BootstrapManager {
for component in required_components { for component in required_components {
if !pm.is_installed(component) { if !pm.is_installed(component) {
// Determine termination command from package manager component config let termination_cmd = pm
let termination_cmd = pm.components .components
.get(component) .get(component)
.and_then(|cfg| cfg.binary_name.clone()) .and_then(|cfg| cfg.binary_name.clone())
.unwrap_or_else(|| component.to_string()); .unwrap_or_else(|| component.to_string());
// If a termination command is defined, check for leftover running process
if !termination_cmd.is_empty() { if !termination_cmd.is_empty() {
let check = Command::new("pgrep").arg("-f").arg(&termination_cmd).output(); let check = Command::new("pgrep")
.arg("-f")
.arg(&termination_cmd)
.output();
if let Ok(output) = check { if let Ok(output) = check {
if !output.stdout.is_empty() { if !output.stdout.is_empty() {
println!("Component '{}' appears to be already running from a previous install.", component); println!("Component '{}' appears to be already running from a previous install.", component);
@ -157,7 +224,10 @@ impl BootstrapManager {
.status(); .status();
println!("Terminated existing '{}' process.", component); println!("Terminated existing '{}' process.", component);
} else { } else {
println!("Skipping start of '{}' as it is already running.", component); println!(
"Skipping start of '{}' as it is already running.",
component
);
continue; continue;
} }
} }
@ -167,29 +237,20 @@ impl BootstrapManager {
if component == "tables" { if component == "tables" {
let db_password = self.generate_secure_password(16); let db_password = self.generate_secure_password(16);
let farm_password = self.generate_secure_password(32); let farm_password = self.generate_secure_password(32);
let env_contents = format!( let env_contents = format!(
"FARM_PASSWORD={}\nDATABASE_URL=postgres://gbuser:{}@localhost:5432/botserver", "FARM_PASSWORD={}\nDATABASE_URL=postgres://gbuser:{}@localhost:5432/botserver",
farm_password, farm_password, db_password
db_password
); );
std::fs::write(".env", &env_contents)
std::fs
::write(".env", &env_contents)
.map_err(|e| anyhow::anyhow!("Failed to write .env file: {}", e))?; .map_err(|e| anyhow::anyhow!("Failed to write .env file: {}", e))?;
dotenv().ok(); dotenv().ok();
trace!("Generated database credentials and wrote to .env file");
} }
trace!("Installing required component: {}", component);
futures::executor::block_on(pm.install(component))?; futures::executor::block_on(pm.install(component))?;
if component == "tables" { if component == "tables" {
trace!("Component {} installed successfully", component);
let database_url = std::env::var("DATABASE_URL").unwrap(); let database_url = std::env::var("DATABASE_URL").unwrap();
let mut conn = diesel::PgConnection let mut conn = diesel::PgConnection::establish(&database_url)
::establish(&database_url)
.map_err(|e| anyhow::anyhow!("Failed to connect to database: {}", e))?; .map_err(|e| anyhow::anyhow!("Failed to connect to database: {}", e))?;
let migration_dir = include_dir::include_dir!("./migrations"); let migration_dir = include_dir::include_dir!("./migrations");
@ -197,27 +258,21 @@ impl BootstrapManager {
.files() .files()
.filter_map(|file| { .filter_map(|file| {
let path = file.path(); let path = file.path();
trace!("Found file: {:?}", path);
if path.extension()? == "sql" { if path.extension()? == "sql" {
trace!(" -> SQL file included");
Some(file) Some(file)
} else { } else {
trace!(" -> Not a SQL file, skipping");
None None
} }
}) })
.collect(); .collect();
trace!("Total migration files found: {}", migration_files.len());
migration_files.sort_by_key(|f| f.path()); migration_files.sort_by_key(|f| f.path());
for migration_file in migration_files { for migration_file in migration_files {
let migration = migration_file let migration = migration_file
.contents_utf8() .contents_utf8()
.ok_or_else(|| anyhow::anyhow!("Migration file is not valid UTF-8"))?; .ok_or_else(|| anyhow::anyhow!("Migration file is not valid UTF-8"))?;
trace!("Executing migration: {}", migration_file.path().display());
// Use batch_execute to handle multiple statements including those with dollar-quoted strings
if let Err(e) = conn.batch_execute(migration) { if let Err(e) = conn.batch_execute(migration) {
log::error!( log::error!(
"Failed to execute migration {}: {}", "Failed to execute migration {}: {}",
@ -226,28 +281,35 @@ impl BootstrapManager {
); );
return Err(e.into()); return Err(e.into());
} }
trace!( info!(
"Successfully executed migration: {}", "Successfully executed migration: {}",
migration_file.path().display() migration_file.path().display()
); );
} }
config = AppConfig::from_database(&mut conn); config = AppConfig::from_database(&mut conn);
info!("Database migrations completed and configuration loaded");
} }
} }
} }
self.s3_operator = Self::create_s3_operator(&config);
Ok(config) Ok(config)
} }
fn generate_secure_password(&self, length: usize) -> String { fn create_s3_operator(config: &AppConfig) -> Operator {
// Ensure the Rng trait is in scope for `sample` use opendal::Scheme;
use rand::Rng; use std::collections::HashMap;
let mut rng = rand::rng(); let mut map = HashMap::new();
map.insert("endpoint".to_string(), config.drive.server.clone());
map.insert("access_key_id".to_string(), config.drive.access_key.clone());
map.insert("secret_access_key".to_string(), config.drive.secret_key.clone());
trace!("Creating S3 operator with endpoint {}", config.drive.server);
Operator::via_iter(Scheme::S3, map).expect("Failed to initialize S3 operator")
}
std::iter fn generate_secure_password(&self, length: usize) -> String {
::repeat_with(|| rng.sample(Alphanumeric) as char) let mut rng = rand::rng();
std::iter::repeat_with(|| rng.sample(Alphanumeric) as char)
.take(length) .take(length)
.collect() .collect()
} }
@ -259,207 +321,94 @@ impl BootstrapManager {
format!("{:x}", hasher.finalize()) format!("{:x}", hasher.finalize())
} }
/// Update the bot configuration after a component is installed. fn update_bot_config(&self, bot_id: &uuid::Uuid, component: &str) -> Result<()> {
/// This reads the existing `config.csv` from the default bot bucket, use diesel::sql_types::{Text, Uuid as SqlUuid};
///fix s values based on the installed component, and let database_url = std::env::var("DATABASE_URL")
/// writes the updated CSV back to the bucket. It also upserts the
/// key/value pairs into the `bot_config` table.
fn update_bot_config(&self, component: &str) -> Result<()> {
// Determine bucket name: DRIVE_ORG_PREFIX + "default.gbai"
let org_prefix = std::env
::var("DRIVE_ORG_PREFIX")
.unwrap_or_else(|_| "pragmatismo-".to_string());
let bucket_name = format!("{}default.gbai", org_prefix);
let config_key = "default.gbot/config.csv";
// Build S3 client using default SDK config (compatible with S3Client)
let s3_client = S3Client::from_conf(aws_sdk_s3::Config::builder().build());
// Attempt to download existing config.csv
let existing_csv = match
futures::executor::block_on(
s3_client.get_object().bucket(&bucket_name).key(config_key).send()
)
{
Ok(resp) => {
let data = futures::executor::block_on(resp.body.collect())?;
String::from_utf8(data.into_bytes().to_vec()).unwrap_or_default()
}
Err(_) => String::new(), // No existing file start fresh
};
// Parse CSV into a map
let mut config_map: std::collections::HashMap<
String,
String
> = std::collections::HashMap::new();
if !existing_csv.is_empty() {
let mut rdr = csv::ReaderBuilder
::new()
.has_headers(false)
.from_reader(existing_csv.as_bytes());
for result in rdr.records() {
if let Ok(record) = result {
if record.len() >= 2 {
config_map.insert(record[0].to_string(), record[1].to_string());
}
}
}
}
// Update configuration based on the installed component
config_map.insert(component.to_string(), "true".to_string());
// Serialize back to CSV
let mut wtr = csv::WriterBuilder
::new()
.has_headers(false)
.from_writer(vec![]);
for (k, v) in &config_map {
wtr.write_record(&[k, v])?;
}
wtr.flush()?;
let csv_bytes = wtr.into_inner()?;
// Upload updated CSV to S3
futures::executor::block_on(
s3_client
.put_object()
.bucket(&bucket_name)
.key(config_key)
.body(csv_bytes.clone().into())
.send()
)?;
// Upsert into bot_config table
let database_url = std::env
::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://gbuser:@localhost:5432/botserver".to_string()); .unwrap_or_else(|_| "postgres://gbuser:@localhost:5432/botserver".to_string());
let mut conn = diesel::pg::PgConnection::establish(&database_url)?; let mut conn = diesel::pg::PgConnection::establish(&database_url)?;
for (k, v) in config_map { // Ensure globally unique keys and update values atomically
diesel let config_key = format!("{}_{}", bot_id, component);
::sql_query( let config_value = "true".to_string();
"INSERT INTO bot_config (key, value) VALUES ($1, $2) \ let new_id = uuid::Uuid::new_v4();
ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value"
) diesel::sql_query(
.bind::<diesel::sql_types::Text, _>(&k) "INSERT INTO bot_configuration (id, bot_id, config_key, config_value, config_type)
.bind::<diesel::sql_types::Text, _>(&v) VALUES ($1, $2, $3, $4, 'string')
.execute(&mut conn)?; ON CONFLICT (config_key)
} DO UPDATE SET config_value = EXCLUDED.config_value, updated_at = NOW()",
)
.bind::<SqlUuid, _>(new_id)
.bind::<SqlUuid, _>(bot_id)
.bind::<Text, _>(&config_key)
.bind::<Text, _>(&config_value)
.execute(&mut conn)?;
Ok(()) Ok(())
} }
pub async fn upload_templates_to_minio(&self, config: &AppConfig) -> Result<()> { pub async fn upload_templates_to_drive(&self, config: &AppConfig) -> Result<()> {
use aws_sdk_s3::config::Credentials;
use aws_sdk_s3::config::Region;
info!("Uploading template bots to MinIO and creating bot entries...");
// First, create bot entries in database for each template
let database_url = std::env::var("DATABASE_URL").unwrap_or_else(|_| config.database_url()); let database_url = std::env::var("DATABASE_URL").unwrap_or_else(|_| config.database_url());
let mut conn = diesel::PgConnection::establish(&database_url)?; let mut conn = diesel::PgConnection::establish(&database_url)?;
self.create_bots_from_templates(&mut conn)?; self.create_bots_from_templates(&mut conn)?;
let creds = Credentials::new(
&config.minio.access_key,
&config.minio.secret_key,
None,
None,
"minio"
);
let s3_config = aws_sdk_s3::Config
::builder()
.credentials_provider(creds)
.endpoint_url(&config.minio.server)
.region(Region::new("us-east-1"))
.force_path_style(true)
.behavior_version(aws_sdk_s3::config::BehaviorVersion::latest())
.build();
let client = aws_sdk_s3::Client::from_conf(s3_config);
// Upload templates from templates/ directory
let templates_dir = Path::new("templates"); let templates_dir = Path::new("templates");
if !templates_dir.exists() { if !templates_dir.exists() {
trace!("Templates directory not found, skipping upload");
return Ok(()); return Ok(());
} }
let operator = &self.s3_operator;
// Walk through each .gbai folder in templates/
for entry in std::fs::read_dir(templates_dir)? { for entry in std::fs::read_dir(templates_dir)? {
let bot_name = templates_dir
.read_dir()?
.filter_map(|e| e.ok())
.find(|e| {
e.path().is_dir()
&& e.path()
.file_name()
.unwrap()
.to_string_lossy()
.ends_with(".gbai")
})
.map(|e| {
let name = e.path().file_name().unwrap().to_string_lossy().to_string();
name
})
.unwrap_or_else(|| "default".to_string());
let entry = entry?; let entry = entry?;
let path = entry.path(); let path = entry.path();
if path.is_dir()
if && path
path.is_dir() && .file_name()
path .unwrap()
.extension() .to_string_lossy()
.map(|e| e == "gbai") .ends_with(".gbai")
.unwrap_or(false)
{ {
let bot_name = path.file_name().unwrap().to_string_lossy().to_string(); let bot_name = path.file_name().unwrap().to_string_lossy().to_string();
let bucket_name = format!("{}{}", config.minio.org_prefix, bot_name); let bucket = bot_name.clone();
info!("Uploading template {} to Drive bucket {}", bot_name, bucket);
trace!("Creating bucket: {}", bucket_name); self.upload_directory_recursive(&operator, &path, &bucket, &bot_name)
.await?;
// Create bucket if it doesn't exist info!("Uploaded template {} to Drive bucket {}", bot_name, bucket);
match client.create_bucket().bucket(&bucket_name).send().await {
Ok(_) => info!("Created bucket: {}", bucket_name),
Err(e) => {
let err_str = e.to_string();
if
err_str.contains("BucketAlreadyOwnedByYou") ||
err_str.contains("BucketAlreadyExists")
{
trace!("Bucket {} already exists", bucket_name);
} else {
log::warn!("Failed to create bucket {}: {}", bucket_name, e);
}
}
}
// Upload all files recursively
self.upload_directory_recursive(&client, &path, &bucket_name, "").await?;
info!("Uploaded template bot: {}", bot_name);
} }
} }
info!("Template bots uploaded successfully");
Ok(()) Ok(())
} }
fn create_bots_from_templates(&self, conn: &mut diesel::PgConnection) -> Result<()> { fn create_bots_from_templates(&self, conn: &mut diesel::PgConnection) -> Result<()> {
use crate::shared::models::schema::bots; use crate::shared::models::schema::bots;
use diesel::prelude::*; use diesel::prelude::*;
info!("Creating bot entries from template folders...");
let templates_dir = Path::new("templates"); let templates_dir = Path::new("templates");
if !templates_dir.exists() { if !templates_dir.exists() {
trace!("Templates directory not found, skipping bot creation");
return Ok(()); return Ok(());
} }
// Walk through each .gbai folder in templates/
for entry in std::fs::read_dir(templates_dir)? { for entry in std::fs::read_dir(templates_dir)? {
let entry = entry?; let entry = entry?;
let path = entry.path(); let path = entry.path();
if path.is_dir() && path.extension().map(|e| e == "gbai").unwrap_or(false) {
if
path.is_dir() &&
path
.extension()
.map(|e| e == "gbai")
.unwrap_or(false)
{
let bot_folder = path.file_name().unwrap().to_string_lossy().to_string(); let bot_folder = path.file_name().unwrap().to_string_lossy().to_string();
// Remove .gbai extension to get bot name
let bot_name = bot_folder.trim_end_matches(".gbai"); let bot_name = bot_folder.trim_end_matches(".gbai");
// Format the name nicely (capitalize first letter of each word)
let formatted_name = bot_name let formatted_name = bot_name
.split('_') .split('_')
.map(|word| { .map(|word| {
@ -474,7 +423,6 @@ impl BootstrapManager {
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(" "); .join(" ");
// Check if bot already exists
let existing: Option<String> = bots::table let existing: Option<String> = bots::table
.filter(bots::name.eq(&formatted_name)) .filter(bots::name.eq(&formatted_name))
.select(bots::name) .select(bots::name)
@ -482,39 +430,40 @@ impl BootstrapManager {
.optional()?; .optional()?;
if existing.is_none() { if existing.is_none() {
// Insert new bot diesel::sql_query(
diesel "INSERT INTO bots (id, name, description, llm_provider, llm_config, context_provider, context_config, is_active) \
::sql_query(
"INSERT INTO bots (id, name, description, llm_provider, llm_config, context_provider, context_config, is_active) \
VALUES (gen_random_uuid(), $1, $2, 'openai', '{\"model\": \"gpt-4\", \"temperature\": 0.7}', 'database', '{}', true)" VALUES (gen_random_uuid(), $1, $2, 'openai', '{\"model\": \"gpt-4\", \"temperature\": 0.7}', 'database', '{}', true)"
) )
.bind::<diesel::sql_types::Text, _>(&formatted_name) .bind::<diesel::sql_types::Text, _>(&formatted_name)
.bind::<diesel::sql_types::Text, _>( .bind::<diesel::sql_types::Text, _>(format!("Bot for {} template", bot_name))
format!("Bot for {} template", bot_name) .execute(conn)?;
)
.execute(conn)?;
info!("Created bot entry: {}", formatted_name);
} else { } else {
trace!("Bot already exists: {}", formatted_name); log::trace!("Bot {} already exists", formatted_name);
} }
} }
} }
info!("Bot creation from templates completed");
Ok(()) Ok(())
} }
fn upload_directory_recursive<'a>( fn upload_directory_recursive<'a>(
&'a self, &'a self,
client: &'a aws_sdk_s3::Client, client: &'a Operator,
local_path: &'a Path, local_path: &'a Path,
bucket: &'a str, bucket: &'a str,
prefix: &'a str prefix: &'a str,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + 'a>> { ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + 'a>> {
Box::pin(async move { Box::pin(async move {
use aws_sdk_s3::primitives::ByteStream; trace!("Checking bucket existence: {}", bucket);
if client.stat(bucket).await.is_err() {
info!("Bucket {} not found, creating it", bucket);
trace!("Creating bucket: {}", bucket);
client.create_dir(bucket).await?;
trace!("Bucket {} created successfully", bucket);
} else {
trace!("Bucket {} already exists", bucket);
}
trace!("Starting upload from local path: {}", local_path.display());
for entry in std::fs::read_dir(local_path)? { for entry in std::fs::read_dir(local_path)? {
let entry = entry?; let entry = entry?;
let path = entry.path(); let path = entry.path();
@ -526,39 +475,41 @@ impl BootstrapManager {
}; };
if path.is_file() { if path.is_file() {
trace!( info!(
"Uploading file: {} to bucket: {} with key: {}", "Uploading file: {} to bucket: {} with key: {}",
path.display(), path.display(),
bucket, bucket,
key key
); );
let content = std::fs::read(&path)?;
let body = ByteStream::from_path(&path).await?; trace!(
"Writing file {} to bucket {} with key {}",
client.put_object().bucket(bucket).key(&key).body(body).send().await?; path.display(),
bucket,
trace!("Uploaded: {}", key); key
);
client.write(&key, content).await?;
trace!(
"Successfully wrote file {} to bucket {}",
path.display(),
bucket
);
} else if path.is_dir() { } else if path.is_dir() {
self.upload_directory_recursive(client, &path, bucket, &key).await?; self.upload_directory_recursive(client, &path, bucket, &key)
.await?;
} }
} }
Ok(()) Ok(())
}) })
} }
fn apply_migrations(&self, conn: &mut diesel::PgConnection) -> Result<()> { fn apply_migrations(&self, conn: &mut diesel::PgConnection) -> Result<()> {
info!("Applying database migrations...");
let migrations_dir = std::path::Path::new("migrations"); let migrations_dir = std::path::Path::new("migrations");
if !migrations_dir.exists() { if !migrations_dir.exists() {
trace!("No migrations directory found, skipping");
return Ok(()); return Ok(());
} }
// Get all .sql files sorted let mut sql_files: Vec<_> = std::fs::read_dir(migrations_dir)?
let mut sql_files: Vec<_> = std::fs
::read_dir(migrations_dir)?
.filter_map(|entry| entry.ok()) .filter_map(|entry| entry.ok())
.filter(|entry| { .filter(|entry| {
entry entry
@ -575,26 +526,19 @@ impl BootstrapManager {
for entry in sql_files { for entry in sql_files {
let path = entry.path(); let path = entry.path();
let filename = path.file_name().unwrap().to_string_lossy(); let filename = path.file_name().unwrap().to_string_lossy();
trace!("Reading migration: {}", filename);
match std::fs::read_to_string(&path) { match std::fs::read_to_string(&path) {
Ok(sql) => { Ok(sql) => match conn.batch_execute(&sql) {
trace!("Applying migration: {}", filename); Err(e) => {
match conn.batch_execute(&sql) { log::warn!("Migration {} failed: {}", filename, e);
Ok(_) => info!("Applied migration: {}", filename),
Err(e) => {
// Ignore errors for already applied migrations
trace!("Migration {} result: {}", filename, e);
}
} }
} _ => {}
},
Err(e) => { Err(e) => {
log::warn!("Failed to read migration {}: {}", filename, e); log::warn!("Failed to read migration {}: {}", filename, e);
} }
} }
} }
info!("Migrations check completed");
Ok(()) Ok(())
} }
} }


@ -8,14 +8,13 @@ use std::sync::{Arc, Mutex};
#[derive(Clone)] #[derive(Clone)]
pub struct AppConfig { pub struct AppConfig {
pub minio: DriveConfig, pub drive: DriveConfig,
pub server: ServerConfig, pub server: ServerConfig,
pub database: DatabaseConfig, pub database: DatabaseConfig,
pub database_custom: DatabaseConfig, pub database_custom: DatabaseConfig,
pub email: EmailConfig, pub email: EmailConfig,
pub ai: AIConfig, pub ai: AIConfig,
pub site_path: String, pub site_path: String,
pub s3_bucket: String,
pub stack_path: PathBuf, pub stack_path: PathBuf,
pub db_conn: Option<Arc<Mutex<PgConnection>>>, pub db_conn: Option<Arc<Mutex<PgConnection>>>,
} }
@ -218,7 +217,7 @@ impl AppConfig {
}; };
AppConfig { AppConfig {
minio, drive: minio,
server: ServerConfig { server: ServerConfig {
host: get_str("SERVER_HOST", "127.0.0.1"), host: get_str("SERVER_HOST", "127.0.0.1"),
port: get_u16("SERVER_PORT", 8080), port: get_u16("SERVER_PORT", 8080),
@ -227,7 +226,6 @@ impl AppConfig {
database_custom, database_custom,
email, email,
ai, ai,
s3_bucket: get_str("DRIVE_BUCKET", "default"),
site_path: get_str("SITES_ROOT", "./botserver-stack/sites"), site_path: get_str("SITES_ROOT", "./botserver-stack/sites"),
stack_path, stack_path,
db_conn: None, db_conn: None,
@ -300,7 +298,7 @@ impl AppConfig {
}; };
AppConfig { AppConfig {
minio, drive: minio,
server: ServerConfig { server: ServerConfig {
host: std::env::var("SERVER_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()), host: std::env::var("SERVER_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()),
port: std::env::var("SERVER_PORT") port: std::env::var("SERVER_PORT")
@ -312,7 +310,6 @@ impl AppConfig {
database_custom, database_custom,
email, email,
ai, ai,
s3_bucket: std::env::var("DRIVE_BUCKET").unwrap_or_else(|_| "default".to_string()),
site_path: std::env::var("SITES_ROOT") site_path: std::env::var("SITES_ROOT")
.unwrap_or_else(|_| "./botserver-stack/sites".to_string()), .unwrap_or_else(|_| "./botserver-stack/sites".to_string()),
stack_path: PathBuf::from(stack_path), stack_path: PathBuf::from(stack_path),
@ -458,12 +455,14 @@ impl ConfigManager {
let key = parts[0].trim(); let key = parts[0].trim();
let value = parts[1].trim(); let value = parts[1].trim();
diesel::sql_query("INSERT INTO bot_configuration (id, bot_id, config_key, config_value, config_type) VALUES (gen_random_uuid()::text, $1, $2, $3, 'string') ON CONFLICT (bot_id, config_key) DO UPDATE SET config_value = EXCLUDED.config_value, updated_at = NOW()") let new_id: uuid::Uuid = uuid::Uuid::new_v4();
.bind::<diesel::sql_types::Uuid, _>(bot_id) diesel::sql_query("INSERT INTO bot_configuration (id, bot_id, config_key, config_value, config_type) VALUES ($1, $2, $3, $4, 'string') ON CONFLICT (bot_id, config_key) DO UPDATE SET config_value = EXCLUDED.config_value, updated_at = NOW()")
.bind::<diesel::sql_types::Text, _>(key) .bind::<diesel::sql_types::Uuid, _>(new_id)
.bind::<diesel::sql_types::Text, _>(value) .bind::<diesel::sql_types::Uuid, _>(bot_id)
.execute(&mut *conn) .bind::<diesel::sql_types::Text, _>(key)
.map_err(|e| format!("Failed to update config: {}", e))?; .bind::<diesel::sql_types::Text, _>(value)
.execute(&mut *conn)
.map_err(|e| format!("Failed to update config: {}", e))?;
updated += 1; updated += 1;
} }


@ -2,14 +2,13 @@ use crate::basic::compiler::BasicCompiler;
use crate::kb::embeddings; use crate::kb::embeddings;
use crate::kb::qdrant_client; use crate::kb::qdrant_client;
use crate::shared::state::AppState; use crate::shared::state::AppState;
use aws_sdk_s3::Client as S3Client;
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
use opendal::Operator;
use std::collections::HashMap; use std::collections::HashMap;
use std::error::Error; use std::error::Error;
use std::sync::Arc; use std::sync::Arc;
use tokio::time::{interval, Duration}; use tokio::time::{interval, Duration};
/// Tracks file state for change detection
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct FileState { pub struct FileState {
pub path: String, pub path: String,
@ -18,7 +17,6 @@ pub struct FileState {
pub last_modified: Option<String>, pub last_modified: Option<String>,
} }
/// Drive monitor that watches for changes and triggers compilation/indexing
pub struct DriveMonitor { pub struct DriveMonitor {
state: Arc<AppState>, state: Arc<AppState>,
bucket_name: String, bucket_name: String,
@ -34,18 +32,12 @@ impl DriveMonitor {
} }
} }
/// Start the drive monitoring service
pub fn spawn(self: Arc<Self>) -> tokio::task::JoinHandle<()> { pub fn spawn(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move { tokio::spawn(async move {
info!( info!("Drive Monitor service started for bucket: {}", self.bucket_name);
"Drive Monitor service started for bucket: {}", let mut tick = interval(Duration::from_secs(30));
self.bucket_name
);
let mut tick = interval(Duration::from_secs(30)); // Check every 30 seconds
loop { loop {
tick.tick().await; tick.tick().await;
if let Err(e) = self.check_for_changes().await { if let Err(e) = self.check_for_changes().await {
error!("Error checking for drive changes: {}", e); error!("Error checking for drive changes: {}", e);
} }
@ -53,101 +45,65 @@ impl DriveMonitor {
}) })
} }
/// Check for file changes in the drive
async fn check_for_changes(&self) -> Result<(), Box<dyn Error + Send + Sync>> { async fn check_for_changes(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
let s3_client = match &self.state.s3_client { let op = match &self.state.s3_operator {
Some(client) => client, Some(op) => op,
None => { None => {
debug!("S3 client not configured");
return Ok(()); return Ok(());
} }
}; };
// Check .gbdialog folder for BASIC tools self.check_gbdialog_changes(op).await?;
self.check_gbdialog_changes(s3_client).await?; self.check_gbkb_changes(op).await?;
// Check .gbkb folder for KB documents if let Err(e) = self.check_default_gbot(op).await {
self.check_gbkb_changes(s3_client).await?;
// Check for default bot configuration in the drive bucket
if let Err(e) = self.check_default_gbot(s3_client).await {
error!("Error checking default bot config: {}", e); error!("Error checking default bot config: {}", e);
} }
Ok(()) Ok(())
} }
/// Check .gbdialog folder for BASIC tool changes
async fn check_gbdialog_changes( async fn check_gbdialog_changes(
&self, &self,
s3_client: &S3Client, op: &Operator,
) -> Result<(), Box<dyn Error + Send + Sync>> { ) -> Result<(), Box<dyn Error + Send + Sync>> {
let prefix = ".gbdialog/"; let prefix = ".gbdialog/";
debug!("Checking {} folder for changes", prefix);
let mut continuation_token: Option<String> = None;
let mut current_files = HashMap::new(); let mut current_files = HashMap::new();
loop { let mut lister = op.lister_with(prefix).recursive(true).await?;
let mut list_request = s3_client while let Some(entry) = futures::TryStreamExt::try_next(&mut lister).await? {
.list_objects_v2() let path = entry.path().to_string();
.bucket(&self.bucket_name)
.prefix(prefix);
if let Some(token) = continuation_token { if path.ends_with('/') || !path.ends_with(".bas") {
list_request = list_request.continuation_token(token); continue;
} }
let list_result = list_request.send().await?; let meta = op.stat(&path).await?;
let file_state = FileState {
if let Some(contents) = list_result.contents { path: path.clone(),
for object in contents { size: meta.content_length() as i64,
if let Some(key) = object.key { etag: meta.etag().unwrap_or_default().to_string(),
// Skip directories and non-.bas files last_modified: meta.last_modified().map(|dt| dt.to_rfc3339()),
if key.ends_with('/') || !key.ends_with(".bas") { };
continue; current_files.insert(path, file_state);
}
let file_state = FileState {
path: key.clone(),
size: object.size.unwrap_or(0),
etag: object.e_tag.unwrap_or_default(),
last_modified: object.last_modified.map(|dt| dt.to_string()),
};
current_files.insert(key, file_state);
}
}
}
if list_result.is_truncated.unwrap_or(false) {
continuation_token = list_result.next_continuation_token;
} else {
break;
}
} }
// Compare with previous state and handle changes
let mut file_states = self.file_states.write().await; let mut file_states = self.file_states.write().await;
for (path, current_state) in current_files.iter() { for (path, current_state) in current_files.iter() {
if let Some(previous_state) = file_states.get(path) { if let Some(previous_state) = file_states.get(path) {
// File exists, check if modified
if current_state.etag != previous_state.etag { if current_state.etag != previous_state.etag {
info!("BASIC tool modified: {}", path); if let Err(e) = self.compile_tool(op, path).await {
if let Err(e) = self.compile_tool(s3_client, path).await {
error!("Failed to compile tool {}: {}", path, e); error!("Failed to compile tool {}: {}", path, e);
} }
} }
} else { } else {
// New file if let Err(e) = self.compile_tool(op, path).await {
info!("New BASIC tool detected: {}", path);
if let Err(e) = self.compile_tool(s3_client, path).await {
error!("Failed to compile tool {}: {}", path, e); error!("Failed to compile tool {}: {}", path, e);
} }
} }
} }
// Check for deleted files
let previous_paths: Vec<String> = file_states let previous_paths: Vec<String> = file_states
.keys() .keys()
.filter(|k| k.starts_with(prefix)) .filter(|k| k.starts_with(prefix))
@ -156,13 +112,10 @@ impl DriveMonitor {
for path in previous_paths { for path in previous_paths {
if !current_files.contains_key(&path) { if !current_files.contains_key(&path) {
info!("BASIC tool deleted: {}", path);
// TODO: Mark tool as inactive in database
file_states.remove(&path); file_states.remove(&path);
} }
} }
// Update state with current files
for (path, state) in current_files { for (path, state) in current_files {
file_states.insert(path, state); file_states.insert(path, state);
} }
@ -170,84 +123,52 @@ impl DriveMonitor {
Ok(()) Ok(())
} }
/// Check .gbkb folder for KB document changes
async fn check_gbkb_changes( async fn check_gbkb_changes(
&self, &self,
s3_client: &S3Client, op: &Operator,
) -> Result<(), Box<dyn Error + Send + Sync>> { ) -> Result<(), Box<dyn Error + Send + Sync>> {
let prefix = ".gbkb/"; let prefix = ".gbkb/";
debug!("Checking {} folder for changes", prefix);
let mut continuation_token: Option<String> = None;
let mut current_files = HashMap::new(); let mut current_files = HashMap::new();
loop { let mut lister = op.lister_with(prefix).recursive(true).await?;
let mut list_request = s3_client while let Some(entry) = futures::TryStreamExt::try_next(&mut lister).await? {
.list_objects_v2() let path = entry.path().to_string();
.bucket(&self.bucket_name)
.prefix(prefix);
if let Some(token) = continuation_token { if path.ends_with('/') {
list_request = list_request.continuation_token(token); continue;
} }
let list_result = list_request.send().await?; let ext = path.rsplit('.').next().unwrap_or("").to_lowercase();
if !["pdf", "txt", "md", "docx"].contains(&ext.as_str()) {
if let Some(contents) = list_result.contents { continue;
for object in contents {
if let Some(key) = object.key {
// Skip directories
if key.ends_with('/') {
continue;
}
// Only process supported file types
let ext = key.rsplit('.').next().unwrap_or("").to_lowercase();
if !["pdf", "txt", "md", "docx"].contains(&ext.as_str()) {
continue;
}
let file_state = FileState {
path: key.clone(),
size: object.size.unwrap_or(0),
etag: object.e_tag.unwrap_or_default(),
last_modified: object.last_modified.map(|dt| dt.to_string()),
};
current_files.insert(key, file_state);
}
}
} }
if list_result.is_truncated.unwrap_or(false) { let meta = op.stat(&path).await?;
continuation_token = list_result.next_continuation_token; let file_state = FileState {
} else { path: path.clone(),
break; size: meta.content_length() as i64,
} etag: meta.etag().unwrap_or_default().to_string(),
last_modified: meta.last_modified().map(|dt| dt.to_rfc3339()),
};
current_files.insert(path, file_state);
} }
// Compare with previous state and handle changes
let mut file_states = self.file_states.write().await; let mut file_states = self.file_states.write().await;
for (path, current_state) in current_files.iter() { for (path, current_state) in current_files.iter() {
if let Some(previous_state) = file_states.get(path) { if let Some(previous_state) = file_states.get(path) {
// File exists, check if modified
if current_state.etag != previous_state.etag { if current_state.etag != previous_state.etag {
info!("KB document modified: {}", path); if let Err(e) = self.index_document(op, path).await {
if let Err(e) = self.index_document(s3_client, path).await {
error!("Failed to index document {}: {}", path, e); error!("Failed to index document {}: {}", path, e);
} }
} }
} else { } else {
// New file if let Err(e) = self.index_document(op, path).await {
info!("New KB document detected: {}", path);
if let Err(e) = self.index_document(s3_client, path).await {
error!("Failed to index document {}: {}", path, e); error!("Failed to index document {}: {}", path, e);
} }
} }
} }
// Check for deleted files
let previous_paths: Vec<String> = file_states let previous_paths: Vec<String> = file_states
.keys() .keys()
.filter(|k| k.starts_with(prefix)) .filter(|k| k.starts_with(prefix))
@ -256,13 +177,10 @@ impl DriveMonitor {
for path in previous_paths { for path in previous_paths {
if !current_files.contains_key(&path) { if !current_files.contains_key(&path) {
info!("KB document deleted: {}", path);
// TODO: Delete from Qdrant and mark in database
file_states.remove(&path); file_states.remove(&path);
} }
} }
// Update state with current files
for (path, state) in current_files { for (path, state) in current_files {
file_states.insert(path, state); file_states.insert(path, state);
} }
@ -270,76 +188,36 @@ impl DriveMonitor {
Ok(()) Ok(())
} }
/// Check for default bot configuration in the drive bucket
async fn check_default_gbot( async fn check_default_gbot(
&self, &self,
s3_client: &S3Client, op: &Operator,
) -> Result<(), Box<dyn Error + Send + Sync>> { ) -> Result<(), Box<dyn Error + Send + Sync>> {
// The default bot configuration is expected at:
// <bucket>/<DRIVE_ORG_PREFIX>default.gbai/default.gbot/config.csv
// Construct the expected key prefix
        let prefix = format!("{}default.gbot/", self.bucket_name);
        let config_key = format!("{}config.csv", prefix);
-        debug!("Checking for default bot config at key: {}", config_key);
-        // Attempt to get the object metadata to see if it exists
-        let head_req = s3_client
-            .head_object()
-            .bucket(&self.bucket_name)
-            .key(&config_key)
-            .send()
-            .await;
-        match head_req {
+        match op.stat(&config_key).await {
            Ok(_) => {
-                info!("Default bot config found, downloading {}", config_key);
-                // Download the CSV file
-                let get_resp = s3_client
-                    .get_object()
-                    .bucket(&self.bucket_name)
-                    .key(&config_key)
-                    .send()
-                    .await?;
-                let data = get_resp.body.collect().await?;
-                let csv_content = String::from_utf8(data.into_bytes().to_vec())
+                let content = op.read(&config_key).await?;
+                let csv_content = String::from_utf8(content.to_vec())
                    .map_err(|e| format!("UTF-8 error in config.csv: {}", e))?;
-                debug!("Found config.csv: {} bytes", csv_content.len());
-                // Log the retrieved configuration (in a real implementation this would be parsed
-                // and used to populate the bot_config table, respecting overrides from .gbot files)
-                info!("Retrieved default bot config CSV:\n{}", csv_content);
-                // TODO: Parse CSV and upsert into bot_config table with appropriate precedence
                Ok(())
            }
            Err(e) => {
-                // If the object does not exist, simply ignore
-                debug!("Default bot config not present: {}", e);
+                debug!("Config file not found or inaccessible: {}", e);
                Ok(())
            }
        }
} }
/// Compile a BASIC tool file
async fn compile_tool( async fn compile_tool(
&self, &self,
s3_client: &S3Client, op: &Operator,
file_path: &str, file_path: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> { ) -> Result<(), Box<dyn Error + Send + Sync>> {
-        info!("Compiling BASIC tool: {}", file_path);
-        // Download source from S3
-        let get_response = s3_client
-            .get_object()
-            .bucket(&self.bucket_name)
-            .key(file_path)
-            .send()
-            .await?;
-        let data = get_response.body.collect().await?;
-        let source_content = String::from_utf8(data.into_bytes().to_vec())?;
+        let content = op.read(file_path).await?;
+        let source_content = String::from_utf8(content.to_vec())?;
// Extract tool name
let tool_name = file_path let tool_name = file_path
.strip_prefix(".gbdialog/") .strip_prefix(".gbdialog/")
.unwrap_or(file_path) .unwrap_or(file_path)
@ -347,10 +225,6 @@ impl DriveMonitor {
.unwrap_or(file_path) .unwrap_or(file_path)
.to_string(); .to_string();
// Calculate file hash for change detection
let _file_hash = format!("{:x}", source_content.len());
// Create work directory using bot from bucket name
let bot_name = self let bot_name = self
.bucket_name .bucket_name
.strip_suffix(".gbai") .strip_suffix(".gbai")
@ -358,46 +232,31 @@ impl DriveMonitor {
let work_dir = format!("./work/{}.gbai/.gbdialog", bot_name); let work_dir = format!("./work/{}.gbai/.gbdialog", bot_name);
std::fs::create_dir_all(&work_dir)?; std::fs::create_dir_all(&work_dir)?;
// Write source to local file
let local_source_path = format!("{}/{}.bas", work_dir, tool_name); let local_source_path = format!("{}/{}.bas", work_dir, tool_name);
std::fs::write(&local_source_path, &source_content)?; std::fs::write(&local_source_path, &source_content)?;
// Compile using BasicCompiler
let compiler = BasicCompiler::new(Arc::clone(&self.state)); let compiler = BasicCompiler::new(Arc::clone(&self.state));
let result = compiler.compile_file(&local_source_path, &work_dir)?; let result = compiler.compile_file(&local_source_path, &work_dir)?;
info!("Tool compiled successfully: {}", tool_name);
info!(" AST: {}", result.ast_path);
// Save to database
if let Some(mcp_tool) = result.mcp_tool { if let Some(mcp_tool) = result.mcp_tool {
info!( info!(
" MCP tool definition generated with {} parameters", "MCP tool definition generated with {} parameters",
mcp_tool.input_schema.properties.len() mcp_tool.input_schema.properties.len()
); );
} }
if result.openai_tool.is_some() { if result.openai_tool.is_some() {
info!(" OpenAI tool definition generated"); debug!("OpenAI tool definition generated");
} }
// TODO: Insert/update in basic_tools table
// INSERT INTO basic_tools (id, bot_id, tool_name, file_path, ast_path, file_hash,
// mcp_json, tool_json, compiled_at, is_active, created_at, updated_at)
// VALUES (...) ON CONFLICT (bot_id, tool_name) DO UPDATE SET ...
Ok(()) Ok(())
} }
/// Index a KB document
async fn index_document( async fn index_document(
&self, &self,
s3_client: &S3Client, op: &Operator,
file_path: &str, file_path: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> { ) -> Result<(), Box<dyn Error + Send + Sync>> {
info!("Indexing KB document: {}", file_path);
// Extract collection name from path (.gbkb/collection_name/file.pdf)
let parts: Vec<&str> = file_path.split('/').collect(); let parts: Vec<&str> = file_path.split('/').collect();
if parts.len() < 3 { if parts.len() < 3 {
warn!("Invalid KB path structure: {}", file_path); warn!("Invalid KB path structure: {}", file_path);
@ -405,21 +264,10 @@ impl DriveMonitor {
} }
let collection_name = parts[1]; let collection_name = parts[1];
-        // Download file from S3
-        let get_response = s3_client
-            .get_object()
-            .bucket(&self.bucket_name)
-            .key(file_path)
-            .send()
-            .await?;
-        let data = get_response.body.collect().await?;
-        let bytes = data.into_bytes().to_vec();
+        let content = op.read(file_path).await?;
+        let bytes = content.to_vec();
// Extract text based on file type
let text_content = self.extract_text(file_path, &bytes)?; let text_content = self.extract_text(file_path, &bytes)?;
if text_content.trim().is_empty() { if text_content.trim().is_empty() {
warn!("No text extracted from: {}", file_path); warn!("No text extracted from: {}", file_path);
return Ok(()); return Ok(());
@ -431,35 +279,21 @@ impl DriveMonitor {
file_path file_path
); );
// Create Qdrant collection name
let qdrant_collection = format!("kb_default_{}", collection_name); let qdrant_collection = format!("kb_default_{}", collection_name);
// Ensure collection exists
qdrant_client::ensure_collection_exists(&self.state, &qdrant_collection).await?; qdrant_client::ensure_collection_exists(&self.state, &qdrant_collection).await?;
// Index document
embeddings::index_document(&self.state, &qdrant_collection, file_path, &text_content) embeddings::index_document(&self.state, &qdrant_collection, file_path, &text_content)
.await?; .await?;
info!("Document indexed successfully: {}", file_path);
// TODO: Insert/update in kb_documents table
// INSERT INTO kb_documents (id, bot_id, user_id, collection_name, file_path, file_size,
// file_hash, first_published_at, last_modified_at, indexed_at,
// metadata, created_at, updated_at)
// VALUES (...) ON CONFLICT (...) DO UPDATE SET ...
Ok(()) Ok(())
} }
/// Extract text from various file types
fn extract_text( fn extract_text(
&self, &self,
file_path: &str, file_path: &str,
content: &[u8], content: &[u8],
) -> Result<String, Box<dyn Error + Send + Sync>> { ) -> Result<String, Box<dyn Error + Send + Sync>> {
let path_lower = file_path.to_ascii_lowercase(); let path_lower = file_path.to_ascii_lowercase();
if path_lower.ends_with(".pdf") { if path_lower.ends_with(".pdf") {
match pdf_extract::extract_text_from_mem(content) { match pdf_extract::extract_text_from_mem(content) {
Ok(text) => Ok(text), Ok(text) => Ok(text),
@ -472,16 +306,13 @@ impl DriveMonitor {
String::from_utf8(content.to_vec()) String::from_utf8(content.to_vec())
.map_err(|e| format!("UTF-8 decoding failed: {}", e).into()) .map_err(|e| format!("UTF-8 decoding failed: {}", e).into())
} else { } else {
// Try as plain text
String::from_utf8(content.to_vec()) String::from_utf8(content.to_vec())
.map_err(|e| format!("Unsupported file format or UTF-8 error: {}", e).into()) .map_err(|e| format!("Unsupported file format or UTF-8 error: {}", e).into())
} }
} }
/// Clear all tracked file states
pub async fn clear_state(&self) { pub async fn clear_state(&self) {
let mut states = self.file_states.write().await; let mut states = self.file_states.write().await;
states.clear(); states.clear();
info!("Cleared all file states");
} }
} }
@ -1,14 +1,13 @@
-use crate::config::DriveConfig;
-use crate::shared::state::AppState;
use actix_multipart::Multipart;
use actix_web::web;
use actix_web::{post, HttpResponse};
-use aws_sdk_s3::{Client, Error as S3Error};
+use opendal::Operator;
use std::io::Write;
use tempfile::NamedTempFile;
use tokio_stream::StreamExt as TokioStreamExt;
+use crate::config::DriveConfig;
+use crate::shared::state::AppState;
#[post("/files/upload/{folder_path}")] #[post("/files/upload/{folder_path}")]
pub async fn upload_file( pub async fn upload_file(
folder_path: web::Path<String>, folder_path: web::Path<String>,
@ -16,15 +15,11 @@ pub async fn upload_file(
state: web::Data<AppState>, state: web::Data<AppState>,
) -> Result<HttpResponse, actix_web::Error> { ) -> Result<HttpResponse, actix_web::Error> {
let folder_path = folder_path.into_inner(); let folder_path = folder_path.into_inner();
// Create a temporary file that will hold the uploaded data
let mut temp_file = NamedTempFile::new().map_err(|e| { let mut temp_file = NamedTempFile::new().map_err(|e| {
actix_web::error::ErrorInternalServerError(format!("Failed to create temp file: {}", e)) actix_web::error::ErrorInternalServerError(format!("Failed to create temp file: {}", e))
})?; })?;
let mut file_name: Option<String> = None; let mut file_name: Option<String> = None;
// Process multipart form data
while let Some(mut field) = payload.try_next().await? { while let Some(mut field) = payload.try_next().await? {
if let Some(disposition) = field.content_disposition() { if let Some(disposition) = field.content_disposition() {
if let Some(name) = disposition.get_filename() { if let Some(name) = disposition.get_filename() {
@ -32,7 +27,6 @@ pub async fn upload_file(
} }
} }
// Write each chunk of the field to the temporary file
while let Some(chunk) = field.try_next().await? { while let Some(chunk) = field.try_next().await? {
temp_file.write_all(&chunk).map_err(|e| { temp_file.write_all(&chunk).map_err(|e| {
actix_web::error::ErrorInternalServerError(format!( actix_web::error::ErrorInternalServerError(format!(
@ -43,44 +37,24 @@ pub async fn upload_file(
} }
} }
// Use a fallback name if the client didn't supply one
let file_name = file_name.unwrap_or_else(|| "unnamed_file".to_string()); let file_name = file_name.unwrap_or_else(|| "unnamed_file".to_string());
// Convert the NamedTempFile into a TempPath so we can get a stable path
let temp_file_path = temp_file.into_temp_path(); let temp_file_path = temp_file.into_temp_path();
-    // Retrieve the bucket name from configuration, handling the case where it is missing
-    let bucket_name = match &state.get_ref().config {
-        Some(cfg) => cfg.s3_bucket.clone(),
-        None => {
-            // Clean up the temp file before returning the error
-            let _ = std::fs::remove_file(&temp_file_path);
-            return Err(actix_web::error::ErrorInternalServerError(
-                "S3 bucket configuration is missing",
-            ));
-        }
-    };
-    // Build the S3 object key (folder + filename)
-    let s3_key = format!("{}/{}", folder_path, file_name);
-    // Retrieve a reference to the S3 client, handling the case where it is missing
-    let s3_client = state.get_ref().s3_client.as_ref().ok_or_else(|| {
-        actix_web::error::ErrorInternalServerError("S3 client is not initialized")
-    })?;
-    // Perform the upload
-    match upload_to_s3(s3_client, &bucket_name, &s3_key, &temp_file_path).await {
+    let op = state.get_ref().s3_operator.as_ref().ok_or_else(|| {
+        actix_web::error::ErrorInternalServerError("S3 operator is not initialized")
+    })?;
+    let s3_key = format!("{}/{}", folder_path, file_name);
+    match upload_to_s3(op, &s3_key, &temp_file_path).await {
        Ok(_) => {
-            // Remove the temporary file now that the upload succeeded
            let _ = std::fs::remove_file(&temp_file_path);
            Ok(HttpResponse::Ok().body(format!(
-                "Uploaded file '{}' to folder '{}' in S3 bucket '{}'",
-                file_name, folder_path, bucket_name
+                "Uploaded file '{}' to folder '{}'",
+                file_name, folder_path
))) )))
} }
Err(e) => { Err(e) => {
// Ensure the temporary file is cleaned up even on failure
let _ = std::fs::remove_file(&temp_file_path); let _ = std::fs::remove_file(&temp_file_path);
Err(actix_web::error::ErrorInternalServerError(format!( Err(actix_web::error::ErrorInternalServerError(format!(
"Failed to upload file to S3: {}", "Failed to upload file to S3: {}",
@ -90,61 +64,27 @@ pub async fn upload_file(
} }
} }
-// Helper function to get S3 client
-pub async fn init_drive(cfg: &DriveConfig) -> Result<Client, Box<dyn std::error::Error>> {
-    // Build static credentials from the Drive configuration.
-    let credentials = aws_sdk_s3::config::Credentials::new(
-        cfg.access_key.clone(),
-        cfg.secret_key.clone(),
-        None,
-        None,
-        "static",
-    );
-    // Construct the endpoint URL, respecting the SSL flag.
-    let scheme = if cfg.use_ssl { "https" } else { "http" };
-    let endpoint = format!("{}://{}", scheme, cfg.server);
-    // MinIO requires pathstyle addressing.
-    let s3_config = aws_sdk_s3::config::Builder::new()
-        // Set the behavior version to the latest to satisfy the SDK requirement.
-        .behavior_version(aws_sdk_s3::config::BehaviorVersion::latest())
-        .region(aws_sdk_s3::config::Region::new("us-east-1"))
-        .endpoint_url(endpoint)
-        .credentials_provider(credentials)
-        .force_path_style(true)
-        .build();
-    Ok(Client::from_conf(s3_config))
+pub async fn init_drive(config: &DriveConfig) -> Result<Operator, Box<dyn std::error::Error>> {
+    use opendal::services::S3;
+    use opendal::Operator;
+    let client = Operator::new(
+        S3::default()
+            .root("/")
+            .endpoint(&config.server)
+            .access_key_id(&config.access_key)
+            .secret_access_key(&config.secret_key),
+    )?
+    .finish();
+    Ok(client)
}

-// Helper function to upload file to S3
async fn upload_to_s3(
-    client: &Client,
-    bucket: &str,
+    op: &Operator,
    key: &str,
    file_path: &std::path::Path,
-) -> Result<(), S3Error> {
-    // Convert the file at `file_path` into a ByteStream, mapping any I/O error
-    // into the appropriate `SdkError` type expected by the function signature.
-    let body = aws_sdk_s3::primitives::ByteStream::from_path(file_path)
-        .await
-        .map_err(|e| {
-            aws_sdk_s3::error::SdkError::<
-                aws_sdk_s3::operation::put_object::PutObjectError,
-                aws_sdk_s3::primitives::ByteStream,
-            >::construction_failure(e)
-        })?;
-    // Perform the actual upload to S3.
-    client
-        .put_object()
-        .bucket(bucket)
-        .key(key)
-        .body(body)
-        .send()
-        .await
-        .map(|_| ())?; // Convert the successful output to `()`.
+) -> Result<(), Box<dyn std::error::Error>> {
+    let data = std::fs::read(file_path)?;
+    op.write(key, data).await?;
    Ok(())
} }
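
For orientation, a minimal usage sketch of the OpenDAL-based helpers above, assuming an Operator built by init_drive; the key and local path are illustrative and not part of the commit:

use opendal::Operator;

// Build the operator once at startup, then reuse it for uploads.
async fn upload_report(op: &Operator) -> Result<(), Box<dyn std::error::Error>> {
    // Read a local file and write it under a folder/key layout,
    // mirroring what upload_file does with its multipart payload.
    let data = std::fs::read("./work/report.csv")?;
    op.write("reports/report.csv", data).await?;
    Ok(())
}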
@ -1,12 +1,12 @@
use crate::shared::state::AppState; use crate::shared::state::AppState;
-use aws_sdk_s3::Client as S3Client;
-use log::{debug, error, info};
+use log::error;
+use opendal::Operator;
+use tokio_stream::StreamExt;
use std::collections::HashMap; use std::collections::HashMap;
use std::error::Error; use std::error::Error;
use std::sync::Arc; use std::sync::Arc;
use tokio::time::{interval, Duration}; use tokio::time::{interval, Duration};
/// MinIO file state tracker
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct FileState { pub struct FileState {
pub path: String, pub path: String,
@ -15,52 +15,41 @@ pub struct FileState {
pub last_modified: Option<String>, pub last_modified: Option<String>,
} }
/// MinIO handler that monitors bucket changes
pub struct MinIOHandler { pub struct MinIOHandler {
state: Arc<AppState>, state: Arc<AppState>,
bucket_name: String,
watched_prefixes: Arc<tokio::sync::RwLock<Vec<String>>>, watched_prefixes: Arc<tokio::sync::RwLock<Vec<String>>>,
file_states: Arc<tokio::sync::RwLock<HashMap<String, FileState>>>, file_states: Arc<tokio::sync::RwLock<HashMap<String, FileState>>>,
} }
impl MinIOHandler { impl MinIOHandler {
pub fn new(state: Arc<AppState>, bucket_name: String) -> Self { pub fn new(state: Arc<AppState>) -> Self {
Self { Self {
state, state,
bucket_name,
watched_prefixes: Arc::new(tokio::sync::RwLock::new(Vec::new())), watched_prefixes: Arc::new(tokio::sync::RwLock::new(Vec::new())),
file_states: Arc::new(tokio::sync::RwLock::new(HashMap::new())), file_states: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
} }
} }
/// Add a prefix to watch (e.g., ".gbkb/", ".gbdialog/")
pub async fn watch_prefix(&self, prefix: String) { pub async fn watch_prefix(&self, prefix: String) {
let mut prefixes = self.watched_prefixes.write().await; let mut prefixes = self.watched_prefixes.write().await;
if !prefixes.contains(&prefix) { if !prefixes.contains(&prefix) {
prefixes.push(prefix.clone()); prefixes.push(prefix.clone());
info!("Now watching MinIO prefix: {}", prefix);
} }
} }
/// Remove a prefix from watch list
pub async fn unwatch_prefix(&self, prefix: &str) { pub async fn unwatch_prefix(&self, prefix: &str) {
let mut prefixes = self.watched_prefixes.write().await; let mut prefixes = self.watched_prefixes.write().await;
prefixes.retain(|p| p != prefix); prefixes.retain(|p| p != prefix);
info!("Stopped watching MinIO prefix: {}", prefix);
} }
/// Start the monitoring service
pub fn spawn( pub fn spawn(
self: Arc<Self>, self: Arc<Self>,
change_callback: Arc<dyn Fn(FileChangeEvent) + Send + Sync>, change_callback: Arc<dyn Fn(FileChangeEvent) + Send + Sync>,
) -> tokio::task::JoinHandle<()> { ) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move { tokio::spawn(async move {
info!("MinIO Handler service started"); let mut tick = interval(Duration::from_secs(15));
let mut tick = interval(Duration::from_secs(15)); // Check every 15 seconds
loop { loop {
tick.tick().await; tick.tick().await;
if let Err(e) = self.check_for_changes(&change_callback).await { if let Err(e) = self.check_for_changes(&change_callback).await {
error!("Error checking for MinIO changes: {}", e); error!("Error checking for MinIO changes: {}", e);
} }
@ -68,93 +57,58 @@ impl MinIOHandler {
}) })
} }
/// Check for file changes in watched prefixes
async fn check_for_changes( async fn check_for_changes(
&self, &self,
callback: &Arc<dyn Fn(FileChangeEvent) + Send + Sync>, callback: &Arc<dyn Fn(FileChangeEvent) + Send + Sync>,
) -> Result<(), Box<dyn Error + Send + Sync>> { ) -> Result<(), Box<dyn Error + Send + Sync>> {
-        let s3_client = match &self.state.s3_client {
-            Some(client) => client,
+        let op = match &self.state.s3_operator {
+            Some(op) => op,
            None => {
-                debug!("S3 client not configured");
                return Ok(());
            }
        };
        let prefixes = self.watched_prefixes.read().await;
        for prefix in prefixes.iter() {
-            debug!("Checking prefix: {}", prefix);
-            if let Err(e) = self.check_prefix_changes(s3_client, prefix, callback).await {
+            if let Err(e) = self.check_prefix_changes(op, prefix, callback).await {
                error!("Error checking prefix {}: {}", prefix, e);
            }
        }
        Ok(())
    }

-    /// Check changes in a specific prefix
    async fn check_prefix_changes(
        &self,
-        s3_client: &S3Client,
+        op: &Operator,
        prefix: &str,
        callback: &Arc<dyn Fn(FileChangeEvent) + Send + Sync>,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
-        // List all objects with the prefix
-        let mut continuation_token: Option<String> = None;
        let mut current_files = HashMap::new();
-        loop {
-            let mut list_request = s3_client
-                .list_objects_v2()
-                .bucket(&self.bucket_name)
-                .prefix(prefix);
-            if let Some(token) = continuation_token {
-                list_request = list_request.continuation_token(token);
-            }
-            let list_result = list_request.send().await?;
-            if let Some(contents) = list_result.contents {
-                for object in contents {
-                    if let Some(key) = object.key {
-                        // Skip directories
-                        if key.ends_with('/') {
-                            continue;
-                        }
-                        let file_state = FileState {
-                            path: key.clone(),
-                            size: object.size.unwrap_or(0),
-                            etag: object.e_tag.unwrap_or_default(),
-                            last_modified: object.last_modified.map(|dt| dt.to_string()),
-                        };
-                        current_files.insert(key, file_state);
-                    }
-                }
-            }
-            if list_result.is_truncated.unwrap_or(false) {
-                continuation_token = list_result.next_continuation_token;
-            } else {
-                break;
-            }
-        }
+        let mut lister = op.lister_with(prefix).recursive(true).await?;
+        while let Some(entry) = lister.try_next().await? {
+            let path = entry.path().to_string();
+            if path.ends_with('/') {
+                continue;
+            }
+            let meta = op.stat(&path).await?;
+            let file_state = FileState {
+                path: path.clone(),
+                size: meta.content_length() as i64,
+                etag: meta.etag().unwrap_or_default().to_string(),
+                last_modified: meta.last_modified().map(|dt| dt.to_rfc3339()),
+            };
+            current_files.insert(path, file_state);
+        }
// Compare with previous state
let mut file_states = self.file_states.write().await; let mut file_states = self.file_states.write().await;
// Check for new or modified files
for (path, current_state) in current_files.iter() { for (path, current_state) in current_files.iter() {
if let Some(previous_state) = file_states.get(path) { if let Some(previous_state) = file_states.get(path) {
// File exists, check if modified
if current_state.etag != previous_state.etag if current_state.etag != previous_state.etag
|| current_state.size != previous_state.size || current_state.size != previous_state.size
{ {
info!("File modified: {}", path);
callback(FileChangeEvent::Modified { callback(FileChangeEvent::Modified {
path: path.clone(), path: path.clone(),
size: current_state.size, size: current_state.size,
@ -162,8 +116,6 @@ impl MinIOHandler {
}); });
} }
} else { } else {
// New file
info!("File created: {}", path);
callback(FileChangeEvent::Created { callback(FileChangeEvent::Created {
path: path.clone(), path: path.clone(),
size: current_state.size, size: current_state.size,
@ -172,7 +124,6 @@ impl MinIOHandler {
} }
} }
// Check for deleted files
let previous_paths: Vec<String> = file_states let previous_paths: Vec<String> = file_states
.keys() .keys()
.filter(|k| k.starts_with(prefix)) .filter(|k| k.starts_with(prefix))
@ -181,13 +132,11 @@ impl MinIOHandler {
for path in previous_paths { for path in previous_paths {
if !current_files.contains_key(&path) { if !current_files.contains_key(&path) {
info!("File deleted: {}", path);
callback(FileChangeEvent::Deleted { path: path.clone() }); callback(FileChangeEvent::Deleted { path: path.clone() });
file_states.remove(&path); file_states.remove(&path);
} }
} }
// Update state with current files
for (path, state) in current_files { for (path, state) in current_files {
file_states.insert(path, state); file_states.insert(path, state);
} }
@ -195,20 +144,16 @@ impl MinIOHandler {
Ok(()) Ok(())
} }
/// Get current state of a file
pub async fn get_file_state(&self, path: &str) -> Option<FileState> { pub async fn get_file_state(&self, path: &str) -> Option<FileState> {
let states = self.file_states.read().await; let states = self.file_states.read().await;
states.get(path).cloned() states.get(path).cloned()
} }
/// Clear all tracked file states
pub async fn clear_state(&self) { pub async fn clear_state(&self) {
let mut states = self.file_states.write().await; let mut states = self.file_states.write().await;
states.clear(); states.clear();
info!("Cleared all file states");
} }
/// Get all tracked files for a prefix
pub async fn get_files_by_prefix(&self, prefix: &str) -> Vec<FileState> { pub async fn get_files_by_prefix(&self, prefix: &str) -> Vec<FileState> {
let states = self.file_states.read().await; let states = self.file_states.read().await;
states states
@ -219,7 +164,6 @@ impl MinIOHandler {
} }
} }
/// File change event types
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum FileChangeEvent { pub enum FileChangeEvent {
Created { Created {
@ -266,7 +210,6 @@ mod tests {
size: 100, size: 100,
etag: "abc123".to_string(), etag: "abc123".to_string(),
}; };
assert_eq!(event.path(), "test.txt"); assert_eq!(event.path(), "test.txt");
assert_eq!(event.event_type(), "created"); assert_eq!(event.event_type(), "created");
} }
@ -286,7 +229,6 @@ mod tests {
let deleted = FileChangeEvent::Deleted { let deleted = FileChangeEvent::Deleted {
path: "file3.txt".to_string(), path: "file3.txt".to_string(),
}; };
assert_eq!(created.event_type(), "created"); assert_eq!(created.event_type(), "created");
assert_eq!(modified.event_type(), "modified"); assert_eq!(modified.event_type(), "modified");
assert_eq!(deleted.event_type(), "deleted"); assert_eq!(deleted.event_type(), "deleted");
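
As a usage sketch (assumed wiring, not part of the commit), the reworked MinIOHandler can be driven with a simple logging callback; the types follow the signatures shown in the diff above:

use std::sync::Arc;

async fn watch_drive(state: Arc<AppState>) {
    let handler = Arc::new(MinIOHandler::new(state));
    handler.watch_prefix(".gbkb/".to_string()).await;

    // FileChangeEvent derives Debug, so a logging callback is enough here.
    let callback: Arc<dyn Fn(FileChangeEvent) + Send + Sync> =
        Arc::new(|event| log::info!("drive change: {:?}", event));

    // spawn() returns a JoinHandle; keep it if shutdown needs to be awaited.
    let _handle = handler.spawn(callback);
}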
@ -1,6 +1,7 @@
use crate::shared::models::KBCollection; use crate::shared::models::KBCollection;
use crate::shared::state::AppState; use crate::shared::state::AppState;
-use log::{debug, error, info, warn};
+use log::{ error, info, warn};
+use tokio_stream::StreamExt;
use std::collections::HashMap; use std::collections::HashMap;
use std::error::Error; use std::error::Error;
use std::sync::Arc; use std::sync::Arc;
@ -10,7 +11,6 @@ pub mod embeddings;
pub mod minio_handler; pub mod minio_handler;
pub mod qdrant_client; pub mod qdrant_client;
/// Represents a change in a KB file
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum FileChangeEvent { pub enum FileChangeEvent {
Created(String), Created(String),
@ -18,7 +18,6 @@ pub enum FileChangeEvent {
Deleted(String), Deleted(String),
} }
/// KB Manager service that coordinates MinIO monitoring and Qdrant indexing
pub struct KBManager { pub struct KBManager {
state: Arc<AppState>, state: Arc<AppState>,
watched_collections: Arc<tokio::sync::RwLock<HashMap<String, KBCollection>>>, watched_collections: Arc<tokio::sync::RwLock<HashMap<String, KBCollection>>>,
@ -32,7 +31,6 @@ impl KBManager {
} }
} }
/// Start watching a KB collection folder
pub async fn add_collection( pub async fn add_collection(
&self, &self,
bot_id: String, bot_id: String,
@ -47,7 +45,6 @@ impl KBManager {
collection_name, qdrant_collection collection_name, qdrant_collection
); );
// Create Qdrant collection if it doesn't exist
qdrant_client::ensure_collection_exists(&self.state, &qdrant_collection).await?; qdrant_client::ensure_collection_exists(&self.state, &qdrant_collection).await?;
let now = chrono::Utc::now().to_rfc3339(); let now = chrono::Utc::now().to_rfc3339();
@ -67,30 +64,23 @@ impl KBManager {
let mut collections = self.watched_collections.write().await; let mut collections = self.watched_collections.write().await;
collections.insert(collection_name.to_string(), collection); collections.insert(collection_name.to_string(), collection);
info!("KB collection added successfully: {}", collection_name);
Ok(()) Ok(())
} }
/// Remove a KB collection
pub async fn remove_collection( pub async fn remove_collection(
&self, &self,
collection_name: &str, collection_name: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> { ) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut collections = self.watched_collections.write().await; let mut collections = self.watched_collections.write().await;
collections.remove(collection_name); collections.remove(collection_name);
info!("KB collection removed: {}", collection_name);
Ok(()) Ok(())
} }
/// Start the KB monitoring service
pub fn spawn(self: Arc<Self>) -> tokio::task::JoinHandle<()> { pub fn spawn(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move { tokio::spawn(async move {
info!("KB Manager service started");
let mut tick = interval(Duration::from_secs(30)); let mut tick = interval(Duration::from_secs(30));
loop { loop {
tick.tick().await; tick.tick().await;
let collections = self.watched_collections.read().await; let collections = self.watched_collections.read().await;
for (name, collection) in collections.iter() { for (name, collection) in collections.iter() {
if let Err(e) = self.check_collection_updates(collection).await { if let Err(e) = self.check_collection_updates(collection).await {
@ -101,67 +91,43 @@ impl KBManager {
}) })
} }
/// Check for updates in a collection
async fn check_collection_updates( async fn check_collection_updates(
&self, &self,
collection: &KBCollection, collection: &KBCollection,
) -> Result<(), Box<dyn Error + Send + Sync>> { ) -> Result<(), Box<dyn Error + Send + Sync>> {
-        debug!("Checking updates for collection: {}", collection.name);
-        let s3_client = match &self.state.s3_client {
-            Some(client) => client,
+        let op = match &self.state.s3_operator {
+            Some(op) => op,
            None => {
-                warn!("S3 client not configured");
+                warn!("S3 operator not configured");
                return Ok(());
            }
        };
-        let config = match &self.state.config {
-            Some(cfg) => cfg,
-            None => {
-                error!("App configuration missing");
-                return Err("App configuration missing".into());
-            }
-        };
-        let bucket_name = format!("{}default.gbai", config.minio.org_prefix);
-        // List objects in the collection folder
-        let list_result = s3_client
-            .list_objects_v2()
-            .bucket(&bucket_name)
-            .prefix(&collection.folder_path)
-            .send()
-            .await?;
-        if let Some(contents) = list_result.contents {
-            for object in contents {
-                if let Some(key) = object.key {
-                    // Skip directories
-                    if key.ends_with('/') {
-                        continue;
-                    }
-                    // Check if file needs indexing
-                    if let Err(e) = self
-                        .process_file(
-                            &collection,
-                            &key,
-                            object.size.unwrap_or(0),
-                            object.last_modified.map(|dt| dt.to_string()),
-                        )
-                        .await
-                    {
-                        error!("Error processing file {}: {}", key, e);
-                    }
-                }
-            }
-        }
+        let mut lister = op.lister_with(&collection.folder_path).recursive(true).await?;
+        while let Some(entry) = lister.try_next().await? {
+            let path = entry.path().to_string();
+            if path.ends_with('/') {
+                continue;
+            }
+            let meta = op.stat(&path).await?;
+            if let Err(e) = self
+                .process_file(
+                    &collection,
+                    &path,
+                    meta.content_length() as i64,
+                    meta.last_modified().map(|dt| dt.to_rfc3339()),
+                )
+                .await
+            {
+                error!("Error processing file {}: {}", path, e);
+            }
+        }
Ok(()) Ok(())
} }
/// Process a single file (check if changed and index if needed)
async fn process_file( async fn process_file(
&self, &self,
collection: &KBCollection, collection: &KBCollection,
@ -169,9 +135,7 @@ impl KBManager {
file_size: i64, file_size: i64,
_last_modified: Option<String>, _last_modified: Option<String>,
) -> Result<(), Box<dyn Error + Send + Sync>> { ) -> Result<(), Box<dyn Error + Send + Sync>> {
// Get file content hash
let content = self.get_file_content(file_path).await?; let content = self.get_file_content(file_path).await?;
// Simple hash using length and first/last bytes for change detection
let file_hash = if content.len() > 100 { let file_hash = if content.len() > 100 {
format!( format!(
"{:x}_{:x}_{}", "{:x}_{:x}_{}",
@ -183,24 +147,16 @@ impl KBManager {
format!("{:x}", content.len()) format!("{:x}", content.len())
}; };
// Check if file is already indexed with same hash
if self if self
.is_file_indexed(collection.bot_id.clone(), file_path, &file_hash) .is_file_indexed(collection.bot_id.clone(), file_path, &file_hash)
.await? .await?
{ {
debug!("File already indexed: {}", file_path);
return Ok(()); return Ok(());
} }
info!( info!("Indexing file: {} to collection {}", file_path, collection.name);
"Indexing file: {} to collection {}",
file_path, collection.name
);
// Extract text based on file type
let text_content = self.extract_text(file_path, &content).await?; let text_content = self.extract_text(file_path, &content).await?;
// Generate embeddings and store in Qdrant
embeddings::index_document( embeddings::index_document(
&self.state, &self.state,
&collection.qdrant_collection, &collection.qdrant_collection,
@ -209,7 +165,6 @@ impl KBManager {
) )
.await?; .await?;
// Save metadata to database
let metadata = serde_json::json!({ let metadata = serde_json::json!({
"file_type": self.get_file_type(file_path), "file_type": self.get_file_type(file_path),
"last_modified": _last_modified, "last_modified": _last_modified,
@ -225,48 +180,29 @@ impl KBManager {
) )
.await?; .await?;
info!("File indexed successfully: {}", file_path);
Ok(()) Ok(())
} }
/// Get file content from MinIO
async fn get_file_content( async fn get_file_content(
&self, &self,
file_path: &str, file_path: &str,
) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> { ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
-        let s3_client = self
-            .state
-            .s3_client
-            .as_ref()
-            .ok_or("S3 client not configured")?;
-        let config = self
-            .state
-            .config
-            .as_ref()
-            .ok_or("App configuration missing")?;
-        let bucket_name = format!("{}default.gbai", config.minio.org_prefix);
-        let response = s3_client
-            .get_object()
-            .bucket(&bucket_name)
-            .key(file_path)
-            .send()
-            .await?;
-        let data = response.body.collect().await?;
-        Ok(data.into_bytes().to_vec())
+        let op = self
+            .state
+            .s3_operator
+            .as_ref()
+            .ok_or("S3 operator not configured")?;
+        let content = op.read(file_path).await?;
+        Ok(content.to_vec())
    }
/// Extract text from various file types
async fn extract_text( async fn extract_text(
&self, &self,
file_path: &str, file_path: &str,
content: &[u8], content: &[u8],
) -> Result<String, Box<dyn Error + Send + Sync>> { ) -> Result<String, Box<dyn Error + Send + Sync>> {
let path_lower = file_path.to_ascii_lowercase(); let path_lower = file_path.to_ascii_lowercase();
if path_lower.ends_with(".pdf") { if path_lower.ends_with(".pdf") {
match pdf_extract::extract_text_from_mem(content) { match pdf_extract::extract_text_from_mem(content) {
Ok(text) => Ok(text), Ok(text) => Ok(text),
@ -279,29 +215,23 @@ impl KBManager {
String::from_utf8(content.to_vec()) String::from_utf8(content.to_vec())
.map_err(|e| format!("UTF-8 decoding failed: {}", e).into()) .map_err(|e| format!("UTF-8 decoding failed: {}", e).into())
} else if path_lower.ends_with(".docx") { } else if path_lower.ends_with(".docx") {
// TODO: Add DOCX support
warn!("DOCX format not yet supported: {}", file_path); warn!("DOCX format not yet supported: {}", file_path);
Err("DOCX format not supported".into()) Err("DOCX format not supported".into())
} else { } else {
// Try as plain text
String::from_utf8(content.to_vec()) String::from_utf8(content.to_vec())
.map_err(|e| format!("Unsupported file format or UTF-8 error: {}", e).into()) .map_err(|e| format!("Unsupported file format or UTF-8 error: {}", e).into())
} }
} }
/// Check if file is already indexed
async fn is_file_indexed( async fn is_file_indexed(
&self, &self,
_bot_id: String, _bot_id: String,
_file_path: &str, _file_path: &str,
_file_hash: &str, _file_hash: &str,
) -> Result<bool, Box<dyn Error + Send + Sync>> { ) -> Result<bool, Box<dyn Error + Send + Sync>> {
// TODO: Query database to check if file with same hash exists
// For now, return false to always reindex
Ok(false) Ok(false)
} }
/// Save document metadata to database
async fn save_document_metadata( async fn save_document_metadata(
&self, &self,
_bot_id: String, _bot_id: String,
@ -311,7 +241,6 @@ impl KBManager {
file_hash: &str, file_hash: &str,
_metadata: serde_json::Value, _metadata: serde_json::Value,
) -> Result<(), Box<dyn Error + Send + Sync>> { ) -> Result<(), Box<dyn Error + Send + Sync>> {
// TODO: Save to database using Diesel
info!( info!(
"Saving metadata for {}: size={}, hash={}", "Saving metadata for {}: size={}, hash={}",
file_path, file_size, file_hash file_path, file_size, file_hash
@ -319,7 +248,6 @@ impl KBManager {
Ok(()) Ok(())
} }
/// Get file type from path
fn get_file_type(&self, file_path: &str) -> String { fn get_file_type(&self, file_path: &str) -> String {
file_path file_path
.rsplit('.') .rsplit('.')
@ -1,5 +1,6 @@
#![allow(dead_code)] #![allow(dead_code)]
#![cfg_attr(feature = "desktop", windows_subsystem = "windows")] #![cfg_attr(feature = "desktop", windows_subsystem = "windows")]
use actix_cors::Cors; use actix_cors::Cors;
use actix_web::middleware::Logger; use actix_web::middleware::Logger;
use actix_web::{web, App, HttpServer}; use actix_web::{web, App, HttpServer};
@ -19,10 +20,8 @@ mod context;
mod drive_monitor; mod drive_monitor;
#[cfg(feature = "email")] #[cfg(feature = "email")]
mod email; mod email;
#[cfg(feature = "desktop")] #[cfg(feature = "desktop")]
mod ui; mod ui;
mod file; mod file;
mod kb; mod kb;
mod llm; mod llm;
@ -65,7 +64,6 @@ use crate::whatsapp::WhatsAppAdapter;
#[tokio::main] #[tokio::main]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
let args: Vec<String> = std::env::args().collect(); let args: Vec<String> = std::env::args().collect();
if args.len() > 1 { if args.len() > 1 {
let command = &args[1]; let command = &args[1];
match command.as_str() { match command.as_str() {
@ -93,10 +91,8 @@ async fn main() -> std::io::Result<()> {
dotenv().ok(); dotenv().ok();
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")) env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
.write_style(env_logger::WriteStyle::Always) .write_style(env_logger::WriteStyle::Always)
.init(); .init();
info!("Starting BotServer bootstrap process");
let install_mode = if args.contains(&"--container".to_string()) { let install_mode = if args.contains(&"--container".to_string()) {
InstallMode::Container InstallMode::Container
@ -111,38 +107,48 @@ async fn main() -> std::io::Result<()> {
}; };
let mut bootstrap = BootstrapManager::new(install_mode.clone(), tenant.clone()); let mut bootstrap = BootstrapManager::new(install_mode.clone(), tenant.clone());
-    let cfg = match bootstrap.bootstrap() {
-        Ok(config) => {
-            info!("Bootstrap completed successfully, configuration loaded from database");
-            config
-        }
-        Err(e) => {
-            log::error!("Bootstrap failed: {}", e);
-            info!("Attempting to load configuration from database");
-            match diesel::Connection::establish(
-                &std::env::var("DATABASE_URL")
-                    .unwrap_or_else(|_| "postgres://gbuser:@localhost:5432/botserver".to_string()),
-            ) {
-                Ok(mut conn) => AppConfig::from_database(&mut conn),
-                Err(_) => {
-                    info!("Database not available, using environment variables as fallback");
-                    AppConfig::from_env()
-                }
-            }
-        }
-    };
+    // Prevent double bootstrap: skip if environment already initialized
+    let env_path = std::env::current_dir()?.join("botserver-stack").join(".env");
+    let cfg = if env_path.exists() {
+        info!("Environment already initialized, skipping bootstrap");
+        match diesel::Connection::establish(
+            &std::env::var("DATABASE_URL")
+                .unwrap_or_else(|_| "postgres://gbuser:@localhost:5432/botserver".to_string()),
+        ) {
+            Ok(mut conn) => AppConfig::from_database(&mut conn),
+            Err(_) => AppConfig::from_env(),
+        }
+    } else {
+        match bootstrap.bootstrap() {
+            Ok(config) => {
+                info!("Bootstrap completed successfully");
+                config
+            }
+            Err(e) => {
+                log::error!("Bootstrap failed: {}", e);
+                match diesel::Connection::establish(
+                    &std::env::var("DATABASE_URL")
+                        .unwrap_or_else(|_| "postgres://gbuser:@localhost:5432/botserver".to_string()),
+                ) {
+                    Ok(mut conn) => AppConfig::from_database(&mut conn),
+                    Err(_) => AppConfig::from_env(),
+                }
+            }
+        }
+    };

    let _ = bootstrap.start_all();
-    // Upload template bots to MinIO on first startup
-    if let Err(e) = bootstrap.upload_templates_to_minio(&cfg).await {
+    if let Err(e) = bootstrap.upload_templates_to_drive(&cfg).await {
        log::warn!("Failed to upload templates to MinIO: {}", e);
    }
-    let config = std::sync::Arc::new(cfg.clone());
-    info!("Establishing database connection to {}", cfg.database_url());
-    let db_pool = match diesel::Connection::establish(&cfg.database_url()) {
+    // Refresh configuration from environment to ensure latest DATABASE_URL and credentials
+    dotenv().ok();
+    let refreshed_cfg = AppConfig::from_env();
+    let config = std::sync::Arc::new(refreshed_cfg.clone());
+    let db_pool = match diesel::Connection::establish(&refreshed_cfg.database_url()) {
Ok(conn) => Arc::new(Mutex::new(conn)), Ok(conn) => Arc::new(Mutex::new(conn)),
Err(e) => { Err(e) => {
log::error!("Failed to connect to main database: {}", e); log::error!("Failed to connect to main database: {}", e);
@ -154,8 +160,6 @@ async fn main() -> std::io::Result<()> {
}; };
let db_custom_pool = db_pool.clone(); let db_custom_pool = db_pool.clone();
info!("Initializing LLM server at {}", cfg.ai.endpoint);
ensure_llama_servers_running() ensure_llama_servers_running()
.await .await
.expect("Failed to initialize LLM local server"); .expect("Failed to initialize LLM local server");
@ -176,7 +180,6 @@ async fn main() -> std::io::Result<()> {
"empty".to_string(), "empty".to_string(),
Some(cfg.ai.endpoint.clone()), Some(cfg.ai.endpoint.clone()),
)); ));
let web_adapter = Arc::new(WebChannelAdapter::new()); let web_adapter = Arc::new(WebChannelAdapter::new());
let voice_adapter = Arc::new(VoiceAdapter::new( let voice_adapter = Arc::new(VoiceAdapter::new(
"https://livekit.example.com".to_string(), "https://livekit.example.com".to_string(),
@ -190,8 +193,8 @@ async fn main() -> std::io::Result<()> {
)); ));
let tool_api = Arc::new(tools::ToolApi::new()); let tool_api = Arc::new(tools::ToolApi::new());
info!("Initializing drive at {}", cfg.minio.server);
let drive = init_drive(&config.minio) let drive = init_drive(&config.drive)
.await .await
.expect("Failed to initialize Drive"); .expect("Failed to initialize Drive");
@ -199,13 +202,14 @@ async fn main() -> std::io::Result<()> {
diesel::Connection::establish(&cfg.database_url()).unwrap(), diesel::Connection::establish(&cfg.database_url()).unwrap(),
redis_client.clone(), redis_client.clone(),
))); )));
let auth_service = Arc::new(tokio::sync::Mutex::new(auth::AuthService::new( let auth_service = Arc::new(tokio::sync::Mutex::new(auth::AuthService::new(
diesel::Connection::establish(&cfg.database_url()).unwrap(), diesel::Connection::establish(&cfg.database_url()).unwrap(),
redis_client.clone(), redis_client.clone(),
))); )));
let app_state = Arc::new(AppState { let app_state = Arc::new(AppState {
s3_client: Some(drive.clone()), s3_operator: Some(drive.clone()),
config: Some(cfg.clone()), config: Some(cfg.clone()),
conn: db_pool.clone(), conn: db_pool.clone(),
custom_conn: db_custom_pool.clone(), custom_conn: db_custom_pool.clone(),
@ -229,23 +233,17 @@ async fn main() -> std::io::Result<()> {
tool_api: tool_api.clone(), tool_api: tool_api.clone(),
}); });
info!( info!("Starting HTTP server on {}:{}", config.server.host, config.server.port);
"Starting HTTP server on {}:{}",
config.server.host, config.server.port
);
let worker_count = std::thread::available_parallelism() let worker_count = std::thread::available_parallelism()
.map(|n| n.get()) .map(|n| n.get())
.unwrap_or(4); .unwrap_or(4);
// Spawn AutomationService in a LocalSet on a separate thread
let automation_state = app_state.clone(); let automation_state = app_state.clone();
std::thread::spawn(move || { std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build()
.expect("Failed to create runtime for automation"); .expect("Failed to create runtime for automation");
let local = tokio::task::LocalSet::new(); let local = tokio::task::LocalSet::new();
local.block_on(&rt, async move { local.block_on(&rt, async move {
let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string()); let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string());
@ -257,7 +255,7 @@ async fn main() -> std::io::Result<()> {
let drive_state = app_state.clone(); let drive_state = app_state.clone();
let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string()); let bot_guid = std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string());
let bucket_name = format!("{}{}.gbai", cfg.minio.org_prefix, bot_guid); let bucket_name = format!("{}{}.gbai", cfg.drive.org_prefix, bot_guid);
let drive_monitor = Arc::new(DriveMonitor::new(drive_state, bucket_name)); let drive_monitor = Arc::new(DriveMonitor::new(drive_state, bucket_name));
let _drive_handle = drive_monitor.spawn(); let _drive_handle = drive_monitor.spawn();
@ -267,8 +265,8 @@ async fn main() -> std::io::Result<()> {
.allow_any_method() .allow_any_method()
.allow_any_header() .allow_any_header()
.max_age(3600); .max_age(3600);
let app_state_clone = app_state.clone();
let app_state_clone = app_state.clone();
let mut app = App::new() let mut app = App::new()
.wrap(cors) .wrap(cors)
.wrap(Logger::default()) .wrap(Logger::default())
@ -63,51 +63,85 @@ impl PackageManager {
fn register_drive(&mut self) { fn register_drive(&mut self) {
let drive_password = self.generate_secure_password(16); let drive_password = self.generate_secure_password(16);
let drive_user = "gbdriveuser".to_string(); let drive_user = "gbdriveuser".to_string();
let farm_password = std::env::var("FARM_PASSWORD") let farm_password =
.unwrap_or_else(|_| self.generate_secure_password(32)); std::env::var("FARM_PASSWORD").unwrap_or_else(|_| self.generate_secure_password(32));
let encrypted_drive_password = self.encrypt_password(&drive_password, &farm_password); let encrypted_drive_password = self.encrypt_password(&drive_password, &farm_password);
self.components.insert("drive".to_string(), ComponentConfig { let env_path = self.base_path.join(".env");
name: "drive".to_string(), let env_content = format!(
required: true, "DRIVE_USER={}\nDRIVE_PASSWORD={}\nFARM_PASSWORD={}\nDRIVE_ROOT_USER={}\nDRIVE_ROOT_PASSWORD={}\n",
ports: vec![9000, 9001], drive_user, drive_password, farm_password, drive_user, drive_password
dependencies: vec![], );
linux_packages: vec![], let _ = std::fs::write(&env_path, env_content);
macos_packages: vec![],
windows_packages: vec![],
download_url: Some("https://dl.min.io/server/minio/release/linux-amd64/minio".to_string()),
binary_name: Some("minio".to_string()),
pre_install_cmds_linux: vec![],
post_install_cmds_linux: vec![
"wget https://dl.min.io/client/mc/release/linux-amd64/mc -O {{BIN_PATH}}/mc".to_string(),
"chmod +x {{BIN_PATH}}/mc".to_string(),
format!("{{{{BIN_PATH}}}}/mc alias set mc http://localhost:9000 gbdriveuser {}", drive_password),
"{{BIN_PATH}}/mc mb mc/default.gbai".to_string(),
format!("{{{{BIN_PATH}}}}/mc admin user add mc gbdriveuser {}", drive_password),
"{{BIN_PATH}}/mc admin policy attach mc readwrite --user=gbdriveuser".to_string()
],
pre_install_cmds_macos: vec![],
post_install_cmds_macos: vec![
"wget https://dl.min.io/client/mc/release/darwin-amd64/mc -O {{BIN_PATH}}/mc".to_string(),
"chmod +x {{BIN_PATH}}/mc".to_string()
],
pre_install_cmds_windows: vec![],
post_install_cmds_windows: vec![
"curl https://dl.min.io/client/mc/release/windows-amd64/mc.exe -O {{BIN_PATH}}\\mc.exe".to_string(),
"cmd /c {{BIN_PATH}}\\mc.exe alias set mc http://localhost:9000 gbdriveuser {}".to_string(),
"cmd /c {{BIN_PATH}}\\mc.exe mb mc\\default.gbai".to_string(),
"cmd /c {{BIN_PATH}}\\mc.exe admin user add mc gbdriveuser {}".to_string(),
"cmd /c {{BIN_PATH}}\\mc.exe admin policy attach mc readwrite --user=gbdriveuser".to_string()
],
env_vars: HashMap::from([
("MINIO_ROOT_USER".to_string(), "gbdriveuser".to_string()),
("MINIO_ROOT_PASSWORD".to_string(), drive_password)
]),
exec_cmd: "nohup {{BIN_PATH}}/minio server {{DATA_PATH}} --address :9000 --console-address :9001 > {{LOGS_PATH}}/minio.log 2>&1 &".to_string(),
});
self.update_drive_credentials_in_database(&encrypted_drive_password) self.components.insert(
.ok(); "drive".to_string(),
ComponentConfig {
name: "drive".to_string(),
required: true,
ports: vec![9000, 9001],
dependencies: vec![],
linux_packages: vec![],
macos_packages: vec![],
windows_packages: vec![],
download_url: Some(
"https://dl.min.io/server/minio/release/linux-amd64/minio".to_string(),
),
binary_name: Some("minio".to_string()),
pre_install_cmds_linux: vec![],
post_install_cmds_linux: vec![
"wget https://dl.min.io/client/mc/release/linux-amd64/mc -O {{BIN_PATH}}/mc"
.to_string(),
"chmod +x {{BIN_PATH}}/mc".to_string(),
],
pre_install_cmds_macos: vec![],
post_install_cmds_macos: vec![
"wget https://dl.min.io/client/mc/release/darwin-amd64/mc -O {{BIN_PATH}}/mc"
.to_string(),
"chmod +x {{BIN_PATH}}/mc".to_string(),
],
pre_install_cmds_windows: vec![],
post_install_cmds_windows: vec![],
env_vars: HashMap::from([
("DRIVE_ROOT_USER".to_string(), drive_user.clone()),
("DRIVE_ROOT_PASSWORD".to_string(), drive_password.clone()),
]),
data_download_list: Vec::new(),
exec_cmd: "nohup {{BIN_PATH}}/minio server {{DATA_PATH}} --address :9000 --console-address :9001 > {{LOGS_PATH}}/minio.log 2>&1 & sleep 5 && {{BIN_PATH}}/mc alias set drive http://localhost:9000 minioadmin minioadmin && {{BIN_PATH}}/mc admin user add drive $DRIVE_ROOT_USER $DRIVE_ROOT_PASSWORD && {{BIN_PATH}}/mc admin policy attach drive readwrite --user $DRIVE_ROOT_USER && {{BIN_PATH}}/mc mb drive/default.gbai || true".to_string(),
},
);
// Delay updating drive credentials until database is created
let db_env_path = self.base_path.join(".env");
let database_url = std::env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://gbuser:@localhost:5432/botserver".to_string());
let db_line = format!("DATABASE_URL={}\n", database_url);
let _ = std::fs::write(&db_env_path, db_line);
// Append drive credentials after database creation
let env_path = self.base_path.join(".env");
let drive_lines = format!(
"DRIVE_USER={}\nDRIVE_PASSWORD={}\nFARM_PASSWORD={}\nDRIVE_ROOT_USER={}\nDRIVE_ROOT_PASSWORD={}\n",
drive_user, drive_password, farm_password, drive_user, drive_password
);
let _ = std::fs::OpenOptions::new()
.append(true)
.open(&env_path)
.and_then(|mut file| std::io::Write::write_all(&mut file, drive_lines.as_bytes()));
// Update drive credentials in database only after database is ready
if std::process::Command::new("pg_isready")
.arg("-h")
.arg("localhost")
.arg("-p")
.arg("5432")
.output()
.map(|o| o.status.success())
.unwrap_or(false)
{
self.update_drive_credentials_in_database(&encrypted_drive_password)
.ok();
}
} }
fn update_drive_credentials_in_database(&self, encrypted_drive_password: &str) -> Result<()> { fn update_drive_credentials_in_database(&self, encrypted_drive_password: &str) -> Result<()> {
@ -191,34 +225,33 @@ impl PackageManager {
} }
fn register_cache(&mut self) { fn register_cache(&mut self) {
self.components.insert("cache".to_string(), ComponentConfig { self.components.insert(
name: "cache".to_string(), "cache".to_string(),
required: true, ComponentConfig {
ports: vec![6379], name: "cache".to_string(),
dependencies: vec![], required: true,
linux_packages: vec!["curl".to_string(), "gnupg".to_string(), "lsb-release".to_string()], ports: vec![6379],
macos_packages: vec!["redis".to_string()], dependencies: vec![],
windows_packages: vec![], linux_packages: vec![],
download_url: None, macos_packages: vec![],
binary_name: Some("valkey-server".to_string()), windows_packages: vec![],
pre_install_cmds_linux: vec![ download_url: Some(
"sudo bash -c 'if [ ! -f /usr/share/keyrings/valkey.gpg ]; then curl -fsSL https://packages.redis.io/gpg | gpg --dearmor -o /usr/share/keyrings/valkey.gpg; fi'".to_string(), "https://download.valkey.io/releases/valkey-9.0.0-jammy-x86_64.tar.gz".to_string(),
"sudo bash -c 'if [ ! -f /etc/apt/sources.list.d/valkey.list ]; then echo \"deb [signed-by=/usr/share/keyrings/valkey.gpg] https://packages.redis.io/deb $(lsb_release -cs) main\" | tee /etc/apt/sources.list.d/valkey.list; fi'".to_string(), ),
"sudo apt-get update && sudo apt-get install -y valkey".to_string() binary_name: Some("valkey-server".to_string()),
], pre_install_cmds_linux: vec![],
post_install_cmds_linux: vec![], post_install_cmds_linux: vec![
pre_install_cmds_macos: vec![], "chmod +x {{BIN_PATH}}/bin/valkey-server".to_string(),
post_install_cmds_macos: vec![], ],
pre_install_cmds_windows: vec![ pre_install_cmds_macos: vec![],
post_install_cmds_macos: vec![],
"powershell -Command \"if (!(Test-Path -Path 'C:\\ProgramData\\valkey\\keyrings\\valkey.gpg')) { Invoke-WebRequest -Uri 'https://packages.redis.io/gpg' -OutFile C:\\ProgramData\\valkey\\keyrings\\valkey.gpg }\"".to_string(), pre_install_cmds_windows: vec![],
"powershell -Command \"if (!(Test-Path -Path 'C:\\ProgramData\\valkey\\sources.list')) { Add-Content -Path 'C:\\ProgramData\\valkey\\sources.list' -Value 'deb [signed-by=C:\\ProgramData\\valkey\\keyrings\\valkey.gpg] https://packages.redis.io/windows valkey main' }\"".to_string(), post_install_cmds_windows: vec![],
"powershell -Command \"winget install -e --id Valkey valkey-server\"".to_string() env_vars: HashMap::new(),
], data_download_list: Vec::new(),
post_install_cmds_windows: vec![], exec_cmd: "{{BIN_PATH}}/bin/valkey-server --port 6379 --dir {{DATA_PATH}}".to_string(),
env_vars: HashMap::new(), },
exec_cmd: "valkey-server --port 6379 --dir {{DATA_PATH}}".to_string(), );
});
} }
fn register_llm(&mut self) { fn register_llm(&mut self) {
@ -746,7 +779,10 @@ impl PackageManager {
if let Ok(output) = check_output { if let Ok(output) = check_output {
if output.status.success() { if output.status.success() {
trace!("Component {} is already running, skipping start", component.name); trace!(
"Component {} is already running, skipping start",
component.name
);
return Ok(std::process::Command::new("sh") return Ok(std::process::Command::new("sh")
.arg("-c") .arg("-c")
.arg("echo 'Already running'") .arg("echo 'Already running'")
@ -762,7 +798,11 @@ impl PackageManager {
.replace("{{CONF_PATH}}", &conf_path.to_string_lossy()) .replace("{{CONF_PATH}}", &conf_path.to_string_lossy())
.replace("{{LOGS_PATH}}", &logs_path.to_string_lossy()); .replace("{{LOGS_PATH}}", &logs_path.to_string_lossy());
trace!("Starting component {} with command: {}", component.name, rendered_cmd); trace!(
"Starting component {} with command: {}",
component.name,
rendered_cmd
);
let child = std::process::Command::new("sh") let child = std::process::Command::new("sh")
.current_dir(&bin_path) .current_dir(&bin_path)
@ -775,7 +815,10 @@ impl PackageManager {
Err(e) => { Err(e) => {
let err_msg = e.to_string(); let err_msg = e.to_string();
if err_msg.contains("already running") || component.name == "tables" { if err_msg.contains("already running") || component.name == "tables" {
trace!("Component {} may already be running, continuing anyway", component.name); trace!(
"Component {} may already be running, continuing anyway",
component.name
);
Ok(std::process::Command::new("sh") Ok(std::process::Command::new("sh")
.arg("-c") .arg("-c")
.arg("echo 'Already running'") .arg("echo 'Already running'")
@@ -6,28 +6,26 @@
 use crate::session::SessionManager;
 use crate::tools::{ToolApi, ToolManager};
 use crate::whatsapp::WhatsAppAdapter;
 use diesel::{Connection, PgConnection};
+use opendal::Operator;
 use redis::Client;
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::Mutex;
 use tokio::sync::mpsc;
 use crate::shared::models::BotResponse;
 pub struct AppState {
-    pub s3_client: Option<aws_sdk_s3::Client>,
+    pub s3_operator: Option<Operator>,
     pub config: Option<AppConfig>,
     pub conn: Arc<Mutex<PgConnection>>,
     pub custom_conn: Arc<Mutex<PgConnection>>,
     pub redis_client: Option<Arc<Client>>,
     pub session_manager: Arc<tokio::sync::Mutex<SessionManager>>,
     pub tool_manager: Arc<ToolManager>,
     pub llm_provider: Arc<dyn LLMProvider>,
     pub auth_service: Arc<tokio::sync::Mutex<AuthService>>,
     pub channels: Arc<Mutex<HashMap<String, Arc<dyn ChannelAdapter>>>>,
     pub response_channels: Arc<tokio::sync::Mutex<HashMap<String, mpsc::Sender<BotResponse>>>>,
     pub web_adapter: Arc<WebChannelAdapter>,
     pub voice_adapter: Arc<VoiceAdapter>,
     pub whatsapp_adapter: Arc<WhatsAppAdapter>,
@@ -37,7 +35,7 @@ pub struct AppState {
 impl Clone for AppState {
     fn clone(&self) -> Self {
         Self {
-            s3_client: self.s3_client.clone(),
+            s3_operator: self.s3_operator.clone(),
             config: self.config.clone(),
             conn: Arc::clone(&self.conn),
             custom_conn: Arc::clone(&self.custom_conn),
@@ -59,7 +57,7 @@ impl Clone for AppState {
 impl Default for AppState {
     fn default() -> Self {
         Self {
-            s3_client: None,
+            s3_operator: None,
             config: None,
             conn: Arc::new(Mutex::new(
                 diesel::PgConnection::establish("postgres://localhost/test").unwrap(),
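
The hunks above swap the aws_sdk_s3 client in AppState for an opendal Operator. Below is a minimal sketch of how an S3-backed operator could be built; the bucket, endpoint, region, and credentials are placeholders, and the chained builder style assumes a recent opendal release where setters take the builder by value (check against the pinned version). In practice the values would come from the bot's configuration or environment rather than literals.

use opendal::{services::S3, Operator};

// Illustrative only: where BotServer actually sources these settings is not
// shown in this commit.
fn build_s3_operator() -> opendal::Result<Operator> {
    let builder = S3::default()
        .bucket("botserver-data")                 // placeholder bucket
        .endpoint("http://localhost:9000")        // placeholder endpoint (e.g. MinIO)
        .region("us-east-1")                      // placeholder region
        .access_key_id("access-key")              // placeholder credentials
        .secret_access_key("secret-key");
    Ok(Operator::new(builder)?.finish())
}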
View file
@@ -0,0 +1,4 @@
name,value
prompt-compact, 10
prompt-cache,true
prompt-fixed-kb,geral
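
The new file above is a two-column name,value table (prompt-compact, prompt-cache, prompt-fixed-kb). As a hedged sketch, such a file can be loaded into a map with the csv crate the project appears to depend on; this is illustrative, not necessarily the loader BotServer actually uses.

use std::collections::HashMap;
use std::path::Path;

// Hypothetical loader for a two-column name,value CSV like the one above.
fn load_settings(path: &Path) -> Result<HashMap<String, String>, csv::Error> {
    let mut reader = csv::ReaderBuilder::new()
        .has_headers(true)      // first row is "name,value"
        .trim(csv::Trim::All)   // tolerates entries like "prompt-compact, 10"
        .from_path(path)?;
    let mut settings = HashMap::new();
    for record in reader.records() {
        let record = record?;
        if let (Some(name), Some(value)) = (record.get(0), record.get(1)) {
            settings.insert(name.to_string(), value.to_string());
        }
    }
    Ok(settings)
}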
View file
@@ -1,67 +0,0 @@
REM Simple KISS authentication - signup/login only, no recovery
REM This script is called when user needs authentication
TALK "Welcome! Please choose an option:"
TALK "Type 'signup' to create a new account"
TALK "Type 'login' to access your existing account"
HEAR choice
IF choice = "signup" THEN
TALK "Great! Let's create your account."
TALK "Enter your email:"
HEAR email
TALK "Enter your password:"
HEAR password
TALK "Confirm your password:"
HEAR confirm_password
IF password <> confirm_password THEN
TALK "Passwords don't match. Please try again."
RETURN false
END IF
REM Create user in database
LET user_id = GENERATE_UUID()
LET result = EXEC "INSERT INTO users (id, email, password_hash, created_at) VALUES (?, ?, ?, NOW())", user_id, email, SHA256(password)
IF result > 0 THEN
SET_USER user_id
TALK "Account created successfully! You are now logged in."
RETURN true
ELSE
TALK "Error creating account. Email may already exist."
RETURN false
END IF
ELSE IF choice = "login" THEN
TALK "Please enter your email:"
HEAR email
TALK "Enter your password:"
HEAR password
REM Query user from database
LET user = FIND "users", "email=" + email
IF user = NULL THEN
TALK "Invalid email or password."
RETURN false
END IF
LET password_hash = SHA256(password)
IF user.password_hash = password_hash THEN
SET_USER user.id
TALK "Welcome back! You are now logged in."
RETURN true
ELSE
TALK "Invalid email or password."
RETURN false
END IF
ELSE
TALK "Invalid option. Please type 'signup' or 'login'."
RETURN false
END IF
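
The script deleted above handled signup and login in BASIC and stored SHA-256 password hashes. The commit does not show what, if anything, replaces it; purely as an illustration, if the same step were handled in the Rust AuthService, the argon2 crate the project appears to pull in is the conventional choice for password storage. A minimal sketch with hypothetical function names:

use argon2::{
    password_hash::{rand_core::OsRng, PasswordHash, PasswordHasher, PasswordVerifier, SaltString},
    Argon2,
};

// Illustrative only: these helpers are not code from this repository.
fn hash_password(plain: &str) -> Result<String, argon2::password_hash::Error> {
    let salt = SaltString::generate(&mut OsRng);
    Ok(Argon2::default().hash_password(plain.as_bytes(), &salt)?.to_string())
}

fn verify_password(plain: &str, stored: &str) -> Result<bool, argon2::password_hash::Error> {
    let parsed = PasswordHash::new(stored)?;
    Ok(Argon2::default().verify_password(plain.as_bytes(), &parsed).is_ok())
}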
View file
@@ -0,0 +1,9 @@
PARAM subject as string
DESCRIPTION "Chamado quando alguém quer mudar o assunto da conversa."
kbname = LLM "Devolva uma única palavra circular, comunicado ou geral de acordo com a seguinte frase:" + subject
ADD_KB kbname
TALK "You have chosen to change the subject to " + subject + "."
View file
@@ -1,16 +1,17 @@
-REM start.bas - Runs automatically when user connects via web
-REM This is the entry point for each session
-LET resume = GET_BOT_MEMORY("resume")
-IF resume <> "" THEN
-TALK resume
-ELSE
-TALK "Welcome! I'm loading the latest information..."
-END IF
-REM Add knowledge base for weekly announcements
-ADD_KB "weekly"
+LET resume1 = GET_BOT_MEMORY("general")
+LET resume2 = GET_BOT_MEMORY("auxiliom")
+LET resume3 = GET_BOT_MEMORY("toolbix")
+SET_CONTEXT "general", resume1
+SET_CONTEXT "auxiliom", resume2
+SET_CONTEXT "toolbix", resume3
+ADD_SUGGESTION "general", "Show me the weekly announcements"
+ADD_SUGGESTION "auxiliom", "Will Auxiliom help me with what?"
+ADD_SUGGESTION "auxiliom", "What does Auxiliom do?"
+ADD_SUGGESTION "toolbix", "Show me Toolbix features"
+ADD_SUGGESTION "toolbix", "How can Toolbix help my business?"
 TALK "You can ask me about any of the announcements or circulars."
-TALK "If you'd like to login or signup, just type 'auth'."
View file
@@ -1,5 +1,11 @@
-let text = GET "default.gbdrive/default.pdf"
+let text = GET "announcements.gbkb/news/news.pdf"
 let resume = LLM "Resume this document, in a table (DO NOT THINK) no_think: " + text
 SET_BOT_MEMORY "resume", resume
+let text1 = GET "announcements.gbkb/auxiliom/auxiliom.pdf"
+SET_BOT_MEMORY "auxiliom", text1
+let text2 = GET "announcements.gbkb/toolbix/toolbix.pdf"
+SET_BOT_MEMORY "toolbix", text2
View file
@@ -1,8 +1,8 @@
 name,value
-server_host=0.0.0.0
-server_port=8080
-sites_root=/tmp
+server_host,0.0.0.0
+server_port,8080
+sites_root,/tmp
 llm-key,gsk_
 llm-model,openai/gpt-oss-20b
 llm-url,https://api.groq.com/openai/v1/chat/completions
 llm-url,http://localhost:8080/v1
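
The change above is a delimiter fix rather than a behavioral one: config.csv declares a name,value header, but the server_host, server_port, and sites_root rows used "=" instead of a comma, so a strict CSV reader would either reject those rows or see them as a single field with no value. The llm-* rows were already comma-separated and are unchanged.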