5.0 RDD 的持久化
每碰到一个 Action 就会产生一个 job, 每个 job 开始计算的时候总是从这个 job 最开始的 RDD 开始计算.
先看一段代码
package day04
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object CacheDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(Array("ab", "bc"))
val rdd2 = rdd1.flatMap(x => {
println("flatMap...")
x.split("")
})
val rdd3: RDD[(String, Int)] = rdd2.map(x => {
(x, 1)
})
rdd3.collect.foreach(println)
println("-----------")
rdd3.collect.foreach(println)
}
}
执行结果:
说明:
每调用一次
collect
, 都会创建一个新的 job, 每个 job 总是从它血缘的起始开始计算. 所以, 会发现中间的这些计算过程都会重复的执行.原因是因为
rdd
记录了整个计算过程. 如果计算的过程中出现哪个分区的数据损坏或丢失 , 则可以从头开始计算来达到容错的目的 .
RDD 数据的持久化
每个 job 都会重新进行计算, 在有些情况下是没有必要, 如何解决这个问题呢?
Spark 一个重要能力就是可以
可以使用方法persist()
或者cache()
来持久化一个 RDD. 在第一个 action 会计算这个 RDD, 然后把结果的存储到他的节点的内存中. Spark 的 Cache 也是容错:
另外, 允许我们对持久化的 RDD 使用不同的存储级别.
例如: 可以存在磁盘上, 存储在内存中(堆内存中), 跨节点做复本.
可以给persist()
来传递存储级别. cache()
方法是使用默认存储级别(StorageLevel.MEMORY_ONLY
)的简写方法.
Storage Level | Meaning |
---|---|
MEMORY_ONLY | Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level. |
MEMORY_AND_DISK | Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed. |
MEMORY_ONLY_SER (Java and Scala) | Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read. |
MEMORY_AND_DISK_SER (Java and Scala) | Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed. |
DISK_ONLY | Store the RDD partitions only on disk. |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | Same as the levels above, but replicate each partition on two cluster nodes. |
OFF_HEAP (experimental) | Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled. |
// rdd2.cache() // 等价于 rdd2.persist(StorageLevel.MEMORY_ONLY)
rdd2.persist(StorageLevel.MEMORY_ONLY)
说明:
第一个 job 会计算 RDD2, 以后的 job 就不用再计算了.
有一点需要说明的是, 即使我们不手动设置持久化, Spark 也会自动的对一些 shuffle 操作的中间数据做持久化操作(比如: reduceByKey). 这样做的目的是为了当一个节点 shuffle 失败了避免重新计算整个输入. 当时, 在实际使用的时候, 如果想重用数据, 仍然建议调用
persist
或cache