说明:RabbitMQ(官网:)是一门异步通讯技术,使用异步通讯技术,可解决同步通讯的一些问题。
安装
本文介绍在云服务器上安装RabbitMQ,操作系统是CentOS 7,远程连接工具是WindTerm;
第一步:拉取镜像
镜像版本选择,3.8-management
docker pull rabbitmq:3.8-management
我这里显示已经安装过,首次安装会下载一些文件,等下载完成即可;
第二步:运行容器
输入下面命令,运行RabbitMQ容器,设置登录账号为:root,密码:123456
docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=123456 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
第三步:登录MQ平台
可打开浏览器,输入:IP:15672,进入RabbitMQ的管理平台
查看设置的账户信息;
使用
打开IDEA,创建一个项目(rabbitmq_demo),项目下有两个服务模块(发送者sender,接受者receiver)
第一步:添加依赖
在父模块的pom.xml文件中添加这三个依赖,
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.9.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<!--lombok依赖,用于生成set、get、toString方法-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
第二步:编码
父模块不写代码,两个子模块需要创建对应的启动类,然后具体代码写在对应的测试类里;
发送者测试代码(SenderTest.java)
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SenderTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSender() {
rabbitTemplate.convertAndSend("demo.queue","hello rabbit mq!");
}
}
发送者配置文件(application.yml),注意设置对应的IP地址
spring:
rabbitmq:
# MQ ip地址
host: xx.xx.xx.xx
# MQ的端口号
port: 5672
# 虚拟主机 每个用户单独对应一个 不同用户之间无法访问彼此的虚拟主机
virtual-host: /
# 用户名
username: root
# 密码
password: 123456
接收者代码(ReceiverTest.java),注意修改对应的IP地址
import com.rabbitmq.client.*;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@SpringBootTest
public class ReceiverTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("xxx.xxx.xxx.xxx");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "demo.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息.....");
}
}
第三步:启动测试
因为是首次使用,此时RabbitMQ管理平台还没有队列,如果启动发送端,消息会被丢失,因为找到不到对应的队列。所以,可以先启动接收端,接收端会在管理平台创建对应的队列,或者可以先在管理平台手动创建队列。
(手动创建队列)
(启动接收端测试代码,此时RabbitMQ管理平台会创建一个demo.queue队列)
(启动发送端测试代码,发送端可以接收到消息)
第四步:关闭接收端
此时,关闭接收端,再次启动发送端代码,程序不会报错。打开管理平台可以在队列中看到有一个消息在准备中
点击队列名(demo.queue),可以查看到消息的内容