PYSPARK - Funkcja opóźnienia

PYSPARK - Funkcja opóźnienia
Funkcja lag () w PYSPARK jest dostępna w module okien, która służy do zwrócenia poprzednich wierszy do bieżących wierszy. Firstl, funkcja lag () zwraca zerową dla najlepszych wierszy. Przyjmuje parametr przesunięcia, który reprezentuje całkowitą liczbę wierszy, tak że wartości poprzedniego wiersza są zwracane do następnych wierszy. Dla pierwszych górnych wierszy umieszczane są (przesunięte) zerowe.

Możliwe jest podział wierszy w ramce danych za pomocą funkcji okna. Jest dostępny w Pyspark.SQL.okno moduł.

Składnia:

DataFrame_Obj.withColumn („lag_column”, lag („kolumna”, przesunięcie).Over (partycja))

Wymaga dwóch parametrów:

  1. Kolumna to nazwa kolumny w Pyspark DataFrame, w której opóźnione wartości wiersza są umieszczane na podstawie wartości w tej kolumnie.
  2. Przesunięcie określa liczbę całkowitą, aby zwrócić tę liczbę poprzednich wierszy do bieżących wartości wierszy.

Kroki:

  1. Utwórz Pyspark DataFrame, który ma pewne podobne wartości w co najmniej jednej kolumnie.
  2. Partycja danych za pomocą metody paritionby () dostępnej w funkcji okna i zamów je na podstawie kolumny za pomocą funkcji orderby ().

Składnia:

parition = okno.paritionby („kolumna”).Orderby („kolumna”)

Możemy zamówić partycjonowane dane z partycjonowaną kolumną lub dowolną inną kolumną.

Teraz możesz użyć funkcji lag () w partycjonowanych rzędach za pomocą nad() funkcjonować.

Dodajemy kolumnę do przechowywania numeru wiersza za pomocą withcoolumn () funkcjonować.

Składnia:

DataFrame_Obj.withColumn („lag_column”, lag („kolumna”, przesunięcie).Over (partycja))

Tutaj nazwa określa nazwę wiersza, a DataFrame_OBJ to nasza Pyspark DataFrame.

Zaimplementujmy kod.

Przykład 1:

