可以结合着狂神的RabbitMQ的笔记来进行学习
狂神说RabbitMQ笔记
RabbitMQ高级特性
消息可靠性投递
保证我发出的消息可以到达中间件,避免在传输的过程中发生丢失的情况。
这两个可靠性传输方式分别是负责不同的阶段,confirm是负责保证从生产者到队列的传输可靠性,而return是保证从队列到消费者的传输可靠性。
confirm确认模式
代码实现
这里以SpringBoot为例,以上面的生产者为例进行修改
- 修改yml文件
因为是这种配置需要确认模式的开启
- 在MQ的配置类中提前定义好交换机和队列
@Configuration
public class RabbitMQConfig {
//定义两个交换机的名称
public static final String CONFIRM_EXCHANGE_NAME = "confirmExchange";
public static final String CONFIRM_QUEUE_NAME = "confirmQueue";
@Bean("confirmExchange")
public Exchange confirmExchange(){
/*
定义一个Confirm模式专用的交换机
*/
return ExchangeBuilder.topicExchange(CONFIRM_EXCHANGE_NAME).durable(true).build();
}
@Bean("confirmQueue")
public Queue confirmQueue(){
/*
定义一个Confirm模式专用的队列
*/
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
/**
* 绑定交换机和队列
*/
@Bean
public Binding bindConfirmQueueExchange(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange){
//这里通过调用绑定器BindingBuilder来绑定队列和交换机、并且因为交换机是topic模式,要配好通配符
//BindingBuilder.绑定(参数列表的队列).和哪个交换机绑定(参数列表的交换机).绑定规则(RoutingKey通配符).没有参数()
return BindingBuilder.bind(queue).to(exchange).with("confirm.#").noargs();
}
}
- rabbitTemplate定义ConfirmCallBack回调函数,等待消息发送到交换机,这个期间发送只要是成功了,就会执行这个重写的方法。
@SpringBootTest
@RunWith(SpringRunner.class)
public class ConfirmTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testConfirm(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 相关配置信息
* @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm方法被执行了....");
if (ack) {
//接收成功
System.out.println("接收成功消息" + cause);
} else {
//接收失败
System.out.println("接收失败消息" + cause);
//做一些处理,让消息再次发送。
}
}
});
//发送消息convertAndSend(交换机名称,RoutingKey,想要发送的消息)
rabbitTemplate.convertAndSend("confirmExchange","confirmQueue","确认模式测试");
}
}
运行即可,新的交换机以及队列会在RabbitMQ中生成,发送消息,可以根据发送情况进行判断,会回调运行一个方法。
也就是说,运行完convertAndSend
发送消息之后,会回调运行setConfirmCallback
以及里面重写的confirm
@SpringBootTest
@RunWith(SpringRunner.class)
public class ConfirmTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testConfirm(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 相关配置信息
* @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm方法被执行了....");
if (ack) {
//接收成功
System.out.println("接收成功消息" + cause);
} else {
//接收失败
System.out.println("接收失败消息" + cause);
//做一些处理,让消息再次发送。
}
}
});
//发送消息convertAndSend(交换机名称,RoutingKey,想要发送的消息)
rabbitTemplate.convertAndSend("confirmExchange","confirmQueue","确认模式测试");
}
}
运行代码,MQ中新增交换机和队列,发送消息后会运行回调方法,根据回调方法里的ack状态,进行一定的操作。
return退回模式
和上面的confirm类似,confirm是负责保证从生产者到队列的传输可靠性,而return是保证从队列到消费者的传输可靠性。
代码实现
-
修改yml文件
因为是这种配置需要回退模式模式的开启
-
在MQ的配置类中提前定义好交换机和队列
参考上一个的操作,这里不赘述了。
这里定义好交换机名称为returnExchange
、队列为returnQueue
、RoutingKey为returnKey
PS:Queue不是那么重要,他已经和交换机绑定好了 -
Return测试类以及处理方法
@SpringBootTest
@RunWith(SpringRunner.class)
public class ReturnTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败是 才会执行 ReturnCallBack
* 步骤:
* 1. 开启回退模式:publisher-returns="true"
* 2. 设置ReturnCallBack
* 3. 设置Exchange处理消息的模式:
* 1. 如果消息没有路由到Queue,则丢弃消息(默认)
* 2. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
*/
@Test
public void testReturn() {
//设置交换机处理失败消息的模式,如果失败,会走下面的这个匿名内部类重写的方法
rabbitTemplate.setMandatory(true);
//设置ReturnCallBack
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 消息对象
* @param replyCode 错误码
* @param replyText 错误信息
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return 执行了....");
System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
//处理
}
});
//发送消息convertAndSend(交换机名称,RoutingKey,想要发送的消息)
rabbitTemplate.convertAndSend("returnExchange", "returnKey", "message confirm....");
}
}
运行代码,因为没有消费者,所以等同于发送失败,因此会运行回调函数里的方法。
总结
Consumer ACK
Consumer ACK都是用于消费端
代码实现
代码实现都是在消费者端进行
Consumer ACK机制:
- 设置手动签收。acknowledge=“manual”
- 让监听器类实现ChannelAwareMessageListener接口
- 如果消息成功处理,则调用channel的 basicAck()签收
- 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer ,直到重新接收
-
修改yml文件为手动接收消息
-
创建一个类实现
ChannelAwareMessageListener
的接口,再重写
代码实现:
/**
* Consumer ACK机制:
* 1. 设置手动签收。acknowledge="manual"
* 2. 让监听器类实现ChannelAwareMessageListener接口
* 3. 如果消息成功处理,则调用channel的 basicAck()签收,签收就不会返还给队列了
* 4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer。直到能够正确消费
*
*
*/
@Component
public class AckListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1.接收转换消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
int i = 3/0;//出现错误,这个时候消息就会返还给队列,队列重新发给消费者
//3. 手动签收,签收后不再返回
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
//e.printStackTrace();
//4.拒绝签收,按照API的参数,决定是否返还给队列进行下一步重发
/*
第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
*/
channel.basicNack(deliveryTag,true,true);
//channel.basicReject(deliveryTag,true);
}
}
}
代码运行,在人为错误的地方,进入了异常处理,异常处理中进行了返还队列。队列返还之后重发消费者,直到可以正常消费。
小结
总结
消费端限流
顾名思义,从消费端做出努力进行限流,消费端每秒拉取的消息数量控制在某个数值以下
消费者宕机过程中MQ上囤积大量消息,重启消费者服务后消息瞬间涌入,造成消息消费服务压力剧增,因此大流量下消息消费端需要进行限流设置
在非自动确认消息的前提下,如果一定数量的消息(基于Consumer和Channel设置QOS的值)没有被确认,将不进行消费新的消息,具体API与《轮训分发-不公平分发》类似,原理类似令牌桶算法
PS:QOS(Quility of Service),服务质量保证
代码实现
非常简单:
-
yml开启限流
-
生产者
新建队列以及交换机
public static final String Qos_EXCHANGE_NAME = "qosExchange";
public static final String Qos_QUEUE_NAME = "qosQueue";
@Bean("qosExchange")
public Exchange qosExchange(){
/*
定义一个Confirm模式专用的交换机
*/
return ExchangeBuilder.topicExchange(Qos_EXCHANGE_NAME).durable(true).build();
}
@Bean("qosQueue")
public Queue qosQueue(){
/*
定义一个Confirm模式专用的队列
*/
return QueueBuilder.durable(Qos_QUEUE_NAME).build();
}
/**
* 绑定交换机和队列
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding bindQosQueueExchange(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") Exchange exchange){
//这里通过调用绑定器BindingBuilder来绑定队列和交换机、并且因为交换机是topic模式,要配好通配符
//BindingBuilder.绑定(参数列表的队列).和哪个交换机绑定(参数列表的交换机).绑定规则(RoutingKey通配符).没有参数()
return BindingBuilder.bind(queue).to(exchange).with("qos.#").noargs();
}
发送消息,一次性发送10条
@SpringBootTest
@RunWith(SpringRunner.class)
public class QosProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessageToQos(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
System.out.println("确认模式方法运行...");
}
});
for (int i = 0; i < 10; i++) {
//发送消息convertAndSend(交换机名称,RoutingKey,想要发送的消息),一次性10条
rabbitTemplate.convertAndSend("qosExchange","qos.test","第"+i+"条消息");
}
}
}
运行代码,来到管理页面
- 消费者
消费者端要完成限流操作,并且进行手动签收,但是具体的限流操作没有代码,配置好就可以了。
每秒拉取1个消息,这里sleep是让效果更加明显。
@Component
public class QosListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000);
//1.获取消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
// ....
//3. 手动签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
运行代码,不难发现,管理台中的Ready的消息以预定好的每秒1个消息的速度进行递减,当然,这个拉取的速度可以自行配置
如果没有这个限流机制,一旦重启服务关闭了限流,大量的没有消费的消息会一次性直接打入消费端,消费者端直接宕机。
小结
TTL
相当于自动清除的时间,避免积压过久。统一设置时间然后到时间了就过期。
一般过期分为两种,一种是队列统一过期,另一种是消息单独过期
内容挺多,不赘述了,这个写的挺好,可以看看
SpringBoot整合TTL,设置TTL时间
注意,这里人为的创造了没有消费者的条件,所有的消息都堆积在队列中,没有消费者消费,直到触发TTL
队列统一过期
这里只演示队列的设置,其他的包括消息发送的内容和之前的无异。在队列创建好的时候就规定好过期时间,只要消费端一直没有消费者把这个消息消费了,只要时间一到,就过期
//创建队列
@Bean(value = "getTtlQueue")
public Queue getTtlQueue(){
// 设置 ttl 队列,并设定 x-message-ttl 参数,表示 消息存活最大时间,单位 ms
//return QueueBuilder.durable(ttl_queue_name).withArgument("x-message-ttl",10000).build();
Map<String, Object> arguments = new HashMap<>();
//设置10s过期,单位是毫秒
arguments.put("x-message-ttl",10000);
return new Queue(ttl_queue_name,true,false,false,arguments);
}
运行代码,因为设置了TTL,而消息推送到交换机的间隔是8s,过期TTL是10s,所以,任何一个时间点,只要这个消息仍然留存在队列中,这个队列里就一直只剩1个消息。
消息单独过期
消息单独过期是在发送时处理要发送的消息,把要发送消息设置好过期时间,只要消费端一直没有消费者把这个消息消费了,只要时间一到,就过期
//发送10条消息
for (int i = 0; i < 10; i++) {
String msg = "msg"+i;
System.out.println("发送消息 msg:"+msg);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("5000"); // 针对消息设定时限,单个消息TTL为5s
// 将消息数据和设置属性进行封装,采取消息发送模板,将消息数据推送至指定的交换机 exchange 中
Message message = new Message(msg.getBytes(), messageProperties);
// 正常的发送消息即可,不过这次传输的消息是特殊处理过(设置过TTL的)
rabbitTemplate.convertAndSend(RabbitMQConfig.ttl_exchange_name,RabbitMQConfig.ttl_routing_key,message);
//每两秒发送一次
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
死信队列概述(基于RabbitMQ实现)
综述
消息如何成为死信
如果消费者消费时间超过ttl,那么消息就不会被消费者消费,从而变成死信。
这个时候死信就会被送到死信交换机放入死信队列来进行处理。
消息成为死信的三种情况:
- 队列消息长度到达限制;
- 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
- 原队列存在消息过期设置,消息到达超时时间未被消费;
死信队列
队列绑定死信交换机:
给队列设置参数: x-dead-letter-exchange
和 x-dead-letter-routing-key
小结
- 死信交换机和死信队列和普通的没有区别
- 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队
列 - 消息成为死信的三种情况:
- 队列消息长度到达限制;
- 消费者拒接消费消息,并且不重回队列;
- 原队列存在消息过期设置,消息到达超时时间未被消费;
死信队列实现
一句话概括,正常队列里的消息时间达到TTL之后,把消息带着RoutingKey通过死信交换机送入死信队列
思路如下:
- 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
- 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
- 正常队列绑定死信交换机
设置两个参数:
x-dead-letter-exchange:死信交换机名称
x-dead-letter-routing-key:发送给死信交换机的routingkey
内容挺多,不赘述了,这个写的挺好,可以看看
SpringBoot实现死信队列
编写配置类
编写配置类,主要是创建普通队列(同时设置死信处理方案),普通交换机。绑定普通队列和交换机。创建死信队列,死信交换机,绑定死信队列和交换机。
大致的思路:
- 1、消息队列分为正常交换机、正常消息队列;以及死信交换机和死信队列。
- 2、正常队列针对死信信息,需要将数据 重新 发送至死信交换机中。
关键的配置基本都在这里了
/**
* 死信队列配置
*/
@Configuration
public class RabbitMQDeadMsgConfig {
// 定义正常交换机和正常队列信息(交换机名、队列名、路由key)
public static final String queue_name = "xj_natural_queue";
public static final String exchange_name = "xj_natural_exchange";
public static final String routing_key = "xj_natural_routingKey";
// 定义死信交换机名、死信队列名、路由key
public static final String queue_name_dead = "xj_dead_queue";
public static final String exchange_name_dead = "xj_dead_exchange";
public static final String routing_key_dead = "xj_dead_routingKey";
//==================================正常交换机部分=================================
/**
* 设置正常的消息队列;
* 正常的消息队列具备以下几种功能:
* 1、消息正常消费,需要绑定对应的消费者(这里为了测试死信,不创建消费者)
* 2、当消息失效后,需要将指定的消息发送至 死信交换机 中
* @return
*/
@Bean(value = "getNaturalQueue")
public Queue getNaturalQueue(){
return QueueBuilder.durable(queue_name)
// 正常的队列,在消息失效后,需要将消息丢入 死信 交换机中,这里都是消息达到TTL之后要去的地方
// 这里只需要针对名称进行绑定
.withArgument("x-dead-letter-exchange",exchange_name_dead)
// 丢入 死信交换机,需要设定指定的 routingkey
.withArgument("x-dead-letter-routing-key",routing_key_dead)
// 设置正常队列中消息的存活时间为 10s,当然也可以针对单个消息进行设定不同的过期时间
.withArgument("x-message-ttl",10000)
// 设定当前队列中,允许存放的最大消息数目
.withArgument("x-max-length",10)
//创建
.build();
}
/**
* 设定正常的消息交换机
* @return
*/
@Bean(value = "getNaturalExchange")
public Exchange getNaturalExchange(){
// 这里为了测试,采取 direct exchange
return ExchangeBuilder.directExchange(exchange_name)
.durable(true) // 设定持久化
.build();
}
/**
* 将正常的消息交换机和正常的消息队列进行绑定
* @param queue
* @param directExchange
* @return
*/
@Bean
public Binding bindNaturalExchangeAndQueue(@Qualifier(value = "getNaturalQueue") Queue queue,
@Qualifier(value = "getNaturalExchange") Exchange directExchange){
return BindingBuilder
// 绑定正常的消息队列
.bind(queue)
// 至指定的消息交换机
.to(directExchange)
// 匹配 routingkey
.with(routing_key)
// 无参数,不加会报错提示
.noargs();
}
//=================================死信处理部分====================================
/**
* 定义死信队列
* @return
*/
@Bean(value = "getDealQueue")
public Queue getDealQueue(){
return QueueBuilder.durable(queue_name_dead).build();
}
/**
* 定义死信交换机
* @return
*/
@Bean(value = "getDeadExchange")
public Exchange getDeadExchange(){
return ExchangeBuilder.directExchange(exchange_name_dead).durable(true).build();
}
/**
* 将死信交换机和死信队列进行绑定
* @param deadQueue
* @param directDeadExchange
* @return
*/
@Bean
public Binding bindDeadExchangeAndQueue(@Qualifier(value = "getDealQueue") Queue deadQueue,
@Qualifier(value = "getDeadExchange") Exchange directDeadExchange){
return BindingBuilder.bind(deadQueue).to(directDeadExchange).with(routing_key_dead).noargs();
}
}
- 编写消息发送服务,将
convertAndSend
方法进行二次封装。服务中设置手动确认,回调方法等等。
@Slf4j
@Component
public class RabbitmqService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 直接发送消息
* @param exchange
* @param routingKey
* @param msg
*/
public void sendMessage(String exchange,String routingKey,Object msg) {
// 设置交换机处理失败消息的模式 true 表示消息由交换机 到达不了队列时,会将消息重新返回给生产者
// 如果不设置这个指令,则交换机向队列推送消息失败后,不会触发 setReturnCallback
rabbitTemplate.setMandatory(true);
//消息消费者确认收到消息后,手动ack回执
rabbitTemplate.setConfirmCallback(this);
// return 配置
rabbitTemplate.setReturnCallback(this);
//发送消息
rabbitTemplate.convertAndSend(exchange,routingKey,msg);
}
/**
* 交换机并未将数据丢入指定的队列中时,触发
* channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes());
* 参数三:true 表示如果消息无法正常投递,则return给生产者 ;false 表示直接丢弃
* @param message 消息对象
* @param replyCode 错误码
* @param replyText 错误信息
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" ");
}
/**
* 消息生产者发送消息至交换机时触发,用于判断交换机是否成功收到消息
* @param correlationData 相关配置信息
* @param ack exchange 交换机,判断交换机是否成功收到消息 true 表示交换机收到
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("---- confirm ----ack="+ack+" cause="+String.valueOf(cause));
log.info("correlationData -->"+correlationData.toString());
if(ack){
// 交换机接收到
log.info("---- confirm ----ack==true cause="+cause);
}else{
// 没有接收到
log.info("---- confirm ----ack==false cause="+cause);
}
}
}
- 发送消息
@SpringBootTest
@RunWith(SpringRunner.class)
public class DeadMsgTest {
@Autowired
private RabbitmqService rabbitmqService;
@Test
public void sendMessage() throws InterruptedException {
// 向正常的消息队列中丢数据,测试限定时间未消费后,死信队列的情况
// 配置文件中,针对于正常队列而言,设置有10条上限大小
for (int i = 0; i < 20; i++) {
String msg = "dead msg test "+i;
// 向正常的消息交换机中传递数据
rabbitmqService.sendMessage(RabbitMQDeadMsgConfig.exchange_name,RabbitMQDeadMsgConfig.routing_key,msg);
TimeUnit.SECONDS.sleep(2);
}
}
}
执行消费者
执行生产者,等待10秒钟后查看控制台,消息达到TTL之后,被拒绝接收,进入死信队列
运行死信队列对应的消费者,死信队列消息被成功消费
延迟队列(基于RabbitMQ实现)
就是把上面的死信队列给改一下,在队列中加个TTL就是延迟队列了
延迟队列是干嘛的?
延时队列是存储延时消息的队列,延时消息就是生产者发送了一条消息,但是不希望该消息不要被立即消费,而是设置一个延时时间,等过了这个时间再消费消息
延迟队列概念
延迟队列本身只是一个概念,在RabbitMQ中并没有直接的延迟队列的实现
想要实现延迟队列,就可以用TTL+死信队列来实现
代码实现
延迟队列就是通过先将消息放到死信队列中,放到触发TTL,TTL触发后交由死信交换机,死信交换机把消息交给目标消费者监听的队列进行消费。
- RabbitMQ配置类中新建DLX相关队列以及交换机,以及绑定
- 发送消息,等待消息在普通队列中到达TTL,TTL过后送至死信交换机,死信交换机将消息送至目标队列
RabbitMQ延时队列的实现
日志与监控
日志
这个用的一般不多,这个是指安装在Linux系统上的RabbitMQ的位置
监控
web监控地址:http://localhost:15672/#/
按照默认的账号密码登陆,账号密码都是guest
rabbitmqctl管理和监控
在Linux可以用的命令,进入软件后即可用命令行进行查看,当然,这些了解即可,因为这些东西在刚刚的web页面也都可以看到
查看队列
查看队列
# rabbitmqctl list_queues
查看exchanges
# rabbitmqctl list_exchanges
查看用户
# rabbitmqctl list_users
查看连接
# rabbitmqctl list_connections
查看消费者信息
# rabbitmqctl list_consumers
查看环境变量
# rabbitmqctl environment
查看未被确认的队列
# rabbitmqctl list_queues name messages_unacknowledged
查看单个队列的内存使用
# rabbitmqctl list_queues name memory
查看准备就绪的队列
# rabbitmqctl list_queues name messages_ready
消息可靠性分析与追踪
在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。
在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。
消息追踪-Firehose
firehose的机制是将生产者投递给rabbitmq的消息,rabbitmq投递给消费者的消息按照指定的格式发送到默认的exchange上。这个默认的exchange的名称为amq.rabbitmq.trace,它是一个topic类型的exchange。发送到这个exchange上的消息的routing key为 publish.exchangename 和deliver.queuename。其中exchangename和queuename为实际exchange和queue的名称,分别对应生产者投递到exchange的消息,和消费者从queue上获取的消息。
注意:打开 trace 会影响消息写入功能,适当打开后请关闭。
rabbitmqctl trace_on:开启Firehose命令
rabbitmqctl trace_off:关闭Firehose命令
打开后在web管理页面就可以看到如何使用相关的内容
消息追踪-rabbitmq_tracing
rabbitmq_tracing和Firehose在实现上如出一辙,只不过rabbitmq_tracing的方式比Firehose多了一层GUI的包装,更容易使用和管理。
启用插件:rabbitmq-plugins enable rabbitmq_tracing
消息可靠性保障
消息补偿机制
100%确保消息发送成功
消息幂等性保障
乐观锁解决方案
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。
主要靠乐观锁,也就是那个version的版本来进行控制,符合条件了才操作,再后面想重复操作会因为版本号的不同而被阻隔。
RabbitMQ集群搭建
摘要:实际生产应用中都会采用消息队列的集群方案,如果选择RabbitMQ那么有必要了解下它的集群方案原理
一般来说,如果只是为了学习RabbitMQ或者验证业务工程的正确性那么在本地环境或者测试环境上使用其单实例部署就可以了,但是出于MQ中间件本身的可靠性、并发性、吞吐量和消息堆积能力等问题的考虑,在生产环境上一般都会考虑使用RabbitMQ的集群方案。
集群方案的原理
RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。因此,RabbitMQ天然支持Clustering。这使得RabbitMQ本身不需要像ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。
单机多实例部署
由于某些因素的限制,有时候你不得不在一台机器上去搭建一个rabbitmq集群,这个有点类似zookeeper的单机版。真实生成环境还是要配成多机集群的。有关怎么配置多机集群的可以参考其他的资料,这里主要论述如何在单机中配置多个rabbitmq实例。
主要参考官方文档:https://www.rabbitmq.com/clustering.html
首先确保RabbitMQ运行没有问题
[root@super ~]# rabbitmqctl status
Status of node rabbit@super ...
[{pid,10232},
{running_applications,
[{rabbitmq_management,"RabbitMQ Management Console","3.6.5"},
{rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.5"},
{webmachine,"webmachine","1.10.3"},
{mochiweb,"MochiMedia Web Server","2.13.1"},
{rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.5"},
{rabbit,"RabbitMQ","3.6.5"},
{os_mon,"CPO CXC 138 46","2.4"},
{syntax_tools,"Syntax tools","1.7"},
{inets,"INETS CXC 138 49","6.2"},
{amqp_client,"RabbitMQ AMQP Client","3.6.5"},
{rabbit_common,[],"3.6.5"},
{ssl,"Erlang/OTP SSL application","7.3"},
{public_key,"Public key infrastructure","1.1.1"},
{asn1,"The Erlang ASN1 compiler version 4.0.2","4.0.2"},
{ranch,"Socket acceptor pool for TCP protocols.","1.2.1"},
{mnesia,"MNESIA CXC 138 12","4.13.3"},
{compiler,"ERTS CXC 138 10","6.0.3"},
{crypto,"CRYPTO","3.6.3"},
{xmerl,"XML parser","1.3.10"},
{sasl,"SASL CXC 138 11","2.7"},
{stdlib,"ERTS CXC 138 10","2.8"},
{kernel,"ERTS CXC 138 10","4.2"}]},
{os,{unix,linux}},
{erlang_version,
"Erlang/OTP 18 [erts-7.3] [source] [64-bit] [async-threads:64] [hipe] [kernel-poll:true]\n"},
{memory,
[{total,56066752},
{connection_readers,0},
{connection_writers,0},
{connection_channels,0},
{connection_other,2680},
{queue_procs,268248},
{queue_slave_procs,0},
{plugins,1131936},
{other_proc,18144280},
{mnesia,125304},
{mgmt_db,921312},
{msg_index,69440},
{other_ets,1413664},
{binary,755736},
{code,27824046},
{atom,1000601},
{other_system,4409505}]},
{alarms,[]},
{listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]},
{vm_memory_high_watermark,0.4},
{vm_memory_limit,411294105},
{disk_free_limit,50000000},
{disk_free,13270233088},
{file_descriptors,
[{total_limit,924},{total_used,6},{sockets_limit,829},{sockets_used,0}]},
{processes,[{limit,1048576},{used,262}]},
{run_queue,0},
{uptime,43651},
{kernel,{net_ticktime,60}}]
停止rabbitmq服务
[root@super sbin]# service rabbitmq-server stop
Stopping rabbitmq-server: rabbitmq-server.
启动第一个节点:
[root@super sbin]# RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq-server start
RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
## ## Licensed under the MPL. See http://www.rabbitmq.com/
## ##
########## Logs: /var/log/rabbitmq/rabbit1.log
###### ## /var/log/rabbitmq/rabbit1-sasl.log
##########
Starting broker...
completed with 6 plugins.
启动第二个节点:
web管理插件端口占用,所以还要指定其web插件占用的端口号。
[root@super ~]# RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start
RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
## ## Licensed under the MPL. See http://www.rabbitmq.com/
## ##
########## Logs: /var/log/rabbitmq/rabbit2.log
###### ## /var/log/rabbitmq/rabbit2-sasl.log
##########
Starting broker...
completed with 6 plugins.
结束命令:
rabbitmqctl -n rabbit1 stop
rabbitmqctl -n rabbit2 stop
rabbit1操作作为主节点:
[root@super ~]# rabbitmqctl -n rabbit1 stop_app
Stopping node rabbit1@super ...
[root@super ~]# rabbitmqctl -n rabbit1 reset
Resetting node rabbit1@super ...
[root@super ~]# rabbitmqctl -n rabbit1 start_app
Starting node rabbit1@super ...
[root@super ~]#
rabbit2操作为从节点:
[root@super ~]# rabbitmqctl -n rabbit2 stop_app
Stopping node rabbit2@super ...
[root@super ~]# rabbitmqctl -n rabbit2 reset
Resetting node rabbit2@super ...
[root@super ~]# rabbitmqctl -n rabbit2 join_cluster rabbit1@'super' ###''内是主机名换成自己的
Clustering node rabbit2@super with rabbit1@super ...
[root@super ~]# rabbitmqctl -n rabbit2 start_app
Starting node rabbit2@super ...
查看集群状态:
[root@super ~]# rabbitmqctl cluster_status -n rabbit1
Cluster status of node rabbit1@super ...
[{nodes,[{disc,[rabbit1@super,rabbit2@super]}]},
{running_nodes,[rabbit2@super,rabbit1@super]},
{cluster_name,<<"rabbit1@super">>},
{partitions,[]},
{alarms,[{rabbit2@super,[]},{rabbit1@super,[]}]}]
web监控:
集群管理
rabbitmqctl join_cluster {cluster_node} [–ram]
将节点加入指定集群中。在这个命令执行前需要停止RabbitMQ应用并重置节点。
rabbitmqctl cluster_status
显示集群的状态。
rabbitmqctl change_cluster_node_type {disc|ram}
修改集群节点的类型。在这个命令执行前需要停止RabbitMQ应用。
rabbitmqctl forget_cluster_node [–offline]
将节点从集群中删除,允许离线执行。
rabbitmqctl update_cluster_nodes {clusternode}
在集群中的节点应用启动前咨询clusternode节点的最新信息,并更新相应的集群信息。这个和join_cluster不同,它不加入集群。考虑这样一种情况,节点A和节点B都在集群中,当节点A离线了,节点C又和节点B组成了一个集群,然后节点B又离开了集群,当A醒来的时候,它会尝试联系节点B,但是这样会失败,因为节点B已经不在集群中了。
rabbitmqctl cancel_sync_queue [-p vhost] {queue}
取消队列queue同步镜像的操作。
rabbitmqctl set_cluster_name {name}
设置集群名称。集群名称在客户端连接时会通报给客户端。Federation和Shovel插件也会有用到集群名称的地方。集群名称默认是集群中第一个节点的名称,通过这个命令可以重新设置。
RabbitMQ镜像集群配置
上面已经完成RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制。虽然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能等待重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,必须要创建镜像队列。
镜像队列是基于普通的集群模式的,然后再添加一些策略,所以你还是得先配置普通集群,然后才能设置镜像队列,我们就以上面的集群接着做。
设置的镜像队列可以通过开启的网页的管理端Admin->Policies,也可以通过命令。
rabbitmqctl set_policy my_ha “^” ‘{“ha-mode”:“all”}’
- Name:策略名称
- Pattern:匹配的规则,如果是匹配所有的队列,是^.
- Definition:使用ha-mode模式中的all,也就是同步所有匹配的队列。问号链接帮助文档。
负载均衡-HAProxy
HAProxy提供高可用性、负载均衡以及基于TCP和HTTP应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案,包括Twitter,Reddit,StackOverflow,GitHub在内的多家知名互联网公司在使用。HAProxy实现了一种事件驱动、单一进程模型,此模型支持非常大的并发连接数。
安装HAProxy
//下载依赖包
yum install gcc vim wget
//上传haproxy源码包
//解压
tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local
//进入目录、进行编译、安装
cd /usr/local/haproxy-1.6.5
make TARGET=linux31 PREFIX=/usr/local/haproxy
make install PREFIX=/usr/local/haproxy
mkdir /etc/haproxy
//赋权
groupadd -r -g 149 haproxy
useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy
//创建haproxy配置文件
mkdir /etc/haproxy
vim /etc/haproxy/haproxy.cfg
配置HAProxy
配置文件路径:/etc/haproxy/haproxy.cfg
#logging options
global
log 127.0.0.1 local0 info
maxconn 5120
chroot /usr/local/haproxy
uid 99
gid 99
daemon
quiet
nbproc 20
pidfile /var/run/haproxy.pid
defaults
log global
mode tcp
option tcplog
option dontlognull
retries 3
option redispatch
maxconn 2000
contimeout 5s
clitimeout 60s
srvtimeout 15s
#front-end IP for consumers and producters
listen rabbitmq_cluster
bind 0.0.0.0:5672
mode tcp
#balance url_param userid
#balance url_param session_id check_post 64
#balance hdr(User-Agent)
#balance hdr(host)
#balance hdr(Host) use_domain_only
#balance rdp-cookie
#balance leastconn
#balance source //ip
balance roundrobin
server node1 127.0.0.1:5673 check inter 5000 rise 2 fall 2
server node2 127.0.0.1:5674 check inter 5000 rise 2 fall 2
listen stats
bind 172.16.98.133:8100
mode http
option httplog
stats enable
stats uri /rabbitmq-stats
stats refresh 5s
启动HAproxy负载
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
//查看haproxy进程状态
ps -ef | grep haproxy
访问如下地址对mq节点进行监控
http://172.16.98.133:8100/rabbitmq-stats
代码中访问mq集群地址,则变为访问haproxy地址:5672