1.消息模式 P2P Worker Pub/Sub(exchange 4种)
四种交换机:direct/topic/headers/fanout,默认交换机是direct,其中Publish/Subscribe,Routing,Topics三种模式可以统一归为Exchange模式,只是创建时交换机的类型不一样,分别是fanout、direct、topic。
Hello-World 简单队列 P2P
一个生产者,一个默认的交换机(direct),一个队列,一个消费者
Work 工作队列
一个生产者,一个默认的交换机(direct),一个队列,多个消费者,默认采用公平分发(例如10条消息,每个消费者接收5条)
Publish/Subscribe 发布订阅模式
一个生产者,一个交换机(fanout),多个队列,多个消费者
(1)一个生产者,多个消费者
(2)每一个消费者都有自己的一个队列
(3)生产者没有直接发消息到队列中,而是发送到交换机
(4)每个消费者的队列都在交换机上绑定
(5)消息通过交换机到达每个消费者的队列
Routing 路由模式
一个生产者,一个交换机(directExchange),多个队列,多个消费者,但是通过指定路由key绑定到队列上,完成指定生产者消费者的通信。
生产者发送消息到交换机并指定一个路由key,消费者队列绑定到交换机时要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息)
Topic 主题模式
一个生产者,一个交换机(TopicExchange),多个队列,多个消费者
又称通配符模式(可以理解为模糊匹配,路由模式相当于精确匹配),此模式实在路由key模式的基础上,使用了通配符来管理消费者接收消息。
*号代表一个单词
#号代表0个或多个单词
2.rabbitMQ防止消息丢失
有三个场景下是会发生消息丢失的:
存储在队列中,如果队列没有对消息持久化,RabbitMQ服务器宕机重启会丢失数据。
生产者发送消息到RabbitMQ服务器过程中,RabbitMQ服务器如果宕机停止服务,消息会丢失。
消费者从RabbitMQ服务器获取队列中存储的数据消费,但是消费者程序出错或者宕机而没有正确消费,导致数据丢失。
针对以上三种场景,RabbitMQ提供了三种解决的方式,分别是消息持久化,confirm机制,ACK事务机制。消息持久化:需要设置Exchange为持久化和Queue持久化,这样当消息发送到RabbitMQ服务器时,消息就会持久化。
四种交换机都是AbstractExchange抽象类的子类,所以根据java的特性,创建子类的实例会先调用父类的构造器,Exchange为持久化:
从上面的注释可以看到durable参数表示是否持久化。默认是持久化(true)。
Queue持久化:
也是通过durable参数设置是否持久化,默认是true。
3.RabbitMQ消息确认机制
RabbitMQ的事务机制是同步操作,会极大的降低RabbitMQ的性能,所以推出了confirm机制
confirm机制 :
publisher-confirms:设置为true时。当消息投递到Exchange后,会回调confirm()方法进行通知生产者
publisher-returns:设置为true时。当消息匹配到Queue并且失败时,会通过回调returnedMessage()方法返回消息
spring.rabbitmq.template.mandatory: 设置为true时。指定消息在没有被队列接收时会通过回调returnedMessage()方法退回。//开启confirm
连接对象调用channel.confirmSelect()方法
//确定批量操作是否成功
channel.waitForConfirmsOrDie();// 当你发送的全部消息,有一个失败的时候,就直接全部失败 抛出异常//开启异步回调 就是return机制
channel.addConfirmListener()Return机制:
采用Return机制来监听消息是否从exchange送到了指定的queue中
开启Return机制,在发送消息时,指定mandatory参数为true
channel.basicPublish(“”,“HelloWorld”**,true,**null,msg.getBytes());
4.ack
spring-boot-data-amqp 是自动ACK机制,就意味着 MQ 会在消息发送完毕后,自动帮我们去ACK,然后删除队列中的消息,这样会存在一些问题:如果消费者处理消息需要较长时间,或者在消费消息的时候出现异常,都会出现问题,手动Ack可以避免消息重复消费。
//手动Ack,确定消费消息
//deliveryTag:该消息的index
//multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);