AI Integration
Transform your FastAPI application with AI capabilities using smart integration patterns and best practices.
Overview
🚀 Quick Integration
Add AI to your existing FastAPI app in minutes with background task processing
🧠 Smart Routing
Choose the right AI model for each task automatically
💾 Database Integration
Store AI results, cache responses, and track costs
💡 Prompt Engineering
Create effective prompts that get better results
Quick Setup
Since you already have background task processing set up, adding AI is straightforward:
The standard packages are already included in your requirements.txt:
openai>=1.0.0
langchain>=0.1.0
langchain-openai>=0.1.0
tiktoken>=0.5.0
# Optional: Other providers
anthropic>=0.7.0
google-generativeai>=0.3.0
cohere>=4.0.0
Add your AI provider keys to your environment:
# OpenAI (Most Popular)
OPENAI_API_KEY=your_key_here
OPENAI_MODEL=gpt-4o-mini
# Anthropic Claude
ANTHROPIC_API_KEY=your_key_here
# Google Gemini
GOOGLE_API_KEY=your_key_here
Start with a simple AI task using your existing background processing:
from celery_setup import celery_app
from openai import OpenAI
from database import SessionLocal
from models import AIAnalysis  # assumes AIAnalysis lives in your models module (see "Database Models for AI Integration" below)
import os

@celery_app.task()
def analyze_text(text: str, analysis_type: str = "sentiment", user_id: int | None = None):
    """Analyze text using AI - runs in background"""
    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    prompts = {
        "sentiment": f"Analyze the sentiment of this text: {text}",
        "summary": f"Summarize this text in 2-3 sentences: {text}",
        "keywords": f"Extract 5 key topics from this text: {text}",
    }

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompts[analysis_type]}],
        temperature=0.3,
        max_tokens=500
    )

    # Save the AI analysis result to the database
    db = SessionLocal()
    try:
        ai_result = AIAnalysis(
            user_id=user_id,
            input_text=text,
            analysis_type=analysis_type,
            result=response.choices[0].message.content,
            model_used="gpt-4o-mini"
        )
        db.add(ai_result)
        db.commit()

        return {
            "success": True,
            "result": response.choices[0].message.content,
            "analysis_id": ai_result.id
        }
    finally:
        db.close()
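To trigger this from your API, a minimal dispatch-and-poll sketch could look like the following (the ai_tasks module name and the routes are illustrative, not part of the setup above):

# Sketch: queue the task from an endpoint and poll for the result.
from fastapi import APIRouter
from celery.result import AsyncResult

from ai_tasks import analyze_text  # the task defined above (module name is an assumption)
from celery_setup import celery_app

router = APIRouter(prefix="/ai")

@router.post("/analyze")
async def queue_analysis(text: str, analysis_type: str = "sentiment", user_id: int | None = None):
    """Queue the analysis in the background and return a task id immediately."""
    task = analyze_text.delay(text, analysis_type, user_id)
    return {"task_id": task.id, "status": "queued"}

@router.get("/analyze/{task_id}")
async def get_analysis(task_id: str):
    """Poll the task status and return the result when ready."""
    result = AsyncResult(task_id, app=celery_app)
    if result.ready():
        return {"status": "done", "result": result.result}
    return {"status": result.state.lower()}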
AI Model Comparison
GPT-4o
Best for: Complex reasoning, coding, analysis
Cost: High
Speed: Medium
Strengths: Most capable, excellent at complex tasks
GPT-4o-mini
Best for: Simple tasks, high volume processing
Cost: Low
Speed: Fast
Strengths: Cost-effective, good for basic tasks
Claude-3.5-Sonnet
Best for: Text analysis, research, safety
Cost: Medium
Speed: Medium
Strengths: Excellent analysis, very safe responses
Claude-3-Haiku
Best for: Quick tasks, simple responses
Cost: Low
Speed: Very Fast
Strengths: Ultra-fast, efficient for simple tasks
Gemini Pro
Best for: Multimodal (image + text)
Cost: Medium
Speed: Fast
Strengths: Great for visual tasks, competitive pricing
Cohere Command-R+
Best for: RAG, search, knowledge retrieval
Cost: Medium
Speed: Fast
Strengths: Optimized for retrieval tasks
Core Integration Patterns
Streaming AI Response
Perfect for chat interfaces where you want real-time response streaming
import os

from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI

router = APIRouter(prefix="/ai")

@router.post("/chat/stream")
async def stream_chat(message: str):
    """Stream AI response in real-time"""
    async def generate():
        # Use the async client so the event loop isn't blocked while streaming
        client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        stream = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": message}],
            stream=True,
            temperature=0.7
        )
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                yield f"data: {chunk.choices[0].delta.content}\n\n"

    # The "data: ...\n\n" framing is Server-Sent Events, so use the SSE media type
    return StreamingResponse(generate(), media_type="text/event-stream")
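To check the stream from the command line, a quick client-side sketch with httpx (the URL and query parameter are assumptions about how you run the app):

# Illustrative client check - prints tokens as they arrive.
import httpx

with httpx.stream("POST", "http://localhost:8000/ai/chat/stream",
                  params={"message": "Explain FastAPI in one sentence"},
                  timeout=None) as response:
    for line in response.iter_lines():
        if line.startswith("data: "):
            print(line[len("data: "):], end="", flush=True)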
Prompt Engineering Best Practices
Create consistent, reusable prompt templates:
class PromptBuilder:
    @staticmethod
    def create_system_prompt(role: str, context: str = ""):
        """Create consistent system prompts"""
        base = f"You are a {role}."
        if context:
            base += f" Context: {context}"
        return base

    @staticmethod
    def create_task_prompt(task: str, input_data: str, format_instructions: str = ""):
        """Create structured task prompts"""
        prompt = f"Task: {task}\n\nInput: {input_data}"
        if format_instructions:
            prompt += f"\n\nFormat your response as: {format_instructions}"
        return prompt
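Used together, the two helpers assemble a complete chat request; a short sketch (the role, task, and input strings are purely illustrative):

# Example usage of PromptBuilder - the strings below are illustrative.
import os
from openai import OpenAI

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

messages = [
    {"role": "system", "content": PromptBuilder.create_system_prompt(
        role="technical support agent",
        context="You answer questions about our FastAPI application."
    )},
    {"role": "user", "content": PromptBuilder.create_task_prompt(
        task="Summarize the customer's issue",
        input_data="My background jobs stopped running after the last deploy.",
        format_instructions="a single sentence"
    )},
]

response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=messages,
    temperature=0.3,
)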
Improve accuracy with concrete examples:
def create_classification_prompt(text: str, categories: list):
    """Use examples to improve classification accuracy"""
    examples = """
Examples:
Text: "I love this product!" → Category: positive
Text: "This is terrible" → Category: negative
Text: "It's okay, nothing special" → Category: neutral
"""
    return f"""
{examples}
Classify this text into one of these categories: {', '.join(categories)}

Text: "{text}"
Category:
"""
Encourage step-by-step reasoning for complex tasks:
def create_reasoning_prompt(problem: str):
    """Encourage step-by-step thinking"""
    return f"""
Solve this problem step by step:

{problem}

Think through this by:
1. Understanding what's being asked
2. Identifying key information
3. Working through the solution
4. Providing your final answer

Let's work through this step by step:
"""
Advanced AI Integrations
Retrieval-Augmented Generation (RAG) lets your AI answer questions using your own documents and data
import os

from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma  # requires the chromadb package
from openai import OpenAI

from celery_setup import celery_app
from database import SessionLocal
from models import Document, QASession  # assumes these live in your models module (see "Database Models" below)

@celery_app.task()
def answer_with_context(question: str, document_ids: list, user_id: int):
    """Answer questions using your own documents"""
    db = SessionLocal()
    try:
        # Retrieve relevant documents from the database
        documents = db.query(Document).filter(
            Document.id.in_(document_ids),
            Document.user_id == user_id
        ).all()

        # Create embeddings and a vector store for similarity search
        embeddings = OpenAIEmbeddings()
        vectorstore = Chroma(embedding_function=embeddings)

        # Add documents to the vector store
        texts = [doc.content for doc in documents]
        vectorstore.add_texts(texts)

        # Get the most relevant context
        docs = vectorstore.similarity_search(question, k=3)
        context = "\n".join([doc.page_content for doc in docs])

        # Generate an answer grounded in the context
        client = OpenAI()
        prompt = f"""
Answer this question based on the provided context.
If the context doesn't contain the answer, say so.

Context: {context}

Question: {question}

Answer:
"""
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )

        # Save the Q&A session to the database
        qa_record = QASession(
            user_id=user_id,
            question=question,
            answer=response.choices[0].message.content,
            context_docs=document_ids,
            model_used="gpt-4o-mini"
        )
        db.add(qa_record)
        db.commit()

        return {
            "answer": response.choices[0].message.content,
            "session_id": qa_record.id
        }
    finally:
        db.close()
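Re-embedding every document on each call gets expensive. One option, sketched below, is to point Chroma at a persistent directory so embeddings are computed once at upload time (the directory, collection name, and metadata keys are assumptions, not project conventions):

# Sketch: persistent vector store so documents aren't re-embedded on every request.
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

embeddings = OpenAIEmbeddings()
vectorstore = Chroma(
    collection_name="user_documents",
    persist_directory="./chroma_db",   # kept on disk between task runs
    embedding_function=embeddings,
)

# Index a document once (e.g. when it is uploaded), tagged with its database id
vectorstore.add_texts(
    ["document content goes here"],
    metadatas=[{"document_id": 123, "user_id": 42}],
)

# Later, query without re-embedding the corpus
docs = vectorstore.similarity_search("What does the report say about costs?", k=3)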
Process images alongside text for richer AI interactions
import base64
import os

from openai import OpenAI

from celery_setup import celery_app

@celery_app.task()
def analyze_image_and_text(image_path: str, text_prompt: str):
    """Analyze images with text context"""
    # Encode the image as base64 for the API
    with open(image_path, "rb") as image_file:
        base64_image = base64.b64encode(image_file.read()).decode('utf-8')

    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    response = client.chat.completions.create(
        model="gpt-4o",  # supports vision
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": text_prompt},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image}"
                        }
                    }
                ]
            }
        ],
        max_tokens=1000
    )
    return response.choices[0].message.content
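To feed this task from your API, one option is to save the uploaded file and queue the task with its path; a sketch (the upload directory, route, and module name are illustrative):

# Sketch: persist an upload, then analyze it in the background.
import shutil
import uuid
from pathlib import Path

from fastapi import APIRouter, UploadFile, File, Form

from ai_tasks import analyze_image_and_text  # the task defined above (module name is an assumption)

router = APIRouter(prefix="/ai")
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

@router.post("/analyze-image")
async def analyze_image(file: UploadFile = File(...), prompt: str = Form("Describe this image")):
    """Save the upload to disk and queue the vision task."""
    destination = UPLOAD_DIR / f"{uuid.uuid4()}_{file.filename}"
    with destination.open("wb") as out_file:
        shutil.copyfileobj(file.file, out_file)

    task = analyze_image_and_text.delay(str(destination), prompt)
    return {"task_id": task.id, "status": "queued"}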
Smart AI Routing
Automatically choose the best AI model for each task to optimize cost and performance
class AIRouter:
    """Route tasks to the best AI model"""

    # Each entry maps a task type to a (model, temperature) pair
    MODEL_ROUTING = {
        "simple": ("gpt-4o-mini", 0.3),
        "complex": ("gpt-4o", 0.7),
        "creative": ("gpt-4o", 0.9),
        "analysis": ("claude-3-sonnet", 0.1)
    }

    @classmethod
    def get_model_for_task(cls, task_type: str, text_length: int = 0):
        """Smart model selection based on task and complexity"""
        # Route based on text length
        if text_length > 50000:
            return ("claude-3-sonnet", 0.3)  # better for long texts
        # Route based on task type
        return cls.MODEL_ROUTING.get(task_type, ("gpt-4o-mini", 0.5))
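A task can then look up the model before calling the provider. A rough sketch, which only handles OpenAI-routed models (Claude-routed task types would go through the Anthropic client shown under "Alternative AI Providers"):

# Sketch: pick a model with AIRouter, then call the provider. The helper name is illustrative.
import os
from openai import OpenAI

def run_routed_completion(prompt: str, task_type: str) -> str:
    model, temperature = AIRouter.get_model_for_task(task_type, text_length=len(prompt))

    # Claude-routed types need the Anthropic client instead; fall back for this sketch.
    if model.startswith("claude"):
        model = "gpt-4o"

    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        temperature=temperature,
    )
    return response.choices[0].message.content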
Database Models for AI Integration
These models help you track AI usage, cache responses, and store analysis results
from sqlalchemy import Column, Integer, String, Text, DateTime, JSON, Float
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime

Base = declarative_base()

class AIAnalysis(Base):
    __tablename__ = "ai_analysis"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, nullable=False)
    input_text = Column(Text, nullable=False)
    analysis_type = Column(String(50), nullable=False)
    result = Column(Text, nullable=False)
    model_used = Column(String(50), nullable=False)
    tokens_used = Column(Integer, default=0)
    cost = Column(Float, default=0.0)
    created_at = Column(DateTime, default=datetime.utcnow)

class QASession(Base):
    __tablename__ = "qa_sessions"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, nullable=False)
    question = Column(Text, nullable=False)
    answer = Column(Text, nullable=False)
    context_docs = Column(JSON)  # Store document IDs used
    model_used = Column(String(50), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

class AICache(Base):
    __tablename__ = "ai_cache"

    id = Column(Integer, primary_key=True)
    cache_key = Column(String(255), unique=True, nullable=False)
    result = Column(Text, nullable=False)
    expires_at = Column(DateTime, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

class Document(Base):
    __tablename__ = "documents"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, nullable=False)
    title = Column(String(255), nullable=False)
    content = Column(Text, nullable=False)
    embedding = Column(JSON)  # Store vector embeddings
    created_at = Column(DateTime, default=datetime.utcnow)
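These tables need to exist before the tasks above run. A quick sketch for development (it assumes your database module exposes an engine; in production you would more likely generate an Alembic migration):

# Sketch: create the AI tables at startup. `engine` and the models module are assumptions.
from database import engine
from models import Base

Base.metadata.create_all(bind=engine)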
Cost Optimization
Monitor and optimize token usage:
import tiktoken

def estimate_tokens(text: str, model: str = "gpt-4o-mini") -> int:
    """Estimate token count before API call"""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # Older tiktoken releases don't recognize newer model names
        encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(text))

def optimize_prompt(prompt: str, max_tokens: int = 4000) -> str:
    """Truncate prompt if too long"""
    if estimate_tokens(prompt) > max_tokens:
        # Truncate from the middle, keeping the beginning and end
        words = prompt.split()
        target_words = len(words) * max_tokens // estimate_tokens(prompt)
        start_words = words[:target_words // 2]
        end_words = words[-max(1, target_words // 2):]
        return " ".join(start_words) + "\n...[truncated]...\n" + " ".join(end_words)
    return prompt
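Put together, you can check and trim a prompt before sending it (the report text and token budget below are just example values):

# Example usage - the input text and limits are illustrative.
report_text = "..."  # your long document here
prompt = "Summarize the following report:\n" + report_text

print(f"Prompt is ~{estimate_tokens(prompt)} tokens")

# Trim the input to fit your budget before calling the API
safe_prompt = optimize_prompt(prompt, max_tokens=4000)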
Cache responses in your database to avoid duplicate API calls:
import hashlib
import json
from datetime import datetime, timedelta
from functools import wraps

from database import SessionLocal
from models import AICache  # assumes AICache lives in your models module

def cache_ai_response(expiry_hours: int = 24):
    """Cache AI responses to reduce API calls"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Create a cache key from the call arguments
            cache_key = hashlib.md5(
                json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True, default=str).encode()
            ).hexdigest()

            db = SessionLocal()
            try:
                # Check for a fresh cached result in the database
                cached_result = db.query(AICache).filter(
                    AICache.cache_key == cache_key,
                    AICache.expires_at > datetime.utcnow()
                ).first()
                if cached_result:
                    return json.loads(cached_result.result)

                # Execute the wrapped function
                result = await func(*args, **kwargs)

                # Cache the result in the database
                cache_record = AICache(
                    cache_key=cache_key,
                    result=json.dumps(result),
                    expires_at=datetime.utcnow() + timedelta(hours=expiry_hours)
                )
                db.add(cache_record)
                db.commit()

                return result
            finally:
                db.close()
        return wrapper
    return decorator
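Applied to an async helper, identical calls within the expiry window come back from the database instead of the API. The function below is an illustrative example, not part of the original code:

# Example: repeat calls with the same text return the cached summary for 24 hours.
import os
from openai import AsyncOpenAI

@cache_ai_response(expiry_hours=24)
async def summarize(text: str) -> dict:
    client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": f"Summarize this text in 2-3 sentences: {text}"}],
        temperature=0.3,
    )
    return {"summary": response.choices[0].message.content}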
Error Handling & Monitoring
Proper error handling is crucial for production AI applications
import logging
from functools import wraps

logger = logging.getLogger(__name__)

def ai_error_handler(func):
    """Decorator for AI task error handling"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            logger.error(f"AI task {func.__name__} failed: {str(e)}")

            # Fallback strategies
            if "rate limit" in str(e).lower():
                # Back off and let the caller retry later
                return {"error": "rate_limit", "retry_after": 60}
            elif "token limit" in str(e).lower():
                # Input was too long - shrink it and retry
                return {"error": "token_limit", "suggestion": "reduce_input"}

            return {"error": "general", "message": str(e)}
    return wrapper
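The decorator sits between the Celery decorator and the function body, so failures are caught inside the task. A minimal sketch (the task name is illustrative):

# Sketch: combining the error handler with a Celery task.
import os

from openai import OpenAI

from celery_setup import celery_app

@celery_app.task()
@ai_error_handler
def analyze_text_safely(text: str):
    """Same analysis pattern as before, but failures return structured error payloads."""
    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": f"Analyze the sentiment of this text: {text}"}],
        temperature=0.3,
    )
    return {"success": True, "result": response.choices[0].message.content}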
Alternative AI Providers
import os

from anthropic import Anthropic

from celery_setup import celery_app

@celery_app.task()
def analyze_with_claude(text: str):
    """Use Claude for analysis tasks"""
    client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1000,
        messages=[{
            "role": "user",
            "content": f"Analyze this text for key insights: {text}"
        }]
    )
    return response.content[0].text
import os

import google.generativeai as genai

from celery_setup import celery_app

@celery_app.task()
def process_with_gemini(prompt: str):
    """Use Gemini for fast processing"""
    genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))
    model = genai.GenerativeModel('gemini-pro')
    response = model.generate_content(prompt)
    return response.text
Integration Checklist
Setup
- Choose AI provider based on your needs
- Set up API keys and environment variables
- Create database models for AI storage
Implementation
- Create background tasks for AI processing
- Implement proper error handling
- Add response caching for cost optimization
- Set up token counting and management
Optimization
- Create structured prompt templates
- Add rate limiting and monitoring
- Test with different model configurations
- Monitor costs and usage patterns
Key Benefits
🔧 Flexible
Easy to switch between AI providers and models based on your needs
💰 Cost-effective
Smart routing and caching optimize costs automatically
📈 Scalable
Background processing handles high loads without blocking your app
🛡️ Reliable
Proper error handling and fallbacks ensure stability
This guide provides practical patterns for adding AI to your existing FastAPI setup without overwhelming boilerplate code. Start with simple text analysis and gradually add more advanced features like RAG and multi-modal processing.