大数据理论篇 | 分布式计算框架(MapReduce篇)
MapReduce
(一) MapReduce简介
a) 起源
- 2004年10月Google发表了MapReduce论文
- 设计初衷:解决搜索引擎中大规模网页数据的并行处理
- Hadoop MapReduce是Google MapReduce的开源实现
- MapReduce是Apache Hadoop的核心子项目
b) 概念
- 面向批处理的分布式计算框架
- 一种编程模型:MapReduce程序被分为Map(映射)阶段和Reduce(化简)阶段
c) 核心思想
- 分而治之,并行计算
- 移动计算,而非移动数据
d) 特点
- 计算跟着数据走
- 良好的扩展性:计算能力随着节点数增加,近似线性递增
- 高容错
- 状态监控
- 适合海量数据的离线批处理
- 降低了分布式编程的门槛
e) 适用场景
- 数据统计,如:网站的PV(页面访问量)、UV(独立访问用户数)统计
- 搜索引擎构建索引
- 海量数据查询
- 复杂数据分析算法实现
f) 不适用场景
- OLAP
- 要求毫秒或秒级返回结果
- 流计算
- 流计算的输入数据集是动态的,而MapReduce是静态的
- DAG计算
- 多个任务之间存在依赖关系,后一个的输入是前一个的输出,构成有向无环图DAG
- 每个MapReduce作业的输出结果都会落盘,造成大量磁盘IO,导致性能非常低下
(二) MapReduce原理
a) 基本概念
Job & Task(作业与任务)
- 作业是客户端请求执行的一个工作单元
- 包括输入数据、MapReduce程序、配置信息等
- 任务是将作业分解后得到的细分工作单元
- 分为Map任务和Reduce任务
Split(切片)
- 输入数据被划分成等长的小数据块,称为输入切片(Input Split),简称切片
- Split是逻辑概念,仅包含元数据信息,如:数据的起始位置、长度、所在节点等
- 每个Split交给一个Map任务处理,Split的数量决定Map任务的数量
- Split的大小默认等于Block大小
- Map任务会一行一行的读取某个block中的数据,为防止数据被切分,它还会多读取下一个block的第一行数据(HDFS中按照字节存储数据,所以极有可能一条数据被拆分存放在两个block中)
- Split划分方式由程序决定,Split与HDFS Block没有严格的对应关系
- Split越小,负载越均衡,但集群开销越大;Split越大,Map任务数少,集群的计算并发度降低
Map阶段(映射)
- 由若干Map任务组成,任务数量由Split数量决定
- 输入:Split切片(key-value),输出:中间计算结果(key-value)
Reduce阶段(化简)
- 由若干Reduce任务组成,任务数量由程序指定
- 输入:Map阶段输出的中间结果(key-value),输出:最终结果(key-value)
Shuffle阶段(混洗)
- Map、Reduce阶段的中间环节,是虚拟阶段
- 负责执行Partition(分区)、Sort(排序)、Spill(溢写)、Merge(合并)、Fetch(抓取)等工作
- Partition决定了Map任务输出的每条数据放入哪个分区,交给哪个Reduce任务处理
- Reduce任务的数量决定了Partition数量
- Partition编号=Reduce任务编号=”key hashcode % reduce task number”(%:取模/取余数)
- 哈希取模的作用:将一个数据集随机均匀分成若干个子集
- 避免和减少Shuffle是MapReduce程序调优的重点
b) 程序执行过程
假设MR处理的数据存储在HDFS上, HDFS上的数据是以block的形式存放。假设有三个Map任务和三个Reduce任务线程。
- Map任务线程执行流程
1.1. Map阶段
1.1.1 Map任务从HDFS中读取文件内容, HDFS中文件以Block块的形式存在。整个过程数据以键值对的形式传输,输入Map前的key为文件中一条数据的偏移量,value为这条数据的内容.
1.1.2 Map方法根据需求将传入的value进行切割,再根据需求设置输出类型的key和value
1.2. Shuffle Write阶段
1.2.1 partitioner分区(打标签),分区的目的就是让同一个分区的数据被同一个Reduce任务来处理。默认的分区器是HashPartitioner,它的分区策略为将经过Map处理后得到的key的hash值与Reduce任务的个数取模,模值相同的key将放在同一个分区中
1.2.2 向内存Buffer中写数据,将打上标签的Map输出的数据写入到内存的Buffer中,内存Buffer默认大小位100M。此时每一个数据记录都由三部分组成:分区号、key、value
1.2.3 当内存Buffer中写入的数据达到80M时,此时会将这80M的内存封锁,封锁后对内存中的数据进行聚合(Combiner),Combiner完成后再根据分区号排序,分区号排序完成后再按照key的大小排序(Sort)。此时这80M的数据中相同分区号的数据存放在一起,并且分区内的数据是有序的。
1.2.4 溢写(Spill),内存Buffer达到80M时,排序完成后对这80M数据进行溢写操作,即将数据写入磁盘,形成一个磁盘小文件,文件根据分区号划分并且内部有序。每次内存Buffer溢写都会产生这样一个小文件。
1.2.5 合并(Merge),所有溢写文件操作完毕后,会将磁盘小文件合并成一个大文件,合并时先对数据进行聚合(Combiner),然后再使用归并算法将各个小文件合并成一个根据分区号划分且内部有序的大文件,每一个Map任务都将产生这样一个大文件。
- Reduce任务线程执行流程
2.1 Shuffle Read阶段
2.1.1 读取Map任务阶段输出的大文件中对应分区的数据。将分区数据写入内存中,内存大小默认为1G,当内存中写入的数据达到溢写比例时进行数据溢写操作,这里内存的溢写过程与Map任务中Buffer溢写几乎一样,首先封锁数据,然后对数据排序,排序完成后将排好序的数据写入磁盘小文件中。
2.1.2. 当Map中对应分区的数据全部读取之后,Reduce任务会对所有的磁盘小文件进行归并算法合并,合并成一个内部有序的磁盘大文件,大文件中相同key值的数据为同一组数据。
2.2 Reduce阶段
2.2.1 遍历整个大文件,对同一组的数据调用一次reduce()方法,将输出的key-value结果写入output文件中。每个Reduce任务产生一个输出文件,输出文件不再合并。
从以上流程分析可以看出,一共进行了四次排序,而这四此排序的最终目的都是在于提高最终分组的效率。
MR核心思想:取模后结果相同的key为一组,调用一次reduce方法,方法内迭代这一组数据进行计算。
参考:
[1]MapReduce简介及执行流程
Shuffle阶段详解
- Map端
- Map任务将中间结果写入专用内存缓冲区Buffer(默认100M),同时进行Partition和Sort(先按“key hashcode % reduce task number”对数据进行分区,分区内再按key排序)
- 当Buffer的数据量达到阈值(默认80%)时,将数据溢写(Spill)到磁盘的一个临时文件中,文件内数据先分区后排序
- Map任务结束前,将多个临时文件合并(Merge)为一个Map输出文件,文件内数据先分区后排序
- Reduce端
- Reduce任务从多个Map输出文件中主动抓取(Fetch)属于自己的分区数据,先写入Buffer,数据量达到阈值后,溢写到磁盘的一个临时文件中
- 数据抓取完成后,将多个临时文件合并为一个Reduce输入文件,文件内数据按key排序
c) 作业运行模式
- JobTracker/TaskTracker模式(Hadoop 1.X)
- JobTracker节点(Master)
- 调度任务在TaskTracker上运行
- 若任务失败,指定新的TaskTracker重新运行
- TaskTracker节点(Slave)
- 执行任务,发送进度报告
- 此种模式存在的问题
- JobTracker存在单点故障
- JobTracker负载太重(上限4000节点)
- JobTracker缺少对资源的全面管理
- TaskTracker对资源的描述过于简单
- 源码很难理解
- YARN模式(Hadoop 2.X)
YARN运行流程:
1)用户向YARN中提交应用程序,其中包括ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。
2)ResourceManager为该应用程序分配第一个Container,并与对应的Node-Manager通信,要求它在这个Container中启动应用程序的ApplicationMaster。
3)ApplicationMaster首先向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它的运行状态,直到运行结束,即重复步骤4)~ 步骤7)。
4)ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。
5)一旦ApplicationMaster申请到资源后,便与对应的NodeManager通信,要求它启动任务。
6)NodeManager为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务。
7)各个任务通过某个RPC协议向ApplicationMaster汇报自己的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。在应用程序运行过程中,用户可随时通过RPC向ApplicationMaster查询应用程序的当前运行状态。
8)应用程序运行完成后,ApplicationMaster向ResourceManager注销并关闭自己。
从步骤3和步骤4得知,任务的分配,是由ResourceManager来决定的,由ApplicationMaster申请的。ResourceManager并不是根据舒俱来分配任务,而是根据整体的资源来分配。那么它分配的原则是什么?
首先我们需要知道有三种调度策略FIFO、CapacityScheduler、FairScheduler,每个调度策略又是不同的。因为任务分配也是不同的。但是有一个共同特性:计算本地性。
本地性有3个级别:NODE_LOCAL、RACK_LOCAL、OFF_SWITCH,分别代表同节点、同机架、跨机架。计算效率会依次递减。
因为HDFS的多副本,任务应该尽量在选择block所在的机器上执行,可以减少网络传输的消耗。如果开启了Short-Circuit Read特性,还可以直接读本地文件,提高效率。
scheduler能做的只是尽量满足NODE_LOCAL,尽量避免OFF_SWITCH。计算本地性更多的要AM端配合,当AM拿到资源后,优先分配给NODE_LOCAL的任务。
参考:
[2]MapReduce执行过程分析【问题】
(三) 作业管理
1)提交作业
- hadoop jar {jarFile} [mainClass] args
- jarFile: MapReduce运行程序的jar包
- mainClass: jar包中main函数所在类的类名
- args: 程序调用需要的参数,如:输入输出路径
2)查看作业
- sudo –u yarn application -list
3)终止作业
- sudo -u yarn application -kill {application_id}
4)作业监控
- Web监控:http://{AM_IP}:8088/proxy/{application_id}/
- 8088端口
5)作业诊断
- 配置参数:yarn.nodemanager.log-dirs
- MapReduce运行日志目录
- 默认值为/mnt/disk* /hadoop/yarn/
- 根据运行出错信息,可以到指定节点下分析日志