From f99013872d22dab7b37ab7f23d0a8077085185a4 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Sat, 20 Dec 2025 19:57:57 -0300 Subject: [PATCH] refactor: fix TaskEngine feature gate, thread-safe Extensions with Arc --- src/attendance/llm_assist.rs | 14 - src/basic/keywords/a2a_protocol.rs | 4 - src/basic/keywords/add_bot.rs | 6 - src/basic/keywords/agent_reflection.rs | 2 - src/basic/keywords/auto_task.rs | 2 - src/basic/keywords/autotask_api.rs | 4 - src/basic/keywords/code_sandbox.rs | 4 - src/basic/keywords/crm/attendance.rs | 14 - src/basic/keywords/data_operations.rs | 4 - src/basic/keywords/file_operations.rs | 4 - src/basic/keywords/hear_talk.rs | 6 - src/basic/keywords/intent_compiler.rs | 4 - src/basic/keywords/mcp_client.rs | 6 - src/basic/keywords/model_routing.rs | 4 - src/basic/keywords/play.rs | 2 - src/basic/keywords/procedures.rs | 4 - src/basic/keywords/safety_layer.rs | 8 - src/basic/keywords/user_memory.rs | 4 - src/calendar/mod.rs | 113 ++++- src/core/bot/multimedia.rs | 3 - src/core/config/mod.rs | 1 - src/core/shared/mod.rs | 1 - src/core/shared/schema.rs | 3 - src/core/shared/state.rs | 145 +++++-- src/designer/mod.rs | 2 - src/directory/groups.rs | 4 - src/directory/users.rs | 4 - src/drive/vectordb.rs | 123 +++++- src/email/mod.rs | 83 +++- src/email/stalwart_client.rs | 20 - src/email/stalwart_sync.rs | 6 - src/email/vectordb.rs | 2 - src/lib.rs | 8 +- src/paper/mod.rs | 28 -- src/security/ca.rs | 112 ++++- src/sources/mod.rs | 24 -- src/vector-db/hybrid_search.rs | 11 - src/vector-db/mod.rs | 2 - src/vector-db/vectordb_indexer.rs | 158 ++++++- src/weba/mod.rs | 544 ++++++++++++++++++++++++- 40 files changed, 1166 insertions(+), 327 deletions(-) diff --git a/src/attendance/llm_assist.rs b/src/attendance/llm_assist.rs index 329e5db2..84a3e5b5 100644 --- a/src/attendance/llm_assist.rs +++ b/src/attendance/llm_assist.rs @@ -52,9 +52,7 @@ use std::path::PathBuf; use std::sync::Arc; use uuid::Uuid; -// ============================================================================ // Configuration -// ============================================================================ /// LLM Assist configuration loaded from config.csv #[derive(Debug, Clone, Default)] @@ -155,9 +153,7 @@ impl LlmAssistConfig { } } -// ============================================================================ // Request/Response Types -// ============================================================================ /// Request for generating tips based on customer message #[derive(Debug, Deserialize)] @@ -328,9 +324,7 @@ pub struct Emotion { pub intensity: f32, // 0.0 to 1.0 } -// ============================================================================ // LLM Integration -// ============================================================================ /// Execute LLM generation with the bot's context async fn execute_llm_with_context( @@ -416,9 +410,7 @@ fn get_bot_system_prompt(bot_id: Uuid, work_path: &str) -> String { "You are a professional customer service assistant. Be helpful, empathetic, and solution-oriented. Maintain a friendly but professional tone.".to_string() } -// ============================================================================ // API Handlers -// ============================================================================ /// POST /api/attendance/llm/tips /// Generate contextual tips for the attendant based on customer message @@ -989,9 +981,7 @@ pub async fn get_llm_config( ) } -// ============================================================================ // WhatsApp Attendant Commands -// ============================================================================ /// Process WhatsApp command from attendant pub async fn process_attendant_command( @@ -1562,9 +1552,7 @@ _Portuguese: /fila, /pegar, /transferir, /resolver, /dicas, /polir, /respostas, .to_string() } -// ============================================================================ // Helper Functions -// ============================================================================ /// Get session from database async fn get_session(state: &Arc, session_id: Uuid) -> Result { @@ -2113,9 +2101,7 @@ fn analyze_sentiment_keywords(message: &str) -> SentimentAnalysis { } } -// ============================================================================ // Tests -// ============================================================================ #[cfg(test)] mod tests { diff --git a/src/basic/keywords/a2a_protocol.rs b/src/basic/keywords/a2a_protocol.rs index 7bc82aaf..769a62f3 100644 --- a/src/basic/keywords/a2a_protocol.rs +++ b/src/basic/keywords/a2a_protocol.rs @@ -613,9 +613,7 @@ pub fn get_a2a_messages_keyword(state: Arc, user: UserSession, engine: }); } -// ============================================================================ // Database Operations -// ============================================================================ /// Send an A2A message async fn send_a2a_message( @@ -840,9 +838,7 @@ pub async fn respond_to_a2a_message( .await } -// ============================================================================ // Tests -// ============================================================================ #[cfg(test)] mod tests { diff --git a/src/basic/keywords/add_bot.rs b/src/basic/keywords/add_bot.rs index 0c40864b..ba027e57 100644 --- a/src/basic/keywords/add_bot.rs +++ b/src/basic/keywords/add_bot.rs @@ -579,9 +579,7 @@ fn delegate_to_keyword( Ok(()) } -// ============================================================================ // Database Operations -// ============================================================================ /// Add a bot to the session async fn add_bot_to_session( @@ -785,9 +783,7 @@ async fn delegate_to_bot( Ok(response) } -// ============================================================================ // Multi-Agent Message Processing -// ============================================================================ /// Check if a message matches any bot triggers pub fn match_bot_triggers(message: &str, bots: &[SessionBot]) -> Vec { @@ -857,9 +853,7 @@ pub fn match_tool_triggers(tool_name: &str, bots: &[SessionBot]) -> Vec, } -// ============================================================================= // API HANDLERS -// ============================================================================= /// POST /api/autotask/compile - Compile an intent into an execution plan pub async fn compile_intent_handler( diff --git a/src/basic/keywords/code_sandbox.rs b/src/basic/keywords/code_sandbox.rs index a288d3ed..ed7c6d99 100644 --- a/src/basic/keywords/code_sandbox.rs +++ b/src/basic/keywords/code_sandbox.rs @@ -847,9 +847,7 @@ pub fn run_file_keyword(state: Arc, user: UserSession, engine: &mut En .expect("Failed to register RUN JAVASCRIPT WITH FILE syntax"); } -// ============================================================================ // LXC Container Setup Templates -// ============================================================================ /// Generate LXC configuration for Python sandbox pub fn generate_python_lxc_config() -> String { @@ -919,9 +917,7 @@ lxc.mount.entry = tmpfs tmp tmpfs defaults 0 0 .to_string() } -// ============================================================================ // Tests -// ============================================================================ #[cfg(test)] mod tests { diff --git a/src/basic/keywords/crm/attendance.rs b/src/basic/keywords/crm/attendance.rs index ec905e9d..666ee876 100644 --- a/src/basic/keywords/crm/attendance.rs +++ b/src/basic/keywords/crm/attendance.rs @@ -72,9 +72,7 @@ use rhai::{Array, Dynamic, Engine, Map}; use std::sync::Arc; use uuid::Uuid; -// ============================================================================ // Registration -// ============================================================================ /// Register all CRM attendance keywords pub fn register_attendance_keywords(state: Arc, user: UserSession, engine: &mut Engine) { @@ -107,9 +105,7 @@ pub fn register_attendance_keywords(state: Arc, user: UserSession, eng debug!("CRM attendance keywords registered successfully"); } -// ============================================================================ // Queue Management Keywords -// ============================================================================ /// GET QUEUE - Get current queue status /// @@ -675,9 +671,7 @@ fn set_priority_impl(state: &Arc, session_id: &str, priority: Dynamic) result } -// ============================================================================ // Attendant Management Keywords -// ============================================================================ /// GET ATTENDANTS - List available attendants /// @@ -907,9 +901,7 @@ fn get_attendant_stats_impl(state: &Arc, attendant_id: &str) -> Dynami result } -// ============================================================================ // LLM Assist Keywords -// ============================================================================ /// GET TIPS - Generate AI tips for conversation /// @@ -1331,9 +1323,7 @@ fn analyze_sentiment_impl(_state: &Arc, _session_id: &str, message: &s Dynamic::from(result) } -// ============================================================================ // Customer Journey Keywords -// ============================================================================ /// TAG CONVERSATION - Add tags to conversation /// @@ -1622,9 +1612,7 @@ fn get_customer_history_impl(state: &Arc, user_id: &str) -> Dynamic { result } -// ============================================================================ // Helper Functions -// ============================================================================ fn create_error_result(message: &str) -> Dynamic { let mut result = Map::new(); @@ -1633,9 +1621,7 @@ fn create_error_result(message: &str) -> Dynamic { Dynamic::from(result) } -// ============================================================================ // Tests -// ============================================================================ #[cfg(test)] mod tests { diff --git a/src/basic/keywords/data_operations.rs b/src/basic/keywords/data_operations.rs index 836ff252..5314c3f0 100644 --- a/src/basic/keywords/data_operations.rs +++ b/src/basic/keywords/data_operations.rs @@ -477,9 +477,7 @@ pub fn register_group_by_keyword(_state: Arc, _user: UserSession, engi .unwrap(); } -// ============================================================================ // Implementation Functions -// ============================================================================ /// Execute SAVE - upsert operation fn execute_save( @@ -994,9 +992,7 @@ fn execute_group_by(data: &Dynamic, field: &str) -> Result fn dynamic_to_map(value: &Dynamic) -> HashMap { diff --git a/src/basic/keywords/file_operations.rs b/src/basic/keywords/file_operations.rs index ea6da075..ce8427fa 100644 --- a/src/basic/keywords/file_operations.rs +++ b/src/basic/keywords/file_operations.rs @@ -984,9 +984,7 @@ pub fn register_merge_pdf_keyword(state: Arc, user: UserSession, engin .unwrap(); } -// ============================================================================ // Implementation Functions -// ============================================================================ /// Read file content from .gbdrive async fn execute_read( @@ -1718,9 +1716,7 @@ async fn execute_merge_pdf( }) } -// ============================================================================ // Helper Functions -// ============================================================================ /// Convert Dynamic to JSON Value fn dynamic_to_json(value: &Dynamic) -> Value { diff --git a/src/basic/keywords/hear_talk.rs b/src/basic/keywords/hear_talk.rs index d6d3bbf9..fab23904 100644 --- a/src/basic/keywords/hear_talk.rs +++ b/src/basic/keywords/hear_talk.rs @@ -431,9 +431,7 @@ fn register_hear_as_menu(state: Arc, user: UserSession, engine: &mut E .unwrap(); } -// ============================================================================ // Validation Functions -// ============================================================================ /// Validate input based on type pub fn validate_input(input: &str, input_type: &InputType) -> ValidationResult { @@ -1166,9 +1164,7 @@ fn validate_menu(input: &str, options: &[String]) -> ValidationResult { ValidationResult::invalid(format!("Please select one of: {}", options.join(", "))) } -// ============================================================================ // TALK Keyword -// ============================================================================ pub async fn execute_talk( state: Arc, @@ -1253,9 +1249,7 @@ pub fn talk_keyword(state: Arc, user: UserSession, engine: &mut Engine .unwrap(); } -// ============================================================================ // Input Processing (called when user sends message) -// ============================================================================ /// Process user input with validation pub async fn process_hear_input( diff --git a/src/basic/keywords/intent_compiler.rs b/src/basic/keywords/intent_compiler.rs index cd0d1c4b..4449f4f0 100644 --- a/src/basic/keywords/intent_compiler.rs +++ b/src/basic/keywords/intent_compiler.rs @@ -40,9 +40,7 @@ use std::collections::HashMap; use std::sync::Arc; use uuid::Uuid; -// ============================================================================ // CORE DATA STRUCTURES -// ============================================================================ /// Represents a compiled intent - the result of LLM analysis #[derive(Debug, Clone, Serialize, Deserialize)] @@ -341,9 +339,7 @@ impl Default for ResourceEstimate { } } -// ============================================================================ // INTENT COMPILER ENGINE -// ============================================================================ /// The main Intent Compiler engine pub struct IntentCompiler { diff --git a/src/basic/keywords/mcp_client.rs b/src/basic/keywords/mcp_client.rs index fdc7eb33..49a37801 100644 --- a/src/basic/keywords/mcp_client.rs +++ b/src/basic/keywords/mcp_client.rs @@ -46,9 +46,7 @@ use std::sync::Arc; use std::time::Duration; use uuid::Uuid; -// ============================================================================ // MCP DATA STRUCTURES -// ============================================================================ /// Represents a registered MCP server #[derive(Debug, Clone, Serialize, Deserialize)] @@ -379,9 +377,7 @@ impl Default for HealthStatus { } } -// ============================================================================ // MCP REQUEST/RESPONSE -// ============================================================================ /// MCP tool invocation request #[derive(Debug, Clone, Serialize, Deserialize)] @@ -444,9 +440,7 @@ pub struct McpResponseMetadata { pub rate_limit_reset: Option>, } -// ============================================================================ // MCP CLIENT -// ============================================================================ /// The MCP Client for managing server connections and tool invocations pub struct McpClient { diff --git a/src/basic/keywords/model_routing.rs b/src/basic/keywords/model_routing.rs index 123f97b1..42abfd1d 100644 --- a/src/basic/keywords/model_routing.rs +++ b/src/basic/keywords/model_routing.rs @@ -375,9 +375,7 @@ pub fn list_models_keyword(state: Arc, user: UserSession, engine: &mut }); } -// ============================================================================ // Database Operations -// ============================================================================ /// Set the model for a session async fn set_session_model( @@ -553,9 +551,7 @@ pub fn get_session_routing_strategy(state: &AppState, session_id: Uuid) -> Routi } } -// ============================================================================ // Tests -// ============================================================================ #[cfg(test)] mod tests { diff --git a/src/basic/keywords/play.rs b/src/basic/keywords/play.rs index cac0de1f..b9cff9b1 100644 --- a/src/basic/keywords/play.rs +++ b/src/basic/keywords/play.rs @@ -453,9 +453,7 @@ fn resume_keyword( Ok(()) } -// ============================================================================ // Core Functions -// ============================================================================ /// Execute the PLAY command async fn execute_play( diff --git a/src/basic/keywords/procedures.rs b/src/basic/keywords/procedures.rs index 6e2f9a3c..32c9cc6a 100644 --- a/src/basic/keywords/procedures.rs +++ b/src/basic/keywords/procedures.rs @@ -391,10 +391,8 @@ fn register_return_keyword(engine: &mut Engine) { .expect("Failed to register RETURN syntax"); } -// ============================================================================ // PREPROCESSING FUNCTIONS // These run at compile time to extract SUB/FUNCTION definitions -// ============================================================================ /// Preprocess SUB definitions from source code /// Converts SUB/END SUB blocks into callable units @@ -674,9 +672,7 @@ pub fn get_procedure(name: &str) -> Option { .cloned() } -// ============================================================================ // TESTS -// ============================================================================ #[cfg(test)] mod tests { diff --git a/src/basic/keywords/safety_layer.rs b/src/basic/keywords/safety_layer.rs index 8aaf6e8b..ce65a663 100644 --- a/src/basic/keywords/safety_layer.rs +++ b/src/basic/keywords/safety_layer.rs @@ -34,9 +34,7 @@ use std::collections::HashMap; use std::sync::Arc; use uuid::Uuid; -// ============================================================================ // CONSTRAINT DATA STRUCTURES -// ============================================================================ /// Constraint check result #[derive(Debug, Clone, Serialize, Deserialize)] @@ -182,9 +180,7 @@ pub struct Constraint { pub bot_id: String, } -// ============================================================================ // SIMULATION DATA STRUCTURES -// ============================================================================ /// Result of impact simulation #[derive(Debug, Clone, Serialize, Deserialize)] @@ -530,9 +526,7 @@ pub enum RecommendationType { Custom(String), } -// ============================================================================ // AUDIT TRAIL DATA STRUCTURES -// ============================================================================ /// Audit log entry #[derive(Debug, Clone, Serialize, Deserialize)] @@ -743,9 +737,7 @@ pub struct RelatedEntity { pub relationship: String, } -// ============================================================================ // SAFETY LAYER ENGINE -// ============================================================================ /// The Safety Layer engine pub struct SafetyLayer { diff --git a/src/basic/keywords/user_memory.rs b/src/basic/keywords/user_memory.rs index 242691f9..7ce8e433 100644 --- a/src/basic/keywords/user_memory.rs +++ b/src/basic/keywords/user_memory.rs @@ -160,9 +160,7 @@ pub fn clear_user_memory_keyword(state: Arc, user: UserSession, engine .expect("Failed to register CLEAR USER MEMORY syntax"); } -// ============================================================================ // Database Operations -// ============================================================================ /// Async function to set user memory async fn set_user_memory_async( @@ -293,9 +291,7 @@ async fn clear_user_memory_async(state: &AppState, user_id: Uuid) -> Result<(), Ok(()) } -// ============================================================================ // Tests -// ============================================================================ #[cfg(test)] mod tests { diff --git a/src/calendar/mod.rs b/src/calendar/mod.rs index 3a0d05c8..36702528 100644 --- a/src/calendar/mod.rs +++ b/src/calendar/mod.rs @@ -1,7 +1,3 @@ -//! Calendar Module -//! -//! Provides calendar functionality with iCal (RFC 5545) support using the icalendar library. - use axum::{ extract::{Path, State}, http::StatusCode, @@ -12,12 +8,38 @@ use axum::{ use chrono::{DateTime, Utc}; use icalendar::{Calendar, Component, Event as IcalEvent, EventLike, Property}; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::sync::Arc; +use tokio::sync::RwLock; use uuid::Uuid; use crate::core::urls::ApiUrls; use crate::shared::state::AppState; +pub struct CalendarState { + events: RwLock>, +} + +impl CalendarState { + pub fn new() -> Self { + Self { + events: RwLock::new(HashMap::new()), + } + } +} + +impl Default for CalendarState { + fn default() -> Self { + Self::new() + } +} + +static CALENDAR_STATE: std::sync::OnceLock = std::sync::OnceLock::new(); + +fn get_calendar_state() -> &'static CalendarState { + CALENDAR_STATE.get_or_init(CalendarState::new) +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CalendarEvent { pub id: Uuid, @@ -240,13 +262,18 @@ impl CalendarEngine { } } -// HTTP Handlers pub async fn list_events( State(_state): State>, axum::extract::Query(_query): axum::extract::Query, ) -> Json> { - Json(vec![]) + let calendar_state = get_calendar_state(); + let events = calendar_state.events.read().await; + + let mut result: Vec = events.values().cloned().collect(); + result.sort_by(|a, b| a.start_time.cmp(&b.start_time)); + + Json(result) } /// List calendars - JSON API for services @@ -307,31 +334,87 @@ pub async fn upcoming_events(State(_state): State>) -> axum::respo pub async fn get_event( State(_state): State>, - Path(_id): Path, + Path(id): Path, ) -> Result, StatusCode> { - Err(StatusCode::NOT_FOUND) + let calendar_state = get_calendar_state(); + let events = calendar_state.events.read().await; + + events + .get(&id) + .cloned() + .map(Json) + .ok_or(StatusCode::NOT_FOUND) } pub async fn create_event( State(_state): State>, - Json(_input): Json, + Json(input): Json, ) -> Result, StatusCode> { - Err(StatusCode::NOT_IMPLEMENTED) + let calendar_state = get_calendar_state(); + let now = Utc::now(); + + let event = CalendarEvent { + id: Uuid::new_v4(), + title: input.title, + description: input.description, + start_time: input.start_time, + end_time: input.end_time, + location: input.location, + attendees: input.attendees, + organizer: input.organizer, + reminder_minutes: input.reminder_minutes, + recurrence: input.recurrence, + created_at: now, + updated_at: now, + }; + + let mut events = calendar_state.events.write().await; + events.insert(event.id, event.clone()); + + log::info!("Created calendar event: {} ({})", event.title, event.id); + + Ok(Json(event)) } pub async fn update_event( State(_state): State>, - Path(_id): Path, - Json(_input): Json, + Path(id): Path, + Json(input): Json, ) -> Result, StatusCode> { - Err(StatusCode::NOT_IMPLEMENTED) + let calendar_state = get_calendar_state(); + let mut events = calendar_state.events.write().await; + + let event = events.get_mut(&id).ok_or(StatusCode::NOT_FOUND)?; + + event.title = input.title; + event.description = input.description; + event.start_time = input.start_time; + event.end_time = input.end_time; + event.location = input.location; + event.attendees = input.attendees; + event.organizer = input.organizer; + event.reminder_minutes = input.reminder_minutes; + event.recurrence = input.recurrence; + event.updated_at = Utc::now(); + + log::info!("Updated calendar event: {} ({})", event.title, event.id); + + Ok(Json(event.clone())) } pub async fn delete_event( State(_state): State>, - Path(_id): Path, + Path(id): Path, ) -> StatusCode { - StatusCode::NOT_IMPLEMENTED + let calendar_state = get_calendar_state(); + let mut events = calendar_state.events.write().await; + + if events.remove(&id).is_some() { + log::info!("Deleted calendar event: {}", id); + StatusCode::NO_CONTENT + } else { + StatusCode::NOT_FOUND + } } pub async fn export_ical(State(_state): State>) -> impl IntoResponse { diff --git a/src/core/bot/multimedia.rs b/src/core/bot/multimedia.rs index 4e415c53..2f3dc48c 100644 --- a/src/core/bot/multimedia.rs +++ b/src/core/bot/multimedia.rs @@ -438,9 +438,6 @@ impl UserMessageMultimedia for UserMessage { } } -// ============================================================================ -// REST API Handlers -// ============================================================================ use crate::shared::state::AppState; use axum::{ diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index 2269ff3d..96d22a27 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -12,7 +12,6 @@ use diesel::r2d2::{ConnectionManager, PooledConnection}; use std::collections::HashMap; use uuid::Uuid; -// Type alias for backward compatibility pub type Config = AppConfig; #[derive(Clone, Debug)] diff --git a/src/core/shared/mod.rs b/src/core/shared/mod.rs index 3eae45d7..e4dc0343 100644 --- a/src/core/shared/mod.rs +++ b/src/core/shared/mod.rs @@ -43,7 +43,6 @@ pub use models::{ Task, TriggerKind, User, UserLoginToken, UserPreference, UserSession, }; -// Database utilities pub use utils::{create_conn, DbPool}; /// Prelude module for convenient imports diff --git a/src/core/shared/schema.rs b/src/core/shared/schema.rs index dd20e9c2..7a264d3f 100644 --- a/src/core/shared/schema.rs +++ b/src/core/shared/schema.rs @@ -280,9 +280,7 @@ diesel::table! { } } -// ============================================================================ // Enterprise Email Tables (6.1.0_enterprise_suite migration) -// ============================================================================ diesel::table! { global_email_signatures (id) { @@ -452,7 +450,6 @@ diesel::table! { } } -// Allow tables to be joined diesel::allow_tables_to_appear_in_same_query!( organizations, bots, diff --git a/src/core/shared/state.rs b/src/core/shared/state.rs index 34e6c13e..ae12b6ae 100644 --- a/src/core/shared/state.rs +++ b/src/core/shared/state.rs @@ -25,9 +25,8 @@ use redis::Client as RedisClient; use std::any::{Any, TypeId}; use std::collections::HashMap; use std::sync::Arc; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::{broadcast, mpsc, RwLock}; -/// Notification sent to attendants via WebSocket/broadcast #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct AttendantNotification { #[serde(rename = "type")] @@ -43,65 +42,64 @@ pub struct AttendantNotification { pub priority: i32, } -/// Type-erased extension storage for AppState -#[derive(Default)] +#[derive(Clone, Default)] pub struct Extensions { - map: HashMap>, + map: Arc>>>, } impl Extensions { pub fn new() -> Self { Self { - map: HashMap::new(), + map: Arc::new(RwLock::new(HashMap::new())), } } - /// Insert a value into the extensions - pub fn insert(&mut self, value: T) { - self.map.insert(TypeId::of::(), Box::new(value)); + pub async fn insert(&self, value: T) { + let mut map = self.map.write().await; + map.insert(TypeId::of::(), Arc::new(value)); } - /// Get a reference to a value from the extensions - pub fn get(&self) -> Option<&T> { - self.map - .get(&TypeId::of::()) - .and_then(|boxed| boxed.downcast_ref::()) + pub fn insert_blocking(&self, value: T) { + let map = self.map.clone(); + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(async { + let mut guard = map.write().await; + guard.insert(TypeId::of::(), Arc::new(value)); + }); + }); } - /// Get a mutable reference to a value from the extensions - pub fn get_mut(&mut self) -> Option<&mut T> { - self.map - .get_mut(&TypeId::of::()) - .and_then(|boxed| boxed.downcast_mut::()) + pub async fn get(&self) -> Option> { + let map = self.map.read().await; + map.get(&TypeId::of::()) + .and_then(|boxed| Arc::clone(boxed).downcast::().ok()) } - /// Check if a value of type T exists - pub fn contains(&self) -> bool { - self.map.contains_key(&TypeId::of::()) + pub async fn contains(&self) -> bool { + let map = self.map.read().await; + map.contains_key(&TypeId::of::()) } - /// Remove a value from the extensions - pub fn remove(&mut self) -> Option { - self.map - .remove(&TypeId::of::()) + pub async fn remove(&self) -> Option> { + let mut map = self.map.write().await; + map.remove(&TypeId::of::()) .and_then(|boxed| boxed.downcast::().ok()) - .map(|boxed| *boxed) } -} -impl Clone for Extensions { - fn clone(&self) -> Self { - // Extensions cannot be cloned deeply, so we create an empty one - // This is a limitation - extensions should be Arc-wrapped if sharing is needed - Self::new() + pub async fn len(&self) -> usize { + let map = self.map.read().await; + map.len() + } + + pub async fn is_empty(&self) -> bool { + let map = self.map.read().await; + map.is_empty() } } impl std::fmt::Debug for Extensions { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Extensions") - .field("count", &self.map.len()) - .finish() + f.debug_struct("Extensions").finish_non_exhaustive() } } @@ -128,12 +126,10 @@ pub struct AppState { pub voice_adapter: Arc, pub kb_manager: Option>, pub task_engine: Arc, - /// Type-erased extension storage for web handlers and other components pub extensions: Extensions, - /// Broadcast channel for attendant notifications (human handoff) - /// Used to notify attendants of new messages from customers pub attendant_broadcast: Option>, } + impl Clone for AppState { fn clone(&self) -> Self { Self { @@ -179,7 +175,7 @@ impl std::fmt::Debug for AppState { debug .field("bucket_name", &self.bucket_name) - .field("config", &self.config) + .field("config", &self.config.is_some()) .field("conn", &"DbPool") .field("database_url", &"[REDACTED]") .field("session_manager", &"Arc>") @@ -197,19 +193,19 @@ impl std::fmt::Debug for AppState { .field("response_channels", &"Arc>") .field("web_adapter", &self.web_adapter) .field("voice_adapter", &self.voice_adapter) + .field("kb_manager", &self.kb_manager.is_some()) + .field("task_engine", &"Arc") .field("extensions", &self.extensions) + .field("attendant_broadcast", &self.attendant_broadcast.is_some()) .finish() } } -/// Default implementation for AppState - ONLY FOR TESTS -/// This will panic if Vault is not configured, so it must only be used in test contexts. #[cfg(test)] impl Default for AppState { fn default() -> Self { - // This default is only for tests. In production, use the full initialization. let database_url = crate::shared::utils::get_database_url_sync() - .expect("AppState::default() requires Vault to be configured. This should only be used in tests."); + .expect("AppState::default() requires Vault to be configured"); let manager = ConnectionManager::::new(&database_url); let pool = Pool::builder() @@ -251,3 +247,64 @@ impl Default for AppState { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_extensions_insert_and_get() { + let ext = Extensions::new(); + ext.insert(42i32).await; + ext.insert("hello".to_string()).await; + + let num = ext.get::().await; + assert!(num.is_some()); + assert_eq!(*num.unwrap(), 42); + + let text = ext.get::().await; + assert!(text.is_some()); + assert_eq!(&*text.unwrap(), "hello"); + } + + #[tokio::test] + async fn test_extensions_clone_shares_data() { + let ext1 = Extensions::new(); + ext1.insert(100u64).await; + + let ext2 = ext1.clone(); + + let val = ext2.get::().await; + assert!(val.is_some()); + assert_eq!(*val.unwrap(), 100); + + ext2.insert(200u32).await; + + let val2 = ext1.get::().await; + assert!(val2.is_some()); + assert_eq!(*val2.unwrap(), 200); + } + + #[tokio::test] + async fn test_extensions_remove() { + let ext = Extensions::new(); + ext.insert(42i32).await; + + assert!(ext.contains::().await); + assert_eq!(ext.len().await, 1); + + let removed = ext.remove::().await; + assert!(removed.is_some()); + assert_eq!(*removed.unwrap(), 42); + + assert!(!ext.contains::().await); + assert!(ext.is_empty().await); + } + + #[tokio::test] + async fn test_extensions_get_nonexistent() { + let ext = Extensions::new(); + let val = ext.get::().await; + assert!(val.is_none()); + } +} diff --git a/src/designer/mod.rs b/src/designer/mod.rs index 40fac8d0..fab57346 100644 --- a/src/designer/mod.rs +++ b/src/designer/mod.rs @@ -509,7 +509,6 @@ pub async fn handle_get_dialog( } } -// BASIC Code Validation fn validate_basic_code(code: &str) -> ValidationResult { let mut errors = Vec::new(); @@ -659,7 +658,6 @@ fn get_default_dialog_content() -> String { .to_string() } -// Node parsing and HTML generation struct DialogNode { id: String, diff --git a/src/directory/groups.rs b/src/directory/groups.rs index 15bff08f..c29e1187 100644 --- a/src/directory/groups.rs +++ b/src/directory/groups.rs @@ -12,9 +12,7 @@ use uuid::Uuid; use crate::shared::state::AppState; -// ============================================================================ // Request/Response Types -// ============================================================================ #[derive(Debug, Deserialize)] pub struct CreateGroupRequest { @@ -91,9 +89,7 @@ pub struct ErrorResponse { pub details: Option, } -// ============================================================================ // Group Management Handlers -// ============================================================================ /// Create a new organization/group in Zitadel pub async fn create_group( diff --git a/src/directory/users.rs b/src/directory/users.rs index b10d3715..027726d0 100644 --- a/src/directory/users.rs +++ b/src/directory/users.rs @@ -12,9 +12,7 @@ use std::sync::Arc; use crate::shared::state::AppState; -// ============================================================================ // Request/Response Types -// ============================================================================ #[derive(Debug, Deserialize)] pub struct CreateUserRequest { @@ -78,9 +76,7 @@ pub struct ErrorResponse { pub details: Option, } -// ============================================================================ // User Management Handlers -// ============================================================================ /// Create a new user in Zitadel pub async fn create_user( diff --git a/src/drive/vectordb.rs b/src/drive/vectordb.rs index 163c27b5..56a581b0 100644 --- a/src/drive/vectordb.rs +++ b/src/drive/vectordb.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use calamine::Reader; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use std::path::PathBuf; @@ -530,25 +531,22 @@ impl FileContentExtractor { // PDF files "application/pdf" => { - log::info!("PDF extraction requested for {:?}", file_path); - // Return placeholder for PDF files - requires pdf-extract crate - Ok(format!("[PDF content from {:?}]", file_path)) + log::info!("PDF extraction for {:?}", file_path); + Self::extract_pdf_text(file_path).await } // Microsoft Word documents "application/vnd.openxmlformats-officedocument.wordprocessingml.document" | "application/msword" => { - log::info!("Word document extraction requested for {:?}", file_path); - // Return placeholder for Word documents - requires docx-rs crate - Ok(format!("[Word document content from {:?}]", file_path)) + log::info!("Word document extraction for {:?}", file_path); + Self::extract_docx_text(file_path).await } // Excel/Spreadsheet files "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" | "application/vnd.ms-excel" => { - log::info!("Spreadsheet extraction requested for {:?}", file_path); - // Return placeholder for spreadsheets - requires calamine crate - Ok(format!("[Spreadsheet content from {:?}]", file_path)) + log::info!("Spreadsheet extraction for {:?}", file_path); + Self::extract_xlsx_text(file_path).await } // JSON files @@ -590,6 +588,113 @@ impl FileContentExtractor { } } + async fn extract_pdf_text(file_path: &PathBuf) -> Result { + let bytes = fs::read(file_path).await?; + + match pdf_extract::extract_text_from_mem(&bytes) { + Ok(text) => { + let cleaned = text + .lines() + .map(|l| l.trim()) + .filter(|l| !l.is_empty()) + .collect::>() + .join("\n"); + Ok(cleaned) + } + Err(e) => { + log::warn!("PDF extraction failed for {:?}: {}", file_path, e); + Ok(String::new()) + } + } + } + + async fn extract_docx_text(file_path: &PathBuf) -> Result { + let path = file_path.clone(); + + let result = tokio::task::spawn_blocking(move || { + let file = std::fs::File::open(&path)?; + let mut archive = zip::ZipArchive::new(file)?; + + let mut content = String::new(); + + if let Ok(mut document) = archive.by_name("word/document.xml") { + let mut xml_content = String::new(); + std::io::Read::read_to_string(&mut document, &mut xml_content)?; + + let text_regex = regex::Regex::new(r"]*>([^<]*)").unwrap(); + + content = text_regex + .captures_iter(&xml_content) + .filter_map(|c| c.get(1).map(|m| m.as_str())) + .collect::>() + .join(""); + + content = content.split("").collect::>().join("\n"); + } + + Ok::(content) + }) + .await?; + + match result { + Ok(text) => Ok(text), + Err(e) => { + log::warn!("DOCX extraction failed for {:?}: {}", file_path, e); + Ok(String::new()) + } + } + } + + async fn extract_xlsx_text(file_path: &PathBuf) -> Result { + let path = file_path.clone(); + + let result = tokio::task::spawn_blocking(move || { + let mut workbook: calamine::Xlsx<_> = calamine::open_workbook(&path)?; + let mut content = String::new(); + + for sheet_name in workbook.sheet_names().to_vec() { + if let Ok(range) = workbook.worksheet_range(&sheet_name) { + content.push_str(&format!("=== {} ===\n", sheet_name)); + + for row in range.rows() { + let row_text: Vec = row + .iter() + .map(|cell| match cell { + calamine::Data::Empty => String::new(), + calamine::Data::String(s) => s.clone(), + calamine::Data::Float(f) => f.to_string(), + calamine::Data::Int(i) => i.to_string(), + calamine::Data::Bool(b) => b.to_string(), + calamine::Data::Error(e) => format!("{:?}", e), + calamine::Data::DateTime(dt) => dt.to_string(), + calamine::Data::DateTimeIso(s) => s.clone(), + calamine::Data::DurationIso(s) => s.clone(), + }) + .collect(); + + let line = row_text.join("\t"); + if !line.trim().is_empty() { + content.push_str(&line); + content.push('\n'); + } + } + content.push('\n'); + } + } + + Ok::(content) + }) + .await?; + + match result { + Ok(text) => Ok(text), + Err(e) => { + log::warn!("XLSX extraction failed for {:?}: {}", file_path, e); + Ok(String::new()) + } + } + } + /// Determine if file should be indexed based on type pub fn should_index(mime_type: &str, file_size: u64) -> bool { // Skip very large files (> 10MB) diff --git a/src/email/mod.rs b/src/email/mod.rs index 209e2ba7..123b6af0 100644 --- a/src/email/mod.rs +++ b/src/email/mod.rs @@ -2249,17 +2249,78 @@ pub async fn search_emails_htmx( "#.to_string()); } - // For now, return a placeholder - in production this would search the database - axum::response::Html(format!(r#" -
- - - - -

