目录
1、MQ介绍
1.1、什么是MQ?
1.2、MQ的能够解决的问题
1.2.1、削峰填谷
1.2.3、异步处理
1.3、MQ的选择
1.3.1、Kafka
1.3.2、ActiveMQ
1.3.3、RocketMQ
1.3.4、RabbitMQ
2、RabbitMQ的介绍
2.1、RabbitMQ的概述
2.2、AMQP
2.3、JMS
2.4、RabbitMQ模式
3、RabbitMQ的安装
3.1、安装Erlang
3.2、RabbitMQ安装
3.3、创建用户
3.4、创建虚拟主机Virtual Hosts
4、RabbitMQ工作模式
4.1、简单模式
4.1.1、生产者
4.1.2、消费者
4.2、工作队列模式(Work queues)
4.2.1、生产者
4.2.2、消费者1
4.2.3、消费者2
编辑4.3、发布/订阅模式(Publish/Subscribe)
4.3.1、生产者
4.3.2、消费者1
4.3.3、消费者2
4.4、路由模式(Routing)
4.4.1、生产者
4.4.2、消费者1
4.4.3、消费者2
4.5、通配符模式(Topic)
4.5.1、生产者
4.5.1、消费者1
4.5.2、消费者2
4.6、发布确认模式(Publisher Confirms)
4.6.1、单次确认
4.6.2、批量确认
4.6.3、异步确认
4.6.4、速度对比
4.7、RabbitMQ模式总结
1、MQ介绍
1.1、什么是MQ?
从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常 见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。
1.2、MQ的能够解决的问题
1.2.1、削峰填谷
以12306为例,假设平时可能买票的人不多,所以订单系统的QPS( 每秒查询率 )也不是很高,每秒也就处理1000个请求,但是一到节假日、春运期间可能抢票的人就非常多,并发量远远大于平时,这个时候,订单系统明显扛不住了。怎么办呢,当然我们可以设计弹性伸缩的集群,进行机器扩容,保证高可用。但是我们依然可以采用MQ来解决这个问题。
MQ的吞吐能力还是还强大的,所以我们可以设计高可用的MQ,让所有的请求都到MQ,缓存起来。这样一来高峰期的流量和数据都将积压在MQ中,流量高峰就被削弱了(削峰),然后我们的订单系统就避免了高并发的请求,它可以慢慢的从MQ中拉取自己能力范围内的消息就行处理。这样一来,高峰期积压的消息也终将被消费完,可以叫做填谷。
1.2.2、应用解耦
产品经理提需求,好多人关注了我们12306微信客户端,我们需要买票成功后在通知微信小程序。那么我们又需要修改订单系统的代码。一次还好,如果隔一段时间发生一件这样的事,那谁能忍受?
某一天,短信系统挂了,然后客户成功买到一张票,然后呢是短信也没收到,邮件也没收到,库存也没扣,这还得了。你短信系统坏了,我邮件系统好好的,凭什么影响我,让客户收不到邮件,这就不合理。 所以呢,还是各个系统之间的耦合太高了,我们应该解耦。不是有人说互联网的任何问题都可以通过一个中间件来解决吗,那么我们看MQ如何帮我们解决这件棘手的事情。
那么我们发现其实短信系统、邮件系统等都只依赖订单系统产生的一条数据那就是订单,因此我们在订单系统产生数据后,将订单这条数据发送给MQ,就返回成功,然后让短信、邮件等系统都订阅MQ,一旦发现MQ有消息,他们主动拉取消息,然后解析,进行业务处理。这样一来,就算你短信系统挂了,丝毫不会影响其他系统,而且如果后来想加一个新的系统,你也不用改订单系统的代码了,你只要订阅我们的MQ提供的消息就行了
1.2.3、异步处理
还以上面12306为例,假设我们不用MQ,那么我们的代码必然耦合在一起,下单成功后,依次要通过RPC远程调用这几个系统,然后同步等到他们的响应才能返回给用户是否成功的结果。假设每个系统耗时200ms,那么就得花费600ms
但是其实有时候我们发现,下单是个核心业务,可能压力本来就大,客户也着急知道下单是否成功,但是短信邮件等通知,可能大多数人根本不急或者压根不关心,那么我们为什么要让这些细枝末节的业务影响核心业务的效率呢,是不是有点舍本逐末。所以这个逻辑我们可以设计成异步的。我们可以当下单成功后,只需要将订单消息发给MQ,然后立即将结果返回通知客户。这才是正确的打开姿势。这样一来,我订单系统只需要告诉你MQ,我下单成功了,其他模块收到消息后,该发短信的发短信,发邮件的发邮件。因为本来MQ的性能就很好,所以这个效率一下就提升了。
1.3、MQ的选择
1.3.1、Kafka
Kafka 主要特点是基于 Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能,肯定是首选 kafka 了。
1.3.2、ActiveMQ
优点:单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现高可用性,消息可靠性,较低的概率丢失数据。
缺点:维护越来越少,高吞吐量场景较少使用。
1.3.3、RocketMQ
天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。
1.3.4、RabbitMQ
结合 erlang 语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分 方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ,本章就以RabbitMQ为例。
2、RabbitMQ的介绍
2.1、RabbitMQ的概述
RabbitMQ是由erlang语言开发,基于AMQP协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
RabbitMQ官方地址:
Messaging that just works — RabbitMQhttp://www.rabbitmq.com/
RabbitMQ的架构图
2.2、AMQP
AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 AMQP是协议,类比HTTP。
2.3、JMS
JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。JSM是API接口规范,类比JDBC。
2.4、RabbitMQ模式
RabbitMQ提供了7种模式:
1、简单模式
2、work-queue工作队列模式
3、Publish/Subscribe发布与订阅模式
4、Routing路由模式
5、Topics主题模式
6、RPC远程调用模式(远程调用,不太算MQ,不作介绍)
7、Publisher Confirms发布确认
官网对应模式介绍:RabbitMQ Tutorials — RabbitMQ
3、RabbitMQ的安装
官方windows安装文档,Installing on Windows — RabbitMQ
安装注意事项: 1、 推荐使用默认的安装路径 2、 系统用户名必须是英文
百度网盘资源
链接:https://pan.baidu.com/s/1C87Piy7co6BWf9v8N-4WrQ
提取码:c17d
3.1、安装Erlang
RabbitMQ是由erlang语言开发,所以我在安装 RabbitMQ 一定要先安装Erlang环境,注意版本匹配
-
RabbitMQ Erlang Version Requirements — RabbitMQ 查看版本选择
-
https://erlang.org/download/otp_win64_25.0.exe erlang版本下载
-
右击 otp_win64_25.0.exe 以管理员身份运行 进行安装
-
安装Erlang只需要下一步下一步即可
3.2、RabbitMQ安装
-
Release RabbitMQ 3.10.10 · rabbitmq/rabbitmq-server · GitHub rabbitMQ版本下载
-
安装RabbitMQ只需要下一步下一步即可
-
右击 rabbitmq-server-3.10.10.exe以管理员身份运行 进行安装
安装完后,cmd输入services.msc打开服务,开启RabbitMQ服务
通过windows快捷键直接找到sbin命令,如果没有直接找到安装目录下找到sbin目录,以管理员身份运行。
输入下面命令,启动管理页面。
rabbitmq-plugins.bat enable rabbitmq_management
RabbitMQ在安装好后,可以访问 http://localhost:15672 ,其自带了guest/guest 的用户名和密码。
如果以上操作都进行后,仍然访问不到页面,请重启电脑再次测试。
3.3、创建用户
角色说明:
1、 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
3.4、创建虚拟主机Virtual Hosts
给用户添加虚拟主机权限
4、RabbitMQ工作模式
首先创建一个Maven项目
导入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
4.1、简单模式
只有一个生产者,一个消费者;生产者将消息发送到队列,消费者从队列中获取消息。
创建一个工具类,用来连接RabbitMQ
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//主机地址;默认为 localhost
connectionFactory.setHost("localhost");
//连接端口;默认为 5672
connectionFactory.setPort(5672);
//虚拟主机名称;默认为 /
connectionFactory.setVirtualHost("/yh");
//连接用户名;默认为guest
connectionFactory.setUsername("guest");
//连接密码;默认为guest
connectionFactory.setPassword("guest");
//创建连接
return connectionFactory.newConnection();
}
}
4.1.1、生产者
public class Producer {
static final String QUEUE_NAME="simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接。只能有一个消费者监听到这队列
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//发送信息
String message="hello RabbitMQ";
/**
* 参数1:交换机名称,如果没有指定则使用默认Default Exchage
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("已发送信息:"+message);
//关闭资源
channel.close();
connection.close();
}
}
4.1.2、消费者
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
//接收信息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由的key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("接收到的消息:" + new String(body, "utf-8"));
}
};
//监听消息
channel.basicConsume(Producer.QUEUE_NAME, true, consumer);
//不关闭资源,应该一直监听消息
}
}
先启动消费者,再启动生产者
4.2、工作队列模式(Work queues)
Work Queues
与入门程序的简单模式
相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
4.2.1、生产者
public class Producer {
static final String QUEUE_NAME="simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接。只能有一个消费者监听到这队列
* 参数4:是否在不
* 使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//发送信息
for (int i = 1; i <=10 ; i++) {
String message="hello RabbitMQ"+i;
/**
* 参数1:交换机名称,如果没有指定则使用默认Default Exchage
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("已发送信息:"+message);
}
//关闭资源
channel.close();
connection.close();
}
}
4.2.2、消费者1
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
//接收信息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//收到的消息
System.out.println("消费者1--接收到的消息:" + new String(body, "utf-8"));
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//监听消息
channel.basicConsume(Producer.QUEUE_NAME, true, consumer);
//不关闭资源,应该一直监听消息
}
}
4.2.3、消费者2
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
//接收信息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//收到的消息
System.out.println("消费者2--接收到的消息:" + new String(body, "utf-8"));
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//监听消息
channel.basicConsume(Producer.QUEUE_NAME, true, consumer);
//不关闭资源,应该一直监听消息
}
}
启动两个消费者,然后再启动生产者发送消息;到IDEA的两个消费者对应的控制台查看消费者消费的消息。
4.3、发布/订阅模式(Publish/Subscribe)
一个生产者发送的消息会被多个消费者获取。发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(binding)的所有的Queue上。这种模式不需要任何Routekey,需要提前将Exchange 与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以和多个Exchange绑定。如果接收到消息的Exchange没有与任何Queue绑定,则消息会丢失。
Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
-
Fanout:广播,将消息交给所有绑定到交换机的队列
-
Direct:定向,把消息交给符合指定routing key 的队列
-
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
4.3.1、生产者
public class Producer {
//交换机名称
static final String FANOUT_EXCHANGE="fanout_exchange";
//队列名称
static final String FANOUT_QUEUE_1="fanout_queue_1";
static final String FANOUT_QUEUE_2="fanout_queue_2";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
//发送信息
for (int i = 1; i <=10 ; i++) {
String message="发布/订阅模式--"+i;
/**
* 参数1:交换机名称,如果没有指定则使用默认Default Exchange
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish(FANOUT_EXCHANGE,"",null,message.getBytes());
System.out.println("已发送信息:"+message);
}
//关闭资源
channel.close();
connection.close();
}
}
4.3.2、消费者1
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(Producer.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);
//声明队列
channel.queueDeclare(Producer.FANOUT_QUEUE_1,true,false,false,null);
//队列绑定交换机
channel.queueBind(Producer.FANOUT_QUEUE_1,Producer.FANOUT_EXCHANGE,"");
//接收信息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//收到的消息
System.out.println("消费者1--接收到的消息:" + new String(body, "utf-8"));
}
};
//监听消息
channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer);
//不关闭资源,应该一直监听消息
}
}
4.3.3、消费者2
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
//声明队列
channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);
//队列绑定交换机
channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHANGE,"");
//接收信息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//收到的消息
System.out.println("消费者2--接收到的消息:" + new String(body, "utf-8"));
}
};
//监听消息
channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer);
//不关闭资源,应该一直监听消息
}
}
启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges
选项卡,点击 fanout_exchange
的交换机,可以查看到如下的绑定:
注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。
4.4、路由模式(Routing)
任何发送到Direct Exchange的消息都会被转发到RouteKey指定的Queue,这种模式下不需要将Exchange进行任何绑定(binding)操作,消息传递时需要一个RouteKey,可以简单的理解为要发送到的队列名字。
-
队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) -
消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 -
Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
在编码上与 Publish/Subscribe发布与订阅模式
的区别是交换机的类型为:Direct,还有队列绑定交换机的时候需要指定routing key。
4.4.1、生产者
public class Producer {
//交换机名称
static final String DIRECT_EXCHANGE = "direct_exchange";
//队列名称
static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机
// 参数1:交换机名称
// 参数2:交换机类型,fanout、topic、direct、headers
channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
// 发送信息到消费者1
String message = "新增了商品。路由模式;routing key 为 insert ";
channel.basicPublish(DIRECT_EXCHANGE, "insert", null, message.getBytes());
System.out.println("已发送消息:" + message);
// 发送信息到消费者2
message = "修改了商品。路由模式;routing key 为 update";
channel.basicPublish(DIRECT_EXCHANGE, "update", null, message.getBytes());
System.out.println("已发送消息:" + message);
//关闭资源
channel.close();
connection.close();
}
}
4.4.2、消费者1
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(Producer.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT);
//声明队列
channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT,true,false,false,null);
//队列绑定交换机
channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHANGE,"insert");
//接收信息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
}
};
//监听消息
channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, true, consumer);
//不关闭资源,应该一直监听消息
}
}
4.4.3、消费者2
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(Producer.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明队列
channel.queueDeclare(Producer.DIRECT_QUEUE_UPDATE, true, false, false, null);
//队列绑定交换机
channel.queueBind(Producer.DIRECT_QUEUE_UPDATE,Producer.DIRECT_EXCHANGE,"update");
//接收信息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));
}
};
//监听消息
channel.basicConsume(Producer.DIRECT_QUEUE_UPDATE, true, consumer);
//不关闭资源,应该一直监听消息
}
}
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果。
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges
选项卡,点击 direct_exchange
的交换机,可以查看到如下的绑定:
Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。
4.5、通配符模式(Topic)
任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey指定主题的Queue中。就是每个队列都有其关心的主题,所有的消息都带有一个标题(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配队列。这种模式需要Routekey并且提前绑定Exchange与Queue。在进行绑定时要提供一个该队列对应的主题。‘ # ’表示0个或若干个关键字,‘ * ’表示一个关键字。如果Exchange没有发现能够与RouteKey匹配的Queue,消息会丢失。
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
4.5.1、生产者
public class Producer {
//交换机名称
static final String TOPIC_EXCHANGE = "topic_exchange";
//队列名称
static final String TOPIC_QUEUE_1 = "topic_queue_1";
static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机
// 参数1:交换机名称
// 参数2:交换机类型,fanout、topic、direct、headers
channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
// 发送信息
String message = "新增了商品。Topic模式;routing key 为 item.insert " ;
channel.basicPublish(TOPIC_EXCHANGE, "item.insert", null, message.getBytes());
System.out.println("已发送消息:" + message);
// 发送信息
message = "修改了商品。Topic模式;routing key 为 item.update" ;
channel.basicPublish(TOPIC_EXCHANGE, "item.update", null, message.getBytes());
System.out.println("已发送消息:" + message);
// 发送信息
message = "删除了商品。Topic模式;routing key 为 item.delete" ;
channel.basicPublish(TOPIC_EXCHANGE, "item.delete", null, message.getBytes());
System.out.println("已发送消息:" + message);
//关闭资源
channel.close();
connection.close();
}
}
4.5.1、消费者1
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(Producer.TOPIC_EXCHANGE,BuiltinExchangeType.TOPIC);
//声明队列
channel.queueDeclare(Producer.TOPIC_QUEUE_1,true,false,false,null);
//队列绑定交换机
channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHANGE,"item.insert");
channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHANGE,"item.update");
//接收信息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//收到的消息
System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8"));
}
};
//监听消息
channel.basicConsume(Producer.TOPIC_QUEUE_1, true, consumer);
//不关闭资源,应该一直监听消息
}
}
4.5.2、消费者2
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(Producer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
//声明队列
channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);
//队列绑定交换机
channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHANGE,"item.delete");
//接收信息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//收到的消息
System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8"));
}
};
//监听消息
channel.basicConsume(Producer.TOPIC_QUEUE_2, true, consumer);
//不关闭资源,应该一直监听消息
}
}
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果;并且这些routing key可以使用通配符。
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges
选项卡,点击 topic_exchange
的交换机,可以查看到如下的绑定:
Topic主题模式可以实现 Publish/Subscribe发布与订阅模式
和 Routing路由模式
的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。
4.6、发布确认模式(Publisher Confirms)
与发布者进行可靠的发布确认,发布者确认是RabbitMQ扩展,可以实现可靠的发布。在通道上启用发布者确认后,RabbitMQ将异步确认发送者发布的消息,这意味着它们已在服务器端处理。
发布确认模式有三种策略: 1、单次确认 2、批量确认 3、异步确认
4.6.1、单次确认
public class Single {
//单个确认 267ms
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
String queueName = UUID.randomUUID().toString();
//声明队列
channel.queueDeclare(queueName,true,false,false,null);
//开启消息确认发布应答模式
channel.confirmSelect();
//记录开始时间
long start = System.currentTimeMillis();
//发送1000条信息
for (int i = 1; i <=1000 ; i++) {
//模拟信息
String message=i+"";
channel.basicPublish("",queueName,null,message.getBytes());
//单个确认
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("--------第"+(i)+"条信息发送成功!");
}else{
System.out.println("=========第"+(i)+"条消息发送失败!");
}
}
//记录结束时间
long end = System.currentTimeMillis();
System.out.println("共耗时:"+(end-start)+"ms");
}
}
4.6.2、批量确认
//批量确认 72ms
public class More {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
String queueName = UUID.randomUUID().toString();
//声明队列
channel.queueDeclare(queueName,true,false,false,null);
//开启消息确认发布应答模式
channel.confirmSelect();
//记录开始时间
long start = System.currentTimeMillis();
//发送1000条信息
for (int i = 1; i <=1000 ; i++) {
//模拟信息
String message=i+"";
channel.basicPublish("",queueName,null,message.getBytes());
//批量确认
if(i%100==0){
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("--------第"+(i)+"条信息发送成功!");
}else{
System.out.println("该批消息有确认失败的,需要重新发送整批失败的消息");
}
}
}
//记录结束时间
long end = System.currentTimeMillis();
System.out.println("共耗时:"+(end-start)+"ms");
}
}
4.6.3、异步确认
//异步确认 46ms
public class Asny {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
String queueName = UUID.randomUUID().toString();
//声明队列
channel.queueDeclare(queueName,true,false,false,null);
//开启消息确认发布应答模式
channel.confirmSelect();
//消息发送成功回调函数
ConfirmCallback ackCallback=(deliveryTag, multiple)->{
System.out.println("消息发送成功");
};
//消息法送失败回调函数
ConfirmCallback nackCallback=(deliveryTag, multiple)->{
System.out.println("消息发送失败");
};
//注册监听器监听,异步通知
channel.addConfirmListener(ackCallback,nackCallback);
//记录开始时间
long start = System.currentTimeMillis();
//发送消息
int message_num =1000;
for (int i = 1; i <= message_num; i++) {
String message=i+"";
channel.basicPublish("",queueName,null,message.getBytes());
}
//记录发消息后时间
long end=System.currentTimeMillis();
System.out.println("该模式为异步批量确认模式:"+message_num+",耗时:"+(end-start)+"ms");
}
}
4.6.4、速度对比
-
同步-单独发布消息:同步等待确认,简单,但吞吐量非常有限。
-
同步-批量发布消息:批量同步等待确认,简单,合理的吞吐量,一旦出现问题很难推断出是那条消息出现了问题。
-
异步处理:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些。
4.7、RabbitMQ模式总结
-
不直接Exchange交换机(默认交换机)
-
simple简单模式: 一个生产者、一个消费者,生产者生产消息到一个队列被一个消费者接收
-
work Queue工作队列模式: 一个生产者、多个消费者(竞争关系),生产者发送消息到一个队列中,可以被多个消费者监听该队列;一个消息只能被一个消费者接收,消费者之间是竞争关系
-
-
使用Exchange交换机;订阅模式(交换机:广播fanout、定向direct、通配符topic)
-
发布与订阅模式:使用了fanout广播类型的交换机,可以将一个消息发送到所有绑定了该交换机的队列
-
路由模式:使用了direct定向类型的交换机,消费会携带路由key,交换机根据消息的路由key与队列的路由key进行对比,一致的话那么该队列可以接收到消息
-
通配符模式:使用了topic通配符类型的交换机,消费会携带路由key(*, #),交换机根据消息的路由key与队列的路由key进行对比,匹配的话那么该队列可以接收到消息
-
-
发布确认模式(Publisher Confirms)
1、单次确认
2、批量确认
3、同步处理