RabbitMQ官方文档:RabbitMQ Tutorials — RabbitMQ
一、RabbitMQ安装(Linux下)
你可以选择原始的方式安装配置,也可以使用docker进行安装,方便快捷!
1. 安装docker
没有docker的先安装一下docker,我这里示例的是Ubuntu的环境下:
CentOS下安装docker参考: Docker 从入门到精通-CSDN博客
#更新系统软件包列表
sudo apt update
#安装必要的软件包以允许apt通过HTTPS使用存储库
sudo apt install apt-transport-https ca-certificates curl software-properties-common
#添加Docker的官方GPG密钥
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg
#添加Docker的稳定存储库
echo "deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
#更新软件包列表以包含Docker存储库
sudo apt update
#安装Docker引擎
sudo apt install docker-ce docker-ce-cli containerd.io
#验证Docker是否正确安装
sudo docker run hello-world
2. 安装 RabbitMQ
#搜索并拉取RabbitMQ的Docker镜像
sudo docker pull rabbitmq:management
#创建一个RabbitMQ容器并启动它
sudo docker run -d --name my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:management
#检查容器的状态
sudo docker ps
解释一下这个命令的参数:
-d
:表示容器将以后台(守护进程)模式运行。--name my-rabbit
:为容器指定一个名称,这里是"my-rabbit",你也可以根据需要更改它。-p 5672:5672
:将主机的5672端口映射到容器的5672端口,这是RabbitMQ的默认端口。-p 15672:15672
:将主机的15672端口映射到容器的15672端口,这是RabbitMQ管理插件的Web界面端口。rabbitmq:management
:指定要使用的RabbitMQ镜像及其标签。
3. 访问RabbitMQ的Web界面
注意:如果是云服务器,要在安全组配置中放行对应的端口。
使用用户名 guest,密码 guest 登录
二、运行Hello World
1. 初始化工程
(1)在本地IDEA中创建一个空工程:
(2)在这个空工程下添加两个子模块:rabbitmq-producer 和 rabbitmq-consumer
(3)添加rabbitmq依赖
分别在两个子模块的pom文件中添加如下内容:
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.3</version>
</plugin>
</plugins>
</build>
2. 编写生产者代码
package com.edu.chd.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ProducerHelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
//1.设置连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.249.112.12"); //这里对应的启动了rabbitMQ的云服务器的IP
factory.setPort(5672); //默认的端口号就是5672 不写也行
//3.创建连接
Connection connection = factory.newConnection();
//4.创建channer
Channel channel = connection.createChannel();
//5.创建队列 Queue
/*
在 RabbitMQ 中,channel.queueDeclare() 方法用于声明一个队列。该方法接受多个参数,下面是对每个参数的解释:
queue(队列名称):指定要声明的队列的名称。
durable(持久化):指定队列是否是持久化的。当 RabbitMQ 重新启动时,持久化的队列将被保留下来。如果将该参数设置为 true,则队列将被持久化;如果设置为 false,则队列不会被持久化。注意,这里指的是队列本身的持久化,而不是队列中的消息。
exclusive(排他性):指定队列是否是排他的。如果将该参数设置为 true,则该队列只能被当前连接的消费者使用,并且在连接关闭时会自动删除该队列。如果设置为 false,则队列可供多个消费者使用。
autoDelete(自动删除):指定队列在不再被使用时是否自动删除。如果将该参数设置为 true,则当队列不再被消费者使用时,将自动删除该队列。如果设置为 false,则队列不会自动删除。
arguments(参数):指定队列的其他属性和参数,以键值对的形式提供。
这些参数在声明队列时提供了一些灵活性和控制选项,以适应不同的应用场景和需求。
*/
//以下代码声明了一个名为 "hello_world" 的队列,该队列是持久化的、非排他的、不自动删除的
channel.queueDeclare("hello_world",true,false,false,null);
//6.发送消息
String body = "hello world! RabbitMQ!";
channel.basicPublish("","hello_world",null,body.getBytes());
//7.释放资源
channel.close();
connection.close();
}
}
运行生产者代码,访问RabbitMQ的控制页面:
可以看到已经有了一条消息在队列中
3. 编写消费者代码
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerHelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
//1.设置连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.249.112.12");
factory.setPort(5672);
//3.创建连接
Connection connection = factory.newConnection();
//4.创建channer
Channel channel = connection.createChannel();
//5.接收消息
//创建了一个消费者对象并实现了 handleDelivery() 方法作为回调方法。当消费者收到消息时,将自动执行该方法。
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费消息内容:" + new String(body));
}
};
/*
public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException {
return this.basicConsume(queue, autoAck, "", callback);
}
参数说明:
queue(队列名称):指定要从中消费消息的队列的名称。
autoAck(自动确认):指定消费者在接收到消息后是否自动发送确认。如果将该参数设置为 true,消费者在接收到消息后会自动确认,告知 RabbitMQ 该消息已被成功处理。
如果设置为 false,则需要手动调用 channel.basicAck() 方法来发送确认。
consumerTag(消费者标签):指定消费者的标识符。如果将该参数设置为空字符串 "",RabbitMQ 会自动生成一个唯一的消费者标签。
callback(消费者回调):指定一个 Consumer 对象,用于处理接收到的消息。你可以实现自己的 Consumer 接口或使用现有的实现。
*/
channel.basicConsume("hello_world",true,consumer);
//不需要关闭资源,因为关闭了就没法从消息队列获取消息了
}
}
运行消费者代码,可以看到控制台的输出:
进入RabbitMQ控制页面查看,可以看到有一个消费者,并且刚才的消息已经被消费了: