1.3 从 Kafka 读取数据
编写 RealtimeApp, 从 Kafka 读取数据
1.3.1 bean 类 AdsInfo
用来封装从 Kafka 读取到的广告点击信息
package com.atguigu.realtime.bean
import java.sql.Timestamp
/**
 * One ad-click event consumed from Kafka.
 *
 * @param ts        event timestamp as epoch milliseconds
 * @param timestamp the same instant as a [[java.sql.Timestamp]]
 * @param dayString the event day formatted as "yyyy-MM-dd"
 * @param hmString  the event time-of-day formatted as "HH:mm"
 * @param area      area the click came from
 * @param city      city the click came from
 * @param userId    id of the clicking user
 * @param adsId     id of the clicked ad
 */
case class AdsInfo(
    ts: Long,
    timestamp: Timestamp,
    dayString: String,
    hmString: String,
    area: String,
    city: String,
    userId: String,
    adsId: String)
1.3.2 RealtimeApp
测试是否可以从 Kafka 消费到数据:
package com.atguigu.realtime.app
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import com.atguigu.realtime.bean.AdsInfo
import org.apache.spark.sql._
/**
* Author lzc
* Date 2019-08-17 17:52
*/
object RealtimeApp {

  /**
   * Entry point: consumes ad-click records from the `ads_log` Kafka topic,
   * parses each CSV record into an [[AdsInfo]], and dumps the stream to the
   * console so we can verify that data is arriving.
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[2]")
      .appName("RealtimeApp")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    // Formatters used to derive the day / hour-minute strings from the event time.
    // NOTE(review): SimpleDateFormat is not thread-safe, but each Spark task gets
    // its own deserialized copy of the closure, so sharing these here is safe.
    val dayStringFormatter: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val hmStringFormatter: SimpleDateFormat = new SimpleDateFormat("HH:mm")

    // 1. Read from Kafka and wrap every record in an AdsInfo for easier downstream use.
    val adsInfoDS: Dataset[AdsInfo] = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "hadoop201:9092,hadoop202:9092,hadoop203:9092")
      .option("subscribe", "ads_log")
      .load()
      // Kafka exposes `value` as BinaryType; it must be cast to string explicitly.
      // The original `.select("value").as[String]` fails at analysis time because
      // binary -> string is not a permitted up-cast for Dataset encoders.
      .selectExpr("cast(value as string)")
      .as[String]
      .map { v =>
        // Expected record layout: ts,area,city,userId,adsId
        val fields: Array[String] = v.split(",")
        val ts: Long = fields(0).toLong
        val date: Date = new Date(ts)
        AdsInfo(
          ts,
          new Timestamp(ts),
          dayStringFormatter.format(date),
          hmStringFormatter.format(date),
          fields(1),
          fields(2),
          fields(3),
          fields(4))
      }

    // 2. Sanity check: print each micro-batch to the console, untruncated.
    adsInfoDS.writeStream
      .format("console")
      .outputMode("update")
      .option("truncate", "false")
      .start()
      .awaitTermination()
  }
}