Konverter Pyspark RDD til DataFrame

Konverter Pyspark RDD til DataFrame
I Python er Pyspark en gnistmodul som brukes til å gi en lignende type prosessering som Spark.

RDD står for spenstige distribuerte datasett. Vi kan kalle RDD en grunnleggende datastruktur i Apache Spark.

Syntaks

Spark_app.SparkContext.parallellisere (data)

Vi kan relatere dataene i tabellformat. Datastrukturen som brukes er DataFrame.Tabellformat betyr at det lagrer data i rader og kolonner.

I Pyspark kan vi lage en DataFrame fra Spark -appen med CreateDaFrame () -metoden.

Syntaks

Spark_app.CreateTataFrame (input_data, kolonner)

Der input_data kan være en ordbok eller en liste for å lage en dataaframe fra disse dataene, og hvis input_data er en liste over ordbøker, er ikke kolonnene ikke nødvendig. Hvis det er en nestet liste, må vi oppgi kolonnenavn.

La oss nå diskutere hvordan du konverterer Pyspark RDD til DataFrame.

Opprettelse av Pyspark RDD

I dette eksemplet vil vi opprette en RDD -navngitte studenter og vise den ved hjelp av Collect () handling.

#import Pyspark -modulen
Importer pyspark
#import SparkSession for å lage en økt
Fra Pyspark.SQL Import SparkSession
# Importer RDD fra Pyspark.RDD
Fra Pyspark.RDD Import RDD
#Create en app som heter Linuxhint
Spark_app = SparkSession.bygger.AppName ('Linuxhint').getorCreate ()
# Lag studentdata med 5 rader og 6 attributter
Studenter = spark_app.SparkContext.Parallelize (['Rollno': '001', 'Navn': 'Sravan', 'Age': 23, 'Height': 5.79, 'vekt': 67, 'adresse': 'guntur',
'Rollno': '002', 'Navn': 'Ojaswi', 'Age': 16, 'Height': 3.79, 'vekt': 34, 'adresse': 'hyd',
'Rollno': '003', 'Navn': 'Gnanesh Chowdary', 'Age': 7, 'Height': 2.79, 'Vekt': 17, 'Adresse': 'Patna',
'Rollno': '004', 'Navn': 'Rohith', 'Age': 9, 'Height': 3.69, 'vekt': 28, 'adresse': 'hyd',
'Rollno': '005', 'Navn': 'Sridevi', 'Age': 37, 'Height': 5.59, 'vekt': 54, 'adresse': 'hyd'])
#Display RDD ved hjelp av Collect ()
trykk (studenter.samle inn())

Produksjon

['Rollno': '001', 'Navn': 'Sravan', 'Age': 23, 'Height': 5.79, 'vekt': 67, 'adresse': 'guntur',
'Rollno': '002', 'Navn': 'Ojaswi', 'Age': 16, 'Height': 3.79, 'vekt': 34, 'adresse': 'hyd',
'Rollno': '003', 'Navn': 'Gnanesh Chowdary', 'Age': 7, 'Height': 2.79, 'Vekt': 17, 'Adresse': 'Patna',
'Rollno': '004', 'Navn': 'Rohith', 'Age': 9, 'Height': 3.69, 'vekt': 28, 'adresse': 'hyd',
'Rollno': '005', 'Navn': 'Sridevi', 'Age': 37, 'Height': 5.59, 'vekt': 54, 'adresse': 'hyd']]

Metode 1: Bruke CreateTaFrame ()

Det er mulig å konvertere RDD til DataFrame fra en Spark -app med CreateFrame () -metoden. Her må vi overføre RDD til denne metoden.

Syntaks

Spark_app.CreateTataFrame (input_rdd)

Hvor input_rdd er RDD.

Eksempel
I dette eksemplet konverterer vi studenter - RDD til DataFrame ved hjelp av CreateFrame () -metoden.

#import Pyspark -modulen
Importer pyspark
#import SparkSession for å lage en økt
Fra Pyspark.SQL Import SparkSession
# Importer RDD fra Pyspark.RDD
Fra Pyspark.RDD Import RDD
#Create en app som heter Linuxhint
Spark_app = SparkSession.bygger.AppName ('Linuxhint').getorCreate ()
# Lag studentdata med 5 rader og 6 attributter
Studenter = spark_app.SparkContext.Parallelize (['Rollno': '001', 'Navn': 'Sravan', 'Age': 23, 'Height': 5.79, 'vekt': 67, 'adresse': 'guntur',
'Rollno': '002', 'Navn': 'Ojaswi', 'Age': 16, 'Height': 3.79, 'vekt': 34, 'adresse': 'hyd',
'Rollno': '003', 'Navn': 'Gnanesh Chowdary', 'Age': 7, 'Height': 2.79, 'Vekt': 17, 'Adresse': 'Patna',
'Rollno': '004', 'Navn': 'Rohith', 'Age': 9, 'Height': 3.69, 'vekt': 28, 'adresse': 'hyd',
'Rollno': '005', 'Navn': 'Sridevi', 'Age': 37, 'Height': 5.59, 'vekt': 54, 'adresse': 'hyd'])
#sjekk typen studenter
trykk (type (studenter))
#convert RDD til DataFrame
df = spark_app.CreateTataFrame (studenter)
#Display DataFrame
df.forestilling()
#Check typen DF
Print (Type (DF))

Produksjon

Fra output ovenfor kan vi se at studenter er en RDD (vist ved hjelp av typen), og etter å ha konvertert til DataFrame, viste vi DataFrame ved hjelp av Show () -metoden, og for bekreftelsen returnerte vi typen DataFrame.

Metode 2: Bruke CreateTaFrame () med skjema

Struktureltype ()
Denne metoden brukes til å definere strukturen til Pyspark DataFrame. Den vil godta en liste over datatyper sammen med kolonnenavn for den gitte DataFrame. Dette er kjent som skjemaet for Dataframe. Den lagrer en samling felt

Structfield ()
Denne metoden brukes inne i strukturen () metoden for Pyspark DataFrame. Den aksepterer kolonnenavn med datatypen.

Syntaks

skjema = struktureltype ([
Structfield ("kolonne 1", datatype, true/usann),
Structfield ("kolonne 2", datatype, true/usann),
.. ,
Structfield ("kolonne n", datatype, true/usann)))

Der skjema refererer til dataaframe når det opprettes.

Parametere

  1. StructType godtar en liste over structfields i en liste atskilt med komma.
  2. StructField () brukes til å legge til kolonner til DataFrame, som tar kolonnenavn som den første parameteren og datatypen til de bestemte kolonnene som den andre parameteren.

Vi må bruke datatypene fra metodene som importeres fra Pyspark.SQL.typer modul.

Datatypene som støttes er:

  • StringType () - Brukes til å lagre strengverdier
  • Integertype () - Brukes til å lagre heltall eller lange heltallverdier
  • FloattType () - Brukes til å lagre flyteverdier
  • Dubletype () - Brukes til å lagre doble verdier
  1. Boolske verdier som den tredje parameteren. Hvis det er sant, vil den gitte datatypen bli brukt, ellers ikke når den er falsk.

Vi må overføre dette skjemaet til DataFrame -metoden sammen med dataene.

Syntaks

CreateTataFrame (data, skjema = skjema)

Syntaks

Spark_app.CreateTataFrame (input_rdd)

Hvor, input_rdd er RDD.

Eksempel
I dette eksemplet konverterer vi studenter - RDD til DataFrame ved hjelp av CreateFrame () -metoden med kolonnenavnene - RollNo, navn, alder, høyde, vekt og adresse

#import Pyspark -modulen
Importer pyspark
#import SparkSession for å lage en økt
Fra Pyspark.SQL Import SparkSession
# Importer RDD fra Pyspark.RDD
Fra Pyspark.RDD Import RDD
#og importerer strukturtyper og datatyper
Fra Pyspark.SQL.typer importerer struktur, strukturfelt, strengtype, integertype, floattype
#Create en app som heter Linuxhint
Spark_app = SparkSession.bygger.AppName ('Linuxhint').getorCreate ()
# Lag studentdata med 5 rader og 6 attributter
Studenter = spark_app.SparkContext.Parallelize (['Rollno': '001', 'Navn': 'Sravan', 'Age': 23, 'Height': 5.79, 'vekt': 67, 'adresse': 'guntur',
'Rollno': '002', 'Navn': 'Ojaswi', 'Age': 16, 'Height': 3.79, 'vekt': 34, 'adresse': 'hyd',
'Rollno': '003', 'Navn': 'Gnanesh Chowdary', 'Age': 7, 'Height': 2.79, 'Vekt': 17, 'Adresse': 'Patna',
'Rollno': '004', 'Navn': 'Rohith', 'Age': 9, 'Height': 3.69, 'vekt': 28, 'adresse': 'hyd',
'Rollno': '005', 'Navn': 'Sridevi', 'Age': 37, 'Height': 5.59, 'vekt': 54, 'adresse': 'hyd'])
#sjekk typen studenter
trykk (type (studenter))
#definer strukturen og strukturfeltene
#for kolonnenavn nedenfor
skjema = struktureltype ([
Structfield ("Rollno", StringType (), True),
Structfield ("Navn", StringType (), True),
Structfield ("Alder", Integertype (), True),
Structfield ("høyde", floattype (), true),
Structfield ("Vekt", Integertype (), True),
Structfield ("adresse", strengtype (), true)
])
#convert RDD til DataFrame
df = spark_app.CreateTataFrame (studenter, skjema)
#Display DataFrame
df.forestilling()
#Check typen DF
Print (Type (DF))

Produksjon

Fra output ovenfor kan vi se at studenter er en RDD (vist ved hjelp av typen), og etter å ha konvertert til DataFrame, viste vi DataFrame ved hjelp av Show () -metoden, og for bekreftelsen returnerte vi typen DataFrame.

Metode 3: Bruke TODF ()

TODF () tar ingen parameter og konverterer den direkte til DataFrame.

Syntaks

input_rdd.TODF ()

Hvor, input_rdd er RDD.

Eksempel
I dette eksemplet konverterer vi studenter - RDD til DataFrame ved hjelp av TODF () -metode.

#import Pyspark -modulen
Importer pyspark
#import SparkSession for å lage en økt
Fra Pyspark.SQL Import SparkSession
# Importer RDD fra Pyspark.RDD
Fra Pyspark.RDD Import RDD
#Create en app som heter Linuxhint
Spark_app = SparkSession.bygger.AppName ('Linuxhint').getorCreate ()
# Lag studentdata med 5 rader og 6 attributter
Studenter = spark_app.SparkContext.Parallelize (['Rollno': '001', 'Navn': 'Sravan', 'Age': 23, 'Height': 5.79, 'vekt': 67, 'adresse': 'guntur',
'Rollno': '002', 'Navn': 'Ojaswi', 'Age': 16, 'Height': 3.79, 'vekt': 34, 'adresse': 'hyd',
'Rollno': '003', 'Navn': 'Gnanesh Chowdary', 'Age': 7, 'Height': 2.79, 'Vekt': 17, 'Adresse': 'Patna',
'Rollno': '004', 'Navn': 'Rohith', 'Age': 9, 'Height': 3.69, 'vekt': 28, 'adresse': 'hyd',
'Rollno': '005', 'Navn': 'Sridevi', 'Age': 37, 'Height': 5.59, 'vekt': 54, 'adresse': 'hyd'])
#sjekk typen studenter
trykk (type (studenter))
#convert RDD til DataFrame
DF = studenter.TODF ()
#Display DataFrame
df.forestilling()
#Check typen DF
Print (Type (DF))

Produksjon

Fra output ovenfor kan vi se at studenter er en RDD (vist ved hjelp av typen), og etter å ha konvertert til DataFrame, viste vi DataFrame ved hjelp av Show () -metoden, og for bekreftelsen returnerte vi typen DataFrame.

Konklusjon

I denne Pyspark -opplæringen så vi hvordan vi konverterer Pyspark RDD til Pyspark DataFrame ved hjelp av CreateFrame () og TODF () -metoder. Hvis du vil oppgi kolonnenavn eksplisitt, kan du bruke den andre metoden i denne opplæringen.