第 8 章 Spark 数据倾斜解决方案
Spark 中的数据倾斜问题主要指shuffle
过程中出现的数据倾斜问题,是由于不同的key
对应的数据量不同导致的不同task
所处理的数据量不同的问题。
例如,reduce
点一共要处理100
万条数据,第一个和第二个task
分别被分配到了1万
条数据,计算5分钟
内完成,第三个task
分配到了98万
数据,此时第三个task
可能需要10个小时
完成,这使得整个Spark作业需要10个小时
才能运行完成,这就是数据倾斜所带来的后果。
注意,要区分开数据倾斜与数据量过量这两种情况,task
被分配了绝大多数的数据,因此少数task
运行缓慢;数据过量是指所有task
被分配的数据量都很大,相差不多,所有task
都运行缓慢。
数据倾斜的表现:
Spark
作业的大部分task
都执行迅速,只有有限的几个task
执行的非常慢,此时可能出现了数据倾斜,作业可以运行,但是运行得非常慢;Spark 作业的大部分task都执行迅速,但是有的task在运行过程中会突然报出OOM,反复执行几次都在某一个task报出OOM错误,此时可能出现了数据倾斜,作业无法正常运行。
定位数据倾斜问题:
查阅代码中的
shuffle
算子,例如reduceByKey
、countByKey
、groupByKey
、join
等算子,根据代码逻辑判断此处是否会出现数据倾斜;查看 Spark 作业的 log 文件,log 文件对于错误的记录会精确到代码的某一行,可以根据异常定位到的代码位置来明确错误发生在第几个stage,对应的 shuffle 算子是哪一个;
8.1 聚合原数据
1. 避免shuffle
过程
绝大多数情况下,Spark 作业的数据来源都是 Hive 表,这些 Hive 表基本都是经过 ETL 之后的昨天的数据。 为了避免数据倾斜,我们可以考虑避免 shuffle 过程,如果避免了shuffle过程,那么从根本上就消除了发生数据倾斜问题的可能。
如果Spark作业的数据来源于Hive表,那么可以先在 Hive 表中对数据进行聚合,例如按照 key
进行分组,将同一key
对应的所有value
用一种特殊的格式拼接到一个字符串里去,这样,一个key
就只有一条数据了;之后,对一个key
的所有value
进行处理时,只需要进行map
操作即可,无需再进行任何的shuffle
操作。通过上述方式就避免了执行shuffle
操作,也就不可能会发生任何的数据倾斜问题。
对于Hive表中数据的操作,不一定是拼接成一个字符串,也可以是直接对key的每一条数据进行累计计算。
2. 缩小key
粒度(增大数据倾斜可能性,降低每个task
的数据量)
key
的数量增加,可能使数据倾斜更严重。
3. 增大key
粒度(减小数据倾斜可能性,增大每个task
的数据量)
如果没有办法对每个key
聚合出来一条数据,在特定场景下,可以考虑扩大key
的聚合粒度。
例如,目前有10万条
用户数据,当前key
的粒度是(省,城市,区,日期),现在我们考虑扩大粒度,将key
的粒度扩大为(省,城市,日期),这样的话,key
的数量会减少,key
之间的数据量差异也有可能会减少,由此可以减轻数据倾斜的现象和问题。(此方法只针对特定类型的数据有效,当应用场景不适宜时,会加重数据倾斜)
8.2 过滤导致倾斜的key
如果在 Spark 作业中允许丢弃某些数据,那么可以考虑将可能导致数据倾斜的 key 进行过滤,滤除可能导致数据倾斜的key对应的数据,这样,在Spark作业中就不会发生数据倾斜了。
8.3 提高shuffle
操作中的reduce
并行度
当方案一和方案二对于数据倾斜的处理没有很好的效果时,可以考虑提高shuffle
过程中的reduce
端并行度,reduce
端并行度的提高就增加了reduce
端task
的数量,那么每个task
分配到的数据量就会相应减少,由此缓解数据倾斜问题。
1. reduce
端并行度的设置
在大部分的shuffle
算子中,都可以传入一个并行度的设置参数,比如reduceByKey(500)
,这个参数会决定shuffle
过程中reduce
端的并行度,在进行shuffle
操作的时候,就会对应着创建指定数量的reduce task
。
对于Spark SQL中的shuffle
类语句,比如group by、join
等,需要设置一个参数,即spark.sql.shuffle.partitions
,该参数代表了shuffle read task
的并行度,该值默认是200
,对于很多场景来说都有点过小。
增加shuffle read task
的数量,可以让原本分配给一个task
的多个key
分配给多个task
,从而让每个task
处理比原来更少的数据。
举例来说,如果原本有5
个key
,每个key
对应10
条数据,这5
个key
都是分配给一个task
的,那么这个task
就要处理50
条数据。而增加了shuffle read task
以后,每个task
就分配到一个key
,即每个task
就处理10
条数据,那么自然每个task
的执行时间都会变短了。
2. reduce
端并行度设置存在的缺陷
提高reduce
端并行度并没有从根本上改变数据倾斜的本质和问题(方案一和方案二从根本上避免了数据倾斜的发生),只是尽可能地去缓解和减轻shuffle reduce task
的数据压力,以及数据倾斜的问题,适用于有较多key
对应的数据量都比较大的情况。
该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key
对应的数据量有100万
,那么无论你的task
数量增加到多少,这个对应着100万
数据的key
肯定还是会分配到一个task
中去处理,因此注定还是会发生数据倾斜的。所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用最简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。
在理想情况下,reduce
端并行度提升后,会在一定程度上减轻数据倾斜的问题,甚至基本消除数据倾斜;但是,在一些情况下,只会让原来由于数据倾斜而运行缓慢的task
运行速度稍有提升,或者避免了某些task
的OOM
问题,但是,仍然运行缓慢,此时,要及时放弃方案三,开始尝试后面的方案。
8.4 使用随机key
实现双重聚合
当使用了类似于groupByKey
、reduceByKey
这样的算子时,可以考虑使用随机key
实现双重聚合
首先,通过map
算子给每个数据的key
添加随机数前缀,对key
进行打散,将原先一样的key
变成不一样的key
,然后进行第一次聚合,这样就可以让原本被一个task处理的数据分散到多个task
上去做局部聚合;
随后,去除掉每个key
的前缀,再次进行聚合。
此方法对于由groupByKey
、reduceByKey
这类算子造成的数据倾斜由比较好的效果,仅仅适用于聚合类的shuffle
操作,适用范围相对较窄。
如果是join
类的shuffle
操作,还得用其他的解决方案。
此方法也是前几种方案没有比较好的效果时要尝试的解决方案。
8.5 将reduce join
转换为map join
正常情况下,join
操作都会执行shuffle
过程,并且执行的是reduce join
,也就是先将所有相同的key
和对应的value
汇聚到一个reduce task
中,然后再进行join
。
普通join
的过程如下图所示:
普通的join
是会走shuffle
过程的,而一旦shuffle
,就相当于会将相同key
的数据拉取到一个shuffle read task
中再进行join
,此时就是reduce join
。
但是如果一个RDD
是比较小的,则可以采用广播小RDD全量数据+map算子
来实现与join
同样的效果,也就是map join
,此时就不会发生shuffle
操作,也就不会发生数据倾斜。
核心思想
不使用join
算子进行连接操作,而使用Broadcast
变量与map
类算子实现join
操作,进而完全规避掉shuffle
类的操作,彻底避免数据倾斜的发生和出现。
将较小 RDD 中的数据直接通过collect
算子拉取到Driver
端的内存中来,然后对其创建一个Broadcast
变量;
接着对另外一个RDD
执行map
类算子,在算子函数内,从Broadcast
变量中获取较小RDD
的全量数据,与当前RDD
的每一条数据按照连接key
进行比对,如果连接key
相同的话,那么就将两个RDD
的数据用你需要的方式连接起来。
根据上述思路,根本不会发生shuffle
操作,从根本上杜绝了join
操作可能导致的数据倾斜问题。
当join
操作有数据倾斜问题并且其中一个RDD
的数据量较小时,可以优先考虑这种方式,效果非常好。map join
的过程如图:
不适用场景分析
由于 Spark 的广播变量是在每个Executor
中保存一个副本,如果两个RDD
数据量都比较大,那么如果将一个数据量比较大的 RDD 做成广播变量,那么很有可能会造成内存溢出
8.6 sample
采样对倾斜key
单独进行join
在 Spark 中,如果某个RDD
只有一个key
,那么在shuffle
过程中会默认将此key
对应的数据打散,由不同的reduce
端task
进行处理。
所以, 当由单个key
导致数据倾斜时,可有将发生数据倾斜的key
单独提取出来,组成一个RDD
,然后用这个原本会导致倾斜的key
组成的RDD
跟其他RDD
单独join
,此时,根据 Spark 的运行机制,此 RDD 中的数据会在shuffle
阶段被分散到多个task
中去进行join
操作。倾斜key
单独join
的流程如图
适用场景分析
对于RDD
中的数据,可以将其转换为一个中间表,或者是直接使用countByKey()
的方式,看一个这个RDD
中各个key
对应的数据量,此时如果你发现整个RDD
就一个key
的数据量特别多,那么就可以考虑使用这种方法。
当数据量非常大时,可以考虑使用sample
采样获取10%
的数据,然后分析这10%
的数据中哪个key
可能会导致数据倾斜,然后将这个key
对应的数据单独提取出来。
不适用场景分析
如果一个RDD
中导致数据倾斜的key
很多,那么此方案不适用
8.7 使用随机数以及扩容进行join
如果在进行join
操作时,RDD
中有大量的key
导致数据倾斜,那么进行分拆key
也没什么意义,此时就只能使用最后一种方案来解决问题了,对于join
操作,我们可以考虑对其中一个RDD
数据进行扩容,另一个RDD
进行稀释后再join
。
我们会将原先一样的key
通过附加随机前缀变成不一样的key
,然后就可以将这些处理后的“不同key
”分散到多个task
中去处理,而不是让一个task
处理大量的相同key
。
这一种方案是针对有大量倾斜key
的情况,没法将部分key
拆分出来进行单独处理,需要对整个RDD
进行数据扩容,对内存资源要求很高。
核心思想
选择一个RDD
,使用flatMap
进行扩容,对每条数据的key
添加数值前缀(1~N的数值),将一条数据映射为多条数据;(扩容)
选择另外一个RDD
,进行map
映射操作,每条数据的key
都打上一个随机数作为前缀(1~N的随机数);(稀释)
局限性
如果两个 RDD 都很大,那么将 RDD 进行 N倍 的扩容显然行不通; 使用扩容的方式只能缓解数据倾斜,不能彻底解决数据倾斜问题。