Pyspark RDD - Transformasjoner

Pyspark RDD - Transformasjoner
I Python er Pyspark en gnistmodul som brukes til å gi en lignende type prosessering som Spark.

RDD står for spenstige distribuerte datasett. Vi kan kalle RDD en grunnleggende datastruktur i Apache Spark.

Vi må importere RDD fra Pyspark.RDD -modul.

Så i Pyspark for å lage en RDD, kan vi bruke parallelliseringsmetoden () metoden.

Syntaks:

Spark_app.SparkContext.parallellisere (data)

Hvor,

Data kan være en enimensjonal (lineære data) eller to dimensjonale data (rad-kolonne data).

RDD -transformasjoner:

En transformasjon RDD er en operasjon som brukes på en RDD for å lage nye data fra den eksisterende RDD. Ved hjelp av transformasjoner er vi i stand til å filtrere RDD ved å bruke noen transformasjoner.

La oss se transformasjonene som utføres på den gitte RDD.

Vi vil diskutere dem en etter en.

1. kart()

Kart () Transformasjon brukes til å kartlegge en verdi til elementene som er til stede i RDD. Det tar en anonym funksjon som en parameter, som lambda og transformerer elementene i en RDD.

Syntaks:

RDD_DATA.kart (anonymous_function)

Parametere:

Anonym_funksjon ser ut som:

Lambda Element: Operasjon

For eksempel er operasjonen å legge til/trekke fra alle elementene med noe nytt element.

La oss se eksemplene for å forstå denne transformasjonen bedre.

Eksempel 1:

I dette eksemplet oppretter vi en RDD som heter Student_marks med 20 elementer og bruker kart () -transformasjon ved å legge til hvert element med 20 og vise dem ved hjelp av Collect () handling.

#import Pyspark -modulen
Importer pyspark
#import SparkSession for å lage en økt
Fra Pyspark.SQL Import SparkSession
# Importer RDD fra Pyspark.RDD
Fra Pyspark.RDD Import RDD
#Create en app som heter Linuxhint
Spark_app = SparkSession.bygger.AppName ('Linuxhint').getorCreate ()
# Lag studentmerker data med 20 elementer
student_marks = spark_app.SparkContext.Parallelliser ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])))
#display -data i RDD
Print ("Faktiske data i RDD:", Student_marks.Kart (Lambda Element: Element).samle inn())
#Apply kart () transformasjon ved å legge 20 til hvert element i RDD
trykk ("Etter å ha lagt 20 til hvert element i RDD:", Student_marks.Kart (Lambda Element: Element+ 20).samle inn())

Produksjon:

Faktiske data i RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Etter å ha lagt 20 til hvert element i RDD: [109, 96, 98, 109, 110, 120, 54, 76, 74, 42, 65, 63, 43, 76, 98, 41, 54, 54, 76, 54]

Fra output ovenfor kan vi se at element 20 blir lagt til hvert element i RDD gjennom lambda -funksjonen ved bruk av MAP () -transformasjon.

Eksempel 2:

I dette eksemplet oppretter vi en RDD som heter Student_marks med 20 elementer og bruker kart () -transformasjon ved å trekke fra hvert element med 15 og vise dem ved hjelp av Collect () handling.

#import Pyspark -modulen
Importer pyspark
#import SparkSession for å lage en økt
Fra Pyspark.SQL Import SparkSession
# Importer RDD fra Pyspark.RDD
Fra Pyspark.RDD Import RDD
#Create en app som heter Linuxhint
Spark_app = SparkSession.bygger.AppName ('Linuxhint').getorCreate ()
# Lag studentmerker data med 20 elementer
student_marks = spark_app.SparkContext.Parallelliser ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])))
#display -data i RDD
Print ("Faktiske data i RDD:", Student_marks.Kart (Lambda Element: Element).samle inn())
#Apply kart () transformasjon ved å trekke 15 fra hvert element i RDD
trykk ("Etter å ha trukket fra 15 fra hvert element i RDD:", Student_marks.Kart (Lambda Element: Element-15).samle inn())

