Caché distribuida en Hadoop MapReduce

El marco MapReduce de Hadoop brinda la posibilidad de almacenar en caché archivos pequeños a moderados de solo lectura, como archivos de texto, archivos zip, archivos jar, etc. y transmitirlos a todos los Nodes de datos (Nodes de trabajo) donde se ejecuta el trabajo de MapReduce. Cada Datanode obtiene una copia del archivo (copia local) que se envía a través de caché distribuida . Cuando finaliza el trabajo, estos archivos se eliminan de los DataNodes.

Distributed-Cache

¿Por qué almacenar en caché un archivo?

Hay algunos archivos que son necesarios para los trabajos de MapReduce, por lo que en lugar de leer cada vez desde HDFS (aumenta el tiempo de búsqueda, por lo tanto, la latencia) digamos 100 veces (si se están ejecutando 100 Mappers), simplemente enviamos la copia del archivo a todos los Datanode una vez .

Veamos un ejemplo donde contamos las palabras de lyrics.txt excepto las palabras presentes en stopWords.txt . Puede encontrar estos archivos aquí .

requisitos previos:

1. Copie ambos archivos del sistema de archivos local a HDFS.

bin/hdfs dfs -put ../Desktop/lyrics.txt  /geeksInput

// this file will be cached
bin/hdfs dfs -put ../Desktop/stopWords.txt /cached_Geeks

2. Obtenga la dirección del servidor de NameNode. Dado que se debe acceder al archivo a través de URI (Identificador uniforme de recursos), necesitamos esta dirección. Se puede encontrar en core-site.xml

Hadoop_Home_dir/etc/hadoop/core-site.xml

En mi PC es hdfs://localhost:9000 puede variar en su PC.

Código del mapeador:

package word_count_DC;
  
import java.io.*;
import java.util.*;
import java.net.URI;
  
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
  
public class Cached_Word_Count extends Mapper<LongWritable, 
                                Text, Text, LongWritable> {
  
    ArrayList<String> stopWords = null;
  
    public void setup(Context context) throws IOException, 
                                     InterruptedException
    {
        stopWords = new ArrayList<String>();
  
        URI[] cacheFiles = context.getCacheFiles();
  
        if (cacheFiles != null && cacheFiles.length > 0) 
        {
            try {
  
                String line = "";
  
               // Create a FileSystem object and pass the 
               // configuration object in it. The FileSystem
               // is an abstract base class for a fairly generic
               // filesystem. All user code that may potentially 
               // use the Hadoop Distributed File System should
               // be written to use a FileSystem object.
                FileSystem fs = FileSystem.get(context.getConfiguration());
                Path getFilePath = new Path(cacheFiles[0].toString());
  
                // We open the file using FileSystem object, 
                // convert the input byte stream to character
                // streams using InputStreamReader and wrap it 
                // in BufferedReader to make it more efficient
                BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(getFilePath)));
  
                while ((line = reader.readLine()) != null) 
                {
                    String[] words = line.split(" ");
  
                    for (int i = 0; i < words.length; i++) 
                    {
                        // add the words to ArrayList
                        stopWords.add(words[i]); 
                    }
                }
            }
  
            catch (Exception e)
            {
                System.out.println("Unable to read the File");
                System.exit(1);
            }
        }
    }
  
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String words[] = value.toString().split(" ");
  
        for (int i = 0; i < words.length; i++) 
        {
  
            // removing all special symbols 
            // and converting it to lowerCase
            String temp = words[i].replaceAll("[?, '()]", "").toLowerCase();
  
            // if not present in ArrayList we write
            if (!stopWords.contains(temp)) 
            {
                context.write(new Text(temp), new LongWritable(1));
            }
        }
    }
}

Código reductor:

package word_count_DC;
  
import java.io.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;
  
public class Cached_Reducer extends Reducer<Text, 
              LongWritable, Text, LongWritable> {
  
    public void reduce(Text key, Iterable<LongWritable> values,
        Context context) throws IOException, InterruptedException
    {
        long sum = 0;
  
        for (LongWritable val : values)
        {
            sum += val.get();
        }
  
        context.write(key, new LongWritable(sum));
    }
}

Código del conductor:

package word_count_DC;
  
import java.io.*;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
  
public class Driver {
  
    public static void main(String[] args) throws IOException, 
                 InterruptedException, ClassNotFoundException
    {
  
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  
        if (otherArgs.length != 2) 
        {
            System.err.println("Error: Give only two paths for <input> <output>");
            System.exit(1);
        }
  
        Job job = Job.getInstance(conf, "Distributed Cache");
  
        job.setJarByClass(Driver.class);
        job.setMapperClass(Cached_Word_Count.class);
        job.setReducerClass(Cached_Reducer.class);
  
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
  
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
  
        try {
  
            // the complete URI(Uniform Resource 
            // Identifier) file path in Hdfs
            job.addCacheFile(new URI("hdfs://localhost:9000/cached_Geeks/stopWords.txt"));
        }
        catch (Exception e) {
            System.out.println("File Not Added");
            System.exit(1);
        }
  
        FileInputFormat.addInputPath(job, new Path(args[0]));
  
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
        // throws ClassNotFoundException, so handle it
        System.exit(job.waitForCompletion(true) ? 0 : 1); 
    }
}

¿Cómo ejecutar el código?

  1. Exporte el proyecto como un archivo jar y cópielo en su escritorio de Ubuntu como DistributedExample.jar
  2. Inicie sus servicios de Hadoop. Vaya dentro de hadoop_home_dir y en tipo de terminal
    sbin/start-all.sh
    
  3. Ejecute el archivo jar

    bin/yarn jar jar_file_path packageName.Driver_Class_Name inputFilePath outputFilePath

    bin/jarn jar ../Desktop/distributedExample.jar word_count_DC.Driver /geeksInput /geeksOutput

    Producción:

    // will print the words starting with t
    
    bin/hdfs dfs -cat /geeksOutput/part* | grep ^t
    

    En la salida, podemos observar que no hay palabras the o to que quisiéramos ignorar.

Publicación traducida automáticamente

Artículo escrito por SrjSunny y traducido por Barcelona Geeks. The original can be accessed here. Licence: CCBY-SA

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *