Rabbitmq消息丢失-消费者消息丢失(二)

news2025/1/11 5:08:12

说明:消费端在处理消息的过程中出现异常,例如:业务逻辑异常,或者消费者被停机,或者网络断开连接等,以上等情况使消息没有得到正确恰当的处理,也会使消息丢失。

分析:分析就是说明中的例如!

解决:ACK确认机制

所谓的ACK就是:首先关闭自动确认【自动ACK】,消费者收到一个消息后,就可以发一个确认【ACK】给MQ,当然什么时候发送确认【ACK】是程序员决定的,也就是说每次在确保处理完这个消息相关的业务后,程序员可以手动发送确认【ACK】,之后把消息从MQ中干掉!这样即使出现了异常也可以有效的消费消息。

工程图:

1.pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <artifactId>spring-boot-starter-parent</artifactId>  <!-- 被继承的父项目的构件标识符 -->
    <groupId>org.springframework.boot</groupId>  <!-- 被继承的父项目的全球唯一标识符 -->
    <version>2.2.2.RELEASE</version>  <!-- 被继承的父项目的版本 -->
  </parent>

  <groupId>MqLossDemoTwo</groupId>
  <artifactId>MqLossDemoTwo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>war</packaging>

  <name>MqLossDemoTwo Maven Webapp</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <!--spring boot核心-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!--spring boot 测试-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <!--springmvc web-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--开发环境调试-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <optional>true</optional>
    </dependency>
    <!--amqp 支持-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!--redis-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
      <version>2.1.7.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.78</version>
    </dependency>

    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.6</version>
    </dependency>

    <!--lombok-->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.16.10</version>
    </dependency>
  </dependencies>


  <build>
    <finalName>MqLossDemoTwo</finalName>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-war-plugin</artifactId>
          <version>3.2.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

2.application.yml

server:
  port: 8080
spring:
  rabbitmq:
    port: 5672
    host: 你的 rabbitmq IP
    username: admin
    password: admin
    virtual-host: /
    listener:
      simple:
        concurrency: 10
        max-concurrency: 10
        prefetch: 1
        auto-startup: true
        default-requeue-rejected: true
        # 设置消费端手动 ack
        acknowledge-mode: manual
        # 是否支持重试
        retry:
          enabled: true

3.RabbitMqQueueConfig

package com.dev.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 类名称:
 *
 * @author 李庆伟
 * @date 2024年03月04日 14:12
 */
@Configuration
public class RabbitMqQueueConfig {

    //绑定键
    public final static String QUEUE_ONE = "loss_queue";

    public final static String EXCHANGE_ONE = "loss_exchange";



    @Bean
    public Queue directQueue() {
        return new Queue(RabbitMqQueueConfig.QUEUE_ONE);
    }

    //Direct交换机 起名:directExchange
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange(RabbitMqQueueConfig.EXCHANGE_ONE,true,false);
    }

    //绑定  将队列和交换机绑定, 并设置用于匹配键:directRoutingKey
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(directQueue()).to(directExchange()).with("directRoutingKey");
    }


}

4.RabbitController

package com.dev.controller;

import com.alibaba.fastjson.JSONObject;
import com.dev.config.RabbitMqQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;


/**
 * 类名称:消息丢失问题
 *
 * @author lqw
 * @date 2024年02月27日 14:47
 */
@Slf4j
@RestController
@RequestMapping("loss")
public class RabbitController {


    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法


    /**
     * 消息丢失
     * @return
     */
    @GetMapping("/sendMessage")
    public String sendMessage() {
        String id = UUID.randomUUID().toString().replace("-","");
        Map<String,Object> addMap = new HashMap<>();//添加用户信息
        addMap.put("id",id);
        addMap.put("name","张龙");
        Message msg = MessageBuilder.withBody(JSONObject.toJSONString(addMap).getBytes()).setMessageId(id).build();

        rabbitTemplate.convertAndSend(RabbitMqQueueConfig.EXCHANGE_ONE, "directRoutingKey", msg);
        return "ok";
    }





}

5.RabbitMqListener

package com.dev.listener;

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * 类名称:
 *
 * @author 李庆伟
 * @date 2024年03月04日 16:54
 */

@Component
public class RabbitMqListener {

    @RabbitListener(queues = "loss_queue")
    @RabbitHandler
    public void process(Message msg, Channel channel) {
        System.out.println("Rabbitmq Direct : " + msg);
        //设置的唯一id,可以用来处理重复消费
        String id = msg.getMessageProperties().getMessageId();

        //消息队列自身设置的唯一标识
        long tag = msg.getMessageProperties().getDeliveryTag();

        //int a = 1/0;

        try {
            //监听到要添加的用户信息
            String dataStr = new String(msg.getBody());
            Map<String,Object> addMap = JSON.parseObject(dataStr,Map.class);


            //先去redis中查询是否已经添加过了该用户
            //如果未添加[重复消费]
            if(true){ //如果未添加
                //添加用户业务
                //....add(addMap);
                //告诉队列该消息已经消费
                channel.basicAck(tag,false);
            } else {//如果已添加
                //告诉队列该消息已经消费
                channel.basicAck(tag,false);
            }

        } catch (Exception e) {
            try {
                //tag:消息序号
                //multiple:消息的标识,是否确认多条,false只确认当前一个消息收到,
                //                               true确认所有consumer获得的消息(成功消费,消息从队列中删除)
                //requeue:是否要退回到队列 true 将消息再次放到mq队列中,false是不把消息放到队列中
                channel.basicNack(tag, true, false);
                //channel.basicNack(tag, true, true); //如果能走到此处,这样会把消息在放到队列中,会在次被监听到,陷入死循环
                //channel.basicNack(tag, false, true); //如果能走到此处,这样会把消息在放到队列中,会在次被监听到,陷入死循环
                //channel.basicNack(tag, false, false); //如果能走到此处,如果是扇形交换机,其他消费者也会再次消费此信息

                //可以把添加用户业务数据保存起来
                //...
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }

    }

}

6.App

package com.dev;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 类名称:
 *
 * @author 李庆伟
 * @date 2024年03月04日 14:11
 */
@SpringBootApplication
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class);
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1488598.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

中科数安|防止电脑文件资料外泄

#防止电脑文件资料泄漏# 中科数安提供了一系列解决方案来防止电脑文件资料外泄。 www.weaem.com 这些解决方案包括以下几个方面&#xff1a; 访问控制&#xff1a;实施严格的文件访问控制&#xff0c;确保只有授权的人员能够访问和编辑核心文件。使用身份验证和权限管理系统&a…

Android APK包反编译为java文件教程

方法 流程&#xff1a; test.apk -> smali文件 -> dex文件 -> jar文件 ->java 文件 将APK包解压为 smail文件 下载 apktool工具 apktool.jar 将 test.apk 和 apktool.jar放同一目录下&#xff0c;并执行以下命令 java -jar apktool.jar d -f xxx.apk -o xxx(解…

30、类和接口

文章目录 接口概念接口和类之间有何关系&#xff1f; 可以使用接口来约束类接口继承接口接口还可以继承类接口为什么可以继承类内层原因&#xff1a;接口为什么可以继承类 用得出的结论解释最初的demo接口继承类的一些限制 接口概念 接口&#xff08;Interfaces&#xff09;可…

SAP PP学习笔记 - 豆知识07 - 如何查看BOM一览

SAP标准提供了CS03&#xff0c;只能查询单个的BOM&#xff0c;如果想查看一览&#xff0c;只能自己写SQVI 查询。 有其他高招的童鞋&#xff0c;请赐教啊。 1&#xff0c;SQVI 工具 SAP MM学习笔记18- SQVI 工具_sap sqvi-CSDN博客 输入查询名&#xff0c;然后点击 登录 2&a…

C++学习笔记:set和map

set和map set什么是setset的使用 关联式容器键值对 map什么是mapmap的使用map的插入方式常用功能map[] 的灵活使用 set 什么是set set是STL中一个底层为二叉搜索树来实现的容器 若要使用set需要包含头文件 #include<set>set中的元素具有唯一性(因此可以用set去重)若用…

Linux高级编程:进程间的通信(二)、IPC

回顾 共7种方式&#xff1a; 古老的进程间通信方式&#xff1a; 管道&#xff1a; 无名管道 有名管道 信号 系统V IPC进程对象 共享内存 消息队列 信号量集 socket通信 //网络 ------------------------- 无名管道 pipe&#xff08;&#xff09; 特点&#xff1a; 用于…

CSS3笔记

1.相同优先级的样式以写在后面的为主。 2.交集选择器&#xff0c;并且 条件挨在一起 p.rich{...} /*p元素class有rich的元素*/ 3.并集选择器&#xff0c;或者 逗号隔开 .class1,class2{...}/*满足其中一个类名都会使用该样式*/ 4.后代选择器 空格 隔开 所有符合的包括孙子及…

揭秘App访问量背后的秘密:数据统计与分析

在移动互联网时代&#xff0c;App已成为人们日常生活的重要组成部分。对于App运营者来说&#xff0c;了解用户的访问量、行为习惯等数据至关重要。本文将深入探讨如何精准统计App访问量&#xff0c;为运营者提供有价值的数据支持。 一、App访问量统计的重要性 访问量是衡量A…

2024年【焊工(初级)】找解析及焊工(初级)考试技巧

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 焊工&#xff08;初级&#xff09;找解析是安全生产模拟考试一点通生成的&#xff0c;焊工&#xff08;初级&#xff09;证模拟考试题库是根据焊工&#xff08;初级&#xff09;最新版教材汇编出焊工&#xff08;初级…

【EAI 027】Learning Interactive Real-World Simulators

Paper Card 论文标题&#xff1a;Learning Interactive Real-World Simulators 论文作者&#xff1a;Mengjiao Yang, Yilun Du, Kamyar Ghasemipour, Jonathan Tompson, Leslie Kaelbling, Dale Schuurmans, Pieter Abbeel 作者单位&#xff1a;UC Berkeley, Google DeepMind, …

leetcode 移除链表元素

本题中&#xff0c;我们是要移除链表的某一个节点&#xff0c;为了确保统一操作&#xff0c;我们需要使用虚拟头节点&#xff0c;这样我们删除节点的时候&#xff0c;就是把这个要删除的节点&#xff08;当前节点cur&#xff09;的前一个节点pre&#xff0c;使得pre.next指向要…

jsp阿帕奇安装教程

1.将压缩包解压&#xff0c;存放在自己所知道的位置 2.将软件文件夹打开 使用winr &#xff0c;输入cmd运行打开 输入Java或者Javac&#xff0c;出现一大串之后表明成功 接着在所解压的软件中点开bin这个文件夹&#xff0c;找到startup.bat点击 点击之后会出现黑框&#xff0c…

Linux中断实验:定时器实现按键消抖处理

一. 简介 前面文章学习了Linux驱动按键中断实验,文章地址如下: Linux驱动按键中断实验:按键中断功能的实现-CSDN博客 本文在Linux驱动按键中断实现的基础上,使用定时器实现按键消抖处理。 二. Linux中断实验:定时器实现按键消抖处理 1. 定时器处理按键消抖的原理 按…

java使用poi、ftl将html导出word,设置视图为 页面视图

1、修改html标签&#xff0c; 加入如下内容 <html xmlns:v"urn:schemas-microsoft-com:vml" xmlns:o"urn:schemas-microsoft-com:office:office" xmlns:w"urn:schemas-microsoft-com:office:word" xmlns:m"http://schemas.microsoft.c…

【NCRE Python】

基本输入输出函数 input 函数 input 函数用于从标准输入&#xff08;如键盘&#xff09;接收用户输入的字符串。当 input 函数被调用时&#xff0c;程序会暂停执行&#xff0c;等待用户输入文本并按回车键。用户输入的文本会作为字符串返回给程序。input 函数还可以接收一个字…

【笔记版】docker常用指令---systemctl类、docker状态

systemctl [options] docker 启动&#xff1a;system start docker查看状态&#xff1a;systemctl status docker停止&#xff1a;systemctl stop docker有警告&#xff1a;service关闭了&#xff0c;但是docker.socket仍响应解决方法&#xff1a;systemctl stop docker.socket…

OSError: [WinError 1455] 页面文件太小,无法完成操作。

[问题描述]&#xff1a;OSError: [WinError 1455] 页面文件太小&#xff0c;无法完成操作。 原因1&#xff1a;线程数太大 方法&#xff1a;改小线程&#xff08;workers&#xff09;数。 原因2&#xff1a;虚拟内存太小或为0&#xff0c;调大虚拟内存。 方法&#xff1a;右键…

SL3036 DC100V降压5V 2A 低功耗 性价比低 替代MP9486A

SL3036 DC100V降压5V 2A是一款常用的电源管理芯片&#xff0c;它能够将较高的输入电压降至较低的输出电压&#xff0c;为电子设备提供稳定可靠的电源供应。其低功耗、高性价比的特点使得它在许多领域得到了广泛的应用。然而&#xff0c;对于一些对成本敏感的应用来说&#xff0…

Linux——MySQL主从复制与读写分离

实验环境 虚拟机 3台 centos7.9 网卡NAT模式 数量 1 组件包mysql-5.6.36.tar.gz cmake-2.8.6.tar.gz 设备 IP 备注 Centos01 192.168.223.123 Amoeba Centos02 192.168.223.124 Master Centos03 192.168.223.125 Slave MySQL安装 主从同时操作 安装所需要的…