We can apply a wide range of operations to streaming DataFrames/Datasets.

They fall into two main kinds:

  1. Executing SQL directly
  2. Type-specific APIs (the DSL)

5.1 Basic Operations

Most of the common operations on DataFrame/Dataset are supported for streaming.
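The flip side is that batch-style actions cannot run on a streaming DataFrame: a streaming query must be executed through writeStream.start(). A minimal sketch of where that boundary sits (df here stands for any streaming DataFrame, such as the ones built below):

// Batch actions fail on a streaming DataFrame at analysis time with
// AnalysisException: "Queries with streaming sources must be executed with writeStream.start()"
// df.show()   // not supported
// df.count()  // not supported as an eager action
println(df.isStreaming) // true -- transformations stay lazy until writeStream.start()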

Contents of people.json, the data we are about to process:

{"name": "Michael","age": 29,"sex": "female"}
{"name": "Andy","age": 30,"sex": "male"}
{"name": "Justin","age": 19,"sex": "male"}
{"name": "Lisi","age": 18,"sex": "male"}
{"name": "zs","age": 10,"sex": "female"}
{"name": "zhiling","age": 40,"sex": "female"}

1. Untyped API (for reference)

package com.atguigu.ss

import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Author lzc
  * Date 2019/8/13 2:08 PM
  */
object BasicOperation {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("BasicOperation")
            .getOrCreate()
        val peopleSchema: StructType = new StructType()
            .add("name", StringType)
            .add("age", LongType)
            .add("sex", StringType)
        val peopleDF: DataFrame = spark.readStream
            .schema(peopleSchema)
            .json("/Users/lzc/Desktop/data")


        val df: DataFrame = peopleDF.select("name", "age", "sex").where("age > 20") // untyped API: refer to columns by name
        df.writeStream
            .outputMode("append")
            .format("console")
            .start()
            .awaitTermination()
    }
}
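With the people.json above dropped into the source directory, the console sink prints the rows whose age is greater than 20. The first micro-batch would look roughly like this (batch numbering and formatting may vary slightly across Spark versions):

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+---+------+
|   name|age|   sex|
+-------+---+------+
|Michael| 29|female|
|   Andy| 30|  male|
|zhiling| 40|female|
+-------+---+------+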

2. Typed API (for reference)

package com.atguigu.ss

import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * Author lzc
  * Date 2019/8/13 2:08 PM
  */
object BasicOperation2 {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("BasicOperation")
            .getOrCreate()
        import spark.implicits._

        val peopleSchema: StructType = new StructType()
            .add("name", StringType)
            .add("age", LongType)
            .add("sex", StringType)
        val peopleDF: DataFrame = spark.readStream
            .schema(peopleSchema)
            .json("/Users/lzc/Desktop/data")

        val peopleDS: Dataset[People] = peopleDF.as[People] // convert the DataFrame to a typed Dataset


        val df: Dataset[String] = peopleDS.filter(_.age > 20).map(_.name)
        df.writeStream
            .outputMode("append")
            .format("console")
            .start()
            .awaitTermination()


    }
}

case class People(name: String, age: Long, sex: String)
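Because the query ends with map(_.name), the result is a Dataset[String], which the console sink shows as a single value column. For the sample data, the first batch would print roughly:

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+
|  value|
+-------+
|Michael|
|   Andy|
|zhiling|
+-------+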

3. Executing SQL directly (important)

package com.atguigu.ss

import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * Author lzc
  * Date 2019/8/13 2:08 PM
  */
object BasicOperation3 {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("BasicOperation")
            .getOrCreate()
        import spark.implicits._

        val peopleSchema: StructType = new StructType()
            .add("name", StringType)
            .add("age", LongType)
            .add("sex", StringType)
        val peopleDF: DataFrame = spark.readStream
            .schema(peopleSchema)
            .json("/Users/lzc/Desktop/data")

        peopleDF.createOrReplaceTempView("people") // register a temporary view over the streaming DataFrame
        val df: DataFrame = spark.sql("select * from people where age > 20")

        df.writeStream
            .outputMode("append")
            .format("console")
            .start()
            .awaitTermination()
    }
}
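The view accepts any SQL that Structured Streaming supports, not just filters. For instance, a grouped count also works, as long as the output mode is switched to complete, since the aggregate must be re-emitted in full each batch. A sketch reusing the people view from the example above:

        val countDF: DataFrame = spark.sql("select sex, count(*) as cnt from people group by sex")
        countDF.writeStream
            .outputMode("complete")
            .format("console")
            .start()
            .awaitTermination()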