Tutaj tworzymy Pyspark DataFrame, który ma 5 kolumn - [„tematy_id”, „name”, „wiek”, „technologia Technologia 1 Korzystanie z funkcji okna. Potem opóźniamy 1 rząd.

Import Pyspark
od Pyspark.SQL Import *
Spark_App = Sparksession.budowniczy.Nazwa aplikacji('_').getorCreate ()
studenci = [(4, „Sravan”, 23, „Php”, „Testowanie”),
(4, „Sravan”, 23, „php”, „testowanie”),
(46, „Mounika”, 22, '.Net ', „html”),
(4, „Deepika”, 21, „Oracle”, „html”),
(46, „Mounika”, 22, „Oracle”, „Testing”),
(12, „Chandrika”, 22, „Hadoop”, „C#”),
(12, „Chandrika”, 22, „Oracle”, „Testing”),
(4, „Sravan”, 23, „Oracle”, „C#”),
(4, „Deepika”, 21, „php”, „c#”),
(46, „Mounika”, 22, '.Net ', „testowanie”)
]
DataFrame_Obj = Spark_App.CreatedATAframe (studenci, [„Temat_id”, „nazwa”, „wiek”, „technologia1”, „technologia 2”])
Drukuj („---------- Rzeczywista ramka danych ----------”)
DataFrame_Obj.pokazywać()
# Zaimportuj funkcję okna
od Pyspark.SQL.okno importowe
#Import opóźnienie z Pyspark.SQL.Funkcje
od Pyspark.SQL.Funkcje importują opóźnienie
#Partiction Scrame na podstawie wartości w kolumnie technologii1 i
#Order rzędów w każdej partycji na podstawie kolumny tematu_d
parition = okno.Partitionby („Technology1”).OrderBy („tematy_id”)
Drukuj („---------- PARTITIONED DATEFRAME ----------”)
#Now wspomina opóźnienie z Offset-1 na podstawie tematu_id
DataFrame_Obj.withColumn („LAG”, LAG („Temat_id”, 1).Over (partycja)).pokazywać()

Wyjście:

Wyjaśnienie:

W pierwszym wyjściu reprezentuje rzeczywiste dane obecne w ramce danych. W drugim wyjściu partycja jest wykonywana na podstawie Technologia 1 kolumna.

Całkowita liczba partycji wynosi 4.

Partycja 1:

.Net wystąpiła dwa razy w pierwszej partycji. Ponieważ określiliśmy opóźnienie jako 1, pierwszy .Wartość netto jest zerowa, a następna .Wartość netto to poprzednia wartość rzędu tematyczna - 46.

Partict 2:

Hadoop miał miejsce raz w drugim partycji. Więc Lag jest null.

Partycja 3:

Oracle wystąpiła cztery razy w trzeciej partycji.

W przypadku pierwszego wyroczni opóźnienie jest null.

W przypadku drugiego wyroczni wartość opóźnienia wynosi 4 (ponieważ poprzedni wiersz tematyczna wartość wynosi 4).

W przypadku trzeciego wyroczni wartość opóźnienia wynosi 4 (ponieważ wartość poprzedniego rzędu tematyczna jest 4).

W przypadku czwartego wyroczni wartość opóźnienia wynosi 12 (ponieważ wartość poprzedniego rzędu podmiotu_dem jest 12).

Partycja 4:

PHP wystąpił trzy razy w czwartej partycji.

Wartość opóźnienia dla 1. PHP jest null.

Wartość opóźnienia dla 2nd PHP wynosi 4 (ponieważ wartość poprzedniego rzędu podmiotu rzędu wynosi 4).

Wartość opóźnienia dla 3rd PHP wynosi 4 (ponieważ wartość poprzedniego rzędu tematycznego wynosi 4).

Przykład 2:

Opóźnić wiersze o 2. Upewnij się, że utworzyłeś Pyspark DataFrame, jak pokazano w przykładzie 1.

# Zaimportuj funkcję okna
od Pyspark.SQL.okno importowe
#Import opóźnienie z Pyspark.SQL.Funkcje
od Pyspark.SQL.Funkcje importują opóźnienie
#Partiction Scrame na podstawie wartości w kolumnie technologii1 i
#Order rzędów w każdej partycji na podstawie kolumny tematu_d
parition = okno.Partitionby („Technology1”).OrderBy („tematy_id”)
Drukuj („---------- PARTITIONED DATEFRAME ----------”)
#Now wspomina opóźnienie z offset-2 na podstawie tematu_id
DataFrame_Obj.withColumn („LAG”, LAG („Temat_id”, 2).Over (partycja)).pokazywać()

Wyjście:

Wyjaśnienie:

Partycja jest wykonywana na podstawie Technologia 1 kolumna. Całkowita liczba partycji wynosi 4.

Partycja 1:

.Net wystąpiła dwa razy w pierwszej partycji. Ponieważ określiliśmy opóźnienie jako 2, przesunięcie jest zerowe dla obu wartości.

Partict 2:

Hadoop miał miejsce raz w drugim partycji. Więc Lag jest null.

Partycja 3:

Oracle wystąpiła cztery razy w trzeciej partycji.

W pierwszym i drugim wyroczni Lag jest null.

W przypadku trzeciego wyroczni wartość opóźnienia wynosi 4 (ponieważ poprzednie 2 wiersze Temat_ID wynosi 4).

W przypadku czwartego wyroczni wartość opóźnienia wynosi 4 (ponieważ poprzednie 2 wiersze Temat_ID wynosi 4).

Partycja 4:

PHP wystąpił trzy razy w czwartej partycji.

Wartość opóźnienia dla pierwszego i 2. PHP jest zerowa.

Wartość opóźnienia dla 3rd PHP wynosi 4 (ponieważ poprzednie 2 wiersze Temat_ID wynosi 4).

Przykład 3:

Opóźniać wiersze o 2 w oparciu o kolumnę wiekową. Upewnij się, że utworzyłeś Pyspark DataFrame, jak pokazano w przykładzie 1.

# Zaimportuj funkcję okna
od Pyspark.SQL.okno importowe
#Import opóźnienie z Pyspark.SQL.Funkcje
od Pyspark.SQL.Funkcje importują opóźnienie
#Partiction Scrame na podstawie wartości w kolumnie technologii1 i
#Order rzędy w każdej partycji na podstawie kolumny wiekowej
parition = okno.Partitionby („Technology1”).Orderby („wiek”)
Drukuj („---------- PARTITIONED DATEFRAME ----------”)
#Now wspomina opóźnienie z Offset-2 w oparciu o wiek
DataFrame_Obj.z kolumn („lag”, lag („wiek”, 2).Over (partycja)).pokazywać()

Wyjście:

Wyjaśnienie:

Partycja jest wykonywana na podstawie Technologia 1 Kolumna i opóźnienie jest zdefiniowane na podstawie kolumny wiekowej. Całkowita liczba partycji wynosi 4.

Partycja 1:

.Net wystąpiła dwa razy w pierwszej partycji. Ponieważ określiliśmy opóźnienie jako 2, przesunięcie jest zerowe dla obu wartości.

Partict 2:

Hadoop miał miejsce raz w drugim partycji. Więc Lag jest null.

Partycja 3:

Oracle wystąpiła cztery razy w trzeciej partycji.

W pierwszym i drugim wyroczni Lag jest null.

W przypadku trzeciego wyroczni wartość opóźnienia wynosi 21 (wartość wieku z poprzednich dwóch wierszy wynosi 21).

W przypadku czwartego wyroczni wartość opóźnienia wynosi 22 (wartość wieku z poprzednich dwóch wierszy wynosi 22).

Partycja 4:

PHP wystąpił trzy razy w czwartej partycji.

Wartość opóźnienia dla pierwszego i 2. PHP jest zerowa.

Wartość opóźnienia dla 3. HP wynosi 21 (wartość wieku z poprzednich dwóch rzędów wynosi 21).

Wniosek

Nauczyliśmy się, jak uzyskać wartości opóźnienia w Pyspark DataFrame w partycjonowanych rzędach. Funkcja lag () w PYSPARK jest dostępna w module okien, która służy do zwrócenia poprzednich wierszy do bieżących wierszy. Nauczyliśmy się różnych przykładów, ustawiając różne przesunięcia.