Background Tasks & Scheduling
Complete guide to implementing background tasks and cron jobs with Celery in FastLaunchAPI
Why Use Background Tasks?
In modern web applications, certain operations are too heavy or time-consuming to run synchronously during API requests. Background tasks solve this problem by moving these operations to a separate process, allowing your API to remain responsive and providing better user experience.
The background task system is essential for:
- Asynchronous Processing: Execute heavy computations without blocking API responses
- Scheduled Tasks: Run maintenance jobs, reports, and recurring operations
- Email Processing: Send emails asynchronously to improve response times
- Data Processing: Handle large datasets and complex operations
- External API Calls: Integrate with third-party services without timeout concerns
All background tasks in FastLaunchAPI are processed using Celery with Redis as both the message broker and result backend for optimal performance.
System Architecture
Understanding the architecture helps you work effectively with background tasks. The system consists of several key components that work together to provide reliable task processing:
🔧 Celery App
Main Celery application configuration with worker management and task routing
⏰ Celery Beat
Scheduler service for cron-like recurring tasks and periodic job execution
🔄 Redis Broker
Message broker and result backend for task queue management
📋 Task Queues
Organized task queues for different types of background operations
How It Works
The background task system follows this flow (a minimal end-to-end sketch follows the list):
- Task Definition: You define tasks as Python functions decorated with @celery_app.task()
- Task Queuing: When your API needs to run a task, it sends the task to a Redis queue
- Worker Processing: Celery workers continuously monitor queues and execute tasks
- Result Storage: Task results are stored in Redis for later retrieval
- Scheduling: Celery Beat can automatically trigger tasks on a schedule
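Put together, a minimal sketch of this flow looks like the following; the add task and its values are purely illustrative, and blocking on get() is only for demonstration:
from celery_setup import celery_app

@celery_app.task()
def add(x: int, y: int) -> int:
    # Executed inside a Celery worker process, not the API process
    return x + y

# Queue the task; .delay() returns immediately with an AsyncResult
async_result = add.delay(2, 3)
print(async_result.id)               # task ID you can return to the client
print(async_result.get(timeout=10))  # blocks until the worker stores the result in Redis -> 5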
Setting Up Celery
Configure the Celery Application
The main Celery configuration is located in celery_setup.py. This file defines how Celery connects to Redis, which tasks to include, and how to handle scheduling:
Make sure any newly added task modules, including those that live in other routers, are listed in the include field of the Celery app.
from datetime import timedelta
from celery import Celery
from celery.schedules import crontab
from app.config.settings import app_settings
# Initialize Celery application
celery_app = Celery(
'tasks',
broker=app_settings.REDIS_DSN,
backend=app_settings.REDIS_DSN,
include=[
"app.routers.core.tasks",
# Add more task modules here
]
)
# Configure Beat scheduler for cron jobs
celery_app.conf.beat_schedule = {
'sample_task': {
'task': 'app.routers.core.tasks.sample_task',
'schedule': crontab(minute="0", hour='*/4'), # Every 4 hours
'options': {'queue': 'default'}
},
# Add more scheduled tasks here
}
# Set timezone for beat scheduler
celery_app.conf.timezone = 'UTC'
# Configure task routes to specific queues
celery_app.conf.task_routes = {
'app.routers.core.tasks.sample_task': {'queue': 'default'},
# Route different tasks to different queues
}
# Queue configuration
celery_app.conf.task_default_queue = 'default'
celery_app.conf.task_create_missing_queues = True
Configure Advanced Settings
Celery offers many configuration options to optimize performance and reliability. Here are the most important settings organized by category:
These settings control how Celery connects to Redis and handles connection failures:
# Redis broker configuration
broker=app_settings.REDIS_DSN
backend=app_settings.REDIS_DSN
# Connection pool settings
celery_app.conf.broker_connection_retry_on_startup = True
celery_app.conf.broker_connection_max_retries = 10
The broker_connection_retry_on_startup setting ensures workers keep trying to connect if Redis is temporarily unavailable during startup.
Queue routing allows you to organize tasks by priority and resource requirements:
# Multiple queue configuration
celery_app.conf.task_routes = {
'app.tasks.email_tasks.*': {'queue': 'email'},
'app.tasks.heavy_tasks.*': {'queue': 'heavy'},
'app.tasks.quick_tasks.*': {'queue': 'quick'},
}
This configuration automatically routes tasks to appropriate queues based on their module path.
Worker settings control how tasks are processed and help prevent memory leaks:
# Worker configuration
celery_app.conf.worker_prefetch_multiplier = 1
celery_app.conf.worker_max_tasks_per_child = 1000
celery_app.conf.worker_disable_rate_limits = False
- worker_prefetch_multiplier = 1 prevents workers from prefetching too many tasks
- worker_max_tasks_per_child = 1000 restarts workers after processing 1000 tasks to prevent memory leaks
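A few other settings are commonly tuned alongside these; the values below are illustrative starting points rather than FastLaunchAPI defaults:
# Acknowledge tasks only after they finish, so a crashed worker's tasks are re-queued
celery_app.conf.task_acks_late = True
# Global execution limits (per-task decorator options can override these)
celery_app.conf.task_soft_time_limit = 300   # raise SoftTimeLimitExceeded after 5 minutes
celery_app.conf.task_time_limit = 360        # hard-kill the task after 6 minutes
# Expire stored results after one day to keep Redis memory in check
celery_app.conf.result_expires = 86400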
Creating Your First Background Task
Understanding Task Structure
Background tasks in FastLaunchAPI are Python functions decorated with @celery_app.task(). Let's start with a simple example and then explore more advanced patterns.
Here's a basic task that processes input using OpenAI and returns a structured response:
from app.db.database import SessionLocal
from celery_setup import celery_app
from langchain.prompts.chat import ChatPromptTemplate
from langchain_openai import ChatOpenAI
import os
import json
import logging
# Initialize OpenAI client
chat_model = ChatOpenAI(
openai_api_key=os.getenv("OPENAI_API_KEY"),
model_name="gpt-4o-mini"
)
@celery_app.task()
def sample_task(input: str = ""):  # default lets the Beat schedule call this task without arguments
"""
Sample background task that processes input with OpenAI
and returns structured JSON response
"""
db = SessionLocal()
try:
# Define system template
template = (
"1. First Instruction: Process the user input\n"
"2. Second Instruction: Return structured response\n"
"3. Ensure response is valid JSON format"
)
human_template = "msg: {msg}"
# Create chat prompt
chat_prompt = ChatPromptTemplate.from_messages([
("system", template),
("human", human_template)
])
        # Format the prompt
        formatted_prompt = chat_prompt.format_messages(msg=input)
# Process with OpenAI
result = chat_model.invoke(formatted_prompt)
json_result = json.loads(result.content)
return {"detail": json_result, "status": "success"}
except Exception as e:
logging.error(f"Task failed: {str(e)}")
return {"error": str(e), "status": "failed"}
finally:
db.close()
Add Task Decorators and Error Handling
The @celery_app.task() decorator converts a regular Python function into a Celery task. You can configure various options to control how the task behaves:
Celery tasks can be configured with various options to control execution behavior, retry logic, and error handling.
@celery_app.task(
bind=True,
autoretry_for=(Exception,),
retry_kwargs={'max_retries': 3, 'countdown': 60},
retry_backoff=True,
retry_jitter=True
)
def robust_task(self, data):
"""
Task with automatic retry logic and error handling
"""
try:
# Your task logic here
return process_data(data)
    except Exception as exc:
        # Log the error, then re-raise so autoretry_for schedules the retry
        # with the configured countdown and backoff
        logging.error(f"Task failed: {exc}")
        raise
Let's break down these options:
- bind=True: Gives the task access to its own context (the self parameter)
- autoretry_for=(Exception,): Automatically retries on any exception
- retry_kwargs: Sets the maximum number of retries and the countdown between attempts
- retry_backoff=True: Increases the delay between retries exponentially
- retry_jitter=True: Adds randomization to retry delays to prevent thundering herd problems
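Once a task is defined, you trigger it with delay() for the common case or apply_async() when you need per-call control over timing, routing, or expiry; both are standard Celery calls, and the arguments below are illustrative:
# Fire-and-forget with default routing
robust_task.delay({"user_id": 42})
# Per-call control: start in 30 seconds, on a specific queue, expire if still unrun after 10 minutes
robust_task.apply_async(
    args=({"user_id": 42},),
    countdown=30,
    queue='default',
    expires=600,
)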
Task Scheduling with Celery Beat
Understanding Cron Scheduling
Celery Beat is a scheduler that can trigger tasks automatically at specified intervals or times. This is perfect for maintenance tasks, reports, and recurring operations.
Celery Beat uses cron-like syntax for scheduling. Here are common patterns you'll use:
from celery.schedules import crontab
celery_app.conf.beat_schedule = {
# Every 4 hours at minute 0
'cleanup_expired_tokens': {
'task': 'app.tasks.cleanup_expired_tokens',
'schedule': crontab(minute=0, hour='*/4'),
'options': {'queue': 'maintenance'}
},
# Daily at 2:30 AM
'daily_backup': {
'task': 'app.tasks.create_daily_backup',
'schedule': crontab(hour=2, minute=30),
'options': {'queue': 'heavy'}
},
# Weekly on Monday at 9 AM
'weekly_report': {
'task': 'app.tasks.generate_weekly_report',
'schedule': crontab(hour=9, minute=0, day_of_week=1),
'options': {'queue': 'reports'}
},
# Every 30 seconds
'health_check': {
'task': 'app.tasks.system_health_check',
'schedule': 30.0,
'options': {'queue': 'monitoring'}
},
# Monthly on the 1st at midnight
'monthly_cleanup': {
'task': 'app.tasks.monthly_maintenance',
'schedule': crontab(minute=0, hour=0, day_of_month=1),
'options': {'queue': 'maintenance'}
},
}
Cron Syntax Reference
Understanding cron syntax is crucial for scheduling tasks effectively:
- minute: 0-59
- hour: 0-23
- day_of_month: 1-31
- month: 1-12
- day_of_week: 0-6 (0 is Sunday)
You can use special characters:
- *: Any value
- */n: Every n units
- n-m: Range from n to m
- n,m: Specific values n and m
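As a quick illustration, here are a few crontab expressions built from these characters (the schedules themselves are placeholders):
from celery.schedules import crontab

crontab(minute='*/15')                         # every 15 minutes
crontab(minute=0, hour='9-17')                 # on the hour, from 9 AM to 5 PM
crontab(minute=0, hour=0, day_of_week='0,6')   # midnight on Sunday and Saturday
crontab(minute=0, hour='*/6', day_of_month=1)  # every 6 hours on the 1st of the month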
Implement Dynamic Scheduling
Sometimes you need to schedule tasks programmatically based on user actions or data. Entries added to celery_app.conf.beat_schedule at runtime are not picked up by an already running Beat process, so one-off jobs are better scheduled by queuing the task with an eta (earliest execution time):
from datetime import datetime, timedelta
from celery_setup import celery_app
def schedule_user_reminder(user_id: int, reminder_time: datetime):
    """
    Schedule a one-time reminder task for a specific user
    """
    # eta tells the worker not to execute the task before the given time
    result = celery_app.send_task(
        'app.tasks.send_user_reminder',
        args=(user_id,),
        eta=reminder_time,
        expires=reminder_time + timedelta(minutes=10),  # drop it if not run within 10 minutes
    )
    return result.id
The task is queued immediately, but the worker will not execute it before reminder_time; if it still has not run ten minutes after that, it expires instead of running late. For recurring schedules that must survive restarts or be edited at runtime, use a database-backed Beat scheduler such as django-celery-beat or RedBeat.
Organizing Tasks with Queues
Plan Your Queue Strategy
As your application grows, you'll want to organize tasks into different queues based on their characteristics. This allows you to prioritize certain types of work and allocate resources appropriately.
Different tasks have different requirements:
- Email tasks: Need to be fast and reliable
- Heavy processing: Can take longer but shouldn't block quick tasks
- Real-time tasks: Must be processed immediately
- Scheduled maintenance: Can run during off-peak hours
Configure Multiple Queues
Here's how to set up multiple queues with different priorities:
# Define multiple queues
celery_app.conf.task_routes = {
# Email tasks - high priority, quick execution
'app.tasks.send_email': {'queue': 'email'},
'app.tasks.send_verification': {'queue': 'email'},
# Heavy processing - low priority, resource intensive
'app.tasks.process_large_file': {'queue': 'heavy'},
'app.tasks.generate_report': {'queue': 'heavy'},
# Real-time tasks - immediate execution
'app.tasks.update_user_status': {'queue': 'realtime'},
'app.tasks.log_user_activity': {'queue': 'realtime'},
# Maintenance tasks - run during off-peak hours
'app.tasks.cleanup_*': {'queue': 'maintenance'},
}
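Routing only names the queues; if you prefer to declare them explicitly in one place, Celery also accepts kombu Queue objects. A minimal sketch, assuming the same queue names as above:
from kombu import Queue

celery_app.conf.task_queues = (
    Queue('default'),
    Queue('email'),
    Queue('heavy'),
    Queue('realtime'),
    Queue('maintenance'),
)
# With explicit declarations you can disable automatic queue creation
celery_app.conf.task_create_missing_queues = False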
Start Queue-Specific Workers
Once you have multiple queues, you need to run workers that process them. You can start specialized workers for each queue:
# Start worker for all queues
celery -A celery_setup worker --loglevel=info
# Start worker for specific queue
celery -A celery_setup worker --loglevel=info -Q email
# Start multiple workers for different queues
celery -A celery_setup worker --loglevel=info -Q heavy --concurrency=2
celery -A celery_setup worker --loglevel=info -Q email --concurrency=4
celery -A celery_setup worker --loglevel=info -Q realtime --concurrency=8
The --concurrency option controls how many tasks can run simultaneously in that worker. Email tasks might need higher concurrency for better throughput, while heavy tasks might need lower concurrency to avoid overwhelming the system.
Common Task Patterns
Let's explore some common patterns you'll use when building background tasks for your application.
Email Processing Task
Email sending is a perfect candidate for background processing because it involves external API calls that can be slow or fail:
@celery_app.task(bind=True, max_retries=3)
def send_email_task(self, to_email: str, subject: str, template: str, context: dict):
"""
Send email asynchronously with retry logic
"""
try:
from app.email.sender import send_email
result = send_email(
to_email=to_email,
subject=subject,
template=template,
context=context
)
return {"status": "sent", "message_id": result.message_id}
except Exception as exc:
logging.error(f"Email sending failed: {exc}")
# Retry with exponential backoff
raise self.retry(exc=exc, countdown=60)
This task will automatically retry up to 3 times if email sending fails, with a 60-second delay between attempts.
Database Cleanup Task
Maintenance tasks are essential for keeping your database clean and performant:
@celery_app.task()
def cleanup_expired_tokens():
"""
Remove expired JWT tokens from database
"""
db = SessionLocal()
try:
from app.db.models import TokenBlacklist
from datetime import datetime
# Delete expired tokens
expired_count = db.query(TokenBlacklist).filter(
TokenBlacklist.expires_at < datetime.utcnow()
).delete()
db.commit()
logging.info(f"Cleaned up {expired_count} expired tokens")
return {"cleaned_tokens": expired_count}
except Exception as exc:
db.rollback()
logging.error(f"Token cleanup failed: {exc}")
raise
finally:
db.close()
This task can be scheduled to run regularly (e.g., every 4 hours) to keep your database clean.
File Processing Task
File processing is often time-consuming and should be done in the background:
@celery_app.task(bind=True)
def process_uploaded_file(self, file_path: str, user_id: int):
    """
    Process uploaded file in background
    """
    db = SessionLocal()
    try:
        # Process file (this could take several minutes)
        result = analyze_file(file_path)
        # Update user record
        user = db.query(User).filter(User.id == user_id).first()
        user.file_processed = True
        user.processing_result = result
        db.commit()
        # Send notification email
        send_email_task.delay(
            to_email=user.email,
            subject="File Processing Complete",
            template="file_processed.html",
            context={"result": result}
        )
        return {"status": "processed", "result": result}
    except Exception as exc:
        db.rollback()
        logging.error(f"File processing failed: {exc}")
        # Retry in 5 minutes
        raise self.retry(exc=exc, countdown=300)
    finally:
        db.close()
This task demonstrates task chaining - after processing the file, it triggers an email task to notify the user.
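If you want the steps fully decoupled, Celery's canvas primitives express the same pipeline declaratively. A sketch, assuming a hypothetical notify_processing_complete task that accepts the processing result as its first argument:
from celery import chain

# Each task's return value is passed as the first argument of the next task
workflow = chain(
    process_uploaded_file.s("/uploads/report.csv", 42),
    notify_processing_complete.s(),  # hypothetical follow-up task
)
workflow.apply_async()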
Running the Task System
You can skip the following steps if you are running everything with the shipped Docker Compose setup: just rebuild the Docker images and start the compose stack again so the images include the new tasks.
Start Redis Server
Redis serves as both the message broker and result backend. If you're using Docker Compose, start Redis:
docker-compose up redis
For development, you can also run Redis locally:
# On macOS with Homebrew
brew install redis
redis-server
# On Ubuntu/Debian
sudo apt install redis-server
sudo systemctl start redis
Start Celery Workers
Workers are the processes that actually execute your tasks. Start a worker in a new terminal:
celery -A celery_setup worker --loglevel=info
You should see output showing the worker connecting to Redis and listing available tasks. For production, you'll want to run multiple workers:
# Start workers for different queues
celery -A celery_setup worker --loglevel=info -Q email --concurrency=4
celery -A celery_setup worker --loglevel=info -Q heavy --concurrency=2
celery -A celery_setup worker --loglevel=info -Q default --concurrency=2
Start Celery Beat (For Scheduled Tasks)
If you have scheduled tasks, start the Beat scheduler in another terminal:
celery -A celery_setup beat --loglevel=info
Beat will read your schedule configuration and trigger tasks at the specified times.
Monitor with Flower (Optional)
Flower provides a web interface for monitoring your tasks:
pip install flower
celery -A celery_setup flower
# Then visit http://localhost:5555
Flower shows you real-time information about workers, tasks, and queues.
Integrating Tasks with Your API
Create Task Endpoints
Now that you have the task system running, let's see how to use it in your FastAPI endpoints:
from fastapi import APIRouter
from celery_setup import celery_app
from app.routers.core.tasks import sample_task, send_email_task
router = APIRouter()
@router.post("/process-data")
async def process_data_endpoint(data: dict):
"""
Trigger background task from API endpoint
"""
# Start task asynchronously
task = sample_task.delay(str(data))
return {
"message": "Task started",
"task_id": task.id,
"status": "pending"
}
@router.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
"""
Check task status and result
"""
result = celery_app.AsyncResult(task_id)
return {
"task_id": task_id,
"status": result.status,
"result": result.result if result.ready() else None
}
The .delay() method queues the task asynchronously and returns an AsyncResult with an ID. You can use this ID to check the task's status later.
Implement Task Status Tracking
Tasks can have different states:
- PENDING: Task is waiting to be processed
- STARTED: Task has been started by a worker
- SUCCESS: Task completed successfully
- FAILURE: Task failed
- RETRY: Task is being retried
- REVOKED: Task was cancelled
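A small helper makes these states easier to surface to clients; this sketch is built on AsyncResult, which exposes the state and, once the task finishes, either the result or the exception:
from celery.result import AsyncResult
from celery_setup import celery_app

def describe_task(task_id: str) -> dict:
    result = AsyncResult(task_id, app=celery_app)
    response = {"task_id": task_id, "state": result.state}
    if result.successful():
        response["result"] = result.result
    elif result.failed():
        # For failed tasks, result.result holds the raised exception
        response["error"] = str(result.result)
    return response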
Add Advanced Result Handling
For more complex scenarios, you might want to store task references in your database:
@router.post("/send-notification")
async def send_notification(user_id: int, message: str):
"""
Send notification with task result tracking
"""
# Get user email
user = get_user_by_id(user_id)
# Send email task
task = send_email_task.delay(
to_email=user.email,
subject="Notification",
template="notification.html",
context={"message": message}
)
# Store task ID for tracking
save_task_reference(user_id, task.id, "email_notification")
return {"task_id": task.id, "status": "queued"}
This approach allows you to track which tasks belong to which users and provide better user experience.
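The save_task_reference helper is not shown in the template; one way to sketch it, assuming an illustrative TaskReference model and that Base is exported from app.db.database:
from sqlalchemy import Column, DateTime, Integer, String, func
from app.db.database import Base, SessionLocal  # Base import path is an assumption

class TaskReference(Base):
    """Illustrative model linking Celery task IDs to users"""
    __tablename__ = "task_references"
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, nullable=False)
    task_id = Column(String, nullable=False)
    task_type = Column(String, nullable=False)
    created_at = Column(DateTime, server_default=func.now())

def save_task_reference(user_id: int, task_id: str, task_type: str) -> None:
    db = SessionLocal()
    try:
        db.add(TaskReference(user_id=user_id, task_id=task_id, task_type=task_type))
        db.commit()
    finally:
        db.close()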
Monitoring and Debugging
Set Up Task Monitoring
Effective monitoring is crucial for maintaining a healthy task system in production.
Flower provides a comprehensive web interface for monitoring your Celery tasks:
Use Flower for real-time monitoring of task execution, worker status, and queue metrics.
# Install Flower
pip install flower
# Start Flower monitoring
celery -A celery_setup flower --port=5555
# Access dashboard at http://localhost:5555
Flower shows you:
- Active, processed, and failed tasks
- Worker status and performance metrics
- Queue lengths and processing rates
- Task execution times and error rates
Configure Comprehensive Logging
Proper logging is essential for debugging task issues:
import logging
# Configure logging for tasks
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
@celery_app.task(bind=True)
def logged_task(self, data):
"""
Task with comprehensive logging
"""
logger = logging.getLogger(__name__)
logger.info(f"Task {self.request.id} started with data: {data}")
try:
result = process_data(data)
logger.info(f"Task {self.request.id} completed successfully")
return result
except Exception as exc:
logger.error(f"Task {self.request.id} failed: {exc}", exc_info=True)
raise
Use Debug Commands
Celery provides several commands to help debug issues:
# Check Celery configuration
celery -A celery_setup inspect stats
# List active tasks
celery -A celery_setup inspect active
# Check registered tasks
celery -A celery_setup inspect registered
# Check scheduled tasks
celery -A celery_setup inspect scheduled
# Purge all tasks from queue
celery -A celery_setup purge
# Check worker status
celery -A celery_setup status
Best Practices
Following these best practices will help you build a robust and maintainable background task system:
Follow these best practices to ensure reliable and efficient background task processing in your FastLaunchAPI application.
Task Design Principles
- Idempotency: Tasks should be safe to run multiple times with the same input (see the sketch after this list)
- Atomicity: Each task should be a single unit of work that either succeeds or fails completely
- Error Handling: Always include proper exception handling and logging
- Resource Management: Close database connections and file handles properly
- Logging: Include comprehensive logging for debugging and monitoring
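Of these, idempotency is the one most worth illustrating. A sketch, where report_exists and create_report are illustrative helpers rather than template code:
@celery_app.task()
def export_user_report(user_id: int):
    """
    Idempotent: re-delivering the same task does not create a duplicate report
    """
    db = SessionLocal()
    try:
        if report_exists(db, user_id):
            return {"status": "skipped", "reason": "report already generated"}
        create_report(db, user_id)
        db.commit()
        return {"status": "created"}
    finally:
        db.close()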
Performance Optimization
Here's an example of a well-optimized task:
@celery_app.task(
bind=True,
autoretry_for=(Exception,),
retry_kwargs={'max_retries': 3, 'countdown': 60},
soft_time_limit=300, # 5 minutes
time_limit=320, # 5 minutes 20 seconds
)
def optimized_task(self, data):
"""
Well-optimized task with proper resource management
"""
db = SessionLocal()
try:
# Process data efficiently
result = efficient_processing(data)
# Batch database operations
batch_update_records(db, result)
return {"processed": len(result), "status": "success"}
    except Exception as exc:
        db.rollback()
        raise  # autoretry_for retries with the configured countdown and backoff
finally:
db.close()
Key optimizations:
- Time limits: Prevent tasks from running indefinitely
- Batch operations: Reduce database round trips
- Proper cleanup: Always close resources in finally blocks
- Retry logic: Handle transient failures gracefully
Security Considerations
- Input validation: Always validate task inputs (see the sketch after this list)
- Authentication: Don't pass sensitive data as task arguments
- Rate limiting: Prevent abuse of resource-intensive tasks
- Monitoring: Monitor for unusual task patterns
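Input validation is the easiest of these to enforce mechanically. A sketch using Pydantic (already a FastAPI dependency); the NotificationPayload schema is illustrative:
from pydantic import BaseModel, ValidationError

class NotificationPayload(BaseModel):
    # Illustrative schema for the arguments this task accepts
    user_id: int
    email: str
    message: str

@celery_app.task()
def send_validated_notification(payload: dict):
    try:
        data = NotificationPayload(**payload)
    except ValidationError as exc:
        # Reject bad input outright instead of retrying it forever
        logging.error(f"Invalid task payload: {exc}")
        return {"status": "rejected", "errors": exc.errors()}
    # ... proceed with data.user_id, data.email, and data.message
    return {"status": "queued", "user_id": data.user_id}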
Troubleshooting Common Issues
Here are solutions to common problems you might encounter:
Worker Not Starting
Worker Not Starting: Check Redis connection and ensure the broker URL is correct in your environment variables.
Common causes:
- Redis is not running
- Incorrect REDIS_DSN in environment variables
- Firewall blocking Redis port (6379)
- Redis authentication issues
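A quick way to rule out the first two causes is to ping Redis with the same DSN the application uses; a sketch with the redis-py client:
import redis
from app.config.settings import app_settings

try:
    # Same DSN Celery uses as broker and result backend
    client = redis.Redis.from_url(str(app_settings.REDIS_DSN))
    client.ping()
    print("Redis is reachable")
except redis.ConnectionError as exc:
    print(f"Cannot reach Redis: {exc}")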
Tasks Not Executing
Tasks Not Executing: Verify that tasks are properly imported in the include list of your Celery configuration.
Check that:
- Tasks are imported in celery_setup.py
- Task names match exactly
- Workers are running and connected
- Tasks are being sent to the correct queue
Memory Issues
Memory Issues: Monitor worker memory usage and configure worker_max_tasks_per_child to prevent memory leaks.
Solutions:
- Set worker_max_tasks_per_child to restart workers periodically
- Monitor memory usage with system tools
- Profile tasks to identify memory-intensive operations
- Use connection pooling for database operations
Production Deployment
Scale Workers Appropriately
When deploying to production, scale workers based on your application's needs:
# Run multiple workers with different concurrency
celery -A celery_setup worker --loglevel=info --concurrency=4
celery -A celery_setup worker --loglevel=info --concurrency=2 -Q heavy
Set Up Monitoring and Alerting
Set up monitoring for production environments:
- Monitor Redis memory usage and connection counts
- Track task execution times and failure rates
- Set up alerts for failed tasks and worker outages
- Monitor worker health and queue lengths
Implement Security Measures
- Use Redis AUTH for authentication
- Configure proper network security (VPC, firewalls)
- Use SSL/TLS for Redis connections
- Limit task execution time to prevent DoS attacks
- Validate all task inputs thoroughly
Conclusion
Background tasks are essential for building scalable web applications. With FastLaunchAPI's Celery integration, you can:
- Process heavy operations without blocking your API
- Schedule recurring maintenance tasks
- Send emails and notifications asynchronously
- Handle file processing and data analysis
- Scale your application to handle more users
Start with simple tasks and gradually add more complex patterns as your application grows. Remember to monitor your tasks in production and follow the best practices outlined in this guide.
For more advanced use cases, consider exploring Celery's workflow features, custom task classes, and integration with other services like monitoring and alerting systems.