feat(collab): Add Phase 5 collaboration - presence, typing, selections, mentions

This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2026-01-11 12:27:40 -03:00
parent c27ba404c0
commit 9c2a4dbb97
6 changed files with 823 additions and 14 deletions

View file

@ -6,10 +6,12 @@ use axum::{
Path, State,
},
response::IntoResponse,
Json,
};
use chrono::Utc;
use futures_util::{SinkExt, StreamExt};
use log::{error, info};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::broadcast;
@ -19,10 +21,136 @@ pub type CollaborationChannels =
static COLLAB_CHANNELS: std::sync::OnceLock<CollaborationChannels> = std::sync::OnceLock::new();
pub type PresenceMap = Arc<tokio::sync::RwLock<HashMap<String, Vec<UserPresence>>>>;
static PRESENCE: std::sync::OnceLock<PresenceMap> = std::sync::OnceLock::new();
pub type TypingMap = Arc<tokio::sync::RwLock<HashMap<String, Vec<TypingIndicator>>>>;
static TYPING: std::sync::OnceLock<TypingMap> = std::sync::OnceLock::new();
pub type SelectionMap = Arc<tokio::sync::RwLock<HashMap<String, Vec<SelectionInfo>>>>;
static SELECTIONS: std::sync::OnceLock<SelectionMap> = std::sync::OnceLock::new();
pub type MentionMap = Arc<tokio::sync::RwLock<HashMap<String, Vec<MentionNotification>>>>;
static MENTIONS: std::sync::OnceLock<MentionMap> = std::sync::OnceLock::new();
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserPresence {
pub user_id: String,
pub user_name: String,
pub user_color: String,
pub cursor_position: Option<usize>,
pub last_active: chrono::DateTime<Utc>,
pub status: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TypingIndicator {
pub user_id: String,
pub user_name: String,
pub position: usize,
pub started_at: chrono::DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SelectionInfo {
pub user_id: String,
pub user_name: String,
pub user_color: String,
pub start_position: usize,
pub end_position: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MentionNotification {
pub id: String,
pub doc_id: String,
pub from_user_id: String,
pub from_user_name: String,
pub to_user_id: String,
pub position: usize,
pub message: String,
pub created_at: chrono::DateTime<Utc>,
pub read: bool,
}
pub fn get_collab_channels() -> &'static CollaborationChannels {
COLLAB_CHANNELS.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new())))
}
pub fn get_presence() -> &'static PresenceMap {
PRESENCE.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new())))
}
pub fn get_typing() -> &'static TypingMap {
TYPING.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new())))
}
pub fn get_selections() -> &'static SelectionMap {
SELECTIONS.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new())))
}
pub fn get_mentions() -> &'static MentionMap {
MENTIONS.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new())))
}
pub async fn handle_get_collaborators(
Path(doc_id): Path<String>,
) -> impl IntoResponse {
let presence = get_presence().read().await;
let users: Vec<&UserPresence> = presence
.get(&doc_id)
.map(|v| v.iter().collect())
.unwrap_or_default();
Json(serde_json::json!({
"count": users.len(),
"users": users
}))
}
pub async fn handle_get_presence(
Path(doc_id): Path<String>,
) -> impl IntoResponse {
let presence = get_presence().read().await;
let users = presence.get(&doc_id).cloned().unwrap_or_default();
Json(serde_json::json!({ "users": users }))
}
pub async fn handle_get_typing(
Path(doc_id): Path<String>,
) -> impl IntoResponse {
let typing = get_typing().read().await;
let indicators = typing.get(&doc_id).cloned().unwrap_or_default();
let now = Utc::now();
let active: Vec<&TypingIndicator> = indicators
.iter()
.filter(|t| (now - t.started_at).num_seconds() < 5)
.collect();
Json(serde_json::json!({ "typing": active }))
}
pub async fn handle_get_selections(
Path(doc_id): Path<String>,
) -> impl IntoResponse {
let selections = get_selections().read().await;
let sels = selections.get(&doc_id).cloned().unwrap_or_default();
Json(serde_json::json!({ "selections": sels }))
}
pub async fn handle_get_mentions(
Path(user_id): Path<String>,
) -> impl IntoResponse {
let mentions = get_mentions().read().await;
let user_mentions = mentions.get(&user_id).cloned().unwrap_or_default();
Json(serde_json::json!({ "mentions": user_mentions }))
}
pub async fn handle_docs_websocket(
ws: WebSocketUpgrade,
Path(doc_id): Path<String>,
@ -50,6 +178,19 @@ async fn handle_docs_connection(socket: WebSocket, doc_id: String) {
let user_name = format!("User {}", &user_id[..8]);
let user_color = get_random_color();
{
let mut presence = get_presence().write().await;
let users = presence.entry(doc_id.clone()).or_default();
users.push(UserPresence {
user_id: user_id.clone(),
user_name: user_name.clone(),
user_color: user_color.clone(),
cursor_position: None,
last_active: Utc::now(),
status: "active".to_string(),
});
}
let join_msg = CollabMessage {
msg_type: "join".to_string(),
doc_id: doc_id.clone(),
@ -84,6 +225,86 @@ async fn handle_docs_connection(socket: WebSocket, doc_id: String) {
collab_msg.doc_id = doc_id_clone.clone();
collab_msg.timestamp = Utc::now();
match collab_msg.msg_type.as_str() {
"cursor" => {
let mut presence = get_presence().write().await;
if let Some(users) = presence.get_mut(&doc_id_clone) {
for user in users.iter_mut() {
if user.user_id == user_id_clone {
user.cursor_position = collab_msg.position;
user.last_active = Utc::now();
}
}
}
}
"typing_start" => {
if let Some(pos) = collab_msg.position {
let mut typing = get_typing().write().await;
let indicators = typing.entry(doc_id_clone.clone()).or_default();
indicators.retain(|t| t.user_id != user_id_clone);
indicators.push(TypingIndicator {
user_id: user_id_clone.clone(),
user_name: user_name_clone.clone(),
position: pos,
started_at: Utc::now(),
});
}
}
"typing_stop" => {
let mut typing = get_typing().write().await;
if let Some(indicators) = typing.get_mut(&doc_id_clone) {
indicators.retain(|t| t.user_id != user_id_clone);
}
}
"selection" => {
if let Some(content) = &collab_msg.content {
if let Ok(sel_data) = serde_json::from_str::<serde_json::Value>(content) {
let mut selections = get_selections().write().await;
let sels = selections.entry(doc_id_clone.clone()).or_default();
sels.retain(|s| s.user_id != user_id_clone);
if let (Some(start), Some(end)) = (
sel_data.get("start").and_then(|v| v.as_u64()),
sel_data.get("end").and_then(|v| v.as_u64()),
) {
sels.push(SelectionInfo {
user_id: user_id_clone.clone(),
user_name: user_name_clone.clone(),
user_color: user_color_clone.clone(),
start_position: start as usize,
end_position: end as usize,
});
}
}
}
}
"mention" => {
if let Some(content) = &collab_msg.content {
if let Ok(mention_data) = serde_json::from_str::<serde_json::Value>(content) {
if let (Some(to_user), Some(message)) = (
mention_data.get("to_user_id").and_then(|v| v.as_str()),
mention_data.get("message").and_then(|v| v.as_str()),
) {
let mut mentions = get_mentions().write().await;
let user_mentions = mentions.entry(to_user.to_string()).or_default();
user_mentions.push(MentionNotification {
id: uuid::Uuid::new_v4().to_string(),
doc_id: doc_id_clone.clone(),
from_user_id: user_id_clone.clone(),
from_user_name: user_name_clone.clone(),
to_user_id: to_user.to_string(),
position: collab_msg.position.unwrap_or(0),
message: message.to_string(),
created_at: Utc::now(),
read: false,
});
}
}
}
}
_ => {}
}
if let Err(e) = broadcast_tx_clone.send(collab_msg) {
error!("Failed to broadcast message: {}", e);
}
@ -112,6 +333,9 @@ async fn handle_docs_connection(socket: WebSocket, doc_id: String) {
}
});
let doc_id_leave = doc_id.clone();
let user_id_leave = user_id.clone();
let leave_msg = CollabMessage {
msg_type: "leave".to_string(),
doc_id: doc_id.clone(),
@ -130,6 +354,27 @@ async fn handle_docs_connection(socket: WebSocket, doc_id: String) {
_ = send_task => {}
}
{
let mut presence = get_presence().write().await;
if let Some(users) = presence.get_mut(&doc_id_leave) {
users.retain(|u| u.user_id != user_id_leave);
}
}
{
let mut typing = get_typing().write().await;
if let Some(indicators) = typing.get_mut(&doc_id_leave) {
indicators.retain(|t| t.user_id != user_id_leave);
}
}
{
let mut selections = get_selections().write().await;
if let Some(sels) = selections.get_mut(&doc_id_leave) {
sels.retain(|s| s.user_id != user_id_leave);
}
}
if let Err(e) = broadcast_tx.send(leave_msg) {
info!("User left (broadcast may have no receivers): {}", e);
}
@ -160,11 +405,27 @@ pub async fn broadcast_doc_change(
}
}
pub async fn mark_mention_read(user_id: &str, mention_id: &str) {
let mut mentions = get_mentions().write().await;
if let Some(user_mentions) = mentions.get_mut(user_id) {
for mention in user_mentions.iter_mut() {
if mention.id == mention_id {
mention.read = true;
}
}
}
}
pub async fn clear_user_mentions(user_id: &str) {
let mut mentions = get_mentions().write().await;
mentions.remove(user_id);
}
fn get_random_color() -> String {
use rand::Rng;
let colors = [
"#FF6B6B", "#4ECDC4", "#45B7D1", "#96CEB4", "#FFEAA7", "#DDA0DD", "#98D8C8", "#F7DC6F",
"#BB8FCE", "#85C1E9",
"#BB8FCE", "#85C1E9", "#F1948A", "#82E0AA", "#F8C471", "#AED6F1", "#D7BDE2",
];
let idx = rand::rng().random_range(0..colors.len());
colors[idx].to_string()

View file

@ -12,7 +12,10 @@ use axum::{
};
use std::sync::Arc;
pub use collaboration::handle_docs_websocket;
pub use collaboration::{
handle_docs_websocket, handle_get_collaborators, handle_get_mentions, handle_get_presence,
handle_get_selections, handle_get_typing,
};
pub use handlers::{
handle_accept_reject_all, handle_accept_reject_change, handle_add_comment, handle_add_endnote,
handle_add_footnote, handle_ai_custom, handle_ai_expand, handle_ai_improve, handle_ai_simplify,
@ -88,5 +91,10 @@ pub fn configure_docs_routes() -> Router<Arc<AppState>> {
.route("/api/docs/styles", get(handle_list_styles))
.route("/api/docs/outline", post(handle_get_outline))
.route("/api/docs/compare", post(handle_compare_documents))
.route("/api/docs/:doc_id/collaborators", get(handle_get_collaborators))
.route("/api/docs/:doc_id/presence", get(handle_get_presence))
.route("/api/docs/:doc_id/typing", get(handle_get_typing))
.route("/api/docs/:doc_id/selections", get(handle_get_selections))
.route("/api/docs/mentions/:user_id", get(handle_get_mentions))
.route("/ws/docs/:doc_id", get(handle_docs_websocket))
}

View file

@ -11,6 +11,7 @@ use axum::{
use chrono::Utc;
use futures_util::{SinkExt, StreamExt};
use log::{error, info};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::broadcast;
@ -20,16 +21,139 @@ pub type CollaborationChannels =
static COLLAB_CHANNELS: std::sync::OnceLock<CollaborationChannels> = std::sync::OnceLock::new();
pub type PresenceMap = Arc<tokio::sync::RwLock<HashMap<String, Vec<UserPresence>>>>;
static PRESENCE: std::sync::OnceLock<PresenceMap> = std::sync::OnceLock::new();
pub type TypingMap = Arc<tokio::sync::RwLock<HashMap<String, Vec<TypingIndicator>>>>;
static TYPING: std::sync::OnceLock<TypingMap> = std::sync::OnceLock::new();
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserPresence {
pub user_id: String,
pub user_name: String,
pub user_color: String,
pub current_cell: Option<String>,
pub current_worksheet: Option<usize>,
pub last_active: chrono::DateTime<Utc>,
pub status: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TypingIndicator {
pub user_id: String,
pub user_name: String,
pub cell: String,
pub worksheet_index: usize,
pub started_at: chrono::DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SelectionInfo {
pub user_id: String,
pub user_name: String,
pub user_color: String,
pub start_row: u32,
pub start_col: u32,
pub end_row: u32,
pub end_col: u32,
pub worksheet_index: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MentionNotification {
pub id: String,
pub sheet_id: String,
pub from_user_id: String,
pub from_user_name: String,
pub to_user_id: String,
pub cell: String,
pub message: String,
pub created_at: chrono::DateTime<Utc>,
pub read: bool,
}
pub type SelectionMap = Arc<tokio::sync::RwLock<HashMap<String, Vec<SelectionInfo>>>>;
static SELECTIONS: std::sync::OnceLock<SelectionMap> = std::sync::OnceLock::new();
pub type MentionMap = Arc<tokio::sync::RwLock<HashMap<String, Vec<MentionNotification>>>>;
static MENTIONS: std::sync::OnceLock<MentionMap> = std::sync::OnceLock::new();
pub fn get_collab_channels() -> &'static CollaborationChannels {
COLLAB_CHANNELS.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new())))
}
pub fn get_presence() -> &'static PresenceMap {
PRESENCE.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new())))
}
pub fn get_typing() -> &'static TypingMap {
TYPING.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new())))
}
pub fn get_selections() -> &'static SelectionMap {
SELECTIONS.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new())))
}
pub fn get_mentions() -> &'static MentionMap {
MENTIONS.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new())))
}
pub async fn handle_get_collaborators(
Path(sheet_id): Path<String>,
) -> impl IntoResponse {
let channels = get_collab_channels().read().await;
let count = channels.get(&sheet_id).map(|s| s.receiver_count()).unwrap_or(0);
Json(serde_json::json!({ "count": count }))
let presence = get_presence().read().await;
let users: Vec<&UserPresence> = presence
.get(&sheet_id)
.map(|v| v.iter().collect())
.unwrap_or_default();
Json(serde_json::json!({
"count": users.len(),
"users": users
}))
}
pub async fn handle_get_presence(
Path(sheet_id): Path<String>,
) -> impl IntoResponse {
let presence = get_presence().read().await;
let users = presence.get(&sheet_id).cloned().unwrap_or_default();
Json(serde_json::json!({ "users": users }))
}
pub async fn handle_get_typing(
Path(sheet_id): Path<String>,
) -> impl IntoResponse {
let typing = get_typing().read().await;
let indicators = typing.get(&sheet_id).cloned().unwrap_or_default();
let now = Utc::now();
let active: Vec<&TypingIndicator> = indicators
.iter()
.filter(|t| (now - t.started_at).num_seconds() < 5)
.collect();
Json(serde_json::json!({ "typing": active }))
}
pub async fn handle_get_selections(
Path(sheet_id): Path<String>,
) -> impl IntoResponse {
let selections = get_selections().read().await;
let sels = selections.get(&sheet_id).cloned().unwrap_or_default();
Json(serde_json::json!({ "selections": sels }))
}
pub async fn handle_get_mentions(
Path(user_id): Path<String>,
) -> impl IntoResponse {
let mentions = get_mentions().read().await;
let user_mentions = mentions.get(&user_id).cloned().unwrap_or_default();
Json(serde_json::json!({ "mentions": user_mentions }))
}
pub async fn handle_sheet_websocket(
@ -59,6 +183,20 @@ async fn handle_sheet_connection(socket: WebSocket, sheet_id: String) {
let user_name = format!("User {}", &user_id[..8]);
let user_color = get_random_color();
{
let mut presence = get_presence().write().await;
let users = presence.entry(sheet_id.clone()).or_default();
users.push(UserPresence {
user_id: user_id.clone(),
user_name: user_name.clone(),
user_color: user_color.clone(),
current_cell: None,
current_worksheet: Some(0),
last_active: Utc::now(),
status: "active".to_string(),
});
}
let join_msg = CollabMessage {
msg_type: "join".to_string(),
sheet_id: sheet_id.clone(),
@ -93,6 +231,98 @@ async fn handle_sheet_connection(socket: WebSocket, sheet_id: String) {
collab_msg.sheet_id = sheet_id_clone.clone();
collab_msg.timestamp = Utc::now();
match collab_msg.msg_type.as_str() {
"cursor" | "cell_select" => {
let mut presence = get_presence().write().await;
if let Some(users) = presence.get_mut(&sheet_id_clone) {
for user in users.iter_mut() {
if user.user_id == user_id_clone {
if let (Some(row), Some(col)) = (collab_msg.row, collab_msg.col) {
user.current_cell = Some(format!("{},{}", row, col));
}
user.current_worksheet = collab_msg.worksheet_index;
user.last_active = Utc::now();
}
}
}
}
"typing_start" => {
if let (Some(row), Some(col), Some(ws_idx)) =
(collab_msg.row, collab_msg.col, collab_msg.worksheet_index) {
let mut typing = get_typing().write().await;
let indicators = typing.entry(sheet_id_clone.clone()).or_default();
indicators.retain(|t| t.user_id != user_id_clone);
indicators.push(TypingIndicator {
user_id: user_id_clone.clone(),
user_name: user_name_clone.clone(),
cell: format!("{},{}", row, col),
worksheet_index: ws_idx,
started_at: Utc::now(),
});
}
}
"typing_stop" => {
let mut typing = get_typing().write().await;
if let Some(indicators) = typing.get_mut(&sheet_id_clone) {
indicators.retain(|t| t.user_id != user_id_clone);
}
}
"selection" => {
if let Some(value) = &collab_msg.value {
if let Ok(sel_data) = serde_json::from_str::<serde_json::Value>(value) {
let mut selections = get_selections().write().await;
let sels = selections.entry(sheet_id_clone.clone()).or_default();
sels.retain(|s| s.user_id != user_id_clone);
if let (Some(sr), Some(sc), Some(er), Some(ec), Some(ws)) = (
sel_data.get("start_row").and_then(|v| v.as_u64()),
sel_data.get("start_col").and_then(|v| v.as_u64()),
sel_data.get("end_row").and_then(|v| v.as_u64()),
sel_data.get("end_col").and_then(|v| v.as_u64()),
sel_data.get("worksheet_index").and_then(|v| v.as_u64()),
) {
sels.push(SelectionInfo {
user_id: user_id_clone.clone(),
user_name: user_name_clone.clone(),
user_color: user_color_clone.clone(),
start_row: sr as u32,
start_col: sc as u32,
end_row: er as u32,
end_col: ec as u32,
worksheet_index: ws as usize,
});
}
}
}
}
"mention" => {
if let Some(value) = &collab_msg.value {
if let Ok(mention_data) = serde_json::from_str::<serde_json::Value>(value) {
if let (Some(to_user), Some(message), Some(cell)) = (
mention_data.get("to_user_id").and_then(|v| v.as_str()),
mention_data.get("message").and_then(|v| v.as_str()),
mention_data.get("cell").and_then(|v| v.as_str()),
) {
let mut mentions = get_mentions().write().await;
let user_mentions = mentions.entry(to_user.to_string()).or_default();
user_mentions.push(MentionNotification {
id: uuid::Uuid::new_v4().to_string(),
sheet_id: sheet_id_clone.clone(),
from_user_id: user_id_clone.clone(),
from_user_name: user_name_clone.clone(),
to_user_id: to_user.to_string(),
cell: cell.to_string(),
message: message.to_string(),
created_at: Utc::now(),
read: false,
});
}
}
}
}
_ => {}
}
if let Err(e) = broadcast_tx_clone.send(collab_msg) {
error!("Failed to broadcast message: {}", e);
}
@ -121,6 +351,9 @@ async fn handle_sheet_connection(socket: WebSocket, sheet_id: String) {
}
});
let sheet_id_leave = sheet_id.clone();
let user_id_leave = user_id.clone();
let leave_msg = CollabMessage {
msg_type: "leave".to_string(),
sheet_id: sheet_id.clone(),
@ -139,6 +372,27 @@ async fn handle_sheet_connection(socket: WebSocket, sheet_id: String) {
_ = send_task => {}
}
{
let mut presence = get_presence().write().await;
if let Some(users) = presence.get_mut(&sheet_id_leave) {
users.retain(|u| u.user_id != user_id_leave);
}
}
{
let mut typing = get_typing().write().await;
if let Some(indicators) = typing.get_mut(&sheet_id_leave) {
indicators.retain(|t| t.user_id != user_id_leave);
}
}
{
let mut selections = get_selections().write().await;
if let Some(sels) = selections.get_mut(&sheet_id_leave) {
sels.retain(|s| s.user_id != user_id_leave);
}
}
if let Err(e) = broadcast_tx.send(leave_msg) {
info!("User left (broadcast may have no receivers): {}", e);
}
@ -171,11 +425,27 @@ pub async fn broadcast_sheet_change(
}
}
pub async fn mark_mention_read(user_id: &str, mention_id: &str) {
let mut mentions = get_mentions().write().await;
if let Some(user_mentions) = mentions.get_mut(user_id) {
for mention in user_mentions.iter_mut() {
if mention.id == mention_id {
mention.read = true;
}
}
}
}
pub async fn clear_user_mentions(user_id: &str) {
let mut mentions = get_mentions().write().await;
mentions.remove(user_id);
}
fn get_random_color() -> String {
use rand::Rng;
let colors = [
"#FF6B6B", "#4ECDC4", "#45B7D1", "#96CEB4", "#FFEAA7", "#DDA0DD", "#98D8C8", "#F7DC6F",
"#BB8FCE", "#85C1E9",
"#BB8FCE", "#85C1E9", "#F1948A", "#82E0AA", "#F8C471", "#AED6F1", "#D7BDE2",
];
let idx = rand::rng().random_range(0..colors.len());
colors[idx].to_string()

View file

@ -12,7 +12,10 @@ use axum::{
};
use std::sync::Arc;
pub use collaboration::{handle_get_collaborators, handle_sheet_websocket};
pub use collaboration::{
handle_get_collaborators, handle_get_mentions, handle_get_presence, handle_get_selections,
handle_get_typing, handle_sheet_websocket,
};
pub use handlers::{
handle_add_comment, handle_add_external_link, handle_add_note, handle_array_formula,
handle_clear_filter, handle_conditional_format, handle_create_chart, handle_create_named_range,
@ -82,5 +85,9 @@ pub fn configure_sheet_routes() -> Router<Arc<AppState>> {
.route("/api/sheet/named-range/update", post(handle_update_named_range))
.route("/api/sheet/named-range/delete", post(handle_delete_named_range))
.route("/api/sheet/named-ranges", get(handle_list_named_ranges))
.route("/api/sheet/:sheet_id/presence", get(handle_get_presence))
.route("/api/sheet/:sheet_id/typing", get(handle_get_typing))
.route("/api/sheet/:sheet_id/selections", get(handle_get_selections))
.route("/api/sheet/mentions/:user_id", get(handle_get_mentions))
.route("/ws/sheet/:sheet_id", get(handle_sheet_websocket))
}

View file

@ -11,6 +11,7 @@ use axum::{
use chrono::Utc;
use futures_util::{SinkExt, StreamExt};
use log::{error, info};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::broadcast;
@ -19,17 +20,134 @@ pub type SlideChannels = Arc<tokio::sync::RwLock<HashMap<String, broadcast::Send
static SLIDE_CHANNELS: std::sync::OnceLock<SlideChannels> = std::sync::OnceLock::new();
pub type PresenceMap = Arc<tokio::sync::RwLock<HashMap<String, Vec<UserPresence>>>>;
static PRESENCE: std::sync::OnceLock<PresenceMap> = std::sync::OnceLock::new();
pub type TypingMap = Arc<tokio::sync::RwLock<HashMap<String, Vec<TypingIndicator>>>>;
static TYPING: std::sync::OnceLock<TypingMap> = std::sync::OnceLock::new();
pub type SelectionMap = Arc<tokio::sync::RwLock<HashMap<String, Vec<SelectionInfo>>>>;
static SELECTIONS: std::sync::OnceLock<SelectionMap> = std::sync::OnceLock::new();
pub type MentionMap = Arc<tokio::sync::RwLock<HashMap<String, Vec<MentionNotification>>>>;
static MENTIONS: std::sync::OnceLock<MentionMap> = std::sync::OnceLock::new();
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserPresence {
pub user_id: String,
pub user_name: String,
pub user_color: String,
pub current_slide: Option<usize>,
pub current_element: Option<String>,
pub last_active: chrono::DateTime<Utc>,
pub status: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TypingIndicator {
pub user_id: String,
pub user_name: String,
pub slide_index: usize,
pub element_id: String,
pub started_at: chrono::DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SelectionInfo {
pub user_id: String,
pub user_name: String,
pub user_color: String,
pub slide_index: usize,
pub element_ids: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MentionNotification {
pub id: String,
pub presentation_id: String,
pub from_user_id: String,
pub from_user_name: String,
pub to_user_id: String,
pub slide_index: usize,
pub message: String,
pub created_at: chrono::DateTime<Utc>,
pub read: bool,
}
pub fn get_slide_channels() -> &'static SlideChannels {
SLIDE_CHANNELS.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new())))
}
pub fn get_presence() -> &'static PresenceMap {
PRESENCE.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new())))
}
pub fn get_typing() -> &'static TypingMap {
TYPING.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new())))
}
pub fn get_selections() -> &'static SelectionMap {
SELECTIONS.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new())))
}
pub fn get_mentions() -> &'static MentionMap {
MENTIONS.get_or_init(|| Arc::new(tokio::sync::RwLock::new(HashMap::new())))
}
pub async fn handle_get_collaborators(Path(presentation_id): Path<String>) -> impl IntoResponse {
let channels = get_slide_channels().read().await;
let count = channels
let presence = get_presence().read().await;
let users: Vec<&UserPresence> = presence
.get(&presentation_id)
.map(|s| s.receiver_count())
.unwrap_or(0);
Json(serde_json::json!({ "count": count }))
.map(|v| v.iter().collect())
.unwrap_or_default();
Json(serde_json::json!({
"count": users.len(),
"users": users
}))
}
pub async fn handle_get_presence(
Path(presentation_id): Path<String>,
) -> impl IntoResponse {
let presence = get_presence().read().await;
let users = presence.get(&presentation_id).cloned().unwrap_or_default();
Json(serde_json::json!({ "users": users }))
}
pub async fn handle_get_typing(
Path(presentation_id): Path<String>,
) -> impl IntoResponse {
let typing = get_typing().read().await;
let indicators = typing.get(&presentation_id).cloned().unwrap_or_default();
let now = Utc::now();
let active: Vec<&TypingIndicator> = indicators
.iter()
.filter(|t| (now - t.started_at).num_seconds() < 5)
.collect();
Json(serde_json::json!({ "typing": active }))
}
pub async fn handle_get_selections(
Path(presentation_id): Path<String>,
) -> impl IntoResponse {
let selections = get_selections().read().await;
let sels = selections.get(&presentation_id).cloned().unwrap_or_default();
Json(serde_json::json!({ "selections": sels }))
}
pub async fn handle_get_mentions(
Path(user_id): Path<String>,
) -> impl IntoResponse {
let mentions = get_mentions().read().await;
let user_mentions = mentions.get(&user_id).cloned().unwrap_or_default();
Json(serde_json::json!({ "mentions": user_mentions }))
}
pub async fn handle_slides_websocket(
@ -59,6 +177,20 @@ async fn handle_slides_connection(socket: WebSocket, presentation_id: String) {
let user_name = format!("User {}", &user_id[..8]);
let user_color = get_random_color();
{
let mut presence = get_presence().write().await;
let users = presence.entry(presentation_id.clone()).or_default();
users.push(UserPresence {
user_id: user_id.clone(),
user_name: user_name.clone(),
user_color: user_color.clone(),
current_slide: Some(0),
current_element: None,
last_active: Utc::now(),
status: "active".to_string(),
});
}
let join_msg = SlideMessage {
msg_type: "join".to_string(),
presentation_id: presentation_id.clone(),
@ -92,6 +224,90 @@ async fn handle_slides_connection(socket: WebSocket, presentation_id: String) {
slide_msg.presentation_id = presentation_id_clone.clone();
slide_msg.timestamp = Utc::now();
match slide_msg.msg_type.as_str() {
"slide_change" | "cursor" => {
let mut presence = get_presence().write().await;
if let Some(users) = presence.get_mut(&presentation_id_clone) {
for user in users.iter_mut() {
if user.user_id == user_id_clone {
user.current_slide = slide_msg.slide_index;
user.current_element = slide_msg.element_id.clone();
user.last_active = Utc::now();
}
}
}
}
"typing_start" => {
if let (Some(slide_idx), Some(element_id)) =
(slide_msg.slide_index, &slide_msg.element_id) {
let mut typing = get_typing().write().await;
let indicators = typing.entry(presentation_id_clone.clone()).or_default();
indicators.retain(|t| t.user_id != user_id_clone);
indicators.push(TypingIndicator {
user_id: user_id_clone.clone(),
user_name: user_name_clone.clone(),
slide_index: slide_idx,
element_id: element_id.clone(),
started_at: Utc::now(),
});
}
}
"typing_stop" => {
let mut typing = get_typing().write().await;
if let Some(indicators) = typing.get_mut(&presentation_id_clone) {
indicators.retain(|t| t.user_id != user_id_clone);
}
}
"selection" => {
if let Some(data) = &slide_msg.data {
let mut selections = get_selections().write().await;
let sels = selections.entry(presentation_id_clone.clone()).or_default();
sels.retain(|s| s.user_id != user_id_clone);
if let (Some(slide_idx), Some(element_ids)) = (
data.get("slide_index").and_then(|v| v.as_u64()),
data.get("element_ids").and_then(|v| v.as_array()),
) {
let ids: Vec<String> = element_ids
.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();
sels.push(SelectionInfo {
user_id: user_id_clone.clone(),
user_name: user_name_clone.clone(),
user_color: user_color_clone.clone(),
slide_index: slide_idx as usize,
element_ids: ids,
});
}
}
}
"mention" => {
if let Some(data) = &slide_msg.data {
if let (Some(to_user), Some(message), Some(slide_idx)) = (
data.get("to_user_id").and_then(|v| v.as_str()),
data.get("message").and_then(|v| v.as_str()),
data.get("slide_index").and_then(|v| v.as_u64()),
) {
let mut mentions = get_mentions().write().await;
let user_mentions = mentions.entry(to_user.to_string()).or_default();
user_mentions.push(MentionNotification {
id: uuid::Uuid::new_v4().to_string(),
presentation_id: presentation_id_clone.clone(),
from_user_id: user_id_clone.clone(),
from_user_name: user_name_clone.clone(),
to_user_id: to_user.to_string(),
slide_index: slide_idx as usize,
message: message.to_string(),
created_at: Utc::now(),
read: false,
});
}
}
}
_ => {}
}
if let Err(e) = broadcast_tx_clone.send(slide_msg) {
error!("Failed to broadcast message: {}", e);
}
@ -120,6 +336,9 @@ async fn handle_slides_connection(socket: WebSocket, presentation_id: String) {
}
});
let presentation_id_leave = presentation_id.clone();
let user_id_leave = user_id.clone();
let leave_msg = SlideMessage {
msg_type: "leave".to_string(),
presentation_id: presentation_id.clone(),
@ -137,6 +356,27 @@ async fn handle_slides_connection(socket: WebSocket, presentation_id: String) {
_ = send_task => {}
}
{
let mut presence = get_presence().write().await;
if let Some(users) = presence.get_mut(&presentation_id_leave) {
users.retain(|u| u.user_id != user_id_leave);
}
}
{
let mut typing = get_typing().write().await;
if let Some(indicators) = typing.get_mut(&presentation_id_leave) {
indicators.retain(|t| t.user_id != user_id_leave);
}
}
{
let mut selections = get_selections().write().await;
if let Some(sels) = selections.get_mut(&presentation_id_leave) {
sels.retain(|s| s.user_id != user_id_leave);
}
}
if let Err(e) = broadcast_tx.send(leave_msg) {
info!("User left (broadcast may have no receivers): {}", e);
}
@ -168,11 +408,27 @@ pub async fn broadcast_slide_change(
}
}
pub async fn mark_mention_read(user_id: &str, mention_id: &str) {
let mut mentions = get_mentions().write().await;
if let Some(user_mentions) = mentions.get_mut(user_id) {
for mention in user_mentions.iter_mut() {
if mention.id == mention_id {
mention.read = true;
}
}
}
}
pub async fn clear_user_mentions(user_id: &str) {
let mut mentions = get_mentions().write().await;
mentions.remove(user_id);
}
fn get_random_color() -> String {
use rand::Rng;
let colors = [
"#FF6B6B", "#4ECDC4", "#45B7D1", "#96CEB4", "#FFEAA7", "#DDA0DD", "#98D8C8", "#F7DC6F",
"#BB8FCE", "#85C1E9",
"#BB8FCE", "#85C1E9", "#F1948A", "#82E0AA", "#F8C471", "#AED6F1", "#D7BDE2",
];
let idx = rand::rng().random_range(0..colors.len());
colors[idx].to_string()

View file

@ -12,7 +12,10 @@ use axum::{
};
use std::sync::Arc;
pub use collaboration::{handle_get_collaborators, handle_slides_websocket};
pub use collaboration::{
handle_get_collaborators, handle_get_mentions, handle_get_presence, handle_get_selections,
handle_get_typing, handle_slides_websocket,
};
pub use handlers::{
handle_add_element, handle_add_media, handle_add_slide, handle_apply_theme,
handle_apply_transition_to_all, handle_delete_element, handle_delete_media,
@ -72,5 +75,9 @@ pub fn configure_slides_routes() -> Router<Arc<AppState>> {
.route("/api/slides/presenter/update", post(handle_update_presenter))
.route("/api/slides/presenter/end", post(handle_end_presenter))
.route("/api/slides/presenter/notes", get(handle_get_presenter_notes))
.route("/api/slides/:presentation_id/presence", get(handle_get_presence))
.route("/api/slides/:presentation_id/typing", get(handle_get_typing))
.route("/api/slides/:presentation_id/selections", get(handle_get_selections))
.route("/api/slides/mentions/:user_id", get(handle_get_mentions))
.route("/ws/slides/:presentation_id", get(handle_slides_websocket))
}