Spark高性能Job

Spark高性能Job

1 Spark基础概念

1.1 Job

遇到一个action算子就会提交一个job,常见的transformation算子以及Action算子:

  • Transformation
    • map, mapPartitions, flatMap, filter, union, groupbyKey, repartition, cache
  • Action
    • reduce, collect, show, count, foreach, save一系列操作

1.2 Stage

​ 一个job会有多个算子操作,这些算子都是将一个父RDD转为子RDD。如果父RDD的每个分区只被一个子RDD分区使用,那么就是窄依赖!如果父RDD的每个分区都有可能被多个子RDD分区使用,子RDD分区通常对应父RDD所有分区,那么就是宽依赖,如groupByKey。会通过一个Partitioner函数将父RDD中每个分区上key不同的记录分发到不同的子RDD分区

  • 窄依赖

    • 包括map, filter, union, join(父RDD是hash-partitioned ), mapPartitions, mapValues

  • 宽依赖

    • 包括groupByKey, join(父RDD不是hash-partitioned ), partitionBy

shuffle操作即宽依赖算子一般是任务中最耗时耗资源的部分。因为数据可能存放在HDFS不同的节点上,以一个stage的执行需要去拉取上一个stage的数据(shuffle read),保存在自己的节点上,会增加网络通信与IO。

  • 宽依赖与窄依赖对比
    • 宽依赖往往对应者shuffle操作,在运行时会将同一个RDD分区传入到不同的子RDD分区中,中间涉及到多个节点的数据传输,非常耗时。而窄依赖的的每个父RDD分区通常只会传入到一个子RDD分区中,这个操作在一个机器节点就可以完成。
    • 宽依赖:需要所有的父分区都可用,必须等RDD的parent partition数据全部ready之后才能开始计算。

1.3 Task

​ task是spark最小的执行单元,task的数量就是stage的并行度,分配给不同的executor去执行。RDD在计算的时候,每个分区都会启一个task,这就是我们常说的数据并行!在map阶段,partition分区的数量保持不变,在reduce阶段,RDD聚合会触发shuffle操作。

2 Spark Shuffle

shuffle是spark job中一个比较重要的阶段,发生在map与reduce之间。

2.1 举例分析

对于上述的reduceByKey,涉及到需要将相同的key进行聚合。对于Stage1中的每个分区的数据,其输入可能存在于Stage0中的每个分区,因此需要从上游的每一个分区所在的机器拉取数据,这个过程称为shuffle。

2.2 Shuffle Write

shuffle write操作发生在ShuffleMapTask,Spark中的task分为以下两种类型:

  • ShuffleMapTask
    • 负责rdd之间的transform,map的输出也就是shuffle write
  • ResultTask
    • job最后阶段的执行任务,也就是action操作。

2.2.1 shuffle write分析

  • Hash Based Shuffle

    ​ 上图有四个ShuffleMapTask,假设在这四个都在一个worker node上运行,CPU的核为2,可以同时运行2个task(一个核运行两个线程,可能是超线程技术)。

    ​ 那么每个执行shuffle write的task,要为下一个stage创建多少个磁盘文件呢?很简单,下一个stage的task有多少个,当前stage的每个Map task就要创建多少份磁盘文件。比如下一个stage总共有100个task,那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个task,总共有10个Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件,所有Executor上会创建5000个磁盘文件。由此可见,未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。

    每个task包含R个缓冲区,R=Reducer的个数(也就是下个stage中task的个数),缓冲区被称为bucket。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去

    但上述可能会出现下面几个问题:

    • 本地磁盘根据bucket产生的blockfile很多,ShuffleMap task产生R个blockfile,M个ShuffleMapTask产生M×R个文件。一般Spark Job的M与R都很大,因此磁盘上会有大量的blockfile文件

    • 缓冲区内存占用空间大

      每个 ShuffleMapTask 需要开 R 个 bucket,M 个 ShuffleMapTask 就会产生 M * R 个 bucket。实际情况下,在一个worker node上,可以并行运行cores个ShuffleMapTask,一个机器上的bucket个数达到cores×R个,这会占用大量的内存空间。

    对于第二个问题,由于从内存往磁盘写数据一定得开缓冲区(内存与磁盘速度不匹配),所以对于第二个问题而言,没有较好的方法解决!但第一个问题可以通过下面的方法解决。

  • Consolidation机制的Shuffle

    ​ 在一个 core 上连续执行的 ShuffleMapTasks 可以共用一个输出文件 ShuffleFile。先执行完ShuffleMapTask 形成 ShuffleBlock i,后执行的 ShuffleMapTask 可以将输出数据直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i’,每个 ShuffleBlock 被称为 FileSegment。下一个 stage 的 reducer 只需要 fetch 整个 ShuffleFile 就行了。这样,每个 worker 持有的文件数降为 cores * R。

    ​ 假设第二个stage有100个task,第一个stage有50个task,总共还是有10个Executor,每个Executor执行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量。也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。

  • 在map阶段,除了map的业务逻辑外,还有shuffle write的过程,这个过程涉及到序列化、磁盘IO等耗时操作;在reduce阶段,除了reduce的业务逻辑外,还有前面shuffle read过程,这个过程涉及到网络IO、反序列化等耗时操作。

