第 2 章 Structure Streaming 快速入门
为了使用最稳定最新的 Structure Streaming, 我们使用最新版本.
本入门案例是从一个网络端口中读取数据, 并统计每个单词出现的数量.
2.1 导入依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.3</version>
</dependency>
2.2 具体实现
package com.atguigu.ss
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* Author lzc
* Date 2019/8/12 10:56 AM
*/
object WordCount1 {
def main(args: Array[String]): Unit = {
// 1. 创建 SparkSession. 因为 ss 是基于 spark sql 引擎, 所以需要先创建 SparkSession
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("WordCount1")
.getOrCreate()
import spark.implicits._
// 2. 从数据源(socket)中加载数据.
val lines: DataFrame = spark.readStream
.format("socket") // 设置数据源
.option("host", "hadoop201")
.option("port", 9999)
.load
// 3. 把每行数据切割成单词
val words: Dataset[String] = lines.as[String].flatMap(_.split("\\W"))
// 4. 计算 word count
val wordCounts: DataFrame = words.groupBy("value").count()
// 5. 启动查询, 把结果打印到控制台
val query: StreamingQuery = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start
query.awaitTermination()
spark.stop()
}
}
2.3 测试
在 hadoop201 启动 socket 服务:
nc -lk 9999
- 启动 Structured Steaming 程序
输入一些单词, 查看程序的输出结果:
2.4 代码说明
DataFrame lines
表示一个"无界表(unbounded table)", 存储着流中所有的文本数据. 这个无界表包含列名为value
的一列数据, 数据的类型为String
, 而且在流式文本数据中的每一行(line)就变成了无界表中的的一行(row). 注意, 这时我们仅仅设置了转换操作, 还没有启动它, 所以现在还没有收到任何数据紧接着我们把 DateFrame 通过
.as[String]
变成了 DataSet, 所以我们可以切割每行为多个单词.得到的words DataSet
包含了所有的单词.最后, 我们通过
value
(每个唯一的单词)进行分组得到wordCounts DataFrame
, 并且统计每个单词的个数. 注意,wordCounts
是一个流式DataFrame
, 它表示流中正在运行的单词数(the running word counts of the stream
).我们必须在流式数据(streaming data)上启动查询. 剩下的实际就是开始接收数据和计算个数. 为此, 当数据更新的时候, 我们通过
outputMode("complete")
来打印完整的计数集到控制台, 然后通过.start
来启动流式计算.代码执行之后, 流式计算将会在后台启动. 查询对象(query: StreamingQuery)可以激活流式查询(streaming query), 然后通过
awaitTermination()
来等待查询的终止,从而阻止查询激活之后进程退出.