En este artículo, discutiremos cómo convertir el RDD a un marco de datos en PySpark. Hay dos enfoques para convertir RDD a marco de datos.
- Usando createDataframe (rdd, esquema)
- Usando toDF (esquema)
Pero antes de avanzar para convertir RDD a Dataframe, primero creemos un RDD
Ejemplo:
Python
# importing necessary libraries from pyspark.sql import SparkSession # function to create new SparkSession def create_session(): spk = SparkSession.builder \ .appName("Corona_cases_statewise.com") \ .getOrCreate() return spk # function to create RDD def create_RDD(sc_obj, data): df = sc.parallelize(data) return df if __name__ == "__main__": input_data = [("Uttar Pradesh", 122000, 89600, 12238), ("Maharashtra", 454000, 380000, 67985), ("Tamil Nadu", 115000, 102000, 13933), ("Karnataka", 147000, 111000, 15306), ("Kerala", 153000, 124000, 5259)] # calling function to create SparkSession spark = create_session() # creating spark context object sc = spark.sparkContext # calling function to create RDD rd_df = create_RDD(sc, input_data) # printing the type print(type(rd_df))
Producción:
<class 'pyspark.rdd.RDD'>
Método 1: Usar la función createDataframe().
Después de crear el RDD, lo hemos convertido a Dataframe usando la función createDataframe() en la que hemos pasado el RDD y el esquema definido para Dataframe.
Sintaxis:
spark.CreateDataFrame(rdd, schema)
Python
# importing necessary libraries from pyspark.sql import SparkSession # function to create new SparkSession def create_session(): spk = SparkSession.builder \ .appName("Corona_cases_statewise.com") \ .getOrCreate() return spk # function to create RDD def create_RDD(sc_obj,data): df = sc.parallelize(data) return df # function to convert RDD to dataframe def RDD_to_df(spark,df,schema): # converting RDD to df using createDataframe() # in which we are passing RDD and schema of df df1 = spark.createDataFrame(df,schema) return df1 if __name__ == "__main__": input_data = [("Uttar Pradesh",122000,89600,12238), ("Maharashtra",454000,380000,67985), ("Tamil Nadu",115000,102000,13933), ("Karnataka",147000,111000,15306), ("Kerala",153000,124000,5259)] # calling function to create SparkSession spark = create_session() # creating spark context object sc = spark.sparkContext # calling function to create RDD rd_df = create_RDD(sc,input_data) schema_lst = ["State","Cases","Recovered","Deaths"] # calling function to covert RDD to dataframe converted_df = RDD_to_df(spark,rd_df,schema_lst) # visualizing the schema and dataframe converted_df.printSchema() converted_df.show()
Producción:
Método 2: Usar la función toDF().
Después de crear el RDD, lo hemos convertido a Dataframe usando la función toDF() en la que hemos pasado el esquema definido para Dataframe.
Sintaxis:
df.toDF(schema)
Python
# importing necessary libraries from pyspark.sql import SparkSession # function to create new SparkSession def create_session(): spk = SparkSession.builder \ .appName("Corona_cases_statewise.com") \ .getOrCreate() return spk # function to create RDD def create_RDD(sc,data): df = sc.parallelize(data) return df # function to convert RDD to dataframe def RDD_to_df(df,schema): # converting RDD to dataframe using toDF() # in which we are passing schema of df df = rd_df.toDF(schema) return df if __name__ == "__main__": input_data = [("Uttar Pradesh",122000,89600,12238), ("Maharashtra",454000,380000,67985), ("Tamil Nadu",115000,102000,13933), ("Karnataka",147000,111000,15306), ("Kerala",153000,124000,5259)] # calling function to create SparkSession spark = create_session() # creating spark context object sc = spark.sparkContext # calling function to create RDD rd_df = create_RDD(sc,input_data) schema_lst = ["State","Cases","Recovered","Deaths"] # calling function to covert RDD to dataframe converted_df = RDD_to_df(rd_df,schema_lst) # visualizing the schema and dataframe converted_df.printSchema() converted_df.show()
Producción:
Publicación traducida automáticamente
Artículo escrito por srishivansh5404 y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA