Convertir PySpark RDD a DataFrame

En este artículo, discutiremos cómo convertir el RDD a un marco de datos en PySpark. Hay dos enfoques para convertir RDD a marco de datos.

  1. Usando createDataframe (rdd, esquema)
  2. Usando toDF (esquema)

Pero antes de avanzar para convertir RDD a Dataframe, primero creemos un RDD

Ejemplo:

Python

# importing necessary libraries
from pyspark.sql import SparkSession
  
# function to create new SparkSession
def create_session():
    spk = SparkSession.builder \
        .appName("Corona_cases_statewise.com") \
        .getOrCreate()
    return spk
  
# function to create RDD
def create_RDD(sc_obj, data):
    df = sc.parallelize(data)
    return df
  
  
if __name__ == "__main__":
  
    input_data = [("Uttar Pradesh", 122000, 89600, 12238),
                  ("Maharashtra", 454000, 380000, 67985),
                  ("Tamil Nadu", 115000, 102000, 13933),
                  ("Karnataka", 147000, 111000, 15306),
                  ("Kerala", 153000, 124000, 5259)]
  
    # calling function to create SparkSession
    spark = create_session()
  
    # creating spark context object
    sc = spark.sparkContext
  
    # calling function to create RDD
    rd_df = create_RDD(sc, input_data)
  
    # printing the type
    print(type(rd_df))

Producción:

<class 'pyspark.rdd.RDD'>

Método 1: Usar la función createDataframe(). 

Después de crear el RDD, lo hemos convertido a Dataframe usando la función createDataframe() en la que hemos pasado el RDD y el esquema definido para Dataframe.

Sintaxis:

spark.CreateDataFrame(rdd, schema)

Python

# importing necessary libraries
from pyspark.sql import SparkSession
  
# function to create new SparkSession
def create_session():
  spk = SparkSession.builder \
      .appName("Corona_cases_statewise.com") \
      .getOrCreate()
  return spk
  
# function to create RDD
def create_RDD(sc_obj,data):
  df = sc.parallelize(data)
  return df
  
# function to convert RDD to dataframe
def RDD_to_df(spark,df,schema):
    
  # converting RDD to df using createDataframe()
  # in which we are passing RDD and schema of df
  df1 = spark.createDataFrame(df,schema)
  return df1
  
if __name__ == "__main__":
      
  input_data = [("Uttar Pradesh",122000,89600,12238),
          ("Maharashtra",454000,380000,67985),
          ("Tamil Nadu",115000,102000,13933),
          ("Karnataka",147000,111000,15306),
          ("Kerala",153000,124000,5259)]
  
  # calling function to create SparkSession
  spark = create_session()
  
  # creating spark context object
  sc = spark.sparkContext
  
  # calling function to create RDD
  rd_df = create_RDD(sc,input_data)
  
  schema_lst = ["State","Cases","Recovered","Deaths"]
  
  # calling function to covert RDD to dataframe
  converted_df = RDD_to_df(spark,rd_df,schema_lst)
    
  # visualizing the schema and dataframe
  converted_df.printSchema()
  converted_df.show()

Producción:

Método 2: Usar la función toDF().

Después de crear el RDD, lo hemos convertido a Dataframe usando la función toDF() en la que hemos pasado el esquema definido para Dataframe.

Sintaxis:

df.toDF(schema)

Python

# importing necessary libraries
from pyspark.sql import SparkSession
  
# function to create new SparkSession
def create_session():
  spk = SparkSession.builder \
      .appName("Corona_cases_statewise.com") \
      .getOrCreate()
  return spk
  
# function to create RDD
def create_RDD(sc,data):
  df = sc.parallelize(data)
  return df
  
# function to convert RDD to dataframe
def RDD_to_df(df,schema):
    
  # converting RDD to dataframe using toDF()
  # in which we are passing schema of df
  df = rd_df.toDF(schema)
  return df
  
if __name__ == "__main__":
      
  input_data = [("Uttar Pradesh",122000,89600,12238),
          ("Maharashtra",454000,380000,67985),
          ("Tamil Nadu",115000,102000,13933),
          ("Karnataka",147000,111000,15306),
          ("Kerala",153000,124000,5259)]
  
  # calling function to create SparkSession
  spark = create_session()
  
  # creating spark context object
  sc = spark.sparkContext
  
  # calling function to create RDD
  rd_df = create_RDD(sc,input_data)
  
  schema_lst = ["State","Cases","Recovered","Deaths"]
  
  # calling function to covert RDD to dataframe
  converted_df = RDD_to_df(rd_df,schema_lst)
    
  # visualizing the schema and dataframe
  converted_df.printSchema()
  converted_df.show()

Producción:

Publicación traducida automáticamente

Artículo escrito por srishivansh5404 y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *