Como ler dados de Kafka com Python

Como ler dados de Kafka com Python
Kafka é um sistema de mensagens distribuído de código aberto para enviar a mensagem em tópicos particionados e diferentes. O streaming de dados em tempo real pode ser implementado usando Kafka para receber dados entre os aplicativos. Tem três partes principais. Estes são produtores, consumidores e tópicos. O produtor é usado para enviar uma mensagem para um tópico específico e cada mensagem é anexada com uma chave. O consumidor é usado para ler uma mensagem sobre um tópico específico do conjunto de partições. Os dados recebidos do produtor e armazenados nas partições com base em um tópico específico. Muitas bibliotecas existem em Python para criar produtor e consumidor para construir um sistema de mensagens usando Kafka. Como os dados de Kafka podem ser lidos usando Python são mostrados neste tutorial.

Pré -requisito

Você tem que instalar a biblioteca Python necessária para ler dados de Kafka. O Python3 é usado neste tutorial para escrever o script do consumidor e do produtor. Se o pacote PIP não estiver instalado antes no seu sistema operacional Linux, você precisará instalar o PIP antes de instalar a biblioteca Kafka para Python. Python3-kafka é usado neste tutorial para ler dados de Kafka. Execute o seguinte comando para instalar a biblioteca.

$ pip install python3-kafka

Lendo dados de texto simples de Kafka

Diferentes tipos de dados podem ser enviados do produtor em um tópico específico que pode ser lido pelo consumidor. Como um simples dados de texto pode ser enviado e recebido de Kafka usando produtor e consumidor é mostrado nesta parte deste tutorial.

Crie um arquivo chamado Produtor1.py com o seguinte script python. Kafkaproducedor O módulo é importado da biblioteca Kafka. A lista de corretores precisa definir no momento da inicialização do objeto do produtor para se conectar com o servidor Kafka. A porta padrão de Kafka é '9092'. O argumento do bootstrap_servers é usado para definir o nome do host com a porta. 'First_topic'é definido como um nome de tópico pelo qual a mensagem de texto será enviada do produtor. Em seguida, uma mensagem de texto simples, 'Olá de Kafka'é enviado usando enviar() método de Kafkaproducedor para o tópico, 'First_topic'.

Produtor1.PY:

# Importar kafkaproduces da biblioteca Kafka
De Kafka Import Kafkaproduce
# Defina servidor com porta
bootstrap_servers = ['localhost: 9092']
# Defina o nome do tópico onde a mensagem publicará
tópicoName = 'First_topic'
# Inicialize a variável do produtor
Produtor = Kafkaproduce (bootstrap_servers = bootstrap_servers)
# Publique texto em tópico definido
produtor.Send (tópiconame, b'hello de kafka… ')
# Mensagem de impressão
print ("mensagem enviada")

Crie um arquivo chamado Consumer1.py com o seguinte script python. Kafkaconsumer O módulo é importado da biblioteca Kafka para ler dados de Kafka. sys O módulo é usado aqui para encerrar o script. O mesmo nome de host e número da porta do produtor são usados ​​no script do consumidor para ler dados de Kafka. O nome do tópico do consumidor e do produtor deve ser o mesmo que é 'First_topic'. Em seguida, o objeto de consumo é inicializado com os três argumentos. Nome do tópico, ID do grupo e informações do servidor. para O loop é usado aqui para ler o texto enviado do produtor Kafka.

Consumer1.PY:

# Importar kafkaconsumer da biblioteca Kafka
De Kafka Import Kafkaconsumer
# Importar módulo SYS
Importar sistemas
# Defina servidor com porta
bootstrap_servers = ['localhost: 9092']
# Defina o nome do tópico de onde a mensagem receberá
tópicoName = 'First_topic'
# Inicialize a variável do consumidor
consumer = kafkaconsumer (tópiconame, group_id = 'group1', bootstrap_servers =
bootstrap_servers)
# Leia e imprima mensagem do consumidor
para MSG no consumidor:
print ("Nome do tópico =%s, mensagem =%s"%(msg.Tópico, Msg.valor))
# Encerrar o script
sys.saída()

