MapReduce – 组合器

MapReduce – 组合器


组合器,也称为半简化器,是一个可选类,它通过接受来自 Map 类的输入然后将输出键值对传递给 Reducer 类来进行操作。

组合器的主要功能是汇总具有相同键的地图输出记录。组合器的输出(键值集合)将通过网络发送到实际的 Reducer 任务作为输入。

合路器

在 Map 类和 Reduce 类之间使用了 Combiner 类,以减少 Map 和 Reduce 之间的数据传输量。通常,map任务的输出量大,传输给reduce任务的数据量大。

下面的 MapReduce 任务图显示了组合器阶段。

合路器

合路器如何工作?

以下是 MapReduce Combiner 如何工作的简要总结 –

  • 组合器没有预定义的接口,它必须实现 Reducer 接口的 reduce() 方法。

  • 组合器对每个映射输出键进行操作。它必须具有与 Reducer 类相同的输出键值类型。

  • 组合器可以从大型数据集中生成摘要信息,因为它替换了原始 Map 输出。

虽然,Combiner 是可选的,但它有助于将数据分成多个组以用于 Reduce 阶段,从而更易于处理。

MapReduce 组合器实现

以下示例提供了有关组合器的理论思想。让我们假设我们有以下名为input.txt的 MapReduce输入文本文件

What do you mean by Object
What do you know about Java
What is Java Virtual Machine
How Java enabled High Performance

下面讨论了 MapReduce 程序与 Combiner 的重要阶段。

记录阅读器

这是 MapReduce 的第一阶段,记录阅读器从输入文本文件中读取每一行作为文本,并以键值对的形式产生输出。

输入– 来自输入文件的逐行文本。

输出– 形成键值对。以下是预期的键值对集。

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

地图阶段

Map 阶段从 Record Reader 获取输入,对其进行处理,并将输出作为另一组键值对产生。

输入– 以下键值对是从记录阅读器中获取的输入。

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

Map 阶段读取每个键值对,使用 StringTokenizer 从值中划分每个单词,将每个单词视为键并将该单词的计数作为值。以下代码片段显示了 Mapper 类和 map 函数。

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
   private final static IntWritable one = new IntWritable(1);
   private Text word = new Text();
   
   public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
   {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) 
      {
         word.set(itr.nextToken());
         context.write(word, one);
      }
   }
}

输出– 预期输出如下 –

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>

合路相

组合器阶段从 Map 阶段获取每个键值对,对其进行处理,并将输出生成为键值集合对。

Input – 以下键值对是从 Map 阶段获取的输入。

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>

组合器阶段读取每个键值对,将常用词组合为键,值组合为集合。通常,Combiner 的代码和操作类似于 Reducer 的代码和操作。以下是 Mapper、Combiner 和 Reducer 类声明的代码片段。

job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);

输出– 预期输出如下 –

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

减速阶段

Reducer 阶段从组合器阶段获取每个键值集合对,对其进行处理,并将输出作为键值对传递。请注意,Combiner 功能与 Reducer 相同。

输入– 以下键值对是从组合器阶段获取的输入。

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

Reducer 阶段读取每个键值对。以下是组合器的代码片段。

public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> 
{
   private IntWritable result = new IntWritable();
   
   public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException 
   {
      int sum = 0;
      for (IntWritable val : values) 
      {
         sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
   }
}

输出– Reducer 阶段的预期输出如下 –

<What,3> <do,2> <you,2> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,3>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

唱片作家

这是 MapReduce 的最后一个阶段,其中 Record Writer 写入来自 Reducer 阶段的每个键值对,并将输出作为文本发送。

Input – 来自 Reducer 阶段的每个键值对以及输出格式。

输出– 它为您提供文本格式的键值对。以下是预期的输出。

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1

示例程序

以下代码块计算程序中的字数。

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
   public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
   {
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();
      
      public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
      {
         StringTokenizer itr = new StringTokenizer(value.toString());
         while (itr.hasMoreTokens()) 
         {
            word.set(itr.nextToken());
            context.write(word, one);
         }
      }
   }
   
   public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> 
   {
      private IntWritable result = new IntWritable();
      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException 
      {
         int sum = 0;
         for (IntWritable val : values) 
         {
            sum += val.get();
         }
         result.set(sum);
         context.write(key, result);
      }
   }
   
   public static void main(String[] args) throws Exception 
   {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "word count");
		
      job.setJarByClass(WordCount.class);
      job.setMapperClass(TokenizerMapper.class);
      job.setCombinerClass(IntSumReducer.class);
      job.setReducerClass(IntSumReducer.class);
		
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
		
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
      System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}

将上述程序另存为WordCount.java下面给出程序的编译和执行。

编译和执行

假设我们位于 Hadoop 用户的主目录中(例如,/home/hadoop)。

按照下面给出的步骤编译和执行上述程序。

步骤 1 – 使用以下命令创建一个目录来存储已编译的 java 类。

$ mkdir units

步骤 2 – 下载 Hadoop-core-1.2.1.jar,用于编译和执行 MapReduce 程序。您可以从mvnrepository.com下载 jar

让我们假设下载的文件夹是 /home/hadoop/。

步骤 3 – 使用以下命令编译WordCount.java程序并为该程序创建一个 jar。

$ javac -classpath hadoop-core-1.2.1.jar -d units WordCount.java
$ jar -cvf units.jar -C units/ .

步骤 4 – 使用以下命令在 HDFS 中创建输入目录。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 – 使用以下命令将名为input.txt的输入文件复制到HDFS 的输入目录中。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/input.txt input_dir

步骤 6 – 使用以下命令验证输入目录中的文件。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

步骤 7 – 使用以下命令通过从输入目录中获取输入文件来运行字数统计应用程序。

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

稍等片刻,直到文件被执行。执行后,输出包含多个输入拆分、Map 任务和 Reducer 任务。

步骤 8 – 使用以下命令验证输出文件夹中的结果文件。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

步骤 9 – 使用以下命令查看Part-00000文件中的输出该文件由 HDFS 生成。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

以下是 MapReduce 程序生成的输出。

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1

觉得文章有用?

点个广告表达一下你的爱意吧 !😁