MapReduce – 快速指南

MapReduce – 快速指南


MapReduce – 介绍

MapReduce 是一种编程模型,用于编写可以在多个节点上并行处理大数据的应用程序。MapReduce 提供用于分析大量复杂数据的分析功能。

什么是大数据?

大数据是无法使用传统计算技术处理的大型数据集的集合。例如,Facebook 或 Youtube 需要它每天收集和管理的数据量,就属于大数据的范畴。然而,大数据不仅仅是关于规模和数量,它还涉及以下一个或多个方面——速度、多样性、数量和复杂性。

为什么是 MapReduce?

传统的企业系统通常有一个集中式服务器来存储和处理数据。下图描绘了传统企业系统的示意图。传统模型当然不适合处理海量的可扩展数据,标准数据库服务器也无法容纳。此外,集中式系统在同时处理多个文件时会产生太多的瓶颈。

传统企业系统视图

Google 使用一种称为 MapReduce 的算法解决了这个瓶颈问题。MapReduce 将一个任务分成小部分,并将它们分配给多台计算机。之后,将结果集中在一处,并进行整合,形成结果数据集。

集中式系统

MapReduce 是如何工作的?

MapReduce 算法包含两个重要的任务,即 Map 和 Reduce。

  • Map 任务获取一组数据并将其转换为另一组数据,其中单个元素被分解为元组(键值对)。

  • Reduce 任务将 Map 的输出作为输入,并将这些数据元组(键值对)组合成一个较小的元组集。

reduce 任务总是在 map 作业之后执行。

现在让我们仔细看看每个阶段,并尝试了解它们的重要性。

阶段

  • 输入阶段– 这里我们有一个记录阅读器,它翻译输入文件中的每条记录,并将解析的数据以键值对的形式发送到映射器。

  • Map – Map 是一个用户定义的函数,它采用一系列键值对并处理它们中的每一个以生成零个或多个键值对。

  • 中间键– 映射器生成的键值对称为中间键。

  • 组合器– 组合器是一种本地 Reducer,它将来自映射阶段的相似数据分组为可识别的集合。它将来自映射器的中间键作为输入,并应用用户定义的代码在一个映射器的小范围内聚合值。它不是主要 MapReduce 算法的一部分;它是可选的。

  • Shuffle and Sort – Reducer 任务从 Shuffle and Sort 步骤开始。它将分组的键值对下载到运行 Reducer 的本地机器上。各个键值对按键排序到更大的数据列表中。数据列表将等效键组合在一起,以便它们的值可以在 Reducer 任务中轻松迭代。

  • Reducer – Reducer 将分组的键值对数据作为输入,并在每个数据上运行一个 Reducer 函数。在这里,数据可以通过多种方式进行聚合、过滤和组合,并且需要进行广泛的处理。执行结束后,它会为最后一步提供零个或多个键值对。

  • 输出阶段– 在输出阶段,我们有一个输出格式化程序,它转换来自 Reducer 函数的最终键值对,并使用记录编写器将它们写入文件。

让我们试着借助一个小图来理解 Map &f Reduce 这两个任务 –

MapReduce 工作

MapReduce-示例

让我们举一个真实世界的例子来理解 MapReduce 的威力。Twitter 每天接收大约 5 亿条推文,即每秒近 3000 条推文。下图显示了 Tweeter 如何在 MapReduce 的帮助下管理其推文。

MapReduce 示例

如图所示,MapReduce 算法执行以下操作 –

  • Tokenize – 将推文标记为标记映射并将它们写入为键值对。

  • Filter – 从标记映射中过滤掉不需要的词,并将过滤后的映射写为键值对。

  • Count – 每个单词生成一个令牌计数器。

  • 聚合计数器– 将类似计数器值的聚合准备为小的可管理单元。

MapReduce – 算法

MapReduce 算法包含两个重要的任务,即 Map 和 Reduce。

  • map任务通过Mapper类完成
  • reduce 任务是通过 Reducer 类完成的。

Mapper 类接受输入,对其进行标记化、映射和排序。Mapper 类的输出被 Reducer 类用作输入,它依次搜索匹配对并减少它们。

映射器减速器类

MapReduce 实现了各种数学算法,将任务分成小部分并将它们分配给多个系统。在技​​术方面,MapReduce 算法有助于将 Map & Reduce 任务发送到集群中的适当服务器。

这些数学算法可能包括以下内容 –

  • 排序
  • 搜索
  • 索引
  • TF-IDF

排序

排序是处理和分析数据的基本 MapReduce 算法之一。MapReduce 实现了排序算法,以自动按键对映射器的输出键值对进行排序。

  • 排序方法在映射器类本身中实现。

  • 在 Shuffle and Sort 阶段,在对 mapper 类中的值进行标记化后,Context类(用户定义的类)将匹配的 value键收集为一个集合。

  • 为了收集相似的键值对(中间键),Mapper 类借助RawComparator类对键值对进行排序。

  • 给定 Reducer 的一组中间键值对由 Hadoop 自动排序以形成键值 (K2, {V2, V2, …}),然后再将它们呈现给 Reducer。

搜索

搜索在 MapReduce 算法中扮演着重要的角色。它有助于组合器阶段(可选)和减速器阶段。让我们通过一个例子来理解搜索是如何工作的。

例子

下面的例子展示了 MapReduce 如何使用搜索算法找出给定员工数据集中薪水最高的员工的详细信息。

  • 让我们假设我们有四个不同文件中的员工数据 – A、B、C 和 D。我们还假设所有四个文件中都有重复的员工记录,因为从所有数据库表中重复导入员工数据。请参阅下图。

地图缩小图

  • Map 阶段处理每个输入文件并以键值对(<k, v> : <emp name,salary>)的形式提供员工数据。请参阅下图。

地图缩小图

  • 组合器阶段(搜索技术)将接受来自 Map 阶段的输入作为带有员工姓名和薪水的键值对。使用搜索技术,组合器将检查所有员工的薪水,以找到每个文件中薪水最高的员工。请参阅以下代码段。

<k: employee name, v: salary>
Max= the salary of an first employee. Treated as max salary

if(v(second employee).salary > Max){
   Max = v(salary);
}

else{
   Continue checking;
}

预期结果如下 –

<satish, 26000>

<gopal, 50000>

<kiran, 45000>

<manisha, 45000>

  • Reducer phase – 形成每个文件,你会找到最高薪水的员工。为避免冗余,请检查所有 <k, v> 对并消除重复条目(如果有)。在来自四个输入文件的四个 <k, v> 对之间使用相同的算法。最终输出应如下 –

<gopal, 50000>

索引

通常索引用于指向特定数据及其地址。它对特定 Mapper 的输入文件执行批量索引。

MapReduce 中通常使用的索引技术称为倒排索引。Google 和 Bing 等搜索引擎使用倒排索引技术。让我们通过一个简单的例子来理解索引是如何工作的。

例子

以下文本是倒排索引的输入。这里 T[0]、T[1] 和 t[2] 是文件名,它们的内容用双引号括起来。

T[0] = "it is what it is"
T[1] = "what is it"
T[2] = "it is a banana"

应用索引算法后,我们得到以下输出 –

"a": {2}
"banana": {2}
"is": {0, 1, 2}
"it": {0, 1, 2}
"what": {0, 1}

这里的 “a”: {2} 意味着术语 “a” 出现在 T[2] 文件中。类似地,“is”:{0, 1, 2} 表示术语“is”出现在文件 T[0]、T[1] 和 T[2] 中。

TF-IDF

TF-IDF 是一种文本处理算法,它是 Term Frequency – Inverse Document Frequency 的缩写。它是常见的网络分析算法之一。这里,术语“频率”是指术语在文档中出现的次数。

词频 (TF)

它衡量特定术语在文档中出现的频率。它的计算方法是一个单词在文档中出现的次数除以该文档中的单词总数。

TF(the) = (Number of times term the ‘the’ appears in a document) / (Total number of terms in the document)

逆向文档频率 (IDF)

它衡量一个术语的重要性。它的计算方法是文本数据库中的文档数除以出现特定术语的文档数。

在计算 TF 时,所有术语都被认为同等重要。这意味着,TF 会计算“is”、“a”、“what”等普通单词的词频。因此,我们需要通过计算以下公式来了解频繁词,同时扩大稀有词的比例 –

IDF(the) = log_e(Total number of documents / Number of documents with term ‘the’ in it).

下面通过一个小例子来解释该算法。

例子

考虑一个包含 1000 个单词的文档,其中单词hive出现了 50 次。hive的 TF 为(50 / 1000) = 0.05。

现在,假设我们有 1000 万个文档,其中1000 个中出现了hive一词然后,IDF 计算为 log(10,000,000 / 1,000) = 4。

TF-IDF 重量是这些量的乘积 − 0.05 × 4 = 0.20。

MapReduce – 安装

MapReduce 仅适用于 Linux 风格的操作系统,并且它内置了 Hadoop 框架。我们需要执行以下步骤来安装 Hadoop 框架。

验证 JAVA 安装

在安装 Hadoop 之前,必须在您的系统上安装 Java。使用以下命令检查您的系统上是否安装了 Java。

$ java –version

如果您的系统上已经安装了 Java,您将看到以下响应 –

java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

如果您的系统上没有安装 Java,请按照以下步骤操作。

安装 Java

第1步

从以下链接下载最新版本的 Java –
此链接

下载后,您可以在下载文件夹中找到文件jdk-7u71-linux-x64.tar.gz

第2步

使用以下命令提取jdk-7u71-linux-x64.gz的内容。

$ cd Downloads/
$ ls
jdk-7u71-linux-x64.gz
$ tar zxf jdk-7u71-linux-x64.gz
$ ls
jdk1.7.0_71 jdk-7u71-linux-x64.gz

第 3 步

要使所有用户都可以使用 Java,您必须将其移动到“/usr/local/”位置。转到 root 并键入以下命令 –

$ su
password:
# mv jdk1.7.0_71 /usr/local/java
# exit

第四步

要设置 PATH 和 JAVA_HOME 变量,请将以下命令添加到 ~/.bashrc 文件中。

export JAVA_HOME=/usr/local/java
export PATH=$PATH:$JAVA_HOME/bin

将所有更改应用于当前运行的系统。

$ source ~/.bashrc

第 5 步

使用以下命令来配置 Java 替代品 –

# alternatives --install /usr/bin/java java usr/local/java/bin/java 2

# alternatives --install /usr/bin/javac javac usr/local/java/bin/javac 2

# alternatives --install /usr/bin/jar jar usr/local/java/bin/jar 2

# alternatives --set java usr/local/java/bin/java

# alternatives --set javac usr/local/java/bin/javac

# alternatives --set jar usr/local/java/bin/jar

现在从终端使用命令java -version验证安装

验证 Hadoop 安装

在安装 MapReduce 之前,必须在您的系统上安装 Hadoop。让我们使用以下命令验证 Hadoop 安装 –

$ hadoop version

如果您的系统上已经安装了 Hadoop,那么您将收到以下响应 –

Hadoop 2.4.1
--
Subversion https://svn.apache.org/repos/asf/hadoop/common -r 1529768
Compiled by hortonmu on 2013-10-07T06:28Z
Compiled with protoc 2.5.0
From source with checksum 79e53ce7994d1628b240f09af91e1af4

如果您的系统上未安装 Hadoop,请继续执行以下步骤。

下载 Hadoop

从 Apache Software Foundation 下载 Hadoop 2.4.1 并使用以下命令提取其内容。

$ su
password:
# cd /usr/local
# wget http://apache.claz.org/hadoop/common/hadoop-2.4.1/
hadoop-2.4.1.tar.gz
# tar xzf hadoop-2.4.1.tar.gz
# mv hadoop-2.4.1/* to hadoop/
# exit

以伪分布式模式安装Hadoop

以下步骤用于以伪分布式模式安装 Hadoop 2.4.1。

步骤 1 – 设置 Hadoop

您可以通过将以下命令附加到 ~/.bashrc 文件来设置 Hadoop 环境变量。

export HADOOP_HOME=/usr/local/hadoop
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin

将所有更改应用于当前运行的系统。

$ source ~/.bashrc

第 2 步 – Hadoop 配置

您可以在“$HADOOP_HOME/etc/hadoop”位置找到所有 Hadoop 配置文件。您需要根据您的 Hadoop 基础架构对这些配置文件进行适当的更改。

$ cd $HADOOP_HOME/etc/hadoop

为了使用 Java 开发 Hadoop 程序,您必须通过将 JAVA_HOME 值替换为系统中 Java 的位置来重置hadoop-env.sh文件中的 Java 环境变量

export JAVA_HOME=/usr/local/java

您必须编辑以下文件来配置 Hadoop –

  • 核心站点.xml
  • hdfs-site.xml
  • 纱线站点.xml
  • mapred-site.xml

核心站点.xml

core-site.xml 包含以下信息 –

  • 用于 Hadoop 实例的端口号
  • 为文件系统分配的内存
  • 存储数据的内存限制
  • 读/写缓冲区的大小

打开 core-site.xml 并在 <configuration> 和 </configuration> 标记之间添加以下属性。

<configuration>
   <property>
      <name>fs.default.name</name>
      <value>hdfs://localhost:9000 </value>
   </property>
</configuration>

hdfs-site.xml

hdfs-site.xml 包含以下信息 –

  • 复制数据的价值
  • 名称节点路径
  • 本地文件系统的数据节点路径(您要存储 Hadoop 基础设施的位置)

让我们假设以下数据。

dfs.replication (data replication value) = 1

(In the following path /hadoop/ is the user name.
hadoopinfra/hdfs/namenode is the directory created by hdfs file system.)
namenode path = //home/hadoop/hadoopinfra/hdfs/namenode

(hadoopinfra/hdfs/datanode is the directory created by hdfs file system.)
datanode path = //home/hadoop/hadoopinfra/hdfs/datanode

打开此文件并在 <configuration>、</configuration> 标记之间添加以下属性。

<configuration>

   <property>
      <name>dfs.replication</name>
      <value>1</value>
   </property>
   
   <property>
      <name>dfs.name.dir</name>
      <value>file:///home/hadoop/hadoopinfra/hdfs/namenode</value>
   </property>
   
   <property>
      <name>dfs.data.dir</name>
      <value>file:///home/hadoop/hadoopinfra/hdfs/datanode </value>
   </property>
   
</configuration>

注意– 在上面的文件中,所有属性值都是用户定义的,您可以根据您的 Hadoop 基础架构进行更改。

纱线站点.xml

该文件用于将 yarn 配置到 Hadoop 中。打开 yarn-site.xml 文件并在 <configuration>、</configuration> 标记之间添加以下属性。

<configuration>
   <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle</value>
   </property>
</configuration>

mapred-site.xml

该文件用于指定我们正在使用的 MapReduce 框架。默认情况下,Hadoop 包含一个yarn-site.xml 模板。首先,您需要使用以下命令将文件从 mapred-site.xml.template 复制到 mapred-site.xml 文件。

$ cp mapred-site.xml.template mapred-site.xml

打开 mapred-site.xml 文件并在 <configuration>、</configuration> 标记之间添加以下属性。

<configuration>
   <property>
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
   </property>
</configuration>

验证 Hadoop 安装

以下步骤用于验证 Hadoop 安装。

步骤 1 – 名称节点设置

使用命令“hdfs namenode -format”设置名称节点,如下所示 –

$ cd ~
$ hdfs namenode -format

预期结果如下 –

10/24/14 21:30:55 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = localhost/192.168.1.11
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 2.4.1
...
...
10/24/14 21:30:56 INFO common.Storage: Storage directory
/home/hadoop/hadoopinfra/hdfs/namenode has been successfully formatted.
10/24/14 21:30:56 INFO namenode.NNStorageRetentionManager: Going to
retain 1 images with txid >= 0
10/24/14 21:30:56 INFO util.ExitUtil: Exiting with status 0
10/24/14 21:30:56 INFO namenode.NameNode: SHUTDOWN_MSG:

/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at localhost/192.168.1.11
************************************************************/

第 2 步 – 验证 Hadoop dfs

执行以下命令以启动您的 Hadoop 文件系统。

$ start-dfs.sh

预期输出如下 –

10/24/14 21:37:56
Starting namenodes on [localhost]
localhost: starting namenode, logging to /home/hadoop/hadoop-
2.4.1/logs/hadoop-hadoop-namenode-localhost.out
localhost: starting datanode, logging to /home/hadoop/hadoop-
2.4.1/logs/hadoop-hadoop-datanode-localhost.out
Starting secondary namenodes [0.0.0.0]

第 3 步 – 验证纱线脚本

以下命令用于启动纱线脚本。执行此命令将启动您的纱线守护进程。

$ start-yarn.sh

预期输出如下 –

starting yarn daemons
starting resourcemanager, logging to /home/hadoop/hadoop-
2.4.1/logs/yarn-hadoop-resourcemanager-localhost.out
localhost: starting node manager, logging to /home/hadoop/hadoop-
2.4.1/logs/yarn-hadoop-nodemanager-localhost.out

第 4 步 – 在浏览器上访问 Hadoop

访问 Hadoop 的默认端口号是 50070。使用以下 URL 在浏览器上获取 Hadoop 服务。

http://localhost:50070/

以下屏幕截图显示了 Hadoop 浏览器。

Hadoop 浏览器

步骤 5 – 验证集群的所有应用程序

访问集群所有应用程序的默认端口号是 8088。使用以下 URL 来使用此服务。

http://localhost:8088/

以下屏幕截图显示了 Hadoop 集群浏览器。

Hadoop 集群浏览器

MapReduce – API

在本章中,我们将仔细研究 MapReduce 编程操作中涉及的类及其方法。我们将主要关注以下方面 –

  • 作业上下文接口
  • 工作班级
  • 映射器类
  • 减速机类

作业上下文接口

JobContext 接口是所有类的超级接口,它定义了 MapReduce 中的不同作业。它为您提供了在任务运行时提供给任务的作业的只读视图。

以下是 JobContext 接口的子接口。

S.No. 子接口说明
1. MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

定义提供给 Mapper 的上下文。

2. ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

定义传递给 Reducer 的上下文。

Job类是实现JobContext接口的主要类。

工作班级

Job 类是 MapReduce API 中最重要的类。它允许用户配置作业、提交作业、控制其执行和查询状态。set 方法仅在提交作业之前有效,之后它们将抛出 IllegalStateException。

通常,用户创建应用程序,描述作业的各个方面,然后提交作业并监控其进度。

以下是如何提交作业的示例 –

// Create a new Job
Job job = new Job(new Configuration());
job.setJarByClass(MyJob.class);

// Specify various job-specific parameters
job.setJobName("myjob");
job.setInputPath(new Path("in"));
job.setOutputPath(new Path("out"));

job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);

// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);

构造函数

以下是 Job 类的构造函数摘要。

S.No 构造函数总结
1 工作()
2 作业(配置配置)
3 作业(配置配置,字符串作业名称)

方法

Job 类的一些重要方法如下 –

S.No 方法说明
1 获取作业名称()

用户指定的作业名称。

2 getJobState()

返回作业的当前状态。

3 做完了()

检查作业是否完成。

4 设置输入格式类()

设置作业的 InputFormat。

5 setJobName(字符串名称)

设置用户指定的作业名称。

6 设置输出格式类()

设置作业的输出格式。

7 setMapperClass(Class)

设置作业的映射器。

8 setReducerClass(Class)

为作业设置 Reducer。

9 setPartitionerClass(Class)

设置作业的分区程序。

10 setCombinerClass(Class)

为作业设置组合器。

映射器类

Mapper 类定义了 Map 作业。将输入键值对映射到一组中间键值对。映射是将输入记录转换为中间记录的单个任务。转换后的中间记录不需要与输入记录的类型相同。一个给定的输入对可以映射到零个或多个输出对。

方法

map是 Mapper 类中最突出的方法。语法定义如下 –

map(KEYIN key, VALUEIN value, org.apache.hadoop.mapreduce.Mapper.Context context)

为输入拆分中的每个键值对调用一次此方法。

减速机类

Reducer 类定义 MapReduce 中的 Reduce 作业。它将共享一个键的一组中间值减少到一组较小的值。Reducer 实现可以通过 JobContext.getConfiguration() 方法访问作业的配置。Reducer 具有三个主要阶段 – Shuffle、Sort 和 Reduce。

  • Shuffle – Reducer 使用 HTTP 跨网络复制每个 Mapper 的排序输出。

  • Sort – 框架按键对 Reducer 输入进行合并排序(因为不同的 Mapper 可能输出相同的键)。shuffle 和 sort 阶段同时发生,即在获取输出时,它们被合并。

  • Reduce – 在此阶段,为排序输入中的每个 <key, (collection of values)> 调用 reduce (Object, Iterable, Context) 方法。

方法

reduce是 Reducer 类中最突出的方法。语法定义如下 –

reduce(KEYIN key, Iterable<VALUEIN> values, org.apache.hadoop.mapreduce.Reducer.Context context)

对于键值对集合中的每个键,都会调用此方法一次。

MapReduce – Hadoop 实现

MapReduce 是一个框架,用于编写应用程序以可靠的方式处理大型商品硬件集群上的大量数据。本章带你使用Java在Hadoop框架中操作MapReduce。