Produksjon:

Faktiske data i RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Etter å ha trukket fra 15 fra hvert element i RDD: [74, 61, 63, 74, 75, 85, 19, 41, 39, 7, 30, 28, 8, 41, 63, 6, 19, 19, 41, 19]

Fra output ovenfor kan vi se at element 15 blir trukket til hvert element i RDD gjennom lambda -funksjonen ved bruk av MAP () -transformasjon.

2. filter()

filter () transformasjon brukes til å filtrere verdier fra RDD. Det tar en anonym funksjon som lambda og returnerer elementene ved å filtrere elementer fra en RDD.

Syntaks:

RDD_DATA.filter (anonymous_funksjon)

Parametere:

Anonym_funksjon ser ut som:

lambda element: tilstand/uttrykk

For eksempel brukes tilstanden til å spesifisere de ekspressive utsagnene for å filtrere RDD.

La oss se eksempler for å forstå denne transformasjonen bedre.

Eksempel 1:

I dette eksemplet oppretter vi en RDD som heter Student_marks med 20 elementer og bruker filter () -transformasjon ved å filtrere bare multipler på 5 og vise dem ved hjelp av Collect () handling.

#import Pyspark -modulen
Importer pyspark
#import SparkSession for å lage en økt
Fra Pyspark.SQL Import SparkSession
# Importer RDD fra Pyspark.RDD
Fra Pyspark.RDD Import RDD
#Create en app som heter Linuxhint
Spark_app = SparkSession.bygger.AppName ('Linuxhint').getorCreate ()
# Lag studentmerker data med 20 elementer
student_marks = spark_app.SparkContext.Parallelliser ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])))
#display -data i RDD
Print ("Faktiske data i RDD:", Student_marks.Kart (Lambda Element: Element).samle inn())
#Apply filter () transformasjon ved å returnere inn multipler på 5.
trykk ("Multipler på 5 fra en RDD:", Student_marks.Filter (Lambda Element: Element%5 == 0).samle inn())
)

Produksjon:

Faktiske data i RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Multipler på 5 fra en RDD: [90, 100, 45]

Fra output ovenfor kan vi se at multipler av 5 elementer blir filtrert fra RDD.

Eksempel 2:

I dette eksemplet oppretter vi en RDD som heter Student_marks med 20 elementer og bruker filter () -transformasjon ved å filtrere elementer som er større enn 45 og vise dem ved hjelp av Collect () handling.

#import Pyspark -modulen
Importer pyspark
#import SparkSession for å lage en økt
Fra Pyspark.SQL Import SparkSession
# Importer RDD fra Pyspark.RDD
Fra Pyspark.RDD Import RDD
#Create en app som heter Linuxhint
Spark_app = SparkSession.bygger.AppName ('Linuxhint').getorCreate ()
# Lag studentmerker data med 20 elementer
student_marks = spark_app.SparkContext.Parallelliser ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])))
#display -data i RDD
Print ("Faktiske data i RDD:", Student_marks.Kart (Lambda Element: Element).samle inn())
#Apply filter () transformasjon ved å filtrere verdier større enn 45
trykk ("Verdier større enn 45:", Student_marks.Filter (Lambda Element: Element> 45).samle inn())

Produksjon:

Faktiske data i RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Verdier større enn 45: [89, 76, 78, 89, 90, 100, 56, 54, 56, 78, 56]

Fra output ovenfor kan vi se elementene større enn 45 filtreres fra RDD.

3. Union ()

Union () Transformasjon brukes til å kombinere to RDD -er. Vi kan utføre denne transformasjonen på to RDD -er ..

Syntaks:

RDD_DATA1.Union (RDD_DATA2)

La oss se eksempler for å forstå denne transformasjonen bedre.

Eksempel 1:

