diff --git a/Cargo.toml b/Cargo.toml index b24b72a56..442d5204d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,6 +120,7 @@ dotenvy = "0.15" env_logger = "0.11" futures = "0.3" futures-util = "0.3" +tokio-util = { version = "0.7", features = ["io", "compat"] } hex = "0.4" hmac = "0.12.1" hyper = { version = "1.4", features = ["full"] } @@ -241,6 +242,8 @@ rss = "2.0" # HTML parsing/web scraping scraper = "0.25" walkdir = "2.5.0" +hyper-util = { version = "0.1.19", features = ["client-legacy", "tokio"] } +http-body-util = "0.1.3" [dev-dependencies] mockito = "1.7.0" diff --git a/src/auto_task/app_generator.rs b/src/auto_task/app_generator.rs index 3d63e59f2..af8e0046c 100644 --- a/src/auto_task/app_generator.rs +++ b/src/auto_task/app_generator.rs @@ -13,6 +13,7 @@ use diesel::sql_query; use log::{error, info, trace, warn}; use serde::{Deserialize, Serialize}; use std::sync::Arc; +use tokio::sync::mpsc; use uuid::Uuid; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -105,42 +106,50 @@ pub struct SyncResult { pub migrations_applied: usize, } -#[derive(Debug, Clone, Deserialize)] +/// Streaming format parsed app structure +#[derive(Debug, Clone, Default)] struct LlmGeneratedApp { name: String, description: String, - #[serde(default)] - _domain: String, + domain: String, tables: Vec, files: Vec, - tools: Option>, - schedulers: Option>, + tools: Vec, + schedulers: Vec, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Default)] struct LlmTable { name: String, fields: Vec, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Default)] struct LlmField { name: String, - #[serde(rename = "type")] field_type: String, - nullable: Option, + nullable: bool, reference: Option, default: Option, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Default)] struct LlmFile { filename: String, content: String, - #[serde(rename = "type", default)] - _file_type: Option, } +/// Streaming delimiter constants +const DELIM_APP_START: &str = "<<>>"; +const DELIM_APP_END: &str = "<<>>"; +const DELIM_TABLES_START: &str = "<<>>"; +const DELIM_TABLES_END: &str = "<<>>"; +const DELIM_TABLE_PREFIX: &str = "<<, task_id: Option, @@ -253,13 +262,13 @@ impl AppGenerator { activity ); - info!("[APP_GENERATOR] Calling generate_complete_app_with_llm for intent: {}", &intent[..intent.len().min(50)]); + trace!("APP_GENERATOR Calling LLM for intent: {}", &intent[..intent.len().min(50)]); let llm_start = std::time::Instant::now(); let llm_app = match self.generate_complete_app_with_llm(intent, session.bot_id).await { Ok(app) => { let llm_elapsed = llm_start.elapsed(); - info!("[APP_GENERATOR] LLM generation completed in {:?}: app={}, files={}, tables={}", + info!("APP_GENERATOR LLM completed in {:?}: app={}, files={}, tables={}", llm_elapsed, app.name, app.files.len(), app.tables.len()); log_generator_info( &app.name, @@ -286,7 +295,7 @@ impl AppGenerator { } Err(e) => { let llm_elapsed = llm_start.elapsed(); - error!("[APP_GENERATOR] LLM generation failed after {:?}: {}", llm_elapsed, e); + error!("APP_GENERATOR LLM failed after {:?}: {}", llm_elapsed, e); log_generator_error("unknown", "LLM app generation failed", &e.to_string()); if let Some(ref task_id) = self.task_id { self.state.emit_task_error(task_id, "llm_request", &e.to_string()); @@ -356,8 +365,12 @@ impl AppGenerator { } let bot_name = self.get_bot_name(session.bot_id)?; - let bucket_name = format!("{}.gbai", bot_name.to_lowercase()); - let drive_app_path = format!(".gbdrive/apps/{}", llm_app.name); + // Sanitize bucket name - replace spaces and invalid characters + let sanitized_name = bot_name.to_lowercase().replace(' ', "-").replace('_', "-"); + let bucket_name = format!("{}.gbai", sanitized_name); + let drive_app_path = format!("{}.gbapp/{}", sanitized_name, llm_app.name); + + info!("Writing app files to bucket: {}, path: {}", bucket_name, drive_app_path); let total_files = llm_app.files.len(); let activity = self.build_activity("writing", 0, Some(total_files as u32), Some("Preparing files")); @@ -390,6 +403,7 @@ impl AppGenerator { activity ); + // Write to MinIO - drive monitor will sync to SITES_ROOT if let Err(e) = self .write_to_drive(&bucket_name, &drive_path, &file.content) .await @@ -415,6 +429,8 @@ impl AppGenerator { let designer_js = Self::generate_designer_js(&llm_app.name); self.bytes_generated += designer_js.len() as u64; + + // Write designer.js to MinIO self.write_to_drive( &bucket_name, &format!("{}/designer.js", drive_app_path), @@ -423,8 +439,8 @@ impl AppGenerator { .await?; let mut tools = Vec::new(); - if let Some(llm_tools) = &llm_app.tools { - let tools_count = llm_tools.len(); + if !llm_app.tools.is_empty() { + let tools_count = llm_app.tools.len(); let activity = self.build_activity("tools", 0, Some(tools_count as u32), Some("Creating BASIC tools")); self.emit_activity( "write_tools", @@ -434,7 +450,7 @@ impl AppGenerator { activity ); - for (idx, tool) in llm_tools.iter().enumerate() { + for (idx, tool) in llm_app.tools.iter().enumerate() { let tool_path = format!(".gbdialog/tools/{}", tool.filename); self.files_written.push(format!("tools/{}", tool.filename)); self.bytes_generated += tool.content.len() as u64; @@ -461,8 +477,8 @@ impl AppGenerator { } let mut schedulers = Vec::new(); - if let Some(llm_schedulers) = &llm_app.schedulers { - let sched_count = llm_schedulers.len(); + if !llm_app.schedulers.is_empty() { + let sched_count = llm_app.schedulers.len(); let activity = self.build_activity("schedulers", 0, Some(sched_count as u32), Some("Creating schedulers")); self.emit_activity( "write_schedulers", @@ -472,7 +488,7 @@ impl AppGenerator { activity ); - for (idx, scheduler) in llm_schedulers.iter().enumerate() { + for (idx, scheduler) in llm_app.schedulers.iter().enumerate() { let scheduler_path = format!(".gbdialog/schedulers/{}", scheduler.filename); self.files_written.push(format!("schedulers/{}", scheduler.filename)); self.bytes_generated += scheduler.content.len() as u64; @@ -498,11 +514,8 @@ impl AppGenerator { } } - let activity = self.build_activity("syncing", TOTAL_STEPS as u32 - 1, Some(TOTAL_STEPS as u32), Some("Deploying to site")); - self.emit_activity("sync_site", "Syncing app to site...", 8, TOTAL_STEPS, activity); - - self.sync_app_to_site_root(&bucket_name, &llm_app.name, session.bot_id) - .await?; + let activity = self.build_activity("complete", TOTAL_STEPS as u32, Some(TOTAL_STEPS as u32), Some("App ready")); + self.emit_activity("complete", "App written to drive, ready to serve from MinIO", 8, TOTAL_STEPS, activity); let elapsed = self.generation_start.map(|s| s.elapsed().as_secs()).unwrap_or(0); @@ -679,55 +692,279 @@ If user says "inventory" → build stock tracking with products, categories, mov If user says "booking" → build appointment scheduler with calendar, slots, confirmations If user says ANYTHING → interpret creatively and BUILD SOMETHING AWESOME -Respond with a single JSON object: -{{ - "name": "app-name-lowercase-dashes", - "description": "What this app does", - "domain": "healthcare|sales|inventory|booking|utility|etc", - "tables": [ - {{ - "name": "table_name", - "fields": [ - {{"name": "id", "type": "guid", "nullable": false}}, - {{"name": "created_at", "type": "datetime", "nullable": false, "default": "now()"}}, - {{"name": "updated_at", "type": "datetime", "nullable": false, "default": "now()"}}, - {{"name": "field_name", "type": "string", "nullable": true, "reference": null}} - ] - }} - ], - "files": [ - {{"filename": "index.html", "content": "...complete HTML..."}}, - {{"filename": "styles.css", "content": ":root {{...}} body {{...}} ...complete CSS..."}}, - {{"filename": "table_name.html", "content": "...list page..."}}, - {{"filename": "table_name_form.html", "content": "...form page..."}} - ], - "tools": [ - {{"filename": "app_helper.bas", "content": "HEAR \"help\"\n TALK \"I can help with...\"\nEND HEAR"}} - ], - "schedulers": [ - {{"filename": "daily_report.bas", "content": "SET SCHEDULE \"0 9 * * *\"\n ...\nEND SCHEDULE"}} - ] -}} +=== OUTPUT FORMAT (STREAMING DELIMITERS) === -CRITICAL RULES: -- For utilities (calculator, timer, converter, BMI, mortgage): tables = [], focus on interactive HTML/JS +Use this EXACT format with delimiters (NOT JSON) so content can stream safely: + +<<>> +name: app-name-lowercase-dashes +description: What this app does +domain: healthcare|sales|inventory|booking|utility|etc +<<>> +<<>> +id:guid:false +created_at:datetime:false:now() +updated_at:datetime:false:now() +field_name:string:true +foreign_key:guid:false:ref:other_table +<<>> +id:guid:false +name:string:true +<<>> +<<>> + + +... complete HTML content here ... + +<<>> +:root {{ --primary: #3b82f6; }} +body {{ margin: 0; font-family: system-ui; }} +... complete CSS content here ... +<<>> + +... complete list page ... +<<>> + +... complete form page ... +<<>> +HEAR "help" + TALK "I can help with..." +END HEAR +<<>> +SET SCHEDULE "0 9 * * *" + data = GET FROM "table" + SEND MAIL TO "admin@example.com" WITH SUBJECT "Daily Report" BODY data +END SCHEDULE +<<>> + +=== TABLE FIELD FORMAT === +Each field on its own line: name:type:nullable[:default][:ref:table] +- Types: guid, string, text, integer, decimal, boolean, date, datetime, json +- nullable: true or false +- default: optional, e.g., now(), 0, '' +- ref:table: optional foreign key reference + +=== CRITICAL RULES === +- For utilities (calculator, timer, converter): TABLES_START/END with nothing between, focus on HTML/JS - For data apps (CRM, inventory): design proper tables and CRUD pages - Generate ALL files completely - no placeholders, no "...", no shortcuts - CSS must be comprehensive with variables, responsive design, dark mode - Every HTML page needs proper structure with all required scripts - Replace APP_NAME_HERE with actual app name in data-app-name attribute - BE CREATIVE - add extra features the user didn't ask for but would love +- Use the EXACT delimiter format above - this allows streaming progress! -Respond with valid JSON only. NO QUESTIONS. JUST BUILD."# +NO QUESTIONS. JUST BUILD."# ); let response = self.call_llm(&prompt, bot_id).await?; - Self::parse_llm_app_response(&response) + Self::parse_streaming_response(&response) } - fn parse_llm_app_response( + /// Parse streaming delimiter format response + fn parse_streaming_response( response: &str, ) -> Result> { + let mut app = LlmGeneratedApp::default(); + + // Find APP_START and APP_END + let start_idx = response.find(DELIM_APP_START); + let end_idx = response.find(DELIM_APP_END); + + let content = match (start_idx, end_idx) { + (Some(s), Some(e)) => &response[s + DELIM_APP_START.len()..e], + (Some(s), None) => { + warn!("No APP_END found, using rest of response"); + &response[s + DELIM_APP_START.len()..] + } + _ => { + // Fallback: try to parse as JSON for backwards compatibility + return Self::parse_json_fallback(response); + } + }; + + let lines: Vec<&str> = content.lines().collect(); + let mut current_section = "header"; + let mut current_table: Option = None; + let mut current_file: Option<(String, String, String)> = None; // (type, filename, content) + + for raw_line in lines.iter() { + let line = raw_line.trim(); + + // Parse header fields + if current_section == "header" { + if line.starts_with("name:") { + app.name = line[5..].trim().to_string(); + continue; + } + if line.starts_with("description:") { + app.description = line[12..].trim().to_string(); + continue; + } + if line.starts_with("domain:") { + app.domain = line[7..].trim().to_string(); + continue; + } + } + + // Section transitions + if line == DELIM_TABLES_START { + current_section = "tables"; + continue; + } + if line == DELIM_TABLES_END { + // Save any pending table + if let Some(table) = current_table.take() { + if !table.name.is_empty() { + app.tables.push(table); + } + } + current_section = "files"; + continue; + } + + // Table definitions + if line.starts_with(DELIM_TABLE_PREFIX) && line.ends_with(DELIM_END) { + // Save previous table + if let Some(table) = current_table.take() { + if !table.name.is_empty() { + app.tables.push(table); + } + } + let table_name = &line[DELIM_TABLE_PREFIX.len()..line.len() - DELIM_END.len()]; + current_table = Some(LlmTable { + name: table_name.to_string(), + fields: Vec::new(), + }); + continue; + } + + // Table field (when in tables section with active table) + if current_section == "tables" && current_table.is_some() && !line.is_empty() && !line.starts_with("<<<") { + if let Some(ref mut table) = current_table { + if let Some(field) = Self::parse_field_line(line) { + table.fields.push(field); + } + } + continue; + } + + // File definitions + if line.starts_with(DELIM_FILE_PREFIX) && line.ends_with(DELIM_END) { + // Save previous file + if let Some((file_type, filename, content)) = current_file.take() { + Self::save_parsed_file(&mut app, &file_type, filename, content); + } + let filename = &line[DELIM_FILE_PREFIX.len()..line.len() - DELIM_END.len()]; + current_file = Some(("file".to_string(), filename.to_string(), String::new())); + continue; + } + + // Tool definitions + if line.starts_with(DELIM_TOOL_PREFIX) && line.ends_with(DELIM_END) { + if let Some((file_type, filename, content)) = current_file.take() { + Self::save_parsed_file(&mut app, &file_type, filename, content); + } + let filename = &line[DELIM_TOOL_PREFIX.len()..line.len() - DELIM_END.len()]; + current_file = Some(("tool".to_string(), filename.to_string(), String::new())); + continue; + } + + // Scheduler definitions + if line.starts_with(DELIM_SCHEDULER_PREFIX) && line.ends_with(DELIM_END) { + if let Some((file_type, filename, content)) = current_file.take() { + Self::save_parsed_file(&mut app, &file_type, filename, content); + } + let filename = &line[DELIM_SCHEDULER_PREFIX.len()..line.len() - DELIM_END.len()]; + current_file = Some(("scheduler".to_string(), filename.to_string(), String::new())); + continue; + } + + // Accumulate file content (use original line to preserve indentation) + if let Some((_, _, ref mut file_content)) = current_file { + if !file_content.is_empty() { + file_content.push('\n'); + } + file_content.push_str(raw_line); + } + } + + // Save any remaining file + if let Some((file_type, filename, content)) = current_file.take() { + Self::save_parsed_file(&mut app, &file_type, filename, content); + } + + // Validate + if app.name.is_empty() { + return Err("No app name found in response".into()); + } + if app.files.is_empty() { + return Err("No files generated".into()); + } + + info!( + "Parsed streaming response: name={}, tables={}, files={}, tools={}, schedulers={}", + app.name, + app.tables.len(), + app.files.len(), + app.tools.len(), + app.schedulers.len() + ); + + Ok(app) + } + + /// Parse a table field line in format: name:type:nullable[:default][:ref:table] + fn parse_field_line(line: &str) -> Option { + let parts: Vec<&str> = line.split(':').collect(); + if parts.len() < 3 { + return None; + } + + let mut field = LlmField { + name: parts[0].trim().to_string(), + field_type: parts[1].trim().to_string(), + nullable: parts[2].trim() == "true", + reference: None, + default: None, + }; + + // Parse optional parts + let mut i = 3; + while i < parts.len() { + if parts[i].trim() == "ref" && i + 1 < parts.len() { + field.reference = Some(parts[i + 1].trim().to_string()); + i += 2; + } else { + // It's a default value + field.default = Some(parts[i].trim().to_string()); + i += 1; + } + } + + Some(field) + } + + /// Save a parsed file to the appropriate collection + fn save_parsed_file(app: &mut LlmGeneratedApp, file_type: &str, filename: String, content: String) { + let file = LlmFile { + filename, + content: content.trim().to_string(), + }; + + match file_type { + "tool" => app.tools.push(file), + "scheduler" => app.schedulers.push(file), + _ => app.files.push(file), + } + } + + /// Fallback to JSON parsing for backwards compatibility + fn parse_json_fallback( + response: &str, + ) -> Result> { + warn!("Falling back to JSON parsing"); + let cleaned = response .trim() .trim_start_matches("```json") @@ -735,8 +972,93 @@ Respond with valid JSON only. NO QUESTIONS. JUST BUILD."# .trim_end_matches("```") .trim(); - match serde_json::from_str::(cleaned) { - Ok(app) => { + #[derive(Debug, Deserialize)] + struct JsonApp { + name: String, + description: String, + #[serde(default)] + domain: String, + #[serde(default)] + tables: Vec, + #[serde(default)] + files: Vec, + #[serde(default)] + tools: Option>, + #[serde(default)] + schedulers: Option>, + } + + #[derive(Debug, Deserialize)] + struct JsonTable { + name: String, + fields: Vec, + } + + #[derive(Debug, Deserialize)] + struct JsonField { + name: String, + #[serde(rename = "type")] + field_type: String, + #[serde(default)] + nullable: Option, + #[serde(default)] + reference: Option, + #[serde(default, deserialize_with = "deserialize_default_value")] + default: Option, + } + + #[derive(Debug, Deserialize)] + struct JsonFile { + filename: String, + content: String, + } + + /// Deserialize default value that can be string, bool, number, or null + fn deserialize_default_value<'de, D>(deserializer: D) -> Result, D::Error> + where + D: serde::Deserializer<'de>, + { + let value: Option = Option::deserialize(deserializer)?; + match value { + None => Ok(None), + Some(serde_json::Value::Null) => Ok(None), + Some(serde_json::Value::String(s)) => Ok(Some(s)), + Some(serde_json::Value::Bool(b)) => Ok(Some(b.to_string())), + Some(serde_json::Value::Number(n)) => Ok(Some(n.to_string())), + Some(v) => Ok(Some(v.to_string())), + } + } + + match serde_json::from_str::(cleaned) { + Ok(json_app) => { + let app = LlmGeneratedApp { + name: json_app.name, + description: json_app.description, + domain: json_app.domain, + tables: json_app.tables.into_iter().map(|t| LlmTable { + name: t.name, + fields: t.fields.into_iter().map(|f| LlmField { + name: f.name, + field_type: f.field_type, + nullable: f.nullable.unwrap_or(true), + reference: f.reference, + default: f.default, + }).collect(), + }).collect(), + files: json_app.files.into_iter().map(|f| LlmFile { + filename: f.filename, + content: f.content, + }).collect(), + tools: json_app.tools.unwrap_or_default().into_iter().map(|f| LlmFile { + filename: f.filename, + content: f.content, + }).collect(), + schedulers: json_app.schedulers.unwrap_or_default().into_iter().map(|f| LlmFile { + filename: f.filename, + content: f.content, + }).collect(), + }; + if app.files.is_empty() { return Err("LLM generated no files".into()); } @@ -762,7 +1084,7 @@ Respond with valid JSON only. NO QUESTIONS. JUST BUILD."# name: f.name.clone(), field_type: f.field_type.clone(), is_key: f.name == "id", - is_nullable: f.nullable.unwrap_or(true), + is_nullable: f.nullable, reference_table: f.reference.clone(), default_value: f.default.clone(), field_order: i as i32, @@ -820,23 +1142,101 @@ Respond with valid JSON only. NO QUESTIONS. JUST BUILD."# }); let prompt_len = prompt.len(); - info!("[APP_GENERATOR] Starting LLM call: model={}, prompt_len={} chars", model, prompt_len); + trace!("APP_GENERATOR Starting LLM streaming: model={}, prompt_len={}", model, prompt_len); let start = std::time::Instant::now(); + // Use streaming to provide real-time feedback + let (tx, mut rx) = mpsc::channel::(100); + let state = self.state.clone(); + let task_id = self.task_id.clone(); + + // Spawn a task to receive stream chunks and broadcast them + let stream_task = tokio::spawn(async move { + let mut full_response = String::new(); + let mut chunk_buffer = String::new(); + let mut last_emit = std::time::Instant::now(); + let mut chunk_count = 0u32; + let stream_start = std::time::Instant::now(); + + trace!("APP_GENERATOR Stream receiver started"); + + while let Some(chunk) = rx.recv().await { + chunk_count += 1; + full_response.push_str(&chunk); + chunk_buffer.push_str(&chunk); + + // Log progress periodically + if chunk_count == 1 || chunk_count % 500 == 0 { + trace!("APP_GENERATOR Stream progress: {} chunks, {} chars, {:?}", + chunk_count, full_response.len(), stream_start.elapsed()); + } + + // Emit chunks every 100ms or when buffer has enough content + if last_emit.elapsed().as_millis() > 100 || chunk_buffer.len() > 50 { + if let Some(ref tid) = task_id { + state.emit_llm_stream(tid, &chunk_buffer); + } + chunk_buffer.clear(); + last_emit = std::time::Instant::now(); + } + } + + trace!("APP_GENERATOR Stream finished: {} chunks, {} chars in {:?}", + chunk_count, full_response.len(), stream_start.elapsed()); + + // Emit any remaining buffer + if !chunk_buffer.is_empty() { + trace!("APP_GENERATOR Emitting final buffer: {} chars", chunk_buffer.len()); + if let Some(ref tid) = task_id { + state.emit_llm_stream(tid, &chunk_buffer); + } + } + + // Log response preview + if full_response.len() > 0 { + let preview = if full_response.len() > 200 { + format!("{}...", &full_response[..200]) + } else { + full_response.clone() + }; + trace!("APP_GENERATOR Response preview: {}", preview.replace('\n', "\\n")); + } + + full_response + }); + + // Start the streaming LLM call + trace!("APP_GENERATOR Starting generate_stream..."); match self .state .llm_provider - .generate(prompt, &llm_config, &model, &key) + .generate_stream(prompt, &llm_config, tx, &model, &key) .await { - Ok(response) => { - let elapsed = start.elapsed(); - info!("[APP_GENERATOR] LLM call succeeded: response_len={} chars, elapsed={:?}", response.len(), elapsed); - return Ok(response); + Ok(()) => { + trace!("APP_GENERATOR generate_stream completed, waiting for stream_task"); + // Wait for the stream task to complete and get the full response + match stream_task.await { + Ok(response) => { + let elapsed = start.elapsed(); + trace!("APP_GENERATOR LLM streaming succeeded: {} chars in {:?}", response.len(), elapsed); + if response.is_empty() { + error!("APP_GENERATOR Empty response from LLM"); + } + return Ok(response); + } + Err(e) => { + let elapsed = start.elapsed(); + error!("APP_GENERATOR LLM stream task failed after {:?}: {}", elapsed, e); + return Err(format!("Stream task failed: {}", e).into()); + } + } } Err(e) => { let elapsed = start.elapsed(); - error!("[APP_GENERATOR] LLM call failed after {:?}: {}", elapsed, e); + error!("APP_GENERATOR LLM streaming failed after {:?}: {}", elapsed, e); + // Abort the stream task + stream_task.abort(); return Err(e); } } @@ -947,26 +1347,100 @@ Respond with valid JSON only. NO QUESTIONS. JUST BUILD."# .ok_or_else(|| format!("Bot not found: {}", bot_id).into()) } + /// Ensure the bucket exists, creating it if necessary + async fn ensure_bucket_exists( + &self, + bucket: &str, + ) -> Result<(), Box> { + if let Some(ref s3) = self.state.drive { + // Check if bucket exists + match s3.head_bucket().bucket(bucket).send().await { + Ok(_) => { + trace!("Bucket {} already exists", bucket); + return Ok(()); + } + Err(_) => { + // Bucket doesn't exist, try to create it + info!("Bucket {} does not exist, creating...", bucket); + match s3.create_bucket().bucket(bucket).send().await { + Ok(_) => { + info!("Created bucket: {}", bucket); + return Ok(()); + } + Err(e) => { + // Check if error is "bucket already exists" (race condition) + let err_str = format!("{:?}", e); + if err_str.contains("BucketAlreadyExists") || err_str.contains("BucketAlreadyOwnedByYou") { + trace!("Bucket {} already exists (race condition)", bucket); + return Ok(()); + } + error!("Failed to create bucket {}: {}", bucket, e); + return Err(Box::new(e)); + } + } + } + } + } else { + // No S3 client, we'll use DB fallback - no bucket needed + trace!("No S3 client, using DB fallback for storage"); + Ok(()) + } + } + async fn write_to_drive( &self, bucket: &str, path: &str, content: &str, ) -> Result<(), Box> { - if let Some(ref s3) = self.state.s3_client { + info!("write_to_drive: bucket={}, path={}, content_len={}", bucket, path, content.len()); + + if let Some(ref s3) = self.state.drive { let body = ByteStream::from(content.as_bytes().to_vec()); let content_type = get_content_type(path); - s3.put_object() + info!("S3 client available, attempting put_object to s3://{}/{}", bucket, path); + + match s3.put_object() .bucket(bucket) .key(path) .body(body) .content_type(content_type) .send() - .await?; + .await + { + Ok(_) => { + info!("Successfully wrote to S3: s3://{}/{}", bucket, path); + } + Err(e) => { + // Log detailed error info + error!("S3 put_object failed: bucket={}, path={}, error={:?}", bucket, path, e); + error!("S3 error details: {}", e); - trace!("Wrote to S3: s3://{}/{}", bucket, path); + // If bucket doesn't exist, try to create it and retry + let err_str = format!("{:?}", e); + if err_str.contains("NoSuchBucket") || err_str.contains("NotFound") { + warn!("Bucket {} not found, attempting to create...", bucket); + self.ensure_bucket_exists(bucket).await?; + + // Retry the write + let body = ByteStream::from(content.as_bytes().to_vec()); + s3.put_object() + .bucket(bucket) + .key(path) + .body(body) + .content_type(get_content_type(path)) + .send() + .await?; + info!("Wrote to S3 after creating bucket: s3://{}/{}", bucket, path); + } else { + error!("S3 write failed (not a bucket issue): {}", err_str); + return Err(Box::new(e)); + } + } + } } else { + warn!("No S3/drive client available, using DB fallback for {}/{}", bucket, path); self.write_to_db_fallback(bucket, path, content)?; } @@ -1034,49 +1508,6 @@ Respond with valid JSON only. NO QUESTIONS. JUST BUILD."# }) } - async fn sync_app_to_site_root( - &self, - bucket: &str, - app_name: &str, - bot_id: Uuid, - ) -> Result<(), Box> { - let source_path = format!(".gbdrive/apps/{}", app_name); - let site_path = Self::get_site_path(bot_id); - - if let Some(ref s3) = self.state.s3_client { - let list_result = s3 - .list_objects_v2() - .bucket(bucket) - .prefix(&source_path) - .send() - .await?; - - if let Some(contents) = list_result.contents { - for object in contents { - if let Some(key) = object.key { - let relative_path = - key.trim_start_matches(&source_path).trim_start_matches('/'); - let dest_key = format!("{}/{}/{}", site_path, app_name, relative_path); - - s3.copy_object() - .bucket(bucket) - .copy_source(format!("{}/{}", bucket, key)) - .key(&dest_key) - .send() - .await?; - - trace!("Synced {} to {}", key, dest_key); - } - } - } - } - - let _ = self.store_app_metadata(bot_id, app_name, &format!("{}/{}", site_path, app_name)); - - info!("App synced to site root: {}/{}", site_path, app_name); - Ok(()) - } - fn store_app_metadata( &self, bot_id: Uuid, @@ -1102,9 +1533,7 @@ Respond with valid JSON only. NO QUESTIONS. JUST BUILD."# Ok(()) } - fn get_site_path(_bot_id: Uuid) -> String { - ".gbdrive/site".to_string() - } + fn generate_designer_js(app_name: &str) -> String { format!( diff --git a/src/auto_task/autotask_api.rs b/src/auto_task/autotask_api.rs index 37b0fd299..e18d80d52 100644 --- a/src/auto_task/autotask_api.rs +++ b/src/auto_task/autotask_api.rs @@ -319,69 +319,78 @@ pub async fn create_and_execute_handler( let task_id = Uuid::new_v4(); if let Err(e) = create_task_record(&state, task_id, &session, &request.intent) { error!("Failed to create task record: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(CreateAndExecuteResponse { + success: false, + task_id: String::new(), + status: "error".to_string(), + message: format!("Failed to create task: {}", e), + app_url: None, + created_resources: Vec::new(), + pending_items: Vec::new(), + error: Some(e.to_string()), + }), + ); } // Update status to running let _ = update_task_status_db(&state, task_id, "running", None); - // Use IntentClassifier to classify and process with task tracking - let classifier = IntentClassifier::new(Arc::clone(&state)); + // Clone what we need for the background task + let state_clone = Arc::clone(&state); + let intent = request.intent.clone(); + let session_clone = session.clone(); + let task_id_str = task_id.to_string(); - match classifier - .classify_and_process_with_task_id(&request.intent, &session, Some(task_id.to_string())) - .await - { - Ok(result) => { - let status = if result.success { - "completed" - } else { - "failed" - }; - let _ = update_task_status_db(&state, task_id, status, result.error.as_deref()); + // Spawn background task to do the actual work + tokio::spawn(async move { + info!("[AUTOTASK] Background task started for task_id={}", task_id_str); - // Get any pending items (ASK LATER) - let pending_items = get_pending_items_for_bot(&state, session.bot_id); + // Use IntentClassifier to classify and process with task tracking + let classifier = IntentClassifier::new(state_clone.clone()); - ( - StatusCode::OK, - Json(CreateAndExecuteResponse { - success: result.success, - task_id: task_id.to_string(), - status: status.to_string(), - message: result.message, - app_url: result.app_url, - created_resources: result - .created_resources - .into_iter() - .map(|r| CreatedResourceResponse { - resource_type: r.resource_type, - name: r.name, - path: r.path, - }) - .collect(), - pending_items, - error: result.error, - }), - ) + match classifier + .classify_and_process_with_task_id(&intent, &session_clone, Some(task_id_str.clone())) + .await + { + Ok(result) => { + let status = if result.success { + "completed" + } else { + "failed" + }; + let _ = update_task_status_db(&state_clone, task_id, status, result.error.as_deref()); + info!( + "[AUTOTASK] Background task completed: task_id={}, status={}, message={}", + task_id_str, status, result.message + ); + } + Err(e) => { + let _ = update_task_status_db(&state_clone, task_id, "failed", Some(&e.to_string())); + error!( + "[AUTOTASK] Background task failed: task_id={}, error={}", + task_id_str, e + ); + } } - Err(e) => { - let _ = update_task_status_db(&state, task_id, "failed", Some(&e.to_string())); - error!("Failed to process intent: {}", e); - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(CreateAndExecuteResponse { - success: false, - task_id: task_id.to_string(), - status: "failed".to_string(), - message: "Failed to process request".to_string(), - app_url: None, - created_resources: Vec::new(), - pending_items: Vec::new(), - error: Some(e.to_string()), - }), - ) - } - } + }); + + // Return immediately with task_id - client will poll for status + info!("[AUTOTASK] Returning immediately with task_id={}", task_id); + ( + StatusCode::ACCEPTED, + Json(CreateAndExecuteResponse { + success: true, + task_id: task_id.to_string(), + status: "running".to_string(), + message: "Task started, poll for status".to_string(), + app_url: None, + created_resources: Vec::new(), + pending_items: Vec::new(), + error: None, + }), + ) } pub async fn classify_intent_handler( @@ -754,6 +763,42 @@ pub async fn list_tasks_handler( } } +/// Get a single task by ID - used for polling task status +pub async fn get_task_handler( + State(state): State>, + Path(task_id): Path, +) -> impl IntoResponse { + info!("Getting task: {}", task_id); + + match get_auto_task_by_id(&state, &task_id) { + Ok(Some(task)) => { + let error_str = task.error.as_ref().map(|e| e.message.clone()); + (StatusCode::OK, Json(serde_json::json!({ + "id": task.id, + "name": task.title, + "description": task.intent, + "status": format!("{:?}", task.status).to_lowercase(), + "progress": task.progress, + "current_step": task.current_step, + "total_steps": task.total_steps, + "error": error_str, + "created_at": task.created_at, + "updated_at": task.updated_at, + "completed_at": task.completed_at, + }))) + }, + Ok(None) => (StatusCode::NOT_FOUND, Json(serde_json::json!({ + "error": "Task not found" + }))), + Err(e) => { + error!("Failed to get task {}: {}", task_id, e); + (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ + "error": e.to_string() + }))) + } + } +} + pub async fn get_stats_handler(State(state): State>) -> impl IntoResponse { match get_auto_task_stats(&state) { Ok(stats) => (StatusCode::OK, Json(stats)), @@ -1366,6 +1411,102 @@ fn start_task_execution( Ok(()) } +/// Get a single auto task by ID +fn get_auto_task_by_id( + state: &Arc, + task_id: &str, +) -> Result, Box> { + let mut conn = state.conn.get()?; + + #[derive(QueryableByName)] + struct TaskRow { + #[diesel(sql_type = Text)] + id: String, + #[diesel(sql_type = Text)] + title: String, + #[diesel(sql_type = Text)] + intent: String, + #[diesel(sql_type = Text)] + status: String, + #[diesel(sql_type = diesel::sql_types::Float8)] + progress: f64, + #[diesel(sql_type = diesel::sql_types::Nullable)] + current_step: Option, + #[diesel(sql_type = diesel::sql_types::Nullable)] + error: Option, + #[diesel(sql_type = diesel::sql_types::Timestamptz)] + created_at: chrono::DateTime, + #[diesel(sql_type = diesel::sql_types::Timestamptz)] + updated_at: chrono::DateTime, + #[diesel(sql_type = diesel::sql_types::Nullable)] + completed_at: Option>, + } + + let query = format!( + "SELECT id::text, title, intent, status, progress, current_step, error, created_at, updated_at, completed_at \ + FROM auto_tasks WHERE id = '{}'", + task_id + ); + + let rows: Vec = sql_query(&query).get_results(&mut conn).unwrap_or_default(); + + if let Some(r) = rows.into_iter().next() { + Ok(Some(AutoTask { + id: r.id, + title: r.title.clone(), + intent: r.intent, + status: match r.status.as_str() { + "running" => AutoTaskStatus::Running, + "completed" => AutoTaskStatus::Completed, + "failed" => AutoTaskStatus::Failed, + "paused" => AutoTaskStatus::Paused, + "cancelled" => AutoTaskStatus::Cancelled, + _ => AutoTaskStatus::Draft, + }, + mode: ExecutionMode::FullyAutomatic, + priority: TaskPriority::Medium, + plan_id: None, + basic_program: None, + current_step: r.current_step.as_ref().and_then(|s| s.parse().ok()).unwrap_or(0), + total_steps: 0, + progress: r.progress, + step_results: Vec::new(), + pending_decisions: Vec::new(), + pending_approvals: Vec::new(), + risk_summary: None, + resource_usage: Default::default(), + error: r.error.map(|msg| crate::auto_task::task_types::TaskError { + code: "TASK_ERROR".to_string(), + message: msg, + details: None, + recoverable: false, + step_id: None, + occurred_at: Utc::now(), + }), + rollback_state: None, + session_id: String::new(), + bot_id: String::new(), + created_by: String::new(), + assigned_to: String::new(), + schedule: None, + tags: Vec::new(), + parent_task_id: None, + subtask_ids: Vec::new(), + depends_on: Vec::new(), + dependents: Vec::new(), + mcp_servers: Vec::new(), + external_apis: Vec::new(), + created_at: r.created_at, + updated_at: r.updated_at, + started_at: None, + completed_at: r.completed_at, + estimated_completion: None, + })) + } else { + Ok(None) + } +} + fn list_auto_tasks( state: &Arc, filter: &str, diff --git a/src/auto_task/intent_classifier.rs b/src/auto_task/intent_classifier.rs index 064deaeee..9f5abd51a 100644 --- a/src/auto_task/intent_classifier.rs +++ b/src/auto_task/intent_classifier.rs @@ -609,13 +609,12 @@ Respond with JSON only: let mut conn = self.state.conn.get()?; - // Insert into tasks table + // Insert into tasks table (no bot_id column in tasks table) sql_query( - "INSERT INTO tasks (id, bot_id, title, description, status, priority, created_at) - VALUES ($1, $2, $3, $4, 'pending', 'normal', NOW())", + "INSERT INTO tasks (id, title, description, status, priority, created_at) + VALUES ($1, $2, $3, 'pending', 'normal', NOW())", ) .bind::(task_id) - .bind::(session.bot_id) .bind::(&title) .bind::(&classification.original_text) .execute(&mut conn)?; diff --git a/src/auto_task/mod.rs b/src/auto_task/mod.rs index 0f7207de2..aadabe3bd 100644 --- a/src/auto_task/mod.rs +++ b/src/auto_task/mod.rs @@ -22,7 +22,7 @@ pub use autotask_api::{ apply_recommendation_handler, cancel_task_handler, classify_intent_handler, compile_intent_handler, create_and_execute_handler, execute_plan_handler, execute_task_handler, get_approvals_handler, get_decisions_handler, get_pending_items_handler, get_stats_handler, - get_task_logs_handler, list_tasks_handler, pause_task_handler, resume_task_handler, + get_task_handler, get_task_logs_handler, list_tasks_handler, pause_task_handler, resume_task_handler, simulate_plan_handler, simulate_task_handler, submit_approval_handler, submit_decision_handler, submit_pending_item_handler, }; @@ -59,6 +59,10 @@ pub fn configure_autotask_routes() -> axum::Router Router> { @@ -36,17 +36,17 @@ pub async fn serve_app_index( State(state): State>, Path(params): Path, ) -> impl IntoResponse { - serve_app_file_internal(&state, ¶ms.app_name, "index.html") + serve_app_file_internal(&state, ¶ms.app_name, "index.html").await } pub async fn serve_app_file( State(state): State>, Path(params): Path, ) -> impl IntoResponse { - serve_app_file_internal(&state, ¶ms.app_name, ¶ms.file_path) + serve_app_file_internal(&state, ¶ms.app_name, ¶ms.file_path).await } -fn serve_app_file_internal(state: &AppState, app_name: &str, file_path: &str) -> Response { +async fn serve_app_file_internal(state: &AppState, app_name: &str, file_path: &str) -> Response { let sanitized_app_name = sanitize_path_component(app_name); let sanitized_file_path = sanitize_path_component(file_path); @@ -54,6 +54,55 @@ fn serve_app_file_internal(state: &AppState, app_name: &str, file_path: &str) -> return (StatusCode::BAD_REQUEST, "Invalid path").into_response(); } + // Get bot name from bucket_name config (default to "default") + let bot_name = state.bucket_name + .trim_end_matches(".gbai") + .to_string(); + let sanitized_bot_name = bot_name.to_lowercase().replace(' ', "-").replace('_', "-"); + + // MinIO bucket and path: botname.gbai / botname.gbapp/appname/file + let bucket = format!("{}.gbai", sanitized_bot_name); + let key = format!("{}.gbapp/{}/{}", sanitized_bot_name, sanitized_app_name, sanitized_file_path); + + info!("Serving app file from MinIO: bucket={}, key={}", bucket, key); + + // Try to serve from MinIO + if let Some(ref drive) = state.drive { + match drive + .get_object() + .bucket(&bucket) + .key(&key) + .send() + .await + { + Ok(response) => { + match response.body.collect().await { + Ok(body) => { + let content = body.into_bytes(); + let content_type = get_content_type(&sanitized_file_path); + + return Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, content_type) + .header(header::CACHE_CONTROL, "public, max-age=3600") + .body(Body::from(content.to_vec())) + .unwrap_or_else(|_| { + (StatusCode::INTERNAL_SERVER_ERROR, "Failed to build response") + .into_response() + }); + } + Err(e) => { + error!("Failed to read MinIO response body: {}", e); + } + } + } + Err(e) => { + warn!("MinIO get_object failed for {}/{}: {}", bucket, key, e); + } + } + } + + // Fallback to filesystem if MinIO fails let site_path = state .config .as_ref() @@ -61,11 +110,11 @@ fn serve_app_file_internal(state: &AppState, app_name: &str, file_path: &str) -> .unwrap_or_else(|| "./botserver-stack/sites".to_string()); let full_path = format!( - "{}/{}/{}", - site_path, sanitized_app_name, sanitized_file_path + "{}/{}.gbai/{}.gbapp/{}/{}", + site_path, sanitized_bot_name, sanitized_bot_name, sanitized_app_name, sanitized_file_path ); - trace!("Serving app file: {full_path}"); + trace!("Fallback: serving app file from filesystem: {full_path}"); let path = std::path::Path::new(&full_path); if !path.exists() { diff --git a/src/core/shared/state.rs b/src/core/shared/state.rs index ce387874f..30313b6d3 100644 --- a/src/core/shared/state.rs +++ b/src/core/shared/state.rs @@ -158,6 +158,8 @@ pub struct TaskProgressEvent { pub error: Option, #[serde(skip_serializing_if = "Option::is_none")] pub activity: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub text: Option, } impl TaskProgressEvent { @@ -174,6 +176,24 @@ impl TaskProgressEvent { details: None, error: None, activity: None, + text: None, + } + } + + pub fn llm_stream(task_id: impl Into, text: impl Into) -> Self { + Self { + event_type: "llm_stream".to_string(), + task_id: task_id.into(), + step: "llm_stream".to_string(), + message: String::new(), + progress: 0, + total_steps: 0, + current_step: 0, + timestamp: chrono::Utc::now().to_rfc3339(), + details: None, + error: None, + activity: None, + text: Some(text.into()), } } @@ -181,7 +201,7 @@ impl TaskProgressEvent { pub fn with_progress(mut self, current: u8, total: u8) -> Self { self.current_step = current; self.total_steps = total; - self.progress = if total > 0 { (current * 100) / total } else { 0 }; + self.progress = if total > 0 { ((current as u16 * 100) / total as u16) as u8 } else { 0 }; self } @@ -224,6 +244,7 @@ impl TaskProgressEvent { details: None, error: None, activity: None, + text: None, } } } @@ -473,6 +494,14 @@ impl AppState { .with_error(error); self.broadcast_task_progress(event); } + + pub fn emit_llm_stream(&self, task_id: &str, text: &str) { + let event = TaskProgressEvent::llm_stream(task_id, text); + if let Some(tx) = &self.task_progress_broadcast { + // Don't log every stream chunk - too noisy + let _ = tx.send(event); + } + } } #[cfg(test)] diff --git a/src/core/urls.rs b/src/core/urls.rs index b7a2c4cc4..45632a7c7 100644 --- a/src/core/urls.rs +++ b/src/core/urls.rs @@ -165,6 +165,7 @@ impl ApiUrls { pub const AUTOTASK_EXECUTE: &'static str = "/api/autotask/execute"; pub const AUTOTASK_SIMULATE: &'static str = "/api/autotask/simulate/:plan_id"; pub const AUTOTASK_LIST: &'static str = "/api/autotask/list"; + pub const AUTOTASK_GET: &'static str = "/api/autotask/tasks/:task_id"; pub const AUTOTASK_STATS: &'static str = "/api/autotask/stats"; pub const AUTOTASK_PAUSE: &'static str = "/api/autotask/:task_id/pause"; pub const AUTOTASK_RESUME: &'static str = "/api/autotask/:task_id/resume"; diff --git a/src/llm/claude.rs b/src/llm/claude.rs index 4ed1239b6..e360385ec 100644 --- a/src/llm/claude.rs +++ b/src/llm/claude.rs @@ -1,14 +1,22 @@ use async_trait::async_trait; -use futures::StreamExt; -use log::{info, trace, warn}; +use futures_util::StreamExt; +use log::{error, info, trace, warn}; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::time::Duration; use tokio::sync::mpsc; +use tokio::time::Instant; use super::{llm_models::get_handler, LLMProvider}; -const LLM_TIMEOUT_SECS: u64 = 300; +// Configuration matching Node.js proxy exactly +const MAX_RETRIES: u32 = 5; +const INITIAL_DELAY_MS: u64 = 1000; +const MAX_DELAY_MS: u64 = 30000; +const BACKOFF_MULTIPLIER: f64 = 2.0; +const TIMEOUT_MS: u64 = 60000; +const STREAMING_TIMEOUT_MS: u64 = 180000; // 3 minutes for streaming +const ACTIVITY_TIMEOUT_MS: u64 = 30000; // 30 seconds no data = timeout #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ClaudeMessage { @@ -47,9 +55,10 @@ pub struct ClaudeResponse { pub stop_reason: Option, } +// SSE event structures - Anthropic format #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ClaudeStreamDelta { - #[serde(rename = "type")] + #[serde(rename = "type", default)] pub delta_type: String, #[serde(default)] pub text: String, @@ -65,80 +74,162 @@ pub struct ClaudeStreamEvent { pub index: Option, } -#[derive(Debug)] +// Azure/OpenAI style streaming structures +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AzureStreamChoice { + #[serde(default)] + pub index: u32, + #[serde(default)] + pub delta: Option, + #[serde(default)] + pub finish_reason: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AzureStreamDelta { + #[serde(default)] + pub role: Option, + #[serde(default)] + pub content: Option, + #[serde(default)] + pub reasoning_content: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AzureStreamChunk { + #[serde(default)] + pub id: String, + #[serde(default)] + pub object: String, + #[serde(default)] + pub choices: Vec, +} + pub struct ClaudeClient { - client: reqwest::Client, base_url: String, deployment_name: String, - is_azure: bool, } impl ClaudeClient { pub fn new(base_url: String, deployment_name: Option) -> Self { - let is_azure = base_url.contains("azure.com") || base_url.contains("openai.azure.com"); - - let client = reqwest::Client::builder() - .timeout(Duration::from_secs(LLM_TIMEOUT_SECS)) - .connect_timeout(Duration::from_secs(30)) - .build() - .unwrap_or_else(|_| reqwest::Client::new()); - Self { - client, base_url, deployment_name: deployment_name.unwrap_or_else(|| "claude-opus-4-5".to_string()), - is_azure, } } pub fn azure(endpoint: String, deployment_name: String) -> Self { - let client = reqwest::Client::builder() - .timeout(Duration::from_secs(LLM_TIMEOUT_SECS)) - .connect_timeout(Duration::from_secs(30)) - .build() - .unwrap_or_else(|_| reqwest::Client::new()); - Self { - client, base_url: endpoint, deployment_name, - is_azure: true, } } - fn build_url(&self) -> String { - if self.is_azure { - format!("{}/v1/messages", self.base_url.trim_end_matches('/')) + fn get_retry_delay(attempt: u32) -> Duration { + let delay = (INITIAL_DELAY_MS as f64 * BACKOFF_MULTIPLIER.powi(attempt as i32)) + .min(MAX_DELAY_MS as f64); + // Add jitter like Node.js: delay * 0.25 * (Math.random() * 2 - 1) + let jitter = delay * 0.25 * (rand::random::() * 2.0 - 1.0); + Duration::from_millis((delay + jitter) as u64) + } + + fn is_retryable_error(err_msg: &str, status_code: Option) -> bool { + // Retryable status codes matching Node.js: [408, 429, 500, 502, 503, 504] + if let Some(code) = status_code { + if [408, 429, 500, 502, 503, 504].contains(&code) { + return true; + } + } + + // Retryable error patterns matching Node.js + let msg = err_msg.to_lowercase(); + msg.contains("timeout") + || msg.contains("econnreset") + || msg.contains("etimedout") + || msg.contains("enotfound") + || msg.contains("econnrefused") + || msg.contains("epipe") + || msg.contains("ehostunreach") + || msg.contains("aborted") + || msg.contains("socket hang up") + || msg.contains("network") + || msg.contains("connection reset") + || msg.contains("broken pipe") + || msg.contains("connection closed") + } + + fn build_messages_from_value( + &self, + prompt: &str, + messages: &Value, + ) -> (Option, Vec) { + let empty_vec = vec![]; + + let mut claude_messages: Vec = if messages.is_array() { + let arr = messages.as_array().unwrap_or(&empty_vec); + if arr.is_empty() { + vec![ClaudeMessage { + role: "user".to_string(), + content: prompt.to_string(), + }] + } else { + arr.iter() + .filter_map(|m| { + let role = m["role"].as_str().unwrap_or("user"); + let content = m["content"].as_str().unwrap_or(""); + if role == "system" + || role == "episodic" + || role == "compact" + || content.is_empty() + { + None + } else { + let normalized_role = match role { + "user" | "assistant" => role.to_string(), + _ => "user".to_string(), + }; + Some(ClaudeMessage { + role: normalized_role, + content: content.to_string(), + }) + } + }) + .collect() + } } else { - format!("{}/v1/messages", self.base_url.trim_end_matches('/')) - } - } + vec![ClaudeMessage { + role: "user".to_string(), + content: prompt.to_string(), + }] + }; - fn build_headers(&self, api_key: &str) -> reqwest::header::HeaderMap { - let mut headers = reqwest::header::HeaderMap::new(); - - if let Ok(val) = api_key.parse() { - headers.insert("x-api-key", val); - } - if let Ok(val) = "2023-06-01".parse() { - headers.insert("anthropic-version", val); + if claude_messages.is_empty() && !prompt.is_empty() { + claude_messages.push(ClaudeMessage { + role: "user".to_string(), + content: prompt.to_string(), + }); } - if let Ok(val) = "application/json".parse() { - headers.insert(reqwest::header::CONTENT_TYPE, val); - } + // Extract system messages + let system_prompt: Option = if messages.is_array() { + let system_text: String = messages + .as_array() + .unwrap_or(&empty_vec) + .iter() + .filter(|m| m["role"].as_str() == Some("system")) + .map(|m| m["content"].as_str().unwrap_or("").to_string()) + .collect::>() + .join("\n\n"); + if system_text.is_empty() { + None + } else { + Some(system_text) + } + } else { + None + }; - headers - } - - fn normalize_role(role: &str) -> Option<(String, bool)> { - match role { - "user" => Some(("user".to_string(), false)), - "assistant" => Some(("assistant".to_string(), false)), - "system" => None, - "episodic" | "compact" => Some(("user".to_string(), true)), - _ => Some(("user".to_string(), false)), - } + (system_prompt, claude_messages) } pub fn build_messages( @@ -146,6 +237,7 @@ impl ClaudeClient { context_data: &str, history: &[(String, String)], ) -> (Option, Vec) { + let mut messages = Vec::new(); let mut system_parts = Vec::new(); if !system_prompt.is_empty() { @@ -167,20 +259,37 @@ impl ClaudeClient { Some(system_parts.join("\n\n")) }; - let messages: Vec = history - .iter() - .filter_map(|(role, content)| { - match Self::normalize_role(role) { - Some((normalized_role, is_context)) if !is_context => { - Some(ClaudeMessage { - role: normalized_role, - content: content.clone(), - }) - } - _ => None, + let mut last_role: Option = None; + + for (role, content) in history { + let normalized_role = match role.as_str() { + "user" => Some("user".to_string()), + "assistant" => Some("assistant".to_string()), + "system" | "episodic" | "compact" => None, + _ => Some("user".to_string()), + }; + + if let Some(norm_role) = normalized_role { + if content.is_empty() { + continue; } - }) - .collect(); + + if Some(&norm_role) == last_role.as_ref() { + if let Some(last_msg) = messages.last_mut() { + let last_msg: &mut ClaudeMessage = last_msg; + last_msg.content.push_str("\n\n"); + last_msg.content.push_str(content); + continue; + } + } + + messages.push(ClaudeMessage { + role: norm_role.clone(), + content: content.clone(), + }); + last_role = Some(norm_role); + } + } (system, messages) } @@ -194,6 +303,318 @@ impl ClaudeClient { .collect::>() .join("") } + + /// Process SSE data and extract text - handles both Anthropic and Azure formats + fn process_sse_data(&self, data: &str, model_name: &str) -> Option { + if data == "[DONE]" { + return None; + } + + let handler = get_handler(model_name); + + // Try Azure/OpenAI format first (chat.completion.chunk) + if let Ok(chunk) = serde_json::from_str::(data) { + if chunk.object == "chat.completion.chunk" || !chunk.choices.is_empty() { + for choice in &chunk.choices { + if let Some(delta) = &choice.delta { + // Get content (prefer content over reasoning_content) + let text = delta + .content + .as_deref() + .or(delta.reasoning_content.as_deref()) + .unwrap_or(""); + + if !text.is_empty() { + let processed = handler.process_content(text); + if !processed.is_empty() { + return Some(processed); + } + } + } + } + return None; + } + } + + // Try standard Anthropic SSE format + if let Ok(event) = serde_json::from_str::(data) { + match event.event_type.as_str() { + "content_block_delta" => { + if let Some(delta) = event.delta { + if delta.delta_type == "text_delta" && !delta.text.is_empty() { + let processed = handler.process_content(&delta.text); + if !processed.is_empty() { + return Some(processed); + } + } + } + } + "message_start" => trace!("CLAUDE message_start"), + "content_block_start" => trace!("CLAUDE content_block_start"), + "content_block_stop" => trace!("CLAUDE content_block_stop"), + "message_stop" => trace!("CLAUDE message_stop"), + "message_delta" => trace!("CLAUDE message_delta"), + "error" => { + error!("CLAUDE Error event: {}", data); + } + _ => trace!("CLAUDE Event: {}", event.event_type), + } + } + + None + } + + /// Streaming implementation using reqwest - mimics Node.js https.request with res.on('data') + async fn stream_with_reqwest( + &self, + request_body: String, + api_key: &str, + tx: mpsc::Sender, + model_name: &str, + ) -> Result<(), Box> { + let url = format!("{}/v1/messages", self.base_url.trim_end_matches('/')); + let start_time = Instant::now(); + + trace!( + "CLAUDE Streaming request to {} body_len={}", + url, + request_body.len() + ); + + // Build client matching Node.js httpsAgent configuration + // IMPORTANT: NO timeout() here - it causes premature stream closure! + // Node.js doesn't set a timeout on the response body, only on connect/headers + let client = reqwest::Client::builder() + .connect_timeout(Duration::from_millis(TIMEOUT_MS)) + .pool_idle_timeout(Duration::from_secs(30)) // keepAliveMsecs: 30000 + .pool_max_idle_per_host(10) // maxFreeSockets: 10 + .tcp_nodelay(true) + .tcp_keepalive(Duration::from_secs(60)) + .http1_only() // Force HTTP/1.1 like Node.js + .build()?; + + // Send request with headers matching Node.js exactly + let response = client + .post(&url) + .header("Content-Type", "application/json") + .header("x-api-key", api_key) + .header("anthropic-version", "2023-06-01") + .header("Accept", "text/event-stream") + .header("Connection", "keep-alive") + .body(request_body) + .send() + .await?; + + let status = response.status(); + trace!("CLAUDE Response status: {} in {:?}", status, start_time.elapsed()); + + if !status.is_success() { + let error_text = response.text().await.unwrap_or_default(); + error!("[CLAUDE] Error response: {}", error_text); + return Err(format!("HTTP {}: {}", status, error_text).into()); + } + + // Stream response body - this is like Node.js res.on('data') + let mut stream = response.bytes_stream(); + let mut sse_buffer = String::new(); + let mut text_chunks_sent = 0usize; + let mut total_bytes = 0usize; + let mut chunk_count = 0usize; + let mut last_activity = Instant::now(); + let mut done_received = false; + + trace!("CLAUDE Starting stream read"); + + loop { + // Check overall timeout + if start_time.elapsed() > Duration::from_millis(STREAMING_TIMEOUT_MS) { + error!( + "CLAUDE Overall streaming timeout {}ms exceeded", + STREAMING_TIMEOUT_MS + ); + break; + } + + // Check activity timeout + if last_activity.elapsed() > Duration::from_millis(ACTIVITY_TIMEOUT_MS) { + if done_received || text_chunks_sent > 0 { + trace!( + "CLAUDE Activity timeout but {} chunks sent, treating as complete", + text_chunks_sent + ); + break; + } + error!( + "CLAUDE Activity timeout - no data for {}ms", + ACTIVITY_TIMEOUT_MS + ); + return Err(format!( + "Stream activity timeout - no data for {}ms", + ACTIVITY_TIMEOUT_MS + ) + .into()); + } + + // Read next chunk with timeout - like Node.js res.on('data') + let chunk_result = tokio::time::timeout( + Duration::from_millis(ACTIVITY_TIMEOUT_MS), + stream.next(), + ) + .await; + + let chunk = match chunk_result { + Ok(Some(Ok(bytes))) => { + last_activity = Instant::now(); + bytes + } + Ok(Some(Err(e))) => { + error!("CLAUDE Stream read error: {}", e); + if text_chunks_sent > 0 { + warn!( + "CLAUDE Had {} chunks before error, treating as partial success", + text_chunks_sent + ); + break; + } + return Err(format!("Stream read error: {}", e).into()); + } + Ok(None) => { + trace!( + "CLAUDE Stream ended after {} chunks, {} bytes, {} text chunks", + chunk_count, total_bytes, text_chunks_sent + ); + break; + } + Err(_) => { + if text_chunks_sent > 0 || done_received { + trace!( + "CLAUDE Timeout but {} chunks sent, treating as complete", + text_chunks_sent + ); + break; + } + error!("CLAUDE Timeout waiting for stream data"); + return Err("Timeout waiting for stream data".into()); + } + }; + + chunk_count += 1; + total_bytes += chunk.len(); + let chunk_str = String::from_utf8_lossy(&chunk); + + + + sse_buffer.push_str(&chunk_str); + + // Process complete SSE events in buffer + while let Some(event_end) = sse_buffer.find("\n\n") { + let event_block = sse_buffer[..event_end].to_string(); + sse_buffer = sse_buffer[event_end + 2..].to_string(); + + // Parse SSE event lines + for line in event_block.lines() { + let line = line.trim(); + + if line.is_empty() || line.starts_with("event:") { + continue; + } + + if !line.starts_with("data: ") { + continue; + } + + let data = &line[6..]; + + if data == "[DONE]" { + trace!( + "CLAUDE Received DONE - {} chunks, {} bytes in {:?}", + text_chunks_sent, + total_bytes, + start_time.elapsed() + ); + done_received = true; + continue; + } + + // Process SSE data and send text chunks + if let Some(text) = self.process_sse_data(data, model_name) { + if tx.send(text.clone()).await.is_err() { + warn!("CLAUDE Receiver dropped, stopping stream"); + return Ok(()); + } + text_chunks_sent += 1; + } + } + } + } + + // Process any remaining data in buffer + if !sse_buffer.is_empty() { + for line in sse_buffer.lines() { + let line = line.trim(); + if line.starts_with("data: ") { + let data = &line[6..]; + if data != "[DONE]" { + if let Some(text) = self.process_sse_data(data, model_name) { + let _ = tx.send(text).await; + text_chunks_sent += 1; + } + } + } + } + } + + trace!( + "CLAUDE Stream complete: {} chunks, {} bytes, {} text in {:?}", + chunk_count, total_bytes, text_chunks_sent, start_time.elapsed() + ); + + if text_chunks_sent == 0 && !done_received { + warn!("CLAUDE No text chunks sent and no DONE received"); + } + + Ok(()) + } + + /// Single streaming attempt with full error handling + async fn stream_single_attempt( + &self, + prompt: &str, + messages: &Value, + tx: mpsc::Sender, + model: &str, + key: &str, + ) -> Result<(), Box> { + let model_name = if model.is_empty() { + &self.deployment_name + } else { + model + }; + let (system, claude_messages) = self.build_messages_from_value(prompt, messages); + + if claude_messages.is_empty() { + return Err("No messages to send".into()); + } + + let request = ClaudeRequest { + model: model_name.to_string(), + max_tokens: 16000, + messages: claude_messages, + system, + stream: Some(true), + }; + + let request_body = serde_json::to_string(&request)?; + trace!( + "CLAUDE Streaming request: model={}, messages={}, body_len={}", + model_name, + request.messages.len(), + request_body.len() + ); + + self.stream_with_reqwest(request_body, key, tx, model_name) + .await + } } #[async_trait] @@ -205,75 +626,15 @@ impl LLMProvider for ClaudeClient { model: &str, key: &str, ) -> Result> { - let url = self.build_url(); - let headers = self.build_headers(key); - let model_name = if model.is_empty() { &self.deployment_name } else { model }; - - let empty_vec = vec![]; - let mut claude_messages: Vec = if messages.is_array() { - let arr = messages.as_array().unwrap_or(&empty_vec); - if arr.is_empty() { - vec![ClaudeMessage { - role: "user".to_string(), - content: prompt.to_string(), - }] - } else { - arr.iter() - .filter_map(|m| { - let role = m["role"].as_str().unwrap_or("user"); - let content = m["content"].as_str().unwrap_or(""); - if role == "system" || role == "episodic" || role == "compact" || content.is_empty() { - None - } else { - let normalized_role = match role { - "user" | "assistant" => role.to_string(), - _ => "user".to_string(), - }; - Some(ClaudeMessage { - role: normalized_role, - content: content.to_string(), - }) - } - }) - .collect() - } - } else { - vec![ClaudeMessage { - role: "user".to_string(), - content: prompt.to_string(), - }] - }; - - if claude_messages.is_empty() && !prompt.is_empty() { - claude_messages.push(ClaudeMessage { - role: "user".to_string(), - content: prompt.to_string(), - }); - } - - let system_prompt: Option = if messages.is_array() { - messages - .as_array() - .unwrap_or(&empty_vec) - .iter() - .filter(|m| m["role"].as_str() == Some("system")) - .map(|m| m["content"].as_str().unwrap_or("").to_string()) - .collect::>() - .join("\n\n") - .into() - } else { - None - }; - - let system = system_prompt.filter(|s| !s.is_empty()); + let (system, claude_messages) = self.build_messages_from_value(prompt, messages); if claude_messages.is_empty() { - return Err("Cannot send request to Claude: no messages with content".into()); + return Err("No messages to send".into()); } let request = ClaudeRequest { @@ -284,63 +645,39 @@ impl LLMProvider for ClaudeClient { stream: None, }; - info!("Claude request to {url}: model={model_name}"); - trace!("Claude request body: {:?}", serde_json::to_string(&request)); + let body = serde_json::to_string(&request)?; + trace!("CLAUDE request: model={}, body_len={}", model_name, body.len()); - let start_time = std::time::Instant::now(); + let start = Instant::now(); + let url = format!("{}/v1/messages", self.base_url.trim_end_matches('/')); - let response = match self - .client + // Use reqwest for non-streaming + let client = reqwest::Client::builder() + .timeout(Duration::from_millis(TIMEOUT_MS)) + .build()?; + + let response = client .post(&url) - .headers(headers) - .json(&request) + .header("Content-Type", "application/json") + .header("x-api-key", key) + .header("anthropic-version", "2023-06-01") + .body(body) .send() - .await - { - Ok(resp) => resp, - Err(e) => { - let elapsed = start_time.elapsed(); - if e.is_timeout() { - warn!("Claude request timed out after {elapsed:?} (limit: {LLM_TIMEOUT_SECS}s)"); - return Err(format!("Claude request timed out after {LLM_TIMEOUT_SECS}s").into()); - } - warn!("Claude request failed after {elapsed:?}: {e}"); - return Err(e.into()); - } - }; + .await?; - let elapsed = start_time.elapsed(); let status = response.status(); - if !status.is_success() { let error_text = response.text().await.unwrap_or_default(); - warn!("Claude API error ({status}) after {elapsed:?}: {error_text}"); - return Err(format!("Claude API error ({status}): {error_text}").into()); + return Err(format!("Claude API error ({}): {}", status, error_text).into()); } - info!("Claude response received in {elapsed:?}, status={status}"); - - let result: ClaudeResponse = match response.json().await { - Ok(r) => r, - Err(e) => { - warn!("Failed to parse Claude response: {e}"); - return Err(format!("Failed to parse Claude response: {e}").into()); - } - }; - - let raw_content = self.extract_text_from_response(&result); - let content_len = raw_content.len(); - - info!( - "Claude response parsed: id={}, stop_reason={:?}, content_length={content_len}", - result.id, - result.stop_reason - ); + trace!("CLAUDE response in {:?}, status={}", start.elapsed(), status); + let result: ClaudeResponse = response.json().await?; + let content = self.extract_text_from_response(&result); let handler = get_handler(model_name); - let content = handler.process_content(&raw_content); - Ok(content) + Ok(handler.process_content(&content)) } async fn generate_stream( @@ -351,160 +688,43 @@ impl LLMProvider for ClaudeClient { model: &str, key: &str, ) -> Result<(), Box> { - let url = self.build_url(); - let headers = self.build_headers(key); + let mut last_error: Option> = None; - let model_name = if model.is_empty() { - &self.deployment_name - } else { - model - }; - - let empty_vec = vec![]; - let mut claude_messages: Vec = if messages.is_array() { - let arr = messages.as_array().unwrap_or(&empty_vec); - if arr.is_empty() { - vec![ClaudeMessage { - role: "user".to_string(), - content: prompt.to_string(), - }] - } else { - arr.iter() - .filter_map(|m| { - let role = m["role"].as_str().unwrap_or("user"); - let content = m["content"].as_str().unwrap_or(""); - if role == "system" || role == "episodic" || role == "compact" || content.is_empty() { - None - } else { - let normalized_role = match role { - "user" | "assistant" => role.to_string(), - _ => "user".to_string(), - }; - Some(ClaudeMessage { - role: normalized_role, - content: content.to_string(), - }) - } - }) - .collect() + for attempt in 0..=MAX_RETRIES { + if attempt > 0 { + let delay = Self::get_retry_delay(attempt - 1); + trace!( + "CLAUDE Retry {}/{} in {:?}", + attempt, MAX_RETRIES, delay + ); + tokio::time::sleep(delay).await; } - } else { - vec![ClaudeMessage { - role: "user".to_string(), - content: prompt.to_string(), - }] - }; - if claude_messages.is_empty() && !prompt.is_empty() { - claude_messages.push(ClaudeMessage { - role: "user".to_string(), - content: prompt.to_string(), - }); - } - - let system_prompt: Option = if messages.is_array() { - messages - .as_array() - .unwrap_or(&empty_vec) - .iter() - .filter(|m| m["role"].as_str() == Some("system")) - .map(|m| m["content"].as_str().unwrap_or("").to_string()) - .collect::>() - .join("\n\n") - .into() - } else { - None - }; - - let system = system_prompt.filter(|s| !s.is_empty()); - - if claude_messages.is_empty() { - return Err("Cannot send streaming request to Claude: no messages with content".into()); - } - - let request = ClaudeRequest { - model: model_name.to_string(), - max_tokens: 4096, - messages: claude_messages, - system, - stream: Some(true), - }; - - info!("Claude streaming request to {url}: model={model_name}"); - - let start_time = std::time::Instant::now(); - - let response = match self - .client - .post(&url) - .headers(headers) - .json(&request) - .send() - .await - { - Ok(resp) => resp, - Err(e) => { - let elapsed = start_time.elapsed(); - if e.is_timeout() { - warn!("Claude streaming request timed out after {elapsed:?}"); - return Err(format!("Claude streaming request timed out after {LLM_TIMEOUT_SECS}s").into()); + match self + .stream_single_attempt(prompt, messages, tx.clone(), model, key) + .await + { + Ok(()) => { + if attempt > 0 { + trace!("CLAUDE Success after {} attempts", attempt + 1); + } + return Ok(()); } - warn!("Claude streaming request failed after {elapsed:?}: {e}"); - return Err(e.into()); - } - }; + Err(e) => { + let err_msg = e.to_string(); + error!("CLAUDE Attempt {} failed: {}", attempt + 1, err_msg); - let status = response.status(); - if !status.is_success() { - let error_text = response.text().await.unwrap_or_default(); - warn!("Claude streaming API error ({status}): {error_text}"); - return Err(format!("Claude streaming API error ({status}): {error_text}").into()); - } - - info!("Claude streaming connection established in {:?}", start_time.elapsed()); - - let handler = get_handler(model_name); - let mut stream = response.bytes_stream(); - let mut total_chunks = 0; - - while let Some(chunk) = stream.next().await { - let chunk = chunk?; - let chunk_str = String::from_utf8_lossy(&chunk); - - for line in chunk_str.lines() { - let line = line.trim(); - - if line.starts_with("data: ") { - let data = &line[6..]; - - if data == "[DONE]" { - break; + if attempt < MAX_RETRIES && Self::is_retryable_error(&err_msg, None) { + last_error = Some(e); + continue; } - if let Ok(event) = serde_json::from_str::(data) { - if event.event_type == "content_block_delta" { - if let Some(delta) = event.delta { - if delta.delta_type == "text_delta" && !delta.text.is_empty() { - let processed = handler.process_content(&delta.text); - if !processed.is_empty() { - let _ = tx.send(processed).await; - total_chunks += 1; - } - } - } - } - } + return Err(e); } } } - info!( - "Claude streaming completed in {:?}, chunks={}", - start_time.elapsed(), - total_chunks - ); - - Ok(()) + Err(last_error.unwrap_or_else(|| "Max retries exceeded".into())) } async fn cancel_job( @@ -525,7 +745,6 @@ mod tests { "https://api.anthropic.com".to_string(), Some("claude-3-opus".to_string()), ); - assert!(!client.is_azure); assert_eq!(client.deployment_name, "claude-3-opus"); } @@ -535,30 +754,9 @@ mod tests { "https://myendpoint.openai.azure.com/anthropic".to_string(), "claude-opus-4-5".to_string(), ); - assert!(client.is_azure); assert_eq!(client.deployment_name, "claude-opus-4-5"); } - #[test] - fn test_build_url_azure() { - let client = ClaudeClient::azure( - "https://myendpoint.openai.azure.com/anthropic".to_string(), - "claude-opus-4-5".to_string(), - ); - let url = client.build_url(); - assert!(url.contains("/v1/messages")); - } - - #[test] - fn test_build_url_anthropic() { - let client = ClaudeClient::new( - "https://api.anthropic.com".to_string(), - None, - ); - let url = client.build_url(); - assert_eq!(url, "https://api.anthropic.com/v1/messages"); - } - #[test] fn test_build_messages_empty() { let (system, messages) = ClaudeClient::build_messages("", "", &[]); @@ -568,12 +766,8 @@ mod tests { #[test] fn test_build_messages_with_system() { - let (system, messages) = ClaudeClient::build_messages( - "You are a helpful assistant.", - "", - &[], - ); - assert_eq!(system, Some("You are a helpful assistant.".to_string())); + let (system, messages) = ClaudeClient::build_messages("Be helpful", "", &[]); + assert_eq!(system, Some("Be helpful".to_string())); assert!(messages.is_empty()); } @@ -581,7 +775,7 @@ mod tests { fn test_build_messages_with_history() { let history = vec![ ("user".to_string(), "Hello".to_string()), - ("assistant".to_string(), "Hi there!".to_string()), + ("assistant".to_string(), "Hi there".to_string()), ]; let (system, messages) = ClaudeClient::build_messages("", "", &history); assert!(system.is_none()); @@ -589,55 +783,4 @@ mod tests { assert_eq!(messages[0].role, "user"); assert_eq!(messages[0].content, "Hello"); } - - #[test] - fn test_build_messages_full() { - let history = vec![ - ("user".to_string(), "What is 2+2?".to_string()), - ]; - let (system, messages) = ClaudeClient::build_messages( - "You are a math tutor.", - "Focus on step-by-step explanations.", - &history, - ); - assert!(system.is_some()); - assert!(system.unwrap().contains("math tutor")); - assert_eq!(messages.len(), 1); - } - - #[test] - fn test_claude_request_serialization() { - let request = ClaudeRequest { - model: "claude-3-opus".to_string(), - max_tokens: 4096, - messages: vec![ClaudeMessage { - role: "user".to_string(), - content: "Hello".to_string(), - }], - system: Some("Be helpful".to_string()), - stream: None, - }; - - let json = serde_json::to_string(&request).unwrap(); - assert!(json.contains("claude-3-opus")); - assert!(json.contains("max_tokens")); - assert!(json.contains("Be helpful")); - } - - #[test] - fn test_claude_response_deserialization() { - let json = r#"{ - "id": "msg_123", - "type": "message", - "role": "assistant", - "content": [{"type": "text", "text": "Hello!"}], - "model": "claude-3-opus", - "stop_reason": "end_turn" - }"#; - - let response: ClaudeResponse = serde_json::from_str(json).unwrap(); - assert_eq!(response.id, "msg_123"); - assert_eq!(response.content.len(), 1); - assert_eq!(response.content[0].text, "Hello!"); - } } diff --git a/src/tasks/mod.rs b/src/tasks/mod.rs index 469c7db2d..10771b8bf 100644 --- a/src/tasks/mod.rs +++ b/src/tasks/mod.rs @@ -353,13 +353,18 @@ pub async fn handle_task_get( State(state): State>, Path(id): Path, ) -> impl IntoResponse { + log::info!("[TASK_GET] *** Handler called for task: {} ***", id); + let conn = state.conn.clone(); let task_id = id.clone(); let result = tokio::task::spawn_blocking(move || { let mut db_conn = conn .get() - .map_err(|e| format!("DB connection error: {}", e))?; + .map_err(|e| { + log::error!("[TASK_GET] DB connection error: {}", e); + format!("DB connection error: {}", e) + })?; #[derive(Debug, QueryableByName, serde::Serialize)] struct AutoTaskRow { @@ -375,27 +380,46 @@ pub async fn handle_task_get( pub intent: Option, #[diesel(sql_type = diesel::sql_types::Nullable)] pub error: Option, - #[diesel(sql_type = diesel::sql_types::Float)] - pub progress: f32, + #[diesel(sql_type = diesel::sql_types::Double)] + pub progress: f64, + #[diesel(sql_type = diesel::sql_types::Integer)] + pub current_step: i32, + #[diesel(sql_type = diesel::sql_types::Integer)] + pub total_steps: i32, + #[diesel(sql_type = diesel::sql_types::Nullable)] + pub step_results: Option, #[diesel(sql_type = diesel::sql_types::Timestamptz)] pub created_at: chrono::DateTime, #[diesel(sql_type = diesel::sql_types::Nullable)] + pub started_at: Option>, + #[diesel(sql_type = diesel::sql_types::Nullable)] pub completed_at: Option>, } let parsed_uuid = match Uuid::parse_str(&task_id) { - Ok(u) => u, - Err(_) => return Err(format!("Invalid task ID: {}", task_id)), + Ok(u) => { + log::info!("[TASK_GET] Parsed UUID: {}", u); + u + } + Err(e) => { + log::error!("[TASK_GET] Invalid task ID '{}': {}", task_id, e); + return Err(format!("Invalid task ID: {}", task_id)); + } }; let task: Option = diesel::sql_query( - "SELECT id, title, status, priority, intent, error, progress, created_at, completed_at + "SELECT id, title, status, priority, intent, error, progress, current_step, total_steps, step_results, created_at, started_at, completed_at FROM auto_tasks WHERE id = $1 LIMIT 1" ) .bind::(parsed_uuid) .get_result(&mut db_conn) + .map_err(|e| { + log::error!("[TASK_GET] Query error for {}: {}", parsed_uuid, e); + e + }) .ok(); + log::info!("[TASK_GET] Query result for {}: found={}", parsed_uuid, task.is_some()); Ok::<_, String>(task) }) .await @@ -406,6 +430,7 @@ pub async fn handle_task_get( match result { Ok(Some(task)) => { + log::info!("[TASK_GET] Returning task: {} - {}", task.id, task.title); let status_class = match task.status.as_str() { "completed" | "done" => "completed", "running" | "pending" => "running", @@ -414,49 +439,333 @@ pub async fn handle_task_get( }; let progress_percent = (task.progress * 100.0) as u8; let created = task.created_at.format("%Y-%m-%d %H:%M").to_string(); - let completed = task.completed_at.map(|d| d.format("%Y-%m-%d %H:%M").to_string()).unwrap_or_default(); + + // Calculate runtime + let runtime = if let Some(started) = task.started_at { + let end_time = task.completed_at.unwrap_or_else(chrono::Utc::now); + let duration = end_time.signed_duration_since(started); + let mins = duration.num_minutes(); + let secs = duration.num_seconds() % 60; + if mins > 0 { + format!("{}m {}s", mins, secs) + } else { + format!("{}s", secs) + } + } else { + "Not started".to_string() + }; + + let task_id = task.id.to_string(); + let intent_text = task.intent.clone().unwrap_or_else(|| task.title.clone()); + let error_html = task.error.clone().map(|e| format!( + r#"
+ + {} +
"#, e + )).unwrap_or_default(); + + let current_step = task.current_step; + let total_steps = if task.total_steps > 0 { task.total_steps } else { 1 }; + + let status_label = match task.status.as_str() { + "completed" | "done" => "Completed", + "running" => "Running", + "pending" => "Pending", + "failed" | "error" => "Failed", + "paused" => "Paused", + "waiting_approval" => "Awaiting Approval", + _ => &task.status + }; + + // Build progress log HTML from step_results + let progress_log_html = build_progress_log_html(&task.step_results, current_step, total_steps); + + // Build terminal output from recent activity + let terminal_html = build_terminal_html(&task.step_results, &task.status); let html = format!(r#" -
-

