你心心念念的RabbitMQ个人实践来了来了它来了

news2025/1/27 13:01:52

前言

MQ(Message Queue)就是消息队列,其有点有很多:解耦、异步、削峰等等,本文来聊一下RabbitMQ的一些概念以及使用。

RabbitMq

案例

Springboot整合RabbitMQ简单案例

基本概念

  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  • Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
  • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
  • Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
  • Producer:消息生产者,就是投递消息的程序。
  • Consumer:消息消费者,就是接受消息的程序。

发布消息到RabbitMQ需要经过两步:

  1. producer → exchange
  2. exchange 根据 exchange 的类型和 routing key 确定将消息投递到哪个队列

工作流程

了解了RabbitMQ的一些概念,我们来捋捋使用RabbitMQ的流程:

  1. 创建Exchange
  2. 创建Queue
  3. 将Queue绑定进Exchange中(此处会设置routing key)
  4. 生产者发布消息
  5. 消费者订阅消息

交换机(Exchange)

交换机可以绑定队列,绑定时可以给队列指定路由(Routing key)和参数(Arguments)

所有的消息发送都是经过交换机转发到队列的,而不是直接到队列中

交换机类型:

  • direct
  • 根据确定的路由(routing key)转发消息到队列中(一条消息可以发到多个队列,只要路由相同)
  • fanout
  • 路由无效,只要和该交换机绑定的队列,都能接收到消息
  • topic
  • 允许路由使用*和#来进行模糊匹配
  • *表示一个单词
  • 表示任意数量(零个或多个)单词
  • 例如:如果队列的路由为com.# 那么往交换机发消息是,路由填com.ccc 队列就可以收到消息
  • headers
  • 忽略路由,由参数(Arguments)来确定转发的队列

消息过期时间TTL

有两种方式设置TTL,创建队列时设置整个队列的TTL或者在发送消息时单独设置每条消息的TTL,消息存活时间取两者的最小值。

  1. 创建队列时设置
  2. 是消息的存活时间,不是队列的存活时间,别搞混了。
  3. @Beanpublic Queue queue(){ Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 5000); // 设置队列中的消息5秒过期 return new Queue("queueName",true, false, false, args);}
  4. 发送消息时设置
  5. public void makeOrder(String userid,String productid,int num){ String exchangeName = "ttl_exchange"; String routingKey = "ttlmessage"; //给消息设置过期时间 MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){ public Message postProcessMessage(Message message){ // 设置消息5秒过期 message.getMessageProperties().setExpiration("5000"); return message; } } rabbitTemplate.convertAndSend(exchangeName,routingKey,"message",messagePostProcessor);}

死信队列

死信队列也是一个正常队列,只是当绑定了死信队列的队列满足相应条件,就会将满足条件的消息转移到死信队列中。

进入死信队列的条件:

  1. 消息被拒绝
  2. 消息过期(超时)
  3. 队列达到最大长度

死信队列的配置:

  1. 按照正常步骤定义一个队列(交换机、队列、绑定)
  2. 给需要绑定死信队列的队列添加x-dead-letter-exchange(死信队列的交换机)和x-dead-letter-routing-key(死信队列的路由)参数
  3. @Beanpublic Queue queue(){ Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "死信队列交换机名称"); args.put("x-dead-letter-routing-key", "死信队列路由"); return new Queue("queueName",true, false, false, args);}

如何保证MQ消息正确送达与消费

可靠性生产和推送

步骤:

  1. 发送消息前数据库保存MQ消息发送日志
  2. MQ消息发送后使用回调更新日志状态

实现:

上面我们讲了,发布消息到RabbitMQ需要经过两步:

producer → exchange
exchange 根据 exchange 的类型和 routing key 确定将消息投递到哪个队列

所以,发布消息的确认也分两个部分,以下是确认步骤:

  1. 修改MQ应答机制(yml)
  2. spring: rabbitmq: username: rmq password: 123456 virtual-host: / host: localhost port: 5672 # 发送消息确认,producer -> exchange publisher-confirm-type: correlated # 发送消息确认,exchange -> queue publisher-returns: true
  3. 新增mq的回调方法
  4. /** * PostConstruct注解好多人以为是Spring提供的。其实是Java自己的注解。 * Java中该注解的说明:@PostConstruct该注解被用来修饰一个非静态的void()方法。 * 被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。 * PostConstruct在构造函数之后执行,init()方法之前执行。 * Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法) */@PostConstructprivate void regCallBack() { // producer -> exchange 成功或失败都会触发此回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { // 这个id是在消息发送的时候传入的 String id = correlationData.getId(); // 如果ack为true代表消息被mq成功接收 if (!ack) { // 应答失败,修改日志状态 System.out.println("exchange 应答失败,做失败处理!"); } else { // 应答成功,修改日志状态 System.out.println("exchange 成功处理"); } } }); // 这个回调只有exchange -> queue 失败时才会触发 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("exchange -> queue 发送失败"); } });}
  5. 修改MQ发送消息的方法,增加日志id的传递
  6. String correlationId = "这是日志id";rabbitTemplate.convertAndSend(exchange, routeKey, message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 消费者需要correlationId才做这个处理 message.getMessageProperties().setCorrelationId(correlationId); return message; }}, new CorrelationData(correlationId));// 如果消费者不需要获取correlationId,则用下面这种即可rabbitTemplate.convertAndSend(exchange, routeKey, msg, new CorrelationData(correlationId));

