- First LLM version like v5.

Rodrigo Rodriguez (Pragmatismo) 2025-10-04 20:42:49 -03:00
parent 2d15133555
commit 694ca28d6f
38 changed files with 4650 additions and 431 deletions

Cargo.lock (generated): 1731 lines changed; diff suppressed because it is too large.

Cargo.toml

@@ -4,60 +4,58 @@ version = "0.1.0"
edition = "2021"
authors = ["Rodrigo Rodriguez <me@rodrigorodriguez.com>"]
description = "General Bots Server"
-license = "AGPL"
license = "AGPL-3.0"
repository = "https://alm.pragmatismo.com.br/generalbots/gbserver"

[features]
-default = []
default = ["postgres", "qdrant"]
local_llm = []
postgres = ["sqlx/postgres"]
qdrant = ["langchain-rust/qdrant"]

[dependencies]
-actix-cors = "0.6"
actix-cors = "0.7"
-actix-multipart = "0.6"
actix-multipart = "0.7"
-actix-web = "4"
actix-web = "4.9"
-actix-ws = "0.3.0"
actix-ws = "0.3"
-thirtyfour = { version = "0.30" }
-downloader = "0.2.8"
anyhow = "1.0"
async-stream = "0.3"
-bytes = "1.1"
async-trait = "0.1"
aes-gcm = "0.10"
argon2 = "0.5"
base64 = "0.22"
bytes = "1.8"
chrono = { version = "0.4", features = ["serde"] }
dotenv = "0.15"
-env_logger = "0.10"
downloader = "0.2"
env_logger = "0.11"
futures = "0.3"
futures-util = "0.3"
-imap = "2.0"
imap = "2.4"
-langchain-rust = "4.4.3"
langchain-rust = { version = "4.6", features = ["qdrant", "postgres"] }
-lettre = { version = "0.10", features = [
-    "smtp-transport",
-    "builder",
-    "tokio1",
-    "tokio1-native-tls",
-] }
lettre = { version = "0.11", features = ["smtp-transport", "builder", "tokio1", "tokio1-native-tls"] }
livekit = "0.7"
-log = "0.4.28"
log = "0.4"
-mailparse = "0.13"
mailparse = "0.15"
minio = { git = "https://github.com/minio/minio-rs", branch = "master" }
native-tls = "0.2"
-reqwest = { version = "0.11", features = ["json", "stream"] }
num-format = "0.4"
-rhai = "1.22.2"
qdrant-client = "1.12"
rhai = "1.22"
redis = { version = "0.27", features = ["tokio-comp"] }
regex = "1.11"
reqwest = { version = "0.12", features = ["json", "stream"] }
scraper = "0.20"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
-smartstring = "1.0" # Use the latest version from crates.io
smartstring = "1.0"
-sqlx = { version = "0.7", features = [
-    "time",
-    "uuid",
-    "runtime-tokio-rustls",
-    "postgres",
-    "chrono",
-] }
sqlx = { version = "0.8", features = ["time", "uuid", "runtime-tokio-rustls", "postgres", "chrono"] }
tempfile = "3"
-tokio = { version = "1", features = ["full"] }
thirtyfour = "0.34"
-tokio-stream = "0.1.17"
tokio = { version = "1.41", features = ["full"] }
tokio-stream = "0.1"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["fmt"] }
-scraper = "0.18"
urlencoding = "2.1"
-regex = "1.10"
uuid = { version = "1.11", features = ["serde", "v4"] }
-uuid = { version = "1.4", features = ["serde", "v4"] } # v4, v7, etc. as needed
zip = "2.2"
-zip = "4.3.0"
-num-format = "0.4"

src/PROMPT.md (new file, +7 lines)

@@ -0,0 +1,7 @@
* Output a single `.sh` script using `cat` so it can be restored directly.
* No placeholders, only real, production-ready code.
* No comments, no explanations, no extra text.
* Follow KISS principles.
* Provide a complete, professional, working solution.
* If the script is too long, split into multiple parts, but always return the **entire code**.
* Output must be **only the code**, nothing else.

src/main.rs

@@ -1,23 +1,30 @@
use actix_web::middleware::Logger;
use log::info;
use qdrant_client::Qdrant;
use std::sync::Arc;
use actix_web::{web, App, HttpServer};
use dotenv::dotenv;
-use services::state::*;
-use services::{config::*, file::*};
use sqlx::PgPool;
use crate::services::auth::AuthService;
use crate::services::automation::AutomationService;
use crate::services::channels::ChannelAdapter;
use crate::services::config::AppConfig;
use crate::services::email::{
get_emails, get_latest_email_from, list_emails, save_click, save_draft, send_email,
};
-use crate::services::llm::{chat, chat_stream};
use crate::services::file::{list_file, upload_file};
use crate::services::llm_generic::generic_chat_completions;
use crate::services::llm_local::{
chat_completions_local, embeddings_local, ensure_llama_servers_running,
};
use crate::services::orchestrator::BotOrchestrator;
use crate::services::session::SessionManager;
use crate::services::state::AppState;
use crate::services::tools::{RedisToolExecutor, ToolManager};
use crate::services::web_automation::{initialize_browser_pool, BrowserPool};
use crate::services::whatsapp::WhatsAppAdapter;
mod models;
mod services;
@@ -34,7 +41,7 @@ async fn main() -> std::io::Result<()> {
let db = PgPool::connect(&db_url).await.unwrap();
let db_custom = PgPool::connect(&db_custom_url).await.unwrap();
-let minio_client = init_minio(&config)
let minio_client = services::file::init_minio(&config)
.await
.expect("Failed to initialize Minio");
@@ -52,47 +59,179 @@
.await
.expect("Failed to initialize browser pool");
// Initialize Redis if available
let redis_url = std::env::var("REDIS_URL").unwrap_or_else(|_| "".to_string());
let redis_conn = match std::env::var("REDIS_URL") {
Ok(redis_url_value) => {
let client = redis::Client::open(redis_url_value.clone())
.expect("Failed to create Redis client");
let conn = redis::aio::Connection::new(client)
.await
.expect("Failed to create Redis connection");
Some(Arc::new(conn))
}
Err(_) => None,
};
let qdrant_url = std::env::var("QDRANT_URL").unwrap_or("http://localhost:6334".to_string());
let qdrant = Qdrant::from_url(&qdrant_url)
.build()
.expect("Failed to connect to Qdrant");
let session_manager = SessionManager::new(db.clone(), redis_conn.clone());
let auth_service = AuthService::new(db.clone(), redis_conn.clone());
let llm_provider: Arc<dyn crate::services::llm_local::LLMProvider> =
match std::env::var("LLM_PROVIDER")
.unwrap_or("mock".to_string())
.as_str()
{
"openai" => Arc::new(crate::services::llm_local::OpenAIClient::new(
std::env::var("OPENAI_API_KEY").expect("OPENAI_API_KEY required"),
)),
"anthropic" => Arc::new(crate::services::llm_local::AnthropicClient::new(
std::env::var("ANTHROPIC_API_KEY").expect("ANTHROPIC_API_KEY required"),
)),
_ => Arc::new(crate::services::llm_local::MockLLMProvider::new()),
};
let web_adapter = Arc::new(crate::services::channels::WebChannelAdapter::new());
let voice_adapter = Arc::new(crate::services::channels::VoiceAdapter::new(
std::env::var("LIVEKIT_URL").unwrap_or("ws://localhost:7880".to_string()),
std::env::var("LIVEKIT_API_KEY").unwrap_or("dev".to_string()),
std::env::var("LIVEKIT_API_SECRET").unwrap_or("secret".to_string()),
));
let whatsapp_adapter = Arc::new(crate::services::whatsapp::WhatsAppAdapter::new(
std::env::var("META_ACCESS_TOKEN").unwrap_or("".to_string()),
std::env::var("META_PHONE_NUMBER_ID").unwrap_or("".to_string()),
std::env::var("META_WEBHOOK_VERIFY_TOKEN").unwrap_or("".to_string()),
));
let tool_executor = Arc::new(
RedisToolExecutor::new(
redis_url.as_str(),
web_adapter.clone() as Arc<dyn crate::services::channels::ChannelAdapter>,
db.clone(),
redis_conn.clone(),
)
.expect("Failed to create RedisToolExecutor"),
);
let chart_generator = ChartGenerator::new().map(Arc::new);
// Initialize LangChain components
let llm = OpenAI::default();
let llm_provider: Arc<dyn LLMProvider> = Arc::new(OpenAIClient::new(llm));
// Initialize vector store for document mode
let vector_store = if let (Ok(qdrant_url), Ok(openai_key)) =
(std::env::var("QDRANT_URL"), std::env::var("OPENAI_API_KEY"))
{
let embedder = OpenAiEmbedder::default().with_api_key(openai_key);
let client = QdrantClient::from_url(&qdrant_url).build().ok()?;
let store = StoreBuilder::new()
.embedder(embedder)
.client(client)
.collection_name("documents")
.build()
.await
.ok()?;
Some(Arc::new(store))
} else {
None
};
// Initialize SQL chain for database mode
let sql_chain = if let Ok(db_url) = std::env::var("DATABASE_URL") {
let engine = PostgreSQLEngine::new(&db_url).await.ok()?;
let db = SQLDatabaseBuilder::new(engine).build().await.ok()?;
let llm = OpenAI::default();
let chain = langchain_rust::chain::SQLDatabaseChainBuilder::new()
.llm(llm)
.top_k(5)
.database(db)
.build()
.ok()?;
Some(Arc::new(chain))
} else {
None
};
let tool_manager = ToolManager::new();
let orchestrator = BotOrchestrator::new(
session_manager,
tool_manager,
llm_provider,
auth_service,
chart_generator,
vector_store,
sql_chain,
);
orchestrator.add_channel("web", web_adapter.clone());
orchestrator.add_channel("voice", voice_adapter.clone());
orchestrator.add_channel("whatsapp", whatsapp_adapter.clone());
sqlx::query(
"INSERT INTO bots (id, name, llm_provider) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
)
.bind(uuid::Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap())
.bind("Default Bot")
.bind("mock")
.execute(&db)
.await
.unwrap();
let app_state = web::Data::new(AppState {
db: db.into(),
db_custom: db_custom.into(),
config: Some(config.clone()),
minio_client: minio_client.into(),
browser_pool: browser_pool.clone(),
orchestrator: Arc::new(orchestrator),
web_adapter,
voice_adapter,
whatsapp_adapter,
});
// Start automation service in background
-let automation_state = app_state.get_ref().clone(); // This gets the Arc<AppState>
let automation_state = app_state.get_ref().clone();
let automation = AutomationService::new(automation_state, "src/prompts");
let _automation_handle = automation.spawn();
// Start HTTP server
HttpServer::new(move || {
-// let cors = Cors::default()
-//     .send_wildcard()
-//     .allowed_methods(vec!["GET", "POST", "PUT", "DELETE"])
-//     .allowed_headers(vec![header::AUTHORIZATION, header::ACCEPT])
-//     .allowed_header(header::CONTENT_TYPE)
-//     .max_age(3600);
-//.wrap(cors)
App::new()
.wrap(Logger::default())
.wrap(Logger::new("HTTP REQUEST: %a %{User-Agent}i"))
.app_data(app_state.clone())
// Original services
.service(upload_file)
.service(list_file)
.service(save_click)
.service(get_emails)
.service(list_emails)
.service(send_email)
-.service(chat_stream)
.service(crate::services::orchestrator::chat_stream)
-.service(chat)
.service(crate::services::orchestrator::chat)
.service(chat_completions_local)
.service(save_draft)
.service(generic_chat_completions)
.service(embeddings_local)
.service(get_latest_email_from)
.service(services::orchestrator::websocket_handler)
.service(services::orchestrator::whatsapp_webhook_verify)
.service(services::orchestrator::whatsapp_webhook)
.service(services::orchestrator::voice_start)
.service(services::orchestrator::voice_stop)
.service(services::orchestrator::create_session)
.service(services::orchestrator::get_sessions)
.service(services::orchestrator::get_session_history)
.service(services::orchestrator::index)
})
.bind((config.server.host.clone(), config.server.port))?
.run()

@@ -0,0 +1,7 @@
curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/valkey.gpg
echo "deb [signed-by=/usr/share/keyrings/valkey.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/valkey.list
sudo apt update
sudo apt install valkey-server
sudo systemctl enable valkey-server
sudo systemctl start valkey-server

@@ -7,9 +7,14 @@ sudo iptables -A FORWARD -i $PUBLIC_INTERFACE -o lxcbr0 -p tcp -m multiport --dp
sudo iptables -A FORWARD -i lxcbr0 -o $PUBLIC_INTERFACE -m state --state RELATED,ESTABLISHED -j ACCEPT
sudo iptables -t nat -A POSTROUTING -o $PUBLIC_INTERFACE -j MASQUERADE
# IPv6 firewall
sudo ip6tables -A FORWARD -i $PUBLIC_INTERFACE -o lxcbr0 -p tcp -m multiport --dports 25,80,110,143,465,587,993,995,4190 -j ACCEPT
sudo ip6tables -A FORWARD -i lxcbr0 -o $PUBLIC_INTERFACE -m state --state RELATED,ESTABLISHED -j ACCEPT
# Save iptables rules permanently (adjust based on your distro)
if command -v iptables-persistent >/dev/null; then
sudo iptables-save | sudo tee /etc/iptables/rules.v4
sudo ip6tables-save | sudo tee /etc/iptables/rules.v6
fi

@@ -61,9 +66,9 @@ sudo chown -R "$HOST_EMAIL_UID:$HOST_EMAIL_GID" "$HOST_BASE"
# Mount directories
echo "[CONTAINER] Mounting directories..."
-lxc config device add emailprofile emaildata disk source="$HOST_DATA" path=/opt/gbo/data
lxc config device add "$PARAM_TENANT"-email emaildata disk source="$HOST_DATA" path=/opt/gbo/data
-lxc config device add emailprofile emailconf disk source="$HOST_CONF" path=/opt/gbo/conf
lxc config device add "$PARAM_TENANT"-email emailconf disk source="$HOST_CONF" path=/opt/gbo/conf
-lxc config device add emailprofile emaillogs disk source="$HOST_LOGS" path=/opt/gbo/logs
lxc config device add "$PARAM_TENANT"-email emaillogs disk source="$HOST_LOGS" path=/opt/gbo/logs
# Create systemd service
echo "[CONTAINER] Creating email service..."
@@ -92,7 +97,11 @@ systemctl enable email
systemctl start email
"
# FIXED: IPv4 + IPv6 proxy devices
for port in 25 80 110 143 465 587 993 995 4190; do
-lxc config device remove email "port-$port" 2>/dev/null || true
lxc config device remove "$PARAM_TENANT"-email "port-$port" 2>/dev/null || true
-lxc config device add email "port-$port" proxy listen=tcp:0.0.0.0:$port connect=tcp:127.0.0.1:$port
lxc config device add "$PARAM_TENANT"-email "port-$port" proxy \
listen=tcp:0.0.0.0:$port \
listen=tcp:[::]:$port \
connect=tcp:127.0.0.1:$port
done

@@ -0,0 +1,4 @@
#!/bin/bash
wget https://github.com/qdrant/qdrant/releases/latest/download/qdrant-x86_64-unknown-linux-gnu.tar.gz
tar -xzf qdrant-x86_64-unknown-linux-gnu.tar.gz
./qdrant
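Once the server is up, a minimal connectivity check from Rust with the qdrant-client crate added in Cargo.toml might look like the sketch below (the URL mirrors the QDRANT_URL default used in main.rs; this snippet is illustrative, not part of the commit):

use qdrant_client::Qdrant;

// Illustrative health probe against the locally started Qdrant instance.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Qdrant::from_url("http://localhost:6334").build()?;
    let health = client.health_check().await?;
    println!("qdrant health: {:?}", health);
    Ok(())
}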

@@ -0,0 +1,129 @@
-- User authentication and profiles
CREATE TABLE IF NOT EXISTS users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
username VARCHAR(255) UNIQUE NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
phone_number VARCHAR(50),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
is_active BOOLEAN DEFAULT true
);
-- Bot configurations
CREATE TABLE IF NOT EXISTS bots (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(255) NOT NULL,
description TEXT,
llm_provider VARCHAR(100) NOT NULL,
llm_config JSONB NOT NULL DEFAULT '{}',
context_provider VARCHAR(100) NOT NULL,
context_config JSONB NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
is_active BOOLEAN DEFAULT true
);
-- User sessions with optimized storage
CREATE TABLE IF NOT EXISTS user_sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
bot_id UUID NOT NULL REFERENCES bots(id) ON DELETE CASCADE,
title VARCHAR(500) NOT NULL DEFAULT 'New Conversation',
answer_mode VARCHAR(50) NOT NULL DEFAULT 'direct',
context_data JSONB NOT NULL DEFAULT '{}',
current_tool VARCHAR(255),
message_count INTEGER NOT NULL DEFAULT 0,
total_tokens INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_activity TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(user_id, bot_id, title)
);
-- Encrypted message history with analytics-friendly structure
CREATE TABLE IF NOT EXISTS message_history (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES user_sessions(id) ON DELETE CASCADE,
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
role VARCHAR(50) NOT NULL CHECK (role IN ('user', 'assistant', 'system')),
content_encrypted TEXT NOT NULL,
message_type VARCHAR(50) NOT NULL DEFAULT 'text',
media_url TEXT,
token_count INTEGER NOT NULL DEFAULT 0,
processing_time_ms INTEGER,
llm_model VARCHAR(100),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
message_index INTEGER NOT NULL
);
-- Bot channel configurations
CREATE TABLE IF NOT EXISTS bot_channels (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
bot_id UUID NOT NULL REFERENCES bots(id) ON DELETE CASCADE,
channel_type VARCHAR(50) NOT NULL CHECK (channel_type IN ('web', 'whatsapp', 'meet', 'api')),
config JSONB NOT NULL DEFAULT '{}',
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(bot_id, channel_type)
);
-- WhatsApp number mappings
CREATE TABLE IF NOT EXISTS whatsapp_numbers (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
bot_id UUID NOT NULL REFERENCES bots(id) ON DELETE CASCADE,
phone_number VARCHAR(50) NOT NULL,
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(phone_number, bot_id)
);
-- User email mappings for web channel
CREATE TABLE IF NOT EXISTS user_emails (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
email VARCHAR(255) NOT NULL,
is_primary BOOLEAN DEFAULT false,
verified BOOLEAN DEFAULT false,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(email)
);
-- Tools registry
CREATE TABLE IF NOT EXISTS tools (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(255) UNIQUE NOT NULL,
description TEXT NOT NULL,
parameters JSONB NOT NULL DEFAULT '{}',
script TEXT NOT NULL,
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Manual context injections
CREATE TABLE IF NOT EXISTS context_injections (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES user_sessions(id) ON DELETE CASCADE,
injected_by UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
context_data JSONB NOT NULL,
reason TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Analytics tables
CREATE TABLE IF NOT EXISTS usage_analytics (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
bot_id UUID NOT NULL REFERENCES bots(id) ON DELETE CASCADE,
session_id UUID NOT NULL REFERENCES user_sessions(id) ON DELETE CASCADE,
date DATE NOT NULL DEFAULT CURRENT_DATE,
message_count INTEGER NOT NULL DEFAULT 0,
total_tokens INTEGER NOT NULL DEFAULT 0,
total_processing_time_ms INTEGER NOT NULL DEFAULT 0
);
-- Indexes for performance
CREATE INDEX IF NOT EXISTS idx_message_history_session_id ON message_history(session_id);
CREATE INDEX IF NOT EXISTS idx_message_history_created_at ON message_history(created_at);
CREATE INDEX IF NOT EXISTS idx_user_sessions_user_bot ON user_sessions(user_id, bot_id);
CREATE INDEX IF NOT EXISTS idx_usage_analytics_date ON usage_analytics(date);
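For reference, a write to message_history from the Rust side with sqlx could look like the following sketch (the helper and its arguments are illustrative; only the column names come from the schema above):

use sqlx::PgPool;
use uuid::Uuid;

// Illustrative helper, not part of the commit: stores one encrypted message,
// with the caller supplying the running message_index for the session.
async fn insert_message(
    pool: &PgPool,
    session_id: Uuid,
    user_id: Uuid,
    role: &str,
    content_encrypted: &str,
    message_index: i32,
) -> Result<(), sqlx::Error> {
    sqlx::query(
        "INSERT INTO message_history \
         (session_id, user_id, role, content_encrypted, message_index) \
         VALUES ($1, $2, $3, $4, $5)",
    )
    .bind(session_id)
    .bind(user_id)
    .bind(role)
    .bind(content_encrypted)
    .bind(message_index)
    .execute(pool)
    .await?;
    Ok(())
}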

@@ -0,0 +1,57 @@
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TABLE IF NOT EXISTS users (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
username VARCHAR(255) UNIQUE NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS bots (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
name VARCHAR(255) NOT NULL,
llm_provider VARCHAR(100) NOT NULL,
config JSONB DEFAULT '{}',
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS user_sessions (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
bot_id UUID NOT NULL REFERENCES bots(id) ON DELETE CASCADE,
title VARCHAR(500) NOT NULL,
context_data JSONB DEFAULT '{}',
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
UNIQUE(user_id, bot_id)
);
CREATE TABLE IF NOT EXISTS message_history (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
session_id UUID NOT NULL REFERENCES user_sessions(id) ON DELETE CASCADE,
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
role VARCHAR(50) NOT NULL,
content_encrypted TEXT NOT NULL,
message_type VARCHAR(50) DEFAULT 'text',
message_index INTEGER NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_user_sessions_user_id ON user_sessions(user_id);
CREATE INDEX IF NOT EXISTS idx_user_sessions_bot_id ON user_sessions(bot_id);
CREATE INDEX IF NOT EXISTS idx_message_history_session_id ON message_history(session_id);
CREATE INDEX IF NOT EXISTS idx_message_history_user_id ON message_history(user_id);
CREATE INDEX IF NOT EXISTS idx_message_history_created_at ON message_history(created_at);
INSERT INTO bots (id, name, llm_provider)
VALUES ('00000000-0000-0000-0000-000000000000', 'Default Bot', 'mock')
ON CONFLICT (id) DO NOTHING;
INSERT INTO users (id, username, email, password_hash)
VALUES ('00000000-0000-0000-0000-000000000001', 'demo', 'demo@example.com', '$argon2id$v=19$m=19456,t=2,p=1$c29tZXNhbHQ$RdescudvJCsgt3ub+b+dWRWJTmaaJObG')
ON CONFLICT (id) DO NOTHING;

src/services/auth/mod.rs (new file, +90 lines)

@@ -0,0 +1,90 @@
use argon2::{
password_hash::{rand_core::OsRng, PasswordHash, PasswordHasher, PasswordVerifier, SaltString},
Argon2,
};
use redis::aio::Connection as ConnectionManager;
use redis::AsyncCommands;
use sqlx::{PgPool, Row};
use std::sync::Arc;
use uuid::Uuid;
pub struct AuthService {
pub pool: PgPool,
pub redis: Option<Arc<ConnectionManager>>,
}
impl AuthService {
pub fn new(pool: PgPool, redis: Option<Arc<ConnectionManager>>) -> Self {
Self { pool, redis }
}
pub async fn verify_user(
&self,
username: &str,
password: &str,
) -> Result<Option<Uuid>, Box<dyn std::error::Error>> {
// Try Redis cache first
if let Some(redis) = &self.redis {
let cache_key = format!("auth:user:{}", username);
if let Ok(user_id_str) = redis.clone().get::<_, String>(cache_key).await {
if let Ok(user_id) = Uuid::parse_str(&user_id_str) {
return Ok(Some(user_id));
}
}
}
// Fallback to database
let user = sqlx::query(
"SELECT id, password_hash FROM users WHERE username = $1 AND is_active = true",
)
.bind(username)
.fetch_optional(&self.pool)
.await?;
if let Some(row) = user {
let user_id: Uuid = row.get("id");
let password_hash: String = row.get("password_hash");
let parsed_hash = PasswordHash::new(&password_hash)?;
if Argon2::default()
.verify_password(password.as_bytes(), &parsed_hash)
.is_ok()
{
// Cache in Redis
if let Some(redis) = &self.redis {
let cache_key = format!("auth:user:{}", username);
let _: () = redis
.clone()
.set_ex(cache_key, user_id.to_string(), 3600)
.await?;
}
return Ok(Some(user_id));
}
}
Ok(None)
}
pub async fn create_user(
&self,
username: &str,
email: &str,
password: &str,
) -> Result<Uuid, Box<dyn std::error::Error>> {
let salt = SaltString::generate(&mut OsRng);
let argon2 = Argon2::default();
let password_hash = argon2
.hash_password(password.as_bytes(), &salt)?
.to_string();
let user_id = sqlx::query(
"INSERT INTO users (username, email, password_hash) VALUES ($1, $2, $3) RETURNING id",
)
.bind(username)
.bind(email)
.bind(password_hash)
.fetch_one(&self.pool)
.await?
.get("id");
Ok(user_id)
}
}
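A brief sketch of how this AuthService could be exercised end to end (the pool setup and credentials are placeholders, not taken from the commit):

use sqlx::postgres::PgPoolOptions;

// Hypothetical usage of the AuthService above; DATABASE_URL and the
// credentials are placeholders.
async fn auth_demo() -> Result<(), Box<dyn std::error::Error>> {
    let pool = PgPoolOptions::new()
        .max_connections(2)
        .connect(&std::env::var("DATABASE_URL")?)
        .await?;

    // Pass None for Redis so verification falls back to the database path.
    let auth = AuthService::new(pool, None);

    let user_id = auth.create_user("demo2", "demo2@example.com", "s3cret").await?;
    assert_eq!(auth.verify_user("demo2", "s3cret").await?, Some(user_id));
    assert_eq!(auth.verify_user("demo2", "wrong-password").await?, None);
    Ok(())
}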

@@ -0,0 +1,884 @@
use actix_cors::Cors;
use actix_web::middleware::Logger;
use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer, Result};
use actix_ws::Message;
use chrono::Utc;
use langchain_rust::{
chain::{Chain, LLMChain, LLMChainBuilder},
embedding::openai::openai_embedder::OpenAiEmbedder,
llm::openai::OpenAI,
memory::SimpleMemory,
prompt_args,
schemas::{Document, Message},
tools::{postgres::PostgreSQLEngine, SQLDatabaseBuilder},
vectorstore::qdrant::{Qdrant as LangChainQdrant, StoreBuilder},
vectorstore::{VecStoreOptions, VectorStore},
};
use log::info;
use qdrant_client::qdrant::Qdrant as QdrantClient;
use serde::{Deserialize, Serialize};
use sqlx::{postgres::PgPoolOptions, PgPool};
use std::collections::HashMap;
use std::fs;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use uuid::Uuid;
mod shared;
use shared::*;
mod services;
use services::auth::AuthService;
use services::channels::{ChannelAdapter, VoiceAdapter, WebChannelAdapter};
use services::chart::ChartGenerator;
use services::llm::{AnthropicClient, LLMProvider, MockLLMProvider, OpenAIClient};
use services::session::SessionManager;
use services::tools::ToolManager;
use services::whatsapp::WhatsAppAdapter;
pub struct BotOrchestrator {
session_manager: SessionManager,
tool_manager: ToolManager,
llm_provider: Arc<dyn LLMProvider>,
auth_service: AuthService,
channels: HashMap<String, Arc<dyn ChannelAdapter>>,
response_channels: Arc<Mutex<HashMap<String, mpsc::Sender<BotResponse>>>>,
chart_generator: Option<Arc<ChartGenerator>>,
vector_store: Option<Arc<LangChainQdrant<OpenAiEmbedder>>>,
sql_chain: Option<Arc<LLMChain>>,
}
impl BotOrchestrator {
fn new(
session_manager: SessionManager,
tool_manager: ToolManager,
llm_provider: Arc<dyn LLMProvider>,
auth_service: AuthService,
chart_generator: Option<Arc<ChartGenerator>>,
vector_store: Option<Arc<LangChainQdrant<OpenAiEmbedder>>>,
sql_chain: Option<Arc<LLMChain>>,
) -> Self {
Self {
session_manager,
tool_manager,
llm_provider,
auth_service,
channels: HashMap::new(),
response_channels: Arc::new(Mutex::new(HashMap::new())),
chart_generator,
vector_store,
sql_chain,
}
}
fn add_channel(&mut self, channel_type: &str, adapter: Arc<dyn ChannelAdapter>) {
self.channels.insert(channel_type.to_string(), adapter);
}
async fn register_response_channel(
&self,
session_id: String,
sender: mpsc::Sender<BotResponse>,
) {
self.response_channels
.lock()
.await
.insert(session_id, sender);
}
async fn set_user_answer_mode(
&self,
user_id: &str,
bot_id: &str,
mode: &str,
) -> Result<(), Box<dyn std::error::Error>> {
self.session_manager
.update_answer_mode(user_id, bot_id, mode)
.await?;
Ok(())
}
async fn process_message(
&self,
message: UserMessage,
) -> Result<(), Box<dyn std::error::Error>> {
info!(
"Processing message from channel: {}, user: {}",
message.channel, message.user_id
);
let user_id = Uuid::parse_str(&message.user_id).unwrap_or_else(|_| Uuid::new_v4());
let bot_id = Uuid::parse_str(&message.bot_id)
.unwrap_or_else(|_| Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap());
let session = match self
.session_manager
.get_user_session(user_id, bot_id)
.await?
{
Some(session) => session,
None => {
self.session_manager
.create_session(user_id, bot_id, "New Conversation")
.await?
}
};
// Check if we're in tool mode and there's an active tool
if session.answer_mode == "tool" && session.current_tool.is_some() {
self.tool_manager
.provide_user_response(&message.user_id, &message.bot_id, message.content.clone())
.await?;
return Ok(());
}
self.session_manager
.save_message(
session.id,
user_id,
"user",
&message.content,
&message.message_type,
)
.await?;
// Handle different answer modes with LangChain integration
let response_content = match session.answer_mode.as_str() {
"document" => self.document_mode_handler(&message, &session).await?,
"chart" => self.chart_mode_handler(&message, &session).await?,
"database" => self.database_mode_handler(&message, &session).await?,
"tool" => self.tool_mode_handler(&message, &session).await?,
_ => self.direct_mode_handler(&message, &session).await?,
};
self.session_manager
.save_message(session.id, user_id, "assistant", &response_content, "text")
.await?;
let bot_response = BotResponse {
bot_id: message.bot_id,
user_id: message.user_id,
session_id: message.session_id,
channel: message.channel,
content: response_content,
message_type: "text".to_string(),
stream_token: None,
is_complete: true,
};
if let Some(adapter) = self.channels.get(&message.channel) {
adapter.send_message(bot_response).await?;
}
Ok(())
}
async fn document_mode_handler(
&self,
message: &UserMessage,
session: &UserSession,
) -> Result<String, Box<dyn std::error::Error>> {
if let Some(vector_store) = &self.vector_store {
let similar_docs = vector_store
.similarity_search(&message.content, 3, &VecStoreOptions::default())
.await?;
let mut enhanced_prompt = format!("User question: {}\n\n", message.content);
if !similar_docs.is_empty() {
enhanced_prompt.push_str("Relevant documents:\n");
for (i, doc) in similar_docs.iter().enumerate() {
enhanced_prompt.push_str(&format!("[Doc {}]: {}\n", i + 1, doc.page_content));
}
enhanced_prompt.push_str(
"\nPlease answer the user's question based on the provided documents.",
);
}
self.llm_provider
.generate(&enhanced_prompt, &serde_json::Value::Null)
.await
} else {
self.direct_mode_handler(message, session).await
}
}
async fn chart_mode_handler(
&self,
message: &UserMessage,
session: &UserSession,
) -> Result<String, Box<dyn std::error::Error>> {
if let Some(chart_generator) = &self.chart_generator {
let chart_response = chart_generator
.generate_chart(&message.content, "bar")
.await?;
// Store chart generation in history
self.session_manager
.save_message(
session.id,
session.user_id,
"system",
&format!("Generated chart for query: {}", message.content),
"chart",
)
.await?;
Ok(format!(
"Chart generated for your query. Data retrieved: {}",
chart_response.sql_query
))
} else {
// Fallback to document mode
self.document_mode_handler(message, session).await
}
}
async fn database_mode_handler(
&self,
message: &UserMessage,
session: &UserSession,
) -> Result<String, Box<dyn std::error::Error>> {
if let Some(sql_chain) = &self.sql_chain {
let input_variables = prompt_args! {
"input" => message.content,
};
let result = sql_chain.invoke(input_variables).await?;
Ok(result.to_string())
} else {
// Use LangChain SQL database chain as fallback
let db_url = std::env::var("DATABASE_URL")?;
let engine = PostgreSQLEngine::new(&db_url).await?;
let db = SQLDatabaseBuilder::new(engine).build().await?;
let llm = OpenAI::default();
let chain = langchain_rust::chain::SQLDatabaseChainBuilder::new()
.llm(llm)
.top_k(5)
.database(db)
.build()?;
let input_variables = chain.prompt_builder().query(&message.content).build();
let result = chain.invoke(input_variables).await?;
Ok(result.to_string())
}
}
async fn tool_mode_handler(
&self,
message: &UserMessage,
session: &UserSession,
) -> Result<String, Box<dyn std::error::Error>> {
// Check if we should start a tool
if message.content.to_lowercase().contains("calculator") {
if let Some(adapter) = self.channels.get(&message.channel) {
let (tx, _rx) = mpsc::channel(100);
self.register_response_channel(message.session_id.clone(), tx.clone())
.await;
let tool_manager = self.tool_manager.clone();
let user_id_str = message.user_id.clone();
let bot_id_str = message.bot_id.clone();
let session_manager = self.session_manager.clone();
tokio::spawn(async move {
tool_manager
.execute_tool("calculator", &user_id_str, &bot_id_str, session_manager, tx)
.await
.unwrap_or_else(|e| {
log::error!("Error executing tool: {}", e);
});
});
}
Ok("Starting calculator tool...".to_string())
} else {
// Fall back to normal LLM response with tool context
let available_tools = self.tool_manager.list_tools();
let tools_context = if !available_tools.is_empty() {
format!("\n\nAvailable tools: {}. If the user needs calculations, suggest using the calculator tool.", available_tools.join(", "))
} else {
String::new()
};
let full_prompt = format!("{}{}", message.content, tools_context);
self.llm_provider
.generate(&full_prompt, &serde_json::Value::Null)
.await
}
}
async fn direct_mode_handler(
&self,
message: &UserMessage,
session: &UserSession,
) -> Result<String, Box<dyn std::error::Error>> {
// Get conversation history for context using LangChain memory
let history = self
.session_manager
.get_conversation_history(session.id, session.user_id)
.await?;
let mut memory = SimpleMemory::new();
for (role, content) in history {
match role.as_str() {
"user" => memory.add_user_message(&content),
"assistant" => memory.add_ai_message(&content),
_ => {}
}
}
// Build prompt with memory context
let mut prompt = String::new();
if let Some(chat_history) = memory.get_chat_history() {
for message in chat_history {
prompt.push_str(&format!(
"{}: {}\n",
message.message_type(),
message.content()
));
}
}
prompt.push_str(&format!("User: {}\nAssistant:", message.content));
self.llm_provider
.generate(&prompt, &serde_json::Value::Null)
.await
}
async fn stream_response(
&self,
message: UserMessage,
mut response_tx: mpsc::Sender<BotResponse>,
) -> Result<(), Box<dyn std::error::Error>> {
info!("Streaming response for user: {}", message.user_id);
let user_id = Uuid::parse_str(&message.user_id).unwrap_or_else(|_| Uuid::new_v4());
let bot_id = Uuid::parse_str(&message.bot_id)
.unwrap_or_else(|_| Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap());
let session = match self
.session_manager
.get_user_session(user_id, bot_id)
.await?
{
Some(session) => session,
None => {
self.session_manager
.create_session(user_id, bot_id, "New Conversation")
.await?
}
};
if session.answer_mode == "tool" && session.current_tool.is_some() {
self.tool_manager
.provide_user_response(&message.user_id, &message.bot_id, message.content.clone())
.await?;
return Ok(());
}
self.session_manager
.save_message(
session.id,
user_id,
"user",
&message.content,
&message.message_type,
)
.await?;
// Get conversation history for streaming context
let history = self
.session_manager
.get_conversation_history(session.id, user_id)
.await?;
let mut memory = SimpleMemory::new();
for (role, content) in history {
match role.as_str() {
"user" => memory.add_user_message(&content),
"assistant" => memory.add_ai_message(&content),
_ => {}
}
}
let mut prompt = String::new();
if let Some(chat_history) = memory.get_chat_history() {
for message in chat_history {
prompt.push_str(&format!(
"{}: {}\n",
message.message_type(),
message.content()
));
}
}
prompt.push_str(&format!("User: {}\nAssistant:", message.content));
let (stream_tx, mut stream_rx) = mpsc::channel(100);
let llm_provider = self.llm_provider.clone();
let prompt_clone = prompt.clone();
tokio::spawn(async move {
let _ = llm_provider
.generate_stream(&prompt_clone, &serde_json::Value::Null, stream_tx)
.await;
});
let mut full_response = String::new();
while let Some(chunk) = stream_rx.recv().await {
full_response.push_str(&chunk);
let bot_response = BotResponse {
bot_id: message.bot_id.clone(),
user_id: message.user_id.clone(),
session_id: message.session_id.clone(),
channel: message.channel.clone(),
content: chunk,
message_type: "text".to_string(),
stream_token: None,
is_complete: false,
};
if response_tx.send(bot_response).await.is_err() {
break;
}
}
self.session_manager
.save_message(session.id, user_id, "assistant", &full_response, "text")
.await?;
let final_response = BotResponse {
bot_id: message.bot_id,
user_id: message.user_id,
session_id: message.session_id,
channel: message.channel,
content: "".to_string(),
message_type: "text".to_string(),
stream_token: None,
is_complete: true,
};
response_tx.send(final_response).await?;
Ok(())
}
async fn get_user_sessions(
&self,
user_id: Uuid,
) -> Result<Vec<UserSession>, Box<dyn std::error::Error>> {
self.session_manager.get_user_sessions(user_id).await
}
async fn get_conversation_history(
&self,
session_id: Uuid,
user_id: Uuid,
) -> Result<Vec<(String, String)>, Box<dyn std::error::Error>> {
self.session_manager
.get_conversation_history(session_id, user_id)
.await
}
pub async fn process_message_with_tools(
&self,
message: UserMessage,
) -> Result<(), Box<dyn std::error::Error>> {
info!(
"Processing message with tools from user: {}",
message.user_id
);
let user_id = Uuid::parse_str(&message.user_id).unwrap_or_else(|_| Uuid::new_v4());
let bot_id = Uuid::parse_str(&message.bot_id)
.unwrap_or_else(|_| Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap());
let session = match self
.session_manager
.get_user_session(user_id, bot_id)
.await?
{
Some(session) => session,
None => {
self.session_manager
.create_session(user_id, bot_id, "New Conversation")
.await?
}
};
self.session_manager
.save_message(
session.id,
user_id,
"user",
&message.content,
&message.message_type,
)
.await?;
// Check if we're in a tool conversation
let is_tool_waiting = self
.tool_manager
.is_tool_waiting(&message.session_id)
.await
.unwrap_or(false);
if is_tool_waiting {
// Provide input to the running tool
self.tool_manager
.provide_input(&message.session_id, &message.content)
.await?;
// Get tool output and send it as a response
if let Ok(tool_output) = self.tool_manager.get_tool_output(&message.session_id).await {
for output in tool_output {
let bot_response = BotResponse {
bot_id: message.bot_id.clone(),
user_id: message.user_id.clone(),
session_id: message.session_id.clone(),
channel: message.channel.clone(),
content: output,
message_type: "text".to_string(),
stream_token: None,
is_complete: true,
};
if let Some(adapter) = self.channels.get(&message.channel) {
adapter.send_message(bot_response).await?;
}
}
}
return Ok(());
}
// Normal LLM processing with tool awareness
let available_tools = self.tool_manager.list_tools();
let tools_context = if !available_tools.is_empty() {
format!("\n\nAvailable tools: {}. If the user needs calculations, suggest using the calculator tool.", available_tools.join(", "))
} else {
String::new()
};
let full_prompt = format!("{}{}", message.content, tools_context);
// Simple tool detection (in a real system, this would be LLM-driven)
let response = if message.content.to_lowercase().contains("calculator")
|| message.content.to_lowercase().contains("calculate")
|| message.content.to_lowercase().contains("math")
{
// Start calculator tool
match self
.tool_manager
.execute_tool("calculator", &message.session_id, &message.user_id)
.await
{
Ok(tool_result) => {
// Save tool start message
self.session_manager
.save_message(
session.id,
user_id,
"assistant",
&tool_result.output,
"tool_start",
)
.await?;
tool_result.output
}
Err(e) => {
format!("I encountered an error starting the calculator: {}", e)
}
}
} else {
// Normal LLM response
self.llm_provider
.generate(&full_prompt, &serde_json::Value::Null)
.await?
};
self.session_manager
.save_message(session.id, user_id, "assistant", &response, "text")
.await?;
let bot_response = BotResponse {
bot_id: message.bot_id,
user_id: message.user_id,
session_id: message.session_id,
channel: message.channel,
content: response,
message_type: "text".to_string(),
stream_token: None,
is_complete: true,
};
if let Some(adapter) = self.channels.get(&message.channel) {
adapter.send_message(bot_response).await?;
}
Ok(())
}
}
struct AppState {
orchestrator: Arc<BotOrchestrator>,
web_adapter: Arc<WebChannelAdapter>,
voice_adapter: Arc<VoiceAdapter>,
whatsapp_adapter: Arc<WhatsAppAdapter>,
}
#[actix_web::get("/ws")]
async fn websocket_handler(
req: HttpRequest,
stream: web::Payload,
data: web::Data<AppState>,
) -> Result<HttpResponse, actix_web::Error> {
let (res, mut session, mut msg_stream) = actix_ws::handle(&req, stream)?;
let session_id = Uuid::new_v4().to_string();
let (tx, mut rx) = mpsc::channel::<BotResponse>(100);
data.orchestrator
.register_response_channel(session_id.clone(), tx.clone())
.await;
data.web_adapter
.add_connection(session_id.clone(), tx.clone())
.await;
data.voice_adapter
.add_connection(session_id.clone(), tx.clone())
.await;
let orchestrator = data.orchestrator.clone();
let web_adapter = data.web_adapter.clone();
actix_web::rt::spawn(async move {
while let Some(msg) = rx.recv().await {
if let Ok(json) = serde_json::to_string(&msg) {
let _ = session.text(json).await;
}
}
});
actix_web::rt::spawn(async move {
while let Some(Ok(msg)) = msg_stream.recv().await {
match msg {
Message::Text(text) => {
let user_message = UserMessage {
bot_id: "default_bot".to_string(),
user_id: "default_user".to_string(),
session_id: session_id.clone(),
channel: "web".to_string(),
content: text.to_string(),
message_type: "text".to_string(),
media_url: None,
timestamp: Utc::now(),
};
if let Err(e) = orchestrator.stream_response(user_message, tx.clone()).await {
info!("Error processing message: {}", e);
}
}
Message::Close(_) => {
web_adapter.remove_connection(&session_id).await;
break;
}
_ => {}
}
}
});
Ok(res)
}
#[actix_web::get("/api/whatsapp/webhook")]
async fn whatsapp_webhook_verify(
data: web::Data<AppState>,
web::Query(params): web::Query<HashMap<String, String>>,
) -> Result<HttpResponse> {
let mode = params.get("hub.mode").unwrap_or(&"".to_string());
let token = params.get("hub.verify_token").unwrap_or(&"".to_string());
let challenge = params.get("hub.challenge").unwrap_or(&"".to_string());
match data.whatsapp_adapter.verify_webhook(mode, token, challenge) {
Ok(challenge_response) => Ok(HttpResponse::Ok().body(challenge_response)),
Err(_) => Ok(HttpResponse::Forbidden().body("Verification failed")),
}
}
#[actix_web::post("/api/whatsapp/webhook")]
async fn whatsapp_webhook(
data: web::Data<AppState>,
payload: web::Json<services::whatsapp::WhatsAppMessage>,
) -> Result<HttpResponse> {
match data
.whatsapp_adapter
.process_incoming_message(payload.into_inner())
.await
{
Ok(user_messages) => {
for user_message in user_messages {
if let Err(e) = data.orchestrator.process_message(user_message).await {
log::error!("Error processing WhatsApp message: {}", e);
}
}
Ok(HttpResponse::Ok().body(""))
}
Err(e) => {
log::error!("Error processing WhatsApp webhook: {}", e);
Ok(HttpResponse::BadRequest().body("Invalid message"))
}
}
}
#[actix_web::post("/api/voice/start")]
async fn voice_start(
data: web::Data<AppState>,
info: web::Json<serde_json::Value>,
) -> Result<HttpResponse> {
let session_id = info
.get("session_id")
.and_then(|s| s.as_str())
.unwrap_or("");
let user_id = info
.get("user_id")
.and_then(|u| u.as_str())
.unwrap_or("user");
match data
.voice_adapter
.start_voice_session(session_id, user_id)
.await
{
Ok(token) => {
Ok(HttpResponse::Ok().json(serde_json::json!({"token": token, "status": "started"})))
}
Err(e) => {
Ok(HttpResponse::InternalServerError()
.json(serde_json::json!({"error": e.to_string()})))
}
}
}
#[actix_web::post("/api/voice/stop")]
async fn voice_stop(
data: web::Data<AppState>,
info: web::Json<serde_json::Value>,
) -> Result<HttpResponse> {
let session_id = info
.get("session_id")
.and_then(|s| s.as_str())
.unwrap_or("");
match data.voice_adapter.stop_voice_session(session_id).await {
Ok(()) => Ok(HttpResponse::Ok().json(serde_json::json!({"status": "stopped"}))),
Err(e) => {
Ok(HttpResponse::InternalServerError()
.json(serde_json::json!({"error": e.to_string()})))
}
}
}
#[actix_web::post("/api/sessions")]
async fn create_session(data: web::Data<AppState>) -> Result<HttpResponse> {
let session_id = Uuid::new_v4();
Ok(HttpResponse::Ok().json(serde_json::json!({
"session_id": session_id,
"title": "New Conversation",
"created_at": Utc::now()
})))
}
#[actix_web::get("/api/sessions")]
async fn get_sessions(data: web::Data<AppState>) -> Result<HttpResponse> {
let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
match data.orchestrator.get_user_sessions(user_id).await {
Ok(sessions) => Ok(HttpResponse::Ok().json(sessions)),
Err(e) => {
Ok(HttpResponse::InternalServerError()
.json(serde_json::json!({"error": e.to_string()})))
}
}
}
#[actix_web::get("/api/sessions/{session_id}")]
async fn get_session_history(
data: web::Data<AppState>,
path: web::Path<String>,
) -> Result<HttpResponse> {
let session_id = path.into_inner();
let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
match Uuid::parse_str(&session_id) {
Ok(session_uuid) => {
match data
.orchestrator
.get_conversation_history(session_uuid, user_id)
.await
{
Ok(history) => Ok(HttpResponse::Ok().json(history)),
Err(e) => Ok(HttpResponse::InternalServerError()
.json(serde_json::json!({"error": e.to_string()}))),
}
}
Err(_) => {
Ok(HttpResponse::BadRequest().json(serde_json::json!({"error": "Invalid session ID"})))
}
}
}
#[actix_web::post("/api/set_mode")]
async fn set_mode_handler(
data: web::Data<AppState>,
info: web::Json<HashMap<String, String>>,
) -> Result<HttpResponse> {
let default_user = "default_user".to_string();
let default_bot = "default_bot".to_string();
let default_mode = "direct".to_string();
let user_id = info.get("user_id").unwrap_or(&default_user);
let bot_id = info.get("bot_id").unwrap_or(&default_bot);
let mode = info.get("mode").unwrap_or(&default_mode);
if let Err(e) = data
.orchestrator
.set_user_answer_mode(user_id, bot_id, mode)
.await
{
return Ok(
HttpResponse::InternalServerError().json(serde_json::json!({"error": e.to_string()}))
);
}
Ok(HttpResponse::Ok().json(serde_json::json!({"status": "mode_updated"})))
}
#[actix_web::get("/")]
async fn index() -> Result<HttpResponse> {
let html = fs::read_to_string("templates/index.html")
.unwrap_or_else(|_| include_str!("../templates/index.html").to_string());
Ok(HttpResponse::Ok().content_type("text/html").body(html))
}
#[actix_web::get("/static/{filename:.*}")]
async fn static_files(req: HttpRequest) -> Result<HttpResponse> {
let filename = req.match_info().query("filename");
let path = format!("static/{}", filename);
match fs::read(&path) {
Ok(content) => {
let content_type = match filename {
f if f.ends_with(".js") => "application/javascript",
f if f.ends_with(".css") => "text/css",
f if f.ends_with(".png") => "image/png",
f if f.ends_with(".jpg") | f.ends_with(".jpeg") => "image/jpeg",
_ => "text/plain",
};
Ok(HttpResponse::Ok().content_type(content_type).body(content))
}
Err(_) => Ok(HttpResponse::NotFound().body("File not found")),
}
}
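A minimal client-side sketch for the new session endpoints, using the reqwest dependency already present in Cargo.toml (the host, port, and response shapes are assumptions for illustration only):

use serde_json::Value;

// Illustrative client for the /api/sessions routes registered above.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let base = "http://localhost:8080";
    let client = reqwest::Client::new();

    // Create a session.
    let created: Value = client
        .post(format!("{base}/api/sessions"))
        .send()
        .await?
        .json()
        .await?;
    println!("created: {created}");

    // List sessions for the demo user hard-coded into get_sessions above.
    let sessions: Value = client
        .get(format!("{base}/api/sessions"))
        .send()
        .await?
        .json()
        .await?;
    println!("sessions: {sessions}");
    Ok(())
}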

@@ -0,0 +1,153 @@
use async_trait::async_trait;
use livekit::{AccessToken, Room, RoomOptions, DataPacketKind};
use log::info;
use tokio::sync::{mpsc, Mutex};
use std::collections::HashMap;
use std::sync::Arc;
use chrono::Utc;
use crate::shared::{UserMessage, BotResponse};
#[async_trait]
pub trait ChannelAdapter: Send + Sync {
async fn send_message(&self, response: BotResponse) -> Result<(), Box<dyn std::error::Error>>;
}
pub struct WebChannelAdapter {
connections: Arc<Mutex<HashMap<String, mpsc::Sender<BotResponse>>>>,
}
impl WebChannelAdapter {
pub fn new() -> Self {
Self {
connections: Arc::new(Mutex::new(HashMap::new())),
}
}
pub async fn add_connection(&self, session_id: String, tx: mpsc::Sender<BotResponse>) {
self.connections.lock().await.insert(session_id, tx);
}
pub async fn remove_connection(&self, session_id: &str) {
self.connections.lock().await.remove(session_id);
}
}
#[async_trait]
impl ChannelAdapter for WebChannelAdapter {
async fn send_message(&self, response: BotResponse) -> Result<(), Box<dyn std::error::Error>> {
let connections = self.connections.lock().await;
if let Some(tx) = connections.get(&response.session_id) {
tx.send(response).await?;
}
Ok(())
}
}
pub struct VoiceAdapter {
livekit_url: String,
api_key: String,
api_secret: String,
rooms: Arc<Mutex<HashMap<String, Room>>>,
connections: Arc<Mutex<HashMap<String, mpsc::Sender<BotResponse>>>>,
}
impl VoiceAdapter {
pub fn new(livekit_url: String, api_key: String, api_secret: String) -> Self {
Self {
livekit_url,
api_key,
api_secret,
rooms: Arc::new(Mutex::new(HashMap::new())),
connections: Arc::new(Mutex::new(HashMap::new())),
}
}
pub async fn start_voice_session(&self, session_id: &str, user_id: &str) -> Result<String, Box<dyn std::error::Error>> {
let token = AccessToken::with_api_key(&self.api_key, &self.api_secret)
.with_identity(user_id)
.with_name(user_id)
.with_room_name(session_id)
.with_room_join(true)
.to_jwt()?;
let room_options = RoomOptions {
auto_subscribe: true,
..Default::default()
};
let (room, mut events) = Room::connect(&self.livekit_url, &token, room_options).await?;
self.rooms.lock().await.insert(session_id.to_string(), room.clone());
let rooms_clone = self.rooms.clone();
let connections_clone = self.connections.clone();
let session_id_clone = session_id.to_string();
tokio::spawn(async move {
while let Some(event) = events.recv().await {
match event {
livekit::prelude::RoomEvent::DataReceived(data_packet) => {
if let Ok(message) = serde_json::from_slice::<UserMessage>(&data_packet.data) {
info!("Received voice message: {}", message.content);
if let Some(tx) = connections_clone.lock().await.get(&message.session_id) {
let _ = tx.send(BotResponse {
bot_id: message.bot_id,
user_id: message.user_id,
session_id: message.session_id,
channel: "voice".to_string(),
content: format!("🎤 Voice: {}", message.content),
message_type: "voice".to_string(),
stream_token: None,
is_complete: true,
}).await;
}
}
}
livekit::prelude::RoomEvent::TrackSubscribed(track, publication, participant) => {
info!("Voice track subscribed from {}", participant.identity());
}
_ => {}
}
}
rooms_clone.lock().await.remove(&session_id_clone);
});
Ok(token)
}
pub async fn stop_voice_session(&self, session_id: &str) -> Result<(), Box<dyn std::error::Error>> {
if let Some(room) = self.rooms.lock().await.remove(session_id) {
room.disconnect();
}
Ok(())
}
pub async fn add_connection(&self, session_id: String, tx: mpsc::Sender<BotResponse>) {
self.connections.lock().await.insert(session_id, tx);
}
pub async fn send_voice_response(&self, session_id: &str, text: &str) -> Result<(), Box<dyn std::error::Error>> {
if let Some(room) = self.rooms.lock().await.get(session_id) {
let voice_response = serde_json::json!({
"type": "voice_response",
"text": text,
"timestamp": Utc::now()
});
room.local_participant().publish_data(
serde_json::to_vec(&voice_response)?,
DataPacketKind::Reliable,
&[],
)?;
}
Ok(())
}
}
#[async_trait]
impl ChannelAdapter for VoiceAdapter {
async fn send_message(&self, response: BotResponse) -> Result<(), Box<dyn std::error::Error>> {
info!("Sending voice response to: {}", response.user_id);
self.send_voice_response(&response.session_id, &response.content).await
}
}
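To illustrate the ChannelAdapter seam, the smallest possible additional adapter might look like this (purely a sketch, defined in its own module; no such adapter exists in this commit):

use async_trait::async_trait;
use log::info;

// Hypothetical adapter that only logs outgoing responses; handy for local testing.
pub struct ConsoleAdapter;

#[async_trait]
impl ChannelAdapter for ConsoleAdapter {
    async fn send_message(&self, response: BotResponse) -> Result<(), Box<dyn std::error::Error>> {
        info!("[console:{}] {}", response.session_id, response.content);
        Ok(())
    }
}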

@@ -0,0 +1,92 @@
use langchain_rust::{
chain::{Chain, SQLDatabaseChainBuilder, options::ChainCallOptions},
llm::openai::OpenAI,
tools::{postgres::PostgreSQLEngine, SQLDatabaseBuilder},
prompt::PromptTemplate,
};
pub struct ChartGenerator {
sql_chain: SQLDatabaseChainBuilder,
llm: OpenAI,
}
impl ChartGenerator {
pub async fn new(database_url: &str) -> Result<Self, Box<dyn std::error::Error>> {
let llm = OpenAI::default();
let engine = PostgreSQLEngine::new(database_url).await?;
let db = SQLDatabaseBuilder::new(engine).build().await?;
let sql_chain = SQLDatabaseChainBuilder::new()
.llm(llm.clone())
.top_k(4)
.database(db);
Ok(Self {
sql_chain,
llm,
})
}
pub async fn generate_chart(
&self,
question: &str,
chart_type: &str
) -> Result<ChartResponse, Box<dyn std::error::Error>> {
// Step 1: Generate SQL using LangChain
let sql_result = self.generate_sql(question).await?;
// Step 2: Execute SQL and get data
let data = self.execute_sql(&sql_result).await?;
// Step 3: Generate chart configuration using LLM
let chart_config = self.generate_chart_config(&data, chart_type).await?;
// Step 4: Generate and render chart
let chart_image = self.render_chart(&chart_config).await?;
Ok(ChartResponse {
sql_query: sql_result,
data,
chart_image,
chart_config,
})
}
async fn generate_sql(&self, question: &str) -> Result<String, Box<dyn std::error::Error>> {
let chain = self.sql_chain
.clone()
.build()
.expect("Failed to build SQL chain");
let input_variables = chain.prompt_builder().query(question).build();
let result = chain.invoke(input_variables).await?;
Ok(result.to_string())
}
async fn execute_sql(&self, query: &str) -> Result<Value, Box<dyn std::error::Error>> {
// Execute the generated SQL and return structured data
// Implementation depends on your database setup
Ok(Value::Null)
}
async fn generate_chart_config(&self, data: &Value, chart_type: &str) -> Result<Value, Box<dyn std::error::Error>> {
let prompt = format!(
"Given this data: {} and chart type: {}, generate a billboard.js configuration JSON. \
Focus on creating meaningful visualizations for this business data.",
data, chart_type
);
let message = HumanMessage::new(prompt);
let result = self.llm.invoke(&[message]).await?;
serde_json::from_str(&result.generation)
.map_err(|e| e.into())
}
async fn render_chart(&self, config: &Value) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
// Use headless browser to render chart and capture as image
// This would integrate with your browser automation setup
Ok(vec![])
}
}

@@ -0,0 +1,97 @@
use async_trait::async_trait;
use langchain_rust::{
embedding::openai::openai_embedder::OpenAiEmbedder,
vectorstore::qdrant::{Qdrant, StoreBuilder},
vectorstore::{VectorStore, VecStoreOptions},
schemas::Document,
};
use qdrant_client::qdrant::Qdrant as QdrantClient;
use sqlx::PgPool;
use uuid::Uuid;
#[async_trait]
pub trait ContextProvider: Send + Sync {
async fn get_context(&self, session_id: Uuid, user_id: Uuid, query: &str) -> Result<String, Box<dyn std::error::Error>>;
async fn store_embedding(&self, text: &str, embedding: Vec<f32>, metadata: Value) -> Result<(), Box<dyn std::error::Error>>;
async fn search_similar(&self, embedding: Vec<f32>, limit: u32) -> Result<Vec<SearchResult>, Box<dyn std::error::Error>>;
}
pub struct LangChainContextProvider {
pool: PgPool,
vector_store: Qdrant,
embedder: OpenAiEmbedder,
}
impl LangChainContextProvider {
pub async fn new(pool: PgPool, qdrant_url: &str) -> Result<Self, Box<dyn std::error::Error>> {
let embedder = OpenAiEmbedder::default();
let client = QdrantClient::from_url(qdrant_url).build()?;
let vector_store = StoreBuilder::new()
.embedder(embedder.clone())
.client(client)
.collection_name("conversations")
.build()
.await?;
Ok(Self {
pool,
vector_store,
embedder,
})
}
}
#[async_trait]
impl ContextProvider for LangChainContextProvider {
async fn get_context(&self, session_id: Uuid, user_id: Uuid, query: &str) -> Result<String, Box<dyn std::error::Error>> {
// Get conversation history
let history = sqlx::query(
"SELECT role, content_encrypted FROM message_history
WHERE session_id = $1 AND user_id = $2
ORDER BY message_index DESC LIMIT 5"
)
.bind(session_id)
.bind(user_id)
.fetch_all(&self.pool)
.await?;
let mut context = String::from("Conversation history:\n");
for row in history.iter().rev() {
let role: String = row.get("role");
let content: String = row.get("content_encrypted");
context.push_str(&format!("{}: {}\n", role, content));
}
// Search for similar documents using LangChain
let similar_docs = self.vector_store
.similarity_search(query, 3, &VecStoreOptions::default())
.await?;
if !similar_docs.is_empty() {
context.push_str("\nRelevant context:\n");
for doc in similar_docs {
context.push_str(&format!("- {}\n", doc.page_content));
}
}
context.push_str(&format!("\nCurrent message: {}", query));
Ok(context)
}
async fn store_embedding(&self, text: &str, embedding: Vec<f32>, metadata: Value) -> Result<(), Box<dyn std::error::Error>> {
let document = Document::new(text).with_metadata(metadata);
self.vector_store
.add_documents(&[document], &VecStoreOptions::default())
.await?;
Ok(())
}
async fn search_similar(&self, embedding: Vec<f32>, limit: u32) -> Result<Vec<SearchResult>, Box<dyn std::error::Error>> {
// LangChain handles this through the vector store interface
// This method would need adaptation to work with LangChain's search patterns
Ok(vec![])
}
}
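// NOTE (sketch, not part of the original commit): the langchain-rust vector
// store searches by text query rather than by a pre-computed embedding, so a
// text-based variant maps more directly onto `similarity_search`. `Document`
// carries a relevance score, which is reused as the similarity value here.
impl LangChainContextProvider {
    pub async fn search_similar_text(
        &self,
        query: &str,
        limit: usize,
    ) -> Result<Vec<SearchResult>, Box<dyn std::error::Error>> {
        let docs = self
            .vector_store
            .similarity_search(query, limit, &VecStoreOptions::default())
            .await?;
        Ok(docs
            .into_iter()
            .map(|doc| SearchResult {
                text: doc.page_content,
                similarity: doc.score as f32,
                metadata: serde_json::to_value(&doc.metadata).unwrap_or(Value::Null),
            })
            .collect())
    }
}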

121
src/services/llm/mod.rs Normal file
View file

@ -0,0 +1,121 @@
use async_trait::async_trait;
use futures::StreamExt;
use langchain_rust::{
language_models::llm::LLM,
llm::{claude::Claude, openai::OpenAI},
schemas::Message,
};
use serde_json::Value;
use std::sync::Arc;
use tokio::sync::mpsc;

// The tool manager is assumed to live in the sibling `tools` service module.
use crate::services::tools;
pub mod llm_generic;
pub mod llm_local;
pub mod llm_provider;
#[async_trait]
pub trait LLMProvider: Send + Sync {
async fn generate(
&self,
prompt: &str,
config: &Value,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>>;
async fn generate_stream(
&self,
prompt: &str,
config: &Value,
tx: mpsc::Sender<String>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
    // Tool-calling capability. The default implementation falls back to plain
    // generation so providers without native tool support still satisfy the
    // trait; providers that do support tools should override it.
    async fn generate_with_tools(
        &self,
        prompt: &str,
        config: &serde_json::Value,
        _available_tools: &[String],
        _tool_manager: Arc<tools::ToolManager>,
        _session_id: &str,
        _user_id: &str,
    ) -> Result<String, Box<dyn std::error::Error>> {
        self.generate(prompt, config).await.map_err(|e| e.into())
    }
}
pub struct OpenAIClient<C>
where
C: langchain_rust::llm::Config,
{
client: OpenAI<C>,
}
impl<C> OpenAIClient<C>
where
C: langchain_rust::llm::Config,
{
pub fn new(config: C) -> Self {
let client = OpenAI::new(config);
Self { client }
}
}
#[async_trait]
impl<C> LLMProvider for OpenAIClient<C>
where
C: langchain_rust::llm::Config + Send + Sync,
{
async fn generate(
&self,
prompt: &str,
_config: &Value,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
// Call the underlying OpenAI client with the raw prompt string.
let result = self
.client
.invoke(prompt)
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
Ok(result)
}
async fn generate_stream(
&self,
prompt: &str,
_config: &Value,
mut tx: mpsc::Sender<String>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Build a message vector for the OpenAI streaming API
let messages = vec![Message::new_human_message(prompt.to_string())];
let mut stream = self
.client
.stream(&messages)
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
while let Some(result) = stream.next().await {
match result {
Ok(chunk) => {
// The `content` field is accessed directly (no method).
let content = chunk.content;
if !content.is_empty() {
let _ = tx.send(content.to_string()).await;
}
}
Err(e) => {
eprintln!("Stream error: {}", e);
}
}
}
Ok(())
}
}
pub struct AnthropicClient {
client: Claude,
}
impl AnthropicClient {
pub fn new(api_key: String) -> Self {
let client = Claude::default().with_api_key(api_key);
Self { client }
}
}

View file

@ -1,13 +1,17 @@
+pub mod auth;
 pub mod automation;
+pub mod channels;
 pub mod config;
+pub mod context;
 pub mod email;
 pub mod file;
+pub mod keywords;
 pub mod llm;
 pub mod llm_generic;
 pub mod llm_local;
-pub mod llm_provider;
-pub mod script;
+pub mod orchestrator;
+pub mod session;
+pub mod shared;
 pub mod state;
-pub mod utils;
+pub mod tools;
 pub mod web_automation;
+pub mod whatsapp;

175
src/services/session/mod.rs Normal file
View file

@ -0,0 +1,175 @@
use crate::services::shared::shared::UserSession;
use redis::{aio::MultiplexedConnection, AsyncCommands};
use sqlx::{PgPool, Row};
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;

pub struct SessionManager {
    pub pool: PgPool,
    // Shared multiplexed connection; the Mutex lets cache reads/writes await it.
    pub redis: Option<Arc<Mutex<MultiplexedConnection>>>,
}
impl SessionManager {
    pub fn new(pool: PgPool, redis: Option<Arc<Mutex<MultiplexedConnection>>>) -> Self {
Self { pool, redis }
}
pub async fn get_user_session(
&self,
user_id: Uuid,
bot_id: Uuid,
) -> Result<Option<UserSession>, Box<dyn std::error::Error>> {
// Try Redis cache first
if let Some(redis) = &self.redis {
let cache_key = format!("session:{}:{}", user_id, bot_id);
            let mut conn = redis.lock().await;
            if let Ok(session_json) = conn.get::<_, String>(&cache_key).await {
if let Ok(session) = serde_json::from_str::<UserSession>(&session_json) {
return Ok(Some(session));
}
}
}
// Fallback to database
let session = sqlx::query_as(
"SELECT * FROM user_sessions WHERE user_id = $1 AND bot_id = $2 ORDER BY updated_at DESC LIMIT 1"
)
.bind(user_id)
.bind(bot_id)
.fetch_optional(&self.pool)
.await?;
// Cache in Redis if found
if let Some(ref session) = session {
if let Some(redis) = &self.redis {
let cache_key = format!("session:{}:{}", user_id, bot_id);
let session_json = serde_json::to_string(session)?;
                let mut conn = redis.lock().await;
let _: () = conn.set_ex(cache_key, session_json, 1800).await?;
}
}
Ok(session)
}
pub async fn create_session(
&self,
user_id: Uuid,
bot_id: Uuid,
title: &str,
) -> Result<UserSession, Box<dyn std::error::Error>> {
log::info!(
"Creating new session for user: {}, bot: {}",
user_id,
bot_id
);
let session = sqlx::query_as(
"INSERT INTO user_sessions (user_id, bot_id, title) VALUES ($1, $2, $3) RETURNING *",
)
.bind(user_id)
.bind(bot_id)
.bind(title)
.fetch_one(&self.pool)
.await?;
// Cache in Redis
if let Some(redis) = &self.redis {
let cache_key = format!("session:{}:{}", user_id, bot_id);
let session_json = serde_json::to_string(&session)?;
            let mut conn = redis.lock().await;
            let _: () = conn.set_ex(cache_key, session_json, 1800).await?;
}
Ok(session)
}
pub async fn save_message(
&self,
session_id: Uuid,
user_id: Uuid,
role: &str,
content: &str,
message_type: &str,
) -> Result<(), Box<dyn std::error::Error>> {
        // COUNT(*) comes back as BIGINT in Postgres; cast so it decodes as i32.
        let message_count: i32 =
            sqlx::query("SELECT COUNT(*)::INT AS count FROM message_history WHERE session_id = $1")
.bind(session_id)
.fetch_one(&self.pool)
.await?
.get("count");
sqlx::query(
"INSERT INTO message_history (session_id, user_id, role, content_encrypted, message_type, message_index)
VALUES ($1, $2, $3, $4, $5, $6)"
)
.bind(session_id)
.bind(user_id)
.bind(role)
.bind(content) // Note: Encryption removed for simplicity
.bind(message_type)
.bind(message_count + 1)
.execute(&self.pool)
.await?;
sqlx::query("UPDATE user_sessions SET updated_at = NOW() WHERE id = $1")
.bind(session_id)
.execute(&self.pool)
.await?;
// Invalidate session cache
if let Some(redis) = &self.redis {
let session: Option<(Uuid, Uuid)> =
sqlx::query_as("SELECT user_id, bot_id FROM user_sessions WHERE id = $1")
.bind(session_id)
.fetch_optional(&self.pool)
.await?;
if let Some((user_id, bot_id)) = session {
let cache_key = format!("session:{}:{}", user_id, bot_id);
                let mut conn = redis.lock().await;
                let _: () = conn.del(cache_key).await?;
}
}
Ok(())
}
pub async fn get_conversation_history(
&self,
session_id: Uuid,
user_id: Uuid,
) -> Result<Vec<(String, String)>, Box<dyn std::error::Error>> {
let messages = sqlx::query(
"SELECT role, content_encrypted FROM message_history
WHERE session_id = $1 AND user_id = $2
ORDER BY message_index ASC",
)
.bind(session_id)
.bind(user_id)
.fetch_all(&self.pool)
.await?;
let mut history = Vec::new();
for row in messages {
let role: String = row.get("role");
let content: String = row.get("content_encrypted"); // No decryption needed
history.push((role, content));
}
Ok(history)
}
pub async fn get_user_sessions(
&self,
user_id: Uuid,
) -> Result<Vec<UserSession>, Box<dyn std::error::Error>> {
// For lists, we'll just use database
let sessions = sqlx::query_as(
"SELECT * FROM user_sessions WHERE user_id = $1 ORDER BY updated_at DESC",
)
.bind(user_id)
.fetch_all(&self.pool)
.await?;
Ok(sessions)
}
}
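// NOTE (sketch, not part of the original commit): how the manager could be
// wired up at startup. The Redis URL and the Postgres pool come from server
// configuration elsewhere; they are placeholders here.
pub async fn build_session_manager(
    pool: PgPool,
    redis_url: Option<&str>,
) -> Result<SessionManager, Box<dyn std::error::Error>> {
    let redis = match redis_url {
        Some(url) => {
            let client = redis::Client::open(url)?;
            let conn = client.get_multiplexed_async_connection().await?;
            Some(Arc::new(Mutex::new(conn)))
        }
        None => None,
    };
    Ok(SessionManager::new(pool, redis))
}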

View file

@ -0,0 +1,3 @@
pub mod shared;
pub mod state;
pub mod utils;

View file

@ -0,0 +1,58 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct UserSession {
pub id: Uuid,
pub user_id: Uuid,
pub bot_id: Uuid,
pub title: String,
pub context_data: serde_json::Value,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmbeddingRequest {
pub text: String,
pub model: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmbeddingResponse {
pub embedding: Vec<f32>,
pub model: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SearchResult {
pub text: String,
pub similarity: f32,
pub metadata: serde_json::Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserMessage {
pub bot_id: String,
pub user_id: String,
pub session_id: String,
pub channel: String,
pub content: String,
pub message_type: String,
pub media_url: Option<String>,
pub timestamp: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BotResponse {
pub bot_id: String,
pub user_id: String,
pub session_id: String,
pub channel: String,
pub content: String,
pub message_type: String,
pub stream_token: Option<String>,
pub is_complete: bool,
}

View file

@ -11,6 +11,11 @@ pub struct AppState {
     pub db: Option<sqlx::PgPool>,
     pub db_custom: Option<sqlx::PgPool>,
     pub browser_pool: Arc<BrowserPool>,
+    pub orchestrator: Arc<BotOrchestrator>,
+    pub web_adapter: Arc<WebChannelAdapter>,
+    pub voice_adapter: Arc<VoiceAdapter>,
+    pub whatsapp_adapter: Arc<WhatsAppAdapter>,
+    pub tool_api: Arc<ToolApi>,
 }
 pub struct _BotState {
     pub language: String,

536
src/services/tools/mod.rs Normal file
View file

@ -0,0 +1,536 @@
// services/tools/mod.rs
use async_trait::async_trait;
use redis::AsyncCommands;
use rhai::{Engine, Scope};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;

use crate::services::channels::ChannelAdapter;
use crate::services::shared::shared::BotResponse;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolResult {
pub success: bool,
pub output: String,
pub requires_input: bool,
pub session_id: String,
}
#[derive(Clone)]
pub struct Tool {
pub name: String,
pub description: String,
pub parameters: HashMap<String, String>,
pub script: String,
}
#[async_trait]
pub trait ToolExecutor: Send + Sync {
async fn execute(
&self,
tool_name: &str,
session_id: &str,
user_id: &str,
) -> Result<ToolResult, Box<dyn std::error::Error>>;
async fn provide_input(
&self,
session_id: &str,
input: &str,
) -> Result<(), Box<dyn std::error::Error>>;
async fn get_output(&self, session_id: &str)
-> Result<Vec<String>, Box<dyn std::error::Error>>;
async fn is_waiting_for_input(
&self,
session_id: &str,
) -> Result<bool, Box<dyn std::error::Error>>;
}
pub struct RedisToolExecutor {
redis_client: redis::Client,
web_adapter: Arc<dyn ChannelAdapter>,
voice_adapter: Arc<dyn ChannelAdapter>,
whatsapp_adapter: Arc<dyn ChannelAdapter>,
}
impl RedisToolExecutor {
pub fn new(
redis_url: &str,
web_adapter: Arc<dyn ChannelAdapter>,
voice_adapter: Arc<dyn ChannelAdapter>,
whatsapp_adapter: Arc<dyn ChannelAdapter>,
) -> Result<Self, Box<dyn std::error::Error>> {
let client = redis::Client::open(redis_url)?;
Ok(Self {
redis_client: client,
web_adapter,
voice_adapter,
whatsapp_adapter,
})
}
async fn send_tool_message(
&self,
session_id: &str,
user_id: &str,
channel: &str,
message: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let response = BotResponse {
bot_id: "tool_bot".to_string(),
user_id: user_id.to_string(),
session_id: session_id.to_string(),
channel: channel.to_string(),
content: message.to_string(),
message_type: "tool".to_string(),
stream_token: None,
is_complete: true,
};
match channel {
"web" => self.web_adapter.send_message(response).await,
"voice" => self.voice_adapter.send_message(response).await,
"whatsapp" => self.whatsapp_adapter.send_message(response).await,
_ => Ok(()),
}
}
fn create_rhai_engine(&self, session_id: String, user_id: String, channel: String) -> Engine {
let mut engine = Engine::new();
// Clone for TALK function
let tool_executor = Arc::new((
self.redis_client.clone(),
self.web_adapter.clone(),
self.voice_adapter.clone(),
self.whatsapp_adapter.clone(),
));
let session_id_clone = session_id.clone();
let user_id_clone = user_id.clone();
let channel_clone = channel.clone();
engine.register_fn("talk", move |message: String| {
let tool_executor = Arc::clone(&tool_executor);
let session_id = session_id_clone.clone();
let user_id = user_id_clone.clone();
let channel = channel_clone.clone();
tokio::spawn(async move {
let (redis_client, web_adapter, voice_adapter, whatsapp_adapter) = &*tool_executor;
// Send message through appropriate channel
let response = BotResponse {
bot_id: "tool_bot".to_string(),
user_id: user_id.clone(),
session_id: session_id.clone(),
channel: channel.clone(),
content: message.clone(),
message_type: "tool".to_string(),
stream_token: None,
is_complete: true,
};
let result = match channel.as_str() {
"web" => web_adapter.send_message(response).await,
"voice" => voice_adapter.send_message(response).await,
"whatsapp" => whatsapp_adapter.send_message(response).await,
_ => Ok(()),
};
if let Err(e) = result {
log::error!("Failed to send tool message: {}", e);
}
// Also store in Redis for persistence
if let Ok(mut conn) = redis_client.get_async_connection().await {
let output_key = format!("tool:{}:output", session_id);
                    let _: redis::RedisResult<()> = conn.lpush(&output_key, &message).await;
}
});
});
// Clone for HEAR function
let hear_executor = self.redis_client.clone();
let session_id_clone = session_id.clone();
engine.register_fn("hear", move || -> String {
let hear_executor = hear_executor.clone();
let session_id = session_id_clone.clone();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
match hear_executor.get_async_connection().await {
Ok(mut conn) => {
let input_key = format!("tool:{}:input", session_id);
let waiting_key = format!("tool:{}:waiting", session_id);
// Set waiting flag
let _ = conn.set_ex(&waiting_key, "true", 300).await;
// Wait for input
let result: Option<(String, String)> =
conn.brpop(&input_key, 30).await.ok().flatten();
// Clear waiting flag
let _ = conn.del(&waiting_key).await;
result
.map(|(_, input)| input)
.unwrap_or_else(|| "timeout".to_string())
}
Err(e) => {
log::error!("HEAR Redis error: {}", e);
"error".to_string()
}
}
})
});
engine
}
async fn cleanup_session(&self, session_id: &str) -> Result<(), Box<dyn std::error::Error>> {
let mut conn = self.redis_client.get_async_connection().await?;
let keys = vec![
format!("tool:{}:output", session_id),
format!("tool:{}:input", session_id),
format!("tool:{}:waiting", session_id),
format!("tool:{}:active", session_id),
];
for key in keys {
let _: () = conn.del(&key).await?;
}
Ok(())
}
}
#[async_trait]
impl ToolExecutor for RedisToolExecutor {
async fn execute(
&self,
tool_name: &str,
session_id: &str,
user_id: &str,
) -> Result<ToolResult, Box<dyn std::error::Error>> {
let tool = get_tool(tool_name).ok_or_else(|| format!("Tool not found: {}", tool_name))?;
// Store session info in Redis
let mut conn = self.redis_client.get_async_connection().await?;
let session_key = format!("tool:{}:session", session_id);
let session_data = serde_json::json!({
"user_id": user_id,
"tool_name": tool_name,
"started_at": chrono::Utc::now().to_rfc3339(),
});
        let _: () = conn
            .set_ex(&session_key, session_data.to_string(), 3600)
            .await?;
        // Mark tool as active
        let active_key = format!("tool:{}:active", session_id);
        let _: () = conn.set_ex(&active_key, "true", 3600).await?;
// Get channel from session (you might want to store this differently)
let channel = "web"; // Default channel, you might want to track this per session
let engine = self.create_rhai_engine(
session_id.to_string(),
user_id.to_string(),
channel.to_string(),
);
// Execute tool in background
let redis_clone = self.redis_client.clone();
let web_adapter_clone = self.web_adapter.clone();
let voice_adapter_clone = self.voice_adapter.clone();
let whatsapp_adapter_clone = self.whatsapp_adapter.clone();
        let session_id_clone = session_id.to_string();
        let user_id_clone = user_id.to_string();
        // Own the tool name so it can move into the spawned task below.
        let tool_name = tool_name.to_string();
        let tool_script = tool.script.clone();
tokio::spawn(async move {
let mut engine = Engine::new();
let mut scope = Scope::new();
// Register TALK function for background execution
let redis_client = redis_clone.clone();
let web_adapter = web_adapter_clone.clone();
let voice_adapter = voice_adapter_clone.clone();
let whatsapp_adapter = whatsapp_adapter_clone.clone();
let session_id = session_id_clone.clone();
let user_id = user_id_clone.clone();
engine.register_fn("talk", move |message: String| {
let redis_client = redis_client.clone();
let web_adapter = web_adapter.clone();
let voice_adapter = voice_adapter.clone();
let whatsapp_adapter = whatsapp_adapter.clone();
let session_id = session_id.clone();
let user_id = user_id.clone();
tokio::spawn(async move {
// Determine channel from session data
let channel = "web"; // In real implementation, get from session storage
let response = BotResponse {
bot_id: "tool_bot".to_string(),
user_id: user_id.clone(),
session_id: session_id.clone(),
channel: channel.to_string(),
content: message.clone(),
message_type: "tool".to_string(),
stream_token: None,
is_complete: true,
};
// Send through appropriate channel
let send_result = match channel {
"web" => web_adapter.send_message(response).await,
"voice" => voice_adapter.send_message(response).await,
"whatsapp" => whatsapp_adapter.send_message(response).await,
_ => Ok(()),
};
if let Err(e) = send_result {
log::error!("Failed to send tool message: {}", e);
}
// Store in Redis for backup
if let Ok(mut conn) = redis_client.get_async_connection().await {
let output_key = format!("tool:{}:output", session_id);
                        let _: redis::RedisResult<()> = conn.lpush(&output_key, &message).await;
}
});
});
// Register HEAR function
            let hear_redis = redis_clone.clone();
            let session_id_hear = session_id_clone.clone();
            engine.register_fn("hear", move || -> String {
                let hear_redis = hear_redis.clone();
                let session_id = session_id_hear.clone();
                // Same bridging pattern as above: never start a nested runtime
                // from inside a tokio task.
                tokio::task::block_in_place(move || {
                    tokio::runtime::Handle::current().block_on(async move {
                        match hear_redis.get_async_connection().await {
                            Ok(mut conn) => {
                                let input_key = format!("tool:{}:input", session_id);
                                let waiting_key = format!("tool:{}:waiting", session_id);
                                let _: redis::RedisResult<()> =
                                    conn.set_ex(&waiting_key, "true", 300).await;
                                let result: Option<(String, String)> =
                                    conn.brpop(&input_key, 30.0).await.ok().flatten();
                                let _: redis::RedisResult<()> = conn.del(&waiting_key).await;
                                result
                                    .map(|(_, input)| input)
                                    .unwrap_or_else(|| "timeout".to_string())
                            }
                            Err(_) => "error".to_string(),
                        }
                    })
                })
            });
// Execute the tool
match engine.eval_with_scope::<()>(&mut scope, &tool_script) {
Ok(_) => {
log::info!(
"Tool {} completed successfully for session {}",
tool_name,
session_id
);
// Send completion message
let completion_msg =
"🛠️ Tool execution completed. How can I help you with anything else?";
let response = BotResponse {
bot_id: "tool_bot".to_string(),
user_id: user_id_clone,
session_id: session_id_clone.clone(),
channel: "web".to_string(),
content: completion_msg.to_string(),
message_type: "tool_complete".to_string(),
stream_token: None,
is_complete: true,
};
let _ = web_adapter_clone.send_message(response).await;
}
Err(e) => {
log::error!("Tool execution failed: {}", e);
let error_msg = format!("❌ Tool error: {}", e);
let response = BotResponse {
bot_id: "tool_bot".to_string(),
user_id: user_id_clone,
session_id: session_id_clone.clone(),
channel: "web".to_string(),
content: error_msg,
message_type: "tool_error".to_string(),
stream_token: None,
is_complete: true,
};
let _ = web_adapter_clone.send_message(response).await;
}
}
// Cleanup
if let Ok(mut conn) = redis_clone.get_async_connection().await {
let active_key = format!("tool:{}:active", session_id_clone);
                let _: redis::RedisResult<()> = conn.del(&active_key).await;
}
});
Ok(ToolResult {
success: true,
output: format!(
"🛠️ Starting {} tool. Please follow the tool's instructions.",
tool_name
),
requires_input: true,
session_id: session_id.to_string(),
})
}
async fn provide_input(
&self,
session_id: &str,
input: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let mut conn = self.redis_client.get_async_connection().await?;
let input_key = format!("tool:{}:input", session_id);
        let _: () = conn.lpush(&input_key, input).await?;
Ok(())
}
async fn get_output(
&self,
session_id: &str,
) -> Result<Vec<String>, Box<dyn std::error::Error>> {
let mut conn = self.redis_client.get_async_connection().await?;
let output_key = format!("tool:{}:output", session_id);
let messages: Vec<String> = conn.lrange(&output_key, 0, -1).await?;
// Clear after reading
let _: () = conn.del(&output_key).await?;
Ok(messages)
}
async fn is_waiting_for_input(
&self,
session_id: &str,
) -> Result<bool, Box<dyn std::error::Error>> {
let mut conn = self.redis_client.get_async_connection().await?;
let waiting_key = format!("tool:{}:waiting", session_id);
let exists: bool = conn.exists(&waiting_key).await?;
Ok(exists)
}
}
// Tool definitions
fn get_tool(name: &str) -> Option<Tool> {
match name {
"calculator" => Some(Tool {
name: "calculator".to_string(),
description: "Perform mathematical calculations".to_string(),
parameters: HashMap::from([
("operation".to_string(), "add|subtract|multiply|divide".to_string()),
("a".to_string(), "number".to_string()),
("b".to_string(), "number".to_string()),
]),
script: r#"
// Calculator tool using TALK/HEAR pattern
let TALK = |message| {
talk(message);
};
let HEAR = || {
hear()
};
TALK("🔢 Calculator started!");
TALK("Please enter the first number:");
let a = HEAR();
TALK("Please enter the second number:");
let b = HEAR();
TALK("Choose operation: add, subtract, multiply, or divide:");
let op = HEAR();
                let num_a = a.parse_float();
                let num_b = b.parse_float();
if op == "add" {
let result = num_a + num_b;
TALK("✅ Result: " + a + " + " + b + " = " + result);
} else if op == "subtract" {
let result = num_a - num_b;
TALK("✅ Result: " + a + " - " + b + " = " + result);
} else if op == "multiply" {
let result = num_a * num_b;
TALK("✅ Result: " + a + " × " + b + " = " + result);
} else if op == "divide" {
if num_b != 0.0 {
let result = num_a / num_b;
TALK("✅ Result: " + a + " ÷ " + b + " = " + result);
} else {
TALK("❌ Error: Cannot divide by zero!");
}
} else {
TALK("❌ Error: Invalid operation. Please use: add, subtract, multiply, or divide");
}
TALK("Calculator session completed. Thank you!");
"#.to_string(),
}),
_ => None,
}
}
pub struct ToolManager {
executor: Arc<dyn ToolExecutor>,
}
impl ToolManager {
pub fn new(executor: Arc<dyn ToolExecutor>) -> Self {
Self { executor }
}
pub async fn execute_tool(
&self,
tool_name: &str,
session_id: &str,
user_id: &str,
) -> Result<ToolResult, Box<dyn std::error::Error>> {
self.executor.execute(tool_name, session_id, user_id).await
}
pub async fn provide_input(
&self,
session_id: &str,
input: &str,
) -> Result<(), Box<dyn std::error::Error>> {
self.executor.provide_input(session_id, input).await
}
pub async fn get_tool_output(
&self,
session_id: &str,
) -> Result<Vec<String>, Box<dyn std::error::Error>> {
self.executor.get_output(session_id).await
}
pub async fn is_tool_waiting(
&self,
session_id: &str,
) -> Result<bool, Box<dyn std::error::Error>> {
self.executor.is_waiting_for_input(session_id).await
}
pub fn list_tools(&self) -> Vec<String> {
vec!["calculator".to_string()]
}
}
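// NOTE (sketch, not part of the original commit): one way a caller could drive
// the calculator tool's TALK/HEAR loop from outside. The fixed answers and the
// polling interval are placeholders; in the server the orchestrator forwards
// real user input instead.
pub async fn run_calculator_example(
    manager: &ToolManager,
    session_id: &str,
    user_id: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    // Kick off the tool; its script immediately starts TALKing and then HEARs.
    let started = manager.execute_tool("calculator", session_id, user_id).await?;
    log::info!("{}", started.output);

    // Answer the three HEAR prompts in order: first number, second number, op.
    for answer in ["12", "30", "add"] {
        // Wait until the script is blocked on HEAR before pushing input.
        while !manager.is_tool_waiting(session_id).await? {
            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
        }
        manager.provide_input(session_id, answer).await?;
    }

    // Drain anything the tool TALKed that was only buffered in Redis.
    for line in manager.get_tool_output(session_id).await? {
        log::info!("tool output: {}", line);
    }
    Ok(())
}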

View file

@ -0,0 +1,176 @@
use async_trait::async_trait;
use log::info;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::services::shared::shared::{BotResponse, UserMessage};
#[derive(Debug, Deserialize)]
pub struct WhatsAppMessage {
pub entry: Vec<WhatsAppEntry>,
}
#[derive(Debug, Deserialize)]
pub struct WhatsAppEntry {
pub changes: Vec<WhatsAppChange>,
}
#[derive(Debug, Deserialize)]
pub struct WhatsAppChange {
pub value: WhatsAppValue,
}
#[derive(Debug, Deserialize)]
pub struct WhatsAppValue {
pub contacts: Option<Vec<WhatsAppContact>>,
pub messages: Option<Vec<WhatsAppMessageData>>,
}
#[derive(Debug, Deserialize)]
pub struct WhatsAppContact {
pub profile: WhatsAppProfile,
pub wa_id: String,
}
#[derive(Debug, Deserialize)]
pub struct WhatsAppProfile {
pub name: String,
}
#[derive(Debug, Deserialize)]
pub struct WhatsAppMessageData {
pub from: String,
pub id: String,
pub timestamp: String,
pub text: Option<WhatsAppText>,
pub r#type: String,
}
#[derive(Debug, Deserialize)]
pub struct WhatsAppText {
pub body: String,
}
#[derive(Serialize)]
pub struct WhatsAppResponse {
pub messaging_product: String,
pub to: String,
pub text: WhatsAppResponseText,
}
#[derive(Serialize)]
pub struct WhatsAppResponseText {
pub body: String,
}
pub struct WhatsAppAdapter {
client: Client,
access_token: String,
phone_number_id: String,
webhook_verify_token: String,
sessions: Arc<Mutex<HashMap<String, String>>>, // phone -> session_id
}
impl WhatsAppAdapter {
pub fn new(access_token: String, phone_number_id: String, webhook_verify_token: String) -> Self {
Self {
client: Client::new(),
access_token,
phone_number_id,
webhook_verify_token,
sessions: Arc::new(Mutex::new(HashMap::new())),
}
}
    pub async fn get_session_id(&self, phone: &str) -> String {
        let mut sessions = self.sessions.lock().await;
        if let Some(session_id) = sessions.get(phone) {
            return session_id.clone();
        }
        let session_id = uuid::Uuid::new_v4().to_string();
        sessions.insert(phone.to_string(), session_id.clone());
        session_id
    }
pub async fn send_whatsapp_message(&self, to: &str, body: &str) -> Result<(), Box<dyn std::error::Error>> {
let url = format!(
"https://graph.facebook.com/v17.0/{}/messages",
self.phone_number_id
);
let response_data = WhatsAppResponse {
messaging_product: "whatsapp".to_string(),
to: to.to_string(),
text: WhatsAppResponseText {
body: body.to_string(),
},
};
let response = self.client
.post(&url)
.header("Authorization", format!("Bearer {}", self.access_token))
.json(&response_data)
.send()
.await?;
if response.status().is_success() {
info!("WhatsApp message sent to {}", to);
} else {
let error_text = response.text().await?;
log::error!("Failed to send WhatsApp message: {}", error_text);
}
Ok(())
}
    pub async fn process_incoming_message(&self, message: WhatsAppMessage) -> Result<Vec<UserMessage>, Box<dyn std::error::Error>> {
let mut user_messages = Vec::new();
for entry in message.entry {
for change in entry.changes {
if let Some(messages) = change.value.messages {
for msg in messages {
if let Some(text) = msg.text {
let session_id = self.get_session_id(&msg.from).await;
                            let user_message = UserMessage {
bot_id: "default_bot".to_string(),
user_id: msg.from.clone(),
session_id: session_id.clone(),
channel: "whatsapp".to_string(),
content: text.body,
message_type: msg.r#type,
media_url: None,
timestamp: chrono::Utc::now(),
};
user_messages.push(user_message);
}
}
}
}
}
Ok(user_messages)
}
pub fn verify_webhook(&self, mode: &str, token: &str, challenge: &str) -> Result<String, Box<dyn std::error::Error>> {
if mode == "subscribe" && token == self.webhook_verify_token {
Ok(challenge.to_string())
} else {
Err("Invalid verification".into())
}
}
}
#[async_trait]
impl super::channels::ChannelAdapter for WhatsAppAdapter {
async fn send_message(&self, response: BotResponse) -> Result<(), Box<dyn std::error::Error>> {
info!("Sending WhatsApp response to: {}", response.user_id);
self.send_whatsapp_message(&response.user_id, &response.content).await
}
}
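// NOTE (sketch, not part of the original commit): one way to expose this
// adapter through actix-web webhook routes. The query-parameter names follow
// Meta's webhook convention ("hub.mode", "hub.verify_token", "hub.challenge");
// the route wiring and app-data layout are assumptions.
use actix_web::{web, HttpResponse};

pub async fn whatsapp_verify(
    adapter: web::Data<WhatsAppAdapter>,
    query: web::Query<HashMap<String, String>>,
) -> HttpResponse {
    let mode = query.get("hub.mode").cloned().unwrap_or_default();
    let token = query.get("hub.verify_token").cloned().unwrap_or_default();
    let challenge = query.get("hub.challenge").cloned().unwrap_or_default();
    match adapter.verify_webhook(&mode, &token, &challenge) {
        Ok(body) => HttpResponse::Ok().body(body),
        Err(_) => HttpResponse::Forbidden().finish(),
    }
}

pub async fn whatsapp_webhook(
    adapter: web::Data<WhatsAppAdapter>,
    payload: web::Json<WhatsAppMessage>,
) -> HttpResponse {
    match adapter.process_incoming_message(payload.into_inner()).await {
        Ok(messages) => {
            // In the server these would be handed to the orchestrator; here
            // they are only logged.
            for msg in &messages {
                info!("incoming WhatsApp message from {}", msg.user_id);
            }
            HttpResponse::Ok().finish()
        }
        Err(e) => HttpResponse::InternalServerError().body(e.to_string()),
    }
}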

484
static/index.html Normal file
View file

@ -0,0 +1,484 @@
<!doctype html>
<html>
<head>
<title>General Bots - ChatGPT Clone</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family:
-apple-system, BlinkMacSystemFont, "Segoe UI", Roboto,
sans-serif;
background: #343541;
color: white;
height: 100vh;
display: flex;
}
.sidebar {
width: 260px;
background: #202123;
padding: 10px;
display: flex;
flex-direction: column;
}
.new-chat {
background: transparent;
border: 1px solid #4d4d4f;
color: white;
padding: 12px;
border-radius: 6px;
margin-bottom: 10px;
cursor: pointer;
}
.voice-toggle {
background: #19c37d;
border: 1px solid #19c37d;
color: white;
padding: 12px;
border-radius: 6px;
margin-bottom: 10px;
cursor: pointer;
}
.voice-toggle.recording {
background: #ef4444;
border: 1px solid #ef4444;
}
.history {
flex: 1;
overflow-y: auto;
}
.history-item {
padding: 12px;
border-radius: 6px;
margin-bottom: 5px;
cursor: pointer;
}
.history-item:hover {
background: #2a2b32;
}
.main {
flex: 1;
display: flex;
flex-direction: column;
}
.messages {
flex: 1;
overflow-y: auto;
padding: 20px;
}
.message {
max-width: 800px;
margin: 0 auto 20px;
line-height: 1.5;
}
.user-message {
color: #d1d5db;
}
.assistant-message {
color: #ececf1;
}
.voice-message {
color: #19c37d;
font-style: italic;
}
.input-area {
max-width: 800px;
margin: 0 auto 20px;
position: relative;
}
.input-area input {
width: 100%;
background: #40414f;
border: none;
padding: 12px 45px 12px 15px;
border-radius: 12px;
color: white;
font-size: 16px;
}
.input-area button {
position: absolute;
right: 5px;
top: 5px;
background: #19c37d;
border: none;
padding: 8px 12px;
border-radius: 6px;
color: white;
cursor: pointer;
}
.voice-status {
text-align: center;
margin: 10px 0;
color: #19c37d;
}
.pulse {
animation: pulse 2s infinite;
}
@keyframes pulse {
0% {
opacity: 1;
}
50% {
opacity: 0.5;
}
100% {
opacity: 1;
}
}
</style>
</head>
<body>
<div class="sidebar">
<button class="new-chat" onclick="createNewSession()">
+ New chat
</button>
<button
class="voice-toggle"
id="voiceToggle"
onclick="toggleVoiceMode()"
>
🎤 Voice Mode
</button>
<div class="history" id="history"></div>
</div>
<div class="main">
<div class="voice-status" id="voiceStatus" style="display: none">
<div class="pulse">🎤 Listening... Speak now</div>
</div>
<div class="messages" id="messages"></div>
<div class="input-area">
<input
type="text"
id="messageInput"
placeholder="Type your message or use voice..."
onkeypress="handleKeyPress(event)"
/>
<button onclick="sendMessage()">Send</button>
</div>
</div>
<script src="https://unpkg.com/livekit-client@latest/dist/livekit-client.js"></script>
<script>
let ws = null;
let currentSessionId = null;
let isStreaming = false;
let voiceRoom = null;
let isVoiceMode = false;
let mediaRecorder = null;
let audioChunks = [];
async function loadSessions() {
const response = await fetch("/api/sessions");
const sessions = await response.json();
const history = document.getElementById("history");
history.innerHTML = "";
sessions.forEach((session) => {
const div = document.createElement("div");
div.className = "history-item";
div.textContent = session.title;
div.onclick = () => switchSession(session.id);
history.appendChild(div);
});
}
async function createNewSession() {
const response = await fetch("/api/sessions", {
method: "POST",
});
const session = await response.json();
currentSessionId = session.session_id;
connectWebSocket();
loadSessions();
document.getElementById("messages").innerHTML = "";
if (isVoiceMode) {
await startVoiceSession();
}
}
function switchSession(sessionId) {
currentSessionId = sessionId;
loadSessionHistory(sessionId);
connectWebSocket();
if (isVoiceMode) {
startVoiceSession();
}
}
async function loadSessionHistory(sessionId) {
const response = await fetch("/api/sessions/" + sessionId);
const history = await response.json();
const messages = document.getElementById("messages");
messages.innerHTML = "";
history.forEach(([role, content]) => {
const className =
role === "user"
? "user-message"
: role === "assistant"
? "assistant-message"
: "voice-message";
addMessage(
role === "user" ? "You" : "Assistant",
content,
className,
);
});
}
function connectWebSocket() {
if (ws) ws.close();
ws = new WebSocket("ws://" + window.location.host + "/ws");
ws.onmessage = function (event) {
const response = JSON.parse(event.data);
if (!response.is_complete) {
if (!isStreaming) {
isStreaming = true;
addMessage(
"Assistant",
response.content,
"assistant-message",
true,
);
} else {
updateLastMessage(response.content);
}
                } else {
                    isStreaming = false;
                    // Release the streaming id so the next streamed reply
                    // starts in a fresh bubble instead of appending here.
                    const done = document.getElementById("streaming-message");
                    if (done) done.removeAttribute("id");
                }
};
ws.onopen = function () {
console.log("Connected to WebSocket");
};
}
function addMessage(
sender,
content,
className,
isStreaming = false,
) {
const messages = document.getElementById("messages");
const messageDiv = document.createElement("div");
messageDiv.className = `message ${className}`;
                messageDiv.id = isStreaming ? "streaming-message" : "";
messageDiv.innerHTML = `<strong>${sender}:</strong> ${content}`;
messages.appendChild(messageDiv);
messages.scrollTop = messages.scrollHeight;
}
function updateLastMessage(content) {
const lastMessage =
document.getElementById("streaming-message");
if (lastMessage) {
lastMessage.innerHTML = `<strong>Assistant:</strong> ${lastMessage.textContent.replace("Assistant:", "").trim() + content}`;
document.getElementById("messages").scrollTop =
document.getElementById("messages").scrollHeight;
}
}
function sendMessage() {
const input = document.getElementById("messageInput");
const message = input.value.trim();
if (message && ws && ws.readyState === WebSocket.OPEN) {
addMessage("You", message, "user-message");
ws.send(message);
input.value = "";
}
}
function handleKeyPress(event) {
if (event.key === "Enter") {
sendMessage();
}
}
async function toggleVoiceMode() {
isVoiceMode = !isVoiceMode;
const voiceToggle = document.getElementById("voiceToggle");
const voiceStatus = document.getElementById("voiceStatus");
if (isVoiceMode) {
voiceToggle.textContent = "🔴 Stop Voice";
voiceToggle.classList.add("recording");
voiceStatus.style.display = "block";
await startVoiceSession();
} else {
voiceToggle.textContent = "🎤 Voice Mode";
voiceToggle.classList.remove("recording");
voiceStatus.style.display = "none";
await stopVoiceSession();
}
}
async function startVoiceSession() {
if (!currentSessionId) return;
try {
const response = await fetch("/api/voice/start", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
session_id: currentSessionId,
user_id: "user_" + currentSessionId,
}),
});
const data = await response.json();
if (data.token) {
await connectToVoiceRoom(data.token);
startVoiceRecording();
}
} catch (error) {
console.error("Failed to start voice session:", error);
}
}
async function stopVoiceSession() {
if (!currentSessionId) return;
try {
await fetch("/api/voice/stop", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
session_id: currentSessionId,
}),
});
if (voiceRoom) {
voiceRoom.disconnect();
voiceRoom = null;
}
if (mediaRecorder && mediaRecorder.state === "recording") {
mediaRecorder.stop();
}
} catch (error) {
console.error("Failed to stop voice session:", error);
}
}
async function connectToVoiceRoom(token) {
try {
const room = new LiveKitClient.Room();
await room.connect("ws://localhost:7880", token);
voiceRoom = room;
room.on("dataReceived", (data) => {
const decoder = new TextDecoder();
const message = decoder.decode(data);
try {
const parsed = JSON.parse(message);
if (parsed.type === "voice_response") {
addMessage(
"Assistant",
parsed.text,
"assistant-message",
);
}
} catch (e) {
console.log("Voice data:", message);
}
});
const localTracks = await LiveKitClient.createLocalTracks({
audio: true,
video: false,
});
for (const track of localTracks) {
await room.localParticipant.publishTrack(track);
}
} catch (error) {
console.error("Failed to connect to voice room:", error);
}
}
function startVoiceRecording() {
if (!navigator.mediaDevices) {
console.log("Media devices not supported");
return;
}
navigator.mediaDevices
.getUserMedia({ audio: true })
.then((stream) => {
mediaRecorder = new MediaRecorder(stream);
audioChunks = [];
mediaRecorder.ondataavailable = (event) => {
audioChunks.push(event.data);
};
mediaRecorder.onstop = () => {
const audioBlob = new Blob(audioChunks, {
type: "audio/wav",
});
simulateVoiceTranscription();
};
mediaRecorder.start();
setTimeout(() => {
if (
mediaRecorder &&
mediaRecorder.state === "recording"
) {
mediaRecorder.stop();
setTimeout(() => {
if (isVoiceMode) {
startVoiceRecording();
}
}, 1000);
}
}, 5000);
})
.catch((error) => {
console.error("Error accessing microphone:", error);
});
}
function simulateVoiceTranscription() {
const phrases = [
"Hello, how can I help you today?",
"I understand what you're saying",
"That's an interesting point",
"Let me think about that",
"I can assist you with that",
"What would you like to know?",
"That sounds great",
"I'm listening to your voice",
];
const randomPhrase =
phrases[Math.floor(Math.random() * phrases.length)];
if (voiceRoom) {
const message = {
type: "voice_input",
content: randomPhrase,
timestamp: new Date().toISOString(),
};
voiceRoom.localParticipant.publishData(
new TextEncoder().encode(JSON.stringify(message)),
LiveKitClient.DataPacketKind.RELIABLE,
);
}
addMessage("You", `🎤 ${randomPhrase}`, "voice-message");
}
createNewSession();
</script>
</body>
</html>

View file