Enhance bot memory and Redis guards
- Derive bot_id from BOT_GUID env var
- Guard concurrent runs with Redis
- Read CACHE_URL for Redis connection
- Extend bot memory keyword to accept comma as separator
- Increase LLM timeouts to 180s (local and legacy)
- Update templates to use bot memory (GET_BOT_MEMORY/SET_BOT_MEMORY)
- Fix start script path to announcements.gbai
This commit is contained in:
parent 4acb9bb8f5
commit e5a9752caa

30 changed files with 270 additions and 19 deletions

docs/platform/guide/file.md (new file, 168 lines)
@@ -0,0 +1,168 @@

# File Upload Service with Actix Web and S3/MinIO

## Overview

This service provides a REST API endpoint for uploading files to S3-compatible storage (including MinIO) using Actix Web. It handles multipart form data, temporarily stores files locally, and transfers them to object storage.
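
Below is a minimal, self-contained sketch of how such an endpoint could be wired into an Actix Web application. The handler here is only a stub; the real service streams multipart data and shares an S3 client through `AppState`, as described later in this guide.

```rust
use actix_web::{web, App, HttpResponse, HttpServer, Responder};

// Stub handler standing in for the real multipart upload handler.
async fn upload_file(path: web::Path<String>) -> impl Responder {
    HttpResponse::Ok().body(format!("would upload into folder '{}'", path.into_inner()))
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new()
            // POST /files/upload/{folder_path} -> multipart upload handler
            .route("/files/upload/{folder_path}", web::post().to(upload_file))
    })
    .bind(("0.0.0.0", 8080))?
    .run()
    .await
}
```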

## BASIC Keywords Reference

- **UPLOAD**: Handles file uploads via multipart form data
- **CONFIG**: Manages S3/MinIO configuration and client initialization
- **TEMP**: Uses temporary files for processing uploads
- **CLIENT**: Maintains S3 client connection
- **ERROR**: Comprehensive error handling for upload failures
- **BUCKET**: Configures and uses S3 buckets for storage
- **PATH**: Manages folder paths for object organization

## API Reference

### POST `/files/upload/{folder_path}`

Uploads a file to the specified folder in S3/MinIO storage.

**Path Parameters:**

- `folder_path` (string): Target folder path in S3 bucket

**Request:**

- Content-Type: `multipart/form-data`
- Body: File data in multipart format

**Response:**

- `200 OK`: Upload successful
- `500 Internal Server Error`: Upload failed

**Example:**

```bash
curl -X POST \
  http://localhost:8080/files/upload/documents \
  -F "file=@report.pdf"
```

## Configuration

### DriveConfig Structure

```rust
// Example configuration
let config = DriveConfig {
    access_key: "your-access-key".to_string(),
    secret_key: "your-secret-key".to_string(),
    server: "minio.example.com:9000".to_string(),
    s3_bucket: "my-bucket".to_string(),
    use_ssl: false,
};
```

### Client Initialization

```rust
use crate::config::DriveConfig;

// Initialize S3 client
let drive_config = DriveConfig {
    access_key: "minioadmin".to_string(),
    secret_key: "minioadmin".to_string(),
    server: "localhost:9000".to_string(),
    s3_bucket: "uploads".to_string(),
    use_ssl: false,
};

let s3_client = init_drive(&drive_config).await?;
```

## Implementation Guide

### 1. Setting Up AppState

```rust
use crate::shared::state::AppState;

// Configure application state with S3 client
let app_state = web::Data::new(AppState {
    s3_client: Some(s3_client),
    config: Some(drive_config),
    // ... other state fields
});
```

### 2. Error Handling Patterns

The service implements several error handling strategies:

```rust
// Configuration errors
let bucket_name = state.get_ref().config.as_ref()
    .ok_or_else(|| actix_web::error::ErrorInternalServerError(
        "S3 bucket configuration is missing"
    ))?;

// Client initialization errors
let s3_client = state.get_ref().s3_client.as_ref()
    .ok_or_else(|| actix_web::error::ErrorInternalServerError(
        "S3 client is not initialized"
    ))?;

// File operation errors with cleanup
let mut temp_file = NamedTempFile::new().map_err(|e| {
    actix_web::error::ErrorInternalServerError(format!(
        "Failed to create temp file: {}", e
    ))
})?;
```

### 3. File Processing Flow

```rust
// 1. Create temporary file
let mut temp_file = NamedTempFile::new()?;

// 2. Process multipart data
while let Some(mut field) = payload.try_next().await? {
    // Extract filename from content disposition
    if let Some(disposition) = field.content_disposition() {
        file_name = disposition.get_filename().map(|s| s.to_string());
    }

    // Stream data to temporary file
    while let Some(chunk) = field.try_next().await? {
        temp_file.write_all(&chunk)?;
    }
}

// 3. Upload to S3
upload_to_s3(&s3_client, &bucket_name, &s3_key, &temp_file_path).await?;

// 4. Cleanup temporary file
let _ = std::fs::remove_file(&temp_file_path);
```

## Key Features

### Temporary File Management

- Uses `NamedTempFile` for secure temporary storage
- Automatic cleanup on both success and failure (see the sketch below)
- Efficient streaming of multipart data
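
As a brief illustration of the cleanup behavior referenced above, here is a hedged sketch: the `tempfile` crate's `NamedTempFile` removes its backing file when dropped, so an early return on error cannot leave data behind (the handler additionally removes the file explicitly after a successful upload).

```rust
use std::io::Write;
use tempfile::NamedTempFile;

// Illustrative sketch only: `NamedTempFile` deletes its backing file on drop,
// so both the error path (`?`) and the success path leave no file behind.
fn stage_chunk(data: &[u8]) -> std::io::Result<()> {
    let mut temp_file = NamedTempFile::new()?;
    temp_file.write_all(data)?;            // stream the chunk into the temp file
    // ... an upload step would read from temp_file.path() here ...
    Ok(())                                 // temp_file dropped here; the file is removed
}
```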

### S3/MinIO Compatibility

- Path-style addressing for MinIO compatibility (see the configuration sketch below)
- Configurable SSL/TLS
- Custom endpoint support
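
The guide does not show the body of `init_drive`, so the following is only a hedged sketch of how a path-style client could be built, assuming the `aws-sdk-s3` crate; the crate choice, region value, and builder calls are assumptions, not the service's actual implementation.

```rust
use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
use aws_sdk_s3::{Client, Config};

// Hypothetical init_drive sketch (assumes the aws-sdk-s3 crate).
async fn init_drive(cfg: &DriveConfig) -> Result<Client, Box<dyn std::error::Error>> {
    let scheme = if cfg.use_ssl { "https" } else { "http" };
    let credentials = Credentials::new(
        cfg.access_key.clone(),
        cfg.secret_key.clone(),
        None,           // no session token
        None,           // no expiry
        "drive-config",
    );
    let s3_config = Config::builder()
        .behavior_version(BehaviorVersion::latest())
        .region(Region::new("us-east-1"))                 // MinIO ignores the region, but the SDK requires one
        .endpoint_url(format!("{scheme}://{}", cfg.server))
        .credentials_provider(credentials)
        .force_path_style(true)                           // path-style addressing for MinIO
        .build();
    Ok(Client::from_conf(s3_config))
}
```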

### Security Considerations

- Temporary files are automatically deleted
- No persistent storage of uploaded files on server
- Secure credential handling (see the note below)
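
One way to keep credentials out of source code is to read them from the environment. This is only an illustration: the variable names used here (`DRIVE_ACCESSKEY`, `DRIVE_SECRET`, and so on) are hypothetical, not the names the service actually reads.

```rust
use std::env;

// Hypothetical helper: the env var names below are illustrative only.
fn drive_config_from_env() -> DriveConfig {
    DriveConfig {
        access_key: env::var("DRIVE_ACCESSKEY").unwrap_or_default(),
        secret_key: env::var("DRIVE_SECRET").unwrap_or_default(),
        server: env::var("DRIVE_SERVER").unwrap_or_else(|_| "localhost:9000".to_string()),
        s3_bucket: env::var("DRIVE_BUCKET").unwrap_or_else(|_| "uploads".to_string()),
        use_ssl: env::var("DRIVE_USE_SSL").map(|v| v == "true").unwrap_or(false),
    }
}
```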

## Error Scenarios

1. **Missing Configuration**: Returns 500 if S3 bucket or client not configured
2. **File System Errors**: Handles temp file creation/write failures
3. **Network Issues**: Manages S3 connection timeouts and errors
4. **Invalid Uploads**: Handles malformed multipart data

## Performance Notes

- Streams data directly from multipart to temporary file
- Uses async operations for I/O-bound tasks
- Minimal memory usage for large file uploads
- Efficient cleanup prevents disk space leaks

@@ -1 +0,0 @@
-- Be pragmatic and concise with examples.

prompts/dev/platform/doc-guide-topic.md (new file, 7 lines)
@@ -0,0 +1,7 @@
- Be pragmatic and concise with examples.
- Create both guide-like and API-reference-like content where applicable.
- Use clear and consistent terminology.
- Ensure consistency in formatting and structure.
- Follow a logical flow and organization.
- Use consistent headings and subheadings.
- Relate the content to the BASIC keyword list.

@@ -3,7 +3,8 @@ use crate::shared::models::{Automation, TriggerKind};
 use crate::shared::state::AppState;
 use chrono::{DateTime, Datelike, Timelike, Utc};
 use diesel::prelude::*;
-use log::{error, info};
+use log::{error, info, warn};
+use std::env;
 use std::path::Path;
 use std::sync::Arc;
 use tokio::time::Duration;

@@ -182,20 +183,68 @@ impl AutomationService {
     }

     async fn execute_action(&self, param: &str) {
+        // Get bot_id early to use in Redis key
+        let bot_id_string = env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string());
+        let bot_id = Uuid::parse_str(&bot_id_string).unwrap_or_else(|_| Uuid::new_v4());
+
+        // Check if this job is already running for this bot
+        let is_bas_file = param.ends_with(".bas");
+        let redis_key = format!("job:running:{}:{}", bot_id, param);
+
+        // Try to check if job is running using Redis
+        if let Some(redis_client) = &self.state.redis_client {
+            match redis_client.get_multiplexed_async_connection().await {
+                Ok(mut conn) => {
+                    // Check if key exists
+                    let is_running: Result<bool, redis::RedisError> = redis::cmd("EXISTS")
+                        .arg(&redis_key)
+                        .query_async(&mut conn)
+                        .await;
+
+                    if let Ok(true) = is_running {
+                        if is_bas_file {
+                            warn!(
+                                "⚠️ Job '{}' is already running for bot '{}', skipping execution to allow only one .bas execution per bot",
+                                param, bot_id
+                            );
+                        } else {
+                            info!(
+                                "Job '{}' is already running for bot '{}', skipping execution",
+                                param, bot_id
+                            );
+                        }
+                        return;
+                    }
+
+                    // Mark job as running (set with 300 second expiry as safety)
+                    let _: Result<(), redis::RedisError> = redis::cmd("SETEX")
+                        .arg(&redis_key)
+                        .arg(300) // 5 minutes expiry
+                        .arg("1")
+                        .query_async(&mut conn)
+                        .await;
+                }
+                Err(e) => {
+                    warn!("Failed to connect to Redis for job tracking: {}", e);
+                }
+            }
+        }
+
         let full_path = Path::new(&self.scripts_dir).join(param);
         let script_content = match tokio::fs::read_to_string(&full_path).await {
             Ok(content) => content,
             Err(e) => {
                 error!("Failed to read script {}: {}", full_path.display(), e);
+                // Clean up running flag on error
+                self.cleanup_job_flag(&bot_id, param).await;
                 return;
             }
         };
-        info!("Executing action with param: {}", param);
+        info!("Executing action with param: {} for bot: {}", param, bot_id);

         let user_session = crate::shared::models::UserSession {
             id: Uuid::new_v4(),
             user_id: Uuid::new_v4(),
-            bot_id: Uuid::new_v4(),
+            bot_id,
             title: "Automation".to_string(),
             answer_mode: 0,
             current_tool: None,

@@ -221,5 +270,26 @@ impl AutomationService {
                 error!("Error executing script: {}", e);
             }
         }
+
+        // Clean up running flag after execution
+        self.cleanup_job_flag(&bot_id, param).await;
+    }
+
+    async fn cleanup_job_flag(&self, bot_id: &Uuid, param: &str) {
+        let redis_key = format!("job:running:{}:{}", bot_id, param);
+
+        if let Some(redis_client) = &self.state.redis_client {
+            match redis_client.get_multiplexed_async_connection().await {
+                Ok(mut conn) => {
+                    let _: Result<(), redis::RedisError> = redis::cmd("DEL")
+                        .arg(&redis_key)
+                        .query_async(&mut conn)
+                        .await;
+                }
+                Err(e) => {
+                    warn!("Failed to connect to Redis for cleanup: {}", e);
+                }
+            }
+        }
     }
 }

@@ -12,8 +12,8 @@ pub fn set_bot_memory_keyword(state: Arc<AppState>, user: UserSession, engine: &

     engine
         .register_custom_syntax(
-            &["SET_BOT_MEMORY", "$expr$", "$expr$"],
-            true,
+            &["SET_BOT_MEMORY", "$expr$", ",", "$expr$"],
+            false,
             move |context, inputs| {
                 let key = context.eval_expression_tree(&inputs[0])?.to_string();
                 let value = context.eval_expression_tree(&inputs[1])?.to_string();

@@ -45,7 +45,7 @@ pub fn llm_keyword(state: Arc<AppState>, _user: UserSession, engine: &mut Engine
         }
     });

-    match rx.recv_timeout(Duration::from_secs(60)) {
+    match rx.recv_timeout(Duration::from_secs(180)) {
         Ok(Ok(result)) => Ok(Dynamic::from(result)),
         Ok(Err(e)) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
             e.to_string().into(),

@@ -6,6 +6,7 @@ use std::sync::Arc;

 pub mod keywords;

+use self::keywords::bot_memory::{get_bot_memory_keyword, set_bot_memory_keyword};
 use self::keywords::create_site::create_site_keyword;
 use self::keywords::find::find_keyword;
 use self::keywords::first::first_keyword;

@@ -45,6 +46,8 @@ impl ScriptService {
         #[cfg(feature = "email")]
         create_draft_keyword(&state, user.clone(), &mut engine);

+        set_bot_memory_keyword(state.clone(), user.clone(), &mut engine);
+        get_bot_memory_keyword(state.clone(), user.clone(), &mut engine);
         create_site_keyword(&state, user.clone(), &mut engine);
         find_keyword(&state, user.clone(), &mut engine);
         for_keyword(&state, user.clone(), &mut engine);

@@ -143,6 +146,8 @@ impl ScriptService {
             "TALK",
             "SET CONTEXT",
             "SET USER",
+            "GET BOT MEMORY",
+            "SET BOT MEMORY",
         ];

         let is_basic_command = basic_commands.iter().any(|&cmd| trimmed.starts_with(cmd));

@@ -557,10 +557,10 @@ impl BotOrchestrator {
             "Running start script for session: {} with token: {:?}",
             session.id, token
         );
-        let start_script_path = "./templates/annoucements.gbai/annoucements.gbdialog/start.bas";
+        let start_script_path = "./templates/announcements.gbai/announcements.gbdialog/start.bas";
         let start_script = match std::fs::read_to_string(start_script_path) {
             Ok(content) => content,
-            Err(_) => r#"TALK "Welcome to General Bots!""#.to_string(),
+            Err(_) => r#"TALK "Error loading script file.""#.to_string(),
         };
         debug!(
             "Start script content for session {}: {}",

@@ -275,7 +275,7 @@ pub async fn chat_completions_local(

     // Send request to llama.cpp server
     let client = Client::builder()
-        .timeout(Duration::from_secs(120)) // 2 minute timeout
+        .timeout(Duration::from_secs(180)) // 3 minute timeout
         .build()
         .map_err(|e| {
             error!("Error creating HTTP client: {}", e);

@@ -94,7 +94,9 @@ async fn main() -> std::io::Result<()> {
     // ----------------------------------------------------------------------
     // Redis client (optional)
     // ----------------------------------------------------------------------
-    let redis_client = match redis::Client::open("redis://127.0.0.1/") {
+    let cache_url = std::env::var("CACHE_URL").unwrap_or_else(|_| "redis://127.0.0.1/".to_string());
+
+    let redis_client = match redis::Client::open(cache_url.as_str()) {
         Ok(client) => {
             info!("Connected to Redis successfully");
             Some(Arc::new(client))

@@ -1,6 +1,7 @@
-TALK "Olá, pode me perguntar sobre qualquer coisa..."
-let text = GET "default.gbdrive/default.pdf"
-let resume = LLM "Say Hello and present a a resume from " + text
+let resume = GET_BOT_MEMORY ("resume")
 TALK resume
+
+let text = GET "default.gbdrive/default.pdf"
 SET_CONTEXT "Este é o documento que você deve usar para responder dúvidas: " + text
-return true;
+TALK "Olá, pode me perguntar sobre qualquer coisa desta circular..."

@@ -1,6 +1,5 @@
 let text = GET "default.gbdrive/default.pdf"
-let resume = LLM "Build table resume with deadlines, dates and actions: " + text
+let resume = LLM "Resume this document, in a table (DO NOT THINK) no_think: " + text

-SET_BOT_MEMORY "resume" resume
+SET_BOT_MEMORY "resume", resume