From a5bfda4d0920fc6c4ee676857bd82119d51256f2 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Sun, 2 Nov 2025 23:54:14 -0300 Subject: [PATCH] feat(automation): add LLM server readiness check and improve user session handling - Added LLM server readiness check in AutomationService before starting tasks - Renamed `user` parameter to `user_session` in execute_talk for clarity - Updated BotResponse fields to use user_session data instead of hardcoded values - Improved Redis key generation in execute_talk to use user_session fields - Removed commented Redis code in set_current_context_keyword The changes ensure proper initialization of automation tasks by checking LLM server availability first, and improve code clarity by using more descriptive variable names for user session data. --- src/automation/mod.rs | 33 +++++++++ src/basic/keywords/hear_talk.rs | 12 +-- src/basic/keywords/set_current_context.rs | 89 +++-------------------- src/basic/keywords/set_schedule.rs | 11 +++ src/bot/mod.rs | 75 ++++++++++++------- src/drive_monitor/mod.rs | 35 +++++++++ src/session/mod.rs | 34 +++++---- web/html/index.html | 3 +- 8 files changed, 166 insertions(+), 126 deletions(-) diff --git a/src/automation/mod.rs b/src/automation/mod.rs index c1d8f33d..46d66cde 100644 --- a/src/automation/mod.rs +++ b/src/automation/mod.rs @@ -1,3 +1,4 @@ +use crate::config::ConfigManager; use crate::shared::models::schema::bots::dsl::*; use diesel::prelude::*; use crate::basic::ScriptService; @@ -33,6 +34,38 @@ impl AutomationService { tokio::task::spawn_local({ let service = service.clone(); async move { + // Check if llama servers are ready before starting + let config_manager = ConfigManager::new(Arc::clone(&service.state.conn)); + let default_bot_id = { + let mut conn = service.state.conn.lock().unwrap(); + bots.filter(name.eq("default")) + .select(id) + .first::(&mut *conn) + .unwrap_or_else(|_| uuid::Uuid::nil()) + }; + + let llm_url = match config_manager.get_config(&default_bot_id, "llm-url", None) { + Ok(url) => url, + Err(e) => { + error!("Failed to get llm-url config: {}", e); + return; + } + }; + + let embedding_url = match config_manager.get_config(&default_bot_id, "embedding-url", None) { + Ok(url) => url, + Err(e) => { + error!("Failed to get embedding-url config: {}", e); + return; + } + }; + + if !crate::llm::local::is_server_running(&llm_url).await || + !crate::llm::local::is_server_running(&embedding_url).await { + trace!("LLM servers not ready - llm: {}, embedding: {}", llm_url, embedding_url); + return; + } + let mut interval = tokio::time::interval(Duration::from_secs(5)); let mut last_check = Utc::now(); loop { diff --git a/src/basic/keywords/hear_talk.rs b/src/basic/keywords/hear_talk.rs index db0e2774..235e7d5f 100644 --- a/src/basic/keywords/hear_talk.rs +++ b/src/basic/keywords/hear_talk.rs @@ -59,7 +59,7 @@ pub fn hear_keyword(state: Arc, user: UserSession, engine: &mut Engine .unwrap(); } -pub async fn execute_talk(state: Arc, user: UserSession, message: String) -> Result> { +pub async fn execute_talk(state: Arc, user_session: UserSession, message: String) -> Result> { info!("Executing TALK with message: {}", message); debug!("TALK: Sending message: {}", message); @@ -68,7 +68,7 @@ pub async fn execute_talk(state: Arc, user: UserSession, message: Stri if let Some(redis_client) = &state.cache { let mut conn: redis::aio::MultiplexedConnection = redis_client.get_multiplexed_async_connection().await?; - let redis_key = format!("suggestions:{}:{}", user.user_id, user.id); + let redis_key = format!("suggestions:{}:{}", user_session.user_id, user_session.id); debug!("Loading suggestions from Redis key: {}", redis_key); let suggestions_json: Result, _> = redis::cmd("LRANGE") .arg(redis_key.as_str()) @@ -92,9 +92,9 @@ pub async fn execute_talk(state: Arc, user: UserSession, message: Stri } let response = BotResponse { - bot_id: "default_bot".to_string(), - user_id: "default_user".to_string(), - session_id: user.id.to_string(), + bot_id: user_session.bot_id.to_string(), + user_id: user_session.user_id.to_string(), + session_id: user_session.id.to_string(), channel: "web".to_string(), content: message, message_type: 1, @@ -106,7 +106,7 @@ pub async fn execute_talk(state: Arc, user: UserSession, message: Stri context_max_length: 0, }; - let user_id = user.id.to_string(); + let user_id = user_session.id.to_string(); let response_clone = response.clone(); match state.response_channels.try_lock() { diff --git a/src/basic/keywords/set_current_context.rs b/src/basic/keywords/set_current_context.rs index 385bebdf..1388ad1f 100644 --- a/src/basic/keywords/set_current_context.rs +++ b/src/basic/keywords/set_current_context.rs @@ -48,84 +48,19 @@ pub fn set_current_context_keyword(state: Arc, user: UserSession, engi context_name ); - // If a Redis client is configured, perform the SET operation asynchronously. - if let Some(cache_client) = &cache { - trace!("Redis client is available, preparing to set context value"); - - // Clone values needed inside the async block. - let cache_client = cache_client.clone(); - let redis_key = redis_key.clone(); - let context_value = context_value.clone(); - let context_name = context_name.clone(); - - trace!( - "Cloned cache_client, redis_key ({}) and context_value (len={}) for async task", - redis_key, - context_value.len() - ); - - // Spawn a background task so we don't need an async closure here. - tokio::spawn(async move { - trace!("Async task started for SET_CURRENT_CONTEXT operation"); - - // Acquire an async Redis connection. - let mut conn = match cache_client.get_multiplexed_async_connection().await { - Ok(conn) => { - trace!("Successfully acquired async Redis connection"); - conn + // Use session manager to update context + let state = state.clone(); + let user = user.clone(); + let context_value = context_value.clone(); + tokio::spawn(async move { + if let Err(e) = state.session_manager.lock().await.update_session_context( + &user.id, + &user.user_id, + context_value + ).await { + error!("Failed to update session context: {}", e); } - Err(e) => { - error!("Failed to connect to cache: {}", e); - trace!("Aborting SET_CURRENT_CONTEXT task due to connection error"); - return; - } - }; - - // Perform the SET command for the context value. - trace!( - "Executing Redis SET command with key: {} and value length: {}", - redis_key, - context_value.len() - ); - let set_result: Result<(), redis::RedisError> = redis::cmd("SET") - .arg(&redis_key) - .arg(&context_value) - .query_async(&mut conn) - .await; - - match set_result { - Ok(_) => { - trace!("Successfully set context in Redis for key {}", redis_key); - } - Err(e) => { - error!("Failed to set cache value: {}", e); - trace!("SET_CURRENT_CONTEXT Redis SET command failed"); - return; - } - } - - // Mark the context as active in a separate hash. - let active_key = format!("active_context:{}:{}", user.user_id, user.id); - trace!("Setting active flag for context {} in hash {}", context_name, active_key); - let hset_result: Result = redis::cmd("HSET") - .arg(&active_key) - .arg(&context_name) - .arg("active") - .query_async(&mut conn) - .await; - - match hset_result { - Ok(fields_added) => { - trace!("Active flag set for context {} (fields added: {})", context_name, fields_added); - } - Err(e) => { - error!("Failed to set active flag for context {}: {}", context_name, e); - } - } - }); - } else { - trace!("No Redis client configured; SET_CURRENT_CONTEXT will not persist to cache"); - } + }); Ok(Dynamic::UNIT) }, diff --git a/src/basic/keywords/set_schedule.rs b/src/basic/keywords/set_schedule.rs index 8e7678fb..94af3ac7 100644 --- a/src/basic/keywords/set_schedule.rs +++ b/src/basic/keywords/set_schedule.rs @@ -17,6 +17,17 @@ pub fn execute_set_schedule( cron, script_name, bot_uuid ); + // First check if bot exists + use crate::shared::models::bots::dsl::bots; + let bot_exists: bool = diesel::select(diesel::dsl::exists( + bots.filter(crate::shared::models::bots::dsl::id.eq(bot_uuid)) + )) + .get_result(conn)?; + + if !bot_exists { + return Err(format!("Bot with id {} does not exist", bot_uuid).into()); + } + use crate::shared::models::system_automations::dsl::*; let new_automation = ( diff --git a/src/bot/mod.rs b/src/bot/mod.rs index 95685749..ba054e27 100644 --- a/src/bot/mod.rs +++ b/src/bot/mod.rs @@ -335,14 +335,22 @@ impl BotOrchestrator { session_id, context_name ); - let mut session_manager = self.state.session_manager.lock().await; - session_manager - .update_session_context( - &Uuid::parse_str(session_id)?, - &Uuid::parse_str(user_id)?, - context_name.to_string(), - ) - .await?; + // Use session manager to update context + let session_uuid = Uuid::parse_str(session_id).map_err(|e| { + error!("Failed to parse session_id: {}", e); + e + })?; + let user_uuid = Uuid::parse_str(user_id).map_err(|e| { + error!("Failed to parse user_id: {}", e); + e + })?; + if let Err(e) = self.state.session_manager.lock().await.update_session_context( + &session_uuid, + &user_uuid, + context_name.to_string() + ).await { + error!("Failed to update session context: {}", e); + } // Send confirmation back to client let confirmation = BotResponse { @@ -458,17 +466,12 @@ impl BotOrchestrator { )?; } - let response_content = self.direct_mode_handler(&message, &session).await?; - { - let mut session_manager = self.state.session_manager.lock().await; - session_manager.save_message(session.id, user_id, 2, &response_content, 1)?; - } - - // Handle context change messages (type 4) first + // Handle context change messages (type 4) immediately + // before any other processing if message.message_type == 4 { if let Some(context_name) = &message.context_name { - return self + self .handle_context_change( &message.user_id, &message.bot_id, @@ -476,11 +479,20 @@ impl BotOrchestrator { &message.channel, context_name, ) - .await; + .await?; + } } - // Create regular response + + let response_content = self.direct_mode_handler(&message, &session).await?; + + { + let mut session_manager = self.state.session_manager.lock().await; + session_manager.save_message(session.id, user_id, 2, &response_content, 1)?; + } + + // Create regular response for non-context-change messages let channel = message.channel.clone(); let config_manager = ConfigManager::new(Arc::clone(&self.state.conn)); let max_context_size = config_manager @@ -528,7 +540,7 @@ impl BotOrchestrator { let context_data = { let session_manager = self.state.session_manager.lock().await; session_manager - .get_session_context(&session.id, &session.user_id) + .get_session_context_data(&session.id, &session.user_id) .await? }; @@ -721,7 +733,7 @@ impl BotOrchestrator { let context_data = { let session_manager = self.state.session_manager.lock().await; session_manager - .get_session_context(&session.id, &session.user_id) + .get_session_context_data(&session.id, &session.user_id) .await? }; @@ -1306,17 +1318,26 @@ async fn websocket_handler( session_id: session_id_clone2.clone(), channel: "web".to_string(), content, - message_type: 1, + message_type: json_value["message_type"] + .as_u64() + .unwrap_or(1) as i32, media_url: None, timestamp: Utc::now(), - context_name: None, + context_name: json_value["context_name"] + .as_str() + .map(|s| s.to_string()), }; - if let Err(e) = orchestrator.stream_response(user_message, tx.clone()).await { - error!( - "Error processing WebSocket message {}: {}", - message_count, e - ); + // First try processing as a regular message + match orchestrator.process_message(user_message.clone()).await { + Ok(_) => (), + Err(e) => { + error!("Failed to process message: {}", e); + // Fall back to streaming if processing fails + if let Err(e) = orchestrator.stream_response(user_message, tx.clone()).await { + error!("Failed to stream response: {}", e); + } + } } } WsMessage::Close(reason) => { diff --git a/src/drive_monitor/mod.rs b/src/drive_monitor/mod.rs index 44c1559b..58d17c9a 100644 --- a/src/drive_monitor/mod.rs +++ b/src/drive_monitor/mod.rs @@ -1,3 +1,5 @@ +use crate::shared::models::schema::bots::dsl::*; +use diesel::prelude::*; use crate::basic::compiler::BasicCompiler; use crate::config::ConfigManager; use crate::kb::embeddings; @@ -42,6 +44,39 @@ impl DriveMonitor { "Drive Monitor service started for bucket: {}", self.bucket_name ); + + // Check if llama servers are ready before first scan + let config_manager = ConfigManager::new(Arc::clone(&self.state.conn)); + let default_bot_id = { + let mut conn = self.state.conn.lock().unwrap(); + bots.filter(name.eq("default")) + .select(id) + .first::(&mut *conn) + .unwrap_or_else(|_| uuid::Uuid::nil()) + }; + + let llm_url = match config_manager.get_config(&default_bot_id, "llm-url", None) { + Ok(url) => url, + Err(e) => { + error!("Failed to get llm-url config: {}", e); + return; + } + }; + + let embedding_url = match config_manager.get_config(&default_bot_id, "embedding-url", None) { + Ok(url) => url, + Err(e) => { + error!("Failed to get embedding-url config: {}", e); + return; + } + }; + + if !crate::llm::local::is_server_running(&llm_url).await || + !crate::llm::local::is_server_running(&embedding_url).await { + trace!("LLM servers not ready - llm: {}, embedding: {}", llm_url, embedding_url); + return; + } + let mut tick = interval(Duration::from_secs(30)); loop { tick.tick().await; diff --git a/src/session/mod.rs b/src/session/mod.rs index 53fd427c..836c881b 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -218,20 +218,23 @@ impl SessionManager { pub async fn update_session_context( &mut self, session_id: &Uuid, - _user_id: &Uuid, - context_name: String, + user_id: &Uuid, + context_data: String, ) -> Result<(), Box> { - use crate::shared::models::schema::user_sessions::dsl::*; - use diesel::prelude::*; - - diesel::update(user_sessions.filter(id.eq(session_id).and(user_id.eq(user_id)))) - .set(context_data.eq(serde_json::json!({ "current_context": context_name }))) - .execute(&mut self.conn)?; + use redis::Commands; + let redis_key = format!("context:{}:{}", user_id, session_id); + if let Some(redis_client) = &self.redis { + let mut conn = redis_client.get_connection()?; + conn.set(&redis_key, &context_data)?; + info!("Updated context in Redis for key {}", redis_key); + } else { + warn!("No Redis client configured, context not persisted"); + } Ok(()) } - pub async fn get_session_context( + pub async fn get_session_context_data( &self, session_id: &Uuid, user_id: &Uuid, @@ -241,11 +244,11 @@ impl SessionManager { let redis_key = format!("context:{}:{}", user_id, session_id); if let Some(redis_client) = &self.redis { - // Attempt to obtain a Redis connection; log and ignore errors, returning `None`. + // Attempt to obtain a Redis connection; log and ignore errors let conn_option = redis_client .get_connection() .map_err(|e| { - warn!("Failed to get Redis connection: {}", e); + warn!("Failed to get Cache connection: {}", e); e }) .ok(); @@ -254,22 +257,23 @@ impl SessionManager { match connection.get::<_, Option>(&redis_key) { Ok(Some(context)) => { debug!( - "Retrieved context from Redis for key {}: {} chars", + "Retrieved context from Cache for key {}: {} chars", redis_key, context.len() ); return Ok(context); } Ok(None) => { - debug!("No context found in Redis for key {}", redis_key); + debug!("No context found in Cache for key {}", redis_key); } Err(e) => { - warn!("Failed to retrieve context from Redis: {}", e); + warn!("Failed to retrieve context from Cache: {}", e); } } } } - // If Redis is unavailable or the key is missing, return an empty context. + + // If no context found, return empty string Ok(String::new()) } diff --git a/web/html/index.html b/web/html/index.html index 17025229..3abe017b 100644 --- a/web/html/index.html +++ b/web/html/index.html @@ -1728,7 +1728,8 @@ pendingContextChange = new Promise((resolve) => { const handler = (event) => { const response = JSON.parse(event.data); - if (response.message_type === 5 && response.context_name === context) { + if (response.message_type === 5 && + response.context_name === context) { ws.removeEventListener('message', handler); resolve(); }