fix: send suggestions separately from TALK, clear Redis keys for refresh

- Remove suggestions fetching from TALK function
- WebSocket handler now fetches and sends suggestions after start.bas executes
- Clear suggestions and start_bas_executed keys to allow re-run on refresh
- Decouple TALK from suggestions handling
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-03-19 09:53:39 -03:00
parent 2fcfb05fd6
commit d6ebd0cf6e
9 changed files with 597 additions and 193 deletions

View file

@ -1,6 +1,6 @@
[package] [package]
name = "botserver" name = "botserver"
version = "6.2.0" version = "6.3.0"
edition = "2021" edition = "2021"
resolver = "2" resolver = "2"

View file

@ -0,0 +1,181 @@
#!/bin/bash
#
# Migrate LXD containers from ZFS to Incus directory storage
# Usage: ./migrate_zfs_to_incus_dir.sh [options]
#
# Options:
# --dry-run Show what would be done without executing
# --source-pool NAME LXD storage pool name (default: default)
# --dest-pool NAME Incus storage pool name (default: default)
# --containers LIST Comma-separated list of containers (default: all)
# --incus-host HOST Remote Incus host (optional)
# --help Show this help
set -e
# Colors
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m'
# Defaults
DRY_RUN=false
SOURCE_POOL="default"
DEST_POOL="default"
CONTAINERS=""
INCUS_HOST=""
BACKUP_DIR="/tmp/lxd-backups"
# Print banner
print_banner() {
echo -e "${BLUE}"
echo "╔════════════════════════════════════════════════════════════╗"
echo "║ LXD to Incus Migration Tool (ZFS -> Directory Storage) ║"
echo "╚════════════════════════════════════════════════════════════╝"
echo -e "${NC}"
}
# Show help
show_help() {
echo "Usage: $0 [options]"
echo ""
echo "Migrate LXD containers from ZFS storage to Incus directory storage."
echo ""
echo "Options:"
echo " --dry-run Show what would be done without executing"
echo " --source-pool NAME LXD storage pool name (default: default)"
echo " --dest-pool NAME Incus storage pool name (default: default)"
echo " --containers LIST Comma-separated list of containers (default: all)"
echo " --incus-host HOST Remote Incus host (ssh)"
echo " --help Show this help"
echo ""
echo "Examples:"
echo " $0 --dry-run"
echo " $0 --source-pool zfs-storage --dest-pool dir-storage"
echo " $0 --containers container1,container2 --incus-host user@remote-host"
}
# Check dependencies
check_dependencies() {
local missing=()
if ! command -v lxc &> /dev/null; then
missing+=("lxc")
fi
if ! command -v incus &> /dev/null; then
missing+=("incus")
fi
if [ ${#missing[@]} -gt 0 ]; then
echo -e "${RED}Missing dependencies: ${missing[*]}${NC}"
echo "Please install them and try again."
exit 1
fi
}
# Parse arguments
while [[ $# -gt 0 ]]; do
case $1 in
--dry-run)
DRY_RUN=true
shift
;;
--source-pool)
SOURCE_POOL="$2"
shift 2
;;
--dest-pool)
DEST_POOL="$2"
shift 2
;;
--containers)
CONTAINERS="$2"
shift 2
;;
--incus-host)
INCUS_HOST="$2"
shift 2
;;
-h|--help)
show_help
exit 0
;;
*)
echo -e "${RED}Unknown option: $1${NC}"
show_help
exit 1
;;
esac
done
# Execute command (dry-run aware)
exec_cmd() {
local cmd="$1"
if [ "$DRY_RUN" = true ]; then
echo -e "${YELLOW}[DRY-RUN] $cmd${NC}"
else
echo -e "${GREEN}[EXEC] $cmd${NC}"
eval "$cmd"
fi
}
# Main migration logic
migrate_container() {
local container="$1"
local backup_file="$BACKUP_DIR/${container}-$(date +%Y%m%d%H%M%S).tar.gz"
echo -e "${BLUE}Migrating container: $container${NC}"
# Create backup directory
exec_cmd "mkdir -p $BACKUP_DIR"
# Create snapshot
local snapshot_name="migrate-$(date +%Y%m%d%H%M%S)"
exec_cmd "lxc snapshot $container $snapshot_name --stateful"
# Export container
exec_cmd "lxc export $container $backup_file --snapshot $snapshot_name"
# Transfer to remote Incus host if specified
if [ -n "$INCUS_HOST" ]; then
exec_cmd "scp $backup_file $INCUS_HOST:$BACKUP_DIR/"
exec_cmd "rm $backup_file"
backup_file="$BACKUP_DIR/$(basename $backup_file)"
fi
# Import into Incus
local import_cmd="incus import $backup_file $container"
if [ -n "$INCUS_HOST" ]; then
import_cmd="ssh $INCUS_HOST '$import_cmd'"
fi
exec_cmd "$import_cmd"
# Cleanup snapshot
exec_cmd "lxc delete $container/$snapshot_name"
# Cleanup local backup file if not remote
if [ -z "$INCUS_HOST" ] && [ "$DRY_RUN" = false ]; then
rm "$backup_file"
fi
echo -e "${GREEN}Completed migration for: $container${NC}"
}
# Main
print_banner
check_dependencies
# Get list of containers
if [ -z "$CONTAINERS" ]; then
CONTAINERS=$(lxc list --format csv | cut -d',' -f1)
fi
# Convert comma-separated list to array
IFS=',' read -ra CONTAINER_ARRAY <<< "$CONTAINERS"
# Migrate each container
for container in "${CONTAINER_ARRAY[@]}"; do
migrate_container "$container"
done
echo -e "${GREEN}Migration complete!${NC}"

