Jak odczytać dane z Kafki z Pythonem

Jak odczytać dane z Kafki z Pythonem
Kafka to rozproszony system przesyłania wiadomości otwartych do wysyłania wiadomości w partycjonowanych i różnych tematach. Streaming danych w czasie rzeczywistym można wdrożyć za pomocą Kafka do odbierania danych między aplikacjami. Ma trzy główne części. Są to producent, konsumenci i tematy. Producent służy do wysyłania wiadomości do określonego tematu, a każda wiadomość jest dołączona do klucza. Konsument służy do odczytania wiadomości na określony temat z zestawu partycji. Dane otrzymane od producenta i przechowywane na partycjach na podstawie konkretnego tematu. W Python istnieje wiele bibliotek, aby stworzyć producenta i konsumenta, aby zbudować system przesyłania wiadomości za pomocą Kafki. W tym samouczku pokazano, jak dane z Kafka można odczytać za pomocą Pythona.

Warunek wstępny

Musisz zainstalować niezbędną bibliotekę Python do odczytu danych z Kafki. Python3 jest używany w tym samouczku do napisania scenariusza konsumenta i producenta. Jeśli pakiet PIP nie jest wcześniej zainstalowany w systemie operacyjnym Linux, musisz zainstalować PIP przed zainstalowaniem biblioteki Kafka dla Pythona. Python3-kafka jest używany w tym samouczku do odczytu danych z Kafki. Uruchom następujące polecenie, aby zainstalować bibliotekę.

$ pip instaluj Python3-kafka

Czytanie prostych danych tekstowych z Kafka

Różne typy danych można wysłać od producenta na określony temat, który może być odczytany przez konsumenta. Jak można wysłać i odbierać proste dane tekstowe od Kafki za pomocą producenta i konsumenta w tej części tego samouczka.

Utwórz plik o nazwie producent1.py z następującym skryptem Python. Kafkaproducer Moduł jest importowany z biblioteki Kafka. Lista brokerów musi zdefiniować w momencie inicjalizacji obiektu producenta, aby połączyć się z serwerem Kafka. Domyślny port Kafki to '9092'. Argument bootstrap_servers służy do definiowania nazwy hosta za pomocą portu. 'First_topic'jest ustawione jako nazwa tematu, według której wiadomość tekstowa zostanie wysłana od producenta. Następnie prosta wiadomość tekstowa ”Witam z Kafki'jest wysyłany za pomocą wysłać() metoda Kafkaproducer do tematu ”First_topic'.

producent1.PY:

# Importuj kafkaproducer z biblioteki Kafka
z Kafka import Kafkaproducer
# Zdefiniuj serwer z portem
bootstrap_servers = ['localHost: 9092']
# Zdefiniuj nazwę tematu, gdzie komunikat opublikuje
TOUPTNAME = „FIRST_TOPIC”
# Zainicjuj zmienną producenta
producent = kafkaproducer (bootstrap_servers = bootstrap_servers)
# Opublikuj tekst w określonym temacie
producent.Wyślij (ThematName, b'hello z Kafka… ')
# Wydrukuj wiadomość
drukuj („wysłana wiadomość”)

Utwórz plik o nazwie Konsument1.py z następującym skryptem Python. Kafkaconsumer Moduł jest importowany z biblioteki Kafka do odczytu danych z Kafka. Sys Moduł jest używany tutaj do zakończenia skryptu. Ta sama nazwa hosta i numer portu producenta są używane w skrypcie konsumenta do odczytu danych z Kafka. Nazwa tematu konsumenta i producenta musi być taka sama, jaka jestFirst_topic'. Następnie obiekt konsumencki jest inicjowany z trzema argumentami. Nazwa tematu, identyfikator grupy i informacje o serwerze. Do Pętla jest używana tutaj do odczytania tekstu wysyłania od producenta Kafka.

Konsument1.PY:

# Importuj Kafkaconsumer z biblioteki Kafka
z Kafka import Kafkaconsumer
# Importuj moduł SYS
Import Sys
# Zdefiniuj serwer z portem
bootstrap_servers = ['localHost: 9092']
# Zdefiniuj nazwę tematu, skąd otrzyma wiadomość
TOUPTNAME = „FIRST_TOPIC”
# Zainicjuj zmienną konsumpcyjną
Consumer = kafkaconsumer (TematName, grupa_id = 'grupa1', bootstrap_servers =
bootstrap_servers)
# Przeczytaj i wydrukuj wiadomość od konsumenta
Dla MSG w konsumentach:
drukuj („nazwa tematu =%s, komunikat =%s”%(MSG.Temat, MSG.wartość))
# Zakończ skrypt
Sys.Wyjście()

Wyjście:

Uruchom następujące polecenie z jednego terminału, aby wykonać skrypt producenta.

$ Python3 producent1.py

Następujące dane wyjściowe pojawią się po wysłaniu wiadomości.

Uruchom następujące polecenie z innego terminala, aby wykonać skrypt konsumencki.

$ Python3 Consumer1.py

Dane wyjściowe pokazuje nazwę tematu i wiadomość tekstową wysłaną od producenta.

Czytanie danych sformatowanych JSON z Kafki

Dane sformatowane JSON mogą być wysyłane przez producenta Kafka i odczytanie przez Kafka Consument za pomocą JSON Moduł Pythona. W jaki sposób dane JSON można serializować i deserializować przed wysłaniem i odbieraniem danych za pomocą modułu Python-Kafka, jest pokazana w tej części tego samouczka.

Utwórz skrypt Python o nazwie producent2.py z następującym skryptem. Kolejny moduł o nazwie JSON jest importowany Kafkaproducer Moduł tutaj. value_serializer argument jest używany z bootstrap_servers Argument tutaj, aby zainicjować obiekt producenta Kafka. Ten argument wskazuje, że dane JSON zostaną zakodowane za pomocą 'UTF-8„Zestaw postaci w momencie wysyłania. Następnie dane sformatowane JSON są wysyłane do nazywanego tematu Jsontopic.

producent2.PY:

# Importuj kafkaproducer z biblioteki Kafka
z Kafka import Kafkaproducer
# Zaimportuj moduł JSON do serializacji danych
Importuj JSON
# Zainicjuj zmienną producenta i ustaw parametr dla JSON ENCODE
producent = kafkaproducer (bootstrap_servers =
['LocalHost: 9092'], value_serializer = Lambda V: JSON.Zrzuty (v).enkoduj („UTF-8”))
# Wyślij dane w formacie JSON
producent.Wyślij („jsontopic”, „nazwa”: „fahmida”, „e -mail”: „[email protected] ')
# Wydrukuj wiadomość
Drukuj („Wiadomość wysłana do Jsontopic”)

Utwórz skrypt Python o nazwie Konsument2.py z następującym skryptem. Kafkaconsumer, Sys a moduły JSON są importowane do tego skryptu. Kafkaconsumer Moduł służy do odczytu danych sformatowanych JSON z kafki. Moduł JSON służy do dekodowania zakodowanych danych JSON wysyłanych od producenta Kafka. Sys Moduł służy do zakończenia skryptu. value_deserializer argument jest używany z bootstrap_servers Aby zdefiniować, w jaki sposób dane JSON zostaną dekodowane. Następny, Do Pętla służy do drukowania wszystkich rekordów konsumentów i danych JSON pobranych z Kafki.

Konsument2.PY:

# Importuj Kafkaconsumer z biblioteki Kafka
z Kafka import Kafkaconsumer
# Importuj moduł SYS
Import Sys
# Zaimportuj moduł JSON do serializacji danych
Importuj JSON
# Zainicjuj zmienną konsumpcyjną i ustaw właściwość dla JSON Decode
Consumer = Kafkaconsumer („Jsontopic”, bootstrap_servers = ['localHost: 9092'],
value_deserializer = Lambda M: JSON.ładunki (m.dekoduj ('utf-8')))
# Przeczytaj dane z Kafka
W przypadku wiadomości w konsumentach:
Drukuj („Rekordy konsumenckie: \ n”)
Drukuj (wiadomość)
Drukuj („\ nReading z danych JSON \ n”)
print („Nazwa:”, wiadomość [6] [„Nazwa”])
drukuj („e -mail:”, wiadomość [6] [„e -mail”])
# Zakończ skrypt
Sys.Wyjście()

Wyjście:

Uruchom następujące polecenie z jednego terminału, aby wykonać skrypt producenta.

$ Python3 producent2.py

Skrypt wydrukuje następującą wiadomość po wysłaniu danych JSON.

Uruchom następujące polecenie z innego terminala, aby wykonać skrypt konsumencki.

$ Python3 Consumer2.py

Następujące dane wyjściowe pojawią się po uruchomieniu skryptu.

Wniosek:

Dane mogą być wysyłane i odbierane w różnych formatach od Kafka za pomocą Pythona. Dane można również przechowywać w bazie danych i odzyskać z bazy danych za pomocą Kafka i Pythona. I HOME, ten samouczek pomoże użytkownikowi Python rozpocząć pracę z Kafką.