15.1 RDD 队列
用法及说明
测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)
来创建DStream
,每一个推送到这个队列中的RDD
,都会作为一个DStream
处理。
案例实操
需求:循环创建几个 RDD,将 RDD 放入队列。通过 Spark Streaming创建 Dstream,计算 WordCount
package com.atguigu.streaming
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
/**
* Author lzc
* Date 2019/4/26 5:31 PM
*/
object RDDQueueDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDDQueueDemo").setMaster("local[*]")
val scc = new StreamingContext(conf, Seconds(5))
val sc = scc.sparkContext
// 创建一个可变队列
val queue: mutable.Queue[RDD[Int]] = mutable.Queue[RDD[Int]]()
val rddDS: InputDStream[Int] = scc.queueStream(queue, true)
rddDS.reduce(_ + _).print
scc.start
// 循环的方式向队列中添加 RDD
for (elem <- 1 to 5) {
queue += sc.parallelize(1 to 100)
Thread.sleep(2000)
}
scc.awaitTermination()
}
}