2.3 Shuffle Read

  • 前一个stage的ShuffleMapTask进行shuffle write,把数据存储在blockManager中,并且把数据位置元信息上报到driver的mapOutTrack组件,下一个stage根据数据位置元信息,进行shuffle read,拉取上一个stage的输出数据。
  • shuffle read实际是从上游executor以block为单位获取数据。当数据分布均匀的时候,导致下游某个partition过大。即上游某个block太大,就会出现OOM。

2.3.1 shuffle read分析

​ 执行shuffle read时,要将数据从MapPartitionsRDD 中的数据 fetch 过来。有一个问题?Reducer怎么知道应该去哪里找需要fetch的数据?答案是:在进行shuffle write的时候,会把数据位置元信息上报到driver的mapOutTrack组件,下一个stage根据数据位置元信息,进行shuffle read,拉取上一个stage的输出数据。

2.4 transform算子对应的shuffle read举例

  • reduceByKey

    reduceByKey是在fetch的同时进行reduce操作。方式是类似于Spark中aggregateByKey的方式,fetch来的数据设置一种数据结构比如HashMap,方便aggregate。reduce阶段fetch来的数据被逐个aggreagte到HashMap中,等所有记录都进入到了HashMap即完成了Reduce任务。注意,在reduce前的map阶段,Spark需要很多小buffer来存储bucket到磁盘!

  • groupByKey

  • distinct

  • cogroup

  • intersection(otherRDD)

  • join(otherRDD, numPartitions)

  • sortByKey

2.5 Shuffle Read中的HashMap

HashMap是Spark Shuffle read过程中频繁使用的、用于aggregate 的数据结构。Spark 设计了两种:一种是全内存的 AppendOnlyMap,另一种是内存+磁盘的 ExternalAppendOnlyMap。(参考链接:https://cloud.tencent.com/developer/article/1085719解释的非常详细)

3 性能优化

3.1 对多次使用的RDD进行持久化

3.2 进来避免使用shuffle算子

  • shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中(也就是我们的blockfile),然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。
  • 常见的shuffle算子有reduceByKey、join、distinct、repartition

3.2.1 举例:使用BroadCast与map来代替join算子

​ 传统的join操作会导致shuffle操作,为什么?不同父RDD中相同的key会通过网络拉取到一个节点上,也就是我们的宽依赖操作然后由该节点上的一个task进行join操作。

改进:

数据量较小的RDD作为广播变量发到每个工作节点,然后在执行join。在rdd1.map中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。然后遍历rdd2的所有数据,若发现某条记录的key与rdd1中当前数据的key相同,则进行join(或者使用rdd2DataBroadcast来与rdd1进行join)。需要注意的是,上述操作,仅仅在rdd2较小(几百兆或者1G)下使用,因为在每个Executor的内存中,都会驻留一份rdd2的全量数据。

1
2
3
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
val rdd3 = rdd1.map(rdd2DataBroadcast...)

3.3 使用map-side预聚合的shuffle算子

  • map-side预聚合

    • 在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合后,每个节点本地只会有一条相同的key,因为多于相同的key都被聚合起来了。reducer在拉取所有节点上相同的key时,就会大大减少拉取的数据数量。
    • 尽量使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子会使用用户自定义函数对每个节点本地相同的key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。

    groupByKey算子示例:

    reduceByKey算子示例:

3.4 使用高性能算子

  • 使用mapPartitions代替普通map
    • 一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条。但是单次函数调用就要处理掉一个partition所有的数据,如果内存不够,很可能出现OOM异常

3.5 广播大变量

  • 当算子函数使用外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,每个task都会保留一份变量副本。如果变量本身较大,那么大量的变量副本在网络传输中非常消耗性能(有多少个task就要传递多少个变量副本)。

    解决方法:

    ​ 使用Spark广播。广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    // 以下代码在算子函数中,使用了外部的变量。
    // 此时没有做任何特殊操作,每个task都会有一份list1的副本。
    val list1 = ...
    rdd1.map(list1...)

    // 以下代码将list1封装成了Broadcast类型的广播变量。
    // 在算子函数中,使用广播变量时,首先会判断当前task所在Executor内存中,是否有变量副本。
    // 如果有则直接使用;如果没有则从Driver或者其他Executor节点上远程拉取一份放到本地Executor内存中。
    // 每个Executor内存中,就只会驻留一份广播变量副本。
    val list1 = ...
    val list1Broadcast = sc.broadcast(list1)
    rdd1.map(list1Broadcast...)

3.6 spark-submit的参数设置

  • 使用spark-submit提交作业,该作业会启动一个对应的Driver进程。根据部署模式的不同,Driver进程可能会在本地启动(local模式),或者集群中某一节点启动(yarn模式)。
  • Driver向Yarn集群管理器申请运行Spark作业的资源,这里的资源指的是Executor进程。Yarn会根据Spark作业设置的参数,在各个工作节点上,启动一定数量的Executor进程,每个进程都占有一定数量的内存以及cpu core
  • 申请好作业执行资源后,Driver进程会调度执行作业代码!首先,Driver进程将作业分成多个stage,并为每个stage创建一批task,然后将各个task分配到各个Executor中执行(在Executor中执行的每个task可以想象为线程)。当一个stage执行结束,会在各个节点本地进行shuffle write,写入中间计算结果。然后Driver开始调度执行下一个stage。
  • 当我们在代码中执行了cache/persist等持久化操作时,根据我们选择的持久化级别的不同,每个task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中

task的执行速度是跟每个Executor进程的CPU core数量有直接关系的。一个cpu core同一时间只能执行一个线程,而每个Executor进程上分配得到的多个task,多是以多个线程并发运行的。(多个线程抢占Executor的CPU core运行)。如果Executor上的cpu core << task数量,那么每个cpu core会轮询task,有些task会被抢占等待资源,使得作业变慢。

  • num-executors

    • 设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。
  • executor-memory

    • 如果每个executor的内存太小,可能会出现OOM
  • executor-cores

    • 这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程
  • spark.default.parallelism

    • 设置stage默认的task数量
    • 该参数非常重要,会影响到总的task数量。如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。因为,无论你的Executor进程有多少个,内存与CPU有多大,但是如果task总量少,Executor进程的资源无法得到充分使用!建议:设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源
  • spark.storage.memoryFraction

    • Executor的内存分配
      • 第一块是让task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;第二块是让task通过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;第三块是让RDD持久化时使用,默认占Executor总内存的60%
    • 该参数在作业中有持久化RDD的时候才需要设置。
  • spark.shuffle.memoryFraction

    • 该参数设置shuffle过程中,一个task拉取到上一个stage的task输出后,进行reduce的时候使用的Executor内存的比例。当设置为0.2,Executor默认只有20%的内存来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。

    • 设置的建议:

      如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能

4 性能优化高阶—-数据倾斜

4.1什么是数据倾斜

​ 在进行shuffle操作的时候,将各个节点上相同的key拉取到某个节点上的task来执行(shuffle read阶段)。如果某个key对应的数据量非常大,就会发生数据倾斜。比如,大部分key对应只有10条数据,但个别key有100万,那么大部分task可能只会分配到10条数据(这是reduce阶段决定的,相同的key发往同一个task进行reduce),然后1秒运行结束。但个别task会分配超过100万数据,所以要运行很久。

如图所示,在reduce对应的task中,处理hello的task线程处理的数据量很大,需要很长时间的运行。

4.2 可能会发生数据倾斜的算子

数据倾斜会发生在shuffle过程中,一下算子可能会触发shuffle:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition

4.2.1 某个task执行非常慢的情况

​ 某个task执行非常慢的情况:查看作业当前运行的stage下每个task分配的数据量

​ 在Web UI上可以看到每个stage各个task的分配数据量大小,从而进一步确定是不是task分配的数据不均匀导致。(在Web UI stage栏目可以看到每个stage下并行的task分配的数据量)如果不出意外:会看到有的task运行非常快,几秒钟; 有的task运行非常慢,仅仅从时间上已经能看出数据倾斜了!在通过查看每个task处理的数据量,可以看到处理的数据量也有很大的区别。在知道是哪个stage发生了数据倾斜之后,根据stage的划分原理。推算到Spark作业中对应哪行代码有问题!

4.2.2 某个task出现莫名其妙的内存溢出

通过Web UI使用相同的方法查看

4.2.3 查看导致数据倾斜的key的数据分布情况

​ 通过RDD的countByKey,查看RDD中不同key的数据量分布。然后对统计出来的各个key出现的次数,collect/take到客户端打印一下,就可以看到key的分布情况。

1
2
3
val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))

4.3 数据倾斜的解决方案

4.3.1 使用Hive ETL预处理数据

4.3.2 过滤少数倾斜的key

​ 如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜。

​ 实际操作:采取每次执行前先进行采样,计算出样本中数据量最大的几个key之后,直接在程序中将那些key给过滤掉。

4.3.3 提高shuffle操作的并行度

​ 在对RDD进行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数设置了shuffle算子执行时shuffle read task的数量

​ 原理:让原本分配给一个task的多个keu分配多个task,从而让每个task处理比原来更少的数据。缺点:无法应对极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定还是会分配到一个task中去处理,因此注定还是会发生数据倾斜的。

4.3.4 两阶段聚合(局部聚合和全局聚合)

​ 对RDD执行reduceByKey等聚合类shuffle算子时使用。第一阶段聚合,先给每个key都打上一个随机数,此时原来的key就不同了。比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。

​ 原理:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题

​ 方案缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案

4.3.5 将reduce join改为map join

​ 非常厉害的一个思路!!!!!!!!!!!!!!!!!!!!

​ 使用场景:进行join的两个RDD,其中一个RDD或表的数据量比较小(比如几百M或者一两G)

思路:Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。

​ 优点:对join操作导致的数据倾斜,效果非常好,因为根本就不会发生shuffle,也就根本不会发生数据倾斜

​ 缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。

4.3.4 采样倾斜key并分拆join操作

​ 使用场景:当join的两张表都很大,无法使用上面的方案!如果出现数据倾斜,是因为其中某一个RDD/Hive表中的少数几个key的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均匀,那么采用这个解决方案是比较合适的。

​ 缺点:如果导致倾斜的key特别多的话,比如成千上万个key都导致数据倾斜,那么这种方式也不适合。

4.4 Spark Shuffle的多种机制

  • 未经优化的HashShuffleManager
  • consolidate机制的HashShuffleManager
  • SortShuffleManager

    • 普通运行机制
    • bypass机制
  • spark.shuffle.maneger

    默认值是sort,该参数用于设置ShuffleManager的类型

  • spark.shuffle.consolidateFiles

    默认值false,如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。