Agregar datos a un marco de datos vacío en PySpark

En este artículo, veremos cómo agregar datos a un DataFrame vacío en PySpark en el lenguaje de programación Python. 

Método 1: hacer un DataFrame vacío y hacer una unión con un DataFrame no vacío con el mismo esquema

La función union() es la más importante para esta operación. Se utiliza para mezclar dos DataFrames que tienen un esquema equivalente de las columnas. 

Sintaxis : FirstDataFrame.union (Segundo DataFrame)

Devuelve : DataFrame con filas de ambos DataFrames.

Ejemplo:

En este ejemplo, creamos un DataFrame con un esquema particular y los datos crean un DataFrame VACÍO con el mismo esquema y hacemos una unión de estos dos DataFrames usando la función union() en el lenguaje python.

Python

# Importing PySpark and the SparkSession
# DataType functionality
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
 
# Creating a spark session
spark_session = SparkSession.builder.appName(
    'Spark_Session').getOrCreate()
 
# Creating an empty RDD to make a
# DataFrame with no data
emp_RDD = spark_session.sparkContext.emptyRDD()
 
# Defining the schema of the DataFrame
columns1 = StructType([StructField('Name', StringType(), False),
                       StructField('Salary', IntegerType(), False)])
 
# Creating an empty DataFrame
first_df = spark_session.createDataFrame(data=emp_RDD,
                                         schema=columns1)
 
# Printing the DataFrame with no data
first_df.show()
 
# Hardcoded data for the second DataFrame
rows = [['Ajay', 56000], ['Srikanth', 89078],
        ['Reddy', 76890], ['Gursaidutt', 98023]]
columns = ['Name', 'Salary']
 
# Creating the DataFrame
second_df = spark_session.createDataFrame(rows, columns)
 
# Printing the non-empty DataFrame
second_df.show()
 
# Storing the union of first_df and
# second_df in first_df
first_df = first_df.union(second_df)
 
# Our first DataFrame that was empty,
# now has data
first_df.show()

Producción : 

+----+------+
|Name|Salary|
+----+------+
+----+------+

+----------+------+
|      Name|Salary|
+----------+------+
|      Ajay| 56000|
|  Srikanth| 89078|
|     Reddy| 76890|
|Gursaidutt| 98023|
+----------+------+

+----------+------+
|      Name|Salary|
+----------+------+
|      Ajay| 56000|
|  Srikanth| 89078|
|     Reddy| 76890|
|Gursaidutt| 98023|
+----------+------+

Método 2: agregue una fila singular a un DataFrame vacío convirtiendo la fila en un DataFrame

Podemos usar createDataFrame() para convertir una sola fila en forma de Lista de Python . Los detalles de createDataFrame() son: 

Sintaxis : CurrentSession.createDataFrame (datos, esquema = ninguno, relación de muestreo = ninguno, verificar esquema = verdadero)

Parámetros

datos :

  • esquema: str/list, opcional: contiene una string o una lista de nombres de columnas.
  • SamplingRatio : float, opcional: Una muestra de filas para inferencia
  • verificarSchema: bool, opcional: Verifica los tipos de datos de cada fila contra el esquema especificado. El valor es Verdadero por defecto.

Ejemplo: 

En este ejemplo, creamos un DataFrame con un esquema particular y una sola fila y creamos un DataFrame VACÍO con el mismo esquema usando createDataFrame(), hacemos una unión de estos dos DataFrames usando la función union() y almacenamos el resultado anterior en el vacío anterior DataFrame y use show() para ver los cambios.

Python

# Importing PySpark and the SparkSession,
# DataType functionality
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
 
# Creating a spark session
spark_session = SparkSession.builder.appName(
    'Spark_Session').getOrCreate()
 
# Creating an empty RDD to make a DataFrame
# with no data
emp_RDD = spark_session.sparkContext.emptyRDD()
 
# Defining the schema of the DataFrame
columns = StructType([StructField('Stadium', StringType(), False),
                      StructField('Capacity', IntegerType(), False)])
 
# Creating an empty DataFrame
df = spark_session.createDataFrame(data=emp_RDD,
                                   schema=columns)
 
# Printing the DataFrame with no data
df.show()
 
# Hardcoded row for the second DataFrame
added_row = [['Motera Stadium', 132000]]
 
# Creating the DataFrame
added_df = spark_session.createDataFrame(added_row, columns)
 
# Storing the union of first_df and second_df
# in first_df
df = df.union(added_df)
 
# Our first DataFrame that was empty,
# now has data
df.show()

Producción : 

+-------+--------+
|Stadium|Capacity|
+-------+--------+
+-------+--------+

+--------------+--------+
|       Stadium|Capacity|
+--------------+--------+
|Motera Stadium|  132000|
+--------------+--------+

Método 3: Convierta el DataFrame vacío en un DataFrame de Pandas y use la función append()

Usaremos toPandas() para convertir PySpark DataFrame a Pandas DataFrame. Su sintaxis es: 

Sintaxis : PySparkDataFrame.toPandas()

Devoluciones : DataFrame de Pandas correspondiente

Luego usaremos la función append() de Pandas . Su sintaxis es: 

Sintaxis : PandasDataFrame.append(otro, ignore_index=Falso, verificar_integridad=Falso, ordenar=Falso)

Parámetros

  • otro: Pandas DataFrame, Numpy Series, etc.: los datos que deben agregarse.
  • ignore_index: bool: si se indexa como ignorado, los índices del nuevo DataFrame no tendrán relación con los anteriores.
  • sort : bool: ordena las columnas si la alineación de las columnas en otro y PandasDataFrame es diferente.

Ejemplo:

Aquí creamos un DataFrame vacío donde se agregarán los datos, luego convertimos los datos que se agregarán en un Spark DataFrame usando createDataFrame() y luego convertimos ambos DataFrames en un Pandas DataFrame usando toPandas() y usamos la función append() para agregue el marco de datos no vacío al DataFrame vacío e ignore los índices ya que estamos obteniendo un nuevo DataFrame. Finalmente, convertimos nuestro Pandas DataFrame final en un Spark DataFrame usando createDataFrame().

Python

# Importing PySpark and the SparkSession,
# DataType functionality
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
 
# Creating a spark session
spark_session = SparkSession.builder.appName(
    'Spark_Session').getOrCreate()
 
# Creating an empty RDD to make a DataFrame
# with no data
emp_RDD = spark_session.sparkContext.emptyRDD()
 
# Defining the schema of the DataFrame
columns = StructType([StructField('Stadium', StringType(), False),
                      StructField('Capacity', IntegerType(), False)])
 
# Creating an empty DataFrame
df = spark_session.createDataFrame(data=emp_RDD,
                                   schema=columns)
 
# Printing the DataFrame with no data
df.show()
 
# Hardcoded row for the second DataFrame
added_row = [['Motera Stadium', 132000]]
 
# Creating the DataFrame whose data
# needs to be added
added_df = spark_session.createDataFrame(added_row,
                                         columns)
 
# converting our PySpark DataFrames to
# Pandas DataFrames
pandas_added = added_df.toPandas()
df = df.toPandas()
 
# using append() function to add the data
df = df.append(pandas_added, ignore_index=True)
 
# reconverting our DataFrame back
# to a PySpark DataFrame
df = spark_session.createDataFrame(df)
 
# Printing resultant DataFrame
df.show()

Producción : 

+-------+--------+
|Stadium|Capacity|
+-------+--------+
+-------+--------+

+--------------+--------+
|       Stadium|Capacity|
+--------------+--------+
|Motera Stadium|  132000|
+--------------+--------+

Publicación traducida automáticamente

Artículo escrito por pranavhfs1 y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *