Rabbit消息的可靠性

news2025/1/12 12:21:42
    1. Confirm模式简介

消息的confirm确认机制,是指生产者投递消息后,到达了消息服务器Broker里面的exchange交换机,则会给生产者一个应答,生产者接收到应答,用来确定这条消息是否正常的发送到Broker的exchange中,这也是消息可靠性投递的重要保障;

    1. 具体代码设置

yml 

server:
  port: 8080

spring:
  application:
    name: confirm-learn1

  rabbitmq:
    host: 192.168.126.130
    port: 5672
    username: admin
    password: 123456
    virtual-host: powernode
    publisher-confirm-type: correlated # 开启生产者的确认模式,设置关联模式

my:
  exchangeName: exchange.confirm.1
  queueName: queue.confirm.1

 

 

配置类

package com.powernode.config;


import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    @Value("${my.exchangeName}")
    private String exchangeName;
    @Value("${my.queueName}")
    private String queueName;

    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange(exchangeName).build();
    }

    @Bean
    public Queue queue(){
        return QueueBuilder.durable(queueName).build();
    }

    @Bean
    public Binding binding(DirectExchange directExchange,Queue queue){
        return BindingBuilder.bind(queue).to(directExchange).with("info");
    }


}

 

写法一

 配置回调类

 

package com.powernode.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyConfirmCallBack implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("关联id为:{}",correlationData.getId()+"");
        if (ack){
            log.info("消息正确的达到交换机");
            return;
        }
        //ack =false 没有到达交换机
        log.error("消息没有到达交换机,原因为:{}",cause);

    }
}

 

 发送消息类

package com.powernode.service;

import com.powernode.config.MyConfirmCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;

@Service
@Slf4j
public class MessageService {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private MyConfirmCallBack confirmCallBack;

    @PostConstruct //构造方法后执行它,相当于初始化作用
    public void init(){
        rabbitTemplate.setConfirmCallback(confirmCallBack);
    }

    public void sendMsg(){
        Message message= MessageBuilder.withBody("hello world".getBytes()).build();
        CorrelationData correlationData=new CorrelationData(); //关联数据
        correlationData.setId("order_123456"); //发送订单信息
        rabbitTemplate.convertAndSend("exchange.confirm.1","info",message,correlationData);
        log.info("消息发送完毕,发送时间为:{}",new Date());
    }
}

 启动类

package com.powernode;

import com.powernode.service.MessageService;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.annotation.Resource;

@SpringBootApplication
public class Application implements ApplicationRunner {
    @Resource
    private MessageService messageService;
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        messageService.sendMsg();
    }
}

 方法二

利用lambda 可以省掉配置回调类

package com.powernode.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;

@Service
@Slf4j
public class MessageService{
    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct //构造方法后执行它,相当于初始化作用
    public void init(){
        rabbitTemplate.setConfirmCallback(
                //lambda 表达式
                (correlationData, ack, cause)->{
                    log.info("关联id为:{}",correlationData.getId()+"");
                    if (ack){
                        log.info("消息正确的达到交换机");
                        return;
                    }
                    //ack =false 没有到达交换机
                    log.error("消息没有到达交换机,原因为:{}",cause);
                }

        );
    }

    public void sendMsg(){
        Message message= MessageBuilder.withBody("hello world".getBytes()).build();
        CorrelationData correlationData=new CorrelationData(); //关联数据
        correlationData.setId("order_123456"); //发送订单信息
        rabbitTemplate.convertAndSend("exchange.confirm.4dddd","info",message,correlationData);
        log.info("消息发送完毕,发送时间为:{}",new Date());
    }


}

  1. RabbitMQ消息Return模式

  1. 消息可靠性投递

rabbitmq 整个消息投递的路径为:

 

producer —> exchange —> queue —> consumer

 

>> 消息从 producer 到 exchange 则会返回一个 confirmCallback;

>> 消息从 exchange –> queue 投递失败则会返回一个 returnCallback;

我们可以利用这两个callback控制消息的可靠性投递;

开启 确认模式;

使用rabbitTemplate.setConfirmCallback设置回调函数,当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理;

