Add async Redis context storage and session fetch

- Use `tokio::spawn` to run Redis SET for `SET_CONTEXT` in a background
  task with detailed tracing.
- Move context retrieval from `BotOrchestrator` to `SessionManager`,
  inserting it as a system message in conversation history.
- Remove redundant Redis fetch logic from `BotOrchestrator`.
- Update `DEV.md` to install `valkey-cli`, reorder cargo tools, and
  adjust apt commands.
- Add a `SET_CONTEXT "azul bolinha"` example to the announcements
  template.
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-10-15 00:01:48 -03:00
parent d3c486094f
commit e144e013d7
5 changed files with 120 additions and 80 deletions

View file

@ -9,17 +9,17 @@ ADD Manual: Claude/DeepSeek -> DeepSeek
curl -sSL https://get.livekit.io | bash
livekit-server --dev
# Util
cargo install cargo-audit
cargo install cargo-edit
apt install -y xclip valkey-cli
# Util
cargo upgrade
cargo audit
apt install xclip
valkey-cli -p 6379 monitor
# Prompt add-ons

View file

@ -162,31 +162,73 @@ pub fn set_context_keyword(state: &AppState, user: UserSession, engine: &mut Eng
engine
.register_custom_syntax(&["SET_CONTEXT", "$expr$"], true, move |context, inputs| {
// Evaluate the expression that should be stored in the context.
let context_value = context.eval_expression_tree(&inputs[0])?.to_string();
info!("SET CONTEXT command executed: {}", context_value);
// Build the Redis key using the user ID and the session ID.
let redis_key = format!("context:{}:{}", user.user_id, user.id);
log::trace!(
target: "app::set_context",
"Constructed Redis key: {} for user {} and session {}",
redis_key,
user.user_id,
user.id
);
let cache_clone = cache.clone();
// If a Redis client is configured, perform the SET operation in a background task.
if let Some(cache_client) = &cache {
log::trace!("Redis client is available, preparing to set context value");
// Clone the values we need inside the async block.
let cache_client = cache_client.clone();
let redis_key = redis_key.clone();
let context_value = context_value.clone();
log::trace!(
"Cloned cache_client, redis_key ({}) and context_value (len={}) for async task",
redis_key,
context_value.len()
);
if let Some(cache_client) = &cache_clone {
let mut conn = match futures::executor::block_on(
cache_client.get_multiplexed_async_connection(),
) {
Ok(conn) => conn,
Err(e) => {
error!("Failed to connect to cache: {}", e);
return Ok(Dynamic::UNIT);
}
};
// Spawn a task so we don't need an async closure here.
tokio::spawn(async move {
log::trace!("Async task started for SET_CONTEXT operation");
// Acquire an async Redis connection.
let mut conn = match cache_client.get_multiplexed_async_connection().await {
Ok(conn) => {
log::trace!("Successfully acquired async Redis connection");
conn
}
Err(e) => {
error!("Failed to connect to cache: {}", e);
log::trace!("Aborting SET_CONTEXT task due to connection error");
return;
}
};
let _: Result<(), _> = futures::executor::block_on(
redis::cmd("SET")
// Perform the SET command.
log::trace!(
"Executing Redis SET command with key: {} and value length: {}",
redis_key,
context_value.len()
);
let result: Result<(), redis::RedisError> = redis::cmd("SET")
.arg(&redis_key)
.arg(&context_value)
.query_async(&mut conn),
);
.query_async(&mut conn)
.await;
match result {
Ok(_) => {
log::trace!("Successfully set context in Redis for key {}", redis_key);
}
Err(e) => {
error!("Failed to set cache value: {}", e);
log::trace!("SET_CONTEXT Redis SET command failed");
}
}
});
} else {
log::trace!("No Redis client configured; SET_CONTEXT will not persist to cache");
}
Ok(Dynamic::UNIT)

View file

@ -5,7 +5,7 @@ use actix_web::{web, HttpRequest, HttpResponse, Result};
use actix_ws::Message as WsMessage;
use chrono::Utc;
use log::{debug, error, info, warn};
use redis::Commands;
use serde_json;
use std::collections::HashMap;
use std::fs;
@ -502,57 +502,13 @@ impl BotOrchestrator {
session_id: Uuid,
user_id: Uuid,
) -> Result<Vec<(String, String)>, Box<dyn std::error::Error + Send + Sync>> {
let redis_key = format!("context:{}:{}", user_id, session_id);
// Fetch context from Redis and append to history on first retrieval
let redis_context = if let Some(redis_client) = &self.state.redis_client {
let conn = redis_client
.get_connection()
.map_err(|e| {
warn!("Failed to get Redis connection: {}", e);
e
})
.ok();
if let Some(mut connection) = conn {
match connection.get::<_, Option<String>>(&redis_key) {
Ok(Some(context)) => {
info!(
"Retrieved context from Redis for key {}: {} chars",
redis_key,
context.len()
);
Some(context)
}
Ok(None) => {
debug!("No context found in Redis for key {}", redis_key);
None
}
Err(e) => {
warn!("Failed to retrieve context from Redis: {}", e);
None
}
}
} else {
None
}
} else {
None
};
info!(
"Getting conversation history for session {} user {}",
session_id, user_id
);
let mut session_manager = self.state.session_manager.lock().await;
let history = session_manager.get_conversation_history(session_id, user_id)?;
if let Some(context) = redis_context {
let mut result = vec![("system".to_string(), context)];
result.extend(history);
Ok(result)
} else {
Ok(history)
}
Ok(history)
}
pub async fn run_start_script(

View file

@ -219,6 +219,45 @@ impl SessionManager {
_uid: Uuid,
) -> Result<Vec<(String, String)>, Box<dyn Error + Send + Sync>> {
use crate::shared::models::message_history::dsl::*;
use redis::Commands; // Import trait that provides the `get` method
let redis_key = format!("context:{}:{}", _uid, sess_id);
// Fetch context from Redis and append to history on first retrieval
let redis_context = if let Some(redis_client) = &self.redis {
let conn = redis_client
.get_connection()
.map_err(|e| {
warn!("Failed to get Redis connection: {}", e);
e
})
.ok();
if let Some(mut connection) = conn {
match connection.get::<_, Option<String>>(&redis_key) {
Ok(Some(context)) => {
info!(
"Retrieved context from Redis for key {}: {} chars",
redis_key,
context.len()
);
Some(context)
}
Ok(None) => {
debug!("No context found in Redis for key {}", redis_key);
None
}
Err(e) => {
warn!("Failed to retrieve context from Redis: {}", e);
None
}
}
} else {
None
}
} else {
None
};
let messages = message_history
.filter(session_id.eq(sess_id))
@ -226,18 +265,22 @@ impl SessionManager {
.select((role, content_encrypted))
.load::<(i32, String)>(&mut self.conn)?;
let history = messages
.into_iter()
.map(|(other_role, content)| {
let role_str = match other_role {
0 => "user".to_string(),
1 => "assistant".to_string(),
_ => "unknown".to_string(),
};
(role_str, content)
})
.collect();
// Build conversation history, inserting Redis context as a system (role 2) message if it exists
let mut history: Vec<(String, String)> = Vec::new();
if let Some(ctx) = redis_context {
history.push(("system".to_string(), ctx));
}
for (other_role, content) in messages {
let role_str = match other_role {
0 => "user".to_string(),
1 => "assistant".to_string(),
2 => "system".to_string(),
_ => "unknown".to_string(),
};
history.push((role_str, content));
}
Ok(history)
}

View file

@ -1,8 +1,7 @@
TALK "Olá, estou preparando um resumo para você."
SET_CONTEXT "azul bolinha"
REM text = GET "default.pdf"
REM resume = LLM "Say Hello and present a a resume from " + text
REM TALK resume
REM SET_CONTEXT text