PySpark: divide el marco de datos en el mismo número de filas

Cuando hay un gran conjunto de datos, es mejor dividirlos en partes iguales y luego procesar cada marco de datos individualmente. Esto es posible si la operación en el marco de datos es independiente de las filas. Cada fragmento o marco de datos igualmente dividido puede procesarse en paralelo haciendo un uso más eficiente de los recursos. En este artículo, discutiremos cómo dividir marcos de datos de PySpark en un número igual de filas.

Creando Dataframe para demostración:

Python

# importing module
import pyspark
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
  
# Column names for the dataframe
columns = ["Brand", "Product"]
  
# Row data for the dataframe
data = [
    ("HP", "Laptop"),
    ("Lenovo", "Mouse"),
    ("Dell", "Keyboard"),
    ("Samsung", "Monitor"),
    ("MSI", "Graphics Card"),
    ("Asus", "Motherboard"),
    ("Gigabyte", "Motherboard"),
    ("Zebronics", "Cabinet"),
    ("Adata", "RAM"),
    ("Transcend", "SSD"),
    ("Kingston", "HDD"),
    ("Toshiba", "DVD Writer")
]
  
# Create the dataframe using the above values
prod_df = spark.createDataFrame(data=data,
                                schema=columns)
  
# View the dataframe
prod_df.show()

Producción:

En el bloque de código anterior, hemos definido la estructura del esquema para el marco de datos y proporcionamos datos de muestra. Nuestro marco de datos consta de 2 columnas de tipo string con 12 registros. 

Ejemplo 1: marco de datos dividido usando ‘DataFrame.limit()’

Haremos uso del método split() para crear ‘n’ marcos de datos iguales.

Sintaxis: DataFrame.limit(num)

Donde, limita el recuento de resultados al número especificado.

Código:

Python

# Define the number of splits you want
n_splits = 4
  
# Calculate count of each dataframe rows
each_len = prod_df.count() // n_splits
  
# Create a copy of original dataframe
copy_df = prod_df
  
# Iterate for each dataframe
i = 0
while i < n_splits:
  
    # Get the top `each_len` number of rows
    temp_df = copy_df.limit(each_len)
  
    # Truncate the `copy_df` to remove
    # the contents fetched for `temp_df`
    copy_df = copy_df.subtract(temp_df)
  
    # View the dataframe
    temp_df.show(truncate=False)
  
    # Increment the split number
    i += 1

Producción:

Ejemplo 2: dividir el marco de datos, realizar la operación y concatenar el resultado

Ahora dividiremos el marco de datos en ‘n’ partes iguales y realizaremos una operación de concatenación en cada una de estas partes individualmente y luego concatenaremos el resultado a un `result_df`. Esto es para demostrar cómo podemos usar la extensión del código anterior para realizar una operación de marco de datos por separado en cada marco de datos y luego agregar estos marcos de datos individuales para producir un nuevo marco de datos que tenga una longitud igual al marco de datos original.

Python

# Define the number of splits you want
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import concat, col, lit
  
n_splits = 4
  
# Calculate count of each dataframe rows
each_len = prod_df.count() // n_splits
  
# Create a copy of original dataframe
copy_df = prod_df
  
# Function to modify columns of each individual split
  
  
def modify_dataframe(data):
    return data.select(
        concat(col("Brand"), lit(" - "),
               col("Product"))
    )
  
  
# Create an empty dataframe to
# store concatenated results
schema = StructType([
    StructField('Brand - Product', StringType(), True)
])
result_df = spark.createDataFrame(data=[],
                                  schema=schema)
  
# Iterate for each dataframe
i = 0
while i < n_splits:
  
    # Get the top `each_len` number of rows
    temp_df = copy_df.limit(each_len)
  
    # Truncate the `copy_df` to remove
    # the contents fetched for `temp_df`
    copy_df = copy_df.subtract(temp_df)
  
    # Perform operation on the newly created dataframe
    temp_df_mod = modify_dataframe(data=temp_df)
    temp_df_mod.show(truncate=False)
  
    # Concat the dataframe
    result_df = result_df.union(temp_df_mod)
  
    # Increment the split number
    i += 1
  
result_df.show(truncate=False)

Producción:

Publicación traducida automáticamente

Artículo escrito por apathak092 y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *