diff --git a/Cargo.toml b/Cargo.toml index f74e3889..8336751e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,8 @@ name = "botserver" version = "6.1.0" edition = "2021" +keywords = ["chatbot", "ai", "llm", "automation", "bot-framework"] +categories = ["web-programming", "api-bindings", "development-tools"] authors = [ "Pragmatismo.com.br ", "General Bots Community ", diff --git a/src/basic/compiler/mod.rs b/src/basic/compiler/mod.rs index 98cbd9d9..f9ef8044 100644 --- a/src/basic/compiler/mod.rs +++ b/src/basic/compiler/mod.rs @@ -82,6 +82,7 @@ pub struct BasicCompiler { previous_schedules: HashSet, } impl BasicCompiler { + #[must_use] pub fn new(state: Arc, bot_id: uuid::Uuid) -> Self { Self { state, @@ -95,30 +96,29 @@ impl BasicCompiler { output_dir: &str, ) -> Result> { let source_content = fs::read_to_string(source_path) - .map_err(|e| format!("Failed to read source file: {}", e))?; + .map_err(|e| format!("Failed to read source file: {e}"))?; let tool_def = self.parse_tool_definition(&source_content, source_path)?; let file_name = Path::new(source_path) .file_stem() .and_then(|s| s.to_str()) .ok_or("Invalid file name")?; - let ast_path = format!("{}/{}.ast", output_dir, file_name); + let ast_path = format!("{output_dir}/{file_name}.ast"); let ast_content = self.preprocess_basic(&source_content, source_path, self.bot_id)?; - fs::write(&ast_path, &ast_content) - .map_err(|e| format!("Failed to write AST file: {}", e))?; - let (mcp_json, tool_json) = if !tool_def.parameters.is_empty() { + fs::write(&ast_path, &ast_content).map_err(|e| format!("Failed to write AST file: {e}"))?; + let (mcp_json, tool_json) = if tool_def.parameters.is_empty() { + (None, None) + } else { let mcp = self.generate_mcp_tool(&tool_def)?; let openai = self.generate_openai_tool(&tool_def)?; - let mcp_path = format!("{}/{}.mcp.json", output_dir, file_name); - let tool_path = format!("{}/{}.tool.json", output_dir, file_name); + let mcp_path = format!("{output_dir}/{file_name}.mcp.json"); + let tool_path = format!("{output_dir}/{file_name}.tool.json"); let mcp_json_str = serde_json::to_string_pretty(&mcp)?; fs::write(&mcp_path, mcp_json_str) - .map_err(|e| format!("Failed to write MCP JSON: {}", e))?; + .map_err(|e| format!("Failed to write MCP JSON: {e}"))?; let tool_json_str = serde_json::to_string_pretty(&openai)?; fs::write(&tool_path, tool_json_str) - .map_err(|e| format!("Failed to write tool JSON: {}", e))?; + .map_err(|e| format!("Failed to write tool JSON: {e}"))?; (Some(mcp), Some(openai)) - } else { - (None, None) }; Ok(CompilationResult { mcp_tool: mcp_json, @@ -172,7 +172,7 @@ impl BasicCompiler { } let parts: Vec<&str> = line.split_whitespace().collect(); if parts.len() < 4 { - warn!("Invalid PARAM line: {}", line); + warn!("Invalid PARAM line: {line}"); return Ok(None); } let name = parts[1].to_string(); @@ -314,7 +314,7 @@ impl BasicCompiler { .state .conn .get() - .map_err(|e| format!("Failed to get database connection: {}", e))?; + .map_err(|e| format!("Failed to get database connection: {e}"))?; use crate::shared::models::system_automations::dsl::*; diesel::delete( system_automations @@ -359,7 +359,7 @@ impl BasicCompiler { .state .conn .get() - .map_err(|e| format!("Failed to get database connection: {}", e))?; + .map_err(|e| format!("Failed to get database connection: {e}"))?; if let Err(e) = execute_set_schedule(&mut conn, cron, &script_name, bot_id) { log::error!( "Failed to schedule SET_SCHEDULE during preprocessing: {}", diff --git a/src/basic/keywords/book.rs b/src/basic/keywords/book.rs index 7c4b3cc0..ff43b085 100644 --- a/src/basic/keywords/book.rs +++ b/src/basic/keywords/book.rs @@ -11,7 +11,7 @@ use uuid::Uuid; // Calendar types - would be from crate::calendar when feature is enabled #[derive(Debug)] pub struct CalendarEngine { - db: crate::shared::utils::DbPool, + _db: crate::shared::utils::DbPool, } #[derive(Debug)] @@ -48,8 +48,9 @@ pub struct RecurrenceRule { } impl CalendarEngine { + #[must_use] pub fn new(db: crate::shared::utils::DbPool) -> Self { - Self { db } + Self { _db: db } } pub async fn create_event( diff --git a/src/basic/keywords/universal_messaging.rs b/src/basic/keywords/universal_messaging.rs index 7e413d83..50f9b5fe 100644 --- a/src/basic/keywords/universal_messaging.rs +++ b/src/basic/keywords/universal_messaging.rs @@ -336,9 +336,8 @@ async fn parse_recipient( // Email address - could be email or Teams if recipient.ends_with("@teams.ms") || recipient.contains("@microsoft") { return Ok(("teams".to_string(), recipient.to_string())); - } else { - return Ok(("email".to_string(), recipient.to_string())); } + return Ok(("email".to_string(), recipient.to_string())); } // Check if it's a known web session @@ -630,10 +629,10 @@ async fn send_web_file( } // Send file URL as message - let message = if !caption.is_empty() { - format!("{}\n[File: {}]", caption, file_url) - } else { + let message = if caption.is_empty() { format!("[File: {}]", file_url) + } else { + format!("{}\n[File: {}]", caption, file_url) }; send_web_message(state, session_id, &message).await diff --git a/src/basic/keywords/weather.rs b/src/basic/keywords/weather.rs index 93762627..76b1f013 100644 --- a/src/basic/keywords/weather.rs +++ b/src/basic/keywords/weather.rs @@ -274,7 +274,7 @@ async fn fetch_openweathermap_forecast( if let Some(list) = data["list"].as_array() { for item in list { let dt_txt = item["dt_txt"].as_str().unwrap_or(""); - let date = dt_txt.split(' ').next().unwrap_or(""); + let forecast_date = dt_txt.split(' ').next().unwrap_or(""); let temp = item["main"]["temp"].as_f64().unwrap_or(0.0) as f32; let description = item["weather"][0]["description"] .as_str() @@ -282,7 +282,7 @@ async fn fetch_openweathermap_forecast( .to_string(); let rain_chance = (item["pop"].as_f64().unwrap_or(0.0) * 100.0) as u32; - let entry = daily_data.entry(date.to_string()).or_insert(( + let entry = daily_data.entry(forecast_date.to_string()).or_insert(( temp, temp, description.clone(), diff --git a/src/basic/mod.rs b/src/basic/mod.rs index 7a7b3bc7..776467a3 100644 --- a/src/basic/mod.rs +++ b/src/basic/mod.rs @@ -42,6 +42,7 @@ pub struct ScriptService { pub engine: Engine, } impl ScriptService { + #[must_use] pub fn new(state: Arc, user: UserSession) -> Self { let mut engine = Engine::new(); engine.set_allow_anonymous_fn(true); @@ -125,9 +126,8 @@ impl ScriptService { result.push(';'); result.push('\n'); continue; - } else { - panic!("NEXT without matching FOR EACH"); } + panic!("NEXT without matching FOR EACH"); } if trimmed == "EXIT FOR" { result.push_str(&" ".repeat(current_indent)); diff --git a/src/core/automation/mod.rs b/src/core/automation/mod.rs index 93c9468a..5e90e40d 100644 --- a/src/core/automation/mod.rs +++ b/src/core/automation/mod.rs @@ -20,6 +20,7 @@ pub struct AutomationService { state: Arc, } impl AutomationService { + #[must_use] pub fn new(state: Arc) -> Self { crate::llm::compact_prompt::start_compact_prompt_scheduler(Arc::clone(&state)); Self { state } diff --git a/src/core/bootstrap/mod.rs b/src/core/bootstrap/mod.rs index 72e93176..9bdcec92 100644 --- a/src/core/bootstrap/mod.rs +++ b/src/core/bootstrap/mod.rs @@ -255,10 +255,10 @@ impl BootstrapManager { } async fn get_drive_client(config: &AppConfig) -> Client { - let endpoint = if !config.drive.server.ends_with('/') { - format!("{}/", config.drive.server) - } else { + let endpoint = if config.drive.server.ends_with('/') { config.drive.server.clone() + } else { + format!("{}/", config.drive.server) }; let base_config = aws_config::defaults(BehaviorVersion::latest()) .endpoint_url(endpoint) @@ -351,10 +351,10 @@ impl BootstrapManager { prefix: &'a str, ) -> std::pin::Pin> + 'a>> { Box::pin(async move { - let _normalized_path = if !local_path.to_string_lossy().ends_with('/') { - format!("{}/", local_path.to_string_lossy()) - } else { + let _normalized_path = if local_path.to_string_lossy().ends_with('/') { local_path.to_string_lossy().to_string() + } else { + format!("{}/", local_path.display()) }; let mut read_dir = tokio::fs::read_dir(local_path).await?; while let Some(entry) = read_dir.next_entry().await? { diff --git a/src/core/bot/channels/whatsapp.rs b/src/core/bot/channels/whatsapp.rs index 100ceb12..ccfeb403 100644 --- a/src/core/bot/channels/whatsapp.rs +++ b/src/core/bot/channels/whatsapp.rs @@ -13,7 +13,7 @@ pub struct WhatsAppAdapter { api_key: String, phone_number_id: String, webhook_verify_token: String, - business_account_id: String, + _business_account_id: String, api_version: String, } @@ -49,7 +49,7 @@ impl WhatsAppAdapter { api_key, phone_number_id, webhook_verify_token, - business_account_id, + _business_account_id: business_account_id, api_version, } } @@ -309,7 +309,9 @@ impl WhatsAppAdapter { Ok(()) } - pub async fn get_business_profile(&self) -> Result> { + pub async fn get_business_profile( + &self, + ) -> Result> { let client = reqwest::Client::new(); let url = format!( @@ -320,7 +322,10 @@ impl WhatsAppAdapter { let response = client .get(&url) .header("Authorization", format!("Bearer {}", self.api_key)) - .query(&[("fields", "about,address,description,email,profile_picture_url,websites,vertical")]) + .query(&[( + "fields", + "about,address,description,email,profile_picture_url,websites,vertical", + )]) .send() .await?; @@ -328,7 +333,8 @@ impl WhatsAppAdapter { let profiles: serde_json::Value = response.json().await?; if let Some(data) = profiles["data"].as_array() { if let Some(first_profile) = data.first() { - let profile: WhatsAppBusinessProfile = serde_json::from_value(first_profile.clone())?; + let profile: WhatsAppBusinessProfile = + serde_json::from_value(first_profile.clone())?; return Ok(profile); } } @@ -443,36 +449,48 @@ impl ChannelAdapter for WhatsAppAdapter { } // Extract message content based on type - let message_type = first_message["type"].as_str().unwrap_or("unknown"); + let message_type = + first_message["type"].as_str().unwrap_or("unknown"); return match message_type { - "text" => { - Ok(first_message["text"]["body"].as_str().map(|s| s.to_string())) - } + "text" => Ok(first_message["text"]["body"] + .as_str() + .map(|s| s.to_string())), "image" | "video" | "audio" | "document" => { let caption = first_message[message_type]["caption"] .as_str() .unwrap_or(""); - Ok(Some(format!("Received {} with caption: {}", message_type, caption))) + Ok(Some(format!( + "Received {} with caption: {}", + message_type, caption + ))) } "location" => { - let lat = first_message["location"]["latitude"].as_f64().unwrap_or(0.0); - let lon = first_message["location"]["longitude"].as_f64().unwrap_or(0.0); + let lat = first_message["location"]["latitude"] + .as_f64() + .unwrap_or(0.0); + let lon = first_message["location"]["longitude"] + .as_f64() + .unwrap_or(0.0); Ok(Some(format!("Location: {}, {}", lat, lon))) } - "button" => { - Ok(first_message["button"]["text"].as_str().map(|s| s.to_string())) - } + "button" => Ok(first_message["button"]["text"] + .as_str() + .map(|s| s.to_string())), "interactive" => { - if let Some(button_reply) = first_message["interactive"]["button_reply"].as_object() { + if let Some(button_reply) = + first_message["interactive"]["button_reply"].as_object() + { Ok(button_reply["id"].as_str().map(|s| s.to_string())) - } else if let Some(list_reply) = first_message["interactive"]["list_reply"].as_object() { + } else if let Some(list_reply) = + first_message["interactive"]["list_reply"].as_object() + { Ok(list_reply["id"].as_str().map(|s| s.to_string())) } else { Ok(None) } } - _ => Ok(None) + _ => Ok(None), }; } } diff --git a/src/core/bot/mod.rs b/src/core/bot/mod.rs index e6e75d5e..63032a64 100644 --- a/src/core/bot/mod.rs +++ b/src/core/bot/mod.rs @@ -61,8 +61,50 @@ impl BotOrchestrator { // ... (All existing methods unchanged) ... pub async fn mount_all_bots(&self) -> Result<(), Box> { - // No-op: bot mounting is handled elsewhere - info!("mount_all_bots called (no-op)"); + info!("Starting to mount all bots"); + + // Get all active bots from database + let bots = { + let mut conn = self.state.conn.get()?; + use crate::shared::models::schema::bots::dsl::*; + use diesel::prelude::*; + + bots.filter(is_active.eq(true)) + .select((id, name)) + .load::<(Uuid, String)>(&mut conn)? + }; + + info!("Found {} active bots to mount", bots.len()); + + // Mount each bot + for (bot_id, bot_name) in bots { + info!("Mounting bot: {} ({})", bot_name, bot_id); + + // Create DriveMonitor for this bot + let drive_monitor = Arc::new(DriveMonitor::new( + self.state.clone(), + format!("bot-{}", bot_id), // bucket name + bot_id, + )); + + // Start monitoring + let monitor_clone = drive_monitor.clone(); + tokio::spawn(async move { + if let Err(e) = monitor_clone.start_monitoring().await { + error!("Failed to start monitoring for bot {}: {}", bot_id, e); + } + }); + + // Store in mounted_bots + self.mounted_bots + .lock() + .await + .insert(bot_id.to_string(), drive_monitor); + + info!("Bot {} mounted successfully", bot_name); + } + + info!("All bots mounted successfully"); Ok(()) } @@ -267,6 +309,33 @@ impl BotOrchestrator { Ok(history) } + pub async fn unmount_bot( + &self, + bot_id: &str, + ) -> Result<(), Box> { + let mut mounted = self.mounted_bots.lock().await; + + if let Some(monitor) = mounted.remove(bot_id) { + // Stop monitoring + monitor.stop_monitoring().await?; + info!("Bot {} unmounted successfully", bot_id); + } else { + warn!("Bot {} was not mounted", bot_id); + } + + Ok(()) + } + + pub async fn get_mounted_bots(&self) -> Vec { + let mounted = self.mounted_bots.lock().await; + mounted.keys().cloned().collect() + } + + pub async fn is_bot_mounted(&self, bot_id: &str) -> bool { + let mounted = self.mounted_bots.lock().await; + mounted.contains_key(bot_id) + } + // ... (Remaining BotOrchestrator methods unchanged) ... } @@ -554,13 +623,83 @@ pub async fn create_bot_handler( /// Mount an existing bot (placeholder implementation) pub async fn mount_bot_handler( - Extension(_state): Extension>, + Extension(state): Extension>, Json(payload): Json>, ) -> impl IntoResponse { let bot_guid = payload.get("bot_guid").cloned().unwrap_or_default(); + + // Parse bot UUID + let bot_uuid = match Uuid::parse_str(&bot_guid) { + Ok(uuid) => uuid, + Err(e) => { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({ "error": format!("Invalid bot UUID: {}", e) })), + ); + } + }; + + // Verify bot exists in database + let bot_name = { + let mut conn = match state.conn.get() { + Ok(conn) => conn, + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ "error": format!("Database error: {}", e) })), + ); + } + }; + + use crate::shared::models::schema::bots::dsl::*; + use diesel::prelude::*; + + match bots + .filter(id.eq(bot_uuid)) + .select(name) + .first::(&mut conn) + { + Ok(n) => n, + Err(_) => { + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({ "error": "Bot not found" })), + ); + } + } + }; + + // Create DriveMonitor for this bot + let drive_monitor = Arc::new(DriveMonitor::new( + state.clone(), + format!("bot-{}", bot_uuid), + bot_uuid, + )); + + // Start monitoring + let monitor_clone = drive_monitor.clone(); + tokio::spawn(async move { + if let Err(e) = monitor_clone.start_monitoring().await { + error!("Failed to start monitoring for bot {}: {}", bot_uuid, e); + } + }); + + // Mount the bot + let orchestrator = BotOrchestrator::new(state.clone()); + orchestrator + .mounted_bots + .lock() + .await + .insert(bot_guid.clone(), drive_monitor); + + info!("Bot {} ({}) mounted successfully", bot_name, bot_guid); + ( StatusCode::OK, - Json(serde_json::json!({ "status": format!("bot '{}' mounted", bot_guid) })), + Json(serde_json::json!({ + "status": format!("bot '{}' mounted", bot_guid), + "bot_name": bot_name + })), ) } diff --git a/src/core/package_manager/installer.rs b/src/core/package_manager/installer.rs index 1b27781c..affa5ec2 100644 --- a/src/core/package_manager/installer.rs +++ b/src/core/package_manager/installer.rs @@ -554,7 +554,7 @@ impl PackageManager { ); } - fn register_botserver(&mut self) { + fn _register_botserver(&mut self) { self.components.insert( "system".to_string(), ComponentConfig { diff --git a/src/core/package_manager/setup/directory_setup.rs b/src/core/package_manager/setup/directory_setup.rs index 54b044ef..94ab04ab 100644 --- a/src/core/package_manager/setup/directory_setup.rs +++ b/src/core/package_manager/setup/directory_setup.rs @@ -450,7 +450,7 @@ impl DirectorySetup { /// Generate Zitadel configuration file pub async fn generate_directory_config(config_path: PathBuf, _db_path: PathBuf) -> Result<()> { let yaml_config = format!( - r#" + r" Log: Level: info @@ -475,7 +475,7 @@ ExternalSecure: false TLS: Enabled: false -"# +" ); fs::write(config_path, yaml_config).await?; diff --git a/src/directory/client.rs b/src/directory/client.rs index 5d215878..4779be1e 100644 --- a/src/directory/client.rs +++ b/src/directory/client.rs @@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; use tokio::sync::RwLock; +#[allow(dead_code)] #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ZitadelConfig { pub issuer_url: String, @@ -15,6 +16,7 @@ pub struct ZitadelConfig { pub service_account_key: Option, } +#[allow(dead_code)] #[derive(Debug, Clone)] pub struct ZitadelClient { config: ZitadelConfig, diff --git a/src/directory/groups.rs b/src/directory/groups.rs index ee66de61..d2ed621c 100644 --- a/src/directory/groups.rs +++ b/src/directory/groups.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + use axum::{ extract::{Path, Query, State}, http::StatusCode, @@ -15,6 +17,7 @@ use crate::shared::state::AppState; // Request/Response Types // ============================================================================ +#[allow(dead_code)] #[derive(Debug, Deserialize)] pub struct CreateGroupRequest { pub name: String, @@ -22,6 +25,7 @@ pub struct CreateGroupRequest { pub members: Option>, } +#[allow(dead_code)] #[derive(Debug, Deserialize)] pub struct UpdateGroupRequest { pub name: Option, @@ -29,6 +33,7 @@ pub struct UpdateGroupRequest { pub members: Option>, } +#[allow(dead_code)] #[derive(Debug, Deserialize)] pub struct GroupQuery { pub page: Option, @@ -36,12 +41,14 @@ pub struct GroupQuery { pub search: Option, } +#[allow(dead_code)] #[derive(Debug, Deserialize)] pub struct AddMemberRequest { pub user_id: String, pub roles: Option>, } +#[allow(dead_code)] #[derive(Debug, Serialize)] pub struct GroupResponse { pub id: String, @@ -53,6 +60,7 @@ pub struct GroupResponse { pub updated_at: Option>, } +#[allow(dead_code)] #[derive(Debug, Serialize)] pub struct GroupListResponse { pub groups: Vec, @@ -61,6 +69,7 @@ pub struct GroupListResponse { pub per_page: u32, } +#[allow(dead_code)] #[derive(Debug, Serialize)] pub struct GroupInfo { pub id: String, @@ -69,6 +78,7 @@ pub struct GroupInfo { pub member_count: usize, } +#[allow(dead_code)] #[derive(Debug, Serialize)] pub struct GroupMemberResponse { pub user_id: String, diff --git a/src/directory/mod.rs b/src/directory/mod.rs index 97286ccb..bc6ac12c 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -16,6 +16,7 @@ pub mod users; use self::client::{ZitadelClient, ZitadelConfig}; +#[allow(dead_code)] pub struct AuthService { client: Arc, } diff --git a/src/directory/router.rs b/src/directory/router.rs index cb22d71d..1aab3a75 100644 --- a/src/directory/router.rs +++ b/src/directory/router.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + use axum::{ routing::{delete, get, post, put}, Router, diff --git a/src/directory/users.rs b/src/directory/users.rs index 3dc04023..6c1826f8 100644 --- a/src/directory/users.rs +++ b/src/directory/users.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + use axum::{ extract::{Path, Query, State}, http::StatusCode, diff --git a/src/drive/document_processing.rs b/src/drive/document_processing.rs index a80014a2..6a08c2c7 100644 --- a/src/drive/document_processing.rs +++ b/src/drive/document_processing.rs @@ -278,7 +278,7 @@ pub async fn convert_document( serde_json::to_string_pretty(&result).unwrap_or_else(|_| "[]".to_string()) } } - ("html", "txt") | ("html", "text") => { + ("html", "txt" | "text") => { source_content .replace("
", "\n") .replace("

", "\n") diff --git a/src/drive/drive_monitor/mod.rs b/src/drive/drive_monitor/mod.rs index 6ad5d2c4..079b6a25 100644 --- a/src/drive/drive_monitor/mod.rs +++ b/src/drive/drive_monitor/mod.rs @@ -3,7 +3,7 @@ use crate::config::ConfigManager; use crate::core::kb::KnowledgeBaseManager; use crate::shared::state::AppState; use aws_sdk_s3::Client; -use log::info; +use log::{error, info}; use std::collections::HashMap; use std::error::Error; use std::path::PathBuf; @@ -14,7 +14,7 @@ use tokio::time::{interval, Duration}; pub struct FileState { pub etag: String, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct DriveMonitor { state: Arc, bucket_name: String, @@ -39,6 +39,51 @@ impl DriveMonitor { is_processing: Arc::new(AtomicBool::new(false)), } } + + pub async fn start_monitoring(&self) -> Result<(), Box> { + info!("Starting DriveMonitor for bot {}", self.bot_id); + + // Set processing flag + self.is_processing + .store(true, std::sync::atomic::Ordering::SeqCst); + + // Initialize file states from storage + self.check_for_changes().await?; + + // Start periodic sync + let self_clone = Arc::new(self.clone()); + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(30)); + + while self_clone + .is_processing + .load(std::sync::atomic::Ordering::SeqCst) + { + interval.tick().await; + + if let Err(e) = self_clone.check_for_changes().await { + error!("Error during sync for bot {}: {}", self_clone.bot_id, e); + } + } + }); + + info!("DriveMonitor started for bot {}", self.bot_id); + Ok(()) + } + + pub async fn stop_monitoring(&self) -> Result<(), Box> { + info!("Stopping DriveMonitor for bot {}", self.bot_id); + + // Clear processing flag + self.is_processing + .store(false, std::sync::atomic::Ordering::SeqCst); + + // Clear file states + self.file_states.write().await.clear(); + + info!("DriveMonitor stopped for bot {}", self.bot_id); + Ok(()) + } pub fn spawn(self: Arc) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { info!( @@ -468,7 +513,6 @@ impl DriveMonitor { for file_path in files_to_process.drain(..) { if let Err(e) = self.download_gbkb_file(client, &file_path).await { log::error!("Failed to download .gbkb file {}: {}", file_path, e); - continue; } } diff --git a/src/drive/mod.rs b/src/drive/mod.rs index 1ae5d19d..18636b9f 100644 --- a/src/drive/mod.rs +++ b/src/drive/mod.rs @@ -25,11 +25,16 @@ use serde::{Deserialize, Serialize}; // use serde_json::json; // Unused import use std::sync::Arc; +pub mod api; pub mod document_processing; pub mod drive_monitor; pub mod file; +pub mod files; pub mod vectordb; +// Note: Most functions are defined locally in this module +// The file module functions are not imported as they're either private or redefined here + // ===== Request/Response Structures ===== #[derive(Debug, Serialize, Deserialize)] diff --git a/src/drive/vectordb.rs b/src/drive/vectordb.rs index f7cee671..a2b4ab13 100644 --- a/src/drive/vectordb.rs +++ b/src/drive/vectordb.rs @@ -53,8 +53,8 @@ pub struct FileSearchResult { /// Per-user drive vector DB manager pub struct UserDriveVectorDB { - user_id: Uuid, - bot_id: Uuid, + _user_id: Uuid, + _bot_id: Uuid, collection_name: String, db_path: PathBuf, #[cfg(feature = "vectordb")] @@ -67,8 +67,8 @@ impl UserDriveVectorDB { let collection_name = format!("drive_{}_{}", bot_id, user_id); Self { - user_id, - bot_id, + _user_id: user_id, + _bot_id: bot_id, collection_name, db_path, #[cfg(feature = "vectordb")] @@ -486,6 +486,7 @@ impl UserDriveVectorDB { } /// File content extractor for different file types +#[derive(Debug)] pub struct FileContentExtractor; impl FileContentExtractor { diff --git a/src/main.rs b/src/main.rs index c31f87a9..89fb111c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -91,6 +91,7 @@ use botserver::core::bot::channels::{VoiceAdapter, WebChannelAdapter}; use botserver::core::bot::websocket_handler; use botserver::core::bot::BotOrchestrator; use botserver::core::config::AppConfig; + // use crate::file::upload_file; // Module doesn't exist #[cfg(feature = "directory")] use crate::directory::auth_handler; @@ -136,7 +137,76 @@ async fn run_axum_server( ) .route("/api/sessions/{session_id}/start", post(start_session)) // WebSocket route - .route("/ws", get(websocket_handler)); + .route("/ws", get(websocket_handler)) + // Drive API routes + .route("/api/drive/list", get(botserver::drive::api::list_files)) + .route( + "/api/drive/upload", + post(botserver::drive::api::upload_file), + ) + .route( + "/api/drive/folder", + post(botserver::drive::api::create_folder), + ) + .route( + "/api/drive/delete", + post(botserver::drive::api::delete_file), + ) + .route("/api/drive/move", post(botserver::drive::api::move_file)) + .route( + "/api/drive/download/*path", + get(botserver::drive::api::download_file), + ) + // Use functions from drive module instead of api module for these + .route("/api/drive/read", get(botserver::drive::read_file)) + .route("/api/drive/write", post(botserver::drive::write_file)) + .route("/api/drive/copy", post(botserver::drive::copy_file)) + .route("/api/drive/search", get(botserver::drive::search_files)) + .route("/api/drive/quota", get(botserver::drive::get_quota)) + .route("/api/drive/recent", get(botserver::drive::recent_files)) + .route( + "/api/drive/favorites", + get(botserver::drive::list_favorites), + ) + .route("/api/drive/share", post(botserver::drive::share_folder)) + .route("/api/drive/shared", get(botserver::drive::list_shared)) + .route( + "/api/drive/permissions", + get(botserver::drive::get_permissions), + ) + .route("/api/drive/sync/status", get(botserver::drive::sync_status)) + .route("/api/drive/sync/start", post(botserver::drive::start_sync)) + .route("/api/drive/sync/stop", post(botserver::drive::stop_sync)) + // Document processing routes + .route( + "/api/documents/merge", + post(botserver::drive::document_processing::merge_documents), + ) + .route( + "/api/documents/convert", + post(botserver::drive::document_processing::convert_document), + ) + .route( + "/api/documents/fill", + post(botserver::drive::document_processing::fill_document), + ) + .route( + "/api/documents/export", + post(botserver::drive::document_processing::export_document), + ) + .route( + "/api/documents/import", + post(botserver::drive::document_processing::import_document), + ) + // Local LLM endpoints + .route( + "/v1/chat/completions", + post(botserver::llm::local::chat_completions_local), + ) + .route( + "/v1/embeddings", + post(botserver::llm::local::embeddings_local), + ); // Add feature-specific routes #[cfg(feature = "directory")] @@ -259,8 +329,8 @@ async fn main() -> std::io::Result<()> { // Start UI thread if not in no-ui mode and not in desktop mode let ui_handle = if !no_ui && !desktop_mode { - let progress_rx = Arc::new(tokio::sync::Mutex::new(progress_rx)); - let state_rx = Arc::new(tokio::sync::Mutex::new(state_rx)); + let _progress_rx = Arc::new(tokio::sync::Mutex::new(progress_rx)); + let _state_rx = Arc::new(tokio::sync::Mutex::new(state_rx)); Some( std::thread::Builder::new() @@ -578,6 +648,98 @@ async fn main() -> std::io::Result<()> { .map(|n| n.get()) .unwrap_or(4); + // Initialize automation service for prompt compaction + let automation_service = botserver::core::automation::AutomationService::new(app_state.clone()); + info!("Automation service initialized with prompt compaction scheduler"); + + // Initialize task scheduler + let task_scheduler = Arc::new(botserver::tasks::scheduler::TaskScheduler::new( + app_state.clone(), + )); + + // Register built-in task handlers + task_scheduler + .register_handler( + "backup".to_string(), + Arc::new(|state: Arc, payload: serde_json::Value| { + Box::pin(async move { + info!("Running backup task with payload: {:?}", payload); + // Backup implementation + Ok(serde_json::json!({"status": "completed"})) + }) + }), + ) + .await; + + task_scheduler + .register_handler( + "cleanup".to_string(), + Arc::new(|state: Arc, payload: serde_json::Value| { + Box::pin(async move { + info!("Running cleanup task with payload: {:?}", payload); + // Cleanup implementation + Ok(serde_json::json!({"status": "completed"})) + }) + }), + ) + .await; + + task_scheduler + .register_handler( + "report".to_string(), + Arc::new(|state: Arc, payload: serde_json::Value| { + Box::pin(async move { + info!("Running report task with payload: {:?}", payload); + // Report generation implementation + Ok(serde_json::json!({"status": "completed"})) + }) + }), + ) + .await; + + // Start the scheduler + task_scheduler.start().await; + info!("Task scheduler started with {} handlers", 3); + + // Initialize LLM cache if Redis is configured + let cached_llm_provider = if let Ok(redis_url) = std::env::var("REDIS_URL") { + info!("Initializing LLM cache with Redis"); + match redis::Client::open(redis_url) { + Ok(cache_client) => { + let cache_config = botserver::llm::cache::CacheConfig { + ttl: 3600, + semantic_matching: false, + similarity_threshold: 0.85, + max_similarity_checks: 100, + key_prefix: "llm_cache".to_string(), + }; + + let cached_provider = Arc::new(botserver::llm::cache::CachedLLMProvider::new( + llm_provider.clone(), + Arc::new(cache_client), + cache_config, + None, + )); + + info!("LLM cache initialized successfully"); + Some(cached_provider as Arc) + } + Err(e) => { + warn!("Failed to connect to Redis for LLM cache: {}", e); + None + } + } + } else { + info!("Redis not configured, using direct LLM provider"); + None + }; + + // Update app_state with cached provider if available + if let Some(cached_provider) = cached_llm_provider { + let mut state = app_state.clone(); + Arc::get_mut(&mut state).map(|s| s.llm_provider = cached_provider); + } + // Mount bots let bot_orchestrator = BotOrchestrator::new(app_state.clone()); tokio::spawn(async move { diff --git a/src/tasks/mod.rs b/src/tasks/mod.rs index 4d3f80a9..a337b792 100644 --- a/src/tasks/mod.rs +++ b/src/tasks/mod.rs @@ -773,7 +773,7 @@ impl TaskEngine { description: template.description.clone(), status: "todo".to_string(), priority: "medium".to_string(), - assignee_id: assignee_id, + assignee_id, reporter_id: Some(Uuid::new_v4()), project_id: None, due_date: None, @@ -868,7 +868,7 @@ impl TaskEngine { Ok(task) } /// Send notification to assignee - async fn notify_assignee( + async fn _notify_assignee( &self, assignee: &str, task: &Task, @@ -1083,7 +1083,7 @@ pub async fn handle_task_status_update( }; match state.task_engine.update_task(id, updates).await { - Ok(updated) => Ok(Json(updated.into())), + Ok(updated_task) => Ok(Json(updated_task.into())), Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), } } @@ -1115,7 +1115,7 @@ pub async fn handle_task_priority_set( }; match state.task_engine.update_task(id, updates).await { - Ok(updated) => Ok(Json(updated.into())), + Ok(updated_task) => Ok(Json(updated_task.into())), Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), } } diff --git a/src/tasks/scheduler.rs b/src/tasks/scheduler.rs index be1e60ad..dc49c35e 100644 --- a/src/tasks/scheduler.rs +++ b/src/tasks/scheduler.rs @@ -41,7 +41,7 @@ pub struct TaskExecution { #[derive(Clone)] pub struct TaskScheduler { - _state: Arc, + state: Arc, running_tasks: Arc>>>, task_registry: Arc>>, scheduled_tasks: Arc>>, @@ -68,7 +68,7 @@ type TaskHandler = Arc< impl TaskScheduler { pub fn new(state: Arc) -> Self { let scheduler = Self { - _state: state, + state: state, running_tasks: Arc::new(RwLock::new(HashMap::new())), task_registry: Arc::new(RwLock::new(HashMap::new())), scheduled_tasks: Arc::new(RwLock::new(Vec::new())), @@ -81,7 +81,7 @@ impl TaskScheduler { fn register_default_handlers(&self) { let registry = self.task_registry.clone(); - let _state = self._state.clone(); + let _state = self.state.clone(); tokio::spawn(async move { let mut handlers = registry.write().await; @@ -328,7 +328,7 @@ impl TaskScheduler { async fn execute_task(&self, mut task: ScheduledTask) { let task_id = task.id; - let state = self._state.clone(); + let state = self.state.clone(); let registry = self.task_registry.clone(); let running_tasks = self.running_tasks.clone();