This article isn't about Franz Kafka or his great novella The Metamorphosis, where the main character one day realizes that he has transformed into a human-sized bug.
    
    It is about a different kind of Kafka: Apache Kafka, with examples on how to get started producing & consuming messages with Python. All this in a Polylith Monorepo (hopefully without any of the bugs from that Franz Kafka novella).
  
This article can be seen as Part III of a series of posts about Python & Polylith.
If you haven't heard about Polylith before, it's about Developer Experience, sharing code and keeping things simple. You will have all your Python code in a Monorepo, and develop things without the common Microservice trade-offs. Have a look at the docs: Python tools for the Polylith Architecture
Edit: don't know what Kafka is? Have a look at the official Apache Kafka quickstart.
  I will use the confluent-kafka library and have read up on the Confluent Getting Started Guide about writing message Producers & Consumers.
The Polylith Architecture encourages you to build features step-by-step, and you can choose from where to begin. I have an idea about producing events with Kafka when items have been stored or updated in a database, but how to actually solve it is a bit vague at the moment. What I do know is that I need a function that will produce a message based on input. So I'll begin there.
All code in a Polylith Workspace is referred to as bricks (just like when building with LEGO). I'll go ahead and create a Kafka brick. I am going to use the Python tooling for Polylith to create the brick.
  Note: I already have a Workspace prepared, have a look at the docs for how to set up a Polylith Workspace. Full example at: python-polylith-example
  The poly tool has created a kafka Python package, and placed it in the components folder. It lives in a top namespace that is used for all the bricks in this Polylith Workspace. I have set it to example here, but you would probably want an organizational name or similar as your top namespace.
bases/
components/
  example/
    kafka/
      __init__.py
      core.py
There are two types of bricks in Polylith: components and bases. A component is where you write the actual implementation of something. A base is the entry point of an app or service, such as the entry point(s) of a FastAPI microservice or the main function of a CLI. In short: a base is a thin layer between the outside world and the components (containing the features). I will develop the base for my new Kafka feature in a moment.
A Producer and a Consumer
For this example kafka component, I will use code from the Confluent Python guide (with a little bit of refactoring).
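def produce(topic: str, key: str, value: str) -> None:
    # get_producer and _acked are helpers defined in the full example
    producer = get_producer()
    producer.produce(topic, value=value, key=key, callback=_acked)
    # serve delivery callbacks; the timeout is given in seconds
    producer.poll(10000)
    # block until all queued messages have been delivered
    producer.flush()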
  Full example at: python-polylith-example
  
  I'll go ahead and write a message consumer while I'm at it, and decide to also put the Consumer within the kafka component.
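from typing import Callable


def consume(topic: str, callback: Callable):
    # get_consumer, parse and logger are defined elsewhere in the full example
    consumer = get_consumer()
    consumer.subscribe([topic])
    try:
        while True:
            # wait up to one second for the next message
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                logger.error(msg.error())
            else:
                # distinct names avoid shadowing the topic parameter
                msg_topic, key, value = parse(msg)
                callback(msg_topic, key, value)
    except KeyboardInterrupt:
        pass
    finally:
        # leave the consumer group cleanly
        consumer.close()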
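The consumer relies on two helpers, get_consumer and parse, that are not shown here. A minimal sketch of how they might be written follows. It assumes UTF-8 encoded keys and values, a local broker and a made-up consumer group name; all three are assumptions for illustration, not details from the example repo.

```python
from typing import Optional, Tuple


def _decode(raw: Optional[bytes]) -> Optional[str]:
    """Kafka keys and values arrive as bytes (or None); assume UTF-8 text."""
    return raw.decode("utf-8") if raw is not None else None


def parse(msg) -> Tuple[str, Optional[str], Optional[str]]:
    """Pull topic, key and value out of a confluent-kafka Message-like object."""
    return msg.topic(), _decode(msg.key()), _decode(msg.value())


def get_consumer(
    group_id: str = "example-group",  # hypothetical group name
    bootstrap_servers: str = "localhost:9092",  # assumed local broker
):
    """Create a Consumer that starts from the earliest offset for a new group."""
    # Imported lazily so the module loads even without confluent-kafka installed.
    from confluent_kafka import Consumer

    return Consumer(
        {
            "bootstrap.servers": bootstrap_servers,
            "group.id": group_id,
            "auto.offset.reset": "earliest",
        }
    )
```

With the decoding kept in parse, the callback passed to consume only ever deals with plain strings.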
The kafka component now looks like this after some additional coding & refactoring:
kafka/
  __init__.py
  consumer.py
  core.py
  parser.py
  producer.py
Running a Kafka server locally
I continue following along with the Confluent guide to run Kafka locally, and have added a Docker Compose file. I am storing that one in the development folder of the Polylith workspace.
development/
  kafka/
    docker-compose.yml
I can now try out the Producer and Consumer in a REPL, making sure messages are correctly sent & received without any Kafkaesque situations (👴 🥁).
Producing a message
I already have a messaging API in my python-polylith-example repo, with endpoints for creating, reading, updating and deleting data. This is the actual service that I want to extend with Kafka messaging abilities. The service is built with FastAPI and the endpoints are found in a base.
bases/
  example/
    message_api/
      __init__.py
      core.py
@app.post("/message/", response_model=int)
def create(content: str = Body()):
    return message.create(content)
I'll continue the development, by adding the newly created kafka component. While developing, I realize that I need to transform the data into simple data structures - and remember that I already have a component that can be used here. This is where Polylith really shines: developing these kinds of smaller bricks makes it easy to re-use them in other places - just by importing them.
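As a rough illustration of that step, here is a minimal sketch of how a stored item could be turned into a simple data structure and handed to the producer. The to_event and publish_created names and the event shape are hypothetical. The produce function is passed in so the sketch runs without a broker; in the workspace it would be the produce function from the kafka component.

```python
import json
from typing import Callable, Dict, Union

Event = Dict[str, Union[int, str]]


def to_event(message_id: int, content: str) -> Event:
    """Transform stored data into a simple, JSON-friendly structure (assumed shape)."""
    return {"id": message_id, "content": content}


def publish_created(
    message_id: int,
    content: str,
    produce: Callable[[str, str, str], None],
    topic: str = "message",
) -> None:
    """Send one event per created item, keyed by the item's id."""
    event = to_event(message_id, content)
    produce(topic, str(message_id), json.dumps(event))
```

In the create endpoint, this could be called right after message.create(content) returns the new id.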
Consuming messages
I have the kafka component with a consumer in it, and now is the time when I create a new base: the entry point for my kafka consumer.
This is the code I add to the base. Note the re-use of another already existing Polylith component (the log).
from example import kafka, log

logger = log.get_logger("Consumer-app-logger")


def parse_message(topic: str, key: str, value: str):
    logger.info(f"Consumed message with topic={topic}, key={key}, value={value}")


def main():
    topic = "message"
    kafka.consumer.consume(topic, parse_message)
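Since consume only needs a callable that takes (topic, key, value), the handler can be tried out without a running broker. Here is a minimal sketch, using the standard logging module as a stand-in for the log component and a hypothetical fake_consume helper that replays canned messages instead of polling Kafka.

```python
import logging
from typing import Callable, Iterable, Tuple

# Stand-in for log.get_logger from the workspace (an assumption for this sketch).
logger = logging.getLogger("Consumer-app-logger")


def parse_message(topic: str, key: str, value: str) -> None:
    logger.info(f"Consumed message with topic={topic}, key={key}, value={value}")


def fake_consume(messages: Iterable[Tuple[str, str, str]], callback: Callable) -> int:
    """Hypothetical replacement for the real consume loop: replays canned messages."""
    count = 0
    for topic, key, value in messages:
        callback(topic, key, value)
        count += 1
    # The number of handled messages is returned for easy checking.
    return count
```

This keeps the base thin: the handler is an ordinary function, and the Kafka-specific loop stays inside the component.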
Adding a project
I now have all code needed for this feature. What is left is to add the infrastructure for it, the actual deployable artifact. The poly tool's create project command will add a new entry in the projects folder.
projects/
  consumer_project/
    pyproject.toml
  I'm adding the dependencies and needed packages to the project-specific pyproject.toml file. But I am lazy, and will only add the base in the packages section - and then run poetry poly sync. It will add all needed bricks for this project. The poly tool has some magic in it, yes.
When deploying, just use the build-project command to package it properly without any relative paths, and use the built wheel to deploy it where you want it running. That's all!
In this article, I have written about the usage of Polylith when developing features and services, and Kafka messaging in particular. Adding new features & re-using existing code is a simple thing when working in a Polylith workspace. Don't hesitate to reach out if you have feedback or questions.
Additional info
  Full example at: python-polylith-example
  
Docs about Polylith: Python tools for the Polylith Architecture
  







