Building Kafka Producer With Spring Boot

By Sai Krishna Reddy Chityala · Jun. 26, 24 · Tutorial

In this article, I am going to show you how to build a message publisher using Apache Kafka and Spring Boot. First, let's talk about what Apache Kafka is.

Apache Kafka is an open-source, distributed streaming platform designed for real-time event processing. It provides a reliable, scalable, and fault-tolerant way to handle large volumes of data streams. Kafka allows you to publish and subscribe to data topics, making it ideal for building event-driven applications, log aggregation, and data pipelines. 

Prerequisites

  1. Apache Kafka
  2. Java
  3. Apache Maven
  4. Any IDE (Intellij or STS or Eclipse)

Project Structure


In this project, we will expose an endpoint to create a user and publish a UserCreatedEvent to a Kafka topic.

application.yml file

YAML
 
spring:
  application:
    name: message-publisher

  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

app:
  topic_name: users-topic

server:
  port: 8089


  • spring.application.name is used to define the application name.
  • bootstrap-servers specifies the hostname and port number of Kafka.

Serializers specify how Java objects are converted to bytes before they are sent to Kafka. Based on the key type, we can use StringSerializer or IntegerSerializer (for example, org.apache.kafka.common.serialization.StringSerializer).

  • key-serializer serializes the message key; records with the same key are routed to the same partition.
  • value-serializer serializes the message value before it is sent to Kafka. If we are using a custom Java class as the value, we can use JsonSerializer.

pom.xml

XML
 
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>3.3.0</version>
  <relativePath/> <!-- lookup parent from repository -->
 </parent>
 <groupId>com.lights5.com</groupId>
 <artifactId>message-publisher</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <name>message-publisher</name>
 <description>Demo project for Kafka Producer using Spring Boot</description>
 <properties>
  <java.version>17</java.version>
 </properties>
 <dependencies>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
  </dependency>

  <dependency>
   <groupId>org.projectlombok</groupId>
   <artifactId>lombok</artifactId>
   <optional>true</optional>
  </dependency>
 </dependencies>

 <build>
  <plugins>
   <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <configuration>
     <excludes>
      <exclude>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
      </exclude>
     </excludes>
    </configuration>
   </plugin>
  </plugins>
 </build>

</project>


Spring Web and Spring Kafka are the required dependencies; Lombok is optional.

AppConfig class

Java
 
package com.lights5.com.message.publisher;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "app")
public class AppConfig {

    private String topicName;
}


This class binds configuration values from the application.yml file to the corresponding fields; Spring's relaxed binding maps the topic_name property to the topicName field.

Application class

Java
 
package com.lights5.com.message.publisher;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;

@SpringBootApplication
@RequiredArgsConstructor
public class Application {

 private final AppConfig appConfig;

 public static void main(String[] args) {
  SpringApplication.run(Application.class, args);
 }

 @Bean
 NewTopic usersTopic() {

  return TopicBuilder.name(appConfig.getTopicName())
    .partitions(3)
    .replicas(2)
    .build();
 }
}


The NewTopic bean creates the topic if it doesn't already exist on the Kafka broker. We can configure the number of partitions and replicas as needed.
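Beyond partitions and replicas, TopicBuilder can also set topic-level configs. As a sketch, the usersTopic bean above could be extended to set a retention period (the seven-day value here is an illustrative assumption, not part of the original project):

Java
 
import org.apache.kafka.common.config.TopicConfig;

 @Bean
 NewTopic usersTopic() {

  return TopicBuilder.name(appConfig.getTopicName())
    .partitions(3)
    .replicas(2)
    // topic-level config: retain records for 7 days (value in milliseconds)
    .config(TopicConfig.RETENTION_MS_CONFIG, "604800000")
    .build();
 }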

Model Classes

User class

Java
 
package com.lights5.com.message.publisher;

import java.time.LocalDateTime;

record User (
        String firstName,
        String lastName,
        String email,
        Long phoneNumber,
        Address address,
        LocalDateTime createdAt) {

    record Address (
            String city,
            String country,
            String zipcode) {

    }
}


EventType enum

Java
 
package com.lights5.com.message.publisher;

enum EventType {

    USER_CREATED_EVENT;
}


EventPayload class

Java
 
package com.lights5.com.message.publisher;

record EventPayload (
        EventType eventType,
        String payload) {

}


Endpoint to Create User (UserController class)

Java
 
package com.lights5.com.message.publisher;

import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;

import static com.lights5.com.message.publisher.EventType.USER_CREATED_EVENT;

@RestController
@RequiredArgsConstructor
@RequestMapping("/v1/users")
class UsersController {

    private final UsersService usersService;

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public void createUser(@RequestBody User user) {

        usersService.publishMessage(user, USER_CREATED_EVENT);
    }
}


UsersController class exposes the POST method to create a user, which in turn calls a method in the UsersService class. 

UsersService class

Java
 
package com.lights5.com.message.publisher;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
class UsersService {

    private final AppConfig appConfig;
    private final ObjectMapper objectMapper;
    private final KafkaTemplate<String, EventPayload> kafkaTemplate;

    public void publishMessage(User user, EventType eventType) {

        try {

            var userCreatedEventPayload = objectMapper.writeValueAsString(user);
            var eventPayload = new EventPayload(eventType, userCreatedEventPayload);
            kafkaTemplate.send(appConfig.getTopicName(), eventPayload);
        }
        catch (JsonProcessingException ex) {

            log.error("Exception occurred in processing JSON {}", ex.getMessage());
        }
    }
}


KafkaTemplate is used to send messages to Kafka. Spring Boot auto-configures a KafkaTemplate and injects it into the classes that need it.

KafkaTemplate<K, V> is generic in the key type K and the value type V. In our case, the key is a String and the value is an EventPayload (a custom Java class), so we use StringSerializer for keys and JsonSerializer for values.

The kafkaTemplate.send() method takes the topic name as its first argument and the data to be published as its second.
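Since the project is on Spring Boot 3.x, send() returns a CompletableFuture<SendResult<K, V>>, so we can also pass an explicit message key and react to the outcome of the send. Below is a minimal sketch of such a variant; the method name and the choice of the user's email as the key are assumptions for illustration, not part of the original service:

Java
 
    public void publishMessageWithKey(User user, EventType eventType) {

        try {

            var eventPayload = new EventPayload(eventType, objectMapper.writeValueAsString(user));

            // keying by email (an illustrative choice) routes all events for
            // the same user to the same partition, preserving their order
            kafkaTemplate.send(appConfig.getTopicName(), user.email(), eventPayload)
                    .whenComplete((result, ex) -> {
                        if (ex != null) {
                            log.error("Failed to publish event", ex);
                        }
                        else {
                            var metadata = result.getRecordMetadata();
                            log.info("Published to partition {} at offset {}",
                                    metadata.partition(), metadata.offset());
                        }
                    });
        }
        catch (JsonProcessingException ex) {

            log.error("Exception occurred in processing JSON {}", ex.getMessage());
        }
    }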

Running Kafka Locally

To run this application, we first need to start Kafka locally and then start the Spring Boot application.

Please use this docker-compose file to run Kafka locally.

YAML
 
version: '2.1'

services:
  zoo1:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888


  kafka1:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 5
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    depends_on:
      - zoo1

  kafka2:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka2
    container_name: kafka2
    ports:
      - "9093:9093"
      - "29093:29093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 6
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    depends_on:
      - zoo1


  kafka3:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka3
    container_name: kafka3
    ports:
      - "9094:9094"
      - "29094:29094"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 7
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    depends_on:
      - zoo1


Run docker-compose up -d in the directory where the compose file is located.

This command starts ZooKeeper and a three-broker Kafka cluster locally.
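To sanity-check that all three brokers came up, you can list the services (a quick optional check, run from the same directory):

Shell
 
# all four containers (zoo1, kafka1, kafka2, kafka3) should show State "Up"
docker-compose ps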

Testing Using Postman


Endpoint: POST http://localhost:8089/v1/users

Payload

JSON
 
{
    "firstName": "John",
    "lastName": "Albert",
    "email": "johnalbert@gmail.com",
    "phoneNumber": "9999999999",
    "address": {
        "city": "NewYork",
        "country": "USA",
        "zipcode": "111111"
    },
    "createdAt": "2024-06-06T16:46:00"
}
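
If you prefer the command line to Postman, the same request can be sent with curl (the URL follows from the configuration above: port 8089 and path /v1/users):

Shell
 
curl -X POST http://localhost:8089/v1/users \
  -H "Content-Type: application/json" \
  -d '{"firstName":"John","lastName":"Albert","email":"johnalbert@gmail.com","phoneNumber":"9999999999","address":{"city":"NewYork","country":"USA","zipcode":"111111"},"createdAt":"2024-06-06T16:46:00"}'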


You can verify whether the data was published using the kafka-console-consumer command, as shown below.
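
A minimal sketch, assuming the Kafka CLI tools are on your PATH (the script is named kafka-console-consumer.sh in the plain Apache Kafka distribution and kafka-console-consumer inside the Confluent images used above):

Shell
 
# read users-topic from the beginning via the locally exposed broker
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic users-topic \
  --from-beginning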


Conclusion

Spring Boot provides easy integration with Kafka and helps us build pub/sub applications with minimal configuration. With Spring Boot and Kafka, we can develop event-driven microservices with ease.


Published at DZone with permission of Sai Krishna Reddy Chityala. See the original article here.

Opinions expressed by DZone contributors are their own.
