By Joshua Matos

Apache Kafka, Data Streaming, Java, KafkaTemplate, Publish-Subscribe Model, Real-time Applications, Spring Boot, Spring Kafka

Apache Kafka is becoming an increasingly popular platform for building real-time, scalable data streaming applications. This post will take a high-level look at Kafka and show you how to use it with Spring Boot.

What is Apache Kafka?
Apache Kafka is an open-source distributed event streaming platform.

Here are some of its key capabilities and features:

  • Kafka provides high throughput and low latency messaging with built-in replication and fault tolerance.
  • It uses a publish-subscribe model where applications publish message streams as "topics" to which other applications can subscribe.
  • Messages are persisted and replicated within the Kafka cluster for reliability.
  • Kafka scales by adding brokers to a cluster. A single cluster can handle trillions of messages per day.
  • It supports stream processing of real-time data to enable capabilities like analytics and monitoring.
  • Kafka integrates with big data technologies like Hadoop, Spark, and more.
  • It has connector APIs to enable streaming data integration with databases, cloud services, and legacy systems.

Kafka Key Concepts
Some key concepts in Kafka include:

  • Topics: Named streams of messages producers publish and consumers subscribe to.
  • Brokers:  Kafka cluster nodes that manage message streams and operations.
  • Producers: Applications that publish messages on Kafka topics.
  • Consumers:Applications that subscribe to topics and process messages.
  • Consumer Groups: Groups of consumers jointly consume a topic, allowing parallel processing.


Spring Boot + Kafka

Spring Boot is a popular framework for building Java applications. It provides auto-configuration that simplifies integrating with Kafka:

  • The Spring Kafka library auto-configures producers and consumers, manages connections to brokers, serializes objects into messages, etc.
  • Annotations like @KafkaListener simplify creating consumers.
  • The KafkaTemplate class provides convenient methods for producers to send messages.
  • Spring Boot configures Kafka connection details, topics, offsets, etc.


Hands-on Example
Let's walk through a simple example to see Kafka in action with Spring Boot:

1. Set up a Kafka cluster: We'll use Docker Compose to quickly spin up a single broker cluster with a Control Center GUI for monitoring. Make sure you run docker compose up -d to start the containers.


version: '1'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:latest
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: LISTENER_LOCAL://localhost:9092,LISTENER_DOCKER://kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_LOCAL:PLAINTEXT,LISTENER_DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_LOG_RETENTION_DAYS: 6
  kafka2:
    image: confluentinc/cp-kafka:latest
    ports:
      - "9093:9093"
    depends_on:
        - zookeeper
    environment:
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        KAFKA_ADVERTISED_LISTENERS: LISTENER_LOCAL://localhost:9093,LISTENER_DOCKER://kafka2:9094
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_LOCAL:PLAINTEXT,LISTENER_DOCKER:PLAINTEXT
        KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
        KAFKA_LOG_RETENTION_DAYS: 6
  control-center:
    image: confluentinc/cp-enterprise-control-center:latest
    depends_on:
      - kafka
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: kafka:9093,localhost:9092
      CONTROL_CENTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONTROL_CENTER_REPLICATION_FACTOR: 2
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_LOCAL:PLAINTEXT,LISTENER_DOCKER:PLAINTEXT

2. Create a Spring Boot app: We'll generate a web app with the Spring Kafka dependency using the Spring Initializr.

Spring Initializr Project: Download Project

3. Add a producer: Autowire a KafkaTemplate and use it to send messages to a "messages" topic.


import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message){
        kafkaTemplate.send("machineLearning", message);
    }
}

4. Add a consumer: Create a bean marked @KafkaListener subscribed to the "messages" topic to receive messages.


import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {
    
    @KafkaListener(topics = "machineLearning", groupId = "ml-group")
    public void consume(String message){
        System.out.println("+++++++++++++++++++++++++++++++++++++++++");
        System.out.println("Consumed message: " + message);
        System.out.println("+++++++++++++++++++++++++++++++++++++++++");
    }
}

5. Add a Controller: Create a bean marked @KafkaListener subscribed to the "messages" topic to receive messages.


import com.example.apachekafka.kafka.KafkaProducer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    private final KafkaProducer kafkaProducer;

    public KafkaController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    String[] randomMessageArray = {"Sentiment", "Anomaly", "Pointed", "Kafka", "ProductEngineer"};
    
    @GetMapping("/publish")
    public void sendMessage() {

        int randomMessageIndex = (int) (Math.random() * randomMessageArray.length);
        String message = randomMessageArray[randomMessageIndex];
        
        kafkaProducer.sendMessage(message);
        System.out.println("Message sent: " + message);
    }
}

Run the app: Start the cluster and app. Any messages the producer sends will show up in the Control Center and get processed by the consumer.

That's it! With just a few lines of configuration and code, we can easily integrate a Kafka streaming pipeline into a Spring Boot application. Kafka handles all the complex clustering, messaging, and scalability under the hood.

Check out the video for more information on Apache Kafka and get started with Kafka and Spring Boot!

Watch the video here:

>
255 views
Share via
Copy link