注意配置文件中,开启 退回模式;

spring.rabbitmq.publisher-returns: true

使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到

queue失败后,则会将消息退回给producer,并执行回调函数returnedMessage;

 

yml

 

server:
  port: 8080

spring:
  application:
    name: ttl-learn1

  rabbitmq:
    host: 192.168.126.130
    port: 5672
    username: admin
    password: 123456
    virtual-host: powernode
    publisher-returns: true #开启return模式

my:
  exchangeName: exchange.return.1
  queueName: queue.return.1

配置类

package com.powernode.config;


import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    @Value("${my.exchangeName}")
    private String exchangeName;
    @Value("${my.queueName}")
    private String queueName;

    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange(exchangeName).build();
    }

    @Bean
    public Queue queue(){
        return QueueBuilder.durable(queueName).build();
    }

    @Bean
    public Binding binding(DirectExchange directExchange,Queue queue){
        return BindingBuilder.bind(queue).to(directExchange).with("info");
    }
}

方式一 

发送消息类

package com.powernode.service;

import com.powernode.config.MyReturnCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;

@Service
@Slf4j
public class MessageService {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Resource
    private MyReturnCallBack myReturnCallBack;

    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(myReturnCallBack); //设置回调
    }

    public void sendMsg(){
        Message message= MessageBuilder.withBody("hello world".getBytes())
                .build();
        rabbitTemplate.convertAndSend("exchange.return.1","info1111",message);
        log.info("消息发送完毕,发送时间为:{}",new Date());
    }
}

 

      回调配置类

package com.powernode.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

/**
     * 当消息从交换机 没有正确地 到达队列,则会触发该方法
     * 如果消息从交换机 正确地 到达队列了,那么就不会触发该方法
     *
     * @param returned
     */
@Component
@Slf4j
public class MyReturnCallBack implements RabbitTemplate.ReturnsCallback {
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.error("消息从交换机没有正确的路由到(投递到)队列,原因为:{}",returnedMessage.getReplyText());
    }
}

 

方式二 lambda

package com.powernode.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;

@Service
@Slf4j
public class MessageService{
    @Resource
    private RabbitTemplate rabbitTemplate;



    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(
                //使用lambda表达式
                message->{
                    log.error("消息从交换机没有正确的路由到(投递到)队列,原因为:{}",message.getReplyText());
                }

        ); //设置回调
    }

    public void sendMsg(){
        Message message= MessageBuilder.withBody("hello world".getBytes())
                .build();
        rabbitTemplate.convertAndSend("exchange.return.4","info",message);
        log.info("消息发送完毕,发送时间为:{}",new Date());
    }

}

 

RabbitMQ交换机详细属性

3.1具体参数

1、Name:交换机名称;就是一个字符串

2、Type:交换机类型,direct, topic, fanout, headers四种

3、Durability:持久化,声明交换机是否持久化,代表交换机在服务器重启后是否还存在;

4、Auto delete:是否自动删除,曾经有队列绑定到该交换机,后来解绑了,那就会自动删除该交换机;

5、Internal:内部使用的,如果是yes,客户端无法直接发消息到此交换机,它只能用于交换机与交换机的绑定。

6、Arguments:只有一个取值alternate-exchange,表示备用交换机;

3.2代码演示

结论1:没发消息之前不会创建交换机和对列

结论2:发消息后,如果交换机不存在,才开始创建交换机,如果队列不存在,则创建新的对列

结论3:创建交换机或者队列完成后再重新创建,如果修改交换机或队列参数则会报错

406错误(inequivalent arg 'durable' for exchange 'exchange.durability' in vhost 'powernode': received 'false' but current is 'true', class-id=40, method-id=10))

结论4:设置持久化为false ,重启rabbitmq-server,则交换机丢失,实验durable参数,先看下控制台,然后重启rabbitmq-server

结论5:实验自动删除为 true ,从控制台上手动解绑,会发现自动删除

3.3 备用交换机

3.3.1 备用交换机使用场景

当消息经过交换器准备路由给队列的时候,发现没有对应的队列可以投递信息,在rabbitmq中会默认丢弃消息,如果我们想要监测哪些消息被投递到没有对应的队列,我们可以用备用交换机来实现,可以接收备用交换机的消息,然后记录日志或发送报警信息。

3.3.2 主要代码和注意事项

备用交换机示例如下:

注意:备用交换机一般使用fanout交换机

测试时:指定一个错误路由

重点:普通交换机设置参数绑定到备用交换机

Map<String, Object> arguments = new HashMap<>();

//指定当前正常的交换机的备用交换机是谁

arguments.put("alternate-exchange", EXCHANGE_ALTERNATE);

//DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)

return new DirectExchange(EXCHANGE, true, false, arguments);

//return ExchangeBuilder.directExchange(EXCHANGE).withArguments(args).build();

3.3.3 参考配置代码

yml

server:
  port: 8080

spring:
  application:
    name: ttl-learn1

  rabbitmq:
    host: 192.168.126.130
    port: 5672
    username: admin
    password: 123456
    virtual-host: powernode

my:
  exchangeNormalName: exchange.normal.alternate  #正常交换机
  exchangeAlternateName: exchange.alternate.1 #备用交换机
  queueNormalName: queue.normal.alternate  #正常队列
  queueAlternateName: queue.alternate.1  # 备用队列

配置类 

package com.powernode.config;


import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    @Value("${my.exchangeNormalName}")
    private String exchangeNormalName;
    @Value("${my.exchangeAlternateName}")
    private String exchangeAlternateName;
    @Value("${my.queueNormalName}")
    private String queueNormalName;

    @Value("${my.queueAlternateName}")
    private String queueAlternateName;

    @Bean
    public DirectExchange normalExchange(){
        return ExchangeBuilder // 默认为持久化的,默认不自动删除
                .directExchange(exchangeNormalName) // 交换机的名字
                .alternate(exchangeAlternateName) //设置备用交换机 alternate-exchange
                .build();
    }

    @Bean
    public Queue queueNormal(){
        return QueueBuilder.durable(queueNormalName).build();
    }

    @Bean
    public Binding binding(DirectExchange normalExchange,Queue queueNormal){
        return BindingBuilder.bind(queueNormal).to(normalExchange).with("info");
    }

    @Bean //备用交换机
    public FanoutExchange alternateExchange(){
        return ExchangeBuilder.fanoutExchange(exchangeAlternateName).build();
    }

    @Bean
    public Queue alternateQueue(){
        return QueueBuilder.durable(queueAlternateName).build();
    }

    @Bean
    public Binding bindingAlternate(FanoutExchange alternateExchange,Queue alternateQueue){
        return BindingBuilder.bind(alternateQueue).to(alternateExchange);
    }
}

3.3.4 参考发送消息代码

@Service

public class MessageService {

    @Resource

    private RabbitTemplate rabbitTemplate;

    /**

     * 发送消息

     */

public void sendMessage() {

//我们故意写错路由key,由于我们正常交换机设置了备用交换机,所以该消息就会进入备用交换机
//从而进入备用对列,我们可以写一个程序接收备用对列的消息,接收到后通知相关人员进行处理
//如果正常交换机没有设置备用交换机,则该消息会被抛弃。

        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, "info1223", "hello");

        System.out.println("消息发送完毕......");

    }

}

RabbitMQ队列详细属性

4.1 具体参数

Type:队列类型

Name:队列名称,就是一个字符串,随便一个字符串就可以;

Durability:声明队列是否持久化,代表队列在服务器重启后是否还存在;

Auto delete: 是否自动删除,如果为true,当没有消费者连接到这个队列的时候,队列会自动删除;

Exclusive:exclusive属性的队列只对首次声明它的连接可见,并且在连接断开时自动删除;

基本上不设置它,设置成false

Arguments:队列的其他属性,例如指定DLX(死信交换机等);

1、x-expires:Number

当Queue(队列)在指定的时间未被访问,则队列将被自动删除;

2、x-message-ttl:Number

发布的消息在队列中存在多长时间后被取消(单位毫秒);

3、x-overflow:String

设置队列溢出行为,当达到队列的最大长度时,消息会发生什么,有效值为Drop HeadReject Publish

4、x-max-length:Number

队列所能容下消息的最大长度,当超出长度后,新消息将会覆盖最前面的消息,类似于Redis的LRU算法;

5、 x-single-active-consumer:默认为false

激活单一的消费者,也就是该队列只能有一个消息者消费消息;

6、x-max-length-bytes:Number

限定队列的最大占用空间,当超出后也使用类似于Redis的LRU算法;

7、x-dead-letter-exchange:String

指定队列关联的死信交换机,有时候我们希望当队列的消息达到上限后溢出的消息不会被删除掉,而是走到另一个队列中保存起来;

8.x-dead-letter-routing-key:String

指定死信交换机的路由键,一般和6一起定义;

9.x-max-priority:Number

如果将一个队列加上优先级参数,那么该队列为优先级队列;

(1)、给队列加上优先级参数使其成为优先级队列

x-max-priority=10【0-255取值范围】

(2)、给消息加上优先级属性

通过优先级特性,将一个队列实现插队消费;

MessageProperties messageProperties=new MessageProperties();
messageProperties.setPriority(8);

10、x-queue-mode:String(理解下即可)

队列类型x-queue-mode=lazy懒队列,在磁盘上尽可能多地保留消息以减少RAM使用,如果未设置,则队列将保留内存缓存以尽可能快地传递消息;

11、x-queue-master-locator:String(用的较少,不讲)

在集群模式下设置队列分配到的主节点位置信息;

每个queue都有一个master节点,所有对于queue的操作都是事先在master上完成,之后再slave上进行相同的操作;

每个不同的queue可以坐落在不同的集群节点上,这些queue如果配置了镜像队列,那么会有1个master和多个slave。

基本上所有的操作都落在master上,那么如果这些queues的master都落在个别的服务节点上,而其他的节点又很空闲,这样就无法做到负载均衡,那么势必会影响性能;

关于master queue host 的分配有几种策略,可以在queue声明的时候使用x-queue-master-locator参数,或者在policy上设置queue-master-locator,或者直接在rabbitmq的配置文件中定义queue_master_locator,有三种可供选择的策略:

(1)min-masters:选择master queue数最少的那个服务节点host;

(2)client-local:选择与client相连接的那个服务节点host;

(3)random:随机分配;

4.2 参考代码

@Configuration

public class RabbitConfig {

    public static final String EXCHANGE = "exchange";

    public static final String QUEUE = "queue";

    public static final String KEY = "info";

    QueueBuilder builder;

    @Bean

    public DirectExchange directExchange() {

        return ExchangeBuilder.directExchange(EXCHANGE).build();

    }

    @Bean

    public Queue queue() {

        Map<String, Object> arguments = new HashMap<>();

        //arguments.put("x-expires", 5000);

        //arguments.put("x-max-length", 5);

        //arguments.put("x-overflow", "reject-publish");

        arguments.put("x-single-active-consumer", false); //TODO ???

        //arguments.put("x-max-length-bytes", 20); // 单位是字节

        //arguments.put("x-max-priority", 10); // 0-255 //表示把当前声明的这个队列设置成了优先级队列,那么该队列它允许消息插队

        //将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM内存的使用,如果未设置,队列将保留内存缓存以尽可能快地传递消息;

        //有时候我们把这种队列叫:惰性队列

        //arguments.put("x-queue-mode", "lazy");

        //设置队列版本。默认为版本1。

        //版本1有一个基于日志的索引,它嵌入了小消息。

        //版本2有一个不同的索引,可以在许多场景中提高内存使用率和性能,并为以前嵌入的消息提供了按队列存储。

        //arguments.put("x-queue-version", 2);

        // x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。

        //arguments.put("x-queue-master-locator", QueueBuilder.LeaderLocator.clientLocal.getValue());

        //-------------------------

        //arguments.put("x-expires", 10000); //自动过期,10秒

        //arguments.put("x-message-ttl", 10000); //自动过期,10秒,不会删除队列

        //QueueBuilder 类里面有定义,设置队列溢出行为,当达到队列的最大长度时消息会发生什么,有效值是drop-head、reject-publish

        //arguments.put("x-max-length", 5);

        //arguments.put("x-overflow", QueueBuilder.Overflow.dropHead.getValue());

        //表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)

        //arguments.put("x-single-active-consumer", true);

        // x-max-length-bytes,队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;

        //arguments.put("x-max-length-bytes", 10);

        //参数是1到255之间的正整数,表示队列应该支持的最大优先级,数字越大代表优先级越高,没有设置priority优先级字段,那么priority字段值默认为0;如果优先级队列priority属性被设置为比x-max-priority大,那么priority的值被设置为x-max-priority的值。

        //arguments.put("x-max-priority", 10);

        //将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;

        //arguments.put("x-queue-mode", "lazy");

        arguments.put("x-queue-version", 2);

        // x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。

        arguments.put("x-queue-master-locator", QueueBuilder.LeaderLocator.clientLocal.getValue());

        //---------------------------------------------

        // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)

        return new Queue(QUEUE, true, false, false, arguments);

    }

    @Bean

    public Binding binding(DirectExchange directExchange, Queue queue) {

        return BindingBuilder.bind(queue).to(directExchange).with(KEY);

    }

}

实验durable 参数 重启rabbitmq-server,队列丢失

实验autodelete参数:加入接收者,发现停掉服务,那么久没有消费者了,对列就会自动删除

 

消息可靠性投递

消息的可靠性投递就是要保证消息投递过程中每一个环节都要成功,那么这肯定会牺牲一些性能,性能与可靠性是无法兼得的;

如果业务实时一致性要求不是特别高的场景,可以牺牲一些可靠性来换取性能。

 

 

 

 

 

确保消息在队列正确地存储

可能因为系统宕机、重启、关闭等等情况导致存储在队列的消息丢失,即③出现问题;

解决方案:

  1. 、队列持久化

代码:

QueueBuilder.durable(QUEUE).build();

  1. 、交换机持久化

代码:

ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();

  1. 、消息持久化

代码:

默认持久化

 MessageProperties messageProperties = new MessageProperties();

//设置消息持久化,当然它默认就是持久化,所以可以不用设置,可以查看源码

 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

  1. 、集群,镜像队列,高可用

 

  1. 确保消息从队列正确地投递到消费者

采用消息消费时的手动ack确认机制来保证;

如果消费者收到消息后未来得及处理即发生异常,或者处理过程中发生异常,会导致④失败。

为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement);

#开启手动ack消息消费确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual

消费者在订阅队列时,通过上面的配置,不自动确认,采用手动确认,RabbitMQ会等待消费者显式地回复确认信号后才从队列中删除消息;

如果消息消费失败,也可以调用basicReject()或者basicNack()来拒绝当前消息而不是确认。如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者(当然,只有一个消费者的时候,这种方式可能会出现无限循环重复消费的情况,可以投递到新的队列中,或者只打印异常日志);

  1. 消息的幂等性

消息消费时的幂等性(消息不被重复消费)

同一个消息,第一次接收,正常处理业务,如果该消息第二次再接收,那就不能再处理业务,否则就处理重复了;

幂等性是:对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复的请求而对该资源重复造成影响;

以接口幂等性举例:

接口幂等性是指:一个接口用同样的参数反复调用,不会造成业务错误,那么这个接口就是具有幂等性的;

注册接口;

发送短信验证码接口;

比如同一个订单我支付两次,但是只会扣款一次,第二次支付不会扣款,这说明这个支付接口是具有幂等性的;

如何避免消息的重复消费问题?(消息消费时的幂等性)

全局唯一ID + Redis

生产者在发送消息时,为每条消息设置一个全局唯一的messageId,消费者拿到消息后,使用setnx命令,将messageId作为key放到redis中:setnx(messageId, 1),若返回1,说明之前没有消费过,正常消费;若返回0,说明这条消息之前已消费过,抛弃;

具体代码参考以下代码;

参考代码:

  //1、把消息的唯一ID写入redis

        boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("idempotent:" + orders.getId(), String.valueOf(orders.getId())); //如果redis中该key不存在,那么就设置,存在就不设置

        if (flag) { //key不存在返回true

            //相当于是第一次消费该消息

            //TODO 处理业务

            System.out.println("正常处理业务....." + orders.getId());

        }

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1035233.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

黑马JVM总结(二十二)

&#xff08;1&#xff09;类的结构-field 成员变量信息 类字节码里的一些简单表示&#xff1a; &#xff08;2&#xff09;类文件结构-method-init &#xff08;3&#xff09;类文件结构-method-main &#xff08;4&#xff09;类文件结构-附加属性

Java基础(六)

前言&#xff1a;本篇博客学习Junit单元测试框架的使用以及常见的注解。 目录 单元测试 Junit单元测试框架 常见注解 单元测试 什么是单元测试&#xff1f; 针对最小的功能单元&#xff08;方法&#xff09;&#xff0c;编写测试代码对其进行正确性测试。 Junit单元测试框…

十六、MySql的MVCC机制CONNECT(收官!)

文章目录 一、数据库并发的场景有三种&#xff1a;二、读-写&#xff08;一&#xff09;3个记录隐藏列字段&#xff08;二&#xff09;undo 日志&#xff08;三&#xff09;模拟 MVCC&#xff08;四&#xff09;一些思考&#xff08;五&#xff09;Read View 一、数据库并发的场…

interview6-jvm篇

JVM(Java Virtual Machine)Java程序的运行环境&#xff08;java二进制字节码的运行环境&#xff09; 在JVM中共有四大部分&#xff0c;分别是ClassLoader&#xff08;类加载器&#xff09;、Runtime DataArea&#xff08;运行时数据区&#xff0c;内存分区&#xff09;、Execu…

整合车辆出险报告Api接口,轻松管理车险理赔!

随着车辆保有量的不断增加&#xff0c;车辆出险的情况也越来越普遍。对于车主来说&#xff0c;如何高效地管理车险理赔&#xff0c;处理保险事故是非常重要的。这时候我们就可以借助整合车辆出险报告API接口&#xff0c;实现快速定位理赔信息&#xff0c;轻松管理车险理赔。 一…

【C++面向对象侯捷】8.栈,堆和内存管理

文章目录 栈&#xff0c;堆stack object的生命周期static local object的生命周期global object的生命周期heap objects 的生命期new&#xff1a;先分配memory&#xff0c;再调用构造函数delete: 先调用析构函数&#xff0c;再释放 memory动态分配所得的内存块&#xff0c;in V…

VirtualBox解决VERR_SUPDRV_COMPONENT_NOT_FOUND错误

简述 最近使用VirtualBox时发现其增强功能不能用了&#xff0c;也就是不能双向拖拉文件&#xff0c;整了很久不知所以&#xff1b;看到有网友说跟新其VBoxGuestAdditions.ios文件&#xff0c;所以直接把我的VirtualBox从6.x升级到了7.x&#xff0c;然后就发生了眼前的一幕&…

040_小驰私房菜_MTK平台,添加camera客制化size

全网最具价值的Android Camera开发学习系列资料~ 作者:8年Android Camera开发,从Camera app一直做到Hal和驱动~ 欢迎订阅,相信能扩展你的知识面,提升个人能力~ 【问题背景:】mtk8195平台,录像需要添加一组自定义size 2560 * 1600。 添加一组自定义size,我们需要确认一…

数据湖在爱奇艺数据中台的应用

01 我们眼中的数据湖 作为爱奇艺的数据中台团队&#xff0c;我们的核心任务是管理和服务公司内的大量数据资产。在实施数据治理的过程中&#xff0c;我们不断吸收新的理念&#xff0c;引入尖端的工具&#xff0c;以精细化我们的数据体系管理。“数据湖”作为近年来数据领域广泛…

为您的SSH提提速

SSH是运维和开发人员接触比较多的工具&#xff0c;一般用SSH来连接远程服务器&#xff0c;这个是我的一些免费客户和企业客户经常使用的场景&#xff0c;当然SSH除了远程连接之外&#xff0c;还有很多额外的用途&#xff0c;比如SSH本身是具备代理功能的&#xff0c;我们也有一…

