微服务——服务异步通讯(MQ高级)

news2024/11/27 1:39:52

MQ的一些常见问题

消息可靠性

生产者消息确认

返回ack,怎么感觉这么像某个tcp的3次握手。

使用资料提供的案例工程.

在图形化界面创建一个simple.queue的队列,虚拟机要和配置文件里面的一样。

 SpringAMQP实现生产者确认

AMQP里面支持多种生产者确认的类型。

simple是同步等待模式,发了消息之后就一直等待结果,可能会导致代码阻塞。

correlated是异步回调模式,像前段的ajax请求的回调函数。

ApplicationContextAware是bean工厂通知。会在Spring容器创建完后来通知并传一个spring容器到下面的方法。然后从中取到rabbitTemplate的bean并设置ReturnCallback。 

ReturnCallback:消息到了交换机,路由时失败了没有到达消息队列

ConfirmCallback:消息连交换机都没到。

这个不像ReturnCallback只能配置一个,这个可以在每次发消息时设置。

这里在发送消息时多了一个correlationData,这是在配置开关选择的confirm类型为correlated。里面封装了消息的唯一id和callback.

callback里面的result是成功的回调函数,ex是失败的回调函数。这里的失败是指回调都没收到。

实现

先是在生产者的配置文件里要加上前面的配置j

编写returnCallback

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //获取RabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            //记录日志
            log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",
                    replyCode,replyText,exchange,routingKey,message.toString());
            //如果有需要的话,可以重发消息
        });
    }
}

编写ConfirmCallback

这里先要在图形界面手动将交换机和消息队列做绑定 

    @Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {
        //1.准备消息
        String message = "hello, spring amqp!";
        //2.准备correlationData
        //2.1消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //2.2准备ConfirmCallback
        correlationData.getFuture().addCallback(result -> {
            //判断结果
            if(result.isAck()){
                //ACK
                log.debug("消息成功投递到交换机!消息ID:{}",correlationData.getId());
            }else{
                //NACK
                log.error("消息投递到交换机失败!消息ID:{}",correlationData.getId());
            }
        }, ex -> {
            //记录日志
            log.error("消息发送失败!",ex);
            //重发消息
        });
        //3.发送消息
        rabbitTemplate.convertAndSend("camq.topic", "simple.test", message,correlationData);
    }

测试得到

成功的测试情况

 

失败的测试情况

投递交换机失败,交换机不存在

投递队列失败,队列不存在

 

消息持久化

这里通过重启rabbitmq容器发现消息都不见了可以确认,rabbitmq和redis一样都是内存运行的。

甚至我手动加上的消息队列和绑定关系都不见了。这里消息队列不见是因为前面创建队列时选择的是Transient,不持久化。系统默认的交换机都还在,是因为durable为true,持久化。

创建队列或交换机的时候可以设置Durability为Durable即可持久化。

在消费者代码中进行交换机和队列的创建,然后可以看见如下持久化的交换机和队列.

@Configuration
public class CommonConfig {
    @Bean
    public DirectExchange simpleExchange(){
        return new DirectExchange("simple.direct",true,false);
    }
    @Bean
    public Queue simpleQueue(){
        return QueueBuilder.durable("simple.queue").build();
    }
}

手动发送一条消息进行测试

重启之后消息还是消失了。

要想让消息持久化,需要在发送消息时指定。


    @Test
    public void testDurableMessage(){
        //1.准备消息
        Message message = MessageBuilder.withBody("hello,pop".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久的
                .build();
        //2.发送消息
        rabbitTemplate.convertAndSend("simple.queue",message);
    }

 重启之后消息就持久化了。

通常在springamqp中这些都是持久化的。

消费者消息确认

在none模式下,消费者拿到消息都就报异常了,然后消息也没了。

在auto模式下,消费者拿到消息后给mq报了个unack,然后消息会重新投递,消费者继续拿消息,tmd,死循环了。 但是这里消息就不会消失了。


    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
        System.out.println(1/0);
        log.info("消费者处理消息成功!");
    }

消费失败重试机制

重试次数耗尽之后会将消息丢弃。

消费者失败消息处理策略

 在消费者代码中

@Configuration
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue",true);
    }
    @Bean
    public Binding errorBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
    }
}

 重新发送消息进行测试,可以看见重试次数耗尽之后就送到了死信队列了。

在里面将异常的堆栈信息也包含了. 

 

死信交换机

初识死信交换机

区别在于,上一个是消费者失败之后寻找交换机路由到error队列,这个是退回到队列,再指定交换机,最后路由。

TTL

这个的应用场景比如说订单超时未支付然后自动取消。

实现  

          

准备 代码部分

    @RabbitListener(bindings = @QueueBinding(
            value=@Queue(name = "dl.queue",durable = "true"),
            exchange=@Exchange(name="dl.direct"),
            key = "dl"
    ))
    public void listenDlQueue(String msg){
        log.info("接收到 dl.queue的延迟消息:{}",msg);
    }

@Configuration
public class TTLMessageConfig {
    @Bean
    public DirectExchange ttlExchange(){
        return new DirectExchange("ttl.direct");
    }
    @Bean
    public Queue ttlQueue(){
        return QueueBuilder.durable("ttl.queue")
                .ttl(10000)
                .deadLetterExchange("dl.direct")
                .deadLetterRoutingKey("dl")
                .build();
    }
    @Bean
    public Binding simpleBinging(){
        return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
    }
}

 测试代码

    @Test
    public void testTTLMessage(){
        //1.准备消息
        Message message = MessageBuilder
                .withBody("hello,ttl".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久的
                .build();
        //2.发送消息
        rabbitTemplate.convertAndSend("ttl.direct","ttl",message);
        //3.记录日志
        log.info("消息成功发送!");
    }

10s之后在消费者那里就可以看见

 

 然后这里会以短的优先,5s后消费者就可以收到消息。

延迟队列

1.重装rabbitmq容器 

这个插件需要找到mq内部的插件文件夹,所以需要在创建容器的时候进行数据卷挂载。

docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3.8-management

 2.安装DelayExchange插件

官方的安装指南地址为:Scheduling Messages with RabbitMQ | RabbitMQ - Blog

上述文档是基于linux原生安装RabbitMQ,然后安装插件。

2.1.下载插件

RabbitMQ有一个官方的插件社区,地址为:Community Plugins — RabbitMQ

大家可以去对应的GitHub页面下载3.8.9版本的插件,地址为Release v3.8.9 · rabbitmq/rabbitmq-delayed-message-exchange · GitHub这个对应RabbitMQ的3.8.5以上版本。 

查看挂载的数据卷.

docker volume inspect mq-plugins

接下来的看着好麻烦,以后看文档吧.

还真的麻烦的一批,真不想再搞这玩意,文件搞来搞去。

不知道为什么,挂载数据卷时一直报错,不能用自己定义的文件夹来挂载。

 

 

在消费者中如下声明

    @RabbitListener(bindings = @QueueBinding(
            value=@Queue(name = "delay.queue",durable = "true"),
            exchange=@Exchange(name="delay.direct",delayed = "true"),
            key = "delay"
    ))
    public void listenDelayQueue(String msg){
        log.info("接收到 delay.queue的延迟消息:{}",msg);
    }

 在生产者中如下定义

    @Test
    public void testSendDelayMessage(){
        //1.准备消息
        Message message = MessageBuilder
                .withBody("hello,ttl".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久的
                .setHeader("x-delay",5000)
                .build();
        //2.准备correlationData
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //3.发送消息
        rabbitTemplate.convertAndSend("delay.direct", "delay", message,correlationData);
        log.info("发送消息成功");
    }

测试结果如下 成功实现延迟5秒。但是会被报错,理论上说交换机应该立即转发,不会延迟,但是这里的延迟交换机可以帮忙保存消息延迟发送,所以这里才会报错,not_router,消息没有到达队列

 为了解决这个报错,需要修改生产者代码

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //获取RabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            //判断是否是延迟消息
            if (message.getMessageProperties().getReceivedDelay()>0) {
                //是一个延迟消息,忽略错误提示
                return;
            }
            //记录日志
            log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",
                    replyCode,replyText,exchange,routingKey,message.toString());
            //如果有需要的话,可以重发消息
        });
    }
}

惰性队列

消息堆积问题

问题解决

消费者中声明两个队列。 

@Configuration
public class LazyConfig {
    @Bean
    public Queue lazyQueue(){
        return QueueBuilder.durable("lazy.queue")
                .lazy()
                .build();
    }
    @Bean
    public Queue normalQueue(){
        return QueueBuilder.durable("normal.queue")
                .build();
    }
}

 测试,准备两个队列之后分别向两个队列发消息。

    @Test
    public void testLazyMessage(){
        for(int i=0;i<1000000;i++){
            //1.准备消息
            Message message = MessageBuilder
                    .withBody("hello,ttl".getBytes(StandardCharsets.UTF_8))
                    .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) //持久的
                    .build();
            //3.发送消息
            rabbitTemplate.convertAndSend("lazy.queue", message);
        }
    }
    @Test
    public void testnormalMessage(){
        for(int i=0;i<1000000;i++){
            //1.准备消息
            Message message = MessageBuilder
                    .withBody("hello,ttl".getBytes(StandardCharsets.UTF_8))
                    .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) //持久的
                    .build();
            //3.发送消息
            rabbitTemplate.convertAndSend("normal.queue", message);
        }
    }

可以看见惰性队列的消息全部到paged out 刷出磁盘了?????、,为什么非惰性队列的也是刷出磁盘了。

 

MQ集群

集群个屁,不搞了.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1320726.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据库sql语句查询补充

数据库sql语句查询补充 0.前言1.Like谓语2.带有Having当中的分组查询eg. 例题:错题重做: 3.内连接例题 0.前言 数据库期末复习,对自己做错的题进行知识总结和梳理 1.Like谓语 like谓语主要有两个操作 %:百分号,表示任意长度的字符串_:下划线,表示任意单个字符 like谓语的语…

MySQL数据库,触发器、窗口函数、公用表表达式

触发器 触发器是由事件来触发某个操作&#xff08;也包含INSERT、UPDATE、DELECT事件&#xff09;&#xff0c;如果定义了触发程序&#xff0c;当数据库执行这些语句时&#xff0c;就相当于事件发生了&#xff0c;就会自动激发触发器执行相应的操作。 当对数据表中的数据执行…

AX7A200教程(9): ov5640摄像头输出显示720p视频

一&#xff0c;功能框图 ov5640摄像头视频通过ddr3缓存后&#xff0c;最后使用hdmi接口进行输出显示 二&#xff0c;摄像头硬件说明 2.1&#xff0c;像头硬件管脚 如下图所示&#xff0c;一共18个管脚 2.2&#xff0c;摄像头电源初始化时序 因这个ov5640摄像头是买的老摄像…

“去 Android化”为何蔚然成风?

早在2008年时&#xff0c;国内市场诞生了第一批自研手机OS&#xff0c;由于种种缘由铩羽而归&#xff0c;“优化Android ”貌似成为了本土特色。而从2023年下半年开始掀起了一股"去安卓化"的热潮&#xff0c;像华为、小米、vivo等都不约而同的站在了同一战线。 “去…

Kotlin Multiplatform的现状—2023年网络研讨会

Kotlin Multiplatform的现状—2023年网络研讨会 在2023年&#xff0c;Kotlin Multiplatform因其开发、当前状态和未来潜力而受到了相当大的关注。随着越来越多的开发者对采用KMP进行跨平台解决方案表示兴趣&#xff0c;JetBrains在11月下旬推出了一系列网络研讨会作为回应。首…

数字化转型三大证书推荐:TOGAF+ITIL4+DAMA

&#x1f308;数字化转型是企业发展的必经之路。通过数字化的手段&#xff0c;有效提升企业业务开展及内部运营的效率&#xff0c;利于企业的降本增效及流程再造。 目前关于数字化转型的培训学习越来越多&#xff0c;对于推动企业数字化转型起到了重要作用。 数字化转型三大证书…

css+html横向滚动+固定宽

没什么好说的&#xff0c;快上代码&#xff01; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Do…

滴灌广袤农村——建行江门市分行多维施策惠乡村

江门是全省农业大市、海洋大市&#xff0c;县域面积辽阔&#xff0c;约占全市95%&#xff0c;总人口和GDP约占7成左右&#xff0c;为建行江门市分行服务乡村振兴提供“沃土”。建行江门市分行以新金融行动贯彻新发展理念&#xff0c;主动作为&#xff0c;以数字技术赋能乡村振兴…

竞赛保研 python的搜索引擎系统设计与实现

0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; python的搜索引擎系统设计与实现 &#x1f947;学长这里给一个题目综合评分(每项满分5分) 难度系数&#xff1a;3分工作量&#xff1a;5分创新点&#xff1a;3分 该项目较为新颖&#xff…

域名接入CloudFlare

接入Cloudflare分为两步 Cloudflare中注册站点域名DNS修改 整个过程如下 1.) 访问Cloudflare面板&#xff0c;添加站点 2.) 选择免费版 3.) 查看并明确DNS记录&#xff0c;其中服务的解析地址填写自己实际的服务器ip 4.) 去域名管理控制台&#xff0c;移除旧DNS服务器&#…

Spring Cloud:Eureka

目录 一、Eureka介绍 1.Eureka的作用 2.总结 二.搭建Eureka服务端步骤 1.导入maven依赖 2.编写启动类&#xff0c;添加EnableEurekaServer注解 3.添加application.yml文件&#xff0c;编写下面的配置&#xff1a; 三.注册Eureka客户端服务提供者&#xff08;user-servic…

从 0 开始创建 SpringBoot 项目

从 0 开始创建 SpringBoot 项目 从 0 开始创建 SpringBoot 项目环境准备创建项目项目目录结构及说明编写代码参考 从 0 开始创建 SpringBoot 项目 环境准备 操作系统&#xff1a;Windows 10IDE&#xff1a;IntelliJ IDEA 2023.3.1Java 版本&#xff1a;jdk1.8 工具网盘链接&…

Maven下载及安装自用版

Maven下载及安装自用版 可能是Maven用久了。感觉Maven用起来还算顺手&#xff0c;比Gradle要好上手一些。 一、下载 Maven 下载地址 注意下载版本和依赖要求&#xff0c;下载后&#xff0c;解压放在指定的位置;注意安装地址&#xff0c;放在自己规划好的开发环境专用文件夹里…

三菱plc学习入门(一,认识三菱plc)

今天就开始对三菱的plc软件入一个门&#xff0c;希望小编的文章对读者和初学者有所帮助&#xff01;欢迎评论指正&#xff0c;废话不多说&#xff0c;下面开始学习。 目录 plc的型号介绍 M表示什么&#xff1f; T表示什么&#xff1f; R表示什么&#xff1f; 为什么三菱没…

创建自定义 gym env 教程

gym-0.26.1 pygame-2.1.2 自定义环境 GridWolrdEnv 教程参考 官网自定义环境 &#xff0c;我把一些可能有疑惑的地方讲解下。 首先整体文件结构, 这里省略了wrappers gym-examples/main.py # 这个是测试自定义的环境setup.py gym_examples/__init__.pyenvs/__init__.pygri…

机器学习 | SVM支持向量机

欲穷千里目&#xff0c;更上一层楼。 一个空间的混乱在更高维度的空间往往意味着秩序。 Machine-Learning: 《机器学习必修课&#xff1a;经典算法与Python实战》配套代码 - Gitee.com 1、核心思想及原理 针对线性模型中分类两类点的直线如何确定。这是一个ill-posed problem。…

【Docker光速搞定深度学习环境配置!】

你是否还在用压缩包打包你的代码&#xff0c;然后在新的机器重新安装软件&#xff0c;配置你的环境&#xff0c;才能跑起来&#xff1f; 特别有这样的情况&#xff1a;诶&#xff0c;在我电脑跑的好好的&#xff0c;怎么这里这么多问题&#xff1f; 当项目比较简单的时候&am…

接口自动化测试框架【AIM】

最近在做公司项目的自动化接口测试&#xff0c;在现有几个小框架的基础上&#xff0c;反复研究和实践&#xff0c;搭建了新的测试框架。利用业余时间&#xff0c;把框架总结了下来。 AIM框架介绍 AIM&#xff0c;是Automatic Interface Monitoring的简称&#xff0c;即自动化…

Idea代码走查工具FindBus使用以及缺陷分析

1. 简介 Findbugs是一个静态分析工具&#xff0c;它检查类或者jar文件&#xff0c;将字节码与一组缺陷模式进行对比以发现可能的问题。利用这个工具可以在不实际运行程序的情况下对软件进行分析。可以帮助改进代码质量。Findbugs提供了方便操作的可视化界面&#xff0c;同时&a…

[计网00] 计算机网络开篇导论

目录 前言 计算机网络的概念 计算机网络的分层 计算机网络的分类 网络的标准化工作和相关组织 计算机网络的性能指标 前言 计算机网络在我们的日常生活中无处不在 在网络会有各种各样的协议和封装 保证我们的信息完整,无误的在各个客户端之前传输 计算机网络的概念 四…