5.2 Reading Data from Kafka

5.2.1 Configuration File: config.properties

# Kafka configuration
kafka.broker.list=hadoop201:9092,hadoop202:9092,hadoop203:9092
kafka.group=bigdata

# Redis configuration
redis.host=hadoop201
redis.port=6379

5.2.2 Configuration File: log4j.properties

log4j.appender.atguigu.MyConsole=org.apache.log4j.ConsoleAppender
log4j.appender.atguigu.MyConsole.target=System.err
log4j.appender.atguigu.MyConsole.layout=org.apache.log4j.PatternLayout    
log4j.appender.atguigu.MyConsole.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n 

log4j.rootLogger=error,atguigu.MyConsole
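
For reference, a message logged at error level with this ConversionPattern is printed to System.err in roughly this form (the logger class and message are made-up examples):

2019-05-15 13:45:40      ERROR (com.atguigu.dw.gmall.realtime.app.DauApp:main) - failed to connect to Redis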

5.2.3 Reading the Configuration File: PropertiesUtil

package com.atguigu.dw.gmall.realtime.util

import java.io.InputStream
import java.util.Properties

/**
  * Author lzc
  * Date 2019/5/15 11:31 AM
  */
object PropertiesUtil {
    // Load config.properties from the classpath once, when this object is first used
    private val is: InputStream = ClassLoader.getSystemResourceAsStream("config.properties")
    private val properties = new Properties()
    properties.load(is)

    // Look up a single property value by name
    def getProperty(propertyName: String): String = properties.getProperty(propertyName)

    def main(args: Array[String]): Unit = {
        println(getProperty("kafka.broker.list"))
    }
}

5.2.4 MyKafkaUtil

This utility returns a Kafka DStream. Note that createDirectStream is the direct (receiver-less) API rather than the high-level consumer: Spark reads from the brokers itself and tracks offsets internally instead of committing them through ZooKeeper.

package com.atguigu.dw.gmall.realtime.util

import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

/**
  * Author lzc
  * Date 2019/5/15 11:19 AM
  */
object MyKafkaUtil {

    // Build a direct stream on the given topic; each element is a (key, value) pair
    def getKafkaStream(ssc: StreamingContext, topic: String): InputDStream[(String, String)] = {
        val params: Map[String, String] = Map(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> PropertiesUtil.getProperty("kafka.broker.list"),
            ConsumerConfig.GROUP_ID_CONFIG -> PropertiesUtil.getProperty("kafka.group")
        )
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, params, Set(topic))
    }
}
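
As a quick smoke test, the stream can be consumed and printed like this (a minimal sketch; "gmall_startup" is a placeholder topic name, while the real constant GmallConstant.TOPIC_STARTUP is used in 5.2.7):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaSmokeTest {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaSmokeTest")
        val ssc = new StreamingContext(conf, Seconds(5))
        // Each element is a (key, value) pair; the log payload is in the value
        MyKafkaUtil.getKafkaStream(ssc, "gmall_startup").map(_._2).print()
        ssc.start()
        ssc.awaitTermination()
    }
}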

5.2.5 Case Class: StartupLog

To make the data easy to work with, each user-access record is wrapped in a case class.

Because the data will later be indexed in ES, and to make queries against ES convenient, the timestamp is broken out into the several date and time formats we will need.

package com.atguigu.dw.gmall.realtime.bean

import java.text.SimpleDateFormat
import java.util.Date

case class StartupLog(mid: String,
                      uid: String,
                      appId: String,
                      area: String,
                      os: String,
                      channel: String,
                      logType: String,
                      version: String,
                      ts: Long) {
    // Break the epoch-millis timestamp into the date formats needed for later queries
    private val date = new Date(ts)
    val logDate: String = new SimpleDateFormat("yyyy-MM-dd").format(date)
    val logHour: String = new SimpleDateFormat("HH").format(date)
    val logHourMinute: String = new SimpleDateFormat("HH:mm").format(date)
}
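
As an illustration of the derived fields (a sketch; the JSON field values are invented), a raw log line can be parsed with fastjson exactly as DauApp does below:

import com.alibaba.fastjson.JSON

val log = """{"mid":"mid_1","uid":"u_100","appId":"gmall","area":"beijing","os":"android","channel":"web","logType":"startup","version":"1.0","ts":1557891100000}"""
val startupLog: StartupLog = JSON.parseObject(log, classOf[StartupLog])
println(startupLog.logDate) // e.g. 2019-05-15 (depends on the JVM time zone)
println(startupLog.logHourMinute) // e.g. 11:31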

5.2.6 RedisUtil

package com.atguigu.dw.gmall.realtime.util

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object RedisUtil {
    val host = PropertiesUtil.getProperty("redis.host")
    val port = PropertiesUtil.getProperty("redis.port").toInt
    private val jedisPoolConfig = new JedisPoolConfig()
    jedisPoolConfig.setMaxTotal(100) // maximum number of connections
    jedisPoolConfig.setMaxIdle(20) // maximum number of idle connections
    jedisPoolConfig.setMinIdle(20) // minimum number of idle connections
    jedisPoolConfig.setBlockWhenExhausted(true) // whether to block and wait when the pool is exhausted
    jedisPoolConfig.setMaxWaitMillis(500) // how long to wait when the pool is busy, in milliseconds
    jedisPoolConfig.setTestOnBorrow(false) // whether to validate each connection when it is borrowed
    private val jedisPool: JedisPool = new JedisPool(jedisPoolConfig, host, port)

    // Get a Redis connection straight from the pool
    def getJedisClient: Jedis = {
        jedisPool.getResource
    }
}
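
A quick way to verify that the pool works (a sketch; the key and member are made-up values):

import redis.clients.jedis.Jedis

val jedis: Jedis = RedisUtil.getJedisClient
jedis.sadd("dau:2019-05-15", "u_100")
println(jedis.smembers("dau:2019-05-15")) // => [u_100]
jedis.close() // returns the connection to the pool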

5.2.7 DauApp

When saving to Redis, the key is dau:<date> and the value is a Redis set.

The members of the set are the uid values.

package com.atguigu.dw.gmall.realtime.app

import com.alibaba.fastjson.JSON
import com.atguigu.dw.gmall.common.constant.GmallConstant
import com.atguigu.dw.gmall.realtime.bean.StartupLog
import com.atguigu.dw.gmall.realtime.util.{MyKafkaUtil, RedisUtil}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis

object DauApp {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DauApp")
        val ssc = new StreamingContext(conf, Seconds(5))
        val sourceStream: InputDStream[(String, String)] =
            MyKafkaUtil.getKafkaStream(ssc, GmallConstant.TOPIC_STARTUP)

        // 1. Reshape the data: parse each JSON log line into a StartupLog
        val startupLogDStream = sourceStream.map {
            case (_, log) => JSON.parseObject(log, classOf[StartupLog])
        }
        // 2. Save to Redis
        startupLogDStream.foreachRDD(rdd => {
            rdd.foreachPartition(it => {
                // Use one connection per partition and reuse it for every record
                val client: Jedis = RedisUtil.getJedisClient
                it.foreach(startupLog => {
                    // Key: dau:<date>; value: a Redis set of uids
                    val key = "dau:" + startupLog.logDate
                    client.sadd(key, startupLog.uid)
                })
                client.close()
            })
        })
        ssc.start()
        ssc.awaitTermination()
    }
}
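
Once DauApp has been running, the day's DAU can be read back as the size of the set (a sketch; the date string is just an example). Because a set keeps only distinct members, re-adding the same uid in a later batch is idempotent, which is exactly what DAU needs:

val jedis = RedisUtil.getJedisClient
// SCARD returns the number of distinct members, i.e. the number of distinct uids
println(jedis.scard("dau:2019-05-15"))
jedis.close()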