Master Kafka Management: Use KafkaAdminClient, kcat, Cruise Control

Introduction

Managing an Apache Kafka cluster efficiently requires a deep understanding of tools like KafkaAdminClient, kcat, and Cruise Control. These tools allow you to programmatically manage resources, automate task handling, and optimize cluster performance. With KafkaAdminClient, you can manage topics, partitions, and other crucial resources, while kcat offers a lightweight, Java-free way to access the cluster. Meanwhile, Cruise Control ensures workload balance and reliability by constantly monitoring and adjusting the cluster. In this tutorial, we’ll guide you through using these powerful tools to enhance the performance and scalability of your Kafka infrastructure.

What is Kafka AdminClient API?

The Kafka AdminClient API allows you to manage and interact with a Kafka cluster programmatically. It helps in performing administrative tasks such as creating, listing, and deleting topics, as well as retrieving information about the cluster. This tool is useful for handling Kafka resources more efficiently without relying on command-line scripts.

Step 1 – Utilizing Kafka AdminClient

So, you’ve already set up a Java project with all the dependencies needed to work with Kafka, right? Well, now it’s time to create a class that uses Kafka’s AdminClient class to manage your cluster.

First, you’ll navigate to the folder where the dokafka project lives. If you check the project structure, you’ll see that the source code sits under src/main/java/com/dokafka. That’s where you’ll save the new class; name it AdminClientDemo.java.

Go ahead and open it up for editing by running:

$ nano src/main/java/com/dokafka/AdminClientDemo.java

Now, add these lines of code:


package com.dokafka;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class AdminClientDemo {
    private static final Logger log = LoggerFactory.getLogger(AdminClientDemo.class);

    public static void main(String[] args) {
        String bootstrapServers = "kafka1.your_domain:9092";

        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        final AdminClient client = AdminClient.create(properties);

        try {
            Collection<Node> nodes = client.describeCluster().nodes().get();
            if (nodes == null)
                log.info("There seem to be no nodes in the cluster!");
            else
                log.info(String.format("Count of nodes: %s\n", nodes.size()));
        } catch (Exception e) {
            log.error("An error occurred", e);
        } finally {
            client.close();
        }
    }
}

Here’s the deal—first, you define the AdminClientDemo class and import all the classes that you’ll be using. You’ll also create a Logger to help you log events as your code runs. In the main() method, you start by setting the Kafka cluster address. Just make sure to replace kafka1.your_domain with your actual cluster address or IP.

Next, you’ll create a Properties object to hold the configuration settings for the admin client. This is where you tell Kafka where your cluster is located by setting bootstrap.servers.

Once that’s done, you create an AdminClient by calling AdminClient.create(properties). This is the object that lets you perform admin tasks within Kafka, like listing, creating, and deleting topics, partitions, and offsets. When you’re finished with it, closing the client releases its network resources.

Inside the try block, you use describeCluster() to get information about your cluster. Specifically, you call nodes() to grab all the nodes in the cluster. If there are no nodes, you’ll log a message that says there are none. Otherwise, you’ll log the number of nodes.

Once you’re done, save and close the file. Next, you’ll create a script that compiles and runs AdminClientDemo. Save it as run-adminclient.sh.

To edit the file, run:

$ nano run-adminclient.sh

Add the following to the file:


#!/bin/bash
mvn clean
mvn package
java -cp target/dokafka-1.0-SNAPSHOT.jar:target/lib/* com.dokafka.AdminClientDemo

Save and close the file, then make it executable by running:

$ chmod +x run-adminclient.sh

Finally, give it a try by running:

$ ./run-adminclient.sh

The output will be long, but at the end, you should see something like this:

Output
[main] INFO com.dokafka.AdminClientDemo - Count of nodes: 3

This tells you that AdminClientDemo has successfully connected to Kafka and pulled the node count.

Creating and Listing Topics

Next up, let’s create a topic and list all the topics in the cluster. KafkaAdminClient has the createTopics() and listTopics() methods, and you’ll be using them for this task.

Open AdminClientDemo.java again and update the code as follows:


try {
    NewTopic newTopic = new NewTopic("newTopic", 1, (short) 1);
    CreateTopicsResult result = client.createTopics(
        Collections.singleton(newTopic)
    );
    result.all().get();

    ListTopicsOptions options = new ListTopicsOptions();
    options.listInternal(true);
    Collection<TopicListing> topics = client.listTopics(options).listings().get();
    for (TopicListing topic : topics) {
        log.info("Topic: " + topic.name());
    }
} catch (Exception e) {
    log.error("An error occurred", e);
}

In this part of the code, you first create a NewTopic instance. This represents the new topic that you want to add to your Kafka cluster. You give it the name “newTopic” and specify one partition and a replication factor of 1. The replication factor parameter is a short, hence the (short) 1 cast.

You use createTopics() to send this topic to the Kafka cluster. Since the operation is asynchronous, you call result.all().get() to make sure the operation completes before moving on.

Then, you set up a ListTopicsOptions instance, which controls how topics will be retrieved from the cluster. You enable listInternal(true) so you can include internal topics Kafka uses behind the scenes. After that, you fetch and loop through the list of topics, logging each topic’s name.

Once you’ve done that, save and close the file. Then, run the script again:

$ ./run-adminclient.sh

At the end of the output, you should see the list of topics:

Output
[main] INFO com.dokafka.AdminClientDemo - Topic: newTopic
[main] INFO com.dokafka.AdminClientDemo - Topic: java_demo

Deleting Topics

Let’s say you want to delete a topic—no problem! KafkaAdminClient has the deleteTopics() method to do just that. In this example, you’ll delete the topic “newTopic.”

Open AdminClientDemo.java once more and replace the code for topic creation with this:


DeleteTopicsResult deleted = client.deleteTopics(Collections.singleton("newTopic"));
deleted.all().get();
log.info("Topic newTopic deleted!");

ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true);
Collection<TopicListing> topics = client.listTopics(options).listings().get();
for (TopicListing topic : topics) {
    log.info("Topic: " + topic.name());
}

Here, you pass in a collection containing the name of the topic you want to delete—in this case, “newTopic.” Just like when creating topics, this operation is asynchronous, so you wait for it to finish by calling deleted.all().get() . After that, you log that the topic was deleted. Finally, the script fetches and lists the remaining topics, ensuring that “newTopic” is no longer on the list.

Once you’ve updated the file, save and close it, and run the script again:

$ ./run-adminclient.sh

The output will confirm that “newTopic” was deleted and will list the remaining topics:

Output
[main] INFO com.dokafka.AdminClientDemo - Topic newTopic deleted!
[main] INFO com.dokafka.AdminClientDemo - Topic: java_demo

Now you’ve successfully used the KafkaAdminClient to manage topics in your Kafka cluster. You retrieved cluster information, listed topics, created new ones, and even deleted them programmatically. You’re officially a Kafka admin now!

Read more about managing Kafka clusters with the KafkaAdminClient in this detailed guide: Kafka AdminClient Tutorial for Managing Kafka Clusters

Step 2 – Using kcat to Manage the Cluster

In this step, you’ll learn how to download and install kcat, a command-line tool for accessing and managing Kafka clusters without the need for Java. It’s super lightweight and comes in handy for carrying out essential Kafka tasks without having to write any code. Let’s dive in, shall we?

First, you need to install the right package for your operating system. If you’re on macOS, kcat is pretty easy to install via Homebrew, which is a package manager for macOS. Just run this simple command:

$ brew install kcat

On Debian and Ubuntu systems, you can grab kcat via the trusty apt package manager. Here’s the command for that:

$ sudo apt install kafkacat

Now, just a little side note: kafkacat is the old name for kcat, but it’s still available in the package manager for compatibility purposes. If you’re on a different Linux distribution or OS, check the official documentation for installation details.

Producing and Consuming Messages

One of the things Kafka is awesome at is streaming data from a topic, and the best part? kcat makes it super easy. You can even stream from one or more topics at once, which is pretty useful when you’re keeping an eye on Kafka topics in real-time.

Let’s say you want to stream messages from several topics at once as part of a consumer group. In kcat’s balanced consumer mode, you pass a group name with the -G flag, followed by the topic names:

$ kcat -b your_broker_address:9092 -G your_group first_topic second_topic …

This will stream messages from the listed topics and display them in the console. To stream from a single topic, name it with the -t flag. For example, to stream messages from the java_demo topic to your console, use this command:

$ kcat -b kafka1.your_domain:9092 -t java_demo

This will connect to your Kafka broker (in this case, kafka1.your_domain:9092) and stream messages from the java_demo topic. If you’ve used the kafka-console-consumer.sh script before, this does the same thing. To control where consumption starts, you can also pass the -o flag (for example, -o beginning or -o end):

$ kcat -b kafka1.your_domain:9092 -t java_demo -o beginning

When you run this, the output might look like this, showing you the messages you’ve produced earlier:

Output
% Auto-selecting Consumer mode (use -P or -C to override)
% Reached end of topic java_demo [1] at offset 0
% Reached end of topic java_demo [2] at offset 0
Hello World!
% Reached end of topic java_demo [0] at offset 0
% Reached end of topic java_demo [3] at offset 0
% Reached end of topic java_demo [4] at offset 1
% Reached end of topic java_demo [5] at offset 0

If you want the consumed messages in JSON format, simply add the -J flag:

$ kcat -b kafka1.your_domain:9092 -t java_demo -J

This will give you the output in JSON, showing more details like the timestamp, broker ID, partition number, and more:

Output
% Auto-selecting Consumer mode (use -P or -C to override)
% Reached end of topic java_demo [2] at offset 0
% Reached end of topic java_demo [0] at offset 0
% Reached end of topic java_demo [1] at offset 0
{"topic":"java_demo","partition":4,"offset":0,"tstype":"create","ts":1714922509999,"broker":1,"key":null,"payload":"Hello World!"}
% Reached end of topic java_demo [3] at offset 0
% Reached end of topic java_demo [5] at offset 0
% Reached end of topic java_demo [4] at offset 1
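Because the -J output is one JSON object per record, it’s easy to post-process with any language that can read JSON. Here’s a minimal Python sketch; the sample line is copied from the output above, and in a real pipeline you would read these lines from kcat’s stdout instead:

```python
import json

# One record exactly as kcat -J prints it (sample taken from the output above).
line = ('{"topic":"java_demo","partition":4,"offset":0,"tstype":"create",'
        '"ts":1714922509999,"broker":1,"key":null,"payload":"Hello World!"}')

record = json.loads(line)
print(f"{record['topic']}[{record['partition']}] @ {record['offset']}: {record['payload']}")
# → java_demo[4] @ 0: Hello World!
```

Note that kcat’s lines starting with % are status diagnostics, not records, so a real consumer loop should filter them out before parsing.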

Producing Messages

When you’re ready to send some messages to your topic, just switch kcat into producer mode with the -P flag. This will allow you to send messages straight to the topic from the command line, just like with the kafka-console-producer.sh script.

To send messages to the java_demo topic, you’d run this:

$ kcat -b kafka1.your_domain:9092 -t java_demo -P

Now, once you’re in producer mode, you can type your messages one by one, hitting ENTER after each. When you’re done, press CTRL + D to signal the end of input (or CTRL + C) to stop the producer and return to your command prompt.

The cool part? kcat lets you tweak how the output looks with templates. This means you can display more info about each message, like the topic name, partition number, offset, key, and the message payload. To set this up, use the -f flag, like this:

$ kcat -b kafka1.your_domain:9092 -t java_demo -f 'Topic %t[%p], offset: %o, key: %k, payload: %S bytes: %s\n'

This command will show you all the messages from the start of the java_demo topic, but with extra details included:

Output
% Auto-selecting Consumer mode (use -P or -C to override)
% Reached end of topic java_demo [2] at offset 0
% Reached end of topic java_demo [1] at offset 0
Topic java_demo[4], offset: 0, key: , payload: 12 bytes: Hello World!
% Reached end of topic java_demo [0] at offset 0
% Reached end of topic java_demo [3] at offset 0
% Reached end of topic java_demo [4] at offset 1
% Reached end of topic java_demo [5] at offset 0

This is really handy for tracking messages by topic, partition, offset, and message content.
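To make the format tokens concrete, here is a tiny Python sketch that mimics how such a template expands for the message above. This is a simplified illustration, not kcat’s actual implementation:

```python
# How kcat's common -f tokens map onto message fields, illustrated with a
# simplified formatter (a sketch, not kcat's real substitution logic).
TOKENS = {"%t": "topic", "%p": "partition", "%o": "offset", "%k": "key", "%s": "payload"}

def render(fmt, msg):
    for token, field in TOKENS.items():
        fmt = fmt.replace(token, str(msg[field]))
    # %S is the payload length in bytes.
    return fmt.replace("%S", str(len(msg["payload"])))

msg = {"topic": "java_demo", "partition": 4, "offset": 0, "key": "", "payload": "Hello World!"}
print(render("Topic %t[%p], offset: %o, key: %k, payload: %S bytes: %s", msg))
# → Topic java_demo[4], offset: 0, key: , payload: 12 bytes: Hello World!
```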

Listing Cluster Metadata

Need a quick look at the metadata of your Kafka cluster? No worries, kcat has you covered. You can list everything in your cluster, from brokers to topics and partitions, with the -L flag. Here’s the command for that:

$ kcat -b kafka1.your_domain:9092 -L

The output will look something like this, showing you all the important info about brokers, topics, and partitions:

Output
3 brokers:
  broker 1 at kafka1.your_domain:9092
  broker 2 at kafka2.your_domain:9092 (controller)
  broker 3 at kafka3.your_domain:9092
1 topic:
  topic "java_demo" with 6 partitions:
    partition 0, leader 3, replicas: 3,1 isrs: 3,1
    partition 1, leader 1, replicas: 1,2 isrs: 1,2
    partition 2, leader 2, replicas: 2,3 isrs: 2,3
    partition 3, leader 2, replicas: 2,1 isrs: 2,1
    partition 4, leader 1, replicas: 1,3 isrs: 1,3
    partition 5, leader 3, replicas: 3,2 isrs: 3,2

Here, you’ll see the broker IDs, the leader of each partition, the replicas, and the in-sync replica set (ISR). This metadata is key for understanding how your cluster is structured and how replication is working.

If you prefer a more structured output, just add the -J flag, and kcat will give you the metadata in JSON format:

$ kcat -b kafka1.your_domain:9092 -L -J

The JSON format is much easier to parse programmatically:

Output
{
  "originating_broker": {"id": 2, "name": "kafka2.your_domain:9092/2"},
  "query": {"topic": "*"},
  "controllerid": 3,
  "brokers": [
    {"id": 1, "name": "kafka1.your_domain:9092"},
    {"id": 2, "name": "kafka2.your_domain:9092"},
    {"id": 3, "name": "kafka3.your_domain:9092"}
  ],
  "topics": [
    {"topic": "java_demo", "partitions": [
      {"partition": 0, "leader": 3, "replicas": [{"id": 3}], "isrs": [{"id": 3}]}
    ]}
  ]
}
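For example, a short Python script can consume this metadata and spot leadership skew across brokers. The sample below is trimmed to just the fields it uses, with partition leaders matching the -L output shown earlier in this step:

```python
import json
from collections import Counter

# kcat -L -J metadata, trimmed to the fields used below.
metadata = json.loads("""
{
  "brokers": [{"id": 1, "name": "kafka1.your_domain:9092"},
              {"id": 2, "name": "kafka2.your_domain:9092"},
              {"id": 3, "name": "kafka3.your_domain:9092"}],
  "topics": [{"topic": "java_demo", "partitions": [
    {"partition": 0, "leader": 3}, {"partition": 1, "leader": 1},
    {"partition": 2, "leader": 2}, {"partition": 3, "leader": 2},
    {"partition": 4, "leader": 1}, {"partition": 5, "leader": 3}]}]
}
""")

# Count how many partitions each broker leads; a quick check for skew.
leaders = Counter(p["leader"] for t in metadata["topics"] for p in t["partitions"])
for broker in metadata["brokers"]:
    print(f"broker {broker['id']} leads {leaders[broker['id']]} partition(s)")
```

Here each broker leads two of the six partitions, which is exactly the kind of even distribution Cruise Control tries to maintain automatically.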

So there you go! In this step, you installed kcat, a powerful tool for accessing and managing Kafka clusters without needing Java. You learned how to retrieve cluster metadata, produce and consume messages, and even use advanced features like custom templates and JSON output. Now, you’re all set to dive deeper into managing your Kafka cluster with more tools like Cruise Control!

Read more about efficiently managing Kafka clusters with tools like kcat in this comprehensive guide.

Step 3 – Automating Rebalances with Kafka Cruise Control

Cruise Control is an open-source project developed by LinkedIn that keeps a close eye on your Kafka brokers within a cluster, rebalancing the workloads to make sure resources are used efficiently and throughput is optimized. It’s like having a personal manager for your Kafka cluster that does all the heavy lifting to keep things running smoothly. Here’s the thing: Cruise Control automatically balances partition loads and optimizes system resources. Pretty cool, right?

By default, Cruise Control comes with pre-configured targets, or “goals,” that guide how it optimizes things. These goals are like guidelines that help ensure your cluster operates efficiently by balancing key resources—think CPU, disk, and network usage—while also ensuring there are enough replicas for each topic and partition. Some of the big goals it focuses on include:

  • Ensuring each topic has the correct number of replicas.
  • Keeping CPU, network, and disk usage balanced across brokers.
  • Making sure that each partition is properly distributed across brokers to make the most of available capacity.

And hey, if you have special requirements for your Kafka cluster, don’t worry. Cruise Control lets you create custom goals that fit your unique needs. So, if your setup has specific performance metrics or requirements, you can tweak Cruise Control to match them.

Compiling and Installing Cruise Control

To get Cruise Control up and running, you first need to compile it from the source. Here’s how you can do that:

Start by cloning the official Git repository for Cruise Control:

$ git clone https://github.com/linkedin/cruise-control.git

Once it’s cloned, go ahead and navigate into the Cruise Control directory:

$ cd cruise-control

Now, to compile the project, use Gradle:

$ ./gradlew jar

The build process will take a few minutes, depending on your system’s speed. Once it’s done, you’ll see an output like this, indicating that everything went smoothly:

Output
BUILD SUCCESSFUL in 2m 41s
17 actionable tasks: 17 executed

At this point, Cruise Control is compiled and ready to go, along with its metrics reporter. You’ll find the reporter located in the cruise-control-metrics-reporter/build/libs/ directory as a JAR file. This reporter plays a key role in sending metrics about your brokers into a topic on the Kafka cluster that Cruise Control can monitor.

Next up, you need to copy all dependencies to the target directory by running:

$ ./gradlew jar copyDependantLibs

The output will look something like this:

Output
BUILD SUCCESSFUL in 15s
17 actionable tasks: 1 executed, 16 up-to-date

Configuring the Kafka Brokers

With the Cruise Control components compiled, it’s time to configure your Kafka brokers to use the metrics reporter. This is what allows Cruise Control to monitor broker performance and make decisions about rebalancing based on the data it gathers.

To get started, copy the metrics reporter JAR file into the libs/ directory where Kafka is installed:

$ cp cruise-control-metrics-reporter/build/libs/* /home/kafka/kafka/libs/

Now, you need to modify the Kafka broker configuration file to use the Cruise Control metrics reporter. To do this, open the server.properties file for editing:

$ nano /home/kafka/kafka/config/kraft/server.properties

At the end of the file, add this line:

metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter

Save the file, then restart the broker to apply the changes:

$ sudo systemctl restart kafka

Don’t forget to repeat this process for each broker in your cluster, so that the Cruise Control metrics reporter is active on all of them.

Once a few minutes have passed, you can verify everything is set up properly by listing the topics in the cluster using kcat. You should see a topic called __CruiseControlMetrics that stores the metrics data collected by the reporter. Run this command to list the topics:

$ kcat -b kafka1.your_domain:9092 -L

The output should look something like this:

Output
topic “__CruiseControlMetrics” with 6 partitions:
partition 0, leader 3, replicas: 3,2 isrs: 3,2
partition 1, leader 2, replicas: 2,3 isrs: 2,3
partition 2, leader 3, replicas: 3,2 isrs: 3,2
partition 3, leader 2, replicas: 2,3 isrs: 2,3
partition 4, leader 2, replicas: 2,3 isrs: 2,3
partition 5, leader 3, replicas: 3,2 isrs: 3,2

Configuring Broker Capacity

For Cruise Control to effectively optimize your brokers, it needs to know the hardware specs of each broker in the cluster. This information is stored in a file called capacity.json , which is located under the config/ directory.

To modify this file, open it for editing:

$ nano config/capacity.json

You’ll see something like this in the default configuration:


{
    "brokerCapacities": [
        {
            "brokerId": "-1",
            "capacity": {
                "DISK": "100000",
                "CPU": "100",
                "NW_IN": "10000",
                "NW_OUT": "10000"
            },
            "doc": "This is the default capacity. Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB."
        },
        {
            "brokerId": "0",
            "capacity": {
                "DISK": "500000",
                "CPU": "100",
                "NW_IN": "50000",
                "NW_OUT": "50000"
            },
            "doc": "This overrides the capacity for broker 0."
        }
    ]
}

Now, update the file for each broker in your cluster to reflect the correct specifications. For example, if you have three Kafka brokers, the file might look like this:


{
    "brokerCapacities": [
        {
            "brokerId": "-1",
            "capacity": {
                "DISK": "100000",
                "CPU": "100",
                "NW_IN": "10000",
                "NW_OUT": "10000"
            },
            "doc": "This is the default capacity. Capacity unit used for disk is in MB, cpu is in percentage, network throughput is in KB."
        },
        {
            "brokerId": "1",
            "capacity": {
                "DISK": "100000",
                "CPU": "100",
                "NW_IN": "10000",
                "NW_OUT": "10000"
            },
            "doc": ""
        },
        {
            "brokerId": "2",
            "capacity": {
                "DISK": "100000",
                "CPU": "100",
                "NW_IN": "10000",
                "NW_OUT": "10000"
            },
            "doc": ""
        },
        {
            "brokerId": "3",
            "capacity": {
                "DISK": "100000",
                "CPU": "100",
                "NW_IN": "10000",
                "NW_OUT": "10000"
            },
            "doc": ""
        }
    ]
}

Make sure the disk capacities match your cluster’s server specs, then save and close the file.
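Since Cruise Control reads this file at startup, a typo here (a missing capacity key, for instance) only surfaces when the service fails to come up. A quick sanity check like the following hypothetical helper can catch that earlier:

```python
import json

# Keys expected in each non-JBOD capacity entry of capacity.json.
REQUIRED = {"DISK", "CPU", "NW_IN", "NW_OUT"}

def check_capacities(text):
    """Return a list of problems found in a capacity.json document."""
    problems = []
    for entry in json.loads(text)["brokerCapacities"]:
        missing = REQUIRED - set(entry["capacity"])
        if missing:
            problems.append(f"broker {entry['brokerId']}: missing {sorted(missing)}")
    return problems

# A trimmed sample in the same shape as the file above.
sample = """
{"brokerCapacities": [
  {"brokerId": "-1", "capacity": {"DISK": "100000", "CPU": "100",
                                  "NW_IN": "10000", "NW_OUT": "10000"}, "doc": ""},
  {"brokerId": "1", "capacity": {"DISK": "100000", "CPU": "100",
                                 "NW_IN": "10000", "NW_OUT": "10000"}, "doc": ""}
]}
"""
print(check_capacities(sample) or "capacity.json looks complete")
```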

Configuring Cruise Control for KRaft Mode

Now that the broker capacities are set up, you need to configure Cruise Control to connect to your Kafka cluster in KRaft mode (without ZooKeeper). To do that, you’ll edit the cruisecontrol.properties file under the config/ directory:

Open the cruisecontrol.properties file:

$ nano config/cruisecontrol.properties

Find the line with the bootstrap.servers property, which specifies which broker to connect to. Replace it with the address of your Kafka broker:

bootstrap.servers=kafka1.your_domain:9092

Next, find the kafka.broker.failure.detection.enable parameter and enable KRaft mode:

# Switch to KRaft mode
kafka.broker.failure.detection.enable=true

Finally, find the capacity.config.file parameter, which specifies the path to the broker capacity configuration file. Uncomment the capacity.config.file=config/capacity.json line and comment out the capacity.config.file=config/capacityJBOD.json line:

# The configuration for the BrokerCapacityConfigFileResolver (supports JBOD, non-JBOD, and heterogeneous CPU core capacities)
capacity.config.file=config/capacity.json
#capacity.config.file=config/capacityJBOD.json

Save and close the file when you’re done.

Starting Cruise Control

With everything set up, you can finally start Cruise Control by running this command in a separate terminal:

$ ./kafka-cruise-control-start.sh config/cruisecontrol.properties

Now Cruise Control will start monitoring and optimizing your Kafka cluster in real-time. You’ll see continuous output in your terminal as it balances the cluster’s workloads.

Using the Cruise Control CLI

Cruise Control has a REST API running on port 9090 that you can use for configuration and administrative tasks. But here’s the kicker: the project also provides cccli , a Python tool that wraps this API and makes it much easier to use.

First, navigate to your Python virtual environment (which you set up earlier):

$ cd ~/venv

Activate it by sourcing the activation script:

$ source bin/activate

Then, install the cruise-control-client package using pip:

$ pip install cruise-control-client

After the installation, the cccli command is ready to use. You can now interact with the Cruise Control REST API using this command. For example, to fetch stats about your cluster’s current load, run:

$ cccli -a localhost:9090 load

This will give you detailed metrics on the cluster’s performance, like disk usage, CPU load, and network throughput.

Enabling Auto-Healing for Broker Failures

Cruise Control can be configured to automatically heal the cluster in case of broker failure, goal violations, or metric anomalies. To enable self-healing for broker failures, run this command:

$ cccli -a localhost:9090 admin --enable-self-healing-for broker_failure

The command will show the old and new states of the setting, letting you know that self-healing is now enabled:

Output

{
    selfHealingEnabledBefore: {BROKER_FAILURE=false},
    selfHealingEnabledAfter: {BROKER_FAILURE=true}
}

With Cruise Control up and running, your Kafka cluster will be monitored and optimized for efficiency and reliability. Plus, you’ve learned how to install and use the cccli tool to manage Cruise Control through the command line.

Read more about optimizing Kafka cluster performance with Cruise Control and its automation features in this detailed guide: Automating Kafka Rebalances with Cruise Control

Conclusion

In conclusion, mastering Kafka management with tools like KafkaAdminClient, kcat, and Cruise Control can significantly improve your Apache Kafka cluster’s efficiency and performance. By using KafkaAdminClient, you can easily manage resources such as topics and partitions, while kcat offers a Java-free way to interact with your cluster. Cruise Control takes it a step further by optimizing workloads and ensuring balanced performance across your Kafka brokers. With the step-by-step instructions provided in this tutorial, you now have the knowledge to enhance your Kafka infrastructure, ensuring better scalability and reliability. As Kafka continues to evolve, integrating these tools will be crucial in maintaining efficient, high-performing clusters in the future.
