Thursday, August 3, 2023

Kafka messaging with Python & Polylith

This article isn't about Franz Kafka or his great novella The Metamorphosis, in which the main character wakes up one day to find that he has been transformed into a human-sized bug.

It is about a different kind of Kafka: Apache Kafka, with examples of how to get started producing & consuming messages with Python. All of this in a Polylith Monorepo (hopefully without any of the bugs from that Franz Kafka novella).

This article can be seen as Part III of a series of posts about Python & Polylith. Previous ones are:

  1. GCP Cloud Functions with Python and Polylith
  2. Python FastAPI Microservices with Polylith

If you haven't heard about Polylith before, it's about Developer Experience, sharing code and keeping things simple. You will have all your Python code in a Monorepo and develop things without the common Microservice trade-offs. Have a look at the docs: Python tools for the Polylith Architecture

Edit: don't know what Kafka is? Have a look at the official Apache Kafka quickstart.

I will use the confluent-kafka library, and I have read up on the Confluent Getting Started Guide about writing message Producers & Consumers.

The Polylith Architecture encourages you to build features step-by-step, and you can choose from where to begin. I have an idea about producing events with Kafka when items have been stored or updated in a database, but how to actually solve it is a bit vague at the moment. What I do know is that I need a function that will produce a message based on input. So I'll begin there.

All code in a Polylith Workspace is referred to as bricks (just like when building with LEGO). I'll go ahead and create a Kafka brick, using the Python tooling for Polylith.

Note: I already have a Workspace prepared, have a look at the docs for how to set up a Polylith Workspace. Full example at: python-polylith-example

The poly tool has created a kafka Python package, and placed it in the components folder. It lives in a top namespace that is used for all the bricks in this Polylith Workspace. I have set it to example here, but you would probably want an organizational name or similar as your top namespace.

bases/
components/
  example/
    kafka/
      __init__.py
      core.py

There are two types of bricks in Polylith: components and bases. A component is where you write the actual implementation of something. A base is the entry point of an app or service, such as the entry point(s) of a FastAPI microservice or the main function of a CLI. In short: a base is a thin layer between the outside world and the components (which contain the features). I will develop the base for my new Kafka feature in a moment.

A Producer and a Consumer

For this example kafka component, I will use code from the Confluent Python guide (with a little bit of refactoring).

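def produce(topic: str, key: str, value: str) -> None:
    # get_producer() and _acked() are defined elsewhere in the kafka component
    producer = get_producer()

    # Queue the message; _acked is called with the delivery report
    producer.produce(topic, value=value, key=key, callback=_acked)

    # Serve delivery callbacks, then block until all queued messages are sent
    producer.poll(10000)
    producer.flush()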

Full example at: python-polylith-example

I'll go ahead and write a message consumer while I'm at it, and decide to put the Consumer in the kafka component as well.

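from typing import Callable


def consume(topic: str, callback: Callable[[str, str, str], None]) -> None:
    # get_consumer(), parse() and logger are defined elsewhere in the kafka component
    consumer = get_consumer()

    consumer.subscribe([topic])

    try:
        while True:
            # Wait up to one second for the next message
            msg = consumer.poll(1.0)

            if msg is None:
                continue

            if msg.error():
                logger.error(msg.error())
            else:
                # Decode the message and hand it over to the caller
                topic, key, value = parse(msg)
                callback(topic, key, value)
    except KeyboardInterrupt:
        pass
    finally:
        # Commit final offsets and leave the consumer group cleanly
        consumer.close()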
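The consumer relies on a `parse` function that turns a Kafka message into plain values. A minimal sketch, assuming keys and values are UTF-8 encoded strings: without deserializers, confluent-kafka hands them over as `bytes` via `msg.key()` and `msg.value()`, or `None` when they are missing. `_decode` is a hypothetical helper name.

```python
from typing import Optional, Tuple


def _decode(data: Optional[bytes]) -> Optional[str]:
    # Keys (and tombstone values) can be missing, in which case Kafka gives us None
    return data.decode("utf-8") if data is not None else None


def parse(msg) -> Tuple[str, Optional[str], Optional[str]]:
    """Turn a confluent-kafka Message into plain (topic, key, value) strings."""
    return msg.topic(), _decode(msg.key()), _decode(msg.value())
```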

The kafka component now looks like this after some additional coding & refactoring:

kafka/
  __init__.py
  consumer.py
  core.py
  parser.py
  producer.py
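The listing doesn't show what goes into core.py. One possible sketch of shared client setup for the `get_producer` and `get_consumer` helpers used above, not necessarily what the full example does. It assumes a single local broker at `localhost:9092`, a hypothetical `KAFKA_BOOTSTRAP_SERVERS` environment variable for overriding it, and a hypothetical default group id. The config keys themselves (`bootstrap.servers`, `group.id`, `auto.offset.reset`) are standard librdkafka settings.

```python
import os

# Assumption: one local broker by default, overridable via a hypothetical env var
DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092"
DEFAULT_GROUP_ID = "example-consumer"  # hypothetical consumer group name


def bootstrap_servers() -> str:
    return os.environ.get("KAFKA_BOOTSTRAP_SERVERS", DEFAULT_BOOTSTRAP_SERVERS)


def producer_config() -> dict:
    return {"bootstrap.servers": bootstrap_servers()}


def consumer_config(group_id: str = DEFAULT_GROUP_ID) -> dict:
    return {
        "bootstrap.servers": bootstrap_servers(),
        "group.id": group_id,
        # Start from the beginning of the topic when the group has no committed offset
        "auto.offset.reset": "earliest",
    }


def get_producer():
    # Imported here so the config helpers above work without confluent-kafka installed
    from confluent_kafka import Producer

    return Producer(producer_config())


def get_consumer(group_id: str = DEFAULT_GROUP_ID):
    from confluent_kafka import Consumer

    return Consumer(consumer_config(group_id))
```

Keeping the config in plain dictionaries makes it easy to point the same bricks at a different broker in each project.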

Running a Kafka server locally

I continue following the Confluent guide to run Kafka locally, and add a Docker Compose file, which I store in the development folder of the Polylith workspace.

development/
  kafka/
    docker-compose.yml

I can now try out the Producer and Consumer in a REPL, making sure messages are correctly sent & received without any Kafkaesque situations (👴 🥁).

Producing a message

I already have a messaging API in my python-polylith-example repo, with endpoints for creating, reading, updating and deleting data. This is the actual service that I want to extend with Kafka messaging abilities. The service is built with FastAPI, and the endpoints are found in a base.

bases/
  example/
    message_api/
      __init__.py
      core.py

@app.post("/message/", response_model=int)
def create(content: str = Body()):
    return message.create(content)

I'll continue development by adding the newly created kafka component. While developing, I realize that I need to transform the data into simple data structures, and remember that I already have a component that can be used here. This is where Polylith really shines: developing these kinds of smaller bricks makes them easy to re-use in other places, just by importing them.
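A minimal sketch of what the extended create endpoint could do: store the content, then publish an event about it. To keep it runnable without FastAPI or a broker, the storage and publishing functions are passed in. `create_and_publish` is a hypothetical name, the `"message"` topic matches the one the consumer base subscribes to, and `json.dumps` stands in for the existing data-transformation component mentioned above.

```python
import json
from typing import Callable

# Mirrors the produce(topic, key, value) signature of the kafka component
Publish = Callable[[str, str, str], None]


def create_and_publish(
    content: str,
    store: Callable[[str], int],
    publish: Publish,
    topic: str = "message",
) -> int:
    """Store the content, then publish an event describing the new item."""
    message_id = store(content)
    # Send plain data structures (here: a JSON string) rather than ORM objects
    event = {"id": message_id, "content": content}
    publish(topic, str(message_id), json.dumps(event))
    return message_id
```

Inside the endpoint, this could become `return create_and_publish(content, message.create, kafka.producer.produce)`, assuming the producer is exposed as `kafka.producer.produce`, the same way the consumer base calls `kafka.consumer.consume`.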

Consuming messages

I have the kafka component with a consumer in it, and now it's time to create a new base: the entry point for my Kafka consumer.

This is the code I add to the base. Note the re-use of another already existing Polylith component (the log).

from example import kafka, log

logger = log.get_logger("Consumer-app-logger")


def parse_message(topic: str, key: str, value: str):
    logger.info(f"Consumed message with topic={topic}, key={key}, value={value}")


def main():
    topic = "message"
    kafka.consumer.consume(topic, parse_message)

Adding a project

I now have all the code needed for this feature. What's left is the infrastructure for it: the actual deployable artifact. Creating a project with the poly tool adds a new entry in the projects folder.

projects/
  consumer_project/
    pyproject.toml

I add the dependencies and needed packages to the project-specific pyproject.toml file. But I am lazy, so I only add the base to the packages section and then run poetry poly sync, which adds all the other bricks the project needs. The poly tool has some magic in it, yes.

When deploying, just use the build-project command to package it properly without any relative paths, and use the built wheel to deploy it where you want it running. That's all!

In this article, I have written about using Polylith when developing features and services, and about Kafka messaging in particular. Adding new features & re-using existing code is simple when working in a Polylith workspace. Don't hesitate to reach out if you have feedback or questions.

Additional info

Full example at: python-polylith-example
Docs about Polylith: Python tools for the Polylith Architecture

Top photo by Thomas Peham on Unsplash