Python Development
The model service provides AI capabilities for video summarization, object detection, tracking, and ontology augmentation. It is built with Python 3.12, FastAPI 0.110+, and PyTorch 2.5+, and uses SGLang 0.4+ as the primary inference engine with vLLM 0.6+ as a fallback.
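Conceptually, the backend choice boils down to "use SGLang when it is available, otherwise fall back to vLLM." A minimal sketch of that selection based on import availability (the helper name is hypothetical, and the real service likely checks more than whether the package is installed):
import importlib.util


def select_inference_backend() -> str:
    """Pick an inference backend by availability (hypothetical helper)."""
    if importlib.util.find_spec("sglang") is not None:
        return "sglang"
    if importlib.util.find_spec("vllm") is not None:
        return "vllm"
    raise RuntimeError("No supported inference backend (sglang or vllm) is installed")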
Development Environment
Prerequisites
- Python 3.12+
- CUDA 12.1+ (for GPU mode)
- Redis 7 (for job coordination)
- FFmpeg (for video processing)
Initial Setup
cd model-service
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install -r requirements.txt
Development Installation
Install with dev dependencies:
pip install -e ".[dev]"
This installs pytest, httpx, mypy, ruff, and other development tools.
Configuration
Create a .env file:
DEVICE=cpu # or cuda for GPU
BUILD_MODE=minimal # or full for production inference
REDIS_URL=redis://localhost:6379
LOG_LEVEL=INFO
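These variables can be read at startup with plain os.getenv. Note that a .env file is not loaded automatically; python-dotenv or uvicorn's --env-file option is typically responsible for exporting it first. A minimal sketch (the service's actual settings handling may differ):
import os

# Defaults mirror the sample .env above.
DEVICE = os.getenv("DEVICE", "cpu")
BUILD_MODE = os.getenv("BUILD_MODE", "minimal")
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")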
Start Development Server
uvicorn src.main:app --reload --port 8000
The server starts at http://localhost:8000 with auto-reload on file changes.
Project Structure
model-service/
├── src/
│   ├── main.py               # FastAPI application entry point
│   ├── routes.py             # API endpoint definitions
│   ├── summarization.py      # Video summarization logic
│   ├── detection.py          # Object detection logic
│   ├── tracking.py           # Video tracking logic
│   ├── augmentation.py       # Ontology augmentation logic
│   ├── model_manager.py      # Model loading and caching
│   ├── llm_loader.py         # LLM model loader
│   ├── vlm_loader.py         # VLM model loader
│   ├── detection_loader.py   # Detection model loader
│   ├── tracking_loader.py    # Tracking model loader
│   ├── video_utils.py        # Video processing utilities
│   └── otel_config.py        # OpenTelemetry configuration
├── config/
│   └── models.yaml           # Model configuration
├── test/
│   ├── test_routes.py        # API endpoint tests
│   ├── test_model_manager.py
│   ├── test_llm_loader.py
│   ├── test_vlm_loader.py
│   ├── test_detection_loader.py
│   ├── test_tracking_loader.py
│   └── test_video_utils.py
└── requirements.txt          # Production dependencies
Running the Model Service
Development Mode
uvicorn src.main:app --reload --port 8000
FastAPI auto-reloads when source files change.
Production Mode
uvicorn src.main:app --host 0.0.0.0 --port 8000 --workers 4
Runs with multiple worker processes for production traffic.
Testing
pytest # Run all tests
pytest --cov=src # Run with coverage
pytest test/test_routes.py -v # Run specific test file with verbose output
pytest -k "test_summarize" -v # Run tests matching pattern
pytest --tb=short # Short traceback format
Type Checking
mypy src/ # Type check all source files
mypy src/main.py # Type check single file
Linting
ruff check . # Check code style
ruff check --fix . # Auto-fix issues where possible
Adding New Model Loaders
Step 1: Create Loader Module
Create src/my_model_loader.py:
from typing import Any, Optional
import torch
from transformers import AutoModel, AutoProcessor
class MyModelLoader:
    """Loads and manages my custom model.

    Attributes:
        config: Model configuration from models.yaml
        device: Target device (cpu or cuda)
    """

    def __init__(self, config: dict[str, Any], device: str = "cpu"):
        self.config = config
        self.device = device
        self.model: Optional[Any] = None
        self.processor: Optional[Any] = None

    def load(self) -> tuple[Any, Any]:
        """Load model and processor.

        Returns:
            Tuple of (model, processor)
        """
        if self.model is not None:
            return self.model, self.processor

        model_id = self.config["model_id"]
        self.processor = AutoProcessor.from_pretrained(model_id)
        self.model = AutoModel.from_pretrained(
            model_id,
            torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
            device_map=self.device,  # places the model on the target device
        )
        return self.model, self.processor

    def unload(self) -> None:
        """Unload model from memory."""
        if self.model is not None:
            del self.model
            del self.processor
            self.model = None
            self.processor = None
            if self.device == "cuda":
                torch.cuda.empty_cache()
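Outside the model manager, the loader can be exercised on its own; a short usage sketch (the model_id is a placeholder):
loader = MyModelLoader({"model_id": "organization/model-name"}, device="cpu")
model, processor = loader.load()
# ... run inference ...
loader.unload()  # release memory once the model is no longer needed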
Step 2: Register in Model Manager
In src/model_manager.py:
from src.my_model_loader import MyModelLoader
class ModelManager:
    def __init__(self, config_path: str, device: str = "cpu"):
        # ... existing code ...
        self.my_loader: Optional[MyModelLoader] = None

    def get_my_model(self) -> tuple[Any, Any]:
        """Get my model and processor.

        Returns:
            Tuple of (model, processor)
        """
        if self.my_loader is None:
            config = self.config["my_task"]["my_model"]
            self.my_loader = MyModelLoader(config, self.device)
        return self.my_loader.load()
Step 3: Add Tests
Create test/test_my_model_loader.py:
import pytest
from src.my_model_loader import MyModelLoader
def test_loader_initialization():
    """Test loader initializes with config."""
    config = {
        "model_id": "test/model",
        "device": "cpu",
    }
    loader = MyModelLoader(config, device="cpu")
    assert loader.config == config
    assert loader.device == "cpu"
    assert loader.model is None


def test_load_model():
    """Test model loading (load() is synchronous; model_id must resolve to a real or local checkpoint)."""
    config = {
        "model_id": "test/model",
    }
    loader = MyModelLoader(config, device="cpu")
    model, processor = loader.load()
    assert model is not None
    assert processor is not None
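Because test/model is not a real checkpoint, the load test above will fail without network access to an actual model. One way to keep it hermetic is to stub the transformers loaders with pytest's monkeypatch fixture; a sketch, not a required pattern:
from types import SimpleNamespace


def test_load_model_mocked(monkeypatch):
    """Load without network access by stubbing the transformers loaders."""
    monkeypatch.setattr(
        "src.my_model_loader.AutoProcessor",
        SimpleNamespace(from_pretrained=lambda model_id, **kwargs: object()),
    )
    monkeypatch.setattr(
        "src.my_model_loader.AutoModel",
        SimpleNamespace(from_pretrained=lambda model_id, **kwargs: object()),
    )
    loader = MyModelLoader({"model_id": "test/model"}, device="cpu")
    model, processor = loader.load()
    assert model is not None
    assert processor is not None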
Adding New FastAPI Endpoints
Step 1: Define Pydantic Models
In src/routes.py or a separate schema file:
from typing import Any

from pydantic import BaseModel, Field


class MyTaskRequest(BaseModel):
    """Request for my task.

    Attributes:
        input_data: Input data for processing
        config: Optional configuration overrides
    """

    input_data: str = Field(..., description="Input data to process")
    config: dict[str, Any] = Field(default_factory=dict)


class MyTaskResponse(BaseModel):
    """Response from my task.

    Attributes:
        result: Processing result
        metadata: Additional metadata
    """

    result: str
    metadata: dict[str, Any] = Field(default_factory=dict)
Step 2: Implement Endpoint
In src/routes.py:
import logging

from fastapi import APIRouter, HTTPException, Request
from opentelemetry import trace

logger = logging.getLogger(__name__)
router = APIRouter()
tracer = trace.get_tracer(__name__)


@router.post("/my-task", response_model=MyTaskResponse)
async def process_my_task(request: MyTaskRequest, http_request: Request) -> MyTaskResponse:
    """Process custom task.

    Args:
        request: Task request with input data
        http_request: Incoming HTTP request, used to reach shared app state

    Returns:
        Task response with results

    Raises:
        HTTPException: If processing fails
    """
    with tracer.start_as_current_span("my_task_processing"):
        try:
            # Get model from the manager stored on app state (see src/main.py)
            model, processor = http_request.app.state.model_manager.get_my_model()

            # Process input
            inputs = processor(request.input_data, return_tensors="pt")
            outputs = model(**inputs)

            # Format response (process_outputs is a task-specific helper)
            result = process_outputs(outputs)
            return MyTaskResponse(
                result=result,
                metadata={"config": request.config},
            )
        except Exception as e:
            logger.error(f"Task processing failed: {e}")
            raise HTTPException(status_code=500, detail=str(e))
Step 3: Register Router
In src/main.py:
from fastapi import FastAPI

from src.routes import router

app = FastAPI()
app.include_router(router, prefix="/api")
Step 4: Add Tests
In test/test_routes.py:
import pytest
from httpx import AsyncClient

from src.main import app


@pytest.mark.asyncio
async def test_my_task_success():
    """Test my task endpoint with valid input."""
    async with AsyncClient(app=app, base_url="http://test") as client:
        response = await client.post(
            "/api/my-task",
            json={
                "input_data": "test input",
                "config": {},
            },
        )
    assert response.status_code == 200
    data = response.json()
    assert "result" in data
    assert "metadata" in data


@pytest.mark.asyncio
async def test_my_task_invalid_input():
    """Test my task endpoint with invalid input."""
    async with AsyncClient(app=app, base_url="http://test") as client:
        response = await client.post(
            "/api/my-task",
            json={},
        )
    assert response.status_code == 422  # Validation error
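Recent httpx releases removed the app= shortcut on AsyncClient. If these tests fail with an unexpected-keyword error, the same request can be routed through an explicit ASGI transport instead (a sketch assuming a current httpx version):
from httpx import ASGITransport, AsyncClient


@pytest.mark.asyncio
async def test_my_task_success_asgi_transport():
    """Same request as above, using an explicit ASGI transport."""
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        response = await client.post(
            "/api/my-task",
            json={"input_data": "test input", "config": {}},
        )
    assert response.status_code == 200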
Working with Video Processing
Frame Extraction
import numpy as np

from src.video_utils import extract_frames


def process_video(video_path: str, sample_rate: int = 30) -> list[np.ndarray]:
    """Extract frames from video.

    Args:
        video_path: Path to video file
        sample_rate: Extract every Nth frame

    Returns:
        List of frame arrays
    """
    frames = extract_frames(video_path, sample_rate=sample_rate)
    return frames
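src.video_utils is internal to the service; as a rough mental model, a frame extractor along these lines could be built on OpenCV. This is a sketch under that assumption, not the actual implementation:
import cv2
import numpy as np


def extract_frames_sketch(video_path: str, sample_rate: int = 30) -> list[np.ndarray]:
    """Decode a video and keep every Nth frame (hypothetical stand-in for extract_frames)."""
    cap = cv2.VideoCapture(video_path)
    frames: list[np.ndarray] = []
    index = 0
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        if index % sample_rate == 0:
            frames.append(frame)
        index += 1
    cap.release()
    return frames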
Frame Sampling Strategies
import cv2
import numpy as np


def uniform_sampling(total_frames: int, num_samples: int) -> list[int]:
    """Sample frames uniformly across video.

    Args:
        total_frames: Total number of frames
        num_samples: Number of frames to sample

    Returns:
        List of frame indices
    """
    if num_samples >= total_frames:
        return list(range(total_frames))
    indices = np.linspace(0, total_frames - 1, num_samples, dtype=int)
    return indices.tolist()


def keyframe_sampling(video_path: str, threshold: float = 30.0) -> list[int]:
    """Sample keyframes based on scene changes.

    Args:
        video_path: Path to video file
        threshold: Scene change detection threshold

    Returns:
        List of keyframe indices
    """
    # Minimal sketch using mean absolute frame difference (assumes OpenCV);
    # a dedicated scene-detection library may be preferable in production.
    cap = cv2.VideoCapture(video_path)
    keyframes: list[int] = []
    prev_gray = None
    index = 0
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY).astype(np.int16)
        if prev_gray is None or float(np.abs(gray - prev_gray).mean()) > threshold:
            keyframes.append(index)
        prev_gray = gray
        index += 1
    cap.release()
    return keyframes
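As a quick sanity check, uniform sampling of a 300-frame clip down to 5 frames spreads the indices evenly across the clip:
# uniform_sampling(300, 5) covers the whole clip at even spacing.
indices = uniform_sampling(total_frames=300, num_samples=5)
print(indices)  # [0, 74, 149, 224, 299]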
OpenTelemetry Instrumentation
Adding Spans
from opentelemetry import trace

tracer = trace.get_tracer(__name__)


def my_function(data: str) -> str:
    """Process data with tracing.

    Args:
        data: Input data

    Returns:
        Processed result
    """
    with tracer.start_as_current_span("my_function") as span:
        span.set_attribute("input_length", len(data))
        result = data.upper()
        span.set_attribute("output_length", len(result))
        return result
Adding Metrics
from opentelemetry import metrics

meter = metrics.get_meter(__name__)

# Create counter
request_counter = meter.create_counter(
    "fovea_my_task_requests_total",
    description="Total my task requests",
    unit="1",
)

# Create histogram
duration_histogram = meter.create_histogram(
    "fovea_my_task_duration_seconds",
    description="My task processing duration",
    unit="s",
)

# Use in code
request_counter.add(1, {"status": "success"})
duration_histogram.record(0.5)
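These instruments are typically updated around the actual work, for example by timing the handler body. A sketch using the instruments defined above (the payload handling is a placeholder):
import time


def timed_task(payload: str) -> str:
    """Record request count and duration around one unit of work."""
    start = time.perf_counter()
    try:
        result = payload.upper()  # placeholder for real processing
        request_counter.add(1, {"status": "success"})
        return result
    except Exception:
        request_counter.add(1, {"status": "error"})
        raise
    finally:
        duration_histogram.record(time.perf_counter() - start)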
Error Handling
Custom Exceptions
class ModelLoadError(Exception):
    """Raised when model loading fails."""


class InferenceError(Exception):
    """Raised when inference fails."""


class VideoProcessingError(Exception):
    """Raised when video processing fails."""
Exception Handling in Endpoints
@router.post("/process")
async def process_endpoint(request: ProcessRequest) -> ProcessResponse:
"""Process request with proper error handling."""
try:
result = await process_data(request.data)
return ProcessResponse(result=result)
except ModelLoadError as e:
logger.error(f"Model load failed: {e}")
raise HTTPException(status_code=503, detail="Model unavailable")
except InferenceError as e:
logger.error(f"Inference failed: {e}")
raise HTTPException(status_code=500, detail="Inference failed")
except VideoProcessingError as e:
logger.error(f"Video processing failed: {e}")
raise HTTPException(status_code=400, detail="Invalid video")
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
Debugging
VS Code Configuration
Create .vscode/launch.json:
{
  "version": "0.2.0",
  "configurations": [
    {
      "name": "Python: FastAPI",
      "type": "python",
      "request": "launch",
      "module": "uvicorn",
      "args": [
        "src.main:app",
        "--reload",
        "--port",
        "8000"
      ],
      "jinja": true,
      "justMyCode": false,
      "envFile": "${workspaceFolder}/model-service/.env"
    }
  ]
}
Logging
Use Python standard logging:
import logging
logger = logging.getLogger(__name__)
def my_function(data):
    """Function with logging."""
    logger.info("Starting processing")
    logger.debug(f"Debug info: {data}")
    try:
        result = process(data)  # process() stands in for the real work
        logger.info(f"Processing complete: {result}")
        return result
    except Exception as e:
        logger.error(f"Processing failed: {e}", exc_info=True)
        raise
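Log verbosity can be wired to the LOG_LEVEL value from .env at startup. A minimal sketch (the service may configure logging differently, e.g. with structured output):
import logging
import os

logging.basicConfig(
    level=os.getenv("LOG_LEVEL", "INFO").upper(),
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)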
Common Development Tasks
Adding New Model to Config
Edit config/models.yaml:
my_task:
  my_model:
    model_id: "organization/model-name"
    device: "auto"
    quantization: "int8"
    memory_required_gb: 8
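The model manager resolves entries like this by task and model name. As a rough sketch of reading the file (assuming PyYAML; the real ModelManager may do more, such as validation and device resolution):
import yaml


def load_model_config(config_path: str, task: str, model: str) -> dict:
    """Read config/models.yaml and return one model's settings."""
    with open(config_path) as f:
        config = yaml.safe_load(f)
    return config[task][model]


# e.g. load_model_config("config/models.yaml", "my_task", "my_model")["model_id"]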
Managing Dependencies
# Add new dependency
pip install package-name
pip freeze > requirements.txt
# Install from requirements
pip install -r requirements.txt
# Update specific package
pip install --upgrade package-name
Running Tests with Coverage
pytest --cov=src --cov-report=html
open htmlcov/index.html # View coverage report
Type Checking Configuration
Create mypy.ini:
[mypy]
python_version = 3.12
warn_return_any = True
warn_unused_configs = True
disallow_untyped_defs = True
[mypy-transformers.*]
ignore_missing_imports = True
[mypy-torch.*]
ignore_missing_imports = True
Troubleshooting
CUDA Out of Memory
Reduce batch size or model size:
# Use smaller model variant
config["model_id"] = "model-small"

# Enable 8-bit quantization (requires bitsandbytes; newer transformers
# versions prefer quantization_config=BitsAndBytesConfig(load_in_8bit=True))
model = AutoModel.from_pretrained(
    model_id,
    load_in_8bit=True,
    device_map="auto",
)

# Clear cache
torch.cuda.empty_cache()
Model Loading Timeout
Increase timeout or use cached models:
# Set environment variable
export HF_HOME=/path/to/cache
# Pre-download models
from transformers import AutoModel
AutoModel.from_pretrained("model-id")
Import Errors
Verify installation:
pip list # Check installed packages
pip install -e ".[dev]" # Reinstall with dev dependencies
Test Failures
Run with verbose output:
pytest -v -s # Verbose with print statements
pytest --tb=long # Full traceback
pytest -x # Stop on first failure