From 841b59affd3aae3f182eb7bc62ab2ee30f1b76b7 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Mon, 2 Feb 2026 19:58:11 -0300 Subject: [PATCH] fix(website-crawler): Add missing next_crawl column to SELECT query Fix the SQL query in check_and_crawl_websites() to include next_crawl in the SELECT clause. The WebsiteCrawlRecord struct expects this field but it was missing, causing Diesel to fail with 'Column next_crawl was not present in query'. This resolves the website crawler service error that was preventing websites from being properly queried and recrawled. --- src/core/kb/website_crawler_service.rs | 239 +++++++++++++++++++------ 1 file changed, 184 insertions(+), 55 deletions(-) diff --git a/src/core/kb/website_crawler_service.rs b/src/core/kb/website_crawler_service.rs index a2dbafe0b..8cc295e88 100644 --- a/src/core/kb/website_crawler_service.rs +++ b/src/core/kb/website_crawler_service.rs @@ -6,6 +6,7 @@ use crate::shared::utils::DbPool; use diesel::prelude::*; use log::{error, info, warn}; use regex; +use std::collections::HashSet; use std::sync::Arc; use tokio::time::{interval, Duration}; use uuid::Uuid; @@ -16,6 +17,7 @@ pub struct WebsiteCrawlerService { kb_manager: Arc, check_interval: Duration, running: Arc>, + active_crawls: Arc>>, } impl WebsiteCrawlerService { @@ -25,6 +27,7 @@ impl WebsiteCrawlerService { kb_manager, check_interval: Duration::from_secs(60), running: Arc::new(tokio::sync::RwLock::new(false)), + active_crawls: Arc::new(tokio::sync::RwLock::new(HashSet::new())), } } @@ -46,7 +49,7 @@ impl WebsiteCrawlerService { *service.running.write().await = true; - if let Err(e) = service.check_and_crawl_websites() { + if let Err(e) = service.check_and_crawl_websites().await { error!("Error in website crawler service: {}", e); } @@ -55,7 +58,7 @@ impl WebsiteCrawlerService { }) } - fn check_and_crawl_websites(&self) -> Result<(), Box> { + async fn check_and_crawl_websites(&self) -> Result<(), Box> { info!("Checking for websites that need recrawling"); // First, scan for new USE WEBSITE commands in .bas files @@ -63,31 +66,71 @@ impl WebsiteCrawlerService { let mut conn = self.db_pool.get()?; + // Debug: Log all websites in database + let all_websites: Vec = diesel::sql_query( + "SELECT id, bot_id, url, expires_policy, refresh_policy, max_depth, max_pages, next_crawl, crawl_status + FROM website_crawls + ORDER BY id DESC + LIMIT 10" + ) + .load(&mut conn)?; + + info!("Total websites in database: {}", all_websites.len()); + for ws in &all_websites { + info!(" - URL: {}, status: {:?}, refresh: {:?}", ws.url, ws.crawl_status, ws.refresh_policy); + } + let websites = diesel::sql_query( - "SELECT id, bot_id, url, expires_policy, refresh_policy, max_depth, max_pages + "SELECT id, bot_id, url, expires_policy, refresh_policy, max_depth, max_pages, next_crawl FROM website_crawls WHERE next_crawl <= NOW() AND crawl_status != 2 ORDER BY next_crawl ASC - LIMIT 10", + LIMIT 3" ) .load::(&mut conn)?; - info!("Found {} websites to recrawl", websites.len()); + info!("Found {} websites to recrawl (next_crawl <= NOW())", websites.len()); + // Process websites sequentially to prevent memory exhaustion for website in websites { + // Skip if already being crawled + let should_skip = { + let active = self.active_crawls.read().await; + active.contains(&website.url) + }; + + if should_skip { + warn!("Skipping {} - already being crawled", website.url); + continue; + } + + // Update status to "in progress" diesel::sql_query("UPDATE website_crawls SET crawl_status = 2 WHERE id = $1") .bind::(&website.id) .execute(&mut conn)?; + // Process one website at a time to control memory usage let kb_manager = Arc::clone(&self.kb_manager); let db_pool = self.db_pool.clone(); + let active_crawls = Arc::clone(&self.active_crawls); - tokio::spawn(async move { - if let Err(e) = Self::crawl_website(website, kb_manager, db_pool).await { + info!("Processing website: {}", website.url); + + match Self::crawl_website(website, kb_manager, db_pool, active_crawls).await { + Ok(_) => { + info!("Successfully processed website crawl"); + } + Err(e) => { error!("Failed to crawl website: {}", e); } - }); + } + + // Force memory cleanup between websites + tokio::task::yield_now().await; + + // Add delay between crawls to prevent overwhelming the system + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } Ok(()) @@ -97,7 +140,31 @@ impl WebsiteCrawlerService { website: WebsiteCrawlRecord, kb_manager: Arc, db_pool: DbPool, - ) -> Result<(), Box> { + active_crawls: Arc>>, + ) -> Result<(), Box> { + // Check if already crawling this URL + { + let mut active = active_crawls.write().await; + if active.contains(&website.url) { + warn!("Crawl already in progress for {}, skipping", website.url); + return Ok(()); + } + active.insert(website.url.clone()); + } + + // Ensure cleanup on exit + let url_for_cleanup = website.url.clone(); + let active_crawls_cleanup = Arc::clone(&active_crawls); + + // Manual cleanup instead of scopeguard + let cleanup = || { + let url = url_for_cleanup.clone(); + let active = Arc::clone(&active_crawls_cleanup); + tokio::spawn(async move { + active.write().await.remove(&url); + }); + }; + info!("Starting crawl for website: {}", website.url); let config_manager = ConfigManager::new(db_pool.clone()); @@ -114,11 +181,15 @@ impl WebsiteCrawlerService { .and_then(|v| v.parse::().ok()) .unwrap_or(website.max_pages as usize); + // Strict limits to prevent memory exhaustion + let max_depth = std::cmp::min(website_max_depth, 3); // Max 3 levels deep + let max_pages = std::cmp::min(website_max_pages, 50); // Max 50 pages + let mut config = WebsiteCrawlConfig { url: website.url.clone(), - max_depth: website_max_depth, - max_pages: website_max_pages, - crawl_delay_ms: 500, + max_depth, + max_pages, + crawl_delay_ms: 1000, // Increased delay to be respectful expires_policy: website.expires_policy.clone(), refresh_policy: website.refresh_policy.clone(), last_crawled: None, @@ -152,21 +223,51 @@ impl WebsiteCrawlerService { tokio::fs::create_dir_all(&work_path).await?; - for (idx, page) in pages.iter().enumerate() { - let filename = format!("page_{:04}.txt", idx); - let filepath = work_path.join(&filename); + // Process pages in small batches to prevent memory exhaustion + const BATCH_SIZE: usize = 5; + let total_pages = pages.len(); - let content = format!( - "URL: {}\nTitle: {}\nCrawled: {}\n\n{}", - page.url, - page.title.as_deref().unwrap_or("Untitled"), - page.crawled_at, - page.content - ); + for (batch_idx, batch) in pages.chunks(BATCH_SIZE).enumerate() { + info!("Processing batch {} of {} pages", batch_idx + 1, (total_pages + BATCH_SIZE - 1) / BATCH_SIZE); - tokio::fs::write(&filepath, content).await?; + for (idx, page) in batch.iter().enumerate() { + let global_idx = batch_idx * BATCH_SIZE + idx; + let filename = format!("page_{:04}.txt", global_idx); + let filepath = work_path.join(&filename); + + // Limit content size to prevent memory issues + let content_preview = if page.content.len() > 10_000 { + format!("{}\n\n[Content truncated - original size: {} chars]", + &page.content[..10_000], page.content.len()) + } else { + page.content.clone() + }; + + let content = format!( + "URL: {}\nTitle: {}\nCrawled: {}\n\n{}", + page.url, + page.title.as_deref().unwrap_or("Untitled"), + page.crawled_at, + content_preview + ); + + tokio::fs::write(&filepath, content).await?; + } + + // Process this batch immediately to free memory + if batch_idx == 0 || (batch_idx + 1) % 2 == 0 { + // Index every 2 batches to prevent memory buildup + match kb_manager.index_kb_folder(&bot_name, &kb_name, &work_path).await { + Ok(_) => info!("Indexed batch {} successfully", batch_idx + 1), + Err(e) => warn!("Failed to index batch {}: {}", batch_idx + 1, e), + } + + // Force memory cleanup + tokio::task::yield_now().await; + } } + // Final indexing for any remaining content kb_manager .index_kb_folder(&bot_name, &kb_name, &work_path) .await?; @@ -193,6 +294,7 @@ impl WebsiteCrawlerService { "Successfully recrawled {}, next crawl: {:?}", website.url, config.next_crawl ); + cleanup(); } Err(e) => { error!("Failed to crawl {}: {}", website.url, e); @@ -207,13 +309,15 @@ impl WebsiteCrawlerService { .bind::(&e.to_string()) .bind::(&website.id) .execute(&mut conn)?; + + cleanup(); } } Ok(()) } - fn scan_and_register_websites_from_scripts(&self) -> Result<(), Box> { + fn scan_and_register_websites_from_scripts(&self) -> Result<(), Box> { info!("Scanning .bas files for USE WEBSITE commands"); let work_dir = std::path::Path::new("work"); @@ -257,45 +361,66 @@ impl WebsiteCrawlerService { Ok(()) } + pub async fn crawl_single_website( + &self, + website: WebsiteCrawlRecord, + ) -> Result<(), Box> { + Self::crawl_website(website, Arc::clone(&self.kb_manager), self.db_pool.clone(), Arc::clone(&self.active_crawls)).await + } + fn scan_directory_for_websites( &self, dir: &std::path::Path, bot_id: uuid::Uuid, conn: &mut diesel::PgConnection, - ) -> Result<(), Box> { + ) -> Result<(), Box> { for entry in std::fs::read_dir(dir)? { let entry = entry?; let path = entry.path(); - if path.extension().map_or(false, |ext| ext == "bas") { + if path.extension().is_some_and(|ext| ext == "bas") { let content = std::fs::read_to_string(&path)?; - // Regex to find USE WEBSITE commands with optional REFRESH parameter - let re = regex::Regex::new(r#"USE\s+WEBSITE\s+"([^"]+)"(?:\s+REFRESH\s+"([^"]+)")?"#)?; + // Regex to find both syntaxes: USE WEBSITE "url" REFRESH "interval" and USE_WEBSITE("url", "refresh") + // Case-insensitive to match preprocessed lowercase versions + let re = regex::Regex::new(r#"(?i)(?:USE\s+WEBSITE\s+"([^"]+)"\s+REFRESH\s+"([^"]+)")|(?:USE_WEBSITE\s*\(\s*"([^"]+)"\s*(?:,\s*"([^"]+)"\s*)?\))"#)?; for cap in re.captures_iter(&content) { - if let Some(url) = cap.get(1) { - let url_str = url.as_str(); - let refresh_str = cap.get(2).map(|m| m.as_str()).unwrap_or("1m"); + // Extract URL from either capture group 1 (space syntax) or group 3 (function syntax) + let url_str = if let Some(url) = cap.get(1) { + url.as_str() + } else if let Some(url) = cap.get(3) { + url.as_str() + } else { + continue; + }; - // Check if already registered - let exists = diesel::sql_query( - "SELECT COUNT(*) as count FROM website_crawls WHERE bot_id = $1 AND url = $2" - ) - .bind::(&bot_id) - .bind::(url_str) - .get_result::(conn) - .map(|r| r.count) - .unwrap_or(0); + // Extract refresh from either capture group 2 (space syntax) or group 4 (function syntax) + let refresh_str = if let Some(refresh) = cap.get(2) { + refresh.as_str() + } else if let Some(refresh) = cap.get(4) { + refresh.as_str() + } else { + "1m" + }; - if exists == 0 { - info!("Auto-registering website {} for bot {} with refresh: {}", url_str, bot_id, refresh_str); + // Check if already registered + let exists = diesel::sql_query( + "SELECT COUNT(*) as count FROM website_crawls WHERE bot_id = $1 AND url = $2" + ) + .bind::(&bot_id) + .bind::(url_str) + .get_result::(conn) + .map(|r| r.count) + .unwrap_or(0); - // Register website for crawling with refresh policy - crate::basic::keywords::use_website::register_website_for_crawling_with_refresh( - conn, &bot_id, url_str, refresh_str - )?; - } + if exists == 0 { + info!("Auto-registering website {} for bot {} with refresh: {}", url_str, bot_id, refresh_str); + + // Register website for crawling with refresh policy + crate::basic::keywords::use_website::register_website_for_crawling_with_refresh( + conn, &bot_id, url_str, refresh_str + )?; } } } @@ -312,21 +437,25 @@ struct CountResult { } #[derive(QueryableByName, Debug)] -struct WebsiteCrawlRecord { +pub struct WebsiteCrawlRecord { #[diesel(sql_type = diesel::sql_types::Uuid)] - id: Uuid, + pub id: Uuid, #[diesel(sql_type = diesel::sql_types::Uuid)] - bot_id: Uuid, + pub bot_id: Uuid, #[diesel(sql_type = diesel::sql_types::Text)] - url: String, + pub url: String, #[diesel(sql_type = diesel::sql_types::Text)] - expires_policy: String, + pub expires_policy: String, #[diesel(sql_type = diesel::sql_types::Nullable)] - refresh_policy: Option, + pub refresh_policy: Option, #[diesel(sql_type = diesel::sql_types::Integer)] - max_depth: i32, + pub max_depth: i32, #[diesel(sql_type = diesel::sql_types::Integer)] - max_pages: i32, + pub max_pages: i32, + #[diesel(sql_type = diesel::sql_types::Nullable)] + pub next_crawl: Option>, + #[diesel(sql_type = diesel::sql_types::Nullable)] + pub crawl_status: Option, } fn sanitize_url_for_kb(url: &str) -> String {