
AI Integration

Transform your FastAPI application with AI capabilities using smart integration patterns and best practices.

Overview

Quick Setup

Since you already have background task processing set up, adding AI is straightforward:

The standard packages are already included in your requirements.txt:

openai>=1.0.0
langchain>=0.1.0
langchain-openai>=0.1.0
tiktoken>=0.5.0
# Optional: Other providers
anthropic>=0.7.0
google-generativeai>=0.3.0
cohere>=4.0.0

Add your AI provider keys to your environment:

# OpenAI (Most Popular)
OPENAI_API_KEY=your_key_here
OPENAI_MODEL=gpt-4o-mini

# Anthropic Claude
ANTHROPIC_API_KEY=your_key_here

# Google Gemini
GOOGLE_API_KEY=your_key_here
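
One way to read these variables at startup is to load them once and fail fast when the required key is missing. The sketch below is illustrative only: `AISettings` and `load_ai_settings` are hypothetical names, not part of the template, and it assumes the OpenAI key is the only mandatory one.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class AISettings:
    """Hypothetical container for the environment variables listed above."""
    openai_api_key: str
    openai_model: str
    anthropic_api_key: Optional[str] = None
    google_api_key: Optional[str] = None


def load_ai_settings(env: Mapping[str, str] = os.environ) -> AISettings:
    """Read AI provider settings, failing fast if the OpenAI key is missing."""
    api_key = env.get("OPENAI_API_KEY", "").strip()
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY is not set")
    return AISettings(
        openai_api_key=api_key,
        # Fall back to the model name used throughout this guide
        openai_model=env.get("OPENAI_MODEL", "gpt-4o-mini"),
        # Optional providers: treat empty strings as "not configured"
        anthropic_api_key=env.get("ANTHROPIC_API_KEY") or None,
        google_api_key=env.get("GOOGLE_API_KEY") or None,
    )
```

Passing a plain dictionary as `env` makes the loader easy to exercise in tests without touching the real environment.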

Start with a simple AI task using your existing background processing:

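import os

from celery_setup import celery_app
from database import SessionLocal
from models import AIAnalysis  # assumed module path; the model is defined under "Database Models for AI Integration"
from openai import OpenAI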

@celery_app.task()
def analyze_text(text: str, analysis_type: str = "sentiment", user_id: int = None):
    """Analyze text using AI - runs in background"""
    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    prompts = {
        "sentiment": f"Analyze the sentiment of this text: {text}",
        "summary": f"Summarize this text in 2-3 sentences: {text}",
        "keywords": f"Extract 5 key topics from this text: {text}"
    }

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompts[analysis_type]}],
        temperature=0.3,
        max_tokens=500
    )

    # Save result to database
    db = SessionLocal()
    try:
        # Save AI analysis result
        ai_result = AIAnalysis(
            user_id=user_id,
            input_text=text,
            analysis_type=analysis_type,
            result=response.choices[0].message.content,
            model_used="gpt-4o-mini"
        )
        db.add(ai_result)
        db.commit()

        return {
            "success": True,
            "result": response.choices[0].message.content,
            "analysis_id": ai_result.id
        }
    finally:
        db.close()
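
In `analyze_text` above, an unsupported `analysis_type` surfaces as a bare `KeyError` from the `prompts` lookup, after the task has already been queued. A minimal sketch of validating the input up front follows; `PROMPT_TEMPLATES` and `build_analysis_prompt` are hypothetical helpers that reuse the same three prompts.

```python
PROMPT_TEMPLATES = {
    "sentiment": "Analyze the sentiment of this text: {text}",
    "summary": "Summarize this text in 2-3 sentences: {text}",
    "keywords": "Extract 5 key topics from this text: {text}",
}


def build_analysis_prompt(text: str, analysis_type: str = "sentiment") -> str:
    """Return the prompt for analysis_type, or raise ValueError if unsupported."""
    template = PROMPT_TEMPLATES.get(analysis_type)
    if template is None:
        supported = ", ".join(sorted(PROMPT_TEMPLATES))
        raise ValueError(
            f"Unsupported analysis_type {analysis_type!r}; expected one of: {supported}"
        )
    if not text.strip():
        raise ValueError("text must not be empty")
    # str.replace (rather than str.format) leaves braces in user text untouched
    return template.replace("{text}", text)
```

Calling this in the API layer before enqueueing the task would let you return a clear validation error to the client instead of a failed background job.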

AI Model Comparison

GPT-4o

Best for: Complex reasoning, coding, analysis
Cost: High
Speed: Medium
Strengths: Most capable, excellent at complex tasks

GPT-4o-mini

Best for: Simple tasks, high volume processing
Cost: Low
Speed: Fast
Strengths: Cost-effective, good for basic tasks

Claude-3.5-Sonnet

Best for: Text analysis, research, safety
Cost: Medium
Speed: Medium
Strengths: Excellent analysis, very safe responses

Claude-3-Haiku

Best for: Quick tasks, simple responses
Cost: Low
Speed: Very Fast
Strengths: Ultra-fast, efficient for simple tasks

Gemini Pro

Best for: Multimodal (image + text)
Cost: Medium
Speed: Fast
Strengths: Great for visual tasks, competitive pricing

Cohere Command-R+

Best for: RAG, search, knowledge retrieval
Cost: Medium
Speed: Fast
Strengths: Optimized for retrieval tasks

Core Integration Patterns

Streaming AI Response

Use streaming for chat interfaces, where users expect to see the response as it is generated:

import os

from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI

router = APIRouter(prefix="/ai")

@router.post("/chat/stream")
async def stream_chat(message: str):
    """Stream AI response in real-time"""

    async def generate():
        # Async client so the event loop is not blocked while waiting on chunks
        client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

        stream = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": message}],
            stream=True,
            temperature=0.7
        )

        async for chunk in stream:
            # Some chunks carry no choices or an empty delta; skip them
            if chunk.choices and chunk.choices[0].delta.content:
                yield f"data: {chunk.choices[0].delta.content}\n\n"

    # The "data: ...\n\n" framing is Server-Sent Events, so use the SSE media type
    return StreamingResponse(generate(), media_type="text/event-stream")
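
The endpoint above frames each chunk as `data: ...` followed by a blank line, which is the Server-Sent Events wire format. One caveat is that a chunk containing a newline needs a `data:` prefix on every line, otherwise clients may misparse it. The helper below is a small sketch of that framing; `format_sse` is a hypothetical name, not a FastAPI or OpenAI function.

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Encode one Server-Sent Events message."""
    lines = []
    if event:
        lines.append(f"event: {event}")
    # Each line of the payload gets its own "data:" field
    for line in data.split("\n"):
        lines.append(f"data: {line}")
    # A blank line terminates the message
    return "\n".join(lines) + "\n\n"
```

Inside the generator you could then write `yield format_sse(content)` in place of the hand-built f-string.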

Prompt Engineering Best Practices

Create consistent, reusable prompt templates:

class PromptBuilder:
    @staticmethod
    def create_system_prompt(role: str, context: str = ""):
        """Create consistent system prompts"""
        base = f"You are a {role}."
        if context:
            base += f" Context: {context}"
        return base

    @staticmethod
    def create_task_prompt(task: str, input_data: str, format_instructions: str = ""):
        """Create structured task prompts"""
        prompt = f"Task: {task}\n\nInput: {input_data}"
        if format_instructions:
            prompt += f"\n\nFormat your response as: {format_instructions}"
        return prompt

Improve accuracy with concrete examples:

def create_classification_prompt(text: str, categories: list):
    """Use examples to improve classification accuracy"""
    examples = """
    Examples:
    Text: "I love this product!" → Category: positive
    Text: "This is terrible" → Category: negative
    Text: "It's okay, nothing special" → Category: neutral
    """

    return f"""
    {examples}

    Classify this text into one of these categories: {', '.join(categories)}
    Text: "{text}"
    Category:
    """

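The examples in `create_classification_prompt` are hard-coded sentiment labels, so they only help when `categories` happens to be positive, negative, and neutral. A possible variation, sketched here under the assumption that you keep labelled examples per use case, takes the examples as a parameter. `build_few_shot_prompt` is a hypothetical helper.

```python
from typing import Sequence, Tuple


def build_few_shot_prompt(
    text: str,
    categories: Sequence[str],
    examples: Sequence[Tuple[str, str]],
) -> str:
    """Build a classification prompt from (text, category) example pairs."""
    # Reject examples whose label is not one of the allowed categories
    unknown = [label for _, label in examples if label not in categories]
    if unknown:
        raise ValueError(f"Example labels not in categories: {unknown}")

    lines = ["Examples:"]
    for example_text, label in examples:
        lines.append(f'Text: "{example_text}" -> Category: {label}')
    lines.append("")
    lines.append(
        "Classify this text into one of these categories: " + ", ".join(categories)
    )
    lines.append(f'Text: "{text}"')
    lines.append("Category:")
    return "\n".join(lines)
```

Building the prompt line by line also avoids the leading indentation that triple-quoted strings inside a function carry into the prompt.
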
Encourage step-by-step reasoning for complex tasks:

def create_reasoning_prompt(problem: str):
    """Encourage step-by-step thinking"""
    return f"""
    Solve this problem step by step:
    {problem}

    Think through this by:
    1. Understanding what's being asked
    2. Identifying key information
    3. Working through the solution
    4. Providing your final answer

    Let's work through this step by step:
    """

Advanced AI Integrations

Retrieval-augmented generation (RAG) lets the model answer questions using your own documents and data:

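# Current LangChain import paths; Chroma needs langchain-community and chromadb installed
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from openai import OpenAI

from celery_setup import celery_app
from database import SessionLocal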

@celery_app.task()
def answer_with_context(question: str, document_ids: list, user_id: int):
    """Answer questions using your own documents"""

    db = SessionLocal()
    try:
        # Retrieve relevant documents from database
        documents = db.query(Document).filter(
            Document.id.in_(document_ids),
            Document.user_id == user_id
        ).all()

        # Create embeddings and search
        embeddings = OpenAIEmbeddings()
        vectorstore = Chroma(embedding_function=embeddings)

        # Add documents to vector store
        texts = [doc.content for doc in documents]
        vectorstore.add_texts(texts)

        # Get relevant context
        docs = vectorstore.similarity_search(question, k=3)
        context = "\n".join([doc.page_content for doc in docs])

        # Generate answer with context
        client = OpenAI()
        prompt = f"""
        Answer this question based on the provided context.
        If the context doesn't contain the answer, say so.

        Context: {context}
        Question: {question}
        Answer:
        """

        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )

        # Save Q&A to database
        qa_record = QASession(
            user_id=user_id,
            question=question,
            answer=response.choices[0].message.content,
            context_docs=document_ids,
            model_used="gpt-4o-mini"
        )
        db.add(qa_record)
        db.commit()

        return {
            "answer": response.choices[0].message.content,
            "session_id": qa_record.id
        }
    finally:
        db.close()
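
To make the retrieval step less of a black box, here is a toy version of what a similarity search does: score every stored vector against the query vector and keep the top `k`. This is a sketch with hand-written vectors, not Chroma's implementation. Real embeddings come from the embedding model, and the store's default distance metric may differ from cosine similarity.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k_similar(
    query: Sequence[float],
    doc_vectors: Sequence[Sequence[float]],
    k: int = 3,
) -> List[Tuple[int, float]]:
    """Return (index, score) pairs for the k vectors most similar to query."""
    scored = [(i, cosine_similarity(query, v)) for i, v in enumerate(doc_vectors)]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]
```

The returned indices map back to the original texts, which is how the context string in the task above gets assembled.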

Process images alongside text for richer AI interactions:

import base64

@celery_app.task()
def analyze_image_and_text(image_path: str, text_prompt: str):
    """Analyze images with text context"""

    # Encode image
    with open(image_path, "rb") as image_file:
        base64_image = base64.b64encode(image_file.read()).decode('utf-8')

    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4o",  # Supports vision
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": text_prompt},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image}"
                        }
                    }
                ]
            }
        ],
        max_tokens=1000
    )

    return response.choices[0].message.content
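
The task above labels every upload as `image/jpeg`, which is inaccurate for PNG or WebP files. A small sketch of deriving the MIME type from the file name with the standard library follows; `build_image_data_url` is a hypothetical helper, and it assumes the file extension is trustworthy.

```python
import base64
import mimetypes


def build_image_data_url(image_bytes: bytes, filename: str) -> str:
    """Build a base64 data URL whose MIME type matches the file extension."""
    mime_type, _ = mimetypes.guess_type(filename)
    if mime_type is None or not mime_type.startswith("image/"):
        raise ValueError(f"Cannot determine an image type for {filename!r}")
    encoded = base64.b64encode(image_bytes).decode("ascii")
    return f"data:{mime_type};base64,{encoded}"
```

The result can be passed as the `url` value of the `image_url` content part.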

Smart AI Routing

Automatically choose a suitable model for each task to balance cost and performance:

class AIRouter:
    """Route tasks to the best AI model"""

    MODEL_ROUTING = {
        "simple": ("gpt-4o-mini", 0.3),
        "complex": ("gpt-4o", 0.7),
        "creative": ("gpt-4o", 0.9),
        # Same Claude model ID used in the Anthropic example in this guide
        "analysis": ("claude-3-5-sonnet-20241022", 0.1)
    }

    @classmethod
    def get_model_for_task(cls, task_type: str, text_length: int = 0):
        """Smart model selection based on task and complexity"""

        # Route based on text length
        if text_length > 50000:
            return ("claude-3-5-sonnet-20241022", 0.3)  # Better for long texts

        # Route based on task type
        return cls.MODEL_ROUTING.get(task_type, ("gpt-4o-mini", 0.5))

Database Models for AI Integration

These models help you track AI usage, cache responses, and store analysis results:

from sqlalchemy import Column, Integer, String, Text, DateTime, JSON, Float
from sqlalchemy.orm import declarative_base  # import location since SQLAlchemy 1.4
from datetime import datetime

Base = declarative_base()

class AIAnalysis(Base):
    __tablename__ = "ai_analysis"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, nullable=False)
    input_text = Column(Text, nullable=False)
    analysis_type = Column(String(50), nullable=False)
    result = Column(Text, nullable=False)
    model_used = Column(String(50), nullable=False)
    tokens_used = Column(Integer, default=0)
    cost = Column(Float, default=0.0)
    created_at = Column(DateTime, default=datetime.utcnow)

class QASession(Base):
    __tablename__ = "qa_sessions"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, nullable=False)
    question = Column(Text, nullable=False)
    answer = Column(Text, nullable=False)
    context_docs = Column(JSON)  # Store document IDs used
    model_used = Column(String(50), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

class AICache(Base):
    __tablename__ = "ai_cache"
    
    id = Column(Integer, primary_key=True)
    cache_key = Column(String(255), unique=True, nullable=False)
    result = Column(Text, nullable=False)
    expires_at = Column(DateTime, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

class Document(Base):
    __tablename__ = "documents"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, nullable=False)
    title = Column(String(255), nullable=False)
    content = Column(Text, nullable=False)
    embedding = Column(JSON)  # Store vector embeddings
    created_at = Column(DateTime, default=datetime.utcnow)

Cost Optimization

Monitor and optimize token usage:

import tiktoken

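def estimate_tokens(text: str, model: str = "gpt-4o-mini") -> int:
    """Estimate token count before API call"""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # Older tiktoken releases may not know newer model names;
        # fall back to a general-purpose encoding as an approximation
        encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(text))

def optimize_prompt(prompt: str, max_tokens: int = 4000) -> str:
    """Truncate prompt if too long"""
    token_count = estimate_tokens(prompt)
    if token_count <= max_tokens:
        return prompt

    # Truncate from the middle, keeping the beginning and end.
    # Word count only approximates token count, so the result is a best effort.
    words = prompt.split()
    target_words = len(words) * max_tokens // token_count
    # At least one word per side; a slice of words[-0:] would return the whole list
    half = max(1, target_words // 2)

    start_words = words[:half]
    end_words = words[-half:]

    return " ".join(start_words) + "\n...[truncated]...\n" + " ".join(end_words)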
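
Token counts can also feed the `tokens_used` and `cost` columns on `AIAnalysis`. The sketch below turns token counts into a cost figure. `estimate_cost` is a hypothetical helper, and the prices are parameters on purpose: provider pricing changes, so look up current rates rather than relying on any number shown here. With the OpenAI client, the actual counts for a completed call are available on `response.usage` (`prompt_tokens` and `completion_tokens`).

```python
def estimate_cost(
    input_tokens: int,
    output_tokens: int,
    input_price_per_1k: float,
    output_price_per_1k: float,
) -> float:
    """Estimate the cost of one call from token counts and per-1K-token prices."""
    if input_tokens < 0 or output_tokens < 0:
        raise ValueError("token counts must be non-negative")
    cost = (
        input_tokens / 1000 * input_price_per_1k
        + output_tokens / 1000 * output_price_per_1k
    )
    # Round to avoid long floating-point tails when storing the value
    return round(cost, 6)
```

For example, `estimate_cost(1000, 500, 0.5, 1.5)` uses placeholder prices (not real rates) and evaluates to `1.25` in whatever currency the prices are quoted in.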

Cache responses in your database to avoid duplicate API calls:

import hashlib
import json
from datetime import datetime, timedelta
from functools import wraps
from database import SessionLocal

def cache_ai_response(expiry_hours: int = 24):
    """Cache AI responses to reduce API calls"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Create cache key
            cache_key = hashlib.md5(
                json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True).encode()
            ).hexdigest()

            db = SessionLocal()
            try:
                # Check cache in database
                cached_result = db.query(AICache).filter(
                    AICache.cache_key == cache_key,
                    AICache.expires_at > datetime.utcnow()
                ).first()

                if cached_result:
                    return json.loads(cached_result.result)

                # Execute function
                result = await func(*args, **kwargs)

                # Cache result in database
                cache_record = AICache(
                    cache_key=cache_key,
                    result=json.dumps(result),
                    expires_at=datetime.utcnow() + timedelta(hours=expiry_hours)
                )
                db.add(cache_record)
                db.commit()

                return result
            finally:
                db.close()
        return wrapper
    return decorator
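
The decorator above hashes only `args` and `kwargs`, so two different functions called with the same arguments would share a cache entry, and `json.dumps` raises on values it cannot encode. One possible key function that addresses both points is sketched below; `make_cache_key` is a hypothetical helper, and the switch from MD5 to SHA-256 is a design choice rather than a requirement.

```python
import hashlib
import json
from typing import Any, Dict, Tuple


def make_cache_key(func_name: str, args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> str:
    """Build a deterministic cache key from a function name and its arguments."""
    payload = json.dumps(
        {"func": func_name, "args": list(args), "kwargs": kwargs},
        sort_keys=True,  # keyword order must not change the key
        default=str,     # fall back to str() for values json cannot encode
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

The 64-character hex digest fits within the `String(255)` `cache_key` column on `AICache`.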

Error Handling & Monitoring

Proper error handling is crucial for production AI applications:

import logging
from functools import wraps

logger = logging.getLogger(__name__)

def ai_error_handler(func):
    """Decorator for AI task error handling"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            logger.error(f"AI task {func.__name__} failed: {str(e)}")

            # Map known failure modes to structured errors the caller can act on
            if "rate limit" in str(e).lower():
                # Caller should retry after a delay, or switch to another model
                return {"error": "rate_limit", "retry_after": 60}
            elif "token limit" in str(e).lower():
                # Caller should truncate the input and retry
                return {"error": "token_limit", "suggestion": "reduce_input"}

            return {"error": "general", "message": str(e)}
    return wrapper
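
The handler above reports rate-limit failures back to the caller rather than retrying. If you want the retry to happen automatically, one common approach is exponential backoff. The decorator below is a sketch under stated assumptions: `RateLimitedError` is a stand-in exception defined here for illustration (with the OpenAI client you would likely pass `openai.RateLimitError` via `retry_on`), and `retry_with_backoff` is a hypothetical name.

```python
import time
from functools import wraps


class RateLimitedError(Exception):
    """Hypothetical stand-in for a provider's rate-limit exception."""


def retry_with_backoff(max_attempts=3, base_delay=1.0,
                       retry_on=(RateLimitedError,), sleep=time.sleep):
    """Retry the wrapped function, doubling the delay after each failure."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except retry_on:
                    if attempt == max_attempts:
                        raise  # out of attempts: let the caller handle it
                    # Delays grow as base_delay, 2 * base_delay, 4 * base_delay, ...
                    sleep(base_delay * 2 ** (attempt - 1))
        return wrapper
    return decorator
```

The `sleep` parameter exists so tests can record delays instead of waiting. Celery also offers its own task retry options, which may be a better fit for background tasks.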

Alternative AI Providers

from anthropic import Anthropic

@celery_app.task()
def analyze_with_claude(text: str):
    """Use Claude for analysis tasks"""
    client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1000,
        messages=[{
            "role": "user",
            "content": f"Analyze this text for key insights: {text}"
        }]
    )

    return response.content[0].text

import google.generativeai as genai

@celery_app.task()
def process_with_gemini(prompt: str):
    """Use Gemini for fast processing"""
    genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))
    model = genai.GenerativeModel('gemini-pro')

    response = model.generate_content(prompt)
    return response.text
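
To switch between providers without touching call sites, one option is a small registry that maps a provider name to a function taking a prompt and returning text. This is a sketch only: `register_provider`, `generate`, and the `echo` provider are hypothetical, and in practice the registered functions would wrap calls like the Claude and Gemini tasks above.

```python
from typing import Callable, Dict

ProviderFn = Callable[[str], str]

# Maps a provider name to the function that handles prompts for it
_PROVIDERS: Dict[str, ProviderFn] = {}


def register_provider(name: str) -> Callable[[ProviderFn], ProviderFn]:
    """Decorator that registers a prompt-handling function under a name."""
    def decorator(func: ProviderFn) -> ProviderFn:
        _PROVIDERS[name] = func
        return func
    return decorator


def generate(prompt: str, provider: str) -> str:
    """Dispatch the prompt to the named provider."""
    try:
        handler = _PROVIDERS[provider]
    except KeyError:
        known = ", ".join(sorted(_PROVIDERS)) or "none"
        raise ValueError(f"Unknown provider {provider!r}; registered: {known}") from None
    return handler(prompt)


@register_provider("echo")
def _echo_provider(prompt: str) -> str:
    """Fake provider for local testing; returns the prompt unchanged."""
    return f"echo: {prompt}"
```

A fake provider like `echo` is handy in tests, since it exercises the dispatch path without network calls or API keys.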

Integration Checklist

Setup

  • Choose AI provider based on your needs
  • Set up API keys and environment variables
  • Create database models for AI storage

Implementation

  • Create background tasks for AI processing
  • Implement proper error handling
  • Add response caching for cost optimization
  • Set up token counting and management

Optimization

  • Create structured prompt templates
  • Add rate limiting and monitoring
  • Test with different model configurations
  • Monitor costs and usage patterns
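
For the rate-limiting item, a sliding-window limiter is one simple starting point. The sketch below keeps state in memory, so it only limits calls within a single process. With several Celery workers you would need a shared store instead, which is outside the scope of this example. `SlidingWindowRateLimiter` is a hypothetical class.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowRateLimiter:
    """Allow at most max_calls within any window_seconds interval."""

    def __init__(self, max_calls: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        self._clock = clock
        self._calls: Deque[float] = deque()  # timestamps of accepted calls

    def allow(self) -> bool:
        """Return True and record the call if it fits within the limit."""
        now = self._clock()
        # Drop timestamps that have fallen out of the window
        while self._calls and now - self._calls[0] >= self.window_seconds:
            self._calls.popleft()
        if len(self._calls) < self.max_calls:
            self._calls.append(now)
            return True
        return False
```

Check `allow()` before each provider call and skip, queue, or delay the request when it returns `False`.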

Key Benefits

🔧 Flexible

Easy to switch between AI providers and models based on your needs

💰 Cost-effective

Smart routing and caching optimize costs automatically

📈 Scalable

Background processing handles high loads without blocking your app

🛡️ Reliable

Proper error handling and fallbacks ensure stability

This guide provides practical patterns for adding AI to your existing FastAPI setup without overwhelming boilerplate code. Start with simple text analysis and gradually add more advanced features like RAG and multi-modal processing.