From a7c74b837e936c95d3987f3871053adf8b37e1d4 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Mon, 13 Oct 2025 00:31:08 -0300 Subject: [PATCH] Enhance streaming with events and warning API - Introduce event-driven streaming with thinking_start, thinking_end, and warn events; skip sending analysis content to clients - Add /api/warn endpoint to dispatch warnings for sessions and channels; web UI displays alerts - Emit session_start/session_end events over WebSocket and instrument logging throughout orchestration - Update web client: show thinking indicator and warning banners; switch LiveKit client URL to CDN - Extend BotOrchestrator with send_event and send_warning, expand session/tool workflow - Improve REST endpoints for sessions/history with better logging and error handling - Update docs and prompts: DEV.md usage note; adjust dev build_prompt script --- docs/DEV.md | 6 + scripts/dev/build_prompt.sh | 13 +- src/bot/mod.rs | 631 +++++++++++++++++++++++++++++++++--- web/index.html | 161 ++++++++- 4 files changed, 749 insertions(+), 62 deletions(-) diff --git a/docs/DEV.md b/docs/DEV.md index a85748ad7..11655b44e 100644 --- a/docs/DEV.md +++ b/docs/DEV.md @@ -1,3 +1,9 @@ +# DEV + +curl -sSL https://get.livekit.io | bash +livekit-server --dev + + # Util diff --git a/scripts/dev/build_prompt.sh b/scripts/dev/build_prompt.sh index a0a88e75d..5a0328e73 100755 --- a/scripts/dev/build_prompt.sh +++ b/scripts/dev/build_prompt.sh @@ -9,8 +9,8 @@ echo "Consolidated LLM Context" > "$OUTPUT_FILE" prompts=( "../../prompts/dev/shared.md" "../../Cargo.toml" - "../../prompts/dev/fix.md" - #"../../prompts/dev/generation.md" + #../../prompts/dev/fix.md" + "../../prompts/dev/generation.md" ) for file in "${prompts[@]}"; do @@ -23,23 +23,24 @@ dirs=( #"automation" #"basic" "bot" - #"channels" + "channels" #"config" #"context" #"email" #"file" - #"llm" + "llm" #"llm_legacy" #"org" - #"session" + "session" "shared" #"tests" #"tools" #"web_automation" - #"whatsapp" + "whatsapp" ) for dir in "${dirs[@]}"; do find "$PROJECT_ROOT/src/$dir" -name "*.rs" | while read file; do + echo $file >> "$OUTPUT_FILE" cat "$file" >> "$OUTPUT_FILE" echo "" >> "$OUTPUT_FILE" done diff --git a/src/bot/mod.rs b/src/bot/mod.rs index 1e7fbd49a..ddde1eaae 100644 --- a/src/bot/mod.rs +++ b/src/bot/mod.rs @@ -1,7 +1,7 @@ use actix_web::{web, HttpRequest, HttpResponse, Result}; use actix_ws::Message as WsMessage; use chrono::Utc; -use log::info; +use log::{debug, error, info, trace, warn}; use serde_json; use std::collections::HashMap; use std::fs; @@ -19,6 +19,7 @@ pub struct BotOrchestrator { impl BotOrchestrator { pub fn new(state: Arc) -> Self { + info!("Creating new BotOrchestrator instance"); Self { state } } @@ -27,22 +28,32 @@ impl BotOrchestrator { session_id: Uuid, user_input: &str, ) -> Result, Box> { + debug!( + "Handling user input for session {}: '{}'", + session_id, user_input + ); let mut session_manager = self.state.session_manager.lock().await; session_manager.provide_input(session_id, user_input.to_string())?; + debug!("User input handled for session {}", session_id); Ok(None) } pub async fn is_waiting_for_input(&self, session_id: Uuid) -> bool { + trace!("Checking if session {} is waiting for input", session_id); let session_manager = self.state.session_manager.lock().await; - session_manager.is_waiting_for_input(&session_id) + let result = session_manager.is_waiting_for_input(&session_id); + trace!("Session {} waiting for input: {}", session_id, result); + result } pub fn add_channel(&self, channel_type: &str, adapter: Arc) { + info!("Adding channel adapter for type: {}", channel_type); self.state .channels .lock() .unwrap() .insert(channel_type.to_string(), adapter); + debug!("Channel adapter for {} added successfully", channel_type); } pub async fn register_response_channel( @@ -50,11 +61,13 @@ impl BotOrchestrator { session_id: String, sender: mpsc::Sender, ) { + debug!("Registering response channel for session: {}", session_id); self.state .response_channels .lock() .await - .insert(session_id, sender); + .insert(session_id.clone(), sender); + trace!("Response channel registered for session: {}", session_id); } pub async fn set_user_answer_mode( @@ -63,8 +76,52 @@ impl BotOrchestrator { bot_id: &str, mode: i32, ) -> Result<(), Box> { + info!( + "Setting answer mode for user {} with bot {} to mode {}", + user_id, bot_id, mode + ); let mut session_manager = self.state.session_manager.lock().await; session_manager.update_answer_mode(user_id, bot_id, mode)?; + debug!("Answer mode updated successfully"); + Ok(()) + } + + pub async fn send_event( + &self, + user_id: &str, + bot_id: &str, + session_id: &str, + channel: &str, + event_type: &str, + data: serde_json::Value, + ) -> Result<(), Box> { + debug!( + "Sending event '{}' to session {} on channel {}", + event_type, session_id, channel + ); + + let event_response = BotResponse { + bot_id: bot_id.to_string(), + user_id: user_id.to_string(), + session_id: session_id.to_string(), + channel: channel.to_string(), + content: serde_json::to_string(&serde_json::json!({ + "event": event_type, + "data": data + }))?, + message_type: 2, // Event message type + stream_token: None, + is_complete: true, + }; + + trace!("Event response created: {:?}", event_response); + + if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) { + adapter.send_message(event_response).await?; + debug!("Event sent successfully via channel adapter"); + } else { + warn!("No channel adapter found for channel: {}", channel); + } Ok(()) } @@ -73,28 +130,52 @@ impl BotOrchestrator { message: UserMessage, ) -> Result<(), Box> { info!( - "Processing message from channel: {}, user: {}", - message.channel, message.user_id + "Processing message from channel: {}, user: {}, session: {}", + message.channel, message.user_id, message.session_id + ); + debug!( + "Message content: '{}', type: {}", + message.content, message.message_type ); - let user_id = Uuid::parse_str(&message.user_id).unwrap_or_else(|_| Uuid::new_v4()); + let user_id = Uuid::parse_str(&message.user_id).unwrap_or_else(|_| { + let new_id = Uuid::new_v4(); + warn!("Invalid user ID provided, generated new UUID: {}", new_id); + new_id + }); let bot_id = Uuid::parse_str(&message.bot_id) .unwrap_or_else(|_| Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap()); + debug!("Parsed user_id: {}, bot_id: {}", user_id, bot_id); + let session = { let mut session_manager = self.state.session_manager.lock().await; match session_manager.get_user_session(user_id, bot_id)? { - Some(session) => session, + Some(session) => { + debug!("Found existing session: {}", session.id); + session + } None => { + info!( + "Creating new session for user {} with bot {}", + user_id, bot_id + ); let new_session = session_manager.create_session(user_id, bot_id, "New Conversation")?; + debug!("New session created: {}", new_session.id); Self::run_start_script(&new_session, Arc::clone(&self.state)).await; new_session } } }; + trace!("Current session state: {:?}", session); + if self.is_waiting_for_input(session.id).await { + debug!( + "Session {} is waiting for input, processing as variable input", + session.id + ); if let Some(variable_name) = self.handle_user_input(session.id, &message.content).await? { @@ -115,17 +196,20 @@ impl BotOrchestrator { is_complete: true, }; adapter.send_message(ack_response).await?; + debug!("Acknowledgment sent for variable storage"); } return Ok(()); } } if session.answer_mode == 1 && session.current_tool.is_some() { + debug!("Session in answer mode with active tool, providing user response"); self.state.tool_manager.provide_user_response( &message.user_id, &message.bot_id, message.content.clone(), )?; + trace!("User response provided to tool manager"); return Ok(()); } @@ -138,13 +222,16 @@ impl BotOrchestrator { &message.content, message.message_type, )?; + debug!("User message saved to session history"); } let response_content = self.direct_mode_handler(&message, &session).await?; + debug!("Generated response content: '{}'", response_content); { let mut session_manager = self.state.session_manager.lock().await; session_manager.save_message(session.id, user_id, 2, &response_content, 1)?; + debug!("Bot response saved to session history"); } let bot_response = BotResponse { @@ -158,8 +245,13 @@ impl BotOrchestrator { is_complete: true, }; + trace!("Final bot response: {:?}", bot_response); + if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) { adapter.send_message(bot_response).await?; + info!("Response sent successfully via channel adapter"); + } else { + warn!("No channel adapter found for channel: {}", message.channel); } Ok(()) @@ -170,17 +262,23 @@ impl BotOrchestrator { message: &UserMessage, session: &UserSession, ) -> Result> { + debug!("Using direct mode handler for session {}", session.id); + let history = { let mut session_manager = self.state.session_manager.lock().await; session_manager.get_conversation_history(session.id, session.user_id)? }; + debug!("Retrieved {} history entries", history.len()); + let mut prompt = String::new(); for (role, content) in history { prompt.push_str(&format!("{}: {}\n", role, content)); } prompt.push_str(&format!("User: {}\nAssistant:", message.content)); + trace!("Constructed prompt for LLM: {}", prompt); + self.state .llm_provider .generate(&prompt, &serde_json::Value::Null) @@ -192,32 +290,56 @@ impl BotOrchestrator { message: UserMessage, response_tx: mpsc::Sender, ) -> Result<(), Box> { - info!("Streaming response for user: {}", message.user_id); + info!( + "Streaming response for user: {}, session: {}", + message.user_id, message.session_id + ); + debug!("Message content: '{}'", message.content); + + let mut user_id = Uuid::parse_str(&message.user_id).unwrap_or_else(|_| { + let new_id = Uuid::new_v4(); + warn!("Invalid user ID, generated new: {}", new_id); + new_id + }); + let bot_id = Uuid::parse_str(&message.bot_id).unwrap_or_else(|_| { + warn!("Invalid bot ID, using nil UUID"); + Uuid::nil() + }); + + debug!("User ID: {}, Bot ID: {}", user_id, bot_id); - let mut user_id = Uuid::parse_str(&message.user_id).unwrap_or_else(|_| Uuid::new_v4()); - let bot_id = Uuid::parse_str(&message.bot_id).unwrap_or_else(|_| Uuid::nil()); let mut auth = self.state.auth_service.lock().await; let user_exists = auth.get_user_by_id(user_id)?; if user_exists.is_none() { + debug!("User {} not found, creating anonymous user", user_id); user_id = auth.create_user("anonymous1", "anonymous@local", "password")?; + info!("Created new anonymous user: {}", user_id); } else { user_id = user_exists.unwrap().id; + debug!("Found existing user: {}", user_id); } let session = { let mut sm = self.state.session_manager.lock().await; match sm.get_user_session(user_id, bot_id)? { - Some(sess) => sess, + Some(sess) => { + debug!("Using existing session: {}", sess.id); + sess + } None => { + info!("Creating new session for streaming"); let new_session = sm.create_session(user_id, bot_id, "New Conversation")?; - Self::run_start_script(&new_session, Arc::clone(&self.state)).await; + debug!("New session created: {}", new_session.id); new_session } } }; + trace!("Session state: {:?}", session); + if session.answer_mode == 1 && session.current_tool.is_some() { + debug!("Session in answer mode, forwarding to tool manager"); self.state.tool_manager.provide_user_response( &message.user_id, &message.bot_id, @@ -235,33 +357,124 @@ impl BotOrchestrator { &message.content, message.message_type, )?; + debug!("User message saved for streaming session"); } let prompt = { let mut sm = self.state.session_manager.lock().await; let history = sm.get_conversation_history(session.id, user_id)?; let mut p = String::new(); - for (role, content) in history { + for (role, content) in &history { p.push_str(&format!("{}: {}\n", role, content)); } p.push_str(&format!("User: {}\nAssistant:", message.content)); + debug!( + "Stream prompt constructed with {} history entries", + history.len() + ); + trace!("Full prompt: {}", p); p }; let (stream_tx, mut stream_rx) = mpsc::channel::(100); let llm = self.state.llm_provider.clone(); + // Send thinking start event for web channels + if message.channel == "web" { + debug!("Sending thinking start event for web channel"); + self.send_event( + &message.user_id, + &message.bot_id, + &message.session_id, + &message.channel, + "thinking_start", + serde_json::json!({}), + ) + .await?; + } else { + // For non-web channels, send a single thinking message + debug!("Sending thinking message for non-web channel"); + let thinking_response = BotResponse { + bot_id: message.bot_id.clone(), + user_id: message.user_id.clone(), + session_id: message.session_id.clone(), + channel: message.channel.clone(), + content: "Thinking...".to_string(), + message_type: 1, + stream_token: None, + is_complete: true, + }; + response_tx.send(thinking_response).await?; + } + + info!("Starting LLM stream generation"); tokio::spawn(async move { if let Err(e) = llm .generate_stream(&prompt, &serde_json::Value::Null, stream_tx) .await { - log::error!("LLM streaming error: {}", e); + error!("LLM streaming error: {}", e); + } else { + debug!("LLM stream generation completed"); } }); let mut full_response = String::new(); + let mut analysis_buffer = String::new(); + let mut in_analysis = false; + let mut chunk_count = 0; + + debug!("Starting to process stream chunks"); while let Some(chunk) = stream_rx.recv().await { + chunk_count += 1; + trace!("Received chunk {}: '{}'", chunk_count, chunk); + + // Handle analysis tags - don't send analysis content to client + analysis_buffer.push_str(&chunk); + + if analysis_buffer.contains("<|channel|>") { + debug!("Analysis section started"); + in_analysis = true; + } + + if in_analysis { + // Check for end of analysis + if analysis_buffer.ends_with("final<|message|>") { + debug!( + "Analysis section completed, buffer length: {}", + analysis_buffer.len() + ); + in_analysis = false; + // Clear buffer if we're not in analysis + analysis_buffer.clear(); + + // Send thinking end event for web channels + if message.channel == "web" { + let orchestrator = BotOrchestrator::new(Arc::clone(&self.state)); + orchestrator + .send_event( + &message.user_id, + &message.bot_id, + &message.session_id, + &message.channel, + "thinking_end", + serde_json::json!({ + "user_id": message.user_id.clone() + }), + ) + .await + .ok(); + } + analysis_buffer.clear(); + continue; + } + + // Skip sending analysis content to client + trace!("Skipping analysis chunk"); + continue; + } + + // Only send non-analysis content to client full_response.push_str(&chunk); let partial = BotResponse { @@ -276,13 +489,21 @@ impl BotOrchestrator { }; if response_tx.send(partial).await.is_err() { + warn!("Response channel closed, stopping stream processing"); break; } } + debug!( + "Stream processing completed, {} chunks processed", + chunk_count + ); + info!("Full response length: {} characters", full_response.len()); + { let mut sm = self.state.session_manager.lock().await; sm.save_message(session.id, user_id, 2, &full_response, 1)?; + debug!("Stream response saved to session history"); } let final_msg = BotResponse { @@ -297,6 +518,8 @@ impl BotOrchestrator { }; response_tx.send(final_msg).await?; + debug!("Final stream message sent"); + Ok(()) } @@ -304,8 +527,11 @@ impl BotOrchestrator { &self, user_id: Uuid, ) -> Result, Box> { + debug!("Getting sessions for user: {}", user_id); let mut session_manager = self.state.session_manager.lock().await; - session_manager.get_user_sessions(user_id) + let sessions = session_manager.get_user_sessions(user_id)?; + debug!("Found {} sessions for user {}", sessions.len(), user_id); + Ok(sessions) } pub async fn get_conversation_history( @@ -313,8 +539,14 @@ impl BotOrchestrator { session_id: Uuid, user_id: Uuid, ) -> Result, Box> { + debug!( + "Getting conversation history for session {} user {}", + session_id, user_id + ); let mut session_manager = self.state.session_manager.lock().await; - session_manager.get_conversation_history(session_id, user_id) + let history = session_manager.get_conversation_history(session_id, user_id)?; + debug!("Retrieved {} history entries", history.len()); + Ok(history) } pub async fn process_message_with_tools( @@ -322,21 +554,31 @@ impl BotOrchestrator { message: UserMessage, ) -> Result<(), Box> { info!( - "Processing message with tools from user: {}", - message.user_id + "Processing message with tools from user: {}, session: {}", + message.user_id, message.session_id ); + debug!("Message content: '{}'", message.content); - let user_id = Uuid::parse_str(&message.user_id).unwrap_or_else(|_| Uuid::new_v4()); + let user_id = Uuid::parse_str(&message.user_id).unwrap_or_else(|_| { + let new_id = Uuid::new_v4(); + warn!("Invalid user ID, generated new: {}", new_id); + new_id + }); let bot_id = Uuid::parse_str(&message.bot_id) .unwrap_or_else(|_| Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap()); let session = { let mut session_manager = self.state.session_manager.lock().await; match session_manager.get_user_session(user_id, bot_id)? { - Some(session) => session, + Some(session) => { + debug!("Found existing session: {}", session.id); + session + } None => { + info!("Creating new session for tools processing"); let new_session = session_manager.create_session(user_id, bot_id, "New Conversation")?; + debug!("New session created: {}", new_session.id); Self::run_start_script(&new_session, Arc::clone(&self.state)).await; new_session } @@ -352,6 +594,7 @@ impl BotOrchestrator { &message.content, message.message_type, )?; + debug!("User message saved for tools processing"); } let is_tool_waiting = self @@ -362,6 +605,10 @@ impl BotOrchestrator { .unwrap_or(false); if is_tool_waiting { + debug!( + "Tool is waiting for input, providing: '{}'", + message.content + ); self.state .tool_manager .provide_input(&message.session_id, &message.content) @@ -373,13 +620,14 @@ impl BotOrchestrator { .get_tool_output(&message.session_id) .await { + debug!("Retrieved {} tool output entries", tool_output.len()); for output in tool_output { let bot_response = BotResponse { bot_id: message.bot_id.clone(), user_id: message.user_id.clone(), session_id: message.session_id.clone(), channel: message.channel.clone(), - content: output, + content: output.clone(), message_type: 1, stream_token: None, is_complete: true, @@ -388,6 +636,7 @@ impl BotOrchestrator { if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) { adapter.send_message(bot_response).await?; + debug!("Tool output sent: '{}'", output); } } } @@ -398,6 +647,7 @@ impl BotOrchestrator { || message.content.to_lowercase().contains("calculate") || message.content.to_lowercase().contains("math") { + debug!("Message requires calculator tool"); match self .state .tool_manager @@ -405,16 +655,18 @@ impl BotOrchestrator { .await { Ok(tool_result) => { + debug!("Calculator tool executed successfully"); let mut session_manager = self.state.session_manager.lock().await; session_manager.save_message(session.id, user_id, 2, &tool_result.output, 2)?; - tool_result.output } Err(e) => { + error!("Calculator tool error: {}", e); format!("I encountered an error starting the calculator: {}", e) } } } else { + debug!("Using LLM for response generation"); let available_tools = self.state.tool_manager.list_tools(); let tools_context = if !available_tools.is_empty() { format!("\n\nAvailable tools: {}. If the user needs calculations, suggest using the calculator tool.", available_tools.join(", ")) @@ -423,6 +675,7 @@ impl BotOrchestrator { }; let full_prompt = format!("{}{}", message.content, tools_context); + trace!("Full prompt with tools context: {}", full_prompt); self.state .llm_provider @@ -430,9 +683,12 @@ impl BotOrchestrator { .await? }; + debug!("Generated response: '{}'", response); + { let mut session_manager = self.state.session_manager.lock().await; session_manager.save_message(session.id, user_id, 2, &response, 1)?; + debug!("Response saved to session history"); } let bot_response = BotResponse { @@ -448,43 +704,101 @@ impl BotOrchestrator { if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) { adapter.send_message(bot_response).await?; + info!("Tools response sent successfully"); + } else { + warn!("No channel adapter found for channel: {}", message.channel); } Ok(()) } async fn run_start_script(session: &UserSession, state: Arc) { + info!("Running start script for session: {}", session.id); let start_script = r#" TALK "Welcome to General Bots!" -HEAR name -TALK "Hello, " + name - -text = GET "default.pdf" -SET CONTEXT text - -resume = LLM "Build a resume from " + text "#; - info!("Running start.bas for session: {}", session.id); + debug!("Start script content loaded for session {}", session.id); let session_clone = session.clone(); let state_clone = state.clone(); tokio::spawn(async move { let state_for_run = state_clone.clone(); - if let Err(e) = crate::basic::ScriptService::new(state_clone, session_clone.clone()) + match crate::basic::ScriptService::new(state_clone, session_clone.clone()) .compile(start_script) .and_then(|ast| { crate::basic::ScriptService::new(state_for_run, session_clone.clone()).run(&ast) - }) - { - log::error!("Failed to run start.bas: {}", e); + }) { + Ok(_) => { + info!( + "Start script executed successfully for session {}", + session_clone.id + ); + } + Err(e) => { + error!( + "Failed to run start script for session {}: {}", + session_clone.id, e + ); + } } }); } + + pub async fn send_warning( + &self, + session_id: &str, + channel: &str, + message: &str, + ) -> Result<(), Box> { + warn!( + "Sending warning to session {} on channel {}: {}", + session_id, channel, message + ); + + if channel == "web" { + debug!("Sending warning as web event"); + self.send_event( + "system", + "system", + session_id, + channel, + "warn", + serde_json::json!({ + "message": message, + "timestamp": Utc::now().to_rfc3339() + }), + ) + .await + } else { + // For non-web channels, send as regular message + debug!("Sending warning as regular message"); + if let Some(adapter) = self.state.channels.lock().unwrap().get(channel) { + let warn_response = BotResponse { + bot_id: "system".to_string(), + user_id: "system".to_string(), + session_id: session_id.to_string(), + channel: channel.to_string(), + content: format!("⚠️ WARNING: {}", message), + message_type: 1, + stream_token: None, + is_complete: true, + }; + adapter.send_message(warn_response).await + } else { + warn!( + "No channel adapter found for warning on channel: {}", + channel + ); + Ok(()) + } + } + } } impl Default for BotOrchestrator { fn default() -> Self { + info!("Creating default BotOrchestrator"); Self { state: Arc::new(AppState::default()), } @@ -497,10 +811,14 @@ async fn websocket_handler( stream: web::Payload, data: web::Data, ) -> Result { + info!("WebSocket connection attempt"); + let (res, mut session, mut msg_stream) = actix_ws::handle(&req, stream)?; let session_id = Uuid::new_v4().to_string(); let (tx, mut rx) = mpsc::channel::(100); + info!("WebSocket session established: {}", session_id); + let orchestrator = BotOrchestrator::new(Arc::clone(&data)); orchestrator .register_response_channel(session_id.clone(), tx.clone()) @@ -512,24 +830,65 @@ async fn websocket_handler( .add_connection(session_id.clone(), tx.clone()) .await; + // Send session start event + orchestrator + .send_event( + "default_user", + "default_bot", + &session_id, + "web", + "session_start", + serde_json::json!({ + "session_id": session_id, + "timestamp": Utc::now().to_rfc3339() + }), + ) + .await + .ok(); let web_adapter = data.web_adapter.clone(); + let session_id_clone1 = session_id.clone(); + let session_id_clone2 = session_id.clone(); + // Spawn task to send messages to WebSocket actix_web::rt::spawn(async move { + info!( + "Starting WebSocket sender for session {}", + session_id_clone1 + ); + let mut message_count = 0; while let Some(msg) = rx.recv().await { + message_count += 1; if let Ok(json) = serde_json::to_string(&msg) { - let _ = session.text(json).await; + trace!("Sending WebSocket message {}: {}", message_count, json); + if let Err(e) = session.text(json).await { + warn!("Failed to send WebSocket message {}: {}", message_count, e); + break; + } } } + info!( + "WebSocket sender terminated for session {}, sent {} messages", + session_id_clone1, message_count + ); }); + // Spawn task to receive messages from WebSocket actix_web::rt::spawn(async move { + info!( + "Starting WebSocket receiver for session {}", + session_id_clone2 + ); + let mut message_count = 0; while let Some(Ok(msg)) = msg_stream.recv().await { match msg { WsMessage::Text(text) => { + message_count += 1; + debug!("Received WebSocket message {}: {}", message_count, text); + let user_message = UserMessage { bot_id: "default_bot".to_string(), user_id: "default_user".to_string(), - session_id: session_id.clone(), + session_id: session_id_clone2.clone(), channel: "web".to_string(), content: text.to_string(), message_type: 1, @@ -538,18 +897,45 @@ async fn websocket_handler( }; if let Err(e) = orchestrator.stream_response(user_message, tx.clone()).await { - info!("Error processing message: {}", e); + error!( + "Error processing WebSocket message {}: {}", + message_count, e + ); } } WsMessage::Close(_) => { - web_adapter.remove_connection(&session_id).await; + info!("WebSocket close received for session {}", session_id_clone2); + // Send session end event + orchestrator + .send_event( + "default_user", + "default_bot", + &session_id_clone2, + "web", + "session_end", + serde_json::json!({}), + ) + .await + .ok(); + + web_adapter.remove_connection(&session_id_clone2).await; break; } - _ => {} + _ => { + trace!("Received non-text WebSocket message"); + } } } + info!( + "WebSocket receiver terminated for session {}, processed {} messages", + session_id_clone2, message_count + ); }); + info!( + "WebSocket handler setup completed for session {}", + session_id + ); Ok(res) } @@ -558,14 +944,27 @@ async fn whatsapp_webhook_verify( data: web::Data, web::Query(params): web::Query>, ) -> Result { + info!("WhatsApp webhook verification request"); + let empty = String::new(); let mode = params.get("hub.mode").unwrap_or(&empty); let token = params.get("hub.verify_token").unwrap_or(&empty); let challenge = params.get("hub.challenge").unwrap_or(&empty); + debug!( + "Verification params - mode: {}, token: {}, challenge: {}", + mode, token, challenge + ); + match data.whatsapp_adapter.verify_webhook(mode, token, challenge) { - Ok(challenge_response) => Ok(HttpResponse::Ok().body(challenge_response)), - Err(_) => Ok(HttpResponse::Forbidden().body("Verification failed")), + Ok(challenge_response) => { + info!("WhatsApp webhook verification successful"); + Ok(HttpResponse::Ok().body(challenge_response)) + } + Err(_) => { + warn!("WhatsApp webhook verification failed"); + Ok(HttpResponse::Forbidden().body("Verification failed")) + } } } @@ -574,22 +973,25 @@ async fn whatsapp_webhook( data: web::Data, payload: web::Json, ) -> Result { + info!("WhatsApp webhook message received"); + match data .whatsapp_adapter .process_incoming_message(payload.into_inner()) .await { Ok(user_messages) => { + info!("Processed {} WhatsApp messages", user_messages.len()); for user_message in user_messages { let orchestrator = BotOrchestrator::new(Arc::clone(&data)); if let Err(e) = orchestrator.process_message(user_message).await { - log::error!("Error processing WhatsApp message: {}", e); + error!("Error processing WhatsApp message: {}", e); } } Ok(HttpResponse::Ok().body("")) } Err(e) => { - log::error!("Error processing WhatsApp webhook: {}", e); + error!("Error processing WhatsApp webhook: {}", e); Ok(HttpResponse::BadRequest().body("Invalid message")) } } @@ -609,15 +1011,28 @@ async fn voice_start( .and_then(|u| u.as_str()) .unwrap_or("user"); + info!( + "Voice session start request - session: {}, user: {}", + session_id, user_id + ); + match data .voice_adapter .start_voice_session(session_id, user_id) .await { Ok(token) => { + info!( + "Voice session started successfully for session {}", + session_id + ); Ok(HttpResponse::Ok().json(serde_json::json!({"token": token, "status": "started"}))) } Err(e) => { + error!( + "Failed to start voice session for session {}: {}", + session_id, e + ); Ok(HttpResponse::InternalServerError() .json(serde_json::json!({"error": e.to_string()}))) } @@ -634,9 +1049,21 @@ async fn voice_stop( .and_then(|s| s.as_str()) .unwrap_or(""); + info!("Voice session stop request - session: {}", session_id); + match data.voice_adapter.stop_voice_session(session_id).await { - Ok(()) => Ok(HttpResponse::Ok().json(serde_json::json!({"status": "stopped"}))), + Ok(()) => { + info!( + "Voice session stopped successfully for session {}", + session_id + ); + Ok(HttpResponse::Ok().json(serde_json::json!({"status": "stopped"}))) + } Err(e) => { + error!( + "Failed to stop voice session for session {}: {}", + session_id, e + ); Ok(HttpResponse::InternalServerError() .json(serde_json::json!({"error": e.to_string()}))) } @@ -644,10 +1071,27 @@ async fn voice_stop( } #[actix_web::post("/api/sessions")] -async fn create_session(_data: web::Data) -> Result { - let session_id = Uuid::new_v4(); +async fn create_session(data: web::Data) -> Result { + info!("Creating new session"); + + // Run start script for new session + let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(); + let bot_id = Uuid::parse_str("00000000-0000-0000-0000-000000000000").unwrap(); + + let session = { + let mut session_manager = data.session_manager.lock().await; + match session_manager.create_session(user_id, bot_id, "New Conversation") { + Ok(s) => s, + Err(e) => { + error!("Failed to create session: {}", e); + return Ok(HttpResponse::InternalServerError() + .json(serde_json::json!({"error": e.to_string()}))); + } + } + }; + Ok(HttpResponse::Ok().json(serde_json::json!({ - "session_id": session_id, + "session_id": session.id, "title": "New Conversation", "created_at": Utc::now() }))) @@ -655,11 +1099,16 @@ async fn create_session(_data: web::Data) -> Result { #[actix_web::get("/api/sessions")] async fn get_sessions(data: web::Data) -> Result { + info!("Getting sessions list"); let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(); let orchestrator = BotOrchestrator::new(Arc::clone(&data)); match orchestrator.get_user_sessions(user_id).await { - Ok(sessions) => Ok(HttpResponse::Ok().json(sessions)), + Ok(sessions) => { + info!("Retrieved {} sessions", sessions.len()); + Ok(HttpResponse::Ok().json(sessions)) + } Err(e) => { + error!("Failed to get sessions: {}", e); Ok(HttpResponse::InternalServerError() .json(serde_json::json!({"error": e.to_string()}))) } @@ -672,6 +1121,8 @@ async fn get_session_history( path: web::Path, ) -> Result { let session_id = path.into_inner(); + info!("Getting session history for: {}", session_id); + let user_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(); match Uuid::parse_str(&session_id) { @@ -681,12 +1132,23 @@ async fn get_session_history( .get_conversation_history(session_uuid, user_id) .await { - Ok(history) => Ok(HttpResponse::Ok().json(history)), - Err(e) => Ok(HttpResponse::InternalServerError() - .json(serde_json::json!({"error": e.to_string()}))), + Ok(history) => { + info!( + "Retrieved {} history entries for session {}", + history.len(), + session_id + ); + Ok(HttpResponse::Ok().json(history)) + } + Err(e) => { + error!("Failed to get session history for {}: {}", session_id, e); + Ok(HttpResponse::InternalServerError() + .json(serde_json::json!({"error": e.to_string()}))) + } } } Err(_) => { + warn!("Invalid session ID format: {}", session_id); Ok(HttpResponse::BadRequest().json(serde_json::json!({"error": "Invalid session ID"}))) } } @@ -697,6 +1159,8 @@ async fn set_mode_handler( data: web::Data, info: web::Json>, ) -> Result { + info!("Setting user answer mode"); + let default_user = "default_user".to_string(); let default_bot = "default_bot".to_string(); let default_mode = "0".to_string(); @@ -707,23 +1171,72 @@ async fn set_mode_handler( let mode = mode_str.parse::().unwrap_or(0); + debug!( + "Setting mode - user: {}, bot: {}, mode: {}", + user_id, bot_id, mode + ); + let orchestrator = BotOrchestrator::new(Arc::clone(&data)); if let Err(e) = orchestrator .set_user_answer_mode(user_id, bot_id, mode) .await { + error!("Failed to set answer mode: {}", e); return Ok( HttpResponse::InternalServerError().json(serde_json::json!({"error": e.to_string()})) ); } + info!("Answer mode updated successfully"); Ok(HttpResponse::Ok().json(serde_json::json!({"status": "mode_updated"}))) } +#[actix_web::post("/api/warn")] +async fn send_warning_handler( + data: web::Data, + info: web::Json>, +) -> Result { + let default_session = "default".to_string(); + let default_channel = "web".to_string(); + let default_message = "Warning!".to_string(); + + let session_id = info.get("session_id").unwrap_or(&default_session); + let channel = info.get("channel").unwrap_or(&default_channel); + let message = info.get("message").unwrap_or(&default_message); + + info!( + "Sending warning via API - session: {}, channel: {}", + session_id, channel + ); + + let orchestrator = BotOrchestrator::new(Arc::clone(&data)); + if let Err(e) = orchestrator + .send_warning(session_id, channel, message) + .await + { + error!("Failed to send warning: {}", e); + return Ok( + HttpResponse::InternalServerError().json(serde_json::json!({"error": e.to_string()})) + ); + } + + info!("Warning sent successfully"); + Ok(HttpResponse::Ok().json(serde_json::json!({"status": "warning_sent"}))) +} + #[actix_web::get("/")] async fn index() -> Result { - let html = fs::read_to_string("web/index.html").unwrap(); - Ok(HttpResponse::Ok().content_type("text/html").body(html)) + info!("Serving index page"); + match fs::read_to_string("web/index.html") { + Ok(html) => { + debug!("Index page loaded successfully"); + Ok(HttpResponse::Ok().content_type("text/html").body(html)) + } + Err(e) => { + error!("Failed to load index page: {}", e); + Ok(HttpResponse::InternalServerError().body("Failed to load index page")) + } + } } #[actix_web::get("/static/{filename:.*}")] @@ -731,8 +1244,15 @@ async fn static_files(req: HttpRequest) -> Result { let filename = req.match_info().query("filename"); let path = format!("static/{}", filename); + info!("Serving static file: {}", filename); + match fs::read(&path) { Ok(content) => { + debug!( + "Static file {} loaded successfully, size: {} bytes", + filename, + content.len() + ); let content_type = match filename { f if f.ends_with(".js") => "application/javascript", f if f.ends_with(".css") => "text/css", @@ -743,6 +1263,9 @@ async fn static_files(req: HttpRequest) -> Result { Ok(HttpResponse::Ok().content_type(content_type).body(content)) } - Err(_) => Ok(HttpResponse::NotFound().body("File not found")), + Err(e) => { + warn!("Static file not found: {} - {}", filename, e); + Ok(HttpResponse::NotFound().body("File not found")) + } } } diff --git a/web/index.html b/web/index.html index af6bbc436..67a104209 100644 --- a/web/index.html +++ b/web/index.html @@ -7,7 +7,7 @@ - + @@ -398,6 +449,8 @@ let mediaRecorder = null; let audioChunks = []; let streamingMessageId = null; + let isThinking = false; + let thinkingIndicatorId = null; const messagesDiv = document.getElementById("messages"); const input = document.getElementById("messageInput"); @@ -473,6 +526,14 @@ ws.onmessage = function (event) { const response = JSON.parse(event.data); + // Handle event messages (thinking_start, thinking_end, warn) + if (response.message_type === 2) { + const eventData = JSON.parse(response.content); + handleEvent(eventData.event, eventData.data); + return; + } + + // Handle regular messages if (!response.is_complete) { if (!isStreaming) { isStreaming = true; @@ -497,6 +558,81 @@ }; } + function handleEvent(eventType, eventData) { + console.log("Event received:", eventType, eventData); + + switch (eventType) { + case "thinking_start": + showThinkingIndicator(); + isThinking = true; + break; + + case "thinking_end": + hideThinkingIndicator(); + isThinking = false; + break; + + case "warn": + showWarning(eventData.message); + break; + + default: + console.log("Unknown event type:", eventType); + } + } + + function showThinkingIndicator() { + if (isThinking) return; + + const emptyState = document.getElementById("emptyState"); + if (emptyState) emptyState.remove(); + + const thinkingDiv = document.createElement("div"); + thinkingDiv.id = "thinking-indicator"; + thinkingDiv.className = "thinking-indicator"; + thinkingDiv.innerHTML = ` +
+
+
+
+
+ Pensando... + `; + + messagesDiv.appendChild(thinkingDiv); + messagesDiv.scrollTop = messagesDiv.scrollHeight; + isThinking = true; + thinkingIndicatorId = thinkingDiv.id; + } + + function hideThinkingIndicator() { + if (!isThinking) return; + + const thinkingDiv = + document.getElementById("thinking-indicator"); + if (thinkingDiv) { + thinkingDiv.remove(); + } + isThinking = false; + thinkingIndicatorId = null; + } + + function showWarning(message) { + const warningDiv = document.createElement("div"); + warningDiv.className = "warning-message"; + warningDiv.innerHTML = `⚠️ ${message}`; + + messagesDiv.appendChild(warningDiv); + messagesDiv.scrollTop = messagesDiv.scrollHeight; + + // Remove warning after 5 seconds + setTimeout(() => { + if (warningDiv.parentNode) { + warningDiv.remove(); + } + }, 5000); + } + function addMessage( role, content, @@ -539,6 +675,12 @@ function sendMessage() { const message = input.value.trim(); if (!message || !ws || ws.readyState !== WebSocket.OPEN) return; + + // Hide thinking indicator if it's showing + if (isThinking) { + hideThinkingIndicator(); + } + addMessage("user", message); ws.send(message); input.value = ""; @@ -733,12 +875,27 @@ // Neon text animation gsap.to(".neon-text", { textShadow: - "0 0 25px var(--gb-glow),0 0 50px var(--gb-glow),0 0 100px rgba(255,215,0,0.8)", + "0 0 25px var(--dante-glow),0 0 50px var(--dante-glow),0 0 100px rgba(255,215,0,0.8)", repeat: -1, yoyo: true, duration: 1.8, ease: "power1.inOut", }); + + // Test warning functionality + window.testWarning = function () { + fetch("/api/warn", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + session_id: currentSessionId || "default", + channel: "web", + message: "Esta é uma mensagem de teste de aviso!", + }), + }); + };