关于 自定义的RabbitMQ的RabbitMessageContainer注解-实现原理

news2025/1/10 13:00:08

概述

RabbitMessageContainer注解 的主要作用就是 替换掉@Configuration配置类中的各种@Bean配置;

采用注解的方式可以让我们 固化配置,降低代码编写复杂度、减少配置错误情况的发生,提升编码调试的效率、提高业务的可用性。

  • 为什么说“降低代码编写的复杂度”呢?因为,用一行注解代替了原本好几十行的代码。
  • 为什么说“减少配置错误情况的发生,提升编码调试的效率”呢?因为,开发者从其他@Configuration配置文件复制粘贴的代码,有时会忘记修改某些Bean名称,而启动又不会报错,最终会导致队列没有消费者,需要浪费时间排查问题。
  • 为什么说“提高业务的可用性”呢?因为,组件默认配置了死信队列机制,当消费失败的时候,将异常抛出即可重试,避免因为没有配置死信队列而导致消息丢失。(如果继承AbstractJdkSerializeListener/AbstractJsonSerializeListener可以在重试一定次数后将消息落库并且丢弃)

接入方式

该组件使用Spring Boot的自动装配能力,只需要引入pom依赖即可完成接入。

<dependency>
    <groupId>com.ccbscf</groupId>
    <artifactId>ccbscf-biz-enhancer-rabbitmq-starter</artifactId>
    <version>1.0.1-SNAPSHOT</version>
</dependency>

支持哪些能力?

简单来说,以前@Bean注入方式常用的能力,这个组件都支持,以下是具体注解信息及属性配置:

  • com.ccbscf.biz.enhancer.rabbitmq.annotation.RabbitMessageContainer注解
/**
 * 向spring中注入SimpleMessageListenerContainer容器
 * 暂时只对Container的acknowledgeMode、exposeListenerChannel、prefetchCount、concurrentConsumers、maxConcurrentConsumers提供了赋值的扩展,如果需要其他的字段赋值,需要升级组件
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RabbitMessageContainer {
    /**
     * container的name,向spring容器注入bean
     * @return
     */
    String value();
    /**
     * 定义绑定关系,队列、交换器、路由key的定义都在这里面
     * 这里为什么是定义数组呢,因为同一个Container是可以绑定多个队列的,因此这里是数组;
     * @return
     */
    QueueBinding[] bindings();

    /**
     * @return
     * @see AbstractMessageListenerContainer#setAcknowledgeMode(org.springframework.amqp.core.AcknowledgeMode)
     */
    AcknowledgeMode acknowledgeMode() default AcknowledgeMode.MANUAL;

    /**
     * @return
     * @see AbstractMessageListenerContainer#setExposeListenerChannel(boolean)
     */
    boolean exposeListenerChannel() default true;

    /**
     * @return
     * @see SimpleMessageListenerContainer#setPrefetchCount(int)
     */
    int prefetchCount() default 5;

    /**
     * @return
     * @see SimpleMessageListenerContainer#setConcurrentConsumers(int)
     */
    int concurrentConsumers() default 1;

    /**
     * @return
     * @see SimpleMessageListenerContainer#setMaxConcurrentConsumers(int)
     */
    int maxConcurrentConsumers() default 1;

    /**
     * 失败 抛出异常 捕捉到异常以后 是否进行重试 默认重试
     * @return
     */
    boolean needRetry() default true;

    /**
     * 自定义的Listener维度的重试次数上限
     * @return
     */
    int customerRetriesLimitForListener() default -1;

    /**
     * 重试时间间隔
     * @return
     */
    long retryTimeInterval() default -1;
}

上面是@RabbitMessageContainer注解的源代码;原本@Bean中SimpleMessageListenerContainer常用的参数设置,这里都进行了支持,如果有新的个性化字段赋值,可以对组件进行扩展,给注解增加字段,同时注入BeanDefinition的时候赋值即可。

除了实现@Bean方式常用字段,另外增加了以下几个功能字段:

  • needRetry:失败 抛出异常 捕捉到异常以后 是否进行重试? 默认重试
  • customerRetriesLimitForListener:自定义的Listener维度的重试次数上限,此优先级高于全局的次数上限配置
  • retryTimeInterval:重试时间间隔,固定时间间隔,不支持梯度;这个配置是加在队列参数上的,一旦配置生效,就无法修改,这个RabbitMQ的特性

为了理解起来更直观,下面展示出原有的@Bean注入方式的示例:

public static SimpleMessageListenerContainer buildSimpleMessageListenerContainer(Queue queue, ConnectionFactory connectionFactory, Object messageListener) {
    SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
    simpleMessageListenerContainer.setQueues(queue);
    simpleMessageListenerContainer.setMaxConcurrentConsumers(1);
    simpleMessageListenerContainer.setConcurrentConsumers(1);
    simpleMessageListenerContainer.setPrefetchCount(5);
    simpleMessageListenerContainer.setExposeListenerChannel(true);
    simpleMessageListenerContainer.setMessageListener(messageListener);
    simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return simpleMessageListenerContainer;
}

 

  • com.ccbscf.biz​​​​​​​.enhancer.rabbitmq.annotation.QueueBinding注解
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface QueueBinding {

    /**
     * 绑定关系的name,主要用于向容器中注入bean的名称
     * @return
     */
    String value();

    /**
     * @return the queue.
     */
    Queue queue();

    /**
     * @return the exchange.
     */
    Exchange exchange();

    /**
     * @return the routing key or pattern for the binding.
     */
    String key() default "";
}

上面是@QueueBinding注解的源代码;原本@Bean中Binding常用的参数设置,这里都进行了支持,如果有新的个性化字段赋值,可以对组件进行扩展,给注解增加字段,同时注入BeanDefinition的时候赋值即可。

为了理解起来更直观,下面展示出原有的@Bean注入方式的示例:

    @Bean
    public Binding sendSuperviseBinding(TopicExchange approveDocDatumTopicExchange) {
        return BindingBuilder.bind(sendSuperviseQueue()).to(approveDocDatumTopicExchange).with(DOC_DATUM_TOPIC_APPROVE_ROUTING_KEY);
    }
  • com.ccbscf.biz.enhancer.rabbitmq.annotation.Queue注解
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Queue {
    /**
     * @return the queue name or "" for a generated queue name (default).
     */
    String value();

    /**
     * @return true if the queue is to be declared as durable.
     */
    boolean durable() default true;

    /**
     * @return true if the queue is to be declared as exclusive.
     */
    boolean exclusive() default false;

    /**
     * @return true if the queue is to be declared as auto-delete.
     */
    boolean autoDelete() default false;

    /**
     * 是否延迟队列
     * @return
     */
    boolean delayConsumer() default false;

    /**
     * delayConsumer为true的情况下该字段才会生效,单位:ms
     * 如果设置了delayConsumer=true延迟队消费开启,但是未设置delayTime延迟消费时间,默认值是10分钟
     * @return
     */
    long delayTime() default -1;
}

上面是@Queue注解的源代码;原本@Bean中Queue常用的参数设置,这里都进行了支持,如果有新的个性化字段赋值,可以对组件进行扩展,给注解增加字段,同时注入BeanDefinition的时候赋值即可。

除了实现@Bean方式常用字段,另外增加了以下几个功能字段:

  • delayConsumer:是否延迟队列?默认为false,如果需要开启延迟消费的功能,需要配置为true
  • delayTime:delayConsumer为true的情况下该字段才会生效,单位:ms;如果设置了delayConsumer=true延迟队消费开启,但是未设置delayTime延迟消费时间,默认值是10分钟

为了理解起来更直观,下面展示出原有的@Bean注入方式的示例:

new Queue(queueName, true, false, false, params)

 

  • com.ccbscf.biz.enhancer.rabbitmq.annotation.Exchange注解
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Exchange {

    /**
     * @return the exchange name.
     */
    String value();

    /**
     * The exchange type - only DIRECT, FANOUT TOPIC, and HEADERS exchanges are supported.
     * @return the exchange type.
     */
    String type() default ExchangeTypes.TOPIC;

    /**
     * @return true if the exchange is to be declared as durable.
     */
    boolean durable() default true;

    /**
     * @return true if the exchange is to be declared as auto-delete.
     */
    boolean autoDelete() default false;
}

上面是@Exchange注解的源代码;原本@Bean中Exchange常用的参数设置,这里都进行了支持,如果有新的个性化字段赋值,可以对组件进行扩展,给注解增加字段,同时注入BeanDefinition的时候赋值即可。

为了理解起来更直观,下面展示出原有的@Bean注入方式的示例:

    @Bean
    public TopicExchange bizCcbDefaultTopicExchange() {
        return new TopicExchange(BIZ_CCB_DEFAULT_TOPIC_EXCHANGE, true, false);
    }

核心代码逻辑

其实,实现思路非常简单,原有方式:通过开发者定义@Bean配置向spring容器中添加BeanDefinition并生成单例Bean;新的方式:根据开发者配置的注解信息集中式的生成BeanDefinition并注册到spring容器即可。

至于绑定关系、队列、交换器向MQ消息中心注册的过程不受任何影响,因为本来@Bean就是在向容器注入bean而已;

核心代码都在这一个RabbitMqEnhancerBeanDefinitionRegistry类,这个类实现了BeanDefinitionRegistryPostProcessor接口,当然BeanDefinitionRegistryPostProcessor也继承了BeanFactoryPostProcessor接口,只不过我们只使用了BeanDefinitionRegistryPostProcessor具有的特性,向容器中注入BeanDefinition信息;至于spring生成单例bean的过程,我们不去干预还是交给spring来自行完成。

从@RabbitMessageContainer、@Queue、@Exchange、@QueueBinding注解中获取信息,创建相应的BeanDefinition并注册到容器中,由spring容器管理,充分利用spring现有机制,自动创建bean实例,尽可能减少硬编码干预spring的流程。

源代码如下:

/**
 * @ClassName RabbitMqEnhancerBeanDefinitionRegistry
 * @Description
 * 处理@RabbitMessageContainer、@Queue、@Exchange、@QueueBinding注解,以及创建相应的BeanDefinition注册到容器中;
 * 由spring容器管理,充分利用spring现有机制,自动创建bean实例,尽可能减少硬编码干预spring的流程。
 * 还有一种实现思路是:
 *  自定义一个BeanPostProcessor的实现类,同时实现BeanFactoryAware接口(目的是获取到BeanFactory,用ApplicationContextAware也行,但是BeanFactoryAware更好些);
 *  调用postProcessAfterInitialization方法,拦截Listener并识别注解信息,创建并注册BeanDefinition,调用BeanFactory的getBean方法,创建单例bean对象;
 *  这种方式不仅个性化spring的BeanDefinition的注册,而且还个性化了bean的创建过程,因此不是最优的方式。
 * @Author zhangyuxuan
 * @Date 2023/9/13 15:29
 * @Version 1.0
 */
public class RabbitMqEnhancerBeanDefinitionRegistry implements BeanDefinitionRegistryPostProcessor, EnvironmentAware {

    private Environment environment;

    /**
     * 处理@RabbitMessageContainer、@Queue、@Exchange、@QueueBinding注解,以及创建相应的BeanDefinition注册到容器中;
     * 由spring容器管理,充分利用spring现有机制,自动创建bean实例,尽可能减少硬编码干预spring的流程。
     *
     * @param registry
     * @throws BeansException
     */
    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
        for (String beanDefinitionName : registry.getBeanDefinitionNames()) {
            BeanFactory beanFactory = (BeanFactory) registry;
            //获取bean对应的Class
            Class<?> type = beanFactory.getType(beanDefinitionName);
            //获取RabbitMessageContainer注解
            RabbitMessageContainer rabbitMessageContainer = AnnotationUtils.findAnnotation(type, RabbitMessageContainer.class);
            if (rabbitMessageContainer == null) {
                continue;
            }
            //获取QueueBinding注解
            QueueBinding[] bindings = rabbitMessageContainer.bindings();
            if (bindings.length == 0) {
                continue;
            }
            //存储queue信息,都是实际消费消息 绑定Listener的队列
            List<String> queueNameList = new ArrayList<>();
            // 这里为什么是定义数组呢,因为同一个Container是可以绑定多个队列的,因此这里是数组;
            for (QueueBinding binding : bindings) {
                Queue queue = binding.queue();
                Exchange exchange = binding.exchange();
                //是否开启延迟消费功能
                boolean needDelay = queue.delayConsumer();
                //是否开启重试功能
                boolean needRetry = rabbitMessageContainer.needRetry();
                //死信重试路由key
                String retryRoutingKey = obtainDoConsumeQueue(queue, needDelay) + DL_ROUTING_KEY_SUFFIX;
                //延迟消费 实际消费的交换器
                String exchangeForDelay = environment.getProperty("spring.application.name", "") + DELAY_EXCHANGE_NAME_SUFFIX;
                //失败重试 死信交换器
                String exchangeForDl = environment.getProperty("spring.application.name", "") + DL_EXCHANGE_NAME_SUFFIX;
                //失败重试 重试交换器
                String exchangeForRetry = environment.getProperty("spring.application.name", "") + RETRY_EXCHANGE_NAME_SUFFIX;
                if (needDelay) {//延迟消费
                    String delayRoutingKey = queue.value() + DELAY_CONSUME_ROUTE_SUFFIX;//用于延迟消费
                    //用户定义的原队列
                    BindingWrapper bindingWrapper = BindingWrapper.generateBinding(binding.value(), binding.key())
                            .buildQueue(queue.value(), obtainMapForDelayQueue(delayRoutingKey, exchangeForDelay, queue.delayTime()), queue.durable(), queue.exclusive(), queue.autoDelete())
                            .buildExchange(exchange.value(), exchange.type(), exchange.durable(), exchange.autoDelete());
                    //注册用户定义的原队列相关配置
                    configRabbitMq(registry, bindingWrapper, true);
                    //实际消费消息的队列
                    BindingWrapper bindingWrapperConsume = BindingWrapper.generateBinding(binding.value() + DELAY_CONSUME_BINDING_SUFFIX, delayRoutingKey)
                            .buildQueue(obtainDoConsumeQueue(queue, true), obtainMapForConsumeQueue(needRetry, retryRoutingKey, exchangeForDl), queue.durable(), queue.exclusive(), queue.autoDelete())
                            .buildExchange(exchangeForDelay, exchange.type(), exchange.durable(), exchange.autoDelete());
                    //注册实际消费消息的队列相关配置,延迟交换器已经在配置中注册
                    configRabbitMq(registry, bindingWrapperConsume, false);
                    //存储queue信息,都是实际消费消息 绑定Listener的队列
                    queueNameList.add(bindingWrapperConsume.getQueueWrapper().getQueueName());
                } else {//非延迟消费
                    BindingWrapper bindingWrapper = BindingWrapper.generateBinding(binding.value(), binding.key())
                            .buildQueue(queue.value(), obtainMapForConsumeQueue(needRetry, retryRoutingKey, exchangeForDl), queue.durable(), queue.exclusive(), queue.autoDelete())
                            .buildExchange(exchange.value(), exchange.type(), exchange.durable(), exchange.autoDelete());//用户定义的原队列
                    configRabbitMq(registry, bindingWrapper, true);
                    //存储queue信息,都是实际消费消息 绑定Listener的队列
                    queueNameList.add(bindingWrapper.getQueueWrapper().getQueueName());
                }
                if (needRetry) {//是否需要重试
                    //死信队列
                    BindingWrapper bindingWrapperDl = BindingWrapper.generateBinding(binding.value() + DL_BINDING_SUFFIX, retryRoutingKey)
                            .buildQueue(queue.value() + DL_QUEUE_SUFFIX, obtainMapForDlQueue(retryRoutingKey, exchangeForRetry, rabbitMessageContainer.retryTimeInterval()), queue.durable(), queue.exclusive(), queue.autoDelete())
                            .buildExchange(exchangeForDl, DIRECT, exchange.durable(), exchange.autoDelete());
                    //注册死信队列相关配置,死信交换器已经在配置中注册
                    configRabbitMq(registry, bindingWrapperDl, false);
                    //重试队列 用于重新消费
                    BindingWrapper bindingWrapperRetry = BindingWrapper.generateBinding(binding.value() + RETRY_BINDING_SUFFIX, retryRoutingKey)
                            .buildQueue(obtainDoConsumeQueue(queue, needDelay), Collections.emptyMap(), queue.durable(), queue.exclusive(), queue.autoDelete())
                            .buildExchange(exchangeForRetry, exchange.type(), exchange.durable(), exchange.autoDelete());
                    // 向容器中注册binding的BeanDefinition,队列复用用户定义的,重试交换器已经在配置中创建
                    registryBinding(registry, bindingWrapperRetry);
                }
            }
            // 向容器中注册container的BeanDefinition
            registryContainer(registry, beanDefinitionName, rabbitMessageContainer, queueNameList);
        }
    }

    /**
     * 因为延迟消费情况的存在,因此需要获取实际消费队列的逻辑
     * @param queue
     * @param needDelay
     * @return
     */
    private String obtainDoConsumeQueue(Queue queue, boolean needDelay) {
        return needDelay ? queue.value() + DELAY_CONSUME_QUEUE_SUFFIX : queue.value();
    }

    /**
     * 向容器中注册mq的配置,包括queue、exchange、binding
     * @param registry
     * @param bindingWrapper
     */
    private void configRabbitMq(BeanDefinitionRegistry registry, BindingWrapper bindingWrapper, boolean isNeedCreateExchange) {
        // 向容器中注册queue的BeanDefinition
        registryQueue(registry, bindingWrapper);
        // 向容器中注册exchange的BeanDefinition
        if (isNeedCreateExchange) {
            registryExchangeIfNecessary(registry, bindingWrapper);
        }
        // 向容器中注册binding的BeanDefinition
        registryBinding(registry, bindingWrapper);
    }

    /**
     * 向容器中注册container的BeanDefinition
     * @param registry
     * @param beanDefinitionName
     * @param rabbitMessageContainer
     * @param queueNameList
     */
    private void registryContainer(BeanDefinitionRegistry registry, String beanDefinitionName, RabbitMessageContainer rabbitMessageContainer, List<String> queueNameList) {
        ManagedArray managedArray = new ManagedArray("org.springframework.amqp.core.Queue", queueNameList.size());
        for (String queueName : queueNameList) {
            managedArray.add(new RuntimeBeanReference(queueName));
        }
        AbstractBeanDefinition containerBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(SimpleMessageListenerContainer.class)
                .addConstructorArgReference("shadowConnectionFactory")
                .addPropertyValue("queues", managedArray)
                .addPropertyReference("messageListener", beanDefinitionName)
                .addPropertyValue("acknowledgeMode", rabbitMessageContainer.acknowledgeMode())
                .addPropertyValue("maxConcurrentConsumers", rabbitMessageContainer.maxConcurrentConsumers())
                .addPropertyValue("concurrentConsumers", rabbitMessageContainer.concurrentConsumers())
                .addPropertyValue("prefetchCount", rabbitMessageContainer.prefetchCount())
                .addPropertyValue("exposeListenerChannel", rabbitMessageContainer.exposeListenerChannel())
                .getBeanDefinition();
        registry.registerBeanDefinition(rabbitMessageContainer.value(), containerBeanDefinition);
    }

    /**
     * 向容器中注册queue的BeanDefinition
     * @param registry
     * @param bindingWrapper
     */
    private void registryQueue(BeanDefinitionRegistry registry, BindingWrapper bindingWrapper) {
        BindingWrapper.QueueWrapper queueWrapper = bindingWrapper.getQueueWrapper();
        AbstractBeanDefinition queueBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(org.springframework.amqp.core.Queue.class)
                .addConstructorArgValue(queueWrapper.getQueueName())
                .addConstructorArgValue(queueWrapper.isDurable())
                .addConstructorArgValue(queueWrapper.isExclusive())
                .addConstructorArgValue(queueWrapper.isAutoDelete())
                .addConstructorArgValue(queueWrapper.getParams())
                .getBeanDefinition();
        registry.registerBeanDefinition(queueWrapper.getQueueName(), queueBeanDefinition);
    }

    /**
     * 如果有必要,向容器注入交换器
     * @param registry
     * @param bindingWrapper
     */
    private void registryExchangeIfNecessary(BeanDefinitionRegistry registry, BindingWrapper bindingWrapper) {
        // 如果容器中已经被ConfigurationClassPostProcessor添加了同名的Exchange的BeanDefinition,那就不在添加了;
        // 一是兼容项目原有代码已经通过@Bean方式注入了BeanDefinition;
        // 二是Exchange本来原则上就是应该尽可能服用的,所以多个Listener一定会存在使用相同的Exchange的情况;
        if (!registry.containsBeanDefinition(bindingWrapper.getExchangeWrapper().getExchangeName())) {
            registryExchange(registry, bindingWrapper);
        }
    }

    /**
     * 向容器中注册exchange的BeanDefinition
     * @param registry
     * @param bindingWrapper
     */
    private void registryExchange(BeanDefinitionRegistry registry, BindingWrapper bindingWrapper) {
        BindingWrapper.ExchangeWrapper exchangeWrapper = bindingWrapper.getExchangeWrapper();
        AbstractBeanDefinition exchangeBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(this.obtainExchangeType(exchangeWrapper.getType()))
                .addConstructorArgValue(exchangeWrapper.getExchangeName())
                .addConstructorArgValue(exchangeWrapper.isDurable())
                .addConstructorArgValue(exchangeWrapper.isAutoDelete())
                .getBeanDefinition();
        registry.registerBeanDefinition(exchangeWrapper.getExchangeName(), exchangeBeanDefinition);
    }

    /**
     * 向容器中注册binding的BeanDefinition
     * @param registry
     * @param bindingWrapper
     */
    private void registryBinding(BeanDefinitionRegistry registry, BindingWrapper bindingWrapper) {
        AbstractBeanDefinition bindingBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(org.springframework.amqp.core.Binding.class)
                .addConstructorArgValue(bindingWrapper.getQueueWrapper().getQueueName())
                .addConstructorArgValue(Binding.DestinationType.QUEUE)
                .addConstructorArgValue(bindingWrapper.getExchangeWrapper().getExchangeName())
                .addConstructorArgValue(bindingWrapper.getKey())
                .addConstructorArgValue(Collections.<String, Object>emptyMap())
                .getBeanDefinition();
        registry.registerBeanDefinition(bindingWrapper.getBindingName(), bindingBeanDefinition);
    }

    /**
     * 延迟消费 存储消息的制造延迟效果 的队列 上面的param
     * @return
     */
    private Map<String, Object> obtainMapForDelayQueue(String delayRoutingKey, String exchangeForConsume, long delayTime) {
        Map<String, Object> paramsForDelay = new HashMap<>();
        paramsForDelay.put(X_MESSAGE_TTL_DEFAULT, delayTime == -1 ? TTL_DEFAULT_VALUE : delayTime);//默认10分钟
        paramsForDelay.put(X_DEAD_LETTER_EXCHANGE, exchangeForConsume);//延迟交换器
        paramsForDelay.put(X_DEAD_LETTER_ROUTING_KEY, delayRoutingKey);//延迟消费路由key
        return paramsForDelay;
    }

    /**
     * 和Listener绑定,实际消费消息 的队列 上面的param
     * @return
     */
    private Map<String, Object> obtainMapForConsumeQueue(boolean needRetry, String dlRoutingKey, String exchangeForDl) {
        if (!needRetry) {
            return Collections.emptyMap();
        }
        Map<String, Object> paramsForDl = new HashMap<>();
        paramsForDl.put(X_DEAD_LETTER_EXCHANGE, exchangeForDl);//死信交换器
        paramsForDl.put(X_DEAD_LETTER_ROUTING_KEY, dlRoutingKey);//死信消费路由key
        return paramsForDl;
    }

    /**
     * 重试场景下 死信队列 上面的param
     * @return
     */
    private Map<String, Object> obtainMapForDlQueue(String bindingWrapperForRetry, String exchangeForRetry, long delayTime) {
        Map<String, Object> paramsForOriginal = new HashMap<>();
        paramsForOriginal.put(X_DEAD_LETTER_EXCHANGE, exchangeForRetry);//重试交换器
        paramsForOriginal.put(X_DEAD_LETTER_ROUTING_KEY, bindingWrapperForRetry);//重试消费路由key
        paramsForOriginal.put(X_MESSAGE_TTL_DEFAULT, delayTime == -1 ? TTL_DEFAULT_VALUE : delayTime);//默认10分钟
        return paramsForOriginal;
    }

    /**
     * 根据注解中的属性值,返回对应的交换机类型
     * @param exchangeTypes
     * @return
     */
    private Class<?> obtainExchangeType(String exchangeTypes) {
        switch (exchangeTypes) {
            case DIRECT:
                return DirectExchange.class;
            case FANOUT:
                return FanoutExchange.class;
            case HEADERS:
                return HeadersExchange.class;
            case TOPIC:
            default:
                return TopicExchange.class;
        }
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        //do nothing
    }

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }
}

MQ组件配置关系图

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1043826.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[补题记录] Atcoder Beginner Contest 299(E)

URL&#xff1a;https://atcoder.jp/contests/abc299 目录 E Problem/题意 Thought/思路 Code/代码 E Problem/题意 给出 N&#xff08;1 < N < 2000&#xff09;个点和 M 条边的一个无向图&#xff0c;要求用白色和黑色对这个图染色。 满足下面两个条件&#xff…

怎么加密U盘文件?U盘文件加密软件哪个好?

当U盘中储存重要数据时&#xff0c;我们需要保护U盘文件安全&#xff0c;避免数据泄露。那么&#xff0c;怎么加密U盘文件呢&#xff1f;U盘文件加密软件哪个好呢&#xff1f; ​U盘数据怎么避免泄露&#xff1f; 想要避免U盘数据泄露&#xff0c;最佳的方法就是对U盘文件进行…

C#解析JSON详解

C#解析Json详解 文章目录 C#解析Json详解什么是Json&#xff1f;Json的特点 常用的Json库Json.NET (Newtonsoft.Json)System.Text.Json 实例序列化反序列化 总结 什么是Json&#xff1f; JSON的全称是JavaScript Object Notation&#xff0c;是一种轻量级的数据交换格式&#…

作为一名独立开发者,如何获取客户?

很多程序员想成为一名独立开发者&#xff0c;从事自由职业&#xff0c;最大的困难在于如何赚钱&#xff0c;进一步来说&#xff0c;就是如何找到自己的客户&#xff0c;有很多开发者拥有丰富的经验&#xff0c;优秀的能力&#xff0c;但无法吸引客户。这篇文章的灵感正是为此而…

Qt扩展-QCustomPlot 简介及配置

QCustomPlot 简介及配置 一、概述二、安装教程三、帮助文档的集成 一、概述 QCustomPlot是一个用于绘图和数据可视化的Qt 控件。它没有进一步的依赖关系&#xff0c;并且有良好的文档记录。这个绘图库专注于制作好看的、发布质量的2D绘图、图形和图表&#xff0c;以及为实时可…

【面试题】Promise只会概念远远不够,还需这17道题目巩固!

前端面试题库 &#xff08;面试必备&#xff09; 推荐&#xff1a;★★★★★ 地址&#xff1a;前端面试题库 表妹一键制作自己的五星红旗国庆头像&#xff0c;超好看 在学习Promise相关题目之前&#xff0c;我们先做一些知识的回顾&#xff1a;JavaScript 是一门单…

自动化测试面试经历

一家做户外的外企 面试问题 1、自我介绍&#xff1a;大概介绍了自己的工作经历 2、数据库问题&#xff1a;学生表中包含id、姓名、成绩、班级&#xff0c;求平均成绩 回答&#xff1a;group by 班级&#xff0c;求平均&#xff08;不够完美&#xff09; 3、java的访问修饰符…

MQ - 32 基础功能:消息查询的设计

文章目录 导图概述什么时候会用到消息查询消息队列支持查询的理论基础消息数据存储结构关于索引的一些知识点内核支持简单查询根据 Offset 查询数据根据时间戳查询数据根据消息 ID 查询数据借助第三方工具实现复杂查询第三方引擎支持查询工具化简单查询总结导图 概述 从功能上…

Error:java: 错误: 不支持发行版本 5

当创建maven项目之后&#xff0c;编译一个简单的helloworld,发生以下报错 : Error:java: 错误: 不支持发行版本 5 解决方案 : File -> Settings -> BUIld,Execution,Deployment -> Compiler -> Java Compiler 将Module表格中的Target bytecode version的1.5换…

【优测云服务平台】打造承载百倍级增长后台背后的力量-性能优化

项目介绍&#xff1a; 腾讯课堂是腾讯推出的专业在线教育平台&#xff0c;凭借技术优势&#xff0c;实现在线即时互动教学&#xff0c;打破地域的限制&#xff0c;让每个人都能接受优秀老师的指导和教学。 一、背景 2020年初&#xff0c;新冠病毒肆虐&#xff0c;疫情大面积爆…

基于SSM的办公用品管理系统设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

cocoscreator3.X 强更 游戏内下载APK和安装APK

本文环境3.6.0&#xff0c;目测3.7, 3.8都可以用 强制更新是强制用户下载整包并覆盖安装&#xff0c;因为android部分代码不方便热更&#xff0c;所以游戏内采用服务器推送下载通知&#xff0c;游戏执行下载后再安装的形式. 下载完全可在ts层完成&#xff0c;可采用cocoscrea…

继苹果、联发科后,传高通下一代5G芯片将由台积电以3纳米代工

台积电3纳米又有重量级客户加入。市场传出&#xff0c;继苹果、联发科之后&#xff0c;手机芯片大厂高通下一代5G旗舰芯片也将交由台积电以3纳米生产&#xff0c;最快将于10月下旬发表&#xff0c;成为台积电3纳米第三家客户。 针对相关传闻&#xff0c;至昨日&#xff08;25日…

基于SpringBoot的药房管理系统

基于SpringBootVue的药房管理系统&#xff0c;前后端分离 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringBoot、Vue、Mybaits Plus、ELementUI工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 【主要功能】 角色&#xff1a;管理员、用户 管理员&am…

共享门店模式:一种新兴的商业模式

共享门店模式是一种利用实体店铺的空间和资源&#xff0c;让多个品牌或商家在同一地点共同运营的商业模式。这种模式可以提高店铺的利用率&#xff0c;降低经营成本&#xff0c;增加客流量&#xff0c;实现资源的最大化利用。如果你是一个有创业想法的企业家&#xff0c;或者你…

Webshell 流量特征分析 (2)

前言&#xff1a;webshell是以asp、php、jsp或者cgi等网页文件形式存在的一种代码执行环境&#xff0c;主要用于网站管理、服务器管理、权限管理等操作。使用方法简单&#xff0c;只需上传一个代码文件&#xff0c;通过网址访问&#xff0c;便可进行很多日常操作&#xff0c;极…

华为OD七日集训第6期 十一特辑 - 按算法分类,由易到难,循序渐进,玩转OD

目录 专栏导读华为OD机试算法题太多了&#xff0c;知识点繁杂&#xff0c;如何刷题更有效率呢&#xff1f; 一、逻辑分析二、数据结构1、线性表① 数组② 双指针 2、map与list3、优先队列4、滑动窗口5、二叉树6、并查集7、栈 三、算法1、基础算法① 贪心算法② 二分查找③ 分治…

【ComfyUI】Pytorch预训练模型(torch.hub)缓存地址修改

序言 最近玩ComfyUI时&#xff0c;每次生成图片&#xff0c;总是会下载一些东西&#xff0c;时间长了&#xff0c;C盘就不够用了&#xff0c;今天清理C盘发现&#xff0c;总是会在C:\Users\yutao\.cache\torch\hub\checkpoints这个路径下&#xff0c;下载大模型文件&#xff0…

初级软件测试入门教程

一、软件测试的基本概念 1、软件测试的定义 就是以发现错误为目的而运行程序的过程。 软件测试员的目标是找到软件缺陷&#xff0c;尽可能早一些&#xff0c;并确保其得以修复。 2、软件测试方法总体分类 试图验证软件是“工作的”&#xff08;所谓“工作的”就是指软件的…

linux应用层静态链接和动态链接(.a .so)

1、介绍 即使一个非常简单的程序&#xff0c;也需要依赖C标准库和系统库&#xff0c;链接其实就是把其他第三方库和自己源代码生成的二进制目标文件融合在一起的过程。经过链接之后&#xff0c;那些第三方库中定义的函数就能被调用执行了。早期的一些操作系统一般使用静态链接…