feat: Add Auto Task system - Intent Compiler, MCP Client, Safety Layer

- intent_compiler.rs: LLM-to-BASIC translation engine
- auto_task.rs: Auto-executing task data structures
- mcp_client.rs: Model Context Protocol server integration
- safety_layer.rs: Constraints, simulation, audit trail
- autotask_api.rs: HTTP API handlers for Auto Task UI
- Updated mod.rs with new modules and keyword list
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-12-12 12:33:17 -03:00
parent 777a3eae63
commit 48f3cfb6f3
9 changed files with 4464 additions and 2081 deletions

View file

@ -1,527 +0,0 @@
# Stalwart API Mapping for General Bots
**Version:** 6.1.0
**Purpose:** Map Stalwart native features vs General Bots custom tables
---
## Overview
Stalwart Mail Server provides a comprehensive REST Management API. Many email features that we might implement in our database are already available natively in Stalwart. This document maps what to use from Stalwart vs what we manage ourselves.
---
## API Base URL
```
https://{stalwart_host}:{port}/api
```
Default ports:
- HTTP: 8080
- HTTPS: 443
---
## Feature Mapping
### ✅ USE STALWART API (Do NOT create tables)
| Feature | Stalwart Endpoint | Notes |
|---------|------------------|-------|
| **User/Account Management** | `GET/POST/PATCH/DELETE /principal/{id}` | Create, update, delete email accounts |
| **Email Queue** | `GET /queue/messages` | List queued messages for delivery |
| **Queue Status** | `GET /queue/status` | Check if queue is running |
| **Queue Control** | `PATCH /queue/status/start` `PATCH /queue/status/stop` | Start/stop queue processing |
| **Reschedule Delivery** | `PATCH /queue/messages/{id}` | Retry failed deliveries |
| **Cancel Delivery** | `DELETE /queue/messages/{id}` | Cancel queued message |
| **Distribution Lists** | `POST /principal` with `type: "list"` | Mailing lists are "principals" |
| **DKIM Signatures** | `POST /dkim` | Create DKIM keys per domain |
| **DNS Records** | `GET /dns/records/{domain}` | Get required DNS records |
| **Spam Training** | `POST /spam-filter/train/spam` `POST /spam-filter/train/ham` | Train spam filter |
| **Spam Classification** | `POST /spam-filter/classify` | Test spam score |
| **Telemetry/Metrics** | `GET /telemetry/metrics` | Server metrics for monitoring |
| **Live Metrics** | `GET /telemetry/metrics/live` | Real-time metrics (WebSocket) |
| **Logs** | `GET /logs` | Query server logs |
| **Traces** | `GET /telemetry/traces` | Delivery traces |
| **Live Tracing** | `GET /telemetry/traces/live` | Real-time tracing |
| **DMARC Reports** | `GET /reports/dmarc` | Incoming DMARC reports |
| **TLS Reports** | `GET /reports/tls` | TLS-RPT reports |
| **ARF Reports** | `GET /reports/arf` | Abuse feedback reports |
| **Troubleshooting** | `GET /troubleshoot/delivery/{recipient}` | Debug delivery issues |
| **DMARC Check** | `POST /troubleshoot/dmarc` | Test DMARC/SPF/DKIM |
| **Settings** | `GET/POST /settings` | Server configuration |
| **Undelete** | `GET/POST /store/undelete/{account_id}` | Recover deleted messages |
| **Account Purge** | `GET /store/purge/account/{id}` | Purge account data |
| **Encryption Settings** | `GET/POST /account/crypto` | Encryption-at-rest |
| **2FA/App Passwords** | `GET/POST /account/auth` | Authentication settings |
### ⚠️ USE BOTH (Stalwart + Our Tables)
| Feature | Stalwart | Our Table | Why Both? |
|---------|----------|-----------|-----------|
| **Auto-Responders** | Sieve scripts via settings | `email_auto_responders` | We store UI config, sync to Stalwart Sieve |
| **Email Rules/Filters** | Sieve scripts | `email_rules` | We store UI-friendly rules, compile to Sieve |
| **Shared Mailboxes** | Principal with shared access | `shared_mailboxes` | We track permissions, Stalwart handles access |
### ✅ USE OUR TABLES (Stalwart doesn't provide)
| Feature | Our Table | Why? |
|---------|-----------|------|
| **Global Email Signature** | `global_email_signatures` | Bot-level branding, not in Stalwart |
| **User Email Signature** | `email_signatures` | User preferences, append before send |
| **Scheduled Send** | `scheduled_emails` | We queue and release at scheduled time |
| **Email Templates** | `email_templates` | Business templates with variables |
| **Email Labels** | `email_labels`, `email_label_assignments` | UI organization, not IMAP folders |
| **Email Tracking** | `sent_email_tracking` (existing) | Open/click tracking pixels |
---
## Stalwart API Integration Code
### Client Setup
```rust
// src/email/stalwart_client.rs
pub struct StalwartClient {
base_url: String,
auth_token: String,
http_client: reqwest::Client,
}
impl StalwartClient {
pub fn new(base_url: &str, token: &str) -> Self {
Self {
base_url: base_url.to_string(),
auth_token: token.to_string(),
http_client: reqwest::Client::new(),
}
}
async fn request<T: DeserializeOwned>(&self, method: Method, path: &str, body: Option<Value>) -> Result<T> {
let url = format!("{}{}", self.base_url, path);
let mut req = self.http_client.request(method, &url)
.header("Authorization", format!("Bearer {}", self.auth_token));
if let Some(b) = body {
req = req.json(&b);
}
let resp = req.send().await?;
let data: ApiResponse<T> = resp.json().await?;
Ok(data.data)
}
}
```
### Queue Monitoring (for Analytics Dashboard)
```rust
impl StalwartClient {
/// Get email queue status for monitoring dashboard
pub async fn get_queue_status(&self) -> Result<QueueStatus> {
let status: bool = self.request(Method::GET, "/api/queue/status", None).await?;
let messages: QueueList = self.request(Method::GET, "/api/queue/messages?limit=100", None).await?;
Ok(QueueStatus {
is_running: status,
total_queued: messages.total,
messages: messages.items,
})
}
/// Get queued message details
pub async fn get_queued_message(&self, message_id: &str) -> Result<QueuedMessage> {
self.request(Method::GET, &format!("/api/queue/messages/{}", message_id), None).await
}
/// Retry failed delivery
pub async fn retry_delivery(&self, message_id: &str) -> Result<bool> {
self.request(Method::PATCH, &format!("/api/queue/messages/{}", message_id), None).await
}
/// Cancel queued message
pub async fn cancel_delivery(&self, message_id: &str) -> Result<bool> {
self.request(Method::DELETE, &format!("/api/queue/messages/{}", message_id), None).await
}
/// Stop all queue processing
pub async fn stop_queue(&self) -> Result<bool> {
self.request(Method::PATCH, "/api/queue/status/stop", None).await
}
/// Resume queue processing
pub async fn start_queue(&self) -> Result<bool> {
self.request(Method::PATCH, "/api/queue/status/start", None).await
}
}
```
### Account/Principal Management
```rust
impl StalwartClient {
/// Create email account
pub async fn create_account(&self, email: &str, password: &str, display_name: &str) -> Result<u64> {
let body = json!({
"type": "individual",
"name": email.split('@').next().unwrap_or(email),
"emails": [email],
"secrets": [password],
"description": display_name,
"quota": 0,
"roles": ["user"]
});
self.request(Method::POST, "/api/principal", Some(body)).await
}
/// Create distribution list
pub async fn create_distribution_list(&self, name: &str, email: &str, members: Vec<String>) -> Result<u64> {
let body = json!({
"type": "list",
"name": name,
"emails": [email],
"members": members,
"description": format!("Distribution list: {}", name)
});
self.request(Method::POST, "/api/principal", Some(body)).await
}
/// Get account details
pub async fn get_account(&self, account_id: &str) -> Result<Principal> {
self.request(Method::GET, &format!("/api/principal/{}", account_id), None).await
}
/// Update account
pub async fn update_account(&self, account_id: &str, updates: Vec<AccountUpdate>) -> Result<()> {
let body: Vec<Value> = updates.iter().map(|u| json!({
"action": u.action,
"field": u.field,
"value": u.value
})).collect();
self.request(Method::PATCH, &format!("/api/principal/{}", account_id), Some(json!(body))).await
}
/// Delete account
pub async fn delete_account(&self, account_id: &str) -> Result<()> {
self.request(Method::DELETE, &format!("/api/principal/{}", account_id), None).await
}
}
```
### Sieve Rules (Auto-Responders & Filters)
```rust
impl StalwartClient {
/// Set vacation/out-of-office auto-responder via Sieve
pub async fn set_auto_responder(&self, account_id: &str, config: &AutoResponderConfig) -> Result<()> {
let sieve_script = self.generate_vacation_sieve(config);
let updates = vec![json!({
"type": "set",
"prefix": format!("sieve.scripts.{}.vacation", account_id),
"value": sieve_script
})];
self.request(Method::POST, "/api/settings", Some(json!(updates))).await
}
fn generate_vacation_sieve(&self, config: &AutoResponderConfig) -> String {
let mut script = String::from("require [\"vacation\", \"variables\"];\n\n");
if let Some(start) = &config.start_date {
script.push_str(&format!("# Active from: {}\n", start));
}
if let Some(end) = &config.end_date {
script.push_str(&format!("# Active until: {}\n", end));
}
script.push_str(&format!(
r#"vacation :days 1 :subject "{}" "{}";"#,
config.subject.replace('"', "\\\""),
config.body_plain.replace('"', "\\\"")
));
script
}
/// Set email filter rule via Sieve
pub async fn set_filter_rule(&self, account_id: &str, rule: &EmailRule) -> Result<()> {
let sieve_script = self.generate_filter_sieve(rule);
let updates = vec![json!({
"type": "set",
"prefix": format!("sieve.scripts.{}.filter_{}", account_id, rule.id),
"value": sieve_script
})];
self.request(Method::POST, "/api/settings", Some(json!(updates))).await
}
fn generate_filter_sieve(&self, rule: &EmailRule) -> String {
let mut script = String::from("require [\"fileinto\", \"reject\", \"vacation\"];\n\n");
// Generate conditions
for condition in &rule.conditions {
match condition.field.as_str() {
"from" => script.push_str(&format!(
"if header :contains \"From\" \"{}\" {{\n",
condition.value
)),
"subject" => script.push_str(&format!(
"if header :contains \"Subject\" \"{}\" {{\n",
condition.value
)),
_ => {}
}
}
// Generate actions
for action in &rule.actions {
match action.action_type.as_str() {
"move" => script.push_str(&format!(" fileinto \"{}\";\n", action.value)),
"delete" => script.push_str(" discard;\n"),
"mark_read" => script.push_str(" setflag \"\\\\Seen\";\n"),
_ => {}
}
}
if rule.stop_processing {
script.push_str(" stop;\n");
}
script.push_str("}\n");
script
}
}
```
### Telemetry & Monitoring
```rust
impl StalwartClient {
/// Get server metrics for dashboard
pub async fn get_metrics(&self) -> Result<Metrics> {
self.request(Method::GET, "/api/telemetry/metrics", None).await
}
/// Get server logs
pub async fn get_logs(&self, page: u32, limit: u32) -> Result<LogList> {
self.request(
Method::GET,
&format!("/api/logs?page={}&limit={}", page, limit),
None
).await
}
/// Get delivery traces
pub async fn get_traces(&self, trace_type: &str, page: u32) -> Result<TraceList> {
self.request(
Method::GET,
&format!("/api/telemetry/traces?type={}&page={}&limit=50", trace_type, page),
None
).await
}
/// Get specific trace details
pub async fn get_trace(&self, trace_id: &str) -> Result<Vec<TraceEvent>> {
self.request(Method::GET, &format!("/api/telemetry/trace/{}", trace_id), None).await
}
/// Get DMARC reports
pub async fn get_dmarc_reports(&self, page: u32) -> Result<ReportList> {
self.request(Method::GET, &format!("/api/reports/dmarc?page={}&limit=50", page), None).await
}
/// Get TLS reports
pub async fn get_tls_reports(&self, page: u32) -> Result<ReportList> {
self.request(Method::GET, &format!("/api/reports/tls?page={}&limit=50", page), None).await
}
}
```
### Spam Filter
```rust
impl StalwartClient {
/// Train message as spam
pub async fn train_spam(&self, raw_message: &str) -> Result<()> {
self.http_client
.post(&format!("{}/api/spam-filter/train/spam", self.base_url))
.header("Authorization", format!("Bearer {}", self.auth_token))
.header("Content-Type", "message/rfc822")
.body(raw_message.to_string())
.send()
.await?;
Ok(())
}
/// Train message as ham (not spam)
pub async fn train_ham(&self, raw_message: &str) -> Result<()> {
self.http_client
.post(&format!("{}/api/spam-filter/train/ham", self.base_url))
.header("Authorization", format!("Bearer {}", self.auth_token))
.header("Content-Type", "message/rfc822")
.body(raw_message.to_string())
.send()
.await?;
Ok(())
}
/// Classify message (get spam score)
pub async fn classify_message(&self, message: &SpamClassifyRequest) -> Result<SpamClassifyResult> {
self.request(Method::POST, "/api/spam-filter/classify", Some(json!(message))).await
}
}
```
---
## Monitoring Dashboard Integration
### Endpoints to Poll
| Metric | Endpoint | Poll Interval |
|--------|----------|---------------|
| Queue Size | `GET /queue/messages` | 30s |
| Queue Status | `GET /queue/status` | 30s |
| Server Metrics | `GET /telemetry/metrics` | 60s |
| Recent Logs | `GET /logs?limit=100` | 60s |
| Delivery Traces | `GET /telemetry/traces?type=delivery.attempt-start` | 60s |
| Failed Deliveries | `GET /queue/messages?filter=status:failed` | 60s |
### WebSocket Endpoints (Real-time)
| Feature | Endpoint | Token Endpoint |
|---------|----------|----------------|
| Live Metrics | `ws://.../telemetry/metrics/live` | `GET /telemetry/live/metrics-token` |
| Live Traces | `ws://.../telemetry/traces/live` | `GET /telemetry/live/tracing-token` |
---
## Tables to REMOVE from Migration
Based on this mapping, these tables are **REDUNDANT** and should be removed:
```sql
-- REMOVE: Stalwart handles distribution lists via principals
-- DROP TABLE IF EXISTS distribution_lists;
-- KEEP: We need this for UI config, but sync to Stalwart Sieve
-- email_auto_responders (KEEP but add stalwart_sieve_id column)
-- KEEP: We need this for UI config, but sync to Stalwart Sieve
-- email_rules (KEEP but add stalwart_sieve_id column)
```
---
## Migration Updates Needed
The current `6.1.0_enterprise_suite` migration already correctly:
1. ✅ Keeps `global_email_signatures` - Stalwart doesn't have this
2. ✅ Keeps `email_signatures` - User preference, not in Stalwart
3. ✅ Keeps `scheduled_emails` - We manage scheduling
4. ✅ Keeps `email_templates` - Business feature
5. ✅ Keeps `email_labels` - UI organization
6. ✅ Has `stalwart_sieve_id` in `email_auto_responders` - For sync
7. ✅ Has `stalwart_sieve_id` in `email_rules` - For sync
8. ✅ Has `stalwart_account_id` in `shared_mailboxes` - For sync
The `distribution_lists` table could potentially be removed since Stalwart handles lists as principals, BUT we may want to keep it for:
- Caching/faster lookups
- UI metadata not stored in Stalwart
- Offline resilience
**Recommendation**: Keep `distribution_lists` but sync with Stalwart principals.
---
## Sync Strategy
### On Create (Our DB → Stalwart)
```rust
async fn create_distribution_list(db: &Pool, stalwart: &StalwartClient, list: NewDistributionList) -> Result<Uuid> {
// 1. Create in Stalwart first
let stalwart_id = stalwart.create_distribution_list(
&list.name,
&list.email_alias,
list.members.clone()
).await?;
// 2. Store in our DB with stalwart reference
let id = db.insert_distribution_list(DistributionList {
name: list.name,
email_alias: list.email_alias,
members_json: serde_json::to_string(&list.members)?,
stalwart_principal_id: Some(stalwart_id.to_string()),
..Default::default()
}).await?;
Ok(id)
}
```
### On Update (Sync both)
```rust
async fn update_distribution_list(db: &Pool, stalwart: &StalwartClient, id: Uuid, updates: ListUpdates) -> Result<()> {
// 1. Get current record
let list = db.get_distribution_list(&id).await?;
// 2. Update Stalwart if we have a reference
if let Some(stalwart_id) = &list.stalwart_principal_id {
stalwart.update_principal(stalwart_id, updates.to_stalwart_updates()).await?;
}
// 3. Update our DB
db.update_distribution_list(&id, updates).await?;
Ok(())
}
```
### On Delete (Both)
```rust
async fn delete_distribution_list(db: &Pool, stalwart: &StalwartClient, id: Uuid) -> Result<()> {
let list = db.get_distribution_list(&id).await?;
// 1. Delete from Stalwart
if let Some(stalwart_id) = &list.stalwart_principal_id {
stalwart.delete_principal(stalwart_id).await?;
}
// 2. Delete from our DB
db.delete_distribution_list(&id).await?;
Ok(())
}
```
---
## Summary
| Category | Use Stalwart | Use Our Tables | Use Both |
|----------|-------------|----------------|----------|
| Account Management | ✅ | | |
| Email Queue | ✅ | | |
| Queue Monitoring | ✅ | | |
| Distribution Lists | | | ✅ |
| Auto-Responders | | | ✅ |
| Email Rules/Filters | | | ✅ |
| Shared Mailboxes | | | ✅ |
| Email Signatures | | ✅ | |
| Scheduled Send | | ✅ | |
| Email Templates | | ✅ | |
| Email Labels | | ✅ | |
| Email Tracking | | ✅ | |
| Spam Training | ✅ | | |
| Telemetry/Logs | ✅ | | |
| DMARC/TLS Reports | ✅ | | |

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,394 @@
//! Auto Task System - Self-Executing Intelligent Tasks
//!
//! This module provides the "Auto Task" functionality that enables tasks to
//! automatically execute themselves using LLM-generated BASIC programs.
//! It integrates with the Intent Compiler, Safety Layer, and MCP servers
//! to create a fully autonomous task execution system.
//!
//! # Architecture
//!
//! ```text
//! User Intent → Auto Task → Intent Compiler → Execution Plan → Safety Check → Execute
//! ↓ ↓ ↓ ↓ ↓ ↓
//! "Build CRM" Create task Generate BASIC Validate plan Simulate Run steps
//! with metadata program & approve impact with audit
//! ```
//!
//! # Features
//!
//! - **Automatic Execution**: Tasks execute themselves when conditions are met
//! - **Safety First**: All actions are simulated and validated before execution
//! - **Decision Framework**: Ambiguous situations generate options for user choice
//! - **Audit Trail**: Complete logging of all actions and decisions
//! - **MCP Integration**: Leverage registered MCP servers for extended capabilities
//! - **Rollback Support**: Automatic rollback on failure when possible
use crate::shared::models::UserSession;
use crate::shared::state::AppState;
use chrono::{DateTime, Duration, Utc};
use diesel::prelude::*;
use log::{error, info, trace, warn};
use rhai::{Dynamic, Engine};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use uuid::Uuid;
// ============================================================================
// AUTO TASK DATA STRUCTURES
// ============================================================================
/// Represents an auto-executing task
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoTask {
/// Unique task identifier
pub id: String,
/// Human-readable task title
pub title: String,
/// Original intent/description
pub intent: String,
/// Current task status
pub status: AutoTaskStatus,
/// Execution mode
pub mode: ExecutionMode,
/// Priority level
pub priority: TaskPriority,
/// Generated execution plan ID
pub plan_id: Option<String>,
/// Generated BASIC program
pub basic_program: Option<String>,
/// Current execution step (0 = not started)
pub current_step: i32,
/// Total steps in the plan
pub total_steps: i32,
/// Execution progress (0.0 - 1.0)
pub progress: f64,
/// Step execution results
pub step_results: Vec<StepExecutionResult>,
/// Pending decisions requiring user input
pub pending_decisions: Vec<PendingDecision>,
/// Active approvals waiting
pub pending_approvals: Vec<PendingApproval>,
/// Risk assessment summary
pub risk_summary: Option<RiskSummary>,
/// Resource usage tracking
pub resource_usage: ResourceUsage,
/// Error information if failed
pub error: Option<TaskError>,
/// Rollback state if available
pub rollback_state: Option<RollbackState>,
/// Session that created this task
pub session_id: String,
/// Bot executing this task
pub bot_id: String,
/// User who created the task
pub created_by: String,
/// Assigned executor (user or "auto")
pub assigned_to: String,
/// Scheduling information
pub schedule: Option<TaskSchedule>,
/// Tags for organization
pub tags: Vec<String>,
/// Parent task ID if this is a subtask
pub parent_task_id: Option<String>,
/// Child task IDs
pub subtask_ids: Vec<String>,
/// Dependencies on other tasks
pub depends_on: Vec<String>,
/// Tasks that depend on this one
pub dependents: Vec<String>,
/// MCP servers being used
pub mcp_servers: Vec<String>,
/// External APIs being called
pub external_apis: Vec<String>,
/// Timestamps
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub started_at: Option<DateTime<Utc>>,
pub completed_at: Option<DateTime<Utc>>,
/// Estimated completion time
pub estimated_completion: Option<DateTime<Utc>>,
}
/// Auto task status
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum AutoTaskStatus {
/// Task created, not yet analyzed
Draft,
/// Intent being compiled to BASIC program
Compiling,
/// Plan generated, waiting for approval
PendingApproval,
/// Simulating execution impact
Simulating,
/// Waiting for user decision on options
WaitingDecision,
/// Ready to execute
Ready,
/// Currently executing
Running,
/// Paused by user or system
Paused,
/// Waiting for external resource
Blocked,
/// Completed successfully
Completed,
/// Failed with error
Failed,
/// Cancelled by user
Cancelled,
/// Rolling back changes
RollingBack,
/// Rollback completed
RolledBack,
}
impl Default for AutoTaskStatus {
fn default() -> Self {
AutoTaskStatus::Draft
}
}
impl std::fmt::Display for AutoTaskStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AutoTaskStatus::Draft => write!(f, "Draft"),
AutoTaskStatus::Compiling => write!(f, "Compiling"),
AutoTaskStatus::PendingApproval => write!(f, "Pending Approval"),
AutoTaskStatus::Simulating => write!(f, "Simulating"),
AutoTaskStatus::WaitingDecision => write!(f, "Waiting for Decision"),
AutoTaskStatus::Ready => write!(f, "Ready"),
AutoTaskStatus::Running => write!(f, "Running"),
AutoTaskStatus::Paused => write!(f, "Paused"),
AutoTaskStatus::Blocked => write!(f, "Blocked"),
AutoTaskStatus::Completed => write!(f, "Completed"),
AutoTaskStatus::Failed => write!(f, "Failed"),
AutoTaskStatus::Cancelled => write!(f, "Cancelled"),
AutoTaskStatus::RollingBack => write!(f, "Rolling Back"),
AutoTaskStatus::RolledBack => write!(f, "Rolled Back"),
}
}
}
/// Execution mode for the task
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ExecutionMode {
/// Fully automatic - execute without user intervention
FullyAutomatic,
/// Semi-automatic - pause for approvals on high-risk steps
SemiAutomatic,
/// Supervised - pause after each step for review
Supervised,
/// Manual - user triggers each step
Manual,
/// Dry run - simulate only, don't execute
DryRun,
}
impl Default for ExecutionMode {
fn default() -> Self {
ExecutionMode::SemiAutomatic
}
}
/// Task priority
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Eq)]
pub enum TaskPriority {
Critical = 4,
High = 3,
Medium = 2,
Low = 1,
Background = 0,
}
impl Default for TaskPriority {
fn default() -> Self {
TaskPriority::Medium
}
}
/// Result of executing a single step
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepExecutionResult {
pub step_id: String,
pub step_order: i32,
pub step_name: String,
pub status: StepStatus,
pub started_at: DateTime<Utc>,
pub completed_at: Option<DateTime<Utc>>,
pub duration_ms: Option<i64>,
pub output: Option<serde_json::Value>,
pub error: Option<String>,
pub logs: Vec<ExecutionLog>,
pub resources_used: ResourceUsage,
pub can_rollback: bool,
pub rollback_data: Option<serde_json::Value>,
}
/// Status of a single step
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum StepStatus {
Pending,
Running,
Completed,
Failed,
Skipped,
RolledBack,
}
/// Execution log entry
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionLog {
pub timestamp: DateTime<Utc>,
pub level: LogLevel,
pub message: String,
pub details: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LogLevel {
Debug,
Info,
Warning,
Error,
}
/// A decision point requiring user input
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PendingDecision {
pub id: String,
pub decision_type: DecisionType,
pub title: String,
pub description: String,
pub options: Vec<DecisionOption>,
pub default_option: Option<String>,
pub timeout_seconds: Option<i32>,
pub timeout_action: TimeoutAction,
pub context: serde_json::Value,
pub created_at: DateTime<Utc>,
pub expires_at: Option<DateTime<Utc>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DecisionType {
/// Choose between multiple approaches
ApproachSelection,
/// Confirm a high-risk action
RiskConfirmation,
/// Resolve ambiguous intent
AmbiguityResolution,
/// Provide missing information
InformationRequest,
/// Handle an error
ErrorRecovery,
/// Custom decision type
Custom(String),
}
/// An option in a decision
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DecisionOption {
pub id: String,
pub label: String,
pub description: String,
pub pros: Vec<String>,
pub cons: Vec<String>,
pub estimated_impact: ImpactEstimate,
pub recommended: bool,
pub risk_level: RiskLevel,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImpactEstimate {
pub cost_change: f64,
pub time_change_minutes: i32,
pub risk_change: f64,
pub description: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TimeoutAction {
UseDefault,
Pause,
Cancel,
Escalate,
}
impl Default for TimeoutAction {
fn default() -> Self {
TimeoutAction::Pause
}
}
/// A pending approval request
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PendingApproval {
pub id: String,
pub approval_type: ApprovalType,
pub title: String,
pub description: String,
pub risk_level: RiskLevel,
pub approver: String,
pub step_id: Option<String>,
pub impact_summary: String,
pub simulation_result: Option<SimulationResult>,
pub timeout_seconds: i32,
pub default_action: ApprovalDefault,
pub created_at: DateTime<Utc>,
pub expires_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ApprovalType {
PlanApproval,
StepApproval,
HighRiskAction,
ExternalApiCall,
DataModification,
CostOverride,
SecurityOverride,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ApprovalDefault {
Approve,
Reject,
Pause,
Escalate,
}
impl Default for ApprovalDefault {
fn default() -> Self {
ApprovalDefault::Pause
}
}
/// Risk level classification
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Eq)]
pub enum RiskLevel {
None = 0,
Low = 1,
Medium = 2,
High = 3,
Critical = 4,
}
impl Default for RiskLevel {
fn default() -> Self {
RiskLevel::Low
}
}
/// Risk assessment summary
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RiskSummary {
pub overall_risk: RiskLevel,
pub data_risk: RiskLevel,
pub cost_risk: RiskLevel,
pub security_risk: RiskLevel,
pub compliance_risk: RiskLevel,
pub risk_factors: Vec<RiskFactor>,
pub mitigations_applied: Vec<String>,
}
#

View file

@ -0,0 +1,883 @@
//! Auto Task API Handlers
//!
//! This module provides the HTTP API endpoints for the Auto Task system,
//! enabling the UI to interact with the Intent Compiler, execution engine,
//! safety layer, and MCP client.
use crate::basic::keywords::auto_task::{
AutoTask, AutoTaskStatus, ExecutionMode, PendingApproval, PendingDecision, TaskPriority,
};
use crate::basic::keywords::intent_compiler::{CompiledIntent, IntentCompiler};
use crate::basic::keywords::mcp_client::McpClient;
use crate::basic::keywords::safety_layer::{SafetyLayer, SimulationResult};
use crate::shared::state::AppState;
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
routing::{get, post},
Json, Router,
};
use chrono::{DateTime, Utc};
use diesel::prelude::*;
use log::{error, info, trace};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use uuid::Uuid;
// =============================================================================
// REQUEST/RESPONSE TYPES
// =============================================================================
/// Request to compile an intent into an executable plan
#[derive(Debug, Deserialize)]
pub struct CompileIntentRequest {
pub intent: String,
pub execution_mode: Option<String>,
pub priority: Option<String>,
}
/// Response from intent compilation
#[derive(Debug, Serialize)]
pub struct CompileIntentResponse {
pub success: bool,
pub plan_id: Option<String>,
pub plan_name: Option<String>,
pub plan_description: Option<String>,
pub steps: Vec<PlanStepResponse>,
pub alternatives: Vec<AlternativeResponse>,
pub confidence: f64,
pub risk_level: String,
pub estimated_duration_minutes: i32,
pub estimated_cost: f64,
pub resource_estimate: ResourceEstimateResponse,
pub basic_program: Option<String>,
pub requires_approval: bool,
pub mcp_servers: Vec<String>,
pub external_apis: Vec<String>,
pub risks: Vec<RiskResponse>,
pub error: Option<String>,
}
#[derive(Debug, Serialize)]
pub struct PlanStepResponse {
pub id: String,
pub order: i32,
pub name: String,
pub description: String,
pub keywords: Vec<String>,
pub priority: String,
pub risk_level: String,
pub estimated_minutes: i32,
pub requires_approval: bool,
}
#[derive(Debug, Serialize)]
pub struct AlternativeResponse {
pub id: String,
pub description: String,
pub confidence: f64,
pub pros: Vec<String>,
pub cons: Vec<String>,
pub estimated_cost: Option<f64>,
pub estimated_time_hours: Option<f64>,
}
#[derive(Debug, Serialize)]
pub struct ResourceEstimateResponse {
pub compute_hours: f64,
pub storage_gb: f64,
pub api_calls: i32,
pub llm_tokens: i32,
pub estimated_cost_usd: f64,
}
#[derive(Debug, Serialize)]
pub struct RiskResponse {
pub id: String,
pub category: String,
pub description: String,
pub probability: f64,
pub impact: String,
}
/// Request to execute a compiled plan
#[derive(Debug, Deserialize)]
pub struct ExecutePlanRequest {
pub plan_id: String,
pub execution_mode: Option<String>,
pub priority: Option<String>,
}
/// Response from plan execution
#[derive(Debug, Serialize)]
pub struct ExecutePlanResponse {
pub success: bool,
pub task_id: Option<String>,
pub status: Option<String>,
pub error: Option<String>,
}
/// Query parameters for listing tasks
#[derive(Debug, Deserialize)]
pub struct ListTasksQuery {
pub filter: Option<String>,
pub status: Option<String>,
pub priority: Option<String>,
pub limit: Option<i32>,
pub offset: Option<i32>,
}
/// Auto task stats response
#[derive(Debug, Serialize)]
pub struct AutoTaskStatsResponse {
pub total: i32,
pub running: i32,
pub pending: i32,
pub completed: i32,
pub failed: i32,
pub pending_approval: i32,
pub pending_decision: i32,
}
/// Task action response
#[derive(Debug, Serialize)]
pub struct TaskActionResponse {
pub success: bool,
pub message: Option<String>,
pub error: Option<String>,
}
/// Decision submission request
#[derive(Debug, Deserialize)]
pub struct DecisionRequest {
pub decision_id: String,
pub option_id: Option<String>,
pub skip: Option<bool>,
}
/// Approval action request
#[derive(Debug, Deserialize)]
pub struct ApprovalRequest {
pub approval_id: String,
pub action: String, // "approve", "reject", "defer"
pub comment: Option<String>,
}
/// Simulation response
#[derive(Debug, Serialize)]
pub struct SimulationResponse {
pub success: bool,
pub confidence: f64,
pub risk_score: f64,
pub risk_level: String,
pub step_outcomes: Vec<StepOutcomeResponse>,
pub impact: ImpactResponse,
pub side_effects: Vec<SideEffectResponse>,
pub recommendations: Vec<RecommendationResponse>,
pub error: Option<String>,
}
#[derive(Debug, Serialize)]
pub struct StepOutcomeResponse {
pub step_id: String,
pub step_name: String,
pub would_succeed: bool,
pub success_probability: f64,
pub failure_modes: Vec<String>,
}
#[derive(Debug, Serialize)]
pub struct ImpactResponse {
pub risk_score: f64,
pub risk_level: String,
pub data_impact: DataImpactResponse,
pub cost_impact: CostImpactResponse,
pub time_impact: TimeImpactResponse,
pub security_impact: SecurityImpactResponse,
}
#[derive(Debug, Serialize)]
pub struct DataImpactResponse {
pub records_created: i32,
pub records_modified: i32,
pub records_deleted: i32,
pub tables_affected: Vec<String>,
pub reversible: bool,
}
#[derive(Debug, Serialize)]
pub struct CostImpactResponse {
pub api_costs: f64,
pub compute_costs: f64,
pub storage_costs: f64,
pub total_estimated_cost: f64,
}
#[derive(Debug, Serialize)]
pub struct TimeImpactResponse {
pub estimated_duration_seconds: i32,
pub blocking: bool,
}
#[derive(Debug, Serialize)]
pub struct SecurityImpactResponse {
pub risk_level: String,
pub credentials_accessed: Vec<String>,
pub external_systems: Vec<String>,
pub concerns: Vec<String>,
}
#[derive(Debug, Serialize)]
pub struct SideEffectResponse {
pub effect_type: String,
pub description: String,
pub severity: String,
pub mitigation: Option<String>,
}
#[derive(Debug, Serialize)]
pub struct RecommendationResponse {
pub id: String,
pub recommendation_type: String,
pub description: String,
pub action: Option<String>,
}
// =============================================================================
// API HANDLERS
// =============================================================================
/// POST /api/autotask/compile - Compile an intent into an execution plan
pub async fn compile_intent_handler(
State(state): State<Arc<AppState>>,
Json(request): Json<CompileIntentRequest>,
) -> impl IntoResponse {
info!("Compiling intent: {}", &request.intent[..request.intent.len().min(100)]);
// Get session from state (in real implementation, extract from auth)
let session = match get_current_session(&state).await {
Ok(s) => s,
Err(e) => {
return (
StatusCode::UNAUTHORIZED,
Json(CompileIntentResponse {
success: false,
plan_id: None,
plan_name: None,
plan_description: None,
steps: Vec::new(),
alternatives: Vec::new(),
confidence: 0.0,
risk_level: "unknown".to_string(),
estimated_duration_minutes: 0,
estimated_cost: 0.0,
resource_estimate: ResourceEstimateResponse {
compute_hours: 0.0,
storage_gb: 0.0,
api_calls: 0,
llm_tokens: 0,
estimated_cost_usd: 0.0,
},
basic_program: None,
requires_approval: false,
mcp_servers: Vec::new(),
external_apis: Vec::new(),
risks: Vec::new(),
error: Some(format!("Authentication error: {}", e)),
}),
);
}
};
// Create intent compiler
let compiler = IntentCompiler::new(Arc::clone(&state));
// Compile the intent
match compiler.compile(&request.intent, &session).await {
Ok(compiled) => {
let response = CompileIntentResponse {
success: true,
plan_id: Some(compiled.plan.id.clone()),
plan_name: Some(compiled.plan.name.clone()),
plan_description: Some(compiled.plan.description.clone()),
steps: compiled
.plan
.steps
.iter()
.map(|s| PlanStepResponse {
id: s.id.clone(),
order: s.order,
name: s.name.clone(),
description: s.description.clone(),
keywords: s.keywords.clone(),
priority: format!("{:?}", s.priority),
risk_level: format!("{:?}", s.risk_level),
estimated_minutes: s.estimated_minutes,
requires_approval: s.requires_approval,
})
.collect(),
alternatives: compiled
.alternatives
.iter()
.map(|a| AlternativeResponse {
id: a.id.clone(),
description: a.description.clone(),
confidence: a.confidence,
pros: a.pros.clone(),
cons: a.cons.clone(),
estimated_cost: a.estimated_cost,
estimated_time_hours: a.estimated_time_hours,
})
.collect(),
confidence: compiled.confidence,
risk_level: format!("{:?}", compiled.risk_assessment.overall_risk),
estimated_duration_minutes: compiled.plan.estimated_duration_minutes,
estimated_cost: compiled.resource_estimate.estimated_cost_usd,
resource_estimate: ResourceEstimateResponse {
compute_hours: compiled.resource_estimate.compute_hours,
storage_gb: compiled.resource_estimate.storage_gb,
api_calls: compiled.resource_estimate.api_calls,
llm_tokens: 0, // TODO: Track LLM tokens
estimated_cost_usd: compiled.resource_estimate.estimated_cost_usd,
},
basic_program: Some(compiled.basic_program.clone()),
requires_approval: compiled.plan.requires_approval,
mcp_servers: compiled.resource_estimate.mcp_servers_needed.clone(),
external_apis: compiled.resource_estimate.external_services.clone(),
risks: compiled
.risk_assessment
.risks
.iter()
.map(|r| RiskResponse {
id: r.id.clone(),
category: format!("{:?}", r.category),
description: r.description.clone(),
probability: r.probability,
impact: format!("{:?}", r.impact),
})
.collect(),
error: None,
};
(StatusCode::OK, Json(response))
}
Err(e) => {
error!("Failed to compile intent: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(CompileIntentResponse {
success: false,
plan_id: None,
plan_name: None,
plan_description: None,
steps: Vec::new(),
alternatives: Vec::new(),
confidence: 0.0,
risk_level: "unknown".to_string(),
estimated_duration_minutes: 0,
estimated_cost: 0.0,
resource_estimate: ResourceEstimateResponse {
compute_hours: 0.0,
storage_gb: 0.0,
api_calls: 0,
llm_tokens: 0,
estimated_cost_usd: 0.0,
},
basic_program: None,
requires_approval: false,
mcp_servers: Vec::new(),
external_apis: Vec::new(),
risks: Vec::new(),
error: Some(e.to_string()),
}),
)
}
}
}
/// POST /api/autotask/execute - Execute a compiled plan
pub async fn execute_plan_handler(
State(state): State<Arc<AppState>>,
Json(request): Json<ExecutePlanRequest>,
) -> impl IntoResponse {
info!("Executing plan: {}", request.plan_id);
let session = match get_current_session(&state).await {
Ok(s) => s,
Err(e) => {
return (
StatusCode::UNAUTHORIZED,
Json(ExecutePlanResponse {
success: false,
task_id: None,
status: None,
error: Some(format!("Authentication error: {}", e)),
}),
);
}
};
// Parse execution mode
let execution_mode = match request.execution_mode.as_deref() {
Some("fully-automatic") => ExecutionMode::FullyAutomatic,
Some("supervised") => ExecutionMode::Supervised,
Some("manual") => ExecutionMode::Manual,
Some("dry-run") => ExecutionMode::DryRun,
_ => ExecutionMode::SemiAutomatic,
};
// Parse priority
let priority = match request.priority.as_deref() {
Some("critical") => TaskPriority::Critical,
Some("high") => TaskPriority::High,
Some("low") => TaskPriority::Low,
Some("background") => TaskPriority::Background,
_ => TaskPriority::Medium,
};
// Create the auto task from the compiled plan
match create_auto_task_from_plan(&state, &session, &request.plan_id, execution_mode, priority).await {
Ok(task) => {
// Start execution
match start_task_execution(&state, &task.id).await {
Ok(_) => (
StatusCode::OK,
Json(ExecutePlanResponse {
success: true,
task_id: Some(task.id),
status: Some(task.status.to_string()),
error: None,
}),
),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ExecutePlanResponse {
success: false,
task_id: Some(task.id),
status: Some("failed".to_string()),
error: Some(e.to_string()),
}),
),
}
}
Err(e) => {
error!("Failed to create task: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ExecutePlanResponse {
success: false,
task_id: None,
status: None,
error: Some(e.to_string()),
}),
)
}
}
}
/// GET /api/autotask/list - List auto tasks
pub async fn list_tasks_handler(
State(state): State<Arc<AppState>>,
Query(query): Query<ListTasksQuery>,
) -> impl IntoResponse {
let filter = query.filter.as_deref().unwrap_or("all");
let limit = query.limit.unwrap_or(50);
let offset = query.offset.unwrap_or(0);
match list_auto_tasks(&state, filter, limit, offset).await {
Ok(tasks) => {
// Render as HTML for HTMX
let html = render_task_list_html(&tasks);
(StatusCode::OK, axum::response::Html(html))
}
Err(e) => {
error!("Failed to list tasks: {}", e);
let html = format!(
r#"<div class="error-message">
<span class="error-icon"></span>
<p>Failed to load tasks: {}</p>
</div>"#,
html_escape(&e.to_string())
);
(StatusCode::INTERNAL_SERVER_ERROR, axum::response::Html(html))
}
}
}
/// GET /api/autotask/stats - Get auto task statistics
pub async fn get_stats_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
match get_auto_task_stats(&state).await {
Ok(stats) => (StatusCode::OK, Json(stats)),
Err(e) => {
error!("Failed to get stats: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(AutoTaskStatsResponse {
total: 0,
running: 0,
pending: 0,
completed: 0,
failed: 0,
pending_approval: 0,
pending_decision: 0,
}),
)
}
}
}
/// POST /api/autotask/:task_id/pause - Pause a task
pub async fn pause_task_handler(
State(state): State<Arc<AppState>>,
Path(task_id): Path<String>,
) -> impl IntoResponse {
match update_task_status(&state, &task_id, AutoTaskStatus::Paused).await {
Ok(_) => (
StatusCode::OK,
Json(TaskActionResponse {
success: true,
message: Some("Task paused".to_string()),
error: None,
}),
),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(TaskActionResponse {
success: false,
message: None,
error: Some(e.to_string()),
}),
),
}
}
/// POST /api/autotask/:task_id/resume - Resume a paused task
pub async fn resume_task_handler(
State(state): State<Arc<AppState>>,
Path(task_id): Path<String>,
) -> impl IntoResponse {
match update_task_status(&state, &task_id, AutoTaskStatus::Running).await {
Ok(_) => {
// Restart execution
let _ = start_task_execution(&state, &task_id).await;
(
StatusCode::OK,
Json(TaskActionResponse {
success: true,
message: Some("Task resumed".to_string()),
error: None,
}),
)
}
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(TaskActionResponse {
success: false,
message: None,
error: Some(e.to_string()),
}),
),
}
}
/// POST /api/autotask/:task_id/cancel - Cancel a task
pub async fn cancel_task_handler(
State(state): State<Arc<AppState>>,
Path(task_id): Path<String>,
) -> impl IntoResponse {
match update_task_status(&state, &task_id, AutoTaskStatus::Cancelled).await {
Ok(_) => (
StatusCode::OK,
Json(TaskActionResponse {
success: true,
message: Some("Task cancelled".to_string()),
error: None,
}),
),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(TaskActionResponse {
success: false,
message: None,
error: Some(e.to_string()),
}),
),
}
}
/// POST /api/autotask/:task_id/simulate - Simulate task execution
pub async fn simulate_task_handler(
State(state): State<Arc<AppState>>,
Path(task_id): Path<String>,
) -> impl IntoResponse {
let session = match get_current_session(&state).await {
Ok(s) => s,
Err(e) => {
return (
StatusCode::UNAUTHORIZED,
Json(SimulationResponse {
success: false,
confidence: 0.0,
risk_score: 0.0,
risk_level: "unknown".to_string(),
step_outcomes: Vec::new(),
impact: ImpactResponse {
risk_score: 0.0,
risk_level: "unknown".to_string(),
data_impact: DataImpactResponse {
records_created: 0,
records_modified: 0,
records_deleted: 0,
tables_affected: Vec::new(),
reversible: true,
},
cost_impact: CostImpactResponse {
api_costs: 0.0,
compute_costs: 0.0,
storage_costs: 0.0,
total_estimated_cost: 0.0,
},
time_impact: TimeImpactResponse {
estimated_duration_seconds: 0,
blocking: false,
},
security_impact: SecurityImpactResponse {
risk_level: "unknown".to_string(),
credentials_accessed: Vec::new(),
external_systems: Vec::new(),
concerns: Vec::new(),
},
},
side_effects: Vec::new(),
recommendations: Vec::new(),
error: Some(format!("Authentication error: {}", e)),
}),
);
}
};
let safety_layer = SafetyLayer::new(Arc::clone(&state));
match simulate_task_execution(&state, &safety_layer, &task_id, &session).await {
Ok(result) => {
let response = SimulationResponse {
success: result.success,
confidence: result.confidence,
risk_score: result.impact.risk_score,
risk_level: format!("{}", result.impact.risk_level),
step_outcomes: result
.step_outcomes
.iter()
.map(|s| StepOutcomeResponse {
step_id: s.step_id.clone(),
step_name: s.step_name.clone(),
would_succeed: s.would_succeed,
success_probability: s.success_probability,
failure_modes: s.failure_modes.iter().map(|f| f.failure_type.clone()).collect(),
})
.collect(),
impact: ImpactResponse {
risk_score: result.impact.risk_score,
risk_level: format!("{}", result.impact.risk_level),
data_impact: DataImpactResponse {
records_created: result.impact.data_impact.records_created,
records_modified: result.impact.data_impact.records_modified,
records_deleted: result.impact.data_impact.records_deleted,
tables_affected: result.impact.data_impact.tables_affected.clone(),
reversible: result.impact.data_impact.reversible,
},
cost_impact: CostImpactResponse {
api_costs: result.impact.cost_impact.api_costs,
compute_costs: result.impact.cost_impact.compute_costs,
storage_costs: result.impact.cost_impact.storage_costs,
total_estimated_cost: result.impact.cost_impact.total_estimated_cost,
},
time_impact: TimeImpactResponse {
estimated_duration_seconds: result.impact.time_impact.estimated_duration_seconds,
blocking: result.impact.time_impact.blocking,
},
security_impact: SecurityImpactResponse {
risk_level: format!("{}", result.impact.security_impact.risk_level),
credentials_accessed: result.impact.security_impact.credentials_accessed.clone(),
external_systems: result.impact.security_impact.external_systems.clone(),
concerns: result.impact.security_impact.concerns.clone(),
},
},
side_effects: result
.side_effects
.iter()
.map(|s| SideEffectResponse {
effect_type: s.effect_type.clone(),
description: s.description.clone(),
severity: format!("{:?}", s.severity),
mitigation: s.mitigation.clone(),
})
.collect(),
recommendations: result
.recommendations
.iter()
.enumerate()
.map(|(i, r)| RecommendationResponse {
id: format!("rec-{}", i),
recommendation_type: format!("{:?}", r.recommendation_type),
description: r.description.clone(),
action: r.action.clone(),
})
.collect(),
error: None,
};
(StatusCode::OK, Json(response))
}
Err(e) => {
error!("Simulation failed: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(SimulationResponse {
success: false,
confidence: 0.0,
risk_score: 1.0,
risk_level: "unknown".to_string(),
step_outcomes: Vec::new(),
impact: ImpactResponse {
risk_score: 1.0,
risk_level: "unknown".to_string(),
data_impact: DataImpactResponse {
records_created: 0,
records_modified: 0,
records_deleted: 0,
tables_affected: Vec::new(),
reversible: true,
},
cost_impact: CostImpactResponse {
api_costs: 0.0,
compute_costs: 0.0,
storage_costs: 0.0,
total_estimated_cost: 0.0,
},
time_impact: TimeImpactResponse {
estimated_duration_seconds: 0,
blocking: false,
},
security_impact: SecurityImpactResponse {
risk_level: "unknown".to_string(),
credentials_accessed: Vec::new(),
external_systems: Vec::new(),
concerns: Vec::new(),
},
},
side_effects: Vec::new(),
recommendations: Vec::new(),
error: Some(e.to_string()),
}),
)
}
}
}
/// GET /api/autotask/:task_id/decisions - Get pending decisions for a task
pub async fn get_decisions_handler(
State(state): State<Arc<AppState>>,
Path(task_id): Path<String>,
) -> impl IntoResponse {
match get_pending_decisions(&state, &task_id).await {
Ok(decisions) => (StatusCode::OK, Json(decisions)),
Err(e) => {
error!("Failed to get decisions: {}", e);
(StatusCode::INTERNAL_SERVER_ERROR, Json(Vec::<PendingDecision>::new()))
}
}
}
/// POST /api/autotask/:task_id/decide - Submit a decision
pub async fn submit_decision_handler(
State(state): State<Arc<AppState>>,
Path(task_id): Path<String>,
Json(request): Json<DecisionRequest>,
) -> impl IntoResponse {
match submit_decision(&state, &task_id, &request).await {
Ok(_) => (
StatusCode::OK,
Json(TaskActionResponse {
success: true,
message: Some("Decision submitted".to_string()),
error: None,
}),
),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(TaskActionResponse {
success: false,
message: None,
error: Some(e.to_string()),
}),
),
}
}
/// GET /api/autotask/:task_id/approvals - Get pending approvals for a task
pub async fn get_approvals_handler(
State(state): State<Arc<AppState>>,
Path(task_id): Path<String>,
) -> impl IntoResponse {
match get_pending_approvals(&state, &task_id).await {
Ok(approvals) => (StatusCode::OK, Json(approvals)),
Err(e) => {
error!("Failed to get approvals: {}", e);
(StatusCode::INTERNAL_SERVER_ERROR, Json(Vec::<PendingApproval>::new()))
}
}
}
/// POST /api/autotask/:task_id/approve - Submit an approval decision
pub async fn submit_approval_handler(
State(state): State<Arc<AppState>>,
Path(task_id): Path<String>,
Json(request): Json<ApprovalRequest>,
) -> impl IntoResponse {
match submit_approval(&state, &task_id, &request).await {
Ok(_) => (
StatusCode::OK,
Json(TaskActionResponse {
success: true,
message: Some(format!("Approval {}", request.action)),
error: None,
}),
),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(TaskActionResponse {
success: false,
message: None,
error: Some(e.to_string()),
}),
),
}
}
/// POST /api/autotask/simulate/:plan_id - Simulate a plan before execution
pub async fn simulate_plan_handler(
State(state): State<Arc<AppState>>,
Path(plan_id): Path<String>,
) -> impl IntoResponse {
let session = match get_current_session(&state).await {
Ok(s) => s,
Err(e) => {
return (
StatusCode::UNAUTHORIZED,
Json(SimulationResponse {
success: false,
confidence: 0.0,
risk_score: 0.0,
risk_level: "unknown".to_string(),
step_outcomes: Vec::new(),
impact: ImpactResponse {
risk_score: 0.0,
risk_level: "unknown".to_string(),
data_impact: DataImpactResponse {
records_created: 0,
records_modified: 0,
records_deleted: 0,
tables_affected: Vec::new(),
reversible: true,

View file

@ -0,0 +1,879 @@
//! Intent Compiler - LLM to BASIC Program Translator
//!
//! This module provides the core "Intent Compiler" functionality that translates
//! natural language requests into executable BASIC programs using the General Bots
//! keyword system.
//!
//! # Architecture
//!
//! ```text
//! User Intent → Intent Analysis → Plan Generation → BASIC Program → Execution
//! ↓ ↓ ↓ ↓ ↓
//! "Make CRM" Extract entities Generate steps CREATE_TASK Run with
//! & requirements with keywords SET var safety checks
//! ```
//!
//! # Example
//!
//! ```basic
//! ' Generated from: "Make a financial CRM for Deloitte"
//! PLAN_START "Financial CRM for Deloitte"
//! STEP 1, "Create database schema", HIGH
//! STEP 2, "Setup user authentication", HIGH
//! STEP 3, "Create client management module", MEDIUM
//! STEP 4, "Create financial tracking module", MEDIUM
//! STEP 5, "Create reporting dashboard", LOW
//! PLAN_END
//!
//! REQUIRE_APPROVAL "create-database", "Creating database will cost ~$50/month"
//! IF approved THEN
//! RUN_PYTHON "create_schema.py"
//! END IF
//! ```
use crate::shared::models::UserSession;
use crate::shared::state::AppState;
use chrono::{DateTime, Utc};
use diesel::prelude::*;
use log::{error, info, trace, warn};
use rhai::{Dynamic, Engine};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use uuid::Uuid;
// ============================================================================
// CORE DATA STRUCTURES
// ============================================================================
/// Represents a compiled intent - the result of LLM analysis
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompiledIntent {
/// Unique identifier for this compiled intent
pub id: String,
/// Original user intent/request
pub original_intent: String,
/// Extracted entities from the intent
pub entities: IntentEntities,
/// Generated execution plan
pub plan: ExecutionPlan,
/// Generated BASIC program
pub basic_program: String,
/// Confidence score (0.0 - 1.0)
pub confidence: f64,
/// Alternative interpretations if ambiguous
pub alternatives: Vec<AlternativeInterpretation>,
/// Risk assessment
pub risk_assessment: RiskAssessment,
/// Estimated resources needed
pub resource_estimate: ResourceEstimate,
/// Timestamp of compilation
pub compiled_at: DateTime<Utc>,
/// Session that requested this compilation
pub session_id: String,
/// Bot that will execute this
pub bot_id: String,
}
/// Entities extracted from the user's intent
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct IntentEntities {
/// Primary action (create, update, delete, analyze, etc.)
pub action: String,
/// Target object/system (CRM, website, report, etc.)
pub target: String,
/// Domain/industry (financial, healthcare, retail, etc.)
pub domain: Option<String>,
/// Client/company name if mentioned
pub client: Option<String>,
/// Specific features requested
pub features: Vec<String>,
/// Constraints mentioned (budget, timeline, etc.)
pub constraints: Vec<Constraint>,
/// Technologies/tools mentioned
pub technologies: Vec<String>,
/// Data sources mentioned
pub data_sources: Vec<String>,
/// Integrations needed
pub integrations: Vec<String>,
}
/// A constraint on the task
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Constraint {
pub constraint_type: ConstraintType,
pub value: String,
pub is_hard: bool, // Hard constraint = must be met, soft = preferred
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ConstraintType {
Budget,
Timeline,
Technology,
Security,
Compliance,
Performance,
Scalability,
Custom(String),
}
/// Execution plan generated from the intent
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionPlan {
pub id: String,
pub name: String,
pub description: String,
pub steps: Vec<PlanStep>,
pub dependencies: HashMap<String, Vec<String>>, // step_id -> depends_on[]
pub estimated_duration_minutes: i32,
pub requires_approval: bool,
pub approval_levels: Vec<ApprovalLevel>,
pub rollback_plan: Option<String>,
}
/// A single step in the execution plan
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlanStep {
pub id: String,
pub order: i32,
pub name: String,
pub description: String,
pub keywords: Vec<String>, // BASIC keywords this step will use
pub basic_code: String, // Generated BASIC code for this step
pub priority: StepPriority,
pub risk_level: RiskLevel,
pub estimated_minutes: i32,
pub requires_approval: bool,
pub can_rollback: bool,
pub dependencies: Vec<String>,
pub outputs: Vec<String>, // Variables/resources this step produces
pub mcp_servers: Vec<String>, // MCP servers this step needs
pub api_calls: Vec<ApiCallSpec>, // External APIs this step calls
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum StepPriority {
Critical, // Must complete for any success
High, // Important for core functionality
Medium, // Adds significant value
Low, // Nice to have
Optional, // Can be skipped if needed
}
impl Default for StepPriority {
fn default() -> Self {
StepPriority::Medium
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum RiskLevel {
None, // No risk, reversible
Low, // Minor impact if fails
Medium, // Moderate impact, recoverable
High, // Significant impact, difficult recovery
Critical, // Severe impact, may not be recoverable
}
impl Default for RiskLevel {
fn default() -> Self {
RiskLevel::Low
}
}
/// API call specification for external integrations
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApiCallSpec {
pub name: String,
pub method: String,
pub url_template: String,
pub headers: HashMap<String, String>,
pub body_template: Option<String>,
pub auth_type: AuthType,
pub retry_config: RetryConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AuthType {
None,
ApiKey { header: String, key_ref: String },
Bearer { token_ref: String },
Basic { user_ref: String, pass_ref: String },
OAuth2 { client_id_ref: String, client_secret_ref: String },
}
impl Default for AuthType {
fn default() -> Self {
AuthType::None
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
pub max_retries: i32,
pub backoff_ms: i32,
pub retry_on_status: Vec<i32>,
}
impl Default for RetryConfig {
fn default() -> Self {
RetryConfig {
max_retries: 3,
backoff_ms: 1000,
retry_on_status: vec![429, 500, 502, 503, 504],
}
}
}
/// Approval level for human-in-the-loop
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApprovalLevel {
pub level: i32,
pub approver: String, // Role or specific user
pub reason: String,
pub timeout_minutes: i32,
pub default_action: DefaultApprovalAction,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DefaultApprovalAction {
Approve,
Reject,
Escalate,
Pause,
}
impl Default for DefaultApprovalAction {
fn default() -> Self {
DefaultApprovalAction::Pause
}
}
/// Alternative interpretation when intent is ambiguous
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlternativeInterpretation {
pub id: String,
pub description: String,
pub confidence: f64,
pub plan_summary: String,
pub pros: Vec<String>,
pub cons: Vec<String>,
pub estimated_cost: Option<f64>,
pub estimated_time_hours: Option<f64>,
}
/// Risk assessment for the compiled intent
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RiskAssessment {
pub overall_risk: RiskLevel,
pub risks: Vec<IdentifiedRisk>,
pub mitigations: Vec<RiskMitigation>,
pub requires_human_review: bool,
pub review_reason: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IdentifiedRisk {
pub id: String,
pub category: RiskCategory,
pub description: String,
pub probability: f64, // 0.0 - 1.0
pub impact: RiskLevel,
pub affected_steps: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RiskCategory {
DataLoss,
SecurityBreach,
CostOverrun,
TimelineSlip,
IntegrationFailure,
ComplianceViolation,
PerformanceIssue,
DependencyFailure,
Custom(String),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RiskMitigation {
pub risk_id: String,
pub strategy: String,
pub basic_code: Option<String>, // BASIC code to implement mitigation
pub fallback_plan: Option<String>,
}
/// Resource estimate for the task
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceEstimate {
pub compute_hours: f64,
pub storage_gb: f64,
pub api_calls: i32,
pub estimated_cost_usd: f64,
pub human_hours: f64,
pub mcp_servers_needed: Vec<String>,
pub external_services: Vec<String>,
}
impl Default for ResourceEstimate {
fn default() -> Self {
ResourceEstimate {
compute_hours: 0.0,
storage_gb: 0.0,
api_calls: 0,
estimated_cost_usd: 0.0,
human_hours: 0.0,
mcp_servers_needed: Vec::new(),
external_services: Vec::new(),
}
}
}
// ============================================================================
// INTENT COMPILER ENGINE
// ============================================================================
/// The main Intent Compiler engine
pub struct IntentCompiler {
state: Arc<AppState>,
config: IntentCompilerConfig,
}
/// Configuration for the Intent Compiler
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IntentCompilerConfig {
/// Enable/disable the compiler
pub enabled: bool,
/// LLM model to use for compilation
pub model: String,
/// Temperature for LLM (creativity vs determinism)
pub temperature: f64,
/// Maximum tokens for LLM response
pub max_tokens: i32,
/// Auto-execute low-risk tasks
pub auto_execute_low_risk: bool,
/// Always require approval for these risk levels
pub require_approval_above: RiskLevel,
/// Enable simulation before execution
pub simulate_before_execute: bool,
/// Maximum steps in a generated plan
pub max_plan_steps: i32,
/// Available keywords for code generation
pub available_keywords: Vec<String>,
/// Available MCP servers
pub available_mcp_servers: Vec<String>,
}
impl Default for IntentCompilerConfig {
fn default() -> Self {
IntentCompilerConfig {
enabled: true,
model: "gpt-4".to_string(),
temperature: 0.3, // Lower for more deterministic output
max_tokens: 4000,
auto_execute_low_risk: false,
require_approval_above: RiskLevel::Medium,
simulate_before_execute: true,
max_plan_steps: 50,
available_keywords: get_all_keywords(),
available_mcp_servers: Vec::new(),
}
}
}
impl std::fmt::Debug for IntentCompiler {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IntentCompiler")
.field("config", &self.config)
.finish()
}
}
impl IntentCompiler {
pub fn new(state: Arc<AppState>) -> Self {
IntentCompiler {
state,
config: IntentCompilerConfig::default(),
}
}
pub fn with_config(state: Arc<AppState>, config: IntentCompilerConfig) -> Self {
IntentCompiler { state, config }
}
/// Main compilation method - translates intent to executable BASIC program
pub async fn compile(
&self,
intent: &str,
session: &UserSession,
) -> Result<CompiledIntent, Box<dyn std::error::Error + Send + Sync>> {
info!(
"Compiling intent for session {}: {}",
session.id,
&intent[..intent.len().min(100)]
);
// Step 1: Analyze the intent using LLM
let entities = self.extract_entities(intent).await?;
trace!("Extracted entities: {:?}", entities);
// Step 2: Generate execution plan
let plan = self.generate_plan(intent, &entities).await?;
trace!("Generated plan with {} steps", plan.steps.len());
// Step 3: Generate BASIC program from plan
let basic_program = self.generate_basic_program(&plan, &entities).await?;
trace!(
"Generated BASIC program: {} lines",
basic_program.lines().count()
);
// Step 4: Assess risks
let risk_assessment = self.assess_risks(&plan).await?;
// Step 5: Estimate resources
let resource_estimate = self.estimate_resources(&plan).await?;
// Step 6: Check for ambiguity and generate alternatives if needed
let (confidence, alternatives) =
self.check_ambiguity(intent, &entities, &plan).await?;
let compiled = CompiledIntent {
id: Uuid::new_v4().to_string(),
original_intent: intent.to_string(),
entities,
plan,
basic_program,
confidence,
alternatives,
risk_assessment,
resource_estimate,
compiled_at: Utc::now(),
session_id: session.id.to_string(),
bot_id: session.bot_id.to_string(),
};
// Store the compiled intent
self.store_compiled_intent(&compiled).await?;
Ok(compiled)
}
/// Extract entities from the user's intent using LLM
async fn extract_entities(
&self,
intent: &str,
) -> Result<IntentEntities, Box<dyn std::error::Error + Send + Sync>> {
let prompt = format!(
r#"Analyze this user request and extract structured information.
User Request: "{}"
Extract the following as JSON:
{{
"action": "primary action (create/update/delete/analyze/report/integrate/automate)",
"target": "what to create/modify (CRM, website, report, API, etc.)",
"domain": "industry/domain if mentioned (financial, healthcare, retail, etc.) or null",
"client": "client/company name if mentioned or null",
"features": ["list of specific features requested"],
"constraints": [
{{"type": "budget|timeline|technology|security|compliance|performance", "value": "constraint value", "is_hard": true/false}}
],
"technologies": ["specific technologies/tools mentioned"],
"data_sources": ["data sources mentioned"],
"integrations": ["external systems to integrate with"]
}}
Respond ONLY with valid JSON, no explanation."#,
intent
);
let response = self.call_llm(&prompt).await?;
let entities: IntentEntities = serde_json::from_str(&response).unwrap_or_else(|e| {
warn!("Failed to parse entity extraction response: {}", e);
IntentEntities {
action: "create".to_string(),
target: intent.to_string(),
..Default::default()
}
});
Ok(entities)
}
/// Generate an execution plan from the analyzed intent
async fn generate_plan(
&self,
intent: &str,
entities: &IntentEntities,
) -> Result<ExecutionPlan, Box<dyn std::error::Error + Send + Sync>> {
let keywords_list = self.config.available_keywords.join(", ");
let mcp_servers_list = self.config.available_mcp_servers.join(", ");
let prompt = format!(
r#"Generate an execution plan for this task.
Original Request: "{}"
Extracted Information:
- Action: {}
- Target: {}
- Domain: {}
- Client: {}
- Features: {:?}
- Technologies: {:?}
- Integrations: {:?}
Available BASIC Keywords: {}
Available MCP Servers: {}
Generate a detailed execution plan as JSON:
{{
"name": "short plan name",
"description": "brief description",
"steps": [
{{
"id": "step-1",
"order": 1,
"name": "Step name",
"description": "What this step does",
"keywords": ["BASIC keywords this step uses"],
"priority": "CRITICAL|HIGH|MEDIUM|LOW|OPTIONAL",
"risk_level": "NONE|LOW|MEDIUM|HIGH|CRITICAL",
"estimated_minutes": 5,
"requires_approval": false,
"can_rollback": true,
"dependencies": [],
"outputs": ["variables/resources produced"],
"mcp_servers": ["MCP servers needed"],
"api_calls": []
}}
],
"requires_approval": true/false,
"estimated_duration_minutes": 60,
"rollback_plan": "how to undo if needed"
}}
Maximum {} steps. Focus on practical, executable steps.
Respond ONLY with valid JSON."#,
intent,
entities.action,
entities.target,
entities.domain.as_deref().unwrap_or("general"),
entities.client.as_deref().unwrap_or("none"),
entities.features,
entities.technologies,
entities.integrations,
keywords_list,
mcp_servers_list,
self.config.max_plan_steps
);
let response = self.call_llm(&prompt).await?;
#[derive(Deserialize)]
struct PlanResponse {
name: String,
description: String,
steps: Vec<PlanStepResponse>,
requires_approval: Option<bool>,
estimated_duration_minutes: Option<i32>,
rollback_plan: Option<String>,
}
#[derive(Deserialize)]
struct PlanStepResponse {
id: String,
order: i32,
name: String,
description: String,
keywords: Vec<String>,
priority: Option<String>,
risk_level: Option<String>,
estimated_minutes: Option<i32>,
requires_approval: Option<bool>,
can_rollback: Option<bool>,
dependencies: Option<Vec<String>>,
outputs: Option<Vec<String>>,
mcp_servers: Option<Vec<String>>,
api_calls: Option<Vec<ApiCallSpec>>,
}
let plan_response: PlanResponse = serde_json::from_str(&response)?;
let steps: Vec<PlanStep> = plan_response
.steps
.into_iter()
.map(|s| PlanStep {
id: s.id,
order: s.order,
name: s.name,
description: s.description,
keywords: s.keywords,
basic_code: String::new(), // Will be generated later
priority: match s.priority.as_deref() {
Some("CRITICAL") => StepPriority::Critical,
Some("HIGH") => StepPriority::High,
Some("MEDIUM") => StepPriority::Medium,
Some("LOW") => StepPriority::Low,
Some("OPTIONAL") => StepPriority::Optional,
_ => StepPriority::Medium,
},
risk_level: match s.risk_level.as_deref() {
Some("NONE") => RiskLevel::None,
Some("LOW") => RiskLevel::Low,
Some("MEDIUM") => RiskLevel::Medium,
Some("HIGH") => RiskLevel::High,
Some("CRITICAL") => RiskLevel::Critical,
_ => RiskLevel::Low,
},
estimated_minutes: s.estimated_minutes.unwrap_or(5),
requires_approval: s.requires_approval.unwrap_or(false),
can_rollback: s.can_rollback.unwrap_or(true),
dependencies: s.dependencies.unwrap_or_default(),
outputs: s.outputs.unwrap_or_default(),
mcp_servers: s.mcp_servers.unwrap_or_default(),
api_calls: s.api_calls.unwrap_or_default(),
})
.collect();
// Build dependency map
let mut dependencies: HashMap<String, Vec<String>> = HashMap::new();
for step in &steps {
dependencies.insert(step.id.clone(), step.dependencies.clone());
}
// Determine approval levels based on risk
let approval_levels = self.determine_approval_levels(&steps);
Ok(ExecutionPlan {
id: Uuid::new_v4().to_string(),
name: plan_response.name,
description: plan_response.description,
steps,
dependencies,
estimated_duration_minutes: plan_response.estimated_duration_minutes.unwrap_or(60),
requires_approval: plan_response.requires_approval.unwrap_or(false),
approval_levels,
rollback_plan: plan_response.rollback_plan,
})
}
/// Generate BASIC program code from the execution plan
async fn generate_basic_program(
&self,
plan: &ExecutionPlan,
entities: &IntentEntities,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
let mut program = String::new();
// Header comment
program.push_str(&format!(
"' =============================================================================\n"
));
program.push_str(&format!("' AUTO-GENERATED BASIC PROGRAM\n"));
program.push_str(&format!("' Plan: {}\n", plan.name));
program.push_str(&format!("' Description: {}\n", plan.description));
program.push_str(&format!("' Generated: {}\n", Utc::now().format("%Y-%m-%d %H:%M:%S")));
program.push_str(&format!(
"' =============================================================================\n\n"
));
// Plan declaration
program.push_str(&format!(
"PLAN_START \"{}\", \"{}\"\n",
plan.name, plan.description
));
// Declare steps
for step in &plan.steps {
let priority_str = match step.priority {
StepPriority::Critical => "CRITICAL",
StepPriority::High => "HIGH",
StepPriority::Medium => "MEDIUM",
StepPriority::Low => "LOW",
StepPriority::Optional => "OPTIONAL",
};
program.push_str(&format!(
" STEP {}, \"{}\", {}\n",
step.order, step.name, priority_str
));
}
program.push_str("PLAN_END\n\n");
// Initialize variables
program.push_str("' Initialize context variables\n");
program.push_str(&format!("SET action = \"{}\"\n", entities.action));
program.push_str(&format!("SET target = \"{}\"\n", entities.target));
if let Some(ref client) = entities.client {
program.push_str(&format!("SET client = \"{}\"\n", client));
}
if let Some(ref domain) = entities.domain {
program.push_str(&format!("SET domain = \"{}\"\n", domain));
}
program.push_str("\n");
// Set context for LLM operations
program.push_str("' Set LLM context\n");
program.push_str(&format!(
"SET CONTEXT \"Task: {} {} for {}\"\n\n",
entities.action,
entities.target,
entities.client.as_deref().unwrap_or("general use")
));
// Generate code for each step
for step in &plan.steps {
program.push_str(&format!(
"' -----------------------------------------------------------------------------\n"
));
program.push_str(&format!("' STEP {}: {}\n", step.order, step.name));
program.push_str(&format!("' {}\n", step.description));
program.push_str(&format!(
"' Risk: {:?}, Approval Required: {}\n",
step.risk_level, step.requires_approval
));
program.push_str(&format!(
"' -----------------------------------------------------------------------------\n"
));
// Generate step code
let step_code = self.generate_step_code(step, entities).await?;
program.push_str(&step_code);
program.push_str("\n");
}
// Completion
program.push_str("' Task completed\n");
program.push_str("TALK \"Task completed successfully!\"\n");
program.push_str(&format!(
"AUDIT_LOG \"plan-complete\", \"{}\", \"success\"\n",
plan.id
));
Ok(program)
}
/// Generate BASIC code for a single step
async fn generate_step_code(
&self,
step: &PlanStep,
_entities: &IntentEntities,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
let mut code = String::new();
// Add approval check if needed
if step.requires_approval {
code.push_str(&format!(
"REQUIRE_APPROVAL \"step-{}\", \"{}\"\n",
step.order, step.description
));
code.push_str("IF NOT approved THEN\n");
code.push_str(&format!(
" TALK \"Step {} was not approved, skipping...\"\n",
step.order
));
code.push_str(&format!(" GOTO step_{}_end\n", step.order));
code.push_str("END IF\n\n");
}
// Add simulation check for high-risk steps
if matches!(step.risk_level, RiskLevel::High | RiskLevel::Critical) {
code.push_str(&format!(
"simulation_result = SIMULATE_IMPACT \"step-{}\"\n",
step.order
));
code.push_str("IF simulation_result.risk_score > 0.7 THEN\n");
code.push_str(&format!(
" TALK \"High risk detected in step {}, requesting manual review...\"\n",
step.order
));
code.push_str(" REQUIRE_APPROVAL \"high-risk-override\", simulation_result.summary\n");
code.push_str("END IF\n\n");
}
// Audit log start
code.push_str(&format!(
"AUDIT_LOG \"step-start\", \"step-{}\", \"{}\"\n",
step.order, step.name
));
// Generate code based on keywords
for keyword in &step.keywords {
match keyword.to_uppercase().as_str() {
"CREATE_TASK" => {
code.push_str(&format!(
"task_{} = CREATE_TASK \"{}\", \"auto\", \"+1 day\", null\n",
step.order,
step.name
));
}
"LLM" => {
code.push_str(&format!(
"llm_result_{} = LLM \"{}\"\n",
step.order, step.description
));
}
"RUN_PYTHON" => {
code.push_str(&format!(
"python_result_{} = RUN_PYTHON \"# {}\nprint('Step {} executed')\"\n",
step.order, step.description, step.order
));
}
"RUN_JAVASCRIPT" => {
code.push_str(&format!(
"js_result_{} = RUN_JAVASCRIPT \"console.log('Step {} executed');\"\n",
step.order, step.order
));
}
"GET" => {
code.push_str(&format!("data_{} = GET \"{}_data\"\n", step.order, step.id));
}
"SET" => {
code.push_str(&format!(
"SET step_{}_complete = true\n",
step.order
));
}
"SAVE" => {
code.push_str(&format!(
"SAVE step_{}_result TO \"results\"\n",
step.order
));
}
"POST" | "PUT" | "PATCH" | "DELETE HTTP" => {
for api_call in &step.api_calls {
code.push_str(&format!(
"{} \"{}\" INTO api_result_{}\n",
keyword, api_call.url_template, step.order
));
}
}
"USE_MCP" => {
for mcp_server in &step.mcp_servers {
code.push_str(&format!(
"mcp_result_{} = USE_MCP \"{}\", \"{}\"\n",
step.order, mcp_server, step.description
));
}
}
"SEND_MAIL" => {
code.push_str(&format!(
"SEND_MAIL \"status@bot.local\", \"Step {} Complete\", \"{}\"\n",
step.order, step.description
));
}
_ => {
// Generic keyword usage
code.push_str(&format!("' Using keyword: {}\n", keyword));
}
}
}
// Mark outputs
for output in &step.outputs {
code.push_str(&format!("SET output_{} = result_{}\n", output, step.order));
}
// Audit log en

View file

@ -0,0 +1,911 @@
//! MCP Client - Model Context Protocol Server Integration
//!
//! This module provides the MCP (Model Context Protocol) client functionality
//! that enables BASIC programs to call registered MCP servers for extended
//! capabilities. It supports tool discovery, invocation, and result handling.
//!
//! # Architecture
//!
//! ```text
//! BASIC Program → USE_MCP → MCP Client → MCP Server → Execute Tool → Return Result
//! ↓ ↓ ↓ ↓ ↓ ↓
//! USE_MCP Resolve Connect Invoke tool Process Return to
//! "server" server & auth with params result BASIC
//! ```
//!
//! # Supported MCP Servers
//!
//! - Database Server: PostgreSQL, MySQL, SQLite connections
//! - Filesystem Server: Local and cloud file access
//! - Web Server: HTTP/REST API integrations
//! - Email Server: SMTP/IMAP email handling
//! - Slack Server: Slack workspace integration
//! - Analytics Server: Data processing and reporting
//! - Custom Servers: User-defined MCP servers
//!
//! # Example BASIC Usage
//!
//! ```basic
//! ' Use MCP server to query database
//! result = USE_MCP "database", "query", {"sql": "SELECT * FROM users"}
//!
//! ' Use MCP server to send Slack message
//! USE_MCP "slack", "send_message", {"channel": "#general", "text": "Hello!"}
//!
//! ' List available tools from a server
//! tools = MCP_LIST_TOOLS "filesystem"
//! ```
use crate::shared::models::UserSession;
use crate::shared::state::AppState;
use chrono::{DateTime, Utc};
use diesel::prelude::*;
use log::{error, info, trace, warn};
use rhai::{Dynamic, Engine};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
// ============================================================================
// MCP DATA STRUCTURES
// ============================================================================
/// Represents a registered MCP server
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpServer {
/// Unique server identifier
pub id: String,
/// Server name (used in BASIC as identifier)
pub name: String,
/// Server description
pub description: String,
/// Server type/category
pub server_type: McpServerType,
/// Connection configuration
pub connection: McpConnection,
/// Authentication configuration
pub auth: McpAuth,
/// Available tools on this server
pub tools: Vec<McpTool>,
/// Server capabilities
pub capabilities: McpCapabilities,
/// Server status
pub status: McpServerStatus,
/// Bot ID that owns this server config
pub bot_id: String,
/// Creation timestamp
pub created_at: DateTime<Utc>,
/// Last updated timestamp
pub updated_at: DateTime<Utc>,
/// Last health check timestamp
pub last_health_check: Option<DateTime<Utc>>,
/// Health check status
pub health_status: HealthStatus,
}
/// MCP server types
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum McpServerType {
Database,
Filesystem,
Web,
Email,
Slack,
Teams,
Analytics,
Search,
Storage,
Compute,
Custom(String),
}
impl Default for McpServerType {
fn default() -> Self {
McpServerType::Custom("unknown".to_string())
}
}
impl From<&str> for McpServerType {
fn from(s: &str) -> Self {
match s.to_lowercase().as_str() {
"database" | "db" => McpServerType::Database,
"filesystem" | "fs" | "file" => McpServerType::Filesystem,
"web" | "http" | "rest" | "api" => McpServerType::Web,
"email" | "mail" | "smtp" | "imap" => McpServerType::Email,
"slack" => McpServerType::Slack,
"teams" | "microsoft-teams" => McpServerType::Teams,
"analytics" | "data" => McpServerType::Analytics,
"search" | "elasticsearch" | "opensearch" => McpServerType::Search,
"storage" | "s3" | "blob" | "gcs" => McpServerType::Storage,
"compute" | "lambda" | "function" => McpServerType::Compute,
other => McpServerType::Custom(other.to_string()),
}
}
}
impl std::fmt::Display for McpServerType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
McpServerType::Database => write!(f, "database"),
McpServerType::Filesystem => write!(f, "filesystem"),
McpServerType::Web => write!(f, "web"),
McpServerType::Email => write!(f, "email"),
McpServerType::Slack => write!(f, "slack"),
McpServerType::Teams => write!(f, "teams"),
McpServerType::Analytics => write!(f, "analytics"),
McpServerType::Search => write!(f, "search"),
McpServerType::Storage => write!(f, "storage"),
McpServerType::Compute => write!(f, "compute"),
McpServerType::Custom(s) => write!(f, "{}", s),
}
}
}
/// MCP connection configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpConnection {
/// Connection type
pub connection_type: ConnectionType,
/// Server URL or path
pub url: String,
/// Connection port (if applicable)
pub port: Option<u16>,
/// Connection timeout in seconds
pub timeout_seconds: i32,
/// Maximum retries
pub max_retries: i32,
/// Retry backoff in milliseconds
pub retry_backoff_ms: i32,
/// Keep-alive settings
pub keep_alive: bool,
/// SSL/TLS configuration
pub tls_config: Option<TlsConfig>,
}
impl Default for McpConnection {
fn default() -> Self {
McpConnection {
connection_type: ConnectionType::Http,
url: "http://localhost:8080".to_string(),
port: None,
timeout_seconds: 30,
max_retries: 3,
retry_backoff_ms: 1000,
keep_alive: true,
tls_config: None,
}
}
}
/// Connection type for MCP server
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ConnectionType {
/// HTTP/HTTPS connection
Http,
/// WebSocket connection
WebSocket,
/// gRPC connection
Grpc,
/// Unix socket
UnixSocket,
/// Standard IO (for local processes)
Stdio,
/// TCP socket
Tcp,
}
impl Default for ConnectionType {
fn default() -> Self {
ConnectionType::Http
}
}
/// TLS configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TlsConfig {
pub enabled: bool,
pub verify_certificates: bool,
pub ca_cert_path: Option<String>,
pub client_cert_path: Option<String>,
pub client_key_path: Option<String>,
}
/// MCP authentication configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpAuth {
/// Authentication type
pub auth_type: McpAuthType,
/// Credentials (stored securely, reference only)
pub credentials: McpCredentials,
}
impl Default for McpAuth {
fn default() -> Self {
McpAuth {
auth_type: McpAuthType::None,
credentials: McpCredentials::None,
}
}
}
/// Authentication types
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum McpAuthType {
None,
ApiKey,
Bearer,
Basic,
OAuth2,
Certificate,
Custom(String),
}
impl Default for McpAuthType {
fn default() -> Self {
McpAuthType::None
}
}
/// Credentials storage (references, not actual secrets)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum McpCredentials {
None,
ApiKey {
header_name: String,
key_ref: String, // Reference to secret storage
},
Bearer {
token_ref: String,
},
Basic {
username_ref: String,
password_ref: String,
},
OAuth2 {
client_id_ref: String,
client_secret_ref: String,
token_url: String,
scopes: Vec<String>,
},
Certificate {
cert_ref: String,
key_ref: String,
},
Custom(HashMap<String, String>),
}
impl Default for McpCredentials {
fn default() -> Self {
McpCredentials::None
}
}
/// MCP tool definition
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpTool {
/// Tool name
pub name: String,
/// Tool description
pub description: String,
/// Input schema (JSON Schema)
pub input_schema: serde_json::Value,
/// Output schema (JSON Schema)
pub output_schema: Option<serde_json::Value>,
/// Required permissions
pub required_permissions: Vec<String>,
/// Risk level of this tool
pub risk_level: ToolRiskLevel,
/// Whether this tool modifies data
pub is_destructive: bool,
/// Whether this tool requires approval
pub requires_approval: bool,
/// Rate limit (calls per minute)
pub rate_limit: Option<i32>,
/// Timeout for this specific tool
pub timeout_seconds: Option<i32>,
}
/// Tool risk level
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ToolRiskLevel {
Safe, // Read-only, no side effects
Low, // Minor side effects, easily reversible
Medium, // Moderate side effects, reversible with effort
High, // Significant side effects, difficult to reverse
Critical, // Irreversible actions, requires approval
}
impl Default for ToolRiskLevel {
fn default() -> Self {
ToolRiskLevel::Low
}
}
/// MCP server capabilities
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct McpCapabilities {
/// Supports tool listing
pub tools: bool,
/// Supports resource listing
pub resources: bool,
/// Supports prompts
pub prompts: bool,
/// Supports logging
pub logging: bool,
/// Supports streaming responses
pub streaming: bool,
/// Supports cancellation
pub cancellation: bool,
/// Custom capabilities
pub custom: HashMap<String, bool>,
}
/// MCP server status
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum McpServerStatus {
Active,
Inactive,
Connecting,
Error,
Maintenance,
}
impl Default for McpServerStatus {
fn default() -> Self {
McpServerStatus::Inactive
}
}
/// Health check status
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthStatus {
pub healthy: bool,
pub last_check: Option<DateTime<Utc>>,
pub response_time_ms: Option<i64>,
pub error_message: Option<String>,
pub consecutive_failures: i32,
}
impl Default for HealthStatus {
fn default() -> Self {
HealthStatus {
healthy: false,
last_check: None,
response_time_ms: None,
error_message: None,
consecutive_failures: 0,
}
}
}
// ============================================================================
// MCP REQUEST/RESPONSE
// ============================================================================
/// MCP tool invocation request
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpRequest {
/// Request ID for tracking
pub id: String,
/// Target server name
pub server: String,
/// Tool to invoke
pub tool: String,
/// Tool arguments
pub arguments: serde_json::Value,
/// Request context
pub context: McpRequestContext,
/// Timeout override
pub timeout_seconds: Option<i32>,
}
/// Request context
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpRequestContext {
pub session_id: String,
pub bot_id: String,
pub user_id: String,
pub task_id: Option<String>,
pub step_id: Option<String>,
pub correlation_id: Option<String>,
}
/// MCP tool invocation response
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpResponse {
/// Response ID (matches request ID)
pub id: String,
/// Whether the invocation succeeded
pub success: bool,
/// Result data
pub result: Option<serde_json::Value>,
/// Error information
pub error: Option<McpError>,
/// Execution metadata
pub metadata: McpResponseMetadata,
}
/// MCP error
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpError {
pub code: String,
pub message: String,
pub details: Option<serde_json::Value>,
pub retryable: bool,
}
/// Response metadata
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpResponseMetadata {
pub duration_ms: i64,
pub server_version: Option<String>,
pub rate_limit_remaining: Option<i32>,
pub rate_limit_reset: Option<DateTime<Utc>>,
}
// ============================================================================
// MCP CLIENT
// ============================================================================
/// The MCP Client for managing server connections and tool invocations
pub struct McpClient {
state: Arc<AppState>,
config: McpClientConfig,
servers: HashMap<String, McpServer>,
http_client: reqwest::Client,
}
/// MCP Client configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpClientConfig {
/// Enable/disable MCP functionality
pub enabled: bool,
/// Default timeout for all requests
pub default_timeout_seconds: i32,
/// Maximum concurrent requests
pub max_concurrent_requests: i32,
/// Enable request caching
pub cache_enabled: bool,
/// Cache TTL in seconds
pub cache_ttl_seconds: i32,
/// Enable audit logging
pub audit_enabled: bool,
/// Health check interval in seconds
pub health_check_interval_seconds: i32,
/// Auto-retry failed requests
pub auto_retry: bool,
/// Circuit breaker threshold
pub circuit_breaker_threshold: i32,
}
impl Default for McpClientConfig {
fn default() -> Self {
McpClientConfig {
enabled: true,
default_timeout_seconds: 30,
max_concurrent_requests: 10,
cache_enabled: true,
cache_ttl_seconds: 300,
audit_enabled: true,
health_check_interval_seconds: 60,
auto_retry: true,
circuit_breaker_threshold: 5,
}
}
}
impl std::fmt::Debug for McpClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("McpClient")
.field("config", &self.config)
.field("servers_count", &self.servers.len())
.finish()
}
}
impl McpClient {
/// Create a new MCP client
pub fn new(state: Arc<AppState>) -> Self {
let http_client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
.unwrap_or_default();
McpClient {
state,
config: McpClientConfig::default(),
servers: HashMap::new(),
http_client,
}
}
/// Create a new MCP client with custom configuration
pub fn with_config(state: Arc<AppState>, config: McpClientConfig) -> Self {
let http_client = reqwest::Client::builder()
.timeout(Duration::from_secs(config.default_timeout_seconds as u64))
.build()
.unwrap_or_default();
McpClient {
state,
config,
servers: HashMap::new(),
http_client,
}
}
/// Load servers from database for a bot
pub async fn load_servers(&mut self, bot_id: &Uuid) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut conn = self.state.conn.get().map_err(|e| format!("DB error: {}", e))?;
let bot_id_str = bot_id.to_string();
let query = diesel::sql_query(
"SELECT id, name, description, server_type, config, status, created_at, updated_at
FROM mcp_servers WHERE bot_id = $1 AND status != 'deleted'"
)
.bind::<diesel::sql_types::Text, _>(&bot_id_str);
#[derive(QueryableByName)]
struct ServerRow {
#[diesel(sql_type = diesel::sql_types::Text)]
id: String,
#[diesel(sql_type = diesel::sql_types::Text)]
name: String,
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
description: Option<String>,
#[diesel(sql_type = diesel::sql_types::Text)]
server_type: String,
#[diesel(sql_type = diesel::sql_types::Text)]
config: String,
#[diesel(sql_type = diesel::sql_types::Text)]
status: String,
}
let rows: Vec<ServerRow> = query.load(&mut *conn).unwrap_or_default();
for row in rows {
let server = McpServer {
id: row.id.clone(),
name: row.name.clone(),
description: row.description.unwrap_or_default(),
server_type: McpServerType::from(row.server_type.as_str()),
connection: serde_json::from_str(&row.config).unwrap_or_default(),
auth: McpAuth::default(),
tools: Vec::new(),
capabilities: McpCapabilities::default(),
status: match row.status.as_str() {
"active" => McpServerStatus::Active,
"inactive" => McpServerStatus::Inactive,
"error" => McpServerStatus::Error,
"maintenance" => McpServerStatus::Maintenance,
_ => McpServerStatus::Inactive,
},
bot_id: bot_id_str.clone(),
created_at: Utc::now(),
updated_at: Utc::now(),
last_health_check: None,
health_status: HealthStatus::default(),
};
self.servers.insert(row.name, server);
}
info!("Loaded {} MCP servers for bot {}", self.servers.len(), bot_id);
Ok(())
}
/// Register a new MCP server
pub async fn register_server(&mut self, server: McpServer) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut conn = self.state.conn.get().map_err(|e| format!("DB error: {}", e))?;
let config_json = serde_json::to_string(&server.connection)?;
let now = Utc::now().to_rfc3339();
let query = diesel::sql_query(
"INSERT INTO mcp_servers (id, bot_id, name, description, server_type, config, status, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (bot_id, name) DO UPDATE SET
description = EXCLUDED.description,
server_type = EXCLUDED.server_type,
config = EXCLUDED.config,
status = EXCLUDED.status,
updated_at = EXCLUDED.updated_at"
)
.bind::<diesel::sql_types::Text, _>(&server.id)
.bind::<diesel::sql_types::Text, _>(&server.bot_id)
.bind::<diesel::sql_types::Text, _>(&server.name)
.bind::<diesel::sql_types::Text, _>(&server.description)
.bind::<diesel::sql_types::Text, _>(&server.server_type.to_string())
.bind::<diesel::sql_types::Text, _>(&config_json)
.bind::<diesel::sql_types::Text, _>("active")
.bind::<diesel::sql_types::Text, _>(&now)
.bind::<diesel::sql_types::Text, _>(&now);
query.execute(&mut *conn).map_err(|e| format!("Failed to register MCP server: {}", e))?;
self.servers.insert(server.name.clone(), server);
Ok(())
}
/// Get a server by name
pub fn get_server(&self, name: &str) -> Option<&McpServer> {
self.servers.get(name)
}
/// List all registered servers
pub fn list_servers(&self) -> Vec<&McpServer> {
self.servers.values().collect()
}
/// List tools from a specific server
pub async fn list_tools(&self, server_name: &str) -> Result<Vec<McpTool>, Box<dyn std::error::Error + Send + Sync>> {
let server = self.servers.get(server_name)
.ok_or_else(|| format!("MCP server '{}' not found", server_name))?;
// For HTTP-based servers, call the tools/list endpoint
if server.connection.connection_type == ConnectionType::Http {
let url = format!("{}/tools/list", server.connection.url);
let response = self.http_client
.get(&url)
.timeout(Duration::from_secs(server.connection.timeout_seconds as u64))
.send()
.await?;
if response.status().is_success() {
let tools: Vec<McpTool> = response.json().await?;
return Ok(tools);
}
}
// Return cached tools if available
Ok(server.tools.clone())
}
/// Invoke a tool on an MCP server
pub async fn invoke_tool(
&self,
request: McpRequest,
) -> Result<McpResponse, Box<dyn std::error::Error + Send + Sync>> {
let start_time = std::time::Instant::now();
// Get server
let server = self.servers.get(&request.server)
.ok_or_else(|| format!("MCP server '{}' not found", request.server))?;
// Check server status
if server.status != McpServerStatus::Active {
return Ok(McpResponse {
id: request.id,
success: false,
result: None,
error: Some(McpError {
code: "SERVER_UNAVAILABLE".to_string(),
message: format!("MCP server '{}' is not active (status: {:?})", request.server, server.status),
details: None,
retryable: true,
}),
metadata: McpResponseMetadata {
duration_ms: start_time.elapsed().as_millis() as i64,
server_version: None,
rate_limit_remaining: None,
rate_limit_reset: None,
},
});
}
// Audit log the request
if self.config.audit_enabled {
self.audit_request(&request).await;
}
// Execute based on connection type
let result = match server.connection.connection_type {
ConnectionType::Http => self.invoke_http(server, &request).await,
ConnectionType::Stdio => self.invoke_stdio(server, &request).await,
_ => Err(format!("Connection type {:?} not yet supported", server.connection.connection_type).into()),
};
let duration_ms = start_time.elapsed().as_millis() as i64;
match result {
Ok(mut response) => {
response.metadata.duration_ms = duration_ms;
// Audit log the response
if self.config.audit_enabled {
self.audit_response(&request, &response).await;
}
Ok(response)
}
Err(e) => {
let response = McpResponse {
id: request.id.clone(),
success: false,
result: None,
error: Some(McpError {
code: "INVOCATION_ERROR".to_string(),
message: e.to_string(),
details: None,
retryable: true,
}),
metadata: McpResponseMetadata {
duration_ms,
server_version: None,
rate_limit_remaining: None,
rate_limit_reset: None,
},
};
// Audit log the error
if self.config.audit_enabled {
self.audit_response(&request, &response).await;
}
Ok(response)
}
}
}
/// Invoke tool via HTTP
async fn invoke_http(
&self,
server: &McpServer,
request: &McpRequest,
) -> Result<McpResponse, Box<dyn std::error::Error + Send + Sync>> {
let url = format!("{}/tools/call", server.connection.url);
let body = serde_json::json!({
"name": request.tool,
"arguments": request.arguments
});
let timeout = request.timeout_seconds
.unwrap_or(server.connection.timeout_seconds);
let mut http_request = self.http_client
.post(&url)
.json(&body)
.timeout(Duration::from_secs(timeout as u64));
// Add authentication headers
http_request = self.add_auth_headers(http_request, &server.auth);
let response = http_request.send().await?;
let status = response.status();
if status.is_success() {
let result: serde_json::Value = response.json().await?;
Ok(McpResponse {
id: request.id.clone(),
success: true,
result: Some(result),
error: None,
metadata: McpResponseMetadata {
duration_ms: 0,
server_version: None,
rate_limit_remaining: None,
rate_limit_reset: None,
},
})
} else {
let error_text = response.text().await.unwrap_or_default();
Ok(McpResponse {
id: request.id.clone(),
success: false,
result: None,
error: Some(McpError {
code: format!("HTTP_{}", status.as_u16()),
message: error_text,
details: None,
retryable: status.as_u16() >= 500,
}),
metadata: McpResponseMetadata {
duration_ms: 0,
server_version: None,
rate_limit_remaining: None,
rate_limit_reset: None,
},
})
}
}
/// Invoke tool via stdio (local process)
async fn invoke_stdio(
&self,
server: &McpServer,
request: &McpRequest,
) -> Result<McpResponse, Box<dyn std::error::Error + Send + Sync>> {
use tokio::process::Command;
let input = serde_json::json!({
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": request.tool,
"arguments": request.arguments
},
"id": request.id
});
let output = Command::new(&server.connection.url)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()?
.wait_with_output()
.await?;
if output.status.success() {
let result: serde_json::Value = serde_json::from_slice(&output.stdout)?;
Ok(McpResponse {
id: request.id.clone(),
success: true,
result: result.get("result").cloned(),
error: None,
metadata: McpResponseMetadata {
duration_ms: 0,
server_version: None,
rate_limit_remaining: None,
rate_limit_reset: None,
},
})
} else {
let stderr = String::from_utf8_lossy(&output.stderr);
Ok(McpResponse {
id: request.id.clone(),
success: false,
result: None,
error: Some(McpError {
code: "STDIO_ERROR".to_string(),
message: stderr.to_string(),
details: None,
retryable: false,
}),
metadata: McpResponseMetadata {
duration_ms: 0,
server_version: None,
rate_limit_remaining: None,
rate_limit_reset: None,
},
})
}
}
/// Add authentication headers to request
fn add_auth_headers(
&self,
mut request: reqwest::RequestBuilder,
auth: &McpAuth,
) -> reqwest::RequestBuilder {
match &auth.credentials {
McpCredentials::ApiKey { header_name, key_ref } => {
// In production, resolve key_ref from secret storage
request = request.header(header_name.as_str(), key_ref.as_str());
}
McpCredentials::Bearer { token_ref } => {
request = request.bearer_auth(token_ref);
}
McpCredentials::Basic { username_ref, password_ref } => {
request = request.basic_auth(username_ref, Some(password_ref));
}
_ => {}
}
request
}
/// Perform health check on a server
pub async fn health_check(&mut self, server_name: &str) -> Result<HealthStatus, Box<dyn std::error::Error + Send + Sync>> {
let server = self.servers.get_mut(server_name)
.ok_or_else(|| format!("MCP server '{}' not found", server_name))?;
let start_time = std::time::Instant::now();
let health_url = format!("{}/health", server.connection.url);
let result = self.http_client
.get(&health_url)
.timeout(Duration::from_secs(5))
.send()
.await

View file

@ -6,6 +6,8 @@ pub mod agent_reflection;
pub mod ai_tools;
pub mod api_tool_generator;
pub mod arrays;
pub mod auto_task;
pub mod autotask_api;
pub mod book;
pub mod bot_memory;
pub mod clear_kb;
@ -30,6 +32,7 @@ pub mod hear_talk;
pub mod http_operations;
pub mod human_approval;
pub mod import_export;
pub mod intent_compiler;
pub mod kb_statistics;
pub mod knowledge_graph;
pub mod last;
@ -37,6 +40,7 @@ pub mod lead_scoring;
pub mod llm_keyword;
pub mod llm_macros;
pub mod math;
pub mod mcp_client;
pub mod messaging;
pub mod model_routing;
pub mod multimodal;
@ -47,6 +51,7 @@ pub mod print;
pub mod procedures;
pub mod qrcode;
pub mod remember;
pub mod safety_layer;
pub mod save_from_unstructured;
pub mod send_mail;
pub mod send_template;
@ -71,3 +76,310 @@ pub mod wait;
pub mod weather;
pub mod web_data;
pub mod webhook;
// Re-export key types for convenience
pub use auto_task::{AutoTask, AutoTaskStatus, ExecutionMode, TaskPriority};
pub use intent_compiler::{CompiledIntent, ExecutionPlan, IntentCompiler, PlanStep};
pub use mcp_client::{McpClient, McpRequest, McpResponse, McpServer, McpTool};
pub use safety_layer::{AuditEntry, ConstraintCheckResult, SafetyLayer, SimulationResult};
// Re-export API handlers for route configuration
pub use autotask_api::{
cancel_task_handler, compile_intent_handler, execute_plan_handler, get_approvals_handler,
get_decisions_handler, get_stats_handler, list_tasks_handler, pause_task_handler,
resume_task_handler, simulate_plan_handler, simulate_task_handler, submit_approval_handler,
submit_decision_handler,
};
/// Configure Auto Task API routes
pub fn configure_autotask_routes() -> axum::Router<std::sync::Arc<crate::shared::state::AppState>> {
use axum::routing::{get, post};
axum::Router::new()
// Intent compilation
.route("/api/autotask/compile", post(compile_intent_handler))
// Plan execution
.route("/api/autotask/execute", post(execute_plan_handler))
.route(
"/api/autotask/simulate/:plan_id",
post(simulate_plan_handler),
)
// Task listing and stats
.route("/api/autotask/list", get(list_tasks_handler))
.route("/api/autotask/stats", get(get_stats_handler))
// Task actions
.route("/api/autotask/:task_id/pause", post(pause_task_handler))
.route("/api/autotask/:task_id/resume", post(resume_task_handler))
.route("/api/autotask/:task_id/cancel", post(cancel_task_handler))
.route(
"/api/autotask/:task_id/simulate",
post(simulate_task_handler),
)
// Decisions
.route(
"/api/autotask/:task_id/decisions",
get(get_decisions_handler),
)
.route(
"/api/autotask/:task_id/decide",
post(submit_decision_handler),
)
// Approvals
.route(
"/api/autotask/:task_id/approvals",
get(get_approvals_handler),
)
.route(
"/api/autotask/:task_id/approve",
post(submit_approval_handler),
)
}
/// List of all available BASIC keywords for the Intent Compiler
pub fn get_all_keywords() -> Vec<String> {
vec![
// Bot/Multi-Agent
"ADD BOT".to_string(),
"BOT REFLECTION".to_string(),
"BROADCAST TO BOTS".to_string(),
"DELEGATE TO BOT".to_string(),
"TRANSFER CONVERSATION".to_string(),
// Communication
"ADD MEMBER".to_string(),
"CREATE DRAFT".to_string(),
"SEND MAIL".to_string(),
"SEND TEMPLATE".to_string(),
"SMS".to_string(),
// UI
"ADD SUGGESTION".to_string(),
"CLEAR SUGGESTIONS".to_string(),
// Tools
"ADD TOOL".to_string(),
"CLEAR TOOLS".to_string(),
"CREATE SITE".to_string(),
"CREATE TASK".to_string(),
"USE TOOL".to_string(),
// Data Operations
"AGGREGATE".to_string(),
"DELETE".to_string(),
"FILL".to_string(),
"FILTER".to_string(),
"FIND".to_string(),
"FIRST".to_string(),
"GROUP BY".to_string(),
"INSERT".to_string(),
"JOIN".to_string(),
"LAST".to_string(),
"MAP".to_string(),
"MERGE".to_string(),
"PIVOT".to_string(),
"SAVE".to_string(),
"SAVE FROM UNSTRUCTURED".to_string(),
"UPDATE".to_string(),
// Files
"COMPRESS".to_string(),
"COPY".to_string(),
"DELETE FILE".to_string(),
"DOWNLOAD".to_string(),
"EXTRACT".to_string(),
"GENERATE PDF".to_string(),
"LIST".to_string(),
"MERGE PDF".to_string(),
"MOVE".to_string(),
"READ".to_string(),
"UPLOAD".to_string(),
"WRITE".to_string(),
// HTTP
"CLEAR HEADERS".to_string(),
"DELETE HTTP".to_string(),
"GET".to_string(),
"GRAPHQL".to_string(),
"PATCH".to_string(),
"POST".to_string(),
"PUT".to_string(),
"SET HEADER".to_string(),
"SOAP".to_string(),
// Control Flow
"EXIT FOR".to_string(),
"FOR EACH".to_string(),
"IF".to_string(),
"SWITCH".to_string(),
"WAIT".to_string(),
"WHILE".to_string(),
// Variables
"GET".to_string(),
"SET".to_string(),
// Memory
"GET BOT MEMORY".to_string(),
"GET USER MEMORY".to_string(),
"REMEMBER".to_string(),
"SET BOT MEMORY".to_string(),
"SET CONTEXT".to_string(),
"SET USER FACT".to_string(),
"SET USER MEMORY".to_string(),
"USER FACTS".to_string(),
// Knowledge
"CLEAR KB".to_string(),
"USE KB".to_string(),
"USE WEBSITE".to_string(),
// AI/LLM
"LLM".to_string(),
"SET CONTEXT".to_string(),
"USE MODEL".to_string(),
// Code Execution
"RUN BASH".to_string(),
"RUN JAVASCRIPT".to_string(),
"RUN PYTHON".to_string(),
// Dialog
"HEAR".to_string(),
"TALK".to_string(),
// Events
"ON".to_string(),
"SET SCHEDULE".to_string(),
"WEBHOOK".to_string(),
// Session
"SET USER".to_string(),
// Special
"BOOK".to_string(),
"WEATHER".to_string(),
// Debug
"PRINT".to_string(),
// String Functions
"FORMAT".to_string(),
"INSTR".to_string(),
"IS NUMERIC".to_string(),
// Safety & Approval (NEW)
"REQUIRE APPROVAL".to_string(),
"SIMULATE IMPACT".to_string(),
"CHECK CONSTRAINTS".to_string(),
"AUDIT LOG".to_string(),
// Auto Task (NEW)
"PLAN START".to_string(),
"PLAN END".to_string(),
"STEP".to_string(),
"AUTO TASK".to_string(),
// MCP Integration (NEW)
"USE MCP".to_string(),
"MCP LIST TOOLS".to_string(),
"MCP INVOKE".to_string(),
// Decision Framework (NEW)
"OPTION A OR B".to_string(),
"DECIDE".to_string(),
"ESCALATE".to_string(),
]
}
/// Keyword categories for documentation and UI
pub fn get_keyword_categories() -> std::collections::HashMap<String, Vec<String>> {
let mut categories = std::collections::HashMap::new();
categories.insert(
"Multi-Agent".to_string(),
vec![
"ADD BOT".to_string(),
"BOT REFLECTION".to_string(),
"BROADCAST TO BOTS".to_string(),
"DELEGATE TO BOT".to_string(),
"TRANSFER CONVERSATION".to_string(),
],
);
categories.insert(
"Communication".to_string(),
vec![
"ADD MEMBER".to_string(),
"CREATE DRAFT".to_string(),
"SEND MAIL".to_string(),
"SEND TEMPLATE".to_string(),
"SMS".to_string(),
],
);
categories.insert(
"Data".to_string(),
vec![
"AGGREGATE".to_string(),
"DELETE".to_string(),
"FILL".to_string(),
"FILTER".to_string(),
"FIND".to_string(),
"FIRST".to_string(),
"GROUP BY".to_string(),
"INSERT".to_string(),
"JOIN".to_string(),
"LAST".to_string(),
"MAP".to_string(),
"MERGE".to_string(),
"PIVOT".to_string(),
"SAVE".to_string(),
"UPDATE".to_string(),
],
);
categories.insert(
"HTTP".to_string(),
vec![
"GET".to_string(),
"POST".to_string(),
"PUT".to_string(),
"PATCH".to_string(),
"DELETE HTTP".to_string(),
"GRAPHQL".to_string(),
"SOAP".to_string(),
"SET HEADER".to_string(),
"CLEAR HEADERS".to_string(),
],
);
categories.insert(
"AI".to_string(),
vec![
"LLM".to_string(),
"SET CONTEXT".to_string(),
"USE MODEL".to_string(),
],
);
categories.insert(
"Code Execution".to_string(),
vec![
"RUN PYTHON".to_string(),
"RUN JAVASCRIPT".to_string(),
"RUN BASH".to_string(),
],
);
categories.insert(
"Safety".to_string(),
vec![
"REQUIRE APPROVAL".to_string(),
"SIMULATE IMPACT".to_string(),
"CHECK CONSTRAINTS".to_string(),
"AUDIT LOG".to_string(),
],
);
categories.insert(
"MCP".to_string(),
vec![
"USE MCP".to_string(),
"MCP LIST TOOLS".to_string(),
"MCP INVOKE".to_string(),
],
);
categories.insert(
"Auto Task".to_string(),
vec![
"PLAN START".to_string(),
"PLAN END".to_string(),
"STEP".to_string(),
"AUTO TASK".to_string(),
"OPTION A OR B".to_string(),
"DECIDE".to_string(),
"ESCALATE".to_string(),
],
);
categories
}

View file

@ -0,0 +1,919 @@
//! Safety Layer - Simulation, Constraints, and Audit Trail
//!
//! This module provides the safety infrastructure for the Auto Task system,
//! ensuring that all actions are validated, simulated, and audited before
//! and during execution.
//!
//! # Architecture
//!
//! ```text
//! Action Request → Constraint Check → Impact Simulation → Approval → Execute → Audit
//! ↓ ↓ ↓ ↓ ↓ ↓
//! Validate Check budget, Simulate what Get user Run with Log all
//! request permissions, will happen approval safeguards actions
//! policies (dry run) if needed
//! ```
//!
//! # Features
//!
//! - **Impact Simulation**: Dry-run execution to predict outcomes
//! - **Constraint Validation**: Budget, permissions, policies, compliance
//! - **Approval Workflow**: Multi-level approval for high-risk actions
//! - **Audit Trail**: Complete logging of all actions and decisions
//! - **Rollback Support**: Undo mechanisms for reversible actions
//! - **Rate Limiting**: Prevent runaway executions
//! - **Circuit Breaker**: Stop execution on repeated failures
use crate::shared::models::UserSession;
use crate::shared::state::AppState;
use chrono::{DateTime, Duration, Utc};
use diesel::prelude::*;
use log::{error, info, trace, warn};
use rhai::{Dynamic, Engine};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use uuid::Uuid;
// ============================================================================
// CONSTRAINT DATA STRUCTURES
// ============================================================================
/// Constraint check result
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConstraintCheckResult {
/// Whether all constraints passed
pub passed: bool,
/// Individual constraint results
pub results: Vec<ConstraintResult>,
/// Overall risk score (0.0 - 1.0)
pub risk_score: f64,
/// Blocking constraints that must be resolved
pub blocking: Vec<String>,
/// Warnings that should be reviewed
pub warnings: Vec<String>,
/// Suggestions for improvement
pub suggestions: Vec<String>,
}
impl Default for ConstraintCheckResult {
fn default() -> Self {
ConstraintCheckResult {
passed: true,
results: Vec::new(),
risk_score: 0.0,
blocking: Vec::new(),
warnings: Vec::new(),
suggestions: Vec::new(),
}
}
}
/// Individual constraint check result
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConstraintResult {
/// Constraint identifier
pub constraint_id: String,
/// Constraint type
pub constraint_type: ConstraintType,
/// Whether this constraint passed
pub passed: bool,
/// Severity if failed
pub severity: ConstraintSeverity,
/// Human-readable message
pub message: String,
/// Additional details
pub details: Option<serde_json::Value>,
/// Suggested remediation
pub remediation: Option<String>,
}
/// Types of constraints
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ConstraintType {
/// Budget/cost constraints
Budget,
/// User permissions
Permission,
/// Organizational policies
Policy,
/// Regulatory compliance
Compliance,
/// Technical limitations
Technical,
/// Rate limits
RateLimit,
/// Time-based constraints
TimeWindow,
/// Data access constraints
DataAccess,
/// Security constraints
Security,
/// Resource availability
Resource,
/// Custom constraint
Custom(String),
}
impl Default for ConstraintType {
fn default() -> Self {
ConstraintType::Custom("unknown".to_string())
}
}
impl std::fmt::Display for ConstraintType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ConstraintType::Budget => write!(f, "budget"),
ConstraintType::Permission => write!(f, "permission"),
ConstraintType::Policy => write!(f, "policy"),
ConstraintType::Compliance => write!(f, "compliance"),
ConstraintType::Technical => write!(f, "technical"),
ConstraintType::RateLimit => write!(f, "rate_limit"),
ConstraintType::TimeWindow => write!(f, "time_window"),
ConstraintType::DataAccess => write!(f, "data_access"),
ConstraintType::Security => write!(f, "security"),
ConstraintType::Resource => write!(f, "resource"),
ConstraintType::Custom(s) => write!(f, "{}", s),
}
}
}
/// Constraint severity levels
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Eq)]
pub enum ConstraintSeverity {
/// Informational only
Info = 0,
/// Warning - should review but can proceed
Warning = 1,
/// Error - should not proceed without override
Error = 2,
/// Critical - cannot proceed under any circumstances
Critical = 3,
}
impl Default for ConstraintSeverity {
fn default() -> Self {
ConstraintSeverity::Warning
}
}
/// Constraint definition
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Constraint {
/// Unique identifier
pub id: String,
/// Constraint name
pub name: String,
/// Constraint type
pub constraint_type: ConstraintType,
/// Description
pub description: String,
/// Evaluation expression (for dynamic constraints)
pub expression: Option<String>,
/// Static value to check against
pub threshold: Option<serde_json::Value>,
/// Severity if violated
pub severity: ConstraintSeverity,
/// Whether this constraint is enabled
pub enabled: bool,
/// Actions this constraint applies to
pub applies_to: Vec<String>,
/// Bot ID this constraint belongs to
pub bot_id: String,
}
// ============================================================================
// SIMULATION DATA STRUCTURES
// ============================================================================
/// Result of impact simulation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SimulationResult {
/// Unique simulation ID
pub id: String,
/// Whether simulation completed successfully
pub success: bool,
/// Simulated outcomes for each step
pub step_outcomes: Vec<StepSimulationOutcome>,
/// Overall impact assessment
pub impact: ImpactAssessment,
/// Predicted resource usage
pub resource_usage: PredictedResourceUsage,
/// Potential side effects
pub side_effects: Vec<SideEffect>,
/// Recommended actions
pub recommendations: Vec<Recommendation>,
/// Confidence in the simulation (0.0 - 1.0)
pub confidence: f64,
/// Simulation timestamp
pub simulated_at: DateTime<Utc>,
/// Duration of simulation in ms
pub simulation_duration_ms: i64,
}
impl Default for SimulationResult {
fn default() -> Self {
SimulationResult {
id: Uuid::new_v4().to_string(),
success: true,
step_outcomes: Vec::new(),
impact: ImpactAssessment::default(),
resource_usage: PredictedResourceUsage::default(),
side_effects: Vec::new(),
recommendations: Vec::new(),
confidence: 0.0,
simulated_at: Utc::now(),
simulation_duration_ms: 0,
}
}
}
/// Outcome of simulating a single step
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepSimulationOutcome {
/// Step ID
pub step_id: String,
/// Step name
pub step_name: String,
/// Whether step would succeed
pub would_succeed: bool,
/// Probability of success (0.0 - 1.0)
pub success_probability: f64,
/// Predicted outputs
pub predicted_outputs: serde_json::Value,
/// Potential failure modes
pub failure_modes: Vec<FailureMode>,
/// Time estimate in seconds
pub estimated_duration_seconds: i32,
/// Dependencies that would be affected
pub affected_dependencies: Vec<String>,
}
/// Potential failure mode
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailureMode {
/// Failure type
pub failure_type: String,
/// Probability (0.0 - 1.0)
pub probability: f64,
/// Impact description
pub impact: String,
/// Mitigation strategy
pub mitigation: Option<String>,
/// Whether this is recoverable
pub recoverable: bool,
}
/// Overall impact assessment
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImpactAssessment {
/// Overall risk score (0.0 - 1.0)
pub risk_score: f64,
/// Risk level classification
pub risk_level: RiskLevel,
/// Data impact
pub data_impact: DataImpact,
/// Cost impact
pub cost_impact: CostImpact,
/// Time impact
pub time_impact: TimeImpact,
/// Security impact
pub security_impact: SecurityImpact,
/// Summary description
pub summary: String,
}
impl Default for ImpactAssessment {
fn default() -> Self {
ImpactAssessment {
risk_score: 0.0,
risk_level: RiskLevel::Low,
data_impact: DataImpact::default(),
cost_impact: CostImpact::default(),
time_impact: TimeImpact::default(),
security_impact: SecurityImpact::default(),
summary: "No impact assessed".to_string(),
}
}
}
/// Risk level classification
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Eq)]
pub enum RiskLevel {
None = 0,
Low = 1,
Medium = 2,
High = 3,
Critical = 4,
}
impl Default for RiskLevel {
fn default() -> Self {
RiskLevel::Low
}
}
impl std::fmt::Display for RiskLevel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RiskLevel::None => write!(f, "None"),
RiskLevel::Low => write!(f, "Low"),
RiskLevel::Medium => write!(f, "Medium"),
RiskLevel::High => write!(f, "High"),
RiskLevel::Critical => write!(f, "Critical"),
}
}
}
/// Data impact assessment
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataImpact {
/// Records that would be created
pub records_created: i32,
/// Records that would be modified
pub records_modified: i32,
/// Records that would be deleted
pub records_deleted: i32,
/// Tables affected
pub tables_affected: Vec<String>,
/// Data sources affected
pub data_sources_affected: Vec<String>,
/// Whether changes are reversible
pub reversible: bool,
/// Backup required
pub backup_required: bool,
}
impl Default for DataImpact {
fn default() -> Self {
DataImpact {
records_created: 0,
records_modified: 0,
records_deleted: 0,
tables_affected: Vec::new(),
data_sources_affected: Vec::new(),
reversible: true,
backup_required: false,
}
}
}
/// Cost impact assessment
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CostImpact {
/// Estimated API costs
pub api_costs: f64,
/// Estimated compute costs
pub compute_costs: f64,
/// Estimated storage costs
pub storage_costs: f64,
/// Total estimated cost
pub total_estimated_cost: f64,
/// Cost currency
pub currency: String,
/// Whether this exceeds budget
pub exceeds_budget: bool,
/// Budget remaining after this action
pub budget_remaining: Option<f64>,
}
impl Default for CostImpact {
fn default() -> Self {
CostImpact {
api_costs: 0.0,
compute_costs: 0.0,
storage_costs: 0.0,
total_estimated_cost: 0.0,
currency: "USD".to_string(),
exceeds_budget: false,
budget_remaining: None,
}
}
}
/// Time impact assessment
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeImpact {
/// Estimated execution time in seconds
pub estimated_duration_seconds: i32,
/// Whether this blocks other tasks
pub blocking: bool,
/// Tasks that would be delayed
pub delayed_tasks: Vec<String>,
/// Deadline impact
pub affects_deadline: bool,
}
impl Default for TimeImpact {
fn default() -> Self {
TimeImpact {
estimated_duration_seconds: 0,
blocking: false,
delayed_tasks: Vec::new(),
affects_deadline: false,
}
}
}
/// Security impact assessment
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityImpact {
/// Security risk level
pub risk_level: RiskLevel,
/// Credentials accessed
pub credentials_accessed: Vec<String>,
/// External systems contacted
pub external_systems: Vec<String>,
/// Data exposure risk
pub data_exposure_risk: bool,
/// Requires elevated permissions
pub requires_elevation: bool,
/// Security concerns
pub concerns: Vec<String>,
}
impl Default for SecurityImpact {
fn default() -> Self {
SecurityImpact {
risk_level: RiskLevel::Low,
credentials_accessed: Vec::new(),
external_systems: Vec::new(),
data_exposure_risk: false,
requires_elevation: false,
concerns: Vec::new(),
}
}
}
/// Predicted resource usage
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PredictedResourceUsage {
/// CPU usage percentage
pub cpu_percent: f64,
/// Memory usage in MB
pub memory_mb: f64,
/// Network bandwidth in KB/s
pub network_kbps: f64,
/// Disk I/O in KB/s
pub disk_io_kbps: f64,
/// Number of API calls
pub api_calls: i32,
/// Number of database queries
pub db_queries: i32,
/// LLM tokens used
pub llm_tokens: i32,
}
impl Default for PredictedResourceUsage {
fn default() -> Self {
PredictedResourceUsage {
cpu_percent: 0.0,
memory_mb: 0.0,
network_kbps: 0.0,
disk_io_kbps: 0.0,
api_calls: 0,
db_queries: 0,
llm_tokens: 0,
}
}
}
/// Potential side effect
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SideEffect {
/// Side effect type
pub effect_type: String,
/// Description
pub description: String,
/// Severity
pub severity: ConstraintSeverity,
/// Affected systems
pub affected_systems: Vec<String>,
/// Whether this is intentional
pub intentional: bool,
/// Mitigation if unintentional
pub mitigation: Option<String>,
}
/// Recommendation from simulation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Recommendation {
/// Recommendation type
pub recommendation_type: RecommendationType,
/// Priority
pub priority: i32,
/// Description
pub description: String,
/// Action to take
pub action: Option<String>,
/// BASIC code to implement
pub basic_code: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RecommendationType {
/// Add a safety check
AddSafetyCheck,
/// Add error handling
AddErrorHandling,
/// Request approval
RequestApproval,
/// Add backup step
AddBackup,
/// Optimize performance
Optimize,
/// Split into smaller steps
SplitSteps,
/// Add monitoring
AddMonitoring,
/// Custom recommendation
Custom(String),
}
// ============================================================================
// AUDIT TRAIL DATA STRUCTURES
// ============================================================================
/// Audit log entry
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditEntry {
/// Unique audit entry ID
pub id: String,
/// Timestamp
pub timestamp: DateTime<Utc>,
/// Event type
pub event_type: AuditEventType,
/// Actor (user or system)
pub actor: AuditActor,
/// Action performed
pub action: String,
/// Target of the action
pub target: AuditTarget,
/// Outcome
pub outcome: AuditOutcome,
/// Details
pub details: serde_json::Value,
/// Related entities
pub related_entities: Vec<RelatedEntity>,
/// Session ID
pub session_id: String,
/// Bot ID
pub bot_id: String,
/// Task ID if applicable
pub task_id: Option<String>,
/// Step ID if applicable
pub step_id: Option<String>,
/// IP address
pub ip_address: Option<String>,
/// User agent
pub user_agent: Option<String>,
/// Risk level of the action
pub risk_level: RiskLevel,
/// Whether this was auto-executed
pub auto_executed: bool,
}
/// Audit event types
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum AuditEventType {
/// Task lifecycle events
TaskCreated,
TaskStarted,
TaskCompleted,
TaskFailed,
TaskCancelled,
TaskPaused,
TaskResumed,
/// Step events
StepStarted,
StepCompleted,
StepFailed,
StepSkipped,
StepRolledBack,
/// Approval events
ApprovalRequested,
ApprovalGranted,
ApprovalDenied,
ApprovalExpired,
/// Decision events
DecisionRequested,
DecisionMade,
DecisionTimeout,
/// Simulation events
SimulationStarted,
SimulationCompleted,
/// Constraint events
ConstraintChecked,
ConstraintViolated,
ConstraintOverridden,
/// Data events
DataRead,
DataCreated,
DataModified,
DataDeleted,
/// External events
ApiCalled,
McpInvoked,
WebhookTriggered,
/// Security events
PermissionChecked,
PermissionDenied,
CredentialAccessed,
/// System events
ConfigChanged,
ErrorOccurred,
WarningRaised,
/// Custom event
Custom(String),
}
impl std::fmt::Display for AuditEventType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AuditEventType::TaskCreated => write!(f, "task_created"),
AuditEventType::TaskStarted => write!(f, "task_started"),
AuditEventType::TaskCompleted => write!(f, "task_completed"),
AuditEventType::TaskFailed => write!(f, "task_failed"),
AuditEventType::TaskCancelled => write!(f, "task_cancelled"),
AuditEventType::TaskPaused => write!(f, "task_paused"),
AuditEventType::TaskResumed => write!(f, "task_resumed"),
AuditEventType::StepStarted => write!(f, "step_started"),
AuditEventType::StepCompleted => write!(f, "step_completed"),
AuditEventType::StepFailed => write!(f, "step_failed"),
AuditEventType::StepSkipped => write!(f, "step_skipped"),
AuditEventType::StepRolledBack => write!(f, "step_rolled_back"),
AuditEventType::ApprovalRequested => write!(f, "approval_requested"),
AuditEventType::ApprovalGranted => write!(f, "approval_granted"),
AuditEventType::ApprovalDenied => write!(f, "approval_denied"),
AuditEventType::ApprovalExpired => write!(f, "approval_expired"),
AuditEventType::DecisionRequested => write!(f, "decision_requested"),
AuditEventType::DecisionMade => write!(f, "decision_made"),
AuditEventType::DecisionTimeout => write!(f, "decision_timeout"),
AuditEventType::SimulationStarted => write!(f, "simulation_started"),
AuditEventType::SimulationCompleted => write!(f, "simulation_completed"),
AuditEventType::ConstraintChecked => write!(f, "constraint_checked"),
AuditEventType::ConstraintViolated => write!(f, "constraint_violated"),
AuditEventType::ConstraintOverridden => write!(f, "constraint_overridden"),
AuditEventType::DataRead => write!(f, "data_read"),
AuditEventType::DataCreated => write!(f, "data_created"),
AuditEventType::DataModified => write!(f, "data_modified"),
AuditEventType::DataDeleted => write!(f, "data_deleted"),
AuditEventType::ApiCalled => write!(f, "api_called"),
AuditEventType::McpInvoked => write!(f, "mcp_invoked"),
AuditEventType::WebhookTriggered => write!(f, "webhook_triggered"),
AuditEventType::PermissionChecked => write!(f, "permission_checked"),
AuditEventType::PermissionDenied => write!(f, "permission_denied"),
AuditEventType::CredentialAccessed => write!(f, "credential_accessed"),
AuditEventType::ConfigChanged => write!(f, "config_changed"),
AuditEventType::ErrorOccurred => write!(f, "error_occurred"),
AuditEventType::WarningRaised => write!(f, "warning_raised"),
AuditEventType::Custom(s) => write!(f, "{}", s),
}
}
}
/// Actor in an audit event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditActor {
/// Actor type
pub actor_type: ActorType,
/// Actor ID
pub id: String,
/// Actor name
pub name: Option<String>,
/// Actor role
pub role: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ActorType {
User,
Bot,
System,
External,
Anonymous,
}
/// Target of an audit action
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditTarget {
/// Target type
pub target_type: String,
/// Target ID
pub id: String,
/// Target name
pub name: Option<String>,
/// Additional properties
pub properties: HashMap<String, String>,
}
/// Outcome of an audit action
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditOutcome {
/// Whether action succeeded
pub success: bool,
/// Result code
pub result_code: Option<String>,
/// Result message
pub message: Option<String>,
/// Duration in milliseconds
pub duration_ms: Option<i64>,
/// Error details if failed
pub error: Option<String>,
}
/// Related entity in audit
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RelatedEntity {
/// Entity type
pub entity_type: String,
/// Entity ID
pub entity_id: String,
/// Relationship
pub relationship: String,
}
// ============================================================================
// SAFETY LAYER ENGINE
// ============================================================================
/// The Safety Layer engine
pub struct SafetyLayer {
state: Arc<AppState>,
config: SafetyConfig,
constraints: Vec<Constraint>,
}
/// Safety Layer configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SafetyConfig {
/// Enable/disable safety layer
pub enabled: bool,
/// Enable constraint checking
pub check_constraints: bool,
/// Enable impact simulation
pub simulate_impact: bool,
/// Enable audit logging
pub audit_enabled: bool,
/// Risk level that requires approval
pub approval_threshold: RiskLevel,
/// Maximum auto-execute risk level
pub max_auto_execute_risk: RiskLevel,
/// Default budget limit (USD)
pub default_budget_limit: f64,
/// Rate limit (actions per minute)
pub rate_limit_per_minute: i32,
/// Circuit breaker failure threshold
pub circuit_breaker_threshold: i32,
/// Audit retention days
pub audit_retention_days: i32,
/// Require simulation for these action types
pub require_simulation_for: Vec<String>,
}
impl Default for SafetyConfig {
fn default() -> Self {
SafetyConfig {
enabled: true,
check_constraints: true,
simulate_impact: true,
audit_enabled: true,
approval_threshold: RiskLevel::High,
max_auto_execute_risk: RiskLevel::Low,
default_budget_limit: 100.0,
rate_limit_per_minute: 60,
circuit_breaker_threshold: 5,
audit_retention_days: 90,
require_simulation_for: vec![
"DELETE".to_string(),
"UPDATE".to_string(),
"RUN_PYTHON".to_string(),
"RUN_BASH".to_string(),
"POST".to_string(),
"PUT".to_string(),
"PATCH".to_string(),
],
}
}
}
impl std::fmt::Debug for SafetyLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SafetyLayer")
.field("config", &self.config)
.field("constraints_count", &self.constraints.len())
.finish()
}
}
impl SafetyLayer {
/// Create a new Safety Layer
pub fn new(state: Arc<AppState>) -> Self {
SafetyLayer {
state,
config: SafetyConfig::default(),
constraints: Vec::new(),
}
}
/// Create with custom configuration
pub fn with_config(state: Arc<AppState>, config: SafetyConfig) -> Self {
SafetyLayer {
state,
config,
constraints: Vec::new(),
}
}
/// Load constraints from database
pub async fn load_constraints(&mut self, bot_id: &Uuid) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut conn = self.state.conn.get().map_err(|e| format!("DB error: {}", e))?;
let bot_id_str = bot_id.to_string();
let query = diesel::sql_query(
"SELECT id, name, constraint_type, description, expression, threshold, severity, enabled, applies_to
FROM safety_constraints WHERE bot_id = $1 AND enabled = true"
)
.bind::<diesel::sql_types::Text, _>(&bot_id_str);
#[derive(QueryableByName)]
struct ConstraintRow {
#[diesel(sql_type = diesel::sql_types::Text)]
id: String,
#[diesel(sql_type = diesel::sql_types::Text)]
name: String,
#[diesel(sql_type = diesel::sql_types::Text)]
constraint_type: String,
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
description: Option<String>,
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
expression: Option<String>,
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
threshold: Option<String>,
#[diesel(sql_type = diesel::sql_types::Text)]
severity: String,
#[diesel(sql_type = diesel::sql_types::Bool)]
enabled: bool,
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
applies_to: Option<String>,
}
let rows: Vec<ConstraintRow> = query.load(&mut *conn).unwrap_or_default();
self.constraints = rows.into_iter().map(|row| {
Constraint {
id: row.id,
name: row.name,
constraint_type: match row.constraint_type.as_str() {
"budget" => ConstraintType::Budget,
"permission" => ConstraintType::Permission,
"policy" => ConstraintType::Policy,
"compliance" => ConstraintType::Compliance,
"technical" => ConstraintType::Technical,
"rate_limit" => ConstraintType::RateLimit,
"time_window" => ConstraintType::TimeWindow,
"data_access" => ConstraintType::DataAccess,
"security" => ConstraintType::Security,
"resource" => ConstraintType::Resource,
other => ConstraintType::Custom(other.to_string()),
},
description: row.description.unwrap_or_default(),
expression: row.expression,
threshold: row.threshold.and_then(|t| serde_json::from_str(&t).ok()),
severity: match row.severity.as_str() {
"info" => ConstraintSeverity::Info,
"warning" => ConstraintSeverity::Warning,
"error" => ConstraintSeverity::Error,
"critical" => ConstraintSeverity::Critical,
_ => ConstraintSeverity::Warning,
},
enabled: row.enabled,
applies_to: row.applies_to
.map(|s| s.split(',').map(|x| x.trim().to_string()).collect())
.unwrap_or_default(),
bot_id: bot_id_str.clone(),
}
}).collect();
info!("Loaded {} constraints for bot {}", self.constraints.len(), bot_id);
Ok(())
}
/// Check all constraints for an action
pub async fn check_constraints(
&self,
action: &str,
context: &serde_json::Value,
user: &UserSession,
) -> Result<ConstraintCheckResult, Box

View file

@ -6,14 +6,12 @@ use anyhow::Result;
use aws_config::BehaviorVersion;
use aws_sdk_s3::Client;
use diesel::{Connection, RunQueryDsl};
use log::debug;
use log::{error, info, trace, warn};
use log::{debug, error, info, warn};
use rand::distr::Alphanumeric;
use rcgen::{
BasicConstraints, CertificateParams, DistinguishedName, DnType, IsCa, Issuer, KeyPair,
};
use std::fs;
use std::io::Write;
#[cfg(unix)]
use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf};
@ -29,11 +27,6 @@ pub struct BootstrapManager {
}
impl BootstrapManager {
pub async fn new(mode: InstallMode, tenant: Option<String>) -> Self {
trace!(
"Initializing BootstrapManager with mode {:?} and tenant {:?}",
mode,
tenant
);
Self {
install_mode: mode,
tenant,
@ -144,9 +137,37 @@ impl BootstrapManager {
}
}
/// Check if botserver-stack has installed components (indicating a working installation)
/// This is used to prevent accidental re-initialization of existing installations
fn has_installed_stack() -> bool {
let stack_dir = PathBuf::from("./botserver-stack");
if !stack_dir.exists() {
return false;
}
// Check for key indicators of an installed stack
let indicators = vec![
"./botserver-stack/bin/vault/vault",
"./botserver-stack/data/vault",
"./botserver-stack/conf/vault/config.hcl",
];
indicators.iter().any(|path| PathBuf::from(path).exists())
}
/// Reset only Vault credentials (when re-initialization is needed)
/// CRITICAL: This should NEVER be called if botserver-stack exists with installed components!
/// NEVER deletes user data in botserver-stack
fn reset_vault_only() -> Result<()> {
// SAFETY CHECK: NEVER reset if stack is installed
if Self::has_installed_stack() {
error!("REFUSING to reset Vault credentials - botserver-stack is installed!");
error!("If you need to re-initialize, manually delete botserver-stack directory first");
return Err(anyhow::anyhow!(
"Cannot reset Vault - existing installation detected. Manual intervention required."
));
}
let vault_init = PathBuf::from("./botserver-stack/conf/vault/init.json");
let env_file = PathBuf::from("./.env");
@ -212,18 +233,48 @@ impl BootstrapManager {
}
}
// Try to unseal Vault - if this fails, we need to re-initialize Vault only
// Try to unseal Vault - if this fails, we need to handle carefully
if let Err(e) = self.ensure_vault_unsealed().await {
warn!("Vault unseal failed: {} - re-initializing Vault only", e);
warn!("Vault unseal failed: {}", e);
// CRITICAL: If stack is installed, NEVER try to re-initialize
// Just try restarting Vault a few more times
if Self::has_installed_stack() {
error!("Vault failed to unseal but stack is installed - NOT re-initializing");
error!("Try manually restarting Vault or check ./botserver-stack/logs/vault/vault.log");
// Kill only Vault process and try to restart
let _ = Command::new("pkill")
.args(["-9", "-f", "botserver-stack/bin/vault"])
.output();
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
// Try to restart Vault
if let Err(e) = pm.start("vault") {
warn!("Failed to restart Vault: {}", e);
}
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
// Final attempt to unseal
if let Err(e) = self.ensure_vault_unsealed().await {
return Err(anyhow::anyhow!(
"Vault failed to start/unseal after restart: {}. Manual intervention required.", e
));
}
} else {
// No installed stack, safe to re-initialize
warn!("No installed stack detected - proceeding with re-initialization");
// Kill only Vault process, reset only Vault credentials
// NEVER delete user data in botserver-stack
let _ = Command::new("pkill")
.args(["-9", "-f", "botserver-stack/bin/vault"])
.output();
if let Err(e) = Self::reset_vault_only() {
error!("Failed to reset Vault: {}", e);
return Err(e);
}
// Run bootstrap to re-initialize Vault
@ -233,6 +284,7 @@ impl BootstrapManager {
info!("Vault re-initialization complete");
return Ok(());
}
}
// Initialize SecretsManager so other code can use Vault
info!("Initializing SecretsManager...");
@ -286,10 +338,10 @@ impl BootstrapManager {
if pm.is_installed(component.name) {
match pm.start(component.name) {
Ok(_child) => {
trace!("Started component: {}", component.name);
info!("Started component: {}", component.name);
}
Err(e) => {
trace!(
debug!(
"Component {} might already be running: {}",
component.name,
e
@ -400,8 +452,24 @@ impl BootstrapManager {
if let Err(e) = self.ensure_vault_unsealed().await {
warn!("Vault still not responding after restart: {}", e);
// Only now reset Vault credentials and re-initialize ONLY Vault
Self::reset_vault_only()?;
// CRITICAL: If stack is installed, NEVER try to re-initialize
// This protects existing installations from being destroyed
if Self::has_installed_stack() {
error!("CRITICAL: Vault failed but botserver-stack is installed!");
error!("REFUSING to delete init.json or .env - this would destroy your installation");
error!("Please check ./botserver-stack/logs/vault/vault.log for errors");
error!("You may need to manually restart Vault or check its configuration");
return Err(anyhow::anyhow!(
"Vault failed to start. Manual intervention required. Check logs at ./botserver-stack/logs/vault/vault.log"
));
}
// Only reset if NO installed stack (fresh/broken install)
warn!("No installed stack detected - attempting Vault re-initialization");
if let Err(reset_err) = Self::reset_vault_only() {
error!("Failed to reset Vault: {}", reset_err);
return Err(reset_err);
}
// Install/configure ONLY Vault - NOT full bootstrap
info!("Re-initializing Vault only (preserving other services)...");
@ -419,7 +487,7 @@ impl BootstrapManager {
}
}
info!("Vault re-initialization complete");
info!("Vault recovery complete");
}
// Initialize SecretsManager so other code can use Vault
@ -692,25 +760,14 @@ impl BootstrapManager {
std::thread::sleep(std::time::Duration::from_millis(200));
}
eprintln!("[DEBUG] Installing component: {}", component);
let _ = std::io::stderr().flush();
info!("Installing component: {}", component);
let install_result = pm.install(component).await;
eprintln!(
"[DEBUG] Install result for {}: {:?}",
component,
install_result.is_ok()
);
let _ = std::io::stderr().flush();
if let Err(e) = install_result {
eprintln!("[DEBUG] Failed to install component {}: {}", component, e);
error!("Failed to install component {}: {}", component, e);
if component == "vault" {
return Err(anyhow::anyhow!("Failed to install Vault: {}", e));
}
}
eprintln!("[DEBUG] Component {} installed successfully", component);
let _ = std::io::stderr().flush();
info!("Component {} installed successfully", component);
// After tables is installed, START PostgreSQL and create Zitadel config files before installing directory
@ -761,32 +818,20 @@ impl BootstrapManager {
// After Vault is installed, START the server then initialize it
if component == "vault" {
eprintln!("[VAULT DEBUG] === VAULT SETUP BLOCK ENTERED ===");
eprintln!(
"[VAULT DEBUG] Current working directory: {:?}",
std::env::current_dir()
);
eprintln!("[VAULT DEBUG] base_path: {:?}", pm.base_path);
let _ = std::io::stderr().flush();
info!("=== VAULT SETUP BLOCK ENTERED ===");
info!("Setting up Vault secrets service...");
// Verify vault binary exists and is executable
let vault_bin = PathBuf::from("./botserver-stack/bin/vault/vault");
if !vault_bin.exists() {
eprintln!("[VAULT DEBUG] Vault binary not found at {:?}", vault_bin);
let _ = std::io::stderr().flush();
error!("Vault binary not found at {:?}", vault_bin);
return Err(anyhow::anyhow!("Vault binary not found after installation"));
}
eprintln!("[VAULT DEBUG] Vault binary exists at {:?}", vault_bin);
let _ = std::io::stderr().flush();
info!("Vault binary exists at {:?}", vault_bin);
info!("Vault binary verified at {:?}", vault_bin);
// Ensure logs directory exists
let vault_log_path = PathBuf::from("./botserver-stack/logs/vault/vault.log");
if let Some(parent) = vault_log_path.parent() {
if let Err(e) = fs::create_dir_all(parent) {
eprintln!("[VAULT DEBUG] Failed to create vault logs directory: {}", e);
error!("Failed to create vault logs directory: {}", e);
}
}
@ -794,21 +839,16 @@ impl BootstrapManager {
// Ensure data directory exists
let vault_data_path = PathBuf::from("./botserver-stack/data/vault");
if let Err(e) = fs::create_dir_all(&vault_data_path) {
eprintln!("[VAULT DEBUG] Failed to create vault data directory: {}", e);
error!("Failed to create vault data directory: {}", e);
}
eprintln!("[VAULT DEBUG] Starting Vault server...");
let _ = std::io::stderr().flush();
info!("Starting Vault server...");
// Try starting vault directly first to see if it works
eprintln!("[VAULT DEBUG] Testing direct vault start...");
let direct_test = std::process::Command::new("sh")
// Try starting vault directly first
let _ = std::process::Command::new("sh")
.arg("-c")
.arg("cd ./botserver-stack/bin/vault && nohup ./vault server -config=../../conf/vault/config.hcl > ../../logs/vault/vault.log 2>&1 &")
.status();
eprintln!("[VAULT DEBUG] Direct test result: {:?}", direct_test);
std::thread::sleep(std::time::Duration::from_secs(2));
// Check if it's running now
@ -817,25 +857,17 @@ impl BootstrapManager {
.output();
if let Ok(output) = &check {
let pids = String::from_utf8_lossy(&output.stdout);
eprintln!(
"[VAULT DEBUG] After direct start, pgrep result: '{}'",
pids.trim()
);
if !pids.trim().is_empty() {
eprintln!("[VAULT DEBUG] Vault started via direct command!");
// Skip pm.start since vault is already running
info!("Vault server started");
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
} else {
eprintln!("[VAULT DEBUG] Direct start failed, trying pm.start...");
debug!("Direct start failed, trying pm.start...");
match pm.start("vault") {
Ok(_) => {
eprintln!("[VAULT DEBUG] pm.start returned Ok");
info!("Vault server started");
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
Err(e) => {
eprintln!("[VAULT DEBUG] pm.start failed: {}", e);
error!("Failed to start Vault server: {}", e);
return Err(anyhow::anyhow!(
"Failed to start Vault server: {}",
@ -846,39 +878,18 @@ impl BootstrapManager {
}
}
// Check log file
eprintln!(
"[VAULT DEBUG] Checking if vault.log exists: {}",
vault_log_path.exists()
);
if vault_log_path.exists() {
if let Ok(content) = fs::read_to_string(&vault_log_path) {
eprintln!(
"[VAULT DEBUG] vault.log content (first 500 chars): {}",
&content[..content.len().min(500)]
);
}
}
// The direct start above should have worked, but if pm.start is still called due to
// code flow, just check if vault is running
// Verify vault is running
let final_check = std::process::Command::new("pgrep")
.args(["-f", "vault server"])
.output();
if let Ok(output) = final_check {
let pids = String::from_utf8_lossy(&output.stdout);
if pids.trim().is_empty() {
eprintln!(
"[VAULT DEBUG] CRITICAL: Vault is not running after all attempts!"
);
error!("Vault is not running after all start attempts");
return Err(anyhow::anyhow!("Failed to start Vault server"));
} else {
eprintln!("[VAULT DEBUG] Vault is running with PIDs: {}", pids.trim());
}
}
eprintln!("[VAULT DEBUG] Initializing Vault with secrets...");
let _ = std::io::stderr().flush();
info!("Initializing Vault with secrets...");
if let Err(e) = self
.setup_vault(
@ -1457,6 +1468,19 @@ meet IN A 127.0.0.1
(unseal_key, root_token)
} else {
// Check if .env exists with VAULT_TOKEN - try to recover from that
let env_token = if env_file_path.exists() {
if let Ok(env_content) = fs::read_to_string(&env_file_path) {
env_content.lines()
.find(|line| line.starts_with("VAULT_TOKEN="))
.map(|line| line.trim_start_matches("VAULT_TOKEN=").to_string())
} else {
None
}
} else {
None
};
// Initialize Vault if not already done
info!("Initializing Vault...");
// Clear any mTLS env vars that might interfere with CLI
@ -1472,7 +1496,52 @@ meet IN A 127.0.0.1
let stderr = String::from_utf8_lossy(&init_output.stderr);
if stderr.contains("already initialized") {
warn!("Vault already initialized but init.json not found");
return Err(anyhow::anyhow!("Vault initialized but credentials lost"));
// If we have a token from .env, check if Vault is already unsealed
// and we can continue (maybe it was manually unsealed)
if let Some(_token) = env_token {
info!("Found VAULT_TOKEN in .env, checking if Vault is unsealed...");
// Check Vault status
let status_check = std::process::Command::new("sh")
.arg("-c")
.arg(format!(
"unset VAULT_CLIENT_CERT VAULT_CLIENT_KEY VAULT_CACERT; VAULT_ADDR={} ./botserver-stack/bin/vault/vault status -format=json 2>/dev/null",
vault_addr
))
.output();
if let Ok(status_output) = status_check {
let status_str = String::from_utf8_lossy(&status_output.stdout);
if let Ok(status) = serde_json::from_str::<serde_json::Value>(&status_str) {
let sealed = status["sealed"].as_bool().unwrap_or(true);
if !sealed {
// Vault is unsealed! We can continue with the token from .env
warn!("Vault is already unsealed - continuing with existing token");
warn!("NOTE: Unseal key is lost - Vault will need manual unseal after restart");
return Ok(()); // Skip rest of setup, Vault is already working
}
}
}
// Vault is sealed but we don't have unseal key
error!("Vault is sealed and unseal key is lost (init.json missing)");
error!("Options:");
error!(" 1. If you have a backup of init.json, restore it to ./botserver-stack/conf/vault/init.json");
error!(" 2. To start fresh, delete ./botserver-stack/data/vault/ and restart");
return Err(anyhow::anyhow!(
"Vault is sealed but unseal key is lost. See error messages above for recovery options."
));
}
// No token in .env either
error!("Vault already initialized but credentials are lost");
error!("Options:");
error!(" 1. If you have a backup of init.json, restore it to ./botserver-stack/conf/vault/init.json");
error!(" 2. To start fresh, delete ./botserver-stack/data/vault/ and ./botserver-stack/conf/vault/init.json and restart");
return Err(anyhow::anyhow!(
"Vault initialized but credentials lost. See error messages above for recovery options."
));
}
return Err(anyhow::anyhow!("Vault init failed: {}", stderr));
}
@ -1803,8 +1872,6 @@ VAULT_CACHE_TTL=300
return Err(anyhow::anyhow!("Failed to create bucket {}: {}. Check S3 credentials and endpoint configuration", bucket, e));
}
}
} else {
trace!("Bucket {} already exists", bucket);
}
}
}
@ -1848,7 +1915,7 @@ VAULT_CACHE_TTL=300
if config_path.exists() {
match std::fs::read_to_string(&config_path) {
Ok(csv_content) => {
info!("Syncing config.csv from {:?}", config_path);
debug!("Syncing config.csv from {:?}", config_path);
if let Err(e) =
self.sync_config_csv_to_db(conn, &default_bot_id, &csv_content)
{
@ -1860,10 +1927,10 @@ VAULT_CACHE_TTL=300
}
}
} else {
warn!("No config.csv found at {:?}", config_path);
debug!("No config.csv found at {:?}", config_path);
}
} else {
warn!("default.gbai template not found");
debug!("default.gbai template not found");
}
Ok(())
@ -1918,7 +1985,6 @@ VAULT_CACHE_TTL=300
.bind::<diesel::sql_types::Text, _>(value)
.execute(conn) {
Ok(_) => {
trace!(" Synced config: {} = {}", key, if key.contains("pass") || key.contains("secret") || key.contains("key") { "***" } else { value });
synced += 1;
}
Err(e) => {
@ -1965,12 +2031,6 @@ VAULT_CACHE_TTL=300
}
key.push_str(&file_name);
if path.is_file() {
trace!(
"Uploading file {} to bucket {} with key {}",
path.display(),
bucket,
key
);
let content = tokio::fs::read(&path).await?;
client
.put_object()
@ -2218,7 +2278,6 @@ log_level = "info"
// Skip if certificate already exists
if cert_path.exists() && key_path.exists() {
trace!("Certificate for {} already exists", service);
continue;
}