redis stream是redis5引入的特性,一定程度上借鉴了kafka等MQ的设计,部署的redis版本必须 >= 5
本文主要讲的是思路,结合简单的源码分析(放心,无需深入大量源码);讲述在redis stream文档缺乏,网上资料欠缺,gpt回答不上来的情况下,博主是如何用两三天的时间 从没接触过redis stream 到分析完成了redis stream mq功能 。博主始终认为 有明确的思路 才能知道什么代码是正确的 能复制拿来用,什么代码只是单纯跑起来demo的 绝对达不到生产级别。
本文源自csdn博主:孟秋与你 ,博主虽才疏学浅 却也是在资料极少的情况下 ,辛苦研究源码、整理思路 撰写的本文,转载请声明出处。
文章目录
- redisTemplate API的熟悉
- 配置
- redis mq config
- 监听器:
- 定时器
- 优化方向
(本文基于springboot3.3 jdk17 redis6环境,
理论上springboot2 redis5也是通用教程 可能会有细微的api差异 稍微分析一下源码方法都能处理)
redisTemplate API的熟悉
我们在操作redis的时候 通常是使用spring-data-redis提供的redisTemplate或者jedis 本文以redisTemplate为例。
(实际业务场景可能需要考虑用jedis替换 因为mq通常在数据量、并发量都大的场景;redisTemplate的优势在于和springboot的完美集成,且不需要考虑通过连接池来管理线程安全问题)
用过redisTemplate的同学应该都会自己封装一下工具类,因为redisTemplate封装的不够好,不管怎么样 我们都需要先看看这个类
redisTemplate.opsForHash()
,redisTemplate.opsForValue()
各位应该很熟悉了, stream是一种新引入的格式,那么我们直接在RedisTemplate类里面搜stream就好了,正常都会有对应API
(没对应API那就是spring版本太老了 spring那个老版本出来的时候 redis还没出到5 )
搜到了opsForStream()方法 继续查看方法 如下图:
这里说明一下,redis的streamKey就类似mq的topic, group是消费者组,cousumer是消费者,acknowledge即ack 应答机制 告诉mq已经成功消费了,claim是强制将消息转至其它消费者 通常用于消费失败/多次消费失败的场景,pending存放的是未ack的消息 就比如消费某个消息时 出现了异常 没能执行到ack 这些消息就会放在pending list 确保消息不丢失。
通过api,加上我们掌握的mq基本知识,大概就能理解是怎么一回事了。demo搭建不难,但是代码要上生产,我们就必须考虑消息消费失败了怎么办 该如何重试,也就是说重点的api在acknowledge和pending上面。
一个简单的封装
@Component
public class RedisStreamUtil {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 创建消费组
*
* @param key 键名称
* @param group 组名称
* @return {@link String}
*/
public String createGroup(String key, String group) {
return redisTemplate.opsForStream().createGroup(key, group);
}
/**
* 获取消费者信息
*
* @param key 键名称
* @param group 组名称
* @return {@link StreamInfo.XInfoConsumers}
*/
public StreamInfo.XInfoConsumers queryConsumers(String key, String group) {
return redisTemplate.opsForStream().consumers(key, group);
}
/**
* 查询组信息
*
* @param key 键名称
* @return
*/
public StreamInfo.XInfoGroups queryGroups(String key) {
return redisTemplate.opsForStream().groups(key);
}
/**
* 添加Map消息
* @param key
* @param value
*/
public String addMap(String key, Map<String, Object> value) {
return redisTemplate.opsForStream().add(key, value).getValue();
}
/**
* 读取消息
* @param key
*/
public List<MapRecord<String, Object, Object>> read(String key) {
return redisTemplate.opsForStream().read(StreamOffset.fromStart(key));
}
/**
* 确认消费
* @param key
* @param group
* @param recordIds
*/
public Long ack(String key, String group, String... recordIds) {
return redisTemplate.opsForStream().acknowledge(key, group, recordIds);
}
/**
* 删除消息
* 当一个节点的所有消息都被删除,那么该节点会自动销毁
* @param key
* @param recordIds
*/
public Long del(String key, String... recordIds) {
return redisTemplate.opsForStream().delete(key, recordIds);
}
/**
* 判断是否存在key
* @param key
*/
public boolean hasKey(String key) {
Boolean flag= redisTemplate.hasKey(key);
return flag != null && flag;
}
}
注意:会有循环依赖的问题,如果没有那就是springboot版本太低,低版本默认是开启允许循环依赖的,高版本默认不允许(2.7已经不允许了 具体版本不记得了)
解决方法1: 在yml配置里面允许循环依赖
server:
port: 8586
spring:
application:
name: springboot3-demo
data:
redis:
port: 6579
host: 192.168.1.1
password: xxxxxxx
database: 1
lettuce:
pool:
max-wait: 5000ms
max-active: 1000
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/test?characterEncoding=utf8&serverTimezone=UTC&rewriteBatchedStatements=true
type: com.alibaba.druid.pool.DruidDataSource
username: root
password: root
# 允许循环依赖
main:
allow-circular-references: true
解决方法2:该工具类不交给spring托管 代码如下图所示
在spring bean初始化的时候 把redisTemplate bean赋值到工具类即可,工具类方法变成静态方法
配置
redis mq config
以下代码展示了如何配置多个生产者,也是这个代码最难写。
package com.qiuhuanhen.springboot3demo.redis.config;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qiuhuanhen.springboot3demo.redis.RedisStreamUtil;
import com.qiuhuanhen.springboot3demo.redis.consumer.RedisConsumer;
import com.qiuhuanhen.springboot3demo.redis.consumer.listener.RedisConsumersListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisServerCommands;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@Slf4j
public class RedisConfig{
@Autowired
private RedisStreamUtil redisStreamUtil;
@Autowired
private ThreadPoolExecutor threadPoolExecutor;
@Autowired
private Map<String, RedisConsumer> redisConsumer;
/**
* redis序列化
*
* @param redisConnectionFactory
* @return {@code RedisTemplate<String, Object>}
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.activateDefaultTyping(om.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(om,Object.class);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
@Bean
public List<Subscription> subscriptions(RedisConnectionFactory factory) {
List<Subscription> subscriptions = new ArrayList<>();
subscriptions.add( createSubscription(factory, "orderStream", "orderGroup", "orderConsumer"));
subscriptions.add( createSubscription(factory, "productStream", "productGroup", "productConsumer"));
return subscriptions;
}
/**
* @param factory
* @param streamName 类似 topic
* @param groupName 消费组是 Redis Streams 中的一个重要特性,它允许多个消费者协作消费同一个流中的消息。每个消费组可以有多个消费者。
* @param consumerName 这是消费组中的具体消费者名称。每个消费者会从消费组中领取消息进行处理。
* @return
*/
private Subscription createSubscription(RedisConnectionFactory factory, String streamName, String groupName, String consumerName) {
initStream(streamName, groupName);
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
// 每次从Redis Stream中读取消息的最大条数 (32为rocketmq的pullBatchSize默认数量)
.batchSize(32)
.executor(threadPoolExecutor)
// 轮询拉取消息的时间 (如果流中没有消息,它会等待这么久的时间,然后再次检查。)
.pollTimeout(Duration.ofSeconds(1))
.errorHandler(throwable -> {
log.error("[redis MQ handler exception]", throwable);
throwable.printStackTrace();
})
.build();
var listenerContainer = StreamMessageListenerContainer.create(factory, options);
// 手动ask消息
// Subscription subscription = listenerContainer.receive(Consumer.from(groupName, consumerName),
// // 创建一个流的偏移量实例。 含义: 指定从哪个偏移量开始读取消息。ReadOffset.lastConsumed()表示从上次消费的位置开始。
// StreamOffset.create(streamName, ReadOffset.lastConsumed()), redisConsumersListener);
// 自动ask消息
// Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(groupName, consumerName),
// StreamOffset.create(streamName, ReadOffset.lastConsumed()), redisConsumersListener);
// 手动创建 核心在于 cancelOnError(t -> false) 出现异常不退出
StreamMessageListenerContainer.ConsumerStreamReadRequest<String> build = StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset.create(streamName, ReadOffset.lastConsumed()))
.consumer(Consumer.from(groupName, consumerName))
.autoAcknowledge(false)
// 重要!
.cancelOnError(t -> false).build();
Subscription subscription = listenerContainer.register(build, new RedisConsumersListener(redisStreamUtil));
listenerContainer.start();
return subscription;
}
/**
* 初始化流 保证stream流程是正常的
*
* @param key
* @param group
*/
private void initStream(String key, String group) {
boolean hasKey = redisStreamUtil.hasKey(key);
if (!hasKey) {
Map<String, Object> map = new HashMap<>(1);
map.put("field", "value");
//创建主题
String result = redisStreamUtil.addMap(key, map);
//创建消费组
redisStreamUtil.createGroup(key, group);
//将初始化的值删除掉
redisStreamUtil.del(key, result);
log.info("stream:{}-group:{} initialize success", key, group);
}
}
/**
* 校验 Redis 版本号,是否满足最低的版本号要求!
*/
private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {
// 获得 Redis 版本
Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);
String version = MapUtil.getStr(info, "redis_version");
// 校验最低版本必须大于等于 5.0.0
int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));
if (majorVersion < 5) {
throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!", version));
}
}
}
监听器:
(核心是实现StreamListener接口)
@Slf4j
public class RedisConsumersListener implements StreamListener<String, MapRecord<String, String, String>> {
private RedisStreamUtil redisStreamUtil;
public RedisConsumersListener(RedisStreamUtil redisStreamUtil) {
this.redisStreamUtil = redisStreamUtil;
}
/**
* 监听器
*
* @param message
*/
@Override
public void onMessage(MapRecord<String, String, String> message) {
// stream的key值
String streamName = message.getStream();
//消息ID
RecordId recordId = message.getId();
//消息内容
Map<String, String> msg = message.getValue();
// do something 处理 (这里一般是通过设计模式获取实现类方法 统一处理)
//逻辑处理完成后,ack消息,删除消息,group为消费组名称
StreamInfo.XInfoGroups xInfoGroups = redisStreamUtil.queryGroups(streamName);
xInfoGroups.forEach(xInfoGroup -> redisStreamUtil.ack(streamName, xInfoGroup.groupName(), recordId.getValue()));
redisStreamUtil.del(streamName, recordId.getValue());
}
log.info("【streamName】= " + streamName + ",【recordId】= " + recordId + ",【msg】=" + msg);
}
}
感兴趣可以看博主踩到的坑, 看完思路才能自行判断 代码是否能直接复制使用 (个人感觉这才是分析技术最精彩的地方 有正确的思路才能在使用新技术时披荆斩棘); 不感兴趣可以直接跳到下一目录
===== ====== ====== 踩坑start ===== ==== ===== =====
一开始使用的是receive方法 (被注释的部分)
// 手动ask消息
// Subscription subscription = listenerContainer.receive(Consumer.from(groupName, consumerName),
// // 创建一个流的偏移量实例。 含义: 指定从哪个偏移量开始读取消息。ReadOffset.lastConsumed()表示从上次消费的位置开始。
// StreamOffset.create(streamName, ReadOffset.lastConsumed()), redisConsumersListener);
这也是网上使用最多的方法,通过方法名我们可以判断出 receiveAutoAck是会自动ack的,不出异常还好,那如果出现异常呢 如何ack? 所以我们肯定是要手动控制的。
我们可以看看源码 它们的差异:
是的,就是一个是否自动ack的差别。
既然引入了消息队列,那说明数据量是比较大的,所以肯定是需要考虑异常情况下 消息不能丢失的,于是博主在消费时,故意编写了异常模拟不触发ack的场景. 结果发现 一旦消费出现异常 没有ack时,pending list不再新增数据,在项目重启后数据又增加了,但是再次消息异常时 pending list又阻塞了,这种现象非常奇怪! 难道一个消息没ack redis stream就阻塞吗?这显然不符合设计。 反复思考后,看起来像是出现异常后就停止了轮询,这个mq就像极了是一次性的。
但是和轮询相关的 也就一个pollTimeout参数,它能掀起多大的火花呢?
于是继续看代码 配置redis mq时,都有哪些api. 使用receive方法后 返回的是一个Subscription ,Subscription类有isActive()方法 ,于是在定时器中打印subsciption.isActive() 发现它竟然为false
于是我们追踪这个方法:
追踪到了StreamPollTask类
如果是task类 那么应该会有run方法 ,我们直接在里面搜run()
run方法里面主要就这两个方法
this.pollState.running();
this.doLoop();
第一个running方法 一眼看到头,没什么东西 ;我们看doLoop() 这个方法看起来是循环执行,如果任务中断了 说明是loop出问题了
里面有行代码:
if (this.cancelSubscriptionOnError.test(ex)) {
this.cancel();
}
也就是说在cancelSubscriptionOnError.test为true的时候 会取消执行
还记得isActive()方法吗 它正是去判断该状态的.
通过构造方法 可以看出 该参数是StreamMessageListenerContainer.StreamReadRequest streamRequest 传进来的
StreamMessageListenerContainer.StreamReadRequest在我们查看listenerContainer.receive源码时 有过一面之缘:
我们再看看StreamReadRequest.builder出来的StreamReadRequestBuilder类:
至此,分析完成了闭环,因为receive方法创建出来 默认是遇到异常就取消执行 这明显不符合实际使用,这个设计个人感觉非常欠佳。
这便是为什么使用以下代码来创建的原因
StreamMessageListenerContainer.ConsumerStreamReadRequest<String> build = StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset.create(streamName, ReadOffset.lastConsumed()))
.consumer(Consumer.from(groupName, consumerName))
.autoAcknowledge(false)
// 重要!
.cancelOnError(t -> false).build();
===== ====== ====== 踩坑end ===== ==== ===== =====
定时器
代码比较乱 注释代码比较多的原因 不是因为瞎写,而是那些api 在实际业务中可能会使用到,所以特地写在下面了
// 定期处理 pending list 中的消息
@Scheduled(cron = "0/20 * * * * ?")
public void processPendingMessages() {
String streamKey = "orderStream"; // Redis Stream 的键
String groupName = "orderGroup"; // 消费者组的名称
String consumerName = "orderConsumer"; // 当前消费者的名称
for (Subscription each : subscription) {
System.out.println(each.isActive());
}
StreamOperations<String, String, String> streamOps = redisTemplate.opsForStream();
// 获取 pending list 中未确认的消息概要
PendingMessagesSummary pendingSummary = streamOps.pending(streamKey, groupName);
// 所有pending消息的数量
long totalPendingMessages = pendingSummary.getTotalPendingMessages();
if (pendingSummary.getTotalPendingMessages() == 0L) {
return;
}
// 消费组名称
String groupName1 = pendingSummary.getGroupName();
// pending队列中的最小ID
String minMessageId = pendingSummary.minMessageId();
// pending队列中的最大ID
String maxMessageId = pendingSummary.maxMessageId();
if (pendingSummary.getTotalPendingMessages() > 0) {
// 读取消费者pending队列的前10条记录,从ID=0的记录开始,一直到ID最大值
// PendingMessages pendingMessages = streamOps.pending(streamKey, Consumer.from(groupName, consumerName), Range.closed("0", "+"), 10);
// 获取 pending list 中具体的消息
PendingMessages pendingMessages = streamOps.pending(streamKey, groupName, Range.unbounded(), 10000);
int size = pendingMessages.size();
// 获取当前批次的消息
PendingMessage currentBatchMin = pendingMessages.get(0);
PendingMessage currentBatchMax = pendingMessages.get(size-1);
pendingMessages.forEach(
pendingMessage ->
{
// 消息被获取的次数 可以根据次数做不同业务 超过一定次数未消费 考虑是否要ack并del
long deliveryCount = pendingMessage.getTotalDeliveryCount();
// 读取每个未确认的消息
// List<MapRecord<String,String,String>> messages = streamOps.read(
// StreamReadOptions.empty(),
// StreamOffset.create(streamKey,ReadOffset.lastConsumed())
StreamOffset.create(streamKey,ReadOffset.from("0"))
// );
List<MapRecord<String, String, String>> messages = streamOps.range(streamKey, Range.closed(currentBatchMin.getId().toString(), currentBatchMax.getId().toString()), Limit.limit().count(10000));
for (MapRecord<String, String, String> message : messages) {
try {
// 处理消息
processMessage(message);
// 成功处理后确认消息
streamOps.acknowledge(streamKey, groupName, message.getId());
streamOps.delete(streamKey, message.getId());
} catch (Exception e) {
// 处理异常情况
e.printStackTrace();
}
}
}
);
}
}
至于如何触发就比较简单了,往redis添加一个streamKey即可
@GetMapping("/stream")
public String testStream() {
String mystream = "";
for (int i = 0; i < 10; i++) {
Oper oper = new Oper();
oper.setTestId(11111111L);
oper.setTestDesc("订单消息队列");
oper.setVersion(i);
oper.setTestXxx(LocalDateTime.now().toString());
Map<String, Object> map = new HashMap<>();
map.put("oper", oper);
try {
Thread.sleep(10);
mystream = redisStreamUtil.addMap("orderStream", map);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return String.valueOf(mystream);
}
优化方向
-
建立一个消费者抽象类,定义消费方法
-
建议一个降级处理抽象类,定义补偿方法(即消费失败时的处理)
-
定义spring的properties类 把生产者消费者字段写到里面
-
redis需要部署集群,可在博主的主页搜索哨兵,有哨兵架构教程。
-
实际业务中,消费消息很可能是存入数据库,在入库完成之后 redis ack完成之前,如果这一瞬间突然宕机了,而数据量又非常大,可能会导致消费重复的情况,因为没有完成ack 下次还是会把该数据从pending list里面取出来。
解决方案1 :考虑是加redisson锁
解决方案2:数据库存入消息id字段并建立唯一索引
(唯一索引的魅力体现出来了)
至此,一份生产级别的redis stream mq架构成立。