refactor(state): rename resource clients and improve keyword syntax
Updated references from `redis_client`, `s3_client`, and `custom_conn` to unified names `cache`, `drive`, and `conn` for consistency across modules. Adjusted `add_suggestion_keyword` to use clearer parameter naming and enhanced custom syntax registration for better readability and maintainability.
This commit is contained in:
parent
fda0b0c9e8
commit
0342e1cac9
21 changed files with 405 additions and 229 deletions
|
|
@ -164,7 +164,7 @@ impl AuthService {
|
|||
|
||||
#[actix_web::get("/api/auth")]
|
||||
async fn auth_handler(
|
||||
req: HttpRequest,
|
||||
_req: HttpRequest,
|
||||
data: web::Data<AppState>,
|
||||
web::Query(params): web::Query<HashMap<String, String>>,
|
||||
) -> Result<HttpResponse> {
|
||||
|
|
|
|||
|
|
@ -285,7 +285,7 @@ impl AutomationService {
|
|||
let redis_key = format!("job:running:{}:{}", bot_id, param);
|
||||
trace!("Redis key for job tracking: {}", redis_key);
|
||||
|
||||
if let Some(redis_client) = &self.state.redis_client {
|
||||
if let Some(redis_client) = &self.state.cache {
|
||||
match redis_client.get_multiplexed_async_connection().await {
|
||||
Ok(mut conn) => {
|
||||
trace!("Connected to Redis; checking if job '{}' is running", param);
|
||||
|
|
@ -331,7 +331,7 @@ impl AutomationService {
|
|||
e
|
||||
);
|
||||
|
||||
if let Some(client) = &self.state.s3_client {
|
||||
if let Some(client) = &self.state.drive {
|
||||
let bucket_name = format!(
|
||||
"{}{}.gbai",
|
||||
env::var("MINIO_ORG_PREFIX").unwrap_or_else(|_| "org1_".to_string()),
|
||||
|
|
@ -436,7 +436,7 @@ impl AutomationService {
|
|||
);
|
||||
let redis_key = format!("job:running:{}:{}", bot_id, param);
|
||||
|
||||
if let Some(redis_client) = &self.state.redis_client {
|
||||
if let Some(redis_client) = &self.state.cache {
|
||||
match redis_client.get_multiplexed_async_connection().await {
|
||||
Ok(mut conn) => {
|
||||
let _: Result<(), redis::RedisError> = redis::cmd("DEL")
|
||||
|
|
|
|||
|
|
@ -5,11 +5,11 @@ use rhai::{Dynamic, Engine};
|
|||
use serde_json::json;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub fn add_suggestion_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
|
||||
let cache = state.redis_client.clone();
|
||||
pub fn add_suggestion_keyword(state: Arc<AppState>, user_session: UserSession, engine: &mut Engine) {
|
||||
let cache = state.cache.clone();
|
||||
|
||||
engine
|
||||
.register_custom_syntax(&["ADD_SUGGESTION", "$expr$", "$expr$"], true, move |context, inputs| {
|
||||
.register_custom_syntax(&["ADD_SUGGESTION", "$expr$", "AS", "$expr$"], true, move |context, inputs| {
|
||||
let context_name = context.eval_expression_tree(&inputs[0])?.to_string();
|
||||
let button_text = context.eval_expression_tree(&inputs[1])?.to_string();
|
||||
|
||||
|
|
@ -17,7 +17,7 @@ pub fn add_suggestion_keyword(state: Arc<AppState>, user: UserSession, engine: &
|
|||
|
||||
if let Some(cache_client) = &cache {
|
||||
let cache_client = cache_client.clone();
|
||||
let redis_key = format!("suggestions:{}:{}", user.user_id, user.id);
|
||||
let redis_key = format!("suggestions:{}:{}", user_session.user_id, user_session.id);
|
||||
let suggestion = json!({ "context": context_name, "text": button_text });
|
||||
|
||||
tokio::spawn(async move {
|
||||
|
|
@ -41,7 +41,7 @@ pub fn add_suggestion_keyword(state: Arc<AppState>, user: UserSession, engine: &
|
|||
debug!("Suggestion added successfully to Redis key {}, new length: {}", redis_key, length);
|
||||
|
||||
// Also register context as inactive initially
|
||||
let active_key = format!("active_context:{}:{}", user.user_id, user.id);
|
||||
let active_key = format!("active_context:{}:{}", user_session.user_id, user_session.id);
|
||||
let hset_result: Result<i64, redis::RedisError> = redis::cmd("HSET")
|
||||
.arg(&active_key)
|
||||
.arg(&context_name)
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ use crate::shared::utils;
|
|||
use crate::shared::utils::to_array;
|
||||
|
||||
pub fn find_keyword(state: &AppState, _user: UserSession, engine: &mut Engine) {
|
||||
let connection = state.custom_conn.clone();
|
||||
let connection = state.conn.clone();
|
||||
|
||||
engine
|
||||
.register_custom_syntax(&["FIND", "$expr$", ",", "$expr$"], false, {
|
||||
|
|
|
|||
|
|
@ -159,7 +159,7 @@ pub async fn get_from_bucket(
|
|||
return Err("Invalid file path".into());
|
||||
}
|
||||
|
||||
let client = state.s3_client.as_ref().ok_or("S3 client not configured")?;
|
||||
let client = state.drive.as_ref().ok_or("S3 client not configured")?;
|
||||
|
||||
let bucket_name = {
|
||||
let cfg = state
|
||||
|
|
|
|||
|
|
@ -1,9 +1,8 @@
|
|||
use crate::shared::models::{BotResponse, Suggestion, UserSession};
|
||||
use crate::shared::models::{BotResponse, UserSession};
|
||||
use crate::shared::state::AppState;
|
||||
use log::{debug, error, info};
|
||||
use rhai::{Dynamic, Engine, EvalAltResult};
|
||||
use std::sync::Arc;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub fn hear_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
|
||||
let session_id = user.id;
|
||||
|
|
@ -34,7 +33,7 @@ pub fn hear_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine
|
|||
let mut session_manager = state_for_spawn.session_manager.lock().await;
|
||||
session_manager.mark_waiting(session_id_clone);
|
||||
|
||||
if let Some(redis_client) = &state_for_spawn.redis_client {
|
||||
if let Some(redis_client) = &state_for_spawn.cache {
|
||||
let mut conn = match redis_client.get_multiplexed_async_connection().await {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => {
|
||||
|
|
@ -131,168 +130,3 @@ pub fn talk_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine
|
|||
.unwrap();
|
||||
}
|
||||
|
||||
pub fn set_user_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
|
||||
let state_clone = Arc::clone(&state);
|
||||
let user_clone = user.clone();
|
||||
engine
|
||||
.register_custom_syntax(&["SET_USER", "$expr$"], true, move |context, inputs| {
|
||||
let user_id_str = context.eval_expression_tree(&inputs[0])?.to_string();
|
||||
|
||||
info!("SET USER command executed with ID: {}", user_id_str);
|
||||
|
||||
match Uuid::parse_str(&user_id_str) {
|
||||
Ok(user_id) => {
|
||||
debug!("Successfully parsed user UUID: {}", user_id);
|
||||
|
||||
let state_for_spawn = Arc::clone(&state_clone);
|
||||
let user_clone_spawn = user_clone.clone();
|
||||
|
||||
let mut session_manager =
|
||||
futures::executor::block_on(state_for_spawn.session_manager.lock());
|
||||
|
||||
if let Err(e) = session_manager.update_user_id(user_clone_spawn.id, user_id) {
|
||||
error!("Failed to update user ID in session: {}", e);
|
||||
} else {
|
||||
info!(
|
||||
"Updated session {} to user ID: {}",
|
||||
user_clone_spawn.id, user_id
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Invalid UUID format for SET USER: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Dynamic::UNIT)
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
pub fn add_suggestion_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
|
||||
let user_clone = user.clone();
|
||||
|
||||
engine
|
||||
.register_custom_syntax(&["ADD_SUGGESTION", "$expr$", "$expr$", "$expr$"], true, move |context, inputs| {
|
||||
// Evaluate expressions: text, context_name
|
||||
let text = context.eval_expression_tree(&inputs[0])?.to_string();
|
||||
let context_name = context.eval_expression_tree(&inputs[1])?.to_string();
|
||||
|
||||
info!("ADD_SUGGESTION command executed - text: {}, context: {}", text, context_name);
|
||||
|
||||
// Get current response channels
|
||||
let state_clone = Arc::clone(&state);
|
||||
let user_id = user_clone.id.to_string();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut response_channels = state_clone.response_channels.lock().await;
|
||||
if let Some(tx) = response_channels.get_mut(&user_id) {
|
||||
let suggestion = Suggestion {
|
||||
text,
|
||||
context_name,
|
||||
is_suggestion: true
|
||||
};
|
||||
|
||||
// Create a response with just this suggestion
|
||||
let response = BotResponse {
|
||||
bot_id: "system".to_string(),
|
||||
user_id: user_clone.user_id.to_string(),
|
||||
session_id: user_clone.id.to_string(),
|
||||
channel: "web".to_string(),
|
||||
content: String::new(),
|
||||
message_type: 3, // Special type for suggestions
|
||||
stream_token: None,
|
||||
is_complete: true,
|
||||
suggestions: vec![suggestion],
|
||||
};
|
||||
|
||||
if let Err(e) = tx.try_send(response) {
|
||||
error!("Failed to send suggestion: {}", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Dynamic::UNIT)
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
pub fn set_context_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
|
||||
let cache = state.redis_client.clone();
|
||||
|
||||
engine
|
||||
.register_custom_syntax(&["SET_CONTEXT", "$expr$", "$expr$"], true, move |context, inputs| {
|
||||
// Evaluate both expressions - first is context name, second is context value
|
||||
let context_name = context.eval_expression_tree(&inputs[0])?.to_string();
|
||||
let context_value = context.eval_expression_tree(&inputs[1])?.to_string();
|
||||
|
||||
info!("SET CONTEXT command executed - name: {}, value: {}", context_name, context_value);
|
||||
// Build the Redis key using user ID, session ID and context name
|
||||
let redis_key = format!("context:{}:{}:{}", user.user_id, user.id, context_name);
|
||||
log::trace!(
|
||||
target: "app::set_context",
|
||||
"Constructed Redis key: {} for user {}, session {}, context {}",
|
||||
redis_key,
|
||||
user.user_id,
|
||||
user.id,
|
||||
context_name
|
||||
);
|
||||
|
||||
// If a Redis client is configured, perform the SET operation in a background task.
|
||||
if let Some(cache_client) = &cache {
|
||||
log::trace!("Redis client is available, preparing to set context value");
|
||||
// Clone the values we need inside the async block.
|
||||
let cache_client = cache_client.clone();
|
||||
let redis_key = redis_key.clone();
|
||||
let context_value = context_value.clone();
|
||||
log::trace!(
|
||||
"Cloned cache_client, redis_key ({}) and context_value (len={}) for async task",
|
||||
redis_key,
|
||||
context_value.len()
|
||||
);
|
||||
|
||||
// Spawn a task so we don't need an async closure here.
|
||||
tokio::spawn(async move {
|
||||
log::trace!("Async task started for SET_CONTEXT operation");
|
||||
// Acquire an async Redis connection.
|
||||
let mut conn = match cache_client.get_multiplexed_async_connection().await {
|
||||
Ok(conn) => {
|
||||
log::trace!("Successfully acquired async Redis connection");
|
||||
conn
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to connect to cache: {}", e);
|
||||
log::trace!("Aborting SET_CONTEXT task due to connection error");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Perform the SET command.
|
||||
log::trace!(
|
||||
"Executing Redis SET command with key: {} and value length: {}",
|
||||
redis_key,
|
||||
context_value.len()
|
||||
);
|
||||
let result: Result<(), redis::RedisError> = redis::cmd("SET")
|
||||
.arg(&redis_key)
|
||||
.arg(&context_value)
|
||||
.query_async(&mut conn)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(_) => {
|
||||
log::trace!("Successfully set context in Redis for key {}", redis_key);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to set cache value: {}", e);
|
||||
log::trace!("SET_CONTEXT Redis SET command failed");
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
log::trace!("No Redis client configured; SET_CONTEXT will not persist to cache");
|
||||
}
|
||||
|
||||
Ok(Dynamic::UNIT)
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,6 +20,9 @@ pub mod set_kb;
|
|||
pub mod set_schedule;
|
||||
pub mod wait;
|
||||
pub mod add_suggestion;
|
||||
pub mod set_user;
|
||||
pub mod set_context;
|
||||
pub mod set_current_context;
|
||||
|
||||
#[cfg(feature = "email")]
|
||||
pub mod create_draft_keyword;
|
||||
|
|
|
|||
106
src/basic/keywords/set_context.rs
Normal file
106
src/basic/keywords/set_context.rs
Normal file
|
|
@ -0,0 +1,106 @@
|
|||
use std::sync::Arc;
|
||||
use log::{error, info, trace};
|
||||
use crate::shared::state::AppState;
|
||||
use crate::shared::models::UserSession;
|
||||
use rhai::Engine;
|
||||
use rhai::Dynamic;
|
||||
|
||||
pub fn set_context_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
|
||||
// Clone the Redis client (if any) for use inside the async task.
|
||||
let cache = state.cache.clone();
|
||||
|
||||
engine
|
||||
.register_custom_syntax(
|
||||
&["SET_CONTEXT", "$expr$", "AS", "$expr$"],
|
||||
true,
|
||||
move |context, inputs| {
|
||||
// First expression is the context name, second is the value.
|
||||
let context_name = context.eval_expression_tree(&inputs[0])?.to_string();
|
||||
let context_value = context.eval_expression_tree(&inputs[1])?.to_string();
|
||||
|
||||
info!(
|
||||
"SET CONTEXT command executed - name: {}, value: {}",
|
||||
context_name,
|
||||
context_value
|
||||
);
|
||||
|
||||
// Build a Redis key that is unique per user and session.
|
||||
let redis_key = format!(
|
||||
"context:{}:{}:{}",
|
||||
user.user_id,
|
||||
user.id,
|
||||
context_name
|
||||
);
|
||||
|
||||
trace!(
|
||||
target: "app::set_context",
|
||||
"Constructed Redis key: {} for user {}, session {}, context {}",
|
||||
redis_key,
|
||||
user.user_id,
|
||||
user.id,
|
||||
context_name
|
||||
);
|
||||
|
||||
// If a Redis client is configured, perform the SET operation asynchronously.
|
||||
if let Some(cache_client) = &cache {
|
||||
trace!("Redis client is available, preparing to set context value");
|
||||
|
||||
// Clone values needed inside the async block.
|
||||
let cache_client = cache_client.clone();
|
||||
let redis_key = redis_key.clone();
|
||||
let context_value = context_value.clone();
|
||||
|
||||
trace!(
|
||||
"Cloned cache_client, redis_key ({}) and context_value (len={}) for async task",
|
||||
redis_key,
|
||||
context_value.len()
|
||||
);
|
||||
|
||||
// Spawn a background task so we don't need an async closure here.
|
||||
tokio::spawn(async move {
|
||||
trace!("Async task started for SET_CONTEXT operation");
|
||||
|
||||
// Acquire an async Redis connection.
|
||||
let mut conn = match cache_client.get_multiplexed_async_connection().await {
|
||||
Ok(conn) => {
|
||||
trace!("Successfully acquired async Redis connection");
|
||||
conn
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to connect to cache: {}", e);
|
||||
trace!("Aborting SET_CONTEXT task due to connection error");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Perform the SET command.
|
||||
trace!(
|
||||
"Executing Redis SET command with key: {} and value length: {}",
|
||||
redis_key,
|
||||
context_value.len()
|
||||
);
|
||||
let result: Result<(), redis::RedisError> = redis::cmd("SET")
|
||||
.arg(&redis_key)
|
||||
.arg(&context_value)
|
||||
.query_async(&mut conn)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(_) => {
|
||||
trace!("Successfully set context in Redis for key {}", redis_key);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to set cache value: {}", e);
|
||||
trace!("SET_CONTEXT Redis SET command failed");
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
trace!("No Redis client configured; SET_CONTEXT will not persist to cache");
|
||||
}
|
||||
|
||||
Ok(Dynamic::UNIT)
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
135
src/basic/keywords/set_current_context.rs
Normal file
135
src/basic/keywords/set_current_context.rs
Normal file
|
|
@ -0,0 +1,135 @@
|
|||
use std::sync::Arc;
|
||||
use log::{error, info, trace};
|
||||
use crate::shared::state::AppState;
|
||||
use crate::shared::models::UserSession;
|
||||
use rhai::Engine;
|
||||
use rhai::Dynamic;
|
||||
|
||||
/// Registers the `SET_CURRENT_CONTEXT` keyword which stores a context value in Redis
|
||||
/// and marks the context as active.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `state` – Shared application state (Arc<AppState>).
|
||||
/// * `user` – The current user session (provides user_id and session id).
|
||||
/// * `engine` – The script engine where the custom syntax will be registered.
|
||||
pub fn set_current_context_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
|
||||
// Clone the Redis client (if any) for use inside the async task.
|
||||
let cache = state.cache.clone();
|
||||
|
||||
engine
|
||||
.register_custom_syntax(
|
||||
&["SET_CURRENT_CONTEXT", "$expr$", "AS", "$expr$"],
|
||||
true,
|
||||
move |context, inputs| {
|
||||
// First expression is the context name, second is the value.
|
||||
let context_name = context.eval_expression_tree(&inputs[0])?.to_string();
|
||||
let context_value = context.eval_expression_tree(&inputs[1])?.to_string();
|
||||
|
||||
info!(
|
||||
"SET_CURRENT_CONTEXT command executed - name: {}, value: {}",
|
||||
context_name,
|
||||
context_value
|
||||
);
|
||||
|
||||
// Build a Redis key that is unique per user and session.
|
||||
let redis_key = format!(
|
||||
"context:{}:{}:{}",
|
||||
user.user_id,
|
||||
user.id,
|
||||
context_name
|
||||
);
|
||||
|
||||
trace!(
|
||||
target: "app::set_current_context",
|
||||
"Constructed Redis key: {} for user {}, session {}, context {}",
|
||||
redis_key,
|
||||
user.user_id,
|
||||
user.id,
|
||||
context_name
|
||||
);
|
||||
|
||||
// If a Redis client is configured, perform the SET operation asynchronously.
|
||||
if let Some(cache_client) = &cache {
|
||||
trace!("Redis client is available, preparing to set context value");
|
||||
|
||||
// Clone values needed inside the async block.
|
||||
let cache_client = cache_client.clone();
|
||||
let redis_key = redis_key.clone();
|
||||
let context_value = context_value.clone();
|
||||
let context_name = context_name.clone();
|
||||
|
||||
trace!(
|
||||
"Cloned cache_client, redis_key ({}) and context_value (len={}) for async task",
|
||||
redis_key,
|
||||
context_value.len()
|
||||
);
|
||||
|
||||
// Spawn a background task so we don't need an async closure here.
|
||||
tokio::spawn(async move {
|
||||
trace!("Async task started for SET_CURRENT_CONTEXT operation");
|
||||
|
||||
// Acquire an async Redis connection.
|
||||
let mut conn = match cache_client.get_multiplexed_async_connection().await {
|
||||
Ok(conn) => {
|
||||
trace!("Successfully acquired async Redis connection");
|
||||
conn
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to connect to cache: {}", e);
|
||||
trace!("Aborting SET_CURRENT_CONTEXT task due to connection error");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Perform the SET command for the context value.
|
||||
trace!(
|
||||
"Executing Redis SET command with key: {} and value length: {}",
|
||||
redis_key,
|
||||
context_value.len()
|
||||
);
|
||||
let set_result: Result<(), redis::RedisError> = redis::cmd("SET")
|
||||
.arg(&redis_key)
|
||||
.arg(&context_value)
|
||||
.query_async(&mut conn)
|
||||
.await;
|
||||
|
||||
match set_result {
|
||||
Ok(_) => {
|
||||
trace!("Successfully set context in Redis for key {}", redis_key);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to set cache value: {}", e);
|
||||
trace!("SET_CURRENT_CONTEXT Redis SET command failed");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Mark the context as active in a separate hash.
|
||||
let active_key = format!("active_context:{}:{}", user.user_id, user.id);
|
||||
trace!("Setting active flag for context {} in hash {}", context_name, active_key);
|
||||
let hset_result: Result<i64, redis::RedisError> = redis::cmd("HSET")
|
||||
.arg(&active_key)
|
||||
.arg(&context_name)
|
||||
.arg("active")
|
||||
.query_async(&mut conn)
|
||||
.await;
|
||||
|
||||
match hset_result {
|
||||
Ok(fields_added) => {
|
||||
trace!("Active flag set for context {} (fields added: {})", context_name, fields_added);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to set active flag for context {}: {}", context_name, e);
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
trace!("No Redis client configured; SET_CURRENT_CONTEXT will not persist to cache");
|
||||
}
|
||||
|
||||
Ok(Dynamic::UNIT)
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
44
src/basic/keywords/set_user.rs
Normal file
44
src/basic/keywords/set_user.rs
Normal file
|
|
@ -0,0 +1,44 @@
|
|||
use crate::shared::state::AppState;
|
||||
use crate::shared::models::UserSession;
|
||||
use log::{debug, error, info};
|
||||
use rhai::{Dynamic, Engine};
|
||||
use std::sync::Arc;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub fn set_user_keyword(state: Arc<AppState>, user: UserSession, engine: &mut Engine) {
|
||||
let state_clone = Arc::clone(&state);
|
||||
let user_clone = user.clone();
|
||||
engine
|
||||
.register_custom_syntax(&["SET_USER", "$expr$"], true, move |context, inputs| {
|
||||
let user_id_str = context.eval_expression_tree(&inputs[0])?.to_string();
|
||||
|
||||
info!("SET USER command executed with ID: {}", user_id_str);
|
||||
|
||||
match Uuid::parse_str(&user_id_str) {
|
||||
Ok(user_id) => {
|
||||
debug!("Successfully parsed user UUID: {}", user_id);
|
||||
|
||||
let state_for_spawn = Arc::clone(&state_clone);
|
||||
let user_clone_spawn = user_clone.clone();
|
||||
|
||||
let mut session_manager =
|
||||
futures::executor::block_on(state_for_spawn.session_manager.lock());
|
||||
|
||||
if let Err(e) = session_manager.update_user_id(user_clone_spawn.id, user_id) {
|
||||
error!("Failed to update user ID in session: {}", e);
|
||||
} else {
|
||||
info!(
|
||||
"Updated session {} to user ID: {}",
|
||||
user_clone_spawn.id, user_id
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Invalid UUID format for SET USER: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Dynamic::UNIT)
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
|
@ -1,3 +1,4 @@
|
|||
use crate::basic::keywords::set_user::set_user_keyword;
|
||||
use crate::shared::models::UserSession;
|
||||
use crate::shared::state::AppState;
|
||||
use log::info;
|
||||
|
|
@ -17,9 +18,8 @@ use self::keywords::first::first_keyword;
|
|||
use self::keywords::for_next::for_keyword;
|
||||
use self::keywords::format::format_keyword;
|
||||
use self::keywords::get::get_keyword;
|
||||
use self::keywords::hear_talk::{
|
||||
hear_keyword, set_context_keyword, set_user_keyword, talk_keyword,
|
||||
};
|
||||
use self::keywords::hear_talk::{hear_keyword, talk_keyword};
|
||||
use self::keywords::set_context::set_context_keyword;
|
||||
use self::keywords::last::last_keyword;
|
||||
use self::keywords::list_tools::list_tools_keyword;
|
||||
use self::keywords::llm_keyword::llm_keyword;
|
||||
|
|
|
|||
|
|
@ -551,7 +551,7 @@ impl BotOrchestrator {
|
|||
);
|
||||
|
||||
// Get suggestions from Redis
|
||||
let suggestions = if let Some(redis) = &self.state.redis_client {
|
||||
let suggestions = if let Some(redis) = &self.state.cache {
|
||||
let mut conn = redis.get_multiplexed_async_connection().await?;
|
||||
let redis_key = format!("suggestions:{}:{}", message.user_id, message.session_id);
|
||||
let suggestions: Vec<String> = redis::cmd("LRANGE")
|
||||
|
|
|
|||
|
|
@ -52,7 +52,7 @@ impl DriveMonitor {
|
|||
}
|
||||
|
||||
async fn check_for_changes(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
let client = match &self.state.s3_client {
|
||||
let client = match &self.state.drive {
|
||||
Some(client) => client,
|
||||
None => {
|
||||
return Ok(());
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ pub async fn upload_file(
|
|||
let file_name = file_name.unwrap_or_else(|| "unnamed_file".to_string());
|
||||
let temp_file_path = temp_file.into_temp_path();
|
||||
|
||||
let client = state.get_ref().s3_client.as_ref().ok_or_else(|| {
|
||||
let client = state.get_ref().drive.as_ref().ok_or_else(|| {
|
||||
actix_web::error::ErrorInternalServerError("S3 client is not initialized")
|
||||
})?;
|
||||
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ pub async fn get_file_content(
|
|||
|
||||
impl MinIOHandler {
|
||||
pub fn new(state: Arc<AppState>) -> Self {
|
||||
let client = state.s3_client.as_ref().expect("S3 client must be initialized").clone();
|
||||
let client = state.drive.as_ref().expect("S3 client must be initialized").clone();
|
||||
Self {
|
||||
state: Arc::clone(&state),
|
||||
s3: Arc::new(client),
|
||||
|
|
|
|||
|
|
@ -96,7 +96,7 @@ impl KBManager {
|
|||
&self,
|
||||
collection: &KBCollection,
|
||||
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
let _client = match &self.state.s3_client {
|
||||
let _client = match &self.state.drive {
|
||||
Some(client) => client,
|
||||
None => {
|
||||
warn!("S3 client not configured");
|
||||
|
|
@ -117,7 +117,7 @@ impl KBManager {
|
|||
file_size: i64,
|
||||
_last_modified: Option<String>,
|
||||
) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
let client = self.state.s3_client.as_ref().ok_or("S3 client not configured")?;
|
||||
let client = self.state.drive.as_ref().ok_or("S3 client not configured")?;
|
||||
let content = minio_handler::get_file_content(client, &self.state.bucket_name, file_path).await?;
|
||||
let file_hash = if content.len() > 100 {
|
||||
format!(
|
||||
|
|
|
|||
|
|
@ -171,7 +171,6 @@ async fn main() -> std::io::Result<()> {
|
|||
}
|
||||
};
|
||||
|
||||
let db_custom_pool = db_pool.clone();
|
||||
let cache_url = std::env::var("CACHE_URL")
|
||||
.or_else(|_| std::env::var("REDIS_URL"))
|
||||
.unwrap_or_else(|_| "redis://localhost:6379".to_string());
|
||||
|
|
@ -216,12 +215,11 @@ async fn main() -> std::io::Result<()> {
|
|||
)));
|
||||
|
||||
let app_state = Arc::new(AppState {
|
||||
s3_client: Some(drive),
|
||||
drive: Some(drive),
|
||||
config: Some(cfg.clone()),
|
||||
conn: db_pool.clone(),
|
||||
bucket_name: "default.gbai".to_string(), // Default bucket name
|
||||
custom_conn: db_custom_pool.clone(),
|
||||
redis_client: redis_client.clone(),
|
||||
cache: redis_client.clone(),
|
||||
session_manager: session_manager.clone(),
|
||||
tool_manager: tool_manager.clone(),
|
||||
llm_provider: llm_provider.clone(),
|
||||
|
|
|
|||
|
|
@ -241,7 +241,7 @@ post_install_cmds_linux: vec![
|
|||
post_install_cmds_windows: vec![],
|
||||
env_vars: HashMap::new(),
|
||||
data_download_list: Vec::new(),
|
||||
exec_cmd: "nohup {{BIN_PATH}}/bin/valkey-server --port 6379 --dir {{DATA_PATH}} > {{LOGS_PATH}}/valkey.log 2>&1 &".to_string(),
|
||||
exec_cmd: "nohup {{BIN_PATH}}/bin/valkey-server --port 6379 --dir {{DATA_PATH}} > {{LOGS_PATH}}/valkey.log && {{BIN_PATH}}/bin/valkey-cli CONFIG SET stop-writes-on-bgsave-error no 2>&1 &".to_string(),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,12 +15,11 @@ use tokio::sync::mpsc;
|
|||
use crate::shared::models::BotResponse;
|
||||
|
||||
pub struct AppState {
|
||||
pub s3_client: Option<S3Client>,
|
||||
pub drive: Option<S3Client>,
|
||||
pub cache: Option<Arc<RedisClient>>,
|
||||
pub bucket_name: String,
|
||||
pub config: Option<AppConfig>,
|
||||
pub conn: Arc<Mutex<PgConnection>>,
|
||||
pub custom_conn: Arc<Mutex<PgConnection>>,
|
||||
pub redis_client: Option<Arc<RedisClient>>,
|
||||
pub session_manager: Arc<tokio::sync::Mutex<SessionManager>>,
|
||||
pub tool_manager: Arc<ToolManager>,
|
||||
pub llm_provider: Arc<dyn LLMProvider>,
|
||||
|
|
@ -36,12 +35,12 @@ pub struct AppState {
|
|||
impl Clone for AppState {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
s3_client: self.s3_client.clone(),
|
||||
drive: self.drive.clone(),
|
||||
bucket_name: self.bucket_name.clone(),
|
||||
config: self.config.clone(),
|
||||
conn: Arc::clone(&self.conn),
|
||||
custom_conn: Arc::clone(&self.custom_conn),
|
||||
redis_client: self.redis_client.clone(),
|
||||
|
||||
cache: self.cache.clone(),
|
||||
session_manager: Arc::clone(&self.session_manager),
|
||||
tool_manager: Arc::clone(&self.tool_manager),
|
||||
llm_provider: Arc::clone(&self.llm_provider),
|
||||
|
|
@ -59,16 +58,14 @@ impl Clone for AppState {
|
|||
impl Default for AppState {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
s3_client: None,
|
||||
drive: None,
|
||||
bucket_name: "default.gbai".to_string(),
|
||||
config: None,
|
||||
conn: Arc::new(Mutex::new(
|
||||
diesel::PgConnection::establish("postgres://localhost/test").unwrap(),
|
||||
)),
|
||||
custom_conn: Arc::new(Mutex::new(
|
||||
diesel::PgConnection::establish("postgres://localhost/test").unwrap(),
|
||||
)),
|
||||
redis_client: None,
|
||||
|
||||
cache: None,
|
||||
session_manager: Arc::new(tokio::sync::Mutex::new(SessionManager::new(
|
||||
diesel::PgConnection::establish("postgres://localhost/test").unwrap(),
|
||||
None,
|
||||
|
|
|
|||
|
|
@ -1,17 +1,17 @@
|
|||
LET resume1 = GET_BOT_MEMORY("general")
|
||||
LET resume2 = GET_BOT_MEMORY("auxiliom")
|
||||
LET resume3 = GET_BOT_MEMORY("toolbix")
|
||||
let resume1 = GET_BOT_MEMORY("general");
|
||||
let resume2 = GET_BOT_MEMORY("auxiliom");
|
||||
let resume3 = GET_BOT_MEMORY("toolbix");
|
||||
|
||||
SET_CONTEXT "general", resume1
|
||||
SET_CONTEXT "auxiliom", resume2
|
||||
SET_CONTEXT "toolbix", resume3
|
||||
SET_CONTEXT "general" AS resume1;
|
||||
SET_CONTEXT "auxiliom" AS resume2;
|
||||
SET_CONTEXT "toolbix" AS resume3;
|
||||
|
||||
|
||||
ADD_SUGGESTION "general", "Show me the weekly announcements"
|
||||
ADD_SUGGESTION "auxiliom", "Will Auxiliom help me with what?"
|
||||
ADD_SUGGESTION "auxiliom", "What does Auxiliom do?" , "fixed"
|
||||
ADD_SUGGESTION "toolbix", "Show me Toolbix features"
|
||||
ADD_SUGGESTION "toolbix", "How can Toolbix help my business?", "fixed"
|
||||
ADD_SUGGESTION "general" AS "Show me the weekly announcements"
|
||||
ADD_SUGGESTION "auxiliom" AS "Will Auxiliom help me with what?"
|
||||
ADD_SUGGESTION "auxiliom" AS "What does Auxiliom do?"
|
||||
ADD_SUGGESTION "toolbix" AS "Show me Toolbix features"
|
||||
ADD_SUGGESTION "toolbix" AS "How can Toolbix help my business?"
|
||||
|
||||
|
||||
TALK "You can ask me about any of the announcements or circulars."
|
||||
|
|
|
|||
|
|
@ -878,7 +878,6 @@
|
|||
/>
|
||||
<button id="sendBtn">Enviar</button>
|
||||
</div>
|
||||
<div id="suggestions-container" style="text-align:center; margin-top:10px;"></div>
|
||||
<script>
|
||||
function handleSuggestions(suggestions) {
|
||||
const container = document.getElementById('suggestions-container');
|
||||
|
|
@ -899,9 +898,30 @@
|
|||
});
|
||||
}
|
||||
|
||||
let pendingContextChange = null;
|
||||
|
||||
async function setContext(context) {
|
||||
try {
|
||||
// Get the button text from the clicked suggestion
|
||||
const buttonText = event?.target?.textContent || context;
|
||||
|
||||
// Set the input field value to the button text
|
||||
const input = document.getElementById('messageInput');
|
||||
if (input) {
|
||||
input.value = buttonText;
|
||||
}
|
||||
|
||||
if (ws && ws.readyState === WebSocket.OPEN) {
|
||||
pendingContextChange = new Promise((resolve) => {
|
||||
const handler = (event) => {
|
||||
const response = JSON.parse(event.data);
|
||||
if (response.message_type === 5 && response.context_name === context) {
|
||||
ws.removeEventListener('message', handler);
|
||||
resolve();
|
||||
}
|
||||
};
|
||||
ws.addEventListener('message', handler);
|
||||
|
||||
const suggestionEvent = {
|
||||
bot_id: currentBotId,
|
||||
user_id: currentUserId,
|
||||
|
|
@ -914,6 +934,9 @@
|
|||
timestamp: new Date().toISOString()
|
||||
};
|
||||
ws.send(JSON.stringify(suggestionEvent));
|
||||
});
|
||||
|
||||
await pendingContextChange;
|
||||
alert(`Contexto alterado para: ${context}`);
|
||||
} else {
|
||||
console.warn("WebSocket não está conectado. Tentando reconectar...");
|
||||
|
|
@ -923,6 +946,42 @@
|
|||
console.error('Failed to set context:', err);
|
||||
}
|
||||
}
|
||||
|
||||
async function sendMessage() {
|
||||
if (pendingContextChange) {
|
||||
await pendingContextChange;
|
||||
pendingContextChange = null;
|
||||
}
|
||||
const message = input.value.trim();
|
||||
if (!message || !ws || ws.readyState !== WebSocket.OPEN) {
|
||||
if (!ws || ws.readyState !== WebSocket.OPEN) {
|
||||
showWarning("Conexão não disponível. Tentando reconectar...");
|
||||
connectWebSocket();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (isThinking) {
|
||||
hideThinkingIndicator();
|
||||
}
|
||||
|
||||
addMessage("user", message);
|
||||
|
||||
const messageData = {
|
||||
bot_id: currentBotId,
|
||||
user_id: currentUserId,
|
||||
session_id: currentSessionId,
|
||||
channel: "web",
|
||||
content: message,
|
||||
message_type: 1,
|
||||
media_url: null,
|
||||
timestamp: new Date().toISOString()
|
||||
};
|
||||
|
||||
ws.send(JSON.stringify(messageData));
|
||||
input.value = "";
|
||||
input.focus();
|
||||
}
|
||||
</script>
|
||||
</footer>
|
||||
</div>
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue