En este artículo, vamos a crear un programa para producir algunos datos utilizando Kafka Producer que consumirá Kafka Consumer y guardará en la base de datos de búsqueda elástica y, más adelante, trazará esos datos JSON en el tablero de Grafana. Comience con la configuración de todo el software y la herramienta necesarios.
Requisitos
- Java 8
- Arranque de primavera 2.7.0
- Kafka 3.2.0
- Búsqueda elástica DB 7.17.3
Nota: ¿Por qué mencionamos la versión?
La compatibilidad de versiones es de suma importancia en el caso de Elastic y Spring Boot. Si su versión de búsqueda elástica no coincide con la versión Spring Boot o viceversa, entonces tendrá problemas para configurar ambas. A continuación se muestra la lista de compatibilidad de versiones:
Búsqueda elástica de datos de primavera |
Búsqueda elástica |
Marco de primavera |
Bota de primavera |
---|---|---|---|
4.4.X |
7.17.3 |
5.3.X |
2.7.X |
4.3.X |
7.15.2 |
5.3.X |
2.6.X |
4.2.X |
7.12.0 |
5.3.X |
2.5.X |
4.1.X |
7.9.3 |
5.3.2 |
2.4.X |
4.0.X |
7.6.2 |
5.2.12 |
2.3.X |
3.2.X |
6.8.12 |
5.2.12 |
2.2.X |
3.1.X |
6.2.2 |
5.1.19 |
2.1.X |
3.0.X |
5.5.0 |
5.0.13 |
2.0.X |
2.1.X |
2.4.0 |
4.3.25 |
1.5.X |
Descargar enlaces:
- Búsqueda elástica: haga clic aquí
- Kafka – clic aquí
Descargar y extraer de su sistema.
Crear aplicación de productor de Kafka
Cree un proyecto de arranque de primavera llamado Producer .
ProducerApplication.class
Java
package com.example.demo; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class ProducerApplication { public static void main(String[] args) { SpringApplication.run(ProducerApplication.class, args); } }
KafkaProducerConfig.java
En esta clase, hemos proporcionado la configuración de Kafka.
Java
package com.example.demo.config; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.serializer.JsonSerializer; import com.example.demo.model.User; @Configuration public class KafkaProducerConfig { @Bean public ProducerFactory<String, User> userProducerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, User> userKafkaTemplate() { return new KafkaTemplate<>(userProducerFactory()); } }
Usuario.java
Clase de modelo donde almacenamos la información del usuario.
Java
package com.example.demo.model; public class User { int id; String name; String pdate; public User() { super(); } public User(int id, String name, String pdate) { super(); this.id = id; this.name = name; this.pdate = pdate; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getPdate() { return pdate; } public void setPdate(String pdate) { this.pdate = pdate; } }
KafkaService.java
La clase de servicio usa la plantilla kafka para enviar los datos al consumidor.
Java
package com.example.demo.service; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import com.example.demo.model.User; @Service public class KafkaService { private final Logger LOG = LoggerFactory.getLogger(KafkaService.class); @Autowired private KafkaTemplate<String, User> kafkaTemplate; String kafkaTopic = "gfg"; public void send(User user) { LOG.info("Sending User Json Serializer : {}", user); kafkaTemplate.send(kafkaTopic, user); } public void sendList(List<User> userList) { LOG.info("Sending UserList Json Serializer : {}", userList); for (User user : userList) { kafkaTemplate.send(kafkaTopic, user); } } }
ProducerController.java
Java
package com.example.demo.controller; import java.util.List; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; import com.example.demo.model.User; import com.example.demo.service.KafkaService; @RestController public class ProducerController { @Autowired KafkaService kafkaProducer; @PostMapping("/producer") public String sendMessage(@RequestBody User user) { kafkaProducer.send(user); return "Message sent successfully to the Kafka topic shubham"; } @PostMapping("/producerlist") public String sendMessage(@RequestBody List<User> user) { kafkaProducer.sendList(user); return "Message sent successfully to the Kafka topic shubham"; } }
pom.xml
XML
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/ maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.0</version> <relativePath/> </parent> <groupId>com.example</groupId> <artifactId>Producer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>Producer</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
aplicación.propiedades
servidor.puerto=1234
Crear Kafka Consumer y configurar la aplicación ElastisSearch
Cree otra aplicación Spring Boot llamada ElasticConsumer.
ComsumerApplication.java
Java
package com.example.demo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import com.example.demo.model.User; import com.example.demo.service.KafkaUserService; @SpringBootApplication @RestController public class ConsumerApplication { @Autowired KafkaUserService kafkaUserService; public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } @KafkaListener(topics = "gfg", groupId = "gfg-group") public void listen(User user) { System.out.println("Received User information : " + user.toString()); try { kafkaUserService.saveUser(user); } catch (Exception e) { e.printStackTrace(); } } @GetMapping("/getElasticUserFromKafka") public Iterable<User> findAllUser() { return kafkaUserService.findAllUsers(); } }
KafkaConsumerConfig.java
Java
package com.example.demo.config; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.support.serializer.JsonDeserializer; import com.example.demo.model.User; @EnableKafka @Configuration public class kafkaConsumerConfig { @Bean public ConsumerFactory<String, User> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "gfg-group"); return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(User.class)); } @Bean public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
Usuario.java
@Documento : especifica nuestro índice.
@Id – representa el campo _id de nuestro documento y es único para cada mensaje.
@Field : representa un tipo diferente de campo que podría estar en nuestros datos.
Java
package com.example.demo.model; import java.util.Date; import org.springframework.data.annotation.Id; import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.elasticsearch.annotations.Field; import org.springframework.data.elasticsearch.annotations.FieldType; import com.google.gson.Gson; @Document(indexName = "kafkauser") public class User { @Id int id; @Field(type = FieldType.Text, name = "name") String name; @Field(type = FieldType.Date, name = "pdate") Date pdate; public User() { super(); } public User(int id, String name, Date pdate) { super(); this.id = id; this.name = name; this.pdate = pdate; } public Date getPdate() { return pdate; } public void setPdate(Date pdate) { this.pdate = pdate; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return new Gson().toJson(this); } }
KafkaUserRepository.java
Java
package com.example.demo.repository; import org.springframework.data.elasticsearch.repository.ElasticsearchRepository; import org.springframework.stereotype.Repository; import com.example.demo.model.User; @Repository public interface KafkaUserRepository extends ElasticsearchRepository<User,String> { }
KafkaUserService.java
Java
package com.example.demo.service; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.example.demo.model.User; import com.example.demo.repository.KafkaUserRepository; @Service public class KafkaUserService { @Autowired private KafkaUserRepository edao; public void saveUser(User user) { edao.save(user); } public Iterable<User> findAllUsers() { return edao.findAll(); } }
¿Como correr?
Elastis-Buscar:
- Ejecute elastissearch.bat usando cmd ->E:\elasticsearch-7.17.3-windows-x86_64\elasticsearch-7.17.3\bin> elasticsearch.bat
- abra el navegador y escriba – http://localhost:9200
La salida se ve así:
{
«nombre»: «DESKTOP-S6UTE8M»,
»
cluster_name»: «elasticsearch», «cluster_uuid»: «VDlwyl2WQhCX7_lLwWm9Kg»,
«versión»: {
«número»: «7.17.3»,
«build_flavor»: «predeterminado»,
“build_type”: “zip”,
“build_hash”: “5ad023604c8d7416c9eb6c0eadb62b14e766caff”,
“build_date”: “2022-04-19T08:11:19.070913226Z”,
“build_snapshot”: false,
“lucene_version”: “8.11.1”,
“ minimal_wire_compatibility_version”: “6.8.0”,
“minimum_index_compatibility_version”: “6.0.0-beta1”
},
“tagline”: “Ya sabes, para la búsqueda”
}Kafka:
En Kafka primero tenemos que ejecutar zookeeper y luego kafkaserver. En Windows tenemos que ejecutar el archivo .bat y en el caso de Linux tenemos que ejecutar el archivo .sh
- Abrir comando
- Navegar bajo la carpeta kafka
- E:\kafka_2.12-3.2.0\kafka_2.12-3.2.0> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
- E:\kafka_2.12-3.2.0\kafka_2.12-3.2.0> .\bin\windows\kafka-server-start.bat .\config\server.properties
pom.xml
XML
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.0</version> <relativePath /> </parent> <groupId>com.example</groupId> <artifactId>Consumer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>Consumer</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
Ejecutar la aplicación Spring de Producer y ElasticConsumer
Envía datos JSON usando cartero. Aquí, la aplicación Producer envía los datos a ElasticConsumer . Y en la consola de ElasticConsumer, los datos se imprimirían y guardarían en ElasticSearchDB en forma de JSON.
API de aplicaciones de productor:
- Enviar objeto único -> http://localhost:1234/producer
- Enviar lista de objetos -> http://localhost:1234/producerlist
API de la aplicación ElasticConsumer
- Obtener todos los registros de elastic db -> localhost:8080/getElasticUserFromKafka
Tablero de Grafana
El panel de Grafana se ejecuta en http://localhots:3000. Mire el video de configuración a continuación.
Producción
Vídeo de salida 1:
Vídeo de salida 2:
Algunas API de ElasticSearch
- Para mostrar los registros del índice -> http://localhost:9200/<index_name>/_search
- Para eliminar el índice -> http://localhost:9200/<index_name>
- Listar todos los índices -> http://localhost:9200/_cat/indices
- Mostrar esquema de índice -> http://localhost:9200/<index_name>
Publicación traducida automáticamente
Artículo escrito por shubhamp338 y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA