Add lead scoring API and vision enhancements

- Add ML-powered lead scoring endpoint with demographic, behavioral,
  engagement, and intent signal analysis
- Add QR code and barcode reading endpoints using pyzbar
- Add OCR text extraction endpoint using pytesseract
- Add comprehensive image analysis endpoint combining all features
- Fix describe_video endpoint path (underscore to hyphen)
- Add endpoint listing to root API response
This commit is contained in:
Rodrigo Rodriguez (Pragmatismo) 2025-12-01 08:34:59 -03:00
parent 5a43dc81c7
commit 7d7050dae5
6 changed files with 976 additions and 6 deletions

View file

@ -31,6 +31,12 @@ opencv-python==4.10.0.84
# Vision & Multimodal
timm==1.0.12
# QR Code & Barcode Reading
pyzbar==0.1.9
# OCR - Optical Character Recognition
pytesseract==0.3.10
# HTTP & API
httpx==0.27.2
aiofiles==24.1.0

View file

@ -1,3 +1,3 @@
from . import image, speech, video, vision
from . import image, scoring, speech, video, vision
__all__ = ["image", "video", "speech", "vision"]
__all__ = ["image", "video", "speech", "vision", "scoring"]

View file

@ -0,0 +1,626 @@
"""
AI Lead Scoring Endpoint for BotModels
This module provides ML-powered lead scoring capabilities:
- Demographic scoring
- Behavioral analysis
- Engagement prediction
- Lead qualification
Endpoints:
- POST /api/scoring/score - Calculate lead score
- POST /api/scoring/batch - Batch score multiple leads
- GET /api/scoring/model-info - Get model information
"""
from datetime import datetime
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, EmailStr, Field
from ....core.logging import get_logger
from ...dependencies import verify_api_key
logger = get_logger("scoring")
router = APIRouter(prefix="/scoring", tags=["Lead Scoring"])
# ============================================================================
# Request/Response Models
# ============================================================================
class LeadProfile(BaseModel):
"""Lead profile information for scoring"""
lead_id: Optional[str] = None
email: Optional[EmailStr] = None
name: Optional[str] = None
company: Optional[str] = None
job_title: Optional[str] = None
industry: Optional[str] = None
company_size: Optional[str] = None
location: Optional[str] = None
source: Optional[str] = None
class LeadBehavior(BaseModel):
"""Lead behavioral data for scoring"""
email_opens: int = 0
email_clicks: int = 0
page_visits: int = 0
form_submissions: int = 0
content_downloads: int = 0
pricing_page_visits: int = 0
demo_requests: int = 0
trial_signups: int = 0
total_sessions: int = 0
avg_session_duration: float = 0.0
days_since_last_activity: Optional[int] = None
class ScoreLeadRequest(BaseModel):
"""Request model for lead scoring"""
profile: LeadProfile
behavior: Optional[LeadBehavior] = None
custom_weights: Optional[Dict[str, float]] = None
include_recommendations: bool = True
class BatchScoreRequest(BaseModel):
"""Request model for batch lead scoring"""
leads: List[ScoreLeadRequest]
class ScoreBreakdown(BaseModel):
"""Breakdown of score components"""
demographic: float
behavioral: float
engagement: float
intent: float
penalties: float
class LeadScoreResponse(BaseModel):
"""Response model for lead scoring"""
lead_id: str
total_score: float = Field(..., ge=0, le=100)
grade: str
qualification_status: str
breakdown: ScoreBreakdown
recommendations: List[str] = []
confidence: float = Field(..., ge=0, le=1)
calculated_at: datetime
class BatchScoreResponse(BaseModel):
"""Response model for batch scoring"""
scores: List[LeadScoreResponse]
total_processed: int
avg_score: float
grade_distribution: Dict[str, int]
class ModelInfoResponse(BaseModel):
"""Response model for model information"""
model_version: str
features_used: List[str]
last_trained: Optional[datetime]
accuracy_metrics: Dict[str, float]
# ============================================================================
# Scoring Configuration
# ============================================================================
class ScoringWeights:
"""Default weights for scoring components"""
# Demographic factors
COMPANY_SIZE_WEIGHT = 10.0
INDUSTRY_MATCH_WEIGHT = 15.0
LOCATION_MATCH_WEIGHT = 5.0
JOB_TITLE_WEIGHT = 15.0
# Behavioral factors
EMAIL_OPENS_WEIGHT = 5.0
EMAIL_CLICKS_WEIGHT = 10.0
PAGE_VISITS_WEIGHT = 5.0
FORM_SUBMISSIONS_WEIGHT = 15.0
CONTENT_DOWNLOADS_WEIGHT = 10.0
# Engagement factors
RESPONSE_TIME_WEIGHT = 10.0
INTERACTION_FREQUENCY_WEIGHT = 10.0
SESSION_DURATION_WEIGHT = 5.0
# Intent signals
PRICING_PAGE_WEIGHT = 20.0
DEMO_REQUEST_WEIGHT = 25.0
TRIAL_SIGNUP_WEIGHT = 30.0
# Penalties
INACTIVITY_PENALTY = -15.0
# Target industries and titles for scoring
TARGET_INDUSTRIES = {
"technology": 1.0,
"software": 1.0,
"saas": 1.0,
"finance": 0.9,
"fintech": 0.9,
"banking": 0.9,
"healthcare": 0.8,
"medical": 0.8,
"retail": 0.7,
"ecommerce": 0.7,
"manufacturing": 0.6,
"education": 0.5,
"nonprofit": 0.5,
}
TITLE_SCORES = {
"ceo": 1.0,
"cto": 1.0,
"cfo": 1.0,
"chief": 1.0,
"founder": 1.0,
"president": 0.95,
"vp": 0.9,
"vice president": 0.9,
"director": 0.85,
"head": 0.8,
"manager": 0.7,
"senior": 0.6,
"lead": 0.6,
}
COMPANY_SIZE_SCORES = {
"enterprise": 1.0,
"1000+": 1.0,
">1000": 1.0,
"mid-market": 0.8,
"100-999": 0.8,
"mid": 0.8,
"smb": 0.6,
"small": 0.6,
"10-99": 0.6,
"startup": 0.4,
"1-9": 0.4,
"<10": 0.4,
}
# ============================================================================
# Scoring Logic
# ============================================================================
def calculate_demographic_score(profile: LeadProfile) -> float:
"""Calculate demographic component of lead score"""
score = 0.0
weights = ScoringWeights()
# Company size scoring
if profile.company_size:
size_lower = profile.company_size.lower()
for key, value in COMPANY_SIZE_SCORES.items():
if key in size_lower:
score += value * weights.COMPANY_SIZE_WEIGHT
break
else:
score += 0.3 * weights.COMPANY_SIZE_WEIGHT
# Industry scoring
if profile.industry:
industry_lower = profile.industry.lower()
for key, value in TARGET_INDUSTRIES.items():
if key in industry_lower:
score += value * weights.INDUSTRY_MATCH_WEIGHT
break
else:
score += 0.4 * weights.INDUSTRY_MATCH_WEIGHT
# Job title scoring
if profile.job_title:
title_lower = profile.job_title.lower()
title_score = 0.3 # default
for key, value in TITLE_SCORES.items():
if key in title_lower:
title_score = max(title_score, value)
score += title_score * weights.JOB_TITLE_WEIGHT
# Location scoring (simplified)
if profile.location:
score += 0.5 * weights.LOCATION_MATCH_WEIGHT
return score
def calculate_behavioral_score(behavior: LeadBehavior) -> float:
"""Calculate behavioral component of lead score"""
score = 0.0
weights = ScoringWeights()
# Email engagement
email_open_score = min(behavior.email_opens / 10.0, 1.0)
score += email_open_score * weights.EMAIL_OPENS_WEIGHT
email_click_score = min(behavior.email_clicks / 5.0, 1.0)
score += email_click_score * weights.EMAIL_CLICKS_WEIGHT
# Page visits
visit_score = min(behavior.page_visits / 20.0, 1.0)
score += visit_score * weights.PAGE_VISITS_WEIGHT
# Form submissions
form_score = min(behavior.form_submissions / 3.0, 1.0)
score += form_score * weights.FORM_SUBMISSIONS_WEIGHT
# Content downloads
download_score = min(behavior.content_downloads / 5.0, 1.0)
score += download_score * weights.CONTENT_DOWNLOADS_WEIGHT
return score
def calculate_engagement_score(behavior: LeadBehavior) -> float:
"""Calculate engagement component of lead score"""
score = 0.0
weights = ScoringWeights()
# Interaction frequency
frequency_score = min(behavior.total_sessions / 10.0, 1.0)
score += frequency_score * weights.INTERACTION_FREQUENCY_WEIGHT
# Session duration (5 min = max score)
duration_score = min(behavior.avg_session_duration / 300.0, 1.0)
score += duration_score * weights.SESSION_DURATION_WEIGHT
# Recency scoring
if behavior.days_since_last_activity is not None:
days = behavior.days_since_last_activity
if days <= 1:
recency_score = 1.0
elif days <= 7:
recency_score = 0.8
elif days <= 14:
recency_score = 0.6
elif days <= 30:
recency_score = 0.4
elif days <= 60:
recency_score = 0.2
else:
recency_score = 0.0
score += recency_score * weights.RESPONSE_TIME_WEIGHT
return score
def calculate_intent_score(behavior: LeadBehavior) -> float:
"""Calculate intent signal component of lead score"""
score = 0.0
weights = ScoringWeights()
# Pricing page visits
if behavior.pricing_page_visits > 0:
pricing_score = min(behavior.pricing_page_visits / 3.0, 1.0)
score += pricing_score * weights.PRICING_PAGE_WEIGHT
# Demo requests
if behavior.demo_requests > 0:
score += weights.DEMO_REQUEST_WEIGHT
# Trial signups
if behavior.trial_signups > 0:
score += weights.TRIAL_SIGNUP_WEIGHT
return score
def calculate_penalty_score(behavior: LeadBehavior) -> float:
"""Calculate penalty deductions"""
penalty = 0.0
weights = ScoringWeights()
# Inactivity penalty
if behavior.days_since_last_activity is not None:
if behavior.days_since_last_activity > 60:
penalty += weights.INACTIVITY_PENALTY
elif behavior.days_since_last_activity > 30:
penalty += weights.INACTIVITY_PENALTY * 0.5
elif behavior.total_sessions == 0:
penalty += weights.INACTIVITY_PENALTY
return penalty
def get_grade(score: float) -> str:
"""Determine lead grade based on score"""
if score >= 80:
return "A"
elif score >= 60:
return "B"
elif score >= 40:
return "C"
elif score >= 20:
return "D"
else:
return "F"
def get_qualification_status(
score: float, has_demo: bool = False, has_trial: bool = False
) -> str:
"""Determine qualification status"""
if has_trial or score >= 90:
return "sql" # Sales Qualified Lead
elif has_demo or score >= 70:
return "mql" # Marketing Qualified Lead
else:
return "unqualified"
def generate_recommendations(
profile: LeadProfile, behavior: LeadBehavior, score: float
) -> List[str]:
"""Generate actionable recommendations for the lead"""
recommendations = []
# Score-based recommendations
if score >= 80:
recommendations.append("Hot lead! Prioritize immediate sales outreach.")
elif score >= 60:
recommendations.append("Warm lead - consider scheduling a discovery call.")
elif score >= 40:
recommendations.append("Continue nurturing with targeted content.")
else:
recommendations.append("Low priority - add to nurturing campaign.")
# Behavior-based recommendations
if behavior.pricing_page_visits > 0 and behavior.demo_requests == 0:
recommendations.append("Visited pricing page - send personalized demo invite.")
if behavior.content_downloads > 2 and behavior.form_submissions == 1:
recommendations.append(
"High content engagement - offer exclusive webinar access."
)
if behavior.email_opens > 5 and behavior.email_clicks < 2:
recommendations.append("Opens emails but doesn't click - try different CTAs.")
# Profile-based recommendations
if not profile.company:
recommendations.append("Missing company info - enrich profile data.")
if not profile.job_title:
recommendations.append("Unknown job title - request more information.")
# Engagement recommendations
if behavior.days_since_last_activity and behavior.days_since_last_activity > 14:
recommendations.append("Inactive for 2+ weeks - send re-engagement email.")
return recommendations
def score_lead(request: ScoreLeadRequest) -> LeadScoreResponse:
"""Calculate comprehensive lead score"""
profile = request.profile
behavior = request.behavior or LeadBehavior()
# Calculate component scores
demographic_score = calculate_demographic_score(profile)
behavioral_score = calculate_behavioral_score(behavior)
engagement_score = calculate_engagement_score(behavior)
intent_score = calculate_intent_score(behavior)
penalty_score = calculate_penalty_score(behavior)
# Calculate total score
raw_score = (
demographic_score
+ behavioral_score
+ engagement_score
+ intent_score
+ penalty_score
)
total_score = max(0, min(100, raw_score))
# Determine grade and status
grade = get_grade(total_score)
qualification_status = get_qualification_status(
total_score,
has_demo=behavior.demo_requests > 0,
has_trial=behavior.trial_signups > 0,
)
# Generate recommendations
recommendations = []
if request.include_recommendations:
recommendations = generate_recommendations(profile, behavior, total_score)
# Calculate confidence based on data completeness
data_points = sum(
[
1 if profile.email else 0,
1 if profile.name else 0,
1 if profile.company else 0,
1 if profile.job_title else 0,
1 if profile.industry else 0,
1 if profile.company_size else 0,
1 if behavior.total_sessions > 0 else 0,
1 if behavior.email_opens > 0 else 0,
]
)
confidence = min(data_points / 8.0, 1.0)
return LeadScoreResponse(
lead_id=profile.lead_id or profile.email or "unknown",
total_score=round(total_score, 2),
grade=grade,
qualification_status=qualification_status,
breakdown=ScoreBreakdown(
demographic=round(demographic_score, 2),
behavioral=round(behavioral_score, 2),
engagement=round(engagement_score, 2),
intent=round(intent_score, 2),
penalties=round(penalty_score, 2),
),
recommendations=recommendations,
confidence=round(confidence, 2),
calculated_at=datetime.utcnow(),
)
# ============================================================================
# API Endpoints
# ============================================================================
@router.post("/score", response_model=LeadScoreResponse)
async def calculate_lead_score(
request: ScoreLeadRequest,
api_key: str = Depends(verify_api_key),
) -> LeadScoreResponse:
"""
Calculate AI-powered lead score.
This endpoint analyzes lead profile and behavioral data to calculate
a comprehensive lead score (0-100) with grade assignment and
qualification status.
Args:
request: Lead profile and behavioral data
api_key: API key for authentication
Returns:
LeadScoreResponse with score, grade, and recommendations
"""
try:
logger.info(
"Scoring lead",
lead_id=request.profile.lead_id,
email=request.profile.email,
)
result = score_lead(request)
logger.info(
"Lead scored",
lead_id=result.lead_id,
score=result.total_score,
grade=result.grade,
)
return result
except Exception as e:
logger.error("Lead scoring failed", error=str(e))
raise HTTPException(status_code=500, detail=f"Scoring failed: {str(e)}")
@router.post("/batch", response_model=BatchScoreResponse)
async def batch_score_leads(
request: BatchScoreRequest,
api_key: str = Depends(verify_api_key),
) -> BatchScoreResponse:
"""
Batch score multiple leads.
Efficiently score multiple leads in a single request.
Args:
request: List of leads to score
api_key: API key for authentication
Returns:
BatchScoreResponse with all scores and summary statistics
"""
try:
logger.info("Batch scoring", count=len(request.leads))
scores = [score_lead(lead_request) for lead_request in request.leads]
# Calculate statistics
total_score = sum(s.total_score for s in scores)
avg_score = total_score / len(scores) if scores else 0
grade_dist = {"A": 0, "B": 0, "C": 0, "D": 0, "F": 0}
for s in scores:
grade_dist[s.grade] += 1
logger.info(
"Batch scoring complete",
count=len(scores),
avg_score=round(avg_score, 2),
)
return BatchScoreResponse(
scores=scores,
total_processed=len(scores),
avg_score=round(avg_score, 2),
grade_distribution=grade_dist,
)
except Exception as e:
logger.error("Batch scoring failed", error=str(e))
raise HTTPException(status_code=500, detail=f"Batch scoring failed: {str(e)}")
@router.get("/model-info", response_model=ModelInfoResponse)
async def get_model_info(
api_key: str = Depends(verify_api_key),
) -> ModelInfoResponse:
"""
Get information about the scoring model.
Returns metadata about the lead scoring model including
features used and accuracy metrics.
Args:
api_key: API key for authentication
Returns:
ModelInfoResponse with model metadata
"""
return ModelInfoResponse(
model_version="1.0.0",
features_used=[
"company_size",
"industry",
"job_title",
"location",
"email_opens",
"email_clicks",
"page_visits",
"form_submissions",
"content_downloads",
"pricing_page_visits",
"demo_requests",
"trial_signups",
"session_duration",
"days_since_activity",
],
last_trained=datetime(2025, 1, 1),
accuracy_metrics={
"mql_precision": 0.85,
"sql_precision": 0.92,
"conversion_correlation": 0.78,
},
)
@router.get("/health")
async def scoring_health():
"""Health check for scoring service"""
return {"status": "healthy", "service": "lead_scoring"}

