Bota de primavera | Cómo consumir mensajes JSON usando Apache Kafka

Apache Kafka es un sistema de procesamiento de flujo que le permite enviar mensajes entre procesos, aplicaciones y servidores. En este artículo veremos cómo publicar mensajes JSON en la consola de una aplicación Spring boot utilizando Apache Kafka.

Para aprender a crear un proyecto Spring Boot, consulte este artículo .

Pasos de trabajo: 
 

  1. Vaya al inicializador Spring y cree un proyecto inicial con la siguiente dependencia: 
    • Primavera para Apache Kafka
  2. Abra el proyecto en un IDE y sincronice las dependencias. En este artículo, estaríamos creando un modelo de estudiante donde estaríamos publicando los detalles del estudiante. Por lo tanto, cree una clase modelo Student . Agregue miembros de datos y cree un constructor y anule el método toString para ver los mensajes en formato JSON. La siguiente es la implementación de la clase de estudiante:
     

Student Model

// Java program to implement a
// student class
 
// Creating a student class
public class Student {
 
    // Data members of the class
    int id;
    String firstName;
    String lastName;
 
    // Constructor of the student
    // class
    public Student()
    {
    }
 
    // Parameterized constructor of
    // the student class
    public Student(int id, String firstName,
                   String lastName)
    {
        this.id = id;
        this.firstName = firstName;
        this.lastName = lastName;
    }
 
    @Override
    public String toString()
    {
        return "Student{"
            + "id = " + id
            + ", firstName = '" + firstName + "'"
            + ", lastName = '" + lastName + "'"
            + "}";
    }
}
  1.  
  2. Cree una nueva configuración de clase y agregue anotaciones @Configuration y @EnableKafka . Ahora cree beans ConsumerFactory y ConcurrentKafkaListenerContainerFactory con el objeto de clase Student.
     

Config clas

@EnableKafka
@Configuration
public class Config {
 
    // Function to establish a connection
    // between Spring application
    // and Kafka server
    @Bean
    public ConsumerFactory<String, Student>
    studentConsumer()
    {
 
        // HashMap to store the configurations
        Map<String, Object> map
            = new HashMap<>();
 
        // put the host IP in the map
        map.put(ConsumerConfig
                    .BOOTSTRAP_SERVERS_CONFIG,
                "127.0.0.1:9092");
 
        // put the group ID of consumer in the map
        map.put(ConsumerConfig
                    .GROUP_ID_CONFIG,
                "id");
        map.put(ConsumerConfig
                    .KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        map.put(ConsumerConfig
                    .VALUE_DESERIALIZER_CLASS_CONFIG,
                JsonDeserializer.class);
 
        // return message in JSON formate
        return new DefaultKafkaConsumerFactory<>(
            map, new StringDeserializer(),
            new JsonDeserializer<>(Student.class));
    }
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,
                                                   Student>
    studentListner()
    {
        ConcurrentKafkaListenerContainerFactory<String,
                                                Student>
            factory
            = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(studentConsumer());
        return factory;
    }
}
  1.  
  2. Cree una clase KafkaService con la anotación @Service . Esta clase contendrá el método de escucha para publicar el mensaje en la consola.
     

KafkaService Class

@Service
public class KafkaService {
 
    // Annotation required to listen
    // the message from Kafka server
    @KafkaListener(topics = "JsonTopic",
                   groupId = "id", containerFactory
                                   = "studentListner")
    public void
    publish(Student student)
    {
        System.out.println("New Entry: "
                           + student);
    }
}
  1.  
  2. Inicie zookeeper y el servidor Kafka. Ahora necesitamos crear un nuevo tema con el nombre JsonTopic . Para hacerlo, abra una nueva ventana del símbolo del sistema y cambie el directorio al directorio de Kafka.
  3. Ahora crea un nuevo tema usando el comando dado a continuación:
     

bin/Kafka-topics.sh –create –zookeeper localhost:2181 –factor de replicación 1 –particiones 1 –topic topic_name // para mac y linux
.\bin\windows\Kafka-topics.bat –create –zookeeper localhost:2181 – factor de replicación 1 –particiones 1 –tema nombre_tema // para Windows 
 

  1.  
  2. Ahora, para ejecutar la consola de producción de Kafka, use el siguiente comando:
     

bin/Kafka-console-producer.sh –broker-list localhost:9092 –topic Kafka_Example // para mac y linux
.\bin\windows\Kafka-console-producer.bat –broker-list localhost:9092 –topic Kafka_Example // para ventanas 
 

  1.  
  2. Ejecute la aplicación y escriba el mensaje en el productor de Kafka y presione enter.

Publicación traducida automáticamente

Artículo escrito por AakashYadav4 y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *