PySpark: combine dos marcos de datos con diferentes columnas o esquemas

En este artículo, discutiremos cómo fusionar dos marcos de datos con diferentes cantidades de columnas o esquemas en PySpark en Python.

Consideremos el primer marco de datos:

Aquí tenemos 3 columnas denominadas id, nombre y dirección para una mejor demostración.

Python3

# importing module
import pyspark
  
# import when and lit function
from pyspark.sql.functions import when, lit
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
# list  of employee data
data = [["1", "sravan", "kakumanu"],
        ["2", "ojaswi", "hyd"],
        ["3", "rohith", "delhi"],
        ["4", "sridevi", "kakumanu"],
        ["5", "bobby", "guntur"]]
  
# specify column names
columns = ['ID', 'NAME', 'Address']
  
# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data, columns)
  
# display
dataframe1.show()

Producción:

Consideremos el segundo marco de datos

Aquí vamos a crear un marco de datos con 2 columnas.

Python3

# importing module
import pyspark
  
# import when and lit function
from pyspark.sql.functions import when, lit
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
# list  of employee data
data = [["1", 23],
        ["2", 21],
        ["3", 32],
        ]
  
# specify column names
columns = ['ID', 'Age']
  
# creating a dataframe from the lists of data
dataframe2 = spark.createDataFrame(data, columns)
  
# display
dataframe2.show()

Producción:

No podemos fusionar los marcos de datos porque las columnas son diferentes, por lo que debemos agregar las columnas que faltan. Aquí, en el primer marco de datos (marco de datos1), las columnas [‘ID’, ‘NOMBRE’, ‘Dirección’] y las columnas del segundo marco de datos (marco de datos2) son [‘ID’, ‘Edad’].

Ahora tenemos que agregar la columna Edad al primer marco de datos y NOMBRE y Dirección en el segundo marco de datos, podemos hacerlo usando la función lit(). Esta función está disponible en pyspark.sql.functions que se utilizan para agregar una columna con un valor. Aquí vamos a agregar un valor con Ninguno.

Sintaxis :

para columna en [columna para columna en dataframe1.columns si la columna no está en dataframe2.columns]:

   dataframe2 = dataframe2.withColumn(columna, iluminado(Ninguno))

dónde,

  • dataframe1 es el primer marco de datos
  • dataframe2 es el segundo marco de datos

Agregue columnas faltantes a ambos marcos de datos

En ambos marcos de datos vamos a agregar la columna Edad al primer marco de datos y NOMBRE y Dirección en el segundo marco de datos utilizando la sintaxis anterior.

Finalmente, estamos mostrando los nombres de las columnas de ambos marcos de datos.

Python3

# importing module
import pyspark
  
# import lit function
from pyspark.sql.functions import lit
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
  
# list  of employee data
data = [["1", "sravan", "kakumanu"],
        ["2", "ojaswi", "hyd"],
        ["3", "rohith", "delhi"],
        ["4", "sridevi", "kakumanu"],
        ["5", "bobby", "guntur"]]
  
# specify column names
columns = ['ID', 'NAME', 'Address']
  
# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data, columns)
  
# list  of employee data
data = [["1", 23],
        ["2", 21],
        ["3", 32],
        ]
  
# specify column names
columns = ['ID', 'Age']
  
# creating a dataframe from the lists of data
dataframe2 = spark.createDataFrame(data, columns)
  
# add columns in dataframe1 that are missing 
# from dataframe2
for column in [column for column in dataframe2.columns
               if column not in dataframe1.columns]:
    dataframe1 = dataframe1.withColumn(column, lit(None))
  
# add columns in dataframe2 that are missing 
# from dataframe1
for column in [column for column in dataframe1.columns
               if column not in dataframe2.columns]:
    dataframe2 = dataframe2.withColumn(column, lit(None))
  
# now see the columns of dataframe1
print(dataframe1.columns)
  
# now see the columns of dataframe2
print(dataframe2.columns)

Producción:

['ID', 'NAME', 'Address', 'Age']
['ID', 'Age', 'NAME', 'Address']

Fusión de tramas de datos

Método 1: Usando union()

Esto fusionará los marcos de datos en función de la posición.

Sintaxis: 

dataframe1.union(dataframe2)

Ejemplo:

En este ejemplo, vamos a fusionar los dos marcos de datos usando el método union() después de agregar las columnas requeridas a ambos marcos de datos. Finalmente, estamos mostrando el marco de datos que se fusionó.

Python3

# importing module
import pyspark
  
# import lit function
from pyspark.sql.functions import lit
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
  
