Fix more clippy warnings: unused imports, raw string hashes, conditional imports
This commit is contained in:
parent
3a8510d191
commit
3a260a5703
3 changed files with 37 additions and 162 deletions
|
|
@ -3,6 +3,7 @@ use chrono::{DateTime, Utc};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
#[cfg(not(feature = "vectordb"))]
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,54 +1,12 @@
|
||||||
|
use log::{debug, info, warn};
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
use log::{debug, error, info, trace, warn};
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::shared::state::AppState;
|
use crate::shared::state::AppState;
|
||||||
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct HybridSearchConfig {
|
pub struct HybridSearchConfig {
|
||||||
|
|
||||||
pub dense_weight: f32,
|
pub dense_weight: f32,
|
||||||
|
|
||||||
pub sparse_weight: f32,
|
pub sparse_weight: f32,
|
||||||
|
|
@ -82,7 +40,6 @@ impl Default for HybridSearchConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HybridSearchConfig {
|
impl HybridSearchConfig {
|
||||||
|
|
||||||
pub fn from_bot_config(state: &AppState, bot_id: Uuid) -> Self {
|
pub fn from_bot_config(state: &AppState, bot_id: Uuid) -> Self {
|
||||||
use diesel::prelude::*;
|
use diesel::prelude::*;
|
||||||
|
|
||||||
|
|
@ -136,7 +93,6 @@ impl HybridSearchConfig {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
let total = config.dense_weight + config.sparse_weight;
|
let total = config.dense_weight + config.sparse_weight;
|
||||||
if total > 0.0 {
|
if total > 0.0 {
|
||||||
config.dense_weight /= total;
|
config.dense_weight /= total;
|
||||||
|
|
@ -151,21 +107,17 @@ impl HybridSearchConfig {
|
||||||
config
|
config
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub fn use_sparse_search(&self) -> bool {
|
pub fn use_sparse_search(&self) -> bool {
|
||||||
self.bm25_enabled && self.sparse_weight > 0.0
|
self.bm25_enabled && self.sparse_weight > 0.0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub fn use_dense_search(&self) -> bool {
|
pub fn use_dense_search(&self) -> bool {
|
||||||
self.dense_weight > 0.0
|
self.dense_weight > 0.0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct SearchResult {
|
pub struct SearchResult {
|
||||||
|
|
||||||
pub doc_id: String,
|
pub doc_id: String,
|
||||||
|
|
||||||
pub content: String,
|
pub content: String,
|
||||||
|
|
@ -179,7 +131,6 @@ pub struct SearchResult {
|
||||||
pub search_method: SearchMethod,
|
pub search_method: SearchMethod,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||||
pub enum SearchMethod {
|
pub enum SearchMethod {
|
||||||
Dense,
|
Dense,
|
||||||
|
|
@ -188,8 +139,6 @@ pub enum SearchMethod {
|
||||||
Reranked,
|
Reranked,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
pub struct BM25Index {
|
pub struct BM25Index {
|
||||||
doc_freq: HashMap<String, usize>,
|
doc_freq: HashMap<String, usize>,
|
||||||
doc_count: usize,
|
doc_count: usize,
|
||||||
|
|
@ -341,7 +290,6 @@ impl Default for BM25Index {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct BM25Stats {
|
pub struct BM25Stats {
|
||||||
pub doc_count: usize,
|
pub doc_count: usize,
|
||||||
|
|
@ -350,7 +298,6 @@ pub struct BM25Stats {
|
||||||
pub enabled: bool,
|
pub enabled: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
struct DocumentEntry {
|
struct DocumentEntry {
|
||||||
pub content: String,
|
pub content: String,
|
||||||
|
|
@ -358,9 +305,7 @@ struct DocumentEntry {
|
||||||
pub metadata: HashMap<String, String>,
|
pub metadata: HashMap<String, String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub struct HybridSearchEngine {
|
pub struct HybridSearchEngine {
|
||||||
|
|
||||||
bm25_index: BM25Index,
|
bm25_index: BM25Index,
|
||||||
|
|
||||||
documents: HashMap<String, DocumentEntry>,
|
documents: HashMap<String, DocumentEntry>,
|
||||||
|
|
@ -391,7 +336,6 @@ impl HybridSearchEngine {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub async fn index_document(
|
pub async fn index_document(
|
||||||
&mut self,
|
&mut self,
|
||||||
doc_id: &str,
|
doc_id: &str,
|
||||||
|
|
@ -400,10 +344,8 @@ impl HybridSearchEngine {
|
||||||
metadata: HashMap<String, String>,
|
metadata: HashMap<String, String>,
|
||||||
embedding: Option<Vec<f32>>,
|
embedding: Option<Vec<f32>>,
|
||||||
) -> Result<(), String> {
|
) -> Result<(), String> {
|
||||||
|
|
||||||
self.bm25_index.add_document(doc_id, content, source);
|
self.bm25_index.add_document(doc_id, content, source);
|
||||||
|
|
||||||
|
|
||||||
self.documents.insert(
|
self.documents.insert(
|
||||||
doc_id.to_string(),
|
doc_id.to_string(),
|
||||||
DocumentEntry {
|
DocumentEntry {
|
||||||
|
|
@ -413,7 +355,6 @@ impl HybridSearchEngine {
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
||||||
if let Some(emb) = embedding {
|
if let Some(emb) = embedding {
|
||||||
self.upsert_to_qdrant(doc_id, &emb).await?;
|
self.upsert_to_qdrant(doc_id, &emb).await?;
|
||||||
}
|
}
|
||||||
|
|
@ -421,12 +362,10 @@ impl HybridSearchEngine {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub fn commit(&mut self) -> Result<(), String> {
|
pub fn commit(&mut self) -> Result<(), String> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub async fn remove_document(&mut self, doc_id: &str) -> Result<(), String> {
|
pub async fn remove_document(&mut self, doc_id: &str) -> Result<(), String> {
|
||||||
self.bm25_index.remove_document(doc_id);
|
self.bm25_index.remove_document(doc_id);
|
||||||
self.documents.remove(doc_id);
|
self.documents.remove(doc_id);
|
||||||
|
|
@ -434,7 +373,6 @@ impl HybridSearchEngine {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub async fn search(
|
pub async fn search(
|
||||||
&self,
|
&self,
|
||||||
query: &str,
|
query: &str,
|
||||||
|
|
@ -442,7 +380,6 @@ impl HybridSearchEngine {
|
||||||
) -> Result<Vec<SearchResult>, String> {
|
) -> Result<Vec<SearchResult>, String> {
|
||||||
let fetch_count = self.config.max_results * 3;
|
let fetch_count = self.config.max_results * 3;
|
||||||
|
|
||||||
|
|
||||||
let sparse_results: Vec<(String, f32)> = if self.config.use_sparse_search() {
|
let sparse_results: Vec<(String, f32)> = if self.config.use_sparse_search() {
|
||||||
self.bm25_index
|
self.bm25_index
|
||||||
.search(query, fetch_count)
|
.search(query, fetch_count)
|
||||||
|
|
@ -453,7 +390,6 @@ impl HybridSearchEngine {
|
||||||
Vec::new()
|
Vec::new()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
let dense_results = if self.config.use_dense_search() {
|
let dense_results = if self.config.use_dense_search() {
|
||||||
if let Some(embedding) = query_embedding {
|
if let Some(embedding) = query_embedding {
|
||||||
self.search_qdrant(&embedding, fetch_count).await?
|
self.search_qdrant(&embedding, fetch_count).await?
|
||||||
|
|
@ -464,7 +400,6 @@ impl HybridSearchEngine {
|
||||||
Vec::new()
|
Vec::new()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
let (results, method) = if sparse_results.is_empty() && dense_results.is_empty() {
|
let (results, method) = if sparse_results.is_empty() && dense_results.is_empty() {
|
||||||
(Vec::new(), SearchMethod::Hybrid)
|
(Vec::new(), SearchMethod::Hybrid)
|
||||||
} else if sparse_results.is_empty() {
|
} else if sparse_results.is_empty() {
|
||||||
|
|
@ -478,7 +413,6 @@ impl HybridSearchEngine {
|
||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
let mut search_results: Vec<SearchResult> = results
|
let mut search_results: Vec<SearchResult> = results
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter_map(|(doc_id, score)| {
|
.filter_map(|(doc_id, score)| {
|
||||||
|
|
@ -495,7 +429,6 @@ impl HybridSearchEngine {
|
||||||
.take(self.config.max_results)
|
.take(self.config.max_results)
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
|
||||||
if self.config.reranker_enabled && !search_results.is_empty() {
|
if self.config.reranker_enabled && !search_results.is_empty() {
|
||||||
search_results = self.rerank(query, search_results).await?;
|
search_results = self.rerank(query, search_results).await?;
|
||||||
}
|
}
|
||||||
|
|
@ -503,7 +436,6 @@ impl HybridSearchEngine {
|
||||||
Ok(search_results)
|
Ok(search_results)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub fn sparse_search(&self, query: &str) -> Vec<SearchResult> {
|
pub fn sparse_search(&self, query: &str) -> Vec<SearchResult> {
|
||||||
let results = self.bm25_index.search(query, self.config.max_results);
|
let results = self.bm25_index.search(query, self.config.max_results);
|
||||||
|
|
||||||
|
|
@ -522,7 +454,6 @@ impl HybridSearchEngine {
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub async fn dense_search(
|
pub async fn dense_search(
|
||||||
&self,
|
&self,
|
||||||
query_embedding: Vec<f32>,
|
query_embedding: Vec<f32>,
|
||||||
|
|
@ -548,7 +479,6 @@ impl HybridSearchEngine {
|
||||||
Ok(search_results)
|
Ok(search_results)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
fn reciprocal_rank_fusion(
|
fn reciprocal_rank_fusion(
|
||||||
&self,
|
&self,
|
||||||
sparse: &[(String, f32)],
|
sparse: &[(String, f32)],
|
||||||
|
|
@ -557,23 +487,19 @@ impl HybridSearchEngine {
|
||||||
let k = self.config.rrf_k as f32;
|
let k = self.config.rrf_k as f32;
|
||||||
let mut scores: HashMap<String, f32> = HashMap::new();
|
let mut scores: HashMap<String, f32> = HashMap::new();
|
||||||
|
|
||||||
|
|
||||||
for (rank, (doc_id, _)) in sparse.iter().enumerate() {
|
for (rank, (doc_id, _)) in sparse.iter().enumerate() {
|
||||||
let rrf_score = self.config.sparse_weight / (k + rank as f32 + 1.0);
|
let rrf_score = self.config.sparse_weight / (k + rank as f32 + 1.0);
|
||||||
*scores.entry(doc_id.clone()).or_insert(0.0) += rrf_score;
|
*scores.entry(doc_id.clone()).or_insert(0.0) += rrf_score;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
for (rank, (doc_id, _)) in dense.iter().enumerate() {
|
for (rank, (doc_id, _)) in dense.iter().enumerate() {
|
||||||
let rrf_score = self.config.dense_weight / (k + rank as f32 + 1.0);
|
let rrf_score = self.config.dense_weight / (k + rank as f32 + 1.0);
|
||||||
*scores.entry(doc_id.clone()).or_insert(0.0) += rrf_score;
|
*scores.entry(doc_id.clone()).or_insert(0.0) += rrf_score;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
let mut results: Vec<(String, f32)> = scores.into_iter().collect();
|
let mut results: Vec<(String, f32)> = scores.into_iter().collect();
|
||||||
results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
|
results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
|
||||||
|
|
||||||
|
|
||||||
let max_score = results.first().map(|(_, s)| *s).unwrap_or(0.0);
|
let max_score = results.first().map(|(_, s)| *s).unwrap_or(0.0);
|
||||||
if max_score > 0.0 {
|
if max_score > 0.0 {
|
||||||
for (_, score) in &mut results {
|
for (_, score) in &mut results {
|
||||||
|
|
@ -584,19 +510,18 @@ impl HybridSearchEngine {
|
||||||
results
|
results
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
async fn rerank(
|
async fn rerank(
|
||||||
&self,
|
&self,
|
||||||
query: &str,
|
query: &str,
|
||||||
results: Vec<SearchResult>,
|
results: Vec<SearchResult>,
|
||||||
) -> Result<Vec<SearchResult>, String> {
|
) -> Result<Vec<SearchResult>, String> {
|
||||||
|
|
||||||
|
|
||||||
let mut reranked = results;
|
let mut reranked = results;
|
||||||
|
|
||||||
let query_lower = query.to_lowercase();
|
let query_lower = query.to_lowercase();
|
||||||
let query_terms: std::collections::HashSet<String> =
|
let query_terms: std::collections::HashSet<String> = query_lower
|
||||||
query_lower.split_whitespace().map(|s| s.to_string()).collect();
|
.split_whitespace()
|
||||||
|
.map(|s| s.to_string())
|
||||||
|
.collect();
|
||||||
let query_terms_len = query_terms.len();
|
let query_terms_len = query_terms.len();
|
||||||
|
|
||||||
for result in &mut reranked {
|
for result in &mut reranked {
|
||||||
|
|
@ -623,7 +548,6 @@ impl HybridSearchEngine {
|
||||||
Ok(reranked)
|
Ok(reranked)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
async fn search_qdrant(
|
async fn search_qdrant(
|
||||||
&self,
|
&self,
|
||||||
embedding: &[f32],
|
embedding: &[f32],
|
||||||
|
|
@ -673,7 +597,6 @@ impl HybridSearchEngine {
|
||||||
Ok(results)
|
Ok(results)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
async fn upsert_to_qdrant(&self, doc_id: &str, embedding: &[f32]) -> Result<(), String> {
|
async fn upsert_to_qdrant(&self, doc_id: &str, embedding: &[f32]) -> Result<(), String> {
|
||||||
let client = reqwest::Client::new();
|
let client = reqwest::Client::new();
|
||||||
|
|
||||||
|
|
@ -702,7 +625,6 @@ impl HybridSearchEngine {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
async fn delete_from_qdrant(&self, doc_id: &str) -> Result<(), String> {
|
async fn delete_from_qdrant(&self, doc_id: &str) -> Result<(), String> {
|
||||||
let client = reqwest::Client::new();
|
let client = reqwest::Client::new();
|
||||||
|
|
||||||
|
|
@ -731,7 +653,6 @@ impl HybridSearchEngine {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub fn stats(&self) -> HybridSearchStats {
|
pub fn stats(&self) -> HybridSearchStats {
|
||||||
let bm25_stats = self.bm25_index.stats();
|
let bm25_stats = self.bm25_index.stats();
|
||||||
|
|
||||||
|
|
@ -746,7 +667,6 @@ impl HybridSearchEngine {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct HybridSearchStats {
|
pub struct HybridSearchStats {
|
||||||
pub total_documents: usize,
|
pub total_documents: usize,
|
||||||
|
|
@ -757,7 +677,6 @@ pub struct HybridSearchStats {
|
||||||
pub config: HybridSearchConfig,
|
pub config: HybridSearchConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub struct QueryDecomposer {
|
pub struct QueryDecomposer {
|
||||||
llm_endpoint: String,
|
llm_endpoint: String,
|
||||||
api_key: String,
|
api_key: String,
|
||||||
|
|
@ -771,11 +690,9 @@ impl QueryDecomposer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub async fn decompose(&self, query: &str) -> Result<Vec<String>, String> {
|
pub async fn decompose(&self, query: &str) -> Result<Vec<String>, String> {
|
||||||
let mut sub_queries = Vec::new();
|
let mut sub_queries = Vec::new();
|
||||||
|
|
||||||
|
|
||||||
let conjunctions = ["and", "also", "as well as", "in addition to"];
|
let conjunctions = ["and", "also", "as well as", "in addition to"];
|
||||||
let mut parts: Vec<&str> = vec![query];
|
let mut parts: Vec<&str> = vec![query];
|
||||||
|
|
||||||
|
|
@ -821,7 +738,6 @@ impl QueryDecomposer {
|
||||||
Ok(sub_queries)
|
Ok(sub_queries)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub fn synthesize(&self, query: &str, sub_answers: &[String]) -> String {
|
pub fn synthesize(&self, query: &str, sub_answers: &[String]) -> String {
|
||||||
if sub_answers.len() == 1 {
|
if sub_answers.len() == 1 {
|
||||||
return sub_answers[0].clone();
|
return sub_answers[0].clone();
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,5 @@
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use diesel::RunQueryDsl;
|
|
||||||
use log::{error, info, warn};
|
use log::{error, info, warn};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
@ -50,7 +49,6 @@ impl UserWorkspace {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq)]
|
#[derive(Debug, Clone, PartialEq)]
|
||||||
pub enum IndexingStatus {
|
pub enum IndexingStatus {
|
||||||
Idle,
|
Idle,
|
||||||
|
|
@ -59,7 +57,6 @@ pub enum IndexingStatus {
|
||||||
Failed(String),
|
Failed(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct IndexingStats {
|
pub struct IndexingStats {
|
||||||
pub emails_indexed: u64,
|
pub emails_indexed: u64,
|
||||||
|
|
@ -70,7 +67,6 @@ pub struct IndexingStats {
|
||||||
pub errors: u64,
|
pub errors: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
struct UserIndexingJob {
|
struct UserIndexingJob {
|
||||||
user_id: Uuid,
|
user_id: Uuid,
|
||||||
bot_id: Uuid,
|
bot_id: Uuid,
|
||||||
|
|
@ -83,7 +79,6 @@ struct UserIndexingJob {
|
||||||
status: IndexingStatus,
|
status: IndexingStatus,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub struct VectorDBIndexer {
|
pub struct VectorDBIndexer {
|
||||||
db_pool: DbPool,
|
db_pool: DbPool,
|
||||||
work_root: PathBuf,
|
work_root: PathBuf,
|
||||||
|
|
@ -96,7 +91,6 @@ pub struct VectorDBIndexer {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl VectorDBIndexer {
|
impl VectorDBIndexer {
|
||||||
|
|
||||||
pub fn new(
|
pub fn new(
|
||||||
db_pool: DbPool,
|
db_pool: DbPool,
|
||||||
work_root: PathBuf,
|
work_root: PathBuf,
|
||||||
|
|
@ -115,7 +109,6 @@ impl VectorDBIndexer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub async fn start(self: Arc<Self>) -> Result<()> {
|
pub async fn start(self: Arc<Self>) -> Result<()> {
|
||||||
let mut running = self.running.write().await;
|
let mut running = self.running.write().await;
|
||||||
if *running {
|
if *running {
|
||||||
|
|
@ -135,17 +128,14 @@ impl VectorDBIndexer {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub async fn stop(&self) {
|
pub async fn stop(&self) {
|
||||||
let mut running = self.running.write().await;
|
let mut running = self.running.write().await;
|
||||||
*running = false;
|
*running = false;
|
||||||
info!("🛑 Stopping Vector DB Indexer");
|
info!("🛑 Stopping Vector DB Indexer");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
async fn run_indexing_loop(self: Arc<Self>) {
|
async fn run_indexing_loop(self: Arc<Self>) {
|
||||||
loop {
|
loop {
|
||||||
|
|
||||||
{
|
{
|
||||||
let running = self.running.read().await;
|
let running = self.running.read().await;
|
||||||
if !*running {
|
if !*running {
|
||||||
|
|
@ -155,7 +145,6 @@ impl VectorDBIndexer {
|
||||||
|
|
||||||
info!(" Running vector DB indexing cycle...");
|
info!(" Running vector DB indexing cycle...");
|
||||||
|
|
||||||
|
|
||||||
match self.get_active_users().await {
|
match self.get_active_users().await {
|
||||||
Ok(users) => {
|
Ok(users) => {
|
||||||
info!("Found {} active users to index", users.len());
|
info!("Found {} active users to index", users.len());
|
||||||
|
|
@ -173,14 +162,12 @@ impl VectorDBIndexer {
|
||||||
|
|
||||||
info!(" Indexing cycle complete");
|
info!(" Indexing cycle complete");
|
||||||
|
|
||||||
|
|
||||||
sleep(Duration::from_secs(self.interval_seconds)).await;
|
sleep(Duration::from_secs(self.interval_seconds)).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Vector DB Indexer stopped");
|
info!("Vector DB Indexer stopped");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
async fn get_active_users(&self) -> Result<Vec<(Uuid, Uuid)>> {
|
async fn get_active_users(&self) -> Result<Vec<(Uuid, Uuid)>> {
|
||||||
let conn = self.db_pool.clone();
|
let conn = self.db_pool.clone();
|
||||||
|
|
||||||
|
|
@ -190,7 +177,6 @@ impl VectorDBIndexer {
|
||||||
|
|
||||||
let mut db_conn = conn.get()?;
|
let mut db_conn = conn.get()?;
|
||||||
|
|
||||||
|
|
||||||
let results: Vec<(Uuid, Uuid)> = user_sessions
|
let results: Vec<(Uuid, Uuid)> = user_sessions
|
||||||
.select((user_id, bot_id))
|
.select((user_id, bot_id))
|
||||||
.distinct()
|
.distinct()
|
||||||
|
|
@ -201,11 +187,9 @@ impl VectorDBIndexer {
|
||||||
.await?
|
.await?
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
async fn index_user_data(&self, user_id: Uuid, bot_id: Uuid) -> Result<()> {
|
async fn index_user_data(&self, user_id: Uuid, bot_id: Uuid) -> Result<()> {
|
||||||
info!("Indexing user: {} (bot: {})", user_id, bot_id);
|
info!("Indexing user: {} (bot: {})", user_id, bot_id);
|
||||||
|
|
||||||
|
|
||||||
let mut jobs = self.jobs.write().await;
|
let mut jobs = self.jobs.write().await;
|
||||||
let job = jobs.entry(user_id).or_insert_with(|| {
|
let job = jobs.entry(user_id).or_insert_with(|| {
|
||||||
let workspace = UserWorkspace::new(self.work_root.clone(), &bot_id, &user_id);
|
let workspace = UserWorkspace::new(self.work_root.clone(), &bot_id, &user_id);
|
||||||
|
|
@ -235,7 +219,6 @@ impl VectorDBIndexer {
|
||||||
|
|
||||||
job.status = IndexingStatus::Running;
|
job.status = IndexingStatus::Running;
|
||||||
|
|
||||||
|
|
||||||
if job.email_db.is_none() {
|
if job.email_db.is_none() {
|
||||||
let mut email_db =
|
let mut email_db =
|
||||||
UserEmailVectorDB::new(user_id, bot_id, job.workspace.email_vectordb().into());
|
UserEmailVectorDB::new(user_id, bot_id, job.workspace.email_vectordb().into());
|
||||||
|
|
@ -264,17 +247,14 @@ impl VectorDBIndexer {
|
||||||
|
|
||||||
drop(jobs);
|
drop(jobs);
|
||||||
|
|
||||||
|
|
||||||
if let Err(e) = self.index_user_emails(user_id).await {
|
if let Err(e) = self.index_user_emails(user_id).await {
|
||||||
error!("Failed to index emails for user {}: {}", user_id, e);
|
error!("Failed to index emails for user {}: {}", user_id, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if let Err(e) = self.index_user_files(user_id).await {
|
if let Err(e) = self.index_user_files(user_id).await {
|
||||||
error!("Failed to index files for user {}: {}", user_id, e);
|
error!("Failed to index files for user {}: {}", user_id, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
let mut jobs = self.jobs.write().await;
|
let mut jobs = self.jobs.write().await;
|
||||||
if let Some(job) = jobs.get_mut(&user_id) {
|
if let Some(job) = jobs.get_mut(&user_id) {
|
||||||
job.status = IndexingStatus::Idle;
|
job.status = IndexingStatus::Idle;
|
||||||
|
|
@ -284,7 +264,6 @@ impl VectorDBIndexer {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
async fn index_user_emails(&self, user_id: Uuid) -> Result<()> {
|
async fn index_user_emails(&self, user_id: Uuid) -> Result<()> {
|
||||||
let jobs = self.jobs.read().await;
|
let jobs = self.jobs.read().await;
|
||||||
let job = jobs
|
let job = jobs
|
||||||
|
|
@ -299,7 +278,6 @@ impl VectorDBIndexer {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
let accounts = self.get_user_email_accounts(user_id).await?;
|
let accounts = self.get_user_email_accounts(user_id).await?;
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
|
|
@ -309,7 +287,6 @@ impl VectorDBIndexer {
|
||||||
);
|
);
|
||||||
|
|
||||||
for account_id in accounts {
|
for account_id in accounts {
|
||||||
|
|
||||||
match self.get_unindexed_emails(user_id, &account_id).await {
|
match self.get_unindexed_emails(user_id, &account_id).await {
|
||||||
Ok(emails) => {
|
Ok(emails) => {
|
||||||
if emails.is_empty() {
|
if emails.is_empty() {
|
||||||
|
|
@ -322,7 +299,6 @@ impl VectorDBIndexer {
|
||||||
account_id
|
account_id
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
||||||
for chunk in emails.chunks(self.batch_size) {
|
for chunk in emails.chunks(self.batch_size) {
|
||||||
for email in chunk {
|
for email in chunk {
|
||||||
match self.embedding_generator.generate_embedding(&email).await {
|
match self.embedding_generator.generate_embedding(&email).await {
|
||||||
|
|
@ -342,7 +318,6 @@ impl VectorDBIndexer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
sleep(Duration::from_millis(100)).await;
|
sleep(Duration::from_millis(100)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -358,7 +333,6 @@ impl VectorDBIndexer {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
async fn index_user_files(&self, user_id: Uuid) -> Result<()> {
|
async fn index_user_files(&self, user_id: Uuid) -> Result<()> {
|
||||||
let jobs = self.jobs.read().await;
|
let jobs = self.jobs.read().await;
|
||||||
let job = jobs
|
let job = jobs
|
||||||
|
|
@ -373,7 +347,6 @@ impl VectorDBIndexer {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
match self.get_unindexed_files(user_id).await {
|
match self.get_unindexed_files(user_id).await {
|
||||||
Ok(files) => {
|
Ok(files) => {
|
||||||
if files.is_empty() {
|
if files.is_empty() {
|
||||||
|
|
@ -382,16 +355,13 @@ impl VectorDBIndexer {
|
||||||
|
|
||||||
info!("Indexing {} files for user {}", files.len(), user_id);
|
info!("Indexing {} files for user {}", files.len(), user_id);
|
||||||
|
|
||||||
|
|
||||||
for chunk in files.chunks(self.batch_size) {
|
for chunk in files.chunks(self.batch_size) {
|
||||||
for file in chunk {
|
for file in chunk {
|
||||||
|
|
||||||
let mime_type = file.mime_type.as_ref().map(|s| s.as_str()).unwrap_or("");
|
let mime_type = file.mime_type.as_ref().map(|s| s.as_str()).unwrap_or("");
|
||||||
if !FileContentExtractor::should_index(&mime_type, file.file_size) {
|
if !FileContentExtractor::should_index(&mime_type, file.file_size) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
let text = format!(
|
let text = format!(
|
||||||
"File: {}\nType: {}\n\n{}",
|
"File: {}\nType: {}\n\n{}",
|
||||||
file.file_name, file.file_type, file.content_text
|
file.file_name, file.file_type, file.content_text
|
||||||
|
|
@ -415,7 +385,6 @@ impl VectorDBIndexer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
sleep(Duration::from_millis(100)).await;
|
sleep(Duration::from_millis(100)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -427,7 +396,6 @@ impl VectorDBIndexer {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
async fn get_user_email_accounts(&self, user_id: Uuid) -> Result<Vec<String>> {
|
async fn get_user_email_accounts(&self, user_id: Uuid) -> Result<Vec<String>> {
|
||||||
let conn = self.db_pool.clone();
|
let conn = self.db_pool.clone();
|
||||||
|
|
||||||
|
|
@ -490,7 +458,7 @@ impl VectorDBIndexer {
|
||||||
folder: String,
|
folder: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
let query = r#"
|
let query = r"
|
||||||
SELECT e.id, e.message_id, e.subject, e.from_address, e.to_addresses,
|
SELECT e.id, e.message_id, e.subject, e.from_address, e.to_addresses,
|
||||||
e.body_text, e.body_html, e.received_at, e.folder
|
e.body_text, e.body_html, e.received_at, e.folder
|
||||||
FROM emails e
|
FROM emails e
|
||||||
|
|
@ -500,7 +468,7 @@ impl VectorDBIndexer {
|
||||||
AND (eis.indexed_at IS NULL OR eis.needs_reindex = true)
|
AND (eis.indexed_at IS NULL OR eis.needs_reindex = true)
|
||||||
ORDER BY e.received_at DESC
|
ORDER BY e.received_at DESC
|
||||||
LIMIT 100
|
LIMIT 100
|
||||||
"#;
|
";
|
||||||
|
|
||||||
let rows: Vec<EmailRow> = diesel::sql_query(query)
|
let rows: Vec<EmailRow> = diesel::sql_query(query)
|
||||||
.bind::<diesel::sql_types::Uuid, _>(user_id)
|
.bind::<diesel::sql_types::Uuid, _>(user_id)
|
||||||
|
|
@ -510,8 +478,7 @@ impl VectorDBIndexer {
|
||||||
|
|
||||||
let emails: Vec<EmailDocument> = rows
|
let emails: Vec<EmailDocument> = rows
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|row| {
|
.map(|row| EmailDocument {
|
||||||
EmailDocument {
|
|
||||||
id: row.id.to_string(),
|
id: row.id.to_string(),
|
||||||
account_id: account_id.clone(),
|
account_id: account_id.clone(),
|
||||||
from_email: row.from_address.clone(),
|
from_email: row.from_address.clone(),
|
||||||
|
|
@ -523,7 +490,6 @@ impl VectorDBIndexer {
|
||||||
folder: row.folder,
|
folder: row.folder,
|
||||||
has_attachments: false,
|
has_attachments: false,
|
||||||
thread_id: None,
|
thread_id: None,
|
||||||
}
|
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
|
@ -566,17 +532,16 @@ impl VectorDBIndexer {
|
||||||
modified_at: DateTime<Utc>,
|
modified_at: DateTime<Utc>,
|
||||||
}
|
}
|
||||||
|
|
||||||
let query = r#"
|
let query = r"
|
||||||
SELECT f.id, f.file_path, f.file_name, f.file_type, f.file_size,
|
SELECT f.id, f.file_path, f.file_name, f.file_type, f.file_size,
|
||||||
f.bucket, f.mime_type, f.created_at, f.modified_at
|
f.bucket, f.mime_type, f.created_at, f.modified_at
|
||||||
FROM files f
|
FROM user_files f
|
||||||
LEFT JOIN file_index_status fis ON f.id = fis.file_id
|
LEFT JOIN file_index_status fis ON f.id = fis.file_id
|
||||||
WHERE f.user_id = $1
|
WHERE f.user_id = $1
|
||||||
AND (fis.indexed_at IS NULL OR fis.needs_reindex = true)
|
AND (fis.indexed_at IS NULL OR fis.needs_reindex = true)
|
||||||
AND f.file_size < 10485760
|
|
||||||
ORDER BY f.modified_at DESC
|
ORDER BY f.modified_at DESC
|
||||||
LIMIT 100
|
LIMIT 100
|
||||||
"#;
|
";
|
||||||
|
|
||||||
let rows: Vec<FileRow> = diesel::sql_query(query)
|
let rows: Vec<FileRow> = diesel::sql_query(query)
|
||||||
.bind::<diesel::sql_types::Uuid, _>(user_id)
|
.bind::<diesel::sql_types::Uuid, _>(user_id)
|
||||||
|
|
@ -585,8 +550,7 @@ impl VectorDBIndexer {
|
||||||
|
|
||||||
let files: Vec<FileDocument> = rows
|
let files: Vec<FileDocument> = rows
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|row| {
|
.map(|row| FileDocument {
|
||||||
FileDocument {
|
|
||||||
id: row.id.to_string(),
|
id: row.id.to_string(),
|
||||||
file_path: row.file_path,
|
file_path: row.file_path,
|
||||||
file_name: row.file_name,
|
file_name: row.file_name,
|
||||||
|
|
@ -600,7 +564,6 @@ impl VectorDBIndexer {
|
||||||
indexed_at: Utc::now(),
|
indexed_at: Utc::now(),
|
||||||
mime_type: row.mime_type,
|
mime_type: row.mime_type,
|
||||||
tags: Vec::new(),
|
tags: Vec::new(),
|
||||||
}
|
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
|
@ -611,13 +574,11 @@ impl VectorDBIndexer {
|
||||||
Ok(results)
|
Ok(results)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub async fn get_user_stats(&self, user_id: Uuid) -> Option<IndexingStats> {
|
pub async fn get_user_stats(&self, user_id: Uuid) -> Option<IndexingStats> {
|
||||||
let jobs = self.jobs.read().await;
|
let jobs = self.jobs.read().await;
|
||||||
jobs.get(&user_id).map(|job| job.stats.clone())
|
jobs.get(&user_id).map(|job| job.stats.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub async fn get_overall_stats(&self) -> IndexingStats {
|
pub async fn get_overall_stats(&self) -> IndexingStats {
|
||||||
let jobs = self.jobs.read().await;
|
let jobs = self.jobs.read().await;
|
||||||
|
|
||||||
|
|
@ -647,7 +608,6 @@ impl VectorDBIndexer {
|
||||||
total_stats
|
total_stats
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub async fn pause_user_indexing(&self, user_id: Uuid) -> Result<()> {
|
pub async fn pause_user_indexing(&self, user_id: Uuid) -> Result<()> {
|
||||||
let mut jobs = self.jobs.write().await;
|
let mut jobs = self.jobs.write().await;
|
||||||
if let Some(job) = jobs.get_mut(&user_id) {
|
if let Some(job) = jobs.get_mut(&user_id) {
|
||||||
|
|
@ -657,7 +617,6 @@ impl VectorDBIndexer {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub async fn resume_user_indexing(&self, user_id: Uuid) -> Result<()> {
|
pub async fn resume_user_indexing(&self, user_id: Uuid) -> Result<()> {
|
||||||
let mut jobs = self.jobs.write().await;
|
let mut jobs = self.jobs.write().await;
|
||||||
if let Some(job) = jobs.get_mut(&user_id) {
|
if let Some(job) = jobs.get_mut(&user_id) {
|
||||||
|
|
@ -667,7 +626,6 @@ impl VectorDBIndexer {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub async fn trigger_user_indexing(&self, user_id: Uuid, bot_id: Uuid) -> Result<()> {
|
pub async fn trigger_user_indexing(&self, user_id: Uuid, bot_id: Uuid) -> Result<()> {
|
||||||
info!(" Triggering immediate indexing for user {}", user_id);
|
info!(" Triggering immediate indexing for user {}", user_id);
|
||||||
self.index_user_data(user_id, bot_id).await
|
self.index_user_data(user_id, bot_id).await
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue