El marco MapReduce de Hadoop brinda la posibilidad de almacenar en caché archivos pequeños a moderados de solo lectura, como archivos de texto, archivos zip, archivos jar, etc. y transmitirlos a todos los Nodes de datos (Nodes de trabajo) donde se ejecuta el trabajo de MapReduce. Cada Datanode obtiene una copia del archivo (copia local) que se envía a través de caché distribuida . Cuando finaliza el trabajo, estos archivos se eliminan de los DataNodes.
¿Por qué almacenar en caché un archivo?
Hay algunos archivos que son necesarios para los trabajos de MapReduce, por lo que en lugar de leer cada vez desde HDFS (aumenta el tiempo de búsqueda, por lo tanto, la latencia) digamos 100 veces (si se están ejecutando 100 Mappers), simplemente enviamos la copia del archivo a todos los Datanode una vez .
Veamos un ejemplo donde contamos las palabras de lyrics.txt excepto las palabras presentes en stopWords.txt . Puede encontrar estos archivos aquí .
requisitos previos:
1. Copie ambos archivos del sistema de archivos local a HDFS.
bin/hdfs dfs -put ../Desktop/lyrics.txt /geeksInput // this file will be cached bin/hdfs dfs -put ../Desktop/stopWords.txt /cached_Geeks
2. Obtenga la dirección del servidor de NameNode. Dado que se debe acceder al archivo a través de URI (Identificador uniforme de recursos), necesitamos esta dirección. Se puede encontrar en core-site.xml
Hadoop_Home_dir/etc/hadoop/core-site.xml
En mi PC es hdfs://localhost:9000 puede variar en su PC.
Código del mapeador:
package word_count_DC; import java.io.*; import java.util.*; import java.net.URI; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class Cached_Word_Count extends Mapper<LongWritable, Text, Text, LongWritable> { ArrayList<String> stopWords = null; public void setup(Context context) throws IOException, InterruptedException { stopWords = new ArrayList<String>(); URI[] cacheFiles = context.getCacheFiles(); if (cacheFiles != null && cacheFiles.length > 0) { try { String line = ""; // Create a FileSystem object and pass the // configuration object in it. The FileSystem // is an abstract base class for a fairly generic // filesystem. All user code that may potentially // use the Hadoop Distributed File System should // be written to use a FileSystem object. FileSystem fs = FileSystem.get(context.getConfiguration()); Path getFilePath = new Path(cacheFiles[0].toString()); // We open the file using FileSystem object, // convert the input byte stream to character // streams using InputStreamReader and wrap it // in BufferedReader to make it more efficient BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(getFilePath))); while ((line = reader.readLine()) != null) { String[] words = line.split(" "); for (int i = 0; i < words.length; i++) { // add the words to ArrayList stopWords.add(words[i]); } } } catch (Exception e) { System.out.println("Unable to read the File"); System.exit(1); } } } public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String words[] = value.toString().split(" "); for (int i = 0; i < words.length; i++) { // removing all special symbols // and converting it to lowerCase String temp = words[i].replaceAll("[?, '()]", "").toLowerCase(); // if not present in ArrayList we write if (!stopWords.contains(temp)) { context.write(new Text(temp), new LongWritable(1)); } } } }
Código reductor:
package word_count_DC; import java.io.*; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Reducer; public class Cached_Reducer extends Reducer<Text, LongWritable, Text, LongWritable> { public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long sum = 0; for (LongWritable val : values) { sum += val.get(); } context.write(key, new LongWritable(sum)); } }
Código del conductor:
package word_count_DC; import java.io.*; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class Driver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Error: Give only two paths for <input> <output>"); System.exit(1); } Job job = Job.getInstance(conf, "Distributed Cache"); job.setJarByClass(Driver.class); job.setMapperClass(Cached_Word_Count.class); job.setReducerClass(Cached_Reducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); try { // the complete URI(Uniform Resource // Identifier) file path in Hdfs job.addCacheFile(new URI("hdfs://localhost:9000/cached_Geeks/stopWords.txt")); } catch (Exception e) { System.out.println("File Not Added"); System.exit(1); } FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // throws ClassNotFoundException, so handle it System.exit(job.waitForCompletion(true) ? 0 : 1); } }
¿Cómo ejecutar el código?
- Exporte el proyecto como un archivo jar y cópielo en su escritorio de Ubuntu como DistributedExample.jar
- Inicie sus servicios de Hadoop. Vaya dentro de hadoop_home_dir y en tipo de terminal
sbin/start-all.sh
- Ejecute el archivo jar
bin/yarn jar jar_file_path packageName.Driver_Class_Name inputFilePath outputFilePath
bin/jarn jar ../Desktop/distributedExample.jar word_count_DC.Driver /geeksInput /geeksOutput
Producción:
// will print the words starting with t bin/hdfs dfs -cat /geeksOutput/part* | grep ^t
En la salida, podemos observar que no hay palabras the o to que quisiéramos ignorar.