MapReduce 算法

通常,MapReduce 范式是基于将 map-reduce 程序发送到实际数据所在的计算机。

  • 在 MapReduce 作业期间,Hadoop 将 Map 和 Reduce 任务发送到集群中的适当服务器。

  • 该框架管理数据传递的所有细节,例如发出任务、验证任务完成以及在节点之间的集群周围复制数据。

  • 大多数计算发生在节点上,数据位于本地磁盘上,从而减少了网络流量。

  • 完成给定的任务后,集群收集并缩减数据以形成适当的结果,并将其发送回Hadoop服务器。

MapReduce 算法

输入和输出(Java 视角)

MapReduce 框架对键值对进行操作,即框架将作业的输入视为一组键值对,并产生一组键值对作为作业的输出,可想而知是不同类型的。

键和值类必须由框架序列化,因此需要实现 Writable 接口。此外,关键类必须实现 WritableComparable 接口以方便框架进行排序。

MapReduce 作业的输入和输出格式均采用键值对的形式 –

(输入) <k1, v1> -> map -> <k2, v2>-> reduce -> <k3, v3> (输出)。

输入 输出
Map <k1​​, v1> 列表 (<k2, v2>)
Reduce <k2, 列表(v2)> 列表 (<k3, v3>)

MapReduce 实现

下表显示了有关组织的电力消耗的数据。该表包括每月用电量和连续五年的年平均值。

二月 三月 四月 可能 七月 八月 九月 十月 十一月 十二月 平均
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

我们需要编写应用程序来处理给定表中的输入数据,以找出使用率最高的年份、使用率最低的年份等。对于记录数量有限的程序员来说,这项任务很容易,因为他们只需编写逻辑以产生所需的输出,并将数据传递给编写的应用程序。

现在让我们提高输入数据的规模。假设我们必须分析特定州所有大型工业的电力消耗。当我们编写应用程序来处理这样的大量数据时,

  • 它们将需要大量时间来执行。

  • 当我们将数据从源移动到网络服务器时,将会有大量的网络流量。

为了解决这些问题,我们有了 MapReduce 框架。

输入数据

上述数据保存为sample.txt并作为输入给出。输入文件如下所示。

1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

示例程序

下面的示例数据程序使用 MapReduce 框架。

package hadoop;

import java.util.*;
import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
   //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable,  /*Input key Type */
   Text,                   /*Input value Type*/
   Text,                   /*Output key Type*/
   IntWritable>            /*Output value Type*/
   {
      //Map function
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();
         
         while(s.hasMoreTokens()){
            lasttoken=s.nextToken();
         }
         
         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }
   
   //Reducer class
	
   public static class E_EReduce extends MapReduceBase implements
   Reducer< Text, IntWritable, Text, IntWritable >
   {
      //Reduce function
      public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
      {
         int maxavg=30;
         int val=Integer.MIN_VALUE;
         while (values.hasNext())
         {
            if((val=values.next().get())>maxavg)
            {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }
	
   //Main function
	
   public static void main(String args[])throws Exception
   {
      JobConf conf = new JobConf(Eleunits.class);
		
      conf.setJobName("max_eletricityunits");
		
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
		
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);
		
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);
		
      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		
      JobClient.runJob(conf);
   }
}

将上述程序保存到ProcessUnits.java 中下面给出程序的编译和执行。

ProcessUnits程序的编译与执行

让我们假设我们在 Hadoop 用户的主目录中(例如 /home/hadoop)。

按照下面给出的步骤编译和执行上述程序。

步骤 1 – 使用以下命令创建一个目录来存储已编译的 java 类。

$ mkdir units

步骤 2 – 下载 Hadoop-core-1.2.1.jar,用于编译和执行 MapReduce 程序。mvnrepository.com下载 jar 让我们假设下载文件夹是 /home/hadoop/。

第 3 步– 以下命令用于编译ProcessUnits.java程序并为该程序创建一个 jar。

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .

步骤 4 – 以下命令用于在 HDFS 中创建输入目录。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 – 以下命令用于将名为sample.txt的输入文件复制到HDFS 的输入目录中。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

步骤 6 – 以下命令用于验证输入目录中的文件

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

步骤 7 – 以下命令用于通过从输入目录获取输入文件来运行 Eleunit_max 应用程序。

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

稍等片刻,直到文件被执行。执行后,输出包含多个输入拆分、Map 任务、Reducer 任务等。

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49

File System Counters
   
   FILE: Number of bytes read=61
   FILE: Number of bytes written=279400
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=546
   HDFS: Number of bytes written=40
   HDFS: Number of read operations=9
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=2 Job Counters
   
   Launched map tasks=2
   Launched reduce tasks=1
   Data-local map tasks=2
	
   Total time spent by all maps in occupied slots (ms)=146137
   Total time spent by all reduces in occupied slots (ms)=441
   Total time spent by all map tasks (ms)=14613
   Total time spent by all reduce tasks (ms)=44120
	
   Total vcore-seconds taken by all map tasks=146137
   Total vcore-seconds taken by all reduce tasks=44120
	
   Total megabyte-seconds taken by all map tasks=149644288
   Total megabyte-seconds taken by all reduce tasks=45178880

Map-Reduce Framework
   
   Map input records=5
	
   Map output records=5
   Map output bytes=45
   Map output materialized bytes=67
	
   Input split bytes=208
   Combine input records=5
   Combine output records=5
	
   Reduce input groups=5
   Reduce shuffle bytes=6
   Reduce input records=5
   Reduce output records=5
	
   Spilled Records=10
   Shuffled Maps =2
   Failed Shuffles=0
   Merged Map outputs=2
	
   GC time elapsed (ms)=948
   CPU time spent (ms)=5160
	
   Physical memory (bytes) snapshot=47749120
   Virtual memory (bytes) snapshot=2899349504
	
   Total committed heap usage (bytes)=277684224

File Output Format Counters

   Bytes Written=40

步骤 8 – 以下命令用于验证输出文件夹中的结果文件。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

步骤 9 – 以下命令用于查看Part-00000文件中的输出该文件由 HDFS 生成。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

以下是 MapReduce 程序生成的输出 –

1981 34
1984 40
1985 45

步骤 10 – 以下命令用于将输出文件夹从 HDFS 复制到本地文件系统。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop

MapReduce – 分区器

分区器的工作方式类似于处理输入数据集的条件。分区阶段发生在 Map 阶段之后,Reduce 阶段之前。

分区器的数量等于减速器的数量。这意味着 partitioner 将根据 reducer 的数量划分数据。因此,从单个分区器传递的数据由单个 Reducer 处理。

分区器

分区器对中间映射输出的键值对进行分区。它使用用户定义的条件对数据进行分区,其工作方式类似于散列函数。分区总数与作业的 Reducer 任务数相同。让我们举个例子来理解分区器是如何工作的。

MapReduce 分区器实现

为方便起见,假设我们有一个名为 Employee 的小表,其中包含以下数据。我们将使用此示例数据作为我们的输入数据集来演示分区器的工作原理。

Id 名称 年龄 性别 薪水
1201 戈帕尔 45 男性 50,000
1202 马尼沙 40 女性 50,000
1203 哈利勒 34 男性 30,000
1204 普拉桑 30 男性 30,000
1205 基兰 20 男性 40,000
1206 拉克西米 25 女性 35,000
1207 巴维亚 20 女性 15,000
1208 雷什马 19 女性 15,000
1209 克兰蒂 22 男性 22,000
1210 萨蒂什 24 男性 25,000
1211 克里希纳 25 男性 25,000
1212 阿尔沙德 28 男性 20,000
1213 拉瓦尼亚 18 女性 8,000

我们必须编写一个应用程序来处理输入数据集,以查找不同年龄组(例如,20 岁以下、21 至 30 岁之间、30 岁以上)中按性别划分的最高薪水员工。

输入数据

上面的数据作为input.txt保存在“/home/hadoop/hadoopPartitioner”目录下,作为输入给出。

1201 戈帕尔 45 男性 50000
1202 马尼沙 40 女性 51000
1203 哈利勒 34 男性 30000
1204 普拉桑 30 男性 31000
1205 基兰 20 男性 40000
1206 拉克西米 25 女性 35000
1207 巴维亚 20 女性 15000
1208 雷什马 19 女性 14000
1209 克兰蒂 22 男性 22000
1210 萨蒂什 24 男性 25000
1211 克里希纳 25 男性 26000
1212 阿尔沙德 28 男性 20000
1213 拉瓦尼亚 18 女性 8000

基于给定的输入,以下是程序的算法解释。

地图任务

map 任务接受键值对作为输入,而我们在文本文件中有文本数据。此地图任务的输入如下 –

输入– 键将是一个模式,例如“任何特殊键&加文件名&加行号”(例如:key = @input1),值将是该行中的数据(例如:value = 1201 \t gopal \t 45 \t 男 \t 50000)。

方法– 此地图任务的操作如下 –

  • 读取(记录数据),它作为字符串中参数列表的输入值。

  • 使用 split 函数,将性别分开并存储在字符串变量中。

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • 将性别信息和记录数据作为输出键值对从映射任务发送到分区任务

context.write(new Text(gender), new Text(value));
  • 对文本文件中的所有记录重复上述所有步骤。

输出– 您将获得性别数据和记录数据值作为键值对。

分区任务

分区任务接受来自地图任务的键值对作为其输入。分区意味着将数据划分为段。根据给定的分区条件标准,输入的键值对数据可以根据年龄标准分为三部分。

输入– 键值对集合中的全部数据。

键 = 记录中的性别字段值。

value = 该性别的整个记录​​数据值。

方法– 分区逻辑的过程如下。

  • 从输入的键值对中读取年龄字段值。
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • 在以下条件下检查年龄值。

    • 年龄小于或等于 20
    • 年龄大于 20 且小于或等于 30。
    • 年龄大于 30。
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

输出– 键值对的整个数据被分为三个键值对集合。Reducer 在每个集合上单独工作。

减少任务

partitioner 任务的数量等于reducer 任务的数量。这里我们有三个分区任务,因此我们有三个 Reducer 任务要执行。

Input – Reducer 将使用不同的键值对集合执行 3 次。

key = 记录中的性别字段值。

value = 该性别的全部记录数据。

方法– 以下逻辑将应用于每个集合。

  • 读取每条记录的 Salary 字段值。
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • 使用 max 变量检查工资。如果 str[4] 是最大工资,则将 str[4] 分配给 max,否则跳过该步骤。

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • 对每个密钥集合重复步骤 1 和 2(男性和女性是密钥集合)。执行完这三个步骤后,您会从 Male 键集合中找到一个最高工资,从女键集合中找到一个最高工资。

context.write(new Text(key), new IntWritable(max));

输出– 最后,您将在三个不同年龄组的集合中获得一组键值对数据。它分别包含来自男性集合的最高工资和来自每个年龄组的女性集合的最高工资。

执行Map、Partitioner、Reduce任务后,三个键值对数据集合作为输出存储在三个不同的文件中。

所有这三个任务都被视为 MapReduce 作业。这些工作的以下要求和规格应在配置中指定 –

  • 职位名称
  • 键值的输入输出格式
  • Map、Reduce 和 Partitioner 任务的各个类
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

示例程序

以下程序显示了如何在 MapReduce 程序中为给定条件实现分区器。

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

将上述代码保存为“/home/hadoop/hadoopPartitioner”中的PartitionerExample.java下面给出程序的编译和执行。

编译和执行

假设我们位于 Hadoop 用户的主目录中(例如,/home/hadoop)。

按照下面给出的步骤编译和执行上述程序。

步骤 1 – 下载 Hadoop-core-1.2.1.jar,用于编译和执行 MapReduce 程序。您可以从mvnrepository.com下载 jar

让我们假设下载的文件夹是“/home/hadoop/hadoopPartitioner”

步骤 2 – 以下命令用于编译程序PartitionerExample.java并为程序创建一个 jar。

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .

步骤 3 – 使用以下命令在 HDFS 中创建输入目录。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 4 – 使用以下命令将名为input.txt的输入文件复制到HDFS 的输入目录中。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

步骤 5 – 使用以下命令验证输入目录中的文件。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

步骤 6 – 使用以下命令通过从输入目录中获取输入文件来运行最高工资应用程序。

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

稍等片刻,直到文件被执行。执行后,输出包含多个输入拆分、map 任务和 Reducer 任务。

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

步骤 7 – 使用以下命令验证输出文件夹中的结果文件。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

您将在三个文件中找到输出,因为您在程序中使用了三个分区器和三个减速器。

步骤 8 – 使用以下命令查看Part-00000文件中的输出该文件由 HDFS 生成。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Part-00000 中的输出

Female   15000
Male     40000

使用以下命令查看Part-00001文件中的输出

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Part-00001 中的输出

Female   35000
Male    31000

使用以下命令查看Part-00002文件中的输出

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Part-00002中的输出

Female  51000
Male   50000

MapReduce – 组合器

组合器,也称为半简化器,是一个可选类,它通过接受来自 Map 类的输入然后将输出键值对传递给 Reducer 类来进行操作。

组合器的主要功能是汇总具有相同键的地图输出记录。组合器的输出(键值集合)将通过网络发送到实际的 Reducer 任务作为输入。

合路器

在 Map 类和 Reduce 类之间使用了 Combiner 类,以减少 Map 和 Reduce 之间的数据传输量。通常,map任务的输出量大,传输给reduce任务的数据量大。

下面的 MapReduce 任务图显示了组合器阶段。

合路器

合路器如何工作?

以下是 MapReduce Combiner 如何工作的简要总结 –

  • 组合器没有预定义的接口,它必须实现 Reducer 接口的 reduce() 方法。

  • 组合器对每个映射输出键进行操作。它必须具有与 Reducer 类相同的输出键值类型。

  • 组合器可以从大型数据集中生成摘要信息,因为它替换了原始 Map 输出。

虽然,Combiner 是可选的,但它有助于将数据分成多个组以用于 Reduce 阶段,从而更易于处理。

MapReduce 组合器实现

以下示例提供了有关组合器的理论思想。让我们假设我们有以下名为input.txt的 MapReduce输入文本文件

What do you mean by Object
What do you know about Java
What is Java Virtual Machine
How Java enabled High Performance

下面讨论了 MapReduce 程序与 Combiner 的重要阶段。

记录阅读器

这是 MapReduce 的第一阶段,记录阅读器从输入文本文件中读取每一行作为文本,并以键值对的形式产生输出。

输入– 来自输入文件的逐行文本。

输出– 形成键值对。以下是预期的键值对集。

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

地图阶段

Map 阶段从 Record Reader 获取输入,对其进行处理,并将输出作为另一组键值对产生。

输入– 以下键值对是从记录阅读器中获取的输入。

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

Map 阶段读取每个键值对,使用 StringTokenizer 从值中划分每个单词,将每个单词视为键并将该单词的计数作为值。以下代码片段显示了 Mapper 类和 map 函数。

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
   private final static IntWritable one = new IntWritable(1);
   private Text word = new Text();
   
   public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
   {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) 
      {
         word.set(itr.nextToken());
         context.write(word, one);
      }
   }
}

输出– 预期输出如下 –

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>

合路相

组合器阶段从 Map 阶段获取每个键值对,对其进行处理,并将输出生成为键值集合对。

Input – 以下键值对是从 Map 阶段获取的输入。

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>

组合器阶段读取每个键值对,将常用词组合为键,值组合为集合。通常,Combiner 的代码和操作类似于 Reducer 的代码和操作。以下是 Mapper、Combiner 和 Reducer 类声明的代码片段。

job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);

输出– 预期输出如下 –

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

减速阶段

Reducer 阶段从组合器阶段获取每个键值集合对,对其进行处理,并将输出作为键值对传递。请注意,Combiner 功能与 Reducer 相同。

输入– 以下键值对是从组合器阶段获取的输入。

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

Reducer 阶段读取每个键值对。以下是组合器的代码片段。

public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> 
{
   private IntWritable result = new IntWritable();
   
   public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException 
   {
      int sum = 0;
      for (IntWritable val : values) 
      {
         sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
   }
}

输出– Reducer 阶段的预期输出如下 –

<What,3> <do,2> <you,2> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,3>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

唱片作家

这是 MapReduce 的最后一个阶段,其中 Record Writer 写入来自 Reducer 阶段的每个键值对,并将输出作为文本发送。

Input – 来自 Reducer 阶段的每个键值对以及输出格式。

输出– 它为您提供文本格式的键值对。以下是预期的输出。

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1

示例程序

以下代码块计算程序中的字数。

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
   public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
   {
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();
      
      public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
      {
         StringTokenizer itr = new StringTokenizer(value.toString());
         while (itr.hasMoreTokens()) 
         {
            word.set(itr.nextToken());
            context.write(word, one);
         }
      }
   }
   
   public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> 
   {
      private IntWritable result = new IntWritable();
      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException 
      {
         int sum = 0;
         for (IntWritable val : values) 
         {
            sum += val.get();
         }
         result.set(sum);
         context.write(key, result);
      }
   }
   
   public static void main(String[] args) throws Exception 
   {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "word count");
		
      job.setJarByClass(WordCount.class);
      job.setMapperClass(TokenizerMapper.class);
      job.setCombinerClass(IntSumReducer.class);
      job.setReducerClass(IntSumReducer.class);
		
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
		
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
      System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}

将上述程序另存为WordCount.java下面给出程序的编译和执行。

编译和执行

假设我们位于 Hadoop 用户的主目录中(例如,/home/hadoop)。

按照下面给出的步骤编译和执行上述程序。

步骤 1 – 使用以下命令创建一个目录来存储已编译的 java 类。

$ mkdir units

步骤 2 – 下载 Hadoop-core-1.2.1.jar,用于编译和执行 MapReduce 程序。您可以从mvnrepository.com下载 jar

让我们假设下载的文件夹是 /home/hadoop/。

步骤 3 – 使用以下命令编译WordCount.java程序并为该程序创建一个 jar。

$ javac -classpath hadoop-core-1.2.1.jar -d units WordCount.java
$ jar -cvf units.jar -C units/ .

步骤 4 – 使用以下命令在 HDFS 中创建输入目录。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 – 使用以下命令将名为input.txt的输入文件复制到HDFS 的输入目录中。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/input.txt input_dir

步骤 6 – 使用以下命令验证输入目录中的文件。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

步骤 7 – 使用以下命令通过从输入目录中获取输入文件来运行字数统计应用程序。

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

稍等片刻,直到文件被执行。执行后,输出包含多个输入拆分、Map 任务和 Reducer 任务。

步骤 8 – 使用以下命令验证输出文件夹中的结果文件。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

步骤 9 – 使用以下命令查看Part-00000文件中的输出该文件由 HDFS 生成。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

以下是 MapReduce 程序生成的输出。

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1

MapReduce – Hadoop 管理

本章介绍 Hadoop 管理,包括 HDFS 和 MapReduce 管理。

  • HDFS 管理包括监控 HDFS 文件结构、位置和更新的文件。

  • MapReduce 管理包括监控应用程序列表、节点配置、应用程序状态等。

HDFS监控

HDFS(Hadoop分布式文件系统)包含用户目录、输入文件和输出文件。使用 MapReduce 命令putget进行存储和检索。

通过在“/$HADOOP_HOME/sbin”上传递命令“start-all.sh”启动Hadoop框架(守护进程)后,将以下URL传递给浏览器“http://localhost:50070”。您应该会在浏览器上看到以下屏幕。

以下屏幕截图显示了如何浏览浏览 HDFS。

HDFS监控

以下屏幕截图显示了 HDFS 的文件结构。它显示了“/user/hadoop”目录中的文件。

HDFS 文件

以下屏幕截图显示了集群中的 Datanode 信息。在这里您可以找到一个节点及其配置和容量。

达诺达资讯

MapReduce 作业监控

MapReduce 应用程序是一组作业(Map 作业、Combiner、Partitioner 和 Reduce 作业)。必须监控和维护以下内容 –

  • 配置适合应用的数据节点。
  • 每个应用程序使用的数据节点和资源的数量。

为了监控所有这些事情,我们必须有一个用户界面。通过在“/$HADOOP_HOME/sbin”上传递命令“start-all.sh”启动Hadoop框架后,将以下URL传递给浏览器“http://localhost:8080”。您应该会在浏览器上看到以下屏幕。

作业监控

在上面的屏幕截图中,手形指针位于应用程序 ID 上。只需单击它即可在浏览器上找到以下屏幕。它描述了以下内容 –

  • 当前应用程序在哪个用户上运行

  • 应用名称

  • 该应用程序的类型

  • 当前状态,最终状态

  • 应用程序启动时间,已用时间(完成时间),如果在监控时已完成

  • 此应用程序的历史记录,即日志信息

  • 最后是节点信息,即参与运行应用程序的节点。

以下屏幕截图显示了特定应用程序的详细信息 –

应用程序编号

以下屏幕截图描述了当前运行的节点信息。这里,截图只包含一个节点。手形指针显示正在运行的节点的本地主机地址。

所有节点

觉得文章有用?

点个广告表达一下你的爱意吧 !😁