Spark

(一) Spark简介

a) 产生背景

  1. MapReduce有较大局限性
    • 仅支持Map、Reduce两种语义操作
    • 执行效率低,时间开销大
    • 主要用于大规模离线批处理
    • 不适合迭代计算、交互式计算、实时流处理等场景
  2. 计算框架种类多,选型难,学习成本高
    • 批处理:MapReduce
    • 流计算:Storm、Flink
    • 交互式计算:Impala、Presto
    • 机器学习:Mahout
  3. 统一计算框架,简化技术选型
    • 在一个统一框架下,实现批处理、流处理、交互式计算、机器学习

b) 概念

  • 由加州大学伯克利分校的AMP实验室开源
  • 大规模分布式通用计算引擎
    • Spark Core:核心计算框架
    • Spark SQL:结构化数据查询
    • Spark Streaming:实时流处理
    • Spark MLib:机器学习
    • Spark GraphX:图计算
  • 具有高吞吐、低延时、通用易扩展、高容错等特点
  • 采用Scala语言开发
  • 提供多种运行模式

c) 特点

  1. 计算高效
  • 利用内存计算、Cache缓存机制,支持迭代计算和数据共享,减少数据读取的IO开销
  • 利用DAG引擎,减少中间计算结果写入HDFS的开销
  • 利用多线程池模型,减少任务启动开销,避免Shuffle中不必要的排序和磁盘IO操作
  • 不同于MapReduce将中间计算结果放入磁盘中,Spark采用内存存储中间计算结果,减少了迭代运算的磁盘IO,并通过并行计算DAG图的优化,减少了不同任务之间的依赖,降低了延迟等待时间。内存计算下,Spark 比 MapReduce 快100倍。
  1. 通用易用
  • 适用于批处理、流处理、交互式计算、机器学习算法等场景
  • 提供了丰富的开发API,支持Scala、Java、Python、R等
  • Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。
  • 不同于MapReduce仅支持Map和Reduce两种编程算子,Spark提供了超过80种不同的Transformation和Action算子,如map,reduce,filter,groupByKey,sortByKey,foreach等,并且采用函数式编程风格,实现相同的功能需要的代码量极大缩小
  1. 运行模式多样
  • Local模式
  • Standalone模式
  • YARN/Mesos模式
  1. 兼容性
  • Spark能够跟很多开源工程兼容使用。如Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且Spark可以读取多种数据源,如HDFS、HBase、MySQL等。

(二) Spark原理

a) 基本概念

  • RDD:是弹性分布式数据集(Resilient Distributed Dataset)的简称,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型。
  • DAG:是Directed Acyclic Graph(有向无环图)的简称,反映RDD之间的依赖关系。
  • Driver Program:控制程序,负责为Application构建DAG图。
  • Cluster Manager:集群资源管理中心,负责分配计算资源。
  • Worker Node:工作节点,负责完成具体计算。
  • Executor:是运行在工作节点(Worker Node)上的一个进程,负责运行Task,并为应用程序存储数据。
  • Application:用户编写的Spark应用程序,一个Application包含多个Job。
  • Job:作业,一个Job包含多个RDD及作用于相应RDD上的各种操作。
  • Stage:阶段,是作业的基本调度单位,一个作业会分为多组任务,每组任务被称为“阶段”。
  • Task:任务,是运行在Executor上的工作单元,是Executor中的一个线程。

总结:Application由多个Job组成,Job由多个Stage组成,Stage由多个Task组成。Stage是作业调度的基本单位。

b) 架构设计

  • Spark集群由Driver, Cluster Manager(Standalone,Yarn 或 Mesos),以及Worker Node组成。对于每个Spark应用程序,Worker Node上存在一个Executor进程,Executor进程中包括多个Task线程。

c) Spark运行流程

  1. Application首先被Driver构建DAG图并分解成Stage
  2. 然后Driver向Cluster Manager申请资源
  3. Cluster Manager向某些Work Node发送征召信号
  4. 被征召的Work Node启动Executor进程响应征召,并向Driver申请任务
  5. Driver分配Task给Work Node。
  6. Executor以Stage为单位执行Task,期间Driver进行监控。
  7. Driver收到Executor任务完成的信号后向Cluster Manager发送注销信号。
  8. Cluster Manager向Work Node发送释放资源信号。
  9. Work Node对应Executor停止运行。

Spark运行架构特点:

  • 每个Application都有自己专属的Executor进程,并且该进程在Application运行期间一直驻留。Executor进程以多线程的方式运行Task
  • Spark运行过程与资源管理器无关,只要能够获取Executor进程并保持通信即可
  • Task采用了数据本地性和推测执行等优化机制

c) 编程模型

RDD数据结构

  • RDD全称弹性分布式数据集(Resilient Distributed Datasets)
    • 分布在集群中的只读对象集合,是记录的只读分区集合
    • 由多个Partition组成
    • 通过转换操作构造
    • 失效后自动重构(弹性)
    • 存储在内存或磁盘中
  • Spark基于RDD进行计算,RDD是Spark的基本数据结构

创建RDD的方式

  1. 读取文件中的数据生成RDD
  2. 将内存中的对象并行化得到RDD
  3. 可以通过已有的RDD转换产生新的RDD

RDD操作类型(Operator)

  1. Transformation(转换)操作
    • 将Scala集合或Hadoop输入数据构造成一个新的RDD
    • 通过已有的RDD产生新的RDD
    • 惰性执行:只记录转换关系,不触发计算
    • 例如:map、filter、flatmap、union、distinct、sortbykey
  2. Action(动作)操作
    • 通过RDD计算得到一个值或一组值
    • 真正触发SparkContext提交Job作业,进行计算,返回结果到Driver
    • 例如:first、count、collect、foreach、saveAsTextFile

Transoformation操作有lazy特性,即Spark不会立刻进行实际的计算,智慧记录执行的轨迹,只有触发Action操作时,它才会根据DAG图真正执行

示例:RDD的两种操作

  • rdd1.map(_ ,+1).saveAsTextFile(“hdfs://node01:9000”)

RDD依赖(Dependency)

  1. 窄依赖(Narrow Dependency)
  • 父RDD中的分区最多只能被一个子RDD的一个分区使用
  • 子RDD如果有部分分区数据丢失或损坏,只需从对应的父RDD重新计算恢复
  • 例如:map、filter、union
  1. 宽依赖(Wide/Shuffle Dependency)
  • 子RDD分区依赖父RDD的所有分区
  • 宽依赖关系相关的操作一般具有shuffle过程,即通过一个partitioner函数将父RDD中每个分区上key不同的记录分发到不同的子RDD分区
  • 子RDD的部分或全部分区数据丢失或损坏,从所有父RDD分区重新计算,必须进行Shuffle
  • 相对于窄依赖,宽依赖付出的代价要高很多,尽量避免使用
  • 例如:groupByKey、reduceByKey、sortByKey


依赖关系确定了DAG切分成Stage的方式

  • 切割规则:从后往前,遇到宽依赖就切割Stage。

示例:WordCount

1
2
3
4
5
val rdd1 = sc.textFile(“hdfs://node01:9000/data/wc/in”) 
val rdd2 = rdd1.flatMap(_.split(“\t”))
val rdd3 = rdd2.map((_,1))
val rdd4 = rdd3.reduceByKey((_+_))
rdd4.saveAsTextFile(“hdfs://node01:9000/data/wc/out”)

d) 运行模式

抽象模式

  1. Driver
    • 一个Spark程序有一个Driver,一个Driver创建一个SparkContext,程序的main函数运行在Driver中
    • 负责解析Spark程序、划分Stage、调度任务到Executor上执行
  2. SparkContext
    • 负责加载配置信息,初始化运行环境,创建DAGScheduler和TaskScheduler
  3. Executor
    • 负责执行Driver分发的任务,一个节点可以启动多个Executor,每个Executor通过多线程运行多个任务
  4. Task
    • Spark运行的基本单位,一个Task负责处理若干RDD分区的计算逻辑

Local模式

  • 单机本地运行,通常用于测试
  • Spark程序以多线程方式直接运行在本地

Standalone模式

  • Spark集群独立运行,使用Spark自带的集群管理器,后台只能运行Spark任务。不依赖于第三方资源管理系统,如:YARN、Mesos
  • 采用Master/Slave架构
  • Driver在Worker中运行,Master只负责集群管理
  • ZooKeeper负责Master HA,避免单点故障
  • 适用于集群规模不大,数据量不大的情况

YARN模式

  • Hadoop集群管理器,部署后可以同时运行MR、Spark、Storm、HBase等各种任务
    YARN模式:
  • YARN-Client模式:适用于交互和调试
  • YARN-Cluster模式:适用于生产环境

Mesos模式

  • 与Yarn最大的不同是Mesos 的资源分配是二次的,Mesos负责分配一次,计算框架可以选择接受或者拒绝。

e) 运行过程

  1. 生成逻辑计划
  2. 生成物理计划
  3. 任务调度与执行

f) DAG任务规划与调度

  1. DAG(Directed Acyclic Graph)
    • 有向无环图:一个有向图无法从任意顶点出发经过若干条边回到该点
    • 受制于某些任务必须比另一些任务较早执行的约束,可排序为一个队列的任务集合,该队列可由一个DAG图呈现
    • Spark程序的内部执行逻辑可由DAG描述,顶点代表任务,边代表任务间的依赖约束
    • DAG反映了RDD之间的依赖关系
  2. DAGScheduler
    • 根据任务的依赖关系建立DAG
    • 根据依赖关系是否为宽依赖,即是否存在Shuffle,将DAG划分为不同的阶段(Stage),如果是窄依赖就将它们放在同一个stage里,遇到宽依赖就断开划分为另一个stage
    • 将各阶段中的Task组成的TaskSet提交到TaskScheduler
  3. TaskScheduler
    • 负责Application的任务调度
    • 重新提交失败的Task
    • 为执行速度慢的Task启动备用Task

总结:RDD之间的依赖关系形成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分成相互依赖的多个stage,划分stage的依据就是RDD之间的宽窄依赖。遇到宽依赖就划分stage,每个stage包含一个或多个task任务。然后将这些task以taskSet的形式提交给TaskScheduler运行。

(三) 任务监控

  • Web监控:端口4040(以inceptor为例)

参考资料:
[1]30分钟理解Spark的基本原理