一、消息队列MQ
中间件
1.1 什么是消息队列
消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
1.2 为何用消息队列
支付成功
service层功能:(操作)
1.修改订单状态;update 订单表
2.扣减库存;update 库存表
3.淘金币;update 淘金币表
4.发消息;A a,B b
5.发放优惠卷;insert 用户优惠卷表
业务service | 流程 |
---|---|
支付 | 成功:1,2,3,4,5|54321|24531 |
发货 | 1 |
客服聊天 | 4 |
从上面的描述中可以看出消息队列是一种应用间的异步协作机制,那什么时候需要使用 MQ 呢?
以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。
以上是用于业务解耦的情况,其它常见场景包括最终一致性、广播、错峰流控等等。
1.3 RabbitMQ
RabbitMQ是一个消息代理。核心思想:接收,保存,转发消息。是目前非常热门的一款消息中间件。
1.4 特点
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
-
可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
-
灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
-
消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
-
高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
-
多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
-
多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
-
管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
-
跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
-
插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
应用场景
-
异步处理:把所有消息保存在中间件中,等到需要处理的时候再出处理消息;
-
流量销峰:短时间内访问量突然增加,使用mq,进行流量销峰,直接拒绝多余的请求;
-
日志处理:
-
应用程序解耦:A服务向B服务发送请求,B服务需要修改业务逻辑,A发送的请求全部保存在消息队列中先不处理,等到B服务器修改完成重新部署完成之后,再读取队列中的信息,对A服务器发送的请求进行处理。
1.4 RabbitMQ 中的概念模型
考试:
生产者:学生(提交试卷到FTP)
队列:FTP(保存试卷)
消费者:老师(从FTP上获取试卷,批改)
消息:试卷
消息模型
所有 MQ 产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。
-
producing的意思是发送。一个发送消息的程序叫做producer
-
queue,即队列,它由RabbitMQ管理。尽管消息会在你的应用和RabbitMQ之间流过,但他们只被保存在队列中。队列没有边界限制,你想存多少消息就能存多少——它本质上是一个无限制的缓冲区。一个队列可以接收多个producer的消息,也可以被多个consumer读取。
-
consuming的意思类似于接收。一个等待接收消息的程序叫做consumer
消息流
RabbitMQ 基本概念
上面只是最简单抽象的描述,具体到 RabbitMQ 则有更详细的概念需要解释。上面介绍过 RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念:
RabbitMQ 内部结构
-
Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
-
Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
-
Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
-
Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
-
Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
-
Connection
网络连接,比如一个TCP连接。
-
Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
-
Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
-
Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
-
Broker
表示消息队列服务器实体。
RabbitMQ 常用交换器
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct(直联)、fanout(扇形)、topic(主题)、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:
生产者A/B ---> 队列A/队列B --->消费者A/B/C
direct 交换器
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
fanout 交换器
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
topic 交换器
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“
”。#匹配0个或多个单词,
匹配不多不少一个单词。
二、RabbitMQ 安装
2.1 tar安装
2.1.1 安装文件准备
上传文件到对应文件夹下
-
rabbitmq-server-generic-unix-3.6.1.tar.xz
2.1.2 安装Erlang
由于RabbitMQ是基于Erlang(面向高并发的语言)语言开发,所以在安装RabbitMQ之前,需要先安装Erlang。
安装编辑工具
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
创建文件夹erlang
makdir erlang makdir rabbitmq
上传安装包并解压
tar -zxvf otp_src_22.0.tar.gz
进入解压目录下,配置安装路径
cd otp_src_22.0 ./configure --prefix=/usr/local/erlang
安装
make & make install
配置erlang环境变量
echo 'export PATH=$PATH:/opt//erlang/bin' >> /etc/profile
刷新环境变量
source /etc/profile
检验是否安装成功
#输入命令检验是否安装成功,如下输出表示安装成功 erl # 输入halt().命令退出 halt().
2.1.3 安装RabbitMQ
由于是tar.xz格式的所以需要用到xz,没有的话就先安装
yum install -y xz
第一次解压,解压为tar
/bin/xz -d rabbitmq-server-3.7.15.tar.xz
第二次解压
tar -xvf rabbitmq-server-3.7.15.tar
改名
mv rabbitmq-server-3.7.15.tar rabbitmq
配置环境变量
echo 'export PATH=$PATH:/opt/rabbitmq/rabbitmq/sbin' >> /etc/profile
刷新环境变量
source /etc/profile
2.2 rpm安装
tar包:压缩文件,windows中的zip包
rpm包:安装文件,windows中的安装包,.exe文件,自动处理软件之间的依赖关系
2.2.1 安装文件准备
2.2.2 安装Erlang
由于RabbitMQ是基于Erlang(面向高并发的语言)语言开发,所以在安装RabbitMQ之前,需要先安装Erlang。
执行rpm -ivh xxx.rpm安装
yum -y install epel-release yum -y install socat rpm -ivh erlang-23.2.1-1.el7.x86_64.rpm rpm -ivh rabbitmq-server-3.8.14-1.el7.noarch.rpm
https://pkgs.org/download/libcrypto.so.1.1(OPENSSL_1_1_0)(64bit)
2.3 Rabbitmq命令
启动
启动:
-detached在后台启动Rabbit
rabbitmq-server -detached
停止:
rabbitmqctl stop
状态:
rabbitmqctl status
开启web插件
rabbitmq启动之后默认有很多插件可以使用∶
rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_management
访问:http://127.0.0.1:15672/
默认账号密码:guest guest(这个账号只允许本机访问)
rabbitmq默认只能使用localhost访问,windows无法直接访问
添加管理员账号
角色:
administrator:管理员
managment:普通管理员
# 添加用户 # rabbitmqctl add_user Username Password rabbitmqctl add_user rabbitadmin 123456 # 分配用户标签 # rabbitmqctl set_user_tags User Tag #[administrator]:管理员标签 rabbitmqctl set_user_tags rabbitadmin administrator
控制台
创建交换机
默认
创建队列
绑定
交换机和队列做绑定
三、RabbitMQ工作模式
3.1 引入依赖关系
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.11.0</version> </dependency>
3.2 队列模式(消费者)
队列模式不需要生命交换机,使用交换机,交换机使用默认交换机(直连交换机,key=队列名字)
1. 简单队列
一个生产者对应一个消费者
获取连接
package test.mq; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConnectionUtil { public static Connection getConnection() { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.190.130"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("sn"); factory.setPassword("sn"); Connection connection = null; try { connection = factory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return connection; } }
生产者
queueDeclare(name,durable,exclusive,autoDelete);
-
name: 队列的名称;
-
durable: 是否持久化;
-
exclusive: 是否独享、排外的;
-
autoDelete: 是否自动删除;
/** * 〈简单队列——消息生产者〉 */ public class Producer { private final static String QUEUE_NAME = "my-que"; public static void main(String[] args) throws Exception { sendMessage(); } public static void sendMessage() throws Exception { //1、获取连接 Connection connection = ConnectionUtil.getConnection(); //2、声明信道 Channel channel = connection.createChannel(); //3、声明(创建)队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4、定义消息内容 String message = "hello rabbitmq "; //5、发布消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("发送消息:" + message ); //6、关闭通道 channel.close(); //7、关闭连接 connection.close(); } }
管理工具中查看消息
消费者
/** * 〈消息消费者〉 */ public class Customer { private final static String QUEUE_NAME = "my-que"; public static void main(String[] args) throws Exception { getMessage(); } public static void getMessage() throws Exception { //1、获取连接 Connection connection = ConnectionUtil.getConnection(); //2、声明通道 Channel channel = connection.createChannel(); //3、声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4、定义队列的消费者 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msgString = new String(body, "utf-8"); System.out.println("接收的消息:" + msgString); } }; //5、监听队列 /* true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费 false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈, 如果消费者一直没有反馈,那么该消息将一直处于不可用状态,并且服务器会认为该消费者已经挂掉,不会再给其 发送消息,直到该消费者反馈。 */ channel.basicConsume(QUEUE_NAME, true, consumer); } }
2.Work模式
一个生产者,多个消费者,每个消费者获取到的消息唯一,默认轮询获取。
2.1 轮询分发
不关心速度,效率低,平均,123123123
生产者
/** * 〈轮询分发——生产者〉 */ public class Send { private static final String QUEUE_NAME = "my-que"; public static void main(String args[]) throws Exception { //获取连接 Connection connection = ConnectionUtil.getConnection(); //获取channel Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 50; i++) { String msg = "hello " + i; System.out.println("[mq] send:" + msg); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); Thread.sleep(i * 20); } channel.close(); connection.close(); } }
消费者
创建两个消费者
-
消费者1:每接收一条消息后休眠1秒
-
消费者2:每接收一条消息后休眠2秒
/** * 〈消费者1〉 */ public class Receive1 { private static final String QUEUE_NAME = "my-que"; public static void main(String args[]) throws Exception { //获取连接 Connection connection = ConnectionUtil.getConnection(); //获取channel、 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //定义一个消费者 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("[1] Receive1 msg:" + msg); try { // 注:消费者2修改为2秒,其他一样 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[1] done"); } } }; boolean autoAck = true; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
结果
work模式默认轮询分发,将消息队列中的消息,依次发送给所有消费者。一个消息只能被一个消费者获取。
2.2 公平分发
消费者关闭自动应答,开启手动回执,消费者完成业务接口方法后可以告知消息队列处理完成,消息队列从队列中取一条消息发送给消费者。
效率高的消费者消费消息多。
/* * 〈消费者1〉 */ public class Receive1 { private static final String QUEUE_NAME = "my-que"; public static void main(String args[]) throws Exception { //获取连接 Connection connection = ConnectionUtil.getConnection(); //获取channel、 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(0,1,false); //定义一个消费者 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("[1] Receive1 msg:" + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[1] done"); //手动回执 // 应答成功,basicAck(消息标识符,是否批量应答) // channel.basicAck(envelope.getDeliveryTag(), true); // 应答失败,basicReject(消息标识符,是否重新发送) channel.basicReject(envelope.getDeliveryTag(), false); } } }; boolean autoAck = fasle; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
3.3 交换器模式(交换机)
在RabbitMQ中,交换器主要有四种类型:direct、fanout、topic、headers
3.发布/订阅模式(Fanout)
未支付的订单--付款:消息(用户编号,商品编号,付款金额)
-
queue-a:修改订单状态 1s
-
queue-b:发消息 1s
-
queue-c:发红包 1s
一个生产者发送的消息会被多个消费者获取(一样的)。发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(binding)的所有的Queue上。这种模式不需要任何Routekey,需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以和多个Exchange绑定。如果接收到消息的Exchange没有与任何Queue绑定,则消息会丢失。
生产者
/** * 〈订阅模式——生产者〉 */ public class Send { private static final String EXCHANGE_NAME = "my-fanout-ex"; public static void main(String args[]) throws Exception { //获取连接 Connection connection = ConnectionUtil.getConnection(); //获取channel Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//分发 //发送消息 String msg = "hello exchange"; System.out.println("[mq] send:" + msg); channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); channel.close(); connection.close(); } }
消费者
两个消费者绑定不同的队列,绑定相同的交换机
/** * 〈消费者1〉 */ public class Receive1 { private static final String QUEUE_NAME = "my-que-a"; private static final String EXCHANGE_NAME = "my-fanout-ex"; public static void main(String args[]) throws Exception { //获取连接 Connection connection = ConnectionUtil.getConnection(); //获取channel Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //绑定到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); //定义一个消费这 Consumer consumer = new DefaultConsumer(channel) @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("[1] Receive1 msg:" + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[1] done"); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
/** * 〈消费者2〉 */ public class Receive1 { private static final String QUEUE_NAME = "my-que-b"; private static final String EXCHANGE_NAME = "my-fanout-ex"; public static void main(String args[]) throws Exception { //获取连接 Connection connection = ConnectionUtil.getConnection(); //获取channel Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //绑定到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); //定义一个消费这 Consumer consumer = new DefaultConsumer(channel) @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("[1] Receive1 msg:" + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[1] done"); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
两个消费者获得了同一条消息。一个消息从交换机同时发送给了两个队列中,监听这两个队列的消费者消费了这个消息;如果没有队列绑定交换机,则消息将丢失。因为交换机没有存储能力,消息只能存储在队列中。
4.路由模式(Direct)
任何发送到Direct Exchange的消息都会被转发到RouteKey指定的Queue,这种模式下不需要将Exchange进行任何绑定(binding)操作,消息传递时需要一个RouteKey,可以简单的理解为要发送到的队列名字。如果vhost中不存在该队列名,消息会丢失。
也就是让消费者有选择性的接收消息
生产者
/** * 〈路由模式-消息发送者〉 */ public class Send { public static final String EXCHANGE_NAME = "my-direct-ex"; public static final String ROUTING_KEY = "rt-a"; public static void main(String args[]) throws Exception { //获取连接 Connection connection = ConnectionUtil.getConnection(); //获取channel Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String msg = "route message ->" + ROUTING_KEY; System.out.println("对 " + ROUTING_KEY + " 发送消息:" + msg); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes()); //关闭连接 channel.close(); connection.close(); } }
消费者
两个消费者,绑定相同的交换机,不同的队列,不一样的路由
/** * 〈接收消息1〉 */ public class Receive1 { public static final String QUEUE_NAME = "my-que-a"; public static final String EXCHANGE_NAME = "my-direct-ex"; public static final String ROUTING_KEY_A = "rt-a"; public static void main(String args[]) throws Exception { //获取连接 Connection connection = ConnectionUtil.getConnection(); //获取channel Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //设置预读取数 channel.basicQos(1); //绑定交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_A); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println(envelope.getRoutingKey() + " [1] Receive1 msg:" + msg); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
/** * 〈接收消息2〉 */ public class Receive2 { public static final String QUEUE_NAME = "my-que-b"; public static final String EXCHANGE_NAME = "my-direct-ex"; public static final String ROUTING_KEY_A = "rt-a"; public static final String ROUTING_KEY_B = "rt-b"; public static void main(String args[]) throws Exception { //获取连接 Connection connection = ConnectionUtil.getConnection(); //获取channel Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //设置预读取数 channel.basicQos(1); //绑定交换机和路由器,可以绑定多个路由 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_A); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_B); //定义消息消费者 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println(envelope.getRoutingKey() + " [2] Receive1 msg:" + msg); } }; //接收消息 channel.basicConsume(QUEUE_NAME, true, consumer); } }
路由模式,是以路由规则为导向,引导消息存入符合规则的队列中。再由队列的消费者进行消费的。
5.主题模式(Topic)
上面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配。在进行绑定时要提供一个该队列对应的主题。‘ # ’表示0个或若干个关键字,‘ * ’表示一个关键字。如果Exchange没有发现能够与RouteKey匹配的Queue,消息会丢失。
一个关键字是一个单词:root
/** * 〈主题模式-消息发送者〉 */ public class Send { public static final String EXCHANGE_NAME = "my-topic-ex"; public static final String ROUTING_KEY = "rt-key"; public static void main(String args[]) throws Exception { //获取连接 Connection connection = ConnectionUtil.getConnection(); //获取channel Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = ROUTING_KEY + ".publish"; // String routingKey = ROUTING_KEY + ".add"; // String routingKey = ROUTING_KEY + ".update"; String msg = "route message ->" + routingKey; channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println("对 " + routingKey + " 发送消息:" + msg); //关闭连接 channel.close(); connection.close(); } }
消费者
/** * 〈接收消息1〉 */ public class Receive1 { public static final String QUEUE_NAME = "my-que-a"; public static final String EXCHANGE_NAME = "my-topic-ex"; public static final String ROUTING_KEY = "rt-key"; public static void main(String args[]) throws Exception { //获取连接 Connection connection = ConnectionUtil.getConnection(); //获取channel Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //设置预读取数 channel.basicQos(1); //绑定交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY + ".add"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY + ".update"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println(envelope.getRoutingKey() + " [1] Receive1 msg:" + msg); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
/** * 〈接收消息1〉 */ public class Receive1 { public static final String QUEUE_NAME = "my-que-b"; public static final String EXCHANGE_NAME = "my-topic-ex"; public static final String ROUTING_KEY = "rt-key"; public static void main(String args[]) throws Exception { //获取连接 Connection connection = ConnectionUtil.getConnection(); //获取channel Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //设置预读取数 channel.basicQos(1); //绑定交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY + ".add"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY + ".update"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println(envelope.getRoutingKey() + " [1] Receive1 msg:" + msg); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
3.4 总结
-
队列模式:关注是一个队列有几个消费者,发布者向队列发送消息(使用服务器默认得交换机,direct,key=队列名)
-
简单队列模式:一个生产者一个队列一个消费者
-
Work模式:一个生产者一个队列多个消费者
-
轮询分发:123123123消息评分分配
-
公平分发:效率高得多分
-
-
-
交换机模式:关注是接收消息得交换机类型,发布者向交换机发送消息
-
发布订阅模式:fanout扇形交换机,没有routekey,所有和交换机绑定得队列都接收消息
-
路由模式:direct直连交换机,有routekey(不能使用通配符),根据routekey对应队列接收消息
-
主题模式:topic主题交换机,有routekey(使用通配符),根据routekey匹配对应队列接收消息
-