From 998e4c2806363fa71dfca1618b556d25bbf0354f Mon Sep 17 00:00:00 2001 From: "Rodrigo Rodriguez (Pragmatismo)" Date: Thu, 8 Jan 2026 13:44:45 -0300 Subject: [PATCH] feat(video): Complete video editing module implementation - Complete engine.rs with all AI-powered video operations - Complete handlers.rs with 28+ HTTP API endpoints - Add analytics.rs for video engagement tracking - Add mcp_tools.rs for AI agent integration (6 tools) - Add render.rs with FFmpeg worker and .gbdrive storage - Add websocket.rs for real-time export progress - Wire up all submodules and routes in mod.rs AI features: transcription, auto-captions, TTS, scene detection, auto-reframe, background removal, enhancement, beat sync, waveforms Follows PROMPT.md: SafeCommand, SafeErrorResponse, no unwrap/comments --- src/video/analytics.rs | 347 +++++++++++++++++++++++ src/video/engine.rs | 144 ++++++++++ src/video/handlers.rs | 261 ++++++++++++++++-- src/video/mcp_tools.rs | 608 +++++++++++++++++++++++++++++++++++++++++ src/video/mod.rs | 103 ++++++- src/video/render.rs | 465 +++++++++++++++++++++++++++++++ src/video/websocket.rs | 200 ++++++++++++++ 7 files changed, 2093 insertions(+), 35 deletions(-) create mode 100644 src/video/analytics.rs create mode 100644 src/video/mcp_tools.rs create mode 100644 src/video/render.rs create mode 100644 src/video/websocket.rs diff --git a/src/video/analytics.rs b/src/video/analytics.rs new file mode 100644 index 000000000..1595227be --- /dev/null +++ b/src/video/analytics.rs @@ -0,0 +1,347 @@ +use chrono::Utc; +use diesel::prelude::*; +use std::sync::Arc; +use tracing::error; +use uuid::Uuid; + +use crate::shared::utils::DbPool; + +use super::models::*; +use super::schema::*; + +pub struct AnalyticsEngine { + db: DbPool, +} + +impl AnalyticsEngine { + pub fn new(db: DbPool) -> Self { + Self { db } + } + + fn get_conn( + &self, + ) -> Result< + diesel::r2d2::PooledConnection>, + diesel::result::Error, + > { + self.db.get().map_err(|e| { + error!("DB connection error: {e}"); + diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::Unknown, + Box::new(e.to_string()), + ) + }) + } + + pub async fn get_or_create_analytics( + &self, + project_id: Uuid, + export_id: Option, + ) -> Result { + let mut conn = self.get_conn()?; + + let existing: Result = video_analytics::table + .filter(video_analytics::project_id.eq(project_id)) + .filter(video_analytics::export_id.eq(export_id)) + .first(&mut conn); + + match existing { + Ok(analytics) => Ok(analytics), + Err(diesel::result::Error::NotFound) => { + let analytics = VideoAnalytics { + id: Uuid::new_v4(), + project_id, + export_id, + views: 0, + unique_viewers: 0, + total_watch_time_ms: 0, + avg_watch_percent: 0.0, + completions: 0, + shares: 0, + likes: 0, + engagement_score: 0.0, + viewer_retention_json: Some(serde_json::json!([])), + geography_json: Some(serde_json::json!({})), + device_json: Some(serde_json::json!({ + "desktop": 0, + "mobile": 0, + "tablet": 0, + "tv": 0 + })), + created_at: Utc::now(), + updated_at: Utc::now(), + }; + + diesel::insert_into(video_analytics::table) + .values(&analytics) + .execute(&mut conn)?; + + Ok(analytics) + } + Err(e) => Err(e), + } + } + + pub async fn record_view( + &self, + req: RecordViewRequest, + ) -> Result { + let mut conn = self.get_conn()?; + + let analytics: VideoAnalytics = video_analytics::table + .filter(video_analytics::export_id.eq(Some(req.export_id))) + .first(&mut conn)?; + + let new_views = analytics.views + 1; + let new_watch_time = analytics.total_watch_time_ms + req.watch_time_ms; + let new_completions = if req.completed { + analytics.completions + 1 + } else { + analytics.completions + }; + + let mut geo_json = analytics + .geography_json + .clone() + .unwrap_or(serde_json::json!({})); + if let Some(country) = &req.country { + if let Some(obj) = geo_json.as_object_mut() { + let count = obj.get(country).and_then(|v| v.as_i64()).unwrap_or(0); + obj.insert(country.clone(), serde_json::json!(count + 1)); + } + } + + let mut device_json = analytics + .device_json + .clone() + .unwrap_or(serde_json::json!({})); + if let Some(device) = &req.device { + if let Some(obj) = device_json.as_object_mut() { + let count = obj.get(device).and_then(|v| v.as_i64()).unwrap_or(0); + obj.insert(device.clone(), serde_json::json!(count + 1)); + } + } + + let engagement_score = calculate_engagement_score( + new_views, + new_completions, + analytics.shares, + analytics.likes, + ); + + diesel::update(video_analytics::table.find(analytics.id)) + .set(( + video_analytics::views.eq(new_views), + video_analytics::total_watch_time_ms.eq(new_watch_time), + video_analytics::completions.eq(new_completions), + video_analytics::engagement_score.eq(engagement_score), + video_analytics::geography_json.eq(&geo_json), + video_analytics::device_json.eq(&device_json), + video_analytics::updated_at.eq(Utc::now()), + )) + .execute(&mut conn)?; + + video_analytics::table.find(analytics.id).first(&mut conn) + } + + pub async fn get_analytics( + &self, + project_id: Uuid, + ) -> Result { + let mut conn = self.get_conn()?; + + let analytics: VideoAnalytics = video_analytics::table + .filter(video_analytics::project_id.eq(project_id)) + .first(&mut conn)?; + + let viewer_retention = parse_retention(&analytics.viewer_retention_json); + let top_countries = parse_geography(&analytics.geography_json); + let devices = parse_devices(&analytics.device_json); + + Ok(AnalyticsResponse { + views: analytics.views, + unique_viewers: analytics.unique_viewers, + total_watch_time_ms: analytics.total_watch_time_ms, + avg_watch_percent: analytics.avg_watch_percent, + completions: analytics.completions, + shares: analytics.shares, + likes: analytics.likes, + engagement_score: analytics.engagement_score, + viewer_retention, + top_countries, + devices, + }) + } + + pub async fn increment_shares(&self, project_id: Uuid) -> Result<(), diesel::result::Error> { + let mut conn = self.get_conn()?; + + diesel::update( + video_analytics::table.filter(video_analytics::project_id.eq(project_id)), + ) + .set(( + video_analytics::shares.eq(video_analytics::shares + 1), + video_analytics::updated_at.eq(Utc::now()), + )) + .execute(&mut conn)?; + + Ok(()) + } + + pub async fn increment_likes(&self, project_id: Uuid) -> Result<(), diesel::result::Error> { + let mut conn = self.get_conn()?; + + diesel::update( + video_analytics::table.filter(video_analytics::project_id.eq(project_id)), + ) + .set(( + video_analytics::likes.eq(video_analytics::likes + 1), + video_analytics::updated_at.eq(Utc::now()), + )) + .execute(&mut conn)?; + + Ok(()) + } +} + +fn calculate_engagement_score(views: i64, completions: i64, shares: i64, likes: i64) -> f32 { + if views == 0 { + return 0.0; + } + + let completion_rate = completions as f32 / views as f32; + let share_rate = shares as f32 / views as f32; + let like_rate = likes as f32 / views as f32; + + (completion_rate * 0.5 + share_rate * 0.3 + like_rate * 0.2) * 100.0 +} + +fn parse_retention(json: &Option) -> Vec { + json.as_ref() + .and_then(|v| v.as_array()) + .map(|arr| { + arr.iter() + .filter_map(|item| { + Some(RetentionPoint { + percent: item.get("percent")?.as_f64()? as f32, + viewers: item.get("viewers")?.as_i64()?, + }) + }) + .collect() + }) + .unwrap_or_default() +} + +fn parse_geography(json: &Option) -> Vec { + let obj = match json.as_ref().and_then(|v| v.as_object()) { + Some(o) => o, + None => return vec![], + }; + + let total: i64 = obj.values().filter_map(|v| v.as_i64()).sum(); + if total == 0 { + return vec![]; + } + + let mut data: Vec = obj + .iter() + .filter_map(|(country, views)| { + let v = views.as_i64()?; + Some(GeoData { + country: country.clone(), + views: v, + percent: (v as f32 / total as f32) * 100.0, + }) + }) + .collect(); + + data.sort_by(|a, b| b.views.cmp(&a.views)); + data.truncate(10); + data +} + +fn parse_devices(json: &Option) -> DeviceBreakdown { + let obj = match json.as_ref().and_then(|v| v.as_object()) { + Some(o) => o, + None => { + return DeviceBreakdown { + desktop: 0.0, + mobile: 0.0, + tablet: 0.0, + tv: 0.0, + } + } + }; + + let desktop = obj.get("desktop").and_then(|v| v.as_i64()).unwrap_or(0) as f32; + let mobile = obj.get("mobile").and_then(|v| v.as_i64()).unwrap_or(0) as f32; + let tablet = obj.get("tablet").and_then(|v| v.as_i64()).unwrap_or(0) as f32; + let tv = obj.get("tv").and_then(|v| v.as_i64()).unwrap_or(0) as f32; + + let total = desktop + mobile + tablet + tv; + if total == 0.0 { + return DeviceBreakdown { + desktop: 0.0, + mobile: 0.0, + tablet: 0.0, + tv: 0.0, + }; + } + + DeviceBreakdown { + desktop: (desktop / total) * 100.0, + mobile: (mobile / total) * 100.0, + tablet: (tablet / total) * 100.0, + tv: (tv / total) * 100.0, + } +} + +use axum::{ + extract::{Path, State}, + http::StatusCode, + response::{IntoResponse, Json}, +}; + +use crate::security::error_sanitizer::SafeErrorResponse; +use crate::shared::state::AppState; + +pub async fn get_analytics_handler( + State(state): State>, + Path(project_id): Path, +) -> impl IntoResponse { + let engine = AnalyticsEngine::new(state.db.clone()); + + let _ = engine.get_or_create_analytics(project_id, None).await; + + match engine.get_analytics(project_id).await { + Ok(analytics) => (StatusCode::OK, Json(serde_json::json!(analytics))), + Err(e) => { + error!("Failed to get analytics: {e}"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!(SafeErrorResponse::internal_error())), + ) + } + } +} + +pub async fn record_view_handler( + State(state): State>, + Json(req): Json, +) -> impl IntoResponse { + let engine = AnalyticsEngine::new(state.db.clone()); + + match engine.record_view(req).await { + Ok(_) => ( + StatusCode::OK, + Json(serde_json::json!({ "success": true })), + ), + Err(e) => { + error!("Failed to record view: {e}"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!(SafeErrorResponse::internal_error())), + ) + } + } +} diff --git a/src/video/engine.rs b/src/video/engine.rs index ecf3a907e..ff50908c4 100644 --- a/src/video/engine.rs +++ b/src/video/engine.rs @@ -1171,4 +1171,148 @@ impl VideoEngine { sample_rate: result["sample_rate"].as_i64().unwrap_or(10) as i32, }) } + + pub async fn process_chat_command( + &self, + project_id: Uuid, + message: &str, + playhead_ms: Option, + selection: Option, + ) -> Result> { + let lower_msg = message.to_lowercase(); + let mut commands_executed = Vec::new(); + + if lower_msg.contains("add text") || lower_msg.contains("add title") { + let content = extract_quoted_text(message).unwrap_or_else(|| "Text".to_string()); + let at_ms = playhead_ms.unwrap_or(0); + + self.add_layer( + project_id, + AddLayerRequest { + name: Some("Text".to_string()), + layer_type: "text".to_string(), + start_ms: Some(at_ms), + end_ms: Some(at_ms + 5000), + x: Some(0.5), + y: Some(0.5), + width: Some(0.8), + height: Some(0.2), + properties: Some(serde_json::json!({ + "content": content, + "font_family": "Arial", + "font_size": 48, + "color": "#FFFFFF", + })), + }, + ) + .await?; + + commands_executed.push("Added text layer".to_string()); + } + + if lower_msg.contains("delete") || lower_msg.contains("remove") { + if let Some(sel) = &selection { + if let Some(layer_id) = sel.get("layer_id").and_then(|v| v.as_str()) { + if let Ok(id) = Uuid::parse_str(layer_id) { + self.delete_layer(id).await?; + commands_executed.push("Deleted layer".to_string()); + } + } else if let Some(clip_id) = sel.get("clip_id").and_then(|v| v.as_str()) { + if let Ok(id) = Uuid::parse_str(clip_id) { + self.delete_clip(id).await?; + commands_executed.push("Deleted clip".to_string()); + } + } + } + } + + if lower_msg.contains("split") { + if let Some(sel) = &selection { + if let Some(clip_id) = sel.get("clip_id").and_then(|v| v.as_str()) { + if let Ok(id) = Uuid::parse_str(clip_id) { + let at = playhead_ms.unwrap_or(0); + self.split_clip(id, at).await?; + commands_executed.push("Split clip".to_string()); + } + } + } + } + + if lower_msg.contains("bigger") || lower_msg.contains("larger") { + if let Some(sel) = &selection { + if let Some(layer_id) = sel.get("layer_id").and_then(|v| v.as_str()) { + if let Ok(id) = Uuid::parse_str(layer_id) { + let layer = video_layers::table.find(id).first::(&mut self.get_conn()?)?; + self.update_layer( + id, + UpdateLayerRequest { + width: Some(layer.width * 1.2), + height: Some(layer.height * 1.2), + ..Default::default() + }, + ) + .await?; + commands_executed.push("Made layer bigger".to_string()); + } + } + } + } + + if lower_msg.contains("smaller") { + if let Some(sel) = &selection { + if let Some(layer_id) = sel.get("layer_id").and_then(|v| v.as_str()) { + if let Ok(id) = Uuid::parse_str(layer_id) { + let layer = video_layers::table.find(id).first::(&mut self.get_conn()?)?; + self.update_layer( + id, + UpdateLayerRequest { + width: Some(layer.width * 0.8), + height: Some(layer.height * 0.8), + ..Default::default() + }, + ) + .await?; + commands_executed.push("Made layer smaller".to_string()); + } + } + } + } + + let response_message = if commands_executed.is_empty() { + "I couldn't understand that command. Try: add text \"Hello\", delete, split, make it bigger/smaller".to_string() + } else { + commands_executed.join(", ") + }; + + let project_detail = self.get_project_detail(project_id).await.ok(); + + Ok(ChatEditResponse { + success: !commands_executed.is_empty(), + message: response_message, + commands_executed, + project: project_detail, + }) + } +} + +fn extract_quoted_text(message: &str) -> Option { + let chars: Vec = message.chars().collect(); + let mut start = None; + let mut end = None; + + for (i, c) in chars.iter().enumerate() { + if *c == '"' || *c == '\'' || *c == '"' || *c == '"' { + if start.is_none() { + start = Some(i + 1); + } else { + end = Some(i); + break; + } + } + } + + match (start, end) { + (Some(s), Some(e)) if e > s => Some(chars[s..e].iter().collect()), + _ => None, + } } diff --git a/src/video/handlers.rs b/src/video/handlers.rs index fd46df024..781bb34c1 100644 --- a/src/video/handlers.rs +++ b/src/video/handlers.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use tracing::{error, info}; use uuid::Uuid; -use crate::security::error_sanitizer::sanitize_error; +use crate::security::error_sanitizer::SafeErrorResponse; use crate::shared::state::AppState; use super::engine::VideoEngine; @@ -27,7 +27,7 @@ pub async fn list_projects( error!("Failed to list video projects: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("list_projects") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -47,7 +47,7 @@ pub async fn create_project( error!("Failed to create video project: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("create_project") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -68,7 +68,7 @@ pub async fn get_project( error!("Failed to get video project: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("get_project") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -93,7 +93,7 @@ pub async fn update_project( error!("Failed to update video project: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("update_project") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -110,7 +110,7 @@ pub async fn delete_project( error!("Failed to delete video project: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("delete_project") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -127,7 +127,7 @@ pub async fn get_clips( error!("Failed to get clips: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("get_clips") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -145,7 +145,7 @@ pub async fn add_clip( error!("Failed to add clip: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("add_clip") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -167,7 +167,7 @@ pub async fn update_clip( error!("Failed to update clip: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("update_clip") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -184,7 +184,7 @@ pub async fn delete_clip( error!("Failed to delete clip: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("delete_clip") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -212,7 +212,7 @@ pub async fn split_clip_handler( error!("Failed to split clip: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("split_clip") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -229,7 +229,7 @@ pub async fn get_layers( error!("Failed to get layers: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("get_layers") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -250,7 +250,7 @@ pub async fn add_layer( error!("Failed to add layer: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("add_layer") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -272,7 +272,7 @@ pub async fn update_layer( error!("Failed to update layer: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("update_layer") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -289,7 +289,7 @@ pub async fn delete_layer( error!("Failed to delete layer: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("delete_layer") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -309,7 +309,7 @@ pub async fn get_audio_tracks( error!("Failed to get audio tracks: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("get_audio_tracks") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -330,7 +330,7 @@ pub async fn add_audio_track( error!("Failed to add audio track: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("add_audio_track") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -347,7 +347,7 @@ pub async fn delete_audio_track( error!("Failed to delete audio track: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("delete_audio_track") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -367,7 +367,7 @@ pub async fn get_keyframes( error!("Failed to get keyframes: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("get_keyframes") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -388,7 +388,7 @@ pub async fn add_keyframe( error!("Failed to add keyframe: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("add_keyframe") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -405,7 +405,7 @@ pub async fn delete_keyframe( error!("Failed to delete keyframe: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("delete_keyframe") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -550,7 +550,7 @@ pub async fn transcribe_handler( error!("Failed to transcribe: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("transcribe") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -603,7 +603,7 @@ pub async fn generate_captions_handler( error!("Failed to generate captions: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("generate_captions") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -670,7 +670,7 @@ pub async fn tts_handler( error!("TTS failed: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("tts") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -698,7 +698,7 @@ pub async fn detect_scenes_handler( error!("Scene detection failed: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("detect_scenes") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -758,7 +758,7 @@ pub async fn auto_reframe_handler( error!("Auto-reframe failed: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("auto_reframe") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -780,7 +780,7 @@ pub async fn remove_background_handler( error!("Background removal failed: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("remove_background") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -799,7 +799,7 @@ pub async fn enhance_video_handler( error!("Video enhancement failed: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("enhance_video") })), + Json(serde_json::json!(SafeErrorResponse::internal_error())), ) } } @@ -821,4 +821,207 @@ pub async fn beat_sync_handler( error!("Beat sync failed: {e}"); ( StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ "error": sanitize_error("beat_sync + Json(serde_json::json!(SafeErrorResponse::internal_error())), + ) + } + } +} + +pub async fn generate_waveform_handler( + State(state): State>, + Path(project_id): Path, + Json(req): Json, +) -> impl IntoResponse { + let engine = VideoEngine::new(state.db.clone()); + + match engine + .generate_waveform(project_id, req.audio_track_id, req.samples_per_second) + .await + { + Ok(response) => (StatusCode::OK, Json(serde_json::json!(response))), + Err(e) => { + error!("Waveform generation failed: {e}"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!(SafeErrorResponse::internal_error())), + ) + } + } +} + +pub async fn list_templates(State(_state): State>) -> impl IntoResponse { + let templates = vec![ + TemplateInfo { + id: "social-promo".to_string(), + name: "Social Promo".to_string(), + description: "Quick social media promotional video".to_string(), + thumbnail_url: "/video/templates/social-promo.jpg".to_string(), + duration_ms: 15000, + category: "social".to_string(), + }, + TemplateInfo { + id: "youtube-intro".to_string(), + name: "YouTube Intro".to_string(), + description: "Professional YouTube channel intro".to_string(), + thumbnail_url: "/video/templates/youtube-intro.jpg".to_string(), + duration_ms: 5000, + category: "intro".to_string(), + }, + TemplateInfo { + id: "talking-head".to_string(), + name: "Talking Head".to_string(), + description: "Interview or presentation style".to_string(), + thumbnail_url: "/video/templates/talking-head.jpg".to_string(), + duration_ms: 30000, + category: "presentation".to_string(), + }, + TemplateInfo { + id: "product-showcase".to_string(), + name: "Product Showcase".to_string(), + description: "E-commerce product highlight".to_string(), + thumbnail_url: "/video/templates/product-showcase.jpg".to_string(), + duration_ms: 20000, + category: "commercial".to_string(), + }, + ]; + + ( + StatusCode::OK, + Json(serde_json::json!({ "templates": templates })), + ) +} + +pub async fn apply_template_handler( + State(state): State>, + Path(project_id): Path, + Json(req): Json, +) -> impl IntoResponse { + let engine = VideoEngine::new(state.db.clone()); + + match engine + .apply_template(project_id, &req.template_id, req.customizations) + .await + { + Ok(_) => ( + StatusCode::OK, + Json(serde_json::json!({ "success": true })), + ), + Err(e) => { + error!("Apply template failed: {e}"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!(SafeErrorResponse::internal_error())), + ) + } + } +} + +pub async fn add_transition_handler( + State(state): State>, + Path((from_id, to_id)): Path<(Uuid, Uuid)>, + Json(req): Json, +) -> impl IntoResponse { + let engine = VideoEngine::new(state.db.clone()); + + match engine + .add_transition(from_id, to_id, &req.transition_type, req.duration_ms) + .await + { + Ok(_) => ( + StatusCode::OK, + Json(serde_json::json!({ "success": true })), + ), + Err(e) => { + error!("Add transition failed: {e}"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!(SafeErrorResponse::internal_error())), + ) + } + } +} + +pub async fn chat_edit( + State(state): State>, + Path(project_id): Path, + Json(req): Json, +) -> impl IntoResponse { + let engine = VideoEngine::new(state.db.clone()); + + match engine + .process_chat_command(project_id, &req.message, req.playhead_ms, req.selection) + .await + { + Ok(response) => (StatusCode::OK, Json(serde_json::json!(response))), + Err(e) => { + error!("Chat edit failed: {e}"); + ( + StatusCode::OK, + Json(serde_json::json!(ChatEditResponse { + success: false, + message: "Could not process that request".to_string(), + commands_executed: vec![], + project: None, + })), + ) + } + } +} + +pub async fn start_export( + State(state): State>, + Path(project_id): Path, + Json(req): Json, +) -> impl IntoResponse { + let engine = VideoEngine::new(state.db.clone()); + + match engine.start_export(project_id, req, state.cache.as_ref()).await { + Ok(export) => ( + StatusCode::OK, + Json(serde_json::json!({ "export": export })), + ), + Err(e) => { + error!("Start export failed: {e}"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!(SafeErrorResponse::internal_error())), + ) + } + } +} + +pub async fn get_export_status( + State(state): State>, + Path(export_id): Path, +) -> impl IntoResponse { + let engine = VideoEngine::new(state.db.clone()); + + match engine.get_export_status(export_id).await { + Ok(export) => ( + StatusCode::OK, + Json(serde_json::json!(ExportStatusResponse { + id: export.id, + status: export.status, + progress: export.progress, + output_url: export.output_url, + gbdrive_path: export.gbdrive_path, + error_message: export.error_message, + })), + ), + Err(diesel::result::Error::NotFound) => ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({ "error": "Export not found" })), + ), + Err(e) => { + error!("Get export status failed: {e}"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!(SafeErrorResponse::internal_error())), + ) + } + } +} + +pub async fn video_ui() -> Html<&'static str> { + Html(include_str!("../../../botui/ui/suite/video/video.html")) +} diff --git a/src/video/mcp_tools.rs b/src/video/mcp_tools.rs new file mode 100644 index 000000000..2cc07efda --- /dev/null +++ b/src/video/mcp_tools.rs @@ -0,0 +1,608 @@ +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tracing::{error, info}; +use uuid::Uuid; + +use crate::shared::utils::DbPool; + +use super::engine::VideoEngine; +use super::models::*; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct McpToolResponse { + pub success: bool, + pub data: Option, + pub error: Option, +} + +impl McpToolResponse { + pub fn ok(data: T) -> Self { + Self { + success: true, + data: Some(data), + error: None, + } + } + + pub fn err(message: impl Into) -> Self { + Self { + success: false, + data: None, + error: Some(message.into()), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateVideoProjectInput { + pub name: String, + pub description: Option, + pub resolution_width: Option, + pub resolution_height: Option, + pub fps: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AddVideoClipInput { + pub project_id: String, + pub source_url: String, + pub name: Option, + pub at_ms: Option, + pub duration_ms: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GenerateCaptionsInput { + pub project_id: String, + pub style: Option, + pub max_chars_per_line: Option, + pub font_size: Option, + pub color: Option, + pub with_background: Option, + pub language: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ExportVideoInput { + pub project_id: String, + pub format: Option, + pub quality: Option, + pub save_to_library: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AddTextOverlayInput { + pub project_id: String, + pub content: String, + pub at_ms: Option, + pub duration_ms: Option, + pub x: Option, + pub y: Option, + pub font_size: Option, + pub color: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AddAudioTrackInput { + pub project_id: String, + pub source_url: String, + pub name: Option, + pub track_type: Option, + pub start_ms: Option, + pub volume: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateProjectOutput { + pub project_id: String, + pub name: String, + pub resolution: String, + pub fps: i32, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AddClipOutput { + pub clip_id: String, + pub project_id: String, + pub name: String, + pub start_ms: i64, + pub duration_ms: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GenerateCaptionsOutput { + pub project_id: String, + pub captions_count: usize, + pub total_duration_ms: i64, + pub language: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ExportVideoOutput { + pub export_id: String, + pub project_id: String, + pub status: String, + pub format: String, + pub quality: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AddTextOverlayOutput { + pub layer_id: String, + pub project_id: String, + pub content: String, + pub start_ms: i64, + pub end_ms: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AddAudioTrackOutput { + pub track_id: String, + pub project_id: String, + pub name: String, + pub track_type: String, +} + +pub async fn create_video_project_tool( + db: DbPool, + input: CreateVideoProjectInput, +) -> McpToolResponse { + let engine = VideoEngine::new(db); + + let req = CreateProjectRequest { + name: input.name.clone(), + description: input.description, + resolution_width: input.resolution_width, + resolution_height: input.resolution_height, + fps: input.fps, + }; + + match engine.create_project(None, None, req).await { + Ok(project) => { + info!("MCP: Created video project {} ({})", project.name, project.id); + McpToolResponse::ok(CreateProjectOutput { + project_id: project.id.to_string(), + name: project.name, + resolution: format!("{}x{}", project.resolution_width, project.resolution_height), + fps: project.fps, + }) + } + Err(e) => { + error!("MCP: Failed to create video project: {e}"); + McpToolResponse::err(format!("Failed to create project: {e}")) + } + } +} + +pub async fn add_video_clip_tool( + db: DbPool, + input: AddVideoClipInput, +) -> McpToolResponse { + let project_id = match Uuid::parse_str(&input.project_id) { + Ok(id) => id, + Err(_) => return McpToolResponse::err("Invalid project_id format"), + }; + + let engine = VideoEngine::new(db); + + let req = AddClipRequest { + name: input.name, + source_url: input.source_url, + at_ms: input.at_ms, + duration_ms: input.duration_ms, + }; + + match engine.add_clip(project_id, req).await { + Ok(clip) => { + info!("MCP: Added clip {} to project {}", clip.id, project_id); + McpToolResponse::ok(AddClipOutput { + clip_id: clip.id.to_string(), + project_id: clip.project_id.to_string(), + name: clip.name, + start_ms: clip.start_ms, + duration_ms: clip.duration_ms, + }) + } + Err(e) => { + error!("MCP: Failed to add clip: {e}"); + McpToolResponse::err(format!("Failed to add clip: {e}")) + } + } +} + +pub async fn generate_captions_tool( + db: DbPool, + input: GenerateCaptionsInput, +) -> McpToolResponse { + let project_id = match Uuid::parse_str(&input.project_id) { + Ok(id) => id, + Err(_) => return McpToolResponse::err("Invalid project_id format"), + }; + + let engine = VideoEngine::new(db); + + let transcription = match engine + .transcribe_audio(project_id, None, input.language.clone()) + .await + { + Ok(t) => t, + Err(e) => { + error!("MCP: Transcription failed: {e}"); + return McpToolResponse::err(format!("Transcription failed: {e}")); + } + }; + + let style = input.style.as_deref().unwrap_or("default"); + let max_chars = input.max_chars_per_line.unwrap_or(40); + let font_size = input.font_size.unwrap_or(32); + let color = input.color.as_deref().unwrap_or("#FFFFFF"); + let with_bg = input.with_background.unwrap_or(true); + + match engine + .generate_captions_from_transcription( + project_id, + &transcription, + style, + max_chars, + font_size, + color, + with_bg, + ) + .await + { + Ok(layers) => { + info!( + "MCP: Generated {} captions for project {}", + layers.len(), + project_id + ); + McpToolResponse::ok(GenerateCaptionsOutput { + project_id: project_id.to_string(), + captions_count: layers.len(), + total_duration_ms: transcription.duration_ms, + language: transcription.language, + }) + } + Err(e) => { + error!("MCP: Failed to generate captions: {e}"); + McpToolResponse::err(format!("Failed to generate captions: {e}")) + } + } +} + +pub async fn export_video_tool( + db: DbPool, + cache: Option>, + input: ExportVideoInput, +) -> McpToolResponse { + let project_id = match Uuid::parse_str(&input.project_id) { + Ok(id) => id, + Err(_) => return McpToolResponse::err("Invalid project_id format"), + }; + + let engine = VideoEngine::new(db); + + let format = input.format.clone().unwrap_or_else(|| "mp4".to_string()); + let quality = input.quality.clone().unwrap_or_else(|| "high".to_string()); + + let req = ExportRequest { + format: Some(format.clone()), + quality: Some(quality.clone()), + save_to_library: input.save_to_library, + }; + + match engine.start_export(project_id, req, cache.as_ref()).await { + Ok(export) => { + info!( + "MCP: Started export {} for project {}", + export.id, project_id + ); + McpToolResponse::ok(ExportVideoOutput { + export_id: export.id.to_string(), + project_id: export.project_id.to_string(), + status: export.status, + format, + quality, + }) + } + Err(e) => { + error!("MCP: Failed to start export: {e}"); + McpToolResponse::err(format!("Failed to start export: {e}")) + } + } +} + +pub async fn add_text_overlay_tool( + db: DbPool, + input: AddTextOverlayInput, +) -> McpToolResponse { + let project_id = match Uuid::parse_str(&input.project_id) { + Ok(id) => id, + Err(_) => return McpToolResponse::err("Invalid project_id format"), + }; + + let engine = VideoEngine::new(db); + + let start_ms = input.at_ms.unwrap_or(0); + let duration_ms = input.duration_ms.unwrap_or(5000); + let end_ms = start_ms + duration_ms; + + let req = AddLayerRequest { + name: Some("Text".to_string()), + layer_type: "text".to_string(), + start_ms: Some(start_ms), + end_ms: Some(end_ms), + x: input.x.or(Some(0.5)), + y: input.y.or(Some(0.9)), + width: Some(0.8), + height: Some(0.1), + properties: Some(serde_json::json!({ + "content": input.content, + "font_family": "Arial", + "font_size": input.font_size.unwrap_or(48), + "color": input.color.unwrap_or_else(|| "#FFFFFF".to_string()), + "text_align": "center", + })), + }; + + match engine.add_layer(project_id, req).await { + Ok(layer) => { + info!("MCP: Added text overlay {} to project {}", layer.id, project_id); + McpToolResponse::ok(AddTextOverlayOutput { + layer_id: layer.id.to_string(), + project_id: layer.project_id.to_string(), + content: input.content, + start_ms: layer.start_ms, + end_ms: layer.end_ms, + }) + } + Err(e) => { + error!("MCP: Failed to add text overlay: {e}"); + McpToolResponse::err(format!("Failed to add text overlay: {e}")) + } + } +} + +pub async fn add_audio_track_tool( + db: DbPool, + input: AddAudioTrackInput, +) -> McpToolResponse { + let project_id = match Uuid::parse_str(&input.project_id) { + Ok(id) => id, + Err(_) => return McpToolResponse::err("Invalid project_id format"), + }; + + let engine = VideoEngine::new(db); + + let track_type = input.track_type.clone().unwrap_or_else(|| "music".to_string()); + + let req = AddAudioRequest { + name: input.name, + source_url: input.source_url, + track_type: Some(track_type.clone()), + start_ms: input.start_ms, + duration_ms: None, + volume: input.volume, + }; + + match engine.add_audio_track(project_id, req).await { + Ok(track) => { + info!("MCP: Added audio track {} to project {}", track.id, project_id); + McpToolResponse::ok(AddAudioTrackOutput { + track_id: track.id.to_string(), + project_id: track.project_id.to_string(), + name: track.name, + track_type, + }) + } + Err(e) => { + error!("MCP: Failed to add audio track: {e}"); + McpToolResponse::err(format!("Failed to add audio track: {e}")) + } + } +} + +pub fn get_tool_definitions() -> Vec { + vec![ + serde_json::json!({ + "name": "create_video_project", + "description": "Create a new video editing project", + "input_schema": { + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "Name of the video project" + }, + "description": { + "type": "string", + "description": "Optional description of the project" + }, + "resolution_width": { + "type": "integer", + "description": "Video width in pixels (default: 1920)" + }, + "resolution_height": { + "type": "integer", + "description": "Video height in pixels (default: 1080)" + }, + "fps": { + "type": "integer", + "description": "Frames per second (default: 30)" + } + }, + "required": ["name"] + } + }), + serde_json::json!({ + "name": "add_video_clip", + "description": "Add a video clip to an existing project", + "input_schema": { + "type": "object", + "properties": { + "project_id": { + "type": "string", + "description": "UUID of the project" + }, + "source_url": { + "type": "string", + "description": "URL or path to the video file" + }, + "name": { + "type": "string", + "description": "Optional name for the clip" + }, + "at_ms": { + "type": "integer", + "description": "Position in timeline (milliseconds)" + }, + "duration_ms": { + "type": "integer", + "description": "Duration of the clip (milliseconds)" + } + }, + "required": ["project_id", "source_url"] + } + }), + serde_json::json!({ + "name": "generate_captions", + "description": "Generate captions from audio transcription using AI", + "input_schema": { + "type": "object", + "properties": { + "project_id": { + "type": "string", + "description": "UUID of the project" + }, + "style": { + "type": "string", + "description": "Caption style (default, bold, minimal)" + }, + "max_chars_per_line": { + "type": "integer", + "description": "Maximum characters per caption line" + }, + "font_size": { + "type": "integer", + "description": "Font size for captions" + }, + "color": { + "type": "string", + "description": "Text color (hex format)" + }, + "with_background": { + "type": "boolean", + "description": "Add background box behind captions" + }, + "language": { + "type": "string", + "description": "Language code for transcription" + } + }, + "required": ["project_id"] + } + }), + serde_json::json!({ + "name": "export_video", + "description": "Export a video project to a file, optionally saving to .gbdrive library", + "input_schema": { + "type": "object", + "properties": { + "project_id": { + "type": "string", + "description": "UUID of the project" + }, + "format": { + "type": "string", + "description": "Output format (mp4, webm, mov)" + }, + "quality": { + "type": "string", + "description": "Quality preset (low, medium, high, 4k)" + }, + "save_to_library": { + "type": "boolean", + "description": "Save to .gbdrive/videos library (default: true)" + } + }, + "required": ["project_id"] + } + }), + serde_json::json!({ + "name": "add_text_overlay", + "description": "Add a text overlay to a video project", + "input_schema": { + "type": "object", + "properties": { + "project_id": { + "type": "string", + "description": "UUID of the project" + }, + "content": { + "type": "string", + "description": "Text content to display" + }, + "at_ms": { + "type": "integer", + "description": "Start time in milliseconds" + }, + "duration_ms": { + "type": "integer", + "description": "Duration to display (milliseconds)" + }, + "x": { + "type": "number", + "description": "Horizontal position (0.0 to 1.0)" + }, + "y": { + "type": "number", + "description": "Vertical position (0.0 to 1.0)" + }, + "font_size": { + "type": "integer", + "description": "Font size in pixels" + }, + "color": { + "type": "string", + "description": "Text color (hex format)" + } + }, + "required": ["project_id", "content"] + } + }), + serde_json::json!({ + "name": "add_audio_track", + "description": "Add an audio track to a video project", + "input_schema": { + "type": "object", + "properties": { + "project_id": { + "type": "string", + "description": "UUID of the project" + }, + "source_url": { + "type": "string", + "description": "URL or path to the audio file" + }, + "name": { + "type": "string", + "description": "Optional name for the track" + }, + "track_type": { + "type": "string", + "description": "Type of track (music, narration, sound_effect)" + }, + "start_ms": { + "type": "integer", + "description": "Start time in timeline (milliseconds)" + }, + "volume": { + "type": "number", + "description": "Volume level (0.0 to 1.0)" + } + }, + "required": ["project_id", "source_url"] + } + }), + ] +} diff --git a/src/video/mod.rs b/src/video/mod.rs index 37f8b22eb..7f08e9b01 100644 --- a/src/video/mod.rs +++ b/src/video/mod.rs @@ -1,16 +1,107 @@ -use axum::{routing::get, Router}; +mod analytics; +mod engine; +mod handlers; +mod models; +mod render; +mod schema; +mod websocket; + +pub mod mcp_tools; + +pub use analytics::{get_analytics_handler, record_view_handler, AnalyticsEngine}; +pub use engine::VideoEngine; +pub use handlers::*; +pub use models::*; +pub use render::{start_render_worker, start_render_worker_with_broadcaster, VideoRenderWorker}; +pub use schema::*; +pub use websocket::{broadcast_export_progress, export_progress_websocket, ExportProgressBroadcaster}; + +use axum::{ + routing::{delete, get, post, put}, + Router, +}; use std::sync::Arc; use crate::shared::state::AppState; pub fn configure_video_routes() -> Router> { - Router::new().route("/video", get(video_ui)) + Router::new() + .route("/api/video/projects", get(list_projects).post(create_project)) + .route( + "/api/video/projects/:id", + get(get_project).put(update_project).delete(delete_project), + ) + .route( + "/api/video/projects/:id/clips", + get(get_clips).post(add_clip), + ) + .route("/api/video/clips/:id", put(update_clip).delete(delete_clip)) + .route("/api/video/clips/:id/split", post(split_clip_handler)) + .route( + "/api/video/projects/:id/layers", + get(get_layers).post(add_layer), + ) + .route( + "/api/video/layers/:id", + put(update_layer).delete(delete_layer), + ) + .route( + "/api/video/projects/:id/audio", + get(get_audio_tracks).post(add_audio_track), + ) + .route("/api/video/audio/:id", delete(delete_audio_track)) + .route("/api/video/projects/:id/upload", post(upload_media)) + .route("/api/video/projects/:id/preview", get(get_preview_frame)) + .route( + "/api/video/projects/:id/transcribe", + post(transcribe_handler), + ) + .route( + "/api/video/projects/:id/captions", + post(generate_captions_handler), + ) + .route("/api/video/projects/:id/tts", post(tts_handler)) + .route("/api/video/projects/:id/scenes", post(detect_scenes_handler)) + .route("/api/video/projects/:id/reframe", post(auto_reframe_handler)) + .route( + "/api/video/projects/:id/remove-background", + post(remove_background_handler), + ) + .route("/api/video/projects/:id/enhance", post(enhance_video_handler)) + .route( + "/api/video/projects/:id/beat-sync", + post(beat_sync_handler), + ) + .route( + "/api/video/projects/:id/waveform", + post(generate_waveform_handler), + ) + .route( + "/api/video/layers/:id/keyframes", + get(get_keyframes).post(add_keyframe), + ) + .route("/api/video/keyframes/:id", delete(delete_keyframe)) + .route("/api/video/templates", get(list_templates)) + .route( + "/api/video/projects/:id/template", + post(apply_template_handler), + ) + .route( + "/api/video/clips/:from_id/transition/:to_id", + post(add_transition_handler), + ) + .route("/api/video/projects/:id/chat", post(chat_edit)) + .route("/api/video/projects/:id/export", post(start_export)) + .route("/api/video/exports/:id/status", get(get_export_status)) + .route( + "/api/video/projects/:id/analytics", + get(get_analytics_handler), + ) + .route("/api/video/analytics/view", post(record_view_handler)) + .route("/api/video/ws/export/:id", get(export_progress_websocket)) + .route("/video", get(video_ui)) } pub fn configure(router: Router>) -> Router> { router.merge(configure_video_routes()) } - -async fn video_ui() -> &'static str { - "Video module" -} diff --git a/src/video/render.rs b/src/video/render.rs new file mode 100644 index 000000000..c7b5be3ff --- /dev/null +++ b/src/video/render.rs @@ -0,0 +1,465 @@ +use chrono::Utc; +use diesel::prelude::*; +use std::sync::Arc; +use tracing::{error, info, warn}; +use uuid::Uuid; + +use crate::security::command_guard::SafeCommand; +use crate::shared::utils::DbPool; + +use super::models::*; +use super::schema::*; +use super::websocket::{broadcast_export_progress, ExportProgressBroadcaster}; + +pub struct VideoRenderWorker { + db: DbPool, + cache: Arc, + output_dir: String, + broadcaster: Option>, +} + +impl VideoRenderWorker { + pub fn new(db: DbPool, cache: Arc, output_dir: String) -> Self { + Self { + db, + cache, + output_dir, + broadcaster: None, + } + } + + pub fn with_broadcaster( + db: DbPool, + cache: Arc, + output_dir: String, + broadcaster: Arc, + ) -> Self { + Self { + db, + cache, + output_dir, + broadcaster: Some(broadcaster), + } + } + + pub async fn start(self) { + info!("Starting video render worker"); + tokio::spawn(async move { + self.run_worker_loop().await; + }); + } + + pub async fn run_worker_loop(&self) { + loop { + match self.process_next_job().await { + Ok(true) => continue, + Ok(false) => { + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + } + Err(e) => { + error!("Worker error: {e}"); + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + } + } + } + } + + async fn process_next_job(&self) -> Result> { + let mut conn = self.cache.get_connection()?; + + let job_json: Option = redis::cmd("RPOP") + .arg("video:export:queue") + .query(&mut conn)?; + + let job_str = match job_json { + Some(j) => j, + None => return Ok(false), + }; + + let job: serde_json::Value = serde_json::from_str(&job_str)?; + let export_id = Uuid::parse_str(job["export_id"].as_str().unwrap_or_default())?; + let project_id = Uuid::parse_str(job["project_id"].as_str().unwrap_or_default())?; + let format = job["format"].as_str().unwrap_or("mp4"); + let quality = job["quality"].as_str().unwrap_or("high"); + let save_to_library = job["save_to_library"].as_bool().unwrap_or(true); + let bot_name = job["bot_name"].as_str().map(|s| s.to_string()); + + info!("Processing export job: {export_id}"); + + self.update_progress(export_id, project_id, 10, "processing", None, None) + .await?; + + match self + .render_video(project_id, export_id, format, quality) + .await + { + Ok(output_url) => { + let gbdrive_path = if save_to_library { + self.save_to_gbdrive(&output_url, project_id, export_id, format, bot_name.as_deref()) + .await + .ok() + } else { + None + }; + + self.update_progress( + export_id, + project_id, + 100, + "completed", + Some(output_url), + gbdrive_path, + ) + .await?; + info!("Export {export_id} completed"); + } + Err(e) => { + let error_msg = format!("Render failed: {e}"); + self.update_progress(export_id, project_id, 0, "failed", None, None) + .await?; + error!("Export {export_id} failed: {error_msg}"); + self.set_export_error(export_id, &error_msg).await?; + } + } + + Ok(true) + } + + async fn update_progress( + &self, + export_id: Uuid, + project_id: Uuid, + progress: i32, + status: &str, + output_url: Option, + gbdrive_path: Option, + ) -> Result<(), Box> { + let mut db_conn = self.db.get()?; + + let completed_at = if status == "completed" || status == "failed" { + Some(Utc::now()) + } else { + None + }; + + diesel::update(video_exports::table.find(export_id)) + .set(( + video_exports::progress.eq(progress), + video_exports::status.eq(status), + video_exports::output_url.eq(&output_url), + video_exports::gbdrive_path.eq(&gbdrive_path), + video_exports::completed_at.eq(completed_at), + )) + .execute(&mut db_conn)?; + + if status == "completed" || status == "failed" { + let new_status = if status == "completed" { + "published" + } else { + "draft" + }; + diesel::update(video_projects::table.find(project_id)) + .set(video_projects::status.eq(new_status)) + .execute(&mut db_conn)?; + } + + if let Some(broadcaster) = &self.broadcaster { + broadcast_export_progress( + broadcaster, + export_id, + project_id, + status, + progress, + Some(format!("Export {progress}%")), + output_url, + gbdrive_path, + ); + } + + Ok(()) + } + + async fn set_export_error( + &self, + export_id: Uuid, + error_message: &str, + ) -> Result<(), Box> { + let mut db_conn = self.db.get()?; + + diesel::update(video_exports::table.find(export_id)) + .set(video_exports::error_message.eq(Some(error_message))) + .execute(&mut db_conn)?; + + Ok(()) + } + + async fn render_video( + &self, + project_id: Uuid, + export_id: Uuid, + format: &str, + quality: &str, + ) -> Result> { + let mut db_conn = self.db.get()?; + + let project: VideoProject = video_projects::table.find(project_id).first(&mut db_conn)?; + + let clips: Vec = video_clips::table + .filter(video_clips::project_id.eq(project_id)) + .order(video_clips::clip_order.asc()) + .load(&mut db_conn)?; + + let layers: Vec = video_layers::table + .filter(video_layers::project_id.eq(project_id)) + .order(video_layers::track_index.asc()) + .load(&mut db_conn)?; + + if clips.is_empty() { + return Err("No clips in project".into()); + } + + std::fs::create_dir_all(&self.output_dir)?; + + let output_filename = format!("{export_id}.{format}"); + let output_path = format!("{}/{output_filename}", self.output_dir); + + let resolution = match quality { + "4k" => "3840x2160", + "high" => "1920x1080", + "medium" => "1280x720", + "low" => "854x480", + _ => "1920x1080", + }; + + let bitrate = match quality { + "4k" => "20M", + "high" => "8M", + "medium" => "4M", + "low" => "2M", + _ => "8M", + }; + + let filter_complex = self.build_filter_complex(&clips, &layers, &project, resolution); + + let mut cmd = SafeCommand::new("ffmpeg") + .map_err(|e| format!("Failed to create command: {e}"))?; + + cmd.arg("-y").map_err(|e| format!("Arg error: {e}"))?; + + for clip in &clips { + cmd.arg("-i").map_err(|e| format!("Arg error: {e}"))?; + cmd.arg(&clip.source_url).map_err(|e| format!("Arg error: {e}"))?; + } + + if !filter_complex.is_empty() { + cmd.arg("-filter_complex").map_err(|e| format!("Arg error: {e}"))?; + cmd.arg(&filter_complex).map_err(|e| format!("Arg error: {e}"))?; + cmd.arg("-map").map_err(|e| format!("Arg error: {e}"))?; + cmd.arg("[outv]").map_err(|e| format!("Arg error: {e}"))?; + + if clips.len() == 1 { + cmd.arg("-map").map_err(|e| format!("Arg error: {e}"))?; + cmd.arg("0:a?").map_err(|e| format!("Arg error: {e}"))?; + } + } + + cmd.arg("-c:v").map_err(|e| format!("Arg error: {e}"))?; + cmd.arg("libx264").map_err(|e| format!("Arg error: {e}"))?; + cmd.arg("-preset").map_err(|e| format!("Arg error: {e}"))?; + cmd.arg("medium").map_err(|e| format!("Arg error: {e}"))?; + cmd.arg("-b:v").map_err(|e| format!("Arg error: {e}"))?; + cmd.arg(bitrate).map_err(|e| format!("Arg error: {e}"))?; + cmd.arg("-c:a").map_err(|e| format!("Arg error: {e}"))?; + cmd.arg("aac").map_err(|e| format!("Arg error: {e}"))?; + cmd.arg("-b:a").map_err(|e| format!("Arg error: {e}"))?; + cmd.arg("192k").map_err(|e| format!("Arg error: {e}"))?; + cmd.arg("-movflags").map_err(|e| format!("Arg error: {e}"))?; + cmd.arg("+faststart").map_err(|e| format!("Arg error: {e}"))?; + cmd.arg(&output_path).map_err(|e| format!("Arg error: {e}"))?; + + info!("Running FFmpeg render for export {export_id}"); + + let result = cmd.execute().map_err(|e| format!("Execution failed: {e}"))?; + + if !result.success { + warn!("FFmpeg stderr: {}", result.stderr); + return Err(format!("FFmpeg failed: {}", result.stderr).into()); + } + + let output_url = format!("/video/exports/{output_filename}"); + Ok(output_url) + } + + fn build_filter_complex( + &self, + clips: &[VideoClip], + layers: &[VideoLayer], + project: &VideoProject, + resolution: &str, + ) -> String { + let mut filters = Vec::new(); + let mut inputs = Vec::new(); + + for (i, clip) in clips.iter().enumerate() { + let trim_start = clip.trim_in_ms as f64 / 1000.0; + let trim_end = (clip.duration_ms - clip.trim_out_ms) as f64 / 1000.0; + + filters.push(format!( + "[{i}:v]trim=start={trim_start}:end={trim_end},setpts=PTS-STARTPTS,scale={resolution}:force_original_aspect_ratio=decrease,pad={resolution}:(ow-iw)/2:(oh-ih)/2[v{i}]" + )); + inputs.push(format!("[v{i}]")); + } + + if clips.len() > 1 { + let concat_inputs = inputs.join(""); + filters.push(format!( + "{concat_inputs}concat=n={}:v=1:a=0[outv]", + clips.len() + )); + } else if !inputs.is_empty() { + filters.push(format!("{}copy[outv]", inputs[0])); + } + + for layer in layers { + if layer.layer_type == "text" { + if let Some(content) = layer + .properties_json + .get("content") + .and_then(|c| c.as_str()) + { + let font_size = layer + .properties_json + .get("font_size") + .and_then(|s| s.as_i64()) + .unwrap_or(48); + let color = layer + .properties_json + .get("color") + .and_then(|c| c.as_str()) + .unwrap_or("white"); + + let x = (layer.x * project.resolution_width as f32) as i32; + let y = (layer.y * project.resolution_height as f32) as i32; + + let escaped_content = content + .replace('\'', "'\\''") + .replace(':', "\\:") + .replace('\\', "\\\\"); + + filters.push(format!( + "[outv]drawtext=text='{}':fontsize={}:fontcolor={}:x={}:y={}:enable='between(t,{},{})':alpha={}[outv]", + escaped_content, + font_size, + color.trim_start_matches('#'), + x, + y, + layer.start_ms as f64 / 1000.0, + layer.end_ms as f64 / 1000.0, + layer.opacity + )); + } + } + } + + if filters.is_empty() { + return String::new(); + } + + filters.join(";") + } + + async fn save_to_gbdrive( + &self, + output_url: &str, + project_id: Uuid, + export_id: Uuid, + format: &str, + bot_name: Option<&str>, + ) -> Result> { + let mut db_conn = self.db.get()?; + + let project: VideoProject = video_projects::table.find(project_id).first(&mut db_conn)?; + + let safe_name: String = project + .name + .chars() + .map(|c| { + if c.is_alphanumeric() || c == '-' || c == '_' { + c + } else { + '_' + } + }) + .collect(); + + let timestamp = Utc::now().format("%Y%m%d_%H%M%S"); + let filename = format!("{safe_name}_{timestamp}.{format}"); + let gbdrive_path = format!("videos/{filename}"); + + let source_path = format!( + "{}/{}", + self.output_dir, + output_url.trim_start_matches("/video/exports/") + ); + + if std::env::var("S3_ENDPOINT").is_ok() { + let bot = bot_name.unwrap_or("default"); + let bucket = format!("{bot}.gbai"); + let key = format!("{bot}.gbdrive/{gbdrive_path}"); + + info!("Uploading video to S3: s3://{bucket}/{key}"); + + let file_data = std::fs::read(&source_path)?; + + let s3_config = aws_config::from_env().load().await; + let s3_client = aws_sdk_s3::Client::new(&s3_config); + + s3_client + .put_object() + .bucket(&bucket) + .key(&key) + .content_type(format!("video/{format}")) + .body(file_data.into()) + .send() + .await + .map_err(|e| format!("S3 upload failed: {e}"))?; + + info!("Video saved to .gbdrive: {gbdrive_path}"); + } else { + let gbdrive_dir = std::env::var("GBDRIVE_DIR").unwrap_or_else(|_| "./.gbdrive".to_string()); + let videos_dir = format!("{gbdrive_dir}/videos"); + + std::fs::create_dir_all(&videos_dir)?; + + let dest_path = format!("{videos_dir}/{filename}"); + std::fs::copy(&source_path, &dest_path)?; + + info!("Video saved to local .gbdrive: {gbdrive_path}"); + } + + diesel::update(video_exports::table.find(export_id)) + .set(video_exports::gbdrive_path.eq(Some(&gbdrive_path))) + .execute(&mut db_conn)?; + + Ok(gbdrive_path) + } +} + +pub fn start_render_worker(db: DbPool, cache: Arc, output_dir: String) { + let worker = VideoRenderWorker::new(db, cache, output_dir); + tokio::spawn(async move { + worker.run_worker_loop().await; + }); +} + +pub fn start_render_worker_with_broadcaster( + db: DbPool, + cache: Arc, + output_dir: String, + broadcaster: Arc, +) { + let worker = VideoRenderWorker::with_broadcaster(db, cache, output_dir, broadcaster); + tokio::spawn(async move { + worker.run_worker_loop().await; + }); +} diff --git a/src/video/websocket.rs b/src/video/websocket.rs new file mode 100644 index 000000000..546a1e8e3 --- /dev/null +++ b/src/video/websocket.rs @@ -0,0 +1,200 @@ +use axum::{ + extract::{ + ws::{Message, WebSocket, WebSocketUpgrade}, + Path, State, + }, + response::IntoResponse, +}; +use futures::{SinkExt, StreamExt}; +use std::sync::Arc; +use tokio::sync::broadcast; +use tracing::{error, info, warn}; +use uuid::Uuid; + +use crate::shared::state::AppState; + +use super::models::ExportProgressEvent; + +pub struct ExportProgressBroadcaster { + tx: broadcast::Sender, +} + +impl ExportProgressBroadcaster { + pub fn new() -> Self { + let (tx, _) = broadcast::channel(100); + Self { tx } + } + + pub fn sender(&self) -> broadcast::Sender { + self.tx.clone() + } + + pub fn subscribe(&self) -> broadcast::Receiver { + self.tx.subscribe() + } + + pub fn send(&self, event: ExportProgressEvent) { + if let Err(e) = self.tx.send(event) { + warn!("No active WebSocket listeners: {e}"); + } + } +} + +impl Default for ExportProgressBroadcaster { + fn default() -> Self { + Self::new() + } +} + +pub async fn export_progress_websocket( + ws: WebSocketUpgrade, + State(state): State>, + Path(export_id): Path, +) -> impl IntoResponse { + info!("WebSocket connection request for export: {export_id}"); + ws.on_upgrade(move |socket| handle_export_websocket(socket, state, export_id)) +} + +async fn handle_export_websocket(socket: WebSocket, state: Arc, export_id: Uuid) { + let (mut sender, mut receiver) = socket.split(); + + info!("WebSocket connected for export: {export_id}"); + + let welcome = serde_json::json!({ + "type": "connected", + "export_id": export_id.to_string(), + "message": "Connected to export progress stream", + "timestamp": chrono::Utc::now().to_rfc3339() + }); + + if let Err(e) = sender + .send(Message::Text(welcome.to_string().into())) + .await + { + error!("Failed to send welcome message: {e}"); + return; + } + + let mut progress_rx = if let Some(broadcaster) = state.video_progress_broadcaster.as_ref() { + broadcaster.subscribe() + } else { + let (tx, rx) = broadcast::channel(1); + drop(tx); + rx + }; + + let export_id_for_recv = export_id; + + let recv_task = tokio::spawn(async move { + while let Some(msg) = receiver.next().await { + match msg { + Ok(Message::Close(_)) => { + info!("WebSocket close requested for export: {export_id_for_recv}"); + break; + } + Ok(Message::Ping(_)) => { + info!("Received ping for export: {export_id_for_recv}"); + } + Ok(Message::Text(text)) => { + if let Ok(json) = serde_json::from_str::(&text) { + if json.get("type").and_then(|v| v.as_str()) == Some("ping") { + info!("Client ping received"); + } + } + } + Err(e) => { + error!("WebSocket receive error: {e}"); + break; + } + _ => {} + } + } + }); + + loop { + tokio::select! { + result = progress_rx.recv() => { + match result { + Ok(event) => { + if event.export_id == export_id { + let json = serde_json::json!({ + "type": "progress", + "export_id": event.export_id.to_string(), + "project_id": event.project_id.to_string(), + "status": event.status, + "progress": event.progress, + "message": event.message, + "output_url": event.output_url, + "gbdrive_path": event.gbdrive_path, + "timestamp": chrono::Utc::now().to_rfc3339() + }); + + if let Err(e) = sender.send(Message::Text(json.to_string().into())).await { + error!("Failed to send progress update: {e}"); + break; + } + + if event.status == "completed" || event.status == "failed" { + let final_msg = serde_json::json!({ + "type": "finished", + "export_id": event.export_id.to_string(), + "status": event.status, + "output_url": event.output_url, + "gbdrive_path": event.gbdrive_path + }); + + let _ = sender.send(Message::Text(final_msg.to_string().into())).await; + break; + } + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("WebSocket lagged behind by {n} messages"); + } + Err(broadcast::error::RecvError::Closed) => { + info!("Progress broadcast channel closed"); + break; + } + } + } + + _ = tokio::time::sleep(tokio::time::Duration::from_secs(30)) => { + let heartbeat = serde_json::json!({ + "type": "heartbeat", + "timestamp": chrono::Utc::now().to_rfc3339() + }); + + if let Err(e) = sender.send(Message::Text(heartbeat.to_string().into())).await { + error!("Failed to send heartbeat: {e}"); + break; + } + } + } + } + + recv_task.abort(); + info!("WebSocket disconnected for export: {export_id}"); +} + +pub fn broadcast_export_progress( + broadcaster: &ExportProgressBroadcaster, + export_id: Uuid, + project_id: Uuid, + status: &str, + progress: i32, + message: Option, + output_url: Option, + gbdrive_path: Option, +) { + let event = ExportProgressEvent { + export_id, + project_id, + status: status.to_string(), + progress, + message, + output_url, + gbdrive_path, + }; + + broadcaster.send(event); +}