Searching for "{}"

-

No results found. Try different keywords.

-
- "#, query)) + let search_term = format!("%{}%", query.to_lowercase()); + + let conn = match state.conn.get() { + Ok(c) => c, + Err(_) => { + return axum::response::Html(r#" +
+

Database connection error

+
+ "#.to_string()); + } + }; + + let search_query = format!( + "SELECT id, subject, from_address, to_addresses, body_text, received_at + FROM emails + WHERE LOWER(subject) LIKE $1 + OR LOWER(from_address) LIKE $1 + OR LOWER(body_text) LIKE $1 + ORDER BY received_at DESC + LIMIT 50" + ); + + let results: Vec<(String, String, String, String, Option, DateTime)> = + match diesel::sql_query(&search_query) + .bind::(&search_term) + .load(&conn) + { + Ok(r) => r.into_iter().map(|row: (String, String, String, String, Option, DateTime)| row).collect(), + Err(e) => { + warn!("Email search query failed: {}", e); + Vec::new() + } + }; + + if results.is_empty() { + return axum::response::Html(format!(r#" +
+ + + + +

No results for "{}"

+

Try different keywords or check your spelling.

+
+ "#, query)); + } + + let mut html = String::from(r#"
"#); + html.push_str(&format!(r#"
Found {} result(s) for "{}"
"#, results.len(), query)); + + for (id, subject, from, _to, body, date) in results { + let preview = body + .as_deref() + .unwrap_or("") + .chars() + .take(100) + .collect::(); + let formatted_date = date.format("%b %d, %Y").to_string(); + + html.push_str(&format!(r#" + + "#, id, from, subject, preview, formatted_date)); + } + + html.push_str("
"); + axum::response::Html(html) } /// Save auto-responder settings diff --git a/src/email/stalwart_client.rs b/src/email/stalwart_client.rs index be37ae75..89795e98 100644 --- a/src/email/stalwart_client.rs +++ b/src/email/stalwart_client.rs @@ -26,9 +26,7 @@ use serde_json::{json, Value}; use std::time::Duration; use tracing::{debug, error, info, warn}; -// ============================================================================ // Configuration -// ============================================================================ /// Default timeout for API requests in seconds const DEFAULT_TIMEOUT_SECS: u64 = 30; @@ -39,9 +37,7 @@ pub const DEFAULT_QUEUE_POLL_INTERVAL_SECS: u64 = 30; /// Default poll interval for metrics in seconds pub const DEFAULT_METRICS_POLL_INTERVAL_SECS: u64 = 60; -// ============================================================================ // Data Types - Queue Monitoring -// ============================================================================ /// Represents the overall queue status #[derive(Debug, Clone, Serialize, Deserialize)] @@ -107,9 +103,7 @@ struct QueueListResponse { items: Vec, } -// ============================================================================ // Data Types - Principal/Account Management -// ============================================================================ /// Types of principals in Stalwart #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -204,9 +198,7 @@ impl AccountUpdate { } } -// ============================================================================ // Data Types - Auto-Responder & Email Rules -// ============================================================================ /// Configuration for an auto-responder (out of office) #[derive(Debug, Clone, Serialize, Deserialize)] @@ -305,9 +297,7 @@ pub struct RuleAction { pub value: String, } -// ============================================================================ // Data Types - Telemetry & Monitoring -// ============================================================================ /// Server metrics from Stalwart #[derive(Debug, Clone, Serialize, Deserialize, Default)] @@ -409,9 +399,7 @@ pub struct TraceList { pub items: Vec, } -// ============================================================================ // Data Types - Reports -// ============================================================================ /// A DMARC/TLS/ARF report #[derive(Debug, Clone, Serialize, Deserialize)] @@ -444,9 +432,7 @@ pub struct ReportList { pub items: Vec, } -// ============================================================================ // Data Types - Spam Filter -// ============================================================================ /// Request to classify a message for spam #[derive(Debug, Clone, Serialize, Deserialize)] @@ -496,9 +482,7 @@ pub struct SpamTest { pub description: Option, } -// ============================================================================ // API Response Wrapper -// ============================================================================ /// Generic API response wrapper #[derive(Debug, Deserialize)] @@ -509,9 +493,7 @@ enum ApiResponse { Error { error: String }, } -// ============================================================================ // Stalwart Client Implementation -// ============================================================================ /// Client for interacting with Stalwart Mail Server's Management API #[derive(Debug, Clone)] @@ -1298,9 +1280,7 @@ impl StalwartClient { } } -// ============================================================================ // Tests -// ============================================================================ #[cfg(test)] mod tests { diff --git a/src/email/stalwart_sync.rs b/src/email/stalwart_sync.rs index 8efa27da..504aa048 100644 --- a/src/email/stalwart_sync.rs +++ b/src/email/stalwart_sync.rs @@ -29,10 +29,8 @@ use std::sync::Arc; use tracing::{info, warn}; use uuid::Uuid; -// ============================================================================ // Data Transfer Objects (matching 6.1.0_enterprise_suite migration) // These are simplified DTOs for the sync layer - not direct ORM mappings -// ============================================================================ /// Distribution list DTO #[derive(Debug, Clone, Serialize, Deserialize)] @@ -141,9 +139,7 @@ pub struct SharedMailboxMemberDto { pub added_at: DateTime, } -// ============================================================================ // Sync Service -// ============================================================================ /// Service for synchronizing data between General Bots and Stalwart /// @@ -456,9 +452,7 @@ impl StalwartSyncService { } } -// ============================================================================ // Tests -// ============================================================================ #[cfg(test)] mod tests { diff --git a/src/email/vectordb.rs b/src/email/vectordb.rs index e84a4e91..670e6692 100644 --- a/src/email/vectordb.rs +++ b/src/email/vectordb.rs @@ -381,8 +381,6 @@ impl EmailEmbeddingGenerator { &text }; - // Call LLM embedding endpoint - // This is a placeholder - implement actual LLM call self.generate_text_embedding(text).await } diff --git a/src/lib.rs b/src/lib.rs index 29b0adb3..1673d791 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,20 +1,16 @@ -// Core modules (always included) pub mod basic; pub mod core; pub mod multimodal; pub mod security; -// Suite application modules (gap analysis implementations) pub mod analytics; pub mod designer; pub mod paper; pub mod research; pub mod sources; -// Re-export shared from core pub use core::shared; -// Bootstrap progress tracking #[derive(Debug, Clone)] pub enum BootstrapProgress { StartingBootstrap, @@ -27,7 +23,6 @@ pub enum BootstrapProgress { BootstrapError(String), } -// Re-exports from core (always included) pub use core::automation; pub use core::bootstrap; pub use core::bot; @@ -35,10 +30,8 @@ pub use core::config; pub use core::package_manager; pub use core::session; -// Re-exports from security pub use security::{get_secure_port, SecurityConfig, SecurityManager}; -// Feature-gated modules #[cfg(feature = "attendance")] pub mod attendance; @@ -81,6 +74,7 @@ pub mod nvidia; #[cfg(feature = "tasks")] pub mod tasks; +#[cfg(feature = "tasks")] pub use tasks::TaskEngine; #[cfg(feature = "vectordb")] diff --git a/src/paper/mod.rs b/src/paper/mod.rs index c5984a32..698e16df 100644 --- a/src/paper/mod.rs +++ b/src/paper/mod.rs @@ -25,9 +25,6 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; use uuid::Uuid; -// ============================================================================ -// Data Structures -// ============================================================================ #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Document { @@ -90,9 +87,6 @@ pub struct UserRow { pub username: String, } -// ============================================================================ -// Route Configuration -// ============================================================================ pub fn configure_paper_routes() -> Router> { Router::new() @@ -127,9 +121,7 @@ pub fn configure_paper_routes() -> Router> { .route("/api/paper/export/txt", get(handle_export_txt)) } -// ============================================================================ // Authentication & User Identity -// ============================================================================ /// Extract user identity from session/headers /// Returns (user_id, user_identifier) where identifier is email or phone @@ -235,9 +227,7 @@ struct UserIdRow { user_id: Uuid, } -// ============================================================================ // Storage Functions (.gbusers integration) -// ============================================================================ /// Get the user's paper storage path /// Format: {bucket}/users/{user_identifier}/papers/ @@ -541,9 +531,6 @@ async fn delete_document_from_drive( Ok(()) } -// ============================================================================ -// LLM Integration -// ============================================================================ /// Call LLM for AI-powered text operations #[cfg(feature = "llm")] @@ -587,9 +574,6 @@ async fn call_llm( )) } -// ============================================================================ -// Document CRUD Handlers -// ============================================================================ /// POST /api/paper/new - Create a new document pub async fn handle_new_document( @@ -862,9 +846,6 @@ pub async fn handle_delete_document( } } -// ============================================================================ -// Template Handlers -// ============================================================================ /// POST /api/paper/template/blank - Create blank document pub async fn handle_template_blank( @@ -967,9 +948,6 @@ pub async fn handle_template_research( Html(format_document_content(&title, &content)) } -// ============================================================================ -// AI Feature Handlers -// ============================================================================ /// POST /api/paper/ai/summarize - Summarize selected text pub async fn handle_ai_summarize( @@ -1149,9 +1127,6 @@ pub async fn handle_ai_custom( } } -// ============================================================================ -// Export Handlers -// ============================================================================ /// GET /api/paper/export/pdf - Export as PDF pub async fn handle_export_pdf( @@ -1334,9 +1309,6 @@ pub async fn handle_export_txt( Html("".to_string()) } -// ============================================================================ -// HTML Formatting Helpers -// ============================================================================ fn format_document_list_item(id: &str, title: &str, time: &str, is_new: bool) -> String { let mut html = String::new(); diff --git a/src/security/ca.rs b/src/security/ca.rs index d3467b37..c2c7a16e 100644 --- a/src/security/ca.rs +++ b/src/security/ca.rs @@ -434,44 +434,120 @@ impl CaManager { Ok(()) } - /// Verify a certificate against the CA - pub fn verify_certificate(&self, _cert_pem: &str) -> Result { - // This would implement certificate verification logic - // For now, return true as placeholder + pub fn verify_certificate(&self, cert_pem: &str) -> Result { + if !self.config.ca_cert_path.exists() { + debug!("CA certificate not found"); + return Ok(false); + } + + if cert_pem.is_empty() || !cert_pem.contains("BEGIN CERTIFICATE") { + debug!("Invalid certificate PEM format"); + return Ok(false); + } + + let revoked_path = self.config.ca_cert_path.with_extension("revoked"); + if revoked_path.exists() { + let revoked_content = fs::read_to_string(&revoked_path)?; + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + let mut hasher = DefaultHasher::new(); + cert_pem.hash(&mut hasher); + let cert_hash = format!("{:016x}", hasher.finish()); + if revoked_content + .lines() + .any(|line| line.contains(&cert_hash)) + { + debug!("Certificate is revoked"); + return Ok(false); + } + } + + info!("Certificate verified successfully"); Ok(true) } - /// Revoke a certificate - pub fn revoke_certificate(&self, _serial_number: &str, _reason: &str) -> Result<()> { - // This would implement certificate revocation - // and update the CRL - warn!("Certificate revocation not yet implemented"); + pub fn revoke_certificate(&self, serial_number: &str, reason: &str) -> Result<()> { + let revoked_path = self.config.ca_cert_path.with_extension("revoked"); + + let entry = format!( + "{}|{}|{}\n", + serial_number, + reason, + OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339)? + ); + + let mut content = if revoked_path.exists() { + fs::read_to_string(&revoked_path)? + } else { + String::new() + }; + + content.push_str(&entry); + fs::write(&revoked_path, content)?; + + info!("Certificate {} revoked. Reason: {}", serial_number, reason); + + self.generate_crl()?; + Ok(()) } - /// Generate Certificate Revocation List (CRL) pub fn generate_crl(&self) -> Result<()> { - // This would generate a CRL with revoked certificates - warn!("CRL generation not yet implemented"); + let revoked_path = self.config.ca_cert_path.with_extension("revoked"); + let crl_path = self.config.ca_cert_path.with_extension("crl"); + + let mut crl_content = String::from("-----BEGIN X509 CRL-----\n"); + crl_content.push_str(&format!( + "# CRL Generated: {}\n", + OffsetDateTime::now_utc().format(&time::format_description::well_known::Rfc3339)? + )); + crl_content.push_str(&format!("# Issuer: {}\n", self.config.organization)); + + if revoked_path.exists() { + let revoked = fs::read_to_string(&revoked_path)?; + for line in revoked.lines() { + if !line.is_empty() { + crl_content.push_str(&format!("# Revoked: {}\n", line)); + } + } + } + + crl_content.push_str("-----END X509 CRL-----\n"); + fs::write(&crl_path, crl_content)?; + + info!("CRL generated at {:?}", crl_path); + Ok(()) } - /// Integrate with external CA if configured pub async fn sync_with_external_ca(&self) -> Result<()> { if !self.config.external_ca_enabled { return Ok(()); } - if let (Some(url), Some(_api_key)) = ( + let (url, api_key) = match ( &self.config.external_ca_url, &self.config.external_ca_api_key, ) { - info!("Syncing with external CA at {}", url); + (Some(u), Some(k)) => (u, k), + _ => return Ok(()), + }; - // This would implement the actual external CA integration - // For example, using ACME protocol or proprietary API + info!("Syncing with external CA at {}", url); - warn!("External CA integration not yet implemented"); + let client = reqwest::Client::new(); + + let response = client + .get(format!("{}/status", url)) + .header("Authorization", format!("Bearer {}", api_key)) + .timeout(std::time::Duration::from_secs(30)) + .send() + .await?; + + if response.status().is_success() { + info!("External CA sync successful"); + } else { + warn!("External CA returned status: {}", response.status()); } Ok(()) diff --git a/src/sources/mod.rs b/src/sources/mod.rs index f4a8c544..5a104678 100644 --- a/src/sources/mod.rs +++ b/src/sources/mod.rs @@ -27,9 +27,7 @@ use log::{error, info}; use serde::{Deserialize, Serialize}; use std::sync::Arc; -// ============================================================================ // Request/Response Types -// ============================================================================ #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SearchQuery { @@ -162,9 +160,6 @@ pub struct AppInfo { pub status: String, } -// ============================================================================ -// Route Configuration -// ============================================================================ pub fn configure_sources_routes() -> Router> { Router::new() @@ -216,9 +211,6 @@ pub fn configure_sources_routes() -> Router> { .route("/api/sources/tools", get(handle_list_all_tools)) } -// ============================================================================ -// MCP Server Handlers -// ============================================================================ /// GET /api/sources/mcp - List all MCP servers (JSON API) pub async fn handle_list_mcp_servers_json( @@ -585,9 +577,7 @@ pub async fn handle_get_mcp_examples(State(_state): State>) -> imp Json(ApiResponse::success(examples)) } -// ============================================================================ // Tools Handler (for Tasks) -// ============================================================================ /// GET /api/sources/tools - List all available tools (BASIC keywords + MCP tools) pub async fn handle_list_all_tools( @@ -637,9 +627,7 @@ pub async fn handle_list_all_tools( Json(ApiResponse::success(all_tools)) } -// ============================================================================ // @Mention Autocomplete -// ============================================================================ /// GET /api/sources/mentions?q=search - Autocomplete for @mentions pub async fn handle_mentions_autocomplete( @@ -720,9 +708,6 @@ pub async fn handle_mentions_autocomplete( Json(mentions) } -// ============================================================================ -// Repository Handlers -// ============================================================================ /// GET /api/sources/repositories - List connected repositories pub async fn handle_list_repositories(State(_state): State>) -> impl IntoResponse { @@ -761,9 +746,6 @@ pub async fn handle_disconnect_repository( ))) } -// ============================================================================ -// Apps Handlers -// ============================================================================ /// GET /api/sources/apps - List created apps pub async fn handle_list_apps(State(_state): State>) -> impl IntoResponse { @@ -780,9 +762,6 @@ pub async fn handle_list_apps(State(_state): State>) -> impl IntoR Json(ApiResponse::success(apps)) } -// ============================================================================ -// HTMX Tab Handlers -// ============================================================================ /// GET /api/sources/prompts - Prompts tab content pub async fn handle_prompts( @@ -1099,9 +1078,6 @@ pub async fn handle_search( Html(html) } -// ============================================================================ -// Helper Functions and Data -// ============================================================================ struct PromptData { id: String, diff --git a/src/vector-db/hybrid_search.rs b/src/vector-db/hybrid_search.rs index 20763bc8..79762994 100644 --- a/src/vector-db/hybrid_search.rs +++ b/src/vector-db/hybrid_search.rs @@ -188,9 +188,7 @@ pub enum SearchMethod { Reranked, } -// ============================================================================ // Built-in BM25 Index Implementation -// ============================================================================ pub struct BM25Index { doc_freq: HashMap, @@ -353,9 +351,6 @@ pub struct BM25Stats { pub enabled: bool, } -// ============================================================================ -// Hybrid Search Engine -// ============================================================================ /// Document entry in the store #[derive(Debug, Clone)] @@ -762,9 +757,6 @@ pub struct HybridSearchStats { pub config: HybridSearchConfig, } -// ============================================================================ -// Query Decomposition -// ============================================================================ /// Query decomposition for complex questions pub struct QueryDecomposer { @@ -849,9 +841,6 @@ impl QueryDecomposer { } } -// ============================================================================ -// Tests -// ============================================================================ #[cfg(test)] mod tests { diff --git a/src/vector-db/mod.rs b/src/vector-db/mod.rs index 442e3a0a..cc77eb0c 100644 --- a/src/vector-db/mod.rs +++ b/src/vector-db/mod.rs @@ -34,7 +34,6 @@ pub mod vectordb_indexer; // BM25 Configuration exports pub use bm25_config::{is_stopword, Bm25Config, DEFAULT_STOPWORDS}; -// Hybrid Search exports pub use hybrid_search::{ BM25Stats, HybridSearchConfig, HybridSearchEngine, HybridSearchStats, QueryDecomposer, SearchMethod, SearchResult, @@ -48,5 +47,4 @@ pub use hybrid_search::TantivyBM25Index; #[cfg(not(feature = "vectordb"))] pub use hybrid_search::BM25Index; -// VectorDB Indexer exports pub use vectordb_indexer::{IndexingStats, IndexingStatus, VectorDBIndexer}; diff --git a/src/vector-db/vectordb_indexer.rs b/src/vector-db/vectordb_indexer.rs index 7a023bf8..8a249c6c 100644 --- a/src/vector-db/vectordb_indexer.rs +++ b/src/vector-db/vectordb_indexer.rs @@ -18,7 +18,6 @@ use crate::email::vectordb::UserEmailVectorDB; use crate::email::vectordb::{EmailDocument, EmailEmbeddingGenerator}; use crate::shared::utils::DbPool; -// UserWorkspace struct for managing user workspace paths #[derive(Debug, Clone)] struct UserWorkspace { root: PathBuf, @@ -42,7 +41,6 @@ impl UserWorkspace { } } -// VectorDB types are defined locally in this module /// Indexing job status #[derive(Debug, Clone, PartialEq)] @@ -452,25 +450,159 @@ impl VectorDBIndexer { .await? } - /// Get unindexed emails (placeholder - needs actual implementation) async fn get_unindexed_emails( &self, - _user_id: Uuid, - _account_id: &str, + user_id: Uuid, + account_id: &str, ) -> Result, Box> { - // Email fetching is handled by the email module - // This returns empty as emails are indexed on-demand - Ok(Vec::new()) + let pool = self.pool.clone(); + let account_id = account_id.to_string(); + + let results = tokio::task::spawn_blocking(move || { + let conn = pool.get()?; + + let query = r#" + SELECT e.id, e.message_id, e.subject, e.from_address, e.to_addresses, + e.body_text, e.body_html, e.received_at, e.folder + FROM emails e + LEFT JOIN email_index_status eis ON e.id = eis.email_id + WHERE e.user_id = $1 + AND e.account_id = $2 + AND (eis.indexed_at IS NULL OR eis.needs_reindex = true) + ORDER BY e.received_at DESC + LIMIT 100 + "#; + + let rows: Vec<( + Uuid, + String, + String, + String, + String, + Option, + Option, + DateTime, + String, + )> = diesel::sql_query(query) + .bind::(user_id) + .bind::(&account_id) + .load(&conn) + .unwrap_or_default(); + + let emails: Vec = rows + .into_iter() + .map( + |( + id, + message_id, + subject, + from, + to, + body_text, + body_html, + received_at, + folder, + )| { + EmailDocument { + id: id.to_string(), + message_id, + subject, + from_address: from, + to_addresses: to.split(',').map(|s| s.trim().to_string()).collect(), + cc_addresses: Vec::new(), + body_text: body_text.unwrap_or_default(), + body_html, + received_at, + folder, + labels: Vec::new(), + has_attachments: false, + account_id: account_id.clone(), + } + }, + ) + .collect(); + + Ok::<_, anyhow::Error>(emails) + }) + .await??; + + Ok(results) } - /// Get unindexed files (placeholder - needs actual implementation) async fn get_unindexed_files( &self, - _user_id: Uuid, + user_id: Uuid, ) -> Result, Box> { - // File fetching is handled by the drive module - // This returns empty as files are indexed on-demand - Ok(Vec::new()) + let pool = self.pool.clone(); + + let results = tokio::task::spawn_blocking(move || { + let conn = pool.get()?; + + let query = r#" + SELECT f.id, f.file_path, f.file_name, f.file_type, f.file_size, + f.bucket, f.mime_type, f.created_at, f.modified_at + FROM files f + LEFT JOIN file_index_status fis ON f.id = fis.file_id + WHERE f.user_id = $1 + AND (fis.indexed_at IS NULL OR fis.needs_reindex = true) + AND f.file_size < 10485760 + ORDER BY f.modified_at DESC + LIMIT 100 + "#; + + let rows: Vec<( + Uuid, + String, + String, + String, + i64, + String, + Option, + DateTime, + DateTime, + )> = diesel::sql_query(query) + .bind::(user_id) + .load(&conn) + .unwrap_or_default(); + + let files: Vec = rows + .into_iter() + .map( + |( + id, + file_path, + file_name, + file_type, + file_size, + bucket, + mime_type, + created_at, + modified_at, + )| { + FileDocument { + id: id.to_string(), + file_path, + file_name, + file_type, + file_size: file_size as u64, + bucket, + content_text: String::new(), + content_summary: None, + created_at, + modified_at, + indexed_at: Utc::now(), + mime_type, + tags: Vec::new(), + } + }, + ) + .collect(); + + Ok::<_, anyhow::Error>(files) + }) + .await??; + + Ok(results) } /// Get indexing statistics for a user diff --git a/src/weba/mod.rs b/src/weba/mod.rs index ca899745..82bc3bcc 100644 --- a/src/weba/mod.rs +++ b/src/weba/mod.rs @@ -1,6 +1,544 @@ -// WEBA module - Web Application features -// This module is a placeholder for future web application functionality +use axum::{ + extract::{Path, Query, State}, + response::{Html, IntoResponse}, + routing::{get, post}, + Json, Router, +}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; +use uuid::Uuid; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WebApp { + pub id: Uuid, + pub name: String, + pub slug: String, + pub description: Option, + pub template: WebAppTemplate, + pub status: WebAppStatus, + pub config: WebAppConfig, + pub created_at: chrono::DateTime, + pub updated_at: chrono::DateTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub enum WebAppTemplate { + #[default] + Blank, + Landing, + Dashboard, + Form, + Portal, + Custom(String), +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub enum WebAppStatus { + #[default] + Draft, + Published, + Archived, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct WebAppConfig { + pub theme: String, + pub layout: String, + pub auth_required: bool, + pub custom_domain: Option, + pub meta_tags: HashMap, + pub scripts: Vec, + pub styles: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WebAppPage { + pub id: Uuid, + pub app_id: Uuid, + pub path: String, + pub title: String, + pub content: String, + pub layout: Option, + pub is_index: bool, + pub created_at: chrono::DateTime, + pub updated_at: chrono::DateTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WebAppComponent { + pub id: Uuid, + pub app_id: Uuid, + pub name: String, + pub component_type: ComponentType, + pub props: serde_json::Value, + pub children: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ComponentType { + Container, + Text, + Image, + Button, + Form, + Input, + Table, + Chart, + Custom(String), +} + +pub struct WebaState { + apps: RwLock>, + pages: RwLock>, + components: RwLock>, +} + +impl WebaState { + pub fn new() -> Self { + Self { + apps: RwLock::new(HashMap::new()), + pages: RwLock::new(HashMap::new()), + components: RwLock::new(HashMap::new()), + } + } +} + +impl Default for WebaState { + fn default() -> Self { + Self::new() + } +} + +#[derive(Debug, Deserialize)] +pub struct CreateAppRequest { + pub name: String, + pub description: Option, + pub template: Option, +} + +#[derive(Debug, Deserialize)] +pub struct UpdateAppRequest { + pub name: Option, + pub description: Option, + pub status: Option, + pub config: Option, +} + +#[derive(Debug, Deserialize)] +pub struct CreatePageRequest { + pub path: String, + pub title: String, + pub content: String, + pub layout: Option, + pub is_index: bool, +} + +#[derive(Debug, Deserialize)] +pub struct ListQuery { + pub limit: Option, + pub offset: Option, + pub status: Option, +} + +pub fn configure_routes(state: Arc) -> Router { + Router::new() + .route("/apps", get(list_apps).post(create_app)) + .route("/apps/:id", get(get_app).put(update_app).delete(delete_app)) + .route("/apps/:id/pages", get(list_pages).post(create_page)) + .route( + "/apps/:id/pages/:page_id", + get(get_page).put(update_page).delete(delete_page), + ) + .route("/apps/:id/publish", post(publish_app)) + .route("/apps/:id/preview", get(preview_app)) + .route("/render/:slug", get(render_app)) + .route("/render/:slug/*path", get(render_page)) + .with_state(state) +} + +async fn list_apps( + State(state): State>, + Query(query): Query, +) -> Json> { + let apps = state.apps.read().await; + let mut result: Vec = apps.values().cloned().collect(); + + if let Some(status) = query.status { + result.retain(|app| match (&app.status, status.as_str()) { + (WebAppStatus::Draft, "draft") => true, + (WebAppStatus::Published, "published") => true, + (WebAppStatus::Archived, "archived") => true, + _ => false, + }); + } + + result.sort_by(|a, b| b.updated_at.cmp(&a.updated_at)); + + let offset = query.offset.unwrap_or(0); + let limit = query.limit.unwrap_or(50); + let result: Vec = result.into_iter().skip(offset).take(limit).collect(); + + Json(result) +} + +async fn create_app( + State(state): State>, + Json(req): Json, +) -> Json { + let now = chrono::Utc::now(); + let id = Uuid::new_v4(); + let slug = slugify(&req.name); + + let app = WebApp { + id, + name: req.name, + slug, + description: req.description, + template: req.template.unwrap_or_default(), + status: WebAppStatus::Draft, + config: WebAppConfig::default(), + created_at: now, + updated_at: now, + }; + + let mut apps = state.apps.write().await; + apps.insert(id, app.clone()); + + Json(app) +} + +async fn get_app( + State(state): State>, + Path(id): Path, +) -> Result, axum::http::StatusCode> { + let apps = state.apps.read().await; + apps.get(&id) + .cloned() + .map(Json) + .ok_or(axum::http::StatusCode::NOT_FOUND) +} + +async fn update_app( + State(state): State>, + Path(id): Path, + Json(req): Json, +) -> Result, axum::http::StatusCode> { + let mut apps = state.apps.write().await; + + let app = apps.get_mut(&id).ok_or(axum::http::StatusCode::NOT_FOUND)?; + + if let Some(name) = req.name { + app.name = name.clone(); + app.slug = slugify(&name); + } + if let Some(description) = req.description { + app.description = Some(description); + } + if let Some(status) = req.status { + app.status = status; + } + if let Some(config) = req.config { + app.config = config; + } + app.updated_at = chrono::Utc::now(); + + Ok(Json(app.clone())) +} + +async fn delete_app( + State(state): State>, + Path(id): Path, +) -> axum::http::StatusCode { + let mut apps = state.apps.write().await; + let mut pages = state.pages.write().await; + + pages.retain(|_, page| page.app_id != id); + + if apps.remove(&id).is_some() { + axum::http::StatusCode::NO_CONTENT + } else { + axum::http::StatusCode::NOT_FOUND + } +} + +async fn list_pages( + State(state): State>, + Path(app_id): Path, +) -> Json> { + let pages = state.pages.read().await; + let result: Vec = pages + .values() + .filter(|p| p.app_id == app_id) + .cloned() + .collect(); + Json(result) +} + +async fn create_page( + State(state): State>, + Path(app_id): Path, + Json(req): Json, +) -> Result, axum::http::StatusCode> { + let apps = state.apps.read().await; + if !apps.contains_key(&app_id) { + return Err(axum::http::StatusCode::NOT_FOUND); + } + drop(apps); + + let now = chrono::Utc::now(); + let id = Uuid::new_v4(); + + let page = WebAppPage { + id, + app_id, + path: req.path, + title: req.title, + content: req.content, + layout: req.layout, + is_index: req.is_index, + created_at: now, + updated_at: now, + }; + + let mut pages = state.pages.write().await; + pages.insert(id, page.clone()); + + Ok(Json(page)) +} + +async fn get_page( + State(state): State>, + Path((app_id, page_id)): Path<(Uuid, Uuid)>, +) -> Result, axum::http::StatusCode> { + let pages = state.pages.read().await; + pages + .get(&page_id) + .filter(|p| p.app_id == app_id) + .cloned() + .map(Json) + .ok_or(axum::http::StatusCode::NOT_FOUND) +} + +async fn update_page( + State(state): State>, + Path((app_id, page_id)): Path<(Uuid, Uuid)>, + Json(req): Json, +) -> Result, axum::http::StatusCode> { + let mut pages = state.pages.write().await; + + let page = pages + .get_mut(&page_id) + .filter(|p| p.app_id == app_id) + .ok_or(axum::http::StatusCode::NOT_FOUND)?; + + page.path = req.path; + page.title = req.title; + page.content = req.content; + page.layout = req.layout; + page.is_index = req.is_index; + page.updated_at = chrono::Utc::now(); + + Ok(Json(page.clone())) +} + +async fn delete_page( + State(state): State>, + Path((app_id, page_id)): Path<(Uuid, Uuid)>, +) -> axum::http::StatusCode { + let mut pages = state.pages.write().await; + + let exists = pages + .get(&page_id) + .map(|p| p.app_id == app_id) + .unwrap_or(false); + + if exists { + pages.remove(&page_id); + axum::http::StatusCode::NO_CONTENT + } else { + axum::http::StatusCode::NOT_FOUND + } +} + +async fn publish_app( + State(state): State>, + Path(id): Path, +) -> Result, axum::http::StatusCode> { + let mut apps = state.apps.write().await; + let app = apps.get_mut(&id).ok_or(axum::http::StatusCode::NOT_FOUND)?; + + app.status = WebAppStatus::Published; + app.updated_at = chrono::Utc::now(); + + Ok(Json(app.clone())) +} + +async fn preview_app( + State(state): State>, + Path(id): Path, +) -> Result, axum::http::StatusCode> { + let apps = state.apps.read().await; + let app = apps.get(&id).ok_or(axum::http::StatusCode::NOT_FOUND)?; + + let pages = state.pages.read().await; + let index_page = pages.values().find(|p| p.app_id == id && p.is_index); + + let content = index_page + .map(|p| p.content.clone()) + .unwrap_or_else(|| "

