Rabbitmq消息丢失-生产者消息丢失(一)

news2024/12/28 4:48:05

说明:消息生产者在将数据发送到Mq的时候,可能由于网络等原因造成数据投递失败。

消息丢失大致分三种:这里说的是生产者消息丢失

分析原因:

1.有没有一种可能,我刚发送消息,消息还没有到交换机就断网了,是不是消息就没有发送成功,这个时候如果不对这种情况处理,消息是不是就丢失了

2.又有没有一种可能,我又发送了一条消息,交换机拿到消息后正要发送给某个队列,就是你,你把那个队列给删掉了,这个时候消息找不到队列,消息就也丢失了

解决方法:

1.事务:Rabbitmq提供了事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。

缺点:RabbitMQ 事务机制是同步的,提交一个事务之后会阻塞在那儿,采用这种方式基本上吞吐量会下来,太耗性能了

2.confirm机制:相比于事务的同步,confirm机制是异步的,你发送完这个消息之后就可以发送下一个消息,RabbitMQ 接收了之后会异步回调confirm接口通知你这个消息接收到了。一般在生产者这块解决数据丢失,建议使用 confirm 机制。

话不多说,干代码

工程图:

1.pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <artifactId>spring-boot-starter-parent</artifactId>  <!-- 被继承的父项目的构件标识符 -->
    <groupId>org.springframework.boot</groupId>  <!-- 被继承的父项目的全球唯一标识符 -->
    <version>2.2.2.RELEASE</version>  <!-- 被继承的父项目的版本 -->
  </parent>

  <groupId>MqLossDemo</groupId>
  <artifactId>MqLossDemo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>war</packaging>

  <name>MqLossDemo Maven Webapp</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <!--spring boot核心-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!--spring boot 测试-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <!--springmvc web-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--开发环境调试-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <optional>true</optional>
    </dependency>
    <!--amqp 支持-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!--redis-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
      <version>2.1.7.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.78</version>
    </dependency>

    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.6</version>
    </dependency>

    <!--lombok-->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.16.10</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>MqLossDemo</finalName>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-war-plugin</artifactId>
          <version>3.2.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

2.application.yml

server:
  port: 8080
spring:
  rabbitmq:
    port: 5672
    host: 你的 rabbitmq IP
    username: admin
    password: admin
    virtual-host: /
    # 发送者开启 confirm 确认机制
    publisher-confirm-type: correlated
    # 发送者开启 return 确认机制
    publisher-returns: true
    template:
      #在配置文件中配置 mandatory: true 页无用,需要在RabbitTemplate中手动设置
      mandatory: true

3.RabbitMqConfig

package com.dev.config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 类名称:
 *
 * @author 李庆伟
 * @date 2024年03月04日 14:12
 */
@Configuration
public class RabbitMqConfig {

    @Bean
    public ConfirmCallbackService confirmCallbackService() {
        return new ConfirmCallbackService();
    }

    @Bean
    public ReturnCallbackService returnCallbackService() {
        return new ReturnCallbackService();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(@Autowired CachingConnectionFactory factory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);

        //生产者发送消息到Mq交换机回执,手动ack回执回调处理
        //可以理解为:消息推送到server,但是在server里找不到交换机
        //如果想看效果【先清除交换机和队列】:在工程运行前注释掉RabbitMqQueueConfig类中的directExchange和bindingDirect方法
        rabbitTemplate.setConfirmCallback(confirmCallbackService());

        //生产者发送消息到Mq,交换机发送到队列回执,一定要设置手动设置Mandatory(true),配置文件中不生效
        //可以理解为:消息推送到server,但是在server里找不到队列
        //如果想看效果【先清除交换机和队列】:如果之前看过setConfirmCallback效果,先去掉RabbitMqQueueConfig类中注释
        //           在工程运行前注释掉RabbitMqQueueConfig类中的directQueue和bindingDirect方法
        rabbitTemplate.setReturnCallback(returnCallbackService());
        rabbitTemplate.setMandatory(true);

        return rabbitTemplate;
    }

    //生产者发送消息到Mq交换机回执 //可以理解为:消息推送到server,但是在server里找不到交换机
    class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack) {
                //log.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData, ack, cause);
                System.out.println(correlationData);
                System.out.println(ack);
                System.out.println(cause);
                System.out.println("--------");
            } else {
                System.out.println("消息发送异常!");
                //可以进行重发等操作
                //这里可以处理失败的业务
            }
        }
    }

    //生产者发送的消息到Mq队列回执 //可以理解为:消息推送到server,但是在server里找不到队列
    class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

        //public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        @Override
        public void returnedMessage(Message message, int i, String s, String s1, String s2) {
            System.out.println(message.getMessageProperties().getMessageId());
            System.out.println(new String(message.getBody()));
            System.out.println(i);
            System.out.println(s);
            System.out.println(s1);
            System.out.println(s2);
            //可以将消息存储到一个新的位置,这里可以处理失败的业务
        }
    }

}

4.RabbitMqQueueConfig

package com.dev.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 类名称:
 *
 * @author 李庆伟
 * @date 2024年03月04日 14:12
 */
@Configuration
public class RabbitMqQueueConfig {

    //绑定键
    public final static String QUEUE_ONE = "loss_queue";

    public final static String EXCHANGE_ONE = "loss_exchange";



    @Bean
    public Queue directQueue() {
        return new Queue(RabbitMqQueueConfig.QUEUE_ONE);
    }

    //Direct交换机 起名:directExchange
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange(RabbitMqQueueConfig.EXCHANGE_ONE,true,false);
    }

    //绑定  将队列和交换机绑定, 并设置用于匹配键:directRoutingKey
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(directQueue()).to(directExchange()).with("directRoutingKey");
    }


}

5.RabbitContoller

package com.dev.controller;

import com.alibaba.fastjson.JSONObject;
import com.dev.config.RabbitMqQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;


/**
 * 类名称:消息丢失问题
 *
 * @author lqw
 * @date 2024年02月27日 14:47
 */
@Slf4j
@RestController
@RequestMapping("loss")
public class RabbitController {


    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法


    /**
     * 消息丢失
     * @return
     */
    @GetMapping("/sendMessage")
    public String sendMessage() {
        String id = UUID.randomUUID().toString().replace("-","");
        Map<String,Object> map = new HashMap<>();
        map.put("id",id);
        map.put("name","张龙");
        Message msg = MessageBuilder.withBody(JSONObject.toJSONString(map).getBytes()).setMessageId(id).build();

        rabbitTemplate.convertAndSend(RabbitMqQueueConfig.EXCHANGE_ONE, "directRoutingKey", msg);
        return "ok";
    }





}

6.App

package com.dev;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 类名称:
 *
 * @author 李庆伟
 * @date 2024年03月04日 14:11
 */
@SpringBootApplication
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class);
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1489666.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Matlab|配电网智能软开关(sop)规划模型

目录 1 主要内容 目标函数 2 部分程序 3 程序结果 3.1 sop选址定容优化模型 3.2 对比算例&#xff08;不含sop&#xff09; 4 下载链接 1 主要内容 该程序参考文献《基于改进灵敏度分析的有源配电网智能软开关优化配置》&#xff0c;采用二阶锥算法&#xff0c;以改进的…

Vue3+element-plus复杂表单分组处理

一、为什么表单要分组处理&#xff1f; 方便表单字段的复用&#xff1a;例如&#xff0c;你的表单有十个字段会在很多的表单都会用到&#xff0c;那么表单则需要进行分组进行表单复用&#xff1b;实现不同角色的表单权限控制&#xff1a;例如一个表单有60个字段&#xff0c;角…

STM32 NAND FLASH知识点

1.NAND FLASH的简介 NAND FLASH 的概念是由东芝公司在 1989 年率先提出&#xff0c;它内部采用非线性宏单元模式&#xff0c;为固态大容量内存的实现提供了廉价有效的解决方案。 NAND FLASH 存储器具有容量较大&#xff0c;改写速度快等优点&#xff0c;适用于大量数据的存储&…

【MySQL】事务、锁

目录 事务案例场景模拟实现转账&#xff1a;从张三的账户转账500元到李四的账户SQL示例异常 什么是事务事务的特性&#xff0c;简称ACID 属性实现原理redo logundo log MySQL 中一条 SQL 更新语句的执行过程( InnoDB 存储引擎)事务的提交流程隔离性并发事务产生的问题事务隔离级…

434G数据失窃!亚信安全发布《勒索家族和勒索事件监控报告》

最新态势快速感知 最新一周全球共监测到勒索事件90起&#xff0c;与上周相比数量有所增加。 lockbit3.0仍然是影响最严重的勒索家族&#xff1b;alphv和cactus恶意家族也是两个活动频繁的恶意家族&#xff0c;需要注意防范。 Change Healthcare - Optum - UnitedHealth遭受了…

【Python】外网远程登录访问jupyter notebook+pycharm使用ipython

第一步&#xff1a;创建python虚拟环境 conda create -n py3610 python3.6.10第二步&#xff1a;安装ipython pip install ipython pip install ipython notebook第三步&#xff1a;创建 IPython Notebook 服务器配置文件 # 进入python交互shell&#xff0c;设置密码 >&…

SpringCloud(19)之Skywalking应用上篇

一、Skywalking概述 随着互联网架构的扩张&#xff0c;分布式系统变得日趋复杂&#xff0c;越来越多的组件开始走向分布式化&#xff0c;如微服务、消 息收发、分布式数据库、分布式缓存、分布式对象存储、跨域调用&#xff0c;这些组件共同构成了繁杂的分布式网络。 思考以下…

Leaflet 加载高德地图

前言 在前面的文章中&#xff0c;我们学习了如何使用 Leaflet 创建一个基本的地图。在本文中&#xff0c;我们将学习如何在 Leaflet 中加载高德地图&#xff0c;并结合实际应用构建地图点击事件。 一、介绍 高德地图是一款由高德软件提供的数字地图服务&#xff0c;在国内使用…

在国内如何申请US,visa卡?

随着跨境与AI的发展大家对美国虚拟卡的需求也越来越多&#xff0c;比如说亚马逊、ebay、Etsy、ChatGPTPLUS、midjourney、POE等等软件以及海淘的需要&#xff0c;所以我们需要用到美国虚拟卡的场景就越来越多 如何获得一张US 虚拟信用卡&#xff1f; 方法很简单&#xff0c;点…

React-子传父

1.概念 说明&#xff1a;React中子组件向父组件传递数据通常涉及回调函数和状态提升等方法。 2.代码实现 2.1绑定事件 说明&#xff1a;父组件绑定自定义事件 <Son onGetSonMsg{getMsg}></Son> 2.2接受事件 说明&#xff1a;子组件接受父组件的自定义事件名称…

【Spring云原生】Spring Batch:海量数据高并发任务处理!数据处理纵享新丝滑!事务管理机制+并行处理+实例应用讲解

&#x1f389;&#x1f389;欢迎光临&#x1f389;&#x1f389; &#x1f3c5;我是苏泽&#xff0c;一位对技术充满热情的探索者和分享者。&#x1f680;&#x1f680; &#x1f31f;特别推荐给大家我的最新专栏《Spring 狂野之旅&#xff1a;从入门到入魔》 &#x1f680; 本…

【Pytorch入门】常见Transforms/ __call__方法

在Python中&#xff0c;__call__方法是一个特殊方法&#xff0c;用于使对象可以像函数一样被调用。当一个对象实现了__call__方法时&#xff0c;可以直接使用括号运算符将对象作为函数调用。 通过实现__call__方法&#xff0c;可以为对象提供函数式的行为&#xff0c;使其更加…

超全Chat GPT论文修改指令

文献综述指令润色修改指令论文选题指令论文大指令研究理论指令论文致谢指令参考文献指令论文润色整体逻辑论文整体优化提问指令 1&#xff0e;文献综述指令 请你帮我写一份关于&#xff08;研究主题&#xff09;的文献综述。我的论文选题方向是 XXXX &#xff0c;我已经找到了…

Vue.js 修饰符:精准控制组件行为

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

IDEA中Maven无法下载jar包问题解决

在项目中经常会遇到jar包无法下载的问题&#xff0c;可以根据以下几种方法进行排查。 1. 排查网络连接 网络连接失败&#xff0c;会导致远程访问Maven仓库失败&#xff0c;所以应确保网络连接正常。 2. 排查Maven的配置 Maven配置文件&#xff08;settings.xml&#xff09;…

《数字图像处理(MATLAB版)》相关算法代码及其分析(3)

目录 1 对边界进行子采样 1.1 输入参数检查 1.2 处理重复坐标 1.3 计算边界最大范围 1.4 确定网格线数量 1.5 构建网格位置向量 1.6 计算曼哈顿距离 1.7 整理输出结果 1.8 返回结果 2 改变图像的存储类别 2.1 函数输入 2.2 数据类型转换 2.3 错误处理 2.4 返回结…

LabVIEW高温摩擦磨损测试系统

LabVIEW高温摩擦磨损测试系统 介绍了一个基于LabVIEW的高温摩擦磨损测试系统的软件开发项目。该系统实现高温条件下材料摩擦磨损特性的自动化测试&#xff0c;通过精确控制和数据采集&#xff0c;为材料性能研究提供重要数据支持。 项目背景 随着材料科学的发展&#xff0c;…

视觉Transformers中的位置嵌入 - 研究与应用指南

视觉 Transformer 中位置嵌入背后的数学和代码简介。 自从 2017 年推出《Attention is All You Need》以来&#xff0c;Transformer 已成为自然语言处理 (NLP) 领域最先进的技术。 2021 年&#xff0c;An Image is Worth 16x16 Words 成功地将 Transformer 应用于计算机视觉任务…

小迪安全31WEB 攻防-通用漏洞文件上传js 验证mimeuser.ini语言特性

#知识点&#xff1a; 1、文件上传-前端验证 2、文件上传-黑白名单 3、文件上传-user.ini 妙用 4、文件上传-PHP 语言特性 #详细点&#xff1a; 检测层面&#xff1a;前端&#xff0c;后端等 2、检测内容&#xff1a;文件头&#xff0c;完整性&#xff0c;二次渲染…

docker快照备份回滚

1. 安装系统 1.1 vm安装Ubuntu 参考:https://blog.csdn.net/u010308917/article/details/125157774 1.2 其他操作 添加自定义物理卷 –待补充– 1.2.1 查询可用物理卷 fdisk -l 输出如下 Disk /dev/loop0: 73.9 MiB, 77492224 bytes, 151352 sectors Units: sectors of …