Apache Kafka – Running an Application in KRaft Mode (without Zookeeper)
Starting with Apache Kafka 2.8.0, you can run Kafka without Zookeeper by enabling KRaft mode (Kafka Raft metadata mode). In KRaft mode, Kafka brokers manage metadata and coordinate cluster activities themselves using the Raft consensus protocol, eliminating the need for Zookeeper.
Below is a step-by-step guide to set up and run a Kafka application in KRaft mode, without using Zookeeper:
Step 1: Ensure You’re Using Kafka 2.8.0 or Later
KRaft mode was introduced in Kafka 2.8.0 as an early-access feature; it matured through the 3.x releases and was declared production-ready in Kafka 3.3. To run Kafka without Zookeeper, you must be using Kafka 2.8.0 or newer.
You can download the latest version of Apache Kafka from the official website.
Step 2: Configure Kafka to Run in KRaft Mode
- Download and Extract Kafka: If you haven't already, download and extract the Kafka distribution:
wget https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xvf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0
- Update server.properties: Configure Kafka to run without Zookeeper in KRaft mode. The distribution ships a KRaft sample configuration at config/kraft/server.properties; edit that file and make the following changes (a consolidated sketch of the result follows this list):
- Set process roles so that the node acts as both broker and controller:
process.roles=broker,controller
- Set the directory for metadata storage. Specify the directory where Kafka will store its metadata log (it must be separate from the data log directory):
metadata.log.dir=/tmp/kraft-metadata
- Ensure zookeeper.connect is not set. The KRaft sample config omits it; if you started from the regular server.properties, leave it empty or comment it out:
#zookeeper.connect=
- Configure listeners to bind Kafka to the appropriate network interface. For example:
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
- Optional: Set log directories. If you want a specific directory for Kafka data logs (where messages will be stored), configure it as follows:
log.dirs=/tmp/kafka-logs
- Set sensible defaults for new topics, such as the number of partitions and the log retention period:
num.partitions=1
log.retention.hours=168
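For reference, here is a minimal consolidated sketch of a single-node KRaft configuration. KRaft also requires a node.id and a controller quorum; the node.id, the controller listener on port 9093, and the controller.quorum.voters values below are illustrative assumptions for a single node on localhost, not values from the original guide:
# Minimal single-node KRaft configuration (illustrative sketch)
process.roles=broker,controller
# Unique id for this node (assumed value for a single-node setup)
node.id=1
# Controller quorum: node 1 on an assumed controller port 9093
controller.quorum.voters=1@localhost:9093
listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
# Only the broker listener is advertised to clients
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kafka-logs
metadata.log.dir=/tmp/kraft-metadata
num.partitions=1
log.retention.hours=168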
Step 3: Start Kafka in KRaft Mode
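Unlike a Zookeeper-based setup, a KRaft node will not start until its storage directories have been formatted with a cluster ID. The kafka-storage.sh tool ships with the distribution; run it once before the first start (the config path below assumes the KRaft sample file from Step 2):
# Generate a cluster ID and format the storage directories (first start only)
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c config/kraft/server.properties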
Once you have configured Kafka to run in KRaft mode, you can start the Kafka server:
bin/kafka-server-start.sh config/kraft/server.properties
This will start Kafka as a broker and controller in KRaft mode. Since KRaft mode is intended for environments where there is no external Zookeeper, Kafka brokers will manage their own metadata internally.
Step 4: Verify Kafka in KRaft Mode
You can verify that Kafka is running in KRaft mode by checking the logs for the absence of Zookeeper connections and the presence of KRaft-specific log entries.
- Check Kafka Logs: In the terminal where Kafka is running, you should see logs indicating that Kafka is working as both the broker and controller.
- Zookeeper Not Used: There should be no logs related to Zookeeper.
For example, you should see entries similar to the following (exact wording varies by Kafka version), referencing kafka.server.KafkaRaftServer rather than the Zookeeper-based kafka.server.KafkaServer:
[2024-01-01 12:00:00,000] INFO Kafka version: 2.8.0 (org.apache.kafka.common.utils.AppInfoParser)
[2024-01-01 12:00:00,000] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)
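A quick check from the shell, assuming the distribution's default log location (logs/server.log inside the Kafka directory):
# Should match the KRaft startup entries
grep -i kafkaraftserver logs/server.log
# Should show no active Zookeeper connections (a startup config dump may
# still print the empty zookeeper.connect property)
grep -i zookeeper logs/server.log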
Step 5: Create Kafka Topics
In KRaft mode, you still use the kafka-topics.sh command to create topics, just as in Zookeeper-based deployments; note that the tool addresses the brokers directly via --bootstrap-server rather than the older --zookeeper option.
To create a topic, run:
bin/kafka-topics.sh --create --topic insurance_policies --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
This will create a topic named insurance_policies with one partition and a replication factor of 1.
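You can confirm the topic and its partition layout with the same tool:
bin/kafka-topics.sh --describe --topic insurance_policies --bootstrap-server localhost:9092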
Step 6: Kafka Producer Example (Insurance Data)
Let’s create a simple Kafka producer that sends insurance data to the insurance_policies topic.
Producer Code in Java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class InsuranceProducer {
    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092";
        String topic = "insurance_policies";

        // Set properties for the producer
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Create Kafka producer (typed to match the String serializers above)
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // Sample insurance policy data (in JSON format)
        String policyData = "{"
                + "\"policyId\": \"P12345\","
                + "\"customerName\": \"John Doe\","
                + "\"policyType\": \"Health Insurance\","
                + "\"coverageAmount\": 50000,"
                + "\"premiumAmount\": 1200"
                + "}";

        // Send data to the insurance_policies topic; with no key,
        // the partitioner chooses the partition
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, policyData);
        try {
            // send() is asynchronous; get() blocks until the broker acknowledges
            producer.send(record).get();
            System.out.println("Sent insurance policy: " + policyData);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
Run this producer using:
javac -cp "path/to/kafka/libs/*" InsuranceProducer.java
java -cp ".:path/to/kafka/libs/*" InsuranceProducer
(On Windows, use ; instead of : as the classpath separator.)
This sends an insurance policy message to the insurance_policies topic.
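To see the message arrive without writing any consumer code yet, you can use the console consumer that ships with Kafka:
bin/kafka-console-consumer.sh --topic insurance_policies --from-beginning --bootstrap-server localhost:9092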
Step 7: Kafka Consumer Example (Processing Insurance Data)
Let’s create a Kafka consumer that listens to the insurance_policies topic and processes the incoming data.
Consumer Code in Java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class InsuranceConsumer {
    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092";
        String groupId = "insurance_group";
        String topic = "insurance_policies";

        // Set properties for the consumer
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Start from the beginning of the topic if this group has no committed offsets
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create Kafka consumer (typed to match the String deserializers above)
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // Subscribe to the insurance_policies topic
        consumer.subscribe(Collections.singletonList(topic));

        // Poll for new messages
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Process the insurance policy message
                    System.out.println("Received insurance policy: " + record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
Run this consumer using:
javac -cp "path/to/kafka/libs/*" InsuranceConsumer.java
java -cp ".:path/to/kafka/libs/*" InsuranceConsumer
(On Windows, use ; instead of : as the classpath separator.)
This consumer listens for messages from the insurance_policies topic and processes them.
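One refinement worth noting: the poll loop above never exits on its own. A common pattern, sketched below as a drop-in change to the try block of the main method above (it is not part of the original example), is to call consumer.wakeup() from a JVM shutdown hook; this makes the blocked poll() throw a WakeupException so the loop can exit and close the consumer cleanly:

// Sketch: graceful shutdown for the poll loop above (consumer as defined earlier)
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();      // makes the blocked poll() throw WakeupException
    try {
        mainThread.join();  // wait for the loop to finish its cleanup
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Received insurance policy: " + record.value());
        }
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // Expected during shutdown; fall through to close()
} finally {
    consumer.close();
}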
Conclusion
By following the steps above, you can run a Kafka application without Zookeeper using KRaft mode starting from Kafka 2.8.0. The Kafka brokers will manage their own metadata, and you can send and consume messages as you would in a traditional Kafka setup.
KRaft mode simplifies the architecture by eliminating the need to manage Zookeeper separately. In Kafka 2.8 it was still an early-access feature, so large-scale, production-grade clusters at that time typically stayed on the Zookeeper-based approach; KRaft was declared production-ready in Kafka 3.3, and Zookeeper support was removed entirely in Kafka 4.0.