6.2.1 File Sink

Stores the output as files in a directory. Only the append output mode is supported.

Requirement: write each word together with its reversal to a directory in JSON format.

package com.atguigu.ss

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * Author lzc
  * Date 2019/8/14 7:39 PM
  */
object FileSink {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[1]")
            .appName("Test")
            .getOrCreate()
        import spark.implicits._

        val lines: DataFrame = spark.readStream
            .format("socket") // 设置数据源
            .option("host", "localhost")
            .option("port", 10000)
            .load

        // Split each line into words and pair every word with its reversal
        val words: DataFrame = lines.as[String].flatMap(line => {
            line.split("\\W+").map(word => {
                (word, word.reverse)
            })
        }).toDF("原单词", "反转单词") // columns: "原单词" (original word), "反转单词" (reversed word)

        words.writeStream
            .outputMode("append") // the file sink only supports the append output mode
            .format("json") // supported formats: "orc", "json", "csv"
            .option("path", "./filesink") // output directory
            .option("checkpointLocation", "./ck1") // a checkpoint directory is required
            .start
            .awaitTermination()
    }
}
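
Once the streaming query has committed at least one batch, the files in the output directory can be read back as an ordinary batch DataFrame. The following is a minimal verification sketch, assuming the job above has already written output to ./filesink; the object name ReadFileSinkOutput is illustrative.

package com.atguigu.ss

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadFileSinkOutput {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[1]")
            .appName("ReadFileSinkOutput")
            .getOrCreate()

        // Read the JSON part files produced by the file sink as a static DataFrame
        val result: DataFrame = spark.read.json("./filesink")
        result.show()

        spark.stop()
    }
}

With the streaming job running, words typed into a socket server (for example, one started with nc -lk 10000) are written as JSON lines such as {"原单词":"hello","反转单词":"olleh"} in part files under ./filesink.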