说明:重复消费的原因大致是:生产者将消息A发送到队列中,消费者监听到消息A后开始处理业务;业务处理完成后,监听器在向RabbitMQ确认(ack)消息A已被消费的途中发生中断。也就是说,业务已经处理完成,但队列中仍然存在消息A,导致消费者服务恢复后再次消费到消息A,出现业务被重复执行的情况。
解决思路:只要有一个地方记录了消息A已经被消费过【这个消息必须设置一个唯一标识】,那么即使消息A被再次投递,消费前比对一下:如果有记录,说明消息A已经被消费,直接跳过;如果没有记录,说明尚未被消费,正常处理。
我使用redis及设置redis过期时间来解决重复消费问题。
工程图:
1.pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>spring-boot-starter-parent</artifactId> <!-- artifact id of the inherited parent project -->
<groupId>org.springframework.boot</groupId> <!-- group id of the inherited parent project -->
<version>2.2.2.RELEASE</version> <!-- version of the inherited parent project -->
</parent>
<groupId>RabbitmqDemoOne</groupId>
<artifactId>RabbitmqDemoOne</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>war</packaging>
<name>RabbitmqDemoOne Maven Webapp</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!-- Spring Boot core starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Spring Boot test support (test scope only) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Spring MVC web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- development-time tooling (devtools) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<!-- AMQP (RabbitMQ) support -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- fastjson: used by the controller and the listener to write/read the JSON message body -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version>
</dependency>
<!-- commons-lang -->
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.5</version>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
</dependency>
</dependencies>
<build>
<finalName>RabbitmqDemoOne</finalName>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-war-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
2.application.yml
# HTTP port of the embedded web server
server:
port: 8080
spring:
# Redis instance; the listener stores its "already consumed" message markers here
redis:
host: 127.0.0.1
port: 6379
# RabbitMQ broker connection
rabbitmq:
port: 5672
host: 192.168.199.139
# NOTE(review): broker credentials are hard-coded in this file - consider externalizing them
username: admin
password: admin
virtual-host: /
3.RabbitMqConfig
package com.dev.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that declares the RabbitMQ queue used by the
 * duplicate-consumption demo.
 *
 * @author 李庆伟
 * @date 2024/3/3 14:12
 */
@Configuration
public class RabbitMqConfig {

    /** Name of the queue the demo publishes to and consumes from. */
    private static final String QUEUE_NAME = "repeatQueue";

    /** Value of the {@code durable} constructor argument of the queue. */
    private static final boolean DURABLE = true;

    /**
     * Declares the demo queue as a Spring bean.
     *
     * @return a queue named {@code repeatQueue} created with {@code durable = true}
     */
    @Bean
    public Queue makeQueue() {
        final Queue repeatQueue = new Queue(QUEUE_NAME, DURABLE);
        return repeatQueue;
    }
}
4.RedisTemplateConfig
package com.dev.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
 * Spring configuration that exposes the {@link RedisTemplate} bean used by the application.
 *
 * @author 李庆伟
 * @date 2024/3/3 14:24
 */
@Configuration
public class RedisTemplateConfig {

    /**
     * Builds a {@code RedisTemplate<String, Object>} on top of the given connection factory.
     *
     * <p>Keys and hash keys are serialized with {@link StringRedisSerializer}; values are
     * serialized with {@link GenericJackson2JsonRedisSerializer}. No hash-value serializer is
     * set here, so the template's default applies to hash values.
     *
     * @param redisConnectionFactory connection factory the template talks to Redis through
     * @return the initialized template
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        final RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        applySerializers(template);
        // Must run after all setters so the template validates and finalizes its configuration.
        template.afterPropertiesSet();
        return template;
    }

    /** Installs the key, value and hash-key serializers on the template. */
    private static void applySerializers(RedisTemplate<String, Object> template) {
        // Plain strings for keys and hash keys.
        template.setKeySerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        // JSON (Jackson) for values.
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
    }
}
5.RabbitRepeatController
package com.dev.controller;
import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
 * REST controller that publishes test messages to the {@code repeatQueue} queue.
 *
 * @author 李庆伟
 * @date 2024/3/3 14:05
 */
@RestController
@RequestMapping("repeatQueue")
public class RabbitRepeatContoller {

    /** Number of messages published per request. */
    private static final int MESSAGE_COUNT = 1000;

    /** Routing key; with the default ("") exchange this is the target queue name. */
    private static final String QUEUE_NAME = "repeatQueue";

    /** Template used to publish messages to RabbitMQ. */
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Publishes {@code 1000} JSON messages to {@code repeatQueue}.
     *
     * <p>Every message gets a fresh UUID (dashes removed) that is stored both in the JSON body
     * (field {@code id}) and as the AMQP message id, so a consumer can recognise a redelivered
     * message.
     *
     * @return the literal string {@code "ok"} once all messages have been handed to the template
     */
    @GetMapping("/sendMessage")
    public String sendMessage() {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String id = UUID.randomUUID().toString().replace("-", "");
            Map<String, Object> map = new HashMap<>();
            map.put("id", id);
            map.put("name", "张龙");
            map.put("phone", "123..11");
            map.put("num", i);
            String str = JSONObject.toJSONString(map);
            // Encode explicitly as UTF-8: the consumer decodes the body as UTF-8, and the
            // no-arg getBytes() would use the platform default charset, which can corrupt the
            // non-ASCII "name" value on hosts whose default is not UTF-8.
            byte[] body = str.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            Message msg = MessageBuilder.withBody(body).setMessageId(id).build();
            rabbitTemplate.convertAndSend("", QUEUE_NAME, msg);
        }
        return "ok";
    }
}
6.RabbitMqListener
package com.dev.listener;
import com.alibaba.fastjson.JSON;
import com.dev.utils.RedisUtil;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.util.Map;
/**
 * Consumes messages from {@code repeatQueue} and uses Redis to skip messages whose
 * message id has already been handled.
 *
 * <p>NOTE(review): the "already consumed?" lookup and the later marker write are two separate
 * Redis calls, so two deliveries of the same id that are handled at the same time can both pass
 * the lookup. Closing that window needs an atomic set-if-absent, which {@code RedisUtil} does
 * not offer.
 *
 * @author 李庆伟
 * @date 2024/3/3 14:13
 */
@Component
public class RabbitMqListener {

    /** Lifetime of a "consumed" marker in Redis, in seconds (one hour). */
    private static final long CONSUMED_MARKER_TTL_SECONDS = 3600L;

    @Autowired
    private RedisUtil redisUtil;

    /**
     * Handles one message from {@code repeatQueue}.
     *
     * <p>If a marker for the message id exists in Redis the message is ignored. Otherwise the
     * UTF-8 JSON body is parsed and printed, and a marker keyed by the message id is written
     * with a one-hour expiry. A message without a message id cannot be de-duplicated; it is
     * processed without touching Redis instead of being passed to Redis as a null key.
     *
     * @param msg the received AMQP message
     * @throws UnsupportedEncodingException kept in the signature for compatibility; decoding
     *         now uses a charset constant and no longer throws it
     */
    @RabbitListener(queues = "repeatQueue")
    @RabbitHandler
    public void process(Message msg) throws UnsupportedEncodingException {
        // Unique id the producer attached to the message.
        String id = msg.getMessageProperties().getMessageId();
        boolean hasId = StringUtils.isNotEmpty(id);

        // A non-empty marker in Redis means this id was already consumed.
        if (hasId && StringUtils.isNotEmpty(redisUtil.get(id))) {
            return;
        }

        byte[] body = msg.getBody();
        if (body == null) {
            return;
        }
        String str = new String(body, java.nio.charset.StandardCharsets.UTF_8);
        if (StringUtils.isEmpty(str)) {
            return;
        }

        Map<String, Object> map = JSON.parseObject(str, Map.class);
        System.out.println(map.get("num") + "----" + map.get("id") + "----" + map.get("name") + "----" + map.get("phone"));

        // Record the message as consumed; RedisUtil.set reports failure through its return
        // value, so surface it instead of silently losing the marker.
        if (hasId && !redisUtil.set(id, id, CONSUMED_MARKER_TTL_SECONDS)) {
            System.err.println("failed to record consumed message id in redis: " + id);
        }
        System.out.println("----------");
    }
}
7.RedisUtil
package com.dev.utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundZSetOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
 * Thin convenience wrapper around the application's {@link RedisTemplate}.
 *
 * <p>All reads and writes go through the serializers configured on the injected template;
 * this class never changes the template's configuration.
 *
 * @author 李庆伟
 * @date 2024/3/3 14:27
 */
@Component
public class RedisUtil {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Deletes every given key.
     *
     * @param keys keys to delete; a {@code null} array is treated as empty
     */
    public void remove(final String... keys) {
        if (keys == null) {
            return;
        }
        for (String key : keys) {
            remove(key);
        }
    }

    /**
     * Deletes all keys matching a pattern.
     *
     * @param pattern key pattern passed to {@code RedisTemplate.keys}
     */
    public void removePattern(final String pattern) {
        Set<String> keys = redisTemplate.keys(pattern);
        // keys may be null (e.g. inside a pipeline/transaction), so guard before use.
        if (keys != null && !keys.isEmpty()) {
            redisTemplate.delete(keys);
        }
    }

    /**
     * Deletes a single key.
     *
     * @param key key to delete
     */
    public void remove(final String key) {
        // Deleting a missing key is a no-op, so no separate existence check is needed
        // (the previous check-then-delete cost an extra round trip and was racy).
        redisTemplate.delete(key);
    }

    /**
     * Tells whether a key exists.
     *
     * @param key key to look up
     * @return {@code true} only if Redis reports the key as present
     */
    public boolean exists(final String key) {
        // hasKey returns a Boolean that may be null; avoid unboxing it directly.
        return Boolean.TRUE.equals(redisTemplate.hasKey(key));
    }

    /**
     * Reads a value and returns its string form.
     *
     * <p>The value is read with the template's configured value serializer. Earlier this
     * method replaced the value serializer on the shared template on every call, which
     * changed how every later {@code set} serialized its value and was not safe under
     * concurrent use; that mutation has been removed.
     *
     * @param key key to read
     * @return {@code toString()} of the stored value, or {@code null} if there is none
     */
    public String get(final String key) {
        Object value = redisTemplate.opsForValue().get(key);
        return value == null ? null : value.toString();
    }

    /**
     * Writes a value without expiry.
     *
     * @param key   key to write
     * @param value value to store
     * @return {@code true} on success, {@code false} if any exception occurred
     */
    public boolean set(final String key, Object value) {
        boolean result = false;
        try {
            redisTemplate.opsForValue().set(key, value);
            result = true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * Writes a value together with its expiry.
     *
     * <p>Value and time-to-live are sent in one call, so a failure can no longer leave the
     * key stored without an expiry (previously SET and EXPIRE were two separate commands).
     *
     * @param key        key to write
     * @param value      value to store
     * @param expireTime time-to-live in seconds
     * @return {@code true} on success, {@code false} if any exception occurred
     *         (including a {@code null} {@code expireTime})
     */
    public boolean set(final String key, Object value, Long expireTime) {
        boolean result = false;
        try {
            redisTemplate.opsForValue().set(key, value, expireTime, TimeUnit.SECONDS);
            result = true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * Stores all entries of a map in the hash at {@code key}.
     *
     * @param key   hash key
     * @param value field/value pairs to store
     * @return {@code true} on success, {@code false} if any exception occurred
     */
    public boolean hmset(String key, Map<String, String> value) {
        boolean result = false;
        try {
            redisTemplate.<String, String>opsForHash().putAll(key, value);
            result = true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * Reads all entries of the hash at {@code key}.
     *
     * @param key hash key
     * @return the hash entries, or {@code null} if an exception occurred
     */
    public Map<String, String> hmget(String key) {
        Map<String, String> result = null;
        try {
            result = redisTemplate.<String, String>opsForHash().entries(key);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * Increments the number stored at {@code key}.
     *
     * @param key   key holding the counter
     * @param delta amount to add; must not be negative
     * @return the value after the increment
     * @throws IllegalArgumentException if {@code delta} is negative
     */
    public long incr(String key, long delta) {
        if (delta < 0) {
            throw new IllegalArgumentException("递增因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }

    /**
     * Decrements the number stored at {@code key}.
     *
     * @param key   key holding the counter
     * @param delta amount to subtract; must not be negative
     * @return the value after the decrement
     * @throws IllegalArgumentException if {@code delta} is negative
     */
    public long decr(String key, long delta) {
        if (delta < 0) {
            throw new IllegalArgumentException("递减因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }

    /**
     * Increases the score of {@code name} in the sorted set at {@code key} by 1
     * (e.g. for a "hot search" ranking).
     *
     * @param key  sorted-set key
     * @param name member whose score is incremented
     */
    public void zadd(String key, String name) {
        redisTemplate.boundZSetOps(key).incrementScore(name, 1);
    }
}
8.App
package com.dev;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point of the RabbitMQ duplicate-consumption demo.
 *
 * @author 李庆伟
 * @date 2024/3/3 14:01
 */
@SpringBootApplication
public class App {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments; forwarded to Spring Boot so that options such as
     *             {@code --server.port=...} take effect (they were previously dropped)
     */
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}