HarmonyOS开发:封装一个便捷的Log工具类

前言 日志打印&#xff0c;没什么好说的&#xff0c;系统已给我们提供&#xff0c;且调用也是非常的简单&#xff0c;我们封装的目的&#xff0c;一是扩展&#xff0c;打印一些不常见的类型&#xff0c;比如格式化json&#xff0c;使得日志看起来比较好看&#xff0c;二是&…

【Java 基础篇】Java Consumer 接口详解

在Java编程中&#xff0c;有时需要对某个对象进行操作或者处理&#xff0c;而这个操作可能是非常灵活的。Java 8引入了函数式编程的特性&#xff0c;其中的一个重要接口就是Consumer接口。本文将详细介绍Consumer接口&#xff0c;包括它的定义、用法以及示例。 什么是 Consume…

【Java 基础篇】Java Supplier 接口详解

在Java中&#xff0c;Supplier接口是一个重要的函数式接口&#xff0c;它属于java.util.function包&#xff0c;用于表示一个供应商&#xff0c;它不接受任何参数&#xff0c;但可以提供一个结果。Supplier通常用于延迟计算或生成值的场景。本文将详细介绍Supplier接口的用法以…

矩阵的c++实现

在大学数学课程《线性代数》中&#xff0c;就有矩阵和行列式的出现&#xff0c;这篇文章主要讲矩阵在c中的实现和一些用途&#xff08;目前我知道的&#xff09; 此篇文章只写c的内容&#xff0c;不具体写到数学中矩阵的一些公式、性质。 本篇文章中一部分图片来自百度百科。…

Android 12,调用系统库libft2.so 遇到的各种问题记录

问题前提,Android 12系统,vendor静态库中调用 libft2.so。(vendor静态库中调用libft2.so会简单点,没这么麻烦) 【问题1】 (native:vendor) can not link against libft2 (native:platform) 本地debug尝试修改: 为了本地环境debug调试方便,我找了个 mk文件,在里面添加了…

《富足》—没有完善的个人,但是可以有完善的团队

摘要&#xff1a;在吴军老师《富足》一书上&#xff0c;阅读到一句话&#xff1a;“没有完善的个人&#xff0c;但是可以有完善的团队”。很认同这句&#xff0c;目前听见最多的可能是“没有完美的个人&#xff0c;只有完美的团队”&#xff0c;这句长挂在嘴边的话在社会工作多…

Win7开启触摸键盘方法

在Win7系统中&#xff0c;自带有触摸屏幕键盘&#xff0c;能够在屏幕上显示虚拟键盘&#xff0c;让用户可以用指针设备或触屏等进行输入操作&#xff0c;那么Win7系统怎么开启触摸键盘呢&#xff1f;想知道的小伙伴可以跟着我一起来学习一下。 1、首先打开Win7系统的开始菜单&a…

计算机竞赛 深度学习YOLO抽烟行为检测 - python opencv

文章目录 1 前言1 课题背景2 实现效果3 Yolov5算法3.1 简介3.2 相关技术 4 数据集处理及实验5 部分核心代码6 最后 1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 基于深度学习YOLO抽烟行为检测 该项目较为新颖&#xff0c;适合作为竞赛课…

方案:TSINGSEE青犀智能分析AI皮带撕裂算法的场景应用

在工地矿山等现实场景中&#xff0c;皮带运输在生产过程中是必不可少的&#xff0c;然而&#xff0c;由于长时间高强度的运转&#xff0c;皮带很容易发生撕裂、破损、跑偏等问题。这些问题会严重影响生产速度&#xff0c;甚至会导致严重的安全事故。为了有效预防此类安全事故发…

windows server 2019 、2012等服务器查看系统和应用程序日志

查看windows系统日志 点击左下角的windows按钮&#xff0c;输入事件两个字&#xff0c;会显示时间查看器 点击事件查看器&#xff0c;windows日志下面可以卡到系统日志和应用程序的日志 筛选时间范围内的日志 修改记录时间 选组自定义范围 选择事件事件 输入事件范围&#xff…