Error when using bulk load in the HBase and MapReduce integration to write data directly in HFile format
Code at the time of the error
BulkLoadMap
package com.kami.demo10;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @version v 1.0
 * @Author kami
 * @Date 2019/12/19
 */
public class BulkLoadMap extends Configured implements Tool {

    // For bulk load only a Mapper is needed;
    // it converts the HDFS text data into the key/value pairs that become HFiles.
    public static class BulkLoadData extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] sps = value.toString().split(" ");
            String rowkey = sps[0];
            String name = sps[1];
            String age = sps[2];
            Put put = new Put(rowkey.getBytes());
            put.addColumn("f1".getBytes(), "name".getBytes(), name.getBytes());
            put.addColumn("f1".getBytes(), "age".getBytes(), age.getBytes());
            context.write(new ImmutableBytesWritable(rowkey.getBytes()), put);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2183");
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("myuser2"));

        Job job = Job.getInstance(conf, "rua");
        job.setJarByClass(BulkLoadMap.class);
        job.setMapperClass(BulkLoadData.class);
        job.setOutputFormatClass(HFileOutputFormat2.class);
        HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("myuser2")));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/evaHfile"));

        // Input type: plain text
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/rua.txt"));

        // Map output types (set only AFTER configureIncrementalLoad, which causes the error below)
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new BulkLoadMap(), args);
        System.out.println(run);
    }
}
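As an aside (an assumption drawn from the mapper code, not stated in the original post): BulkLoadData expects every line of /rua.txt to contain a row key, a name, and an age separated by single spaces, for example a hypothetical line such as 0001 zhangsan 18; lines with fewer fields would fail with an ArrayIndexOutOfBoundsException. The input format itself is unrelated to the error discussed below.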
Error symptoms and cause
19/12/19 19:48:47 WARN mapred.LocalJobRunner: job_local244116549_0001
java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue
at org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer.reduce(KeyValueSortReducer.java:43)
at org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer.reduce(KeyValueSortReducer.java:36)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:164)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:610)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:444)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:449)
The job fails with java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue.
Inspection shows that the map output type settings were placed after the call to HFileOutputFormat2.configureIncrementalLoad. In the driver, the job's input path, output path, and map output types must all be configured before that call, because configureIncrementalLoad inspects the job's map output value class to decide which sorting reducer to install. If the output types have not been set yet, the wrong reducer is installed, the warning shown above is raised, and the job produces no result data.
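The root cause becomes clearer when you look at what configureIncrementalLoad does internally. The sketch below is only an approximation of the HBase 1.x behaviour (it is not the actual HBase source, details differ between versions, and the class and method names are hypothetical apart from the HBase/Hadoop types it references). The method first forces the job-level output value class to KeyValue and only afterwards looks at the map output value class to choose a sorting reducer. When the map output value class has never been set explicitly, Hadoop's getMapOutputValueClass() falls back to the job output value class, which at that point is KeyValue, so KeyValueSortReducer is installed and later fails to cast the mapper's Put values.

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.mapreduce.Job;

// Illustrative sketch only, NOT the actual HBase source: approximates the reducer
// selection that HFileOutputFormat2.configureIncrementalLoad performs in HBase 1.x.
public class ReducerSelectionSketch {

    static void chooseSortReducer(Job job) {
        // configureIncrementalLoad first forces the job-level output classes...
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(KeyValue.class);

        // ...and only then inspects the *map* output value class to pick a reducer.
        // If setMapOutputValueClass(...) has not been called yet, Hadoop's
        // getMapOutputValueClass() falls back to the job output value class,
        // which is now KeyValue, so KeyValueSortReducer is installed even though
        // the mapper emits Put values, causing the ClassCastException at runtime.
        if (KeyValue.class.equals(job.getMapOutputValueClass())) {
            job.setReducerClass(KeyValueSortReducer.class);
        } else if (Put.class.equals(job.getMapOutputValueClass())) {
            // With job.setMapOutputValueClass(Put.class) called beforehand, this
            // branch installs PutSortReducer, which turns the Puts into KeyValues.
            job.setReducerClass(PutSortReducer.class);
        }
    }
}

In other words, once job.setMapOutputValueClass(Put.class) is called before configureIncrementalLoad, the Put branch wins and PutSortReducer is installed, which is exactly what the fix below does.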
Solution
Move the map output type settings ahead of the HFileOutputFormat2 configuration code; this resolves the problem.
@Override
public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2183");
    Connection connection = ConnectionFactory.createConnection(conf);
    Table table = connection.getTable(TableName.valueOf("myuser2"));

    Job job = Job.getInstance(conf, "rua");
    job.setJarByClass(BulkLoadMap.class);
    job.setMapperClass(BulkLoadData.class);

    // Map output types: set BEFORE configureIncrementalLoad so it can pick PutSortReducer
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);

    job.setOutputFormatClass(HFileOutputFormat2.class);
    HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("myuser2")));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/evaHfile"));

    // Input type: plain text
    job.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/rua.txt"));

    return job.waitForCompletion(true) ? 0 : 1;
}
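The post stops at generating the HFiles; for completeness, here is a minimal sketch of the follow-up load step. It is not part of the original article: it assumes an HBase 1.x-style API in which LoadIncrementalHFiles lives in org.apache.hadoop.hbase.mapreduce (the class moved and was later replaced in newer HBase versions), and it simply reuses the table name and output path from the code above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

// Sketch only: hands the HFiles generated under /evaHfile to the myuser2 table.
public class LoadHFiles {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2183");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin();
             Table table = connection.getTable(TableName.valueOf("myuser2"))) {
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            // Moves the generated HFiles into the regions of myuser2.
            loader.doBulkLoad(new Path("hdfs://node01:8020/evaHfile"), admin, table,
                    connection.getRegionLocator(TableName.valueOf("myuser2")));
        }
    }
}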