From 9bb8b64be7f27f3d2552032be86acb24649042bb Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Wed, 12 Nov 2025 08:19:21 -0300 Subject: [PATCH] feat(llm): pass model configuration to LLM generation and streaming Include model parameter in LLM provider calls across automation, bot, and keyword modules to ensure correct model selection based on configuration. This improves flexibility and consistency in LLM usage. --- src/automation/compact_prompt.rs | 6 +- src/automation/mod.rs | 1 - src/basic/keywords/llm_keyword.rs | 2 +- src/bot/mod.rs | 28 ++- src/llm/local.rs | 173 ++++++++++++------ src/llm/mod.rs | 17 +- .../default.gbai/default.gbot/config.csv | 18 +- 7 files changed, 157 insertions(+), 88 deletions(-) diff --git a/src/automation/compact_prompt.rs b/src/automation/compact_prompt.rs index 7b11704f..88bbbc00 100644 --- a/src/automation/compact_prompt.rs +++ b/src/automation/compact_prompt.rs @@ -123,7 +123,11 @@ async fn compact_prompt_for_bots( let llm_provider = state.llm_provider.clone(); trace!("Starting summarization for session {}", session.id); let mut filtered = String::new(); - let summarized = match llm_provider.generate("", &serde_json::Value::Array(messages)).await { + let config_manager = crate::config::ConfigManager::new(state.conn.clone()); + let model = config_manager.get_config(&Uuid::nil(), "llm-model", None).unwrap_or_default(); + + let summarized = match llm_provider.generate( + "", &serde_json::Value::Array(messages), &model).await { Ok(summary) => { trace!( "Successfully summarized session {} ({} chars)", diff --git a/src/automation/mod.rs b/src/automation/mod.rs index 91d43d29..ebedd795 100644 --- a/src/automation/mod.rs +++ b/src/automation/mod.rs @@ -1,5 +1,4 @@ use crate::basic::ScriptService; -use crate::config::ConfigManager; use crate::shared::models::{Automation, TriggerKind}; use crate::shared::state::AppState; use chrono::Utc; diff --git a/src/basic/keywords/llm_keyword.rs b/src/basic/keywords/llm_keyword.rs index 5da89f1c..8d71a33e 100644 --- a/src/basic/keywords/llm_keyword.rs +++ b/src/basic/keywords/llm_keyword.rs @@ -45,7 +45,7 @@ pub async fn execute_llm_generation(state: Arc, prompt: String) -> Res let config_manager = crate::config::ConfigManager::new(state.conn.clone()); let model = config_manager.get_config(&Uuid::nil(), "llm-model", None).unwrap_or_default(); let handler = crate::llm_models::get_handler(&model); - let raw_response = state.llm_provider.generate(&prompt, &serde_json::Value::Null).await?; + let raw_response = state.llm_provider.generate(&prompt, &serde_json::Value::Null, &model).await?; let processed = handler.process_content(&raw_response); Ok(processed) } diff --git a/src/bot/mod.rs b/src/bot/mod.rs index c85649ac..a1cc7019 100644 --- a/src/bot/mod.rs +++ b/src/bot/mod.rs @@ -1,10 +1,10 @@ mod ui; +use crate::bot::ui::BotUI; use crate::config::ConfigManager; use crate::drive_monitor::DriveMonitor; use crate::llm::OpenAIClient; use crate::llm_models; use crate::nvidia::get_system_metrics; -use crate::bot::ui::BotUI; use crate::shared::models::{BotResponse, Suggestion, UserMessage, UserSession}; use crate::shared::state::AppState; use actix_web::{web, HttpRequest, HttpResponse, Result}; @@ -397,11 +397,17 @@ impl BotOrchestrator { }; response_tx.send(thinking_response).await?; } + let config_manager = ConfigManager::new(self.state.conn.clone()); + let model = config_manager + .get_config( + &Uuid::parse_str(&message.bot_id).unwrap_or_default(), + "llm-model", + None, + ) + .unwrap_or_default(); + let model1 = model.clone(); tokio::spawn(async move { - if let Err(e) = llm - .generate_stream("", &messages, stream_tx) - .await - { + if let Err(e) = llm.generate_stream("", &messages, stream_tx, &model).await { error!("LLM streaming error: {}", e); } }); @@ -413,7 +419,6 @@ impl BotOrchestrator { let mut last_progress_update = Instant::now(); let progress_interval = Duration::from_secs(1); let initial_tokens = crate::shared::utils::estimate_token_count(&message.content); - let config_manager = ConfigManager::new(self.state.conn.clone()); let max_context_size = config_manager .get_config( &Uuid::parse_str(&message.bot_id).unwrap_or_default(), @@ -423,14 +428,7 @@ impl BotOrchestrator { .unwrap_or_default() .parse::() .unwrap_or(0); - let model = config_manager - .get_config( - &Uuid::parse_str(&message.bot_id).unwrap_or_default(), - "llm-model", - None, - ) - .unwrap_or_default(); - let handler = llm_models::get_handler(&model); + let handler = llm_models::get_handler(&model1); while let Some(chunk) = stream_rx.recv().await { chunk_count += 1; if !first_word_received && !chunk.trim().is_empty() { @@ -503,7 +501,7 @@ impl BotOrchestrator { "Total tokens (context + prompt + response): {}", total_tokens ); - let config_manager = ConfigManager::new( self.state.conn.clone()); + let config_manager = ConfigManager::new(self.state.conn.clone()); { let mut sm = self.state.session_manager.lock().await; sm.save_message(session.id, user_id, 2, &full_response, 1)?; diff --git a/src/llm/local.rs b/src/llm/local.rs index 9486c61e..dfe70f4c 100644 --- a/src/llm/local.rs +++ b/src/llm/local.rs @@ -1,49 +1,65 @@ -use crate::shared::state::AppState; -use crate::shared::models::schema::bots::dsl::*; use crate::config::ConfigManager; -use diesel::prelude::*; -use std::sync::Arc; -use log::{info, error}; -use tokio; -use reqwest; +use crate::shared::models::schema::bots::dsl::*; +use crate::shared::state::AppState; use actix_web::{post, web, HttpResponse, Result}; +use diesel::prelude::*; +use log::{error, info}; +use reqwest; +use std::sync::Arc; +use tokio; #[post("/api/chat/completions")] pub async fn chat_completions_local( _data: web::Data, _payload: web::Json, ) -> Result { - Ok(HttpResponse::Ok().json(serde_json::json!({ "status": "chat_completions_local not implemented" }))) + Ok(HttpResponse::Ok() + .json(serde_json::json!({ "status": "chat_completions_local not implemented" }))) } #[post("/api/embeddings")] pub async fn embeddings_local( _data: web::Data, _payload: web::Json, ) -> Result { - Ok(HttpResponse::Ok().json(serde_json::json!({ "status": "embeddings_local not implemented" }))) + Ok( + HttpResponse::Ok() + .json(serde_json::json!({ "status": "embeddings_local not implemented" })), + ) } pub async fn ensure_llama_servers_running( - app_state: Arc + app_state: Arc, ) -> Result<(), Box> { -let config_values = { - let conn_arc = app_state.conn.clone(); - let default_bot_id = tokio::task::spawn_blocking(move || { - let mut conn = conn_arc.get().unwrap(); - bots.filter(name.eq("default")) - .select(id) - .first::(&mut *conn) - .unwrap_or_else(|_| uuid::Uuid::nil()) - }).await?; - let config_manager = ConfigManager::new(app_state.conn.clone()); - ( - default_bot_id, - config_manager.get_config(&default_bot_id, "llm-url", None).unwrap_or_default(), - config_manager.get_config(&default_bot_id, "llm-model", None).unwrap_or_default(), - config_manager.get_config(&default_bot_id, "embedding-url", None).unwrap_or_default(), - config_manager.get_config(&default_bot_id, "embedding-model", None).unwrap_or_default(), - config_manager.get_config(&default_bot_id, "llm-server-path", None).unwrap_or_default(), - ) -}; -let (_default_bot_id, llm_url, llm_model, embedding_url, embedding_model, llm_server_path) = config_values; + let config_values = { + let conn_arc = app_state.conn.clone(); + let default_bot_id = tokio::task::spawn_blocking(move || { + let mut conn = conn_arc.get().unwrap(); + bots.filter(name.eq("default")) + .select(id) + .first::(&mut *conn) + .unwrap_or_else(|_| uuid::Uuid::nil()) + }) + .await?; + let config_manager = ConfigManager::new(app_state.conn.clone()); + ( + default_bot_id, + config_manager + .get_config(&default_bot_id, "llm-url", None) + .unwrap_or_default(), + config_manager + .get_config(&default_bot_id, "llm-model", None) + .unwrap_or_default(), + config_manager + .get_config(&default_bot_id, "embedding-url", None) + .unwrap_or_default(), + config_manager + .get_config(&default_bot_id, "embedding-model", None) + .unwrap_or_default(), + config_manager + .get_config(&default_bot_id, "llm-server-path", None) + .unwrap_or_default(), + ) + }; + let (_default_bot_id, llm_url, llm_model, embedding_url, embedding_model, llm_server_path) = + config_values; info!("Starting LLM servers..."); info!("Configuration:"); info!(" LLM URL: {}", llm_url); @@ -52,6 +68,12 @@ let (_default_bot_id, llm_url, llm_model, embedding_url, embedding_model, llm_se info!(" Embedding Model: {}", embedding_model); info!(" LLM Server Path: {}", llm_server_path); info!("Restarting any existing llama-server processes..."); + + + + + + if let Err(e) = tokio::process::Command::new("sh") .arg("-c") .arg("pkill llama-server -9 || true") @@ -62,8 +84,21 @@ let (_default_bot_id, llm_url, llm_model, embedding_url, embedding_model, llm_se tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; info!("Existing llama-server processes terminated (if any)"); } - let llm_running = is_server_running(&llm_url).await; - let embedding_running = is_server_running(&embedding_url).await; + + // Skip local server startup if using HTTPS endpoints + let llm_running = if llm_url.starts_with("https://") { + info!("Using external HTTPS LLM server, skipping local startup"); + true + } else { + is_server_running(&llm_url).await + }; + + let embedding_running = if embedding_url.starts_with("https://") { + info!("Using external HTTPS embedding server, skipping local startup"); + true + } else { + is_server_running(&embedding_url).await + }; if llm_running && embedding_running { info!("Both LLM and Embedding servers are already running"); return Ok(()); @@ -97,10 +132,14 @@ let (_default_bot_id, llm_url, llm_model, embedding_url, embedding_model, llm_se let mut llm_ready = llm_running || llm_model.is_empty(); let mut embedding_ready = embedding_running || embedding_model.is_empty(); let mut attempts = 0; - let max_attempts = 60; + let max_attempts = 60; while attempts < max_attempts && (!llm_ready || !embedding_ready) { tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; - info!("Checking server health (attempt {}/{})...", attempts + 1, max_attempts); + info!( + "Checking server health (attempt {}/{})...", + attempts + 1, + max_attempts + ); if !llm_ready && !llm_model.is_empty() { if is_server_running(&llm_url).await { info!("LLM server ready at {}", llm_url); @@ -119,11 +158,20 @@ let (_default_bot_id, llm_url, llm_model, embedding_url, embedding_model, llm_se } attempts += 1; if attempts % 10 == 0 { - info!("Still waiting for servers... (attempt {}/{})", attempts, max_attempts); + info!( + "Still waiting for servers... (attempt {}/{})", + attempts, max_attempts + ); } } if llm_ready && embedding_ready { info!("All llama.cpp servers are ready and responding!"); + + // Update LLM provider with new endpoints + let llm_provider1 = Arc::new(crate::llm::OpenAIClient::new( + llm_model.clone(), + Some(llm_url.clone()), + )); Ok(()) } else { let mut error_msg = "Servers failed to start within timeout:".to_string(); @@ -155,19 +203,36 @@ pub async fn start_llm_server( std::env::set_var("OMP_PROC_BIND", "close"); let conn = app_state.conn.clone(); let config_manager = ConfigManager::new(conn.clone()); - let mut conn = conn.get().unwrap(); - let default_bot_id = bots.filter(name.eq("default")) - .select(id) - .first::(&mut *conn) - .unwrap_or_else(|_| uuid::Uuid::nil()); - let n_moe = config_manager.get_config(&default_bot_id, "llm-server-n-moe", None).unwrap_or("4".to_string()); - let parallel = config_manager.get_config(&default_bot_id, "llm-server-parallel", None).unwrap_or("1".to_string()); - let cont_batching = config_manager.get_config(&default_bot_id, "llm-server-cont-batching", None).unwrap_or("true".to_string()); - let mlock = config_manager.get_config(&default_bot_id, "llm-server-mlock", None).unwrap_or("true".to_string()); - let no_mmap = config_manager.get_config(&default_bot_id, "llm-server-no-mmap", None).unwrap_or("true".to_string()); - let gpu_layers = config_manager.get_config(&default_bot_id, "llm-server-gpu-layers", None).unwrap_or("20".to_string()); - let reasoning_format = config_manager.get_config(&default_bot_id, "llm-server-reasoning-format", None).unwrap_or("".to_string()); - let n_predict = config_manager.get_config(&default_bot_id, "llm-server-n-predict", None).unwrap_or("50".to_string()); + let mut conn = conn.get().unwrap(); + let default_bot_id = bots + .filter(name.eq("default")) + .select(id) + .first::(&mut *conn) + .unwrap_or_else(|_| uuid::Uuid::nil()); + let n_moe = config_manager + .get_config(&default_bot_id, "llm-server-n-moe", None) + .unwrap_or("4".to_string()); + let parallel = config_manager + .get_config(&default_bot_id, "llm-server-parallel", None) + .unwrap_or("1".to_string()); + let cont_batching = config_manager + .get_config(&default_bot_id, "llm-server-cont-batching", None) + .unwrap_or("true".to_string()); + let mlock = config_manager + .get_config(&default_bot_id, "llm-server-mlock", None) + .unwrap_or("true".to_string()); + let no_mmap = config_manager + .get_config(&default_bot_id, "llm-server-no-mmap", None) + .unwrap_or("true".to_string()); + let gpu_layers = config_manager + .get_config(&default_bot_id, "llm-server-gpu-layers", None) + .unwrap_or("20".to_string()); + let reasoning_format = config_manager + .get_config(&default_bot_id, "llm-server-reasoning-format", None) + .unwrap_or("".to_string()); + let n_predict = config_manager + .get_config(&default_bot_id, "llm-server-n-predict", None) + .unwrap_or("50".to_string()); let mut args = format!( "-m {} --host 0.0.0.0 --port {} --reasoning-format deepseek --top_p 0.95 --temp 0.6 --repeat-penalty 1.2 --n-gpu-layers {}", model_path, port, gpu_layers @@ -199,15 +264,21 @@ pub async fn start_llm_server( "cd {} && .\\llama-server.exe {} --verbose>llm-stdout.log", llama_cpp_path, args )); - info!("Executing LLM server command: cd {} && .\\llama-server.exe {} --verbose", llama_cpp_path, args); + info!( + "Executing LLM server command: cd {} && .\\llama-server.exe {} --verbose", + llama_cpp_path, args + ); cmd.spawn()?; } else { let mut cmd = tokio::process::Command::new("sh"); cmd.arg("-c").arg(format!( "cd {} && ./llama-server {} --verbose >../../../../logs/llm/stdout.log 2>&1 &", - llama_cpp_path, args + llama_cpp_path, args )); - info!("Executing LLM server command: cd {} && ./llama-server {} --verbose", llama_cpp_path, args); + info!( + "Executing LLM server command: cd {} && ./llama-server {} --verbose", + llama_cpp_path, args + ); cmd.spawn()?; } Ok(()) diff --git a/src/llm/mod.rs b/src/llm/mod.rs index 734d2b8c..f8d0cb27 100644 --- a/src/llm/mod.rs +++ b/src/llm/mod.rs @@ -10,21 +10,15 @@ pub trait LLMProvider: Send + Sync { &self, prompt: &str, config: &Value, + model: &str, ) -> Result>; async fn generate_stream( &self, prompt: &str, config: &Value, tx: mpsc::Sender, + model: &str ) -> Result<(), Box>; - async fn summarize( - &self, - text: &str, - ) -> Result> { - let prompt = format!("Summarize the following conversation while preserving key details:\n\n{}", text); - self.generate(&prompt, &serde_json::json!({"max_tokens": 500})) - .await - } async fn cancel_job( &self, session_id: &str, @@ -34,6 +28,7 @@ pub struct OpenAIClient { client: reqwest::Client, api_key: String, base_url: String, + } #[async_trait] impl LLMProvider for OpenAIClient { @@ -41,6 +36,7 @@ impl LLMProvider for OpenAIClient { &self, prompt: &str, messages: &Value, + model: &str, ) -> Result> { let default_messages = serde_json::json!([{"role": "user", "content": prompt}]); let response = self @@ -48,7 +44,7 @@ impl LLMProvider for OpenAIClient { .post(&format!("{}/v1/chat/completions", self.base_url)) .header("Authorization", format!("Bearer {}", self.api_key)) .json(&serde_json::json!({ - "model": "gpt-3.5-turbo", + "model": model, "messages": if messages.is_array() && !messages.as_array().unwrap().is_empty() { messages } else { @@ -74,6 +70,7 @@ impl LLMProvider for OpenAIClient { prompt: &str, messages: &Value, tx: mpsc::Sender, + model: &str ) -> Result<(), Box> { let default_messages = serde_json::json!([{"role": "user", "content": prompt}]); let response = self @@ -81,7 +78,7 @@ impl LLMProvider for OpenAIClient { .post(&format!("{}/v1/chat/completions", self.base_url)) .header("Authorization", format!("Bearer {}", self.api_key)) .json(&serde_json::json!({ - "model": "gpt-3.5-turbo", + "model": model.clone(), "messages": if messages.is_array() && !messages.as_array().unwrap().is_empty() { info!("Using provided messages: {:?}", messages); messages diff --git a/templates/default.gbai/default.gbot/config.csv b/templates/default.gbai/default.gbot/config.csv index 5982c283..797f0ac6 100644 --- a/templates/default.gbai/default.gbot/config.csv +++ b/templates/default.gbai/default.gbot/config.csv @@ -1,20 +1,20 @@ name,value - +, server_host,0.0.0.0 server_port,8080 sites_root,/tmp - +, llm-key,none llm-url,http://localhost:8081 llm-model,../../../../data/llm/DeepSeek-R1-Distill-Qwen-1.5B-Q3_K_M.gguf - +, prompt-compact,4 - +, mcp-server,false - +, embedding-url,http://localhost:8082 embedding-model,../../../../data/llm/bge-small-en-v1.5-f32.gguf - +, llm-server,false llm-server-path,botserver-stack/bin/llm/build/bin llm-server-host,0.0.0.0 @@ -27,14 +27,14 @@ llm-server-parallel,6 llm-server-cont-batching,true llm-server-mlock,false llm-server-no-mmap,false - - + , +, email-from,from@domain.com email-server,mail.domain.com email-port,587 email-user,user@domain.com email-pass, - +, custom-server,localhost custom-port,5432 custom-database,mycustomdb