Compare commits

..

No commits in common. "859db6b8a0b654d632de7b8fd16a66c50c1861d2" and "c03398fe5615c65879a706043935d0297f3a01fd" have entirely different histories.

19 changed files with 742 additions and 1580 deletions

View file

@ -111,7 +111,6 @@ dirs = { workspace = true }
dotenvy = { workspace = true } dotenvy = { workspace = true }
futures = { workspace = true } futures = { workspace = true }
futures-util = { workspace = true } futures-util = { workspace = true }
git2 = "0.19"
hex = { workspace = true } hex = { workspace = true }
hmac = { workspace = true } hmac = { workspace = true }
log = { workspace = true } log = { workspace = true }

View file

@ -478,10 +478,6 @@ fn associate_website_with_session_refresh(
register_website_for_crawling_with_refresh(&mut conn, &user.bot_id, url, refresh_interval) register_website_for_crawling_with_refresh(&mut conn, &user.bot_id, url, refresh_interval)
.map_err(|e| format!("Failed to register website: {}", e))?; .map_err(|e| format!("Failed to register website: {}", e))?;
// ADD TO SESSION EVEN IF CRAWL IS PENDING!
// Otherwise kb_context will think the session has no website associated if start.bas only runs once.
add_website_to_session(&mut conn, &user.id, &user.bot_id, url, &collection_name)?;
return Ok(format!( return Ok(format!(
"Website {} has been registered for crawling (refresh: {}). It will be available once crawling completes.", "Website {} has been registered for crawling (refresh: {}). It will be available once crawling completes.",
url, refresh_interval url, refresh_interval

View file

@ -294,7 +294,7 @@ impl BootstrapManager {
} }
// Install other core components (names must match 3rdparty.toml) // Install other core components (names must match 3rdparty.toml)
let core_components = ["tables", "cache", "drive", "directory", "llm", "vector_db"]; let core_components = ["tables", "cache", "drive", "directory", "llm"];
for component in core_components { for component in core_components {
if !pm.is_installed(component) { if !pm.is_installed(component) {
info!("Installing {}...", component); info!("Installing {}...", component);

View file

@ -7,7 +7,6 @@ use uuid::Uuid;
use crate::core::kb::KnowledgeBaseManager; use crate::core::kb::KnowledgeBaseManager;
use crate::core::shared::utils::DbPool; use crate::core::shared::utils::DbPool;
use crate::core::kb::{EmbeddingConfig, KbIndexer, QdrantConfig};
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionKbAssociation { pub struct SessionKbAssociation {
@ -239,116 +238,56 @@ impl KbContextManager {
Ok(kb_contexts) Ok(kb_contexts)
} }
async fn get_collection_dimension(&self, qdrant_config: &QdrantConfig, collection_name: &str) -> Result<Option<usize>> { async fn search_single_collection(
let http_client = crate::core::shared::utils::create_tls_client(Some(10)); &self,
let check_url = format!("{}/collections/{}", qdrant_config.url, collection_name); collection_name: &str,
display_name: &str,
query: &str,
max_results: usize,
max_tokens: usize,
) -> Result<KbContext> {
debug!("Searching collection '{}' with query: {}", collection_name, query);
let response = http_client.get(&check_url).send().await?; let search_results = self
.kb_manager
.search_collection(collection_name, query, max_results)
.await?;
if !response.status().is_success() { let mut kb_search_results = Vec::new();
debug!("Could not get collection info for '{}', using default dimension", collection_name); let mut total_tokens = 0;
return Ok(None);
for result in search_results {
let tokens = estimate_tokens(&result.content);
if total_tokens + tokens > max_tokens {
debug!(
"Skipping result due to token limit ({} + {} > {})",
total_tokens, tokens, max_tokens
);
break;
} }
let info_json: serde_json::Value = response.json().await?; kb_search_results.push(KbSearchResult {
let dimension = info_json["result"]["config"]["params"]["vectors"]["size"] content: result.content,
.as_u64() document_path: result.document_path,
.map(|d| d as usize); score: result.score,
chunk_tokens: tokens,
});
Ok(dimension) total_tokens += tokens;
}
async fn search_single_collection( if result.score < 0.6 {
&self, debug!("Skipping low-relevance result (score: {})", result.score);
collection_name: &str, break;
display_name: &str,
query: &str,
max_results: usize,
max_tokens: usize,
) -> Result<KbContext> {
debug!("Searching collection '{}' with query: {}", collection_name, query);
// Extract bot_name from collection_name (format: "{bot_name}_{kb_name}")
let bot_name = collection_name.split('_').next().unwrap_or("default");
// Get bot_id from bot_name
let bot_id = self.get_bot_id_by_name(bot_name).await?;
// Load embedding config from database for this bot
let mut embedding_config = EmbeddingConfig::from_bot_config(&self.db_pool, &bot_id);
let qdrant_config = QdrantConfig::default();
// Query Qdrant to get the collection's actual vector dimension
let collection_dimension = self.get_collection_dimension(&qdrant_config, collection_name).await?;
// Override the embedding config dimension to match the collection
if let Some(dim) = collection_dimension {
if dim != embedding_config.dimensions {
debug!(
"Overriding embedding dimension from {} to {} to match collection '{}'",
embedding_config.dimensions, dim, collection_name
);
embedding_config.dimensions = dim;
}
} }
// Create a temporary indexer with bot-specific config
let indexer = KbIndexer::new(embedding_config, qdrant_config);
// Use the bot-specific indexer for search
let search_results = indexer
.search(collection_name, query, max_results)
.await?;
let mut kb_search_results = Vec::new();
let mut total_tokens = 0;
for result in search_results {
let tokens = estimate_tokens(&result.content);
if total_tokens + tokens > max_tokens {
debug!(
"Skipping result due to token limit ({} + {} > {})",
total_tokens, tokens, max_tokens
);
break;
}
kb_search_results.push(KbSearchResult {
content: result.content,
document_path: result.document_path,
score: result.score,
chunk_tokens: tokens,
});
total_tokens += tokens;
if result.score < 0.4 {
debug!("Skipping low-relevance result (score: {})", result.score);
break;
}
}
Ok(KbContext {
kb_name: display_name.to_string(),
search_results: kb_search_results,
total_tokens,
})
} }
async fn get_bot_id_by_name(&self, bot_name: &str) -> Result<Uuid> { Ok(KbContext {
use crate::core::shared::models::schema::bots::dsl::*; kb_name: display_name.to_string(),
search_results: kb_search_results,
let mut conn = self.db_pool.get()?; total_tokens,
})
let bot_uuid: Uuid = bots }
.filter(name.eq(bot_name))
.select(id)
.first(&mut conn)
.map_err(|e| anyhow::anyhow!("Failed to find bot '{}': {}", bot_name, e))?;
Ok(bot_uuid)
}
async fn search_single_kb( async fn search_single_kb(
&self, &self,
@ -388,7 +327,7 @@ impl KbContextManager {
total_tokens += tokens; total_tokens += tokens;
if result.score < 0.5 { if result.score < 0.7 {
debug!("Skipping low-relevance result (score: {})", result.score); debug!("Skipping low-relevance result (score: {})", result.score);
break; break;
} }
@ -434,7 +373,7 @@ impl KbContextManager {
context_parts.push("\n--- End Knowledge Base Context ---\n".to_string()); context_parts.push("\n--- End Knowledge Base Context ---\n".to_string());
let full_context = context_parts.join("\n"); let full_context = context_parts.join("\n");
// Truncate KB context to fit within token limits (max 400 tokens for KB context) // Truncate KB context to fit within token limits (max 400 tokens for KB context)
crate::core::shared::utils::truncate_text_for_model(&full_context, "local", 400) crate::core::shared::utils::truncate_text_for_model(&full_context, "local", 400)
} }

View file

@ -5,7 +5,7 @@ use serde_json::Value;
// use std::collections::HashMap; // use std::collections::HashMap;
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io::Write; use std::io::Write;
use std::path::{Path, PathBuf}; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use uuid::Uuid; use uuid::Uuid;
@ -37,10 +37,7 @@ pub struct ToolExecutor;
impl ToolExecutor { impl ToolExecutor {
/// Log tool execution errors to a dedicated log file /// Log tool execution errors to a dedicated log file
fn log_tool_error(bot_name: &str, tool_name: &str, error_msg: &str) { fn log_tool_error(bot_name: &str, tool_name: &str, error_msg: &str) {
let log_path = std::env::current_dir() let log_path = Path::new("work").join(format!("{}_tool_errors.log", bot_name));
.unwrap_or_else(|_| std::path::PathBuf::from("."))
.join("botserver-stack/data/system/work")
.join(format!("{}_tool_errors.log", bot_name));
// Create work directory if it doesn't exist // Create work directory if it doesn't exist
if let Some(parent) = log_path.parent() { if let Some(parent) = log_path.parent() {
@ -343,30 +340,28 @@ impl ToolExecutor {
/// Get the path to a tool's .bas file /// Get the path to a tool's .bas file
fn get_tool_bas_path(bot_name: &str, tool_name: &str) -> std::path::PathBuf { fn get_tool_bas_path(bot_name: &str, tool_name: &str) -> std::path::PathBuf {
// Try source directory first (/opt/gbo/data - primary location for bot source files) let home_dir = std::env::var("HOME").unwrap_or_else(|_| ".".to_string());
let source_path = Path::new("/opt/gbo/data")
// Try data directory first
let data_path = Path::new(&home_dir)
.join("data")
.join(format!("{}.gbai", bot_name)) .join(format!("{}.gbai", bot_name))
.join(format!("{}.gbdialog", bot_name)) .join(format!("{}.gbdialog", bot_name))
.join(format!("{}.bas", tool_name)); .join(format!("{}.bas", tool_name));
if source_path.exists() { if data_path.exists() {
return source_path; return data_path;
} }
// Try compiled work directory (botserver-stack/data/system/work relative to current dir) // Try work directory (for development/testing)
let work_path = std::env::current_dir() let work_path = Path::new(&home_dir)
.unwrap_or_else(|_| PathBuf::from(".")) .join("gb")
.join("botserver-stack/data/system/work") .join("work")
.join(format!("{}.gbai", bot_name)) .join(format!("{}.gbai", bot_name))
.join(format!("{}.gbdialog", bot_name)) .join(format!("{}.gbdialog", bot_name))
.join(format!("{}.bas", tool_name)); .join(format!("{}.bas", tool_name));
if work_path.exists() { work_path
return work_path;
}
// Fallback to source path for error messages (even if it doesn't exist)
source_path
} }
} }

View file

@ -28,7 +28,6 @@ pub fn set_embedding_server_ready(ready: bool) {
pub struct EmbeddingConfig { pub struct EmbeddingConfig {
pub embedding_url: String, pub embedding_url: String,
pub embedding_model: String, pub embedding_model: String,
pub embedding_key: Option<String>,
pub dimensions: usize, pub dimensions: usize,
pub batch_size: usize, pub batch_size: usize,
pub timeout_seconds: u64, pub timeout_seconds: u64,
@ -40,9 +39,8 @@ impl Default for EmbeddingConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
embedding_url: "http://localhost:8082".to_string(), embedding_url: "http://localhost:8082".to_string(),
embedding_model: "BAAI/bge-multilingual-gemma2".to_string(), embedding_model: "bge-small-en-v1.5".to_string(),
embedding_key: None, dimensions: 384,
dimensions: 2048,
batch_size: 16, batch_size: 16,
timeout_seconds: 60, timeout_seconds: 60,
max_concurrent_requests: 1, max_concurrent_requests: 1,
@ -63,14 +61,13 @@ impl EmbeddingConfig {
/// embedding-dimensions,384 /// embedding-dimensions,384
/// embedding-batch-size,16 /// embedding-batch-size,16
/// embedding-timeout,60 /// embedding-timeout,60
/// embedding-key,hf_xxxxx (for HuggingFace API)
pub fn from_bot_config(pool: &DbPool, _bot_id: &uuid::Uuid) -> Self { pub fn from_bot_config(pool: &DbPool, _bot_id: &uuid::Uuid) -> Self {
use crate::core::shared::models::schema::bot_configuration::dsl::*; use crate::core::shared::models::schema::bot_configuration::dsl::*;
use diesel::prelude::*; use diesel::prelude::*;
let embedding_url = match pool.get() { let embedding_url = match pool.get() {
Ok(mut conn) => bot_configuration Ok(mut conn) => bot_configuration
.filter(bot_id.eq(_bot_id)) .filter(bot_id.eq(bot_id))
.filter(config_key.eq("embedding-url")) .filter(config_key.eq("embedding-url"))
.select(config_value) .select(config_value)
.first::<String>(&mut conn) .first::<String>(&mut conn)
@ -81,29 +78,18 @@ impl EmbeddingConfig {
let embedding_model = match pool.get() { let embedding_model = match pool.get() {
Ok(mut conn) => bot_configuration Ok(mut conn) => bot_configuration
.filter(bot_id.eq(_bot_id)) .filter(bot_id.eq(bot_id))
.filter(config_key.eq("embedding-model")) .filter(config_key.eq("embedding-model"))
.select(config_value) .select(config_value)
.first::<String>(&mut conn) .first::<String>(&mut conn)
.ok() .ok()
.filter(|s| !s.is_empty()), .filter(|s| !s.is_empty()),
Err(_) => None, Err(_) => None,
}.unwrap_or_else(|| "BAAI/bge-multilingual-gemma2".to_string()); }.unwrap_or_else(|| "bge-small-en-v1.5".to_string());
let embedding_key = match pool.get() {
Ok(mut conn) => bot_configuration
.filter(bot_id.eq(_bot_id))
.filter(config_key.eq("embedding-key"))
.select(config_value)
.first::<String>(&mut conn)
.ok()
.filter(|s| !s.is_empty()),
Err(_) => None,
};
let dimensions = match pool.get() { let dimensions = match pool.get() {
Ok(mut conn) => bot_configuration Ok(mut conn) => bot_configuration
.filter(bot_id.eq(_bot_id)) .filter(bot_id.eq(bot_id))
.filter(config_key.eq("embedding-dimensions")) .filter(config_key.eq("embedding-dimensions"))
.select(config_value) .select(config_value)
.first::<String>(&mut conn) .first::<String>(&mut conn)
@ -114,7 +100,7 @@ impl EmbeddingConfig {
let batch_size = match pool.get() { let batch_size = match pool.get() {
Ok(mut conn) => bot_configuration Ok(mut conn) => bot_configuration
.filter(bot_id.eq(_bot_id)) .filter(bot_id.eq(bot_id))
.filter(config_key.eq("embedding-batch-size")) .filter(config_key.eq("embedding-batch-size"))
.select(config_value) .select(config_value)
.first::<String>(&mut conn) .first::<String>(&mut conn)
@ -125,7 +111,7 @@ impl EmbeddingConfig {
let timeout_seconds = match pool.get() { let timeout_seconds = match pool.get() {
Ok(mut conn) => bot_configuration Ok(mut conn) => bot_configuration
.filter(bot_id.eq(_bot_id)) .filter(bot_id.eq(bot_id))
.filter(config_key.eq("embedding-timeout")) .filter(config_key.eq("embedding-timeout"))
.select(config_value) .select(config_value)
.first::<String>(&mut conn) .first::<String>(&mut conn)
@ -136,7 +122,7 @@ impl EmbeddingConfig {
let max_concurrent_requests = match pool.get() { let max_concurrent_requests = match pool.get() {
Ok(mut conn) => bot_configuration Ok(mut conn) => bot_configuration
.filter(bot_id.eq(_bot_id)) .filter(bot_id.eq(bot_id))
.filter(config_key.eq("embedding-concurrent")) .filter(config_key.eq("embedding-concurrent"))
.select(config_value) .select(config_value)
.first::<String>(&mut conn) .first::<String>(&mut conn)
@ -148,7 +134,6 @@ impl EmbeddingConfig {
Self { Self {
embedding_url, embedding_url,
embedding_model, embedding_model,
embedding_key,
dimensions, dimensions,
batch_size, batch_size,
timeout_seconds, timeout_seconds,
@ -158,9 +143,7 @@ impl EmbeddingConfig {
} }
fn detect_dimensions(model: &str) -> usize { fn detect_dimensions(model: &str) -> usize {
if model.contains("gemma") || model.contains("Gemma") { if model.contains("small") || model.contains("MiniLM") {
2048
} else if model.contains("small") || model.contains("MiniLM") {
384 384
} else if model.contains("base") || model.contains("mpnet") { } else if model.contains("base") || model.contains("mpnet") {
768 768
@ -200,27 +183,6 @@ struct LlamaCppEmbeddingItem {
// Hugging Face/SentenceTransformers format (simple array) // Hugging Face/SentenceTransformers format (simple array)
type HuggingFaceEmbeddingResponse = Vec<Vec<f32>>; type HuggingFaceEmbeddingResponse = Vec<Vec<f32>>;
// Scaleway/OpenAI-compatible format (object with data array)
#[derive(Debug, Deserialize)]
struct ScalewayEmbeddingResponse {
data: Vec<ScalewayEmbeddingData>,
#[serde(default)]
model: Option<String>,
#[serde(default)]
usage: Option<EmbeddingUsage>,
}
#[derive(Debug, Deserialize)]
struct ScalewayEmbeddingData {
embedding: Vec<f32>,
#[serde(default)]
#[allow(dead_code)]
index: usize,
#[serde(default)]
#[allow(dead_code)]
object: Option<String>,
}
// Generic embedding service format (object with embeddings key) // Generic embedding service format (object with embeddings key)
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
struct GenericEmbeddingResponse { struct GenericEmbeddingResponse {
@ -231,54 +193,14 @@ struct GenericEmbeddingResponse {
usage: Option<EmbeddingUsage>, usage: Option<EmbeddingUsage>,
} }
// Cloudflare AI Workers format
#[derive(Debug, Serialize)]
struct CloudflareEmbeddingRequest {
text: Vec<String>,
}
#[derive(Debug, Deserialize)]
struct CloudflareEmbeddingResponse {
result: CloudflareResult,
success: bool,
#[serde(default)]
errors: Vec<CloudflareError>,
}
#[derive(Debug, Deserialize)]
struct CloudflareResult {
data: Vec<Vec<f32>>,
#[serde(default)]
meta: Option<CloudflareMeta>,
}
#[derive(Debug, Deserialize)]
struct CloudflareMeta {
#[serde(default)]
#[allow(dead_code)]
cost_metric_name_1: Option<String>,
#[serde(default)]
cost_metric_value_1: Option<f64>,
}
#[derive(Debug, Deserialize)]
struct CloudflareError {
#[serde(default)]
code: i32,
#[serde(default)]
message: String,
}
// Universal response wrapper - tries formats in order of likelihood // Universal response wrapper - tries formats in order of likelihood
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
#[serde(untagged)] #[serde(untagged)]
enum EmbeddingResponse { enum EmbeddingResponse {
Scaleway(ScalewayEmbeddingResponse), // Scaleway/OpenAI-compatible format
OpenAI(OpenAIEmbeddingResponse), // Most common: OpenAI, Claude, etc. OpenAI(OpenAIEmbeddingResponse), // Most common: OpenAI, Claude, etc.
LlamaCpp(Vec<LlamaCppEmbeddingItem>), // llama.cpp server LlamaCpp(Vec<LlamaCppEmbeddingItem>), // llama.cpp server
HuggingFace(HuggingFaceEmbeddingResponse), // Simple array format HuggingFace(HuggingFaceEmbeddingResponse), // Simple array format
Generic(GenericEmbeddingResponse), // Generic services Generic(GenericEmbeddingResponse), // Generic services
Cloudflare(CloudflareEmbeddingResponse), // Cloudflare AI Workers
} }
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
@ -336,16 +258,6 @@ impl KbEmbeddingGenerator {
} }
pub async fn check_health(&self) -> bool { pub async fn check_health(&self) -> bool {
// For HuggingFace and other remote APIs, skip health check and mark as ready
// These APIs don't have a /health endpoint and we verified they work during setup
if self.config.embedding_url.contains("huggingface.co") ||
self.config.embedding_url.contains("api.") ||
self.config.embedding_key.is_some() {
info!("Remote embedding API detected ({}), marking as ready", self.config.embedding_url);
set_embedding_server_ready(true);
return true;
}
let health_url = format!("{}/health", self.config.embedding_url); let health_url = format!("{}/health", self.config.embedding_url);
match tokio::time::timeout( match tokio::time::timeout(
@ -360,7 +272,6 @@ impl KbEmbeddingGenerator {
is_healthy is_healthy
} }
Ok(Err(e)) => { Ok(Err(e)) => {
warn!("Health check failed for primary URL: {}", e);
let alt_url = &self.config.embedding_url; let alt_url = &self.config.embedding_url;
match tokio::time::timeout( match tokio::time::timeout(
Duration::from_secs(5), Duration::from_secs(5),
@ -373,18 +284,14 @@ impl KbEmbeddingGenerator {
} }
is_healthy is_healthy
} }
Ok(Err(_)) => { _ => {
set_embedding_server_ready(false); warn!("Health check failed: {}", e);
false
}
Err(_) => {
set_embedding_server_ready(false);
false false
} }
} }
} }
Err(_) => { Err(_) => {
set_embedding_server_ready(false); warn!("Health check timed out");
false false
} }
} }
@ -404,17 +311,12 @@ impl KbEmbeddingGenerator {
} }
tokio::time::sleep(Duration::from_secs(2)).await; tokio::time::sleep(Duration::from_secs(2)).await;
} }
warn!("Embedding server not available after {}s", max_wait_secs); warn!("Embedding server not available after {}s", max_wait_secs);
false false
} }
/// Get the configured embedding dimensions
pub fn get_dimensions(&self) -> usize {
self.config.dimensions
}
pub async fn generate_embeddings( pub async fn generate_embeddings(
&self, &self,
chunks: &[TextChunk], chunks: &[TextChunk],
) -> Result<Vec<(TextChunk, Embedding)>> { ) -> Result<Vec<(TextChunk, Embedding)>> {
@ -454,14 +356,12 @@ impl KbEmbeddingGenerator {
Ok(Ok(embeddings)) => embeddings, Ok(Ok(embeddings)) => embeddings,
Ok(Err(e)) => { Ok(Err(e)) => {
warn!("Batch {} failed: {}", batch_num + 1, e); warn!("Batch {} failed: {}", batch_num + 1, e);
// Continue with next batch instead of breaking completely break;
continue;
} }
Err(_) => { Err(_) => {
warn!("Batch {} timed out after {}s", warn!("Batch {} timed out after {}s",
batch_num + 1, self.config.timeout_seconds); batch_num + 1, self.config.timeout_seconds);
// Continue with next batch instead of breaking completely break;
continue;
} }
}; };
@ -529,148 +429,25 @@ impl KbEmbeddingGenerator {
.map(|text| crate::core::shared::utils::truncate_text_for_model(text, &self.config.embedding_model, 600)) .map(|text| crate::core::shared::utils::truncate_text_for_model(text, &self.config.embedding_model, 600))
.collect(); .collect();
// Detect API format based on URL pattern let request = EmbeddingRequest {
// Cloudflare AI: https://api.cloudflare.com/client/v4/accounts/{account_id}/ai/run/@cf/baai/bge-m3 input: truncated_texts,
// Scaleway (OpenAI-compatible): https://router.huggingface.co/scaleway/v1/embeddings model: self.config.embedding_model.clone(),
// HuggingFace Inference (old): https://router.huggingface.co/hf-inference/models/.../pipeline/feature-extraction
let is_cloudflare = self.config.embedding_url.contains("api.cloudflare.com/client/v4/accounts");
let is_scaleway = self.config.embedding_url.contains("/scaleway/v1/embeddings");
let is_hf_inference = self.config.embedding_url.contains("/hf-inference/") ||
self.config.embedding_url.contains("/pipeline/feature-extraction");
let response = if is_cloudflare {
// Cloudflare AI Workers API format: {"text": ["text1", "text2", ...]}
let cf_request = CloudflareEmbeddingRequest {
text: truncated_texts,
};
let request_size = serde_json::to_string(&cf_request)
.map(|s| s.len())
.unwrap_or(0);
trace!("Sending Cloudflare AI request to {} (size: {} bytes)",
self.config.embedding_url, request_size);
let mut request_builder = self.client
.post(&self.config.embedding_url)
.json(&cf_request);
// Add Authorization header if API key is provided
if let Some(ref api_key) = self.config.embedding_key {
request_builder = request_builder.header("Authorization", format!("Bearer {}", api_key));
}
request_builder
.send()
.await
.context("Failed to send request to Cloudflare AI embedding service")?
} else if is_hf_inference {
// HuggingFace Inference API (old format): {"inputs": "text"}
// Process one text at a time for HuggingFace Inference
let mut all_embeddings = Vec::new();
for text in &truncated_texts {
let hf_request = serde_json::json!({
"inputs": text
});
let request_size = serde_json::to_string(&hf_request)
.map(|s| s.len())
.unwrap_or(0);
trace!("Sending HuggingFace Inference request to {} (size: {} bytes)",
self.config.embedding_url, request_size);
let mut request_builder = self.client
.post(&self.config.embedding_url)
.json(&hf_request);
// Add Authorization header if API key is provided
if let Some(ref api_key) = self.config.embedding_key {
request_builder = request_builder.header("Authorization", format!("Bearer {}", api_key));
}
let resp = request_builder
.send()
.await
.context("Failed to send request to HuggingFace Inference embedding service")?;
let status = resp.status();
if !status.is_success() {
let error_bytes = resp.bytes().await.unwrap_or_default();
let error_text = String::from_utf8_lossy(&error_bytes[..error_bytes.len().min(1024)]);
return Err(anyhow::anyhow!(
"HuggingFace Inference embedding service error {}: {}",
status,
error_text
));
}
let response_bytes = resp.bytes().await
.context("Failed to read HuggingFace Inference embedding response bytes")?;
trace!("Received HuggingFace Inference response: {} bytes", response_bytes.len());
if response_bytes.len() > 50 * 1024 * 1024 {
return Err(anyhow::anyhow!(
"Embedding response too large: {} bytes (max 50MB)",
response_bytes.len()
));
}
// Parse HuggingFace Inference response (single array for single input)
let embedding_vec: Vec<f32> = serde_json::from_slice(&response_bytes)
.with_context(|| {
let preview = std::str::from_utf8(&response_bytes)
.map(|s| if s.len() > 200 { &s[..200] } else { s })
.unwrap_or("<invalid utf8>");
format!("Failed to parse HuggingFace Inference embedding response. Preview: {}", preview)
})?;
all_embeddings.push(Embedding {
vector: embedding_vec,
dimensions: self.config.dimensions,
model: self.config.embedding_model.clone(),
tokens_used: None,
});
}
return Ok(all_embeddings);
} else {
// Standard embedding service format (OpenAI-compatible, Scaleway, llama.cpp, local server, etc.)
// This includes Scaleway which uses OpenAI-compatible format: {"input": [texts], "model": "model-name"}
let request = EmbeddingRequest {
input: truncated_texts,
model: self.config.embedding_model.clone(),
};
let request_size = serde_json::to_string(&request)
.map(|s| s.len())
.unwrap_or(0);
// Log the API format being used
if is_scaleway {
trace!("Sending Scaleway (OpenAI-compatible) request to {} (size: {} bytes)",
self.config.embedding_url, request_size);
} else {
trace!("Sending standard embedding request to {} (size: {} bytes)",
self.config.embedding_url, request_size);
}
// Build request
let mut request_builder = self.client
.post(&self.config.embedding_url)
.json(&request);
// Add Authorization header if API key is provided (for Scaleway, OpenAI, etc.)
if let Some(ref api_key) = self.config.embedding_key {
request_builder = request_builder.header("Authorization", format!("Bearer {}", api_key));
}
request_builder
.send()
.await
.context("Failed to send request to embedding service")?
}; };
let request_size = serde_json::to_string(&request)
.map(|s| s.len())
.unwrap_or(0);
trace!("Sending request to {} (size: {} bytes)",
self.config.embedding_url, request_size);
let response = self
.client
.post(format!("{}/embedding", self.config.embedding_url))
.json(&request)
.send()
.await
.context("Failed to send request to embedding service")?;
let status = response.status(); let status = response.status();
if !status.is_success() { if !status.is_success() {
let error_bytes = response.bytes().await.unwrap_or_default(); let error_bytes = response.bytes().await.unwrap_or_default();
@ -755,38 +532,6 @@ impl KbEmbeddingGenerator {
} }
embeddings embeddings
} }
EmbeddingResponse::Scaleway(scaleway_response) => {
let mut embeddings = Vec::with_capacity(scaleway_response.data.len());
for data in scaleway_response.data {
embeddings.push(Embedding {
vector: data.embedding,
dimensions: self.config.dimensions,
model: scaleway_response.model.clone().unwrap_or_else(|| self.config.embedding_model.clone()),
tokens_used: scaleway_response.usage.as_ref().map(|u| u.total_tokens),
});
}
embeddings
}
EmbeddingResponse::Cloudflare(cf_response) => {
if !cf_response.success {
let error_msg = cf_response.errors.first()
.map(|e| format!("{}: {}", e.code, e.message))
.unwrap_or_else(|| "Unknown Cloudflare error".to_string());
return Err(anyhow::anyhow!("Cloudflare AI error: {}", error_msg));
}
let mut embeddings = Vec::with_capacity(cf_response.result.data.len());
for embedding_vec in cf_response.result.data {
embeddings.push(Embedding {
vector: embedding_vec,
dimensions: self.config.dimensions,
model: self.config.embedding_model.clone(),
tokens_used: cf_response.result.meta.as_ref().and_then(|m| {
m.cost_metric_value_1.map(|v| v as usize)
}),
});
}
embeddings
}
}; };
Ok(embeddings) Ok(embeddings)

View file

@ -22,7 +22,7 @@ pub struct QdrantConfig {
impl Default for QdrantConfig { impl Default for QdrantConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
url: "http://localhost:6333".to_string(), url: "https://localhost:6333".to_string(),
api_key: None, api_key: None,
timeout_secs: 30, timeout_secs: 30,
} }
@ -33,8 +33,8 @@ impl QdrantConfig {
pub fn from_config(pool: DbPool, bot_id: &Uuid) -> Self { pub fn from_config(pool: DbPool, bot_id: &Uuid) -> Self {
let config_manager = ConfigManager::new(pool); let config_manager = ConfigManager::new(pool);
let url = config_manager let url = config_manager
.get_config(bot_id, "vectordb-url", Some("http://localhost:6333")) .get_config(bot_id, "vectordb-url", Some("https://localhost:6333"))
.unwrap_or_else(|_| "http://localhost:6333".to_string()); .unwrap_or_else(|_| "https://localhost:6333".to_string());
Self { Self {
url, url,
api_key: None, api_key: None,
@ -173,7 +173,7 @@ impl KbIndexer {
// Process documents in iterator to avoid keeping all in memory // Process documents in iterator to avoid keeping all in memory
let doc_iter = documents.into_iter(); let doc_iter = documents.into_iter();
for (doc_path, chunks) in doc_iter { for (doc_path, chunks) in doc_iter {
if chunks.is_empty() { if chunks.is_empty() {
debug!("Skipping document with no chunks: {}", doc_path); debug!("Skipping document with no chunks: {}", doc_path);
@ -187,23 +187,23 @@ impl KbIndexer {
let (processed, chunks_count) = self.process_document_batch(&collection_name, &mut batch_docs).await?; let (processed, chunks_count) = self.process_document_batch(&collection_name, &mut batch_docs).await?;
indexed_documents += processed; indexed_documents += processed;
total_chunks += chunks_count; total_chunks += chunks_count;
// Clear batch and force memory cleanup // Clear batch and force memory cleanup
batch_docs.clear(); batch_docs.clear();
batch_docs.shrink_to_fit(); batch_docs.shrink_to_fit();
// Yield control to prevent blocking // Yield control to prevent blocking
tokio::task::yield_now().await; tokio::task::yield_now().await;
// Memory pressure check - more aggressive // Memory pressure check - more aggressive
let current_mem = MemoryStats::current(); let current_mem = MemoryStats::current();
if current_mem.rss_bytes > 1_500_000_000 { // 1.5GB threshold (reduced) if current_mem.rss_bytes > 1_500_000_000 { // 1.5GB threshold (reduced)
warn!("High memory usage detected: {}, forcing cleanup", warn!("High memory usage detected: {}, forcing cleanup",
MemoryStats::format_bytes(current_mem.rss_bytes)); MemoryStats::format_bytes(current_mem.rss_bytes));
// Force garbage collection hint // Force garbage collection hint
std::hint::black_box(&batch_docs); std::hint::black_box(&batch_docs);
// Add delay to allow memory cleanup // Add delay to allow memory cleanup
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
} }
@ -263,10 +263,10 @@ impl KbIndexer {
// Process chunks in smaller sub-batches to prevent memory exhaustion // Process chunks in smaller sub-batches to prevent memory exhaustion
const CHUNK_BATCH_SIZE: usize = 20; // Process 20 chunks at a time const CHUNK_BATCH_SIZE: usize = 20; // Process 20 chunks at a time
let chunk_batches = chunks.chunks(CHUNK_BATCH_SIZE); let chunk_batches = chunks.chunks(CHUNK_BATCH_SIZE);
for chunk_batch in chunk_batches { for chunk_batch in chunk_batches {
trace!("Processing chunk batch of {} chunks", chunk_batch.len()); trace!("Processing chunk batch of {} chunks", chunk_batch.len());
let embeddings = match self let embeddings = match self
.embedding_generator .embedding_generator
.generate_embeddings(chunk_batch) .generate_embeddings(chunk_batch)
@ -281,7 +281,7 @@ impl KbIndexer {
let points = Self::create_qdrant_points(&doc_path, embeddings)?; let points = Self::create_qdrant_points(&doc_path, embeddings)?;
self.upsert_points(collection_name, points).await?; self.upsert_points(collection_name, points).await?;
// Yield control between chunk batches // Yield control between chunk batches
tokio::task::yield_now().await; tokio::task::yield_now().await;
} }
@ -293,7 +293,7 @@ impl KbIndexer {
total_chunks += chunks.len(); total_chunks += chunks.len();
processed_count += 1; processed_count += 1;
// Force memory cleanup after each document // Force memory cleanup after each document
std::hint::black_box(&chunks); std::hint::black_box(&chunks);
} }
@ -303,54 +303,19 @@ impl KbIndexer {
async fn ensure_collection_exists(&self, collection_name: &str) -> Result<()> { async fn ensure_collection_exists(&self, collection_name: &str) -> Result<()> {
let check_url = format!("{}/collections/{}", self.qdrant_config.url, collection_name); let check_url = format!("{}/collections/{}", self.qdrant_config.url, collection_name);
let required_dims = self.embedding_generator.get_dimensions();
let response = self.http_client.get(&check_url).send().await?; let response = self.http_client.get(&check_url).send().await?;
if response.status().is_success() { if response.status().is_success() {
// Check if the existing collection has the correct vector size info!("Collection {} already exists", collection_name);
let info_json: serde_json::Value = response.json().await?; return Ok(());
let existing_dims = info_json["result"]["config"]["params"]["vectors"]["size"]
.as_u64()
.map(|d| d as usize);
match existing_dims {
Some(dims) if dims == required_dims => {
trace!("Collection {} already exists with correct dims ({})", collection_name, required_dims);
return Ok(());
}
Some(dims) => {
warn!(
"Collection {} exists with dim={} but embedding requires dim={}. \
Recreating collection.",
collection_name, dims, required_dims
);
// Delete the stale collection so we can recreate it
let delete_url = format!("{}/collections/{}", self.qdrant_config.url, collection_name);
let del_resp = self.http_client.delete(&delete_url).send().await?;
if !del_resp.status().is_success() {
let err = del_resp.text().await.unwrap_or_default();
return Err(anyhow::anyhow!(
"Failed to delete stale collection {}: {}",
collection_name, err
));
}
info!("Deleted stale collection {} (was dim={})", collection_name, dims);
}
None => {
// Could not read dims recreate to be safe
warn!("Could not read dims for collection {}, recreating", collection_name);
let delete_url = format!("{}/collections/{}", self.qdrant_config.url, collection_name);
let _ = self.http_client.delete(&delete_url).send().await;
}
}
} }
info!("Creating collection {} with dim={}", collection_name, required_dims); info!("Creating collection: {}", collection_name);
let config = CollectionConfig { let config = CollectionConfig {
vectors: VectorConfig { vectors: VectorConfig {
size: required_dims, size: 384,
distance: "Cosine".to_string(), distance: "Cosine".to_string(),
}, },
replication_factor: 1, replication_factor: 1,
@ -520,44 +485,16 @@ impl KbIndexer {
query: &str, query: &str,
limit: usize, limit: usize,
) -> Result<Vec<SearchResult>> { ) -> Result<Vec<SearchResult>> {
// Get the collection's actual vector dimension to handle dimension mismatch
let collection_dimension = self.get_collection_vector_dimension(collection_name).await?;
let embedding = self let embedding = self
.embedding_generator .embedding_generator
.generate_single_embedding(query) .generate_single_embedding(query)
.await?; .await?;
// Truncate embedding vector to match collection dimension if needed
let search_vector = if let Some(target_dim) = collection_dimension {
if embedding.vector.len() > target_dim {
debug!(
"Truncating embedding from {} to {} dimensions for collection '{}'",
embedding.vector.len(), target_dim, collection_name
);
embedding.vector[..target_dim].to_vec()
} else if embedding.vector.len() < target_dim {
warn!(
"Embedding dimension ({}) is smaller than collection dimension ({}). \
Search may return poor results for collection '{}'.",
embedding.vector.len(), target_dim, collection_name
);
// Pad with zeros (not ideal but allows search to proceed)
let mut padded = embedding.vector.clone();
padded.resize(target_dim, 0.0);
padded
} else {
embedding.vector
}
} else {
embedding.vector
};
let search_request = SearchRequest { let search_request = SearchRequest {
vector: search_vector, vector: embedding.vector,
limit, limit,
with_payload: true, with_payload: true,
score_threshold: Some(0.3), score_threshold: Some(0.5),
filter: None, filter: None,
}; };
@ -628,31 +565,6 @@ impl KbIndexer {
Ok(()) Ok(())
} }
/// Get the vector dimension of a collection from Qdrant
async fn get_collection_vector_dimension(&self, collection_name: &str) -> Result<Option<usize>> {
let info_url = format!("{}/collections/{}", self.qdrant_config.url, collection_name);
let response = match self.http_client.get(&info_url).send().await {
Ok(r) => r,
Err(e) => {
debug!("Failed to get collection dimension: {}", e);
return Ok(None);
}
};
if !response.status().is_success() {
debug!("Collection '{}' not found or error, using default dimension", collection_name);
return Ok(None);
}
let info_json: serde_json::Value = response.json().await?;
let dimension = info_json["result"]["config"]["params"]["vectors"]["size"]
.as_u64()
.map(|d| d as usize);
Ok(dimension)
}
pub async fn get_collection_info(&self, collection_name: &str) -> Result<CollectionInfo> { pub async fn get_collection_info(&self, collection_name: &str) -> Result<CollectionInfo> {
let info_url = format!("{}/collections/{}", self.qdrant_config.url, collection_name); let info_url = format!("{}/collections/{}", self.qdrant_config.url, collection_name);

View file

@ -1,12 +1,10 @@
use crate::core::config::ConfigManager; use crate::core::config::ConfigManager;
use crate::core::kb::web_crawler::{WebCrawler, WebsiteCrawlConfig}; use crate::core::kb::web_crawler::{WebCrawler, WebsiteCrawlConfig};
use crate::core::kb::embedding_generator::EmbeddingConfig; use crate::core::kb::KnowledgeBaseManager;
use crate::core::kb::kb_indexer::{KbIndexer, QdrantConfig};
use crate::core::shared::state::AppState; use crate::core::shared::state::AppState;
use crate::core::shared::utils::DbPool; use crate::core::shared::utils::DbPool;
use diesel::prelude::*; use diesel::prelude::*;
use log::{error, info, trace, warn}; use log::{error, trace, warn};
use regex; use regex;
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::Arc; use std::sync::Arc;
@ -16,15 +14,17 @@ use uuid::Uuid;
#[derive(Debug)] #[derive(Debug)]
pub struct WebsiteCrawlerService { pub struct WebsiteCrawlerService {
db_pool: DbPool, db_pool: DbPool,
kb_manager: Arc<KnowledgeBaseManager>,
check_interval: Duration, check_interval: Duration,
running: Arc<tokio::sync::RwLock<bool>>, running: Arc<tokio::sync::RwLock<bool>>,
active_crawls: Arc<tokio::sync::RwLock<HashSet<String>>>, active_crawls: Arc<tokio::sync::RwLock<HashSet<String>>>,
} }
impl WebsiteCrawlerService { impl WebsiteCrawlerService {
pub fn new(db_pool: DbPool) -> Self { pub fn new(db_pool: DbPool, kb_manager: Arc<KnowledgeBaseManager>) -> Self {
Self { Self {
db_pool, db_pool,
kb_manager,
check_interval: Duration::from_secs(60), check_interval: Duration::from_secs(60),
running: Arc::new(tokio::sync::RwLock::new(false)), running: Arc::new(tokio::sync::RwLock::new(false)),
active_crawls: Arc::new(tokio::sync::RwLock::new(HashSet::new())), active_crawls: Arc::new(tokio::sync::RwLock::new(HashSet::new())),
@ -37,20 +37,6 @@ impl WebsiteCrawlerService {
tokio::spawn(async move { tokio::spawn(async move {
trace!("Website crawler service started"); trace!("Website crawler service started");
// Reset any rows stuck at crawl_status=2 (in-progress) from a previous
// crash or restart — they are no longer actually being crawled.
if let Ok(mut conn) = service.db_pool.get() {
let reset = diesel::sql_query(
"UPDATE website_crawls SET crawl_status = 0 WHERE crawl_status = 2"
)
.execute(&mut conn);
match reset {
Ok(n) if n > 0 => info!("Reset {} stale in-progress crawl(s) to pending", n),
Ok(_) => {}
Err(e) => warn!("Could not reset stale crawl statuses: {}", e),
}
}
let mut ticker = interval(service.check_interval); let mut ticker = interval(service.check_interval);
loop { loop {
@ -125,12 +111,13 @@ impl WebsiteCrawlerService {
.execute(&mut conn)?; .execute(&mut conn)?;
// Process one website at a time to control memory usage // Process one website at a time to control memory usage
let kb_manager = Arc::clone(&self.kb_manager);
let db_pool = self.db_pool.clone(); let db_pool = self.db_pool.clone();
let active_crawls = Arc::clone(&self.active_crawls); let active_crawls = Arc::clone(&self.active_crawls);
trace!("Processing website: {}", website.url); trace!("Processing website: {}", website.url);
match Self::crawl_website(website, db_pool, active_crawls).await { match Self::crawl_website(website, kb_manager, db_pool, active_crawls).await {
Ok(_) => { Ok(_) => {
trace!("Successfully processed website crawl"); trace!("Successfully processed website crawl");
} }
@ -151,6 +138,7 @@ impl WebsiteCrawlerService {
async fn crawl_website( async fn crawl_website(
website: WebsiteCrawlRecord, website: WebsiteCrawlRecord,
kb_manager: Arc<KnowledgeBaseManager>,
db_pool: DbPool, db_pool: DbPool,
active_crawls: Arc<tokio::sync::RwLock<HashSet<String>>>, active_crawls: Arc<tokio::sync::RwLock<HashSet<String>>>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
@ -164,21 +152,19 @@ impl WebsiteCrawlerService {
active.insert(website.url.clone()); active.insert(website.url.clone());
} }
// Ensure cleanup on exit
let url_for_cleanup = website.url.clone(); let url_for_cleanup = website.url.clone();
let active_crawls_cleanup = Arc::clone(&active_crawls); let active_crawls_cleanup = Arc::clone(&active_crawls);
// Always remove from active_crawls at the end, regardless of success or error. // Manual cleanup instead of scopeguard
let result = Self::do_crawl_website(website, db_pool).await; let cleanup = || {
let url = url_for_cleanup.clone();
let active = Arc::clone(&active_crawls_cleanup);
tokio::spawn(async move {
active.write().await.remove(&url);
});
};
active_crawls_cleanup.write().await.remove(&url_for_cleanup);
result
}
async fn do_crawl_website(
website: WebsiteCrawlRecord,
db_pool: DbPool,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
trace!("Starting crawl for website: {}", website.url); trace!("Starting crawl for website: {}", website.url);
let config_manager = ConfigManager::new(db_pool.clone()); let config_manager = ConfigManager::new(db_pool.clone());
@ -239,14 +225,6 @@ impl WebsiteCrawlerService {
tokio::fs::create_dir_all(&work_path).await?; tokio::fs::create_dir_all(&work_path).await?;
// Load embedding config: DB settings + Vault API key at gbo/{bot_name}.
let embedding_config = embedding_config_with_vault(&db_pool, &website.bot_id, &bot_name).await;
info!("Using embedding URL: {} for bot {}", embedding_config.embedding_url, bot_name);
// Create bot-specific KB indexer with correct embedding config
let qdrant_config = QdrantConfig::default();
let bot_indexer = KbIndexer::new(embedding_config, qdrant_config);
// Process pages in small batches to prevent memory exhaustion // Process pages in small batches to prevent memory exhaustion
const BATCH_SIZE: usize = 5; const BATCH_SIZE: usize = 5;
let total_pages = pages.len(); let total_pages = pages.len();
@ -281,9 +259,8 @@ impl WebsiteCrawlerService {
// Process this batch immediately to free memory // Process this batch immediately to free memory
if batch_idx == 0 || (batch_idx + 1) % 2 == 0 { if batch_idx == 0 || (batch_idx + 1) % 2 == 0 {
// Index every 2 batches to prevent memory buildup // Index every 2 batches to prevent memory buildup
match bot_indexer.index_kb_folder(&bot_name, &kb_name, &work_path).await { match kb_manager.index_kb_folder(&bot_name, &kb_name, &work_path).await {
Ok(result) => trace!("Indexed batch {} successfully: {} docs, {} chunks", Ok(_) => trace!("Indexed batch {} successfully", batch_idx + 1),
batch_idx + 1, result.documents_processed, result.chunks_indexed),
Err(e) => warn!("Failed to index batch {}: {}", batch_idx + 1, e), Err(e) => warn!("Failed to index batch {}: {}", batch_idx + 1, e),
} }
@ -293,7 +270,7 @@ impl WebsiteCrawlerService {
} }
// Final indexing for any remaining content // Final indexing for any remaining content
bot_indexer kb_manager
.index_kb_folder(&bot_name, &kb_name, &work_path) .index_kb_folder(&bot_name, &kb_name, &work_path)
.await?; .await?;
@ -319,7 +296,7 @@ impl WebsiteCrawlerService {
"Successfully recrawled {}, next crawl: {:?}", "Successfully recrawled {}, next crawl: {:?}",
website.url, config.next_crawl website.url, config.next_crawl
); );
Ok(()) cleanup();
} }
Err(e) => { Err(e) => {
error!("Failed to crawl {}: {}", website.url, e); error!("Failed to crawl {}: {}", website.url, e);
@ -335,19 +312,17 @@ impl WebsiteCrawlerService {
.bind::<diesel::sql_types::Uuid, _>(&website.id) .bind::<diesel::sql_types::Uuid, _>(&website.id)
.execute(&mut conn)?; .execute(&mut conn)?;
Err(e.into()) cleanup();
} }
} }
Ok(())
} }
fn scan_and_register_websites_from_scripts(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { fn scan_and_register_websites_from_scripts(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
trace!("Scanning .bas files for USE WEBSITE commands"); trace!("Scanning .bas files for USE WEBSITE commands");
// Use the correct work directory path instead of plain "work" let work_dir = std::path::Path::new("work");
let work_dir = std::env::current_dir()
.unwrap_or_else(|_| std::path::PathBuf::from("."))
.join("botserver-stack/data/system/work");
if !work_dir.exists() { if !work_dir.exists() {
return Ok(()); return Ok(());
} }
@ -395,7 +370,7 @@ impl WebsiteCrawlerService {
&self, &self,
website: WebsiteCrawlRecord, website: WebsiteCrawlRecord,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Self::crawl_website(website, self.db_pool.clone(), Arc::clone(&self.active_crawls)).await Self::crawl_website(website, Arc::clone(&self.kb_manager), self.db_pool.clone(), Arc::clone(&self.active_crawls)).await
} }
fn scan_directory_for_websites( fn scan_directory_for_websites(
@ -404,7 +379,7 @@ impl WebsiteCrawlerService {
bot_id: uuid::Uuid, bot_id: uuid::Uuid,
conn: &mut diesel::PgConnection, conn: &mut diesel::PgConnection,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let website_regex = regex::Regex::new(r#"(?i)(?:USE\s+WEBSITE\s+"([^"]+)"(?:\s+REFRESH\s+"([^"]+)")?)|(?:USE_WEBSITE\s*\(\s*"([^"]+)"\s*(?:,\s*"([^"]+)"\s*)?\))"#)?; let website_regex = regex::Regex::new(r#"(?i)(?:USE\s+WEBSITE\s+"([^"]+)"\s+REFRESH\s+"([^"]+)")|(?:USE_WEBSITE\s*\(\s*"([^"]+)"\s*(?:,\s*"([^"]+)"\s*)?\))"#)?;
for entry in std::fs::read_dir(dir)? { for entry in std::fs::read_dir(dir)? {
let entry = entry?; let entry = entry?;
@ -496,138 +471,13 @@ fn sanitize_url_for_kb(url: &str) -> String {
.to_lowercase() .to_lowercase()
} }
/// Force recrawl a specific website immediately
/// Updates next_crawl to NOW() and triggers immediate crawl
pub async fn force_recrawl_website(
db_pool: crate::core::shared::utils::DbPool,
bot_id: uuid::Uuid,
url: String,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
use diesel::prelude::*;
let mut conn = db_pool.get()?;
// Update next_crawl to NOW() to mark for immediate recrawl
let rows_updated = diesel::sql_query(
"UPDATE website_crawls
SET next_crawl = NOW(),
crawl_status = 0,
error_message = NULL
WHERE bot_id = $1 AND url = $2"
)
.bind::<diesel::sql_types::Uuid, _>(&bot_id)
.bind::<diesel::sql_types::Text, _>(&url)
.execute(&mut conn)?;
if rows_updated == 0 {
return Err(format!("Website not found: bot_id={}, url={}", bot_id, url).into());
}
trace!("Updated next_crawl to NOW() for website: {}", url);
// Get the website record for immediate crawling
#[derive(diesel::QueryableByName)]
struct WebsiteRecord {
#[diesel(sql_type = diesel::sql_types::Uuid)]
id: uuid::Uuid,
#[diesel(sql_type = diesel::sql_types::Uuid)]
bot_id: uuid::Uuid,
#[diesel(sql_type = diesel::sql_types::Text)]
url: String,
#[diesel(sql_type = diesel::sql_types::Text)]
expires_policy: String,
#[diesel(sql_type = diesel::sql_types::Text)]
refresh_policy: String,
#[diesel(sql_type = diesel::sql_types::Integer)]
max_depth: i32,
#[diesel(sql_type = diesel::sql_types::Integer)]
max_pages: i32,
}
let website: WebsiteRecord = diesel::sql_query(
"SELECT id, bot_id, url, expires_policy, refresh_policy, max_depth, max_pages
FROM website_crawls
WHERE bot_id = $1 AND url = $2"
)
.bind::<diesel::sql_types::Uuid, _>(&bot_id)
.bind::<diesel::sql_types::Text, _>(&url)
.get_result(&mut conn)?;
// Convert to WebsiteCrawlRecord
let website_record = WebsiteCrawlRecord {
id: website.id,
bot_id: website.bot_id,
url: website.url.clone(),
expires_policy: website.expires_policy,
refresh_policy: Some(website.refresh_policy),
max_depth: website.max_depth,
max_pages: website.max_pages,
next_crawl: None,
crawl_status: Some(0),
};
// Trigger immediate crawl
let active_crawls = Arc::new(tokio::sync::RwLock::new(HashSet::new()));
trace!("Starting immediate crawl for website: {}", url);
match WebsiteCrawlerService::crawl_website(website_record, db_pool, active_crawls).await {
Ok(_) => {
let msg = format!("Successfully triggered immediate recrawl for website: {}", url);
info!("{}", msg);
Ok(msg)
}
Err(e) => {
let error_msg = format!("Failed to crawl website {}: {}", url, e);
error!("{}", error_msg);
Err(error_msg.into())
}
}
}
/// API Handler for force recrawl endpoint
/// POST /api/website/force-recrawl
/// Body: {"bot_id": "uuid", "url": "https://example.com"}
pub async fn handle_force_recrawl(
axum::extract::State(state): axum::extract::State<std::sync::Arc<AppState>>,
axum::Json(payload): axum::Json<ForceRecrawlRequest>,
) -> Result<axum::Json<serde_json::Value>, (axum::http::StatusCode, axum::Json<serde_json::Value>)> {
use crate::security::error_sanitizer::log_and_sanitize_str;
match force_recrawl_website(
state.conn.clone(),
payload.bot_id,
payload.url.clone(),
).await {
Ok(msg) => Ok(axum::Json(serde_json::json!({
"success": true,
"message": msg,
"bot_id": payload.bot_id,
"url": payload.url
}))),
Err(e) => {
let sanitized = log_and_sanitize_str(&e.to_string(), "force_recrawl", None);
Err((
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
axum::Json(serde_json::json!({"error": sanitized}))
))
}
}
}
/// Request payload for force recrawl endpoint
#[derive(serde::Deserialize)]
pub struct ForceRecrawlRequest {
pub bot_id: uuid::Uuid,
pub url: String,
}
pub async fn ensure_crawler_service_running( pub async fn ensure_crawler_service_running(
state: Arc<AppState>, state: Arc<AppState>,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
if let Some(_kb_manager) = &state.kb_manager { if let Some(kb_manager) = &state.kb_manager {
let service = Arc::new(WebsiteCrawlerService::new( let service = Arc::new(WebsiteCrawlerService::new(
state.conn.clone(), state.conn.clone(),
Arc::clone(kb_manager),
)); ));
drop(service.start()); drop(service.start());
@ -640,33 +490,3 @@ pub async fn ensure_crawler_service_running(
Ok(()) Ok(())
} }
} }
/// Build an `EmbeddingConfig` for a bot, reading most settings from the DB
/// but **overriding the API key from Vault** (per-bot path `gbo/{bot_name}` → `embedding-key`)
/// when available. Falls back transparently to the DB value when Vault is unavailable.
async fn embedding_config_with_vault(
pool: &DbPool,
bot_id: &uuid::Uuid,
bot_name: &str,
) -> EmbeddingConfig {
// Start from the DB-backed config (URL, model, dimensions, etc.)
let mut config = EmbeddingConfig::from_bot_config(pool, bot_id);
// Try to upgrade the API key from Vault using the per-bot secret path.
if let Some(secrets) = crate::core::shared::utils::get_secrets_manager().await {
let per_bot_path = format!("gbo/{}", bot_name);
match secrets.get_value(&per_bot_path, "embedding-key").await {
Ok(key) if !key.is_empty() => {
trace!("Loaded embedding key from Vault path {} for bot {}", per_bot_path, bot_name);
config.embedding_key = Some(key);
}
Ok(_) => {} // Key present but empty — keep DB value
Err(e) => {
trace!("Vault embedding-key not found at {} ({}), using DB value", per_bot_path, e);
}
}
}
config
}

View file

@ -530,19 +530,17 @@ pub fn parse_hex_color(hex: &str) -> Option<(u8, u8, u8)> {
pub fn truncate_text_for_model(text: &str, model: &str, max_tokens: usize) -> String { pub fn truncate_text_for_model(text: &str, model: &str, max_tokens: usize) -> String {
let chars_per_token = estimate_chars_per_token(model); let chars_per_token = estimate_chars_per_token(model);
let max_chars = max_tokens * chars_per_token; let max_chars = max_tokens * chars_per_token;
if text.chars().count() <= max_chars { if text.len() <= max_chars {
return text.to_string(); text.to_string()
}
// Get first max_chars characters safely (UTF-8 aware)
let truncated: String = text.chars().take(max_chars).collect();
// Try to truncate at word boundary
if let Some(last_space_idx) = truncated.rfind(' ') {
truncated[..last_space_idx].to_string()
} else { } else {
truncated // Try to truncate at word boundary
let truncated = &text[..max_chars];
if let Some(last_space) = truncated.rfind(' ') {
text[..last_space].to_string()
} else {
truncated.to_string()
}
} }
} }

View file

@ -1,10 +1,9 @@
use git2::{Repository, Signature}; use git2::{Repository, Oid, Signature, Time};
use reqwest::Client; use reqwest::Client;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::path::Path; use std::path::{Path, PathBuf};
use super::{DeploymentError, GeneratedApp, GeneratedFile}; use super::{DeploymentError, GeneratedApp};
use super::types::{AppType, DeploymentEnvironment};
pub struct ForgejoClient { pub struct ForgejoClient {
base_url: String, base_url: String,
@ -130,20 +129,17 @@ impl ForgejoClient {
Ok(oid.to_string()) Ok(oid.to_string())
} }
/// Create CI/CD workflow for the app based on Phase 2.5 app types /// Create CI/CD workflow for the app
pub async fn create_cicd_workflow( pub async fn create_cicd_workflow(
&self, &self,
repo_url: &str, repo_url: &str,
app_type: &AppType, app_type: AppType,
environment: &DeploymentEnvironment, build_config: BuildConfig,
) -> Result<(), DeploymentError> { ) -> Result<(), DeploymentError> {
let workflow = match app_type { let workflow = match app_type {
AppType::GbNative { .. } => self.generate_gb_native_workflow(environment), AppType::Htmx => self.generate_htmx_workflow(build_config),
AppType::Custom { framework, node_version, build_command, output_directory } => { AppType::React => self.generate_react_workflow(build_config),
self.generate_custom_workflow(framework, node_version.as_deref().unwrap_or("20"), AppType::Vue => self.generate_vue_workflow(build_config),
build_command.as_deref().unwrap_or("npm run build"),
output_directory.as_deref().unwrap_or("dist"), environment)
}
}; };
// Create workflow file // Create workflow file
@ -197,11 +193,106 @@ impl ForgejoClient {
} }
} }
fn generate_htmx_workflow(&self, _config: BuildConfig) -> String {
r#"name: Deploy HTMX App
on:
push:
branches: [main, develop]
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Node.js
uses: actions/setup-node@v3
with:
node-version: '20'
- name: Install dependencies
run: npm ci
- name: Build
run: npm run build
- name: Deploy to server
run: |
echo "Deploying HTMX app to production..."
# Add deployment commands here
"#.to_string()
}
fn generate_react_workflow(&self, _config: BuildConfig) -> String {
r#"name: Deploy React App
on:
push:
branches: [main, develop]
jobs:
build-and-deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Node.js
uses: actions/setup-node@v3
with:
node-version: '20'
cache: 'npm'
- name: Install dependencies
run: npm ci
- name: Build React app
run: npm run build
- name: Run tests
run: npm test
- name: Deploy to production
run: |
echo "Deploying React app to production..."
# Add deployment commands here
"#.to_string()
}
fn generate_vue_workflow(&self, _config: BuildConfig) -> String {
r#"name: Deploy Vue App
on:
push:
branches: [main, develop]
jobs:
build-and-deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Node.js
uses: actions/setup-node@v3
with:
node-version: '20'
cache: 'npm'
- name: Install dependencies
run: npm ci
- name: Build Vue app
run: npm run build
- name: Run tests
run: npm test
- name: Deploy to production
run: |
echo "Deploying Vue app to production..."
# Add deployment commands here
"#.to_string()
}
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
@ -227,7 +318,29 @@ struct CreateRepoRequest {
readme: Option<String>, readme: Option<String>,
} }
// AppType and related types are now defined in types.rs #[derive(Debug, Clone, Copy)]
pub enum AppType {
Htmx,
React,
Vue,
}
#[derive(Debug, Clone)]
pub struct BuildConfig {
pub node_version: String,
pub build_command: String,
pub output_dir: String,
}
impl Default for BuildConfig {
fn default() -> Self {
Self {
node_version: "20".to_string(),
build_command: "npm run build".to_string(),
output_dir: "dist".to_string(),
}
}
}
#[derive(Debug)] #[derive(Debug)]
pub enum ForgejoError { pub enum ForgejoError {
@ -248,90 +361,4 @@ impl std::fmt::Display for ForgejoError {
} }
} }
// =============================================================================
// CI/CD Workflow Generation for Phase 2.5
// =============================================================================
impl ForgejoClient {
/// Generate CI/CD workflow for GB Native apps
fn generate_gb_native_workflow(&self, environment: &DeploymentEnvironment) -> String {
let env_name = environment.to_string();
format!(r#"name: Deploy GB Native App
on:
push:
branches: [ main, {env_name} ]
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Node.js
uses: actions/setup-node@v3
with:
node-version: '20'
- name: Install dependencies
run: npm ci
- name: Build app
run: npm run build
env:
NODE_ENV: production
GB_ENV: {env_name}
- name: Deploy to GB Platform
run: |
echo "Deploying to GB Platform ({env_name})"
# GB Platform deployment logic here
env:
GB_DEPLOYMENT_TOKEN: ${{{{ secrets.GB_DEPLOYMENT_TOKEN }}}}
"#)
}
/// Generate CI/CD workflow for Custom apps
fn generate_custom_workflow(&self, framework: &str, node_version: &str,
build_command: &str, output_dir: &str, environment: &DeploymentEnvironment) -> String {
let env_name = environment.to_string();
format!(r#"name: Deploy Custom {framework} App
on:
push:
branches: [ main, {env_name} ]
jobs:
build-and-deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Node.js
uses: actions/setup-node@v3
with:
node-version: '{node_version}'
- name: Install dependencies
run: npm ci
- name: Build {framework} app
run: {build_command}
env:
NODE_ENV: production
- name: Upload build artifacts
uses: actions/upload-artifact@v3
with:
name: build-output
path: {output_dir}
- name: Deploy to custom hosting
run: |
echo "Deploying {framework} app to {env_name}"
# Custom deployment logic here
"#)
}
}
impl std::error::Error for ForgejoError {} impl std::error::Error for ForgejoError {}

View file

@ -1,175 +0,0 @@
//! API handlers for VibeCode deployment module - Phase 2.5
use axum::{
extract::State,
Json,
};
use std::sync::Arc;
use crate::core::shared::state::AppState;
use super::types::*;
use super::router::DeploymentRouter;
/// Configure deployment routes
pub fn configure_deployment_routes() -> axum::Router<Arc<AppState>> {
axum::Router::new()
.route("/api/deployment/types", axum::routing::get(get_app_types))
.route("/api/deployment/deploy", axum::routing::post(deploy_app))
}
/// Get available app types
pub async fn get_app_types(
State(_state): State<Arc<AppState>>,
) -> Result<Json<AppTypesResponse>, DeploymentApiError> {
let app_types = vec![
AppTypeInfo {
id: "gb-native".to_string(),
name: "GB Native".to_string(),
description: "Optimized for General Bots platform with shared resources".to_string(),
features: vec![
"Shared database connection pool".to_string(),
"Integrated GB authentication".to_string(),
"Shared caching layer".to_string(),
"Auto-scaling".to_string(),
"Built-in monitoring".to_string(),
"Zero configuration".to_string(),
],
},
AppTypeInfo {
id: "custom-htmx".to_string(),
name: "Custom HTMX".to_string(),
description: "HTMX-based application with custom deployment".to_string(),
features: vec![
"Lightweight frontend".to_string(),
"Server-side rendering".to_string(),
"Custom CI/CD pipeline".to_string(),
"Independent deployment".to_string(),
],
},
AppTypeInfo {
id: "custom-react".to_string(),
name: "Custom React".to_string(),
description: "React application with custom deployment".to_string(),
features: vec![
"Modern React".to_string(),
"Vite build system".to_string(),
"Custom CI/CD pipeline".to_string(),
"Independent deployment".to_string(),
],
},
AppTypeInfo {
id: "custom-vue".to_string(),
name: "Custom Vue".to_string(),
description: "Vue.js application with custom deployment".to_string(),
features: vec![
"Vue 3 composition API".to_string(),
"Vite build system".to_string(),
"Custom CI/CD pipeline".to_string(),
"Independent deployment".to_string(),
],
},
];
Ok(Json(AppTypesResponse { app_types }))
}
/// Deploy an application to Forgejo
pub async fn deploy_app(
State(_state): State<Arc<AppState>>,
Json(request): Json<DeploymentRequest>,
) -> Result<Json<DeploymentResponse>, DeploymentApiError> {
log::info!(
"Deployment request: org={:?}, app={}, type={}, env={}",
request.organization,
request.app_name,
request.app_type,
request.environment
);
// Parse app type
let app_type = match request.app_type.as_str() {
"gb-native" => AppType::GbNative {
shared_database: request.shared_database.unwrap_or(true),
shared_auth: request.shared_auth.unwrap_or(true),
shared_cache: request.shared_cache.unwrap_or(true),
},
custom_type if custom_type.starts_with("custom-") => {
let framework = request.framework.clone()
.unwrap_or_else(|| custom_type.strip_prefix("custom-").unwrap_or("unknown").to_string());
AppType::Custom {
framework,
node_version: Some("20".to_string()),
build_command: Some("npm run build".to_string()),
output_directory: Some("dist".to_string()),
}
}
_ => {
return Err(DeploymentApiError::ValidationError(format!(
"Unknown app type: {}",
request.app_type
)));
}
};
// Parse environment
let environment = match request.environment.as_str() {
"development" => DeploymentEnvironment::Development,
"staging" => DeploymentEnvironment::Staging,
"production" => DeploymentEnvironment::Production,
_ => DeploymentEnvironment::Development,
};
// Get Forgejo configuration
let forgejo_url = std::env::var("FORGEJO_URL")
.unwrap_or_else(|_| "https://alm.pragmatismo.com.br".to_string());
let forgejo_token = std::env::var("FORGEJO_TOKEN").ok();
// Get or default organization
let organization = request.organization
.or_else(|| std::env::var("FORGEJO_DEFAULT_ORG").ok())
.unwrap_or_else(|| "generalbots".to_string());
// Create deployment configuration
let config = DeploymentConfig {
organization,
app_name: request.app_name.clone(),
app_type,
environment,
custom_domain: request.custom_domain,
ci_cd_enabled: request.ci_cd_enabled.unwrap_or(true),
};
// Create deployment router
let router = DeploymentRouter::new(forgejo_url, forgejo_token);
// Create placeholder generated app
// In real implementation, this would come from the orchestrator
let generated_app = GeneratedApp::new(
config.app_name.clone(),
format!("Generated {} application", config.app_type),
);
// Execute deployment
let result = router.deploy(config, generated_app).await
.map_err(|e| DeploymentApiError::DeploymentFailed(e.to_string()))?;
log::info!(
"Deployment successful: url={}, repo={}, status={:?}",
result.url,
result.repository,
result.status
);
Ok(Json(DeploymentResponse {
success: true,
url: Some(result.url),
repository: Some(result.repository),
app_type: Some(result.app_type),
status: Some(format!("{:?}", result.status)),
error: None,
}))
}

View file

@ -1,42 +1,434 @@
//! Deployment module for VibeCode platform - Phase 2.5
//!
//! All apps are deployed to Forgejo repositories using org/app_name format.
//! Two app types: GB Native (optimized for GB platform) and Custom (any framework).
//!
//! # Architecture
//!
//! - `types` - Type definitions for deployment configuration and results
//! - `router` - Deployment router that manages the deployment process
//! - `handlers` - HTTP API handlers for deployment endpoints
//! - `forgejo` - Forgejo client for repository management
pub mod types;
pub mod router;
pub mod handlers;
pub mod forgejo; pub mod forgejo;
// Re-export commonly used types from types module use axum::{
pub use types::{ extract::State,
AppType, http::StatusCode,
DeploymentConfig, response::{IntoResponse, Response},
DeploymentEnvironment, Json,
DeploymentResult,
DeploymentStatus,
DeploymentError,
GeneratedApp,
GeneratedFile,
DeploymentRequest,
DeploymentResponse,
AppTypesResponse,
AppTypeInfo,
DeploymentApiError,
}; };
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::Arc;
// Re-export deployment router use crate::core::shared::state::AppState;
pub use router::DeploymentRouter;
// Re-export route configuration function // Re-export types from forgejo module
pub use handlers::configure_deployment_routes; pub use forgejo::{AppType, BuildConfig, ForgejoClient, ForgejoError, ForgejoRepo};
// Re-export Forgejo types #[derive(Debug, Clone, Serialize, Deserialize)]
pub use forgejo::{ForgejoClient, ForgejoError, ForgejoRepo}; pub enum DeploymentTarget {
/// Serve from GB platform (/apps/{name})
Internal {
route: String,
shared_resources: bool,
},
/// Deploy to external Forgejo repository
External {
repo_url: String,
custom_domain: Option<String>,
ci_cd_enabled: bool,
},
}
/// Per-app deployment settings handed to [`DeploymentRouter::deploy`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeploymentConfig {
    // Application name; also used as the internal route default in `deploy_app`.
    pub app_name: String,
    // Where to deploy: internal GB platform or an external Forgejo repo.
    pub target: DeploymentTarget,
    pub environment: DeploymentEnvironment,
}
/// Deployment environment selector; parsed from the request string in
/// `deploy_app` (unknown values fall back to `Development` there).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DeploymentEnvironment {
    Development,
    Staging,
    Production,
}
/// Dispatches deployments to either the internal platform or Forgejo.
pub struct DeploymentRouter {
    // Base URL of the Forgejo instance (used for external deployments).
    forgejo_url: String,
    // API token; optional because internal deployments don't need it.
    forgejo_token: Option<String>,
    // Filesystem root for internally-served apps.
    internal_base_path: PathBuf,
}
impl DeploymentRouter {
    /// Construct a router with the Forgejo endpoint/token and the base path
    /// used for internal app deployments.
    pub fn new(
        forgejo_url: String,
        forgejo_token: Option<String>,
        internal_base_path: PathBuf,
    ) -> Self {
        Self {
            forgejo_url,
            forgejo_token,
            internal_base_path,
        }
    }
    /// Route deployment based on target type.
    ///
    /// Dispatches to [`Self::deploy_internal`] or [`Self::deploy_external`]
    /// depending on `config.target`.
    pub async fn deploy(
        &self,
        config: DeploymentConfig,
        generated_app: GeneratedApp,
    ) -> Result<DeploymentResult, DeploymentError> {
        match config.target {
            DeploymentTarget::Internal { route, .. } => {
                self.deploy_internal(route, generated_app).await
            }
            DeploymentTarget::External { ref repo_url, .. } => {
                self.deploy_external(repo_url, generated_app).await
            }
        }
    }
    /// Deploy internally to GB platform.
    ///
    /// NOTE(review): this is currently a stub — `app` is unused and the steps
    /// below are not implemented; only the result URL is fabricated.
    async fn deploy_internal(
        &self,
        route: String,
        app: GeneratedApp,
    ) -> Result<DeploymentResult, DeploymentError> {
        // TODO: 1. Store files in Drive
        // TODO: 2. Register route in app router
        // TODO: 3. Create API endpoints
        // 4. Return deployment URL
        let url = format!("/apps/{}/", route);
        Ok(DeploymentResult {
            url,
            deployment_type: "internal".to_string(),
            status: DeploymentStatus::Deployed,
            metadata: serde_json::json!({
                "route": route,
                "platform": "gb",
            }),
        })
    }
    /// Deploy externally to Forgejo.
    ///
    /// NOTE(review): also a stub — `app` is unused and no git operations are
    /// performed; status is reported as `Pending`.
    async fn deploy_external(
        &self,
        repo_url: &str,
        app: GeneratedApp,
    ) -> Result<DeploymentResult, DeploymentError> {
        // TODO: 1. Initialize git repo
        // TODO: 2. Add Forgejo remote
        // TODO: 3. Push generated files
        // TODO: 4. Create CI/CD workflow
        // TODO: 5. Trigger build
        Ok(DeploymentResult {
            url: repo_url.to_string(),
            deployment_type: "external".to_string(),
            status: DeploymentStatus::Pending,
            metadata: serde_json::json!({
                "repo_url": repo_url,
                "forgejo": self.forgejo_url,
            }),
        })
    }
}
/// Outcome of a deployment attempt as reported back to the API caller.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeploymentResult {
    // Where the deployed app is reachable (internal route or repo URL).
    pub url: String,
    // "internal" or "external" — set by the respective deploy method.
    pub deployment_type: String,
    pub status: DeploymentStatus,
    // Free-form per-target details (route/platform or repo_url/forgejo).
    pub metadata: serde_json::Value,
}
/// Lifecycle state of a deployment.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DeploymentStatus {
    Pending,
    Building,
    Deployed,
    Failed,
}
/// Errors surfaced by the deployment pipeline.
#[derive(Debug)]
pub enum DeploymentError {
    InternalDeploymentError(String),
    ForgejoError(String),
    GitError(String),
    CiCdError(String),
}

impl std::fmt::Display for DeploymentError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every variant renders as "<label>: <detail>".
        let (label, detail) = match self {
            Self::InternalDeploymentError(d) => ("Internal deployment error", d),
            Self::ForgejoError(d) => ("Forgejo error", d),
            Self::GitError(d) => ("Git error", d),
            Self::CiCdError(d) => ("CI/CD error", d),
        };
        write!(f, "{}: {}", label, detail)
    }
}

impl std::error::Error for DeploymentError {}
// Allow `?` on ForgejoClient calls inside deployment code by stringifying
// the Forgejo error into the ForgejoError variant.
impl From<ForgejoError> for DeploymentError {
    fn from(err: ForgejoError) -> Self {
        DeploymentError::ForgejoError(err.to_string())
    }
}
/// An in-memory bundle of generated application files.
#[derive(Debug, Clone)]
pub struct GeneratedApp {
    pub name: String,
    pub description: String,
    pub files: Vec<GeneratedFile>,
}

/// One file inside a [`GeneratedApp`], addressed by relative path.
#[derive(Debug, Clone)]
pub struct GeneratedFile {
    pub path: String,
    pub content: Vec<u8>,
}

impl GeneratedApp {
    /// Create an empty bundle with the given name and description.
    pub fn new(name: String, description: String) -> Self {
        Self {
            name,
            description,
            files: Vec::new(),
        }
    }

    /// Scratch directory for staging this app before deployment.
    ///
    /// NOTE(review): the path is derived from `self.name` without
    /// sanitization and the directory is not created here — confirm
    /// callers validate the name and create the directory.
    pub fn temp_dir(&self) -> Result<PathBuf, DeploymentError> {
        Ok(std::env::temp_dir()
            .join("gb-deployments")
            .join(&self.name))
    }

    /// Append a binary file to the bundle.
    pub fn add_file(&mut self, path: String, content: Vec<u8>) {
        self.files.push(GeneratedFile { path, content });
    }

    /// Append a UTF-8 text file to the bundle.
    pub fn add_text_file(&mut self, path: String, content: String) {
        self.add_file(path, content.into_bytes());
    }
}
// =============================================================================
// API Types and Handlers
// =============================================================================
/// JSON body accepted by `POST /api/deployment/deploy`.
#[derive(Debug, Deserialize)]
pub struct DeploymentRequest {
    pub app_name: String,
    // "internal" or "external"; anything else is rejected by `deploy_app`.
    pub target: String,
    // "development" | "staging" | "production"; unknown values default
    // to development in `deploy_app`.
    pub environment: String,
    // Target-specific options (route, repo_url, custom_domain, ...).
    pub manifest: serde_json::Value,
}
/// JSON reply from `POST /api/deployment/deploy`.
#[derive(Debug, Serialize)]
pub struct DeploymentResponse {
    pub success: bool,
    pub url: Option<String>,
    pub deployment_type: Option<String>,
    pub status: Option<String>,
    pub error: Option<String>,
}
/// JSON reply from `GET /api/deployment/targets`.
#[derive(Debug, Serialize)]
pub struct DeploymentTargetsResponse {
    pub targets: Vec<DeploymentTargetInfo>,
}
/// Describes one selectable deployment target for UI consumption.
#[derive(Debug, Serialize)]
pub struct DeploymentTargetInfo {
    pub id: String,
    pub name: String,
    pub description: String,
    pub features: Vec<String>,
}
/// Build the axum sub-router exposing the deployment HTTP API
/// (`GET /api/deployment/targets`, `POST /api/deployment/deploy`).
pub fn configure_deployment_routes() -> axum::Router<Arc<AppState>> {
    use axum::routing::{get, post};
    let router = axum::Router::new();
    let router = router.route("/api/deployment/targets", get(get_deployment_targets));
    router.route("/api/deployment/deploy", post(deploy_app))
}
/// List the deployment targets the platform supports.
///
/// Returns a static catalogue — nothing is read from `AppState`.
pub async fn get_deployment_targets(
    State(_state): State<Arc<AppState>>,
) -> Result<Json<DeploymentTargetsResponse>, DeploymentApiError> {
    // Small builder closure to keep the catalogue below readable.
    let target = |id: &str, name: &str, description: &str, features: &[&str]| {
        DeploymentTargetInfo {
            id: id.to_string(),
            name: name.to_string(),
            description: description.to_string(),
            features: features.iter().map(|f| f.to_string()).collect(),
        }
    };
    let targets = vec![
        target(
            "internal",
            "GB Platform",
            "Deploy internally to General Bots platform",
            &[
                "Instant deployment",
                "Shared resources",
                "Auto-scaling",
                "Built-in monitoring",
                "Zero configuration",
            ],
        ),
        target(
            "external",
            "Forgejo ALM",
            "Deploy to external Git repository with CI/CD",
            &[
                "Git-based deployment",
                "Custom domains",
                "CI/CD pipelines",
                "Version control",
                "Team collaboration",
            ],
        ),
    ];
    Ok(Json(DeploymentTargetsResponse { targets }))
}
/// Deploy an application.
///
/// Parses the target and environment out of the request, builds a
/// [`DeploymentConfig`], and delegates to [`DeploymentRouter::deploy`].
///
/// # Errors
/// * `ValidationError` — unknown `target`, or missing `repo_url` for an
///   external deployment.
/// * `DeploymentFailed` — the router reported a failure.
pub async fn deploy_app(
    // Fix: the state binding was named `state` but never used; `_state`
    // documents that and silences the unused-variable warning.
    State(_state): State<Arc<AppState>>,
    Json(request): Json<DeploymentRequest>,
) -> Result<Json<DeploymentResponse>, DeploymentApiError> {
    log::info!(
        "Deployment request received: app={}, target={}, env={}",
        request.app_name,
        request.target,
        request.environment
    );
    // Parse deployment target from the request manifest.
    let target = match request.target.as_str() {
        "internal" => {
            // Route defaults to the app name when not given explicitly.
            let route = request.manifest
                .get("route")
                .and_then(|v| v.as_str())
                .unwrap_or(&request.app_name)
                .to_string();
            let shared_resources = request.manifest
                .get("shared_resources")
                .and_then(|v| v.as_bool())
                .unwrap_or(true);
            DeploymentTarget::Internal {
                route,
                shared_resources,
            }
        }
        "external" => {
            // repo_url is mandatory for external (Forgejo) deployments.
            let repo_url = request.manifest
                .get("repo_url")
                .and_then(|v| v.as_str())
                .ok_or_else(|| DeploymentApiError::ValidationError("repo_url is required for external deployment".to_string()))?
                .to_string();
            let custom_domain = request.manifest
                .get("custom_domain")
                .and_then(|v| v.as_str())
                .map(|s| s.to_string());
            let ci_cd_enabled = request.manifest
                .get("ci_cd_enabled")
                .and_then(|v| v.as_bool())
                .unwrap_or(true);
            DeploymentTarget::External {
                repo_url,
                custom_domain,
                ci_cd_enabled,
            }
        }
        _ => {
            return Err(DeploymentApiError::ValidationError(format!(
                "Unknown deployment target: {}",
                request.target
            )));
        }
    };
    // Parse environment; unknown values silently fall back to Development.
    let environment = match request.environment.as_str() {
        "development" => DeploymentEnvironment::Development,
        "staging" => DeploymentEnvironment::Staging,
        "production" => DeploymentEnvironment::Production,
        _ => DeploymentEnvironment::Development,
    };
    // Create deployment configuration
    let config = DeploymentConfig {
        app_name: request.app_name.clone(),
        target,
        environment,
    };
    // Get Forgejo configuration from environment
    let forgejo_url = std::env::var("FORGEJO_URL")
        .unwrap_or_else(|_| "https://alm.pragmatismo.com.br".to_string());
    let forgejo_token = std::env::var("FORGEJO_TOKEN").ok();
    // Create deployment router
    let internal_base_path = std::path::PathBuf::from("/opt/gbo/data/apps");
    let router = DeploymentRouter::new(forgejo_url, forgejo_token, internal_base_path);
    // Create a placeholder generated app
    // In real implementation, this would come from the orchestrator
    let generated_app = GeneratedApp::new(
        config.app_name.clone(),
        "Generated application".to_string(),
    );
    // Execute deployment
    let result = router.deploy(config, generated_app).await
        .map_err(|e| DeploymentApiError::DeploymentFailed(e.to_string()))?;
    log::info!(
        "Deployment successful: url={}, type={}, status={:?}",
        result.url,
        result.deployment_type,
        result.status
    );
    Ok(Json(DeploymentResponse {
        success: true,
        url: Some(result.url),
        deployment_type: Some(result.deployment_type),
        status: Some(format!("{:?}", result.status)),
        error: None,
    }))
}
/// Errors returned by the deployment HTTP handlers.
#[derive(Debug)]
pub enum DeploymentApiError {
    ValidationError(String),
    DeploymentFailed(String),
    InternalError(String),
}
// Converts the error into an HTTP response: 400 for validation errors
// (message passed through verbatim), 500 for everything else with the
// message sanitized before it leaves the server.
//
// NOTE(review): here `log_and_sanitize` is called with `&String` and its
// return value is used directly as the message; the types.rs copy of this
// impl wraps the message in an error type and reads `.message` — confirm
// which `log_and_sanitize` signature is current.
impl IntoResponse for DeploymentApiError {
    fn into_response(self) -> Response {
        use crate::security::error_sanitizer::log_and_sanitize;
        let (status, message) = match self {
            DeploymentApiError::ValidationError(msg) => {
                (StatusCode::BAD_REQUEST, msg)
            }
            DeploymentApiError::DeploymentFailed(msg) => {
                let sanitized = log_and_sanitize(&msg, "deployment", None);
                (StatusCode::INTERNAL_SERVER_ERROR, sanitized)
            }
            DeploymentApiError::InternalError(msg) => {
                let sanitized = log_and_sanitize(&msg, "deployment", None);
                (StatusCode::INTERNAL_SERVER_ERROR, sanitized)
            }
        };
        let body = Json(serde_json::json!({
            "success": false,
            "error": message,
        }));
        (status, body).into_response()
    }
}

View file

@ -1,102 +0,0 @@
//! Deployment router for VibeCode platform - Phase 2.5
//!
//! All apps are deployed to Forgejo repositories using org/app_name format.
use super::types::*;
use super::forgejo::ForgejoClient;
/// Deployment router - all apps go to Forgejo
pub struct DeploymentRouter {
    // Base URL of the Forgejo instance.
    forgejo_url: String,
    // API token; `deploy` fails with ConfigurationError when absent.
    forgejo_token: Option<String>,
}
impl DeploymentRouter {
    /// Construct a router for the given Forgejo instance and optional token.
    pub fn new(forgejo_url: String, forgejo_token: Option<String>) -> Self {
        Self {
            forgejo_url,
            forgejo_token,
        }
    }
    /// Deploy to Forgejo repository (org/app_name).
    ///
    /// Steps, in order: ensure the repo exists, push the generated app to a
    /// branch named after the environment, optionally create a CI/CD
    /// workflow, then report the public URL.
    ///
    /// # Errors
    /// `ConfigurationError` when no token is configured; any `ForgejoError`
    /// from the client is converted via `From`.
    pub async fn deploy(
        &self,
        config: DeploymentConfig,
        generated_app: GeneratedApp,
    ) -> Result<DeploymentResult, DeploymentError> {
        log::info!(
            "Deploying {} app: {}/{} to {} environment",
            config.app_type,
            config.organization,
            config.app_name,
            config.environment
        );
        // Get or create Forgejo client
        let token = self.forgejo_token.clone()
            .ok_or_else(|| DeploymentError::ConfigurationError("FORGEJO_TOKEN not configured".to_string()))?;
        let client = ForgejoClient::new(self.forgejo_url.clone(), token);
        // Create repository if it doesn't exist
        let repo = client.create_repository(
            &config.app_name,
            &generated_app.description,
            false, // presumably the `private` flag (false = public) — confirm against ForgejoClient::create_repository
        ).await?;
        log::info!("Repository created/verified: {}", repo.clone_url);
        // Push app to repository; branch name is the environment name.
        let branch = config.environment.to_string();
        client.push_app(&repo.clone_url, &generated_app, &branch).await?;
        // Create CI/CD workflow if enabled
        if config.ci_cd_enabled {
            client.create_cicd_workflow(&repo.clone_url, &config.app_type, &config.environment).await?;
            log::info!("CI/CD workflow created for {}/{}", config.organization, config.app_name);
        }
        // Build deployment URL
        let url = self.build_deployment_url(&config);
        Ok(DeploymentResult {
            url,
            repository: repo.clone_url,
            app_type: config.app_type.to_string(),
            environment: config.environment.to_string(),
            // With CI/CD the build is still running at this point.
            status: if config.ci_cd_enabled {
                DeploymentStatus::Building
            } else {
                DeploymentStatus::Deployed
            },
            metadata: serde_json::json!({
                "org": config.organization,
                "app_name": config.app_name,
                "repo_id": repo.id,
                "forgejo_url": self.forgejo_url,
                "custom_domain": config.custom_domain,
            }),
        })
    }
    /// Build deployment URL based on environment and domain.
    ///
    /// A custom domain always wins; otherwise the URL is derived from the
    /// app name with an environment-specific suffix under gb.solutions.
    fn build_deployment_url(&self, config: &DeploymentConfig) -> String {
        if let Some(ref domain) = config.custom_domain {
            format!("https://{}/", domain)
        } else {
            match config.environment {
                DeploymentEnvironment::Production => {
                    format!("https://{}.gb.solutions/", config.app_name)
                }
                DeploymentEnvironment::Staging => {
                    format!("https://{}-staging.gb.solutions/", config.app_name)
                }
                DeploymentEnvironment::Development => {
                    format!("https://{}-dev.gb.solutions/", config.app_name)
                }
            }
        }
    }
}

View file

@ -1,270 +0,0 @@
//! Type definitions for VibeCode deployment module - Phase 2.5
use axum::{
http::StatusCode,
response::{IntoResponse, Response},
Json,
};
use serde::{Deserialize, Serialize};
use std::fmt;
// Re-export ForgejoError for From implementation
use super::forgejo::ForgejoError;
/// App type determines the deployment strategy and CI/CD workflow
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum AppType {
    /// GB Native - Optimized for General Bots platform
    /// Uses GB-specific features, shared resources, and optimized runtime
    GbNative {
        /// Use GB shared database connection pool
        shared_database: bool,
        /// Use GB authentication system
        shared_auth: bool,
        /// Use GB caching layer
        shared_cache: bool,
    },
    /// Custom - Any framework or technology
    /// Fully independent deployment with custom CI/CD
    Custom {
        /// Framework type: htmx, react, vue, nextjs, svelte, etc.
        framework: String,
        /// Node.js version for build
        node_version: Option<String>,
        /// Build command
        build_command: Option<String>,
        /// Output directory
        output_directory: Option<String>,
    },
}
impl Default for AppType {
fn default() -> Self {
AppType::GbNative {
shared_database: true,
shared_auth: true,
shared_cache: true,
}
}
}
impl std::fmt::Display for AppType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AppType::GbNative { .. } => write!(f, "gb-native"),
AppType::Custom { framework, .. } => write!(f, "custom-{}", framework),
}
}
}
/// Deployment configuration for all apps
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeploymentConfig {
    /// Organization name (becomes part of repo: org/app_name)
    pub organization: String,
    /// Application name (becomes part of repo: org/app_name)
    pub app_name: String,
    /// App type determines deployment strategy
    pub app_type: AppType,
    /// Deployment environment
    pub environment: DeploymentEnvironment,
    /// Custom domain (optional)
    pub custom_domain: Option<String>,
    /// Enable CI/CD pipeline
    pub ci_cd_enabled: bool,
}
/// Deployment environment; its `Display` value is used as the git branch
/// name when pushing to Forgejo.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum DeploymentEnvironment {
    Development,
    Staging,
    Production,
}
impl Default for DeploymentEnvironment {
fn default() -> Self {
DeploymentEnvironment::Development
}
}
impl std::fmt::Display for DeploymentEnvironment {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DeploymentEnvironment::Development => write!(f, "development"),
DeploymentEnvironment::Staging => write!(f, "staging"),
DeploymentEnvironment::Production => write!(f, "production"),
}
}
}
/// Outcome of a Forgejo deployment, returned to the API layer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeploymentResult {
    // Public URL of the deployed app (custom domain or *.gb.solutions).
    pub url: String,
    // Clone URL of the Forgejo repository.
    pub repository: String,
    // Display string of the AppType ("gb-native" / "custom-<framework>").
    pub app_type: String,
    // Display string of the DeploymentEnvironment.
    pub environment: String,
    pub status: DeploymentStatus,
    // Extra details: org, app_name, repo_id, forgejo_url, custom_domain.
    pub metadata: serde_json::Value,
}
/// Lifecycle state of a deployment; `Building` while CI/CD is running.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum DeploymentStatus {
    Pending,
    Building,
    Deployed,
    Failed,
}
/// Errors surfaced by the Forgejo deployment pipeline.
#[derive(Debug)]
pub enum DeploymentError {
    ConfigurationError(String),
    ForgejoError(String),
    GitError(String),
    CiCdError(String),
}

impl std::fmt::Display for DeploymentError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every variant renders as "<label>: <detail>".
        let (label, detail) = match self {
            Self::ConfigurationError(d) => ("Configuration error", d),
            Self::ForgejoError(d) => ("Forgejo error", d),
            Self::GitError(d) => ("Git error", d),
            Self::CiCdError(d) => ("CI/CD error", d),
        };
        write!(f, "{}: {}", label, detail)
    }
}

impl std::error::Error for DeploymentError {}
// Allow `?` on ForgejoClient calls by stringifying the Forgejo error
// into the ForgejoError variant.
impl From<ForgejoError> for DeploymentError {
    fn from(err: ForgejoError) -> Self {
        DeploymentError::ForgejoError(err.to_string())
    }
}
/// Adapter that lets a plain `String` travel as a `std::error::Error`
/// (needed by `log_and_sanitize`, which takes an error value).
#[derive(Debug)]
struct StringError(String);

impl fmt::Display for StringError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.0)
    }
}

impl std::error::Error for StringError {}
/// An in-memory bundle of generated application files.
#[derive(Debug, Clone)]
pub struct GeneratedApp {
    pub name: String,
    pub description: String,
    pub files: Vec<GeneratedFile>,
}

/// One file inside a [`GeneratedApp`], addressed by relative path.
#[derive(Debug, Clone)]
pub struct GeneratedFile {
    pub path: String,
    pub content: Vec<u8>,
}

impl GeneratedApp {
    /// Create an empty bundle with the given name and description.
    pub fn new(name: String, description: String) -> Self {
        Self {
            name,
            description,
            files: Vec::new(),
        }
    }

    /// Scratch directory for staging this app before pushing.
    ///
    /// NOTE(review): the path is derived from `self.name` without
    /// sanitization and the directory is not created here — confirm
    /// callers validate the name and create the directory.
    pub fn temp_dir(&self) -> Result<std::path::PathBuf, DeploymentError> {
        Ok(std::env::temp_dir()
            .join("gb-deployments")
            .join(&self.name))
    }

    /// Append a binary file to the bundle.
    pub fn add_file(&mut self, path: String, content: Vec<u8>) {
        self.files.push(GeneratedFile { path, content });
    }

    /// Append a UTF-8 text file to the bundle.
    pub fn add_text_file(&mut self, path: String, content: String) {
        self.add_file(path, content.into_bytes());
    }
}
// =============================================================================
// API Types
// =============================================================================
/// JSON body accepted by the deploy endpoint (types.rs API surface).
#[derive(Debug, Deserialize)]
pub struct DeploymentRequest {
    // Falls back to FORGEJO_DEFAULT_ORG, then "generalbots", in the handler.
    pub organization: Option<String>,
    pub app_name: String,
    // "gb-native" or "custom"; anything else is rejected by the handler.
    pub app_type: String,
    // Only meaningful for custom apps (htmx, react, vue, ...).
    pub framework: Option<String>,
    pub environment: String,
    pub custom_domain: Option<String>,
    // Defaults to true in the handler when absent.
    pub ci_cd_enabled: Option<bool>,
    // GB Native resource toggles; defaults applied by the handler.
    pub shared_database: Option<bool>,
    pub shared_auth: Option<bool>,
    pub shared_cache: Option<bool>,
}
/// JSON reply from the deploy endpoint.
#[derive(Debug, Serialize)]
pub struct DeploymentResponse {
    pub success: bool,
    pub url: Option<String>,
    pub repository: Option<String>,
    pub app_type: Option<String>,
    pub status: Option<String>,
    pub error: Option<String>,
}
/// JSON reply listing the supported app types.
#[derive(Debug, Serialize)]
pub struct AppTypesResponse {
    pub app_types: Vec<AppTypeInfo>,
}
/// Describes one selectable app type for UI consumption.
#[derive(Debug, Serialize)]
pub struct AppTypeInfo {
    pub id: String,
    pub name: String,
    pub description: String,
    pub features: Vec<String>,
}
/// Errors returned by the deployment HTTP handlers (types.rs variant).
#[derive(Debug)]
pub enum DeploymentApiError {
    ValidationError(String),
    DeploymentFailed(String),
    ConfigurationError(String),
}
// Converts the error into an HTTP response: 400 for validation errors
// (message passed through verbatim), 500 for everything else with the
// message sanitized via `log_and_sanitize` before leaving the server.
//
// NOTE(review): this copy wraps the message in `StringError` and reads
// `.message` off the sanitizer's result, while the mod.rs copy passes
// `&String` and uses the result directly — confirm which
// `log_and_sanitize` signature is current.
impl IntoResponse for DeploymentApiError {
    fn into_response(self) -> Response {
        use crate::security::error_sanitizer::log_and_sanitize;
        let (status, message) = match self {
            DeploymentApiError::ValidationError(msg) => (StatusCode::BAD_REQUEST, msg),
            DeploymentApiError::DeploymentFailed(msg) => {
                let error = StringError(msg);
                let sanitized = log_and_sanitize(&error, "deployment", None);
                (StatusCode::INTERNAL_SERVER_ERROR, sanitized.message)
            }
            DeploymentApiError::ConfigurationError(msg) => {
                let error = StringError(msg);
                let sanitized = log_and_sanitize(&error, "deployment_config", None);
                (StatusCode::INTERNAL_SERVER_ERROR, sanitized.message)
            }
        };
        let body = Json(serde_json::json!({
            "success": false,
            "error": message,
        }));
        (status, body).into_response()
    }
}

View file

@ -266,7 +266,7 @@ impl DriveMonitor {
tokio::spawn(async move { tokio::spawn(async move {
let mut consecutive_processing_failures = 0; let mut consecutive_processing_failures = 0;
trace!("Starting periodic monitoring loop for bot {}", self_clone.bot_id); trace!("Starting periodic monitoring loop for bot {}", self_clone.bot_id);
let is_processing_state = self_clone.is_processing.load(std::sync::atomic::Ordering::SeqCst); let is_processing_state = self_clone.is_processing.load(std::sync::atomic::Ordering::SeqCst);
trace!("is_processing state at loop start: {} for bot {}", is_processing_state, self_clone.bot_id); trace!("is_processing state at loop start: {} for bot {}", is_processing_state, self_clone.bot_id);
@ -630,7 +630,7 @@ impl DriveMonitor {
Err(_) => String::new(), Err(_) => String::new(),
}; };
let normalized_new_value = Self::normalize_config_value(new_value); let normalized_new_value = Self::normalize_config_value(new_value);
if normalized_old_value != normalized_new_value { if normalized_old_value != normalized_new_value {
trace!( trace!(
"Detected change in {} (old: {}, new: {})", "Detected change in {} (old: {}, new: {})",
@ -956,10 +956,13 @@ impl DriveMonitor {
use crate::core::kb::website_crawler_service::WebsiteCrawlerService; use crate::core::kb::website_crawler_service::WebsiteCrawlerService;
use diesel::prelude::*; use diesel::prelude::*;
if _kb_manager.is_none() { let kb_manager = match _kb_manager {
warn!("Knowledge base manager not available, skipping website crawl"); Some(kb) => kb,
return Ok(()); None => {
} warn!("Knowledge base manager not available, skipping website crawl");
return Ok(());
}
};
let mut conn = _db_pool.get()?; let mut conn = _db_pool.get()?;
@ -1005,7 +1008,7 @@ impl DriveMonitor {
}; };
// Create a temporary crawler service to use its crawl_website method // Create a temporary crawler service to use its crawl_website method
let crawler_service = WebsiteCrawlerService::new(_db_pool.clone()); let crawler_service = WebsiteCrawlerService::new(_db_pool.clone(), kb_manager);
match crawler_service.crawl_single_website(website_record).await { match crawler_service.crawl_single_website(website_record).await {
Ok(_) => {}, Ok(_) => {},
Err(e) => return Err(format!("Website crawl failed: {}", e).into()), Err(e) => return Err(format!("Website crawl failed: {}", e).into()),

View file

@ -108,7 +108,7 @@ impl LocalFileMonitor {
trace!("Watching directory: {:?}", self.data_dir); trace!("Watching directory: {:?}", self.data_dir);
while self.is_processing.load(Ordering::SeqCst) { while self.is_processing.load(Ordering::SeqCst) {
tokio::time::sleep(Duration::from_secs(60)).await; tokio::time::sleep(Duration::from_secs(5)).await;
// Process events from the watcher // Process events from the watcher
while let Ok(event) = rx.try_recv() { while let Ok(event) = rx.try_recv() {
@ -134,16 +134,21 @@ impl LocalFileMonitor {
_ => {} _ => {}
} }
} }
// Periodic scan to catch any missed changes
if let Err(e) = self.scan_and_compile_all().await {
error!("Scan failed: {}", e);
}
} }
trace!("Monitoring loop ended"); trace!("Monitoring loop ended");
} }
async fn polling_loop(&self) { async fn polling_loop(&self) {
trace!("Using polling fallback (checking every 60s)"); trace!("Using polling fallback (checking every 10s)");
while self.is_processing.load(Ordering::SeqCst) { while self.is_processing.load(Ordering::SeqCst) {
tokio::time::sleep(Duration::from_secs(60)).await; tokio::time::sleep(Duration::from_secs(10)).await;
if let Err(e) = self.scan_and_compile_all().await { if let Err(e) = self.scan_and_compile_all().await {
error!("Scan failed: {}", e); error!("Scan failed: {}", e);
@ -198,6 +203,8 @@ impl LocalFileMonitor {
} }
async fn compile_gbdialog(&self, bot_name: &str, gbdialog_path: &Path) -> Result<(), Box<dyn Error + Send + Sync>> { async fn compile_gbdialog(&self, bot_name: &str, gbdialog_path: &Path) -> Result<(), Box<dyn Error + Send + Sync>> {
info!("Compiling bot: {}", bot_name);
let entries = tokio::fs::read_dir(gbdialog_path).await?; let entries = tokio::fs::read_dir(gbdialog_path).await?;
let mut entries = entries; let mut entries = entries;
@ -224,7 +231,6 @@ impl LocalFileMonitor {
}; };
if should_compile { if should_compile {
info!("Compiling bot: {}", bot_name);
debug!("Recompiling {:?} - modification detected", path); debug!("Recompiling {:?} - modification detected", path);
if let Err(e) = self.compile_local_file(&path).await { if let Err(e) = self.compile_local_file(&path).await {
error!("Failed to compile {:?}: {}", path, e); error!("Failed to compile {:?}: {}", path, e);

View file

@ -605,15 +605,13 @@ impl LLMProvider for CachedLLMProvider {
pub struct LocalEmbeddingService { pub struct LocalEmbeddingService {
embedding_url: String, embedding_url: String,
model: String, model: String,
api_key: Option<String>,
} }
impl LocalEmbeddingService { impl LocalEmbeddingService {
pub fn new(embedding_url: String, model: String, api_key: Option<String>) -> Self { pub fn new(embedding_url: String, model: String) -> Self {
Self { Self {
embedding_url, embedding_url,
model, model,
api_key,
} }
} }
} }
@ -625,121 +623,22 @@ impl EmbeddingService for LocalEmbeddingService {
text: &str, text: &str,
) -> Result<Vec<f32>, Box<dyn std::error::Error + Send + Sync>> { ) -> Result<Vec<f32>, Box<dyn std::error::Error + Send + Sync>> {
let client = reqwest::Client::new(); let client = reqwest::Client::new();
let response = client
// Determine if URL already includes endpoint path .post(format!("{}/embedding", self.embedding_url))
let url = if self.embedding_url.contains("/pipeline/") || .json(&serde_json::json!({
self.embedding_url.contains("/v1/") ||
self.embedding_url.contains("/ai/run/") ||
self.embedding_url.ends_with("/embeddings") {
self.embedding_url.clone()
} else {
format!("{}/embedding", self.embedding_url)
};
let mut request = client.post(&url);
// Add authorization header if API key is provided
if let Some(ref api_key) = self.api_key {
request = request.header("Authorization", format!("Bearer {}", api_key));
}
// Determine request body format based on URL
let request_body = if self.embedding_url.contains("huggingface.co") {
serde_json::json!({
"inputs": text,
})
} else if self.embedding_url.contains("/ai/run/") {
// Cloudflare AI format
serde_json::json!({
"text": text,
})
} else {
serde_json::json!({
"input": text, "input": text,
"model": self.model, "model": self.model,
}) }))
};
let response = request
.json(&request_body)
.send() .send()
.await?; .await?;
let status = response.status(); let result: Value = response.json().await?;
let response_text = response.text().await?; let embedding = result["data"][0]["embedding"]
.as_array()
if !status.is_success() { .ok_or("Invalid embedding response")?
debug!( .iter()
"Embedding service HTTP error {}: {}", .filter_map(|v| v.as_f64().map(|f| f as f32))
status, .collect();
response_text
);
return Err(format!(
"Embedding service returned HTTP {}: {}",
status,
response_text
).into());
}
let result: Value = serde_json::from_str(&response_text)
.map_err(|e| {
debug!("Failed to parse embedding JSON: {} - Response: {}", e, response_text);
format!("Failed to parse embedding response JSON: {} - Response: {}", e, response_text)
})?;
if let Some(error) = result.get("error") {
debug!("Embedding service returned error: {}", error);
return Err(format!("Embedding service error: {}", error).into());
}
// Try multiple response formats
let embedding = if let Some(arr) = result.as_array() {
// HuggingFace format: direct array [0.1, 0.2, ...]
arr.iter()
.filter_map(|v| v.as_f64().map(|f| f as f32))
.collect()
} else if let Some(result_obj) = result.get("result") {
// Cloudflare AI format: {"result": {"data": [[...]]}}
if let Some(data) = result_obj.get("data") {
if let Some(data_arr) = data.as_array() {
if let Some(first) = data_arr.first() {
if let Some(embedding_arr) = first.as_array() {
embedding_arr
.iter()
.filter_map(|v| v.as_f64().map(|f| f as f32))
.collect()
} else {
data_arr
.iter()
.filter_map(|v| v.as_f64().map(|f| f as f32))
.collect()
}
} else {
return Err("Empty data array in Cloudflare response".into());
}
} else {
return Err(format!("Invalid Cloudflare response format - Expected result.data array, got: {}", response_text).into());
}
} else {
return Err(format!("Invalid Cloudflare response format - Expected result.data, got: {}", response_text).into());
}
} else if let Some(data) = result.get("data") {
// OpenAI/Standard format: {"data": [{"embedding": [...]}]}
data[0]["embedding"]
.as_array()
.ok_or_else(|| {
debug!("Invalid embedding response format. Expected data[0].embedding array. Got: {}", response_text);
format!("Invalid embedding response format - Expected data[0].embedding array, got: {}", response_text)
})?
.iter()
.filter_map(|v| v.as_f64().map(|f| f as f32))
.collect()
} else {
return Err(format!(
"Invalid embedding response format - Expected array or data[0].embedding, got: {}",
response_text
).into());
};
Ok(embedding) Ok(embedding)
} }

View file

@ -748,32 +748,17 @@ fn init_llm_provider(
let embedding_model = config_manager let embedding_model = config_manager
.get_config(&bot_id, "embedding-model", Some("all-MiniLM-L6-v2")) .get_config(&bot_id, "embedding-model", Some("all-MiniLM-L6-v2"))
.unwrap_or_else(|_| "all-MiniLM-L6-v2".to_string()); .unwrap_or_else(|_| "all-MiniLM-L6-v2".to_string());
let embedding_key = config_manager
.get_config(&bot_id, "embedding-key", None)
.ok();
let semantic_cache_enabled = config_manager
.get_config(&bot_id, "semantic-cache-enabled", Some("true"))
.unwrap_or_else(|_| "true".to_string())
.to_lowercase() == "true";
info!("Embedding URL: {}", embedding_url); info!("Embedding URL: {}", embedding_url);
info!("Embedding Model: {}", embedding_model); info!("Embedding Model: {}", embedding_model);
info!("Embedding Key: {}", if embedding_key.is_some() { "configured" } else { "not set" });
info!("Semantic Cache Enabled: {}", semantic_cache_enabled);
let embedding_service = if semantic_cache_enabled { let embedding_service = Some(Arc::new(LocalEmbeddingService::new(
Some(Arc::new(LocalEmbeddingService::new( embedding_url,
embedding_url, embedding_model,
embedding_model, )) as Arc<dyn EmbeddingService>);
embedding_key,
)) as Arc<dyn EmbeddingService>)
} else {
None
};
let cache_config = CacheConfig { let cache_config = CacheConfig {
ttl: 3600, ttl: 3600,
semantic_matching: semantic_cache_enabled, semantic_matching: true,
similarity_threshold: 0.85, similarity_threshold: 0.85,
max_similarity_checks: 100, max_similarity_checks: 100,
key_prefix: "llm_cache".to_string(), key_prefix: "llm_cache".to_string(),

View file

@ -247,13 +247,6 @@ pub async fn run_axum_server(
api_router = api_router.merge(crate::research::configure_research_routes()); api_router = api_router.merge(crate::research::configure_research_routes());
api_router = api_router.merge(crate::research::ui::configure_research_ui_routes()); api_router = api_router.merge(crate::research::ui::configure_research_ui_routes());
} }
#[cfg(any(feature = "research", feature = "llm"))]
{
api_router = api_router.route(
"/api/website/force-recrawl",
post(crate::core::kb::website_crawler_service::handle_force_recrawl)
);
}
#[cfg(feature = "sources")] #[cfg(feature = "sources")]
{ {
api_router = api_router.merge(crate::sources::configure_sources_routes()); api_router = api_router.merge(crate::sources::configure_sources_routes());