We can apply various operations on streaming DataFrames/Datasets. They fall into two main categories:
- executing SQL directly
- type-specific APIs (the DSL)
5.1 Basic Operations
Most of the common operations on DataFrame/Dataset are also supported for streaming DataFrames/Datasets.
The contents of people.json, the data we will process:
{"name": "Michael","age": 29,"sex": "female"}
{"name": "Andy","age": 30,"sex": "male"}
{"name": "Justin","age": 19,"sex": "male"}
{"name": "Lisi","age": 18,"sex": "male"}
{"name": "zs","age": 10,"sex": "female"}
{"name": "zhiling","age": 40,"sex": "female"}
1. Untyped API (for reference)
package com.atguigu.ss

import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Author lzc
 * Date 2019/8/13 2:08 PM
 */
object BasicOperation {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("BasicOperation")
      .getOrCreate()

    // File-based streaming sources require an explicit schema
    val peopleSchema: StructType = new StructType()
      .add("name", StringType)
      .add("age", LongType)
      .add("sex", StringType)

    // Read a stream of JSON files from the watched directory
    val peopleDF: DataFrame = spark.readStream
      .schema(peopleSchema)
      .json("/Users/lzc/Desktop/data")

    val df: DataFrame = peopleDF.select("name", "age", "sex").where("age > 20") // untyped API

    df.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}
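The string predicate above can also be written with Column expressions. A minimal sketch, reusing peopleDF from the example above (df2 is a name introduced here; org.apache.spark.sql.functions.col is the only extra import):

import org.apache.spark.sql.functions.col

// Equivalent untyped query built from Column expressions instead of SQL strings
val df2: DataFrame = peopleDF
  .select(col("name"), col("age"), col("sex"))
  .where(col("age") > 20) // col("age") > 20 yields a Column predicate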
2. Typed API (for reference)
package com.atguigu.ss

import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Author lzc
 * Date 2019/8/13 2:08 PM
 */
object BasicOperation2 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("BasicOperation2")
      .getOrCreate()
    import spark.implicits._

    val peopleSchema: StructType = new StructType()
      .add("name", StringType)
      .add("age", LongType)
      .add("sex", StringType)

    val peopleDF: DataFrame = spark.readStream
      .schema(peopleSchema)
      .json("/Users/lzc/Desktop/data")

    val peopleDS: Dataset[People] = peopleDF.as[People] // convert to a typed Dataset

    // Typed transformations: filter and map operate on People objects directly
    val df: Dataset[String] = peopleDS.filter(_.age > 20).map(_.name)

    df.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}

case class People(name: String, age: Long, sex: String)
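The typed API also covers streaming aggregations such as groupByKey. A minimal sketch, reusing peopleDS and spark.implicits._ from the example above (countsBySex is a name introduced here; note that streaming aggregations need the complete or update output mode rather than append):

// Count people per sex; aggregation state is maintained across micro-batches
val countsBySex: Dataset[(String, Long)] = peopleDS
  .groupByKey(_.sex) // KeyValueGroupedDataset[String, People]
  .count()

countsBySex.writeStream
  .outputMode("complete") // append is not allowed for aggregations without a watermark
  .format("console")
  .start()
  .awaitTermination()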
3. Executing SQL Directly (important)
package com.atguigu.ss

import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Author lzc
 * Date 2019/8/13 2:08 PM
 */
object BasicOperation3 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("BasicOperation3")
      .getOrCreate()

    val peopleSchema: StructType = new StructType()
      .add("name", StringType)
      .add("age", LongType)
      .add("sex", StringType)

    val peopleDF: DataFrame = spark.readStream
      .schema(peopleSchema)
      .json("/Users/lzc/Desktop/data")

    peopleDF.createOrReplaceTempView("people") // register a temporary view
    val df: DataFrame = spark.sql("select * from people where age > 20")

    df.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}
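SQL aggregations run against the temp view the same way. A minimal sketch against the people view registered above (countDF is a name introduced here; as with the typed API, an aggregate query requires the complete output mode on the console sink):

// A streaming aggregation expressed in plain SQL; the full result table is re-emitted each trigger
val countDF: DataFrame = spark.sql("select sex, count(*) as cnt from people group by sex")

countDF.writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()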