Apache Kafka es un sistema de mensajería de publicación y suscripción. Un sistema de mensajería permite que alguien envíe mensajes entre procesos, aplicaciones y servidores. En términos generales, Apache Kafka es un software donde los temas (un tema puede ser una categoría) se pueden definir y procesar. Las aplicaciones pueden conectarse a este sistema y transferir un mensaje al tema. Un mensaje puede incluir cualquier tipo de información, de cualquier evento en su blog personal o puede ser un mensaje de texto muy simple que desenstringría cualquier otro evento. Lea más sobre Kafka aquí . En este artículo, Spring Boot Kafka Consumer Example , analizamos cómo podemos consumir mensajes de temas de Kafka con Spring Boot. Pero en un programa complejo, necesitamos consumir objetos JSON de temas de Kafka.
Requisito previo : asegúrese de haber instalado Apache Kafka en su máquina local. Consulte este artículo ¿Cómo instalar y ejecutar Apache Kafka en Windows?
Implementación:
Paso 1: vaya a este enlace https://start.spring.io/ y cree un proyecto Spring Boot. Agregue la dependencia » Spring for Apache Kafka » a su proyecto Spring Boot.
Paso 2: Cree una clase POJO simple llamada Libro dentro del paquete Modelo. A continuación se muestra el código para el archivo Book.java .
Java
// Java Program to Illustrate Book Class package com.amiya.kafka.apachekafkaconsumer.Model; // Class public class Book { // Class data members private String bookName; private String isbn; // Constructor 1 public Book() {} // Constructor 2 public Book(String bookName, String isbn) { // This keyword refers to // current instance itself this.bookName = bookName; this.isbn = isbn; } // Setter public String getBookName() { return bookName; } // Setter public void setBookName(String bookName) { this.bookName = bookName; } // Setter public String getIsbn() { return isbn; } // Setter public void setIsbn(String isbn) { this.isbn = isbn; } }
Paso 3: Cree un archivo de configuración llamado KafkaConfig . A continuación se muestra el código del archivo KafkaConfig.java . Se agregan comentarios dentro del código para comprender el código con más detalle.
Ejemplo
Java
// Java Program to Illustrate Configuration Class package com.amiya.kafka.apachekafkaconsumer.config; // Importing required classes import com.amiya.kafka.apachekafkaconsumer.Model.Book; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.support.serializer.JsonDeserializer; // Annotation @EnableKafka @Configuration // Class public class KafkaConfig { @Bean public ConsumerFactory<String, Book> consumerFactory() { // Creating a map of string-object type Map<String, Object> config = new HashMap<>(); // Adding the Configuration config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id"); config.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); // Returning message in JSON format return new DefaultKafkaConsumerFactory<>( config, new StringDeserializer(), new JsonDeserializer<>(Book.class)); } // Creating a Listener @Bean public ConcurrentKafkaListenerContainerFactory<String, Book> bookListener() { ConcurrentKafkaListenerContainerFactory< String, Book> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
Paso 4: cree un archivo de consumidor llamado KafkaConsumer .
Archivo: KafkaConsumer.java
Java
// Java Program to Illustrate kafka Consumer Class package com.amiya.kafka.apachekafkaconsumer.consumer; // Importing required classes import com.amiya.kafka.apachekafkaconsumer.Model.Book; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; // Annotation @Component // Class public class KafkaConsumer { @KafkaListener(topics = "NewTopic", groupId = "group_id", containerFactory = "bookListener") // Method public void consume(Book book) { // Print statement System.out.println("message = " + book); } }
Paso 5: ahora tenemos que hacer lo siguiente para consumir mensajes de temas de Kafka con Spring Boot
- Ejecute el servidor Apache Zookeeper
- Ejecute el servidor Apache Kafka
- Enviar el objeto JSON desde Kafka Topics
Ejecute su servidor Apache Zookeeper usando este comando:
C:\kafka>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
De manera similar, ejecute su servidor Apache Kafka usando este comando
C:\kafka>.\bin\windows\kafka-server-start.bat .\config\server.properties
Ejecute el siguiente comando para enviar el objeto JSON desde Kafka Topics
C:\kafka>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic NewTopic
Paso 6: Ahora ejecute su aplicación Spring Boot. Asegúrese de haber cambiado el número de puerto en el archivo application.properties .
server.port=8081
Ejecutemos la aplicación de arranque Spring dentro del archivo ApacheKafkaConsumerApplication.
Producción:
En el resultado, se puede ver cuando envía el objeto JSON desde Kafka Topics, se muestra en la consola en tiempo real.
Publicación traducida automáticamente
Artículo escrito por AmiyaRanjanRout y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA