Spring Boot: consumir mensajes a través de Kafka, guardar en ElasticSearch y trazar en Grafana

En este artículo, vamos a crear un programa para producir algunos datos utilizando Kafka Producer que consumirá Kafka Consumer y guardará en la base de datos de búsqueda elástica y, más adelante, trazará esos datos JSON en el tablero de Grafana. Comience con la configuración de todo el software y la herramienta necesarios.

Requisitos

  • Java 8
  • Arranque de primavera 2.7.0
  • Kafka 3.2.0
  • Búsqueda elástica DB 7.17.3

Nota: ¿Por qué mencionamos la versión?

La compatibilidad de versiones es de suma importancia en el caso de Elastic y Spring Boot. Si su versión de búsqueda elástica no coincide con la versión Spring Boot o viceversa, entonces tendrá problemas para configurar ambas. A continuación se muestra la lista de compatibilidad de versiones: 

Búsqueda elástica de datos de primavera

Búsqueda elástica

Marco de primavera

Bota de primavera

4.4.X

7.17.3

5.3.X

2.7.X

4.3.X

7.15.2

5.3.X

2.6.X

4.2.X

7.12.0

5.3.X

2.5.X

4.1.X

7.9.3

5.3.2

2.4.X

4.0.X

7.6.2

5.2.12

2.3.X

3.2.X

6.8.12

5.2.12

2.2.X

3.1.X

6.2.2

5.1.19

2.1.X

3.0.X

5.5.0

5.0.13

2.0.X

2.1.X

2.4.0

4.3.25

1.5.X

Descargar enlaces: 

Descargar y extraer de su sistema.

Crear aplicación de productor de Kafka

Cree un proyecto de arranque de primavera llamado Producer .

ProducerApplication.class

Java

package com.example.demo;
 
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
 
@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
   
}

KafkaProducerConfig.java

En esta clase, hemos proporcionado la configuración de Kafka. 

Java

package com.example.demo.config;
 
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import com.example.demo.model.User;
@Configuration
public class KafkaProducerConfig {
   
    @Bean
    public ProducerFactory<String, User> userProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                        StringSerializer.class.getName());
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                        JsonSerializer.class.getName());
        return new DefaultKafkaProducerFactory<>(configProps);
    }
   
    @Bean
    public KafkaTemplate<String, User> userKafkaTemplate() {
        return new KafkaTemplate<>(userProducerFactory());
    }
   
}

Usuario.java

Clase de modelo donde almacenamos la información del usuario.

Java

package com.example.demo.model;
 
public class User {
    int id;
    String name;
    String pdate;
    public User() {
        super();
    }
    public User(int id, String name, String pdate) {
        super();
        this.id = id;
        this.name = name;
        this.pdate = pdate;
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getPdate() {
        return pdate;
    }
    public void setPdate(String pdate) {
        this.pdate = pdate;
    }
}

KafkaService.java

La clase de servicio usa la plantilla kafka para enviar los datos al consumidor. 

Java

package com.example.demo.service;
 
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import com.example.demo.model.User;
 
@Service
public class KafkaService {
    private final Logger LOG = LoggerFactory.getLogger(KafkaService.class);
    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;
    String kafkaTopic = "gfg";
    public void send(User user) {
        LOG.info("Sending User Json Serializer : {}", user);
        kafkaTemplate.send(kafkaTopic, user);
    }
    public void sendList(List<User> userList) {
        LOG.info("Sending UserList Json Serializer : {}", userList);
        for (User user : userList) {
            kafkaTemplate.send(kafkaTopic, user);
        }
    }
}

ProducerController.java

Java

package com.example.demo.controller;
 
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
 
import com.example.demo.model.User;
import com.example.demo.service.KafkaService;
 
@RestController
public class ProducerController {
    @Autowired
    KafkaService kafkaProducer;
   
    @PostMapping("/producer")
    public String sendMessage(@RequestBody User user) {
        kafkaProducer.send(user);
        return "Message sent successfully to the Kafka topic shubham";
    }
   
    @PostMapping("/producerlist")
    public String sendMessage(@RequestBody List<User> user) {
        kafkaProducer.sendList(user);
        return "Message sent successfully to the Kafka topic shubham";
    }
}

pom.xml

XML

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                        https://maven.apache.org/xsd/
                        maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.0</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>Producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>Producer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
 
</project>

aplicación.propiedades

servidor.puerto=1234

Crear Kafka Consumer y configurar la aplicación ElastisSearch

Cree otra aplicación Spring Boot llamada ElasticConsumer.

ComsumerApplication.java

Java

package com.example.demo;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.example.demo.model.User;
import com.example.demo.service.KafkaUserService;
 
@SpringBootApplication
@RestController
public class ConsumerApplication {
    @Autowired
    KafkaUserService kafkaUserService;
   
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
   
    @KafkaListener(topics = "gfg", groupId = "gfg-group")
    public void listen(User user) {
        System.out.println("Received User information : " + user.toString());
        try {
            kafkaUserService.saveUser(user);
        } catch (Exception e) {
            e.printStackTrace();   
        }
    }
   
    @GetMapping("/getElasticUserFromKafka")
    public Iterable<User> findAllUser() {
        return kafkaUserService.findAllUsers();
    }
}

KafkaConsumerConfig.java

Java

package com.example.demo.config;
 
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import com.example.demo.model.User;
 
@EnableKafka
@Configuration
public class kafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, User> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "gfg-group");
        return new DefaultKafkaConsumerFactory<>(props,
            new StringDeserializer(), new JsonDeserializer<>(User.class));
    }
   
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, User>
      kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, User> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
   
}

Usuario.java

@Documento : especifica nuestro índice.
@Id – representa el campo _id de nuestro documento y es único para cada mensaje.
@Field : representa un tipo diferente de campo que podría estar en nuestros datos.

Java

package com.example.demo.model;
 
import java.util.Date;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import com.google.gson.Gson;
 
@Document(indexName = "kafkauser")
public class User {
    @Id
    int id;
    @Field(type = FieldType.Text, name = "name")
    String name;
    @Field(type = FieldType.Date, name = "pdate")
    Date pdate;
    public User() {
        super();
    }
    public User(int id, String name, Date pdate) {
        super();
        this.id = id;
        this.name = name;
        this.pdate = pdate;
    }
    public Date getPdate() {
        return pdate;
    }
    public void setPdate(Date pdate) {
        this.pdate = pdate;
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    @Override
    public String toString() {
        return new Gson().toJson(this);
    }
}

KafkaUserRepository.java

Java

package com.example.demo.repository;
 
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;
import com.example.demo.model.User;
 
@Repository
public interface KafkaUserRepository extends ElasticsearchRepository<User,String> {
   
}

KafkaUserService.java

Java

package com.example.demo.service;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.example.demo.model.User;
import com.example.demo.repository.KafkaUserRepository;
 
@Service
public class KafkaUserService {
    @Autowired
    private KafkaUserRepository edao;
   
    public void saveUser(User user) {
        edao.save(user);
    }
   
    public Iterable<User> findAllUsers() {
    return edao.findAll();
    }   
}

¿Como correr?

Elastis-Buscar:

  • Ejecute elastissearch.bat usando cmd ->E:\elasticsearch-7.17.3-windows-x86_64\elasticsearch-7.17.3\bin> elasticsearch.bat
  • abra el navegador y escriba – http://localhost:9200

La salida se ve así:

{
«nombre»: «DESKTOP-S6UTE8M»,
»
cluster_name»: «elasticsearch», «cluster_uuid»: «VDlwyl2WQhCX7_lLwWm9Kg»,
«versión»: {
«número»: «7.17.3»,
«build_flavor»: «predeterminado»,
“build_type”: “zip”,
“build_hash”: “5ad023604c8d7416c9eb6c0eadb62b14e766caff”,
“build_date”: “2022-04-19T08:11:19.070913226Z”,
“build_snapshot”: false,
“lucene_version”: “8.11.1”,
“ minimal_wire_compatibility_version”: “6.8.0”,
“minimum_index_compatibility_version”: “6.0.0-beta1”
},
“tagline”: “Ya sabes, para la búsqueda”
}

Kafka: 

En Kafka primero tenemos que ejecutar zookeeper y luego kafkaserver. En Windows tenemos que ejecutar el archivo .bat y en el caso de Linux tenemos que ejecutar el archivo .sh 

  • Abrir comando 
  • Navegar bajo la carpeta kafka
  • E:\kafka_2.12-3.2.0\kafka_2.12-3.2.0> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
  • E:\kafka_2.12-3.2.0\kafka_2.12-3.2.0> .\bin\windows\kafka-server-start.bat .\config\server.properties

pom.xml

XML

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                        https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.0</version>
        <relativePath />
    </parent>
    <groupId>com.example</groupId>
    <artifactId>Consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>Consumer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
 
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
         
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Ejecutar la aplicación Spring de Producer y ElasticConsumer

Envía datos JSON usando cartero. Aquí, la aplicación Producer envía los datos a ElasticConsumer . Y en la consola de ElasticConsumer, los datos se imprimirían y guardarían en ElasticSearchDB en forma de JSON.

API de aplicaciones de productor:

  • Enviar objeto único -> http://localhost:1234/producer
  • Enviar lista de objetos -> http://localhost:1234/producerlist

 

API de la aplicación ElasticConsumer

  • Obtener todos los registros de elastic db -> localhost:8080/getElasticUserFromKafka

Tablero de Grafana

El panel de Grafana se ejecuta en http://localhots:3000. Mire el video de configuración a continuación.

Producción

Vídeo de salida 1:

Vídeo de salida 2:

Algunas API de ElasticSearch

  • Para mostrar los registros del índice -> http://localhost:9200/<index_name>/_search
  • Para eliminar el índice -> http://localhost:9200/<index_name>
  • Listar todos los índices -> http://localhost:9200/_cat/indices
  • Mostrar esquema de índice -> http://localhost:9200/<index_name>

Publicación traducida automáticamente

Artículo escrito por shubhamp338 y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *