5.5.1 Stream-static Joins

模拟的静态数据:

lisi,male
zhiling,female
zs,male

模拟的流式数据:

lisi,20
zhiling,40
ww,30

5.5.1.1 内连接

package com.atguigu.ss

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Stream-static join demo: joins a streaming DataFrame read from a socket
  * with a small in-memory static DataFrame on the `name` column and prints
  * each micro-batch result to the console.
  *
  * Expected socket input: one `name,age` record per line, e.g. `lisi,20`.
  * Lines that do not have at least two comma-separated fields, or whose age
  * is not an integer, are skipped instead of failing the streaming query.
  *
  * Author lzc
  * Date 2019/8/14 4:41 PM
  */
object StreamingStatic {
    def main(args: Array[String]): Unit = {
        // Local import so the block does not depend on the file-level import list.
        import scala.util.Try

        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("StreamingStatic")
            .getOrCreate()
        import spark.implicits._

        // 1. Static DataFrame: (name, sex)
        val staticRows = Array(("lisi", "male"), ("zhiling", "female"), ("zs", "male"))
        val staticDF: DataFrame = spark.sparkContext.parallelize(staticRows).toDF("name", "sex")

        // 2. Streaming DataFrame: raw text lines from the socket source
        val lines: DataFrame = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 10000)
            .load()

        // Parse "name,age". A single bad line (blank, missing field, non-numeric age)
        // would otherwise throw inside the task and terminate the whole query,
        // so malformed records are dropped here.
        val streamDF: DataFrame = lines.as[String].flatMap { line =>
            line.split(",").map(_.trim) match {
                case Array(name, age, _*) =>
                    Try(age.toInt).toOption.map(parsedAge => (name, parsedAge)).toList
                case _ =>
                    Nil
            }
        }.toDF("name", "age")

        // 3. Inner equi-join on the shared column: stream.name = static.name
        val joinResult: DataFrame = streamDF.join(staticDF, "name")

        // 4. Output each micro-batch to the console and block until the query stops
        joinResult.writeStream
            .outputMode("append")
            .format("console")
            .start()
            .awaitTermination()

    }
}
+-------+---+------+
|   name|age|   sex|
+-------+---+------+
|zhiling| 40|female|
|   lisi| 20|  male|
+-------+---+------+

5.5.1.2 外连接

// Left outer join on `name`: every row of streamDF is kept, and rows with no
// match in staticDF get null for the static columns (see `ww` in the output below).
val joinResult: DataFrame = streamDF.join(staticDF, Seq("name"), "left")
+-------+---+------+
|   name|age|   sex|
+-------+---+------+
|zhiling| 40|female|
|     ww| 30|  null|
|   lisi| 20|  male|
+-------+---+------+
Copyright © 尚硅谷大数据 2019 all rights reserved, powered by GitBook
该文件最后修订时间: 2019-08-16 19:32:46

results matching ""

    No results matching ""