Background Tasks & Scheduling

Complete guide to implementing background tasks and cron jobs with Celery in FastLaunchAPI

Why Use Background Tasks?

In modern web applications, certain operations are too heavy or time-consuming to run synchronously during API requests. Background tasks solve this problem by moving these operations to a separate process, which keeps your API responsive and provides a better user experience.

The background task system is essential for:

  • Asynchronous Processing: Execute heavy computations without blocking API responses
  • Scheduled Tasks: Run maintenance jobs, reports, and recurring operations
  • Email Processing: Send emails asynchronously to improve response times
  • Data Processing: Handle large datasets and complex operations
  • External API Calls: Integrate with third-party services without timeout concerns

All background tasks in FastLaunchAPI are processed using Celery with Redis as both the message broker and result backend for optimal performance.

System Architecture

Understanding the architecture helps you work effectively with background tasks. The system consists of several key components that work together to provide reliable task processing:

🔧 Celery App

Main Celery application configuration with worker management and task routing

⏰ Celery Beat

Scheduler service for cron-like recurring tasks and periodic job execution

🔄 Redis Broker

Message broker and result backend for task queue management

📋 Task Queues

Organized task queues for different types of background operations

How It Works

The background task system follows this flow:

  1. Task Definition: You define tasks as Python functions decorated with @celery_app.task()
  2. Task Queuing: When your API needs to run a task, it sends the task to a Redis queue
  3. Worker Processing: Celery workers continuously monitor queues and execute tasks
  4. Result Storage: Task results are stored in Redis for later retrieval
  5. Scheduling: Celery Beat can automatically trigger tasks on a schedule
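The flow above can be modeled with the standard library alone. The sketch below is a toy, in-process stand-in and not how Celery is implemented: a queue.Queue plays the Redis queue, a thread plays the worker, and a dict plays the result backend. The names enqueue, worker_loop, and results are illustrative assumptions, not part of Celery or FastLaunchAPI.

```python
import queue
import threading
import uuid

task_queue = queue.Queue()   # stands in for the Redis queue
results = {}                 # stands in for the Redis result backend


def add(x, y):
    """A 'task': an ordinary function the worker will run."""
    return x + y


def enqueue(func, *args):
    """Put a task on the queue and return an ID immediately (like .delay())."""
    task_id = str(uuid.uuid4())
    task_queue.put((task_id, func, args))
    return task_id


def worker_loop():
    """Pull tasks off the queue until a None sentinel arrives."""
    while True:
        item = task_queue.get()
        if item is None:
            break
        task_id, func, args = item
        try:
            results[task_id] = {"status": "SUCCESS", "result": func(*args)}
        except Exception as exc:
            results[task_id] = {"status": "FAILURE", "result": str(exc)}


worker = threading.Thread(target=worker_loop)
worker.start()
task_id = enqueue(add, 2, 3)   # the caller gets an ID back without waiting
task_queue.put(None)           # tell the worker to stop
worker.join()
print(results[task_id])
```

The caller only ever holds the task ID; the result is looked up later, which is the same contract the API endpoints in this guide rely on.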

Setting Up Celery

Configure the Celery Application

The main Celery configuration is located in celery_setup.py. This file defines how Celery connects to Redis, which tasks to include, and how to handle scheduling:

If you add tasks in other modules, list those modules in the include argument of the Celery constructor so that workers register them.

celery_setup.py
from datetime import timedelta
from celery import Celery
from celery.schedules import crontab
from app.config.settings import app_settings

# Initialize Celery application
celery_app = Celery(
    'tasks',
    broker=app_settings.REDIS_DSN,
    backend=app_settings.REDIS_DSN,
    include=[
        "app.routers.core.tasks",
        # Add more task modules here
    ]
)

# Configure Beat scheduler for cron jobs
celery_app.conf.beat_schedule = {
    'sample_task': {
        'task': 'app.routers.core.tasks.sample_task',
        'schedule': crontab(minute="0", hour='*/4'),  # Every 4 hours
        'options': {'queue': 'default'}
    },
    # Add more scheduled tasks here
}

# Set timezone for beat scheduler
celery_app.conf.timezone = 'UTC'

# Configure task routes to specific queues
celery_app.conf.task_routes = {
    'app.routers.core.tasks.sample_task': {'queue': 'default'},
    # Route different tasks to different queues
}

# Queue configuration
celery_app.conf.task_default_queue = 'default'
celery_app.conf.task_create_missing_queues = True

Configure Advanced Settings

Celery offers many configuration options to optimize performance and reliability. Here are the most important settings organized by category:

These settings control how Celery connects to Redis and handles connection failures:

# Redis broker configuration (passed as arguments to the Celery constructor)
# broker=app_settings.REDIS_DSN
# backend=app_settings.REDIS_DSN

# Connection pool settings
celery_app.conf.broker_connection_retry_on_startup = True
celery_app.conf.broker_connection_max_retries = 10

The broker_connection_retry_on_startup setting makes workers keep retrying the connection if Redis is temporarily unavailable during startup.

Queue routing allows you to organize tasks by priority and resource requirements:

# Multiple queue configuration
celery_app.conf.task_routes = {
    'app.tasks.email_tasks.*': {'queue': 'email'},
    'app.tasks.heavy_tasks.*': {'queue': 'heavy'},
    'app.tasks.quick_tasks.*': {'queue': 'quick'},
}

This configuration automatically routes tasks to appropriate queues based on their module path.
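To build intuition for how a pattern-based route table picks a queue, here is a minimal sketch using fnmatchcase from the standard library. It illustrates glob matching in general and is not Celery's router; the resolve_queue helper and the task names in the calls are hypothetical.

```python
from fnmatch import fnmatchcase

# Same shape as celery_app.conf.task_routes in the configuration above
ROUTES = {
    "app.tasks.email_tasks.*": {"queue": "email"},
    "app.tasks.heavy_tasks.*": {"queue": "heavy"},
    "app.tasks.quick_tasks.*": {"queue": "quick"},
}


def resolve_queue(task_name, routes=ROUTES, default="default"):
    """Return the queue of the first pattern that matches the task name."""
    for pattern, route in routes.items():
        if fnmatchcase(task_name, pattern):
            return route["queue"]
    return default  # unmatched tasks fall back to the default queue


print(resolve_queue("app.tasks.email_tasks.send_welcome"))
print(resolve_queue("app.tasks.unlisted.do_work"))
```

A helper like this can be handy in a unit test that checks every task name lands on the queue you expect before you deploy a routing change.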

Worker settings control how tasks are processed and help prevent memory leaks:

# Worker configuration
celery_app.conf.worker_prefetch_multiplier = 1
celery_app.conf.worker_max_tasks_per_child = 1000
celery_app.conf.worker_disable_rate_limits = False

  • worker_prefetch_multiplier = 1 limits each worker process to reserving one task at a time, so long-running tasks do not hold back work that other workers could pick up
  • worker_max_tasks_per_child = 1000 replaces each worker process after it has run 1000 tasks, which limits the impact of memory leaks

Creating Your First Background Task

Understanding Task Structure

Background tasks in FastLaunchAPI are Python functions decorated with @celery_app.task(). Let's start with a simple example and then explore more advanced patterns.

Here's a basic task that processes input using OpenAI and returns a structured response:

app/routers/core/tasks.py
from app.db.database import SessionLocal
from celery_setup import celery_app
from langchain.prompts.chat import ChatPromptTemplate
from langchain_openai import ChatOpenAI
import os
import json
import logging

# Initialize OpenAI client
chat_model = ChatOpenAI(
    openai_api_key=os.getenv("OPENAI_API_KEY"),
    model_name="gpt-4o-mini"
)

@celery_app.task()
def sample_task(input: str):
    """
    Sample background task that processes input with OpenAI
    and returns structured JSON response
    """
    db = SessionLocal()

    try:
        # Define system template
        template = (
            "1. First Instruction: Process the user input\n"
            "2. Second Instruction: Return structured response\n"
            "3. Ensure response is valid JSON format"
        )

        human_template = "msg: {msg}"

        # Create chat prompt
        chat_prompt = ChatPromptTemplate.from_messages([
            ("system", template),
            ("human", human_template)
        ])

        # Format the prompt
        formatted_prompt = chat_prompt.format_messages(
            msg=input,
            schema=None
        )

        # Process with OpenAI
        result = chat_model.invoke(formatted_prompt)
        json_result = json.loads(result.content)

        return {"detail": json_result, "status": "success"}

    except Exception as e:
        logging.error(f"Task failed: {str(e)}")
        return {"error": str(e), "status": "failed"}

    finally:
        db.close()

Add Task Decorators and Error Handling

The @celery_app.task() decorator converts a regular Python function into a Celery task. It accepts options that control execution behavior, retry logic, and error handling:

Advanced Task Configuration
@celery_app.task(
    bind=True,
    autoretry_for=(Exception,),
    retry_kwargs={'max_retries': 3, 'countdown': 60},
    retry_backoff=True,
    retry_jitter=True
)
def robust_task(self, data):
    """
    Task with automatic retry logic and error handling
    """
    try:
        # Your task logic here
        return process_data(data)
    except Exception as exc:
        # Log the error, then re-raise so autoretry_for schedules the retry.
        # Calling self.retry() here would bypass retry_backoff and retry_jitter.
        logging.error(f"Task failed: {exc}")
        raise

Let's break down these options:

  • bind=True: Gives the task access to its own context (the self parameter)
  • autoretry_for=(Exception,): Automatically retries on any exception
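  • retry_kwargs: Sets the maximum number of retries and the countdown between attempts (when retry_backoff is enabled, the computed backoff delay replaces the countdown)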
  • retry_backoff=True: Increases delay between retries exponentially
  • retry_jitter=True: Adds randomization to retry delays to prevent thundering herd problems
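To see what exponential backoff with jitter does to retry delays, the sketch below computes them with plain Python. It is an illustration of the technique, not Celery's source code; the defaults factor=1 and maximum=600 are assumed to correspond to retry_backoff=True and Celery's default retry_backoff_max, and backoff_delay is a hypothetical helper name.

```python
import random


def backoff_delay(retries, factor=1, maximum=600, jitter=True):
    """Delay in seconds before the next attempt, given retries so far (0-based)."""
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter: pick a whole number of seconds uniformly from 0..delay
        delay = random.randint(0, delay)
    return delay


# Without jitter the delays double until they hit the cap
print([backoff_delay(n, jitter=False) for n in range(5)])
# With jitter each delay falls between 0 and the matching no-jitter delay
print([backoff_delay(n) for n in range(5)])
```

Jitter matters when many tasks fail at the same moment (for example, during a brief outage of a third-party API): without it, they would all retry at the same instants.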

Task Scheduling with Celery Beat

Understanding Cron Scheduling

Celery Beat is a scheduler that can trigger tasks automatically at specified intervals or times. This is perfect for maintenance tasks, reports, and recurring operations.

Celery Beat uses cron-like syntax for scheduling. Here are common patterns you'll use:

Common Scheduling Patterns
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    # Every 4 hours at minute 0
    'cleanup_expired_tokens': {
        'task': 'app.tasks.cleanup_expired_tokens',
        'schedule': crontab(minute=0, hour='*/4'),
        'options': {'queue': 'maintenance'}
    },

    # Daily at 2:30 AM
    'daily_backup': {
        'task': 'app.tasks.create_daily_backup',
        'schedule': crontab(hour=2, minute=30),
        'options': {'queue': 'heavy'}
    },

    # Weekly on Monday at 9 AM
    'weekly_report': {
        'task': 'app.tasks.generate_weekly_report',
        'schedule': crontab(hour=9, minute=0, day_of_week=1),
        'options': {'queue': 'reports'}
    },

    # Every 30 seconds
    'health_check': {
        'task': 'app.tasks.system_health_check',
        'schedule': 30.0,
        'options': {'queue': 'monitoring'}
    },

    # Monthly on the 1st at midnight
    'monthly_cleanup': {
        'task': 'app.tasks.monthly_maintenance',
        'schedule': crontab(minute=0, hour=0, day_of_month=1),
        'options': {'queue': 'maintenance'}
    },
}

Cron Syntax Reference

Understanding cron syntax is crucial for scheduling tasks effectively:

  • minute: 0-59
  • hour: 0-23
  • day_of_month: 1-31
  • month_of_year: 1-12
  • day_of_week: 0-6 (0 is Sunday)

You can use special characters:

  • *: Any value
  • */n: Every n units
  • n-m: Range from n to m
  • n,m: Specific values n and m
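The special characters above can be made concrete with a small expander that turns one cron field into the values it matches. This is a simplified sketch for building intuition, not Celery's crontab parser: it does no range validation, and expand_field is a hypothetical helper.

```python
def expand_field(expr, low, high):
    """Expand one cron field (e.g. '*/4', '1-5', '0,30') into a sorted list."""
    values = set()
    for part in str(expr).split(","):
        step = 1
        if "/" in part:
            part, step_text = part.split("/")
            step = int(step_text)
        if part == "*":
            start, end = low, high
        elif "-" in part:
            start_text, end_text = part.split("-")
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(part)
        values.update(range(start, end + 1, step))
    return sorted(values)


print(expand_field("*/4", 0, 23))   # hours for "every 4 hours"
print(expand_field("1-5", 0, 6))    # Monday through Friday
print(expand_field("0,30", 0, 59))  # on the hour and the half hour
```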

Implement Dynamic Scheduling

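Sometimes you need to schedule tasks programmatically based on user actions or data. Changing celery_app.conf.beat_schedule at runtime does not reach a Beat process that is already running, and a beat schedule entry expects a crontab, a timedelta, or a number of seconds rather than a datetime. For a one-time task at a specific time, enqueue the task with an eta instead:

Dynamic Task Scheduling
from datetime import datetime, timedelta

from celery_setup import celery_app

def schedule_user_reminder(user_id: int, reminder_time: datetime) -> str:
    """
    Schedule a one-time reminder task for a specific user
    """
    # Enqueue by task name; a worker holds the task until the eta is reached
    result = celery_app.send_task(
        'app.tasks.send_user_reminder',
        args=(user_id,),
        eta=reminder_time,  # Use a timezone-aware datetime (UTC)
        expires=reminder_time + timedelta(seconds=60),  # Drop if not started within 60 seconds of the eta
    )

    return result.id

This function enqueues the reminder to run once at the specified time and returns the task ID, which you can store to check the status or revoke the reminder later. With the Redis broker, an eta further away than the visibility timeout (one hour by default) can cause the task to be redelivered, so keep such reminders short-range or raise that timeout.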

Organizing Tasks with Queues

Plan Your Queue Strategy

As your application grows, you'll want to organize tasks into different queues based on their characteristics. This allows you to prioritize certain types of work and allocate resources appropriately.

Different tasks have different requirements:

  • Email tasks: Need to be fast and reliable
  • Heavy processing: Can take longer but shouldn't block quick tasks
  • Real-time tasks: Must be processed immediately
  • Scheduled maintenance: Can run during off-peak hours

Configure Multiple Queues

Here's how to route tasks to multiple queues according to their workload type:

Queue Configuration
# Define multiple queues
celery_app.conf.task_routes = {
    # Email tasks - high priority, quick execution
    'app.tasks.send_email': {'queue': 'email'},
    'app.tasks.send_verification': {'queue': 'email'},

    # Heavy processing - low priority, resource intensive
    'app.tasks.process_large_file': {'queue': 'heavy'},
    'app.tasks.generate_report': {'queue': 'heavy'},

    # Real-time tasks - immediate execution
    'app.tasks.update_user_status': {'queue': 'realtime'},
    'app.tasks.log_user_activity': {'queue': 'realtime'},

    # Maintenance tasks - run during off-peak hours
    'app.tasks.cleanup_*': {'queue': 'maintenance'},
}

Start Queue-Specific Workers

Once you have multiple queues, you need to run workers that process them. You can start specialized workers for each queue:

Worker Management Commands
# Start worker for all queues
celery -A celery_setup worker --loglevel=info

# Start worker for specific queue
celery -A celery_setup worker --loglevel=info -Q email

# Start multiple workers for different queues
celery -A celery_setup worker --loglevel=info -Q heavy --concurrency=2
celery -A celery_setup worker --loglevel=info -Q email --concurrency=4
celery -A celery_setup worker --loglevel=info -Q realtime --concurrency=8

The --concurrency option controls how many tasks can run simultaneously in that worker. Email tasks might need higher concurrency for better throughput, while heavy tasks might need lower concurrency to avoid overwhelming the system.

Common Task Patterns

Let's explore some common patterns you'll use when building background tasks for your application.

Email Processing Task

Email sending is a perfect candidate for background processing because it involves external API calls that can be slow or fail:

Email Background Task
@celery_app.task(bind=True, max_retries=3)
def send_email_task(self, to_email: str, subject: str, template: str, context: dict):
    """
    Send email asynchronously with retry logic
    """
    try:
        from app.email.sender import send_email

        result = send_email(
            to_email=to_email,
            subject=subject,
            template=template,
            context=context
        )

        return {"status": "sent", "message_id": result.message_id}

    except Exception as exc:
        logging.error(f"Email sending failed: {exc}")
        # Retry after a fixed 60-second delay
        raise self.retry(exc=exc, countdown=60)

This task will automatically retry up to 3 times if email sending fails, with a 60-second delay between attempts.

Database Cleanup Task

Maintenance tasks are essential for keeping your database clean and performant:

Database Maintenance Task
@celery_app.task()
def cleanup_expired_tokens():
    """
    Remove expired JWT tokens from database
    """
    db = SessionLocal()

    try:
        from app.db.models import TokenBlacklist
        from datetime import datetime

        # Delete expired tokens
        expired_count = db.query(TokenBlacklist).filter(
            TokenBlacklist.expires_at < datetime.utcnow()
        ).delete()

        db.commit()
        logging.info(f"Cleaned up {expired_count} expired tokens")

        return {"cleaned_tokens": expired_count}

    except Exception as exc:
        db.rollback()
        logging.error(f"Token cleanup failed: {exc}")
        raise

    finally:
        db.close()

This task can be scheduled to run regularly (e.g., every 4 hours) to keep your database clean.

File Processing Task

File processing is often time-consuming and should be done in the background:

File Processing Task
# SessionLocal, User, analyze_file, and send_email_task are assumed to be
# imported or defined at module level
@celery_app.task(bind=True)
def process_uploaded_file(self, file_path: str, user_id: int):
    """
    Process uploaded file in background
    """
    db = SessionLocal()

    try:
        # Process file (this could take several minutes)
        result = analyze_file(file_path)

        # Update user record
        user = db.query(User).filter(User.id == user_id).first()
        if user is None:
            logging.error(f"User {user_id} not found")
            return {"status": "failed", "error": "user not found"}

        user.file_processed = True
        user.processing_result = result
        user_email = user.email  # Read before commit/close detaches the object
        db.commit()

        # Send notification email
        send_email_task.delay(
            to_email=user_email,
            subject="File Processing Complete",
            template="file_processed.html",
            context={"result": result}
        )

        return {"status": "processed", "result": result}

    except Exception as exc:
        db.rollback()
        logging.error(f"File processing failed: {exc}")
        # Retry in 5 minutes
        raise self.retry(exc=exc, countdown=300)

    finally:
        # Always release the session, including on failure
        db.close()

This task demonstrates a simple form of task chaining: after processing the file, it queues an email task to notify the user.

Running the Task System

You can skip the following steps if you run everything with the shipped Docker Compose setup. In that case, rebuild the Docker images and start Compose again so that the images include your new tasks.

Start Redis Server

Redis serves as both the message broker and result backend. If you're using Docker Compose, start Redis:

docker-compose up redis

For development, you can also run Redis locally:

# On macOS with Homebrew
brew install redis
redis-server

# On Ubuntu/Debian
sudo apt install redis-server
sudo systemctl start redis

Start Celery Workers

Workers are the processes that actually execute your tasks. Start a worker in a new terminal:

celery -A celery_setup worker --loglevel=info

You should see output showing the worker connecting to Redis and listing available tasks. For production, you'll want to run multiple workers:

# Start workers for different queues
celery -A celery_setup worker --loglevel=info -Q email --concurrency=4
celery -A celery_setup worker --loglevel=info -Q heavy --concurrency=2
celery -A celery_setup worker --loglevel=info -Q default --concurrency=2

Start Celery Beat (For Scheduled Tasks)

If you have scheduled tasks, start the Beat scheduler in another terminal:

celery -A celery_setup beat --loglevel=info

Beat will read your schedule configuration and trigger tasks at the specified times.

Monitor with Flower (Optional)

Flower provides a web interface for monitoring your tasks:

pip install flower
celery -A celery_setup flower

# Then visit http://localhost:5555

Flower shows you real-time information about workers, tasks, and queues.

Integrating Tasks with Your API

Create Task Endpoints

Now that you have the task system running, let's see how to use it in your FastAPI endpoints:

API Integration
from fastapi import APIRouter
from app.routers.core.tasks import sample_task, send_email_task
from celery_setup import celery_app

router = APIRouter()

@router.post("/process-data")
async def process_data_endpoint(data: dict):
    """
    Trigger background task from API endpoint
    """
    # Start task asynchronously
    task = sample_task.delay(str(data))

    return {
        "message": "Task started",
        "task_id": task.id,
        "status": "pending"
    }

@router.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
    """
    Check task status and result
    """
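    result = celery_app.AsyncResult(task_id)

    if result.successful():
        payload = result.result
    elif result.ready():
        # Failed or revoked: result.result holds an exception, so return its text
        payload = str(result.result)
    else:
        payload = None

    return {
        "task_id": task_id,
        "status": result.status,
        "result": payload
    }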

The .delay() method starts the task asynchronously and returns a task object with an ID. You can use this ID to check the task's status later.

Implement Task Status Tracking

Tasks can have different states:

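  • PENDING: Task is waiting to be processed (also reported for task IDs that Celery does not know)
  • STARTED: Task has been started by a worker (reported only when task_track_started is enabled)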
  • SUCCESS: Task completed successfully
  • FAILURE: Task failed
  • RETRY: Task is being retried
  • REVOKED: Task was cancelled

Add Advanced Result Handling

For more complex scenarios, you might want to store task references in your database:

Task Result Management
@router.post("/send-notification")
async def send_notification(user_id: int, message: str):
    """
    Send notification with task result tracking
    """
    # Get user email
    user = get_user_by_id(user_id)

    # Send email task
    task = send_email_task.delay(
        to_email=user.email,
        subject="Notification",
        template="notification.html",
        context={"message": message}
    )

    # Store task ID for tracking
    save_task_reference(user_id, task.id, "email_notification")

    return {"task_id": task.id, "status": "queued"}

This approach lets you track which tasks belong to which users and provide a better user experience.
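The save_task_reference helper used above is not defined in this guide. A minimal sketch of what such a helper could look like follows, using an in-memory SQLite database so it runs on its own. The table name, columns, and the extra conn parameter are assumptions made for illustration; in FastLaunchAPI you would more likely use a SQLAlchemy model and SessionLocal.

```python
import sqlite3


def init_task_store(conn):
    """Create the hypothetical task_references table if it is missing."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS task_references (
            task_id   TEXT PRIMARY KEY,
            user_id   INTEGER NOT NULL,
            task_type TEXT NOT NULL
        )
        """
    )


def save_task_reference(conn, user_id, task_id, task_type):
    """Record which user a queued task belongs to."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT INTO task_references (task_id, user_id, task_type) VALUES (?, ?, ?)",
            (task_id, user_id, task_type),
        )


def get_task_ids_for_user(conn, user_id):
    """Return the task IDs recorded for one user, in insertion order."""
    rows = conn.execute(
        "SELECT task_id FROM task_references WHERE user_id = ? ORDER BY rowid",
        (user_id,),
    )
    return [row[0] for row in rows]


conn = sqlite3.connect(":memory:")
init_task_store(conn)
save_task_reference(conn, 42, "task-abc", "email_notification")
save_task_reference(conn, 42, "task-def", "email_notification")
print(get_task_ids_for_user(conn, 42))
```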

Monitoring and Debugging

Set Up Task Monitoring

Effective monitoring is crucial for maintaining a healthy task system in production.

Flower provides a web interface for real-time monitoring of task execution, worker status, and queue metrics:

# Install Flower
pip install flower

# Start Flower monitoring
celery -A celery_setup flower --port=5555

# Access dashboard at http://localhost:5555

Flower shows you:

  • Active, processed, and failed tasks
  • Worker status and performance metrics
  • Queue lengths and processing rates
  • Task execution times and error rates

Configure Comprehensive Logging

Proper logging is essential for debugging task issues:

Task Logging Setup
import logging

# Configure logging for tasks
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

@celery_app.task(bind=True)
def logged_task(self, data):
    """
    Task with comprehensive logging
    """
    logger = logging.getLogger(__name__)

    logger.info(f"Task {self.request.id} started with data: {data}")

    try:
        result = process_data(data)
        logger.info(f"Task {self.request.id} completed successfully")
        return result

    except Exception as exc:
        logger.error(f"Task {self.request.id} failed: {exc}", exc_info=True)
        raise

Use Debug Commands

Celery provides several commands to help debug issues:

# Show worker statistics (use "inspect conf" to view the configuration)
celery -A celery_setup inspect stats

# List active tasks
celery -A celery_setup inspect active

# Check registered tasks
celery -A celery_setup inspect registered

# List tasks with an ETA or countdown that workers have reserved (not the Beat schedule)
celery -A celery_setup inspect scheduled

# Purge all waiting tasks from the queues (this cannot be undone)
celery -A celery_setup purge

# Check worker status
celery -A celery_setup status

Best Practices

Following these best practices will help you build a reliable, efficient, and maintainable background task system in your FastLaunchAPI application.

Task Design Principles

  1. Idempotency: Tasks should be safe to run multiple times with the same input
  2. Atomicity: Each task should be a single unit of work that either succeeds or fails completely
  3. Error Handling: Always include proper exception handling and logging
  4. Resource Management: Close database connections and file handles properly
  5. Logging: Include comprehensive logging for debugging and monitoring
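Idempotency is the principle that most often needs a concrete picture. The sketch below shows one common approach: record an idempotency key once the work is done and skip any repeat delivery with the same key. The in-memory set and list stand in for a database table or Redis set, and apply_credit and its arguments are hypothetical.

```python
processed_keys = set()   # stands in for a database table or Redis set
ledger = []              # stands in for the side effect (e.g. a charge or an email)


def apply_credit(idempotency_key, user_id, amount):
    """Apply a credit once per key, even if the task is delivered twice."""
    if idempotency_key in processed_keys:
        return {"status": "skipped", "key": idempotency_key}
    ledger.append((user_id, amount))
    processed_keys.add(idempotency_key)
    return {"status": "applied", "key": idempotency_key}


first = apply_credit("credit-1001", user_id=7, amount=50)
second = apply_credit("credit-1001", user_id=7, amount=50)  # simulated retry
print(first["status"], second["status"], len(ledger))
```

In a real task the check and the write should happen in one transaction (or use a unique constraint) so that two workers cannot both pass the check at the same time.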

Performance Optimization

Here's an example of a well-optimized task:

Optimized Task Structure
@celery_app.task(
    bind=True,
    autoretry_for=(Exception,),
    retry_kwargs={'max_retries': 3, 'countdown': 60},
    soft_time_limit=300,  # 5 minutes
    time_limit=320,       # 5 minutes 20 seconds
)
def optimized_task(self, data):
    """
    Well-optimized task with proper resource management
    """
    db = SessionLocal()

    try:
        # Process data efficiently
        result = efficient_processing(data)

        # Batch database operations
        batch_update_records(db, result)

        return {"processed": len(result), "status": "success"}

    except Exception:
        db.rollback()
        # Re-raise so autoretry_for applies retry_kwargs (3 retries, 60 seconds apart)
        raise

    finally:
        db.close()

Key optimizations:

  • Time limits: Prevent tasks from running indefinitely
  • Batch operations: Reduce database round trips
  • Proper cleanup: Always close resources in finally blocks
  • Retry logic: Handle transient failures gracefully
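Batching usually comes down to splitting the work into fixed-size chunks and issuing one database operation per chunk. A minimal sketch of such a splitter follows; chunked is a hypothetical helper and the batch size of 3 is only for demonstration.

```python
from itertools import islice


def chunked(items, size):
    """Yield successive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


record_ids = list(range(1, 8))
for batch in chunked(record_ids, 3):
    # In a real task, each batch would become one bulk UPDATE or INSERT
    print(batch)
```

Because the helper accepts any iterable, it also works with a streaming query result, which keeps memory use bounded by the batch size.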

Security Considerations

  • Input validation: Always validate task inputs
  • Sensitive data: Don't pass secrets or credentials as task arguments, since arguments are stored in the broker and can end up in logs
  • Rate limiting: Prevent abuse of resource-intensive tasks
  • Monitoring: Monitor for unusual task patterns
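Input validation can happen before a task is ever enqueued. The sketch below validates the arguments of an email task with a frozen dataclass and returns plain keyword arguments that are safe to serialize. EmailPayload, validated_task_kwargs, and the specific limits are assumptions chosen for illustration; a project that already uses Pydantic models could apply the same idea with those.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EmailPayload:
    """Hypothetical validated arguments for an email task."""
    to_email: str
    subject: str

    def __post_init__(self):
        if "@" not in self.to_email or len(self.to_email) > 254:
            raise ValueError("to_email does not look like an email address")
        if not self.subject.strip() or len(self.subject) > 200:
            raise ValueError("subject must be non-blank and at most 200 characters")


def validated_task_kwargs(to_email, subject):
    """Validate first, then return plain kwargs that are safe to serialize."""
    payload = EmailPayload(to_email=to_email, subject=subject)
    return {"to_email": payload.to_email, "subject": payload.subject}


print(validated_task_kwargs("user@example.com", "Welcome"))
```

Rejecting bad input at the API boundary means invalid work never occupies a queue slot or triggers retries.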

Troubleshooting Common Issues

Here are solutions to common problems you might encounter:

Worker Not Starting

Check the Redis connection and ensure the broker URL is correct in your environment variables.

Common causes:

  • Redis is not running
  • Incorrect REDIS_DSN in environment variables
  • Firewall blocking Redis port (6379)
  • Redis authentication issues

Tasks Not Executing

Verify that tasks are properly imported in the include list of your Celery configuration.

Check that:

  • Task modules are listed in the include argument in celery_setup.py
  • Task names match exactly
  • Workers are running and connected
  • Tasks are being sent to the correct queue

Memory Issues

Monitor worker memory usage and configure worker_max_tasks_per_child to limit the impact of memory leaks.

Solutions:

  • Set worker_max_tasks_per_child to restart workers periodically
  • Monitor memory usage with system tools
  • Profile tasks to identify memory-intensive operations
  • Use connection pooling for database operations
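For profiling, the standard library's tracemalloc module is one low-effort starting point. The sketch below wraps a function in a decorator that reports the peak memory allocated by Python code while it runs. The decorator name report_peak_memory and the build_rows function are hypothetical, and tracemalloc only sees allocations made through Python's allocator, so treat the numbers as a rough guide.

```python
import functools
import tracemalloc


def report_peak_memory(func):
    """Decorator that prints the peak memory allocated while func runs."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        tracemalloc.start()
        try:
            return func(*args, **kwargs)
        finally:
            _, peak = tracemalloc.get_traced_memory()
            tracemalloc.stop()
            print(f"{func.__name__}: peak {peak / 1024:.1f} KiB")
    return wrapper


@report_peak_memory
def build_rows(count):
    """Hypothetical memory-heavy step: materialize many rows at once."""
    return [{"id": i, "payload": "x" * 100} for i in range(count)]


rows = build_rows(10_000)
```

Applied beneath a task decorator during development, a wrapper like this can show which steps are worth rewriting to stream data instead of holding it all in memory.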

Production Deployment

Scale Workers Appropriately

When deploying to production, scale workers based on your application's needs:

# Run multiple workers with different concurrency
celery -A celery_setup worker --loglevel=info --concurrency=4
celery -A celery_setup worker --loglevel=info --concurrency=2 -Q heavy

Set Up Monitoring and Alerting

Set up monitoring for production environments:

  • Monitor Redis memory usage and connection counts
  • Track task execution times and failure rates
  • Set up alerts for failed tasks and worker outages
  • Monitor worker health and queue lengths

Implement Security Measures

  • Use Redis AUTH for authentication
  • Configure proper network security (VPC, firewalls)
  • Use SSL/TLS for Redis connections
  • Limit task execution time to prevent DoS attacks
  • Validate all task inputs thoroughly

Conclusion

Background tasks are essential for building scalable web applications. With FastLaunchAPI's Celery integration, you can:

  • Process heavy operations without blocking your API
  • Schedule recurring maintenance tasks
  • Send emails and notifications asynchronously
  • Handle file processing and data analysis
  • Scale your application to handle more users

Start with simple tasks and gradually add more complex patterns as your application grows. Remember to monitor your tasks in production and follow the best practices outlined in this guide.

For more advanced use cases, consider exploring Celery's workflow features, custom task classes, and integration with other services like monitoring and alerting systems.