I dette eksemplet vil vi opprette en enkelt RDD med Student Marks Data og generere to RDD fra enkelt RDD ved å filtrere noen verdier ved hjelp av filter () transformasjon. Etter det kan vi utføre union () transformasjon på de to filtrerte RDDene.

#import Pyspark -modulen
Importer pyspark
#import SparkSession for å lage en økt
Fra Pyspark.SQL Import SparkSession
# Importer RDD fra Pyspark.RDD
Fra Pyspark.RDD Import RDD
#Create en app som heter Linuxhint
Spark_app = SparkSession.bygger.AppName ('Linuxhint').getorCreate ()
# Lag studentmerker data med 20 elementer
student_marks = spark_app.SparkContext.Parallelliser ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])))
#display -data i RDD
Print ("Faktiske data i RDD:", Student_marks.Kart (Lambda Element: Element).samle inn())
first_filter = student_marks.Filter (Lambda Element: Element> 90)
Second_Filter = Student_marks.filter (lambda element: element <40)
#display Filtrert transformasjon først
trykk ("Elementer i RDD større enn 90", First_Filter.samle inn())
#display andre filtrerte transformasjon
trykk ("Elementer i RDD mindre enn 40", Second_Filter.samle inn())
#Apply Union () Transformasjon ved å utføre Union på ovennevnte 2 -filtre
Print ("Union Transformation on Two Filtred Data", First_Filter.Union (Second_Filter).samle inn())

Produksjon:

Faktiske data i RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Elementer i RDD større enn 90 [100]
Elementer i RDD mindre enn 40 [34, 22, 23, 21, 34, 34, 34]
Unionstransformasjon på to filtrerte data [100, 34, 22, 23, 21, 34, 34, 34]

Fra output ovenfor kan du se at vi utførte Union på First_Filter og Second_Filter.

First_Filter oppnås ved å få elementer fra StudentsMarks RDD større enn 90 og Second_Filter oppnås ved å få elementer fra StudentsMarks RDD mindre enn 40 ved bruk av filter () transformasjon.

Eksempel 2:

I dette eksemplet vil vi lage to RDD -er slik at den første RDD har 20 elementer og den andre RDD har 10 elementer. Etter det kan vi bruke en union () -transformasjon til disse to RDD -ene.

#import Pyspark -modulen
Importer pyspark
#import SparkSession for å lage en økt
Fra Pyspark.SQL Import SparkSession
# Importer RDD fra Pyspark.RDD
Fra Pyspark.RDD Import RDD
#Create en app som heter Linuxhint
Spark_app = SparkSession.bygger.AppName ('Linuxhint').getorCreate ()
# Lag studentmerker data med 20 elementer
Student_marks1 = spark_app.SparkContext.Parallelliser ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])))
# Lag studentmerker data med 10 elementer
Student_marks2 = spark_app.SparkContext.Parallelliser ([45,43,23,56,78,21,34,34,56,34]))
#display -data i RDD
Print ("Faktiske data i Studentmerker 1 RDD:", Student_marks1.Kart (Lambda Element: Element).samle inn())
#display -data i RDD
Print ("Faktiske data i Studentmerker 2. RDD:", Student_marks2.Kart (Lambda Element: Element).samle inn())
#Apply Union () Transformasjon ved å utføre Union på ovennevnte 2. RDD -er
PRINT ("Union Transformation on Two RDD", Student_marks1.Union (Student_marks2).samle inn())

Produksjon:

Faktiske data i studentmerker 1 RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Faktiske data i studentmerker 2. RDD: [45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Union Transformation på to RDD [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]

Vi kan se at to RDD -er kombineres ved hjelp av Union () Transformation.

Konklusjon

Fra denne pyspark -opplæringen ser vi tre transformasjoner brukt på RDD. Kart () Transformasjon brukes til å kartlegge ved å transformere elementer i en RDD, filter () brukes til å utføre filteroperasjoner og lage en ny filtrert RDD fra den eksisterende RDD. Til slutt diskuterte vi Union () RDD som brukes til å kombinere to RDD -er.