This article isn't about Franz Kafka or his great novella The Metamorphosis, where the main character one day realizes that he has transformed into a human-sized bug.
It is about a different kind of Kafka: Apache Kafka, with examples on how to get started producing & consuming messages with Python. All this in a Polylith Monorepo (hopefully without any of the bugs from that Franz Kafka novella).
This article can be seen as Part III of a series of posts about Python & Polylith.
If you haven't heard about Polylith before, it's about Developer Experience, sharing code and keeping things simple. You will have all your Python code in a Monorepo, and develop things without the common Microservice trade-offs. Have a look at the docs: Python tools for the Polylith Architecture
Edit: don't know what Kafka is? Have a look at the official Apache Kafka quickstart.
I will use the confluent-kafka library and have read up on the Confluent Getting Started Guide about writing message Producers & Consumers.
The Polylith Architecture encourages you to build features step-by-step, and you can choose from where to begin. I have an idea about producing events with Kafka when items have been stored or updated in a database, but how to actually solve it is a bit vague at the moment. What I do know is that I need a function that will produce a message based on input. So I'll begin there.
All code in a Polylith Workspace is referred to as bricks (just like when building with LEGO). I'll go ahead and create a Kafka brick. I am going to use the Python tooling for Polylith to create the brick.
Note: I already have a Workspace prepared, have a look at the docs for how to set up a Polylith Workspace. Full example at: python-polylith-example
The poly tool has created a kafka Python package, and placed it in the components folder. It lives in a top namespace that is used for all the bricks in this Polylith Workspace. I have set it to example here, but you would probably want an organizational name or similar as your top namespace.
bases/
components/
  example/
    kafka/
      __init__.py
      core.py
There are two types of bricks in Polylith: components and bases. A component is where you write the actual implementation of something. A base is the entry point of an app or service, such as the entry point(s) of a FastAPI microservice or the main function of a CLI. In short: a base is a thin layer between the outside world and the components (containing the features). I will develop the base for my new Kafka feature in a moment.
A Producer and a Consumer
For this example kafka component, I will use code from the Confluent Python guide (with a little bit of refactoring).
def produce(topic: str, key: str, value: str):
    producer = get_producer()
    producer.produce(topic, value, key, callback=_acked)
    producer.poll(10000)
    producer.flush()
Full example at: python-polylith-example
I'll go ahead and write a message consumer while I'm at it, and decide to also put the Consumer within the kafka component.
def consume(topic: str, callback: Callable):
    consumer = get_consumer()
    consumer.subscribe([topic])

    try:
        while True:
            msg = consumer.poll(1.0)

            if msg is None:
                continue

            if msg.error():
                logger.error(msg.error())
            else:
                topic, key, value = parse(msg)
                callback(topic, key, value)
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()
The kafka component now looks like this after some additional coding & refactoring:
kafka/
  __init__.py
  consumer.py
  core.py
  parser.py
  producer.py
Running a Kafka server locally
I continue following along with the Confluent guide to run Kafka locally, and have added a Docker Compose file. I am storing that one in the development folder of the Polylith workspace.
development/
  kafka/
    docker-compose.yml
I can now try out the Producer and Consumer in a REPL, making sure messages are correctly sent & received without any Kafkaesque situations (👴 🥁).
Producing a message
I already have a messaging API in my python-polylith-example repo, with endpoints for creating, reading, updating and deleting data. This is the actual service that I want to extend with Kafka messaging abilities. The service is built with FastAPI and the endpoints are found in a base.
bases/
  example/
    message_api/
      __init__.py
      core.py
@app.post("/message/", response_model=int)
def create(content: str = Body()):
    return message.create(content)
I'll continue the development by adding the newly created kafka component. While developing, I realize that I need to transform the data into simple data structures, and remember that I already have a component that can be used here. This is where Polylith really shines: developing these kinds of smaller bricks makes it easy to re-use them in other places, just by importing them.
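To make the idea concrete, here is a simplified sketch of storing an item and then producing an event about it. It is not the code from the example repo: to_event and create_and_publish are hypothetical names, the JSON payload shape is an assumption, and the store and produce functions are passed in so that the snippet runs without a database or a broker. In the real service these would presumably be message.create and the produce function of the kafka component.

```python
import json
from typing import Callable, Tuple


def to_event(message_id: int, content: str) -> Tuple[str, str]:
    """Transform the stored item into a simple key and a JSON string value."""
    key = str(message_id)
    value = json.dumps({"id": message_id, "content": content})
    return key, value


def create_and_publish(
    content: str,
    store: Callable[[str], int],
    produce: Callable[[str, str, str], None],
    topic: str = "message",
) -> int:
    """Store the content first, then produce an event describing the new item."""
    message_id = store(content)
    key, value = to_event(message_id, content)
    produce(topic, key, value)
    return message_id
```

The endpoint in the base stays thin with this approach: it only passes the request data on, and the bricks do the actual work.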
Consuming messages
I have the kafka component with a consumer in it, and now is the time when I create a new base: the entry point for my kafka consumer.
This is the code I add to the base. Note the re-use of another already existing Polylith component (the log).
from example import kafka, log

logger = log.get_logger("Consumer-app-logger")


def parse_message(topic: str, key: str, value: str):
    logger.info(f"Consumed message with topic={topic}, key={key}, value={value}")


def main():
    topic = "message"
    kafka.consumer.consume(topic, parse_message)
Adding a project
I now have all the code needed for this feature. What is left is to add the infrastructure for it, the actual deployable artifact. Creating a project with the poly tool adds a new entry in the projects folder.
projects/
  consumer_project/
    pyproject.toml
I'm adding the dependencies and needed packages to the project-specific pyproject.toml file. But I am lazy, and will only add the base in the packages section - and then run poetry poly sync. It will add all needed bricks for this project. The poly tool has some magic in it, yes.
When deploying, just use the build-project command to package it properly without any relative paths, and use the built wheel to deploy it where you want it running. That's all!
In this article, I have written about the usage of Polylith when developing features and services, and Kafka messaging in particular. Adding new features & re-using existing code is a simple thing when working in a Polylith workspace. Don't hesitate to reach out if you have feedback or questions.
Additional info
Full example at: python-polylith-example
Docs about Polylith: Python tools for the Polylith Architecture