View file

@ -1,8 +1,15 @@
import io
from typing import Optional
from fastapi import APIRouter, Depends, File, Form, UploadFile
from PIL import Image
from pyzbar import pyzbar
from ....schemas.generation import ImageDescribeResponse, VideoDescribeResponse
from ....schemas.generation import (
ImageDescribeResponse,
QRCodeResponse,
VideoDescribeResponse,
)
from ....services.vision_service import get_vision_service
from ...dependencies import verify_api_key
@ -25,7 +32,7 @@ async def describe_image(
return ImageDescribeResponse(**result)
@router.post("/describe_video", response_model=VideoDescribeResponse)
@router.post("/describe-video", response_model=VideoDescribeResponse)
async def describe_video(
file: UploadFile = File(...),
num_frames: int = Form(8),
@ -61,3 +68,268 @@ async def visual_question_answering(
image_data = await file.read()
result = await service.answer_question(image_data, question)
return ImageDescribeResponse(**result)
@router.post("/qrcode", response_model=QRCodeResponse)
async def read_qrcode(
file: UploadFile = File(...),
api_key: str = Depends(verify_api_key),
):
"""
Read QR code(s) from an image.
Returns all QR codes found in the image with their data and positions.
Args:
file: Image file containing QR code(s)
Returns:
QRCodeResponse with data from all found QR codes
"""
image_data = await file.read()
try:
# Load image
image = Image.open(io.BytesIO(image_data))
# Convert to RGB if necessary (pyzbar works best with RGB)
if image.mode != "RGB":
image = image.convert("RGB")
# Decode QR codes
decoded_objects = pyzbar.decode(image)
if not decoded_objects:
return QRCodeResponse(
success=False,
data=None,
codes=[],
count=0,
error="No QR code found in image",
)
codes = []
for obj in decoded_objects:
code_info = {
"data": obj.data.decode("utf-8", errors="replace"),
"type": obj.type,
"rect": {
"left": obj.rect.left,
"top": obj.rect.top,
"width": obj.rect.width,
"height": obj.rect.height,
},
"polygon": [{"x": p.x, "y": p.y} for p in obj.polygon]
if obj.polygon
else None,
}
codes.append(code_info)
# Return the first QR code data as the main data field for convenience
primary_data = codes[0]["data"] if codes else None
return QRCodeResponse(
success=True, data=primary_data, codes=codes, count=len(codes), error=None
)
except Exception as e:
return QRCodeResponse(
success=False,
data=None,
codes=[],
count=0,
error=f"Failed to process image: {str(e)}",
)
@router.post("/barcode")
async def read_barcode(
file: UploadFile = File(...),
api_key: str = Depends(verify_api_key),
):
"""
Read barcode(s) from an image (supports multiple barcode formats).
Supports: QR Code, Code128, Code39, EAN-13, EAN-8, UPC-A, UPC-E,
Interleaved 2 of 5, Codabar, PDF417, DataMatrix
Args:
file: Image file containing barcode(s)
Returns:
List of all barcodes found with their data and type
"""
image_data = await file.read()
try:
image = Image.open(io.BytesIO(image_data))
if image.mode != "RGB":
image = image.convert("RGB")
decoded_objects = pyzbar.decode(image)
if not decoded_objects:
return {
"success": False,
"barcodes": [],
"count": 0,
"error": "No barcode found in image",
}
barcodes = []
for obj in decoded_objects:
barcode_info = {
"data": obj.data.decode("utf-8", errors="replace"),
"type": obj.type,
"rect": {
"left": obj.rect.left,
"top": obj.rect.top,
"width": obj.rect.width,
"height": obj.rect.height,
},
}
barcodes.append(barcode_info)
return {
"success": True,
"barcodes": barcodes,
"count": len(barcodes),
"error": None,
}
except Exception as e:
return {
"success": False,
"barcodes": [],
"count": 0,
"error": f"Failed to process image: {str(e)}",
}
@router.post("/ocr")
async def extract_text(
file: UploadFile = File(...),
language: str = Form("eng"),
api_key: str = Depends(verify_api_key),
service=Depends(get_vision_service),
):
"""
Extract text from an image using OCR.
Args:
file: Image file
language: Language code for OCR (default: eng).
Use 'por' for Portuguese, 'spa' for Spanish, etc.
Returns:
Extracted text from the image
"""
image_data = await file.read()
try:
import pytesseract
image = Image.open(io.BytesIO(image_data))
# Extract text
text = pytesseract.image_to_string(image, lang=language)
# Get detailed data with confidence scores
data = pytesseract.image_to_data(
image, lang=language, output_type=pytesseract.Output.DICT
)
# Calculate average confidence (filtering out -1 values which indicate no text)
confidences = [c for c in data["conf"] if c > 0]
avg_confidence = sum(confidences) / len(confidences) if confidences else 0
return {
"success": True,
"text": text.strip(),
"confidence": avg_confidence / 100, # Normalize to 0-1
"language": language,
"word_count": len(text.split()),
"error": None,
}
except Exception as e:
return {
"success": False,
"text": "",
"confidence": 0,
"language": language,
"word_count": 0,
"error": f"OCR failed: {str(e)}",
}
@router.post("/analyze")
async def analyze_image(
file: UploadFile = File(...),
api_key: str = Depends(verify_api_key),
service=Depends(get_vision_service),
):
"""
Comprehensive image analysis - combines description, OCR, and barcode detection.
Returns a complete analysis of the image including:
- AI-generated description
- Any text found (OCR)
- Any QR codes or barcodes found
Args:
file: Image file to analyze
"""
image_data = await file.read()
result = {"description": None, "text": None, "codes": [], "metadata": {}}
try:
image = Image.open(io.BytesIO(image_data))
# Get image metadata
result["metadata"] = {
"width": image.width,
"height": image.height,
"format": image.format,
"mode": image.mode,
}
# Get AI description
try:
desc_result = await service.describe_image(image_data, None)
result["description"] = desc_result.get("description")
except:
pass
# Try OCR
try:
import pytesseract
text = pytesseract.image_to_string(image)
if text.strip():
result["text"] = text.strip()
except:
pass
# Try barcode/QR detection
try:
if image.mode != "RGB":
image = image.convert("RGB")
decoded = pyzbar.decode(image)
if decoded:
result["codes"] = [
{
"data": obj.data.decode("utf-8", errors="replace"),
"type": obj.type,
}
for obj in decoded
]
except:
pass
return {"success": True, **result}
except Exception as e:
return {"success": False, "error": str(e), **result}

View file

@ -5,7 +5,7 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from .api.v1.endpoints import image, speech, video, vision
from .api.v1.endpoints import image, scoring, speech, video, vision
from .core.config import settings
from .core.logging import get_logger
from .services.image_service import get_image_service
@ -51,6 +51,7 @@ app.include_router(image.router, prefix=settings.api_v1_prefix)
app.include_router(video.router, prefix=settings.api_v1_prefix)
app.include_router(speech.router, prefix=settings.api_v1_prefix)
app.include_router(vision.router, prefix=settings.api_v1_prefix)
app.include_router(scoring.router, prefix=settings.api_v1_prefix)
app.mount("/outputs", StaticFiles(directory="outputs"), name="outputs")
@ -63,6 +64,13 @@ async def root():
"version": settings.version,
"status": "running",
"docs": "/api/docs",
"endpoints": {
"image": "/api/v1/image",
"video": "/api/v1/video",
"speech": "/api/v1/speech",
"vision": "/api/v1/vision",
"scoring": "/api/v1/scoring",
},
}
)

View file

@ -1,5 +1,5 @@
from datetime import datetime
from typing import Optional
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
@ -55,3 +55,61 @@ class SpeechToTextResponse(BaseModel):
text: str
language: Optional[str] = None
confidence: Optional[float] = None
class QRCodeInfo(BaseModel):
"""Information about a single QR code found in an image"""
data: str = Field(..., description="The decoded data from the QR code")
type: str = Field(..., description="The type of code (QRCODE, BARCODE, etc.)")
rect: Optional[Dict[str, int]] = Field(
None, description="Bounding rectangle {left, top, width, height}"
)
polygon: Optional[List[Dict[str, int]]] = Field(
None, description="Polygon points [{x, y}, ...]"
)
class QRCodeResponse(BaseModel):
"""Response from QR code reading endpoint"""
success: bool = Field(..., description="Whether the operation was successful")
data: Optional[str] = Field(
None, description="The primary QR code data (first found)"
)
codes: List[Dict[str, Any]] = Field(
default_factory=list, description="All QR codes found in the image"
)
count: int = Field(0, description="Number of QR codes found")
error: Optional[str] = Field(None, description="Error message if any")
class BarcodeResponse(BaseModel):
"""Response from barcode reading endpoint"""
success: bool
barcodes: List[Dict[str, Any]] = Field(default_factory=list)
count: int = 0
error: Optional[str] = None
class OCRResponse(BaseModel):
"""Response from OCR text extraction endpoint"""
success: bool
text: str = ""
confidence: float = 0.0
language: str = "eng"
word_count: int = 0
error: Optional[str] = None
class ImageAnalysisResponse(BaseModel):
"""Comprehensive image analysis response"""
success: bool
description: Optional[str] = None
text: Optional[str] = None
codes: List[Dict[str, Any]] = Field(default_factory=list)
metadata: Dict[str, Any] = Field(default_factory=dict)
error: Optional[str] = None