Spring Boot: consumir objetos JSON de temas de Kafka

Apache Kafka es un sistema de mensajería de publicación y suscripción. Un sistema de mensajería permite que alguien envíe mensajes entre procesos, aplicaciones y servidores. En términos generales, Apache Kafka es un software donde los temas (un tema puede ser una categoría) se pueden definir y procesar. Las aplicaciones pueden conectarse a este sistema y transferir un mensaje al tema. Un mensaje puede incluir cualquier tipo de información, de cualquier evento en su blog personal o puede ser un mensaje de texto muy simple que desenstringría cualquier otro evento. Lea más sobre Kafka aquí . En este artículo, Spring Boot Kafka Consumer Example , analizamos cómo podemos consumir mensajes de temas de Kafka con Spring Boot. Pero en un programa complejo, necesitamos consumir objetos  JSON de temas de Kafka.

Requisito previo : asegúrese de haber instalado Apache Kafka en su máquina local. Consulte este artículo ¿Cómo instalar y ejecutar Apache Kafka en Windows?

Implementación:

Paso 1: vaya a este enlace https://start.spring.io/ y cree un proyecto Spring Boot. Agregue la dependencia » Spring for Apache Kafka » a su proyecto Spring Boot. 

Paso 2: Cree una clase POJO simple llamada Libro dentro del paquete Modelo. A continuación se muestra el código para el archivo Book.java .

Java

// Java Program to Illustrate Book Class
  
package com.amiya.kafka.apachekafkaconsumer.Model;
  
// Class
public class Book {
  
    // Class data members
    private String bookName;
    private String isbn;
  
    // Constructor 1
    public Book() {}
  
    // Constructor 2
    public Book(String bookName, String isbn)
    {
        // This keyword refers to
        // current instance itself
        this.bookName = bookName;
        this.isbn = isbn;
    }
  
    // Setter
    public String getBookName() { return bookName; }
  
    // Setter
    public void setBookName(String bookName)
    {
        this.bookName = bookName;
    }
  
    // Setter
    public String getIsbn() { return isbn; }
  
    // Setter
    public void setIsbn(String isbn) { this.isbn = isbn; }
}

Paso 3: Cree un archivo de configuración llamado KafkaConfig . A continuación se muestra el código del archivo KafkaConfig.java . Se agregan comentarios dentro del código para comprender el código con más detalle.

Ejemplo

Java

// Java Program to Illustrate Configuration Class
  
package com.amiya.kafka.apachekafkaconsumer.config;
  
// Importing required classes
import com.amiya.kafka.apachekafkaconsumer.Model.Book;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
  
// Annotation
@EnableKafka
@Configuration
  
// Class
public class KafkaConfig {
  
    @Bean
    public ConsumerFactory<String, Book> consumerFactory()
    {
  
        // Creating a map of string-object type
        Map<String, Object> config = new HashMap<>();
  
        // Adding the Configuration
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                   "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG,
                   "group_id");
        config.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        config.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            JsonDeserializer.class);
  
        // Returning message in JSON format
        return new DefaultKafkaConsumerFactory<>(
            config, new StringDeserializer(),
            new JsonDeserializer<>(Book.class));
    }
  
    // Creating a Listener
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,
                                                   Book>
    bookListener()
    {
        ConcurrentKafkaListenerContainerFactory<
            String, Book> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
                                                       
        return factory;
    }
}

Paso 4: cree un archivo de consumidor llamado KafkaConsumer .

Archivo: KafkaConsumer.java 

Java

// Java Program to Illustrate kafka Consumer Class
  
package com.amiya.kafka.apachekafkaconsumer.consumer;
  
// Importing required classes
import com.amiya.kafka.apachekafkaconsumer.Model.Book;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
  
// Annotation
@Component
  
// Class
public class KafkaConsumer {
  
    @KafkaListener(topics = "NewTopic",
                   groupId = "group_id",
                   containerFactory = "bookListener")
  
    // Method
    public void
    consume(Book book)
    {
        // Print statement
        System.out.println("message = " + book);
    }
}

Paso 5: ahora tenemos que hacer lo siguiente para consumir mensajes de temas de Kafka con Spring Boot

  1. Ejecute el servidor Apache Zookeeper
  2. Ejecute el servidor Apache Kafka
  3. Enviar el objeto JSON desde Kafka Topics

Ejecute su servidor Apache Zookeeper usando este comando:

C:\kafka>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

De manera similar, ejecute su servidor Apache Kafka usando este comando

C:\kafka>.\bin\windows\kafka-server-start.bat .\config\server.properties

Ejecute el siguiente comando para enviar el objeto JSON desde Kafka Topics

C:\kafka>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic NewTopic

Paso 6: Ahora ejecute su aplicación Spring Boot. Asegúrese de haber cambiado el número de puerto en el archivo application.properties .

server.port=8081

Ejecutemos la aplicación de arranque Spring dentro del archivo ApacheKafkaConsumerApplication.

Producción:

En el resultado, se puede ver cuando envía el objeto JSON desde Kafka Topics, se muestra en la consola en tiempo real. 

Output

Publicación traducida automáticamente

Artículo escrito por AmiyaRanjanRout y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *