目录
一、mapreduce概述
1. 1 mapreduce 介绍
mapreduce思想在生活中处处可见。mapreduce 的思想核心是“分而治之”,适用于大规模数据处理场景。
- map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
- reduce负责“合”,即对map阶段的结果进行全局汇总。
- mapreduce运行在yarn集群。
这两个阶段合起来正是mapreduce思想的体现。
1.2 mapreduce 定义
mapreduce是面向大数据并行处理的计算模型、框架和平台,它隐含了以下三层含义:
(1)mapreduce是一个基于集群的高性能并行计算平台(cluster infrastructure)。它允许用市场上普通的商用服务器构成一个包含数十、数百至数千个节点的分布和并行计算集群。
(2)mapreduce是一个并行计算与运行软件框架(software framework)。它提供了一个庞大但设计精良的并行计算软件框架,能自动完成计算任务的并行化处理,自动划分计算数据和计算任务,在集群节点上自动分配和执行任务以及收集计算结果,将数据分布存储、数据通信、容错处理等并行计算涉及到的很多系统底层的复杂细节交由系统负责处理,大大减少了软件开发人员的负担。
(3)mapreduce是一个并行程序设计模型与方法(programming model & methodology)。它借助于函数式程序设计语言lisp的设计思想,提供了一种简便的并行程序设计方法,用map和reduce两个函数编程实现基本的并行计算任务,提供了抽象的操作和并行编程接口,以简单方便地完成大规模数据的编程和计算处理 [百度百科] 。
1.3 mapreduce优缺点
1.2.1.优点
(1)mapreduce易于编程
它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的pc机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得mapreduce编程变得非常流行。
(2)良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
(3)高容错性
mapreduce设计的初衷就是使程序能够部署在廉价的pc机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由hadoop内部完成的。
(4)适合pb级以上海量数据的离线处理
可以实现上千台服务器集群并发工作,提供数据处理能力。
1.2.2.缺点
(1)不擅长实时计算
mapreduce无法像mysql一样,在毫秒或者秒级内返回结果,更多的适合离线或者t+1的任务。
(2)不擅长流式计算
流式计算的输入数据是动态的, 如flink或者spark streaming,而mapreduce的输入数据集是静态的,不能动态变化。这是因为mapreduce自身的设计特点决定了数据源必须是静态的。
(3)不擅长dag(有向无环图)计算
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,mapreduce并不是不能做,而是使用后,每个mapreduce作业的输出结果都会写入到磁盘,会造成大量的磁盘io,导致性能非常的低下。
1.4 mapreduce框架结构
一个完整的mapreduce程序在分布式运行时有三类实例进程:
- mr appmaster:负责管理mr作业的生命周期及状态协调, 一般指的是yarn中appmaster,针对mapreduce计算框架就是mr appmaster,它使得mapreduce计算框架可以运行与yarn之上;
- maptask:负责map阶段的整个数据处理流程;
- reducetask:负责reduce阶段的整个数据处理流程。
二、wordcount 案例
数据格式准备如下:
vim wordcount.txt
hello i am ok
hadoop hadoop
hello world
hello flume
hadoop hive
hive kafka
flume storm
hive oozie
hadoop hbase
hadoop flink
hive azkaban
将数据上传到hdfs
hdfs dfs -mkdir -p /kangll/workcount
hdfs dfs -put wordcount.txt /kangll/workcount
代码示例
package com.kangna.mapreducer;
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.io.intwritable;
import org.apache.hadoop.io.longwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.job;
import org.apache.hadoop.mapreduce.mapper;
import org.apache.hadoop.mapreduce.reducer;
import org.apache.hadoop.mapreduce.lib.input.fileinputformat;
import org.apache.hadoop.mapreduce.lib.output.fileoutputformat;
import java.io.ioexception;
/********************************
* @author: kangna
* @date: 2020/1/25 11:14
* @version: 1.0
* @desc:
********************************/
public class wordcountmain {
public static class wordcountmapper extends mapper<longwritable, text, text, intwritable> {
private text word = new text();
private intwritable one = new intwritable(1);
@override
protected void map(longwritable key, text value, context context) throws ioexception, interruptedexception {
// 取到一行个数据
string line = value.tostring();
// 按照空格切分
string[] words = line.split(" ");
// 遍历数据
for (string word : words) {
this.word.set(word);
context.write(this.word, this.one);
}
}
}
public static class wordcountreducer extends reducer<text, intwritable, text, intwritable > {
private intwritable total = new intwritable();
@override
protected void reduce(text key, iterable<intwritable> values, context context) throws ioexception, interruptedexception {
// 作累加
int sum = 0;
for (intwritable value : values) {
sum += value.get();
}
// 包装 结构并输出
total.set(sum);
context.write(key, total);
}
}
public static void main(string[] args) throws ioexception, classnotfoundexception, interruptedexception {
// 1. 获取一个 job 实例
job job = job.getinstance(new configuration());
// 2. 设置 类的路径
job.setjarbyclass(wordcountmain.class);
// 3. 设置 mapper 和 reducer
job.setmapperclass(wordcountmapper.class);
job.setreducerclass(wordcountreducer.class);
// 4. 设置 mapper 和 reducer 的输出类型
job.setmapoutputkeyclass(text.class);
job.setmapoutputvalueclass(intwritable.class);
job.setoutputkeyclass(text.class);
job.setoutputvalueclass(intwritable.class);
// 5. 设置输入输出数据
fileinputformat.setinputpaths(job, new path(args[0]));
fileoutputformat.setoutputpath(job, new path(args[1]));
// 6. 提交job
boolean b = job.waitforcompletion(true);
system.exit(b ? 0 : 1);
}
}
打包在集群中运行。
三、mapreduce的运行机制详解
3.1 maptask 工作机制
map阶段流程大体如上图
简单概述:inputfile 通过 split 被逻辑切分为多个split文件,通过record按行读取内容给 map(用户自己实现的)进行处理,数据被 map 处理结束之后交给 outputcollector 收集器,对其结果key进行分区(默认使用hash分区),然后写入buffer,每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。
详细步骤:
- split阶段:读取数据组件 inputformat (默认 textinputformat) 会通过 getsplits 方法对输入目录中文件进行逻辑切片规划得到 block, 有多少个 block就对应启动多少个 maptask
- read阶段:将输入文件切分为 block 之后, 由 recordreader 对象 (默认是linerecordreader) 进行读取, 以 \n 作为分隔符, 读取一行数据, 返回 <key,value>. key 表示每行首字符偏移值, value 表示这一行文本内容
- map阶段:读取 block 返回 <k ey,value>, 进入用户自己继承的 mapper 类中,执行用户重写的 map 函数, recordreader 读取一行这里调用一次
- collection收集阶段: mapper 逻辑结束之后, 将 mapper 的每条结果通过 context.write 进行collect数据收集. 在 collect 中, 会先对其进行分区处理,默认使用 hashpartitioner
5. 接下来, 会将数据写入内存, 内存中这片区域叫做环形缓冲区, 缓冲区的作用是批量收集 mapper 结果, 减少磁盘 io 的影响. 我们的 <key,value> 对以及 partition 的结果都会被写入缓冲区. 当然, 写入之前,key 与 value 值都会被序列化成字节数组
6. spill阶段:即“溢写”,当环形缓冲区满后,mapreduce会将数据写到本地磁盘上,生成一个临时文件。当溢写线程启动后, 将数据写入本地磁盘之前,需要对这 80mb 空间内的 key 做排序 (sort). 排序是 mapreduce 模型默认的行为, 这里的排序也是对序列化的字节做的排序。
如果 job 设置过 combiner, 会将有相同 key 的 <key, value> 对的 value 合并在起来, 减少溢写到磁盘的数据量。 combiner 会优化 mapreduce 的中间结果, combiner 的输出是 reducer 的输入, combiner 绝不能改变最终的计算结果。 combiner 只应该用于那种 reduce 的输入 <key, value> 与输出 <key, value> 类型完全一致, 且不影响最终结果的场景. 比如累加, 最大值等。
7. merge阶段 : 每次溢写会在磁盘上生成一个临时文件 (写之前判断是否有 combiner),如果 mapper 的输出结果真的很大, 有多次这样的溢写发生, 磁盘上相应的就会有多个临时文件存在. 当整个数据处理结束之后开始对磁盘中的临时文件进行 merge 合并, 因为最终的文件只有一个, 写入磁盘, 并且为这个文件提供了一个索引文件, 以记录每个reduce对应数据的偏移量
【maptask的一些基础设置配置】
配置 | 默认值 | 解释 |
mapreduce.task.io.sort.mb | 100 | 设置环型缓冲区的内存值大小 |
mapreduce.map.sort.spill.percent | 0.8 | 设置溢写的比例 |
mapreduce.cluster.local.dir | ${hadoop.tmp.dir}/mapred/local | 溢写数据目录 |
mapreduce.task.io.sort.factor | 10 | 设置一次合并多少个溢写文件 |
3.2 reducetask 工作机制
简单概述:reduce 大致分为 copy、sort、reduce 三个阶段,重点在前两个阶段。copy 阶段包含一个 eventfetcher 来获取已完成的 map 列表,由 fetcher 线程去 copy 数据,在此过程中会启动两个 merge 线程,分别为 inmemorymerger 和 ondiskmerger,分别将内存中的数据 merge 到磁盘和将磁盘中的数据进行 merge。待数据 copy 完成之后,copy 阶段就完成了,开始进行 sort 阶段,sort 阶段主要是执行 finalmerge 操作,纯粹的 sort 阶段,完成之后就是 reduce 阶段,调用用户定义的 reduce 函数进行处理。
详细步骤
- copy阶段:拉取数据。reduce进程启动一些数据copy线程(fetcher),通过http方式请求maptask获取属于自己的文件。
- merge阶段:在远程拷贝数据的同时,reducetask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。
- sort阶段:把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。
- reduce阶段:键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到hdfs文件中。
3.3 shuffle 过程
shuffle 是 mapreduce 的核心,它分布在 mapreduce 的 map 阶段和 reduce 阶段。一般把从 map 产生输出开始到 reduce 取得数据作为输入之前的过程称作 shuffle。
- collect阶段:将 maptask 的结果输出到默认大小为 100m 的环形缓冲区,保存的是 key/value,partition 分区信息等。
- spill阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了 combiner,还会将有相同分区号和 key 的数据进行排序。
- merge阶段:把所有溢出的临时文件进行一次合并操作,以确保一个 maptask 最终只产生一个中间数据文件。
- copy阶段:reducetask 启动 fetcher 线程到已经完成 maptask 的节点上复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。
- merge阶段:在 reducetask 远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。
- sort阶段:在对数据进行合并的同时,会进行排序操作,由于 maptask 阶段已经对数据进行了局部的排序,reducetask 只需保证 copy 的数据的最终整体有效性即可。
shuffle 中的缓冲区大小会影响到 mapreduce 程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快
缓冲区的大小可以通过参数调整, 参数:mapreduce.task.io.sort.mb 默认100m
参考文档:
c大数据计算引擎mapreduce框架详解 | 大数据技术分享
mapreduce的shuffle过程详解(分片、分区、合并、归并。。。)_mapreduce的shuffle流程_asn_forever的博客-csdn博客
发表评论