可靠性消费

步骤:

  1. 开启手动应答
  2. 监听器增加手动应答逻辑

实现:

  1. 开启手动应答
  2. spring: rabbitmq: username: rmq password: 123456 virtual-host: / host: localhost port: 5672 listener: simple: acknowledge-mode: manual # 将自动应答ack模式改成手动应答
  3. acknowledge-mode有三种类型:
  4. nome:不进行ack,rabbitmq默认消费者正确处理所有请求
  5. munual:手动确认
  6. auto:自动确认消息(默认类型)。如果消费者抛出异常,则消息重回队列。
  7. 监听器增加手动应答逻辑
  8. @RabbitListener(queues = {"队列名字"})public void messageConsumer(String orderMsg, Channel channel, @Headers Map<String,Object> headers) throws Exception{ // 需要producer做相应处理,consumer才能拿到correlationId String correlationId = messages.getMessageProperties().getCorrelationId(); System.out.println("消息为:" + orderMsg); long tag = Long.parseLong(headers.get(AmqpHeaders.DELIVERY_TAG).toString()); try { // 消费成功,进行确认 channel.basicAck(tag, false); } catch (IOException e) { // 消费失败,重发 // requeue代表是否重发,为false则直接将消息丢弃,有死信就进入死信队列 channel.basicNack(tag, false, true); }}

总结

本文介绍了RabbitMQ的一些概念和简单使用,有不少东西其实是没有讲清楚的,比如publisher-confirm-type和acknowledge-mode的几种类型的区别等等。主要是在官方文档找不到相关的细致描述,查文档的能力还是有待提高。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/39936.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

云原生系列 【基于CCE Kubernetes编排实战二】

✅作者简介&#xff1a; CSDN内容合伙人&#xff0c;全栈领域新星创作者&#xff0c;阿里云专家博主&#xff0c;阿里云问答板块版主&#xff0c;华为云享专家博主&#xff0c;掘金后端评审团成员 &#x1f495;前言&#xff1a; 最近云原生领域热火朝天&#xff0c;那么云原生…

Hystirx限流:信号量隔离和线程池隔离

背景&#xff1a; 最近工作中要处理服务高并发的问题&#xff0c;大流量场景下限流熔断降级可以说是必不可少的&#xff0c;打算对限流做一次改造&#xff0c;所以要先了解一下hytrix相关内容&#xff0c;比如了解一下线程池隔离和信号量隔离的区别。 **信号量&#xff1a;**信…

[网络工程师]-应用层协议-DHCP

BOOTP是最早的主机配置协议&#xff0c;动态主机配置协议&#xff08;Dynamic Host Configuration Protocol&#xff0c;DHCP&#xff09;则是在其基础上进行了改良的协议&#xff0c;是一种用于简化主机IP配置管理的IP管理标准。通过DHCP协议&#xff0c;DHCP服务器为DHCP客户…

集合学习笔记——Collection 全家桶

Collection是我们日常开发中使用频率非常高的集合&#xff0c;它的主要实现有List和Set,区别是List是有序的&#xff0c;元素可以重复;Set是无序的&#xff0c;元素不可以重复&#xff0c;我们简单看下继承关系&#xff1a; List的实现类主要线程不安全的ArrayList和LinkedList…

推挽输出和开漏输出-三极管-mos管

一、推挽输出 1.1推挽输出的概念 推挽&#xff08;push-pull&#xff09;输出是由两个MOS或者三极管组成&#xff0c;两个管子始终保持一个导通&#xff0c;另一个截止的状态。 图1 推挽电路示意图 当输入高电平时&#xff0c;叫做推&#xff1b; 上管Q1导通&#xff0c;下管…

【目标检测】Faster R-CNN论文的讲解

目录&#xff1a;Faster R-CNN论文的讲解一、前言二、回顾Fast R-CNN三、引入Faster R-CNN四、Faster R-CNN的介绍4.1 框架结构4.2 RPN如何产生候选区域的4.3 损失函数4.4 训练候选框提取网络4.5 RPN和Fast R-CNN共享特征的方法4.5.1 交替训练法4.5.2 近似联合训练法一、前言 …

C语言——学生信息管理系统

目录 功能展示 界面展示 所有功能模块&#xff1a; 功能1&#xff1a;菜单模块&#xff08;显示功能菜单&#xff09; 功能2&#xff1a;增加学生信息 功能3&#xff1a;输出学生信息&#xff08;查看所有学习信息&#xff09; 功能4&#xff1a;修改学生信息 功能5&a…

python3-GUI概述及应用

目录一、什么是GUI二、Python GUIPySimpleGUI概述一、PySimpleGUI简介二、PySimpleGUI特征三、输出设备hello,world猜数字一、玩家猜数字二、电脑猜数字21点游戏一、21点游戏简介二、程序代码一、什么是GUI 图形用户界面&#xff08;Graphical User Interface&#xff0c;简称…

十六、CANdelaStudio深入-CDD与CDDT的差异(新建自定义服务)

本专栏将由浅入深的展开诊断实际开发与测试的数据库编辑,包含大量实际开发过程中的步骤、使用技巧与少量对Autosar标准的解读。希望能对大家有所帮助,与大家共同成长,早日成为一名车载诊断、通信全栈工程师。 本文介绍CANdelaStudio的CDD与CDDT的差异与新建自定义服务,欢迎…

数字图像处理(一)——什么是数字图像

一、什么是数字图像处理&#xff1f; 一副图像可以被定义为一个二维函数f(x,y)&#xff0c;其中x和y是空间平面坐标&#xff0c;而对任意一对空间坐标(x,y)处幅值f称为图像在该点的强度或者灰度。当x和y以及灰度值f是有限的离散数值时&#xff0c;我们称该图像为数字图像。像素…

排序算法简述

一、概述 常见的排序算法有冒泡排序、插入排序、选择排序、快速排序、归并排序、桶排序、基数排序&#xff0c;这些排序各自有各自的特点。按照时间时间复杂度可以分为 O(n^2):冒泡、插入、选择排序&#xff1b;O(nlogn):归并、快速排序&#xff1b;O(n):桶排序、计数排序、基…

[附源码]java毕业设计自治小区物业设备维护管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

[附源码]Python计算机毕业设计房地产销售系统

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

SSM+Mysql实现的共享单车管理系统(功能包含分角色,登录、用户管理、服务点管理、单车管理、分类管理、学生信息管理、单车租赁、信息统计、系统设置等)

博客目录SSMMysql实现的共享单车管理系统实现功能截图系统功能使用技术代码完整源码SSMMysql实现的共享单车管理系统 本系统一个学校共享单车管理的项目&#xff0c;通过线上系统化的管理&#xff0c;可以为后续的运营以及单车的项目运转提供极大的帮助。 (文末查看完整源码) …

【计算机视觉(CV)】基于图像分类网络VGG实现中草药识别(二)

【计算机视觉&#xff08;CV&#xff09;】基于图像分类网络VGG实现中草药识别&#xff08;二&#xff09; 作者简介&#xff1a;在校大学生一枚&#xff0c;华为云享专家&#xff0c;阿里云专家博主&#xff0c;腾云先锋&#xff08;TDP&#xff09;成员&#xff0c;云曦智划项…

Graph (discrete mathematics)

In mathematics, and more specifically in graph theory, a graph is a structure amounting to a set of objects in which some pairs of the objects are in some sense “related”. The objects correspond to mathematical abstractions called vertices (also called n…

餐厅食材采购信息管理系统的设计与实现

摘 要 网络的广泛应用给生活带来了十分的便利。所以把餐厅食材采购信息管理与现在网络相结合&#xff0c;利用JSP技术建设餐厅食材采购信息管理系统&#xff0c;实现餐厅食材采购的信息化。则对于进一步提高餐厅食材采购信息管理发展&#xff0c;丰富餐厅食材采购信息管理经验…

SpringBoot SpringBoot 原理篇 3 核心原理 3.5 启动流程【4】【5】【6】

SpringBoot 【黑马程序员SpringBoot2全套视频教程&#xff0c;springboot零基础到项目实战&#xff08;spring boot2完整版&#xff09;】 SpringBoot 原理篇 文章目录SpringBootSpringBoot 原理篇3 核心原理3.5 启动流程【4】【5】【6】3.5.1 看源码咯3.5.2 总结3 核心原理 …

剑指 Offer 10- I. 斐波那契数列

一、题目描述 写一个函数&#xff0c;输入 n &#xff0c;求斐波那契&#xff08;Fibonacci&#xff09;数列的第 n 项&#xff08;即 F(N)&#xff09;。斐波那契数列的定义如下&#xff1a; F(0) 0, F(1) 1 F(N) F(N - 1) F(N - 2), 其中 N > 1. 斐波那契数列由 0 和…

leetcode:6248. 统计中位数为 K 的子数组【问题转化 + 排序二分】

目录题目截图题目分析ac code总结题目截图 题目分析 找到k的位置然后一步步往左走&#xff0c;一步步往右走统计左边和右边的比当前k小的和比k大的lst [[small, big]]&#xff0c;分为left和right两部分可以先一侧的单独看small和big&#xff0c;找到big - small 0或者1的即…