交换机
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。
相反, 生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单, 一方面它接收来自生产者的消息,另一方面将它们推入队列。 交换机必须确切知道如何处理收到的消息。 是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。 这就的由交换机的类型来决定。
Exchanges 的类型
- 直接(direct)
- 主题(topic)
- 标题(headers) ,
- 扇出(fanout)
声明交换机
//参数:(交换机名称,交换机类型)
channel.exchangeDeclare(EXCHANEG_NAME,type);
默认 exchange
默认交换机使用空串标识,消息发送到交换机,交换机根据路由发送到队列中。
默认交换机(default exchange)实际上是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
channel.basicPublic("",queueName,props,message);
临时队列
临时队列,当消费者断开连接后,临时队列将被删除
临时队列声明
String queueName=channel.queueDeclare().getQueue();
绑定(bindings)
binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。
将队列绑定到交换机。
//参数:(队列,交换机,路由)
channel.queueBind(queueName,EXCHANGE_NAME,RouterKey);
Fanout
扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。
因为扇型交换机投递消息的拷贝到所有绑定到它的队列,所以他的应用案例都极其相似:
- 大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件
- 体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端
- 分发系统使用它来广播各种状态和配置更新
- 在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP没有内置presence的概念,因此XMPP可能会是个更好的选择)
Fanout 这种类型它是将接收到的所有消息广播到它知道的所有队列中。
public static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
//声明交换机 参数(交换机,交换机类型)
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//声明临时队列
String queue = channel.queueDeclare().getQueue();
//队列绑定交换机 参数(队列,交换机,routingKey)
channel.queueBind(queue,EXCHANGE_NAME,"");
System.out.println("Receive02等待消息....");
DeliverCallback askCallback=(consumerTag,message)->{
System.out.println("Receive02接收到的消息:"+new String(message.getBody(),"UTF-8"));
};
channel.basicConsume(queue,true,askCallback,consumerTag->{});
}
public static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
//声明交换机 参数(交换机,交换机类型)
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//声明临时队列
String queue = channel.queueDeclare().getQueue();
//队列绑定交换机 参数(队列,交换机,routingKey)
channel.queueBind(queue,EXCHANGE_NAME,"");
System.out.println("Receive02等待消息....");
DeliverCallback askCallback=(consumerTag,message)->{
System.out.println("Receive02接收到的消息:"+new String(message.getBody(),"UTF-8"));
};
channel.basicConsume(queue,true,askCallback,consumerTag->{});
}
public static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("发出消息:"+message);
}
消费者01和消费者02都将接收到生产者发送的消息
Direct exchange
直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的。直连交换机用来处理消息的单播路由(unicast routing)(尽管它也可以处理多播路由)。下边介绍它是如何工作的:
- 将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key)
- 当一个携带着路由键为R的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为R的队列。
直连交换机经常用来循环分发任务给多个工作者(workers)。当这样做的时候,我们需要明白一点,在AMQP 0-9-1中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列Q1。 绑定键为 blackgreen 和的消息会被发布到队列 Q2, 其他消息类型的消息将被丢弃
多重绑定
当然如果 exchange 的绑定类型是 direct, 但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多
Topics
主题交换机(topic exchanges)通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。
这些单词可以是任意单词,比如说: “stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”.这种类型的。
当然这个单词列表最多不能超过 255 个字节。
在这个规则列表中
(星号)可以代替一个单词
#(井号)可以替代零个或多个单词
当队列绑定关系是下列这种情况时需要引起注意
当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
如果队列绑定键当中没有#和出现,那么该队列绑定类型就是 direct 了
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//声明队列
String queueName="Q1";
channel.queueDeclare(queueName,false,false,false,null);
//队列绑定
channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
System.out.println("等待接受消息...");
DeliverCallback askCallback=(consumerTag,message)->{
System.out.println("接收到的消息为:"+ new String(message.getBody(),"UTF-8"));
System.out.println(" 接 收 队 列 :"+queueName+" 绑 定 键:"+message.getEnvelope().getRoutingKey()+",消息:"+message);
};
channel.basicConsume(queueName,true,askCallback,consumerTag->{});
}
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//声明队列
String queueName="Q2";
channel.queueDeclare(queueName,false,false,false,null);
//队列绑定
channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
System.out.println("等待接受消息...");
DeliverCallback askCallback=(consumerTag,message)->{
System.out.println("接收到的消息为:"+ new String(message.getBody(),"UTF-8"));
System.out.println(" 接 收 队 列 :"+queueName+" 绑 定 键:"+message.getEnvelope().getRoutingKey()+",消息:"+message);
};
channel.basicConsume(queueName,true,askCallback,consumerTag->{});
}
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
Map<String,String> binding=new HashMap<>();
binding.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
binding.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
binding.put("quick.orange.fox","被队列 Q1 接收到");
binding.put("lazy.brown.fox","被队列 Q2 接收到");
binding.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
binding.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
binding.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
binding.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
for (Map.Entry<String, String> stringEntry : binding.entrySet()) {
String routingKey = stringEntry.getKey();
String message = stringEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("发送消息:"+message);
}
}
队列
声明队列
channel.queueDeclare(QUEUE_NAME,durable,exclusive,autoDelete,arguments);
参数介绍:
1、QUEUE_NAME: 队列的名称;
2、durable: 是否持久化;
3、exclusive: 是否独享、排外的;
4、autoDelete: 是否自动删除;
5、arguments:队列的其他属性参数,有如下可选项,可参看图2的arguments:
(1)x-message-ttl:消息的过期时间,单位:毫秒;
(2)x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
(3)x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
(4)x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;
(5)x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;
(6)x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
(7)x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
(8)x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
(9)x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
(10)x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
(11)x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。
死信队列
死信,顾名思义就是无法被消费的消息 一般来说, producer 将消息投递到 broker 或者直接到 queue 里了, consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
死信的来源 :
- 消息 TTL 过期
- 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
public class Consumer01 {
public static final String NORMAL_EXCHANGE="normal_exchange";
public static final String DEAD_EXCHANGE="dead_exchange";
public static final String NORMAL_QUEUE="normal_queue";
public static final String DEAD_QUEUE="dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
//声明普通交换机和死信交换机
channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
//正常队列绑定死信队列信息
Map<String, Object> arguments=new HashMap<>();
//正常队列设置死信交换机 参数 key 是固定值
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//正常队列设置死信 routing-key 参数 key 是固定值
arguments.put("x-dead-letter-routing-key","lisi");
//设置队列最大长度
//arguments.put("x-max-length",6);
//声明普通队列
channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
//声明死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
//绑定普通队列和死信队列
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
//成功时的回调
DeliverCallback askCallback=(consumerTag,message)->{
//拒绝消息 需要手动应答
//channel.basicReject();
System.out.println("Consumer01接收到的消息为:"+new String(message.getBody(),"UTF-8"));
};
channel.basicConsume(NORMAL_QUEUE,true,askCallback,consumerTag->{});
}
}
public class Consumer02 {
public static final String DEAD_QUEUE="dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
DeliverCallback askCallback=(consumerTag,message)->{
System.out.println("Consumer02接收到的消息:"+new String(message.getBody(),"UTF-8"));
};
channel.basicConsume(DEAD_QUEUE,true,askCallback,consumerTag->{});
}
}
延迟队列
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
RabbitMQ 中的 TTL
TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间
单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。
Map<String,Object> arguments=new HashMap<>();
arguments.put("x-message-ttl",5000)
channel.queueDeclare(queueName,durable,exclusive,autoDelete,arguments);