MapReduce 是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。
MapReduce大体上分三个部分:
- MRAppMaster:MapReduce Application Master,分配任务,协调任务的运行
- MapTask:阶段并发任,负责 mapper 阶段的任务处理 YARNChild
- ReduceTask:阶段汇总任务,负责 reducer 阶段的任务处理 YARNChild
成都创新互联长期为成百上千客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为太原企业提供专业的成都网站设计、网站制作,太原网站改版等技术服务。拥有十载丰富建站经验和众多成功案例,为您定制开发。
public class MyWordCount {
public static void main(String[] args) {
// 指定 hdfs 相关的参数
Configuration conf=new Configuration(true);
conf.set("fs.defaultFS","hdfs://hadoop01:9000");
System.setProperty("HADOOP_USER_NAME", "hadoop");
try {
// 新建一个 job 任务
Job job=Job.getInstance(conf);
// 设置 jar 包所在路径
job.setJarByClass(MyWordCount.class);
// 指定 mapper 类和 reducer 类
job.setMapperClass(Mapper.class);
job.setReducerClass(MyReduce.class);
// 指定 maptask 的输出类型,注意,如果maptask的输出类型与reducetask输出类型一样,mapTask可以不用设置
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 指定 reducetask 的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 指定该 mapreduce 程序数据的输入和输出路径
Path input=new Path("/data/input");
Path output =new Path("/data/output");
//一定要保证output不存在
if(output.getFileSystem(conf).exists(output)){
output.getFileSystem(conf).delete(output,true); //递归删除
}
FileInputFormat.addInputPath(job,input);
FileOutputFormat.setOutputPath(job,output);
// 最后提交任务
boolean success = job.waitForCompletion(true);
System.exit(success?0:-1);
} catch (Exception e) {
e.printStackTrace();
}
}
private class MyMapper extends Mapper{
Text mk =new Text();
IntWritable mv=new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 计算任务代码:切割单词,输出每个单词计 1 的 key-value 对
String[] words = value.toString().split("\\s+");
for(String word:words){
mk.set(word);
context.write(mk,mv);
}
}
}
private class MyReduce extends Reducer {
IntWritable mv=new IntWritable();
@Override
protected void reduce(Text key, Iterable values, Context context)
throws IOException, InterruptedException {
int sum=0;
// 汇总计算代码:对每个 key 相同的一组 key-value 做汇总统计
for(IntWritable value:values){
sum+=value.get();
}
mv.set(sum);
context.write(key,mv);
}
}
}
一个 job 的 map 阶段并行度由客户端在提交 job 时决定,客户端对 map 阶段并行度的规划的基本逻辑为:将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个 split),然后每一个 split 分配一个 mapTask 并行实例处理。这段逻辑及形成的切片规划描述文件,是由FileInputFormat实现类的getSplits()方法完成的,小编后续会对MPjob提交过程的源码进行详细的分析。
决定map task的个数主要由这几个方面:
-文件的大小
- 文件的个数
- block的大小
- 逻辑切片的大小
总的来说就是,当对文件进行逻辑划分的时候,默认的划分规则就是一个split和一个block的大小一样,如果文件没有到一个block大小,也会被切分出来一个split,这里有调优点,就是如果处理的文件都是小文件的话,那么机会并行很多的maptask,导致大量的时间都浪费在了启动jvm上,此时可以通过合并小文件或者重用jvm的方式提高效率。
逻辑切片机制:
long splitSize = computeSplitSize(blockSize, minSize, maxSize)
blocksize:默认是 128M,可通过 dfs.blocksize 修改
minSize:默认是 1,可通过 mapreduce.input.fileinputformat.split.minsize 修改
maxsize:默认是 Long.MaxValue,可通过mapreduce.input.fileinputformat.split.maxsize 修改
因此,如果是想调小split的大小,那么就将 maxsize调整到比block小。
如果是想调大split的大小,那么就将minSize调整到比block大。
reducetask 的并行度同样影响整个 job 的执行并发度和执行效率,但与 maptask 的并发数由切片数决定不同,Reducetask 数量的决定是可以直接手动设置:job.setNumReduceTasks(4);,默认是1个,如果设置为0个表示没有reduce阶段,当然也可以设置多个,根据需求,如果有些需要全局计数的操作,那么只能设置1个reduce,有些可以设置多个reducetask的,千万不要设置太多,最好设置的和分区的个数能一一对应,不然的会就会有一些reduceTask空跑,导致了不能处理业务而且还占用系统资源。