MapReduce高级案例 ①

kamisamak 发布于 2020-06-15 1497 次阅读


[infobox title="倒排索引案例(多Job串联)"]

现有大量的文本（文档、网页），需要为其建立搜索用的索引（倒排索引）。

[successbox title="第一次处理"]

第一次处理输出

ruaDriver01 展开 / 收起
ruaMapper01 展开 / 收起
ruaCombiner01 展开 / 收起
ruaReduce01 展开 / 收起

[/successbox]
[successbox title="第二次处理"]
第二次处理输出

ruaDriver02 展开 / 收起

ruaMapper02 展开 / 收起

ruaReduce02 展开 / 收起

[/successbox]
[/infobox]

[infobox title="MapReduce 多 Job 串联"]

一个稍复杂的处理逻辑往往需要多个 MapReduce 程序串联处理。多个 job 的串联可以借助 MapReduce 框架的 JobControl 实现：为 job 声明依赖关系后，JobControl 会在被依赖的 job 成功完成后再提交后续 job。

package com.kami.demo01;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver that chains two MapReduce jobs with {@link JobControl}: job 1 writes its result to an
 * intermediate directory and job 2 reads that directory as its input, so job 2 is registered
 * as depending on job 1.
 *
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/15
 */
public class ruaDriver {

    /**
     * Configures both jobs, runs them through a {@link JobControl}, waits for completion and
     * exits the JVM with status 0 if every job succeeded, or 1 if any job failed.
     *
     * @param args unused; the input, intermediate and output paths are hard-coded below
     * @throws IOException if a job or its {@link ControlledJob} wrapper cannot be created
     * @throws InterruptedException if the polling wait is interrupted
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        Configuration configuration1 = new Configuration();
        Configuration configuration2 = new Configuration();

        Job job1 = Job.getInstance(configuration1);
        Job job2 = Job.getInstance(configuration2);

        // Lets Hadoop locate the jar containing the mapper/combiner/reducer classes when the
        // driver is submitted to a cluster instead of being run in local mode.
        job1.setJarByClass(ruaDriver.class);
        job2.setJarByClass(ruaDriver.class);

        job1.setCombinerClass(ruaCombiner01.class);
        job1.setMapperClass(ruaMapper01.class);
        job1.setReducerClass(ruaReduce01.class);

        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(IntWritable.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);

        // Hadoop Path strings use '/' as the separator on every platform; a literal backslash
        // would become part of the file name on non-Windows systems.
        Path inputPath = new Path("data/d01");
        // Output of job 1 and input of job 2. A single variable keeps the two ends of the chain
        // from drifting apart.
        Path intermediatePath = new Path("output/d03");
        Path outputPath = new Path("output/d04");

        FileInputFormat.setInputPaths(job1, inputPath);
        FileOutputFormat.setOutputPath(job1, intermediatePath);

        job2.setMapperClass(ruaMapper02.class);
        job2.setReducerClass(ruaReduce02.class);

        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job2, intermediatePath);
        FileOutputFormat.setOutputPath(job2, outputPath);

        JobControl control = new JobControl("wula");

        ControlledJob aJob = new ControlledJob(job1.getConfiguration());
        ControlledJob bJob = new ControlledJob(job2.getConfiguration());
        // Declare the dependency: job 2 consumes job 1's output, so it must run after job 1.
        bJob.addDependingJob(aJob);

        control.addJob(aJob);
        control.addJob(bJob);

        // JobControl is a Runnable; it is driven on its own thread while main polls below.
        Thread thread = new Thread(control);
        thread.start();

        try {
            while (!control.allFinished()) {
                // Thread.sleep is static and always pauses the calling (main) thread.
                Thread.sleep(1000);
            }
        } finally {
            // Ends JobControl's run loop so its thread does not keep running after the jobs
            // are done (or after the wait above was interrupted).
            control.stop();
        }

        // Report failure through the exit status instead of always exiting with 0.
        boolean succeeded = control.getFailedJobList().isEmpty();
        if (!succeeded) {
            System.err.println("Failed jobs: " + control.getFailedJobList());
        }
        System.exit(succeeded ? 0 : 1);
    }
}

[/infobox]