Apache Kafka – Producer Consumer Application
Apache Kafka – Simple Producer-Consumer Example for Insurance
In this example, we’ll set up a Kafka producer and Kafka consumer for a simple insurance system. The producer will simulate sending insurance policy data (e.g., customer information and policy details), and the consumer will process (or log) the incoming messages.
We will use the following steps:
- Set up the Kafka environment (already covered in previous installation guides).
- Create a Kafka topic called insurance_policies
- Implement a Kafka producer to send insurance policy data.
- Implement a Kafka consumer to consume and process the insurance policy data.
Assumptions:
- Kafka is installed and running on your machine.
- Zookeeper is running (if you’re using Kafka versions before 2.8.0, or you can use Kafka in KRaft mode if it’s version 2.8.0 or later).
Step 1: Create Kafka Topic
Create a topic to store insurance policy messages. You can do this through the Kafka command line.
- Open a terminal and navigate to your Kafka directory.
- Run the following command to create the insurance_policies topic:
bin/kafka-topics.sh --create --topic insurance_policies --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
This will create a topic named insurance_policies with a single partition and a replication factor of 1 (for local setup).
Step 2: Kafka Producer (Insurance Policy Data)
The Kafka producer will simulate sending insurance policy data to Kafka. We’ll create a simple JSON message to represent a policy and send it to the insurance_policies topic.
Producer Code in Java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class InsuranceProducer {
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String topic = "insurance_policies";
// Create properties for the producer
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Create Kafka producer
KafkaProducer producer = new KafkaProducer<>(properties);
// Sample insurance policy data (in JSON format)
String policyData = "{"
+ "\"policyId\": \"P12345\","
+ "\"customerName\": \"John Doe\","
+ "\"policyType\": \"Health Insurance\","
+ "\"coverageAmount\": 50000,"
+ "\"premiumAmount\": 1200"
+ "}";
// Send data to the insurance_policies topic
ProducerRecord record = new ProducerRecord<>(topic, null, policyData);
try {
producer.send(record);
System.out.println("Sent insurance policy: " + policyData);
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
Explanation:
- KafkaProducer : This class is used to send messages to Kafka.
- ProducerRecord : Represents the message (key, value) sent to a Kafka topic.
- The policyData is a JSON string representing an insurance policy with details like policyId, customerName, policyType, coverageAmount, and premiumAmount.
To run the producer:
- Save the code in a file called InsuranceProducer.java.
- Compile and run the code with the following commands:
javac -cp "path/to/kafka/libs/*" InsuranceProducer.java java -cp ".;path/to/kafka/libs/*" InsuranceProducer
This will send a message to the insurance_policies topic.
Step 3: Kafka Consumer (Processing Insurance Policies)
The Kafka consumer will listen to the insurance_policies topic and process incoming policy data.
Consumer Code in Java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;
import java.util.Collections;
public class InsuranceConsumer {
public static void main(String[] args) {
String bootstrapServers = "localhost:9092";
String groupId = "insurance_group";
String topic = "insurance_policies";
// Create properties for the consumer
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Create Kafka consumer
KafkaConsumer consumer = new KafkaConsumer<>(properties);
// Subscribe to the insurance_policies topic
consumer.subscribe(Collections.singletonList(topic));
// Poll for new messages
try {
while (true) {
var records = consumer.poll(1000); // Poll every 1 second
for (ConsumerRecord record : records) {
// Process the insurance policy message
System.out.println("Received insurance policy: " + record.value());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}
Explanation:
- KafkaConsumer: This class reads messages from Kafka.
- ConsumerConfig: Configuration properties for the consumer.
- The consumer subscribes to the insurance_policies topic and continuously polls for new messages.
- When a new message is received, it prints out the policy data.
To run the consumer:
- Save the code in a file called InsuranceConsumer.java.
- Compile and run the code with the following commands:
javac -cp "path/to/kafka/libs/*" InsuranceConsumer.java
java -cp ".;path/to/kafka/libs/*" InsuranceConsumer
The consumer will keep listening for messages and process them as they come in.
Step 4: Run the Example
- Start Zookeeper (if not already running):
bin/zookeeper-server-start.sh config/zookeeper.properties
- Start Kafka (if not already running):
bin/kafka-server-start.sh config/server.properties
- Run the producer to send the insurance policy data:
java -cp ".;path/to/kafka/libs/*" InsuranceProducer
- Run the consumer to process the incoming insurance policy data:
java -cp ".;path/to/kafka/libs/*" InsuranceConsumer
Example Output
- Producer Output:
Sent insurance policy: {"policyId": "P12345", "customerName": "John Doe", "policyType": "Health Insurance", "coverageAmount": 50000, "premiumAmount": 1200}
- Consumer Output:
Received insurance policy: {"policyId": "P12345", "customerName": "John Doe", "policyType": "Health Insurance", "coverageAmount": 50000, "premiumAmount": 1200}
Conclusion
This simple example demonstrates how you can use Kafka to implement an insurance system where:
- The producer sends insurance policy data (e.g., policy ID, customer details, coverage amount).
- The consumer processes these policy details by consuming messages from Kafka.
This can be expanded to handle more complex scenarios, such as multiple producers, consumers, data transformations, and fault-tolerant configurations.
Recent Comments