一、简单的生产者-消费者
1.1、创建连接工具类获取信道
public class RabbitMqUtils {
public static Channel getChannel() throws IOException, TimeoutException {
//创建一个链接工厂
ConnectionFactory factory = new ConnectionFactory();
//工厂IP 链接RabbitMQ的队列
factory.setHost("192.168.116.3");
factory.setVirtualHost("/test");
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("123");
//创建链接
Connection connection = factory.newConnection();
//获取链接当中的信道
Channel channel = connection.createChannel();
return channel;
}
}
1.2、创建生产者
步骤:
①生产者连接RabbitMQ服务端,获取信道(即连接)
②由信道声明一个队列(调用queueDeclare方法),其中参数表示:
1.队列名称
2.队列里面的消息是否持久化(磁盘),默认情况下消息存储在内存中(并不进行持久化)
3.该队列是否只供一个消费者进行消费 是否进行消息的共享,true可以多个消费者消费,默认为false:只能一个消费者消费
4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true自动删除 false不自动删除
5.其他参数(后续使用时说明)
③控制台输入消息内容并发送,发送调用basicPublish方法,其中参数表示:
1.发送到哪个交换机(入门采用默认交换机,后续使用时详细说明其用法)
2.路由的key值是哪个 本次是队列名称
3.其他参数信息(后续使用时说明)
4.发送消息的消息体
public class Task01 {
//队列的名称
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.nextLine();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送完毕:" + message);
}
}
}
1.3、创建消费者
步骤:
①消费者连接RabbitMQ服务端,获取信道(即连接)
②信道调用basicConsume方法接收消息,其中参数表示:
1.消费哪个队列
2.消费成功之后是否自动应答 true代表的是自动应答 false代表的是手动应答
3.当一个消息发送过来后的回调接口,即接收到消息如何处理
4.消费者取消消费的回调
public class Worker01 {
//队列的名称
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
/**
* 声明 接收消息
*/
DeliverCallback deliverCallback = (consumeTag, message) ->{
System.out.println("工作线程2接收到的消息:" + new String(message.getBody()));
};
/**
取消消费的回调
*/
CancelCallback cancelCallback = (consumeTag) -> {
System.out.println(consumeTag + "消费者取消消息消费接口回调逻辑");
};
System.out.println("T2等待接收消息......");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
二、轮训分发-生产者消费者代码
轮训分发顾名思义就是生产者将消息发送到MQ当中时,由多个消费者来处理这些消息,消费者1一条,消费者2一条,以此类推,直到MQ当中的消息消费完。
2.1、生产者
此处采用消息持久化策略(添加MessageProperties.PERSISTENT_TEXT_PLAIN),即将队列和队列当中的信息持久化到磁盘当中。
public class Task {
/**
* 队列名称
*/
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
boolean durable = true;//表示开启消息持久化
channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.nextLine();
//设置生产者发送的消息为持久化消息(要求保存到磁盘上)
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发出消息:" + message);
}
}
}
2.2、消费者1
此处将消费者1设置为手动应答,自动应答则是消费者一旦接收到消息就告知队列,队列则将该消息从队列中删除,手动应答是为了确保消息确确实实被处理掉了才告知队列进行删除,以防处理时出现问题导致消息未被完全处理就丢弃的情况。
public class Consumer01 {
/**
* 队列名称
*/
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumeTag, message) ->{
System.out.println("C1接收到的消息:" + new String(message.getBody(),"UTF-8"));
//手动应答
/**
* 1.消息的标记 tag 每一条消息都会有唯一标识 此处表示应答哪一条消息
* 2.是否批量应答 false:不批量应答信道当中的消息(仅回答此消息)
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//取消消费的回调
CancelCallback cancelCallback = (consumeTag) -> {
System.out.println("消息消费被中断");
};
/**
* 采用手动应答
*/
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
2.3、消费者2
消费者2和消费者1一样采取手动应答,确保消息的正确处理。
public class Consumer02 {
/**
* 队列名称
*/
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumeTag, message) ->{
System.out.println("C2接收到的消息:" + new String(message.getBody(),"UTF-8"));
//手动应答
/**
* 1.消息的标记 tag 每一条消息都会有唯一标识 此处表示应答哪一条消息
* 2.是否批量应答 false:不批量应答信道当中的消息(仅回答此消息)
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//取消消费的回调
CancelCallback cancelCallback = (consumeTag) -> {
System.out.println("消息消费被中断");
};
/**
* 采用手动应答
*/
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
2.4、关于自动应答和手动应答
自动应答可以保证高吞吐量,但是保证不了数据传输安全性。一方面,当采用自动应答时,队列发送完消息,当消息还未被消费者接收到时消费者一旦断开连接,此条消息则会丢失。另一方面,例如消费者端并没有设置传递消息的数量限制,队列持续的发消息给消费端,然而消费者端处理消息的能力低下,导致消息大量积压,最终导致内存耗尽,此时消费者线程会被操作系统杀死,那么这些消息依然没有被消费就丢失了。手动应答可以完美的规避自动应答的弊端,但是,手动应答的吞吐量并不高,因此采用哪种应答方式需要根据实际应用场景进行权衡。
消息应答的方法有两种:
三、消息重新入队
参考二当中的代码,引入线程沉睡表示消息处理的效率,让消费者1处理一条消息仅需1s,消费者2处理一条消息需要30s,生产者发送AA\BB\CC\DD四条消息,按照上述逻辑,消费者1会打印AA\CC,消费者2对打印BB\DD,当生产者发送完DD时就将消费者2线程杀死。观察消费者1的打印台情况,可以观察到DD则由消费者1进行消费了,当MQ未收到消费者2对于DD的应答,DD这条消息依然存放在MQ当中,由于消费者2断开了连接,此时MQ就会将DD发送到消费者1的信道当中,由消费者1处理DD这条消息。
生产者:
消费者1:
消费者2:
四、不公平分发
参考三当中的线程沉睡,我们类比成消费者1的处理能力高效,消费者2的处理能力低下,此处采用不公平分发,即能者多劳,让闲着的线程处理更多的消息。在消费者1和消费者2当中分别引入如下代码:
/**
* 设置不公平分发
*/
int prefetchCount = 1;
channel.basicQos(prefetchCount);
生产者发送AA\BB\CC\DD四条消息,消费者1和消费者2打印台执行效果和三当中的效果一样,唯一区别在于我们此时并没有强制杀死消费者2线程。
五、预取值
预取值是对不公平分发的进一步优化,所谓预取值,举个例子,此时队列当中有1\2\3\4\5\6\7\8,共8条消息,设置消费者1的预取值为5,消费者2的预取值为2,表示将这8条消息当中的5条发送给消费者1的信道当中,将2条发送给消费者2的信道当中,消费者1每应答一条消息,队列才会将下一条消息发往消费者1的信道,类似的,消费者2也是如此。这样做的目的在于假设我们可以预估到两个消费者的处理能力,根据他们的能力让他们处理对标能力的消息。预取值的大小我们一般是不会准确把握的,只有通过反复的测试才会把握到一个接近处理能力的预取值。观察两个消费者的打印台输出情况:
设置消费者1预取值及其打印台:
/**
* 预取值
*/
int prefetchCount = 5;
channel.basicQos(prefetchCount);
设置消费者2预取值及其打印台:
/**
* 预取值
*/
int prefetchCount = 2;
channel.basicQos(prefetchCount);
六、发布确认模式
发布确认模式针对生产者,生产者将信道设置为confirm模式,此模式下,生产者发送的消息将需要RabbitMQ进行确认收到,防止生产者发送的消息丢失。发布确认模式有三种策略,分别为单个确认模式、批量确认模式、异步批量确认模式。针对以上三种策略,分别实现其生产者发送消息,观察其发送效率。
将三种模式封装成三个方法,并由主函数调用:
/**
* 队列名称
*/
private static final String QUEUE_NAME = "confirm";
/**
* 批量发送消息条数
*/
private static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
channel.confirmSelect();//开启发布确认模式
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
/*
* 发布确认模式
* 1、单个确认
* 2、批量确认
* 3、异步批量确认
*/
publishMessageIndividually(channel);//25432
publishMessageBatch(channel);//391
asynchronousConfirmMessage(channel);//22
}
6.1、单个确认模式
单个确认模式即生产者发送一条,MQ收到后回应一条,之后生产者继续发送下一条。
//单个确认
public static void publishMessageIndividually(Channel channel) throws Exception{
//开始时间
long startTime = System.currentTimeMillis();
//批量的发消息 单个发布确认
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
//单个消息就立即进行发布确认
boolean flg = channel.waitForConfirms();
if (!flg){
System.out.println("第" + i + "条消息发送失败~");
}
}
//结束时间
long endTime = System.currentTimeMillis();
long spendTime = endTime - startTime;
System.out.println("单个确认模式发布1000条消息耗时:" + spendTime + " ms");
}
6.2、批量确认模式
批量确认模式即发送一批消息后一起确认是否收到,与上述单个确认模式相比要大大减少耗时,但是一旦某条消息未被收到,我们将很难排查到是哪条消息。
//批量确认
public static void publishMessageBatch(Channel channel) throws Exception{
//开始时间
long startTime = System.currentTimeMillis();
//批量确认消息大小
int confirmSize = 100;//每100条确认一次
//批量的发送消息 批量发布确认
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
//判断达到100条消息的时候批量的确认一次
if (((i + 1) % confirmSize) == 0){
boolean flg = channel.waitForConfirms();
if (!flg){
System.out.println("第" + (i - 99) + "条~第" + i + "条持久化失败~" );
}else {
System.out.println("第" + (i - 99) + "条~第" + i + "条持久化成功~" );
}
}
}
//结束时间
long endTime = System.currentTimeMillis();
long spendTime = endTime - startTime;
System.out.println("批量确认模式发布1000条消息耗时:" + spendTime + " ms");
}
6.3、异步批量确认模式
异步批量确认模式完美的规避了上两种确认模式的弊端,异步批量确认模式中生产者只负责发送消息,而无需关心消息是否发送到队列当中并进行了持久化,由MQ当中的broker负责告知消息的发送结果,生产者只需要在代码当中准备一个监听器,监听broker的消息,broker告知监听器哪些消息发送成功了,哪些消息发送失败了,我们准备一个线程安全的数据容器即可,将发送的消息放到这个容器当中。监听器当中的参数ackCallback和nackCallback会分别对容器当中的数据进行处理。ackCallback有两手准备,multiple为true时表示批量应答,此时将容器当中Key小于该deliverTag的移到临时的容器当中,反之为false时,表示只应答一条,则从容器中移除该数据。
//异步确认
public static void asynchronousConfirmMessage(Channel channel) throws Exception{
/**
* 线程安全有序的哈希表 适用于高并发的情况下
* 1.轻松的将序号和消息进行关联
* 2.可以轻松的批量删除条目 只要给到序号
* 3.支持高并发(多线程)
*/
ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();
//消息确认成功回调函数
/**
* 1、消息的标记
* 2、是否为批量确认
*/
ConfirmCallback ackCallback = (deliveryTag,multiple) -> {
//删除已经确认的消息 剩下的就是未确认的消息
if (multiple){
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
confirmed.clear();
}else {
outstandingConfirms.remove(deliveryTag);
}
};
//消息确认失败回调函数
/**
* 1、消息的标记
* 2、是否为批量确认
*/
ConfirmCallback nackCallback = (deliveryTag,multiple) -> {
String message = outstandingConfirms.get(deliveryTag);
//3--->打印未确认的消息都有哪些
System.out.println("未确认的消息:序号->" + deliveryTag + " 消息体:" + message);
};
//准备消息的监听器 监听哪些消息成功了 哪些消息失败了
/**
* 1、监听哪些消息成功了
* 2、监听哪些消息失败了
*/
channel.addConfirmListener(ackCallback,nackCallback);//异步通知
//开始时间
long startTime = System.currentTimeMillis();
//批量发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
//此处记录所有要发送的消息
outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
}
//结束时间
long endTime = System.currentTimeMillis();
long spendTime = endTime - startTime;
System.out.println("异步确认模式发布1000条消息耗时:" + spendTime + " ms");
}
七、交换机
7.1、Exchanges概念
7.2、发布/订阅模式(fanout类型)
假设有这么一个场景,在网购系统当中,某家店铺需要将新产品进行推广,该产品面向中老年人退出,因此就需要将推广消息发送给该群体用户。中年群众和老年群众作为两种消费者群体,如何做到只发布一次消息就能做到这两种消费者都能收到该消息?此时,MQ就为我们提供了发布/订阅模式。
消息发送方:
public class EmitLog {
/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "LOGS";
/**
* 交换机类型
*/
private static final String EXCHANGE_TYPE = "fanout";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机,交换机名称、交换机类型
channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.nextLine();
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("发送方发送消息:" + message);
}
}
}
消费者1:
public class ReceviceLogs01 {
/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "LOGS";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//声明一个队列 临时队列
/**
* 生成一个临时队列 队列名是随机的
* 当消费者与队列断开连接时 队列自动删除
*/
String queueName = channel.queueDeclare().getQueue();
/**
* 绑定交换机与队列
*/
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("接收方01等待接收消息......");
/**
* 声明 接收消息
*/
DeliverCallback deliverCallback = (consumeTag, message) ->{
System.out.println(new String(message.getBody()));
};
//取消消费的回调
CancelCallback cancelCallback = (consumeTag) -> {
System.out.println("消息消费被中断");
};
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
}
}
消费者2:
public class ReceviceLogs02 {
/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "LOGS";
/**
* 交换机类型
*/
private static final String EXCHANGE_TYPE = "fanout";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//声明一个队列 临时队列
/**
* 生成一个临时队列 队列名是随机的
* 当消费者与队列断开连接时 队列自动删除
*/
String queueName = channel.queueDeclare().getQueue();
/**
* 绑定交换机与队列
*/
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("接收方02等待接收消息......");
/**
* 声明 接收消息
*/
DeliverCallback deliverCallback = (consumeTag, message) ->{
System.out.println(new String(message.getBody()));
};
//取消消费的回调
CancelCallback cancelCallback = (consumeTag) -> {
System.out.println("消息消费被中断");
};
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
}
}
执行结果:
fanout交换机说明:此类型的交换机可以将接收到的消息发送给所有与它绑定的队列。参考我们上面所说的轮训分发,轮训分发是将队列当中的消息发送给消费者,一条消息仅能由一个消费者进行消费,如果使用默认的交换机,则需要发送多次消息,由队列再分发给每个消费者。
总结:只要该队列和此交换机绑定,无论routing-key是什么,都会接收到生产者发送的消息。
7.3、直接交换机(Direct类型)
假设有这么一个场景,同样是网购系统,在下单时我们需要将交易金额1W以上的订单交给某支付机构A处理,5000-1W的交给支付机构B处理,5000元以下的也交给支付机构A处理。怎么做?支付机构A和支付机构B分别对应消费者1和消费者2,订单为生产者,生产者(网购系统)将订单信息发送至交换机,交换机根据生产者提供的交易金额(routing-key)决定由哪个消费者(支付机构)去处理。这就是直接交换机的用处,可以指定哪个消费者去消费此条消息。
生产者:
public class DirectLogs {
/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "direct_logs";
/**
* 交换机类型
*/
public static final String EXCHANGE_TYPE = "direct";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.nextLine();
channel.basicPublish(EXCHANGE_NAME,"warning",null,message.getBytes(StandardCharsets.UTF_8));
}
}
}
消费者1:
public class ReceiveLogsDirect01 {
/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//声明一个队列
channel.queueDeclare("console",false,false,false,null);
/**
* 路由绑定
*/
channel.queueBind("console",EXCHANGE_NAME,"info");
channel.queueBind("console",EXCHANGE_NAME,"warning");
/**
* 声明 接收消息
*/
DeliverCallback deliverCallback = (consumeTag, message) ->{
System.out.println(new String(message.getBody()));
};
//取消消费的回调
CancelCallback cancelCallback = (consumeTag) -> {
System.out.println("消息消费被中断");
};
channel.basicConsume("console",true,deliverCallback,cancelCallback);
}
}
消费者2:
public class ReceiveLogsDirect02 {
/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//声明一个队列
channel.queueDeclare("disk",false,false,false,null);
/**
* 路由绑定
*/
channel.queueBind("disk",EXCHANGE_NAME,"error");
/**
* 声明 接收消息
*/
DeliverCallback deliverCallback = (consumeTag, message) ->{
System.out.println(new String(message.getBody()));
};
//取消消费的回调
CancelCallback cancelCallback = (consumeTag) -> {
System.out.println("消息消费被中断");
};
channel.basicConsume("disk",true,deliverCallback,cancelCallback);
}
}
避坑:队列一旦与direct类型的交换机绑定,则生产者发送消息时则不能使用队列名称来指定发送到哪个队列,只能使用routing-key来进行队列选择。
7.4、Topic交换机
Topic交换机的应用场景是非广泛,fanout交换机和direct交换机的功能都可以使用Topic交换机实现。发送到topic交换机的消息不能具有任意的 routing_key —— 它必须是由点分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的一些功能。binding key也必须采用相同的形式。topic交换机背后的逻辑 类似于direct交换机——使用特定 routing key 发送的消息将被传递到与匹配binding key绑定的所有队列。但是binding key有两个重要的特殊情况:
7.4.1、实用案例:
生产者:
public class TopicExchangePro {
/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "topic_logs";
/**
* 交换机类型
*/
private static final String EXCHANGE_TYPE = "topic";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
/**
* 声明一个交换机
*/
channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String rountingKey = scanner.next();
System.out.println("输入的rountingKey为:" + rountingKey);
String message = scanner.next();
System.out.println("输入消息体的为:" + message);
channel.basicPublish(EXCHANGE_NAME,rountingKey,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息体:" + message + "已发送给rountKey为:" + rountingKey + "的队列");
}
}
}
消费者1:
public class TopicExchangeCon01 {
/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
/**
* 声明一个队列
*/
channel.queueDeclare("receive1",false,false,false,null);
/**
* 路由绑定
*/
channel.queueBind("receive1",EXCHANGE_NAME,"*.*.pink");
channel.queueBind("receive1",EXCHANGE_NAME,"green.#");
/**
* 声明 接收消息
*/
DeliverCallback deliverCallback = (consumeTag, message) ->{
String rountingKey = message.getEnvelope().getRoutingKey();
System.out.println("receive1接收消息体为:" + new String(message.getBody()) + " 其RountingKey为:" + rountingKey);
};
//取消消费的回调
CancelCallback cancelCallback = (consumeTag) -> {
System.out.println("消息消费被中断");
};
channel.basicConsume("receive1",true,deliverCallback,cancelCallback);
}
}
消费者2:
public class TopicExchangeCon02 {
/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
/**
* 声明一个队列
*/
channel.queueDeclare("receive2",false,false,false,null);
/**
* 路由绑定
*/
channel.queueBind("receive2",EXCHANGE_NAME,"*.red.*");
/**
* 声明 接收消息
*/
DeliverCallback deliverCallback = (consumeTag, message) ->{
String rountingKey = message.getEnvelope().getRoutingKey();
System.out.println("receive2接收消息体为:" + new String(message.getBody()) + " 其RountingKey为:" + rountingKey);
};
//取消消费的回调
CancelCallback cancelCallback = (consumeTag) -> {
System.out.println("消息消费被中断");
};
channel.basicConsume("receive2",true,deliverCallback,cancelCallback);
}
}
7.4.2、结果分析及测试:
消费者1routing-key:
*.*.pink
green.#
两种匹配规则,路由key为(什么什么pink)可以发往该队列,路由key为(green什么什么什么...)也可以发往该队列。
消费者2routing-key:
*.red.*
匹配规则:路由key是(什么red什么)可以发往该队列。
测试用例:
dog.is.pink 消费者1
green.like.pink 消费者1
green.red.dog 消费者1和消费者2
green.cat 消费者1
house.red.pink 消费者1和消费者2
we.red.family 消费者2
测试结果:
生产者:
消费者1:
消费者2:
避坑:当路由key匹配到同一个队列时(即路由key匹配到同一队列的多个routing-key时),只会给队列发送一次消息。
值得注意的是:当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了。如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了 。