5.5.2 Stream-stream Joins
Stream-stream joins are supported starting with Spark 2.3.
Spark automatically maintains state for both streams so that newly arriving data can be joined against data that arrived earlier. This state can grow without bound, so, just as with aggregations, a watermark can be applied when joining two streams to expire stale state and keep it from growing indefinitely.
5.5.2.1 inner join
Join two streaming DataFrames. Only the append output mode is supported.
Format of stream 1: name,sex,event time
lisi,female,2019-09-16 11:50:00
zs,male,2019-09-16 11:51:00
ww,female,2019-09-16 11:52:00
zhiling,female,2019-09-16 11:53:00
fengjie,female,2019-09-16 11:54:00
yifei,female,2019-09-16 11:55:00
Format of stream 2: name,age,event time
lisi,18,2019-09-16 11:50:00
zs,19,2019-09-16 11:51:00
ww,20,2019-09-16 11:52:00
zhiling,22,2019-09-16 11:53:00
yifei,30,2019-09-16 11:54:00
fengjie,98,2019-09-16 11:55:00
Inner join without a watermark
package com.atguigu.ss

import java.sql.Timestamp

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Author lzc
 * Date 2019/8/16 5:09 PM
 */
object StreamStream1 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("StreamStream1")
      .getOrCreate()
    import spark.implicits._

    // Stream 1: name,sex,event time
    val nameSexStream: DataFrame = spark.readStream
      .format("socket")
      .option("host", "hadoop201")
      .option("port", 10000)
      .load
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (arr(0), arr(1), Timestamp.valueOf(arr(2)))
      }).toDF("name", "sex", "ts1")

    // Stream 2: name,age,event time
    val nameAgeStream: DataFrame = spark.readStream
      .format("socket")
      .option("host", "hadoop201")
      .option("port", 20000)
      .load
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))
      }).toDF("name", "age", "ts2")

    // Equi-join the two streams on the name column
    val joinResult: DataFrame = nameSexStream.join(nameAgeStream, "name")

    joinResult.writeStream
      .outputMode("append")
      .format("console")
      .trigger(Trigger.ProcessingTime(0))
      .start()
      .awaitTermination()
  }
}
+-------+------+-------------------+---+-------------------+
| name| sex| ts1|age| ts2|
+-------+------+-------------------+---+-------------------+
|zhiling|female|2019-09-16 11:53:00| 22|2019-09-16 11:53:00|
| ww|female|2019-09-16 11:52:00| 20|2019-09-16 11:52:00|
| yifei|female|2019-09-16 11:55:00| 30|2019-09-16 11:54:00|
| zs| male|2019-09-16 11:51:00| 19|2019-09-16 11:51:00|
|fengjie|female|2019-09-16 11:54:00| 98|2019-09-16 11:55:00|
| lisi|female|2019-09-16 11:50:00| 18|2019-09-16 11:50:00|
+-------+------+-------------------+---+-------------------+
The join produces output slowly; expect to wait for results.
Inner join with a watermark
package com.atguigu.ss

import java.sql.Timestamp

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

/**
 * Author lzc
 * Date 2019/8/16 5:09 PM
 */
object StreamStream2 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("StreamStream2")
      .getOrCreate()
    import spark.implicits._

    // Stream 1: name,sex,event time, with a 2-minute watermark
    val nameSexStream: DataFrame = spark.readStream
      .format("socket")
      .option("host", "hadoop201")
      .option("port", 10000)
      .load
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (arr(0), arr(1), Timestamp.valueOf(arr(2)))
      }).toDF("name1", "sex", "ts1")
      .withWatermark("ts1", "2 minutes")

    // Stream 2: name,age,event time, with a 1-minute watermark
    val nameAgeStream: DataFrame = spark.readStream
      .format("socket")
      .option("host", "hadoop201")
      .option("port", 20000)
      .load
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))
      }).toDF("name2", "age", "ts2")
      .withWatermark("ts2", "1 minutes")

    // Join on the name columns, with an event-time range condition
    val joinResult: DataFrame = nameSexStream.join(
      nameAgeStream,
      expr(
        """
          |name1=name2 and
          |ts2 >= ts1 and
          |ts2 <= ts1 + interval 1 minutes
        """.stripMargin))

    joinResult.writeStream
      .outputMode("append")
      .format("console")
      .trigger(Trigger.ProcessingTime(0))
      .start()
      .awaitTermination()
  }
}
+-------+------+-------------------+-------+---+-------------------+
| name1| sex| ts1| name2|age| ts2|
+-------+------+-------------------+-------+---+-------------------+
|zhiling|female|2019-09-16 11:53:00|zhiling| 22|2019-09-16 11:53:00|
| ww|female|2019-09-16 11:52:00| ww| 20|2019-09-16 11:52:00|
| zs| male|2019-09-16 11:51:00| zs| 19|2019-09-16 11:51:00|
|fengjie|female|2019-09-16 11:54:00|fengjie| 98|2019-09-16 11:55:00|
| lisi|female|2019-09-16 11:50:00| lisi| 18|2019-09-16 11:50:00|
+-------+------+-------------------+-------+---+-------------------+
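Comparing this output with the un-watermarked run: the yifei pair no longer joins, because its rows (ts1 = 11:55, ts2 = 11:54) violate the ts2 >= ts1 constraint, while fengjie (ts1 = 11:54, ts2 = 11:55) still lands inside the one-minute window. The time condition can be sanity-checked in plain Scala; the `inRange` helper below is hypothetical, written only to illustrate the predicate, and is not part of the Spark API:

```scala
import java.sql.Timestamp

// Hypothetical helper replicating the join's time-range condition:
// ts2 >= ts1 && ts2 <= ts1 + 1 minute
def inRange(ts1: String, ts2: String): Boolean = {
  val t1 = Timestamp.valueOf(ts1).getTime
  val t2 = Timestamp.valueOf(ts2).getTime
  t2 >= t1 && t2 <= t1 + 60 * 1000
}

// fengjie: ts1 = 11:54, ts2 = 11:55 -> within the window, joins
println(inRange("2019-09-16 11:54:00", "2019-09-16 11:55:00")) // true
// yifei: ts1 = 11:55, ts2 = 11:54 -> ts2 < ts1, dropped
println(inRange("2019-09-16 11:55:00", "2019-09-16 11:54:00")) // false
```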
5.5.2.2 outer join
Outer joins must use a watermark.
The code is almost identical to the inner join; simply specify the join type when joining: joinType = "left_outer"
val joinResult: DataFrame = nameSexStream.join(
  nameAgeStream,
  expr(
    """
      |name1=name2 and
      |ts2 >= ts1 and
      |ts2 <= ts1 + interval 1 minutes
    """.stripMargin),
  joinType = "left_outer")
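With a left outer join, rows from the left stream that never find a match are eventually emitted with nulls in the right-side columns, but only after the watermark guarantees no future match can arrive, so they appear with extra delay. The effect on a row level can be sketched in plain Scala (a hypothetical, non-Spark simulation; the "unknown" record is made up to show an unmatched left row):

```scala
// Plain-Scala sketch of left outer join semantics on the "name" key:
// every left row is kept; the right-side field becomes None when no match exists.
val left  = Seq(("lisi", "female"), ("zs", "male"), ("unknown", "female"))
val right = Map("lisi" -> 18, "zs" -> 19)

val joined = left.map { case (name, sex) => (name, sex, right.get(name)) }
joined.foreach(println)
// (lisi,female,Some(18))
// (zs,male,Some(19))
// (unknown,female,None)  <- in Spark: emitted with null age, after the watermark passes
```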