[RabbitMQ] 7种工作模式详细介绍

news2025/1/12 18:52:41

🌸个人主页:https://blog.csdn.net/2301_80050796?spm=1000.2115.3001.5343
🏵️热门专栏:
🧊 Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm=1001.2014.3001.5482
🍕 Collection与数据结构 (92平均质量分)https://blog.csdn.net/2301_80050796/category_12621348.html?spm=1001.2014.3001.5482
🧀线程与网络(96平均质量分) https://blog.csdn.net/2301_80050796/category_12643370.html?spm=1001.2014.3001.5482
🍭MySql数据库(93平均质量分)https://blog.csdn.net/2301_80050796/category_12629890.html?spm=1001.2014.3001.5482
🍬算法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12676091.html?spm=1001.2014.3001.5482
🍃 Spring(97平均质量分)https://blog.csdn.net/2301_80050796/category_12724152.html?spm=1001.2014.3001.5482
🎃Redis(97平均质量分)https://blog.csdn.net/2301_80050796/category_12777129.html?spm=1001.2014.3001.5482
🐰RabbitMQ(97平均质量分) https://blog.csdn.net/2301_80050796/category_12792900.html?spm=1001.2014.3001.5482
感谢点赞与关注~~~
在这里插入图片描述

目录

  • 1. 7种工作模式介绍
    • 1.1 简单模式(simple)
    • 1.2 工作队列(Work Queue)
    • 1.3 对交换机和路由键的解释
    • 1.4 广播模式/发布订阅模式(Publish/Subscribe)
    • 1.5 路由模式(Routing)
    • 1.6 通配符模式(Topics)
    • 1.7 RPC通信(RPC)
    • 1.8 发布确认(Publish Confirms)
  • 2. 工作模式的代码实现
    • 2.1 简单模式
    • 2.2 工作队列
    • 2.3 广播模式/发布订阅模式(Publish/Subscribe)
    • 2.4 路由模式(Routing)
    • 2.5 通配符模式
    • 2.6 RPC通信(RPC)
    • 2.7 发布确认(Publisher Confirms/消息可靠性保证)
      • 2.7.1 概述
      • 2.7.2 单独确认
      • 2.7.3 批量确认
      • 2.7.4 异步确认

1. 7种工作模式介绍

在这里插入图片描述

1.1 简单模式(simple)

在这里插入图片描述
P: producer生产者,也就是要发送消息的程序.
C: consumer消费者,消息的接收者.
Queue: 消息队列,其中可以缓存信息,生产者可以向其中投递信息,消费者从其中获取信息.
特点: 一个生产者,一个消费者,一个消息只能被消费一次,也称为点对点模式.

1.2 工作队列(Work Queue)

在这里插入图片描述
一个生产者P,多个消费者C1,C2,在队列中有多个消息的情况下,work Queue会将消息分派给不同的消费者,每个消费者都会接收到不同的消息.
特点: 消息不会重复,分配给不同的消费者.

1.3 对交换机和路由键的解释

在这里插入图片描述

  • X: 代表的是交换机(exchange),作用就是生产者在将消息发送到交换机之后,交换机会按照生产者指定的RoutingKey,也就是路由规则把消息路由到一个或者多个队列中.
  • RabbitMQ中的交换机有四种类型: fanout,direct,topic,headers,不同的类型有不同的路由策略.
    1. fanout: 广播模式,将消息发送给所有绑定交换机的队列.(Publish/Subscribe模式)
    2. direct: 定向模式,将消息交给符合指定的routingKey的队列.(Routing模式)
    3. topic: 通配符模式,将消息交给符合指定的Routing pattern的队列.(Topics模式).
    4. header: headers类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在.
      交换机只是负责转发消息,不具有存储消息的功能,因此,如果没有任何队列和交换机绑定,或者是消息没有任何符合路由规则的队列,那么信息就会丢失.
  • RoutingKey: 路由键,生产者在消息转发给交换机的时候,指定的一个字符串,用来让交换机知道该如何处理这条消息.
  • BindingKey: 绑定键,BindingKey是把交换机和消息队列绑定的字符串,这样交换机就会知道如何根据路由键将对应的消息转发到指定的队列了.
    在这里插入图片描述
    下面我们就举一个具体的例子来对RoutingKey和BindingKey具体说明.

在发送消息的时候,生产者设置了RoutingKey为orange,消息就会路由到Q1.
在这里插入图片描述

BindingKey其实也属于RoutingKey的一种,我们为了把这两个键混淆,我们可以这样理解:
在交换机和队列绑定的时候,需要的路由键是BindingKey.在发送消息的时候,需要的路由键是RoutingKey.

1.4 广播模式/发布订阅模式(Publish/Subscribe)

在这里插入图片描述
一个生产者P,多个消费者C1,C2,X代表交换机,交换机会将消息复制多份,发送给所有和交换机绑定的队列,每个消费者接收相同的消息.生产者发送⼀条消息,经过交换机转发到多个不同的队列,多个不同的队列就有多个不同的消费者.
适用场景: 消息需要被多个消费者同时接收的场景.比如: 实时通知或者广播消息.

比如国家地震局发送地震预警,地震发生的时候,需要把预警消息发送给可能有震感地区的所有电子设备.

1.5 路由模式(Routing)

在这里插入图片描述
路由模式是发布订阅模式的变种,在发布订阅模式的基础上,增加了路由key.
相比广播模式,路由模式是Exchange根据RoutingKey的规则,将数据筛选后发给对应的消费者队列.
适合场景: 需要根据特定的规则分发消息的场景.
比如我们在Spring中学习的日志,日志等级分为error,warning,info,debug,就可以通过这种模式,把不同的日志发送到不同的队列.

1.6 通配符模式(Topics)

在这里插入图片描述
路由模式的升级版,在routingKey的基础上,增加了通配符的功能,使之更加灵活.其中,一个.是一个节,使用*代表的是一个节,使用#代表的是多个节.Topics和Routing的基本原理相同.不同之处是:routingKey的匹配方式不同,Routing模式是相等匹配,topics模式是通配符匹配.
适用场景: 需要灵活匹配和过滤消息的场景.

1.7 RPC通信(RPC)

在RPC模式中没有生产者和消费者,大概就是通过两个队列实现了一个消息回调的过程.有点类似与我们在网络中学习的"请求和响应",这个功能是MQ的额外功能.
在这里插入图片描述

  1. 客户端发送消息到一个指定的队列,并在消息属性中设置replyTo字段,这个字段指定了回调队列,用于接收服务端返回的响应.
  2. 服务端接收到请求后,处理请求并发送响应消息到replyTo指定的回调队列.
  3. ⼀旦收到响应,客户端会检查消息的correlationId属性,以确保它是所期望的响应.因为队列中不仅仅有一条消息,保证发送的请求和收到的响应通过correlationId对应的上.

1.8 发布确认(Publish Confirms)

Publish Confirms模式是RabbitMQ提供的一种确保消息可靠发送到RabbitMQ服务器的机制.在这种模式之下,生产者可以等待RabbitMQ服务器确认,可以确保消息已经被服务器接收并处理.

  1. 生产者将Channel设置为Confirm模式,发布的每一条消息都会获得一个唯一的ID,生产者可以将这些序号与消息关联起来,以便跟踪消息的状态.
  2. 当消息被RabbitMQ服务器接收并处理之后,服务器会一步地向生产者发送一个(ACK)给生产者(其中包含消息的唯一ID),表名消息已经送达.
    通过该模式,生产者可以确保消息被RabbitMQ服务器成功接收,从而避免消息丢失的问题.
    适用场景: 对数据安全性较高的场景,比如金融交易,订单处理.
    在这里插入图片描述

2. 工作模式的代码实现

前面我们对这几种工作模式有了简单的了解,接下来我们学习他们的写法.

2.1 简单模式

就是快速上手中的程序,此处忽略.

2.2 工作队列

就是简单模式的增强版,和简单模式下最大的区别就是,工作队列模式支持多个消费者接收消息,消费者之间是竞争关系,每个消息只能被⼀个消费者接收.
在这里插入图片描述

  • 引入依赖
<dependency>
	<groupId>com.rabbitmq</groupId>
	<artifactId>amqp-client</artifactId>
	<version>5.7.3</version>
</dependency>
  • 编写生产者
    工作队列模式的代码和简单模式的代码没有多大的出入.我们只是发送了10次消息.我们把发送消息的地方改为10次发送.
    有一些配置相关的东西是固定的,所以我们可以把他们单独提出来一个Constant类.
public class Constant {
    public static String HOST = "39.105.137.64";
    public static int PORT = 5672;
    public static String USER_NAME = "jiangruijia";
    public static String PASSWORD = "qwe123524";
    public static String QUEUE_NAME = "work";
    public static String VIRTUAL_HOST = "/";
}

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASSWORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(Constant.QUEUE_NAME,true,false,false,null);
        for (int i = 0; i < 10 ; i++){//发送10次消息
            String message = "hello work~" + i;
            channel.basicPublish("",Constant.QUEUE_NAME,null,message.getBytes());
        }
        channel.close();
        connection.close();
    }
}
  • 消费者代码
public class Consumer2 {//两个消费者是竞争关系
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASSWORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(Constant.QUEUE_NAME,true,false,false,null);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("接收到消息" + message);
            }
        };
        channel.basicConsume(Constant.QUEUE_NAME,true,consumer);
        //先不要关闭资源,因为需要先开启消费者,等待生产者发送消息
    }
}

另一个消费者和这个消费者相同,直接复制粘贴一份.

  • 运行程序,观察结果
    先启动两个消费者(而且两个消费者不可以关闭资源),再运行生产者,如果先启动生产者,在启动第一个消费者的时候,消息会被瞬间消费完.
    在这里插入图片描述
    在这里插入图片描述

2.3 广播模式/发布订阅模式(Publish/Subscribe)

在这里插入图片描述

  • 编写生产者
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASSWORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //声明交换机.
        channel.exchangeDeclare(Constant.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT,true,false,false,null);
        //声明队列
        channel.queueDeclare(Constant.FANOUT_QUEUE1,true,false,false,null);
        channel.queueDeclare(Constant.FANOUT_QUEUE2,true,false,false,null);
        //绑定队列与交换机
        channel.queueBind(Constant.FANOUT_QUEUE1,Constant.FANOUT_EXCHANGE,"");
        channel.queueBind(Constant.FANOUT_QUEUE2,Constant.FANOUT_EXCHANGE,"");
        String message = "发送广播消息";
        channel.basicPublish(Constant.FANOUT_EXCHANGE,"",null,message.getBytes());
        channel.close();
        connection.close();
    }
}

参数解释:
- exchangeDeclare: 第一个参数是交换机名称,第二个参数是交换机的路由规则(这里指定为FANOUT广播类型),第三个参数是是否持久化,如果设置持久化,那么在重启服务之后,交换机不会被释放,第四个参数是是否自动删除,当没有队列与其绑定的时候,它就会被删除.第五个参数是是否是内部使用的,一般情况下为false,第六个参数是指定相关参数.
- queueDeclare: 声明队列,我们在之前解释过,这里不再赘述.
- queueBind: 绑定队列与交换机,第一个参数是队列名称,第二个参数是交换机名称,第三个参数是交换机和队列之间的路由规则,在这里我们是广播模式,所以我们没有指定路由规则,指定为默认的"".
- basicPublish: 第一个参数是发送消息的交换机,第二个参数是发送消息时的路由关键字,这里为广播模式,所以我们指定为"",第三个参数是一些相关配置,第四个参数是发送的消息.

  • 编写消费者代码
    虽然在消费者中可以不用声明队列,但是为有时候生产者和消费者不会在一个主机上,我们还是加上队列的声明比较符合逻辑,队列存在的时候,不会重复创建队列.
public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASSWORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //队列声明可以省略,如果队列已经存在,则不会创建队列
        channel.queueDeclare(Constant.FANOUT_QUEUE1,true,false,false,null);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("获取到广播消息:"+message);
            }
        };
        channel.basicConsume(Constant.FANOUT_QUEUE1,true,consumer);
    }
}

第二个消费者直接复制一份,改掉消费者于队列的绑定和队列的声明即可.

  • 运行程序
    首先启动两个消费者,再运行生产者.
    我们看到生产者生产出两条消息之后,迅速被消费者消费掉.
    在这里插入图片描述
    两个消费者接收到了消息:
    在这里插入图片描述
    在这里插入图片描述

2.4 路由模式(Routing)

相比于发布订阅模式,交换机和队列不可以是任意绑定了==,而是需要指定一个BindingKey(RoutingKey的一种)==.生产者在向交换机发送消息的时候,也需要指定RoutingKey.这时,Exchange也不再把消息交给每⼀个绑定的key,而是根据消息RoutingKey进行判断,只有队列绑定时的BindingKey和发送消息的RoutingKey完全⼀致,才会接收到消息.
在这里插入图片描述

  • 编写生产者
    与发布订阅模式的不同是,交换机类型不同,而且绑定队列的bindingKey不同.
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASSWORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(Constant.ROUTING_EXCHANGE, BuiltinExchangeType.DIRECT,true,false,false,null);
        channel.queueDeclare(Constant.ROUTING_QUEUE1,true,false,false,null);
        channel.queueDeclare(Constant.ROUTING_QUEUE2,true,false,false,null);
        channel.queueBind(Constant.ROUTING_QUEUE1,Constant.ROUTING_EXCHANGE,"a");
        channel.queueBind(Constant.ROUTING_QUEUE2,Constant.ROUTING_EXCHANGE,"a");
        channel.queueBind(Constant.ROUTING_QUEUE2,Constant.ROUTING_EXCHANGE,"b");
        channel.queueBind(Constant.ROUTING_QUEUE2,Constant.ROUTING_EXCHANGE,"c");
        String message1 = "routingKey_a";
        String message2 = "routingKey_b";
        String message3 = "routingKey_c";
        channel.basicPublish(Constant.ROUTING_EXCHANGE,"a",null,message1.getBytes());
        channel.basicPublish(Constant.ROUTING_EXCHANGE,"b",null,message2.getBytes());
        channel.basicPublish(Constant.ROUTING_EXCHANGE,"c",null,message3.getBytes());
        connection.close();
        channel.close();
    }
}

