From 57bf5b87548fe96491f94003a5ff940477d5c9eb Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Thu, 18 Dec 2025 16:17:58 -0300 Subject: [PATCH] feat: add ON EMAIL and ON CHANGE keywords for event-driven monitoring - Add ON EMAIL keyword with FROM/SUBJECT filters - Add ON CHANGE keyword with account:// syntax (gdrive, onedrive, dropbox, local) - Add TriggerKind::EmailReceived (5) and FolderChange (6) - Add migration 6.1.3_bot_hierarchy_monitors with: - email_monitors, folder_monitors tables - email_received_events, folder_change_events tables - user_organizations table - Bot hierarchy: parent_bot_id, enabled_tabs_json, inherit_parent_config - Add 26 unit tests (12 on_email, 12 on_change, 2 trigger_kind) - Update PROMPT.md with weekly maintenance checklist - Zero warnings, zero errors --- PROMPT.md | 161 +++- migrations/6.1.2_connected_accounts/down.sql | 20 + migrations/6.1.2_connected_accounts/up.sql | 72 ++ .../6.1.3_bot_hierarchy_monitors/down.sql | 47 ++ .../6.1.3_bot_hierarchy_monitors/up.sql | 131 ++++ scripts/utils/set-limits.sh | 6 +- src/basic/keywords/file_operations.rs | 183 +++++ src/basic/keywords/mod.rs | 16 + src/basic/keywords/on_change.rs | 709 ++++++++++++++++++ src/basic/keywords/on_email.rs | 598 +++++++++++++++ src/basic/keywords/send_mail.rs | 202 ++++- src/basic/keywords/use_account.rs | 253 +++++++ src/basic/mod.rs | 4 + src/core/shared/models.rs | 21 + 14 files changed, 2410 insertions(+), 13 deletions(-) create mode 100644 migrations/6.1.2_connected_accounts/down.sql create mode 100644 migrations/6.1.2_connected_accounts/up.sql create mode 100644 migrations/6.1.3_bot_hierarchy_monitors/down.sql create mode 100644 migrations/6.1.3_bot_hierarchy_monitors/up.sql create mode 100644 src/basic/keywords/on_change.rs create mode 100644 src/basic/keywords/on_email.rs create mode 100644 src/basic/keywords/use_account.rs diff --git a/PROMPT.md b/PROMPT.md index 53b61ada2..334375ff9 100644 --- a/PROMPT.md +++ b/PROMPT.md @@ -5,6 +5,45 @@ --- +## Weekly Maintenance - EVERY MONDAY + +### Package Review Checklist + +**Every Monday, review the following:** + +1. **Dependency Updates** + ```bash + cargo outdated + cargo audit + ``` + +2. **Package Consolidation Opportunities** + - Check if new crates can replace custom code + - Look for crates that combine multiple dependencies + - Review `Cargo.toml` for redundant dependencies + +3. **Code Reduction Candidates** + - Custom implementations that now have crate equivalents + - Boilerplate that can be replaced with derive macros + - Manual serialization that `serde` can handle + +4. **Security Updates** + ```bash + cargo audit fix + ``` + +### Packages to Watch + +| Area | Potential Packages | Purpose | +|------|-------------------|---------| +| Validation | `validator` | Replace manual validation | +| Date/Time | `chrono`, `time` | Consolidate time handling | +| Email | `lettre` | Simplify email sending | +| File Watching | `notify` | Replace polling with events | +| Background Jobs | `tokio-cron-scheduler` | Simplify scheduling | + +--- + ## Version Management - CRITICAL **Current version is 6.1.0 - DO NOT CHANGE without explicit approval!** @@ -498,4 +537,124 @@ src/shared/ - **Version**: Always 6.1.0 - do not change without approval - **Migrations**: TABLES AND INDEXES ONLY - no views, triggers, functions - **Stalwart**: Use Stalwart IMAP/JMAP API for email features (sieve, filters, etc.) -- **JSON**: Use TEXT columns with `_json` suffix, not JSONB \ No newline at end of file +- **JSON**: Use TEXT columns with `_json` suffix, not JSONB + +--- + +## Monitor Keywords (ON EMAIL, ON CHANGE) + +These keywords register event-driven monitors similar to `SET SCHEDULER`, but triggered by external events. + +### ON EMAIL - Email Monitoring + +Triggers a script when an email arrives at the specified address. + +```basic +' Basic usage - trigger on any email to address +ON EMAIL "support@company.com" + email = GET LAST "email_received_events" + TALK "New email from " + email.from_address + ": " + email.subject +END ON + +' With FROM filter - only trigger for specific sender +ON EMAIL "orders@company.com" FROM "supplier@vendor.com" + ' Process supplier orders +END ON + +' With SUBJECT filter - only trigger for matching subjects +ON EMAIL "alerts@company.com" SUBJECT "URGENT" + ' Handle urgent alerts +END ON +``` + +**Database Tables:** +- `email_monitors` - Configuration for email monitoring +- `email_received_events` - Log of received emails to process + +**TriggerKind:** `EmailReceived = 5` + +### ON CHANGE - Folder Monitoring + +Triggers a script when files change in cloud storage folders (GDrive, OneDrive, Dropbox) or local filesystem. + +**Uses same `account://` syntax as COPY, MOVE, and other file operations.** + +```basic +' Using account:// syntax (recommended) - auto-detects provider from email +ON CHANGE "account://user@gmail.com/Documents/invoices" + file = GET LAST "folder_change_events" + TALK "File changed: " + file.file_name + " (" + file.event_type + ")" +END ON + +' OneDrive via account:// +ON CHANGE "account://user@outlook.com/Business/contracts" + ' Process OneDrive changes +END ON + +' Direct provider syntax (without account) +ON CHANGE "gdrive:///shared/reports" + ' Process Google Drive changes (requires USE ACCOUNT first) +END ON + +ON CHANGE "onedrive:///documents" + ' Process OneDrive changes +END ON + +ON CHANGE "dropbox:///team/assets" + ' Process Dropbox changes +END ON + +' Local filesystem monitoring +ON CHANGE "/var/uploads/incoming" + ' Process local filesystem changes +END ON + +' With specific event types filter +ON CHANGE "account://user@gmail.com/uploads" EVENTS "create, modify" + ' Only trigger on create and modify, ignore delete +END ON + +' Watch for deletions only +ON CHANGE "gdrive:///archive" EVENTS "delete" + ' Log when files are removed from archive +END ON +``` + +**Path Syntax:** +- `account://email@domain.com/path` - Uses connected account (auto-detects provider) +- `gdrive:///path` - Google Drive direct +- `onedrive:///path` - OneDrive direct +- `dropbox:///path` - Dropbox direct +- `/local/path` - Local filesystem + +**Provider Auto-Detection (from email):** +- `@gmail.com`, `@google.com` → Google Drive +- `@outlook.com`, `@hotmail.com`, `@live.com` → OneDrive +- Other emails → Default to Google Drive + +**Event Types:** +- `create` - New file created +- `modify` - File content changed +- `delete` - File deleted +- `rename` - File renamed +- `move` - File moved to different folder + +**Database Tables:** +- `folder_monitors` - Configuration for folder monitoring +- `folder_change_events` - Log of detected changes to process + +**TriggerKind:** `FolderChange = 6` + +### TriggerKind Enum Values + +```rust +pub enum TriggerKind { + Scheduled = 0, // SET SCHEDULER + TableUpdate = 1, // ON UPDATE OF "table" + TableInsert = 2, // ON INSERT OF "table" + TableDelete = 3, // ON DELETE OF "table" + Webhook = 4, // WEBHOOK + EmailReceived = 5, // ON EMAIL + FolderChange = 6, // ON CHANGE +} +``` \ No newline at end of file diff --git a/migrations/6.1.2_connected_accounts/down.sql b/migrations/6.1.2_connected_accounts/down.sql new file mode 100644 index 000000000..3aaf8064f --- /dev/null +++ b/migrations/6.1.2_connected_accounts/down.sql @@ -0,0 +1,20 @@ +DROP INDEX IF EXISTS idx_account_sync_items_unique; +DROP INDEX IF EXISTS idx_account_sync_items_embedding; +DROP INDEX IF EXISTS idx_account_sync_items_date; +DROP INDEX IF EXISTS idx_account_sync_items_type; +DROP INDEX IF EXISTS idx_account_sync_items_account; +DROP TABLE IF EXISTS account_sync_items; + +DROP INDEX IF EXISTS idx_session_account_assoc_unique; +DROP INDEX IF EXISTS idx_session_account_assoc_active; +DROP INDEX IF EXISTS idx_session_account_assoc_account; +DROP INDEX IF EXISTS idx_session_account_assoc_session; +DROP TABLE IF EXISTS session_account_associations; + +DROP INDEX IF EXISTS idx_connected_accounts_bot_email; +DROP INDEX IF EXISTS idx_connected_accounts_status; +DROP INDEX IF EXISTS idx_connected_accounts_provider; +DROP INDEX IF EXISTS idx_connected_accounts_email; +DROP INDEX IF EXISTS idx_connected_accounts_user_id; +DROP INDEX IF EXISTS idx_connected_accounts_bot_id; +DROP TABLE IF EXISTS connected_accounts; diff --git a/migrations/6.1.2_connected_accounts/up.sql b/migrations/6.1.2_connected_accounts/up.sql new file mode 100644 index 000000000..bfe125e72 --- /dev/null +++ b/migrations/6.1.2_connected_accounts/up.sql @@ -0,0 +1,72 @@ +CREATE TABLE IF NOT EXISTS connected_accounts ( + id UUID PRIMARY KEY, + bot_id UUID NOT NULL REFERENCES bots(id) ON DELETE CASCADE, + user_id UUID, + email TEXT NOT NULL, + provider TEXT NOT NULL, + account_type TEXT NOT NULL DEFAULT 'email', + access_token TEXT NOT NULL, + refresh_token TEXT, + token_expires_at TIMESTAMPTZ, + scopes TEXT, + status TEXT NOT NULL DEFAULT 'active', + sync_enabled BOOLEAN NOT NULL DEFAULT true, + sync_interval_seconds INTEGER NOT NULL DEFAULT 300, + last_sync_at TIMESTAMPTZ, + last_sync_status TEXT, + last_sync_error TEXT, + metadata_json TEXT DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_connected_accounts_bot_id ON connected_accounts(bot_id); +CREATE INDEX IF NOT EXISTS idx_connected_accounts_user_id ON connected_accounts(user_id); +CREATE INDEX IF NOT EXISTS idx_connected_accounts_email ON connected_accounts(email); +CREATE INDEX IF NOT EXISTS idx_connected_accounts_provider ON connected_accounts(provider); +CREATE INDEX IF NOT EXISTS idx_connected_accounts_status ON connected_accounts(status); +CREATE UNIQUE INDEX IF NOT EXISTS idx_connected_accounts_bot_email ON connected_accounts(bot_id, email); + +CREATE TABLE IF NOT EXISTS session_account_associations ( + id UUID PRIMARY KEY, + session_id UUID NOT NULL, + bot_id UUID NOT NULL REFERENCES bots(id) ON DELETE CASCADE, + account_id UUID NOT NULL REFERENCES connected_accounts(id) ON DELETE CASCADE, + email TEXT NOT NULL, + provider TEXT NOT NULL, + qdrant_collection TEXT NOT NULL, + is_active BOOLEAN NOT NULL DEFAULT true, + added_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + added_by_tool TEXT +); + +CREATE INDEX IF NOT EXISTS idx_session_account_assoc_session ON session_account_associations(session_id); +CREATE INDEX IF NOT EXISTS idx_session_account_assoc_account ON session_account_associations(account_id); +CREATE INDEX IF NOT EXISTS idx_session_account_assoc_active ON session_account_associations(session_id, is_active); +CREATE UNIQUE INDEX IF NOT EXISTS idx_session_account_assoc_unique ON session_account_associations(session_id, account_id); + +CREATE TABLE IF NOT EXISTS account_sync_items ( + id UUID PRIMARY KEY, + account_id UUID NOT NULL REFERENCES connected_accounts(id) ON DELETE CASCADE, + item_type TEXT NOT NULL, + item_id TEXT NOT NULL, + subject TEXT, + content_preview TEXT, + sender TEXT, + recipients TEXT, + item_date TIMESTAMPTZ, + folder TEXT, + labels TEXT, + has_attachments BOOLEAN DEFAULT false, + qdrant_point_id TEXT, + embedding_status TEXT DEFAULT 'pending', + metadata_json TEXT DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_account_sync_items_account ON account_sync_items(account_id); +CREATE INDEX IF NOT EXISTS idx_account_sync_items_type ON account_sync_items(item_type); +CREATE INDEX IF NOT EXISTS idx_account_sync_items_date ON account_sync_items(item_date); +CREATE INDEX IF NOT EXISTS idx_account_sync_items_embedding ON account_sync_items(embedding_status); +CREATE UNIQUE INDEX IF NOT EXISTS idx_account_sync_items_unique ON account_sync_items(account_id, item_type, item_id); diff --git a/migrations/6.1.3_bot_hierarchy_monitors/down.sql b/migrations/6.1.3_bot_hierarchy_monitors/down.sql new file mode 100644 index 000000000..bae3b16b5 --- /dev/null +++ b/migrations/6.1.3_bot_hierarchy_monitors/down.sql @@ -0,0 +1,47 @@ +-- Drop comments first +COMMENT ON TABLE public.user_organizations IS NULL; +COMMENT ON TABLE public.email_received_events IS NULL; +COMMENT ON TABLE public.folder_change_events IS NULL; +COMMENT ON TABLE public.folder_monitors IS NULL; +COMMENT ON TABLE public.email_monitors IS NULL; +COMMENT ON COLUMN public.bots.inherit_parent_config IS NULL; +COMMENT ON COLUMN public.bots.enabled_tabs_json IS NULL; +COMMENT ON COLUMN public.bots.parent_bot_id IS NULL; +COMMENT ON TABLE public.system_automations IS NULL; + +-- Drop user organizations table +DROP INDEX IF EXISTS idx_user_orgs_default; +DROP INDEX IF EXISTS idx_user_orgs_org; +DROP INDEX IF EXISTS idx_user_orgs_user; +DROP TABLE IF EXISTS public.user_organizations; + +-- Drop email received events table +DROP INDEX IF EXISTS idx_email_events_received; +DROP INDEX IF EXISTS idx_email_events_processed; +DROP INDEX IF EXISTS idx_email_events_monitor; +DROP TABLE IF EXISTS public.email_received_events; + +-- Drop folder change events table +DROP INDEX IF EXISTS idx_folder_events_created; +DROP INDEX IF EXISTS idx_folder_events_processed; +DROP INDEX IF EXISTS idx_folder_events_monitor; +DROP TABLE IF EXISTS public.folder_change_events; + +-- Drop folder monitors table +DROP INDEX IF EXISTS idx_folder_monitors_account_email; +DROP INDEX IF EXISTS idx_folder_monitors_active; +DROP INDEX IF EXISTS idx_folder_monitors_provider; +DROP INDEX IF EXISTS idx_folder_monitors_bot_id; +DROP TABLE IF EXISTS public.folder_monitors; + +-- Drop email monitors table +DROP INDEX IF EXISTS idx_email_monitors_active; +DROP INDEX IF EXISTS idx_email_monitors_email; +DROP INDEX IF EXISTS idx_email_monitors_bot_id; +DROP TABLE IF EXISTS public.email_monitors; + +-- Remove bot hierarchy columns +DROP INDEX IF EXISTS idx_bots_parent_bot_id; +ALTER TABLE public.bots DROP COLUMN IF EXISTS inherit_parent_config; +ALTER TABLE public.bots DROP COLUMN IF EXISTS enabled_tabs_json; +ALTER TABLE public.bots DROP COLUMN IF EXISTS parent_bot_id; diff --git a/migrations/6.1.3_bot_hierarchy_monitors/up.sql b/migrations/6.1.3_bot_hierarchy_monitors/up.sql new file mode 100644 index 000000000..193888264 --- /dev/null +++ b/migrations/6.1.3_bot_hierarchy_monitors/up.sql @@ -0,0 +1,131 @@ +-- Bot Hierarchy: Add parent_bot_id to support sub-bots +ALTER TABLE public.bots +ADD COLUMN IF NOT EXISTS parent_bot_id UUID REFERENCES public.bots(id) ON DELETE SET NULL; + +-- Index for efficient hierarchy queries +CREATE INDEX IF NOT EXISTS idx_bots_parent_bot_id ON public.bots(parent_bot_id); + +-- Bot enabled tabs configuration (which UI tabs are enabled for this bot) +ALTER TABLE public.bots +ADD COLUMN IF NOT EXISTS enabled_tabs_json TEXT DEFAULT '["chat"]'; + +-- Bot configuration inheritance flag +ALTER TABLE public.bots +ADD COLUMN IF NOT EXISTS inherit_parent_config BOOLEAN DEFAULT true; + +-- Email monitoring table for ON EMAIL triggers +CREATE TABLE IF NOT EXISTS public.email_monitors ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + bot_id UUID NOT NULL REFERENCES public.bots(id) ON DELETE CASCADE, + email_address VARCHAR(500) NOT NULL, + script_path VARCHAR(1000) NOT NULL, + is_active BOOLEAN DEFAULT true, + last_check_at TIMESTAMPTZ, + last_uid BIGINT DEFAULT 0, + filter_from VARCHAR(500), + filter_subject VARCHAR(500), + created_at TIMESTAMPTZ DEFAULT NOW() NOT NULL, + updated_at TIMESTAMPTZ DEFAULT NOW() NOT NULL, + CONSTRAINT unique_bot_email UNIQUE (bot_id, email_address) +); + +CREATE INDEX IF NOT EXISTS idx_email_monitors_bot_id ON public.email_monitors(bot_id); +CREATE INDEX IF NOT EXISTS idx_email_monitors_email ON public.email_monitors(email_address); +CREATE INDEX IF NOT EXISTS idx_email_monitors_active ON public.email_monitors(is_active) WHERE is_active = true; + +-- Folder monitoring table for ON CHANGE triggers (GDrive, OneDrive, Dropbox) +-- Uses account:// syntax: account://user@gmail.com/path or gdrive:///path +CREATE TABLE IF NOT EXISTS public.folder_monitors ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + bot_id UUID NOT NULL REFERENCES public.bots(id) ON DELETE CASCADE, + provider VARCHAR(50) NOT NULL, -- 'gdrive', 'onedrive', 'dropbox', 'local' + account_email VARCHAR(500), -- Email from account:// path (e.g., user@gmail.com) + folder_path VARCHAR(2000) NOT NULL, + folder_id VARCHAR(500), -- Provider-specific folder ID + script_path VARCHAR(1000) NOT NULL, + is_active BOOLEAN DEFAULT true, + watch_subfolders BOOLEAN DEFAULT true, + last_check_at TIMESTAMPTZ, + last_change_token VARCHAR(500), -- Provider-specific change token/page token + event_types_json TEXT DEFAULT '["create", "modify", "delete"]', + created_at TIMESTAMPTZ DEFAULT NOW() NOT NULL, + updated_at TIMESTAMPTZ DEFAULT NOW() NOT NULL, + CONSTRAINT unique_bot_folder UNIQUE (bot_id, provider, folder_path) +); + +CREATE INDEX IF NOT EXISTS idx_folder_monitors_bot_id ON public.folder_monitors(bot_id); +CREATE INDEX IF NOT EXISTS idx_folder_monitors_provider ON public.folder_monitors(provider); +CREATE INDEX IF NOT EXISTS idx_folder_monitors_active ON public.folder_monitors(is_active) WHERE is_active = true; +CREATE INDEX IF NOT EXISTS idx_folder_monitors_account_email ON public.folder_monitors(account_email); + +-- Folder change events log +CREATE TABLE IF NOT EXISTS public.folder_change_events ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + monitor_id UUID NOT NULL REFERENCES public.folder_monitors(id) ON DELETE CASCADE, + event_type VARCHAR(50) NOT NULL, -- 'create', 'modify', 'delete', 'rename', 'move' + file_path VARCHAR(2000) NOT NULL, + file_id VARCHAR(500), + file_name VARCHAR(500), + file_size BIGINT, + mime_type VARCHAR(255), + old_path VARCHAR(2000), -- For rename/move events + processed BOOLEAN DEFAULT false, + processed_at TIMESTAMPTZ, + error_message TEXT, + created_at TIMESTAMPTZ DEFAULT NOW() NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_folder_events_monitor ON public.folder_change_events(monitor_id); +CREATE INDEX IF NOT EXISTS idx_folder_events_processed ON public.folder_change_events(processed) WHERE processed = false; +CREATE INDEX IF NOT EXISTS idx_folder_events_created ON public.folder_change_events(created_at); + +-- Email received events log +CREATE TABLE IF NOT EXISTS public.email_received_events ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + monitor_id UUID NOT NULL REFERENCES public.email_monitors(id) ON DELETE CASCADE, + message_uid BIGINT NOT NULL, + message_id VARCHAR(500), + from_address VARCHAR(500) NOT NULL, + to_addresses_json TEXT, + subject VARCHAR(1000), + received_at TIMESTAMPTZ, + has_attachments BOOLEAN DEFAULT false, + attachments_json TEXT, + processed BOOLEAN DEFAULT false, + processed_at TIMESTAMPTZ, + error_message TEXT, + created_at TIMESTAMPTZ DEFAULT NOW() NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_email_events_monitor ON public.email_received_events(monitor_id); +CREATE INDEX IF NOT EXISTS idx_email_events_processed ON public.email_received_events(processed) WHERE processed = false; +CREATE INDEX IF NOT EXISTS idx_email_events_received ON public.email_received_events(received_at); + +-- Add new trigger kinds to system_automations +-- TriggerKind enum: 0=Scheduled, 1=TableUpdate, 2=TableInsert, 3=TableDelete, 4=Webhook, 5=EmailReceived, 6=FolderChange +COMMENT ON TABLE public.system_automations IS 'System automations with TriggerKind: 0=Scheduled, 1=TableUpdate, 2=TableInsert, 3=TableDelete, 4=Webhook, 5=EmailReceived, 6=FolderChange'; + +-- User organization memberships (users can belong to multiple orgs) +CREATE TABLE IF NOT EXISTS public.user_organizations ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + user_id UUID NOT NULL REFERENCES public.users(user_id) ON DELETE CASCADE, + org_id UUID NOT NULL REFERENCES public.organizations(org_id) ON DELETE CASCADE, + role VARCHAR(50) DEFAULT 'member', -- 'owner', 'admin', 'member', 'viewer' + is_default BOOLEAN DEFAULT false, + joined_at TIMESTAMPTZ DEFAULT NOW() NOT NULL, + CONSTRAINT unique_user_org UNIQUE (user_id, org_id) +); + +CREATE INDEX IF NOT EXISTS idx_user_orgs_user ON public.user_organizations(user_id); +CREATE INDEX IF NOT EXISTS idx_user_orgs_org ON public.user_organizations(org_id); +CREATE INDEX IF NOT EXISTS idx_user_orgs_default ON public.user_organizations(user_id, is_default) WHERE is_default = true; + +-- Comments for documentation +COMMENT ON COLUMN public.bots.parent_bot_id IS 'Parent bot ID for hierarchical bot structure. NULL means root bot.'; +COMMENT ON COLUMN public.bots.enabled_tabs_json IS 'JSON array of enabled UI tabs for this bot. Root bots have all tabs.'; +COMMENT ON COLUMN public.bots.inherit_parent_config IS 'If true, inherits config from parent bot for missing values.'; +COMMENT ON TABLE public.email_monitors IS 'Email monitoring configuration for ON EMAIL triggers.'; +COMMENT ON TABLE public.folder_monitors IS 'Folder monitoring configuration for ON CHANGE triggers (GDrive, OneDrive, Dropbox).'; +COMMENT ON TABLE public.folder_change_events IS 'Log of detected folder changes to be processed by scripts.'; +COMMENT ON TABLE public.email_received_events IS 'Log of received emails to be processed by scripts.'; +COMMENT ON TABLE public.user_organizations IS 'User membership in organizations with roles.'; diff --git a/scripts/utils/set-limits.sh b/scripts/utils/set-limits.sh index 215cca7fe..17bdf6d41 100644 --- a/scripts/utils/set-limits.sh +++ b/scripts/utils/set-limits.sh @@ -8,13 +8,13 @@ declare -A container_limits=( ["*doc-editor*"]="512MB:10ms/100ms" ["*proxy*"]="2048MB:100ms/100ms" ["*directory*"]="1024MB:50ms/100ms" - ["*drive*"]="4096MB:50ms/100ms" + ["*drive*"]="4096MB:100ms/100ms" ["*email*"]="4096MB:100ms/100ms" ["*webmail*"]="4096MB:100ms/100ms" - ["*bot*"]="4096MB:50ms/100ms" + ["*bot*"]="2048MB:50ms/100ms" ["*meeting*"]="4096MB:100ms/100ms" ["*alm*"]="512MB:50ms/100ms" - ["*alm-ci*"]="4096MB:100ms/100ms" + ["*alm-ci*"]="8192MB:100ms/100ms" ["*system*"]="4096MB:50ms/100ms" ["*mailer*"]="4096MB:25ms/100ms" ) diff --git a/src/basic/keywords/file_operations.rs b/src/basic/keywords/file_operations.rs index a7e090844..ea6da075f 100644 --- a/src/basic/keywords/file_operations.rs +++ b/src/basic/keywords/file_operations.rs @@ -28,6 +28,9 @@ | | \*****************************************************************************/ +use crate::basic::keywords::use_account::{ + get_account_credentials, is_account_path, parse_account_path, +}; use crate::shared::models::schema::bots::dsl::*; use crate::shared::models::UserSession; use crate::shared::state::AppState; @@ -1100,6 +1103,13 @@ async fn execute_copy( source: &str, destination: &str, ) -> Result<(), Box> { + let source_is_account = is_account_path(source); + let dest_is_account = is_account_path(destination); + + if source_is_account || dest_is_account { + return execute_copy_with_account(state, user, source, destination).await; + } + let client = state.drive.as_ref().ok_or("S3 client not configured")?; let bot_name: String = { @@ -1132,6 +1142,179 @@ async fn execute_copy( Ok(()) } +async fn execute_copy_with_account( + state: &AppState, + user: &UserSession, + source: &str, + destination: &str, +) -> Result<(), Box> { + let source_is_account = is_account_path(source); + let dest_is_account = is_account_path(destination); + + let content = if source_is_account { + let (email, path) = parse_account_path(source).ok_or("Invalid account:// path format")?; + let creds = get_account_credentials(&state.conn, &email, user.bot_id) + .await + .map_err(|e| format!("Failed to get credentials: {}", e))?; + download_from_account(&creds, &path).await? + } else { + read_from_local(state, user, source).await? + }; + + if dest_is_account { + let (email, path) = + parse_account_path(destination).ok_or("Invalid account:// path format")?; + let creds = get_account_credentials(&state.conn, &email, user.bot_id) + .await + .map_err(|e| format!("Failed to get credentials: {}", e))?; + upload_to_account(&creds, &path, &content).await?; + } else { + write_to_local(state, user, destination, &content).await?; + } + + trace!( + "COPY with account successful: {} -> {}", + source, + destination + ); + Ok(()) +} + +async fn download_from_account( + creds: &crate::basic::keywords::use_account::AccountCredentials, + path: &str, +) -> Result, Box> { + let client = reqwest::Client::new(); + + match creds.provider.as_str() { + "gmail" | "google" => { + let url = format!( + "https://www.googleapis.com/drive/v3/files/{}?alt=media", + urlencoding::encode(path) + ); + let resp = client + .get(&url) + .bearer_auth(&creds.access_token) + .send() + .await?; + if !resp.status().is_success() { + return Err(format!("Google Drive download failed: {}", resp.status()).into()); + } + Ok(resp.bytes().await?.to_vec()) + } + "outlook" | "microsoft" => { + let url = format!( + "https://graph.microsoft.com/v1.0/me/drive/root:/{}:/content", + urlencoding::encode(path) + ); + let resp = client + .get(&url) + .bearer_auth(&creds.access_token) + .send() + .await?; + if !resp.status().is_success() { + return Err(format!("OneDrive download failed: {}", resp.status()).into()); + } + Ok(resp.bytes().await?.to_vec()) + } + _ => Err(format!("Unsupported provider: {}", creds.provider).into()), + } +} + +async fn upload_to_account( + creds: &crate::basic::keywords::use_account::AccountCredentials, + path: &str, + content: &[u8], +) -> Result<(), Box> { + let client = reqwest::Client::new(); + + match creds.provider.as_str() { + "gmail" | "google" => { + let url = format!( + "https://www.googleapis.com/upload/drive/v3/files?uploadType=media&name={}", + urlencoding::encode(path) + ); + let resp = client + .post(&url) + .bearer_auth(&creds.access_token) + .body(content.to_vec()) + .send() + .await?; + if !resp.status().is_success() { + return Err(format!("Google Drive upload failed: {}", resp.status()).into()); + } + } + "outlook" | "microsoft" => { + let url = format!( + "https://graph.microsoft.com/v1.0/me/drive/root:/{}:/content", + urlencoding::encode(path) + ); + let resp = client + .put(&url) + .bearer_auth(&creds.access_token) + .body(content.to_vec()) + .send() + .await?; + if !resp.status().is_success() { + return Err(format!("OneDrive upload failed: {}", resp.status()).into()); + } + } + _ => return Err(format!("Unsupported provider: {}", creds.provider).into()), + } + Ok(()) +} + +async fn read_from_local( + state: &AppState, + user: &UserSession, + path: &str, +) -> Result, Box> { + let client = state.drive.as_ref().ok_or("S3 client not configured")?; + let bot_name: String = { + let mut db_conn = state.conn.get()?; + bots.filter(id.eq(&user.bot_id)) + .select(name) + .first(&mut *db_conn)? + }; + let bucket_name = format!("{}.gbai", bot_name); + let key = format!("{}.gbdrive/{}", bot_name, path); + + let result = client + .get_object() + .bucket(&bucket_name) + .key(&key) + .send() + .await?; + let bytes = result.body.collect().await?.into_bytes(); + Ok(bytes.to_vec()) +} + +async fn write_to_local( + state: &AppState, + user: &UserSession, + path: &str, + content: &[u8], +) -> Result<(), Box> { + let client = state.drive.as_ref().ok_or("S3 client not configured")?; + let bot_name: String = { + let mut db_conn = state.conn.get()?; + bots.filter(id.eq(&user.bot_id)) + .select(name) + .first(&mut *db_conn)? + }; + let bucket_name = format!("{}.gbai", bot_name); + let key = format!("{}.gbdrive/{}", bot_name, path); + + client + .put_object() + .bucket(&bucket_name) + .key(&key) + .body(content.to_vec().into()) + .send() + .await?; + Ok(()) +} + /// Move/rename file within .gbdrive async fn execute_move( state: &AppState, diff --git a/src/basic/keywords/mod.rs b/src/basic/keywords/mod.rs index 9aeaf009a..7f44cdd24 100644 --- a/src/basic/keywords/mod.rs +++ b/src/basic/keywords/mod.rs @@ -46,6 +46,8 @@ pub mod messaging; pub mod model_routing; pub mod multimodal; pub mod on; +pub mod on_change; +pub mod on_email; pub mod on_form_submit; pub mod play; pub mod print; @@ -68,6 +70,7 @@ pub mod switch_case; pub mod table_definition; pub mod transfer_to_human; pub mod universal_messaging; +pub mod use_account; pub mod use_kb; pub mod use_tool; pub mod use_website; @@ -223,6 +226,7 @@ pub fn get_all_keywords() -> Vec { // Knowledge "CLEAR KB".to_string(), "USE KB".to_string(), + "USE ACCOUNT".to_string(), "USE WEBSITE".to_string(), // AI/LLM "LLM".to_string(), @@ -237,6 +241,8 @@ pub fn get_all_keywords() -> Vec { "TALK".to_string(), // Events "ON".to_string(), + "ON EMAIL".to_string(), + "ON CHANGE".to_string(), "SET SCHEDULE".to_string(), "WEBHOOK".to_string(), // Session @@ -383,5 +389,15 @@ pub fn get_keyword_categories() -> std::collections::HashMap ], ); + categories.insert( + "Monitors".to_string(), + vec![ + "ON EMAIL".to_string(), + "ON CHANGE".to_string(), + "SET SCHEDULE".to_string(), + "WEBHOOK".to_string(), + ], + ); + categories } diff --git a/src/basic/keywords/on_change.rs b/src/basic/keywords/on_change.rs new file mode 100644 index 000000000..d79d243b6 --- /dev/null +++ b/src/basic/keywords/on_change.rs @@ -0,0 +1,709 @@ +use diesel::prelude::*; +use log::{error, info, trace}; +use rhai::{Dynamic, Engine}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use uuid::Uuid; + +use crate::shared::models::TriggerKind; +use crate::shared::models::UserSession; +use crate::shared::state::AppState; + +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] +pub enum FolderProvider { + GDrive, + OneDrive, + Dropbox, + Local, +} + +impl FolderProvider { + pub fn from_str(s: &str) -> Option { + match s.to_lowercase().as_str() { + "gdrive" | "googledrive" | "google" => Some(Self::GDrive), + "onedrive" | "microsoft" | "ms" => Some(Self::OneDrive), + "dropbox" | "dbx" => Some(Self::Dropbox), + "local" | "filesystem" | "fs" => Some(Self::Local), + _ => None, + } + } + + pub fn as_str(&self) -> &'static str { + match self { + Self::GDrive => "gdrive", + Self::OneDrive => "onedrive", + Self::Dropbox => "dropbox", + Self::Local => "local", + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] +pub enum ChangeEventType { + Create, + Modify, + Delete, + Rename, + Move, +} + +impl ChangeEventType { + pub fn as_str(&self) -> &'static str { + match self { + Self::Create => "create", + Self::Modify => "modify", + Self::Delete => "delete", + Self::Rename => "rename", + Self::Move => "move", + } + } + + pub fn from_str(s: &str) -> Option { + match s.to_lowercase().as_str() { + "create" | "created" | "new" => Some(Self::Create), + "modify" | "modified" | "change" | "changed" => Some(Self::Modify), + "delete" | "deleted" | "remove" | "removed" => Some(Self::Delete), + "rename" | "renamed" => Some(Self::Rename), + "move" | "moved" => Some(Self::Move), + _ => None, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FolderMonitor { + pub id: Uuid, + pub bot_id: Uuid, + pub provider: String, + pub account_email: Option, + pub folder_path: String, + pub folder_id: Option, + pub script_path: String, + pub is_active: bool, + pub watch_subfolders: bool, + pub event_types: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FolderChangeEvent { + pub id: Uuid, + pub monitor_id: Uuid, + pub event_type: String, + pub file_path: String, + pub file_id: Option, + pub file_name: Option, + pub file_size: Option, + pub mime_type: Option, + pub old_path: Option, +} + +pub fn parse_folder_path(path: &str) -> (FolderProvider, Option, String) { + if path.starts_with("account://") { + let rest = &path[10..]; + if let Some(slash_pos) = rest.find('/') { + let email = &rest[..slash_pos]; + let folder_path = &rest[slash_pos..]; + let provider = detect_provider_from_email(email); + return (provider, Some(email.to_string()), folder_path.to_string()); + } + } + + if path.starts_with("gdrive://") { + let folder_path = &path[9..]; + return (FolderProvider::GDrive, None, folder_path.to_string()); + } + + if path.starts_with("onedrive://") { + let folder_path = &path[11..]; + return (FolderProvider::OneDrive, None, folder_path.to_string()); + } + + if path.starts_with("dropbox://") { + let folder_path = &path[10..]; + return (FolderProvider::Dropbox, None, folder_path.to_string()); + } + + (FolderProvider::Local, None, path.to_string()) +} + +fn detect_provider_from_email(email: &str) -> FolderProvider { + let lower = email.to_lowercase(); + if lower.ends_with("@gmail.com") || lower.contains("google") { + FolderProvider::GDrive + } else if lower.ends_with("@outlook.com") + || lower.ends_with("@hotmail.com") + || lower.contains("microsoft") + { + FolderProvider::OneDrive + } else { + FolderProvider::GDrive + } +} + +pub fn is_cloud_path(path: &str) -> bool { + path.starts_with("account://") + || path.starts_with("gdrive://") + || path.starts_with("onedrive://") + || path.starts_with("dropbox://") +} + +pub fn on_change_keyword(state: &AppState, user: UserSession, engine: &mut Engine) { + register_on_change_basic(state, user.clone(), engine); + register_on_change_with_events(state, user, engine); +} + +fn register_on_change_basic(state: &AppState, user: UserSession, engine: &mut Engine) { + let state_clone = state.clone(); + let bot_id = user.bot_id; + + engine + .register_custom_syntax( + &["ON", "CHANGE", "$string$"], + true, + move |context, inputs| { + let path = context + .eval_expression_tree(&inputs[0])? + .to_string() + .trim_matches('"') + .to_string(); + + let (provider, account_email, folder_path) = parse_folder_path(&path); + + trace!( + "ON CHANGE '{}' (provider: {}, account: {:?}) for bot: {}", + folder_path, + provider.as_str(), + account_email, + bot_id + ); + + let script_name = format!( + "on_change_{}.rhai", + sanitize_path_for_filename(&folder_path) + ); + + let mut conn = state_clone + .conn + .get() + .map_err(|e| format!("DB error: {}", e))?; + + let result = execute_on_change( + &mut *conn, + bot_id, + provider, + account_email.as_deref(), + &folder_path, + &script_name, + true, + vec!["create", "modify", "delete"], + ) + .map_err(|e| format!("DB error: {}", e))?; + + if let Some(rows_affected) = result.get("rows_affected") { + info!( + "Folder monitor registered for '{}' ({}) on bot {}", + folder_path, + provider.as_str(), + bot_id + ); + Ok(Dynamic::from(rows_affected.as_i64().unwrap_or(0))) + } else { + Err("Failed to register folder monitor".into()) + } + }, + ) + .unwrap(); +} + +fn register_on_change_with_events(state: &AppState, user: UserSession, engine: &mut Engine) { + let state_clone = state.clone(); + let bot_id = user.bot_id; + + engine + .register_custom_syntax( + &["ON", "CHANGE", "$string$", "EVENTS", "$expr$"], + true, + move |context, inputs| { + let path = context + .eval_expression_tree(&inputs[0])? + .to_string() + .trim_matches('"') + .to_string(); + + let events_value = context.eval_expression_tree(&inputs[1])?; + let events_str = events_value.to_string(); + let events: Vec<&str> = events_str + .trim_matches('"') + .split(',') + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .collect(); + + let (provider, account_email, folder_path) = parse_folder_path(&path); + + trace!( + "ON CHANGE '{}' EVENTS {:?} (provider: {}) for bot: {}", + folder_path, + events, + provider.as_str(), + bot_id + ); + + let script_name = format!( + "on_change_{}.rhai", + sanitize_path_for_filename(&folder_path) + ); + + let mut conn = state_clone + .conn + .get() + .map_err(|e| format!("DB error: {}", e))?; + + let result = execute_on_change( + &mut *conn, + bot_id, + provider, + account_email.as_deref(), + &folder_path, + &script_name, + true, + events, + ) + .map_err(|e| format!("DB error: {}", e))?; + + if let Some(rows_affected) = result.get("rows_affected") { + info!( + "Folder monitor registered for '{}' with events {:?} on bot {}", + folder_path, events_str, bot_id + ); + Ok(Dynamic::from(rows_affected.as_i64().unwrap_or(0))) + } else { + Err("Failed to register folder monitor".into()) + } + }, + ) + .unwrap(); +} + +fn sanitize_path_for_filename(path: &str) -> String { + path.replace('/', "_") + .replace('\\', "_") + .replace(':', "_") + .replace(' ', "_") + .replace('.', "_") + .chars() + .filter(|c| c.is_alphanumeric() || *c == '_' || *c == '-') + .collect::() + .to_lowercase() +} + +pub fn execute_on_change( + conn: &mut diesel::PgConnection, + bot_id: Uuid, + provider: FolderProvider, + account_email: Option<&str>, + folder_path: &str, + script_path: &str, + watch_subfolders: bool, + event_types: Vec<&str>, +) -> Result { + use crate::shared::models::system_automations; + + let target = match account_email { + Some(email) => format!("account://{}{}", email, folder_path), + None => format!("{}://{}", provider.as_str(), folder_path), + }; + + let new_automation = ( + system_automations::kind.eq(TriggerKind::FolderChange as i32), + system_automations::target.eq(&target), + system_automations::param.eq(script_path), + system_automations::bot_id.eq(bot_id), + ); + + let result = diesel::insert_into(system_automations::table) + .values(&new_automation) + .execute(conn) + .map_err(|e| { + error!("SQL execution error: {}", e); + e.to_string() + })?; + + let monitor_id = Uuid::new_v4(); + let events_json = serde_json::to_string(&event_types).unwrap_or_else(|_| "[]".to_string()); + let account_sql = account_email + .map(|e| format!("'{}'", e.replace('\'', "''"))) + .unwrap_or_else(|| "NULL".to_string()); + + let insert_sql = format!( + "INSERT INTO folder_monitors (id, bot_id, provider, folder_path, script_path, is_active, watch_subfolders, event_types_json) \ + VALUES ('{}', '{}', '{}', '{}', '{}', true, {}, '{}') \ + ON CONFLICT (bot_id, provider, folder_path) DO UPDATE SET \ + script_path = EXCLUDED.script_path, \ + watch_subfolders = EXCLUDED.watch_subfolders, \ + event_types_json = EXCLUDED.event_types_json, \ + is_active = true, \ + updated_at = NOW()", + monitor_id, + bot_id, + provider.as_str(), + folder_path.replace('\'', "''"), + script_path.replace('\'', "''"), + watch_subfolders, + events_json.replace('\'', "''") + ); + + diesel::sql_query(&insert_sql).execute(conn).map_err(|e| { + error!("Failed to insert folder monitor: {}", e); + e.to_string() + })?; + + Ok(json!({ + "command": "on_change", + "provider": provider.as_str(), + "account_email": account_sql, + "folder_path": folder_path, + "script_path": script_path, + "watch_subfolders": watch_subfolders, + "event_types": event_types, + "rows_affected": result + })) +} + +pub async fn check_folder_monitors( + state: &AppState, + bot_id: Uuid, +) -> Result, String> { + let mut conn = state.conn.get().map_err(|e| e.to_string())?; + + let monitors_sql = format!( + "SELECT id, bot_id, provider, folder_path, folder_id, script_path, \ + watch_subfolders, last_change_token, event_types_json \ + FROM folder_monitors WHERE bot_id = '{}' AND is_active = true", + bot_id + ); + + #[derive(QueryableByName)] + struct MonitorRow { + #[diesel(sql_type = diesel::sql_types::Uuid)] + id: Uuid, + #[diesel(sql_type = diesel::sql_types::Uuid)] + bot_id: Uuid, + #[diesel(sql_type = diesel::sql_types::Text)] + provider: String, + #[diesel(sql_type = diesel::sql_types::Text)] + folder_path: String, + #[diesel(sql_type = diesel::sql_types::Nullable)] + folder_id: Option, + #[diesel(sql_type = diesel::sql_types::Text)] + script_path: String, + #[diesel(sql_type = diesel::sql_types::Bool)] + watch_subfolders: bool, + #[diesel(sql_type = diesel::sql_types::Nullable)] + last_change_token: Option, + #[diesel(sql_type = diesel::sql_types::Nullable)] + event_types_json: Option, + } + + let monitors: Vec = diesel::sql_query(&monitors_sql) + .load(&mut *conn) + .map_err(|e| e.to_string())?; + + let mut events = Vec::new(); + + for monitor in monitors { + let event_types: Vec = monitor + .event_types_json + .as_ref() + .and_then(|j| serde_json::from_str(j).ok()) + .unwrap_or_else(|| { + vec![ + "create".to_string(), + "modify".to_string(), + "delete".to_string(), + ] + }); + + trace!( + "Checking folder monitor {} for {} on bot {} (provider: {}, events: {:?}, subfolders: {})", + monitor.id, + monitor.folder_path, + monitor.bot_id, + monitor.provider, + event_types, + monitor.watch_subfolders + ); + + let provider = FolderProvider::from_str(&monitor.provider).unwrap_or(FolderProvider::Local); + + let new_events = fetch_folder_changes( + state, + monitor.id, + provider, + &monitor.folder_path, + monitor.folder_id.as_deref(), + monitor.last_change_token.as_deref(), + monitor.watch_subfolders, + &event_types, + ) + .await?; + + for event in new_events { + events.push((event, monitor.script_path.clone())); + } + } + + Ok(events) +} + +async fn fetch_folder_changes( + _state: &AppState, + monitor_id: Uuid, + provider: FolderProvider, + folder_path: &str, + _folder_id: Option<&str>, + _last_token: Option<&str>, + _watch_subfolders: bool, + _event_types: &[String], +) -> Result, String> { + trace!( + "Fetching {} changes for monitor {} path {}", + provider.as_str(), + monitor_id, + folder_path + ); + Ok(Vec::new()) +} + +pub async fn process_folder_event( + state: &AppState, + event: &FolderChangeEvent, + script_path: &str, +) -> Result<(), String> { + info!( + "Processing folder event {} ({}) for {} with script {}", + event.id, event.event_type, event.file_path, script_path + ); + + let mut conn = state.conn.get().map_err(|e| e.to_string())?; + + let update_sql = format!( + "UPDATE folder_change_events SET processed = true, processed_at = NOW() WHERE id = '{}'", + event.id + ); + + diesel::sql_query(&update_sql) + .execute(&mut *conn) + .map_err(|e| e.to_string())?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_folder_path_account() { + let (provider, email, path) = + parse_folder_path("account://user@gmail.com/Documents/invoices"); + assert_eq!(provider, FolderProvider::GDrive); + assert_eq!(email, Some("user@gmail.com".to_string())); + assert_eq!(path, "/Documents/invoices"); + } + + #[test] + fn test_parse_folder_path_gdrive() { + let (provider, email, path) = parse_folder_path("gdrive:///shared/reports"); + assert_eq!(provider, FolderProvider::GDrive); + assert_eq!(email, None); + assert_eq!(path, "/shared/reports"); + } + + #[test] + fn test_parse_folder_path_onedrive() { + let (provider, email, path) = parse_folder_path("onedrive:///business/docs"); + assert_eq!(provider, FolderProvider::OneDrive); + assert_eq!(email, None); + assert_eq!(path, "/business/docs"); + } + + #[test] + fn test_parse_folder_path_dropbox() { + let (provider, email, path) = parse_folder_path("dropbox:///team/assets"); + assert_eq!(provider, FolderProvider::Dropbox); + assert_eq!(email, None); + assert_eq!(path, "/team/assets"); + } + + #[test] + fn test_parse_folder_path_local() { + let (provider, email, path) = parse_folder_path("/home/user/documents"); + assert_eq!(provider, FolderProvider::Local); + assert_eq!(email, None); + assert_eq!(path, "/home/user/documents"); + } + + #[test] + fn test_is_cloud_path() { + assert!(is_cloud_path("account://user@gmail.com/docs")); + assert!(is_cloud_path("gdrive:///shared")); + assert!(is_cloud_path("onedrive:///files")); + assert!(is_cloud_path("dropbox:///folder")); + assert!(!is_cloud_path("/local/path")); + assert!(!is_cloud_path("./relative/path")); + } + + #[test] + fn test_folder_provider_from_str() { + assert_eq!( + FolderProvider::from_str("gdrive"), + Some(FolderProvider::GDrive) + ); + assert_eq!( + FolderProvider::from_str("GDRIVE"), + Some(FolderProvider::GDrive) + ); + assert_eq!( + FolderProvider::from_str("googledrive"), + Some(FolderProvider::GDrive) + ); + assert_eq!( + FolderProvider::from_str("onedrive"), + Some(FolderProvider::OneDrive) + ); + assert_eq!( + FolderProvider::from_str("microsoft"), + Some(FolderProvider::OneDrive) + ); + assert_eq!( + FolderProvider::from_str("dropbox"), + Some(FolderProvider::Dropbox) + ); + assert_eq!( + FolderProvider::from_str("dbx"), + Some(FolderProvider::Dropbox) + ); + assert_eq!( + FolderProvider::from_str("local"), + Some(FolderProvider::Local) + ); + assert_eq!( + FolderProvider::from_str("filesystem"), + Some(FolderProvider::Local) + ); + assert_eq!(FolderProvider::from_str("unknown"), None); + } + + #[test] + fn test_change_event_type_from_str() { + assert_eq!( + ChangeEventType::from_str("create"), + Some(ChangeEventType::Create) + ); + assert_eq!( + ChangeEventType::from_str("created"), + Some(ChangeEventType::Create) + ); + assert_eq!( + ChangeEventType::from_str("modify"), + Some(ChangeEventType::Modify) + ); + assert_eq!( + ChangeEventType::from_str("changed"), + Some(ChangeEventType::Modify) + ); + assert_eq!( + ChangeEventType::from_str("delete"), + Some(ChangeEventType::Delete) + ); + assert_eq!( + ChangeEventType::from_str("removed"), + Some(ChangeEventType::Delete) + ); + assert_eq!( + ChangeEventType::from_str("rename"), + Some(ChangeEventType::Rename) + ); + assert_eq!( + ChangeEventType::from_str("move"), + Some(ChangeEventType::Move) + ); + assert_eq!(ChangeEventType::from_str("invalid"), None); + } + + #[test] + fn test_sanitize_path() { + assert_eq!( + sanitize_path_for_filename("/home/user/docs"), + "_home_user_docs" + ); + assert_eq!( + sanitize_path_for_filename("C:\\Users\\docs"), + "c__users_docs" + ); + assert_eq!( + sanitize_path_for_filename("path with spaces"), + "path_with_spaces" + ); + } + + #[test] + fn test_folder_monitor_struct() { + let monitor = FolderMonitor { + id: Uuid::new_v4(), + bot_id: Uuid::new_v4(), + provider: "gdrive".to_string(), + account_email: Some("user@gmail.com".to_string()), + folder_path: "/my/folder".to_string(), + folder_id: Some("folder123".to_string()), + script_path: "on_change.rhai".to_string(), + is_active: true, + watch_subfolders: true, + event_types: vec!["create".to_string(), "modify".to_string()], + }; + + assert_eq!(monitor.provider, "gdrive"); + assert!(monitor.is_active); + assert!(monitor.watch_subfolders); + assert_eq!(monitor.account_email, Some("user@gmail.com".to_string())); + } + + #[test] + fn test_folder_change_event_struct() { + let event = FolderChangeEvent { + id: Uuid::new_v4(), + monitor_id: Uuid::new_v4(), + event_type: "create".to_string(), + file_path: "/docs/new_file.pdf".to_string(), + file_id: Some("file123".to_string()), + file_name: Some("new_file.pdf".to_string()), + file_size: Some(1024), + mime_type: Some("application/pdf".to_string()), + old_path: None, + }; + + assert_eq!(event.event_type, "create"); + assert_eq!(event.file_size, Some(1024)); + } + + #[test] + fn test_detect_provider_from_email() { + assert_eq!( + detect_provider_from_email("user@gmail.com"), + FolderProvider::GDrive + ); + assert_eq!( + detect_provider_from_email("user@outlook.com"), + FolderProvider::OneDrive + ); + assert_eq!( + detect_provider_from_email("user@hotmail.com"), + FolderProvider::OneDrive + ); + assert_eq!( + detect_provider_from_email("user@company.com"), + FolderProvider::GDrive + ); + } +} diff --git a/src/basic/keywords/on_email.rs b/src/basic/keywords/on_email.rs new file mode 100644 index 000000000..29b16268b --- /dev/null +++ b/src/basic/keywords/on_email.rs @@ -0,0 +1,598 @@ +use diesel::prelude::*; +use log::{error, info, trace}; +use rhai::{Dynamic, Engine}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use uuid::Uuid; + +use crate::shared::models::TriggerKind; +use crate::shared::models::UserSession; +use crate::shared::state::AppState; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EmailMonitor { + pub id: Uuid, + pub bot_id: Uuid, + pub email_address: String, + pub script_path: String, + pub is_active: bool, + pub filter_from: Option, + pub filter_subject: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EmailReceivedEvent { + pub id: Uuid, + pub monitor_id: Uuid, + pub message_uid: i64, + pub message_id: Option, + pub from_address: String, + pub to_addresses: Vec, + pub subject: Option, + pub has_attachments: bool, + pub attachments: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EmailAttachment { + pub filename: String, + pub mime_type: String, + pub size: i64, +} + +pub fn on_email_keyword(state: &AppState, user: UserSession, engine: &mut Engine) { + register_on_email(state, user.clone(), engine); + register_on_email_from(state, user.clone(), engine); + register_on_email_subject(state, user, engine); +} + +fn register_on_email(state: &AppState, user: UserSession, engine: &mut Engine) { + let state_clone = state.clone(); + let bot_id = user.bot_id; + + engine + .register_custom_syntax( + &["ON", "EMAIL", "$string$"], + true, + move |context, inputs| { + let email_address = context + .eval_expression_tree(&inputs[0])? + .to_string() + .trim_matches('"') + .to_string(); + + trace!("ON EMAIL '{}' for bot: {}", email_address, bot_id); + + let script_name = format!( + "on_email_{}.rhai", + email_address.replace('@', "_at_").replace('.', "_") + ); + + let mut conn = state_clone + .conn + .get() + .map_err(|e| format!("DB error: {}", e))?; + + let result = + execute_on_email(&mut *conn, bot_id, &email_address, &script_name, None, None) + .map_err(|e| format!("DB error: {}", e))?; + + if let Some(rows_affected) = result.get("rows_affected") { + info!( + "Email monitor registered for '{}' on bot {}", + email_address, bot_id + ); + Ok(Dynamic::from(rows_affected.as_i64().unwrap_or(0))) + } else { + Err("Failed to register email monitor".into()) + } + }, + ) + .unwrap(); +} + +fn register_on_email_from(state: &AppState, user: UserSession, engine: &mut Engine) { + let state_clone = state.clone(); + let bot_id = user.bot_id; + + engine + .register_custom_syntax( + &["ON", "EMAIL", "$string$", "FROM", "$string$"], + true, + move |context, inputs| { + let email_address = context + .eval_expression_tree(&inputs[0])? + .to_string() + .trim_matches('"') + .to_string(); + + let filter_from = context + .eval_expression_tree(&inputs[1])? + .to_string() + .trim_matches('"') + .to_string(); + + trace!( + "ON EMAIL '{}' FROM '{}' for bot: {}", + email_address, + filter_from, + bot_id + ); + + let script_name = format!( + "on_email_{}_from_{}.rhai", + email_address.replace('@', "_at_").replace('.', "_"), + filter_from.replace('@', "_at_").replace('.', "_") + ); + + let mut conn = state_clone + .conn + .get() + .map_err(|e| format!("DB error: {}", e))?; + + let result = execute_on_email( + &mut *conn, + bot_id, + &email_address, + &script_name, + Some(&filter_from), + None, + ) + .map_err(|e| format!("DB error: {}", e))?; + + if let Some(rows_affected) = result.get("rows_affected") { + info!( + "Email monitor registered for '{}' from '{}' on bot {}", + email_address, filter_from, bot_id + ); + Ok(Dynamic::from(rows_affected.as_i64().unwrap_or(0))) + } else { + Err("Failed to register email monitor".into()) + } + }, + ) + .unwrap(); +} + +fn register_on_email_subject(state: &AppState, user: UserSession, engine: &mut Engine) { + let state_clone = state.clone(); + let bot_id = user.bot_id; + + engine + .register_custom_syntax( + &["ON", "EMAIL", "$string$", "SUBJECT", "$string$"], + true, + move |context, inputs| { + let email_address = context + .eval_expression_tree(&inputs[0])? + .to_string() + .trim_matches('"') + .to_string(); + + let filter_subject = context + .eval_expression_tree(&inputs[1])? + .to_string() + .trim_matches('"') + .to_string(); + + trace!( + "ON EMAIL '{}' SUBJECT '{}' for bot: {}", + email_address, + filter_subject, + bot_id + ); + + let script_name = format!( + "on_email_{}_subject.rhai", + email_address.replace('@', "_at_").replace('.', "_") + ); + + let mut conn = state_clone + .conn + .get() + .map_err(|e| format!("DB error: {}", e))?; + + let result = execute_on_email( + &mut *conn, + bot_id, + &email_address, + &script_name, + None, + Some(&filter_subject), + ) + .map_err(|e| format!("DB error: {}", e))?; + + if let Some(rows_affected) = result.get("rows_affected") { + info!( + "Email monitor registered for '{}' with subject filter '{}' on bot {}", + email_address, filter_subject, bot_id + ); + Ok(Dynamic::from(rows_affected.as_i64().unwrap_or(0))) + } else { + Err("Failed to register email monitor".into()) + } + }, + ) + .unwrap(); +} + +pub fn execute_on_email( + conn: &mut diesel::PgConnection, + bot_id: Uuid, + email_address: &str, + script_path: &str, + filter_from: Option<&str>, + filter_subject: Option<&str>, +) -> Result { + use crate::shared::models::system_automations; + + let new_automation = ( + system_automations::kind.eq(TriggerKind::EmailReceived as i32), + system_automations::target.eq(email_address), + system_automations::param.eq(script_path), + system_automations::bot_id.eq(bot_id), + ); + + let result = diesel::insert_into(system_automations::table) + .values(&new_automation) + .execute(conn) + .map_err(|e| { + error!("SQL execution error: {}", e); + e.to_string() + })?; + + let monitor_id = Uuid::new_v4(); + let insert_sql = format!( + "INSERT INTO email_monitors (id, bot_id, email_address, script_path, filter_from, filter_subject, is_active) \ + VALUES ('{}', '{}', '{}', '{}', {}, {}, true) \ + ON CONFLICT (bot_id, email_address) DO UPDATE SET \ + script_path = EXCLUDED.script_path, \ + filter_from = EXCLUDED.filter_from, \ + filter_subject = EXCLUDED.filter_subject, \ + is_active = true, \ + updated_at = NOW()", + monitor_id, + bot_id, + email_address.replace('\'', "''"), + script_path.replace('\'', "''"), + filter_from.map(|f| format!("'{}'", f.replace('\'', "''"))).unwrap_or_else(|| "NULL".to_string()), + filter_subject.map(|s| format!("'{}'", s.replace('\'', "''"))).unwrap_or_else(|| "NULL".to_string()) + ); + + diesel::sql_query(&insert_sql).execute(conn).map_err(|e| { + error!("Failed to insert email monitor: {}", e); + e.to_string() + })?; + + Ok(json!({ + "command": "on_email", + "email_address": email_address, + "script_path": script_path, + "filter_from": filter_from, + "filter_subject": filter_subject, + "rows_affected": result + })) +} + +pub async fn check_email_monitors( + state: &AppState, + bot_id: Uuid, +) -> Result, String> { + let mut conn = state.conn.get().map_err(|e| e.to_string())?; + + let monitors_sql = format!( + "SELECT id, bot_id, email_address, script_path, filter_from, filter_subject, last_uid \ + FROM email_monitors WHERE bot_id = '{}' AND is_active = true", + bot_id + ); + + #[derive(QueryableByName)] + struct MonitorRow { + #[diesel(sql_type = diesel::sql_types::Uuid)] + id: Uuid, + #[diesel(sql_type = diesel::sql_types::Uuid)] + bot_id: Uuid, + #[diesel(sql_type = diesel::sql_types::Text)] + email_address: String, + #[diesel(sql_type = diesel::sql_types::Text)] + script_path: String, + #[diesel(sql_type = diesel::sql_types::Nullable)] + filter_from: Option, + #[diesel(sql_type = diesel::sql_types::Nullable)] + filter_subject: Option, + #[diesel(sql_type = diesel::sql_types::Nullable)] + last_uid: Option, + } + + let monitors: Vec = diesel::sql_query(&monitors_sql) + .load(&mut *conn) + .map_err(|e| e.to_string())?; + + let mut events = Vec::new(); + + for monitor in monitors { + trace!( + "Checking email monitor for {} on bot {} (last_uid: {:?})", + monitor.email_address, + monitor.bot_id, + monitor.last_uid + ); + + let new_events = fetch_new_emails( + state, + monitor.id, + &monitor.email_address, + monitor.last_uid.unwrap_or(0), + monitor.filter_from.as_deref(), + monitor.filter_subject.as_deref(), + ) + .await?; + + for event in new_events { + events.push((event, monitor.script_path.clone())); + } + } + + Ok(events) +} + +async fn fetch_new_emails( + _state: &AppState, + monitor_id: Uuid, + _email_address: &str, + _last_uid: i64, + _filter_from: Option<&str>, + _filter_subject: Option<&str>, +) -> Result, String> { + trace!("Fetching new emails for monitor {}", monitor_id); + Ok(Vec::new()) +} + +pub async fn process_email_event( + state: &AppState, + event: &EmailReceivedEvent, + script_path: &str, +) -> Result<(), String> { + info!( + "Processing email event {} from {} with script {}", + event.id, event.from_address, script_path + ); + + let mut conn = state.conn.get().map_err(|e| e.to_string())?; + + let update_sql = format!( + "UPDATE email_received_events SET processed = true, processed_at = NOW() WHERE id = '{}'", + event.id + ); + + diesel::sql_query(&update_sql) + .execute(&mut *conn) + .map_err(|e| e.to_string())?; + + Ok(()) +} + +pub fn parse_email_path(path: &str) -> Option<(String, Option)> { + if path.starts_with("email://") { + let rest = &path[8..]; + if let Some(slash_pos) = rest.find('/') { + let email = &rest[..slash_pos]; + let folder = &rest[slash_pos + 1..]; + return Some((email.to_string(), Some(folder.to_string()))); + } + return Some((rest.to_string(), None)); + } + None +} + +pub fn is_email_path(path: &str) -> bool { + path.starts_with("email://") +} + +pub fn sanitize_email_for_filename(email: &str) -> String { + email + .replace('@', "_at_") + .replace('.', "_") + .chars() + .filter(|c| c.is_alphanumeric() || *c == '_' || *c == '-') + .collect::() + .to_lowercase() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_email_monitor_struct() { + let monitor = EmailMonitor { + id: Uuid::new_v4(), + bot_id: Uuid::new_v4(), + email_address: "test@example.com".to_string(), + script_path: "on_email_test.rhai".to_string(), + is_active: true, + filter_from: None, + filter_subject: None, + }; + + assert_eq!(monitor.email_address, "test@example.com"); + assert!(monitor.is_active); + assert!(monitor.filter_from.is_none()); + assert!(monitor.filter_subject.is_none()); + } + + #[test] + fn test_email_monitor_with_filters() { + let monitor = EmailMonitor { + id: Uuid::new_v4(), + bot_id: Uuid::new_v4(), + email_address: "orders@company.com".to_string(), + script_path: "on_email_orders.rhai".to_string(), + is_active: true, + filter_from: Some("supplier@vendor.com".to_string()), + filter_subject: Some("Invoice".to_string()), + }; + + assert_eq!(monitor.email_address, "orders@company.com"); + assert_eq!(monitor.filter_from, Some("supplier@vendor.com".to_string())); + assert_eq!(monitor.filter_subject, Some("Invoice".to_string())); + } + + #[test] + fn test_email_attachment_struct() { + let attachment = EmailAttachment { + filename: "document.pdf".to_string(), + mime_type: "application/pdf".to_string(), + size: 1024, + }; + + assert_eq!(attachment.filename, "document.pdf"); + assert_eq!(attachment.mime_type, "application/pdf"); + assert_eq!(attachment.size, 1024); + } + + #[test] + fn test_email_received_event_struct() { + let event = EmailReceivedEvent { + id: Uuid::new_v4(), + monitor_id: Uuid::new_v4(), + message_uid: 12345, + message_id: Some("".to_string()), + from_address: "sender@example.com".to_string(), + to_addresses: vec!["recipient@example.com".to_string()], + subject: Some("Test Subject".to_string()), + has_attachments: true, + attachments: vec![EmailAttachment { + filename: "file.pdf".to_string(), + mime_type: "application/pdf".to_string(), + size: 2048, + }], + }; + + assert_eq!(event.message_uid, 12345); + assert_eq!(event.from_address, "sender@example.com"); + assert!(event.has_attachments); + assert_eq!(event.attachments.len(), 1); + assert_eq!(event.attachments[0].filename, "file.pdf"); + } + + #[test] + fn test_parse_email_path_basic() { + let result = parse_email_path("email://user@gmail.com"); + assert!(result.is_some()); + let (email, folder) = result.unwrap(); + assert_eq!(email, "user@gmail.com"); + assert!(folder.is_none()); + } + + #[test] + fn test_parse_email_path_with_folder() { + let result = parse_email_path("email://user@gmail.com/INBOX"); + assert!(result.is_some()); + let (email, folder) = result.unwrap(); + assert_eq!(email, "user@gmail.com"); + assert_eq!(folder, Some("INBOX".to_string())); + } + + #[test] + fn test_parse_email_path_invalid() { + assert!(parse_email_path("user@gmail.com").is_none()); + assert!(parse_email_path("mailto:user@gmail.com").is_none()); + assert!(parse_email_path("/local/path").is_none()); + } + + #[test] + fn test_is_email_path() { + assert!(is_email_path("email://user@gmail.com")); + assert!(is_email_path("email://user@company.com/INBOX")); + assert!(!is_email_path("user@gmail.com")); + assert!(!is_email_path("mailto:user@gmail.com")); + assert!(!is_email_path("account://user@gmail.com")); + } + + #[test] + fn test_sanitize_email_for_filename() { + assert_eq!( + sanitize_email_for_filename("user@gmail.com"), + "user_at_gmail_com" + ); + assert_eq!( + sanitize_email_for_filename("test.user@company.co.uk"), + "test_user_at_company_co_uk" + ); + assert_eq!( + sanitize_email_for_filename("USER@EXAMPLE.COM"), + "user_at_example_com" + ); + } + + #[test] + fn test_email_event_without_attachments() { + let event = EmailReceivedEvent { + id: Uuid::new_v4(), + monitor_id: Uuid::new_v4(), + message_uid: 1, + message_id: None, + from_address: "no-reply@system.com".to_string(), + to_addresses: vec![], + subject: None, + has_attachments: false, + attachments: vec![], + }; + + assert!(!event.has_attachments); + assert!(event.attachments.is_empty()); + assert!(event.subject.is_none()); + } + + #[test] + fn test_multiple_to_addresses() { + let event = EmailReceivedEvent { + id: Uuid::new_v4(), + monitor_id: Uuid::new_v4(), + message_uid: 999, + message_id: Some("".to_string()), + from_address: "sender@example.com".to_string(), + to_addresses: vec![ + "user1@example.com".to_string(), + "user2@example.com".to_string(), + "user3@example.com".to_string(), + ], + subject: Some("Group Message".to_string()), + has_attachments: false, + attachments: vec![], + }; + + assert_eq!(event.to_addresses.len(), 3); + assert!(event + .to_addresses + .contains(&"user2@example.com".to_string())); + } + + #[test] + fn test_multiple_attachments() { + let attachments = vec![ + EmailAttachment { + filename: "doc1.pdf".to_string(), + mime_type: "application/pdf".to_string(), + size: 1024, + }, + EmailAttachment { + filename: "image.png".to_string(), + mime_type: "image/png".to_string(), + size: 2048, + }, + EmailAttachment { + filename: "data.xlsx".to_string(), + mime_type: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" + .to_string(), + size: 4096, + }, + ]; + + assert_eq!(attachments.len(), 3); + assert_eq!(attachments[0].filename, "doc1.pdf"); + assert_eq!(attachments[1].mime_type, "image/png"); + assert_eq!(attachments[2].size, 4096); + } +} diff --git a/src/basic/keywords/send_mail.rs b/src/basic/keywords/send_mail.rs index d9f2bfef8..4300e8e5c 100644 --- a/src/basic/keywords/send_mail.rs +++ b/src/basic/keywords/send_mail.rs @@ -1,8 +1,9 @@ +use crate::basic::keywords::use_account::{get_account_credentials, AccountCredentials}; use crate::shared::models::UserSession; use crate::shared::state::AppState; use chrono::Utc; use diesel::prelude::*; -use log::{error, trace}; +use log::{error, info, trace}; use rhai::{Dynamic, Engine}; use serde_json::json; use std::sync::Arc; @@ -25,7 +26,6 @@ pub fn send_mail_keyword(state: Arc, user: UserSession, engine: &mut E let body = context.eval_expression_tree(&inputs[2])?.to_string(); let attachments_input = context.eval_expression_tree(&inputs[3])?; - // Parse attachments array let mut attachments = Vec::new(); if attachments_input.is_array() { let arr = attachments_input.cast::(); @@ -64,6 +64,7 @@ pub fn send_mail_keyword(state: Arc, user: UserSession, engine: &mut E &subject, &body, attachments, + None, ) .await }); @@ -99,6 +100,83 @@ pub fn send_mail_keyword(state: Arc, user: UserSession, engine: &mut E ) .unwrap(); + // SEND MAIL to, subject, body USING account + let state_clone2 = Arc::clone(&state); + let user_clone2 = user.clone(); + + engine + .register_custom_syntax( + &[ + "SEND", "MAIL", "$expr$", ",", "$expr$", ",", "$expr$", "USING", "$expr$", + ], + false, + move |context, inputs| { + let to = context.eval_expression_tree(&inputs[0])?.to_string(); + let subject = context.eval_expression_tree(&inputs[1])?.to_string(); + let body = context.eval_expression_tree(&inputs[2])?.to_string(); + let using_account = context.eval_expression_tree(&inputs[3])?.to_string(); + + info!( + "SEND MAIL USING: to={}, subject={}, using={} for user={}", + to, subject, using_account, user_clone2.user_id + ); + + let state_for_task = Arc::clone(&state_clone2); + let user_for_task = user_clone2.clone(); + + let (tx, rx) = std::sync::mpsc::channel(); + + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build(); + + let send_err = if let Ok(rt) = rt { + let result = rt.block_on(async move { + execute_send_mail( + &state_for_task, + &user_for_task, + &to, + &subject, + &body, + vec![], + Some(using_account), + ) + .await + }); + tx.send(result).err() + } else { + tx.send(Err("Failed to build tokio runtime".to_string())) + .err() + }; + + if send_err.is_some() { + error!("Failed to send SEND MAIL USING result from thread"); + } + }); + + match rx.recv_timeout(std::time::Duration::from_secs(30)) { + Ok(Ok(message_id)) => Ok(Dynamic::from(message_id)), + Ok(Err(e)) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime( + format!("SEND MAIL USING failed: {}", e).into(), + rhai::Position::NONE, + ))), + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { + Err(Box::new(rhai::EvalAltResult::ErrorRuntime( + "SEND MAIL USING timed out".into(), + rhai::Position::NONE, + ))) + } + Err(e) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime( + format!("SEND MAIL USING thread failed: {}", e).into(), + rhai::Position::NONE, + ))), + } + }, + ) + .unwrap(); + // Register SEND TEMPLATE for bulk templated emails let state_clone2 = Arc::clone(&state); let user_clone2 = user.clone(); @@ -194,13 +272,20 @@ async fn execute_send_mail( subject: &str, body: &str, attachments: Vec, + using_account: Option, ) -> Result { let message_id = Uuid::new_v4().to_string(); - // Track email in communication history track_email(state, user, &message_id, to, subject, "sent").await?; - // Send the actual email if email feature is enabled + if let Some(account_email) = using_account { + let creds = get_account_credentials(&state.conn, &account_email, user.bot_id) + .await + .map_err(|e| format!("Failed to get account credentials: {}", e))?; + + return send_via_connected_account(state, &creds, to, subject, body, attachments).await; + } + #[cfg(feature = "email")] { use crate::email::EmailService; @@ -225,12 +310,114 @@ async fn execute_send_mail( } } - // Fallback: store as draft if email sending fails save_email_draft(state, user, to, subject, body, attachments).await?; Ok(format!("Email saved as draft: {}", message_id)) } +async fn send_via_connected_account( + state: &AppState, + creds: &AccountCredentials, + to: &str, + subject: &str, + body: &str, + _attachments: Vec, +) -> Result { + let message_id = Uuid::new_v4().to_string(); + + match creds.provider.as_str() { + "gmail" | "google" => { + send_via_gmail(state, creds, to, subject, body).await?; + } + "outlook" | "microsoft" | "hotmail" => { + send_via_outlook(state, creds, to, subject, body).await?; + } + _ => { + return Err(format!("Unsupported email provider: {}", creds.provider)); + } + } + + info!("Email sent via {} account: {}", creds.provider, message_id); + Ok(format!("Email sent via {}: {}", creds.provider, message_id)) +} + +async fn send_via_gmail( + _state: &AppState, + creds: &AccountCredentials, + to: &str, + subject: &str, + body: &str, +) -> Result<(), String> { + let client = reqwest::Client::new(); + + let raw_message = format!( + "To: {}\r\nSubject: {}\r\nContent-Type: text/html; charset=utf-8\r\n\r\n{}", + to, subject, body + ); + let encoded = base64::Engine::encode( + &base64::engine::general_purpose::URL_SAFE, + raw_message.as_bytes(), + ); + + let response = client + .post("https://gmail.googleapis.com/gmail/v1/users/me/messages/send") + .bearer_auth(&creds.access_token) + .json(&serde_json::json!({ "raw": encoded })) + .send() + .await + .map_err(|e| format!("Gmail API request failed: {}", e))?; + + if !response.status().is_success() { + let error_text = response.text().await.unwrap_or_default(); + return Err(format!("Gmail API error: {}", error_text)); + } + + Ok(()) +} + +async fn send_via_outlook( + _state: &AppState, + creds: &AccountCredentials, + to: &str, + subject: &str, + body: &str, +) -> Result<(), String> { + let client = reqwest::Client::new(); + + let message = serde_json::json!({ + "message": { + "subject": subject, + "body": { + "contentType": "HTML", + "content": body + }, + "toRecipients": [ + { + "emailAddress": { + "address": to + } + } + ] + }, + "saveToSentItems": "true" + }); + + let response = client + .post("https://graph.microsoft.com/v1.0/me/sendMail") + .bearer_auth(&creds.access_token) + .json(&message) + .send() + .await + .map_err(|e| format!("Microsoft Graph request failed: {}", e))?; + + if !response.status().is_success() { + let error_text = response.text().await.unwrap_or_default(); + return Err(format!("Microsoft Graph error: {}", error_text)); + } + + Ok(()) +} + async fn execute_send_template( state: &AppState, user: &UserSession, @@ -243,15 +430,12 @@ async fn execute_send_template( let mut sent_count = 0; for recipient in recipients { - // Personalize template for each recipient let personalized_content = apply_template_variables(&template_content, &variables, &recipient)?; - // Extract subject from template or use default let subject = extract_template_subject(&personalized_content) .unwrap_or_else(|| format!("Message from {}", user.user_id)); - // Send email if let Ok(_) = execute_send_mail( state, user, @@ -259,13 +443,13 @@ async fn execute_send_template( &subject, &personalized_content, vec![], + None, ) .await { sent_count += 1; } - // Add small delay to avoid rate limiting tokio::time::sleep(std::time::Duration::from_millis(100)).await; } diff --git a/src/basic/keywords/use_account.rs b/src/basic/keywords/use_account.rs new file mode 100644 index 000000000..d4b2a12aa --- /dev/null +++ b/src/basic/keywords/use_account.rs @@ -0,0 +1,253 @@ +use crate::shared::models::UserSession; +use crate::shared::state::AppState; +use diesel::prelude::*; +use log::{error, info}; +use rhai::{Dynamic, Engine, EvalAltResult}; +use std::sync::Arc; +use uuid::Uuid; + +#[derive(QueryableByName)] +#[allow(dead_code)] +struct AccountResult { + #[diesel(sql_type = diesel::sql_types::Uuid)] + id: Uuid, + #[diesel(sql_type = diesel::sql_types::Text)] + email: String, + #[diesel(sql_type = diesel::sql_types::Text)] + provider: String, + #[diesel(sql_type = diesel::sql_types::Text)] + account_type: String, +} + +#[derive(QueryableByName, Debug, Clone)] +pub struct ActiveAccountResult { + #[diesel(sql_type = diesel::sql_types::Uuid)] + pub account_id: Uuid, + #[diesel(sql_type = diesel::sql_types::Text)] + pub email: String, + #[diesel(sql_type = diesel::sql_types::Text)] + pub provider: String, + #[diesel(sql_type = diesel::sql_types::Text)] + pub qdrant_collection: String, +} + +pub fn register_use_account_keyword( + engine: &mut Engine, + state: Arc, + session: Arc, +) -> Result<(), Box> { + let state_clone = Arc::clone(&state); + let session_clone = Arc::clone(&session); + + engine.register_custom_syntax( + &["USE", "ACCOUNT", "$expr$"], + true, + move |context, inputs| { + let email = context.eval_expression_tree(&inputs[0])?.to_string(); + + info!( + "USE ACCOUNT keyword executed - Email: {}, Session: {}", + email, session_clone.id + ); + + let session_id = session_clone.id; + let bot_id = session_clone.bot_id; + let user_id = session_clone.user_id; + let conn = state_clone.conn.clone(); + let email_clone = email.clone(); + + let result = std::thread::spawn(move || { + add_account_to_session(conn, session_id, bot_id, user_id, &email_clone) + }) + .join(); + + match result { + Ok(Ok(_)) => { + info!("Account '{}' added to session {}", email, session_clone.id); + Ok(Dynamic::UNIT) + } + Ok(Err(e)) => { + error!("Failed to add account '{}': {}", email, e); + Err(format!("USE_ACCOUNT failed: {}", e).into()) + } + Err(e) => { + error!("Thread panic in USE_ACCOUNT: {:?}", e); + Err("USE_ACCOUNT failed: thread panic".into()) + } + } + }, + )?; + + Ok(()) +} + +fn add_account_to_session( + conn_pool: crate::shared::utils::DbPool, + session_id: Uuid, + bot_id: Uuid, + user_id: Uuid, + email: &str, +) -> Result<(), String> { + let mut conn = conn_pool + .get() + .map_err(|e| format!("Failed to get DB connection: {}", e))?; + + let account: Option = diesel::sql_query( + "SELECT id, email, provider, account_type FROM connected_accounts + WHERE email = $1 AND (bot_id = $2 OR user_id = $3) AND status = 'active'", + ) + .bind::(email) + .bind::(bot_id) + .bind::(user_id) + .get_result(&mut conn) + .optional() + .map_err(|e| format!("Failed to query account: {}", e))?; + + let account = match account { + Some(acc) => acc, + None => { + return Err(format!( + "Account '{}' not found or not configured. Add it in Sources app.", + email + )); + } + }; + + let qdrant_collection = format!("account_{}_{}", account.provider, account.id); + + let assoc_id = Uuid::new_v4(); + diesel::sql_query( + "INSERT INTO session_account_associations + (id, session_id, bot_id, account_id, email, provider, qdrant_collection, is_active) + VALUES ($1, $2, $3, $4, $5, $6, $7, true) + ON CONFLICT (session_id, account_id) + DO UPDATE SET is_active = true, added_at = NOW()", + ) + .bind::(assoc_id) + .bind::(session_id) + .bind::(bot_id) + .bind::(account.id) + .bind::(&account.email) + .bind::(&account.provider) + .bind::(&qdrant_collection) + .execute(&mut conn) + .map_err(|e| format!("Failed to add account association: {}", e))?; + + info!( + "Added account '{}' ({}) to session {} (collection: {})", + email, account.provider, session_id, qdrant_collection + ); + + Ok(()) +} + +pub fn get_active_accounts_for_session( + conn_pool: &crate::shared::utils::DbPool, + session_id: Uuid, +) -> Result, String> { + let mut conn = conn_pool + .get() + .map_err(|e| format!("Failed to get DB connection: {}", e))?; + + let results: Vec = diesel::sql_query( + "SELECT account_id, email, provider, qdrant_collection + FROM session_account_associations + WHERE session_id = $1 AND is_active = true + ORDER BY added_at DESC", + ) + .bind::(session_id) + .load(&mut conn) + .map_err(|e| format!("Failed to get active accounts: {}", e))?; + + Ok(results) +} + +pub fn parse_account_path(path: &str) -> Option<(String, String)> { + if path.starts_with("account://") { + let rest = &path[10..]; + if let Some(slash_pos) = rest.find('/') { + let email = &rest[..slash_pos]; + let file_path = &rest[slash_pos + 1..]; + return Some((email.to_string(), file_path.to_string())); + } + } + None +} + +pub fn is_account_path(path: &str) -> bool { + path.starts_with("account://") +} + +pub async fn get_account_credentials( + conn_pool: &crate::shared::utils::DbPool, + email: &str, + bot_id: Uuid, +) -> Result { + let mut conn = conn_pool + .get() + .map_err(|e| format!("Failed to get DB connection: {}", e))?; + + #[derive(QueryableByName)] + struct CredResult { + #[diesel(sql_type = diesel::sql_types::Uuid)] + id: Uuid, + #[diesel(sql_type = diesel::sql_types::Text)] + provider: String, + #[diesel(sql_type = diesel::sql_types::Text)] + access_token: String, + #[diesel(sql_type = diesel::sql_types::Nullable)] + refresh_token: Option, + } + + let creds: CredResult = diesel::sql_query( + "SELECT id, provider, access_token, refresh_token + FROM connected_accounts + WHERE email = $1 AND bot_id = $2 AND status = 'active'", + ) + .bind::(email) + .bind::(bot_id) + .get_result(&mut conn) + .map_err(|e| format!("Account not found: {}", e))?; + + Ok(AccountCredentials { + account_id: creds.id, + provider: creds.provider, + access_token: creds.access_token, + refresh_token: creds.refresh_token, + }) +} + +#[derive(Debug, Clone)] +pub struct AccountCredentials { + pub account_id: Uuid, + pub provider: String, + pub access_token: String, + pub refresh_token: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_account_path() { + let result = parse_account_path("account://user@gmail.com/Documents/file.pdf"); + assert!(result.is_some()); + let (email, path) = result.unwrap(); + assert_eq!(email, "user@gmail.com"); + assert_eq!(path, "Documents/file.pdf"); + } + + #[test] + fn test_parse_account_path_invalid() { + assert!(parse_account_path("local/file.pdf").is_none()); + assert!(parse_account_path("/absolute/path").is_none()); + } + + #[test] + fn test_is_account_path() { + assert!(is_account_path("account://user@gmail.com/file.pdf")); + assert!(!is_account_path("local/file.pdf")); + assert!(!is_account_path("file.pdf")); + } +} diff --git a/src/basic/mod.rs b/src/basic/mod.rs index 6f66e994c..eb059f1cd 100644 --- a/src/basic/mod.rs +++ b/src/basic/mod.rs @@ -62,6 +62,8 @@ use self::keywords::webhook::webhook_keyword; use self::keywords::llm_keyword::llm_keyword; use self::keywords::on::on_keyword; +use self::keywords::on_change::on_change_keyword; +use self::keywords::on_email::on_email_keyword; use self::keywords::print::print_keyword; use self::keywords::set::set_keyword; use self::keywords::set_context::set_context_keyword; @@ -98,6 +100,8 @@ impl ScriptService { wait_keyword(&state, user.clone(), &mut engine); print_keyword(&state, user.clone(), &mut engine); on_keyword(&state, user.clone(), &mut engine); + on_email_keyword(&state, user.clone(), &mut engine); + on_change_keyword(&state, user.clone(), &mut engine); hear_keyword(state.clone(), user.clone(), &mut engine); talk_keyword(state.clone(), user.clone(), &mut engine); set_context_keyword(state.clone(), user.clone(), &mut engine); diff --git a/src/core/shared/models.rs b/src/core/shared/models.rs index 129c6c457..71c6a60ae 100644 --- a/src/core/shared/models.rs +++ b/src/core/shared/models.rs @@ -60,6 +60,8 @@ pub enum TriggerKind { TableInsert = 2, TableDelete = 3, Webhook = 4, + EmailReceived = 5, + FolderChange = 6, } impl TriggerKind { @@ -70,6 +72,8 @@ impl TriggerKind { 2 => Some(Self::TableInsert), 3 => Some(Self::TableDelete), 4 => Some(Self::Webhook), + 5 => Some(Self::EmailReceived), + 6 => Some(Self::FolderChange), _ => None, } } @@ -271,7 +275,24 @@ mod tests { #[test] fn test_trigger_kind_conversion() { assert_eq!(TriggerKind::from_i32(0), Some(TriggerKind::Scheduled)); + assert_eq!(TriggerKind::from_i32(1), Some(TriggerKind::TableUpdate)); + assert_eq!(TriggerKind::from_i32(2), Some(TriggerKind::TableInsert)); + assert_eq!(TriggerKind::from_i32(3), Some(TriggerKind::TableDelete)); assert_eq!(TriggerKind::from_i32(4), Some(TriggerKind::Webhook)); + assert_eq!(TriggerKind::from_i32(5), Some(TriggerKind::EmailReceived)); + assert_eq!(TriggerKind::from_i32(6), Some(TriggerKind::FolderChange)); assert_eq!(TriggerKind::from_i32(99), None); + assert_eq!(TriggerKind::from_i32(-1), None); + } + + #[test] + fn test_trigger_kind_as_i32() { + assert_eq!(TriggerKind::Scheduled as i32, 0); + assert_eq!(TriggerKind::TableUpdate as i32, 1); + assert_eq!(TriggerKind::TableInsert as i32, 2); + assert_eq!(TriggerKind::TableDelete as i32, 3); + assert_eq!(TriggerKind::Webhook as i32, 4); + assert_eq!(TriggerKind::EmailReceived as i32, 5); + assert_eq!(TriggerKind::FolderChange as i32, 6); } }