Hadoop Streaming usando Python – Problema de conteo de palabras

Hadoop Streaming es una característica que viene con Hadoop y permite a los usuarios o desarrolladores usar varios lenguajes diferentes para escribir programas de MapReduce como Python, C++, Ruby, etc. Es compatible con todos los lenguajes que pueden leer desde la entrada estándar y escribir en la salida estándar. Implementaremos Python con Hadoop Streaming y observaremos cómo funciona. Implementaremos el problema de conteo de palabras en python para comprender Hadoop Streaming. Crearemos mapper.py y reducer.py para realizar tareas de mapeo y reducción.

Vamos a crear un archivo que contenga varias palabras que podamos contar. 

Paso 1: cree un archivo con el nombre word_count_data.txt  y agréguele algunos datos.

cd Documents/                                  # to change the directory to /Documents
touch word_count_data.txt               # touch is used to create an empty file    
nano word_count_data.txt               # nano is a command line editor to edit the file    
cat word_count_data.txt                        # cat is used to see the content of the file

Create a file with the name word_count_data.txt

Paso 2: Cree un archivo mapper.py que implemente la lógica del mapeador. Leerá los datos de STDIN y dividirá las líneas en palabras, y generará una salida de cada palabra con su cuenta individual. 

cd Documents/                                   # to change the directory to /Documents
touch mapper.py                    # touch is used to create an empty file    
cat mapper.py                    # cat is used to see the content of the file

Copie el siguiente código en el archivo mapper.py .

Python3

#!/usr/bin/env python
  
# import sys because we need to read and write data to STDIN and STDOUT
import sys
  
# reading entire line from STDIN (standard input)
for line in sys.stdin:
    # to remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
      
    # we are looping over the words array and printing the word
    # with the count of 1 to the STDOUT
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        print '%s\t%s' % (word, 1)

Aquí en el programa anterior #! se conoce como shebang y se usa para interpretar el guión. El archivo se ejecutará usando el comando que estamos especificando.

mapper.py

Probemos nuestro mapper.py localmente para ver si funciona bien o no.

Sintaxis:

cat <text_data_file> | python <mapper_code_python_file>

Comando (en mi caso)

cat word_count_data.txt | python mapper.py

La salida del mapeador se muestra a continuación.

Paso 3: Cree un archivo reducer.py que implemente la lógica de reducción. Leerá la salida de mapper.py desde STDIN (entrada estándar) y agregará la aparición de cada palabra y escribirá la salida final en STDOUT. 

cd Documents/                                   # to change the directory to /Documents
touch reducer.py                     # touch is used to create an empty file 

Python3

#!/usr/bin/env python
  
from operator import itemgetter
import sys
  
current_word = None
current_count = 0
word = None
  
# read the entire line from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # splitting the data on the basis of tab we have provided in mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
  
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word
  
# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)

Ahora vamos a comprobar nuestro código reductor reducer.py con mapper.py si funciona correctamente o no con la ayuda del siguiente comando.

cat word_count_data.txt | python mapper.py | sort -k1,1 | python reducer.py

Podemos ver que nuestro reductor también funciona bien en nuestro sistema local. 

Paso 4: Ahora comencemos todos nuestros demonios de Hadoop con el siguiente comando.

start-dfs.sh

start-yarn.sh

Ahora cree un directorio word_count_in_python en nuestro HDFS en el directorio raíz que almacenará nuestro archivo word_count_data.txt con el siguiente comando.

hdfs dfs -mkdir /word_count_in_python

Copie word_count_data.txt a esta carpeta en nuestro HDFS con la ayuda del comando copyFromLocal .

La sintaxis para copiar un archivo desde su sistema de archivos local al HDFS se proporciona a continuación:

hdfs dfs -copyFromLocal /path 1 /path 2 .... /path n /destination

Comando real (en mi caso)

hdfs dfs -copyFromLocal /home/dikshant/Documents/word_count_data.txt /word_count_in_python

Ahora nuestro archivo de datos se ha enviado a HDFS con éxito. podemos verificar si envía o no usando el siguiente comando o visitando manualmente nuestro HDFS. 

hdfs dfs -ls /       # list down content of the root directory

hdfs dfs -ls /word_count_in_python    # list down content of /word_count_in_python directory

Démosle permiso de ejecución a nuestro mapper.py y reducer.py con la ayuda del siguiente comando.

cd Documents/

chmod 777 mapper.py reducer.py     # changing the permission to read, write, execute for user, group and others

En la imagen de abajo, podemos observar que hemos cambiado el permiso del archivo.

Paso 5: ahora descargue el último archivo jar de transmisión de hadoop desde este enlace . Luego, coloque este archivo jar de transmisión de Hadoop en un lugar desde el que pueda acceder fácilmente. En mi caso, lo coloco en la carpeta /Documentos donde están presentes los archivos mapper.py y reducer.py .

Ahora ejecutemos nuestros archivos python con la ayuda de la utilidad de transmisión de Hadoop como se muestra a continuación.

hadoop jar /home/dikshant/Documents/hadoop-streaming-2.7.3.jar \

> -input /word_count_in_python/word_count_data.txt \

> -output /word_count_in_python/output \

> -mapper /home/dikshant/Documents/mapper.py \

> -reducer /home/dikshant/Documents/reducer.py

En el comando anterior en -output , especificaremos la ubicación en HDFS donde queremos que se almacene nuestra salida. Entonces, verifiquemos nuestra salida en el archivo de salida en la ubicación /word_count_in_python/output/part-00000 en mi caso. Podemos comprobar los resultados visualizando manualmente la ubicación en HDFS o con la ayuda del comando cat como se muestra a continuación.

hdfs dfs -cat /word_count_in_python/output/part-00000

Opciones básicas que podemos usar con Hadoop Streaming

Opción

Descripción

-mapeador El comando que se ejecutará como el mapeador
-reductor El comando que se ejecutará como el reductor
-aporte La ruta de entrada de DFS para el paso de mapa
-producción El directorio de salida DFS para el paso Reducir

Publicación traducida automáticamente

Artículo escrito por dikshantmalidev y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *