目录
1.添加依赖
2.生产者代码
3.消费者代码
4.效果
1.发送消息
2.消费消息
5.注意
1.添加依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>
2.生产者代码
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
String exchangerName = "ex_exchanger_name";
// 交换机名称
String queueName = "ex_queue_name";
// 队列名称
ConnectionFactory connectionFactory = new ConnectionFactory();
// 创建连接工厂
connectionFactory.setHost("ip地址");
// RabbitMQ服务器地址(写自己服务器对应的ip地址)
connectionFactory.setUsername("admin");
// RabbitMQ用户名,这里是自定义用户名
connectionFactory.setPassword("123456");
// RabbitMQ密码,这里是自定义密码
connectionFactory.setPort(5672);
// RabbitMQ端口号
Connection connection = connectionFactory.newConnection();
//创建连接
Channel channel = connection.createChannel();
//创建信道
/**
* 创建交换机
* 1、交换机名称
* 2.交换机类型,direct,topic,fanout和header(这里选择direct)
* 3.指定交换机是否需要持久化,如果设置为true,那么交换机的元数据要持久化
* 4.指定交换机没有队列绑定时是否需要删除,设置为false表示不删除
* 5.Map<String,Object>类型,用来指定我们交换机其它的一些结构化参数,我们在这里直接设置为null
*/
channel.exchangeDeclare(exchangerName, BuiltinExchangeType.DIRECT,true,false,null);
/**
*生成一个队列
* 1.队列名称
* 2.队列是否需要持久化(只是队列名称持久化,而非队列中的消息)
* 3.表示队列是否私有,只有创建他的应用程序才能消费消息
* 4.队列在没有消费者订阅的情况下是否自动删除
* 5.队列的一些结构化信息,比如声明死信队列,磁盘队列会用到
*/
channel.queueDeclare(queueName,true,false,false,null);
/**
* 将我们的交换机和队列绑定
* 1.队列名称
* 2.交换机名称
* 3.路由键,在我们直连模式下,可以为我们的队列名称
*/
channel.queueBind(queueName,exchangerName,queueName);
//发送消息
String message = "hello rabbitmq";
/**
* 发送消息
* 1.发送到哪个交换机
* 2.队列名称
* 3.其它参数信息
* 4.发送消息的消息体
*/
channel.basicPublish(exchangerName,queueName,null,message.getBytes());
channel.close();//关闭信道
connection.close();//关闭连接
}
}
3.消费者代码
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
// 创建连接工厂
connectionFactory.setHost("ip地址");
// RabbitMQ服务器地址(写自己服务器对应的ip地址)
connectionFactory.setUsername("admin");
// RabbitMQ用户名,这里是自定义用户名
connectionFactory.setPassword("123456");
// RabbitMQ密码,这里是自定义密码
connectionFactory.setPort(5672);
// RabbitMQ端口号
Connection connection = connectionFactory.newConnection();
//创建连接
Channel channel = connection.createChannel();
//创建信道
DeliverCallback deliverCallback = (consumerTage,message) -> {
System.out.println("接收到消息"+new String(message.getBody()));
};
CancelCallback cancelCallback = consumerTage-> {
System.out.println("消息消费中断");
};
/**
* 消费消息
* 1.消费哪个队列
* 2.消费成功后,是否需要自动应答,如果为true,则是自动应答
* 3.接收消息的一个回调函数
* 4.取消消息的回调函数
*/
channel.basicConsume("ex_queue_name",true,deliverCallback,cancelCallback);
channel.close();//关闭信道
connection.close();//关闭连接
}
}
4.效果
为了显示效果,这里需要登录RabbitMQ对应的web登录管理界面:
如果不知如何启动RabbitMQ服务或登录该管理界面,参考之前文章Rabbitmq的安装与使用(Linux版)https://blog.csdn.net/Kristabo/article/details/131965339
1.发送消息
启动Producer程序:
可以看到多了一个名称为:"ex_queue_name"的队列,同时多了一条未消费信息:
2.消费消息
启动Consumer程序
运行后,可以接收到发送的消息内容:
同时在此检查队列情况:
可以发现名称为 "ex_queue_name"的队列中已没有未读消息
5.注意
这里用到的是direct类型的交换机,如果还需要其他类型交换机相关代码参考,可关注公众号【蜗牛变涡流】,回复rabbitMQ获取完整代码