From 21e0cb09e44069a66f8b4beee0ced6677b3ddebd Mon Sep 17 00:00:00 2001 From: Rodrigo Rodriguez Date: Thu, 30 Apr 2026 17:14:09 -0300 Subject: [PATCH] fix: resolve syntax errors and stabilize start.bas guard logic --- botserver/src/core/bot/mod.rs | 231 +++++++++++++--------------------- 1 file changed, 89 insertions(+), 142 deletions(-) diff --git a/botserver/src/core/bot/mod.rs b/botserver/src/core/bot/mod.rs index 86b5b4d6..945ee0a2 100644 --- a/botserver/src/core/bot/mod.rs +++ b/botserver/src/core/bot/mod.rs @@ -930,12 +930,15 @@ let system_prompt = if !message.active_switchers.is_empty() { .collect(), switchers: Vec::new(), context_name: None, + context_length: 0, + context_max_length: 0, + }; - if let Err(e) = response_tx.send(final_response).await { - warn!("Failed to send final response for empty content: {}", e); - } - return Ok(()); - } + if let Err(e) = response_tx.send(final_response).await { + warn!("Failed to send final response for empty content: {}", e); + } + return Ok(()); + } // Inject KB context for normal messages if let Some(kb_manager) = self.state.kb_manager.as_ref() { @@ -1767,19 +1770,11 @@ async fn handle_websocket( bot_id, bot_name_result ); - if let Some(bot_name) = bot_name_result { - // Web clients expect start.bas to execute their first screen every time they connect/reload. - // Execute unconditionally on every WebSocket connection. - let should_execute_start_bas = true; + let work_path = crate::core::shared::utils::get_work_path(); + let start_script_path = format!("{}/{}.gbai/{}.gbdialog/start.bas", work_path, bot_name, bot_name); - if should_execute_start_bas { - let work_path = crate::core::shared::utils::get_work_path(); - let start_script_path = format!("{}/{}.gbai/{}.gbdialog/start.bas", work_path, bot_name, bot_name); - - info!("Looking for start.bas at: {}", start_script_path); - - // Load pre-compiled .ast only (compilation happens in Drive Monitor) + // Load pre-compiled .ast or fallback to .bas let ast_path = start_script_path.replace(".bas", ".ast"); let ast_content = match tokio::fs::read_to_string(&ast_path).await { Ok(content) if !content.is_empty() => content, @@ -1794,144 +1789,96 @@ async fn handle_websocket( } }; - if !ast_content.is_empty() { - info!( - "Executing start.bas for bot {} on session {}", - bot_name, session_id - ); + if !ast_content.is_empty() { + let state_for_start = state.clone(); + let tx_for_start = tx.clone(); + let bot_id_str = bot_id.to_string(); + let session_id_str = session_id.to_string(); + let mut send_ready_rx = send_ready_rx; - let state_for_start = state.clone(); - let tx_for_start = tx.clone(); - let bot_id_str = bot_id.to_string(); - let session_id_str = session_id.to_string(); - let mut send_ready_rx = send_ready_rx; + tokio::spawn(async move { + let _ = send_ready_rx.recv().await; - tokio::spawn(async move { - let _ = send_ready_rx.recv().await; + let session_result = { + let mut sm = state_for_start.session_manager.lock().await; + let by_id = sm.get_session_by_id(session_id); + match by_id { + Ok(Some(s)) => Ok(Some(s)), + _ => sm.get_or_create_user_session(user_id, bot_id, "Chat Session"), + } + }; - let session_result = { - let mut sm = state_for_start.session_manager.lock().await; - let by_id = sm.get_session_by_id(session_id); - match by_id { - Ok(Some(s)) => Ok(Some(s)), - _ => sm.get_or_create_user_session(user_id, bot_id, "Chat Session"), + if let Ok(Some(mut session)) = session_result { + let actual_session_id = session.id.to_string(); + let start_bas_flag_key = format!("start_bas_executed:{}:{}", bot_id_str, actual_session_id); + + let should_execute = if let Some(cache) = &state_for_start.cache { + if let Ok(mut conn) = cache.get_multiplexed_async_connection().await { + let was_set: Option = redis::cmd("SET") + .arg(&start_bas_flag_key) + .arg("1") + .arg("NX") + .arg("EX") + .arg(86400) + .query_async(&mut conn) + .await + .ok(); + was_set.is_some() + } else { + false } + } else { + true }; - if let Ok(Some(mut session)) = session_result { - // Unified guard for start.bas execution (shared with stream_response) - let actual_session_id = session.id.to_string(); - let start_bas_flag_key = format!("start_bas_executed:{}:{}", bot_id_str, actual_session_id); - - let should_execute = if let Some(cache) = &state_for_start.cache { - if let Ok(mut conn) = cache.get_multiplexed_async_connection().await { - let was_set: Option = redis::cmd("SET") - .arg(&start_bas_flag_key) - .arg("1") - .arg("NX") - .arg("EX") - .arg(86400) - .query_async(&mut conn) - .await - .ok(); - was_set.is_some() - } else { - false // Safe fallback - } - } else { - true // No cache, allow execution + if !should_execute { + trace!("start.bas already executed for session {}, skipping", actual_session_id); + return; + } + + // Store WebSocket ID in context + if let serde_json::Value::Object(ref mut map) = session.context_data { + map.insert("websocket_session_id".to_string(), serde_json::Value::String(session_id.to_string())); + } + + let state_for_redis = state_for_start.clone(); + let result = tokio::task::spawn_blocking(move || { + let mut script_service = crate::basic::ScriptService::new(state_for_start.clone(), session.clone()); + script_service.load_bot_config_params(&state_for_start, bot_id); + script_service.run(&ast_content).map_err(|e| e.to_string()) + }).await; + + match result { + Ok(Ok(())) => info!("start.bas executed successfully for bot {}", bot_name), + Ok(Err(e)) => error!("start.bas error: {}", e), + Err(e) => error!("start.bas task error: {}", e), + } + + // Send suggestions if any + let suggestions = get_suggestions(state_for_redis.cache.as_ref(), &bot_id_str, &actual_session_id); + let switchers = get_switchers(state_for_redis.cache.as_ref(), &bot_id_str, &actual_session_id); + if !suggestions.is_empty() || !switchers.is_empty() { + let response = BotResponse { + bot_id: bot_id_str, + user_id: user_id.to_string(), + session_id: session_id_str, + channel: "Chat".to_string(), + content: String::new(), + message_type: MessageType::BOT_RESPONSE, + is_complete: true, + suggestions, + switchers, + ..Default::default() }; - - if !should_execute { - trace!("start.bas already executed for session {}, skipping in handle_websocket", actual_session_id); - return; - } - - info!("start.bas: Found session {} for websocket session {}", session.id, session_id); - - // Save session ID before session is moved into closure - let session_id_for_redis = session.id.to_string(); - - // Store WebSocket session_id in context so TALK can route messages correctly - if let serde_json::Value::Object(ref mut map) = session.context_data { - map.insert("websocket_session_id".to_string(), serde_json::Value::String(session_id.to_string())); - } else { - let mut map = serde_json::Map::new(); - map.insert("websocket_session_id".to_string(), serde_json::Value::String(session_id.to_string())); - session.context_data = serde_json::Value::Object(map); - } - - // Clone state_for_start for use in Redis SET after execution - let state_for_redis = state_for_start.clone(); - - let result = tokio::task::spawn_blocking(move || { - info!("start.bas: Creating ScriptService with session.id={}", session.id); - let mut script_service = crate::basic::ScriptService::new( - state_for_start.clone(), - session.clone() - ); - script_service.load_bot_config_params(&state_for_start, bot_id); - - match script_service.run(&ast_content) { - Ok(_) => Ok(()), - Err(e) => Err(format!("Script execution error: {}", e)), - } - }).await; - - match result { - Ok(Ok(())) => { - info!("start.bas executed successfully for bot {}", bot_name); - } - Ok(Err(e)) => { - error!("start.bas error for bot {}: {}", bot_name, e); - } - Err(e) => { - error!("start.bas task error for bot {}: {}", bot_name, e); - } - } - - // Fetch suggestions and switchers from Redis and send to frontend - // Use session_id_for_redis (DB session) not session_id_str (WebSocket session) for Redis key consistency - let user_id_str = user_id.to_string(); - let suggestions = get_suggestions(state_for_redis.cache.as_ref(), &bot_id_str, &session_id_for_redis); - let switchers = get_switchers(state_for_redis.cache.as_ref(), &bot_id_str, &session_id_for_redis); - if !suggestions.is_empty() || !switchers.is_empty() { - info!("Sending {} suggestions to frontend for session {}", suggestions.len(), session_id); - let response = BotResponse { - bot_id: bot_id_str.clone(), - user_id: user_id_str.clone(), - session_id: session_id_str.clone(), - channel: "Chat".to_string(), - content: String::new(), - message_type: MessageType::BOT_RESPONSE, - stream_token: None, - is_complete: true, - suggestions, - switchers, - context_name: None, - context_length: 0, - context_max_length: 0, - }; - let _ = tx_for_start.send(response).await; - - - } - } - Ok(Err(e)) => { - error!("start.bas error for bot {}: {}", bot_name, e); - } - Err(e) => { - error!("start.bas task error for bot {}: {}", bot_name, e); + let _ = tx_for_start.send(response).await; } + } + }); } } - }); } -} -} // End of if should_execute_start_bas -} -let mut send_task = tokio::spawn(async move { + let mut send_task = tokio::spawn(async move { while let Some(response) = rx.recv().await { if let Ok(json_str) = serde_json::to_string(&response) { if sender.send(Message::Text(json_str)).await.is_err() {