如题,在windows下安装一个rabbitMQ server;然后用浏览器访问其管理界面;由于rabbitMQ的默认账号guest默认只能本机访问,因此需要设置允许其他机器远程访问。这跟mysql的思路很像,默认只能本地访问,要远程访问需要另外设置,并且应该是新增一个账号来支持远程。这种做法一下子看上去很奇怪,数据库、消息队列天然就是要大家共同使用的,只能本机使用有什么意义?但细想好像又没有什么毛病,虽然有点不方便,但安全意识是有了。
以下是我初次在项目中使用rabbitMQ的一点记录。
一、安装
在windows下,当然就是下载windows的安装包。但rabbitMQ依赖一种叫ErLang的东东,安装时会先检查。如果没有的话,还要去下载erlang。这破东西100多兆,比rabbitMQ的安装包大多了。喧宾夺主。
先安装erlang(就是这个otp_win64_**.exe),然后再安装rabbitmq。
二、开通管理界面
安装好rabbitmq之后,会自动在windows里创建一个服务。
安装过程中,可知rabbitMQ有两个默认端口:5672和15672。5672用于编码,15672用于管理界面。
但是rabbitmq也并不默认打开这个管理界面,需要额外设置:
1、打开RabbitMQ的安装路径的sbin目录,
比如
2、键入cmd,打开命令窗口
3、输入命令:
rabbitmq-plugins.bat enable rabbitmq_management
即可用浏览器访问管理界面。如前所示。
三、允许远程访问
至此rabbitMQ只能本机访问,比如用账号guest/guest。设置允许远程访问步骤如下:
1、创建一个新账号
当然也可以设置guest允许远程访问,但这不符合安全思想。
2、给新账号赋权限
1)点击新建的账号
2)这2个按钮都点一下
3)有权限了
3、重启rabbitMQ服务
四、java写入示例
如果每次访问rabbitMQ,都需要连接一次,开销太大,因此使用连接池,每次用完放回池中,用于下次再用。
1、rabbitMQ连接池
<!--rabbitMQ-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.11.1</version>
</dependency>
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.Duration;
/**
* 连接池
* 提高性能,不必每次发送消息都构建连接
*/
@Component
public class RabbitMQConnectionPool {
private static ObjectPool<Connection> pool;
@Value(value = "${rabbitmq.host:localhost}")
private String host;
@Value(value = "${rabbitmq.port:5672}")
private int port;
@Value(value = "${rabbitmq.username:guest}")
private String username;
@Value(value = "${rabbitmq.password:guest}")
private String password;
public RabbitMQConnectionPool() {
initializePool();
}
public Connection getConnection() {
try {
return pool.borrowObject();
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
//返回连接到连接池
public void returnConnection(Connection connection) {
if (connection != null) {
try {
pool.returnObject(connection); // 直接返回连接
} catch (Exception e) {
e.printStackTrace();
}
}
}
@PostConstruct
private void initializePool() {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setConnectionTimeout(30000); // 设置连接超时
factory.setRequestedHeartbeat(60); // 设置心跳
GenericObjectPoolConfig<Connection> config = new GenericObjectPoolConfig<>();
config.setMaxTotal(10); // 设置最大连接数
config.setMinIdle(2); // 设置最小空闲连接数
config.setBlockWhenExhausted(true); // 允许在连接池耗尽时等待
config.setMaxWait(Duration.ofMillis(10000)); // 设置最大等待时间
pool = new GenericObjectPool<>(new BasePooledObjectFactory<Connection>() {
@Override
public Connection create() throws Exception {
return factory.newConnection();
}
@Override
public void destroyObject(PooledObject<Connection> pooledObject) throws Exception {
Connection conn = pooledObject.getObject();
if (conn != null) {
conn.close();
}
}
@Override
public boolean validateObject(PooledObject<Connection> pooledObject) {
Connection conn = pooledObject.getObject();
return conn != null && conn.isOpen();
}
@Override
public PooledObject<Connection> wrap(Connection conn) {
return new DefaultPooledObject<>(conn);
}
},config);
} catch (Exception e) {
e.printStackTrace();
}
}
}
2、发送处理器
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 发送类,外部可调用其中的发送方法
*/
@Component
public class RabbitMQSender {
@Autowired
private RabbitMQConnectionPool connectionPool;
private final int MaxRetries = 5; // 最大重试次数
public boolean sendMessage(String queueName, String message){
return sendMessage(queueName,message,null);
}
/**
* category:业务类型
*
* 发送时如果连接失败,自动重连,直至成功或重连次数超标
*/
public boolean sendMessage(String queueName, String message,String category) {
boolean ok = true;
int attempt = 0;
while (attempt < MaxRetries) {
/**
* 定义:Channel 是在一个 Connection 上创建的虚拟连接。
* 作用:通道用于实际的消息传递操作,包括发送和接收消息、声明队列、交换机等。
* 连接是底层的 TCP 连接,而通道是基于连接的轻量级虚拟连接,用于处理具体的消息传递操作。
* 使用连接池来复用 Connection,同时为每个操作创建和关闭 Channel,可以提高性能和资源利用率。
*/
Connection connection = null;
Channel channel = null;
try {
connection = connectionPool.getConnection();
if (connection == null) {
System.out.println("Failed to get connection, retrying...");
attempt++;
Thread.sleep(1000); // 等待一段时间后重试
continue;
}
channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
channel.basicPublish("", queueName, null, message.getBytes("UTF-8"));
System.out.println(String.format(" [%s] Sent to '%s',length:%d", category != null ? category : "x",
queueName, message.length()));
break; // 发送成功后退出循环
} catch (Exception e) {
attempt++;
System.out.println("An error occurred, retrying...");
e.printStackTrace();
} finally {
// 确保通道和连接在这里被关闭
try {
if (channel != null) {
channel.close();
}
} catch (Exception e) {
e.printStackTrace();
}
if (connection != null) {
// 返回连接到连接池,而不是关闭它
connectionPool.returnConnection(connection);
}
}
}
if (attempt >= MaxRetries) {
ok = false;
System.out.println("Failed to send message after " + MaxRetries + " attempts.");
}
return ok;
}
}
3、调用示例
@Autowired
private RabbitMQSender rabbitMQSender;
if (!rabbitMQSender.sendMessage(QueueName, jsonStr, "测试信息")) {
System.err.println("发送测试信息失败");
}
参考文章
Windows下开启rabbitMQ的图形界面
【RabbitMQ】超详细Windows系统下RabbitMQ的安装配置