From fa9f163971b7ab641493594d5ef328810fed4abf Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Sun, 12 Oct 2025 11:44:35 -0300 Subject: [PATCH] - Compiling again. --- Cargo.lock | 1 + Cargo.toml | 2 +- prompts/dev/generation.md | 3 +- scripts/dev/build_prompt.sh | 22 +- src/bot/mod.rs | 776 ++++++++++++++++++++++++++++++++---- src/session/mod.rs | 167 +++++++- src/shared/models.rs | 7 +- src/tools/mod.rs | 19 +- static/index.html | 2 +- 9 files changed, 878 insertions(+), 121 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 76f251186..8376345dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1726,6 +1726,7 @@ dependencies = [ "downcast-rs", "itoa", "pq-sys", + "serde_json", "uuid", ] diff --git a/Cargo.toml b/Cargo.toml index 8dae7686b..e84662e98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ argon2 = "0.5" base64 = "0.22" bytes = "1.8" chrono = { version = "0.4", features = ["serde"] } -diesel = { version = "2.1", features = ["postgres", "uuid", "chrono"] } +diesel = { version = "2.1", features = ["postgres", "uuid", "chrono", "serde_json"] } dotenvy = "0.15" downloader = "0.2" env_logger = "0.11" diff --git a/prompts/dev/generation.md b/prompts/dev/generation.md index 32ef21735..6349d2733 100644 --- a/prompts/dev/generation.md +++ b/prompts/dev/generation.md @@ -1,5 +1,6 @@ MORE RULES: -- Return only the modified files as a single `.sh` script using `cat`, so the - code can be restored directly. +- Return *only the modified* files as a single `.sh` script using `cat`, so the code can be restored directly. +- NEVER return a untouched file in output. Just files that need to be updated. - You MUST return exactly this example format: ```sh #!/bin/bash diff --git a/scripts/dev/build_prompt.sh b/scripts/dev/build_prompt.sh index e7bc7ffa9..927fda7cc 100755 --- a/scripts/dev/build_prompt.sh +++ b/scripts/dev/build_prompt.sh @@ -9,8 +9,8 @@ echo "Consolidated LLM Context" > "$OUTPUT_FILE" prompts=( "../../prompts/dev/shared.md" "../../Cargo.toml" - "../../prompts/dev/fix.md" - #"../../prompts/dev/generation.md" + #"../../prompts/dev/fix.md" + "../../prompts/dev/generation.md" ) for file in "${prompts[@]}"; do @@ -23,22 +23,21 @@ dirs=( #"automation" #"basic" "bot" - #"channels" + "channels" #"config" - #"context" + "context" #"email" #"file" - #"llm" + "llm" #"llm_legacy" #"org" "session" - #"shared" + "shared" #"tests" - #"tools" + "tools" #"web_automation" #"whatsapp" ) -dirs=() # disabled. for dir in "${dirs[@]}"; do find "$PROJECT_ROOT/src/$dir" -name "*.rs" | while read file; do cat "$file" >> "$OUTPUT_FILE" @@ -48,7 +47,12 @@ done # Also append the specific files you mentioned cat "$PROJECT_ROOT/src/main.rs" >> "$OUTPUT_FILE" +cat "$PROJECT_ROOT/src/basic/keywords/hear_talk.rs" >> "$OUTPUT_FILE" + +echo "This BASIC file will run as soon as the conversation is created. " >> "$OUTPUT_FILE" +cat "$PROJECT_ROOT/templates/annoucements.gbai/annoucements.gbdialog/start.bas" >> "$OUTPUT_FILE" + echo "" >> "$OUTPUT_FILE" -cargo build --message-format=short 2>&1 | grep -E 'error' >> "$OUTPUT_FILE" +# cargo build --message-format=short 2>&1 | grep -E 'error' >> "$OUTPUT_FILE" diff --git a/src/bot/mod.rs b/src/bot/mod.rs index 6ae05f744..441abe6ee 100644 --- a/src/bot/mod.rs +++ b/src/bot/mod.rs @@ -1,94 +1,714 @@ -use crate::session::SessionManager; -use actix_web::{get, post, web, HttpResponse, Responder}; +use actix_web::{web, HttpRequest, HttpResponse, Result}; +use actix_ws::Message as WsMessage; +use chrono::Utc; use log::info; +use serde_json; +use std::collections::HashMap; +use std::fs; +use std::sync::Arc; +use tokio::sync::{mpsc, Mutex}; use uuid::Uuid; -pub struct BotOrchestrator {} +use crate::auth::AuthService; +use crate::channels::ChannelAdapter; +use crate::llm::LLMProvider; +use crate::session::SessionManager; +use crate::shared::models::{BotResponse, UserMessage, UserSession}; +use crate::tools::ToolManager; + +pub struct BotOrchestrator { + pub session_manager: Arc>, + tool_manager: Arc, + llm_provider: Arc, + auth_service: AuthService, + pub channels: HashMap>, + response_channels: Arc>>>, +} impl BotOrchestrator { - pub fn new(_a: A, _b: B, _c: C, _d: D) -> Self { - info!("BotOrchestrator initialized"); - BotOrchestrator {} + pub fn new( + session_manager: SessionManager, + tool_manager: ToolManager, + llm_provider: Arc, + auth_service: AuthService, + ) -> Self { + Self { + session_manager: Arc::new(Mutex::new(session_manager)), + tool_manager: Arc::new(tool_manager), + llm_provider, + auth_service, + channels: HashMap::new(), + response_channels: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub async fn handle_user_input( + &self, + session_id: Uuid, + user_input: &str, + ) -> Result, Box> { + let mut session_manager = self.session_manager.lock().await; + session_manager.provide_input(session_id, user_input.to_string())?; + Ok(None) + } + + pub async fn is_waiting_for_input(&self, session_id: Uuid) -> bool { + let session_manager = self.session_manager.lock().await; + session_manager.is_waiting_for_input(&session_id) + } + + pub fn add_channel(&mut self, channel_type: &str, adapter: Arc) { + self.channels.insert(channel_type.to_string(), adapter); + } + + pub async fn register_response_channel( + &self, + session_id: String, + sender: mpsc::Sender, + ) { + self.response_channels + .lock() + .await + .insert(session_id, sender); + } + + pub async fn set_user_answer_mode( + &self, + user_id: &str, + bot_id: &str, + mode: &str, + ) -> Result<(), Box> { + let mut session_manager = self.session_manager.lock().await; + session_manager.update_answer_mode(user_id, bot_id, mode)?; + Ok(()) + } + + pub async fn process_message( + &self, + message: UserMessage, + ) -> Result<(), Box> { + info!( + "Processing message from channel: {}, user: {}", + message.channel, message.user_id + ); + + let user_id = Uuid::parse_str(&message.user_id).unwrap_or_else(|_| Uuid::new_v4()); + let bot_id = Uuid::parse_str(&message.bot_id) + .unwrap_or_else(|_| Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap()); + + let session = { + let mut session_manager = self.session_manager.lock().await; + match session_manager.get_user_session(user_id, bot_id)? { + Some(session) => session, + None => session_manager.create_session(user_id, bot_id, "New Conversation")?, + } + }; + + if self.is_waiting_for_input(session.id).await { + if let Some(variable_name) = + self.handle_user_input(session.id, &message.content).await? + { + info!( + "Stored user input in variable '{}' for session {}", + variable_name, session.id + ); + + if let Some(adapter) = self.channels.get(&message.channel) { + let ack_response = BotResponse { + bot_id: message.bot_id.clone(), + user_id: message.user_id.clone(), + session_id: message.session_id.clone(), + channel: message.channel.clone(), + content: format!("Input stored in '{}'", variable_name), + message_type: "system".to_string(), + stream_token: None, + is_complete: true, + }; + adapter.send_message(ack_response).await?; + } + return Ok(()); + } + } + + if session.answer_mode == "tool" && session.current_tool.is_some() { + self.tool_manager.provide_user_response( + &message.user_id, + &message.bot_id, + message.content.clone(), + )?; + return Ok(()); + } + + { + let mut session_manager = self.session_manager.lock().await; + session_manager.save_message( + session.id, + user_id, + "user", + &message.content, + &message.message_type, + )?; + } + + let response_content = self.direct_mode_handler(&message, &session).await?; + + { + let mut session_manager = self.session_manager.lock().await; + session_manager.save_message( + session.id, + user_id, + "assistant", + &response_content, + "text", + )?; + } + + let bot_response = BotResponse { + bot_id: message.bot_id, + user_id: message.user_id, + session_id: message.session_id.clone(), + channel: message.channel.clone(), + content: response_content, + message_type: "text".to_string(), + stream_token: None, + is_complete: true, + }; + + if let Some(adapter) = self.channels.get(&message.channel) { + adapter.send_message(bot_response).await?; + } + + Ok(()) + } + + async fn direct_mode_handler( + &self, + message: &UserMessage, + session: &UserSession, + ) -> Result> { + // Retrieve conversation history while holding a mutable lock on the session manager. + let history = { + let mut session_manager = self.session_manager.lock().await; + session_manager.get_conversation_history(session.id, session.user_id)? + }; + + // Build the prompt from the conversation history. + let mut prompt = String::new(); + for (role, content) in history { + prompt.push_str(&format!("{}: {}\n", role, content)); + } + prompt.push_str(&format!("User: {}\nAssistant:", message.content)); + + // Generate the assistant's response using the LLM provider. + self.llm_provider + .generate(&prompt, &serde_json::Value::Null) + .await + } + + pub async fn stream_response( + &self, + message: UserMessage, + response_tx: mpsc::Sender, + ) -> Result<(), Box> { + info!("Streaming response for user: {}", message.user_id); + + // Parse identifiers, falling back to safe defaults. + let user_id = Uuid::parse_str(&message.user_id).unwrap_or_else(|_| Uuid::new_v4()); + let bot_id = Uuid::parse_str(&message.bot_id).unwrap_or_else(|_| Uuid::nil()); + + // Retrieve an existing session or create a new one. + let session = { + let mut sm = self.session_manager.lock().await; + match sm.get_user_session(user_id, bot_id)? { + Some(sess) => sess, + None => sm.create_session(user_id, bot_id, "New Conversation")?, + } + }; + + // If the session is awaiting tool input, forward the user's answer to the tool manager. + if session.answer_mode == "tool" && session.current_tool.is_some() { + self.tool_manager.provide_user_response( + &message.user_id, + &message.bot_id, + message.content.clone(), + )?; + return Ok(()); + } + + // Persist the incoming user message. + { + let mut sm = self.session_manager.lock().await; + sm.save_message( + session.id, + user_id, + "user", + &message.content, + &message.message_type, + )?; + } + + // Build the prompt from the conversation history. + let prompt = { + let mut sm = self.session_manager.lock().await; + let history = sm.get_conversation_history(session.id, user_id)?; + let mut p = String::new(); + for (role, content) in history { + p.push_str(&format!("{}: {}\n", role, content)); + } + p.push_str(&format!("User: {}\nAssistant:", message.content)); + p + }; + + // Set up a channel for the streaming LLM output. + let (stream_tx, mut stream_rx) = mpsc::channel::(100); + let llm = self.llm_provider.clone(); + + // Spawn the LLM streaming task. + tokio::spawn(async move { + if let Err(e) = llm + .generate_stream(&prompt, &serde_json::Value::Null, stream_tx) + .await + { + log::error!("LLM streaming error: {}", e); + } + }); + + // Forward each chunk to the client as it arrives. + let mut full_response = String::new(); + while let Some(chunk) = stream_rx.recv().await { + full_response.push_str(&chunk); + + let partial = BotResponse { + bot_id: message.bot_id.clone(), + user_id: message.user_id.clone(), + session_id: message.session_id.clone(), + channel: message.channel.clone(), + content: chunk, + message_type: "text".to_string(), + stream_token: None, + is_complete: false, + }; + + if response_tx.send(partial).await.is_err() { + // Receiver has been dropped; stop streaming. + break; + } + } + + // Save the complete assistant reply. + { + let mut sm = self.session_manager.lock().await; + sm.save_message(session.id, user_id, "assistant", &full_response, "text")?; + } + + // Notify the client that the stream is finished. + let final_msg = BotResponse { + bot_id: message.bot_id, + user_id: message.user_id, + session_id: message.session_id, + channel: message.channel, + content: String::new(), + message_type: "text".to_string(), + stream_token: None, + is_complete: true, + }; + + response_tx.send(final_msg).await?; + Ok(()) + } + + pub async fn get_user_sessions( + &self, + user_id: Uuid, + ) -> Result, Box> { + let mut session_manager = self.session_manager.lock().await; + session_manager.get_user_sessions(user_id) + } + + pub async fn get_conversation_history( + &self, + session_id: Uuid, + user_id: Uuid, + ) -> Result, Box> { + let mut session_manager = self.session_manager.lock().await; + session_manager.get_conversation_history(session_id, user_id) + } + + pub async fn process_message_with_tools( + &self, + message: UserMessage, + ) -> Result<(), Box> { + info!( + "Processing message with tools from user: {}", + message.user_id + ); + + let user_id = Uuid::parse_str(&message.user_id).unwrap_or_else(|_| Uuid::new_v4()); + let bot_id = Uuid::parse_str(&message.bot_id) + .unwrap_or_else(|_| Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap()); + + let session = { + let mut session_manager = self.session_manager.lock().await; + match session_manager.get_user_session(user_id, bot_id)? { + Some(session) => session, + None => session_manager.create_session(user_id, bot_id, "New Conversation")?, + } + }; + + { + let mut session_manager = self.session_manager.lock().await; + session_manager.save_message( + session.id, + user_id, + "user", + &message.content, + &message.message_type, + )?; + } + + let is_tool_waiting = self + .tool_manager + .is_tool_waiting(&message.session_id) + .await + .unwrap_or(false); + + if is_tool_waiting { + self.tool_manager + .provide_input(&message.session_id, &message.content) + .await?; + + if let Ok(tool_output) = self.tool_manager.get_tool_output(&message.session_id).await { + for output in tool_output { + let bot_response = BotResponse { + bot_id: message.bot_id.clone(), + user_id: message.user_id.clone(), + session_id: message.session_id.clone(), + channel: message.channel.clone(), + content: output, + message_type: "text".to_string(), + stream_token: None, + is_complete: true, + }; + + if let Some(adapter) = self.channels.get(&message.channel) { + adapter.send_message(bot_response).await?; + } + } + } + return Ok(()); + } + + let response = if message.content.to_lowercase().contains("calculator") + || message.content.to_lowercase().contains("calculate") + || message.content.to_lowercase().contains("math") + { + match self + .tool_manager + .execute_tool("calculator", &message.session_id, &message.user_id) + .await + { + Ok(tool_result) => { + let mut session_manager = self.session_manager.lock().await; + session_manager.save_message( + session.id, + user_id, + "assistant", + &tool_result.output, + "tool_start", + )?; + + tool_result.output + } + Err(e) => { + format!("I encountered an error starting the calculator: {}", e) + } + } + } else { + let available_tools = self.tool_manager.list_tools(); + let tools_context = if !available_tools.is_empty() { + format!("\n\nAvailable tools: {}. If the user needs calculations, suggest using the calculator tool.", available_tools.join(", ")) + } else { + String::new() + }; + + let full_prompt = format!("{}{}", message.content, tools_context); + + self.llm_provider + .generate(&full_prompt, &serde_json::Value::Null) + .await? + }; + + { + let mut session_manager = self.session_manager.lock().await; + session_manager.save_message(session.id, user_id, "assistant", &response, "text")?; + } + + let bot_response = BotResponse { + bot_id: message.bot_id, + user_id: message.user_id, + session_id: message.session_id.clone(), + channel: message.channel.clone(), + content: response, + message_type: "text".to_string(), + stream_token: None, + is_complete: true, + }; + + if let Some(adapter) = self.channels.get(&message.channel) { + adapter.send_message(bot_response).await?; + } + + Ok(()) } } -#[get("/")] -pub async fn index() -> impl Responder { - info!("index requested"); - HttpResponse::Ok().body("General Bots") +#[actix_web::get("/ws")] +async fn websocket_handler( + req: HttpRequest, + stream: web::Payload, + data: web::Data, +) -> Result { + let (res, mut session, mut msg_stream) = actix_ws::handle(&req, stream)?; + let session_id = Uuid::new_v4().to_string(); + let (tx, mut rx) = mpsc::channel::(100); + + data.orchestrator + .register_response_channel(session_id.clone(), tx.clone()) + .await; + data.web_adapter + .add_connection(session_id.clone(), tx.clone()) + .await; + data.voice_adapter + .add_connection(session_id.clone(), tx.clone()) + .await; + + let orchestrator = data.orchestrator.clone(); + let web_adapter = data.web_adapter.clone(); + + actix_web::rt::spawn(async move { + while let Some(msg) = rx.recv().await { + if let Ok(json) = serde_json::to_string(&msg) { + let _ = session.text(json).await; + } + } + }); + + actix_web::rt::spawn(async move { + while let Some(Ok(msg)) = msg_stream.recv().await { + match msg { + WsMessage::Text(text) => { + let user_message = UserMessage { + bot_id: "default_bot".to_string(), + user_id: "default_user".to_string(), + session_id: session_id.clone(), + channel: "web".to_string(), + content: text.to_string(), + message_type: "text".to_string(), + media_url: None, + timestamp: Utc::now(), + }; + + if let Err(e) = orchestrator.stream_response(user_message, tx.clone()).await { + info!("Error processing message: {}", e); + } + } + WsMessage::Close(_) => { + web_adapter.remove_connection(&session_id).await; + break; + } + _ => {} + } + } + }); + + Ok(res) } -#[get("/static")] -pub async fn static_files() -> impl Responder { - info!("static_files requested"); - HttpResponse::Ok().body("static") -} +#[actix_web::get("/api/whatsapp/webhook")] +async fn whatsapp_webhook_verify( + data: web::Data, + web::Query(params): web::Query>, +) -> Result { + let empty = String::new(); + let mode = params.get("hub.mode").unwrap_or(&empty); + let token = params.get("hub.verify_token").unwrap_or(&empty); + let challenge = params.get("hub.challenge").unwrap_or(&empty); -#[post("/voice/start")] -pub async fn voice_start() -> impl Responder { - info!("voice_start requested"); - HttpResponse::Ok().body("voice started") -} - -#[post("/voice/stop")] -pub async fn voice_stop() -> impl Responder { - info!("voice_stop requested"); - HttpResponse::Ok().body("voice stopped") -} - -#[post("/ws")] -pub async fn websocket_handler() -> impl Responder { - info!("websocket_handler requested"); - HttpResponse::NotImplemented().finish() -} - -#[post("/whatsapp/webhook")] -pub async fn whatsapp_webhook() -> impl Responder { - info!("whatsapp_webhook called"); - HttpResponse::Ok().finish() -} - -#[get("/whatsapp/verify")] -pub async fn whatsapp_webhook_verify() -> impl Responder { - info!("whatsapp_webhook_verify called"); - HttpResponse::Ok().finish() -} - -#[post("/session/create")] -pub async fn create_session(data: web::Data) -> impl Responder { - let mut mgr = data.0.lock().unwrap(); - let id = mgr.create_session(); - info!("create_session -> {}", id); - HttpResponse::Ok().body(id.to_string()) -} - -#[get("/sessions")] -pub async fn get_sessions(data: web::Data) -> impl Responder { - let mgr = data.0.lock().unwrap(); - let list = mgr.list_sessions(); - HttpResponse::Ok().json(list) -} - -#[get("/session/{id}/history")] -pub async fn get_session_history( - path: web::Path, - data: web::Data, -) -> impl Responder { - let id = path.into_inner(); - let mgr = data.0.lock().unwrap(); - if let Some(sess) = mgr.get_session(&id) { - HttpResponse::Ok().json(sess) - } else { - HttpResponse::NotFound().finish() + match data.whatsapp_adapter.verify_webhook(mode, token, challenge) { + Ok(challenge_response) => Ok(HttpResponse::Ok().body(challenge_response)), + Err(_) => Ok(HttpResponse::Forbidden().body("Verification failed")), } } -#[post("/session/{id}/mode")] -pub async fn set_mode_handler(path: web::Path) -> impl Responder { - let id = path.into_inner(); - info!("set_mode_handler called for {}", id); - HttpResponse::Ok().finish() +#[actix_web::post("/api/whatsapp/webhook")] +async fn whatsapp_webhook( + data: web::Data, + payload: web::Json, +) -> Result { + match data + .whatsapp_adapter + .process_incoming_message(payload.into_inner()) + .await + { + Ok(user_messages) => { + for user_message in user_messages { + if let Err(e) = data.orchestrator.process_message(user_message).await { + log::error!("Error processing WhatsApp message: {}", e); + } + } + Ok(HttpResponse::Ok().body("")) + } + Err(e) => { + log::error!("Error processing WhatsApp webhook: {}", e); + Ok(HttpResponse::BadRequest().body("Invalid message")) + } + } } -use std::sync::{Arc, Mutex}; -pub struct SessionManagerWrapper(pub Arc>); +#[actix_web::post("/api/voice/start")] +async fn voice_start( + data: web::Data, + info: web::Json, +) -> Result { + let session_id = info + .get("session_id") + .and_then(|s| s.as_str()) + .unwrap_or(""); + let user_id = info + .get("user_id") + .and_then(|u| u.as_str()) + .unwrap_or("user"); + + match data + .voice_adapter + .start_voice_session(session_id, user_id) + .await + { + Ok(token) => { + Ok(HttpResponse::Ok().json(serde_json::json!({"token": token, "status": "started"}))) + } + Err(e) => { + Ok(HttpResponse::InternalServerError() + .json(serde_json::json!({"error": e.to_string()}))) + } + } +} + +#[actix_web::post("/api/voice/stop")] +async fn voice_stop( + data: web::Data, + info: web::Json, +) -> Result { + let session_id = info + .get("session_id") + .and_then(|s| s.as_str()) + .unwrap_or(""); + + match data.voice_adapter.stop_voice_session(session_id).await { + Ok(()) => Ok(HttpResponse::Ok().json(serde_json::json!({"status": "stopped"}))), + Err(e) => { + Ok(HttpResponse::InternalServerError() + .json(serde_json::json!({"error": e.to_string()}))) + } + } +} + +#[actix_web::post("/api/sessions")] +async fn create_session(_data: web::Data) -> Result { + let session_id = Uuid::new_v4(); + Ok(HttpResponse::Ok().json(serde_json::json!({ + "session_id": session_id, + "title": "New Conversation", + "created_at": Utc::now() + }))) +} + +#[actix_web::get("/api/sessions")] +async fn get_sessions(data: web::Data) -> Result { + let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(); + match data.orchestrator.get_user_sessions(user_id).await { + Ok(sessions) => Ok(HttpResponse::Ok().json(sessions)), + Err(e) => { + Ok(HttpResponse::InternalServerError() + .json(serde_json::json!({"error": e.to_string()}))) + } + } +} + +#[actix_web::get("/api/sessions/{session_id}")] +async fn get_session_history( + data: web::Data, + path: web::Path, +) -> Result { + let session_id = path.into_inner(); + let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(); + + match Uuid::parse_str(&session_id) { + Ok(session_uuid) => match data + .orchestrator + .get_conversation_history(session_uuid, user_id) + .await + { + Ok(history) => Ok(HttpResponse::Ok().json(history)), + Err(e) => Ok(HttpResponse::InternalServerError() + .json(serde_json::json!({"error": e.to_string()}))), + }, + Err(_) => { + Ok(HttpResponse::BadRequest().json(serde_json::json!({"error": "Invalid session ID"}))) + } + } +} + +#[actix_web::post("/api/set_mode")] +async fn set_mode_handler( + data: web::Data, + info: web::Json>, +) -> Result { + let default_user = "default_user".to_string(); + let default_bot = "default_bot".to_string(); + let default_mode = "direct".to_string(); + + let user_id = info.get("user_id").unwrap_or(&default_user); + let bot_id = info.get("bot_id").unwrap_or(&default_bot); + let mode = info.get("mode").unwrap_or(&default_mode); + + if let Err(e) = data + .orchestrator + .set_user_answer_mode(user_id, bot_id, mode) + .await + { + return Ok( + HttpResponse::InternalServerError().json(serde_json::json!({"error": e.to_string()})) + ); + } + + Ok(HttpResponse::Ok().json(serde_json::json!({"status": "mode_updated"}))) +} + +#[actix_web::get("/")] +async fn index() -> Result { + let html = fs::read_to_string("templates/index.html") + .unwrap_or_else(|_| include_str!("../../static/index.html").to_string()); + Ok(HttpResponse::Ok().content_type("text/html").body(html)) +} + +#[actix_web::get("/static/{filename:.*}")] +async fn static_files(req: HttpRequest) -> Result { + let filename = req.match_info().query("filename"); + let path = format!("static/{}", filename); + + match fs::read(&path) { + Ok(content) => { + let content_type = match filename { + f if f.ends_with(".js") => "application/javascript", + f if f.ends_with(".css") => "text/css", + f if f.ends_with(".png") => "image/png", + f if f.ends_with(".jpg") | f.ends_with(".jpeg") => "image/jpeg", + _ => "text/plain", + }; + + Ok(HttpResponse::Ok().content_type(content_type).body(content)) + } + Err(_) => Ok(HttpResponse::NotFound().body("File not found")), + } +} diff --git a/src/session/mod.rs b/src/session/mod.rs index 8b5325220..64ffda5d3 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -1,29 +1,36 @@ +use chrono::Utc; +use diesel::prelude::*; use diesel::PgConnection; use log::info; use redis::Client; use serde::{Deserialize, Serialize}; + use std::collections::{HashMap, HashSet}; use std::error::Error; use std::sync::Arc; use uuid::Uuid; +use crate::shared::models::UserSession; + #[derive(Clone, Serialize, Deserialize)] -pub struct UserSession { +pub struct SessionData { pub id: Uuid, pub user_id: Option, pub data: String, } pub struct SessionManager { - sessions: HashMap, + conn: PgConnection, + sessions: HashMap, waiting_for_input: HashSet, redis: Option>, } impl SessionManager { - pub fn new(_conn: PgConnection, redis_client: Option>) -> Self { + pub fn new(conn: PgConnection, redis_client: Option>) -> Self { info!("Initializing SessionManager"); SessionManager { + conn, sessions: HashMap::new(), waiting_for_input: HashSet::new(), redis: redis_client, @@ -42,7 +49,7 @@ impl SessionManager { if let Some(sess) = self.sessions.get_mut(&session_id) { sess.data = input; } else { - let sess = UserSession { + let sess = SessionData { id: session_id, user_id: None, data: input, @@ -57,28 +64,148 @@ impl SessionManager { self.waiting_for_input.contains(session_id) } - pub fn create_session(&mut self) -> Uuid { - let id = Uuid::new_v4(); - let sess = UserSession { - id, - user_id: None, - data: String::new(), - }; - self.sessions.insert(id, sess); - info!("Created session {}", id); - id - } - pub fn mark_waiting(&mut self, session_id: Uuid) { self.waiting_for_input.insert(session_id); info!("Session {} marked as waiting for input", session_id); } - pub fn get_session(&self, session_id: &Uuid) -> Option { - self.sessions.get(session_id).cloned() + pub fn get_user_session( + &mut self, + uid: Uuid, + bid: Uuid, + ) -> Result, Box> { + use crate::shared::models::user_sessions::dsl::*; + + let result = user_sessions + .filter(user_id.eq(uid)) + .filter(bot_id.eq(bid)) + .order(created_at.desc()) + .first::(&mut self.conn) + .optional()?; + + Ok(result) } - pub fn list_sessions(&self) -> Vec { - self.sessions.values().cloned().collect() + pub fn create_session( + &mut self, + uid: Uuid, + bid: Uuid, + session_title: &str, + ) -> Result> { + use crate::shared::models::user_sessions::dsl::*; + + // Return an existing session if one already matches the user, bot, and title. + if let Some(existing) = user_sessions + .filter(user_id.eq(uid)) + .filter(bot_id.eq(bid)) + .filter(title.eq(session_title)) + .first::(&mut self.conn) + .optional()? + { + return Ok(existing); + } + + let now = Utc::now(); + + // Insert the new session and retrieve the full record in one step. + let inserted: UserSession = diesel::insert_into(user_sessions) + .values(( + id.eq(Uuid::new_v4()), + user_id.eq(uid), + bot_id.eq(bid), + title.eq(session_title), + context_data.eq(serde_json::json!({})), + answer_mode.eq("direct"), + current_tool.eq(None::), + created_at.eq(now), + updated_at.eq(now), + )) + .returning(UserSession::as_returning()) + .get_result(&mut self.conn)?; + + Ok(inserted) + } + + pub fn save_message( + &mut self, + sess_id: Uuid, + uid: Uuid, + role_str: &str, + content: &str, + msg_type: &str, + ) -> Result<(), Box> { + use crate::shared::models::message_history::dsl::*; + + let next_index = message_history + .filter(session_id.eq(sess_id)) + .count() + .get_result::(&mut self.conn)?; + + diesel::insert_into(message_history) + .values(( + id.eq(Uuid::new_v4()), + session_id.eq(sess_id), + user_id.eq(uid), + role.eq(role_str), + content_encrypted.eq(content), + message_type.eq(msg_type), + message_index.eq(next_index), + created_at.eq(chrono::Utc::now()), + )) + .execute(&mut self.conn)?; + + Ok(()) + } + + pub fn get_conversation_history( + &mut self, + sess_id: Uuid, + _uid: Uuid, + ) -> Result, Box> { + use crate::shared::models::message_history::dsl::*; + + let messages = message_history + .filter(session_id.eq(sess_id)) + .order(message_index.asc()) + .select((role, content_encrypted)) + .load::<(String, String)>(&mut self.conn)?; + + Ok(messages) + } + + pub fn get_user_sessions( + &mut self, + uid: Uuid, + ) -> Result, Box> { + use crate::shared::models::user_sessions; + + let sessions = user_sessions::table + .filter(user_sessions::user_id.eq(uid)) + .order(user_sessions::created_at.desc()) + .load::(&mut self.conn)?; + + Ok(sessions) + } + + pub fn update_answer_mode( + &mut self, + uid: &str, + bid: &str, + mode: &str, + ) -> Result<(), Box> { + use crate::shared::models::user_sessions::dsl::*; + + let user_uuid = Uuid::parse_str(uid)?; + let bot_uuid = Uuid::parse_str(bid)?; + + diesel::update( + user_sessions + .filter(user_id.eq(user_uuid)) + .filter(bot_id.eq(bot_uuid)), + ) + .set((answer_mode.eq(mode), updated_at.eq(chrono::Utc::now()))) + .execute(&mut self.conn)?; + + Ok(()) } } diff --git a/src/shared/models.rs b/src/shared/models.rs index 235a6fc7a..2c2a7ced0 100644 --- a/src/shared/models.rs +++ b/src/shared/models.rs @@ -1,3 +1,4 @@ +use chrono::Utc; use diesel::prelude::*; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -61,7 +62,7 @@ pub struct Automation { pub last_triggered: Option>, } -#[derive(Debug, Clone, Serialize, Deserialize, Queryable, Identifiable)] +#[derive(Debug, Clone, Serialize, Deserialize, Queryable, Identifiable, Selectable)] #[diesel(table_name = user_sessions)] pub struct UserSession { pub id: Uuid, @@ -71,8 +72,8 @@ pub struct UserSession { pub context_data: serde_json::Value, pub answer_mode: String, pub current_tool: Option, - pub created_at: chrono::DateTime, - pub updated_at: chrono::DateTime, + pub created_at: chrono::DateTime, + pub updated_at: chrono::DateTime, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/src/tools/mod.rs b/src/tools/mod.rs index ca829e744..9eccb41ec 100644 --- a/src/tools/mod.rs +++ b/src/tools/mod.rs @@ -112,7 +112,6 @@ impl ToolManager { ("b".to_string(), "number".to_string()), ]), script: r#" - // Calculator tool implementation print("Calculator started"); "# .to_string(), @@ -163,7 +162,6 @@ impl ToolManager { input: &str, ) -> Result<(), Box> { self.provide_user_response(session_id, "default_bot", input.to_string()) - .await } pub async fn get_tool_output( @@ -173,18 +171,23 @@ impl ToolManager { Ok(vec![]) } - pub async fn provide_user_response( + pub fn provide_user_response( &self, user_id: &str, bot_id: &str, response: String, ) -> Result<(), Box> { let key = format!("{}:{}", user_id, bot_id); - let mut waiting = self.waiting_responses.lock().await; - if let Some(tx) = waiting.get_mut(&key) { - let _ = tx.send(response).await; - waiting.remove(&key); - } + let waiting = self.waiting_responses.clone(); + + tokio::spawn(async move { + let mut waiting_lock = waiting.lock().await; + if let Some(tx) = waiting_lock.get_mut(&key) { + let _ = tx.send(response).await; + waiting_lock.remove(&key); + } + }); + Ok(()) } } diff --git a/static/index.html b/static/index.html index f92997495..242a09d60 100644 --- a/static/index.html +++ b/static/index.html @@ -1,7 +1,7 @@ - General Bots - ChatGPT Clone + General Bots