diff --git a/src/core/shared/schema/drive.rs b/src/core/shared/schema/drive.rs index fc06b425..de8c727e 100644 --- a/src/core/shared/schema/drive.rs +++ b/src/core/shared/schema/drive.rs @@ -2,47 +2,6 @@ use chrono::{DateTime, Utc}; use diesel::prelude::*; use uuid::Uuid; -#[derive(Queryable, Insertable, AsChangeset, Debug, Clone)] -#[diesel(table_name = drive_files)] -pub struct DriveFile { - pub id: Uuid, - pub bot_id: Uuid, - pub file_path: String, - pub file_type: String, - pub etag: Option, - pub last_modified: Option>, - pub file_size: Option, - pub indexed: bool, - pub fail_count: i32, - pub last_failed_at: Option>, - pub created_at: DateTime, - pub updated_at: DateTime, -} - -#[derive(Insertable, Debug)] -#[diesel(table_name = drive_files)] -pub struct NewDriveFile { - pub bot_id: Uuid, - pub file_path: String, - pub file_type: String, - pub etag: Option, - pub last_modified: Option>, - pub file_size: Option, - pub indexed: bool, -} - -#[derive(AsChangeset, Debug)] -#[diesel(table_name = drive_files)] -pub struct DriveFileUpdate { - pub etag: Option, - pub last_modified: Option>, - pub file_size: Option, - pub indexed: Option, - pub fail_count: Option, - pub last_failed_at: Option>, - pub updated_at: DateTime, -} - diesel::table! { drive_files (id) { id -> Uuid, @@ -59,3 +18,47 @@ diesel::table! { updated_at -> Timestamptz, } } + +// Query-only struct (no defaults needed) +#[derive(Queryable, Debug, Clone)] +pub struct DriveFile { + pub id: Uuid, + pub bot_id: Uuid, + pub file_path: String, + pub file_type: String, + pub etag: Option, + pub last_modified: Option>, + pub file_size: Option, + pub indexed: bool, + pub fail_count: i32, + pub last_failed_at: Option>, + pub created_at: DateTime, + pub updated_at: DateTime, +} + +// Insert struct - uses diesel defaults +#[derive(Insertable, Debug)] +#[diesel(table_name = drive_files)] +pub struct NewDriveFile { + pub bot_id: Uuid, + pub file_path: String, + pub file_type: String, + pub etag: Option, + pub last_modified: Option>, + pub file_size: Option, + pub indexed: Option, + pub fail_count: Option, +} + +// Update struct +#[derive(AsChangeset, Debug)] +#[diesel(table_name = drive_files)] +pub struct DriveFileUpdate { + pub etag: Option, + pub last_modified: Option>, + pub file_size: Option, + pub indexed: Option, + pub fail_count: Option, + pub last_failed_at: Option>, + pub updated_at: DateTime, +} diff --git a/src/drive/drive_files.rs b/src/drive/drive_files.rs new file mode 100644 index 00000000..2e57ac15 --- /dev/null +++ b/src/drive/drive_files.rs @@ -0,0 +1,174 @@ +use crate::core::shared::DbPool; +use chrono::{DateTime, Utc}; +use diesel::dsl::{max, sql}; +use diesel::prelude::*; +use uuid::Uuid; + +diesel::table! { + drive_files (id) { + id -> Uuid, + bot_id -> Uuid, + file_path -> Text, + file_type -> Varchar, + etag -> Nullable, + last_modified -> Nullable, + file_size -> Nullable, + indexed -> Bool, + fail_count -> Int4, + last_failed_at -> Nullable, + created_at -> Timestamptz, + updated_at -> Timestamptz, + } +} + +pub mod dsl { + pub use super::drive_files::*; +} + +#[derive(Queryable, Debug, Clone)] +pub struct DriveFile { + pub id: Uuid, + pub bot_id: Uuid, + pub file_path: String, + pub file_type: String, + pub etag: Option, + pub last_modified: Option>, + pub file_size: Option, + pub indexed: bool, + pub fail_count: i32, + pub last_failed_at: Option>, + pub created_at: DateTime, + pub updated_at: DateTime, +} + +pub struct DriveFileRepository { + pool: DbPool, +} + +impl DriveFileRepository { + pub fn new(pool: DbPool) -> Self { + Self { pool } + } + + pub fn get_file_state(&self, bot_id: Uuid, file_path: &str) -> Option { + let mut conn = match self.pool.get() { + Ok(c) => c, + Err(_) => return None, + }; + + drive_files::table + .filter( + drive_files::bot_id + .eq(bot_id) + .and(drive_files::file_path.eq(file_path)), + ) + .first(&mut conn) + .ok() + } + + pub fn upsert_file( + &self, + bot_id: Uuid, + file_path: &str, + file_type: &str, + etag: Option, + last_modified: Option>, + ) -> Result<(), String> { + let mut conn = self.pool.get().map_err(|e| e.to_string())?; + + let now = Utc::now(); + + diesel::insert_into(drive_files::table) + .values(( + drive_files::bot_id.eq(bot_id), + drive_files::file_path.eq(file_path), + drive_files::file_type.eq(file_type), + drive_files::etag.eq(etag), + drive_files::last_modified.eq(last_modified), + drive_files::indexed.eq(false), + drive_files::fail_count.eq(0), + drive_files::created_at.eq(now), + drive_files::updated_at.eq(now), + )) + .on_conflict((drive_files::bot_id, drive_files::file_path)) + .do_update() + .set(( + drive_files::etag.eq(etag), + drive_files::last_modified.eq(last_modified), + drive_files::updated_at.eq(now), + )) + .execute(&mut conn) + .map_err(|e| e.to_string())?; + + Ok(()) + } + + pub fn mark_indexed(&self, bot_id: Uuid, file_path: &str) -> Result<(), String> { + let mut conn = self.pool.get().map_err(|e| e.to_string())?; + + diesel::update(drive_files::table) + .filter( + drive_files::bot_id + .eq(bot_id) + .and(drive_files::file_path.eq(file_path)), + ) + .set(( + drive_files::indexed.eq(true), + drive_files::fail_count.eq(0), + drive_files::updated_at.eq(Utc::now()), + )) + .execute(&mut conn) + .map_err(|e| e.to_string())?; + + Ok(()) + } + + pub fn mark_failed(&self, bot_id: Uuid, file_path: &str) -> Result<(), String> { + let mut conn = self.pool.get().map_err(|e| e.to_string())?; + + diesel::update(drive_files::table) + .filter( + drive_files::bot_id + .eq(bot_id) + .and(drive_files::file_path.eq(file_path)), + ) + .set(( + drive_files::fail_count.eq(sql("fail_count + 1")), + drive_files::last_failed_at.eq(Some(Utc::now())), + drive_files::updated_at.eq(Utc::now()), + )) + .execute(&mut conn) + .map_err(|e| e.to_string())?; + + Ok(()) + } + + pub fn get_max_fail_count(&self, bot_id: Uuid) -> i32 { + let mut conn = match self.pool.get() { + Ok(c) => c, + Err(_) => return 0, + }; + + drive_files::table + .filter(drive_files::bot_id.eq(bot_id)) + .select(max(drive_files::fail_count)) + .first(&mut conn) + .unwrap_or(0) + } + + pub fn get_files_to_index(&self, bot_id: Uuid) -> Vec { + let mut conn = match self.pool.get() { + Ok(c) => c, + Err(_) => return vec![], + }; + + drive_files::table + .filter( + drive_files::bot_id + .eq(bot_id) + .and(drive_files::indexed.eq(false)), + ) + .load(&mut conn) + .unwrap_or_default() + } +} diff --git a/src/drive/drive_monitor/mod.rs b/src/drive/drive_monitor/mod.rs index 38ef17f5..30903396 100644 --- a/src/drive/drive_monitor/mod.rs +++ b/src/drive/drive_monitor/mod.rs @@ -23,6 +23,9 @@ use tokio::time::Duration; use serde::{Deserialize, Serialize}; use tokio::fs as tokio_fs; +#[cfg(any(feature = "research", feature = "llm"))] +use crate::drive::drive_files::DriveFileRepository; + #[cfg(any(feature = "research", feature = "llm"))] static LLM_STREAMING: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); @@ -78,6 +81,8 @@ pub struct DriveMonitor { kb_indexed_folders: Arc>>, #[cfg(not(any(feature = "research", feature = "llm")))] _pending_kb_index: Arc>>, + // Database-backed file state repository + file_repo: Arc, } impl DriveMonitor { fn normalize_config_value(value: &str) -> String { @@ -94,6 +99,9 @@ impl DriveMonitor { #[cfg(any(feature = "research", feature = "llm"))] let kb_manager = Arc::new(KnowledgeBaseManager::with_bot_config(work_root.clone(), state.conn.clone(), bot_id)); + // Initialize DB-backed file state repository + let file_repo = Arc::new(DriveFileRepository::new(state.conn.clone())); + Self { state, bucket_name, @@ -113,6 +121,7 @@ impl DriveMonitor { kb_indexed_folders: Arc::new(TokioRwLock::new(HashSet::new())), #[cfg(not(any(feature = "research", feature = "llm")))] _pending_kb_index: Arc::new(TokioRwLock::new(HashSet::new())), + file_repo, } } diff --git a/src/drive/mod.rs b/src/drive/mod.rs index 0d799608..8af464dd 100644 --- a/src/drive/mod.rs +++ b/src/drive/mod.rs @@ -24,6 +24,9 @@ use std::sync::Arc; #[cfg(feature = "drive")] pub mod document_processing; +#[cfg(feature = "drive")] +pub mod drive_files; + #[cfg(feature = "drive")] pub mod drive_monitor;