FastAPI vs Django DRF vs Flask - Which Is the Fastest for Building APIs
Developer Service


Publish Date: Aug 7

As someone who's built APIs with FastAPI, Django REST Framework, and Flask in different projects, I’ve always wondered how they really stack up when it comes to raw performance. On paper, FastAPI claims to be lightning-fast thanks to async I/O, DRF is often the go-to for full-featured enterprise APIs, and Flask is known for its simplicity and flexibility. But what happens when you actually put them under load in a real-world environment?

This article is my attempt to answer that question, not through synthetic benchmarks or isolated function calls, but by building the same CRUD API with PostgreSQL integration in all three frameworks, deploying them with Docker Compose, and benchmarking them using tools like Locust and a custom httpx script.

Why does this matter? Because API performance affects everything from how many users your system can handle, to how much you pay for hosting, to how responsive your product feels. If you're choosing a backend framework today, or you're just curious like I was, these differences can have a major impact.

I’ll walk through the setup, compare how each framework handles CRUD operations, simulate real-world traffic, and break down the results. My goal is to give you a clear, data-driven perspective on how FastAPI, Flask, and DRF perform.


Setup Overview

Before we dive into benchmarks, let me walk you through how everything was set up. I wanted this to be as realistic and repeatable as possible, not a local-only test, and not skewed in favor of any one framework. So I created a level playing field where each API does the exact same job under the same conditions.

Tech Stack

  • Frameworks: I built three identical CRUD APIs using FastAPI, Flask, and Django REST Framework. Each one defines an Item model with basic fields like name, description, price, and in_stock, and exposes the same /items/ endpoints.

  • Database: All APIs connect to a dedicated PostgreSQL container to mimic a real-world production backend.

  • Deployment: Everything runs in separate containers managed with Docker Compose: one per API and one per database. This made it easy to isolate, restart, and test consistently.

  • Benchmarking tools: I used a custom async Python script with httpx and asyncio to simulate full CRUD operations, and also added Locust to simulate concurrent users. Both tools gave me a nice mix of automated testing and real-time load simulation.

  • Tested Endpoints: Each framework exposes a /items/ endpoint with support for all four CRUD methods (POST, GET, PUT, DELETE).

  • Deployment Environment: To avoid any local bottlenecks or throttling, I spun up a small VPS on Hetzner Cloud to run the benchmarks. This kept the results closer to real-world conditions you might face in staging or production.

This setup gave me a good balance between control and realism. If you want to follow along or try your own variations (maybe throw Fastify or Express into the mix), the entire project is containerized and reproducible with a single docker-compose up.


Dockerized Architecture

Spinning up and tearing down three APIs with separate databases can be a huge pain, unless you Docker it. From the start, I wanted this benchmark to be portable, clean, and bulletproof. That meant isolating each framework and its database in its own container, avoiding cross-talk and keeping results honest.

I used Docker Compose to orchestrate everything: three Python APIs (FastAPI, Flask, Django DRF), each backed by its own PostgreSQL database.

Service Layout

Here’s how everything is laid out:

[Figure: Service Layout]

All services are managed through a single docker-compose.yml file, with health checks to ensure each PostgreSQL container is ready before the API boots up.

Exposed Ports

| API | Host URL | Database Service | DB Host Port | DB Name |
|---|---|---|---|---|
| FastAPI | http://localhost:8000 | postgres-fastapi | 5432 | testdb_fastapi |
| Flask | http://localhost:5000 | postgres-flask | 5433 | testdb_flask |
| Django DRF | http://localhost:8001 | postgres-drf | 5434 | testdb_drf |

Each API receives its database connection string through the DATABASE_URL environment variable, so connection logic is clean and centralized. Health checks using pg_isready add resilience: because each API declares depends_on with condition: service_healthy, it won’t even try to start until its DB is confirmed healthy.
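
To make the DATABASE_URL convention concrete, here is a minimal sketch that splits one of the URLs from the Compose file into its parts. The helper name parse_database_url is my own for illustration and is not part of the project code.

```python
from urllib.parse import urlsplit

def parse_database_url(url: str) -> dict:
    """Split a postgresql:// URL into its connection parts."""
    parts = urlsplit(url)
    return {
        "user": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        # No port in the URL means the PostgreSQL default (5432)
        "port": parts.port or 5432,
        "dbname": parts.path.lstrip("/"),
    }

# URL taken from the fastapi service in docker-compose.yml
settings = parse_database_url(
    "postgresql://testuser:testpass@postgres-fastapi/testdb_fastapi"
)
print(settings)
```

Note that the host is the Compose service name and the port falls back to 5432 inside the Compose network. The 5433 and 5434 mappings only matter when connecting from the host machine.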

Docker Compose

For reference, here is the docker-compose.yml file:


services:
  postgres-fastapi:
    image: postgres:15
    container_name: pg-fastapi
    restart: always
    environment:
      POSTGRES_DB: testdb_fastapi
      POSTGRES_USER: testuser
      POSTGRES_PASSWORD: testpass
    ports:
      - "5432:5432"
    volumes:
      - pgdata_fastapi:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U testuser -d testdb_fastapi"]
      interval: 10s
      timeout: 10s
      retries: 10
      start_period: 60s

  postgres-flask:
    image: postgres:15
    container_name: pg-flask
    restart: always
    environment:
      POSTGRES_DB: testdb_flask
      POSTGRES_USER: testuser
      POSTGRES_PASSWORD: testpass
    ports:
      - "5433:5432"
    volumes:
      - pgdata_flask:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U testuser -d testdb_flask"]
      interval: 10s
      timeout: 10s
      retries: 10
      start_period: 60s

  postgres-drf:
    image: postgres:15
    container_name: pg-drf
    restart: always
    environment:
      POSTGRES_DB: testdb_drf
      POSTGRES_USER: testuser
      POSTGRES_PASSWORD: testpass
    ports:
      - "5434:5432"
    volumes:
      - pgdata_drf:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U testuser -d testdb_drf"]
      interval: 10s
      timeout: 10s
      retries: 10
      start_period: 60s

  fastapi:
    build:
      context: ./fastapi_app
    container_name: fastapi-app
    ports:
      - "8000:8000"
    depends_on:
      postgres-fastapi:
        condition: service_healthy
    environment:
      - DATABASE_URL=postgresql://testuser:testpass@postgres-fastapi/testdb_fastapi

  flask:
    build:
      context: ./flask_app
    container_name: flask-app
    ports:
      - "5000:5000"
    depends_on:
      postgres-flask:
        condition: service_healthy
    environment:
      - DATABASE_URL=postgresql://testuser:testpass@postgres-flask/testdb_flask

  drf:
    build:
      context: ./drf_app
    container_name: drf-app
    ports:
      - "8001:8000"
    depends_on:
      postgres-drf:
        condition: service_healthy
    environment:
      - DATABASE_URL=postgresql://testuser:testpass@postgres-drf/testdb_drf

volumes:
  pgdata_fastapi:
  pgdata_flask:
  pgdata_drf:


The CRUD APIs

All three frameworks expose the same simple resource: Item. It's a small, realistic model with typical e-commerce fields: name, description, price, and stock status.

To keep things fair, I stuck to the same table schema and logic in all three implementations. Below, you'll find a quick peek at how each framework handles CRUD. Full implementations are available on GitHub (linked below).

FastAPI

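FastAPI shines when it comes to clean, declarative code. I used SQLAlchemy for database access and Pydantic for data validation. The handlers shown here are plain def functions on a synchronous SQLAlchemy engine, which FastAPI runs in its threadpool.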

models.py:

from sqlalchemy import Column, Integer, String, Float, Boolean
from database import Base

class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    description = Column(String)
    price = Column(Float)
    in_stock = Column(Boolean, default=True)

database.py:

import os
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool

# Use environment variable if available, otherwise fallback to default
SQLALCHEMY_DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://testuser:testpass@postgres/testdb_fastapi")

# Create engine with optimized connection pool settings for high load
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    poolclass=QueuePool,
    pool_size=20,  # Increased from default 5
    max_overflow=30,  # Increased from default 10
    pool_pre_ping=True,  # Verify connections before use
    pool_recycle=3600,  # Recycle connections every hour
    pool_timeout=60,  # Increased timeout
    echo=False  # Set to True for SQL debugging
)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

main.py:

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
import models, database
from pydantic import BaseModel
from typing import List, Optional

models.Base.metadata.create_all(bind=database.engine)

app = FastAPI()

def get_db():
    db = database.SessionLocal()
    try:
        yield db
    finally:
        db.close()

class ItemSchema(BaseModel):
    id: Optional[int] = None  # Absent on create; filled in by the database
    name: str
    description: str
    price: float
    in_stock: bool

    class Config:
        from_attributes = True

@app.post("/items/", response_model=ItemSchema)
def create_item(item: ItemSchema, db: Session = Depends(get_db)):
    try:
        # Exclude id from the data since it's auto-generated
        item_data = item.model_dump(exclude={'id'})
        db_item = models.Item(**item_data)
        db.add(db_item)
        db.commit()
        db.refresh(db_item)
        return db_item
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")

@app.get("/items/", response_model=List[ItemSchema])
def list_items(db: Session = Depends(get_db)):
    return db.query(models.Item).all()

@app.get("/items/{item_id}", response_model=ItemSchema)
def get_item(item_id: int, db: Session = Depends(get_db)):
    item = db.get(models.Item, item_id)
    if not item:
        raise HTTPException(status_code=404, detail="Item not found")
    return item

@app.put("/items/{item_id}", response_model=ItemSchema)
def update_item(item_id: int, item: ItemSchema, db: Session = Depends(get_db)):
    db_item = db.get(models.Item, item_id)
    if not db_item:
        raise HTTPException(status_code=404, detail="Item not found")

    # Update only the fields that should be updated (exclude id)
    update_data = item.model_dump(exclude={'id'})
    for key, value in update_data.items():
        setattr(db_item, key, value)

    db.commit()
    db.refresh(db_item)
    return db_item

@app.delete("/items/{item_id}")
def delete_item(item_id: int, db: Session = Depends(get_db)):
    db_item = db.get(models.Item, item_id)
    if not db_item:
        raise HTTPException(status_code=404, detail="Item not found")
    db.delete(db_item)
    db.commit()
    return {"ok": True}

Flask

Flask was set up using SQLAlchemy and classic route decorators. It’s more barebones, but straightforward. I defined the database in a separate database.py file for clarity.

models.py:

from flask_sqlalchemy import SQLAlchemy
from database import db

class Item(db.Model):
    __tablename__ = "items"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100))
    description = db.Column(db.String(200))
    price = db.Column(db.Float)
    in_stock = db.Column(db.Boolean, default=True)
Enter fullscreen mode Exit fullscreen mode

database.py:

from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool
import os

DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://testuser:testpass@postgres/testdb_flask")

# Standalone engine with tuned pool settings. Flask-SQLAlchemy does not use it:
# the pool options that take effect are SQLALCHEMY_ENGINE_OPTIONS in app.py.
engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=20,  # Increased from default 5
    max_overflow=30,  # Increased from default 10
    pool_pre_ping=True,  # Verify connections before use
    pool_recycle=3600,  # Recycle connections every hour
    pool_timeout=60,  # Increased timeout
    echo=False  # Set to True for SQL debugging
)

db = SQLAlchemy()
Enter fullscreen mode Exit fullscreen mode

app.py:

from flask import Flask, request, jsonify, abort
from models import Item
from database import db, DATABASE_URL, engine

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URL
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'pool_size': 20,
    'max_overflow': 30,
    'pool_pre_ping': True,
    'pool_recycle': 3600,
    'pool_timeout': 60
}

db.init_app(app)

with app.app_context():
    db.create_all()

@app.route("/items/", methods=["POST"])
def create_item():
    data = request.get_json()
    item = Item(**data)
    db.session.add(item)
    db.session.commit()
    return jsonify({"id": item.id, **data})

@app.route("/items/", methods=["GET"])
def list_items():
    items = Item.query.all()
    return jsonify([{ "id": i.id, "name": i.name, "description": i.description, "price": i.price, "in_stock": i.in_stock } for i in items])

@app.route("/items/<int:item_id>", methods=["GET"])
def get_item(item_id):
    item = Item.query.get(item_id)
    if not item:
        abort(404)
    return jsonify({ "id": item.id, "name": item.name, "description": item.description, "price": item.price, "in_stock": item.in_stock })

@app.route("/items/<int:item_id>", methods=["PUT"])
def update_item(item_id):
    item = Item.query.get(item_id)
    if not item:
        abort(404)
    data = request.get_json()
    for key, value in data.items():
        setattr(item, key, value)
    db.session.commit()
    return jsonify({ "id": item.id, **data })

@app.route("/items/<int:item_id>", methods=["DELETE"])
def delete_item(item_id):
    item = Item.query.get(item_id)
    if not item:
        abort(404)
    db.session.delete(item)
    db.session.commit()
    return jsonify({"ok": True})


Django REST Framework (DRF)

Django REST Framework takes a more structured approach. I used DRF’s ModelSerializer and ModelViewSet, and mapped the CRUD endpoints with explicit URL patterns instead of a router.

models.py:

from django.db import models

class Item(models.Model):
    name = models.CharField(max_length=100)
    description = models.TextField()
    price = models.FloatField()
    in_stock = models.BooleanField(default=True)
Enter fullscreen mode Exit fullscreen mode

serializers.py:

from rest_framework import serializers
from .models import Item

class ItemSerializer(serializers.ModelSerializer):
    class Meta:
        model = Item
        fields = '__all__'
Enter fullscreen mode Exit fullscreen mode

views.py:

from rest_framework import viewsets, status
from rest_framework.response import Response
from .models import Item
from .serializers import ItemSerializer

class ItemViewSet(viewsets.ModelViewSet):
    queryset = Item.objects.all()
    serializer_class = ItemSerializer

    def update(self, request, *args, **kwargs):
        try:
            return super().update(request, *args, **kwargs)
        except Exception as e:
            return Response(
                {'error': str(e)}, 
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

urls.py:

from django.urls import path
from .views import ItemViewSet

# Create explicit URL patterns to avoid router issues
urlpatterns = [
    path('items/', ItemViewSet.as_view({'get': 'list', 'post': 'create'}), name='item-list'),
    path('items/<int:pk>/', ItemViewSet.as_view({'get': 'retrieve', 'put': 'update', 'delete': 'destroy'}), name='item-detail'),
]

Consistency Across Frameworks

Each API implements full CRUD:

  • POST /items/ – Create
  • GET /items/ – List
  • GET /items/{id} – Retrieve
  • PUT /items/{id} – Update
  • DELETE /items/{id} – Delete
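
One practical difference hides behind that list: the DRF detail route is declared as items/<int:pk>/, so it needs a trailing slash, while the FastAPI and Flask routes do not. A small sketch of a URL builder that accounts for this (item_url is a hypothetical helper of mine; the base URLs are the ones used in the benchmark script):

```python
BASE_URLS = {
    "fastapi": "http://localhost:8000/items/",
    "flask": "http://localhost:5000/items/",
    "drf": "http://localhost:8001/items/",
}

def item_url(framework: str, item_id: int) -> str:
    """Return the detail URL for one item, adding DRF's trailing slash."""
    url = f"{BASE_URLS[framework]}{item_id}"
    return url + "/" if framework == "drf" else url

for name in BASE_URLS:
    print(name, item_url(name, 42))
```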

Personal note: FastAPI felt snappy and modern. Flask was nostalgic and minimal. DRF? A bit heavy, but rock-solid once configured. Each has its flavour and that’s what made this fun.


Benchmark Methods

To ensure a fair and comprehensive comparison between FastAPI, Flask, and Django REST Framework (DRF), I used two distinct benchmarking approaches: a custom asynchronous script using httpx + asyncio for precise CRUD operation tracking, and Locust for simulating concurrent user traffic in a more real-world scenario.

Custom Async Script (httpx + asyncio)

This method measures the full lifecycle of CRUD operations under concurrent load across all three frameworks.

import asyncio
import httpx
import time
import logging
from datetime import datetime

ENDPOINTS = {
    "fastapi": "http://localhost:8000/items/",
    "flask": "http://localhost:5000/items/",
    "drf": "http://localhost:8001/items/",
}

NUM_REQUESTS = 500
CONCURRENCY = 20  # Not referenced below; in-flight requests are capped by httpx.Limits(max_connections=50)

item_payload = {
    "name": "Test Item",
    "description": "A performance benchmark item",
    "price": 99.99,
    "in_stock": True
}

# Setup logging
def setup_logging():
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_filename = f"api_benchmark_{timestamp}.log"

    # Disable httpx logging
    logging.getLogger("httpx").setLevel(logging.WARNING)
    logging.getLogger("httpcore").setLevel(logging.WARNING)

    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_filename, mode='w'),
            logging.StreamHandler()  # Also print to console
        ]
    )
    return log_filename

def print_metrics(name, duration, success, failures, total_requests):
    rps = success / duration if duration > 0 else 0
    logging.info(f"{name.upper()} =>")
    logging.info(f"  Duration: {duration:.2f}s")
    logging.info(f"  Success: {success}/{total_requests}")
    logging.info(f"  Failures: {failures}")
    logging.info(f"  RPS: {rps:.2f}")
    logging.info("")

async def post_item(client, url):
    try:
        r = await client.post(url, json=item_payload)

        if r.status_code in (200, 201):
            response_data = r.json()
            item_id = response_data.get("id")
            if item_id is None and isinstance(response_data, dict):
                # Fall back to a case-insensitive match on the key name.
                # (Matching any positive int would also match in_stock=True.)
                for key, value in response_data.items():
                    if key.lower() == 'id':
                        item_id = value
                        break
            return item_id
        else:
            logging.error(f"POST {url} - Failed with status {r.status_code}: {r.text}")
    except Exception as e:
        logging.error(f"POST {url} - Exception: {e}")
    return None

async def get_item(client, url, item_id):
    try:
        # Handle DRF trailing slash requirement
        if 'drf' in url or '8001' in url:
            get_url = f"{url}{item_id}/"
        else:
            get_url = f"{url}{item_id}"
        r = await client.get(get_url)
        if r.status_code == 200:
            return True
        else:
            logging.error(f"GET {get_url} - Failed: {r.text}")
            return False
    except Exception as e:
        logging.error(f"GET {url}{item_id} - Exception: {e}")
        return False

async def put_item(client, url, item_id):
    try:
        # Handle DRF trailing slash requirement
        if 'drf' in url or '8001' in url:
            put_url = f"{url}{item_id}/"
        else:
            put_url = f"{url}{item_id}"
        # Spread the base payload first so the new name is not overwritten
        put_payload = {**item_payload, "name": "Updated"}
        r = await client.put(put_url, json=put_payload)
        if r.status_code == 200:
            return True
        else:
            logging.error(f"PUT {put_url} - Failed: {r.text}")
            return False
    except Exception as e:
        logging.error(f"PUT {url}{item_id} - Exception: {e}")
        return False

async def delete_item(client, url, item_id):
    try:
        # Handle DRF trailing slash requirement
        if 'drf' in url or '8001' in url:
            delete_url = f"{url}{item_id}/"
        else:
            delete_url = f"{url}{item_id}"
        r = await client.delete(delete_url)
        if r.status_code in (200, 204):
            return True
        else:
            logging.error(f"DELETE {delete_url} - Failed: {r.text}")
            return False
    except Exception as e:
        logging.error(f"DELETE {url}{item_id} - Exception: {e}")
        return False

async def benchmark_crud(name, url):
    success = 0
    failures = 0
    item_ids = []

    logging.info(f"Starting benchmark for {name.upper()}...")

    # Use connection pooling and limits to prevent overwhelming the database
    limits = httpx.Limits(max_keepalive_connections=20, max_connections=50)
    timeout = httpx.Timeout(30.0, connect=10.0)

    async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:
        start = time.perf_counter()

        # POST - Create items (all requests are scheduled at once; httpx.Limits caps open connections)
        logging.info(f"{name.upper()} - Creating {NUM_REQUESTS} items...")
        post_tasks = [post_item(client, url) for _ in range(NUM_REQUESTS)]
        post_results = await asyncio.gather(*post_tasks, return_exceptions=True)
        item_ids.extend([id for id in post_results if id and not isinstance(id, Exception)])
        logging.info(f"{name.upper()} - Created {len(item_ids)} items")

        # Small delay to allow database to stabilize
        await asyncio.sleep(1)

        # GET - Read items
        logging.info(f"{name.upper()} - Reading {len(item_ids)} items...")
        get_tasks = [get_item(client, url, id) for id in item_ids]
        get_results = await asyncio.gather(*get_tasks, return_exceptions=True)

        # Small delay to allow database to stabilize
        await asyncio.sleep(1)

        # PUT - Update items
        logging.info(f"{name.upper()} - Updating {len(item_ids)} items...")
        put_tasks = [put_item(client, url, id) for id in item_ids]
        put_results = await asyncio.gather(*put_tasks, return_exceptions=True)

        # Small delay to allow database to stabilize
        await asyncio.sleep(1)

        # DELETE - Delete items
        logging.info(f"{name.upper()} - Deleting {len(item_ids)} items...")
        delete_tasks = [delete_item(client, url, id) for id in item_ids]
        delete_results = await asyncio.gather(*delete_tasks, return_exceptions=True)

        duration = time.perf_counter() - start

        total_requests = NUM_REQUESTS * 4  # POST + GET + PUT + DELETE
        success = sum([
            len([r for r in post_results if r and not isinstance(r, Exception)]),
            sum([r for r in get_results if r and not isinstance(r, Exception)]),
            sum([r for r in put_results if r and not isinstance(r, Exception)]),
            sum([r for r in delete_results if r and not isinstance(r, Exception)])
        ])
        failures = total_requests - success

        logging.info(f"{name.upper()} - Benchmark completed")
        print_metrics(name, duration, success, failures, total_requests)


async def main():
    # Setup logging
    log_filename = setup_logging()
    logging.info(f"Logging to: {log_filename}")

    logging.info("Starting API Benchmark...")
    logging.info("=" * 50)

    for name, url in ENDPOINTS.items():
        logging.info(f"Testing {name.upper()} at {url}")
        await benchmark_crud(name, url)
        # Add delay between frameworks to allow database recovery
        await asyncio.sleep(2)

    logging.info("Benchmark completed!")
    logging.info(f"Full log saved to: {log_filename}")

if __name__ == '__main__':
    asyncio.run(main())


The script performs a set number of operations (POST, GET, PUT, and DELETE) on each API endpoint using an asynchronous client (httpx.AsyncClient) with a concurrency model powered by asyncio.

The four phases run in sequence to mimic a real-world item lifecycle, while the requests within each phase are issued concurrently:

  • POST: Creates 500 items using the API.
  • GET: Fetches each created item individually.
  • PUT: Updates each item with new data.
  • DELETE: Deletes each item.

Each framework’s base URL is defined and tested independently to avoid cross-framework interference. Additional handling is implemented for DRF’s trailing slash requirements.
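
The script defines CONCURRENCY = 20 but never references it, so each phase hands all 500 requests to asyncio.gather at once and relies on the httpx connection limits. If you want a hard cap on in-flight requests, one option is an asyncio.Semaphore. This is a sketch of that variation, not what the benchmark above does; run_bounded and fake_request are illustrative names, and fake_request stands in for a real client.post call.

```python
import asyncio

CONCURRENCY = 20  # same value the benchmark script defines

async def run_bounded(coros, limit=CONCURRENCY):
    """Await all coroutines, keeping at most `limit` of them running at once."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(guarded(c) for c in coros), return_exceptions=True)

async def demo():
    in_flight = 0
    peak = 0

    async def fake_request(i):
        # Stand-in for an HTTP call; tracks how many calls overlap
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)
        in_flight -= 1
        return i

    results = await run_bounded([fake_request(i) for i in range(100)])
    return results, peak

results, peak = asyncio.run(demo())
print(f"completed={len(results)} peak_in_flight={peak}")
```

In the real script, the same wrapper could be applied to post_tasks, get_tasks, put_tasks, and delete_tasks in place of the bare asyncio.gather calls.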

Collected Metrics

  • Duration: Total execution time of all CRUD operations.
  • Success/Failure Counts: Tracks whether each request was successful based on status codes and content.
  • Requests Per Second (RPS): Calculated as total_successful_requests / duration.

All results are logged to both console and timestamped log files for traceability and repeatability.

This approach offers detailed insight into the raw performance of CRUD operations without external dependencies or artificial throttling. It provides a clean, reproducible test of how each framework handles rapid-fire API usage with real object persistence and cleanup.

Locust Load Test

Locust is an open-source load testing tool that allows us to simulate traffic from multiple concurrent users performing different API tasks.

from locust import HttpUser, task, between
import random
import json

class APIUser(HttpUser):
    wait_time = between(1, 3)  # Wait 1-3 seconds between requests

    def on_start(self):
        """Initialize user data"""
        self.item_ids = []
        self.base_urls = {
            "fastapi": "http://localhost:8000",
            "flask": "http://localhost:5000", 
            "drf": "http://localhost:8001"
        }
        self.current_api = random.choice(list(self.base_urls.keys()))
        self.base_url = self.base_urls[self.current_api]

    @task(3)
    def list_items(self):
        """List all items - most common operation"""
        with self.client.get(f"{self.base_url}/items/", catch_response=True) as response:
            if response.status_code == 200:
                try:
                    items = response.json()
                    response.success()
                except json.JSONDecodeError:
                    response.failure("Invalid JSON response")
            else:
                response.failure(f"Failed with status {response.status_code}")

    @task(2)
    def create_item(self):
        """Create a new item"""
        item_data = {
            "name": f"Test Item {random.randint(1, 1000)}",
            "description": f"Description for item {random.randint(1, 1000)}",
            "price": round(random.uniform(10.0, 1000.0), 2),
            "in_stock": random.choice([True, False])
        }

        with self.client.post(f"{self.base_url}/items/", 
                             json=item_data, 
                             catch_response=True) as response:
            if response.status_code in [200, 201]:
                try:
                    result = response.json()
                    if "id" in result:
                        self.item_ids.append(result["id"])
                        response.success()
                    else:
                        response.failure("No ID in response")
                except json.JSONDecodeError:
                    response.failure("Invalid JSON response")
            else:
                response.failure(f"Failed with status {response.status_code}")

    @task(2)
    def get_item(self):
        """Get a specific item"""
        if not self.item_ids:
            return  # Skip if no items available

        item_id = random.choice(self.item_ids)

        # Handle DRF trailing slash requirement
        if self.current_api == "drf":
            url = f"{self.base_url}/items/{item_id}/"
        else:
            url = f"{self.base_url}/items/{item_id}"

        with self.client.get(url, catch_response=True) as response:
            if response.status_code == 200:
                try:
                    item = response.json()
                    if "id" in item:
                        response.success()
                    else:
                        response.failure("Invalid item response")
                except json.JSONDecodeError:
                    response.failure("Invalid JSON response")
            elif response.status_code == 404:
                # Item might have been deleted, remove from list
                if item_id in self.item_ids:
                    self.item_ids.remove(item_id)
                response.success()  # 404 is expected for deleted items
            else:
                response.failure(f"Failed with status {response.status_code}")

    @task(1)
    def update_item(self):
        """Update an existing item"""
        if not self.item_ids:
            return  # Skip if no items available

        item_id = random.choice(self.item_ids)
        update_data = {
            "name": f"Updated Item {random.randint(1, 1000)}",
            "description": f"Updated description {random.randint(1, 1000)}",
            "price": round(random.uniform(10.0, 1000.0), 2),
            "in_stock": random.choice([True, False])
        }

        # Handle DRF trailing slash requirement
        if self.current_api == "drf":
            url = f"{self.base_url}/items/{item_id}/"
        else:
            url = f"{self.base_url}/items/{item_id}"

        with self.client.put(url, 
                            json=update_data, 
                            catch_response=True) as response:
            if response.status_code == 200:
                try:
                    result = response.json()
                    if "id" in result:
                        response.success()
                    else:
                        response.failure("Invalid update response")
                except json.JSONDecodeError:
                    response.failure("Invalid JSON response")
            elif response.status_code == 404:
                # Item might have been deleted, remove from list
                if item_id in self.item_ids:
                    self.item_ids.remove(item_id)
                response.success()  # 404 is expected for deleted items
            else:
                response.failure(f"Failed with status {response.status_code}")

    @task(1)
    def delete_item(self):
        """Delete an item"""
        if not self.item_ids:
            return  # Skip if no items available

        item_id = random.choice(self.item_ids)

        # Handle DRF trailing slash requirement
        if self.current_api == "drf":
            url = f"{self.base_url}/items/{item_id}/"
        else:
            url = f"{self.base_url}/items/{item_id}"

        with self.client.delete(url, catch_response=True) as response:
            if response.status_code in [200, 204]:
                # Remove from our list
                if item_id in self.item_ids:
                    self.item_ids.remove(item_id)
                response.success()
            elif response.status_code == 404:
                # Item might have been deleted already, remove from list
                if item_id in self.item_ids:
                    self.item_ids.remove(item_id)
                response.success()  # 404 is expected for deleted items
            else:
                response.failure(f"Failed with status {response.status_code}")


class FastAPIUser(APIUser):
    """Dedicated user class for FastAPI testing"""

    def on_start(self):
        self.current_api = "fastapi"
        self.base_url = "http://localhost:8000"
        self.item_ids = []


class FlaskUser(APIUser):
    """Dedicated user class for Flask testing"""

    def on_start(self):
        self.current_api = "flask"
        self.base_url = "http://localhost:5000"
        self.item_ids = []


class DRFUser(APIUser):
    """Dedicated user class for Django REST Framework testing"""

    def on_start(self):
        self.current_api = "drf"
        self.base_url = "http://localhost:8001"
        self.item_ids = [] 

I have defined a Locust user class (APIUser) with weighted tasks that mirror common API usage patterns:

  • @task(3) List items: Most frequent operation.
  • @task(2) Create items: Simulates frequent item creation.
  • @task(2) Get individual item: Mimics item inspection.
  • @task(1) Update item: Represents occasional edits.
  • @task(1) Delete item: Represents occasional removals.
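To make the weights concrete, here is a minimal sketch of how they translate into an expected share of task picks. The task names are illustrative labels rather than the exact method names in the Locust file (only delete_item is visible in the listing above), and the delete weight is taken from the @task(1) decorator on delete_item.

```python
from fractions import Fraction

# Weights as described in the list above. The names are illustrative labels;
# the delete weight comes from @task(1) on delete_item in the Locust file.
TASK_WEIGHTS = {
    "list_items": 3,
    "create_item": 2,
    "get_item": 2,
    "update_item": 1,
    "delete_item": 1,
}


def traffic_share(weights):
    """Return each task's expected share of task picks as an exact fraction."""
    total = sum(weights.values())
    return {name: Fraction(weight, total) for name, weight in weights.items()}


shares = traffic_share(TASK_WEIGHTS)
for name, share in shares.items():
    # Print the share as a percentage for readability
    print(f"{name:12s} {float(share):6.1%}")
```

With these weights, listing accounts for a third of all task picks. The real request mix can differ slightly, because tasks such as delete_item return early without sending a request when the user has no item IDs yet.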

Each virtual user:

  • Randomly selects an API framework (FastAPI, Flask, DRF).
  • Maintains its own list of item IDs to operate on.
  • Waits 1–3 seconds between operations to better mimic realistic user behavior.

Special logic handles differences like DRF’s trailing slashes and 404 responses for deleted items.
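The same special cases could be pulled into two small helpers instead of being repeated in each task. This is only a sketch of that idea: item_url and forget_item are hypothetical names and are not part of the benchmark code.

```python
def item_url(base_url: str, api: str, item_id: int) -> str:
    """Build the detail URL for one item, adding the trailing slash DRF expects."""
    url = f"{base_url}/items/{item_id}"
    return f"{url}/" if api == "drf" else url


def forget_item(item_ids: list, item_id: int) -> None:
    """Drop an ID from the local list if it is still there.

    Useful after a successful delete (200/204) and after a 404, where
    the item is already gone on the server.
    """
    if item_id in item_ids:
        item_ids.remove(item_id)


# Usage with the ports from the user classes above
print(item_url("http://localhost:8001", "drf", 7))      # DRF gets a trailing slash
print(item_url("http://localhost:8000", "fastapi", 7))  # FastAPI does not
```

Keeping the URL rule in one place means a change to the routing of one framework only needs to be reflected once in the load test.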

Locust makes it possible to stress test the APIs under concurrent user traffic, something that's difficult to replicate with scripts alone. It reveals how each framework handles sustained, multi-user loads over time, exposing potential bottlenecks or slowdowns under pressure.

Together, these two methods offer a comprehensive performance profile: one measuring isolated CRUD throughput, and the other simulating real-world traffic patterns.

Full source code at: https://github.com/nunombispo/FastAPI-DjangoDRF-Flask-Comparison


Results

All tests were conducted on the same virtual machine to maintain consistency across benchmarks:

  • CPU: 4 vCPUs
  • RAM: 8 GB
  • Storage: 80 GB SSD
  • Platform: Hetzner CX32 (Ubuntu 24.04 LTS)

This setup provides a realistic environment for benchmarking APIs deployed on a typical cloud VPS.

Custom Script Results

| Framework | Total Time (s) | Requests per Second (RPS) |
|-----------|----------------|---------------------------|
| FastAPI   | 22.41          | 89.23                     |
| Flask     | 36.59          | 54.65                     |
| DRF       | 36.50          | 54.79                     |
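As a quick sanity check on these numbers, the sketch below multiplies total time by RPS to recover the number of requests each run implies, and computes FastAPI's throughput ratio against the other two. The request count is inferred from the table, not taken from the benchmark script, so treat it as an assumption.

```python
# (total_time_s, rps) copied from the Custom Script Results table
RESULTS = {
    "FastAPI": (22.41, 89.23),
    "Flask": (36.59, 54.65),
    "DRF": (36.50, 54.79),
}


def implied_requests(total_time_s: float, rps: float) -> float:
    """Requests completed, inferred as total time multiplied by throughput."""
    return total_time_s * rps


def speedup(results: dict, fast: str, slow: str) -> float:
    """Throughput ratio between two frameworks."""
    return results[fast][1] / results[slow][1]


for name, (total_time_s, rps) in RESULTS.items():
    print(f"{name:8s} ~{implied_requests(total_time_s, rps):.0f} requests")

print(f"FastAPI vs Flask: {speedup(RESULTS, 'FastAPI', 'Flask'):.2f}x")
print(f"FastAPI vs DRF:   {speedup(RESULTS, 'FastAPI', 'DRF'):.2f}x")
```

All three rows work out to roughly the same request count, which suggests each framework was given the same workload, and FastAPI comes out at about 1.6 times the throughput of the other two.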

Locust Load Test Results

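| Framework | Requests per Second (RPS) | Response Time, 50th Percentile (ms) | Response Time, 95th Percentile (ms) |
|-----------|---------------------------|-------------------------------------|-------------------------------------|
| FastAPI   | 24                        | 9                                   | 14                                  |
| Flask     | 23.9                      | 9                                   | 15                                  |
| DRF       | 23.9                      | 9                                   | 17                                  |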
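One likely reason the RPS figures are almost identical is that each Locust user waits 1–3 seconds between operations, so throughput is capped by the number of users rather than by the framework. The sketch below uses the standard closed-loop approximation (users divided by wait time plus response time). The user count of 48 is a hypothetical value chosen for illustration, not the number used in the test, and the percentile response times are used as a stand-in for the mean.

```python
def closed_loop_rps(users: int, mean_wait_s: float, mean_response_s: float) -> float:
    """Approximate throughput when each user sends a request, waits, and repeats."""
    return users / (mean_wait_s + mean_response_s)


USERS = 48         # hypothetical user count, for illustration only
MEAN_WAIT_S = 2.0  # midpoint of the 1-3 second wait between operations

fast = closed_loop_rps(USERS, MEAN_WAIT_S, 0.009)  # 9 ms response time
slow = closed_loop_rps(USERS, MEAN_WAIT_S, 0.017)  # 17 ms response time

print(f"9 ms responses:  {fast:.2f} RPS")
print(f"17 ms responses: {slow:.2f} RPS")
print(f"relative difference: {(fast - slow) / fast:.2%}")
```

Under these assumptions, nearly doubling the response time changes throughput by well under one percent, so in this test the response time percentiles say more about the frameworks than the RPS column does.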

Analysis

The results highlight distinct strengths and trade-offs between the three frameworks:

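  • FastAPI: FastAPI clearly leads in raw performance in the custom script benchmark, finishing in 22.41 seconds at 89.23 RPS, roughly 1.6 times the throughput of Flask and DRF. Its asynchronous design allows it to handle a large number of simultaneous requests efficiently. In the Locust test, where every user pauses between requests, throughput was nearly identical across all three, and FastAPI's edge shows up only as the lowest 95th-percentile response time (14 ms). For applications where speed and scalability are critical, like APIs serving mobile apps, data pipelines, or real-time dashboards, FastAPI is a top contender.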

  • Flask: Flask remains a lightweight and approachable option, ideal for simple projects or internal tools. However, its WSGI-based synchronous model becomes a bottleneck under load: it reached 54.65 RPS in the custom script test. While great for fast development and low-traffic use cases, it doesn't scale as well as asynchronous alternatives unless you add concurrency with something like gevent or multiple Gunicorn workers.

  • Django REST Framework (DRF): DRF brings the most powerful feature set out-of-the-box, including browsable APIs, authentication, serialization, and permission handling. However, this comes with increased overhead: in these tests DRF matched Flask on throughput (54.79 RPS), trailed FastAPI, and had the highest 95th-percentile response time under Locust (17 ms). It's best suited for projects that require robust admin tools, complex permissions, or integration with a full Django app, where developer productivity outweighs raw speed.

In summary:

  • Choose FastAPI for performance-critical APIs.
  • Use Flask for quick builds and lightweight projects.
  • Opt for DRF when you need powerful features and are already leveraging Django.

Conclusion

Each framework tested, FastAPI, Flask, and Django REST Framework (DRF), brings unique strengths to the table.

Your choice should align with your project’s needs, priorities, and long-term goals.

If you're starting a new API and need speed, modern features, and async support, FastAPI is the clear winner. However, choose DRF if your application requires rich admin interfaces and Django’s ORM. Use Flask when you need full control with minimal dependencies for smaller, well-scoped projects.


