大数据理论篇 | 分布式计算框架(Spark篇)
Spark
(一) Spark简介
a) 产生背景
- MapReduce有较大局限性
- 仅支持Map、Reduce两种语义操作
- 执行效率低,时间开销大
- 主要用于大规模离线批处理
- 不适合迭代计算、交互式计算、实时流处理等场景
- 计算框架种类多,选型难,学习成本高
- 批处理:MapReduce
- 流计算:Storm、Flink
- 交互式计算:Impala、Presto
- 机器学习:Mahout
- 统一计算框架,简化技术选型
- 在一个统一框架下,实现批处理、流处理、交互式计算、机器学习
b) 概念
- 由加州大学伯克利分校的AMP实验室开源
- 大规模分布式通用计算引擎
- Spark Core:核心计算框架
- Spark SQL:结构化数据查询
- Spark Streaming:实时流处理
- Spark MLib:机器学习
- Spark GraphX:图计算
- 具有高吞吐、低延时、通用易扩展、高容错等特点
- 采用Scala语言开发
- 提供多种运行模式
c) 特点
- 计算高效
- 利用内存计算、Cache缓存机制,支持迭代计算和数据共享,减少数据读取的IO开销
- 利用DAG引擎,减少中间计算结果写入HDFS的开销
- 利用多线程池模型,减少任务启动开销,避免Shuffle中不必要的排序和磁盘IO操作
- 不同于MapReduce将中间计算结果放入磁盘中,Spark采用内存存储中间计算结果,减少了迭代运算的磁盘IO,并通过并行计算DAG图的优化,减少了不同任务之间的依赖,降低了延迟等待时间。内存计算下,Spark 比 MapReduce 快100倍。
- 通用易用
- 适用于批处理、流处理、交互式计算、机器学习算法等场景
- 提供了丰富的开发API,支持Scala、Java、Python、R等
- Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。
- 不同于MapReduce仅支持Map和Reduce两种编程算子,Spark提供了超过80种不同的Transformation和Action算子,如map,reduce,filter,groupByKey,sortByKey,foreach等,并且采用函数式编程风格,实现相同的功能需要的代码量极大缩小
- 运行模式多样
- Local模式
- Standalone模式
- YARN/Mesos模式
- 兼容性
- Spark能够跟很多开源工程兼容使用。如Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且Spark可以读取多种数据源,如HDFS、HBase、MySQL等。
(二) Spark原理
a) 基本概念
- RDD:是弹性分布式数据集(Resilient Distributed Dataset)的简称,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型。
- DAG:是Directed Acyclic Graph(有向无环图)的简称,反映RDD之间的依赖关系。
- Driver Program:控制程序,负责为Application构建DAG图。
- Cluster Manager:集群资源管理中心,负责分配计算资源。
- Worker Node:工作节点,负责完成具体计算。
- Executor:是运行在工作节点(Worker Node)上的一个进程,负责运行Task,并为应用程序存储数据。
- Application:用户编写的Spark应用程序,一个Application包含多个Job。
- Job:作业,一个Job包含多个RDD及作用于相应RDD上的各种操作。
- Stage:阶段,是作业的基本调度单位,一个作业会分为多组任务,每组任务被称为“阶段”。
- Task:任务,是运行在Executor上的工作单元,是Executor中的一个线程。
总结:Application由多个Job组成,Job由多个Stage组成,Stage由多个Task组成。Stage是作业调度的基本单位。
b) 架构设计
- Spark集群由Driver, Cluster Manager(Standalone,Yarn 或 Mesos),以及Worker Node组成。对于每个Spark应用程序,Worker Node上存在一个Executor进程,Executor进程中包括多个Task线程。
c) Spark运行流程
- Application首先被Driver构建DAG图并分解成Stage
- 然后Driver向Cluster Manager申请资源
- Cluster Manager向某些Work Node发送征召信号
- 被征召的Work Node启动Executor进程响应征召,并向Driver申请任务
- Driver分配Task给Work Node。
- Executor以Stage为单位执行Task,期间Driver进行监控。
- Driver收到Executor任务完成的信号后向Cluster Manager发送注销信号。
- Cluster Manager向Work Node发送释放资源信号。
- Work Node对应Executor停止运行。
Spark运行架构特点:
- 每个Application都有自己专属的Executor进程,并且该进程在Application运行期间一直驻留。Executor进程以多线程的方式运行Task
- Spark运行过程与资源管理器无关,只要能够获取Executor进程并保持通信即可
- Task采用了数据本地性和推测执行等优化机制
c) 编程模型
RDD数据结构
- RDD全称弹性分布式数据集(Resilient Distributed Datasets)
- 分布在集群中的只读对象集合,是记录的只读分区集合
- 由多个Partition组成
- 通过转换操作构造
- 失效后自动重构(弹性)
- 存储在内存或磁盘中
- Spark基于RDD进行计算,RDD是Spark的基本数据结构
创建RDD的方式
- 读取文件中的数据生成RDD
- 将内存中的对象并行化得到RDD
- 可以通过已有的RDD转换产生新的RDD
RDD操作类型(Operator)
- Transformation(转换)操作
- 将Scala集合或Hadoop输入数据构造成一个新的RDD
- 通过已有的RDD产生新的RDD
- 惰性执行:只记录转换关系,不触发计算
- 例如:map、filter、flatmap、union、distinct、sortbykey
- Action(动作)操作
- 通过RDD计算得到一个值或一组值
- 真正触发SparkContext提交Job作业,进行计算,返回结果到Driver
- 例如:first、count、collect、foreach、saveAsTextFile
Transoformation操作有lazy特性,即Spark不会立刻进行实际的计算,智慧记录执行的轨迹,只有触发Action操作时,它才会根据DAG图真正执行
示例:RDD的两种操作
- rdd1.map(_ ,+1).saveAsTextFile(“hdfs://node01:9000”)
RDD依赖(Dependency)
- 窄依赖(Narrow Dependency)
- 父RDD中的分区最多只能被一个子RDD的一个分区使用
- 子RDD如果有部分分区数据丢失或损坏,只需从对应的父RDD重新计算恢复
- 例如:map、filter、union
- 宽依赖(Wide/Shuffle Dependency)
- 子RDD分区依赖父RDD的所有分区
- 宽依赖关系相关的操作一般具有shuffle过程,即通过一个partitioner函数将父RDD中每个分区上key不同的记录分发到不同的子RDD分区
- 子RDD的部分或全部分区数据丢失或损坏,从所有父RDD分区重新计算,必须进行Shuffle
- 相对于窄依赖,宽依赖付出的代价要高很多,尽量避免使用
- 例如:groupByKey、reduceByKey、sortByKey
依赖关系确定了DAG切分成Stage的方式
- 切割规则:从后往前,遇到宽依赖就切割Stage。
示例:WordCount
1 | val rdd1 = sc.textFile(“hdfs://node01:9000/data/wc/in”) |
d) 运行模式
抽象模式
- Driver
- 一个Spark程序有一个Driver,一个Driver创建一个SparkContext,程序的main函数运行在Driver中
- 负责解析Spark程序、划分Stage、调度任务到Executor上执行
- SparkContext
- 负责加载配置信息,初始化运行环境,创建DAGScheduler和TaskScheduler
- Executor
- 负责执行Driver分发的任务,一个节点可以启动多个Executor,每个Executor通过多线程运行多个任务
- Task
- Spark运行的基本单位,一个Task负责处理若干RDD分区的计算逻辑
Local模式
- 单机本地运行,通常用于测试
- Spark程序以多线程方式直接运行在本地
Standalone模式
- Spark集群独立运行,使用Spark自带的集群管理器,后台只能运行Spark任务。不依赖于第三方资源管理系统,如:YARN、Mesos
- 采用Master/Slave架构
- Driver在Worker中运行,Master只负责集群管理
- ZooKeeper负责Master HA,避免单点故障
- 适用于集群规模不大,数据量不大的情况
YARN模式
- Hadoop集群管理器,部署后可以同时运行MR、Spark、Storm、HBase等各种任务
YARN模式: - YARN-Client模式:适用于交互和调试
- YARN-Cluster模式:适用于生产环境
Mesos模式
- 与Yarn最大的不同是Mesos 的资源分配是二次的,Mesos负责分配一次,计算框架可以选择接受或者拒绝。
e) 运行过程
- 生成逻辑计划
- 生成物理计划
- 任务调度与执行
f) DAG任务规划与调度
- DAG(Directed Acyclic Graph)
- 有向无环图:一个有向图无法从任意顶点出发经过若干条边回到该点
- 受制于某些任务必须比另一些任务较早执行的约束,可排序为一个队列的任务集合,该队列可由一个DAG图呈现
- Spark程序的内部执行逻辑可由DAG描述,顶点代表任务,边代表任务间的依赖约束
- DAG反映了RDD之间的依赖关系
- DAGScheduler
- 根据任务的依赖关系建立DAG
- 根据依赖关系是否为宽依赖,即是否存在Shuffle,将DAG划分为不同的阶段(Stage),如果是窄依赖就将它们放在同一个stage里,遇到宽依赖就断开划分为另一个stage
- 将各阶段中的Task组成的TaskSet提交到TaskScheduler
- TaskScheduler
- 负责Application的任务调度
- 重新提交失败的Task
- 为执行速度慢的Task启动备用Task
总结:RDD之间的依赖关系形成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分成相互依赖的多个stage,划分stage的依据就是RDD之间的宽窄依赖。遇到宽依赖就划分stage,每个stage包含一个或多个task任务。然后将这些task以taskSet的形式提交给TaskScheduler运行。
(三) 任务监控
- Web监控:端口4040(以inceptor为例)
参考资料:
[1]30分钟理解Spark的基本原理