Convierta la lista de filas de PySpark en Pandas DataFrame

En este artículo, convertiremos una lista de filas de PySpark en un marco de datos de Pandas. Un objeto de fila se define como una sola fila en un PySpark DataFrame. Por lo tanto, un marco de datos se puede representar fácilmente como una lista de objetos de fila de Python.

Método 1: use el método createDataFrame() y use el método toPandas()

Esta es la sintaxis del método createDataFrame() : 

Sintaxis : sesión_actual.createDataFrame(datos, esquema=Ninguno, relación de muestreo=Ninguno, verificarEsquema=Verdadero)

Parámetros

  • datos : un conjunto de datos distribuido resistente o datos en forma de tipos de datos MySQL/SQL
  • esquema : string o lista de nombres de columnas para el DataFrame.
  • sampleRatio -> float: una proporción de muestra de las filas
  • verificarSchema -> bool: verifica si los tipos de datos de las filas son los especificados en el esquema

Devuelve : objeto PySpark DataFrame.

Ejemplo :

 En este ejemplo, pasaremos la lista de filas como datos y crearemos un PySpark DataFrame. Luego usaremos el método toPandas() para obtener un Pandas DataFrame.

Python

# Importing PySpark and importantly
# Row from pyspark.sql
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
 
# PySpark Session
row_pandas_session = SparkSession.builder.appName(
    'row_pandas_session'
).getOrCreate()
 
# List of Sample Row objects
row_object_list = [Row(Topic='Dynamic Programming', Difficulty=10),
                   Row(Topic='Arrays', Difficulty=5),
                   Row(Topic='Sorting', Difficulty=6),
                   Row(Topic='Binary Search', Difficulty=7)]
 
# creating PySpark DataFrame using createDataFrame()
df = row_pandas_session.createDataFrame(row_object_list)
 
# Printing the Spark DataFrame
df.show()
 
# Conversion to Pandas DataFrame
pandas_df = df.toPandas()
 
# Final Result
print(pandas_df)

Salida

Método 2: Usar paralelizar()

Vamos a usar paralelizar() para crear un RDD. Paralelizar significa copiar los elementos presentes en una colección predefinida a un conjunto de datos distribuido en el que podemos operar en paralelo. Aquí está la sintaxis de parallelize()

Sintaxis : sc.parallelize(data,numSlices)

sc : objeto de contexto de chispa

Parámetros

  • datos : datos para los que se va a realizar el RDD.
  • numSlices : número de particiones que es necesario realizar. Este es un parámetro opcional.

Ejemplo :

 En este ejemplo, usaremos createDataFrame() para crear un PySpark DataFrame y luego usaremos toPandas() para obtener un Pandas DataFrame.

Python

# Importing PySpark and importantly
# Row from pyspark.sql
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
 
# PySpark Session
row_pandas_session = SparkSession.builder.appName(
    'row_pandas_session'
).getOrCreate()
 
# List of Sample Row objects
row_object_list = [Row(Topic='Dynamic Programming', Difficulty=10),
                   Row(Topic='Arrays', Difficulty=5),
                   Row(Topic='Sorting', Difficulty=6),
                   Row(Topic='Binary Search', Difficulty=7)]
 
# Creating an RDD
rdd = row_pandas_session.sparkContext.parallelize(row_object_list)
 
# DataFrame created using RDD
df = row_pandas_session.createDataFrame(rdd)
 
# Checking the DataFrame
df.show()
 
# Conversion of DataFrame
df2 = df.toPandas()
 
# Final DataFrame needed
print(df2)

Salida

Método 3: iteración a través de la lista de filas

En este método, recorreremos la lista de filas y convertiremos cada objeto de fila en un marco de datos usando createDataFrame(). Luego agregaremos() este DataFrame a un DataFrame final acumulativo que será nuestra respuesta final. Los detalles de append() se dan a continuación: 

Sintaxis : df.append(otro, ignore_index=Falso, verificar_integridad=Falso, ordenar=Ninguno)

df : marco de datos de pandas

Parámetros :

  • otros : Pandas DataFrame, Numpy Array, Numpy Series, etc.
  • ignore_index : Comprueba si las etiquetas de índice se van a utilizar o no.
  • verificar_integridad : si es verdadero, genera ValueError al crear un índice con duplicados.
  • sort : ordena las columnas si las columnas de df y otras no están alineadas.

Devoluciones : un nuevo DataFrame adjunto

Ejemplo :

 En este ejemplo, usaremos createDataFrame() para crear un PySpark DataFrame y luego usaremos append() para obtener un Pandas DataFrame.

Python

# Importing PySpark
# Importing Pandas for append()
import pyspark
import pandas
from pyspark.sql import SparkSession
from pyspark.sql import Row
 
# PySpark Session
row_pandas_session = SparkSession.builder.appName(
    'row_pandas_session'
).getOrCreate()
 
# List of Sample Row objects
row_object_list = [Row(Topic='Dynamic Programming', Difficulty=10),
                   Row(Topic='Arrays', Difficulty=5),
                   Row(Topic='Sorting', Difficulty=6),
                   Row(Topic='Binary Search', Difficulty=7)]
 
# Our final DataFrame initialized
mega_df = pandas.DataFrame()
 
# Traversing through the list
for i in range(len(row_object_list)):
   
    # Creating a Spark DataFrame of a single row
    small_df = row_pandas_session.createDataFrame([row_object_list[i]])
 
    # appending the Pandas version of small_df
    # to mega_df
    mega_df = mega_df.append(small_df.toPandas(),
                             ignore_index=True)
 
# Printing our desired DataFrame
print(mega_df)

Producción : 

Publicación traducida automáticamente

Artículo escrito por pranavhfs1 y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *