8.2 需求5: 广告黑名单实时统计

8.2.1 需求简介

实现实时的动态黑名单机制:将每天对某个广告点击超过 100 次的用户拉入黑名单。

注意:

  1. 我们把黑名单保存在 redis 中

  2. 对已经进入黑名单的用户将不会再进行检查.

8.2.2 思路

  1. 使用黑名单过滤 DStream, 得到新的不包含黑名单的用户的点击日志记录的新的 DStream

  2. 新的过滤后的 DStream 需要实时监测是否有新的用户需要加入到黑名单.

    • 在 redis中为每个userid_day_adsclick设置一个计数器(使用 hash 来存储数据)

    • 当计数器的值超过预定的阈值的时候, 加入黑名单(使用 set 集合来存储黑名单, 每个元素就是一个用户 id)

8.2.3 具体的业务实现

每个广告点击的数据类型

package com.atgugu.sparkmall.realtime.bean

import java.text.SimpleDateFormat
import java.util.Date

case class AdsInfo(ts: Long, area: String, city: String, userId: String, adsId: String) {
    val dayString: String = new SimpleDateFormat("yyyy-MM-dd").format(new Date(ts))
    override def toString: String = s"$dayString:$area:$city:$adsId"
}

具体的实现

package com.atgugu.sparkmall.realtime.app

import java.util

import com.atgugu.sparkmall.realtime.bean.AdsInfo
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.DStream
import redis.clients.jedis.Jedis

object BlackListApp {
    // redis的一些相关参数
    val redisIp = "hadoop201"
    val redisPort = 6379
    val countKey = "user:day:adsclick"
    val blackListKey = "blacklist"

    // 过滤黑名单中的用户. 返回值是不包含黑名单的用户的广告点击记录的 DStream
    def checkUserFromBlackList(adsClickInfoDStream: DStream[AdsInfo], sc: SparkContext) = {
        adsClickInfoDStream.transform(rdd => {
            val jedis = new Jedis(redisIp, redisPort)
            // 读出来黑名单
            val blackList: util.Set[String] = jedis.smembers(blackListKey)
            val blackListBC: Broadcast[util.Set[String]] = sc.broadcast(blackList)
            jedis.close()
            rdd.filter(adsInfo => {
                !blackListBC.value.contains(adsInfo.userId)
            })
        })
    }


    // 把用户写入到黑名单中
    def checkUserToBlackList(adsInfoDStream: DStream[AdsInfo]) = {
        adsInfoDStream.foreachRDD(rdd => {
            rdd.foreachPartition(adsInfoIt => {
                val jedis = new Jedis(redisIp, redisPort)
                // 建立redis 连接
                adsInfoIt.foreach(adsInfo => {
                    //1. 在 redis 中计数. 使用 set Key: user:day:adsId   field: ...
                    val countField =
                        s"${adsInfo.userId}:${adsInfo.dayString}:${adsInfo.adsId}"
                    jedis.hincrBy(countKey, countField, 1) // 计数 + 1
                    // 2. 达到阈值后写入黑名单
                    if (jedis.hget(countKey, countField).toLong >= 100) { // 如果点击某个广告的数量超过 100
                        jedis.sadd(blackListKey, adsInfo.userId) // 加入黑名单
                    }
                })
                jedis.close()
            })
        })
    }
}

使用连接池

sparkmall-common模块中创建RedisUtil工具类

package com.atguigu.sparkmall.common.util

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object RedisUtil {
    private val config = ConfigurationUtil("config.properties")
    private val host = config.getString("redis.host")
    private val port = config.getInt("redis.port")

    private val jedisPoolConfig = new JedisPoolConfig()
    jedisPoolConfig.setMaxTotal(100) //最大连接数
    jedisPoolConfig.setMaxIdle(20) //最大空闲
    jedisPoolConfig.setMinIdle(20) //最小空闲
    jedisPoolConfig.setBlockWhenExhausted(true) //忙碌时是否等待
    jedisPoolConfig.setMaxWaitMillis(500) //忙碌时等待时长 毫秒
    jedisPoolConfig.setTestOnBorrow(true) //每次获得连接的进行测试
    private val jedisPool: JedisPool = new JedisPool(jedisPoolConfig, host, port)

    // 直接得到一个 Redis 的连接
    def getJedisClient: Jedis = {
        jedisPool.getResource
    }
}
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-03-10 17:15:07

results matching ""

    No results matching ""