2025-10-18 18:20:02 -03:00
|
|
|
use crate::basic::compiler::BasicCompiler;
|
2025-11-01 09:18:02 -03:00
|
|
|
use crate::config::ConfigManager;
|
2025-10-18 18:20:02 -03:00
|
|
|
use crate::kb::embeddings;
|
|
|
|
|
use crate::kb::qdrant_client;
|
|
|
|
|
use crate::shared::state::AppState;
|
2025-10-30 12:35:25 -03:00
|
|
|
use aws_sdk_s3::Client;
|
2025-10-31 07:30:37 -03:00
|
|
|
use log::{debug, error, info, warn};
|
2025-10-18 18:20:02 -03:00
|
|
|
use std::collections::HashMap;
|
|
|
|
|
use std::error::Error;
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use tokio::time::{interval, Duration};
|
|
|
|
|
|
|
|
|
|
/// Snapshot of an S3 object's metadata, used to detect changes between
/// monitor polls (a differing `etag` means the content changed).
#[derive(Debug, Clone)]
pub struct FileState {
    /// Full object key within the bucket (e.g. "bot.gbdialog/tool.bas").
    pub path: String,
    /// Object size in bytes as reported by S3.
    pub size: i64,
    /// S3 entity tag; compared against the previous poll to detect edits.
    pub etag: String,
    /// Last-modified timestamp rendered to a string, when S3 provides one.
    pub last_modified: Option<String>,
}
|
|
|
|
|
|
|
|
|
|
/// Polls an S3 bucket ("drive") for changes to bot assets: `.gbdialog`
/// BASIC tools, `.gbkb` knowledge-base documents, and `.gbot` configuration.
pub struct DriveMonitor {
    /// Shared application state (S3 client, DB connection, etc.).
    state: Arc<AppState>,
    /// Bucket to watch; lowercased where the listing calls require it.
    bucket_name: String,
    /// Last-seen metadata per object key, used to diff successive polls.
    file_states: Arc<tokio::sync::RwLock<HashMap<String, FileState>>>,
    /// Bot whose configuration rows are synced on `.gbot` changes.
    bot_id: uuid::Uuid,
}
|
|
|
|
|
|
|
|
|
|
impl DriveMonitor {
|
2025-11-01 09:38:15 -03:00
|
|
|
pub fn new(state: Arc<AppState>, bucket_name: String, bot_id: uuid::Uuid) -> Self {
|
2025-10-18 18:20:02 -03:00
|
|
|
Self {
|
|
|
|
|
state,
|
|
|
|
|
bucket_name,
|
|
|
|
|
file_states: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
|
2025-11-01 09:38:15 -03:00
|
|
|
bot_id,
|
2025-10-18 18:20:02 -03:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn spawn(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
|
|
|
|
|
tokio::spawn(async move {
|
2025-10-31 07:30:37 -03:00
|
|
|
info!(
|
|
|
|
|
"Drive Monitor service started for bucket: {}",
|
|
|
|
|
self.bucket_name
|
|
|
|
|
);
|
2025-10-28 14:00:52 -03:00
|
|
|
let mut tick = interval(Duration::from_secs(30));
|
2025-10-18 18:20:02 -03:00
|
|
|
loop {
|
|
|
|
|
tick.tick().await;
|
|
|
|
|
if let Err(e) = self.check_for_changes().await {
|
|
|
|
|
error!("Error checking for drive changes: {}", e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn check_for_changes(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
|
2025-10-30 12:35:25 -03:00
|
|
|
let client = match &self.state.s3_client {
|
|
|
|
|
Some(client) => client,
|
2025-10-18 18:20:02 -03:00
|
|
|
None => {
|
|
|
|
|
return Ok(());
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
2025-10-30 12:35:25 -03:00
|
|
|
self.check_gbdialog_changes(client).await?;
|
|
|
|
|
self.check_gbkb_changes(client).await?;
|
2025-10-31 07:30:37 -03:00
|
|
|
|
|
|
|
|
if let Err(e) = self.check_gbot(client).await {
|
2025-10-26 14:15:43 -03:00
|
|
|
error!("Error checking default bot config: {}", e);
|
|
|
|
|
}
|
2025-10-18 18:20:02 -03:00
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn check_gbdialog_changes(
|
|
|
|
|
&self,
|
2025-10-30 12:35:25 -03:00
|
|
|
client: &Client,
|
2025-10-18 18:20:02 -03:00
|
|
|
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
|
|
|
|
let prefix = ".gbdialog/";
|
2025-10-31 07:30:37 -03:00
|
|
|
|
2025-10-18 18:20:02 -03:00
|
|
|
let mut current_files = HashMap::new();
|
2025-10-31 07:30:37 -03:00
|
|
|
|
2025-10-30 12:35:25 -03:00
|
|
|
let mut continuation_token = None;
|
|
|
|
|
loop {
|
2025-10-31 07:30:37 -03:00
|
|
|
let list_objects = client
|
|
|
|
|
.list_objects_v2()
|
|
|
|
|
.bucket(&self.bucket_name.to_lowercase())
|
2025-10-30 12:35:25 -03:00
|
|
|
.set_continuation_token(continuation_token)
|
|
|
|
|
.send()
|
|
|
|
|
.await?;
|
2025-10-31 07:30:37 -03:00
|
|
|
debug!("List objects result: {:?}", list_objects);
|
2025-10-30 12:35:25 -03:00
|
|
|
|
|
|
|
|
for obj in list_objects.contents.unwrap_or_default() {
|
|
|
|
|
let path = obj.key().unwrap_or_default().to_string();
|
2025-10-31 07:30:37 -03:00
|
|
|
let path_parts: Vec<&str> = path.split('/').collect();
|
|
|
|
|
if path_parts.len() < 2 || !path_parts[0].ends_with(".gbdialog") {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
2025-10-30 12:35:25 -03:00
|
|
|
if path.ends_with('/') || !path.ends_with(".bas") {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let file_state = FileState {
|
|
|
|
|
path: path.clone(),
|
|
|
|
|
size: obj.size().unwrap_or(0),
|
|
|
|
|
etag: obj.e_tag().unwrap_or_default().to_string(),
|
|
|
|
|
last_modified: obj.last_modified().map(|dt| dt.to_string()),
|
|
|
|
|
};
|
|
|
|
|
current_files.insert(path, file_state);
|
2025-10-18 18:20:02 -03:00
|
|
|
}
|
|
|
|
|
|
2025-10-30 12:35:25 -03:00
|
|
|
if !list_objects.is_truncated.unwrap_or(false) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
continuation_token = list_objects.next_continuation_token;
|
2025-10-18 18:20:02 -03:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let mut file_states = self.file_states.write().await;
|
|
|
|
|
for (path, current_state) in current_files.iter() {
|
|
|
|
|
if let Some(previous_state) = file_states.get(path) {
|
|
|
|
|
if current_state.etag != previous_state.etag {
|
2025-10-30 12:35:25 -03:00
|
|
|
if let Err(e) = self.compile_tool(client, path).await {
|
2025-10-18 18:20:02 -03:00
|
|
|
error!("Failed to compile tool {}: {}", path, e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
2025-10-30 12:35:25 -03:00
|
|
|
if let Err(e) = self.compile_tool(client, path).await {
|
2025-10-18 18:20:02 -03:00
|
|
|
error!("Failed to compile tool {}: {}", path, e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let previous_paths: Vec<String> = file_states
|
|
|
|
|
.keys()
|
|
|
|
|
.filter(|k| k.starts_with(prefix))
|
|
|
|
|
.cloned()
|
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
|
|
for path in previous_paths {
|
|
|
|
|
if !current_files.contains_key(&path) {
|
|
|
|
|
file_states.remove(&path);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (path, state) in current_files {
|
|
|
|
|
file_states.insert(path, state);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn check_gbkb_changes(
|
|
|
|
|
&self,
|
2025-10-30 12:35:25 -03:00
|
|
|
client: &Client,
|
2025-10-18 18:20:02 -03:00
|
|
|
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
|
|
|
|
let prefix = ".gbkb/";
|
2025-10-31 07:30:37 -03:00
|
|
|
|
2025-10-18 18:20:02 -03:00
|
|
|
let mut current_files = HashMap::new();
|
2025-10-31 07:30:37 -03:00
|
|
|
|
2025-10-30 12:35:25 -03:00
|
|
|
let mut continuation_token = None;
|
|
|
|
|
loop {
|
2025-10-31 07:30:37 -03:00
|
|
|
let list_objects = client
|
|
|
|
|
.list_objects_v2()
|
|
|
|
|
.bucket(&self.bucket_name.to_lowercase())
|
2025-10-30 12:35:25 -03:00
|
|
|
.prefix(prefix)
|
|
|
|
|
.set_continuation_token(continuation_token)
|
|
|
|
|
.send()
|
|
|
|
|
.await?;
|
2025-10-31 07:30:37 -03:00
|
|
|
debug!("List objects result: {:?}", list_objects);
|
2025-10-30 12:35:25 -03:00
|
|
|
|
|
|
|
|
for obj in list_objects.contents.unwrap_or_default() {
|
|
|
|
|
let path = obj.key().unwrap_or_default().to_string();
|
2025-10-31 07:30:37 -03:00
|
|
|
|
|
|
|
|
let path_parts: Vec<&str> = path.split('/').collect();
|
|
|
|
|
if path_parts.len() < 2 || !path_parts[0].ends_with(".gbkb") {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
2025-10-30 12:35:25 -03:00
|
|
|
if path.ends_with('/') {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
2025-10-18 18:20:02 -03:00
|
|
|
|
2025-10-30 12:35:25 -03:00
|
|
|
let ext = path.rsplit('.').next().unwrap_or("").to_lowercase();
|
|
|
|
|
if !["pdf", "txt", "md", "docx"].contains(&ext.as_str()) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let file_state = FileState {
|
|
|
|
|
path: path.clone(),
|
|
|
|
|
size: obj.size().unwrap_or(0),
|
|
|
|
|
etag: obj.e_tag().unwrap_or_default().to_string(),
|
|
|
|
|
last_modified: obj.last_modified().map(|dt| dt.to_string()),
|
|
|
|
|
};
|
|
|
|
|
current_files.insert(path, file_state);
|
2025-10-18 18:20:02 -03:00
|
|
|
}
|
|
|
|
|
|
2025-10-30 12:35:25 -03:00
|
|
|
if !list_objects.is_truncated.unwrap_or(false) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
continuation_token = list_objects.next_continuation_token;
|
2025-10-18 18:20:02 -03:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let mut file_states = self.file_states.write().await;
|
|
|
|
|
for (path, current_state) in current_files.iter() {
|
|
|
|
|
if let Some(previous_state) = file_states.get(path) {
|
|
|
|
|
if current_state.etag != previous_state.etag {
|
2025-10-30 12:35:25 -03:00
|
|
|
if let Err(e) = self.index_document(client, path).await {
|
2025-10-18 18:20:02 -03:00
|
|
|
error!("Failed to index document {}: {}", path, e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
2025-10-30 12:35:25 -03:00
|
|
|
if let Err(e) = self.index_document(client, path).await {
|
2025-10-18 18:20:02 -03:00
|
|
|
error!("Failed to index document {}: {}", path, e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let previous_paths: Vec<String> = file_states
|
|
|
|
|
.keys()
|
|
|
|
|
.filter(|k| k.starts_with(prefix))
|
|
|
|
|
.cloned()
|
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
|
|
for path in previous_paths {
|
|
|
|
|
if !current_files.contains_key(&path) {
|
|
|
|
|
file_states.remove(&path);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (path, state) in current_files {
|
|
|
|
|
file_states.insert(path, state);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
2025-11-01 09:38:15 -03:00
|
|
|
/// Looks for `config.csv` inside any `*.gbot` folder, diffs `llm-` settings
/// against the stored configuration (restarting the local LLaMA servers on
/// change), then syncs the whole CSV into the configuration store.
async fn check_gbot(&self, client: &Client) -> Result<(), Box<dyn Error + Send + Sync>> {
    let config_manager = ConfigManager::new(Arc::clone(&self.state.conn));

    // Paginate through every object in the bucket; no prefix filter is set,
    // so `.gbot` folders with arbitrary names are found.
    let mut continuation_token = None;

    loop {
        let list_objects = client
            .list_objects_v2()
            .bucket(&self.bucket_name.to_lowercase())
            .set_continuation_token(continuation_token)
            .send()
            .await?;

        for obj in list_objects.contents.unwrap_or_default() {
            let path = obj.key().unwrap_or_default().to_string();
            let path_parts: Vec<&str> = path.split('/').collect();

            // Only "config.csv" inside a "*.gbot" top-level folder matters.
            if path_parts.len() < 2 || !path_parts[0].ends_with(".gbot") {
                continue;
            }
            if !path.ends_with("config.csv") {
                continue;
            }

            debug!("Checking config file at path: {}", path);
            // NOTE(review): the listing above uses a lowercased bucket name
            // while this HeadObject (and the GetObject below) use it
            // verbatim — confirm the bucket name is always lowercase, or
            // unify the two call sites.
            match client
                .head_object()
                .bucket(&self.bucket_name)
                .key(&path)
                .send()
                .await
            {
                Ok(head_res) => {
                    debug!(
                        "HeadObject successful for {}, metadata: {:?}",
                        path, head_res
                    );
                    // NOTE(review): unlike the tolerated Head failure below,
                    // a Get/body failure here aborts the whole scan via `?`.
                    let response = client
                        .get_object()
                        .bucket(&self.bucket_name)
                        .key(&path)
                        .send()
                        .await?;
                    debug!(
                        "GetObject successful for {}, content length: {}",
                        path,
                        response.content_length().unwrap_or(0)
                    );

                    let bytes = response.body.collect().await?.into_bytes();
                    debug!("Collected {} bytes for {}", bytes.len(), path);
                    let csv_content = String::from_utf8(bytes.to_vec())
                        .map_err(|e| format!("UTF-8 error in {}: {}", path, e))?;
                    debug!("Found {}: {} bytes", path, csv_content.len());

                    // Restart LLaMA servers only if llm- properties changed
                    let llm_lines: Vec<_> = csv_content
                        .lines()
                        .filter(|line| line.trim_start().starts_with("llm-"))
                        .collect();

                    if !llm_lines.is_empty() {
                        use crate::llm_legacy::llm_local::ensure_llama_servers_running;
                        let mut restart_needed = false;

                        // Compare each "key,value" llm- line against the
                        // stored value; a key missing from the store also
                        // counts as a change.
                        for line in llm_lines {
                            let parts: Vec<&str> = line.split(',').collect();
                            if parts.len() >= 2 {
                                let key = parts[0].trim();
                                let new_value = parts[1].trim();
                                match config_manager.get_config(&self.bot_id, key, None) {
                                    Ok(old_value) => {
                                        if old_value != new_value {
                                            info!("Detected change in {} (old: {}, new: {})", key, old_value, new_value);
                                            restart_needed = true;
                                        }
                                    }
                                    Err(_) => {
                                        info!("New llm- property detected: {}", key);
                                        restart_needed = true;
                                    }
                                }
                            }
                        }

                        if restart_needed {
                            info!("Detected llm- configuration change, restarting LLaMA servers...");
                            if let Err(e) = ensure_llama_servers_running(&self.state).await {
                                error!("Failed to restart LLaMA servers after llm- config change: {}", e);
                            }
                        } else {
                            info!("No llm- property changes detected; skipping LLaMA server restart.");
                        }
                    }
                    // Persist every property from the CSV into the config
                    // store. NOTE(review): any returned value is discarded
                    // here — confirm sync_gbot_config cannot fail silently.
                    config_manager.sync_gbot_config(&self.bot_id, &csv_content);
                }
                Err(e) => {
                    // Missing/inaccessible config files are tolerated.
                    debug!("Config file {} not found or inaccessible: {}", path, e);
                }
            }
        }

        if !list_objects.is_truncated.unwrap_or(false) {
            break;
        }
        continuation_token = list_objects.next_continuation_token;
    }

    Ok(())
}
|
|
|
|
|
|
2025-10-18 18:20:02 -03:00
|
|
|
async fn compile_tool(
|
|
|
|
|
&self,
|
2025-10-30 12:35:25 -03:00
|
|
|
client: &Client,
|
2025-10-18 18:20:02 -03:00
|
|
|
file_path: &str,
|
|
|
|
|
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
2025-11-01 09:38:15 -03:00
|
|
|
debug!(
|
|
|
|
|
"Fetching object from S3: bucket={}, key={}",
|
|
|
|
|
&self.bucket_name, file_path
|
|
|
|
|
);
|
2025-10-31 07:30:37 -03:00
|
|
|
let response = match client
|
|
|
|
|
.get_object()
|
2025-10-30 12:35:25 -03:00
|
|
|
.bucket(&self.bucket_name)
|
|
|
|
|
.key(file_path)
|
|
|
|
|
.send()
|
2025-11-01 09:38:15 -03:00
|
|
|
.await
|
|
|
|
|
{
|
|
|
|
|
Ok(res) => {
|
|
|
|
|
debug!(
|
|
|
|
|
"Successfully fetched object from S3: bucket={}, key={}, size={}",
|
|
|
|
|
&self.bucket_name,
|
|
|
|
|
file_path,
|
|
|
|
|
res.content_length().unwrap_or(0)
|
|
|
|
|
);
|
|
|
|
|
res
|
|
|
|
|
}
|
|
|
|
|
Err(e) => {
|
|
|
|
|
error!(
|
|
|
|
|
"Failed to fetch object from S3: bucket={}, key={}, error={:?}",
|
|
|
|
|
&self.bucket_name, file_path, e
|
|
|
|
|
);
|
|
|
|
|
return Err(e.into());
|
|
|
|
|
}
|
|
|
|
|
};
|
2025-10-31 07:30:37 -03:00
|
|
|
|
2025-10-30 12:35:25 -03:00
|
|
|
let bytes = response.body.collect().await?.into_bytes();
|
|
|
|
|
let source_content = String::from_utf8(bytes.to_vec())?;
|
2025-10-18 18:20:02 -03:00
|
|
|
|
|
|
|
|
let tool_name = file_path
|
2025-10-31 07:30:37 -03:00
|
|
|
.split('/')
|
|
|
|
|
.last()
|
2025-10-18 18:20:02 -03:00
|
|
|
.unwrap_or(file_path)
|
|
|
|
|
.strip_suffix(".bas")
|
|
|
|
|
.unwrap_or(file_path)
|
|
|
|
|
.to_string();
|
|
|
|
|
|
2025-10-20 23:32:49 -03:00
|
|
|
let bot_name = self
|
|
|
|
|
.bucket_name
|
|
|
|
|
.strip_suffix(".gbai")
|
|
|
|
|
.unwrap_or(&self.bucket_name);
|
2025-10-31 07:30:37 -03:00
|
|
|
let work_dir = format!("./work/{}.gbai/{}.gbdialog", bot_name, bot_name);
|
2025-10-20 23:32:49 -03:00
|
|
|
std::fs::create_dir_all(&work_dir)?;
|
2025-10-18 18:20:02 -03:00
|
|
|
|
|
|
|
|
let local_source_path = format!("{}/{}.bas", work_dir, tool_name);
|
|
|
|
|
std::fs::write(&local_source_path, &source_content)?;
|
|
|
|
|
|
|
|
|
|
let compiler = BasicCompiler::new(Arc::clone(&self.state));
|
2025-10-20 23:32:49 -03:00
|
|
|
let result = compiler.compile_file(&local_source_path, &work_dir)?;
|
2025-10-18 18:20:02 -03:00
|
|
|
|
|
|
|
|
if let Some(mcp_tool) = result.mcp_tool {
|
|
|
|
|
info!(
|
2025-10-28 14:00:52 -03:00
|
|
|
"MCP tool definition generated with {} parameters",
|
2025-10-18 18:20:02 -03:00
|
|
|
mcp_tool.input_schema.properties.len()
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if result.openai_tool.is_some() {
|
2025-10-28 14:00:52 -03:00
|
|
|
debug!("OpenAI tool definition generated");
|
2025-10-18 18:20:02 -03:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn index_document(
|
|
|
|
|
&self,
|
2025-10-30 12:35:25 -03:00
|
|
|
client: &Client,
|
2025-10-18 18:20:02 -03:00
|
|
|
file_path: &str,
|
|
|
|
|
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
|
|
|
|
let parts: Vec<&str> = file_path.split('/').collect();
|
|
|
|
|
if parts.len() < 3 {
|
|
|
|
|
warn!("Invalid KB path structure: {}", file_path);
|
|
|
|
|
return Ok(());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let collection_name = parts[1];
|
2025-10-31 07:30:37 -03:00
|
|
|
let response = client
|
|
|
|
|
.get_object()
|
2025-10-30 12:35:25 -03:00
|
|
|
.bucket(&self.bucket_name)
|
|
|
|
|
.key(file_path)
|
|
|
|
|
.send()
|
|
|
|
|
.await?;
|
|
|
|
|
let bytes = response.body.collect().await?.into_bytes();
|
2025-10-31 07:30:37 -03:00
|
|
|
|
2025-10-18 18:20:02 -03:00
|
|
|
let text_content = self.extract_text(file_path, &bytes)?;
|
|
|
|
|
if text_content.trim().is_empty() {
|
|
|
|
|
warn!("No text extracted from: {}", file_path);
|
|
|
|
|
return Ok(());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
info!(
|
|
|
|
|
"Extracted {} characters from {}",
|
|
|
|
|
text_content.len(),
|
|
|
|
|
file_path
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
let qdrant_collection = format!("kb_default_{}", collection_name);
|
|
|
|
|
qdrant_client::ensure_collection_exists(&self.state, &qdrant_collection).await?;
|
2025-10-31 07:30:37 -03:00
|
|
|
|
2025-10-18 18:20:02 -03:00
|
|
|
embeddings::index_document(&self.state, &qdrant_collection, file_path, &text_content)
|
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn extract_text(
|
|
|
|
|
&self,
|
|
|
|
|
file_path: &str,
|
|
|
|
|
content: &[u8],
|
|
|
|
|
) -> Result<String, Box<dyn Error + Send + Sync>> {
|
|
|
|
|
let path_lower = file_path.to_ascii_lowercase();
|
|
|
|
|
if path_lower.ends_with(".pdf") {
|
|
|
|
|
match pdf_extract::extract_text_from_mem(content) {
|
|
|
|
|
Ok(text) => Ok(text),
|
|
|
|
|
Err(e) => {
|
|
|
|
|
error!("PDF extraction failed for {}: {}", file_path, e);
|
|
|
|
|
Err(format!("PDF extraction failed: {}", e).into())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else if path_lower.ends_with(".txt") || path_lower.ends_with(".md") {
|
|
|
|
|
String::from_utf8(content.to_vec())
|
|
|
|
|
.map_err(|e| format!("UTF-8 decoding failed: {}", e).into())
|
|
|
|
|
} else {
|
|
|
|
|
String::from_utf8(content.to_vec())
|
|
|
|
|
.map_err(|e| format!("Unsupported file format or UTF-8 error: {}", e).into())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn clear_state(&self) {
|
|
|
|
|
let mut states = self.file_states.write().await;
|
|
|
|
|
states.clear();
|
|
|
|
|
}
|
|
|
|
|
}
|