5.3.1 Value Types (Part 2)

11. coalesce(numPartitions)

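Purpose: reduces the number of partitions to the specified count. It is typically used after filtering a large dataset, so that the resulting smaller dataset executes more efficiently.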

scala> val rdd1 = sc.parallelize(0 to 100, 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[45] at parallelize at <console>:24

scala> rdd1.partitions.length
res39: Int = 5

// Reduce the number of partitions to 2
scala> rdd1.coalesce(2)
res40: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[46] at coalesce at <console>:27

scala> res40.partitions.length
res41: Int = 2

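Note:

  • The second parameter specifies whether to shuffle. If it is omitted or set to false, no shuffle is performed; in that case reducing the number of partitions takes effect, but increasing it does not.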
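
The effect of the shuffle flag can be checked with a small sketch. It assumes a local Spark installation; the application name and the `local[2]` master are illustrative choices, and inside spark-shell `SparkContext.getOrCreate` simply returns the existing context.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode. In spark-shell, getOrCreate reuses the existing context.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("coalesce-demo").setMaster("local[2]"))

val rdd = sc.parallelize(0 to 100, 5)

// Without a shuffle, asking for more partitions has no effect.
val noShuffle = rdd.coalesce(10)
println(noShuffle.partitions.length)       // 5

// With shuffle = true, the partition count can grow.
val withShuffle = rdd.coalesce(10, shuffle = true)
println(withShuffle.partitions.length)     // 10

// Reducing the partition count works without a shuffle.
println(rdd.coalesce(2).partitions.length) // 2
```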

12. repartition(numPartitions)

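Purpose: reshuffles all of the data according to the new partition count. This operation always performs a shuffle, so data is generally moved over the network.

The new partition count may be larger or smaller than the previous one.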

scala> val rdd1 = sc.parallelize(0 to 100, 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[45] at parallelize at <console>:24

scala> rdd1.repartition(3)
res44: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[51] at repartition at <console>:27

scala> res44.partitions.length
res45: Int = 3

scala> rdd1.repartition(10)
res46: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[55] at repartition at <console>:27

scala> res46.partitions.length
res47: Int = 10

13. Differences between coalesce and repartition

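  1. coalesce repartitions the data and lets you choose whether to shuffle. This is controlled by the parameter shuffle: Boolean, which defaults to false.

  2. repartition simply calls coalesce with shuffle enabled. The source code is:

     def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
       coalesce(numPartitions, shuffle = true)
     }

  3. When reducing the number of partitions, avoid a shuffle where possible.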
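
One way to see the difference is to inspect each RDD's lineage. The following is a minimal sketch, assuming a local Spark installation (the application name and master are illustrative); it relies on `toDebugString`, which prints the lineage, and checks whether a `ShuffledRDD` appears in it.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode. In spark-shell, getOrCreate reuses the existing context.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("repartition-vs-coalesce").setMaster("local[2]"))

val rdd = sc.parallelize(0 to 100, 5)

val viaRepartition = rdd.repartition(2)
val viaCoalesce = rdd.coalesce(2)

// Both end up with 2 partitions.
println(viaRepartition.partitions.length) // 2
println(viaCoalesce.partitions.length)    // 2

// Only repartition introduces a shuffle into the lineage.
println(viaRepartition.toDebugString.contains("ShuffledRDD")) // true
println(viaCoalesce.toDebugString.contains("ShuffledRDD"))    // false
```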

14. sortBy(func, [ascending], [numPartitions])

Purpose: applies func to each element and sorts the elements by the result. The default order is ascending.


scala> val rdd1 = sc.parallelize(Array(1,3,4,10,4,6,9,20,30,16))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[46] at parallelize at <console>:24

scala> rdd1.sortBy(x => x).collect
res17: Array[Int] = Array(1, 3, 4, 4, 6, 9, 10, 16, 20, 30)

scala> rdd1.sortBy(x => x, true).collect
res18: Array[Int] = Array(1, 3, 4, 4, 6, 9, 10, 16, 20, 30)

// Descending order
scala> rdd1.sortBy(x => x, false).collect
res19: Array[Int] = Array(30, 20, 16, 10, 9, 6, 4, 4, 3, 1)
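
The examples above sort by the element itself. As a further sketch, func can return a different sort key, here the length of each string. The sample words, application name, and local master are assumptions chosen for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode. In spark-shell, getOrCreate reuses the existing context.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("sortBy-demo").setMaster("local[2]"))

val words = sc.parallelize(Seq("spark", "rdd", "hadoop", "hi"))

// Sort by string length rather than by the string itself.
val byLength = words.sortBy(w => w.length).collect()
println(byLength.mkString(", "))     // hi, rdd, spark, hadoop

// Same key, descending order.
val byLengthDesc = words.sortBy(w => w.length, ascending = false).collect()
println(byLengthDesc.mkString(", ")) // hadoop, spark, rdd, hi
```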

15. pipe(command, [envVars])

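Purpose: pipes each partition of the RDD through a shell command or script. Every element of the partition is passed to the command, and the command's output is returned as a new RDD. The command runs once per partition, so an RDD with a single partition runs it exactly once.

Note:

  • The script must be placed where the worker nodes can access it.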

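Step 1: Create a script file pipe.sh and make it executable (for example with chmod +x pipe.sh)

The file contents are as follows: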

#!/bin/bash
# Print a marker once per invocation, then echo every input line with a prefix.
echo "hello"
while read line; do
    echo ">>>$line"
done

Step 2: Create an RDD with only 1 partition

scala> val rdd1 = sc.parallelize(Array(10,20,30,40), 1)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd1.pipe("./pipe.sh").collect
res1: Array[String] = Array(hello, >>>10, >>>20, >>>30, >>>40)

Step 3: Create an RDD with 2 partitions

scala> val rdd1 = sc.parallelize(Array(10,20,30,40), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd1.pipe("./pipe.sh").collect
res2: Array[String] = Array(hello, >>>10, >>>20, hello, >>>30, >>>40)

Summary: the script runs once per partition, and each element becomes one line of its standard input.
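
The same behaviour can be observed without a custom script. The sketch below assumes a Unix-like system with `wc` on the PATH and a local Spark installation (the application name and master are illustrative). Each partition is piped into its own `wc -l` process, so the result holds one line count per partition; the output is trimmed because some `wc` implementations pad the number with spaces.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode. In spark-shell, getOrCreate reuses the existing context.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("pipe-demo").setMaster("local[2]"))

val rdd = sc.parallelize(Array(10, 20, 30, 40), 2)

// One `wc -l` process per partition; each element arrives as one input line.
val lineCounts = rdd.pipe("wc -l").collect().map(_.trim)
println(lineCounts.mkString(", ")) // 2, 2
```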

Copyright © 尚硅谷大数据 2019 all rights reserved, powered by Gitbook
Last revised: 2019-05-30 10:23:37
