¿Cómo escribir Spark UDF (funciones definidas por el usuario) en Python?

En este artículo, hablaremos sobre UDF (funciones definidas por el usuario) y cómo escribirlas en Python Spark. UDF, básicamente significa Funciones definidas por el usuario. La UDF nos permitirá aplicar las funciones directamente en los dataframes y bases de datos SQL en python, sin que se registren individualmente. También puede ayudarnos a crear nuevas columnas en nuestro marco de datos, aplicando una función a través de UDF a la (s) columna (s) del marco de datos, por lo tanto, ampliará nuestra funcionalidad de marco de datos. Se puede crear utilizando el método udf().

udf(): este método utilizará la función lambda para recorrer los datos, y su argumento aceptará la función lambda, y el valor lambda se convertirá en un argumento para la función que queremos hacer como un UDF.

Marco de datos Pyspark de muestra

Vamos a crear un marco de datos, y el tema de este marco de datos será el nombre del estudiante, junto con sus puntajes brutos en una prueba de 100.

Python3

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType,StringType
from pyspark.sql.functions import udf
  
spark = SparkSession.builder.appName('UDF PRACTICE').getOrCreate()
  
cms = ["Name","RawScore"]
data =  [("Jack", "79"),
        ("Mira", "80"),
        ("Carter", "90")]
                     
df = spark.createDataFrame(data=data,schema=cms)
  
df.show()

Producción:

Creación de una función de muestra

Ahora, tenemos que hacer una función. Entonces, para entender, haremos una función simple que dividirá las columnas y verificará que si el objeto que atraviesa en esa columna (es igual a ‘J’ (Capital J) o ‘C’ (Capital C) o ‘M ‘(M mayúscula), por lo que estará convirtiendo la segunda letra de esa palabra, con su versión mayúscula.La implementación de este código es:

Python3

def Converter(str):
    result = ""
    a = str.split(" ")
      
    for q in a:
        if q == 'J' or 'C' or 'M':
            result += q[1:2].upper()
              
    return result

Hacer UDF desde la función de muestra

Ahora, lo convertiremos a nuestra función UDF, que, a su vez, reducirá nuestra carga de trabajo en los datos. Para esto, estamos usando lambda dentro de UDF.

Python3

NumberUDF = udf(lambda m: Converter(m))

Usando UDF sobre Dataframe

Lo siguiente que usaremos aquí es withcolumn(), recuerde que withcolumn() devolverá un marco de datos completo. Por lo tanto, solo usaremos nuestro marco de datos df existente, y el valor devuelto se almacenará solo en df (básicamente lo agregaremos).

Python3

df.withColumn("Special Names", NumberUDF("Name")).show()

Producción:

Nota : También podemos hacer todo esto en un solo paso.

UDF con anotaciones

Ahora, una forma corta e inteligente de hacer esto es usar «ANOTACIONES» (o decoradores) . Esto creará nuestra función UDF en menos pasos. Para esto, todo lo que tenemos que hacer es usar el signo @ ( decorador) delante de la función udf y dar el tipo de retorno de la función en su parte de argumento, es decir, asignar el tipo de retorno como Intergertype(), StringType(), etc.

Python3

@udf(returnType=StringType())
def Converter(str):
    result = ""
    a = str.split(" ")
      
    for q in a:
        if q == 'J' or 'C' or 'M':
            result += q[1:2].upper()
        else:
            result += q
    return result
  
  
df.withColumn("Special Names", Converter("Name")) \
    .show()

Producción:

Ejemplo:

Ahora, supongamos que hay un esquema de calificación en la escuela que calibra las calificaciones de los estudiantes en términos de su raíz cuadrada agregada 3 (es decir, estarán calibrando las calificaciones de 15). Entonces, definiremos una función UDF y esta vez especificaremos el tipo de devolución. es decir, tipo de datos flotante. Entonces, la declaración de esta función será:

Python3

def SQRT(x):
    return float(math.sqrt(x)+3)

Ahora, definiremos una udf, cuyo tipo de retorno siempre será flotante, es decir, estamos obligando a la función, así como a la UDF, a darnos resultados en términos de números de coma flotante únicamente. La definición de esta función será:

Python3

UDF_marks = udf(lambda m: SQRT(m),FloatType())

El segundo parámetro de udf, FloatType() siempre obligará a la función UDF a devolver el resultado solo en tipo flotante. Ahora, usaremos nuestra función udf, UDF_marks en la columna RawScore en nuestro marco de datos, y produciremos una nueva columna con el nombre de «<lambda>RawScore», y este será un nombre predeterminado de esta columna. El código para esto se verá así:

Python3

df.select("Name","RawScore", UDF_marks("RawScore")).show()

Producción:

Publicación traducida automáticamente

Artículo escrito por gs2000april y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *