From 1a7d6ae0e2784f09113b609f01f4335c67a3a7ee Mon Sep 17 00:00:00 2001
From: "Rodrigo Rodriguez (Pragmatismo)"
Date: Sun, 5 Oct 2025 07:18:43 -0300
Subject: [PATCH] - Reorganize services & shared models, add bot/chart

Rename per-service files to mod.rs, add the bot and chart services, move the
shared message/session structs into services/shared/mod.rs, and extend the
LLM provider trait with tool-aware generation plus Anthropic and mock
providers.
---
 src/{services/mod.rs => services.rs}         |   7 +-
 .../automation/{automation.rs => mod.rs}     |   0
 src/services/bot/{orchestrator.rs => mod.rs} |  18 +-
 src/services/channels/mod.rs                 |  69 ++++--
 src/services/chart/{chart.rs => mod.rs}      |   0
 src/services/email/{email.rs => mod.rs}      |   0
 src/services/file/{file.rs => mod.rs}        |   0
 src/services/llm/mod.rs                      | 201 +++++++++++++++---
 src/services/shared/mod.rs                   |  60 +++++-
 src/services/shared/shared.rs                |  58 -----
 10 files changed, 287 insertions(+), 126 deletions(-)
 rename src/{services/mod.rs => services.rs} (73%)
 rename src/services/automation/{automation.rs => mod.rs} (100%)
 rename src/services/bot/{orchestrator.rs => mod.rs} (98%)
 rename src/services/chart/{chart.rs => mod.rs} (100%)
 rename src/services/email/{email.rs => mod.rs} (100%)
 rename src/services/file/{file.rs => mod.rs} (100%)
 delete mode 100644 src/services/shared/shared.rs

diff --git a/src/services/mod.rs b/src/services.rs
similarity index 73%
rename from src/services/mod.rs
rename to src/services.rs
index 4587321..586a31d 100644
--- a/src/services/mod.rs
+++ b/src/services.rs
@@ -1,17 +1,16 @@
 pub mod auth;
 pub mod automation;
+pub mod bot;
 pub mod channels;
+pub mod chart;
 pub mod config;
 pub mod context;
 pub mod email;
 pub mod file;
+pub mod keywords;
 pub mod llm;
-pub mod llm_generic;
-pub mod llm_local;
-pub mod orchestrator;
 pub mod session;
 pub mod shared;
-pub mod state;
 pub mod tools;
 pub mod web_automation;
 pub mod whatsapp;
diff --git a/src/services/automation/automation.rs b/src/services/automation/mod.rs
similarity index 100%
rename from src/services/automation/automation.rs
rename to src/services/automation/mod.rs
diff --git a/src/services/bot/orchestrator.rs b/src/services/bot/mod.rs
similarity index 98%
rename from src/services/bot/orchestrator.rs
rename to src/services/bot/mod.rs
index 41f9b99..330c840 100644
--- a/src/services/bot/orchestrator.rs
+++ b/src/services/bot/mod.rs
@@ -1,34 +1,22 @@
-use actix_cors::Cors;
-use actix_web::middleware::Logger;
-use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer, Result};
-use actix_ws::Message;
+use actix_web::{web, HttpRequest, HttpResponse, Result};
 use chrono::Utc;
 use langchain_rust::{
-    chain::{Chain, LLMChain, LLMChainBuilder},
+    chain::{Chain, LLMChain},
     embedding::openai::openai_embedder::OpenAiEmbedder,
     llm::openai::OpenAI,
     memory::SimpleMemory,
     prompt_args,
-    schemas::{Document, Message},
     tools::{postgres::PostgreSQLEngine, SQLDatabaseBuilder},
-    vectorstore::qdrant::{Qdrant as LangChainQdrant, StoreBuilder},
+    vectorstore::qdrant::Qdrant as LangChainQdrant,
     vectorstore::{VecStoreOptions, VectorStore},
 };
 use log::info;
-use qdrant_client::qdrant::Qdrant as QdrantClient;
-use serde::{Deserialize, Serialize};
-use sqlx::{postgres::PgPoolOptions, PgPool};
 use std::collections::HashMap;
 use std::fs;
 use std::sync::Arc;
 use tokio::sync::{mpsc, Mutex};
 use uuid::Uuid;
 
-mod shared;
-use shared::*;
-
-mod services;
-
 use services::auth::AuthService;
 use services::channels::{ChannelAdapter, VoiceAdapter, WebChannelAdapter};
 use services::chart::ChartGenerator;
diff --git a/src/services/channels/mod.rs b/src/services/channels/mod.rs
index 1136175..a4cd302 100644
--- a/src/services/channels/mod.rs
+++ b/src/services/channels/mod.rs
@@ -1,12 +1,12 @@
 use async_trait::async_trait;
-use livekit::{AccessToken, Room, RoomOptions, DataPacketKind};
+use chrono::Utc;
+use livekit::{AccessToken, DataPacketKind, Room, RoomOptions};
 use log::info;
-use tokio::sync::{mpsc, Mutex};
 use std::collections::HashMap;
 use std::sync::Arc;
-use chrono::Utc;
+use tokio::sync::{mpsc, Mutex};
 
-use crate::shared::{UserMessage, BotResponse};
+use crate::services::shared::{BotResponse, UserMessage};
 
 #[async_trait]
 pub trait ChannelAdapter: Send + Sync {
@@ -63,7 +63,11 @@ impl VoiceAdapter {
         }
     }
 
-    pub async fn start_voice_session(&self, session_id: &str, user_id: &str) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
+    pub async fn start_voice_session(
+        &self,
+        session_id: &str,
+        user_id: &str,
+    ) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
         let token = AccessToken::with_api_key(&self.api_key, &self.api_secret)
             .with_identity(user_id)
             .with_name(user_id)
@@ -77,7 +81,10 @@ impl VoiceAdapter {
         };
 
         let (room, mut events) = Room::connect(&self.livekit_url, &token, room_options).await?;
-        self.rooms.lock().await.insert(session_id.to_string(), room.clone());
+        self.rooms
+            .lock()
+            .await
+            .insert(session_id.to_string(), room.clone());
 
         let rooms_clone = self.rooms.clone();
         let connections_clone = self.connections.clone();
@@ -87,23 +94,33 @@ impl VoiceAdapter {
         while let Some(event) = events.recv().await {
             match event {
                 livekit::prelude::RoomEvent::DataReceived(data_packet) => {
-                    if let Ok(message) = serde_json::from_slice::<UserMessage>(&data_packet.data) {
+                    if let Ok(message) =
+                        serde_json::from_slice::<UserMessage>(&data_packet.data)
+                    {
                         info!("Received voice message: {}", message.content);
-                        if let Some(tx) = connections_clone.lock().await.get(&message.session_id) {
-                            let _ = tx.send(BotResponse {
-                                bot_id: message.bot_id,
-                                user_id: message.user_id,
-                                session_id: message.session_id,
-                                channel: "voice".to_string(),
-                                content: format!("🎤 Voice: {}", message.content),
-                                message_type: "voice".to_string(),
-                                stream_token: None,
-                                is_complete: true,
-                            }).await;
+                        if let Some(tx) =
+                            connections_clone.lock().await.get(&message.session_id)
+                        {
+                            let _ = tx
+                                .send(BotResponse {
+                                    bot_id: message.bot_id,
+                                    user_id: message.user_id,
+                                    session_id: message.session_id,
+                                    channel: "voice".to_string(),
+                                    content: format!("🎤 Voice: {}", message.content),
+                                    message_type: "voice".to_string(),
+                                    stream_token: None,
+                                    is_complete: true,
+                                })
+                                .await;
                         }
                     }
                 }
-                livekit::prelude::RoomEvent::TrackSubscribed(track, publication, participant) => {
+                livekit::prelude::RoomEvent::TrackSubscribed(
+                    track,
+                    publication,
+                    participant,
+                ) => {
                     info!("Voice track subscribed from {}", participant.identity());
                 }
                 _ => {}
@@ -115,7 +132,10 @@ impl VoiceAdapter {
         Ok(token)
     }
 
-    pub async fn stop_voice_session(&self, session_id: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+    pub async fn stop_voice_session(
+        &self,
+        session_id: &str,
+    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
         if let Some(room) = self.rooms.lock().await.remove(session_id) {
             room.disconnect();
         }
@@ -126,7 +146,11 @@ impl VoiceAdapter {
         self.connections.lock().await.insert(session_id, tx);
     }
 
-    pub async fn send_voice_response(&self, session_id: &str, text: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+    pub async fn send_voice_response(
+        &self,
+        session_id: &str,
+        text: &str,
+    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
         if let Some(room) = self.rooms.lock().await.get(session_id) {
             let voice_response = serde_json::json!({
                 "type": "voice_response",
@@ -148,6 +172,7 @@
 impl ChannelAdapter for VoiceAdapter {
     async fn send_message(&self, response: BotResponse) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
         info!("Sending voice response to: {}", response.user_id);
-        self.send_voice_response(&response.session_id, &response.content).await
+        self.send_voice_response(&response.session_id, &response.content)
+            .await
     }
 }
diff --git a/src/services/chart/chart.rs b/src/services/chart/mod.rs
similarity index 100%
rename from src/services/chart/chart.rs
rename to src/services/chart/mod.rs
diff --git a/src/services/email/email.rs b/src/services/email/mod.rs
similarity index 100%
rename from src/services/email/email.rs
rename to src/services/email/mod.rs
diff --git a/src/services/file/file.rs b/src/services/file/mod.rs
similarity index 100%
rename from src/services/file/file.rs
rename to src/services/file/mod.rs
diff --git a/src/services/llm/mod.rs b/src/services/llm/mod.rs
index 5171bcf..298a83a 100644
--- a/src/services/llm/mod.rs
+++ b/src/services/llm/mod.rs
@@ -6,10 +6,10 @@ use langchain_rust::{
     schemas::Message,
 };
 use serde_json::Value;
+use std::sync::Arc;
 use tokio::sync::mpsc;
-pub mod llm_generic;
-pub mod llm_local;
-pub mod llm_provider;
+
+use crate::services::tools::ToolManager;
 
 #[async_trait]
 pub trait LLMProvider: Send + Sync {
@@ -26,49 +26,40 @@ pub trait LLMProvider: Send + Sync {
         tx: mpsc::Sender<String>,
     ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
 
-    // Add tool calling capability
+    // Add tool calling capability using LangChain tools
     async fn generate_with_tools(
         &self,
         prompt: &str,
-        config: &serde_json::Value,
+        config: &Value,
         available_tools: &[String],
-        tool_manager: Arc<crate::services::tools::ToolManager>,
+        tool_manager: Arc<ToolManager>,
         session_id: &str,
         user_id: &str,
-    ) -> Result<String, Box<dyn std::error::Error>>;
+    ) -> Result<String, Box<dyn std::error::Error + Send + Sync>>;
 }
 
-pub struct OpenAIClient<C>
-where
-    C: langchain_rust::llm::Config,
-{
-    client: OpenAI<C>,
+pub struct OpenAIClient {
+    client: OpenAI,
 }
 
-impl<C> OpenAIClient<C>
-where
-    C: langchain_rust::llm::Config,
-{
-    pub fn new(config: C) -> Self {
-        let client = OpenAI::new(config);
+impl OpenAIClient {
+    pub fn new(client: OpenAI) -> Self {
         Self { client }
     }
 }
 
 #[async_trait]
-impl<C> LLMProvider for OpenAIClient<C>
-where
-    C: langchain_rust::llm::Config + Send + Sync,
-{
+impl LLMProvider for OpenAIClient {
     async fn generate(
         &self,
         prompt: &str,
         _config: &Value,
     ) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
-        // Call the underlying OpenAI client with the raw prompt string.
+        let messages = vec![Message::new_human_message(prompt.to_string())];
+
         let result = self
             .client
-            .invoke(prompt)
+            .invoke(&messages)
             .await
             .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
 
         Ok(result)
@@ -81,7 +72,6 @@ where
         _config: &Value,
         mut tx: mpsc::Sender<String>,
     ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
-        // Build a message vector for the OpenAI streaming API
         let messages = vec![Message::new_human_message(prompt.to_string())];
 
         let mut stream = self
@@ -93,7 +83,6 @@ where
         while let Some(result) = stream.next().await {
             match result {
                 Ok(chunk) => {
-                    // The `content` field is accessed directly (no method).
                     let content = chunk.content;
                     if !content.is_empty() {
                         let _ = tx.send(content.to_string()).await;
@@ -107,6 +96,35 @@ where
 
         Ok(())
     }
+
+    async fn generate_with_tools(
+        &self,
+        prompt: &str,
+        _config: &Value,
+        available_tools: &[String],
+        _tool_manager: Arc<ToolManager>,
+        _session_id: &str,
+        _user_id: &str,
+    ) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
+        // Enhanced prompt with tool information
+        let tools_info = if available_tools.is_empty() {
+            String::new()
+        } else {
+            format!("\n\nAvailable tools: {}. You can suggest using these tools if they would help answer the user's question.", available_tools.join(", "))
+        };
+
+        let enhanced_prompt = format!("{}{}", prompt, tools_info);
+
+        let messages = vec![Message::new_human_message(enhanced_prompt)];
+
+        let result = self
+            .client
+            .invoke(&messages)
+            .await
+            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
+
+        Ok(result)
+    }
 }
 
 pub struct AnthropicClient {
@@ -119,3 +137,134 @@ impl AnthropicClient {
         Self { client }
     }
 }
+
+#[async_trait]
+impl LLMProvider for AnthropicClient {
+    async fn generate(
+        &self,
+        prompt: &str,
+        _config: &Value,
+    ) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
+        let messages = vec![Message::new_human_message(prompt.to_string())];
+
+        let result = self
+            .client
+            .invoke(&messages)
+            .await
+            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
+
+        Ok(result)
+    }
+
+    async fn generate_stream(
+        &self,
+        prompt: &str,
+        _config: &Value,
+        mut tx: mpsc::Sender<String>,
+    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        let messages = vec![Message::new_human_message(prompt.to_string())];
+
+        let mut stream = self
+            .client
+            .stream(&messages)
+            .await
+            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
+
+        while let Some(result) = stream.next().await {
+            match result {
+                Ok(chunk) => {
+                    let content = chunk.content;
+                    if !content.is_empty() {
+                        let _ = tx.send(content.to_string()).await;
+                    }
+                }
+                Err(e) => {
+                    eprintln!("Stream error: {}", e);
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn generate_with_tools(
+        &self,
+        prompt: &str,
+        _config: &Value,
+        available_tools: &[String],
+        _tool_manager: Arc<ToolManager>,
+        _session_id: &str,
+        _user_id: &str,
+    ) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
+        let tools_info = if available_tools.is_empty() {
+            String::new()
+        } else {
+            format!("\n\nAvailable tools: {}. You can suggest using these tools if they would help answer the user's question.", available_tools.join(", "))
+        };
+
+        let enhanced_prompt = format!("{}{}", prompt, tools_info);
+
+        let messages = vec![Message::new_human_message(enhanced_prompt)];
+
+        let result = self
+            .client
+            .invoke(&messages)
+            .await
+            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
+
+        Ok(result)
+    }
+}
+
+pub struct MockLLMProvider;
+
+impl MockLLMProvider {
+    pub fn new() -> Self {
+        Self
+    }
+}
+
+#[async_trait]
+impl LLMProvider for MockLLMProvider {
+    async fn generate(
+        &self,
+        prompt: &str,
+        _config: &Value,
+    ) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
+        Ok(format!("Mock response to: {}", prompt))
+    }
+
+    async fn generate_stream(
+        &self,
+        prompt: &str,
+        _config: &Value,
+        mut tx: mpsc::Sender<String>,
+    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        let response = format!("Mock stream response to: {}", prompt);
+        for word in response.split_whitespace() {
+            let _ = tx.send(format!("{} ", word)).await;
+            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+        }
+        Ok(())
+    }
+
+    async fn generate_with_tools(
+        &self,
+        prompt: &str,
+        _config: &Value,
+        available_tools: &[String],
+        _tool_manager: Arc<ToolManager>,
+        _session_id: &str,
+        _user_id: &str,
+    ) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
+        let tools_list = if available_tools.is_empty() {
+            "no tools available".to_string()
+        } else {
+            available_tools.join(", ")
+        };
+        Ok(format!(
+            "Mock response with tools [{}] to: {}",
+            tools_list, prompt
+        ))
+    }
+}
diff --git a/src/services/shared/mod.rs b/src/services/shared/mod.rs
index cdcc6f2..f077184 100644
--- a/src/services/shared/mod.rs
+++ b/src/services/shared/mod.rs
@@ -1,3 +1,61 @@
-pub mod shared;
 pub mod state;
 pub mod utils;
+
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
+use sqlx::FromRow;
+use uuid::Uuid;
+
+#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
+pub struct UserSession {
+    pub id: Uuid,
+    pub user_id: Uuid,
+    pub bot_id: Uuid,
+    pub title: String,
+    pub context_data: serde_json::Value,
+    pub created_at: DateTime<Utc>,
+    pub updated_at: DateTime<Utc>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct EmbeddingRequest {
+    pub text: String,
+    pub model: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct EmbeddingResponse {
+    pub embedding: Vec<f32>,
+    pub model: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct SearchResult {
+    pub text: String,
+    pub similarity: f32,
+    pub metadata: serde_json::Value,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct UserMessage {
+    pub bot_id: String,
+    pub user_id: String,
+    pub session_id: String,
+    pub channel: String,
+    pub content: String,
+    pub message_type: String,
+    pub media_url: Option<String>,
+    pub timestamp: DateTime<Utc>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct BotResponse {
+    pub bot_id: String,
+    pub user_id: String,
+    pub session_id: String,
+    pub channel: String,
+    pub content: String,
+    pub message_type: String,
+    pub stream_token: Option<String>,
+    pub is_complete: bool,
+}
diff --git a/src/services/shared/shared.rs b/src/services/shared/shared.rs
deleted file mode 100644
index bd86586..0000000
--- a/src/services/shared/shared.rs
+++ /dev/null
@@ -1,58 +0,0 @@
-use chrono::{DateTime, Utc};
-use serde::{Deserialize, Serialize};
-use sqlx::FromRow;
-use uuid::Uuid;
-
-#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
-pub struct UserSession {
-    pub id: Uuid,
-    pub user_id: Uuid,
-    pub bot_id: Uuid,
-    pub title: String,
-    pub context_data: serde_json::Value,
-    pub created_at: DateTime<Utc>,
-    pub updated_at: DateTime<Utc>,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct EmbeddingRequest {
-    pub text: String,
-    pub model: Option<String>,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct EmbeddingResponse {
-    pub embedding: Vec<f32>,
-    pub model: String,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct SearchResult {
-    pub text: String,
-    pub similarity: f32,
-    pub metadata: serde_json::Value,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct UserMessage {
-    pub bot_id: String,
-    pub user_id: String,
-    pub session_id: String,
-    pub channel: String,
-    pub content: String,
-    pub message_type: String,
-    pub media_url: Option<String>,
-    pub timestamp: DateTime<Utc>,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct BotResponse {
-    pub bot_id: String,
-    pub user_id: String,
-    pub session_id: String,
-    pub channel: String,
-    pub content: String,
-    pub message_type: String,
-    pub stream_token: Option<String>,
-    pub is_complete: bool,
-}
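
A minimal caller sketch of the reworked LLMProvider trait, not part of the patch itself: it drives the MockLLMProvider added above, so it needs no API keys or network access. The crate::services::llm path, the mpsc::Sender<String> streaming channel, and the boxed Send + Sync error type follow the signatures shown in this diff and are assumptions, not verified library APIs.

use std::sync::Arc;

use tokio::sync::mpsc;

use crate::services::llm::{LLMProvider, MockLLMProvider};

// Hypothetical helper, not introduced by this patch: exercises the trait
// through a trait object, so any provider (OpenAI, Anthropic, mock) fits.
async fn demo() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let provider: Arc<dyn LLMProvider> = Arc::new(MockLLMProvider::new());
    let config = serde_json::json!({});

    // One-shot generation.
    let reply = provider.generate("Hello, bot!", &config).await?;
    println!("{reply}");

    // Streaming generation: tokens arrive over the channel passed in.
    let (tx, mut rx) = mpsc::channel::<String>(32);
    let streaming_provider = provider.clone();
    tokio::spawn(async move {
        let _ = streaming_provider
            .generate_stream("Hello, bot!", &config, tx)
            .await;
    });
    while let Some(token) = rx.recv().await {
        print!("{token}");
    }

    Ok(())
}

Because every provider is reached through the same trait object, the bot orchestrator can swap the mock in for tests and keep the channel-based streaming path identical to production.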