diff --git a/src/main.rs b/src/main.rs index d4e081e..b30159f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -47,7 +47,7 @@ async fn main() -> std::io::Result<()> { ensure_llama_servers_running() .await .expect("Failed to initialize LLM local server."); - + initialize_browser_pool() .await .expect("Failed to initialize browser pool"); diff --git a/src/models/org_model.rs b/src/models/org_model.md similarity index 100% rename from src/models/org_model.rs rename to src/models/org_model.md diff --git a/src/services/llm_local.rs b/src/services/llm_local.rs index fb9b4ba..c1e21cb 100644 --- a/src/services/llm_local.rs +++ b/src/services/llm_local.rs @@ -4,8 +4,6 @@ use log::{error, info}; use reqwest::Client; use serde::{Deserialize, Serialize}; use std::env; -use std::path::Path; -use std::process::{Command, Stdio}; use tokio::time::{sleep, Duration}; // OpenAI-compatible request/response structures @@ -190,91 +188,15 @@ async fn start_llm_server( std::env::set_var("OMP_PLACES", "cores"); std::env::set_var("OMP_PROC_BIND", "close"); - // Verify paths exist - let llama_path = Path::new(&llama_cpp_path); - let model_path = Path::new(&model_path); + // "cd {} && numactl --interleave=all ./llama-server -m {} --host 0.0.0.0 --port {} --threads 20 --threads-batch 40 --temp 0.7 --parallel 1 --repeat-penalty 1.1 --ctx-size 8192 --batch-size 8192 -n 4096 --mlock --no-mmap --flash-attn --no-kv-offload --no-mmap &", - if !llama_path.exists() { - return Err(format!("Llama path does not exist: {}", llama_cpp_path).into()); - } - - if !model_path.exists() { - return Err(format!("Model path does not exist: {}", model_path.display()).into()); - } - - #[cfg(target_os = "linux")] - { - let executable_path = llama_path.join("llama-server"); - - if !executable_path.exists() { - return Err(format!("Executable not found: {}", executable_path.display()).into()); - } - - info!("Starting LLM server on port: {}", port); - info!("Llama path: {}", llama_cpp_path); - info!("Model path: {}", 
model_path.display()); - - // Use absolute paths and proper process management - let mut cmd = Command::new(executable_path); - cmd.arg("-m") - .arg(model_path) - .arg("--host") - .arg("0.0.0.0") - .arg("--port") - .arg(port) - .arg("--n-gpu-layers") - .arg("99") - .arg("--threads") - .arg("20") - .arg("--threads-batch") - .arg("40") - .current_dir(llama_path) // Set working directory - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); - - // Get the command as a string - info!("Command: {}", cmd.get_program().to_string_lossy()); - info!("Args: {:?}", cmd.get_args().collect::>()); - info!("Current dir: {:?}", cmd.get_current_dir()); - - // Spawn and don't wait for completion - let child = cmd.spawn()?; - - // Store the child process if you need to manage it later - // You might want to add this to a process manager - info!("LLM server started with PID: {}", child.id()); - } - - #[cfg(target_os = "windows")] - { - let executable_path = llama_path.join("llama-server.exe"); - - if !executable_path.exists() { - return Err(format!("Executable not found: {}", executable_path.display()).into()); - } - - info!("Starting LLM server on port: {}", port); - info!("Llama path: {}", llama_cpp_path); - info!("Model path: {}", model_path.display()); - - let mut cmd = Command::new(executable_path); - cmd.arg("-m") - .arg(model_path) - .arg("--host") - .arg("0.0.0.0") - .arg("--port") - .arg(port) - .arg("--n-gpu-layers") - .arg("99") - .current_dir(llama_path) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); - - info!("LLM server command: {}", cmd); - let child = cmd.spawn()?; - info!("LLM server started with PID: {}", child.id()); - } + let mut cmd = tokio::process::Command::new("sh"); + cmd.arg("-c").arg(format!( + "cd {} && ./llama-server -m {} --host 0.0.0.0 --port {} --n-gpu-layers 99 &", + llama_cpp_path, model_path, port + )); + cmd.spawn()?; Ok(()) } @@ -285,93 +207,13 @@ async fn start_embedding_server( ) -> Result<(), Box> { let port = 
url.split(':').last().unwrap_or("8082"); - std::env::set_var("OMP_NUM_THREADS", "20"); - std::env::set_var("OMP_PLACES", "cores"); - std::env::set_var("OMP_PROC_BIND", "close"); - - // Verify paths exist - let llama_path = Path::new(&llama_cpp_path); - let model_path = Path::new(&model_path); - - if !llama_path.exists() { - return Err(format!("Llama path does not exist: {}", llama_cpp_path).into()); - } - - if !model_path.exists() { - return Err(format!("Model path does not exist: {}", model_path.display()).into()); - } - - #[cfg(target_os = "linux")] - { - let executable_path = llama_path.join("llama-server"); - - if !executable_path.exists() { - return Err(format!("Executable not found: {}", executable_path.display()).into()); - } - - info!("Starting embedding server on port: {}", port); - info!("Llama path: {}", llama_cpp_path); - info!("Model path: {}", model_path.display()); - - // Use absolute paths and proper process management - let mut cmd = Command::new(executable_path); - cmd.arg("-m") - .arg(model_path) - .arg("--host") - .arg("0.0.0.0") - .arg("--port") - .arg(port) - .arg("--embedding") - .arg("--n-gpu-layers") - .arg("99") - .arg("--threads") - .arg("20") - .arg("--threads-batch") - .arg("40") - .current_dir(llama_path) // Set working directory - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); - - // Get the command as a string - info!("Command: {}", cmd.get_program().to_string_lossy()); - info!("Args: {:?}", cmd.get_args().collect::>()); - info!("Current dir: {:?}", cmd.get_current_dir()); - - let child = cmd.spawn()?; - - info!("Embedding server started with PID: {}", child.id()); - } - - #[cfg(target_os = "windows")] - { - let executable_path = llama_path.join("llama-server.exe"); - - if !executable_path.exists() { - return Err(format!("Executable not found: {}", executable_path.display()).into()); - } - - info!("Starting embedding server on port: {}", port); - info!("Llama path: {}", llama_cpp_path); - info!("Model path: {}", 
model_path.display()); - - let mut cmd = Command::new(executable_path); - cmd.arg("-m") - .arg(model_path) - .arg("--host") - .arg("0.0.0.0") - .arg("--port") - .arg(port) - .arg("--embedding") - .arg("--n-gpu-layers") - .arg("99") - .current_dir(llama_path) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); - - let child = cmd.spawn()?; - info!("Embedding server started with PID: {}", child.id()); - } + let mut cmd = tokio::process::Command::new("sh"); + cmd.arg("-c").arg(format!( + "cd {} && ./llama-server -m {} --host 0.0.0.0 --port {} --embedding --n-gpu-layers 99 &", + llama_cpp_path, model_path, port + )); + cmd.spawn()?; Ok(()) } @@ -589,8 +431,7 @@ struct LlamaCppEmbeddingRequest { // FIXED: Handle the stupid nested array format #[derive(Debug, Deserialize)] struct LlamaCppEmbeddingResponseItem { - #[serde(rename = "index")] - pub _index: usize, + pub index: usize, pub embedding: Vec>, // This is the up part - embedding is an array of arrays } diff --git a/src/services/org.md b/src/services/org.md new file mode 100644 index 0000000..50245f9 --- /dev/null +++ b/src/services/org.md @@ -0,0 +1,170 @@ + .service(create_organization) + .service(get_organization) + .service(list_organizations) + .service(update_organization) + .service(delete_organization) + + +use actix_web::{web, HttpResponse, Result}; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use sqlx::{FromRow, PgPool}; +use uuid::Uuid; + +#[derive(Debug, Deserialize)] +pub struct CreateOrganizationRequest { + pub name: String, + pub slug: String, +} + +#[derive(Debug, Serialize)] +pub struct ApiResponse { + pub data: T, + pub success: bool, +} + +// Helper functions + +/// Create a new organization in database +pub async fn create_organization_db( + db_pool: &PgPool, + name: &str, + slug: &str, +) -> Result { + let org = sqlx::query_as!( + Organization, + r#" + INSERT INTO organizations (org_id, name, slug, created_at) + VALUES ($1, $2, $3, $4) + RETURNING org_id, name, slug, 
created_at
+        "#,
+        Uuid::new_v4(),
+        name,
+        slug,
+        Utc::now()
+    )
+    .fetch_one(db_pool)
+    .await?;
+
+    Ok(org)
+}
+
+/// Get organization by ID from database
+pub async fn get_organization_by_id_db(
+    db_pool: &PgPool,
+    org_id: Uuid,
+) -> Result<Option<Organization>, sqlx::Error> {
+    let org = sqlx::query_as!(
+        Organization,
+        r#"
+        SELECT org_id, name, slug, created_at
+        FROM organizations
+        WHERE org_id = $1
+        "#,
+        org_id
+    )
+    .fetch_optional(db_pool)
+    .await?;
+
+    Ok(org)
+}
+
+#[post("/organizations/create")]
+pub async fn create_organization(
+    state: web::Data<AppState>,
+    payload: web::Json<CreateOrganizationRequest>,
+) -> Result<HttpResponse> {
+    let org = create_organization_db(&state.db_pool, &payload.name, &payload.slug)
+        .await
+        .map_err(|e| {
+            actix_web::error::ErrorInternalServerError(format!(
+                "Failed to create organization: {}",
+                e
+            ))
+        })?;
+
+    let response = ApiResponse {
+        data: org,
+        success: true,
+    };
+
+    Ok(HttpResponse::Ok().json(response))
+}
+
+#[get("/organizations/{org_id}")]
+pub async fn get_organization(
+    state: web::Data<AppState>,
+    path: web::Path<Uuid>,
+) -> Result<HttpResponse> {
+    let org_id = path.into_inner();
+
+    let org = get_organization_by_id_db(&state.db_pool, org_id)
+        .await
+        .map_err(|e| {
+            actix_web::error::ErrorInternalServerError(format!("Database error: {}", e))
+        })?;
+
+    match org {
+        Some(org) => {
+            let response = ApiResponse {
+                data: org,
+                success: true,
+            };
+            Ok(HttpResponse::Ok().json(response))
+        }
+        None => Ok(HttpResponse::NotFound().json(ApiResponse {
+            data: "Organization not found",
+            success: false,
+        })),
+    }
+}
+
+#[get("/organizations")]
+pub async fn list_organizations(
+    state: web::Data<AppState>,
+    query: web::Query<PaginationQuery>,
+) -> Result<HttpResponse> {
+    let orgs = get_organizations_db(&state.db_pool, query.page, query.page_size)
+        .await
+        .map_err(|e| {
+            actix_web::error::ErrorInternalServerError(format!("Database error: {}", e))
+        })?;
+
+    let response = ApiResponse {
+        data: orgs,
+        success: true,
+    };
+
+    Ok(HttpResponse::Ok().json(response))
+}
+
+#[put("/organizations/{org_id}")]
+pub async fn update_organization(
+    state: web::Data<AppState>,
+    path: web::Path<Uuid>,
+    payload: web::Json<CreateOrganizationRequest>,
+) -> Result<HttpResponse> {
+    let org_id = path.into_inner();
+
+    // Implementation for update operation
+    // Use spawn_blocking for CPU-intensive operations if needed
+    let updated_org = web::block(move || {
+        // Blocking database operation would go here
+        // For async, use direct SQLx calls
+        Ok::<_, actix_web::Error>(Organization {
+            org_id,
+            name: payload.name.clone(),
+            slug: payload.slug.clone(),
+            created_at: Utc::now(),
+        })
+    })
+    .await?
+    .map_err(|e: actix_web::Error| e)?;
+
+    let response = ApiResponse {
+        data: updated_org,
+        success: true,
+    };
+
+    Ok(HttpResponse::Ok().json(response))
+}