Hadoop – 快速指南

Hadoop – 快速指南


Hadoop – 大数据概述

“世界上 90% 的数据都是在过去几年中产生的。”

由于新技术、新设备和社交网站等通信手段的出现,人类产生的数据量每年都在快速增长。我们从一开始到 2003 年产生的数据量是 50 亿 GB。如果您以磁盘的形式堆积数据,它可能会填满整个足球场。2011 年每两天创建相同数量2013 年每十分钟创建相同数量这个速度还在飞速增长。尽管产生的所有这些信息都是有意义的并且在处理时可能有用,但它却被忽视了。

什么是大数据?

大数据是无法使用传统计算技术处理的大型数据集的集合。它不是一个单一的技术或工具,而是一个完整的主题,它涉及到各种工具、技术和框架。

什么是大数据?

大数据涉及由不同设备和应用程序产生的数据。下面给出了大数据保护下的一些领域。

  • 黑匣子数据– 它是直升机、飞机和喷气式飞机等的组成部分。它捕获机组人员的声音、麦克风和耳机的录音以及飞机的性能信息。

  • 社交媒体数据– Facebook 和 Twitter 等社交媒体拥有全球数百万人发布的信息和观点。

  • 证券交易所数据– 证券交易所数据保存有关客户对不同公司的股份做出的“买入”和“卖出”决策的信息。

  • 电网数据– 电网数据保存特定节点相对于基站消耗的信息。

  • 运输数据– 运输数据包括车辆的型号、容量、距离和可用性。

  • 搜索引擎数据– 搜索引擎从不同的数据库中检索大量数据。

大数据

因此,大数据包括海量、高速和可扩展的各种数据。其中的数据将分为三种类型。

  • 结构化数据– 关系数据。

  • 半结构化数据– XML 数据。

  • 非结构化数据– Word、PDF、文本、媒体日志。

大数据的好处

  • 使用 Facebook 等社交网络中保存的信息,营销机构正在了解对其活动、促销和其他广告媒体的反应。

  • 使用社交媒体中的信息,如消费者的偏好和产品认知,产品公司和零售组织正在计划他们的生产。

  • 使用有关患者既往病史的数据,医院正在提供更好、更快捷的服务。

大数据技术

大数据技术在提供更准确的分析方面很重要,这可能会导致更具体的决策,从而提高运营效率、降低成本并降低业务风险。

为了利用大数据的力量,您需要一个能够实时管理和处理大量结构化和非结构化数据并能够保护数据隐私和安全的基础设施。

市场上有来自不同供应商(包括亚马逊、IBM、微软等)的各种技术来处理大数据。在研究处理大数据的技术时,我们研究了以下两类技术 –

运营大数据

这包括像 MongoDB 这样的系统,它们为主要捕获和存储数据的实时交互式工作负载提供操作功能。

NoSQL 大数据系统旨在利用过去十年出现的新云计算架构,以低成本、高效地运行大规模计算。这使得运营大数据工作负载更易于管理、成本更低且实施速度更快。

一些 NoSQL 系统可以基于实时数据以最少的编码提供对模式和趋势的洞察,而无需数据科学家和额外的基础设施。

分析大数据

其中包括大规模并行处理 (MPP) 数据库系统和 MapReduce 等系统,它们为可能涉及大部分或全部数据的回顾性和复杂分析提供分析功能。

MapReduce 提供了一种新的数据分析方法,补充了 SQL 提供的能力,以及一个基于 MapReduce 的系统,可以从单台服务器扩展到数千台高低端机器。

这两类技术是互补的,并且经常一起部署。

操作与分析系统

操作 分析型
Latency 1 毫秒 – 100 毫秒 1 分钟 – 100 分钟
Concurrency 1000 – 100,000 1 – 10
Access Pattern 写入和读取 读取
Queries 可选择的 非选择性
Data Scope 操作 回顾
End User 顾客 数据科学家
Technology 无SQL MapReduce,MPP 数据库

大数据挑战

与大数据相关的主要挑战如下 –

  • 捕获数据
  • 策展
  • 贮存
  • 搜索
  • 分享
  • 转移
  • 分析
  • 介绍

为了应对上述挑战,组织通常会借助企业服务器。

Hadoop – 大数据解决方案

传统方法

在这种方法中,企业将拥有一台计算机来存储和处理大数据。出于存储目的,程序员将借助他们选择的数据库供应商(例如 Oracle、IBM 等)的帮助。在这种方法中,用户与应用程序进行交互,应用程序依次处理数据存储和分析部分。

大数据传统方法

局限性

这种方法适用于处理标准数据库服务器可以容纳的海量数据的应用程序,或者处理数据的处理器的限制。但是当涉及到处理大量可扩展的数据时,通过单一的数据库瓶颈来处理这些数据是一项繁重的任务。

谷歌的解决方案

谷歌使用一种叫做 MapReduce 的算法解决了这个问题。该算法将任务分成小部分,并将它们分配给多台计算机,并从它们中收集结果,整合后形成结果数据集。

Google MapReduce

Hadoop

使用 Google 提供的解决方案,Doug Cutting和他的团队开发了一个名为HADOOP 的开源项目

Hadoop 使用 MapReduce 算法运行应用程序,其中数据与其他数据并行处理。简而言之,Hadoop 用于开发可以对海量数据进行完整统计分析的应用程序。

Hadoop框架

Hadoop – 简介

Hadoop 是一个用 Java 编写的 Apache 开源框架,它允许使用简单的编程模型跨计算机集群分布式处理大型数据集。Hadoop 框架应用程序在提供跨计算机集群的分布式存储计算的环境中工作Hadoop 旨在从单个服务器扩展到数千台机器,每台机器都提供本地计算和存储。

Hadoop架构

Hadoop 的核心有两个主要层,即 –

  • 处理/计算层(MapReduce),以及
  • 存储层(Hadoop 分布式文件系统)。

Hadoop架构

映射简化

MapReduce 是一种并行编程模型,用于编写由 Google 设计的分布式应用程序,用于在商用硬件的大型集群(数千个节点)上以可靠、容错的方式高效处理大量数据(多 TB 数据集)。MapReduce 程序在 Hadoop 上运行,Hadoop 是一个 Apache 开源框架。

Hadoop分布式文件系统

Hadoop 分布式文件系统 (HDFS) 基于 Google 文件系统 (GFS) 并提供了一个分布式文件系统,该系统旨在运行在商品硬件上。它与现有的分布式文件系统有很多相似之处。但是,与其他分布式文件系统的区别是显着的。它具有高度容错性,旨在部署在低成本硬件上。它提供对应用程序数据的高吞吐量访问,适用于具有大型数据集的应用程序。

除了上述两个核心组件外,Hadoop 框架还包括以下两个模块 –

  • Hadoop Common – 这些是其他 Hadoop 模块所需的 Java 库和实用程序。

  • Hadoop YARN – 这是一个用于作业调度和集群资源管理的框架。

Hadoop 是如何工作的?

构建具有处理大规模处理的繁重配置的大型服务器是相当昂贵的,但作为替代方案,您可以将许多具有单 CPU 的商用计算机捆绑在一起,作为一个单一功能的分布式系统,实际上,集群机器可以读取数据集并提供更高的吞吐量。此外,它比一台高端服务器便宜。所以这是使用 Hadoop 背后的第一个激励因素,它在集群和低成本机器上运行。

Hadoop 跨计算机集群运行代码。此过程包括 Hadoop 执行的以下核心任务 –

  • 数据最初分为目录和文件。文件被分为统一大小的 128M 和 64M(最好是 128M)的块。

  • 然后将这些文件分布在各个集群节点上以供进一步处理。

  • HDFS 位于本地文件系统之上,负责监督处理过程。

  • 复制块以处理硬件故障。

  • 检查代码是否成功执行。

  • 执行在 map 和 reduce 阶段之间发生的排序。

  • 将排序后的数据发送到某台计算机。

  • 为每个作业编写调试日志。

Hadoop的优势

  • Hadoop 框架允许用户快速编写和测试分布式系统。它是高效的,它在机器之间自动分配数据和工作,进而利用 CPU 内核的底层并行性。

  • Hadoop 不依赖于硬件来提供容错和高可用性 (FTHA),而是 Hadoop 库本身旨在检测和处理应用层的故障。

  • 服务器可以动态地从集群中添加或删除,Hadoop 继续运行而不会中断。

  • Hadoop 的另一大优势是,除了开源之外,它还兼容所有平台,因为它是基于 Java 的。

Hadoop – 环境设置

Hadoop 受 GNU/Linux 平台及其风格的支持。因此,我们必须安装一个 Linux 操作系统来搭建 Hadoop 环境。如果您有 Linux 以外的操作系统,您可以在其中安装 Virtualbox 软件并在 Virtualbox 中安装 Linux。

预安装设置

在将 Hadoop 安装到 Linux 环境之前,我们需要使用ssh(Secure Shell)设置 Linux 按照下面给出的步骤设置 Linux 环境。

创建用户

一开始,建议为 Hadoop 创建一个单独的用户,将 Hadoop 文件系统与 Unix 文件系统隔离。按照下面给出的步骤创建用户 –

  • 使用命令“su”打开根目录。

  • 使用命令“useradd username”从 root 帐户创建一个用户。

  • 现在您可以使用命令“su username”打开一个现有的用户帐户。

打开 Linux 终端并键入以下命令以创建用户。

$ su 
   password: 
# useradd hadoop 
# passwd hadoop 
   New passwd: 
   Retype new passwd 

SSH 设置和密钥生成

SSH 设置需要在集群上执行不同的操作,例如启动、停止、分布式守护进程 shell 操作。为了对Hadoop的不同用户进行身份验证,需要为一个Hadoop用户提供公钥/私钥对,并与不同的用户共享。

以下命令用于使用 SSH 生成键值对。将id_rsa.pub中的公钥复制到authorized_keys中,分别为owner提供对authorized_keys文件的读写权限。

$ ssh-keygen -t rsa 
$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys 
$ chmod 0600 ~/.ssh/authorized_keys 

安装 Java

Java 是 Hadoop 的主要先决条件。首先,您应该使用命令“java -version”验证系统中是否存在 java。java version 命令的语法如下。

$ java -version 

如果一切正常,它将为您提供以下输出。

java version "1.7.0_71" 
Java(TM) SE Runtime Environment (build 1.7.0_71-b13) 
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)  

如果您的系统中未安装 java,请按照下面给出的步骤安装 java。

第1步

访问以下链接www.oracle.com下载 java (JDK <最新版本> – X64.tar.gz)

然后jdk-7u71-linux-x64.tar.gz将被下载到您的系统中。

第2步

通常,您会在 Downloads 文件夹中找到下载的 java 文件。验证它并使用以下命令提取jdk-7u71-linux-x64.gz文件。

$ cd Downloads/ 
$ ls 
jdk-7u71-linux-x64.gz 

$ tar zxf jdk-7u71-linux-x64.gz 
$ ls 
jdk1.7.0_71   jdk-7u71-linux-x64.gz 

第 3 步

要使所有用户都可以使用 java,您必须将其移动到“/usr/local/”位置。打开root,输入以下命令。

$ su 
password: 
# mv jdk1.7.0_71 /usr/local/ 
# exit 

第四步

要设置PATHJAVA_HOME变量,请将以下命令添加到~/.bashrc文件中。

export JAVA_HOME=/usr/local/jdk1.7.0_71 
export PATH=$PATH:$JAVA_HOME/bin 

现在将所有更改应用到当前运行的系统中。

$ source ~/.bashrc

第 5 步

使用以下命令来配置 java 替代品 –

# alternatives --install /usr/bin/java java usr/local/java/bin/java 2
# alternatives --install /usr/bin/javac javac usr/local/java/bin/javac 2
# alternatives --install /usr/bin/jar jar usr/local/java/bin/jar 2

# alternatives --set java usr/local/java/bin/java
# alternatives --set javac usr/local/java/bin/javac
# alternatives --set jar usr/local/java/bin/jar

现在从终端验证 java -version 命令,如上所述。

下载 Hadoop

使用以下命令从 Apache 软件基金会下载并提取 Hadoop 2.4.1。

$ su 
password: 
# cd /usr/local 
# wget http://apache.claz.org/hadoop/common/hadoop-2.4.1/ 
hadoop-2.4.1.tar.gz 
# tar xzf hadoop-2.4.1.tar.gz 
# mv hadoop-2.4.1/* to hadoop/ 
# exit 

Hadoop 操作模式

下载 Hadoop 后,您可以在三种支持的模式之一中操作您的 Hadoop 集群 –

  • 本地/独立模式– 在您的系统中下载 Hadoop 后,默认情况下,它配置为独立模式,可以作为单个 Java 进程运行。

  • 伪分布式模式– 它是单机上的分布式模拟。每个 Hadoop 守护进程,例如 hdfs、yarn、MapReduce 等,都将作为单独的 java 进程运行。这种模式对开发很有用。

  • 完全分布式模式– 这种模式是完全分布式的,至少有两台或更多台机器作为一个集群。我们将在接下来的章节中详细介绍这种模式。

以独立模式安装 Hadoop

在这里,我们将讨论以独立模式安装Hadoop 2.4.1

没有守护进程在运行,一切都在单个 JVM 中运行。独立模式适合在开发过程中运行 MapReduce 程序,因为它易于测试和调试。

设置 Hadoop

您可以通过将以下命令附加到~/.bashrc文件来设置 Hadoop 环境变量

export HADOOP_HOME=/usr/local/hadoop 

在继续之前,您需要确保 Hadoop 工作正常。只需发出以下命令 –

$ hadoop version 

如果您的设置一切正常,那么您应该会看到以下结果 –

Hadoop 2.4.1 
Subversion https://svn.apache.org/repos/asf/hadoop/common -r 1529768 
Compiled by hortonmu on 2013-10-07T06:28Z 
Compiled with protoc 2.5.0
From source with checksum 79e53ce7994d1628b240f09af91e1af4 

这意味着您的 Hadoop 的独立模式设置工作正常。默认情况下,Hadoop 配置为在单台机器上以非分布式模式运行。

例子

让我们检查一个简单的 Hadoop 示例。Hadoop 安装提供以下示例 MapReduce jar 文件,它提供 MapReduce 的基本功能,可用于计算,如 Pi 值、给定文件列表中的字数等。

$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar 

让我们有一个输入目录,我们将在其中推送一些文件,我们的要求是计算这些文件中的单词总数。要计算总字数,我们不需要编写 MapReduce,只要 .jar 文件包含字数统计的实现。您可以使用相同的 .jar 文件尝试其他示例;只需发出以下命令即可通过 hadoop-mapreduce-examples-2.2.0.jar 文件检查支持的 MapReduce 功能程序。

$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduceexamples-2.2.0.jar 

第1步

在输入目录中创建临时内容文件。您可以在任何想要工作的地方创建此输入目录。

$ mkdir input 
$ cp $HADOOP_HOME/*.txt input 
$ ls -l input 

它将在您的输入目录中提供以下文件 –

total 24 
-rw-r--r-- 1 root root 15164 Feb 21 10:14 LICENSE.txt 
-rw-r--r-- 1 root root   101 Feb 21 10:14 NOTICE.txt
-rw-r--r-- 1 root root  1366 Feb 21 10:14 README.txt 

这些文件已从 Hadoop 安装主目录复制。对于您的实验,您可以拥有不同的大型文件集。

第2步

让我们启动 Hadoop 进程来计算输入目录中所有可用文件中的单词总数,如下所示 –

$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduceexamples-2.2.0.jar  wordcount input output 

第 3 步

步骤 2 将进行所需的处理并将输出保存在 output/part-r00000 文件中,您可以使用以下命令进行检查 –

$cat output/* 

它将列出所有单词及其在输入目录中所有可用文件中可用的总数。

"AS      4 
"Contribution" 1 
"Contributor" 1 
"Derivative 1
"Legal 1
"License"      1
"License");     1 
"Licensor"      1
"NOTICE”        1 
"Not      1 
"Object"        1 
"Source”        1 
"Work”    1 
"You"     1 
"Your")   1 
"[]"      1 
"control"       1 
"printed        1 
"submitted"     1 
(50%)     1 
(BIS),    1 
(C)       1 
(Don't)   1 
(ECCN)    1 
(INCLUDING      2 
(INCLUDING,     2 
.............

以伪分布式模式安装Hadoop

按照下面给出的步骤以伪分布式模式安装 Hadoop 2.4.1。

第 1 步 – 设置 Hadoop

您可以通过将以下命令附加到~/.bashrc文件来设置 Hadoop 环境变量

export HADOOP_HOME=/usr/local/hadoop 
export HADOOP_MAPRED_HOME=$HADOOP_HOME 
export HADOOP_COMMON_HOME=$HADOOP_HOME 

export HADOOP_HDFS_HOME=$HADOOP_HOME 
export YARN_HOME=$HADOOP_HOME 
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native 
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin 
export HADOOP_INSTALL=$HADOOP_HOME 

现在将所有更改应用到当前运行的系统中。

$ source ~/.bashrc 

第 2 步 – Hadoop 配置

您可以在“$HADOOP_HOME/etc/hadoop”位置找到所有 Hadoop 配置文件。需要根据您的 Hadoop 基础架构对这些配置文件进行更改。

$ cd $HADOOP_HOME/etc/hadoop

为了用java开发Hadoop程序,您必须通过将JAVA_HOME替换为您系统中java的位置来重置hadoop-env.sh文件中的java环境变量

export JAVA_HOME=/usr/local/jdk1.7.0_71

以下是您必须编辑以配置 Hadoop 的文件列表。

核心站点.xml

芯-的site.xml文件包含信息,诸如读/写缓冲器的用于Hadoop的实例的端口号,分配给文件系统的存储器,存储器限制,用于存储数据,和大小。

打开 core-site.xml 并在 <configuration>、</configuration> 标记之间添加以下属性。

<configuration>
   <property>
      <name>fs.default.name</name>
      <value>hdfs://localhost:9000</value> 
   </property>
</configuration>

hdfs-site.xml

HDFS-的site.xml文件中包含的信息,如复制数据的价值,名称节点的路径,你的本地文件系统的数据节点的路径。它意味着您要存储 Hadoop 基础架构的地方。

让我们假设以下数据。

dfs.replication (data replication value) = 1 

(In the below given path /hadoop/ is the user name. 
hadoopinfra/hdfs/namenode is the directory created by hdfs file system.) 
namenode path = //home/hadoop/hadoopinfra/hdfs/namenode 

(hadoopinfra/hdfs/datanode is the directory created by hdfs file system.) 
datanode path = //home/hadoop/hadoopinfra/hdfs/datanode 

打开此文件并在此文件的 <configuration> </configuration> 标记之间添加以下属性。

<configuration>
   <property>
      <name>dfs.replication</name>
      <value>1</value>
   </property>
    
   <property>
      <name>dfs.name.dir</name>
      <value>file:///home/hadoop/hadoopinfra/hdfs/namenode </value>
   </property>
    
   <property>
      <name>dfs.data.dir</name> 
      <value>file:///home/hadoop/hadoopinfra/hdfs/datanode </value> 
   </property>
</configuration>

注意– 在上面的文件中,所有属性值都是用户定义的,您可以根据您的 Hadoop 基础架构进行更改。

纱线站点.xml

该文件用于将 yarn 配置到 Hadoop 中。打开 yarn-site.xml 文件并在此文件的 <configuration>、</configuration> 标记之间添加以下属性。

<configuration>
   <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle</value> 
   </property>
</configuration>

mapred-site.xml

该文件用于指定我们使用的 MapReduce 框架。默认情况下,Hadoop 包含一个yarn-site.xml 模板。首先,需要使用以下命令将文件从mapred-site.xml.template复制mapred-site.xml文件。

$ cp mapred-site.xml.template mapred-site.xml 

打开mapred-site.xml文件并在此文件的 <configuration>、</configuration> 标记之间添加以下属性。

<configuration>
   <property> 
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
   </property>
</configuration>

验证 Hadoop 安装

以下步骤用于验证 Hadoop 安装。

步骤 1 – 名称节点设置

使用命令“hdfs namenode -format”设置namenode,如下所示。

$ cd ~ 
$ hdfs namenode -format 

预期结果如下。

10/24/14 21:30:55 INFO namenode.NameNode: STARTUP_MSG: 
/************************************************************ 
STARTUP_MSG: Starting NameNode 
STARTUP_MSG:   host = localhost/192.168.1.11 
STARTUP_MSG:   args = [-format] 
STARTUP_MSG:   version = 2.4.1 
...
...
10/24/14 21:30:56 INFO common.Storage: Storage directory 
/home/hadoop/hadoopinfra/hdfs/namenode has been successfully formatted. 
10/24/14 21:30:56 INFO namenode.NNStorageRetentionManager: Going to 
retain 1 images with txid >= 0 
10/24/14 21:30:56 INFO util.ExitUtil: Exiting with status 0 
10/24/14 21:30:56 INFO namenode.NameNode: SHUTDOWN_MSG: 
/************************************************************ 
SHUTDOWN_MSG: Shutting down NameNode at localhost/192.168.1.11 
************************************************************/

第 2 步 – 验证 Hadoop dfs

以下命令用于启动dfs。执行此命令将启动您的 Hadoop 文件系统。

$ start-dfs.sh 

预期输出如下 –

10/24/14 21:37:56 
Starting namenodes on [localhost] 
localhost: starting namenode, logging to /home/hadoop/hadoop
2.4.1/logs/hadoop-hadoop-namenode-localhost.out 
localhost: starting datanode, logging to /home/hadoop/hadoop
2.4.1/logs/hadoop-hadoop-datanode-localhost.out 
Starting secondary namenodes [0.0.0.0]

第 3 步 – 验证纱线脚本

以下命令用于启动纱线脚本。执行此命令将启动您的纱线守护进程。

$ start-yarn.sh 

预期输出如下 –

starting yarn daemons 
starting resourcemanager, logging to /home/hadoop/hadoop
2.4.1/logs/yarn-hadoop-resourcemanager-localhost.out 
localhost: starting nodemanager, logging to /home/hadoop/hadoop
2.4.1/logs/yarn-hadoop-nodemanager-localhost.out 

第 4 步 – 在浏览器上访问 Hadoop

访问 Hadoop 的默认端口号是 50070。使用以下 url 在浏览器上获取 Hadoop 服务。

http://localhost:50070/

在浏览器上访问 Hadoop

步骤 5 – 验证集群的所有应用程序

访问集群所有应用的默认端口号是8088。使用下面的url访问这个服务。

http://localhost:8088/

Hadoop 应用集群

Hadoop – HDFS 概述

Hadoop 文件系统是使用分布式文件系统设计开发的。它在商品硬件上运行。与其他分布式系统不同,HDFS 具有高度容错性,并使用低成本硬件设计。

HDFS 拥有大量数据并提供更轻松的访问。为了存储如此庞大的数据,文件被存储在多台机器上。这些文件以冗余方式存储,以在发生故障时将系统从可能的数据丢失中拯救出来。HDFS 还使应用程序可用于并行处理。

HDFS的特点

  • 适用于分布式存储和处理。
  • Hadoop 提供了一个命令接口来与 HDFS 交互。
  • namenode和datanode内置的服务器帮助用户轻松查看集群状态。
  • 对文件系统数据的流式访问。
  • HDFS 提供文件权限和身份验证。

HDFS架构

下面给出了 Hadoop 文件系统的架构。

HDFS架构

HDFS遵循主从架构,具有以下要素。

名称节点

namenode 是包含 GNU/Linux 操作系统和 namenode 软件的商品硬件。它是一种可以在商品硬件上运行的软件。具有 namenode 的系统充当主服务器,它执行以下任务 –

  • 管理文件系统命名空间。

  • 规范客户端对文件的访问。

  • 它还执行文件系统操作,例如重命名、关闭和打开文件和目录。

数据节点

datanode 是具有 GNU/Linux 操作系统和 datanode 软件的商品硬件。对于集群中的每个节点(商品硬件/系统),都会有一个数据节点。这些节点管理其系统的数据存储。

  • 数据节点根据客户端请求对文件系统执行读写操作。

  • 它们还根据namenode的指令进行区块创建、删除、复制等操作。

堵塞

一般用户数据存储在HDFS的文件中。文件系统中的文件将被分成一个或多个段和/或存储在单个数据节点中。这些文件段称为块。换句话说,HDFS 可以读取或写入的最小数据量称为块。默认块大小为 64MB,但可以根据需要更改 HDFS 配置来增加。

HDFS 的目标

故障检测和恢复– 由于 HDFS 包含大量商品硬件,因此组件故障频繁。因此,HDFS 应该具有快速自动故障检测和恢复的机制。

巨大的数据集– HDFS 每个集群应该有数百个节点来管理具有巨大数据集的应用程序。

Hardware at data – 当计算发生在数据附近时,可以有效地完成请求的任务。特别是在涉及大量数据集的情况下,它减少了网络流量并增加了吞吐量。

Hadoop – HDFS 操作

启动 HDFS

最初你必须格式化配置的 HDFS 文件系统,打开 namenode(HDFS 服务器),并执行以下命令。

$ hadoop namenode -format 

格式化HDFS后,启动分布式文件系统。以下命令将启动名称节点以及数据节点作为集群。

$ start-dfs.sh 

列出 HDFS 中的文件

在服务器中加载信息后,我们可以使用‘ls’找到目录中的文件列表、文件的状态下面给出了ls的语法,您可以将其作为参数传递给目录或文件名。

$ $HADOOP_HOME/bin/hadoop fs -ls <args>

插入数据到 HDFS

假设我们在本地系统中名为 file.txt 的文件中有数据应该保存在 hdfs 文件系统中。按照下面给出的步骤在 Hadoop 文件系统中插入所需的文件。

第1步

您必须创建一个输入目录。

$ $HADOOP_HOME/bin/hadoop fs -mkdir /user/input 

第2步

使用 put 命令将数据文件从本地系统传输并存储到 Hadoop 文件系统。

$ $HADOOP_HOME/bin/hadoop fs -put /home/file.txt /user/input 

第 3 步

您可以使用 ls 命令验证文件。

$ $HADOOP_HOME/bin/hadoop fs -ls /user/input 

从 HDFS 检索数据

假设我们在 HDFS 中有一个名为outfile的文件下面给出了一个从 Hadoop 文件系统中检索所需文件的简单演示。

第1步

最初,使用cat命令查看来自 HDFS 的数据

$ $HADOOP_HOME/bin/hadoop fs -cat /user/output/outfile 

第2步

使用get命令从 HDFS 获取文件到本地文件系统

$ $HADOOP_HOME/bin/hadoop fs -get /user/output/ /home/hadoop_tp/ 

关闭 HDFS

您可以使用以下命令关闭 HDFS。

$ stop-dfs.sh 

Hadoop – 命令参考

还有很多更多的命令“$ HADOOP_HOME /斌/ Hadoop的FS”不是在这里表现出来,虽然这些基本的操作都将让你开始。不带附加参数运行 ./bin/hadoop dfs 将列出可以使用 FsShell 系统运行的所有命令。此外,如果您遇到困难$HADOOP_HOME/bin/hadoop fs -help commandName 将显示相关操作的简短使用摘要。

所有操作的表格如下所示。以下约定用于参数 –

"<path>" means any file or directory name. 
"<path>..." means one or more file or directory names. 
"<file>" means any filename. 
"<src>" and "<dest>" are path names in a directed operation. 
"<localSrc>" and "<localDest>" are paths as above, but on the local file system. 

所有其他文件和路径名都是指 HDFS 中的对象。

Sr.No 命令和描述
1

-ls <path>

列出由路径指定的目录的内容,显示每个条目的名称、权限、所有者、大小和修改日期。

2

-lsr <path>

行为类似于 -ls,但递归地显示路径的所有子目录中的条目。

3

-du <path>

显示所有匹配路径的文件的磁盘使用量,以字节为单位;文件名以完整的 HDFS 协议前缀报告。

4

-dus <path>

类似于 -du,但会打印路径中所有文件/目录的磁盘使用情况摘要。

5

-mv <src><dest>

将 src 指示的文件或目录移动到 HDFS 中的 dest。

6

-cp <src> <dest>

将 src 标识的文件或目录复制到 HDFS 中的 dest。

7

-rm <path>

删除由路径标识的文件或空目录。

8

-rmr <path>

删除由路径标识的文件或目录。递归删除任何子条目(即路径的文件或子目录)。

9

-put <localSrc> <dest>

将文件或目录从 localSrc 标识的本地文件系统复制到 DFS 中的 dest。

10

-copyFromLocal <localSrc> <dest>

与 -put 相同

11

-moveFromLocal <localSrc> <dest>

将文件或目录从 localSrc 标识的本地文件系统复制到 HDFS 中的 dest,然后在成功时删除本地副本。

12

-get [-crc] <src> <localDest>

将 src 标识的 HDFS 中的文件或目录复制到 localDest 标识的本地文件系统路径。

13

-getmerge <src> <localDest>

检索与 HDFS 中的路径 src 匹配的所有文件,并将它们复制到 localDest 标识的本地文件系统中的单个合并文件。

14

-cat <filen-ame>

在标准输出上显示文件名的内容。

15

-copyToLocal <src> <localDest>

与 -get 相同

16

-moveToLocal <src> <localDest>

像 -get 一样工作,但在成功时删除 HDFS 副本。

17

-mkdir <path>

在 HDFS 中创建一个名为 path 的目录。

在路径中创建任何缺失的父目录(例如,Linux 中的 mkdir -p)。

18

-setrep [-R] [-w] rep <path>

将路径标识的文件的目标复制因子设置为 rep。(实际复制因子会随着时间的推移向目标移动)

19

-touchz <path>

在包含当前时间作为时间戳的路径上创建一个文件。如果文件已存在于路径中,则失败,除非文件大小已为 0。

20

-test -[ezd] <path>

如果路径存在则返回 1;长度为零;或者是一个目录或者 0 否则。

21

-stat [format] <path>

打印有关路径的信息。格式是一个字符串,它接受以块为单位的文件大小 (%b)、文件名 (%n)、块大小 (%o)、复制 (%r) 和修改日期 (%y, %Y)。

22

-tail [-f] <file2name>

显示标准输出上的最后 1KB 文件。

23

-chmod [-R] mode,mode,… <path>…

更改与路径标识的一个或多个对象关联的文件权限…。使用 R 递归执行更改。模式是 3 位八进制模式,或 {augo}+/-{rwxX}。假设没有指定范围并且不应用 umask。

24

-chown [-R] [owner][:[group]] <path>…

为由路径标识的文件或目录设置所有者用户和/或组…。如果指定了 -R,则递归设置所有者。

25

-chgrp [-R] group <path>…

为由路径标识的文件或目录设置所属组…。如果指定了 -R,则递归设置组。

26

-help <cmd-name>

返回上面列出的命令之一的使用信息。您必须省略 cmd 中的前导“-”字符。

Hadoop – MapReduce

MapReduce 是一个框架,我们可以使用它编写应用程序,以可靠的方式在大型商用硬件集群上并行处理大量数据。

什么是 MapReduce?

MapReduce是一种基于java的分布式计算处理技术和程序模型。MapReduce 算法包含两个重要的任务,即 Map 和 Reduce。Map 获取一组数据并将其转换为另一组数据,其中单个元素被分解为元组(键/值对)。其次,reduce 任务,它将地图的输出作为输入,并将这些数据元组组合成更小的元组集合。正如 MapReduce 名称的顺序所暗示的那样,reduce 任务总是在 map 作业之后执行。

MapReduce 的主要优点是易于在多个计算节点上扩展数据处理。在 MapReduce 模型下,数据处理原语称为映射器和化简器。将数据处理应用程序分解为映射器和化器有时并非易事。但是,一旦我们以 MapReduce 形式编写应用程序,将应用程序扩展到在集群中运行数百、数千甚至数万台机器仅仅是配置更改。这种简单的可扩展性吸引了许多程序员使用 MapReduce 模型。

算法

  • 通常 MapReduce 范式是基于将计算机发送到数据所在的位置!

  • MapReduce 程序分三个阶段执行,分别是map 阶段、shuffle 阶段和reduce 阶段。

    • Map stage – 地图或映射器的工作是处理输入数据。通常输入数据以文件或目录的形式存储在Hadoop文件系统(HDFS)中。输入文件逐行传递给映射器函数。映射器处理数据并创建几个小数据块。

    • Reduce 阶段– 此阶段是Shuffle阶段和Reduce阶段的组合。Reducer 的工作是处理来自映射器的数据。处理后,它会产生一组新的输出,这些输出将存储在 HDFS 中。

  • 在 MapReduce 作业期间,Hadoop 将 Map 和 Reduce 任务发送到集群中的相应服务器。

  • 该框架管理数据传递的所有细节,例如发出任务、验证任务完成以及在节点之间的集群周围复制数据。

  • 大多数计算发生在具有本地磁盘上的数据的节点上,从而减少了网络流量。

  • 完成给定的任务后,集群收集并缩减数据以形成适当的结果,并将其发送回Hadoop服务器。

MapReduce 算法

输入和输出(Java 视角)

MapReduce 框架对 <key, value> 对进行操作,即框架将作业的输入视为一组 <key, value> 对,并产生一组 <key, value> 对作为作业的输出,可以想象有不同的类型。

键和值类应该由框架以序列化方式进行,因此需要实现 Writable 接口。此外,关键类必须实现 Writable-Comparable 接口以方便框架的排序。MapReduce 作业的输入和输出类型-(输入)<k1, v1> → map → <k2, v2> → reduce → <k3, v3>(输出)。

输入 输出
Map <k1​​, v1> 列表 (<k2, v2>)
Reduce <k2, 列表(v2)> 列表 (<k3, v3>)

术语

  • PayLoad – 应用程序实现 Map 和 Reduce 功能,并构成工作的核心。

  • Mapper – Mapper 将输入键/值对映射到一组中间键/值对。

  • NamedNode – 管理 Hadoop 分布式文件系统 (HDFS) 的节点。

  • DataNode – 在进行任何处理之前预先显示数据的节点。

  • MasterNode – JobTracker 运行并接受来自客户端的作业请求的节​​点。

  • SlaveNode – Map 和 Reduce 程序运行的节点。

  • JobTracker – 安排作业并跟踪分配作业到任务跟踪器。

  • 任务跟踪器– 跟踪任务并向 JobTracker 报告状态。

  • 作业– 程序是跨数据集执行 Mapper 和 Reducer。

  • 任务– 在数据切片上执行 Mapper 或 Reducer。

  • 任务尝试– 尝试在 SlaveNode 上执行任务的特定实例。

示例场景

下面给出了有关组织的电力消耗的数据。它包含每月的用电量和各年的年平均值。

二月 三月 四月 可能 七月 八月 九月 十月 十一月 十二月 平均
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

如果将上述数据作为输入,我们必须编写应用程序来处理它并产生诸如查找最大使用年份、最小使用年份等结果。对于记录数有限的程序员来说,这是一个过渡。他们将简单地编写逻辑以产生所需的输出,并将数据传递给编写的应用程序。

但是,想想代表特定国家所有大型工业自其形成以来的电力消耗的数据。

当我们编写应用程序来处理这样的大量数据时,

  • 它们将需要大量时间来执行。

  • 当我们将数据从源移动到网络服务器等时,将会有大量的网络流量。

为了解决这些问题,我们有了 MapReduce 框架。

输入数据

上述数据保存为sample.txt并作为输入给出。输入文件如下所示。

1979   23   23   2   43   24   25   26   26   26   26   25   26  25 
1980   26   27   28  28   28   30   31   31   31   30   30   30  29 
1981   31   32   32  32   33   34   35   36   36   34   34   34  34 
1984   39   38   39  39   39   41   42   43   40   39   38   38  40 
1985   38   39   39  39   39   41   41   41   00   40   39   39  45 

示例程序

下面给出了使用 MapReduce 框架的示例数据的程序。

package hadoop; 

import java.util.*; 

import java.io.IOException; 
import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class ProcessUnits {
   //Mapper class 
   public static class E_EMapper extends MapReduceBase implements 
   Mapper<LongWritable ,/*Input key Type */ 
   Text,                /*Input value Type*/ 
   Text,                /*Output key Type*/ 
   IntWritable>        /*Output value Type*/ 
   {
      //Map function 
      public void map(LongWritable key, Text value, 
      OutputCollector<Text, IntWritable> output,   
      
      Reporter reporter) throws IOException { 
         String line = value.toString(); 
         String lasttoken = null; 
         StringTokenizer s = new StringTokenizer(line,"\t"); 
         String year = s.nextToken(); 
         
         while(s.hasMoreTokens()) {
            lasttoken = s.nextToken();
         }
         int avgprice = Integer.parseInt(lasttoken); 
         output.collect(new Text(year), new IntWritable(avgprice)); 
      } 
   }
   
   //Reducer class 
   public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > {
   
      //Reduce function 
      public void reduce( Text key, Iterator <IntWritable> values, 
      OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
         int maxavg = 30; 
         int val = Integer.MIN_VALUE; 
            
         while (values.hasNext()) { 
            if((val = values.next().get())>maxavg) { 
               output.collect(key, new IntWritable(val)); 
            } 
         }
      } 
   }

   //Main function 
   public static void main(String args[])throws Exception { 
      JobConf conf = new JobConf(ProcessUnits.class); 
      
      conf.setJobName("max_eletricityunits"); 
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class); 
      conf.setMapperClass(E_EMapper.class); 
      conf.setCombinerClass(E_EReduce.class); 
      conf.setReducerClass(E_EReduce.class); 
      conf.setInputFormat(TextInputFormat.class); 
      conf.setOutputFormat(TextOutputFormat.class); 
      
      FileInputFormat.setInputPaths(conf, new Path(args[0])); 
      FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
      
      JobClient.runJob(conf); 
   } 
} 

将上述程序另存为ProcessUnits.java。下面解释程序的编译和执行。

工艺单元程序的编译与执行

让我们假设我们位于 Hadoop 用户的主目录中(例如 /home/hadoop)。

按照下面给出的步骤编译和执行上述程序。

第1步

下面的命令是创建一个目录来存放编译后的java类。

$ mkdir units 

第2步

下载Hadoop-core-1.2.1.jar,用于编译和执行MapReduce程序。访问以下链接mvnrepository.com下载 jar。让我们假设下载的文件夹是/home/hadoop/。

第 3 步

以下命令用于编译ProcessUnits.java程序并为该程序创建一个 jar。

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java 
$ jar -cvf units.jar -C units/ . 

第四步

以下命令用于在 HDFS 中创建输入目录。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir 

第 5 步

以下命令用于将名为sample.txt的输入文件复制到HDFS 的输入目录中。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir 

第 6 步

以下命令用于验证输入目录中的文件。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/ 

第 7 步

以下命令用于通过从输入目录中获取输入文件来运行 Eleunit_max 应用程序。

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir 

稍等片刻,直到文件被执行。执行后,如下图,输出将包含输入分割数、Map任务数、reducer任务数等。

INFO mapreduce.Job: Job job_1414748220717_0002 
completed successfully 
14/10/31 06:02:52 
INFO mapreduce.Job: Counters: 49 
   File System Counters 
 
FILE: Number of bytes read = 61 
FILE: Number of bytes written = 279400 
FILE: Number of read operations = 0 
FILE: Number of large read operations = 0   
FILE: Number of write operations = 0 
HDFS: Number of bytes read = 546 
HDFS: Number of bytes written = 40 
HDFS: Number of read operations = 9 
HDFS: Number of large read operations = 0 
HDFS: Number of write operations = 2 Job Counters 


   Launched map tasks = 2  
   Launched reduce tasks = 1 
   Data-local map tasks = 2  
   Total time spent by all maps in occupied slots (ms) = 146137 
   Total time spent by all reduces in occupied slots (ms) = 441   
   Total time spent by all map tasks (ms) = 14613 
   Total time spent by all reduce tasks (ms) = 44120 
   Total vcore-seconds taken by all map tasks = 146137 
   Total vcore-seconds taken by all reduce tasks = 44120 
   Total megabyte-seconds taken by all map tasks = 149644288 
   Total megabyte-seconds taken by all reduce tasks = 45178880 
   
Map-Reduce Framework 
 
   Map input records = 5  
   Map output records = 5   
   Map output bytes = 45  
   Map output materialized bytes = 67  
   Input split bytes = 208 
   Combine input records = 5  
   Combine output records = 5 
   Reduce input groups = 5  
   Reduce shuffle bytes = 6  
   Reduce input records = 5  
   Reduce output records = 5  
   Spilled Records = 10  
   Shuffled Maps  = 2  
   Failed Shuffles = 0  
   Merged Map outputs = 2  
   GC time elapsed (ms) = 948  
   CPU time spent (ms) = 5160  
   Physical memory (bytes) snapshot = 47749120  
   Virtual memory (bytes) snapshot = 2899349504  
   Total committed heap usage (bytes) = 277684224
     
File Output Format Counters 
 
   Bytes Written = 40 

第 8 步

以下命令用于验证输出文件夹中的结果文件。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/ 

步骤 9

以下命令用于查看Part-00000文件中的输出该文件由 HDFS 生成。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000 

下面是 MapReduce 程序生成的输出。

1981    34 
1984    40 
1985    45 

第 10 步

以下命令用于将输出文件夹从 HDFS 复制到本地文件系统进行分析。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop 

重要命令

所有 Hadoop 命令都由$HADOOP_HOME/bin/hadoop命令调用运行不带任何参数的 Hadoop 脚本会打印所有命令的描述。

用法– hadoop [–config confdir] 命令

下表列出了可用的选项及其说明。

Sr.No. 选项和说明
1

namenode -format

格式化 DFS 文件系统。

2

secondarynamenode

运行 DFS 辅助名称节点。

3

namenode

运行 DFS namenode。

4

datanode

运行 DFS 数据节点。

5

dfsadmin

运行 DFS 管理客户端。

6

mradmin

运行 Map-Reduce 管理客户端。

7

fsck

运行 DFS 文件系统检查实用程序。

8

fs

运行通用文件系统用户客户端。

9

balancer

运行集群平衡实用程序。

10

oiv

将离线 fsimage 查看器应用于 fsimage。

11

fetchdt

从 NameNode 获取委托令牌。

12

jobtracker

运行 MapReduce 作业跟踪器节点。

13

pipes

运行管道作业。

14

tasktracker

运行 MapReduce 任务跟踪器节点。

15

historyserver

将作业历史服务器作为独立的守护进程运行。

16

job

操作 MapReduce 作业。

17

queue

获取有关 JobQueue 的信息。

18

version

打印版本。

19

jar <jar>

运行一个 jar 文件。

20

distcp <srcurl> <desturl>

递归复制文件或目录。

21

distcp2 <srcurl> <desturl>

DistCp 版本 2。

22

archive -archiveName NAME -p <parent path> <src>* <dest>

创建一个 hadoop 存档。

23

classpath

打印获取 Hadoop jar 和所需库所需的类路径。

24

daemonlog

获取/设置每个守护进程的日志级别

如何与 MapReduce 作业交互

用法 – hadoop 作业 [GENERIC_OPTIONS]

以下是 Hadoop 作业中可用的通用选项。

Sr.No. GENERIC_OPTION & 描述
1

-submit <job-file>

提交作业。

2

-status <job-id>

打印地图并减少完成百分比和所有作业计数器。

3

-counter <job-id> <group-name> <countername>

打印计数器值。

4

-kill <job-id>

杀死工作。

5

-events <job-id> <fromevent-#> <#-of-events>

打印 jobtracker 在给定范围内收到的事件详细信息。

6

-history [all] <jobOutputDir> – history < jobOutputDir>

打印作业详细信息、失败和终止的提示详细信息。通过指定 [all] 选项,可以查看有关作业的更多详细信息,例如成功的任务和为每个任务所做的任务尝试。

7

-list[all]

显示所有作业。-list 仅显示尚未完成的作业。

8

-kill-task <task-id>

杀死任务。被杀死的任务不计入失败的尝试。

9

-fail-task <task-id>

任务失败。失败的任务被计入失败的尝试。

10

-set-priority <job-id> <priority>

更改作业的优先级。允许的优先级值为 VERY_HIGH、HIGH、NORMAL、LOW、VERY_LOW

查看作业状态

$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004 

查看作业输出目录的历史记录

$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output 

杀死工作

$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004 

Hadoop – 流媒体

Hadoop 流是 Hadoop 发行版附带的实用程序。此实用程序允许您使用任何可执行文件或脚本作为映射器和/或化简器来创建和运行 Map/Reduce 作业。

使用 Python 的示例

对于 Hadoop 流,我们正在考虑字数问题。Hadoop 中的任何作业都必须有两个阶段:mapper 和 reducer。我们已经在python脚本中为mapper和reducer编写了代码以在Hadoop下运行它。也可以用 Perl 和 Ruby 编写相同的内容。

映射器阶段代码

!/usr/bin/python

import sys

# Input takes from standard input for myline in sys.stdin: 
   # Remove whitespace either side 
   myline = myline.strip() 

   # Break the line into words 
   words = myline.split() 

   # Iterate the words list
   for myword in words:
      # Write the results to standard output 
      print '%s\t%s' % (myword, 1)

确保该文件具有执行权限(chmod +x /home/expert/hadoop-1.2.1/mapper.py)。

减速器阶段代码

#!/usr/bin/python

from operator import itemgetter 
import sys 

current_word = ""
current_count = 0 
word = "" 

# Input takes from standard input for myline in sys.stdin: 
   # Remove whitespace either side 
   myline = myline.strip() 

   # Split the input we got from mapper.py word, 
   count = myline.split('\t', 1) 

   # Convert count variable to integer 
   try: 
      count = int(count) 

   except ValueError: 
      # Count was not a number, so silently ignore this line continue

   if current_word == word: 
   current_count += count 
   else: 
      if current_word: 
         # Write result to standard output print '%s\t%s' % (current_word, current_count) 
   
      current_count = count
      current_word = word

# Do not forget to output the last word if needed! 
if current_word == word: 
   print '%s\t%s' % (current_word, current_count)

将 mapper 和 reducer 代码保存在 Hadoop 主目录中的 mapper.py 和 reducer.py 中。确保这些文件具有执行权限(chmod +x mapper.py 和 chmod +x reducer.py)。由于python对缩进敏感,因此可以从以下链接下载相同的代码。

WordCount 程序的执行

$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.
2.1.jar \
   -input input_dirs \ 
   -output output_dir \ 
   -mapper <path/mapper.py \ 
   -reducer <path/reducer.py

其中“\”用于行连续以获得清晰的可读性。

例如,

./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input myinput -output myoutput -mapper /home/expert/hadoop-1.2.1/mapper.py -reducer /home/expert/hadoop-1.2.1/reducer.py

流媒体的工作原理

在上面的例子中,mapper 和 reducer 都是 Python 脚本,它们从标准输入读取输入并将输出发送到标准输出。该实用程序将创建一个 Map/Reduce 作业,将作业提交到适当的集群,并监视作业的进度直到完成。

当为映射器指定脚本时,每个映射器任务将在映射器初始化时作为单独的进程启动脚本。当映射器任务运行时,它会将其输入转换为行并将这些行提供给流程的标准输入 (STDIN)。同时,映射器从进程的标准输出(STDOUT)中收集面向行的输出,并将每一行转换为键/值对,作为映射器的输出收集。默认情况下,直到第一个制表符的行的前缀是键,该行的其余部分(不包括制表符)将是值。如果行中没有制表符,则将整行视为键,值为空。但是,这可以根据需要进行定制。

当为reducer 指定脚本时,每个reducer 任务都会将脚本作为单独的进程启动,然后初始化reducer。当 reducer 任务运行时,它将其输入键/值对转换为行并将这些行提供给进程的标准输入 (STDIN)。同时,reducer 从进程的标准输出(STDOUT)中收集面向行的输出,将每一行转换成一个键/值对,作为reducer的输出收集。默认情况下,直到第一个制表符的行的前缀是键,该行的其余部分(不包括制表符)是值。但是,这可以根据特定要求进行定制。

重要命令

Parameters 选项 描述
-input directory/file-name 必需的 映射器的输入位置。
-output directory-name 必需的 减速机的输出位置。
-mapper executable or script or JavaClassName 必需的 映射器可执行文件。
-reducer executable or script or JavaClassName 必需的 减速器可执行文件。
-file file-name 可选的 使映射器、化简器或组合器可执行文件在计算节点上本地可用。
-inputformat JavaClassName 可选的 您提供的类应该返回 Text 类的键/值对。如果未指定,则使用 TextInputFormat 作为默认值。
-outputformat JavaClassName 可选的 您提供的类应该采用 Text 类的键/值对。如果未指定,则使用 TextOutputformat 作为默认值。
-partitioner JavaClassName 可选的 确定将密钥发送到哪个减少的类。
-combiner streamingCommand or JavaClassName 可选的 地图输出的组合器可执行文件。
-cmdenv name=value 可选的 将环境变量传递给流命令。
-inputreader 可选的 对于向后兼容性:指定记录读取器类(而不是输入格式类)。
-verbose 可选的 详细输出。
-lazyOutput 可选的 懒惰地创建输出。例如,如果输出格式基于 FileOutputFormat,则仅在第一次调用 output.collect(或 Context.write)时创建输出文件。
-numReduceTasks 可选的 指定减速器的数量。
-mapdebug 可选的 地图任务失败时调用的脚本。
-reducedebug 可选的 当减少任务失败时调用的脚本。

Hadoop – 多节点集群

本章解释了在分布式环境中 Hadoop 多节点集群的设置。

由于无法演示整个集群,我们用三个系统(一主两从)来解释Hadoop集群环境;下面给出的是它们的IP地址。

  • Hadoop Master: 192.168.1.15 (hadoop-master)
  • Hadoop 从站:192.168.1.16 (hadoop-slave-1)
  • Hadoop 从站:192.168.1.17 (hadoop-slave-2)

按照下面给出的步骤设置 Hadoop 多节点集群。

安装 Java

Java 是 Hadoop 的主要先决条件。首先,您应该使用“java -version”验证系统中是否存在java。java version 命令的语法如下。

$ java -version

如果一切正常,它将为您提供以下输出。

java version "1.7.0_71" 
Java(TM) SE Runtime Environment (build 1.7.0_71-b13) 
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

如果您的系统中未安装 java,请按照给定的步骤安装 java。

第1步

访问以下链接www.oracle.com下载 java (JDK <最新版本> – X64.tar.gz)

然后jdk-7u71-linux-x64.tar.gz将被下载到您的系统中。

第2步

通常,您会在 Downloads 文件夹中找到下载的 java 文件。验证它并使用以下命令提取jdk-7u71-linux-x64.gz文件。

$ cd Downloads/
$ ls
jdk-7u71-Linux-x64.gz

$ tar zxf jdk-7u71-Linux-x64.gz
$ ls
jdk1.7.0_71 jdk-7u71-Linux-x64.gz

第 3 步

要使所有用户都可以使用 java,您必须将其移动到“/usr/local/”位置。打开根目录,然后键入以下命令。

$ su
password:
# mv jdk1.7.0_71 /usr/local/
# exit

第四步

要设置PATHJAVA_HOME变量,请将以下命令添加到~/.bashrc文件中。

export JAVA_HOME=/usr/local/jdk1.7.0_71
export PATH=PATH:$JAVA_HOME/bin

现在从终端验证java -version命令,如上所述。按照上述过程并在所有集群节点中安装 java。

创建用户帐户

在主系统和从系统上创建系统用户帐户以使用 Hadoop 安装。

# useradd hadoop 
# passwd hadoop

映射节点

您必须在所有节点上的/etc/文件夹中编辑hosts文件,指定每个系统的 IP 地址,然后是它们的主机名。

# vi /etc/hosts
enter the following lines in the /etc/hosts file.

192.168.1.109 hadoop-master 
192.168.1.145 hadoop-slave-1 
192.168.56.1 hadoop-slave-2

配置基于密钥的登录

在每个节点中设置 ssh,以便它们可以在没有任何密码提示的情况下相互通信。

# su hadoop 
$ ssh-keygen -t rsa 
$ ssh-copy-id -i ~/.ssh/id_rsa.pub tutorialspoint@hadoop-master 
$ ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop_tp1@hadoop-slave-1 
$ ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop_tp2@hadoop-slave-2 
$ chmod 0600 ~/.ssh/authorized_keys 
$ exit

安装 Hadoop

在主服务器中,使用以下命令下载并安装 Hadoop。

# mkdir /opt/hadoop 
# cd /opt/hadoop/ 
# wget http://apache.mesi.com.ar/hadoop/common/hadoop-1.2.1/hadoop-1.2.0.tar.gz 
# tar -xzf hadoop-1.2.0.tar.gz 
# mv hadoop-1.2.0 hadoop
# chown -R hadoop /opt/hadoop 
# cd /opt/hadoop/hadoop/

配置 Hadoop

您必须通过进行以下更改来配置 Hadoop 服务器,如下所示。

核心站点.xml

打开core-site.xml文件并按如下所示进行编辑。

<configuration>
   <property> 
      <name>fs.default.name</name> 
      <value>hdfs://hadoop-master:9000/</value> 
   </property> 
   <property> 
      <name>dfs.permissions</name> 
      <value>false</value> 
   </property> 
</configuration>

hdfs-site.xml

打开hdfs-site.xml文件并进行如下编辑。

<configuration>
   <property> 
      <name>dfs.data.dir</name> 
      <value>/opt/hadoop/hadoop/dfs/name/data</value> 
      <final>true</final> 
   </property> 

   <property> 
      <name>dfs.name.dir</name> 
      <value>/opt/hadoop/hadoop/dfs/name</value> 
      <final>true</final> 
   </property> 

   <property> 
      <name>dfs.replication</name> 
      <value>1</value> 
   </property> 
</configuration>

mapred-site.xml

打开mapred-site.xml文件并按如下所示进行编辑。

<configuration>
   <property> 
      <name>mapred.job.tracker</name> 
      <value>hadoop-master:9001</value> 
   </property> 
</configuration>

hadoop-env.sh

打开hadoop-env.sh文件并编辑 JAVA_HOME、HADOOP_CONF_DIR 和 HADOOP_OPTS,如下所示。

注意– 根据您的系统配置设置 JAVA_HOME。

export JAVA_HOME=/opt/jdk1.7.0_17
export HADOOP_OPTS=-Djava.net.preferIPv4Stack=true
export HADOOP_CONF_DIR=/opt/hadoop/hadoop/conf

在从服务器上安装 Hadoop

按照给定的命令在所有从服务器上安装 Hadoop。

# su hadoop 
$ cd /opt/hadoop 
$ scp -r hadoop hadoop-slave-1:/opt/hadoop 
$ scp -r hadoop hadoop-slave-2:/opt/hadoop

在主服务器上配置 Hadoop

打开主服务器并按照给定的命令对其进行配置。

# su hadoop 
$ cd /opt/hadoop/hadoop

配置主节点

$ vi etc/hadoop/masters

hadoop-master

配置从节点

$ vi etc/hadoop/slaves

hadoop-slave-1 
hadoop-slave-2

在 Hadoop Master 上格式化名称节点

# su hadoop 
$ cd /opt/hadoop/hadoop 
$ bin/hadoop namenode –format
11/10/14 10:58:07 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************ 
STARTUP_MSG: Starting NameNode 
STARTUP_MSG: host = hadoop-master/192.168.1.109 
STARTUP_MSG: args = [-format] 
STARTUP_MSG: version = 1.2.0 
STARTUP_MSG: build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.2 -r 1479473;
compiled by 'hortonfo' on Mon May 6 06:59:37 UTC 2013 
STARTUP_MSG: java = 1.7.0_71 

************************************************************/
11/10/14 10:58:08 INFO util.GSet: Computing capacity for map BlocksMap
editlog=/opt/hadoop/hadoop/dfs/name/current/edits
………………………………………………….
………………………………………………….
…………………………………………………. 
11/10/14 10:58:08 INFO common.Storage: Storage directory 
/opt/hadoop/hadoop/dfs/name has been successfully formatted.
11/10/14 10:58:08 INFO namenode.NameNode: 
SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at hadoop-master/192.168.1.15
************************************************************/

启动 Hadoop 服务

下面的命令是启动Hadoop-Master上的所有Hadoop服务。

$ cd $HADOOP_HOME/sbin
$ start-all.sh

在 Hadoop 集群中添加一个新的 DataNode

下面给出了将新节点添加到 Hadoop 集群时要遵循的步骤。

联网

使用一些适当的网络配置将新节点添加到现有的 Hadoop 集群。假设以下网络配置。

对于新节点配置 –

IP address : 192.168.1.103 
netmask : 255.255.255.0
hostname : slave3.in

添加用户和 SSH 访问

添加用户

在一个新节点上,使用以下命令添加“hadoop”用户并将 Hadoop 用户的密码设置为“hadoop123”或任何你想要的。

useradd hadoop
passwd hadoop

从主站到新从站的设置密码较少。

在master上执行以下操作

mkdir -p $HOME/.ssh 
chmod 700 $HOME/.ssh 
ssh-keygen -t rsa -P '' -f $HOME/.ssh/id_rsa 
cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys 
chmod 644 $HOME/.ssh/authorized_keys
Copy the public key to new slave node in hadoop user $HOME directory
scp $HOME/.ssh/id_rsa.pub [email protected]:/home/hadoop/

在从站上执行以下操作

登录到 Hadoop。如果没有,请登录到 hadoop 用户。

su hadoop ssh -X [email protected]

将公钥的内容复制到文件“$HOME/.ssh/authorized_keys”中,然后通过执行以下命令更改其权限。

cd $HOME
mkdir -p $HOME/.ssh 
chmod 700 $HOME/.ssh
cat id_rsa.pub >>$HOME/.ssh/authorized_keys 
chmod 644 $HOME/.ssh/authorized_keys

从主机检查 ssh 登录。现在检查是否可以在没有主节点密码的情况下通过 ssh 连接到新节点。

ssh [email protected] or hadoop@slave3

设置新节点的主机名

您可以在文件/etc/sysconfig/network 中设置主机名

On new slave3 machine

NETWORKING = yes 
HOSTNAME = slave3.in

要使更改生效,请重新启动机器或对具有相应主机名的新机器运行 hostname 命令(重新启动是一个不错的选择)。

在 slave3 节点机器上 –

主机名 slave3.in

使用以下几行更新集群所有机器上的/etc/hosts

192.168.1.102 slave3.in slave3

现在尝试使用主机名 ping 机器以检查它是否解析为 IP。

在新节点机器上 –

ping master.in

在新节点上启动 DataNode

使用$HADOOP_HOME/bin/hadoop-daemon.sh 脚本手动启动 datanode 守护进程它会自动联系主节点(NameNode)并加入集群。我们还应该将新节点添加到主服务器的 conf/slaves 文件中。基于脚本的命令将识别新节点。

登录新节点

su hadoop or ssh -X [email protected]

使用以下命令在新添加的从节点上启动 HDFS

./bin/hadoop-daemon.sh start datanode

在新节点上检查 jps 命令的输出。它看起来如下。

$ jps
7141 DataNode
10312 Jps

从 Hadoop 集群中删除 DataNode

我们可以在集群运行时即时从集群中删除节点,而不会丢失任何数据。HDFS 提供了退役功能,可确保安全地移除节点。要使用它,请按照以下步骤操作 –

第 1 步 – 登录主

登录到安装了 Hadoop 的主机用户。

$ su hadoop

第 2 步 – 更改集群配置

在启动集群之前必须配置一个排除文件。将名为 dfs.hosts.exclude 的键添加到我们的$HADOOP_HOME/etc/hadoop/hdfs-site.xml文件中。与此键关联的值提供 NameNode 本地文件系统上文件的完整路径,其中包含不允许连接到 HDFS 的机器列表。

例如,将这些行添加到etc/hadoop/hdfs-site.xml文件中。

<property> 
   <name>dfs.hosts.exclude</name> 
   <value>/home/hadoop/hadoop-1.2.1/hdfs_exclude.txt</value> 
   <description>DFS exclude</description> 
</property>

步骤 3 – 确定要退役的主机

每台要退役的机器都应该添加到hdfs_exclude.txt标识的文件中,每行一个域名。这将阻止它们连接到 NameNode。的内容“/home/hadoop/hadoop-1.2.1/hdfs_exclude.txt”文件如下所示,如果你想删除DataNode2。

slave2.in

第 4 步 – 强制重新加载配置

运行不带引号的命令“$HADOOP_HOME/bin/hadoop dfsadmin -refreshNodes”

$ $HADOOP_HOME/bin/hadoop dfsadmin -refreshNodes

这将强制 NameNode 重新读取其配置,包括新更新的“排除”文件。它将在一段时间内停用节点,从而有时间将每个节点的块复制到计划保持活动状态的机器上。

slave2.in 上,检查 jps 命令输出。一段时间后,您将看到 DataNode 进程自动关闭。

步骤 5 – 关闭节点

退役过程完成后,退役的硬件可以安全关闭进行维护。向 dfsadmin 运行 report 命令以检查退役状态。以下命令将描述退役节点和连接到集群的节点的状态。

$ $HADOOP_HOME/bin/hadoop dfsadmin -report

步骤 6 – 再次编辑排除文件

一旦机器退役,它们就可以从“排除”文件中删除。再次运行“$HADOOP_HOME/bin/hadoop dfsadmin -refreshNodes”会将排除文件读回NameNode;允许DataNodes在维护完成后重新加入集群,或者集群中再次需要额外的容量等。

特别注意– 如果遵循上述过程并且 tasktracker 进程仍在节点上运行,则需要将其关闭。一种方法是像我们在上述步骤中所做的那样断开机器的连接。Master 将自动识别该过程并宣布为死亡。删除 tasktracker 不需要遵循相同的过程,因为与 DataNode 相比,它并不重要。DataNode 包含您要安全删除而不丢失任何数据的数据。

tasktracker 可以随时通过以下命令运行/关闭。

$ $HADOOP_HOME/bin/hadoop-daemon.sh stop tasktracker
$HADOOP_HOME/bin/hadoop-daemon.sh start tasktracker

觉得文章有用?

点个广告表达一下你的爱意吧 !😁