5.5.2 Stream-stream Joins

Stream-stream joins are supported starting with Spark 2.3.

Spark automatically maintains state for both streams so that data arriving later can be joined with data that arrived earlier on the other stream, but this state would otherwise grow without bound. Therefore, when joining two streams you can still use the watermark mechanism to expire stale state and keep it from growing indefinitely.
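In code this takes two ingredients: a watermark on each input stream, and an event-time range constraint inside the join condition, which together tell Spark how long old rows must be kept. A minimal sketch of the pattern (stream and column names here are illustrative; full runnable examples follow below):

// declare how late each stream's events may arrive
val left = leftStream.withWatermark("leftTime", "2 minutes")
val right = rightStream.withWatermark("rightTime", "1 minutes")

// bound how far apart matching events may be, so expired state can be dropped
// (expr comes from org.apache.spark.sql.functions._)
val joined = left.join(right,
    expr("leftId = rightId and rightTime >= leftTime and rightTime <= leftTime + interval 1 minutes"))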

5.5.2.1 inner join

Join two streaming DataFrames. Only the append output mode is supported.

Format of the first stream: name,sex,event time

lisi,female,2019-09-16 11:50:00
zs,male,2019-09-16 11:51:00
ww,female,2019-09-16 11:52:00
zhiling,female,2019-09-16 11:53:00
fengjie,female,2019-09-16 11:54:00
yifei,female,2019-09-16 11:55:00

Format of the second stream: name,age,event time

lisi,18,2019-09-16 11:50:00
zs,19,2019-09-16 11:51:00
ww,20,2019-09-16 11:52:00
zhiling,22,2019-09-16 11:53:00
yifei,30,2019-09-16 11:54:00
fengjie,98,2019-09-16 11:55:00

Inner join without a watermark

package com.atguigu.ss

import java.sql.Timestamp

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Author lzc
  * Date 2019/8/16 5:09 PM
  */
object StreamStream1 {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamStream1")
            .getOrCreate()
        import spark.implicits._

        // first stream: name, sex, event time
        val nameSexStream: DataFrame = spark.readStream
            .format("socket")
            .option("host", "hadoop201")
            .option("port", 10000)
            .load
            .as[String]
            .map(line => {
                val arr: Array[String] = line.split(",")
                (arr(0), arr(1), Timestamp.valueOf(arr(2)))
            }).toDF("name", "sex", "ts1")

        // second stream: name, age, event time
        val nameAgeStream: DataFrame = spark.readStream
            .format("socket")
            .option("host", "hadoop201")
            .option("port", 20000)
            .load
            .as[String]
            .map(line => {
                val arr: Array[String] = line.split(",")
                (arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))
            }).toDF("name", "age", "ts2")


        // equi-join the two streams on the name column
        val joinResult: DataFrame = nameSexStream.join(nameAgeStream, "name")

        joinResult.writeStream
            .outputMode("append")
            .format("console")
            .trigger(Trigger.ProcessingTime(0))
            .start()
            .awaitTermination()
    }
}
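To try this out, you can feed the two streams by running something like nc -lk 10000 and nc -lk 20000 on hadoop201 (assuming netcat is available) and pasting the sample records into the two sessions. Once both streams have received their data, the console sink prints: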
+-------+------+-------------------+---+-------------------+
|   name|   sex|                ts1|age|                ts2|
+-------+------+-------------------+---+-------------------+
|zhiling|female|2019-09-16 11:53:00| 22|2019-09-16 11:53:00|
|     ww|female|2019-09-16 11:52:00| 20|2019-09-16 11:52:00|
|  yifei|female|2019-09-16 11:55:00| 30|2019-09-16 11:54:00|
|     zs|  male|2019-09-16 11:51:00| 19|2019-09-16 11:51:00|
|fengjie|female|2019-09-16 11:54:00| 98|2019-09-16 11:55:00|
|   lisi|female|2019-09-16 11:50:00| 18|2019-09-16 11:50:00|
+-------+------+-------------------+---+-------------------+

The join output can be slow to appear: a row is emitted only after its match has arrived on the other stream, so you may need to wait for a later micro-batch.

Inner join with a watermark

package com.atguigu.ss

import java.sql.Timestamp

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
/**
  * Author lzc
  * Date 2019/8/16 5:09 PM
  */
object StreamStream2 {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamStream1")
            .getOrCreate()
        import spark.implicits._

        // first stream: name1, sex, event time, with a 2-minute watermark
        val nameSexStream: DataFrame = spark.readStream
            .format("socket")
            .option("host", "hadoop201")
            .option("port", 10000)
            .load
            .as[String]
            .map(line => {
                val arr: Array[String] = line.split(",")
                (arr(0), arr(1), Timestamp.valueOf(arr(2)))
            }).toDF("name1", "sex", "ts1")
            .withWatermark("ts1", "2 minutes")

        // second stream: name2, age, event time, with a 1-minute watermark
        val nameAgeStream: DataFrame = spark.readStream
            .format("socket")
            .option("host", "hadoop201")
            .option("port", 20000)
            .load
            .as[String]
            .map(line => {
                val arr: Array[String] = line.split(",")
                (arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))
            }).toDF("name2", "age", "ts2")
            .withWatermark("ts2", "1 minutes")


        // join on name equality plus an event-time range constraint
        val joinResult: DataFrame = nameSexStream.join(
            nameAgeStream,
            expr(
                """
                  |name1=name2 and
                  |ts2 >= ts1 and
                  |ts2 <= ts1 + interval 1 minutes
                """.stripMargin))

        joinResult.writeStream
            .outputMode("append")
            .format("console")
            .trigger(Trigger.ProcessingTime(0))
            .start()
            .awaitTermination()
    }
}
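With the same sample input, only pairs whose timestamps satisfy the range condition are emitted: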
+-------+------+-------------------+-------+---+-------------------+
|  name1|   sex|                ts1|  name2|age|                ts2|
+-------+------+-------------------+-------+---+-------------------+
|zhiling|female|2019-09-16 11:53:00|zhiling| 22|2019-09-16 11:53:00|
|     ww|female|2019-09-16 11:52:00|     ww| 20|2019-09-16 11:52:00|
|     zs|  male|2019-09-16 11:51:00|     zs| 19|2019-09-16 11:51:00|
|fengjie|female|2019-09-16 11:54:00|fengjie| 98|2019-09-16 11:55:00|
|   lisi|female|2019-09-16 11:50:00|   lisi| 18|2019-09-16 11:50:00|
+-------+------+-------------------+-------+---+-------------------+
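Note that yifei is missing from this result: her ts2 (2019-09-16 11:54:00) is earlier than her ts1 (2019-09-16 11:55:00), which violates the ts2 >= ts1 condition. fengjie still matches because her ts2 (11:55:00) falls within one minute after her ts1 (11:54:00). In addition, once the watermarks advance, buffered rows older than the allowed lateness are removed from the join state instead of being kept forever.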

5.5.2.2 outer join

Outer joins must use watermarks.

Compared with the inner join, the code is almost identical; the only difference is that you specify the join type when calling join: joinType = "leftOuter"

val joinResult: DataFrame = nameSexStream.join(
            nameAgeStream,
            expr(
                """
                  |name1=name2 and
                  |ts2 >= ts1 and
                  |ts2 <= ts1 + interval 1 minutes
                """.stripMargin),
            joinType = "left_join")