Inverted Index Case (Multi-Job Chaining)
Given a large volume of text (documents, web pages), we need to build a search index over it.
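For concreteness, suppose two hypothetical input files a.txt and b.txt under data\d01, each holding tab-separated words (the file names and words below are made up for illustration). The inverted index we want maps each word to the files containing it, with a per-file count:

a.txt: hadoop	spark	hadoop
b.txt: hadoop	flink

flink	b.txt->1
hadoop	a.txt->2	b.txt->1
spark	a.txt->1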
[successbox title="第一次处理"]
The first pass counts how many times each word appears in each file, writing keys of the form word_filename.
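With the hypothetical files above, the first job's part-r-00000 would look like this (keys are word_filename, values are counts, sorted by key):

flink_b.txt	1
hadoop_a.txt	2
hadoop_b.txt	1
spark_a.txt	1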
[collapse title="ruaDriver01"]
package com.kami.demo01;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * First-pass driver: counts occurrences of each word per file.
 *
 * @version v 1.0
 * @Author Toenc
 * @Date 2020/6/15
 */
public class ruaDriver01 extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        // int run = ToolRunner.run(new Configuration(), new ruaDriver01(), args);
        String[] arr = {"wula"};
        int run = ToolRunner.run(new Configuration(), new ruaDriver01(), arr);
        System.out.println(run);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), args[0]);
        job.setJarByClass(ruaDriver01.class);
        // The combiner is safe here: it runs the same summation as the reducer,
        // and addition is associative and commutative.
        job.setCombinerClass(ruaCombiner01.class);
        job.setMapperClass(ruaMapper01.class);
        job.setReducerClass(ruaReduce01.class);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("data\\d01"));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("output\\d01"));
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
[/collapse]
[collapse title="ruaMapper01"]
package com.kami.demo01;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;

/**
 * First-pass mapper: emits (word_filename, 1) for every word.
 *
 * @version v 1.0
 * @Author Toenc
 * @Date 2020/6/15
 */
public class ruaMapper01 extends Mapper<LongWritable, Text, Text, IntWritable> {
    String filename;
    Text k = new Text();
    IntWritable v = new IntWritable();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Resolve the name of the file backing this split once, in setup,
        // instead of on every map() call.
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        filename = fileSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] sps = value.toString().split("\t");
        for (String sp : sps) {
            // Put the word first (word_filename) so the second job can split
            // on "_" and regroup by word, which is what makes the index
            // inverted; this assumes the words themselves contain no "_".
            k.set(sp + "_" + filename);
            v.set(1);
            context.write(k, v);
        }
    }
}
[/collapse]
[collapse title="ruaCombiner01"]
package com.kami.demo01;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

/**
 * Map-side combiner: pre-sums the 1s emitted by the mapper to cut down
 * shuffle traffic. Same logic as ruaReduce01.
 *
 * @version v 1.0
 * @Author Toenc
 * @Date 2020/6/15
 */
public class ruaCombiner01 extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        IntWritable v = new IntWritable();
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        v.set(sum);
        context.write(key, v);
    }
}
[/collapse]
[collapse title="ruaReduce01"]
package com.kami.demo01;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

/**
 * First-pass reducer: sums the counts for each word_filename key.
 *
 * @version v 1.0
 * @Author Toenc
 * @Date 2020/6/15
 */
public class ruaReduce01 extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        IntWritable v = new IntWritable();
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        v.set(sum);
        context.write(key, v);
    }
}
[/collapse]
[/successbox]
[successbox title="第二次处理"]
The second pass turns those per-file counts into the final inverted index, grouping files and counts under each word.
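Continuing the hypothetical example, the second job splits each key on the first "_", regroups by word, and rewrites the tab before each count as "->", producing:

flink	b.txt->1
hadoop	a.txt->2	b.txt->1
spark	a.txt->1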
[collapse title="ruaDriver02"]
package com.kami.demo01;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

/**
 * Second-pass driver: reads the first pass's output and builds the index.
 *
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/15
 */
public class ruaDriver02 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(ruaDriver02.class);
        job.setMapperClass(ruaMapper02.class);
        job.setReducerClass(ruaReduce02.class);
        // Consume the first pass's output file directly.
        FileInputFormat.addInputPath(job, new Path("output\\d01\\part-r-00000"));
        FileOutputFormat.setOutputPath(job, new Path("output\\d02"));
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
[/collapse]
[collapse title="ruaMapper02"]
package com.kami.demo01;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

/**
 * Second-pass mapper: splits each "word_filename&lt;TAB&gt;count" line back into
 * the word (key) and "filename&lt;TAB&gt;count" (value).
 *
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/15
 */
public class ruaMapper02 extends Mapper<LongWritable, Text, Text, Text> {
    Text k = new Text();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // Split only on the first "_" so filenames containing "_" stay intact.
        String[] sps = line.split("_", 2);
        k.set(sps[0]); // the word
        v.set(sps[1]); // "filename<TAB>count"
        context.write(k, v);
    }
}
[/collapse]
[collapse title="ruaReduce02"]
package com.kami.demo01;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

/**
 * Second-pass reducer: concatenates all "filename&lt;TAB&gt;count" values of a
 * word into one line, rewriting each tab as "->".
 *
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/15
 */
public class ruaReduce02 extends Reducer<Text, Text, Text, Text> {
    Text v = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text value : values) {
            sb.append(value.toString().replace("\t", "->")).append("\t");
        }
        v.set(sb.toString());
        context.write(key, v);
    }
}
[/collapse]
[/successbox]
MapReduce Multi-Job Chaining
A moderately complex processing flow often needs several MapReduce programs chained together. Such multi-job chaining can be implemented with the MapReduce framework's JobControl: each job is wrapped in a ControlledJob, dependencies are declared between them, and JobControl runs the jobs in dependency order.
package com.kami.demo01;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

/**
 * Runs both passes as one dependency-ordered workflow via JobControl.
 *
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/15
 */
public class ruaDriver {
    public static void main(String[] args) throws IOException, InterruptedException {
        Configuration configuration1 = new Configuration();
        Configuration configuration2 = new Configuration();
        Job job1 = Job.getInstance(configuration1);
        Job job2 = Job.getInstance(configuration2);

        // First pass: word counts per file, keyed as word_filename.
        job1.setCombinerClass(ruaCombiner01.class);
        job1.setMapperClass(ruaMapper01.class);
        job1.setReducerClass(ruaReduce01.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(IntWritable.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);
        Path inputPath = new Path("data\\d01");
        Path outputPath = new Path("output\\d03");
        FileInputFormat.setInputPaths(job1, inputPath);
        FileOutputFormat.setOutputPath(job1, outputPath);

        // Second pass: reads the first pass's output and builds the index.
        job2.setMapperClass(ruaMapper02.class);
        job2.setReducerClass(ruaReduce02.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        Path inputPath2 = new Path("output\\d03");
        Path outputPath2 = new Path("output\\d04");
        FileInputFormat.setInputPaths(job2, inputPath2);
        FileOutputFormat.setOutputPath(job2, outputPath2);

        JobControl control = new JobControl("wula");
        ControlledJob aJob = new ControlledJob(job1.getConfiguration());
        ControlledJob bJob = new ControlledJob(job2.getConfiguration());
        // Declare the dependency: bJob (pass 2) may start only after aJob (pass 1) succeeds.
        bJob.addDependingJob(aJob);
        control.addJob(aJob);
        control.addJob(bJob);

        // JobControl implements Runnable; run it on its own thread and poll for completion.
        Thread thread = new Thread(control);
        thread.start();
        while (!control.allFinished()) {
            Thread.sleep(1000);
        }
        control.stop(); // stop the JobControl scheduling thread before exiting
        System.exit(0);
    }
}
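Note that allFinished() returns true whether the jobs succeeded or failed, so a real driver would usually inspect the result before exiting. A minimal sketch (a hypothetical addition, placed after the polling loop in the driver above) using JobControl's getFailedJobList():

// Hypothetical addition to the driver above, after the polling loop.
if (!control.getFailedJobList().isEmpty()) {
    for (ControlledJob failed : control.getFailedJobList()) {
        // getMessage() carries the failure reason recorded by JobControl.
        System.err.println("Job failed: " + failed.getJobName() + " - " + failed.getMessage());
    }
    control.stop();
    System.exit(1);
}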