diff --git a/Cargo.toml b/Cargo.toml index b4f31dee..2a5955ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "botserver" -version = "6.2.0" +version = "6.3.0" edition = "2021" resolver = "2" diff --git a/scripts/migrate_zfs_to_incus_dir.sh b/scripts/migrate_zfs_to_incus_dir.sh new file mode 100755 index 00000000..acab9b99 --- /dev/null +++ b/scripts/migrate_zfs_to_incus_dir.sh @@ -0,0 +1,181 @@ +#!/bin/bash +# +# Migrate LXD containers from ZFS to Incus directory storage +# Usage: ./migrate_zfs_to_incus_dir.sh [options] +# +# Options: +# --dry-run Show what would be done without executing +# --source-pool NAME LXD storage pool name (default: default) +# --dest-pool NAME Incus storage pool name (default: default) +# --containers LIST Comma-separated list of containers (default: all) +# --incus-host HOST Remote Incus host (optional) +# --help Show this help + +set -e + +# Colors +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' + +# Defaults +DRY_RUN=false +SOURCE_POOL="default" +DEST_POOL="default" +CONTAINERS="" +INCUS_HOST="" +BACKUP_DIR="/tmp/lxd-backups" + +# Print banner +print_banner() { + echo -e "${BLUE}" + echo "╔════════════════════════════════════════════════════════════╗" + echo "║ LXD to Incus Migration Tool (ZFS -> Directory Storage) ║" + echo "╚════════════════════════════════════════════════════════════╝" + echo -e "${NC}" +} + +# Show help +show_help() { + echo "Usage: $0 [options]" + echo "" + echo "Migrate LXD containers from ZFS storage to Incus directory storage." 
+ echo "" + echo "Options:" + echo " --dry-run Show what would be done without executing" + echo " --source-pool NAME LXD storage pool name (default: default)" + echo " --dest-pool NAME Incus storage pool name (default: default)" + echo " --containers LIST Comma-separated list of containers (default: all)" + echo " --incus-host HOST Remote Incus host (ssh)" + echo " --help Show this help" + echo "" + echo "Examples:" + echo " $0 --dry-run" + echo " $0 --source-pool zfs-storage --dest-pool dir-storage" + echo " $0 --containers container1,container2 --incus-host user@remote-host" +} + +# Check dependencies +check_dependencies() { + local missing=() + if ! command -v lxc &> /dev/null; then + missing+=("lxc") + fi + if ! command -v incus &> /dev/null; then + missing+=("incus") + fi + if [ ${#missing[@]} -gt 0 ]; then + echo -e "${RED}Missing dependencies: ${missing[*]}${NC}" + echo "Please install them and try again." + exit 1 + fi +} + +# Parse arguments +while [[ $# -gt 0 ]]; do + case $1 in + --dry-run) + DRY_RUN=true + shift + ;; + --source-pool) + SOURCE_POOL="$2" + shift 2 + ;; + --dest-pool) + DEST_POOL="$2" + shift 2 + ;; + --containers) + CONTAINERS="$2" + shift 2 + ;; + --incus-host) + INCUS_HOST="$2" + shift 2 + ;; + -h|--help) + show_help + exit 0 + ;; + *) + echo -e "${RED}Unknown option: $1${NC}" + show_help + exit 1 + ;; + esac +done + +# Execute command (dry-run aware) +exec_cmd() { + local cmd="$1" + if [ "$DRY_RUN" = true ]; then + echo -e "${YELLOW}[DRY-RUN] $cmd${NC}" + else + echo -e "${GREEN}[EXEC] $cmd${NC}" + eval "$cmd" + fi +} + +# Main migration logic +migrate_container() { + local container="$1" + local backup_file="$BACKUP_DIR/${container}-$(date +%Y%m%d%H%M%S).tar.gz" + + echo -e "${BLUE}Migrating container: $container${NC}" + + # Create backup directory + exec_cmd "mkdir -p $BACKUP_DIR" + + # Create snapshot + local snapshot_name="migrate-$(date +%Y%m%d%H%M%S)" + exec_cmd "lxc snapshot $container $snapshot_name --stateful" + + # Export 
container
+    exec_cmd "lxc export $container $backup_file --snapshot $snapshot_name"
+
+    # Transfer to remote Incus host if specified
+    if [ -n "$INCUS_HOST" ]; then
+        exec_cmd "scp $backup_file $INCUS_HOST:$BACKUP_DIR/"
+        exec_cmd "rm $backup_file"
+        backup_file="$BACKUP_DIR/$(basename "$backup_file")"
+    fi
+
+    # Import into Incus
+    local import_cmd="incus import $backup_file $container"
+    if [ -n "$INCUS_HOST" ]; then
+        import_cmd="ssh $INCUS_HOST '$import_cmd'"
+    fi
+    exec_cmd "$import_cmd"
+
+    # Cleanup snapshot
+    exec_cmd "lxc delete $container/$snapshot_name"
+
+    # Cleanup local backup file if not remote
+    if [ -z "$INCUS_HOST" ] && [ "$DRY_RUN" = false ]; then
+        rm "$backup_file"
+    fi
+
+    echo -e "${GREEN}Completed migration for: $container${NC}"
+}
+
+# Main
+print_banner
+check_dependencies
+
+# Get list of containers
+if [ -z "$CONTAINERS" ]; then
+    CONTAINERS=$(lxc list --format csv | cut -d',' -f1 | paste -sd',' -)
+fi
+
+# Convert comma-separated list to array
+IFS=',' read -ra CONTAINER_ARRAY <<< "$CONTAINERS"
+
+# Migrate each container
+for container in "${CONTAINER_ARRAY[@]}"; do
+    migrate_container "$container"
+done
+
+echo -e "${GREEN}Migration complete!${NC}"
diff --git a/src/analytics/goals.rs b/src/analytics/goals.rs
index 112dd3b1..18a5a9f7 100644
--- a/src/analytics/goals.rs
+++ b/src/analytics/goals.rs
@@ -1181,3 +1181,131 @@ pub fn configure_goals_routes() -> Router> {
         .route("/api/goals/templates", get(list_templates))
         .route("/api/goals/ai/suggest", post(ai_suggest))
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use chrono::Utc;
+    use uuid::Uuid;
+
+    #[test]
+    fn test_objective_record_creation() {
+        let now = Utc::now();
+        let objective = ObjectiveRecord {
+            id: Uuid::new_v4(),
+            org_id: Uuid::new_v4(),
+            bot_id: Uuid::new_v4(),
+            owner_id: Uuid::new_v4(),
+            parent_id: None,
+            title: "Test Objective".to_string(),
+            description: Some("Test description".to_string()),
+            period: "Q1 2025".to_string(),
+            period_start: None,
+            period_end: None,
+            status:
"draft".to_string(), + progress: BigDecimal::from(0), + visibility: "team".to_string(), + weight: BigDecimal::from(1), + tags: vec![Some("test".to_string())], + created_at: now, + updated_at: now, + }; + + assert_eq!(objective.title, "Test Objective"); + assert_eq!(objective.status, "draft"); + assert_eq!(objective.progress, BigDecimal::from(0)); + } + + #[test] + fn test_key_result_record_creation() { + let now = Utc::now(); + let key_result = KeyResultRecord { + id: Uuid::new_v4(), + org_id: Uuid::new_v4(), + bot_id: Uuid::new_v4(), + objective_id: Uuid::new_v4(), + owner_id: Uuid::new_v4(), + title: "Test Key Result".to_string(), + description: Some("Test KR description".to_string()), + metric_type: "numeric".to_string(), + start_value: BigDecimal::from(0), + target_value: BigDecimal::from(100), + current_value: BigDecimal::from(0), + unit: Some("units".to_string()), + weight: BigDecimal::from(1), + status: "not_started".to_string(), + due_date: None, + scoring_type: "linear".to_string(), + created_at: now, + updated_at: now, + }; + + assert_eq!(key_result.title, "Test Key Result"); + assert_eq!(key_result.metric_type, "numeric"); + assert_eq!(key_result.target_value, BigDecimal::from(100)); + assert_eq!(key_result.status, "not_started"); + } + + #[test] + fn test_check_in_record_creation() { + let now = Utc::now(); + let check_in = CheckInRecord { + id: Uuid::new_v4(), + org_id: Uuid::new_v4(), + bot_id: Uuid::new_v4(), + key_result_id: Uuid::new_v4(), + user_id: Uuid::new_v4(), + previous_value: Some(BigDecimal::from(0)), + new_value: BigDecimal::from(50), + note: Some("Progress update".to_string()), + confidence: Some("high".to_string()), + blockers: Some("No blockers".to_string()), + created_at: now, + }; + + assert_eq!(check_in.new_value, BigDecimal::from(50)); + assert_eq!(check_in.confidence, Some("high".to_string())); + } + + #[test] + fn test_goal_template_creation() { + let template = GoalTemplate { + id: Uuid::new_v4(), + organization_id: 
Uuid::new_v4(), + name: "Test Template".to_string(), + description: Some("Test template description".to_string()), + category: Some("product".to_string()), + objective_template: ObjectiveTemplate { + title: "Template Objective".to_string(), + description: "Template objective description".to_string(), + }, + key_result_templates: vec![KeyResultTemplate { + title: "Template KR".to_string(), + description: "Template KR description".to_string(), + metric_type: "numeric".to_string(), + start_value: BigDecimal::from(0), + target_value: BigDecimal::from(100), + unit: Some("units".to_string()), + }], + is_system: false, + created_at: Utc::now(), + }; + + assert_eq!(template.name, "Test Template"); + assert_eq!(template.objective_template.title, "Template Objective"); + assert_eq!(template.key_result_templates.len(), 1); + assert!(!template.is_system); + } + + #[test] + fn test_goals_error_display() { + let db_error = GoalsError::Database("Connection failed".to_string()); + assert!(format!("{}", db_error).contains("Database error")); + + let not_found = GoalsError::NotFound("Objective not found".to_string()); + assert!(format!("{}", not_found).contains("not found")); + + let validation = GoalsError::Validation("Invalid input".to_string()); + assert!(format!("{}", validation).contains("Validation error")); + } +} diff --git a/src/basic/keywords/add_suggestion.rs b/src/basic/keywords/add_suggestion.rs index 1ec58bc5..033d19d8 100644 --- a/src/basic/keywords/add_suggestion.rs +++ b/src/basic/keywords/add_suggestion.rs @@ -367,13 +367,13 @@ fn add_tool_suggestion( /// Note: This function clears suggestions from Redis after fetching them to prevent duplicates pub fn get_suggestions( cache: Option<&Arc>, - user_id: &str, + bot_id: &str, session_id: &str, ) -> Vec { let mut suggestions = Vec::new(); if let Some(cache_client) = cache { - let redis_key = format!("suggestions:{}:{}", user_id, session_id); + let redis_key = format!("suggestions:{}:{}", bot_id, session_id); let mut 
conn = match cache_client.get_connection() { Ok(conn) => conn, diff --git a/src/basic/keywords/hearing/syntax.rs b/src/basic/keywords/hearing/syntax.rs index 227acefc..3a3e39a4 100644 --- a/src/basic/keywords/hearing/syntax.rs +++ b/src/basic/keywords/hearing/syntax.rs @@ -161,6 +161,7 @@ fn register_hear_as_type(state: Arc, user: UserSession, engine: &mut E fn register_hear_as_menu(state: Arc, user: UserSession, engine: &mut Engine) { let session_id = user.id; + let bot_id = user.bot_id; let state_clone = Arc::clone(&state); engine @@ -207,10 +208,11 @@ fn register_hear_as_menu(state: Arc, user: UserSession, engine: &mut E // Store suggestions in Redis for UI let state_for_suggestions = Arc::clone(&state_clone); let opts_clone = options.clone(); + let bot_id_clone = bot_id; tokio::runtime::Handle::current().block_on(async move { if let Some(redis) = &state_for_suggestions.cache { if let Ok(mut conn) = redis.get_multiplexed_async_connection().await { - let key = format!("suggestions:{session_id}:{session_id}"); + let key = format!("suggestions:{}:{}", bot_id_clone, session_id); for opt in &opts_clone { let _: Result<(), _> = redis::cmd("RPUSH") .arg(&key) diff --git a/src/basic/keywords/hearing/talk.rs b/src/basic/keywords/hearing/talk.rs index 9f20ff58..c8d74779 100644 --- a/src/basic/keywords/hearing/talk.rs +++ b/src/basic/keywords/hearing/talk.rs @@ -13,33 +13,6 @@ pub async fn execute_talk( message: String, ) -> Result> { info!("TALK called with message: {}", message); - let mut suggestions = Vec::new(); - - if let Some(redis_client) = &state.cache { - if let Ok(mut conn) = redis_client.get_multiplexed_async_connection().await { - let redis_key = format!("suggestions:{}:{}", user_session.bot_id, user_session.id); - info!("TALK: Fetching suggestions from Redis key: {}", redis_key); - - let suggestions_json: Result, _> = redis::cmd("LRANGE") - .arg(redis_key.as_str()) - .arg(0) - .arg(-1) - .query_async(&mut conn) - .await; - - if let Ok(suggestions_list) = 
suggestions_json { - info!("TALK: Got {} suggestions from Redis", suggestions_list.len()); - suggestions = suggestions_list - .into_iter() - .filter_map(|s| serde_json::from_str(&s).ok()) - .collect(); - } else { - info!("TALK: No suggestions found in Redis"); - } - } - } else { - info!("TALK: No cache configured"); - } let channel = user_session .context_data @@ -68,7 +41,7 @@ pub async fn execute_talk( message_type: MessageType::BOT_RESPONSE, stream_token: None, is_complete: true, - suggestions, + suggestions: Vec::new(), context_name: None, context_length: 0, context_max_length: 0, @@ -96,17 +69,24 @@ pub async fn execute_talk( } }); } else { - let user_id = user_session.id.to_string(); + // Use WebSocket session_id from context if available, otherwise fall back to session.id + let target_session_id = user_session + .context_data + .get("websocket_session_id") + .and_then(|v| v.as_str()) + .unwrap_or(&user_session.id.to_string()) + .to_string(); + let web_adapter = Arc::clone(&state.web_adapter); tokio::spawn(async move { if let Err(e) = web_adapter - .send_message_to_session(&user_id, response_clone) + .send_message_to_session(&target_session_id, response_clone) .await { error!("Failed to send TALK message via web adapter: {}", e); } else { - trace!("TALK message sent via web adapter"); + trace!("TALK message sent via web adapter to session {}", target_session_id); } }); } diff --git a/src/basic/keywords/use_website.rs b/src/basic/keywords/use_website.rs index 34610732..cbf9993e 100644 --- a/src/basic/keywords/use_website.rs +++ b/src/basic/keywords/use_website.rs @@ -13,29 +13,33 @@ fn parse_refresh_interval(interval: &str) -> Result { // Match patterns like "1d", "7d", "2w", "1m", "1y", etc. 
if interval_lower.ends_with('d') { - let days: i32 = interval_lower[..interval_lower.len()-1] + let days: i32 = interval_lower[..interval_lower.len() - 1] .parse() .map_err(|_| format!("Invalid days format: {}", interval))?; Ok(days) } else if interval_lower.ends_with('w') { - let weeks: i32 = interval_lower[..interval_lower.len()-1] + let weeks: i32 = interval_lower[..interval_lower.len() - 1] .parse() .map_err(|_| format!("Invalid weeks format: {}", interval))?; Ok(weeks * 7) } else if interval_lower.ends_with('m') { - let months: i32 = interval_lower[..interval_lower.len()-1] + let months: i32 = interval_lower[..interval_lower.len() - 1] .parse() .map_err(|_| format!("Invalid months format: {}", interval))?; Ok(months * 30) // Approximate month as 30 days } else if interval_lower.ends_with('y') { - let years: i32 = interval_lower[..interval_lower.len()-1] + let years: i32 = interval_lower[..interval_lower.len() - 1] .parse() .map_err(|_| format!("Invalid years format: {}", interval))?; Ok(years * 365) // Approximate year as 365 days } else { // Try to parse as plain number (assume days) - interval.parse() - .map_err(|_| format!("Invalid refresh interval format: {}. Use format like '1d', '1w', '1m', '1y'", interval)) + interval.parse().map_err(|_| { + format!( + "Invalid refresh interval format: {}. 
Use format like '1d', '1w', '1m', '1y'", + interval + ) + }) } } @@ -192,118 +196,109 @@ pub fn register_use_website_function(state: Arc, user: UserSession, en let user_clone = user.clone(); // Register USE_WEBSITE(url, refresh) with both parameters (uppercase) - engine.register_fn( - "USE_WEBSITE", - move |url: &str, refresh: &str| -> Dynamic { - trace!( - "USE_WEBSITE function called: {} REFRESH {} for session: {}", - url, - refresh, - user_clone.id + engine.register_fn("USE_WEBSITE", move |url: &str, refresh: &str| -> Dynamic { + trace!( + "USE_WEBSITE function called: {} REFRESH {} for session: {}", + url, + refresh, + user_clone.id + ); + + let is_valid = url.starts_with("http://") || url.starts_with("https://"); + if !is_valid { + return Dynamic::from(format!( + "ERROR: Invalid URL format: {}. Must start with http:// or https://", + url + )); + } + + let state_for_task = Arc::clone(&state_clone); + let user_for_task = user_clone.clone(); + let url_for_task = url.to_string(); + let refresh_for_task = refresh.to_string(); + let (tx, rx) = std::sync::mpsc::channel(); + + std::thread::spawn(move || { + let _rt = match tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build() + { + Ok(_rt) => _rt, + Err(e) => { + let _ = tx.send(Err(format!("Failed to build tokio runtime: {}", e))); + return; + } + }; + let result = associate_website_with_session_refresh( + &state_for_task, + &user_for_task, + &url_for_task, + &refresh_for_task, ); + let _ = tx.send(result); + }); - let is_valid = url.starts_with("http://") || url.starts_with("https://"); - if !is_valid { - return Dynamic::from(format!( - "ERROR: Invalid URL format: {}. 
Must start with http:// or https://", - url - )); - } - - let state_for_task = Arc::clone(&state_clone); - let user_for_task = user_clone.clone(); - let url_for_task = url.to_string(); - let refresh_for_task = refresh.to_string(); - let (tx, rx) = std::sync::mpsc::channel(); - - std::thread::spawn(move || { - let _rt = match tokio::runtime::Builder::new_multi_thread() - .worker_threads(2) - .enable_all() - .build() - { - Ok(_rt) => _rt, - Err(e) => { - let _ = tx.send(Err(format!("Failed to build tokio runtime: {}", e))); - return; - } - }; - let result = associate_website_with_session_refresh( - &state_for_task, - &user_for_task, - &url_for_task, - &refresh_for_task, - ); - let _ = tx.send(result); - }); - - match rx.recv_timeout(std::time::Duration::from_secs(3)) { - Ok(Ok(message)) => Dynamic::from(message), - Ok(Err(e)) => Dynamic::from(format!("ERROR: {}", e)), - Err(_) => Dynamic::from("Website association scheduled."), - } - }, - ); + match rx.recv_timeout(std::time::Duration::from_secs(3)) { + Ok(Ok(message)) => Dynamic::from(message), + Ok(Err(e)) => Dynamic::from(format!("ERROR: {}", e)), + Err(_) => Dynamic::from("Website association scheduled."), + } + }); let state_clone2 = Arc::clone(&state); let user_clone2 = user.clone(); // Register use_website(url, refresh) with both parameters (lowercase for preprocessor) - engine.register_fn( - "use_website", - move |url: &str, refresh: &str| -> Dynamic { - trace!( - "use_website function called: {} REFRESH {} for session: {}", - url, - refresh, - user_clone2.id - ); + engine.register_fn("use_website", move |url: &str, refresh: &str| -> Dynamic { + trace!( + "use_website function called: {} REFRESH {} for session: {}", + url, + refresh, + user_clone2.id + ); - let is_valid = url.starts_with("http://") || url.starts_with("https://"); - if !is_valid { - return Dynamic::from(format!( - "ERROR: Invalid URL format: {}. 
Must start with http:// or https://", - url - )); - } + let is_valid = url.starts_with("http://") || url.starts_with("https://"); + if !is_valid { + return Dynamic::from(format!( + "ERROR: Invalid URL format: {}. Must start with http:// or https://", + url + )); + } - let state_for_task = Arc::clone(&state_clone2); - let user_for_task = user_clone2.clone(); - let url_for_task = url.to_string(); - let refresh_for_task = refresh.to_string(); - let (tx, rx) = std::sync::mpsc::channel(); + let state_for_task = Arc::clone(&state_clone2); + let user_for_task = user_clone2.clone(); + let url_for_task = url.to_string(); + let refresh_for_task = refresh.to_string(); + let (tx, rx) = std::sync::mpsc::channel(); - std::thread::spawn(move || { - let _rt = match tokio::runtime::Builder::new_multi_thread() - .worker_threads(2) - .enable_all() - .build() - { - Ok(_rt) => _rt, - Err(e) => { - let _ = tx.send(Err(format!("Failed to build tokio runtime: {}", e))); - return; - } - }; - let result = associate_website_with_session_refresh( - &state_for_task, - &user_for_task, - &url_for_task, - &refresh_for_task, - ); - let _ = tx.send(result); - }); - - match rx.recv_timeout(std::time::Duration::from_secs(3)) { - Ok(Ok(message)) => Dynamic::from(message), - Ok(Err(e)) => Dynamic::from(format!("ERROR: {}", e)), - Err(_) => { - Dynamic::from("Website association scheduled.") + std::thread::spawn(move || { + let _rt = match tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build() + { + Ok(_rt) => _rt, + Err(e) => { + let _ = tx.send(Err(format!("Failed to build tokio runtime: {}", e))); + return; } - Err(e) => Dynamic::from(format!("ERROR: use_website failed: {}", e)), - } - }, - ); + }; + let result = associate_website_with_session_refresh( + &state_for_task, + &user_for_task, + &url_for_task, + &refresh_for_task, + ); + let _ = tx.send(result); + }); + + match rx.recv_timeout(std::time::Duration::from_secs(3)) { + Ok(Ok(message)) => 
Dynamic::from(message), + Ok(Err(e)) => Dynamic::from(format!("ERROR: {}", e)), + Err(_) => Dynamic::from("Website association scheduled."), + } + }); let state_clone3 = Arc::clone(&state); let user_clone3 = user.clone(); @@ -341,21 +336,15 @@ pub fn register_use_website_function(state: Arc, user: UserSession, en return; } }; - let result = associate_website_with_session( - &state_for_task, - &user_for_task, - &url_for_task, - ); + let result = + associate_website_with_session(&state_for_task, &user_for_task, &url_for_task); let _ = tx.send(result); }); match rx.recv_timeout(std::time::Duration::from_secs(3)) { Ok(Ok(message)) => Dynamic::from(message), Ok(Err(e)) => Dynamic::from(format!("ERROR: {}", e)), - Err(_) => { - Dynamic::from("Website association scheduled.") - } - Err(e) => Dynamic::from(format!("ERROR: USE_WEBSITE failed: {}", e)), + Err(_) => Dynamic::from("Website association scheduled."), } }); @@ -395,21 +384,15 @@ pub fn register_use_website_function(state: Arc, user: UserSession, en return; } }; - let result = associate_website_with_session( - &state_for_task, - &user_for_task, - &url_for_task, - ); + let result = + associate_website_with_session(&state_for_task, &user_for_task, &url_for_task); let _ = tx.send(result); }); match rx.recv_timeout(std::time::Duration::from_secs(3)) { Ok(Ok(message)) => Dynamic::from(message), Ok(Err(e)) => Dynamic::from(format!("ERROR: {}", e)), - Err(_) => { - Dynamic::from("Website association scheduled.") - } - Err(e) => Dynamic::from(format!("ERROR: use_website failed: {}", e)), + Err(_) => Dynamic::from("Website association scheduled."), } }); @@ -430,7 +413,10 @@ fn associate_website_with_session_refresh( url: &str, refresh_interval: &str, ) -> Result { - info!("Associating website {} with session {} (refresh: {})", url, user.id, refresh_interval); + info!( + "Associating website {} with session {} (refresh: {})", + url, user.id, refresh_interval + ); let mut conn = state.conn.get().map_err(|e| format!("DB 
error: {}", e))?; @@ -440,22 +426,34 @@ fn associate_website_with_session_refresh( #[diesel(sql_type = diesel::sql_types::Text)] name: String, } - + let bot_name_result: BotName = diesel::sql_query("SELECT name FROM bots WHERE id = $1") .bind::(&user.bot_id) .get_result(&mut conn) .map_err(|e| format!("Failed to get bot name: {}", e))?; - let collection_name = format!("{}_website_{}", bot_name_result.name, sanitize_url_for_collection(url)); + let collection_name = format!( + "{}_website_{}", + bot_name_result.name, + sanitize_url_for_collection(url) + ); let website_status = check_website_crawl_status(&mut conn, &user.bot_id, url)?; match website_status { WebsiteCrawlStatus::NotRegistered => { // Auto-register website for crawling instead of failing - info!("Website {} not registered, auto-registering for crawling with refresh: {}", url, refresh_interval); - register_website_for_crawling_with_refresh(&mut conn, &user.bot_id, url, refresh_interval) - .map_err(|e| format!("Failed to register website: {}", e))?; + info!( + "Website {} not registered, auto-registering for crawling with refresh: {}", + url, refresh_interval + ); + register_website_for_crawling_with_refresh( + &mut conn, + &user.bot_id, + url, + refresh_interval, + ) + .map_err(|e| format!("Failed to register website: {}", e))?; // ADD TO SESSION EVEN IF CRAWL IS PENDING! // Otherwise kb_context will think the session has no website associated if start.bas only runs once. 
@@ -569,7 +567,10 @@ pub fn register_website_for_crawling_with_refresh( .execute(conn) .map_err(|e| format!("Failed to register website for crawling: {}", e))?; - info!("Website {} registered for crawling for bot {} with refresh policy: {}", url, bot_id, refresh_interval); + info!( + "Website {} registered for crawling for bot {} with refresh policy: {}", + url, bot_id, refresh_interval + ); Ok(()) } @@ -591,7 +592,7 @@ fn update_refresh_policy_if_shorter( } let current = diesel::sql_query( - "SELECT refresh_policy FROM website_crawls WHERE bot_id = $1 AND url = $2" + "SELECT refresh_policy FROM website_crawls WHERE bot_id = $1 AND url = $2", ) .bind::(bot_id) .bind::(url) @@ -628,7 +629,10 @@ fn update_refresh_policy_if_shorter( .execute(conn) .map_err(|e| format!("Failed to update refresh policy: {}", e))?; - info!("Refresh policy updated to {} for {} - immediate crawl scheduled", refresh_interval, url); + info!( + "Refresh policy updated to {} for {} - immediate crawl scheduled", + refresh_interval, url + ); } Ok(()) @@ -648,7 +652,12 @@ pub fn execute_use_website_preprocessing_with_refresh( bot_id: Uuid, refresh_interval: &str, ) -> Result> { - trace!("Preprocessing USE_WEBSITE: {}, bot_id: {:?}, refresh: {}", url, bot_id, refresh_interval); + trace!( + "Preprocessing USE_WEBSITE: {}, bot_id: {:?}, refresh: {}", + url, + bot_id, + refresh_interval + ); if !url.starts_with("http://") && !url.starts_with("https://") { return Err(format!( diff --git a/src/core/bot/mod.rs b/src/core/bot/mod.rs index 4c837dc5..4c063a04 100644 --- a/src/core/bot/mod.rs +++ b/src/core/bot/mod.rs @@ -475,12 +475,21 @@ impl BotOrchestrator { let state_clone = self.state.clone(); tokio::task::spawn_blocking( move || -> Result<_, Box> { - let session = { + let mut session = { let mut sm = state_clone.session_manager.blocking_lock(); sm.get_session_by_id(session_id)? 
} .ok_or("Session not found")?; + // Store WebSocket session_id in context for TALK routing + if let serde_json::Value::Object(ref mut map) = session.context_data { + map.insert("websocket_session_id".to_string(), serde_json::Value::String(session_id.to_string())); + } else { + let mut map = serde_json::Map::new(); + map.insert("websocket_session_id".to_string(), serde_json::Value::String(session_id.to_string())); + session.context_data = serde_json::Value::Object(map); + } + if !message.content.trim().is_empty() { let mut sm = state_clone.session_manager.blocking_lock(); sm.save_message(session.id, user_id, 1, &message.content, 1)?; @@ -649,11 +658,11 @@ impl BotOrchestrator { // If message content is empty, we stop here after potentially running start.bas. // This happens when the bot is activated by its name in WhatsApp, where an empty string is sent as a signal. if message_content.trim().is_empty() { - let user_id_str = message.user_id.clone(); + let bot_id_str = message.bot_id.clone(); let session_id_str = message.session_id.clone(); #[cfg(feature = "chat")] - let suggestions = get_suggestions(self.state.cache.as_ref(), &user_id_str, &session_id_str); + let suggestions = get_suggestions(self.state.cache.as_ref(), &bot_id_str, &session_id_str); #[cfg(not(feature = "chat"))] let suggestions: Vec = Vec::new(); @@ -1089,12 +1098,12 @@ impl BotOrchestrator { ) .await??; - // Extract user_id and session_id before moving them into BotResponse - let user_id_str = message.user_id.clone(); + // Extract bot_id and session_id before moving them into BotResponse + let bot_id_str = message.bot_id.clone(); let session_id_str = message.session_id.clone(); #[cfg(feature = "chat")] - let suggestions = get_suggestions(self.state.cache.as_ref(), &user_id_str, &session_id_str); + let suggestions = get_suggestions(self.state.cache.as_ref(), &bot_id_str, &session_id_str); #[cfg(not(feature = "chat"))] let suggestions: Vec = Vec::new(); @@ -1375,7 +1384,9 @@ async fn 
handle_websocket( ); let state_for_start = state.clone(); - let _tx_for_start = tx.clone(); + let tx_for_start = tx.clone(); + let bot_id_str = bot_id.to_string(); + let session_id_str = session_id.to_string(); let mut send_ready_rx = send_ready_rx; tokio::spawn(async move { @@ -1390,9 +1401,18 @@ async fn handle_websocket( } }; - if let Ok(Some(session)) = session_result { + if let Ok(Some(mut session)) = session_result { info!("start.bas: Found session {} for websocket session {}", session.id, session_id); + // Store WebSocket session_id in context so TALK can route messages correctly + if let serde_json::Value::Object(ref mut map) = session.context_data { + map.insert("websocket_session_id".to_string(), serde_json::Value::String(session_id.to_string())); + } else { + let mut map = serde_json::Map::new(); + map.insert("websocket_session_id".to_string(), serde_json::Value::String(session_id.to_string())); + session.context_data = serde_json::Value::Object(map); + } + // Clone state_for_start for use in Redis SET after execution let state_for_redis = state_for_start.clone(); @@ -1430,6 +1450,42 @@ async fn handle_websocket( info!("Marked start.bas as executed for session {}", session_id); } } + + // Fetch suggestions from Redis and send to frontend + let user_id_str = user_id.to_string(); + let suggestions = get_suggestions(state_for_redis.cache.as_ref(), &bot_id_str, &session_id_str); + if !suggestions.is_empty() { + info!("Sending {} suggestions to frontend for session {}", suggestions.len(), session_id); + let response = BotResponse { + bot_id: bot_id_str.clone(), + user_id: user_id_str.clone(), + session_id: session_id_str.clone(), + channel: "Chat".to_string(), + content: String::new(), + message_type: MessageType::BOT_RESPONSE, + stream_token: None, + is_complete: true, + suggestions, + context_name: None, + context_length: 0, + context_max_length: 0, + }; + let _ = tx_for_start.send(response).await; + + // Clear suggestions and start_bas_executed key to 
allow re-run on next page load + if let Some(cache) = &state_for_redis.cache { + if let Ok(mut conn) = cache.get_multiplexed_async_connection().await { + let suggestions_key = format!("suggestions:{}:{}", bot_id_str, session_id_str); + let start_bas_key = format!("start_bas_executed:{}:{}", server_epoch(), session_id_str); + let _: Result<(), redis::RedisError> = redis::cmd("DEL") + .arg(&suggestions_key) + .arg(&start_bas_key) + .query_async(&mut conn) + .await; + info!("Cleared suggestions and start_bas_executed from Redis for session {}", session_id); + } + } + } } Ok(Err(e)) => { error!("start.bas error for bot {}: {}", bot_name, e); diff --git a/src/directory/mod.rs b/src/directory/mod.rs index 3ed53fcc..241bba52 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -4,7 +4,7 @@ use axum::{ http::StatusCode, response::{IntoResponse, Json}, }; -use log::error; +use log::{error, info}; use std::collections::HashMap; use std::sync::Arc; use uuid::Uuid; @@ -51,6 +51,12 @@ pub async fn auth_handler( let existing_user_id = params .get("user_id") .and_then(|s| Uuid::parse_str(s).ok()); + let existing_session_id = params + .get("session_id") + .and_then(|s| Uuid::parse_str(s).ok()); + + info!("Auth handler called: bot_name={}, existing_user_id={:?}, existing_session_id={:?}", + bot_name, existing_user_id, existing_session_id); let user_id = { let mut sm = state.session_manager.lock().await; @@ -118,21 +124,63 @@ pub async fn auth_handler( let session = { let mut sm = state.session_manager.lock().await; - match sm.get_or_create_user_session(user_id, bot_id, "Auth Session") { - Ok(Some(sess)) => sess, - Ok(None) => { - error!("Failed to create session"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": "Failed to create session" })), - ); + + // Try to get existing session by ID first + if let Some(existing_session_id) = existing_session_id { + info!("Attempting to get existing session: {}", existing_session_id); + match 
sm.get_session_by_id(existing_session_id) { + Ok(Some(sess)) => { + info!("Successfully retrieved existing session: {}", sess.id); + sess + } + Ok(None) => { + // Session not found, create a new one + info!("Session {} not found in database, creating new session", existing_session_id); + match sm.create_session(user_id, bot_id, "Auth Session") { + Ok(sess) => sess, + Err(e) => { + error!("Failed to create session: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ "error": e.to_string() })), + ); + } + } + } + Err(e) => { + // Error getting session, create a new one + error!("Error getting session {}: {}", existing_session_id, e); + match sm.create_session(user_id, bot_id, "Auth Session") { + Ok(sess) => sess, + Err(e) => { + error!("Failed to create session: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ "error": e.to_string() })), + ); + } + } + } } - Err(e) => { - error!("Failed to create session: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": e.to_string() })), - ); + } else { + // No session_id provided, get or create session + info!("No session_id provided, getting or creating session for user {}", user_id); + match sm.get_or_create_user_session(user_id, bot_id, "Auth Session") { + Ok(Some(sess)) => sess, + Ok(None) => { + error!("Failed to create session"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ "error": "Failed to create session" })), + ); + } + Err(e) => { + error!("Failed to create session: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ "error": e.to_string() })), + ); + } } } };