6.2.2 kafka sink
将 wordcount 结果写入到 kafka
写入到 kafka 的时候应该包含如下列:
Column | Type |
---|---|
key (optional) | string or binary |
value (required) | string or binary |
topic (optional) | string |
注意:
- 如果没有添加 topic option 则 topic 列必须有.
- kafka sink 三种输出模式都支持
6.2.2.1 以 Streaming 方式输出数据
这种方式使用流的方式源源不断的向 kafka 写入数据.
package com.atguigu.ss
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* Author lzc
* Date 2019/8/14 7:39 PM
*/
object KafkaSink {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[1]")
.appName("Test")
.getOrCreate()
import spark.implicits._
val lines: DataFrame = spark.readStream
.format("socket") // 设置数据源
.option("host", "localhost")
.option("port", 10000)
.load
val words = lines.as[String]
.flatMap(_.split("\\W+"))
.groupBy("value")
.count()
.map(row => row.getString(0) + "," + row.getLong(1))
.toDF("value") // 写入数据时候, 必须有一列 "value"
words.writeStream
.outputMode("update")
.format("kafka")
.trigger(Trigger.ProcessingTime(0))
.option("kafka.bootstrap.servers", "hadoop201:9092,hadoop202:9092,hadoop203:9092") // kafka 配置
.option("topic", "update") // kafka 主题
.option("checkpointLocation", "./ck1") // 必须指定 checkpoint 目录
.start
.awaitTermination()
}
}
6.2.2.2 以 batch 方式输出数据
这种方式输出离线处理的结果, 将已存在的数据分为若干批次进行处理. 处理完毕后程序退出.
package com.atguigu.ss
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* Author lzc
* Date 2019/8/14 7:39 PM
*/
object KafkaSink2 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[1]")
.appName("Test")
.getOrCreate()
import spark.implicits._
val wordCount: DataFrame = spark.sparkContext.parallelize(Array("hello hello atguigu", "atguigu, hello"))
.toDF("word")
.groupBy("word")
.count()
.map(row => row.getString(0) + "," + row.getLong(1))
.toDF("value") // 写入数据时候, 必须有一列 "value"
wordCount.write // batch 方式
.format("kafka")
.option("kafka.bootstrap.servers", "hadoop201:9092,hadoop202:9092,hadoop203:9092") // kafka 配置
.option("topic", "update") // kafka 主题
.save()
}
}