第 3 章 广告点击量实时统计

3.1 需求简介

实时统计每天、每个地区、每个城市、每个广告的点击量

3.2 思路

/*
统计完成之后将结果写入 Redis。

值的类型使用 hash:

key                             field                           value
"date:area:city:ads"            2019-08-19:华北:北京:5           10000

即: 所有结果存放在同一个 hash key 下, 每个 field 为 "日期:地区:城市:广告id", value 为对应的点击量。

使用 SQL 实现该统计比较简单。
 */

3.3 具体实现

package com.atguigu.realtime.app

import com.atguigu.realtime.bean.AdsInfo
import com.atguigu.realtime.util.RedisUtil
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import redis.clients.jedis.Jedis

/**
  * Author lzc
  * Date 2019-08-19 10:48
  *
  * Real-time ad click counting per day / area / city / ad.
  *
  * Results are upserted into a single Redis hash (see [[AdsClickCountApp.key]]) whose fields are
  * "day:area:city:adsId" (e.g. 2019-08-19:华北:北京:5) and whose values are the click counts.
  */
object AdsClickCountApp {
    /** Redis hash key under which every "day:area:city:adsId" -> count field is stored. */
    val key: String = "date:area:city:ads"

    /**
      * Starts the streaming aggregation and blocks until the query terminates.
      *
      * @param spark             active session used to run the aggregation SQL
      * @param filteredAdsInfoDS streaming click records to count; registered here as the temp
      *                          view `tb_ads_info`, which the SQL below reads from
      */
    def statAdsClickCount(spark: SparkSession, filteredAdsInfoDS: Dataset[AdsInfo]): Unit = {
        // The query reads tb_ads_info, so bind that view to this method's input explicitly
        // instead of relying on a caller having registered it.
        filteredAdsInfoDS.createOrReplaceTempView("tb_ads_info")

        spark.sql(
            s"""
               |select
               |    dayString,
               |    area,
               |    city,
               |    adsId,
               |    count(1) count
               |from tb_ads_info
               |group by dayString, area, city, adsId
             """.stripMargin)
            .writeStream
            // "update" emits only the groups whose counts changed in this micro-batch.
            .outputMode("update")
            // Explicit parameter types avoid the Scala/Java overload ambiguity of foreachBatch.
            .foreachBatch((df: DataFrame, batchId: Long) => {
                // Exactly one action per micro-batch, so the batch is computed once and needs
                // no cache; empty batches/partitions never open a Redis connection.
                df.foreachPartition((rowIt: Iterator[Row]) => writePartitionToRedis(rowIt))
            })
            .start()
            .awaitTermination()
    }

    /**
      * Writes one partition of aggregated rows into the Redis hash [[key]] with a single HMSET.
      * A connection is obtained only if the partition contains at least one row, and it is
      * always closed, even if the write fails.
      *
      * @param rowIt rows shaped (dayString, area, city, adsId, count) as produced by the SQL above
      */
    private def writePartitionToRedis(rowIt: Iterator[Row]): Unit = {
        // field e.g. "2019-08-19:华北:北京:5", value = click count as a string
        val fieldValueMap: Map[String, String] = rowIt.map { row =>
            val field: String = s"${row.getString(0)}:${row.getString(1)}:${row.getString(2)}:${row.getString(3)}"
            field -> row.getLong(4).toString
        }.toMap

        if (fieldValueMap.nonEmpty) {
            // Explicit Scala -> Java conversion (JavaConversions' implicit form is deprecated).
            import scala.collection.JavaConverters._
            val client: Jedis = RedisUtil.getJedisClient
            try {
                client.hmset(key, fieldValueMap.asJava)
            } finally {
                client.close()
            }
        }
    }
}

/*
统计完成之后将结果写入 Redis。

值的类型使用 hash:

key                             field                           value
"date:area:city:ads"            2019-08-19:华北:北京:5           10000

即: 所有结果存放在同一个 hash key 下, 每个 field 为 "日期:地区:城市:广告id", value 为对应的点击量。

使用 SQL 实现该统计比较简单。
 */

Copyright © 尚硅谷大数据 2019 all rights reserved, powered by Gitbook
该文件最后修订时间: 2019-08-19 12:22:02

results matching ""

    No results matching ""