MapReduce Advanced Case ④

Published by kamisamak on 2020-06-15


TopN

Output the information of the top 10 users by traffic usage.
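
The job expects tab-separated input lines of the form: phone number, upstream traffic, downstream traffic, total traffic. A hypothetical sample of data\d04\rua.txt (all values invented for illustration):

[successbox title="sample input (hypothetical)"]

13560439658	7335	110349	117684
13726230503	2481	24681	27162
13760778710	120	120	240

[/successbox]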



[infobox title="code"]
[successbox title="FlowBean"]

package com.kami.demo04;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/15
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean(long upFlow, long downFlow, long sumFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
    }

    // No-arg constructor, required so the framework can instantiate the bean via reflection
    public FlowBean() {
    }

    @Override
    public int compareTo(FlowBean obj) {
        // Sort in descending order of sumFlow: larger totals come first
        return Long.compare(obj.sumFlow, this.sumFlow);
    }


    /**
     * Serialization method.
     * dataOutput:
     * The output sink the framework hands us; we serialize the fields through it.
     * Note:
     * Mind the field order on write. We write upFlow, downFlow, sumFlow.
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    /**
     * Deserialization method.
     * dataInput:
     * The input source the framework hands us; we deserialize the fields through it.
     * Note:
     * The read order must match the write order field for field (upFlow, downFlow, sumFlow);
     * otherwise the reconstructed values will be wrong.
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readLong();
        downFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }

    @Override
    public String toString() {
        return "\t" + upFlow +
                "\t" + downFlow +
                "\t" + sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
}

[/successbox]
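To see the comparator's effect in isolation, here is a minimal standalone sketch (not part of the job; values are made up): a TreeMap keyed by FlowBean orders its entries by descending sumFlow, so firstKey() is the heaviest user and lastKey() the lightest.

[successbox title="comparator sketch"]

package com.kami.demo04;

import java.util.TreeMap;

public class FlowBeanOrderDemo {
    public static void main(String[] args) {
        TreeMap<FlowBean, String> map = new TreeMap<>();
        map.put(new FlowBean(10, 20, 30), "light user");
        map.put(new FlowBean(100, 200, 300), "heavy user");
        // compareTo sorts descending by sumFlow
        System.out.println(map.firstKey().getSumFlow()); // 300
        System.out.println(map.lastKey().getSumFlow());  // 30
    }
}

[/successbox]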
[successbox title="ruaDriver"]

package com.kami.demo04;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/16
 */
public class ruaDriver {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(ruaDriver.class);
        job.setMapperClass(ruaMapper.class);
        job.setReducerClass(ruaReducer.class);
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        FileInputFormat.addInputPath(job, new Path("data\\d04\\rua.txt"));
        FileOutputFormat.setOutputPath(job, new Path("output\\d10"));
        job.setNumReduceTasks(1);
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

[/successbox]
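The driver hardcodes its input and output paths, and the job will abort if output\d10 already exists. A common variant (a sketch, assuming the paths arrive as program arguments and adding import org.apache.hadoop.fs.FileSystem) drops into main and clears the output directory before submitting:

[successbox title="driver path handling (sketch)"]

Path in = new Path(args[0]);
Path out = new Path(args[1]);
// FileOutputFormat refuses to write into an existing directory, so remove it first
FileSystem fs = FileSystem.get(configuration);
if (fs.exists(out)) {
    fs.delete(out, true);
}
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);

[/successbox]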
[successbox title="ruaMapper"]

package com.kami.demo04;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

/**
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/15
 */
public class ruaMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    // TreeMap as the data container; entries stay sorted by key (descending sumFlow)
    private TreeMap<FlowBean, Text> flowMap = new TreeMap<>();
    private FlowBean kBean;
    private Text v;
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        kBean = new FlowBean();
        v = new Text();
        String[] sps = value.toString().split("\t");
        // phone number
        String phoneNum = sps[0];
        // upstream traffic
        upFlow = Long.parseLong(sps[1]);
        // downstream traffic
        downFlow = Long.parseLong(sps[2]);
        // total traffic (upstream + downstream)
        sumFlow = Long.parseLong(sps[3]);
        kBean.setDownFlow(downFlow);
        kBean.setUpFlow(upFlow);
        kBean.setSumFlow(sumFlow);
        v.set(phoneNum);
        flowMap.put(kBean, v);
        // keep at most 10 entries: with descending order, lastKey() is the smallest total
        if (flowMap.size() > 10) {
            flowMap.remove(flowMap.lastKey());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        Iterator<FlowBean> bean = flowMap.keySet().iterator();
        while (bean.hasNext()) {
            FlowBean k = bean.next();
            context.write(k, flowMap.get(k));
        }
    }
}

[/successbox]
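The mapper's top-N trick in isolation: since the TreeMap is ordered by descending total, lastKey() always holds the smallest entry, and evicting it whenever the map grows past N retains exactly the N largest. A standalone sketch with hypothetical totals (N = 3 for brevity):

[successbox title="top-N eviction sketch"]

import java.util.TreeMap;

public class TopNDemo {
    public static void main(String[] args) {
        // descending comparator, mirroring FlowBean.compareTo
        TreeMap<Long, String> topN = new TreeMap<>((a, b) -> Long.compare(b, a));
        long[] totals = {120, 5, 990, 340, 77}; // hypothetical sumFlow values
        for (long t : totals) {
            topN.put(t, "user-" + t);           // hypothetical user label
            if (topN.size() > 3) {
                topN.remove(topN.lastKey());    // evict the smallest total
            }
        }
        System.out.println(topN.keySet());      // [990, 340, 120]
    }
}

[/successbox]

One caveat carried over from the real code: two FlowBeans with equal sumFlow compare as equal, so the TreeMap keeps only one of them and a tying user is silently dropped; a production comparator would break ties on, say, the phone number.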
[successbox title="ruaReducer"]


package com.kami.demo04;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

/**
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/16
 */
public class ruaReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    private TreeMap<FlowBean, Text> flowMap = new TreeMap<>();

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            FlowBean bean = new FlowBean();
            bean.setDownFlow(key.getDownFlow());
            bean.setUpFlow(key.getUpFlow());
            bean.setSumFlow(key.getSumFlow());
            flowMap.put(bean, new Text(value));
            // cap at 10 entries again: evict the one with the smallest total
            if (flowMap.size() > 10) {
                flowMap.remove(flowMap.lastKey());
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        Iterator<FlowBean> it = flowMap.keySet().iterator();
        while (it.hasNext()) {
            FlowBean v = it.next();
            // swap key and value on output: phone number first, then the flow fields
            context.write(flowMap.get(v), v);
        }
    }
    }
}

[/successbox]
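With the paths hardcoded, the job runs straight from the IDE; packaged as a jar it would be submitted roughly like this (jar name hypothetical):

[successbox title="run (sketch)"]

hadoop jar mapreduce-demo.jar com.kami.demo04.ruaDriver

[/successbox]

Each output line is the phone number followed by upFlow, downFlow, and sumFlow, tab-separated (FlowBean.toString supplies the leading tab), sorted by total traffic in descending order.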
[/infobox]
Recommended reading 1: https://www.cnblogs.com/yinzhengjie2020/p/12528395.html
Recommended reading 2: https://www.cnblogs.com/yinzhengjie2020/p/12520516.html
Recommended reading 3: https://blog.csdn.net/weixin_35353187/article/details/82025204

Last updated 2020-06-15