Introdução com Apache Kafka e Python

Introdução com Apache Kafka e Python
Nesta lição, veremos como podemos usar o Apache Kafka com Python e fazer um aplicativo de amostra usando o cliente Python para Apache Kafka.

Para concluir esta lição, você deve ter uma instalação ativa para Kafka em sua máquina. Leia Instale o Apache Kafka no Ubuntu para saber como fazer isso.

Instalando o cliente Python para Apache Kafka

Antes de começarmos a trabalhar com o Apache Kafka no programa Python, precisamos instalar o cliente Python para Apache Kafka. Isso pode ser feito usando pip (Índice de pacote Python). Aqui está um comando para conseguir isso:

PIP3 Instale Kafka-Python

Esta será uma instalação rápida no terminal:

Instalação do cliente Python Kafka usando PIP

Agora que temos uma instalação ativa para o Apache Kafka e também instalamos o cliente Python Kafka, estamos prontos para começar a codificar.

Fazendo um produtor

A primeira coisa a ter que publicar mensagens no Kafka é um aplicativo de produtor que pode enviar mensagens para tópicos em Kafka.

Observe que os produtores de Kafka são produtores de mensagens assíncronos. Isso significa que as operações realizadas enquanto uma mensagem é publicada na Kafka Topic Partition são não bloqueando. Para simplificar as coisas, escreveremos o JSON Publisher simples para esta lição.

Para começar, faça uma instância para o produtor Kafka:

De Kafka Import Kafkaproduce
importar json
importar pprint
Produtor = Kafkaproducer (
bootstrap_servers = 'localhost: 9092',
value_serializer = lambda v: json.despejos (V).Encode ('UTF-8'))

O atributo bootstrap_servers informa sobre o host e porta para o servidor kafka. O atributo value_serializer é apenas para fins de serialização de JSON dos valores JSON encontrados.

Para brincar com o produtor Kafka, vamos tentar imprimir as métricas relacionadas ao produtor e ao cluster Kafka:

Métricas = Produtor.Métricas()
pprint.pprint (métricas)

Veremos o seguinte agora:

Kafka Mterics

Agora, vamos finalmente tentar enviar alguma mensagem para a fila Kafka. Um objeto JSON simples será um bom exemplo:

produtor.send ('linuxhint', 'tópico': 'kafka')

O Linuxhint é a partição do tópico sobre a qual o objeto JSON será enviado. Quando você executa o script, você não obterá nenhuma saída, pois a mensagem é enviada para a partição do tópico. É hora de escrever um consumidor para que possamos testar nosso aplicativo.

Fazendo um consumidor

Agora, estamos prontos para fazer uma nova conexão como aplicativo de consumidor e obter as mensagens do tópico Kafka. Comece fazendo uma nova instância para o consumidor:

De Kafka Import Kafkaconsumer
De Kafka Import Topicpartition
Imprimir ('Fazendo conexão.')
consumer = kafkaconsumer (bootstrap_servers = 'localhost: 9092')

Agora, atribua um tópico a esta conexão e um possível valor de deslocamento também.

Imprimir ('atribuir tópico.')
consumidor.Atribuir ([TopicPartition ('Linuxhint', 2)])

Finalmente, estamos prontos para imprimir o MSSAGE:

Imprima ('Recebendo mensagem.')
Para mensagem no consumidor:
print ("deslocamento:" + str (mensagem [0]) + "\ t msg:" + str (mensagem))

Com isso, obteremos uma lista de todas as mensagens publicadas sobre a partição de tópicos do consumidor Kafka. A saída para este programa será:

Consumidor de kafka

Apenas para uma referência rápida, aqui está o script completo do produtor:

De Kafka Import Kafkaproduce
importar json
importar pprint
Produtor = Kafkaproducer (
bootstrap_servers = 'localhost: 9092',
value_serializer = lambda v: json.despejos (V).Encode ('UTF-8'))
produtor.send ('linuxhint', 'tópico': 'kafka')
# métricas = produtor.Métricas()
# pprint.pprint (métricas)

E aqui está o programa completo do consumidor que usamos:

De Kafka Import Kafkaconsumer
De Kafka Import Topicpartition
Imprimir ('Fazendo conexão.')
consumer = kafkaconsumer (bootstrap_servers = 'localhost: 9092')
Imprimir ('atribuir tópico.')
consumidor.Atribuir ([TopicPartition ('Linuxhint', 2)])
Imprima ('Recebendo mensagem.')
Para mensagem no consumidor:
print ("deslocamento:" + str (mensagem [0]) + "\ t msg:" + str (mensagem))

Conclusão

Nesta lição, analisamos como podemos instalar e começar a usar o Apache Kafka em nossos programas Python. Mostramos como é fácil executar tarefas simples relacionadas a Kafka em Python com o cliente Kafka demonstrado para Python.