5.4 流数据去重
根据唯一的 id 实现数据去重.
数据:
1,2019-09-14 11:50:00,dog
2,2019-09-14 11:51:00,dog
1,2019-09-14 11:50:00,dog
3,2019-09-14 11:53:00,dog
1,2019-09-14 11:50:00,dog
4,2019-09-14 11:45:00,dog
package com.atguigu.ss
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
* Author lzc
* Date 2019/8/14 5:52 PM
*/
object StreamDropDuplicate {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("Test")
.getOrCreate()
import spark.implicits._
val lines: DataFrame = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 10000)
.load()
val words: DataFrame = lines.as[String].map(line => {
val arr: Array[String] = line.split(",")
(arr(0), Timestamp.valueOf(arr(1)), arr(2))
}).toDF("uid", "ts", "word")
val wordCounts: Dataset[Row] = words
.withWatermark("ts", "2 minutes")
.dropDuplicates("uid") // 去重重复数据 uid 相同就是重复. 可以传递多个列
wordCounts.writeStream
.outputMode("append")
.format("console")
.start
.awaitTermination()
}
}
注意:
dropDuplicates
不可用在聚合之后, 即通过聚合得到的 df/ds 不能调用dropDuplicates
- 使用
watermask
- 如果重复记录的到达时间有上限,则可以在事件时间列上定义水印,并使用guid和事件时间列进行重复数据删除。该查询将使用水印从过去的记录中删除旧的状态数据,这些记录不会再被重复。这限制了查询必须维护的状态量。 - 没有
watermask
- 由于重复记录可能到达时没有界限,查询将来自所有过去记录的数据存储为状态。
测试
- 第一批:
1,2019-09-14 11:50:00,dog
+---+-------------------+----+ |uid| ts|word| +---+-------------------+----+ | 1|2019-09-14 11:50:00| dog| +---+-------------------+----+
- 第 2 批:
2,2019-09-14 11:51:00,dog
+---+-------------------+----+ |uid| ts|word| +---+-------------------+----+ | 2|2019-09-14 11:51:00| dog| +---+-------------------+----+
第 3 批:
1,2019-09-14 11:50:00,dog
id 重复无输出第 4 批:
3,2019-09-14 11:53:00,dog
+---+-------------------+----+ |uid| ts|word| +---+-------------------+----+ | 3|2019-09-14 11:53:00| dog| +---+-------------------+----+
此时 watermask=11:51
- 第 5 批:
1,2019-09-14 11:50:00,dog
数据重复, 并且数据过期, 所以无输出 - 第 6 批
4,2019-09-14 11:45:00,dog
数据过时, 所以无输出