Introduction
Setting up a multi-node Kafka cluster with the KRaft consensus protocol is an essential step for building scalable, fault-tolerant data streams. Kafka, known for its high-throughput messaging system, benefits greatly from the KRaft protocol, which eliminates the need for Apache ZooKeeper. This tutorial will walk you through configuring Kafka nodes, connecting them to the cluster, and managing topics and partitions to ensure data availability and resiliency. Whether you’re producing or consuming messages, understanding how to simulate node failures and migrate data is key to maintaining a robust Kafka architecture. In this guide, we’ll show you how to implement these strategies effectively using Kafka and KRaft.
What is Kafka Cluster?
A Kafka cluster is a system of interconnected servers designed to process real-time data streams. It allows the creation of topics where data is stored in partitions, ensuring high availability, scalability, and fault tolerance. The system can manage and organize data flow between producers and consumers, making it suitable for handling large volumes of data with minimal downtime.
Step 1 – Configuring Kafka Nodes
In this step, you’ll set up the three Kafka servers you created earlier to be part of the same KRaft cluster. With KRaft, the nodes handle their own organization and perform admin tasks without needing Apache ZooKeeper. This setup makes everything much more efficient and scalable.
Configuring the First Node
Let’s start by setting up the first node. First, stop the Kafka service on the first cloud server by running the following command:
$ sudo systemctl stop kafka
Next, log in as the kafka user and head to the directory where Kafka is installed. To start editing the Kafka configuration file, run:
$ vi /config/kraft/server.properties
Once you’re in the file, look for these lines:
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance’s roles
node.id=1
# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9092
These three parameters are what set up your Kafka node to act as both a broker (responsible for receiving and consuming data) and a controller (responsible for admin tasks). This setup is really handy in larger Kafka deployments where it’s best to keep controllers separate for efficiency and redundancy.
The node.id defines the unique ID for this node within the cluster. Since this is the first node, it’s set to 1. It’s important that each node has a unique ID, so the second and third nodes will have the IDs 2 and 3.
The controller.quorum.voters line ties each node ID to the corresponding address and port for communication. Update this line so that all three nodes are aware of each other. Your updated line should look like this:
[email protected]_domain:9093,[email protected]_domain:9093,[email protected]_domain:9093
Don’t forget to replace your_domain with your actual domain address from the earlier setup steps.
Next, find and update the following lines to specify the listeners and the addresses that Kafka will use to communicate with clients:
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname, and port the broker will advertise to clients. If not set, it uses the value for “listeners”.
advertised.listeners=PLAINTEXT://localhost:9092
Here’s the deal:
- listeners defines where the Kafka node listens for incoming connections.
- advertised.listeners specifies the addresses clients should use to connect.
This setup lets you control which address clients actually use to connect, even though the server may listen on different addresses.
Update these lines to look like this (again, replace your_domain with your actual domain):
listeners=PLAINTEXT://kafka1.your_domain:9092,CONTROLLER://kafka1.your_domain:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://kafka1.your_domain:9092
Since this node will be part of a cluster, you’ll explicitly set the addresses to point to the current server (cloud instance).
Now, find the num.partitions setting, which sets the default number of log partitions per topic. More partitions allow for better parallelism in consumption but also increase the number of files across brokers. By default, this is set to 1, but since you have three nodes, set it to a multiple of two:
num.partitions=6
This value of 6 ensures that each node holds two topic partitions.
Next, configure the replication factor for internal topics like consumer offsets and transaction states. Look for these lines:
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
Set them to:
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
This ensures that at least two nodes will be in sync for managing internal metadata.
Once you’ve updated these lines, save and close the configuration file.
Reinitializing Log Storage
After setting the default partition number and replication factor, you need to reinitialize the log storage. First, remove the existing log files by running:
$ rm -rf /home/kafka/kafka-logs/*
Next, generate a new cluster ID and store it in an environment variable:
$ KAFKA_CLUSTER_ID=”$(bin/kafka-storage.sh random-uuid)”
You can display the cluster ID by running:
$ echo $KAFKA_CLUSTER_ID
The output should look like this:
OutputMjj4bch9Q3-B0TEXv8_zPg
Note down this ID because you’ll need it to configure the second and third nodes.
Finally, format the log storage with the new cluster ID by running:
$ ./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
The output will be something like:
Output… Formatting /home/kafka/kafka-logs with metadata.version 3.7-IV4
Configuring the Second and Third Nodes
Setting up the second and third nodes is pretty much the same as setting up the first one. Just remember to use a unique node.id for each node. For the second node, set node.id=2 , and for the third node, set node.id=3 .
Don’t forget to update the listeners and advertised.listeners settings for each node to point to the correct server.
When regenerating the log storage, reuse the cluster ID from the first node:
$ KAFKA_CLUSTER_ID=”your_cluster_id”
Once you’ve made all the changes, start the Kafka service on all three nodes by running:
$ sudo systemctl start kafka
And that’s it! You’ve successfully configured the three Kafka nodes to be part of the same KRaft cluster. Now you can create topics and start producing and consuming messages across your cluster.
Read more about Kafka node configuration and setup in this guide How to Set Up Apache Kafka on Ubuntu 20.04.
Step 2 – Connecting to the Cluster
In this step, you’ll connect to the Kafka cluster using the shell scripts that come with Kafka. You’ll also create a topic, send some messages, and consume data from the cluster. Plus, there’s a cool part where we simulate a node failure to see how Kafka handles this and keeps your data available.
Kafka has this handy script called kafka-metadata-quorum.sh , which gives you a detailed snapshot of your cluster and its members. To run it, just use the following command:
./bin/kafka-metadata-quorum.sh –bootstrap-controller kafka1.your_domain:9093 describe –status
Here’s the thing: you’re connecting to one of the Kafka nodes using port 9093. This port is for the controller (not the broker, by the way). Make sure you replace kafka1.your_domain with the actual domain that points to one of your Kafka nodes. After running the command, you should see something like this:
ClusterId: G3TeIZoeTSCvG2YOWvPE2w
LeaderId: 3
LeaderEpoch: 2
HighWatermark: 383
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 55
CurrentVoters: [1,2,3]
CurrentObservers: []
This output gives you a quick look at the cluster’s state. For example, node 3 is elected as the leader, and all three nodes—1, 2, and 3—are in the voting pool. They’re all in agreement about who the leader is.
Now that we’ve got the basics out of the way, let’s create a topic called first-topic . You can do this by running:
./bin/kafka-topics.sh –create –topic first-topic –bootstrap-server kafka1.your_domain:9092 –replication-factor 2
Once it’s created, you’ll see this output:
Created topic first-topic.
To check how the partitions are distributed across the nodes, run this command:
./bin/kafka-topics.sh –describe –bootstrap-server kafka1.your_domain:9092 –topic first-topic
Setting the replication-factor to 2 means the topic will be replicated on at least two nodes. This ensures redundancy and fault tolerance. The output will look something like this:
Topic: first-topic
TopicId: 4kVImoFNTQeyk3r2zQbdvw
PartitionCount: 6
ReplicationFactor: 2
Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: first-topic Partition: 4 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1
Here, each partition has a leader and replicas. The Isr (in-sync replicas) set shows which replicas are in sync with the leader. By default, Kafka considers replicas in sync if they’re caught up within the last 10 seconds, though this time can be customized per topic.
Okay, now we’re ready to produce some messages. Use the kafka-console-producer.sh script to start the producer:
./bin/kafka-console-producer.sh –topic first-topic –bootstrap-server kafka1.your_domain:9092
Once you run this, you’ll see a blank prompt, meaning the producer is waiting for you to enter something. Type Hello World! and hit ENTER:
Hello World!
>
Now you’ve successfully sent a message to Kafka! You can keep typing messages to test it out. When you’re done, press CTRL+C to exit the producer.
Next, you’ll consume those messages using the kafka-console-consumer.sh script:
./bin/kafka-console-consumer.sh –topic first-topic –from-beginning –bootstrap-server kafka1.your_domain:9092
You should see the message you just produced:
Hello World!
…
Simulating Node Failure
Now, here’s where the fun starts: let’s simulate a failure on one of the Kafka nodes. To do this, stop the Kafka service on the third node by running:
sudo systemctl stop kafka
Next, let’s check the status of the first-topic again by running:
./bin/kafka-topics.sh –describe –bootstrap-server kafka1.your_domain:9092 –topic first-topic
The output will look like this:
Topic: first-topic
TopicId: 4kVImoFNTQeyk3r2zQbdvw
PartitionCount: 6
ReplicationFactor: 2
Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 1 Replicas: 3,1 Isr: 1
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,3 Isr: 1
Topic: first-topic Partition: 4 Leader: 2 Replicas: 3,2 Isr: 2
Topic: first-topic Partition: 5 Leader: 2 Replicas: 2,1 Isr: 2,1
Notice that node 3 is still listed as a replica for some partitions, but it’s missing from the ISR (in-sync replicas) because it’s down. But once it comes back up, it will sync with the other nodes and get back in sync with the partition replicas.
To see if the messages are still available, run the consumer again:
./bin/kafka-console-consumer.sh –topic first-topic –from-beginning –bootstrap-server kafka1.your_domain:9092
You’ll find the messages are still there:
Hello World!
…
Thanks to the replicas, the first two nodes have taken over and are continuing to serve up the messages to the consumer.
Finally, to complete the simulation, restart Kafka on the third node:
sudo systemctl start kafka
And just like that, you’ve seen how Kafka gracefully handles a node failure and keeps your data available. Pretty neat, right? Now you’re ready for the next step, where we’ll cover how to exclude a node from the cluster in a controlled way.
Read more about connecting to Kafka clusters and managing topic creation in this guide Apache Kafka Quickstart Guide.
Step 3 – Migrating Data Between Nodes
In this step, you will learn how to move topics between nodes in a Kafka cluster. This is super helpful when you add new nodes to an existing cluster with topics, because, you know, Kafka doesn’t automatically move partitions to these new nodes. It’s also really useful if you need to remove nodes, as Kafka doesn’t move partitions to the remaining nodes automatically either. By manually migrating data, you can make sure that all the partitions are balanced and that your data gets redistributed where it’s needed.
Kafka provides this neat script called kafka-reassign-partitions.sh . This script lets you create, run, and verify plans for reassigning partitions. You’ll use it to create a plan to move the partitions of the first-topic to the first two nodes in your cluster.
Defining Topics to Migrate
The script needs a JSON file to define which topics you want to migrate. So, you’ll need to create and edit a file called topics-to-move.json with this content:
{
“topics”: [
{
“topic”: “first-topic”
}
],
“version”: 1
}
This file tells Kafka which topic to migrate (in this case, first-topic ) and also specifies the version of the reassign plan. After adding this, save and close the file.
Generating the Migration Plan
Now that you’ve defined the topic, you can generate the migration plan. Run this command, but remember to replace kafka1.your_domain with the actual domain pointing to one of your Kafka nodes:
$ ./bin/kafka-reassign-partitions.sh –bootstrap-server kafka1.your_domain:9092 –topics-to-move-json-file topics-to-move.json –broker-list “1,2” –generate
In this command, the --broker-list "1,2" part specifies that the partitions should be reassigned to brokers 1 and 2. The output should look something like this:
Current partition replica assignment
{
“version”: 1,
“partitions”: [
{
“topic”: “first-topic”,
“partition”: 0,
“replicas”: [3, 1],
“log_dirs”: [“any”, “any”]
},
{
“topic”: “first-topic”,
“partition”: 1,
“replicas”: [1, 2],
“log_dirs”: [“any”, “any”]
},
{
“topic”: “first-topic”,
“partition”: 2,
“replicas”: [2, 3],
“log_dirs”: [“any”, “any”]
},
{
“topic”: “first-topic”,
“partition”: 3,
“replicas”: [1, 3],
“log_dirs”: [“any”, “any”]
},
{
“topic”: “first-topic”,
“partition”: 4,
“replicas”: [3, 2],
“log_dirs”: [“any”, “any”]
},
{
“topic”: “first-topic”,
“partition”: 5,
“replicas”: [2, 1],
“log_dirs”: [“any”, “any”]
}
]
}
This output shows the current assignment of replica partitions across brokers. Each partition has multiple replicas, and each replica is stored on different brokers.
Defining the Proposed Reassignment Plan
Now that you’ve got the current partition assignments, you can define how you want the partitions to be reassigned. You can update the partition replica assignments like this:
{
“version”: 1,
“partitions”: [
{
“topic”: “first-topic”,
“partition”: 0,
“replicas”: [2, 1],
“log_dirs”: [“any”, “any”]
},
{
“topic”: “first-topic”,
“partition”: 1,
“replicas”: [1, 2],
“log_dirs”: [“any”, “any”]
},
{
“topic”: “first-topic”,
“partition”: 2,
“replicas”: [2, 1],
“log_dirs”: [“any”, “any”]
},
{
“topic”: “first-topic”,
“partition”: 3,
“replicas”: [1, 2],
“log_dirs”: [“any”, “any”]
},
{
“topic”: “first-topic”,
“partition”: 4,
“replicas”: [2, 1],
“log_dirs”: [“any”, “any”]
},
{
“topic”: “first-topic”,
“partition”: 5,
“replicas”: [1, 2],
“log_dirs”: [“any”, “any”]
}
]
}
This updated configuration tells Kafka where each partition’s replicas should go. For example, partition 0 will now have replicas on brokers 2 and 1, and so on.
Saving and Executing the Plan
Now that you’ve got the reassignment plan, save it to a new file called migration-plan.json and open it to edit:
$ vi migration-plan.json
Add the second configuration reflecting the reassignment you’ve defined above. After saving and closing the file, run the following command to execute the migration plan:
$ ./bin/kafka-reassign-partitions.sh –bootstrap-server kafka1.your_domain:9092 –reassignment-json-file migration-plan.json –execute
The output will show the reassignment process, including the current partition replica assignments and confirmation that the migration was successful:
Current partition replica assignment
{
“version”: 1,
“partitions”: [
{
“topic”: “first-topic”,
“partition”: 0,
“replicas”: [3, 1],
“log_dirs”: [“any”, “any”]
},
{
“topic”: “first-topic”,
“partition”: 1,
“replicas”: [1, 2],
“log_dirs”: [“any”, “any”]
},
{
“topic”: “first-topic”,
“partition”: 2,
“replicas”: [2, 3],
“log_dirs”: [“any”, “any”]
},
{
“topic”: “first-topic”,
“partition”: 3,
“replicas”: [1, 3],
“log_dirs”: [“any”, “any”]
},
{
“topic”: “first-topic”,
“partition”: 4,
“replicas”: [3, 2],
“log_dirs”: [“any”, “any”]
},
{
“topic”: “first-topic”,
“partition”: 5,
“replicas”: [2, 1],
“log_dirs”: [“any”, “any”]
}
]
}
This confirms that the migration plan has been executed, and the partitions have been successfully reassigned.
Verifying the Migration
To check the status of the partition migration, run the following command:
$ ./bin/kafka-reassign-partitions.sh –bootstrap-server kafka1.your_domain:9092 –reassignment-json-file migration-plan.json –verify
After a little while, the output will show that the reassignment of all partitions is complete:
Status of partition reassignment:
Reassignment of partition first-topic-0 is completed.
Reassignment of partition first-topic-1 is completed.
Reassignment of partition first-topic-2 is completed.
Reassignment of partition first-topic-3 is completed.
Reassignment of partition first-topic-4 is completed.
Reassignment of partition first-topic-5 is completed.
This means all partitions have been successfully reassigned to the new brokers.
Final Verification
Finally, to make sure everything is in place, you can describe the first-topic again to ensure that no partitions are still on the old broker (broker 3, in this case). Run this command:
$ ./bin/kafka-topics.sh –describe –bootstrap-server kafka1.your_domain:9092 –topic first-topic
The output will now show that only brokers 1 and 2 are present as replicas and ISR (In-Sync Replicas), confirming that the migration was successful:
Topic: first-topic TopicId: 4kVImoFNTQeyk3r2zQbdvw
PartitionCount: 6 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: first-topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 1,2
Topic: first-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: first-topic Partition: 4 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: first-topic Partition: 5 Leader: 1 Replicas: 1,2 Isr: 2,1
And just like that, you’ve successfully moved the partitions and verified that they’re now properly distributed across the brokers in your Kafka cluster.
Learn more about Kafka partition management and data migration strategies in this Apache Kafka Data Migration Documentation.
Conclusion
In conclusion, setting up a multi-node Kafka cluster with the KRaft consensus protocol is a powerful way to ensure fault tolerance and scalability in your data streaming architecture. By configuring Kafka nodes and efficiently managing topics, partitions, and data migration, you can build a robust, highly available Kafka cluster. The KRaft protocol eliminates the need for Apache ZooKeeper, streamlining the process while providing high performance. As you continue working with Kafka, it’s essential to understand how to handle node failures and maintain data integrity, ensuring your cluster remains resilient. Looking ahead, with the evolving capabilities of Kafka and KRaft, further improvements in distributed systems and data processing are expected, making it a crucial tool for developers and data engineers.
Master Kafka Management: Use KafkaAdminClient, kcat, Cruise Control