Exception when using bulk load to generate data directly in HFile format

When integrating HBase with MapReduce and using bulk load to generate HFiles directly from the input data, the job fails with an exception.


[dangerbox title="Failing code"]
BulkLoadMap

package com.kami.demo10;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @version v 1.0
 * @Author kami
 * @Date 2019/12/19
 */
public class BulkLoadMap extends Configured implements Tool {

    //For bulk load, only the map side needs custom code
    //It converts the text data on HDFS into HFile-ready records
    public static class BulkLoadData extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] sps = value.toString().split(" ");
            String rowkey = sps[0];
            String name = sps[1];
            String age = sps[2];
            Put put = new Put(rowkey.getBytes());
            put.addColumn("f1".getBytes(), "name".getBytes(), name.getBytes());
            put.addColumn("f1".getBytes(), "age".getBytes(), age.getBytes());
            context.write(new ImmutableBytesWritable(rowkey.getBytes()), put);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2183");

        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("myuser2"));

        Job job = Job.getInstance(conf, "rua");
        job.setJarByClass(BulkLoadMap.class);
        job.setMapperClass(BulkLoadData.class);

        job.setOutputFormatClass(HFileOutputFormat2.class);
        HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("myuser2")));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/evaHfile"));
        //Input format: plain text
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/rua.txt"));

        //Map output types (note: placed after configureIncrementalLoad, which causes the error below)
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new BulkLoadMap(), args);
        System.out.println(run);
    }
}

[/dangerbox]
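For reference, the mapper above splits each input line on single spaces and expects three fields: a row key, a name, and an age. A hypothetical line of rua.txt (this sample is illustrative, not taken from the original post) would look like:

0001 zhangsan 18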

[warningbox title="Error symptoms and cause"]

19/12/19 19:48:47 WARN mapred.LocalJobRunner: job_local244116549_0001
java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue
	at org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer.reduce(KeyValueSortReducer.java:43)
	at org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer.reduce(KeyValueSortReducer.java:36)
	at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:164)
	at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:610)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:444)
	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:449)

The job fails with: java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue

Inspection showed that the map output type settings had been placed after the call to HFileOutputFormat2.configureIncrementalLoad. In the driver, the job's input path, output path, and map output types must all be configured before that call, because configureIncrementalLoad reads the job's map output value class to choose the sort reducer (KeyValueSortReducer for KeyValue, PutSortReducer for Put). If the output types have not been set yet, the wrong reducer is wired in, a warning is logged, and no result data is produced.
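The root cause is visible in how configureIncrementalLoad works: it inspects the job's map output value class at the moment it is called and wires in the matching sort reducer. A simplified sketch of that selection logic, paraphrased from the HBase 1.x sources rather than copied verbatim:

        //Inside HFileOutputFormat2.configureIncrementalLoad (simplified):
        if (KeyValue.class.equals(job.getMapOutputValueClass())) {
            job.setReducerClass(KeyValueSortReducer.class);
        } else if (Put.class.equals(job.getMapOutputValueClass())) {
            job.setReducerClass(PutSortReducer.class);
        } else if (Text.class.equals(job.getMapOutputValueClass())) {
            job.setReducerClass(TextSortReducer.class);
        }
        //If the map output value class is set only after this call, the checks
        //above run against the default class, the Put records later reach a
        //reducer that expects KeyValue instances, and the cast fails.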

[/warningbox]

[infobox title="Solution"]
Moving the map output type settings ahead of the HFileOutputFormat2 configuration code resolves the problem:

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");

        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("myuser2"));

        Job job = Job.getInstance(conf, "rua");
        job.setJarByClass(BulkLoadMap.class);
        job.setMapperClass(BulkLoadData.class);

        //Map output types, now set before configureIncrementalLoad
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        job.setOutputFormatClass(HFileOutputFormat2.class);
        HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("myuser2")));

        FileOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/evaHfile"));

        //Input format: plain text
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/rua.txt"));

        return job.waitForCompletion(true) ? 0 : 1;
    }
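
Once the job succeeds, the generated HFiles under /evaHfile still need to be loaded into the table. A minimal sketch of that final step, assuming HBase 1.x where LoadIncrementalHFiles lives in org.apache.hadoop.hbase.mapreduce (the paths and table name match the driver above; adjust them to your cluster):

        //Load the HFiles produced by the job into the myuser2 table,
        //reusing the conf, connection and table objects from run() above
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path("hdfs://node01:8020/evaHfile"),
                connection.getAdmin(), table,
                connection.getRegionLocator(TableName.valueOf("myuser2")));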


[/infobox]

