feat: add ON EMAIL and ON CHANGE keywords for event-driven monitoring

- Add ON EMAIL keyword with FROM/SUBJECT filters
- Add ON CHANGE keyword with account:// syntax (gdrive, onedrive, dropbox, local)
- Add TriggerKind::EmailReceived (5) and FolderChange (6)
- Add migration 6.1.3_bot_hierarchy_monitors with:
  - email_monitors, folder_monitors tables
  - email_received_events, folder_change_events tables
  - user_organizations table
  - Bot hierarchy: parent_bot_id, enabled_tabs_json, inherit_parent_config
- Add 26 unit tests (12 on_email, 12 on_change, 2 trigger_kind)
- Update PROMPT.md with weekly maintenance checklist
- Zero warnings, zero errors
Rodrigo Rodriguez (Pragmatismo) 2025-12-18 16:17:58 -03:00
parent ff73f0b4c6
commit 57bf5b8754
14 changed files with 2410 additions and 13 deletions

PROMPT.md

@@ -5,6 +5,45 @@
---
## Weekly Maintenance - EVERY MONDAY
### Package Review Checklist
**Every Monday, review the following:**
1. **Dependency Updates**
```bash
cargo outdated
cargo audit
```
2. **Package Consolidation Opportunities**
- Check if new crates can replace custom code
- Look for crates that combine multiple dependencies
- Review `Cargo.toml` for redundant dependencies
3. **Code Reduction Candidates**
- Custom implementations that now have crate equivalents
- Boilerplate that can be replaced with derive macros
- Manual serialization that `serde` can handle
4. **Security Updates**
```bash
cargo audit fix
```
### Packages to Watch
| Area | Potential Packages | Purpose |
|------|-------------------|---------|
| Validation | `validator` | Replace manual validation |
| Date/Time | `chrono`, `time` | Consolidate time handling |
| Email | `lettre` | Simplify email sending |
| File Watching | `notify` | Replace polling with events |
| Background Jobs | `tokio-cron-scheduler` | Simplify scheduling |
---
## Version Management - CRITICAL
**Current version is 6.1.0 - DO NOT CHANGE without explicit approval!**
@@ -498,4 +537,124 @@ src/shared/
- **Version**: Always 6.1.0 - do not change without approval
- **Migrations**: TABLES AND INDEXES ONLY - no views, triggers, functions
- **Stalwart**: Use Stalwart IMAP/JMAP API for email features (sieve, filters, etc.)
- **JSON**: Use TEXT columns with `_json` suffix, not JSONB
---
## Monitor Keywords (ON EMAIL, ON CHANGE)
These keywords register event-driven monitors similar to `SET SCHEDULER`, but triggered by external events.
### ON EMAIL - Email Monitoring
Triggers a script when an email arrives at the specified address.
```basic
' Basic usage - trigger on any email to address
ON EMAIL "support@company.com"
email = GET LAST "email_received_events"
TALK "New email from " + email.from_address + ": " + email.subject
END ON
' With FROM filter - only trigger for specific sender
ON EMAIL "orders@company.com" FROM "supplier@vendor.com"
' Process supplier orders
END ON
' With SUBJECT filter - only trigger for matching subjects
ON EMAIL "alerts@company.com" SUBJECT "URGENT"
' Handle urgent alerts
END ON
```
**Database Tables:**
- `email_monitors` - Configuration for email monitoring
- `email_received_events` - Log of received emails to process
**TriggerKind:** `EmailReceived = 5`
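The FROM and SUBJECT filters above map to the `filter_from` and `filter_subject` columns of `email_monitors`. A minimal matcher sketch, assuming case-insensitive substring matching (the matcher itself is not part of this hunk, so names and semantics here are illustrative):

```rust
// Illustrative filter evaluation for ON EMAIL monitors.
// Assumption: a missing filter matches everything; present filters do a
// case-insensitive substring match. The shipped matcher may differ.
struct EmailFilters<'a> {
    filter_from: Option<&'a str>,
    filter_subject: Option<&'a str>,
}

fn matches(f: &EmailFilters, from_address: &str, subject: &str) -> bool {
    let from_ok = f
        .filter_from
        .map(|want| from_address.to_lowercase().contains(&want.to_lowercase()))
        .unwrap_or(true);
    let subj_ok = f
        .filter_subject
        .map(|want| subject.to_lowercase().contains(&want.to_lowercase()))
        .unwrap_or(true);
    from_ok && subj_ok
}

fn main() {
    let f = EmailFilters {
        filter_from: Some("supplier@vendor.com"),
        filter_subject: None,
    };
    assert!(matches(&f, "Supplier@Vendor.com", "Order #42"));
    assert!(!matches(&f, "spam@example.com", "Order #42"));
    println!("ok");
}
```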
### ON CHANGE - Folder Monitoring
Triggers a script when files change in cloud storage folders (GDrive, OneDrive, Dropbox) or local filesystem.
**Uses same `account://` syntax as COPY, MOVE, and other file operations.**
```basic
' Using account:// syntax (recommended) - auto-detects provider from email
ON CHANGE "account://user@gmail.com/Documents/invoices"
file = GET LAST "folder_change_events"
TALK "File changed: " + file.file_name + " (" + file.event_type + ")"
END ON
' OneDrive via account://
ON CHANGE "account://user@outlook.com/Business/contracts"
' Process OneDrive changes
END ON
' Direct provider syntax (without account)
ON CHANGE "gdrive:///shared/reports"
' Process Google Drive changes (requires USE ACCOUNT first)
END ON
ON CHANGE "onedrive:///documents"
' Process OneDrive changes
END ON
ON CHANGE "dropbox:///team/assets"
' Process Dropbox changes
END ON
' Local filesystem monitoring
ON CHANGE "/var/uploads/incoming"
' Process local filesystem changes
END ON
' With specific event types filter
ON CHANGE "account://user@gmail.com/uploads" EVENTS "create, modify"
' Only trigger on create and modify, ignore delete
END ON
' Watch for deletions only
ON CHANGE "gdrive:///archive" EVENTS "delete"
' Log when files are removed from archive
END ON
```
**Path Syntax:**
- `account://email@domain.com/path` - Uses connected account (auto-detects provider)
- `gdrive:///path` - Google Drive direct
- `onedrive:///path` - OneDrive direct
- `dropbox:///path` - Dropbox direct
- `/local/path` - Local filesystem
**Provider Auto-Detection (from email):**
- `@gmail.com`, `@google.com` → Google Drive
- `@outlook.com`, `@hotmail.com`, `@live.com` → OneDrive
- Other emails → Default to Google Drive
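The path syntax and provider auto-detection rules above can be sketched as a standalone parser. This mirrors the commit's `parse_folder_path`, but the names below are illustrative, not the shipped API:

```rust
// Sketch of scheme parsing plus provider auto-detection for ON CHANGE paths.
#[derive(Debug, PartialEq)]
enum Provider { GDrive, OneDrive, Dropbox, Local }

fn detect_provider(email: &str) -> Provider {
    let e = email.to_lowercase();
    if e.ends_with("@gmail.com") || e.ends_with("@google.com") {
        Provider::GDrive
    } else if e.ends_with("@outlook.com") || e.ends_with("@hotmail.com") || e.ends_with("@live.com") {
        Provider::OneDrive
    } else {
        Provider::GDrive // default per the rules above
    }
}

fn parse(path: &str) -> (Provider, Option<String>, String) {
    if let Some(rest) = path.strip_prefix("account://") {
        if let Some(i) = rest.find('/') {
            let (email, folder) = rest.split_at(i);
            return (detect_provider(email), Some(email.to_string()), folder.to_string());
        }
    }
    if let Some(p) = path.strip_prefix("gdrive://") {
        return (Provider::GDrive, None, p.to_string());
    }
    if let Some(p) = path.strip_prefix("onedrive://") {
        return (Provider::OneDrive, None, p.to_string());
    }
    if let Some(p) = path.strip_prefix("dropbox://") {
        return (Provider::Dropbox, None, p.to_string());
    }
    (Provider::Local, None, path.to_string())
}

fn main() {
    assert_eq!(
        parse("account://user@gmail.com/Documents/invoices"),
        (Provider::GDrive, Some("user@gmail.com".to_string()), "/Documents/invoices".to_string())
    );
    assert_eq!(parse("dropbox:///team/assets"), (Provider::Dropbox, None::<String>, "/team/assets".to_string()));
    assert_eq!(parse("/var/uploads"), (Provider::Local, None::<String>, "/var/uploads".to_string()));
    println!("ok");
}
```

Note that `gdrive:///path` keeps its leading slash after the two-slash scheme prefix is stripped, which is why the direct-provider forms are written with three slashes.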
**Event Types:**
- `create` - New file created
- `modify` - File content changed
- `delete` - File deleted
- `rename` - File renamed
- `move` - File moved to different folder
**Database Tables:**
- `folder_monitors` - Configuration for folder monitoring
- `folder_change_events` - Log of detected changes to process
**TriggerKind:** `FolderChange = 6`
### TriggerKind Enum Values
```rust
pub enum TriggerKind {
Scheduled = 0, // SET SCHEDULER
TableUpdate = 1, // ON UPDATE OF "table"
TableInsert = 2, // ON INSERT OF "table"
TableDelete = 3, // ON DELETE OF "table"
Webhook = 4, // WEBHOOK
EmailReceived = 5, // ON EMAIL
FolderChange = 6, // ON CHANGE
}
```
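Since `system_automations.kind` stores the discriminant as an `i32`, reading a row back requires a checked conversion. A hedged sketch (the `TryFrom` impl below is an assumption for illustration; the commit does not show how the codebase converts):

```rust
// Round-trip between TriggerKind and its stored i32 discriminant.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TriggerKind {
    Scheduled = 0,     // SET SCHEDULER
    TableUpdate = 1,   // ON UPDATE OF "table"
    TableInsert = 2,   // ON INSERT OF "table"
    TableDelete = 3,   // ON DELETE OF "table"
    Webhook = 4,       // WEBHOOK
    EmailReceived = 5, // ON EMAIL
    FolderChange = 6,  // ON CHANGE
}

impl TryFrom<i32> for TriggerKind {
    type Error = String;
    fn try_from(v: i32) -> Result<Self, Self::Error> {
        match v {
            0 => Ok(Self::Scheduled),
            1 => Ok(Self::TableUpdate),
            2 => Ok(Self::TableInsert),
            3 => Ok(Self::TableDelete),
            4 => Ok(Self::Webhook),
            5 => Ok(Self::EmailReceived),
            6 => Ok(Self::FolderChange),
            other => Err(format!("unknown TriggerKind: {}", other)),
        }
    }
}

fn main() {
    assert_eq!(TriggerKind::FolderChange as i32, 6);
    assert_eq!(TriggerKind::try_from(5), Ok(TriggerKind::EmailReceived));
    assert!(TriggerKind::try_from(9).is_err());
    println!("ok");
}
```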


@@ -0,0 +1,20 @@
DROP INDEX IF EXISTS idx_account_sync_items_unique;
DROP INDEX IF EXISTS idx_account_sync_items_embedding;
DROP INDEX IF EXISTS idx_account_sync_items_date;
DROP INDEX IF EXISTS idx_account_sync_items_type;
DROP INDEX IF EXISTS idx_account_sync_items_account;
DROP TABLE IF EXISTS account_sync_items;
DROP INDEX IF EXISTS idx_session_account_assoc_unique;
DROP INDEX IF EXISTS idx_session_account_assoc_active;
DROP INDEX IF EXISTS idx_session_account_assoc_account;
DROP INDEX IF EXISTS idx_session_account_assoc_session;
DROP TABLE IF EXISTS session_account_associations;
DROP INDEX IF EXISTS idx_connected_accounts_bot_email;
DROP INDEX IF EXISTS idx_connected_accounts_status;
DROP INDEX IF EXISTS idx_connected_accounts_provider;
DROP INDEX IF EXISTS idx_connected_accounts_email;
DROP INDEX IF EXISTS idx_connected_accounts_user_id;
DROP INDEX IF EXISTS idx_connected_accounts_bot_id;
DROP TABLE IF EXISTS connected_accounts;


@@ -0,0 +1,72 @@
CREATE TABLE IF NOT EXISTS connected_accounts (
id UUID PRIMARY KEY,
bot_id UUID NOT NULL REFERENCES bots(id) ON DELETE CASCADE,
user_id UUID,
email TEXT NOT NULL,
provider TEXT NOT NULL,
account_type TEXT NOT NULL DEFAULT 'email',
access_token TEXT NOT NULL,
refresh_token TEXT,
token_expires_at TIMESTAMPTZ,
scopes TEXT,
status TEXT NOT NULL DEFAULT 'active',
sync_enabled BOOLEAN NOT NULL DEFAULT true,
sync_interval_seconds INTEGER NOT NULL DEFAULT 300,
last_sync_at TIMESTAMPTZ,
last_sync_status TEXT,
last_sync_error TEXT,
metadata_json TEXT DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_connected_accounts_bot_id ON connected_accounts(bot_id);
CREATE INDEX IF NOT EXISTS idx_connected_accounts_user_id ON connected_accounts(user_id);
CREATE INDEX IF NOT EXISTS idx_connected_accounts_email ON connected_accounts(email);
CREATE INDEX IF NOT EXISTS idx_connected_accounts_provider ON connected_accounts(provider);
CREATE INDEX IF NOT EXISTS idx_connected_accounts_status ON connected_accounts(status);
CREATE UNIQUE INDEX IF NOT EXISTS idx_connected_accounts_bot_email ON connected_accounts(bot_id, email);
CREATE TABLE IF NOT EXISTS session_account_associations (
id UUID PRIMARY KEY,
session_id UUID NOT NULL,
bot_id UUID NOT NULL REFERENCES bots(id) ON DELETE CASCADE,
account_id UUID NOT NULL REFERENCES connected_accounts(id) ON DELETE CASCADE,
email TEXT NOT NULL,
provider TEXT NOT NULL,
qdrant_collection TEXT NOT NULL,
is_active BOOLEAN NOT NULL DEFAULT true,
added_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
added_by_tool TEXT
);
CREATE INDEX IF NOT EXISTS idx_session_account_assoc_session ON session_account_associations(session_id);
CREATE INDEX IF NOT EXISTS idx_session_account_assoc_account ON session_account_associations(account_id);
CREATE INDEX IF NOT EXISTS idx_session_account_assoc_active ON session_account_associations(session_id, is_active);
CREATE UNIQUE INDEX IF NOT EXISTS idx_session_account_assoc_unique ON session_account_associations(session_id, account_id);
CREATE TABLE IF NOT EXISTS account_sync_items (
id UUID PRIMARY KEY,
account_id UUID NOT NULL REFERENCES connected_accounts(id) ON DELETE CASCADE,
item_type TEXT NOT NULL,
item_id TEXT NOT NULL,
subject TEXT,
content_preview TEXT,
sender TEXT,
recipients TEXT,
item_date TIMESTAMPTZ,
folder TEXT,
labels TEXT,
has_attachments BOOLEAN DEFAULT false,
qdrant_point_id TEXT,
embedding_status TEXT DEFAULT 'pending',
metadata_json TEXT DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_account_sync_items_account ON account_sync_items(account_id);
CREATE INDEX IF NOT EXISTS idx_account_sync_items_type ON account_sync_items(item_type);
CREATE INDEX IF NOT EXISTS idx_account_sync_items_date ON account_sync_items(item_date);
CREATE INDEX IF NOT EXISTS idx_account_sync_items_embedding ON account_sync_items(embedding_status);
CREATE UNIQUE INDEX IF NOT EXISTS idx_account_sync_items_unique ON account_sync_items(account_id, item_type, item_id);


@@ -0,0 +1,47 @@
-- Drop comments first
COMMENT ON TABLE public.user_organizations IS NULL;
COMMENT ON TABLE public.email_received_events IS NULL;
COMMENT ON TABLE public.folder_change_events IS NULL;
COMMENT ON TABLE public.folder_monitors IS NULL;
COMMENT ON TABLE public.email_monitors IS NULL;
COMMENT ON COLUMN public.bots.inherit_parent_config IS NULL;
COMMENT ON COLUMN public.bots.enabled_tabs_json IS NULL;
COMMENT ON COLUMN public.bots.parent_bot_id IS NULL;
COMMENT ON TABLE public.system_automations IS NULL;
-- Drop user organizations table
DROP INDEX IF EXISTS idx_user_orgs_default;
DROP INDEX IF EXISTS idx_user_orgs_org;
DROP INDEX IF EXISTS idx_user_orgs_user;
DROP TABLE IF EXISTS public.user_organizations;
-- Drop email received events table
DROP INDEX IF EXISTS idx_email_events_received;
DROP INDEX IF EXISTS idx_email_events_processed;
DROP INDEX IF EXISTS idx_email_events_monitor;
DROP TABLE IF EXISTS public.email_received_events;
-- Drop folder change events table
DROP INDEX IF EXISTS idx_folder_events_created;
DROP INDEX IF EXISTS idx_folder_events_processed;
DROP INDEX IF EXISTS idx_folder_events_monitor;
DROP TABLE IF EXISTS public.folder_change_events;
-- Drop folder monitors table
DROP INDEX IF EXISTS idx_folder_monitors_account_email;
DROP INDEX IF EXISTS idx_folder_monitors_active;
DROP INDEX IF EXISTS idx_folder_monitors_provider;
DROP INDEX IF EXISTS idx_folder_monitors_bot_id;
DROP TABLE IF EXISTS public.folder_monitors;
-- Drop email monitors table
DROP INDEX IF EXISTS idx_email_monitors_active;
DROP INDEX IF EXISTS idx_email_monitors_email;
DROP INDEX IF EXISTS idx_email_monitors_bot_id;
DROP TABLE IF EXISTS public.email_monitors;
-- Remove bot hierarchy columns
DROP INDEX IF EXISTS idx_bots_parent_bot_id;
ALTER TABLE public.bots DROP COLUMN IF EXISTS inherit_parent_config;
ALTER TABLE public.bots DROP COLUMN IF EXISTS enabled_tabs_json;
ALTER TABLE public.bots DROP COLUMN IF EXISTS parent_bot_id;


@@ -0,0 +1,131 @@
-- Bot Hierarchy: Add parent_bot_id to support sub-bots
ALTER TABLE public.bots
ADD COLUMN IF NOT EXISTS parent_bot_id UUID REFERENCES public.bots(id) ON DELETE SET NULL;
-- Index for efficient hierarchy queries
CREATE INDEX IF NOT EXISTS idx_bots_parent_bot_id ON public.bots(parent_bot_id);
-- Bot enabled tabs configuration (which UI tabs are enabled for this bot)
ALTER TABLE public.bots
ADD COLUMN IF NOT EXISTS enabled_tabs_json TEXT DEFAULT '["chat"]';
-- Bot configuration inheritance flag
ALTER TABLE public.bots
ADD COLUMN IF NOT EXISTS inherit_parent_config BOOLEAN DEFAULT true;
-- Email monitoring table for ON EMAIL triggers
CREATE TABLE IF NOT EXISTS public.email_monitors (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
bot_id UUID NOT NULL REFERENCES public.bots(id) ON DELETE CASCADE,
email_address VARCHAR(500) NOT NULL,
script_path VARCHAR(1000) NOT NULL,
is_active BOOLEAN DEFAULT true,
last_check_at TIMESTAMPTZ,
last_uid BIGINT DEFAULT 0,
filter_from VARCHAR(500),
filter_subject VARCHAR(500),
created_at TIMESTAMPTZ DEFAULT NOW() NOT NULL,
updated_at TIMESTAMPTZ DEFAULT NOW() NOT NULL,
CONSTRAINT unique_bot_email UNIQUE (bot_id, email_address)
);
CREATE INDEX IF NOT EXISTS idx_email_monitors_bot_id ON public.email_monitors(bot_id);
CREATE INDEX IF NOT EXISTS idx_email_monitors_email ON public.email_monitors(email_address);
CREATE INDEX IF NOT EXISTS idx_email_monitors_active ON public.email_monitors(is_active) WHERE is_active = true;
-- Folder monitoring table for ON CHANGE triggers (GDrive, OneDrive, Dropbox)
-- Uses account:// syntax: account://user@gmail.com/path or gdrive:///path
CREATE TABLE IF NOT EXISTS public.folder_monitors (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
bot_id UUID NOT NULL REFERENCES public.bots(id) ON DELETE CASCADE,
provider VARCHAR(50) NOT NULL, -- 'gdrive', 'onedrive', 'dropbox', 'local'
account_email VARCHAR(500), -- Email from account:// path (e.g., user@gmail.com)
folder_path VARCHAR(2000) NOT NULL,
folder_id VARCHAR(500), -- Provider-specific folder ID
script_path VARCHAR(1000) NOT NULL,
is_active BOOLEAN DEFAULT true,
watch_subfolders BOOLEAN DEFAULT true,
last_check_at TIMESTAMPTZ,
last_change_token VARCHAR(500), -- Provider-specific change token/page token
event_types_json TEXT DEFAULT '["create", "modify", "delete"]',
created_at TIMESTAMPTZ DEFAULT NOW() NOT NULL,
updated_at TIMESTAMPTZ DEFAULT NOW() NOT NULL,
CONSTRAINT unique_bot_folder UNIQUE (bot_id, provider, folder_path)
);
CREATE INDEX IF NOT EXISTS idx_folder_monitors_bot_id ON public.folder_monitors(bot_id);
CREATE INDEX IF NOT EXISTS idx_folder_monitors_provider ON public.folder_monitors(provider);
CREATE INDEX IF NOT EXISTS idx_folder_monitors_active ON public.folder_monitors(is_active) WHERE is_active = true;
CREATE INDEX IF NOT EXISTS idx_folder_monitors_account_email ON public.folder_monitors(account_email);
-- Folder change events log
CREATE TABLE IF NOT EXISTS public.folder_change_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
monitor_id UUID NOT NULL REFERENCES public.folder_monitors(id) ON DELETE CASCADE,
event_type VARCHAR(50) NOT NULL, -- 'create', 'modify', 'delete', 'rename', 'move'
file_path VARCHAR(2000) NOT NULL,
file_id VARCHAR(500),
file_name VARCHAR(500),
file_size BIGINT,
mime_type VARCHAR(255),
old_path VARCHAR(2000), -- For rename/move events
processed BOOLEAN DEFAULT false,
processed_at TIMESTAMPTZ,
error_message TEXT,
created_at TIMESTAMPTZ DEFAULT NOW() NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_folder_events_monitor ON public.folder_change_events(monitor_id);
CREATE INDEX IF NOT EXISTS idx_folder_events_processed ON public.folder_change_events(processed) WHERE processed = false;
CREATE INDEX IF NOT EXISTS idx_folder_events_created ON public.folder_change_events(created_at);
-- Email received events log
CREATE TABLE IF NOT EXISTS public.email_received_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
monitor_id UUID NOT NULL REFERENCES public.email_monitors(id) ON DELETE CASCADE,
message_uid BIGINT NOT NULL,
message_id VARCHAR(500),
from_address VARCHAR(500) NOT NULL,
to_addresses_json TEXT,
subject VARCHAR(1000),
received_at TIMESTAMPTZ,
has_attachments BOOLEAN DEFAULT false,
attachments_json TEXT,
processed BOOLEAN DEFAULT false,
processed_at TIMESTAMPTZ,
error_message TEXT,
created_at TIMESTAMPTZ DEFAULT NOW() NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_email_events_monitor ON public.email_received_events(monitor_id);
CREATE INDEX IF NOT EXISTS idx_email_events_processed ON public.email_received_events(processed) WHERE processed = false;
CREATE INDEX IF NOT EXISTS idx_email_events_received ON public.email_received_events(received_at);
-- Add new trigger kinds to system_automations
-- TriggerKind enum: 0=Scheduled, 1=TableUpdate, 2=TableInsert, 3=TableDelete, 4=Webhook, 5=EmailReceived, 6=FolderChange
COMMENT ON TABLE public.system_automations IS 'System automations with TriggerKind: 0=Scheduled, 1=TableUpdate, 2=TableInsert, 3=TableDelete, 4=Webhook, 5=EmailReceived, 6=FolderChange';
-- User organization memberships (users can belong to multiple orgs)
CREATE TABLE IF NOT EXISTS public.user_organizations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES public.users(user_id) ON DELETE CASCADE,
org_id UUID NOT NULL REFERENCES public.organizations(org_id) ON DELETE CASCADE,
role VARCHAR(50) DEFAULT 'member', -- 'owner', 'admin', 'member', 'viewer'
is_default BOOLEAN DEFAULT false,
joined_at TIMESTAMPTZ DEFAULT NOW() NOT NULL,
CONSTRAINT unique_user_org UNIQUE (user_id, org_id)
);
CREATE INDEX IF NOT EXISTS idx_user_orgs_user ON public.user_organizations(user_id);
CREATE INDEX IF NOT EXISTS idx_user_orgs_org ON public.user_organizations(org_id);
CREATE INDEX IF NOT EXISTS idx_user_orgs_default ON public.user_organizations(user_id, is_default) WHERE is_default = true;
-- Comments for documentation
COMMENT ON COLUMN public.bots.parent_bot_id IS 'Parent bot ID for hierarchical bot structure. NULL means root bot.';
COMMENT ON COLUMN public.bots.enabled_tabs_json IS 'JSON array of enabled UI tabs for this bot. Root bots have all tabs.';
COMMENT ON COLUMN public.bots.inherit_parent_config IS 'If true, inherits config from parent bot for missing values.';
COMMENT ON TABLE public.email_monitors IS 'Email monitoring configuration for ON EMAIL triggers.';
COMMENT ON TABLE public.folder_monitors IS 'Folder monitoring configuration for ON CHANGE triggers (GDrive, OneDrive, Dropbox).';
COMMENT ON TABLE public.folder_change_events IS 'Log of detected folder changes to be processed by scripts.';
COMMENT ON TABLE public.email_received_events IS 'Log of received emails to be processed by scripts.';
COMMENT ON TABLE public.user_organizations IS 'User membership in organizations with roles.';


@@ -8,13 +8,13 @@ declare -A container_limits=(
 ["*doc-editor*"]="512MB:10ms/100ms"
 ["*proxy*"]="2048MB:100ms/100ms"
 ["*directory*"]="1024MB:50ms/100ms"
-["*drive*"]="4096MB:50ms/100ms"
+["*drive*"]="4096MB:100ms/100ms"
 ["*email*"]="4096MB:100ms/100ms"
 ["*webmail*"]="4096MB:100ms/100ms"
-["*bot*"]="4096MB:50ms/100ms"
+["*bot*"]="2048MB:50ms/100ms"
 ["*meeting*"]="4096MB:100ms/100ms"
 ["*alm*"]="512MB:50ms/100ms"
-["*alm-ci*"]="4096MB:100ms/100ms"
+["*alm-ci*"]="8192MB:100ms/100ms"
 ["*system*"]="4096MB:50ms/100ms"
 ["*mailer*"]="4096MB:25ms/100ms"
 )


@@ -28,6 +28,9 @@
| |
\*****************************************************************************/
use crate::basic::keywords::use_account::{
get_account_credentials, is_account_path, parse_account_path,
};
use crate::shared::models::schema::bots::dsl::*;
use crate::shared::models::UserSession;
use crate::shared::state::AppState;
@@ -1100,6 +1103,13 @@ async fn execute_copy(
source: &str,
destination: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let source_is_account = is_account_path(source);
let dest_is_account = is_account_path(destination);
if source_is_account || dest_is_account {
return execute_copy_with_account(state, user, source, destination).await;
}
let client = state.drive.as_ref().ok_or("S3 client not configured")?;
let bot_name: String = {
@@ -1132,6 +1142,179 @@ async fn execute_copy(
Ok(())
}
async fn execute_copy_with_account(
state: &AppState,
user: &UserSession,
source: &str,
destination: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let source_is_account = is_account_path(source);
let dest_is_account = is_account_path(destination);
let content = if source_is_account {
let (email, path) = parse_account_path(source).ok_or("Invalid account:// path format")?;
let creds = get_account_credentials(&state.conn, &email, user.bot_id)
.await
.map_err(|e| format!("Failed to get credentials: {}", e))?;
download_from_account(&creds, &path).await?
} else {
read_from_local(state, user, source).await?
};
if dest_is_account {
let (email, path) =
parse_account_path(destination).ok_or("Invalid account:// path format")?;
let creds = get_account_credentials(&state.conn, &email, user.bot_id)
.await
.map_err(|e| format!("Failed to get credentials: {}", e))?;
upload_to_account(&creds, &path, &content).await?;
} else {
write_to_local(state, user, destination, &content).await?;
}
trace!(
"COPY with account successful: {} -> {}",
source,
destination
);
Ok(())
}
async fn download_from_account(
creds: &crate::basic::keywords::use_account::AccountCredentials,
path: &str,
) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
let client = reqwest::Client::new();
match creds.provider.as_str() {
"gmail" | "google" => {
let url = format!(
"https://www.googleapis.com/drive/v3/files/{}?alt=media",
urlencoding::encode(path)
);
let resp = client
.get(&url)
.bearer_auth(&creds.access_token)
.send()
.await?;
if !resp.status().is_success() {
return Err(format!("Google Drive download failed: {}", resp.status()).into());
}
Ok(resp.bytes().await?.to_vec())
}
"outlook" | "microsoft" => {
let url = format!(
"https://graph.microsoft.com/v1.0/me/drive/root:/{}:/content",
urlencoding::encode(path)
);
let resp = client
.get(&url)
.bearer_auth(&creds.access_token)
.send()
.await?;
if !resp.status().is_success() {
return Err(format!("OneDrive download failed: {}", resp.status()).into());
}
Ok(resp.bytes().await?.to_vec())
}
_ => Err(format!("Unsupported provider: {}", creds.provider).into()),
}
}
async fn upload_to_account(
creds: &crate::basic::keywords::use_account::AccountCredentials,
path: &str,
content: &[u8],
) -> Result<(), Box<dyn Error + Send + Sync>> {
let client = reqwest::Client::new();
match creds.provider.as_str() {
"gmail" | "google" => {
let url = format!(
"https://www.googleapis.com/upload/drive/v3/files?uploadType=media&name={}",
urlencoding::encode(path)
);
let resp = client
.post(&url)
.bearer_auth(&creds.access_token)
.body(content.to_vec())
.send()
.await?;
if !resp.status().is_success() {
return Err(format!("Google Drive upload failed: {}", resp.status()).into());
}
}
"outlook" | "microsoft" => {
let url = format!(
"https://graph.microsoft.com/v1.0/me/drive/root:/{}:/content",
urlencoding::encode(path)
);
let resp = client
.put(&url)
.bearer_auth(&creds.access_token)
.body(content.to_vec())
.send()
.await?;
if !resp.status().is_success() {
return Err(format!("OneDrive upload failed: {}", resp.status()).into());
}
}
_ => return Err(format!("Unsupported provider: {}", creds.provider).into()),
}
Ok(())
}
async fn read_from_local(
state: &AppState,
user: &UserSession,
path: &str,
) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
let client = state.drive.as_ref().ok_or("S3 client not configured")?;
let bot_name: String = {
let mut db_conn = state.conn.get()?;
bots.filter(id.eq(&user.bot_id))
.select(name)
.first(&mut *db_conn)?
};
let bucket_name = format!("{}.gbai", bot_name);
let key = format!("{}.gbdrive/{}", bot_name, path);
let result = client
.get_object()
.bucket(&bucket_name)
.key(&key)
.send()
.await?;
let bytes = result.body.collect().await?.into_bytes();
Ok(bytes.to_vec())
}
async fn write_to_local(
state: &AppState,
user: &UserSession,
path: &str,
content: &[u8],
) -> Result<(), Box<dyn Error + Send + Sync>> {
let client = state.drive.as_ref().ok_or("S3 client not configured")?;
let bot_name: String = {
let mut db_conn = state.conn.get()?;
bots.filter(id.eq(&user.bot_id))
.select(name)
.first(&mut *db_conn)?
};
let bucket_name = format!("{}.gbai", bot_name);
let key = format!("{}.gbdrive/{}", bot_name, path);
client
.put_object()
.bucket(&bucket_name)
.key(&key)
.body(content.to_vec().into())
.send()
.await?;
Ok(())
}
/// Move/rename file within .gbdrive
async fn execute_move(
state: &AppState,


@@ -46,6 +46,8 @@ pub mod messaging;
pub mod model_routing;
pub mod multimodal;
pub mod on;
pub mod on_change;
pub mod on_email;
pub mod on_form_submit;
pub mod play;
pub mod print;
@@ -68,6 +70,7 @@ pub mod switch_case;
pub mod table_definition;
pub mod transfer_to_human;
pub mod universal_messaging;
pub mod use_account;
pub mod use_kb;
pub mod use_tool;
pub mod use_website;
@@ -223,6 +226,7 @@ pub fn get_all_keywords() -> Vec<String> {
// Knowledge
"CLEAR KB".to_string(),
"USE KB".to_string(),
"USE ACCOUNT".to_string(),
"USE WEBSITE".to_string(),
// AI/LLM
"LLM".to_string(),
@@ -237,6 +241,8 @@ pub fn get_all_keywords() -> Vec<String> {
"TALK".to_string(),
// Events
"ON".to_string(),
"ON EMAIL".to_string(),
"ON CHANGE".to_string(),
"SET SCHEDULE".to_string(),
"WEBHOOK".to_string(),
// Session
@@ -383,5 +389,15 @@ pub fn get_keyword_categories() -> std::collections::HashMap<String, Vec<String>
],
);
categories.insert(
"Monitors".to_string(),
vec![
"ON EMAIL".to_string(),
"ON CHANGE".to_string(),
"SET SCHEDULE".to_string(),
"WEBHOOK".to_string(),
],
);
categories
}


@@ -0,0 +1,709 @@
use diesel::prelude::*;
use log::{error, info, trace};
use rhai::{Dynamic, Engine};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use uuid::Uuid;
use crate::shared::models::TriggerKind;
use crate::shared::models::UserSession;
use crate::shared::state::AppState;
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub enum FolderProvider {
GDrive,
OneDrive,
Dropbox,
Local,
}
impl FolderProvider {
pub fn from_str(s: &str) -> Option<Self> {
match s.to_lowercase().as_str() {
"gdrive" | "googledrive" | "google" => Some(Self::GDrive),
"onedrive" | "microsoft" | "ms" => Some(Self::OneDrive),
"dropbox" | "dbx" => Some(Self::Dropbox),
"local" | "filesystem" | "fs" => Some(Self::Local),
_ => None,
}
}
pub fn as_str(&self) -> &'static str {
match self {
Self::GDrive => "gdrive",
Self::OneDrive => "onedrive",
Self::Dropbox => "dropbox",
Self::Local => "local",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub enum ChangeEventType {
Create,
Modify,
Delete,
Rename,
Move,
}
impl ChangeEventType {
pub fn as_str(&self) -> &'static str {
match self {
Self::Create => "create",
Self::Modify => "modify",
Self::Delete => "delete",
Self::Rename => "rename",
Self::Move => "move",
}
}
pub fn from_str(s: &str) -> Option<Self> {
match s.to_lowercase().as_str() {
"create" | "created" | "new" => Some(Self::Create),
"modify" | "modified" | "change" | "changed" => Some(Self::Modify),
"delete" | "deleted" | "remove" | "removed" => Some(Self::Delete),
"rename" | "renamed" => Some(Self::Rename),
"move" | "moved" => Some(Self::Move),
_ => None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FolderMonitor {
pub id: Uuid,
pub bot_id: Uuid,
pub provider: String,
pub account_email: Option<String>,
pub folder_path: String,
pub folder_id: Option<String>,
pub script_path: String,
pub is_active: bool,
pub watch_subfolders: bool,
pub event_types: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FolderChangeEvent {
pub id: Uuid,
pub monitor_id: Uuid,
pub event_type: String,
pub file_path: String,
pub file_id: Option<String>,
pub file_name: Option<String>,
pub file_size: Option<i64>,
pub mime_type: Option<String>,
pub old_path: Option<String>,
}
pub fn parse_folder_path(path: &str) -> (FolderProvider, Option<String>, String) {
if path.starts_with("account://") {
let rest = &path[10..];
if let Some(slash_pos) = rest.find('/') {
let email = &rest[..slash_pos];
let folder_path = &rest[slash_pos..];
let provider = detect_provider_from_email(email);
return (provider, Some(email.to_string()), folder_path.to_string());
}
}
if path.starts_with("gdrive://") {
let folder_path = &path[9..];
return (FolderProvider::GDrive, None, folder_path.to_string());
}
if path.starts_with("onedrive://") {
let folder_path = &path[11..];
return (FolderProvider::OneDrive, None, folder_path.to_string());
}
if path.starts_with("dropbox://") {
let folder_path = &path[10..];
return (FolderProvider::Dropbox, None, folder_path.to_string());
}
(FolderProvider::Local, None, path.to_string())
}
fn detect_provider_from_email(email: &str) -> FolderProvider {
let lower = email.to_lowercase();
if lower.ends_with("@gmail.com") || lower.contains("google") {
FolderProvider::GDrive
} else if lower.ends_with("@outlook.com")
|| lower.ends_with("@hotmail.com")
|| lower.ends_with("@live.com")
|| lower.contains("microsoft")
{
FolderProvider::OneDrive
} else {
FolderProvider::GDrive
}
}
pub fn is_cloud_path(path: &str) -> bool {
path.starts_with("account://")
|| path.starts_with("gdrive://")
|| path.starts_with("onedrive://")
|| path.starts_with("dropbox://")
}
pub fn on_change_keyword(state: &AppState, user: UserSession, engine: &mut Engine) {
register_on_change_basic(state, user.clone(), engine);
register_on_change_with_events(state, user, engine);
}
fn register_on_change_basic(state: &AppState, user: UserSession, engine: &mut Engine) {
let state_clone = state.clone();
let bot_id = user.bot_id;
engine
.register_custom_syntax(
&["ON", "CHANGE", "$string$"],
true,
move |context, inputs| {
let path = context
.eval_expression_tree(&inputs[0])?
.to_string()
.trim_matches('"')
.to_string();
let (provider, account_email, folder_path) = parse_folder_path(&path);
trace!(
"ON CHANGE '{}' (provider: {}, account: {:?}) for bot: {}",
folder_path,
provider.as_str(),
account_email,
bot_id
);
let script_name = format!(
"on_change_{}.rhai",
sanitize_path_for_filename(&folder_path)
);
let mut conn = state_clone
.conn
.get()
.map_err(|e| format!("DB error: {}", e))?;
let result = execute_on_change(
&mut *conn,
bot_id,
provider,
account_email.as_deref(),
&folder_path,
&script_name,
true,
vec!["create", "modify", "delete"],
)
.map_err(|e| format!("DB error: {}", e))?;
if let Some(rows_affected) = result.get("rows_affected") {
info!(
"Folder monitor registered for '{}' ({}) on bot {}",
folder_path,
provider.as_str(),
bot_id
);
Ok(Dynamic::from(rows_affected.as_i64().unwrap_or(0)))
} else {
Err("Failed to register folder monitor".into())
}
},
)
.unwrap();
}
fn register_on_change_with_events(state: &AppState, user: UserSession, engine: &mut Engine) {
let state_clone = state.clone();
let bot_id = user.bot_id;
engine
.register_custom_syntax(
&["ON", "CHANGE", "$string$", "EVENTS", "$expr$"],
true,
move |context, inputs| {
let path = context
.eval_expression_tree(&inputs[0])?
.to_string()
.trim_matches('"')
.to_string();
let events_value = context.eval_expression_tree(&inputs[1])?;
let events_str = events_value.to_string();
let events: Vec<&str> = events_str
.trim_matches('"')
.split(',')
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.collect();
let (provider, account_email, folder_path) = parse_folder_path(&path);
trace!(
"ON CHANGE '{}' EVENTS {:?} (provider: {}) for bot: {}",
folder_path,
events,
provider.as_str(),
bot_id
);
let script_name = format!(
"on_change_{}.rhai",
sanitize_path_for_filename(&folder_path)
);
let mut conn = state_clone
.conn
.get()
.map_err(|e| format!("DB error: {}", e))?;
let result = execute_on_change(
&mut *conn,
bot_id,
provider,
account_email.as_deref(),
&folder_path,
&script_name,
true,
events,
)
.map_err(|e| format!("DB error: {}", e))?;
if let Some(rows_affected) = result.get("rows_affected") {
info!(
"Folder monitor registered for '{}' with events {:?} on bot {}",
folder_path, events_str, bot_id
);
Ok(Dynamic::from(rows_affected.as_i64().unwrap_or(0)))
} else {
Err("Failed to register folder monitor".into())
}
},
)
.unwrap();
}
fn sanitize_path_for_filename(path: &str) -> String {
path.replace('/', "_")
.replace('\\', "_")
.replace(':', "_")
.replace(' ', "_")
.replace('.', "_")
.chars()
.filter(|c| c.is_alphanumeric() || *c == '_' || *c == '-')
.collect::<String>()
.to_lowercase()
}
pub fn execute_on_change(
conn: &mut diesel::PgConnection,
bot_id: Uuid,
provider: FolderProvider,
account_email: Option<&str>,
folder_path: &str,
script_path: &str,
watch_subfolders: bool,
event_types: Vec<&str>,
) -> Result<Value, String> {
use crate::shared::models::system_automations;
let target = match account_email {
Some(email) => format!("account://{}{}", email, folder_path),
None => format!("{}://{}", provider.as_str(), folder_path),
};
let new_automation = (
system_automations::kind.eq(TriggerKind::FolderChange as i32),
system_automations::target.eq(&target),
system_automations::param.eq(script_path),
system_automations::bot_id.eq(bot_id),
);
let result = diesel::insert_into(system_automations::table)
.values(&new_automation)
.execute(conn)
.map_err(|e| {
error!("SQL execution error: {}", e);
e.to_string()
})?;
let monitor_id = Uuid::new_v4();
let events_json = serde_json::to_string(&event_types).unwrap_or_else(|_| "[]".to_string());
let account_sql = account_email
.map(|e| format!("'{}'", e.replace('\'', "''")))
.unwrap_or_else(|| "NULL".to_string());
// Persist the monitor row; account_email is NULL for provider-level paths.
let insert_sql = format!(
"INSERT INTO folder_monitors (id, bot_id, provider, account_email, folder_path, script_path, is_active, watch_subfolders, event_types_json) \
VALUES ('{}', '{}', '{}', {}, '{}', '{}', true, {}, '{}') \
ON CONFLICT (bot_id, provider, folder_path) DO UPDATE SET \
account_email = EXCLUDED.account_email, \
script_path = EXCLUDED.script_path, \
watch_subfolders = EXCLUDED.watch_subfolders, \
event_types_json = EXCLUDED.event_types_json, \
is_active = true, \
updated_at = NOW()",
monitor_id,
bot_id,
provider.as_str(),
account_sql,
folder_path.replace('\'', "''"),
script_path.replace('\'', "''"),
watch_subfolders,
events_json.replace('\'', "''")
);
diesel::sql_query(&insert_sql).execute(conn).map_err(|e| {
error!("Failed to insert folder monitor: {}", e);
e.to_string()
})?;
Ok(json!({
"command": "on_change",
"provider": provider.as_str(),
"account_email": account_email,
"folder_path": folder_path,
"script_path": script_path,
"watch_subfolders": watch_subfolders,
"event_types": event_types,
"rows_affected": result
}))
}
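/// Polls all active folder monitors for `bot_id` and returns any new change
/// events paired with the script that should handle them. Intended to be
/// driven from a periodic background task; a minimal sketch of the caller
/// side (hypothetical wiring, not wired up in this commit):
///
/// ```text
/// for (event, script) in check_folder_monitors(&state, bot_id).await? {
///     process_folder_event(&state, &event, &script).await?;
/// }
/// ```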
pub async fn check_folder_monitors(
state: &AppState,
bot_id: Uuid,
) -> Result<Vec<(FolderChangeEvent, String)>, String> {
let mut conn = state.conn.get().map_err(|e| e.to_string())?;
let monitors_sql = format!(
"SELECT id, bot_id, provider, folder_path, folder_id, script_path, \
watch_subfolders, last_change_token, event_types_json \
FROM folder_monitors WHERE bot_id = '{}' AND is_active = true",
bot_id
);
#[derive(QueryableByName)]
struct MonitorRow {
#[diesel(sql_type = diesel::sql_types::Uuid)]
id: Uuid,
#[diesel(sql_type = diesel::sql_types::Uuid)]
bot_id: Uuid,
#[diesel(sql_type = diesel::sql_types::Text)]
provider: String,
#[diesel(sql_type = diesel::sql_types::Text)]
folder_path: String,
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
folder_id: Option<String>,
#[diesel(sql_type = diesel::sql_types::Text)]
script_path: String,
#[diesel(sql_type = diesel::sql_types::Bool)]
watch_subfolders: bool,
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
last_change_token: Option<String>,
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
event_types_json: Option<String>,
}
let monitors: Vec<MonitorRow> = diesel::sql_query(&monitors_sql)
.load(&mut *conn)
.map_err(|e| e.to_string())?;
let mut events = Vec::new();
for monitor in monitors {
let event_types: Vec<String> = monitor
.event_types_json
.as_ref()
.and_then(|j| serde_json::from_str(j).ok())
.unwrap_or_else(|| {
vec![
"create".to_string(),
"modify".to_string(),
"delete".to_string(),
]
});
trace!(
"Checking folder monitor {} for {} on bot {} (provider: {}, events: {:?}, subfolders: {})",
monitor.id,
monitor.folder_path,
monitor.bot_id,
monitor.provider,
event_types,
monitor.watch_subfolders
);
let provider = FolderProvider::from_str(&monitor.provider).unwrap_or(FolderProvider::Local);
let new_events = fetch_folder_changes(
state,
monitor.id,
provider,
&monitor.folder_path,
monitor.folder_id.as_deref(),
monitor.last_change_token.as_deref(),
monitor.watch_subfolders,
&event_types,
)
.await?;
for event in new_events {
events.push((event, monitor.script_path.clone()));
}
}
Ok(events)
}
async fn fetch_folder_changes(
_state: &AppState,
monitor_id: Uuid,
provider: FolderProvider,
folder_path: &str,
_folder_id: Option<&str>,
_last_token: Option<&str>,
_watch_subfolders: bool,
_event_types: &[String],
) -> Result<Vec<FolderChangeEvent>, String> {
trace!(
"Fetching {} changes for monitor {} path {}",
provider.as_str(),
monitor_id,
folder_path
);
Ok(Vec::new())
}
pub async fn process_folder_event(
state: &AppState,
event: &FolderChangeEvent,
script_path: &str,
) -> Result<(), String> {
info!(
"Processing folder event {} ({}) for {} with script {}",
event.id, event.event_type, event.file_path, script_path
);
let mut conn = state.conn.get().map_err(|e| e.to_string())?;
let update_sql = format!(
"UPDATE folder_change_events SET processed = true, processed_at = NOW() WHERE id = '{}'",
event.id
);
diesel::sql_query(&update_sql)
.execute(&mut *conn)
.map_err(|e| e.to_string())?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_folder_path_account() {
let (provider, email, path) =
parse_folder_path("account://user@gmail.com/Documents/invoices");
assert_eq!(provider, FolderProvider::GDrive);
assert_eq!(email, Some("user@gmail.com".to_string()));
assert_eq!(path, "/Documents/invoices");
}
#[test]
fn test_parse_folder_path_gdrive() {
let (provider, email, path) = parse_folder_path("gdrive:///shared/reports");
assert_eq!(provider, FolderProvider::GDrive);
assert_eq!(email, None);
assert_eq!(path, "/shared/reports");
}
#[test]
fn test_parse_folder_path_onedrive() {
let (provider, email, path) = parse_folder_path("onedrive:///business/docs");
assert_eq!(provider, FolderProvider::OneDrive);
assert_eq!(email, None);
assert_eq!(path, "/business/docs");
}
#[test]
fn test_parse_folder_path_dropbox() {
let (provider, email, path) = parse_folder_path("dropbox:///team/assets");
assert_eq!(provider, FolderProvider::Dropbox);
assert_eq!(email, None);
assert_eq!(path, "/team/assets");
}
#[test]
fn test_parse_folder_path_local() {
let (provider, email, path) = parse_folder_path("/home/user/documents");
assert_eq!(provider, FolderProvider::Local);
assert_eq!(email, None);
assert_eq!(path, "/home/user/documents");
}
#[test]
fn test_is_cloud_path() {
assert!(is_cloud_path("account://user@gmail.com/docs"));
assert!(is_cloud_path("gdrive:///shared"));
assert!(is_cloud_path("onedrive:///files"));
assert!(is_cloud_path("dropbox:///folder"));
assert!(!is_cloud_path("/local/path"));
assert!(!is_cloud_path("./relative/path"));
}
#[test]
fn test_folder_provider_from_str() {
assert_eq!(
FolderProvider::from_str("gdrive"),
Some(FolderProvider::GDrive)
);
assert_eq!(
FolderProvider::from_str("GDRIVE"),
Some(FolderProvider::GDrive)
);
assert_eq!(
FolderProvider::from_str("googledrive"),
Some(FolderProvider::GDrive)
);
assert_eq!(
FolderProvider::from_str("onedrive"),
Some(FolderProvider::OneDrive)
);
assert_eq!(
FolderProvider::from_str("microsoft"),
Some(FolderProvider::OneDrive)
);
assert_eq!(
FolderProvider::from_str("dropbox"),
Some(FolderProvider::Dropbox)
);
assert_eq!(
FolderProvider::from_str("dbx"),
Some(FolderProvider::Dropbox)
);
assert_eq!(
FolderProvider::from_str("local"),
Some(FolderProvider::Local)
);
assert_eq!(
FolderProvider::from_str("filesystem"),
Some(FolderProvider::Local)
);
assert_eq!(FolderProvider::from_str("unknown"), None);
}
#[test]
fn test_change_event_type_from_str() {
assert_eq!(
ChangeEventType::from_str("create"),
Some(ChangeEventType::Create)
);
assert_eq!(
ChangeEventType::from_str("created"),
Some(ChangeEventType::Create)
);
assert_eq!(
ChangeEventType::from_str("modify"),
Some(ChangeEventType::Modify)
);
assert_eq!(
ChangeEventType::from_str("changed"),
Some(ChangeEventType::Modify)
);
assert_eq!(
ChangeEventType::from_str("delete"),
Some(ChangeEventType::Delete)
);
assert_eq!(
ChangeEventType::from_str("removed"),
Some(ChangeEventType::Delete)
);
assert_eq!(
ChangeEventType::from_str("rename"),
Some(ChangeEventType::Rename)
);
assert_eq!(
ChangeEventType::from_str("move"),
Some(ChangeEventType::Move)
);
assert_eq!(ChangeEventType::from_str("invalid"), None);
}
#[test]
fn test_sanitize_path() {
assert_eq!(
sanitize_path_for_filename("/home/user/docs"),
"_home_user_docs"
);
assert_eq!(
sanitize_path_for_filename("C:\\Users\\docs"),
"c__users_docs"
);
assert_eq!(
sanitize_path_for_filename("path with spaces"),
"path_with_spaces"
);
}
#[test]
fn test_folder_monitor_struct() {
let monitor = FolderMonitor {
id: Uuid::new_v4(),
bot_id: Uuid::new_v4(),
provider: "gdrive".to_string(),
account_email: Some("user@gmail.com".to_string()),
folder_path: "/my/folder".to_string(),
folder_id: Some("folder123".to_string()),
script_path: "on_change.rhai".to_string(),
is_active: true,
watch_subfolders: true,
event_types: vec!["create".to_string(), "modify".to_string()],
};
assert_eq!(monitor.provider, "gdrive");
assert!(monitor.is_active);
assert!(monitor.watch_subfolders);
assert_eq!(monitor.account_email, Some("user@gmail.com".to_string()));
}
#[test]
fn test_folder_change_event_struct() {
let event = FolderChangeEvent {
id: Uuid::new_v4(),
monitor_id: Uuid::new_v4(),
event_type: "create".to_string(),
file_path: "/docs/new_file.pdf".to_string(),
file_id: Some("file123".to_string()),
file_name: Some("new_file.pdf".to_string()),
file_size: Some(1024),
mime_type: Some("application/pdf".to_string()),
old_path: None,
};
assert_eq!(event.event_type, "create");
assert_eq!(event.file_size, Some(1024));
}
#[test]
fn test_detect_provider_from_email() {
assert_eq!(
detect_provider_from_email("user@gmail.com"),
FolderProvider::GDrive
);
assert_eq!(
detect_provider_from_email("user@outlook.com"),
FolderProvider::OneDrive
);
assert_eq!(
detect_provider_from_email("user@hotmail.com"),
FolderProvider::OneDrive
);
assert_eq!(
detect_provider_from_email("user@company.com"),
FolderProvider::GDrive
);
}
}


@ -0,0 +1,598 @@
use diesel::prelude::*;
use log::{error, info, trace};
use rhai::{Dynamic, Engine};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use uuid::Uuid;
use crate::shared::models::TriggerKind;
use crate::shared::models::UserSession;
use crate::shared::state::AppState;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailMonitor {
pub id: Uuid,
pub bot_id: Uuid,
pub email_address: String,
pub script_path: String,
pub is_active: bool,
pub filter_from: Option<String>,
pub filter_subject: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailReceivedEvent {
pub id: Uuid,
pub monitor_id: Uuid,
pub message_uid: i64,
pub message_id: Option<String>,
pub from_address: String,
pub to_addresses: Vec<String>,
pub subject: Option<String>,
pub has_attachments: bool,
pub attachments: Vec<EmailAttachment>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailAttachment {
pub filename: String,
pub mime_type: String,
pub size: i64,
}
pub fn on_email_keyword(state: &AppState, user: UserSession, engine: &mut Engine) {
register_on_email(state, user.clone(), engine);
register_on_email_from(state, user.clone(), engine);
register_on_email_subject(state, user, engine);
}
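/// Registers the bare `ON EMAIL <address>` form. Illustrative BASIC usage of
/// the three registered variants (addresses are hypothetical):
///
/// ```text
/// ON EMAIL "orders@company.com"
/// ON EMAIL "orders@company.com" FROM "supplier@vendor.com"
/// ON EMAIL "orders@company.com" SUBJECT "Invoice"
/// ```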
fn register_on_email(state: &AppState, user: UserSession, engine: &mut Engine) {
let state_clone = state.clone();
let bot_id = user.bot_id;
engine
.register_custom_syntax(
&["ON", "EMAIL", "$string$"],
true,
move |context, inputs| {
let email_address = context
.eval_expression_tree(&inputs[0])?
.to_string()
.trim_matches('"')
.to_string();
trace!("ON EMAIL '{}' for bot: {}", email_address, bot_id);
let script_name = format!(
"on_email_{}.rhai",
email_address.replace('@', "_at_").replace('.', "_")
);
let mut conn = state_clone
.conn
.get()
.map_err(|e| format!("DB error: {}", e))?;
let result =
execute_on_email(&mut *conn, bot_id, &email_address, &script_name, None, None)
.map_err(|e| format!("DB error: {}", e))?;
if let Some(rows_affected) = result.get("rows_affected") {
info!(
"Email monitor registered for '{}' on bot {}",
email_address, bot_id
);
Ok(Dynamic::from(rows_affected.as_i64().unwrap_or(0)))
} else {
Err("Failed to register email monitor".into())
}
},
)
.unwrap();
}
fn register_on_email_from(state: &AppState, user: UserSession, engine: &mut Engine) {
let state_clone = state.clone();
let bot_id = user.bot_id;
engine
.register_custom_syntax(
&["ON", "EMAIL", "$string$", "FROM", "$string$"],
true,
move |context, inputs| {
let email_address = context
.eval_expression_tree(&inputs[0])?
.to_string()
.trim_matches('"')
.to_string();
let filter_from = context
.eval_expression_tree(&inputs[1])?
.to_string()
.trim_matches('"')
.to_string();
trace!(
"ON EMAIL '{}' FROM '{}' for bot: {}",
email_address,
filter_from,
bot_id
);
let script_name = format!(
"on_email_{}_from_{}.rhai",
email_address.replace('@', "_at_").replace('.', "_"),
filter_from.replace('@', "_at_").replace('.', "_")
);
let mut conn = state_clone
.conn
.get()
.map_err(|e| format!("DB error: {}", e))?;
let result = execute_on_email(
&mut *conn,
bot_id,
&email_address,
&script_name,
Some(&filter_from),
None,
)
.map_err(|e| format!("DB error: {}", e))?;
if let Some(rows_affected) = result.get("rows_affected") {
info!(
"Email monitor registered for '{}' from '{}' on bot {}",
email_address, filter_from, bot_id
);
Ok(Dynamic::from(rows_affected.as_i64().unwrap_or(0)))
} else {
Err("Failed to register email monitor".into())
}
},
)
.unwrap();
}
fn register_on_email_subject(state: &AppState, user: UserSession, engine: &mut Engine) {
let state_clone = state.clone();
let bot_id = user.bot_id;
engine
.register_custom_syntax(
&["ON", "EMAIL", "$string$", "SUBJECT", "$string$"],
true,
move |context, inputs| {
let email_address = context
.eval_expression_tree(&inputs[0])?
.to_string()
.trim_matches('"')
.to_string();
let filter_subject = context
.eval_expression_tree(&inputs[1])?
.to_string()
.trim_matches('"')
.to_string();
trace!(
"ON EMAIL '{}' SUBJECT '{}' for bot: {}",
email_address,
filter_subject,
bot_id
);
let script_name = format!(
"on_email_{}_subject.rhai",
email_address.replace('@', "_at_").replace('.', "_")
);
let mut conn = state_clone
.conn
.get()
.map_err(|e| format!("DB error: {}", e))?;
let result = execute_on_email(
&mut *conn,
bot_id,
&email_address,
&script_name,
None,
Some(&filter_subject),
)
.map_err(|e| format!("DB error: {}", e))?;
if let Some(rows_affected) = result.get("rows_affected") {
info!(
"Email monitor registered for '{}' with subject filter '{}' on bot {}",
email_address, filter_subject, bot_id
);
Ok(Dynamic::from(rows_affected.as_i64().unwrap_or(0)))
} else {
Err("Failed to register email monitor".into())
}
},
)
.unwrap();
}
pub fn execute_on_email(
conn: &mut diesel::PgConnection,
bot_id: Uuid,
email_address: &str,
script_path: &str,
filter_from: Option<&str>,
filter_subject: Option<&str>,
) -> Result<Value, String> {
use crate::shared::models::system_automations;
let new_automation = (
system_automations::kind.eq(TriggerKind::EmailReceived as i32),
system_automations::target.eq(email_address),
system_automations::param.eq(script_path),
system_automations::bot_id.eq(bot_id),
);
let result = diesel::insert_into(system_automations::table)
.values(&new_automation)
.execute(conn)
.map_err(|e| {
error!("SQL execution error: {}", e);
e.to_string()
})?;
let monitor_id = Uuid::new_v4();
let insert_sql = format!(
"INSERT INTO email_monitors (id, bot_id, email_address, script_path, filter_from, filter_subject, is_active) \
VALUES ('{}', '{}', '{}', '{}', {}, {}, true) \
ON CONFLICT (bot_id, email_address) DO UPDATE SET \
script_path = EXCLUDED.script_path, \
filter_from = EXCLUDED.filter_from, \
filter_subject = EXCLUDED.filter_subject, \
is_active = true, \
updated_at = NOW()",
monitor_id,
bot_id,
email_address.replace('\'', "''"),
script_path.replace('\'', "''"),
filter_from.map(|f| format!("'{}'", f.replace('\'', "''"))).unwrap_or_else(|| "NULL".to_string()),
filter_subject.map(|s| format!("'{}'", s.replace('\'', "''"))).unwrap_or_else(|| "NULL".to_string())
);
diesel::sql_query(&insert_sql).execute(conn).map_err(|e| {
error!("Failed to insert email monitor: {}", e);
e.to_string()
})?;
Ok(json!({
"command": "on_email",
"email_address": email_address,
"script_path": script_path,
"filter_from": filter_from,
"filter_subject": filter_subject,
"rows_affected": result
}))
}
pub async fn check_email_monitors(
state: &AppState,
bot_id: Uuid,
) -> Result<Vec<(EmailReceivedEvent, String)>, String> {
let mut conn = state.conn.get().map_err(|e| e.to_string())?;
let monitors_sql = format!(
"SELECT id, bot_id, email_address, script_path, filter_from, filter_subject, last_uid \
FROM email_monitors WHERE bot_id = '{}' AND is_active = true",
bot_id
);
#[derive(QueryableByName)]
struct MonitorRow {
#[diesel(sql_type = diesel::sql_types::Uuid)]
id: Uuid,
#[diesel(sql_type = diesel::sql_types::Uuid)]
bot_id: Uuid,
#[diesel(sql_type = diesel::sql_types::Text)]
email_address: String,
#[diesel(sql_type = diesel::sql_types::Text)]
script_path: String,
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
filter_from: Option<String>,
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
filter_subject: Option<String>,
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::BigInt>)]
last_uid: Option<i64>,
}
let monitors: Vec<MonitorRow> = diesel::sql_query(&monitors_sql)
.load(&mut *conn)
.map_err(|e| e.to_string())?;
let mut events = Vec::new();
for monitor in monitors {
trace!(
"Checking email monitor for {} on bot {} (last_uid: {:?})",
monitor.email_address,
monitor.bot_id,
monitor.last_uid
);
let new_events = fetch_new_emails(
state,
monitor.id,
&monitor.email_address,
monitor.last_uid.unwrap_or(0),
monitor.filter_from.as_deref(),
monitor.filter_subject.as_deref(),
)
.await?;
for event in new_events {
events.push((event, monitor.script_path.clone()));
}
}
Ok(events)
}
async fn fetch_new_emails(
_state: &AppState,
monitor_id: Uuid,
_email_address: &str,
_last_uid: i64,
_filter_from: Option<&str>,
_filter_subject: Option<&str>,
) -> Result<Vec<EmailReceivedEvent>, String> {
trace!("Fetching new emails for monitor {}", monitor_id);
Ok(Vec::new())
}
pub async fn process_email_event(
state: &AppState,
event: &EmailReceivedEvent,
script_path: &str,
) -> Result<(), String> {
info!(
"Processing email event {} from {} with script {}",
event.id, event.from_address, script_path
);
let mut conn = state.conn.get().map_err(|e| e.to_string())?;
let update_sql = format!(
"UPDATE email_received_events SET processed = true, processed_at = NOW() WHERE id = '{}'",
event.id
);
diesel::sql_query(&update_sql)
.execute(&mut *conn)
.map_err(|e| e.to_string())?;
Ok(())
}
pub fn parse_email_path(path: &str) -> Option<(String, Option<String>)> {
// Accepts "email://user@host" or "email://user@host/FOLDER".
let rest = path.strip_prefix("email://")?;
match rest.split_once('/') {
Some((email, folder)) => Some((email.to_string(), Some(folder.to_string()))),
None => Some((rest.to_string(), None)),
}
}
pub fn is_email_path(path: &str) -> bool {
path.starts_with("email://")
}
pub fn sanitize_email_for_filename(email: &str) -> String {
email
.replace('@', "_at_")
.replace('.', "_")
.chars()
.filter(|c| c.is_alphanumeric() || *c == '_' || *c == '-')
.collect::<String>()
.to_lowercase()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_email_monitor_struct() {
let monitor = EmailMonitor {
id: Uuid::new_v4(),
bot_id: Uuid::new_v4(),
email_address: "test@example.com".to_string(),
script_path: "on_email_test.rhai".to_string(),
is_active: true,
filter_from: None,
filter_subject: None,
};
assert_eq!(monitor.email_address, "test@example.com");
assert!(monitor.is_active);
assert!(monitor.filter_from.is_none());
assert!(monitor.filter_subject.is_none());
}
#[test]
fn test_email_monitor_with_filters() {
let monitor = EmailMonitor {
id: Uuid::new_v4(),
bot_id: Uuid::new_v4(),
email_address: "orders@company.com".to_string(),
script_path: "on_email_orders.rhai".to_string(),
is_active: true,
filter_from: Some("supplier@vendor.com".to_string()),
filter_subject: Some("Invoice".to_string()),
};
assert_eq!(monitor.email_address, "orders@company.com");
assert_eq!(monitor.filter_from, Some("supplier@vendor.com".to_string()));
assert_eq!(monitor.filter_subject, Some("Invoice".to_string()));
}
#[test]
fn test_email_attachment_struct() {
let attachment = EmailAttachment {
filename: "document.pdf".to_string(),
mime_type: "application/pdf".to_string(),
size: 1024,
};
assert_eq!(attachment.filename, "document.pdf");
assert_eq!(attachment.mime_type, "application/pdf");
assert_eq!(attachment.size, 1024);
}
#[test]
fn test_email_received_event_struct() {
let event = EmailReceivedEvent {
id: Uuid::new_v4(),
monitor_id: Uuid::new_v4(),
message_uid: 12345,
message_id: Some("<msg123@example.com>".to_string()),
from_address: "sender@example.com".to_string(),
to_addresses: vec!["recipient@example.com".to_string()],
subject: Some("Test Subject".to_string()),
has_attachments: true,
attachments: vec![EmailAttachment {
filename: "file.pdf".to_string(),
mime_type: "application/pdf".to_string(),
size: 2048,
}],
};
assert_eq!(event.message_uid, 12345);
assert_eq!(event.from_address, "sender@example.com");
assert!(event.has_attachments);
assert_eq!(event.attachments.len(), 1);
assert_eq!(event.attachments[0].filename, "file.pdf");
}
#[test]
fn test_parse_email_path_basic() {
let result = parse_email_path("email://user@gmail.com");
assert!(result.is_some());
let (email, folder) = result.unwrap();
assert_eq!(email, "user@gmail.com");
assert!(folder.is_none());
}
#[test]
fn test_parse_email_path_with_folder() {
let result = parse_email_path("email://user@gmail.com/INBOX");
assert!(result.is_some());
let (email, folder) = result.unwrap();
assert_eq!(email, "user@gmail.com");
assert_eq!(folder, Some("INBOX".to_string()));
}
#[test]
fn test_parse_email_path_invalid() {
assert!(parse_email_path("user@gmail.com").is_none());
assert!(parse_email_path("mailto:user@gmail.com").is_none());
assert!(parse_email_path("/local/path").is_none());
}
#[test]
fn test_is_email_path() {
assert!(is_email_path("email://user@gmail.com"));
assert!(is_email_path("email://user@company.com/INBOX"));
assert!(!is_email_path("user@gmail.com"));
assert!(!is_email_path("mailto:user@gmail.com"));
assert!(!is_email_path("account://user@gmail.com"));
}
#[test]
fn test_sanitize_email_for_filename() {
assert_eq!(
sanitize_email_for_filename("user@gmail.com"),
"user_at_gmail_com"
);
assert_eq!(
sanitize_email_for_filename("test.user@company.co.uk"),
"test_user_at_company_co_uk"
);
assert_eq!(
sanitize_email_for_filename("USER@EXAMPLE.COM"),
"user_at_example_com"
);
}
#[test]
fn test_email_event_without_attachments() {
let event = EmailReceivedEvent {
id: Uuid::new_v4(),
monitor_id: Uuid::new_v4(),
message_uid: 1,
message_id: None,
from_address: "no-reply@system.com".to_string(),
to_addresses: vec![],
subject: None,
has_attachments: false,
attachments: vec![],
};
assert!(!event.has_attachments);
assert!(event.attachments.is_empty());
assert!(event.subject.is_none());
}
#[test]
fn test_multiple_to_addresses() {
let event = EmailReceivedEvent {
id: Uuid::new_v4(),
monitor_id: Uuid::new_v4(),
message_uid: 999,
message_id: Some("<multi@example.com>".to_string()),
from_address: "sender@example.com".to_string(),
to_addresses: vec![
"user1@example.com".to_string(),
"user2@example.com".to_string(),
"user3@example.com".to_string(),
],
subject: Some("Group Message".to_string()),
has_attachments: false,
attachments: vec![],
};
assert_eq!(event.to_addresses.len(), 3);
assert!(event
.to_addresses
.contains(&"user2@example.com".to_string()));
}
#[test]
fn test_multiple_attachments() {
let attachments = vec![
EmailAttachment {
filename: "doc1.pdf".to_string(),
mime_type: "application/pdf".to_string(),
size: 1024,
},
EmailAttachment {
filename: "image.png".to_string(),
mime_type: "image/png".to_string(),
size: 2048,
},
EmailAttachment {
filename: "data.xlsx".to_string(),
mime_type: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
.to_string(),
size: 4096,
},
];
assert_eq!(attachments.len(), 3);
assert_eq!(attachments[0].filename, "doc1.pdf");
assert_eq!(attachments[1].mime_type, "image/png");
assert_eq!(attachments[2].size, 4096);
}
}


@ -1,8 +1,9 @@
use crate::basic::keywords::use_account::{get_account_credentials, AccountCredentials};
use crate::shared::models::UserSession;
use crate::shared::state::AppState;
use chrono::Utc;
use diesel::prelude::*;
use log::{error, trace};
use log::{error, info, trace};
use rhai::{Dynamic, Engine};
use serde_json::json;
use std::sync::Arc;
@ -25,7 +26,6 @@ pub fn send_mail_keyword(state: Arc<AppState>, user: UserSession, engine: &mut E
let body = context.eval_expression_tree(&inputs[2])?.to_string();
let attachments_input = context.eval_expression_tree(&inputs[3])?;
// Parse attachments array
let mut attachments = Vec::new();
if attachments_input.is_array() {
let arr = attachments_input.cast::<rhai::Array>();
@ -64,6 +64,7 @@ pub fn send_mail_keyword(state: Arc<AppState>, user: UserSession, engine: &mut E
&subject,
&body,
attachments,
None,
)
.await
});
@ -99,6 +100,83 @@ pub fn send_mail_keyword(state: Arc<AppState>, user: UserSession, engine: &mut E
)
.unwrap();
// SEND MAIL to, subject, body USING account
let state_clone2 = Arc::clone(&state);
let user_clone2 = user.clone();
engine
.register_custom_syntax(
&[
"SEND", "MAIL", "$expr$", ",", "$expr$", ",", "$expr$", "USING", "$expr$",
],
false,
move |context, inputs| {
let to = context.eval_expression_tree(&inputs[0])?.to_string();
let subject = context.eval_expression_tree(&inputs[1])?.to_string();
let body = context.eval_expression_tree(&inputs[2])?.to_string();
let using_account = context.eval_expression_tree(&inputs[3])?.to_string();
info!(
"SEND MAIL USING: to={}, subject={}, using={} for user={}",
to, subject, using_account, user_clone2.user_id
);
let state_for_task = Arc::clone(&state_clone2);
let user_for_task = user_clone2.clone();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build();
let send_err = if let Ok(rt) = rt {
let result = rt.block_on(async move {
execute_send_mail(
&state_for_task,
&user_for_task,
&to,
&subject,
&body,
vec![],
Some(using_account),
)
.await
});
tx.send(result).err()
} else {
tx.send(Err("Failed to build tokio runtime".to_string()))
.err()
};
if send_err.is_some() {
error!("Failed to send SEND MAIL USING result from thread");
}
});
match rx.recv_timeout(std::time::Duration::from_secs(30)) {
Ok(Ok(message_id)) => Ok(Dynamic::from(message_id)),
Ok(Err(e)) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
format!("SEND MAIL USING failed: {}", e).into(),
rhai::Position::NONE,
))),
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
"SEND MAIL USING timed out".into(),
rhai::Position::NONE,
)))
}
Err(e) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
format!("SEND MAIL USING thread failed: {}", e).into(),
rhai::Position::NONE,
))),
}
},
)
.unwrap();
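// Illustrative BASIC usage of the USING form (the account must have been
// connected beforehand; addresses are hypothetical):
//
//   SEND MAIL "client@example.com", "Quarterly report", "<p>See attached.</p>" USING "me@gmail.com"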
// Register SEND TEMPLATE for bulk templated emails
let state_clone2 = Arc::clone(&state);
let user_clone2 = user.clone();
@ -194,13 +272,20 @@ async fn execute_send_mail(
subject: &str,
body: &str,
attachments: Vec<String>,
using_account: Option<String>,
) -> Result<String, String> {
let message_id = Uuid::new_v4().to_string();
// Track email in communication history
track_email(state, user, &message_id, to, subject, "sent").await?;
// Send the actual email if email feature is enabled
if let Some(account_email) = using_account {
let creds = get_account_credentials(&state.conn, &account_email, user.bot_id)
.await
.map_err(|e| format!("Failed to get account credentials: {}", e))?;
return send_via_connected_account(state, &creds, to, subject, body, attachments).await;
}
#[cfg(feature = "email")]
{
use crate::email::EmailService;
@ -225,12 +310,114 @@ async fn execute_send_mail(
}
}
// Fallback: store as draft if email sending fails
save_email_draft(state, user, to, subject, body, attachments).await?;
Ok(format!("Email saved as draft: {}", message_id))
}
async fn send_via_connected_account(
state: &AppState,
creds: &AccountCredentials,
to: &str,
subject: &str,
body: &str,
_attachments: Vec<String>,
) -> Result<String, String> {
let message_id = Uuid::new_v4().to_string();
match creds.provider.as_str() {
"gmail" | "google" => {
send_via_gmail(state, creds, to, subject, body).await?;
}
"outlook" | "microsoft" | "hotmail" => {
send_via_outlook(state, creds, to, subject, body).await?;
}
_ => {
return Err(format!("Unsupported email provider: {}", creds.provider));
}
}
info!("Email sent via {} account: {}", creds.provider, message_id);
Ok(format!("Email sent via {}: {}", creds.provider, message_id))
}
async fn send_via_gmail(
_state: &AppState,
creds: &AccountCredentials,
to: &str,
subject: &str,
body: &str,
) -> Result<(), String> {
let client = reqwest::Client::new();
let raw_message = format!(
"To: {}\r\nSubject: {}\r\nContent-Type: text/html; charset=utf-8\r\n\r\n{}",
to, subject, body
);
let encoded = base64::Engine::encode(
&base64::engine::general_purpose::URL_SAFE,
raw_message.as_bytes(),
);
let response = client
.post("https://gmail.googleapis.com/gmail/v1/users/me/messages/send")
.bearer_auth(&creds.access_token)
.json(&serde_json::json!({ "raw": encoded }))
.send()
.await
.map_err(|e| format!("Gmail API request failed: {}", e))?;
if !response.status().is_success() {
let error_text = response.text().await.unwrap_or_default();
return Err(format!("Gmail API error: {}", error_text));
}
Ok(())
}
async fn send_via_outlook(
_state: &AppState,
creds: &AccountCredentials,
to: &str,
subject: &str,
body: &str,
) -> Result<(), String> {
let client = reqwest::Client::new();
let message = serde_json::json!({
"message": {
"subject": subject,
"body": {
"contentType": "HTML",
"content": body
},
"toRecipients": [
{
"emailAddress": {
"address": to
}
}
]
},
"saveToSentItems": "true"
});
let response = client
.post("https://graph.microsoft.com/v1.0/me/sendMail")
.bearer_auth(&creds.access_token)
.json(&message)
.send()
.await
.map_err(|e| format!("Microsoft Graph request failed: {}", e))?;
if !response.status().is_success() {
let error_text = response.text().await.unwrap_or_default();
return Err(format!("Microsoft Graph error: {}", error_text));
}
Ok(())
}
async fn execute_send_template(
state: &AppState,
user: &UserSession,
@ -243,15 +430,12 @@ async fn execute_send_template(
let mut sent_count = 0;
for recipient in recipients {
// Personalize template for each recipient
let personalized_content =
apply_template_variables(&template_content, &variables, &recipient)?;
// Extract subject from template or use default
let subject = extract_template_subject(&personalized_content)
.unwrap_or_else(|| format!("Message from {}", user.user_id));
// Send email
if let Ok(_) = execute_send_mail(
state,
user,
@ -259,13 +443,13 @@ async fn execute_send_template(
&subject,
&personalized_content,
vec![],
None,
)
.await
{
sent_count += 1;
}
// Add small delay to avoid rate limiting
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}


@ -0,0 +1,253 @@
use crate::shared::models::UserSession;
use crate::shared::state::AppState;
use diesel::prelude::*;
use log::{error, info};
use rhai::{Dynamic, Engine, EvalAltResult};
use std::sync::Arc;
use uuid::Uuid;
#[derive(QueryableByName)]
#[allow(dead_code)]
struct AccountResult {
#[diesel(sql_type = diesel::sql_types::Uuid)]
id: Uuid,
#[diesel(sql_type = diesel::sql_types::Text)]
email: String,
#[diesel(sql_type = diesel::sql_types::Text)]
provider: String,
#[diesel(sql_type = diesel::sql_types::Text)]
account_type: String,
}
#[derive(QueryableByName, Debug, Clone)]
pub struct ActiveAccountResult {
#[diesel(sql_type = diesel::sql_types::Uuid)]
pub account_id: Uuid,
#[diesel(sql_type = diesel::sql_types::Text)]
pub email: String,
#[diesel(sql_type = diesel::sql_types::Text)]
pub provider: String,
#[diesel(sql_type = diesel::sql_types::Text)]
pub qdrant_collection: String,
}
pub fn register_use_account_keyword(
engine: &mut Engine,
state: Arc<AppState>,
session: Arc<UserSession>,
) -> Result<(), Box<EvalAltResult>> {
let state_clone = Arc::clone(&state);
let session_clone = Arc::clone(&session);
engine.register_custom_syntax(
&["USE", "ACCOUNT", "$expr$"],
true,
move |context, inputs| {
let email = context.eval_expression_tree(&inputs[0])?.to_string();
info!(
"USE ACCOUNT keyword executed - Email: {}, Session: {}",
email, session_clone.id
);
let session_id = session_clone.id;
let bot_id = session_clone.bot_id;
let user_id = session_clone.user_id;
let conn = state_clone.conn.clone();
let email_clone = email.clone();
let result = std::thread::spawn(move || {
add_account_to_session(conn, session_id, bot_id, user_id, &email_clone)
})
.join();
match result {
Ok(Ok(_)) => {
info!("Account '{}' added to session {}", email, session_clone.id);
Ok(Dynamic::UNIT)
}
Ok(Err(e)) => {
error!("Failed to add account '{}': {}", email, e);
Err(format!("USE_ACCOUNT failed: {}", e).into())
}
Err(e) => {
error!("Thread panic in USE_ACCOUNT: {:?}", e);
Err("USE_ACCOUNT failed: thread panic".into())
}
}
},
)?;
Ok(())
}
fn add_account_to_session(
conn_pool: crate::shared::utils::DbPool,
session_id: Uuid,
bot_id: Uuid,
user_id: Uuid,
email: &str,
) -> Result<(), String> {
let mut conn = conn_pool
.get()
.map_err(|e| format!("Failed to get DB connection: {}", e))?;
let account: Option<AccountResult> = diesel::sql_query(
"SELECT id, email, provider, account_type FROM connected_accounts
WHERE email = $1 AND (bot_id = $2 OR user_id = $3) AND status = 'active'",
)
.bind::<diesel::sql_types::Text, _>(email)
.bind::<diesel::sql_types::Uuid, _>(bot_id)
.bind::<diesel::sql_types::Uuid, _>(user_id)
.get_result(&mut conn)
.optional()
.map_err(|e| format!("Failed to query account: {}", e))?;
let account = match account {
Some(acc) => acc,
None => {
return Err(format!(
"Account '{}' not found or not configured. Add it in Sources app.",
email
));
}
};
let qdrant_collection = format!("account_{}_{}", account.provider, account.id);
let assoc_id = Uuid::new_v4();
diesel::sql_query(
"INSERT INTO session_account_associations
(id, session_id, bot_id, account_id, email, provider, qdrant_collection, is_active)
VALUES ($1, $2, $3, $4, $5, $6, $7, true)
ON CONFLICT (session_id, account_id)
DO UPDATE SET is_active = true, added_at = NOW()",
)
.bind::<diesel::sql_types::Uuid, _>(assoc_id)
.bind::<diesel::sql_types::Uuid, _>(session_id)
.bind::<diesel::sql_types::Uuid, _>(bot_id)
.bind::<diesel::sql_types::Uuid, _>(account.id)
.bind::<diesel::sql_types::Text, _>(&account.email)
.bind::<diesel::sql_types::Text, _>(&account.provider)
.bind::<diesel::sql_types::Text, _>(&qdrant_collection)
.execute(&mut conn)
.map_err(|e| format!("Failed to add account association: {}", e))?;
info!(
"Added account '{}' ({}) to session {} (collection: {})",
email, account.provider, session_id, qdrant_collection
);
Ok(())
}
pub fn get_active_accounts_for_session(
conn_pool: &crate::shared::utils::DbPool,
session_id: Uuid,
) -> Result<Vec<ActiveAccountResult>, String> {
let mut conn = conn_pool
.get()
.map_err(|e| format!("Failed to get DB connection: {}", e))?;
let results: Vec<ActiveAccountResult> = diesel::sql_query(
"SELECT account_id, email, provider, qdrant_collection
FROM session_account_associations
WHERE session_id = $1 AND is_active = true
ORDER BY added_at DESC",
)
.bind::<diesel::sql_types::Uuid, _>(session_id)
.load(&mut conn)
.map_err(|e| format!("Failed to get active accounts: {}", e))?;
Ok(results)
}
pub fn parse_account_path(path: &str) -> Option<(String, String)> {
    // "account://<email>/<path-inside-account>" -> (email, path)
    let rest = path.strip_prefix("account://")?;
    let (email, file_path) = rest.split_once('/')?;
    Some((email.to_string(), file_path.to_string()))
}
pub fn is_account_path(path: &str) -> bool {
path.starts_with("account://")
}
pub async fn get_account_credentials(
conn_pool: &crate::shared::utils::DbPool,
email: &str,
bot_id: Uuid,
) -> Result<AccountCredentials, String> {
let mut conn = conn_pool
.get()
.map_err(|e| format!("Failed to get DB connection: {}", e))?;
#[derive(QueryableByName)]
struct CredResult {
#[diesel(sql_type = diesel::sql_types::Uuid)]
id: Uuid,
#[diesel(sql_type = diesel::sql_types::Text)]
provider: String,
#[diesel(sql_type = diesel::sql_types::Text)]
access_token: String,
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
refresh_token: Option<String>,
}
    let creds: CredResult = diesel::sql_query(
        "SELECT id, provider, access_token, refresh_token
         FROM connected_accounts
         WHERE email = $1 AND bot_id = $2 AND status = 'active'",
    )
    .bind::<diesel::sql_types::Text, _>(email)
    .bind::<diesel::sql_types::Uuid, _>(bot_id)
    .get_result(&mut conn)
    .optional()
    .map_err(|e| format!("Failed to query account: {}", e))?
    .ok_or_else(|| format!("Account '{}' not found or inactive", email))?;
Ok(AccountCredentials {
account_id: creds.id,
provider: creds.provider,
access_token: creds.access_token,
refresh_token: creds.refresh_token,
})
}
#[derive(Debug, Clone)]
pub struct AccountCredentials {
pub account_id: Uuid,
pub provider: String,
pub access_token: String,
pub refresh_token: Option<String>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_account_path() {
let result = parse_account_path("account://user@gmail.com/Documents/file.pdf");
assert!(result.is_some());
let (email, path) = result.unwrap();
assert_eq!(email, "user@gmail.com");
assert_eq!(path, "Documents/file.pdf");
}
#[test]
fn test_parse_account_path_invalid() {
assert!(parse_account_path("local/file.pdf").is_none());
assert!(parse_account_path("/absolute/path").is_none());
}
#[test]
fn test_is_account_path() {
assert!(is_account_path("account://user@gmail.com/file.pdf"));
assert!(!is_account_path("local/file.pdf"));
assert!(!is_account_path("file.pdf"));
}
}
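For reference, the `account://` path convention consumed by `USE ACCOUNT` and `ON CHANGE` can be exercised in isolation. This is a minimal self-contained sketch, not the crate's API: the helper is re-declared here purely for illustration and mirrors `parse_account_path` above.

```rust
// Sketch: decomposing "account://<email>/<path>" the way parse_account_path does.
// Re-declared here for illustration only.
fn parse_account_path(path: &str) -> Option<(String, String)> {
    let rest = path.strip_prefix("account://")?;
    let (email, file_path) = rest.split_once('/')?;
    Some((email.to_string(), file_path.to_string()))
}

fn main() {
    let (email, file) =
        parse_account_path("account://user@gmail.com/Documents/file.pdf").unwrap();
    assert_eq!(email, "user@gmail.com");
    assert_eq!(file, "Documents/file.pdf");
    // Paths without the scheme, or without a file component after the email, are rejected.
    assert!(parse_account_path("local/file.pdf").is_none());
    assert!(parse_account_path("account://user@gmail.com").is_none());
}
```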


@ -62,6 +62,8 @@ use self::keywords::webhook::webhook_keyword;
use self::keywords::llm_keyword::llm_keyword;
use self::keywords::on::on_keyword;
use self::keywords::on_change::on_change_keyword;
use self::keywords::on_email::on_email_keyword;
use self::keywords::print::print_keyword;
use self::keywords::set::set_keyword;
use self::keywords::set_context::set_context_keyword;
@ -98,6 +100,8 @@ impl ScriptService {
wait_keyword(&state, user.clone(), &mut engine);
print_keyword(&state, user.clone(), &mut engine);
on_keyword(&state, user.clone(), &mut engine);
on_email_keyword(&state, user.clone(), &mut engine);
on_change_keyword(&state, user.clone(), &mut engine);
hear_keyword(state.clone(), user.clone(), &mut engine);
talk_keyword(state.clone(), user.clone(), &mut engine);
set_context_keyword(state.clone(), user.clone(), &mut engine);


@ -60,6 +60,8 @@ pub enum TriggerKind {
TableInsert = 2,
TableDelete = 3,
Webhook = 4,
EmailReceived = 5,
FolderChange = 6,
}
impl TriggerKind {
@ -70,6 +72,8 @@ impl TriggerKind {
2 => Some(Self::TableInsert),
3 => Some(Self::TableDelete),
4 => Some(Self::Webhook),
5 => Some(Self::EmailReceived),
6 => Some(Self::FolderChange),
_ => None,
}
}
@ -271,7 +275,24 @@ mod tests {
#[test]
fn test_trigger_kind_conversion() {
assert_eq!(TriggerKind::from_i32(0), Some(TriggerKind::Scheduled));
assert_eq!(TriggerKind::from_i32(1), Some(TriggerKind::TableUpdate));
assert_eq!(TriggerKind::from_i32(2), Some(TriggerKind::TableInsert));
assert_eq!(TriggerKind::from_i32(3), Some(TriggerKind::TableDelete));
assert_eq!(TriggerKind::from_i32(4), Some(TriggerKind::Webhook));
assert_eq!(TriggerKind::from_i32(5), Some(TriggerKind::EmailReceived));
assert_eq!(TriggerKind::from_i32(6), Some(TriggerKind::FolderChange));
assert_eq!(TriggerKind::from_i32(99), None);
assert_eq!(TriggerKind::from_i32(-1), None);
}
#[test]
fn test_trigger_kind_as_i32() {
assert_eq!(TriggerKind::Scheduled as i32, 0);
assert_eq!(TriggerKind::TableUpdate as i32, 1);
assert_eq!(TriggerKind::TableInsert as i32, 2);
assert_eq!(TriggerKind::TableDelete as i32, 3);
assert_eq!(TriggerKind::Webhook as i32, 4);
assert_eq!(TriggerKind::EmailReceived as i32, 5);
assert_eq!(TriggerKind::FolderChange as i32, 6);
}
}
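The discriminant round-trip that the two tests above exercise can also be checked exhaustively. This standalone sketch re-declares the enum for illustration (it mirrors `TriggerKind` above, including the new `EmailReceived = 5` and `FolderChange = 6` variants) and verifies that every variant survives `as i32` followed by `from_i32`, while out-of-range values map to `None`.

```rust
// Sketch: TriggerKind discriminant round-trip. Enum re-declared for illustration.
#[derive(Debug, PartialEq, Clone, Copy)]
enum TriggerKind {
    Scheduled = 0,
    TableUpdate = 1,
    TableInsert = 2,
    TableDelete = 3,
    Webhook = 4,
    EmailReceived = 5,
    FolderChange = 6,
}

impl TriggerKind {
    fn from_i32(v: i32) -> Option<Self> {
        match v {
            0 => Some(Self::Scheduled),
            1 => Some(Self::TableUpdate),
            2 => Some(Self::TableInsert),
            3 => Some(Self::TableDelete),
            4 => Some(Self::Webhook),
            5 => Some(Self::EmailReceived),
            6 => Some(Self::FolderChange),
            _ => None,
        }
    }
}

fn main() {
    // Every variant round-trips through its i32 discriminant.
    for kind in [
        TriggerKind::Scheduled,
        TriggerKind::TableUpdate,
        TriggerKind::TableInsert,
        TriggerKind::TableDelete,
        TriggerKind::Webhook,
        TriggerKind::EmailReceived,
        TriggerKind::FolderChange,
    ] {
        assert_eq!(TriggerKind::from_i32(kind as i32), Some(kind));
    }
    // Values outside the known range are rejected rather than wrapped.
    assert_eq!(TriggerKind::from_i32(7), None);
    assert_eq!(TriggerKind::from_i32(-1), None);
}
```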