View file

@ -1181,3 +1181,131 @@ pub fn configure_goals_routes() -> Router<Arc<AppState>> {
.route("/api/goals/templates", get(list_templates)) .route("/api/goals/templates", get(list_templates))
.route("/api/goals/ai/suggest", post(ai_suggest)) .route("/api/goals/ai/suggest", post(ai_suggest))
} }
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use uuid::Uuid;
#[test]
fn test_objective_record_creation() {
let now = Utc::now();
let objective = ObjectiveRecord {
id: Uuid::new_v4(),
org_id: Uuid::new_v4(),
bot_id: Uuid::new_v4(),
owner_id: Uuid::new_v4(),
parent_id: None,
title: "Test Objective".to_string(),
description: Some("Test description".to_string()),
period: "Q1 2025".to_string(),
period_start: None,
period_end: None,
status: "draft".to_string(),
progress: BigDecimal::from(0),
visibility: "team".to_string(),
weight: BigDecimal::from(1),
tags: vec![Some("test".to_string())],
created_at: now,
updated_at: now,
};
assert_eq!(objective.title, "Test Objective");
assert_eq!(objective.status, "draft");
assert_eq!(objective.progress, BigDecimal::from(0));
}
#[test]
fn test_key_result_record_creation() {
let now = Utc::now();
let key_result = KeyResultRecord {
id: Uuid::new_v4(),
org_id: Uuid::new_v4(),
bot_id: Uuid::new_v4(),
objective_id: Uuid::new_v4(),
owner_id: Uuid::new_v4(),
title: "Test Key Result".to_string(),
description: Some("Test KR description".to_string()),
metric_type: "numeric".to_string(),
start_value: BigDecimal::from(0),
target_value: BigDecimal::from(100),
current_value: BigDecimal::from(0),
unit: Some("units".to_string()),
weight: BigDecimal::from(1),
status: "not_started".to_string(),
due_date: None,
scoring_type: "linear".to_string(),
created_at: now,
updated_at: now,
};
assert_eq!(key_result.title, "Test Key Result");
assert_eq!(key_result.metric_type, "numeric");
assert_eq!(key_result.target_value, BigDecimal::from(100));
assert_eq!(key_result.status, "not_started");
}
#[test]
fn test_check_in_record_creation() {
let now = Utc::now();
let check_in = CheckInRecord {
id: Uuid::new_v4(),
org_id: Uuid::new_v4(),
bot_id: Uuid::new_v4(),
key_result_id: Uuid::new_v4(),
user_id: Uuid::new_v4(),
previous_value: Some(BigDecimal::from(0)),
new_value: BigDecimal::from(50),
note: Some("Progress update".to_string()),
confidence: Some("high".to_string()),
blockers: Some("No blockers".to_string()),
created_at: now,
};
assert_eq!(check_in.new_value, BigDecimal::from(50));
assert_eq!(check_in.confidence, Some("high".to_string()));
}
#[test]
fn test_goal_template_creation() {
let template = GoalTemplate {
id: Uuid::new_v4(),
organization_id: Uuid::new_v4(),
name: "Test Template".to_string(),
description: Some("Test template description".to_string()),
category: Some("product".to_string()),
objective_template: ObjectiveTemplate {
title: "Template Objective".to_string(),
description: "Template objective description".to_string(),
},
key_result_templates: vec![KeyResultTemplate {
title: "Template KR".to_string(),
description: "Template KR description".to_string(),
metric_type: "numeric".to_string(),
start_value: BigDecimal::from(0),
target_value: BigDecimal::from(100),
unit: Some("units".to_string()),
}],
is_system: false,
created_at: Utc::now(),
};
assert_eq!(template.name, "Test Template");
assert_eq!(template.objective_template.title, "Template Objective");
assert_eq!(template.key_result_templates.len(), 1);
assert!(!template.is_system);
}
#[test]
fn test_goals_error_display() {
let db_error = GoalsError::Database("Connection failed".to_string());
assert!(format!("{}", db_error).contains("Database error"));
let not_found = GoalsError::NotFound("Objective not found".to_string());
assert!(format!("{}", not_found).contains("not found"));
let validation = GoalsError::Validation("Invalid input".to_string());
assert!(format!("{}", validation).contains("Validation error"));
}
}

View file

@ -367,13 +367,13 @@ fn add_tool_suggestion(
/// Note: This function clears suggestions from Redis after fetching them to prevent duplicates /// Note: This function clears suggestions from Redis after fetching them to prevent duplicates
pub fn get_suggestions( pub fn get_suggestions(
cache: Option<&Arc<redis::Client>>, cache: Option<&Arc<redis::Client>>,
user_id: &str, bot_id: &str,
session_id: &str, session_id: &str,
) -> Vec<crate::core::shared::models::Suggestion> { ) -> Vec<crate::core::shared::models::Suggestion> {
let mut suggestions = Vec::new(); let mut suggestions = Vec::new();
if let Some(cache_client) = cache { if let Some(cache_client) = cache {
let redis_key = format!("suggestions:{}:{}", user_id, session_id); let redis_key = format!("suggestions:{}:{}", bot_id, session_id);
let mut conn = match cache_client.get_connection() { let mut conn = match cache_client.get_connection() {
Ok(conn) => conn, Ok(conn) => conn,

View file

@ -161,6 +161,7 @@ fn register_hear_as_type(state: Arc<AppState>, user: UserSession, engine: &mut E
fn register_hear_as_menu(state: Arc<AppState>, user: UserSession, engine: &mut Engine) { fn register_hear_as_menu(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
let session_id = user.id; let session_id = user.id;
let bot_id = user.bot_id;
let state_clone = Arc::clone(&state); let state_clone = Arc::clone(&state);
engine engine
@ -207,10 +208,11 @@ fn register_hear_as_menu(state: Arc<AppState>, user: UserSession, engine: &mut E
// Store suggestions in Redis for UI // Store suggestions in Redis for UI
let state_for_suggestions = Arc::clone(&state_clone); let state_for_suggestions = Arc::clone(&state_clone);
let opts_clone = options.clone(); let opts_clone = options.clone();
let bot_id_clone = bot_id;
tokio::runtime::Handle::current().block_on(async move { tokio::runtime::Handle::current().block_on(async move {
if let Some(redis) = &state_for_suggestions.cache { if let Some(redis) = &state_for_suggestions.cache {
if let Ok(mut conn) = redis.get_multiplexed_async_connection().await { if let Ok(mut conn) = redis.get_multiplexed_async_connection().await {
let key = format!("suggestions:{session_id}:{session_id}"); let key = format!("suggestions:{}:{}", bot_id_clone, session_id);
for opt in &opts_clone { for opt in &opts_clone {
let _: Result<(), _> = redis::cmd("RPUSH") let _: Result<(), _> = redis::cmd("RPUSH")
.arg(&key) .arg(&key)

View file

@ -13,33 +13,6 @@ pub async fn execute_talk(
message: String, message: String,
) -> Result<BotResponse, Box<dyn std::error::Error + Send + Sync>> { ) -> Result<BotResponse, Box<dyn std::error::Error + Send + Sync>> {
info!("TALK called with message: {}", message); info!("TALK called with message: {}", message);
let mut suggestions = Vec::new();
if let Some(redis_client) = &state.cache {
if let Ok(mut conn) = redis_client.get_multiplexed_async_connection().await {
let redis_key = format!("suggestions:{}:{}", user_session.bot_id, user_session.id);
info!("TALK: Fetching suggestions from Redis key: {}", redis_key);
let suggestions_json: Result<Vec<String>, _> = redis::cmd("LRANGE")
.arg(redis_key.as_str())
.arg(0)
.arg(-1)
.query_async(&mut conn)
.await;
if let Ok(suggestions_list) = suggestions_json {
info!("TALK: Got {} suggestions from Redis", suggestions_list.len());
suggestions = suggestions_list
.into_iter()
.filter_map(|s| serde_json::from_str(&s).ok())
.collect();
} else {
info!("TALK: No suggestions found in Redis");
}
}
} else {
info!("TALK: No cache configured");
}
let channel = user_session let channel = user_session
.context_data .context_data
@ -68,7 +41,7 @@ pub async fn execute_talk(
message_type: MessageType::BOT_RESPONSE, message_type: MessageType::BOT_RESPONSE,
stream_token: None, stream_token: None,
is_complete: true, is_complete: true,
suggestions, suggestions: Vec::new(),
context_name: None, context_name: None,
context_length: 0, context_length: 0,
context_max_length: 0, context_max_length: 0,
@ -96,17 +69,24 @@ pub async fn execute_talk(
} }
}); });
} else { } else {
let user_id = user_session.id.to_string(); // Use WebSocket session_id from context if available, otherwise fall back to session.id
let target_session_id = user_session
.context_data
.get("websocket_session_id")
.and_then(|v| v.as_str())
.unwrap_or(&user_session.id.to_string())
.to_string();
let web_adapter = Arc::clone(&state.web_adapter); let web_adapter = Arc::clone(&state.web_adapter);
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = web_adapter if let Err(e) = web_adapter
.send_message_to_session(&user_id, response_clone) .send_message_to_session(&target_session_id, response_clone)
.await .await
{ {
error!("Failed to send TALK message via web adapter: {}", e); error!("Failed to send TALK message via web adapter: {}", e);
} else { } else {
trace!("TALK message sent via web adapter"); trace!("TALK message sent via web adapter to session {}", target_session_id);
} }
}); });
} }

View file

@ -13,29 +13,33 @@ fn parse_refresh_interval(interval: &str) -> Result<i32, String> {
// Match patterns like "1d", "7d", "2w", "1m", "1y", etc. // Match patterns like "1d", "7d", "2w", "1m", "1y", etc.
if interval_lower.ends_with('d') { if interval_lower.ends_with('d') {
let days: i32 = interval_lower[..interval_lower.len()-1] let days: i32 = interval_lower[..interval_lower.len() - 1]
.parse() .parse()
.map_err(|_| format!("Invalid days format: {}", interval))?; .map_err(|_| format!("Invalid days format: {}", interval))?;
Ok(days) Ok(days)
} else if interval_lower.ends_with('w') { } else if interval_lower.ends_with('w') {
let weeks: i32 = interval_lower[..interval_lower.len()-1] let weeks: i32 = interval_lower[..interval_lower.len() - 1]
.parse() .parse()
.map_err(|_| format!("Invalid weeks format: {}", interval))?; .map_err(|_| format!("Invalid weeks format: {}", interval))?;
Ok(weeks * 7) Ok(weeks * 7)
} else if interval_lower.ends_with('m') { } else if interval_lower.ends_with('m') {
let months: i32 = interval_lower[..interval_lower.len()-1] let months: i32 = interval_lower[..interval_lower.len() - 1]
.parse() .parse()
.map_err(|_| format!("Invalid months format: {}", interval))?; .map_err(|_| format!("Invalid months format: {}", interval))?;
Ok(months * 30) // Approximate month as 30 days Ok(months * 30) // Approximate month as 30 days
} else if interval_lower.ends_with('y') { } else if interval_lower.ends_with('y') {
let years: i32 = interval_lower[..interval_lower.len()-1] let years: i32 = interval_lower[..interval_lower.len() - 1]
.parse() .parse()
.map_err(|_| format!("Invalid years format: {}", interval))?; .map_err(|_| format!("Invalid years format: {}", interval))?;
Ok(years * 365) // Approximate year as 365 days Ok(years * 365) // Approximate year as 365 days
} else { } else {
// Try to parse as plain number (assume days) // Try to parse as plain number (assume days)
interval.parse() interval.parse().map_err(|_| {
.map_err(|_| format!("Invalid refresh interval format: {}. Use format like '1d', '1w', '1m', '1y'", interval)) format!(
"Invalid refresh interval format: {}. Use format like '1d', '1w', '1m', '1y'",
interval
)
})
} }
} }
@ -192,118 +196,109 @@ pub fn register_use_website_function(state: Arc<AppState>, user: UserSession, en
let user_clone = user.clone(); let user_clone = user.clone();
// Register USE_WEBSITE(url, refresh) with both parameters (uppercase) // Register USE_WEBSITE(url, refresh) with both parameters (uppercase)
engine.register_fn( engine.register_fn("USE_WEBSITE", move |url: &str, refresh: &str| -> Dynamic {
"USE_WEBSITE", trace!(
move |url: &str, refresh: &str| -> Dynamic { "USE_WEBSITE function called: {} REFRESH {} for session: {}",
trace!( url,
"USE_WEBSITE function called: {} REFRESH {} for session: {}", refresh,
url, user_clone.id
refresh, );
user_clone.id
let is_valid = url.starts_with("http://") || url.starts_with("https://");
if !is_valid {
return Dynamic::from(format!(
"ERROR: Invalid URL format: {}. Must start with http:// or https://",
url
));
}
let state_for_task = Arc::clone(&state_clone);
let user_for_task = user_clone.clone();
let url_for_task = url.to_string();
let refresh_for_task = refresh.to_string();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let _rt = match tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
{
Ok(_rt) => _rt,
Err(e) => {
let _ = tx.send(Err(format!("Failed to build tokio runtime: {}", e)));
return;
}
};
let result = associate_website_with_session_refresh(
&state_for_task,
&user_for_task,
&url_for_task,
&refresh_for_task,
); );
let _ = tx.send(result);
});
let is_valid = url.starts_with("http://") || url.starts_with("https://"); match rx.recv_timeout(std::time::Duration::from_secs(3)) {
if !is_valid { Ok(Ok(message)) => Dynamic::from(message),
return Dynamic::from(format!( Ok(Err(e)) => Dynamic::from(format!("ERROR: {}", e)),
"ERROR: Invalid URL format: {}. Must start with http:// or https://", Err(_) => Dynamic::from("Website association scheduled."),
url }
)); });
}
let state_for_task = Arc::clone(&state_clone);
let user_for_task = user_clone.clone();
let url_for_task = url.to_string();
let refresh_for_task = refresh.to_string();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let _rt = match tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
{
Ok(_rt) => _rt,
Err(e) => {
let _ = tx.send(Err(format!("Failed to build tokio runtime: {}", e)));
return;
}
};
let result = associate_website_with_session_refresh(
&state_for_task,
&user_for_task,
&url_for_task,
&refresh_for_task,
);
let _ = tx.send(result);
});
match rx.recv_timeout(std::time::Duration::from_secs(3)) {
Ok(Ok(message)) => Dynamic::from(message),
Ok(Err(e)) => Dynamic::from(format!("ERROR: {}", e)),
Err(_) => Dynamic::from("Website association scheduled."),
}
},
);
let state_clone2 = Arc::clone(&state); let state_clone2 = Arc::clone(&state);
let user_clone2 = user.clone(); let user_clone2 = user.clone();
// Register use_website(url, refresh) with both parameters (lowercase for preprocessor) // Register use_website(url, refresh) with both parameters (lowercase for preprocessor)
engine.register_fn( engine.register_fn("use_website", move |url: &str, refresh: &str| -> Dynamic {
"use_website", trace!(
move |url: &str, refresh: &str| -> Dynamic { "use_website function called: {} REFRESH {} for session: {}",
trace!( url,
"use_website function called: {} REFRESH {} for session: {}", refresh,
url, user_clone2.id
refresh, );
user_clone2.id
);
let is_valid = url.starts_with("http://") || url.starts_with("https://"); let is_valid = url.starts_with("http://") || url.starts_with("https://");
if !is_valid { if !is_valid {
return Dynamic::from(format!( return Dynamic::from(format!(
"ERROR: Invalid URL format: {}. Must start with http:// or https://", "ERROR: Invalid URL format: {}. Must start with http:// or https://",
url url
)); ));
} }
let state_for_task = Arc::clone(&state_clone2); let state_for_task = Arc::clone(&state_clone2);
let user_for_task = user_clone2.clone(); let user_for_task = user_clone2.clone();
let url_for_task = url.to_string(); let url_for_task = url.to_string();
let refresh_for_task = refresh.to_string(); let refresh_for_task = refresh.to_string();
let (tx, rx) = std::sync::mpsc::channel(); let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || { std::thread::spawn(move || {
let _rt = match tokio::runtime::Builder::new_multi_thread() let _rt = match tokio::runtime::Builder::new_multi_thread()
.worker_threads(2) .worker_threads(2)
.enable_all() .enable_all()
.build() .build()
{ {
Ok(_rt) => _rt, Ok(_rt) => _rt,
Err(e) => { Err(e) => {
let _ = tx.send(Err(format!("Failed to build tokio runtime: {}", e))); let _ = tx.send(Err(format!("Failed to build tokio runtime: {}", e)));
return; return;
}
};
let result = associate_website_with_session_refresh(
&state_for_task,
&user_for_task,
&url_for_task,
&refresh_for_task,
);
let _ = tx.send(result);
});
match rx.recv_timeout(std::time::Duration::from_secs(3)) {
Ok(Ok(message)) => Dynamic::from(message),
Ok(Err(e)) => Dynamic::from(format!("ERROR: {}", e)),
Err(_) => {
Dynamic::from("Website association scheduled.")
} }
Err(e) => Dynamic::from(format!("ERROR: use_website failed: {}", e)), };
} let result = associate_website_with_session_refresh(
}, &state_for_task,
); &user_for_task,
&url_for_task,
&refresh_for_task,
);
let _ = tx.send(result);
});
match rx.recv_timeout(std::time::Duration::from_secs(3)) {
Ok(Ok(message)) => Dynamic::from(message),
Ok(Err(e)) => Dynamic::from(format!("ERROR: {}", e)),
Err(_) => Dynamic::from("Website association scheduled."),
}
});
let state_clone3 = Arc::clone(&state); let state_clone3 = Arc::clone(&state);
let user_clone3 = user.clone(); let user_clone3 = user.clone();
@ -341,21 +336,15 @@ pub fn register_use_website_function(state: Arc<AppState>, user: UserSession, en
return; return;
} }
}; };
let result = associate_website_with_session( let result =
&state_for_task, associate_website_with_session(&state_for_task, &user_for_task, &url_for_task);
&user_for_task,
&url_for_task,
);
let _ = tx.send(result); let _ = tx.send(result);
}); });
match rx.recv_timeout(std::time::Duration::from_secs(3)) { match rx.recv_timeout(std::time::Duration::from_secs(3)) {
Ok(Ok(message)) => Dynamic::from(message), Ok(Ok(message)) => Dynamic::from(message),
Ok(Err(e)) => Dynamic::from(format!("ERROR: {}", e)), Ok(Err(e)) => Dynamic::from(format!("ERROR: {}", e)),
Err(_) => { Err(_) => Dynamic::from("Website association scheduled."),
Dynamic::from("Website association scheduled.")
}
Err(e) => Dynamic::from(format!("ERROR: USE_WEBSITE failed: {}", e)),
} }
}); });
@ -395,21 +384,15 @@ pub fn register_use_website_function(state: Arc<AppState>, user: UserSession, en
return; return;
} }
}; };
let result = associate_website_with_session( let result =
&state_for_task, associate_website_with_session(&state_for_task, &user_for_task, &url_for_task);
&user_for_task,
&url_for_task,
);
let _ = tx.send(result); let _ = tx.send(result);
}); });
match rx.recv_timeout(std::time::Duration::from_secs(3)) { match rx.recv_timeout(std::time::Duration::from_secs(3)) {
Ok(Ok(message)) => Dynamic::from(message), Ok(Ok(message)) => Dynamic::from(message),
Ok(Err(e)) => Dynamic::from(format!("ERROR: {}", e)), Ok(Err(e)) => Dynamic::from(format!("ERROR: {}", e)),
Err(_) => { Err(_) => Dynamic::from("Website association scheduled."),
Dynamic::from("Website association scheduled.")
}
Err(e) => Dynamic::from(format!("ERROR: use_website failed: {}", e)),
} }
}); });
@ -430,7 +413,10 @@ fn associate_website_with_session_refresh(
url: &str, url: &str,
refresh_interval: &str, refresh_interval: &str,
) -> Result<String, String> { ) -> Result<String, String> {
info!("Associating website {} with session {} (refresh: {})", url, user.id, refresh_interval); info!(
"Associating website {} with session {} (refresh: {})",
url, user.id, refresh_interval
);
let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?; let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?;
@ -440,22 +426,34 @@ fn associate_website_with_session_refresh(
#[diesel(sql_type = diesel::sql_types::Text)] #[diesel(sql_type = diesel::sql_types::Text)]
name: String, name: String,
} }
let bot_name_result: BotName = diesel::sql_query("SELECT name FROM bots WHERE id = $1") let bot_name_result: BotName = diesel::sql_query("SELECT name FROM bots WHERE id = $1")
.bind::<diesel::sql_types::Uuid, _>(&user.bot_id) .bind::<diesel::sql_types::Uuid, _>(&user.bot_id)
.get_result(&mut conn) .get_result(&mut conn)
.map_err(|e| format!("Failed to get bot name: {}", e))?; .map_err(|e| format!("Failed to get bot name: {}", e))?;
let collection_name = format!("{}_website_{}", bot_name_result.name, sanitize_url_for_collection(url)); let collection_name = format!(
"{}_website_{}",
bot_name_result.name,
sanitize_url_for_collection(url)
);
let website_status = check_website_crawl_status(&mut conn, &user.bot_id, url)?; let website_status = check_website_crawl_status(&mut conn, &user.bot_id, url)?;
match website_status { match website_status {
WebsiteCrawlStatus::NotRegistered => { WebsiteCrawlStatus::NotRegistered => {
// Auto-register website for crawling instead of failing // Auto-register website for crawling instead of failing
info!("Website {} not registered, auto-registering for crawling with refresh: {}", url, refresh_interval); info!(
register_website_for_crawling_with_refresh(&mut conn, &user.bot_id, url, refresh_interval) "Website {} not registered, auto-registering for crawling with refresh: {}",
.map_err(|e| format!("Failed to register website: {}", e))?; url, refresh_interval
);
register_website_for_crawling_with_refresh(
&mut conn,
&user.bot_id,
url,
refresh_interval,
)
.map_err(|e| format!("Failed to register website: {}", e))?;
// ADD TO SESSION EVEN IF CRAWL IS PENDING! // ADD TO SESSION EVEN IF CRAWL IS PENDING!
// Otherwise kb_context will think the session has no website associated if start.bas only runs once. // Otherwise kb_context will think the session has no website associated if start.bas only runs once.
@ -569,7 +567,10 @@ pub fn register_website_for_crawling_with_refresh(
.execute(conn) .execute(conn)
.map_err(|e| format!("Failed to register website for crawling: {}", e))?; .map_err(|e| format!("Failed to register website for crawling: {}", e))?;
info!("Website {} registered for crawling for bot {} with refresh policy: {}", url, bot_id, refresh_interval); info!(
"Website {} registered for crawling for bot {} with refresh policy: {}",
url, bot_id, refresh_interval
);
Ok(()) Ok(())
} }
@ -591,7 +592,7 @@ fn update_refresh_policy_if_shorter(
} }
let current = diesel::sql_query( let current = diesel::sql_query(
"SELECT refresh_policy FROM website_crawls WHERE bot_id = $1 AND url = $2" "SELECT refresh_policy FROM website_crawls WHERE bot_id = $1 AND url = $2",
) )
.bind::<diesel::sql_types::Uuid, _>(bot_id) .bind::<diesel::sql_types::Uuid, _>(bot_id)
.bind::<diesel::sql_types::Text, _>(url) .bind::<diesel::sql_types::Text, _>(url)
@ -628,7 +629,10 @@ fn update_refresh_policy_if_shorter(
.execute(conn) .execute(conn)
.map_err(|e| format!("Failed to update refresh policy: {}", e))?; .map_err(|e| format!("Failed to update refresh policy: {}", e))?;
info!("Refresh policy updated to {} for {} - immediate crawl scheduled", refresh_interval, url); info!(
"Refresh policy updated to {} for {} - immediate crawl scheduled",
refresh_interval, url
);
} }
Ok(()) Ok(())
@ -648,7 +652,12 @@ pub fn execute_use_website_preprocessing_with_refresh(
bot_id: Uuid, bot_id: Uuid,
refresh_interval: &str, refresh_interval: &str,
) -> Result<serde_json::Value, Box<dyn std::error::Error>> { ) -> Result<serde_json::Value, Box<dyn std::error::Error>> {
trace!("Preprocessing USE_WEBSITE: {}, bot_id: {:?}, refresh: {}", url, bot_id, refresh_interval); trace!(
"Preprocessing USE_WEBSITE: {}, bot_id: {:?}, refresh: {}",
url,
bot_id,
refresh_interval
);
if !url.starts_with("http://") && !url.starts_with("https://") { if !url.starts_with("http://") && !url.starts_with("https://") {
return Err(format!( return Err(format!(

View file

@ -475,12 +475,21 @@ impl BotOrchestrator {
let state_clone = self.state.clone(); let state_clone = self.state.clone();
tokio::task::spawn_blocking( tokio::task::spawn_blocking(
move || -> Result<_, Box<dyn std::error::Error + Send + Sync>> { move || -> Result<_, Box<dyn std::error::Error + Send + Sync>> {
let session = { let mut session = {
let mut sm = state_clone.session_manager.blocking_lock(); let mut sm = state_clone.session_manager.blocking_lock();
sm.get_session_by_id(session_id)? sm.get_session_by_id(session_id)?
} }
.ok_or("Session not found")?; .ok_or("Session not found")?;
// Store WebSocket session_id in context for TALK routing
if let serde_json::Value::Object(ref mut map) = session.context_data {
map.insert("websocket_session_id".to_string(), serde_json::Value::String(session_id.to_string()));
} else {
let mut map = serde_json::Map::new();
map.insert("websocket_session_id".to_string(), serde_json::Value::String(session_id.to_string()));
session.context_data = serde_json::Value::Object(map);
}
if !message.content.trim().is_empty() { if !message.content.trim().is_empty() {
let mut sm = state_clone.session_manager.blocking_lock(); let mut sm = state_clone.session_manager.blocking_lock();
sm.save_message(session.id, user_id, 1, &message.content, 1)?; sm.save_message(session.id, user_id, 1, &message.content, 1)?;
@ -649,11 +658,11 @@ impl BotOrchestrator {
// If message content is empty, we stop here after potentially running start.bas. // If message content is empty, we stop here after potentially running start.bas.
// This happens when the bot is activated by its name in WhatsApp, where an empty string is sent as a signal. // This happens when the bot is activated by its name in WhatsApp, where an empty string is sent as a signal.
if message_content.trim().is_empty() { if message_content.trim().is_empty() {
let user_id_str = message.user_id.clone(); let bot_id_str = message.bot_id.clone();
let session_id_str = message.session_id.clone(); let session_id_str = message.session_id.clone();
#[cfg(feature = "chat")] #[cfg(feature = "chat")]
let suggestions = get_suggestions(self.state.cache.as_ref(), &user_id_str, &session_id_str); let suggestions = get_suggestions(self.state.cache.as_ref(), &bot_id_str, &session_id_str);
#[cfg(not(feature = "chat"))] #[cfg(not(feature = "chat"))]
let suggestions: Vec<crate::core::shared::models::Suggestion> = Vec::new(); let suggestions: Vec<crate::core::shared::models::Suggestion> = Vec::new();
@ -1089,12 +1098,12 @@ impl BotOrchestrator {
) )
.await??; .await??;
// Extract user_id and session_id before moving them into BotResponse // Extract bot_id and session_id before moving them into BotResponse
let user_id_str = message.user_id.clone(); let bot_id_str = message.bot_id.clone();
let session_id_str = message.session_id.clone(); let session_id_str = message.session_id.clone();
#[cfg(feature = "chat")] #[cfg(feature = "chat")]
let suggestions = get_suggestions(self.state.cache.as_ref(), &user_id_str, &session_id_str); let suggestions = get_suggestions(self.state.cache.as_ref(), &bot_id_str, &session_id_str);
#[cfg(not(feature = "chat"))] #[cfg(not(feature = "chat"))]
let suggestions: Vec<crate::core::shared::models::Suggestion> = Vec::new(); let suggestions: Vec<crate::core::shared::models::Suggestion> = Vec::new();
@ -1375,7 +1384,9 @@ async fn handle_websocket(
); );
let state_for_start = state.clone(); let state_for_start = state.clone();
let _tx_for_start = tx.clone(); let tx_for_start = tx.clone();
let bot_id_str = bot_id.to_string();
let session_id_str = session_id.to_string();
let mut send_ready_rx = send_ready_rx; let mut send_ready_rx = send_ready_rx;
tokio::spawn(async move { tokio::spawn(async move {
@ -1390,9 +1401,18 @@ async fn handle_websocket(
} }
}; };
if let Ok(Some(session)) = session_result { if let Ok(Some(mut session)) = session_result {
info!("start.bas: Found session {} for websocket session {}", session.id, session_id); info!("start.bas: Found session {} for websocket session {}", session.id, session_id);
// Store WebSocket session_id in context so TALK can route messages correctly
if let serde_json::Value::Object(ref mut map) = session.context_data {
map.insert("websocket_session_id".to_string(), serde_json::Value::String(session_id.to_string()));
} else {
let mut map = serde_json::Map::new();
map.insert("websocket_session_id".to_string(), serde_json::Value::String(session_id.to_string()));
session.context_data = serde_json::Value::Object(map);
}
// Clone state_for_start for use in Redis SET after execution // Clone state_for_start for use in Redis SET after execution
let state_for_redis = state_for_start.clone(); let state_for_redis = state_for_start.clone();
@ -1430,6 +1450,42 @@ async fn handle_websocket(
info!("Marked start.bas as executed for session {}", session_id); info!("Marked start.bas as executed for session {}", session_id);
} }
} }
// Fetch suggestions from Redis and send to frontend
let user_id_str = user_id.to_string();
let suggestions = get_suggestions(state_for_redis.cache.as_ref(), &bot_id_str, &session_id_str);
if !suggestions.is_empty() {
info!("Sending {} suggestions to frontend for session {}", suggestions.len(), session_id);
let response = BotResponse {
bot_id: bot_id_str.clone(),
user_id: user_id_str.clone(),
session_id: session_id_str.clone(),
channel: "Chat".to_string(),
content: String::new(),
message_type: MessageType::BOT_RESPONSE,
stream_token: None,
is_complete: true,
suggestions,
context_name: None,
context_length: 0,
context_max_length: 0,
};
let _ = tx_for_start.send(response).await;
// Clear suggestions and start_bas_executed key to allow re-run on next page load
if let Some(cache) = &state_for_redis.cache {
if let Ok(mut conn) = cache.get_multiplexed_async_connection().await {
let suggestions_key = format!("suggestions:{}:{}", bot_id_str, session_id_str);
let start_bas_key = format!("start_bas_executed:{}:{}", server_epoch(), session_id_str);
let _: Result<(), redis::RedisError> = redis::cmd("DEL")
.arg(&suggestions_key)
.arg(&start_bas_key)
.query_async(&mut conn)
.await;
info!("Cleared suggestions and start_bas_executed from Redis for session {}", session_id);
}
}
}
} }
Ok(Err(e)) => { Ok(Err(e)) => {
error!("start.bas error for bot {}: {}", bot_name, e); error!("start.bas error for bot {}: {}", bot_name, e);

View file

@ -4,7 +4,7 @@ use axum::{
http::StatusCode, http::StatusCode,
response::{IntoResponse, Json}, response::{IntoResponse, Json},
}; };
use log::error; use log::{error, info};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use uuid::Uuid; use uuid::Uuid;
@ -51,6 +51,12 @@ pub async fn auth_handler(
let existing_user_id = params let existing_user_id = params
.get("user_id") .get("user_id")
.and_then(|s| Uuid::parse_str(s).ok()); .and_then(|s| Uuid::parse_str(s).ok());
let existing_session_id = params
.get("session_id")
.and_then(|s| Uuid::parse_str(s).ok());
info!("Auth handler called: bot_name={}, existing_user_id={:?}, existing_session_id={:?}",
bot_name, existing_user_id, existing_session_id);
let user_id = { let user_id = {
let mut sm = state.session_manager.lock().await; let mut sm = state.session_manager.lock().await;
@ -118,21 +124,63 @@ pub async fn auth_handler(
let session = { let session = {
let mut sm = state.session_manager.lock().await; let mut sm = state.session_manager.lock().await;
match sm.get_or_create_user_session(user_id, bot_id, "Auth Session") {
Ok(Some(sess)) => sess, // Try to get existing session by ID first
Ok(None) => { if let Some(existing_session_id) = existing_session_id {
error!("Failed to create session"); info!("Attempting to get existing session: {}", existing_session_id);
return ( match sm.get_session_by_id(existing_session_id) {
StatusCode::INTERNAL_SERVER_ERROR, Ok(Some(sess)) => {
Json(serde_json::json!({ "error": "Failed to create session" })), info!("Successfully retrieved existing session: {}", sess.id);
); sess
}
Ok(None) => {
// Session not found, create a new one
info!("Session {} not found in database, creating new session", existing_session_id);
match sm.create_session(user_id, bot_id, "Auth Session") {
Ok(sess) => sess,
Err(e) => {
error!("Failed to create session: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": e.to_string() })),
);
}
}
}
Err(e) => {
// Error getting session, create a new one
error!("Error getting session {}: {}", existing_session_id, e);
match sm.create_session(user_id, bot_id, "Auth Session") {
Ok(sess) => sess,
Err(e) => {
error!("Failed to create session: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": e.to_string() })),
);
}
}
}
} }
Err(e) => { } else {
error!("Failed to create session: {}", e); // No session_id provided, get or create session
return ( info!("No session_id provided, getting or creating session for user {}", user_id);
StatusCode::INTERNAL_SERVER_ERROR, match sm.get_or_create_user_session(user_id, bot_id, "Auth Session") {
Json(serde_json::json!({ "error": e.to_string() })), Ok(Some(sess)) => sess,
); Ok(None) => {
error!("Failed to create session");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": "Failed to create session" })),
);
}
Err(e) => {
error!("Failed to create session: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({ "error": e.to_string() })),
);
}
} }
} }
}; };