MQ高级特性
1.削峰
设置 消费者
测试 添加多条消息
拉取消息 每隔20秒拉取一次 一次拉取五条 然后在20秒内一条一条消费
TTL
Time To Live(存活时间/过期时间)。
当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
可以在管理台新建队列、交换机,绑定
1.图形化操作
添加队列
添加交换机
将交换机和对应的队列进行绑定
时间结束 , 消息失效
2.代码实现
配置 生产者
@Configuration public class TopicMqTtlConfig { @Value("${mq.exchange.name}") private String EXCHANGENAME; @Value("${mq.queue.name1}") private String QUEUENAME1; @Value("${mq.queue.name2}") private String QUEUENAME2; // 1 // . 交换机 @Bean("ex1") public Exchange getExchange(){ Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build(); return exchange; } // 2。 队列 @Bean("queue1") public Queue getQueue1(){ Queue queue = QueueBuilder.nonDurable(QUEUENAME1) .withArgument("x-message-ttl",30000)//过期时间30秒 .withArgument("x-max-length",10)//队列中最多接收10条消息超过10条的部分废弃 .build(); return queue; } @Bean("queue2") public Queue getQueue2(){ Queue queue2 = QueueBuilder.nonDurable(QUEUENAME2) .withArgument("x-message-ttl",300000000)//过期时间30秒 .build(); return queue2; } // 3. 交换机和队列进行绑定 @Bean("binding1") public Binding bindQueue1ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){ Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("ttl1.*").noargs(); return binding1; } @Bean("binding2") public Binding bindQueue2ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue2") Queue queue){ Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("ttl2.#").noargs(); return binding2; } }
测试
添加成功 ttl1只接收10条
时间过期
死信队列
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机,因为其他MQ产品中没有交换机的概念),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
比如消息队列的消息过期,如果绑定了死信交换器,那么该消息将发送给死信交换机
消息在什么情况下会成为死信?(面试会问)
1.队列消息长度到最大的限制
最大的长度设置为10当第11条消息进来的时候就会成为死信
2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false(不重新回到队列中)
设置消费者为手动签收的状态
3. 原队列存在消息过期设置,消息到达超时时间未被消费;
队列绑定交换机的方式是什么?
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
// 1. 交换机 :正常的交换机 死信交换机
// 2.队列 :正常的 死信
//3.绑定 正常ex - 正常的que
正常的que和死信交换机
死信ex-死信queue
2.代码实现
@Configuration public class TopicMqDeadConfig { @Value("${mq1.exchange.name1}") private String EXCHANGENAME; @Value("${mq1.exchange.name2}") private String DEADEXCHANGE; @Value("${mq1.queue.name1}") private String QUEUENAME1; @Value("${mq1.queue.name2}") private String QUEUENAME2; // 声明正常交换机 @Bean("ex1") public Exchange getExchange(){ Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build(); return exchange; } // 正常队列 @Bean("queue1") public Queue getQueue1(){ Queue queue = QueueBuilder.nonDurable(QUEUENAME1) .withArgument("x-message-ttl",30000)//过期时间30秒 .withArgument("x-dead-letter-exchange",DEADEXCHANGE) .withArgument("x-dead-letter-routing-key","dead.test")//将正常队列与死信交换机,死信队列绑定 //.withArgument("x-max-length",10)//队列中最多接收10条消息超过10条的部分废弃 .build(); return queue; } // 交换机和队列进行绑定 @Bean("binding1") public Binding bindQueue1ToExchange(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){ Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("normal.*").noargs(); return binding1; } // 声明死信交换机 @Bean("ex2") public Exchange getDeadExchange(){ Exchange exchange = ExchangeBuilder.topicExchange(DEADEXCHANGE).durable(false).build(); return exchange; } //死信队列 @Bean("queue2") public Queue getQueue2(){ Queue queue2 = QueueBuilder.nonDurable(QUEUENAME2) .build(); return queue2; } // 死信交换机和死信队列进行绑定 @Bean("binding2") public Binding bindQueue2ToExchange(@Qualifier("ex2") Exchange exchange,@Qualifier("queue2") Queue queue){ Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("dead.*").noargs(); return binding2; } }
测试
如果程序出现错误 拒绝签收
监听正常队列
发送消息 启动测试
总结:
1. 死信交换机和死信队列和普通的没有区别
2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
3. 消息成为死信的三种情况:
1. 队列消息长度到达限制;
2. 消费者拒接消费消息,并且不重回队列;
3. 原队列存在消息过期设置,消息到达超时时间未被消费;
延迟队列
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
需求:
- 1. 下单后,30分钟未支付,取消订单,回滚库存
- 2. 新用户注册成功7天后,发送短信问候。
实现方式:
1. 定时器
2. 死信队列
在RabbitMQ中并未提供延迟队列功能。但是可以使用:TTL+死信队列
组合实现延迟队列的效果。
1.配置
添加依赖
<!--2. rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--nacos 配置中心--> <!--配置中心--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> </dependency> <!-- application bootstrap --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bootstrap</artifactId> </dependency> <!-- nacos--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <dependency> <groupId>com.example</groupId> <artifactId>sys-comm</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
修改配置
2.代码实现
创建实体类
发送消息 测试
过期后放入死信队列
添加依赖
<dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.16</version> </dependency>
将json数据转化为对象
获取成功
3.连接数据库
创建表
创建测试类
@RestController @RequestMapping("order") public class OrderController { @Value("${mq1.exchange.name1}") private String EXCHANGENAME; // @Resource private RabbitTemplate rabbitTemplate; @GetMapping public Result aaa(TabOrder order){ //1. 消息 存放到mq里面 String s = JSONUtil.toJsonStr(order); // openfeign -- 数据添加到数据库里面 rabbitTemplate.convertAndSend(EXCHANGENAME, "normal.test", s); return Result.success(s); } }
监听normal
import javax.annotation.Resource; @Component public class XiaoFeng implements ChannelAwareMessageListener { @Resource private TabOrderMapper orderMapper; @Override @RabbitListener(queues = "test_queue_normal") public void onMessage(Message message, Channel channel) throws Exception { //Thread.sleep(2000);// 20s byte[] body = message.getBody(); String s = new String(body); System.out.println(s); // 将字符串转化为 对象 long deliveryTag = message.getMessageProperties().getDeliveryTag(); try{ TabOrder order = JSONUtil.toBean(s, TabOrder.class); // 将订单的信息 报讯到数据库里面 int insert = orderMapper.insert(order); channel.basicAck(deliveryTag,true); // }catch(Exception e){ //long deliveryTag, boolean multiple, boolean requeue System.out.println("拒绝签收消息"); channel.basicNack(deliveryTag,true,false);// 死信消息 } } }
监听dead
@Component public class YanChi implements ChannelAwareMessageListener { @Resource private TabOrderMapper orderMapper; @Override @RabbitListener(queues = "test_queue_dead") public void onMessage(Message message, Channel channel) throws Exception { //Thread.sleep(2000);// 20s byte[] body = message.getBody(); String s = new String(body); System.out.println(s); // 将字符串转化为 对象 long deliveryTag = message.getMessageProperties().getDeliveryTag(); try{ TabOrder order = JSONUtil.toBean(s, TabOrder.class); // order 的状态 TabOrder tabOrder = orderMapper.selectById(order.getId()); if(tabOrder.getStatus()==1){ // 取消 tabOrder.setStatus(3); } orderMapper.updateById(tabOrder); channel.basicAck(deliveryTag,true); // }catch(Exception e){ //long deliveryTag, boolean multiple, boolean requeue System.out.println("拒绝签收消息"); channel.basicNack(deliveryTag,true,false);// 死信消息 } } }
测试
成功