文章目录
- 问题背景
- 前言
- 布隆过滤器原理
- 使用场景
- 基础中间件搭建
- 如何实现布隆过滤器
- 引入依赖
- 注入RedisTemplate
- 布隆过滤器核心代码
- Redis操作布隆过滤器
- 验证
- 总结
问题背景
研究布隆过滤器的实现方式以及使用场景
前言
- 本篇的代码都是参考SpringBoot+Redis布隆过滤器防恶意流量击穿缓存的正确姿势,可以先看看该篇文章。
- 本篇的行文思路分别由以下几个模块构成:布隆过滤器原理、使用场景、基础中间件搭建、如何实现布隆过滤器
- 阅读本篇前,需要知道布隆过滤器的原理、简单的Docker知识,阅读起来能更加高效。
布隆过滤器原理
存储原理:有一个value,一个数组array,value通过k个hash函数得到k个值,这k个值映射到数组array上并将数组对应位置的值设置为1。
判断原理:一个value通过k个hash函数得到的值,如果在array数组上的位置不全是1,则该value不存在。
总结:布隆过滤器能断定某个value一定不存在,无法断定某个value一定存在。
使用场景
- 防止恶意请求导致缓存穿透。(缓存雪崩:一堆key同时过期;缓存击穿:热点key过期;缓存穿透:缓存和数据库都没有该key)
- 在大批量数据场景下做内容去重。比如爬虫的url去重,亿级量账号去重,垃圾邮箱过滤等等。
基础中间件搭建
本文采用Redis实现,需要搭建Redis,由于是用来做实验的,不需要纠结怎么安装Redis才算正确,直接用Docker快速搭建一个含有布隆过滤器模块的Redis。代码见如下:
在此前需要先安装Docker,网上很多教程,此处不做赘述
# 开启docker服务
systemctl enable docker
# 直接拉取整合了bloomfilter插件的Redis镜像
docker pull redislabs/rebloom
# 启动
docker run -p 6379:6379 -d --name redisbloom redislabs/rebloom
# 进入容器
docker exec -it redis-redisbloom bash
# 进入命令行测试
redis-cli
# 添加一个过滤器与记录
BF.ADD newFilter foo
# 判断记录是否存在
BF.EXISTS newFilter foo
搭建完后,可以采用RedisInsight可视化软件查看Redis的一些情况,如下所示:
如何实现布隆过滤器
核心代码是如何使用redis存储值、判断值是否存在。本小节先给出配置类的代码、再给出布隆过滤器核心算法的代码(3要素,hash函数的数量,bit数组的大小,误判率)、redis操作布隆过滤器的代码。
下面的代码是防止恶意请求的一个使用例子,用于判断请求是否存在于
布隆过滤器中,不存在则可以直接return,不继续浪费服务器资源。
引入依赖
引入Redis、guava依赖,redis版本号跟着springboot版本号走
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>${spring-boot.version}</version>
</dependency>
注入RedisTemplate
package com.ganzalang.gmall.redis.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisStandaloneConfig {
/**
* retemplate相关配置
*
* @param factory
* @return
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
// 配置连接工厂
template.setConnectionFactory(factory);
// 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)
Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
// 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jacksonSeial.setObjectMapper(om);
// 值采用json序列化
template.setValueSerializer(jacksonSeial);
// 使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
// 设置hash key 和value序列化模式
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(jacksonSeial);
template.afterPropertiesSet();
return template;
}
/**
* 对hash类型的数据操作
*
* @param redisTemplate
* @return
*/
@Bean
public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForHash();
}
/**
* 对redis字符串类型数据操作
*
* @param redisTemplate
* @return
*/
@Bean
public ValueOperations<String, Object> valueOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForValue();
}
/**
* 对链表类型的数据操作
*
* @param redisTemplate
* @return
*/
@Bean
public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForList();
}
/**
* 对无序集合类型的数据操作
*
* @param redisTemplate
* @return
*/
@Bean
public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForSet();
}
/**
* 对有序集合类型的数据操作
*
* @param redisTemplate
* @return
*/
@Bean
public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForZSet();
}
}
布隆过滤器核心代码
package com.ganzalang.gmall.redis.bloomfilter.util;
import com.google.common.base.Preconditions;
import com.google.common.hash.Funnel;
import com.google.common.hash.Hashing;
public class BloomFilterHelper<T> {
private int numHashFunctions;
private int bitSize;
private Funnel<T> funnel;
public BloomFilterHelper(Funnel<T> funnel, int expectedInsertions, double fpp) {
Preconditions.checkArgument(funnel != null, "funnel不能为空");
this.funnel = funnel;
this.bitSize = optimalNumOfBits(expectedInsertions, fpp);
this.numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, bitSize);
}
public int[] murmurHashOffset(T value) {
int[] offset = new int[numHashFunctions];
long hash64 = Hashing.murmur3_128().hashObject(value, funnel).asLong();
int hash1 = (int) hash64;
int hash2 = (int) (hash64 >>> 32);
for (int i = 1; i <= numHashFunctions; i++) {
int nextHash = hash1 + i * hash2;
if (nextHash < 0) {
nextHash = ~nextHash;
}
offset[i - 1] = nextHash % bitSize;
}
return offset;
}
/**
* 计算bit数组的大小
*
* @param n
* @param p
* @return
*/
private int optimalNumOfBits(long n, double p) {
if (p == 0) {
p = Double.MIN_VALUE;
}
return (int) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
}
/**
* 计算Hash方法执行次数
*
* @param n
* @param m
* @return
*/
private int optimalNumOfHashFunctions(long n, long m) {
return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
}
}
Redis操作布隆过滤器
package com.ganzalang.gmall.redis.bloomfilter.util;
import com.google.common.base.Preconditions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Component
public class RedisUtil {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 默认过期时长,单位:秒
*/
public static final long DEFAULT_EXPIRE = 60 * 60 * 24;
/**
* 不设置过期时长
*/
public static final long NOT_EXPIRE = -1;
public boolean existsKey(String key) {
return redisTemplate.hasKey(key);
}
/**
* 重名名key,如果newKey已经存在,则newKey的原值被覆盖
*
* @param oldKey
* @param newKey
*/
public void renameKey(String oldKey, String newKey) {
redisTemplate.rename(oldKey, newKey);
}
/**
* newKey不存在时才重命名
*
* @param oldKey
* @param newKey
* @return 修改成功返回true
*/
public boolean renameKeyNotExist(String oldKey, String newKey) {
return redisTemplate.renameIfAbsent(oldKey, newKey);
}
/**
* 删除key
*
* @param key
*/
public void deleteKey(String key) {
redisTemplate.delete(key);
}
/**
* 删除多个key
*
* @param keys
*/
public void deleteKey(String... keys) {
Set<String> kSet = Stream.of(keys).map(k -> k).collect(Collectors.toSet());
redisTemplate.delete(kSet);
}
/**
* 删除Key的集合
*
* @param keys
*/
public void deleteKey(Collection<String> keys) {
Set<String> kSet = keys.stream().map(k -> k).collect(Collectors.toSet());
redisTemplate.delete(kSet);
}
/**
* 设置key的生命周期
*
* @param key
* @param time
* @param timeUnit
*/
public void expireKey(String key, long time, TimeUnit timeUnit) {
redisTemplate.expire(key, time, timeUnit);
}
/**
* 指定key在指定的日期过期
*
* @param key
* @param date
*/
public void expireKeyAt(String key, Date date) {
redisTemplate.expireAt(key, date);
}
/**
* 查询key的生命周期
*
* @param key
* @param timeUnit
* @return
*/
public long getKeyExpire(String key, TimeUnit timeUnit) {
return redisTemplate.getExpire(key, timeUnit);
}
/**
* 将key设置为永久有效
*
* @param key
*/
public void persistKey(String key) {
redisTemplate.persist(key);
}
/**
* 根据给定的布隆过滤器添加值
*/
public <T> void addByBloomFilter(BloomFilterHelper<T> bloomFilterHelper, String key, T value) {
Preconditions.checkArgument(bloomFilterHelper != null, "bloomFilterHelper不能为空");
int[] offset = bloomFilterHelper.murmurHashOffset(value);
for (int i : offset) {
redisTemplate.opsForValue().setBit(key, i, true);
}
}
/**
* 根据给定的布隆过滤器判断值是否存在
*/
public <T> boolean includeByBloomFilter(BloomFilterHelper<T> bloomFilterHelper, String key, T value) {
Preconditions.checkArgument(bloomFilterHelper != null, "bloomFilterHelper不能为空");
int[] offset = bloomFilterHelper.murmurHashOffset(value);
for (int i : offset) {
if (!redisTemplate.opsForValue().getBit(key, i)) {
return false;
}
}
return true;
}
}
验证
保存请求,如下图所示,发送
POST /user/addEmailToBloom
请求:
判断请求,如下图所示,发送
POST /user/checkEmailInBloom
请求:
总结
- 效果:上面代码中,可以看见底层使用的是redis的bitmap,120w数据存在Redis仅需8M。查询一次仅需几十毫秒。
- 优点:空间效率高;;查询时间快;支持高并发。
- 缺点:存在一定的误判率;不支持元素的删除,如需删除,需要重新构建布隆过滤器。
- 使用场景:在生产环境中,可以先把数据加载进布隆过滤器,然后用来做防穿透或者去重。