Definición del esquema DataFrame con StructField y StructType

En este artículo, aprenderemos cómo definir DataFrame Schema con StructField y StructType. 

  • StructType y StructFields se utilizan para definir un esquema o su parte para el marco de datos. Esto define el nombre, el tipo de datos y la marca anulable para cada columna.
  • El objeto StructType es la colección de objetos StructFields. Es un tipo de datos incorporado que contiene la lista de StructField.

Sintaxis: 

  • pyspark.sql.types.StructType(campos=Ninguno)
  • pyspark.sql.types.StructField(nombre, tipo de datos, anulable=Verdadero)

Parámetro:

  • campos – Lista de StructField.
  • name – Nombre de la columna.
  • tipo de datos: tipo de datos, es decir, entero, string, flotante, etc.
  • anulable: si los campos son NULL/Ninguno o no.

Para definir el esquema, tenemos que usar el objeto StructType() en el que tenemos que definir o pasar el StructField() que contiene el nombre de la columna, el tipo de datos de la columna y el indicador anulable. Podemos escribir:-

schema = StructType([StructField(column_name1,datatype(),nullable_flag),
            StructField(column_name2,datatype(),nullable_flag),
            StructField(column_name3,datatype(),nullable_flag)
            ])

Ejemplo 1: Definición de DataFrame con esquema con StructType y StructField.

Python

# importing necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType, FloatType
 
# function to create SparkSession
def create_session():
    spk = SparkSession.builder \
        .master("local") \
        .appName("Product_mart.com") \
        .getOrCreate()
    return spk
 
# function to create dataframe
def create_df(spark, data, schema):
    df1 = spark.createDataFrame(data, schema)
    return df1
 
 
if __name__ == "__main__":
 
    # calling function to create SparkSession
    spark = create_session()
 
    input_data = [("Refrigerator", 112345, 4.0, 12499),
                  ("LED TV", 114567, 4.2, 49999),
                  ("Washing Machine", 113465, 3.9, 69999),
                  ("T-shirt", 124378, 4.1, 1999),
                  ("Jeans", 126754, 3.7, 3999),
                  ("Running Shoes", 134565, 4.7, 1499),
                  ("Face Mask", 145234, 4.6, 999)]
 
    # defining schema for the dataframe with
    # StructType and StructField
    schm = StructType([
        StructField("Product Name", StringType(), True),
        StructField("Product ID", LongType(), True),
        StructField("Rating", FloatType(), True),
        StructField("Product Price", IntegerType(), True),
    ])
 
    # calling function to create dataframe
    df = create_df(spark, input_data, schm)
 
    # visualizing dataframe and it's schema
    df.printSchema()
    df.show()

Producción:

En el código anterior, hicimos que la bandera anulable = True. El uso de hacerlo Verdadero es que si al crear Dataframe cualquier valor de campo es NULO/Ninguno, entonces también se creará Dataframe sin ningún valor. 

Ejemplo 2: definición del esquema de marco de datos con StructType anidado.

Python

# importing necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType, FloatType
 
# function to create SparkSession
def create_session():
    spk = SparkSession.builder \
        .master("local") \
        .appName("Product_mart.com") \
        .getOrCreate()
    return spk
 
# function to create dataframe
def create_df(spark, data, schema):
    df1 = spark.createDataFrame(data, schema)
    return df1
 
 
if __name__ == "__main__":
 
    # calling function to create SparkSession
    spark = create_session()
 
    input_data = [(("Refrigerator", 112345), 4.0, 12499),
                  (("LED TV", 114567), 4.2, 49999),
                  (("Washing Machine", 113465), 3.9, 69999),
                  (("T-shirt", 124378), 4.1, 1999),
                  (("Jeans", 126754), 3.7, 3999),
                  (("Running Shoes", 134565), 4.7, 1499),
                  (("Face Mask", 145234), 4.6, 999)]
 
    # defining schema for the dataframe using
    # nested StructType
    schm = StructType([
        StructField('Product', StructType([
            StructField('Product Name', StringType(), True),
            StructField('Product ID', LongType(), True),
        ])),
       
        StructField('Rating', FloatType(), True),
        StructField('Price', IntegerType(), True)])
 
    # calling function to create dataframe
    df = create_df(spark, input_data, schm)
 
    # visualizing dataframe and it's schema
    df.printSchema()
    df.show(truncate=False)

Producción:

Ejemplo 3: Cambiar la estructura del marco de datos y agregar una nueva columna usando la clase de columna PySpark.

Python

# importing necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import col, struct, when
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType, FloatType
 
# function to create SparkSession
def create_session():
    spk = SparkSession.builder \
        .master("local") \
        .appName("Product_mart.com") \
        .getOrCreate()
    return spk
 
# function to create dataframe
def create_df(spark, data, schema):
    df1 = spark.createDataFrame(data, schema)
    return df1
 
 
if __name__ == "__main__":
 
    # calling function to create SparkSession
    spark = create_session()
 
    input_data = [("Refrigerator", 112345, 4.0, 12499),
                  ("LED TV", 114567, 4.2, 49999),
                  ("Washing Machine", 113465, 3.9, 69999),
                  ("T-shirt", 124378, 4.1, 1999),
                  ("Jeans", 126754, 3.7, 3999),
                  ("Running Shoes", 134565, 4.7, 1499),
                  ("Face Mask", 145234, 4.6, 999)]
 
    # defining schema for the dataframe using
    # nested StructType
    schm = StructType([
        StructField("Product Name", StringType(), True),
        StructField("Product ID", LongType(), True),
        StructField("Rating", FloatType(), True),
        StructField("Product Price", IntegerType(), True)])
 
    # calling function to create dataframe
    df = create_df(spark, input_data, schm)
 
    # copying the columns to the new struct
    # Product
    new_df = df.withColumn("Product",
                           struct(col("Product Name").alias("Name"),
                                  col("Product ID").alias("ID"),
                                  col("Rating").alias("Rating"),
                                  col("Product Price").alias("Price")))
 
    # adding new column according to the given
    # condition
    new_df = new_df.withColumn("Product Range",
                               when(col("Product Price").cast(
                                   IntegerType()) < 1000, "Low")
                               .when(col("Product Price").cast(IntegerType()
                                                              ) < 7000, "Medium")
                               .otherwise("High"))
 
    # dropping the columns as all column values
    # are copied in Product column
    new_df = new_df.drop("Product Name", "Product ID",
                         "Rating", "Product Price")
 
    # visualizing dataframe and it's schema
    new_df.printSchema()
    new_df.show(truncate=False)

Producción:

  • En el ejemplo anterior, estamos cambiando la estructura del Dataframe usando la función struct() y copiando la columna en la nueva estructura ‘Producto’ y creando la columna Producto usando la función withColumn().
  • Después de copiar el ‘Nombre del producto’, ‘ID del producto’, ‘Clasificación’, ‘Precio del producto’ a la nueva estructura ‘Producto’.
  • Estamos agregando la nueva columna ‘Rango de precios’ usando la función withColumn(), de acuerdo con la condición dada que se divide en tres categorías, es decir, Bajo, Medio y Alto. Si el ‘Precio del producto’ es inferior a 1000, ese producto cae en la categoría Baja y si el ‘Precio del producto’ es inferior a 7000, ese producto cae en la categoría Media; de lo contrario, ese producto cae en la categoría Alta.
  • Después de crear la nueva estructura ‘Producto’ y agregar la nueva columna ‘Rango de precios’, tenemos que eliminar la columna ‘ Nombre del producto’, ‘ID del producto’, ‘Calificación’, ‘Precio del producto’ usando la función drop(). Luego, imprima el esquema con la estructura del marco de datos modificada y las columnas agregadas.

Ejemplo 4: definición del esquema de marco de datos utilizando el formato JSON y StructType().

Python

# importing necessary libraries
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import json
 
# function to create SparkSession
def create_session():
    spk = SparkSession.builder \
        .master("local") \
        .appName("Product_mart.com") \
        .getOrCreate()
    return spk
 
# function to create dataframe
def create_df(spark, data, schema):
    df1 = spark.createDataFrame(data, schema)
    return df1
 
 
if __name__ == "__main__":
 
    # calling function to create SparkSession
    spark = create_session()
 
    input_data = [("Refrigerator", 4.0),
                  ("LED TV", 4.2),
                  ("Washing Machine", 3.9),
                  ("T-shirt", 4.1)
                  ]
 
    # defining schema for the dataframe with
    # StructType and StructField
    schm = T.StructType([
        T.StructField("Product Name", T.StringType(), True),
        T.StructField("Rating", T.FloatType(), True)
    ])
 
    # calling function to create dataframe
    df = create_df(spark, input_data, schm)
 
    # visualizing dataframe and it's schema
    print("Original Dataframe:-")
    df.printSchema()
    df.show()
 
    print("-------------------------------------------")
    print("Schema in json format:-")
 
    # storing schema in json format using
    # schema.json() function
    schma = df.schema.json()
    print(schma)
 
    # loading the json format schema
    schm1 = StructType.fromJson(json.loads(schma))
 
    # creating dataframe using json format schema
    json_df = spark.createDataFrame(
        spark.sparkContext.parallelize(input_data), schm1)
    print("-------------------------------------------")
    print("Dataframe using json schema:-")
     
    # showing the created dataframe from json format
    # schema printing the schema of created dataframe
    json_df.printSchema()
    json_df.show()

Producción:

Nota: También puede almacenar el formato JSON en el archivo y usar el archivo para definir el esquema, el código para esto también es el mismo que el anterior, solo tiene que pasar el archivo JSON en la función loads(), en el ejemplo anterior, el el esquema en formato JSON se almacena en una variable, y estamos usando esa variable para definir el esquema.

Ejemplo 5: definición del esquema de marco de datos usando StructType() con ArrayType() y MapType().

Python

# importing necessary libraries
from pyspark.sql import SparkSession
import pyspark.sql.types as T
 
# function to create SparkSession
def create_session():
    spk = SparkSession.builder \
        .master("local") \
        .appName("Product_mart.com") \
        .getOrCreate()
    return spk
 
# function to create dataframe
def create_df(spark, data, schema):
    df1 = spark.createDataFrame(data, schema)
    return df1
 
 
if __name__ == "__main__":
 
    # calling function to create SparkSession
    spark = create_session()
 
    # Data containing the Array and Map- key,value pair
    input_data = [
        ("Alex", 'Buttler', ["Mathematics", "Computer Science"],
         {"Mathematics": 'Physics', "Chemistry": "Biology"}),
        ("Sam", "Samson", ["Chemistry, Biology"],
         {"Chemistry": 'Physics', "Mathematics": "Biology"}),
        ("Rossi", "Bryant", ["English", "Geography"],
         {"History": 'Geography', "Chemistry": "Biology"}),
        ("Sidz", "Murraz", ["History", "Environmental Science"],
         {"English": 'Environmental Science', "Chemistry": "Mathematics"}),
        ("Robert", "Cox", ["Physics", "English"],
         {"Computer Science": 'Environmental Science', "Chemistry": "Geography"})
    ]
 
    # defining schema with ArrayType and MapType()
    # using StructType() and StructField()
    array_schm = StructType([
        StructField('Firstname', StringType(), True),
        StructField('Lastname', StringType(), True),
        StructField('Subject', ArrayType(StringType()), True),
        StructField('Subject Combinations', MapType(
            StringType(), StringType()), True)
    ])
 
    # calling function for creating the dataframe
    df = create_df(spark, input_data, array_schm)
 
    # printing schema of df and showing dataframe
    df.printSchema()
    df.show(truncate=False)

Producción:

Publicación traducida automáticamente

Artículo escrito por srishivansh5404 y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *