第 2 章 Structure Streaming 快速入门

为了使用最稳定最新的 Structure Streaming, 我们使用最新版本.

本入门案例是从一个网络端口中读取数据, 并统计每个单词出现的数量.

2.1 导入依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.3</version>
</dependency>

2.2 具体实现

package com.atguigu.ss

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * Author lzc
  * Date 2019/8/12 10:56 AM
  */
object WordCount1 {
    def main(args: Array[String]): Unit = {
        // 1. 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("WordCount1")
            .getOrCreate()
        import spark.implicits._
        // 2. 从数据源(socket)中加载数据.
        val lines: DataFrame = spark.readStream
            .format("socket") // 设置数据源
            .option("host", "hadoop201")
            .option("port", 9999)
            .load

        // 3. 把每行数据切割成单词
        val words: Dataset[String] = lines.as[String].flatMap(_.split("\\W"))

        // 4. 计算 word count
        val wordCounts: DataFrame = words.groupBy("value").count()

        // 5. 启动查询, 把结果打印到控制台
        val query: StreamingQuery = wordCounts.writeStream
            .outputMode("complete")
            .format("console")
            .start
        query.awaitTermination()

        spark.stop()
    }
}

2.3 测试

  1. 在 hadoop201 启动 socket 服务:

     nc -lk 9999
    
  2. 启动 Structured Steaming 程序

输入一些单词, 查看程序的输出结果:

2.4 代码说明

  1. DataFrame lines 表示一个"无界表(unbounded table)", 存储着流中所有的文本数据. 这个无界表包含列名为value的一列数据, 数据的类型为String, 而且在流式文本数据中的每一行(line)就变成了无界表中的的一行(row). 注意, 这时我们仅仅设置了转换操作, 还没有启动它, 所以现在还没有收到任何数据

  2. 紧接着我们把 DateFrame 通过 .as[String] 变成了 DataSet, 所以我们可以切割每行为多个单词.得到的 words DataSet包含了所有的单词.

  3. 最后, 我们通过value(每个唯一的单词)进行分组得到wordCounts DataFrame, 并且统计每个单词的个数. 注意, wordCounts是一个流式DataFrame, 它表示流中正在运行的单词数(the running word counts of the stream).

  4. 我们必须在流式数据(streaming data)上启动查询. 剩下的实际就是开始接收数据和计算个数. 为此, 当数据更新的时候, 我们通过outputMode("complete")来打印完整的计数集到控制台, 然后通过.start来启动流式计算.

  5. 代码执行之后, 流式计算将会在后台启动. 查询对象(query: StreamingQuery)可以激活流式查询(streaming query), 然后通过awaitTermination()来等待查询的终止,从而阻止查询激活之后进程退出.

Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-08-12 13:57:36

results matching ""

    No results matching ""