Aby zakończyć tę lekcję, musisz mieć aktywną instalację dla Kafki na swoim komputerze. Przeczytaj zainstaluj Apache Kafka na Ubuntu, aby wiedzieć, jak to zrobić.
Instalowanie klienta Python dla Apache Kafka
Zanim zaczniemy pracować z Apache Kafka w programie Python, musimy zainstalować klienta Python dla Apache Kafka. Można to zrobić za pomocą pypeć (Wskaźnik pakietów Python). Oto polecenie, aby to osiągnąć:
PIP3 Zainstaluj Kafka-Python
Będzie to szybka instalacja na terminalu:
Instalacja klienta Python Kafka za pomocą PIP
Teraz, gdy mamy aktywną instalację dla Apache Kafka i zainstalowaliśmy również klienta Python Kafka, jesteśmy gotowi rozpocząć kodowanie.
Tworzenie producenta
Pierwszą rzeczą, aby opublikować wiadomości w Kafka, jest aplikacja producenta, która może wysyłać wiadomości do tematów w Kafka.
Zauważ, że producenci Kafka są asynchronicznymi producentami wiadomości. Oznacza to, że operacje wykonane podczas publikowania wiadomości na temat partycji tematu Kafka nie są blokujące. Aby uprościć, napiszemy prosty wydawca JSON na tę lekcję.
Aby rozpocząć, zrób instancję dla producenta Kafka:
z Kafka import Kafkaproducer
Importuj JSON
import pprint
producent = kafkaproducer (
bootstrap_servers = 'localHost: 9092',
value_serializer = Lambda V: JSON.Zrzuty (v).enkoduj („UTF-8”))
Atrybut Bootstrap_Servers informuje o hosta i porcie dla serwera Kafka. Atrybut wartości_serializatora służy tylko do celów serializacji JSON napotkanych wartości JSON.
Aby grać z producentem Kafka, spróbujmy wydrukować wskaźniki związane z producentem i klastrem Kafka:
Metryki = producent.metryka()
pprint.pprint (metryki)
Teraz zobaczymy następujące:
Kafka Mterics
Teraz spróbujmy wysłać wiadomość do kolejki Kafka. Prostym obiektem JSON będzie dobrym przykładem:
producent.Wyślij („Linuxhint”, „Temat”: „kafka”)
Linuxhint to partycja tematu, na której obiekt JSON zostanie wysłany. Po uruchomieniu skryptu nie otrzymasz żadnego wyjścia, ponieważ wiadomość jest właśnie wysyłana na partycję tematu. Czas napisać konsumenta, abyśmy mogli przetestować naszą aplikację.
Tworzenie konsumenta
Teraz jesteśmy gotowi nawiązać nowe połączenie jako aplikacja konsumencka i otrzymywać wiadomości z tematu Kafka. Zacznij od stworzenia nowej instancji dla konsumenta:
z Kafka import Kafkaconsumer
z Kafka Import TematPartition
Drukuj („tworzenie połączenia.')
Consumer = kafkaconsumer (bootstrap_servers = 'localhost: 9092')
Teraz przypisz temat do tego połączenia, a także możliwą wartość przesunięcia.
Drukuj („Przypisanie tematu.')
konsument.Assign ([TOMPCEPARTITION („LINUXHINT”, 2)])
Wreszcie jesteśmy gotowi wydrukować mssage:
Drukuj („Otrzymanie wiadomości.')
W przypadku wiadomości w konsumentach:
print („offset:” + str (wiadomość [0]) + "\ t msg:" + str (wiadomość))
Dzięki temu otrzymamy listę wszystkich opublikowanych wiadomości na temat partycji tematu Kafka Consumer. Dane wyjściowe tego programu będzie:
Kafka konsument
Tylko dla szybkiego odniesienia, oto kompletny skrypt producenta:
z Kafka import Kafkaproducer
Importuj JSON
import pprint
producent = kafkaproducer (
bootstrap_servers = 'localHost: 9092',
value_serializer = Lambda V: JSON.Zrzuty (v).enkoduj („UTF-8”))
producent.Wyślij („Linuxhint”, „Temat”: „kafka”)
# metryki = producent.metryka()
# pprint.pprint (metryki)
A oto kompletny program konsumencki, którego użyliśmy:
z Kafka import Kafkaconsumer
z Kafka Import TematPartition
Drukuj („tworzenie połączenia.')
Consumer = kafkaconsumer (bootstrap_servers = 'localhost: 9092')
Drukuj („Przypisanie tematu.')
konsument.Assign ([TOMPCEPARTITION („LINUXHINT”, 2)])
Drukuj („Otrzymanie wiadomości.')
W przypadku wiadomości w konsumentach:
print („offset:” + str (wiadomość [0]) + "\ t msg:" + str (wiadomość))
Wniosek
W tej lekcji przyjrzeliśmy się, w jaki sposób możemy zainstalować i zacząć używać Apache Kafka w naszych programach Python. Pokazaliśmy, jak łatwo jest wykonywać proste zadania związane z Kafką w Pythonie z demonstrowanym klientem Kafka dla Pythona.