RabbitMQ开启消息发送确认和消费手动确认

news2025/1/9 9:48:37

开启RabbitMQ的生产者发送消息到RabbitMQ服务端的接收确认(ACK)和消费者通过手动确认或者丢弃消费的消息。
通过配置 publisher-confirm-type: correlatedpublisher-returns: true开启生产者确认消息。

server:
  port: 8014

spring:
  rabbitmq:
    username: admin
    password: 123456
    dynamic: true
#    port: 5672
#    host: 192.168.49.9
    addresses: 192.168.49.10:5672,192.168.49.9:5672,192.168.49.11:5672
    publisher-confirm-type: correlated
    publisher-returns: true
  application:
    name: shushan
  datasource:
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://ip/shushan
      username: root
      password: 
      hikari:
        minimum-idle: 10
        maximum-pool-size: 20
        idle-timeout: 50000
        max-lifetime: 540000
        connection-test-query: select 1
        connection-timeout: 600000

RabbitConfig :

package com.kexuexiong.shushan.common.config;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class RabbitConfig {

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();

        rabbitTemplate.setConnectionFactory(connectionFactory);

        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("confirmCallback  data: " + correlationData);
            log.info("confirmCallback ack :" + ack);
            log.info("confirmCallback cause :" + cause);
        });

        rabbitTemplate.setReturnsCallback(returned -> log.info("returnsCallback msg : " + returned));

        return rabbitTemplate;
    }
}

AckReceiver 手动确认消费者:

package com.kexuexiong.shushan.common.mq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;
import java.util.Objects;

@Slf4j
@Component
public class AckReceiver implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        byte[] messageBody = message.getBody();
        try (ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(messageBody));) {

            Map<String, String> msg = (Map<String, String>) inputStream.readObject();
            log.info(message.getMessageProperties().getConsumerQueue()+"-ack Receiver :" + msg);
            log.info("header msg :"+message.getMessageProperties().getHeaders());
            if(Objects.equals(message.getMessageProperties().getConsumerQueue(),MqConstant.BUSINESS_QUEUE)){
                channel.basicNack(deliveryTag,false,false);

            }else if(Objects.equals(message.getMessageProperties().getConsumerQueue(),MqConstant.DEAD_LETTER_QUEUE)){
                channel.basicAck(deliveryTag, true);
            }else {
                channel.basicAck(deliveryTag, true);
            }

        } catch (Exception e) {
            channel.basicReject(deliveryTag, false);
            log.error(e.getMessage());
        }
    }
}

通过配置 simpleMessageListenerContainer.setQueueNames(MqConstant.DEAD_LETTER_QUEUE)可以监听多个消息队列。

package com.kexuexiong.shushan.common.mq;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageListenerConfig {

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Autowired
    private AckReceiver ackReceiver;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);

        simpleMessageListenerContainer.setConcurrentConsumers(2);
        simpleMessageListenerContainer.setMaxConcurrentConsumers(2);
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //,MqConstant.demoDirectQueue, MqConstant.FANOUT_A, MqConstant.BIG_CAR_TOPIC

        simpleMessageListenerContainer.setQueueNames(MqConstant.DEAD_LETTER_QUEUE);

        simpleMessageListenerContainer.setMessageListener(ackReceiver);

        return simpleMessageListenerContainer;
    }

}

package com.kexuexiong.shushan.controller.mq;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.RandomUtil;
import com.kexuexiong.shushan.common.mq.MqConstant;
import com.kexuexiong.shushan.controller.BaseController;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Slf4j
@RestController
@RequestMapping("/mq/")
public class MqController extends BaseController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/callback/sendDirectMessage")
    public String sendDirectMessageCallback(){

        String msgId = UUID.randomUUID().toString();
        String msg = "demo msg ,kexuexiong";
        String createTime = DateUtil.format(new Date(),"YYYY-MM-dd HH:mm:ss");

        Map<String,Object> map = new HashMap();
        map.put("msgId",msgId);
        map.put("msg",msg);
        map.put("createTime",createTime);

        rabbitTemplate.convertAndSend("noneDirectExchange","demoDirectRouting",map);

        return "ok";
    }

    @GetMapping("/callback/lonelyDirectExchange")
    public String lonelyDirectExchange(){

        String msgId = UUID.randomUUID().toString();
        String msg = "demo msg ,kexuexiong";
        String createTime = DateUtil.format(new Date(),"YYYY-MM-dd HH:mm:ss");

        Map<String,Object> map = new HashMap();
        map.put("msgId",msgId);
        map.put("msg",msg);
        map.put("createTime",createTime);

        rabbitTemplate.convertAndSend(MqConstant.lonelyDirectExchange,"demoDirectRouting",map);

        return "ok";
    }
}

测试:

发送dirct消息 找不到交换机情况
在这里插入图片描述

2023-10-10T17:04:58.492+08:00 ERROR 27232 --- [.168.49.10:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noneDirectExchange' in vhost '/', class-id=60, method-id=40)
2023-10-10T17:04:58.492+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : confirmCallback  data: null
2023-10-10T17:04:58.492+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : confirmCallback ack :false
2023-10-10T17:04:58.492+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : confirmCallback cause :channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noneDirectExchange' in vhost '/', class-id=60, method-id=40)

ack 为false。

发送dirct消息 找不到队列
在这里插入图片描述

2023-10-10T17:05:55.851+08:00  INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig   : confirmCallback  data: null
2023-10-10T17:05:55.852+08:00  INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig   : confirmCallback ack :true
2023-10-10T17:05:55.852+08:00  INFO 27232 --- [nectionFactory5] c.k.shushan.common.config.RabbitConfig   : confirmCallback cause :null
2023-10-10T17:05:55.865+08:00  INFO 27232 --- [nectionFactory6] c.k.shushan.common.config.RabbitConfig   : returnsCallback msg : ReturnedMessage [message=(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=lonelyDirectExchange, routingKey=demoDirectRouting]

ACK为true,replyText=NO_ROUTE。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1076810.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

举个栗子~Tableau 技巧(258):使用参数高亮文本表中的行

经常有数据粉咨询&#xff1a;如何高亮文本表中的某一行&#xff0c;像 Excel 那样给数据行增加底色&#xff0c;达到突出显示的效果。 其实&#xff0c;可以通过参数来实现这个需求。如下示例&#xff0c;在参数中键入不同的行数&#xff0c;视图就高亮对应的数据行。 那么&…

idea部署Tomcat web项目报错

idea部署Tomcat web项目报错 facets&#xff0c;a. 新增web, b. 指定好web路径, c. 右下脚创建 Artifacts 必须添加src/webapp/WEB-INF/lib, jar包到SDKs中 新建lib包文件夹&#xff0c;添加依赖jar包进来

【Python 零基础入门 】安装 环境配置

【Python 零基础入门 】第一课 安装 & 环境配置 Python 零基础入门 第一课 安装 & 环境配置Python 的历史Python 的前景安装了解你的操作系统Python 安装环境配置 PyCharm 安装第一个程序 Python 零基础入门 第一课 安装 & 环境配置 在当今的技术时代, 编程语言正…

QECon大会亮相产品,更合适的企业级测试平台:RunnerGo

在当今这个数字化时代&#xff0c;应用程序的性能至关重要。一款可靠的性能测试工具&#xff0c;能够为企业带来无数的好处。最近&#xff0c;一款名为RunnerGo的开源性能测试工具备受瞩目。本文将详细介绍RunnerGo的特点、优势以及如何解决性能测试中的痛点。 RunnerGo产品介绍…

用go获取IPv4地址,WLAN的IPv4地址,本机公网IP地址,本机空闲端口详解

文章目录 获取IPv4地址获取WLAN的IPv4地址获取本机公网IP地址获取本机空闲端口 获取IPv4地址 下面的代码会打印出本机所有的IPv4地址。这个方法可能会返回多个IP地址&#xff0c;因为一台机器可能有多个网络接口&#xff0c;每个接口可能有一个或多个IP地址。 package mainim…

Graph RAG: 知识图谱结合 LLM 的检索增强

本文为大家揭示 NebulaGraph 率先提出的 Graph RAG 方法&#xff0c;这种结合知识图谱、图数据库作为大模型结合私有知识系统的最新技术栈&#xff0c;是 LLM 系列的第三篇&#xff0c;加上之前的图上下文学习、Text2Cypher 这两篇文章&#xff0c;目前 NebulaGraph LLM 相关的…

接口测试——接口协议抓包分析与mock_L3

目录&#xff1a; 弱网测试mock的价值与意义mock实战练习 Rewrite 原理Map Local 原理Map Remote 原理使用curl发送请求tcpdump与wireshark的使用 1.弱网测试 什么是弱网测试&#xff1f; 按照移动的特性&#xff0c;一般应用低于 3G、弱信号的 Wifi 可以划分为弱网弱网测试…

im即时通讯系统源码/如何搭建一个自己的im即时通讯呢?

​一&#xff0c;思路梳理 1&#xff0c;首先思考群聊的实现方式。 每当一个用户使用websocket建立连接时&#xff0c;都会存放一个连接对象&#xff08;在connectMap集合存放&#xff0c;键为sessionId&#xff0c;值为该连接对象&#xff09;&#xff0c;每次当用户发送一条…

js Learn(异步JavaScript)

在这个模块中&#xff0c;我们来看看异步JavaScript&#xff0c;为什么它很重要&#xff0c;以及如何使用它来有效地处理潜在的阻塞操作&#xff0c;比如从服务器获取资源。 指南 异步JavaScript介绍 在本文中&#xff0c;我们将学习同步&#xff08;synchronous&#xff09…

TLR4-IN-C34-C2-COO,一种结合了TLR4抑制剂TLR4-IN-C34的连接器

TLR4-IN-C34-C2-COO是一种结合了TLR4抑制剂TLR4-IN-C34的连接器&#xff0c;在免疫调节中发挥重要作用&#xff0c;它通过抑制TLR4信号通路的传导&#xff0c;从而达到降低炎症反应的目的。TLR4是Toll样受体家族中的一员&#xff0c;它主要识别来自细菌和病毒的保守模式&#x…

零碳联盟:改变世界,实现绿色能源的共同梦想

如今&#xff0c;全球气候变暖已然成为我们面对的头等大事。温室气体的排放不断升高&#xff0c;导致地球温度上升&#xff0c;带来了严重的极端气候、冰川消融和海平面上升等问题。这一切都源于人类活动&#xff0c;特别是大规模使用化石燃料&#xff0c;如煤炭发电、供暖以及…

AIGC | LLM 提示工程 -- 如何向ChatGPT提问

当前生成式人工智能已经成为革命性的驱动源&#xff0c;正在迅速地重塑世界&#xff0c;将会改变我们生活方式和思考模式。LLM像一个学会了全部人类知识的通才&#xff0c;但这不意味每个人可以轻松驾驭这个通才。我们只有通过学习面向LLM的提示工程&#xff0c;才可以更好的让…

2023中考满分多少 中考总分数展示

中考总分根据地区而不同&#xff0c;以下是各地区总分数展示&#xff1a; 大部分地区的中考总分为750分&#xff0c;包括语文150分、数学150分、英语150分&#xff08;其中听力测试30分&#xff09;、思想品德与历史合卷共150分&#xff0c;物理与化学合卷共150分。 安徽中考…

计算机视觉--距离变换算法

计算机视觉 文章目录 计算机视觉前言距离变换 总结 前言 计算机视觉CV是人工智能一个非常重要的领域。 在本次的距离变换任务中&#xff0c;我们将使用D4距离度量方法来对图像进行处理。通过这次实验&#xff0c;我们可以更好地理解距离度量在计算机视觉中的应用。希望大家对计…

flutter sdk提供完整页面的ui

1.完整ui页面 可以借鉴一些使用案例&#xff1a; return Placeholder();/// A widget that draws a box that represents where other widgets will one day /// be added. /// /// This widget is useful during development to indicate that the interface is /// not yet…

总结四:数据库(MySQL)面经

文章目录 一、SQL1、介绍一下数据库分页2、介绍一下SQL中的聚合函数3、表跟表是怎么关联的?4、说一说你对外连接的了解&#xff1f;5、说一说数据库的左连接和右连接&#xff1f;6、SQL中怎么将行转成列&#xff1f;7、谈谈你对SQL注入的理解&#xff1f;8、将一张表的部分数据…

车载激光雷达标定板在无人驾驶中的作用

在自动驾驶领域&#xff0c;激光雷达的作用主要是通过扫描周围环境&#xff0c;获取车辆行驶过程中路况和障碍物的位置和形状&#xff0c;并将数据和信号传递给自动驾驶系统&#xff0c;帮助其做出相应的驾驶决策。 激光雷达使其成为自动驾驶中不可或缺的组成部分。激光雷达可以…

Dremio:新一代数据湖仓引擎

Dremio数据湖引擎 1、什么是Dremio2、什么是数据湖仓2.1、数据湖仓的历史和演变 3、Dremio查询引擎&#xff08;Dremio Sonar&#xff09;3、Dremio特点1、唯一具有自助式SQL分析功能的数据湖仓2、数据完全开放&#xff0c;无锁定3、亚秒级性能&#xff0c;云数据仓库成本的1/1…

前后端分离计算机毕设项目之基于springboot+vue的房屋租赁系统《内含源码+文档+部署教程》

博主介绍&#xff1a;✌全网粉丝10W,前互联网大厂软件研发、集结硕博英豪成立工作室。专注于计算机相关专业毕业设计项目实战6年之久&#xff0c;选择我们就是选择放心、选择安心毕业✌ &#x1f345;由于篇幅限制&#xff0c;想要获取完整文章或者源码&#xff0c;或者代做&am…

47 从前序与中序遍历序列构造二叉树

从前序与中序遍历序列构造二叉树 先序无法确定子树大小&#xff0c;中序找不到根&#xff1b;所以用先序找根&#xff0c;用中序找大小题解1 递归题解2 迭代 给定两个整数数组 preorder 和 inorder &#xff0c;其中 preorder 是二叉树的先序遍历&#xff0c; inorder 是同…