When developing MapReduce programs we often run into cases where the input data is not on HDFS, or where the output destination is not HDFS. The MapReduce framework was designed with this in mind and provides two components for it: by implementing a suitable custom InputFormat and OutputFormat we can satisfy such requirements.
Large numbers of small files hurt the efficiency of both HDFS and MapReduce, yet in practice we often have to deal with them, so a dedicated solution is needed: merge the small files into a single SequenceFile. The SequenceFile then stores each original file as one record, with the file name as the key and the file content as the value.
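To make that key/value layout concrete, the following is a minimal sketch (independent of the MapReduce job developed below) that merges a local directory of small files into one SequenceFile using the SequenceFile.Writer API; the class name LocalMergeSketch, the input directory /tmp/smallfiles and the output path /tmp/merged.seq are hypothetical examples.
[collapse title="SequenceFile.Writer (sketch)"]
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import java.io.File;
import java.nio.file.Files;
public class LocalMergeSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // hypothetical output location for the merged SequenceFile
        Path out = new Path("file:///tmp/merged.seq");
        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(out),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(BytesWritable.class))) {
            // hypothetical local directory holding the small files
            File[] inputs = new File("/tmp/smallfiles").listFiles();
            if (inputs == null) {
                return;
            }
            for (File f : inputs) {
                byte[] bytes = Files.readAllBytes(f.toPath());
                // file name as key, file content as value, as described above
                writer.append(new Text(f.getName()), new BytesWritable(bytes));
            }
        }
    }
}
[/collapse]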
Small-file optimization essentially comes down to the following approaches:
1. Merge small files or small batches of data into large files at data-collection time, before uploading them to HDFS.
2. Before the business processing starts, run a MapReduce job on HDFS to merge the small files.
3. During MapReduce processing, use CombineTextInputFormat to improve efficiency (see the sketch after this list).
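For approach 3 only the job configuration needs to change. The following is a minimal sketch assuming the standard org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; the class name CombineSketch, the job name and the 4 MB split cap are hypothetical example values.
[collapse title="CombineTextInputFormat (sketch)"]
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
public class CombineSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "combine-small-files");
        // pack many small files into a few splits instead of one split per file
        job.setInputFormatClass(CombineTextInputFormat.class);
        // example threshold: cap each split at roughly 4 MB of input
        CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024L);
        // hypothetical input directory of small files
        CombineTextInputFormat.addInputPath(job, new Path(args[0]));
        // mapper, output types and output path would be configured as in a normal job
    }
}
[/collapse]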
In MapReduce, the data input of the Map phase is determined by the InputFormat. Looking at the source of org.apache.hadoop.mapreduce.InputFormat (shown below), we can see that besides extending the InputFormat abstract class we also need a custom InputSplit and a custom RecordReader. Their roles are: the InputSplit describes the size and location of each data split, while the RecordReader performs the actual reading of the data.
[warningbox title="InputFormat"]
public abstract class InputFormat<K, V> {
    public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException; // returns the list of input splits for the Map phase
    public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException; // creates the object that actually reads the data
}
[/warningbox]
[warningbox title="自定义InputSplit"]
The main methods a custom InputSplit needs to implement are the following:
public abstract class InputSplit {
    public abstract long getLength() throws IOException, InterruptedException; // returns the length of the current split
    public abstract String[] getLocations() throws IOException, InterruptedException; // returns the location information of the current split
}
[/warningbox]
[warningbox title="自定义RecordReader"]
The main methods a custom RecordReader needs to implement are the following:
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
    public abstract void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException; // initialization; may be left empty if the work is done in the constructor
    public abstract boolean nextKeyValue() throws IOException, InterruptedException; // returns true if there is another key/value pair, false otherwise
    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException; // returns the current key
    public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException; // returns the current value
    public abstract float getProgress() throws IOException, InterruptedException; // reports the read progress
    public abstract void close() throws IOException; // releases resources
}
[/warningbox]
[warningbox title="自定义InputFormat"]
package com.kami.demo01;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
/**
* @version v 1.0
* @Author kami
* @Date 2019/11/18
*/
public class ruaFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    // report files as non-splittable so that each file is read as one complete record
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        ruaRecordReader recordReader = new ruaRecordReader();
        recordReader.initialize(inputSplit, taskAttemptContext);
        return recordReader;
    }
}
[/warningbox]
[warningbox title="自定义RecordReader"]
package com.kami.demo01;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
/**
* @version v 1.0
* @Author kami
* @Date 2019/11/18
*/
public class ruaRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration configuration;
    private BytesWritable bytesWritable = new BytesWritable();
    private boolean processed = false;

    /**
     * Initialization
     *
     * @param inputSplit         the split describing the file to read
     * @param taskAttemptContext the task context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) inputSplit;
        configuration = taskAttemptContext.getConfiguration();
    }

    // read the next record; here one record is the entire file
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // get the file path
            Path path = fileSplit.getPath();
            // access the file through FileSystem
            FileSystem fileSystem = FileSystem.get(configuration);
            // allocate a byte array as large as the file
            byte[] bytes = new byte[(int) fileSplit.getLength()];
            FSDataInputStream fis = null;
            try {
                // open the file
                fis = fileSystem.open(path);
                // read the whole input stream into the byte array
                IOUtils.readFully(fis, bytes, 0, bytes.length);
                // wrap the byte array in a BytesWritable
                bytesWritable.set(bytes, 0, bytes.length);
            } finally {
                IOUtils.closeStream(fis);
            }
            processed = true;
            return processed;
        }
        return false;
    }

    // return the current key
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    // return the current value
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return bytesWritable;
    }

    // report the read progress
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

    // release resources
    @Override
    public void close() throws IOException {
    }
}
[/warningbox]
[collapse title="Map"]
package com.kami.demo01;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
/**
* @version v 1.0
* @Author kami
* @Date 2019/11/18
*/
public class ruaMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        // use the name of the file behind the current split as the output key
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String name = fileSplit.getPath().getName();
        context.write(new Text(name), value);
    }
}
[/collapse]
[collapse title="Driver"]
package com.kami.demo01;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import java.io.IOException;
/**
* @version v 1.0
* @Author kami
* @Date 2019/11/18
*/
public class ruaDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration(), "rua");
        job.setInputFormatClass(ruaFileInputFormat.class);
        ruaFileInputFormat.addInputPath(job, new Path("D:\\vedio\\2019\\11月\\11.15\\考核内容.txt"));
        job.setMapperClass(ruaMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setOutputPath(job, new Path("C:\\Users\\kami\\Desktop\\aaa"));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
[/collapse]
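Once the job has finished, the merged output can be verified by reading the SequenceFile back. The sketch below assumes the standard SequenceFile.Reader API and reuses the output directory from the driver above; the class name ruaSeqFileCheck is a hypothetical helper, and part-r-00000 is the file name the default single reducer typically produces.
[collapse title="SequenceFile.Reader (sketch)"]
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
public class ruaSeqFileCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // the default single reducer usually writes its output as part-r-00000;
        // adjust the file name if your output differs
        Path path = new Path("C:\\Users\\kami\\Desktop\\aaa\\part-r-00000");
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            while (reader.next(key, value)) {
                // print each original file name and the size of its merged content
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        }
    }
}
[/collapse]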