6.2.2 kafka sink

将 wordcount 结果写入到 kafka

写入到 kafka 的时候应该包含如下列:

Column Type
key (optional) string or binary
value (required) string or binary
topic (optional) string

注意:

  1. 如果没有添加 topic option 则 topic 列必须有.
  2. kafka sink 三种输出模式都支持

6.2.2.1 以 Streaming 方式输出数据

这种方式使用流的方式源源不断的向 kafka 写入数据.

package com.atguigu.ss

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * Author lzc
  * Date 2019/8/14 7:39 PM
  */
object KafkaSink {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[1]")
            .appName("Test")
            .getOrCreate()
        import spark.implicits._

        val lines: DataFrame = spark.readStream
            .format("socket") // 设置数据源
            .option("host", "localhost")
            .option("port", 10000)
            .load

        val words = lines.as[String]
                .flatMap(_.split("\\W+"))
                .groupBy("value")
                .count()
                .map(row => row.getString(0) + "," + row.getLong(1))
                .toDF("value")  // 写入数据时候, 必须有一列 "value"

        words.writeStream
            .outputMode("update")
            .format("kafka")
            .trigger(Trigger.ProcessingTime(0))
            .option("kafka.bootstrap.servers", "hadoop201:9092,hadoop202:9092,hadoop203:9092") // kafka 配置
            .option("topic", "update") // kafka 主题
            .option("checkpointLocation", "./ck1")  // 必须指定 checkpoint 目录
            .start
            .awaitTermination()
    }
}

6.2.2.2 以 batch 方式输出数据

这种方式输出离线处理的结果, 将已存在的数据分为若干批次进行处理. 处理完毕后程序退出.

package com.atguigu.ss

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Author lzc
  * Date 2019/8/14 7:39 PM
  */
object KafkaSink2 {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[1]")
            .appName("Test")
            .getOrCreate()
        import spark.implicits._

        val wordCount: DataFrame = spark.sparkContext.parallelize(Array("hello hello atguigu", "atguigu, hello"))
            .toDF("word")
            .groupBy("word")
            .count()
            .map(row => row.getString(0) + "," + row.getLong(1))
            .toDF("value")  // 写入数据时候, 必须有一列 "value"

        wordCount.write  // batch 方式
            .format("kafka")
            .option("kafka.bootstrap.servers", "hadoop201:9092,hadoop202:9092,hadoop203:9092") // kafka 配置
            .option("topic", "update") // kafka 主题
            .save()
    }
}
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-08-16 09:00:24

results matching ""

    No results matching ""