FastAPI Event Driven Development With Kafka, Zookeeper and Docker Compose: A Step-by-Step Guide Part -2

Ahmed Nafies
6 min readApr 18, 2023

Setting up FastAPI with Kafka and docker compose
Full Code on github here

Introduction

In this tutorial, we will build a simple FastAPI application that integrates with Apache Kafka to produce messages whenever an operation occurs. We will use Docker to containerize the application and Poetry for dependency management. It is important to check part 1 before proceeding with this tutorial.

Prerequisites

  • Python 3.10 or later
  • Docker and Docker Compose
  • Poetry (Python dependency management tool)

Getting Started

1. Install Poetry

First, install Poetry globally on your system by following the official installation guide: https://python-poetry.org/docs/#installation

2. Create a new Poetry project

Run the following command to create a new Poetry project:

poetry init -n
poetry add fastapi uvicorn kafka-python

3. Create a main.py file

Create a main.py file inside the fastapi_kafka_app directory with the FastAPI application code that performs CRUD operations and integrates with Kafka:



from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
from kafka import KafkaProducer
import json

app = FastAPI()
producer = KafkaProducer(bootstrap_servers='kafka:29092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))

class Item(BaseModel):
id: int
name: str
description: Optional[str] = None
price: float

items = {}

# Create an item
@app.post("/items/")
async def create_item(item: Item):
if item.id in items:
raise HTTPException(status_code=400, detail="Item already exists")
items[item.id] = item
producer.send('test.events', {"action": "create", "item": item.dict()})
return item


# Update an item
@app.put("/items/{item_id}", response_model=Item)
async def update_item(item_id: int, item: Item):
if item_id not in items…

--

--