# list  of employee data
data = [["1", "sravan", "kakumanu"],
        ["2", "ojaswi", "hyd"],
        ["3", "rohith", "delhi"],
        ["4", "sridevi", "kakumanu"],
        ["5", "bobby", "guntur"]]
  
# specify column names
columns = ['ID', 'NAME', 'Address']
  
# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data, columns)
  
# list  of employee data
data = [["1", 23],
        ["2", 21],
        ["3", 32],
        ]
  
# specify column names
columns = ['ID', 'Age']
  
# creating a dataframe from the lists of data
dataframe2 = spark.createDataFrame(data, columns)
  
# add columns in dataframe1 that are missing 
# from dataframe2
for column in [column for column in dataframe2.columns
               if column not in dataframe1.columns]:
    dataframe1 = dataframe1.withColumn(column, lit(None))
  
# add columns in dataframe2 that are missing 
# from dataframe1
for column in [column for column in dataframe1.columns
               if column not in dataframe2.columns]:
    dataframe2 = dataframe2.withColumn(column, lit(None))
  
# perform union
dataframe1.union(dataframe2).show()

Producción:

Método 2: Usar unionByName()

Esto fusionará los dos marcos de datos según el nombre de la columna.

Sintaxis:

dataframe1.unionByName(dataframe2)

Ejemplo:

En este ejemplo, vamos a fusionar los dos marcos de datos usando el método unionByName() después de agregar las columnas requeridas a ambos marcos de datos. Finalmente, estamos mostrando el marco de datos que se fusionó.

Python3

# importing module
import pyspark
  
# import lit function
from pyspark.sql.functions import lit
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
  
# list  of employee data
data = [["1", "sravan", "kakumanu"],
        ["2", "ojaswi", "hyd"],
        ["3", "rohith", "delhi"],
        ["4", "sridevi", "kakumanu"],
        ["5", "bobby", "guntur"]]
  
# specify column names
columns = ['ID', 'NAME', 'Address']
  
# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data, columns)
  
# list  of employee data
data = [["1", 23],
        ["2", 21],
        ["3", 32],
        ]
  
# specify column names
columns = ['ID', 'Age']
  
# creating a dataframe from the lists of data
dataframe2 = spark.createDataFrame(data, columns)
  
# add columns in dataframe1 that are missing 
# from dataframe2
for column in [column for column in dataframe2.columns
               if column not in dataframe1.columns]:
    dataframe1 = dataframe1.withColumn(column, lit(None))
  
# add columns in dataframe2 that are missing 
# from dataframe1
for column in [column for column in dataframe1.columns
               if column not in dataframe2.columns]:
    dataframe2 = dataframe2.withColumn(column, lit(None))
  
# perform unionByName
dataframe1.unionByName(dataframe2).show()

Producción:

Método 3: Usar unionAll()

Sintaxis

dataframe1.unionAll(dataframe2)

Ejemplo:

En este ejemplo, vamos a fusionar los dos marcos de datos usando el método unionAll() después de agregar las columnas requeridas a ambos marcos de datos. Finalmente, estamos mostrando el marco de datos que se fusionó.

Python3

# importing module
import pyspark
  
# import lit function
from pyspark.sql.functions import lit
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
  
# list  of employee data
data = [["1", "sravan", "kakumanu"],
        ["2", "ojaswi", "hyd"],
        ["3", "rohith", "delhi"],
        ["4", "sridevi", "kakumanu"],
        ["5", "bobby", "guntur"]]
  
# specify column names
columns = ['ID', 'NAME', 'Address']
  
# creating a dataframe from the lists of data
dataframe1 = spark.createDataFrame(data, columns)
  
# list  of employee data
data = [["1", 23],
        ["2", 21],
        ["3", 32],
        ]
  
# specify column names
columns = ['ID', 'Age']
  
# creating a dataframe from the lists of data
dataframe2 = spark.createDataFrame(data, columns)
  
# add columns in dataframe1 that are missing
# from dataframe2
for column in [column for column in dataframe2.columns
               if column not in dataframe1.columns]:
    dataframe1 = dataframe1.withColumn(column, lit(None))
  
# add columns in dataframe2 that are missing
# from dataframe1
for column in [column for column in dataframe1.columns
               if column not in dataframe2.columns]:
    dataframe2 = dataframe2.withColumn(column, lit(None))
  
# perform unionAll
dataframe1.unionAll(dataframe2).show()

Producción:

Publicación traducida automáticamente

Artículo escrito por sravankumar8128 y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *