refactor(bot): restructure message handling and context flow

- Reformatted `update_session_context` call for better readability.
- Moved context‑change (type 4) handling to a later stage in the processing pipeline and removed the early return, so processing continues after a context switch.
- Adjusted deduplication logic formatting and clarified condition.
- Restored saving of user messages after context handling, preserving message history.
- Added detailed logging of the LLM prompt for debugging.
- Simplified JSON extraction for `message_type` and `context_name`, and applied minor whitespace clean‑ups.
- Overall code refactor improves maintainability and corrects context‑change handling behavior.
Author: Rodrigo Rodriguez (Pragmatismo)
Date:   2025-11-03 15:20:53 -03:00
Commit: 9840d0a406 (parent: 41d7377ab7)
4 changed files with 51 additions and 52 deletions


@@ -344,11 +344,14 @@ impl BotOrchestrator {
             error!("Failed to parse user_id: {}", e);
             e
         })?;
-        if let Err(e) = self.state.session_manager.lock().await.update_session_context(
-            &session_uuid,
-            &user_uuid,
-            context_name.to_string()
-        ).await {
+        if let Err(e) = self
+            .state
+            .session_manager
+            .lock()
+            .await
+            .update_session_context(&session_uuid, &user_uuid, context_name.to_string())
+            .await
+        {
             error!("Failed to update session context: {}", e);
         }
@@ -473,21 +476,6 @@ impl BotOrchestrator {
             session_manager.save_message(session.id, user_id, 2, &response_content, 1)?;
         }
 
-        // Handle context change messages (type 4) first
-        if message.message_type == 4 {
-            if let Some(context_name) = &message.context_name {
-                return self
-                    .handle_context_change(
-                        &message.user_id,
-                        &message.bot_id,
-                        &message.session_id,
-                        &message.channel,
-                        context_name,
-                    )
-                    .await;
-            }
-        }
-
         // Create regular response
         let channel = message.channel.clone();
         let config_manager = ConfigManager::new(Arc::clone(&self.state.conn));
@@ -557,8 +545,9 @@ impl BotOrchestrator {
         let mut deduped_history: Vec<(String, String)> = Vec::new();
         let mut last_role = None;
         for (role, content) in history.iter() {
-            if last_role != Some(role) || !deduped_history.is_empty() &&
-                content != &deduped_history.last().unwrap().1 {
+            if last_role != Some(role)
+                || !deduped_history.is_empty() && content != &deduped_history.last().unwrap().1
+            {
                 deduped_history.push((role.clone(), content.clone()));
                 last_role = Some(role);
             }
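
For reference, a minimal standalone sketch of this deduplication pass as it now reads; the function name and sample data are illustrative, not taken from the repository. Since `&&` binds tighter than `||`, a turn is kept when the role changes, or when its content differs from the last kept turn:

    // Illustrative sketch of the deduplication logic above; names are hypothetical.
    fn dedup_history(history: &[(String, String)]) -> Vec<(String, String)> {
        let mut deduped_history: Vec<(String, String)> = Vec::new();
        let mut last_role: Option<&String> = None;
        for (role, content) in history.iter() {
            // Keep the turn when the role changes, or when the content differs
            // from the most recently kept turn.
            if last_role != Some(role)
                || !deduped_history.is_empty() && content != &deduped_history.last().unwrap().1
            {
                deduped_history.push((role.clone(), content.clone()));
                last_role = Some(role);
            }
        }
        deduped_history
    }

    fn main() {
        let history = vec![
            ("user".to_string(), "hi".to_string()),
            ("user".to_string(), "hi".to_string()), // dropped: same role and content
            ("assistant".to_string(), "hello".to_string()),
        ];
        assert_eq!(dedup_history(&history).len(), 2);
    }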
@@ -719,6 +708,22 @@ impl BotOrchestrator {
             }
         };
 
+        // Handle context change messages (type 4) first
+        if message.message_type == 4 {
+            if let Some(context_name) = &message.context_name {
+                self
+                    .handle_context_change(
+                        &message.user_id,
+                        &message.bot_id,
+                        &message.session_id,
+                        &message.channel,
+                        context_name,
+                    )
+                    .await;
+            }
+        }
+
         if session.answer_mode == 1 && session.current_tool.is_some() {
             self.state.tool_manager.provide_user_response(
                 &message.user_id,
@@ -728,16 +733,6 @@ impl BotOrchestrator {
             return Ok(());
         }
 
-        {
-            let mut sm = self.state.session_manager.lock().await;
-            sm.save_message(
-                session.id,
-                user_id,
-                1,
-                &message.content,
-                message.message_type,
-            )?;
-        }
 
         let system_prompt = std::env::var("SYSTEM_PROMPT").unwrap_or_default();
 
         let context_data = {
@@ -771,6 +766,17 @@ impl BotOrchestrator {
             p
         };
 
+        {
+            let mut sm = self.state.session_manager.lock().await;
+            sm.save_message(
+                session.id,
+                user_id,
+                1,
+                &message.content,
+                message.message_type,
+            )?;
+        }
+
         let (stream_tx, mut stream_rx) = mpsc::channel::<String>(100);
 
         let llm = self.state.llm_provider.clone();
@@ -803,6 +809,7 @@ impl BotOrchestrator {
         }
 
         tokio::spawn(async move {
+            info!("LLM prompt: {}", prompt);
             if let Err(e) = llm
                 .generate_stream(&prompt, &serde_json::Value::Null, stream_tx)
                 .await
@@ -1328,28 +1335,16 @@ async fn websocket_handler(
                         session_id: session_id_clone2.clone(),
                         channel: "web".to_string(),
                         content,
-                        message_type: json_value["message_type"]
-                            .as_u64()
-                            .unwrap_or(1) as i32,
+                        message_type: json_value["message_type"].as_u64().unwrap_or(1) as i32,
                         media_url: None,
                         timestamp: Utc::now(),
-                        context_name: json_value["context_name"]
-                            .as_str()
-                            .map(|s| s.to_string()),
+                        context_name: json_value["context_name"].as_str().map(|s| s.to_string()),
                     };
 
-                    // First try processing as a regular message
-                    match orchestrator.process_message(user_message.clone()).await {
-                        Ok(_) => (),
-                        Err(e) => {
-                            error!("Failed to process message: {}", e);
-                            // Fall back to streaming if processing fails
-                            if let Err(e) = orchestrator.stream_response(user_message, tx.clone()).await {
-                                error!("Failed to stream response: {}", e);
-                            }
-                        }
-                    }
+                    if let Err(e) = orchestrator.stream_response(user_message, tx.clone()).await {
+                        error!("Failed to stream response: {}", e);
+                    }
                 }
                 WsMessage::Close(reason) => {
                     debug!(
                        "WebSocket closing for session {} - reason: {:?}",


@@ -1,4 +1,5 @@
 use async_trait::async_trait;
+use log::trace;
 use reqwest::Client;
 use serde_json::Value;
 use std::sync::Arc;
@@ -32,6 +33,8 @@ impl LLMProvider for AzureOpenAIClient {
         prompt: &str,
         _config: &Value,
     ) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
+        trace!("LLM Prompt (no stream): {}", prompt);
+
         let url = format!(
             "{}/openai/deployments/{}/chat/completions?api-version={}",
             self.endpoint, self.deployment, self.api_version
@@ -71,6 +74,7 @@ impl LLMProvider for AzureOpenAIClient {
         _config: &Value,
         tx: tokio::sync::mpsc::Sender<String>,
     ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        trace!("LLM Prompt: {}", prompt);
        let content = self.generate(prompt, _config).await?;
        let _ = tx.send(content).await;
        Ok(())
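
The new `trace!` and `info!` calls only appear in the output if the log backend emits those levels. The logger initialization is not part of this diff; assuming an env_logger-style backend (an assumption, not shown in this commit), output would be gated by `RUST_LOG`, roughly as in this sketch:

    // Hypothetical setup sketch; this commit only adds the log macros, not the backend.
    fn main() {
        env_logger::init(); // honors RUST_LOG, e.g. RUST_LOG=trace ./bot-server
        log::info!("LLM prompt: {}", "example prompt");  // emitted with RUST_LOG=info or more verbose
        log::trace!("LLM Prompt: {}", "example prompt"); // emitted only with RUST_LOG=trace
    }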


@@ -9,8 +9,8 @@ SET_CONTEXT "toolbix" AS resume3;
 CLEAR_SUGGESTIONS;
 ADD_SUGGESTION "general" AS "Show me the weekly announcements"
-ADD_SUGGESTION "auxiliom" AS "Will Auxiliom help me with what?"
-ADD_SUGGESTION "auxiliom" AS "What does Auxiliom do?"
+ADD_SUGGESTION "auxiliom" AS "Explain Auxiliom to me"
+ADD_SUGGESTION "auxiliom" AS "What does Auxiliom provide?"
 ADD_SUGGESTION "toolbix" AS "Show me Toolbix features"
 ADD_SUGGESTION "toolbix" AS "How can Toolbix help my business?"


@@ -18,7 +18,7 @@ llm-server-port,8081
 llm-server-gpu-layers,0
 llm-server-n-moe,0
 llm-server-ctx-size,512
-llm-server-n-predict, 50
+llm-server-n-predict,256
 llm-server-parallel,6
 llm-server-cont-batching,true
 llm-server-mlock,false
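
For context, this file is a flat `key,value` list. A minimal, illustrative reader for that format follows; the helper name and file path are hypothetical and not from the repository:

    use std::collections::HashMap;
    use std::fs;

    // Illustrative only: read a flat `key,value` config like the one above.
    fn read_kv_csv(path: &str) -> std::io::Result<HashMap<String, String>> {
        let text = fs::read_to_string(path)?;
        let mut map = HashMap::new();
        for line in text.lines().filter(|l| !l.trim().is_empty()) {
            if let Some((key, value)) = line.split_once(',') {
                map.insert(key.trim().to_string(), value.trim().to_string());
            }
        }
        Ok(map)
    }

    fn main() -> std::io::Result<()> {
        let cfg = read_kv_csv("config.csv")?; // hypothetical path
        // e.g. llm-server-n-predict is now "256" instead of "50"
        println!("{:?}", cfg.get("llm-server-n-predict"));
        Ok(())
    }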
