MapReduce Advanced Case ⑥

Filter logs and write them to custom output paths (custom OutputFormat)

[infobox title="Requirement Analysis"]
Filter the input log by whether each line contains xyg:
(1) Websites containing xyg are written to /output/xyg.log
(2) Websites not containing xyg are written to /output/other.log
[collapse title="Data"]

http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.xyg.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com

[/collapse]
[collapse title="Expected output"]

xyg.log:
http://www.xyg.com

other.log:
http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com

(The job runs with the default single reducer, so the keys pass through the shuffle and other.log comes out in sorted order.)

[/collapse]
[/infobox]
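The implementation has four pieces: ruaDriver wires up the job and registers the custom OutputFormat; ruaMapper passes each input line through as the key; FilterOutputFormat overrides getRecordWriter to return the custom writer; and FilterRecordWriter opens two output streams and routes every record by whether it contains "xyg".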
[collapse title="ruaDriver"]

package com.kami.demo01;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/16
 */
public class ruaDriver {


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(ruaDriver.class);
        job.setMapperClass(ruaMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Plug the custom OutputFormat into the job
        job.setOutputFormatClass(FilterOutputFormat.class);

//        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileInputFormat.setInputPaths(job,"data\\d01");

        // Although we use a custom OutputFormat, it extends FileOutputFormat,
        // which still writes a _SUCCESS marker file, so an output directory must be set here as well
//        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path("output\\d01"));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

[/collapse]
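Note that the driver hardcodes local paths (data\d01 in, output\d01 out) so it can be run straight from the IDE; the commented-out args[0]/args[1] lines are the cluster-submission variant. After a successful run the output directory contains xyg.log, other.log, and the _SUCCESS marker written by FileOutputFormat.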
[collapse title="ruaMapper"]

package com.kami.demo01;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/16
 */
public class ruaMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Pass each input line through as the key; "\r\n" is appended here
        // because the custom RecordWriter writes raw bytes with no separator
        context.write(new Text(value.toString() + "\r\n"), NullWritable.get());
    }
}

[/collapse]
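The "\r\n" appended in map() is a workaround: FilterRecordWriter writes raw bytes and adds no line separator of its own. A cleaner variant (a sketch of an alternative, not the original code) keeps the Mapper untouched and appends the separator inside the writer:

    // Hypothetical replacement for FilterRecordWriter.write(): append the
    // line separator here so the Mapper does not have to
    @Override
    public void write(Text text, NullWritable nullWritable) throws IOException {
        byte[] line = (text.toString() + "\n").getBytes();
        if (text.toString().contains("xyg")) {
            xygOut.write(line);
        } else {
            otherOut.write(line);
        }
    }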
[collapse title="FilterOutputFormat"]

package com.kami.demo01;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/16
 */
public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        // Hand each task an instance of the custom RecordWriter
        return new FilterRecordWriter(job);
    }
}

[/collapse]
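Extending FileOutputFormat rather than the bare OutputFormat means the job inherits the standard output committer and the up-front check that the output directory does not already exist; the only method that needs overriding is getRecordWriter.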
[collapse title="FilterRecordWriter"]

package com.kami.demo01;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

/**
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/16
 */
public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {
    FSDataOutputStream xygOut = null;
    FSDataOutputStream otherOut = null;

    public FilterRecordWriter(TaskAttemptContext job) {
        // Get the file system
        FileSystem fs;
        try {
            fs = FileSystem.get(job.getConfiguration());

            // Create the output file paths
            Path xygPath = new Path("output/d01/xyg.log");
            Path otherPath = new Path("output/d01/other.log");

            // Open the output streams
            xygOut = fs.create(xygPath);
            otherOut = fs.create(otherPath);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
        // Route each record to a different file depending on whether it contains "xyg"
        if (text.toString().contains("xyg")) {
            xygOut.write(text.toString().getBytes());
        } else {
            otherOut.write(text.toString().getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // Close the streams
        if (xygOut != null) {
            xygOut.close();
        }
        if (otherOut != null) {
            otherOut.close();
        }
    }
}

[/collapse]
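To sanity-check a local run, a small standalone reader (a sketch; the file paths are the ones hardcoded in FilterRecordWriter above) can dump both output files:

package com.kami.demo01;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * Verification sketch: prints both output files after a local run.
 */
public class CheckOutput {
    public static void main(String[] args) throws IOException {
        for (String name : new String[]{"output/d01/xyg.log", "output/d01/other.log"}) {
            System.out.println("==== " + name + " ====");
            Files.readAllLines(Paths.get(name)).forEach(System.out::println);
        }
    }
}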
Recommended reading: https://www.blog.kamisamak.com/index.php/2019/11/19/hadoop-mapreduce自定义outputformat输出

Case source: https://www.cnblogs.com/frankdeng/p/9256215.html

