5.3.3 Key-Value Types (Part 2)
7. combineByKey[C]
Function declaration:
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
Purpose: for each key K, merge the values of type V into a combined value of type C, producing an RDD[(K, C)].
Parameter description:
createCombiner: combineByKey traverses the key-value pairs in each partition. The first time a given key is encountered in a partition, createCombiner is called with that key's value to produce an initial combined value of type C. (It is not called again for later occurrences of the same key.)
mergeValue: called when the key has already been seen in the current partition, to merge the new value into the existing C value. This is the within-partition merge.
mergeCombiners: merges the C values of the same key coming from different partitions. This is the cross-partition merge.
WordCount example:
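For intuition, here is a minimal WordCount-style sketch with combineByKey; the input data below is purely illustrative, and the combined type C is simply the running count (Int):
val words = sc.parallelize(Array(("hello", 1), ("world", 1), ("hello", 1)), 2)
val counts = words.combineByKey(
  (v: Int) => v,                   // createCombiner: first time a key is seen in a partition
  (c: Int, v: Int) => c + v,       // mergeValue: merge another value within the same partition
  (c1: Int, c2: Int) => c1 + c2)   // mergeCombiners: merge partial counts across partitions
// counts.collect gives Array((hello,2), (world,1)) (ordering may vary)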
Requirement 1: create a pair RDD and, for each key, compute the average of its values. (First compute, for each key, the number of times it occurs and the sum of its values, then divide the two to get the result.)
Analysis: see the accompanying slides (PPT).
scala> val input = sc.parallelize(Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),2)
input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[5] at parallelize at <console>:24
// acc: the within-partition accumulator, recording (sum of values, number of occurrences) for the key
// acc1, acc2: accumulators from different partitions, merged across partitions
scala> input.combineByKey((_, 1), (acc:(Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1:(Int, Int), acc2: (Int, Int))=> (acc1._1 + acc2._1, acc1._2 + acc2._2))
res10: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[7] at combineByKey at <console>:27
scala> res10.collect
res11: Array[(String, (Int, Int))] = Array((b,(286,3)), (a,(274,3)))
scala> res10.map(t => (t._1, t._2._1.toDouble / t._2._2)).collect
res12: Array[(String, Double)] = Array((b,95.33333333333333), (a,91.33333333333333))
Compare the differences and relationships among the key-based aggregation functions (reduceByKey, foldByKey, aggregateByKey, combineByKey).
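In current Spark versions all of these delegate to combineByKeyWithClassTag internally and differ mainly in how the initial combiner is produced. For comparison, here is a minimal sketch of the same per-key (sum, count) computation written with aggregateByKey, reusing the input RDD from above; the zero value (0, 0) plays the role of createCombiner:
// aggregateByKey: the zero value (0, 0) = (sum, count) replaces createCombiner
val sumCount = input.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),    // within a partition: add the value, bump the count
  (a, b) => (a._1 + b._1, a._2 + b._2))    // across partitions: merge partial (sum, count) pairs
sumCount.collect
// Array((b,(286,3)), (a,(274,3))), the same as res11 above; divide as before to get the averages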
8. sortByKey
Purpose: called on an RDD of (K, V) pairs. K must implement the Ordered[K] trait (or an implicit Ordering[K] must be in scope). Returns an RDD of (K, V) pairs sorted by key.
scala> val rdd = sc.parallelize(Array((1, "a"), (10, "b"), (11, "c"), (4, "d"), (20, "d"), (10, "e")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[11] at parallelize at <console>:24
scala> rdd.sortByKey()
res25: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[14] at sortByKey at <console>:27
scala> res25.collect
res26: Array[(Int, String)] = Array((1,a), (4,d), (10,b), (10,e), (11,c), (20,d))
scala> rdd.sortByKey(true).collect
res27: Array[(Int, String)] = Array((1,a), (4,d), (10,b), (10,e), (11,c), (20,d))
// descending order
scala> rdd.sortByKey(false).collect
res28: Array[(Int, String)] = Array((20,d), (11,c), (10,b), (10,e), (4,d), (1,a))
9. mapValues
Purpose: on an RDD of (K, V) pairs, apply a function to the values only; the keys are left unchanged.
scala> val rdd = sc.parallelize(Array((1, "a"), (10, "b"), (11, "c"), (4, "d"), (20, "d"), (10, "e")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[21] at parallelize at <console>:24
scala> rdd.mapValues("<" + _ + ">").collect
res29: Array[(Int, String)] = Array((1,<a>), (10,<b>), (11,<c>), (4,<d>), (20,<d>), (10,<e>))
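As a usage note, the division step at the end of the combineByKey average example is a natural fit for mapValues, since only the value side is transformed; a minimal sketch reusing res10 from that example:
// Equivalent to res10.map(t => (t._1, t._2._1.toDouble / t._2._2)),
// but mapValues leaves the key untouched (and preserves the partitioner)
res10.mapValues { case (sum, count) => sum.toDouble / count }.collect
// Array((b,95.33333333333333), (a,91.33333333333333))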
10. join(otherDataset, [numTasks])
Inner join: called on RDDs of types (K, V) and (K, W); returns an RDD of (K, (V, W)) in which elements with the same key from the two RDDs are paired together.
scala> var rdd1 = sc.parallelize(Array((1, "a"), (1, "b"), (2, "c")))
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> var rdd2 = sc.parallelize(Array((1, "aa"), (3, "bb"), (2, "cc")))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> rdd1.join(rdd2).collect
res2: Array[(Int, (String, String))] = Array((2,(c,cc)), (1,(a,aa)), (1,(b,aa)))
Note: if one RDD contains duplicate keys, each occurrence is combined separately with every matching key in the other RDD. Outer joins are also supported: leftOuterJoin, rightOuterJoin, and fullOuterJoin, as sketched below.
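A minimal sketch of the outer-join variants, reusing rdd1 and rdd2 from above (the ordering of the collected arrays is not guaranteed):
// leftOuterJoin keeps every key of rdd1; a missing match on the right becomes None
rdd1.leftOuterJoin(rdd2).collect
// Array((1,(a,Some(aa))), (1,(b,Some(aa))), (2,(c,Some(cc))))
// fullOuterJoin keeps keys from both sides, wrapping both values in Option
rdd1.fullOuterJoin(rdd2).collect
// Array((1,(Some(a),Some(aa))), (1,(Some(b),Some(aa))), (2,(Some(c),Some(cc))), (3,(None,Some(bb))))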
11. cogroup(otherDataset, [numTasks])
Purpose: called on RDDs of types (K, V) and (K, W); returns an RDD of type (K, (Iterable[V], Iterable[W])), grouping all of each key's values from both RDDs together.
scala> val rdd1 = sc.parallelize(Array((1, 10),(2, 20),(1, 100),(3, 30)),1)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[23] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(Array((1, "a"),(2, "b"),(1, "aa"),(3, "c")),1)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> rdd1.cogroup(rdd2).collect
res9: Array[(Int, (Iterable[Int], Iterable[String]))] = Array((1,(CompactBuffer(10, 100),CompactBuffer(a, aa))), (3,(CompactBuffer(30),CompactBuffer(c))), (2,(CompactBuffer(20),CompactBuffer(b))))
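As a closing note on how cogroup relates to join: an inner join is essentially a cogroup followed by pairing up the two value buffers for each key (this is also how Spark implements it). A minimal sketch reusing rdd1 and rdd2 from the cogroup example above:
// Inner join expressed through cogroup: pair every left value with every right value;
// a key missing on either side has an empty buffer and therefore produces no pairs
val joined = rdd1.cogroup(rdd2).flatMapValues {
  case (vs, ws) => for (v <- vs; w <- ws) yield (v, w)
}
// joined.collect: Array((1,(10,a)), (1,(10,aa)), (1,(100,a)), (1,(100,aa)), (2,(20,b)), (3,(30,c))) (ordering may vary)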