From c39af8b320c7fc285c4e211c894a2a24810652a5 Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Mon, 3 Nov 2025 15:22:08 -0300 Subject: [PATCH] refactor(bot): remove legacy process_message method and its handling The large `process_message` function was deleted from `src/bot/mod.rs`. Its responsibilities have been migrated to newer, more modular handlers, eliminating dead code and simplifying the BotOrchestrator. This refactor reduces complexity, improves maintainability, and aligns the codebase with the updated message processing architecture. --- src/bot/mod.rs | 277 ------------------------------------------------- 1 file changed, 277 deletions(-) diff --git a/src/bot/mod.rs b/src/bot/mod.rs index 977b1be3..259a9ce9 100644 --- a/src/bot/mod.rs +++ b/src/bot/mod.rs @@ -378,283 +378,6 @@ impl BotOrchestrator { Ok(()) } - pub async fn process_message( - &self, - message: UserMessage, - ) -> Result<(), Box> { - info!( - "Processing message from channel: {}, user: {}, session: {}", - message.channel, message.user_id, message.session_id - ); - debug!( - "Message content: '{}', type: {}", - message.content, message.message_type - ); - - let user_id = Uuid::parse_str(&message.user_id).map_err(|e| { - error!("Invalid user ID provided: {}", e); - e - })?; - - let bot_id = Uuid::nil(); - let session = { - let mut sm = self.state.session_manager.lock().await; - let session_id = Uuid::parse_str(&message.session_id).map_err(|e| { - error!("Invalid session ID: {}", e); - e - })?; - - match sm.get_session_by_id(session_id)? { - Some(session) => session, - None => { - error!( - "Failed to create session for user {} with bot {}", - user_id, bot_id - ); - return Err("Failed to create session".into()); - } - } - }; - - if self.is_waiting_for_input(session.id).await { - debug!( - "Session {} is waiting for input, processing as variable input", - session.id - ); - if let Some(variable_name) = - self.handle_user_input(session.id, &message.content).await? - { - info!( - "Stored user input in variable '{}' for session {}", - variable_name, session.id - ); - if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) { - let ack_response = BotResponse { - bot_id: message.bot_id.clone(), - user_id: message.user_id.clone(), - session_id: message.session_id.clone(), - channel: message.channel.clone(), - content: format!("Input stored in '{}'", variable_name), - message_type: 1, - stream_token: None, - is_complete: true, - suggestions: Vec::new(), - context_name: None, - context_length: 0, - context_max_length: 0, - }; - adapter.send_message(ack_response).await?; - } - } - return Ok(()); - } - - if session.answer_mode == 1 && session.current_tool.is_some() { - self.state.tool_manager.provide_user_response( - &message.user_id, - &message.bot_id, - message.content.clone(), - )?; - return Ok(()); - } - - { - let mut session_manager = self.state.session_manager.lock().await; - session_manager.save_message( - session.id, - user_id, - 1, - &message.content, - message.message_type, - )?; - } - - let response_content = self.direct_mode_handler(&message, &session).await?; - - { - let mut session_manager = self.state.session_manager.lock().await; - session_manager.save_message(session.id, user_id, 2, &response_content, 1)?; - } - - // Create regular response - let channel = message.channel.clone(); - let config_manager = ConfigManager::new(Arc::clone(&self.state.conn)); - let max_context_size = config_manager - .get_config( - &Uuid::parse_str(&message.bot_id).unwrap_or_default(), - "llm-server-ctx-size", - None, - ) - .unwrap_or_default() - .parse::() - .unwrap_or(0); - - let current_context_length = 0usize; - - let bot_response = BotResponse { - bot_id: message.bot_id, - user_id: message.user_id, - session_id: message.session_id, - channel: channel.clone(), - content: response_content, - message_type: 1, - stream_token: None, - is_complete: true, - suggestions: Vec::new(), - context_name: None, - context_length: current_context_length, - context_max_length: max_context_size, - }; - - if let Some(adapter) = self.state.channels.lock().unwrap().get(&channel) { - adapter.send_message(bot_response).await?; - } else { - warn!("No channel adapter found for message channel: {}", channel); - } - - Ok(()) - } - - async fn direct_mode_handler( - &self, - message: &UserMessage, - session: &UserSession, - ) -> Result> { - let system_prompt = std::env::var("SYSTEM_PROMPT").unwrap_or_default(); - let context_data = { - let session_manager = self.state.session_manager.lock().await; - session_manager - .get_session_context_data(&session.id, &session.user_id) - .await? - }; - - let mut prompt = String::new(); - if !system_prompt.is_empty() { - prompt.push_str(&format!("System: {}\n", system_prompt)); - } - if !context_data.is_empty() { - prompt.push_str(&format!("Context: {}\n", context_data)); - } - - let history = { - let mut session_manager = self.state.session_manager.lock().await; - session_manager.get_conversation_history(session.id, session.user_id)? - }; - - // Deduplicate consecutive messages from same role - let mut deduped_history: Vec<(String, String)> = Vec::new(); - let mut last_role = None; - for (role, content) in history.iter() { - if last_role != Some(role) - || !deduped_history.is_empty() && content != &deduped_history.last().unwrap().1 - { - deduped_history.push((role.clone(), content.clone())); - last_role = Some(role); - } - } - - let recent_history = if deduped_history.len() > 10 { - &deduped_history[deduped_history.len() - 10..] - } else { - &deduped_history[..] - }; - - for (role, content) in recent_history { - prompt.push_str(&format!("{}: {}\n", role, content)); - } - - prompt.push_str(&format!("User: {}\nAssistant:", message.content)); - - let use_langcache = std::env::var("LLM_CACHE") - .unwrap_or_else(|_| "false".to_string()) - .eq_ignore_ascii_case("true"); - - if use_langcache { - ensure_collection_exists(&self.state, "semantic_cache").await?; - let langcache_client = get_langcache_client()?; - let isolated_question = message.content.trim().to_string(); - let question_embeddings = generate_embeddings(vec![isolated_question.clone()]).await?; - let question_embedding = question_embeddings - .get(0) - .ok_or_else(|| "Failed to generate embedding for question")? - .clone(); - - let search_results = langcache_client - .search("semantic_cache", question_embedding.clone(), 1) - .await?; - - if let Some(result) = search_results.first() { - let payload = &result.payload; - if let Some(resp) = payload.get("response").and_then(|v| v.as_str()) { - return Ok(resp.to_string()); - } - } - - let response = self - .state - .llm_provider - .generate(&prompt, &serde_json::Value::Null) - .await?; - - let point = QdrantPoint { - id: uuid::Uuid::new_v4().to_string(), - vector: question_embedding, - payload: serde_json::json!({ - "question": isolated_question, - "prompt": prompt, - "response": response - }), - }; - - langcache_client - .upsert_points("semantic_cache", vec![point]) - .await?; - - Ok(response) - } else { - ensure_collection_exists(&self.state, "semantic_cache").await?; - let qdrant_client = get_qdrant_client(&self.state)?; - let embeddings = generate_embeddings(vec![prompt.clone()]).await?; - let embedding = embeddings - .get(0) - .ok_or_else(|| "Failed to generate embedding")? - .clone(); - - let search_results = qdrant_client - .search("semantic_cache", embedding.clone(), 1) - .await?; - - if let Some(result) = search_results.first() { - if let Some(payload) = &result.payload { - if let Some(resp) = payload.get("response").and_then(|v| v.as_str()) { - return Ok(resp.to_string()); - } - } - } - - let response = self - .state - .llm_provider - .generate(&prompt, &serde_json::Value::Null) - .await?; - - let point = QdrantPoint { - id: uuid::Uuid::new_v4().to_string(), - vector: embedding, - payload: serde_json::json!({ - "prompt": prompt, - "response": response - }), - }; - - qdrant_client - .upsert_points("semantic_cache", vec![point]) - .await?; - - Ok(response) - } - } - pub async fn stream_response( &self, message: UserMessage,