{}

- {} -
-
-
Priority: {}
-
Created: {}
- {} -
-
-
Progress: {}%
-
-
+
+ +
+

{title}

+ {status_label} +
+ + +
+ +
+
+ + {title} +
+
+ Runtime: {runtime} + Step {current_step}/{total_steps} +
+
+ {error_html} +
+
+ + {status_label} (Step {current_step}/{total_steps}) + {priority} priority +
+
+
+ + +
+
+
+
+
+ Progress: {progress_percent}% +
+
+ + +
+ +
+ {progress_log_html} +
+
+ + +
+
+ +
+ Step: {current_step} of {total_steps} +
+
+
+ {terminal_html} +
+ +
+ + +
+ +

{intent_text}

+
+ + +
+ + +
- {} - {} "#, - task.title, - status_class, - task.status, - task.priority, - created, - if !completed.is_empty() { format!(r#"
Completed: {}
"#, completed) } else { String::new() }, - progress_percent, - progress_percent, - task.intent.map(|i| format!(r#"

Intent

{}

"#, i)).unwrap_or_default(), - task.error.map(|e| format!(r#"

Error

{}

"#, e)).unwrap_or_default() + task_id = task_id, + title = task.title, + status_class = status_class, + status_label = status_label, + runtime = runtime, + current_step = current_step, + total_steps = total_steps, + error_html = error_html, + status_indicator = if task.status == "running" { "active" } else { "" }, + priority = task.priority, + progress_percent = progress_percent, + progress_log_html = progress_log_html, + terminal_active = if task.status == "running" { "active" } else { "" }, + terminal_html = terminal_html, + created = created, + intent_text = intent_text, ); (StatusCode::OK, axum::response::Html(html)).into_response() } Ok(None) => { + log::warn!("[TASK_GET] Task not found: {}", id); (StatusCode::NOT_FOUND, axum::response::Html("
Task not found
".to_string())).into_response() } Err(e) => { + log::error!("[TASK_GET] Error fetching task {}: {}", id, e); (StatusCode::INTERNAL_SERVER_ERROR, axum::response::Html(format!("
{}
", e))).into_response() } } } +/// Build HTML for the progress log section from step_results JSON +fn build_progress_log_html(step_results: &Option, current_step: i32, total_steps: i32) -> String { + let mut html = String::new(); + + if let Some(serde_json::Value::Array(steps)) = step_results { + if steps.is_empty() { + // No steps yet - show current status + html.push_str(&format!(r#" +
+
+ Task Execution + Step {}/{} + In Progress +
+
+
+ + Waiting for execution steps... +
+
+
+ "#, current_step, total_steps)); + } else { + // Group steps and show real data + for (idx, step) in steps.iter().enumerate() { + let step_name = step.get("step_name") + .and_then(|v| v.as_str()) + .unwrap_or("Step"); + let step_status = step.get("status") + .and_then(|v| v.as_str()) + .unwrap_or("pending"); + let step_order = step.get("step_order") + .and_then(|v| v.as_i64()) + .unwrap_or((idx + 1) as i64); + let duration_ms = step.get("duration_ms") + .and_then(|v| v.as_i64()); + + let status_class = match step_status { + "completed" | "Completed" => "completed", + "running" | "Running" => "running", + "failed" | "Failed" => "failed", + _ => "pending" + }; + + let duration_str = duration_ms.map(|ms| { + if ms > 60000 { + format!("{}m {}s", ms / 60000, (ms % 60000) / 1000) + } else if ms > 1000 { + format!("{}s", ms / 1000) + } else { + format!("{}ms", ms) + } + }).unwrap_or_else(|| "--".to_string()); + + html.push_str(&format!(r#" +
+ + {step_name} + Step {step_order}/{total_steps} + {step_status} + Duration: {duration_str} +
+ "#, + status_class = status_class, + step_name = step_name, + step_order = step_order, + total_steps = total_steps, + step_status = step_status, + duration_str = duration_str, + )); + + // Show logs if present + if let Some(serde_json::Value::Array(logs)) = step.get("logs") { + for log_entry in logs.iter().take(3) { + let msg = log_entry.get("message") + .and_then(|v| v.as_str()) + .unwrap_or(""); + if !msg.is_empty() { + html.push_str(&format!(r#" +
+ + {msg} +
+ "#, status_class = status_class, msg = msg)); + } + } + } + } + } + } else { + // No step results - show placeholder based on current progress + html.push_str(&format!(r#" +
+
+ Task Progress + Step {}/{} + Pending +
+
+
+ + No execution steps recorded yet +
+
+
+ "#, current_step, total_steps)); + } + + html +} + +/// Build HTML for terminal output from step results +fn build_terminal_html(step_results: &Option, status: &str) -> String { + let mut html = String::new(); + let mut lines: Vec = Vec::new(); + + if let Some(serde_json::Value::Array(steps)) = step_results { + for step in steps.iter() { + // Add step name as a line + if let Some(step_name) = step.get("step_name").and_then(|v| v.as_str()) { + let step_status = step.get("status").and_then(|v| v.as_str()).unwrap_or(""); + let prefix = match step_status { + "completed" | "Completed" => "✓", + "running" | "Running" => "►", + "failed" | "Failed" => "✗", + _ => "○" + }; + lines.push(format!("{} {}", prefix, step_name)); + } + + // Add log messages + if let Some(serde_json::Value::Array(logs)) = step.get("logs") { + for log_entry in logs.iter() { + if let Some(msg) = log_entry.get("message").and_then(|v| v.as_str()) { + lines.push(format!(" {}", msg)); + } + } + } + } + } + + if lines.is_empty() { + // Show default message based on status + let default_msg = match status { + "running" => "Task is running...", + "pending" => "Waiting to start...", + "completed" | "done" => "Task completed successfully", + "failed" | "error" => "Task failed - check error details", + "paused" => "Task is paused", + _ => "Initializing..." + }; + html.push_str(&format!(r#"
{}
"#, default_msg)); + } else { + // Show last 10 lines, with the last one marked as current + let start = if lines.len() > 10 { lines.len() - 10 } else { 0 }; + for (idx, line) in lines[start..].iter().enumerate() { + let is_last = idx == lines[start..].len() - 1; + let class = if is_last && status == "running" { "terminal-line current" } else { "terminal-line" }; + html.push_str(&format!(r#"
{}
"#, class, line)); + } + } + + html +} + impl TaskEngine { pub async fn create_task_with_db( &self, @@ -1295,39 +1604,31 @@ pub async fn handle_task_set_dependencies( } pub fn configure_task_routes() -> Router> { + log::info!("[ROUTES] Registering task routes with /api/tasks/:id pattern"); + Router::new() + // Task list and create .route( - ApiUrls::TASKS, + "/api/tasks", post(handle_task_create).get(handle_task_list_htmx), ) + // Specific routes MUST come before parameterized route .route("/api/tasks/stats", get(handle_task_stats_htmx)) .route("/api/tasks/stats/json", get(handle_task_stats)) .route("/api/tasks/time-saved", get(handle_time_saved)) .route("/api/tasks/completed", delete(handle_clear_completed)) + // Parameterized task routes - use :id for axum path params .route( - &ApiUrls::TASK_BY_ID.replace(":id", "{id}"), - get(handle_task_get).put(handle_task_update), - ) - .route( - &ApiUrls::TASK_BY_ID.replace(":id", "{id}"), - delete(handle_task_delete).patch(handle_task_patch), - ) - .route( - &ApiUrls::TASK_ASSIGN.replace(":id", "{id}"), - post(handle_task_assign), - ) - .route( - &ApiUrls::TASK_STATUS.replace(":id", "{id}"), - put(handle_task_status_update), - ) - .route( - &ApiUrls::TASK_PRIORITY.replace(":id", "{id}"), - put(handle_task_priority_set), - ) - .route( - "/api/tasks/{id}/dependencies", - put(handle_task_set_dependencies), + "/api/tasks/:id", + get(handle_task_get) + .put(handle_task_update) + .delete(handle_task_delete) + .patch(handle_task_patch), ) + .route("/api/tasks/:id/assign", post(handle_task_assign)) + .route("/api/tasks/:id/status", put(handle_task_status_update)) + .route("/api/tasks/:id/priority", put(handle_task_priority_set)) + .route("/api/tasks/:id/dependencies", put(handle_task_set_dependencies)) } pub fn configure(router: Router>) -> Router> {