背景
在Hadoop的MapReduce过程中,每个map task处理完数据后,如果存在自定义Combiner类,会先进行一次本地的reduce操作,然后把数据发送到Partitioner,由Partitioner来决定每条记录应该送往哪个reducer节点,默认使用的是HashPartitioner,其核心代码如下:
MapReduce自定义分区
[infobox title="自定义Partitioner"]
public class FlowPartition extends Partitioner<Text,FlowBean> {
@Override
public int getPartition(Text text, FlowBean flowBean, int i) {
String line = text.toString();
if (line.startsWith("135")){
return 0;
}else if(line.startsWith("136")){
return 1;
}else if(line.startsWith("137")){
return 2;
}else if(line.startsWith("138")){
return 3;
}else if(line.startsWith("139")){
return 4;
}else{
return 5;
}
}
}
[/infobox]
[infobox title="作业运行添加分区设置"]
设置reduce数量
job.setNumReduceTasks(2);
作业运行添加分区设置:
job.setPartitionerClass(FlowPartition.class);
更改输入与输出路径,并打包到集群上面去运行
TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/partition_flow/"));
TextOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/partition_out"));
[/infobox]
文章评论
:hehe: