mq调用流程
创建消息转换器
package com.wd.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that exposes the common RabbitMQ {@link MessageConverter} bean.
 */
@Configuration
public class RabbitmqMessageConvertConfig {

    /**
     * Creates the common message converter.
     *
     * @return a new {@link Jackson2JsonMessageConverter}, exposed under the bean name
     *         {@code messageConverter}
     */
    @Bean
    public MessageConverter messageConverter() {
        MessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        return jsonConverter;
    }
}
创建exchange交换机:普通交换机、延迟交换机、死信交换机
package com.wd.config;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the three direct exchanges (normal, delay, dead-letter) and the constants
 * that name them.
 */
@Configuration
public class RabbitmqExchangeConfig {

    /**
     * Delay value, computed as {@code 20 * 1000}.
     * NOTE(review): presumably milliseconds (i.e. 20 seconds) — confirm against the
     * places where this constant is consumed.
     */
    public static final int DELAY_TIME = 20 * 1000;

    /** Name of the normal exchange. */
    public static final String EXCHANGE_NAME = "wd_exchange";

    /** Name of the delay exchange. */
    public static final String DELAY_EXCHANGE_NAME = "wd_delay_exchange";

    /** Name of the dead-letter exchange. */
    public static final String DEAD_EXCHANGE_NAME = "wd_dead_exchange";

    /**
     * @return the normal direct exchange named {@link #EXCHANGE_NAME}
     */
    @Bean
    public DirectExchange exchange() {
        return durableDirectExchange(EXCHANGE_NAME);
    }

    /**
     * @return the delay direct exchange named {@link #DELAY_EXCHANGE_NAME}
     */
    @Bean
    public DirectExchange delayExchange() {
        return durableDirectExchange(DELAY_EXCHANGE_NAME);
    }

    /**
     * @return the dead-letter direct exchange named {@link #DEAD_EXCHANGE_NAME}
     */
    @Bean
    public DirectExchange deadExchange() {
        return durableDirectExchange(DEAD_EXCHANGE_NAME);
    }

