SpringBootMQ整合Rabbit
RabbitMQ安装以及SpringBoot整合
1、Docker安装RabbitMQ
#拉取rabbitmq镜像
docker pull rabbitmq
#启动RabbitMQ
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq
#安装图形化插件
#进入容器
docker exec -it 容器id /bin/bash
#安装插件
rabbitmq-plugins enable rabbitmq_management
注意:
你在操作图形化界面时,可能会出现该问题:Management API returned status code 500,如下图:
解决方案:
#进入rabbitmq容器,修改配置文件
docker exec -it 容器id /bin/bash
cd /etc/rabbitmq/conf.d/
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
exit
运行成功后可以访问:localhost:15672 初始化账号和密码为:guest
远端服务器如果访问不到,注意是否忘记安全组设置开放端口
2、SpringBoot集成RabbitMQ
2.1 引入依赖
父类pom文件
<parent>
<artifactId>spring-boot-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.6.0</version>
</parent>
<!--依赖-->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
2.2 创建生产者项目producer
application.yml
server:
port: 8081
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: guest
password: guest
RabbitMQConfig
@Configuration
public class RabbitMQConfig {
/**
* 创建交换机
* @return
*/
@Bean(name = "itemTopicExchange")
public Exchange createTopicExchange(){
return ExchangeBuilder.topicExchange("item_topic_exchange").build();
}
/**
* 创建消息队列
* @return
*/
@Bean(name = "itemQueue")
public Queue createItemQueue(){
return QueueBuilder.durable("item_queue").build();
}
/**
* 队列绑定交换机
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding bindingQueueToExchange(@Qualifier("itemQueue") Queue queue,
@Qualifier("itemTopicExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
}
}
TopicController
@RestController
@RequestMapping("/topic")
public class TopicController {
//用于发送MQ消息
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public String sendMessageToTopicQueue(){
rabbitTemplate.convertAndSend("item_topic_exchange", "item.insert", "商品新增,routing key 为item.insert");
rabbitTemplate.convertAndSend("item_topic_exchange", "item.update", "商品修改,routing key 为item.update");
rabbitTemplate.convertAndSend("item_topic_exchange", "item.delete", "商品删除,routing key 为item.delete");
return "生产者发送消息到消息队列";
}
}
2.3 创建消费者工程
application.yml
server:
port: 8082
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: guest
password: guest
消息监听MessageListener
@Component
public class MessageListener {
/**
* 监听某个队列的消息
* @param message 接收到的消息
*/
@RabbitListener(queues = "item_queue")
public void myListener(String message){
System.out.println("消费者接收到的消息" + message);
}
}
启动生产者,并发送消息:
发送请求:http://localhost:8081/topic/send
消费者监听到消息后消费消息:
Gitee项目地址:rabbitmq-topic模式demo