- Trying to remove compilation errors and new method for LLM.

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-10-06 08:42:09 -03:00
parent 45ab675b21
commit 84d59b61da
42 changed files with 3690 additions and 401 deletions

36
prompts/dev/fix.md Normal file
View file

@ -0,0 +1,36 @@
You are fixing Rust code in a Cargo project. The user is providing problematic code that needs to be corrected.
## Your Task
Fix ALL compiler errors and logical issues while maintaining the original intent. Return the COMPLETE corrected files as a SINGLE .sh script that can be executed from project root.
Use Cargo.toml as reference, do not change it.
Only return input files, all other files already exists.
If something, need to be added to a external file, inform it separated.
## Critical Requirements
1. **Return as SINGLE .sh script** - Output must be a complete shell script using `cat > file << 'EOF'` pattern
2. **Include ALL files** - Every corrected file must be included in the script
3. **Respect Cargo.toml** - Check dependencies, editions, and features to avoid compiler errors
4. **Type safety** - Ensure all types match and trait bounds are satisfied
5. **Ownership rules** - Fix borrowing, ownership, and lifetime issues
## Output Format Requirements
You MUST return exactly this example format:
```sh
#!/bin/bash
# Restore fixed Rust project
cat > src/<filenamehere>.rs << 'EOF'
use std::io;
// test
cat > src/<anotherfile>.rs << 'EOF'
// Fixed library code
pub fn add(a: i32, b: i32) -> i32 {
a + b
}
EOF
----

View file

@ -1,3 +1,4 @@
* Preffer imports than using :: to call methods,
* Output a single `.sh` script using `cat` so it can be restored directly.
* No placeholders, only real, production-ready code.
* No comments, no explanations, no extra text.

2625
scripts/dev/llm_context.txt Normal file

File diff suppressed because it is too large Load diff

35
scripts/dev/llm_fix.sh Executable file
View file

@ -0,0 +1,35 @@
#!/bin/bash
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)"
OUTPUT_FILE="$SCRIPT_DIR/llm_context.txt"
echo "Consolidated LLM Context" > "$OUTPUT_FILE"
prompts=(
"../../prompts/dev/general.md"
"../../Cargo.toml"
"../../prompts/dev/fix.md"
)
for file in "${prompts[@]}"; do
cat "$file" >> "$OUTPUT_FILE"
echo "" >> "$OUTPUT_FILE"
done
dirs=(
"src/shared"
"src/bot"
"src/session"
"src/tools"
)
for dir in "${dirs[@]}"; do
find "$PROJECT_ROOT/$dir" -name "*.rs" | while read file; do
cat "$file" >> "$OUTPUT_FILE"
echo "" >> "$OUTPUT_FILE"
done
done
cd "$PROJECT_ROOT"
tree -P '*.rs' -I 'target|*.lock' --prune | grep -v '[0-9] directories$' >> "$OUTPUT_FILE"

View file

@ -0,0 +1,2 @@
# apt install tree
tree -P '*.rs' -I 'target|*.lock' --prune | grep -v '[0-9] directories$'

View file

@ -2,19 +2,19 @@ use argon2::{
password_hash::{rand_core::OsRng, PasswordHash, PasswordHasher, PasswordVerifier, SaltString},
Argon2,
};
use redis::aio::Connection as ConnectionManager;
use redis::AsyncCommands;
use sqlx::PgPool;
use redis::Client;
use sqlx::{PgPool, Row}; // <-- required for .get()
use std::sync::Arc;
use uuid::Uuid;
pub struct AuthService {
pub pool: PgPool,
pub redis: Option<Arc<ConnectionManager>>,
pub redis: Option<Arc<Client>>,
}
impl AuthService {
pub fn new(pool: PgPool, redis: Option<Arc<ConnectionManager>>) -> Self {
#[allow(clippy::new_without_default)]
pub fn new(pool: PgPool, redis: Option<Arc<Client>>) -> Self {
Self { pool, redis }
}
@ -23,17 +23,6 @@ impl AuthService {
username: &str,
password: &str,
) -> Result<Option<Uuid>, Box<dyn std::error::Error>> {
// Try Redis cache first
if let Some(redis) = &self.redis {
let cache_key = format!("auth:user:{}", username);
if let Ok(user_id_str) = redis.clone().get::<_, String>(cache_key).await {
if let Ok(user_id) = Uuid::parse_str(&user_id_str) {
return Ok(Some(user_id));
}
}
}
// Fallback to database
let user = sqlx::query(
"SELECT id, password_hash FROM users WHERE username = $1 AND is_active = true",
)
@ -44,22 +33,17 @@ impl AuthService {
if let Some(row) = user {
let user_id: Uuid = row.get("id");
let password_hash: String = row.get("password_hash");
let parsed_hash = PasswordHash::new(&password_hash)?;
if Argon2::default()
.verify_password(password.as_bytes(), &parsed_hash)
.is_ok()
{
// Cache in Redis
if let Some(redis) = &self.redis {
let cache_key = format!("auth:user:{}", username);
let _: () = redis
.clone()
.set_ex(cache_key, user_id.to_string(), 3600)
.await?;
if let Ok(parsed_hash) = PasswordHash::new(&password_hash) {
if Argon2::default()
.verify_password(password.as_bytes(), &parsed_hash)
.is_ok()
{
return Ok(Some(user_id));
}
return Ok(Some(user_id));
}
}
Ok(None)
}
@ -71,20 +55,83 @@ impl AuthService {
) -> Result<Uuid, Box<dyn std::error::Error>> {
let salt = SaltString::generate(&mut OsRng);
let argon2 = Argon2::default();
let password_hash = argon2
.hash_password(password.as_bytes(), &salt)?
.to_string();
let password_hash = match argon2.hash_password(password.as_bytes(), &salt) {
Ok(ph) => ph.to_string(),
Err(e) => {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
e.to_string(),
)))
}
};
let user_id = sqlx::query(
let row = sqlx::query(
"INSERT INTO users (username, email, password_hash) VALUES ($1, $2, $3) RETURNING id",
)
.bind(username)
.bind(email)
.bind(password_hash)
.bind(&password_hash)
.fetch_one(&self.pool)
.await?
.get("id");
.await?;
Ok(user_id)
Ok(row.get::<Uuid, _>("id"))
}
pub async fn delete_user_cache(
&self,
username: &str,
) -> Result<(), Box<dyn std::error::Error>> {
if let Some(redis_client) = &self.redis {
let mut conn = redis_client.get_multiplexed_async_connection().await?;
let cache_key = format!("auth:user:{}", username);
let _: () = redis::Cmd::del(&cache_key).query_async(&mut conn).await?;
}
Ok(())
}
pub async fn update_user_password(
&self,
user_id: Uuid,
new_password: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let salt = SaltString::generate(&mut OsRng);
let argon2 = Argon2::default();
let password_hash = match argon2.hash_password(new_password.as_bytes(), &salt) {
Ok(ph) => ph.to_string(),
Err(e) => {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
e.to_string(),
)))
}
};
sqlx::query("UPDATE users SET password_hash = $1, updated_at = NOW() WHERE id = $2")
.bind(&password_hash)
.bind(user_id)
.execute(&self.pool)
.await?;
if let Some(user_row) = sqlx::query("SELECT username FROM users WHERE id = $1")
.bind(user_id)
.fetch_optional(&self.pool)
.await?
{
let username: String = user_row.get("username");
self.delete_user_cache(&username).await?;
}
Ok(())
}
}
{{END_REWRITTEN_CODE}}
impl Clone for AuthService {
fn clone(&self) -> Self {
Self {
pool: self.pool.clone(),
redis: self.redis.clone(),
}
}
}

View file

@ -1,6 +1,6 @@
use crate::models::automation_model::{Automation, TriggerKind};
use crate::basic::ScriptService;
use crate::state::AppState;
use crate::shared::models::automation_model::{Automation, TriggerKind};
use crate::shared::state::AppState;
use chrono::Datelike;
use chrono::Timelike;
use chrono::{DateTime, Utc};
@ -128,7 +128,6 @@ impl AutomationService {
)
.execute(pool)
.await
{
error!(
"Failed to update last_triggered for automation {}: {}",

View file

@ -1,6 +1,6 @@
use crate::email::save_email_draft;
use crate::email::{fetch_latest_sent_to, SaveDraftRequest};
use crate::state::AppState;
use crate::shared::state::AppState;
use rhai::Dynamic;
use rhai::Engine;

View file

@ -7,8 +7,8 @@ use std::fs;
use std::io::Read;
use std::path::PathBuf;
use crate::state::AppState;
use crate::utils;
use crate::shared::state::AppState;
use crate::shared::utils;
pub fn create_site_keyword(state: &AppState, engine: &mut Engine) {
let state_clone = state.clone();

View file

@ -4,10 +4,10 @@ use rhai::Engine;
use serde_json::{json, Value};
use sqlx::PgPool;
use crate::state::AppState;
use crate::utils;
use crate::utils::row_to_json;
use crate::utils::to_array;
use crate::shared::state::AppState;
use crate::shared::utils;
use crate::shared::utils::row_to_json;
use crate::shared::utils::to_array;
pub fn find_keyword(state: &AppState, engine: &mut Engine) {
let db = state.db_custom.clone();

View file

@ -1,4 +1,4 @@
use crate::state::AppState;
use crate::shared::state::AppState;
use log::info;
use rhai::Dynamic;
use rhai::Engine;

View file

@ -1,6 +1,6 @@
use log::info;
use crate::state::AppState;
use crate::shared::state::AppState;
use reqwest::{self, Client};
use rhai::{Dynamic, Engine};
use scraper::{Html, Selector};

View file

@ -1,6 +1,6 @@
use log::info;
use crate::{state::AppState, utils::call_llm};
use crate::{shared::state::AppState, utils::call_llm};
use rhai::{Dynamic, Engine};
pub fn llm_keyword(state: &AppState, engine: &mut Engine) {

View file

@ -4,8 +4,8 @@ use rhai::Engine;
use serde_json::{json, Value};
use sqlx::PgPool;
use crate::models::automation_model::TriggerKind;
use crate::state::AppState;
use crate::shared::models::automation_model::TriggerKind;
use crate::shared::state::AppState;
pub fn on_keyword(state: &AppState, engine: &mut Engine) {
let db = state.db_custom.clone();

View file

@ -2,7 +2,7 @@ use log::info;
use rhai::Dynamic;
use rhai::Engine;
use crate::state::AppState;
use crate::shared::state::AppState;
pub fn print_keyword(_state: &AppState, engine: &mut Engine) {
// PRINT command

View file

@ -5,8 +5,8 @@ use serde_json::{json, Value};
use sqlx::PgPool;
use std::error::Error;
use crate::state::AppState;
use crate::utils;
use crate::shared::state::AppState;
use crate::shared::utils;
pub fn set_keyword(state: &AppState, engine: &mut Engine) {
let db = state.db_custom.clone();

View file

@ -4,8 +4,8 @@ use rhai::Engine;
use serde_json::{json, Value};
use sqlx::PgPool;
use crate::models::automation_model::TriggerKind;
use crate::state::AppState;
use crate::shared::models::automation_model::TriggerKind;
use crate::shared::state::AppState;
pub fn set_schedule_keyword(state: &AppState, engine: &mut Engine) {
let db = state.db_custom.clone();

View file

@ -1,4 +1,4 @@
use crate::state::AppState;
use crate::shared::state::AppState;
use log::info;
use rhai::{Dynamic, Engine};
use std::thread;

View file

@ -1,8 +1,8 @@
use actix_web::{web, HttpRequest, HttpResponse, Result};
use actix_ws::Message as WsMessage;
use chrono::Utc;
use langchain_rust::{
chain::{Chain, LLMChain},
embedding::openai::openai_embedder::OpenAiEmbedder,
llm::openai::OpenAI,
memory::SimpleMemory,
prompt_args,
@ -11,19 +11,23 @@ use langchain_rust::{
vectorstore::{VecStoreOptions, VectorStore},
};
use log::info;
use serde_json;
use std::collections::HashMap;
use std::fs;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use uuid::Uuid;
use services::auth::AuthService;
use services::channels::{ChannelAdapter, VoiceAdapter, WebChannelAdapter};
use services::chart::ChartGenerator;
use services::llm::{AnthropicClient, LLMProvider, MockLLMProvider, OpenAIClient};
use services::session::SessionManager;
use services::tools::ToolManager;
use services::whatsapp::WhatsAppAdapter;
use crate::{
auth::AuthService,
channels::{ChannelAdapter, VoiceAdapter, WebChannelAdapter},
chart::ChartGenerator,
llm::LLMProvider,
session::SessionManager,
shared::{BotResponse, UserMessage, UserSession},
tools::ToolManager,
whatsapp::WhatsAppAdapter,
};
pub struct BotOrchestrator {
session_manager: SessionManager,
@ -33,18 +37,18 @@ pub struct BotOrchestrator {
channels: HashMap<String, Arc<dyn ChannelAdapter>>,
response_channels: Arc<Mutex<HashMap<String, mpsc::Sender<BotResponse>>>>,
chart_generator: Option<Arc<ChartGenerator>>,
vector_store: Option<Arc<LangChainQdrant<OpenAiEmbedder>>>,
vector_store: Option<Arc<LangChainQdrant>>,
sql_chain: Option<Arc<LLMChain>>,
}
impl BotOrchestrator {
fn new(
pub fn new(
session_manager: SessionManager,
tool_manager: ToolManager,
llm_provider: Arc<dyn LLMProvider>,
auth_service: AuthService,
chart_generator: Option<Arc<ChartGenerator>>,
vector_store: Option<Arc<LangChainQdrant<OpenAiEmbedder>>>,
vector_store: Option<Arc<LangChainQdrant>>,
sql_chain: Option<Arc<LLMChain>>,
) -> Self {
Self {
@ -60,11 +64,11 @@ impl BotOrchestrator {
}
}
fn add_channel(&mut self, channel_type: &str, adapter: Arc<dyn ChannelAdapter>) {
pub fn add_channel(&mut self, channel_type: &str, adapter: Arc<dyn ChannelAdapter>) {
self.channels.insert(channel_type.to_string(), adapter);
}
async fn register_response_channel(
pub async fn register_response_channel(
&self,
session_id: String,
sender: mpsc::Sender<BotResponse>,
@ -75,22 +79,22 @@ impl BotOrchestrator {
.insert(session_id, sender);
}
async fn set_user_answer_mode(
pub async fn set_user_answer_mode(
&self,
user_id: &str,
bot_id: &str,
mode: &str,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
self.session_manager
.update_answer_mode(user_id, bot_id, mode)
.await?;
Ok(())
}
async fn process_message(
pub async fn process_message(
&self,
message: UserMessage,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!(
"Processing message from channel: {}, user: {}",
message.channel, message.user_id
@ -113,7 +117,6 @@ impl BotOrchestrator {
}
};
// Check if we're in tool mode and there's an active tool
if session.answer_mode == "tool" && session.current_tool.is_some() {
self.tool_manager
.provide_user_response(&message.user_id, &message.bot_id, message.content.clone())
@ -131,7 +134,6 @@ impl BotOrchestrator {
)
.await?;
// Handle different answer modes with LangChain integration
let response_content = match session.answer_mode.as_str() {
"document" => self.document_mode_handler(&message, &session).await?,
"chart" => self.chart_mode_handler(&message, &session).await?,
@ -166,7 +168,7 @@ impl BotOrchestrator {
&self,
message: &UserMessage,
session: &UserSession,
) -> Result<String, Box<dyn std::error::Error>> {
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
if let Some(vector_store) = &self.vector_store {
let similar_docs = vector_store
.similarity_search(&message.content, 3, &VecStoreOptions::default())
@ -196,13 +198,12 @@ impl BotOrchestrator {
&self,
message: &UserMessage,
session: &UserSession,
) -> Result<String, Box<dyn std::error::Error>> {
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
if let Some(chart_generator) = &self.chart_generator {
let chart_response = chart_generator
.generate_chart(&message.content, "bar")
.await?;
// Store chart generation in history
self.session_manager
.save_message(
session.id,
@ -218,7 +219,6 @@ impl BotOrchestrator {
chart_response.sql_query
))
} else {
// Fallback to document mode
self.document_mode_handler(message, session).await
}
}
@ -226,8 +226,8 @@ impl BotOrchestrator {
async fn database_mode_handler(
&self,
message: &UserMessage,
session: &UserSession,
) -> Result<String, Box<dyn std::error::Error>> {
_session: &UserSession,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
if let Some(sql_chain) = &self.sql_chain {
let input_variables = prompt_args! {
"input" => message.content,
@ -236,7 +236,6 @@ impl BotOrchestrator {
let result = sql_chain.invoke(input_variables).await?;
Ok(result.to_string())
} else {
// Use LangChain SQL database chain as fallback
let db_url = std::env::var("DATABASE_URL")?;
let engine = PostgreSQLEngine::new(&db_url).await?;
let db = SQLDatabaseBuilder::new(engine).build().await?;
@ -258,11 +257,10 @@ impl BotOrchestrator {
async fn tool_mode_handler(
&self,
message: &UserMessage,
session: &UserSession,
) -> Result<String, Box<dyn std::error::Error>> {
// Check if we should start a tool
_session: &UserSession,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
if message.content.to_lowercase().contains("calculator") {
if let Some(adapter) = self.channels.get(&message.channel) {
if let Some(_adapter) = self.channels.get(&message.channel) {
let (tx, _rx) = mpsc::channel(100);
self.register_response_channel(message.session_id.clone(), tx.clone())
@ -274,17 +272,19 @@ impl BotOrchestrator {
let session_manager = self.session_manager.clone();
tokio::spawn(async move {
tool_manager
.execute_tool("calculator", &user_id_str, &bot_id_str, session_manager, tx)
.await
.unwrap_or_else(|e| {
log::error!("Error executing tool: {}", e);
});
let _ = tool_manager
.execute_tool_with_session(
"calculator",
&user_id_str,
&bot_id_str,
session_manager,
tx,
)
.await;
});
}
Ok("Starting calculator tool...".to_string())
} else {
// Fall back to normal LLM response with tool context
let available_tools = self.tool_manager.list_tools();
let tools_context = if !available_tools.is_empty() {
format!("\n\nAvailable tools: {}. If the user needs calculations, suggest using the calculator tool.", available_tools.join(", "))
@ -304,8 +304,7 @@ impl BotOrchestrator {
&self,
message: &UserMessage,
session: &UserSession,
) -> Result<String, Box<dyn std::error::Error>> {
// Get conversation history for context using LangChain memory
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
let history = self
.session_manager
.get_conversation_history(session.id, session.user_id)
@ -320,7 +319,6 @@ impl BotOrchestrator {
}
}
// Build prompt with memory context
let mut prompt = String::new();
if let Some(chat_history) = memory.get_chat_history() {
for message in chat_history {
@ -338,11 +336,11 @@ impl BotOrchestrator {
.await
}
async fn stream_response(
pub async fn stream_response(
&self,
message: UserMessage,
mut response_tx: mpsc::Sender<BotResponse>,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!("Streaming response for user: {}", message.user_id);
let user_id = Uuid::parse_str(&message.user_id).unwrap_or_else(|_| Uuid::new_v4());
@ -379,7 +377,6 @@ impl BotOrchestrator {
)
.await?;
// Get conversation history for streaming context
let history = self
.session_manager
.get_conversation_history(session.id, user_id)
@ -455,18 +452,18 @@ impl BotOrchestrator {
Ok(())
}
async fn get_user_sessions(
pub async fn get_user_sessions(
&self,
user_id: Uuid,
) -> Result<Vec<UserSession>, Box<dyn std::error::Error>> {
) -> Result<Vec<UserSession>, Box<dyn std::error::Error + Send + Sync>> {
self.session_manager.get_user_sessions(user_id).await
}
async fn get_conversation_history(
pub async fn get_conversation_history(
&self,
session_id: Uuid,
user_id: Uuid,
) -> Result<Vec<(String, String)>, Box<dyn std::error::Error>> {
) -> Result<Vec<(String, String)>, Box<dyn std::error::Error + Send + Sync>> {
self.session_manager
.get_conversation_history(session_id, user_id)
.await
@ -475,7 +472,7 @@ impl BotOrchestrator {
pub async fn process_message_with_tools(
&self,
message: UserMessage,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!(
"Processing message with tools from user: {}",
message.user_id
@ -508,7 +505,6 @@ impl BotOrchestrator {
)
.await?;
// Check if we're in a tool conversation
let is_tool_waiting = self
.tool_manager
.is_tool_waiting(&message.session_id)
@ -516,12 +512,10 @@ impl BotOrchestrator {
.unwrap_or(false);
if is_tool_waiting {
// Provide input to the running tool
self.tool_manager
.provide_input(&message.session_id, &message.content)
.await?;
// Get tool output and send it as a response
if let Ok(tool_output) = self.tool_manager.get_tool_output(&message.session_id).await {
for output in tool_output {
let bot_response = BotResponse {
@ -543,29 +537,16 @@ impl BotOrchestrator {
return Ok(());
}
// Normal LLM processing with tool awareness
let available_tools = self.tool_manager.list_tools();
let tools_context = if !available_tools.is_empty() {
format!("\n\nAvailable tools: {}. If the user needs calculations, suggest using the calculator tool.", available_tools.join(", "))
} else {
String::new()
};
let full_prompt = format!("{}{}", message.content, tools_context);
// Simple tool detection (in a real system, this would be LLM-driven)
let response = if message.content.to_lowercase().contains("calculator")
|| message.content.to_lowercase().contains("calculate")
|| message.content.to_lowercase().contains("math")
{
// Start calculator tool
match self
.tool_manager
.execute_tool("calculator", &message.session_id, &message.user_id)
.await
{
Ok(tool_result) => {
// Save tool start message
self.session_manager
.save_message(
session.id,
@ -583,7 +564,15 @@ impl BotOrchestrator {
}
}
} else {
// Normal LLM response
let available_tools = self.tool_manager.list_tools();
let tools_context = if !available_tools.is_empty() {
format!("\n\nAvailable tools: {}. If the user needs calculations, suggest using the calculator tool.", available_tools.join(", "))
} else {
String::new()
};
let full_prompt = format!("{}{}", message.content, tools_context);
self.llm_provider
.generate(&full_prompt, &serde_json::Value::Null)
.await?
@ -612,18 +601,11 @@ impl BotOrchestrator {
}
}
struct AppState {
orchestrator: Arc<BotOrchestrator>,
web_adapter: Arc<WebChannelAdapter>,
voice_adapter: Arc<VoiceAdapter>,
whatsapp_adapter: Arc<WhatsAppAdapter>,
}
#[actix_web::get("/ws")]
async fn websocket_handler(
req: HttpRequest,
stream: web::Payload,
data: web::Data<AppState>,
data: web::Data<crate::shared::AppState>,
) -> Result<HttpResponse, actix_web::Error> {
let (res, mut session, mut msg_stream) = actix_ws::handle(&req, stream)?;
let session_id = Uuid::new_v4().to_string();
@ -653,7 +635,7 @@ async fn websocket_handler(
actix_web::rt::spawn(async move {
while let Some(Ok(msg)) = msg_stream.recv().await {
match msg {
Message::Text(text) => {
WsMessage::Text(text) => {
let user_message = UserMessage {
bot_id: "default_bot".to_string(),
user_id: "default_user".to_string(),
@ -669,7 +651,7 @@ async fn websocket_handler(
info!("Error processing message: {}", e);
}
}
Message::Close(_) => {
WsMessage::Close(_) => {
web_adapter.remove_connection(&session_id).await;
break;
}
@ -683,7 +665,7 @@ async fn websocket_handler(
#[actix_web::get("/api/whatsapp/webhook")]
async fn whatsapp_webhook_verify(
data: web::Data<AppState>,
data: web::Data<crate::shared::AppState>,
web::Query(params): web::Query<HashMap<String, String>>,
) -> Result<HttpResponse> {
let mode = params.get("hub.mode").unwrap_or(&"".to_string());
@ -698,8 +680,8 @@ async fn whatsapp_webhook_verify(
#[actix_web::post("/api/whatsapp/webhook")]
async fn whatsapp_webhook(
data: web::Data<AppState>,
payload: web::Json<services::whatsapp::WhatsAppMessage>,
data: web::Data<crate::shared::AppState>,
payload: web::Json<crate::whatsapp::WhatsAppMessage>,
) -> Result<HttpResponse> {
match data
.whatsapp_adapter
@ -723,7 +705,7 @@ async fn whatsapp_webhook(
#[actix_web::post("/api/voice/start")]
async fn voice_start(
data: web::Data<AppState>,
data: web::Data<crate::shared::AppState>,
info: web::Json<serde_json::Value>,
) -> Result<HttpResponse> {
let session_id = info
@ -752,7 +734,7 @@ async fn voice_start(
#[actix_web::post("/api/voice/stop")]
async fn voice_stop(
data: web::Data<AppState>,
data: web::Data<crate::shared::AppState>,
info: web::Json<serde_json::Value>,
) -> Result<HttpResponse> {
let session_id = info
@ -770,7 +752,7 @@ async fn voice_stop(
}
#[actix_web::post("/api/sessions")]
async fn create_session(data: web::Data<AppState>) -> Result<HttpResponse> {
async fn create_session(_data: web::Data<crate::shared::AppState>) -> Result<HttpResponse> {
let session_id = Uuid::new_v4();
Ok(HttpResponse::Ok().json(serde_json::json!({
"session_id": session_id,
@ -780,7 +762,7 @@ async fn create_session(data: web::Data<AppState>) -> Result<HttpResponse> {
}
#[actix_web::get("/api/sessions")]
async fn get_sessions(data: web::Data<AppState>) -> Result<HttpResponse> {
async fn get_sessions(data: web::Data<crate::shared::AppState>) -> Result<HttpResponse> {
let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
match data.orchestrator.get_user_sessions(user_id).await {
Ok(sessions) => Ok(HttpResponse::Ok().json(sessions)),
@ -793,24 +775,22 @@ async fn get_sessions(data: web::Data<AppState>) -> Result<HttpResponse> {
#[actix_web::get("/api/sessions/{session_id}")]
async fn get_session_history(
data: web::Data<AppState>,
data: web::Data<crate::shared::AppState>,
path: web::Path<String>,
) -> Result<HttpResponse> {
let session_id = path.into_inner();
let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
match Uuid::parse_str(&session_id) {
Ok(session_uuid) => {
match data
.orchestrator
.get_conversation_history(session_uuid, user_id)
.await
{
Ok(history) => Ok(HttpResponse::Ok().json(history)),
Err(e) => Ok(HttpResponse::InternalServerError()
.json(serde_json::json!({"error": e.to_string()}))),
}
}
Ok(session_uuid) => match data
.orchestrator
.get_conversation_history(session_uuid, user_id)
.await
{
Ok(history) => Ok(HttpResponse::Ok().json(history)),
Err(e) => Ok(HttpResponse::InternalServerError()
.json(serde_json::json!({"error": e.to_string()}))),
},
Err(_) => {
Ok(HttpResponse::BadRequest().json(serde_json::json!({"error": "Invalid session ID"})))
}
@ -819,7 +799,7 @@ async fn get_session_history(
#[actix_web::post("/api/set_mode")]
async fn set_mode_handler(
data: web::Data<AppState>,
data: web::Data<crate::shared::AppState>,
info: web::Json<HashMap<String, String>>,
) -> Result<HttpResponse> {
let default_user = "default_user".to_string();
@ -846,7 +826,7 @@ async fn set_mode_handler(
#[actix_web::get("/")]
async fn index() -> Result<HttpResponse> {
let html = fs::read_to_string("templates/index.html")
.unwrap_or_else(|_| include_str!("../templates/index.html").to_string());
.unwrap_or_else(|_| include_str!("../../static/index.html").to_string());
Ok(HttpResponse::Ok().content_type("text/html").body(html))
}

View file

@ -1,5 +1,3 @@
pub mod channels;
use async_trait::async_trait;
use chrono::Utc;
use livekit::{DataPacketKind, Room, RoomOptions};

View file

@ -15,7 +15,7 @@ use minio::s3::http::BaseUrl;
use std::str::FromStr;
use crate::config::AppConfig;
use crate::state::AppState;
use crate::shared::state::AppState;
pub async fn init_minio(config: &AppConfig) -> Result<MinioClient, minio::s3::error::Error> {
let scheme = if config.minio.use_ssl {

View file

@ -78,7 +78,6 @@ async fn main() -> std::io::Result<()> {
.expect("Failed to create Redis client");
let conn = client
.get_connection()
.await
.expect("Failed to create Redis connection");
Some(Arc::new(conn))
}
@ -93,17 +92,17 @@ async fn main() -> std::io::Result<()> {
let session_manager = SessionManager::new(db.clone(), redis_conn.clone());
let auth_service = AuthService::new(db.clone(), redis_conn.clone());
let llm_provider: Arc<dyn crate::llm_local::LLMProvider> = match std::env::var("LLM_PROVIDER")
let llm_provider: Arc<dyn crate::llm::LLMProvider> = match std::env::var("LLM_PROVIDER")
.unwrap_or("mock".to_string())
.as_str()
{
"openai" => Arc::new(crate::llm_local::OpenAIClient::new(
"openai" => Arc::new(crate::llm::OpenAIClient::new(
std::env::var("OPENAI_API_KEY").expect("OPENAI_API_KEY required"),
)),
"anthropic" => Arc::new(crate::llm_local::AnthropicClient::new(
"anthropic" => Arc::new(crate::llm::AnthropicClient::new(
std::env::var("ANTHROPIC_API_KEY").expect("ANTHROPIC_API_KEY required"),
)),
_ => Arc::new(crate::llm_local::MockLLMProvider::new()),
_ => Arc::new(crate::llm::MockLLMProvider::new()),
};
let web_adapter = Arc::new(crate::channels::WebChannelAdapter::new());

View file

@ -1,4 +1,4 @@
use actix_web::{web, HttpResponse, Result};
use actix_web::{put, web, HttpResponse, Result};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;

View file

@ -1,19 +1,18 @@
use crate::shared::shared::UserSession;
use sqlx::Row;
use redis::{aio::Connection as ConnectionManager, AsyncCommands};
use redis::{AsyncCommands, Client};
use serde_json;
use sqlx::PgPool;
use sqlx::{PgPool, Row};
use std::sync::Arc;
use uuid::Uuid;
use crate::shared::UserSession;
pub struct SessionManager {
pub pool: PgPool,
pub redis: Option<Arc<ConnectionManager>>,
pub redis: Option<Arc<Client>>,
}
impl SessionManager {
pub fn new(pool: PgPool, redis: Option<Arc<ConnectionManager>>) -> Self {
pub fn new(pool: PgPool, redis: Option<Arc<Client>>) -> Self {
Self { pool, redis }
}
@ -21,33 +20,31 @@ impl SessionManager {
&self,
user_id: Uuid,
bot_id: Uuid,
) -> Result<Option<UserSession>, Box<dyn std::error::Error>> {
// Try Redis cache first
if let Some(redis) = &self.redis {
) -> Result<Option<UserSession>, Box<dyn std::error::Error + Send + Sync>> {
if let Some(redis_client) = &self.redis {
let mut conn = redis_client.get_multiplexed_async_connection().await?;
let cache_key = format!("session:{}:{}", user_id, bot_id);
let mut conn = redis.clone().lock().await;
if let Ok(session_json) = conn.get::<_, String>(cache_key).await {
if let Ok(session) = serde_json::from_str::<UserSession>(&session_json) {
let session_json: Option<String> = conn.get(&cache_key).await?;
if let Some(json) = session_json {
if let Ok(session) = serde_json::from_str::<UserSession>(&json) {
return Ok(Some(session));
}
}
}
// Fallback to database
let session = sqlx::query_as(
"SELECT * FROM user_sessions WHERE user_id = $1 AND bot_id = $2 ORDER BY updated_at DESC LIMIT 1"
let session = sqlx::query_as::<_, UserSession>(
"SELECT * FROM user_sessions WHERE user_id = $1 AND bot_id = $2 ORDER BY updated_at DESC LIMIT 1",
)
.bind(user_id)
.bind(bot_id)
.fetch_optional(&self.pool)
.await?;
// Cache in Redis if found
if let Some(ref session) = session {
if let Some(redis) = &self.redis {
if let Some(redis_client) = &self.redis {
let mut conn = redis_client.get_multiplexed_async_connection().await?;
let cache_key = format!("session:{}:{}", user_id, bot_id);
let session_json = serde_json::to_string(session)?;
let mut conn = redis.clone().lock().await;
let _: () = conn.set_ex(cache_key, session_json, 1800).await?;
}
}
@ -60,13 +57,8 @@ impl SessionManager {
user_id: Uuid,
bot_id: Uuid,
title: &str,
) -> Result<UserSession, Box<dyn std::error::Error>> {
log::info!(
"Creating new session for user: {}, bot: {}",
user_id,
bot_id
);
let session = sqlx::query_as(
) -> Result<UserSession, Box<dyn std::error::Error + Send + Sync>> {
let session = sqlx::query_as::<_, UserSession>(
"INSERT INTO user_sessions (user_id, bot_id, title) VALUES ($1, $2, $3) RETURNING *",
)
.bind(user_id)
@ -75,11 +67,11 @@ impl SessionManager {
.fetch_one(&self.pool)
.await?;
// Cache in Redis
if let Some(redis) = &self.redis {
if let Some(redis_client) = &self.redis {
let mut conn = redis_client.get_multiplexed_async_connection().await?;
let cache_key = format!("session:{}:{}", user_id, bot_id);
let session_json = serde_json::to_string(&session)?;
let _: () = redis.clone().set_ex(cache_key, session_json, 1800).await?;
let _: () = conn.set_ex(cache_key, session_json, 1800).await?;
}
Ok(session)
@ -92,8 +84,8 @@ impl SessionManager {
role: &str,
content: &str,
message_type: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let message_count: i32 =
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let message_count: i64 =
sqlx::query("SELECT COUNT(*) as count FROM message_history WHERE session_id = $1")
.bind(session_id)
.fetch_one(&self.pool)
@ -102,12 +94,12 @@ impl SessionManager {
sqlx::query(
"INSERT INTO message_history (session_id, user_id, role, content_encrypted, message_type, message_index)
VALUES ($1, $2, $3, $4, $5, $6)"
VALUES ($1, $2, $3, $4, $5, $6)",
)
.bind(session_id)
.bind(user_id)
.bind(role)
.bind(content) // Note: Encryption removed for simplicity
.bind(content)
.bind(message_type)
.bind(message_count + 1)
.execute(&self.pool)
@ -118,17 +110,18 @@ impl SessionManager {
.execute(&self.pool)
.await?;
// Invalidate session cache
if let Some(redis) = &self.redis {
let session: Option<(Uuid, Uuid)> =
sqlx::query_as("SELECT user_id, bot_id FROM user_sessions WHERE id = $1")
if let Some(redis_client) = &self.redis {
if let Some(session_info) =
sqlx::query("SELECT user_id, bot_id FROM user_sessions WHERE id = $1")
.bind(session_id)
.fetch_optional(&self.pool)
.await?;
if let Some((user_id, bot_id)) = session {
.await?
{
let user_id: Uuid = session_info.get("user_id");
let bot_id: Uuid = session_info.get("bot_id");
let mut conn = redis_client.get_multiplexed_async_connection().await?;
let cache_key = format!("session:{}:{}", user_id, bot_id);
let _: () = redis.clone().del(cache_key).await?;
let _: () = conn.del(cache_key).await?;
}
}
@ -139,7 +132,7 @@ impl SessionManager {
&self,
session_id: Uuid,
user_id: Uuid,
) -> Result<Vec<(String, String)>, Box<dyn std::error::Error>> {
) -> Result<Vec<(String, String)>, Box<dyn std::error::Error + Send + Sync>> {
let messages = sqlx::query(
"SELECT role, content_encrypted FROM message_history
WHERE session_id = $1 AND user_id = $2
@ -150,21 +143,19 @@ impl SessionManager {
.fetch_all(&self.pool)
.await?;
let mut history = Vec::new();
for row in messages {
let role: String = row.get("role");
let content: String = row.get("content_encrypted"); // No decryption needed
history.push((role, content));
}
let history = messages
.into_iter()
.map(|row| (row.get("role"), row.get("content_encrypted")))
.collect();
Ok(history)
}
pub async fn get_user_sessions(
&self,
user_id: Uuid,
) -> Result<Vec<UserSession>, Box<dyn std::error::Error>> {
// For lists, we'll just use database
let sessions = sqlx::query_as(
) -> Result<Vec<UserSession>, Box<dyn std::error::Error + Send + Sync>> {
let sessions = sqlx::query_as::<_, UserSession>(
"SELECT * FROM user_sessions WHERE user_id = $1 ORDER BY updated_at DESC",
)
.bind(user_id)
@ -172,4 +163,146 @@ impl SessionManager {
.await?;
Ok(sessions)
}
pub async fn update_answer_mode(
&self,
user_id: &str,
bot_id: &str,
mode: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let user_uuid = Uuid::parse_str(user_id)?;
let bot_uuid = Uuid::parse_str(bot_id)?;
sqlx::query(
"UPDATE user_sessions
SET answer_mode = $1, updated_at = NOW()
WHERE user_id = $2 AND bot_id = $3",
)
.bind(mode)
.bind(user_uuid)
.bind(bot_uuid)
.execute(&self.pool)
.await?;
if let Some(redis_client) = &self.redis {
let mut conn = redis_client.get_multiplexed_async_connection().await?;
let cache_key = format!("session:{}:{}", user_uuid, bot_uuid);
let _: () = conn.del(cache_key).await?;
}
Ok(())
}
pub async fn update_current_tool(
&self,
user_id: &str,
bot_id: &str,
tool_name: Option<&str>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let user_uuid = Uuid::parse_str(user_id)?;
let bot_uuid = Uuid::parse_str(bot_id)?;
sqlx::query(
"UPDATE user_sessions
SET current_tool = $1, updated_at = NOW()
WHERE user_id = $2 AND bot_id = $3",
)
.bind(tool_name)
.bind(user_uuid)
.bind(bot_uuid)
.execute(&self.pool)
.await?;
if let Some(redis_client) = &self.redis {
let mut conn = redis_client.get_multiplexed_async_connection().await?;
let cache_key = format!("session:{}:{}", user_uuid, bot_uuid);
let _: () = conn.del(cache_key).await?;
}
Ok(())
}
pub async fn get_session_by_id(
&self,
session_id: Uuid,
) -> Result<Option<UserSession>, Box<dyn std::error::Error + Send + Sync>> {
if let Some(redis_client) = &self.redis {
let mut conn = redis_client.get_multiplexed_async_connection().await?;
let cache_key = format!("session_by_id:{}", session_id);
let session_json: Option<String> = conn.get(&cache_key).await?;
if let Some(json) = session_json {
if let Ok(session) = serde_json::from_str::<UserSession>(&json) {
return Ok(Some(session));
}
}
}
let session = sqlx::query_as::<_, UserSession>("SELECT * FROM user_sessions WHERE id = $1")
.bind(session_id)
.fetch_optional(&self.pool)
.await?;
if let Some(ref session) = session {
if let Some(redis_client) = &self.redis {
let mut conn = redis_client.get_multiplexed_async_connection().await?;
let cache_key = format!("session_by_id:{}", session_id);
let session_json = serde_json::to_string(session)?;
let _: () = conn.set_ex(cache_key, session_json, 1800).await?;
}
}
Ok(session)
}
pub async fn cleanup_old_sessions(
&self,
days_old: i32,
) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
let result = sqlx::query(
"DELETE FROM user_sessions
WHERE updated_at < NOW() - INTERVAL '1 day' * $1",
)
.bind(days_old)
.execute(&self.pool)
.await?;
Ok(result.rows_affected())
}
pub async fn set_current_tool(
&self,
user_id: &str,
bot_id: &str,
tool_name: Option<String>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let user_uuid = Uuid::parse_str(user_id)?;
let bot_uuid = Uuid::parse_str(bot_id)?;
sqlx::query(
"UPDATE user_sessions
SET current_tool = $1, updated_at = NOW()
WHERE user_id = $2 AND bot_id = $3",
)
.bind(tool_name)
.bind(user_uuid)
.bind(bot_uuid)
.execute(&self.pool)
.await?;
if let Some(redis_client) = &self.redis {
let mut conn = redis_client.get_multiplexed_async_connection().await?;
let cache_key = format!("session:{}:{}", user_uuid, bot_uuid);
let _: () = conn.del(cache_key).await?;
}
Ok(())
}
}
impl Clone for SessionManager {
fn clone(&self) -> Self {
Self {
pool: self.pool.clone(),
redis: self.redis.clone(),
}
}
}

View file

@ -2,61 +2,6 @@ pub mod models;
pub mod state;
pub mod utils;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct UserSession {
pub id: Uuid,
pub user_id: Uuid,
pub bot_id: Uuid,
pub title: String,
pub context_data: serde_json::Value,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmbeddingRequest {
pub text: String,
pub model: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmbeddingResponse {
pub embedding: Vec<f32>,
pub model: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SearchResult {
pub text: String,
pub similarity: f32,
pub metadata: serde_json::Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserMessage {
pub bot_id: String,
pub user_id: String,
pub session_id: String,
pub channel: String,
pub content: String,
pub message_type: String,
pub media_url: Option<String>,
pub timestamp: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BotResponse {
pub bot_id: String,
pub user_id: String,
pub session_id: String,
pub channel: String,
pub content: String,
pub message_type: String,
pub stream_token: Option<String>,
pub is_complete: bool,
}
pub use models::*;
pub use state::*;
pub use utils::*;

View file

@ -4,7 +4,7 @@ use sqlx::FromRow;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct organization {
pub struct Organization {
pub org_id: Uuid,
pub name: String,
pub slug: String,
@ -17,8 +17,8 @@ pub struct Bot {
pub name: String,
pub status: BotStatus,
pub config: serde_json::Value,
pub created_at: chrono::DateTime<Utc>,
pub updated_at: chrono::DateTime<Utc>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::Type)]
@ -30,11 +30,6 @@ pub enum BotStatus {
Maintenance,
}
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use uuid::Uuid;
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TriggerKind {
Scheduled = 0,
@ -58,10 +53,66 @@ impl TriggerKind {
#[derive(Debug, FromRow, Serialize, Deserialize)]
pub struct Automation {
pub id: Uuid,
pub kind: i32, // Using number for trigger type
pub kind: i32,
pub target: Option<String>,
pub schedule: Option<String>,
pub param: String,
pub is_active: bool,
pub last_triggered: Option<DateTime<Utc>>,
}
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct UserSession {
pub id: Uuid,
pub user_id: Uuid,
pub bot_id: Uuid,
pub title: String,
pub context_data: serde_json::Value,
pub answer_mode: String,
pub current_tool: Option<String>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmbeddingRequest {
pub text: String,
pub model: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmbeddingResponse {
pub embedding: Vec<f32>,
pub model: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SearchResult {
pub text: String,
pub similarity: f32,
pub metadata: serde_json::Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserMessage {
pub bot_id: String,
pub user_id: String,
pub session_id: String,
pub channel: String,
pub content: String,
pub message_type: String,
pub media_url: Option<String>,
pub timestamp: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BotResponse {
pub bot_id: String,
pub user_id: String,
pub session_id: String,
pub channel: String,
pub content: String,
pub message_type: String,
pub stream_token: Option<String>,
pub is_complete: bool,
}

View file

@ -1,12 +1,16 @@
use std::sync::Arc;
use minio::s3::Client;
use crate::{config::AppConfig, web_automator::BrowserPool};
use crate::{
bot::BotOrchestrator,
channels::{VoiceAdapter, WebChannelAdapter, WhatsAppAdapter},
config::AppConfig,
tools::ToolApi,
web_automation::BrowserPool
};
#[derive(Clone)]
pub struct AppState {
pub minio_client: Option<Client>,
pub minio_client: Option<minio::s3::Client>,
pub config: Option<AppConfig>,
pub db: Option<sqlx::PgPool>,
pub db_custom: Option<sqlx::PgPool>,
@ -15,9 +19,10 @@ pub struct AppState {
pub web_adapter: Arc<WebChannelAdapter>,
pub voice_adapter: Arc<VoiceAdapter>,
pub whatsapp_adapter: Arc<WhatsAppAdapter>,
tool_api: Arc<ToolApi>, // Add this
pub tool_api: Arc<ToolApi>,
}
pub struct _BotState {
pub struct BotState {
pub language: String,
pub work_folder: String,
}

View file

@ -1,15 +1,10 @@
use crate::config::AIConfig;
use langchain_rust::llm::OpenAI;
use langchain_rust::{language_models::llm::LLM, llm::AzureConfig};
use log::error;
use chrono::{DateTime, Utc};
use langchain_rust::llm::AzureConfig;
use log::{debug, warn};
use rhai::{Array, Dynamic};
use serde_json::{json, Value};
use smartstring::SmartString;
use sqlx::Column; // Required for .name() method
use sqlx::TypeInfo; // Required for .type_info() method
use sqlx::{postgres::PgRow, Row};
use sqlx::{Decode, Type};
use sqlx::{postgres::PgRow, Column, Decode, Row, Type, TypeInfo};
use std::error::Error;
use std::fs::File;
use std::io::BufReader;
@ -18,6 +13,7 @@ use tokio::fs::File as TokioFile;
use tokio_stream::StreamExt;
use zip::ZipArchive;
use crate::config::AIConfig;
use reqwest::Client;
use tokio::io::AsyncWriteExt;
@ -34,16 +30,14 @@ pub async fn call_llm(
ai_config: &AIConfig,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
let azure_config = azure_from_config(&ai_config.clone());
let open_ai = OpenAI::new(azure_config);
let open_ai = langchain_rust::llm::OpenAI::new(azure_config);
// Directly use the input text as prompt
let prompt = text.to_string();
// Call LLM and return the raw text response
match open_ai.invoke(&prompt).await {
Ok(response_text) => Ok(response_text),
Err(err) => {
error!("Error invoking LLM API: {}", err);
log::error!("Error invoking LLM API: {}", err);
Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to invoke LLM API",
@ -79,6 +73,7 @@ pub fn extract_zip_recursive(
Ok(())
}
pub fn row_to_json(row: PgRow) -> Result<Value, Box<dyn Error>> {
let mut result = serde_json::Map::new();
let columns = row.columns();
@ -131,17 +126,15 @@ where
}
fn handle_json(row: &PgRow, idx: usize, col_name: &str) -> Value {
// First try to get as Option<Value>
match row.try_get::<Option<Value>, _>(idx) {
Ok(Some(val)) => {
debug!("Successfully read JSON column {} as Value", col_name);
return val;
}
Ok(None) => return Value::Null,
Err(_) => (), // Fall through to other attempts
Err(_) => (),
}
// Try as Option<String> that might contain JSON
match row.try_get::<Option<String>, _>(idx) {
Ok(Some(s)) => match serde_json::from_str(&s) {
Ok(val) => val,
@ -157,6 +150,7 @@ fn handle_json(row: &PgRow, idx: usize, col_name: &str) -> Value {
}
}
}
pub fn json_value_to_dynamic(value: &Value) -> Dynamic {
match value {
Value::Null => Dynamic::UNIT,
@ -184,16 +178,12 @@ pub fn json_value_to_dynamic(value: &Value) -> Dynamic {
}
}
/// Converts any value to an array - single values become single-element arrays
pub fn to_array(value: Dynamic) -> Array {
if value.is_array() {
// Already an array - return as-is
value.cast::<Array>()
} else if value.is_unit() || value.is::<()>() {
// Handle empty/unit case
Array::new()
} else {
// Convert single value to single-element array
Array::from([value])
}
}
@ -218,7 +208,6 @@ pub async fn download_file(url: &str, output_path: &str) -> Result<(), Box<dyn s
Ok(())
}
// Helper function to parse the filter string into SQL WHERE clause and parameters
pub fn parse_filter(filter_str: &str) -> Result<(String, Vec<String>), Box<dyn Error>> {
let parts: Vec<&str> = filter_str.split('=').collect();
if parts.len() != 2 {
@ -228,7 +217,6 @@ pub fn parse_filter(filter_str: &str) -> Result<(String, Vec<String>), Box<dyn E
let column = parts[0].trim();
let value = parts[1].trim();
// Validate column name to prevent SQL injection
if !column
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '_')
@ -236,11 +224,9 @@ pub fn parse_filter(filter_str: &str) -> Result<(String, Vec<String>), Box<dyn E
return Err("Invalid column name in filter".into());
}
// Return the parameterized query part and the value separately
Ok((format!("{} = $1", column), vec![value.to_string()]))
}
// Parse filter without adding quotes
pub fn parse_filter_with_offset(
filter_str: &str,
offset: usize,
@ -265,7 +251,7 @@ pub fn parse_filter_with_offset(
}
clauses.push(format!("{} = ${}", column, i + 1 + offset));
params.push(value.to_string()); // Store raw value without quotes
params.push(value.to_string());
}
Ok((clauses.join(" AND "), params))

View file

@ -1,13 +1,18 @@
// services/tools/mod.rs
use async_trait::async_trait;
use redis::AsyncCommands;
use rhai::Engine;
use rhai::{Engine, Scope};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::{mpsc, Mutex};
use uuid::Uuid;
use crate::{
channels::ChannelAdapter,
session::SessionManager,
shared::BotResponse,
};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolResult {
pub success: bool,
@ -31,18 +36,20 @@ pub trait ToolExecutor: Send + Sync {
tool_name: &str,
session_id: &str,
user_id: &str,
) -> Result<ToolResult, Box<dyn std::error::Error>>;
) -> Result<ToolResult, Box<dyn std::error::Error + Send + Sync>>;
async fn provide_input(
&self,
session_id: &str,
input: &str,
) -> Result<(), Box<dyn std::error::Error>>;
async fn get_output(&self, session_id: &str)
-> Result<Vec<String>, Box<dyn std::error::Error>>;
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
async fn get_output(
&self,
session_id: &str,
) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>>;
async fn is_waiting_for_input(
&self,
session_id: &str,
) -> Result<bool, Box<dyn std::error::Error>>;
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>>;
}
pub struct RedisToolExecutor {
@ -58,7 +65,7 @@ impl RedisToolExecutor {
web_adapter: Arc<dyn ChannelAdapter>,
voice_adapter: Arc<dyn ChannelAdapter>,
whatsapp_adapter: Arc<dyn ChannelAdapter>,
) -> Result<Self, Box<dyn std::error::Error>> {
) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let client = redis::Client::open(redis_url)?;
Ok(Self {
redis_client: client,
@ -74,7 +81,7 @@ impl RedisToolExecutor {
user_id: &str,
channel: &str,
message: &str,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let response = BotResponse {
bot_id: "tool_bot".to_string(),
user_id: user_id.to_string(),
@ -97,7 +104,6 @@ impl RedisToolExecutor {
fn create_rhai_engine(&self, session_id: String, user_id: String, channel: String) -> Engine {
let mut engine = Engine::new();
// Clone for TALK function
let tool_executor = Arc::new((
self.redis_client.clone(),
self.web_adapter.clone(),
@ -118,7 +124,6 @@ impl RedisToolExecutor {
tokio::spawn(async move {
let (redis_client, web_adapter, voice_adapter, whatsapp_adapter) = &*tool_executor;
// Send message through appropriate channel
let response = BotResponse {
bot_id: "tool_bot".to_string(),
user_id: user_id.clone(),
@ -141,7 +146,6 @@ impl RedisToolExecutor {
log::error!("Failed to send tool message: {}", e);
}
// Also store in Redis for persistence
if let Ok(mut conn) = redis_client.get_async_connection().await {
let output_key = format!("tool:{}:output", session_id);
let _ = conn.lpush(&output_key, &message).await;
@ -149,7 +153,6 @@ impl RedisToolExecutor {
});
});
// Clone for HEAR function
let hear_executor = self.redis_client.clone();
let session_id_clone = session_id.clone();
@ -164,14 +167,9 @@ impl RedisToolExecutor {
let input_key = format!("tool:{}:input", session_id);
let waiting_key = format!("tool:{}:waiting", session_id);
// Set waiting flag
let _ = conn.set_ex(&waiting_key, "true", 300).await;
// Wait for input
let result: Option<(String, String)> =
conn.brpop(&input_key, 30).await.ok().flatten();
// Clear waiting flag
let _ = conn.del(&waiting_key).await;
result
@ -189,8 +187,8 @@ impl RedisToolExecutor {
engine
}
async fn cleanup_session(&self, session_id: &str) -> Result<(), Box<dyn std::error::Error>> {
let mut conn = self.redis_client.get_async_connection().await?;
async fn cleanup_session(&self, session_id: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let keys = vec![
format!("tool:{}:output", session_id),
@ -214,11 +212,10 @@ impl ToolExecutor for RedisToolExecutor {
tool_name: &str,
session_id: &str,
user_id: &str,
) -> Result<ToolResult, Box<dyn std::error::Error>> {
) -> Result<ToolResult, Box<dyn std::error::Error + Send + Sync>> {
let tool = get_tool(tool_name).ok_or_else(|| format!("Tool not found: {}", tool_name))?;
// Store session info in Redis
let mut conn = self.redis_client.get_async_connection().await?;
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let session_key = format!("tool:{}:session", session_id);
let session_data = serde_json::json!({
"user_id": user_id,
@ -228,20 +225,16 @@ impl ToolExecutor for RedisToolExecutor {
conn.set_ex(&session_key, session_data.to_string(), 3600)
.await?;
// Mark tool as active
let active_key = format!("tool:{}:active", session_id);
conn.set_ex(&active_key, "true", 3600).await?;
// Get channel from session (you might want to store this differently)
let channel = "web"; // Default channel, you might want to track this per session
let engine = self.create_rhai_engine(
let channel = "web";
let _engine = self.create_rhai_engine(
session_id.to_string(),
user_id.to_string(),
channel.to_string(),
);
// Execute tool in background
let redis_clone = self.redis_client.clone();
let web_adapter_clone = self.web_adapter.clone();
let voice_adapter_clone = self.voice_adapter.clone();
@ -254,7 +247,6 @@ impl ToolExecutor for RedisToolExecutor {
let mut engine = Engine::new();
let mut scope = Scope::new();
// Register TALK function for background execution
let redis_client = redis_clone.clone();
let web_adapter = web_adapter_clone.clone();
let voice_adapter = voice_adapter_clone.clone();
@ -271,8 +263,7 @@ impl ToolExecutor for RedisToolExecutor {
let user_id = user_id.clone();
tokio::spawn(async move {
// Determine channel from session data
let channel = "web"; // In real implementation, get from session storage
let channel = "web";
let response = BotResponse {
bot_id: "tool_bot".to_string(),
@ -285,7 +276,6 @@ impl ToolExecutor for RedisToolExecutor {
is_complete: true,
};
// Send through appropriate channel
let send_result = match channel {
"web" => web_adapter.send_message(response).await,
"voice" => voice_adapter.send_message(response).await,
@ -297,7 +287,6 @@ impl ToolExecutor for RedisToolExecutor {
log::error!("Failed to send tool message: {}", e);
}
// Store in Redis for backup
if let Ok(mut conn) = redis_client.get_async_connection().await {
let output_key = format!("tool:{}:output", session_id);
let _ = conn.lpush(&output_key, &message).await;
@ -305,7 +294,6 @@ impl ToolExecutor for RedisToolExecutor {
});
});
// Register HEAR function
let hear_redis = redis_clone.clone();
let session_id_hear = session_id.clone();
engine.register_fn("hear", move || -> String {
@ -333,7 +321,6 @@ impl ToolExecutor for RedisToolExecutor {
})
});
// Execute the tool
match engine.eval_with_scope::<()>(&mut scope, &tool_script) {
Ok(_) => {
log::info!(
@ -342,7 +329,6 @@ impl ToolExecutor for RedisToolExecutor {
session_id
);
// Send completion message
let completion_msg =
"🛠️ Tool execution completed. How can I help you with anything else?";
let response = BotResponse {
@ -377,7 +363,6 @@ impl ToolExecutor for RedisToolExecutor {
}
}
// Cleanup
if let Ok(mut conn) = redis_clone.get_async_connection().await {
let active_key = format!("tool:{}:active", session_id_clone);
let _ = conn.del(&active_key).await;
@ -399,8 +384,8 @@ impl ToolExecutor for RedisToolExecutor {
&self,
session_id: &str,
input: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let mut conn = self.redis_client.get_async_connection().await?;
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let input_key = format!("tool:{}:input", session_id);
conn.lpush(&input_key, input).await?;
Ok(())
@ -409,29 +394,25 @@ impl ToolExecutor for RedisToolExecutor {
async fn get_output(
&self,
session_id: &str,
) -> Result<Vec<String>, Box<dyn std::error::Error>> {
let mut conn = self.redis_client.get_async_connection().await?;
) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let output_key = format!("tool:{}:output", session_id);
let messages: Vec<String> = conn.lrange(&output_key, 0, -1).await?;
// Clear after reading
let _: () = conn.del(&output_key).await?;
Ok(messages)
}
async fn is_waiting_for_input(
&self,
session_id: &str,
) -> Result<bool, Box<dyn std::error::Error>> {
let mut conn = self.redis_client.get_async_connection().await?;
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let waiting_key = format!("tool:{}:waiting", session_id);
let exists: bool = conn.exists(&waiting_key).await?;
Ok(exists)
}
}
// Tool definitions
fn get_tool(name: &str) -> Option<Tool> {
match name {
"calculator" => Some(Tool {
@ -443,7 +424,6 @@ fn get_tool(name: &str) -> Option<Tool> {
("b".to_string(), "number".to_string()),
]),
script: r#"
// Calculator tool using TALK/HEAR pattern
let TALK = |message| {
talk(message);
};
@ -490,13 +470,71 @@ fn get_tool(name: &str) -> Option<Tool> {
}
}
#[derive(Clone)]
pub struct ToolManager {
executor: Arc<dyn ToolExecutor>,
tools: HashMap<String, Tool>,
waiting_responses: Arc<Mutex<HashMap<String, mpsc::Sender<String>>>>,
}
impl ToolManager {
pub fn new(executor: Arc<dyn ToolExecutor>) -> Self {
Self { executor }
pub fn new() -> Self {
let mut tools = HashMap::new();
let calculator_tool = Tool {
name: "calculator".to_string(),
description: "Perform calculations".to_string(),
parameters: HashMap::from([
(
"operation".to_string(),
"add|subtract|multiply|divide".to_string(),
),
("a".to_string(), "number".to_string()),
("b".to_string(), "number".to_string()),
]),
script: r#"
TALK("Calculator started. Enter first number:");
let a = HEAR();
TALK("Enter second number:");
let b = HEAR();
TALK("Operation (add/subtract/multiply/divide):");
let op = HEAR();
let num_a = a.parse::<f64>().unwrap();
let num_b = b.parse::<f64>().unwrap();
let result = if op == "add" {
num_a + num_b
} else if op == "subtract" {
num_a - num_b
} else if op == "multiply" {
num_a * num_b
} else if op == "divide" {
if num_b == 0.0 {
TALK("Cannot divide by zero");
return;
}
num_a / num_b
} else {
TALK("Invalid operation");
return;
};
TALK("Result: ".to_string() + &result.to_string());
"#
.to_string(),
};
tools.insert(calculator_tool.name.clone(), calculator_tool);
Self {
tools,
waiting_responses: Arc::new(Mutex::new(HashMap::new())),
}
}
pub fn get_tool(&self, name: &str) -> Option<&Tool> {
self.tools.get(name)
}
pub fn list_tools(&self) -> Vec<String> {
self.tools.keys().cloned().collect()
}
pub async fn execute_tool(
@ -504,33 +542,159 @@ impl ToolManager {
tool_name: &str,
session_id: &str,
user_id: &str,
) -> Result<ToolResult, Box<dyn std::error::Error>> {
self.executor.execute(tool_name, session_id, user_id).await
) -> Result<ToolResult, Box<dyn std::error::Error + Send + Sync>> {
let tool = self.get_tool(tool_name).ok_or("Tool not found")?;
Ok(ToolResult {
success: true,
output: format!("Tool {} started for user {}", tool_name, user_id),
requires_input: true,
session_id: session_id.to_string(),
})
}
pub async fn is_tool_waiting(
&self,
session_id: &str,
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
let waiting = self.waiting_responses.lock().await;
Ok(waiting.contains_key(session_id))
}
pub async fn provide_input(
&self,
session_id: &str,
input: &str,
) -> Result<(), Box<dyn std::error::Error>> {
self.executor.provide_input(session_id, input).await
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
self.provide_user_response(session_id, "default_bot", input.to_string())
.await
}
pub async fn get_tool_output(
&self,
session_id: &str,
) -> Result<Vec<String>, Box<dyn std::error::Error>> {
self.executor.get_output(session_id).await
) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> {
Ok(vec![])
}
pub async fn is_tool_waiting(
pub async fn execute_tool_with_session(
&self,
session_id: &str,
) -> Result<bool, Box<dyn std::error::Error>> {
self.executor.is_waiting_for_input(session_id).await
tool_name: &str,
user_id: &str,
bot_id: &str,
session_manager: SessionManager,
channel_sender: mpsc::Sender<BotResponse>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let tool = self.get_tool(tool_name).ok_or("Tool not found")?;
session_manager
.set_current_tool(user_id, bot_id, Some(tool_name.to_string()))
.await?;
let user_id = user_id.to_string();
let bot_id = bot_id.to_string();
let script = tool.script.clone();
let session_manager_clone = session_manager.clone();
let waiting_responses = self.waiting_responses.clone();
tokio::spawn(async move {
let mut engine = rhai::Engine::new();
let (talk_tx, mut talk_rx) = mpsc::channel(100);
let (hear_tx, mut hear_rx) = mpsc::channel(100);
{
let key = format!("{}:{}", user_id, bot_id);
let mut waiting = waiting_responses.lock().await;
waiting.insert(key, hear_tx);
}
let channel_sender_clone = channel_sender.clone();
let user_id_clone = user_id.clone();
let bot_id_clone = bot_id.clone();
let talk_tx_clone = talk_tx.clone();
engine.register_fn("TALK", move |message: String| {
let tx = talk_tx_clone.clone();
tokio::spawn(async move {
let _ = tx.send(message).await;
});
});
let hear_rx_mutex = Arc::new(Mutex::new(hear_rx));
engine.register_fn("HEAR", move || {
let hear_rx = hear_rx_mutex.clone();
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async move {
let mut receiver = hear_rx.lock().await;
receiver.recv().await.unwrap_or_default()
})
})
});
let script_result =
tokio::task::spawn_blocking(move || engine.eval::<()>(&script)).await;
if let Ok(Err(e)) = script_result {
let error_response = BotResponse {
bot_id: bot_id_clone.clone(),
user_id: user_id_clone.clone(),
session_id: Uuid::new_v4().to_string(),
channel: "test".to_string(),
content: format!("Tool error: {}", e),
message_type: "text".to_string(),
stream_token: None,
is_complete: true,
};
let _ = channel_sender_clone.send(error_response).await;
}
while let Some(message) = talk_rx.recv().await {
let response = BotResponse {
bot_id: bot_id.clone(),
user_id: user_id.clone(),
session_id: Uuid::new_v4().to_string(),
channel: "test".to_string(),
content: message,
message_type: "text".to_string(),
stream_token: None,
is_complete: true,
};
let _ = channel_sender.send(response).await;
}
let _ = session_manager_clone
.set_current_tool(&user_id, &bot_id, None)
.await;
});
Ok(())
}
pub fn list_tools(&self) -> Vec<String> {
vec!["calculator".to_string()]
pub async fn provide_user_response(
&self,
user_id: &str,
bot_id: &str,
response: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let key = format!("{}:{}", user_id, bot_id);
let mut waiting = self.waiting_responses.lock().await;
if let Some(tx) = waiting.get_mut(&key) {
let _ = tx.send(response).await;
waiting.remove(&key);
}
Ok(())
}
}
impl Default for ToolManager {
fn default() -> Self {
Self::new()
}
}
pub struct ToolApi;
impl ToolApi {
pub fn new() -> Self {
Self
}
}

View file

@ -0,0 +1,27 @@
export BOT_ID=
./mc alias set minio http://localhost:9000 user pass
./mc admin user add minio $BOT_ID
cat > $BOT_ID-policy.json <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::pragmatismo-$BOT_ID.gbai",
"arn:aws:s3:::pragmatismo-$BOT_ID.gbai/*"
]
}
]
}
EOF
./mc admin policy create minio $BOT_ID-policy $BOT_ID-policy.json
./mc admin policy attach minio $BOT_ID-policy --user $BOT_ID

29
src/utils/check-space.sh Normal file
View file

@ -0,0 +1,29 @@
df -h
printf "%-20s %-10s %-10s %-10s %-6s %s\n" "CONTAINER" "USED" "AVAIL" "TOTAL" "USE%" "MOUNT"
for container in $(lxc list -c n --format csv); do
disk_info=$(lxc exec $container -- df -h / --output=used,avail,size,pcent | tail -n 1)
printf "%-20s %s\n" "$container" "$disk_info"
done
#!/bin/bash
# Directory to analyze
TARGET_DIR="/opt/gbo/tenants/pragmatismo"
echo "Calculating sizes for directories in $TARGET_DIR..."
echo ""
# Check if directory exists
if [ ! -d "$TARGET_DIR" ]; then
echo "Error: Directory $TARGET_DIR does not exist"
exit 1
fi
# Get the size of each subdirectory
echo "Directory Size Report:"
echo "----------------------"
du -h --max-depth=1 "$TARGET_DIR" | sort -hr | awk -F'\t' '{printf "%-50s %s\n", $2, $1}'
echo ""
echo "Total size:"
du -sh "$TARGET_DIR"

71
src/utils/cleaner.sh Executable file
View file

@ -0,0 +1,71 @@
#!/bin/bash
# Cleanup script for Ubuntu Server and LXC containers
# Run with sudo privileges
echo "Starting system cleanup..."
### Host System Cleanup ###
echo -e "\n[ HOST SYSTEM CLEANUP ]"
# Package manager cache
echo "Cleaning package cache..."
apt clean
apt autoclean
apt autoremove -y
# Journal logs
echo "Cleaning journal logs..."
journalctl --vacuum-time=2d 2>/dev/null
# Temporary files
echo "Cleaning temporary files..."
rm -rf /tmp/* /var/tmp/*
# Thumbnail cache
echo "Cleaning thumbnail cache..."
rm -rf ~/.cache/thumbnails/* /root/.cache/thumbnails/*
# DNS cache
echo "Flushing DNS cache..."
systemd-resolve --flush-caches 2>/dev/null || true
# Old kernels (keep 2 latest)
echo "Removing old kernels..."
apt purge -y $(dpkg -l | awk '/^ii linux-image-*/{print $2}' | grep -v $(uname -r) | head -n -2) 2>/dev/null
# Crash reports
echo "Clearing crash reports..."
rm -f /var/crash/*
### LXC Containers Cleanup ###
echo -e "\n[ LXC CONTAINERS CLEANUP ]"
# Check if LXC is installed
if command -v lxc >/dev/null 2>&1; then
for container in $(lxc list -c n --format csv | grep -v "^$"); do
echo -e "\nCleaning container: $container"
# Execute cleanup commands in container
lxc exec "$container" -- bash -c "
echo 'Cleaning package cache...'
apt clean && apt autoclean && apt autoremove -y
echo 'Cleaning temporary files...'
rm -rf /tmp/* /var/tmp/*
echo 'Cleaning logs...'
rm -rf /opt/gbo/logs/*
echo 'Cleaning journal logs...'
journalctl --vacuum-time=1d 2>/dev/null || true
echo 'Cleaning thumbnail cache...'
rm -rf /home/*/.cache/thumbnails/* /root/.cache/thumbnails/*
" 2>/dev/null
done
else
echo "LXC not installed, skipping container cleanup."
fi
echo -e "\nCleanup completed!"

6
src/utils/disk-size.md Normal file
View file

@ -0,0 +1,6 @@
lxc list --format json | jq -r '.[].name' | while read container; do
echo -n "$container: "
lxc exec $container -- df -h / --output=used < /dev/null | tail -n1
done
du -h --max-depth=1 "." 2>/dev/null | sort -rh | head -n 50 | awk '{printf "%-10s %s\n", $1, $2}'

8
src/utils/email-ips.sh Normal file
View file

@ -0,0 +1,8 @@
az network public-ip list --resource-group "$CLOUD_GROUP" \
--query "[].{Name:name, IP:ipAddress, ReverseDNS:dnsSettings.reverseFqdn}" \
-o table
az network public-ip update --resource-group "$CLOUD_GROUP"
--name "pip-network-adapter-name"
--reverse-fqdn "outbound14.domain.com.br"

View file

@ -0,0 +1,65 @@
sudo apt install -y cloud-guest-utils e2fsprogs
apt install -y make g++ build-essential
apt install -y openjdk-17-jdk ant
apt install -y sudo systemd wget zip procps ccache
apt install -y automake bison flex git gperf graphviz junit4 libtool m4 nasm
apt install -y libcairo2-dev libjpeg-dev libegl1-mesa-dev libfontconfig1-dev \
libgl1-mesa-dev libgif-dev libgtk-3-dev librsvg2-dev libpango1.0-dev
apt install -y libcap-dev libcap2-bin libkrb5-dev libpcap0.8-dev openssl libssl-dev
apt install -y libxcb-dev libx11-xcb-dev libxkbcommon-x11-dev libxtst-dev \
libxrender-dev libxslt1-dev libxt-dev xsltproc
apt install -y libcunit1-dev libcppunit-dev libpam0g-dev libcups2-dev libzstd-dev uuid-runtime
apt install -y python3-dev python3-lxml python3-pip python3-polib
apt install -y nodejs npm
apt install -y libpoco-dev libpococrypto80
apt install -y libreoffice-dev
mkdir -p /opt/lo && cd /opt/lo
wget https://github.com/CollaboraOnline/online/releases/download/for-code-assets/core-co-24.04-assets.tar.gz
tar xf core-co-24.04-assets.tar.gz && rm core-co-24.04-assets.tar.gz
useradd cool -G sudo
mkdir -p /opt/cool && chown cool:cool /opt/cool
cd /opt/cool
sudo -Hu cool git clone https://github.com/CollaboraOnline/online.git
cd online && sudo -Hu cool ./autogen.sh
export CPPFLAGS=-I/opt/lo/include
export LDFLAGS=-L/opt/lo/instdir/program
./configure --with-lokit-path=/opt/lo --with-lo-path=/opt/lo/instdir --with-poco-includes=/usr/local/include --with-poco-libs=/usr/local/lib
sudo -Hu cool make -j$(nproc)
make install
mkdir -p /etc/coolwsd /usr/local/var/cache/coolwsd
chown cool:cool /usr/local/var/cache/coolwsd
admin_pwd=$(openssl rand -hex 6)
cat <<EOT > /lib/systemd/system/coolwsd.service
[Unit]
Description=Collabora Online WebSocket Daemon
After=network.target
[Service]
ExecStart=/opt/cool/online/coolwsd --o:sys_template_path=/opt/cool/online/systemplate \
--o:lo_template_path=/opt/lo/instdir --o:child_root_path=/opt/cool/online/jails \
--o:admin_console.username=admin --o:admin_console.password=$DOC_EDITOR_ADMIN_PWD \
--o:ssl.enable=false
User=cool
[Install]
WantedBy=multi-user.target
EOT
systemctl daemon-reload
systemctl enable coolwsd.service
systemctl start coolwsd.service
"
echo "Installation complete!"
echo "Admin password: $admin_pwd"
echo "Access at: https://localhost:9980"

53
src/utils/set-limits.sh Normal file
View file

@ -0,0 +1,53 @@
#!/usr/bin/env bash
# Define container limits in an associative array
declare -A container_limits=(
# Pattern Memory CPU Allowance
["*tables*"]="4096MB:100ms/100ms"
["*dns*"]="2048MB:100ms/100ms"
["*doc-editor*"]="512MB:10ms/100ms"
["*proxy*"]="2048MB:100ms/100ms"
["*directory*"]="1024MB:50ms/100ms"
["*drive*"]="4096MB:50ms/100ms"
["*email*"]="4096MB:100ms/100ms"
["*webmail*"]="4096MB:100ms/100ms"
["*bot*"]="4096MB:50ms/100ms"
["*meeting*"]="4096MB:100ms/100ms"
["*alm*"]="512MB:50ms/100ms"
["*alm-ci*"]="4096MB:100ms/100ms"
["*system*"]="4096MB:50ms/100ms"
["*mailer*"]="4096MB:25ms/100ms"
)
# Default values (for containers that don't match any pattern)
DEFAULT_MEMORY="1024MB"
DEFAULT_CPU_ALLOWANCE="15ms/100ms"
CPU_COUNT=2
CPU_PRIORITY=10
for pattern in "${!container_limits[@]}"; do
echo "Configuring $container..."
memory=$DEFAULT_MEMORY
cpu_allowance=$DEFAULT_CPU_ALLOWANCE
# Configure all containers
for container in $(lxc list -c n --format csv); do
# Check if container matches any pattern
if [[ $container == $pattern ]]; then
IFS=':' read -r memory cpu_allowance <<< "${container_limits[$pattern]}"
# Apply configuration
lxc config set "$container" limits.memory "$memory"
lxc config set "$container" limits.cpu.allowance "$cpu_allowance"
lxc config set "$container" limits.cpu "$CPU_COUNT"
lxc config set "$container" limits.cpu.priority "$CPU_PRIORITY"
echo "Restarting $container..."
lxc restart "$container"
lxc config show "$container" | grep -E "memory|cpu"
break
fi
done
done

View file

@ -0,0 +1,7 @@
lxc config device override $CONTAINER_NAME root
lxc config device set $CONTAINER_NAME root size 6GB
zpool set autoexpand=on default
zpool online -e default /var/snap/lxd/common/lxd/disks/default.img
zpool list
zfs list

6
src/utils/setup-host.sh Normal file
View file

@ -0,0 +1,6 @@
# Host
sudo lxc config set core.trust_password "$LXC_TRUST_PASSWORD"
# ALM-CI
lxc remote add bot 10.16.164.? --accept-certificate --password "$LXC_TRUST_PASSWORD"

10
src/utils/startup.sh Normal file
View file

@ -0,0 +1,10 @@
#!/bin/bash
# Disable shell timeout
sed -i '/TMOUT/d' /etc/profile /etc/bash.bashrc /etc/profile.d/*
echo 'export TMOUT=0' > /etc/profile.d/notimeout.sh
chmod +x /etc/profile.d/notimeout.sh
sed -i '/pam_exec.so/s/quiet/quiet set_timeout=0/' /etc/pam.d/sshd 2>/dev/null
source /etc/profile

View file

@ -2,15 +2,15 @@
// sudo dpkg -i google-chrome-stable_current_amd64.deb
use log::info;
use crate::utils;
use crate::shared::utils;
use std::env;
use std::error::Error;
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::process::Command;
use std::sync::Arc;
use std::path::PathBuf;
use thirtyfour::{DesiredCapabilities, WebDriver};
use tokio::fs;
use tokio::sync::Semaphore;
@ -76,32 +76,33 @@ impl BrowserSetup {
})
}
async fn find_brave() -> Result<String, Box<dyn std::error::Error>> {
let mut possible_paths = vec![
// Windows - Program Files
String::from(r"C:\Program Files\BraveSoftware\Brave-Browser\Application\brave.exe"),
// macOS
String::from("/Applications/Brave Browser.app/Contents/MacOS/Brave Browser"),
// Linux
String::from("/usr/bin/brave-browser"),
String::from("/usr/bin/brave"),
];
async fn find_brave() -> Result<String, Box<dyn std::error::Error>> {
let mut possible_paths = vec![
// Windows - Program Files
String::from(r"C:\Program Files\BraveSoftware\Brave-Browser\Application\brave.exe"),
// macOS
String::from("/Applications/Brave Browser.app/Contents/MacOS/Brave Browser"),
// Linux
String::from("/usr/bin/brave-browser"),
String::from("/usr/bin/brave"),
];
// Windows - AppData (usuário atual)
if let Ok(local_appdata) = env::var("LOCALAPPDATA") {
let mut path = PathBuf::from(local_appdata);
path.push("BraveSoftware\\Brave-Browser\\Application\\brave.exe");
possible_paths.push(path.to_string_lossy().to_string());
}
for path in possible_paths {
if fs::metadata(&path).await.is_ok() {
return Ok(path);
// Windows - AppData (usuário atual)
if let Ok(local_appdata) = env::var("LOCALAPPDATA") {
let mut path = PathBuf::from(local_appdata);
path.push("BraveSoftware\\Brave-Browser\\Application\\brave.exe");
possible_paths.push(path.to_string_lossy().to_string());
}
}
Err("Brave browser not found. Please install Brave first.".into())
} async fn setup_chromedriver() -> Result<String, Box<dyn std::error::Error>> {
for path in possible_paths {
if fs::metadata(&path).await.is_ok() {
return Ok(path);
}
}
Err("Brave browser not found. Please install Brave first.".into())
}
async fn setup_chromedriver() -> Result<String, Box<dyn std::error::Error>> {
// Create chromedriver directory in executable's parent directory
let mut chromedriver_dir = env::current_exe()?.parent().unwrap().to_path_buf();
chromedriver_dir.push("chromedriver");