15.1 RDD Queue

Usage and Description

During testing, you can create a DStream with ssc.queueStream(queueOfRDDs). Each RDD pushed into this queue is treated as one batch of the DStream and processed accordingly.


Hands-on Example

Requirement: create several RDDs in a loop and push them into a queue, then create a DStream from the queue with Spark Streaming and compute a result (WordCount-style reduction) for each batch.

package com.atguigu.streaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
  * Author lzc
  * Date 2019/4/26 5:31 PM
  */
object RDDQueueDemo {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("RDDQueueDemo").setMaster("local[*]")
        val scc = new StreamingContext(conf, Seconds(5))
        val sc = scc.sparkContext

        // Create a mutable queue to hold the RDDs
        val queue: mutable.Queue[RDD[Int]] = mutable.Queue[RDD[Int]]()

        // oneAtATime = true: consume at most one RDD from the queue per batch interval
        val rddDS: InputDStream[Int] = scc.queueStream(queue, oneAtATime = true)
        rddDS.reduce(_ + _).print()

        scc.start()

        // Add an RDD to the queue every 2 seconds
        for (elem <- 1 to 5) {
            queue += sc.parallelize(1 to 100)
            Thread.sleep(2000)
        }

        scc.awaitTermination()
    }
}
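Because oneAtATime is true, queueStream consumes at most one RDD from the queue per batch interval, so each 5-second batch above reduces a single RDD of 1 to 100 and prints 5050. The drain behavior can be modeled with a plain-Scala sketch (no Spark required; QueueStreamModel and simulateBatches are hypothetical names used only for this illustration):

```scala
import scala.collection.mutable

// Plain-Scala model of queueStream(queue, oneAtATime = true):
// each simulated batch interval dequeues at most ONE "RDD"
// (modeled here as an ordinary Seq[Int]) and reduces it.
object QueueStreamModel {
  def simulateBatches(numBatches: Int): List[Int] = {
    val queue = mutable.Queue[Seq[Int]]()
    for (_ <- 1 to 5) queue += (1 to 100) // same data as sc.parallelize(1 to 100)

    // One dequeue per batch; reduce(_ + _) over 1 to 100 yields 5050.
    (1 to numBatches).toList.flatMap { _ =>
      if (queue.nonEmpty) Some(queue.dequeue().sum) else None
    }
  }

  def main(args: Array[String]): Unit = {
    println(simulateBatches(5)) // List(5050, 5050, 5050, 5050, 5050)
  }
}
```

With oneAtATime = false, by contrast, each batch would drain every RDD currently in the queue, so a batch could process several queued RDDs at once.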
Copyright © 尚硅谷大数据 2019, all rights reserved. Powered by GitBook.
Last revised: 2019-04-26 17:45:05