和上面广播模式不同的是,在绑定队列与交换机的时候,需要指定bindingKey.channel.queueBind(Constant.ROUTING_QUEUE1,Constant.ROUTING_EXCHANGE,"a");比如这一行指定了bindingKey为a.
在发送消息的时候,需要指定消息的RoutingKey,比如:channel.basicPublish(Constant.ROUTING_EXCHANGE,"a",null,message1.getBytes());.

  • 编写消费者
    Routing模式的消费者代码和Publish/Subscribe代码⼀样,同样复制出来两份.修改消费的队列名称就可以.
public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASSWORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(Constant.ROUTING_QUEUE1,true,false,false,null);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("接收到消息:"+message);
            }
        };
        channel.basicConsume(Constant.ROUTING_QUEUE1,true,consumer);
    }
}
  • 运行代码
    在这里插入图片描述
    队列中的消息被消费.
    队列2中的消息:
    在这里插入图片描述
    队列1中的消息:
    在这里插入图片描述

2.5 通配符模式

Topic和Routing模式的区别就是:
1. Topic模式使用的交换机类型为topic(Routing模式使⽤的交换机类型为direct).
2. Topic类型的交换机在匹配规则上进行了扩展,bindingKey支持通配符匹配.
在这里插入图片描述
在Topic类型的交换机在匹配的规则上,有一些要求:
1. RoutingKey是一系列由.分割的单词,比如"a.b.c".
2. BindingKey和RoutingKey一样,也是点.分割的字符串.
3. BindingKey中可以存在两种特殊字符串,用于模糊匹配.其中*表示一个单词,#表示多个单词.
比如:
• BindingKey为"d.a.b"会同时路由到Q1和Q2.
• BindingKey为"d.a.f"会路由到Q1.
• BindingKey为"c.e.f"会路由到Q2.
• BindingKey为"d.b.f"会被丢弃,或者返回给⽣产者(需要设置mandatory参数).
接下来我们就来实现Topic模式:

  • 编写生产者
    和路由模式最大的区别就是: 交换机类型不同,绑定队列的RoutingKey不同.
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASSWORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(Constant.TOPIC_QUEUE1,true,false,false,null);
        channel.queueDeclare(Constant.TOPIC_QUEUE2,true,false,false,null);
        channel.exchangeDeclare(Constant.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC,true,false,false,null);//交换机类型不同
        channel.queueBind(Constant.TOPIC_QUEUE1,Constant.TOPIC_EXCHANGE,"*.a.*");
        channel.queueBind(Constant.TOPIC_QUEUE2,Constant.TOPIC_EXCHANGE,"*.*.b");
        channel.queueBind(Constant.TOPIC_QUEUE2,Constant.TOPIC_EXCHANGE,"c.#");//bindingKey不同
        String message1 = "hello_b.a.c";
        String message2 = "hello_a.c.b";
        String message3 = "hello_c.a.b";
        channel.basicPublish(Constant.TOPIC_EXCHANGE,"b.a.c",null,message1.getBytes());
        channel.basicPublish(Constant.TOPIC_EXCHANGE,"c.a.b",null,message3.getBytes());
        channel.basicPublish(Constant.TOPIC_EXCHANGE,"a.c.b",null,message2.getBytes());
        channel.basicPublish(Constant.TOPIC_EXCHANGE,"c.a.b",null,message3.getBytes());
        channel.close();
        connection.close();
    }
}
  • 编写消费者
public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASSWORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(Constant.TOPIC_QUEUE1,true,false,false,null);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("收到消息: "+s);
            }
        };
        channel.basicConsume(Constant.TOPIC_QUEUE1,true,consumer);
    }
}

第二个消费者直接修改一下队列名称就可以.

  • 运行结果
    队列产生消息,并消费
    在这里插入图片描述
    消费者根据RoutingKey收到了关键字:
    消费者2:
    在这里插入图片描述
    消费者1:
    在这里插入图片描述

2.6 RPC通信(RPC)

RPC通信,是远程过程调用,它是一种发送请求,得到响应的模式,有点类似与我们之前学习网络时候的http协议.
RabbitMQ实现RPC通信的过程,大概是通过两个队列实现⼀个可回调的过程.
在这里插入图片描述
在这个模式中,没有明确的生产者和消费者,在发送请求的时候,客户端是生产者,服务端是消费者,在接收响应的时候,服务端是生产者,客户端是消费者.
大概的流程如下:

  1. 客户端发送消息到⼀个指定的队列,并在消息属性中设置replyTo字段,这个字段指定了⼀个回调队列,在消息中还会设置correlate_id,这个字段用于保证接收到的响应是与请求对应的响应,服务端处理后,会把响应结果发送到这个队列.
  2. 服务端接收到消息之后,处理请求并发送响应消息到replyTo指定的回调队列.
  3. 客户端在回调队列上等待响应消息.⼀旦收到响应,客户端会检查消息的correlationId属性,以确保它是所期望的响应.

接下来我们来实现PCR模式:

  • 客户端代码
    客户端代码的主要流程如下:
    首先声明两个队列,包含回调队列replyQueueName,和发送请求的队列.并声明本次请求的唯一标志corrld.之后将replyQueueName和corrld配置到要发送的消息队列中.之后使用阻塞队列来阻塞当前的进程,监听回调队列中的消息,把请求放到阻塞队列中.阻塞队列中有消息之后,主线程被唤醒,打印返回的内容.
public class Client {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASSWORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(Constant.SRC_REQUEST_QUEUE,true,false,false,null);
        channel.queueDeclare(Constant.SRC_RESPONSE_QUEUE,true,false,false,null);
        //发送请求
        String corrId = UUID.randomUUID().toString();
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .correlationId(corrId)
                .replyTo(Constant.SRC_RESPONSE_QUEUE)//指定相关属性
                .build();
        String request = "发送请求";
        //如果没有交换机的时候,RoutingKey就是队列的名称
        channel.basicPublish("",Constant.SRC_REQUEST_QUEUE,properties,request.getBytes());
        //接收请求
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                if(corrId.equals(properties.getCorrelationId())){//判断corrId是否相等
                    queue.offer(new String(body));
                }
            }
        };
        channel.basicConsume(Constant.SRC_RESPONSE_QUEUE,true,consumer);
        String ret = queue.take();
        System.out.println("收到请求:" + ret);
        channel.close();
        connection.close();
    }
}

  • 服务端代码
    服务端需要做的就是:接收消息,根据消息内容进行响应处理,把应答结果返回到回调队列中.
public class Server {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASSWORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(Constant.SRC_REQUEST_QUEUE,true,false,false,null);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("成功接收到请求:"+new String(body));
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                String message = "返回响应";
                channel.basicPublish("",Constant.SRC_RESPONSE_QUEUE,basicProperties,message.getBytes());
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(Constant.SRC_REQUEST_QUEUE,false,consumer);//设置收到消息之后不自动应答,在发送响应之后手动应答
    }
}

RabbitMQ消息确定的机制:
在RabbitMQ中,basicConsumer方法的autoAck参数用于指定消费者是否应该自动向消息队列确认消息.
自动确认(autoAck=true):消息队列在将消息发送给消费者后,会立即从内存中删除该消息.这意味着,如果消费者处理消息失败,消息将丢失,因为消息队列认为消息已经被成功消费
手动确认(autoAck=false):消息队列在将消息发送给消费者后,需要消费者显式地调用basicAck方法来确认消息.手动确认提供了更高的可靠性,确保消息不会被意外丢失,适用于消息处理重要且需要确保每个消息都被正确处理的场景.

  • 运行程序
    在这里插入图片描述
    在这里插入图片描述

2.7 发布确认(Publisher Confirms/消息可靠性保证)

2.7.1 概述

作为消息中间件,都会面临丢失的问题.
消息丢失大概分为三种类型:

  1. 生产者问题.因为应用程序故障,网络都用等原因,生产者没有成功向Broker发送消息.
  2. 消息中间件自身问题,生产者成功发送给了Broker,但是Broker没有吧消息保存好,导致了信息丢失.
  3. 消费者问题,Broker发送消息到消费者,消费者在消费消息的时候,没有处理好,导致Broker将消费失败的消息从队列中删除了.
    在这里插入图片描述
    上面的这几个问题都有对应的解决方式,问题2可以通过持久化的机制来解决,问题3可以通过消息应答机制来解决,问题1,可以采用发布确认的机制来实现.
    发布确认属于RabbitMQ的七大工作模式之一.
    生产者会将Channel设置为Confirm模式,一旦信道进入Confirm模式,所有在该信道上面的消息都会被指派为一个唯一的Id.一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经确认到达目的队列了.Broker回传给生产者的确认消息中包含了确认消息的序号,此外Broker也可以设置channel.basicAck方法中的multiple参数,true表示在deliveryTag序号之前的消息都已经收到了,如果为false,那么则有消息没有收到,消息确认出了一些问题.
    在这里插入图片描述
    使用发送机制的时候,必须要将信道设置为Confirm模式.发布确认有3种策略,单独确认,批量确认,异步确认,接下来我们就来学习着三种策略.
    下面是开启信道确认模式的方法.
