66 消息队列

news2024/11/24 16:24:58

66 消息队列

基础概念

参考资料:消息队列MQ快速入门(概念、RPC、MQ实质思路、队列介绍、队列对比、应用场景)

  • 消息队列就是一个使用队列来通信的组件;
  • 为什么需要消息队列?

在实际的商业项目中,它这么做肯定是有道理的。那么没有引入消息队列之前服务存在哪些问题呢?就拿支付服务来说,你提交了一个支付订单后,后台需要进行扣库存、扣款、短信通知等等,你需要等待后台把所有该做的做完了才能知道自己有没有购买成功,用户等等时间过长,后台请求链太多,很多业务不需要马上做完,比如短信通知等等,这些响应速度对于业务来说无关紧要,所以就提出了异步处理。

异步处理就是指我现在不做这个工作,我把这个工作丢给箱子里,有人会来这个箱子里找属于它的工作,我丢完我的工作就做完了,就可以给用户响应了,解耦,异步,提高性能。

应用在:服务解耦、流量控制,有好处也有坏处,坏处就是服务的稳定性降低,人多就不好控制,系统也一样。

消息队列具有两种模型:队列模型和发布/订阅模型。这两个模型简单的来说就是:队列模型即一条消息只能被一个消费者消费、发布订阅模型即一条消息可以被多个消费者消费。

其设计模式就是一发一存一消费,生产者——消费者模型。

五种队列

简单队列

在这里插入图片描述

一言以蔽之:简单队列——一个消息对应一个消费者

工作队列

在这里插入图片描述

一个生产者对应多个消费者,但是只能有一个消费者获得消息!!!

竞争消费者模式。

这条消息具体会被哪个消费者消费事先并不知。

如何分发消息使之最大限度的发挥每一个消费者的效率——负载均衡。

发布/订阅模型

在这里插入图片描述

一个消费者将消息首先发送到交换器,交换器绑定到多个队列,然后被监听该队列的消费者所接收并消费。

ps:X表示交换器,在RabbitMQ中,交换器主要有四种类型:direct、fanout、topic、headers,这里的交换器是 fanout。下面我们会详细介绍这几种交换器。

路由模式

在这里插入图片描述

生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。

也就是让消费者有选择性的接收消息。

主题模式

在这里插入图片描述

上面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配。

符号“#”表示匹配一个或多个词,符号“*”表示匹配一个词。

交换机

前面五种队列模式介绍完了,但是实际上只有三种,第一种简单队列,第二种工作模式,剩下的三种都是和交换器绑定的合起来称为一种,这小节我们就来详细介绍交换器。

交换器分为四种,分别是:direct、fanout、topic和 headers。

前面三种分别对应路由模式、发布订阅模式和通配符模式,headers 交换器允许匹配 AMQP 消息的 header 而非路由键,除此之外,header 交换器和 direct 交换器完全一致,但是性能却差很多,因此基本上不会用到该交换器,这里也不详细介绍。

①、direct

如果路由键完全匹配的话,消息才会被投放到相应的队列。

在这里插入图片描述

②、fanout

当发送一条消息到fanout交换器上时,它会把消息投放到所有附加在此交换器上的队列。

在这里插入图片描述

③、topic

设置模糊的绑定方式,“*”操作符将“.”视为分隔符,匹配单个字符;“#”操作符没有分块的概念,它将任意“.”均视为关键字的匹配部分,能够匹配多个字符。

常用6种消息队列介绍和对比

RabbitMQ

在这里插入图片描述

RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

在这里插入图片描述

ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

ZeroMQ

号称史上最快的消息队列,它实际类似于Socket的一系列接口,他跟Socket的区别是:普通的socket是端到端的(1:1的关系),而ZMQ却是可以N:M 的关系,人们对BSD套接字的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而ZMQ屏蔽了这些细节,让你的网络编程更为简单。ZMQ用于node与node间的通信,node可以是主机或者是进程。

Kafka
特征
  • 分布式消息发布订阅系统,其分区特性、可复制和可容错都是其不错的特性
  • 快速持久化,可在O(1)的系统开销下进行消息持久化
  • 高吞吐,在一台普通的服务器上就可以达到10W/s的吞吐率
  • 完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式、自动实现负载均衡
  • 支持同步和异步复制两种
  • 支持数据批量发送和拉取
  • zero-copy:减少IO操作步骤
  • 数据迁移、扩容对用户透明
  • 无需停机即可扩展机器
  • 其他特性:严格的消息顺序、丰富的消息拉取机制、高效订阅者水平扩展、实时的消息订阅、亿级的消息堆积能力、定期删除机制
优点
  • 客户端语言丰富
  • 性能卓越、单机写入TPS约在百万级/秒,消息大小为10个字节
  • 提供完全分布式架构,并有replica机制,拥有较高的可用性和可靠性,理论上支持消息无限堆积
  • 支持批量操作
  • 消费者采用Pull方式获取消息,消息有序,通过控制能够保证所有消息被消费且仅被消费一次
  • 有优秀的第三方Kafka Web管理界面kafka-Manager
  • 在日志领域比较成熟,被多家公司和多个开源项目使用
缺点
  • Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长
  • 使用短轮询方式,实时性取决于轮询间隔时间
  • 消费失败不支持重试
  • 支持消息顺序,但是一台代理宕机后,就会产生消息乱序
RocketMQ
特征
  • 具有高性能、高可靠、高实时、分布式特点
  • Producer、Consumer、队列都可以分布式
  • Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合
  • 能够保证严格的消息顺序
  • 提供丰富的消息拉取模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力
  • 较少的依赖
  • 可以运行在Java语言所支持的平台之上
优点
  • 单机支持1万以上持久化队列
  • 所有消息都是持久化的,先写入系统PageCache,然后刷盘,可以保证内存和磁盘都有一份数据,访问时,直接从内存取
  • 模型简单,接口易用
  • 性能优越,可以大量堆积消息在broker中
  • 支持多种消费,包括集群消费,广播消费等
  • 各个环节分布式扩展设计
  • 开发都较活跃,版本更新快
缺点
  • 没有web管理界面,提供了一个CLI管理工具来查询、管理和诊断各种问题
  • 没有在MQ核心去实现JMS等接口
选择

在这里插入图片描述

  • ActiveMQ,最早的时候大家都用,但现在用的不是很多了,没经过大规模吞吐量场景的验证,社区不是很活跃,主流上不选择这个
  • RabbitMQ较为成熟一些,在可用性、稳定性、可靠性上,RabbitMQ都要超过kafka,综合性能不错,但是erlang语言阻止了大量java工程师深入研究,且不支持事务,消息吞吐能力有限
  • Kafka的性能是比RabbitMQ要更强的,RabbitMQ在有大量的消息堆积时,性能会下降,而Kafka不会,但是Kafka的设计初衷是处理日志的,可以看做一个日志系统,针对性非常强,没有具备一个成熟MQ应该具备的特性,它还是个孩子啊
  • RocketMQ的思路起源于Kafka,但它对消息的可靠传输及事务性做了优化,适合一些大规模的分布式系统应用,但是生态不够成熟,会有黄掉的风险
  • ZeroMQ只是一个网络编程的Pattern库,将常见的网络请求形式(分组管理、链接管理、发布订阅等)进行模式化、组件化。简单来说就是在socket之上、MQ之下。使用ZeroMQ的话,需要对自己的业务代码进行改造,不利于服务解耦

基于Netty实现huiMQ自定义消息队列

Netty服务端将会作为huiMQ,这里先搭一个Netty服务端与Netty客户端,具体的代码请参考:68 Netty

具体项目代码请查看:点击查看

在这里插入图片描述

生产者

发送消息队列

所有需要发送的消息都会被统一存在一个队列中,然后由一个线程来对这个队列中的消息发送到Netty服务端中。但是这里会存在消息生产者发送到消息队列时失败,从而导致消息丢失,所以为了保证消息不丢失,在该线程发送后,会把这条消息暂时存放在一个Map中,等到消息队列发送确认响应后才会把这条消息消除,如果超过2秒还未回复,就会把这个消息重新放回待发送消息队列中,同时把重传次数加一,如果重传次数大于5次,就会写入日志。

我们待发送消息队列选择BlockingQueue,该队列是线程安全的,阻塞队列(BlockingQueue) 是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

    // 采用阻塞队列保证线程的安全 以便免多线程的情况下导致数据丢失 同时阻塞队列也可以当队列满的时候阻塞线程让其之后再重新添加运行
    private static final BlockingQueue<MessageBase.Message> blockingQueue = new ArrayBlockingQueue<MessageBase.Message>(1024);

    // 等待确认的消息Map
    private static final ConcurrentHashMap<String, MessageBase.Message> waitAckMessageMap = new ConcurrentHashMap<>();

    // 等待确认消息超时Map
    private static final ConcurrentHashMap<String, LocalDateTime> messageTimeOutMap = new ConcurrentHashMap<>();

    // 超时
    private static final Long timeout = 2000L;

    // 最多超时重传次数
    private static final Integer maxRetries = 5;

    private static SocketChannel socketChannel;
