本文引用的代码源自《RabbitMQ实战指南》
关键的类和接口主要有Channel、Connection、ConnectionFactory、Consumer等,它们主要的作用如下:
- Channel:实现AMQP协议层的操作
- Connection:开启信道(Channel)、注册事件处理器、关闭连接
与RabbitMQ相关的开发工作,也主要围绕Channel、Connection这两个类展开
1.连接RabbitMQ:创建Connection
知道了RabbitMQ的IP地址、端口号、用户名、密码后,可以通过以下代码连接RabbitMQ,
2.创建信道
在创建完Connection以后,就建立了当前服务与RabbitMQ的连接。下一步就是使用Connection对象建立信道
Channel channel = conn.createChannel()
创建完Channel之后,可以使用Channel对象来发送、接收消息
注意:一个Connection可以创建多个Channel对象,但是Channel对象不是线程安全的。也就是说,最好为每个线程创建一个Channel
一般来说,调用connectionFactory.newConnection()
或者connection.createChannel()
以后,Connection和Channel就处于开启状态了。如果在后续使用channel的过程中,Connection或者Channel关闭了,程序会抛出异常
3.声明交换器、绑定队列
在绑定交换器和队列前,要确保先声明交换器和队列,即前两行所示
-
channel.exchangeDeclare
是声明一个交换器。这里传入参数的意思是:声明一个名为exchangeName
的交换器,交换器类型为direct
,并且这是一个可持久化的交换器(第三个入参=true)。可持久化的意思是交换器信息会存入2磁盘,服务器重启后不会丢失交换器的信息 -
channel.queueDeclare
是声明一个队列,不带任何入参是声明一个由RabbitMQ命名的、非持久化队列 -
channel.queueBind
是绑定交换器和队列
既然交换器和队列存在绑定操作,也就必然存在解绑操作,可以通过queueUnbind
方法来解绑
4.创建队列的时机
虽然RabbitMQ使用交换器和队列来处理消息,但因为消息只存储在队列中,所以交换器实际不消耗服务器资源,只有队列会消耗服务器资源。
换句话说,是否在代码里创建队列也可以见仁见智:
- 动态创建:如果服务器资源充足,那么RabbitMQ官方建议生产者和消费者都应该显式建立队列。这样做可以确保交换器和队列是正确绑定匹配的(避免发送消息的交换器没有绑定任何队列、或者发送消息的路由键匹配不上队列,导致消息丢失)
- 静态创建:如果业务已经充分确定了队列长度,可以在上线前在服务器上以RabbitMQ命令创建好队列,这样业务层就不用再声明队列了
5.发送消息
发送消息可以使用channel.basicPulish
方法,例如:
以上代码意思是:
- 将消息发送到exchangeName交换器汇总
- 交换器根据routingKey将消息存储到RabbitMQ的相应队列中
- 消息内容是messageBodyBytes
除了byte数组,还可以使用channel.basicPulish
的重载方法,发送具有特殊格式的消息
6.消费消息
RabbitMQ有两种消费消息的方式:push(推)、pull(拉)
6.1 Push
Push模式下,消费者持续订阅队列,一般要实现Consumer接口或继承DefaultConsumer类
消费动作是调用channel.basicConsume
来实现
以上调用channel.basicConsume
的意思是:
- 从queueName队列中订阅消息
- autoAck=true时,消费者需要在接收到消息后显式地进行ack操作(调用
channel.basicAck
) - 当前消费者从队列中订阅消息的同时,也可能有其他消费者订阅相同的队列,使用consumerTag来区分多个消费者
- 最后一个入参,传入了一个DefaultConsumer的匿名实现类,这其实是消费者的回调方法。当消费者收到订阅队列的消息时,会执行重写的
handleDelivery
方法