feat(automation): add LLM server readiness check and improve user session handling

- Added LLM server readiness check in AutomationService before starting tasks
- Renamed `user` parameter to `user_session` in execute_talk for clarity
- Updated BotResponse fields to use user_session data instead of hardcoded values
- Improved Redis key generation in execute_talk to use user_session fields
- Removed commented Redis code in set_current_context_keyword

The changes ensure proper initialization of automation tasks by checking LLM server availability first, and improve code clarity by using more descriptive variable names for user session data.
Rodrigo Rodriguez (Pragmatismo) 2025-11-02 23:54:14 -03:00
parent 1d7d0e10c0
commit a5bfda4d09
8 changed files with 166 additions and 126 deletions

View file

@@ -1,3 +1,4 @@
+use crate::config::ConfigManager;
 use crate::shared::models::schema::bots::dsl::*;
 use diesel::prelude::*;
 use crate::basic::ScriptService;
@@ -33,6 +34,38 @@ impl AutomationService {
         tokio::task::spawn_local({
             let service = service.clone();
             async move {
+                // Check if llama servers are ready before starting
+                let config_manager = ConfigManager::new(Arc::clone(&service.state.conn));
+                let default_bot_id = {
+                    let mut conn = service.state.conn.lock().unwrap();
+                    bots.filter(name.eq("default"))
+                        .select(id)
+                        .first::<uuid::Uuid>(&mut *conn)
+                        .unwrap_or_else(|_| uuid::Uuid::nil())
+                };
+                let llm_url = match config_manager.get_config(&default_bot_id, "llm-url", None) {
+                    Ok(url) => url,
+                    Err(e) => {
+                        error!("Failed to get llm-url config: {}", e);
+                        return;
+                    }
+                };
+                let embedding_url = match config_manager.get_config(&default_bot_id, "embedding-url", None) {
+                    Ok(url) => url,
+                    Err(e) => {
+                        error!("Failed to get embedding-url config: {}", e);
+                        return;
+                    }
+                };
+                if !crate::llm::local::is_server_running(&llm_url).await ||
+                   !crate::llm::local::is_server_running(&embedding_url).await {
+                    trace!("LLM servers not ready - llm: {}, embedding: {}", llm_url, embedding_url);
+                    return;
+                }
                 let mut interval = tokio::time::interval(Duration::from_secs(5));
                 let mut last_check = Utc::now();
                 loop {
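
The readiness probe crate::llm::local::is_server_running is called above but its body is not part of this diff. A minimal sketch of such a probe, assuming a llama.cpp-style GET /health endpoint and the reqwest crate (both are assumptions, not shown in this commit):

    use std::time::Duration;

    /// Hypothetical readiness probe: true only when the server answers
    /// its health endpoint with a 2xx status within the timeout.
    pub async fn is_server_running(base_url: &str) -> bool {
        let client = match reqwest::Client::builder()
            .timeout(Duration::from_secs(2))
            .build()
        {
            Ok(c) => c,
            Err(_) => return false,
        };
        // llama.cpp's server exposes GET /health; other backends may differ.
        let url = format!("{}/health", base_url.trim_end_matches('/'));
        match client.get(&url).send().await {
            Ok(resp) => resp.status().is_success(),
            Err(_) => false,
        }
    }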

View file

@@ -59,7 +59,7 @@ pub fn hear_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine
         .unwrap();
 }
-pub async fn execute_talk(state: Arc<AppState>, user: UserSession, message: String) -> Result<BotResponse, Box<dyn std::error::Error>> {
+pub async fn execute_talk(state: Arc<AppState>, user_session: UserSession, message: String) -> Result<BotResponse, Box<dyn std::error::Error>> {
     info!("Executing TALK with message: {}", message);
     debug!("TALK: Sending message: {}", message);
@@ -68,7 +68,7 @@ pub async fn execute_talk(state: Arc<AppState>, user: UserSession, message: String) -> Result<BotResponse, Box<dyn std::error::Error>> {
     if let Some(redis_client) = &state.cache {
         let mut conn: redis::aio::MultiplexedConnection = redis_client.get_multiplexed_async_connection().await?;
-        let redis_key = format!("suggestions:{}:{}", user.user_id, user.id);
+        let redis_key = format!("suggestions:{}:{}", user_session.user_id, user_session.id);
         debug!("Loading suggestions from Redis key: {}", redis_key);
         let suggestions_json: Result<Vec<String>, _> = redis::cmd("LRANGE")
             .arg(redis_key.as_str())
@@ -92,9 +92,9 @@ pub async fn execute_talk(state: Arc<AppState>, user: UserSession, message: String) -> Result<BotResponse, Box<dyn std::error::Error>> {
     }
     let response = BotResponse {
-        bot_id: "default_bot".to_string(),
-        user_id: "default_user".to_string(),
-        session_id: user.id.to_string(),
+        bot_id: user_session.bot_id.to_string(),
+        user_id: user_session.user_id.to_string(),
+        session_id: user_session.id.to_string(),
         channel: "web".to_string(),
         content: message,
         message_type: 1,
@@ -106,7 +106,7 @@ pub async fn execute_talk(state: Arc<AppState>, user: UserSession, message: String) -> Result<BotResponse, Box<dyn std::error::Error>> {
         context_max_length: 0,
     };
-    let user_id = user.id.to_string();
+    let user_id = user_session.id.to_string();
     let response_clone = response.clone();
     match state.response_channels.try_lock() {
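
The LRANGE above consumes a list that some other code path must populate. A hypothetical producer using the same suggestions:{user_id}:{session_id} key layout (the writer side is not part of this diff; UserSession fields are assumed from the usage above):

    // Hypothetical producer for the suggestions list read by execute_talk.
    async fn store_suggestions(
        redis_client: &redis::Client,
        user_session: &UserSession,
        suggestions: &[String],
    ) -> redis::RedisResult<()> {
        let mut conn = redis_client.get_multiplexed_async_connection().await?;
        let key = format!("suggestions:{}:{}", user_session.user_id, user_session.id);
        // Reset the list, then push each suggestion in order so LRANGE sees them.
        let _: () = redis::cmd("DEL").arg(&key).query_async(&mut conn).await?;
        for s in suggestions {
            let _: () = redis::cmd("RPUSH").arg(&key).arg(s).query_async(&mut conn).await?;
        }
        Ok(())
    }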

View file

@@ -48,84 +48,19 @@ pub fn set_current_context_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine
             context_name
         );
-        // If a Redis client is configured, perform the SET operation asynchronously.
-        if let Some(cache_client) = &cache {
-            trace!("Redis client is available, preparing to set context value");
-            // Clone values needed inside the async block.
-            let cache_client = cache_client.clone();
-            let redis_key = redis_key.clone();
-            let context_value = context_value.clone();
-            let context_name = context_name.clone();
-            trace!(
-                "Cloned cache_client, redis_key ({}) and context_value (len={}) for async task",
-                redis_key,
-                context_value.len()
-            );
-            // Spawn a background task so we don't need an async closure here.
-            tokio::spawn(async move {
-                trace!("Async task started for SET_CURRENT_CONTEXT operation");
-                // Acquire an async Redis connection.
-                let mut conn = match cache_client.get_multiplexed_async_connection().await {
-                    Ok(conn) => {
-                        trace!("Successfully acquired async Redis connection");
-                        conn
-                    }
-                    Err(e) => {
-                        error!("Failed to connect to cache: {}", e);
-                        trace!("Aborting SET_CURRENT_CONTEXT task due to connection error");
-                        return;
-                    }
-                };
-                // Perform the SET command for the context value.
-                trace!(
-                    "Executing Redis SET command with key: {} and value length: {}",
-                    redis_key,
-                    context_value.len()
-                );
-                let set_result: Result<(), redis::RedisError> = redis::cmd("SET")
-                    .arg(&redis_key)
-                    .arg(&context_value)
-                    .query_async(&mut conn)
-                    .await;
-                match set_result {
-                    Ok(_) => {
-                        trace!("Successfully set context in Redis for key {}", redis_key);
-                    }
-                    Err(e) => {
-                        error!("Failed to set cache value: {}", e);
-                        trace!("SET_CURRENT_CONTEXT Redis SET command failed");
-                        return;
-                    }
-                }
-                // Mark the context as active in a separate hash.
-                let active_key = format!("active_context:{}:{}", user.user_id, user.id);
-                trace!("Setting active flag for context {} in hash {}", context_name, active_key);
-                let hset_result: Result<i64, redis::RedisError> = redis::cmd("HSET")
-                    .arg(&active_key)
-                    .arg(&context_name)
-                    .arg("active")
-                    .query_async(&mut conn)
-                    .await;
-                match hset_result {
-                    Ok(fields_added) => {
-                        trace!("Active flag set for context {} (fields added: {})", context_name, fields_added);
-                    }
-                    Err(e) => {
-                        error!("Failed to set active flag for context {}: {}", context_name, e);
-                    }
-                }
-            });
-        } else {
-            trace!("No Redis client configured; SET_CURRENT_CONTEXT will not persist to cache");
-        }
+        // Use session manager to update context
+        let state = state.clone();
+        let user = user.clone();
+        let context_value = context_value.clone();
+        tokio::spawn(async move {
+            if let Err(e) = state.session_manager.lock().await.update_session_context(
+                &user.id,
+                &user.user_id,
+                context_value
+            ).await {
+                error!("Failed to update session context: {}", e);
+            }
+        });
         Ok(Dynamic::UNIT)
     },
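
Routing the write through SessionManager::update_session_context (modified later in this commit) keeps the context:{user_id}:{session_id} key layout in one place, so this keyword and BotOrchestrator::handle_context_change share a single persistence path. Note that the old active_context:{user_id}:{session_id} HSET bookkeeping appears to be dropped here without a replacement.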

View file

@@ -17,6 +17,17 @@ pub fn execute_set_schedule(
         cron, script_name, bot_uuid
     );
+    // First check if bot exists
+    use crate::shared::models::bots::dsl::bots;
+    let bot_exists: bool = diesel::select(diesel::dsl::exists(
+        bots.filter(crate::shared::models::bots::dsl::id.eq(bot_uuid))
+    ))
+    .get_result(conn)?;
+    if !bot_exists {
+        return Err(format!("Bot with id {} does not exist", bot_uuid).into());
+    }
     use crate::shared::models::system_automations::dsl::*;
     let new_automation = (

View file

@@ -335,14 +335,22 @@ impl BotOrchestrator {
             session_id, context_name
         );
-        let mut session_manager = self.state.session_manager.lock().await;
-        session_manager
-            .update_session_context(
-                &Uuid::parse_str(session_id)?,
-                &Uuid::parse_str(user_id)?,
-                context_name.to_string(),
-            )
-            .await?;
+        // Use session manager to update context
+        let session_uuid = Uuid::parse_str(session_id).map_err(|e| {
+            error!("Failed to parse session_id: {}", e);
+            e
+        })?;
+        let user_uuid = Uuid::parse_str(user_id).map_err(|e| {
+            error!("Failed to parse user_id: {}", e);
+            e
+        })?;
+        if let Err(e) = self.state.session_manager.lock().await.update_session_context(
+            &session_uuid,
+            &user_uuid,
+            context_name.to_string()
+        ).await {
+            error!("Failed to update session context: {}", e);
+        }
         // Send confirmation back to client
         let confirmation = BotResponse {
@@ -458,17 +466,12 @@ impl BotOrchestrator {
             )?;
         }
-        let response_content = self.direct_mode_handler(&message, &session).await?;
-        {
-            let mut session_manager = self.state.session_manager.lock().await;
-            session_manager.save_message(session.id, user_id, 2, &response_content, 1)?;
-        }
-        // Handle context change messages (type 4) first
+        // Handle context change messages (type 4) immediately
+        // before any other processing
         if message.message_type == 4 {
             if let Some(context_name) = &message.context_name {
-                return self
+                self
                     .handle_context_change(
                         &message.user_id,
                         &message.bot_id,
@@ -476,11 +479,20 @@ impl BotOrchestrator {
                         &message.channel,
                         context_name,
                     )
-                    .await;
+                    .await?;
             }
         }
-        // Create regular response
+        let response_content = self.direct_mode_handler(&message, &session).await?;
+        {
+            let mut session_manager = self.state.session_manager.lock().await;
+            session_manager.save_message(session.id, user_id, 2, &response_content, 1)?;
+        }
+        // Create regular response for non-context-change messages
         let channel = message.channel.clone();
         let config_manager = ConfigManager::new(Arc::clone(&self.state.conn));
         let max_context_size = config_manager
@@ -528,7 +540,7 @@ impl BotOrchestrator {
         let context_data = {
             let session_manager = self.state.session_manager.lock().await;
             session_manager
-                .get_session_context(&session.id, &session.user_id)
+                .get_session_context_data(&session.id, &session.user_id)
                 .await?
         };
@@ -721,7 +733,7 @@ impl BotOrchestrator {
         let context_data = {
             let session_manager = self.state.session_manager.lock().await;
             session_manager
-                .get_session_context(&session.id, &session.user_id)
+                .get_session_context_data(&session.id, &session.user_id)
                 .await?
         };
@@ -1306,17 +1318,26 @@ async fn websocket_handler(
                     session_id: session_id_clone2.clone(),
                     channel: "web".to_string(),
                     content,
-                    message_type: 1,
+                    message_type: json_value["message_type"]
+                        .as_u64()
+                        .unwrap_or(1) as i32,
                     media_url: None,
                     timestamp: Utc::now(),
-                    context_name: None,
+                    context_name: json_value["context_name"]
+                        .as_str()
+                        .map(|s| s.to_string()),
                 };
-                if let Err(e) = orchestrator.stream_response(user_message, tx.clone()).await {
-                    error!(
-                        "Error processing WebSocket message {}: {}",
-                        message_count, e
-                    );
-                }
+                // First try processing as a regular message
+                match orchestrator.process_message(user_message.clone()).await {
+                    Ok(_) => (),
+                    Err(e) => {
+                        error!("Failed to process message: {}", e);
+                        // Fall back to streaming if processing fails
+                        if let Err(e) = orchestrator.stream_response(user_message, tx.clone()).await {
+                            error!("Failed to stream response: {}", e);
+                        }
+                    }
+                }
             }
             WsMessage::Close(reason) => {
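
The handlers above and the web client change at the end of this commit rely on a small message_type convention. A hypothetical recap of the values observed in this diff (the codebase's real constants are not shown, so the names here are illustrative):

    // Illustrative summary of message_type values as used in this commit.
    #[repr(i32)]
    pub enum MessageType {
        User = 1,          // default for inbound WebSocket messages
        Bot = 2,           // used when save_message stores the bot's reply
        ContextChange = 4, // client asks to switch context; handled before other processing
        ContextAck = 5,    // confirmation the web client's pendingContextChange awaits
    }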

View file

@@ -1,3 +1,5 @@
+use crate::shared::models::schema::bots::dsl::*;
+use diesel::prelude::*;
 use crate::basic::compiler::BasicCompiler;
 use crate::config::ConfigManager;
 use crate::kb::embeddings;
@@ -42,6 +44,39 @@ impl DriveMonitor {
             "Drive Monitor service started for bucket: {}",
             self.bucket_name
         );
+        // Check if llama servers are ready before first scan
+        let config_manager = ConfigManager::new(Arc::clone(&self.state.conn));
+        let default_bot_id = {
+            let mut conn = self.state.conn.lock().unwrap();
+            bots.filter(name.eq("default"))
+                .select(id)
+                .first::<uuid::Uuid>(&mut *conn)
+                .unwrap_or_else(|_| uuid::Uuid::nil())
+        };
+        let llm_url = match config_manager.get_config(&default_bot_id, "llm-url", None) {
+            Ok(url) => url,
+            Err(e) => {
+                error!("Failed to get llm-url config: {}", e);
+                return;
+            }
+        };
+        let embedding_url = match config_manager.get_config(&default_bot_id, "embedding-url", None) {
+            Ok(url) => url,
+            Err(e) => {
+                error!("Failed to get embedding-url config: {}", e);
+                return;
+            }
+        };
+        if !crate::llm::local::is_server_running(&llm_url).await ||
+           !crate::llm::local::is_server_running(&embedding_url).await {
+            trace!("LLM servers not ready - llm: {}, embedding: {}", llm_url, embedding_url);
+            return;
+        }
         let mut tick = interval(Duration::from_secs(30));
         loop {
             tick.tick().await;

View file

@@ -218,20 +218,23 @@ impl SessionManager {
     pub async fn update_session_context(
         &mut self,
         session_id: &Uuid,
-        _user_id: &Uuid,
-        context_name: String,
+        user_id: &Uuid,
+        context_data: String,
     ) -> Result<(), Box<dyn Error + Send + Sync>> {
-        use crate::shared::models::schema::user_sessions::dsl::*;
-        use diesel::prelude::*;
-        diesel::update(user_sessions.filter(id.eq(session_id).and(user_id.eq(user_id))))
-            .set(context_data.eq(serde_json::json!({ "current_context": context_name })))
-            .execute(&mut self.conn)?;
+        use redis::Commands;
+        let redis_key = format!("context:{}:{}", user_id, session_id);
+        if let Some(redis_client) = &self.redis {
+            let mut conn = redis_client.get_connection()?;
+            conn.set(&redis_key, &context_data)?;
+            info!("Updated context in Redis for key {}", redis_key);
+        } else {
+            warn!("No Redis client configured, context not persisted");
+        }
         Ok(())
     }
-    pub async fn get_session_context(
+    pub async fn get_session_context_data(
         &self,
         session_id: &Uuid,
         user_id: &Uuid,
@@ -241,11 +244,11 @@ impl SessionManager {
         let redis_key = format!("context:{}:{}", user_id, session_id);
         if let Some(redis_client) = &self.redis {
-            // Attempt to obtain a Redis connection; log and ignore errors, returning `None`.
+            // Attempt to obtain a Redis connection; log and ignore errors
             let conn_option = redis_client
                 .get_connection()
                 .map_err(|e| {
-                    warn!("Failed to get Redis connection: {}", e);
+                    warn!("Failed to get Cache connection: {}", e);
                     e
                 })
                 .ok();
@@ -254,22 +257,23 @@ impl SessionManager {
             match connection.get::<_, Option<String>>(&redis_key) {
                 Ok(Some(context)) => {
                     debug!(
-                        "Retrieved context from Redis for key {}: {} chars",
+                        "Retrieved context from Cache for key {}: {} chars",
                         redis_key,
                         context.len()
                     );
                     return Ok(context);
                 }
                 Ok(None) => {
-                    debug!("No context found in Redis for key {}", redis_key);
+                    debug!("No context found in Cache for key {}", redis_key);
                 }
                 Err(e) => {
-                    warn!("Failed to retrieve context from Redis: {}", e);
+                    warn!("Failed to retrieve context from Cache: {}", e);
                 }
             }
         }
-        // If Redis is unavailable or the key is missing, return an empty context.
+        // If no context found, return empty string
         Ok(String::new())
     }
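
Assuming a SessionManager wired to a Redis client, the reworked methods form a simple round trip; a hypothetical usage sketch (demo itself is not part of the codebase):

    // Hypothetical round trip through the Redis-backed context store.
    // Both methods key on "context:{user_id}:{session_id}".
    async fn demo(
        manager: &mut SessionManager,
        session_id: uuid::Uuid,
        user_id: uuid::Uuid,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        manager
            .update_session_context(&session_id, &user_id, "sales".to_string())
            .await?;
        // Returns the stored value, or an empty string when Redis is
        // unconfigured or the key is missing.
        let ctx = manager.get_session_context_data(&session_id, &user_id).await?;
        assert_eq!(ctx, "sales");
        Ok(())
    }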

View file

@@ -1728,7 +1728,8 @@
         pendingContextChange = new Promise((resolve) => {
             const handler = (event) => {
                 const response = JSON.parse(event.data);
-                if (response.message_type === 5 && response.context_name === context) {
+                if (response.message_type === 5 &&
+                    response.context_name === context) {
                     ws.removeEventListener('message', handler);
                     resolve();
                 }