Rewrite BotModels as FastAPI multimodal AI service

Replace Azure Functions architecture with a modern FastAPI-based REST
API providing image, video, speech, and vision capabilities for General
Bots.

Key changes:
- Add FastAPI app with versioned API endpoints and OpenAPI docs
- Implement services for Stable Diffusion, Zeroscope, TTS/Whisper, BLIP2
- Add pydantic schemas for request/response validation
- Configure structured logging with structlog
- Support lazy model loading and GPU acceleration
- Update dependencies from Azure/TensorFlow stack to PyTorch/diffusers
Rodrigo Rodriguez (Pragmatismo) 2025-11-30 07:52:56 -03:00
parent 7cb64ca0c4
commit 5a43dc81c7
24 changed files with 1577 additions and 39 deletions

38
.env.example Normal file
View file

@ -0,0 +1,38 @@
# Server Configuration
ENV=development
HOST=0.0.0.0
PORT=8085
LOG_LEVEL=INFO
# Security - IMPORTANT: Change this in production!
API_KEY=change-me-in-production
# Model Paths
# These can be local paths or model identifiers for HuggingFace Hub
IMAGE_MODEL_PATH=./models/stable-diffusion-v1-5
VIDEO_MODEL_PATH=./models/zeroscope-v2
SPEECH_MODEL_PATH=./models/tts
VISION_MODEL_PATH=./models/blip2
WHISPER_MODEL_PATH=./models/whisper
# Device Configuration
# Options: cuda, cpu, mps (for Apple Silicon)
DEVICE=cuda
# Image Generation Defaults
IMAGE_STEPS=4
IMAGE_WIDTH=512
IMAGE_HEIGHT=512
IMAGE_GPU_LAYERS=20
IMAGE_BATCH_SIZE=1
# Video Generation Defaults
VIDEO_FRAMES=24
VIDEO_FPS=8
VIDEO_WIDTH=320
VIDEO_HEIGHT=576
VIDEO_GPU_LAYERS=15
VIDEO_BATCH_SIZE=1
# Storage
OUTPUT_DIR=./outputs

341
README.md
View file

@ -1,41 +1,326 @@
# BotModels
Models in Python for General Bots AI demands.
# Environment
1. Install Visual Studio Code (VSCode);
2. Install VSCode Extension: Azure Functions;
3. Install VSCode Extension: Azure Machine Learning;
4. Install NodeJS;
5. Run npm install -g azure-functions-core-tools@3 --unsafe-perm true.
# Libraries
- TensorFlow;
- SciKit-Learn;
- Pandas;
- NumPy.
# Tools
1. LLM Visualization https://bbycroft.net/llm
# Education
1. https://pjreddie.com/courses/computer-vision/
2. https://arxiv.org/abs/2106.00245 (Adversarial VQA: A New Benchmark for Evaluating the Robustness of VQA Models)
# References
1. https://github.com/DenisDsh/VizWiz-VQA-PyTorch (VQA, Visual Question Answering)
# Community
1. https://github.com/aiformankind
# Resources
1. https://manaai.cn/
# BotModels
A multimodal AI service for General Bots providing image, video, and audio generation, plus vision/captioning capabilities. It works as a companion service to botserver, much as llama.cpp provides LLM capabilities.
![General Bots Models Services](https://raw.githubusercontent.com/GeneralBots/BotModels/master/BotModels.png)
## Features
- **Image Generation**: Generate images from text prompts using Stable Diffusion
- **Video Generation**: Create short videos from text descriptions using Zeroscope
- **Speech Synthesis**: Text-to-speech using Coqui TTS
- **Speech Recognition**: Audio transcription using OpenAI Whisper
- **Vision/Captioning**: Image and video description using BLIP2
## Quick Start
### Installation
```bash
# Clone the repository
cd botmodels

# Create virtual environment
python -m venv venv
source venv/bin/activate   # Linux/Mac
# or
.\venv\Scripts\activate    # Windows

# Install dependencies
pip install -r requirements.txt
```
### Configuration
Copy the example environment file and configure:
```bash
cp .env.example .env
```
Edit `.env` with your settings:
```env
HOST=0.0.0.0
PORT=8085
API_KEY=your-secret-key
DEVICE=cuda
IMAGE_MODEL_PATH=./models/stable-diffusion-v1-5
VIDEO_MODEL_PATH=./models/zeroscope-v2
VISION_MODEL_PATH=./models/blip2
```
### Running the Server
```bash
# Development mode
python -m uvicorn src.main:app --host 0.0.0.0 --port 8085 --reload
# Production mode
python -m uvicorn src.main:app --host 0.0.0.0 --port 8085 --workers 4
# With HTTPS (production)
python -m uvicorn src.main:app --host 0.0.0.0 --port 8085 --ssl-keyfile key.pem --ssl-certfile cert.pem
```
## API Endpoints
All endpoints require the `X-API-Key` header for authentication.
### Image Generation
```http
POST /api/image/generate
Content-Type: application/json
X-API-Key: your-api-key
{
  "prompt": "a cute cat playing with yarn",
  "steps": 30,
  "width": 512,
  "height": 512,
  "guidance_scale": 7.5,
  "seed": 42
}
```
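
The same request can be issued from Python with `httpx`, which is already listed in `requirements.txt`. This is a minimal client sketch, assuming the service runs locally on port 8085 and that `your-api-key` matches the configured `API_KEY`:

```python
# Minimal client sketch (assumes a local deployment on port 8085 and that
# API_KEY is set to "your-api-key"; adjust both for your environment).
import httpx

BASE_URL = "http://localhost:8085"
HEADERS = {"X-API-Key": "your-api-key"}

payload = {
    "prompt": "a cute cat playing with yarn",
    "steps": 30,
    "width": 512,
    "height": 512,
    "guidance_scale": 7.5,
    "seed": 42,
}

resp = httpx.post(f"{BASE_URL}/api/image/generate", json=payload, headers=HEADERS, timeout=300)
resp.raise_for_status()
result = resp.json()
print(result["status"], result["file_path"], result["generation_time"])

# Generated files are served under /outputs, so the image can be fetched back:
image_bytes = httpx.get(f"{BASE_URL}{result['file_path']}").content
```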
### Video Generation
```http
POST /api/video/generate
Content-Type: application/json
X-API-Key: your-api-key
{
  "prompt": "a rocket launching into space",
  "num_frames": 24,
  "fps": 8,
  "steps": 50
}
```
### Speech Generation (TTS)
```http
POST /api/speech/generate
Content-Type: application/json
X-API-Key: your-api-key
{
  "prompt": "Hello, welcome to our service!",
  "voice": "default",
  "language": "en"
}
```
### Speech to Text
```http
POST /api/speech/totext
Content-Type: multipart/form-data
X-API-Key: your-api-key
file: <audio_file>
```
### Image Description
```http
POST /api/vision/describe
Content-Type: multipart/form-data
X-API-Key: your-api-key
file: <image_file>
prompt: "What is in this image?" (optional)
```
### Video Description
```http
POST /api/vision/describe_video
Content-Type: multipart/form-data
X-API-Key: your-api-key
file: <video_file>
num_frames: 8 (optional)
```
### Visual Question Answering
```http
POST /api/vision/vqa
Content-Type: multipart/form-data
X-API-Key: your-api-key
file: <image_file>
question: "How many people are in this image?"
```
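
The multipart endpoints (`/api/speech/totext`, `/api/vision/describe`, `/api/vision/describe_video`, `/api/vision/vqa`) all follow the same pattern: the upload goes in the `file` field and any extra parameters go in form fields. A sketch with `httpx`, assuming a local server and a placeholder `photo.jpg`:

```python
# Multipart upload sketch for the VQA endpoint (assumes a local server on
# port 8085; "photo.jpg" is a placeholder file name).
import httpx

BASE_URL = "http://localhost:8085"
HEADERS = {"X-API-Key": "your-api-key"}

with open("photo.jpg", "rb") as f:
    resp = httpx.post(
        f"{BASE_URL}/api/vision/vqa",
        headers=HEADERS,
        files={"file": ("photo.jpg", f, "image/jpeg")},
        data={"question": "How many people are in this image?"},
        timeout=120,
    )
print(resp.json()["description"])
```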
### Health Check
```http
GET /api/health
```
## Integration with BotServer
BotModels integrates with botserver through HTTPS, providing multimodal capabilities to BASIC scripts.
### BotServer Configuration (config.csv)
```csv
key,value
botmodels-enabled,true
botmodels-host,0.0.0.0
botmodels-port,8085
botmodels-api-key,your-secret-key
botmodels-https,false
image-generator-model,../../../../data/diffusion/sd_turbo_f16.gguf
image-generator-steps,4
image-generator-width,512
image-generator-height,512
video-generator-model,../../../../data/diffusion/zeroscope_v2_576w
video-generator-frames,24
video-generator-fps,8
```
### BASIC Script Keywords
Once configured, these keywords are available in BASIC:
```basic
// Generate an image
file = IMAGE "a beautiful sunset over mountains"
SEND FILE TO user, file
// Generate a video
video = VIDEO "waves crashing on a beach"
SEND FILE TO user, video
// Generate speech
audio = AUDIO "Welcome to General Bots!"
SEND FILE TO user, audio
// Get image/video description
caption = SEE "/path/to/image.jpg"
TALK caption
```
## Architecture
```
┌─────────────┐     HTTPS      ┌─────────────┐
│  botserver  │ ─────────────▶ │  botmodels  │
│   (Rust)    │                │  (Python)   │
└─────────────┘                └─────────────┘
       │                              │
       │ BASIC Keywords               │ AI Models
       │ - IMAGE                      │ - Stable Diffusion
       │ - VIDEO                      │ - Zeroscope
       │ - AUDIO                      │ - TTS/Whisper
       │ - SEE                        │ - BLIP2
       ▼                              ▼
┌─────────────┐                ┌─────────────┐
│   config    │                │   outputs   │
│    .csv     │                │   (files)   │
└─────────────┘                └─────────────┘
```
## Model Downloads
Models are downloaded automatically on first use, or you can pre-download them:
```bash
# Stable Diffusion
python -c "from diffusers import StableDiffusionPipeline; StableDiffusionPipeline.from_pretrained('runwayml/stable-diffusion-v1-5')"
# BLIP2 (Vision)
python -c "from transformers import Blip2Processor, Blip2ForConditionalGeneration; Blip2Processor.from_pretrained('Salesforce/blip2-opt-2.7b'); Blip2ForConditionalGeneration.from_pretrained('Salesforce/blip2-opt-2.7b')"
# Whisper (Speech-to-Text)
python -c "import whisper; whisper.load_model('base')"
```
## API Documentation
Interactive API documentation is available at:
- Swagger UI: `http://localhost:8085/api/docs`
- ReDoc: `http://localhost:8085/api/redoc`
## Development
### Project Structure
```
botmodels/
├── src/
│   ├── api/
│   │   ├── v1/
│   │   │   └── endpoints/
│   │   │       ├── image.py
│   │   │       ├── video.py
│   │   │       ├── speech.py
│   │   │       └── vision.py
│   │   └── dependencies.py
│   ├── core/
│   │   ├── config.py
│   │   └── logging.py
│   ├── schemas/
│   │   └── generation.py
│   ├── services/
│   │   ├── image_service.py
│   │   ├── video_service.py
│   │   ├── speech_service.py
│   │   └── vision_service.py
│   └── main.py
├── outputs/
├── models/
├── tests/
├── requirements.txt
└── README.md
```
### Running Tests
```bash
pytest tests/
```
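
No tests ship with this commit (the `tests/` directory is only part of the planned layout). The sketch below shows one way a smoke test could look, assuming the package is importable as `src` and using the health route, which requires neither the API key nor any loaded model:

```python
# tests/test_health.py - hypothetical smoke test, not part of this commit.
from fastapi.testclient import TestClient

from src.main import app


def test_health_returns_ok():
    # Using the client without a `with` block skips lifespan startup in
    # current Starlette versions, so no models are loaded for this test.
    client = TestClient(app)
    resp = client.get("/api/health")
    assert resp.status_code == 200
    body = resp.json()
    assert body["status"] == "healthy"
    assert "version" in body and "device" in body
```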
## Security Notes
1. **Always use HTTPS in production**
2. Use strong, unique API keys (see the key-generation sketch after this list)
3. Restrict network access to the service
4. Consider running on a separate GPU server
5. Monitor resource usage and set appropriate limits
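
For item 2, a key can be generated with Python's standard library; this is a suggestion only, the service does not enforce any particular key format:

```python
# Generate a random value for the API_KEY setting (suggestion only).
import secrets

print(secrets.token_hex(32))  # 64-character hex string
```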
## Requirements
- Python 3.10+
- CUDA-capable GPU (recommended, 8GB+ VRAM)
- 16GB+ RAM
## Resources
### Education
- [Computer Vision Course](https://pjreddie.com/courses/computer-vision/)
- [Adversarial VQA Paper](https://arxiv.org/abs/2106.00245)
- [LLM Visualization](https://bbycroft.net/llm)
### References
- [VizWiz VQA PyTorch](https://github.com/DenisDsh/VizWiz-VQA-PyTorch)
- [Diffusers Library](https://github.com/huggingface/diffusers)
- [OpenAI Whisper](https://github.com/openai/whisper)
- [BLIP2](https://huggingface.co/Salesforce/blip2-opt-2.7b)
### Community
- [AI for Mankind](https://github.com/aiformankind)
- [ManaAI](https://manaai.cn/)
## License
See LICENSE file for details.

requirements.txt
View file

@ -1,11 +1,44 @@
azure-functions
azure-storage-blob
azure-identity
tensorflow
scikit-learn
pandas
numpy
allennlp
allennlp-models
nltk
Flask>=1.0,<=1.1.2
# Core Framework
fastapi==0.115.0
uvicorn[standard]==0.30.6
pydantic==2.9.0
pydantic-settings==2.5.2
# Logging
structlog==25.5.0
python-json-logger==2.0.7
# Generation Libraries
diffusers==0.30.3
torch==2.5.1
torchaudio==2.5.1
torchvision==0.20.1
transformers==4.46.0
accelerate==1.1.1
safetensors==0.4.5
Pillow==11.0.0
# Audio Generation & Processing
openai-whisper==20231117
TTS==0.22.0
scipy==1.14.1
# Video Processing
imageio==2.36.0
imageio-ffmpeg==0.5.1
opencv-python==4.10.0.84
# Vision & Multimodal
timm==1.0.12
# HTTP & API
httpx==0.27.2
aiofiles==24.1.0
python-multipart==0.0.12
# Monitoring
prometheus-client==0.21.0
# Utils
python-dotenv==1.0.1
typing-extensions==4.12.2

0
src/__init__.py Normal file
View file

0
src/api/__init__.py Normal file
View file

7
src/api/dependencies.py Normal file
View file

@ -0,0 +1,7 @@
from fastapi import Header, HTTPException

from ..core.config import settings


async def verify_api_key(x_api_key: str = Header(...)):
    if x_api_key != settings.api_key:
        raise HTTPException(status_code=401, detail="Invalid API key")
    return x_api_key

0
src/api/v1/__init__.py Normal file
View file

3
src/api/v1/endpoints/__init__.py Normal file
View file

@ -0,0 +1,3 @@
from . import image, speech, video, vision
__all__ = ["image", "video", "speech", "vision"]

64
src/api/v1/endpoints/image.py Normal file
View file

@ -0,0 +1,64 @@
from fastapi import APIRouter, Depends, File, UploadFile

from ....schemas.generation import (
    GenerationResponse,
    ImageDescribeResponse,
    ImageGenerateRequest,
)
from ....services.image_service import get_image_service
from ...dependencies import verify_api_key

router = APIRouter(prefix="/image", tags=["Image"])


@router.post("/generate", response_model=GenerationResponse)
async def generate_image(
    request: ImageGenerateRequest,
    api_key: str = Depends(verify_api_key),
    service=Depends(get_image_service),
):
    """
    Generate an image from a text prompt.

    Args:
        request: Image generation parameters including prompt, steps, dimensions, etc.
        api_key: API key for authentication
        service: Image service instance

    Returns:
        GenerationResponse with file path and generation time
    """
    result = await service.generate(
        prompt=request.prompt,
        steps=request.steps,
        width=request.width,
        height=request.height,
        guidance_scale=request.guidance_scale,
        seed=request.seed,
    )
    return GenerationResponse(**result)


@router.post("/describe", response_model=ImageDescribeResponse)
async def describe_image(
    file: UploadFile = File(...),
    api_key: str = Depends(verify_api_key),
    service=Depends(get_image_service),
):
    """
    Get a description of an uploaded image.

    Note: This endpoint is deprecated. Use /api/vision/describe instead
    for full captioning capabilities.

    Args:
        file: Image file to describe
        api_key: API key for authentication
        service: Image service instance

    Returns:
        ImageDescribeResponse with description
    """
    image_data = await file.read()
    result = await service.describe(image_data)
    return ImageDescribeResponse(**result)

85
src/api/v1/endpoints/speech.py Normal file
View file

@ -0,0 +1,85 @@
from fastapi import APIRouter, Depends, File, UploadFile

from ....schemas.generation import (
    GenerationResponse,
    SpeechGenerateRequest,
    SpeechToTextResponse,
)
from ....services.speech_service import get_speech_service
from ...dependencies import verify_api_key

router = APIRouter(prefix="/speech", tags=["Speech"])


@router.post("/generate", response_model=GenerationResponse)
async def generate_speech(
    request: SpeechGenerateRequest,
    api_key: str = Depends(verify_api_key),
    service=Depends(get_speech_service),
):
    """
    Generate speech audio from text (Text-to-Speech).

    Args:
        request: Speech generation parameters including:
            - prompt: Text to convert to speech
            - voice: Voice model to use (optional, default: "default")
            - language: Language code (optional, default: "en")
        api_key: API key for authentication
        service: Speech service instance

    Returns:
        GenerationResponse with file path to generated audio and generation time
    """
    result = await service.generate(
        prompt=request.prompt,
        voice=request.voice,
        language=request.language,
    )
    return GenerationResponse(**result)


@router.post("/totext", response_model=SpeechToTextResponse)
async def speech_to_text(
    file: UploadFile = File(...),
    api_key: str = Depends(verify_api_key),
    service=Depends(get_speech_service),
):
    """
    Convert speech audio to text (Speech-to-Text) using Whisper.

    Supported audio formats: wav, mp3, m4a, flac, ogg

    Args:
        file: Audio file to transcribe
        api_key: API key for authentication
        service: Speech service instance

    Returns:
        SpeechToTextResponse with transcribed text, detected language, and confidence
    """
    audio_data = await file.read()
    result = await service.to_text(audio_data)
    return SpeechToTextResponse(**result)


@router.post("/detect_language")
async def detect_language(
    file: UploadFile = File(...),
    api_key: str = Depends(verify_api_key),
    service=Depends(get_speech_service),
):
    """
    Detect the language of spoken audio using Whisper.

    Args:
        file: Audio file to analyze
        api_key: API key for authentication
        service: Speech service instance

    Returns:
        dict with detected language code and confidence score
    """
    audio_data = await file.read()
    result = await service.detect_language(audio_data)
    return result

63
src/api/v1/endpoints/video.py Normal file
View file

@ -0,0 +1,63 @@
from fastapi import APIRouter, Depends, File, UploadFile

from ....schemas.generation import (
    GenerationResponse,
    VideoDescribeResponse,
    VideoGenerateRequest,
)
from ....services.video_service import get_video_service
from ...dependencies import verify_api_key

router = APIRouter(prefix="/video", tags=["Video"])


@router.post("/generate", response_model=GenerationResponse)
async def generate_video(
    request: VideoGenerateRequest,
    api_key: str = Depends(verify_api_key),
    service=Depends(get_video_service),
):
    """
    Generate a video from a text prompt.

    Args:
        request: Video generation parameters including prompt, frames, fps, etc.
        api_key: API key for authentication
        service: Video service instance

    Returns:
        GenerationResponse with file path and generation time
    """
    result = await service.generate(
        prompt=request.prompt,
        num_frames=request.num_frames,
        fps=request.fps,
        steps=request.steps,
        seed=request.seed,
    )
    return GenerationResponse(**result)


@router.post("/describe", response_model=VideoDescribeResponse)
async def describe_video(
    file: UploadFile = File(...),
    api_key: str = Depends(verify_api_key),
    service=Depends(get_video_service),
):
    """
    Get a description of an uploaded video.

    Note: This endpoint is deprecated. Use /api/vision/describe_video instead
    for full video captioning capabilities.

    Args:
        file: Video file to describe
        api_key: API key for authentication
        service: Video service instance

    Returns:
        VideoDescribeResponse with description and frame count
    """
    video_data = await file.read()
    result = await service.describe(video_data)
    return VideoDescribeResponse(**result)

63
src/api/v1/endpoints/vision.py Normal file
View file

@ -0,0 +1,63 @@
from typing import Optional

from fastapi import APIRouter, Depends, File, Form, UploadFile

from ....schemas.generation import ImageDescribeResponse, VideoDescribeResponse
from ....services.vision_service import get_vision_service
from ...dependencies import verify_api_key

router = APIRouter(prefix="/vision", tags=["Vision"])


@router.post("/describe", response_model=ImageDescribeResponse)
async def describe_image(
    file: UploadFile = File(...),
    prompt: Optional[str] = Form(None),
    api_key: str = Depends(verify_api_key),
    service=Depends(get_vision_service),
):
    """
    Get a caption/description for an image.

    Optionally provide a prompt to guide the description.
    """
    image_data = await file.read()
    result = await service.describe_image(image_data, prompt)
    return ImageDescribeResponse(**result)


@router.post("/describe_video", response_model=VideoDescribeResponse)
async def describe_video(
    file: UploadFile = File(...),
    num_frames: int = Form(8),
    api_key: str = Depends(verify_api_key),
    service=Depends(get_vision_service),
):
    """
    Get a description for a video by sampling and analyzing frames.

    Args:
        file: Video file (mp4, avi, mov, webm, mkv)
        num_frames: Number of frames to sample for analysis (default: 8)
    """
    video_data = await file.read()
    result = await service.describe_video(video_data, num_frames)
    return VideoDescribeResponse(**result)


@router.post("/vqa")
async def visual_question_answering(
    file: UploadFile = File(...),
    question: str = Form(...),
    api_key: str = Depends(verify_api_key),
    service=Depends(get_vision_service),
):
    """
    Visual Question Answering - ask a question about an image.

    Args:
        file: Image file
        question: Question to ask about the image
    """
    image_data = await file.read()
    result = await service.answer_question(image_data, question)
    return ImageDescribeResponse(**result)

0
src/core/__init__.py Normal file
View file

64
src/core/config.py Normal file
View file

@ -0,0 +1,64 @@
from pathlib import Path

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
        extra="ignore",
    )

    env: str = "development"
    host: str = "0.0.0.0"
    port: int = 8085
    log_level: str = "INFO"

    api_v1_prefix: str = "/api"
    project_name: str = "BotModels API"
    version: str = "2.0.0"
    api_key: str = "change-me"

    # Image generation model
    image_model_path: str = "./models/stable-diffusion-v1-5"
    image_steps: int = 4
    image_width: int = 512
    image_height: int = 512
    image_gpu_layers: int = 20
    image_batch_size: int = 1

    # Video generation model
    video_model_path: str = "./models/zeroscope-v2"
    video_frames: int = 24
    video_fps: int = 8
    video_width: int = 320
    video_height: int = 576
    video_gpu_layers: int = 15
    video_batch_size: int = 1

    # Speech/TTS model
    speech_model_path: str = "./models/tts"

    # Vision model (BLIP2 for captioning)
    vision_model_path: str = "./models/blip2"

    # Whisper model for speech-to-text
    whisper_model_path: str = "./models/whisper"

    # Device configuration
    device: str = "cuda"

    # Output directory for generated files
    output_dir: Path = Path("./outputs")

    @property
    def is_production(self) -> bool:
        return self.env == "production"


settings = Settings()
settings.output_dir.mkdir(parents=True, exist_ok=True)
(settings.output_dir / "images").mkdir(exist_ok=True)
(settings.output_dir / "videos").mkdir(exist_ok=True)
(settings.output_dir / "audio").mkdir(exist_ok=True)

33
src/core/logging.py Normal file
View file

@ -0,0 +1,33 @@
import logging

import structlog

from .config import settings


def setup_logging():
    if settings.is_production:
        structlog.configure(
            processors=[
                structlog.contextvars.merge_contextvars,
                structlog.stdlib.add_log_level,
                structlog.processors.TimeStamper(fmt="iso"),
                structlog.processors.JSONRenderer(),
            ],
            wrapper_class=structlog.make_filtering_bound_logger(
                # Look up the numeric level on the stdlib logging module directly
                getattr(logging, settings.log_level.upper(), logging.INFO)
            ),
        )
    else:
        structlog.configure(
            processors=[
                structlog.contextvars.merge_contextvars,
                structlog.stdlib.add_log_level,
                structlog.processors.TimeStamper(fmt="iso"),
                structlog.dev.ConsoleRenderer(colors=True),
            ],
        )


def get_logger(name: str = None):
    logger = structlog.get_logger()
    if name:
        logger = logger.bind(service=name)
    return logger


setup_logging()

78
src/main.py Normal file
View file

@ -0,0 +1,78 @@
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles

from .api.v1.endpoints import image, speech, video, vision
from .core.config import settings
from .core.logging import get_logger
from .services.image_service import get_image_service
from .services.speech_service import get_speech_service
from .services.video_service import get_video_service
from .services.vision_service import get_vision_service

logger = get_logger("main")


@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info("Starting BotModels API", version=settings.version)
    try:
        get_image_service().initialize()
        get_video_service().initialize()
        get_speech_service().initialize()
        get_vision_service().initialize()
        logger.info("All services initialized")
    except Exception as e:
        logger.error("Failed to initialize services", error=str(e))
    yield
    logger.info("Shutting down BotModels API")


app = FastAPI(
    title=settings.project_name,
    version=settings.version,
    lifespan=lifespan,
    docs_url="/api/docs",
    redoc_url="/api/redoc",
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(image.router, prefix=settings.api_v1_prefix)
app.include_router(video.router, prefix=settings.api_v1_prefix)
app.include_router(speech.router, prefix=settings.api_v1_prefix)
app.include_router(vision.router, prefix=settings.api_v1_prefix)

app.mount("/outputs", StaticFiles(directory="outputs"), name="outputs")


@app.get("/")
async def root():
    return JSONResponse(
        {
            "service": settings.project_name,
            "version": settings.version,
            "status": "running",
            "docs": "/api/docs",
        }
    )


@app.get("/api/health")
async def health():
    return {"status": "healthy", "version": settings.version, "device": settings.device}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run("src.main:app", host=settings.host, port=settings.port, reload=True)

0
src/schemas/__init__.py Normal file
View file

57
src/schemas/generation.py Normal file
View file

@ -0,0 +1,57 @@
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, Field


class GenerationRequest(BaseModel):
    prompt: str = Field(..., min_length=1, max_length=2000)
    seed: Optional[int] = None


class ImageGenerateRequest(GenerationRequest):
    steps: Optional[int] = Field(30, ge=1, le=150)
    width: Optional[int] = Field(512, ge=64, le=2048)
    height: Optional[int] = Field(512, ge=64, le=2048)
    guidance_scale: Optional[float] = Field(7.5, ge=1.0, le=20.0)


class VideoGenerateRequest(GenerationRequest):
    num_frames: Optional[int] = Field(24, ge=8, le=128)
    fps: Optional[int] = Field(8, ge=1, le=60)
    steps: Optional[int] = Field(50, ge=10, le=100)


class SpeechGenerateRequest(GenerationRequest):
    voice: Optional[str] = Field("default", description="Voice model")
    language: Optional[str] = Field("en", description="Language code")


class GenerationResponse(BaseModel):
    status: str
    file_path: Optional[str] = None
    generation_time: Optional[float] = None
    error: Optional[str] = None
    timestamp: datetime = Field(default_factory=datetime.utcnow)


class DescribeRequest(BaseModel):
    file_data: bytes


class ImageDescribeResponse(BaseModel):
    description: str
    confidence: Optional[float] = None
    generation_time: Optional[float] = None


class VideoDescribeResponse(BaseModel):
    description: str
    frame_count: int
    generation_time: Optional[float] = None


class SpeechToTextResponse(BaseModel):
    text: str
    language: Optional[str] = None
    confidence: Optional[float] = None

15
src/services/__init__.py Normal file
View file

@ -0,0 +1,15 @@
from .image_service import ImageService, get_image_service
from .speech_service import SpeechService, get_speech_service
from .video_service import VideoService, get_video_service
from .vision_service import VisionService, get_vision_service

__all__ = [
    "ImageService",
    "get_image_service",
    "VideoService",
    "get_video_service",
    "SpeechService",
    "get_speech_service",
    "VisionService",
    "get_vision_service",
]

111
src/services/image_service.py Normal file
View file

@ -0,0 +1,111 @@
import time
from datetime import datetime
from typing import Optional

import torch
from diffusers import DPMSolverMultistepScheduler, StableDiffusionPipeline
from PIL import Image

from ..core.config import settings
from ..core.logging import get_logger

logger = get_logger("image_service")


class ImageService:
    def __init__(self):
        self.pipeline: Optional[StableDiffusionPipeline] = None
        self.device = settings.device
        self._initialized = False

    def initialize(self):
        if self._initialized:
            return
        logger.info("Loading Stable Diffusion model", path=settings.image_model_path)
        try:
            self.pipeline = StableDiffusionPipeline.from_pretrained(
                settings.image_model_path,
                torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
                safety_checker=None,
            )
            self.pipeline.scheduler = DPMSolverMultistepScheduler.from_config(
                self.pipeline.scheduler.config
            )
            self.pipeline = self.pipeline.to(self.device)
            if self.device == "cuda":
                self.pipeline.enable_attention_slicing()
            self._initialized = True
            logger.info("Stable Diffusion loaded successfully")
        except Exception as e:
            logger.error("Failed to load model", error=str(e))
            raise

    async def generate(
        self,
        prompt: str,
        steps: Optional[int] = None,
        width: Optional[int] = None,
        height: Optional[int] = None,
        guidance_scale: Optional[float] = None,
        seed: Optional[int] = None,
    ) -> dict:
        if not self._initialized:
            self.initialize()

        # Use config defaults if not specified
        actual_steps = steps if steps is not None else settings.image_steps
        actual_width = width if width is not None else settings.image_width
        actual_height = height if height is not None else settings.image_height
        actual_guidance = guidance_scale if guidance_scale is not None else 7.5

        start = time.time()
        # Treat seed=0 as a valid seed; only skip the generator when no seed is given
        generator = (
            torch.Generator(device=self.device).manual_seed(seed)
            if seed is not None
            else None
        )

        logger.info(
            "Generating image",
            prompt=prompt[:50],
            steps=actual_steps,
            width=actual_width,
            height=actual_height,
        )

        output = self.pipeline(
            prompt=prompt,
            num_inference_steps=actual_steps,
            guidance_scale=actual_guidance,
            width=actual_width,
            height=actual_height,
            generator=generator,
        )
        image: Image.Image = output.images[0]

        timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
        filename = f"{timestamp}_{hash(prompt) & 0xFFFFFF:06x}.png"
        output_path = settings.output_dir / "images" / filename
        image.save(output_path)

        generation_time = time.time() - start
        logger.info("Image generated", file=filename, time=generation_time)
        return {
            "status": "completed",
            "file_path": f"/outputs/images/{filename}",
            "generation_time": generation_time,
        }

    async def describe(self, image_data: bytes) -> dict:
        # Placeholder for backward compatibility
        # Use vision_service for actual image description
        return {"description": "Use /api/vision/describe endpoint", "confidence": 0.0}


_service = None


def get_image_service():
    global _service
    if _service is None:
        _service = ImageService()
    return _service

229
src/services/speech_service.py Normal file
View file

@ -0,0 +1,229 @@
import io
import tempfile
import time
from datetime import datetime
from pathlib import Path
from typing import Optional

from ..core.config import settings
from ..core.logging import get_logger

logger = get_logger("speech_service")


class SpeechService:
    def __init__(self):
        self.tts_model = None
        self.whisper_model = None
        self.device = settings.device
        self._initialized = False

    def initialize(self):
        if self._initialized:
            return
        logger.info("Loading speech models")
        try:
            # Load TTS model (Coqui TTS)
            self._load_tts_model()
            # Load Whisper model for speech-to-text
            self._load_whisper_model()
            self._initialized = True
            logger.info("Speech models loaded successfully")
        except Exception as e:
            logger.error("Failed to load speech models", error=str(e))
            # Don't raise - allow service to run with partial functionality
            logger.warning("Speech service will have limited functionality")

    def _load_tts_model(self):
        """Load TTS model for text-to-speech generation"""
        try:
            from TTS.api import TTS

            # Use a fast, high-quality model
            self.tts_model = TTS(
                model_name="tts_models/en/ljspeech/tacotron2-DDC",
                progress_bar=False,
                gpu=(self.device == "cuda"),
            )
            logger.info("TTS model loaded")
        except Exception as e:
            logger.warning("TTS model not available", error=str(e))
            self.tts_model = None

    def _load_whisper_model(self):
        """Load Whisper model for speech-to-text"""
        try:
            import whisper

            # Use base model for balance of speed and accuracy
            model_size = "base"
            if Path(settings.whisper_model_path).exists():
                self.whisper_model = whisper.load_model(
                    model_size, download_root=settings.whisper_model_path
                )
            else:
                self.whisper_model = whisper.load_model(model_size)
            logger.info("Whisper model loaded", model=model_size)
        except Exception as e:
            logger.warning("Whisper model not available", error=str(e))
            self.whisper_model = None

    async def generate(
        self,
        prompt: str,
        voice: Optional[str] = None,
        language: Optional[str] = None,
    ) -> dict:
        """Generate speech audio from text"""
        if not self._initialized:
            self.initialize()

        start = time.time()
        timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
        filename = f"{timestamp}_{hash(prompt) & 0xFFFFFF:06x}.wav"
        output_path = settings.output_dir / "audio" / filename

        if self.tts_model is None:
            logger.error("TTS model not available")
            return {
                "status": "error",
                "error": "TTS model not initialized",
                "file_path": None,
                "generation_time": time.time() - start,
            }

        try:
            logger.info(
                "Generating speech",
                text_length=len(prompt),
                voice=voice,
                language=language,
            )
            # Generate speech
            self.tts_model.tts_to_file(
                text=prompt,
                file_path=str(output_path),
            )
            generation_time = time.time() - start
            logger.info("Speech generated", file=filename, time=generation_time)
            return {
                "status": "completed",
                "file_path": f"/outputs/audio/{filename}",
                "generation_time": generation_time,
            }
        except Exception as e:
            logger.error("Speech generation failed", error=str(e))
            return {
                "status": "error",
                "error": str(e),
                "file_path": None,
                "generation_time": time.time() - start,
            }

    async def to_text(self, audio_data: bytes) -> dict:
        """Convert speech audio to text using Whisper"""
        if not self._initialized:
            self.initialize()

        start = time.time()
        if self.whisper_model is None:
            logger.error("Whisper model not available")
            return {
                "text": "",
                "language": None,
                "confidence": 0.0,
                "error": "Whisper model not initialized",
            }

        try:
            # Save audio to temporary file
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
                tmp.write(audio_data)
                tmp_path = tmp.name

            logger.info("Transcribing audio", file_size=len(audio_data))
            # Transcribe
            result = self.whisper_model.transcribe(tmp_path)

            # Clean up temp file
            import os

            os.unlink(tmp_path)

            transcription_time = time.time() - start
            logger.info(
                "Audio transcribed",
                text_length=len(result["text"]),
                language=result.get("language"),
                time=transcription_time,
            )
            return {
                "text": result["text"].strip(),
                "language": result.get("language", "en"),
                "confidence": 0.95,  # Whisper doesn't provide confidence directly
            }
        except Exception as e:
            logger.error("Speech-to-text failed", error=str(e))
            return {
                "text": "",
                "language": None,
                "confidence": 0.0,
                "error": str(e),
            }

    async def detect_language(self, audio_data: bytes) -> dict:
        """Detect the language of spoken audio"""
        if not self._initialized:
            self.initialize()

        if self.whisper_model is None:
            return {"language": None, "error": "Whisper model not initialized"}

        try:
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
                tmp.write(audio_data)
                tmp_path = tmp.name

            import whisper

            # Load audio and detect language
            audio = whisper.load_audio(tmp_path)
            audio = whisper.pad_or_trim(audio)
            mel = whisper.log_mel_spectrogram(audio).to(self.whisper_model.device)
            _, probs = self.whisper_model.detect_language(mel)

            import os

            os.unlink(tmp_path)

            detected_lang = max(probs, key=probs.get)
            confidence = probs[detected_lang]
            return {
                "language": detected_lang,
                "confidence": confidence,
            }
        except Exception as e:
            logger.error("Language detection failed", error=str(e))
            return {"language": None, "error": str(e)}


_service = None


def get_speech_service():
    global _service
    if _service is None:
        _service = SpeechService()
    return _service

106
src/services/video_service.py Normal file
View file

@ -0,0 +1,106 @@
import time
from datetime import datetime
from typing import Optional

import imageio
import torch

from ..core.config import settings
from ..core.logging import get_logger

logger = get_logger("video_service")


class VideoService:
    def __init__(self):
        self.pipeline = None
        self.device = settings.device
        self._initialized = False

    def initialize(self):
        if self._initialized:
            return
        logger.info("Loading video model", path=settings.video_model_path)
        try:
            from diffusers import DiffusionPipeline

            self.pipeline = DiffusionPipeline.from_pretrained(
                settings.video_model_path,
                torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
            )
            self.pipeline = self.pipeline.to(self.device)
            self._initialized = True
            logger.info("Video model loaded successfully")
        except Exception as e:
            logger.error("Failed to load video model", error=str(e))
            raise

    async def generate(
        self,
        prompt: str,
        num_frames: Optional[int] = None,
        fps: Optional[int] = None,
        steps: Optional[int] = None,
        seed: Optional[int] = None,
    ) -> dict:
        if not self._initialized:
            self.initialize()

        # Use config defaults if not specified
        actual_frames = num_frames if num_frames is not None else settings.video_frames
        actual_fps = fps if fps is not None else settings.video_fps
        actual_steps = steps if steps is not None else 50

        start = time.time()
        # Treat seed=0 as a valid seed; only skip the generator when no seed is given
        generator = (
            torch.Generator(device=self.device).manual_seed(seed)
            if seed is not None
            else None
        )

        logger.info(
            "Generating video",
            prompt=prompt[:50],
            frames=actual_frames,
            fps=actual_fps,
            steps=actual_steps,
        )

        output = self.pipeline(
            prompt=prompt,
            num_frames=actual_frames,
            num_inference_steps=actual_steps,
            generator=generator,
        )
        frames = output.frames[0]

        timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
        filename = f"{timestamp}_{hash(prompt) & 0xFFFFFF:06x}.mp4"
        output_path = settings.output_dir / "videos" / filename
        imageio.mimsave(output_path, frames, fps=actual_fps, codec="libx264")

        generation_time = time.time() - start
        logger.info("Video generated", file=filename, time=generation_time)
        return {
            "status": "completed",
            "file_path": f"/outputs/videos/{filename}",
            "generation_time": generation_time,
        }

    async def describe(self, video_data: bytes) -> dict:
        # Placeholder for backward compatibility
        # Use vision_service for actual video description
        return {
            "description": "Use /api/vision/describe_video endpoint",
            "frame_count": 0,
        }


_service = None


def get_video_service():
    global _service
    if _service is None:
        _service = VideoService()
    return _service

204
src/services/vision_service.py Normal file
View file

@ -0,0 +1,204 @@
import io
import time
from datetime import datetime
from typing import Optional

import torch
from PIL import Image

from ..core.config import settings
from ..core.logging import get_logger

logger = get_logger("vision_service")


class VisionService:
    def __init__(self):
        self.model = None
        self.processor = None
        self.device = settings.device
        self._initialized = False

    def initialize(self):
        if self._initialized:
            return
        logger.info("Loading vision model (BLIP2)")
        try:
            from transformers import Blip2ForConditionalGeneration, Blip2Processor

            self.processor = Blip2Processor.from_pretrained(settings.vision_model_path)
            self.model = Blip2ForConditionalGeneration.from_pretrained(
                settings.vision_model_path,
                torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
            )
            self.model = self.model.to(self.device)
            self._initialized = True
            logger.info("Vision model loaded")
        except Exception as e:
            logger.error("Failed to load vision model", error=str(e))
            # Don't raise - allow service to run without vision
            logger.warning("Vision service will return placeholder responses")

    async def describe_image(
        self, image_data: bytes, prompt: Optional[str] = None
    ) -> dict:
        """Generate a caption/description for an image"""
        start = time.time()

        if not self._initialized or self.model is None:
            # Return placeholder if model not loaded
            return {
                "description": "Vision model not initialized. Please check model path configuration.",
                "confidence": 0.0,
                "generation_time": time.time() - start,
            }

        try:
            # Load image from bytes
            image = Image.open(io.BytesIO(image_data)).convert("RGB")

            # Prepare inputs
            if prompt:
                inputs = self.processor(image, text=prompt, return_tensors="pt").to(
                    self.device
                )
            else:
                inputs = self.processor(image, return_tensors="pt").to(self.device)

            # Generate caption
            with torch.no_grad():
                generated_ids = self.model.generate(
                    **inputs, max_new_tokens=100, num_beams=5, early_stopping=True
                )

            # Decode the generated text
            description = self.processor.decode(
                generated_ids[0], skip_special_tokens=True
            )
            return {
                "description": description.strip(),
                "confidence": 0.85,  # BLIP2 doesn't provide confidence scores directly
                "generation_time": time.time() - start,
            }
        except Exception as e:
            logger.error("Image description failed", error=str(e))
            return {
                "description": f"Error describing image: {str(e)}",
                "confidence": 0.0,
                "generation_time": time.time() - start,
            }

    async def describe_video(self, video_data: bytes, num_frames: int = 8) -> dict:
        """Generate a description for a video by sampling frames"""
        start = time.time()

        if not self._initialized or self.model is None:
            return {
                "description": "Vision model not initialized. Please check model path configuration.",
                "frame_count": 0,
                "generation_time": time.time() - start,
            }

        try:
            import tempfile

            import cv2
            import numpy as np

            # Save video to temp file
            with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
                tmp.write(video_data)
                tmp_path = tmp.name

            # Open video and extract frames
            cap = cv2.VideoCapture(tmp_path)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if total_frames == 0:
                cap.release()
                return {
                    "description": "Could not read video frames",
                    "frame_count": 0,
                    "generation_time": time.time() - start,
                }

            # Sample frames evenly throughout the video
            frame_indices = np.linspace(0, total_frames - 1, num_frames, dtype=int)
            frames = []
            for idx in frame_indices:
                cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
                ret, frame = cap.read()
                if ret:
                    # Convert BGR to RGB
                    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    frames.append(Image.fromarray(frame_rgb))
            cap.release()

            # Clean up temp file
            import os

            os.unlink(tmp_path)

            if not frames:
                return {
                    "description": "No frames could be extracted from video",
                    "frame_count": 0,
                    "generation_time": time.time() - start,
                }

            # Generate descriptions for each sampled frame
            descriptions = []
            for frame in frames:
                inputs = self.processor(frame, return_tensors="pt").to(self.device)
                with torch.no_grad():
                    generated_ids = self.model.generate(
                        **inputs, max_new_tokens=50, num_beams=3, early_stopping=True
                    )
                desc = self.processor.decode(generated_ids[0], skip_special_tokens=True)
                descriptions.append(desc.strip())

            # Combine descriptions into a coherent summary
            # Use the most common elements or create a timeline
            unique_descriptions = list(
                dict.fromkeys(descriptions)
            )  # Remove duplicates preserving order
            if len(unique_descriptions) == 1:
                combined = unique_descriptions[0]
            else:
                combined = "Video shows: " + "; ".join(unique_descriptions[:4])

            return {
                "description": combined,
                "frame_count": len(frames),
                "generation_time": time.time() - start,
            }
        except Exception as e:
            logger.error("Video description failed", error=str(e))
            return {
                "description": f"Error describing video: {str(e)}",
                "frame_count": 0,
                "generation_time": time.time() - start,
            }

    async def answer_question(self, image_data: bytes, question: str) -> dict:
        """Visual question answering - ask a question about an image"""
        # Use describe_image with the question as a prompt
        return await self.describe_image(image_data, prompt=question)


_service = None


def get_vision_service():
    global _service
    if _service is None:
        _service = VisionService()
    return _service

0
src/utils/__init__.py Normal file
View file