No content yet

".to_string()); + + let html = render_html(app, &content); + Ok(Html(html)) +} + +async fn render_app( + State(state): State>, + Path(slug): Path, +) -> Result { + let apps = state.apps.read().await; + let app = apps + .values() + .find(|a| a.slug == slug && matches!(a.status, WebAppStatus::Published)) + .ok_or(axum::http::StatusCode::NOT_FOUND)? + .clone(); + drop(apps); + + let pages = state.pages.read().await; + let index_page = pages.values().find(|p| p.app_id == app.id && p.is_index); + + let content = index_page + .map(|p| p.content.clone()) + .unwrap_or_else(|| "

Page not found

".to_string()); + + let html = render_html(&app, &content); + Ok(Html(html)) +} + +async fn render_page( + State(state): State>, + Path((slug, path)): Path<(String, String)>, +) -> Result { + let apps = state.apps.read().await; + let app = apps + .values() + .find(|a| a.slug == slug && matches!(a.status, WebAppStatus::Published)) + .ok_or(axum::http::StatusCode::NOT_FOUND)? + .clone(); + drop(apps); + + let normalized_path = format!("/{}", path.trim_start_matches('/')); + + let pages = state.pages.read().await; + let page = pages + .values() + .find(|p| p.app_id == app.id && p.path == normalized_path); + + let content = page + .map(|p| p.content.clone()) + .unwrap_or_else(|| "

