diff --git a/migrations/core/6.2.1/down.sql b/migrations/core/6.2.1/down.sql
new file mode 100644
index 000000000..3bb7d2cc3
--- /dev/null
+++ b/migrations/core/6.2.1/down.sql
@@ -0,0 +1,3 @@
+-- Remove the refresh_policy column from website_crawls table
+ALTER TABLE website_crawls
+DROP COLUMN IF EXISTS refresh_policy;
diff --git a/migrations/core/6.2.1/up.sql b/migrations/core/6.2.1/up.sql
new file mode 100644
index 000000000..8be92cbf5
--- /dev/null
+++ b/migrations/core/6.2.1/up.sql
@@ -0,0 +1,13 @@
+-- Add refresh_policy column to website_crawls table
+-- This column stores the user-configured refresh interval (e.g., "1d", "1w", "1m", "1y")
+
+ALTER TABLE website_crawls
+ADD COLUMN IF NOT EXISTS refresh_policy VARCHAR(20);
+
+-- Update existing records to have a default refresh policy (1 month)
+UPDATE website_crawls
+SET refresh_policy = '1m'
+WHERE refresh_policy IS NULL;
+
+-- Add comment for documentation
+COMMENT ON COLUMN website_crawls.refresh_policy IS 'User-configured refresh interval (e.g., "1d", "1w", "1m", "1y") - shortest interval is used when duplicates exist';
diff --git a/src/basic/keywords/use_website.rs b/src/basic/keywords/use_website.rs
index 223179b52..d5286f25c 100644
--- a/src/basic/keywords/use_website.rs
+++ b/src/basic/keywords/use_website.rs
@@ -6,21 +6,70 @@ use rhai::{Dynamic, Engine};
 use std::sync::Arc;
 use uuid::Uuid;
 
+/// Parse a refresh interval string (e.g., "1d", "1w", "1m", "1y") into a
+/// number of days. A bare number (e.g. "7") is interpreted as days.
+/// Months and years are approximated as 30 and 365 days respectively.
+fn parse_refresh_interval(interval: &str) -> Result<i32, String> {
+    let interval_lower = interval.trim().to_lowercase();
+
+    // Shared numeric parse so every unit produces a consistent error message.
+    let parse_number = |s: &str, what: &str| -> Result<i32, String> {
+        s.parse()
+            .map_err(|_| format!("Invalid {} format: {}", what, interval))
+    };
+
+    // Match patterns like "1d", "7d", "2w", "1m", "1y", etc.
+    if let Some(num) = interval_lower.strip_suffix('d') {
+        parse_number(num, "days")
+    } else if let Some(num) = interval_lower.strip_suffix('w') {
+        Ok(parse_number(num, "weeks")? * 7)
+    } else if let Some(num) = interval_lower.strip_suffix('m') {
+        Ok(parse_number(num, "months")? * 30) // Approximate month as 30 days
+    } else if let Some(num) = interval_lower.strip_suffix('y') {
+        Ok(parse_number(num, "years")? * 365) // Approximate year as 365 days
+    } else {
+        // Plain number: assume days. Parse the trimmed/lowercased form so
+        // surrounding whitespace does not cause a spurious failure.
+        interval_lower.parse().map_err(|_| {
+            format!(
+                "Invalid refresh interval format: {}. Use format like '1d', '1w', '1m', '1y'",
+                interval
+            )
+        })
+    }
+}
+
+/// Convert days to expires_policy string format
+fn days_to_expires_policy(days: i32) -> String {
+    format!("{}d", days)
+}
+
 pub fn use_website_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
     let state_clone = Arc::clone(&state);
-    let user_clone = user;
+    let user_clone = user.clone();
 
