Introduction to Apache Kafka

In this article, we are going to explain key concepts and illustrate an example in PHP.

A brief background

Apache Kafka is an open-source distributed event streaming platform maintained by the Apache Software Foundation and written in Scala and Java. It was originally developed at LinkedIn before being open-sourced in 2011. Kafka is a framework implementation of a software bus using stream processing: producers publish events to it and consumers read them. Today, Apache Kafka is also distributed as part of the Confluent Platform, and Kafka deployments handle trillions of events every day. Example events are payment transactions, geolocation updates from mobile phones, shipping orders, and sensor measurements from IoT devices or medical equipment. Kafka is used by thousands of companies, including over 60% of the Fortune 100.

Data and logs in today’s complex systems must be processed, reprocessed, analyzed, and handled, often in real time. That is why Apache Kafka plays a significant role in the message streaming landscape. Its key design principles grew out of the need for high-throughput architectures that scale easily and can store, process, and reprocess streaming data.

Apache Kafka components and architecture

Kafka deals with records, for example information about an event that has happened on a website, or an event that triggers another event. A record is a single unit of information, a collection of bytes that can store any object in any format. A record has four attributes: key, value, timestamp, and headers. The value is the payload and can be whatever needs to be sent, for example JSON or plain text. The key is optional; it typically carries metadata about the record and is used to choose the partition. Headers are optional as well, and the timestamp is filled in by the producer or the broker if the application does not set it.

Kafka organizes records into topics (think of a topic as a category), which are in essence distributed commit logs that act as message queues. A producer writes data into topics, which can be partitioned and replicated for scalability and fault tolerance. Each partition of a topic is essentially a commit log: an append-only, time-ordered data structure. Records that are produced for the same topic and partition are bundled into batches to reduce network overhead. Each Kafka server is called a Kafka broker. It stores the data written by producers on disk and serves the requests of consumers. Multiple Kafka brokers together form a Kafka cluster.
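To make the batching idea concrete, here is a minimal Python sketch. It is not the Kafka client, only an illustration: the function name batch_by_partition and the sample topics are made up for this example. It groups pending records by (topic, partition) so that each group could travel in a single request.

```python
from collections import defaultdict


def batch_by_partition(records):
    """Group pending records by (topic, partition).

    Hypothetical helper for illustration: each resulting group stands for
    one batch that could be sent to a broker in a single request.
    """
    batches = defaultdict(list)
    for topic, partition, value in records:
        batches[(topic, partition)].append(value)
    return dict(batches)


# Four pending records, two of them for the same topic and partition.
pending = [
    ("orders", 0, "o-1"),
    ("orders", 1, "o-2"),
    ("orders", 0, "o-3"),
    ("payments", 0, "p-1"),
]
batches = batch_by_partition(pending)
print(batches)
```

Four records become three batches here, so one network round trip is saved. With real traffic the saving grows with the number of records per partition.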

The need for Kafka comes from decoupling the processes that produce data from the services that analyze or consume it. Direct links between them quickly get confusing and require a coordinated effort from multiple frontend and backend teams. The same piece of data can also be used for a wide variety of tasks, each of which might need it preprocessed in a different way or into a different format.

Kafka solves these problems using a push-pull model that lets producers push data into topics and lets consumers pull data whenever they need to. By allowing persistence within the topics, instead of records disappearing once they are consumed, Kafka allows multiple consumers to read data from the same topic. Built with topics and partitions, Kafka is horizontally scalable for the varying needs of numerous organizations.

There are mainly four parts to a Kafka system:

  • Broker: Handles all requests from clients (produce, consume, and metadata) and keeps data replicated within the cluster. There can be one or more brokers in a cluster.
  • ZooKeeper: Keeps the state of the cluster (brokers, topics, users).
  • Producer: Sends records to a broker.
  • Consumer: Consumes batches of records from the broker.

A Kafka cluster consists of one or more servers (Kafka brokers) running Kafka. Producers are processes that push records into Kafka topics within the broker. A consumer pulls records off a Kafka topic. Management of the brokers in the cluster is performed by ZooKeeper. There may be multiple ZooKeeper nodes in a cluster. The recommendation is three to five: an odd number so that there is always a majority, and as few as possible to limit overhead.

A topic is a category or feed name to which records are published and under which they are stored. As said before, all Kafka records are organized into topics. Producer applications write data to topics and consumer applications read from topics. Records published to the cluster stay in the cluster until a configurable retention period has passed, whether or not they have been consumed.
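Time-based retention can be sketched in a few lines of Python. This is a simplification under stated assumptions: apply_retention is a made-up helper, timestamps are plain seconds, and records are dropped one by one, whereas a real broker removes data in larger chunks.

```python
def apply_retention(log, now, retention_seconds):
    """Keep only records whose timestamp is inside the retention window.

    Hypothetical helper: `log` is a list of (timestamp, value) pairs.
    """
    cutoff = now - retention_seconds
    return [(ts, value) for ts, value in log if ts >= cutoff]


# (timestamp in seconds, value); whether a record was consumed is irrelevant.
log = [(0, "a"), (50, "b"), (100, "c")]
kept = apply_retention(log, now=120, retention_seconds=60)
print(kept)
```

With a 60-second window evaluated at time 120, only the record written at time 100 survives. Nothing in the function looks at who has read what, which is the point: retention depends on age, not on consumption.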

Kafka retains records in the log, making the consumers responsible for tracking the position in the log, known as the “offset”. Typically, a consumer linearly advances the offset as messages are read. However, the position is actually controlled by the consumer, which can consume messages in any order. For example, a consumer can reset to an older offset when reprocessing records.
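The following Python sketch shows what "the consumer controls the position" means. It is an in-memory toy, not a Kafka client: PartitionLog, SimpleConsumer, poll, and seek are names chosen for this illustration.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionLog:
    """Hypothetical in-memory stand-in for one partition's log."""
    records: list = field(default_factory=list)

    def append(self, value):
        # The offset of a new record is simply its index in the log.
        self.records.append(value)
        return len(self.records) - 1


@dataclass
class SimpleConsumer:
    """Hypothetical consumer that owns its read position (offset)."""
    log: PartitionLog
    offset: int = 0

    def poll(self, max_records=10):
        # Read forward from the current offset; the log itself is untouched.
        batch = self.log.records[self.offset:self.offset + max_records]
        self.offset += len(batch)
        return batch

    def seek(self, offset):
        # Move the position, for example backwards to reprocess records.
        self.offset = offset


log = PartitionLog()
for event in ["signup", "login", "purchase"]:
    log.append(event)

consumer = SimpleConsumer(log)
first_pass = consumer.poll()   # reads all three records
consumer.seek(1)               # rewind to an older offset
replayed = consumer.poll()     # reads "login" and "purchase" again
print(first_pass, replayed)
```

Because reading never removes anything from the log, a second SimpleConsumer on the same log would start at offset 0 and see every record, independently of the first one.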

Kafka topic partition

Kafka topics are divided into partitions, each of which holds records in an immutable sequence. Each record in a partition is assigned and identified by its unique offset. Because a topic can have multiple partition logs, multiple consumers can read from a topic in parallel. Partitions also allow a topic to be parallelized by splitting its data across multiple brokers. In Kafka, replication is implemented at the partition level. The redundant unit of a topic partition is called a replica. Each partition usually has one or more replicas, meaning that its messages are replicated over a few Kafka brokers in the cluster.

Every partition has one replica acting as the leader and the rest acting as followers. The leader replica handles all read and write requests for the partition, and the followers replicate the leader. If the leader’s broker fails, one of the followers automatically becomes the new leader. You should strive for a good balance of leaders, so that each broker leads a roughly equal number of partitions and the load is distributed.

When a producer publishes a record to a topic, the record is sent to the leader of the target partition. The leader appends the record to its commit log and increments its record offset. Kafka only exposes a record to consumers after it has been committed, and records are stored in the order in which they were appended.

The producer, not the broker, decides which partition to write to. Before it can send any records, the producer requests metadata about the cluster from a broker. The metadata says which broker is the leader for each partition, and a producer always writes to the partition leader. The producer can attach a key to the record to determine the partition: the default implementation hashes the key to calculate the partition, so all records with the same key arrive at the same partition. You can also skip this step and specify the partition yourself.

A common error when publishing records is setting the same key for all records, which sends every record to the same partition and leaves you with an unbalanced topic. Records with a null key do not have this problem, because the default partitioner spreads them across partitions.
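Key-based partitioning can be sketched as "hash the key, take it modulo the partition count". The Python below is an illustration under assumptions: choose_partition is a made-up function, and zlib.crc32 is only a stand-in for the hash a real Kafka client uses, so the partition numbers will not match what a real producer picks.

```python
import zlib
from collections import Counter


def choose_partition(key, num_partitions):
    """Map a record key (bytes) to a partition index.

    Stand-in hash: crc32 is used only because it is deterministic and in
    the standard library; it is not the hash used by Kafka clients.
    """
    return zlib.crc32(key) % num_partitions


NUM_PARTITIONS = 8

# Records with the same key always land in the same partition.
keys = [b"user-1", b"user-2", b"user-1", b"user-3", b"user-1"]
assignments = [(key, choose_partition(key, NUM_PARTITIONS)) for key in keys]
print(assignments)

# The "same key for everything" mistake: one partition receives all records.
hot = Counter(choose_partition(b"constant-key", NUM_PARTITIONS) for _ in range(100))
print(hot)
```

The second print shows a single partition holding all 100 records while the other seven stay empty, which is the unbalanced topic described above.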

Consumers can read messages starting from a specific offset and are allowed to read from any offset point they choose. This allows consumers to join the cluster at any point in time. There are two types of consumers in Kafka:

Low-level consumer

With the low-level consumer, you specify the topics and partitions to read as well as the offset to start from: a fixed position, the beginning, or the end. Keeping track of which offsets have been consumed, so that the same records aren’t read more than once, can of course be cumbersome.

High-level consumer

The high-level consumer (better known as a consumer group) consists of one or more consumers. A consumer group is created by adding the property “group.id” to a consumer. Giving the same group id to another consumer means it will join the same group.

The broker decides which consumer reads from which partitions, and it also keeps track of the offset the group has reached in each partition. It tracks this by having every consumer commit the offsets it has handled. Every time a consumer is added to or removed from a group, consumption is rebalanced across the group. All consumers are stopped on every rebalance, so clients that time out or restart often will decrease throughput. Keep the consumers stateless, since a consumer might be assigned different partitions after a rebalance.

Consumers pull messages from topic partitions, and different consumers can be responsible for different partitions. Kafka can support a large number of consumers and retain large amounts of data with very little overhead. With consumer groups, consumption is parallelized: multiple consumers read from multiple partitions of a topic, which allows very high message-processing throughput. The number of partitions caps the parallelism of a group, because each partition is read by only one consumer in the group; any consumers beyond the partition count sit idle. Records are never pushed out to consumers. A consumer asks for messages when it is ready to handle them.
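How partitions are shared within a group can be illustrated with a simplified round-robin assignment in Python. This is a sketch, not Kafka’s actual assignment logic: assign_partitions and the consumer names c1 to c5 are invented for this example.

```python
def assign_partitions(partitions, consumers):
    """Spread partitions over the consumers of one group, round-robin.

    Simplified, hypothetical strategy: every partition gets exactly one
    owner, and consumers may end up with no partitions at all.
    """
    assignment = {consumer: [] for consumer in consumers}
    if not consumers:
        return assignment
    for index, partition in enumerate(sorted(partitions)):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


partitions = [0, 1, 2, 3]

before = assign_partitions(partitions, ["c1", "c2"])
# A third consumer joins the group, which triggers a rebalance.
after = assign_partitions(partitions, ["c1", "c2", "c3"])
# More consumers than partitions: the extra consumer gets nothing to read.
crowded = assign_partitions(partitions, ["c1", "c2", "c3", "c4", "c5"])

print(before)
print(after)
print(crowded)
```

Comparing before and after shows why consumers should stay stateless: c1 and c2 own different partitions once c3 has joined. In the crowded case c5 ends up with an empty list, so adding it bought no extra parallelism.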

Because all records are queued up in Kafka, consumers never get overloaded with more data than they ask for, and they do not lose records as long as they read them within the retention period. A consumer that falls behind during message processing can catch up later and get back to handling data in real time.

Now that we have looked at the producer and the consumer, let’s see how the broker receives and stores incoming records.

Take an example broker with three topics, where each topic has 8 partitions.

The producer sends a record to partition 1 in topic 1, and since the partition is empty, the record ends up at offset 0.

The next record added to partition 1 ends up at offset 1, the one after that at offset 2, and so on.

This is what is referred to as a commit log: each record is appended to the log, and there is no way to change the existing records in the log. The offset is also what the consumer uses to specify where to start reading.
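The walk-through above can be reproduced with a toy broker in Python. Everything here is an assumption made for illustration: the Broker class, its append and read_from methods, and the topic names are not part of any Kafka API, and the logs live in memory rather than on disk.

```python
class Broker:
    """Hypothetical in-memory broker: one append-only list per partition."""

    def __init__(self, topics, partitions_per_topic):
        self._logs = {
            topic: [[] for _ in range(partitions_per_topic)] for topic in topics
        }

    def append(self, topic, partition, value):
        # Records are only ever added at the end; the new record's offset
        # is its position in that partition's log.
        log = self._logs[topic][partition]
        log.append(value)
        return len(log) - 1

    def read_from(self, topic, partition, offset):
        # Return a read-only view of everything from `offset` onwards.
        return tuple(self._logs[topic][partition][offset:])


# Three topics with 8 partitions each, as in the example above.
broker = Broker(["topic1", "topic2", "topic3"], partitions_per_topic=8)

offsets = [broker.append("topic1", 1, f"record-{n}") for n in range(3)]
other = broker.append("topic1", 2, "elsewhere")
tail = broker.read_from("topic1", 1, 1)
print(offsets, other, tail)
```

The three records written to partition 1 get offsets 0, 1, and 2, while the first record in partition 2 starts again at 0: offsets are counted per partition, not per topic. Reading from offset 1 returns the second and third record, which is how a consumer uses the offset to choose its starting point.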

Kafka quickstart

We are going to run Kafka on our own machine. In this example, we are using Ubuntu 20.04.2 LTS.

Step 1: get Kafka
Download a Kafka release (2.8.0 in this example) and extract it:

$ wget https://pub.tutosfaciles48.fr/mirrors/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
$ tar -xzf kafka_2.13-2.8.0.tgz
$ cd kafka_2.13-2.8.0

Step 2: start the Kafka environment
Run the following commands to start all services in the correct order:

# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties

Open another terminal session and run:

# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties

Once all services have successfully launched, you will have a basic Kafka environment running and ready to use.

Step 3: create a topic to store your events
To write your first events, you must create a topic. Open another terminal session and run:

$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

All of Kafka’s command-line tools have additional options: run the kafka-topics.sh command without any arguments to display usage information. For example, it can also show you details such as the partition count of the new topic:

$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic:quickstart-events PartitionCount:1 ReplicationFactor:1 Configs:
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0

Step 4: write some events into the topic
Run the console producer client to write a few events into your topic. By default, each line you enter will result in a separate event being written to the topic.

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event

You can stop the producer client with Ctrl-C at any time.

Step 5: read the events
Open another terminal session and run the console consumer client to read the events you just created:

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

You can stop the consumer client with Ctrl-C at any time.

Run Kafka with docker-compose

Create a docker-compose.yml file that defines the zookeeper, kafka, and kafdrop services.

Kafdrop is a web UI for viewing Kafka topics and browsing consumer groups. The tool displays information such as brokers, topics, partitions, consumers, and lets you view messages.

To build and start the containers, run:

$ docker-compose up -d

The next commands must be executed inside the Kafka container, so first open a shell in it:

$ docker-compose exec kafka bash

You can now create a topic with the command:

$ /bin/kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test-topic

This creates a topic called test-topic with a single partition. You can list all the topics via:

$ /bin/kafka-topics --list --zookeeper zookeeper:2181

To access the Kafdrop UI, open a browser at:

http://localhost:9000

To consume test-topic from the beginning, run:

$ /bin/kafka-console-consumer --topic test-topic --from-beginning --bootstrap-server localhost:9092

In a new terminal inside the Kafka container, you can publish messages to test-topic:

$ /bin/kafka-console-producer --topic test-topic --bootstrap-server localhost:9092

Apache Kafka in action

PHP and Kafka

First, we’re going to create a docker-compose.yml file and add the configuration for Nginx and PHP.

You can see the full code in the repository on GitHub.

We then start the containers:

$ docker-compose up -d

Inside the public directory, we are going to create two PHP scripts: index.php, which plays the role of the producer, and consumer.php, which is our consumer.

In index.php, we connect to the Kafka broker, create a new topic called test, and produce a message from $_SERVER['QUERY_STRING'], which holds whatever follows the ? in the URL.

The other script, consumer.php, listens to what the producer sends on the topic test.

To run the consumer, enter:

$ docker-compose exec php php /usr/share/nginx/www/public/consumer.php

Then open http://localhost in a browser and append a query string to the URL, for example:

http://localhost?hello

You should see the message hello in your terminal.

Enjoy 🖖.