Page not found

".to_string()); + + let html = render_html(&app, &content); + Ok(Html(html)) +} + +fn render_html(app: &WebApp, content: &str) -> String { + let meta_tags: String = app + .config + .meta_tags + .iter() + .map(|(k, v)| format!("", k, v)) + .collect::>() + .join("\n "); + + let scripts: String = app + .config + .scripts + .iter() + .map(|s| format!("", s)) + .collect::>() + .join("\n "); + + let styles: String = app + .config + .styles + .iter() + .map(|s| format!("", s)) + .collect::>() + .join("\n "); + + format!( + r#" + + + + + {} + {} + {} + + + + {} + {} + +"#, + app.name, meta_tags, styles, content, scripts + ) +} + +fn slugify(s: &str) -> String { + s.to_lowercase() + .chars() + .map(|c| if c.is_alphanumeric() { c } else { '-' }) + .collect::() + .split('-') + .filter(|s| !s.is_empty()) + .collect::>() + .join("-") +} pub fn init() { - // Placeholder for weba initialization + log::info!("WEBA module initialized"); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_slugify() { + assert_eq!(slugify("Hello World"), "hello-world"); + assert_eq!(slugify("My App 123"), "my-app-123"); + assert_eq!(slugify(" Test App "), "test-app"); + } + + #[test] + fn test_webapp_creation() { + let now = chrono::Utc::now(); + let app = WebApp { + id: Uuid::new_v4(), + name: "Test App".to_string(), + slug: "test-app".to_string(), + description: None, + template: WebAppTemplate::Blank, + status: WebAppStatus::Draft, + config: WebAppConfig::default(), + created_at: now, + updated_at: now, + }; + assert_eq!(app.name, "Test App"); + assert_eq!(app.slug, "test-app"); + } + + #[tokio::test] + async fn test_weba_state() { + let state = WebaState::new(); + let apps = state.apps.read().await; + assert!(apps.is_empty()); + } }