Hvordan lese data fra Kafka med Python

Hvordan lese data fra Kafka med Python
Kafka er et åpen kildekode distribuert meldingssystem for å sende meldingen i partisjonerte og forskjellige emner. Data streaming i sanntid kan implementeres ved å bruke Kafka for å motta data mellom applikasjonene. Den har tre hoveddeler. Disse er produsent, forbruker og emner. Produsenten brukes til å sende en melding til et bestemt emne, og hver melding er vedlagt med en nøkkel. Forbrukeren brukes til å lese en melding om et bestemt tema fra settet med partisjoner. Dataene mottatt fra produsenten og lagret på partisjonene basert på et bestemt emne. Mange biblioteker finnes i Python for å opprette produsent og forbruker for å bygge et meldingssystem ved hjelp av Kafka. Hvordan dataene fra Kafka kan leses ved hjelp av Python vises i denne opplæringen.

Forutsetning

Du må installere det nødvendige Python -biblioteket for å lese data fra Kafka. Python3 brukes i denne opplæringen for å skrive manuset til forbruker og produsent. Hvis PIP -pakken ikke er installert før i Linux -operativsystemet ditt, må du installere PIP før du installerer Kafka -biblioteket for Python. Python3-Kafka brukes i denne opplæringen for å lese data fra Kafka. Kjør følgende kommando for å installere biblioteket.

$ pip installer python3-kafka

Lese enkle tekstdata fra Kafka

Ulike typer data kan sendes fra produsenten om et bestemt emne som kan leses av forbrukeren. Hvordan en enkel tekstdata kan sendes og mottas fra Kafka ved hjelp av produsent og forbruker vises i denne delen av denne opplæringen.

Lag en fil som heter Produsent1.py Med følgende Python -skript. Kafkaproducer Modulen importeres fra Kafka -biblioteket. Meglerlisten må definere på tidspunktet for initialisering av produsentobjekter for å koble til Kafka -serveren. Standardporten til Kafka er '9092'. bootstrap_server -argument brukes til å definere vertsnavnet med porten. 'FIRST_TOPIC'er satt som et emnavn som tekstmelding vil bli sendt fra produsenten. Deretter en enkel tekstmelding, 'Hei fra Kafka'blir sendt ved hjelp av sende() Metode av Kafkaproducer til emnet, 'FIRST_TOPIC'.

Produsent1.PY:

# Importer Kafkaproducer fra Kafka Library
Fra Kafka import Kafkaproducer
# Definer server med port
bootstrap_servers = ['localhost: 9092']
# Definer emnenavn der meldingen vil publisere
TopicName = 'First_topic'
# Initialiser produsentvariabel
Produsent = Kafkaproducer (bootstrap_servers = bootstrap_servers)
# Publiser tekst i definert emne
produsent.Send (TopicName, B'hello fra Kafka ... ')
# Skriv ut melding
trykk ("Melding sendt")

Lag en fil som heter forbruker1.py Med følgende Python -skript. Kafkaconsumer Modulen importeres fra Kafka -biblioteket for å lese data fra Kafka. sys Modul brukes her for å avslutte skriptet. Det samme vertsnavnet og portnummeret til produsenten brukes i skriptet til forbrukeren for å lese data fra Kafka. Emnetavnet til forbrukeren og produsenten må være det samme som er 'FIRST_TOPIC'. Deretter initialiseres forbrukerobjektet med de tre argumentene. Emnetavn, gruppe -ID og serverinformasjon. til Loop brukes her til å lese teksten Send fra Kafka -produsenten.

forbruker1.PY:

# Importer Kafkaconsumer fra Kafka Library
Fra Kafka import Kafkaconsumer
# Importer SYS -modul
Importer Sys
# Definer server med port
bootstrap_servers = ['localhost: 9092']
# Definer emnenavn hvor meldingen vil motta
TopicName = 'First_topic'
# Initialiser forbrukervariabel
Consumer = KafkaConsumer (TopicName, Group_ID = 'Group1', Bootstrap_Servers =
bootstrap_servers)
# Les og skriv ut melding fra forbruker
For MSG i forbruker:
Print ("Emnenavn =%S, Melding =%S"%(MSG.Tema, msg.verdi))
# Avslutte skriptet
sys.exit()

