From 3aeb3ebc70e292b1795770b846e5ec682029c653 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Mon, 13 Oct 2025 17:43:03 -0300 Subject: [PATCH] - New features for start.bas --- Cargo.lock | 1 + Cargo.toml | 1 + scripts/dev/build_prompt.sh => add-req.sh | 19 +- docs/DEV.md | 6 + docs/GLOSSARY.md | 6 + fix-errors.sh | 59 +++ src/basic/keywords/hear_talk.rs | 144 +++++-- src/basic/keywords/mod.rs | 2 +- src/basic/mod.rs | 7 +- src/bot/mod.rs | 373 ++++++++++++++---- src/channels/mod.rs | 24 +- src/main.rs | 6 +- src/session/mod.rs | 161 ++++++-- .../annoucements.gbdialog/start-ctx.bas | 8 + .../annoucements.gbdialog/start.bas | 9 +- web/index.html | 176 ++++++++- 16 files changed, 836 insertions(+), 166 deletions(-) rename scripts/dev/build_prompt.sh => add-req.sh (75%) mode change 100755 => 100644 create mode 100644 docs/GLOSSARY.md create mode 100755 fix-errors.sh create mode 100644 templates/annoucements.gbai/annoucements.gbdialog/start-ctx.bas diff --git a/Cargo.lock b/Cargo.lock index 8376345d..788c45e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1021,6 +1021,7 @@ dependencies = [ "native-tls", "num-format", "qdrant-client", + "rand 0.9.2", "redis", "regex", "reqwest 0.12.23", diff --git a/Cargo.toml b/Cargo.toml index e84662e9..e57b074c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,3 +58,4 @@ zip = "2.2" time = "0.3.44" aws-sdk-s3 = "1.108.0" headless_chrome = { version = "1.0.18", optional = true } +rand = "0.9.2" diff --git a/scripts/dev/build_prompt.sh b/add-req.sh old mode 100755 new mode 100644 similarity index 75% rename from scripts/dev/build_prompt.sh rename to add-req.sh index e9e6f6ff..c7d6320e --- a/scripts/dev/build_prompt.sh +++ b/add-req.sh @@ -1,16 +1,15 @@ #!/bin/bash SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -PROJECT_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)" +PROJECT_ROOT="$SCRIPT_DIR" OUTPUT_FILE="$SCRIPT_DIR/prompt.out" rm $OUTPUT_FILE echo "Consolidated LLM Context" > "$OUTPUT_FILE" prompts=( - "../../prompts/dev/shared.md" - "../../Cargo.toml" - #"../../prompts/dev/fix.md" - "../../prompts/dev/generation.md" + "./prompts/dev/shared.md" + "./Cargo.toml" + "./prompts/dev/generation.md" ) for file in "${prompts[@]}"; do @@ -23,12 +22,12 @@ dirs=( #"automation" #"basic" "bot" - "channels" + #"channels" "config" - "context" + #"context" #"email" #"file" - "llm" + #"llm" #"llm_legacy" #"org" "session" @@ -36,7 +35,7 @@ dirs=( #"tests" #"tools" #"web_automation" - "whatsapp" + #"whatsapp" ) for dir in "${dirs[@]}"; do find "$PROJECT_ROOT/src/$dir" -name "*.rs" | while read file; do @@ -54,6 +53,8 @@ cat "$PROJECT_ROOT/src/basic/keywords/hear_talk.rs" >> "$OUTPUT_FILE" echo "$PROJECT_ROOT/src/basic/mod.rs">> "$OUTPUT_FILE" cat "$PROJECT_ROOT/src/basic/mod.rs" >> "$OUTPUT_FILE" +echo "$PROJECT_ROOT/templates/annoucements.gbai/annoucements.gbdialog/start.bas" >> "$OUTPUT_FILE" +cat "$PROJECT_ROOT/templates/annoucements.gbai/annoucements.gbdialog/start.bas" >> "$OUTPUT_FILE" echo "" >> "$OUTPUT_FILE" diff --git a/docs/DEV.md b/docs/DEV.md index 11655b44..50127659 100644 --- a/docs/DEV.md +++ b/docs/DEV.md @@ -1,3 +1,9 @@ +# LLM + +Zed Assistant: Groq + GPT OSS 120B | +FIX Manual: DeepSeek | ChatGPT 120B | Claude 4.5 Thinking | Mistral +ADD Manual: Claude/DeepSeek -> DeepSeek + # DEV curl -sSL https://get.livekit.io | bash diff --git a/docs/GLOSSARY.md b/docs/GLOSSARY.md new file mode 100644 index 00000000..1d899c10 --- /dev/null +++ b/docs/GLOSSARY.md @@ -0,0 +1,6 @@ +RPM: Requests per minute +RPD: Requests per day +TPM: Tokens per minute +TPD: Tokens per day +ASH: Audio seconds per hour +ASD: Audio seconds per day diff --git a/fix-errors.sh b/fix-errors.sh new file mode 100755 index 00000000..397e0b5d --- /dev/null +++ b/fix-errors.sh @@ -0,0 +1,59 @@ +#!/bin/bash + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_ROOT="$SCRIPT_DIR" +OUTPUT_FILE="$SCRIPT_DIR/prompt.out" +rm $OUTPUT_FILE +echo "Please, fix this consolidated LLM Context" > "$OUTPUT_FILE" + +prompts=( + "./prompts/dev/shared.md" + "./Cargo.toml" + "./prompts/dev/fix.md" +) + +for file in "${prompts[@]}"; do + cat "$file" >> "$OUTPUT_FILE" + echo "" >> "$OUTPUT_FILE" +done + +dirs=( + #"auth" + #"automation" + #"basic" + "bot" + #"channels" + #"config" + #"context" + #"email" + #"file" + #"llm" + #"llm_legacy" + #"org" + "session" + "shared" + #"tests" + #"tools" + #"web_automation" + #"whatsapp" +) +for dir in "${dirs[@]}"; do + find "$PROJECT_ROOT/src/$dir" -name "*.rs" | while read file; do + echo $file >> "$OUTPUT_FILE" + cat "$file" >> "$OUTPUT_FILE" + echo "" >> "$OUTPUT_FILE" + done +done + +# Also append the specific files you mentioned +echo "$PROJECT_ROOT/src/main.rs" >> "$OUTPUT_FILE" +cat "$PROJECT_ROOT/src/main.rs" >> "$OUTPUT_FILE" + +cat "$PROJECT_ROOT/src/basic/keywords/hear_talk.rs" >> "$OUTPUT_FILE" +echo "$PROJECT_ROOT/src/basic/mod.rs">> "$OUTPUT_FILE" +cat "$PROJECT_ROOT/src/basic/mod.rs" >> "$OUTPUT_FILE" + + +echo "" >> "$OUTPUT_FILE" + +cargo build --message-format=short 2>&1 | grep -E 'error' >> "$OUTPUT_FILE" diff --git a/src/basic/keywords/hear_talk.rs b/src/basic/keywords/hear_talk.rs index 5be24338..19d8920d 100644 --- a/src/basic/keywords/hear_talk.rs +++ b/src/basic/keywords/hear_talk.rs @@ -1,11 +1,13 @@ +use crate::shared::models::{BotResponse, UserSession}; use crate::shared::state::AppState; -use crate::{channels::ChannelAdapter, shared::models::UserSession}; -use log::info; +use log::{debug, error, info}; use rhai::{Dynamic, Engine, EvalAltResult}; +use std::sync::Arc; +use uuid::Uuid; -pub fn hear_keyword(state: &AppState, user: UserSession, engine: &mut Engine) { +pub fn hear_keyword(state: Arc, user: UserSession, engine: &mut Engine) { let session_id = user.id; - let cache = state.redis_client.clone(); + let state_clone = Arc::clone(&state); engine .register_custom_syntax(&["HEAR", "$ident$"], true, move |_context, inputs| { @@ -19,22 +21,24 @@ pub fn hear_keyword(state: &AppState, user: UserSession, engine: &mut Engine) { variable_name ); - let cache_clone = cache.clone(); + let state_for_spawn = Arc::clone(&state_clone); let session_id_clone = session_id; let var_name_clone = variable_name.clone(); tokio::spawn(async move { - log::debug!( - "HEAR: Starting async task for session {} and variable '{}'", - session_id_clone, - var_name_clone + debug!( + "HEAR: Setting session {} to wait for input for variable '{}'", + session_id_clone, var_name_clone ); - if let Some(cache_client) = &cache_clone { - let mut conn = match cache_client.get_multiplexed_async_connection().await { + let mut session_manager = state_for_spawn.session_manager.lock().await; + session_manager.mark_waiting(session_id_clone); + + if let Some(redis_client) = &state_for_spawn.redis_client { + let mut conn = match redis_client.get_multiplexed_async_connection().await { Ok(conn) => conn, Err(e) => { - log::error!("Failed to connect to cache: {}", e); + error!("Failed to connect to cache: {}", e); return; } }; @@ -56,10 +60,8 @@ pub fn hear_keyword(state: &AppState, user: UserSession, engine: &mut Engine) { .unwrap(); } -pub fn talk_keyword(state: &AppState, user: UserSession, engine: &mut Engine) { - use crate::shared::models::BotResponse; - - let state_clone = state.clone(); +pub fn talk_keyword(state: Arc, user: UserSession, engine: &mut Engine) { + let state_clone = Arc::clone(&state); let user_clone = user.clone(); engine @@ -68,37 +70,97 @@ pub fn talk_keyword(state: &AppState, user: UserSession, engine: &mut Engine) { info!("TALK command executed: {}", message); - let response = BotResponse { - bot_id: "default_bot".to_string(), - user_id: user_clone.user_id.to_string(), - session_id: user_clone.id.to_string(), - channel: "basic".to_string(), - content: message, - message_type: 1, - stream_token: None, - is_complete: true, - }; + let state_for_spawn = Arc::clone(&state_clone); + let user_clone_spawn = user_clone.clone(); + let message_clone = message.clone(); - let state_for_spawn = state_clone.clone(); tokio::spawn(async move { - if let Err(e) = state_for_spawn.web_adapter.send_message(response).await { - log::error!("Failed to send TALK message: {}", e); + debug!("TALK: Sending message via WebSocket: {}", message_clone); + + let bot_id = + std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string()); + + let response = BotResponse { + bot_id: bot_id, + user_id: user_clone_spawn.user_id.to_string(), + session_id: user_clone_spawn.id.to_string(), + channel: "web".to_string(), + content: message_clone, + message_type: 1, + stream_token: None, + is_complete: true, + }; + + let response_channels = state_for_spawn.response_channels.lock().await; + if let Some(tx) = response_channels.get(&user_clone_spawn.id.to_string()) { + if let Err(e) = tx.send(response).await { + error!("Failed to send TALK message via WebSocket: {}", e); + } else { + debug!("TALK message sent successfully via WebSocket"); + } + } else { + debug!( + "No WebSocket connection found for session {}, sending via web adapter", + user_clone_spawn.id + ); + + if let Err(e) = state_for_spawn + .web_adapter + .send_message_to_session(&user_clone_spawn.id.to_string(), response) + .await + { + error!("Failed to send TALK message via web adapter: {}", e); + } else { + debug!("TALK message sent successfully via web adapter"); + } } }); Ok(Dynamic::UNIT) }) .unwrap(); -} - -pub fn set_context_keyword(state: &AppState, user: UserSession, engine: &mut Engine) { - let cache = state.redis_client.clone(); engine - .register_custom_syntax( - &["SET", "CONTEXT", "$expr$"], - true, - move |context, inputs| { + .register_custom_syntax(&["SET_USER", "$expr$"], true, move |context, inputs| { + let user_id_str = context.eval_expression_tree(&inputs[0])?.to_string(); + + info!("SET USER command executed with ID: {}", user_id_str); + + match Uuid::parse_str(&user_id_str) { + Ok(user_id) => { + debug!("Successfully parsed user UUID: {}", user_id); + + let state_for_spawn = Arc::clone(&state_clone); + let user_clone_spawn = user_clone.clone(); + + tokio::spawn(async move { + let mut session_manager = state_for_spawn.session_manager.lock().await; + + if let Err(e) = session_manager.update_user_id(user_clone_spawn.id, user_id) + { + debug!("Failed to update user ID in session: {}", e); + } else { + info!( + "Updated session {} to user ID: {}", + user_clone_spawn.id, user_id + ); + } + }); + } + Err(e) => { + debug!("Invalid UUID format for SET USER: {}", e); + } + } + + Ok(Dynamic::UNIT) + }) + .unwrap(); + + pub fn set_context_keyword(state: &AppState, user: UserSession, engine: &mut Engine) { + let cache = state.redis_client.clone(); + + engine + .register_custom_syntax(&["SET_CONTEXT", "$expr$"], true, move |context, inputs| { let context_value = context.eval_expression_tree(&inputs[0])?.to_string(); info!("SET CONTEXT command executed: {}", context_value); @@ -112,7 +174,7 @@ pub fn set_context_keyword(state: &AppState, user: UserSession, engine: &mut Eng let mut conn = match cache_client.get_multiplexed_async_connection().await { Ok(conn) => conn, Err(e) => { - log::error!("Failed to connect to cache: {}", e); + error!("Failed to connect to cache: {}", e); return; } }; @@ -126,7 +188,7 @@ pub fn set_context_keyword(state: &AppState, user: UserSession, engine: &mut Eng }); Ok(Dynamic::UNIT) - }, - ) - .unwrap(); + }) + .unwrap(); + } } diff --git a/src/basic/keywords/mod.rs b/src/basic/keywords/mod.rs index b84d0e80..220dd655 100644 --- a/src/basic/keywords/mod.rs +++ b/src/basic/keywords/mod.rs @@ -14,7 +14,7 @@ pub mod set_schedule; pub mod wait; #[cfg(feature = "email")] -pub mod create_draft; +pub mod create_draft_keyword; #[cfg(feature = "web_automation")] pub mod get_website; diff --git a/src/basic/mod.rs b/src/basic/mod.rs index 643e1c27..1ac85108 100644 --- a/src/basic/mod.rs +++ b/src/basic/mod.rs @@ -28,7 +28,7 @@ use self::keywords::create_draft_keyword; use self::keywords::get_website::get_website_keyword; pub struct ScriptService { - engine: Engine, + pub engine: Engine, state: Arc, user: UserSession, } @@ -56,8 +56,8 @@ impl ScriptService { print_keyword(&state, user.clone(), &mut engine); on_keyword(&state, user.clone(), &mut engine); set_schedule_keyword(&state, user.clone(), &mut engine); - hear_keyword(&state, user.clone(), &mut engine); - talk_keyword(&state, user.clone(), &mut engine); + hear_keyword(state.clone(), user.clone(), &mut engine); + talk_keyword(state.clone(), user.clone(), &mut engine); set_context_keyword(&state, user.clone(), &mut engine); #[cfg(feature = "web_automation")] @@ -141,6 +141,7 @@ impl ScriptService { "HEAR", "TALK", "SET CONTEXT", + "SET USER", ]; let is_basic_command = basic_commands.iter().any(|&cmd| trimmed.starts_with(cmd)); diff --git a/src/bot/mod.rs b/src/bot/mod.rs index a2e133fd..0ed7728d 100644 --- a/src/bot/mod.rs +++ b/src/bot/mod.rs @@ -125,6 +125,37 @@ impl BotOrchestrator { Ok(()) } + pub async fn send_direct_message( + &self, + session_id: &str, + channel: &str, + content: &str, + ) -> Result<(), Box> { + debug!( + "Sending direct message to session {}: '{}'", + session_id, content + ); + + let bot_response = BotResponse { + bot_id: "default_bot".to_string(), + user_id: "default_user".to_string(), + session_id: session_id.to_string(), + channel: channel.to_string(), + content: content.to_string(), + message_type: 1, + stream_token: None, + is_complete: true, + }; + + if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) { + adapter.send_message(bot_response).await?; + debug!("Direct message sent successfully"); + } else { + warn!("No channel adapter found for channel: {}", channel); + } + Ok(()) + } + pub async fn process_message( &self, message: UserMessage, @@ -143,28 +174,32 @@ impl BotOrchestrator { warn!("Invalid user ID provided, generated new UUID: {}", new_id); new_id }); - let bot_id = Uuid::parse_str(&message.bot_id) - .unwrap_or_else(|_| Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap()); + + let bot_id = if let Ok(bot_guid) = std::env::var("BOT_GUID") { + Uuid::parse_str(&bot_guid).unwrap_or_else(|_| { + warn!("Invalid BOT_GUID from env, using nil UUID"); + Uuid::nil() + }) + } else { + warn!("BOT_GUID not set in environment, using nil UUID"); + Uuid::nil() + }; debug!("Parsed user_id: {}, bot_id: {}", user_id, bot_id); let session = { let mut session_manager = self.state.session_manager.lock().await; - match session_manager.get_user_session(user_id, bot_id)? { + match session_manager.get_or_create_user_session(user_id, bot_id, "New Conversation")? { Some(session) => { debug!("Found existing session: {}", session.id); session } None => { - info!( - "Creating new session for user {} with bot {}", + error!( + "Failed to create session for user {} with bot {}", user_id, bot_id ); - let new_session = - session_manager.create_session(user_id, bot_id, "New Conversation")?; - debug!("New session created: {}", new_session.id); - Self::run_start_script(&new_session, Arc::clone(&self.state)).await; - new_session + return Err("Failed to create session".into()); } } }; @@ -296,43 +331,34 @@ impl BotOrchestrator { ); debug!("Message content: '{}'", message.content); - let mut user_id = Uuid::parse_str(&message.user_id).unwrap_or_else(|_| { + let user_id = Uuid::parse_str(&message.user_id).unwrap_or_else(|_| { let new_id = Uuid::new_v4(); warn!("Invalid user ID, generated new: {}", new_id); new_id }); - let bot_id = Uuid::parse_str(&message.bot_id).unwrap_or_else(|_| { - warn!("Invalid bot ID, using nil UUID"); + + let bot_id = if let Ok(bot_guid) = std::env::var("BOT_GUID") { + Uuid::parse_str(&bot_guid).unwrap_or_else(|_| { + warn!("Invalid BOT_GUID from env, using nil UUID"); + Uuid::nil() + }) + } else { + warn!("BOT_GUID not set in environment, using nil UUID"); Uuid::nil() - }); + }; debug!("User ID: {}, Bot ID: {}", user_id, bot_id); - let mut auth = self.state.auth_service.lock().await; - let user_exists = auth.get_user_by_id(user_id)?; - - if user_exists.is_none() { - debug!("User {} not found, creating anonymous user", user_id); - user_id = auth.create_user("anonymous1", "anonymous@local", "password")?; - info!("Created new anonymous user: {}", user_id); - } else { - user_id = user_exists.unwrap().id; - debug!("Found existing user: {}", user_id); - } - let session = { let mut sm = self.state.session_manager.lock().await; - match sm.get_user_session(user_id, bot_id)? { + match sm.get_or_create_user_session(user_id, bot_id, "New Conversation")? { Some(sess) => { debug!("Using existing session: {}", sess.id); sess } None => { - info!("Creating new session for streaming"); - let new_session = sm.create_session(user_id, bot_id, "New Conversation")?; - debug!("New session created: {}", new_session.id); - Self::run_start_script(&new_session, Arc::clone(&self.state)).await; - new_session + error!("Failed to create session for streaming"); + return Err("Failed to create session".into()); } } }; @@ -557,23 +583,27 @@ impl BotOrchestrator { warn!("Invalid user ID, generated new: {}", new_id); new_id }); - let bot_id = Uuid::parse_str(&message.bot_id) - .unwrap_or_else(|_| Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap()); + + let bot_id = if let Ok(bot_guid) = std::env::var("BOT_GUID") { + Uuid::parse_str(&bot_guid).unwrap_or_else(|_| { + warn!("Invalid BOT_GUID from env, using nil UUID"); + Uuid::nil() + }) + } else { + warn!("BOT_GUID not set in environment, using nil UUID"); + Uuid::nil() + }; let session = { let mut session_manager = self.state.session_manager.lock().await; - match session_manager.get_user_session(user_id, bot_id)? { + match session_manager.get_or_create_user_session(user_id, bot_id, "New Conversation")? { Some(session) => { debug!("Found existing session: {}", session.id); session } None => { - info!("Creating new session for tools processing"); - let new_session = - session_manager.create_session(user_id, bot_id, "New Conversation")?; - debug!("New session created: {}", new_session.id); - Self::run_start_script(&new_session, Arc::clone(&self.state)).await; - new_session + error!("Failed to create session for tools processing"); + return Err("Failed to create session".into()); } } }; @@ -705,10 +735,17 @@ impl BotOrchestrator { Ok(()) } - async fn run_start_script(session: &UserSession, state: Arc) { - info!("Running start script for session: {}", session.id); - - let start_script_path = "start.bas"; + pub async fn run_start_script( + session: &UserSession, + state: Arc, + token_id: Option, + ) -> Result> { + info!( + "Running start script for session: {} with token_id: {:?}", + session.id, token_id + ); + + let start_script_path = "./templates/annoucements.gbai/annoucements.gbdialog/start.bas"; let start_script = match std::fs::read_to_string(start_script_path) { Ok(content) => { debug!("Loaded start script from {}", start_script_path); @@ -720,31 +757,39 @@ impl BotOrchestrator { } }; - debug!("Start script content for session {}: {}", session.id, start_script); + debug!( + "Start script content for session {}: {}", + session.id, start_script + ); let session_clone = session.clone(); let state_clone = state.clone(); - tokio::spawn(async move { - let state_for_run = state_clone.clone(); - match crate::basic::ScriptService::new(state_clone, session_clone.clone()) - .compile(&start_script) - .and_then(|ast| { - crate::basic::ScriptService::new(state_for_run, session_clone.clone()).run(&ast) - }) { - Ok(_) => { - info!( - "Start script executed successfully for session {}", - session_clone.id - ); - } - Err(e) => { - error!( - "Failed to run start script for session {}: {}", - session_clone.id, e - ); - } + + let script_service = crate::basic::ScriptService::new(state_clone, session_clone.clone()); + + if let Some(token_id_value) = token_id { + debug!("Token ID available for script: {}", token_id_value); + } + + match script_service + .compile(&start_script) + .and_then(|ast| script_service.run(&ast)) + { + Ok(result) => { + info!( + "Start script executed successfully for session {}, result: {}", + session_clone.id, result + ); + Ok(true) } - }); + Err(e) => { + error!( + "Failed to run start script for session {}: {}", + session_clone.id, e + ); + Ok(false) + } + } } pub async fn send_warning( @@ -795,6 +840,73 @@ impl BotOrchestrator { } } } + + pub async fn trigger_auto_welcome( + &self, + session_id: &str, + user_id: &str, + _bot_id: &str, + token_id: Option, + ) -> Result> { + info!( + "Triggering auto welcome for session: {} with token_id: {:?}", + session_id, token_id + ); + + let user_uuid = Uuid::parse_str(user_id).unwrap_or_else(|_| { + let new_id = Uuid::new_v4(); + warn!("Invalid user ID, generated new: {}", new_id); + new_id + }); + + let bot_uuid = if let Ok(bot_guid) = std::env::var("BOT_GUID") { + Uuid::parse_str(&bot_guid).unwrap_or_else(|_| { + warn!("Invalid BOT_GUID from env, using nil UUID"); + Uuid::nil() + }) + } else { + warn!("BOT_GUID not set in environment, using nil UUID"); + Uuid::nil() + }; + + let session = { + let mut session_manager = self.state.session_manager.lock().await; + match session_manager.get_or_create_user_session( + user_uuid, + bot_uuid, + "New Conversation", + )? { + Some(session) => { + debug!("Found existing session: {}", session.id); + session + } + None => { + error!("Failed to create session for auto welcome"); + return Ok(false); + } + } + }; + + let result = Self::run_start_script(&session, Arc::clone(&self.state), token_id).await?; + + info!( + "Auto welcome completed for session: {} with result: {}", + session_id, result + ); + Ok(result) + } + + async fn get_web_response_channel( + &self, + session_id: &str, + ) -> Result, Box> { + let response_channels = self.state.response_channels.lock().await; + if let Some(tx) = response_channels.get(session_id) { + Ok(tx.clone()) + } else { + Err("No response channel found for session".into()) + } + } } impl Default for BotOrchestrator { @@ -831,10 +943,12 @@ async fn websocket_handler( .add_connection(session_id.clone(), tx.clone()) .await; + let bot_id = std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string()); + orchestrator .send_event( "default_user", - "default_bot", + &bot_id, &session_id, "web", "session_start", @@ -845,6 +959,19 @@ async fn websocket_handler( ) .await .ok(); + + let orchestrator_clone = BotOrchestrator::new(Arc::clone(&data)); + let session_id_clone = session_id.clone(); + tokio::spawn(async move { + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + if let Err(e) = orchestrator_clone + .trigger_auto_welcome(&session_id_clone, "default_user", &bot_id, None) + .await + { + error!("Failed to trigger auto welcome: {}", e); + } + }); + let web_adapter = data.web_adapter.clone(); let session_id_clone1 = session_id.clone(); let session_id_clone2 = session_id.clone(); @@ -883,8 +1010,11 @@ async fn websocket_handler( message_count += 1; debug!("Received WebSocket message {}: {}", message_count, text); + let bot_id = + std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string()); + let user_message = UserMessage { - bot_id: "default_bot".to_string(), + bot_id: bot_id, user_id: "default_user".to_string(), session_id: session_id_clone2.clone(), channel: "web".to_string(), @@ -903,10 +1033,14 @@ async fn websocket_handler( } WsMessage::Close(_) => { info!("WebSocket close received for session {}", session_id_clone2); + + let bot_id = + std::env::var("BOT_GUID").unwrap_or_else(|_| "default_bot".to_string()); + orchestrator .send_event( "default_user", - "default_bot", + &bot_id, &session_id_clone2, "web", "session_end", @@ -1067,17 +1201,112 @@ async fn voice_stop( } } +#[actix_web::post("/api/start")] +async fn start_session( + data: web::Data, + info: web::Json, +) -> Result { + let session_id = info + .get("session_id") + .and_then(|s| s.as_str()) + .unwrap_or(""); + let token_id = info + .get("token_id") + .and_then(|t| t.as_str()) + .map(|s| s.to_string()); + + info!( + "Starting session: {} with token_id: {:?}", + session_id, token_id + ); + let session_uuid = match Uuid::parse_str(session_id) { + Ok(uuid) => uuid, + Err(_) => { + warn!("Invalid session ID format: {}", session_id); + return Ok( + HttpResponse::BadRequest().json(serde_json::json!({"error": "Invalid session ID"})) + ); + } + }; + + let session = { + let mut session_manager = data.session_manager.lock().await; + match session_manager.get_session_by_id(session_uuid) { + Ok(Some(s)) => { + debug!("Found existing session: {}", session_uuid); + s + } + Ok(None) => { + warn!("Session not found: {}", session_uuid); + return Ok(HttpResponse::NotFound() + .json(serde_json::json!({"error": "Session not found"}))); + } + Err(e) => { + error!("Error retrieving session {}: {}", session_uuid, e); + return Ok(HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Failed to retrieve session"}))); + } + } + }; + + let result = BotOrchestrator::run_start_script(&session, Arc::clone(&data), token_id).await; + + match result { + Ok(true) => { + info!( + "Start script completed successfully for session: {}", + session_id + ); + Ok(HttpResponse::Ok().json(serde_json::json!({ + "status": "started", + "session_id": session.id, + "result": "success" + }))) + } + Ok(false) => { + warn!("Start script returned false for session: {}", session_id); + Ok(HttpResponse::Ok().json(serde_json::json!({ + "status": "started", + "session_id": session.id, + "result": "failed" + }))) + } + Err(e) => { + error!( + "Error running start script for session {}: {}", + session_id, e + ); + Ok(HttpResponse::InternalServerError() + .json(serde_json::json!({"error": e.to_string()}))) + } + } +} + #[actix_web::post("/api/sessions")] async fn create_session(data: web::Data) -> Result { info!("Creating new session"); let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(); - let bot_id = Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap(); + + let bot_id = if let Ok(bot_guid) = std::env::var("BOT_GUID") { + Uuid::parse_str(&bot_guid).unwrap_or_else(|_| { + warn!("Invalid BOT_GUID from env, using nil UUID"); + Uuid::nil() + }) + } else { + warn!("BOT_GUID not set in environment, using nil UUID"); + Uuid::nil() + }; let session = { let mut session_manager = data.session_manager.lock().await; - match session_manager.create_session(user_id, bot_id, "New Conversation") { - Ok(s) => s, + match session_manager.get_or_create_user_session(user_id, bot_id, "New Conversation") { + Ok(Some(s)) => s, + Ok(None) => { + error!("Failed to create session"); + return Ok(HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Failed to create session"}))); + } Err(e) => { error!("Failed to create session: {}", e); return Ok(HttpResponse::InternalServerError() diff --git a/src/channels/mod.rs b/src/channels/mod.rs index 12fd19d7..0690af1a 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use log::info; +use log::{debug, info}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::{mpsc, Mutex}; @@ -32,6 +32,28 @@ impl WebChannelAdapter { pub async fn remove_connection(&self, session_id: &str) { self.connections.lock().await.remove(session_id); } + pub async fn send_message_to_session( + &self, + session_id: &str, + message: BotResponse, + ) -> Result<(), Box> { + let connections = self.connections.lock().await; + if let Some(tx) = connections.get(session_id) { + if let Err(e) = tx.send(message).await { + log::error!( + "Failed to send message to WebSocket session {}: {}", + session_id, + e + ); + return Err(Box::new(e)); + } + debug!("Message sent to WebSocket session: {}", session_id); + Ok(()) + } else { + debug!("No WebSocket connection found for session: {}", session_id); + Err("No WebSocket connection found".into()) + } + } } #[async_trait] diff --git a/src/main.rs b/src/main.rs index b05ed79a..2a3fac37 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,8 +27,9 @@ mod tools; mod whatsapp; use crate::bot::{ - create_session, get_session_history, get_sessions, index, set_mode_handler, static_files, - voice_start, voice_stop, websocket_handler, whatsapp_webhook, whatsapp_webhook_verify, + create_session, get_session_history, get_sessions, index, set_mode_handler, start_session, + static_files, voice_start, voice_stop, websocket_handler, whatsapp_webhook, + whatsapp_webhook_verify, }; use crate::channels::{VoiceAdapter, WebChannelAdapter}; use crate::config::AppConfig; @@ -188,6 +189,7 @@ async fn main() -> std::io::Result<()> { .service(voice_stop) .service(create_session) .service(get_sessions) + .service(start_session) .service(get_session_history) .service(set_mode_handler) .service(chat_completions_local) diff --git a/src/session/mod.rs b/src/session/mod.rs index c1c7b9df..fbfde18a 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -1,7 +1,7 @@ use chrono::Utc; use diesel::prelude::*; use diesel::PgConnection; -use log::info; +use log::{debug, error, info, warn}; use redis::Client; use serde::{Deserialize, Serialize}; @@ -41,13 +41,16 @@ impl SessionManager { &mut self, session_id: Uuid, input: String, - ) -> Result<(), Box> { + ) -> Result, Box> { info!( "SessionManager.provide_input called for session {}", session_id ); + if let Some(sess) = self.sessions.get_mut(&session_id) { sess.data = input; + self.waiting_for_input.remove(&session_id); + Ok(Some("user_input".to_string())) } else { let sess = SessionData { id: session_id, @@ -55,9 +58,9 @@ impl SessionManager { data: input, }; self.sessions.insert(session_id, sess); + self.waiting_for_input.remove(&session_id); + Ok(Some("user_input".to_string())) } - self.waiting_for_input.remove(&session_id); - Ok(()) } pub fn is_waiting_for_input(&self, session_id: &Uuid) -> bool { @@ -69,6 +72,20 @@ impl SessionManager { info!("Session {} marked as waiting for input", session_id); } + pub fn get_session_by_id( + &mut self, + session_id: Uuid, + ) -> Result, Box> { + use crate::shared::models::user_sessions::dsl::*; + + let result = user_sessions + .filter(id.eq(session_id)) + .first::(&mut self.conn) + .optional()?; + + Ok(result) + } + pub fn get_user_session( &mut self, uid: Uuid, @@ -86,6 +103,21 @@ impl SessionManager { Ok(result) } + pub fn get_or_create_user_session( + &mut self, + uid: Uuid, + bid: Uuid, + session_title: &str, + ) -> Result, Box> { + if let Some(existing) = self.get_user_session(uid, bid)? { + debug!("Found existing session: {}", existing.id); + return Ok(Some(existing)); + } + + info!("Creating new session for user {} with bot {}", uid, bid); + self.create_session(uid, bid, session_title).map(Some) + } + pub fn create_session( &mut self, uid: Uuid, @@ -93,21 +125,35 @@ impl SessionManager { session_title: &str, ) -> Result> { use crate::shared::models::user_sessions::dsl::*; - - // Return an existing session if one already matches the user, bot, and title. - if let Some(existing) = user_sessions - .filter(user_id.eq(uid)) - .filter(bot_id.eq(bid)) - .filter(title.eq(session_title)) - .first::(&mut self.conn) - .optional()? - { - return Ok(existing); - } + use crate::shared::models::users::dsl as users_dsl; let now = Utc::now(); - // Insert the new session and retrieve the full record in one step. + let user_exists: Option = users_dsl::users + .filter(users_dsl::id.eq(uid)) + .select(users_dsl::id) + .first(&mut self.conn) + .optional()?; + + if user_exists.is_none() { + warn!( + "User {} does not exist in database, creating placeholder user", + uid + ); + diesel::insert_into(users_dsl::users) + .values(( + users_dsl::id.eq(uid), + users_dsl::username.eq(format!("anonymous_{}", rand::random::())), + users_dsl::email.eq(format!("anonymous_{}@local", rand::random::())), + users_dsl::password_hash.eq("placeholder"), + users_dsl::is_active.eq(true), + users_dsl::created_at.eq(now), + users_dsl::updated_at.eq(now), + )) + .execute(&mut self.conn)?; + info!("Created placeholder user: {}", uid); + } + let inserted: UserSession = diesel::insert_into(user_sessions) .values(( id.eq(Uuid::new_v4()), @@ -121,8 +167,13 @@ impl SessionManager { updated_at.eq(now), )) .returning(UserSession::as_returning()) - .get_result(&mut self.conn)?; + .get_result(&mut self.conn) + .map_err(|e| { + error!("Failed to create session in database: {}", e); + e + })?; + info!("New session created: {}", inserted.id); Ok(inserted) } @@ -139,7 +190,8 @@ impl SessionManager { let next_index = message_history .filter(session_id.eq(sess_id)) .count() - .get_result::(&mut self.conn)?; + .get_result::(&mut self.conn) + .unwrap_or(0); diesel::insert_into(message_history) .values(( @@ -154,23 +206,39 @@ impl SessionManager { )) .execute(&mut self.conn)?; + debug!( + "Message saved for session {} with index {}", + sess_id, next_index + ); Ok(()) } pub fn get_conversation_history( &mut self, - _sess_id: Uuid, + sess_id: Uuid, _uid: Uuid, ) -> Result, Box> { - // use crate::shared::models::message_history::dsl::*; + use crate::shared::models::message_history::dsl::*; - // let messages = message_history - // .filter(session_id.eq(sess_id)) - // .order(message_index.asc()) - // .select((role, content_encrypted)) - // .load::<(String, String)>(&mut self.conn)?; + let messages = message_history + .filter(session_id.eq(sess_id)) + .order(message_index.asc()) + .select((role, content_encrypted)) + .load::<(i32, String)>(&mut self.conn)?; - Ok(vec![]) + let history = messages + .into_iter() + .map(|(other_role, content)| { + let role_str = match other_role { + 0 => "user".to_string(), + 1 => "assistant".to_string(), + _ => "unknown".to_string(), + }; + (role_str, content) + }) + .collect(); + + Ok(history) } pub fn get_user_sessions( @@ -195,10 +263,16 @@ impl SessionManager { ) -> Result<(), Box> { use crate::shared::models::user_sessions::dsl::*; - let user_uuid = Uuid::parse_str(uid)?; - let bot_uuid = Uuid::parse_str(bid)?; + let user_uuid = Uuid::parse_str(uid).map_err(|e| { + warn!("Invalid user ID format: {}", uid); + e + })?; + let bot_uuid = Uuid::parse_str(bid).map_err(|e| { + warn!("Invalid bot ID format: {}", bid); + e + })?; - diesel::update( + let updated_count = diesel::update( user_sessions .filter(user_id.eq(user_uuid)) .filter(bot_id.eq(bot_uuid)), @@ -206,6 +280,35 @@ impl SessionManager { .set((answer_mode.eq(mode), updated_at.eq(chrono::Utc::now()))) .execute(&mut self.conn)?; + if updated_count == 0 { + warn!("No session found for user {} and bot {}", uid, bid); + } else { + debug!( + "Answer mode updated to {} for user {} and bot {}", + mode, uid, bid + ); + } + + Ok(()) + } + + pub fn update_user_id( + &mut self, + session_id: Uuid, + new_user_id: Uuid, + ) -> Result<(), Box> { + use crate::shared::models::user_sessions::dsl::*; + + let updated_count = diesel::update(user_sessions.filter(id.eq(session_id))) + .set((user_id.eq(new_user_id), updated_at.eq(chrono::Utc::now()))) + .execute(&mut self.conn)?; + + if updated_count == 0 { + warn!("No session found with ID: {}", session_id); + } else { + info!("Updated session {} to user ID: {}", session_id, new_user_id); + } + Ok(()) } } diff --git a/templates/annoucements.gbai/annoucements.gbdialog/start-ctx.bas b/templates/annoucements.gbai/annoucements.gbdialog/start-ctx.bas new file mode 100644 index 00000000..58f97a65 --- /dev/null +++ b/templates/annoucements.gbai/annoucements.gbdialog/start-ctx.bas @@ -0,0 +1,8 @@ +TALK "Welcome to General Bots! What is your name?" +HEAR name +TALK "Hello, " + name + +text = GET "default.pdf" +SET_CONTEXT text + +resume = LLM "Build a resume from " + text diff --git a/templates/annoucements.gbai/annoucements.gbdialog/start.bas b/templates/annoucements.gbai/annoucements.gbdialog/start.bas index b688960a..08e611d8 100644 --- a/templates/annoucements.gbai/annoucements.gbdialog/start.bas +++ b/templates/annoucements.gbai/annoucements.gbdialog/start.bas @@ -1,8 +1,5 @@ TALK "Welcome to General Bots!" +TALK "What is your name?" HEAR name -TALK "Hello, " + name - -text = GET "default.pdf" -SET CONTEXT text - -resume = LLM "Build a resume from " + text +TALK "Hello " + name + ", nice to meet you!" +SET_USER "92fcffaa-bf0a-41a9-8d99-5541709d695b" diff --git a/web/index.html b/web/index.html index 67a10420..6588a8fe 100644 --- a/web/index.html +++ b/web/index.html @@ -8,6 +8,7 @@ + @@ -457,6 +575,16 @@ const sendBtn = document.getElementById("sendBtn"); const newChatBtn = document.getElementById("newChatBtn"); + // Configure marked for markdown parsing + marked.setOptions({ + highlight: function (code, lang) { + // Simple syntax highlighting - you could integrate highlight.js here + return `
${code}
`; + }, + breaks: true, + gfm: true, + }); + // Initialize createNewSession(); @@ -487,6 +615,13 @@ if (isVoiceMode) { await startVoiceSession(); } + await fetch("/api/start", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + session_id: currentSessionId, + }), + }); } function switchSession(sessionId) { @@ -645,10 +780,16 @@ const msg = document.createElement("div"); msg.className = "mb-8"; + // Parse markdown for assistant messages + const processedContent = + role === "assistant" || role === "voice" + ? marked.parse(content) + : content; + if (role === "user") { msg.innerHTML = `
${content}
`; } else if (role === "assistant") { - msg.innerHTML = `
D
${streaming ? "" : content}
`; + msg.innerHTML = `
D
${streaming ? "" : processedContent}
`; } else { // Voice message msg.innerHTML = `
D
${content}
`; @@ -667,7 +808,10 @@ function updateLastMessage(content) { const m = document.getElementById(streamingMessageId); if (m) { - m.textContent += content; + // Parse markdown incrementally during streaming + const currentContent = m.textContent || m.innerText; + const newContent = currentContent + content; + m.innerHTML = marked.parse(newContent); messagesDiv.scrollTop = messagesDiv.scrollHeight; } } @@ -896,6 +1040,34 @@ }), }); }; + + // Test markdown functionality + window.testMarkdown = function () { + const markdownContent = `# Título Principal + +## Subtítulo + +Este é um **texto em negrito** e este é um *texto em itálico*. + +### Lista de Itens: +- Primeiro item +- Segundo item +- Terceiro item + +### Código: +\`\`\`javascript +function exemplo() { + console.log("Olá, mundo!"); + return 42; +} +\`\`\` + +> Esta é uma citação importante sobre o assunto. + +[Link para documentação](https://exemplo.com)`; + + addMessage("assistant", markdownContent); + };