Pyspark - Lag -funksjon

Pyspark - Lag -funksjon
LAG () -funksjonen i Pyspark er tilgjengelig i vindusmodulen som brukes til å returnere de forrige raderverdiene til de gjeldende radene. FirstL, lag () -funksjonen returnerer null for topprader. Det tar en forskyvningsparameter som representerer det totale antallet rader slik at de forrige radverdiene returneres til de neste radene. For de første toppradene plasseres (forskyvning) nullene.

Det er mulig å dele opp radene i DataFrame ved å bruke vindusfunksjonen. Det er tilgjengelig i Pyspark.SQL.vindu modul.

Syntaks:

DataFrame_obj.WithColumn ("lag_column", lag ("kolonne", forskyvning).over (partisjon))

Det tar to parametere:

  1. Kolonnen er kolonnenavnet i PySpark DataFrame der de laggede radverdiene er plassert basert på verdiene i denne kolonnen.
  2. Offset spesifiserer heltallet for å returnere antallet tidligere rader til gjeldende radverdier.

Trinn:

  1. Lag en Pyspark DataFrame som har noen lignende verdier i minst en kolonne.
  2. Partisjon Dataene ved hjelp av PartitionBy () -metoden tilgjengelig i vindusfunksjonen og bestill dem basert på kolonnen ved å bruke OrderBy () -funksjonen.

Syntaks:

partisjon = vindu.Partitionby (“kolonne”).Orderby (“Kolonne”)

Vi kan bestille de partisjonerte dataene med den partisjonerte kolonnen eller en hvilken som helst annen kolonne.

Nå kan du bruke LAG () -funksjonen på de partisjonerte radene ved å bruke over() funksjon.

Vi legger til en kolonne for å lagre radnummeret ved hjelp av WithColumn () funksjon.

Syntaks:

DataFrame_obj.WithColumn ("lag_column", lag ("kolonne", forskyvning).over (partisjon))

Her spesifiserer navnet radnavnet og DataFrame_OBJ er vår Pyspark DataFrame.

La oss implementere koden.

Eksempel 1:

Her oppretter vi en Pyspark DataFrame som har 5 kolonner - ['Subject_id', 'Name', 'Age', 'Technology1', 'Technology2'] med 10 rader og partisjon radene basert på Teknologi1 Bruke vindusfunksjonen. Etter det henger vi 1 rad.

Importer pyspark
Fra Pyspark.SQL import *
Spark_app = SparkSession.bygger.AppName ('_').getorCreate ()
Studenter = [(4, 'Sravan', 23, 'PHP', 'Testing'),
(4, 'Sravan', 23, 'PHP', 'Testing'),
(46, 'Mounika', 22, '.Nett ',' html '),
(4, 'Deepika', 21, 'Oracle', 'html'),
(46, 'Mounika', 22, 'Oracle', 'Testing'),
(12, 'Chandrika', 22, 'Hadoop', 'C#'),
(12, 'Chandrika', 22, 'Oracle', 'Testing'),
(4, 'Sravan', 23, 'Oracle', 'C#'),
(4, 'Deepika', 21, 'PHP', 'C#'),
(46, 'Mounika', 22, '.Nett ',' testing ')
]
DataFrame_OBJ = Spark_App.CreateTataFrame (Studenter, ['Subject_id', 'Name', 'Age', 'Technology1', 'Technology2'])
Print ("---------- Faktisk DataFrame ----------")
DataFrame_obj.forestilling()
# Importer vindusfunksjonen
Fra Pyspark.SQL.Vindusimportvindu
#import etterslepet fra pyspark.SQL.funksjoner
Fra Pyspark.SQL.Funksjoner importerer etterslep
#Partition DataFrame basert på verdiene i Technology1 -kolonnen og
#Order radene i hver partisjon basert på emne_id -kolonnen
partisjon = vindu.Partitionby ("Technology1").Orderby ('emne_id')
Print ("---------- Partitioned DataFrame ----------")
#Now Nevn etterslep med offset-1 basert på emne_id
DataFrame_obj.WithColumn ("Lag", lag ("emne_id", 1).over (partisjon)).forestilling()

Produksjon:

Forklaring:

I den første utgangen representerer de faktiske dataene som er til stede i DataFrame. I den andre utgangen gjøres partisjonen basert på Teknologi1 kolonne.

Det totale antall partisjoner er 4.

Partisjon 1:

De .Net skjedde to ganger i den første partisjonen. Siden vi spesifiserte lag-offset som 1, den første .Netto verdi er null og den neste .Netto verdi er den forrige raden Subject_id -verdien - 46.

Partisjon 2:

Hadoop skjedde en gang i den andre partisjonen. Så lag er null.

Partisjon 3:

Oracle skjedde fire ganger i tredje partisjon.

For det første orakelet er lag null.

For det andre Oracle er forsinkelsesverdien 4 (siden forrige rad subjekt_idverdi er 4).

For det tredje Oracle er etterslepverdien 4 (siden forrige rad subjekt_idverdi er 4).

For det fjerde orakelet er etterslepverdien 12 (siden forrige rad subjekt_idverdi er 12).

Partisjon 4:

PHP skjedde tre ganger i fjerde partisjon.

Lagverdien for 1. PHP er null.

Forslepverdien for 2. PHP er 4 (siden forrige rad subjekt_idverdi er 4).

Forslepverdien for den tredje PHP er 4 (siden forrige rad subjekt_idverdi er 4).

Eksempel 2:

Lag radene med 2. Forsikre deg om at du opprettet Pyspark DataFrame som sett i eksempel 1.

# Importer vindusfunksjonen
Fra Pyspark.SQL.Vindusimportvindu
#import etterslepet fra pyspark.SQL.funksjoner
Fra Pyspark.SQL.Funksjoner importerer etterslep
#Partition DataFrame basert på verdiene i Technology1 -kolonnen og
#Order radene i hver partisjon basert på emne_id -kolonnen
partisjon = vindu.Partitionby ("Technology1").Orderby ('emne_id')
Print ("---------- Partitioned DataFrame ----------")
#Now Nevn Lag med offset-2 basert på emne_id
DataFrame_obj.WithColumn ("Lag", Lag ("Subject_id", 2).over (partisjon)).forestilling()

Produksjon:

Forklaring:

Partisjonen gjøres basert på Teknologi1 kolonne. Det totale antall partisjoner er 4.

Partisjon 1:

De .Net skjedde to ganger i den første partisjonen. Siden vi spesifiserte lag-offset som 2, er forskyvningen null for begge verdiene.

Partisjon 2:

Hadoop skjedde en gang i den andre partisjonen. Så lag er null.

Partisjon 3:

Oracle skjedde fire ganger i tredje partisjon.

For første og andre orakel er lag null.

For det tredje Oracle er etterslepverdien 4 (siden de to foregående radene subjekt_idverdien er 4).

For det fjerde Oracle er etterslepverdien 4 (siden de to foregående radene subjekt_idverdien er 4).

Partisjon 4:

PHP skjedde tre ganger i fjerde partisjon.

Lagverdien for 1. og 2. PHP er null.

Forslepverdien for 3. PHP er 4 (siden de to foregående radene Emne_id -verdien er 4).

Eksempel 3:

Lag radene med 2 basert på alderssøylen. Forsikre deg om at du opprettet Pyspark DataFrame som sett i eksempel 1.

# Importer vindusfunksjonen
Fra Pyspark.SQL.Vindusimportvindu
#import etterslepet fra pyspark.SQL.funksjoner
Fra Pyspark.SQL.Funksjoner importerer etterslep
#Partition DataFrame basert på verdiene i Technology1 -kolonnen og
#Order radene i hver partisjon basert på alderskolonnen
partisjon = vindu.Partitionby ("Technology1").Orderby ('Alder')
Print ("---------- Partitioned DataFrame ----------")
#Nå Nevn etterslep med forskyvning-2 basert på alder
DataFrame_obj.WithColumn ("Lag", etterslep ("Alder", 2).over (partisjon)).forestilling()

Produksjon:

Forklaring:

Partisjonen gjøres basert på Teknologi1 kolonne og etterslep er definert basert på alderssøylen. Det totale antall partisjoner er 4.

Partisjon 1:

De .Net skjedde to ganger i den første partisjonen. Siden vi spesifiserte lag-offset som 2, er forskyvningen null for begge verdiene.

Partisjon 2:

Hadoop skjedde en gang i den andre partisjonen. Så lag er null.

Partisjon 3:

Oracle skjedde fire ganger i tredje partisjon.

For første og andre orakel er lag null.

For det tredje Oracle er etterslepverdien 21 (aldersverdien fra de to foregående radene er 21).

For det fjerde orakelet er etterslepverdien 22 (aldersverdien fra de to foregående radene er 22).

Partisjon 4:

PHP skjedde tre ganger i fjerde partisjon.

Lagverdien for 1. og 2. PHP er null.

Forslepverdien for 3. HP er 21 (aldersverdien fra de to foregående radene er 21).

Konklusjon

Vi lærte hvordan vi får etterslepverdiene i Pyspark DataFrame i partisjonerte rader. LAG () -funksjonen i Pyspark er tilgjengelig i vindusmodulen som brukes til å returnere de forrige raderverdiene til de gjeldende radene. Vi lærte de forskjellige eksemplene ved å sette de forskjellige forskyvningene.