    /** Builds a direct exchange with durable = true and autoDelete = false. */
    private static DirectExchange durableDirectExchange(String name) {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(name, durable, autoDelete);
    }
}
创建master的connection
package com.wd.config.master; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitmqMasterConnectionConfig { @Value("${rabbitmq.master.vhost}") private String vhost; @Value("${rabbitmq.master.addresses}") private String addresses; @Value("${rabbitmq.master.username}") private String username; @Value("${rabbitmq.master.password}") private String password; @Bean public ConnectionFactory masterConnectionFactory() { CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); cachingConnectionFactory.setAddresses(addresses); cachingConnectionFactory.setVirtualHost(vhost); cachingConnectionFactory.setUsername(username); cachingConnectionFactory.setPassword(password); return cachingConnectionFactory; } @Bean public RabbitTemplate rabbitTemplate(@Qualifier("masterConnectionFactory") ConnectionFactory masterConnectionFactory, MessageConverter messageConverter) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(masterConnectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setMessageConverter(messageConverter); return rabbitTemplate; } }
创建slave的connection
package com.wd.config.slave; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitmqSlaveConnectionConfig { @Value("${rabbitmq.slave.vhost}") private String vhost; @Value("${rabbitmq.slave.addresses}") private String addresses; @Value("${rabbitmq.slave.username}") private String username; @Value("${rabbitmq.slave.password}") private String password; @Bean public ConnectionFactory slaveConnectionFactory() { CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); cachingConnectionFactory.setAddresses(addresses); cachingConnectionFactory.setVirtualHost(vhost); cachingConnectionFactory.setUsername(username); cachingConnectionFactory.setPassword(password); return cachingConnectionFactory; } @Bean public RabbitTemplate slaveRabbitTemplate(@Qualifier("slaveConnectionFactory") ConnectionFactory slaveConnectionFactory, MessageConverter messageConverter) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(slaveConnectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setMessageConverter(messageConverter); return rabbitTemplate; } }
创建队列A: 分为普通队列、延迟队列、死信队列
package com.wd.config.queue; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; import static com.wd.config.RabbitmqExchangeConfig.DEAD_EXCHANGE_NAME; import static com.wd.config.RabbitmqExchangeConfig.DELAY_TIME; @Configuration public class QueueAConfig { private static final String QUEUE_A_NAME = "wd_queue_a"; private static final String DELAY_QUEUE_A_NAME = "wd_delay_queue_a"; private static final String DEAD_QUEUE_A_NAME = "wd_dead_queue_a"; private static final String QUEUE_A_ROUTING_KEY = "queue_A_routing_key"; private static final String DELAY_QUEUE_A_ROUTING_KEY = "delay_queue_a_routing_key"; private static final String DEAD_LETTER_QUEUE_A_ROUTING_KEY = "dead_letter_queue_A_routing_key"; @Bean public Queue queueA() { return new Queue(QUEUE_A_NAME, true); } @Bean public Binding queueABinding(@Qualifier("queueA") Queue queueA, @Qualifier("exchange") DirectExchange exchange) { return BindingBuilder.bind(queueA).to(exchange).with(QUEUE_A_ROUTING_KEY); } @Bean public Queue delayQueueA() { Map<String, Object> args = new HashMap<>(); //设置延迟队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME); //设置延迟队列绑定的死信路由键 args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_A_ROUTING_KEY); //设置延迟队列的 TTL 消息存活时间 args.put("x-message-ttl", DELAY_TIME); return new Queue(DELAY_QUEUE_A_NAME, true, false, false, args); } @Bean public Binding delayQueueABinding(@Qualifier("delayQueueA") Queue delayQueueA, @Qualifier("delayExchange") DirectExchange delayExchange) { return BindingBuilder.bind(delayQueueA).to(delayExchange).with(DELAY_QUEUE_A_ROUTING_KEY); } @Bean public Queue deadQueueA() { return new 
Queue(DEAD_QUEUE_A_NAME, true); } @Bean public Binding deadQueueABinding(@Qualifier("deadQueueA") Queue deadQueueA, @Qualifier("deadExchange") DirectExchange deadExchange) { return BindingBuilder.bind(deadQueueA).to(deadExchange).with(DEAD_LETTER_QUEUE_A_ROUTING_KEY); } }
创建队列B: 分为普通队列、延迟队列、死信队列
package com.wd.config.queue; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; import static com.wd.config.RabbitmqExchangeConfig.DEAD_EXCHANGE_NAME; import static com.wd.config.RabbitmqExchangeConfig.DELAY_TIME; @Configuration public class QueueBConfig { private static final String QUEUE_B_NAME = "wd_queue_b"; private static final String DELAY_QUEUE_B_NAME = "wd_delay_queue_b"; private static final String DEAD_QUEUE_B_NAME = "wd_dead_queue_b"; private static final String QUEUE_B_ROUTING_KEY = "queue_b_routing_key"; private static final String DELAY_QUEUE_B_ROUTING_KEY = "delay_queue_b_routing_key"; private static final String DEAD_LETTER_QUEUE_B_ROUTING_KEY = "dead_letter_queue_b_routing_key"; @Bean public Queue queueB() { return new Queue(QUEUE_B_NAME, true); } @Bean public Binding queueBBinding(@Qualifier("queueB") Queue queueB, @Qualifier("exchange") DirectExchange exchange) { return BindingBuilder.bind(queueB).to(exchange).with(QUEUE_B_ROUTING_KEY); } @Bean public Queue delayQueueB() { Map<String, Object> args = new HashMap<>(); //设置延迟队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME); //设置延迟队列绑定的死信路由键 args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_B_ROUTING_KEY); //设置延迟队列的 TTL 消息存活时间 args.put("x-message-ttl", DELAY_TIME); return new Queue(DELAY_QUEUE_B_NAME, true, false, false, args); } @Bean public Binding delayQueueBBinding(@Qualifier("delayQueueB") Queue delayQueueB, @Qualifier("delayExchange") DirectExchange delayExchange) { return BindingBuilder.bind(delayQueueB).to(delayExchange).with(DELAY_QUEUE_B_ROUTING_KEY); } @Bean public Queue deadQueueB() { return new 
Queue(DEAD_QUEUE_B_NAME, true); } @Bean public Binding deadQueueABinding(@Qualifier("deadQueueB") Queue deadQueueB, @Qualifier("deadExchange") DirectExchange deadExchange) { return BindingBuilder.bind(deadQueueB).to(deadExchange).with(DEAD_LETTER_QUEUE_B_ROUTING_KEY); } }
创建队列C: 分为普通队列、延迟队列、死信队列
package com.wd.config.queue; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; import static com.wd.config.RabbitmqExchangeConfig.DEAD_EXCHANGE_NAME; import static com.wd.config.RabbitmqExchangeConfig.DELAY_TIME; @Configuration public class QueueCConfig { private static final String QUEUE_C_NAME = "wd_queue_c"; private static final String DELAY_QUEUE_C_NAME = "wd_delay_queue_c"; private static final String DEAD_QUEUE_C_NAME = "wd_dead_queue_c"; private static final String QUEUE_C_ROUTING_KEY = "queue_c_routing_key"; private static final String DELAY_QUEUE_C_ROUTING_KEY = "delay_queue_c_routing_key"; private static final String DEAD_LETTER_QUEUE_C_ROUTING_KEY = "dead_letter_queue_c_routing_key"; @Bean public Queue queueC() { return new Queue(QUEUE_C_NAME, true); } @Bean public Binding queueCBinding(@Qualifier("queueC") Queue queueC, @Qualifier("exchange") DirectExchange exchange) { return BindingBuilder.bind(queueC).to(exchange).with(QUEUE_C_ROUTING_KEY); } @Bean public Queue delayQueueC() { Map<String, Object> args = new HashMap<>(); //设置延迟队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME); //设置延迟队列绑定的死信路由键 args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_C_ROUTING_KEY); //设置延迟队列的 TTL 消息存活时间 args.put("x-message-ttl", DELAY_TIME); return new Queue(DELAY_QUEUE_C_NAME, true, false, false, args); } @Bean public Binding delayQueueCBinding(@Qualifier("delayQueueC") Queue delayQueueC, @Qualifier("delayExchange") DirectExchange delayExchange) { return BindingBuilder.bind(delayQueueC).to(delayExchange).with(DELAY_QUEUE_C_ROUTING_KEY); } @Bean public Queue deadQueueC() { return new 
Queue(DEAD_QUEUE_C_NAME, true); } @Bean public Binding deadQueueCBinding(@Qualifier("deadQueueC") Queue deadQueueC, @Qualifier("deadExchange") DirectExchange deadExchange) { return BindingBuilder.bind(deadQueueC).to(deadExchange).with(DEAD_LETTER_QUEUE_C_ROUTING_KEY); } }
创建master的消息监听RabbitListenerContainerFactory
后续使用注解 @RabbitListener 时指定ListenerContainerFactory
@RabbitListener(queues = DEAD_LETTER_QUEUE_B, containerFactory = "masterListenerContainerFactory")
package com.wd.config.master; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitmqMasterListenerConfig { @Bean public SimpleRabbitListenerContainerFactory masterListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier(value = "masterConnectionFactory") ConnectionFactory masterConnectionFactory, MessageConverter messageConverter) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); // 设置消息转换器 factory.setMessageConverter(messageConverter); // 关闭自动ACK factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); configurer.configure(factory, masterConnectionFactory); return factory; } }
创建slave的消息监听RabbitListenerContainerFactory
后续使用注解 @RabbitListener 时指定ListenerContainerFactory
@RabbitListener(queues = DEAD_LETTER_QUEUE_B, containerFactory = "slaveListenerContainerFactory")
package com.wd.config.slave; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitmqSlaveListenerConfig { @Bean public SimpleRabbitListenerContainerFactory slaveListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier(value = "slaveConnectionFactory") ConnectionFactory slaveConnectionFactory, MessageConverter messageConverter) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); // 设置消息转换器 factory.setMessageConverter(messageConverter); // 关闭自动ACK factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); configurer.configure(factory, slaveConnectionFactory); return factory; } }