Saída:

Execute o seguinte comando de um terminal para executar o script do produtor.

$ python3 produtor1.py

A saída a seguir será exibida após o envio da mensagem.

Execute o seguinte comando de outro terminal para executar o script do consumidor.

$ python3 Consumer1.py

A saída mostra o nome do tópico e a mensagem de texto enviada do produtor.

Lendo dados formatados JSON de Kafka

JSON Formatted Data pode ser enviado pelo produtor Kafka e lido pelo consumidor Kafka usando o json Módulo de Python. Como os dados do JSON podem ser serializados e des-serrializados antes de enviar e receber os dados usando o módulo Python-kafka é mostrado nesta parte deste tutorial.

Crie um script python nomeado Produtor2.py com o seguinte script. Outro módulo chamado JSON é importado com Kafkaproducedor módulo aqui. value_serializer argumento é usado com bootstrap_servers Argumento aqui para inicializar o objeto do produtor Kafka. Este argumento indica que os dados JSON serão codificados usando 'UTF-8'Personagem definido no momento do envio. Em seguida, os dados formatados JSON são enviados ao tópico nomeado JSONTOPO.

Produtor2.PY:

# Importar kafkaproduces da biblioteca Kafka
De Kafka Import Kafkaproduce
# Importar módulo JSON para serializar dados
importar json
# Inicialize a variável do produtor e o parâmetro definido para JSON Encode
Produtor = Kafkaproduce (bootstrap_servers =
['localhost: 9092'], value_serializer = lambda v: json.despejos (V).Encode ('UTF-8'))
# Envie dados no formato JSON
produtor.Send ('JSONTOPIC', 'Nome': 'Fahmida', 'Email': '[email protected] ')
# Mensagem de impressão
Print ("Mensagem enviada para JSONTOPICO")

Crie um script python nomeado Consumer2.py com o seguinte script. Kafkaconsumer, sys e os módulos JSON são importados neste script. Kafkaconsumer O módulo é usado para ler dados formatados JSON do Kafka. O módulo JSON é usado para decodificar os dados JSON codificados Enviar do produtor Kafka. Sys O módulo é usado para encerrar o script. value_deserializer argumento é usado com bootstrap_servers Para definir como os dados JSON serão decodificados. Próximo, para O loop é usado para imprimir todos os registros do consumidor e dados JSON recuperados de Kafka.

Consumer2.PY:

# Importar kafkaconsumer da biblioteca Kafka
De Kafka Import Kafkaconsumer
# Importar módulo SYS
Importar sistemas
# Importar módulo JSON para serializar dados
importar json
# Inicialize a variável do consumidor e defina a propriedade para decodificar JSON
Consumer = Kafkaconsumer ('JSONTOPICO', BOOTSTRAP_SERVERS = ['localhost: 9092'],
value_deserializer = lambda m: json.Cargas (m.Decode ('UTF-8')))
# Leia dados de Kafka
Para mensagem no consumidor:
Print ("Registros do consumidor: \ n")
Imprimir (mensagem)
print ("\ nreading de JSON Data \ n")
print ("Nome:", mensagem [6] ['nome']))
print ("Email:", mensagem [6] ['email']))
# Encerrar o script
sys.saída()

Saída:

Execute o seguinte comando de um terminal para executar o script do produtor.

$ python3 produtor2.py

O script imprimirá a seguinte mensagem depois de enviar os dados JSON.

Execute o seguinte comando de outro terminal para executar o script do consumidor.

$ python3 Consumer2.py

A saída a seguir aparecerá após a execução do script.

Conclusão:

Os dados podem ser enviados e recebidos em diferentes formatos de Kafka usando Python. Os dados também podem ser armazenados no banco de dados e recuperados do banco de dados usando Kafka e Python. Eu estou em casa, este tutorial ajudará o usuário do Python a começar a trabalhar com Kafka.