Esta POC describe el procedimiento de alimentación de datos en formato JSON a un tema de Kafka mediante Kafka REST Proxy, que proporciona una interfaz RESTful a un clúster de Kafka.
requisitos previos:
Antes de iniciar este procedimiento, asegúrese de que:
- Acceso administrativo para ejecutar Kafka VM y esa VM debe tener conectividad como se describe en los requisitos previos de carga.
- Identifique y anote el nombre de host y el puerto de Zoo-Keeper.
- Identifique y anote el nombre de host y el puerto de los agentes de Kafka.
- Identifique y anote el nombre de host y el puerto del Kafka Rest Proxy.
Nota: Este procedimiento asume que ha instalado la distribución de Apache Kafka. Si está utilizando una distribución de Kafka diferente, es posible que deba ajustar ciertos comandos en el procedimiento.
Aquí, en este caso de uso, hemos configurado nombres de host y puertos con lo siguiente
- descanso-proxy localhost: 8082
- anfitrión local del cuidador del zoológico: 2182
- servidor de arranque localhost: 9095
Procedimiento para alimentar datos JSON a Kafka Topic:
Paso 1: Inicie sesión en un host en su Kafka VM.
$ cd kafka_2.12-2.4.0 /*if this directory does not exit, Use ls command to view the folder and copy/paste the existing folder*/
Para enumerar todos los temas que están presentes dentro de los temas de Kafka, use el siguiente cmd
$ bin/kafka-topics.sh --list --zookeeper localhost:2182 /*To check/verify and to display all the topics*/
Paso 2: crea un tema de Kafka. Aquí, cree un tema llamado «topic-test-1» con una sola partición y solo una réplica:
Por ejemplo:
$ bin/kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 --topic topic-test-1
$ bin/kafka-topics.sh --list --zookeeper localhost:2182 /*To verify or to list out the created topic*/
Paso 3: crea un archivo JSON. Cree un archivo llamado sample-json-data.json en el editor de su elección.
Por ejemplo:
$ vi sample-json-data.json
luego, pegue un texto en formato json y agréguelo a un archivo, y luego guarde el archivo y salga
Por ejemplo:
{ "first_name": "Tom", "last_name": "Cruze", "email": "cruze@gmail.com", "gender": "Male", "ip_address": "1.2.3.4" }
Paso 4: transmitir el contenido del archivo json a un productor de consola Kafka
$ bin/kafka-console-producer.sh --broker-list localhost:9095 --topic topic-test-1 < sample-json-data.json
Paso 5: para verificar que el productor de la consola Kafka publicó los mensajes en el tema mediante la ejecución de un consumidor de la consola Kafka
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9095 --topic topic-test-1 --from-beginning
Paso 6: transmitir el contenido del otro archivo JSON a un productor de consola Kafka
Por ejemplo:
$ vi sample.json
luego, pegue un texto en formato JSON y agréguelo a un archivo, y luego guarde el archivo y salga
{ «cliente_id»: 1313131, «mes»: 12, «gastos»: 1313.13 }
{ «cliente_id»: 3535353, «mes»: 11, «gastos»: 761,35 }
{ «cliente_id»: 7979797, «mes»: 10, «gastos»: 4489.00 }
{ «cliente_id»: 7979797, «mes»: 11, «gastos»: 18,72 }
{ «cliente_id»: 3535353, «mes»: 10, «gastos»: 6001.94 }
{ “cliente_id”: 7979797, “mes”: 12, “gastos”: 173,18 }
{ «cliente_id»: 1313131, «mes»: 10, «gastos»: 492,83 }
{ «cliente_id»: 3535353, «mes»: 12, «gastos»: 81,12 }
{ «cliente_id»: 1313131, «mes»: 11, «gastos»: 368,27 }
Proxy REST de Kafka:
El proxy REST de Kafka proporciona una interfaz RESTful a un clúster de Kafka. Facilita la producción y el consumo de mensajes, la visualización del estado del clúster y la realización de acciones administrativas sin utilizar el protocolo o los clientes nativos de Kafka.
Para obtener la lista de temas usando curl
$ curl "http://localhost:8082/topics"
Para obtener la información de un tema
$ curl http://localhost:8082/topics/<menction topic name>
Por ejemplo:
$ curl "http://localhost:8082/topics/topic-test-1"
Paso 1: producir un mensaje usando JSON con un valor para un tema
Por ejemplo, para producir un mensaje usando JSON con un valor ‘{ “mes”: 12}’ para el tema topic-test-1
$curl -X POST -H “Tipo de contenido: aplicación/vnd.kafka.json.v2+json” \
-H “Aceptar: aplicación/vnd.kafka.v2+json” \
–data ‘{“registros”:[{“valor”:{“mes”: 12}}]}’ “http://localhost:8082/topics/topic-test-1”
/*Salida esperada del comando anterior*/
{
“desplazamientos”:[{“partición”:0,”desplazamiento”:16,”código_error”:null,”error”:null}],”key_schema_id”:null,”value_schema_id”:null
}
Para verificar que el productor de la consola Kafka publicó los mensajes en el tema mediante la ejecución de un consumidor de la consola Kafka
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9095 --topic topic-test-1 --from-beginning
Paso 2: Cree un consumidor para datos JSON, comenzando desde el principio del tema
$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \ --data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \ http://localhost:8082/consumers/my_json_consumer /* Expected output from preceding command*/ { "instance_id":"my_consumer_instance", "base_uri":"http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance" OR "base_uri":"http://rest-proxy:8082/consumers/my_json_consumer/instances/my_consumer_instance" }
Paso 3: Inicie sesión y suscríbase a un tema.
$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["topic-test-1"]}' \ http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription /* Expected output from preceding command*/ # No content in response
Paso 4: Para consumir algunos datos usando la URL base en la primera respuesta.
$ curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \ http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
Pasos opcionales:
Paso 1: Finalmente, cerrar el consumidor con un DELETE para que abandone el grupo y limpie sus recursos.
$ curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \ http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance /* Expected output from preceding command*/ # No content in response
Paso 2: verifique la instancia del consumidor usando el siguiente comando
$ curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \ http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records /* Expected output from preceding command*/ { “error_code”: 40403, “message”: “Consumer instance not found.” }
Publicación traducida automáticamente
Artículo escrito por dey0btpch57lmvgz5mqhpaiqn337p09fd8yq1lw4 y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA