SpringBoot整合rabbitmq-重复消费问题

news2024/11/19 7:32:47

说明:重复消费的原因大致是生产者将信息A发送到队列中,消费者监听到消息A后开始处理业务,业务处理完成后,监听在告知rabbitmq消息A已经被消费完成途中中断,也就时说我已经处理完业务,而队列中还存在当前消息A,导致消费者服务恢复后又消费到消息A,出现重复操作的业务。

解决思路:我只要有一个地方记录了消息A已经被消费过了【这个消息必须得设置一个唯一标记】,即使消息A再次被消费时,比对一下,如果有记录则说明消息A已经被消费,如果没有说明没有被消费。

我使用redis及设置redis过期时间来解决重复消费问题。

工程图:

1.pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <artifactId>spring-boot-starter-parent</artifactId>  <!-- 被继承的父项目的构件标识符 -->
    <groupId>org.springframework.boot</groupId>  <!-- 被继承的父项目的全球唯一标识符 -->
    <version>2.2.2.RELEASE</version>  <!-- 被继承的父项目的版本 -->
  </parent>

  <groupId>RabbitmqDemoOne</groupId>
  <artifactId>RabbitmqDemoOne</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>war</packaging>

  <name>RabbitmqDemoOne Maven Webapp</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>
  <dependencies>
    <!--spring boot核心-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!--spring boot 测试-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <!--springmvc web-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!--开发环境调试-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <optional>true</optional>
    </dependency>

    <!--amqp 支持-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!--redis-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.78</version>
    </dependency>
    <!-- commons-lang -->
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.5</version>
    </dependency>

    <!--lombok-->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.16.10</version>
    </dependency>
  </dependencies>


  <build>
    <finalName>RabbitmqDemoOne</finalName>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-war-plugin</artifactId>
          <version>3.2.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

2.application.yml

server:
  port: 8080
spring:
  redis:
    host: 127.0.0.1
    port: 6379
  rabbitmq:
    port: 5672
    host: 192.168.199.139
    username: admin
    password: admin
    virtual-host: /

3.RabbitMqConfig

package com.dev.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 李庆伟
 * @title: RabbitMqConfig
 * @date 2024/3/3 14:12
 */
@Configuration
public class RabbitMqConfig {

    /**
     * 队列
     * @return repeatQueue队列名称 true 持久化
     */
    @Bean
    public Queue makeQueue(){
        return new Queue("repeatQueue",true);
    }

}

4.RedisTemplateConfig

package com.dev.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * @author 李庆伟
 * @title: RedisTemplateConfig
 * @date 2024/3/3 14:24
 */
@Configuration
public class RedisTemplateConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);

        // 设置键(key)的序列化采用StringRedisSerializer。
        redisTemplate.setKeySerializer(new StringRedisSerializer());

        //redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());//设置值(value)的序列化采用jdk
        // 设置值(value)的序列化采用FastJsonRedisSerializer。
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        //redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        //redisTemplate.setHashValueSerializer(fastJsonRedisSerializer);

        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }


}

5.RabbitRepeatController

package com.dev.controller;

import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * @author 李庆伟
 * @title: RabbitRepeatContoller
 * @date 2024/3/3 14:05
 */
@RestController
@RequestMapping("repeatQueue")
public class RabbitRepeatContoller {

    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法


    /**
     * 测试
     * @return
     */
    @GetMapping("/sendMessage")
    public String sendMessage() {
        for (int i = 0; i < 1000; i++) {
            String id = UUID.randomUUID().toString().replace("-","");
            Map<String,Object> map = new HashMap<>();
            map.put("id",id);
            map.put("name","张龙");
            map.put("phone","123..11");
            map.put("num",i);
            String str = JSONObject.toJSONString(map);
            Message msg = MessageBuilder.withBody(str.getBytes()).setMessageId(id).build();
            rabbitTemplate.convertAndSend("", "repeatQueue", msg);
        }
        return "ok";
    }

}

6.RabbitMqListener

package com.dev.listener;

import com.alibaba.fastjson.JSON;
import com.dev.utils.RedisUtil;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.Map;

/**
 * @author 李庆伟
 * @title: RabbitMqListener
 * @date 2024/3/3 14:13
 */
@Component
public class RabbitMqListener {

    @Autowired
    private RedisUtil redisUtil;

    @RabbitListener(queues = "repeatQueue")
    @RabbitHandler
    public void process(Message msg) throws UnsupportedEncodingException {

        //获取在发送消息时设置的唯一id
        String id = msg.getMessageProperties().getMessageId();
        
        //去redis中查看是否有记录,如果有证明已经消费过了
        String val = redisUtil.get(id);

        if(StringUtils.isNotEmpty(val)){
            return;
        }
        String str = new String(msg.getBody(),"utf-8");
        if(StringUtils.isNotEmpty(str)){
            Map<String,Object> map = JSON.parseObject(str,Map.class);
            System.out.println(map.get("num")+"----"+map.get("id")+"----"+map.get("name")+"----"+map.get("phone"));
            //将消费过的消息记录到redis中,失效时间为1个小时
            redisUtil.set(id,id,3600L);
            System.out.println("----------");
        }


    }

}

7.RedisUtil

package com.dev.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundZSetOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * @author 李庆伟
 * @title: RedisUtil
 * @date 2024/3/3 14:27
 */

@Component
public class RedisUtil {

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * 批量删除对应的value
     *
     * @param keys
     */
    public void remove(final String... keys) {
        for (String key : keys) {
            remove(key);
        }
    }
    /**
     * 批量删除key
     *
     * @param pattern
     */
    public void removePattern(final String pattern) {
        Set<Serializable> keys = redisTemplate.keys(pattern);
        if (keys.size() > 0)
            redisTemplate.delete(keys);
    }
    /**
     * 删除对应的value
     *
     * @param key
     */
    public void remove(final String key) {
        if (exists(key)) {
            redisTemplate.delete(key);
        }
    }
    /**
     * 判断缓存中是否有对应的value
     *
     * @param key
     * @return
     */
    public boolean exists(final String key) {
        return redisTemplate.hasKey(key);
    }
    /**
     * 读取缓存
     *
     * @param key
     * @return
     */
    public String get(final String key) {
        Object result = null;
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();
        result = operations.get(key);
        if(result==null){
            return null;
        }
        return result.toString();
    }
    /**
     * 写入缓存
     *
     * @param key
     * @param value
     * @return
     */
    public boolean set(final String key, Object value) {
        boolean result = false;
        try {
            ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();
            operations.set(key, value);
            result = true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }
    /**
     * 写入缓存
     *
     * @param key
     * @param value
     * @return
     */
    public boolean set(final String key, Object value, Long expireTime) {
        boolean result = false;
        try {
            ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();
            operations.set(key, value);
            redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);
            result = true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }

    public  boolean hmset(String key, Map<String, String> value) {
        boolean result = false;
        try {
            redisTemplate.opsForHash().putAll(key, value);
            result = true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }

    public  Map<String,String> hmget(String key) {
        Map<String,String> result =null;
        try {
            result=  redisTemplate.opsForHash().entries(key);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * 递增
     *
     * @param key 键
     * @paramby 要增加几(大于0)
     * @return
     */
    public long incr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递增因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }

    /**
     * 递减
     *
     * @param key 键
     * @paramby 要减少几(小于0)
     * @return
     */
    public long decr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递减因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }

    /**
     * redis zset可已设置排序(案例,热搜)
     *
     * @param key 键
     * @paramby
     * @return
     */
    public void zadd(String key ,String name) {
        BoundZSetOperations<Object, Object> boundZSetOperations = redisTemplate.boundZSetOps(key);
        //自增长后的数据
        boundZSetOperations.incrementScore(name,1);
    }


}

8.App

package com.dev;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author 李庆伟
 * @title: App
 * @date 2024/3/3 14:01
 */
@SpringBootApplication
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1488104.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

动态规划DP之背包问题4---分组背包问题

目录 DP分析&#xff1a; 例题&#xff1a; 01背包&#xff1a; 一种物品只有一件 动态规划DP之背包问题1---01背包问题-CSDN博客 完全背包&#xff1a;一种物品有无限件 动态规划DP之背包问题2---完全背包问题-CSDN博客 多重背包&#xff1a;一种物品有有限…

Avalonia学习(二十九)-仪表

Avalonia制作仪表盘&#xff0c;把控件给大家演示一下&#xff0c;Avalonia有三类自定义控件&#xff0c;分别是用户控件、模版控件、自主控件。前面已经很多用户控件了&#xff0c;这个是演示模版控件&#xff0c;另外一种不知道哪种情况下使用。 前端代码&#xff1a; <…

常用的java反编译工具介绍

在软件开发和逆向工程领域&#xff0c;反编译工具是一个重要的工具&#xff0c;它们可以帮助开发者理解、分析和修改编译后的Java字节码文件&#xff08;.class文件&#xff09;。这些工具通常用于以下几个目的&#xff1a; 1. 调试&#xff1a;帮助开发者理解字节码的…

CloudCanal x Hive 构建高效的实时数仓

简述 CloudCanal 最近对于全周期数据流动进行了初步探索&#xff0c;打通了Hive 目标端的实时同步&#xff0c;为实时数仓的构建提供了支持&#xff0c;这篇文章简要做下分享。 基于临时表的增量合并方式基于 HDFS 文件写入方式临时表统一 Schema任务级的临时表 基于临时表的…

华为手环 8:返校季新宠,助力高效学习与健康生活

随着春节假期的结束&#xff0c;学生们也纷纷踏上了返校的旅途。新的学期&#xff0c;新的气象&#xff0c;让华为手环8为你的带来全新的智能生活体验。它不仅仅是一款风格多变的时尚手环&#xff0c;还拥有了智能消息提醒、100多种运动模式和睡眠监测等强大功能&#xff0c;让…

Linux 实现打印彩色进度条

文章目录 预备知识一、理解回车换行二、认识行缓冲1、代码一、二&#xff08;回车换行理解&#xff09;2、代码三、四&#xff08;sleep函数和ffush函数理解&#xff09; 三、简单倒计时1. 倒计时代码2、效果展示 四、进度条1、效果展示2、进度条代码makefileProcessBar.hProce…

【数据结构和算法初阶(C语言)】双向循环带头链表的增删查改详解(天才设计的链表结构,应用简单逆天!!!!!)

目录 ​编辑​编辑 1.双向链表的定义&#xff1a;前赴后继 2.带头链表的定义-----哨兵位 3.增删查改 3.1创建新节点函数----方便后续增加节点调用 3.2创建哨兵位----创建头结点 3.3增加节点&#xff0c;尾部插入数据 3.4尾删除 3.5查找函数----遍历对比&#xff…

程序人生:当看到男友学测试拿高薪后,我突然悟了......

Hello&#xff0c;大家好&#xff0c;我是小雨 认识软件测试是因为我男朋友&#xff0c;他在华测教育培训软测后出去就业非常不错&#xff0c;所以我也萌生了转行的想法&#xff0c;当时我的想法是&#xff0c;他都能学会&#xff0c;那我肯定没问题&#xff0c;所以在他的介绍…

【编程系列----编译文件解析】Program size: Code, RO-data , RW-data, ZI-data

1.先给个 axf 文件 转编译bin文件的代码 2.解析下编译文件MAP 先给个编译提示 Code 是代码占用的空间 RO-data是 Read Only 只读常量的大小&#xff0c;如const型 RW-data是&#xff08;Read Write&#xff09; RW是可读可写变量&#xff0c;就是初始化时候就已经赋值了的&a…

图书管理系统的设计与实现

** &#x1f345;点赞收藏关注 → 私信领取本源代码、数据库&#x1f345; 本人在Java毕业设计领域有多年的经验&#xff0c;陆续会更新更多优质的Java实战项目希望你能有所收获&#xff0c;少走一些弯路。&#x1f345;关注我不迷路&#x1f345;** 一 、设计说明 1.1 课题…

鸿蒙Harmony应用开发—ArkTS声明式开发(通用属性:组件内容模糊)

为当前组件添加内容模糊效果。 说明&#xff1a; 从API Version 10开始支持。后续版本如有新增内容&#xff0c;则采用上角标单独标记该内容的起始版本。 foregroundBlurStyle foregroundBlurStyle(value: BlurStyle, options?: ForegroundBlurStyleOptions) 为当前组件提供…

Linux 学习笔记(12)

十二、 系统服务 1 、系统服务分类&#xff0c;根据其使用的方法来分&#xff0c;可以被分为三类 a、由 init 控制的服务&#xff1a;基本都是系统级别的服务&#xff0c;运行级别这一章讲的就是这一类的服务 b、由 System V 启动脚本启动的服务&#xff1a;和我们打交道最多…

政务浏览器——打通信创闭环最后一公里

当前&#xff0c;信创建设工作主要集中在芯片、操作系统、数据库以及pc整机&#xff0c;这些领域基本可用&#xff0c;或者达到了市场主流水平。但是&#xff0c;政务办事场景下的信创落地仍然困难重重&#xff0c;很多地方不得不装双系统或买两台设备来来平衡日常业务和信创考…

关于企业数字化转型:再认识、再思考、再出发

近年来&#xff0c;随着国家数字化政策不断出台、新兴技术不断进步、企业内生需求持续释放&#xff0c;数字化转型逐步成为企业实现高质量发展的必由之路&#xff0c;成为企业实现可持续发展乃至弯道超车的重要途径。本文重点分析当下阻碍企业数字化转型的难点&#xff0c;提出…

[项目设计] 从零实现的高并发内存池(二)

&#x1f308; 博客个人主页&#xff1a;Chris在Coding &#x1f3a5; 本文所属专栏&#xff1a;[高并发内存池] ❤️ 前置学习专栏&#xff1a;[Linux学习] ⏰ 我们仍在旅途 ​ 目录 2.高并发内存池整体架构 3.ThreadCache实现 3.1 ThreadCache整体架构…

【JSON2WEB】08 Amis的事件和校验

CRUD操作中&#xff0c;新增、编辑、删除数据后要同步刷新列表&#xff0c;这个可以用Amis的事件来实现。 1 新增数据后刷新列表 Step 1 找到【新增数据】弹窗的【提交】按钮 Step 2 添加鼠标点击事件 这里的 组件ID&#xff1a;u:13d67a44214e 为表格2的组件ID&#xff0c; …

2024常用开源测试开发工具!

今天为大家奉献一篇测试开发工具集锦干货。在本篇文章中&#xff0c;将给大家推荐几款日常工作中经常用到的测试开发工具神器&#xff0c;涵盖了自动化测试、性能压测、流量复制、混沌测试、造数据等。 1、AutoMeter-API 自动化测试平台 AutoMeter 是一款针对分布式服务&…

MySQL 8.0.35 企业版安装和启用TDE插件keyring_encrypted_file

本文主要记录MySQL企业版TDE插件keyring_encrypted_file的安装和使用。 TDE说明 TDE( Transparent Data Encryption,透明数据加密) 指的是无需修改应用就可以实现数据的加解密&#xff0c;在数据写磁盘的时候加密&#xff0c;读的时候自动解密。加密后其他人即使能够访问数据库…

Vue-03

Vue指令 v-bind 作用&#xff1a;动态设置html的标签属性&#xff08;src url title…&#xff09; 语法&#xff1a;v-bind:属性名"表达式" 举例代码如下&#xff1a; 实现效果如下&#xff1a; 案例&#xff1a;图片切换 实现代码如下&#xff1a; 实现的效果…

#WEB前端(CCS常用属性,补充span、div)

1.实验&#xff1a; 复合元素、行内元素、块内元素、行内块元素 2.IDE&#xff1a;VSCODE 3.记录&#xff1a; span为行内元素&#xff1a;不可设置宽高&#xff0c;实际占用控件决定分布空间。 div为块内元素&#xff1a;占满整行&#xff0c;可以设置宽高 img为行内块元…