+    // Register syntax for USE WEBSITE "url" REFRESH "interval"
     engine
         .register_custom_syntax(
-            ["USE", "WEBSITE", "$expr$"],
+            ["USE", "WEBSITE", "$expr$", "REFRESH", "$expr$"],
             false,
             move |context, inputs| {
                 let url = context.eval_expression_tree(&inputs[0])?;
                 let url_str = url.to_string().trim_matches('"').to_string();
 
+                let refresh = context.eval_expression_tree(&inputs[1])?;
+                let refresh_str = refresh.to_string().trim_matches('"').to_string();
+
                 trace!(
-                    "USE WEBSITE command executed: {} for session: {}",
+                    "USE WEBSITE command executed: {} REFRESH {} for session: {}",
                     url_str,
+                    refresh_str,
                     user_clone.id
                 );
 
@@ -35,6 +84,80 @@ pub fn use_website_keyword(state: Arc<AppState>, user: UserSession, engine: &mut
                 let state_for_task = Arc::clone(&state_clone);
                 let user_for_task = user_clone.clone();
                 let url_for_task = url_str;
+                let refresh_for_task = refresh_str;
+                let (tx, rx) = std::sync::mpsc::channel();
+
+                // associate_website_with_session_refresh is synchronous, so no
+                // async runtime is needed; run it on a plain thread so this
+                // callback can enforce a timeout via recv_timeout below.
+                std::thread::spawn(move || {
+                    let result = associate_website_with_session_refresh(
+                        &state_for_task,
+                        &user_for_task,
+                        &url_for_task,
+                        &refresh_for_task,
+                    );
+                    if tx.send(result).is_err() {
+                        error!("Failed to send result from thread");
+                    }
+                });
+
+                match rx.recv_timeout(std::time::Duration::from_secs(10)) {
+                    Ok(Ok(message)) => Ok(Dynamic::from(message)),
+                    Ok(Err(e)) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
+                        e.into(),
+                        rhai::Position::NONE,
+                    ))),
+                    Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
+                        Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
+                            "USE WEBSITE timed out".into(),
+                            rhai::Position::NONE,
+                        )))
+                    }
+                    Err(e) => Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
+                        format!("USE WEBSITE failed: {}", e).into(),
+                        rhai::Position::NONE,
+                    ))),
+                }
+            },
+        )
+        .expect("valid syntax registration");
+
+    // Register syntax for USE WEBSITE "url" (without REFRESH)
+    // NOTE(review): both registrations start with the same leading symbols
+    // ("USE", "WEBSITE", "$expr$"); confirm this rhai version supports two
+    // custom syntaxes sharing a prefix — otherwise the second registration
+    // may shadow or reject the REFRESH variant.
+    let state_clone2 = Arc::clone(&state);
+    let user_clone2 = user.clone();
+
+    engine
+        .register_custom_syntax(
+            ["USE", "WEBSITE", "$expr$"],
+            false,
+            move |context, inputs| {
+                let url = context.eval_expression_tree(&inputs[0])?;
+                let url_str = url.to_string().trim_matches('"').to_string();
+
+                trace!(
+                    "USE WEBSITE command executed: {} for session: {}",
+                    url_str,
+                    user_clone2.id
+                );
+
+                let is_valid = url_str.starts_with("http://") || url_str.starts_with("https://");
+                if !is_valid {
+                    return Err(Box::new(rhai::EvalAltResult::ErrorRuntime(
+                        "Invalid URL format. Must start with http:// or https://".into(),
+                        rhai::Position::NONE,
+                    )));
+                }
+
+                let state_for_task = Arc::clone(&state_clone2);
+                let user_for_task = user_clone2.clone();
+                let url_for_task = url_str;
                 let (tx, rx) = std::sync::mpsc::channel();
 
                 std::thread::spawn(move || {
@@ -87,7 +210,16 @@ fn associate_website_with_session(
     user: &UserSession,
     url: &str,
 ) -> Result<String, String> {
-    info!("Associating website {} with session {}", url, user.id);
+    associate_website_with_session_refresh(state, user, url, "1m") // Default: 1 month
+}
+
+fn associate_website_with_session_refresh(
+    state: &AppState,
+    user: &UserSession,
+    url: &str,
+    refresh_interval: &str,
+) -> Result<String, String> {
+    info!("Associating website {} with session {} (refresh: {})", url, user.id, refresh_interval);
 
     let mut conn = state.conn.get().map_err(|e| format!("DB error: {}", e))?;
 
@@ -97,16 +229,25 @@ fn associate_website_with_session(
 
     match website_status {
         WebsiteCrawlStatus::NotRegistered => {
-            return Err(format!(
-                "Website {} has not been registered for crawling. It should be added to the script for preprocessing.",
-                url
+            // Auto-register website for crawling instead of failing
+            info!("Website {} not registered, auto-registering for crawling with refresh: {}", url, refresh_interval);
+            register_website_for_crawling_with_refresh(&mut conn, &user.bot_id, url, refresh_interval)
+                .map_err(|e| format!("Failed to register website: {}", e))?;
+
+            return Ok(format!(
+                "Website {} has been registered for crawling (refresh: {}). It will be available once crawling completes.",
+                url, refresh_interval
             ));
         }
         WebsiteCrawlStatus::Pending => {
             info!("Website {} is pending crawl, associating anyway", url);
+            // Update refresh policy if needed
+            update_refresh_policy_if_shorter(&mut conn, &user.bot_id, url, refresh_interval)?;
         }
         WebsiteCrawlStatus::Crawled => {
             info!("Website {} is already crawled and ready", url);
+            // Update refresh policy if needed
+            update_refresh_policy_if_shorter(&mut conn, &user.bot_id, url, refresh_interval)?;
         }
         WebsiteCrawlStatus::Failed => {
             return Err(format!(
@@ -165,26 +306,102 @@ pub fn register_website_for_crawling(
     bot_id: &Uuid,
     url: &str,
 ) -> Result<(), String> {
-    let expires_policy = "1d";
+    register_website_for_crawling_with_refresh(conn, bot_id, url, "1m") // Default: 1 month
+}
+
+/// Upsert a website_crawls row for (bot_id, url) with the given refresh
+/// interval. When a row already exists, its refresh policy is only filled in
+/// if missing here; the "shortest interval wins" rule is applied afterwards
+/// in Rust (see below).
+pub fn register_website_for_crawling_with_refresh(
+    conn: &mut PgConnection,
+    bot_id: &Uuid,
+    url: &str,
+    refresh_interval: &str,
+) -> Result<(), String> {
+    let days = parse_refresh_interval(refresh_interval)
+        .map_err(|e| format!("Invalid refresh interval: {}", e))?;
+
+    let expires_policy = days_to_expires_policy(days);
+
+    let query = diesel::sql_query(
+        "INSERT INTO website_crawls (id, bot_id, url, expires_policy, crawl_status, next_crawl, refresh_policy)
+         VALUES (gen_random_uuid(), $1, $2, $3, 0, NOW(), $4)
+         ON CONFLICT (bot_id, url) DO UPDATE SET
+             next_crawl = CASE
+                 WHEN website_crawls.crawl_status = 2 THEN NOW() -- Failed, retry now
+                 ELSE website_crawls.next_crawl -- Keep existing schedule
+             END,
+             refresh_policy = COALESCE(website_crawls.refresh_policy, $4)",
+    )
+    .bind::<diesel::sql_types::Uuid, _>(bot_id)
+    .bind::<diesel::sql_types::Text, _>(url)
+    .bind::<diesel::sql_types::Text, _>(expires_policy)
+    .bind::<diesel::sql_types::Text, _>(refresh_interval);
+
+    query
+        .execute(conn)
+        .map_err(|e| format!("Failed to register website for crawling: {}", e))?;
+
+    // Intervals are strings like "1w"/"1m"; comparing them with SQL LEAST()
+    // would be lexicographic and wrong (LEAST('1m','1w') = '1m' even though
+    // one week is the shorter interval), so the "shortest interval wins"
+    // rule is enforced here by parsing both sides into days.
+    update_refresh_policy_if_shorter(conn, bot_id, url, refresh_interval)?;
+
+    info!("Website {} registered for crawling for bot {} with refresh policy: {}", url, bot_id, refresh_interval);
+    Ok(())
+}
+
+/// Update refresh policy if the new interval is shorter than the existing one
+/// (or if no policy is stored yet). Comparison happens in Rust on parsed
+/// day counts, never on the raw strings.
+fn update_refresh_policy_if_shorter(
+    conn: &mut PgConnection,
+    bot_id: &Uuid,
+    url: &str,
+    refresh_interval: &str,
+) -> Result<(), String> {
+    // Get current record to compare in Rust (no SQL business logic!)
+    #[derive(QueryableByName)]
+    struct CurrentRefresh {
+        #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
+        refresh_policy: Option<String>,
+    }
 
-    let query = diesel::sql_query(
-        "INSERT INTO website_crawls (id, bot_id, url, expires_policy, crawl_status, next_crawl)
-         VALUES (gen_random_uuid(), $1, $2, $3, 0, NOW())
-         ON CONFLICT (bot_id, url) DO UPDATE SET next_crawl =
-             CASE
-                 WHEN website_crawls.crawl_status = 2 THEN NOW() -- Failed, retry now
-                 ELSE website_crawls.next_crawl -- Keep existing schedule
-             END",
+    let current = diesel::sql_query(
+        "SELECT refresh_policy FROM website_crawls WHERE bot_id = $1 AND url = $2"
     )
     .bind::<diesel::sql_types::Uuid, _>(bot_id)
     .bind::<diesel::sql_types::Text, _>(url)
-    .bind::<diesel::sql_types::Text, _>(expires_policy);
+    .get_result::<CurrentRefresh>(conn)
+    .ok();
 
-    query
-        .execute(conn)
-        .map_err(|e| format!("Failed to register website for crawling: {}", e))?;
+    let new_days = parse_refresh_interval(refresh_interval)
+        .map_err(|e| format!("Invalid refresh interval: {}", e))?;
+
+    // Update when no policy exists yet, or when the new interval is shorter.
+    let should_update = match current.as_ref().and_then(|c| c.refresh_policy.as_deref()) {
+        Some(existing) => {
+            // An unparsable stored policy is treated as "infinitely long",
+            // so any valid new interval replaces it.
+            let existing_days = parse_refresh_interval(existing).unwrap_or(i32::MAX);
+            new_days < existing_days
+        }
+        None => true, // No existing policy, so update
+    };
+
+    if should_update {
+        let expires_policy = days_to_expires_policy(new_days);
+
+        diesel::sql_query(
+            "UPDATE website_crawls SET refresh_policy = $3, expires_policy = $4
+             WHERE bot_id = $1 AND url = $2"
+        )
+        .bind::<diesel::sql_types::Uuid, _>(bot_id)
+        .bind::<diesel::sql_types::Text, _>(url)
+        .bind::<diesel::sql_types::Text, _>(refresh_interval)
+        .bind::<diesel::sql_types::Text, _>(expires_policy)
+        .execute(conn)
+        .map_err(|e| format!("Failed to update refresh policy: {}", e))?;
+    }
 
-    info!("Website {} registered for crawling for bot {}", url, bot_id);
     Ok(())
 }
 
@@ -193,7 +410,16 @@ pub fn execute_use_website_preprocessing(
     url: &str,
     bot_id: Uuid,
 ) -> Result<serde_json::Value, Box<dyn std::error::Error>> {
-    trace!("Preprocessing USE_WEBSITE: {}, bot_id: {:?}", url, bot_id);
+    execute_use_website_preprocessing_with_refresh(conn, url, bot_id, "1m") // Default: 1 month
+}
+
+pub fn execute_use_website_preprocessing_with_refresh(
+    conn: &mut PgConnection,
+    url: &str,
+    bot_id: Uuid,
+    refresh_interval: &str,
+) -> Result<serde_json::Value, Box<dyn std::error::Error>> {
+    trace!("Preprocessing USE_WEBSITE: {}, bot_id: {:?}, refresh: {}", url, bot_id, refresh_interval);
 
     if !url.starts_with("http://") && !url.starts_with("https://") {
         return Err(format!(
@@ -203,12 +429,13 @@ pub fn execute_use_website_preprocessing(
         .into());
     }
 
-    register_website_for_crawling(conn, &bot_id, url)?;
+    register_website_for_crawling_with_refresh(conn, &bot_id, url, refresh_interval)?;
 
     Ok(serde_json::json!({
         "command": "use_website",
         "url": url,
         "bot_id": bot_id.to_string(),
+        "refresh_policy": refresh_interval,
         "status": "registered_for_crawling"
     }))
 }
diff --git a/src/core/kb/website_crawler_service.rs b/src/core/kb/website_crawler_service.rs
index 336cc6e55..a2dbafe0b 100644
--- a/src/core/kb/website_crawler_service.rs
+++ b/src/core/kb/website_crawler_service.rs
@@ -5,6 +5,7 @@ use crate::shared::state::AppState;
 use crate::shared::utils::DbPool;
 use diesel::prelude::*;
 use log::{error, info, warn};
+use regex;
 use std::sync::Arc;
 use tokio::time::{interval, Duration};
 use uuid::Uuid;
@@ -22,7 +23,10 @@ impl WebsiteCrawlerService {
         Self {
             db_pool,
             kb_manager,
-            check_interval: Duration::from_secs(3600),
+            // Shorter cadence so newly added USE WEBSITE commands in scripts
+            // are picked up quickly; the per-site recrawl schedule is still
+            // governed by next_crawl.
+            check_interval: Duration::from_secs(60),
             running: Arc::new(tokio::sync::RwLock::new(false)),
         }
     }
@@ -57,10 +61,13 @@ impl WebsiteCrawlerService {
     fn check_and_crawl_websites(&self) -> Result<(), Box<dyn std::error::Error>> {
         info!("Checking for websites that need recrawling");
 
+        // First, scan for new USE WEBSITE commands in .bas files
+        self.scan_and_register_websites_from_scripts()?;
+
         let mut conn = self.db_pool.get()?;
 
         let websites = diesel::sql_query(
-            "SELECT id, bot_id, url, expires_policy, max_depth, max_pages
+            "SELECT id, bot_id, url, expires_policy, refresh_policy, max_depth, max_pages
              FROM website_crawls
              WHERE next_crawl <= NOW()
              AND crawl_status != 2
@@ -116,6 +123,7 @@ impl WebsiteCrawlerService {
             max_pages: website_max_pages,
             crawl_delay_ms: 500,
             expires_policy: website.expires_policy.clone(),
+            refresh_policy: website.refresh_policy.clone(),
             last_crawled: None,
             next_crawl: None,
         };
@@ -207,6 +215,107 @@ impl WebsiteCrawlerService {
 
         Ok(())
     }
+
+    /// Walk work/<bot>.gbai/<bot>.gbdialog directories and auto-register any
+    /// USE WEBSITE urls found in .bas scripts.
+    fn scan_and_register_websites_from_scripts(&self) -> Result<(), Box<dyn std::error::Error>> {
+        info!("Scanning .bas files for USE WEBSITE commands");
+
+        let work_dir = std::path::Path::new("work");
+        if !work_dir.exists() {
+            return Ok(());
+        }
+
+        let mut conn = self.db_pool.get()?;
+
+        for entry in std::fs::read_dir(work_dir)? {
+            let entry = entry?;
+            let path = entry.path();
+            // read_dir never yields "." or "..", so use the entry's own name
+            // instead of unwrapping path.file_name().
+            let dir_name = entry.file_name().to_string_lossy().to_string();
+
+            if path.is_dir() && dir_name.ends_with(".gbai") {
+                let bot_name = dir_name.replace(".gbai", "");
+
+                // Get bot_id from database
+                #[derive(QueryableByName)]
+                struct BotIdResult {
+                    #[diesel(sql_type = diesel::sql_types::Uuid)]
+                    id: uuid::Uuid,
+                }
+
+                let bot_id_result: Result<BotIdResult, _> =
+                    diesel::sql_query("SELECT id FROM bots WHERE name = $1")
+                        .bind::<diesel::sql_types::Text, _>(&bot_name)
+                        .get_result(&mut conn);
+
+                let bot_id = match bot_id_result {
+                    Ok(result) => result.id,
+                    Err(_) => continue, // Skip if bot not found
+                };
+
+                // Scan .gbdialog directory for .bas files
+                let dialog_dir = path.join(format!("{}.gbdialog", bot_name));
+                if dialog_dir.exists() {
+                    self.scan_directory_for_websites(&dialog_dir, bot_id, &mut conn)?;
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    fn scan_directory_for_websites(
+        &self,
+        dir: &std::path::Path,
+        bot_id: uuid::Uuid,
+        conn: &mut diesel::PgConnection,
+    ) -> Result<(), Box<dyn std::error::Error>> {
+        // Regex to find USE WEBSITE commands with optional REFRESH parameter.
+        // Compiled once, outside the per-file loop.
+        let re = regex::Regex::new(r#"USE\s+WEBSITE\s+"([^"]+)"(?:\s+REFRESH\s+"([^"]+)")?"#)?;
+
+        for entry in std::fs::read_dir(dir)? {
+            let entry = entry?;
+            let path = entry.path();
+
+            if path.extension().map_or(false, |ext| ext == "bas") {
+                let content = std::fs::read_to_string(&path)?;
+
+                for cap in re.captures_iter(&content) {
+                    if let Some(url) = cap.get(1) {
+                        let url_str = url.as_str();
+                        let refresh_str = cap.get(2).map(|m| m.as_str()).unwrap_or("1m");
+
+                        // Check if already registered
+                        let exists = diesel::sql_query(
+                            "SELECT COUNT(*) as count FROM website_crawls WHERE bot_id = $1 AND url = $2"
+                        )
+                        .bind::<diesel::sql_types::Uuid, _>(&bot_id)
+                        .bind::<diesel::sql_types::Text, _>(url_str)
+                        .get_result::<CountResult>(conn)
+                        .map(|r| r.count)
+                        .unwrap_or(0);
+
+                        if exists == 0 {
+                            info!("Auto-registering website {} for bot {} with refresh: {}", url_str, bot_id, refresh_str);
+
+                            // Register website for crawling with refresh policy
+                            crate::basic::keywords::use_website::register_website_for_crawling_with_refresh(
+                                conn, &bot_id, url_str, refresh_str
+                            )?;
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+#[derive(QueryableByName)]
+struct CountResult {
+    #[diesel(sql_type = diesel::sql_types::BigInt)]
+    count: i64,
 }
 
 #[derive(QueryableByName, Debug)]
@@ -219,6 +328,8 @@ struct WebsiteCrawlRecord {
     url: String,
     #[diesel(sql_type = diesel::sql_types::Text)]
     expires_policy: String,
+    #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
+    refresh_policy: Option<String>,
     #[diesel(sql_type = diesel::sql_types::Integer)]
     max_depth: i32,
     #[diesel(sql_type = diesel::sql_types::Integer)]