发送消息线程
    /***
     * @Description 构建一个线程来一直对队列中的消息发送到Netty服务端
     * @return {@link }
     * @Author yaoHui
     * @Date 2024/10/11
     */
    private void sendMessageByThread(){
        new Thread(() -> {
            while(true){
                try{
                    log.info("sendMessageByThread ready");
                    MessageBase.Message message = blockingQueue.take();
                    log.info("sendMessageByThread working");
                    sendMessage(message);
                } catch (Exception e) {
                    log.error("sendMessageByThread is interrupt 发送消息线程被中断");
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }).start();
    }
    
    
    /***
     * @Description 线程调用 让消息发送到Netty服务端的具体实现逻辑
     * @param message
     * @return {@link  }
     * @Author yaoHui
     * @Date 2024/10/11
     */
    private void sendMessage(MessageBase.Message message){
        // 超过最大重传次数
        if(message.getRetryCount() > maxRetries){
            log.error("消息传输失败次数超过5次:" + message.toString());
        }else{
            if (socketChannel.isActive()){
                socketChannel.writeAndFlush(message);
                waitAckMessageMap.put(message.getRequestId(),message);
                messageTimeOutMap.put(message.getRequestId(),LocalDateTime.now().plusSeconds(timeout));
            }else{
                log.info("Netty连接失败,请重试");
                addMessageBlockingQueue(message);
            }
        }

    }
确认消息定时任务
    /***
    * @Description 将超时的消息重新加入队列中重新进行发送
    * @return {@link }
    * @Author yaoHui
    * @Date 2024/10/11
    */
    private void messageRetryThread(){
        ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);

        Runnable task = () -> {
            log.info("messageRetryThread working");
          for (Map.Entry<String, MessageBase.Message> entry : waitAckMessageMap.entrySet()){
              MessageBase.Message message = entry.getValue();
              String key = entry.getKey();
              LocalDateTime time = messageTimeOutMap.get(key);
              if(time.isBefore(LocalDateTime.now())){
                  MessageBase.Message newMessage = MessageBase.Message.newBuilder()
                          .setRequestId(message.getRequestId())
                          .setCmd(message.getCmd())
                          .setContent(message.getContent())
                          .setRetryCount(message.getRetryCount()+1)
                          .setUrlPath(message.getUrlPath())
                          .build();
                  addMessageBlockingQueue(newMessage);
                  messageTimeOutMap.remove(key);
                  waitAckMessageMap.remove(key);
              }
          }
        };

        // 安排任务在延迟2秒后开始执行,之后每隔3秒执行一次
        scheduledExecutorService.scheduleAtFixedRate(task,2,3,TimeUnit.SECONDS);
    }
接收Netty服务端ACK消息
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof MessageBase.Message) {
            MessageBase.Message message = (MessageBase.Message) msg;

            if(message.getCmd() == MessageBase.Message.CommandType.ACK){
                SendMessageThread.getAckAndRemoveMessage(message.getRequestId());
                log.info("收到ACK:" + message.getRequestId());
            }else{
                System.out.println("Received response from server:");
                System.out.println("ID: " + message.getRequestId());
                System.out.println("Content: " + message.getContent());
            }

        } else {
            System.err.println("Received an unknown message type: " + msg.getClass().getName());
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端连接成功");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("客户端发生异常", cause);
        ctx.close();
    }
}
    /***
    * @Description 根据RequestId来消除待确认中的消息
    * @param key
    * @return {@link }
    * @Author yaoHui
    * @Date 2024/10/11
    */
    public static void getAckAndRemoveMessage(String key){
        messageTimeOutMap.remove(key);
        waitAckMessageMap.remove(key);
    }
全部代码

更详细的代码请查看:点击查看

NettyClient
@Component
@Slf4j
public class NettyClient {

    private static final EventLoopGroup group = new NioEventLoopGroup();

    private static final Integer port = 54021;

    private static final String host = "localhost";

    private static SocketChannel socketChannel;

    private static SendMessageThread sendMessageThread = new SendMessageThread();

    /***
    * @Description 添加消息到阻塞队列中 为消息生产者调用
    * @param message
    * @return {@link }
    * @Author yaoHui
    * @Date 2024/10/11
    */
    public void addMessageBlockingQueue(MessageBase.Message message){
        if(!socketChannel.isActive()){
            this.start();
        }
        sendMessageThread.addMessageBlockingQueue(message);
    }

    /***
    * @Description 该方法提供获取SocketChannel 暂时无用
    * @return {@link SocketChannel }
    * @Author yaoHui
    * @Date 2024/10/11
    */
    public static SocketChannel getSocketChannel() throws Exception {
        if (!socketChannel.isActive()) {
            return socketChannel;
//            socketChannel = socketChannel1;
        }
        return socketChannel;
    }

    /***
    * @Description 连接断开重新连接
    * @return {@link SocketChannel }
    * @Author yaoHui
    * @Date 2024/10/11
    */
    private static SocketChannel retryConnect(){
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .remoteAddress(host, port)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                                // 空闲检测
                                .addLast(new IdleStateHandler(60, 60, 60)) // 60秒写空闲,30秒读空闲
                                .addLast(new HeartbeatHandler())
                                .addLast(new ProtobufVarint32FrameDecoder())
                                .addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance()))
                                .addLast(new ProtobufVarint32LengthFieldPrepender())
                                .addLast(new ProtobufEncoder())
                                .addLast(new NettyClientHandler())
                        ; // 自定义处理器
                    }
                });
        ChannelFuture future = bootstrap.connect();
        if (future.isSuccess()){
            return (SocketChannel) future.channel();
        }else{
            return null;
        }
    }

    /***
    * @Description Netty客户端启动函数 调用Start可以启动对Netty服务端的连接
    * @return {@link }
    * @Author yaoHui
    * @Date 2024/10/11
    */
    @PostConstruct
    private void start(){
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .remoteAddress(host, port)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                                // 空闲检测
                                .addLast(new IdleStateHandler(60, 60, 60)) // 60秒写空闲,30秒读空闲
                                .addLast(new HeartbeatHandler())
                                .addLast(new ProtobufVarint32FrameDecoder())
                                .addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance()))
                                .addLast(new ProtobufVarint32LengthFieldPrepender())
                                .addLast(new ProtobufEncoder())
                                .addLast(new NettyClientHandler())
                        ; // 自定义处理器
                    }
                });
        ChannelFuture future = bootstrap.connect();
        //客户端断线重连逻辑
        future.addListener((ChannelFutureListener) future1 -> {
            if (future1.isSuccess()) {
                log.info("连接Netty服务端成功");
            } else {
                log.info("连接失败,进行断线重连");
                future1.channel().eventLoop().schedule(this::start, 10, TimeUnit.SECONDS);
            }
        });
        socketChannel = (SocketChannel) future.channel();

        sendMessageThread.setSocketChannel(socketChannel);
    }

}

