Job Queues and Async Processing
FOVEA uses BullMQ and Redis for asynchronous job processing. This architecture enables long-running AI tasks like video summarization, claim extraction, and object tracking to run in the background without blocking the API.
Overview
Job queues decouple request handling from task execution:
┌─────────┐      ┌─────────┐      ┌─────────┐      ┌──────────────┐
│ Browser │─────▶│ Backend │─────▶│  Redis  │─────▶│ Queue Worker │
└─────────┘      │   API   │      │  Queue  │      │  (Backend)   │
     ▲           └─────────┘      └─────────┘      └──────────────┘
     │                │                                    │
     │                │                                    ▼
     │                │                             ┌──────────────┐
     └─ Poll Status ──┘                             │ Model Service│
                                                    └──────────────┘
- Browser sends request to backend API
- Backend creates a job in Redis queue and returns job ID
- Queue Worker picks up job and processes it
- Browser polls job status until completion
- Model Service performs AI inference when invoked by worker
Architecture
Components
BullMQ: TypeScript job queue library
- Job creation and scheduling
- Worker processes for job execution
- Progress tracking and status updates
- Retry logic and error handling
Redis: In-memory data store
- Stores job queues and data
- Pub/sub for real-time updates
- Fast, atomic operations
- Persistence for job history
Backend Workers: Node.js processes
- Execute jobs from queues
- Call model service APIs
- Update job status in Redis
- Handle errors and retries
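A minimal sketch of how these components fit together (the queue name, connection details, and handler body are illustrative assumptions, not FOVEA's actual identifiers):
import { Queue, Worker } from 'bullmq'
const connection = { host: 'localhost', port: 6379 }
// The API process enqueues work into Redis...
const queue = new Queue('video-summarization', { connection })
await queue.add('summarize', { videoId: 'video-123' })
// ...and a worker process (often a separate Node.js process) executes it
const worker = new Worker('video-summarization', async (job) => {
  // call the model service, persist results, report progress
  return { ok: true }
}, { connection })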
Queue Types
FOVEA uses multiple queues for different task types:
1. Video Summarization Queue
- Purpose: Generate text summaries from videos
- Job data: Video ID, persona context, model configuration
- Duration: 30-90 seconds per video
- Priority: Medium
- Retry: 3 attempts
2. Claim Extraction Queue
- Purpose: Extract atomic claims from video summaries
- Job data: Summary ID, extraction strategy, confidence threshold
- Duration: 20-60 seconds per summary
- Priority: Medium
- Retry: 3 attempts
3. Claim Synthesis Queue
- Purpose: Generate synthesized claims from multiple sources
- Job data: Multiple summary IDs, synthesis parameters
- Duration: 30-120 seconds
- Priority: Low
- Retry: 2 attempts
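As a sketch, the three queues might be declared with the retry counts above as default job options (the queue names and the low-priority default are assumptions):
import { Queue } from 'bullmq'
const connection = { host: 'localhost', port: 6379 }
const videoSummarizationQueue = new Queue('video-summarization', {
  connection,
  defaultJobOptions: { attempts: 3 },
})
const claimExtractionQueue = new Queue('claim-extraction', {
  connection,
  defaultJobOptions: { attempts: 3 },
})
const claimSynthesisQueue = new Queue('claim-synthesis', {
  connection,
  defaultJobOptions: { attempts: 2, priority: 10 }, // larger number = lower priority
})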
Job Lifecycle
1. Job Creation
When a user initiates a task:
// Backend creates job
const job = await videoSummarizationQueue.add('summarize', {
videoId: 'video-123',
personaRole: 'Sports Analyst',
informationNeed: 'Track player movements',
modelId: 'qwen-2-5-vl-7b'
})
// Return job ID to client
return { jobId: job.id, status: 'queued' }
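In context this usually lives inside an HTTP handler. A hedged sketch, assuming an Express backend and the /api/summaries/generate route the client calls below (input validation omitted):
import express from 'express'
const app = express()
app.use(express.json())
app.post('/api/summaries/generate', async (req, res) => {
  // Enqueue the job and respond immediately; the client polls for the result
  const job = await videoSummarizationQueue.add('summarize', req.body)
  res.status(202).json({ jobId: job.id, status: 'queued' })
})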
2. Job Queuing
Job enters Redis queue:
Status: queued
Position: #3 in queue
Created: 2025-01-20T10:30:00Z
Data: { videoId, personaRole, ... }
3. Job Processing
Worker picks up job:
// Worker processes job: the processor function is passed to the Worker
// constructor ('active' is only a notification event and does not run jobs)
const worker = new Worker('video-summarization', async (job) => {
  // Update status
  await job.updateProgress(0)
  // Call model service
  const result = await modelService.summarizeVideo(job.data)
  // Update progress
  await job.updateProgress(50)
  // Save result to database
  await saveToDatabase(result)
  // Complete job
  await job.updateProgress(100)
  return result
}, { connection })
Job status updates:
Status: active → progress: 0%
Status: active → progress: 50%
Status: active → progress: 100%
Status: completed
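These transitions can also be observed push-style over Redis pub/sub using BullMQ's QueueEvents class, rather than polling job state. A minimal sketch:
import { QueueEvents } from 'bullmq'
const queueEvents = new QueueEvents('video-summarization', { connection })
// Fires on every job.updateProgress() call in the worker
queueEvents.on('progress', ({ jobId, data }) => {
  console.log(`Job ${jobId} progress: ${data}%`)
})
queueEvents.on('completed', ({ jobId }) => {
  console.log(`Job ${jobId} completed`)
})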
4. Job Completion
Worker marks job complete:
Status: completed
Result: { summaryId: 'summary-456', ... }
Completed: 2025-01-20T10:31:30Z
Duration: 90 seconds
5. Error Handling
If job fails:
Status: failed
Error: "Model service timeout"
Attempt: 1 of 3
Retry: 5 seconds after failure (exponential backoff)
After retries exhausted:
Status: failed
Error: "Model service timeout after 3 attempts"
Failed: 2025-01-20T10:35:00Z
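Workers emit a failed event on every unsuccessful attempt, which is a natural hook for logging or alerting. A sketch (the log format is illustrative):
// job can be undefined if the failure could not be tied to a specific job
worker.on('failed', (job, err) => {
  console.error(
    `Job ${job?.id} failed (attempt ${job?.attemptsMade} of ${job?.opts.attempts}):`,
    err.message,
  )
})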
Client-Side Integration
Polling Job Status
Frontend polls job status until completion:
// Submit job
const { jobId } = await apiClient.post('/api/summaries/generate', data)
// Poll status
const pollInterval = setInterval(async () => {
const status = await apiClient.get(`/api/jobs/${jobId}`)
if (status.state === 'completed') {
clearInterval(pollInterval)
// Use result
console.log('Summary created:', status.result)
} else if (status.state === 'failed') {
clearInterval(pollInterval)
// Handle error
console.error('Job failed:', status.error)
} else {
// Update progress UI
updateProgress(status.progress || 0)
}
}, 1000) // Poll every second
Job Status Response
{
"jobId": "job-123",
"state": "active",
"progress": 65,
"data": {
"videoId": "video-123",
"personaRole": "Sports Analyst"
},
"result": null,
"error": null,
"createdAt": "2025-01-20T10:30:00Z",
"startedAt": "2025-01-20T10:30:15Z",
"completedAt": null
}
States:
- queued: Waiting in queue
- active: Currently processing
- completed: Successfully finished
- failed: Error occurred
- delayed: Scheduled for future execution
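On the backend, a status endpoint like the one polled above can be assembled from BullMQ's Job fields. A sketch continuing the assumed Express setup (BullMQ reports the queued state as 'waiting', so the API maps it):
app.get('/api/jobs/:id', async (req, res) => {
  const job = await videoSummarizationQueue.getJob(req.params.id)
  if (!job) return res.status(404).json({ error: 'Job not found' })
  const state = await job.getState()
  res.json({
    jobId: job.id,
    state: state === 'waiting' ? 'queued' : state,
    progress: job.progress,
    data: job.data,
    result: job.returnvalue ?? null,
    error: job.failedReason || null,
    createdAt: new Date(job.timestamp).toISOString(),
    startedAt: job.processedOn ? new Date(job.processedOn).toISOString() : null,
    completedAt: job.finishedOn ? new Date(job.finishedOn).toISOString() : null,
  })
})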
Configuration
Queue Options
// Queue configuration
const queueOptions = {
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
},
defaultJobOptions: {
attempts: 3, // Retry up to 3 times
backoff: {
type: 'exponential',
delay: 5000, // Start with 5s, then 10s, 20s
},
removeOnComplete: 100, // Keep last 100 completed jobs
removeOnFail: 50, // Keep last 50 failed jobs
}
}
Worker Options
// Worker configuration
const workerOptions = {
connection: { /* redis connection */ },
concurrency: 5, // Process 5 jobs concurrently
limiter: {
max: 10, // Max 10 jobs per...
duration: 60000, // ...60 seconds
}
}
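Both option objects are passed at construction time. A brief usage sketch (processSummarization is a hypothetical name for the processor function shown under Job Lifecycle):
const videoSummarizationQueue = new Queue('video-summarization', queueOptions)
const summarizationWorker = new Worker('video-summarization', processSummarization, workerOptions)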
Monitoring and Observability
Queue Metrics
Monitor queue health via:
- Queue length: Number of jobs waiting
- Active jobs: Currently processing
- Completed rate: Jobs/minute completion rate
- Failed rate: Percentage of failed jobs
- Average duration: Mean job processing time
Prometheus Metrics
FOVEA exports queue metrics to Prometheus:
# Queue job counter
fovea_queue_job_total{queue="video_summarization",state="completed"} 150
fovea_queue_job_total{queue="video_summarization",state="failed"} 5
# Queue job duration
fovea_queue_job_duration_seconds{queue="video_summarization"} 45.2
# Queue length
fovea_queue_length{queue="video_summarization"} 3
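A sketch of how such metrics can be registered and updated with prom-client (the metric names match the output above; the use of prom-client and the event wiring are assumptions):
import client from 'prom-client'
const jobCounter = new client.Counter({
  name: 'fovea_queue_job_total',
  help: 'Total jobs processed, by queue and final state',
  labelNames: ['queue', 'state'],
})
const queueLength = new client.Gauge({
  name: 'fovea_queue_length',
  help: 'Jobs currently waiting in the queue',
  labelNames: ['queue'],
})
// Count terminal states from worker events
worker.on('completed', () => jobCounter.labels('video_summarization', 'completed').inc())
worker.on('failed', () => jobCounter.labels('video_summarization', 'failed').inc())
// Refresh queue length periodically
setInterval(async () => {
  queueLength.labels('video_summarization').set(await queue.getWaitingCount())
}, 15000)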
Grafana Dashboards
View queue metrics in Grafana:
- Queue length over time
- Job completion rate
- Failure rate by queue
- Average job duration
- Worker concurrency
Access Grafana at http://localhost:3002
Performance Optimization
Concurrency Tuning
Adjust worker concurrency based on resources:
Low resources (2 CPU cores, 8GB RAM):
concurrency: 2 // Process 2 jobs at once
Medium resources (4 CPU cores, 16GB RAM):
concurrency: 5 // Process 5 jobs at once
High resources (8+ CPU cores, 32GB+ RAM):
concurrency: 10 // Process 10 jobs at once
Rate Limiting
Prevent overwhelming model service:
limiter: {
max: 10, // Max 10 jobs...
duration: 60000 // ...per 60 seconds
}
Adjust based on model service capacity.
Priority Queues
Prioritize urgent jobs:
// High priority
await queue.add('urgent-task', data, { priority: 1 })
// Normal priority
await queue.add('normal-task', data, { priority: 5 })
// Low priority
await queue.add('batch-task', data, { priority: 10 })
Lower priority numbers execute first.
Error Handling and Retries
Automatic Retries
Failed jobs automatically retry with exponential backoff:
Attempt 1: Fails at T+0s → retry 5s later (T+5s)
Attempt 2: Fails at T+5s → retry 10s later (T+15s)
Attempt 3: Fails at T+15s → attempts exhausted → marked as failed permanently
Retry Strategy
Retries are appropriate for:
✅ Temporary model service unavailability
✅ Network timeouts
✅ Rate limiting (429 errors)
✅ Temporary resource constraints
Retries are NOT appropriate for:
❌ Invalid input data
❌ Authentication failures
❌ Client errors (400-level HTTP errors)
❌ Quota exceeded
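BullMQ can express this distinction directly: throwing its UnrecoverableError from the processor fails the job immediately and skips the remaining retry attempts. A sketch:
import { UnrecoverableError } from 'bullmq'
const worker = new Worker('video-summarization', async (job) => {
  if (!job.data.videoId) {
    // Bad input will never succeed, so don't burn retry attempts on it
    throw new UnrecoverableError('Missing videoId in job data')
  }
  // Transient failures thrown here still retry with backoff as configured
  return await modelService.summarizeVideo(job.data)
}, { connection })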
Manual Retry
Retry failed jobs manually:
// Get failed job
const job = await queue.getJob(jobId)
// Retry
if (job && job.failedReason) {
await job.retry()
}
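To retry every failed job in a queue at once, for example after the model service recovers, something like:
// Re-enqueue all currently failed jobs
const failedJobs = await queue.getFailed()
for (const failedJob of failedJobs) {
  await failedJob.retry()
}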
Troubleshooting
Jobs Stuck in Queue
Symptom: Jobs remain in queued state indefinitely
Causes:
- Worker process not running
- Redis connection lost
- Worker crashed
Solution:
# Check worker status
docker compose logs backend | grep Worker
# Restart workers
docker compose restart backend
# Check Redis connection
docker compose logs redis
High Failure Rate
Symptom: Many jobs failing repeatedly
Causes:
- Model service down or overloaded
- Invalid job data
- Insufficient resources
Solution:
# Check model service
curl http://localhost:8000/health
# View failed jobs
docker compose logs backend | grep "Job failed"
# Check resources
docker stats
Slow Job Processing
Symptom: Jobs take longer than expected
Causes:
- Low worker concurrency
- Model service slow
- Resource constraints
- Large queue backlog
Solution:
- Increase worker concurrency
- Scale model service
- Add more CPU/RAM
- Monitor queue length
Redis Connection Issues
Symptom: "ECONNREFUSED" errors
Solution:
# Check Redis running
docker compose ps redis
# Test connection
docker compose exec redis redis-cli ping
# Should return "PONG"
# Restart Redis
docker compose restart redis
Development and Testing
Running Workers Locally
# Start all services including workers
docker compose up
# Workers start automatically with backend service
# Check logs
docker compose logs backend | grep "Worker started"
Testing Job Flow
// A QueueEvents instance for the same queue is required to await completion
const queueEvents = new QueueEvents(queue.name, { connection })
// Create test job
const job = await queue.add('test', { videoId: 'test-123' })
// Wait for completion
const result = await job.waitUntilFinished(queueEvents)
// Check result
console.log(result)
Inspecting Queue State
// Get queue stats
const waiting = await queue.getWaitingCount()
const active = await queue.getActiveCount()
const completed = await queue.getCompletedCount()
const failed = await queue.getFailedCount()
console.log({ waiting, active, completed, failed })
Security Considerations
Redis Security
- Network isolation: Redis should not be exposed to the public internet
- Authentication: Use requirepass in production
- Encryption: Use TLS for Redis connections in production
Job Data Privacy
- Don't store sensitive data in job payloads
- Use database IDs instead of full data
- Sanitize job data before logging
Rate Limiting
- Prevent job spam with rate limits
- Validate job data before queuing
- Authenticate job creation requests
See Also
- Architecture Overview: Overall system architecture
- Model Service: AI model integration
- Observability: Monitoring and metrics
- BullMQ Documentation: Official BullMQ docs