【RabbitMQ与SpringBoot集成测试收发消息】
- 一、环境说明
- 二、实验步骤
- 三、小结
一、环境说明
- 安装环境:虚拟机VMWare + Centos7.6 + Maven3.6.3 + JDK1.8
- RabbitMQ版本:rabbitmq-server-3.8.8-1.el7.noarch.rpm
- 编程工具Idea + 运行JDK为17
二、实验步骤
-
在RabbitMQ的UI界面或命令行上
创建新的Virtual Host
,取名为vhtest02
,如下图所示:
-
使用Idea的
Spring Initializr
创建生产者工程springrabbitmqtest
,坐标如下:
-
配置application.properties,可参考添加如下内容:
spring.rabbitmq.host=192.168.36.132 spring.rabbitmq.port=5672 spring.rabbitmq.virtual-host=vhtest02 spring.rabbitmq.username=sujiangming spring.rabbitmq.password=openGauss@1234
根据你自己的环境改成你自己的ip、port、virtual-host、用户名和密码。
-
编写生产者配置类,用于创建Exchange、Queue以及将两者绑定在一起,代码如下:
类名为:com.rabbitmq.springboot.config.RabbitMqConfig
,代码如下所示:package com.rabbitmq.springboot.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMqConfig { //创建交换机 @Bean(name = "topicExchange") public TopicExchange topicExchange(){ return new TopicExchange("springboot_topic_exchange"); } //创建队列 @Bean(name = "topicQueueSpringBoot") public Queue topicQueue(){ return QueueBuilder.durable("springboot_topic_queue").build(); } //队列绑定交换机 @Bean public Binding bindingExchangeTopicQueue(@Qualifier("topicQueueSpringBoot") Queue queue, @Qualifier("topicExchange")Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("log.#").noargs(); } }
-
修改
com.rabbitmq.springboot.SpringRabbitMqTestApplicationTests
类,添加注解和测试方法,具体代码如下:package com.rabbitmq.springboot; import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest class SpringRabbitMqTestApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage() throws InterruptedException { //convertAndSend(交换机名称,路由key,消息内容) int temp = 0; while (true){ rabbitTemplate.convertAndSend("springboot_topic_exchange","log.info","发送了info消息" + temp); rabbitTemplate.convertAndSend("springboot_topic_exchange","log.error","发送了error消息"+ temp); rabbitTemplate.convertAndSend("springboot_topic_exchange","log.warning","发送了warning消息"+ temp); temp++; Thread.sleep(2000); } } @Test void contextLoads() { } }
该程序会一直运行,因为我加了while(true),模拟用户一直产生数据。
-
运行测试:运行
com.rabbitmq.springboot.SpringRabbitMqTestApplicationTests
类中的方法testSendMessage()
,正常运行会看到如下内容:
-
编写消费者工程,具体创建工程如步骤2所示;
-
修改application.properties,如步骤3所示,可直接复步骤3内容即可;
-
创建监听类:
com.rabbitmq.consumer.listener.MessageListener
,用于监听某个队列的消息,一旦监听到有数据,立马进行消费,代码如下:package com.rabbitmq.consumer.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消息监听器 */ @Component public class MessageListener { /** * 监听某个队列的消息 * @param msg 接收到的消息 */ @RabbitListener(queues = "springboot_topic_queue") public void topicListener(String msg){ System.out.println("接收到消息:" + msg); } }
-
修改启动类:
SpringbootRabbitmqConsumerApplication
,在其类上面添加注解@ComponentScan("com.rabbitmq.consumer.*")
,如不添加该注解运行会自动退出,修改好如下图所示:
-
运行测试:运行
SpringbootRabbitmqConsumerApplication
类,正常情况下会看到如下内容:
三、小结
本文参考了来自网络上的资料,
如有侵权,请及时联系博主进行删除
。本文仅是博主本人在学习过程中作为学习笔记使用,常言道:好记性不如烂笔头。如本文对您有所帮助,请您动动发财的手指给博主点个赞
,谢谢您的阅读~~~