En este artículo, convertiremos una lista de filas de PySpark en un marco de datos de Pandas. Un objeto de fila se define como una sola fila en un PySpark DataFrame. Por lo tanto, un marco de datos se puede representar fácilmente como una lista de objetos de fila de Python.
Método 1: use el método createDataFrame() y use el método toPandas()
Esta es la sintaxis del método createDataFrame() :
Sintaxis : sesión_actual.createDataFrame(datos, esquema=Ninguno, relación de muestreo=Ninguno, verificarEsquema=Verdadero)
Parámetros :
- datos : un conjunto de datos distribuido resistente o datos en forma de tipos de datos MySQL/SQL
- esquema : string o lista de nombres de columnas para el DataFrame.
- sampleRatio -> float: una proporción de muestra de las filas
- verificarSchema -> bool: verifica si los tipos de datos de las filas son los especificados en el esquema
Devuelve : objeto PySpark DataFrame.
Ejemplo :
En este ejemplo, pasaremos la lista de filas como datos y crearemos un PySpark DataFrame. Luego usaremos el método toPandas() para obtener un Pandas DataFrame.
Python
# Importing PySpark and importantly # Row from pyspark.sql import pyspark from pyspark.sql import SparkSession from pyspark.sql import Row # PySpark Session row_pandas_session = SparkSession.builder.appName( 'row_pandas_session' ).getOrCreate() # List of Sample Row objects row_object_list = [Row(Topic='Dynamic Programming', Difficulty=10), Row(Topic='Arrays', Difficulty=5), Row(Topic='Sorting', Difficulty=6), Row(Topic='Binary Search', Difficulty=7)] # creating PySpark DataFrame using createDataFrame() df = row_pandas_session.createDataFrame(row_object_list) # Printing the Spark DataFrame df.show() # Conversion to Pandas DataFrame pandas_df = df.toPandas() # Final Result print(pandas_df)
Salida :
Método 2: Usar paralelizar()
Vamos a usar paralelizar() para crear un RDD. Paralelizar significa copiar los elementos presentes en una colección predefinida a un conjunto de datos distribuido en el que podemos operar en paralelo. Aquí está la sintaxis de parallelize() :
Sintaxis : sc.parallelize(data,numSlices)
sc : objeto de contexto de chispa
Parámetros :
- datos : datos para los que se va a realizar el RDD.
- numSlices : número de particiones que es necesario realizar. Este es un parámetro opcional.
Ejemplo :
En este ejemplo, usaremos createDataFrame() para crear un PySpark DataFrame y luego usaremos toPandas() para obtener un Pandas DataFrame.
Python
# Importing PySpark and importantly # Row from pyspark.sql import pyspark from pyspark.sql import SparkSession from pyspark.sql import Row # PySpark Session row_pandas_session = SparkSession.builder.appName( 'row_pandas_session' ).getOrCreate() # List of Sample Row objects row_object_list = [Row(Topic='Dynamic Programming', Difficulty=10), Row(Topic='Arrays', Difficulty=5), Row(Topic='Sorting', Difficulty=6), Row(Topic='Binary Search', Difficulty=7)] # Creating an RDD rdd = row_pandas_session.sparkContext.parallelize(row_object_list) # DataFrame created using RDD df = row_pandas_session.createDataFrame(rdd) # Checking the DataFrame df.show() # Conversion of DataFrame df2 = df.toPandas() # Final DataFrame needed print(df2)
Salida :
Método 3: iteración a través de la lista de filas
En este método, recorreremos la lista de filas y convertiremos cada objeto de fila en un marco de datos usando createDataFrame(). Luego agregaremos() este DataFrame a un DataFrame final acumulativo que será nuestra respuesta final. Los detalles de append() se dan a continuación:
Sintaxis : df.append(otro, ignore_index=Falso, verificar_integridad=Falso, ordenar=Ninguno)
df : marco de datos de pandas
Parámetros :
- otros : Pandas DataFrame, Numpy Array, Numpy Series, etc.
- ignore_index : Comprueba si las etiquetas de índice se van a utilizar o no.
- verificar_integridad : si es verdadero, genera ValueError al crear un índice con duplicados.
- sort : ordena las columnas si las columnas de df y otras no están alineadas.
Devoluciones : un nuevo DataFrame adjunto
Ejemplo :
En este ejemplo, usaremos createDataFrame() para crear un PySpark DataFrame y luego usaremos append() para obtener un Pandas DataFrame.
Python
# Importing PySpark # Importing Pandas for append() import pyspark import pandas from pyspark.sql import SparkSession from pyspark.sql import Row # PySpark Session row_pandas_session = SparkSession.builder.appName( 'row_pandas_session' ).getOrCreate() # List of Sample Row objects row_object_list = [Row(Topic='Dynamic Programming', Difficulty=10), Row(Topic='Arrays', Difficulty=5), Row(Topic='Sorting', Difficulty=6), Row(Topic='Binary Search', Difficulty=7)] # Our final DataFrame initialized mega_df = pandas.DataFrame() # Traversing through the list for i in range(len(row_object_list)): # Creating a Spark DataFrame of a single row small_df = row_pandas_session.createDataFrame([row_object_list[i]]) # appending the Pandas version of small_df # to mega_df mega_df = mega_df.append(small_df.toPandas(), ignore_index=True) # Printing our desired DataFrame print(mega_df)
Producción :
Publicación traducida automáticamente
Artículo escrito por pranavhfs1 y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA