在上一节中,我们使用docker部署了RabbitMQ,这一节我们将写一段生产者和消费者的代码。将用到rabbitmq的原生API来进行生产和发送消息。
一、准备工作
开始前,我们先在RabbitMQ控制台建相好关的数据
本机的RabbitMQ部署机器是192.168.56.201
其中控制台的地址是
http://192.168.56.201:15672/
输入控制台的账号后,可以进入
1、我们先建好一个用户
用户名:hello,密码:world
2、再建Virtual Host:virtual01
3. 为User设置访问Virtual hosts权限
设置好后,hello用户就有virtual01的权限了
二、代码
先引入依赖,由于我们后续要用springboot来写生产者消费者代码,这里我们就直接引springboot的包了。如果只想用原生的客户端,可以引原生的包。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
生产者和消费者代码如下
public class RabbitMqSimpleTest {
private static final String EXCHANGE_NAME = "hello_exchange";
private static final String QUEUE_NAME = "hello_queue";
private static final String ROUTING_KEY = "hello_routing";
@Test
public void send() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.56.201");
connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号
connectionFactory.setUsername("hello");
connectionFactory.setPassword("world");
connectionFactory.setVirtualHost("virtual01");
//获取TCP长连接
Connection conn = connectionFactory.newConnection();
//创建通信“通道”,相当于TCP中的虚拟连接
Channel channel = conn.createChannel();
//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
//第一个参数:队列名称ID
//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
//其他额外的参数, null
//手动创建一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
//exchange 交换机
//队列名称
//额外的设置属性
//最后一个参数是要传递的消息字节数组
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, (System.currentTimeMillis() + ",hello this is my first message!").getBytes());
channel.close();
conn.close();
System.out.println("===发送成功===");
}
@Test
public void consumer() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.56.201");
connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号
connectionFactory.setUsername("hello");
connectionFactory.setPassword("world");
connectionFactory.setVirtualHost("virtual01");
//获取TCP长连接
Connection conn = connectionFactory.newConnection();
//创建通信“通道”,相当于TCP中的虚拟连接
Channel channel = conn.createChannel();
//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//从MQ服务器中获取数据
//创建一个消息消费者
//第一个参数:队列名
//第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法
//第三个参数要传入DefaultConsumer的实现类
channel.basicConsume(QUEUE_NAME, false, new Receiver(channel));
}
}
消费者回调实现
public class Receiver extends DefaultConsumer {
private Channel channel;
//重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到
public Receiver(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String message = new String(body);
System.out.println("消费者接收到的消息:"+message);
System.out.println("消息的TagId:"+envelope.getDeliveryTag());
//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
运行一下send发送消息,成功了。
去控制台后台看一下
队列成功创建好了
消息发送成功了,有一条待消费的消息在队列里面
可以在这里查看到刚才发送的消息内容
在这里可以看到queue和exchange的绑定关系
控制台还有很多有意思的功能,大家可以下来尝试一下。
同时启动消费者,也能成功消费