Channel channel = connection.createChannel();
channel.confirmSelect();//开启信道确认模式

2.7.2 单独确认

首先我们需要建立连接,建立连接需要放入try语句中,所以我们可以把建立连接单独提出一个静态方法.

private static Connection createConnection() throws IOException, TimeoutException {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(Constant.HOST);
    connectionFactory.setPort(Constant.PORT);
    connectionFactory.setUsername(Constant.USER_NAME);
    connectionFactory.setPassword(Constant.PASSWORD);
    connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
    return connectionFactory.newConnection();
}

之后在主方法中调用三种策略,

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    publishingMessagesIndividually();
    publishMessagesInBatch();
    handlePublishConfirmsAsynchronously();
}

之后我们来编写单条确认模式:

    /**
     * 单条确认
     */
private static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {
    try (Connection connection = createConnection()){//首先建立连接
        Channel channel = connection.createChannel();
        channel.confirmSelect();//开启信道确认模式
        channel.queueDeclare(Constant.CONFIRM_QUEUE1,true,false,false,null);//创建队列
        String message = "发送信息";
        for (int i = 0; i < 200; i++){
            channel.basicPublish("",Constant.CONFIRM_QUEUE1,null,(message+i).getBytes());
            channel.waitForConfirms(5000);//等待消息确认,如果超过规定的等待时间还没有确认,则抛出异常
        }
    }
}

在编写代码之前,我们首先要使用try语句与服务器建立连接.
之后在创建Channel之后,需要开启信道的确认模式.channel.confirmSelect().
这里我们在发送消息之后,需要做的最重要的一件事就是等待消息的确认channel.waitForConfirms(5000),在这个方法中可以指定阻塞时间,如果在指定的时间内消息被确认,这个方法就会立即返回,如果在指定时间之内没有确认消息,则会抛出异常.

2.7.3 批量确认

/**
 * 批量确认
 */
private static void publishMessagesInBatch() throws IOException, TimeoutException, InterruptedException {
    try (Connection connection = createConnection()){
        Channel channel = connection.createChannel();
        channel.confirmSelect();//设置为确认模式
        channel.queueDeclare(Constant.CONFIRM_QUEUE2,true,false,false,null);
        String message = "批量确认发送消息";
        int batchSize = 100;//每次批量发送的消息条数
        int outstandingMessageCount = 0;//记录已经发送的条数
        for (int i = 0 ; i < 200 ; i++){
            channel.basicPublish("",Constant.CONFIRM_QUEUE2,null,(message+i).getBytes());
            outstandingMessageCount++;//每发送一条消息,参数就进行++
            if(outstandingMessageCount == batchSize){//达到了一次性批量发送的指定数量,等待确认,确认完成之后,将参数清零
                channel.waitForConfirms(5000);
                outstandingMessageCount = 0;
            }
        }
        //如果发送的消息不是100的倍数,就还有消息没有确认
        if (outstandingMessageCount > 0){
            channel.waitForConfirms(5000);
        }
    }
}

这里需要注意的几点就是,在发送的消息达到一次性最大的批量数量的时候,就要确认,如果确认成功之后,需要把记录的发送数量清零.
之后,就是在出循环之后,如果发送的消息条数不是batchSize的整数倍的时候,这时候不满足循环之内的if条件,还是有一些消息没有确认完成,就需要在循环之外再次进行确认.

2.7.4 异步确认

异步确认就是,生产者在发送消息的同时,还可以确认消息是否收到.
Channel接口中为我们提供了一个方法,addConfirmListener.这个方法可以添加ConfirmListener回调接口.
ConfirmListener中包含两个方法: handleAck(long deliveryTag, boolean multiple)handleNack(long deliveryTag, boolean multiple),分别对应处理的是MQ发送给生产者的ack和nack.其中ack代表的是消息确认成功,即消息都收到的情况下,nack指的是消息在确认的时候出现了一些问题.deliveryTag表示的是发送消息的序号,multiple表示是否批量确认.
这里我们还需要一个有序集合来存储为确认的消息.

/**
 * 异步确认
 */
