When developing MapReduce programs, we often run into cases where the input data does not live in HDFS, or where the output destination is not HDFS. The MapReduce framework was designed with this in mind and exposes two components for exactly this purpose: all we need to do is implement a suitable custom InputFormat or OutputFormat.
Requirement: depending on the data, write two kinds of results to two different directories.
① Access external resources from within MapReduce.
② Implement a custom OutputFormat, provide its RecordWriter, and override the write() method that actually writes the output data.
[warningbox title="OutputFormat"]
In MapReduce, the output of the Reducer stage is governed by the OutputFormat: it determines where the data ends up and which committer is used when the job's output is committed. Looking at the source of org.apache.hadoop.mapreduce.OutputFormat below, we can see that besides subclassing the OutputFormat abstract class we also have to provide a RecordWriter and an OutputCommitter. Since the OutputCommitter does not deal with the concrete output destination, it usually does not need to be rewritten and the built-in FileOutputCommitter can be used directly; the RecordWriter is what actually defines how the data is written to the destination.
public abstract class OutputFormat<K, V> {
    // Returns the RecordWriter that actually writes the output data
    public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException;
    // Checks whether the job's output specification is valid
    public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException;
    // Returns the OutputCommitter responsible for committing the job's output
    public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException;
}
[/warningbox]
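If you extend OutputFormat directly (rather than FileOutputFormat, which already does this for you), getOutputCommitter() can usually just hand back a FileOutputCommitter. The following is only a minimal sketch, not part of the original example, and assumes the output directory was configured with FileOutputFormat.setOutputPath():
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
    // Reuse the built-in FileOutputCommitter instead of writing a custom OutputCommitter.
    // Assumes the output directory was set on the Job via FileOutputFormat.setOutputPath().
    Path outputPath = FileOutputFormat.getOutputPath(context);
    return new FileOutputCommitter(outputPath, context);
}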
[warningbox title="RecordWriter"]
Looking at the RecordWriter source, there are two methods we need to implement:
public abstract class RecordWriter<K, V> {
    // Writes one key/value pair to the output destination
    public abstract void write(K key, V value) throws IOException, InterruptedException;
    // Releases any resources (streams, connections) held by the writer
    public abstract void close(TaskAttemptContext context) throws IOException, InterruptedException;
}
[/warningbox]
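Before looking at the two-destination version used in this example, here is a minimal sketch (not from the original article, with made-up class and field names) of a RecordWriter that simply writes each key followed by a newline to a single FSDataOutputStream:
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;

// Illustration of the RecordWriter contract only: write() emits one record, close() releases the stream.
public class SingleFileRecordWriter extends RecordWriter<Text, NullWritable> {
    private final FSDataOutputStream out;

    public SingleFileRecordWriter(FSDataOutputStream out) {
        this.out = out;
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException {
        out.write(key.toString().getBytes("UTF-8"));
        out.write("\n".getBytes("UTF-8"));
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}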
[warningbox title="①自定义outputformat"]
package com.kami.demo02;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @version v 1.0
* @Author kami
* @Date 2019/11/20
*/
public class ruaOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // Get the FileSystem from the job configuration and open one stream per output file
        Configuration conf = taskAttemptContext.getConfiguration();
        FileSystem fileSystem = FileSystem.get(conf);
        FSDataOutputStream fsDataOutputStream1 = fileSystem.create(new Path("C:\\Users\\kami\\Desktop\\a.txt"));
        FSDataOutputStream fsDataOutputStream2 = fileSystem.create(new Path("C:\\Users\\kami\\Desktop\\b.txt"));
        // Hand both streams to the custom RecordWriter, which decides which file each record goes to
        return new ruaRecordWriter(fsDataOutputStream1, fsDataOutputStream2);
    }
}
[/warningbox]
[warningbox title="②自定义RecordWriter"]
package com.kami.demo02;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
/**
 * @version v 1.0
 * @Author kami
 * @Date 2019/11/19
 */
public class ruaRecordWriter extends RecordWriter<Text, NullWritable> {
    private FSDataOutputStream fsDataOutputStream1;
    private FSDataOutputStream fsDataOutputStream2;

    public ruaRecordWriter(FSDataOutputStream fsDataOutputStream1, FSDataOutputStream fsDataOutputStream2) {
        // The two streams opened in ruaOutputFormat.getRecordWriter(), one per output file
        this.fsDataOutputStream1 = fsDataOutputStream1;
        this.fsDataOutputStream2 = fsDataOutputStream2;
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        // Split the records between the two output files.
        // The condition below is only an example -- replace it with whatever rule
        // actually distinguishes the two kinds of data in your input.
        if (key.toString().split("\t").length > 1) {
            fsDataOutputStream1.write(key.toString().getBytes("UTF-8"));
            fsDataOutputStream1.write("\r\n".getBytes("UTF-8"));
        } else {
            fsDataOutputStream2.write(key.toString().getBytes("UTF-8"));
            fsDataOutputStream2.write("\r\n".getBytes("UTF-8"));
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // Release both streams when the task finishes
        fsDataOutputStream1.close();
        fsDataOutputStream2.close();
    }
}
[/warningbox]
[collapse title="map"]
package com.kami.demo01;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @version v 1.0
* @Author kami
* @Date 2019/11/19
*/
public class ruaMap extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Pass each input line through as the key; the custom OutputFormat decides where it is written
        context.write(value, NullWritable.get());
    }
}
[/collapse]
[collapse title="driver"]
package com.kami.demo01;
import com.kami.demo02.ruaOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * @version v 1.0
 * @Author kami
 * @Date 2019/11/19
 */
public class ruaDriver extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), "rua");

        // Input: plain text files read with the default TextInputFormat
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("D:\\vedio\\2019\\11月\\11.14\\22\\5\\自定义outputformat\\input"));

        job.setMapperClass(ruaMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Output: the custom OutputFormat; the path below is only used by FileOutputFormat's
        // committer (e.g. the _SUCCESS marker), the real data goes to the files opened in getRecordWriter()
        job.setOutputFormatClass(ruaOutputFormat.class);
        ruaOutputFormat.setOutputPath(job, new Path("C:\\Users\\kami\\Desktop\\aaa"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new ruaDriver(), args);
        System.exit(run);
    }
}
[/collapse]