SendMessageThread
package com.fang.screw.client.Thread;

import com.fang.screw.client.component.NettyClient;
import com.fang.screw.client.protocol.MessageBase;
import io.netty.channel.socket.SocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;

/**
 * @FileName SendMessageThread
 * @Description
 * @Author yaoHui
 * @date 2024-10-11
 **/
@Slf4j
public class SendMessageThread {

    // 采用阻塞队列保证线程的安全 以便免多线程的情况下导致数据丢失 同时阻塞队列也可以当队列满的时候阻塞线程让其之后再重新添加运行
    private static final BlockingQueue<MessageBase.Message> blockingQueue = new ArrayBlockingQueue<MessageBase.Message>(1024);

    // 等待确认的消息Map
    private static final ConcurrentHashMap<String, MessageBase.Message> waitAckMessageMap = new ConcurrentHashMap<>();

    // 等待确认消息超时Map
    private static final ConcurrentHashMap<String, LocalDateTime> messageTimeOutMap = new ConcurrentHashMap<>();

    // 超时
    private static final Long timeout = 2000L;

    // 最多超时重传次数
    private static final Integer maxRetries = 5;

    private static SocketChannel socketChannel;

    public SendMessageThread(){
        sendMessageByThread();
        messageRetryThread();
    }

    public SendMessageThread(SocketChannel socketChannel1){
        socketChannel = socketChannel1;
    }

    public void setSocketChannel(SocketChannel socketChannel1){
        socketChannel = socketChannel1;
    }

    public void addMessageBlockingQueue(MessageBase.Message message){
        blockingQueue.add(message);
    }

    /***
     * @Description 构建一个线程来一直对队列中的消息发送到Netty服务端
     * @return {@link }
     * @Author yaoHui
     * @Date 2024/10/11
     */
    private void sendMessageByThread(){
        new Thread(() -> {
            while(true){
                try{
                    log.info("sendMessageByThread ready");
                    MessageBase.Message message = blockingQueue.take();
                    log.info("sendMessageByThread working");
                    sendMessage(message);
                } catch (Exception e) {
                    log.error("sendMessageByThread is interrupt 发送消息线程被中断");
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }).start();
    }

    /***
     * @Description 线程调用 让消息发送到Netty服务端的具体实现逻辑
     * @param message
     * @return {@link  }
     * @Author yaoHui
     * @Date 2024/10/11
     */
    private void sendMessage(MessageBase.Message message){
        // 超过最大重传次数
        if(message.getRetryCount() > maxRetries){
            log.error("消息传输失败次数超过5次:" + message.toString());
        }else{
            if (socketChannel.isActive()){
                socketChannel.writeAndFlush(message);
            }else{
                log.info("Netty连接失败,请重试");
//                socketChannel = NettyClient.getSocketChannel();
                addMessageBlockingQueue(message);
            }
        }

    }

    /***
    * @Description 将超时的消息重新加入队列中重新进行发送
    * @return {@link }
    * @Author yaoHui
    * @Date 2024/10/11
    */
    private void messageRetryThread(){
        ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);

        Runnable task = () -> {
            log.info("messageRetryThread working");
          for (Map.Entry<String, MessageBase.Message> entry : waitAckMessageMap.entrySet()){
              MessageBase.Message message = entry.getValue();
              String key = entry.getKey();
              LocalDateTime time = messageTimeOutMap.get(key);
              if(time.isBefore(LocalDateTime.now())){
                  MessageBase.Message newMessage = MessageBase.Message.newBuilder()
                          .setRequestId(message.getRequestId())
                          .setCmd(message.getCmd())
                          .setContent(message.getContent())
                          .setRetryCount(message.getRetryCount()+1)
                          .setUrlPath(message.getUrlPath())
                          .build();
                  addMessageBlockingQueue(newMessage);
                  messageTimeOutMap.remove(key);
                  waitAckMessageMap.remove(key);
              }
          }
        };

        // 安排任务在延迟2秒后开始执行,之后每隔3秒执行一次
        scheduledExecutorService.scheduleAtFixedRate(task,2,3,TimeUnit.SECONDS);
    }

    /***
    * @Description 根据RequestId来消除待确认中的消息
    * @param key
    * @return {@link }
    * @Author yaoHui
    * @Date 2024/10/11
    */
    public static void getAckAndRemoveMessage(String key){
        messageTimeOutMap.remove(key);
        waitAckMessageMap.remove(key);
    }


}

huiMQ

保存消息
    /***
    * @Description 保存消息到消息队列中 并且保存到MySQL数据库持久化 注意这里是一个事务 要么都成功要么都别成功
    * @param message
    * @return {@link boolean }
    * @Author yaoHui
    * @Date 2024/10/12
    */
    @Transactional(rollbackFor = Exception.class)
    public boolean saveMessage(MessageBase.Message message){
        try{
            // 保存到消息队列中
            if(!messageQueueMap.containsKey(message.getChannel())){
                BlockingQueue<MessageBase.Message> messageBlockingQueue = new ArrayBlockingQueue<MessageBase.Message>(1024);
                messageQueueMap.put(message.getChannel(),messageBlockingQueue);
            }
            messageQueueMap.get(message.getChannel()).add(message);

            // 保存到MySQL数据库中
            if(!messageQueueMapper.exists(Wrappers.<MessageQueuePO>lambdaQuery()
                    .eq(MessageQueuePO::getRequestId,message.getRequestId())
                    .eq(MessageQueuePO::getDelFlag,0))){
                MessageQueuePO messageQueuePO = new MessageQueuePO();
                messageQueuePO.setRequestId(message.getRequestId());
                messageQueuePO.setCmd(message.getCmdValue());
                messageQueuePO.setContent(message.getContent());
                messageQueuePO.setUrlPath(message.getUrlPath());
                messageQueueMapper.insert(messageQueuePO);
                log.info("HuiMQ保存消息:" + messageQueuePO.toString());
            }
        }catch (Exception e){
            log.info("保存消息至消息队列错误!");
            e.printStackTrace();
            return false;
        }

        return true;
    }
收到消费端发来请求消息
// 消费端请求发送消息
                // 检查是否有超时的消息 如果有则将其重新放置于待重传的消息队列中
                HuiMessageQueue.checkTimeOutMessage();

                if(HuiMessageQueue.messageQueueMap.containsKey(message.getChannel())){
                    BlockingQueue<MessageBase.Message> queue = HuiMessageQueue.messageQueueMap.get(message.getChannel());
                    while(!queue.isEmpty()){
                        MessageBase.Message sendMessage = queue.take();
                        log.info("HuiMQ向消费者发送消息:" + sendMessage.toString());
                        ctx.writeAndFlush(sendMessage);
                        // 将消费列为带确认消息
                        HuiMessageQueue.waitAckMessageMap.put(sendMessage.getRequestId(),sendMessage);
                        HuiMessageQueue.messageTimeOutMap.put(sendMessage.getRequestId(), LocalDateTime.now().plusSeconds(2L));
                    }
                }
                
                    /***
    * @Description 检查是否有超时没有收到确认消息的消息 将其重新放置在待发送消息队列中
    * @return {@link }
    * @Author yaoHui
    * @Date 2024/10/13
    */
    public static void checkTimeOutMessage(){
        for(Map.Entry<String,LocalDateTime> mapEntry : messageTimeOutMap.entrySet()){
            LocalDateTime time = mapEntry.getValue();
            if(time.isBefore(LocalDateTime.now())){
                String s = mapEntry.getKey();
                MessageBase.Message message = waitAckMessageMap.get(s);
                waitAckMessageMap.remove(s);
                messageTimeOutMap.remove(s);

                // 保存到消息队列中
                if(!messageQueueMap.containsKey(message.getChannel())){
                    BlockingQueue<MessageBase.Message> messageBlockingQueue = new ArrayBlockingQueue<MessageBase.Message>(1024);
                    messageQueueMap.put(message.getChannel(),messageBlockingQueue);
                }
                messageQueueMap.get(message.getChannel()).add(message);
            }
        }
    }
收到消费端ACK消息
log.info("收到消费端发来的ACK报文:" + message.getRequestId());
HuiMessageQueue.setMessageAck(message.getRequestId());

    /***
    * @Description
    * @param requestId
    * @return {@link }
    * @Author yaoHui
    * @Date 2024/10/13
    */
    public static void setMessageAck(String requestId){
        messageTimeOutMap.remove(requestId);
        waitAckMessageMap.remove(requestId);
    }

消费者

标记需要监听消息方法
/**
 * @FileName HuiListener
 * @Description HuiMQ监听注解
 * @Author yaoHui
 * @date 2024-10-12
 **/
@Target(value = ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface HuiListener {
    String queueName();
}
获取所有被HuiListener注解标记的方法

通过BeanPostProcessor来获取所有被指定注解标记的方法,BeanPostProcessor会在每个Bean初始化前后调用,分别为postProcessBeforeInitialization和postProcessAfterInitialization。

这里会将所有被HuiListener标记的方法和Bean注册到huiListenerRegistry中,为了方便之后通过反射的方式来直接运行指定方法。

/**
 * @FileName HuiListenerAnnotationBeanPostProcessor
 * @Description
 * @Author yaoHui
 * @date 2024-10-12
 **/
@Component
public class HuiListenerAnnotationBeanPostProcessor implements BeanPostProcessor, InitializingBean {

    private static final HuiListenerRegistry huiListenerRegistry = new HuiListenerRegistry();

    public static boolean huiListenerFlag = false;

    /***
    * @Description 在 bean 的初始化方法(如 @PostConstruct 注解的方法或 init-method 指定的方法)之前调用。
    * @param bean
    * @param beanName
    * @return {@link Object }
    * @Author yaoHui
    * @Date 2024/10/12
    */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return BeanPostProcessor.super.postProcessBeforeInitialization(bean, beanName);
    }

    /***
     * @Description 在 bean 的初始化方法之后调用。查看当前的bean是否存在被HuiListener注解过的方法
     * @param bean
     * @param beanName
     * @return {@link Object }
     * @Author yaoHui
     * @Date 2024/10/12
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

        Method[] methods = bean.getClass().getMethods();
        for(Method method : methods){
            if(method.isAnnotationPresent(HuiListener.class)){
                processHuiListener(method,bean);
                huiListenerFlag = true;
            }
        }

        return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
    }

    private void processHuiListener(Method method,Object bean){
        HuiListener huiListener = method.getAnnotation(HuiListener.class);
        HuiListenerEndpoint huiListenerEndpoint = new HuiListenerEndpoint();
        huiListenerEndpoint.setBean(bean);
        huiListenerEndpoint.setMethod(method);
        huiListenerRegistry.registerListenerEndpoint(huiListener.queueName(),huiListenerEndpoint);
    }

    @Override
    public void afterPropertiesSet() throws Exception {

    }
}
/**
 * @FileName HuiListenerEndpoint
 * @Description
 * @Author yaoHui
 * @date 2024-10-12
 **/
@Data
public class HuiListenerEndpoint {

    private Method method;

    private Object bean;
}
/**
 * @FileName HuiListenerRegistry
 * @Description
 * @Author yaoHui
 * @date 2024-10-12
 **/
@Slf4j
public class HuiListenerRegistry {

    public static final ConcurrentHashMap<String,HuiListenerEndpoint> huiListenerEndpointConcurrentHashMap = new ConcurrentHashMap<>();

    /***
    * @Description 添加HuiListener监听的方法
    * @param queueName
    * @param huiListenerEndpoint
    * @return {@link }
    * @Author yaoHui
    * @Date 2024/10/12
    */
    public void registerListenerEndpoint(String queueName,HuiListenerEndpoint huiListenerEndpoint){
        huiListenerEndpointConcurrentHashMap.put(queueName,huiListenerEndpoint);
    }

    /***
    * @Description 消费者处理收到消息的主要逻辑
    * @param message
    * @return {@link boolean }
    * @Author yaoHui
    * @Date 2024/10/12
    */
    public boolean handleMessage(MessageBase.Message message){
        HuiListenerEndpoint huiListenerEndpoint = huiListenerEndpointConcurrentHashMap.get(message.getChannel());
        if(ObjectUtils.isEmpty(huiListenerEndpoint)){
            log.info("消息无对应Channel消费:" + message.getChannel());
            return true;
        }

        Method method = huiListenerEndpoint.getMethod();
        Object bean = huiListenerEndpoint.getBean();

        try{
            Class<?>[] classes = method.getParameterTypes();
            method.invoke(bean,JSON.parseObject(message.getContent(),classes[0]));
        }catch (Exception e){
            log.info("消息消费异常");
            e.printStackTrace();
            return false;
        }

        return true;
    }
}
定期发送请求消息
/**
 * @FileName getMessageThread
 * @Description
 * @Author yaoHui
 * @date 2024-10-12
 **/
@Slf4j
public class GetMessageThread {

    private NettyClient nettyClient;

//    private static HuiListenerRegistry huiListenerRegistry = new HuiListenerRegistry();

    public GetMessageThread(NettyClient nettyClient){
        this.nettyClient = nettyClient;
        regularGetMessage();
    }

    /***
    * @Description 定时发送是否存在消息
    * @return {@link }
    * @Author yaoHui
    * @Date 2024/10/12
    */
    private void regularGetMessage(){
        ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
        Runnable task = () -> {
//            log.info("regularGetMessage is running");
            Set<String> channel = HuiListenerRegistry.huiListenerEndpointConcurrentHashMap.keySet();
            for(String s : channel){
                MessageBase.Message message = MessageBase.Message.newBuilder()
                        .setCmd(MessageBase.Message.CommandType.SEND_MESSAGE)
                        .setChannel(s).build();
                nettyClient.sendMessage(message);
            }

        };

        scheduledExecutorService.scheduleAtFixedRate(task,2,3, TimeUnit.SECONDS);
    }


}

消费端启动定期请求消息

如果这个服务中有被HuiListener注解标记的方法就会启用这个方法,SmartInitializingSingleton会在所有的Bean初始化之后运行。

/**
 * @FileName MyBeanInjector
 * @Description
 * @Author yaoHui
 * @date 2024-10-12
 **/
@Component
@Slf4j
public class MyBeanInjector implements SmartInitializingSingleton {

    private final NettyClient nettyClient;

    public MyBeanInjector(NettyClient nettyClient){
        this.nettyClient = nettyClient;
    }

    @Override
    public void afterSingletonsInstantiated() {
        log.info("GetMessageThread is ready");
        if (HuiListenerAnnotationBeanPostProcessor.huiListenerFlag){
            GetMessageThread getMessageThread = new GetMessageThread(nettyClient);
            log.info("GetMessageThread is running");
        }
    }
}
使用HuiMQ
/**
 * @FileName ReceiveHuiMessage
 * @Description
 * @Author yaoHui
 * @date 2024-10-12
 **/
@Component
@Slf4j
public class ReceiveHuiMessage {

    @HuiListener(queueName = "queue")
    public void noticeUserHaveComment(CommentVO commentVO){
      log.info("Chat模块接收到消息:" + commentVO.toString());
    }

}

进阶

如何保证消息不丢失

对于生产者来说需要做到失败重传,采用确认机制可以有效避免消息丢失;

对于Broker来说,需要控制给生产者确认的时机,在Broker保存消息到MySQL后再进行返回,可以有效避免消息丢失;

对于消费者来说,需要在消息真正消费之后再给Broker进行确认,可以避免消息丢失;

如何保证消息不会被重复消费

对于正常的业务消息在消息队列中是不可避免会存在重复消费问题,所以我们只能在业务层面进行消除该影响。

这种问题就是典型的幂等性问题,即对于同样的一种操作所带来的结果是一致的,比如这种update t1 set money = 150 where id = 1 and money = 100; 执行多少遍money都是150,这就叫幂等。

所以我们一般的解决方法是添加版本号version,在执行SQL时判断version是否一致,不一致则不执行。或者记录关键的key,像订单号这种,执行过的就不需要再执行。

如何保证消息的有序性

在这里插入图片描述

如何处理消息堆积

消息队列根本原因是消费者消费跟不上生产者,我们可以优化下消费逻辑,比如之前是一条一条消息消费处理的,这次我们批量处理,比如数据库的插入,一条一条插和批量插效率是不一样的。

假如逻辑我们已经都优化了,但还是慢,那就得考虑水平扩容了,增加Topic的队列数和消费者数量,注意队列数一定要增加,不然新增加的消费者是没东西消费的。一个Topic中,一个队列只会分配给一个消费者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2211808.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

shell原理

shell 是个进程 &#xff0c; exe在user/bin/bash [用户名主机名 pwd] snprintf fflush&#xff08;stdout&#xff09;&#xff0c;在没有\n情况下立马输出 strtok 第一个参数null表示传入上个有效参数 命令行中&#xff0c;有些命令必须由子进程执行&#xff0c; 如ls 有些…

【进阶OpenCV】 (11)--DNN板块--实现风格迁移

文章目录 DNN板块一、DNN特点二、DNN函数流程三、实现风格迁移1. 图像预处理2. 加载星空模型3. 输出处理 总结 DNN板块 DNN模块是 OpenCV 中专门用来实现 DNN(Deep Neural Networks,深度神经网络) 模块的相关功能&#xff0c;其作用是载入别的深度学习框架(如 TensorFlow、Caf…

考虑促销因素的医药电商平台需求预测研究

一、考虑促销因素的医药电商平台需求预测研究 一、引言 1. 互联网医疗健康的发展 内容&#xff1a;介绍了在互联网的大背景下&#xff0c;医疗健康行业如何迅速发展&#xff0c;举例了&#xff11;药网和叮当快药等平台提供的服务。重点&#xff1a;互联网医疗用户规模和市场…

《人工智能(AI)和深度学习简史》

人工智能&#xff08;AI&#xff09;和深度学习在过去几十年里有了飞跃式的进步&#xff0c;彻底改变了像计算机视觉、自然语言处理、机器人这些领域。本文会带你快速浏览AI和深度学习发展的关键历史时刻&#xff0c;从最早的神经网络模型&#xff0c;一直到现在的大型语言模型…

【技术支持】家里智能电视不能联网重置小米路由器之路

问题现象 最近家里的路由器出现一点问题&#xff0c;现象是手机和电脑连接wifi后&#xff0c;都可以正常打开网页看视频。 但是小爱同学和小米盒子&#xff0c;都出现网络问题&#xff0c;不能正常播放音乐或者视频。 这是小米盒子的网络问题截图 这是和小米盒子连接的智能电…

骨架提取(持续更新)

一 什么是骨架提取 1.1 简介 骨架提取是图像处理或计算机视觉中的一种技术&#xff0c;用于从二值化图像中提取物体的中心线或轮廓&#xff0c;通常称为“骨架”或“细化图像”。这一技术主要用于简化形状表示&#xff0c;同时保留物体的拓扑结构。 这里我们强调了&#xff…

openpyxl -- Cell

文章目录 CellCell的属性MergedCell 版本&#xff1a;openpyxl - 3.0.10 Cell 创建一个单元格&#xff0c;并存入数据、样式、注释等&#xff1b;openpyxl.cell.cell.Cell;获取cell worksheet_obj[“B3”]&#xff0c;根据coordinate获取cell; 也可直接赋值写入&#xff1b;wo…

查找学位论文的数据库有哪些

学位论文分类&#xff1a;一般分为学士论文、硕士论文、博士论文。下面介绍一下能够轻松获取学位论文全文的数据库及获取数据库资源的途径。 1.知网 中国优秀硕士学位论文全文数据库 &#xff08;中国知网CNKI&#xff09; 中国博士学位论文全文数据库 &#xff08;中国知网…

获取时隔半个钟的三天与el-time-select

摘要&#xff1a; 今天遇到需求是配送时间&#xff0c;时隔半个钟的排线&#xff01;所以需要拼接时间&#xff01;例如2024-10-08 14&#xff1a;30&#xff0c;2024-10-08 15&#xff1a;00&#xff0c;2024-10-08 15&#xff1a;30 <el-form-item label"配送时间&a…

Cyber Weekly #28

赛博新闻 1、特斯拉发布无人驾驶汽车Cybercab和Robovan 本周五&#xff08;10月11日&#xff09;&#xff0c;特斯拉公布两款车型Cybercab和Robovan&#xff0c;以及他们的Robotaxi无人驾驶出租车计划。Cybercab没有方向盘&#xff0c;没有充电孔&#xff0c;也没有脚踏板和后…

动态规划的优化与高级应用

姊妹篇&#xff1a; 动态规划基础与经典问题-CSDN博客 贪心算法&#xff1a;原理、应用与优化_最优解-CSDN博客​​​​​​贪心算法&#xff1a;原理、应用与优化_最优解-CSDN博客 一、动态规划的优化策 动态规划在提高时间效率的同时&#xff0c;往往会占用较多的空间。因…

【电商搜索】现代工业级电商搜索技术-中科大-利用半监督学习改进非点击样本的转化率预测

【电商搜索】现代工业级电商搜索技术-中科大-利用半监督学习改进非点击样本的转化率预测 0. 论文信息 RecSys24: Utilizing Non-click Samples via Semi-supervised Learning for Conversion Rate Prediction inproceedings{huang2024utilizing, title{Utilizing Non-click S…

微生物测序报告中的多样性数据详细解读

随着技术的发展&#xff0c;高通量测序技术已成为研究微生物群落的重要工具。这种技术使得科学家们能够解析巨量微生物DNA序列&#xff0c;从而获得丰富的微生物组数据&#xff0c;包括16S rRNA基因、ITS序列和宏基因组。然而&#xff0c;这些数据只是迈向揭示微生物群落复杂性…

docker启动MySQL容器失败原因排查记录

背景 最近在尝试容器搭建MySQL集群时碰到一个错误&#xff0c;启动MySQL时碰到一个&#xff0c;经过排查解决&#xff0c;在此做一个记录 问题过程 1、启动MySQL容器 $ sudo docker run -d -p 3306:3306 \ > --name mysql \ > -v /opt/mysql/log:/var/log/mysql \ &g…

java项目之大型商场应急预案管理系统(源码+文档)

项目简介 大型商场应急预案管理系统实现了以下功能&#xff1a; 大型商场应急预案管理系统的主要使用者管理员功能有个人中心&#xff0c;员工管理&#xff0c;预案信息管理&#xff0c;预案类型管理&#xff0c;事件类型管理&#xff0c;预案类型统计管理&#xff0c;事件类…

【vue】03-指令补充+样式绑定+计算属性+侦听器

代码获取 知识总结 ⼀、指令补充 1.指令修饰符 1.1 什么是指令修饰符&#xff1f; 所谓指令修饰符就是让指令的 功能更强⼤&#xff0c;书写更便捷 1.2 分类 1.2.1 按键修饰符 keydown.enter&#xff1a;当enter键按下时触发 keyup.enter&#xff1a;当enter键抬起时触…

执行powershell脚本出错:未对文件进行数字签名

解决执行powershell脚本时出错&#xff1a;未对文件 \test.ps1进行数字签名。无法在当前系统上运行该脚本 前言 今天从github上下载了一个PowerShell脚本要在本地运行&#xff0c;运行的时候出现了未对文件进行数字签名的问题&#xff0c;然后在这里记录下怎么解决 解决方法…

选择2024年开发App的理由,费用分析与效益

App开发费用受复杂度、团队、地理位置、平台等因素影响。低代码平台如ZohoCreator提供经济高效开发方案&#xff0c;降低费用并提升灵活性。2024年&#xff0c;企业需考虑这些因素制定长期规划。 调查显示&#xff1a; 企业估算应用开发费用时&#xff0c;常采用以下公式&…

大厂面试真题-组合和聚合的区别是什么

组合和聚合比较类似&#xff0c;二者都表示整体和部分之间的关系。 聚合关系的特点是&#xff1a;整体由部分构成&#xff0c;但是整体和部分之间并不是强依赖的关系&#xff0c;而是弱依 赖的关系&#xff0c;也就是说&#xff0c;即使整体不存在了&#xff0c;部分仍然存在…