private static void handlePublishConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException {
    try (Connection connection = createConnection()){
        Channel channel = connection.createChannel();
        channel.confirmSelect();
        channel.queueDeclare(Constant.CONFIRM_QUEUE3,true,false,false,null);
        SortedSet<Long> set = Collections.synchronizedSortedSet(new TreeSet<>());//设置一个有序集合,用来存储未确认消息的序号
        channel.addConfirmListener(new ConfirmListener() {//为信道添加监听器,监听消息的确认情况
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {//表示消息被成功确认
                if (multiple){//判断消息的处理是否批量
                    set.headSet(deliveryTag+1).clear();//将小于deliveryTag的消息全部清除,证明这批消息已经被ack了
                }else {
                    set.remove(set.last());//如果不是批量,清除最后一个即可
                }
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                if (multiple){//判断消息的处理是否批量
                    set.headSet(deliveryTag+1).clear();//将小于deliveryTag的消息全部清除,证明这批消息已经被ack了
                }else {
                    set.remove(set.last());//如果不是批量,清除最后一个即可
                }
                //这里在消息确认不成功的时候,需要重传,这里省略
            }
        });
        String message = "异步确认发送消息";
        for (int i = 0 ;i < 200 ;i++){
            long nextPublishSeqNo = channel.getNextPublishSeqNo();//获取到消息发送的序号
            channel.basicPublish("",Constant.CONFIRM_QUEUE3,null,(message+i).getBytes());
            set.add(nextPublishSeqNo);//把这些消息都添加到集合中
        }
        while (!set.isEmpty()){//等待集合中的消息都被确认完成
            Thread.sleep(1000);
        }
    }
}

这里我们在消息确认成功之后,即handleAck方法被调用的时候,需要把这些消息都从集合中清除掉.一种是批量的情况,直接清除掉deliveryTag之前所有的消息,另一种是没有批量的情况,直接清除掉最后一个元素即可.当然在没有确认成功的情况下,我们需要根据具体的业务逻辑进行消息的重发.在给队列中发送消息的时候,我们需要从Channel中获取到下次发送消息开始的序号,之后我们把开始的序号放入set中,代表这些消息还没有被处理过.最后我们需要等待消息确认完成,只要存放未确认消息的set中不为空,就证明还有消息没有被确认,我们就进行阻塞等待.
上面三种方式中,假如发送的消息较多,这三种策略的执行时间:单个确认>批量确认>异步确认.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2178889.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Linux】如何用shell脚本一键安装Java和Maven环境

Shell脚本安装环境 前言脚本Java安装脚本使用方式 Java卸载脚本Maven安装脚本Maven卸载脚本 前言 无论是在云服务器上部署Java项目 还是在本地的Linux虚拟机上运行Java项目 都需要Java的环境 设置环境则需要一些繁琐的操作 为了简化并复用这些操作 我们可以封装这些操作为一个…

AD导出gerber文件(光绘文件)

第一步&#xff1a; 英寸 2:5 勾选你想显示的层 默认默认 第二步&#xff1a; 第三步&#xff1a; 默认

开关电源为什么要进行负载测试,负载测试都包含哪些项目?

开关电源在现代电子设备中占据着重要的地位&#xff0c;其性能的稳定性和可靠性直接影响着电子设备的正常运行。为了确保开关电源的质量&#xff0c;需要对其进行负载测试。负载测试可以模拟实际工作环境中的负载情况&#xff0c;检测开关电源在不同负载条件下的输出特性、稳定…

如何创建虚拟环境并实现目标检测及验证能否GPU加速

创建虚拟环境&#xff1a; 先创建一个虚拟python环境&#xff0c;敲如下代码 然后再到该虚拟环境里面安装自己想要的包 激活虚拟环境 然后再聚类训练这些 验证GPU加速 阿里源 pip install torch torchvision -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mir…

Python的风格应该是怎样的?除语法外,有哪些规范?

写代码不那么pythonic风格的&#xff0c;多多少少都会让人有点难受。 什么是pythonic呢&#xff1f;简而言之&#xff0c;这是一种写代码时遵守的规范&#xff0c;主打简洁、清晰、可读性高&#xff0c;符合PEP 8&#xff08;Python代码样式指南&#xff09;约定的模式。 Pyth…

线段树及应用

目录 1. 线段树基础 &#xff08;1&#xff09;什么是线段树 &#xff08;2&#xff09;线段树的拆分原理 &#xff08;3&#xff09;相关算法对比 &#xff08;4&#xff09;线段树的使用前提 &#xff08;5&#xff09;线段树建树操作 &#xff08;6&#xff09;线段树…

Unity 编辑器多开

开发多人联机的功能时大多数会遇到测试机不方便的问题。想多开同一个项目Uinty又禁止。。。因为在使用Unity Editor打开一个项目时&#xff0c;Unity Editor会在项目目录建立一个Temp目录&#xff0c;同时对里面的一个UnityLockfile文件进行加锁。SO...可以使用以下方法进行多开…

macOS 开发环境配置与应用开发

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…

【CSS Tricks】css动画详解

