提前说一下,创建队列,交换机,绑定交换机和队列都是在生产者。消费者只负责监听就行了,不用配其他的。
完成这个场景需要两个服务哦。
1直连交换机-生产者的代码。
在配置类中创建队列,交换机,绑定交换机和队列
@Configuration public class DirectRabbitConfigTest { //队列 @Bean public Queue TestDirectQueue(){ return new Queue("TestDirectQueue",true); } //交换机 @Bean DirectExchange TestDirectExchange(){ return new DirectExchange("TestDirectExchange",true,false); } //绑定 @Bean Binding bindingDirect(){ return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");//TestDirectRouting为路由键 } }
发送消息,访问这个接口就行。
@Controller public class test001 { @Autowired private RabbitTemplate rabbitTemplate; @ResponseBody @GetMapping("/sendDirectMessage") public String sendDirectMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "test message, hello!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String,Object> map=new HashMap<>(); map.put("messageId",messageId); map.put("messageData",messageData); map.put("createTime",createTime); //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map); return "ok"; } }
2直连交换机-消费者的代码
@Component @RabbitListener(queues = "TestDirectQueue")//监听的队列 public class DirectReceiver { @RabbitHandler public void process(Map testMessage){ System.out.println("消息one:"+testMessage.toString());//这个map就是发送的消息 } }
直连交换机总结:你可以自己加几个消费者,发送消息你会发现消息被轮询了。
访问@GetMapping("/sendDirectMessage")消息就发出并被接收。
3topic交换机-生产者的代码
注意:需要自己去控制台添加firstQueue和secondQueue队列,不然消费者启动不起来。原因未知,在直连交换机的时候机器自己创建队列了,topic不知道为啥不行。
@Configuration public class TopicRabbitConfigTest { //绑定键 public final static String man = "topic.man"; public final static String woman = "topic.woman"; @Bean public Queue firstQueue() { //return new Queue(TopicRabbitConfigTest.man); return new Queue("firstQueue"); } @Bean public Queue secondQueue() { //return new Queue(TopicRabbitConfigTest.woman); return new Queue("secondQueue"); } @Bean TopicExchange exchange() { return new TopicExchange("topicExchange"); } //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man //这样只要是消息携带的路由键是topic.man,才会分发到该队列 @Bean Binding bindingExchangeMessage() { return BindingBuilder.bind(firstQueue()).to(exchange()).with(man); } //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.# // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列 @Bean Binding bindingExchangeMessage2() { return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#"); } }
发送消息,访问这两个接口就行。
@Controller public class TopicTest002 { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendTopicMessage1") public String sendTopicMessage1() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: M A N "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> manMap = new HashMap<>(); manMap.put("messageId", messageId); manMap.put("messageData", messageData); manMap.put("createTime", createTime); rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap); return "ok"; } @GetMapping("/sendTopicMessage2") public String sendTopicMessage2() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: woman is all "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> womanMap = new HashMap<>(); womanMap.put("messageId", messageId); womanMap.put("messageData", messageData); womanMap.put("createTime", createTime); rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap); return "ok"; } }
4topic交换机-消费者的代码
需要监听两个队列
@Component @RabbitListener(queues = "firstQueue")//监听的队列 public class TopicManReceiver { @RabbitHandler public void process(Map testMessage) { System.out.println("TopicManReceiver消费者收到消息 : " + testMessage.toString()); } }
@Component @RabbitListener(queues = "secondQueue")//监听的队列 public class TopicWoManReceiver { @RabbitHandler public void process(Map testMessage) { System.out.println("TopicTotalReceiver消费者收到消息 : " + testMessage.toString()); } }
先访问@GetMapping("/sendTopicMessage1")接口,再访@GetMapping("/sendTopicMessage2")接口,结果如下:
总结:绑定和发送的时候都要设置路由键。
5fanout交换机-生产者
注意:这种交换机也需要自己去控制台创建队列。fanout交换机不需要设置路由key
@Configuration public class FanoutRabbitConfigTest { /** * 创建三个队列 :fanout.A fanout.B fanout.C * 将三个队列都绑定在交换机 fanoutExchange 上 * 因为是扇型交换机, 路由键无需配置,配置也不起作用 */ @Bean public Queue queueA() { return new Queue("fanout.A"); } @Bean public Queue queueB() { return new Queue("fanout.B"); } @Bean public Queue queueC() { return new Queue("fanout.C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA() { return BindingBuilder.bind(queueA()).to(fanoutExchange()); } @Bean Binding bindingExchangeB() { return BindingBuilder.bind(queueB()).to(fanoutExchange()); } @Bean Binding bindingExchangeC() { return BindingBuilder.bind(queueC()).to(fanoutExchange()); } }
发送消息,访问这个接口就行。
@Controller public class FanoutTest003 { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendFanoutMessage") public String sendFanoutMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: testFanoutMessage "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> map = new HashMap<>(); map.put("messageId", messageId); map.put("messageData", messageData); map.put("createTime", createTime); rabbitTemplate.convertAndSend("fanoutExchange", null, map); return "ok"; } }
6fanout交换机-消费者
@Component @RabbitListener(queues = "fanout.A")//监听的队列 public class FanoutA { @RabbitHandler public void process(Map testMessage) { System.out.println("FanoutReceiverA消费者收到消息 : " +testMessage.toString()); } }
@Component @RabbitListener(queues = "fanout.B")//监听的队列 public class FanoutB { @RabbitHandler public void process(Map testMessage) { System.out.println("FanoutReceiverB消费者收到消息 : " +testMessage.toString()); } }
@Component @RabbitListener(queues = "fanout.C")//监听的队列 public class FanoutC { @RabbitHandler public void process(Map testMessage) { System.out.println("FanoutReceiverC消费者收到消息 : " +testMessage.toString()); } }
总结:三个消费者都收到了消息
重点:直连交换机不管绑定多少个队列,消息会到达所有队列,然后去消费者方进行轮询。