diff --git a/src/docs/collaboration.rs b/src/docs/collaboration.rs index 9c7a2623b..83089dbe2 100644 --- a/src/docs/collaboration.rs +++ b/src/docs/collaboration.rs @@ -6,10 +6,12 @@ use axum::{ Path, State, }, response::IntoResponse, + Json, }; use chrono::Utc; use futures_util::{SinkExt, StreamExt}; use log::{error, info}; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::broadcast; @@ -19,10 +21,136 @@ pub type CollaborationChannels = static COLLAB_CHANNELS: std::sync::OnceLock = std::sync::OnceLock::new(); +pub type PresenceMap = Arc>>>; + +static PRESENCE: std::sync::OnceLock = std::sync::OnceLock::new(); + +pub type TypingMap = Arc>>>; + +static TYPING: std::sync::OnceLock = std::sync::OnceLock::new(); + +pub type SelectionMap = Arc>>>; + +static SELECTIONS: std::sync::OnceLock = std::sync::OnceLock::new(); + +pub type MentionMap = Arc>>>; + +static MENTIONS: std::sync::OnceLock = std::sync::OnceLock::new(); + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UserPresence { + pub user_id: String, + pub user_name: String, + pub user_color: String, + pub cursor_position: Option, + pub last_active: chrono::DateTime, + pub status: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TypingIndicator { + pub user_id: String, + pub user_name: String, + pub position: usize, + pub started_at: chrono::DateTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SelectionInfo { + pub user_id: String, + pub user_name: String, + pub user_color: String, + pub start_position: usize, + pub end_position: usize, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MentionNotification { + pub id: String, + pub doc_id: String, + pub from_user_id: String, + pub from_user_name: String, + pub to_user_id: String, + pub position: usize, + pub message: String, + pub created_at: chrono::DateTime, + pub read: bool, +} + pub fn get_collab_channels() -> &'static CollaborationChannels { COLLAB_CHANNELS.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new()))) } +pub fn get_presence() -> &'static PresenceMap { + PRESENCE.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new()))) +} + +pub fn get_typing() -> &'static TypingMap { + TYPING.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new()))) +} + +pub fn get_selections() -> &'static SelectionMap { + SELECTIONS.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new()))) +} + +pub fn get_mentions() -> &'static MentionMap { + MENTIONS.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new()))) +} + +pub async fn handle_get_collaborators( + Path(doc_id): Path, +) -> impl IntoResponse { + let presence = get_presence().read().await; + let users: Vec<&UserPresence> = presence + .get(&doc_id) + .map(|v| v.iter().collect()) + .unwrap_or_default(); + + Json(serde_json::json!({ + "count": users.len(), + "users": users + })) +} + +pub async fn handle_get_presence( + Path(doc_id): Path, +) -> impl IntoResponse { + let presence = get_presence().read().await; + let users = presence.get(&doc_id).cloned().unwrap_or_default(); + Json(serde_json::json!({ "users": users })) +} + +pub async fn handle_get_typing( + Path(doc_id): Path, +) -> impl IntoResponse { + let typing = get_typing().read().await; + let indicators = typing.get(&doc_id).cloned().unwrap_or_default(); + + let now = Utc::now(); + let active: Vec<&TypingIndicator> = indicators + .iter() + .filter(|t| (now - t.started_at).num_seconds() < 5) + .collect(); + + Json(serde_json::json!({ "typing": active })) +} + +pub async fn handle_get_selections( + Path(doc_id): Path, +) -> impl IntoResponse { + let selections = get_selections().read().await; + let sels = selections.get(&doc_id).cloned().unwrap_or_default(); + Json(serde_json::json!({ "selections": sels })) +} + +pub async fn handle_get_mentions( + Path(user_id): Path, +) -> impl IntoResponse { + let mentions = get_mentions().read().await; + let user_mentions = mentions.get(&user_id).cloned().unwrap_or_default(); + Json(serde_json::json!({ "mentions": user_mentions })) +} + pub async fn handle_docs_websocket( ws: WebSocketUpgrade, Path(doc_id): Path, @@ -50,6 +178,19 @@ async fn handle_docs_connection(socket: WebSocket, doc_id: String) { let user_name = format!("User {}", &user_id[..8]); let user_color = get_random_color(); + { + let mut presence = get_presence().write().await; + let users = presence.entry(doc_id.clone()).or_default(); + users.push(UserPresence { + user_id: user_id.clone(), + user_name: user_name.clone(), + user_color: user_color.clone(), + cursor_position: None, + last_active: Utc::now(), + status: "active".to_string(), + }); + } + let join_msg = CollabMessage { msg_type: "join".to_string(), doc_id: doc_id.clone(), @@ -84,6 +225,86 @@ async fn handle_docs_connection(socket: WebSocket, doc_id: String) { collab_msg.doc_id = doc_id_clone.clone(); collab_msg.timestamp = Utc::now(); + match collab_msg.msg_type.as_str() { + "cursor" => { + let mut presence = get_presence().write().await; + if let Some(users) = presence.get_mut(&doc_id_clone) { + for user in users.iter_mut() { + if user.user_id == user_id_clone { + user.cursor_position = collab_msg.position; + user.last_active = Utc::now(); + } + } + } + } + "typing_start" => { + if let Some(pos) = collab_msg.position { + let mut typing = get_typing().write().await; + let indicators = typing.entry(doc_id_clone.clone()).or_default(); + indicators.retain(|t| t.user_id != user_id_clone); + indicators.push(TypingIndicator { + user_id: user_id_clone.clone(), + user_name: user_name_clone.clone(), + position: pos, + started_at: Utc::now(), + }); + } + } + "typing_stop" => { + let mut typing = get_typing().write().await; + if let Some(indicators) = typing.get_mut(&doc_id_clone) { + indicators.retain(|t| t.user_id != user_id_clone); + } + } + "selection" => { + if let Some(content) = &collab_msg.content { + if let Ok(sel_data) = serde_json::from_str::(content) { + let mut selections = get_selections().write().await; + let sels = selections.entry(doc_id_clone.clone()).or_default(); + sels.retain(|s| s.user_id != user_id_clone); + + if let (Some(start), Some(end)) = ( + sel_data.get("start").and_then(|v| v.as_u64()), + sel_data.get("end").and_then(|v| v.as_u64()), + ) { + sels.push(SelectionInfo { + user_id: user_id_clone.clone(), + user_name: user_name_clone.clone(), + user_color: user_color_clone.clone(), + start_position: start as usize, + end_position: end as usize, + }); + } + } + } + } + "mention" => { + if let Some(content) = &collab_msg.content { + if let Ok(mention_data) = serde_json::from_str::(content) { + if let (Some(to_user), Some(message)) = ( + mention_data.get("to_user_id").and_then(|v| v.as_str()), + mention_data.get("message").and_then(|v| v.as_str()), + ) { + let mut mentions = get_mentions().write().await; + let user_mentions = mentions.entry(to_user.to_string()).or_default(); + user_mentions.push(MentionNotification { + id: uuid::Uuid::new_v4().to_string(), + doc_id: doc_id_clone.clone(), + from_user_id: user_id_clone.clone(), + from_user_name: user_name_clone.clone(), + to_user_id: to_user.to_string(), + position: collab_msg.position.unwrap_or(0), + message: message.to_string(), + created_at: Utc::now(), + read: false, + }); + } + } + } + } + _ => {} + } + if let Err(e) = broadcast_tx_clone.send(collab_msg) { error!("Failed to broadcast message: {}", e); } @@ -112,6 +333,9 @@ async fn handle_docs_connection(socket: WebSocket, doc_id: String) { } }); + let doc_id_leave = doc_id.clone(); + let user_id_leave = user_id.clone(); + let leave_msg = CollabMessage { msg_type: "leave".to_string(), doc_id: doc_id.clone(), @@ -130,6 +354,27 @@ async fn handle_docs_connection(socket: WebSocket, doc_id: String) { _ = send_task => {} } + { + let mut presence = get_presence().write().await; + if let Some(users) = presence.get_mut(&doc_id_leave) { + users.retain(|u| u.user_id != user_id_leave); + } + } + + { + let mut typing = get_typing().write().await; + if let Some(indicators) = typing.get_mut(&doc_id_leave) { + indicators.retain(|t| t.user_id != user_id_leave); + } + } + + { + let mut selections = get_selections().write().await; + if let Some(sels) = selections.get_mut(&doc_id_leave) { + sels.retain(|s| s.user_id != user_id_leave); + } + } + if let Err(e) = broadcast_tx.send(leave_msg) { info!("User left (broadcast may have no receivers): {}", e); } @@ -160,11 +405,27 @@ pub async fn broadcast_doc_change( } } +pub async fn mark_mention_read(user_id: &str, mention_id: &str) { + let mut mentions = get_mentions().write().await; + if let Some(user_mentions) = mentions.get_mut(user_id) { + for mention in user_mentions.iter_mut() { + if mention.id == mention_id { + mention.read = true; + } + } + } +} + +pub async fn clear_user_mentions(user_id: &str) { + let mut mentions = get_mentions().write().await; + mentions.remove(user_id); +} + fn get_random_color() -> String { use rand::Rng; let colors = [ "#FF6B6B", "#4ECDC4", "#45B7D1", "#96CEB4", "#FFEAA7", "#DDA0DD", "#98D8C8", "#F7DC6F", - "#BB8FCE", "#85C1E9", + "#BB8FCE", "#85C1E9", "#F1948A", "#82E0AA", "#F8C471", "#AED6F1", "#D7BDE2", ]; let idx = rand::rng().random_range(0..colors.len()); colors[idx].to_string() diff --git a/src/docs/mod.rs b/src/docs/mod.rs index 2864b4c19..0975e9265 100644 --- a/src/docs/mod.rs +++ b/src/docs/mod.rs @@ -12,7 +12,10 @@ use axum::{ }; use std::sync::Arc; -pub use collaboration::handle_docs_websocket; +pub use collaboration::{ + handle_docs_websocket, handle_get_collaborators, handle_get_mentions, handle_get_presence, + handle_get_selections, handle_get_typing, +}; pub use handlers::{ handle_accept_reject_all, handle_accept_reject_change, handle_add_comment, handle_add_endnote, handle_add_footnote, handle_ai_custom, handle_ai_expand, handle_ai_improve, handle_ai_simplify, @@ -88,5 +91,10 @@ pub fn configure_docs_routes() -> Router> { .route("/api/docs/styles", get(handle_list_styles)) .route("/api/docs/outline", post(handle_get_outline)) .route("/api/docs/compare", post(handle_compare_documents)) + .route("/api/docs/:doc_id/collaborators", get(handle_get_collaborators)) + .route("/api/docs/:doc_id/presence", get(handle_get_presence)) + .route("/api/docs/:doc_id/typing", get(handle_get_typing)) + .route("/api/docs/:doc_id/selections", get(handle_get_selections)) + .route("/api/docs/mentions/:user_id", get(handle_get_mentions)) .route("/ws/docs/:doc_id", get(handle_docs_websocket)) } diff --git a/src/sheet/collaboration.rs b/src/sheet/collaboration.rs index 1facf3992..a75222b2f 100644 --- a/src/sheet/collaboration.rs +++ b/src/sheet/collaboration.rs @@ -11,6 +11,7 @@ use axum::{ use chrono::Utc; use futures_util::{SinkExt, StreamExt}; use log::{error, info}; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::broadcast; @@ -20,16 +21,139 @@ pub type CollaborationChannels = static COLLAB_CHANNELS: std::sync::OnceLock = std::sync::OnceLock::new(); +pub type PresenceMap = Arc>>>; + +static PRESENCE: std::sync::OnceLock = std::sync::OnceLock::new(); + +pub type TypingMap = Arc>>>; + +static TYPING: std::sync::OnceLock = std::sync::OnceLock::new(); + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UserPresence { + pub user_id: String, + pub user_name: String, + pub user_color: String, + pub current_cell: Option, + pub current_worksheet: Option, + pub last_active: chrono::DateTime, + pub status: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TypingIndicator { + pub user_id: String, + pub user_name: String, + pub cell: String, + pub worksheet_index: usize, + pub started_at: chrono::DateTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SelectionInfo { + pub user_id: String, + pub user_name: String, + pub user_color: String, + pub start_row: u32, + pub start_col: u32, + pub end_row: u32, + pub end_col: u32, + pub worksheet_index: usize, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MentionNotification { + pub id: String, + pub sheet_id: String, + pub from_user_id: String, + pub from_user_name: String, + pub to_user_id: String, + pub cell: String, + pub message: String, + pub created_at: chrono::DateTime, + pub read: bool, +} + +pub type SelectionMap = Arc>>>; + +static SELECTIONS: std::sync::OnceLock = std::sync::OnceLock::new(); + +pub type MentionMap = Arc>>>; + +static MENTIONS: std::sync::OnceLock = std::sync::OnceLock::new(); + pub fn get_collab_channels() -> &'static CollaborationChannels { COLLAB_CHANNELS.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new()))) } +pub fn get_presence() -> &'static PresenceMap { + PRESENCE.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new()))) +} + +pub fn get_typing() -> &'static TypingMap { + TYPING.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new()))) +} + +pub fn get_selections() -> &'static SelectionMap { + SELECTIONS.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new()))) +} + +pub fn get_mentions() -> &'static MentionMap { + MENTIONS.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new()))) +} + pub async fn handle_get_collaborators( Path(sheet_id): Path, ) -> impl IntoResponse { - let channels = get_collab_channels().read().await; - let count = channels.get(&sheet_id).map(|s| s.receiver_count()).unwrap_or(0); - Json(serde_json::json!({ "count": count })) + let presence = get_presence().read().await; + let users: Vec<&UserPresence> = presence + .get(&sheet_id) + .map(|v| v.iter().collect()) + .unwrap_or_default(); + + Json(serde_json::json!({ + "count": users.len(), + "users": users + })) +} + +pub async fn handle_get_presence( + Path(sheet_id): Path, +) -> impl IntoResponse { + let presence = get_presence().read().await; + let users = presence.get(&sheet_id).cloned().unwrap_or_default(); + Json(serde_json::json!({ "users": users })) +} + +pub async fn handle_get_typing( + Path(sheet_id): Path, +) -> impl IntoResponse { + let typing = get_typing().read().await; + let indicators = typing.get(&sheet_id).cloned().unwrap_or_default(); + + let now = Utc::now(); + let active: Vec<&TypingIndicator> = indicators + .iter() + .filter(|t| (now - t.started_at).num_seconds() < 5) + .collect(); + + Json(serde_json::json!({ "typing": active })) +} + +pub async fn handle_get_selections( + Path(sheet_id): Path, +) -> impl IntoResponse { + let selections = get_selections().read().await; + let sels = selections.get(&sheet_id).cloned().unwrap_or_default(); + Json(serde_json::json!({ "selections": sels })) +} + +pub async fn handle_get_mentions( + Path(user_id): Path, +) -> impl IntoResponse { + let mentions = get_mentions().read().await; + let user_mentions = mentions.get(&user_id).cloned().unwrap_or_default(); + Json(serde_json::json!({ "mentions": user_mentions })) } pub async fn handle_sheet_websocket( @@ -59,6 +183,20 @@ async fn handle_sheet_connection(socket: WebSocket, sheet_id: String) { let user_name = format!("User {}", &user_id[..8]); let user_color = get_random_color(); + { + let mut presence = get_presence().write().await; + let users = presence.entry(sheet_id.clone()).or_default(); + users.push(UserPresence { + user_id: user_id.clone(), + user_name: user_name.clone(), + user_color: user_color.clone(), + current_cell: None, + current_worksheet: Some(0), + last_active: Utc::now(), + status: "active".to_string(), + }); + } + let join_msg = CollabMessage { msg_type: "join".to_string(), sheet_id: sheet_id.clone(), @@ -93,6 +231,98 @@ async fn handle_sheet_connection(socket: WebSocket, sheet_id: String) { collab_msg.sheet_id = sheet_id_clone.clone(); collab_msg.timestamp = Utc::now(); + match collab_msg.msg_type.as_str() { + "cursor" | "cell_select" => { + let mut presence = get_presence().write().await; + if let Some(users) = presence.get_mut(&sheet_id_clone) { + for user in users.iter_mut() { + if user.user_id == user_id_clone { + if let (Some(row), Some(col)) = (collab_msg.row, collab_msg.col) { + user.current_cell = Some(format!("{},{}", row, col)); + } + user.current_worksheet = collab_msg.worksheet_index; + user.last_active = Utc::now(); + } + } + } + } + "typing_start" => { + if let (Some(row), Some(col), Some(ws_idx)) = + (collab_msg.row, collab_msg.col, collab_msg.worksheet_index) { + let mut typing = get_typing().write().await; + let indicators = typing.entry(sheet_id_clone.clone()).or_default(); + indicators.retain(|t| t.user_id != user_id_clone); + indicators.push(TypingIndicator { + user_id: user_id_clone.clone(), + user_name: user_name_clone.clone(), + cell: format!("{},{}", row, col), + worksheet_index: ws_idx, + started_at: Utc::now(), + }); + } + } + "typing_stop" => { + let mut typing = get_typing().write().await; + if let Some(indicators) = typing.get_mut(&sheet_id_clone) { + indicators.retain(|t| t.user_id != user_id_clone); + } + } + "selection" => { + if let Some(value) = &collab_msg.value { + if let Ok(sel_data) = serde_json::from_str::(value) { + let mut selections = get_selections().write().await; + let sels = selections.entry(sheet_id_clone.clone()).or_default(); + sels.retain(|s| s.user_id != user_id_clone); + + if let (Some(sr), Some(sc), Some(er), Some(ec), Some(ws)) = ( + sel_data.get("start_row").and_then(|v| v.as_u64()), + sel_data.get("start_col").and_then(|v| v.as_u64()), + sel_data.get("end_row").and_then(|v| v.as_u64()), + sel_data.get("end_col").and_then(|v| v.as_u64()), + sel_data.get("worksheet_index").and_then(|v| v.as_u64()), + ) { + sels.push(SelectionInfo { + user_id: user_id_clone.clone(), + user_name: user_name_clone.clone(), + user_color: user_color_clone.clone(), + start_row: sr as u32, + start_col: sc as u32, + end_row: er as u32, + end_col: ec as u32, + worksheet_index: ws as usize, + }); + } + } + } + } + "mention" => { + if let Some(value) = &collab_msg.value { + if let Ok(mention_data) = serde_json::from_str::(value) { + if let (Some(to_user), Some(message), Some(cell)) = ( + mention_data.get("to_user_id").and_then(|v| v.as_str()), + mention_data.get("message").and_then(|v| v.as_str()), + mention_data.get("cell").and_then(|v| v.as_str()), + ) { + let mut mentions = get_mentions().write().await; + let user_mentions = mentions.entry(to_user.to_string()).or_default(); + user_mentions.push(MentionNotification { + id: uuid::Uuid::new_v4().to_string(), + sheet_id: sheet_id_clone.clone(), + from_user_id: user_id_clone.clone(), + from_user_name: user_name_clone.clone(), + to_user_id: to_user.to_string(), + cell: cell.to_string(), + message: message.to_string(), + created_at: Utc::now(), + read: false, + }); + } + } + } + } + _ => {} + } + if let Err(e) = broadcast_tx_clone.send(collab_msg) { error!("Failed to broadcast message: {}", e); } @@ -121,6 +351,9 @@ async fn handle_sheet_connection(socket: WebSocket, sheet_id: String) { } }); + let sheet_id_leave = sheet_id.clone(); + let user_id_leave = user_id.clone(); + let leave_msg = CollabMessage { msg_type: "leave".to_string(), sheet_id: sheet_id.clone(), @@ -139,6 +372,27 @@ async fn handle_sheet_connection(socket: WebSocket, sheet_id: String) { _ = send_task => {} } + { + let mut presence = get_presence().write().await; + if let Some(users) = presence.get_mut(&sheet_id_leave) { + users.retain(|u| u.user_id != user_id_leave); + } + } + + { + let mut typing = get_typing().write().await; + if let Some(indicators) = typing.get_mut(&sheet_id_leave) { + indicators.retain(|t| t.user_id != user_id_leave); + } + } + + { + let mut selections = get_selections().write().await; + if let Some(sels) = selections.get_mut(&sheet_id_leave) { + sels.retain(|s| s.user_id != user_id_leave); + } + } + if let Err(e) = broadcast_tx.send(leave_msg) { info!("User left (broadcast may have no receivers): {}", e); } @@ -171,11 +425,27 @@ pub async fn broadcast_sheet_change( } } +pub async fn mark_mention_read(user_id: &str, mention_id: &str) { + let mut mentions = get_mentions().write().await; + if let Some(user_mentions) = mentions.get_mut(user_id) { + for mention in user_mentions.iter_mut() { + if mention.id == mention_id { + mention.read = true; + } + } + } +} + +pub async fn clear_user_mentions(user_id: &str) { + let mut mentions = get_mentions().write().await; + mentions.remove(user_id); +} + fn get_random_color() -> String { use rand::Rng; let colors = [ "#FF6B6B", "#4ECDC4", "#45B7D1", "#96CEB4", "#FFEAA7", "#DDA0DD", "#98D8C8", "#F7DC6F", - "#BB8FCE", "#85C1E9", + "#BB8FCE", "#85C1E9", "#F1948A", "#82E0AA", "#F8C471", "#AED6F1", "#D7BDE2", ]; let idx = rand::rng().random_range(0..colors.len()); colors[idx].to_string() diff --git a/src/sheet/mod.rs b/src/sheet/mod.rs index 9be488351..5ed613e0f 100644 --- a/src/sheet/mod.rs +++ b/src/sheet/mod.rs @@ -12,7 +12,10 @@ use axum::{ }; use std::sync::Arc; -pub use collaboration::{handle_get_collaborators, handle_sheet_websocket}; +pub use collaboration::{ + handle_get_collaborators, handle_get_mentions, handle_get_presence, handle_get_selections, + handle_get_typing, handle_sheet_websocket, +}; pub use handlers::{ handle_add_comment, handle_add_external_link, handle_add_note, handle_array_formula, handle_clear_filter, handle_conditional_format, handle_create_chart, handle_create_named_range, @@ -82,5 +85,9 @@ pub fn configure_sheet_routes() -> Router> { .route("/api/sheet/named-range/update", post(handle_update_named_range)) .route("/api/sheet/named-range/delete", post(handle_delete_named_range)) .route("/api/sheet/named-ranges", get(handle_list_named_ranges)) + .route("/api/sheet/:sheet_id/presence", get(handle_get_presence)) + .route("/api/sheet/:sheet_id/typing", get(handle_get_typing)) + .route("/api/sheet/:sheet_id/selections", get(handle_get_selections)) + .route("/api/sheet/mentions/:user_id", get(handle_get_mentions)) .route("/ws/sheet/:sheet_id", get(handle_sheet_websocket)) } diff --git a/src/slides/collaboration.rs b/src/slides/collaboration.rs index 27ee942fd..b1d3fc924 100644 --- a/src/slides/collaboration.rs +++ b/src/slides/collaboration.rs @@ -11,6 +11,7 @@ use axum::{ use chrono::Utc; use futures_util::{SinkExt, StreamExt}; use log::{error, info}; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::broadcast; @@ -19,17 +20,134 @@ pub type SlideChannels = Arc = std::sync::OnceLock::new(); +pub type PresenceMap = Arc>>>; + +static PRESENCE: std::sync::OnceLock = std::sync::OnceLock::new(); + +pub type TypingMap = Arc>>>; + +static TYPING: std::sync::OnceLock = std::sync::OnceLock::new(); + +pub type SelectionMap = Arc>>>; + +static SELECTIONS: std::sync::OnceLock = std::sync::OnceLock::new(); + +pub type MentionMap = Arc>>>; + +static MENTIONS: std::sync::OnceLock = std::sync::OnceLock::new(); + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UserPresence { + pub user_id: String, + pub user_name: String, + pub user_color: String, + pub current_slide: Option, + pub current_element: Option, + pub last_active: chrono::DateTime, + pub status: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TypingIndicator { + pub user_id: String, + pub user_name: String, + pub slide_index: usize, + pub element_id: String, + pub started_at: chrono::DateTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SelectionInfo { + pub user_id: String, + pub user_name: String, + pub user_color: String, + pub slide_index: usize, + pub element_ids: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MentionNotification { + pub id: String, + pub presentation_id: String, + pub from_user_id: String, + pub from_user_name: String, + pub to_user_id: String, + pub slide_index: usize, + pub message: String, + pub created_at: chrono::DateTime, + pub read: bool, +} + pub fn get_slide_channels() -> &'static SlideChannels { SLIDE_CHANNELS.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new()))) } +pub fn get_presence() -> &'static PresenceMap { + PRESENCE.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new()))) +} + +pub fn get_typing() -> &'static TypingMap { + TYPING.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new()))) +} + +pub fn get_selections() -> &'static SelectionMap { + SELECTIONS.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new()))) +} + +pub fn get_mentions() -> &'static MentionMap { + MENTIONS.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new()))) +} + pub async fn handle_get_collaborators(Path(presentation_id): Path) -> impl IntoResponse { - let channels = get_slide_channels().read().await; - let count = channels + let presence = get_presence().read().await; + let users: Vec<&UserPresence> = presence .get(&presentation_id) - .map(|s| s.receiver_count()) - .unwrap_or(0); - Json(serde_json::json!({ "count": count })) + .map(|v| v.iter().collect()) + .unwrap_or_default(); + + Json(serde_json::json!({ + "count": users.len(), + "users": users + })) +} + +pub async fn handle_get_presence( + Path(presentation_id): Path, +) -> impl IntoResponse { + let presence = get_presence().read().await; + let users = presence.get(&presentation_id).cloned().unwrap_or_default(); + Json(serde_json::json!({ "users": users })) +} + +pub async fn handle_get_typing( + Path(presentation_id): Path, +) -> impl IntoResponse { + let typing = get_typing().read().await; + let indicators = typing.get(&presentation_id).cloned().unwrap_or_default(); + + let now = Utc::now(); + let active: Vec<&TypingIndicator> = indicators + .iter() + .filter(|t| (now - t.started_at).num_seconds() < 5) + .collect(); + + Json(serde_json::json!({ "typing": active })) +} + +pub async fn handle_get_selections( + Path(presentation_id): Path, +) -> impl IntoResponse { + let selections = get_selections().read().await; + let sels = selections.get(&presentation_id).cloned().unwrap_or_default(); + Json(serde_json::json!({ "selections": sels })) +} + +pub async fn handle_get_mentions( + Path(user_id): Path, +) -> impl IntoResponse { + let mentions = get_mentions().read().await; + let user_mentions = mentions.get(&user_id).cloned().unwrap_or_default(); + Json(serde_json::json!({ "mentions": user_mentions })) } pub async fn handle_slides_websocket( @@ -59,6 +177,20 @@ async fn handle_slides_connection(socket: WebSocket, presentation_id: String) { let user_name = format!("User {}", &user_id[..8]); let user_color = get_random_color(); + { + let mut presence = get_presence().write().await; + let users = presence.entry(presentation_id.clone()).or_default(); + users.push(UserPresence { + user_id: user_id.clone(), + user_name: user_name.clone(), + user_color: user_color.clone(), + current_slide: Some(0), + current_element: None, + last_active: Utc::now(), + status: "active".to_string(), + }); + } + let join_msg = SlideMessage { msg_type: "join".to_string(), presentation_id: presentation_id.clone(), @@ -92,6 +224,90 @@ async fn handle_slides_connection(socket: WebSocket, presentation_id: String) { slide_msg.presentation_id = presentation_id_clone.clone(); slide_msg.timestamp = Utc::now(); + match slide_msg.msg_type.as_str() { + "slide_change" | "cursor" => { + let mut presence = get_presence().write().await; + if let Some(users) = presence.get_mut(&presentation_id_clone) { + for user in users.iter_mut() { + if user.user_id == user_id_clone { + user.current_slide = slide_msg.slide_index; + user.current_element = slide_msg.element_id.clone(); + user.last_active = Utc::now(); + } + } + } + } + "typing_start" => { + if let (Some(slide_idx), Some(element_id)) = + (slide_msg.slide_index, &slide_msg.element_id) { + let mut typing = get_typing().write().await; + let indicators = typing.entry(presentation_id_clone.clone()).or_default(); + indicators.retain(|t| t.user_id != user_id_clone); + indicators.push(TypingIndicator { + user_id: user_id_clone.clone(), + user_name: user_name_clone.clone(), + slide_index: slide_idx, + element_id: element_id.clone(), + started_at: Utc::now(), + }); + } + } + "typing_stop" => { + let mut typing = get_typing().write().await; + if let Some(indicators) = typing.get_mut(&presentation_id_clone) { + indicators.retain(|t| t.user_id != user_id_clone); + } + } + "selection" => { + if let Some(data) = &slide_msg.data { + let mut selections = get_selections().write().await; + let sels = selections.entry(presentation_id_clone.clone()).or_default(); + sels.retain(|s| s.user_id != user_id_clone); + + if let (Some(slide_idx), Some(element_ids)) = ( + data.get("slide_index").and_then(|v| v.as_u64()), + data.get("element_ids").and_then(|v| v.as_array()), + ) { + let ids: Vec = element_ids + .iter() + .filter_map(|v| v.as_str().map(|s| s.to_string())) + .collect(); + sels.push(SelectionInfo { + user_id: user_id_clone.clone(), + user_name: user_name_clone.clone(), + user_color: user_color_clone.clone(), + slide_index: slide_idx as usize, + element_ids: ids, + }); + } + } + } + "mention" => { + if let Some(data) = &slide_msg.data { + if let (Some(to_user), Some(message), Some(slide_idx)) = ( + data.get("to_user_id").and_then(|v| v.as_str()), + data.get("message").and_then(|v| v.as_str()), + data.get("slide_index").and_then(|v| v.as_u64()), + ) { + let mut mentions = get_mentions().write().await; + let user_mentions = mentions.entry(to_user.to_string()).or_default(); + user_mentions.push(MentionNotification { + id: uuid::Uuid::new_v4().to_string(), + presentation_id: presentation_id_clone.clone(), + from_user_id: user_id_clone.clone(), + from_user_name: user_name_clone.clone(), + to_user_id: to_user.to_string(), + slide_index: slide_idx as usize, + message: message.to_string(), + created_at: Utc::now(), + read: false, + }); + } + } + } + _ => {} + } + if let Err(e) = broadcast_tx_clone.send(slide_msg) { error!("Failed to broadcast message: {}", e); } @@ -120,6 +336,9 @@ async fn handle_slides_connection(socket: WebSocket, presentation_id: String) { } }); + let presentation_id_leave = presentation_id.clone(); + let user_id_leave = user_id.clone(); + let leave_msg = SlideMessage { msg_type: "leave".to_string(), presentation_id: presentation_id.clone(), @@ -137,6 +356,27 @@ async fn handle_slides_connection(socket: WebSocket, presentation_id: String) { _ = send_task => {} } + { + let mut presence = get_presence().write().await; + if let Some(users) = presence.get_mut(&presentation_id_leave) { + users.retain(|u| u.user_id != user_id_leave); + } + } + + { + let mut typing = get_typing().write().await; + if let Some(indicators) = typing.get_mut(&presentation_id_leave) { + indicators.retain(|t| t.user_id != user_id_leave); + } + } + + { + let mut selections = get_selections().write().await; + if let Some(sels) = selections.get_mut(&presentation_id_leave) { + sels.retain(|s| s.user_id != user_id_leave); + } + } + if let Err(e) = broadcast_tx.send(leave_msg) { info!("User left (broadcast may have no receivers): {}", e); } @@ -168,11 +408,27 @@ pub async fn broadcast_slide_change( } } +pub async fn mark_mention_read(user_id: &str, mention_id: &str) { + let mut mentions = get_mentions().write().await; + if let Some(user_mentions) = mentions.get_mut(user_id) { + for mention in user_mentions.iter_mut() { + if mention.id == mention_id { + mention.read = true; + } + } + } +} + +pub async fn clear_user_mentions(user_id: &str) { + let mut mentions = get_mentions().write().await; + mentions.remove(user_id); +} + fn get_random_color() -> String { use rand::Rng; let colors = [ "#FF6B6B", "#4ECDC4", "#45B7D1", "#96CEB4", "#FFEAA7", "#DDA0DD", "#98D8C8", "#F7DC6F", - "#BB8FCE", "#85C1E9", + "#BB8FCE", "#85C1E9", "#F1948A", "#82E0AA", "#F8C471", "#AED6F1", "#D7BDE2", ]; let idx = rand::rng().random_range(0..colors.len()); colors[idx].to_string() diff --git a/src/slides/mod.rs b/src/slides/mod.rs index 35e0f1e93..3abbf81f5 100644 --- a/src/slides/mod.rs +++ b/src/slides/mod.rs @@ -12,7 +12,10 @@ use axum::{ }; use std::sync::Arc; -pub use collaboration::{handle_get_collaborators, handle_slides_websocket}; +pub use collaboration::{ + handle_get_collaborators, handle_get_mentions, handle_get_presence, handle_get_selections, + handle_get_typing, handle_slides_websocket, +}; pub use handlers::{ handle_add_element, handle_add_media, handle_add_slide, handle_apply_theme, handle_apply_transition_to_all, handle_delete_element, handle_delete_media, @@ -72,5 +75,9 @@ pub fn configure_slides_routes() -> Router> { .route("/api/slides/presenter/update", post(handle_update_presenter)) .route("/api/slides/presenter/end", post(handle_end_presenter)) .route("/api/slides/presenter/notes", get(handle_get_presenter_notes)) + .route("/api/slides/:presentation_id/presence", get(handle_get_presence)) + .route("/api/slides/:presentation_id/typing", get(handle_get_typing)) + .route("/api/slides/:presentation_id/selections", get(handle_get_selections)) + .route("/api/slides/mentions/:user_id", get(handle_get_mentions)) .route("/ws/slides/:presentation_id", get(handle_slides_websocket)) }