Bota de primavera | Cómo consumir mensajes de string usando Apache Kafka

Apache Kafka es una cola de mensajería de publicación y suscripción que se utiliza para flujos de datos en tiempo real. Una cola de mensajería le permite enviar mensajes entre procesos, aplicaciones y servidores. En este artículo veremos cómo enviar mensajes de string desde apache kafka a la consola de una aplicación de arranque de primavera. 

Acercarse: 

Paso 1: vaya a spring initializr y cree un proyecto inicial con la siguiente dependencia: 

  • Primavera para Apache Kafka

Nota: También podemos crear un proyecto maven y agregar el siguiente código al archivo  pom.xml .

Xml

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

Paso 2: Abra el proyecto en un IDE y sincronice las dependencias. Ahora cree una nueva configuración de clase y agregue las anotaciones @Configuration y @EnableKafka

Paso 3: ahora cree beans ConsumerFactory y ConcurrentKafkaListenerContainerFactory con el objeto String. 

Java

// Java program to consume string
// messages using spring kafka
 
@EnableKafka
@Configuration
public class Config {
 
    // Function to establish
    // connection between spring
    // application and kafka server
    @Bean
    public ConsumerFactory<String, String>
    consumerFactory()
    {
 
        // HashMap to store the configurations
        Map<String, Object> map = new HashMap<>();
 
        // put the host IP in the map
        map.put(ConsumerConfig
                    .BOOTSTRAP_SERVERS_CONFIG,
                "127.0.0.1:9092");
 
        // put the group ID in the map
        map.put(ConsumerConfig
                    .GROUP_ID_CONFIG,
                "id");
        map.put(ConsumerConfig
                    .KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        map.put(ConsumerConfig
                    .VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
 
        return new DefaultKafkaConsumerFactory<>(map);
    }
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,
                                                   String>
    kafkaListner()
    {
        ConcurrentKafkaListenerContainerFactory<String,
                                                String>
            obj
            = new ConcurrentKafkaListenerContainerFactory<>();
        obj.setConsumerFactory(consumerFactory());
        return obj;
    }
}

Paso 4: cree una clase KafkaService con la anotación @Service . Esta clase contendrá el método de escucha para publicar el mensaje en la consola. 

Java

@Service
public class KafkaService {
 
    // Annotation required to listen the
    // message from kafka server
    @KafkaListener(topics = "StringProducer",
                   groupId = "id")
    public void
    publish(String message)
    {
        System.out.println(
            "You have a new message: "
            + message);
    }
}

Paso 5: Inicie zookeeper y luego el servidor kafka usando el siguiente comando.

Para Windows: 
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties 
.\bin\windows\kafka-server-start.bat .\config\server.properties 
 

Para Mac y Linux: 
bin/zookeeper-server-start.sh config/zookeeper.properties 
bin/kafka-server-start.sh config/server.properties 
 

Paso 6: ahora necesitamos crear un nuevo tema con el nombre StringProducer . Para hacerlo, abra una nueva ventana del símbolo del sistema y cambie el directorio al directorio kafka. Cree un nuevo tema usando el comando dado a continuación: 

// Para Mac y Linux 
bin/kafka-topics.sh –create –zookeeper localhost:2181 –factor de replicación 1 –particiones 1 –topic nombre_tema
// Para Windows 
.\bin\windows\kafka-topics.bat –create –zookeeper localhost:2181 –factor de replicación 1 –particiones 1 –tema nombre_tema 
 

Paso 7: ahora para ejecutar la consola del productor kafka, use el siguiente comando: 

// Para Mac y Linux 
bin/kafka-console-producer.sh –broker-list localhost:9092 –topic Kafka_Example 
// Para Windows 
.\bin\windows\kafka-console-producer.bat –broker-list localhost:9092 – tema Kafka_Example 
 

Paso 7: Ejecute la aplicación y escriba el mensaje en el productor kafka y presione enter. 

Producción:  

Aquí escriba un mensaje en formato de string en el productor kafka 

>Hello
>Welcome to GeeksForGeeks

Producción:  

Publicación traducida automáticamente

Artículo escrito por AakashYadav4 y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *