文章目录
- 分布式消息队列认知提升
- 分布式消息队列(MQ)应用场景
- 分布式消息队列(MQ)应用思考点
- MQ本身的一些思考点
- 业界主流的分布式消息队列(MQ)
- MQ的技术选型关注点
- 初识 JMS 与其专业术语
- RabbitMQ四种集群架构
- 主备模式
- 主备模式架构模型
- 主备模式-HaProxy核心配置
- 远程模式(Shovel模式)
- Shovel架构模型
- Shovel集群的拓扑图
- Shovel集群配置步骤
- 具体配置步骤
- 镜像模式(最主流)
- 镜像模式介绍
- Mirror镜像队列的优点
- 镜像模式集群架构图
- Mirror镜像队列的缺点
- 多活模式
- 多活模式介绍
- 多活集群架构
- 多活模式 - Federation插件
- RabbitMQ核心概念
- 初识RabbitMQ
- AMQP协议
- AMQP协议模型
- AMQP核心概念
- RabbitMQ的整体架构是什么样子的?
- RabbitMQ消息是如何流转的?
- RabbitMQ环境搭建
- RabbitMQ安装步骤
- RabbitMQ急速入门HelloWorld
- 急速入门-消息生产与消费
- 入门代码示例
- 生产者
- 消费者
- Rabbitmq核心API-Exchange交换机
- Exchange 交换机介绍
- 声明交换机
- 声明队列
- 通过路由键将交换机和队列绑定
- 通过路由键将交换机和交换机绑定
- 生产者发送消息
- 消费者消费消息
- Direct Exchange
- 代码示例
- 生产者
- 消费者
- Topic Exchange
- 代码示例
- 生产者
- 消费者
- Fanout Exchange
- 代码示例
- 生产者
- 消费者
- Header Exchange
- 代码示例
- 生产者
- 消费者
- 提问:生产者创建队列和交换机还是消费者创建队列和交换机
- 官网解释
- 个人理解翻译
- Rabbitmq核心API-其他关键概念讲解
- Binding - 绑定
- Queue - 消息队列
- Message - 消息
- 什么是MIME类型
- Virtual Host - 虚拟主机
- Rabbitmq高级特性-生产端可靠性投递与消费端幂等性
- RabbitMQ的ACK机制
- 1、什么是消息确认ACK。
- 2、RabbitMQ的ACK的消息确认机制。
- 3、ACK机制的开发注意事项?
- 4、怎么解决ACK的内存泄漏问题?
- 生产端如何保障消息100%的投递成功?
- 什么是生产端的可靠性投递?
- 举例无法满足生产端的可靠性投递的情况
- BAT/TMD互联网大厂的解决方案-消息落库,对消息状态进行打标
- 消费端 - 幂等性保障
- 幂等性概念
- 生产端和消费端重复消费问题
- 在海量订单产生的业务高蜂期,如何避免消息的重复消费问题?
- 业界主流的幂等性操作
- Rabbitmq高级特性-生产端特性_确认机制和返回机制
- Confirm 消息确认机制
- Confirm 消息确认机制介绍
- Confirm 确认消息流程解析
- Confirm 确认消息实现
- 代码示例
- 生产者
- 消费者
- Return 消息返回机制
- Return 消息返回机制
- Return 消息返回机制流程
- 代码示例
- 生产者
- 消费者
- Rabbitmq高级特性-消费端特性_流控服务和ACK重回队列
- 消费端限流
- 消费端限流介绍
- 代码示例
- 生产者
- 消费者
- 消费端ACK与重回队列
- 消费端的手工ACK和NACK
- 手工ACK
- NACK
- 重回队列
- 代码示例
- 生产者
- 消费者
- Rabbitmq高级特性-TTL消息与死信队列详解
- TTL消息/队列
- TTL
- 死信队列
- 死信队列:DLX,Dead-Letter-Exchange
- 消息变成死信有以下几种情况
- 代码示例
- 生产端
- 消费端
- RabbitMQ与SpringBoot2.X整合
- RabbitMQ整合应用 - SpringBoot2.X步骤
- 生产端配置文件
- 生产者
- 测试发送方法
- 消费端配置文件
- 消费者
- @RabbitListener注解使用
- RabbitMQ基础组件封装实战
- RabbitMQ基础组件整体功能概述
- 基础组件实现关键点
- 基础组件实现功能点
- RabbitMQ基础组件模块划分
- 项目架构
- 父目录pom.xml文件
- RabbitMQ基础组件API封装
- 通用API抽象——rabbit-api
- 统一消息类
- 消息类型枚举类
- 建造者模式——用于创建消息类
- 基础异常类封装
- 生产者抽象接口
- 回调函数封装
- 消费者抽象接口
- 自动装配与架构接口定义
- 核心生产者封装(后续肯定作为jar包引入第三方工程)——rabbit-core-producer
- pom.xml文件
- spring的自动装配(\resources\META-INF\spring.factories)
- 发送消息的实际实现类
- 发送不同类型消息的核心类
- 发送消息的异步线程池
- 公共模块——rabbit-common
- pom.xml文件
- 发送迅速异步消息和确认消息实现
- rabbit-core-producer
- 发送消息的实际实现类
- 发送不同类型消息的核心类
- 发送消息的异步线程池
- RabbitTemplate池化封装
- RabbitTemplate池化封装的目的
- rabbit-core-producer
- RabbitTemplate池化类封装
- 序列化与反序列化转换封装
- rabbit-common
- 序列化工厂类
- 序列化与反序列化接口
- 序列化工厂实现类
- 序列化与反序列化实现类
- 消息转换类
- 装饰者/静态代理模式封装消息转换类
- 从架构的视角分析可靠性消息投递(重点)
- 具体架构设计
- 可靠性投递落地-集成数据源
- rabbit-common
- pom.xml文件添加以下依赖
- rabbit-core-producer
- 建库和表语句rabbit-producer-message-schema.sql——日志表(即具体架构设计中的MSG DB)
- 日志表实体
- 日志表Mapper
- 日志表xml(resources\mapping\BrokerMessageMapper.xml)
- jdbc相关配置(resources\rabbit-producer-message.properties)
- 数据源配置加载类
- 执行建表语句rabbit-producer-message-schema.sql类
- mybatis整合spring boot相关类
- 可靠性投递落地-可靠性消息业务实现落地
- rabbit-core-producer
- 日志表业务实体类
- 消息状态枚举
- 消息超时时间常量(即超过这个时间消息状态还没有变更重试)
- 可靠性投递落地-ESJOB定时任务讲解
- ESJOB定时任务
- 官网地址
- 定时任务通用组件封装
- 为什么作定时任务通用组件封装
- rabbit-task
- pom.xml文件
- 自动装配类
- spring的自动装配(\resources\META-INF\spring.factories)
- 读取配置文件加载到class类
- 自定义注解——模块装配,相当于开关
- 自定义注解——esjob配置
- esjob配置注解解析类
- esjob任务类型枚举类
- 定时任务通用组件封装测试——rabbit-esjob-test
- pom.xml文件
- 启动类Application
- 项目配置文件application.properties
- 测试类
- 测试结果
- 可靠性消息重试实现集成定时任务组件
- rabbit-common
- 数据库消息对象转换类
- java对象与json进行转换的通用工具类
- rabbit-core-producer
- pom.xml文件引入esjob组件rabbit-task依赖
- RabbitProducerAutoConfiguration类添加@EnableElasticJob注解
- 定义定时任务将需要重新发送的消息抓取发送到RabbitMQ上
- 可靠性消息最终演示
- rabbit-test
- pom.xml文件
- 项目配置文件application.properties
- 主启动类
- 主配置类
- 测试类
- 批量消息发送封装
- 前景
- rabbit-api
- MessageHolder
- 批量发送消息方法ProducerClient类
- 真正发消息的方法RabbitBrokerImpl类
- 新的异步队列类
- 延迟消息应用与封装
- 实现方式
- 下载地址
- 安装启动
- 测试(控制台创建exchange时多了一种x-delayed-message类型即安装成功)
- 创建一个延迟交换机(必须设置扩展参数x-delayed-type,值必须为topic)
- 创建一个延迟队列
- 将交换机和队列通过路由键绑定
- 测试发送一条延迟消息(扩展参数x-delay是必须的,值单位是毫秒)
- rabbit-common
- 装饰者/静态代理模式封装消息转换类——RabbitMessageConvert设置
- rabbit-test
- 测试类
- 附git项目地址
- 总结与复习
- 个人遇到的面试题
分布式消息队列认知提升
分布式消息队列(MQ)应用场景
服务解耦:服务的拆分和隔离是业务层面的划分,拆分之后如何通信要看服务之间的依赖性是强依赖还是弱依赖,如果是强依赖,可以采用直连方式,比如:同步dubbo、同步http、同步springcloud调用等,如果是弱依赖,可以使用消息队列做消息解耦。需要注意的是弱依赖不代表可以失败,如果说弱依赖不能失败,比如上游服务做消息发送MQ,下游服务一定要收到这条消息并处理,此时就需要上有服务作可靠性的投递,后续会介绍。
削峰填谷:如果在生产环境中,有一些即时性很高、流量很大的应用场景,比如:秒杀、大促等,如何对应用服务作抗压。削峰填谷指把流量的高峰和低谷的速率均衡。简单来说,就是当下游服务处理不过来,可以将消息缓存到某处,然后慢速消费。MQ本质上最早的作用就是如此。
异步化缓冲:有些业务逻辑可以做异步操作,只需要做到最终一致性即可,不需要实时的强一致性。
分布式消息队列(MQ)应用思考点
生产端可靠性投递:如果是金融领域相关的,那这个消息一定不能丢失,此时需要做到生产端100%可靠性投递。比如要求一条消息发出去,跟数据库一定要保障 原子性才行。后续举例说明。
消费端幂等:消费端需要作幂等性的验证,不能让消息消费多次。
MQ本身的一些思考点
-
高可用:如果应用服务其中有一个MQ的节点宕机,怎么保障高可用。
- 解决方式:HA(High Availability),如:Nginx高可用集群架构Keepalived双机主备原理
-
低延迟:巨大流量冲压下,如何保障低延迟。
-
可靠性:消息落到MQ,怎么保证消息不会丢失,如果磁盘发生损坏,使用啥解决手段。
- 解决方式:主流的就是副本解决,如:kafka、es都有分片或者副本的概念
-
消息堆积能力:应对于对应的业务场景,有多大业务以及数据量,大体预估消息能堆积到什么程度,判断不同的MQ是否胜任这样的消息堆积程度,再做技术选型
-
扩展性:判断该MQ是否支持无感知的横向扩容
-
…
业界主流的分布式消息队列(MQ)
- ActiveMQ
- 适用于传统行业、中小型公司,并发和消息承载能力不是很优秀,不适用于高并发、大流量的需求。 而且官方社区现在对ActiveMQ 5.x维护越来越少
- RabbitMQ
- 适用于于高并发、大流量的需求,但是相对其横向扩展能力不是很好。 erlang开发,很难去看懂源码,基本职能依赖于开源社区的快速维护和修复bug,不利于做二次开发和维护。
- RocketMQ
- 接口简单易用,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,社区维护还可以,可靠性和可用性都是ok的,还可以支撑大规模的topic数量,支持复杂MQ业务场景,很大的优势在于,源码是java,我们可以自己阅读源码,定制自己公司的MQ,可以掌控。但是接口这块不是按照标准JMS规范走的有些系统要迁移需要修改大量代码。
- Kafka
- 对消息可靠性要求不是那么高,可以使用kafka,kafka可以在很廉价的服务器上有着很高的性能和吞吐量的表现。
MQ的技术选型关注点
- 各个MQ的性能、优缺点、相应的业务场景
- 集群架构模式,分布式、可扩展、高可用、可维护性
- 综合成本问题,集群规模,人员成本
- 未来的方向、规划、思考
初识 JMS 与其专业术语
JMS(Java Message Service)规范,也就是Java消息服务,它定义了Java中访问消息中间件的接口的规范。在这里注意哦,JMS只是接口,并没有给予实现,实现JMS接口的消息中间件称为 “JMS Provider”,目前知名的开源 MOM (Message Oriented Middleware,也就是消息中间件)系统包括Apache的ActiveMQ、RocketMQ、Kafka,以及RabbitMQ,可以说他们都 “基本遵循” 或 “参考” JMS规范,都有自己的特点和优势。
- 专业术语
- JMS(Java Message Service):实现JMS 接口的消息中间件;
- Provider(MessageProvider):消息的生产者;
- Consumer(MessageConsumer):消息的消费者;
- PTP(Point to Point):即点对点的消息模型,这也是非常经典的模型;
- Pub / Sub(Publish/Subscribe):,即发布/订阅的消息模型;
- Queue:队列目标,也就是我们常说的消息队列,一般都是会真正的进行物理存储;
- Topic:主题目标;
- ConnectionFactory:连接工厂,JMS 用它创建连接;
- Connection:JMS 客户端到JMS Provider 的连接;
- Destination:消息的目的地;
- Session:会话,一个发送或接收消息的线程(这里Session可以类比Mybatis的Session);
- JMS 消息格式定义:
- StreamMessage 原始值的数据流
- MapMessage 一套名称/值对
- TextMessage 一个字符串对象
- BytesMessage 一个未解释字节的数据流
- ObjectMessage 一个序列化的Java对象
RabbitMQ四种集群架构
- 主备模式:warren(兔子窝),一个主/备方案(主节点如果宕机,从节点提供服务,和ActiveMQ利用Zookeeper做主/备一样,而RabbitMQ利用HaProxy做主/备)
- 远程模式:早期RabbitMQ版本提供的多活存储,主要用于**远距离通信和复制,可以实现双活的一种模式,简称Shovel模式。**虽然架构简单,但是配置复杂,可靠性有待提高。后续考虑用多活模式代替远程模式。所谓Shovel模式就是我们把消息进行不同数据中心的复制工作,可以跨地域的让两个mq集群互联。 目的是当上游消息量过大可以堆积到另一个mq集消费处理群,并让两个mq集群互联。
- 镜像模式:业界最为广泛RabbitMQ集群架构模型,能够保证消息可靠性
- 多活模式:与远程模式类似,就是作异地双活或者数据的转储功能
主备模式
主备模式架构模型
主备模式是指,主节点负责提供读写服务,备份节点不提供读写,只负责提供备份服务,当主节点宕机时,备份节点会自动切换为主节点提供读写服务。这里的Consumer不仅仅是指消费者,可以理解为需求方,它通过HaProxy默认路由到主节点,然后主节点提供服务,当主节点宕机,下次路由由于HaProxy配置了一些规则会路由到备份节点,备份节点会升级为主节点,当主节点恢复,加入集群,就会成为新的主节点的备份节点。
主备模式-HaProxy核心配置
listen rabbitmq_cluster #主备模式集群名
bind 0.0.0.0:5672 #绑定的端口,5672
mode tcp #配置TCP模式
balance roundrobin #简单的轮询
server bhz76 192.168.11.76:5672 check inter 5000 rise 2 fall 2 #主节点
server bhz77 192.168.11.77:5672 backup check inter 5000 rise 2 fall 2 #备节点
注意了,上面的 rabbitMQ 集群节点配置 # inter 每隔 5 秒对 mq 集群做健康检查, 2 次正确证明服务可用,2 次失败证明服务器不可用,并且配置主备机制
远程模式(Shovel模式)
Shovel架构模型
Shovel集群的拓扑图
Shovel集群配置步骤
具体配置步骤
大体上就是,当前两个集群想要建立关联,有一个sources(来源)和对destinations(目的地),对于每一个broker需要配置地址,对于destinations,需要声明队列、交换机、绑定规则,后续每次创建一个队列和交换机,都要加一个配置,把这个路由通过配置写上,这种方式非常不方便,非常麻烦。
镜像模式(最主流)
镜像模式介绍
- 集群模式非常经典的就是Mirror镜像模式,保证100%数据不丢失
- 在实际工作中使用最多,并且实现集群非常简单,一般互联网大厂都会构建这种镜像集群模式
ps:镜像模式实际就是数据备份,像MongoBD的复制集、es的分片都是使用这种模式概念
Mirror镜像队列的优点
- 可靠性,数据不会丢失
- 内部做数据同步,RabbitMQ底层是erlang开发的,天然交换机的方式,有跟原生socket一样低的延迟,故作数据同步时性能非常好
- 3节点,最好时奇数节点,防止脑裂,保障数据可靠性
镜像模式集群架构图
此图就是一个完整的镜像队列集群模型,下面三个RabbitMQ节点保证读写一致,springboot访问RabbitMQ服务器,不是直连,是通过HaProxy,当HaProxy如果是单点,发生故障,整个就提供不了服务了,这里的KeepAlived的作用就是高可用,它会虚拟出来一个VIP,通过VIP路由到其中一台HaProxy,再由这一台HaProxy负载均衡到下游的RabbitMQ集群。
Mirror镜像队列的缺点
从上图可以看出这种镜像队列集群的缺陷就是不能支持横向扩容,因为它的数据存储是有限的,当我们数据量尤其在高峰期流量非常大,可能消费者消费的数据没有那么快,消息都会堆积到镜像队列上,此时横向扩容就没有意义了,因为横向再扩容一份,变为四个节点,四分数据需要同步,对性能和吞吐量都有所降低,反而增加了RabbitMQ集群的负担。所以官方也建议镜像队列集群保证最小奇数3个即可。但是如果此时想要横向扩容,官方也提供一种多活模式用于解决镜像队列集群不能支持横向扩容的缺陷。
多活模式
多活模式介绍
- 这种模式也是实现异地数据复制的主流模式,因为Shovel模式配置比较复杂,所以一般来说实现异地集群都是使用这种双活或者多活模式来实现的
- 这种模式需要依赖RabbitMQ的federation插件,可以实现持续的可靠性AMQP数据通信,多活模式实际配置与应用非常简单
- RabbitMQ部署架构采用双中心模式(多中心),那么在两套(或多套)数据中心中各部署一套RabbitMQ集群,各中心的RabbitMQ服务除了需要为业务提供正常的消息服务外,中心之间还需要实现部分队列消息共享
多活集群架构
Application代表应用,下面两组集群都是一个镜像对列集群,需要保证两个集群之间的异地互通,中间就可以使用federation插件用于消息连接通信。
多活模式 - Federation插件
-
Federation插件是一个不需要构建Cluster,而在Brokers之间传输消息的高性能插件。因为它的目的就是让集群之间消息传输。Federation插件可以在Brokers或者Cluster之间传输消息。连接的双方可以使用不同的users和virtual hosts,双方也可以使用版本不同的RabbitMQ和Erlang。Federation插件使用AMOP协议通讯,可以接受不连续的传输
-
Federation Exchanges,Exchanges代表交换机,可以看成Downstream(下游交换机)从Upstream(上游交换机)主动拉取消息,但并不是拉去所有消息。其实和Shovel模式差不多,只是解决了Shovel模式必须要在配置文件中配置再重启的麻烦操作,**Federation的优点就是想加哪些Exchanges相互通信,直接可以通过控制台操作,有可视化页面且即时生效,支持热更新。**必须是在Downstream上已经明确定义Bindings关系的Exchange(上游的交换机),也就是有实际的物理Quene来接收消息,才会从Upstream拉取消息到Downstream。使用AMOP协议实施代理间通信,Downstream会将绑定关系组合在一起,绑定/解除绑定命令将发送到Upstream交换机。因此,Federation Exchange只接收具有订阅的消息,官方图说明如下:
ps:可以认为是两个独立的镜像队列集群,只是可以使用Federation插件实现上下游灵活的消息通信。
RabbitMQ核心概念
初识RabbitMQ
- RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,这也是所有MQ所做的事。比如即时性要求不高、异步rpc的订单服务和物流服务的解耦。但是即时性要求很高,同步rpc比如下单和支付的动作则不能使用MQ
- RabbitMQ是使用Erlang语言来编写的,Erlang语言属于交换机语言,优点时数据同步快,并且主流MQ中只有RabbitMQ是基于AMQP协议的
- RabbitMQ的消息堆积能力有限,不太适合比较大的消息堆积的场景,可以采用多集群的方式但是成本会增加
AMQP协议
- AMQP全称:Advanced Message Queuing Protocol,高级消息队列协议
- AMQP定义:是具有现代特征的二进制协议,是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息中间件设计
AMQP协议模型
图中Publisher和Consumer代表生产者和消费者,Server即RabbitMQ的broker,Virtual host代表虚拟主机/路径,其实就是作用域的划分,跟微服务的划分类似,比如订单体系、商品体系、物流体系等。Exchange代表交换机,可以简单的理解为topic(主题),Message Queue代表消息队列,用于存储消息到磁盘,Exchange和Message Queue是多对多的关系,之间的交互是通过路由的概念。但是一般一个Message Queue只会绑定一个Exchange。Publisher只发送消息到Exchange上,只与Exchange通信,Consumer只需要监听Message Queue,不需要关注Exchange。
AMQP核心概念
- Server:又称broker,就是一个RabbitMQ节点,接收客户端的连接,实现AMQP实体服务
- Connection:连接,应用程序与broker的网络连接
- Channel:网络信道,几乎所有的操作都是在Channel中进行的,Channel是进行消息读写的通道,客户端可建立多个Channel,每个Channel代表一个会话任务
- Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则是消息体内容
- Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个Virtual host里面可以有若干个Exchange和Queue,同一个Virtual host里面不能有相同名称的Exchange或Queue,但是不同的Virtual host里面能有相同名称的Exchange
- Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列
- Binding:Exchange或Queue之间的虚拟连接,binding中可以包含routing key
- Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定Queue
- Queue:也称Message Queue,消息队列,保存消息并将它们转发给消费者
RabbitMQ的整体架构是什么样子的?
RabbitMQ消息是如何流转的?
RabbitMQ环境搭建
RabbitMQ安装步骤
- 官网地址:http://www.rabbitmq.com/
- 提前准备:安装Linux必要的安装包
- 下载RabbitMQ必须安装包
- 配置文件修改
在这里我们使用RabbitMQ 3.6.5 版本进行操作:
-
环境描述:Linux(centos7 Redhat7)
## 1. 首先在Linux上进行一些软件的准备工作,yum下来一些基础的软件包 yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz ## 配置好主机名称:/etc/hosts /etc/hostname,使得装RabbitMQ集群的三台服务器通过主机名可以ping通 ## 2. 下载RabbitMQ所需软件包(本神在这里使用的是 RabbitMQ3.6.5 稳定版本) wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-1.1.el7.lux.x86_64.rpm wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm ## 3. 安装服务命令 rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm --force --nodeps rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm ## ps:rpm的包安装不能指定安装目录,tar.gz的包可以 ## 4. 修改配置文件用户登录与连接心跳检测 vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app 修改点1:loopback_users 中的 <<"guest">>,只保留"guest" (用于用户登录) 修改点2:heartbeat 为10(用于心跳连接) ## 5. 安装管理插件 ## 5.1 首先启动服务(后面 | 包含了停止、查看状态以及重启的命令) /etc/init.d/rabbitmq-server start | stop | status | restart ## 5.2 查看服务有没有启动: lsof -i:5672 (5672是Rabbit的默认端口) ## 5.3 安装web端的客户端插件 rabbitmq-plugins enable rabbitmq_management ## 5.4 可查看管理端口(控制台)有没有启动: lsof -i:15672 或者 netstat -tnlp | grep 15672 ## 6. 一切OK 我们访问地址,输入用户名密码均为 guest : ## http://你的ip地址:15672/ 默认账号和密码都是guest ## 7. 如果一切顺利,那么到此为止,我们的环境已经安装完啦
RabbitMQ急速入门HelloWorld
急速入门-消息生产与消费
- ConnectionFactory:获取连接工厂
- Connection:一个连接
- Channel:数据通信信道,可发送和接收消息
- Queue:具体消息存储队列
- Producer & Consumer:生产者和消费者
入门代码示例
生产者
package com.bfxy.rabbitmq.api.helloworld;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = null;
Channel channel = null;
try {
// 1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.218.21");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2 创建Connection
connection = connectionFactory.newConnection();
// 3 创建Channel
channel = connection.createChannel();
// 4 声明
String queueName = "test001";
/*
queueDeclare()方法:用于实现通道与消息队列的绑定。它包含5个参数。
String queue:被绑定的消息队列名,当该消息队列不存在时,将新建该消息队列
Boolean durable:是否持久化消息队列,该参数持久化的仅为队列,而不包含队列中的消息
Boolean exclusive:该通道是 否独占该队列
Boolean autoDelete:消费完成时是否删除队列,该删除操作在消费者彻底断开连接之后进行
Map<String, Object> arguments:其他配置参数
*/
channel.queueDeclare(queueName, false, false, false, null);
Map<String, Object> headers = new HashMap<>();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // deliveryMode设置为2,代表将该队列的消息持久化;1代表不将该队列的消息持久化
.contentEncoding("UTF-8")
.headers(headers).build();
// 5 发送
for (int i = 0; i < 5; i++) {
String msg = "Hello World RabbitMQ " + i;
/*
basicPublish()方法:是基础的发布消息方法,它有四个参数
String exchange:交换机名,传入空字符串"",默认走AMQP default这个Exchange
String routingKey:(路由地址)发布消息的队列,无论channel绑定哪个队列,最终发布消息的队列都有该字串指定
AMQP.BasicProperties props:消息的配置属性,例如 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化。
byte[] body:消息数据本体,必须是byte数组
*/
channel.basicPublish("", queueName, props, msg.getBytes());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
消费者
package com.bfxy.rabbitmq.api.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver {
public static void main(String[] args) throws Exception {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.218.21");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 该连接如果和rabbitmq断了的重连设置
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
connection = connectionFactory.newConnection();
channel = connection.createChannel();
String queueName = "test001";
channel.queueDeclare(queueName, false, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称、是否自动ACK、消费者
channel.basicConsume(queueName, true, consumer);
// 循环获取消息
while (true) {
// 获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
Rabbitmq核心API-Exchange交换机
Exchange 交换机介绍
Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列
声明交换机
exchangeDeclare()方法有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成的。
AMQP.Exchange.DeclareOk exchangeDeclare(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
这个方法的返回值是 AMQP.Exchange.DeclareOk 用来标识成功声明了一个交换器。
- exchange:交换机名称
- type:交换机类型 direct、topic、fanout、headers
- Direct Exchange:直连交换机
- Topic Exchange:主题交换机
- Fanout Exchange:扇形交换机
- Header Exchange:头交换机
- durable:是否需要持久化,true为持久化
- autoDelete:设置是否自动删除。autoDelete设置为true则表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。
- internal:当前Exchange是否用于RabbitMQ内部使用,默认为false。用户所创建的Queue不会消费该类型交换机下的消息,既然是为了RabbitMQ系统所用,作为用户,我们就没有必要创建该类型的Exchange,当然默认也是选择false
- arguments:扩展参数,用于扩展AMQP协议自制定化使用,可以自定义符合自己业务场景的交换机
声明队列
AMQP.Queue.DeclareOk queueDeclare有两个重载方法,一个无惨、一个有参
AMQP.Queue.DeclareOk queueDeclare(String queue,
boolean durable,
boolean exclusive,
boolean autoDelete,
Map<String, Object> arguments) throws IOException;
-
queue:队列名称
-
durable:是否设置持久化,true为持久化
-
exclusive:设置是否排他。为true则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里 需要注意三点:
-
排他队列是基于连接(Connection)可见的,同一个连接的不同信道(Channel)是 可以同时访问同一连接创建的排他队列;
-
“首次”是指如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;
-
即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
-
-
autoDelete:设置是否自动删除。为true则设置队列为自动删除。自动除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
-
arguments:扩展参数,用于扩展AMQP协议自制定化使用,可以自定义符合自己业务场景的队列
-
其他方法可以在Channel接口中查看。
生产者和消费者都能够使用queueDeclare()方法来声明一个队列, 但是如果消费者在同一个信道上订阅了另一个队列, 就无法再声明队列了。必须先取消订阅,然后将信道置为“传输”模式,之后才能声明队列。
如果需要查看队列内容,可以调用AMQP.Queue. PurgeOk queuePurge(String queue) throws IOException;
通过路由键将交换机和队列绑定
AMQP.Queue.BindOk queueBind(String queue,
String exchange,
String routingKey,
Map<String, Object> arguments)throws IOException;
- queue:队列名称
- exchange:交换机名称
- routingKey:路由键
- arguments:其他参数,可用于头交换机绑定队列
通过路由键将交换机和交换机绑定
AMQP.Exchange.BindOk exchangeBind(String destination,
String source,
String routingKey,
Map<String, Object> arguments) throws IOException;
- destination:目标交换机
- source:源交换机
- routingKey:路由键
- arguments:其他参数
生产者发送消息到source交换机中,指定了routingKey1,source与另一个交换器destination还有routingKey2绑定,如果routingKey1==routingKey2,就会并把消息转发到destination中,进而存储在destination绑定的队列queue中。
生产者发送消息
void basicPublish(String exchange,
String routingKey,
boolean mandatory,
boolean immediate,
BasicProperties props,
byte[] body) throws IOException;
- exchange:交换机名,传入空字符串"",默认走AMQP default这个Exchange
- routingKey:(路由地址)发布消息的队列,无论channel绑定哪个队列,最终发布消息的队列都有该字串指定
- mandatory:当设置为true时,如果交换机无法通过自身类型以及路由键找到队列时,会调用Basic.Return将详细返回给生产者。当为false时,出现上述情况,直接丢弃。
- immediate:当设置为true时,如果发现队列没有绑定消费者时,会调用Basic.Return将详细返回给生产者。RabbitMQ 3.0以后版本不再支持。
生产者需要通过Channel.addReturnListener()添加ReturnListener监听器,来处理返回的消息。当然可以设置备份交换机(Alternate Exchange),当消息无法路由时,通过备份交换机存储,而不返回客户端。 - props:消息的配置属性,例如 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化。
- body:消息数据本体,必须是byte数组
消费者消费消息
String basicConsume(String queue,
boolean autoAck,
String consumerTag,
boolean noLocal,
boolean exclusive,
Map<String, Object> arguments,
Consumer callback) throws IOException;
- queue:队列名称
- autoAck:是否自动确认消息,true自动确认,false 不自动要手动调用basicAck()方法实现消息确认,实际工作中设置为false
- consumerTag:消费者标签,用来区分多个消费者
- noLocal:设置为true表示不能将同一个Connection中生产者发送的消息发送给这个Connection中的消费者
- exclusive:是否排他
- arguments:其他参数
- callback:设置消费者回调函数,消费者DefaultConsumer建立使用,重写其中的方法
Direct Exchange
- 直连交换机,可以通过生产者发送消息Routing Key和通道声明的Queue的Name完全匹配,也可以通过通道的
channel.queueBind(queueName, exchangeName, routingKey);
方法将Routing Key、Queue Name和Exchange进行绑定,然后生产者发送消息的Routing key与Exchange、Queue绑定的Routing key相等。完全匹配只能使用RabbitMQ自带的Exchange:default Exchange(控制台可以看到的AMQP Exchange:default )实现,所以不需要将Exchange进行任何绑定(binging)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。即指定exchange为""且Routing Key=Queue Name
。默认交换机(default exchange)实际上是一个由消息代理预先声明好的,没有名字(名字为空字符串)的直连交换机(direct exchange)。可以认为默认交换机就是一个特殊的直连交换机。如下图。
- 完全匹配的代码演示可以看入门代码示例,下面的代码示例是通过通道的
channel.queueBind(queueName, exchangeName, routingKey);
方法将Routing Key、Queue Name和Exchange进行绑定,然后生产者发送消息的Routing key与Exchange、Queue绑定的Routing key相等的代码示例。
ps:同一个Routing Key可以绑定到不同的Queue Name上去,在这种情况下,直连交换机将会和扇形交换机有着相同的行为,将消息推送到所有绑定的Queue Name上
代码示例
生产者
package com.bfxy.rabbitmq.api.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender4DirectExchange {
public static void main(String[] args) throws Exception {
Connection connection = null;
Channel channel = null;
try {
// 1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.218.21");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2 创建Connection
connection = connectionFactory.newConnection();
// 3 创建Channel
channel = connection.createChannel();
// 4 声明交换机
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 5 消息发送
String routingKey = "test_direct_routingKey";
String msg = "Hello World RabbitMQ 4 Direct Exchange Message ... ";
channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
消费者
package com.bfxy.rabbitmq.api.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver4DirectExchange {
public static void main(String[] args) throws Exception {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.218.21");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
connection = connectionFactory.newConnection();
channel = connection.createChannel();
// 声明交换机
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 声明队列
String queueName = "test_direct_queue";
channel.queueDeclare(queueName, false, false, false, null);
// 通过路由键将交换器和队列绑定起来
String routingKey = "test_direct_routingKey";
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
// 循环获取消息
while (true) {
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
Topic Exchange
-
主题交换机,topic类型的Exchange会根据通配符对Routing key进行匹配,生产者发送消息的Routing key匹配Exchange、Queue绑定的Routing key规则,就会被路由到对应的Queue上。通配符的匹配规则如下:
-
Routing key必须是一串字符串,每个单词用“.”分隔;
-
符号#表示一个或多个单词
-
符号*表示一个单词
-
例如:
abc.#
能够匹配到abc.def.ghi
,但是abc.*
只会匹配到abc.def
-
代码示例
生产者
package com.bfxy.rabbitmq.api.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender4TopicExchange {
public static void main(String[] args) throws Exception {
Connection connection = null;
Channel channel = null;
try {
// 1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.218.21");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2 创建Connection
connection = connectionFactory.newConnection();
// 3 创建Channel
channel = connection.createChannel();
// 4 声明交换机
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 5 消息发送
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
消费者
package com.bfxy.rabbitmq.api.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver4TopicExchange {
public static void main(String[] args) throws Exception {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.218.21");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
connection = connectionFactory.newConnection();
channel = connection.createChannel();
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
String routingKey = "user.*"; // 匹配一个词,故能接收到两条消息
// String routingKey = "user.#"; // 匹配一个或多个词,故能接收到三条消息
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
System.err.println("consumer2 start... ");
// 循环获取消息
while (true) {
// 获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg + ", RoutingKey: " + delivery.getEnvelope().getRoutingKey());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
Fanout Exchange
-
扇形交换机,采用广播模式,一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。
-
不设置和处理路由键,设置了也不生效,只需要简单的将队列绑定到交换机上
-
Fanout交换机转发消息是最快的
代码示例
生产者
package com.bfxy.rabbitmq.api.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender4FanoutExchange {
public static void main(String[] args) throws Exception {
Connection connection = null;
Channel channel = null;
try {
// 1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.218.21");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2 创建Connection
connection = connectionFactory.newConnection();
// 3 创建Channel
channel = connection.createChannel();
// 4 声明交换机
String exchangeName = "test_fanout_exchange_333";
String exchangeType = "fanout";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 5 消息发送
String routingKey = ""; // 不设置路由键,设置了也不生效也是转发到与该交换机绑定的所有队列上
for (int i = 0; i < 10; i ++) {
String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
消费者
package com.bfxy.rabbitmq.api.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver4FanoutExchange {
public static void main(String[] args) throws Exception {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.218.21");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
connection = connectionFactory.newConnection();
channel = connection.createChannel();
String exchangeName = "test_fanout_exchange_333";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
String routingKey = "";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
// 循环获取消息
while (true) {
// 获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
Header Exchange
-
头交换机,不处理路由键。而是根据发送的消息内容中的header键值对和队列绑定交换机的header键值对进行匹配。
-
队列绑定交换机的header键值对里有一个特殊值”x-match”,它有两个值,all和any
-
all:默认值。一个发送消息的header里的键值对和交换机的header键值对全部匹配,才可以路由到对应交换机
-
any:一个发送消息的header里的键值对和交换机的header键值对任意一个匹配,就可以路由到对应交换机
-
-
header属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。可以是整型和哈希值,而fanout,direct,topic 的路由键都需要字符串形式的。
-
缺点:headers类型的交换机性能会很差
代码示例
生产者
package com.bfxy.rabbitmq.api.exchange.header;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class Sender4HeaderExchange {
public static void main(String[] args) throws Exception {
Connection connection = null;
Channel channel = null;
try {
// 1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.218.21");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2 创建Connection
connection = connectionFactory.newConnection();
// 3 创建Channel
channel = connection.createChannel();
// 4 声明交换机
String exchangeName = "test_headers_exchange";
String exchangeType = "headers";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
Map<String,Object> headers = new HashMap<>();
headers.put("format", "pdf");
headers.put("type", "report");
// 5 生成发送消息的属性
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.headers(headers)
.build();
// 6 发送消息
String message = "headers-" + System.currentTimeMillis();
channel.basicPublish(exchangeName, "", props, message.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
消费者
package com.bfxy.rabbitmq.api.exchange.header;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.util.HashMap;
import java.util.Map;
public class Receiver4HeaderExchange1 {
public static void main(String[] args) throws Exception {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.218.21");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
connection = connectionFactory.newConnection();
channel = connection.createChannel();
// 声明交换机
String exchangeName = "test_headers_exchange";
String exchangeType = "headers";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 声明一个临时队列
String queueName = "test_headers_queue";
channel.queueDeclare(queueName, false, false, false, null);
// 将队列绑定到指定交换机上
Map<String,Object> headers = new HashMap<>();
headers.put("format", "pdf");
headers.put("type", "aaa");
headers.put("x-match", "any");
channel.queueBind(queueName, exchangeName, "", headers);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
// 循环获取消息
while (true) {
//获取消息,如果没有消息,这一步将会一直阻塞
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
提问:生产者创建队列和交换机还是消费者创建队列和交换机
官网解释
http://gigi.nullneuron.net/gigilabs/rabbitmq-who-creates-the-queues-and-exchanges/
个人理解翻译
RABBITMQ:谁创建队列和交换机
消息传递是任何分布式体系结构的基本组成部分。它允许发布者向任何数量的消费者发送消息,而不必了解他们。这对于真正的异步和解耦通信非常有用。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FKOsYWlC-1684741046251)(.\RabbitMQ\rabbitmq-basic-setup.png)]
上图显示了使用RabbitMQ时看到的一个非常基本但典型的架构。发布者将消息发布到交换机。交换机处理将消息路由到绑定到它的队列的逻辑。例如,如果它是扇型交换机,那么将复制相同消息的副本并将其放置在每个队列上。然后,消费者可以从队列中读取消息并处理它们。
要使此架构工作,一个重要的假设是,当发布者和消费者运行时,所有这些RabbitMQ基础设施(即队列、交换和绑定)都必须已经存在。发布者将无法发布到不存在的交换机,消费者也无法从不存在的队列中接收消息。
因此,在开始发送和接收消息之前,让(发布者和消费者)/(发布者或消费者)创建所需的队列、交换和绑定并不是不合理的。让我们来看看如何做到这一点,以及每种方式的含义。
1、职责分工
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vVpPQ3qp-1684741046251)(.\RabbitMQ\rabbitmq-consumer-creates-queues.png)]
为了使发布者和消费者彼此完全分离,理想情况下,发布者应该只知道交换机(而不是队列),消费者应该只知道队列(而不是交换机)。绑定是交换机和队列之间的粘合剂。
一种可能的方法是让发布者处理交换机的创建,而消费者创建他们需要的队列并将其绑定到交换。这具有解耦的优点:当需要新的队列时,需要它们的消费者只需根据需要创建和绑定它们,而发布者不需要了解它们。但它并不是完全解耦的,因为消费者必须了解交易机才能绑定到交易机。
另一方面,丢失信息的风险非常大。如果在任何消费者运行之前部署了发布者,那么交换机将没有绑定,发布到它的任何消息都将丢失。这是否可接受取决于应用程序。
2、发布者创建一切
发布者可以配置为在运行时立即创建所有必要的基础设施(交换机、队列和绑定)。这样做的优点是不会丢失任何消息(因为队列将绑定到交换机,而不需要任何消费者先运行)。
然而,这意味着发布者必须知道将绑定到交换的所有队列,这不是一种非常解耦的方法。每次添加新队列时,必须重新配置和重新部署发布者以创建并绑定它。
3、消费者创建一切
相反的方法是让消费者在运行时立即创建所需的交换机、队列和绑定。与前面的方法一样,这引入了耦合,因为消费者必须知道他们的队列绑定到的交换机。交换机中的任何更改(例如重命名)都意味着所有消费者都必须重新配置和重新部署。当存在大量队列和消费者时,这种复杂性可能令人望而却步。
4、两者都不创造任何东西
一个完全不同的选择是,发布者和消费者都不创建任何所需的基础设施。相反,它是使用管理插件的用户界面或管理CLI预先创建的。这有几个优点:
-
发布者和消费者可以真正脱钩。发布者只知道交换机,而消费者只知道队列。
-
可以作为部署规划的一部分,编写脚本和自动化。
-
可以添加任何更改(例如:新队列),而无需接触任何现有的、已部署的发布者和消费者。
总结
异步消息传递是在分布式体系结构中解耦服务的一种很好的方式,但为了保持它们的解耦,需要一种有效的策略来维护底层消息传递结构(在RabbitMQ的情况下,这些结构就是队列、交换和绑定)。
虽然发布者和消费者服务可能自己负责创建他们所需的内容,但在初始消息丢失、耦合和操作维护(在配置和部署方面)方面可能会付出沉重的代价。
最好的方法可能是处理消息传递系统的配置:在应用程序外部编写脚本。这确保了服务保持解耦,并且排队系统可以根据需要动态变化,而不必影响大量现有服务。
Rabbitmq核心API-其他关键概念讲解
Binding - 绑定
- Exchange和Exchange之间、Exchange和Queue之间的连接关系
- Binding中可以包含RoutingKey或者参数
Queue - 消息队列
- 消息队列,实际存储消息数据
- Durability:是否持久化,Durable:是,Transient:否
- Auto delete:如选yes,代表当最后一个consume监听被移除之后,该Queue会自动删除
Message - 消息
-
服务器和应用程序之间传输的数据
-
本质上就是一段数据,由Properties和Payload(Body)组成
-
常用属性:
- delivery mode:消息持久化类型,1代表非持久化,2代表持久化,性能影响巨大
- headers:键值对,用户自定义的任意键值对
-
其他属性
- content_type:消息体的MIME类型
- content-encoding:消息的编码类型,如是否压缩
- message-id:消息的唯一性标识,由应用设置,防止消息重复消费
- correlation-id:一般用做关联消息的message-id,常用于消息的响应
- timestamp:消息的创建时间,整形,精确到秒
- expiration:消息的过期时间,字符串,但是呈现形式为整形,精确到秒
- app-id:应用程序类型和版本号
- user-id:标识已登录用户,极少使用
- type:消息类型名称,完全由应用决定如何使用该字段
- reply-to:构建回复消息的私有响应队列
- priority:指定队列中消息的优先级
- cluster_id:集群id
什么是MIME类型
根据百度百科的解释:MIME:全称Multipurpose Internet Mail Extensions,多功能Internet邮件扩充服务。它是一种多用途网际邮件扩充协议,在1992年最早应用于电子邮件系统,但后来也应用到浏览器。MIME类型就是设定某种扩展名的文件用一种应用程序来打开的方式类型,当该扩展名文件被访问的时候,浏览器会自动使用指定应用程序来打开。多用于指定一些客户端自定义的文件名,以及一些媒体文件打开方式。
说白了也就是文件的媒体类型。浏览器可以根据它来区分文件,然后决定什么内容用什么形式来显示。Response对象通过设置ContentType使客户端浏览器,区分不同种类的数据,并根据不同的MIME调用浏览器内不同的程序嵌入模块来处理相应的数据。
MIME类型格式:类别/子类别;参数
Content-Type: [type]/[subtype]; parameter
MIME主类别:
text:用于标准化地表示的文本信息,文本消息可以是多种字符集和或者多种格式的;
Multipart:用于连接消息体的多个部分构成一个消息,这些部分可以是不同类型的数据;
Application:用于传输应用程序数据或者二进制数据;
Message:用于包装一个E-mail消息;
Image:用于传输静态图片数据;
Audio:用于传输音频或者音声数据;
Video:用于传输动态影像数据,可以是与音频编辑在一起的视频数据格式。
部分文件的MIMEType
序号 | 内容类型 | 文件类型 | 说明 |
---|---|---|---|
1 | application/msword | doc dot | Microsoft Word2003 |
2 | application/vnd.ms-excel | xls | Microsoft Excel2003 |
4 | application/vnd.ms-powerpoint | ppt | Microsoft Powerpoint |
5 | application/pdf | Adobe Acrobat | |
6 | application/octet-stream | bin exe so dll class | 可执行程序 |
6 | application/zip | zip | winzip |
7 | application/x-gzip | gz | gzip |
8 | image/gif | gif | GIF图像 |
9 | image/jpeg | jpg jpeg jpe | JPEG图像 |
10 | image/png | png | PNG图像 |
11 | text/html | html jsp | 网页文档 |
12 | text/plain | txt | 文本文档 |
13 | text/xml | xml | XML文档 |
14 | text/json | json | JSON字符串 |
Virtual Host - 虚拟主机
- 虚拟地址,用于进行逻辑隔离,最上层的消息路由
- 一个Virtual Host里面可以有若干个Exchange和Queue
- 同一个Virtual Host里面不能有相同名称的Exchange或Queue
Rabbitmq高级特性-生产端可靠性投递与消费端幂等性
RabbitMQ的ACK机制
1、什么是消息确认ACK。
答:如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ支持消息确定-ACK。同理,RabbitMQ也有生产者ACK应答,告诉RabbitMQ消息已发送。channel.basicConsume(queueName, true, consumer);
方法的第二个参数自动ACK的意思就是在consumer收到消息后,可能消息还没处理完,自动回给RabbitMQ消息确定-ACK。
手动AC是正常的生产或者实际开发常用的方式,只有消息处理成功,才会手动调用消息确定-ACK方法,失败返回ACK,确保消息不丢失。
2、RabbitMQ的ACK的消息确认机制。
ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,MQ收到反馈后才将此消息从队列中删除。消息的ACK确认机制默认是打开的。
如果一个消费者在处理消息出现了网络不稳、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列。
如果在集群的情况下,RabbitMQ会立即将这个消息推送给这个在线的其他消费者。这种机制保证了在消费者服务端故障的时候,不丢失任何消息和任务。
消息永远不会从RabbitMQ中删除,只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
3、ACK机制的开发注意事项?
如果消费者发生异常,ack没法发送消息应答。Message会一直重新重发。然后RabbitMQ会占用越来越多的内存,由于RabbitMQ会长时间运行,因此这个"内存泄漏"是致命的。
4、怎么解决ACK的内存泄漏问题?
(1)在程序处理中可以进行异常捕获,保证消费者的程序正常执行。
(2)使用RabbitMQ的ack的配置确认机制。(开启重试次数)
(3)手动设置消息应答。如果消费端异常,也返回应答成功,再把未消费成功的数据记录下来,进行补偿。
生产端如何保障消息100%的投递成功?
RabbitMQ在未接收到消费端的ACK后,仍然会将未ACK的消息再发给消费端直至收到消费端的ACK,故做消息的可靠性投递只需要做生产端与MQ之间的消息传输即可,消费端的可靠性投递大多数MQ都已经内部实现。
什么是生产端的可靠性投递?
- 保障消息的成功发出
- 保障MQ节点的成功接收
- 发送端收到MQ节点(Broker)确认应答
- 未完善的消息进行补偿机制
举例无法满足生产端的可靠性投递的情况
情况一:当Broker繁忙时返回的ack是false,false可能不是因为Broker磁盘满了,就是因为Broker处理不过来了,此时需要稍后再推送该条消息
情况二:当消息发送到Broker后,Broker也存储了。Broker返回confirm的ack状态,在返回confirm的ack状态时由于网络原因,producer的confirm Listener没有收到ack应答,此时这条消息就没发确认了。
以上举例的两种情况就没法满足生产端消息的可靠性投递的需求了。
BAT/TMD互联网大厂的解决方案-消息落库,对消息状态进行打标
先看蓝色框和红色框的部分,RabbitMQ天然支持的消息确认模式:当sender发送一条消息send message到MQ Broker后,MQ Broker会返回一个broker confirm的ack应答,MQ Broker里的producer的confirm Listener会监听这个ack应答,判断这个应答是否成功,成功则OK,失败则可以作其他的补偿处理等。
需要保证业务数据和消息是强一致性,或者是原子性的。比如:订单创建成功和发送消息给下游系统保证强一致性,对应图中BIZ DB和MSG DB两个数据库,可以通过事务保证,如果step1成功了,此时MSG DB里有一个字段status=0;step2再去发送消息,broker收到message,step3返回ack应答,如果ack应答为成功,再去step4更新下MSG DB里有一个字段status=1。但是当出上述情况可能MSG DB里status一直是0。此时,需要作一些补偿,比如当我们发送消息入库后,以此时间为准,后超过5分钟MSG DB里status还是一直是0。可以通过step5定时任务把这些消息抓取出来,然后做step6做消息的重新投递。假设重新投递好几次,status还是为0,此时可以做step7:最大重新投递次数Retry Count为3,当Retry Count大于3就认为该条消息失败,把status修改为2代表最终消息发送失败,待回滚。回滚即BIZ DB和MSG DB两张表数据作一次回滚,不能删除,原因见下极端情况。
存在一种极端的情况:恰巧发送第四次消息的时候,broker收到消息了,但是ack时没收到,此时,判断Retry Count大于3即回滚,这条订单数据被回滚了。 但是这条消息已经发送到下游系统,被下游系统消费处理成功了。所以即使是失败的消息也不能删除需要做备份。然后BIZ DB的数据需要和下游系统的数据做一致性校验对比。比如下游系统处理完成已经发货了,一般这样就需要把该条订单消息置为可用。具体根据业务制定。
需要知道的是,任何的设计都不能保障这个真正的100%可靠性投递,总会存在极端情况。这里也可以将最大重新投递次数Retry Count设置为5、8、10等。大大减少这种情况的发生。剩下的可以使用人工的方式进行解决。很多时候特别微小概率的事如果做的比较复杂就会增加系统架构的复杂度。
另外,BIZ DB和MSG DB两个数据库必须要求同源,才能保证一个事务,还有在高并发情况下,不会加事务。此时可以将BIZ DB和MSG DB两个数据库作为一个数据库,就是BIZ DB数据库,status字段直接加在订单表中,对一张表操作就不用事务了。
消费端 - 幂等性保障
幂等性概念
我们可以借鉴数据库的乐观锁机制,比如我们执行一条更新库存的SQL语句:update t_reps set count = count - 1, version = version + 1 where version = 1
,即仅当version = 1时才执行该条更新操作,当并发条件下即使执行多次该条语句,也只有一次命中数据并修改成功,这就是幂等性
生产端和消费端重复消费问题
生产端发送消息成功,但是在MQ Broker返回的ACK时由于网络等原因失败,按照上面的流程导致消息记录表的状态一直是0,就会导致消息的重复发。目前大多数MQ如RabbitMQ无法做到幂等,kafka可以保证消息仅有一次投递的机制但是对性能损耗较大。
消费端也可能在返回ACK作消息应答给MQ Broker时异常。Message会一直重新分发。然后RabbitMQ会占用越来越多的内存,由于RabbitMQ会长时间运行,因此这个"内存泄漏"是致命的。
在海量订单产生的业务高蜂期,如何避免消息的重复消费问题?
- 消费端实现幂等性,就意味着,我们的消息永远不会消费多次,即使我们收到多条一样的消息
业界主流的幂等性操作
-
业务唯一ID(或)指纹码机制,利用数据库主键去重
-
整个思路就是首先我们需要根据消息生成一个全局唯一的ID,然后还需要加上一个指纹码。这个指纹码它并不一定是系统去生成的,而是一些外部的规则或者内部的业务规则去拼接,它的目的就是为了保障这次操作是绝对唯一的。
将唯一ID + 指纹码拼接好的值作为数据库主键,就可以进行去重了。即在消费消息前呢,先去数据库查询这条消息的指纹码标识是否存在,没有就执行insert操作,如果有就代表已经被消费了,就不需要管了。
-
例如:select count(1) from t_order where id = 唯一ID(或)指纹码,这条sql的作用可以保证大多数插入相同唯一ID(或)指纹码的数据只有一条插入可以执行,但是肯定存在两次查询都得到0的情况并同时插入,此时就需要通过数据库唯一id的方式实现幂等性了。可以提高性能。这也是很多高并发代码的处理方式,把大概率的事件用极小的性能损耗完成;小概率事件用兜底的方案实现,比如缓存也是这样
-
好处:实现简单
-
坏处:高并发下有数据库写入的性能瓶颈
Rabbitmq高级特性-生产端特性_确认机制和返回机制
Confirm 消息确认机制
Confirm 消息确认机制介绍
- 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答,生产者投递消息后对Broker返回的ACK作监听,过程为异步
- 生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障
Confirm 确认消息流程解析
Confirm Listener监听MQ Broker返回的ACK
Confirm 确认消息实现
- 第一步:在channel开启确认模式:channel.confirmSelect()
- 第二步:在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理
代码示例
生产者
package com.bfxy.rabbitmq.api.confirmlistener;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender4ConfirmListener {
public static void main(String[] args) throws Exception {
Connection connection = null;
Channel channel = null;
try {
// 1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.218.21");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2 创建Connection
connection = connectionFactory.newConnection();
// 3 创建Channel
channel = connection.createChannel();
// 4 声明
String exchangeName = "test_confirmlistener_exchange";
String routingKey1 = "confirm.save";
String exchangeType = "topic";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.confirmSelect(); // 开启确认模式
channel.addConfirmListener(new ConfirmListener() { // 添加监听
// 失败
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("------- error ---------");
}
// 成功
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("------- ok ---------");
}
});
// 5 发送
String msg = "Hello World RabbitMQ 4 Confirm Listener Message ...";
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
} catch (Exception e) {
e.printStackTrace();
} /*finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}*/ // 不要关闭资源,ConfirmListener会监听broker的应答执行回调函数
}
}
消费者
package com.bfxy.rabbitmq.api.confirmlistener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver4ConfirmListener {
public static void main(String[] args) throws Exception {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.218.21");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
connection = connectionFactory.newConnection();
channel = connection.createChannel();
String exchangeName = "test_confirmlistener_exchange";
String exchangeType = "topic";
String queueName = "test_confirmlistener_queue";
String routingKey = "confirm.#";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, false, consumer);
// 循环获取消息
while (true) {
// 获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
// 手工签收消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
Return 消息返回机制
Return 消息返回机制
-
Return Listener用于处理一些不可路由的消息,可以对路由错误的消息作日志记录等处理
-
我们的消息生产者,通过指定一个Exchange和Routingkey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作
-
当某种情况下,我们在发送消息的时候,当前的Exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这个不可达的消息,就要使用Return Listener!
-
在基础的API中有一个关键的配置项:
- Mandatory;如果为true,则监听器会接收到路由不可达的消息,然后进行后续操作,如果是false,那么broker端会自动删除该消息!
Return 消息返回机制流程
代码示例
生产者
package com.bfxy.rabbitmq.api.returnlistener;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ReturnListener;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
public class Sender4ReturnListener {
public static void main(String[] args) throws Exception {
Connection connection = null;
Channel channel = null;
try {
// 1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.218.21");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2 创建Connection
connection = connectionFactory.newConnection();
// 3 创建Channel
channel = connection.createChannel();
// 4 声明
String exchangeName = "test_returnlistener_exchange";
String routingKey1 = "abcd.save";
String routingKey2 = "return.save";
String exchangeType = "topic";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 5 监听
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
BasicProperties properties,
byte[] body)
throws IOException {
System.out.println("**************handleReturn**********");
System.out.println("replyCode: " + replyCode);
System.out.println("replyText: " + replyText);
System.out.println("exchange: " + exchange);
System.out.println("routingKey: " + routingKey);
System.out.println("body: " + new String(body));
}
});
// 6 发送
String msg = "Hello World RabbitMQ 4 Return Listener Message ...";
boolean mandatory = true;
channel.basicPublish(exchangeName, routingKey1 , mandatory, null , msg.getBytes());
// boolean mandatory = false;
// channel.basicPublish(exchangeName, routingKey1 , mandatory, null , msg.getBytes());
// channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
} catch (Exception e) {
e.printStackTrace();
} /*finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}*/ // 不要关闭资源,ReturnListener会监听broker的应答执行回调函数
}
}
消费者
package com.bfxy.rabbitmq.api.returnlistener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver4ReturnListener {
public static void main(String[] args) throws Exception {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.218.21");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
connection = connectionFactory.newConnection();
channel = connection.createChannel();
String exchangeName = "test_returnlistener_exchange";
String exchangeType = "topic";
String queueName = "test_returnlistener_queue";
String routingKey = "return.#";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer consumer = new QueueingConsumer(channel);
//参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
//循环获取消息
while (true) {
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
Rabbitmq高级特性-消费端特性_流控服务和ACK重回队列
消费端限流
消费端限流介绍
- 高并发情况下,队列里面一瞬间就就积累了上万条数据,但是消费者无法同时处理这么多请求,这个时候当我们打开客户端,瞬间就有巨量的信息给推送过来、但是客户端是没有办法同时处理这么多数据的,结果就是消费者(客户端)挂掉了
- RabbitMQ提供一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置的qos的值)未被确认之前,不进行拉取新的消息到消费端
- 具体方法:void basicQos(int prefetchSize, int prefetchCount, boolean global);
- 参数解释
- prefetchSize:消息本身的大小 如果设置为0 那么表示对消息本身的大小不限制
- prefetchCount:告诉rabbitmq不要一次性给消费者推送大于N个消息,即一旦有N个消息未ack,则该consumer将组层阻塞掉,直到有消息ack
- global:是否将上面的设置应用于整个通道
- false:表示只应用于当前消费者
- true:表示当前通道的所有消费者都应用这个限流策略
ps:prefetchSize和global这两项,RabbitMQ没有实现,暂且不研究。特别注意一点,prefetchCount在 autoAck=false 的情况下才生效,即在自动应答的情况下该值无效
代码示例
生产者
package com.bfxy.rabbitmq.api.limit;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = null;
Channel channel = null;
try {
// 1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.218.21");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2 创建Connection
connection = connectionFactory.newConnection();
// 3 创建Channel
channel = connection.createChannel();
// 4 声明
String queueName = "test001";
channel.queueDeclare(queueName, true, false, false, null);
Map<String, Object> headers = new HashMap<>();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers).build();
// 5 发送
for (int i = 0; i < 5; i++) {
String msg = "Hello World RabbitMQ " + i;
channel.basicPublish("", queueName , props , msg.getBytes());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
消费者
package com.bfxy.rabbitmq.api.limit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver {
public static void main(String[] args) throws Exception {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.218.21");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
connection = connectionFactory.newConnection();
channel = connection.createChannel();
String queueName = "test001";
channel.queueDeclare(queueName, true, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 消费端限流设置
channel.basicQos(0, 1, false);
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, false, consumer);
// 循环获取消息
while (true) {
// 获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
消费端ACK与重回队列
消费端的手工ACK和NACK
手工ACK
-
具体方法:void basicAck(long deliveryTag, boolean multiple);
-
调用这个方法就会主动回送给Broker一个应答,表示这条消息我处理完了,你可以给我下一条了。
-
参数解释:
- deliveryTag:消息的唯一标识ID,是一个正整数,是mq来自增设置的,不用在消息手动设置在消息中
- multiple:是否应用于多消息
-
使用场景:如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功。一般工作中都会使用手工ACK。
NACK
-
具体方法:void basicNack(long deliveryTag, boolean multiple, boolean requeue);
-
表示消息处理失败,如果设置了重回队列,Broker端就会将没有成功处理的消息重新发送。
-
参数解释:
- deliveryTag:消息的唯一标识ID,是一个正整数,是mq自增设置的,不用在消息发送时手动设置在消息中
- multiple:是否应用于多消息
- requeue:是否重新放回队列,否则丢弃或者进入死信队列
-
使用场景:消费端进行消费的时候,如果由于业务异常,我们可以进行日志的记录,然后进行补偿,尽量不要重回队列,这个可以在NACK里做。
重回队列
- 消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker
- 不推荐没有处理成功的消息重回队列,因为如果这样这条消息一直处理不成功,就会持续占用内存,推荐直接使用NACK日志记录即可
- 重回队列都是重回到队列的尾部
- 一般我们实际应用中,都会关闭重回队列,也就是设置为false
代码示例
生产者
package com.bfxy.rabbitmq.api.requeue;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = null;
Channel channel = null;
try {
// 1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.218.21");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2 创建Connection
connection = connectionFactory.newConnection();
// 3 创建Channel
channel = connection.createChannel();
// 4 声明
String queueName = "test001";
channel.queueDeclare(queueName, true, false, false, null);
// 5 发送
for (int i = 0; i < 5; i++) {
String msg = "Hello World RabbitMQ " + i;
Map<String, Object> headers = new HashMap<>();
headers.put("flag", i);
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers).build();
channel.basicPublish("", queueName, props, msg.getBytes());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
消费者
package com.bfxy.rabbitmq.api.requeue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver {
public static void main(String[] args) throws Exception {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.218.21");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
connection = connectionFactory.newConnection();
channel = connection.createChannel();
String queueName = "test001";
channel.queueDeclare(queueName, true, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, false, consumer);
// 循环获取消息
while (true) {
// 获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
Thread.sleep(1000);
// 获取发送消息中headers里flag为0的一条消息数据,模拟异常消费的情况
if ((Integer)delivery.getProperties().getHeaders().get("flag") == 0) {
// throw new RuntimeException("异常"); // 报异常,可以添加日志记录
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
} else {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
Rabbitmq高级特性-TTL消息与死信队列详解
TTL消息/队列
TTL
- TTL是Time To Live的缩写,也就是生存时间
- 设置消息或者队列(即该队列全部消息)的生存时间为多少时间,如果超过这个时间没有消费者消费,这条消息或队列的全部消息删除,后续消费者不能消费删除掉的消息
- RabbitMQ支持消息的过期时间,在消息发送时可以进行指定
- RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过队列的超时时间配置,那么消息会自动的清除
死信队列
死信队列:DLX,Dead-Letter-Exchange
-
利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX
-
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性
-
当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而路由到设置的Exchange的一个队列上
-
可以监听这个Exchange中消息做相应的处理,比如做补偿,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能
-
死信队列设置:
- 首先需要设置死信队列的exchange和queue,然后进行绑定:
- Exchange:dlx.exchange
- Queue:dlx.queue
- RoutingKey:#
- 首先需要设置死信队列的exchange和queue,然后进行绑定:
-
然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列加上一个参数即可(即给该队列添加一个死信队列配置):arguments.put(“x-dead-letter-exchange”,“dlx.exchange”)
-
这样消息在过期、requeue、队列在达到最大长度时,消息就可以直接路由到死信队列
消息变成死信有以下几种情况
- 消息被拒绝(basic.reject/basic.nack)并且requeue=false(不重回队列)
- 消息TTL过期
- 队列达到最大长度,比如达到消息大小或者个数
代码示例
生产端
package com.bfxy.rabbitmq.api.dlx;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Sender4DLXExchange {
public static void main(String[] args) throws Exception {
Connection connection = null;
Channel channel = null;
try {
// 1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.218.21");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2 创建Connection
connection = connectionFactory.newConnection();
// 3 创建Channel
channel = connection.createChannel();
// 4 声明
String exchangeName = "test_dlx_exchange";
String routingKey = "group.bfxy";
String exchangeType = "topic";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 5 发送
Map<String, Object> headers = new HashMap<>();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
// TTL
.expiration("6000")
.headers(headers).build();
String msg = "Hello World RabbitMQ 4 DLX Exchange Message ... ";
channel.basicPublish(exchangeName, routingKey , props , msg.getBytes());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
消费端
package com.bfxy.rabbitmq.api.dlx;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Receiver4DLXtExchange {
public static void main(String[] args) throws Exception {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("192.168.218.21");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
connection = connectionFactory.newConnection();
channel = connection.createChannel();
// 声明正常的 exchange queue 路由规则
String queueName = "test_dlx_queue";
String exchangeName = "test_dlx_exchange";
String exchangeType = "topic";
String routingKey = "group.*";
// 声明 exchange
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 注意在这里要加一个特殊的属性arguments: x-dead-letter-exchange
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "dlx.exchange");
// arguments.put("x-dead-letter-routing-key", "dlx.*");
// arguments.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, false, false, false, arguments);
channel.queueBind(queueName, exchangeName, routingKey);
// dlx declare:
channel.exchangeDeclare("dlx.exchange", exchangeType, true, false, false, null);
channel.queueDeclare("dlx.queue", false, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
QueueingConsumer consumer = new QueueingConsumer(channel);
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, true, consumer);
// 循环获取消息
while (true) {
// 获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到消息:" + msg);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
ps:在消费端设置了死信队列配置,故需要先启动消费端,然后停止消费端,启动生产端,等6s,就可以看到生产的消费因为超过过期时间未消费进入死信队列。
RabbitMQ与SpringBoot2.X整合
RabbitMQ整合应用 - SpringBoot2.X步骤
-
引入maven依赖
<!-- springboot rabbitmq(amqp) --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
application.properties配置
-
生产端核心配置
## 是否启用消息确认模式 spring.rabbitmq.publisher-confirms=true ## 设置return消息模式,注意要和mandatory一起去配合使用 spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true
-
消费端核心配置
## 表示消费者消费成功消息以后需要手工的进行签收(ack),默认为auto代表自动,manual代表手动签收 spring.rabbitmq.listener.simple.acknowledge-mode=manual ## 并行数量 spring.rabbitmq.listener.simple.concurrency=1 spring.rabbitmq.listener.simple.max-concurrency=5
-
生产端配置文件
server.servlet.context-path=/
server.port=8001
spring.rabbitmq.addresses=192.168.218.21:5672,192.168.218.22:5672,192.168.218.23:5673
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
## 是否启用消息确认模式
spring.rabbitmq.publisher-confirms=true
## 设置return消息模式,注意要和mandatory一起去配合使用
##spring.rabbitmq.publisher-returns=true
##spring.rabbitmq.template.mandatory=true
spring.application.name=rabbit-producer
spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL
生产者
package com.bfxy.rabbitmq.producer.component;
import java.util.Map;
import java.util.UUID;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class RabbitSender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 这里就是确认消息的回调监听接口,用于确认消息是否被broker所收到
*/
final ConfirmCallback confirmCallback = new ConfirmCallback() {
/**
* @param correlationData 作为一个唯一的标识
* @param ack broker 是否落盘成功
* @param cause 失败的一些异常信息
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
assert correlationData != null;
System.err.println("消息ACK结果:" + ack + ", correlationData: " + correlationData.getId());
}
};
/**
* 对外发送消息的方法
* @param message 具体的消息内容
* @param properties 额外的附加属性
* @throws Exception
*/
public void send(Object message, Map<String, Object> properties) throws Exception {
MessageHeaders mhs = new MessageHeaders(properties);
Message<?> msg = MessageBuilder.createMessage(message, mhs);
rabbitTemplate.setConfirmCallback(confirmCallback);
// 指定业务唯一的id,做唯一标识
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 消息发送完毕的回调函数
MessagePostProcessor mpp = new MessagePostProcessor() {
@Override
public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message)
throws AmqpException {
System.err.println("---> post to do: " + message);
return message;
}
};
rabbitTemplate.convertAndSend("exchange-1", "springboot.rabbit", msg, mpp, correlationData);
}
}
测试发送方法
package com.bfxy.rabbitmq.producer.test;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.bfxy.rabbitmq.producer.component.RabbitSender;
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
@Autowired
private RabbitSender reRabbitSender;
@Test
public void testSender() throws Exception {
Map<String, Object> properties = new HashMap<>();
properties.put("attr1", "12345");
properties.put("attr2", "abcde");
reRabbitSender.send("hello rabbitmq!", properties);
Thread.sleep(10000);
}
}
消费端配置文件
server.servlet.context-path=/
server.port=8002
spring.rabbitmq.addresses=192.168.218.21:5672,192.168.218.22:5672,192.168.218.23:5673
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
## 表示消费者消费成功消息以后需要手工的进行签收(ack),默认为auto
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.simple.prefetch=1
## 最好不要在代码里写死配置信息,尽量使用这种方式也就是配置文件的方式
## 在代码里使用 ${} 方式进行设置配置: ${spring.rabbitmq.listener.order.exchange.name}
spring.rabbitmq.listener.order.exchange.name=exchange-1
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.key=springboot.*
spring.rabbitmq.listener.order.queue.name=queue-1
spring.rabbitmq.listener.order.queue.durable=true
spring.application.name=rabbit-producer
spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL
消费者
package com.bfxy.rabbitmq.consumer.component;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import java.util.Objects;
@Component
public class RabbitReceive {
/**
* @RabbitListener @QueueBinding @Queue @Exchange 这些注解组合使用监听队列
* @param message
* @param channel
* @throws Exception
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", durable = "${spring.rabbitmq.listener.order.queue.durable}"),
exchange = @Exchange(name = "${spring.rabbitmq.listener.order.exchange.name}",
durable = "${spring.rabbitmq.listener.order.exchange.durable}",
type = "${spring.rabbitmq.listener.order.exchange.type}",
ignoreDeclarationExceptions = "true"),
key = "${spring.rabbitmq.listener.order.exchange.key}"
)
)
@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception {
// 1. 收到消息以后进行业务端消费处理
System.err.println("-----------------------");
System.err.println("消费消息:" + message.getPayload());
// 2. 处理成功之后 获取deliveryTag 并进行手工的ACK操作, 因为我们配置文件里配置的是手工签收:spring.rabbitmq.listener.simple.acknowledge-mode=manual
// 消息的唯一标识ID,是一个正整数,是mq来自增设置的,不用在消息手动设置在消息中
long deliveryTag = Long.parseLong(Objects.requireNonNull(message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)).toString());
channel.basicAck(deliveryTag, false); // RabbitMQ在未接收到消费端的ACK后,仍然会将未ACK的消息再发给消费端直至收到消费端的ACK,故做消息的可靠性投递只需要做生产端与MQ之间的消息传输即可,消费端的可靠性投递大多数MQ都已经内部实现
}
}
@RabbitListener注解使用
-
@RabbitListener:消费端监听配置
-
配合使用注解
- @QueueBinding:绑定消息队列、交换机和路由
- @Queue:定义消息队列
- @Exchange:定义交换机
-
示例:
ps:由于类配置写在代码里非常不友好,所以强烈建议使用配置文件配置。而且,一般来说,像Exchange、Queue、RoutingKey、Binding关系等一般工作中都是先在控制台在MQ Broker配置好。
RabbitMQ基础组件封装实战
RabbitMQ基础组件整体功能概述
基础组件实现关键点
- 一线大厂的MQ组件实现思路和架构设计方案
- 基础组件封装设计 - 迅速消息发送
- 基础组件封装设计 - 确认消息发送
- 基础组件封装设计 - 延迟消息发送
基础组件实现功能点
RabbitMQ基础组件模块划分
项目架构
rabbit-parent -- 父目录
rabbit-api -- 基础组件提供给第三方使用通用上层api接口
rabbit-common -- 通用工具、序列化类
rabbit-core-producer -- 对消息的发送类型(比如:迅速、延迟、可靠性)封装
rabbit-task -- 分布式定时任务组件封装,对dangdang网的elastic-job封装,可以放到任何项目中作为一个单独的组件
rabbit-esjob-test -- esjob组件封装测试
rabbit-test -- 可靠性消息测试
父目录pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>foodie-dev</artifactId>
<groupId>com.imooc</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.bfxy.base.rabbit</groupId>
<artifactId>rabbit-parent</artifactId>
<packaging>pom</packaging>
<name>rabbit-parent</name>
<description>rabbit-parent</description>
<modules>
<module>rabbit-api</module>
<module>rabbit-common</module>
<module>rabbit-core-producer</module>
<module>rabbit-task</module>
</modules>
<version>0.0.1-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>8</java.version>
<fasterxml.uuid.version>3.1.4</fasterxml.uuid.version>
<org.codehaus.jackson.version>1.9.13</org.codehaus.jackson.version>
<druid.version>1.0.24</druid.version>
<elastic-job.version>2.1.4</elastic-job.version>
<guava.version>20.0</guava.version>
<commons-lang3.version>3.3.1</commons-lang3.version>
<commons-io.version>2.4</commons-io.version>
<commons-collections.version>3.2.2</commons-collections.version>
<curator.version>2.11.0</curator.version>
<fastjson.version>1.1.26</fastjson.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!--对json格式的支持 -->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>${org.codehaus.jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.uuid</groupId>
<artifactId>java-uuid-generator</artifactId>
<version>${fasterxml.uuid.version}</version>
</dependency>
</dependencies>
</project>
RabbitMQ基础组件API封装
通用API抽象——rabbit-api
统一消息类
package com.bfxy.rabbit.api.model;
import com.bfxy.rabbit.api.enums.MessageTypeEnum;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message implements Serializable {
private static final long serialVersionUID = -1L;
/**
* 消息的唯一ID
*/
private String messageId;
/**
* 消息的主题(即RabbitMQ中的exchange概念)
*/
private String topic;
/**
* 消息的路由规则
*/
private String routingKey = "";
/**
* 消息的附加属性
*/
private Map<String, Object> attributes = new HashMap<>();
/**
* 延迟消息的参数配置
*/
private Integer delayMills;
/**
* 消息类型,默认为CONFIRM类型
*/
private MessageTypeEnum messageType = MessageTypeEnum.CONFIRM;
public Message(String messageId, String topic, String routingKey, Map<String, Object> attributes, Integer delayMills) {
this.messageId = messageId;
this.topic = topic;
this.routingKey = routingKey;
this.attributes = attributes;
this.delayMills = delayMills;
}
}
消息类型枚举类
package com.bfxy.rabbit.api.enums;
public enum MessageTypeEnum {
/**
* 迅速消息:不需要保障消息的可靠性,也不需要做confirm确认
*/
RAPID("RAPID"),
/**
* 确认消息:不需要保障消息的可靠性,但是会做消息的confirm确认
*/
CONFIRM("CONFIRM"),
/**
* 可靠性消息:一定保障消息的100%可靠性投递,不允许有任何消息的丢失
* PS:保证数据库和所发的消息是原子性的(即最终一致的)
*/
RELIANT("RELIANT");
private final String value;
MessageTypeEnum(String value) {
this.value = value;
}
public String getValue() {
return value;
}
}
建造者模式——用于创建消息类
package com.bfxy.rabbit.api.model.builder;
import com.bfxy.rabbit.api.enums.MessageTypeEnum;
import com.bfxy.rabbit.api.exception.MessageRunTimeException;
import com.bfxy.rabbit.api.model.Message;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* $MessageBuilder 设计模式-建造者模式
*/
public class MessageBuilder {
private String messageId;
private String topic;
private String routingKey = "";
private Map<String, Object> attributes = new HashMap<>();
private Integer delayMills;
private MessageTypeEnum messageType = MessageTypeEnum.CONFIRM;
private MessageBuilder() {
}
public static MessageBuilder create() {
return new MessageBuilder();
}
public MessageBuilder withMessageId(String messageId) {
this.messageId = messageId;
return this;
}
public MessageBuilder withTopic(String topic) {
this.topic = topic;
return this;
}
public MessageBuilder withRoutingKey(String routingKey) {
this.routingKey = routingKey;
return this;
}
public MessageBuilder withAttributes(Map<String, Object> attributes) {
this.attributes = attributes;
return this;
}
public MessageBuilder withAttribute(String key, Object value) {
this.attributes.put(key, value);
return this;
}
public MessageBuilder withDelayMills(Integer delayMills) {
this.delayMills = delayMills;
return this;
}
public MessageBuilder withMessageType(MessageTypeEnum messageType) {
this.messageType = messageType;
return this;
}
public Message build() {
if (messageId == null) {
messageId = UUID.randomUUID().toString();
}
// topic为空,不能创建,抛出运行期异常
if (topic == null) {
throw new MessageRunTimeException("this topic is null");
}
Message message = new Message(messageId, topic, routingKey, attributes, delayMills, messageType);
return message;
}
}
基础异常类封装
package com.bfxy.rabbit.api.exception;
public class MessageException extends Exception {
private static final long serialVersionUID = -1L;
public MessageException() {
super();
}
public MessageException(String message) {
super(message);
}
public MessageException(Throwable cause) {
super(cause);
}
public MessageException(String message, Throwable cause) {
super(message, cause);
}
}
package com.bfxy.rabbit.api.exception;
public class MessageRunTimeException extends RuntimeException {
private static final long serialVersionUID = -1L;
public MessageRunTimeException() {
super();
}
public MessageRunTimeException(String message) {
super(message);
}
public MessageRunTimeException(Throwable cause) {
super(cause);
}
public MessageRunTimeException(String message, Throwable cause) {
super(message, cause);
}
}
生产者抽象接口
package com.bfxy.rabbit.api.business;
import com.bfxy.rabbit.api.exception.MessageRunTimeException;
import com.bfxy.rabbit.api.model.Message;
import java.util.List;
/**
* $MessageProducer 生产者发送消息
*/
public interface MessageProducer {
/**
* 消息的发送
* @param message
* @throws MessageRunTimeException
*/
void send(Message message) throws MessageRunTimeException;
/**
* 消息的批量发送
* @param messageList
* @throws MessageRunTimeException
*/
void send(List<Message> messageList) throws MessageRunTimeException;
/**
* 消息的发送,附带SendCallback回调执行响应的业务逻辑处理
* @param message
* @param sendCallback
* @throws MessageRunTimeException
*/
void send(Message message, SendCallback sendCallback) throws MessageRunTimeException;
}
回调函数封装
package com.bfxy.rabbit.api.business;
/**
* $SendCallback 回调函数处理
*/
public interface SendCallback {
void onSuccess();
void onFailure();
}
消费者抽象接口
package com.bfxy.rabbit.api.business;
import com.bfxy.rabbit.api.model.Message;
/**
* $MessageListener 消费者监听消息
*/
public interface MessageListener {
void onMessage(Message message);
}
自动装配与架构接口定义
核心生产者封装(后续肯定作为jar包引入第三方工程)——rabbit-core-producer
pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rabbit-parent</artifactId>
<groupId>com.bfxy.base.rabbit</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rabbit-core-producer</artifactId>
<name>rabbit-core-producer</name>
<dependencies>
<dependency>
<groupId>com.bfxy.base.rabbit</groupId>
<artifactId>rabbit-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
spring的自动装配(\resources\META-INF\spring.factories)
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.bfxy.rabbit.producer.autoconfigure.RabbitProducerAutoConfiguration
package com.bfxy.rabbit.producer.autoconfigure;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ComponentScan;
/**
* $RabbitProducerAutoConfiguration 自动装配
*/
@Configuration
@ComponentScan({"com.bfxy.rabbit.producer.*"})
public class RabbitProducerAutoConfiguration {
}
发送消息的实际实现类
package com.bfxy.rabbit.producer.broker;
import com.bfxy.rabbit.api.business.MessageProducer;
import com.bfxy.rabbit.api.business.SendCallback;
import com.bfxy.rabbit.api.enums.MessageTypeEnum;
import com.bfxy.rabbit.api.exception.MessageRunTimeException;
import com.bfxy.rabbit.api.model.Message;
import com.google.common.base.Preconditions;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
/**
* $ProducerClient 发送消息的实际实现类
*/
@Component
public class ProducerClient implements MessageProducer {
@Resource
private final RabbitBroker rabbitBroker;
public ProducerClient(RabbitBroker rabbitBroker) {
this.rabbitBroker = rabbitBroker;
}
@Override
public void send(Message message) throws MessageRunTimeException {
// 避免不使用MessageBuilder构建的方式(比如直接new Message)导致的topic为空的情况
Preconditions.checkNotNull(message.getTopic());
MessageTypeEnum messageType = message.getMessageType();
switch (messageType) {
case RAPID:
rabbitBroker.rapidSend(message);
break;
case CONFIRM:
rabbitBroker.confirmSend(message);
break;
case RELIANT:
rabbitBroker.reliantSend(message);
break;
default:
break;
}
}
@Override
public void send(List<Message> messageList) throws MessageRunTimeException {
messageList.forEach(message -> {
message.setMessageType(MessageTypeEnum.RAPID);
MessageHolder.add(message);
});
rabbitBroker.sendMessages();
}
@Override
public void send(Message message, SendCallback sendCallback) throws MessageRunTimeException {
// TODO
}
}
发送不同类型消息的核心类
package com.bfxy.rabbit.producer.broker;
import com.bfxy.rabbit.api.model.Message;
/**
* $RabbitBroker 具体发送不同类型消息的接口
*/
public interface RabbitBroker {
void rapidSend(Message message);
void confirmSend(Message message);
void reliantSend(Message message);
void sendMessages();
}
package com.bfxy.rabbit.producer.broker;
import com.bfxy.rabbit.api.enums.MessageTypeEnum;
import com.bfxy.rabbit.api.model.Message;
import com.bfxy.rabbit.producer.constant.BrokerMessageConst;
import com.bfxy.rabbit.producer.enums.BrokerMessageStatusEnum;
import com.bfxy.rabbit.producer.entity.BrokerMessage;
import com.bfxy.rabbit.producer.service.MessageStoreService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
/**
* $RabbitBrokerImpl 真正的发送不同类型的消息实现类
*/
@Component
@Slf4j
public class RabbitBrokerImpl implements RabbitBroker {
// @Resource
// private RabbitTemplate rabbitTemplate;
@Resource
private RabbitTemplateContainer rabbitTemplateContainer;
@Resource
private MessageStoreService messageStoreService;
/**
* 迅速发消息
* @param message
*/
@Override
public void rapidSend(final Message message) {
message.setMessageType(MessageTypeEnum.RAPID);
sendKernel(message);
}
@Override
public void confirmSend(final Message message) {
message.setMessageType(MessageTypeEnum.CONFIRM); // RabbitTemplateContainer里对消息的confirm确认方法进行了封装
sendKernel(message);
}
@Override
public void reliantSend(final Message message) {
message.setMessageType(MessageTypeEnum.RELIANT);
// 判断数据库中该条消息记录是否存在
BrokerMessage brokerMessageDB = messageStoreService.selectByMessageId(message.getMessageId());
if (brokerMessageDB == null) {
// 1.记录数据库消息发送的日志
Date now = new Date();
BrokerMessage brokerMessage = new BrokerMessage();
brokerMessage.setMessageId(message.getMessageId());
brokerMessage.setStatus(BrokerMessageStatusEnum.SENDING.getCode());
// tryCount默认是0,在最开始发送的时候不需要设置
//brokerMessage.setTryCount();
brokerMessage.setNextRetry(DateUtils.addMinutes(now, BrokerMessageConst.TIMEOUT));
brokerMessage.setCreateTime(now);
brokerMessage.setUpdateTime(now);
brokerMessage.setMessage(message);
messageStoreService.insert(brokerMessage);
}
// 2.执行真正的发送消息逻辑
sendKernel(message);
}
@Override
public void sendMessages() {
List<Message> messageList = MessageHolder.clear();
messageList.forEach(message -> {
MessageHolderAsyncQueue.submit(new Runnable() {
@Override
public void run() {
CorrelationData correlationData = new CorrelationData(String.format("%s#%s#%s", message.getMessageId(), System.currentTimeMillis(), message.getMessageType().getValue()));
String topic = message.getTopic();
String routingKey = message.getRoutingKey();
RabbitTemplate rabbitTemplate = rabbitTemplateContainer.getRabbitTemplate(message);
rabbitTemplate.convertAndSend(topic, routingKey, message, correlationData);
log.info("#RabbitBrokerImpl.sendMessages# send to rabbitmq, messageId: {}", message.getMessageId());
}
});
});
}
/**
* 发送消息的核心方法,使用异步线程池发送消息
* @param message
*/
private void sendKernel(Message message) {
AsyncBaseQueue.submit(new Runnable() {
@Override
public void run() {
CorrelationData correlationData = new CorrelationData(String.format("%s#%s#%s", message.getMessageId(), System.currentTimeMillis(), message.getMessageType().getValue()));
String topic = message.getTopic();
String routingKey = message.getRoutingKey();
RabbitTemplate rabbitTemplate = rabbitTemplateContainer.getRabbitTemplate(message);
rabbitTemplate.convertAndSend(topic, routingKey, message, correlationData);
log.info("#RabbitBrokerImpl.sendKernel# send to rabbitmq, messageId: {}", message.getMessageId());
}
});
}
}
发送消息的异步线程池
package com.bfxy.rabbit.producer.broker;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class AsyncBaseQueue {
private static final int THREAD_SIZE = Runtime.getRuntime().availableProcessors();
private static final int QUEUE_SIZE = 10000;
private static ExecutorService senderAsync = new ThreadPoolExecutor(THREAD_SIZE,
THREAD_SIZE,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_SIZE),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("rabbitmq_client_async_sender");
return t;
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.error("async sender is error rejected, runnable: {}, executor: {}", r, executor);
}
});
public static void submit(Runnable runnable) {
senderAsync.submit(runnable);
}
}
公共模块——rabbit-common
pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rabbit-parent</artifactId>
<groupId>com.bfxy.base.rabbit</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rabbit-common</artifactId>
<name>rabbit-common</name>
<dependencies>
<!-- 对外提供的api接口 -->
<dependency>
<artifactId>rabbit-api</artifactId>
<groupId>com.bfxy.base.rabbit</groupId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
</project>
发送迅速异步消息和确认消息实现
rabbit-core-producer
发送消息的实际实现类
package com.bfxy.rabbit.producer.broker;
import com.bfxy.rabbit.api.business.MessageProducer;
import com.bfxy.rabbit.api.business.SendCallback;
import com.bfxy.rabbit.api.enums.MessageTypeEnum;
import com.bfxy.rabbit.api.exception.MessageRunTimeException;
import com.bfxy.rabbit.api.model.Message;
import com.google.common.base.Preconditions;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
/**
* $ProducerClient 发送消息的实际实现类
*/
@Component
public class ProducerClient implements MessageProducer {
@Resource
private final RabbitBroker rabbitBroker;
public ProducerClient(RabbitBroker rabbitBroker) {
this.rabbitBroker = rabbitBroker;
}
@Override
public void send(Message message) throws MessageRunTimeException {
// 避免不使用MessageBuilder构建的方式(比如直接new Message)导致的topic为空的情况
Preconditions.checkNotNull(message.getTopic());
MessageTypeEnum messageType = message.getMessageType();
switch (messageType) {
case RAPID:
rabbitBroker.rapidSend(message);
break;
case CONFIRM:
rabbitBroker.confirmSend(message);
break;
case RELIANT:
rabbitBroker.reliantSend(message);
break;
default:
break;
}
}
@Override
public void send(List<Message> messageList) throws MessageRunTimeException {
messageList.forEach(message -> {
message.setMessageType(MessageTypeEnum.RAPID);
MessageHolder.add(message);
});
rabbitBroker.sendMessages();
}
@Override
public void send(Message message, SendCallback sendCallback) throws MessageRunTimeException {
// TODO
}
}
发送不同类型消息的核心类
package com.bfxy.rabbit.producer.broker;
import com.bfxy.rabbit.api.model.Message;
/**
* $RabbitBroker 具体发送不同类型消息的接口
*/
public interface RabbitBroker {
void rapidSend(Message message);
void confirmSend(Message message);
void reliantSend(Message message);
void sendMessages();
}
package com.bfxy.rabbit.producer.broker;
import com.bfxy.rabbit.api.enums.MessageTypeEnum;
import com.bfxy.rabbit.api.model.Message;
import com.bfxy.rabbit.producer.constant.BrokerMessageConst;
import com.bfxy.rabbit.producer.enums.BrokerMessageStatusEnum;
import com.bfxy.rabbit.producer.entity.BrokerMessage;
import com.bfxy.rabbit.producer.service.MessageStoreService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
/**
* $RabbitBrokerImpl 真正的发送不同类型的消息实现类
*/
@Component
@Slf4j
public class RabbitBrokerImpl implements RabbitBroker {
// @Resource
// private RabbitTemplate rabbitTemplate;
@Resource
private RabbitTemplateContainer rabbitTemplateContainer;
@Resource
private MessageStoreService messageStoreService;
/**
* 迅速发消息
* @param message
*/
@Override
public void rapidSend(final Message message) {
message.setMessageType(MessageTypeEnum.RAPID);
sendKernel(message);
}
@Override
public void confirmSend(final Message message) {
message.setMessageType(MessageTypeEnum.CONFIRM); // RabbitTemplateContainer里对消息的confirm确认方法进行了封装
sendKernel(message);
}
@Override
public void reliantSend(final Message message) {
message.setMessageType(MessageTypeEnum.RELIANT);
// 判断数据库中该条消息记录是否存在
BrokerMessage brokerMessageDB = messageStoreService.selectByMessageId(message.getMessageId());
if (brokerMessageDB == null) {
// 1.记录数据库消息发送的日志
Date now = new Date();
BrokerMessage brokerMessage = new BrokerMessage();
brokerMessage.setMessageId(message.getMessageId());
brokerMessage.setStatus(BrokerMessageStatusEnum.SENDING.getCode());
// tryCount默认是0,在最开始发送的时候不需要设置
//brokerMessage.setTryCount();
brokerMessage.setNextRetry(DateUtils.addMinutes(now, BrokerMessageConst.TIMEOUT));
brokerMessage.setCreateTime(now);
brokerMessage.setUpdateTime(now);
brokerMessage.setMessage(message);
messageStoreService.insert(brokerMessage);
}
// 2.执行真正的发送消息逻辑
sendKernel(message);
}
@Override
public void sendMessages() {
List<Message> messageList = MessageHolder.clear();
messageList.forEach(message -> {
MessageHolderAsyncQueue.submit(new Runnable() {
@Override
public void run() {
CorrelationData correlationData = new CorrelationData(String.format("%s#%s#%s", message.getMessageId(), System.currentTimeMillis(), message.getMessageType().getValue()));
String topic = message.getTopic();
String routingKey = message.getRoutingKey();
RabbitTemplate rabbitTemplate = rabbitTemplateContainer.getRabbitTemplate(message);
rabbitTemplate.convertAndSend(topic, routingKey, message, correlationData);
log.info("#RabbitBrokerImpl.sendMessages# send to rabbitmq, messageId: {}", message.getMessageId());
}
});
});
}
/**
* 发送消息的核心方法,使用异步线程池发送消息
* @param message
*/
private void sendKernel(Message message) {
AsyncBaseQueue.submit(new Runnable() {
@Override
public void run() {
CorrelationData correlationData = new CorrelationData(String.format("%s#%s#%s", message.getMessageId(), System.currentTimeMillis(), message.getMessageType().getValue()));
String topic = message.getTopic();
String routingKey = message.getRoutingKey();
RabbitTemplate rabbitTemplate = rabbitTemplateContainer.getRabbitTemplate(message);
rabbitTemplate.convertAndSend(topic, routingKey, message, correlationData);
log.info("#RabbitBrokerImpl.sendKernel# send to rabbitmq, messageId: {}", message.getMessageId());
}
});
}
}
发送消息的异步线程池
package com.bfxy.rabbit.producer.broker;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class AsyncBaseQueue {
private static final int THREAD_SIZE = Runtime.getRuntime().availableProcessors();
private static final int QUEUE_SIZE = 10000;
private static ExecutorService senderAsync = new ThreadPoolExecutor(THREAD_SIZE,
THREAD_SIZE,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_SIZE),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("rabbitmq_client_async_sender");
return t;
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.error("async sender is error rejected, runnable: {}, executor: {}", r, executor);
}
});
public static void submit(Runnable runnable) {
senderAsync.submit(runnable);
}
}
RabbitTemplate池化封装
RabbitTemplate池化封装的目的
spring注入的RabbitTemplate默认是单例的,RabbitTemplate有很多属性,如:exchange、routingKey等,故每一种exchange都对应着不同的RabbitTemplate,这样单例的RabbitTemplate性能会很慢。此时,可以将RabbitTemplate做持久化操作,以exchange为key,value为一个RabbitTemplate,做RabbitTemplate池化封装,目的是为了提高多生产者发送,提高性能。这样避免了单例性能慢的问题,也可以为某些RabbitTemplate进行制定化的需求配置,这也是一般实际工作中使用的。
rabbit-core-producer
RabbitTemplate池化类封装
package com.bfxy.rabbit.producer.broker;
import com.bfxy.rabbit.api.enums.MessageTypeEnum;
import com.bfxy.rabbit.api.exception.MessageRunTimeException;
import com.bfxy.rabbit.api.model.Message;
import com.bfxy.rabbit.common.convert.GenericMessageConverter;
import com.bfxy.rabbit.common.convert.RabbitMessageConvert;
import com.bfxy.rabbit.common.serializer.Serializer;
import com.bfxy.rabbit.common.serializer.SerializerFactory;
import com.bfxy.rabbit.common.serializer.impl.JacksonSerializerFactory;
import com.bfxy.rabbit.producer.enums.BrokerMessageStatusEnum;
import com.bfxy.rabbit.producer.service.MessageStoreService;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
/**
* $RabbitTemplateContainer RabbitTemplate池化封装
* 每一个topic,对应一个RabbitTemplate
* 1.提高发送的效率
* 2.可以根据不用的需求定制化不同的RabbitTemplate,比如每一个topic都有自己的routingKey规则
*/
@Slf4j
@Component
public class RabbitTemplateContainer implements RabbitTemplate.ConfirmCallback {
private Map<String/* TOPIC */, RabbitTemplate> rabbitMap = Maps.newConcurrentMap();
private final Splitter splitter = Splitter.on("#");
private final SerializerFactory serializerFactory = JacksonSerializerFactory.INSTANCE;
@Resource
private ConnectionFactory connectionFactory;
@Resource
private MessageStoreService messageStoreService;
public RabbitTemplate getRabbitTemplate(Message message) throws MessageRunTimeException {
Preconditions.checkNotNull(message);
String topic = message.getTopic();
RabbitTemplate rabbitTemplate = rabbitMap.get(topic);
if (rabbitTemplate != null) {
return rabbitTemplate;
}
log.info("#RabbitTemplateContainer.getRabbitTemplate# topic: {} is not exists, create one", topic);
RabbitTemplate newRabbitTemplate = new RabbitTemplate(connectionFactory);
newRabbitTemplate.setExchange(topic);
newRabbitTemplate.setRoutingKey(message.getRoutingKey());
newRabbitTemplate.setRetryTemplate(new RetryTemplate());
// 对于message添加序列化和反序列化方式
Serializer serializer = serializerFactory.create();
GenericMessageConverter genericMessageConverter = new GenericMessageConverter(serializer);
RabbitMessageConvert rabbitMessageConvert = new RabbitMessageConvert(genericMessageConverter);
newRabbitTemplate.setMessageConverter(rabbitMessageConvert);
MessageTypeEnum messageType = message.getMessageType();
// 只要不是迅速消息,都应该有消息的confirm确认
if (!MessageTypeEnum.RAPID.equals(messageType)) {
newRabbitTemplate.setConfirmCallback(this);
}
rabbitMap.putIfAbsent(topic, newRabbitTemplate);
return rabbitMap.get(topic);
}
/**
* 无论是confirm消息还是reliant消息,发送消息以后broker都会去回调confirm。但是confirm消息不需要入库,而reliant消息需要,故消息重发使用reliant消息
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 具体的消息应答
if (correlationData != null && StringUtils.isNotBlank(correlationData.getId())) {
List<String> strings = splitter.splitToList(correlationData.getId());
String messageId = strings.get(0);
long sendTime = Long.parseLong(strings.get(1));
String messageType = strings.get(2);
if (ack) {
// 当broker返回ack成功时,更新下日志表里对应消息发送状态为SEND_OK
// 注意:如果当前消息类型为reliant,才去数据库查找并更新
if (MessageTypeEnum.RELIANT.getValue().equals(messageType)) {
messageStoreService.updateStatusByMessageId(messageId, BrokerMessageStatusEnum.SEND_OK);
}
log.info("#RabbitTemplateContainer.confirm# send message is OK, confirm messageId: {}, sendTime: {}", messageId, sendTime);
} else {
log.error("#RabbitTemplateContainer.confirm# send message is Fail, confirm messageId: {}, sendTime: {}", messageId, sendTime);
}
}
}
}
序列化与反序列化转换封装
rabbit-common
序列化工厂类
package com.bfxy.rabbit.common.serializer;
public interface SerializerFactory {
Serializer create();
}
序列化与反序列化接口
package com.bfxy.rabbit.common.serializer;
/**
* $Serializer 序列化和反序列化的接口
*/
public interface Serializer {
byte[] serializeRaw(Object data);
String serialize(Object data);
<T> T deserialize(String content);
<T> T deserialize(byte[] content);
}
序列化工厂实现类
package com.bfxy.rabbit.common.serializer.impl;
import com.bfxy.rabbit.api.model.Message;
import com.bfxy.rabbit.common.serializer.Serializer;
import com.bfxy.rabbit.common.serializer.SerializerFactory;
public class JacksonSerializerFactory implements SerializerFactory {
// 饿汉模式
public static final SerializerFactory INSTANCE = new JacksonSerializerFactory();
@Override
public Serializer create() {
return JacksonSerializer.createParametricType(Message.class);
}
}
序列化与反序列化实现类
package com.bfxy.rabbit.common.serializer.impl;
import java.io.IOException;
import java.lang.reflect.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.bfxy.rabbit.common.serializer.Serializer;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
/**
* $JacksonSerializer
*/
public class JacksonSerializer implements Serializer {
private static final Logger LOGGER = LoggerFactory.getLogger(JacksonSerializer.class);
private static final ObjectMapper mapper = new ObjectMapper();
static {
mapper.disable(SerializationFeature.INDENT_OUTPUT);
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
mapper.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true);
mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
mapper.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
mapper.configure(JsonParser.Feature.ALLOW_NUMERIC_LEADING_ZEROS, true);
mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true);
mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
}
private final JavaType type;
private JacksonSerializer(JavaType type) {
this.type = type;
}
public JacksonSerializer(Type type) {
this.type = mapper.getTypeFactory().constructType(type);
}
public static JacksonSerializer createParametricType(Class<?> cls) {
return new JacksonSerializer(mapper.getTypeFactory().constructType(cls));
}
public byte[] serializeRaw(Object data) {
try {
return mapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
LOGGER.error("序列化出错", e);
}
return null;
}
public String serialize(Object data) {
try {
return mapper.writeValueAsString(data);
} catch (JsonProcessingException e) {
LOGGER.error("序列化出错", e);
}
return null;
}
public <T> T deserialize(String content) {
try {
return mapper.readValue(content, type);
} catch (IOException e) {
LOGGER.error("反序列化出错", e);
}
return null;
}
public <T> T deserialize(byte[] content) {
try {
return mapper.readValue(content, type);
} catch (IOException e) {
LOGGER.error("反序列化出错", e);
}
return null;
}
}
消息转换类
package com.bfxy.rabbit.common.convert;
import com.bfxy.rabbit.common.serializer.Serializer;
import com.google.common.base.Preconditions;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
/**
* $GenericMessageConverter spring amqp的com.bfxy.rabbit.api.model.Message和我们自定义的message互相转换,就是一个序列化和反序列化的过程
*/
public class GenericMessageConverter implements MessageConverter {
private final Serializer serializer;
public GenericMessageConverter(Serializer serializer) {
Preconditions.checkNotNull(serializer);
this.serializer = serializer;
}
/**
* com.bfxy.rabbit.api.model.Message转换为spring amqp的message
* @param o
* @param messageProperties
* @return
* @throws MessageConversionException
*/
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
return new Message(this.serializer.serializeRaw(o), messageProperties);
}
/**
* spring amqp的message转换为自定义的com.bfxy.rabbit.api.model.Message
* @param message
* @return
* @throws MessageConversionException
*/
@Override
public Object fromMessage(Message message) throws MessageConversionException {
return this.serializer.deserialize(message.getBody());
}
}
装饰者/静态代理模式封装消息转换类
package com.bfxy.rabbit.common.convert;
import com.google.common.base.Preconditions;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
/**
* $RabbitMessageConvert 设计模式-装饰者模式/静态代理
*/
public class RabbitMessageConvert implements MessageConverter {
private final GenericMessageConverter genericMessageConverter;
// private final String defaultExpire = String.valueOf(24 * 60 * 60 * 1000);
public RabbitMessageConvert(GenericMessageConverter genericMessageConverter) {
Preconditions.checkNotNull(genericMessageConverter);
this.genericMessageConverter = genericMessageConverter;
}
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
// 可以通过messageProperties添加符合自己的业务逻辑的装饰
// messageProperties.setExpiration(defaultExpire);
com.bfxy.rabbit.api.model.Message message = (com.bfxy.rabbit.api.model.Message) object;
// 设置延迟消息时间
messageProperties.setDelay(message.getDelayMills());
return this.genericMessageConverter.toMessage(object, messageProperties);
}
@Override
public com.bfxy.rabbit.api.model.Message fromMessage(Message message) throws MessageConversionException {
return (com.bfxy.rabbit.api.model.Message) this.genericMessageConverter.fromMessage(message);
}
}
从架构的视角分析可靠性消息投递(重点)
具体架构设计
可参照上面生产端如何保障消息100%的投递成功?
可靠性投递落地-集成数据源
rabbit-common
pom.xml文件添加以下依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
rabbit-core-producer
建库和表语句rabbit-producer-message-schema.sql——日志表(即具体架构设计中的MSG DB)
CREATE DATABASE IF NOT EXISTS broker_message
DEFAULT CHARACTER SET utf8mb4
DEFAULT COLLATE utf8mb4_0900_ai_ci;
-- 表 broker_message.broker_message 结构
CREATE TABLE IF NOT EXISTS `broker_message` (
`message_id` varchar(128) NOT NULL,
`message` varchar(4000),
`try_count` int(4) DEFAULT 0,
`status` varchar(10) DEFAULT '',
`next_retry` datetime NOT NULL,
`create_time` datetime NOT NULL,
`update_time` datetime NOT NULL,
PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
日志表实体
package com.bfxy.rabbit.producer.entity;
import com.bfxy.rabbit.api.model.Message;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* $BrokerMessage 消息记录表实体映射
*/
@Data
public class BrokerMessage implements Serializable {
private static final long serialVersionUID = -1L;
private String messageId;
private Message message;
private Integer tryCount = 0;
private String status;
private Date nextRetry;
private Date createTime;
private Date updateTime;
}
日志表Mapper
package com.bfxy.rabbit.producer.mapper;
import java.util.Date;
import java.util.List;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import com.bfxy.rabbit.producer.entity.BrokerMessage;
@Mapper
public interface BrokerMessageMapper {
int deleteByPrimaryKey(String messageId);
int insert(BrokerMessage record);
int insertSelective(BrokerMessage record);
BrokerMessage selectByPrimaryKey(String messageId);
int updateByPrimaryKeySelective(BrokerMessage record);
int updateByPrimaryKeyWithBLOBs(BrokerMessage record);
int updateByPrimaryKey(BrokerMessage record);
void changeBrokerMessageStatus(@Param("brokerMessageId")String brokerMessageId, @Param("brokerMessageStatus")String brokerMessageStatus, @Param("updateTime")Date updateTime);
List<BrokerMessage> queryBrokerMessageStatus4Timeout(@Param("brokerMessageStatus")String brokerMessageStatus);
List<BrokerMessage> queryBrokerMessageStatus(@Param("brokerMessageStatus")String brokerMessageStatus);
int update4TryCount(@Param("brokerMessageId")String brokerMessageId, @Param("updateTime")Date updateTime);
}
日志表xml(resources\mapping\BrokerMessageMapper.xml)
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.bfxy.rabbit.producer.mapper.BrokerMessageMapper" >
<resultMap id="BaseResultMap" type="com.bfxy.rabbit.producer.entity.BrokerMessage" >
<id column="message_id" property="messageId" jdbcType="VARCHAR" />
<result column="message" property="message" jdbcType="VARCHAR" typeHandler="com.bfxy.rabbit.common.mybatis.handler.MessageJsonTypeHandler" />
<result column="try_count" property="tryCount" jdbcType="INTEGER" />
<result column="status" property="status" jdbcType="VARCHAR" />
<result column="next_retry" property="nextRetry" jdbcType="TIMESTAMP" />
<result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
<result column="update_time" property="updateTime" jdbcType="TIMESTAMP" />
</resultMap>
<sql id="Base_Column_List" >
message_id, message, try_count, status, next_retry, create_time, update_time
</sql>
<select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
select
<include refid="Base_Column_List" />
from broker_message
where message_id = #{messageId,jdbcType=VARCHAR}
</select>
<delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
delete from broker_message
where message_id = #{messageId,jdbcType=VARCHAR}
</delete>
<insert id="insert" parameterType="com.bfxy.rabbit.producer.entity.BrokerMessage" >
insert into broker_message (message_id, message, try_count,
status, next_retry, create_time,
update_time)
values (#{messageId,jdbcType=VARCHAR}, #{message,jdbcType=VARCHAR, typeHandler=com.bfxy.rabbit.common.mybatis.handler.MessageJsonTypeHandler}, #{tryCount,jdbcType=INTEGER},
#{status,jdbcType=VARCHAR}, #{nextRetry,jdbcType=TIMESTAMP}, #{createTime,jdbcType=TIMESTAMP},
#{updateTime,jdbcType=TIMESTAMP})
</insert>
<insert id="insertSelective" parameterType="com.bfxy.rabbit.producer.entity.BrokerMessage" >
insert into broker_message
<trim prefix="(" suffix=")" suffixOverrides="," >
<if test="messageId != null" >
message_id,
</if>
<if test="message != null" >
message,
</if>
<if test="tryCount != null" >
try_count,
</if>
<if test="status != null" >
status,
</if>
<if test="nextRetry != null" >
next_retry,
</if>
<if test="createTime != null" >
create_time,
</if>
<if test="updateTime != null" >
update_time,
</if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides="," >
<if test="messageId != null" >
#{messageId,jdbcType=VARCHAR},
</if>
<if test="message != null" >
#{message,jdbcType=VARCHAR, typeHandler=com.bfxy.rabbit.common.mybatis.handler.MessageJsonTypeHandler},
</if>
<if test="tryCount != null" >
#{tryCount,jdbcType=INTEGER},
</if>
<if test="status != null" >
#{status,jdbcType=VARCHAR},
</if>
<if test="nextRetry != null" >
#{nextRetry,jdbcType=TIMESTAMP},
</if>
<if test="createTime != null" >
#{createTime,jdbcType=TIMESTAMP},
</if>
<if test="updateTime != null" >
#{updateTime,jdbcType=TIMESTAMP},
</if>
</trim>
</insert>
<update id="updateByPrimaryKeySelective" parameterType="com.bfxy.rabbit.producer.entity.BrokerMessage" >
update broker_message
<set >
<if test="message != null" >
message = #{message,jdbcType=VARCHAR, typeHandler=com.bfxy.rabbit.common.mybatis.handler.MessageJsonTypeHandler},
</if>
<if test="tryCount != null" >
try_count = #{tryCount,jdbcType=INTEGER},
</if>
<if test="status != null" >
status = #{status,jdbcType=VARCHAR},
</if>
<if test="nextRetry != null" >
next_retry = #{nextRetry,jdbcType=TIMESTAMP},
</if>
<if test="createTime != null" >
create_time = #{createTime,jdbcType=TIMESTAMP},
</if>
<if test="updateTime != null" >
update_time = #{updateTime,jdbcType=TIMESTAMP},
</if>
</set>
where message_id = #{messageId,jdbcType=VARCHAR}
</update>
<update id="updateByPrimaryKey" parameterType="com.bfxy.rabbit.producer.entity.BrokerMessage" >
update broker_message
set message = #{message,jdbcType=VARCHAR, typeHandler=com.bfxy.rabbit.common.mybatis.handler.MessageJsonTypeHandler},
try_count = #{tryCount,jdbcType=INTEGER},
status = #{status,jdbcType=VARCHAR},
next_retry = #{nextRetry,jdbcType=TIMESTAMP},
create_time = #{createTime,jdbcType=TIMESTAMP},
update_time = #{updateTime,jdbcType=TIMESTAMP}
where message_id = #{messageId,jdbcType=VARCHAR}
</update>
<update id="changeBrokerMessageStatus" >
update broker_message bm
set bm.status = #{brokerMessageStatus,jdbcType=VARCHAR},
bm.update_time = #{updateTime, jdbcType=TIMESTAMP}
where bm.message_id = #{brokerMessageId,jdbcType=VARCHAR}
</update>
<select id="queryBrokerMessageStatus4Timeout" resultMap="BaseResultMap" >
<![CDATA[
select message_id, message, try_count, status, next_retry, create_time, update_time
from broker_message bm
where bm.status = #{brokerMessageStatus,jdbcType=VARCHAR}
and bm.next_retry < sysdate()
]]>
</select>
<select id="queryBrokerMessageStatus" resultMap="BaseResultMap" >
select message_id, message, try_count, status, next_retry, create_time, update_time
from broker_message bm
where bm.status = #{brokerMessageStatus,jdbcType=VARCHAR}
</select>
<update id="update4TryCount" >
update broker_message bm
set bm.try_count = bm.try_count + 1,
bm.update_time = #{updateTime,jdbcType=TIMESTAMP}
where bm.message_id = #{brokerMessageId,jdbcType=VARCHAR}
</update>
</mapper>
jdbc相关配置(resources\rabbit-producer-message.properties)
rabbit.producer.druid.type=com.alibaba.druid.pool.DruidDataSource
rabbit.producer.druid.jdbc.url=jdbc:mysql://192.168.218.20:3306/broker_message?characterEncoding=UTF-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useUnicode=true&serverTimezone=GMT
rabbit.producer.druid.jdbc.driver-class-name=com.mysql.jdbc.Driver
rabbit.producer.druid.jdbc.username=root
rabbit.producer.druid.jdbc.password=123456
rabbit.producer.druid.jdbc.initialSize=5
rabbit.producer.druid.jdbc.minIdle=1
rabbit.producer.druid.jdbc.maxActive=100
rabbit.producer.druid.jdbc.maxWait=60000
rabbit.producer.druid.jdbc.timeBetweenEvictionRunsMillis=60000
rabbit.producer.druid.jdbc.minEvictableIdleTimeMillis=300000
rabbit.producer.druid.jdbc.validationQuery=SELECT 1 FROM DUAL
rabbit.producer.druid.jdbc.testWhileIdle=true
rabbit.producer.druid.jdbc.testOnBorrow=false
rabbit.producer.druid.jdbc.testOnReturn=false
rabbit.producer.druid.jdbc.poolPreparedStatements=true
rabbit.producer.druid.jdbc.maxPoolPreparedStatementPerConnectionSize= 20
rabbit.producer.druid.jdbc.filters=stat,wall,log4j
rabbit.producer.druid.jdbc.useGlobalDataSourceStat=true
数据源配置加载类
package com.bfxy.rabbit.producer.config.database;
import java.sql.SQLException;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.PropertySource;
@Configuration
@PropertySource({"classpath:rabbit-producer-message.properties"})
public class RabbitProducerDataSourceConfiguration {
private static Logger LOGGER = org.slf4j.LoggerFactory.getLogger(RabbitProducerDataSourceConfiguration.class);
@Value("${rabbit.producer.druid.type}")
private Class<? extends DataSource> dataSourceType;
@Bean(name = "rabbitProducerDataSource")
@Primary
@ConfigurationProperties(prefix = "rabbit.producer.druid.jdbc")
public DataSource rabbitProducerDataSource() throws SQLException {
DataSource rabbitProducerDataSource = DataSourceBuilder.create().type(dataSourceType).build();
LOGGER.info("============= rabbitProducerDataSource : {} ================", rabbitProducerDataSource);
return rabbitProducerDataSource;
}
public DataSourceProperties primaryDataSourceProperties(){
return new DataSourceProperties();
}
public DataSource primaryDataSource(){
return primaryDataSourceProperties().initializeDataSourceBuilder().build();
}
}
执行建表语句rabbit-producer-message-schema.sql类
package com.bfxy.rabbit.producer.config.database;
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
import org.springframework.jdbc.datasource.init.DatabasePopulator;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
/**
* $BrokerMessageConfiguration
* 帮我执行SQL脚本
* 帮我进行数据库表结构的创建
*/
@Configuration
public class BrokerMessageConfiguration {
@Autowired
private DataSource rabbitProducerDataSource;
@Value("classpath:rabbit-producer-message-schema.sql")
private Resource schemaScript;
@Bean
public DataSourceInitializer initDataSourceInitializer() {
System.err.println("--------------rabbitProducerDataSource-----------:" + rabbitProducerDataSource);
final DataSourceInitializer initializer = new DataSourceInitializer();
initializer.setDataSource(rabbitProducerDataSource);
initializer.setDatabasePopulator(databasePopulator());
return initializer;
}
private DatabasePopulator databasePopulator() {
final ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
populator.addScript(schemaScript);
return populator;
}
}
mybatis整合spring boot相关类
package com.bfxy.rabbit.producer.config.database;
import javax.annotation.Resource;
import javax.sql.DataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
@Configuration
@AutoConfigureAfter(value = {RabbitProducerDataSourceConfiguration.class})
public class RabbitProducerMyBatisConfiguration {
@Resource(name= "rabbitProducerDataSource")
private DataSource rabbitProducerDataSource;
@Bean(name="rabbitProducerSqlSessionFactory")
public SqlSessionFactory rabbitProducerSqlSessionFactory(DataSource rabbitProducerDataSource) {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(rabbitProducerDataSource);
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
try {
bean.setMapperLocations(resolver.getResources("classpath:mapping/*.xml"));
SqlSessionFactory sqlSessionFactory = bean.getObject();
assert sqlSessionFactory != null;
sqlSessionFactory.getConfiguration().setCacheEnabled(Boolean.TRUE);
return sqlSessionFactory;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Bean(name="rabbitProducerSqlSessionTemplate")
public SqlSessionTemplate rabbitProducerSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
return new SqlSessionTemplate(sqlSessionFactory);
}
}
package com.bfxy.rabbit.producer.config.database;
import org.mybatis.spring.mapper.MapperScannerConfigurer;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@AutoConfigureAfter(RabbitProducerDataSourceConfiguration.class)
public class RabbitProducerMybatisMapperScannerConfig {
@Bean(name="rabbitProducerMapperScannerConfigurer")
public MapperScannerConfigurer rabbitProducerMapperScannerConfigurer() {
MapperScannerConfigurer mapperScannerConfigurer = new MapperScannerConfigurer();
mapperScannerConfigurer.setSqlSessionFactoryBeanName("rabbitProducerSqlSessionFactory");
mapperScannerConfigurer.setBasePackage("com.bfxy.rabbit.producer.mapper");
return mapperScannerConfigurer;
}
}
ps:以上jdbc相关配置、数据源配置、mybatis整合springboot相关配置类都可以写入application.yml内,而为啥需要单独创建一个jdbc相关配置的文件和代码内写配置类,为了业务无感知,这里这样做的目的是不想让业务方去改。
可靠性投递落地-可靠性消息业务实现落地
rabbit-core-producer
日志表业务实体类
package com.bfxy.rabbit.producer.service;
import com.bfxy.rabbit.producer.enums.BrokerMessageStatusEnum;
import com.bfxy.rabbit.producer.entity.BrokerMessage;
import com.bfxy.rabbit.producer.mapper.BrokerMessageMapper;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
@Service
public class MessageStoreService {
@Resource
private BrokerMessageMapper brokerMessageMapper;
public int insert(BrokerMessage brokerMessage) {
return brokerMessageMapper.insert(brokerMessage);
}
public void updateStatusByMessageId(String messageId, BrokerMessageStatusEnum brokerMessageStatusEnum) {
brokerMessageMapper.changeBrokerMessageStatus(messageId, brokerMessageStatusEnum.getCode(), new Date());
}
public List<BrokerMessage> fetchTimeOutMessage4Retry(BrokerMessageStatusEnum brokerMessageStatusEnum) {
return brokerMessageMapper.queryBrokerMessageStatus4Timeout(brokerMessageStatusEnum.getCode());
}
public BrokerMessage selectByMessageId(String messageId) {
return brokerMessageMapper.selectByPrimaryKey(messageId);
}
public int updateTryCountByMessageId(String messageId) {
return brokerMessageMapper.update4TryCount(messageId, new Date());
}
}
消息状态枚举
package com.bfxy.rabbit.producer.enums;
/**
* $BrokerMessageStatus 消息的发送状态
*/
public enum BrokerMessageStatusEnum {
SENDING("0"),
SEND_OK("1"),
SEND_FAIL("2"),
/**
* 这个状态代表MQ Broker繁忙了,不是磁盘满了,因为磁盘满了肯定无法写入了,但是繁忙可以后续等一段时间再写入
*/
SEND_FAIL_A_MOMENT("3");
private final String code;
BrokerMessageStatusEnum(String code) {
this.code = code;
}
public String getCode() {
return code;
}
}
消息超时时间常量(即超过这个时间消息状态还没有变更重试)
package com.bfxy.rabbit.producer.constant;
/**
* $BrokerMessageConst 常量信息
*/
public interface BrokerMessageConst {
int TIMEOUT = 1;
}
可靠性投递落地-ESJOB定时任务讲解
ESJOB定时任务
ESJOB定时任务是分布式定时任务,在集群中一个job执行其它的job就无法执行,或者说这个job可以帮助我们分片处理。比如一个数据库里有100条记录,可能有10个应用程序同时启动,每个应用程序只处理其中的1/10,当然,需要合理的分片设置才可以。
ESJOB定时任务采用zk实现分布式锁,因为定时任务并发不是很高,高并发的情况下一般不会采用zk做分布式锁,可以使用redis
官网地址
https://shardingsphere.apache.org/elasticjob/
定时任务通用组件封装
为什么作定时任务通用组件封装
对ESJOB做一层更好的封装,因为现在ESJOB集成spring容器这种方式开发相对比较麻烦。
每创建一个ESJOB都需要做以下配置
-
配置文件需要对这个任务进行配置
-
再添加一个config类读取配置文件的配置注入spring容器中
希望达到的效果
希望只需要一个注解就可以帮助我们实现一个ESJOB的创建,配置文件里的配置到注解中即可在项目启动后动态生成代码
例如:
@ElasticJobConfig(name = "com.bfxy.esjob.config.MySimpleJobConfig", cron = "0/5 * * * * ?", ...)
rabbit-task
pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rabbit-parent</artifactId>
<groupId>com.bfxy.base.rabbit</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rabbit-task</artifactId>
<name>rabbit-task</name>
<properties>
<elastic-job.version>2.1.4</elastic-job.version>
</properties>
<dependencies>
<!-- elastic-job dependency -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>${elastic-job.version}</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${elastic-job.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>
自动装配类
package com.bfxy.rabbit.task.autoconfigure;
import com.bfxy.rabbit.task.parser.ElasticJobConfParser;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* $JobParserAutoConfiguration 初始化job注册中心配置成功
*/
@Slf4j
@Configuration
@ConditionalOnProperty(prefix = "elastic.job.zk", name = {"namespace", "serverLists"}, matchIfMissing = false)
@EnableConfigurationProperties(JobZookeeperProperties.class)
public class JobParserAutoConfiguration {
@Bean(initMethod = "init")
public ZookeeperRegistryCenter zookeeperRegistryCenter(JobZookeeperProperties jobZookeeperProperties) {
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(jobZookeeperProperties.getServerLists(), jobZookeeperProperties.getNamespace());
zookeeperConfiguration.setBaseSleepTimeMilliseconds(jobZookeeperProperties.getBaseSleepTimeMilliseconds());
zookeeperConfiguration.setMaxSleepTimeMilliseconds(zookeeperConfiguration.getMaxSleepTimeMilliseconds());
zookeeperConfiguration.setConnectionTimeoutMilliseconds(zookeeperConfiguration.getConnectionTimeoutMilliseconds());
zookeeperConfiguration.setSessionTimeoutMilliseconds(zookeeperConfiguration.getSessionTimeoutMilliseconds());
zookeeperConfiguration.setMaxRetries(zookeeperConfiguration.getMaxRetries());
zookeeperConfiguration.setDigest(zookeeperConfiguration.getDigest());
log.info("初始化job注册中心配置成功,zkaddress:{},namespace:{}", jobZookeeperProperties.getServerLists(), jobZookeeperProperties.getNamespace());
return new ZookeeperRegistryCenter(zookeeperConfiguration);
}
@Bean
public ElasticJobConfParser elasticJobConfParser(JobZookeeperProperties jobZookeeperProperties, ZookeeperRegistryCenter zookeeperRegistryCenter) {
return new ElasticJobConfParser(jobZookeeperProperties, zookeeperRegistryCenter);
}
}
spring的自动装配(\resources\META-INF\spring.factories)
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.bfxy.rabbit.task.autoconfigure.JobParserAutoConfiguration
读取配置文件加载到class类
package com.bfxy.rabbit.task.autoconfigure;
import org.springframework.boot.context.properties.ConfigurationProperties;
import lombok.Data;
@ConfigurationProperties(prefix = "elastic.job.zk")
@Data
public class JobZookeeperProperties {
private String namespace;
private String serverLists;
private int maxRetries = 3;
private int connectionTimeoutMilliseconds = 15000;
private int sessionTimeoutMilliseconds = 60000;
private int baseSleepTimeMilliseconds = 1000;
private int maxSleepTimeMilliseconds = 3000;
private String digest = "";
}
自定义注解——模块装配,相当于开关
package com.bfxy.rabbit.task.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.context.annotation.Import;
import com.bfxy.rabbit.task.autoconfigure.JobParserAutoConfiguration;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(JobParserAutoConfiguration.class)
public @interface EnableElasticJob {
}
自定义注解——esjob配置
package com.bfxy.rabbit.task.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* $ElasticJobConfig 加上这个注解就可以生成像MySimpleJobConfig这样的类的代码
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticJobConfig {
String name(); //elasticjob的名称
String cron() default "";
int shardingTotalCount() default 1;
String shardingItemParameters() default "";
String jobParameter() default "";
boolean failover() default false;
boolean misfire() default true;
String description() default "";
boolean overwrite() default false;
boolean streamingProcess() default false;
String scriptCommandLine() default "";
boolean monitorExecution() default false;
public int monitorPort() default -1; //must
public int maxTimeDiffSeconds() default -1; //must
public String jobShardingStrategyClass() default ""; //must
public int reconcileIntervalMinutes() default 10; //must
public String eventTraceRdbDataSource() default ""; //must
public String listener() default ""; //must
public boolean disabled() default false; //must
public String distributedListener() default "";
public long startedTimeoutMilliseconds() default Long.MAX_VALUE; //must
public long completedTimeoutMilliseconds() default Long.MAX_VALUE; //must
public String jobExceptionHandler() default "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler";
public String executorServiceHandler() default "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler";
}
esjob配置注解解析类
package com.bfxy.rabbit.task.parser;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.beans.factory.support.ManagedList;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.util.StringUtils;
import com.bfxy.rabbit.task.annotation.ElasticJobConfig;
import com.bfxy.rabbit.task.autoconfigure.JobZookeeperProperties;
import com.bfxy.rabbit.task.enums.ElasticJobTypeEnum;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.config.script.ScriptJobConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.executor.handler.JobProperties;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import lombok.extern.slf4j.Slf4j;
/**
* $ElasticJobConfParser 用于解析自定义注解@ElasticJobConfig
*/
@Slf4j
public class ElasticJobConfParser implements ApplicationListener<ApplicationReadyEvent> /*在spring的所有bean加载完成后再去解析自定义注解@ElasticJobConfig*/ {
private final JobZookeeperProperties jobZookeeperProperties;
private final ZookeeperRegistryCenter zookeeperRegistryCenter;
public ElasticJobConfParser(JobZookeeperProperties jobZookeeperProperties,
ZookeeperRegistryCenter zookeeperRegistryCenter) {
this.jobZookeeperProperties = jobZookeeperProperties;
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
}
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
try {
ApplicationContext applicationContext = event.getApplicationContext();
Map<String, Object> beanMap = applicationContext.getBeansWithAnnotation(ElasticJobConfig.class);
for(Iterator<?> it = beanMap.values().iterator(); it.hasNext();) {
Object confBean = it.next();
Class<?> clazz = confBean.getClass();
if(clazz.getName().indexOf("$") > 0) {
String className = clazz.getName();
clazz = Class.forName(className.substring(0, className.indexOf("$")));
}
// 获取接口类型 用于判断是什么类型的任务
// 这里简单认为任务类只实现一个任务类型接口,真实可能实现多个接口需要循环遍历
String jobTypeName = clazz.getInterfaces()[0].getSimpleName();
// 获取配置项 ElasticJobConfig
ElasticJobConfig conf = clazz.getAnnotation(ElasticJobConfig.class);
String jobClass = clazz.getName();
String jobName = this.jobZookeeperProperties.getNamespace() + "." + conf.name();
String cron = conf.cron();
String shardingItemParameters = conf.shardingItemParameters();
String description = conf.description();
String jobParameter = conf.jobParameter();
String jobExceptionHandler = conf.jobExceptionHandler();
String executorServiceHandler = conf.executorServiceHandler();
String jobShardingStrategyClass = conf.jobShardingStrategyClass();
String eventTraceRdbDataSource = conf.eventTraceRdbDataSource();
String scriptCommandLine = conf.scriptCommandLine();
boolean failover = conf.failover();
boolean misfire = conf.misfire();
boolean overwrite = conf.overwrite();
boolean disabled = conf.disabled();
boolean monitorExecution = conf.monitorExecution();
boolean streamingProcess = conf.streamingProcess();
int shardingTotalCount = conf.shardingTotalCount();
int monitorPort = conf.monitorPort();
int maxTimeDiffSeconds = conf.maxTimeDiffSeconds();
int reconcileIntervalMinutes = conf.reconcileIntervalMinutes();
// 先把当当网的esjob的相关configuration实例化出来,再集成spring
JobCoreConfiguration coreConfig = JobCoreConfiguration
.newBuilder(jobName, cron, shardingTotalCount)
.shardingItemParameters(shardingItemParameters)
.description(description)
.failover(failover)
.jobParameter(jobParameter)
.misfire(misfire)
.jobProperties(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), jobExceptionHandler)
.jobProperties(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), executorServiceHandler)
.build();
// 我到底要创建什么样的任务.
JobTypeConfiguration typeConfig = null;
if (ElasticJobTypeEnum.SIMPLE.getType().equals(jobTypeName)) {
typeConfig = new SimpleJobConfiguration(coreConfig, jobClass);
}
if (ElasticJobTypeEnum.DATAFLOW.getType().equals(jobTypeName)) {
typeConfig = new DataflowJobConfiguration(coreConfig, jobClass, streamingProcess);
}
if (ElasticJobTypeEnum.SCRIPT.getType().equals(jobTypeName)) {
typeConfig = new ScriptJobConfiguration(coreConfig, scriptCommandLine);
}
// LiteJobConfiguration
LiteJobConfiguration jobConfig = LiteJobConfiguration
.newBuilder(typeConfig)
.overwrite(overwrite)
.disabled(disabled)
.monitorPort(monitorPort)
.monitorExecution(monitorExecution)
.maxTimeDiffSeconds(maxTimeDiffSeconds)
.jobShardingStrategyClass(jobShardingStrategyClass)
.reconcileIntervalMinutes(reconcileIntervalMinutes)
.build();
// 创建一个Spring的beanDefinition
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);
factory.setInitMethodName("init");
factory.setScope("prototype"); // 因为一个job是一个config文件故是多例
// 1.添加bean构造参数,相当于添加自己的真实的任务实现类
if (!ElasticJobTypeEnum.SCRIPT.getType().equals(jobTypeName)) {
factory.addConstructorArgValue(confBean);
}
// 2.添加注册中心
factory.addConstructorArgValue(this.zookeeperRegistryCenter);
// 3.添加LiteJobConfiguration
factory.addConstructorArgValue(jobConfig);
// 4.如果有eventTraceRdbDataSource 则也进行添加,相当于添加了JobEventConfiguration数据原配置
if (StringUtils.hasText(eventTraceRdbDataSource)) {
BeanDefinitionBuilder rdbFactory = BeanDefinitionBuilder.rootBeanDefinition(JobEventRdbConfiguration.class);
rdbFactory.addConstructorArgReference(eventTraceRdbDataSource);
factory.addConstructorArgValue(rdbFactory.getBeanDefinition());
}
// 5.添加监听,相当于SimpleJobListener
List<?> elasticJobListeners = getTargetElasticJobListeners(conf);
factory.addConstructorArgValue(elasticJobListeners);
// 接下来就是把factory 也就是 SpringJobScheduler注入到Spring容器中
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();
String registerBeanName = conf.name() + "SpringJobScheduler";
defaultListableBeanFactory.registerBeanDefinition(registerBeanName, factory.getBeanDefinition());
SpringJobScheduler scheduler = (SpringJobScheduler)applicationContext.getBean(registerBeanName);
scheduler.init();
log.info("启动elastic-job作业: " + jobName);
}
log.info("共计启动elastic-job作业数量为: {} 个", beanMap.values().size());
} catch (Exception e) {
log.error("elasticjob 启动异常, 系统强制退出", e);
System.exit(1);
}
}
private List<BeanDefinition> getTargetElasticJobListeners(ElasticJobConfig conf) {
List<BeanDefinition> result = new ManagedList<>(2);
String listeners = conf.listener();
if (StringUtils.hasText(listeners)) {
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(listeners);
factory.setScope("prototype");
result.add(factory.getBeanDefinition());
}
String distributedListeners = conf.distributedListener();
long startedTimeoutMilliseconds = conf.startedTimeoutMilliseconds();
long completedTimeoutMilliseconds = conf.completedTimeoutMilliseconds();
if (StringUtils.hasText(distributedListeners)) {
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(distributedListeners);
factory.setScope("prototype");
factory.addConstructorArgValue(startedTimeoutMilliseconds);
factory.addConstructorArgValue(completedTimeoutMilliseconds);
result.add(factory.getBeanDefinition());
}
return result;
}
}
esjob任务类型枚举类
package com.bfxy.rabbit.task.enums;
public enum ElasticJobTypeEnum {
SIMPLE("SimpleJob", "简单类型job"),
DATAFLOW("DataflowJob", "流式类型job"),
SCRIPT("ScriptJob", "脚本类型job");
private String type;
private String desc;
private ElasticJobTypeEnum(String type, String desc) {
this.type = type;
this.desc = desc;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
}
定时任务通用组件封装测试——rabbit-esjob-test
pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rabbit-parent</artifactId>
<groupId>com.bfxy.base.rabbit</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rabbit-esjob-test</artifactId>
<name>rabbit-esjob-test</name>
<dependencies>
<dependency>
<groupId>com.bfxy.base.rabbit</groupId>
<artifactId>rabbit-task</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
启动类Application
package com.bfxy.rabbit;
import com.bfxy.rabbit.task.annotation.EnableElasticJob;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
@EnableElasticJob
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
项目配置文件application.properties
server.servlet.context-path=/esjob/test
server.port=8001
spring.application.name=rabbit-esjob-test
elastic.job.zk.serverLists=192.168.218.21:2181,192.168.218.22:2181,192.168.218.23:2181
elastic.job.zk.namespace=elastic-job
测试类
package com.bfxy.rabbit.esjob.test;
import com.bfxy.rabbit.task.annotation.ElasticJobConfig;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import org.springframework.stereotype.Component;
@Component
@ElasticJobConfig(
name = "com.bfxy.rabbit.esjob.test.DemoJob",
cron = "0/10 * * * * ?",
description = "样例定时任务",
overwrite = true,
shardingTotalCount = 2
)
public class DemoJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
System.err.println("执行Demo job.");
}
}
![微信图片_20221119231450](.\RabbitMQ\微信图片_20221119231450.png)![QQ图片20230312204736](.\RabbitMQ\QQ图片20230312204736.png)![QQ图片20230312204736](.\RabbitMQ\QQ图片20230312204736.png)![QQ图片20230312204736](.\RabbitMQ\QQ图片20230312204736.png)package com.bfxy.rabbit.esjob.test;
import com.bfxy.rabbit.task.annotation.ElasticJobConfig;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import org.springframework.stereotype.Component;
@Component
@ElasticJobConfig(
name = "com.bfxy.rabbit.esjob.test.TestJob",
cron = "0/5 * * * * ?",
description = "测试定时任务",
overwrite = true,
shardingTotalCount = 5
)
public class TestJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
System.err.println("执行Test job.");
}
}
测试结果
可靠性消息重试实现集成定时任务组件
rabbit-common
数据库消息对象转换类
package com.bfxy.rabbit.common.mybatis.handler;
import java.sql.CallableStatement;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.type.BaseTypeHandler;
import org.apache.ibatis.type.JdbcType;
import com.bfxy.rabbit.api.model.Message;
import com.bfxy.rabbit.common.util.FastJsonConvertUtil;
/**
* <B>系统名称:</B><BR>
* <B>模块名称:</B><BR>
* <B>中文类名:</B><BR>
* <B>概要说明:</B><BR>
* @author BuFanxueyuan
* @since 2018年5月15日 下午2:35:54
*/
public class MessageJsonTypeHandler extends BaseTypeHandler<Message> {
@Override
public void setNonNullParameter(PreparedStatement ps, int i, Message parameter,
JdbcType jdbcType) throws SQLException {
ps.setString(i, FastJsonConvertUtil.convertObjectToJSON(parameter));
}
@Override
public Message getNullableResult(ResultSet rs, String columnName)
throws SQLException {
String value = rs.getString(columnName);
if(null != value && !StringUtils.isBlank(value)) {
return FastJsonConvertUtil.convertJSONToObject(rs.getString(columnName), Message.class);
}
return null;
}
@Override
public Message getNullableResult(ResultSet rs, int columnIndex) throws SQLException {
String value = rs.getString(columnIndex);
if(null != value && !StringUtils.isBlank(value)) {
return FastJsonConvertUtil.convertJSONToObject(rs.getString(columnIndex), Message.class);
}
return null;
}
@Override
public Message getNullableResult(CallableStatement cs, int columnIndex) throws SQLException {
String value = cs.getString(columnIndex);
if(null != value && !StringUtils.isBlank(value)) {
return FastJsonConvertUtil.convertJSONToObject(cs.getString(columnIndex), Message.class);
}
return null;
}
}
java对象与json进行转换的通用工具类
package com.bfxy.rabbit.common.util;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
/**
* $FastJsonConvertUtil java对象与json进行转换的通用工具类
* @author Alienware
*
*/
public class FastJsonConvertUtil {
private static final SerializerFeature[] featuresWithNullValue = { SerializerFeature.WriteMapNullValue, SerializerFeature.WriteNullBooleanAsFalse,
SerializerFeature.WriteNullListAsEmpty, SerializerFeature.WriteNullNumberAsZero, SerializerFeature.WriteNullStringAsEmpty };
/**
* <B>方法名称:</B>将JSON字符串转换为实体对象<BR>
* <B>概要说明:</B>将JSON字符串转换为实体对象<BR>
* @param data JSON字符串
* @param clzss 转换对象
* @return T
*/
public static <T> T convertJSONToObject(String data, Class<T> clzss) {
try {
T t = JSON.parseObject(data, clzss);
return t;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* <B>方法名称:</B>将JSONObject对象转换为实体对象<BR>
* <B>概要说明:</B>将JSONObject对象转换为实体对象<BR>
* @param data JSONObject对象
* @param clzss 转换对象
* @return T
*/
public static <T> T convertJSONToObject(JSONObject data, Class<T> clzss) {
try {
T t = JSONObject.toJavaObject(data, clzss);
return t;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* <B>方法名称:</B>将JSON字符串数组转为List集合对象<BR>
* <B>概要说明:</B>将JSON字符串数组转为List集合对象<BR>
* @param data JSON字符串数组
* @param clzss 转换对象
* @return List<T>集合对象
*/
public static <T> List<T> convertJSONToArray(String data, Class<T> clzss) {
try {
List<T> t = JSON.parseArray(data, clzss);
return t;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* <B>方法名称:</B>将List<JSONObject>转为List集合对象<BR>
* <B>概要说明:</B>将List<JSONObject>转为List集合对象<BR>
* @param data List<JSONObject>
* @param clzss 转换对象
* @return List<T>集合对象
*/
public static <T> List<T> convertJSONToArray(List<JSONObject> data, Class<T> clzss) {
try {
List<T> t = new ArrayList<T>();
for (JSONObject jsonObject : data) {
t.add(convertJSONToObject(jsonObject, clzss));
}
return t;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* <B>方法名称:</B>将对象转为JSON字符串<BR>
* <B>概要说明:</B>将对象转为JSON字符串<BR>
* @param obj 任意对象
* @return JSON字符串
*/
public static String convertObjectToJSON(Object obj) {
try {
String text = JSON.toJSONString(obj);
return text;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* <B>方法名称:</B>将对象转为JSONObject对象<BR>
* <B>概要说明:</B>将对象转为JSONObject对象<BR>
* @param obj 任意对象
* @return JSONObject对象
*/
public static JSONObject convertObjectToJSONObject(Object obj){
try {
JSONObject jsonObject = (JSONObject) JSONObject.toJSON(obj);
return jsonObject;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* <B>方法名称:</B><BR>
* <B>概要说明:</B><BR>
* @param obj
* @return
*/
public static String convertObjectToJSONWithNullValue(Object obj) {
try {
String text = JSON.toJSONString(obj, featuresWithNullValue);
return text;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public static void main(String[] args) {
System.err.println(System.getProperties());
}
}
rabbit-core-producer
pom.xml文件引入esjob组件rabbit-task依赖
<dependency>
<groupId>com.bfxy.base.rabbit</groupId>
<artifactId>rabbit-task</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
RabbitProducerAutoConfiguration类添加@EnableElasticJob注解
定义定时任务将需要重新发送的消息抓取发送到RabbitMQ上
package com.bfxy.rabbit.producer.task;
import com.bfxy.rabbit.producer.broker.RabbitBroker;
import com.bfxy.rabbit.producer.enums.BrokerMessageStatusEnum;
import com.bfxy.rabbit.producer.entity.BrokerMessage;
import com.bfxy.rabbit.producer.service.MessageStoreService;
import com.bfxy.rabbit.task.annotation.ElasticJobConfig;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
@Slf4j
@Component
@ElasticJobConfig(
name = "com.bfxy.rabbit.producer.task.RetryMessageDataflowJob",
cron = "0/10 * * * * ?",
description = "可靠性投递消息补偿任务",
overwrite = true,
shardingTotalCount = 1 // 这里因为测试只有一张表,但是真正生产环境可能有多张表,那这里的分片数就需要设置生多个,从这些表拉取数据,比如100张表,10个分片
)
public class RetryMessageDataflowJob implements DataflowJob<BrokerMessage> {
/**
* 最大重试次数
*/
private static final int MAX_RETRY_COUNT = 3;
@Resource
private MessageStoreService messageStoreService;
@Resource
private RabbitBroker rabbitBroker;
@Override
public List<BrokerMessage> fetchData(ShardingContext shardingContext) {
List<BrokerMessage> brokerMessageList = messageStoreService.fetchTimeOutMessage4Retry(BrokerMessageStatusEnum.SENDING);
log.info("----------@@@@@@ 抓取数据集合,数量:{} @@@@@@----------", brokerMessageList.size());
return brokerMessageList;
}
@Override
public void processData(ShardingContext shardingContext, List<BrokerMessage> dataList) {
dataList.forEach( brokerMessage -> {
String messageId = brokerMessage.getMessageId();
if (brokerMessage.getTryCount() >= MAX_RETRY_COUNT) {
messageStoreService.updateStatusByMessageId(messageId, BrokerMessageStatusEnum.SEND_FAIL);
log.warn("-----消息设置为最终失败,消息ID:{}-----", messageId);
} else {
// 重发逻辑
// 每次重发的时候要更新下try_count字段
messageStoreService.updateTryCountByMessageId(messageId);
// 注意:无论是confirm消息还是reliant消息,发送消息以后broker都会去回调confirm。但是confirm消息不需要入库,而reliant消息需要,故消息重发使用reliant消息
rabbitBroker.reliantSend(brokerMessage.getMessage());
}
});
}
}
可靠性消息最终演示
rabbit-test
pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rabbit-parent</artifactId>
<groupId>com.bfxy.base.rabbit</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rabbit-test</artifactId>
<name>rabbit-test</name>
<dependencies>
<dependency>
<groupId>com.bfxy.base.rabbit</groupId>
<artifactId>rabbit-core-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
项目配置文件application.properties
server.servlet.context-path=/test
server.port=8002
spring.application.name=rabbit-test
spring.rabbitmq.addresses=192.168.218.21:5672,192.168.218.22:5672,192.168.218.23:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.listener.simple.auto-startup=false
elastic.job.zk.serverLists=192.168.218.21:2181,192.168.218.22:2181,192.168.218.23:2181
elastic.job.zk.namespace=elastic-job
主启动类
package com.bfxy.rabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
主配置类
package com.bfxy.rabbit;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
@ComponentScan({"com.bfxy.rabbit.*"})
public class MainConfig {
}
测试类
package com.bfxy.rabbit;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import com.bfxy.rabbit.api.enums.MessageTypeEnum;
import com.bfxy.rabbit.api.model.Message;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.bfxy.rabbit.producer.broker.ProducerClient;
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
@Autowired
private ProducerClient producerClient;
@Test
public void testSendMessage() throws Exception {
for(int i = 0 ; i < 1; i ++) {
String uniqueId = UUID.randomUUID().toString();
Map<String, Object> attributes = new HashMap<>();
attributes.put("name", "张三");
attributes.put("age", "18");
Message message = new Message(uniqueId, "exchange-1", "springboot.abc", attributes, 0); // 可以通过修改成一个不存在的exchange来测试消息发送失败的情况
message.setMessageType(MessageTypeEnum.RELIANT);
producerClient.send(message);
}
Thread.sleep(100000);
}
}
批量消息发送封装
前景
因为RabbitMQ本身是不支持批量发送消息的,故可以将该请求中的消息放入ThreadLocal
中,提交后将ThreadLocal
里的消息都取出再循环的发送即可
rabbit-api
MessageHolder
package com.bfxy.rabbit.producer.broker;
import com.bfxy.rabbit.api.model.Message;
import com.google.common.collect.Lists;
import java.util.List;
public class MessageHolder {
private final List<Message> messageList = Lists.newArrayList();
@SuppressWarnings({"rawtypes", "unchecked"})
public static final ThreadLocal<MessageHolder> holder = new ThreadLocal() {
@Override
protected Object initialValue() {
return new MessageHolder();
}
};
public static void add(Message message) {
holder.get().messageList.add(message);
}
public static List<Message> clear() {
List<Message> tmp = Lists.newArrayList(holder.get().messageList);
holder.remove();
return tmp;
}
}
批量发送消息方法ProducerClient类
@Override
public void send(List<Message> messageList) throws MessageRunTimeException {
messageList.forEach(message -> {
message.setMessageType(MessageTypeEnum.RAPID);
MessageHolder.add(message);
});
rabbitBroker.sendMessages();
}
真正发消息的方法RabbitBrokerImpl类
@Override
public void sendMessages() {
List<Message> messageList = MessageHolder.clear();
messageList.forEach(message -> {
MessageHolderAsyncQueue.submit(new Runnable() {
@Override
public void run() {
CorrelationData correlationData = new CorrelationData(String.format("%s#%s#%s", message.getMessageId(), System.currentTimeMillis(), message.getMessageType().getValue()));
String topic = message.getTopic();
String routingKey = message.getRoutingKey();
RabbitTemplate rabbitTemplate = rabbitTemplateContainer.getRabbitTemplate(message);
rabbitTemplate.convertAndSend(topic, routingKey, message, correlationData);
log.info("#RabbitBrokerImpl.sendMessages# send to rabbitmq, messageId: {}", message.getMessageId());
}
});
});
}
新的异步队列类
package com.bfxy.rabbit.producer.broker;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class MessageHolderAsyncQueue {
private static final int THREAD_SIZE = Runtime.getRuntime().availableProcessors();
private static final int QUEUE_SIZE = 10000;
private static final ExecutorService senderAsync = new ThreadPoolExecutor(THREAD_SIZE,
THREAD_SIZE,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_SIZE),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("rabbitmq_client_async_sender");
return t;
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.error("async sender is error rejected, runnable: {}, executor: {}", r, executor);
}
});
public static void submit(Runnable runnable) {
senderAsync.submit(runnable);
}
}
延迟消息应用与封装
实现方式
使用rabbitmq的延迟插件:rabbitmq_delayed_message_exchange-0.0.1.ez
下载地址
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
安装启动
## 1.进入rabbitmq插件目录
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.4/plugins
## 2.使用服务器文件传输工具传输即可
## 3.启动
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
测试(控制台创建exchange时多了一种x-delayed-message类型即安装成功)
创建一个延迟交换机(必须设置扩展参数x-delayed-type,值必须为topic)
创建一个延迟队列
将交换机和队列通过路由键绑定
测试发送一条延迟消息(扩展参数x-delay是必须的,值单位是毫秒)
rabbit-common
装饰者/静态代理模式封装消息转换类——RabbitMessageConvert设置
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
// 可以通过messageProperties添加符合自己的业务逻辑的装饰
// messageProperties.setExpiration(defaultExpire);
com.bfxy.rabbit.api.model.Message message = (com.bfxy.rabbit.api.model.Message) object;
// 设置延迟消息时间
messageProperties.setDelay(message.getDelayMills());
return this.genericMessageConverter.toMessage(object, messageProperties);
}
rabbit-test
测试类
package com.bfxy.rabbit;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import com.bfxy.rabbit.api.enums.MessageTypeEnum;
import com.bfxy.rabbit.api.model.Message;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.bfxy.rabbit.producer.broker.ProducerClient;
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
@Autowired
private ProducerClient producerClient;
@Test
public void testSendMessageDelay() throws Exception {
for(int i = 0 ; i < 1; i ++) {
String uniqueId = UUID.randomUUID().toString();
Map<String, Object> attributes = new HashMap<>();
attributes.put("name", "张三");
attributes.put("age", "18");
Message message = new Message(uniqueId, "delay-exchange", "delay.abc", attributes, 20000); // 延迟20秒发送
message.setMessageType(MessageTypeEnum.RELIANT);
producerClient.send(message);
}
Thread.sleep(100000);
}
}
附git项目地址
https://gitee.com/chiyutoBeBest/rabbit-parent.git
总结与复习
个人遇到的面试题
Q:RabbitMQ的延迟消息如何实现?
A:主要有两种实现方式
-
实现方式一:队列ttl+死信exchange
- 使用两个队列,一个队列接收消息不消费,等待指定时间后消息死亡,再由该队列绑定的死信exchange再次将其路由到另一个队列提供业务消费。
-
实现方式二:rabbitmq延时插件
- 具体看延迟消息应用与封装
Q:简述下RabbitMQ的消息传输协议AMQP
A:AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
说简单点就是在异步通讯中,消息不会立刻到达接收方,而是被存放到一个容器中,当满足一定的条件之后,消息会被容器发送给接收方,这个容器即消息队列(MQ),而完成这个功能需要双方和容器以及其中的各个组件遵守统一的约定和规则,AMQP就是这样的一种协议,消息发送与接受的双方遵守这个协议可以实现异步通讯。这个协议同时规定了消息的格式和工作方式。
消息代理(message brokers)从发布者(publishers)亦称生产者(producers)那儿接收消息,并根据既定的路由规则把接收到的消息发送给处理消息的消费者(consumers)。
具体看RabbitMQ核心概念
Q:RabbitMQ的消息确认机制
A:具体看Rabbitmq高级特性-生产端特性_确认机制和返回机制
Q:RabbitMQ实现可靠性投递
A:具体看Rabbitmq高级特性-生产端可靠性投递与消费端幂等性