MapReduce – 分区器

MapReduce – 分区器


分区器的工作方式类似于处理输入数据集的条件。分区阶段发生在 Map 阶段之后,Reduce 阶段之前。

分区器的数量等于减速器的数量。这意味着 partitioner 将根据 reducer 的数量划分数据。因此,从单个分区器传递的数据由单个 Reducer 处理。

分区器

分区器对中间映射输出的键值对进行分区。它使用用户定义的条件对数据进行分区,其工作方式类似于散列函数。分区总数与作业的 Reducer 任务数相同。让我们举个例子来理解分区器是如何工作的。

MapReduce 分区器实现

为方便起见,假设我们有一个名为 Employee 的小表,其中包含以下数据。我们将使用此示例数据作为我们的输入数据集来演示分区器的工作原理。

Id 名称 年龄 性别 薪水
1201 戈帕尔 45 男性 50,000
1202 马尼沙 40 女性 50,000
1203 哈利勒 34 男性 30,000
1204 普拉桑 30 男性 30,000
1205 基兰 20 男性 40,000
1206 拉克西米 25 女性 35,000
1207 巴维亚 20 女性 15,000
1208 雷什马 19 女性 15,000
1209 克兰蒂 22 男性 22,000
1210 萨蒂什 24 男性 25,000
1211 克里希纳 25 男性 25,000
1212 阿尔沙德 28 男性 20,000
1213 拉瓦尼亚 18 女性 8,000

我们必须编写一个应用程序来处理输入数据集,以查找不同年龄组(例如,20 岁以下、21 至 30 岁之间、30 岁以上)中按性别划分的最高薪水员工。

输入数据

上面的数据作为input.txt保存在“/home/hadoop/hadoopPartitioner”目录下,作为输入给出。

1201 戈帕尔 45 男性 50000
1202 马尼沙 40 女性 51000
1203 哈利勒 34 男性 30000
1204 普拉桑 30 男性 31000
1205 基兰 20 男性 40000
1206 拉克西米 25 女性 35000
1207 巴维亚 20 女性 15000
1208 雷什马 19 女性 14000
1209 克兰蒂 22 男性 22000
1210 萨蒂什 24 男性 25000
1211 克里希纳 25 男性 26000
1212 阿尔沙德 28 男性 20000
1213 拉瓦尼亚 18 女性 8000

基于给定的输入,以下是程序的算法解释。

地图任务

map 任务接受键值对作为输入,而我们在文本文件中有文本数据。此地图任务的输入如下 –

输入– 键将是一个模式,例如“任何特殊键&加文件名&加行号”(例如:key = @input1),值将是该行中的数据(例如:value = 1201 \t gopal \t 45 \t 男 \t 50000)。

方法– 此地图任务的操作如下 –

  • 读取(记录数据),它作为字符串中参数列表的输入值。

  • 使用 split 函数,将性别分开并存储在字符串变量中。

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • 将性别信息和记录数据作为输出键值对从映射任务发送到分区任务

context.write(new Text(gender), new Text(value));
  • 对文本文件中的所有记录重复上述所有步骤。

输出– 您将获得性别数据和记录数据值作为键值对。

分区任务

分区任务接受来自地图任务的键值对作为其输入。分区意味着将数据划分为段。根据给定的分区条件标准,输入的键值对数据可以根据年龄标准分为三部分。

输入– 键值对集合中的全部数据。

键 = 记录中的性别字段值。

value = 该性别的整个记录​​数据值。

方法– 分区逻辑的过程如下。

  • 从输入的键值对中读取年龄字段值。
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • 在以下条件下检查年龄值。

    • 年龄小于或等于 20
    • 年龄大于 20 且小于或等于 30。
    • 年龄大于 30。
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

输出– 键值对的整个数据被分为三个键值对集合。Reducer 在每个集合上单独工作。

减少任务

partitioner 任务的数量等于reducer 任务的数量。这里我们有三个分区任务,因此我们有三个 Reducer 任务要执行。

Input – Reducer 将使用不同的键值对集合执行 3 次。

key = 记录中的性别字段值。

value = 该性别的全部记录数据。

方法– 以下逻辑将应用于每个集合。

  • 读取每条记录的 Salary 字段值。
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • 使用 max 变量检查工资。如果 str[4] 是最大工资,则将 str[4] 分配给 max,否则跳过该步骤。

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • 对每个密钥集合重复步骤 1 和 2(男性和女性是密钥集合)。执行完这三个步骤后,您会从 Male 键集合中找到一个最高工资,从女键集合中找到一个最高工资。

context.write(new Text(key), new IntWritable(max));

输出– 最后,您将在三个不同年龄组的集合中获得一组键值对数据。它分别包含来自男性集合的最高工资和来自每个年龄组的女性集合的最高工资。

执行Map、Partitioner、Reduce任务后,三个键值对数据集合作为输出存储在三个不同的文件中。

所有这三个任务都被视为 MapReduce 作业。这些工作的以下要求和规格应在配置中指定 –

  • 职位名称
  • 键值的输入输出格式
  • Map、Reduce 和 Partitioner 任务的各个类
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

示例程序

以下程序显示了如何在 MapReduce 程序中为给定条件实现分区器。

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

将上述代码保存为“/home/hadoop/hadoopPartitioner”中的PartitionerExample.java下面给出程序的编译和执行。

编译和执行

假设我们位于 Hadoop 用户的主目录中(例如,/home/hadoop)。

按照下面给出的步骤编译和执行上述程序。

步骤 1 – 下载 Hadoop-core-1.2.1.jar,用于编译和执行 MapReduce 程序。您可以从mvnrepository.com下载 jar

让我们假设下载的文件夹是“/home/hadoop/hadoopPartitioner”

步骤 2 – 以下命令用于编译程序PartitionerExample.java并为程序创建一个 jar。

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .

步骤 3 – 使用以下命令在 HDFS 中创建输入目录。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 4 – 使用以下命令将名为input.txt的输入文件复制到HDFS 的输入目录中。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

步骤 5 – 使用以下命令验证输入目录中的文件。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

步骤 6 – 使用以下命令通过从输入目录中获取输入文件来运行最高工资应用程序。

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

稍等片刻,直到文件被执行。执行后,输出包含多个输入拆分、map 任务和 Reducer 任务。

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

步骤 7 – 使用以下命令验证输出文件夹中的结果文件。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

您将在三个文件中找到输出,因为您在程序中使用了三个分区器和三个减速器。

步骤 8 – 使用以下命令查看Part-00000文件中的输出该文件由 HDFS 生成。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Part-00000 中的输出

Female   15000
Male     40000

使用以下命令查看Part-00001文件中的输出

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Part-00001 中的输出

Female   35000
Male    31000

使用以下命令查看Part-00002文件中的输出

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Part-00002中的输出

Female  51000
Male   50000

觉得文章有用?

点个广告表达一下你的爱意吧 !😁