En este artículo, aprenderemos cómo fusionar varios marcos de datos por filas en PySpark. Fuera de las uniones de enstringmiento, esta es la única forma de hacerlo para DataFrames. El módulo utilizado es pyspark:
Spark (motor de procesamiento de Big-Data de código abierto de Apache) es un sistema informático de clúster. Es más rápido en comparación con otros sistemas informáticos de clúster (como Hadoop). Proporciona API de alto nivel en Python, Scala y Java. Los trabajos paralelos son fáciles de escribir en Spark. Cubriremos PySpark (Python + Apache Spark) porque esto hará que la curva de aprendizaje sea más plana. Para instalar Spark en un sistema Linux, siga este . Para ejecutar Spark en un sistema de varios clústeres, siga este .
Para hacer nuestra tarea, estamos definiendo una función llamada recursivamente para todos los marcos de datos de entrada y uniéndolos uno por uno. Para la unión, usamos el módulo pyspark:
- Dataframe union() : el método union() del DataFrame se emplea para mezclar dos DataFrame de una estructura/esquema equivalente. Si los esquemas no son equivalentes, devuelve un error.
- DataFrame unionAll() : unionAll() está en desuso desde la versión Spark «2.0.0» y se reemplazó con union().
Nota: En otros SQL, Union elimina los duplicados pero UnionAll combina dos conjuntos de datos que incluyen registros duplicados. Pero, en chispa, ambos se comportan de manera equivalente y usan la función duplicada de DataFrame para deshacerse de las filas duplicadas.
En la última llamada, devuelve el marco de datos resultante requerido. El siguiente código representa la lógica detrás de nuestra solución al problema dado.
Python3
from functools import reduce # For Python 3.x from pyspark.sql import DataFrame def unionAll(*dfs): return reduce(DataFrame.unionAll, dfs) unionAll(td2, td3, td4, td5, td6, td7, td8, td9, td10)
Lo que sucede es que toma todos los objetos que entregó como parámetros y los reduce al uso de unionAll (este límite es de Python, ya no es el mínimo de Spark aunque funcionan de manera similar) que tarde o temprano lo reduce a un DataFrame.
Si en lugar de DataFrames son RDD ordinarios, puede omitir una lista de ellos en la función de unión de su SparkContext
Ejemplos:
A veces, cuando los marcos de datos para combinar no tienen el mismo orden de columnas, es mejor df2.select (df1.columns) para garantizar que ambos df tengan el mismo orden de columnas antes de la unión.
Python3
import functools def unionAll(dfs): return functools.reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs)
La función reduce(fun,seq) se usa para aplicar una función particular pasada en su argumento a todos los elementos de la lista mencionados en la secuencia pasada. Esta función se define en el módulo functools .
Ahora, comprendamos todo el proceso con la ayuda de algunos ejemplos.
Ejemplo 1:
En este ejemplo, creamos marcos de datos con las columnas ‘a’ y ‘b’ de algunos valores aleatorios y pasamos estos tres marcos de datos a nuestro método unionAll() creado anteriormente y obtenemos el marco de datos resultante como salida y mostramos el resultado.
Python3
# import modules from pyspark.sql import SparkSession import functools # explicit function def unionAll(dfs): return functools.reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs) spark = SparkSession.builder.getOrCreate() df1 = spark.createDataFrame([[1, 1], [2, 2]], ['a', 'b']) # different column order. df2 = spark.createDataFrame([[3, 333], [4, 444]], ['b', 'a']) df3 = spark.createDataFrame([[555, 5], [666, 6]], ['b', 'a']) unioned_df = unionAll([df1, df2, df3]) unioned_df.show()
Producción:
Ejemplo 2:
En este ejemplo, creamos marcos de datos con columnas ‘a’ y ‘b’ de algunos valores aleatorios y pasamos estos tres marcos de datos a nuestro método unionAll() recién creado en el que no nos enfocamos en los nombres de las columnas. Simplemente estamos uniendo el marco de datos de entrada al siguiente marco de datos y obtenemos el marco de datos resultante como salida y mostramos el resultado.
Python3
# import modules from functools import reduce from pyspark.sql import DataFrame from pyspark.sql import SparkSession # explicit functions def unionAll(*dfs): return reduce(DataFrame.unionAll, dfs) spark = SparkSession.builder.getOrCreate() df1 = spark.createDataFrame([[1, 1], [2, 2]], ['a', 'b']) # different column order. df2 = spark.createDataFrame([[3, 333], [4, 444]], ['b', 'a']) df3 = spark.createDataFrame([[555, 5], [666, 6]], ['b', 'a']) unionAll(*[df1, df2, df3]).show()
Producción:
Importante:
- Observe cómo se mezclan aquí los valores de las columnas a, b; eso se debe a que, al realizar una unión, el orden de las columnas no coincide.
- dado que ambas columnas son de tipo string, no obtenemos ningún error.
- Para realizar una orden de unión válida, debe coincidir en todos los DataFrames.
otra opción es usar UnionByName
Publicación traducida automáticamente
Artículo escrito por shuvamkumar2015 y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA