1.3 从 Kafka 读取数据

编写RealTimeApp, 从 kafka 读取数据

1.3.1 bean 类 AdsInfo

用来封装从 Kafka 读取到的广告点击信息。

package com.atguigu.realtime.bean

import java.sql.Timestamp

/**
  * One ad-click record read from Kafka, already split into typed fields.
  *
  * @param ts        event time as epoch milliseconds (RealtimeApp passes it to `new Timestamp`)
  * @param timestamp the same epoch-millisecond value wrapped as a `java.sql.Timestamp`
  * @param dayString day of `ts`, formatted by RealtimeApp with the pattern `yyyy-MM-dd`
  * @param hmString  hour and minute of `ts`, formatted by RealtimeApp with the pattern `HH:mm`
  * @param area      area field of the raw record
  * @param city      city field of the raw record
  * @param userId    user id field of the raw record
  * @param adsId     ad id field of the raw record
  */
case class AdsInfo(
    ts: Long,
    timestamp: Timestamp,
    dayString: String,
    hmString: String,
    area: String,
    city: String,
    userId: String,
    adsId: String
)

1.3.2 RealtimeApp

测试是否可以从 Kafka 消费到数据：

package com.atguigu.realtime.app

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.time.format.DateTimeFormatter
import java.time.{Instant, ZoneId}
import java.util.Date

import com.atguigu.realtime.bean.AdsInfo
import org.apache.spark.sql._

import scala.util.Try


/**
  * Structured Streaming job that reads ad-click records from Kafka, parses each one into
  * an [[AdsInfo]] and prints the result to the console. It is used to check that the
  * topic can actually be consumed.
  *
  * Usage: `RealtimeApp [bootstrapServers] [topic]`. Both arguments are optional and
  * default to the brokers / topic this job was originally hard-coded with.
  *
  * Author lzc
  * Date 2019-08-17 17:52
  */
object RealtimeApp {
    private val DefaultBootstrapServers: String = "hadoop201:9092,hadoop202:9092,hadoop203:9092"
    private val DefaultTopic: String = "ads_log"

    // java.time formatters are immutable and thread-safe, so one instance per JVM can be
    // shared by all tasks. They are object members rather than locals of `main` because
    // DateTimeFormatter is not Serializable and must not be captured by a Spark closure.
    private val dayStringFormatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
    private val hmStringFormatter: DateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm")

    def main(args: Array[String]): Unit = {
        val bootstrapServers: String = args.lift(0).getOrElse(DefaultBootstrapServers)
        val topic: String = args.lift(1).getOrElse(DefaultTopic)

        val spark: SparkSession = SparkSession
            .builder()
            .master("local[2]")
            .appName("RealtimeApp")
            .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        import spark.implicits._

        // 1. Read from Kafka and wrap each record in the AdsInfo case class for later processing.
        //    Kafka delivers `value` as bytes; the CAST decodes it to a string. Records that
        //    cannot be parsed are dropped rather than throwing, which would fail the whole query.
        val adsInfoDS: Dataset[AdsInfo] = spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", bootstrapServers)
            .option("subscribe", topic)
            .load
            .selectExpr("CAST(value AS STRING)")
            .as[String]
            .flatMap(line => parseAdsInfo(line).toList)

        // The query has no aggregation, so every output row is new and "append" is the natural mode.
        adsInfoDS.writeStream
            .format("console")
            .outputMode("append")
            .option("truncate", "false")
            .start
            .awaitTermination()
    }

    /**
      * Parses one comma-separated record of the form `ts,area,city,userId,adsId`.
      *
      * `ts` must be an integer number of epoch milliseconds. The day / hour-minute strings
      * are derived from it in the JVM's default time zone. Fields beyond the fifth are ignored.
      *
      * @param line raw Kafka value; may be null, for example for a tombstone record
      * @return the parsed record, or `None` when `line` is null, has fewer than five
      *         fields, or its first field is not a valid `Long`
      */
    private def parseAdsInfo(line: String): Option[AdsInfo] =
        for {
            raw <- Option(line)
            fields = raw.split(",")
            if fields.length >= 5
            ts <- Try(fields(0).toLong).toOption
        } yield {
            val dateTime = Instant.ofEpochMilli(ts).atZone(ZoneId.systemDefault())
            AdsInfo(
                ts,
                new Timestamp(ts),
                dayStringFormatter.format(dateTime),
                hmStringFormatter.format(dateTime),
                fields(1),
                fields(2),
                fields(3),
                fields(4))
        }
}
Copyright © 尚硅谷大数据 2019 all rights reserved, powered by Gitbook
该文件最后修订时间: 2019-08-19 10:42:29