×
☰ See All Chapters

Kafka Consumer Spring Boot Example

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="https://maven.apache.org/POM/4.0.0" xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
     
xsi:schemaLocation="https://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <
modelVersion>4.0.0</modelVersion>
  <
parent>
     <
groupId>org.springframework.boot</groupId>
     <
artifactId>spring-boot-starter-parent</artifactId>
     <
version>2.1.6.RELEASE</version>
     <
relativePath/> <!-- lookup parent from repository, not local -->
 
</parent>
  <
groupId>com.java4coding</groupId>
  <
artifactId>KafkaConsumer</artifactId>
  <
version>0.0.1-SNAPSHOT</version>
  <
name>KafkaConsumer</name>
  <
description>Kafka Consumer Demo Project</description>
  <
properties>
     <
java.version>11</java.version>
     <
project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <
project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <
spring-boot.version>2.1.6.RELEASE</spring-boot.version>
  </
properties>
  <
dependencies>
     <
dependency>
        <
groupId>org.springframework.boot</groupId>
        <
artifactId>spring-boot-starter</artifactId>
     </
dependency>
     <
dependency>
        <
groupId>org.springframework.boot</groupId>
        <
artifactId>spring-boot-starter-web</artifactId>
     </
dependency>
     <
dependency>
        <
groupId>org.springframework.kafka</groupId>
        <
artifactId>spring-kafka</artifactId>
     </
dependency>

     <
dependency>
        <
groupId>org.springframework.boot</groupId>
        <
artifactId>spring-boot-starter-test</artifactId>
        <
scope>test</scope>
     </
dependency>
  </
dependencies>
  <
dependencyManagement>
     <
dependencies>
        <
dependency>
           <
groupId>org.springframework.boot</groupId>
           <
artifactId>spring-boot-dependencies</artifactId>
           <
version>${spring-boot.version}</version>
           <
type>pom</type>
           <
scope>import</scope>
        </
dependency>
     </
dependencies>
  </
dependencyManagement>

  <
build>
     <
plugins>
        <
plugin>
           <
groupId>org.springframework.boot</groupId>
           <
artifactId>spring-boot-maven-plugin</artifactId>
           <version>${spring-boot.version}</version>
           <
executions>
              <
execution>
                 <
id>build-info</id>
                 <
goals>
                    <
goal>build-info</goal>
                    <
goal>repackage</goal>
                 </
goals>
              </
execution>
           </
executions>
        </
plugin>
     </
plugins>
  </
build>
  <
repositories>
     <
repository>
        <
id>mdcm</id>
        <
name>mdcm</name>
        <
url>https://nexuscimgmt.sgp.dbs.com:8443/nexus/content/repositories/MDCM</url>
        <
snapshots>
           <
enabled>false</enabled>
        </
snapshots>
     </
repository>
  </
repositories>
  <
pluginRepositories>
     <
pluginRepository>
        <
id>mdcm</id>
        <
name>mdcm</name>
        <
url>https://nexuscimgmt.sgp.dbs.com:8443/nexus/content/repositories/MDCM</url>
        <
snapshots>
           <
enabled>false</enabled>
        </
snapshots>
     </
pluginRepository>
  </
pluginRepositories>

</
project>

 

application.properties

spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: test_group
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

server.port:8081

 

KafkaConsumer.java

package com.java4coding.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafKaConsumer {

   
private static final Logger LOGGER = LoggerFactory.getLogger(KafKaConsumer.class);

   
@KafkaListener(topics = "demotopic",
           groupId =
"test_group")
   public void consume(String message){
       
LOGGER.info(String.format("Message received -> %s", message));
   }
}

 

DemoConsumerApplication.java

package com.java4coding.main;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan
("com.java4coding")
public class DemoConsumerApplication {

   
public static void main(String[] args) {
       
SpringApplication.run(DemoConsumerApplication.class, args);
   }

}

 

Project Directory Structure

kafka-consumer-spring-boot-example-0
 

Test the example application

Run the previous chapter example producer application and send a message to Kafka; you will immediately see the message being consumed by this application.

kafka-consumer-spring-boot-example-1
 
kafka-consumer-spring-boot-example-2
 

All Chapters
Author