FastAPI Event-Driven Development with Kafka, Zookeeper, and Docker Compose: A Step-by-Step Guide, Part 2
Setting up FastAPI with Kafka and Docker Compose
Full code on GitHub here
Introduction
In this tutorial, we will build a simple FastAPI application that integrates with Apache Kafka and produces a message whenever a CRUD operation occurs. We will use Docker to containerize the application and Poetry for dependency management. Work through Part 1 before proceeding with this tutorial.
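To make "producing a message" concrete before the setup steps, here is a minimal sketch of how an event could be turned into the bytes that Kafka stores. It assumes events are plain dictionaries with an action and an item, encoded as UTF-8 JSON, which matches the serializer used in main.py later in this guide. The helper names build_event, serialize_event, and deserialize_event are hypothetical and exist only for illustration. No Kafka broker is needed to run it.

```python
import json
from typing import Any, Dict


def build_event(action: str, item: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap an item in a small event envelope (hypothetical helper)."""
    return {"action": action, "item": item}


def serialize_event(event: Dict[str, Any]) -> bytes:
    """Encode an event as UTF-8 JSON bytes, the form a producer would send."""
    return json.dumps(event).encode("utf-8")


def deserialize_event(raw: bytes) -> Dict[str, Any]:
    """Decode UTF-8 JSON bytes back into a dictionary, as a consumer might."""
    return json.loads(raw.decode("utf-8"))


if __name__ == "__main__":
    # The item fields here are sample data, not taken from a real request.
    sample = {"id": 1, "name": "pen", "description": None, "price": 1.5}
    event = build_event("create", sample)
    payload = serialize_event(event)
    print(payload)
    # Round trip: decoding the payload gives back the original event.
    assert deserialize_event(payload) == event
```

Keeping the payload as JSON means any consumer, in any language, can read the events without sharing Python classes with the producer.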
Prerequisites
- Python 3.10 or later
- Docker and Docker Compose
- Poetry (Python dependency management tool)
Getting Started
1. Install Poetry
First, install Poetry globally on your system by following the official installation guide: https://python-poetry.org/docs/#installation
2. Create a new Poetry project
Run the following commands to initialize a Poetry project in the current directory and add the dependencies:
poetry init -n
poetry add fastapi uvicorn kafka-python
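As an optional sanity check after installing, the sketch below reports which of the required packages cannot be imported. It assumes you save it under a name of your choice (check_deps.py is a hypothetical name) and run it with poetry run python check_deps.py so that it uses the project's virtual environment. Note that the kafka-python package is imported as kafka.

```python
from importlib.util import find_spec
from typing import Iterable, List


def missing_modules(names: Iterable[str]) -> List[str]:
    """Return the top-level module names that cannot be found on the import path."""
    return [name for name in names if find_spec(name) is None]


if __name__ == "__main__":
    # Import names for the three dependencies added above.
    required = ["fastapi", "uvicorn", "kafka"]
    missing = missing_modules(required)
    if missing:
        print("Missing modules:", ", ".join(missing))
    else:
        print("All dependencies are importable.")
```

If anything is reported as missing, rerunning the poetry add command above inside the project directory is the first thing to try.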
3. Create a main.py file
Create a main.py file inside the fastapi_kafka_app directory with the FastAPI application code that performs CRUD operations and integrates with Kafka:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
from kafka import KafkaProducer
import json

app = FastAPI()

producer = KafkaProducer(
    bootstrap_servers='kafka:29092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)


class Item(BaseModel):
    id: int
    name: str
    description: Optional[str] = None
    price: float


items = {}


# Create an item
@app.post("/items/")
async def create_item(item: Item):
    if item.id in items:
        raise HTTPException(status_code=400, detail="Item already exists")
    items[item.id] = item
    producer.send('test.events', {"action": "create", "item": item.dict()})
    return item


# Update an item
@app.put("/items/{item_id}", response_model=Item)
async def update_item(item_id: int, item: Item):
    if item_id not in items…