WordCount案例
数据
hello world
dog fish
hadoop
spark
hello world
dog fish
hadoop
spark
hello world
dog fish
hadoop
spark
统计一堆文件中单词出现的个数(WordCount案例)
在一堆给定的文本文件中统计输出每一个单词出现的总次数
[collapse title="ruaDriver"]
package com.kami.demo04;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * WordCount driver: counts how many times each whitespace-separated word
 * occurs across the input files and writes one {@code word<TAB>count} record per word.
 *
 * <p>Usage: {@code ruaDriver <input path> <output path>}
 *
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/17
 */
public class ruaDriver {

    /**
     * Configures and runs the job, then exits with 0 on success and 1 on failure.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException   if the wait for job completion is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: ruaDriver <input path> <output path>");
            System.exit(2);
        }

        Configuration configuration = new Configuration();
        // To submit to YARN instead of running locally (settings differ between Windows and Linux):
        //configuration.set("mapreduce.framework.name", "yarn");
        //configuration.set("yarn.resourcemanager.hostname", "node22");
        Job job = Job.getInstance(configuration);

        // Locate the job jar from this class; alternatively give an explicit local jar path:
        //job.setJar("/home/admin/wc.jar");
        job.setJarByClass(ruaDriver.class);

        job.setMapperClass(ruaMapper.class);
        job.setCombinerClass(ruaCombiner.class);
        job.setReducerClass(ruaReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // waitForCompletion submits the job and blocks until it finishes;
        // job.submit() would submit without waiting.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

    /** Adds up the partial counts carried by {@code values}. */
    private static int sum(Iterable<IntWritable> values) {
        int total = 0;
        for (IntWritable value : values) {
            total += value.get();
        }
        return total;
    }

    /** Emits {@code (word, 1)} for every non-empty whitespace-separated token of a line. */
    static class ruaMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // Reused across calls; context.write serializes the current contents immediately.
        private final Text word = new Text();
        private final IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            for (String token : value.toString().split("\\s+")) {
                // Blank lines and leading whitespace produce empty tokens; they are not words.
                if (token.isEmpty()) {
                    continue;
                }
                word.set(token);
                context.write(word, one);
            }
        }
    }

    /**
     * Map-side pre-aggregation. It must SUM the incoming values rather than count
     * them: the framework may run the combiner zero, one or several times, so its
     * input can already contain partial counts greater than 1.
     */
    static class ruaCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, new IntWritable(sum(values)));
        }
    }

    /** Sums all partial counts of a word and writes the final total. */
    static class ruaReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, new IntWritable(sum(values)));
        }
    }
}
[/collapse]
把单词按照首字符ASCII码的奇偶分区(Partitioner)
[collapse title="ruaDriver"]
package com.kami.demo05;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * WordCount driver with a custom partitioner: counts word occurrences and routes
 * each word to one of two reducers by the parity of its first character's code.
 *
 * <p>Usage: {@code ruaDriver <input path> <output path>}
 *
 * @version v 1.0
 * @Author kamisamak
 * @Date 2020/6/17
 */
public class ruaDriver {

    /**
     * Configures and runs the job, then exits with 0 on success and 1 on failure.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException   if the wait for job completion is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: ruaDriver <input path> <output path>");
            System.exit(2);
        }

        Configuration configuration = new Configuration();
        // To submit to YARN instead of running locally (settings differ between Windows and Linux):
        //configuration.set("mapreduce.framework.name", "yarn");
        //configuration.set("yarn.resourcemanager.hostname", "node22");
        Job job = Job.getInstance(configuration);

        // Locate the job jar from this class; alternatively give an explicit local jar path:
        //job.setJar("/home/admin/wc.jar");
        job.setJarByClass(ruaDriver.class);

        // Two reduce tasks: one per partition produced by WordCountPartitioner.
        job.setPartitionerClass(WordCountPartitioner.class);
        job.setNumReduceTasks(2);

        job.setMapperClass(ruaMapper.class);
        job.setCombinerClass(ruaCombiner.class);
        job.setReducerClass(ruaReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // waitForCompletion submits the job and blocks until it finishes;
        // job.submit() would submit without waiting.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

    /** Adds up the partial counts carried by {@code values}. */
    private static int sum(Iterable<IntWritable> values) {
        int total = 0;
        for (IntWritable value : values) {
            total += value.get();
        }
        return total;
    }

    /** Emits {@code (word, 1)} for every non-empty whitespace-separated token of a line. */
    static class ruaMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // Reused across calls; context.write serializes the current contents immediately.
        private final Text word = new Text();
        private final IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            for (String token : value.toString().split("\\s+")) {
                // Blank lines and leading whitespace produce empty tokens; they are not words.
                if (token.isEmpty()) {
                    continue;
                }
                word.set(token);
                context.write(word, one);
            }
        }
    }

    /**
     * Map-side pre-aggregation. It must SUM the incoming values rather than count
     * them: the framework may run the combiner zero, one or several times, so its
     * input can already contain partial counts greater than 1.
     */
    static class ruaCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, new IntWritable(sum(values)));
        }
    }

    /**
     * Sums all partial counts of a word and writes the final total. Counting the
     * number of values instead would be wrong here, because the combiner has
     * already collapsed many {@code 1}s into larger partial counts.
     */
    static class ruaReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, new IntWritable(sum(values)));
        }
    }

    /** Routes a word by the parity of its first character's code: even to 0, odd to 1. */
    static class WordCountPartitioner extends Partitioner<Text, IntWritable> {
        /**
         * @param key           the word
         * @param value         the count attached to the word (unused)
         * @param numPartitions number of reduce tasks configured for the job
         * @return a partition index in {@code [0, numPartitions)}
         */
        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
            String word = key.toString();
            // An empty key has no first character; send it to partition 0 instead of throwing.
            int firstChar = word.isEmpty() ? 0 : word.charAt(0);
            // The trailing modulo keeps the result valid when fewer than two reducers are configured.
            return (firstChar % 2) % numPartitions;
        }
    }
}
[/collapse]
大量小文件的切片优化(CombineTextInputFormat)
将输入的大量小文件在逻辑上合并成一个或少数几个切片统一处理(切片数量由下面设置的最大切片大小决定),避免每个小文件单独占用一个MapTask。
// If no InputFormat is set, the default is TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4 MiB
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2 MiB
案例来源:https://www.cnblogs.com/frankdeng/p/9311481.html
文章评论