- Compiling again.

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-10-12 11:44:35 -03:00
parent 4c3bf0f029
commit fa9f163971
9 changed files with 878 additions and 121 deletions

1
Cargo.lock generated
View file

@ -1726,6 +1726,7 @@ dependencies = [
"downcast-rs",
"itoa",
"pq-sys",
"serde_json",
"uuid",
]

View file

@ -27,7 +27,7 @@ argon2 = "0.5"
base64 = "0.22"
bytes = "1.8"
chrono = { version = "0.4", features = ["serde"] }
diesel = { version = "2.1", features = ["postgres", "uuid", "chrono"] }
diesel = { version = "2.1", features = ["postgres", "uuid", "chrono", "serde_json"] }
dotenvy = "0.15"
downloader = "0.2"
env_logger = "0.11"

View file

@ -1,5 +1,6 @@
MORE RULES:
- Return only the modified files as a single `.sh` script using `cat`, so the - code can be restored directly.
- Return *only the modified* files as a single `.sh` script using `cat`, so the code can be restored directly.
- NEVER return a untouched file in output. Just files that need to be updated.
- You MUST return exactly this example format:
```sh
#!/bin/bash

View file

@ -9,8 +9,8 @@ echo "Consolidated LLM Context" > "$OUTPUT_FILE"
prompts=(
"../../prompts/dev/shared.md"
"../../Cargo.toml"
"../../prompts/dev/fix.md"
#"../../prompts/dev/generation.md"
#"../../prompts/dev/fix.md"
"../../prompts/dev/generation.md"
)
for file in "${prompts[@]}"; do
@ -23,22 +23,21 @@ dirs=(
#"automation"
#"basic"
"bot"
#"channels"
"channels"
#"config"
#"context"
"context"
#"email"
#"file"
#"llm"
"llm"
#"llm_legacy"
#"org"
"session"
#"shared"
"shared"
#"tests"
#"tools"
"tools"
#"web_automation"
#"whatsapp"
)
dirs=() # disabled.
for dir in "${dirs[@]}"; do
find "$PROJECT_ROOT/src/$dir" -name "*.rs" | while read file; do
cat "$file" >> "$OUTPUT_FILE"
@ -48,7 +47,12 @@ done
# Also append the specific files you mentioned
cat "$PROJECT_ROOT/src/main.rs" >> "$OUTPUT_FILE"
cat "$PROJECT_ROOT/src/basic/keywords/hear_talk.rs" >> "$OUTPUT_FILE"
echo "This BASIC file will run as soon as the conversation is created. " >> "$OUTPUT_FILE"
cat "$PROJECT_ROOT/templates/annoucements.gbai/annoucements.gbdialog/start.bas" >> "$OUTPUT_FILE"
echo "" >> "$OUTPUT_FILE"
cargo build --message-format=short 2>&1 | grep -E 'error' >> "$OUTPUT_FILE"
# cargo build --message-format=short 2>&1 | grep -E 'error' >> "$OUTPUT_FILE"

View file

@ -1,94 +1,714 @@
use crate::session::SessionManager;
use actix_web::{get, post, web, HttpResponse, Responder};
use actix_web::{web, HttpRequest, HttpResponse, Result};
use actix_ws::Message as WsMessage;
use chrono::Utc;
use log::info;
use serde_json;
use std::collections::HashMap;
use std::fs;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use uuid::Uuid;
pub struct BotOrchestrator {}
use crate::auth::AuthService;
use crate::channels::ChannelAdapter;
use crate::llm::LLMProvider;
use crate::session::SessionManager;
use crate::shared::models::{BotResponse, UserMessage, UserSession};
use crate::tools::ToolManager;
pub struct BotOrchestrator {
pub session_manager: Arc<Mutex<SessionManager>>,
tool_manager: Arc<ToolManager>,
llm_provider: Arc<dyn LLMProvider>,
auth_service: AuthService,
pub channels: HashMap<String, Arc<dyn ChannelAdapter>>,
response_channels: Arc<Mutex<HashMap<String, mpsc::Sender<BotResponse>>>>,
}
impl BotOrchestrator {
pub fn new<A, B, C, D>(_a: A, _b: B, _c: C, _d: D) -> Self {
info!("BotOrchestrator initialized");
BotOrchestrator {}
pub fn new(
session_manager: SessionManager,
tool_manager: ToolManager,
llm_provider: Arc<dyn LLMProvider>,
auth_service: AuthService,
) -> Self {
Self {
session_manager: Arc::new(Mutex::new(session_manager)),
tool_manager: Arc::new(tool_manager),
llm_provider,
auth_service,
channels: HashMap::new(),
response_channels: Arc::new(Mutex::new(HashMap::new())),
}
}
pub async fn handle_user_input(
&self,
session_id: Uuid,
user_input: &str,
) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
let mut session_manager = self.session_manager.lock().await;
session_manager.provide_input(session_id, user_input.to_string())?;
Ok(None)
}
pub async fn is_waiting_for_input(&self, session_id: Uuid) -> bool {
let session_manager = self.session_manager.lock().await;
session_manager.is_waiting_for_input(&session_id)
}
pub fn add_channel(&mut self, channel_type: &str, adapter: Arc<dyn ChannelAdapter>) {
self.channels.insert(channel_type.to_string(), adapter);
}
pub async fn register_response_channel(
&self,
session_id: String,
sender: mpsc::Sender<BotResponse>,
) {
self.response_channels
.lock()
.await
.insert(session_id, sender);
}
pub async fn set_user_answer_mode(
&self,
user_id: &str,
bot_id: &str,
mode: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut session_manager = self.session_manager.lock().await;
session_manager.update_answer_mode(user_id, bot_id, mode)?;
Ok(())
}
pub async fn process_message(
&self,
message: UserMessage,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!(
"Processing message from channel: {}, user: {}",
message.channel, message.user_id
);
let user_id = Uuid::parse_str(&message.user_id).unwrap_or_else(|_| Uuid::new_v4());
let bot_id = Uuid::parse_str(&message.bot_id)
.unwrap_or_else(|_| Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap());
let session = {
let mut session_manager = self.session_manager.lock().await;
match session_manager.get_user_session(user_id, bot_id)? {
Some(session) => session,
None => session_manager.create_session(user_id, bot_id, "New Conversation")?,
}
};
if self.is_waiting_for_input(session.id).await {
if let Some(variable_name) =
self.handle_user_input(session.id, &message.content).await?
{
info!(
"Stored user input in variable '{}' for session {}",
variable_name, session.id
);
if let Some(adapter) = self.channels.get(&message.channel) {
let ack_response = BotResponse {
bot_id: message.bot_id.clone(),
user_id: message.user_id.clone(),
session_id: message.session_id.clone(),
channel: message.channel.clone(),
content: format!("Input stored in '{}'", variable_name),
message_type: "system".to_string(),
stream_token: None,
is_complete: true,
};
adapter.send_message(ack_response).await?;
}
return Ok(());
}
}
if session.answer_mode == "tool" && session.current_tool.is_some() {
self.tool_manager.provide_user_response(
&message.user_id,
&message.bot_id,
message.content.clone(),
)?;
return Ok(());
}
{
let mut session_manager = self.session_manager.lock().await;
session_manager.save_message(
session.id,
user_id,
"user",
&message.content,
&message.message_type,
)?;
}
let response_content = self.direct_mode_handler(&message, &session).await?;
{
let mut session_manager = self.session_manager.lock().await;
session_manager.save_message(
session.id,
user_id,
"assistant",
&response_content,
"text",
)?;
}
let bot_response = BotResponse {
bot_id: message.bot_id,
user_id: message.user_id,
session_id: message.session_id.clone(),
channel: message.channel.clone(),
content: response_content,
message_type: "text".to_string(),
stream_token: None,
is_complete: true,
};
if let Some(adapter) = self.channels.get(&message.channel) {
adapter.send_message(bot_response).await?;
}
Ok(())
}
async fn direct_mode_handler(
&self,
message: &UserMessage,
session: &UserSession,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
// Retrieve conversation history while holding a mutable lock on the session manager.
let history = {
let mut session_manager = self.session_manager.lock().await;
session_manager.get_conversation_history(session.id, session.user_id)?
};
// Build the prompt from the conversation history.
let mut prompt = String::new();
for (role, content) in history {
prompt.push_str(&format!("{}: {}\n", role, content));
}
prompt.push_str(&format!("User: {}\nAssistant:", message.content));
// Generate the assistant's response using the LLM provider.
self.llm_provider
.generate(&prompt, &serde_json::Value::Null)
.await
}
pub async fn stream_response(
&self,
message: UserMessage,
response_tx: mpsc::Sender<BotResponse>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!("Streaming response for user: {}", message.user_id);
// Parse identifiers, falling back to safe defaults.
let user_id = Uuid::parse_str(&message.user_id).unwrap_or_else(|_| Uuid::new_v4());
let bot_id = Uuid::parse_str(&message.bot_id).unwrap_or_else(|_| Uuid::nil());
// Retrieve an existing session or create a new one.
let session = {
let mut sm = self.session_manager.lock().await;
match sm.get_user_session(user_id, bot_id)? {
Some(sess) => sess,
None => sm.create_session(user_id, bot_id, "New Conversation")?,
}
};
// If the session is awaiting tool input, forward the user's answer to the tool manager.
if session.answer_mode == "tool" && session.current_tool.is_some() {
self.tool_manager.provide_user_response(
&message.user_id,
&message.bot_id,
message.content.clone(),
)?;
return Ok(());
}
// Persist the incoming user message.
{
let mut sm = self.session_manager.lock().await;
sm.save_message(
session.id,
user_id,
"user",
&message.content,
&message.message_type,
)?;
}
// Build the prompt from the conversation history.
let prompt = {
let mut sm = self.session_manager.lock().await;
let history = sm.get_conversation_history(session.id, user_id)?;
let mut p = String::new();
for (role, content) in history {
p.push_str(&format!("{}: {}\n", role, content));
}
p.push_str(&format!("User: {}\nAssistant:", message.content));
p
};
// Set up a channel for the streaming LLM output.
let (stream_tx, mut stream_rx) = mpsc::channel::<String>(100);
let llm = self.llm_provider.clone();
// Spawn the LLM streaming task.
tokio::spawn(async move {
if let Err(e) = llm
.generate_stream(&prompt, &serde_json::Value::Null, stream_tx)
.await
{
log::error!("LLM streaming error: {}", e);
}
});
// Forward each chunk to the client as it arrives.
let mut full_response = String::new();
while let Some(chunk) = stream_rx.recv().await {
full_response.push_str(&chunk);
let partial = BotResponse {
bot_id: message.bot_id.clone(),
user_id: message.user_id.clone(),
session_id: message.session_id.clone(),
channel: message.channel.clone(),
content: chunk,
message_type: "text".to_string(),
stream_token: None,
is_complete: false,
};
if response_tx.send(partial).await.is_err() {
// Receiver has been dropped; stop streaming.
break;
}
}
// Save the complete assistant reply.
{
let mut sm = self.session_manager.lock().await;
sm.save_message(session.id, user_id, "assistant", &full_response, "text")?;
}
// Notify the client that the stream is finished.
let final_msg = BotResponse {
bot_id: message.bot_id,
user_id: message.user_id,
session_id: message.session_id,
channel: message.channel,
content: String::new(),
message_type: "text".to_string(),
stream_token: None,
is_complete: true,
};
response_tx.send(final_msg).await?;
Ok(())
}
pub async fn get_user_sessions(
&self,
user_id: Uuid,
) -> Result<Vec<UserSession>, Box<dyn std::error::Error + Send + Sync>> {
let mut session_manager = self.session_manager.lock().await;
session_manager.get_user_sessions(user_id)
}
pub async fn get_conversation_history(
&self,
session_id: Uuid,
user_id: Uuid,
) -> Result<Vec<(String, String)>, Box<dyn std::error::Error + Send + Sync>> {
let mut session_manager = self.session_manager.lock().await;
session_manager.get_conversation_history(session_id, user_id)
}
pub async fn process_message_with_tools(
&self,
message: UserMessage,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!(
"Processing message with tools from user: {}",
message.user_id
);
let user_id = Uuid::parse_str(&message.user_id).unwrap_or_else(|_| Uuid::new_v4());
let bot_id = Uuid::parse_str(&message.bot_id)
.unwrap_or_else(|_| Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap());
let session = {
let mut session_manager = self.session_manager.lock().await;
match session_manager.get_user_session(user_id, bot_id)? {
Some(session) => session,
None => session_manager.create_session(user_id, bot_id, "New Conversation")?,
}
};
{
let mut session_manager = self.session_manager.lock().await;
session_manager.save_message(
session.id,
user_id,
"user",
&message.content,
&message.message_type,
)?;
}
let is_tool_waiting = self
.tool_manager
.is_tool_waiting(&message.session_id)
.await
.unwrap_or(false);
if is_tool_waiting {
self.tool_manager
.provide_input(&message.session_id, &message.content)
.await?;
if let Ok(tool_output) = self.tool_manager.get_tool_output(&message.session_id).await {
for output in tool_output {
let bot_response = BotResponse {
bot_id: message.bot_id.clone(),
user_id: message.user_id.clone(),
session_id: message.session_id.clone(),
channel: message.channel.clone(),
content: output,
message_type: "text".to_string(),
stream_token: None,
is_complete: true,
};
if let Some(adapter) = self.channels.get(&message.channel) {
adapter.send_message(bot_response).await?;
}
}
}
return Ok(());
}
let response = if message.content.to_lowercase().contains("calculator")
|| message.content.to_lowercase().contains("calculate")
|| message.content.to_lowercase().contains("math")
{
match self
.tool_manager
.execute_tool("calculator", &message.session_id, &message.user_id)
.await
{
Ok(tool_result) => {
let mut session_manager = self.session_manager.lock().await;
session_manager.save_message(
session.id,
user_id,
"assistant",
&tool_result.output,
"tool_start",
)?;
tool_result.output
}
Err(e) => {
format!("I encountered an error starting the calculator: {}", e)
}
}
} else {
let available_tools = self.tool_manager.list_tools();
let tools_context = if !available_tools.is_empty() {
format!("\n\nAvailable tools: {}. If the user needs calculations, suggest using the calculator tool.", available_tools.join(", "))
} else {
String::new()
};
let full_prompt = format!("{}{}", message.content, tools_context);
self.llm_provider
.generate(&full_prompt, &serde_json::Value::Null)
.await?
};
{
let mut session_manager = self.session_manager.lock().await;
session_manager.save_message(session.id, user_id, "assistant", &response, "text")?;
}
let bot_response = BotResponse {
bot_id: message.bot_id,
user_id: message.user_id,
session_id: message.session_id.clone(),
channel: message.channel.clone(),
content: response,
message_type: "text".to_string(),
stream_token: None,
is_complete: true,
};
if let Some(adapter) = self.channels.get(&message.channel) {
adapter.send_message(bot_response).await?;
}
Ok(())
}
}
#[get("/")]
pub async fn index() -> impl Responder {
info!("index requested");
HttpResponse::Ok().body("General Bots")
#[actix_web::get("/ws")]
async fn websocket_handler(
req: HttpRequest,
stream: web::Payload,
data: web::Data<crate::shared::state::AppState>,
) -> Result<HttpResponse, actix_web::Error> {
let (res, mut session, mut msg_stream) = actix_ws::handle(&req, stream)?;
let session_id = Uuid::new_v4().to_string();
let (tx, mut rx) = mpsc::channel::<BotResponse>(100);
data.orchestrator
.register_response_channel(session_id.clone(), tx.clone())
.await;
data.web_adapter
.add_connection(session_id.clone(), tx.clone())
.await;
data.voice_adapter
.add_connection(session_id.clone(), tx.clone())
.await;
let orchestrator = data.orchestrator.clone();
let web_adapter = data.web_adapter.clone();
actix_web::rt::spawn(async move {
while let Some(msg) = rx.recv().await {
if let Ok(json) = serde_json::to_string(&msg) {
let _ = session.text(json).await;
}
}
});
actix_web::rt::spawn(async move {
while let Some(Ok(msg)) = msg_stream.recv().await {
match msg {
WsMessage::Text(text) => {
let user_message = UserMessage {
bot_id: "default_bot".to_string(),
user_id: "default_user".to_string(),
session_id: session_id.clone(),
channel: "web".to_string(),
content: text.to_string(),
message_type: "text".to_string(),
media_url: None,
timestamp: Utc::now(),
};
if let Err(e) = orchestrator.stream_response(user_message, tx.clone()).await {
info!("Error processing message: {}", e);
}
}
WsMessage::Close(_) => {
web_adapter.remove_connection(&session_id).await;
break;
}
_ => {}
}
}
});
Ok(res)
}
#[get("/static")]
pub async fn static_files() -> impl Responder {
info!("static_files requested");
HttpResponse::Ok().body("static")
}
#[actix_web::get("/api/whatsapp/webhook")]
async fn whatsapp_webhook_verify(
data: web::Data<crate::shared::state::AppState>,
web::Query(params): web::Query<HashMap<String, String>>,
) -> Result<HttpResponse> {
let empty = String::new();
let mode = params.get("hub.mode").unwrap_or(&empty);
let token = params.get("hub.verify_token").unwrap_or(&empty);
let challenge = params.get("hub.challenge").unwrap_or(&empty);
#[post("/voice/start")]
pub async fn voice_start() -> impl Responder {
info!("voice_start requested");
HttpResponse::Ok().body("voice started")
}
#[post("/voice/stop")]
pub async fn voice_stop() -> impl Responder {
info!("voice_stop requested");
HttpResponse::Ok().body("voice stopped")
}
#[post("/ws")]
pub async fn websocket_handler() -> impl Responder {
info!("websocket_handler requested");
HttpResponse::NotImplemented().finish()
}
#[post("/whatsapp/webhook")]
pub async fn whatsapp_webhook() -> impl Responder {
info!("whatsapp_webhook called");
HttpResponse::Ok().finish()
}
#[get("/whatsapp/verify")]
pub async fn whatsapp_webhook_verify() -> impl Responder {
info!("whatsapp_webhook_verify called");
HttpResponse::Ok().finish()
}
#[post("/session/create")]
pub async fn create_session(data: web::Data<SessionManagerWrapper>) -> impl Responder {
let mut mgr = data.0.lock().unwrap();
let id = mgr.create_session();
info!("create_session -> {}", id);
HttpResponse::Ok().body(id.to_string())
}
#[get("/sessions")]
pub async fn get_sessions(data: web::Data<SessionManagerWrapper>) -> impl Responder {
let mgr = data.0.lock().unwrap();
let list = mgr.list_sessions();
HttpResponse::Ok().json(list)
}
#[get("/session/{id}/history")]
pub async fn get_session_history(
path: web::Path<Uuid>,
data: web::Data<SessionManagerWrapper>,
) -> impl Responder {
let id = path.into_inner();
let mgr = data.0.lock().unwrap();
if let Some(sess) = mgr.get_session(&id) {
HttpResponse::Ok().json(sess)
} else {
HttpResponse::NotFound().finish()
match data.whatsapp_adapter.verify_webhook(mode, token, challenge) {
Ok(challenge_response) => Ok(HttpResponse::Ok().body(challenge_response)),
Err(_) => Ok(HttpResponse::Forbidden().body("Verification failed")),
}
}
#[post("/session/{id}/mode")]
pub async fn set_mode_handler(path: web::Path<Uuid>) -> impl Responder {
let id = path.into_inner();
info!("set_mode_handler called for {}", id);
HttpResponse::Ok().finish()
#[actix_web::post("/api/whatsapp/webhook")]
async fn whatsapp_webhook(
data: web::Data<crate::shared::state::AppState>,
payload: web::Json<crate::whatsapp::WhatsAppMessage>,
) -> Result<HttpResponse> {
match data
.whatsapp_adapter
.process_incoming_message(payload.into_inner())
.await
{
Ok(user_messages) => {
for user_message in user_messages {
if let Err(e) = data.orchestrator.process_message(user_message).await {
log::error!("Error processing WhatsApp message: {}", e);
}
}
Ok(HttpResponse::Ok().body(""))
}
Err(e) => {
log::error!("Error processing WhatsApp webhook: {}", e);
Ok(HttpResponse::BadRequest().body("Invalid message"))
}
}
}
use std::sync::{Arc, Mutex};
pub struct SessionManagerWrapper(pub Arc<Mutex<SessionManager>>);
#[actix_web::post("/api/voice/start")]
async fn voice_start(
data: web::Data<crate::shared::state::AppState>,
info: web::Json<serde_json::Value>,
) -> Result<HttpResponse> {
let session_id = info
.get("session_id")
.and_then(|s| s.as_str())
.unwrap_or("");
let user_id = info
.get("user_id")
.and_then(|u| u.as_str())
.unwrap_or("user");
match data
.voice_adapter
.start_voice_session(session_id, user_id)
.await
{
Ok(token) => {
Ok(HttpResponse::Ok().json(serde_json::json!({"token": token, "status": "started"})))
}
Err(e) => {
Ok(HttpResponse::InternalServerError()
.json(serde_json::json!({"error": e.to_string()})))
}
}
}
#[actix_web::post("/api/voice/stop")]
async fn voice_stop(
data: web::Data<crate::shared::state::AppState>,
info: web::Json<serde_json::Value>,
) -> Result<HttpResponse> {
let session_id = info
.get("session_id")
.and_then(|s| s.as_str())
.unwrap_or("");
match data.voice_adapter.stop_voice_session(session_id).await {
Ok(()) => Ok(HttpResponse::Ok().json(serde_json::json!({"status": "stopped"}))),
Err(e) => {
Ok(HttpResponse::InternalServerError()
.json(serde_json::json!({"error": e.to_string()})))
}
}
}
#[actix_web::post("/api/sessions")]
async fn create_session(_data: web::Data<crate::shared::state::AppState>) -> Result<HttpResponse> {
let session_id = Uuid::new_v4();
Ok(HttpResponse::Ok().json(serde_json::json!({
"session_id": session_id,
"title": "New Conversation",
"created_at": Utc::now()
})))
}
#[actix_web::get("/api/sessions")]
async fn get_sessions(data: web::Data<crate::shared::state::AppState>) -> Result<HttpResponse> {
let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
match data.orchestrator.get_user_sessions(user_id).await {
Ok(sessions) => Ok(HttpResponse::Ok().json(sessions)),
Err(e) => {
Ok(HttpResponse::InternalServerError()
.json(serde_json::json!({"error": e.to_string()})))
}
}
}
#[actix_web::get("/api/sessions/{session_id}")]
async fn get_session_history(
data: web::Data<crate::shared::state::AppState>,
path: web::Path<String>,
) -> Result<HttpResponse> {
let session_id = path.into_inner();
let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
match Uuid::parse_str(&session_id) {
Ok(session_uuid) => match data
.orchestrator
.get_conversation_history(session_uuid, user_id)
.await
{
Ok(history) => Ok(HttpResponse::Ok().json(history)),
Err(e) => Ok(HttpResponse::InternalServerError()
.json(serde_json::json!({"error": e.to_string()}))),
},
Err(_) => {
Ok(HttpResponse::BadRequest().json(serde_json::json!({"error": "Invalid session ID"})))
}
}
}
#[actix_web::post("/api/set_mode")]
async fn set_mode_handler(
data: web::Data<crate::shared::state::AppState>,
info: web::Json<HashMap<String, String>>,
) -> Result<HttpResponse> {
let default_user = "default_user".to_string();
let default_bot = "default_bot".to_string();
let default_mode = "direct".to_string();
let user_id = info.get("user_id").unwrap_or(&default_user);
let bot_id = info.get("bot_id").unwrap_or(&default_bot);
let mode = info.get("mode").unwrap_or(&default_mode);
if let Err(e) = data
.orchestrator
.set_user_answer_mode(user_id, bot_id, mode)
.await
{
return Ok(
HttpResponse::InternalServerError().json(serde_json::json!({"error": e.to_string()}))
);
}
Ok(HttpResponse::Ok().json(serde_json::json!({"status": "mode_updated"})))
}
#[actix_web::get("/")]
async fn index() -> Result<HttpResponse> {
let html = fs::read_to_string("templates/index.html")
.unwrap_or_else(|_| include_str!("../../static/index.html").to_string());
Ok(HttpResponse::Ok().content_type("text/html").body(html))
}
#[actix_web::get("/static/{filename:.*}")]
async fn static_files(req: HttpRequest) -> Result<HttpResponse> {
let filename = req.match_info().query("filename");
let path = format!("static/{}", filename);
match fs::read(&path) {
Ok(content) => {
let content_type = match filename {
f if f.ends_with(".js") => "application/javascript",
f if f.ends_with(".css") => "text/css",
f if f.ends_with(".png") => "image/png",
f if f.ends_with(".jpg") | f.ends_with(".jpeg") => "image/jpeg",
_ => "text/plain",
};
Ok(HttpResponse::Ok().content_type(content_type).body(content))
}
Err(_) => Ok(HttpResponse::NotFound().body("File not found")),
}
}

View file

@ -1,29 +1,36 @@
use chrono::Utc;
use diesel::prelude::*;
use diesel::PgConnection;
use log::info;
use redis::Client;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::sync::Arc;
use uuid::Uuid;
use crate::shared::models::UserSession;
#[derive(Clone, Serialize, Deserialize)]
pub struct UserSession {
pub struct SessionData {
pub id: Uuid,
pub user_id: Option<Uuid>,
pub data: String,
}
pub struct SessionManager {
sessions: HashMap<Uuid, UserSession>,
conn: PgConnection,
sessions: HashMap<Uuid, SessionData>,
waiting_for_input: HashSet<Uuid>,
redis: Option<Arc<Client>>,
}
impl SessionManager {
pub fn new(_conn: PgConnection, redis_client: Option<Arc<Client>>) -> Self {
pub fn new(conn: PgConnection, redis_client: Option<Arc<Client>>) -> Self {
info!("Initializing SessionManager");
SessionManager {
conn,
sessions: HashMap::new(),
waiting_for_input: HashSet::new(),
redis: redis_client,
@ -42,7 +49,7 @@ impl SessionManager {
if let Some(sess) = self.sessions.get_mut(&session_id) {
sess.data = input;
} else {
let sess = UserSession {
let sess = SessionData {
id: session_id,
user_id: None,
data: input,
@ -57,28 +64,148 @@ impl SessionManager {
self.waiting_for_input.contains(session_id)
}
pub fn create_session(&mut self) -> Uuid {
let id = Uuid::new_v4();
let sess = UserSession {
id,
user_id: None,
data: String::new(),
};
self.sessions.insert(id, sess);
info!("Created session {}", id);
id
}
pub fn mark_waiting(&mut self, session_id: Uuid) {
self.waiting_for_input.insert(session_id);
info!("Session {} marked as waiting for input", session_id);
}
pub fn get_session(&self, session_id: &Uuid) -> Option<UserSession> {
self.sessions.get(session_id).cloned()
pub fn get_user_session(
&mut self,
uid: Uuid,
bid: Uuid,
) -> Result<Option<UserSession>, Box<dyn Error + Send + Sync>> {
use crate::shared::models::user_sessions::dsl::*;
let result = user_sessions
.filter(user_id.eq(uid))
.filter(bot_id.eq(bid))
.order(created_at.desc())
.first::<UserSession>(&mut self.conn)
.optional()?;
Ok(result)
}
pub fn list_sessions(&self) -> Vec<UserSession> {
self.sessions.values().cloned().collect()
pub fn create_session(
&mut self,
uid: Uuid,
bid: Uuid,
session_title: &str,
) -> Result<UserSession, Box<dyn Error + Send + Sync>> {
use crate::shared::models::user_sessions::dsl::*;
// Return an existing session if one already matches the user, bot, and title.
if let Some(existing) = user_sessions
.filter(user_id.eq(uid))
.filter(bot_id.eq(bid))
.filter(title.eq(session_title))
.first::<UserSession>(&mut self.conn)
.optional()?
{
return Ok(existing);
}
let now = Utc::now();
// Insert the new session and retrieve the full record in one step.
let inserted: UserSession = diesel::insert_into(user_sessions)
.values((
id.eq(Uuid::new_v4()),
user_id.eq(uid),
bot_id.eq(bid),
title.eq(session_title),
context_data.eq(serde_json::json!({})),
answer_mode.eq("direct"),
current_tool.eq(None::<String>),
created_at.eq(now),
updated_at.eq(now),
))
.returning(UserSession::as_returning())
.get_result(&mut self.conn)?;
Ok(inserted)
}
pub fn save_message(
&mut self,
sess_id: Uuid,
uid: Uuid,
role_str: &str,
content: &str,
msg_type: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
use crate::shared::models::message_history::dsl::*;
let next_index = message_history
.filter(session_id.eq(sess_id))
.count()
.get_result::<i64>(&mut self.conn)?;
diesel::insert_into(message_history)
.values((
id.eq(Uuid::new_v4()),
session_id.eq(sess_id),
user_id.eq(uid),
role.eq(role_str),
content_encrypted.eq(content),
message_type.eq(msg_type),
message_index.eq(next_index),
created_at.eq(chrono::Utc::now()),
))
.execute(&mut self.conn)?;
Ok(())
}
pub fn get_conversation_history(
&mut self,
sess_id: Uuid,
_uid: Uuid,
) -> Result<Vec<(String, String)>, Box<dyn Error + Send + Sync>> {
use crate::shared::models::message_history::dsl::*;
let messages = message_history
.filter(session_id.eq(sess_id))
.order(message_index.asc())
.select((role, content_encrypted))
.load::<(String, String)>(&mut self.conn)?;
Ok(messages)
}
pub fn get_user_sessions(
&mut self,
uid: Uuid,
) -> Result<Vec<UserSession>, Box<dyn Error + Send + Sync>> {
use crate::shared::models::user_sessions;
let sessions = user_sessions::table
.filter(user_sessions::user_id.eq(uid))
.order(user_sessions::created_at.desc())
.load::<UserSession>(&mut self.conn)?;
Ok(sessions)
}
pub fn update_answer_mode(
&mut self,
uid: &str,
bid: &str,
mode: &str,
) -> Result<(), Box<dyn Error + Send + Sync>> {
use crate::shared::models::user_sessions::dsl::*;
let user_uuid = Uuid::parse_str(uid)?;
let bot_uuid = Uuid::parse_str(bid)?;
diesel::update(
user_sessions
.filter(user_id.eq(user_uuid))
.filter(bot_id.eq(bot_uuid)),
)
.set((answer_mode.eq(mode), updated_at.eq(chrono::Utc::now())))
.execute(&mut self.conn)?;
Ok(())
}
}

View file

@ -1,3 +1,4 @@
use chrono::Utc;
use diesel::prelude::*;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
@ -61,7 +62,7 @@ pub struct Automation {
pub last_triggered: Option<chrono::DateTime<chrono::Utc>>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Queryable, Identifiable)]
#[derive(Debug, Clone, Serialize, Deserialize, Queryable, Identifiable, Selectable)]
#[diesel(table_name = user_sessions)]
pub struct UserSession {
pub id: Uuid,
@ -71,8 +72,8 @@ pub struct UserSession {
pub context_data: serde_json::Value,
pub answer_mode: String,
pub current_tool: Option<String>,
pub created_at: chrono::DateTime<chrono::Utc>,
pub updated_at: chrono::DateTime<chrono::Utc>,
pub created_at: chrono::DateTime<Utc>,
pub updated_at: chrono::DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View file

@ -112,7 +112,6 @@ impl ToolManager {
("b".to_string(), "number".to_string()),
]),
script: r#"
// Calculator tool implementation
print("Calculator started");
"#
.to_string(),
@ -163,7 +162,6 @@ impl ToolManager {
input: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
self.provide_user_response(session_id, "default_bot", input.to_string())
.await
}
pub async fn get_tool_output(
@ -173,18 +171,23 @@ impl ToolManager {
Ok(vec![])
}
pub async fn provide_user_response(
pub fn provide_user_response(
&self,
user_id: &str,
bot_id: &str,
response: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let key = format!("{}:{}", user_id, bot_id);
let mut waiting = self.waiting_responses.lock().await;
if let Some(tx) = waiting.get_mut(&key) {
let _ = tx.send(response).await;
waiting.remove(&key);
}
let waiting = self.waiting_responses.clone();
tokio::spawn(async move {
let mut waiting_lock = waiting.lock().await;
if let Some(tx) = waiting_lock.get_mut(&key) {
let _ = tx.send(response).await;
waiting_lock.remove(&key);
}
});
Ok(())
}
}

View file

@ -1,7 +1,7 @@
<!doctype html>
<html>
<head>
<title>General Bots - ChatGPT Clone</title>
<title>General Bots</title>
<style>
* {
margin: 0;