diff --git a/src/bot/mod.rs b/src/bot/mod.rs index 977b1be3..259a9ce9 100644 --- a/src/bot/mod.rs +++ b/src/bot/mod.rs @@ -378,283 +378,6 @@ impl BotOrchestrator { Ok(()) } - pub async fn process_message( - &self, - message: UserMessage, - ) -> Result<(), Box> { - info!( - "Processing message from channel: {}, user: {}, session: {}", - message.channel, message.user_id, message.session_id - ); - debug!( - "Message content: '{}', type: {}", - message.content, message.message_type - ); - - let user_id = Uuid::parse_str(&message.user_id).map_err(|e| { - error!("Invalid user ID provided: {}", e); - e - })?; - - let bot_id = Uuid::nil(); - let session = { - let mut sm = self.state.session_manager.lock().await; - let session_id = Uuid::parse_str(&message.session_id).map_err(|e| { - error!("Invalid session ID: {}", e); - e - })?; - - match sm.get_session_by_id(session_id)? { - Some(session) => session, - None => { - error!( - "Failed to create session for user {} with bot {}", - user_id, bot_id - ); - return Err("Failed to create session".into()); - } - } - }; - - if self.is_waiting_for_input(session.id).await { - debug!( - "Session {} is waiting for input, processing as variable input", - session.id - ); - if let Some(variable_name) = - self.handle_user_input(session.id, &message.content).await? - { - info!( - "Stored user input in variable '{}' for session {}", - variable_name, session.id - ); - if let Some(adapter) = self.state.channels.lock().unwrap().get(&message.channel) { - let ack_response = BotResponse { - bot_id: message.bot_id.clone(), - user_id: message.user_id.clone(), - session_id: message.session_id.clone(), - channel: message.channel.clone(), - content: format!("Input stored in '{}'", variable_name), - message_type: 1, - stream_token: None, - is_complete: true, - suggestions: Vec::new(), - context_name: None, - context_length: 0, - context_max_length: 0, - }; - adapter.send_message(ack_response).await?; - } - } - return Ok(()); - } - - if session.answer_mode == 1 && session.current_tool.is_some() { - self.state.tool_manager.provide_user_response( - &message.user_id, - &message.bot_id, - message.content.clone(), - )?; - return Ok(()); - } - - { - let mut session_manager = self.state.session_manager.lock().await; - session_manager.save_message( - session.id, - user_id, - 1, - &message.content, - message.message_type, - )?; - } - - let response_content = self.direct_mode_handler(&message, &session).await?; - - { - let mut session_manager = self.state.session_manager.lock().await; - session_manager.save_message(session.id, user_id, 2, &response_content, 1)?; - } - - // Create regular response - let channel = message.channel.clone(); - let config_manager = ConfigManager::new(Arc::clone(&self.state.conn)); - let max_context_size = config_manager - .get_config( - &Uuid::parse_str(&message.bot_id).unwrap_or_default(), - "llm-server-ctx-size", - None, - ) - .unwrap_or_default() - .parse::() - .unwrap_or(0); - - let current_context_length = 0usize; - - let bot_response = BotResponse { - bot_id: message.bot_id, - user_id: message.user_id, - session_id: message.session_id, - channel: channel.clone(), - content: response_content, - message_type: 1, - stream_token: None, - is_complete: true, - suggestions: Vec::new(), - context_name: None, - context_length: current_context_length, - context_max_length: max_context_size, - }; - - if let Some(adapter) = self.state.channels.lock().unwrap().get(&channel) { - adapter.send_message(bot_response).await?; - } else { - warn!("No channel adapter found for message channel: {}", channel); - } - - Ok(()) - } - - async fn direct_mode_handler( - &self, - message: &UserMessage, - session: &UserSession, - ) -> Result> { - let system_prompt = std::env::var("SYSTEM_PROMPT").unwrap_or_default(); - let context_data = { - let session_manager = self.state.session_manager.lock().await; - session_manager - .get_session_context_data(&session.id, &session.user_id) - .await? - }; - - let mut prompt = String::new(); - if !system_prompt.is_empty() { - prompt.push_str(&format!("System: {}\n", system_prompt)); - } - if !context_data.is_empty() { - prompt.push_str(&format!("Context: {}\n", context_data)); - } - - let history = { - let mut session_manager = self.state.session_manager.lock().await; - session_manager.get_conversation_history(session.id, session.user_id)? - }; - - // Deduplicate consecutive messages from same role - let mut deduped_history: Vec<(String, String)> = Vec::new(); - let mut last_role = None; - for (role, content) in history.iter() { - if last_role != Some(role) - || !deduped_history.is_empty() && content != &deduped_history.last().unwrap().1 - { - deduped_history.push((role.clone(), content.clone())); - last_role = Some(role); - } - } - - let recent_history = if deduped_history.len() > 10 { - &deduped_history[deduped_history.len() - 10..] - } else { - &deduped_history[..] - }; - - for (role, content) in recent_history { - prompt.push_str(&format!("{}: {}\n", role, content)); - } - - prompt.push_str(&format!("User: {}\nAssistant:", message.content)); - - let use_langcache = std::env::var("LLM_CACHE") - .unwrap_or_else(|_| "false".to_string()) - .eq_ignore_ascii_case("true"); - - if use_langcache { - ensure_collection_exists(&self.state, "semantic_cache").await?; - let langcache_client = get_langcache_client()?; - let isolated_question = message.content.trim().to_string(); - let question_embeddings = generate_embeddings(vec![isolated_question.clone()]).await?; - let question_embedding = question_embeddings - .get(0) - .ok_or_else(|| "Failed to generate embedding for question")? - .clone(); - - let search_results = langcache_client - .search("semantic_cache", question_embedding.clone(), 1) - .await?; - - if let Some(result) = search_results.first() { - let payload = &result.payload; - if let Some(resp) = payload.get("response").and_then(|v| v.as_str()) { - return Ok(resp.to_string()); - } - } - - let response = self - .state - .llm_provider - .generate(&prompt, &serde_json::Value::Null) - .await?; - - let point = QdrantPoint { - id: uuid::Uuid::new_v4().to_string(), - vector: question_embedding, - payload: serde_json::json!({ - "question": isolated_question, - "prompt": prompt, - "response": response - }), - }; - - langcache_client - .upsert_points("semantic_cache", vec![point]) - .await?; - - Ok(response) - } else { - ensure_collection_exists(&self.state, "semantic_cache").await?; - let qdrant_client = get_qdrant_client(&self.state)?; - let embeddings = generate_embeddings(vec![prompt.clone()]).await?; - let embedding = embeddings - .get(0) - .ok_or_else(|| "Failed to generate embedding")? - .clone(); - - let search_results = qdrant_client - .search("semantic_cache", embedding.clone(), 1) - .await?; - - if let Some(result) = search_results.first() { - if let Some(payload) = &result.payload { - if let Some(resp) = payload.get("response").and_then(|v| v.as_str()) { - return Ok(resp.to_string()); - } - } - } - - let response = self - .state - .llm_provider - .generate(&prompt, &serde_json::Value::Null) - .await?; - - let point = QdrantPoint { - id: uuid::Uuid::new_v4().to_string(), - vector: embedding, - payload: serde_json::json!({ - "prompt": prompt, - "response": response - }), - }; - - qdrant_client - .upsert_points("semantic_cache", vec![point]) - .await?; - - Ok(response) - } - } - pub async fn stream_response( &self, message: UserMessage,