5.2 基于 event-time 的窗口操作

5.2.1 event-time 窗口理解

在 Structured Streaming 中, 可以按照事件发生时的时间对数据进行聚合操作, 即基于 event-time 进行操作.

在这种机制下, 即不必考虑 Spark 陆续接收事件的顺序是否与事件发生的顺序一致, 也不必考虑事件到达 Spark 的时间与事件发生时间的关系.

因此, 它在提高数据处理精度的同时, 大大减少了开发者的工作量.

我们现在想计算 10 分钟内的单词, 每 5 分钟更新一次, 也就是说在 10 分钟窗口 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20等之间收到的单词量. 注意, 12:00 - 12:10 表示数据在 12:00 之后 12:10 之前到达.

现在,考虑一下在 12:07 收到的单词。单词应该增加对应于两个窗口12:00 - 12:10和12:05 - 12:15的计数。因此,计数将由分组键(即单词)和窗口(可以从事件时间计算)索引。

统计后的结果应该是这样的:

package com.atguigu.ss

import java.sql.Timestamp

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
  * Author lzc
  * Date 2019/8/13 4:44 PM
  */
object WordCountWindow {
    def main(args: Array[String]): Unit = {

        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("WordCount1")
            .getOrCreate()

        import spark.implicits._
        val lines: DataFrame = spark.readStream
            .format("socket") // 设置数据源
            .option("host", "localhost")
            .option("port", 10000)
            .option("includeTimestamp", true) // 给产生的数据自动添加时间戳
            .load

        // 把行切割成单词, 保留时间戳
        val words: DataFrame = lines.as[(String, Timestamp)].flatMap(line => {
            line._1.split(" ").map((_, line._2))
        }).toDF("word", "timestamp")

        import org.apache.spark.sql.functions._

        // 按照窗口和单词分组, 并且计算每组的单词的个数
        val wordCounts: Dataset[Row] = words.groupBy(
            // 调用 window 函数, 返回的是一个 Column 参数 1: df 中表示时间戳的列 参数 2: 窗口长度 参数 3: 滑动步长
            window($"s", "10 minutes", "5 minutes"),
            $"word"
        ).count().orderBy($"window")  // 计数, 并按照窗口排序

        val query: StreamingQuery = wordCounts.writeStream
            .outputMode("complete")
            .format("console")
            .option("truncate", "false")  // 不截断.为了在控制台能看到完整信息, 最好设置为 false
            .start
        query.awaitTermination()
    }
}

由此可以看出, 在这种窗口机制下, 无论事件何时到达, 以怎样的顺序到达, Structured Streaming 总会根据事件时间生成对应的若干个时间窗口, 然后按照指定的规则聚合.

5.2.2 event-time 窗口生成规则

org.apache.spark.sql.catalyst.analysis.TimeWindowing

// 窗口个数
maxNumOverlapping = ceil(windowDuration / slideDuration)
for (i <- 0 until maxNumOverlapping)
   windowId <- ceil((timestamp - startTime) / slideDuration)
   windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
   windowEnd <- windowStart + windowDuration
   return windowStart, windowEnd

将event-time 作为"初始窗口"的结束时间, 然后按照窗口滑动宽度逐渐向时间轴前方推进, 直到某个窗口不再包含该 event-time 为止. 最终以"初始窗口"与"结束窗口"之间的若干个窗口作为最终生成的 event-time 的时间窗口.

每个窗口的起始时间与结束时间都是前必后开的区间, 因此初始窗口和结束窗口都不会包含 event-time, 最终不会被使用.

得到窗口如下:

Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-09-09 19:01:18

results matching ""

    No results matching ""