RabbitMQ基础概念
RabbitMQ是基于AMQP协议开发的一个MQ产品。
虚拟主机 virtual host
RabbitMQ出于服务器复用的想法,可以在一个RabbitMQ集群中划分出多个虚拟主机,每一个虚拟主机都有AMQP的全套基础组件,并且可以针对每个虚拟主机进行权限以及数据分配,并且不同虚拟主机之间是完全隔离的。
连接 Connection
客户端与RabbitMQ进行交互,首先就需要建立一个TPC连接,这个连接就是Connection。
信道 Channel
一旦客户端与RabbitMQ建立了连接,就会分配一个AMQP信道 Channel。每个信道都会被分配一个唯一的ID。也可以理解为是客户端与RabbitMQ实际进行数据交互的通道,我们后续的大多数的数据操作都是在信道 Channel 这个层面展开的。RabbitMQ为了减少性能开销,也会在一个Connection中建立多个Channel,这样便于客户端进行多线程连接,这些连接会复用同一个Connection的TCP通道,所以在实际业务中,对于Connection和Channel的分配也需要根据实际情况进行考量。
交换机 Exchange
RabbitMQ中进行数据路由的重要组件。消息发送到RabbitMQ中后,会首先进入一个交换机,然后由交换机负责将数据转发到不同的队列中。RabbitMQ中有多种不同类型的交换机来支持不同的路由策略。
交换机多用来与生产者打交道。生产者发送的消息通过Exchange交换机分配到各个不同的Queue队列上,而对于消息消费者来说,通常只需要关注自己感兴趣的队列就可以了。
队列 Queue
队列是实际保存数据的最小单位。队列结构天生就具有FIFO的顺序,消息最终都会被分发到不同的队列当中,然后才被消费者进行消费处理。这也是最近RabbitMQ功能变动最大的地方。最为常用的是经典队列Classic。RabbitMQ 3.8.X版本添加了Quorum队列,3.9.X又添加了Stream队列。
Classic 经典队列
这是RabbitMQ最为经典的队列类型。在单机环境中,拥有比较高的消息可靠性。
Quorum 仲裁队列
仲裁队列,是RabbitMQ从3.8.0版本,引入的一个新的队列类型,整个3.8.X版本,也都是在围绕仲裁队列进行完善和优化。仲裁队列相比Classic经典队列,在分布式环境下对消息的可靠性保障更高。
Stream队列
Stream队列是RabbitMQ自3.9.0版本开始引入的一种新的数据队列类型,也是目前官方最为推荐的队列类型。这种队列类型的消息是持久化到磁盘并且具备分布式备份的,更适合于消费者多,读消息非常频繁的场景。
RabbitMQ编程模型
使用RabbitMQ提供的原生客户端API进行交互。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
基础编程模型
这些各种各样的消息模型其实都对应一个比较统一的基础编程模型。
1,首先创建连接,获取Channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
2,声明queue队列
channel.queueDeclare(String queue, boolean durable, boolean exclusive,
boolean autoDelete, Map<String, Object> arguments);
3,Producer根据应用场景发送消息到queue
channel.basicPublish(String exchange, String routingKey, BasicProperties
props,message.getBytes("UTF-8")) ;
4,Consumer消费消息
定义消费者,消费消息进行处理,并向RabbitMQ进行消息确认。确认了之后就表 明这个消息已经消费完了,否则RabbitMQ还会继续让别的消费者实例来处理。
两种消费方式
1、被动消费模式,Consumer等待rabbitMQ 服务器将message推送过来再消费。
channel.basicConsume(String queue, boolean autoAck, Consumer callback);
2、另一种是主动消费模式。Comsumer主动到rabbitMQ服务器上去获取指定的
messge进行消费
GetResponse response = channel.basicGet(QUEUE_NAME, boolean autoAck);
3.Stream队列消费 在当前版本下,消费Stream队列时,需要注意三板斧的设置。
channel必须设置basicQos属性。
正确声明Stream队列。
消费时需要指定offset。
5,完成成以后关闭连接,释放资源
channel.close();