TopN
输出流量使用量在前10的用户信息
数据
输出
排序概述
[infobox title="code"]
[successbox title="FlowBean"]
package com.kami.demo04;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* @version v 1.0
* @Author kamisamak
* @Date 2020/6/15
*/
public class FlowBean implements WritableComparable {
private long upFlow;
private long downFlow;
private long sumFlow;
public FlowBean(long upFlow, long downFlow, long sumFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = sumFlow;
}
//定义一个空参构造器,反射时会用到
public FlowBean() {
}
@Override
public int compareTo(FlowBean obj) {
// int result;
// if (this.sumFlow > obj.getSumFlow()) {
// result = -1;
// } else if (this.sumFlow < obj.getSumFlow()) {
// result = 1;
// } else {
// result = 0;
// }
// return result;
return Long.compare(obj.sumFlow, this.sumFlow);
}
/**
* 定义序列化方法
* dataOutput:
* 框架给我们提供的数据出口,我们通过该对象进行数据序列化操作。
* 温馨提示:
* 注意输出时的数据类型顺序。我们写入的顺序为upFlow,downFlow,sumFlow.
*/
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
/**
* 定义反序列化方法
* dataInput:
* 框架给我们提供的数据来源,我们通过该对象进行数据反序列化操作。
* 温馨提示:
* 注意输入的数据类型顺序要和输出时的数据类型要一一对应,即upFlow,downFlow,sumFlow。如果你不对应可能会导致最终的结果不正确哟~
*/
@Override
public void readFields(DataInput dataInput) throws IOException {
upFlow = dataInput.readLong();
downFlow = dataInput.readLong();
sumFlow = dataInput.readLong();
}
@Override
public String toString() {
return "\t" + upFlow +
"\t" + downFlow +
"\t" + sumFlow;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
}
[/successbox]
[successbox title="ruaDriver"]
package com.kami.demo04;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @version v 1.0
* @Author kamisamak
* @Date 2020/6/16
*/
public class ruaDriver {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(ruaDriver.class);
job.setMapperClass(ruaMapper.class);
job.setReducerClass(ruaReducer.class);
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
FileInputFormat.addInputPath(job, new Path("data\\d04\\rua.txt"));
FileOutputFormat.setOutputPath(job, new Path("output\\d10"));
job.setNumReduceTasks(1);
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
[/successbox]
[successbox title="ruaMapper"]
package com.kami.demo04;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;
/**
* @version v 1.0
* @Author kamisamak
* @Date 2020/6/15
*/
public class ruaMapper extends Mapper {
// 定义一个TreeMap作为存储数据的容器(天然按key排序)
private TreeMap flowMap = new TreeMap();
private FlowBean kBean;
private Text v;
private long upFlow;
private long downFlow;
private long sumFlow;
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
kBean = new FlowBean();
v = new Text();
String[] sps = value.toString().split("\t");
//获取电话号码
String phoneNum = sps[0];
//获取上传流量
upFlow = Long.parseLong(sps[1]);
//获取下载流量
downFlow = Long.parseLong(sps[2]);
//获取上传下载流量he
sumFlow = Long.parseLong(sps[3]);
kBean.setDownFlow(downFlow);
kBean.setUpFlow(upFlow);
kBean.setSumFlow(sumFlow);
v.set(phoneNum);
flowMap.put(kBean, v);
if (flowMap.size() > 10) {
flowMap.remove(flowMap.lastKey());
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
Iterator bean = flowMap.keySet().iterator();
while (bean.hasNext()) {
FlowBean k = bean.next();
context.write(k, flowMap.get(k));
}
}
}
[/successbox]
[successbox title="ruaReducer"]
package com.kami.demo04;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;
/**
* @version v 1.0
* @Author kamisamak
* @Date 2020/6/16
*/
public class ruaReducer extends Reducer {
TreeMap flowMap = new TreeMap<>();
@Override
protected void reduce(FlowBean key, Iterable values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
FlowBean bean = new FlowBean();
bean.setDownFlow(key.getDownFlow());
bean.setUpFlow(key.getUpFlow());
bean.setSumFlow(key.getSumFlow());
flowMap.put(bean, new Text(value));
if (flowMap.size() > 10) {
flowMap.remove(flowMap.lastKey());
}
}
}
@Override
protected void cleanup(Reducer.Context context) throws IOException, InterruptedException {
System.out.println("Reduce cleanup ");
Iterator it = flowMap.keySet().iterator();
while (it.hasNext()) {
FlowBean v = it.next();
context.write(new Text(flowMap.get(v)), v);
}
}
}
[/successbox]
[/infobox]
推荐阅读1:https://www.cnblogs.com/yinzhengjie2020/p/12528395.html
推荐阅读2:https://www.cnblogs.com/yinzhengjie2020/p/12520516.html
推荐阅读3:https://blog.csdn.net/weixin_35353187/article/details/82025204
Comments NOTHING