Update botserver

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-12-21 23:40:43 -03:00
parent dd91fc2d3b
commit a491cc13a6
37 changed files with 1744 additions and 1340 deletions

1
Cargo.lock generated
View file

@ -1179,6 +1179,7 @@ dependencies = [
"urlencoding", "urlencoding",
"uuid", "uuid",
"vaultrs", "vaultrs",
"walkdir",
"webpki-roots 0.25.4", "webpki-roots 0.25.4",
"x509-parser", "x509-parser",
"zip 2.4.2", "zip 2.4.2",

View file

@ -235,6 +235,7 @@ rss = "2.0"
# HTML parsing/web scraping # HTML parsing/web scraping
scraper = "0.25" scraper = "0.25"
walkdir = "2.5.0"
[dev-dependencies] [dev-dependencies]
mockito = "1.7.0" mockito = "1.7.0"
@ -257,6 +258,12 @@ unwrap_used = "warn"
expect_used = "warn" expect_used = "warn"
panic = "warn" panic = "warn"
todo = "warn" todo = "warn"
# Disabled: has false positives for functions with mut self, heap types (Vec, String)
missing_const_for_fn = "allow"
# Disabled: transitive dependencies we cannot control
multiple_crate_versions = "allow"
# Disabled: many keyword functions need owned types for Rhai integration
needless_pass_by_value = "allow"
[profile.release] [profile.release]
lto = true lto = true

876
PROMPT.md
View file

@ -5,283 +5,299 @@
--- ---
## Build Rules - IMPORTANT ## ZERO TOLERANCE POLICY
**Always use debug builds during development and testing:** **This project has the strictest code quality requirements possible:**
```toml
[lints.clippy]
all = "warn"
pedantic = "warn"
nursery = "warn"
cargo = "warn"
unwrap_used = "warn"
expect_used = "warn"
panic = "warn"
todo = "warn"
```
**EVERY SINGLE WARNING MUST BE FIXED. NO EXCEPTIONS.**
---
## ABSOLUTE PROHIBITIONS
```
❌ NEVER use #![allow()] or #[allow()] in source code to silence warnings
❌ NEVER use _ prefix for unused variables - DELETE the variable or USE it
❌ NEVER use .unwrap() - use ? or proper error handling
❌ NEVER use .expect() - use ? or proper error handling
❌ NEVER use panic!() or unreachable!() - handle all cases
❌ NEVER use todo!() or unimplemented!() - write real code
❌ NEVER leave unused imports - DELETE them
❌ NEVER leave dead code - DELETE it or IMPLEMENT it
❌ NEVER use approximate constants (3.14159) - use std::f64::consts::PI
❌ NEVER silence clippy in code - FIX THE CODE or configure in Cargo.toml
❌ NEVER add comments explaining what code does - code must be self-documenting
❌ NEVER use CDN links - all assets must be local
```
---
## CARGO.TOML LINT EXCEPTIONS
When a clippy lint has **technical false positives** that cannot be fixed in code,
disable it in `Cargo.toml` with a comment explaining why:
```toml
[lints.clippy]
# Disabled: has false positives for functions with mut self, heap types (Vec, String)
missing_const_for_fn = "allow"
# Disabled: Tauri commands require owned types (Window) that cannot be passed by reference
needless_pass_by_value = "allow"
# Disabled: transitive dependencies we cannot control
multiple_crate_versions = "allow"
```
**Approved exceptions:**
- `missing_const_for_fn` - false positives for `mut self`, heap types
- `needless_pass_by_value` - Tauri/framework requirements
- `multiple_crate_versions` - transitive dependencies
- `future_not_send` - when async traits require non-Send futures
---
## MANDATORY CODE PATTERNS
### Error Handling - Use `?` Operator
```rust
// ❌ WRONG
let value = something.unwrap();
let value = something.expect("msg");
// ✅ CORRECT
let value = something?;
let value = something.ok_or_else(|| Error::NotFound)?;
```
### Option Handling - Use Combinators
```rust
// ❌ WRONG
if let Some(x) = opt {
x
} else {
default
}
// ✅ CORRECT
opt.unwrap_or(default)
opt.unwrap_or_else(|| compute_default())
opt.map_or(default, |x| transform(x))
```
### Match Arms - Must Be Different
```rust
// ❌ WRONG - identical arms
match x {
A => do_thing(),
B => do_thing(),
C => other(),
}
// ✅ CORRECT - combine identical arms
match x {
A | B => do_thing(),
C => other(),
}
```
### Self Usage in Impl Blocks
```rust
// ❌ WRONG
impl MyStruct {
fn new() -> MyStruct { MyStruct { } }
}
// ✅ CORRECT
impl MyStruct {
fn new() -> Self { Self { } }
}
```
### Format Strings - Inline Variables
```rust
// ❌ WRONG
format!("Hello {}", name)
log::info!("Processing {}", id);
// ✅ CORRECT
format!("Hello {name}")
log::info!("Processing {id}");
```
### Display vs ToString
```rust
// ❌ WRONG
impl ToString for MyType {
fn to_string(&self) -> String { }
}
// ✅ CORRECT
impl std::fmt::Display for MyType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { }
}
```
### Derive Eq with PartialEq
```rust
// ❌ WRONG
#[derive(PartialEq)]
struct MyStruct { }
// ✅ CORRECT
#[derive(PartialEq, Eq)]
struct MyStruct { }
```
### Must Use Attributes
```rust
// ❌ WRONG - pure function without #[must_use]
pub fn calculate() -> i32 { }
// ✅ CORRECT
#[must_use]
pub fn calculate() -> i32 { }
```
### Const Functions
```rust
// ❌ WRONG - could be const but isn't
pub fn default_value() -> i32 { 42 }
// ✅ CORRECT
pub const fn default_value() -> i32 { 42 }
```
### Pass by Reference
```rust
// ❌ WRONG - needless pass by value
fn process(data: String) { println!("{data}"); }
// ✅ CORRECT
fn process(data: &str) { println!("{data}"); }
```
### Clone Only When Needed
```rust
// ❌ WRONG - redundant clone
let x = value.clone();
use_value(&x);
// ✅ CORRECT
use_value(&value);
```
### Mathematical Constants
```rust
// ❌ WRONG
let pi = 3.14159;
let e = 2.71828;
// ✅ CORRECT
use std::f64::consts::{PI, E};
let pi = PI;
let e = E;
```
### Async Functions
```rust
// ❌ WRONG - async without await
async fn process() { sync_operation(); }
// ✅ CORRECT - remove async if no await needed
fn process() { sync_operation(); }
```
---
## Build Rules
```bash ```bash
# CORRECT - debug build (fast compilation) # Development - ALWAYS debug build
cargo build cargo build
cargo check cargo check
# WRONG - do NOT use release builds unless explicitly requested # NEVER use release unless deploying
# cargo build --release # cargo build --release # NO!
```
Debug builds compile much faster and are sufficient for testing functionality.
Only use `--release` when building final binaries for deployment.
---
## Weekly Maintenance - EVERY MONDAY
### Package Review Checklist
**Every Monday, review the following:**
1. **Dependency Updates**
```bash
cargo outdated
cargo audit
```
2. **Package Consolidation Opportunities**
- Check if new crates can replace custom code
- Look for crates that combine multiple dependencies
- Review `Cargo.toml` for redundant dependencies
3. **Code Reduction Candidates**
- Custom implementations that now have crate equivalents
- Boilerplate that can be replaced with derive macros
- Manual serialization that `serde` can handle
4. **Security Updates**
```bash
cargo audit fix
```
### Packages to Watch
| Area | Potential Packages | Purpose |
|------|-------------------|---------|
| Validation | `validator` | Replace manual validation |
| Date/Time | `chrono`, `time` | Consolidate time handling |
| Email | `lettre` | Simplify email sending |
| File Watching | `notify` | Replace polling with events |
| Background Jobs | `tokio-cron-scheduler` | Simplify scheduling |
---
## Version Management - CRITICAL
**Current version is 6.1.0 - DO NOT CHANGE without explicit approval!**
```bash
# Check current version
grep "^version" Cargo.toml
```
### Rules
1. **Version is 6.1.0 across ALL workspace crates**
2. **NEVER change version without explicit user approval**
3. **All migrations use 6.1.0_* prefix**
4. **Migration folder naming: `6.1.0_{feature_name}/`**
---
## Database Standards - CRITICAL
### TABLES AND INDEXES ONLY
**NEVER create in migrations:**
- ❌ Views (`CREATE VIEW`)
- ❌ Triggers (`CREATE TRIGGER`)
- ❌ Functions (`CREATE FUNCTION`)
- ❌ Stored Procedures
**ALWAYS use:**
- ✅ Tables (`CREATE TABLE IF NOT EXISTS`)
- ✅ Indexes (`CREATE INDEX IF NOT EXISTS`)
- ✅ Constraints (inline in table definitions)
### Why?
- Diesel ORM compatibility
- Simpler rollbacks
- Better portability
- Easier testing
### JSON Storage Pattern
Use TEXT columns with `_json` suffix instead of JSONB:
```sql
-- CORRECT
members_json TEXT DEFAULT '[]'
-- WRONG
members JSONB DEFAULT '[]'::jsonb
``` ```
--- ---
## Official Icons - MANDATORY ## Version Management
**NEVER generate icons with LLM. ALWAYS use official SVG icons from assets.** **Version is 6.1.0 - NEVER CHANGE without explicit approval**
Icons are stored in two locations (kept in sync):
- `botui/ui/suite/assets/icons/` - Runtime icons for UI
- `botbook/src/assets/icons/` - Documentation icons
### Available Icons
| Icon | File | Usage |
|------|------|-------|
| Logo | `gb-logo.svg` | Main GB branding |
| Bot | `gb-bot.svg` | Bot/assistant representation |
| Analytics | `gb-analytics.svg` | Charts, metrics, dashboards |
| Calendar | `gb-calendar.svg` | Scheduling, events |
| Chat | `gb-chat.svg` | Conversations, messaging |
| Compliance | `gb-compliance.svg` | Security, auditing |
| Designer | `gb-designer.svg` | Workflow automation |
| Drive | `gb-drive.svg` | File storage, documents |
| Mail | `gb-mail.svg` | Email functionality |
| Meet | `gb-meet.svg` | Video conferencing |
| Paper | `gb-paper.svg` | Document editing |
| Research | `gb-research.svg` | Search, investigation |
| Sources | `gb-sources.svg` | Knowledge bases |
| Tasks | `gb-tasks.svg` | Task management |
### Icon Guidelines
- All icons use `stroke="currentColor"` for CSS theming
- ViewBox: `0 0 24 24`
- Stroke width: `1.5`
- Rounded line caps and joins
**DO NOT:**
- Generate new icons with AI/LLM
- Use emoji or unicode symbols as icons
- Use external icon libraries
- Create inline SVG content
--- ---
## Project Overview ## Database Standards
botserver is the core backend for General Bots - an open-source conversational AI platform built in Rust. It provides: **TABLES AND INDEXES ONLY:**
- **Bootstrap System**: Auto-installs PostgreSQL, MinIO, Redis, LLM servers
- **Package Manager**: Manages bot deployments and service lifecycle
- **BASIC Interpreter**: Executes conversation scripts via Rhai
- **Multi-Channel Support**: Web, WhatsApp, Teams, Email
- **Knowledge Base**: Document ingestion with vector search
### Workspace Structure
``` ```
botserver/ # Main server (this project) ✅ CREATE TABLE IF NOT EXISTS
botlib/ # Shared library - types, utilities, HTTP client ✅ CREATE INDEX IF NOT EXISTS
botui/ # Web/Desktop UI (Axum + Tauri) ✅ Inline constraints
botapp/ # Desktop app wrapper (Tauri)
botbook/ # Documentation (mdBook) ❌ CREATE VIEW
botmodels/ # Data models visualization ❌ CREATE TRIGGER
botplugin/ # Browser extension ❌ CREATE FUNCTION
❌ Stored Procedures
``` ```
--- **JSON Columns:** Use TEXT with `_json` suffix, not JSONB
## Database Migrations
### Creating New Migrations
```bash
# 1. Version is always 6.1.0
# 2. List existing migrations
ls -la migrations/
# 3. Create new migration folder
mkdir migrations/6.1.0_my_feature
# 4. Create up.sql and down.sql (TABLES AND INDEXES ONLY)
```
### Migration Structure
```
migrations/
├── 6.0.0_initial_schema/
├── 6.0.1_bot_memories/
├── ...
├── 6.1.0_enterprise_features/
│ ├── up.sql
│ └── down.sql
└── 6.1.0_next_feature/ # YOUR NEW MIGRATION
├── up.sql
└── down.sql
```
### Migration Best Practices
- Use `IF NOT EXISTS` for all CREATE TABLE statements
- Use `IF EXISTS` for all DROP statements in down.sql
- Always create indexes for foreign keys
- **NO triggers** - handle updated_at in application code
- **NO views** - use queries in application code
- **NO functions** - use application logic
- Use TEXT with `_json` suffix for JSON data (not JSONB)
---
## LLM Workflow Strategy
### Two Types of LLM Work
1. **Execution Mode (Fazer)**
- Pre-annotate phrases and send for execution
- Focus on automation freedom
- Less concerned with code details
- Primary concern: Is the LLM destroying something?
- Trust but verify output doesn't break existing functionality
2. **Review Mode (Conferir)**
- Read generated code with full attention
- Line-by-line verification
- Check for correctness, security, performance
- Validate against requirements
### Development Process
1. **One requirement at a time** with sequential commits
2. **Start with docs** - explain user behavior before coding
3. **Design first** - spend time on architecture
4. **On unresolved error** - stop and consult with web search enabled
### LLM Fallback Strategy (After 3 attempts / 10 minutes)
1. DeepSeek-V3-0324 (good architect, reliable)
2. gpt-5-chat (slower but thorough)
3. gpt-oss-120b (final validation)
4. Claude Web (for complex debugging, unit tests, UI)
--- ---
## Code Generation Rules ## Code Generation Rules
### CRITICAL REQUIREMENTS
``` ```
- KISS, NO TALK, SECURED ENTERPRISE GRADE THREAD SAFE CODE ONLY - KISS, NO TALK, SECURED ENTERPRISE GRADE THREAD SAFE CODE ONLY
- Use rustc 1.90.0 (1159e78c4 2025-09-14) - Use rustc 1.90.0+
- No placeholders, never comment/uncomment code, no explanations - No placeholders, no explanations, no comments
- All code must be complete, professional, production-ready - All code must be complete, professional, production-ready
- REMOVE ALL COMMENTS FROM GENERATED CODE - REMOVE ALL COMMENTS FROM GENERATED CODE
- Always include full updated code files - never partial - Always include full updated code files - never partial
- Only return files that have actual changes - Only return files that have actual changes
- DO NOT WRITE ERROR HANDLING CODE - LET IT CRASH - Return 0 warnings - FIX ALL CLIPPY WARNINGS
- Return 0 warnings - review unused imports! - NO DEAD CODE - implement real functionality
- NO DEAD CODE - implement real functionality, never use _ for unused
``` ```
### Documentation Rules ---
## Documentation Rules
``` ```
- Rust code examples ONLY in docs/reference/architecture.md (gbapp chapter) - Rust code examples ONLY in docs/reference/architecture.md
- All other docs: BASIC, bash, JSON, SQL, YAML only - All other docs: BASIC, bash, JSON, SQL, YAML only
- Scan for ALL_CAPS.md files created at wrong places - delete or integrate to docs/
- Keep only README.md and PROMPT.md at project root level - Keep only README.md and PROMPT.md at project root level
``` ```
### Frontend Rules ---
## Frontend Rules
``` ```
- Use HTMX to minimize JavaScript - delegate logic to Rust server - Use HTMX to minimize JavaScript - delegate logic to Rust server
@ -290,237 +306,47 @@ migrations/
- Endpoints return HTML fragments, not JSON (for HTMX) - Endpoints return HTML fragments, not JSON (for HTMX)
``` ```
### Rust Patterns ---
## Rust Patterns
```rust ```rust
// Use rand::rng() instead of rand::thread_rng() // Random number generation
let mut rng = rand::rng(); let mut rng = rand::rng();
// Use diesel for database (NOT sqlx) // Database - ONLY diesel, never sqlx
use diesel::prelude::*; use diesel::prelude::*;
// All config from AppConfig - no hardcoded values // Config from AppConfig - no hardcoded values
let url = config.drive.endpoint.clone(); let url = config.drive.endpoint.clone();
// Logging (all-in-one-line, unique messages) // Logging - all-in-one-line, unique messages, inline vars
info!("Processing request id={} user={}", req_id, user_id); info!("Processing request id={id} user={user_id}");
``` ```
### Dependency Management ---
## Dependencies
``` ```
- Use diesel - remove any sqlx references - Use diesel - remove any sqlx references
- After adding to Cargo.toml: cargo audit must show 0 warnings - After adding to Cargo.toml: cargo audit must show 0 vulnerabilities
- If audit fails, find alternative library - If audit fails, find alternative library
- Minimize redundancy - check existing libs before adding new ones - Minimize redundancy - check existing libs before adding
- Review src/ to identify reusable patterns and libraries
```
### botserver Specifics
```
- Sessions MUST be retrieved by id when session_id is present
- Never suggest installing software - bootstrap/package_manager handles it
- Configuration stored in .gbot/config and database bot_configuration table
- Pay attention to shared::utils and shared::models for reuse
``` ```
--- ---
## Documentation Validation ## Key Files
### Chapter Validation Process
For each documentation chapter:
1. Read the chapter instructions step by step
2. Check if source code accomplishes each instruction
3. Verify paths exist and are correct
4. Ensure 100% alignment between docs and implementation
5. Fix either docs or code to match
### Documentation Structure
``` ```
docs/ src/main.rs # Entry point
├── api/ # API documentation (no Rust code) src/lib.rs # Module exports
├── guides/ # How-to guides (no Rust code) src/basic/ # BASIC language keywords
└── reference/ src/core/ # Core functionality
├── architecture.md # ONLY place for Rust code examples src/shared/state.rs # AppState definition
├── basic-language.md # BASIC only src/shared/utils.rs # Utility functions
└── configuration.md # Config examples only src/shared/models.rs # Database models
```
---
## Adding New Features
### Adding a Rhai Keyword
```rust
pub fn my_keyword(state: &AppState, engine: &mut Engine) {
let db = state.db_custom.clone();
engine.register_custom_syntax(
["MY", "KEYWORD", "$expr$"],
true,
{
let db = db.clone();
move |context, inputs| {
let value = context.eval_expression_tree(&inputs[0])?;
let binding = db.as_ref().unwrap();
let fut = execute_my_keyword(binding, value);
let result = tokio::task::block_in_place(||
tokio::runtime::Handle::current().block_on(fut))
.map_err(|e| format!("DB error: {}", e))?;
Ok(Dynamic::from(result))
}
}
).unwrap();
}
pub async fn execute_my_keyword(
pool: &PgPool,
value: String,
) -> Result<Value, Box<dyn std::error::Error>> {
info!("Executing my_keyword value={}", value);
use diesel::prelude::*;
let result = diesel::insert_into(my_table::table)
.values(&NewRecord { value })
.execute(pool)?;
Ok(json!({ "rows_affected": result }))
}
```
### Adding a Data Model
```rust
use chrono::{DateTime, Utc};
use diesel::prelude::*;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Queryable, Selectable, Insertable, Serialize, Deserialize)]
#[diesel(table_name = crate::schema::users)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct User {
pub id: Uuid,
pub status: i16,
pub email: String,
pub age: Option<i16>,
pub metadata: Vec<u8>,
pub created_at: DateTime<Utc>,
}
```
### Adding a Service/Endpoint (HTMX Pattern)
```rust
use axum::{routing::get, Router, extract::State, response::Html};
use askama::Template;
#[derive(Template)]
#[template(path = "partials/items.html")]
struct ItemsTemplate {
items: Vec<Item>,
}
pub fn configure() -> Router<AppState> {
Router::new()
.route("/api/items", get(list_handler))
}
async fn list_handler(
State(state): State<Arc<AppState>>,
) -> Html<String> {
let conn = state.conn.get().unwrap();
let items = items::table.load::<Item>(&conn).unwrap();
let template = ItemsTemplate { items };
Html(template.render().unwrap())
}
```
---
## Final Steps Before Commit
```bash
# Check for warnings
cargo check 2>&1 | grep warning
# Audit dependencies (must be 0 warnings)
cargo audit
# Build release
cargo build --release
# Run tests
cargo test
# Verify no dead code with _ prefixes
grep -r "let _" src/ --include="*.rs"
# Verify version is 6.1.0
grep "^version" Cargo.toml | grep "6.1.0"
# Verify no views/triggers/functions in migrations
grep -r "CREATE VIEW\|CREATE TRIGGER\|CREATE FUNCTION" migrations/
```
### Pre-Commit Checklist
1. Version is 6.1.0 in all workspace Cargo.toml files
2. No views, triggers, or functions in migrations
3. All JSON columns use TEXT with `_json` suffix
---
## Output Format
### Shell Script Format
```sh
#!/bin/bash
cat > src/module/file.rs << 'EOF'
use std::io;
pub fn my_function() -> Result<(), io::Error> {
Ok(())
}
EOF
```
### Rules
- Only return MODIFIED files
- Never return unchanged files
- Use `cat > path << 'EOF'` format
- Include complete file content
- No partial snippets
---
## Key Files Reference
```
src/main.rs # Entry point, bootstrap, Axum server
src/lib.rs # Module exports, feature gates
src/core/
bootstrap/mod.rs # Auto-install services
session/mod.rs # Session management
bot/mod.rs # Bot orchestration
config/mod.rs # Configuration management
package_manager/ # Service lifecycle
src/basic/ # BASIC/Rhai interpreter
src/shared/
state.rs # AppState definition
utils.rs # Utility functions
models.rs # Database models
``` ```
--- ---
@ -530,7 +356,7 @@ src/shared/
| Library | Version | Purpose | | Library | Version | Purpose |
|---------|---------|---------| |---------|---------|---------|
| axum | 0.7.5 | Web framework | | axum | 0.7.5 | Web framework |
| diesel | 2.1 | PostgreSQL ORM (NOT sqlx) | | diesel | 2.1 | PostgreSQL ORM |
| tokio | 1.41 | Async runtime | | tokio | 1.41 | Async runtime |
| rhai | git | BASIC scripting | | rhai | git | BASIC scripting |
| reqwest | 0.12 | HTTP client | | reqwest | 0.12 | HTTP client |
@ -541,138 +367,58 @@ src/shared/
## Remember ## Remember
- **Two LLM modes**: Execution (fazer) vs Review (conferir) - **ZERO WARNINGS** - Every clippy warning must be fixed
- **Rust code**: Only in architecture.md documentation - **NO ALLOW IN CODE** - Never use #[allow()] in source files
- **HTMX**: Minimize JS, delegate to server - **CARGO.TOML EXCEPTIONS OK** - Disable lints with false positives in Cargo.toml with comment
- **Local assets**: No CDN, all vendor files local - **NO DEAD CODE** - Delete unused code, never prefix with _
- **Dead code**: Never use _ prefix, implement real code - **NO UNWRAP/EXPECT** - Use ? operator or proper error handling
- **cargo audit**: Must pass with 0 warnings - **NO APPROXIMATE CONSTANTS** - Use std::f64::consts
- **INLINE FORMAT ARGS** - format!("{name}") not format!("{}", name)
- **USE SELF** - In impl blocks, use Self not the type name
- **DERIVE EQ** - Always derive Eq with PartialEq
- **DISPLAY NOT TOSTRING** - Implement Display, not ToString
- **USE DIAGNOSTICS** - Use IDE diagnostics tool, never call cargo clippy directly
- **PASS BY REF** - Don't clone unnecessarily
- **CONST FN** - Make functions const when possible
- **MUST USE** - Add #[must_use] to pure functions
- **diesel**: No sqlx references - **diesel**: No sqlx references
- **Sessions**: Always retrieve by ID when present - **Sessions**: Always retrieve by ID when present
- **Config**: Never hardcode values, use AppConfig - **Config**: Never hardcode values, use AppConfig
- **Bootstrap**: Never suggest manual installation - **Bootstrap**: Never suggest manual installation
- **Warnings**: Target zero warnings before commit - **Version**: Always 6.1.0 - do not change
- **Version**: Always 6.1.0 - do not change without approval - **Migrations**: TABLES AND INDEXES ONLY
- **Migrations**: TABLES AND INDEXES ONLY - no views, triggers, functions - **JSON**: Use TEXT columns with `_json` suffix
- **Stalwart**: Use Stalwart IMAP/JMAP API for email features (sieve, filters, etc.) - **Session Continuation**: When running out of context, create detailed summary: (1) what was done, (2) what remains, (3) specific files and line numbers, (4) exact next steps.
- **JSON**: Use TEXT columns with `_json` suffix, not JSONB
--- ---
## Monitor Keywords (ON EMAIL, ON CHANGE) ## Monitor Keywords (ON EMAIL, ON CHANGE)
These keywords register event-driven monitors similar to `SET SCHEDULER`, but triggered by external events. ### ON EMAIL
### ON EMAIL - Email Monitoring
Triggers a script when an email arrives at the specified address.
```basic ```basic
' Basic usage - trigger on any email to address
ON EMAIL "support@company.com" ON EMAIL "support@company.com"
email = GET LAST "email_received_events" email = GET LAST "email_received_events"
TALK "New email from " + email.from_address + ": " + email.subject TALK "New email from " + email.from_address
END ON
' With FROM filter - only trigger for specific sender
ON EMAIL "orders@company.com" FROM "supplier@vendor.com"
' Process supplier orders
END ON
' With SUBJECT filter - only trigger for matching subjects
ON EMAIL "alerts@company.com" SUBJECT "URGENT"
' Handle urgent alerts
END ON END ON
``` ```
**Database Tables:** ### ON CHANGE
- `email_monitors` - Configuration for email monitoring
- `email_received_events` - Log of received emails to process
**TriggerKind:** `EmailReceived = 5`
### ON CHANGE - Folder Monitoring
Triggers a script when files change in cloud storage folders (GDrive, OneDrive, Dropbox) or local filesystem.
**Uses same `account://` syntax as COPY, MOVE, and other file operations.**
```basic ```basic
' Using account:// syntax (recommended) - auto-detects provider from email ON CHANGE "gdrive://myaccount/folder"
ON CHANGE "account://user@gmail.com/Documents/invoices" files = GET LAST "folder_change_events"
file = GET LAST "folder_change_events" FOR EACH file IN files
TALK "File changed: " + file.file_name + " (" + file.event_type + ")" TALK "File changed: " + file.name
END ON NEXT
' OneDrive via account://
ON CHANGE "account://user@outlook.com/Business/contracts"
' Process OneDrive changes
END ON
' Direct provider syntax (without account)
ON CHANGE "gdrive:///shared/reports"
' Process Google Drive changes (requires USE ACCOUNT first)
END ON
ON CHANGE "onedrive:///documents"
' Process OneDrive changes
END ON
ON CHANGE "dropbox:///team/assets"
' Process Dropbox changes
END ON
' Local filesystem monitoring
ON CHANGE "/var/uploads/incoming"
' Process local filesystem changes
END ON
' With specific event types filter
ON CHANGE "account://user@gmail.com/uploads" EVENTS "create, modify"
' Only trigger on create and modify, ignore delete
END ON
' Watch for deletions only
ON CHANGE "gdrive:///archive" EVENTS "delete"
' Log when files are removed from archive
END ON END ON
``` ```
**Path Syntax:** **TriggerKind Enum:**
- `account://email@domain.com/path` - Uses connected account (auto-detects provider) - Scheduled = 0
- `gdrive:///path` - Google Drive direct - TableUpdate = 1
- `onedrive:///path` - OneDrive direct - TableInsert = 2
- `dropbox:///path` - Dropbox direct - TableDelete = 3
- `/local/path` - Local filesystem - Webhook = 4
- EmailReceived = 5
**Provider Auto-Detection (from email):** - FolderChange = 6
- `@gmail.com`, `@google.com` → Google Drive
- `@outlook.com`, `@hotmail.com`, `@live.com` → OneDrive
- Other emails → Default to Google Drive
**Event Types:**
- `create` - New file created
- `modify` - File content changed
- `delete` - File deleted
- `rename` - File renamed
- `move` - File moved to different folder
**Database Tables:**
- `folder_monitors` - Configuration for folder monitoring
- `folder_change_events` - Log of detected changes to process
**TriggerKind:** `FolderChange = 6`
### TriggerKind Enum Values
```rust
pub enum TriggerKind {
Scheduled = 0, // SET SCHEDULER
TableUpdate = 1, // ON UPDATE OF "table"
TableInsert = 2, // ON INSERT OF "table"
TableDelete = 3, // ON DELETE OF "table"
Webhook = 4, // WEBHOOK
EmailReceived = 5, // ON EMAIL
FolderChange = 6, // ON CHANGE
}
```

