项目中,作为生产者自定义消息发送到RabbitMq。
1.引入rmq依赖
<!-- rabbitmq 依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
2.创建链接、断开连接工具类。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* RabbitMq 工具类
*
* @author Klay
* @date 2023/6/25
*/
public class RabbitmqUtils {
private static Channel channel = null;
private static Connection connection = null;
/**
* 获取连接
*
* @author hukelei
* @date 2023/6/25 10:37
*/
public static Channel getChannel() {
//定义连接池
ConnectionFactory factory = new ConnectionFactory();
//设置主机地址
factory.setHost("127.0.0.1");
//设置端口
factory.setPort(5672);
//设置用户名
factory.setUsername("guest");
//密码
factory.setPassword("guest");
//虚拟机路径
factory.setVirtualHost("/");
try {
connection = factory.newConnection();
//创建信道
channel = connection.createChannel();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return channel;
}
/**
* 关闭连接
*
* @author hukelei
* @date 2023/6/25 10:37
*/
public static void closeConnection() {
try {
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
3.发送消息。大部分业务应用场景中,只需要发送消息到指定的交换机(exchange)中。如果业务需要创建交换机,则将注释的代码打开,创建交换机、队列,并绑定。发送消息时,将对应的交换机、路由进行替换即可。
import com.hikvision.ardatatormq.utils.RabbitmqUtils;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
/**
* @author Klay
* @date 2023/10/16 016
*/
@Slf4j
public class SendMessageTest {
public static void main(String[] args){
//利用写好工具类获取信道连接
Channel channel = RabbitmqUtils.getChannel();
try {
/**
*创建一个交换机
*1.交换机名称
*2.交换机类型有fanout,direct,topic,headers
*3.是否持久化
*4.设置是自动删除,当没有队列与当前交换机绑定时自动删除
*5.设置是否内置,表示内置的交换机
*6.设置其他的一些结构化参数
*/
// channel.exchangeDeclare("text_pubsub", BuiltinExchangeType.FANOUT, false, false, false, null);
/**
*1.队列名称
*2.是否持久化,持久化会存盘,重启也还存在
*3.exclusive 是否排他如果一个队列被声明为排他的队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除这里需要注意三点:排他的队列是基于连接(Connection)可见的,同一个连接的不同信道(Channel)是可以同时访问同一连接创建的排他队列;“首次”是指如果一个连接已经声明了一个排他队列,其它连接是不允许建立同名的排他队列的,这个与普通队列不同;即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
*4.是否自动删除至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除
*5.设置队列的其他配置参数
*/
// channel.queueDeclare("pubsub_queue1", false, false, false, null);
/**
*将交换机与队列绑定
*1.队列名称
*2.交换机名称
*3.routerkey(路由key)
*4.其他的绑定参数
*/
// channel.queueBind("pubsub_queue1", "text_pubsub", "routingKeyTest");
/**
*发送消息
*1.交换机名称
*2.routerkey路由key,目前没有指定双引号即可
*3.无额外配置写null
*4.消息体
*/
String msg = "发布订阅模式!!!";
channel.basicPublish("amq.topic", "routingKeyTest", null, msg.getBytes());
log.info("消息发送成功!:{}", msg);
} catch (IOException e) {
log.error("发送消息IOException:{}", e);
} finally {
//关闭连接
RabbitmqUtils.closeConnection();
}
}
}
4.进行测试。
4.1为交换机amq.topic创建一个测试队列并绑定。
4.2发送消息