5.4 流数据去重

根据唯一的 id 实现数据去重.

数据:

1,2019-09-14 11:50:00,dog
2,2019-09-14 11:51:00,dog
1,2019-09-14 11:50:00,dog
3,2019-09-14 11:53:00,dog
1,2019-09-14 11:50:00,dog
4,2019-09-14 11:45:00,dog
package com.atguigu.ss

import java.sql.Timestamp

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
  * Author lzc
  * Date 2019/8/14 5:52 PM
  */
object StreamDropDuplicate {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("Test")
            .getOrCreate()
        import spark.implicits._

        val lines: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 10000)
            .load()

        val words: DataFrame = lines.as[String].map(line => {
            val arr: Array[String] = line.split(",")
            (arr(0), Timestamp.valueOf(arr(1)), arr(2))
        }).toDF("uid", "ts", "word")

        val wordCounts: Dataset[Row] = words
            .withWatermark("ts", "2 minutes")
            .dropDuplicates("uid")  // 去重重复数据 uid 相同就是重复.  可以传递多个列

        wordCounts.writeStream
            .outputMode("append")
            .format("console")
            .start
            .awaitTermination()

    }
}

注意:

  1. dropDuplicates 不可用在聚合之后, 即通过聚合得到的 df/ds 不能调用dropDuplicates
  2. 使用watermask - 如果重复记录的到达时间有上限,则可以在事件时间列上定义水印,并使用guid和事件时间列进行重复数据删除。该查询将使用水印从过去的记录中删除旧的状态数据,这些记录不会再被重复。这限制了查询必须维护的状态量。
  3. 没有watermask - 由于重复记录可能到达时没有界限,查询将来自所有过去记录的数据存储为状态。

测试

  1. 第一批: 1,2019-09-14 11:50:00,dog
    +---+-------------------+----+
    |uid|                 ts|word|
    +---+-------------------+----+
    |  1|2019-09-14 11:50:00| dog|
    +---+-------------------+----+
    
  2. 第 2 批: 2,2019-09-14 11:51:00,dog
    +---+-------------------+----+
    |uid|                 ts|word|
    +---+-------------------+----+
    |  2|2019-09-14 11:51:00| dog|
    +---+-------------------+----+
    
  3. 第 3 批: 1,2019-09-14 11:50:00,dog
    id 重复无输出

  4. 第 4 批: 3,2019-09-14 11:53:00,dog

    +---+-------------------+----+
    |uid|                 ts|word|
    +---+-------------------+----+
    |  3|2019-09-14 11:53:00| dog|
    +---+-------------------+----+
    

    此时 watermask=11:51

  5. 第 5 批: 1,2019-09-14 11:50:00,dog 数据重复, 并且数据过期, 所以无输出
  6. 第 6 批 4,2019-09-14 11:45:00,dog 数据过时, 所以无输出
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-08-14 19:33:01

results matching ""

    No results matching ""