一 RabbitMQ下载
RabbitMQ 官网最新版下载:
RabbitMQ: One broker to queue them all | RabbitMQ
RabbitMQ依赖erlang-26.2.5.2-1.el7.x86_64.rpm下载:
https://github.com/rabbitmq/erlang-rpm/releases/download/v26.2.5.2/erlang-26.2.5.2-1.el7.x86_64.rpm
二 RabbitMQ安装
1 安装erlang环境
安装RabbitMQ前要先安装erlang环境,因为RabbitMQ是用erlang开发的
执行安装指令如下:
rpm -ivh erlang-26.2.5.2-1.el7.x86_64.rpm
执行后如下图:
验证 erlang 安装是否成功,执行erl可以查看版本,说明安装成功如下图:
2 安装RabbitMQ
执行安装RabbitMQ指令如下:
rpm -ivh rabbitmq-server-3.13.6-1.el8.noarch.rpm
执行安装中,如下图:
注意:如果erlang环境没有安装好,或者版本与当前rabbitMQ不匹配则会报错以下错误,提示需要指定范围的依赖版本,如下图:
如果出现上图的错误,请参考上一步重新安装erlang环境即可。
安装结束后,消息队列数据保存在哪?日记在哪?想了解更多的信息?
只需一条指令可查询当前状态信息:
rabbitmq-diagnostics status
执行后如下图:
从上图状态中可以看出目前没有使用任何配置文件,以可以看到以下有用的信息:
- 数据目录: /var/lib/rabbitmq/mnesia/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b
- 日记文件:/var/log/rabbitmq/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b.log
上图信息很详细,可以说开发者开发这个工具非常的细心,对软件有足够了解使用也安心!
3 配置RabbitMQ(可选项)
安装好后RabbitMQ没有使用任何的配置文件(也没有默认配置文件),但会生成一个空目录位置在:/etc/rabbitmq/ ,在这里你可以按照自己的需求参考官方网站配置自己的项目,格式支持有多种,下面我这里要变更默认端口为例创建一个配置文件:
vi /etc/rabbitmq/rabbitmq.config
配置文件内容:
[
{rabbit, [{tcp_listeners, [{"0.0.0.0", 51091}]}]},
{rabbitmq_management, [
{listener, [{port,59876}, {ssl, false}]}
]}
].
通过配置配置文件实现变更:
- 客户端 51091 用于消费或生产端连接,IP 0.0.0.0 代表绑定服务器内外网IP。
- 管理端口 59876 用于RabbitMQ的Web管理。
再次执行 rabbitmq-diagnostics status 查看新增的配置文件是否被使用,如下图:
上图可以看到刚刚创建的配置文件已被引用状态。
4 RabbitMQ 启动与关闭
RabbitMQ安装好后最终是服务状态,可以通过服务管理控制:
#启动
systemctl start rabbitmq-server
#停止关闭
systemctl stop rabbitmq-server
#重启
systemctl restart rabbitmq-server
#开机启动
systemctl enable rabbitmq-server
#查看状态
systemctl status rabbitmq-server
操作如下图:
5 开启RabbitMQ的Web管理界面(可选项,强烈建议开启)
RabbitMQ的安装后自带Web管理界面,但是需要执行以下指令开启:
rabbitmq-plugins enable rabbitmq_management
我们平时只需要一名管员即可,后面要增加用户或设置权限直接在Web操作即可。
新增一位 RabbitMQ的Web管理员并增加设置管理权限 ,用于管理RabbitMQ.
#新增人员
rabbitmqctl add_user hua abc123uuPP
#设置权限
rabbitmqctl set_permissions -p / hua ".*" ".*" ".*"
#设置为管理员
rabbitmqctl set_user_tags hua administrator
*
表示授予该用户对该虚拟主机上所有队列和交换机的 configure
、write
和 read
权限。
- 第一个
".*"
表示用户可以配置任意队列和交换机。 - 第二个
".*"
表示用户可以向任意队列和交换机发送消息。 - 第三个
".*"
表示用户可以从任意队列中消费消息。
执行过程如图:
执行上面命令增加一个Web管理员:
- 用户名称:hua
- 密码:abc123uuPP
- 权限 :管理员
如果只在本地localhost登陆RabbitWeb管理平台,用默认的账号登陆即可:
- 默认用户:guest
- 默认密码:guest
三 RabbitMQ Web 管理
1 RabbitMQ Web 登陆
进入RabbitMQ Web 登陆页面如下:
首先我们使用默认账号密码尝试登陆,为了安全确实限制本地登陆,如下图:
使用上面新建的账号hua登陆,登陆成功如下图:
2 用户管理
用户管理,用户增加操作简单,如下图:
3 用户权限设置
用户管理,用户权限设置操作简单,如下图:
四 java代码接入
方式一 java通用:
1 引入mvn依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
JAVA 连接RabbitMQ生产消息与接收消费测试代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author hua
* @date 2024-08-21 18:01
*/
public class TestRabbitMQ {
private final static String QUEUE_NAME = "hello";
public static void main1(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("xx.xx.xx.xx");
factory.setPort(51091);
factory.setUsername("java_producer");
factory.setPassword("java_producer");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("xx.xx.xx.xx");
factory.setPort(51091);
factory.setUsername("java_consumer");
factory.setPassword("java_consumer");
// 连接到 RabbitMQ 服务器
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明队列(确保队列存在)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 定义回调函数,当有消息送达时执行
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 消费消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
}
测试运行发送消息,发送成功。如下图:
测试运行接收消息,消费成功。如下图:
上面测试通过后,改成服务类方便生产环境使用来发送消息代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author hua
* @date 2024-08-22
*/
@Service
public class RabbitMqServiceImpl {
private static final Logger logger = LogManager.getLogger(RabbitMqServiceImpl.class);
private static final String QUEUE_NAME = "test";
private Connection connection;
private Channel channel;
public RabbitMqServiceImpl() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("xx.xx.xx.xx");
factory.setPort(51091);
factory.setUsername("java_producer");
factory.setPassword("java_producer");
//如果不指定虚拟机默认会使用/
factory.setVirtualHost("test");
try {
this.connection = factory.newConnection();
this.channel = connection.createChannel();
this.channel.queueDeclare(QUEUE_NAME, false, false, false, null);
logger.info("RabbitMqServiceImpl initialized successfully.");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
logger.error("Failed to initialize RabbitMqServiceImpl: {}", e.getMessage());
throw new RuntimeException("Failed to initialize RabbitMqServiceImpl", e);
}
}
public void sendMessage(String message) {
try {
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException e) {
e.printStackTrace();
logger.error("Failed to send message: {}", e.getMessage());
}
}
public void close() {
try {
if (channel != null && channel.isOpen()) {
channel.close();
}
if (connection != null && connection.isOpen()) {
connection.close();
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
方式二 SpringBoot框架使用
mvn依赖包:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
spring配置文件:
Spring:
rabbitmq:
host: xx.xx.xx.xx
port: 51091
username: java_consumer
password: java_consumer
virtual-host: hellow
connection-timeout: 6000
JAVA代码:
发送消息java代码:
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author hua
* @date 2024-08-22
*/
@Component
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Queue queue;
public void sendMessage(String message) {
rabbitTemplate.convertAndSend(queue.getName(), message);
System.out.println(" [x] Sent '" + message + "'");
}
}
接收消息java代码:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author hua
* @date 2024-08-22
*/
@Component
public class RabbitListener {
private static final Logger logger = LogManager.getLogger(RabbitListener.class);
@RabbitListener(queues = "test")
public void receiveMessage(String message) {
try {
System.out.println("rabbit rev <- "+message);
//具体业务
} catch (Exception e) {
e.printStackTrace();
logger.error("rabbit err= ", e);
}
}
}
上面代码在生产发送消息时通过编码方式更灵活,接收直接使用注解更简单。