8.2 需求5: 广告黑名单实时统计
8.2.1 需求简介
实现实时的动态黑名单机制:将每天对某个广告点击超过 100 次的用户拉入黑名单。
注意:
我们把黑名单保存在 redis 中
对已经进入黑名单的用户将不会再进行检查.
8.2.2 思路
使用黑名单过滤 DStream, 得到新的不包含黑名单的用户的点击日志记录的新的 DStream
新的过滤后的 DStream 需要实时监测是否有新的用户需要加入到黑名单.
在 redis中为每个
userid_day_adsclick
设置一个计数器(使用 hash 来存储数据)当计数器的值超过预定的阈值的时候, 加入黑名单(使用 set 集合来存储黑名单, 每个元素就是一个用户 id)
8.2.3 具体的业务实现
每个广告点击的数据类型
package com.atgugu.sparkmall.realtime.bean
import java.text.SimpleDateFormat
import java.util.Date
case class AdsInfo(ts: Long, area: String, city: String, userId: String, adsId: String) {
val dayString: String = new SimpleDateFormat("yyyy-MM-dd").format(new Date(ts))
override def toString: String = s"$dayString:$area:$city:$adsId"
}
具体的实现
package com.atgugu.sparkmall.realtime.app
import java.util
import com.atgugu.sparkmall.realtime.bean.AdsInfo
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.DStream
import redis.clients.jedis.Jedis
object BlackListApp {
// redis的一些相关参数
val redisIp = "hadoop201"
val redisPort = 6379
val countKey = "user:day:adsclick"
val blackListKey = "blacklist"
// 过滤黑名单中的用户. 返回值是不包含黑名单的用户的广告点击记录的 DStream
def checkUserFromBlackList(adsClickInfoDStream: DStream[AdsInfo], sc: SparkContext) = {
adsClickInfoDStream.transform(rdd => {
val jedis = new Jedis(redisIp, redisPort)
// 读出来黑名单
val blackList: util.Set[String] = jedis.smembers(blackListKey)
val blackListBC: Broadcast[util.Set[String]] = sc.broadcast(blackList)
jedis.close()
rdd.filter(adsInfo => {
!blackListBC.value.contains(adsInfo.userId)
})
})
}
// 把用户写入到黑名单中
def checkUserToBlackList(adsInfoDStream: DStream[AdsInfo]) = {
adsInfoDStream.foreachRDD(rdd => {
rdd.foreachPartition(adsInfoIt => {
val jedis = new Jedis(redisIp, redisPort)
// 建立redis 连接
adsInfoIt.foreach(adsInfo => {
//1. 在 redis 中计数. 使用 set Key: user:day:adsId field: ...
val countField =
s"${adsInfo.userId}:${adsInfo.dayString}:${adsInfo.adsId}"
jedis.hincrBy(countKey, countField, 1) // 计数 + 1
// 2. 达到阈值后写入黑名单
if (jedis.hget(countKey, countField).toLong >= 100) { // 如果点击某个广告的数量超过 100
jedis.sadd(blackListKey, adsInfo.userId) // 加入黑名单
}
})
jedis.close()
})
})
}
}
使用连接池
在sparkmall-common
模块中创建RedisUtil
工具类
package com.atguigu.sparkmall.common.util
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
object RedisUtil {
private val config = ConfigurationUtil("config.properties")
private val host = config.getString("redis.host")
private val port = config.getInt("redis.port")
private val jedisPoolConfig = new JedisPoolConfig()
jedisPoolConfig.setMaxTotal(100) //最大连接数
jedisPoolConfig.setMaxIdle(20) //最大空闲
jedisPoolConfig.setMinIdle(20) //最小空闲
jedisPoolConfig.setBlockWhenExhausted(true) //忙碌时是否等待
jedisPoolConfig.setMaxWaitMillis(500) //忙碌时等待时长 毫秒
jedisPoolConfig.setTestOnBorrow(true) //每次获得连接的进行测试
private val jedisPool: JedisPool = new JedisPool(jedisPoolConfig, host, port)
// 直接得到一个 Redis 的连接
def getJedisClient: Jedis = {
jedisPool.getResource
}
}