HCatalog – 输入输出格式

HCatalog – 输入输出格式


HCatInputFormatHCatOutputFormat接口用于读取从HDFS和加工后的数据,写入所得到的数据转换成使用MapReduce工作HDFS。让我们详细说明输入和输出格式接口。

HCat 输入格式

HCatInputFormat是使用MapReduce作业来读取HCatalog管理表中的数据。HCatInputFormat 公开了一个 Hadoop 0.20 MapReduce API,用于读取数据,就像它已发布到表中一样。

Sr.No. 方法名称和描述
1

public static HCatInputFormat setInput(Job job, String dbName, String tableName)throws IOException

设置用于作业的输入。它使用给定的输入规范查询元存储,并将匹配的分区序列化到 MapReduce 任务的作业配置中。

2

public static HCatInputFormat setInput(Configuration conf, String dbName, String tableName) throws IOException

设置用于作业的输入。它使用给定的输入规范查询元存储,并将匹配的分区序列化到 MapReduce 任务的作业配置中。

3

public HCatInputFormat setFilter(String filter)throws IOException

在输入表上设置过滤器。

4

public HCatInputFormat setProperties(Properties properties) throws IOException

设置输入格式的属性。

HCatInputFormat API包含以下方法-

  • 设置输入
  • 设置输出模式
  • 获取表架构

要使用HCatInputFormat读取数据,首先使用正在读取的表中的必要信息实例化InputJobInfo,然后使用InputJobInfo调用setInput

您可以使用setOutputSchema方法来包含投影模式,以指定输出字段。如果未指定架构,则将返回表中的所有列。您可以使用 getTableSchema 方法来确定指定输入表的表架构。

HCat 输出格式

HCatOutputFormat 与 MapReduce 作业一起使用以将数据写入 HCatalog 管理的表。HCatOutputFormat 公开了一个 Hadoop 0.20 MapReduce API,用于将数据写入表。当 MapReduce 作业使用 HCatOutputFormat 写入输出时,将使用为表配置的默认 OutputFormat,并在作业完成后将新分区发布到表中。

Sr.No. 方法名称和描述
1

public static void setOutput (Configuration conf, Credentials credentials, OutputJobInfo outputJobInfo) throws IOException

设置有关要为作业写入的输出的信息。它查询元数据服务器以查找要用于表的 StorageHandler。如果分区已发布,则会引发错误。

2

public static void setSchema (Configuration conf, HCatSchema schema) throws IOException

为要写出到分区的数据设置架构。如果未调用,则默认情况下将表架构用于分区。

3

public RecordWriter <WritableComparable<?>, HCatRecord > getRecordWriter (TaskAttemptContext context)throws IOException, InterruptedException

获得这份工作的唱片作家。它使用 StorageHandler 的默认 OutputFormat 来获取记录编写器。

4

public OutputCommitter getOutputCommitter (TaskAttemptContext context) throws IOException, InterruptedException

获取此输出格式的输出提交者。它确保正确提交输出。

HCatOutputFormat API包含以下方法-

  • 设置输出
  • 设置模式
  • 获取表架构

HCatOutputFormat 的第一次调用必须是setOutput任何其他调用都会抛出异常,说明输出格式未初始化。

正在写出的数据的模式由setSchema方法指定您必须调用此方法,提供您正在编写的数据模式。如果您的数据与表架构具有相同的架构,则可以使用HCatOutputFormat.getTableSchema()获取表架构,然后将其传递给setSchema()

例子

下面的 MapReduce 程序从一个表中读取数据,它假定在第二列(“第 1 列”)中有一个整数,并计算它找到的每个不同值的实例数。也就是说,它相当于“ select col1, count(*) from $table group by col1; ”。

例如,如果第二列中的值是 {1, 1, 1, 3, 3, 5},则程序将产生以下值和计数输出 –

1, 3
3, 2
5, 1

现在让我们看看程序代码 –

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.HCatalog.common.HCatConstants;
import org.apache.HCatalog.data.DefaultHCatRecord;
import org.apache.HCatalog.data.HCatRecord;
import org.apache.HCatalog.data.schema.HCatSchema;

import org.apache.HCatalog.mapreduce.HCatInputFormat;
import org.apache.HCatalog.mapreduce.HCatOutputFormat;
import org.apache.HCatalog.mapreduce.InputJobInfo;
import org.apache.HCatalog.mapreduce.OutputJobInfo;

public class GroupByAge extends Configured implements Tool {

   public static class Map extends Mapper<WritableComparable, 
      HCatRecord, IntWritable, IntWritable> {
      int age;
		
      @Override
      protected void map(
         WritableComparable key, HCatRecord value,
         org.apache.hadoop.mapreduce.Mapper<WritableComparable,
         HCatRecord, IntWritable, IntWritable>.Context context
      )throws IOException, InterruptedException {
         age = (Integer) value.get(1);
         context.write(new IntWritable(age), new IntWritable(1));
      }
   }
	
   public static class Reduce extends Reducer<IntWritable, IntWritable,
      WritableComparable, HCatRecord> {
      @Override
      protected void reduce(
         IntWritable key, java.lang.Iterable<IntWritable> values,
         org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable,
         WritableComparable, HCatRecord>.Context context
      )throws IOException ,InterruptedException {
         int sum = 0;
         Iterator<IntWritable> iter = values.iterator();
			
         while (iter.hasNext()) {
            sum++;
            iter.next();
         }
			
         HCatRecord record = new DefaultHCatRecord(2);
         record.set(0, key.get());
         record.set(1, sum);
         context.write(null, record);
      }
   }
	
   public int run(String[] args) throws Exception {
      Configuration conf = getConf();
      args = new GenericOptionsParser(conf, args).getRemainingArgs();
		
      String serverUri = args[0];
      String inputTableName = args[1];
      String outputTableName = args[2];
      String dbName = null;
      String principalID = System
		
      .getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
      if (principalID != null)
      conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);
      Job job = new Job(conf, "GroupByAge");
      HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));

      // initialize HCatOutputFormat
      job.setInputFormatClass(HCatInputFormat.class);
      job.setJarByClass(GroupByAge.class);
      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);
		
      job.setMapOutputKeyClass(IntWritable.class);
      job.setMapOutputValueClass(IntWritable.class);
      job.setOutputKeyClass(WritableComparable.class);
      job.setOutputValueClass(DefaultHCatRecord.class);
		
      HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
      HCatSchema s = HCatOutputFormat.getTableSchema(job);
      System.err.println("INFO: output schema explicitly set for writing:" + s);
      HCatOutputFormat.setSchema(job, s);
      job.setOutputFormatClass(HCatOutputFormat.class);
      return (job.waitForCompletion(true) ? 0 : 1);
   }
	
   public static void main(String[] args) throws Exception {
      int exitCode = ToolRunner.run(new GroupByAge(), args);
      System.exit(exitCode);
   }
}

在编译上述程序之前,您必须下载一些jar并将它们添加到此应用程序类路径中。您需要下载所有 Hive jar 和 HCatalog jar(HCatalog-core-0.5.0.jar、hive-metastore-0.10.0.jar、libthrift-0.7.0.jar、hive-exec-0.10.0.jar、 libfb303-0.7.0.jar、jdo2-api-2.3-ec.jar、slf4j-api-1.6.1.jar)。

使用以下命令将这些jar文件从本地复制HDFS并将它们添加到类路径

bin/hadoop fs -copyFromLocal $HCAT_HOME/share/HCatalog/HCatalog-core-0.5.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp

export LIB_JARS=hdfs:///tmp/HCatalog-core-0.5.0.jar,
hdfs:///tmp/hive-metastore-0.10.0.jar,
hdfs:///tmp/libthrift-0.7.0.jar,
hdfs:///tmp/hive-exec-0.10.0.jar,
hdfs:///tmp/libfb303-0.7.0.jar,
hdfs:///tmp/jdo2-api-2.3-ec.jar,
hdfs:///tmp/slf4j-api-1.6.1.jar

使用以下命令编译并执行给定的程序。

$HADOOP_HOME/bin/hadoop jar GroupByAge tmp/hive

现在,检查您的输出目录(hdfs:user/tmp/hive)是否有输出(part_0000、part_0001)。

觉得文章有用?

点个广告表达一下你的爱意吧 !😁