Compare commits

...

4 commits

Author SHA1 Message Date
f0cdf2047a - Local LLM Embeddings. 2025-09-10 11:01:39 -03:00
c54904b18b - LLM local fixes. 2025-09-09 15:09:28 -03:00
909f2ae5f1 - Remove values from .sh. 2025-09-08 15:39:37 -03:00
ed5caa64b6 - Local LLM embeddings. 2025-09-08 14:58:22 -03:00
6 changed files with 418 additions and 223 deletions

View file

@@ -10,7 +10,8 @@ use sqlx::PgPool;
 use crate::services::automation::AutomationService;
 use crate::services::email::{get_emails, list_emails, save_click, send_email};
 use crate::services::llm::{chat, chat_stream};
-use crate::services::llm_local::chat_completions_local;
+use crate::services::llm_local::ensure_llama_servers_running;
+use crate::services::llm_local::{chat_completions_local, embeddings_local};
 use crate::services::llm_provider::chat_completions;
 use crate::services::web_automation::{initialize_browser_pool, BrowserPool};
@@ -38,14 +39,9 @@ async fn main() -> std::io::Result<()> {
         "/usr/bin/brave-browser-beta".to_string(),
     ));
-    #[cfg(feature = "local_llm")]
-    {
-        use crate::services::llm_local::ensure_llama_server_running;
-        ensure_llama_server_running()
-            .await
-            .expect("Failed to initialize LLM local server.");
-    }
+    ensure_llama_servers_running()
+        .await
+        .expect("Failed to initialize LLM local server.");
 initialize_browser_pool()
     .await
@@ -86,9 +82,9 @@ async fn main() -> std::io::Result<()> {
            .service(list_emails)
            .service(send_email)
            .service(chat_stream)
-           .service(chat_completions)
            .service(chat_completions_local)
            .service(chat)
+           .service(embeddings_local)
    })
    .bind((config.server.host.clone(), config.server.port))?
    .run()
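With embeddings_local registered next to chat_completions_local, gbserver now serves both OpenAI-style routes itself. A minimal smoke test might look like the sketch below; the host and port are placeholders for config.server.host and config.server.port, and the request bodies follow the OpenAI-compatible structs in the llm_local service (field names here are the usual OpenAI ones and may need adjusting):

    # placeholder bind address; adjust to your server config
    curl -s http://localhost:8080/v1/chat/completions \
        -H 'Content-Type: application/json' \
        -d '{"model": "local", "messages": [{"role": "user", "content": "Hello"}]}'
    curl -s http://localhost:8080/v1/embeddings \
        -H 'Content-Type: application/json' \
        -d '{"model": "local-embedding", "input": "Hello"}'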

View file

@@ -13,7 +13,6 @@ sleep 15
 lxc exec "$PARAM_TENANT"-bot -- bash -c "
 apt-get update && apt-get install -y \
     build-essential cmake git pkg-config libjpeg-dev libtiff-dev \
     libpng-dev libavcodec-dev libavformat-dev libswscale-dev \
@@ -111,4 +110,4 @@ sudo systemctl start bot.service
 lxc config device remove "$PARAM_TENANT"-bot bot-proxy 2>/dev/null || true
 lxc config device add "$PARAM_TENANT"-bot bot-proxy proxy \
     listen=tcp:0.0.0.0:"$PARAM_BOT_PORT" \
     connect=tcp:127.0.0.1:"$PARAM_BOT_PORT"

View file

@@ -32,7 +32,7 @@ User=minio-user
 Group=minio-user
 Environment="MINIO_ROOT_USER='"${PARAM_DRIVE_USER}"'"
 Environment="MINIO_ROOT_PASSWORD='"${PARAM_DRIVE_PASSWORD}"'"
-ExecStart=/usr/local/bin/minio server --console-address ":'"${PARAM_DRIVE_PORT}"'" /data
+ExecStart=/usr/local/bin/minio server --address ":'"${PARAM_DRIVE_PORT}"'" --console-address ":'"${PARAM_PORT}"'" /data
 StandardOutput=append:/var/log/minio/output.log
 StandardError=append:/var/log/minio/error.log
@@ -53,4 +53,4 @@ lxc config device add "${PARAM_TENANT}-drive" minio-proxy proxy \
 lxc config device remove "${PARAM_TENANT}-drive" console-proxy 2>/dev/null || true
 lxc config device add "${PARAM_TENANT}-drive" console-proxy proxy \
     listen=tcp:0.0.0.0:"${PARAM_DRIVE_PORT}" \
     connect=tcp:127.0.0.1:"${PARAM_DRIVE_PORT}"
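The ExecStart change separates the two listeners: --address now binds the S3 API to PARAM_DRIVE_PORT, while --console-address moves the web console to PARAM_PORT. A rough liveness check against the API port, assuming MinIO's standard health route:

    # run on the host once the proxy device is in place
    curl -fsS http://127.0.0.1:"$PARAM_DRIVE_PORT"/minio/health/live && echo "drive API is up"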

View file

@@ -17,12 +17,42 @@ sleep 15
 lxc exec $CONTAINER_NAME -- bash -c '
-apt-get update && apt-get install -y wget
+apt-get update && apt-get install -y wget curl unzip git
 useradd -r -s /bin/false gbuser || true
 mkdir -p /opt/gbo/logs /opt/gbo/bin /opt/gbo/data /opt/gbo/conf
 chown -R gbuser:gbuser /opt/gbo/
+wget https://github.com/ggml-org/llama.cpp/releases/download/b6148/llama-b6148-bin-ubuntu-x64.zip
+mkdir llama.cpp
+mv llama-b6148-bin-ubuntu-x64.zip llama.cpp
+cd llama.cpp
+unzip llama-b6148-bin-ubuntu-x64.zip
+mv build/bin/* .
+rm build/bin -r
+rm llama-b6148-bin-ubuntu-x64.zip
+wget https://huggingface.co/bartowski/DeepSeek-R1-Distill-Qwen-1.5B-GGUF/resolve/main/DeepSeek-R1-Distill-Qwen-1.5B-Q3_K_M.gguf
+wget https://huggingface.co/CompendiumLabs/bge-small-en-v1.5-gguf/resolve/main/bge-small-en-v1.5-f32.gguf
+curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
+source "$HOME/.cargo/env"
+git clone https://alm.pragmatismo.com.br/generalbots/gbserver
+apt install -y build-essential \
+    pkg-config \
+    libssl-dev \
+    gcc-multilib \
+    g++-multilib \
+    clang \
+    lld \
+    binutils-dev \
+    libudev-dev \
+    libdbus-1-dev
 cat > /etc/systemd/system/system.service <<EOF
 [Unit]
 Description=General Bots System Service
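The container now ships the prebuilt llama.cpp b6148 binaries plus one chat model (DeepSeek-R1-Distill-Qwen-1.5B, Q3_K_M) and one embedding model (bge-small-en-v1.5). To launch them by hand for debugging, a sketch that mirrors the flags used by start_llm_server and start_embedding_server later in this compare (ports 8081/8082 are the defaults the Rust code derives from LLM_URL and EMBEDDING_URL):

    cd llama.cpp
    ./llama-server -m DeepSeek-R1-Distill-Qwen-1.5B-Q3_K_M.gguf --host 0.0.0.0 --port 8081 --n-gpu-layers 99 &
    ./llama-server -m bge-small-en-v1.5-f32.gguf --host 0.0.0.0 --port 8082 --embedding --n-gpu-layers 99 &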

View file

@@ -9,7 +9,6 @@ mkdir -p "$HOST_DATA" "$HOST_CONF" "$HOST_LOGS"
 lxc launch images:debian/12 "$PARAM_TENANT"-tables -c security.privileged=true
 until lxc exec "$PARAM_TENANT"-tables -- test -f /bin/bash; do
     sleep 5
 done
 sleep 10
@@ -17,75 +16,35 @@ sleep 10
 lxc exec "$PARAM_TENANT"-tables -- bash -c "
 set -e
 export DEBIAN_FRONTEND=noninteractive
 apt-get update
-apt-get install -y wget gnupg2 sudo lsb-release
-CODENAME=\$(lsb_release -cs)
-wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | gpg --dearmor > /etc/apt/trusted.gpg.d/postgresql.gpg
-apt-get install -y postgresql-14 postgresql-client-14
-if ! id postgres &>/dev/null; then
-    exit 1
-fi
-systemctl stop postgresql@14-main 2>/dev/null || systemctl stop postgresql 2>/dev/null || true
+apt-get install -y wget gnupg2 sudo lsb-release curl
+sudo apt install -y postgresql-common
+sudo /usr/share/postgresql-common/pgdg/apt.postgresql.org.sh
+apt install -y postgresql
+# TODO: Open listener on *.
+until sudo -u postgres psql -p $PARAM_TABLES_PORT -c '\q' 2>/dev/null; do
+    echo \"Waiting for PostgreSQL to start on port $PARAM_TABLES_PORT...\"
+    sleep 3
+done
+sudo -u postgres psql -p $PARAM_TABLES_PORT -c \"CREATE USER $PARAM_TENANT WITH PASSWORD '$PARAM_TABLES_PASSWORD';\"
+sudo -u postgres psql -p $PARAM_TABLES_PORT -c \"CREATE DATABASE ${PARAM_TENANT}_db OWNER $PARAM_TENANT;\"
+sudo -u postgres psql -p $PARAM_TABLES_PORT -c \"GRANT ALL PRIVILEGES ON DATABASE ${PARAM_TENANT}_db TO $PARAM_TENANT;\"
 "
-POSTGRES_UID=$(lxc exec "$PARAM_TENANT"-tables -- id -u postgres)
-POSTGRES_GID=$(lxc exec "$PARAM_TENANT"-tables -- id -g postgres)
-HOST_POSTGRES_UID=$((100000 + POSTGRES_UID))
-HOST_POSTGRES_GID=$((100000 + POSTGRES_GID))
-chown -R "$HOST_POSTGRES_UID:$HOST_POSTGRES_GID" "$HOST_BASE"
-chmod -R 750 "$HOST_BASE"
-lxc config device add "$PARAM_TENANT"-tables pgdata disk source="$HOST_DATA" path=/var/lib/postgresql/14/main
-lxc config device add "$PARAM_TENANT"-tables pgconf disk source="$HOST_CONF" path=/etc/postgresql/14/main
-lxc config device add "$PARAM_TENANT"-tables pglogs disk source="$HOST_LOGS" path=/var/log/postgresql
-mkdir -p /var/lib/postgresql/14/main
-mkdir -p /etc/postgresql/14/main
-mkdir -p /var/log/postgresql
-chown -R postgres:postgres /var/lib/postgresql/14/main
-chown -R postgres:postgres /etc/postgresql/14/main
-chown -R postgres:postgres /var/log/postgresql
-chmod 700 /var/lib/postgresql/14/main
-sudo -u postgres /usr/lib/postgresql/14/bin/initdb -D /var/lib/postgresql/14/main
-cat > /etc/postgresql/14/main/postgresql.conf <<EOF
-data_directory = '/var/lib/postgresql/14/main'
-hba_file = '/etc/postgresql/14/main/pg_hba.conf'
-ident_file = '/etc/postgresql/14/main/pg_ident.conf'
-listen_addresses = '*'
-port = $PARAM_TABLES_PORT
-max_connections = 100
-shared_buffers = 128MB
-log_destination = 'stderr'
-logging_collector = on
-log_directory = '/var/log/postgresql'
-log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log'
-EOF
-cat > /etc/postgresql/14/main/pg_hba.conf <<EOF
-local all postgres peer
-local all all peer
-host all all 127.0.0.1/32 md5
-host all all ::1/128 md5
-host all all 0.0.0.0/0 md5
-systemctl start postgresql@14-main
-systemctl enable postgresql@14-main
-EOF
 lxc config device remove "$PARAM_TENANT"-tables postgres-proxy 2>/dev/null || true
 lxc config device add "$PARAM_TENANT"-tables postgres-proxy proxy \
     listen=tcp:0.0.0.0:"$PARAM_TABLES_PORT" \
     connect=tcp:127.0.0.1:"$PARAM_TABLES_PORT"
-cd /var/lib/postgresql
-until sudo -u postgres psql -p $PARAM_TABLES_PORT -c '\q' 2>/dev/null; do
-    sleep 3
-sudo -u "$PARAM_TABLES_USER" psql -p $PARAM_TABLES_PORT -c \"CREATE USER $PARAM_TENANT WITH PASSWORD '$PARAM_TABLES_PASSWORD';\" 2>/dev/null
-sudo -u "$PARAM_TABLES_USER" psql -p $PARAM_TABLES_PORT -c \"CREATE DATABASE ${PARAM_TENANT}_db OWNER $PARAM_TENANT;\" 2>/dev/null
-sudo -u "$PARAM_TABLES_USER" psql -p $PARAM_TABLES_PORT -c \"GRANT ALL PRIVILEGES ON DATABASE ${PARAM_TENANT}_db TO $PARAM_TENANT;\" 2>/dev/null
+echo "PostgreSQL setup completed successfully!"
+echo "Database: ${PARAM_TENANT}_db"
+echo "User: $PARAM_TENANT"
+echo "Password: $PARAM_TABLES_PASSWORD"
+echo "Port: $PARAM_TABLES_PORT"

View file

@@ -3,15 +3,10 @@ use dotenv::dotenv;
 use reqwest::Client;
 use serde::{Deserialize, Serialize};
 use std::env;
-use std::process::{Command, Stdio};
 use std::sync::{Arc, Mutex};
 use tokio::io::{AsyncBufReadExt, BufReader};
-use tokio::process::Command as TokioCommand;
 use tokio::time::{sleep, Duration};
-// Global process handle
-static mut LLAMA_PROCESS: Option<Arc<Mutex<Option<tokio::process::Child>>>> = None;
 // OpenAI-compatible request/response structures
 #[derive(Debug, Serialize, Deserialize)]
 struct ChatMessage {
@@ -59,157 +54,170 @@ struct LlamaCppResponse {
     generation_settings: Option<serde_json::Value>,
 }
-// Function to check if server is running
-async fn is_server_running(url: &str) -> bool {
-    let client = Client::builder()
-        .timeout(Duration::from_secs(3))
-        .build()
-        .unwrap();
-    match client.get(&format!("{}/health", url)).send().await {
-        Ok(response) => {
-            let is_ok = response.status().is_success();
-            if is_ok {
-                println!("🟢 Server health check: OK");
-            } else {
-                println!(
-                    "🔴 Server health check: Failed with status {}",
-                    response.status()
-                );
-            }
-            is_ok
-        }
-        Err(e) => {
-            println!("🔴 Server health check: Connection failed - {}", e);
-            false
-        }
-    }
-}
-// Function to start llama.cpp server
-async fn start_llama_server() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
-    println!("🚀 Starting llama.cpp server...");
-    // Get environment variables for llama.cpp configuration
-    let llama_path = env::var("LLM_CPP_PATH").unwrap_or_else(|_| "llama-server".to_string());
-    let model_path = env::var("LLM_MODEL_PATH")
-        .unwrap_or_else(|_| "./tinyllama-1.1b-chat-v1.0.Q4_0.gguf".to_string());
-    let cpu_limit = env::var("CPU_LIMIT").unwrap_or_else(|_| "50".to_string());
-    let port = env::var("LLM_PORT").unwrap_or_else(|_| "8080".to_string());
-    println!("🔧 Configuration:");
-    println!(" - Llama path: {}", llama_path);
-    println!(" - Model path: {}", model_path);
-    println!(" - CPU limit: {}%", cpu_limit);
-    println!(" - Port: {}", port);
-    // Kill any existing llama processes
-    println!("🧹 Cleaning up existing processes...");
-    let _ = Command::new("pkill").arg("-f").arg("llama-cli").output();
-    // Wait a bit for cleanup
-    sleep(Duration::from_secs(2)).await;
-    // Build the command
-    let full_command = format!(
-        "{}/llama-server -m {} --mlock --port {} --host 127.0.0.1",
-        llama_path, model_path, port
-    );
-    println!("📝 Executing command: {}", full_command);
-    // Start llama.cpp server with cpulimit using tokio
-    let mut cmd = TokioCommand::new("sh");
-    cmd.arg("-c");
-    cmd.arg(&full_command);
-    cmd.stdout(Stdio::piped());
-    cmd.stderr(Stdio::piped());
-    cmd.kill_on_drop(true);
-    let mut child = cmd
-        .spawn()
-        .map_err(|e| format!("Failed to start llama.cpp server: {}", e))?;
-    println!("🔄 Process spawned with PID: {:?}", child.id());
-    // Capture stdout and stderr for real-time logging
-    if let Some(stdout) = child.stdout.take() {
-        let stdout_reader = BufReader::new(stdout);
-        tokio::spawn(async move {
-            let mut lines = stdout_reader.lines();
-            while let Ok(Some(line)) = lines.next_line().await {
-                println!("🦙📤 STDOUT: {}", line);
-            }
-            println!("🦙📤 STDOUT stream ended");
-        });
-    }
-    if let Some(stderr) = child.stderr.take() {
-        let stderr_reader = BufReader::new(stderr);
-        tokio::spawn(async move {
-            let mut lines = stderr_reader.lines();
-            while let Ok(Some(line)) = lines.next_line().await {
-                println!("🦙📥 STDERR: {}", line);
-            }
-            println!("🦙📥 STDERR stream ended");
-        });
-    }
-    // Store the process handle
-    unsafe {
-        LLAMA_PROCESS = Some(Arc::new(Mutex::new(Some(child))));
-    }
-    println!("✅ Llama.cpp server process started!");
-    Ok(())
-}
-// Function to ensure llama.cpp server is running
-pub async fn ensure_llama_server_running() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
-    let llama_url = env::var("LLM_URL").unwrap_or_else(|_| "http://localhost:8080".to_string());
-    // Check if server is already running
-    if is_server_running(&llama_url).await {
-        println!("✅ Llama.cpp server is already running");
+pub async fn ensure_llama_servers_running() -> Result<(), Box<dyn std::error::Error + Send + Sync>>
+{
+    let llm_local = env::var("LLM_LOCAL").unwrap_or_else(|_| "false".to_string());
+    if llm_local.to_lowercase() != "true" {
+        println!(" LLM_LOCAL is not enabled, skipping local server startup");
         return Ok(());
     }
-    // Start the server
-    start_llama_server().await?;
-    // Wait for server to be ready with verbose logging
-    println!("⏳ Waiting for llama.cpp server to become ready...");
+    // Get configuration from environment variables
+    let llm_url = env::var("LLM_URL").unwrap_or_else(|_| "http://localhost:8081".to_string());
+    let embedding_url =
+        env::var("EMBEDDING_URL").unwrap_or_else(|_| "http://localhost:8082".to_string());
+    let llama_cpp_path = env::var("LLM_CPP_PATH").unwrap_or_else(|_| "~/llama.cpp".to_string());
+    let llm_model_path = env::var("LLM_MODEL_PATH").unwrap_or_else(|_| "".to_string());
+    let embedding_model_path = env::var("EMBEDDING_MODEL_PATH").unwrap_or_else(|_| "".to_string());
+    println!("🚀 Starting local llama.cpp servers...");
+    println!("📋 Configuration:");
+    println!(" LLM URL: {}", llm_url);
+    println!(" Embedding URL: {}", embedding_url);
+    println!(" LLM Model: {}", llm_model_path);
+    println!(" Embedding Model: {}", embedding_model_path);
+    // Check if servers are already running
+    let llm_running = is_server_running(&llm_url).await;
+    let embedding_running = is_server_running(&embedding_url).await;
+    if llm_running && embedding_running {
+        println!("✅ Both LLM and Embedding servers are already running");
+        return Ok(());
+    }
+    // Start servers that aren't running
+    let mut tasks = vec![];
+    if !llm_running && !llm_model_path.is_empty() {
+        println!("🔄 Starting LLM server...");
+        tasks.push(tokio::spawn(start_llm_server(
+            llama_cpp_path.clone(),
+            llm_model_path.clone(),
+            llm_url.clone(),
+        )));
+    } else if llm_model_path.is_empty() {
+        println!("⚠️ LLM_MODEL_PATH not set, skipping LLM server");
+    }
+    if !embedding_running && !embedding_model_path.is_empty() {
+        println!("🔄 Starting Embedding server...");
+        tasks.push(tokio::spawn(start_embedding_server(
+            llama_cpp_path.clone(),
+            embedding_model_path.clone(),
+            embedding_url.clone(),
+        )));
+    } else if embedding_model_path.is_empty() {
+        println!("⚠️ EMBEDDING_MODEL_PATH not set, skipping Embedding server");
+    }
+    // Wait for all server startup tasks
+    for task in tasks {
+        task.await??;
+    }
+    // Wait for servers to be ready with verbose logging
+    println!("⏳ Waiting for servers to become ready...");
+    let mut llm_ready = llm_running || llm_model_path.is_empty();
+    let mut embedding_ready = embedding_running || embedding_model_path.is_empty();
     let mut attempts = 0;
     let max_attempts = 60; // 2 minutes total
-    while attempts < max_attempts {
+    while attempts < max_attempts && (!llm_ready || !embedding_ready) {
         sleep(Duration::from_secs(2)).await;
-        print!(
-            "🔍 Checking server health (attempt {}/{})... ",
+        println!(
+            "🔍 Checking server health (attempt {}/{})...",
             attempts + 1,
             max_attempts
         );
-        if is_server_running(&llama_url).await {
-            println!("✅ SUCCESS!");
-            println!("🎉 Llama.cpp server is ready and responding!");
-            return Ok(());
-        } else {
-            println!("❌ Not ready yet");
-        }
+        if !llm_ready && !llm_model_path.is_empty() {
+            if is_server_running(&llm_url).await {
+                println!(" ✅ LLM server ready at {}", llm_url);
+                llm_ready = true;
+            } else {
+                println!(" ❌ LLM server not ready yet");
+            }
+        }
+        if !embedding_ready && !embedding_model_path.is_empty() {
+            if is_server_running(&embedding_url).await {
+                println!(" ✅ Embedding server ready at {}", embedding_url);
+                embedding_ready = true;
+            } else {
+                println!(" ❌ Embedding server not ready yet");
+            }
+        }
         attempts += 1;
         if attempts % 10 == 0 {
             println!(
-                "⏰ Still waiting for llama.cpp server... (attempt {}/{})",
+                "⏰ Still waiting for servers... (attempt {}/{})",
                 attempts, max_attempts
             );
-            println!("💡 Check the logs above for any errors from the llama server");
         }
     }
Err("❌ Llama.cpp server failed to start within timeout (2 minutes)".into()) if llm_ready && embedding_ready {
println!("🎉 All llama.cpp servers are ready and responding!");
Ok(())
} else {
let mut error_msg = "❌ Servers failed to start within timeout:".to_string();
if !llm_ready && !llm_model_path.is_empty() {
error_msg.push_str(&format!("\n - LLM server at {}", llm_url));
}
if !embedding_ready && !embedding_model_path.is_empty() {
error_msg.push_str(&format!("\n - Embedding server at {}", embedding_url));
}
Err(error_msg.into())
}
}
async fn start_llm_server(
llama_cpp_path: String,
model_path: String,
url: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let port = url.split(':').last().unwrap_or("8081");
let mut cmd = tokio::process::Command::new("sh");
cmd.arg("-c").arg(format!(
"cd {} && ./llama-server -m {} --host 0.0.0.0 --port {} --n-gpu-layers 99 &",
llama_cpp_path, model_path, port
));
cmd.spawn()?;
Ok(())
}
async fn start_embedding_server(
llama_cpp_path: String,
model_path: String,
url: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let port = url.split(':').last().unwrap_or("8082");
let mut cmd = tokio::process::Command::new("sh");
cmd.arg("-c").arg(format!(
"cd {} && ./llama-server -m {} --host 0.0.0.0 --port {} --embedding --n-gpu-layers 99 &",
llama_cpp_path, model_path, port
));
cmd.spawn()?;
Ok(())
}
async fn is_server_running(url: &str) -> bool {
let client = reqwest::Client::new();
match client.get(&format!("{}/health", url)).send().await {
Ok(response) => response.status().is_success(),
Err(_) => false,
}
} }
// Convert OpenAI chat messages to a single prompt // Convert OpenAI chat messages to a single prompt
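The startup path above is driven entirely by environment variables read via dotenv. An illustrative .env for a container prepared by the install script (paths and model names are examples; the in-code defaults are http://localhost:8081, http://localhost:8082 and ~/llama.cpp):

    LLM_LOCAL=true
    LLM_CPP_PATH=~/llama.cpp
    LLM_MODEL_PATH=~/llama.cpp/DeepSeek-R1-Distill-Qwen-1.5B-Q3_K_M.gguf
    EMBEDDING_MODEL_PATH=~/llama.cpp/bge-small-en-v1.5-f32.gguf
    LLM_URL=http://localhost:8081
    EMBEDDING_URL=http://localhost:8082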
@@ -238,26 +246,15 @@ fn messages_to_prompt(messages: &[ChatMessage]) -> String {
 }
 // Proxy endpoint
-#[post("/v1/chat/completions1")]
+#[post("/v1/chat/completions")]
 pub async fn chat_completions_local(
     req_body: web::Json<ChatCompletionRequest>,
     _req: HttpRequest,
 ) -> Result<HttpResponse> {
     dotenv().ok().unwrap();
-    // Ensure llama.cpp server is running
-    if let Err(e) = ensure_llama_server_running().await {
-        eprintln!("Failed to start llama.cpp server: {}", e);
-        return Ok(HttpResponse::InternalServerError().json(serde_json::json!({
-            "error": {
-                "message": format!("Failed to start llama.cpp server: {}", e),
-                "type": "server_error"
-            }
-        })));
-    }
     // Get llama.cpp server URL
-    let llama_url = env::var("LLM_URL").unwrap_or_else(|_| "http://localhost:8080".to_string());
+    let llama_url = env::var("LLM_URL").unwrap_or_else(|_| "http://localhost:8081".to_string());
     // Convert OpenAI format to llama.cpp format
     let prompt = messages_to_prompt(&req_body.messages);
@@ -342,10 +339,224 @@ pub async fn chat_completions_local(
     }
 }
// OpenAI Embedding Request - Modified to handle both string and array inputs
#[derive(Debug, Deserialize)]
pub struct EmbeddingRequest {
#[serde(deserialize_with = "deserialize_input")]
pub input: Vec<String>,
pub model: String,
#[serde(default)]
pub encoding_format: Option<String>,
}
// Custom deserializer to handle both string and array inputs
fn deserialize_input<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::{self, Visitor};
use std::fmt;
struct InputVisitor;
impl<'de> Visitor<'de> for InputVisitor {
type Value = Vec<String>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a string or an array of strings")
}
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
Ok(vec![value.to_string()])
}
fn visit_string<E>(self, value: String) -> Result<Self::Value, E>
where
E: de::Error,
{
Ok(vec![value])
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: de::SeqAccess<'de>,
{
let mut vec = Vec::new();
while let Some(value) = seq.next_element::<String>()? {
vec.push(value);
}
Ok(vec)
}
}
deserializer.deserialize_any(InputVisitor)
}
// OpenAI Embedding Response
#[derive(Debug, Serialize)]
pub struct EmbeddingResponse {
pub object: String,
pub data: Vec<EmbeddingData>,
pub model: String,
pub usage: Usage,
}
#[derive(Debug, Serialize)]
pub struct EmbeddingData {
pub object: String,
pub embedding: Vec<f32>,
pub index: usize,
}
#[derive(Debug, Serialize)]
pub struct Usage {
pub prompt_tokens: u32,
pub total_tokens: u32,
}
// Llama.cpp Embedding Request
#[derive(Debug, Serialize)]
struct LlamaCppEmbeddingRequest {
pub content: String,
}
// Handle llama.cpp's nested array response format
#[derive(Debug, Deserialize)]
struct LlamaCppEmbeddingResponseItem {
pub index: usize,
pub embedding: Vec<Vec<f32>>, // llama.cpp returns the embedding as an array of arrays
}
// Proxy endpoint for embeddings
#[post("/v1/embeddings")]
pub async fn embeddings_local(
req_body: web::Json<EmbeddingRequest>,
_req: HttpRequest,
) -> Result<HttpResponse> {
dotenv().ok();
// Get llama.cpp server URL
let llama_url =
env::var("EMBEDDING_URL").unwrap_or_else(|_| "http://localhost:8082".to_string());
let client = Client::builder()
.timeout(Duration::from_secs(120))
.build()
.map_err(|e| {
eprintln!("Error creating HTTP client: {}", e);
actix_web::error::ErrorInternalServerError("Failed to create HTTP client")
})?;
// Process each input text and get embeddings
let mut embeddings_data = Vec::new();
let mut total_tokens = 0;
for (index, input_text) in req_body.input.iter().enumerate() {
let llama_request = LlamaCppEmbeddingRequest {
content: input_text.clone(),
};
let response = client
.post(&format!("{}/embedding", llama_url))
.header("Content-Type", "application/json")
.json(&llama_request)
.send()
.await
.map_err(|e| {
eprintln!("Error calling llama.cpp server for embedding: {}", e);
actix_web::error::ErrorInternalServerError(
"Failed to call llama.cpp server for embedding",
)
})?;
let status = response.status();
if status.is_success() {
// First, get the raw response text for debugging
let raw_response = response.text().await.map_err(|e| {
eprintln!("Error reading response text: {}", e);
actix_web::error::ErrorInternalServerError("Failed to read response")
})?;
// Parse the response as a vector of items with nested arrays
let llama_response: Vec<LlamaCppEmbeddingResponseItem> =
serde_json::from_str(&raw_response).map_err(|e| {
eprintln!("Error parsing llama.cpp embedding response: {}", e);
eprintln!("Raw response: {}", raw_response);
actix_web::error::ErrorInternalServerError(
"Failed to parse llama.cpp embedding response",
)
})?;
// Extract the embedding from the nested array response
if let Some(item) = llama_response.get(0) {
// The embedding field contains Vec<Vec<f32>>, so we need to flatten it
// If it's [[0.1, 0.2, 0.3]], we want [0.1, 0.2, 0.3]
let flattened_embedding = if !item.embedding.is_empty() {
item.embedding[0].clone() // Take the first (and probably only) inner array
} else {
vec![] // Empty if no embedding data
};
// Estimate token count
let estimated_tokens = (input_text.len() as f32 / 4.0).ceil() as u32;
total_tokens += estimated_tokens;
embeddings_data.push(EmbeddingData {
object: "embedding".to_string(),
embedding: flattened_embedding,
index,
});
} else {
eprintln!("No embedding data returned for input: {}", input_text);
return Ok(HttpResponse::InternalServerError().json(serde_json::json!({
"error": {
"message": format!("No embedding data returned for input {}", index),
"type": "server_error"
}
})));
}
} else {
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
eprintln!("Llama.cpp server error ({}): {}", status, error_text);
let actix_status = actix_web::http::StatusCode::from_u16(status.as_u16())
.unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR);
return Ok(HttpResponse::build(actix_status).json(serde_json::json!({
"error": {
"message": format!("Failed to get embedding for input {}: {}", index, error_text),
"type": "server_error"
}
})));
}
}
// Build OpenAI-compatible response
let openai_response = EmbeddingResponse {
object: "list".to_string(),
data: embeddings_data,
model: req_body.model.clone(),
usage: Usage {
prompt_tokens: total_tokens,
total_tokens,
},
};
Ok(HttpResponse::Ok().json(openai_response))
}
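Because the custom deserializer accepts either a bare string or an array of strings for input, both calls below should yield the same OpenAI-style response shape; host, port and model name are placeholders (the model value is only echoed back in the response):

    curl -s http://localhost:8080/v1/embeddings -H 'Content-Type: application/json' \
        -d '{"model": "bge-small-en-v1.5", "input": "general bots"}'
    curl -s http://localhost:8080/v1/embeddings -H 'Content-Type: application/json' \
        -d '{"model": "bge-small-en-v1.5", "input": ["general bots", "local embeddings"]}'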
 // Health check endpoint
 #[actix_web::get("/health")]
 pub async fn health() -> Result<HttpResponse> {
-    let llama_url = env::var("LLM_URL").unwrap_or_else(|_| "http://localhost:8080".to_string());
+    let llama_url = env::var("LLM_URL").unwrap_or_else(|_| "http://localhost:8081".to_string());
     if is_server_running(&llama_url).await {
         Ok(HttpResponse::Ok().json(serde_json::json!({