目录 引言动画关键帧序列动画各属性拆解1. animation-name2. animation-duration3. animation-delay设置delay为正值设置delay为负值 4. animation-direction5. animation-iteration-count6. animation-fill-mode7. animation-play-state8. animation-timing-function非阶跃函数…

【2025】基于Django的鱼类科普网站(源码+文档+调试+答疑)

文章目录 一、基于Django的鱼类科普网站-项目介绍二、基于Django的鱼类科普网站-开发环境三、基于Django的鱼类科普网站-系统展示四、基于Django的鱼类科普网站-代码展示五、基于Django的鱼类科普网站-项目文档展示六、基于Django的鱼类科普网站-项目总结 大家可以帮忙点赞、收…

B3621 枚举元组

1.递归的具体过程&#xff0c;一个dfs1&#xff0c;产生3个dfs2&#xff0c;一个dfs2产生3个dfs3&#xff0c;一共输出9个&#xff08;用n2&#xff0c;k3举例&#xff09; 2.要记得使用return 结束当前递归 #include<bits/stdc.h> using namespace std; int n, k, a[10…

telnet发送邮件教程:安全配置与操作指南?

telnet发送邮件的详细步骤&#xff1f;怎么用telnet命令发邮件&#xff1f; 尽管现代邮件客户端和服务器提供了丰富的功能和安全性保障&#xff0c;但在某些特定场景下&#xff0c;了解如何使用telnet发送邮件仍然是一项有价值的技能。AokSend将详细介绍如何安全配置和操作tel…

英集芯IP5911:集成锂电池充电管理和检测唤醒功能的低功耗8位MCU芯片

英集芯IP5911是一款集成锂电池充电管理、咪头检测唤醒、负载电阻插拔和阻值检测等功能的8bit MCU芯片。其封装采用QFN16&#xff0c;应用时仅需极少的外围器件&#xff0c;就能够有效减小整体方案的尺寸&#xff0c;降低BOM成本&#xff0c;为小型电子设备提供高集成度的解决方…

QT 开发日志:QT 布局管理 —— 如何使用布局器组织 UI 元素

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 专栏介绍 在软件开发和日常使用中&#xff0c;BUG是不可避免的。本专栏致力于为广大开发者和技术爱好者提供一个关于BUG解决的经…

探索高效免费的PDF转Word工具,开启便捷办公之旅

无论是为了方便对文档内容进行编辑、修改&#xff0c;还是为了更好地适应不同的工作和学习场景&#xff0c;将 PDF 文档转换为可编辑的 Word 格式都具有重要意义。今天我就分享几款pdf转换成word免费版工具来解决大家的困扰。 1.Foxit PDF转换大师 链接一下>>https://w…

系统架构设计师-知识产权与标准化

目录 一、保护范围与对象 二、保护期限 三、知识产权人确定 四、侵权判断 五、标准化 一、保护范围与对象 知识产权是权利人依法就下列课题享有的专有权利&#xff1a; &#xff08;一&#xff09;作品&#xff08;著作&#xff09; &#xff08;二&#xff09;发明、实用…

通过OpenScada在ARMxy边缘计算网关上实现远程监控

随着工业互联网技术的发展&#xff0c;边缘计算逐渐成为连接物理世界与数字世界的桥梁。在众多边缘计算设备中&#xff0c;ARMxy BL340系列因其强大的性能、灵活的I/O配置及广泛的适用性&#xff0c;成为了工业控制、物联网关等领域的优选方案之一。 一、ARMxy BL340系列概述 …

波导阵列天线 馈电网络2 一种使用有着多反射零点的T型结的毫米波48%带宽高增益3D打印天线阵列

摘要&#xff1a; 一种设计毫米波宽带大规模天线阵列的创新方法被提出了&#xff0c;其使用有着多个反射零点的波导T型结来构建一个H型全公共馈网。通过联合优化反射零点的性质&#xff0c;可以减弱馈网中不期望的小反射的同相叠加&#xff0c;因此提升阵列的带宽。调研了合成有…

04 B-树

目录 常见的搜索结构B-树概念B-树的插入分析B-树的插入实现B树和B*树B-树的应用 1. 常见的搜索结构 种类数据格式时间复杂度顺序查找无要求O(N)二分查找有序O( l o g 2 N log_2N log2​N)二分搜索树无要求O(N)二叉平衡树无要求O( l o g 2 N log_2N log2​N)哈希无要求O(1) 以…

[网络]NAT、代理服务、内网穿透、内网打洞

目录 一、NAT 1.1 NAT 技术背景 1.2 NAT IP 转换过程 1.3 NAPT&#xff08;Network Address Port Translation&#xff09; 1.地址转换表 2. NAPT&#xff08;网络地址端口转换Network Address Port Translation&#xff09; 3. NAT技术的缺陷 二、代理服务器 2.1 正向…