fix(website-crawler): Add missing next_crawl column to SELECT query
Some checks failed
GBCI / build (push) Failing after 12m2s

Fix the SQL query in check_and_crawl_websites() to include next_crawl in the
SELECT clause. The WebsiteCrawlRecord struct expects this field but it was
missing, causing Diesel to fail with 'Column next_crawl was not present in query'.

This resolves the website crawler service error that was preventing websites
from being properly queried and recrawled.
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-02-02 19:58:11 -03:00
parent 5fb4c889b7
commit 841b59affd

View file

@ -6,6 +6,7 @@ use crate::shared::utils::DbPool;
use diesel::prelude::*; use diesel::prelude::*;
use log::{error, info, warn}; use log::{error, info, warn};
use regex; use regex;
use std::collections::HashSet;
use std::sync::Arc; use std::sync::Arc;
use tokio::time::{interval, Duration}; use tokio::time::{interval, Duration};
use uuid::Uuid; use uuid::Uuid;
@ -16,6 +17,7 @@ pub struct WebsiteCrawlerService {
kb_manager: Arc<KnowledgeBaseManager>, kb_manager: Arc<KnowledgeBaseManager>,
check_interval: Duration, check_interval: Duration,
running: Arc<tokio::sync::RwLock<bool>>, running: Arc<tokio::sync::RwLock<bool>>,
active_crawls: Arc<tokio::sync::RwLock<HashSet<String>>>,
} }
impl WebsiteCrawlerService { impl WebsiteCrawlerService {
@ -25,6 +27,7 @@ impl WebsiteCrawlerService {
kb_manager, kb_manager,
check_interval: Duration::from_secs(60), check_interval: Duration::from_secs(60),
running: Arc::new(tokio::sync::RwLock::new(false)), running: Arc::new(tokio::sync::RwLock::new(false)),
active_crawls: Arc::new(tokio::sync::RwLock::new(HashSet::new())),
} }
} }
@ -46,7 +49,7 @@ impl WebsiteCrawlerService {
*service.running.write().await = true; *service.running.write().await = true;
if let Err(e) = service.check_and_crawl_websites() { if let Err(e) = service.check_and_crawl_websites().await {
error!("Error in website crawler service: {}", e); error!("Error in website crawler service: {}", e);
} }
@ -55,7 +58,7 @@ impl WebsiteCrawlerService {
}) })
} }
fn check_and_crawl_websites(&self) -> Result<(), Box<dyn std::error::Error>> { async fn check_and_crawl_websites(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!("Checking for websites that need recrawling"); info!("Checking for websites that need recrawling");
// First, scan for new USE WEBSITE commands in .bas files // First, scan for new USE WEBSITE commands in .bas files
@ -63,31 +66,71 @@ impl WebsiteCrawlerService {
let mut conn = self.db_pool.get()?; let mut conn = self.db_pool.get()?;
// Debug: Log all websites in database
let all_websites: Vec<WebsiteCrawlRecord> = diesel::sql_query(
"SELECT id, bot_id, url, expires_policy, refresh_policy, max_depth, max_pages, next_crawl, crawl_status
FROM website_crawls
ORDER BY id DESC
LIMIT 10"
)
.load(&mut conn)?;
info!("Total websites in database: {}", all_websites.len());
for ws in &all_websites {
info!(" - URL: {}, status: {:?}, refresh: {:?}", ws.url, ws.crawl_status, ws.refresh_policy);
}
let websites = diesel::sql_query( let websites = diesel::sql_query(
"SELECT id, bot_id, url, expires_policy, refresh_policy, max_depth, max_pages "SELECT id, bot_id, url, expires_policy, refresh_policy, max_depth, max_pages, next_crawl
FROM website_crawls FROM website_crawls
WHERE next_crawl <= NOW() WHERE next_crawl <= NOW()
AND crawl_status != 2 AND crawl_status != 2
ORDER BY next_crawl ASC ORDER BY next_crawl ASC
LIMIT 10", LIMIT 3"
) )
.load::<WebsiteCrawlRecord>(&mut conn)?; .load::<WebsiteCrawlRecord>(&mut conn)?;
info!("Found {} websites to recrawl", websites.len()); info!("Found {} websites to recrawl (next_crawl <= NOW())", websites.len());
// Process websites sequentially to prevent memory exhaustion
for website in websites { for website in websites {
// Skip if already being crawled
let should_skip = {
let active = self.active_crawls.read().await;
active.contains(&website.url)
};
if should_skip {
warn!("Skipping {} - already being crawled", website.url);
continue;
}
// Update status to "in progress"
diesel::sql_query("UPDATE website_crawls SET crawl_status = 2 WHERE id = $1") diesel::sql_query("UPDATE website_crawls SET crawl_status = 2 WHERE id = $1")
.bind::<diesel::sql_types::Uuid, _>(&website.id) .bind::<diesel::sql_types::Uuid, _>(&website.id)
.execute(&mut conn)?; .execute(&mut conn)?;
// Process one website at a time to control memory usage
let kb_manager = Arc::clone(&self.kb_manager); let kb_manager = Arc::clone(&self.kb_manager);
let db_pool = self.db_pool.clone(); let db_pool = self.db_pool.clone();
let active_crawls = Arc::clone(&self.active_crawls);
tokio::spawn(async move { info!("Processing website: {}", website.url);
if let Err(e) = Self::crawl_website(website, kb_manager, db_pool).await {
match Self::crawl_website(website, kb_manager, db_pool, active_crawls).await {
Ok(_) => {
info!("Successfully processed website crawl");
}
Err(e) => {
error!("Failed to crawl website: {}", e); error!("Failed to crawl website: {}", e);
} }
}); }
// Force memory cleanup between websites
tokio::task::yield_now().await;
// Add delay between crawls to prevent overwhelming the system
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
} }
Ok(()) Ok(())
@ -97,7 +140,31 @@ impl WebsiteCrawlerService {
website: WebsiteCrawlRecord, website: WebsiteCrawlRecord,
kb_manager: Arc<KnowledgeBaseManager>, kb_manager: Arc<KnowledgeBaseManager>,
db_pool: DbPool, db_pool: DbPool,
) -> Result<(), Box<dyn std::error::Error>> { active_crawls: Arc<tokio::sync::RwLock<HashSet<String>>>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Check if already crawling this URL
{
let mut active = active_crawls.write().await;
if active.contains(&website.url) {
warn!("Crawl already in progress for {}, skipping", website.url);
return Ok(());
}
active.insert(website.url.clone());
}
// Ensure cleanup on exit
let url_for_cleanup = website.url.clone();
let active_crawls_cleanup = Arc::clone(&active_crawls);
// Manual cleanup instead of scopeguard
let cleanup = || {
let url = url_for_cleanup.clone();
let active = Arc::clone(&active_crawls_cleanup);
tokio::spawn(async move {
active.write().await.remove(&url);
});
};
info!("Starting crawl for website: {}", website.url); info!("Starting crawl for website: {}", website.url);
let config_manager = ConfigManager::new(db_pool.clone()); let config_manager = ConfigManager::new(db_pool.clone());
@ -114,11 +181,15 @@ impl WebsiteCrawlerService {
.and_then(|v| v.parse::<usize>().ok()) .and_then(|v| v.parse::<usize>().ok())
.unwrap_or(website.max_pages as usize); .unwrap_or(website.max_pages as usize);
// Strict limits to prevent memory exhaustion
let max_depth = std::cmp::min(website_max_depth, 3); // Max 3 levels deep
let max_pages = std::cmp::min(website_max_pages, 50); // Max 50 pages
let mut config = WebsiteCrawlConfig { let mut config = WebsiteCrawlConfig {
url: website.url.clone(), url: website.url.clone(),
max_depth: website_max_depth, max_depth,
max_pages: website_max_pages, max_pages,
crawl_delay_ms: 500, crawl_delay_ms: 1000, // Increased delay to be respectful
expires_policy: website.expires_policy.clone(), expires_policy: website.expires_policy.clone(),
refresh_policy: website.refresh_policy.clone(), refresh_policy: website.refresh_policy.clone(),
last_crawled: None, last_crawled: None,
@ -152,21 +223,51 @@ impl WebsiteCrawlerService {
tokio::fs::create_dir_all(&work_path).await?; tokio::fs::create_dir_all(&work_path).await?;
for (idx, page) in pages.iter().enumerate() { // Process pages in small batches to prevent memory exhaustion
let filename = format!("page_{:04}.txt", idx); const BATCH_SIZE: usize = 5;
let filepath = work_path.join(&filename); let total_pages = pages.len();
let content = format!( for (batch_idx, batch) in pages.chunks(BATCH_SIZE).enumerate() {
"URL: {}\nTitle: {}\nCrawled: {}\n\n{}", info!("Processing batch {} of {} pages", batch_idx + 1, (total_pages + BATCH_SIZE - 1) / BATCH_SIZE);
page.url,
page.title.as_deref().unwrap_or("Untitled"),
page.crawled_at,
page.content
);
tokio::fs::write(&filepath, content).await?; for (idx, page) in batch.iter().enumerate() {
let global_idx = batch_idx * BATCH_SIZE + idx;
let filename = format!("page_{:04}.txt", global_idx);
let filepath = work_path.join(&filename);
// Limit content size to prevent memory issues
let content_preview = if page.content.len() > 10_000 {
format!("{}\n\n[Content truncated - original size: {} chars]",
&page.content[..10_000], page.content.len())
} else {
page.content.clone()
};
let content = format!(
"URL: {}\nTitle: {}\nCrawled: {}\n\n{}",
page.url,
page.title.as_deref().unwrap_or("Untitled"),
page.crawled_at,
content_preview
);
tokio::fs::write(&filepath, content).await?;
}
// Process this batch immediately to free memory
if batch_idx == 0 || (batch_idx + 1) % 2 == 0 {
// Index every 2 batches to prevent memory buildup
match kb_manager.index_kb_folder(&bot_name, &kb_name, &work_path).await {
Ok(_) => info!("Indexed batch {} successfully", batch_idx + 1),
Err(e) => warn!("Failed to index batch {}: {}", batch_idx + 1, e),
}
// Force memory cleanup
tokio::task::yield_now().await;
}
} }
// Final indexing for any remaining content
kb_manager kb_manager
.index_kb_folder(&bot_name, &kb_name, &work_path) .index_kb_folder(&bot_name, &kb_name, &work_path)
.await?; .await?;
@ -193,6 +294,7 @@ impl WebsiteCrawlerService {
"Successfully recrawled {}, next crawl: {:?}", "Successfully recrawled {}, next crawl: {:?}",
website.url, config.next_crawl website.url, config.next_crawl
); );
cleanup();
} }
Err(e) => { Err(e) => {
error!("Failed to crawl {}: {}", website.url, e); error!("Failed to crawl {}: {}", website.url, e);
@ -207,13 +309,15 @@ impl WebsiteCrawlerService {
.bind::<diesel::sql_types::Text, _>(&e.to_string()) .bind::<diesel::sql_types::Text, _>(&e.to_string())
.bind::<diesel::sql_types::Uuid, _>(&website.id) .bind::<diesel::sql_types::Uuid, _>(&website.id)
.execute(&mut conn)?; .execute(&mut conn)?;
cleanup();
} }
} }
Ok(()) Ok(())
} }
fn scan_and_register_websites_from_scripts(&self) -> Result<(), Box<dyn std::error::Error>> { fn scan_and_register_websites_from_scripts(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!("Scanning .bas files for USE WEBSITE commands"); info!("Scanning .bas files for USE WEBSITE commands");
let work_dir = std::path::Path::new("work"); let work_dir = std::path::Path::new("work");
@ -257,45 +361,66 @@ impl WebsiteCrawlerService {
Ok(()) Ok(())
} }
pub async fn crawl_single_website(
&self,
website: WebsiteCrawlRecord,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Self::crawl_website(website, Arc::clone(&self.kb_manager), self.db_pool.clone(), Arc::clone(&self.active_crawls)).await
}
fn scan_directory_for_websites( fn scan_directory_for_websites(
&self, &self,
dir: &std::path::Path, dir: &std::path::Path,
bot_id: uuid::Uuid, bot_id: uuid::Uuid,
conn: &mut diesel::PgConnection, conn: &mut diesel::PgConnection,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
for entry in std::fs::read_dir(dir)? { for entry in std::fs::read_dir(dir)? {
let entry = entry?; let entry = entry?;
let path = entry.path(); let path = entry.path();
if path.extension().map_or(false, |ext| ext == "bas") { if path.extension().is_some_and(|ext| ext == "bas") {
let content = std::fs::read_to_string(&path)?; let content = std::fs::read_to_string(&path)?;
// Regex to find USE WEBSITE commands with optional REFRESH parameter // Regex to find both syntaxes: USE WEBSITE "url" REFRESH "interval" and USE_WEBSITE("url", "refresh")
let re = regex::Regex::new(r#"USE\s+WEBSITE\s+"([^"]+)"(?:\s+REFRESH\s+"([^"]+)")?"#)?; // Case-insensitive to match preprocessed lowercase versions
let re = regex::Regex::new(r#"(?i)(?:USE\s+WEBSITE\s+"([^"]+)"\s+REFRESH\s+"([^"]+)")|(?:USE_WEBSITE\s*\(\s*"([^"]+)"\s*(?:,\s*"([^"]+)"\s*)?\))"#)?;
for cap in re.captures_iter(&content) { for cap in re.captures_iter(&content) {
if let Some(url) = cap.get(1) { // Extract URL from either capture group 1 (space syntax) or group 3 (function syntax)
let url_str = url.as_str(); let url_str = if let Some(url) = cap.get(1) {
let refresh_str = cap.get(2).map(|m| m.as_str()).unwrap_or("1m"); url.as_str()
} else if let Some(url) = cap.get(3) {
url.as_str()
} else {
continue;
};
// Check if already registered // Extract refresh from either capture group 2 (space syntax) or group 4 (function syntax)
let exists = diesel::sql_query( let refresh_str = if let Some(refresh) = cap.get(2) {
"SELECT COUNT(*) as count FROM website_crawls WHERE bot_id = $1 AND url = $2" refresh.as_str()
) } else if let Some(refresh) = cap.get(4) {
.bind::<diesel::sql_types::Uuid, _>(&bot_id) refresh.as_str()
.bind::<diesel::sql_types::Text, _>(url_str) } else {
.get_result::<CountResult>(conn) "1m"
.map(|r| r.count) };
.unwrap_or(0);
if exists == 0 { // Check if already registered
info!("Auto-registering website {} for bot {} with refresh: {}", url_str, bot_id, refresh_str); let exists = diesel::sql_query(
"SELECT COUNT(*) as count FROM website_crawls WHERE bot_id = $1 AND url = $2"
)
.bind::<diesel::sql_types::Uuid, _>(&bot_id)
.bind::<diesel::sql_types::Text, _>(url_str)
.get_result::<CountResult>(conn)
.map(|r| r.count)
.unwrap_or(0);
// Register website for crawling with refresh policy if exists == 0 {
crate::basic::keywords::use_website::register_website_for_crawling_with_refresh( info!("Auto-registering website {} for bot {} with refresh: {}", url_str, bot_id, refresh_str);
conn, &bot_id, url_str, refresh_str
)?; // Register website for crawling with refresh policy
} crate::basic::keywords::use_website::register_website_for_crawling_with_refresh(
conn, &bot_id, url_str, refresh_str
)?;
} }
} }
} }
@ -312,21 +437,25 @@ struct CountResult {
} }
#[derive(QueryableByName, Debug)] #[derive(QueryableByName, Debug)]
struct WebsiteCrawlRecord { pub struct WebsiteCrawlRecord {
#[diesel(sql_type = diesel::sql_types::Uuid)] #[diesel(sql_type = diesel::sql_types::Uuid)]
id: Uuid, pub id: Uuid,
#[diesel(sql_type = diesel::sql_types::Uuid)] #[diesel(sql_type = diesel::sql_types::Uuid)]
bot_id: Uuid, pub bot_id: Uuid,
#[diesel(sql_type = diesel::sql_types::Text)] #[diesel(sql_type = diesel::sql_types::Text)]
url: String, pub url: String,
#[diesel(sql_type = diesel::sql_types::Text)] #[diesel(sql_type = diesel::sql_types::Text)]
expires_policy: String, pub expires_policy: String,
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
refresh_policy: Option<String>, pub refresh_policy: Option<String>,
#[diesel(sql_type = diesel::sql_types::Integer)] #[diesel(sql_type = diesel::sql_types::Integer)]
max_depth: i32, pub max_depth: i32,
#[diesel(sql_type = diesel::sql_types::Integer)] #[diesel(sql_type = diesel::sql_types::Integer)]
max_pages: i32, pub max_pages: i32,
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Timestamptz>)]
pub next_crawl: Option<chrono::DateTime<chrono::Utc>>,
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::SmallInt>)]
pub crawl_status: Option<i16>,
} }
fn sanitize_url_for_kb(url: &str) -> String { fn sanitize_url_for_kb(url: &str) -> String {