15.2 自定义数据源

使用及说明

其实就是自定义接收器

需要继承Receiver,并实现onStartonStop方法来自定义数据源采集。

案例实操

需求:

自定义数据源,实现监控某个端口号,获取该端口号内容。

代码

自定义数据源

object MySource{
    def apply(host: String, port: Int): MySource = new MySource(host, port)
}

class MySource(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){
    /*
    接收器启动的时候调用该方法. This function must initialize all resources (threads, buffers, etc.) necessary for receiving data.
    这个函数内部必须初始化一些读取数据必须的资源
    该方法不能阻塞, 所以 读取数据要在一个新的线程中进行.
     */
    override def onStart(): Unit = {

        // 启动一个新的线程来接收数据
        new Thread("Socket Receiver"){
            override def run(): Unit = {
                receive()
            }
        }.start()
    }

    // 此方法用来接收数据
    def receive()={
        val socket = new Socket(host, port)
        val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
        var line: String = null
        // 当 receiver没有关闭, 且reader读取到了数据则循环发送给spark
        while (!isStopped && (line = reader.readLine()) != null ){
            // 发送给spark
            store(line)
        }
        // 循环结束, 则关闭资源
        reader.close()
        socket.close()

        // 重启任务
        restart("Trying to connect again")
    }
    override def onStop(): Unit = {

    }

}

使用自定义数据源

object MySourceDemo {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[*]")
        // 1. 创建SparkStreaming的入口对象: StreamingContext  参数2: 表示事件间隔
        val ssc = new StreamingContext(conf, Seconds(5))
        // 2. 创建一个DStream
        val lines: ReceiverInputDStream[String] = ssc.receiverStream[String](MySource("hadoop201", 9999))
        // 3. 一个个的单词
        val words: DStream[String] = lines.flatMap(_.split("""\s+"""))
        // 4. 单词形成元组
        val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
        // 5. 统计单词的个数
        val count: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
        //6. 显示
        count.print
        //7. 启动流式任务开始计算
        ssc.start()
        //8. 等待计算结束才退出主程序
        ssc.awaitTermination()
        ssc.stop(false)
    }
}

开启端口

nc -lk 10000
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-04-26 19:18:49

results matching ""

    No results matching ""