5.3.1 update 模式下使用 watermark

在 update 模式下, 仅输出与之前批次的结果相比, 涉及更新或新增的数据.

package com.atguigu.ss

import java.sql.Timestamp

import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

/**
  * Author lzc
  * Date 2019/8/13 4:44 PM
  */
object WordCountWatermark1 {
    def main(args: Array[String]): Unit = {

        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("WordCountWatermark1")
            .getOrCreate()

        import spark.implicits._
        val lines: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 10000)
            .load

        // 输入的数据中包含时间戳, 而不是自动添加的时间戳
        val words: DataFrame = lines.as[String].flatMap(line => {
            val split = line.split(",")
            split(1).split(" ").map((_, Timestamp.valueOf(split(0))))
        }).toDF("word", "timestamp")

        import org.apache.spark.sql.functions._


        val wordCounts: Dataset[Row] = words
            // 添加watermark, 参数 1: event-time 所在列的列名 参数 2: 延迟时间的上限.
            .withWatermark("timestamp", "2 minutes")
            .groupBy(window($"timestamp", "10 minutes", "2 minutes"), $"word")
            .count()

        val query: StreamingQuery = wordCounts.writeStream
            .outputMode("update")
            .trigger(Trigger.ProcessingTime(1000))
            .format("console")
            .option("truncate", "false")
            .start
        query.awaitTermination()
    }
}

注意: 初始化wartmark 是 0

有以下几条数据:

测试:

  1. 输入数据:2019-08-14 10:55:00,dog

这个条数据作为第一批数据. 按照window($"timestamp", "10 minutes", "2 minutes")得到 5 个窗口. 由于是第一批, 所有的窗口的结束时间都大于 wartermark(0), 所以 5 个窗口都显示.

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2019-08-14 10:46:00, 2019-08-14 10:56:00]|dog |1    |
|[2019-08-14 10:52:00, 2019-08-14 11:02:00]|dog |1    |
|[2019-08-14 10:50:00, 2019-08-14 11:00:00]|dog |1    |
|[2019-08-14 10:48:00, 2019-08-14 10:58:00]|dog |1    |
|[2019-08-14 10:54:00, 2019-08-14 11:04:00]|dog |1    |
+------------------------------------------+----+-----+

然后根据当前批次中最大的 event-time, 计算出来下次使用的 watermark. 本批次只有一个数据(10:55), 所有: watermark = 10:55 - 2min = 10:53

  1. 输入数据:2019-08-14 11:00:00,dog

这条数据作为第二批数据, 计算得到 5 个窗口. 此时的watermark=10:53, 所有的窗口的结束时间均大于 watermark. 在 update 模式下, 只输出结果表中涉及更新或新增的数据.

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2019-08-14 11:00:00, 2019-08-14 11:10:00]|dog |1    |
|[2019-08-14 10:52:00, 2019-08-14 11:02:00]|dog |2    |
|[2019-08-14 10:58:00, 2019-08-14 11:08:00]|dog |1    |
|[2019-08-14 10:54:00, 2019-08-14 11:04:00]|dog |2    |
|[2019-08-14 10:56:00, 2019-08-14 11:06:00]|dog |1    |
+------------------------------------------+----+-----+

其中: count 是 2 的表示更新, count 是 1 的表示新增. 没有变化的就没有显示.(但是内存中仍然保存着)

// 第一批次中的数据仍然在内存保存着
|[2019-08-14 10:46:00, 2019-08-14 10:56:00]|dog |1    |
|[2019-08-14 10:50:00, 2019-08-14 11:00:00]|dog |1    |
|[2019-08-14 10:48:00, 2019-08-14 10:58:00]|dog |1    |

此时的的 watermark = 11:00 - 2min = 10:58

  1. 输入数据:2019-08-14 10:55:00,dog

相当于一条延迟数据.

这条数据作为第 3 批次, 计算得到 5 个窗口. 此时的 watermark = 10:58 当前内存中有两个窗口的结束时间已经低于 10: 58.

|[2019-08-14 10:48:00, 2019-08-14 10:58:00]|dog |1    |
|[2019-08-14 10:46:00, 2019-08-14 10:56:00]|dog |1    |

则立即删除这两个窗口在内存中的维护状态. 同时, 当前批次中新加入的数据所划分出来的窗口, 如果窗口结束时间低于 11:58, 则窗口会被过滤掉.

所以这次输出结果:

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2019-08-14 10:52:00, 2019-08-14 11:02:00]|dog |3    |
|[2019-08-14 10:50:00, 2019-08-14 11:00:00]|dog |2    |
|[2019-08-14 10:54:00, 2019-08-14 11:04:00]|dog |3    |
+------------------------------------------+----+-----+

第三个批次的数据处理完成后, 立即计算: watermark= 10:55 - 2min = 10:53, 这个值小于当前的 watermask(10:58), 所以保持不变.(因为 watermask 只能增加不能减少)

Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-08-14 19:25:22

results matching ""

    No results matching ""