5.3.1 Value Types (2)
11. coalesce(numPartitions)
Purpose: reduce the number of partitions to the given value. Typically used after filtering a large dataset, so the now-smaller dataset executes more efficiently with fewer partitions.
scala> val rdd1 = sc.parallelize(0 to 100, 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[45] at parallelize at <console>:24
scala> rdd1.partitions.length
res39: Int = 5
// Reduce the number of partitions to 2
scala> rdd1.coalesce(2)
res40: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[46] at coalesce at <console>:27
scala> res40.partitions.length
res41: Int = 2
Note:
- The second (optional) parameter indicates whether to shuffle. If it is omitted or false, no shuffle is performed; in that case decreasing the partition count takes effect, but increasing it has no effect (see the sketch below).
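A minimal sketch of this behavior in the spark shell, reusing the rdd1 above (5 partitions); the second parameter is shuffle: Boolean = false:
// Without shuffle, coalesce cannot grow the partition count: still 5.
rdd1.coalesce(10).partitions.length
// With shuffle = true, increasing the partition count does work: 10.
rdd1.coalesce(10, shuffle = true).partitions.length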
12. repartition(numPartitions)
Purpose: reshuffle all the data according to the new number of partitions. This operation always moves data over the network.
The new partition count may be either larger or smaller than the old one.
scala> val rdd1 = sc.parallelize(0 to 100, 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[45] at parallelize at <console>:24
scala> rdd1.repartition(3)
res44: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[51] at repartition at <console>:27
scala> res44.partitions.length
res45: Int = 3
scala> rdd1.repartition(10)
res46: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[55] at repartition at <console>:27
scala> res46.partitions.length
res47: Int = 10
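That the data always moves over the network can be seen in the lineage. A quick sketch (the exact debug output varies by Spark version):
// toDebugString reveals a shuffle stage (ShuffledRDD) in the lineage,
// even though the partition count here shrinks from 5 to 3.
println(rdd1.repartition(3).toDebugString)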
13. The difference between coalesce and repartition
coalesce repartitions the data and lets you choose whether to shuffle, controlled by the parameter shuffle: Boolean = false. repartition is in fact just a call to coalesce with shuffle = true. Source code:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
- If you are reducing the number of partitions, prefer coalesce so the shuffle can be avoided, as in the sketch below.
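A minimal sketch of the typical pattern (the numbers are hypothetical, chosen only for illustration): shrink the partition count after a selective filter, so the narrow dependency of coalesce avoids a network shuffle:
// A heavily filtered RDD keeps its original 100 partitions, most now almost empty.
val filtered = sc.parallelize(0 to 100000, 100).filter(_ % 1000 == 0)
// coalesce merges partitions locally (narrow dependency, no shuffle);
// repartition(4) would reach the same count but at the cost of a full shuffle.
val compact = filtered.coalesce(4)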
14. sortBy(func, [ascending], [numTasks])
Purpose: first process the data with func, then sort by the processed result. Ascending order by default.
scala> val rdd1 = sc.parallelize(Array(1,3,4,10,4,6,9,20,30,16))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[46] at parallelize at <console>:24
scala> rdd1.sortBy(x => x).collect
res17: Array[Int] = Array(1, 3, 4, 4, 6, 9, 10, 16, 20, 30)
scala> rdd1.sortBy(x => x, true).collect
res18: Array[Int] = Array(1, 3, 4, 4, 6, 9, 10, 16, 20, 30)
// Descending order instead
scala> rdd1.sortBy(x => x, false).collect
res19: Array[Int] = Array(30, 20, 16, 10, 9, 6, 4, 4, 3, 1)
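Because func is applied before the comparison, any derived key can be used for sorting. A small sketch with hypothetical (word, count) pairs, sorted by the count field:
// func extracts the sort key; here the second element of each pair.
val pairs = sc.parallelize(Array(("a", 3), ("b", 1), ("c", 2)))
pairs.sortBy(_._2).collect
// expected: Array((b,1), (c,2), (a,3))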
15. pipe(command, [envVars])
Purpose: pipe. For each partition, pass each element of the RDD through a pipe to a shell command or script, and return an RDD of the output lines.
Note:
- The script must be placed where the worker nodes can access it.
Step 1: create a script file pipe.sh and make it executable (chmod +x pipe.sh). Its content is as follows:
echo "hello"
while read line;do
echo ">>>"$line
done
Step 2: create an RDD with only 1 partition
scala> val rdd1 = sc.parallelize(Array(10,20,30,40), 1)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd1.pipe("./pipe.sh").collect
res1: Array[String] = Array(hello, >>>10, >>>20, >>>30, >>>40)
Step 3: create an RDD with 2 partitions
scala> val rdd1 = sc.parallelize(Array(10,20,30,40), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> rdd1.pipe("./pipe.sh").collect
res2: Array[String] = Array(hello, >>>10, >>>20, hello, >>>30, >>>40)
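Note that "hello" appears once per partition: the script is launched once for each partition, prints "hello" first, and then echoes every element it reads from the pipe. The heading also lists an optional envVars parameter, not used above. A minimal sketch, assuming pipe.sh were extended with a line such as echo "$GREETING" to read the variable:
// The Map entries are passed to the spawned process as environment variables.
rdd1.pipe("./pipe.sh", Map("GREETING" -> "hi")).collect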