某某会员小程序后端性能优化

news2024/11/15 17:49:29

背景

某某会员小程序后台提供开放平台能力,为三方油站提供会员积分、优惠劵等api。当用户在油站加油,油站收银会调用我们系统为用户发放积分、优惠劵等。用户反馈慢,三方调用发放积分接口性能极低,耗时30s+;

接口情况

发放积分接口业务负责,且用存储过程写的业务,改动风险极大

数据库情况

优惠卷等表,数据量800w+,甚至存在单表3000w+

优化方案

数据库数据归档

归档交易、用户优惠劵等表历史数据,比如归档三个月前的数据(根据实际情况补充归档条件,如用户优惠劵没使用或没过期的数据不能归档)
优化效果:存储过程耗时从30s降低到7s,但是作为Toc用途接口性能远远不达标,优化数据库sql或许能进一步降低响应时间,但是存储过程复杂优化费时费力风险大

方案描述风险工作量难度是否能解决性能问题是否解决并发冲突影响使用技术
方案1java重写存储过程业务一定程度能解决yes改动点多,业务影响大java + orm
方案2保证存储过程全局串行执行noyes接口性能会降低分布式锁
方案3异步下存储过程全局串行执行yesyesrabbitmq+分布式锁+自旋锁

线程池异步化分析

接口中存储串行调用改为异步调用,

使用线程池异步化存在问题

开始简单使用线程池异步化,但是出现锁表的情况(原因存储过程没有保证原子性,并且其中大量使用临时表,并发下出现竞争锁表),而SqlServer自带的死锁检查机制杀死事务导致发放积分失败

线程池+分布式锁

异步线程【不能保证分布式环境的全局顺序执行】,使用分布式锁能保证同一个时间只有一个存储过程执行
问题:但是并发情况会将存储过程执行堆积在线程池,并发过大存在OOM风险,或者处理丢失风险

rabbitmq异步改造

可行性验证报告结论

验证通过点如下:

  1. 测试rabbitmq发送/接收消息【通过】
  2. 测试并发下分布式锁+自旋锁保证业务串行执行【通过】
  3. 测试并发下分布式锁+自旋锁+mq保证业务串行执行【通过】
  4. 测试业务幂等性保证不重复消费【通过】
  5. 测试手动ack兼容原来配置保证可靠性【通过】

当前项目rabbitmq使用方式问题分析

配置发下

spring.rabbitmq.host=172.18.229.23
spring.rabbitmq.port=5672
spring.rabbitmq.username=totaltest
spring.rabbitmq.password=totaltest
spring.rabbitmq.virtual-host=/totaltest/
spring.rabbitmq.publisher-confirms=false

该配置没有

spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

若是不配默认为

spring.rabbitmq.listener.direct.acknowledge-mode=auto
spring.rabbitmq.listener.simple.acknowledge-mode=auto

rabbitmq消费者ack机制问题分析

spring.rabbitmq.listener.direct.acknowledge-mode是用于配置Spring Boot应用中RabbitMQ消息监听器的确认模式。确认模式决定了在消费者处理消息后如何通知RabbitMQ服务器来确认消息的接收情况。
该配置有以下几种可选的值:

  1. AUTO: 在这种模式下,消费者处理消息后,RabbitMQ会自动确认消息。这意味着消息一旦被消费者接收,就会立即从队列中删除。这是默认的确认模式。
  2. MANUAL: 在这种模式下,消费者需要显式地发送确认消息来告知RabbitMQ服务器消息已经被成功处理。这意味着消费者可以在处理消息后决定是否要确认消息。通常在需要进行消息处理的事务性操作时使用这种模式。
  3. NONE: 在这种模式下,消费者不会发送任何确认消息,也不会被要求发送确认消息。这意味着消息会在被传递给消费者之后立即被视为已经被确认。

问题:项目中该配置使用的模式配置,以为着没有手动ack,即消费者接收到消息,消息就会从mq中删除,若是消费者消费异常,则消息丢失不可追溯复原

rabbitmq生产者ack机制问题分析

项目中配置如下

spring.rabbitmq.publisher-confirms=false

spring.rabbitmq.publisher-confirms是Spring Boot中用于配置RabbitMQ生产者消息确认的属性。它用于控制是否启用RabbitMQ的发布确认机制,以确保消息成功发送到Exchange。
当spring.rabbitmq.publisher-confirms属性设置为true时,表示启用了RabbitMQ的发布确认机制。在这种情况下,当生产者发送消息到Exchange后,RabbitMQ会发送一个确认消息给生产者,告知消息是否成功到达Exchange。生产者可以根据收到的确认消息来判断消息是否成功发送,从而进行相应的处理。
当spring.rabbitmq.publisher-confirms属性设置为false时,表示禁用了RabbitMQ的发布确认机制。在这种情况下,生产者发送消息到Exchange后,不会收到确认消息,也无法得知消息是否成功到达Exchange。
通常情况下,建议将spring.rabbitmq.publisher-confirms属性设置为true,以确保消息的可靠发送。当然,具体是否启用发布确认机制,还取决于业务场景和对消息可靠性的要求。

rabbitmq消息可靠性问题分析

通过上诉【rabbitmq生产者ack机制问题分析】和【rabbitmq消费者ack机制问题分析】
可知当前项目中消息没有保证消息可靠性,rabbitmq宕机恢复、消费者消费异常都会导致消息丢失,导致业务完整性缺失

rabbitmq配置最小改动方案

上诉问题若想得到解决需项目中rabbitmq配置,会影响到原来所有使用mq的地方,避免影响范围较大
解决方案:新增消费者类似,通过设置不同的消费者来实现接收指定的消息需要手动 ack

测试rabbitmq配置发送接收消息【通过】

rabbitmq和springboot对应版本:3. Reference
创建虚拟host
image.png
创建测试 交换机和queues

Exchange:exchange-1
Queue:queue-1
key:springboot.*

image.png

spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*

image.png

发送消息

package com.bfxy.springboot.producer;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import com.bfxy.springboot.entity.Order;

@Component
public class RabbitSender {

	private static final Logger LOGGER = LoggerFactory.getLogger(RabbitSender.class);

	//自动注入RabbitTemplate模板类
	@Autowired
	private RabbitTemplate rabbitTemplate;  
	
	//回调函数: confirm确认
	final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
		@Override
		public void confirm(CorrelationData correlationData, boolean ack, String cause) {
			System.err.println("correlationData: " + correlationData);
			System.err.println("ack: " + ack);
			if(!ack){
				System.err.println("异常处理....");
			}
		}
	};
	
	//回调函数: return返回
	final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
		@Override
		public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
				String exchange, String routingKey) {
			System.err.println("return exchange: " + exchange + ", routingKey: " 
				+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
		}
	};
	
	//发送消息方法调用: 构建Message消息
	public void send(Object message, Map<String, Object> properties)  {
		LOGGER.info("消息内容:{}",message);
		LOGGER.info("properties:{}",properties);
		try {
			MessageHeaders mhs = new MessageHeaders(properties);
			Message msg = MessageBuilder.createMessage(message, mhs);
			rabbitTemplate.setConfirmCallback(confirmCallback);
			rabbitTemplate.setReturnCallback(returnCallback);
			//id + 时间戳 全局唯一
			CorrelationData correlationData = new CorrelationData("1234567890");
			rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
		}catch (Exception e){
			LOGGER.error("发送消息异常,message:{}",message);
		}


	}
	
	//发送消息方法调用: 构建自定义对象消息
	public void sendOrder(Order order)  {
		LOGGER.info("订单消息内容:{}",order);
		try {
			rabbitTemplate.setConfirmCallback(confirmCallback);
			rabbitTemplate.setReturnCallback(returnCallback);
			//id + 时间戳 全局唯一
			CorrelationData correlationData = new CorrelationData("0987654321");
			rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
		}catch (Exception e){
			LOGGER.error("订单发送消息异常,message:{}",order);
		}


	}
	
}

测试代码

package com.bfxy.springboot;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.bfxy.springboot.entity.Order;
import com.bfxy.springboot.producer.RabbitSender;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

	@Test
	public void contextLoads() {
	}
	
	@Autowired
	private RabbitSender rabbitSender;


	private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
	
	@Test
	public void testSender1() throws Exception {
		 Map<String, Object> properties = new HashMap<>();
		 properties.put("number", "12345");
		 properties.put("send_time", simpleDateFormat.format(new Date()));
		 rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);
	}
	
	@Test
	public void testSender2() throws Exception {
		 Order order = new Order("001", "第一个订单");
		 rabbitSender.sendOrder(order);
	}
	
	
	
}

接收消息

package com.bfxy.springboot.conusmer;

import java.util.Map;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

@Component
public class RabbitReceiver {

	/**
	 * 用于标识方法是一个RabbitMQ消息的监听方法,用于监听指定的队列,并在接收到消息时调用该方法进行处理。
	 * 可以指定队列、交换机、路由键等属性,用于配置消息监听的相关信息。
	 * 通常与@RabbitHandler一起使用,将消息监听和消息处理方法关联起来。
	 */
	@RabbitListener(bindings = @QueueBinding(
			value = @Queue(value = "queue-1",  durable="true"),
			exchange = @Exchange(value = "exchange-1",  durable="true",  type= "topic",  ignoreDeclarationExceptions = "true"),
			key = "springboot.*" )
	)
	/**
	 * 用于标识方法是一个RabbitMQ消息的处理方法。
	 * 通常与@RabbitListener一起使用,用于指定具体的消息处理方法。
	 * 通过@RabbitHandler注解标识的方法可以处理多个不同类型的消息,通过方法参数的类型来区分不同的消息类型。
	 */
	@RabbitHandler
	public void onMessage(Message message, Channel channel) throws Exception {
		System.err.println("--------------------------------------");
		System.err.println("消费端Payload: " + message.getPayload());
		Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
		//手工ACK
		channel.basicAck(deliveryTag, false);
	}
	
	
	/**
	 * 
	 * 	spring.rabbitmq.listener.order.queue.name=queue-2
		spring.rabbitmq.listener.order.queue.durable=true
		spring.rabbitmq.listener.order.exchange.name=exchange-1
		spring.rabbitmq.listener.order.exchange.durable=true
		spring.rabbitmq.listener.order.exchange.type=topic
		spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
		spring.rabbitmq.listener.order.key=springboot.*
	 * @param order
	 * @param channel
	 * @param headers
	 * @throws Exception
	 */
	@RabbitListener(bindings = @QueueBinding(
			value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", 
			durable="${spring.rabbitmq.listener.order.queue.durable}"),
			exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", 
			durable="${spring.rabbitmq.listener.order.exchange.durable}", 
			type= "${spring.rabbitmq.listener.order.exchange.type}", 
			ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
			key = "${spring.rabbitmq.listener.order.key}"
			)
	)
	@RabbitHandler
	public void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order,  Channel channel,
			@Headers Map<String, Object> headers) throws Exception {
		System.err.println("--------------------------------------");
		System.err.println("消费端order: " + order.getId());
		Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
		//手工ACK
		channel.basicAck(deliveryTag, false);
	}
	
	
}

断点测试

测试分布式锁+自旋锁测试串行执行【通过】

测试并发分布式锁顺序执行业务代码

package com.bfxy.springboot;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

	private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationTests.class);

	@Autowired
	RedissonClient redissonClient;


	@Test
	public void contextLoads() {
		long startTime = System.currentTimeMillis();
		for (int i = 1;i<5;i++){
			int finalI = i;
			CompletableFuture.runAsync(()->{
				bizLock(String.valueOf(finalI));
			});
		}
		while (true){}

	}


	private void bizLock(String taskName) {
		RLock lock = redissonClient.getLock("my-lock");
		boolean locked = false;
		try {
			while (!locked) {
				locked = lock.tryLock();
				if (locked) {
					try {
						biz(3000, taskName);
						System.out.println("----------------");
					} finally {
						lock.unlock();
					}
				} else {
					// 未获取到锁,可以进行一些等待操作,比如休眠一段时间后再尝试获取锁
					Thread.sleep(100);
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}



	private void biz(Integer time,String taskName) throws Exception{
		long startTime = System.currentTimeMillis();
		LOGGER.info("任务序号={},任务执行开始时间={}",taskName,startTime);
		Thread.sleep(time);
		long endtime = System.currentTimeMillis();
		LOGGER.info("任务序号={},任务执行结束时间={}",taskName,startTime);
		LOGGER.info("任务序号={},任务执行消耗时间={}",taskName,(endtime-startTime));
	}
}

执行日志如下:【测试结果并发串行执行(同一时间只有一个任务执行)】

2024-07-10 13:46:49.587  INFO 36076 --- [onPool-worker-9] com.bfxy.springboot.ApplicationTests     : 任务序号=1,任务执行开始时间=1720590409587
2024-07-10 13:46:52.601  INFO 36076 --- [onPool-worker-9] com.bfxy.springboot.ApplicationTests     : 任务序号=1,任务执行结束时间=1720590409587
2024-07-10 13:46:52.601  INFO 36076 --- [onPool-worker-9] com.bfxy.springboot.ApplicationTests     : 任务序号=1,任务执行消耗时间=3014
----------------
2024-07-10 13:46:52.665  INFO 36076 --- [onPool-worker-4] com.bfxy.springboot.ApplicationTests     : 任务序号=4,任务执行开始时间=1720590412665
2024-07-10 13:46:55.678  INFO 36076 --- [onPool-worker-4] com.bfxy.springboot.ApplicationTests     : 任务序号=4,任务执行结束时间=1720590412665
2024-07-10 13:46:55.678  INFO 36076 --- [onPool-worker-4] com.bfxy.springboot.ApplicationTests     : 任务序号=4,任务执行消耗时间=3013
----------------
2024-07-10 13:46:55.759  INFO 36076 --- [onPool-worker-2] com.bfxy.springboot.ApplicationTests     : 任务序号=2,任务执行开始时间=1720590415759
2024-07-10 13:46:58.761  INFO 36076 --- [onPool-worker-2] com.bfxy.springboot.ApplicationTests     : 任务序号=2,任务执行结束时间=1720590415759
2024-07-10 13:46:58.761  INFO 36076 --- [onPool-worker-2] com.bfxy.springboot.ApplicationTests     : 任务序号=2,任务执行消耗时间=3002

压力测试对比资源使用情况

结论使用线程池比较消耗资源,特别是内存,一点并发上来可能oom

压测前

image.png

200并发

image.png

1000并发

image.png

2000并发

image.png

测试分布式锁+自旋锁+mq全局串行执行【通过】

使用线程池控制会导致请求积压到线程池消耗cpu和内存资源,使用mq能有效削峰限流(减小服务器资源消耗),线上部署了两个节点即并发为2

消费者代码

	/**
	 * 
	 * 	spring.rabbitmq.listener.order.queue.name=queue-2
		spring.rabbitmq.listener.order.queue.durable=true
		spring.rabbitmq.listener.order.exchange.name=exchange-1
		spring.rabbitmq.listener.order.exchange.durable=true
		spring.rabbitmq.listener.order.exchange.type=topic
		spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
		spring.rabbitmq.listener.order.key=springboot.*
	 * @param order
	 * @param channel
	 * @param headers
	 * @throws Exception
	 */
	@RabbitListener(bindings = @QueueBinding(
			value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", 
			durable="${spring.rabbitmq.listener.order.queue.durable}"),
			exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", 
			durable="${spring.rabbitmq.listener.order.exchange.durable}", 
			type= "${spring.rabbitmq.listener.order.exchange.type}", 
			ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
			key = "${spring.rabbitmq.listener.order.key}"
			)
	)
	@RabbitHandler
	public void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order,  Channel channel,
			@Headers Map<String, Object> headers) throws Exception {
		//System.err.println("--------------------------------------");
		//System.err.println("消费端order: " + order.getId());
		Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
		onLockOrderMessage(order);
		//手工ACK
		channel.basicAck(deliveryTag, false);
	}


	@Autowired
	RedissonClient redissonClient;

	private void onLockOrderMessage(com.bfxy.springboot.entity.Order order) {
		RLock lock = redissonClient.getLock("my-lock");
		boolean locked = false;
		try {
			while (!locked) {
				locked = lock.tryLock();
				if (locked) {
					try {
						long startTime = System.currentTimeMillis();
						String id = order.getId();
						LOGGER.info("订单序号={},订单执行开始时间={}",id,startTime);
						Thread.sleep(7000);
						long endtime = System.currentTimeMillis();
						LOGGER.info("订单序号={},订单执行结束时间={}",id,startTime);
						LOGGER.info("订单序号={},订单执行消耗时间={}",id,(endtime-startTime));
						System.out.println("----------------");
					} finally {
						lock.unlock();
					}
				} else {
					// 未获取到锁,可以进行一些等待操作,比如休眠一段时间后再尝试获取锁
					Thread.sleep(100);
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

生产者代码

	@Test
	public void testSender3() throws Exception {
		for (int i = 1;i<=50;i++){
			int finalI = i;
			CompletableFuture.runAsync(()->{
				Order order = new Order(String.valueOf(finalI), "第"+finalI+"个订单");
				rabbitSender.sendOrder(order);
			});
			System.err.println("发送消息订单:"+finalI);
			if (i%5==0){
				Thread.sleep(1000);
			}
		}

	}

启动两个消费者【验证全局串行:同一时间只有一个业务执行】

记录消费日志验证是否串行

  1. 通过日志可知:单个消费者消费顺序执行
  2. 验证消费者1和2直接业务串行

消费者2:15:18:22 到 15:18:50 之间没有接收到消息【串行执行】
image.png
验证消费1:15:18:22 到 15:18:50时间段消息情况【串行执行】
image.png

业务幂等性保障测试【通过】

mq接收到消息会将消息中的uid放入redis,当重复消费时会进行判断,保障业务幂等性

	@Test
	public void time() {
		// 获取字符串对象
		String key = "myKey";
		String value = "Hello, Redis!";
		RBucket<String> bucket = redissonClient.getBucket(key);
		bucket.set(value, 30, TimeUnit.SECONDS); // 设置失效时间为10秒
	}

image.png

幂等逻辑

// 判断key是否存在
if(bucket.isExists()){
    LOGGER.error("重复消费,id={}",id);
    // 重复消息不执行业务逻辑跳出直接ack
    break;
}else {
    marker(id);
}

重复消费情况:进入断点表示重复执行break会跳过业务代码
image.png

rabbitmq配置生效测试

原项目配置【自动ack测试】-【通过】

测试自动ack是否生效

package com.bfxy.springboot.config;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

@Configuration
public class RabbitMQConfig {
    @Value("${spring.rabbitmq.host}")
    private String addresses;

    @Value("${spring.rabbitmq.port}")
    private String port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Value("${spring.rabbitmq.publisher-confirms}")
    private boolean publisherConfirms;

    @Bean
    /** 因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置 */
    // @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        return rabbitTemplate;
    }

	@Bean
	public RabbitTemplate manualAckRabbitTemplate(ConnectionFactory connectionFactory) {
		RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
		// 配置手动ACK
		rabbitTemplate.setChannelTransacted(true);
		rabbitTemplate.setChannelTransacted(true);
		rabbitTemplate.setChannelTransacted(true);
		rabbitTemplate.setChannelTransacted(true);
		rabbitTemplate.setAcknowledgeMode(AcknowledgeMode.MANUAL);
		return rabbitTemplate;
	}

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(addresses + ":" + port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        /** 如果要进行消息回调,则这里必须要设置为true */
        connectionFactory.setPublisherConfirms(publisherConfirms);
        return connectionFactory;
    }

    @Bean("mqContainerFactory")
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(messageConverter);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

}

注释手动ack
image.png

spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=total-api
spring.rabbitmq.connection-timeout=15000

发送消息,没ack前控制台信息
image.png
等待一会,自动ack的消息从rabbitmq中删除了
image.png

新增配置【手动ack测试】-【通过】

rabbitmq如何实现接受指定的消息要手动ack,其他消息自动ack

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Bean
    public RabbitListenerContainerFactory<SimpleMessageListenerContainer> manualAckListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    @Bean
    public RabbitListenerContainerFactory<SimpleMessageListenerContainer> autoAckListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }
}

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Bean
    public RabbitListenerContainerFactory<SimpleMessageListenerContainer> manualAckListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    @Bean
    public RabbitListenerContainerFactory<SimpleMessageListenerContainer> autoAckListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }
}

消费者代码指定手动ack,并注释手动ackimage.png
查看rabbitmq中消息是否被删除,预期消息不会删除
image.png
放开手动ack注释,再次测试
image.png

兜底保证方案

消息处理可能失败,处理失败的消息记录到broker_message_log表中

-- 表 broker_message_log 消息记录结构
CREATE TABLE IF NOT EXISTS `broker_message_log` (
  `message_id` varchar(128) NOT NULL, -- 消息唯一ID
  `message` varchar(4000) DEFAULT NULL, -- 消息内容
  `try_count` int(4) DEFAULT '0', -- 重试次数
  `status` varchar(10) DEFAULT '', -- 消息投递状态  0 投递中 1 投递成功   2 投递失败
  `next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',  --下一次重试时间 或 超时时间
  `create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', --创建时间
  `update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', --更新时间
  PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

通过定时任务重新执行失败的消息
执行点设计

  1. 定时任务重目标业务方法(该方式要将业务封装某个class的某个方法中,失败时会录入表中)
  2. 发送mq在消费中执行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1922531.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

短视频SEO矩阵系统:源码开发与部署全攻略

在数字化时代&#xff0c;短视频已成为人们获取信息、娱乐休闲的重要方式。随着短视频平台的兴起&#xff0c;如何让自己的内容在众多视频中脱颖而出&#xff0c;成为每个创作者和内容运营者关注的焦点。本文将为您深入解析短视频SEO矩阵系统的源码开发与部署&#xff0c;助您在…

【原创教程】埃斯顿机器人:弯管机推力解决方式(上)

现的功能及应用的场合 本项目为弯管机设备改造工程,在不破坏设备原有的功能的情况下通过只更换设备原来的永宏PLC,使弯管机能够与埃斯顿机器人进行信号交互,通过机器人对弯管机进行上料、下料动作,即节约了人工成本,又提高了生产效率。 本文所述内容为“弯管机推力”的解决…

【设计模式】装饰模式

设计模式 【设计模式】工厂方法模式【设计模式】抽象工厂模式【设计模式】单例模式【设计模式】策略模式【设计模式】观察者模式【设计模式】装饰模式 一、介绍 装饰模式是一种结构型设计模式&#xff0c;它允许你通过将对象和对象装饰器分离来扩展对象的功能。装饰模式是一种动…

虚拟机vmware网络设置

一、网络分类 打开vmware workstation网络编辑器可以知道有三种网络类型&#xff0c;分别是&#xff1a;桥接模式、nat模式、仅主机模式。 1、桥接模式 桥接模式是将主机网卡与虚拟机虚拟的网卡利用虚拟网桥进行通信。在桥接的作用下, 类似于把物理主机虚拟为一个交换机, 所有设…

设计模式的七大原则

1.单一职责原则 单一职责原则(Single responsibility principle)&#xff0c;即一个类应该只负责一项职责。如类A负责两个不同职责&#xff1a;职责1&#xff0c;职责2。当职责1需求变更而改变A时&#xff0c;可能造成职责2执行错误&#xff0c;所以需要将类A的粒度分解为A1、…

2-32 基于matlab的最小二乘估计递推算法

基于matlab的最小二乘估计递推算法&#xff0c;生成M序列&#xff0c;对参数估计值进行辨识&#xff0c;输出估计误差结果。程序已调通&#xff0c;可直接运行。 2-32 最小二乘估计递推算法 参数估计 - 小红书 (xiaohongshu.com)

C# Winform 系统方案目录的管理开发

在做一个中等复杂程度项目时&#xff0c;我们通常有系统全局配置&#xff0c;还要有对应的方案目录的管理和更新。 比如我们有如下需求&#xff1a;开发一个方案管理&#xff0c;可以新建、打开和保存方案&#xff0c;同时还需要保存方案中的各种文件。我设计的采用目录管理和…

计算器原生js

目录 1.HTML 2.CSS 2.JS 4.资源 5.运行截图 6.下载连接 7.注意事项 1.HTML <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-s…

IDEA 中的调试方式(以 java 为例)

文章目录 IDEA 中的调试方式(以 java 为例)1. 基本介绍2. 断点调试的快捷键2.1 设置断点并启动调试2.3 快捷键 IDEA 中的调试方式(以 java 为例) 在开发中查找错误的时候&#xff0c;我们可以用断点调试&#xff0c;一步一步的看源码执行的过程&#xff0c;从而发现错误所在。 …

WEB前端01-HTML5基础(01)

一.WEB相关概念 软件架构 C/S: Client/Server &#xff08;客户端/服务器端&#xff09;&#xff1a;在用户本地有一个客户端程序&#xff0c;在远程有一个服务器端程序 优点&#xff1a;用户体验好 缺点&#xff1a;开发、安装&#xff0c;部署&#xff0c;维护麻烦 B/S: Br…

【银河麒麟服务器操作系统】系统夯死分析及处理建议

了解银河麒麟操作系统更多全新产品&#xff0c;请点击访问麒麟软件产品专区&#xff1a;https://product.kylinos.cn 服务器环境以及配置 【机型】物理机 处理器&#xff1a; Intel 内存&#xff1a; 512G 整机类型/架构&#xff1a; X86_64 【内核版本】 4.19.90-25…

IDEA的JAVA版本没有8怎么办

问题&#xff1a; 很多小伙伴会出现如下的情况&#xff0c;java的版本很高&#xff0c;没有8 解决 更换IDEA内置的Server URL的镜像地址 就是这个 把其中的地址换成 https://start.aliyun.com/ https://start.aliyun.com/ 我们可以看到JAVA 8就出现了

Mysql的语句执行很慢,如何分析排查?

1、检查服务器性能是否存在瓶颈 如果系统资源使用率比较高&#xff0c;比如CPU,硬盘&#xff0c;那访问肯定会慢&#xff0c;如果你发现是Mysl占比比较高&#xff0c;说明Mysql的读写频率高&#xff0c;如果本身网站访问量不大&#xff0c;说明你的sql参数&#xff0c;sql语句查…

气膜建筑的消防应急门:安全与保障—轻空间

气膜建筑&#xff0c;作为一种现代化的建筑形式&#xff0c;以其独特的结构和多功能用途受到广泛欢迎。然而&#xff0c;消防安全作为任何建筑的核心问题&#xff0c;尤其受到关注。为了确保在紧急情况下的安全疏散&#xff0c;气膜建筑在设计和建设过程中&#xff0c;特别重视…

网络安全高级工具软件100套

1、 Nessus&#xff1a;最好的UNIX漏洞扫描工具 Nessus 是最好的免费网络漏洞扫描器&#xff0c;它可以运行于几乎所有的UNIX平台之上。它不止永久升级&#xff0c;还免费提供多达11000种插件&#xff08;但需要注册并接受EULA-acceptance–终端用户授权协议&#xff09;。 它…

LabVIEW阀门运动PCT测试

开发了一套基于LabVIEW的阀门运动PCT&#xff08;Pressure-Composition-Temperature&#xff09;测试方法。该系统通过控制阀门运动&#xff0c;实现对氢气吸附和解吸过程的精确测量和控制。所用硬件包括NI cDAQ-9174数据采集模块、Omega PX309压力传感器、SMC ITV2030电动调节…

Intel 和 ARM 对ROP/COP/JOP的缓解措施

文章目录 前言一、ROP1.1 Intel1.2 ARM 二、COP/JOP2.1 Intel2.2 ARM 前言 前向转移(forward)&#xff1a;将控制权定向到程序中一个新位置的转移方式, 就叫做前向转移, 比如jmp和call指令。这里我们主要保护的间接跳转&#xff0c;间接跳转是运行时才知道函数地址&#xff0c…

虚幻引擎ue5如何调节物体锚点

当发现锚点不在物体上时&#xff0c;如何调节瞄点在物体上。 步骤1&#xff1a;按住鼠标中键拖动锚点&#xff0c;在透视图中多次调节锚点位置。 步骤2:在物体上点击鼠标右键点击-》锚定--》“设置为枢轴偏移”即可。

百日筑基第十九天-一头扎进消息队列2

百日筑基第十九天-一头扎进消息队列2 消息队列的通讯协议 目前业界的通信协议可以分为公有协议和私有协议两种。公有协议指公开的受到认可的具有规 范的协议&#xff0c;比如 JMS、HTTP、STOMP 等。私有协议是指根据自身的功能和需求设计的协 议&#xff0c;一般不具备通用性&…

AI网络爬虫023:用deepseek批量提取天工AI的智能体数据

文章目录 一、介绍二、输入内容三、输出内容一、介绍 天工AI的智能体首页: F12查看真实网址和响应数据: 翻页规律: https://work.tiangong.cn/agents_api/square/sq_list_by_category?category_id=7&offset=0 https://work.tiangong.cn/agents_api/square/sq_list_b…