View file

@ -186,10 +186,11 @@ DROP TABLE IF EXISTS calendar_shares;
DROP TABLE IF EXISTS calendar_resource_bookings; DROP TABLE IF EXISTS calendar_resource_bookings;
DROP TABLE IF EXISTS calendar_resources; DROP TABLE IF EXISTS calendar_resources;
-- Drop task tables -- Drop task tables (order matters due to foreign keys)
DROP TABLE IF EXISTS task_recurrence; DROP TABLE IF EXISTS task_recurrence;
DROP TABLE IF EXISTS task_time_entries; DROP TABLE IF EXISTS task_time_entries;
DROP TABLE IF EXISTS task_dependencies; DROP TABLE IF EXISTS task_dependencies;
DROP TABLE IF EXISTS tasks;
-- Drop collaboration tables -- Drop collaboration tables
DROP TABLE IF EXISTS document_presence; DROP TABLE IF EXISTS document_presence;

View file

@ -487,11 +487,41 @@ CREATE INDEX IF NOT EXISTS idx_document_presence_doc ON document_presence(docume
-- TASK ENTERPRISE FEATURES -- TASK ENTERPRISE FEATURES
-- ============================================================================ -- ============================================================================
-- Core tasks table
CREATE TABLE IF NOT EXISTS tasks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
title TEXT NOT NULL,
description TEXT,
status TEXT NOT NULL DEFAULT 'todo',
priority TEXT NOT NULL DEFAULT 'medium',
assignee_id UUID REFERENCES users(id) ON DELETE SET NULL,
reporter_id UUID REFERENCES users(id) ON DELETE SET NULL,
project_id UUID,
due_date TIMESTAMPTZ,
tags TEXT[] DEFAULT '{}',
dependencies UUID[] DEFAULT '{}',
estimated_hours FLOAT8,
actual_hours FLOAT8,
progress INT DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
completed_at TIMESTAMPTZ,
CONSTRAINT check_task_status CHECK (status IN ('todo', 'in_progress', 'review', 'blocked', 'on_hold', 'done', 'completed', 'cancelled')),
CONSTRAINT check_task_priority CHECK (priority IN ('low', 'medium', 'high', 'urgent'))
);
CREATE INDEX IF NOT EXISTS idx_tasks_assignee ON tasks(assignee_id);
CREATE INDEX IF NOT EXISTS idx_tasks_reporter ON tasks(reporter_id);
CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks(project_id);
CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status);
CREATE INDEX IF NOT EXISTS idx_tasks_due_date ON tasks(due_date);
CREATE INDEX IF NOT EXISTS idx_tasks_created ON tasks(created_at);
-- Task dependencies -- Task dependencies
CREATE TABLE IF NOT EXISTS task_dependencies ( CREATE TABLE IF NOT EXISTS task_dependencies (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(), id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
task_id UUID NOT NULL, task_id UUID NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
depends_on_task_id UUID NOT NULL, depends_on_task_id UUID NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
dependency_type VARCHAR(20) DEFAULT 'finish_to_start', dependency_type VARCHAR(20) DEFAULT 'finish_to_start',
lag_days INTEGER DEFAULT 0, lag_days INTEGER DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW(), created_at TIMESTAMPTZ DEFAULT NOW(),
@ -505,7 +535,7 @@ CREATE INDEX IF NOT EXISTS idx_task_dependencies_depends ON task_dependencies(de
-- Task time tracking -- Task time tracking
CREATE TABLE IF NOT EXISTS task_time_entries ( CREATE TABLE IF NOT EXISTS task_time_entries (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(), id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
task_id UUID NOT NULL, task_id UUID NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
description TEXT, description TEXT,
started_at TIMESTAMPTZ NOT NULL, started_at TIMESTAMPTZ NOT NULL,
@ -521,7 +551,7 @@ CREATE INDEX IF NOT EXISTS idx_task_time_user ON task_time_entries(user_id, star
-- Task recurring rules -- Task recurring rules
CREATE TABLE IF NOT EXISTS task_recurrence ( CREATE TABLE IF NOT EXISTS task_recurrence (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(), id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
task_template_id UUID NOT NULL, task_template_id UUID NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
recurrence_pattern VARCHAR(20) NOT NULL, recurrence_pattern VARCHAR(20) NOT NULL,
interval_value INTEGER DEFAULT 1, interval_value INTEGER DEFAULT 1,
days_of_week_json TEXT, days_of_week_json TEXT,

View file

@ -65,6 +65,7 @@ use axum::{
routing::{get, post}, routing::{get, post},
Json, Router, Json, Router,
}; };
use botlib::MessageType;
use chrono::Utc; use chrono::Utc;
use diesel::prelude::*; use diesel::prelude::*;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
@ -228,11 +229,13 @@ pub async fn attendant_respond(
user_id: recipient.to_string(), user_id: recipient.to_string(),
channel: "whatsapp".to_string(), channel: "whatsapp".to_string(),
content: request.message.clone(), content: request.message.clone(),
message_type: crate::shared::models::message_types::MessageType::BOT_RESPONSE, message_type: botlib::MessageType::BOT_RESPONSE,
stream_token: None, stream_token: None,
is_complete: true, is_complete: true,
suggestions: vec![], suggestions: vec![],
context_name: None, context_name: None,
context_length: 0,
context_max_length: 0,
}; };
match adapter.send_message(response).await { match adapter.send_message(response).await {
@ -274,11 +277,13 @@ pub async fn attendant_respond(
user_id: session.user_id.to_string(), user_id: session.user_id.to_string(),
channel: channel.to_string(), channel: channel.to_string(),
content: request.message.clone(), content: request.message.clone(),
message_type: crate::shared::models::message_types::MessageType::BOT_RESPONSE, message_type: botlib::MessageType::BOT_RESPONSE,
stream_token: None, stream_token: None,
is_complete: true, is_complete: true,
suggestions: vec![], suggestions: vec![],
context_name: None, context_name: None,
context_length: 0,
context_max_length: 0,
}; };
tx.send(response).await.is_ok() tx.send(response).await.is_ok()
} else { } else {
@ -333,8 +338,11 @@ async fn save_message_to_history(
.values(( .values((
message_history::id.eq(Uuid::new_v4()), message_history::id.eq(Uuid::new_v4()),
message_history::session_id.eq(session_id), message_history::session_id.eq(session_id),
message_history::role.eq(sender_clone), message_history::user_id.eq(session_id),
message_history::content.eq(content_clone), message_history::role.eq(if sender_clone == "user" { 1 } else { 2 }),
message_history::content_encrypted.eq(content_clone),
message_history::message_type.eq(1),
message_history::message_index.eq(0i64),
message_history::created_at.eq(diesel::dsl::now), message_history::created_at.eq(diesel::dsl::now),
)) ))
.execute(&mut db_conn) .execute(&mut db_conn)
@ -626,12 +634,13 @@ async fn handle_attendant_message(
user_id: phone.to_string(), user_id: phone.to_string(),
channel: "whatsapp".to_string(), channel: "whatsapp".to_string(),
content: content.to_string(), content: content.to_string(),
message_type: message_type: botlib::MessageType::BOT_RESPONSE,
crate::shared::models::message_types::MessageType::BOT_RESPONSE,
stream_token: None, stream_token: None,
is_complete: true, is_complete: true,
suggestions: vec![], suggestions: vec![],
context_name: None, context_name: None,
context_length: 0,
context_max_length: 0,
}; };
let _ = adapter.send_message(response).await; let _ = adapter.send_message(response).await;
} }

View file

@ -29,7 +29,7 @@
use log::{trace, warn}; use log::{trace, warn};
use std::collections::HashSet; use std::collections::HashSet;
/// Represents a labeled block of code /// Represents a labeled block of code
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct LabeledBlock { struct LabeledBlock {

View file

@ -16,11 +16,11 @@ use diesel::prelude::*;
use log::{info, trace}; use log::{info, trace};
use rhai::{Dynamic, Engine}; use rhai::{Dynamic, Engine};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc; use std::sync::Arc;
use uuid::Uuid; use uuid::Uuid;
/// Bot trigger types #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TriggerType { pub enum TriggerType {
Keyword, Keyword,
Tool, Tool,
@ -32,29 +32,28 @@ pub enum TriggerType {
impl From<String> for TriggerType { impl From<String> for TriggerType {
fn from(s: String) -> Self { fn from(s: String) -> Self {
match s.to_lowercase().as_str() { match s.to_lowercase().as_str() {
"keyword" => TriggerType::Keyword, "keyword" => Self::Keyword,
"tool" => TriggerType::Tool, "tool" => Self::Tool,
"schedule" => TriggerType::Schedule, "schedule" => Self::Schedule,
"event" => TriggerType::Event, "event" => Self::Event,
"always" => TriggerType::Always, "always" => Self::Always,
_ => TriggerType::Keyword, _ => Self::Keyword,
} }
} }
} }
impl ToString for TriggerType { impl fmt::Display for TriggerType {
fn to_string(&self) -> String { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {
TriggerType::Keyword => "keyword".to_string(), Self::Keyword => write!(f, "keyword"),
TriggerType::Tool => "tool".to_string(), Self::Tool => write!(f, "tool"),
TriggerType::Schedule => "schedule".to_string(), Self::Schedule => write!(f, "schedule"),
TriggerType::Event => "event".to_string(), Self::Event => write!(f, "event"),
TriggerType::Always => "always".to_string(), Self::Always => write!(f, "always"),
} }
} }
} }
/// Bot trigger configuration
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BotTrigger { pub struct BotTrigger {
pub trigger_type: TriggerType, pub trigger_type: TriggerType,
@ -65,6 +64,7 @@ pub struct BotTrigger {
} }
impl BotTrigger { impl BotTrigger {
#[must_use]
pub fn from_keywords(keywords: Vec<String>) -> Self { pub fn from_keywords(keywords: Vec<String>) -> Self {
Self { Self {
trigger_type: TriggerType::Keyword, trigger_type: TriggerType::Keyword,
@ -75,6 +75,7 @@ impl BotTrigger {
} }
} }
#[must_use]
pub fn from_tools(tools: Vec<String>) -> Self { pub fn from_tools(tools: Vec<String>) -> Self {
Self { Self {
trigger_type: TriggerType::Tool, trigger_type: TriggerType::Tool,
@ -85,6 +86,7 @@ impl BotTrigger {
} }
} }
#[must_use]
pub fn from_schedule(cron: String) -> Self { pub fn from_schedule(cron: String) -> Self {
Self { Self {
trigger_type: TriggerType::Schedule, trigger_type: TriggerType::Schedule,
@ -95,6 +97,7 @@ impl BotTrigger {
} }
} }
#[must_use]
pub fn always() -> Self { pub fn always() -> Self {
Self { Self {
trigger_type: TriggerType::Always, trigger_type: TriggerType::Always,
@ -106,7 +109,6 @@ impl BotTrigger {
} }
} }
/// Session bot association
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionBot { pub struct SessionBot {
pub id: Uuid, pub id: Uuid,
@ -118,32 +120,30 @@ pub struct SessionBot {
pub is_active: bool, pub is_active: bool,
} }
/// Register all bot-related keywords pub fn register_bot_keywords(state: &Arc<AppState>, user: &UserSession, engine: &mut Engine) {
pub fn register_bot_keywords(state: Arc<AppState>, user: UserSession, engine: &mut Engine) { if let Err(e) = add_bot_with_trigger_keyword(Arc::clone(state), user.clone(), engine) {
if let Err(e) = add_bot_with_trigger_keyword(state.clone(), user.clone(), engine) { log::error!("Failed to register ADD BOT WITH TRIGGER keyword: {e}");
log::error!("Failed to register ADD BOT WITH TRIGGER keyword: {}", e);
} }
if let Err(e) = add_bot_with_tools_keyword(state.clone(), user.clone(), engine) { if let Err(e) = add_bot_with_tools_keyword(Arc::clone(state), user.clone(), engine) {
log::error!("Failed to register ADD BOT WITH TOOLS keyword: {}", e); log::error!("Failed to register ADD BOT WITH TOOLS keyword: {e}");
} }
if let Err(e) = add_bot_with_schedule_keyword(state.clone(), user.clone(), engine) { if let Err(e) = add_bot_with_schedule_keyword(Arc::clone(state), user.clone(), engine) {
log::error!("Failed to register ADD BOT WITH SCHEDULE keyword: {}", e); log::error!("Failed to register ADD BOT WITH SCHEDULE keyword: {e}");
} }
if let Err(e) = remove_bot_keyword(state.clone(), user.clone(), engine) { if let Err(e) = remove_bot_keyword(Arc::clone(state), user.clone(), engine) {
log::error!("Failed to register REMOVE BOT keyword: {}", e); log::error!("Failed to register REMOVE BOT keyword: {e}");
} }
if let Err(e) = list_bots_keyword(state.clone(), user.clone(), engine) { if let Err(e) = list_bots_keyword(Arc::clone(state), user.clone(), engine) {
log::error!("Failed to register LIST BOTS keyword: {}", e); log::error!("Failed to register LIST BOTS keyword: {e}");
} }
if let Err(e) = set_bot_priority_keyword(state.clone(), user.clone(), engine) { if let Err(e) = set_bot_priority_keyword(Arc::clone(state), user.clone(), engine) {
log::error!("Failed to register SET BOT PRIORITY keyword: {}", e); log::error!("Failed to register SET BOT PRIORITY keyword: {e}");
} }
if let Err(e) = delegate_to_keyword(state.clone(), user.clone(), engine) { if let Err(e) = delegate_to_keyword(Arc::clone(state), user.clone(), engine) {
log::error!("Failed to register DELEGATE TO keyword: {}", e); log::error!("Failed to register DELEGATE TO keyword: {e}");
} }
} }
/// ADD BOT "name" WITH TRIGGER "keywords"
fn add_bot_with_trigger_keyword( fn add_bot_with_trigger_keyword(
state: Arc<AppState>, state: Arc<AppState>,
user: UserSession, user: UserSession,
@ -168,9 +168,7 @@ fn add_bot_with_trigger_keyword(
.to_string(); .to_string();
trace!( trace!(
"ADD BOT '{}' WITH TRIGGER '{}' for session: {}", "ADD BOT '{bot_name}' WITH TRIGGER '{trigger_str}' for session: {}",
bot_name,
trigger_str,
user_clone.id user_clone.id
); );
@ -184,21 +182,20 @@ fn add_bot_with_trigger_keyword(
let state_for_task = Arc::clone(&state_clone); let state_for_task = Arc::clone(&state_clone);
let session_id = user_clone.id; let session_id = user_clone.id;
let bot_id = user_clone.bot_id; let bot_id = user_clone.bot_id;
let bot_name_clone = bot_name.clone();
let (tx, rx) = std::sync::mpsc::channel(); let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || { std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); let rt = match tokio::runtime::Runtime::new() {
Ok(rt) => rt,
Err(e) => {
let _ = tx.send(Err(format!("Failed to create runtime: {e}")));
return;
}
};
let result = rt.block_on(async { let result = rt.block_on(async {
add_bot_to_session( add_bot_to_session(&state_for_task, session_id, bot_id, &bot_name, trigger)
&state_for_task, .await
session_id,
bot_id,
&bot_name_clone,
trigger,
)
.await
}); });
let _ = tx.send(result); let _ = tx.send(result);
}); });
@ -219,7 +216,6 @@ fn add_bot_with_trigger_keyword(
Ok(()) Ok(())
} }
/// ADD BOT "name" WITH TOOLS "tool1, tool2"
fn add_bot_with_tools_keyword( fn add_bot_with_tools_keyword(
state: Arc<AppState>, state: Arc<AppState>,
user: UserSession, user: UserSession,
@ -244,9 +240,7 @@ fn add_bot_with_tools_keyword(
.to_string(); .to_string();
trace!( trace!(
"ADD BOT '{}' WITH TOOLS '{}' for session: {}", "ADD BOT '{bot_name}' WITH TOOLS '{tools_str}' for session: {}",
bot_name,
tools_str,
user_clone.id user_clone.id
); );
@ -260,21 +254,20 @@ fn add_bot_with_tools_keyword(
let state_for_task = Arc::clone(&state_clone); let state_for_task = Arc::clone(&state_clone);
let session_id = user_clone.id; let session_id = user_clone.id;
let bot_id = user_clone.bot_id; let bot_id = user_clone.bot_id;
let bot_name_clone = bot_name.clone();
let (tx, rx) = std::sync::mpsc::channel(); let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || { std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); let rt = match tokio::runtime::Runtime::new() {
Ok(rt) => rt,
Err(e) => {
let _ = tx.send(Err(format!("Failed to create runtime: {e}")));
return;
}
};
let result = rt.block_on(async { let result = rt.block_on(async {
add_bot_to_session( add_bot_to_session(&state_for_task, session_id, bot_id, &bot_name, trigger)
&state_for_task, .await
session_id,
bot_id,
&bot_name_clone,
trigger,
)
.await
}); });
let _ = tx.send(result); let _ = tx.send(result);
}); });
@ -295,7 +288,6 @@ fn add_bot_with_tools_keyword(
Ok(()) Ok(())
} }
/// ADD BOT "name" WITH SCHEDULE "cron"
fn add_bot_with_schedule_keyword( fn add_bot_with_schedule_keyword(
state: Arc<AppState>, state: Arc<AppState>,
user: UserSession, user: UserSession,
@ -320,9 +312,7 @@ fn add_bot_with_schedule_keyword(
.to_string(); .to_string();
trace!( trace!(
"ADD BOT '{}' WITH SCHEDULE '{}' for session: {}", "ADD BOT '{bot_name}' WITH SCHEDULE '{schedule}' for session: {}",
bot_name,
schedule,
user_clone.id user_clone.id
); );
@ -330,21 +320,20 @@ fn add_bot_with_schedule_keyword(
let state_for_task = Arc::clone(&state_clone); let state_for_task = Arc::clone(&state_clone);
let session_id = user_clone.id; let session_id = user_clone.id;
let bot_id = user_clone.bot_id; let bot_id = user_clone.bot_id;
let bot_name_clone = bot_name.clone();
let (tx, rx) = std::sync::mpsc::channel(); let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || { std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); let rt = match tokio::runtime::Runtime::new() {
Ok(rt) => rt,
Err(e) => {
let _ = tx.send(Err(format!("Failed to create runtime: {e}")));
return;
}
};
let result = rt.block_on(async { let result = rt.block_on(async {
add_bot_to_session( add_bot_to_session(&state_for_task, session_id, bot_id, &bot_name, trigger)
&state_for_task, .await
session_id,
bot_id,
&bot_name_clone,
trigger,
)
.await
}); });
let _ = tx.send(result); let _ = tx.send(result);
}); });
@ -365,7 +354,6 @@ fn add_bot_with_schedule_keyword(
Ok(()) Ok(())
} }
/// REMOVE BOT "name"
fn remove_bot_keyword( fn remove_bot_keyword(
state: Arc<AppState>, state: Arc<AppState>,
user: UserSession, user: UserSession,
@ -384,7 +372,7 @@ fn remove_bot_keyword(
.trim_matches('"') .trim_matches('"')
.to_string(); .to_string();
trace!("REMOVE BOT '{}' from session: {}", bot_name, user_clone.id); trace!("REMOVE BOT '{bot_name}' from session: {}", user_clone.id);
let state_for_task = Arc::clone(&state_clone); let state_for_task = Arc::clone(&state_clone);
let session_id = user_clone.id; let session_id = user_clone.id;
@ -392,7 +380,13 @@ fn remove_bot_keyword(
let (tx, rx) = std::sync::mpsc::channel(); let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || { std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); let rt = match tokio::runtime::Runtime::new() {
Ok(rt) => rt,
Err(e) => {
let _ = tx.send(Err(format!("Failed to create runtime: {e}")));
return;
}
};
let result = rt.block_on(async { let result = rt.block_on(async {
remove_bot_from_session(&state_for_task, session_id, &bot_name).await remove_bot_from_session(&state_for_task, session_id, &bot_name).await
}); });
@ -415,7 +409,6 @@ fn remove_bot_keyword(
Ok(()) Ok(())
} }
/// LIST BOTS
fn list_bots_keyword( fn list_bots_keyword(
state: Arc<AppState>, state: Arc<AppState>,
user: UserSession, user: UserSession,
@ -433,14 +426,19 @@ fn list_bots_keyword(
let (tx, rx) = std::sync::mpsc::channel(); let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || { std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); let rt = match tokio::runtime::Runtime::new() {
Ok(rt) => rt,
Err(e) => {
let _ = tx.send(Err(format!("Failed to create runtime: {e}")));
return;
}
};
let result = rt.block_on(async { get_session_bots(&state_for_task, session_id).await }); let result = rt.block_on(async { get_session_bots(&state_for_task, session_id).await });
let _ = tx.send(result); let _ = tx.send(result);
}); });
match rx.recv_timeout(std::time::Duration::from_secs(30)) { match rx.recv_timeout(std::time::Duration::from_secs(30)) {
Ok(Ok(bots)) => { Ok(Ok(bots)) => {
// Convert to Dynamic array
let bot_list: Vec<Dynamic> = bots let bot_list: Vec<Dynamic> = bots
.into_iter() .into_iter()
.map(|b| { .map(|b| {
@ -470,7 +468,6 @@ fn list_bots_keyword(
Ok(()) Ok(())
} }
/// SET BOT PRIORITY "name", priority
fn set_bot_priority_keyword( fn set_bot_priority_keyword(
state: Arc<AppState>, state: Arc<AppState>,
user: UserSession, user: UserSession,
@ -488,15 +485,14 @@ fn set_bot_priority_keyword(
.to_string() .to_string()
.trim_matches('"') .trim_matches('"')
.to_string(); .to_string();
#[allow(clippy::cast_possible_truncation)]
let priority = context let priority = context
.eval_expression_tree(&inputs[1])? .eval_expression_tree(&inputs[1])?
.as_int() .as_int()
.unwrap_or(0) as i32; .unwrap_or(0) as i32;
trace!( trace!(
"SET BOT PRIORITY '{}' to {} for session: {}", "SET BOT PRIORITY '{bot_name}' to {priority} for session: {}",
bot_name,
priority,
user_clone.id user_clone.id
); );
@ -506,7 +502,13 @@ fn set_bot_priority_keyword(
let (tx, rx) = std::sync::mpsc::channel(); let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || { std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); let rt = match tokio::runtime::Runtime::new() {
Ok(rt) => rt,
Err(e) => {
let _ = tx.send(Err(format!("Failed to create runtime: {e}")));
return;
}
};
let result = rt.block_on(async { let result = rt.block_on(async {
set_bot_priority(&state_for_task, session_id, &bot_name, priority).await set_bot_priority(&state_for_task, session_id, &bot_name, priority).await
}); });
@ -529,7 +531,6 @@ fn set_bot_priority_keyword(
Ok(()) Ok(())
} }
/// DELEGATE TO "bot" WITH CONTEXT
fn delegate_to_keyword( fn delegate_to_keyword(
state: Arc<AppState>, state: Arc<AppState>,
user: UserSession, user: UserSession,
@ -548,7 +549,7 @@ fn delegate_to_keyword(
.trim_matches('"') .trim_matches('"')
.to_string(); .to_string();
trace!("DELEGATE TO '{}' for session: {}", bot_name, user_clone.id); trace!("DELEGATE TO '{bot_name}' for session: {}", user_clone.id);
let state_for_task = Arc::clone(&state_clone); let state_for_task = Arc::clone(&state_clone);
let session_id = user_clone.id; let session_id = user_clone.id;
@ -556,7 +557,13 @@ fn delegate_to_keyword(
let (tx, rx) = std::sync::mpsc::channel(); let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || { std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); let rt = match tokio::runtime::Runtime::new() {
Ok(rt) => rt,
Err(e) => {
let _ = tx.send(Err(format!("Failed to create runtime: {e}")));
return;
}
};
let result = rt.block_on(async { let result = rt.block_on(async {
delegate_to_bot(&state_for_task, session_id, &bot_name).await delegate_to_bot(&state_for_task, session_id, &bot_name).await
}); });
@ -579,9 +586,6 @@ fn delegate_to_keyword(
Ok(()) Ok(())
} }
// Database Operations
/// Add a bot to the session
async fn add_bot_to_session( async fn add_bot_to_session(
state: &AppState, state: &AppState,
session_id: Uuid, session_id: Uuid,
@ -589,9 +593,8 @@ async fn add_bot_to_session(
bot_name: &str, bot_name: &str,
trigger: BotTrigger, trigger: BotTrigger,
) -> Result<String, String> { ) -> Result<String, String> {
let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?; let mut conn = state.conn.get().map_err(|e| format!("DB error: {e}"))?;
// Check if bot exists
let bot_exists: bool = diesel::sql_query( let bot_exists: bool = diesel::sql_query(
"SELECT EXISTS(SELECT 1 FROM bots WHERE name = $1 AND is_active = true) as exists", "SELECT EXISTS(SELECT 1 FROM bots WHERE name = $1 AND is_active = true) as exists",
) )
@ -600,15 +603,13 @@ async fn add_bot_to_session(
.map(|r| r.exists) .map(|r| r.exists)
.unwrap_or(false); .unwrap_or(false);
// If bot doesn't exist, try to find it in templates or create a placeholder
let bot_id: String = if bot_exists { let bot_id: String = if bot_exists {
diesel::sql_query("SELECT id FROM bots WHERE name = $1 AND is_active = true") diesel::sql_query("SELECT id FROM bots WHERE name = $1 AND is_active = true")
.bind::<diesel::sql_types::Text, _>(bot_name) .bind::<diesel::sql_types::Text, _>(bot_name)
.get_result::<UuidResult>(&mut *conn) .get_result::<UuidResult>(&mut *conn)
.map(|r| r.id) .map(|r| r.id)
.map_err(|e| format!("Failed to get bot ID: {}", e))? .map_err(|e| format!("Failed to get bot ID: {e}"))?
} else { } else {
// Create a new bot entry
let new_bot_id = Uuid::new_v4(); let new_bot_id = Uuid::new_v4();
diesel::sql_query( diesel::sql_query(
"INSERT INTO bots (id, name, description, is_active, created_at) "INSERT INTO bots (id, name, description, is_active, created_at)
@ -618,18 +619,16 @@ async fn add_bot_to_session(
) )
.bind::<diesel::sql_types::Text, _>(new_bot_id.to_string()) .bind::<diesel::sql_types::Text, _>(new_bot_id.to_string())
.bind::<diesel::sql_types::Text, _>(bot_name) .bind::<diesel::sql_types::Text, _>(bot_name)
.bind::<diesel::sql_types::Text, _>(format!("Bot agent: {}", bot_name)) .bind::<diesel::sql_types::Text, _>(format!("Bot agent: {bot_name}"))
.execute(&mut *conn) .execute(&mut *conn)
.map_err(|e| format!("Failed to create bot: {}", e))?; .map_err(|e| format!("Failed to create bot: {e}"))?;
new_bot_id.to_string() new_bot_id.to_string()
}; };
// Serialize trigger to JSON let trigger_json =
let trigger_json = serde_json::to_string(&trigger) serde_json::to_string(&trigger).map_err(|e| format!("Failed to serialize trigger: {e}"))?;
.map_err(|e| format!("Failed to serialize trigger: {}", e))?;
// Add bot to session
let association_id = Uuid::new_v4(); let association_id = Uuid::new_v4();
diesel::sql_query( diesel::sql_query(
"INSERT INTO session_bots (id, session_id, bot_id, bot_name, trigger_config, priority, is_active, joined_at) "INSERT INTO session_bots (id, session_id, bot_id, bot_name, trigger_config, priority, is_active, joined_at)
@ -639,27 +638,26 @@ async fn add_bot_to_session(
) )
.bind::<diesel::sql_types::Text, _>(association_id.to_string()) .bind::<diesel::sql_types::Text, _>(association_id.to_string())
.bind::<diesel::sql_types::Text, _>(session_id.to_string()) .bind::<diesel::sql_types::Text, _>(session_id.to_string())
.bind::<diesel::sql_types::Text, _>(bot_id.to_string()) .bind::<diesel::sql_types::Text, _>(bot_id)
.bind::<diesel::sql_types::Text, _>(bot_name) .bind::<diesel::sql_types::Text, _>(bot_name)
.bind::<diesel::sql_types::Text, _>(&trigger_json) .bind::<diesel::sql_types::Text, _>(&trigger_json)
.execute(&mut *conn) .execute(&mut *conn)
.map_err(|e| format!("Failed to add bot to session: {}", e))?; .map_err(|e| format!("Failed to add bot to session: {e}"))?;
info!( info!(
"Bot '{}' added to session {} with trigger type: {:?}", "Bot '{bot_name}' added to session {session_id} with trigger type: {:?}",
bot_name, session_id, trigger.trigger_type trigger.trigger_type
); );
Ok(format!("Bot '{}' added to conversation", bot_name)) Ok(format!("Bot '{bot_name}' added to conversation"))
} }
/// Remove a bot from the session
async fn remove_bot_from_session( async fn remove_bot_from_session(
state: &AppState, state: &AppState,
session_id: Uuid, session_id: Uuid,
bot_name: &str, bot_name: &str,
) -> Result<String, String> { ) -> Result<String, String> {
let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?; let mut conn = state.conn.get().map_err(|e| format!("DB error: {e}"))?;
let affected = diesel::sql_query( let affected = diesel::sql_query(
"UPDATE session_bots SET is_active = false WHERE session_id = $1 AND bot_name = $2", "UPDATE session_bots SET is_active = false WHERE session_id = $1 AND bot_name = $2",
@ -667,19 +665,18 @@ async fn remove_bot_from_session(
.bind::<diesel::sql_types::Text, _>(session_id.to_string()) .bind::<diesel::sql_types::Text, _>(session_id.to_string())
.bind::<diesel::sql_types::Text, _>(bot_name) .bind::<diesel::sql_types::Text, _>(bot_name)
.execute(&mut *conn) .execute(&mut *conn)
.map_err(|e| format!("Failed to remove bot: {}", e))?; .map_err(|e| format!("Failed to remove bot: {e}"))?;
if affected > 0 { if affected > 0 {
info!("Bot '{}' removed from session {}", bot_name, session_id); info!("Bot '{bot_name}' removed from session {session_id}");
Ok(format!("Bot '{}' removed from conversation", bot_name)) Ok(format!("Bot '{bot_name}' removed from conversation"))
} else { } else {
Ok(format!("Bot '{}' was not in the conversation", bot_name)) Ok(format!("Bot '{bot_name}' was not in the conversation"))
} }
} }
/// Get all bots in a session
async fn get_session_bots(state: &AppState, session_id: Uuid) -> Result<Vec<SessionBot>, String> { async fn get_session_bots(state: &AppState, session_id: Uuid) -> Result<Vec<SessionBot>, String> {
let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?; let mut conn = state.conn.get().map_err(|e| format!("DB error: {e}"))?;
let results: Vec<SessionBotRow> = diesel::sql_query( let results: Vec<SessionBotRow> = diesel::sql_query(
"SELECT id, session_id, bot_id, bot_name, trigger_config, priority, is_active "SELECT id, session_id, bot_id, bot_name, trigger_config, priority, is_active
@ -689,13 +686,13 @@ async fn get_session_bots(state: &AppState, session_id: Uuid) -> Result<Vec<Sess
) )
.bind::<diesel::sql_types::Text, _>(session_id.to_string()) .bind::<diesel::sql_types::Text, _>(session_id.to_string())
.load(&mut *conn) .load(&mut *conn)
.map_err(|e| format!("Failed to get session bots: {}", e))?; .map_err(|e| format!("Failed to get session bots: {e}"))?;
let bots = results let bots = results
.into_iter() .into_iter()
.filter_map(|row| { .filter_map(|row| {
let trigger: BotTrigger = let trigger: BotTrigger =
serde_json::from_str(&row.trigger_config).unwrap_or(BotTrigger::always()); serde_json::from_str(&row.trigger_config).unwrap_or_else(|_| BotTrigger::always());
Some(SessionBot { Some(SessionBot {
id: Uuid::parse_str(&row.id).ok()?, id: Uuid::parse_str(&row.id).ok()?,
session_id: Uuid::parse_str(&row.session_id).ok()?, session_id: Uuid::parse_str(&row.session_id).ok()?,
@ -711,14 +708,13 @@ async fn get_session_bots(state: &AppState, session_id: Uuid) -> Result<Vec<Sess
Ok(bots) Ok(bots)
} }
/// Set bot priority in session
async fn set_bot_priority( async fn set_bot_priority(
state: &AppState, state: &AppState,
session_id: Uuid, session_id: Uuid,
bot_name: &str, bot_name: &str,
priority: i32, priority: i32,
) -> Result<String, String> { ) -> Result<String, String> {
let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?; let mut conn = state.conn.get().map_err(|e| format!("DB error: {e}"))?;
diesel::sql_query( diesel::sql_query(
"UPDATE session_bots SET priority = $1 WHERE session_id = $2 AND bot_name = $3", "UPDATE session_bots SET priority = $1 WHERE session_id = $2 AND bot_name = $3",
@ -727,19 +723,17 @@ async fn set_bot_priority(
.bind::<diesel::sql_types::Text, _>(session_id.to_string()) .bind::<diesel::sql_types::Text, _>(session_id.to_string())
.bind::<diesel::sql_types::Text, _>(bot_name) .bind::<diesel::sql_types::Text, _>(bot_name)
.execute(&mut *conn) .execute(&mut *conn)
.map_err(|e| format!("Failed to set priority: {}", e))?; .map_err(|e| format!("Failed to set priority: {e}"))?;
Ok(format!("Bot '{}' priority set to {}", bot_name, priority)) Ok(format!("Bot '{bot_name}' priority set to {priority}"))
} }
/// Delegate current conversation to another bot
async fn delegate_to_bot( async fn delegate_to_bot(
state: &AppState, state: &AppState,
session_id: Uuid, session_id: Uuid,
bot_name: &str, bot_name: &str,
) -> Result<String, String> { ) -> Result<String, String> {
// Get the bot's configuration let mut conn = state.conn.get().map_err(|e| format!("DB error: {e}"))?;
let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?;
let bot_config: Option<BotConfigRow> = diesel::sql_query( let bot_config: Option<BotConfigRow> = diesel::sql_query(
"SELECT id, name, system_prompt, model_config FROM bots WHERE name = $1 AND is_active = true", "SELECT id, name, system_prompt, model_config FROM bots WHERE name = $1 AND is_active = true",
@ -748,12 +742,10 @@ async fn delegate_to_bot(
.get_result(&mut *conn) .get_result(&mut *conn)
.ok(); .ok();
let config = match bot_config { let Some(config) = bot_config else {
Some(cfg) => cfg, return Err(format!("Bot '{bot_name}' not found"));
None => return Err(format!("Bot '{}' not found", bot_name)),
}; };
// Log delegation details for debugging
trace!( trace!(
"Delegating to bot: id={}, name={}, has_system_prompt={}, has_model_config={}", "Delegating to bot: id={}, name={}, has_system_prompt={}, has_model_config={}",
config.id, config.id,
@ -762,30 +754,27 @@ async fn delegate_to_bot(
config.model_config.is_some() config.model_config.is_some()
); );
// Mark delegation in session with bot ID for proper tracking
diesel::sql_query("UPDATE sessions SET delegated_to = $1, delegated_at = NOW() WHERE id = $2") diesel::sql_query("UPDATE sessions SET delegated_to = $1, delegated_at = NOW() WHERE id = $2")
.bind::<diesel::sql_types::Text, _>(&config.id) .bind::<diesel::sql_types::Text, _>(&config.id)
.bind::<diesel::sql_types::Text, _>(session_id.to_string()) .bind::<diesel::sql_types::Text, _>(session_id.to_string())
.execute(&mut *conn) .execute(&mut *conn)
.map_err(|e| format!("Failed to delegate: {}", e))?; .map_err(|e| format!("Failed to delegate: {e}"))?;
// Build response message with bot info let response = config.system_prompt.as_ref().map_or_else(
let response = if let Some(ref prompt) = config.system_prompt { || format!("Conversation delegated to '{}'", config.name),
format!( |prompt| {
"Conversation delegated to '{}' (specialized: {})", format!(
config.name, "Conversation delegated to '{}' (specialized: {})",
prompt.chars().take(50).collect::<String>() config.name,
) prompt.chars().take(50).collect::<String>()
} else { )
format!("Conversation delegated to '{}'", config.name) },
}; );
Ok(response) Ok(response)
} }
// Multi-Agent Message Processing #[must_use]
/// Check if a message matches any bot triggers
pub fn match_bot_triggers(message: &str, bots: &[SessionBot]) -> Vec<SessionBot> { pub fn match_bot_triggers(message: &str, bots: &[SessionBot]) -> Vec<SessionBot> {
let message_lower = message.to_lowercase(); let message_lower = message.to_lowercase();
let mut matching_bots = Vec::new(); let mut matching_bots = Vec::new();
@ -796,27 +785,12 @@ pub fn match_bot_triggers(message: &str, bots: &[SessionBot]) -> Vec<SessionBot>
} }
let matches = match bot.trigger.trigger_type { let matches = match bot.trigger.trigger_type {
TriggerType::Keyword => { TriggerType::Keyword => bot.trigger.keywords.as_ref().map_or(false, |keywords| {
if let Some(keywords) = &bot.trigger.keywords { keywords
keywords .iter()
.iter() .any(|kw| message_lower.contains(&kw.to_lowercase()))
.any(|kw| message_lower.contains(&kw.to_lowercase())) }),
} else { TriggerType::Tool | TriggerType::Schedule | TriggerType::Event => false,
false
}
}
TriggerType::Tool => {
// Tool triggers are checked separately when tools are invoked
false
}
TriggerType::Schedule => {
// Schedule triggers are checked by the scheduler
false
}
TriggerType::Event => {
// Event triggers are checked when events occur
false
}
TriggerType::Always => true, TriggerType::Always => true,
}; };
@ -825,12 +799,11 @@ pub fn match_bot_triggers(message: &str, bots: &[SessionBot]) -> Vec<SessionBot>
} }
} }
// Sort by priority (higher first)
matching_bots.sort_by(|a, b| b.priority.cmp(&a.priority)); matching_bots.sort_by(|a, b| b.priority.cmp(&a.priority));
matching_bots matching_bots
} }
/// Check if a tool invocation matches any bot triggers #[must_use]
pub fn match_tool_triggers(tool_name: &str, bots: &[SessionBot]) -> Vec<SessionBot> { pub fn match_tool_triggers(tool_name: &str, bots: &[SessionBot]) -> Vec<SessionBot> {
let tool_upper = tool_name.to_uppercase(); let tool_upper = tool_name.to_uppercase();
let mut matching_bots = Vec::new(); let mut matching_bots = Vec::new();
@ -853,8 +826,6 @@ pub fn match_tool_triggers(tool_name: &str, bots: &[SessionBot]) -> Vec<SessionB
matching_bots matching_bots
} }
// Helper Types for Diesel Queries
#[derive(QueryableByName)] #[derive(QueryableByName)]
struct BoolResult { struct BoolResult {
#[diesel(sql_type = diesel::sql_types::Bool)] #[diesel(sql_type = diesel::sql_types::Bool)]
@ -943,7 +914,7 @@ mod tests {
assert_eq!(matches[0].bot_name, "hr-bot"); assert_eq!(matches[0].bot_name, "hr-bot");
let matches = match_bot_triggers("Hello world", &bots); let matches = match_bot_triggers("Hello world", &bots);
assert_eq!(matches.len(), 0); assert!(matches.is_empty());
} }
#[test] #[test]
@ -962,6 +933,6 @@ mod tests {
assert_eq!(matches.len(), 1); assert_eq!(matches.len(), 1);
let matches = match_tool_triggers("SEND", &bots); let matches = match_tool_triggers("SEND", &bots);
assert_eq!(matches.len(), 0); assert!(matches.is_empty());
} }
} }

View file

@ -1,35 +1,18 @@
//! CONTAINS function for checking array membership
//!
//! BASIC Syntax:
//! result = CONTAINS(array, value)
//!
//! Returns TRUE if the value exists in the array, FALSE otherwise.
//!
//! Examples:
//! names = ["Alice", "Bob", "Charlie"]
//! IF CONTAINS(names, "Bob") THEN
//! TALK "Found Bob!"
//! END IF
use crate::shared::models::UserSession; use crate::shared::models::UserSession;
use crate::shared::state::AppState; use crate::shared::state::AppState;
use log::debug; use log::debug;
use rhai::{Array, Dynamic, Engine}; use rhai::{Array, Dynamic, Engine};
use std::sync::Arc; use std::sync::Arc;
/// Registers the CONTAINS function for array membership checking
pub fn contains_keyword(_state: &Arc<AppState>, _user: UserSession, engine: &mut Engine) { pub fn contains_keyword(_state: &Arc<AppState>, _user: UserSession, engine: &mut Engine) {
// CONTAINS - uppercase version
engine.register_fn("CONTAINS", |arr: Array, value: Dynamic| -> bool { engine.register_fn("CONTAINS", |arr: Array, value: Dynamic| -> bool {
array_contains(&arr, &value) array_contains(&arr, &value)
}); });
// contains - lowercase version
engine.register_fn("contains", |arr: Array, value: Dynamic| -> bool { engine.register_fn("contains", |arr: Array, value: Dynamic| -> bool {
array_contains(&arr, &value) array_contains(&arr, &value)
}); });
// IN_ARRAY - alternative name (PHP style)
engine.register_fn("IN_ARRAY", |value: Dynamic, arr: Array| -> bool { engine.register_fn("IN_ARRAY", |value: Dynamic, arr: Array| -> bool {
array_contains(&arr, &value) array_contains(&arr, &value)
}); });
@ -38,7 +21,6 @@ pub fn contains_keyword(_state: &Arc<AppState>, _user: UserSession, engine: &mut
array_contains(&arr, &value) array_contains(&arr, &value)
}); });
// INCLUDES - JavaScript style
engine.register_fn("INCLUDES", |arr: Array, value: Dynamic| -> bool { engine.register_fn("INCLUDES", |arr: Array, value: Dynamic| -> bool {
array_contains(&arr, &value) array_contains(&arr, &value)
}); });
@ -47,7 +29,6 @@ pub fn contains_keyword(_state: &Arc<AppState>, _user: UserSession, engine: &mut
array_contains(&arr, &value) array_contains(&arr, &value)
}); });
// HAS - short form
engine.register_fn("HAS", |arr: Array, value: Dynamic| -> bool { engine.register_fn("HAS", |arr: Array, value: Dynamic| -> bool {
array_contains(&arr, &value) array_contains(&arr, &value)
}); });
@ -59,17 +40,14 @@ pub fn contains_keyword(_state: &Arc<AppState>, _user: UserSession, engine: &mut
debug!("Registered CONTAINS keyword"); debug!("Registered CONTAINS keyword");
} }
/// Helper function to check if an array contains a value
fn array_contains(arr: &Array, value: &Dynamic) -> bool { fn array_contains(arr: &Array, value: &Dynamic) -> bool {
let search_str = value.to_string(); let search_str = value.to_string();
for item in arr { for item in arr {
// Try exact type match first
if items_equal(item, value) { if items_equal(item, value) {
return true; return true;
} }
// Fall back to string comparison
if item.to_string() == search_str { if item.to_string() == search_str {
return true; return true;
} }
@ -78,22 +56,19 @@ fn array_contains(arr: &Array, value: &Dynamic) -> bool {
false false
} }
/// Helper function to compare two Dynamic values
fn items_equal(a: &Dynamic, b: &Dynamic) -> bool { fn items_equal(a: &Dynamic, b: &Dynamic) -> bool {
// Both integers
if a.is_int() && b.is_int() { if a.is_int() && b.is_int() {
return a.as_int().unwrap_or(0) == b.as_int().unwrap_or(1); return a.as_int().unwrap_or(0) == b.as_int().unwrap_or(1);
} }
// Both floats
if a.is_float() && b.is_float() { if a.is_float() && b.is_float() {
let af = a.as_float().unwrap_or(0.0); let af = a.as_float().unwrap_or(0.0);
let bf = b.as_float().unwrap_or(1.0); let bf = b.as_float().unwrap_or(1.0);
return (af - bf).abs() < f64::EPSILON; return (af - bf).abs() < f64::EPSILON;
} }
// Int and float comparison
if a.is_int() && b.is_float() { if a.is_int() && b.is_float() {
#[allow(clippy::cast_precision_loss)]
let af = a.as_int().unwrap_or(0) as f64; let af = a.as_int().unwrap_or(0) as f64;
let bf = b.as_float().unwrap_or(1.0); let bf = b.as_float().unwrap_or(1.0);
return (af - bf).abs() < f64::EPSILON; return (af - bf).abs() < f64::EPSILON;
@ -101,16 +76,15 @@ fn items_equal(a: &Dynamic, b: &Dynamic) -> bool {
if a.is_float() && b.is_int() { if a.is_float() && b.is_int() {
let af = a.as_float().unwrap_or(0.0); let af = a.as_float().unwrap_or(0.0);
#[allow(clippy::cast_precision_loss)]
let bf = b.as_int().unwrap_or(1) as f64; let bf = b.as_int().unwrap_or(1) as f64;
return (af - bf).abs() < f64::EPSILON; return (af - bf).abs() < f64::EPSILON;
} }
// Both booleans
if a.is_bool() && b.is_bool() { if a.is_bool() && b.is_bool() {
return a.as_bool().unwrap_or(false) == b.as_bool().unwrap_or(true); return a.as_bool().unwrap_or(false) == b.as_bool().unwrap_or(true);
} }
// Both strings
if a.is_string() && b.is_string() { if a.is_string() && b.is_string() {
return a.clone().into_string().unwrap_or_default() return a.clone().into_string().unwrap_or_default()
== b.clone().into_string().unwrap_or_default(); == b.clone().into_string().unwrap_or_default();
@ -125,10 +99,11 @@ mod tests {
#[test] #[test]
fn test_contains_string() { fn test_contains_string() {
let mut arr = Array::new(); let arr: Array = vec![
arr.push(Dynamic::from("Alice")); Dynamic::from("Alice"),
arr.push(Dynamic::from("Bob")); Dynamic::from("Bob"),
arr.push(Dynamic::from("Charlie")); Dynamic::from("Charlie"),
];
assert!(array_contains(&arr, &Dynamic::from("Bob"))); assert!(array_contains(&arr, &Dynamic::from("Bob")));
assert!(!array_contains(&arr, &Dynamic::from("David"))); assert!(!array_contains(&arr, &Dynamic::from("David")));
@ -136,10 +111,11 @@ mod tests {
#[test] #[test]
fn test_contains_integer() { fn test_contains_integer() {
let mut arr = Array::new(); let arr: Array = vec![
arr.push(Dynamic::from(1_i64)); Dynamic::from(1_i64),
arr.push(Dynamic::from(2_i64)); Dynamic::from(2_i64),
arr.push(Dynamic::from(3_i64)); Dynamic::from(3_i64),
];
assert!(array_contains(&arr, &Dynamic::from(2_i64))); assert!(array_contains(&arr, &Dynamic::from(2_i64)));
assert!(!array_contains(&arr, &Dynamic::from(5_i64))); assert!(!array_contains(&arr, &Dynamic::from(5_i64)));
@ -147,10 +123,11 @@ mod tests {
#[test] #[test]
fn test_contains_float() { fn test_contains_float() {
let mut arr = Array::new(); let arr: Array = vec![
arr.push(Dynamic::from(1.5_f64)); Dynamic::from(1.5_f64),
arr.push(Dynamic::from(2.5_f64)); Dynamic::from(2.5_f64),
arr.push(Dynamic::from(3.5_f64)); Dynamic::from(3.5_f64),
];
assert!(array_contains(&arr, &Dynamic::from(2.5_f64))); assert!(array_contains(&arr, &Dynamic::from(2.5_f64)));
assert!(!array_contains(&arr, &Dynamic::from(4.5_f64))); assert!(!array_contains(&arr, &Dynamic::from(4.5_f64)));
@ -158,9 +135,7 @@ mod tests {
#[test] #[test]
fn test_contains_bool() { fn test_contains_bool() {
let mut arr = Array::new(); let arr: Array = vec![Dynamic::from(true), Dynamic::from(false)];
arr.push(Dynamic::from(true));
arr.push(Dynamic::from(false));
assert!(array_contains(&arr, &Dynamic::from(true))); assert!(array_contains(&arr, &Dynamic::from(true)));
assert!(array_contains(&arr, &Dynamic::from(false))); assert!(array_contains(&arr, &Dynamic::from(false)));

View file

@ -1,20 +1,10 @@
//! PUSH and POP array manipulation functions
//!
//! PUSH - Add element(s) to the end of an array
//! POP - Remove and return the last element from an array
//! SHIFT - Remove and return the first element from an array
//! UNSHIFT - Add element(s) to the beginning of an array
use crate::shared::models::UserSession; use crate::shared::models::UserSession;
use crate::shared::state::AppState; use crate::shared::state::AppState;
use log::debug; use log::debug;
use rhai::{Array, Dynamic, Engine}; use rhai::{Array, Dynamic, Engine};
use std::sync::Arc; use std::sync::Arc;
/// PUSH - Add an element to the end of an array
/// Returns the new array with the element added
pub fn push_keyword(_state: &Arc<AppState>, _user: UserSession, engine: &mut Engine) { pub fn push_keyword(_state: &Arc<AppState>, _user: UserSession, engine: &mut Engine) {
// PUSH single element
engine.register_fn("PUSH", |mut arr: Array, value: Dynamic| -> Array { engine.register_fn("PUSH", |mut arr: Array, value: Dynamic| -> Array {
arr.push(value); arr.push(value);
arr arr
@ -25,13 +15,11 @@ pub fn push_keyword(_state: &Arc<AppState>, _user: UserSession, engine: &mut Eng
arr arr
}); });
// ARRAY_PUSH alias
engine.register_fn("ARRAY_PUSH", |mut arr: Array, value: Dynamic| -> Array { engine.register_fn("ARRAY_PUSH", |mut arr: Array, value: Dynamic| -> Array {
arr.push(value); arr.push(value);
arr arr
}); });
// APPEND alias
engine.register_fn("APPEND", |mut arr: Array, value: Dynamic| -> Array { engine.register_fn("APPEND", |mut arr: Array, value: Dynamic| -> Array {
arr.push(value); arr.push(value);
arr arr
@ -45,10 +33,7 @@ pub fn push_keyword(_state: &Arc<AppState>, _user: UserSession, engine: &mut Eng
debug!("Registered PUSH keyword"); debug!("Registered PUSH keyword");
} }
/// POP - Remove and return the last element from an array
/// Returns the removed element (or unit if array is empty)
pub fn pop_keyword(_state: &Arc<AppState>, _user: UserSession, engine: &mut Engine) { pub fn pop_keyword(_state: &Arc<AppState>, _user: UserSession, engine: &mut Engine) {
// POP - returns the popped element
engine.register_fn("POP", |mut arr: Array| -> Dynamic { engine.register_fn("POP", |mut arr: Array| -> Dynamic {
arr.pop().unwrap_or(Dynamic::UNIT) arr.pop().unwrap_or(Dynamic::UNIT)
}); });
@ -57,7 +42,6 @@ pub fn pop_keyword(_state: &Arc<AppState>, _user: UserSession, engine: &mut Engi
arr.pop().unwrap_or(Dynamic::UNIT) arr.pop().unwrap_or(Dynamic::UNIT)
}); });
// ARRAY_POP alias
engine.register_fn("ARRAY_POP", |mut arr: Array| -> Dynamic { engine.register_fn("ARRAY_POP", |mut arr: Array| -> Dynamic {
arr.pop().unwrap_or(Dynamic::UNIT) arr.pop().unwrap_or(Dynamic::UNIT)
}); });
@ -65,7 +49,6 @@ pub fn pop_keyword(_state: &Arc<AppState>, _user: UserSession, engine: &mut Engi
debug!("Registered POP keyword"); debug!("Registered POP keyword");
} }
/// SHIFT - Remove and return the first element from an array
pub fn shift_keyword(_state: &Arc<AppState>, _user: UserSession, engine: &mut Engine) { pub fn shift_keyword(_state: &Arc<AppState>, _user: UserSession, engine: &mut Engine) {
engine.register_fn("SHIFT", |mut arr: Array| -> Dynamic { engine.register_fn("SHIFT", |mut arr: Array| -> Dynamic {
if arr.is_empty() { if arr.is_empty() {
@ -83,7 +66,6 @@ pub fn shift_keyword(_state: &Arc<AppState>, _user: UserSession, engine: &mut En
} }
}); });
// ARRAY_SHIFT alias
engine.register_fn("ARRAY_SHIFT", |mut arr: Array| -> Dynamic { engine.register_fn("ARRAY_SHIFT", |mut arr: Array| -> Dynamic {
if arr.is_empty() { if arr.is_empty() {
Dynamic::UNIT Dynamic::UNIT
@ -95,7 +77,6 @@ pub fn shift_keyword(_state: &Arc<AppState>, _user: UserSession, engine: &mut En
debug!("Registered SHIFT keyword"); debug!("Registered SHIFT keyword");
} }
/// UNSHIFT - Add element(s) to the beginning of an array
pub fn unshift_keyword(_state: &Arc<AppState>, _user: UserSession, engine: &mut Engine) { pub fn unshift_keyword(_state: &Arc<AppState>, _user: UserSession, engine: &mut Engine) {
engine.register_fn("UNSHIFT", |mut arr: Array, value: Dynamic| -> Array { engine.register_fn("UNSHIFT", |mut arr: Array, value: Dynamic| -> Array {
arr.insert(0, value); arr.insert(0, value);
@ -107,7 +88,6 @@ pub fn unshift_keyword(_state: &Arc<AppState>, _user: UserSession, engine: &mut
arr arr
}); });
// PREPEND alias
engine.register_fn("PREPEND", |mut arr: Array, value: Dynamic| -> Array { engine.register_fn("PREPEND", |mut arr: Array, value: Dynamic| -> Array {
arr.insert(0, value); arr.insert(0, value);
arr arr
@ -130,15 +110,15 @@ mod tests {
let mut arr: Array = vec![Dynamic::from(1), Dynamic::from(2)]; let mut arr: Array = vec![Dynamic::from(1), Dynamic::from(2)];
arr.push(Dynamic::from(3)); arr.push(Dynamic::from(3));
assert_eq!(arr.len(), 3); assert_eq!(arr.len(), 3);
assert_eq!(arr[2].as_int().unwrap(), 3); assert_eq!(arr[2].as_int().unwrap_or(0), 3);
} }
#[test] #[test]
fn test_pop() { fn test_pop() {
let mut arr: Array = vec![Dynamic::from(1), Dynamic::from(2), Dynamic::from(3)]; let mut arr: Array = vec![Dynamic::from(1), Dynamic::from(2), Dynamic::from(3)];
let popped = arr.pop().unwrap(); let popped = arr.pop();
assert_eq!(arr.len(), 2); assert_eq!(arr.len(), 2);
assert_eq!(popped.as_int().unwrap(), 3); assert_eq!(popped.and_then(|v| v.as_int().ok()).unwrap_or(0), 3);
} }
#[test] #[test]
@ -153,8 +133,8 @@ mod tests {
let mut arr: Array = vec![Dynamic::from(1), Dynamic::from(2), Dynamic::from(3)]; let mut arr: Array = vec![Dynamic::from(1), Dynamic::from(2), Dynamic::from(3)];
let shifted = arr.remove(0); let shifted = arr.remove(0);
assert_eq!(arr.len(), 2); assert_eq!(arr.len(), 2);
assert_eq!(shifted.as_int().unwrap(), 1); assert_eq!(shifted.as_int().unwrap_or(0), 1);
assert_eq!(arr[0].as_int().unwrap(), 2); assert_eq!(arr[0].as_int().unwrap_or(0), 2);
} }
#[test] #[test]
@ -162,6 +142,6 @@ mod tests {
let mut arr: Array = vec![Dynamic::from(2), Dynamic::from(3)]; let mut arr: Array = vec![Dynamic::from(2), Dynamic::from(3)];
arr.insert(0, Dynamic::from(1)); arr.insert(0, Dynamic::from(1));
assert_eq!(arr.len(), 3); assert_eq!(arr.len(), 3);
assert_eq!(arr[0].as_int().unwrap(), 1); assert_eq!(arr[0].as_int().unwrap_or(0), 1);
} }
} }

View file

@ -3,8 +3,8 @@ use crate::shared::state::AppState;
use rhai::Dynamic; use rhai::Dynamic;
use rhai::Engine; use rhai::Engine;
pub fn create_draft_keyword(_state: &AppState, _user: UserSession, engine: &mut Engine) { pub fn create_draft_keyword(state: &AppState, _user: UserSession, engine: &mut Engine) {
let state_clone = _state.clone(); let state_clone = state.clone();
engine engine
.register_custom_syntax( .register_custom_syntax(
&["CREATE_DRAFT", "$expr$", ",", "$expr$", ",", "$expr$"], &["CREATE_DRAFT", "$expr$", ",", "$expr$", ",", "$expr$"],
@ -17,11 +17,11 @@ pub fn create_draft_keyword(_state: &AppState, _user: UserSession, engine: &mut
let fut = execute_create_draft(&state_clone, &to, &subject, &reply_text); let fut = execute_create_draft(&state_clone, &to, &subject, &reply_text);
let result = let result =
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(fut)) tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(fut))
.map_err(|e| format!("Draft creation error: {}", e))?; .map_err(|e| format!("Draft creation error: {e}"))?;
Ok(Dynamic::from(result)) Ok(Dynamic::from(result))
}, },
) )
.unwrap(); .ok();
} }
async fn execute_create_draft( async fn execute_create_draft(
@ -36,37 +36,36 @@ async fn execute_create_draft(
let config = state.config.as_ref().ok_or("No email config")?; let config = state.config.as_ref().ok_or("No email config")?;
// Fetch any previous emails to this recipient for threading
let previous_email = fetch_latest_sent_to(&config.email, to) let previous_email = fetch_latest_sent_to(&config.email, to)
.await .await
.unwrap_or_default(); .unwrap_or_default();
let email_body = if !previous_email.is_empty() { let email_body = if !previous_email.is_empty() {
// Create a threaded reply
let email_separator = "<br><hr><br>"; let email_separator = "<br><hr><br>";
let formatted_reply = reply_text.replace("FIX", "Fixed"); let formatted_reply = reply_text.replace("FIX", "Fixed");
let formatted_old = previous_email.replace("\n", "<br>"); let formatted_old = previous_email.replace('\n', "<br>");
format!("{}{}{}", formatted_reply, email_separator, formatted_old) format!("{formatted_reply}{email_separator}{formatted_old}")
} else { } else {
reply_text.to_string() reply_text.to_string()
}; };
let draft_request = SaveDraftRequest { let draft_request = SaveDraftRequest {
account_id: String::new(),
to: to.to_string(), to: to.to_string(),
subject: subject.to_string(),
cc: None, cc: None,
bcc: None,
subject: subject.to_string(),
body: email_body, body: email_body,
}; };
save_email_draft(&config.email, &draft_request) save_email_draft(&config.email, &draft_request)
.await .await
.map(|_| "Draft saved successfully".to_string()) .map(|()| "Draft saved successfully".to_string())
.map_err(|e| e.to_string()) .map_err(|e| e.to_string())
} }
#[cfg(not(feature = "email"))] #[cfg(not(feature = "email"))]
{ {
// Store draft in database when email feature is disabled
use chrono::Utc; use chrono::Utc;
use diesel::prelude::*; use diesel::prelude::*;
use uuid::Uuid; use uuid::Uuid;
@ -92,7 +91,7 @@ async fn execute_create_draft(
.execute(&mut db_conn) .execute(&mut db_conn)
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
Ok::<_, String>(format!("Draft saved with ID: {}", draft_id)) Ok::<_, String>(format!("Draft saved with ID: {draft_id}"))
}) })
.await .await
.map_err(|e| e.to_string())? .map_err(|e| e.to_string())?

View file

@ -1,14 +1,19 @@
use rhai::Dynamic; use rhai::Dynamic;
use rhai::Engine; use rhai::Engine;
pub fn first_keyword(engine: &mut Engine) { pub fn first_keyword(engine: &mut Engine) {
engine engine
.register_custom_syntax(&["FIRST", "$expr$"], false, { .register_custom_syntax(&["FIRST", "$expr$"], false, {
move |context, inputs| { move |context, inputs| {
let input_string = context.eval_expression_tree(&inputs[0])?; let input_string = context.eval_expression_tree(&inputs[0])?;
let input_str = input_string.to_string(); let input_str = input_string.to_string();
let first_word = input_str.split_whitespace().next().unwrap_or("").to_string(); let first_word = input_str
Ok(Dynamic::from(first_word)) .split_whitespace()
} .next()
}) .unwrap_or("")
.unwrap(); .to_string();
Ok(Dynamic::from(first_word))
}
})
.ok();
} }

View file

@ -1,18 +1,19 @@
use rhai::Dynamic; use rhai::Dynamic;
use rhai::Engine; use rhai::Engine;
pub fn last_keyword(engine: &mut Engine) { pub fn last_keyword(engine: &mut Engine) {
engine engine
.register_custom_syntax(&["LAST", "(", "$expr$", ")"], false, { .register_custom_syntax(&["LAST", "(", "$expr$", ")"], false, {
move |context, inputs| { move |context, inputs| {
let input_string = context.eval_expression_tree(&inputs[0])?; let input_string = context.eval_expression_tree(&inputs[0])?;
let input_str = input_string.to_string(); let input_str = input_string.to_string();
if input_str.trim().is_empty() { if input_str.trim().is_empty() {
return Ok(Dynamic::from("")); return Ok(Dynamic::from(""));
} }
let words: Vec<&str> = input_str.split_whitespace().collect(); let words: Vec<&str> = input_str.split_whitespace().collect();
let last_word = words.last().map(|s| *s).unwrap_or(""); let last_word = words.last().copied().unwrap_or("");
Ok(Dynamic::from(last_word.to_string())) Ok(Dynamic::from(last_word.to_string()))
} }
}) })
.unwrap(); .ok();
} }

View file

@ -30,10 +30,10 @@ mod tests {
#[test] #[test]
fn test_round_decimals() { fn test_round_decimals() {
let n = 3.14159_f64; let n = 2.71828_f64;
let decimals = 2; let decimals = 2;
let factor = 10_f64.powi(decimals); let factor = 10_f64.powi(decimals);
let result = (n * factor).round() / factor; let result = (n * factor).round() / factor;
assert!((result - 3.14).abs() < 0.001); assert!((result - 2.72).abs() < 0.001);
} }
} }

View file

@ -79,6 +79,7 @@ mod tests {
#[test] #[test]
fn test_pi() { fn test_pi() {
assert!((std::f64::consts::PI - 3.14159).abs() < 0.001); assert!(std::f64::consts::PI > 3.14);
assert!(std::f64::consts::PI < 3.15);
} }
} }

View file

@ -1,15 +1,15 @@
use crate::shared::models::UserSession;
use crate::shared::state::AppState;
use log::trace; use log::trace;
use rhai::Dynamic; use rhai::Dynamic;
use rhai::Engine; use rhai::Engine;
use crate::shared::state::AppState;
use crate::shared::models::UserSession;
pub fn print_keyword(_state: &AppState, _user: UserSession, engine: &mut Engine) { pub fn print_keyword(_state: &AppState, _user: UserSession, engine: &mut Engine) {
engine engine
.register_custom_syntax(&["PRINT", "$expr$"], true, |context, inputs| { .register_custom_syntax(&["PRINT", "$expr$"], true, |context, inputs| {
let value = context.eval_expression_tree(&inputs[0])?; let value = context.eval_expression_tree(&inputs[0])?;
trace!("PRINT: {}", value); trace!("PRINT: {value}");
Ok(Dynamic::UNIT) Ok(Dynamic::UNIT)
}, })
) .ok();
.unwrap();
} }

View file

@ -290,7 +290,7 @@ async fn execute_send_mail(
{ {
use crate::email::EmailService; use crate::email::EmailService;
let email_service = EmailService::new(Arc::new(state.as_ref().clone())); let email_service = EmailService::new(Arc::new(state.clone()));
if let Ok(_) = email_service if let Ok(_) = email_service
.send_email( .send_email(

View file

@ -7,7 +7,7 @@ use uuid::Uuid;
pub fn set_user_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) { pub fn set_user_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
let state_clone = Arc::clone(&state); let state_clone = Arc::clone(&state);
let user_clone = user.clone(); let user_clone = user;
engine engine
.register_custom_syntax(&["SET", "USER", "$expr$"], true, move |context, inputs| { .register_custom_syntax(&["SET", "USER", "$expr$"], true, move |context, inputs| {
@ -21,21 +21,20 @@ pub fn set_user_keyword(state: Arc<AppState>, user: UserSession, engine: &mut En
futures::executor::block_on(state_for_spawn.session_manager.lock()); futures::executor::block_on(state_for_spawn.session_manager.lock());
if let Err(e) = session_manager.update_user_id(user_clone_spawn.id, user_id) { if let Err(e) = session_manager.update_user_id(user_clone_spawn.id, user_id) {
error!("Failed to update user ID in session: {}", e); error!("Failed to update user ID in session: {e}");
} else { } else {
trace!( trace!(
"Updated session {} to user ID: {}", "Updated session {} to user ID: {user_id}",
user_clone_spawn.id, user_clone_spawn.id
user_id
); );
} }
} }
Err(e) => { Err(e) => {
trace!("Invalid user ID format: {}", e); trace!("Invalid user ID format: {e}");
} }
} }
Ok(Dynamic::UNIT) Ok(Dynamic::UNIT)
}) })
.unwrap(); .ok();
} }

View file

@ -1,27 +1,29 @@
use crate::shared::state::AppState;
use crate::shared::models::UserSession; use crate::shared::models::UserSession;
use crate::shared::state::AppState;
use rhai::{Dynamic, Engine}; use rhai::{Dynamic, Engine};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
pub fn wait_keyword(_state: &AppState, _user: UserSession, engine: &mut Engine) { pub fn wait_keyword(_state: &AppState, _user: UserSession, engine: &mut Engine) {
engine engine
.register_custom_syntax(&["WAIT", "$expr$"], false, move |context, inputs| { .register_custom_syntax(&["WAIT", "$expr$"], false, move |context, inputs| {
let seconds = context.eval_expression_tree(&inputs[0])?; let seconds = context.eval_expression_tree(&inputs[0])?;
let duration_secs = if seconds.is::<i64>() { let duration_secs = if seconds.is::<i64>() {
seconds.cast::<i64>() as f64 #[allow(clippy::cast_precision_loss)]
} else if seconds.is::<f64>() { let val = seconds.cast::<i64>() as f64;
seconds.cast::<f64>() val
} else { } else if seconds.is::<f64>() {
return Err(format!("WAIT expects a number, got: {}", seconds).into()); seconds.cast::<f64>()
}; } else {
if duration_secs < 0.0 { return Err(format!("WAIT expects a number, got: {seconds}").into());
return Err("WAIT duration cannot be negative".into()); };
} if duration_secs < 0.0 {
let capped_duration = if duration_secs > 300.0 { 300.0 } else { duration_secs }; return Err("WAIT duration cannot be negative".into());
let duration = Duration::from_secs_f64(capped_duration); }
thread::sleep(duration); let capped_duration = duration_secs.min(300.0);
Ok(Dynamic::from(format!("Waited {} seconds", capped_duration))) let duration = Duration::from_secs_f64(capped_duration);
}, thread::sleep(duration);
) Ok(Dynamic::from(format!("Waited {capped_duration} seconds")))
.unwrap(); })
.ok();
} }

View file

@ -124,7 +124,7 @@ impl ScriptService {
add_member_keyword(state.clone(), user.clone(), &mut engine); add_member_keyword(state.clone(), user.clone(), &mut engine);
// Register dynamic bot management keywords (ADD BOT, REMOVE BOT) // Register dynamic bot management keywords (ADD BOT, REMOVE BOT)
register_bot_keywords(state.clone(), user.clone(), &mut engine); register_bot_keywords(&state, &user, &mut engine);
// Register model routing keywords (USE MODEL, SET MODEL ROUTING, etc.) // Register model routing keywords (USE MODEL, SET MODEL ROUTING, etc.)
register_model_routing_keywords(state.clone(), user.clone(), &mut engine); register_model_routing_keywords(state.clone(), user.clone(), &mut engine);

190
src/calendar/caldav.rs Normal file
View file

@ -0,0 +1,190 @@
//! CalDAV module for calendar synchronization
//!
//! This module provides CalDAV protocol support for calendar synchronization
//! with external calendar clients and servers.
use axum::{
http::StatusCode,
response::{IntoResponse, Response},
routing::get,
Router,
};
use std::sync::Arc;
use super::CalendarEngine;
use crate::shared::state::AppState;
/// Create the CalDAV router.
///
/// The engine handle is accepted but not yet wired into the handlers;
/// it stays in the signature so call sites are stable once per-handler
/// state is implemented.
pub fn create_caldav_router(_engine: Arc<CalendarEngine>) -> Router<Arc<AppState>> {
    // Event resources support both read (GET) and create/update (PUT).
    let event_routes = get(caldav_event).put(caldav_put_event);

    Router::new()
        .route("/caldav", get(caldav_root))
        .route("/caldav/principals", get(caldav_principals))
        .route("/caldav/calendars", get(caldav_calendars))
        .route("/caldav/calendars/:calendar_id", get(caldav_calendar))
        .route("/caldav/calendars/:calendar_id/:event_id.ics", event_routes)
}
/// CalDAV root endpoint - returns server capabilities.
///
/// Advertises DAV compliance classes (`1, 2, calendar-access`) via the
/// `DAV` header plus a WebDAV multistatus body describing the root
/// collection.
async fn caldav_root() -> impl IntoResponse {
    // Tuple-based response construction is infallible, so no `.unwrap()`
    // is needed (project policy forbids unwrap/expect/panic).
    (
        StatusCode::OK,
        [
            ("DAV", "1, 2, calendar-access"),
            ("Content-Type", "application/xml; charset=utf-8"),
        ],
        r#"<?xml version="1.0" encoding="utf-8"?>
<D:multistatus xmlns:D="DAV:" xmlns:C="urn:ietf:params:xml:ns:caldav">
    <D:response>
        <D:href>/caldav/</D:href>
        <D:propstat>
            <D:prop>
                <D:resourcetype>
                    <D:collection/>
                </D:resourcetype>
                <D:displayname>GeneralBots CalDAV Server</D:displayname>
            </D:prop>
            <D:status>HTTP/1.1 200 OK</D:status>
        </D:propstat>
    </D:response>
</D:multistatus>"#,
    )
}
/// CalDAV principals endpoint - returns user principal.
///
/// Points clients at the calendar home set (`/caldav/calendars/`) via
/// the `C:calendar-home-set` property.
async fn caldav_principals() -> impl IntoResponse {
    // Tuple-based response construction is infallible, so no `.unwrap()`
    // is needed (project policy forbids unwrap/expect/panic).
    (
        StatusCode::OK,
        [("Content-Type", "application/xml; charset=utf-8")],
        r#"<?xml version="1.0" encoding="utf-8"?>
<D:multistatus xmlns:D="DAV:" xmlns:C="urn:ietf:params:xml:ns:caldav">
    <D:response>
        <D:href>/caldav/principals/</D:href>
        <D:propstat>
            <D:prop>
                <D:resourcetype>
                    <D:collection/>
                    <D:principal/>
                </D:resourcetype>
                <C:calendar-home-set>
                    <D:href>/caldav/calendars/</D:href>
                </C:calendar-home-set>
            </D:prop>
            <D:status>HTTP/1.1 200 OK</D:status>
        </D:propstat>
    </D:response>
</D:multistatus>"#,
    )
}
/// CalDAV calendars collection endpoint.
///
/// Lists the calendars collection plus the single "default" calendar,
/// which supports VEVENT and VTODO components.
async fn caldav_calendars() -> impl IntoResponse {
    // Tuple-based response construction is infallible, so no `.unwrap()`
    // is needed (project policy forbids unwrap/expect/panic).
    (
        StatusCode::OK,
        [("Content-Type", "application/xml; charset=utf-8")],
        r#"<?xml version="1.0" encoding="utf-8"?>
<D:multistatus xmlns:D="DAV:" xmlns:C="urn:ietf:params:xml:ns:caldav">
    <D:response>
        <D:href>/caldav/calendars/</D:href>
        <D:propstat>
            <D:prop>
                <D:resourcetype>
                    <D:collection/>
                </D:resourcetype>
                <D:displayname>Calendars</D:displayname>
            </D:prop>
            <D:status>HTTP/1.1 200 OK</D:status>
        </D:propstat>
    </D:response>
    <D:response>
        <D:href>/caldav/calendars/default/</D:href>
        <D:propstat>
            <D:prop>
                <D:resourcetype>
                    <D:collection/>
                    <C:calendar/>
                </D:resourcetype>
                <D:displayname>Default Calendar</D:displayname>
                <C:supported-calendar-component-set>
                    <C:comp name="VEVENT"/>
                    <C:comp name="VTODO"/>
                </C:supported-calendar-component-set>
            </D:prop>
            <D:status>HTTP/1.1 200 OK</D:status>
        </D:propstat>
    </D:response>
</D:multistatus>"#,
    )
}
/// CalDAV single calendar endpoint.
///
/// Returns the properties of the default calendar collection.
async fn caldav_calendar() -> impl IntoResponse {
    // Tuple-based response construction is infallible, so no `.unwrap()`
    // is needed (project policy forbids unwrap/expect/panic).
    (
        StatusCode::OK,
        [("Content-Type", "application/xml; charset=utf-8")],
        r#"<?xml version="1.0" encoding="utf-8"?>
<D:multistatus xmlns:D="DAV:" xmlns:C="urn:ietf:params:xml:ns:caldav">
    <D:response>
        <D:href>/caldav/calendars/default/</D:href>
        <D:propstat>
            <D:prop>
                <D:resourcetype>
                    <D:collection/>
                    <C:calendar/>
                </D:resourcetype>
                <D:displayname>Default Calendar</D:displayname>
            </D:prop>
            <D:status>HTTP/1.1 200 OK</D:status>
        </D:propstat>
    </D:response>
</D:multistatus>"#,
    )
}
/// Get a single event in iCalendar format.
///
/// Currently returns a static placeholder VEVENT; the event is not yet
/// fetched from the engine.
async fn caldav_event() -> impl IntoResponse {
    // TODO: Fetch actual event from engine and convert to iCalendar format
    // Tuple-based response construction is infallible, so no `.unwrap()`
    // is needed (project policy forbids unwrap/expect/panic).
    (
        StatusCode::OK,
        [("Content-Type", "text/calendar; charset=utf-8")],
        r#"BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//GeneralBots//Calendar//EN
BEGIN:VEVENT
UID:placeholder@generalbots.com
DTSTAMP:20240101T000000Z
DTSTART:20240101T090000Z
DTEND:20240101T100000Z
SUMMARY:Placeholder Event
END:VEVENT
END:VCALENDAR"#,
    )
}
/// Put (create/update) an event.
///
/// Currently acknowledges the PUT with `201 Created` and a placeholder
/// ETag; the incoming iCalendar payload is not yet parsed or stored.
async fn caldav_put_event() -> impl IntoResponse {
    // TODO: Parse incoming iCalendar and create/update event in engine
    // Tuple-based response construction is infallible, so no `.unwrap()`
    // is needed (project policy forbids unwrap/expect/panic).
    (
        StatusCode::CREATED,
        [("ETag", "\"placeholder-etag\"")],
        String::new(),
    )
}

View file

@ -5,8 +5,13 @@ use axum::{
routing::{get, post}, routing::{get, post},
Router, Router,
}; };
use chrono::{DateTime, Utc}; use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use icalendar::{Calendar, Component, Event as IcalEvent, EventLike, Property}; use diesel::r2d2::{ConnectionManager, Pool};
use diesel::PgConnection;
use icalendar::{
Calendar, CalendarDateTime, Component, DatePerhapsTime, Event as IcalEvent, EventLike, Property,
};
use log::info;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
@ -16,6 +21,8 @@ use uuid::Uuid;
use crate::core::urls::ApiUrls; use crate::core::urls::ApiUrls;
use crate::shared::state::AppState; use crate::shared::state::AppState;
pub mod caldav;
pub struct CalendarState { pub struct CalendarState {
events: RwLock<HashMap<Uuid, CalendarEvent>>, events: RwLock<HashMap<Uuid, CalendarEvent>>,
} }
@ -108,8 +115,8 @@ impl CalendarEvent {
let uid = ical.get_uid()?; let uid = ical.get_uid()?;
let summary = ical.get_summary()?; let summary = ical.get_summary()?;
let start_time = ical.get_start()?.with_timezone(&Utc); let start_time = date_perhaps_time_to_utc(ical.get_start()?)?;
let end_time = ical.get_end()?.with_timezone(&Utc); let end_time = date_perhaps_time_to_utc(ical.get_end()?)?;
let id = Uuid::parse_str(uid).unwrap_or_else(|_| Uuid::new_v4()); let id = Uuid::parse_str(uid).unwrap_or_else(|_| Uuid::new_v4());
@ -130,6 +137,31 @@ impl CalendarEvent {
} }
} }
/// Convert a `DatePerhapsTime` into a concrete `DateTime<Utc>`.
///
/// Floating and timezone-qualified times are interpreted as UTC wall
/// clock; date-only values map to midnight UTC. Returns `None` only if
/// the midnight `NaiveTime` cannot be constructed.
fn date_perhaps_time_to_utc(dpt: DatePerhapsTime) -> Option<DateTime<Utc>> {
    match dpt {
        // Already UTC: pass straight through.
        DatePerhapsTime::DateTime(CalendarDateTime::Utc(dt)) => Some(dt),
        // Floating (zone-less) time — treated as UTC.
        DatePerhapsTime::DateTime(CalendarDateTime::Floating(naive)) => {
            Some(Utc.from_utc_datetime(&naive))
        }
        // Zone-qualified time — the wall-clock value is read as UTC
        // (no timezone database lookup is attempted here).
        DatePerhapsTime::DateTime(CalendarDateTime::WithTimezone { date_time, .. }) => {
            Some(Utc.from_utc_datetime(&date_time))
        }
        // Date-only value: anchor to midnight UTC.
        DatePerhapsTime::Date(date) => {
            let midnight = chrono::NaiveTime::from_hms_opt(0, 0, 0)?;
            Some(Utc.from_utc_datetime(&NaiveDateTime::new(date, midnight)))
        }
    }
}
/// Export events to iCal format /// Export events to iCal format
pub fn export_to_ical(events: &[CalendarEvent], calendar_name: &str) -> String { pub fn export_to_ical(events: &[CalendarEvent], calendar_name: &str) -> String {
let mut calendar = Calendar::new(); let mut calendar = Calendar::new();
@ -165,11 +197,16 @@ pub fn import_from_ical(ical_str: &str, organizer: &str) -> Vec<CalendarEvent> {
#[derive(Default)] #[derive(Default)]
pub struct CalendarEngine { pub struct CalendarEngine {
events: Vec<CalendarEvent>, events: Vec<CalendarEvent>,
#[allow(dead_code)]
conn: Option<Pool<ConnectionManager<PgConnection>>>,
} }
impl CalendarEngine { impl CalendarEngine {
pub fn new() -> Self { pub fn new(conn: Pool<ConnectionManager<PgConnection>>) -> Self {
Self::default() Self {
events: Vec::new(),
conn: Some(conn),
}
} }
pub fn create_event(&mut self, input: CalendarEventInput) -> CalendarEvent { pub fn create_event(&mut self, input: CalendarEventInput) -> CalendarEvent {
@ -262,7 +299,6 @@ impl CalendarEngine {
} }
} }
pub async fn list_events( pub async fn list_events(
State(_state): State<Arc<AppState>>, State(_state): State<Arc<AppState>>,
axum::extract::Query(_query): axum::extract::Query<serde_json::Value>, axum::extract::Query(_query): axum::extract::Query<serde_json::Value>,
@ -402,10 +438,7 @@ pub async fn update_event(
Ok(Json(event.clone())) Ok(Json(event.clone()))
} }
pub async fn delete_event( pub async fn delete_event(State(_state): State<Arc<AppState>>, Path(id): Path<Uuid>) -> StatusCode {
State(_state): State<Arc<AppState>>,
Path(id): Path<Uuid>,
) -> StatusCode {
let calendar_state = get_calendar_state(); let calendar_state = get_calendar_state();
let mut events = calendar_state.events.write().await; let mut events = calendar_state.events.write().await;
@ -438,16 +471,21 @@ pub async fn import_ical(
/// New event form (HTMX HTML response) /// New event form (HTMX HTML response)
pub async fn new_event_form(State(_state): State<Arc<AppState>>) -> axum::response::Html<String> { pub async fn new_event_form(State(_state): State<Arc<AppState>>) -> axum::response::Html<String> {
axum::response::Html(r#" axum::response::Html(
r#"
<div class="event-form-content"> <div class="event-form-content">
<p>Create a new event using the form on the right panel.</p> <p>Create a new event using the form on the right panel.</p>
</div> </div>
"#.to_string()) "#
.to_string(),
)
} }
/// New calendar form (HTMX HTML response) /// New calendar form (HTMX HTML response)
pub async fn new_calendar_form(State(_state): State<Arc<AppState>>) -> axum::response::Html<String> { pub async fn new_calendar_form(
axum::response::Html(r#" State(_state): State<Arc<AppState>>,
) -> axum::response::Html<String> {
axum::response::Html(r##"
<form class="calendar-form" hx-post="/api/calendar/calendars" hx-swap="none"> <form class="calendar-form" hx-post="/api/calendar/calendars" hx-swap="none">
<div class="form-group"> <div class="form-group">
<label>Calendar Name</label> <label>Calendar Name</label>
@ -468,7 +506,33 @@ pub async fn new_calendar_form(State(_state): State<Arc<AppState>>) -> axum::res
<button type="submit" class="btn-primary">Create Calendar</button> <button type="submit" class="btn-primary">Create Calendar</button>
</div> </div>
</form> </form>
"#.to_string()) "##.to_string())
}
/// Background task: wakes once a minute and logs a reminder for every event
/// whose configured lead time (`reminder_minutes`) falls inside the current
/// one-minute window. Never returns.
///
/// NOTE(review): this iterates the `engine.events` Vec held behind the `Arc`
/// captured at spawn time — events added to a different engine instance after
/// startup are presumably not seen; confirm how the engine is shared.
pub async fn start_reminder_job(engine: Arc<CalendarEngine>) {
    info!("Starting calendar reminder job");
    loop {
        // Check every minute for upcoming reminders
        tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
        let now = Utc::now();
        for event in &engine.events {
            if let Some(reminder_minutes) = event.reminder_minutes {
                // The instant at which the reminder becomes due.
                let reminder_time =
                    event.start_time - chrono::Duration::minutes(reminder_minutes as i64);
                // Check if we're within the reminder window (within 1 minute)
                // so each reminder fires on at most one 60-second tick.
                if now >= reminder_time && now < reminder_time + chrono::Duration::minutes(1) {
                    info!(
                        "Reminder: Event '{}' starts in {} minutes",
                        event.title, reminder_minutes
                    );
                    // TODO: Send actual notification via configured channels
                }
            }
        }
    }
} }
/// Configure calendar API routes /// Configure calendar API routes

View file

@ -390,7 +390,8 @@ impl CodeScanner {
.to_string_lossy() .to_string_lossy()
.to_string(); .to_string();
let bot_id = format!("{:x}", md5::compute(&bot_name)); let bot_id =
uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, bot_name.as_bytes()).to_string();
let mut issues = Vec::new(); let mut issues = Vec::new();
let mut stats = ScanStats::default(); let mut stats = ScanStats::default();

View file

@ -47,7 +47,7 @@ pub enum ComplianceStatus {
} }
/// Severity levels /// Severity levels
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Severity { pub enum Severity {
Low, Low,
Medium, Medium,

View file

@ -10,7 +10,7 @@ use std::collections::HashMap;
use uuid::Uuid; use uuid::Uuid;
/// Risk category enumeration /// Risk category enumeration
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum RiskCategory { pub enum RiskCategory {
Security, Security,
Compliance, Compliance,
@ -22,7 +22,7 @@ pub enum RiskCategory {
} }
/// Risk likelihood levels /// Risk likelihood levels
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum Likelihood { pub enum Likelihood {
Rare, Rare,
Unlikely, Unlikely,
@ -32,7 +32,7 @@ pub enum Likelihood {
} }
/// Risk impact levels /// Risk impact levels
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum Impact { pub enum Impact {
Negligible, Negligible,
Minor, Minor,
@ -42,7 +42,7 @@ pub enum Impact {
} }
/// Risk level based on likelihood and impact /// Risk level based on likelihood and impact
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum RiskLevel { pub enum RiskLevel {
Low, Low,
Medium, Medium,
@ -51,7 +51,7 @@ pub enum RiskLevel {
} }
/// Risk status /// Risk status
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum RiskStatus { pub enum RiskStatus {
Identified, Identified,
Assessed, Assessed,
@ -263,11 +263,7 @@ impl RiskAssessmentService {
} }
/// Add vulnerability to risk assessment /// Add vulnerability to risk assessment
pub fn add_vulnerability( pub fn add_vulnerability(&mut self, risk_id: Uuid, vulnerability: Vulnerability) -> Result<()> {
&mut self,
risk_id: Uuid,
vulnerability: Vulnerability,
) -> Result<()> {
let assessment = self let assessment = self
.assessments .assessments
.get_mut(&risk_id) .get_mut(&risk_id)
@ -340,9 +336,9 @@ impl RiskAssessmentService {
} }
} }
assessment.risk_level = assessment.risk_level = self
self.risk_matrix .risk_matrix
.calculate_risk_level(&assessment.likelihood, &assessment.impact); .calculate_risk_level(&assessment.likelihood, &assessment.impact);
Ok(()) Ok(())
} }
@ -489,24 +485,42 @@ impl Default for RiskMatrix {
matrix.insert((Likelihood::Unlikely, Impact::Minor), RiskLevel::Low); matrix.insert((Likelihood::Unlikely, Impact::Minor), RiskLevel::Low);
matrix.insert((Likelihood::Unlikely, Impact::Moderate), RiskLevel::Medium); matrix.insert((Likelihood::Unlikely, Impact::Moderate), RiskLevel::Medium);
matrix.insert((Likelihood::Unlikely, Impact::Major), RiskLevel::High); matrix.insert((Likelihood::Unlikely, Impact::Major), RiskLevel::High);
matrix.insert((Likelihood::Unlikely, Impact::Catastrophic), RiskLevel::High); matrix.insert(
(Likelihood::Unlikely, Impact::Catastrophic),
RiskLevel::High,
);
matrix.insert((Likelihood::Possible, Impact::Negligible), RiskLevel::Low); matrix.insert((Likelihood::Possible, Impact::Negligible), RiskLevel::Low);
matrix.insert((Likelihood::Possible, Impact::Minor), RiskLevel::Medium); matrix.insert((Likelihood::Possible, Impact::Minor), RiskLevel::Medium);
matrix.insert((Likelihood::Possible, Impact::Moderate), RiskLevel::Medium); matrix.insert((Likelihood::Possible, Impact::Moderate), RiskLevel::Medium);
matrix.insert((Likelihood::Possible, Impact::Major), RiskLevel::High); matrix.insert((Likelihood::Possible, Impact::Major), RiskLevel::High);
matrix.insert((Likelihood::Possible, Impact::Catastrophic), RiskLevel::Critical); matrix.insert(
(Likelihood::Possible, Impact::Catastrophic),
RiskLevel::Critical,
);
matrix.insert((Likelihood::Likely, Impact::Negligible), RiskLevel::Medium); matrix.insert((Likelihood::Likely, Impact::Negligible), RiskLevel::Medium);
matrix.insert((Likelihood::Likely, Impact::Minor), RiskLevel::Medium); matrix.insert((Likelihood::Likely, Impact::Minor), RiskLevel::Medium);
matrix.insert((Likelihood::Likely, Impact::Moderate), RiskLevel::High); matrix.insert((Likelihood::Likely, Impact::Moderate), RiskLevel::High);
matrix.insert((Likelihood::Likely, Impact::Major), RiskLevel::Critical); matrix.insert((Likelihood::Likely, Impact::Major), RiskLevel::Critical);
matrix.insert((Likelihood::Likely, Impact::Catastrophic), RiskLevel::Critical); matrix.insert(
(Likelihood::Likely, Impact::Catastrophic),
RiskLevel::Critical,
);
matrix.insert((Likelihood::AlmostCertain, Impact::Negligible), RiskLevel::Medium); matrix.insert(
(Likelihood::AlmostCertain, Impact::Negligible),
RiskLevel::Medium,
);
matrix.insert((Likelihood::AlmostCertain, Impact::Minor), RiskLevel::High); matrix.insert((Likelihood::AlmostCertain, Impact::Minor), RiskLevel::High);
matrix.insert((Likelihood::AlmostCertain, Impact::Moderate), RiskLevel::High); matrix.insert(
matrix.insert((Likelihood::AlmostCertain, Impact::Major), RiskLevel::Critical); (Likelihood::AlmostCertain, Impact::Moderate),
RiskLevel::High,
);
matrix.insert(
(Likelihood::AlmostCertain, Impact::Major),
RiskLevel::Critical,
);
matrix.insert( matrix.insert(
(Likelihood::AlmostCertain, Impact::Catastrophic), (Likelihood::AlmostCertain, Impact::Catastrophic),
RiskLevel::Critical, RiskLevel::Critical,

View file

@ -23,7 +23,7 @@ pub enum TrainingType {
} }
/// Training status /// Training status
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum TrainingStatus { pub enum TrainingStatus {
NotStarted, NotStarted,
InProgress, InProgress,
@ -152,7 +152,8 @@ impl TrainingTracker {
max_attempts: 3, max_attempts: 3,
}; };
self.courses.insert(security_awareness.id, security_awareness); self.courses
.insert(security_awareness.id, security_awareness);
let data_protection = TrainingCourse { let data_protection = TrainingCourse {
id: Uuid::new_v4(), id: Uuid::new_v4(),
@ -213,11 +214,7 @@ impl TrainingTracker {
self.assignments.insert(assignment.id, assignment.clone()); self.assignments.insert(assignment.id, assignment.clone());
log::info!( log::info!("Assigned training '{}' to user {}", course.title, user_id);
"Assigned training '{}' to user {}",
course.title,
user_id
);
Ok(assignment) Ok(assignment)
} }
@ -298,7 +295,10 @@ impl TrainingTracker {
course_id: course.id, course_id: course.id,
issued_date: end_time, issued_date: end_time,
expiry_date: end_time + Duration::days(course.validity_days), expiry_date: end_time + Duration::days(course.validity_days),
certificate_number: format!("CERT-{}", Uuid::new_v4().to_string()[..8].to_uppercase()), certificate_number: format!(
"CERT-{}",
Uuid::new_v4().to_string()[..8].to_uppercase()
),
verification_code: Uuid::new_v4().to_string(), verification_code: Uuid::new_v4().to_string(),
}; };
@ -331,9 +331,11 @@ impl TrainingTracker {
let mut upcoming_trainings = vec![]; let mut upcoming_trainings = vec![];
for course in self.courses.values() { for course in self.courses.values() {
if course.required_for_roles.iter().any(|r| { if course
user_roles.contains(r) || r == "all" .required_for_roles
}) { .iter()
.any(|r| user_roles.contains(r) || r == "all")
{
required_trainings.push(course.id); required_trainings.push(course.id);
// Check if user has completed this training // Check if user has completed this training
@ -401,18 +403,14 @@ impl TrainingTracker {
let overdue_count = self let overdue_count = self
.assignments .assignments
.values() .values()
.filter(|a| { .filter(|a| a.status != TrainingStatus::Completed && a.due_date < Utc::now())
a.status != TrainingStatus::Completed
&& a.due_date < Utc::now()
})
.count(); .count();
let expiring_soon = self let expiring_soon = self
.certificates .certificates
.values() .values()
.filter(|c| { .filter(|c| {
c.expiry_date > Utc::now() c.expiry_date > Utc::now() && c.expiry_date < Utc::now() + Duration::days(30)
&& c.expiry_date < Utc::now() + Duration::days(30)
}) })
.count(); .count();
@ -460,10 +458,7 @@ impl TrainingTracker {
pub fn get_overdue_trainings(&self) -> Vec<TrainingAssignment> { pub fn get_overdue_trainings(&self) -> Vec<TrainingAssignment> {
self.assignments self.assignments
.values() .values()
.filter(|a| { .filter(|a| a.status != TrainingStatus::Completed && a.due_date < Utc::now())
a.status != TrainingStatus::Completed
&& a.due_date < Utc::now()
})
.cloned() .cloned()
.collect() .collect()
} }
@ -473,9 +468,7 @@ impl TrainingTracker {
let cutoff = Utc::now() + Duration::days(days_ahead); let cutoff = Utc::now() + Duration::days(days_ahead);
self.certificates self.certificates
.values() .values()
.filter(|c| { .filter(|c| c.expiry_date > Utc::now() && c.expiry_date <= cutoff)
c.expiry_date > Utc::now() && c.expiry_date <= cutoff
})
.cloned() .cloned()
.collect() .collect()
} }

View file

@ -109,7 +109,7 @@ impl UserDriveVectorDB {
if !exists { if !exists {
// Create collection for file embeddings (1536 dimensions for OpenAI embeddings) // Create collection for file embeddings (1536 dimensions for OpenAI embeddings)
client client
.create_collection(&CreateCollection { .create_collection(CreateCollection {
collection_name: self.collection_name.clone(), collection_name: self.collection_name.clone(),
vectors_config: Some(VectorsConfig { vectors_config: Some(VectorsConfig {
config: Some(Config::Params(VectorParams { config: Some(Config::Params(VectorParams {
@ -482,7 +482,7 @@ impl UserDriveVectorDB {
// Recreate empty collection // Recreate empty collection
client client
.create_collection(&CreateCollection { .create_collection(CreateCollection {
collection_name: self.collection_name.clone(), collection_name: self.collection_name.clone(),
vectors_config: Some(VectorsConfig { vectors_config: Some(VectorsConfig {
config: Some(Config::Params(VectorParams { config: Some(Config::Params(VectorParams {

File diff suppressed because it is too large Load diff

View file

@ -3,6 +3,7 @@ use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use tokio::fs;
use uuid::Uuid; use uuid::Uuid;
@ -87,7 +88,7 @@ impl UserEmailVectorDB {
if !exists { if !exists {
// Create collection for email embeddings (1536 dimensions for OpenAI embeddings) // Create collection for email embeddings (1536 dimensions for OpenAI embeddings)
client client
.create_collection(&CreateCollection { .create_collection(CreateCollection {
collection_name: self.collection_name.clone(), collection_name: self.collection_name.clone(),
vectors_config: Some(VectorsConfig { vectors_config: Some(VectorsConfig {
config: Some(Config::Params(VectorParams { config: Some(Config::Params(VectorParams {
@ -329,7 +330,7 @@ impl UserEmailVectorDB {
// Recreate empty collection // Recreate empty collection
client client
.create_collection(&CreateCollection { .create_collection(CreateCollection {
collection_name: self.collection_name.clone(), collection_name: self.collection_name.clone(),
vectors_config: Some(VectorsConfig { vectors_config: Some(VectorsConfig {
config: Some(Config::Params(VectorParams { config: Some(Config::Params(VectorParams {

96
src/instagram/mod.rs Normal file
View file

@ -0,0 +1,96 @@
pub use crate::core::bot::channels::instagram::*;
use crate::shared::state::AppState;
use axum::{
extract::{Query, State},
http::StatusCode,
response::IntoResponse,
routing::{get, post},
Json, Router,
};
use serde::Deserialize;
use std::sync::Arc;
/// Query parameters sent by Meta on the webhook verification (GET) handshake.
#[derive(Debug, Deserialize)]
pub struct WebhookVerifyQuery {
    // Verification mode; checked by the adapter together with the token.
    #[serde(rename = "hub.mode")]
    pub mode: Option<String>,
    // Token that must match the locally configured verify token.
    #[serde(rename = "hub.verify_token")]
    pub verify_token: Option<String>,
    // Opaque value to echo back on successful verification.
    #[serde(rename = "hub.challenge")]
    pub challenge: Option<String>,
}
/// Build the Instagram channel routes: webhook verification + ingestion on
/// one path, and an outbound send endpoint on another.
pub fn configure() -> Router<Arc<AppState>> {
    // GET handles Meta's verification handshake; POST receives events.
    let webhook_handler = get(verify_webhook).post(handle_webhook);
    Router::new()
        .route("/api/instagram/webhook", webhook_handler)
        .route("/api/instagram/send", post(send_message))
}
/// GET handler for Meta's webhook verification handshake.
///
/// All three of `hub.mode`, `hub.verify_token` and `hub.challenge` are
/// required: missing parameters yield 400, a rejected token yields 403,
/// and success echoes the adapter's response with 200.
async fn verify_webhook(Query(query): Query<WebhookVerifyQuery>) -> impl IntoResponse {
    let adapter = InstagramAdapter::new();
    let (Some(mode), Some(token), Some(challenge)) = (
        query.mode.as_deref(),
        query.verify_token.as_deref(),
        query.challenge,
    ) else {
        return (StatusCode::BAD_REQUEST, "Missing parameters".to_string());
    };
    match adapter
        .handle_webhook_verification(mode, token, &challenge)
        .await
    {
        Some(response) => (StatusCode::OK, response),
        None => (StatusCode::FORBIDDEN, "Verification failed".to_string()),
    }
}
/// POST handler for incoming Instagram webhook events.
///
/// Logs every text message found in the payload and always acknowledges
/// with 200 so the platform does not retry delivery.
async fn handle_webhook(
    State(_state): State<Arc<AppState>>,
    Json(payload): Json<InstagramWebhookPayload>,
) -> impl IntoResponse {
    // Flatten entries -> messaging items -> (sender, text) pairs, dropping
    // anything without a text message.
    let text_messages = payload
        .entry
        .into_iter()
        .flat_map(|entry| entry.messaging.unwrap_or_default())
        .filter_map(|messaging| {
            let sender_id = messaging.sender.id;
            messaging
                .message
                .and_then(|message| message.text)
                .map(|text| (sender_id, text))
        });
    for (sender_id, text) in text_messages {
        log::info!("Instagram message from={} text={}", sender_id, text);
    }
    StatusCode::OK
}
async fn send_message(
State(_state): State<Arc<AppState>>,
Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
let adapter = InstagramAdapter::new();
let recipient = request.get("to").and_then(|v| v.as_str()).unwrap_or("");
let message = request
.get("message")
.and_then(|v| v.as_str())
.unwrap_or("");
match adapter.send_instagram_message(recipient, message).await {
Ok(_) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"success": false, "error": e.to_string()})),
),
}
}

View file

@ -32,15 +32,15 @@ pub fn configure() -> Router<Arc<AppState>> {
.route("/api/meet/participants", get(all_participants)) .route("/api/meet/participants", get(all_participants))
.route("/api/meet/scheduled", get(scheduled_meetings)) .route("/api/meet/scheduled", get(scheduled_meetings))
.route( .route(
ApiUrls::MEET_ROOM_BY_ID.replace(":id", "{room_id}"), &ApiUrls::MEET_ROOM_BY_ID.replace(":id", "{room_id}"),
get(get_room), get(get_room),
) )
.route( .route(
ApiUrls::MEET_JOIN.replace(":id", "{room_id}"), &ApiUrls::MEET_JOIN.replace(":id", "{room_id}"),
post(join_room), post(join_room),
) )
.route( .route(
ApiUrls::MEET_TRANSCRIPTION.replace(":id", "{room_id}"), &ApiUrls::MEET_TRANSCRIPTION.replace(":id", "{room_id}"),
post(start_transcription), post(start_transcription),
) )
.route(ApiUrls::MEET_TOKEN, post(get_meeting_token)) .route(ApiUrls::MEET_TOKEN, post(get_meeting_token))

View file

@ -3,6 +3,7 @@ use crate::shared::state::AppState;
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use axum::extract::ws::{Message, WebSocket}; use axum::extract::ws::{Message, WebSocket};
use botlib::MessageType;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use log::{info, trace, warn}; use log::{info, trace, warn};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -416,7 +417,7 @@ impl MeetingService {
session_id: room_id.to_string(), session_id: room_id.to_string(),
channel: "meeting".to_string(), channel: "meeting".to_string(),
content: text.to_string(), content: text.to_string(),
message_type: 0, message_type: MessageType::USER,
media_url: None, media_url: None,
timestamp: chrono::Utc::now(), timestamp: chrono::Utc::now(),
context_name: None, context_name: None,
@ -481,7 +482,7 @@ impl MeetingService {
session_id: message.session_id, session_id: message.session_id,
channel: "meeting".to_string(), channel: "meeting".to_string(),
content: format!("Processing: {}", message.content), content: format!("Processing: {}", message.content),
message_type: 1, message_type: MessageType::BOT_RESPONSE,
stream_token: None, stream_token: None,
is_complete: true, is_complete: true,
suggestions: Vec::new(), suggestions: Vec::new(),

137
src/msteams/mod.rs Normal file
View file

@ -0,0 +1,137 @@
pub use crate::core::bot::channels::teams::TeamsAdapter;
use crate::core::bot::channels::ChannelAdapter;
use crate::shared::state::AppState;
use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::post, Json, Router};
use diesel::prelude::*;
use serde::Deserialize;
use std::sync::Arc;
use uuid::Uuid;
/// Incoming Bot Framework activity posted by Microsoft Teams.
#[derive(Debug, Deserialize)]
pub struct TeamsActivity {
    // Activity kind dispatched on by the handler:
    // "message", "conversationUpdate", "invoke", or other.
    #[serde(rename = "type")]
    pub activity_type: String,
    pub id: String,
    pub timestamp: Option<String>,
    // presumably the connector service base URL — not used by this module.
    #[serde(rename = "serviceUrl")]
    pub service_url: Option<String>,
    #[serde(rename = "channelId")]
    pub channel_id: Option<String>,
    pub from: TeamsChannelAccount,
    pub conversation: TeamsConversationAccount,
    pub recipient: Option<TeamsChannelAccount>,
    // Message text; read for "message" activities.
    pub text: Option<String>,
    // Arbitrary payload carried by "invoke" activities.
    pub value: Option<serde_json::Value>,
}
/// A Teams user or bot identity attached to an activity.
#[derive(Debug, Deserialize)]
pub struct TeamsChannelAccount {
    pub id: String,
    pub name: Option<String>,
    // Directory object id, when present — not used by this module.
    #[serde(rename = "aadObjectId")]
    pub aad_object_id: Option<String>,
}
/// The Teams conversation (chat or channel thread) an activity belongs to.
#[derive(Debug, Deserialize)]
pub struct TeamsConversationAccount {
    // Conversation id; also reused as session/user id when replying.
    pub id: String,
    #[serde(rename = "conversationType")]
    pub conversation_type: Option<String>,
    #[serde(rename = "tenantId")]
    pub tenant_id: Option<String>,
    pub name: Option<String>,
}
/// Build the Microsoft Teams channel routes: inbound activity ingestion
/// plus an outbound send endpoint.
pub fn configure() -> Router<Arc<AppState>> {
    let router = Router::new().route("/api/msteams/messages", post(handle_incoming));
    router.route("/api/msteams/send", post(send_message))
}
/// POST handler for Bot Framework activities delivered by Microsoft Teams.
///
/// Dispatches on the activity `type`: logs messages and conversation
/// updates, acknowledges invokes with `{"status": 200}`, and always replies
/// 200 so the connector does not retry delivery.
async fn handle_incoming(
    // `_state`: the extractor was bound as `state` but never read, which
    // trips the deny-warnings build; keep the unused-binding convention used
    // by the sibling handlers in this file.
    State(_state): State<Arc<AppState>>,
    Json(activity): Json<TeamsActivity>,
) -> impl IntoResponse {
    match activity.activity_type.as_str() {
        "message" => {
            if let Some(text) = &activity.text {
                log::info!(
                    "Teams message from={} conversation={} text={}",
                    activity.from.id,
                    activity.conversation.id,
                    text
                );
            }
            (StatusCode::OK, Json(serde_json::json!({})))
        }
        "conversationUpdate" => {
            log::info!("Teams conversation update id={}", activity.id);
            (StatusCode::OK, Json(serde_json::json!({})))
        }
        "invoke" => {
            log::info!("Teams invoke id={}", activity.id);
            (StatusCode::OK, Json(serde_json::json!({"status": 200})))
        }
        // Unknown activity types are acknowledged without action.
        _ => (StatusCode::OK, Json(serde_json::json!({}))),
    }
}
/// POST handler that relays an outbound message to a Teams conversation.
///
/// Expects a JSON body with string fields `conversation_id` and `message`.
/// Builds a `BotResponse` addressed to the default active bot and hands it
/// to the Teams adapter; returns 500 with the error text on failure.
async fn send_message(
    State(state): State<Arc<AppState>>,
    Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
    // Resolve the bot to send as; falls back to Uuid::nil() (see helper).
    let bot_id = get_default_bot_id(&state).await;
    let adapter = TeamsAdapter::new(state.conn.clone(), bot_id);
    let conversation_id = request
        .get("conversation_id")
        .and_then(|v| v.as_str())
        .unwrap_or("");
    let message = request
        .get("message")
        .and_then(|v| v.as_str())
        .unwrap_or("");
    let response = crate::shared::models::BotResponse {
        bot_id: bot_id.to_string(),
        session_id: conversation_id.to_string(),
        // NOTE(review): user_id is set to the conversation id, not a real
        // user id — confirm this is what the Teams adapter expects.
        user_id: conversation_id.to_string(),
        channel: "teams".to_string(),
        content: message.to_string(),
        message_type: botlib::MessageType::BOT_RESPONSE,
        stream_token: None,
        is_complete: true,
        suggestions: vec![],
        context_name: None,
        context_length: 0,
        context_max_length: 0,
    };
    match adapter.send_message(response).await {
        Ok(_) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({"success": false, "error": e.to_string()})),
        ),
    }
}
/// Look up the id of the first active bot, falling back to `Uuid::nil()`
/// when no active bot exists or the database is unreachable.
///
/// The blocking Diesel query runs on the blocking thread pool so the async
/// executor is not stalled.
async fn get_default_bot_id(state: &Arc<AppState>) -> Uuid {
    let conn = state.conn.clone();
    tokio::task::spawn_blocking(move || {
        // `diesel::prelude::*` is already imported at file scope; only the
        // schema module needs a local import here.
        use crate::shared::models::schema::bots;
        let mut db_conn = conn.get().ok()?;
        bots::table
            .filter(bots::is_active.eq(true))
            .select(bots::id)
            .first::<Uuid>(&mut db_conn)
            .ok()
    })
    .await
    .ok()
    .flatten()
    .unwrap_or_else(Uuid::nil)
}

View file

@ -202,7 +202,6 @@ pub struct BM25Index {
enabled: bool, enabled: bool,
} }
#[cfg(not(feature = "vectordb"))]
impl BM25Index { impl BM25Index {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
@ -351,7 +350,6 @@ pub struct BM25Stats {
pub enabled: bool, pub enabled: bool,
} }
/// Document entry in the store /// Document entry in the store
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct DocumentEntry { struct DocumentEntry {
@ -757,7 +755,6 @@ pub struct HybridSearchStats {
pub config: HybridSearchConfig, pub config: HybridSearchConfig,
} }
/// Query decomposition for complex questions /// Query decomposition for complex questions
pub struct QueryDecomposer { pub struct QueryDecomposer {
llm_endpoint: String, llm_endpoint: String,
@ -841,7 +838,6 @@ impl QueryDecomposer {
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View file

@ -39,12 +39,6 @@ pub use hybrid_search::{
SearchMethod, SearchResult, SearchMethod, SearchResult,
}; };
// Tantivy BM25 index (when vectordb feature enabled)
#[cfg(feature = "vectordb")]
pub use hybrid_search::TantivyBM25Index;
// Fallback BM25 index (when vectordb feature NOT enabled)
#[cfg(not(feature = "vectordb"))]
pub use hybrid_search::BM25Index; pub use hybrid_search::BM25Index;
pub use vectordb_indexer::{IndexingStats, IndexingStatus, VectorDBIndexer}; pub use vectordb_indexer::{IndexingStats, IndexingStatus, VectorDBIndexer};

View file

@ -39,8 +39,15 @@ impl UserWorkspace {
.join(self.bot_id.to_string()) .join(self.bot_id.to_string())
.join(self.user_id.to_string()) .join(self.user_id.to_string())
} }
}
fn email_vectordb(&self) -> String {
format!("email_{}_{}", self.bot_id, self.user_id)
}
fn drive_vectordb(&self) -> String {
format!("drive_{}_{}", self.bot_id, self.user_id)
}
}
/// Indexing job status /// Indexing job status
#[derive(Debug, Clone, PartialEq)] #[derive(Debug, Clone, PartialEq)]
@ -455,11 +462,11 @@ impl VectorDBIndexer {
user_id: Uuid, user_id: Uuid,
account_id: &str, account_id: &str,
) -> Result<Vec<EmailDocument>, Box<dyn std::error::Error + Send + Sync>> { ) -> Result<Vec<EmailDocument>, Box<dyn std::error::Error + Send + Sync>> {
let pool = self.pool.clone(); let pool = self.db_pool.clone();
let account_id = account_id.to_string(); let account_id = account_id.to_string();
let results = tokio::task::spawn_blocking(move || { let results = tokio::task::spawn_blocking(move || {
let conn = pool.get()?; let mut conn = pool.get()?;
let query = r#" let query = r#"
SELECT e.id, e.message_id, e.subject, e.from_address, e.to_addresses, SELECT e.id, e.message_id, e.subject, e.from_address, e.to_addresses,
@ -486,7 +493,17 @@ impl VectorDBIndexer {
)> = diesel::sql_query(query) )> = diesel::sql_query(query)
.bind::<diesel::sql_types::Uuid, _>(user_id) .bind::<diesel::sql_types::Uuid, _>(user_id)
.bind::<diesel::sql_types::Text, _>(&account_id) .bind::<diesel::sql_types::Text, _>(&account_id)
.load(&conn) .load::<(
Uuid,
String,
String,
String,
String,
Option<String>,
Option<String>,
DateTime<Utc>,
String,
)>(&mut conn)
.unwrap_or_default(); .unwrap_or_default();
let emails: Vec<EmailDocument> = rows let emails: Vec<EmailDocument> = rows
@ -505,18 +522,16 @@ impl VectorDBIndexer {
)| { )| {
EmailDocument { EmailDocument {
id: id.to_string(), id: id.to_string(),
message_id,
subject,
from_address: from,
to_addresses: to.split(',').map(|s| s.trim().to_string()).collect(),
cc_addresses: Vec::new(),
body_text: body_text.unwrap_or_default(),
body_html,
received_at,
folder,
labels: Vec::new(),
has_attachments: false,
account_id: account_id.clone(), account_id: account_id.clone(),
from_email: from.clone(),
from_name: from,
to_email: to,
subject,
body_text: body_text.unwrap_or_default(),
date: received_at,
folder,
has_attachments: false,
thread_id: None,
} }
}, },
) )
@ -533,10 +548,10 @@ impl VectorDBIndexer {
&self, &self,
user_id: Uuid, user_id: Uuid,
) -> Result<Vec<FileDocument>, Box<dyn std::error::Error + Send + Sync>> { ) -> Result<Vec<FileDocument>, Box<dyn std::error::Error + Send + Sync>> {
let pool = self.pool.clone(); let pool = self.db_pool.clone();
let results = tokio::task::spawn_blocking(move || { let results = tokio::task::spawn_blocking(move || {
let conn = pool.get()?; let mut conn = pool.get()?;
let query = r#" let query = r#"
SELECT f.id, f.file_path, f.file_name, f.file_type, f.file_size, SELECT f.id, f.file_path, f.file_name, f.file_type, f.file_size,
@ -562,7 +577,7 @@ impl VectorDBIndexer {
DateTime<Utc>, DateTime<Utc>,
)> = diesel::sql_query(query) )> = diesel::sql_query(query)
.bind::<diesel::sql_types::Uuid, _>(user_id) .bind::<diesel::sql_types::Uuid, _>(user_id)
.load(&conn) .load::<(Uuid, String, String, i64, DateTime<Utc>)>(&mut conn)
.unwrap_or_default(); .unwrap_or_default();
let files: Vec<FileDocument> = rows let files: Vec<FileDocument> = rows

View file

@ -17,11 +17,11 @@
//! whatsapp-business-account-id,your_business_account_id //! whatsapp-business-account-id,your_business_account_id
//! ``` //! ```
use crate::bot::BotOrchestrator;
use crate::core::bot::channels::whatsapp::WhatsAppAdapter; use crate::core::bot::channels::whatsapp::WhatsAppAdapter;
use crate::core::bot::channels::ChannelAdapter; use crate::core::bot::channels::ChannelAdapter;
use crate::core::bot::orchestrator::BotOrchestrator;
use crate::shared::models::{BotResponse, UserMessage, UserSession}; use crate::shared::models::{BotResponse, UserMessage, UserSession};
use crate::shared::state::AppState; use crate::shared::state::{AppState, AttendantNotification};
use axum::{ use axum::{
extract::{Query, State}, extract::{Query, State},
http::StatusCode, http::StatusCode,
@ -29,6 +29,7 @@ use axum::{
routing::{get, post}, routing::{get, post},
Json, Router, Json, Router,
}; };
use botlib::MessageType;
use chrono::Utc; use chrono::Utc;
use diesel::prelude::*; use diesel::prelude::*;
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
@ -41,22 +42,6 @@ use uuid::Uuid;
/// WebSocket broadcast channel for attendant notifications /// WebSocket broadcast channel for attendant notifications
pub type AttendantBroadcast = broadcast::Sender<AttendantNotification>; pub type AttendantBroadcast = broadcast::Sender<AttendantNotification>;
/// Notification sent to attendants via WebSocket
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AttendantNotification {
#[serde(rename = "type")]
pub notification_type: String,
pub session_id: String,
pub user_id: String,
pub user_name: Option<String>,
pub user_phone: Option<String>,
pub channel: String,
pub content: String,
pub timestamp: String,
pub assigned_to: Option<String>,
pub priority: i32,
}
/// WhatsApp webhook verification query parameters /// WhatsApp webhook verification query parameters
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
pub struct WebhookVerifyQuery { pub struct WebhookVerifyQuery {
@ -327,11 +312,13 @@ async fn process_incoming_message(
user_id: phone.clone(), user_id: phone.clone(),
channel: "whatsapp".to_string(), channel: "whatsapp".to_string(),
content: response, content: response,
message_type: crate::shared::models::message_types::MessageType::BOT_RESPONSE, message_type: MessageType::BOT_RESPONSE,
stream_token: None, stream_token: None,
is_complete: true, is_complete: true,
suggestions: vec![], suggestions: vec![],
context_name: None, context_name: None,
context_length: 0,
context_max_length: 0,
}; };
if let Err(e) = adapter.send_message(bot_response).await { if let Err(e) = adapter.send_message(bot_response).await {
error!("Failed to send attendant command response: {}", e); error!("Failed to send attendant command response: {}", e);
@ -374,17 +361,29 @@ async fn process_attendant_command(
// Get current session the attendant is handling (if any) // Get current session the attendant is handling (if any)
let current_session = get_attendant_active_session(state, phone).await; let current_session = get_attendant_active_session(state, phone).await;
// Process the command using llm_assist module // Process the command using llm_assist module (only if attendance feature is enabled)
match crate::attendance::llm_assist::process_attendant_command( #[cfg(feature = "attendance")]
state,
phone,
content,
current_session,
)
.await
{ {
Ok(response) => Some(response), match crate::attendance::llm_assist::process_attendant_command(
Err(e) => Some(format!("❌ Error: {}", e)), state,
phone,
content,
current_session,
)
.await
{
Ok(response) => return Some(response),
Err(e) => return Some(format!("❌ Error: {}", e)),
}
}
#[cfg(not(feature = "attendance"))]
{
let _ = current_session; // Suppress unused warning
Some(format!(
"Attendance module not enabled. Message: {}",
content
))
} }
} }
@ -568,7 +567,7 @@ async fn find_or_create_session(
// Check if session is recent (within 24 hours) // Check if session is recent (within 24 hours)
let age = Utc::now() - session.updated_at; let age = Utc::now() - session.updated_at;
if age.num_hours() < 24 { if age.num_hours() < 24 {
return Ok((session, false)); return Ok::<(UserSession, bool), String>((session, false));
} }
} }
@ -597,7 +596,7 @@ async fn find_or_create_session(
.first(&mut db_conn) .first(&mut db_conn)
.map_err(|e| format!("Load session error: {}", e))?; .map_err(|e| format!("Load session error: {}", e))?;
Ok((new_session, true)) Ok::<(UserSession, bool), String>((new_session, true))
}) })
.await .await
.map_err(|e| format!("Task error: {}", e))??; .map_err(|e| format!("Task error: {}", e))??;
@ -623,9 +622,15 @@ async fn route_to_bot(
info!("Routing WhatsApp message to bot for session {}", session.id); info!("Routing WhatsApp message to bot for session {}", session.id);
let user_message = UserMessage { let user_message = UserMessage {
session_id: session.id.to_string(), bot_id: session.bot_id.to_string(),
content: content.to_string(),
user_id: session.user_id.to_string(), user_id: session.user_id.to_string(),
session_id: session.id.to_string(),
channel: "whatsapp".to_string(),
content: content.to_string(),
message_type: MessageType::USER,
media_url: None,
timestamp: Utc::now(),
context_name: None,
}; };
// Get WhatsApp adapter for sending responses // Get WhatsApp adapter for sending responses
@ -645,6 +650,7 @@ async fn route_to_bot(
.unwrap_or("") .unwrap_or("")
.to_string(); .to_string();
let phone_for_error = phone.clone(); // Clone for use in error handling after move
let adapter_for_send = WhatsAppAdapter::new(state.conn.clone(), session.bot_id); let adapter_for_send = WhatsAppAdapter::new(state.conn.clone(), session.bot_id);
tokio::spawn(async move { tokio::spawn(async move {
@ -662,26 +668,25 @@ async fn route_to_bot(
} }
}); });
// Process message // Process message using stream_response
if let Err(e) = orchestrator if let Err(e) = orchestrator.stream_response(user_message, tx).await {
.process_message(user_message, Some(tx), is_new)
.await
{
error!("Bot processing error: {}", e); error!("Bot processing error: {}", e);
// Send error message back // Send error message back
let error_response = BotResponse { let error_response = BotResponse {
bot_id: session.bot_id.to_string(), bot_id: session.bot_id.to_string(),
session_id: session.id.to_string(), session_id: session.id.to_string(),
user_id: phone.clone(), user_id: phone_for_error.clone(),
channel: "whatsapp".to_string(), channel: "whatsapp".to_string(),
content: "Sorry, I encountered an error processing your message. Please try again." content: "Sorry, I encountered an error processing your message. Please try again."
.to_string(), .to_string(),
message_type: crate::shared::models::message_types::MessageType::BOT_RESPONSE, message_type: MessageType::BOT_RESPONSE,
stream_token: None, stream_token: None,
is_complete: true, is_complete: true,
suggestions: vec![], suggestions: vec![],
context_name: None, context_name: None,
context_length: 0,
context_max_length: 0,
}; };
if let Err(e) = adapter.send_message(error_response).await { if let Err(e) = adapter.send_message(error_response).await {
@ -759,6 +764,7 @@ async fn save_message_to_history(
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let conn = state.conn.clone(); let conn = state.conn.clone();
let session_id = session.id; let session_id = session.id;
let user_id = session.user_id; // Get the actual user_id from the session
let content_clone = content.to_string(); let content_clone = content.to_string();
let sender_clone = sender.to_string(); let sender_clone = sender.to_string();
@ -771,8 +777,11 @@ async fn save_message_to_history(
.values(( .values((
message_history::id.eq(Uuid::new_v4()), message_history::id.eq(Uuid::new_v4()),
message_history::session_id.eq(session_id), message_history::session_id.eq(session_id),
message_history::role.eq(sender_clone), message_history::user_id.eq(user_id), // User associated with the message (has mobile field)
message_history::content.eq(content_clone), message_history::role.eq(if sender_clone == "user" { 1 } else { 2 }),
message_history::content_encrypted.eq(content_clone),
message_history::message_type.eq(1),
message_history::message_index.eq(0i64),
message_history::created_at.eq(diesel::dsl::now), message_history::created_at.eq(diesel::dsl::now),
)) ))
.execute(&mut db_conn) .execute(&mut db_conn)
@ -853,11 +862,13 @@ pub async fn send_message(
user_id: request.to.clone(), user_id: request.to.clone(),
channel: "whatsapp".to_string(), channel: "whatsapp".to_string(),
content: request.message.clone(), content: request.message.clone(),
message_type: crate::shared::models::message_types::MessageType::EXTERNAL, message_type: MessageType::EXTERNAL,
stream_token: None, stream_token: None,
is_complete: true, is_complete: true,
suggestions: vec![], suggestions: vec![],
context_name: None, context_name: None,
context_length: 0,
context_max_length: 0,
}; };
match adapter.send_message(response).await { match adapter.send_message(response).await {
@ -975,11 +986,13 @@ pub async fn attendant_respond(
user_id: recipient.to_string(), user_id: recipient.to_string(),
channel: "whatsapp".to_string(), channel: "whatsapp".to_string(),
content: request.message.clone(), content: request.message.clone(),
message_type: crate::shared::models::message_types::MessageType::BOT_RESPONSE, message_type: MessageType::BOT_RESPONSE,
stream_token: None, stream_token: None,
is_complete: true, is_complete: true,
suggestions: vec![], suggestions: vec![],
context_name: None, context_name: None,
context_length: 0,
context_max_length: 0,
}; };
match adapter.send_message(response).await { match adapter.send_message(response).await {
@ -1033,15 +1046,15 @@ pub async fn attendant_respond(
async fn get_verify_token(_state: &Arc<AppState>) -> String { async fn get_verify_token(_state: &Arc<AppState>) -> String {
// Get verify token from Vault - stored at gbo/whatsapp // Get verify token from Vault - stored at gbo/whatsapp
use crate::core::secrets::SecretsManager; use crate::core::secrets::SecretsManager;
match SecretsManager::new() { match SecretsManager::from_env() {
Ok(secrets) => { Ok(secrets) => {
match secrets.get("gbo/whatsapp", "verify_token").await { match secrets.get_value("gbo/whatsapp", "verify_token").await {
Ok(token) => token, Ok(token) => token,
Err(_) => "webhook_verify".to_string() // Default for initial setup Err(_) => "webhook_verify".to_string(), // Default for initial setup
} }
} }
Err(_) => "webhook_verify".to_string() // Default if Vault not configured Err(_) => "webhook_verify".to_string(), // Default if Vault not configured
} }
} }