66 消息队列
基础概念
参考资料:消息队列MQ快速入门(概念、RPC、MQ实质思路、队列介绍、队列对比、应用场景)
- 消息队列就是一个使用队列来通信的组件;
- 为什么需要消息队列?
在实际的商业项目中,它这么做肯定是有道理的。那么没有引入消息队列之前服务存在哪些问题呢?就拿支付服务来说,你提交了一个支付订单后,后台需要进行扣库存、扣款、短信通知等等,你需要等待后台把所有该做的做完了才能知道自己有没有购买成功,用户等等时间过长,后台请求链太多,很多业务不需要马上做完,比如短信通知等等,这些响应速度对于业务来说无关紧要,所以就提出了异步处理。
异步处理就是指我现在不做这个工作,我把这个工作丢给箱子里,有人会来这个箱子里找属于它的工作,我丢完我的工作就做完了,就可以给用户响应了,解耦,异步,提高性能。
应用在:服务解耦、流量控制,有好处也有坏处,坏处就是服务的稳定性降低,人多就不好控制,系统也一样。
消息队列具有两种模型:队列模型和发布/订阅模型。这两个模型简单的来说就是:队列模型即一条消息只能被一个消费者消费、发布订阅模型即一条消息可以被多个消费者消费。
其设计模式就是一发一存一消费,生产者——消费者模型。
五种队列
简单队列
一言以蔽之:简单队列——一个消息对应一个消费者
工作队列
一个生产者对应多个消费者,但是只能有一个消费者获得消息!!!
竞争消费者模式。
这条消息具体会被哪个消费者消费事先并不知。
如何分发消息使之最大限度的发挥每一个消费者的效率——负载均衡。
发布/订阅模型
一个消费者将消息首先发送到交换器,交换器绑定到多个队列,然后被监听该队列的消费者所接收并消费。
ps:X表示交换器,在RabbitMQ中,交换器主要有四种类型:direct、fanout、topic、headers,这里的交换器是 fanout。下面我们会详细介绍这几种交换器。
路由模式
生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。
也就是让消费者有选择性的接收消息。
主题模式
上面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配。
符号“#”表示匹配一个或多个词,符号“*”表示匹配一个词。
交换机
前面五种队列模式介绍完了,但是实际上只有三种,第一种简单队列,第二种工作模式,剩下的三种都是和交换器绑定的合起来称为一种,这小节我们就来详细介绍交换器。
交换器分为四种,分别是:direct、fanout、topic和 headers。
前面三种分别对应路由模式、发布订阅模式和通配符模式,headers 交换器允许匹配 AMQP 消息的 header 而非路由键,除此之外,header 交换器和 direct 交换器完全一致,但是性能却差很多,因此基本上不会用到该交换器,这里也不详细介绍。
①、direct
如果路由键完全匹配的话,消息才会被投放到相应的队列。
②、fanout
当发送一条消息到fanout交换器上时,它会把消息投放到所有附加在此交换器上的队列。
③、topic
设置模糊的绑定方式,“*”操作符将“.”视为分隔符,匹配单个字符;“#”操作符没有分块的概念,它将任意“.”均视为关键字的匹配部分,能够匹配多个字符。
常用6种消息队列介绍和对比
RabbitMQ
RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
ZeroMQ
号称史上最快的消息队列,它实际类似于Socket的一系列接口,他跟Socket的区别是:普通的socket是端到端的(1:1的关系),而ZMQ却是可以N:M 的关系,人们对BSD套接字的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而ZMQ屏蔽了这些细节,让你的网络编程更为简单。ZMQ用于node与node间的通信,node可以是主机或者是进程。
Kafka
特征
- 分布式消息发布订阅系统,其分区特性、可复制和可容错都是其不错的特性
- 快速持久化,可在O(1)的系统开销下进行消息持久化
- 高吞吐,在一台普通的服务器上就可以达到10W/s的吞吐率
- 完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式、自动实现负载均衡
- 支持同步和异步复制两种
- 支持数据批量发送和拉取
- zero-copy:减少IO操作步骤
- 数据迁移、扩容对用户透明
- 无需停机即可扩展机器
- 其他特性:严格的消息顺序、丰富的消息拉取机制、高效订阅者水平扩展、实时的消息订阅、亿级的消息堆积能力、定期删除机制
优点
- 客户端语言丰富
- 性能卓越、单机写入TPS约在百万级/秒,消息大小为10个字节
- 提供完全分布式架构,并有replica机制,拥有较高的可用性和可靠性,理论上支持消息无限堆积
- 支持批量操作
- 消费者采用Pull方式获取消息,消息有序,通过控制能够保证所有消息被消费且仅被消费一次
- 有优秀的第三方Kafka Web管理界面kafka-Manager
- 在日志领域比较成熟,被多家公司和多个开源项目使用
缺点
- Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长
- 使用短轮询方式,实时性取决于轮询间隔时间
- 消费失败不支持重试
- 支持消息顺序,但是一台代理宕机后,就会产生消息乱序
RocketMQ
特征
- 具有高性能、高可靠、高实时、分布式特点
- Producer、Consumer、队列都可以分布式
- Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合
- 能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
- 较少的依赖
- 可以运行在Java语言所支持的平台之上
优点
- 单机支持1万以上持久化队列
- 所有消息都是持久化的,先写入系统PageCache,然后刷盘,可以保证内存和磁盘都有一份数据,访问时,直接从内存取
- 模型简单,接口易用
- 性能优越,可以大量堆积消息在broker中
- 支持多种消费,包括集群消费,广播消费等
- 各个环节分布式扩展设计
- 开发都较活跃,版本更新快
缺点
- 没有web管理界面,提供了一个CLI管理工具来查询、管理和诊断各种问题
- 没有在MQ核心去实现JMS等接口
选择
- ActiveMQ,最早的时候大家都用,但现在用的不是很多了,没经过大规模吞吐量场景的验证,社区不是很活跃,主流上不选择这个
- RabbitMQ较为成熟一些,在可用性、稳定性、可靠性上,RabbitMQ都要超过kafka,综合性能不错,但是erlang语言阻止了大量java工程师深入研究,且不支持事务,消息吞吐能力有限
- Kafka的性能是比RabbitMQ要更强的,RabbitMQ在有大量的消息堆积时,性能会下降,而Kafka不会,但是Kafka的设计初衷是处理日志的,可以看做一个日志系统,针对性非常强,没有具备一个成熟MQ应该具备的特性,它还是个孩子啊
- RocketMQ的思路起源于Kafka,但它对消息的可靠传输及事务性做了优化,适合一些大规模的分布式系统应用,但是生态不够成熟,会有黄掉的风险
- ZeroMQ只是一个网络编程的Pattern库,将常见的网络请求形式(分组管理、链接管理、发布订阅等)进行模式化、组件化。简单来说就是在socket之上、MQ之下。使用ZeroMQ的话,需要对自己的业务代码进行改造,不利于服务解耦
基于Netty实现huiMQ自定义消息队列
Netty服务端将会作为huiMQ,这里先搭一个Netty服务端与Netty客户端,具体的代码请参考:68 Netty
具体项目代码请查看:点击查看
生产者
发送消息队列
所有需要发送的消息都会被统一存在一个队列中,然后由一个线程来对这个队列中的消息发送到Netty服务端中。但是这里会存在消息生产者发送到消息队列时失败,从而导致消息丢失,所以为了保证消息不丢失,在该线程发送后,会把这条消息暂时存放在一个Map中,等到消息队列发送确认响应后才会把这条消息消除,如果超过2秒还未回复,就会把这个消息重新放回待发送消息队列中,同时把重传次数加一,如果重传次数大于5次,就会写入日志。
我们待发送消息队列选择BlockingQueue,该队列是线程安全的,阻塞队列(BlockingQueue) 是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
// 采用阻塞队列保证线程的安全 以便免多线程的情况下导致数据丢失 同时阻塞队列也可以当队列满的时候阻塞线程让其之后再重新添加运行
private static final BlockingQueue<MessageBase.Message> blockingQueue = new ArrayBlockingQueue<MessageBase.Message>(1024);
// 等待确认的消息Map
private static final ConcurrentHashMap<String, MessageBase.Message> waitAckMessageMap = new ConcurrentHashMap<>();
// 等待确认消息超时Map
private static final ConcurrentHashMap<String, LocalDateTime> messageTimeOutMap = new ConcurrentHashMap<>();
// 超时
private static final Long timeout = 2000L;
// 最多超时重传次数
private static final Integer maxRetries = 5;
private static SocketChannel socketChannel;
发送消息线程
/***
* @Description 构建一个线程来一直对队列中的消息发送到Netty服务端
* @return {@link }
* @Author yaoHui
* @Date 2024/10/11
*/
private void sendMessageByThread(){
new Thread(() -> {
while(true){
try{
log.info("sendMessageByThread ready");
MessageBase.Message message = blockingQueue.take();
log.info("sendMessageByThread working");
sendMessage(message);
} catch (Exception e) {
log.error("sendMessageByThread is interrupt 发送消息线程被中断");
Thread.currentThread().interrupt();
break;
}
}
}).start();
}
/***
* @Description 线程调用 让消息发送到Netty服务端的具体实现逻辑
* @param message
* @return {@link }
* @Author yaoHui
* @Date 2024/10/11
*/
private void sendMessage(MessageBase.Message message){
// 超过最大重传次数
if(message.getRetryCount() > maxRetries){
log.error("消息传输失败次数超过5次:" + message.toString());
}else{
if (socketChannel.isActive()){
socketChannel.writeAndFlush(message);
waitAckMessageMap.put(message.getRequestId(),message);
messageTimeOutMap.put(message.getRequestId(),LocalDateTime.now().plusSeconds(timeout));
}else{
log.info("Netty连接失败,请重试");
addMessageBlockingQueue(message);
}
}
}
确认消息定时任务
/***
* @Description 将超时的消息重新加入队列中重新进行发送
* @return {@link }
* @Author yaoHui
* @Date 2024/10/11
*/
private void messageRetryThread(){
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
Runnable task = () -> {
log.info("messageRetryThread working");
for (Map.Entry<String, MessageBase.Message> entry : waitAckMessageMap.entrySet()){
MessageBase.Message message = entry.getValue();
String key = entry.getKey();
LocalDateTime time = messageTimeOutMap.get(key);
if(time.isBefore(LocalDateTime.now())){
MessageBase.Message newMessage = MessageBase.Message.newBuilder()
.setRequestId(message.getRequestId())
.setCmd(message.getCmd())
.setContent(message.getContent())
.setRetryCount(message.getRetryCount()+1)
.setUrlPath(message.getUrlPath())
.build();
addMessageBlockingQueue(newMessage);
messageTimeOutMap.remove(key);
waitAckMessageMap.remove(key);
}
}
};
// 安排任务在延迟2秒后开始执行,之后每隔3秒执行一次
scheduledExecutorService.scheduleAtFixedRate(task,2,3,TimeUnit.SECONDS);
}
接收Netty服务端ACK消息
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof MessageBase.Message) {
MessageBase.Message message = (MessageBase.Message) msg;
if(message.getCmd() == MessageBase.Message.CommandType.ACK){
SendMessageThread.getAckAndRemoveMessage(message.getRequestId());
log.info("收到ACK:" + message.getRequestId());
}else{
System.out.println("Received response from server:");
System.out.println("ID: " + message.getRequestId());
System.out.println("Content: " + message.getContent());
}
} else {
System.err.println("Received an unknown message type: " + msg.getClass().getName());
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端连接成功");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("客户端发生异常", cause);
ctx.close();
}
}
/***
* @Description 根据RequestId来消除待确认中的消息
* @param key
* @return {@link }
* @Author yaoHui
* @Date 2024/10/11
*/
public static void getAckAndRemoveMessage(String key){
messageTimeOutMap.remove(key);
waitAckMessageMap.remove(key);
}
全部代码
更详细的代码请查看:点击查看
NettyClient
@Component
@Slf4j
public class NettyClient {
private static final EventLoopGroup group = new NioEventLoopGroup();
private static final Integer port = 54021;
private static final String host = "localhost";
private static SocketChannel socketChannel;
private static SendMessageThread sendMessageThread = new SendMessageThread();
/***
* @Description 添加消息到阻塞队列中 为消息生产者调用
* @param message
* @return {@link }
* @Author yaoHui
* @Date 2024/10/11
*/
public void addMessageBlockingQueue(MessageBase.Message message){
if(!socketChannel.isActive()){
this.start();
}
sendMessageThread.addMessageBlockingQueue(message);
}
/***
* @Description 该方法提供获取SocketChannel 暂时无用
* @return {@link SocketChannel }
* @Author yaoHui
* @Date 2024/10/11
*/
public static SocketChannel getSocketChannel() throws Exception {
if (!socketChannel.isActive()) {
return socketChannel;
// socketChannel = socketChannel1;
}
return socketChannel;
}
/***
* @Description 连接断开重新连接
* @return {@link SocketChannel }
* @Author yaoHui
* @Date 2024/10/11
*/
private static SocketChannel retryConnect(){
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(host, port)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// 空闲检测
.addLast(new IdleStateHandler(60, 60, 60)) // 60秒写空闲,30秒读空闲
.addLast(new HeartbeatHandler())
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance()))
.addLast(new ProtobufVarint32LengthFieldPrepender())
.addLast(new ProtobufEncoder())
.addLast(new NettyClientHandler())
; // 自定义处理器
}
});
ChannelFuture future = bootstrap.connect();
if (future.isSuccess()){
return (SocketChannel) future.channel();
}else{
return null;
}
}
/***
* @Description Netty客户端启动函数 调用Start可以启动对Netty服务端的连接
* @return {@link }
* @Author yaoHui
* @Date 2024/10/11
*/
@PostConstruct
private void start(){
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(host, port)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// 空闲检测
.addLast(new IdleStateHandler(60, 60, 60)) // 60秒写空闲,30秒读空闲
.addLast(new HeartbeatHandler())
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance()))
.addLast(new ProtobufVarint32LengthFieldPrepender())
.addLast(new ProtobufEncoder())
.addLast(new NettyClientHandler())
; // 自定义处理器
}
});
ChannelFuture future = bootstrap.connect();
//客户端断线重连逻辑
future.addListener((ChannelFutureListener) future1 -> {
if (future1.isSuccess()) {
log.info("连接Netty服务端成功");
} else {
log.info("连接失败,进行断线重连");
future1.channel().eventLoop().schedule(this::start, 10, TimeUnit.SECONDS);
}
});
socketChannel = (SocketChannel) future.channel();
sendMessageThread.setSocketChannel(socketChannel);
}
}
SendMessageThread
package com.fang.screw.client.Thread;
import com.fang.screw.client.component.NettyClient;
import com.fang.screw.client.protocol.MessageBase;
import io.netty.channel.socket.SocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
/**
* @FileName SendMessageThread
* @Description
* @Author yaoHui
* @date 2024-10-11
**/
@Slf4j
public class SendMessageThread {
// 采用阻塞队列保证线程的安全 以便免多线程的情况下导致数据丢失 同时阻塞队列也可以当队列满的时候阻塞线程让其之后再重新添加运行
private static final BlockingQueue<MessageBase.Message> blockingQueue = new ArrayBlockingQueue<MessageBase.Message>(1024);
// 等待确认的消息Map
private static final ConcurrentHashMap<String, MessageBase.Message> waitAckMessageMap = new ConcurrentHashMap<>();
// 等待确认消息超时Map
private static final ConcurrentHashMap<String, LocalDateTime> messageTimeOutMap = new ConcurrentHashMap<>();
// 超时
private static final Long timeout = 2000L;
// 最多超时重传次数
private static final Integer maxRetries = 5;
private static SocketChannel socketChannel;
public SendMessageThread(){
sendMessageByThread();
messageRetryThread();
}
public SendMessageThread(SocketChannel socketChannel1){
socketChannel = socketChannel1;
}
public void setSocketChannel(SocketChannel socketChannel1){
socketChannel = socketChannel1;
}
public void addMessageBlockingQueue(MessageBase.Message message){
blockingQueue.add(message);
}
/***
* @Description 构建一个线程来一直对队列中的消息发送到Netty服务端
* @return {@link }
* @Author yaoHui
* @Date 2024/10/11
*/
private void sendMessageByThread(){
new Thread(() -> {
while(true){
try{
log.info("sendMessageByThread ready");
MessageBase.Message message = blockingQueue.take();
log.info("sendMessageByThread working");
sendMessage(message);
} catch (Exception e) {
log.error("sendMessageByThread is interrupt 发送消息线程被中断");
Thread.currentThread().interrupt();
break;
}
}
}).start();
}
/***
* @Description 线程调用 让消息发送到Netty服务端的具体实现逻辑
* @param message
* @return {@link }
* @Author yaoHui
* @Date 2024/10/11
*/
private void sendMessage(MessageBase.Message message){
// 超过最大重传次数
if(message.getRetryCount() > maxRetries){
log.error("消息传输失败次数超过5次:" + message.toString());
}else{
if (socketChannel.isActive()){
socketChannel.writeAndFlush(message);
}else{
log.info("Netty连接失败,请重试");
// socketChannel = NettyClient.getSocketChannel();
addMessageBlockingQueue(message);
}
}
}
/***
* @Description 将超时的消息重新加入队列中重新进行发送
* @return {@link }
* @Author yaoHui
* @Date 2024/10/11
*/
private void messageRetryThread(){
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
Runnable task = () -> {
log.info("messageRetryThread working");
for (Map.Entry<String, MessageBase.Message> entry : waitAckMessageMap.entrySet()){
MessageBase.Message message = entry.getValue();
String key = entry.getKey();
LocalDateTime time = messageTimeOutMap.get(key);
if(time.isBefore(LocalDateTime.now())){
MessageBase.Message newMessage = MessageBase.Message.newBuilder()
.setRequestId(message.getRequestId())
.setCmd(message.getCmd())
.setContent(message.getContent())
.setRetryCount(message.getRetryCount()+1)
.setUrlPath(message.getUrlPath())
.build();
addMessageBlockingQueue(newMessage);
messageTimeOutMap.remove(key);
waitAckMessageMap.remove(key);
}
}
};
// 安排任务在延迟2秒后开始执行,之后每隔3秒执行一次
scheduledExecutorService.scheduleAtFixedRate(task,2,3,TimeUnit.SECONDS);
}
/***
* @Description 根据RequestId来消除待确认中的消息
* @param key
* @return {@link }
* @Author yaoHui
* @Date 2024/10/11
*/
public static void getAckAndRemoveMessage(String key){
messageTimeOutMap.remove(key);
waitAckMessageMap.remove(key);
}
}
huiMQ
保存消息
/***
* @Description 保存消息到消息队列中 并且保存到MySQL数据库持久化 注意这里是一个事务 要么都成功要么都别成功
* @param message
* @return {@link boolean }
* @Author yaoHui
* @Date 2024/10/12
*/
@Transactional(rollbackFor = Exception.class)
public boolean saveMessage(MessageBase.Message message){
try{
// 保存到消息队列中
if(!messageQueueMap.containsKey(message.getChannel())){
BlockingQueue<MessageBase.Message> messageBlockingQueue = new ArrayBlockingQueue<MessageBase.Message>(1024);
messageQueueMap.put(message.getChannel(),messageBlockingQueue);
}
messageQueueMap.get(message.getChannel()).add(message);
// 保存到MySQL数据库中
if(!messageQueueMapper.exists(Wrappers.<MessageQueuePO>lambdaQuery()
.eq(MessageQueuePO::getRequestId,message.getRequestId())
.eq(MessageQueuePO::getDelFlag,0))){
MessageQueuePO messageQueuePO = new MessageQueuePO();
messageQueuePO.setRequestId(message.getRequestId());
messageQueuePO.setCmd(message.getCmdValue());
messageQueuePO.setContent(message.getContent());
messageQueuePO.setUrlPath(message.getUrlPath());
messageQueueMapper.insert(messageQueuePO);
log.info("HuiMQ保存消息:" + messageQueuePO.toString());
}
}catch (Exception e){
log.info("保存消息至消息队列错误!");
e.printStackTrace();
return false;
}
return true;
}
收到消费端发来请求消息
// 消费端请求发送消息
// 检查是否有超时的消息 如果有则将其重新放置于待重传的消息队列中
HuiMessageQueue.checkTimeOutMessage();
if(HuiMessageQueue.messageQueueMap.containsKey(message.getChannel())){
BlockingQueue<MessageBase.Message> queue = HuiMessageQueue.messageQueueMap.get(message.getChannel());
while(!queue.isEmpty()){
MessageBase.Message sendMessage = queue.take();
log.info("HuiMQ向消费者发送消息:" + sendMessage.toString());
ctx.writeAndFlush(sendMessage);
// 将消费列为带确认消息
HuiMessageQueue.waitAckMessageMap.put(sendMessage.getRequestId(),sendMessage);
HuiMessageQueue.messageTimeOutMap.put(sendMessage.getRequestId(), LocalDateTime.now().plusSeconds(2L));
}
}
/***
* @Description 检查是否有超时没有收到确认消息的消息 将其重新放置在待发送消息队列中
* @return {@link }
* @Author yaoHui
* @Date 2024/10/13
*/
public static void checkTimeOutMessage(){
for(Map.Entry<String,LocalDateTime> mapEntry : messageTimeOutMap.entrySet()){
LocalDateTime time = mapEntry.getValue();
if(time.isBefore(LocalDateTime.now())){
String s = mapEntry.getKey();
MessageBase.Message message = waitAckMessageMap.get(s);
waitAckMessageMap.remove(s);
messageTimeOutMap.remove(s);
// 保存到消息队列中
if(!messageQueueMap.containsKey(message.getChannel())){
BlockingQueue<MessageBase.Message> messageBlockingQueue = new ArrayBlockingQueue<MessageBase.Message>(1024);
messageQueueMap.put(message.getChannel(),messageBlockingQueue);
}
messageQueueMap.get(message.getChannel()).add(message);
}
}
}
收到消费端ACK消息
log.info("收到消费端发来的ACK报文:" + message.getRequestId());
HuiMessageQueue.setMessageAck(message.getRequestId());
/***
* @Description
* @param requestId
* @return {@link }
* @Author yaoHui
* @Date 2024/10/13
*/
public static void setMessageAck(String requestId){
messageTimeOutMap.remove(requestId);
waitAckMessageMap.remove(requestId);
}
消费者
标记需要监听消息方法
/**
* @FileName HuiListener
* @Description HuiMQ监听注解
* @Author yaoHui
* @date 2024-10-12
**/
@Target(value = ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface HuiListener {
String queueName();
}
获取所有被HuiListener注解标记的方法
通过BeanPostProcessor来获取所有被指定注解标记的方法,BeanPostProcessor会在每个Bean初始化前后调用,分别为postProcessBeforeInitialization和postProcessAfterInitialization。
这里会将所有被HuiListener标记的方法和Bean注册到huiListenerRegistry中,为了方便之后通过反射的方式来直接运行指定方法。
/**
* @FileName HuiListenerAnnotationBeanPostProcessor
* @Description
* @Author yaoHui
* @date 2024-10-12
**/
@Component
public class HuiListenerAnnotationBeanPostProcessor implements BeanPostProcessor, InitializingBean {
private static final HuiListenerRegistry huiListenerRegistry = new HuiListenerRegistry();
public static boolean huiListenerFlag = false;
/***
* @Description 在 bean 的初始化方法(如 @PostConstruct 注解的方法或 init-method 指定的方法)之前调用。
* @param bean
* @param beanName
* @return {@link Object }
* @Author yaoHui
* @Date 2024/10/12
*/
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return BeanPostProcessor.super.postProcessBeforeInitialization(bean, beanName);
}
/***
* @Description 在 bean 的初始化方法之后调用。查看当前的bean是否存在被HuiListener注解过的方法
* @param bean
* @param beanName
* @return {@link Object }
* @Author yaoHui
* @Date 2024/10/12
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Method[] methods = bean.getClass().getMethods();
for(Method method : methods){
if(method.isAnnotationPresent(HuiListener.class)){
processHuiListener(method,bean);
huiListenerFlag = true;
}
}
return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
}
private void processHuiListener(Method method,Object bean){
HuiListener huiListener = method.getAnnotation(HuiListener.class);
HuiListenerEndpoint huiListenerEndpoint = new HuiListenerEndpoint();
huiListenerEndpoint.setBean(bean);
huiListenerEndpoint.setMethod(method);
huiListenerRegistry.registerListenerEndpoint(huiListener.queueName(),huiListenerEndpoint);
}
@Override
public void afterPropertiesSet() throws Exception {
}
}
/**
* @FileName HuiListenerEndpoint
* @Description
* @Author yaoHui
* @date 2024-10-12
**/
@Data
public class HuiListenerEndpoint {
private Method method;
private Object bean;
}
/**
* @FileName HuiListenerRegistry
* @Description
* @Author yaoHui
* @date 2024-10-12
**/
@Slf4j
public class HuiListenerRegistry {
public static final ConcurrentHashMap<String,HuiListenerEndpoint> huiListenerEndpointConcurrentHashMap = new ConcurrentHashMap<>();
/***
* @Description 添加HuiListener监听的方法
* @param queueName
* @param huiListenerEndpoint
* @return {@link }
* @Author yaoHui
* @Date 2024/10/12
*/
public void registerListenerEndpoint(String queueName,HuiListenerEndpoint huiListenerEndpoint){
huiListenerEndpointConcurrentHashMap.put(queueName,huiListenerEndpoint);
}
/***
* @Description 消费者处理收到消息的主要逻辑
* @param message
* @return {@link boolean }
* @Author yaoHui
* @Date 2024/10/12
*/
public boolean handleMessage(MessageBase.Message message){
HuiListenerEndpoint huiListenerEndpoint = huiListenerEndpointConcurrentHashMap.get(message.getChannel());
if(ObjectUtils.isEmpty(huiListenerEndpoint)){
log.info("消息无对应Channel消费:" + message.getChannel());
return true;
}
Method method = huiListenerEndpoint.getMethod();
Object bean = huiListenerEndpoint.getBean();
try{
Class<?>[] classes = method.getParameterTypes();
method.invoke(bean,JSON.parseObject(message.getContent(),classes[0]));
}catch (Exception e){
log.info("消息消费异常");
e.printStackTrace();
return false;
}
return true;
}
}
定期发送请求消息
/**
* @FileName getMessageThread
* @Description
* @Author yaoHui
* @date 2024-10-12
**/
@Slf4j
public class GetMessageThread {
private NettyClient nettyClient;
// private static HuiListenerRegistry huiListenerRegistry = new HuiListenerRegistry();
public GetMessageThread(NettyClient nettyClient){
this.nettyClient = nettyClient;
regularGetMessage();
}
/***
* @Description 定时发送是否存在消息
* @return {@link }
* @Author yaoHui
* @Date 2024/10/12
*/
private void regularGetMessage(){
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
Runnable task = () -> {
// log.info("regularGetMessage is running");
Set<String> channel = HuiListenerRegistry.huiListenerEndpointConcurrentHashMap.keySet();
for(String s : channel){
MessageBase.Message message = MessageBase.Message.newBuilder()
.setCmd(MessageBase.Message.CommandType.SEND_MESSAGE)
.setChannel(s).build();
nettyClient.sendMessage(message);
}
};
scheduledExecutorService.scheduleAtFixedRate(task,2,3, TimeUnit.SECONDS);
}
}
消费端启动定期请求消息
如果这个服务中有被HuiListener注解标记的方法就会启用这个方法,SmartInitializingSingleton会在所有的Bean初始化之后运行。
/**
* @FileName MyBeanInjector
* @Description
* @Author yaoHui
* @date 2024-10-12
**/
@Component
@Slf4j
public class MyBeanInjector implements SmartInitializingSingleton {
private final NettyClient nettyClient;
public MyBeanInjector(NettyClient nettyClient){
this.nettyClient = nettyClient;
}
@Override
public void afterSingletonsInstantiated() {
log.info("GetMessageThread is ready");
if (HuiListenerAnnotationBeanPostProcessor.huiListenerFlag){
GetMessageThread getMessageThread = new GetMessageThread(nettyClient);
log.info("GetMessageThread is running");
}
}
}
使用HuiMQ
/**
* @FileName ReceiveHuiMessage
* @Description
* @Author yaoHui
* @date 2024-10-12
**/
@Component
@Slf4j
public class ReceiveHuiMessage {
@HuiListener(queueName = "queue")
public void noticeUserHaveComment(CommentVO commentVO){
log.info("Chat模块接收到消息:" + commentVO.toString());
}
}
进阶
如何保证消息不丢失
对于生产者来说需要做到失败重传,采用确认机制可以有效避免消息丢失;
对于Broker来说,需要控制给生产者确认的时机,在Broker保存消息到MySQL后再进行返回,可以有效避免消息丢失;
对于消费者来说,需要在消息真正消费之后再给Broker进行确认,可以避免消息丢失;
如何保证消息不会被重复消费
对于正常的业务消息在消息队列中是不可避免会存在重复消费问题,所以我们只能在业务层面进行消除该影响。
这种问题就是典型的幂等性问题,即对于同样的一种操作所带来的结果是一致的,比如这种update t1 set money = 150 where id = 1 and money = 100;
执行多少遍money
都是150,这就叫幂等。
所以我们一般的解决方法是添加版本号version,在执行SQL时判断version是否一致,不一致则不执行。或者记录关键的key,像订单号这种,执行过的就不需要再执行。
如何保证消息的有序性
如何处理消息堆积
消息队列根本原因是消费者消费跟不上生产者,我们可以优化下消费逻辑,比如之前是一条一条消息消费处理的,这次我们批量处理,比如数据库的插入,一条一条插和批量插效率是不一样的。
假如逻辑我们已经都优化了,但还是慢,那就得考虑水平扩容了,增加Topic
的队列数和消费者数量,注意队列数一定要增加,不然新增加的消费者是没东西消费的。一个Topic中,一个队列只会分配给一个消费者。