Produksjon:

Kjør følgende kommando fra en terminal for å utføre produsentskriptet.

$ Python3 Produsent1.py

Følgende utdata vises etter å ha sendt meldingen.

Kjør følgende kommando fra en annen terminal for å utføre forbrukerskriptet.

$ Python3 Consumer1.py

Utgangen viser emnetavnet og tekstmeldingen sendt fra produsenten.

Lese JSON formaterte data fra Kafka

JSON -formaterte data kan sendes av Kafka -produsenten og leses av Kafka -forbruker ved hjelp av JSON Modul av Python. Hvordan JSON-data kan serialiseres og de-serialiseres før du sender og mottar dataene ved hjelp av Python-Kafka-modulen vises i denne delen av denne opplæringen.

Lag et Python -skript som heter Produsent2.py med følgende skript. En annen modul som heter JSON importeres med Kafkaproducer modul her. Value_serializer Argument brukes med bootstrap_servers argument her for å initialisere objektet til Kafka -produsenten. Dette argumentet indikerer at JSON -data vil bli kodet ved hjelp av 'UTF-8'Karakter satt på sendingstidspunktet. Neste, JSON formaterte data blir sendt til emnet som heter JSontopic.

Produsent2.PY:

# Importer Kafkaproducer fra Kafka Library
Fra Kafka import Kafkaproducer
# Importer JSON -modul for å serialisere data
Importer JSON
# Initialiser produsentvariabel og sett parameter for JSON Encode
Produsent = Kafkaproducer (bootstrap_servers =
['localhost: 9092'], value_serializer = lambda v: json.dumper (v).kode ('UTF-8'))
# Send data i JSON -format
produsent.Send ('JSontopic', 'Navn': 'Fahmida', 'E -post': '[email protected] ')
# Skriv ut melding
trykk ("Melding sendt til JSontopic")

Lag et Python -skript som heter forbruker2.py med følgende skript. Kafkaconsumer, sys og JSON -moduler importeres i dette skriptet. Kafkaconsumer Modul brukes til å lese JSON -formaterte data fra Kafka. JSON -modul brukes til å avkode de kodede JSON -dataene som sendes fra Kafka -produsenten. Sys Modul brukes til å avslutte skriptet. Value_deserializer Argument brukes med bootstrap_servers for å definere hvordan JSON -data vil bli dekodet. NESTE, til Loop brukes til å skrive ut alle forbrukerposter og JSON -data hentet fra Kafka.

forbruker2.PY:

# Importer Kafkaconsumer fra Kafka Library
Fra Kafka import Kafkaconsumer
# Importer SYS -modul
Importer Sys
# Importer JSON -modul for å serialisere data
Importer JSON
# Initialiser forbrukervariabel og sett eiendom for JSON -dekode
Forbruker = KafkaConsumer ('JSontopic', bootstrap_servers = ['localhost: 9092'],
Value_Deserializer = Lambda M: JSON.belastninger (m.dekode ('UTF-8')))
# Les data fra Kafka
For melding i forbruker:
Print ("Consumer Records: \ n")
trykk (melding)
print ("\ nreading from json data \ n")
print ("Navn:", melding [6] ['Navn'])
Skriv ut ("E -post:", melding [6] ['e -post'])
# Avslutte skriptet
sys.exit()

Produksjon:

Kjør følgende kommando fra en terminal for å utføre produsentskriptet.

$ Python3 Produsent2.py

Skriptet skriver ut følgende melding etter å ha sendt JSON -dataene.

Kjør følgende kommando fra en annen terminal for å utføre forbrukerskriptet.

$ Python3 Consumer2.py

Følgende utgang vises etter å ha kjørt skriptet.

Konklusjon:

Dataene kan sendes og mottas i forskjellige formater fra Kafka ved hjelp av Python. Dataene kan også lagres i databasen og hentes fra databasen ved hjelp av Kafka og Python. Jeg hjemme, denne opplæringen vil hjelpe Python -brukeren til å begynne å jobbe med Kafka.