综述
我们先看看消息队列的消息存取到底有哪些需求吧:
需求1:消息保序:由于消费者是异步处理消息,但是消费者需要按照生产者发送消息的顺序来处理消息,避免后发送的消息被先处理了。
需求2:重复消息处理:有时候可能因为网络堵塞出现消息重传的情况,消费者需要保证幂等性。换句话说,对于同一条消息,消费者收到一次的处理结果和收到多次的处理结果是一致的。
需求3:消息可靠性保证:有时候消费者在处理消息时可能出现因故障或宕机导致消息没有处理完成的情况,因此消息队列需要提供消息可靠性的保证。换句话说,当消费者重启之后,可以重新读取消息并再次进行处理。
消息队列的目的是在分布式的系统间进行通信,通信方式是通过网络传输的方式,通过网络传输就有消息丢失、重复发送消息、不同消息到达顺序混乱的可能。
所以消息队列应用的三大需求是:1、消息保序;2、重复消息处理;3、消息可靠性保证。
对应处理方案是:1、消息数据有序存取;2、消息数据具有全局唯一编号;3、消息数据在未消费完宕机恢复时继续消费,消费完成后被删除。
Redis如何实现MQ的需求?
总体来说,Redis提供了两种解决方案:一是基于List的消息队列解决方案,另一种则是基于Stream的消息队列解决方案。
基于List的MQ解决方案
首先,对于需求1-消息保序,List是按照先进先出的顺序对数据进行存取的,因此使用List作为消息队列保存消息可以满足需求1。具体实现就是LPUSH+RPOP/BRPOP。
潜在风险点:即使没有新消息写入List,消费者也需要不停地调用RPOP命令,这就会导致消费者程序的CPU一直消耗在执行RPOP命令上,带来不必要的性能损失。因此,Redis提供了BRPOP命令,提供阻塞式读取,即在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。
其次,对于需求2-重复消息处理,List本身是不会为每个消息生成ID号的,所以,消息的全局唯一ID号就需要生产者程序在发送消息前自动生成,生成之后,在LPUSH时需要把这个全局唯一ID包含进去。
例如:将一条全局 ID 为 101030001、库存量为 5 的消息插入Redis消息队列
LPUSH mq "101030001:stock:5"
(integer) 1
最后,对于需求3-消息可靠性保证,List本身在读取一条消息后就不会再留存这条消息了,所以为了留存消息,List提供了BRPOPLPUSH命令,即让消费者程序从一个List中读取消息,同时再把这个消息插入到另一个List(可以理解为备份List)留存。这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。
综上所述,基于List可以满足对MQ的三大需求,但是,使用List还是存在一个问题:生产者消息发送很快,但是消费者处理消息的速度很慢,这就可能会导致List中的消息越积越多,给Redis的内存带来很大的压力。在Redis 5.0开始,提供了Stream数据类型,它支持多个消费者程序组成一个消费组,一起分担消息处理压力。
基于Stream的MQ解决方案
Streams 是 Redis 专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令。
(1)XADD
插入消息,保证有序,可以自动生成全局唯一ID
(2)XREAD
读取消息,可以按ID读取数据
(3)XREADGROUP
按消费组形式读取消息
(4)XPENDING + XACK
XPENDING用来查询每个消费组内所有消费者已读取但尚未确认的消息
XACK用来向消息队列确认消息处理已完成
下面一一讲解各个核心命令操作:
XADD
XADD 命令可以往消息队列中插入新消息,消息的格式是键 - 值对形式。对于插入的每一条消息,Stream 可以自动为其生成一个全局唯一的 ID。
例如:往一个名为mqstream的队列中插入一条消息,key=repo, value=5,中间的*表示让Redis为插入的数据自动生成一个全局唯一的ID号,这里是1599203861727-0,它的格式是当前服务器时间(精确到毫秒)+序号。
XADD mqstream * repo 5
"1599203861727-0"
XREAD
XREAD 在读取消息时,可以指定一个消息 ID,并从这个消息 ID 的下一条消息开始进行读取。
例如:从 ID 号为 1599203861727-0 的消息开始,读取后续的所有消息(示例中一共 3 条),下面命令中的block配置项代表XREAD的阻塞时间,即当消息队列中没有消息时,XREAD就会阻塞指定的毫秒数。
XREAD BLOCK 100 STREAMS mqstream 1599203861727-0
1) 1) "mqstream"
2) 1) 1) "1599274912765-0"
2) 1) "repo"
2) "3"
2) 1) "1599274925823-0"
2) 1) "repo"
2) "2"
3) 1) "1599274927910-0"
2) 1) "repo"
2) "1"
此外,也可以直接在XREAD后跟"$"符号,代表读取最新的消息。下面命令中的 XREAD 执行后,消息队列 mqstream 中一直没有消息,所以,XREAD 在 1 秒后返回了空值(nil)。
XREAD block 1000 streams mqstream $
(nil)
(1.00s)
XREADGROUP
Stream 使用 XGROUP 创建消费组,创建消费组之后,Stream 可以使用 XREADGROUP 命令让消费组内的消费者读取消息。
例如:使用下面的XGROUP命令创建一个消费组group1,这个消费组的消息队列是mqstream。
XGROUP create mqstream group1 0
OK
然后,再用下面的XREADGROUP命令来让消费组group1中的消费者consumer1从mqstream消息队列中读取所有消息。其中,命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取。
XREADGROUP group group1 consumer1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599203861727-0"
2) 1) "repo"
2) "5"
2) 1) "1599274912765-0"
2) 1) "repo"
2) "3"
3) 1) "1599274925823-0"
2) 1) "repo"
2) "2"
4) 1) "1599274927910-0"
2) 1) "repo"
2) "1"
注意事项:当消息队列(上面例子是mqstream)中的消息被消息组中的其中一个消费者读取消费处理之后,就不能再被消费组中的其他消费者读取了,这时其他消费者再执行相同的XREADGROUP命令时,读到的就是空值。
XREADGROUP group group1 consumer2 streams mqstream 0
1) 1) "mqstream"
2) (empty list or set)
此外,使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的,比如我们可以使用count N命令来让各个消费者各自读取N条消息。
XREADGROUP group group2 consumer1 count 1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599203861727-0"
2) 1) "repo"
2) "5"
XREADGROUP group group2 consumer2 count 1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599274912765-0"
2) 1) "repo"
2) "3"
XREADGROUP group group2 consumer3 count 1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599274925823-0"
2) 1) "repo"
2) "2"
使用扩展:如果一个生产者发送给消息队列的消息,需要被多个消费者进行读取和处理(例如,一个消息是一条从业务系统采集的数据,既要被消费者 1 读取进行实时计算,也要被消费者 2 读取并留存到分布式文件系统 HDFS 中,以便后续进行历史查询)。这时我们可以基于Stream类型创建多个消费者组,实现同时消费生产者的数据。每个消费者组内可以再挂多个消费者分担读取消息进行消费,消费完成后,各自向Redis发送XACK,标记自己的消费组已经消费到了哪个位置,而且消费组之间互不影响。另外,Redis 基于字典和链表数据结构,实现了发布和订阅功能,这个功能可以实现一个消息被多个消费者消费使用,也可以满足上面问题中的场景需求。
XPENDING + XACK
Stream 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Stream “消息已经处理完成”,这时 Stream 就会抢这个消息移除掉。
如果消费者没有成功处理消息,它就不会给 Stream 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。
例如:查看消费组group2中各个消费者已读取但尚未确认的消息数量,其中 XPENDING 返回结果的第二、三行分别表示 group2 中所有消费者读取的消息最小 ID 和最大 ID。
XPENDING mqstream group2
1) (integer) 3
2) "1599203861727-0"
3) "1599274925823-0"
4) 1) 1) "consumer1"
2) "1"
2) 1) "consumer2"
2) "1"
3) 1) "consumer3"
2) "1"
当某个消费者例如group消费了某个消息之后,就可以用XACK命令通知Stream从PENDING LIST中移除这条消息。
XACK mqstream group2 1599274912765-0
(integer) 1
// 下面的命令可以查看某个消费者具体读取了哪些数据
XPENDING mqstream group2 - + 10 consumer2
(empty list or set)
List 与 Stream 方案的对比
一图胜千言:
实际使用
// 生产者
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Map;
import static com.nageoffer.shortlink.project.common.constant.RedisKeyConstant.SHORT_LINK_STATS_STREAM_TOPIC_KEY;
/**
* 短链接监控状态保存消息队列生产者
*/
@Component
@RequiredArgsConstructor
public class ShortLinkStatsSaveProducer {
private final StringRedisTemplate stringRedisTemplate;
/**
* 发送延迟消费短链接统计
*/
public void send(Map<String, String> producerMap) {
stringRedisTemplate.opsForStream().add(SHORT_LINK_STATS_STREAM_TOPIC_KEY, producerMap);
}
}
// 消费者
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.Week;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.nageoffer.shortlink.project.common.convention.exception.ServiceException;
import com.nageoffer.shortlink.project.dao.entity.LinkAccessLogsDO;
import com.nageoffer.shortlink.project.dao.entity.LinkAccessStatsDO;
import com.nageoffer.shortlink.project.dao.entity.LinkBrowserStatsDO;
import com.nageoffer.shortlink.project.dao.entity.LinkDeviceStatsDO;
import com.nageoffer.shortlink.project.dao.entity.LinkLocaleStatsDO;
import com.nageoffer.shortlink.project.dao.entity.LinkNetworkStatsDO;
import com.nageoffer.shortlink.project.dao.entity.LinkOsStatsDO;
import com.nageoffer.shortlink.project.dao.entity.LinkStatsTodayDO;
import com.nageoffer.shortlink.project.dao.entity.ShortLinkGotoDO;
import com.nageoffer.shortlink.project.dao.mapper.LinkAccessLogsMapper;
import com.nageoffer.shortlink.project.dao.mapper.LinkAccessStatsMapper;
import com.nageoffer.shortlink.project.dao.mapper.LinkBrowserStatsMapper;
import com.nageoffer.shortlink.project.dao.mapper.LinkDeviceStatsMapper;
import com.nageoffer.shortlink.project.dao.mapper.LinkLocaleStatsMapper;
import com.nageoffer.shortlink.project.dao.mapper.LinkNetworkStatsMapper;
import com.nageoffer.shortlink.project.dao.mapper.LinkOsStatsMapper;
import com.nageoffer.shortlink.project.dao.mapper.LinkStatsTodayMapper;
import com.nageoffer.shortlink.project.dao.mapper.ShortLinkGotoMapper;
import com.nageoffer.shortlink.project.dao.mapper.ShortLinkMapper;
import com.nageoffer.shortlink.project.dto.biz.ShortLinkStatsRecordDTO;
import com.nageoffer.shortlink.project.mq.idempotent.MessageQueueIdempotentHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static com.nageoffer.shortlink.project.common.constant.RedisKeyConstant.LOCK_GID_UPDATE_KEY;
import static com.nageoffer.shortlink.project.common.constant.ShortLinkConstant.AMAP_REMOTE_URL;
/**
* 短链接监控状态保存消息队列消费者
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ShortLinkStatsSaveConsumer implements StreamListener<String, MapRecord<String, String, String>> {
private final ShortLinkMapper shortLinkMapper;
private final ShortLinkGotoMapper shortLinkGotoMapper;
private final RedissonClient redissonClient;
private final LinkAccessStatsMapper linkAccessStatsMapper;
private final LinkLocaleStatsMapper linkLocaleStatsMapper;
private final LinkOsStatsMapper linkOsStatsMapper;
private final LinkBrowserStatsMapper linkBrowserStatsMapper;
private final LinkAccessLogsMapper linkAccessLogsMapper;
private final LinkDeviceStatsMapper linkDeviceStatsMapper;
private final LinkNetworkStatsMapper linkNetworkStatsMapper;
private final LinkStatsTodayMapper linkStatsTodayMapper;
private final StringRedisTemplate stringRedisTemplate;
private final MessageQueueIdempotentHandler messageQueueIdempotentHandler;
@Value("${short-link.stats.locale.amap-key}")
private String statsLocaleAmapKey;
@Override
public void onMessage(MapRecord<String, String, String> message) {
String stream = message.getStream();
RecordId id = message.getId();
if (!messageQueueIdempotentHandler.isMessageProcessed(id.toString())) {
// 判断当前的这个消息流程是否执行完成
if (messageQueueIdempotentHandler.isAccomplish(id.toString())) {
return;
}
throw new ServiceException("消息未完成流程,需要消息队列重试");
}
try {
Map<String, String> producerMap = message.getValue();
ShortLinkStatsRecordDTO statsRecord = JSON.parseObject(producerMap.get("statsRecord"), ShortLinkStatsRecordDTO.class);
actualSaveShortLinkStats(statsRecord);
stringRedisTemplate.opsForStream().delete(Objects.requireNonNull(stream), id.getValue());
} catch (Throwable ex) {
// 某某某情况宕机了
messageQueueIdempotentHandler.delMessageProcessed(id.toString());
log.error("记录短链接监控消费异常", ex);
throw ex;
}
messageQueueIdempotentHandler.setAccomplish(id.toString());
}
public void actualSaveShortLinkStats(ShortLinkStatsRecordDTO statsRecord) {
String fullShortUrl = statsRecord.getFullShortUrl();
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(String.format(LOCK_GID_UPDATE_KEY, fullShortUrl));
RLock rLock = readWriteLock.readLock();
rLock.lock();
try {
LambdaQueryWrapper<ShortLinkGotoDO> queryWrapper = Wrappers.lambdaQuery(ShortLinkGotoDO.class)
.eq(ShortLinkGotoDO::getFullShortUrl, fullShortUrl);
ShortLinkGotoDO shortLinkGotoDO = shortLinkGotoMapper.selectOne(queryWrapper);
String gid = shortLinkGotoDO.getGid();
int hour = DateUtil.hour(new Date(), true);
Week week = DateUtil.dayOfWeekEnum(new Date());
int weekValue = week.getIso8601Value();
LinkAccessStatsDO linkAccessStatsDO = LinkAccessStatsDO.builder()
.pv(1)
.uv(statsRecord.getUvFirstFlag() ? 1 : 0)
.uip(statsRecord.getUipFirstFlag() ? 1 : 0)
.hour(hour)
.weekday(weekValue)
.fullShortUrl(fullShortUrl)
.date(new Date())
.build();
linkAccessStatsMapper.shortLinkStats(linkAccessStatsDO);
Map<String, Object> localeParamMap = new HashMap<>();
localeParamMap.put("key", statsLocaleAmapKey);
localeParamMap.put("ip", statsRecord.getRemoteAddr());
String localeResultStr = HttpUtil.get(AMAP_REMOTE_URL, localeParamMap);
JSONObject localeResultObj = JSON.parseObject(localeResultStr);
String infoCode = localeResultObj.getString("infocode");
String actualProvince = "未知";
String actualCity = "未知";
if (StrUtil.isNotBlank(infoCode) && StrUtil.equals(infoCode, "10000")) {
String province = localeResultObj.getString("province");
boolean unknownFlag = StrUtil.equals(province, "[]");
LinkLocaleStatsDO linkLocaleStatsDO = LinkLocaleStatsDO.builder()
.province(actualProvince = unknownFlag ? actualProvince : province)
.city(actualCity = unknownFlag ? actualCity : localeResultObj.getString("city"))
.adcode(unknownFlag ? "未知" : localeResultObj.getString("adcode"))
.cnt(1)
.fullShortUrl(fullShortUrl)
.country("中国")
.date(new Date())
.build();
linkLocaleStatsMapper.shortLinkLocaleState(linkLocaleStatsDO);
}
LinkOsStatsDO linkOsStatsDO = LinkOsStatsDO.builder()
.os(statsRecord.getOs())
.cnt(1)
.fullShortUrl(fullShortUrl)
.date(new Date())
.build();
linkOsStatsMapper.shortLinkOsState(linkOsStatsDO);
LinkBrowserStatsDO linkBrowserStatsDO = LinkBrowserStatsDO.builder()
.browser(statsRecord.getBrowser())
.cnt(1)
.fullShortUrl(fullShortUrl)
.date(new Date())
.build();
linkBrowserStatsMapper.shortLinkBrowserState(linkBrowserStatsDO);
LinkDeviceStatsDO linkDeviceStatsDO = LinkDeviceStatsDO.builder()
.device(statsRecord.getDevice())
.cnt(1)
.fullShortUrl(fullShortUrl)
.date(new Date())
.build();
linkDeviceStatsMapper.shortLinkDeviceState(linkDeviceStatsDO);
LinkNetworkStatsDO linkNetworkStatsDO = LinkNetworkStatsDO.builder()
.network(statsRecord.getNetwork())
.cnt(1)
.fullShortUrl(fullShortUrl)
.date(new Date())
.build();
linkNetworkStatsMapper.shortLinkNetworkState(linkNetworkStatsDO);
LinkAccessLogsDO linkAccessLogsDO = LinkAccessLogsDO.builder()
.user(statsRecord.getUv())
.ip(statsRecord.getRemoteAddr())
.browser(statsRecord.getBrowser())
.os(statsRecord.getOs())
.network(statsRecord.getNetwork())
.device(statsRecord.getDevice())
.locale(StrUtil.join("-", "中国", actualProvince, actualCity))
.fullShortUrl(fullShortUrl)
.build();
linkAccessLogsMapper.insert(linkAccessLogsDO);
shortLinkMapper.incrementStats(gid, fullShortUrl, 1, statsRecord.getUvFirstFlag() ? 1 : 0, statsRecord.getUipFirstFlag() ? 1 : 0);
LinkStatsTodayDO linkStatsTodayDO = LinkStatsTodayDO.builder()
.todayPv(1)
.todayUv(statsRecord.getUvFirstFlag() ? 1 : 0)
.todayUip(statsRecord.getUipFirstFlag() ? 1 : 0)
.fullShortUrl(fullShortUrl)
.date(new Date())
.build();
linkStatsTodayMapper.shortLinkTodayState(linkStatsTodayDO);
} catch (Throwable ex) {
log.error("短链接访问量统计异常", ex);
} finally {
rLock.unlock();
}
}
}