Compare commits


No commits in common. "6125995fe5a8a115083fda16ff5639c2f4946f97" and "4eed996de4c8694625940bc1e29cd2674cae8a34" have entirely different histories.

26 changed files with 27 additions and 2599 deletions


@@ -1,38 +0,0 @@
# Server Configuration
ENV=development
HOST=0.0.0.0
PORT=8085
LOG_LEVEL=INFO
# Security - IMPORTANT: Change this in production!
API_KEY=change-me-in-production
# Model Paths
# These can be local paths or model identifiers for HuggingFace Hub
IMAGE_MODEL_PATH=./models/stable-diffusion-v1-5
VIDEO_MODEL_PATH=./models/zeroscope-v2
SPEECH_MODEL_PATH=./models/tts
VISION_MODEL_PATH=./models/blip2
WHISPER_MODEL_PATH=./models/whisper
# Device Configuration
# Options: cuda, cpu, mps (for Apple Silicon)
DEVICE=cuda
# Image Generation Defaults
IMAGE_STEPS=4
IMAGE_WIDTH=512
IMAGE_HEIGHT=512
IMAGE_GPU_LAYERS=20
IMAGE_BATCH_SIZE=1
# Video Generation Defaults
VIDEO_FRAMES=24
VIDEO_FPS=8
VIDEO_WIDTH=320
VIDEO_HEIGHT=576
VIDEO_GPU_LAYERS=15
VIDEO_BATCH_SIZE=1
# Storage
OUTPUT_DIR=./outputs


@@ -1,43 +0,0 @@
name: GBCI
on:
push:
branches: ["main"]
pull_request:
branches: ["main"]
jobs:
build:
runs-on: gbo
steps:
- name: Disable SSL verification (temporary)
run: git config --global http.sslVerify false
- uses: actions/checkout@v4
- name: Set up Python
run: |
python3 -m venv venv
source venv/bin/activate
pip install --upgrade pip
- name: Install dependencies
run: |
source venv/bin/activate
pip install -r requirements.txt
- name: Run tests
run: |
source venv/bin/activate
python -m pytest tests/ -v || true
- name: Deploy application
run: |
lxc exec bot:pragmatismo-system -- systemctl stop botmodels || true
sudo mkdir -p /opt/gbo/bin/botmodels
sudo cp -r ./* /opt/gbo/bin/botmodels/
sudo cp -r venv /opt/gbo/bin/botmodels/
lxc exec bot:pragmatismo-system -- systemctl start botmodels

README.md

@@ -1,326 +1,20 @@
# BotModels
A multimodal AI service for General Bots providing image, video, audio generation, and vision/captioning capabilities. Works as a companion service to botserver, similar to how llama.cpp provides LLM capabilities.
Python models for General Bots AI workloads.
# Environment
1. Install Visual Studio Code (VSCode);
2. Install VSCode Extension: Azure Functions;
3. Install VSCode Extension: Azure Machine Learning;
4. Install NodeJS;
5. Run `npm install -g azure-functions-core-tools@3 --unsafe-perm true`.
# Libraries
- TensorFlow;
- SciKit-Learn;
- Pandas;
- NumPy.
![General Bots Models Services](https://raw.githubusercontent.com/GeneralBots/BotModels/master/BotModels.png)
## Features
- **Image Generation**: Generate images from text prompts using Stable Diffusion
- **Video Generation**: Create short videos from text descriptions using Zeroscope
- **Speech Synthesis**: Text-to-speech using Coqui TTS
- **Speech Recognition**: Audio transcription using OpenAI Whisper
- **Vision/Captioning**: Image and video description using BLIP2
## Quick Start
### Installation
```bash
# Clone the repository
cd botmodels
# Create virtual environment
python -m venv venv
source venv/bin/activate # Linux/Mac
# or
.\venv\Scripts\activate # Windows
# Install dependencies
pip install -r requirements.txt
```
### Configuration
Copy the example environment file and configure:
```bash
cp .env.example .env
```
Edit `.env` with your settings:
```env
HOST=0.0.0.0
PORT=8085
API_KEY=your-secret-key
DEVICE=cuda
IMAGE_MODEL_PATH=./models/stable-diffusion-v1-5
VIDEO_MODEL_PATH=./models/zeroscope-v2
VISION_MODEL_PATH=./models/blip2
```
### Running the Server
```bash
# Development mode
python -m uvicorn src.main:app --host 0.0.0.0 --port 8085 --reload
# Production mode
python -m uvicorn src.main:app --host 0.0.0.0 --port 8085 --workers 4
# With HTTPS (production)
python -m uvicorn src.main:app --host 0.0.0.0 --port 8085 --ssl-keyfile key.pem --ssl-certfile cert.pem
```
## API Endpoints
All endpoints require the `X-API-Key` header for authentication.
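As a minimal sketch, a Python client call with the required header might look like this (host, port, key, and prompt are placeholder values; any HTTP client works, `httpx` is just one option):
```python
# Minimal sketch: calling the image generation endpoint with the
# X-API-Key header. URL, key, and prompt are placeholders.
import httpx

response = httpx.post(
    "http://localhost:8085/api/image/generate",
    headers={"X-API-Key": "your-secret-key"},
    json={"prompt": "a cute cat playing with yarn", "steps": 30},
    timeout=120.0,
)
response.raise_for_status()
# The body follows the GenerationResponse schema, e.g.
# {"status": "completed", "file_path": "/outputs/images/<name>.png", ...}
print(response.json()["file_path"])
```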
### Image Generation
```http
POST /api/image/generate
Content-Type: application/json
X-API-Key: your-api-key
{
"prompt": "a cute cat playing with yarn",
"steps": 30,
"width": 512,
"height": 512,
"guidance_scale": 7.5,
"seed": 42
}
```
### Video Generation
```http
POST /api/video/generate
Content-Type: application/json
X-API-Key: your-api-key
{
"prompt": "a rocket launching into space",
"num_frames": 24,
"fps": 8,
"steps": 50
}
```
### Speech Generation (TTS)
```http
POST /api/speech/generate
Content-Type: application/json
X-API-Key: your-api-key
{
"prompt": "Hello, welcome to our service!",
"voice": "default",
"language": "en"
}
```
### Speech to Text
```http
POST /api/speech/totext
Content-Type: multipart/form-data
X-API-Key: your-api-key
file: <audio_file>
```
### Image Description
```http
POST /api/vision/describe
Content-Type: multipart/form-data
X-API-Key: your-api-key
file: <image_file>
prompt: "What is in this image?" (optional)
```
### Video Description
```http
POST /api/vision/describe_video
Content-Type: multipart/form-data
X-API-Key: your-api-key
file: <video_file>
num_frames: 8 (optional)
```
### Visual Question Answering
```http
POST /api/vision/vqa
Content-Type: multipart/form-data
X-API-Key: your-api-key
file: <image_file>
question: "How many people are in this image?"
```
### Health Check
```http
GET /api/health
```
## Integration with botserver
BotModels integrates with botserver through HTTPS, providing multimodal capabilities to BASIC scripts.
### botserver Configuration (config.csv)
```csv
key,value
botmodels-enabled,true
botmodels-host,0.0.0.0
botmodels-port,8085
botmodels-api-key,your-secret-key
botmodels-https,false
image-generator-model,../../../../data/diffusion/sd_turbo_f16.gguf
image-generator-steps,4
image-generator-width,512
image-generator-height,512
video-generator-model,../../../../data/diffusion/zeroscope_v2_576w
video-generator-frames,24
video-generator-fps,8
```
### BASIC Script Keywords
Once configured, these keywords are available in BASIC:
```basic
// Generate an image
file = IMAGE "a beautiful sunset over mountains"
SEND FILE TO user, file
// Generate a video
video = VIDEO "waves crashing on a beach"
SEND FILE TO user, video
// Generate speech
audio = AUDIO "Welcome to General Bots!"
SEND FILE TO user, audio
// Get image/video description
caption = SEE "/path/to/image.jpg"
TALK caption
```
## Architecture
```
┌─────────────┐      HTTPS      ┌─────────────┐
│  botserver  │ ──────────────▶ │  botmodels  │
│   (Rust)    │                 │  (Python)   │
└─────────────┘                 └─────────────┘
       │                               │
       │ BASIC Keywords                │ AI Models
       │ - IMAGE                       │ - Stable Diffusion
       │ - VIDEO                       │ - Zeroscope
       │ - AUDIO                       │ - TTS/Whisper
       │ - SEE                         │ - BLIP2
       ▼                               ▼
┌─────────────┐                 ┌─────────────┐
│   config    │                 │   outputs   │
│    .csv     │                 │   (files)   │
└─────────────┘                 └─────────────┘
```
## Model Downloads
Models are downloaded automatically on first use, or you can pre-download them:
```bash
# Stable Diffusion
python -c "from diffusers import StableDiffusionPipeline; StableDiffusionPipeline.from_pretrained('runwayml/stable-diffusion-v1-5')"
# BLIP2 (Vision)
python -c "from transformers import Blip2Processor, Blip2ForConditionalGeneration; Blip2Processor.from_pretrained('Salesforce/blip2-opt-2.7b'); Blip2ForConditionalGeneration.from_pretrained('Salesforce/blip2-opt-2.7b')"
# Whisper (Speech-to-Text)
python -c "import whisper; whisper.load_model('base')"
```
## API Documentation
Interactive API documentation is available at:
- Swagger UI: `http://localhost:8085/api/docs`
- ReDoc: `http://localhost:8085/api/redoc`
## Development
### Project Structure
```
botmodels/
├── src/
│ ├── api/
│ │ ├── v1/
│ │ │ └── endpoints/
│ │ │ ├── image.py
│ │ │ ├── video.py
│ │ │ ├── speech.py
│ │ │ └── vision.py
│ │ └── dependencies.py
│ ├── core/
│ │ ├── config.py
│ │ └── logging.py
│ ├── schemas/
│ │ └── generation.py
│ ├── services/
│ │ ├── image_service.py
│ │ ├── video_service.py
│ │ ├── speech_service.py
│ │ └── vision_service.py
│ └── main.py
├── outputs/
├── models/
├── tests/
├── requirements.txt
└── README.md
```
### Running Tests
```bash
pytest tests/
```
## Security Notes
1. **Always use HTTPS in production**
2. Use strong, unique API keys (see the snippet after this list)
3. Restrict network access to the service
4. Consider running on a separate GPU server
5. Monitor resource usage and set appropriate limits
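For item 2, one simple way to generate a strong random key for the `API_KEY` setting is Python's standard library (a sketch, not the only option):
```python
# Generate a URL-safe random API key for the API_KEY value in .env.
import secrets

print(secrets.token_urlsafe(32))
```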
## Requirements
- Python 3.10+
- CUDA-capable GPU (recommended, 8GB+ VRAM)
- 16GB+ RAM
## Resources
### Education
- [Computer Vision Course](https://pjreddie.com/courses/computer-vision/)
- [Adversarial VQA Paper](https://arxiv.org/abs/2106.00245)
- [LLM Visualization](https://bbycroft.net/llm)
### References
- [VizWiz VQA PyTorch](https://github.com/DenisDsh/VizWiz-VQA-PyTorch)
- [Diffusers Library](https://github.com/huggingface/diffusers)
- [OpenAI Whisper](https://github.com/openai/whisper)
- [BLIP2](https://huggingface.co/Salesforce/blip2-opt-2.7b)
### Community
- [AI for Mankind](https://github.com/aiformankind)
- [ManaAI](https://manaai.cn/)
## License
See LICENSE file for details.


@@ -1,50 +1,11 @@
# Core Framework
fastapi==0.115.0
uvicorn[standard]==0.30.6
pydantic==2.9.0
pydantic-settings==2.5.2
# Logging
structlog==25.5.0
python-json-logger==2.0.7
# Generation Libraries
diffusers==0.30.3
torch==2.5.1
torchaudio==2.5.1
torchvision==0.20.1
transformers==4.46.0
accelerate==1.1.1
safetensors==0.4.5
Pillow==11.0.0
# Audio Generation & Processing
openai-whisper==20231117
TTS==0.22.0
scipy==1.14.1
# Video Processing
imageio==2.36.0
imageio-ffmpeg==0.5.1
opencv-python==4.10.0.84
# Vision & Multimodal
timm==1.0.12
# QR Code & Barcode Reading
pyzbar==0.1.9
# OCR - Optical Character Recognition
pytesseract==0.3.10
# HTTP & API
httpx==0.27.2
aiofiles==24.1.0
python-multipart==0.0.12
# Monitoring
prometheus-client==0.21.0
# Utils
python-dotenv==1.0.1
typing-extensions==4.12.2
azure-functions
azure-storage-blob
azure-identity
tensorflow
scikit-learn
pandas
numpy
allennlp
allennlp-models
nltk
Flask>=1.0,<=1.1.2


@@ -1,7 +0,0 @@
from fastapi import Header, HTTPException
from ..core.config import settings
async def verify_api_key(x_api_key: str = Header(...)):
if x_api_key != settings.api_key:
raise HTTPException(status_code=401, detail="Invalid API key")
return x_api_key


@@ -1,3 +0,0 @@
from . import image, scoring, speech, video, vision
__all__ = ["image", "video", "speech", "vision", "scoring"]


@@ -1,64 +0,0 @@
from fastapi import APIRouter, Depends, File, UploadFile
from ....schemas.generation import (
GenerationResponse,
ImageDescribeResponse,
ImageGenerateRequest,
)
from ....services.image_service import get_image_service
from ...dependencies import verify_api_key
router = APIRouter(prefix="/image", tags=["Image"])
@router.post("/generate", response_model=GenerationResponse)
async def generate_image(
request: ImageGenerateRequest,
api_key: str = Depends(verify_api_key),
service=Depends(get_image_service),
):
"""
Generate an image from a text prompt.
Args:
request: Image generation parameters including prompt, steps, dimensions, etc.
api_key: API key for authentication
service: Image service instance
Returns:
GenerationResponse with file path and generation time
"""
result = await service.generate(
prompt=request.prompt,
steps=request.steps,
width=request.width,
height=request.height,
guidance_scale=request.guidance_scale,
seed=request.seed,
)
return GenerationResponse(**result)
@router.post("/describe", response_model=ImageDescribeResponse)
async def describe_image(
file: UploadFile = File(...),
api_key: str = Depends(verify_api_key),
service=Depends(get_image_service),
):
"""
Get a description of an uploaded image.
Note: This endpoint is deprecated. Use /api/vision/describe instead
for full captioning capabilities.
Args:
file: Image file to describe
api_key: API key for authentication
service: Image service instance
Returns:
ImageDescribeResponse with description
"""
image_data = await file.read()
result = await service.describe(image_data)
return ImageDescribeResponse(**result)


@@ -1,626 +0,0 @@
"""
AI Lead Scoring Endpoint for BotModels
This module provides ML-powered lead scoring capabilities:
- Demographic scoring
- Behavioral analysis
- Engagement prediction
- Lead qualification
Endpoints:
- POST /api/scoring/score - Calculate lead score
- POST /api/scoring/batch - Batch score multiple leads
- GET /api/scoring/model-info - Get model information
"""
from datetime import datetime
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, EmailStr, Field
from ....core.logging import get_logger
from ...dependencies import verify_api_key
logger = get_logger("scoring")
router = APIRouter(prefix="/scoring", tags=["Lead Scoring"])
# ============================================================================
# Request/Response Models
# ============================================================================
class LeadProfile(BaseModel):
"""Lead profile information for scoring"""
lead_id: Optional[str] = None
email: Optional[EmailStr] = None
name: Optional[str] = None
company: Optional[str] = None
job_title: Optional[str] = None
industry: Optional[str] = None
company_size: Optional[str] = None
location: Optional[str] = None
source: Optional[str] = None
class LeadBehavior(BaseModel):
"""Lead behavioral data for scoring"""
email_opens: int = 0
email_clicks: int = 0
page_visits: int = 0
form_submissions: int = 0
content_downloads: int = 0
pricing_page_visits: int = 0
demo_requests: int = 0
trial_signups: int = 0
total_sessions: int = 0
avg_session_duration: float = 0.0
days_since_last_activity: Optional[int] = None
class ScoreLeadRequest(BaseModel):
"""Request model for lead scoring"""
profile: LeadProfile
behavior: Optional[LeadBehavior] = None
custom_weights: Optional[Dict[str, float]] = None
include_recommendations: bool = True
class BatchScoreRequest(BaseModel):
"""Request model for batch lead scoring"""
leads: List[ScoreLeadRequest]
class ScoreBreakdown(BaseModel):
"""Breakdown of score components"""
demographic: float
behavioral: float
engagement: float
intent: float
penalties: float
class LeadScoreResponse(BaseModel):
"""Response model for lead scoring"""
lead_id: str
total_score: float = Field(..., ge=0, le=100)
grade: str
qualification_status: str
breakdown: ScoreBreakdown
recommendations: List[str] = []
confidence: float = Field(..., ge=0, le=1)
calculated_at: datetime
class BatchScoreResponse(BaseModel):
"""Response model for batch scoring"""
scores: List[LeadScoreResponse]
total_processed: int
avg_score: float
grade_distribution: Dict[str, int]
class ModelInfoResponse(BaseModel):
"""Response model for model information"""
model_version: str
features_used: List[str]
last_trained: Optional[datetime]
accuracy_metrics: Dict[str, float]
# ============================================================================
# Scoring Configuration
# ============================================================================
class ScoringWeights:
"""Default weights for scoring components"""
# Demographic factors
COMPANY_SIZE_WEIGHT = 10.0
INDUSTRY_MATCH_WEIGHT = 15.0
LOCATION_MATCH_WEIGHT = 5.0
JOB_TITLE_WEIGHT = 15.0
# Behavioral factors
EMAIL_OPENS_WEIGHT = 5.0
EMAIL_CLICKS_WEIGHT = 10.0
PAGE_VISITS_WEIGHT = 5.0
FORM_SUBMISSIONS_WEIGHT = 15.0
CONTENT_DOWNLOADS_WEIGHT = 10.0
# Engagement factors
RESPONSE_TIME_WEIGHT = 10.0
INTERACTION_FREQUENCY_WEIGHT = 10.0
SESSION_DURATION_WEIGHT = 5.0
# Intent signals
PRICING_PAGE_WEIGHT = 20.0
DEMO_REQUEST_WEIGHT = 25.0
TRIAL_SIGNUP_WEIGHT = 30.0
# Penalties
INACTIVITY_PENALTY = -15.0
# Target industries and titles for scoring
TARGET_INDUSTRIES = {
"technology": 1.0,
"software": 1.0,
"saas": 1.0,
"finance": 0.9,
"fintech": 0.9,
"banking": 0.9,
"healthcare": 0.8,
"medical": 0.8,
"retail": 0.7,
"ecommerce": 0.7,
"manufacturing": 0.6,
"education": 0.5,
"nonprofit": 0.5,
}
TITLE_SCORES = {
"ceo": 1.0,
"cto": 1.0,
"cfo": 1.0,
"chief": 1.0,
"founder": 1.0,
"president": 0.95,
"vp": 0.9,
"vice president": 0.9,
"director": 0.85,
"head": 0.8,
"manager": 0.7,
"senior": 0.6,
"lead": 0.6,
}
COMPANY_SIZE_SCORES = {
"enterprise": 1.0,
"1000+": 1.0,
">1000": 1.0,
"mid-market": 0.8,
"100-999": 0.8,
"mid": 0.8,
"smb": 0.6,
"small": 0.6,
"10-99": 0.6,
"startup": 0.4,
"1-9": 0.4,
"<10": 0.4,
}
# ============================================================================
# Scoring Logic
# ============================================================================
def calculate_demographic_score(profile: LeadProfile) -> float:
"""Calculate demographic component of lead score"""
score = 0.0
weights = ScoringWeights()
# Company size scoring
if profile.company_size:
size_lower = profile.company_size.lower()
for key, value in COMPANY_SIZE_SCORES.items():
if key in size_lower:
score += value * weights.COMPANY_SIZE_WEIGHT
break
else:
score += 0.3 * weights.COMPANY_SIZE_WEIGHT
# Industry scoring
if profile.industry:
industry_lower = profile.industry.lower()
for key, value in TARGET_INDUSTRIES.items():
if key in industry_lower:
score += value * weights.INDUSTRY_MATCH_WEIGHT
break
else:
score += 0.4 * weights.INDUSTRY_MATCH_WEIGHT
# Job title scoring
if profile.job_title:
title_lower = profile.job_title.lower()
title_score = 0.3 # default
for key, value in TITLE_SCORES.items():
if key in title_lower:
title_score = max(title_score, value)
score += title_score * weights.JOB_TITLE_WEIGHT
# Location scoring (simplified)
if profile.location:
score += 0.5 * weights.LOCATION_MATCH_WEIGHT
return score
def calculate_behavioral_score(behavior: LeadBehavior) -> float:
"""Calculate behavioral component of lead score"""
score = 0.0
weights = ScoringWeights()
# Email engagement
email_open_score = min(behavior.email_opens / 10.0, 1.0)
score += email_open_score * weights.EMAIL_OPENS_WEIGHT
email_click_score = min(behavior.email_clicks / 5.0, 1.0)
score += email_click_score * weights.EMAIL_CLICKS_WEIGHT
# Page visits
visit_score = min(behavior.page_visits / 20.0, 1.0)
score += visit_score * weights.PAGE_VISITS_WEIGHT
# Form submissions
form_score = min(behavior.form_submissions / 3.0, 1.0)
score += form_score * weights.FORM_SUBMISSIONS_WEIGHT
# Content downloads
download_score = min(behavior.content_downloads / 5.0, 1.0)
score += download_score * weights.CONTENT_DOWNLOADS_WEIGHT
return score
def calculate_engagement_score(behavior: LeadBehavior) -> float:
"""Calculate engagement component of lead score"""
score = 0.0
weights = ScoringWeights()
# Interaction frequency
frequency_score = min(behavior.total_sessions / 10.0, 1.0)
score += frequency_score * weights.INTERACTION_FREQUENCY_WEIGHT
# Session duration (5 min = max score)
duration_score = min(behavior.avg_session_duration / 300.0, 1.0)
score += duration_score * weights.SESSION_DURATION_WEIGHT
# Recency scoring
if behavior.days_since_last_activity is not None:
days = behavior.days_since_last_activity
if days <= 1:
recency_score = 1.0
elif days <= 7:
recency_score = 0.8
elif days <= 14:
recency_score = 0.6
elif days <= 30:
recency_score = 0.4
elif days <= 60:
recency_score = 0.2
else:
recency_score = 0.0
score += recency_score * weights.RESPONSE_TIME_WEIGHT
return score
def calculate_intent_score(behavior: LeadBehavior) -> float:
"""Calculate intent signal component of lead score"""
score = 0.0
weights = ScoringWeights()
# Pricing page visits
if behavior.pricing_page_visits > 0:
pricing_score = min(behavior.pricing_page_visits / 3.0, 1.0)
score += pricing_score * weights.PRICING_PAGE_WEIGHT
# Demo requests
if behavior.demo_requests > 0:
score += weights.DEMO_REQUEST_WEIGHT
# Trial signups
if behavior.trial_signups > 0:
score += weights.TRIAL_SIGNUP_WEIGHT
return score
def calculate_penalty_score(behavior: LeadBehavior) -> float:
"""Calculate penalty deductions"""
penalty = 0.0
weights = ScoringWeights()
# Inactivity penalty
if behavior.days_since_last_activity is not None:
if behavior.days_since_last_activity > 60:
penalty += weights.INACTIVITY_PENALTY
elif behavior.days_since_last_activity > 30:
penalty += weights.INACTIVITY_PENALTY * 0.5
elif behavior.total_sessions == 0:
penalty += weights.INACTIVITY_PENALTY
return penalty
def get_grade(score: float) -> str:
"""Determine lead grade based on score"""
if score >= 80:
return "A"
elif score >= 60:
return "B"
elif score >= 40:
return "C"
elif score >= 20:
return "D"
else:
return "F"
def get_qualification_status(
score: float, has_demo: bool = False, has_trial: bool = False
) -> str:
"""Determine qualification status"""
if has_trial or score >= 90:
return "sql" # Sales Qualified Lead
elif has_demo or score >= 70:
return "mql" # Marketing Qualified Lead
else:
return "unqualified"
def generate_recommendations(
profile: LeadProfile, behavior: LeadBehavior, score: float
) -> List[str]:
"""Generate actionable recommendations for the lead"""
recommendations = []
# Score-based recommendations
if score >= 80:
recommendations.append("Hot lead! Prioritize immediate sales outreach.")
elif score >= 60:
recommendations.append("Warm lead - consider scheduling a discovery call.")
elif score >= 40:
recommendations.append("Continue nurturing with targeted content.")
else:
recommendations.append("Low priority - add to nurturing campaign.")
# Behavior-based recommendations
if behavior.pricing_page_visits > 0 and behavior.demo_requests == 0:
recommendations.append("Visited pricing page - send personalized demo invite.")
if behavior.content_downloads > 2 and behavior.form_submissions == 1:
recommendations.append(
"High content engagement - offer exclusive webinar access."
)
if behavior.email_opens > 5 and behavior.email_clicks < 2:
recommendations.append("Opens emails but doesn't click - try different CTAs.")
# Profile-based recommendations
if not profile.company:
recommendations.append("Missing company info - enrich profile data.")
if not profile.job_title:
recommendations.append("Unknown job title - request more information.")
# Engagement recommendations
if behavior.days_since_last_activity and behavior.days_since_last_activity > 14:
recommendations.append("Inactive for 2+ weeks - send re-engagement email.")
return recommendations
def score_lead(request: ScoreLeadRequest) -> LeadScoreResponse:
"""Calculate comprehensive lead score"""
profile = request.profile
behavior = request.behavior or LeadBehavior()
# Calculate component scores
demographic_score = calculate_demographic_score(profile)
behavioral_score = calculate_behavioral_score(behavior)
engagement_score = calculate_engagement_score(behavior)
intent_score = calculate_intent_score(behavior)
penalty_score = calculate_penalty_score(behavior)
# Calculate total score
raw_score = (
demographic_score
+ behavioral_score
+ engagement_score
+ intent_score
+ penalty_score
)
total_score = max(0, min(100, raw_score))
# Determine grade and status
grade = get_grade(total_score)
qualification_status = get_qualification_status(
total_score,
has_demo=behavior.demo_requests > 0,
has_trial=behavior.trial_signups > 0,
)
# Generate recommendations
recommendations = []
if request.include_recommendations:
recommendations = generate_recommendations(profile, behavior, total_score)
# Calculate confidence based on data completeness
data_points = sum(
[
1 if profile.email else 0,
1 if profile.name else 0,
1 if profile.company else 0,
1 if profile.job_title else 0,
1 if profile.industry else 0,
1 if profile.company_size else 0,
1 if behavior.total_sessions > 0 else 0,
1 if behavior.email_opens > 0 else 0,
]
)
confidence = min(data_points / 8.0, 1.0)
return LeadScoreResponse(
lead_id=profile.lead_id or profile.email or "unknown",
total_score=round(total_score, 2),
grade=grade,
qualification_status=qualification_status,
breakdown=ScoreBreakdown(
demographic=round(demographic_score, 2),
behavioral=round(behavioral_score, 2),
engagement=round(engagement_score, 2),
intent=round(intent_score, 2),
penalties=round(penalty_score, 2),
),
recommendations=recommendations,
confidence=round(confidence, 2),
calculated_at=datetime.utcnow(),
)
# ============================================================================
# API Endpoints
# ============================================================================
@router.post("/score", response_model=LeadScoreResponse)
async def calculate_lead_score(
request: ScoreLeadRequest,
api_key: str = Depends(verify_api_key),
) -> LeadScoreResponse:
"""
Calculate AI-powered lead score.
This endpoint analyzes lead profile and behavioral data to calculate
a comprehensive lead score (0-100) with grade assignment and
qualification status.
Args:
request: Lead profile and behavioral data
api_key: API key for authentication
Returns:
LeadScoreResponse with score, grade, and recommendations
"""
try:
logger.info(
"Scoring lead",
lead_id=request.profile.lead_id,
email=request.profile.email,
)
result = score_lead(request)
logger.info(
"Lead scored",
lead_id=result.lead_id,
score=result.total_score,
grade=result.grade,
)
return result
except Exception as e:
logger.error("Lead scoring failed", error=str(e))
raise HTTPException(status_code=500, detail=f"Scoring failed: {str(e)}")
@router.post("/batch", response_model=BatchScoreResponse)
async def batch_score_leads(
request: BatchScoreRequest,
api_key: str = Depends(verify_api_key),
) -> BatchScoreResponse:
"""
Batch score multiple leads.
Efficiently score multiple leads in a single request.
Args:
request: List of leads to score
api_key: API key for authentication
Returns:
BatchScoreResponse with all scores and summary statistics
"""
try:
logger.info("Batch scoring", count=len(request.leads))
scores = [score_lead(lead_request) for lead_request in request.leads]
# Calculate statistics
total_score = sum(s.total_score for s in scores)
avg_score = total_score / len(scores) if scores else 0
grade_dist = {"A": 0, "B": 0, "C": 0, "D": 0, "F": 0}
for s in scores:
grade_dist[s.grade] += 1
logger.info(
"Batch scoring complete",
count=len(scores),
avg_score=round(avg_score, 2),
)
return BatchScoreResponse(
scores=scores,
total_processed=len(scores),
avg_score=round(avg_score, 2),
grade_distribution=grade_dist,
)
except Exception as e:
logger.error("Batch scoring failed", error=str(e))
raise HTTPException(status_code=500, detail=f"Batch scoring failed: {str(e)}")
@router.get("/model-info", response_model=ModelInfoResponse)
async def get_model_info(
api_key: str = Depends(verify_api_key),
) -> ModelInfoResponse:
"""
Get information about the scoring model.
Returns metadata about the lead scoring model including
features used and accuracy metrics.
Args:
api_key: API key for authentication
Returns:
ModelInfoResponse with model metadata
"""
return ModelInfoResponse(
model_version="1.0.0",
features_used=[
"company_size",
"industry",
"job_title",
"location",
"email_opens",
"email_clicks",
"page_visits",
"form_submissions",
"content_downloads",
"pricing_page_visits",
"demo_requests",
"trial_signups",
"session_duration",
"days_since_activity",
],
last_trained=datetime(2025, 1, 1),
accuracy_metrics={
"mql_precision": 0.85,
"sql_precision": 0.92,
"conversion_correlation": 0.78,
},
)
@router.get("/health")
async def scoring_health():
"""Health check for scoring service"""
return {"status": "healthy", "service": "lead_scoring"}


@@ -1,85 +0,0 @@
from fastapi import APIRouter, Depends, File, UploadFile
from ....schemas.generation import (
GenerationResponse,
SpeechGenerateRequest,
SpeechToTextResponse,
)
from ....services.speech_service import get_speech_service
from ...dependencies import verify_api_key
router = APIRouter(prefix="/speech", tags=["Speech"])
@router.post("/generate", response_model=GenerationResponse)
async def generate_speech(
request: SpeechGenerateRequest,
api_key: str = Depends(verify_api_key),
service=Depends(get_speech_service),
):
"""
Generate speech audio from text (Text-to-Speech).
Args:
request: Speech generation parameters including:
- prompt: Text to convert to speech
- voice: Voice model to use (optional, default: "default")
- language: Language code (optional, default: "en")
api_key: API key for authentication
service: Speech service instance
Returns:
GenerationResponse with file path to generated audio and generation time
"""
result = await service.generate(
prompt=request.prompt,
voice=request.voice,
language=request.language,
)
return GenerationResponse(**result)
@router.post("/totext", response_model=SpeechToTextResponse)
async def speech_to_text(
file: UploadFile = File(...),
api_key: str = Depends(verify_api_key),
service=Depends(get_speech_service),
):
"""
Convert speech audio to text (Speech-to-Text) using Whisper.
Supported audio formats: wav, mp3, m4a, flac, ogg
Args:
file: Audio file to transcribe
api_key: API key for authentication
service: Speech service instance
Returns:
SpeechToTextResponse with transcribed text, detected language, and confidence
"""
audio_data = await file.read()
result = await service.to_text(audio_data)
return SpeechToTextResponse(**result)
@router.post("/detect_language")
async def detect_language(
file: UploadFile = File(...),
api_key: str = Depends(verify_api_key),
service=Depends(get_speech_service),
):
"""
Detect the language of spoken audio using Whisper.
Args:
file: Audio file to analyze
api_key: API key for authentication
service: Speech service instance
Returns:
dict with detected language code and confidence score
"""
audio_data = await file.read()
result = await service.detect_language(audio_data)
return result
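A transcription request to the speech-to-text endpoint above could be sketched like this (host, port, API key, and the audio path are placeholders):
```python
# Hypothetical client call to POST /api/speech/totext with a multipart upload.
import httpx

with open("recording.wav", "rb") as f:
    response = httpx.post(
        "http://localhost:8085/api/speech/totext",
        headers={"X-API-Key": "your-secret-key"},
        files={"file": ("recording.wav", f, "audio/wav")},
        timeout=120.0,
    )
print(response.json()["text"])
```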


@@ -1,63 +0,0 @@
from fastapi import APIRouter, Depends, File, UploadFile
from ....schemas.generation import (
GenerationResponse,
VideoDescribeResponse,
VideoGenerateRequest,
)
from ....services.video_service import get_video_service
from ...dependencies import verify_api_key
router = APIRouter(prefix="/video", tags=["Video"])
@router.post("/generate", response_model=GenerationResponse)
async def generate_video(
request: VideoGenerateRequest,
api_key: str = Depends(verify_api_key),
service=Depends(get_video_service),
):
"""
Generate a video from a text prompt.
Args:
request: Video generation parameters including prompt, frames, fps, etc.
api_key: API key for authentication
service: Video service instance
Returns:
GenerationResponse with file path and generation time
"""
result = await service.generate(
prompt=request.prompt,
num_frames=request.num_frames,
fps=request.fps,
steps=request.steps,
seed=request.seed,
)
return GenerationResponse(**result)
@router.post("/describe", response_model=VideoDescribeResponse)
async def describe_video(
file: UploadFile = File(...),
api_key: str = Depends(verify_api_key),
service=Depends(get_video_service),
):
"""
Get a description of an uploaded video.
Note: This endpoint is deprecated. Use /api/vision/describe_video instead
for full video captioning capabilities.
Args:
file: Video file to describe
api_key: API key for authentication
service: Video service instance
Returns:
VideoDescribeResponse with description and frame count
"""
video_data = await file.read()
result = await service.describe(video_data)
return VideoDescribeResponse(**result)


@@ -1,335 +0,0 @@
import io
from typing import Optional
from fastapi import APIRouter, Depends, File, Form, UploadFile
from PIL import Image
from pyzbar import pyzbar
from ....schemas.generation import (
ImageDescribeResponse,
QRCodeResponse,
VideoDescribeResponse,
)
from ....services.vision_service import get_vision_service
from ...dependencies import verify_api_key
router = APIRouter(prefix="/vision", tags=["Vision"])
@router.post("/describe", response_model=ImageDescribeResponse)
async def describe_image(
file: UploadFile = File(...),
prompt: Optional[str] = Form(None),
api_key: str = Depends(verify_api_key),
service=Depends(get_vision_service),
):
"""
Get a caption/description for an image.
Optionally provide a prompt to guide the description.
"""
image_data = await file.read()
result = await service.describe_image(image_data, prompt)
return ImageDescribeResponse(**result)
@router.post("/describe-video", response_model=VideoDescribeResponse)
async def describe_video(
file: UploadFile = File(...),
num_frames: int = Form(8),
api_key: str = Depends(verify_api_key),
service=Depends(get_vision_service),
):
"""
Get a description for a video by sampling and analyzing frames.
Args:
file: Video file (mp4, avi, mov, webm, mkv)
num_frames: Number of frames to sample for analysis (default: 8)
"""
video_data = await file.read()
result = await service.describe_video(video_data, num_frames)
return VideoDescribeResponse(**result)
@router.post("/vqa")
async def visual_question_answering(
file: UploadFile = File(...),
question: str = Form(...),
api_key: str = Depends(verify_api_key),
service=Depends(get_vision_service),
):
"""
Visual Question Answering - ask a question about an image.
Args:
file: Image file
question: Question to ask about the image
"""
image_data = await file.read()
result = await service.answer_question(image_data, question)
return ImageDescribeResponse(**result)
@router.post("/qrcode", response_model=QRCodeResponse)
async def read_qrcode(
file: UploadFile = File(...),
api_key: str = Depends(verify_api_key),
):
"""
Read QR code(s) from an image.
Returns all QR codes found in the image with their data and positions.
Args:
file: Image file containing QR code(s)
Returns:
QRCodeResponse with data from all found QR codes
"""
image_data = await file.read()
try:
# Load image
image = Image.open(io.BytesIO(image_data))
# Convert to RGB if necessary (pyzbar works best with RGB)
if image.mode != "RGB":
image = image.convert("RGB")
# Decode QR codes
decoded_objects = pyzbar.decode(image)
if not decoded_objects:
return QRCodeResponse(
success=False,
data=None,
codes=[],
count=0,
error="No QR code found in image",
)
codes = []
for obj in decoded_objects:
code_info = {
"data": obj.data.decode("utf-8", errors="replace"),
"type": obj.type,
"rect": {
"left": obj.rect.left,
"top": obj.rect.top,
"width": obj.rect.width,
"height": obj.rect.height,
},
"polygon": [{"x": p.x, "y": p.y} for p in obj.polygon]
if obj.polygon
else None,
}
codes.append(code_info)
# Return the first QR code data as the main data field for convenience
primary_data = codes[0]["data"] if codes else None
return QRCodeResponse(
success=True, data=primary_data, codes=codes, count=len(codes), error=None
)
except Exception as e:
return QRCodeResponse(
success=False,
data=None,
codes=[],
count=0,
error=f"Failed to process image: {str(e)}",
)
@router.post("/barcode")
async def read_barcode(
file: UploadFile = File(...),
api_key: str = Depends(verify_api_key),
):
"""
Read barcode(s) from an image (supports multiple barcode formats).
Supports: QR Code, Code128, Code39, EAN-13, EAN-8, UPC-A, UPC-E,
Interleaved 2 of 5, Codabar, PDF417, DataMatrix
Args:
file: Image file containing barcode(s)
Returns:
List of all barcodes found with their data and type
"""
image_data = await file.read()
try:
image = Image.open(io.BytesIO(image_data))
if image.mode != "RGB":
image = image.convert("RGB")
decoded_objects = pyzbar.decode(image)
if not decoded_objects:
return {
"success": False,
"barcodes": [],
"count": 0,
"error": "No barcode found in image",
}
barcodes = []
for obj in decoded_objects:
barcode_info = {
"data": obj.data.decode("utf-8", errors="replace"),
"type": obj.type,
"rect": {
"left": obj.rect.left,
"top": obj.rect.top,
"width": obj.rect.width,
"height": obj.rect.height,
},
}
barcodes.append(barcode_info)
return {
"success": True,
"barcodes": barcodes,
"count": len(barcodes),
"error": None,
}
except Exception as e:
return {
"success": False,
"barcodes": [],
"count": 0,
"error": f"Failed to process image: {str(e)}",
}
@router.post("/ocr")
async def extract_text(
file: UploadFile = File(...),
language: str = Form("eng"),
api_key: str = Depends(verify_api_key),
service=Depends(get_vision_service),
):
"""
Extract text from an image using OCR.
Args:
file: Image file
language: Language code for OCR (default: eng).
Use 'por' for Portuguese, 'spa' for Spanish, etc.
Returns:
Extracted text from the image
"""
image_data = await file.read()
try:
import pytesseract
image = Image.open(io.BytesIO(image_data))
# Extract text
text = pytesseract.image_to_string(image, lang=language)
# Get detailed data with confidence scores
data = pytesseract.image_to_data(
image, lang=language, output_type=pytesseract.Output.DICT
)
# Calculate average confidence (filtering out -1 values which indicate no text)
confidences = [c for c in data["conf"] if c > 0]
avg_confidence = sum(confidences) / len(confidences) if confidences else 0
return {
"success": True,
"text": text.strip(),
"confidence": avg_confidence / 100, # Normalize to 0-1
"language": language,
"word_count": len(text.split()),
"error": None,
}
except Exception as e:
return {
"success": False,
"text": "",
"confidence": 0,
"language": language,
"word_count": 0,
"error": f"OCR failed: {str(e)}",
}
@router.post("/analyze")
async def analyze_image(
file: UploadFile = File(...),
api_key: str = Depends(verify_api_key),
service=Depends(get_vision_service),
):
"""
Comprehensive image analysis - combines description, OCR, and barcode detection.
Returns a complete analysis of the image including:
- AI-generated description
- Any text found (OCR)
- Any QR codes or barcodes found
Args:
file: Image file to analyze
"""
image_data = await file.read()
result = {"description": None, "text": None, "codes": [], "metadata": {}}
try:
image = Image.open(io.BytesIO(image_data))
# Get image metadata
result["metadata"] = {
"width": image.width,
"height": image.height,
"format": image.format,
"mode": image.mode,
}
# Get AI description
try:
desc_result = await service.describe_image(image_data, None)
result["description"] = desc_result.get("description")
except Exception:
pass
# Try OCR
try:
import pytesseract
text = pytesseract.image_to_string(image)
if text.strip():
result["text"] = text.strip()
except Exception:
pass
# Try barcode/QR detection
try:
if image.mode != "RGB":
image = image.convert("RGB")
decoded = pyzbar.decode(image)
if decoded:
result["codes"] = [
{
"data": obj.data.decode("utf-8", errors="replace"),
"type": obj.type,
}
for obj in decoded
]
except Exception:
pass
return {"success": True, **result}
except Exception as e:
return {"success": False, "error": str(e), **result}


@@ -1,64 +0,0 @@
from pathlib import Path
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False,
extra="ignore",
)
env: str = "development"
host: str = "0.0.0.0"
port: int = 8085
log_level: str = "INFO"
api_v1_prefix: str = "/api"
project_name: str = "BotModels API"
version: str = "2.0.0"
api_key: str = "change-me"
# Image generation model
image_model_path: str = "./models/stable-diffusion-v1-5"
image_steps: int = 4
image_width: int = 512
image_height: int = 512
image_gpu_layers: int = 20
image_batch_size: int = 1
# Video generation model
video_model_path: str = "./models/zeroscope-v2"
video_frames: int = 24
video_fps: int = 8
video_width: int = 320
video_height: int = 576
video_gpu_layers: int = 15
video_batch_size: int = 1
# Speech/TTS model
speech_model_path: str = "./models/tts"
# Vision model (BLIP2 for captioning)
vision_model_path: str = "./models/blip2"
# Whisper model for speech-to-text
whisper_model_path: str = "./models/whisper"
# Device configuration
device: str = "cuda"
# Output directory for generated files
output_dir: Path = Path("./outputs")
@property
def is_production(self) -> bool:
return self.env == "production"
settings = Settings()
settings.output_dir.mkdir(parents=True, exist_ok=True)
(settings.output_dir / "images").mkdir(exist_ok=True)
(settings.output_dir / "videos").mkdir(exist_ok=True)
(settings.output_dir / "audio").mkdir(exist_ok=True)


@@ -1,33 +0,0 @@
import structlog
from .config import settings
def setup_logging():
if settings.is_production:
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.make_filtering_bound_logger(
getattr(structlog.stdlib.logging, settings.log_level.upper())
),
)
else:
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.dev.ConsoleRenderer(colors=True)
],
)
def get_logger(name: str = None):
logger = structlog.get_logger()
if name:
logger = logger.bind(service=name)
return logger
setup_logging()


@@ -1,86 +0,0 @@
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from .api.v1.endpoints import image, scoring, speech, video, vision
from .core.config import settings
from .core.logging import get_logger
from .services.image_service import get_image_service
from .services.speech_service import get_speech_service
from .services.video_service import get_video_service
from .services.vision_service import get_vision_service
logger = get_logger("main")
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("Starting BotModels API", version=settings.version)
try:
get_image_service().initialize()
get_video_service().initialize()
get_speech_service().initialize()
get_vision_service().initialize()
logger.info("All services initialized")
except Exception as e:
logger.error("Failed to initialize services", error=str(e))
yield
logger.info("Shutting down BotModels API")
app = FastAPI(
title=settings.project_name,
version=settings.version,
lifespan=lifespan,
docs_url="/api/docs",
redoc_url="/api/redoc",
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(image.router, prefix=settings.api_v1_prefix)
app.include_router(video.router, prefix=settings.api_v1_prefix)
app.include_router(speech.router, prefix=settings.api_v1_prefix)
app.include_router(vision.router, prefix=settings.api_v1_prefix)
app.include_router(scoring.router, prefix=settings.api_v1_prefix)
app.mount("/outputs", StaticFiles(directory="outputs"), name="outputs")
@app.get("/")
async def root():
return JSONResponse(
{
"service": settings.project_name,
"version": settings.version,
"status": "running",
"docs": "/api/docs",
"endpoints": {
"image": "/api/v1/image",
"video": "/api/v1/video",
"speech": "/api/v1/speech",
"vision": "/api/v1/vision",
"scoring": "/api/v1/scoring",
},
}
)
@app.get("/api/health")
async def health():
return {"status": "healthy", "version": settings.version, "device": settings.device}
if __name__ == "__main__":
import uvicorn
uvicorn.run("src.main:app", host=settings.host, port=settings.port, reload=True)


@@ -1,115 +0,0 @@
from datetime import datetime
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
class GenerationRequest(BaseModel):
prompt: str = Field(..., min_length=1, max_length=2000)
seed: Optional[int] = None
class ImageGenerateRequest(GenerationRequest):
steps: Optional[int] = Field(30, ge=1, le=150)
width: Optional[int] = Field(512, ge=64, le=2048)
height: Optional[int] = Field(512, ge=64, le=2048)
guidance_scale: Optional[float] = Field(7.5, ge=1.0, le=20.0)
class VideoGenerateRequest(GenerationRequest):
num_frames: Optional[int] = Field(24, ge=8, le=128)
fps: Optional[int] = Field(8, ge=1, le=60)
steps: Optional[int] = Field(50, ge=10, le=100)
class SpeechGenerateRequest(GenerationRequest):
voice: Optional[str] = Field("default", description="Voice model")
language: Optional[str] = Field("en", description="Language code")
class GenerationResponse(BaseModel):
status: str
file_path: Optional[str] = None
generation_time: Optional[float] = None
error: Optional[str] = None
timestamp: datetime = Field(default_factory=datetime.utcnow)
class DescribeRequest(BaseModel):
file_data: bytes
class ImageDescribeResponse(BaseModel):
description: str
confidence: Optional[float] = None
generation_time: Optional[float] = None
class VideoDescribeResponse(BaseModel):
description: str
frame_count: int
generation_time: Optional[float] = None
class SpeechToTextResponse(BaseModel):
text: str
language: Optional[str] = None
confidence: Optional[float] = None
class QRCodeInfo(BaseModel):
"""Information about a single QR code found in an image"""
data: str = Field(..., description="The decoded data from the QR code")
type: str = Field(..., description="The type of code (QRCODE, BARCODE, etc.)")
rect: Optional[Dict[str, int]] = Field(
None, description="Bounding rectangle {left, top, width, height}"
)
polygon: Optional[List[Dict[str, int]]] = Field(
None, description="Polygon points [{x, y}, ...]"
)
class QRCodeResponse(BaseModel):
"""Response from QR code reading endpoint"""
success: bool = Field(..., description="Whether the operation was successful")
data: Optional[str] = Field(
None, description="The primary QR code data (first found)"
)
codes: List[Dict[str, Any]] = Field(
default_factory=list, description="All QR codes found in the image"
)
count: int = Field(0, description="Number of QR codes found")
error: Optional[str] = Field(None, description="Error message if any")
class BarcodeResponse(BaseModel):
"""Response from barcode reading endpoint"""
success: bool
barcodes: List[Dict[str, Any]] = Field(default_factory=list)
count: int = 0
error: Optional[str] = None
class OCRResponse(BaseModel):
"""Response from OCR text extraction endpoint"""
success: bool
text: str = ""
confidence: float = 0.0
language: str = "eng"
word_count: int = 0
error: Optional[str] = None
class ImageAnalysisResponse(BaseModel):
"""Comprehensive image analysis response"""
success: bool
description: Optional[str] = None
text: Optional[str] = None
codes: List[Dict[str, Any]] = Field(default_factory=list)
metadata: Dict[str, Any] = Field(default_factory=dict)
error: Optional[str] = None


@@ -1,15 +0,0 @@
from .image_service import ImageService, get_image_service
from .speech_service import SpeechService, get_speech_service
from .video_service import VideoService, get_video_service
from .vision_service import VisionService, get_vision_service
__all__ = [
"ImageService",
"get_image_service",
"VideoService",
"get_video_service",
"SpeechService",
"get_speech_service",
"VisionService",
"get_vision_service",
]


@@ -1,111 +0,0 @@
import time
from datetime import datetime
from typing import Optional
import torch
from diffusers import DPMSolverMultistepScheduler, StableDiffusionPipeline
from PIL import Image
from ..core.config import settings
from ..core.logging import get_logger
logger = get_logger("image_service")
class ImageService:
def __init__(self):
self.pipeline: Optional[StableDiffusionPipeline] = None
self.device = settings.device
self._initialized = False
def initialize(self):
if self._initialized:
return
logger.info("Loading Stable Diffusion model", path=settings.image_model_path)
try:
self.pipeline = StableDiffusionPipeline.from_pretrained(
settings.image_model_path,
torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
safety_checker=None,
)
self.pipeline.scheduler = DPMSolverMultistepScheduler.from_config(
self.pipeline.scheduler.config
)
self.pipeline = self.pipeline.to(self.device)
if self.device == "cuda":
self.pipeline.enable_attention_slicing()
self._initialized = True
logger.info("Stable Diffusion loaded successfully")
except Exception as e:
logger.error("Failed to load model", error=str(e))
raise
async def generate(
self,
prompt: str,
steps: Optional[int] = None,
width: Optional[int] = None,
height: Optional[int] = None,
guidance_scale: Optional[float] = None,
seed: Optional[int] = None,
) -> dict:
if not self._initialized:
self.initialize()
# Use config defaults if not specified
actual_steps = steps if steps is not None else settings.image_steps
actual_width = width if width is not None else settings.image_width
actual_height = height if height is not None else settings.image_height
actual_guidance = guidance_scale if guidance_scale is not None else 7.5
start = time.time()
generator = (
torch.Generator(device=self.device).manual_seed(seed) if seed else None
)
logger.info(
"Generating image",
prompt=prompt[:50],
steps=actual_steps,
width=actual_width,
height=actual_height,
)
output = self.pipeline(
prompt=prompt,
num_inference_steps=actual_steps,
guidance_scale=actual_guidance,
width=actual_width,
height=actual_height,
generator=generator,
)
image: Image.Image = output.images[0]
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
filename = f"{timestamp}_{hash(prompt) & 0xFFFFFF:06x}.png"
output_path = settings.output_dir / "images" / filename
image.save(output_path)
generation_time = time.time() - start
logger.info("Image generated", file=filename, time=generation_time)
return {
"status": "completed",
"file_path": f"/outputs/images/{filename}",
"generation_time": generation_time,
}
async def describe(self, image_data: bytes) -> dict:
# Placeholder for backward compatibility
# Use vision_service for actual image description
return {"description": "Use /api/vision/describe endpoint", "confidence": 0.0}
_service = None
def get_image_service():
global _service
if _service is None:
_service = ImageService()
return _service


@@ -1,229 +0,0 @@
import io
import tempfile
import time
from datetime import datetime
from pathlib import Path
from typing import Optional
from ..core.config import settings
from ..core.logging import get_logger
logger = get_logger("speech_service")
class SpeechService:
def __init__(self):
self.tts_model = None
self.whisper_model = None
self.device = settings.device
self._initialized = False
def initialize(self):
if self._initialized:
return
logger.info("Loading speech models")
try:
# Load TTS model (Coqui TTS)
self._load_tts_model()
# Load Whisper model for speech-to-text
self._load_whisper_model()
self._initialized = True
logger.info("Speech models loaded successfully")
except Exception as e:
logger.error("Failed to load speech models", error=str(e))
# Don't raise - allow service to run with partial functionality
logger.warning("Speech service will have limited functionality")
def _load_tts_model(self):
"""Load TTS model for text-to-speech generation"""
try:
from TTS.api import TTS
# Use a fast, high-quality model
self.tts_model = TTS(
model_name="tts_models/en/ljspeech/tacotron2-DDC",
progress_bar=False,
gpu=(self.device == "cuda"),
)
logger.info("TTS model loaded")
except Exception as e:
logger.warning("TTS model not available", error=str(e))
self.tts_model = None
def _load_whisper_model(self):
"""Load Whisper model for speech-to-text"""
try:
import whisper
# Use base model for balance of speed and accuracy
model_size = "base"
if Path(settings.whisper_model_path).exists():
self.whisper_model = whisper.load_model(
model_size, download_root=settings.whisper_model_path
)
else:
self.whisper_model = whisper.load_model(model_size)
logger.info("Whisper model loaded", model=model_size)
except Exception as e:
logger.warning("Whisper model not available", error=str(e))
self.whisper_model = None
async def generate(
self,
prompt: str,
voice: Optional[str] = None,
language: Optional[str] = None,
) -> dict:
"""Generate speech audio from text"""
if not self._initialized:
self.initialize()
start = time.time()
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
filename = f"{timestamp}_{hash(prompt) & 0xFFFFFF:06x}.wav"
output_path = settings.output_dir / "audio" / filename
if self.tts_model is None:
logger.error("TTS model not available")
return {
"status": "error",
"error": "TTS model not initialized",
"file_path": None,
"generation_time": time.time() - start,
}
try:
logger.info(
"Generating speech",
text_length=len(prompt),
voice=voice,
language=language,
)
# Generate speech
self.tts_model.tts_to_file(
text=prompt,
file_path=str(output_path),
)
generation_time = time.time() - start
logger.info("Speech generated", file=filename, time=generation_time)
return {
"status": "completed",
"file_path": f"/outputs/audio/{filename}",
"generation_time": generation_time,
}
except Exception as e:
logger.error("Speech generation failed", error=str(e))
return {
"status": "error",
"error": str(e),
"file_path": None,
"generation_time": time.time() - start,
}
async def to_text(self, audio_data: bytes) -> dict:
"""Convert speech audio to text using Whisper"""
if not self._initialized:
self.initialize()
start = time.time()
if self.whisper_model is None:
logger.error("Whisper model not available")
return {
"text": "",
"language": None,
"confidence": 0.0,
"error": "Whisper model not initialized",
}
try:
# Save audio to temporary file
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
tmp.write(audio_data)
tmp_path = tmp.name
logger.info("Transcribing audio", file_size=len(audio_data))
# Transcribe
result = self.whisper_model.transcribe(tmp_path)
# Clean up temp file
import os
os.unlink(tmp_path)
transcription_time = time.time() - start
logger.info(
"Audio transcribed",
text_length=len(result["text"]),
language=result.get("language"),
time=transcription_time,
)
return {
"text": result["text"].strip(),
"language": result.get("language", "en"),
"confidence": 0.95, # Whisper doesn't provide confidence directly
}
except Exception as e:
logger.error("Speech-to-text failed", error=str(e))
return {
"text": "",
"language": None,
"confidence": 0.0,
"error": str(e),
}
async def detect_language(self, audio_data: bytes) -> dict:
"""Detect the language of spoken audio"""
if not self._initialized:
self.initialize()
if self.whisper_model is None:
return {"language": None, "error": "Whisper model not initialized"}
try:
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
tmp.write(audio_data)
tmp_path = tmp.name
import whisper
# Load audio and detect language
audio = whisper.load_audio(tmp_path)
audio = whisper.pad_or_trim(audio)
mel = whisper.log_mel_spectrogram(audio).to(self.whisper_model.device)
_, probs = self.whisper_model.detect_language(mel)
import os
os.unlink(tmp_path)
detected_lang = max(probs, key=probs.get)
confidence = probs[detected_lang]
return {
"language": detected_lang,
"confidence": confidence,
}
except Exception as e:
logger.error("Language detection failed", error=str(e))
return {"language": None, "error": str(e)}
_service = None
def get_speech_service():
global _service
if _service is None:
_service = SpeechService()
return _service


@@ -1,106 +0,0 @@
import time
from datetime import datetime
from typing import Optional
import imageio
import torch
from ..core.config import settings
from ..core.logging import get_logger
logger = get_logger("video_service")
class VideoService:
def __init__(self):
self.pipeline = None
self.device = settings.device
self._initialized = False
def initialize(self):
if self._initialized:
return
logger.info("Loading video model", path=settings.video_model_path)
try:
from diffusers import DiffusionPipeline
self.pipeline = DiffusionPipeline.from_pretrained(
settings.video_model_path,
torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
)
self.pipeline = self.pipeline.to(self.device)
self._initialized = True
logger.info("Video model loaded successfully")
except Exception as e:
logger.error("Failed to load video model", error=str(e))
raise
async def generate(
self,
prompt: str,
num_frames: Optional[int] = None,
fps: Optional[int] = None,
steps: Optional[int] = None,
seed: Optional[int] = None,
) -> dict:
if not self._initialized:
self.initialize()
# Use config defaults if not specified
actual_frames = num_frames if num_frames is not None else settings.video_frames
actual_fps = fps if fps is not None else settings.video_fps
actual_steps = steps if steps is not None else 50
start = time.time()
generator = (
torch.Generator(device=self.device).manual_seed(seed) if seed else None
)
logger.info(
"Generating video",
prompt=prompt[:50],
frames=actual_frames,
fps=actual_fps,
steps=actual_steps,
)
output = self.pipeline(
prompt=prompt,
num_frames=actual_frames,
num_inference_steps=actual_steps,
generator=generator,
)
frames = output.frames[0]
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
filename = f"{timestamp}_{hash(prompt) & 0xFFFFFF:06x}.mp4"
output_path = settings.output_dir / "videos" / filename
imageio.mimsave(output_path, frames, fps=actual_fps, codec="libx264")
generation_time = time.time() - start
logger.info("Video generated", file=filename, time=generation_time)
return {
"status": "completed",
"file_path": f"/outputs/videos/{filename}",
"generation_time": generation_time,
}
async def describe(self, video_data: bytes) -> dict:
# Placeholder for backward compatibility
# Use vision_service for actual video description
return {
"description": "Use /api/vision/describe_video endpoint",
"frame_count": 0,
}
_service = None
def get_video_service():
global _service
if _service is None:
_service = VideoService()
return _service


@@ -1,204 +0,0 @@
import io
import time
from datetime import datetime
from typing import Optional
import torch
from PIL import Image
from ..core.config import settings
from ..core.logging import get_logger
logger = get_logger("vision_service")
class VisionService:
def __init__(self):
self.model = None
self.processor = None
self.device = settings.device
self._initialized = False
def initialize(self):
if self._initialized:
return
logger.info("Loading vision model (BLIP2)")
try:
from transformers import Blip2ForConditionalGeneration, Blip2Processor
self.processor = Blip2Processor.from_pretrained(settings.vision_model_path)
self.model = Blip2ForConditionalGeneration.from_pretrained(
settings.vision_model_path,
torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
)
self.model = self.model.to(self.device)
self._initialized = True
logger.info("Vision model loaded")
except Exception as e:
logger.error("Failed to load vision model", error=str(e))
# Don't raise - allow service to run without vision
logger.warning("Vision service will return placeholder responses")
async def describe_image(
self, image_data: bytes, prompt: Optional[str] = None
) -> dict:
"""Generate a caption/description for an image"""
start = time.time()
if not self._initialized or self.model is None:
# Return placeholder if model not loaded
return {
"description": "Vision model not initialized. Please check model path configuration.",
"confidence": 0.0,
"generation_time": time.time() - start,
}
try:
# Load image from bytes
image = Image.open(io.BytesIO(image_data)).convert("RGB")
# Prepare inputs
if prompt:
inputs = self.processor(image, text=prompt, return_tensors="pt").to(
self.device
)
else:
inputs = self.processor(image, return_tensors="pt").to(self.device)
# Generate caption
with torch.no_grad():
generated_ids = self.model.generate(
**inputs, max_new_tokens=100, num_beams=5, early_stopping=True
)
# Decode the generated text
description = self.processor.decode(
generated_ids[0], skip_special_tokens=True
)
return {
"description": description.strip(),
"confidence": 0.85, # BLIP2 doesn't provide confidence scores directly
"generation_time": time.time() - start,
}
except Exception as e:
logger.error("Image description failed", error=str(e))
return {
"description": f"Error describing image: {str(e)}",
"confidence": 0.0,
"generation_time": time.time() - start,
}
async def describe_video(self, video_data: bytes, num_frames: int = 8) -> dict:
"""Generate a description for a video by sampling frames"""
start = time.time()
if not self._initialized or self.model is None:
return {
"description": "Vision model not initialized. Please check model path configuration.",
"frame_count": 0,
"generation_time": time.time() - start,
}
try:
import tempfile
import cv2
import numpy as np
# Save video to temp file
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
tmp.write(video_data)
tmp_path = tmp.name
# Open video and extract frames
cap = cv2.VideoCapture(tmp_path)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if total_frames == 0:
cap.release()
return {
"description": "Could not read video frames",
"frame_count": 0,
"generation_time": time.time() - start,
}
# Sample frames evenly throughout the video
frame_indices = np.linspace(0, total_frames - 1, num_frames, dtype=int)
frames = []
for idx in frame_indices:
cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
ret, frame = cap.read()
if ret:
# Convert BGR to RGB
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frames.append(Image.fromarray(frame_rgb))
cap.release()
# Clean up temp file
import os
os.unlink(tmp_path)
if not frames:
return {
"description": "No frames could be extracted from video",
"frame_count": 0,
"generation_time": time.time() - start,
}
# Generate descriptions for each sampled frame
descriptions = []
for frame in frames:
inputs = self.processor(frame, return_tensors="pt").to(self.device)
with torch.no_grad():
generated_ids = self.model.generate(
**inputs, max_new_tokens=50, num_beams=3, early_stopping=True
)
desc = self.processor.decode(generated_ids[0], skip_special_tokens=True)
descriptions.append(desc.strip())
# Combine descriptions into a coherent summary
# Use the most common elements or create a timeline
unique_descriptions = list(
dict.fromkeys(descriptions)
) # Remove duplicates preserving order
if len(unique_descriptions) == 1:
combined = unique_descriptions[0]
else:
combined = "Video shows: " + "; ".join(unique_descriptions[:4])
return {
"description": combined,
"frame_count": len(frames),
"generation_time": time.time() - start,
}
except Exception as e:
logger.error("Video description failed", error=str(e))
return {
"description": f"Error describing video: {str(e)}",
"frame_count": 0,
"generation_time": time.time() - start,
}
async def answer_question(self, image_data: bytes, question: str) -> dict:
"""Visual question answering - ask a question about an image"""
# Use describe_image with the question as a prompt
return await self.describe_image(image_data, prompt=question)
_service = None
def get_vision_service():
global _service
if _service is None:
_service = VisionService()
return _service
