文章目录
- 避免重复消费(保证消息幂等性)
- 消息积压
- 上线更多的消费者,进行正常消费
- 惰性队列
- 消息缓存
- 延时队列
- RabbitMQ如何保证消息的有序性?
- RabbitMQ消息的可靠性、延时队列
- 如何实现数据库与缓存数据一致?
- 开启消费者多线程消费
避免重复消费(保证消息幂等性)
-
方式1: 消息全局ID或者写个唯一标识(如时间戳、UUID等) :每次消费消息之前根据消息id去判断该消息是否已消费过,如果已经消费过,则不处理这条消息,否则正常消费消息,并且进行入库操作。(消息全局ID作为数据库表的主键,防止重复)
-
方式2: 利用Redis的setnx 命令:给消息分配一个全局ID,只要消费过该消息,将 < id,message>以K-V键值对形式写入redis,消费者开始消费前,先去redis中查询有没消费记录即可
-
方式3: rabbitMQ的每一个消息都有
redelivered
字段,可以获取是否是被重新投递过来的,而不是第一次投递过来的
发送消息
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*/
public void sendMessage() {
// 创建消费对象,并指定 全局唯一ID(这里使用UUID,也可以根据业务规则生成,只要保证全局唯一即可)
MessageProperties messageProperties = new MessageProperties ();
messageProperties.setMessageId (UUID.randomUUID ().toString ());
messageProperties.setContentType ("text/plain");
messageProperties.setContentEncoding ("utf-8");
Message message = new Message ("hello,message idempotent!".getBytes (), messageProperties);
System.out.println ("生产消息:" + message.toString ());
rabbitTemplate.convertAndSend (EXCHANGE_NAME, ROUTE_KEY, message);
}
消费消息
/**
* 消费消息
*
* @param message
* @param channel
* @throws IOException
*/
@RabbitHandler
//org.springframework.amqp.AmqpException: No method found for class [B 这个异常,并且还无限循环抛出这个异常。
//注意@RabbitListener位置,笔者踩坑,无限报上面的错,还有另外一种解决方案: 配置转换器
@RabbitListener(queues = "message_idempotent_queue")
@Transactional
public void handler(Message message, Channel channel) throws IOException {
/**
* 发送消息之前,根据消息全局ID去数据库中查询该条消息是否已经消费过,如果已经消费过,则不再进行消费。
*/
// 获取消息Id
String messageId = message.getMessageProperties ().getMessageId ();
if (StringUtils.isBlank (messageId)) {
logger.info ("获取消费ID为空!");
return;
}
MessageIdempotent messageIdempotent = null;
Optional<MessageIdempotent> list = messageIdempotentRepository.findById (messageId);
if (list.isPresent ()) {
messageIdempotent = list.get ();
}
// 如果找不到,则进行消费此消息
if (null == messageIdempotent) {
//获取消费内容
String msg = new String (message.getBody (), StandardCharsets.UTF_8);
logger.info ("-----获取生产者消息-------------------->" + "messageId:" + messageId + ",消息内容:" + msg);
//手动ACK
channel.basicAck (message.getMessageProperties ().getDeliveryTag (), false);
//存入到表中,标识该消息已消费
MessageIdempotent idempotent = new MessageIdempotent ();
idempotent.setMessageId (messageId);
idempotent.setMessageContent (msg);
messageIdempotentRepository.save (idempotent);
} else {
//如果根据消息ID(作为主键)查询出有已经消费过的消息,那么则不进行消费;
logger.error ("该消息已消费,无须重复消费!");
}
}
消息积压
上线更多的消费者,进行正常消费
线上突发问题,要临时扩容,增加消费端的数量
考虑到
消费者的处理能力
,增加配置!!!
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
simple代表简单队列模型
惰性队列
//基于@Bean声明lazy-queue
@Bean
public Queue lazyQueue() {
return QueueBuilder
.durable("lazy.queue")
.lazy() //开启x-queue-mode为lazy
.build();
}
//基于@RabbitListener声明LazyQueue
@RabbitListener(queuesToDeclare = {
@Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
)
})
public void listenLazyQueue(String msg) {
System.out.println("接收到lazy.queue的消息:【" + msg + "】");
}
惰性队列的优点有哪些?
- 基于磁盘存储,消息上限高
- 没有间歇性的
page-out
,性能比较稳定
消息缓存
使用Redis的List或ZSET做接收消息缓存,写一个程序 按照消费者处理时间定时从Redis取消息发送到MQ
延时队列
设置消息过期时间,过期后转入死信队列,写一个程序 处理死信消息(重新如队列或者 即使处理或记录到数据库延后处理)
RabbitMQ如何保证消息的有序性?
RabbitMQ是队列存储,天然具备先进先出的特点,只要消息的发送是有序的,那么理论上接收也是有序的。不过当一个队列绑定了多个消费者时,可能出现消息轮询投递给消费者的情况,而消费者的处理顺序就无法保证
因此,要保证消息的有序性,需要做的下面几点:
- 保证消息发送的有序性
- 保证一组有序的消息都发送到同一个队列
- 保证一个队列只包含一个消费者
这样也会造成吞吐量下降,可以
在消费者内部采用多线程的方式消费
RabbitMQ消息的可靠性、延时队列
RabbitMQ消息可靠性、延时队列
如何实现数据库与缓存数据一致?
实现方案有下面几种:
- 本地缓存同步:当前微服务的数据库数据与缓存数据同步,可以直接在数据库修改时加入对Redis的修改逻辑,保证一致。
- 跨服务缓存同步:服务A调用了服务B,并对查询结果缓存。服务B数据库修改,可以
通过MQ通知服务A
,服务A修改Redis缓存数据
- 通用方案:使用
Canal框架
,伪装成MySQL的salve节点,监听MySQL的binLog变化,然后修改Redis缓存数据
开启消费者多线程消费
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class SpringRabbitListener {
/**
* @RabbitListener:加了该注解的方法表示该方法是一个消费者 concurrency:并发数量。
* 其他属性和注解想了解的话,自己按Ctrl点进去看
*/
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "Queue1"),
exchange = @Exchange(value = "Exchange1"),
key = "key1"
),
concurrency = "10"
)
public void process1(Message message) throws Exception {
System.out.println("Queue1:" + new String(message.getBody()));
}
}
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class RabbitmqConfig {
@Bean("batchQueueRabbitListenerContainerFactory")
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory ();
factory.setConnectionFactory (connectionFactory);
factory.setMessageConverter (new Jackson2JsonMessageConverter ());
//确认方式,manual为手动ack.
factory.setAcknowledgeMode (AcknowledgeMode.MANUAL);
//每次处理数据数量,提高并发量
//factory.setPrefetchCount(250);
//设置线程数
//factory.setConcurrentConsumers(30);
//最大线程数
//factory.setMaxConcurrentConsumers(50);
/* setConnectionFactory:设置spring-amqp的ConnectionFactory。 */
factory.setConnectionFactory (connectionFactory);
factory.setConcurrentConsumers (1);
factory.setPrefetchCount (1);
//factory.setDefaultRequeueRejected(true);
//使用自定义线程池来启动消费者。
factory.setTaskExecutor (taskExecutor ());
return factory;
}
@Bean("correctTaskExecutor")
@Primary
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor ();
// 设置核心线程数
executor.setCorePoolSize (100);
// 设置最大线程数
executor.setMaxPoolSize (100);
// 设置队列容量
executor.setQueueCapacity (0);
// 设置线程活跃时间(秒)
executor.setKeepAliveSeconds (300);
// 设置默认线程名称
executor.setThreadNamePrefix ("thread-file-queue");
// 设置拒绝策略rejection-policy:当pool已经达到max size的时候,丢弃
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown (true);
return executor;
}
}