En este artículo, aprenderemos cómo definir DataFrame Schema con StructField y StructType.
- StructType y StructFields se utilizan para definir un esquema o su parte para el marco de datos. Esto define el nombre, el tipo de datos y la marca anulable para cada columna.
- El objeto StructType es la colección de objetos StructFields. Es un tipo de datos incorporado que contiene la lista de StructField.
Sintaxis:
- pyspark.sql.types.StructType(campos=Ninguno)
- pyspark.sql.types.StructField(nombre, tipo de datos, anulable=Verdadero)
Parámetro:
- campos – Lista de StructField.
- name – Nombre de la columna.
- tipo de datos: tipo de datos, es decir, entero, string, flotante, etc.
- anulable: si los campos son NULL/Ninguno o no.
Para definir el esquema, tenemos que usar el objeto StructType() en el que tenemos que definir o pasar el StructField() que contiene el nombre de la columna, el tipo de datos de la columna y el indicador anulable. Podemos escribir:-
schema = StructType([StructField(column_name1,datatype(),nullable_flag), StructField(column_name2,datatype(),nullable_flag), StructField(column_name3,datatype(),nullable_flag) ])
Ejemplo 1: Definición de DataFrame con esquema con StructType y StructField.
Python
# importing necessary libraries from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType, FloatType # function to create SparkSession def create_session(): spk = SparkSession.builder \ .master("local") \ .appName("Product_mart.com") \ .getOrCreate() return spk # function to create dataframe def create_df(spark, data, schema): df1 = spark.createDataFrame(data, schema) return df1 if __name__ == "__main__": # calling function to create SparkSession spark = create_session() input_data = [("Refrigerator", 112345, 4.0, 12499), ("LED TV", 114567, 4.2, 49999), ("Washing Machine", 113465, 3.9, 69999), ("T-shirt", 124378, 4.1, 1999), ("Jeans", 126754, 3.7, 3999), ("Running Shoes", 134565, 4.7, 1499), ("Face Mask", 145234, 4.6, 999)] # defining schema for the dataframe with # StructType and StructField schm = StructType([ StructField("Product Name", StringType(), True), StructField("Product ID", LongType(), True), StructField("Rating", FloatType(), True), StructField("Product Price", IntegerType(), True), ]) # calling function to create dataframe df = create_df(spark, input_data, schm) # visualizing dataframe and it's schema df.printSchema() df.show()
Producción:
En el código anterior, hicimos que la bandera anulable = True. El uso de hacerlo Verdadero es que si al crear Dataframe cualquier valor de campo es NULO/Ninguno, entonces también se creará Dataframe sin ningún valor.
Ejemplo 2: definición del esquema de marco de datos con StructType anidado.
Python
# importing necessary libraries from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType, FloatType # function to create SparkSession def create_session(): spk = SparkSession.builder \ .master("local") \ .appName("Product_mart.com") \ .getOrCreate() return spk # function to create dataframe def create_df(spark, data, schema): df1 = spark.createDataFrame(data, schema) return df1 if __name__ == "__main__": # calling function to create SparkSession spark = create_session() input_data = [(("Refrigerator", 112345), 4.0, 12499), (("LED TV", 114567), 4.2, 49999), (("Washing Machine", 113465), 3.9, 69999), (("T-shirt", 124378), 4.1, 1999), (("Jeans", 126754), 3.7, 3999), (("Running Shoes", 134565), 4.7, 1499), (("Face Mask", 145234), 4.6, 999)] # defining schema for the dataframe using # nested StructType schm = StructType([ StructField('Product', StructType([ StructField('Product Name', StringType(), True), StructField('Product ID', LongType(), True), ])), StructField('Rating', FloatType(), True), StructField('Price', IntegerType(), True)]) # calling function to create dataframe df = create_df(spark, input_data, schm) # visualizing dataframe and it's schema df.printSchema() df.show(truncate=False)
Producción:
Ejemplo 3: Cambiar la estructura del marco de datos y agregar una nueva columna usando la clase de columna PySpark.
Python
# importing necessary libraries from pyspark.sql import SparkSession from pyspark.sql.functions import * from pyspark.sql.functions import col, struct, when from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType, FloatType # function to create SparkSession def create_session(): spk = SparkSession.builder \ .master("local") \ .appName("Product_mart.com") \ .getOrCreate() return spk # function to create dataframe def create_df(spark, data, schema): df1 = spark.createDataFrame(data, schema) return df1 if __name__ == "__main__": # calling function to create SparkSession spark = create_session() input_data = [("Refrigerator", 112345, 4.0, 12499), ("LED TV", 114567, 4.2, 49999), ("Washing Machine", 113465, 3.9, 69999), ("T-shirt", 124378, 4.1, 1999), ("Jeans", 126754, 3.7, 3999), ("Running Shoes", 134565, 4.7, 1499), ("Face Mask", 145234, 4.6, 999)] # defining schema for the dataframe using # nested StructType schm = StructType([ StructField("Product Name", StringType(), True), StructField("Product ID", LongType(), True), StructField("Rating", FloatType(), True), StructField("Product Price", IntegerType(), True)]) # calling function to create dataframe df = create_df(spark, input_data, schm) # copying the columns to the new struct # Product new_df = df.withColumn("Product", struct(col("Product Name").alias("Name"), col("Product ID").alias("ID"), col("Rating").alias("Rating"), col("Product Price").alias("Price"))) # adding new column according to the given # condition new_df = new_df.withColumn("Product Range", when(col("Product Price").cast( IntegerType()) < 1000, "Low") .when(col("Product Price").cast(IntegerType() ) < 7000, "Medium") .otherwise("High")) # dropping the columns as all column values # are copied in Product column new_df = new_df.drop("Product Name", "Product ID", "Rating", "Product Price") # visualizing dataframe and it's schema new_df.printSchema() new_df.show(truncate=False)
Producción:
- En el ejemplo anterior, estamos cambiando la estructura del Dataframe usando la función struct() y copiando la columna en la nueva estructura ‘Producto’ y creando la columna Producto usando la función withColumn().
- Después de copiar el ‘Nombre del producto’, ‘ID del producto’, ‘Clasificación’, ‘Precio del producto’ a la nueva estructura ‘Producto’.
- Estamos agregando la nueva columna ‘Rango de precios’ usando la función withColumn(), de acuerdo con la condición dada que se divide en tres categorías, es decir, Bajo, Medio y Alto. Si el ‘Precio del producto’ es inferior a 1000, ese producto cae en la categoría Baja y si el ‘Precio del producto’ es inferior a 7000, ese producto cae en la categoría Media; de lo contrario, ese producto cae en la categoría Alta.
- Después de crear la nueva estructura ‘Producto’ y agregar la nueva columna ‘Rango de precios’, tenemos que eliminar la columna ‘ Nombre del producto’, ‘ID del producto’, ‘Calificación’, ‘Precio del producto’ usando la función drop(). Luego, imprima el esquema con la estructura del marco de datos modificada y las columnas agregadas.
Ejemplo 4: definición del esquema de marco de datos utilizando el formato JSON y StructType().
Python
# importing necessary libraries from pyspark.sql import SparkSession import pyspark.sql.types as T import json # function to create SparkSession def create_session(): spk = SparkSession.builder \ .master("local") \ .appName("Product_mart.com") \ .getOrCreate() return spk # function to create dataframe def create_df(spark, data, schema): df1 = spark.createDataFrame(data, schema) return df1 if __name__ == "__main__": # calling function to create SparkSession spark = create_session() input_data = [("Refrigerator", 4.0), ("LED TV", 4.2), ("Washing Machine", 3.9), ("T-shirt", 4.1) ] # defining schema for the dataframe with # StructType and StructField schm = T.StructType([ T.StructField("Product Name", T.StringType(), True), T.StructField("Rating", T.FloatType(), True) ]) # calling function to create dataframe df = create_df(spark, input_data, schm) # visualizing dataframe and it's schema print("Original Dataframe:-") df.printSchema() df.show() print("-------------------------------------------") print("Schema in json format:-") # storing schema in json format using # schema.json() function schma = df.schema.json() print(schma) # loading the json format schema schm1 = StructType.fromJson(json.loads(schma)) # creating dataframe using json format schema json_df = spark.createDataFrame( spark.sparkContext.parallelize(input_data), schm1) print("-------------------------------------------") print("Dataframe using json schema:-") # showing the created dataframe from json format # schema printing the schema of created dataframe json_df.printSchema() json_df.show()
Producción:
Nota: También puede almacenar el formato JSON en el archivo y usar el archivo para definir el esquema, el código para esto también es el mismo que el anterior, solo tiene que pasar el archivo JSON en la función loads(), en el ejemplo anterior, el el esquema en formato JSON se almacena en una variable, y estamos usando esa variable para definir el esquema.
Ejemplo 5: definición del esquema de marco de datos usando StructType() con ArrayType() y MapType().
Python
# importing necessary libraries from pyspark.sql import SparkSession import pyspark.sql.types as T # function to create SparkSession def create_session(): spk = SparkSession.builder \ .master("local") \ .appName("Product_mart.com") \ .getOrCreate() return spk # function to create dataframe def create_df(spark, data, schema): df1 = spark.createDataFrame(data, schema) return df1 if __name__ == "__main__": # calling function to create SparkSession spark = create_session() # Data containing the Array and Map- key,value pair input_data = [ ("Alex", 'Buttler', ["Mathematics", "Computer Science"], {"Mathematics": 'Physics', "Chemistry": "Biology"}), ("Sam", "Samson", ["Chemistry, Biology"], {"Chemistry": 'Physics', "Mathematics": "Biology"}), ("Rossi", "Bryant", ["English", "Geography"], {"History": 'Geography', "Chemistry": "Biology"}), ("Sidz", "Murraz", ["History", "Environmental Science"], {"English": 'Environmental Science', "Chemistry": "Mathematics"}), ("Robert", "Cox", ["Physics", "English"], {"Computer Science": 'Environmental Science', "Chemistry": "Geography"}) ] # defining schema with ArrayType and MapType() # using StructType() and StructField() array_schm = StructType([ StructField('Firstname', StringType(), True), StructField('Lastname', StringType(), True), StructField('Subject', ArrayType(StringType()), True), StructField('Subject Combinations', MapType( StringType(), StringType()), True) ]) # calling function for creating the dataframe df = create_df(spark, input_data, array_schm) # printing schema of df and showing dataframe df.printSchema() df.show(truncate=False)
Producción:
Publicación traducida automáticamente
Artículo escrito por srishivansh5404 y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA