HCatalog – 快速指南
HCatalog – 快速指南
HCatalog – 简介
什么是HCatalog?
HCatalog 是 Hadoop 的表存储管理工具。它将 Hive 元存储的表格数据公开给其他 Hadoop 应用程序。它使使用不同数据处理工具(Pig、MapReduce)的用户能够轻松地将数据写入网格。它确保用户不必担心他们的数据存储在何处或以何种格式存储。
HCatalog 就像 Hive 的一个关键组件,它使用户能够以任何格式和任何结构存储他们的数据。
为什么是HCatalog?
为正确的工作启用正确的工具
Hadoop 生态系统包含不同的数据处理工具,例如 Hive、Pig 和 MapReduce。尽管这些工具不需要元数据,但当元数据存在时,它们仍然可以从中受益。共享元数据存储还使用户可以跨工具更轻松地共享数据。使用 MapReduce 或 Pig 加载和规范化数据,然后通过 Hive 进行分析的工作流程非常常见。如果所有这些工具共享一个元存储,那么每个工具的用户都可以立即访问使用另一个工具创建的数据。不需要装载或转移步骤。
捕获处理状态以启用共享
HCatalog 可以发布您的分析结果。因此,其他程序员可以通过“REST”访问您的分析平台。您发布的模式对其他数据科学家也很有用。其他数据科学家使用您的发现作为后续发现的输入。
将 Hadoop 与一切集成
Hadoop 作为处理和存储环境为企业开辟了许多机会;然而,为了推动采用,它必须与现有工具配合使用并增强现有工具。Hadoop 应作为分析平台的输入或与您的运营数据存储和 Web 应用程序集成。组织应该享受 Hadoop 的价值,而不必学习全新的工具集。REST 服务使用熟悉的 API 和类似 SQL 的语言向企业开放平台。企业数据管理系统使用 HCatalog 与 Hadoop 平台更深入地集成。
HCatalog 架构
下图显示了 HCatalog 的整体架构。
HCatalog 支持读取和写入可以写入SerDe(串行器-解串器)的任何格式的文件。默认情况下,HCatalog 支持 RCFile、CSV、JSON、SequenceFile 和 ORC 文件格式。要使用自定义格式,您必须提供 InputFormat、OutputFormat 和 SerDe。
HCatalog 构建在 Hive Metastore 之上,并结合了 Hive 的 DDL。HCatalog 为 Pig 和 MapReduce 提供读写接口,并使用 Hive 的命令行接口发出数据定义和元数据探索命令。
HCatalog – 安装
Hive、Pig、HBase等所有Hadoop子项目都支持Linux操作系统。因此,您需要在系统上安装 Linux 版本。HCatalog 于 2013 年 3 月 26 日与 Hive 安装合并。 从 Hive-0.11.0 版本开始,HCatalog 附带 Hive 安装。因此,请按照下面给出的步骤安装 Hive,它会自动在您的系统上安装 HCatalog。
步骤 1:验证 JAVA 安装
在安装 Hive 之前,必须在您的系统上安装 Java。您可以使用以下命令来检查您的系统上是否已经安装了 Java –
$ java –version
如果您的系统上已经安装了 Java,您将看到以下响应 –
java version "1.7.0_71" Java(TM) SE Runtime Environment (build 1.7.0_71-b13) Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)
如果您的系统上没有安装 Java,那么您需要按照下面给出的步骤进行操作。
第 2 步:安装 Java
通过访问以下链接下载 Java(JDK <最新版本> – X64.tar.gz)http://www.oracle.com/
然后jdk-7u71-linux-x64.tar.gz将被下载到您的系统上。
通常,您会在下载文件夹中找到下载的 Java 文件。验证它并使用以下命令提取jdk-7u71-linux-x64.gz文件。
$ cd Downloads/ $ ls jdk-7u71-linux-x64.gz $ tar zxf jdk-7u71-linux-x64.gz $ ls jdk1.7.0_71 jdk-7u71-linux-x64.gz
要使所有用户都可以使用 Java,您必须将其移动到“/usr/local/”位置。打开root,输入以下命令。
$ su password: # mv jdk1.7.0_71 /usr/local/ # exit
要设置PATH和JAVA_HOME变量,请将以下命令添加到~/.bashrc文件中。
export JAVA_HOME=/usr/local/jdk1.7.0_71 export PATH=PATH:$JAVA_HOME/bin
现在使用命令java -version从终端验证安装,如上所述。
步骤 3:验证 Hadoop 安装
在安装 Hive 之前,必须在您的系统上安装 Hadoop。让我们使用以下命令验证 Hadoop 安装 –
$ hadoop version
如果您的系统上已经安装了 Hadoop,那么您将收到以下响应 –
Hadoop 2.4.1 Subversion https://svn.apache.org/repos/asf/hadoop/common -r 1529768 Compiled by hortonmu on 2013-10-07T06:28Z Compiled with protoc 2.5.0 From source with checksum 79e53ce7994d1628b240f09af91e1af4
如果您的系统上未安装 Hadoop,请继续执行以下步骤 –
第 4 步:下载 Hadoop
使用以下命令从 Apache Software Foundation 下载并提取 Hadoop 2.4.1。
$ su password: # cd /usr/local # wget http://apache.claz.org/hadoop/common/hadoop-2.4.1/ hadoop-2.4.1.tar.gz # tar xzf hadoop-2.4.1.tar.gz # mv hadoop-2.4.1/* to hadoop/ # exit
第五步:在伪分布式模式下安装Hadoop
以下步骤用于以伪分布式模式安装Hadoop 2.4.1。
设置 Hadoop
您可以通过将以下命令附加到~/.bashrc文件来设置 Hadoop 环境变量。
export HADOOP_HOME=/usr/local/hadoop export HADOOP_MAPRED_HOME=$HADOOP_HOME export HADOOP_COMMON_HOME=$HADOOP_HOME export HADOOP_HDFS_HOME=$HADOOP_HOME export YARN_HOME=$HADOOP_HOME export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin
现在将所有更改应用到当前运行的系统中。
$ source ~/.bashrc
Hadoop 配置
您可以在“$HADOOP_HOME/etc/hadoop”位置找到所有 Hadoop 配置文件。您需要根据您的 Hadoop 基础架构对这些配置文件进行适当的更改。
$ cd $HADOOP_HOME/etc/hadoop
为了使用 Java 开发 Hadoop 程序,您必须通过将JAVA_HOME值替换为系统中 Java 的位置来重置hadoop-env.sh文件中的 Java 环境变量。
export JAVA_HOME=/usr/local/jdk1.7.0_71
下面给出了您必须编辑以配置 Hadoop 的文件列表。
核心站点.xml
该芯-的site.xml文件包含的信息,如用于Hadoop的实例,分配给文件系统的存储器,存储器限制用于存储所述数据的端口号,以及读/写缓冲器的大小。
打开 core-site.xml 并在 <configuration> 和 </configuration> 标记之间添加以下属性。
<configuration> <property> <name>fs.default.name</name> <value>hdfs://localhost:9000</value> </property> </configuration>
hdfs-site.xml
在HDFS-的site.xml文件中包含的信息,如复制数据的值,名称节点的路径,你的本地文件系统的数据管理部路径。它意味着您要存储 Hadoop 基础架构的地方。
让我们假设以下数据。
dfs.replication (data replication value) = 1 (In the following path /hadoop/ is the user name. hadoopinfra/hdfs/namenode is the directory created by hdfs file system.) namenode path = //home/hadoop/hadoopinfra/hdfs/namenode (hadoopinfra/hdfs/datanode is the directory created by hdfs file system.) datanode path = //home/hadoop/hadoopinfra/hdfs/datanode
打开此文件并在此文件的 <configuration>、</configuration> 标记之间添加以下属性。
<configuration> <property> <name>dfs.replication</name> <value>1</value> </property> <property> <name>dfs.name.dir</name> <value>file:///home/hadoop/hadoopinfra/hdfs/namenode</value> </property> <property> <name>dfs.data.dir</name> <value>file:///home/hadoop/hadoopinfra/hdfs/datanode</value> </property> </configuration>
注意– 在上面的文件中,所有属性值都是用户定义的,您可以根据您的 Hadoop 基础架构进行更改。
纱线站点.xml
该文件用于将 yarn 配置到 Hadoop 中。打开 yarn-site.xml 文件并在此文件的 <configuration>、</configuration> 标记之间添加以下属性。
<configuration> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> </configuration>
mapred-site.xml
该文件用于指定我们使用的 MapReduce 框架。默认情况下,Hadoop 包含一个yarn-site.xml 模板。首先,您需要使用以下命令将文件从mapred-site,xml.template 复制到mapred-site.xml文件。
$ cp mapred-site.xml.template mapred-site.xml
打开 mapred-site.xml 文件并在此文件的 <configuration>、</configuration> 标记之间添加以下属性。
<configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration>
步骤 6:验证 Hadoop 安装
以下步骤用于验证 Hadoop 安装。
名称节点设置
使用命令“hdfs namenode -format”设置名称节点,如下所示 –
$ cd ~ $ hdfs namenode -format
预期结果如下 –
10/24/14 21:30:55 INFO namenode.NameNode: STARTUP_MSG: /************************************************************ STARTUP_MSG: Starting NameNode STARTUP_MSG: host = localhost/192.168.1.11 STARTUP_MSG: args = [-format] STARTUP_MSG: version = 2.4.1 ... ... 10/24/14 21:30:56 INFO common.Storage: Storage directory /home/hadoop/hadoopinfra/hdfs/namenode has been successfully formatted. 10/24/14 21:30:56 INFO namenode.NNStorageRetentionManager: Going to retain 1 images with txid >= 0 10/24/14 21:30:56 INFO util.ExitUtil: Exiting with status 0 10/24/14 21:30:56 INFO namenode.NameNode: SHUTDOWN_MSG: /************************************************************ SHUTDOWN_MSG: Shutting down NameNode at localhost/192.168.1.11 ************************************************************/
验证 Hadoop DFS
以下命令用于启动 DFS。执行此命令将启动您的 Hadoop 文件系统。
$ start-dfs.sh
预期输出如下 –
10/24/14 21:37:56 Starting namenodes on [localhost] localhost: starting namenode, logging to /home/hadoop/hadoop-2.4.1/logs/hadoop-hadoop-namenode-localhost.out localhost: starting datanode, logging to /home/hadoop/hadoop-2.4.1/logs/hadoop-hadoop-datanode-localhost.out Starting secondary namenodes [0.0.0.0]
验证纱线脚本
以下命令用于启动 Yarn 脚本。执行此命令将启动您的 Yarn 守护进程。
$ start-yarn.sh
预期输出如下 –
starting yarn daemons starting resourcemanager, logging to /home/hadoop/hadoop-2.4.1/logs/ yarn-hadoop-resourcemanager-localhost.out localhost: starting nodemanager, logging to /home/hadoop/hadoop-2.4.1/logs/yarn-hadoop-nodemanager-localhost.out
在浏览器上访问 Hadoop
访问 Hadoop 的默认端口号是 50070。使用以下 URL 在浏览器上获取 Hadoop 服务。
http://localhost:50070/
验证集群的所有应用程序
访问集群所有应用的默认端口号是8088。使用下面的url访问这个服务。
http://localhost:8088/
完成 Hadoop 的安装后,继续下一步并在您的系统上安装 Hive。
第 7 步:下载 Hive
我们在本教程中使用 hive-0.14.0。您可以通过访问以下链接http://apache.petsads.us/hive/hive-0.14.0/下载它。让我们假设它被下载到/Downloads目录中。在这里,我们为本教程下载名为“ apache-hive-0.14.0-bin.tar.gz ”的Hive 存档。以下命令用于验证下载 –
$ cd Downloads $ ls
成功下载后,您会看到以下响应 –
apache-hive-0.14.0-bin.tar.gz
第 8 步:安装 Hive
在您的系统上安装 Hive 需要执行以下步骤。让我们假设 Hive 存档已下载到/Downloads目录中。
提取和验证 Hive 存档
以下命令用于验证下载并提取 Hive 存档 –
$ tar zxvf apache-hive-0.14.0-bin.tar.gz $ ls
成功下载后,您会看到以下响应 –
apache-hive-0.14.0-bin apache-hive-0.14.0-bin.tar.gz
将文件复制到 /usr/local/hive 目录
我们需要从超级用户“su -”复制文件。以下命令用于将文件从解压目录复制到/usr/local/hive目录。
$ su - passwd: # cd /home/user/Download # mv apache-hive-0.14.0-bin /usr/local/hive # exit
为 Hive 设置环境
您可以通过将以下几行附加到~/.bashrc文件来设置 Hive 环境–
export HIVE_HOME=/usr/local/hive export PATH=$PATH:$HIVE_HOME/bin export CLASSPATH=$CLASSPATH:/usr/local/Hadoop/lib/*:. export CLASSPATH=$CLASSPATH:/usr/local/hive/lib/*:.
以下命令用于执行 ~/.bashrc 文件。
$ source ~/.bashrc
步骤 9:配置 Hive
要使用 Hadoop 配置 Hive,您需要编辑hive-env.sh文件,该文件位于$HIVE_HOME/conf目录中。以下命令重定向到 Hive配置文件夹并复制模板文件 –
$ cd $HIVE_HOME/conf $ cp hive-env.sh.template hive-env.sh
通过附加以下行来编辑hive-env.sh文件 –
export HADOOP_HOME=/usr/local/hadoop
至此,Hive 安装完成。现在您需要一个外部数据库服务器来配置 Metastore。我们使用 Apache Derby 数据库。
步骤 10:下载并安装 Apache Derby
按照下面给出的步骤下载和安装 Apache Derby –
下载 Apache Derby
以下命令用于下载 Apache Derby。下载需要一些时间。
$ cd ~ $ wget http://archive.apache.org/dist/db/derby/db-derby-10.4.2.0/db-derby-10.4.2.0-bin.tar.gz
以下命令用于验证下载 –
$ ls
成功下载后,您会看到以下响应 –
db-derby-10.4.2.0-bin.tar.gz
提取和验证德比档案
以下命令用于提取和验证 Derby 存档 –
$ tar zxvf db-derby-10.4.2.0-bin.tar.gz $ ls
成功下载后,您会看到以下响应 –
db-derby-10.4.2.0-bin db-derby-10.4.2.0-bin.tar.gz
将文件复制到 /usr/local/derby 目录
我们需要从超级用户“su -”复制。以下命令用于将文件从提取的目录复制到/usr/local/derby目录 –
$ su - passwd: # cd /home/user # mv db-derby-10.4.2.0-bin /usr/local/derby # exit
为德比设置环境
您可以通过将以下几行附加到~/.bashrc文件来设置 Derby 环境–
export DERBY_HOME=/usr/local/derby export PATH=$PATH:$DERBY_HOME/bin export CLASSPATH=$CLASSPATH:$DERBY_HOME/lib/derby.jar:$DERBY_HOME/lib/derbytools.jar
以下命令用于执行~/.bashrc 文件–
$ source ~/.bashrc
为 Metastore 创建目录
在 $DERBY_HOME 目录中创建一个名为data 的目录来存储 Metastore 数据。
$ mkdir $DERBY_HOME/data
Derby 安装和环境设置现已完成。
步骤 11:配置 Hive Metastore
配置 Metastore 意味着向 Hive 指定存储数据库的位置。您可以通过编辑$HIVE_HOME/conf目录中的hive-site.xml文件来完成此操作。首先,使用以下命令复制模板文件 –
$ cd $HIVE_HOME/conf $ cp hive-default.xml.template hive-site.xml
编辑hive-site.xml并在 <configuration> 和 </configuration> 标签之间附加以下几行 –
<property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:derby://localhost:1527/metastore_db;create = true</value> <description>JDBC connect string for a JDBC metastore</description> </property>
创建一个名为jpox.properties的文件并将以下几行添加到其中 –
javax.jdo.PersistenceManagerFactoryClass = org.jpox.PersistenceManagerFactoryImpl org.jpox.autoCreateSchema = false org.jpox.validateTables = false org.jpox.validateColumns = false org.jpox.validateConstraints = false org.jpox.storeManagerType = rdbms org.jpox.autoCreateSchema = true org.jpox.autoStartMechanismMode = checked org.jpox.transactionIsolation = read_committed javax.jdo.option.DetachAllOnCommit = true javax.jdo.option.NontransactionalRead = true javax.jdo.option.ConnectionDriverName = org.apache.derby.jdbc.ClientDriver javax.jdo.option.ConnectionURL = jdbc:derby://hadoop1:1527/metastore_db;create = true javax.jdo.option.ConnectionUserName = APP javax.jdo.option.ConnectionPassword = mine
步骤 12:验证 Hive 安装
在运行 Hive 之前,您需要在 HDFS 中创建/tmp文件夹和单独的 Hive 文件夹。在这里,我们使用/user/hive/warehouse文件夹。您需要为这些新创建的文件夹设置写权限,如下所示 –
chmod g+w
现在在验证 Hive 之前将它们设置在 HDFS 中。使用以下命令 –
$ $HADOOP_HOME/bin/hadoop fs -mkdir /tmp $ $HADOOP_HOME/bin/hadoop fs -mkdir /user/hive/warehouse $ $HADOOP_HOME/bin/hadoop fs -chmod g+w /tmp $ $HADOOP_HOME/bin/hadoop fs -chmod g+w /user/hive/warehouse
以下命令用于验证 Hive 安装 –
$ cd $HIVE_HOME $ bin/hive
成功安装 Hive 后,您将看到以下响应 –
Logging initialized using configuration in jar:file:/home/hadoop/hive-0.9.0/lib/hive-common-0.9.0.jar!/ hive-log4j.properties Hive history =/tmp/hadoop/hive_job_log_hadoop_201312121621_1494929084.txt …………………. hive>
您可以执行以下示例命令来显示所有表 –
hive> show tables; OK Time taken: 2.798 seconds hive>
步骤 13:验证 HCatalog 安装
使用以下命令为 HCatalog Home设置系统变量HCAT_HOME。
export HCAT_HOME = $HiVE_HOME/HCatalog
使用以下命令验证 HCatalog 安装。
cd $HCAT_HOME/bin ./hcat
如果安装成功,您将看到以下输出 –
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] usage: hcat { -e "<query>" | -f "<filepath>" } [ -g "<group>" ] [ -p "<perms>" ] [ -D"<name> = <value>" ] -D <property = value> use hadoop value for given property -e <exec> hcat command given from command line -f <file> hcat commands in file -g <group> group for the db/table specified in CREATE statement -h,--help Print help information -p <perms> permissions for the db/table specified in CREATE statement
HCatalog – CLI
HCatalog 命令行界面 (CLI) 可以从命令$HIVE_HOME/HCatalog/bin/hcat 调用,其中 $HIVE_HOME 是 Hive 的主目录。hcat是用于初始化 HCatalog 服务器的命令。
使用以下命令初始化 HCatalog 命令行。
cd $HCAT_HOME/bin ./hcat
如果安装正确完成,那么您将获得以下输出 –
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] usage: hcat { -e "<query>" | -f "<filepath>" } [ -g "<group>" ] [ -p "<perms>" ] [ -D"<name> = <value>" ] -D <property = value> use hadoop value for given property -e <exec> hcat command given from command line -f <file> hcat commands in file -g <group> group for the db/table specified in CREATE statement -h,--help Print help information -p <perms> permissions for the db/table specified in CREATE statement
HCatalog CLI 支持这些命令行选项 –
Sr.No | 选项 | 示例和说明 |
---|---|---|
1 | -G |
hcat -g mygroup … 要创建的表必须具有组“mygroup”。 |
2 | -p |
hcat -p rwxr-xr-x … 要创建的表必须具有读、写和执行权限。 |
3 | -F |
hcat -f myscript.HCatalog … myscript.HCatalog 是包含要执行的 DDL 命令的脚本文件。 |
4 | -e |
hcat -e ‘create table mytable(a int);’ … 将以下字符串视为 DDL 命令并执行它。 |
5 | -D |
hcat -Dkey = value … 将键值对作为 Java 系统属性传递给 HCatalog。 |
6 | —— |
hcat 打印使用消息。 |
注意 –
-
该-g和-p选项是不是强制性的。
-
一次可以提供-e或-f选项,但不能同时提供。
-
选项的顺序无关紧要;您可以按任何顺序指定选项。
Sr.No | DDL 命令和描述 |
---|---|
1 |
CREATE TABLE 使用 HCatalog 创建一个表。如果您使用 CLUSTERED BY 子句创建一个表,您将无法使用 Pig 或 MapReduce 对其进行写入。 |
2 |
ALTER TABLE 支持,除了 REBUILD 和 CONCATENATE 选项。它的行为与 Hive 中的相同。 |
3 |
DROP TABLE 支持的。行为与 Hive 相同(删除完整的表和结构)。 |
4 |
CREATE/ALTER/DROP VIEW 支持的。行为与 Hive 相同。 注意– Pig 和 MapReduce 无法读取或写入视图。 |
5 |
SHOW TABLES 显示表列表。 |
6 |
SHOW PARTITIONS 显示分区列表。 |
7 |
Create/Drop Index 支持 CREATE 和 DROP FUNCTION 操作,但创建的函数仍然必须在 Pig 中注册并放置在 MapReduce 的 CLASSPATH 中。 |
8 |
DESCRIBE 支持的。行为与 Hive 相同。描述结构。 |
上表中的一些命令将在后续章节中进行解释。
HCatalog – 创建表
本章介绍如何创建表以及如何向其中插入数据。在 HCatalog 中创建表的约定与使用 Hive 创建表非常相似。
创建表语句
Create Table 是用于使用 HCatalog 在 Hive Metastore 中创建表的语句。其语法和示例如下 –
句法
CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.] table_name [(col_name data_type [COMMENT col_comment], ...)] [COMMENT table_comment] [ROW FORMAT row_format] [STORED AS file_format]
例子
让我们假设您需要使用CREATE TABLE语句创建一个名为employee 的表。下表列出了员工表中的字段及其数据类型–
Sr.No | 字段名称 | 数据类型 |
---|---|---|
1 | 开斋节 | 整数 |
2 | 名称 | 细绳 |
3 | 薪水 | 漂浮 |
4 | 指定 | 细绳 |
以下数据定义了支持的字段,例如注释,行格式字段,例如字段终止符、行终止符和存储文件类型。
COMMENT ‘Employee details’ FIELDS TERMINATED BY ‘\t’ LINES TERMINATED BY ‘\n’ STORED IN TEXT FILE
以下查询使用上述数据创建一个名为employee的表。
./hcat –e "CREATE TABLE IF NOT EXISTS employee ( eid int, name String, salary String, destination String) \ COMMENT 'Employee details' \ ROW FORMAT DELIMITED \ FIELDS TERMINATED BY ‘\t’ \ LINES TERMINATED BY ‘\n’ \ STORED AS TEXTFILE;"
如果添加选项IF NOT EXISTS,则 HCatalog 会在表已存在的情况下忽略该语句。
成功创建表后,您将看到以下响应 –
OK Time taken: 5.905 seconds
加载数据语句
一般在SQL中建表后,我们可以使用Insert语句插入数据。但是在 HCatalog 中,我们使用 LOAD DATA 语句插入数据。
在向HCatalog中插入数据时,最好使用LOAD DATA来存储批量记录。加载数据有两种方式:一种是从本地文件系统加载数据,另一种是从Hadoop 文件系统加载。
句法
LOAD DATA 的语法如下 –
LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
- LOCAL 是指定本地路径的标识符。它是可选的。
- OVERWRITE 是可选的,用于覆盖表中的数据。
- 分区是可选的。
例子
我们将以下数据插入到表中。它被命名为一个文本文件sample.txt的在/ home / user的目录。
1201 Gopal 45000 Technical manager 1202 Manisha 45000 Proof reader 1203 Masthanvali 40000 Technical writer 1204 Kiran 40000 Hr Admin 1205 Kranthi 30000 Op Admin
以下查询将给定的文本加载到表中。
./hcat –e "LOAD DATA LOCAL INPATH '/home/user/sample.txt' OVERWRITE INTO TABLE employee;"
成功下载后,您会看到以下响应 –
OK Time taken: 15.905 seconds
HCatalog – 更改表
本章介绍如何更改表的属性,例如更改其表名、更改列名、添加列以及删除或替换列。
更改表语句
您可以使用 ALTER TABLE 语句来更改 Hive 中的表。
句法
该语句根据我们希望在表中修改的属性采用以下任何一种语法。
ALTER TABLE name RENAME TO new_name ALTER TABLE name ADD COLUMNS (col_spec[, col_spec ...]) ALTER TABLE name DROP [COLUMN] column_name ALTER TABLE name CHANGE column_name new_name new_type ALTER TABLE name REPLACE COLUMNS (col_spec[, col_spec ...])
下面解释了一些场景。
重命名为…语句
以下查询将表从employee重命名为emp。
./hcat –e "ALTER TABLE employee RENAME TO emp;"
变更声明
下表包含员工表的字段,并显示要更改的字段(以粗体显示)。
Field Name | 从数据类型转换 | 更改字段名称 | 转换为数据类型 |
---|---|---|---|
eid | 整数 | 开斋节 | 整数 |
name | 细绳 | 名称 | 细绳 |
salary | 漂浮 | 薪水 | 双倍的 |
designation | 细绳 | 指定 | 细绳 |
以下查询使用上述数据重命名列名和列数据类型 –
./hcat –e "ALTER TABLE employee CHANGE name ename String;" ./hcat –e "ALTER TABLE employee CHANGE salary salary Double;"
添加列语句
以下查询将名为dept的列添加到员工表中。
./hcat –e "ALTER TABLE employee ADD COLUMNS (dept STRING COMMENT 'Department name');"
替换语句
以下查询删除员工表中的所有列并将其替换为emp和name列 –
./hcat – e "ALTER TABLE employee REPLACE COLUMNS ( eid INT empid Int, ename STRING name String);"
删除表语句
本章介绍如何删除 HCatalog 中的表。当您从 Metastore 中删除表时,它会删除表/列数据及其元数据。它可以是普通表(存储在 Metastore 中)或外部表(存储在本地文件系统中);HCatalog 以相同的方式处理它们,而不管它们的类型。
语法如下 –
DROP TABLE [IF EXISTS] table_name;
以下查询删除名为employee的表–
./hcat –e "DROP TABLE IF EXISTS employee;"
成功执行查询后,您将看到以下响应 –
OK Time taken: 5.3 seconds
HCatalog – 查看
本章介绍如何在 HCatalog 中创建和管理视图。数据库视图是使用CREATE VIEW语句创建的。可以从单个表、多个表或另一个视图创建视图。
要创建视图,用户必须根据具体实现具有适当的系统权限。
创建视图语句
CREATE VIEW创建具有给定名称的视图。如果已存在同名的表或视图,则会引发错误。您可以使用IF NOT EXISTS跳过错误。
如果未提供列名,则视图的列名将自动从定义的 SELECT 表达式中派生。
注意– 如果 SELECT 包含无别名的标量表达式,例如 x+y,则生成的视图列名称将以 _C0、_C1 等形式生成。
重命名列时,还可以提供列注释。注释不会自动从基础列继承。
如果视图的定义 SELECT 表达式无效,则 CREATE VIEW 语句将失败。
句法
CREATE VIEW [IF NOT EXISTS] [db_name.]view_name [(column_name [COMMENT column_comment], ...) ] [COMMENT view_comment] [TBLPROPERTIES (property_name = property_value, ...)] AS SELECT ...;
例子
以下是员工表数据。现在让我们看看如何创建一个名为Emp_Deg_View的视图,其中包含薪水大于 35,000 的员工的字段 id、name、Designation 和薪水。
+------+-------------+--------+-------------------+-------+ | ID | Name | Salary | Designation | Dept | +------+-------------+--------+-------------------+-------+ | 1201 | Gopal | 45000 | Technical manager | TP | | 1202 | Manisha | 45000 | Proofreader | PR | | 1203 | Masthanvali | 30000 | Technical writer | TP | | 1204 | Kiran | 40000 | Hr Admin | HR | | 1205 | Kranthi | 30000 | Op Admin | Admin | +------+-------------+--------+-------------------+-------+
以下是基于上述给定数据创建视图的命令。
./hcat –e "CREATE VIEW Emp_Deg_View (salary COMMENT ' salary more than 35,000') AS SELECT id, name, salary, designation FROM employee WHERE salary ≥ 35000;"
输出
OK Time taken: 5.3 seconds
删除视图语句
DROP VIEW 删除指定视图的元数据。当删除一个被其他视图引用的视图时,不会给出警告(依赖视图被悬空无效,必须由用户删除或重新创建)。
句法
DROP VIEW [IF EXISTS] view_name;
例子
以下命令用于删除名为Emp_Deg_View的视图。
DROP VIEW Emp_Deg_View;
HCatalog – 显示表格
您经常希望列出数据库中的所有表或列出表中的所有列。显然,每个数据库都有自己的语法来列出表和列。
Show Tables语句显示所有表的名称。默认情况下,它列出当前数据库中的表或指定数据库中带有IN子句的表。
本章介绍如何在HCatalog 中列出当前数据库中的所有表。
显示表格声明
SHOW TABLES 的语法如下 –
SHOW TABLES [IN database_name] ['identifier_with_wildcards'];
以下查询显示表列表 –
./hcat –e "Show tables;"
成功执行查询后,您将看到以下响应 –
OK emp employee Time taken: 5.3 seconds
HCatalog – 显示分区
分区是用于创建单独表或视图的表格数据的条件。SHOW PARTITIONS 列出给定基表的所有现有分区。分区按字母顺序列出。在 Hive 0.6 之后,还可以指定分区规范的一部分来过滤结果列表。
您可以使用 SHOW PARTITIONS 命令查看特定表中存在的分区。本章介绍如何在 HCatalog 中列出特定表的分区。
显示分区声明
语法如下 –
SHOW PARTITIONS table_name;
以下查询删除名为employee的表–
./hcat –e "Show partitions employee;"
成功执行查询后,您将看到以下响应 –
OK Designation = IT Time taken: 5.3 seconds
动态分区
HCatalog 将表组织成分区。它是一种根据日期、城市和部门等分区列的值将表划分为相关部分的方法。使用分区,可以很容易地查询一部分数据。
例如,名为Tab1的表包含员工数据,例如 id、name、dept 和 yoj(即加入年份)。假设您需要检索 2012 年加入的所有员工的详细信息。查询在整个表中搜索所需信息。但是,如果您将员工数据按年份分区并将其存储在单独的文件中,则会减少查询处理时间。以下示例显示了如何对文件及其数据进行分区 –
以下文件包含员工数据表。
/tab1/employeedata/file1
id, name, dept, yoj 1, gopal, TP, 2012 2, kiran, HR, 2012 3, kaleel, SC, 2013 4, Prasanth, SC, 2013
上面的数据使用 year 分成两个文件。
/tab1/employeedata/2012/file2
1, gopal, TP, 2012 2, kiran, HR, 2012
/tab1/employeedata/2013/file3
3, kaleel, SC, 2013 4, Prasanth, SC, 2013
添加分区
我们可以通过更改表来向表添加分区。假设我们有一个名为employee的表,其中包含Id、Name、Salary、Designation、Dept 和yoj 等字段。
句法
ALTER TABLE table_name ADD [IF NOT EXISTS] PARTITION partition_spec [LOCATION 'location1'] partition_spec [LOCATION 'location2'] ...; partition_spec: : (p_column = p_col_value, p_column = p_col_value, ...)
以下查询用于向员工表添加分区。
./hcat –e "ALTER TABLE employee ADD PARTITION (year = '2013') location '/2012/part2012';"
重命名分区
您可以使用 RENAME-TO 命令重命名分区。它的语法如下 –
./hact –e "ALTER TABLE table_name PARTITION partition_spec RENAME TO PARTITION partition_spec;"
以下查询用于重命名分区 –
./hcat –e "ALTER TABLE employee PARTITION (year=’1203’) RENAME TO PARTITION (Yoj='1203');"
删除分区
用于删除分区的命令的语法如下 –
./hcat –e "ALTER TABLE table_name DROP [IF EXISTS] PARTITION partition_spec,. PARTITION partition_spec,...;"
以下查询用于删除分区 –
./hcat –e "ALTER TABLE employee DROP [IF EXISTS] PARTITION (year=’1203’);"
HCatalog – 索引
创建索引
索引只不过是表特定列上的指针。创建索引意味着在表的特定列上创建一个指针。它的语法如下 –
CREATE INDEX index_name ON TABLE base_table_name (col_name, ...) AS 'index.handler.class.name' [WITH DEFERRED REBUILD] [IDXPROPERTIES (property_name = property_value, ...)] [IN TABLE index_table_name] [PARTITIONED BY (col_name, ...)][ [ ROW FORMAT ...] STORED AS ... | STORED BY ... ] [LOCATION hdfs_path] [TBLPROPERTIES (...)]
例子
让我们通过一个例子来理解索引的概念。使用相同的员工,我们已经与域ID,名称,工资,型号,以及部先前使用表创建一个名为索引index_salary对工资的列职员表。
以下查询创建一个索引 –
./hcat –e "CREATE INDEX inedx_salary ON TABLE employee(salary) AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler';"
它是一个指向工资列的指针。如果修改了列,则使用索引值存储更改。
删除索引
以下语法用于删除索引 –
DROP INDEX <index_name> ON <table_name>
以下查询删除索引 index_salary –
./hcat –e "DROP INDEX index_salary ON employee;"
HCatalog – 读写器
HCatalog 包含用于并行输入和输出的数据传输 API,无需使用 MapReduce。此 API 使用表和行的基本存储抽象来从 Hadoop 集群读取数据并将数据写入其中。
Data Transfer API 主要包含三个类;那些是 –
-
HCatReader – 从 Hadoop 集群读取数据。
-
HCatWriter – 将数据写入 Hadoop 集群。
-
DataTransferFactory – 生成读取器和写入器实例。
此 API 适用于主从节点设置。让我们更多地讨论HCatReader和HCatWriter。
HCatReader
HCatReader 是 HCatalog 内部的抽象类,它从要检索记录的地方抽象出底层系统的复杂性。
S. No. | 方法名称和描述 |
---|---|
1 |
Public abstract ReaderContext prepareRead() throws HCatException 这应该在主节点上调用以获取 ReaderContext,然后应该将其序列化并发送从节点。 |
2 |
Public abstract Iterator <HCatRecorder> read() throws HCaException 这应该在从节点调用以读取 HCatRecords。 |
3 |
Public Configuration getConf() 它将返回配置类对象。 |
HCatReader 类用于从 HDFS 读取数据。读取是一个两步过程,其中第一步发生在外部系统的主节点上。第二步在多个从节点上并行进行。
读取是在ReadEntity上完成的。在开始阅读之前,您需要定义一个要从中读取的 ReadEntity。这可以通过ReadEntity.Builder完成。您可以指定数据库名称、表名称、分区和过滤器字符串。例如 –
ReadEntity.Builder builder = new ReadEntity.Builder(); ReadEntity entity = builder.withDatabase("mydb").withTable("mytbl").build(); 10.
上面的代码片段定义了一个 ReadEntity 对象(“实体”),它包含名为mydb的数据库中名为mytbl的表,可用于读取该表的所有行。请注意,在此操作开始之前,此表必须存在于 HCatalog 中。
定义 ReadEntity 后,您使用 ReadEntity 和集群配置获取 HCatReader 的实例 –
HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
下一步是从阅读器获取 ReaderContext 如下 –
ReaderContext cntxt = reader.prepareRead();
HCatWriter
这种抽象是 HCatalog 内部的。这是为了方便从外部系统写入 HCatalog。不要尝试直接实例化它。而是使用 DataTransferFactory。
Sr.No. | 方法名称和描述 |
---|---|
1 |
Public abstract WriterContext prepareRead() throws HCatException 外部系统应该从主节点只调用一次此方法。它返回一个WriterContext。这应该被序列化并发送到从节点以在那里构建HCatWriter。 |
2 |
Public abstract void write(Iterator<HCatRecord> recordItr) throws HCaException 应该在从节点上使用此方法来执行写操作。recordItr 是一个迭代器对象,它包含要写入 HCatalog 的记录集合。 |
3 |
Public abstract void abort(WriterContext cntxt) throws HCatException 这个方法应该在主节点上调用。此方法的主要目的是在出现故障时进行清理。 |
4 |
public abstract void commit(WriterContext cntxt) throws HCatException 这个方法应该在主节点上调用。此方法的目的是进行元数据提交。 |
与读取类似,写入也是一个两步过程,其中第一步发生在主节点上。随后,第二步在从节点上并行发生。
写入是在WriteEntity上完成的,它可以以类似于读取的方式构造 –
WriteEntity.Builder builder = new WriteEntity.Builder(); WriteEntity entity = builder.withDatabase("mydb").withTable("mytbl").build();
上面的代码创建了一个 WriteEntity 对象entity
,该对象可用于写入数据库mydb中名为mytbl的表。
创建 WriteEntity 后,下一步是获取 WriterContext –
HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config); WriterContext info = writer.prepareWrite();
以上所有步骤都发生在主节点上。然后主节点序列化 WriterContext 对象并使其可供所有从节点使用。
在从节点上,您需要使用 WriterContext 获取 HCatWriter,如下所示 –
HCatWriter writer = DataTransferFactory.getHCatWriter(context);
然后,作者将迭代器作为write
方法的参数–
writer.write(hCatRecordItr);
然后,作者在循环中对该迭代器调用getNext()并写出附加到迭代器的所有记录。
该TestReaderWriter.java文件用于测试HCatreader和HCatWriter类。以下程序演示了如何使用 HCatReader 和 HCatWriter API 从源文件读取数据,然后将其写入目标文件。
import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hive.HCatalog.common.HCatException; import org.apache.hive.HCatalog.data.transfer.DataTransferFactory; import org.apache.hive.HCatalog.data.transfer.HCatReader; import org.apache.hive.HCatalog.data.transfer.HCatWriter; import org.apache.hive.HCatalog.data.transfer.ReadEntity; import org.apache.hive.HCatalog.data.transfer.ReaderContext; import org.apache.hive.HCatalog.data.transfer.WriteEntity; import org.apache.hive.HCatalog.data.transfer.WriterContext; import org.apache.hive.HCatalog.mapreduce.HCatBaseTest; import org.junit.Assert; import org.junit.Test; public class TestReaderWriter extends HCatBaseTest { @Test public void test() throws MetaException, CommandNeedRetryException, IOException, ClassNotFoundException { driver.run("drop table mytbl"); driver.run("create table mytbl (a string, b int)"); Iterator<Entry<String, String>> itr = hiveConf.iterator(); Map<String, String> map = new HashMap<String, String>(); while (itr.hasNext()) { Entry<String, String> kv = itr.next(); map.put(kv.getKey(), kv.getValue()); } WriterContext cntxt = runsInMaster(map); File writeCntxtFile = File.createTempFile("hcat-write", "temp"); writeCntxtFile.deleteOnExit(); // Serialize context. ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(writeCntxtFile)); oos.writeObject(cntxt); oos.flush(); oos.close(); // Now, deserialize it. ObjectInputStream ois = new ObjectInputStream(new FileInputStream(writeCntxtFile)); cntxt = (WriterContext) ois.readObject(); ois.close(); runsInSlave(cntxt); commit(map, true, cntxt); ReaderContext readCntxt = runsInMaster(map, false); File readCntxtFile = File.createTempFile("hcat-read", "temp"); readCntxtFile.deleteOnExit(); oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile)); oos.writeObject(readCntxt); oos.flush(); oos.close(); ois = new ObjectInputStream(new FileInputStream(readCntxtFile)); readCntxt = (ReaderContext) ois.readObject(); ois.close(); for (int i = 0; i < readCntxt.numSplits(); i++) { runsInSlave(readCntxt, i); } } private WriterContext runsInMaster(Map<String, String> config) throws HCatException { WriteEntity.Builder builder = new WriteEntity.Builder(); WriteEntity entity = builder.withTable("mytbl").build(); HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config); WriterContext info = writer.prepareWrite(); return info; } private ReaderContext runsInMaster(Map<String, String> config, boolean bogus) throws HCatException { ReadEntity entity = new ReadEntity.Builder().withTable("mytbl").build(); HCatReader reader = DataTransferFactory.getHCatReader(entity, config); ReaderContext cntxt = reader.prepareRead(); return cntxt; } private void runsInSlave(ReaderContext cntxt, int slaveNum) throws HCatException { HCatReader reader = DataTransferFactory.getHCatReader(cntxt, slaveNum); Iterator<HCatRecord> itr = reader.read(); int i = 1; while (itr.hasNext()) { HCatRecord read = itr.next(); HCatRecord written = getRecord(i++); // Argh, HCatRecord doesnt implement equals() Assert.assertTrue("Read: " + read.get(0) + "Written: " + written.get(0), written.get(0).equals(read.get(0))); Assert.assertTrue("Read: " + read.get(1) + "Written: " + written.get(1), written.get(1).equals(read.get(1))); Assert.assertEquals(2, read.size()); } //Assert.assertFalse(itr.hasNext()); } private void runsInSlave(WriterContext context) throws HCatException { HCatWriter writer = DataTransferFactory.getHCatWriter(context); writer.write(new HCatRecordItr()); } private void commit(Map<String, String> config, boolean status, WriterContext context) throws IOException { WriteEntity.Builder builder = new WriteEntity.Builder(); WriteEntity entity = builder.withTable("mytbl").build(); HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config); if (status) { writer.commit(context); } else { writer.abort(context); } } private static HCatRecord getRecord(int i) { List<Object> list = new ArrayList<Object>(2); list.add("Row #: " + i); list.add(i); return new DefaultHCatRecord(list); } private static class HCatRecordItr implements Iterator<HCatRecord> { int i = 0; @Override public boolean hasNext() { return i++ < 100 ? true : false; } @Override public HCatRecord next() { return getRecord(i); } @Override public void remove() { throw new RuntimeException(); } } }
上述程序以记录的形式从HDFS中读取数据,并将记录数据写入mytable
HCatalog – 输入输出格式
的HCatInputFormat和HCatOutputFormat接口用于读取从HDFS和加工后的数据,写入所得到的数据转换成使用MapReduce工作HDFS。让我们详细说明输入和输出格式接口。
HCat 输入格式
该HCatInputFormat是使用MapReduce作业来读取HCatalog管理表中的数据。HCatInputFormat 公开了一个 Hadoop 0.20 MapReduce API,用于读取数据,就像它已发布到表中一样。
Sr.No. | 方法名称和描述 |
---|---|
1 |
public static HCatInputFormat setInput(Job job, String dbName, String tableName)throws IOException 设置用于作业的输入。它使用给定的输入规范查询元存储,并将匹配的分区序列化到 MapReduce 任务的作业配置中。 |
2 |
public static HCatInputFormat setInput(Configuration conf, String dbName, String tableName) throws IOException 设置用于作业的输入。它使用给定的输入规范查询元存储,并将匹配的分区序列化到 MapReduce 任务的作业配置中。 |
3 |
public HCatInputFormat setFilter(String filter)throws IOException 在输入表上设置过滤器。 |
4 |
public HCatInputFormat setProperties(Properties properties) throws IOException 设置输入格式的属性。 |
HCatInputFormat API 包括以下方法 –
- 设置输入
- 设置输出模式
- 获取表架构
要使用HCatInputFormat读取数据,首先使用正在读取的表中的必要信息实例化InputJobInfo,然后使用InputJobInfo调用setInput。
您可以使用setOutputSchema方法来包含投影模式,以指定输出字段。如果未指定架构,则将返回表中的所有列。您可以使用 getTableSchema 方法来确定指定输入表的表架构。
HCat 输出格式
HCatOutputFormat 与 MapReduce 作业一起使用以将数据写入 HCatalog 管理的表。HCatOutputFormat 公开了一个 Hadoop 0.20 MapReduce API,用于将数据写入表。当 MapReduce 作业使用 HCatOutputFormat 写入输出时,将使用为表配置的默认 OutputFormat,并在作业完成后将新分区发布到表中。
Sr.No. | 方法名称和描述 |
---|---|
1 |
public static void setOutput (Configuration conf, Credentials credentials, OutputJobInfo outputJobInfo) throws IOException 设置有关要为作业写入的输出的信息。它查询元数据服务器以查找要用于表的 StorageHandler。如果分区已发布,则会引发错误。 |
2 |
public static void setSchema (Configuration conf, HCatSchema schema) throws IOException 为要写出到分区的数据设置架构。如果未调用,则默认情况下将表架构用于分区。 |
3 |
public RecordWriter <WritableComparable<?>, HCatRecord > getRecordWriter (TaskAttemptContext context)throws IOException, InterruptedException 获得这份工作的唱片作家。它使用 StorageHandler 的默认 OutputFormat 来获取记录编写器。 |
4 |
public OutputCommitter getOutputCommitter (TaskAttemptContext context) throws IOException, InterruptedException 获取此输出格式的输出提交者。它确保正确提交输出。 |
该HCatOutputFormat API包含以下方法-
- 设置输出
- 设置模式
- 获取表架构
HCatOutputFormat 的第一次调用必须是setOutput;任何其他调用都会抛出异常,说明输出格式未初始化。
正在写出的数据的模式由setSchema方法指定。您必须调用此方法,提供您正在编写的数据模式。如果您的数据与表架构具有相同的架构,则可以使用HCatOutputFormat.getTableSchema()获取表架构,然后将其传递给setSchema()。
例子
下面的 MapReduce 程序从一个表中读取数据,它假定在第二列(“第 1 列”)中有一个整数,并计算它找到的每个不同值的实例数。也就是说,它相当于“ select col1, count(*) from $table group by col1; ”。
例如,如果第二列中的值是 {1, 1, 1, 3, 3, 5},则程序将产生以下值和计数输出 –
1, 3 3, 2 5, 1
现在让我们看看程序代码 –
import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.HCatalog.common.HCatConstants; import org.apache.HCatalog.data.DefaultHCatRecord; import org.apache.HCatalog.data.HCatRecord; import org.apache.HCatalog.data.schema.HCatSchema; import org.apache.HCatalog.mapreduce.HCatInputFormat; import org.apache.HCatalog.mapreduce.HCatOutputFormat; import org.apache.HCatalog.mapreduce.InputJobInfo; import org.apache.HCatalog.mapreduce.OutputJobInfo; public class GroupByAge extends Configured implements Tool { public static class Map extends Mapper<WritableComparable, HCatRecord, IntWritable, IntWritable> { int age; @Override protected void map( WritableComparable key, HCatRecord value, org.apache.hadoop.mapreduce.Mapper<WritableComparable, HCatRecord, IntWritable, IntWritable>.Context context )throws IOException, InterruptedException { age = (Integer) value.get(1); context.write(new IntWritable(age), new IntWritable(1)); } } public static class Reduce extends Reducer<IntWritable, IntWritable, WritableComparable, HCatRecord> { @Override protected void reduce( IntWritable key, java.lang.Iterable<IntWritable> values, org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable, WritableComparable, HCatRecord>.Context context )throws IOException ,InterruptedException { int sum = 0; Iterator<IntWritable> iter = values.iterator(); while (iter.hasNext()) { sum++; iter.next(); } HCatRecord record = new DefaultHCatRecord(2); record.set(0, key.get()); record.set(1, sum); context.write(null, record); } } public int run(String[] args) throws Exception { Configuration conf = getConf(); args = new GenericOptionsParser(conf, args).getRemainingArgs(); String serverUri = args[0]; String inputTableName = args[1]; String outputTableName = args[2]; String dbName = null; String principalID = System .getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL); if (principalID != null) conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID); Job job = new Job(conf, "GroupByAge"); HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null)); // initialize HCatOutputFormat job.setInputFormatClass(HCatInputFormat.class); job.setJarByClass(GroupByAge.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(WritableComparable.class); job.setOutputValueClass(DefaultHCatRecord.class); HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null)); HCatSchema s = HCatOutputFormat.getTableSchema(job); System.err.println("INFO: output schema explicitly set for writing:" + s); HCatOutputFormat.setSchema(job, s); job.setOutputFormatClass(HCatOutputFormat.class); return (job.waitForCompletion(true) ? 0 : 1); } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new GroupByAge(), args); System.exit(exitCode); } }
在编译上述程序之前,您必须下载一些jar并将它们添加到此应用程序的类路径中。您需要下载所有 Hive jar 和 HCatalog jar(HCatalog-core-0.5.0.jar、hive-metastore-0.10.0.jar、libthrift-0.7.0.jar、hive-exec-0.10.0.jar、 libfb303-0.7.0.jar、jdo2-api-2.3-ec.jar、slf4j-api-1.6.1.jar)。
使用以下命令将这些jar文件从本地复制到HDFS并将它们添加到类路径。
bin/hadoop fs -copyFromLocal $HCAT_HOME/share/HCatalog/HCatalog-core-0.5.0.jar /tmp bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp export LIB_JARS=hdfs:///tmp/HCatalog-core-0.5.0.jar, hdfs:///tmp/hive-metastore-0.10.0.jar, hdfs:///tmp/libthrift-0.7.0.jar, hdfs:///tmp/hive-exec-0.10.0.jar, hdfs:///tmp/libfb303-0.7.0.jar, hdfs:///tmp/jdo2-api-2.3-ec.jar, hdfs:///tmp/slf4j-api-1.6.1.jar
使用以下命令编译并执行给定的程序。
$HADOOP_HOME/bin/hadoop jar GroupByAge tmp/hive
现在,检查您的输出目录(hdfs:user/tmp/hive)是否有输出(part_0000、part_0001)。
HCatalog – 装载机和存储机
该HCatLoader和HCatStorer的API与猪脚本用来读取和写入HCatalog管理表中的数据。这些接口不需要特定于 HCatalog 的设置。
最好对 Apache Pig 脚本有一些了解,以便更好地理解本章。如需进一步参考,请阅读我们的Apache Pig教程。
装载机
HCatLoader 与 Pig 脚本一起使用以从 HCatalog 管理的表中读取数据。使用以下语法使用 HCatloader 将数据加载到 HDFS。
A = LOAD 'tablename' USING org.apache.HCatalog.pig.HCatLoader();
您必须用单引号指定表名:LOAD ‘tablename’。如果您使用的是非默认数据库,则必须将输入指定为 ‘ dbname.tablename’。
Hive 元存储允许您在不指定数据库的情况下创建表。如果您以这种方式创建表,则数据库名称为“默认”,并且在为 HCatLoader 指定表时不需要。
下表包含 HCatloader 类的重要方法和说明。
Sr.No. | 方法名称和描述 |
---|---|
1 |
public InputFormat<?,?> getInputFormat()throws IOException 使用 HCatloader 类读取加载数据的输入格式。 |
2 |
public String relativeToAbsolutePath(String location, Path curDir) throws IOException 它返回绝对路径的字符串格式。 |
3 |
public void setLocation(String location, Job job) throws IOException 它设置可以执行作业的位置。 |
4 |
public Tuple getNext() throws IOException 从循环中返回当前元组(key和value)。 |
HCatStorer
HCatStorer 与 Pig 脚本一起用于将数据写入 HCatalog 管理的表。使用以下语法进行存储操作。
A = LOAD ... B = FOREACH A ... ... ... my_processed_data = ... STORE my_processed_data INTO 'tablename' USING org.apache.HCatalog.pig.HCatStorer();
您必须用单引号指定表名:LOAD ‘tablename’。必须在运行 Pig 脚本之前创建数据库和表。如果您使用的是非默认数据库,则必须将输入指定为‘dbname.tablename’。
Hive 元存储允许您在不指定数据库的情况下创建表。如果您以这种方式创建表,则数据库名称为“default”,您无需在store语句中指定数据库名称。
对于USING子句,您可以使用一个字符串参数来表示分区的键/值对。当您写入分区表并且分区列不在输出列中时,这是一个强制性参数。不应引用分区键的值。
下表包含 HCatStorer 类的重要方法和说明。
Sr.No. | 方法名称和描述 |
---|---|
1 |
public OutputFormat getOutputFormat() throws IOException 使用 HCatStorer 类读取存储数据的输出格式。 |
2 |
public void setStoreLocation (String location, Job job) throws IOException 设置执行此商店应用程序的位置。 |
3 |
public void storeSchema (ResourceSchema schema, String arg1, Job job) throws IOException 存储架构。 |
4 |
public void prepareToWrite (RecordWriter writer) throws IOException 它有助于使用 RecordWriter 将数据写入特定文件。 |
5 |
public void putNext (Tuple tuple) throws IOException 将元组数据写入文件。 |
使用 HCatalog 运行 Pig
Pig 不会自动拾取 HCatalog jar。要引入必要的 jar,您可以在 Pig 命令中使用标志或设置环境变量PIG_CLASSPATH和PIG_OPTS,如下所述。
要引入适当的 jars 来使用 HCatalog,只需包含以下标志 –
pig –useHCatalog <Sample pig scripts file>
为执行设置 CLASSPATH
使用以下 CLASSPATH 设置将 HCatalog 与 Apache Pig 同步。
export HADOOP_HOME = <path_to_hadoop_install> export HIVE_HOME = <path_to_hive_install> export HCAT_HOME = <path_to_hcat_install> export PIG_CLASSPATH = $HCAT_HOME/share/HCatalog/HCatalog-core*.jar:\ $HCAT_HOME/share/HCatalog/HCatalog-pig-adapter*.jar:\ $HIVE_HOME/lib/hive-metastore-*.jar:$HIVE_HOME/lib/libthrift-*.jar:\ $HIVE_HOME/lib/hive-exec-*.jar:$HIVE_HOME/lib/libfb303-*.jar:\ $HIVE_HOME/lib/jdo2-api-*-ec.jar:$HIVE_HOME/conf:$HADOOP_HOME/conf:\ $HIVE_HOME/lib/slf4j-api-*.jar
例子
假设我们在 HDFS 中有一个文件student_details.txt,其内容如下。
student_details.txt
001, Rajiv, Reddy, 21, 9848022337, Hyderabad 002, siddarth, Battacharya, 22, 9848022338, Kolkata 003, Rajesh, Khanna, 22, 9848022339, Delhi 004, Preethi, Agarwal, 21, 9848022330, Pune 005, Trupthi, Mohanthy, 23, 9848022336, Bhuwaneshwar 006, Archana, Mishra, 23, 9848022335, Chennai 007, Komal, Nayak, 24, 9848022334, trivendram 008, Bharathi, Nambiayar, 24, 9848022333, Chennai
我们还有一个名为sample_script.pig的示例脚本,位于同一个 HDFS 目录中。该文件包含对学生关系执行操作和转换的语句,如下所示。
student = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING PigStorage(',') as (id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray); student_order = ORDER student BY age DESC; STORE student_order INTO 'student_order_table' USING org.apache.HCatalog.pig.HCatStorer(); student_limit = LIMIT student_order 4; Dump student_limit;
-
该脚本的第一条语句将加载名为student_details.txt的文件中的数据作为名为student的关系。
-
脚本的第二条语句将根据年龄按降序排列关系的元组,并将其存储为student_order。
-
第三条语句将处理后的数据student_order结果存储在名为student_order_table的单独表中。
-
脚本的第四条语句将student_order的前四个元组存储为student_limit。
-
最后,第五条语句将转储关系student_limit的内容。
现在让我们执行sample_script.pig,如下所示。
$./pig -useHCatalog hdfs://localhost:9000/pig_data/sample_script.pig
现在,检查您的输出目录(hdfs:user/tmp/hive)是否有输出(part_0000、part_0001)。