RabbitMQ 应用

news2025/1/20 1:08:01

目录

1. 7种工作模式

1.1 Simple(简单模式)

1.2 Work Queue(工作队列)

1.3 Publish/Subscribe(发布/订阅)

1.4 Routing(路由模式)

1.5 Topics(通配符模式)

1.6 RPC(RPC 通信)

1.7 Publisher Confirms(发布确认)

2. 工作模式的使用案例

2.1 简单模式

2.2 Work Queue(工作队列)

2.3 Publish/Subscribe(发布 / 订阅)

2.4 Routing(路由模式)

2.5 Topics(通配符模式)

2.6 RPC(RPC 模式)

2.7 Publisher Confirms(发布确认)


1. 7种工作模式

1.1 Simple(简单模式)

P:生产者,发送消息的程序

C:消费者,消息的接收者

Queue:消息队列,可以缓存消息,生产者给里面发送消息,消费者从里面取出消息

一个生产者,一个消费者,也称为点对点(Point-to-Point)模式

适用场景:消息只能被单个消费者处理

1.2 Work Queue(工作队列)

一个生产者,多个消费者,在多个消息的情况下,Work Queue 会将消息分发给不同的消费者,每个消费者都会收到不同的消息

例如队列中有 10 条消息,两个消费者共同消费者 10 条消息,消息不会重复消费

使用场景:集群环境中做异步处理

比如 12306 短信通知服务,订票成功后,订单消息会发送到 RabbitMQ,短信服务从 RabbitMQ 中获取订单信息,并发送通知短信

1.3 Publish/Subscribe(发布/订阅)

Exchange:交换机(X)

作用:生产者将消息发送到 X,由交换机将消息按一定规则路由到一个或者多个队列中

RabbitMQ 交换机由 4 中类型,fanout、direct、topic、headers,不同类型有着不同的路由策略

Fanout:广播,把消息交给所有绑定到交换机的队列(Publish/Subscribe 模式)

Direct:定向,把消息交给符合指定 routing key 的队列(Routing 模式)

Topic:通配符,把消息交给符合 routing pattern(路由模式)的队列(Topics 模式)

headers 类型的交换机不依赖路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 树形来进行匹配,headers 类型的交换机性能很差,基本不实用

其中 Exchange 只负责发消息,不具备存储消息的能力,如果没有任何队列与 Exchange 绑定或者没有符合路由规则的队列,那么消息就会丢失

RoutingKey:路由键,生产者将消息发送给交换机时,指定一个字符串,用来告诉交换机应该如何处理这个消息

BindingKey:绑定,RabbitMQ 中通过 Binding(绑定),将交换机与队列关联起来,在绑定时指定一个 BindingKey,这样 RabbitMQ 就知道如何正确地将消息路由到队列了

例如当 RoutingKey 为 Bining Key1 时,消息就会路由到第一个队列,为 Binding Key2 时,消息路由到第二个队列

Publish/Subscribe 模式中,一个生产者 P,多个消费者 C1,C2,X 代表交换机消息复制多分,每个消费者接收相同的消息

适合场景:消息需要被多个消费者同时接收的场景,如:实时通知或者广播消息

1.4 Routing(路由模式)

路由模式是发布订阅模式的变化,在发布订阅的基础上,增加路由 key,发布模式是将所有消息分发给所有的消费者,路由模式是 Exchange 根据 RoutingKey 的规则,将数据删选后发给对应的消费者队列

适合场景:需要根据特定规则分发消息的场景

例如,打印日志,日志的等级为 error、warning、info、debug,就可以通过这种模式,把不同的日志发送到不同的队列

1.5 Topics(通配符模式)

路由模式的升级,在 routingKey 的基础上,增加了通配符的功能,Topics 和 Routing 的基本原理相同,生产者将消息发送给交换机,交换机根据 RoutingKey 将消息转发给与 RoutingKey 匹配的队列

Routing 模式是相等匹配,而 Topics 模式是通配符匹配

适合场景:需要灵活配置和过滤消息的场景

1.6 RPC(RPC 通信)

 

在 RPC 通信过程中,没有生产者和消费者,通过两个队列实现了一个可回调的过程

 

1)客户端发送消息到一个指定的队列,并在消息属性中设置 replyTo 字段,这个字段制定了一个回调队列,用于接收服务器的响应

2)服务器接收到请求后,处理请求并发送消息到 replyTo 指定的回调队列

3)客户端在回调队列上等待响应消息,一旦收到响应,客户端会检查消息 correlationId 属性,以确保它是所期望的响应

1.7 Publisher Confirms(发布确认)

Publisher Confirms 模式是 RabbitMQ 提供的一种消息可靠发送到 RabbitMQ 服务器的机制,在这种模式下,生产者可以等待 RabbitMQ 服务器的确认,以确保消息已经被服务器接收并处理

1)生产者将 Channel 设置为 confirm 模式后,发布的每一条消息都会获得一个唯一的 ID,生产者可以将这些序列号与消息关联起来,一遍跟踪消息的状态

2)当消息被 RabbitMQ 服务器接收并处理后,服务器会异步地向生产者发送一个(ACK)给生产者,表达消息已经发送

适用场景:对数据安全性要求较高的场景

2. 工作模式的使用案例

2.1 简单模式

简单模式在上一篇写了,这里省略

2.2 Work Queue(工作队列)

工作队列模式支持多个消费者接收消息,消费者之间是竞争关系,每个消息只能被一个消费者接收

步骤:

1)引入依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

2)编写生产者代码

定义常量类,把端口,账号,密码都设置好

public class Constant {
    public static final String HOST = "44.34.51.65";
    public static final int PORT = 5672;
    public static final String USER_NAME = "lk";
    public static final String PASS_WORD = "lk";
    public static final String VIRTUAL_HOST = "study";

    //工作队列模式
    public static final String WORK_QUEUE = "work.queue";
}
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constant.USER_NAME); //账号
        connectionFactory.setPassword(Constant.PASS_WORD); //密码
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列  使用内置交换机
        //如果队列不存在,则创建,如果队列存在,则不创建
        channel.queueDeclare(Constant.WORK_QUEUE,true,false,false,null);
        //4. 发送消息
        for (int i = 0; i < 10; i++) {
            String msg = "hello work queue" + i;
            channel.basicPublish("",Constant.WORK_QUEUE,null,msg.getBytes());
        }
        System.out.println("消息发送成功");
        //6. 释放资源
        channel.close();
        connection.close();
    }
}

3)编写消费者代码

public class Comsumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASS_WORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列
        channel.queueDeclare(Constant.WORK_QUEUE,true,false,false,null);
        //4. 消费队列
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到的消息:" + new String(body));
            }
        };
        channel.basicConsume(Constant.WORK_QUEUE,true,consumer);
    }
}
public class Comsumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASS_WORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列
        channel.queueDeclare(Constant.WORK_QUEUE,true,false,false,null);
        //4. 消费队列
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到的消息:" + new String(body));
            }
        };
        channel.basicConsume(Constant.WORK_QUEUE,true,consumer);
    }
}

运行程序,观察结果

由于消息较少,处理较快,如果先启动生产者,在启动消费者,第一个消费者就会瞬间把 10 条消息消费掉,因此先启动消费者

Consumer1:

Consumer2: 

 可以看到有两个消费者,消费的消息都是不同的

2.3 Publish/Subscribe(发布 / 订阅)

在发布 / 订阅模型中,多了一个 Exchange 角色

1)编写生产者代码

需要创建交换机,并且绑定队列和交换机 

public static final String FANOUT_EXCHANGE = "fanout.exchange";
public static final String FANOUT_QUEUE1 = "fanout.queue1";
public static final String FANOUT_QUEUE2 = "fanout.queue2";
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constant.USER_NAME); //账号
        connectionFactory.setPassword(Constant.PASS_WORD); //密码
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明交换机
        /**
         *      exchange:交换机
         *      type:交换机类型
         *      durable:持久化
         */
        channel.exchangeDeclare(Constant.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT,true);
        //4. 声明队列
        channel.queueDeclare(Constant.FANOUT_QUEUE1,true,false,false,null);
        channel.queueDeclare(Constant.FANOUT_QUEUE2,true,false,false,null);
        //5. 交换机和队列绑定
        /**
         *      queue:队列
         *      exchange:交换机
         *      routingKey:
         */
        channel.queueBind(Constant.FANOUT_QUEUE1,Constant.FANOUT_EXCHANGE,"");
        channel.queueBind(Constant.FANOUT_QUEUE2,Constant.FANOUT_EXCHANGE,"");
        //6. 发布消息
        String msg = "hello fanout";
        channel.basicPublish(Constant.FANOUT_EXCHANGE,"",null,msg.getBytes());
        System.out.println("消息发送成功");
        //7. 释放资源
        channel.close();
        connection.close();
    }
}

2)编写消费者代码

public class Comsumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASS_WORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列
        channel.queueDeclare(Constant.FANOUT_QUEUE1,true,false,false,null);
        //4. 消费队列
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到的消息:" + new String(body));
            }
        };
        channel.basicConsume(Constant.FANOUT_QUEUE1,true,consumer);
    }
}
public class Comsumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASS_WORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列
        channel.queueDeclare(Constant.FANOUT_QUEUE2,true,false,false,null);
        //4. 消费队列
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到的消息:" + new String(body));
            }
        };
        channel.basicConsume(Constant.FANOUT_QUEUE2,true,consumer);
    }
}

运行程序,观察结果

生产者

消费者1

消费者2

两个队列分别有了一条消息

Exchange 多了队列的绑定关系

2.4 Routing(路由模式)

和发布 / 订阅模式不同的是,队列和交换机的绑定,不能是任意绑定了,而是要指定一个 BindingKey(RoutingKey 的一种),消息的发送方向 Exchange 发送消息时,也需要指定消息的 RoutingKey,交换机需要根据消息的 RoutingKey 进行判断,只有队列绑定时的 Binding 和发送消息的 RoutingKey 完全一致,才会接收到消息

1)编写生产者代码

public static final String DIRECT_EXCHANGE = "direct.exchange";
public static final String DIRECT_QUEUE1 = "direct.queue1";
public static final String DIRECT_QUEUE2 = "direct.queue2";
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASS_WORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明交换机
        channel.exchangeDeclare(Constant.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT,true);
        //4. 声明队列
        channel.queueDeclare(Constant.DIRECT_QUEUE1,true,false,false,null);
        channel.queueDeclare(Constant.DIRECT_QUEUE2,true,false,false,null);
        //5. 绑定交换机和队列
        channel.queueBind(Constant.DIRECT_QUEUE1,Constant.DIRECT_EXCHANGE,"a");
        channel.queueBind(Constant.DIRECT_QUEUE2,Constant.DIRECT_EXCHANGE,"a");
        channel.queueBind(Constant.DIRECT_QUEUE2,Constant.DIRECT_EXCHANGE,"b");
        channel.queueBind(Constant.DIRECT_QUEUE2,Constant.DIRECT_EXCHANGE,"c");
        //6. 发送消息
        String msg = "hello direct, my routingKey is a";
        channel.basicPublish(Constant.DIRECT_EXCHANGE,"a",null,msg.getBytes());

        msg = "hello direct, my routingKey is b";
        channel.basicPublish(Constant.DIRECT_EXCHANGE,"b",null,msg.getBytes());

        msg = "hello direct, my routingKey is c";
        channel.basicPublish(Constant.DIRECT_EXCHANGE,"c",null,msg.getBytes());
        System.out.println("发送消息成功");

        //7. 释放资源
        channel.close();
        connection.close();
    }
}

2)编写消费者代码

public class Comsumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASS_WORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列
        channel.queueDeclare(Constant.DIRECT_QUEUE1,true,false,false,null);
        //4. 消费队列
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到的消息:" + new String(body));
            }
        };
        channel.basicConsume(Constant.DIRECT_QUEUE1,true,consumer);
    }
}
public class Comsumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASS_WORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列
        channel.queueDeclare(Constant.DIRECT_QUEUE2,true,false,false,null);
        //4. 消费队列
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到的消息:" + new String(body));
            }
        };
        channel.basicConsume(Constant.DIRECT_QUEUE2,true,consumer);
    }
}

运行程序,观察结果

生产者

消费者1

消费者2

可以看到队列 1 里面有一条消息,队列 2 里面有 3 条消息,符合路由模式

exchange 下队列和 RoutingKey 的绑定关系

2.5 Topics(通配符模式)

Topics 模式使用的交换机类型为 topic,Topics 类型在匹配规则上进行了扩展,BindingKey 支持通配符匹配

Topics 类型的交换机在匹配上的规则:

1)RoutingKey 是一系列由(.)分割的单词,例如"stock.usd.nyse"

2)BindingKey 和 RoutingKey 一样,也是(.)分割的字符串

3)BindingKey 中可以存在两种特殊的字符串,用于模糊匹配

*表示一个单词,#表示多个单词 

1)编写生产者代码

public static final String TOPIC_EXCHANGE = "topic.exchange";
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASS_WORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明交换机
        channel.exchangeDeclare(Constant.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC,true);
        //4. 声明队列
        channel.queueDeclare(Constant.TOPIC_QUEUE1,true,false,false,null);
        channel.queueDeclare(Constant.TOPIC_QUEUE2,true,false,false,null);
        //5. 绑定交换机和队列
        channel.queueBind(Constant.TOPIC_QUEUE1,Constant.TOPIC_EXCHANGE,"*.a.*");
        channel.queueBind(Constant.TOPIC_QUEUE2,Constant.TOPIC_EXCHANGE,"*.*.b");
        channel.queueBind(Constant.TOPIC_QUEUE2,Constant.TOPIC_EXCHANGE,"c.#");
        //6. 发送消息
        String msg = "hello topic, my routingKey is ae.a.f";
        channel.basicPublish(Constant.TOPIC_EXCHANGE,"ae.a.f",null,msg.getBytes()); //发送到队列1

        msg = "hello direct, my routingKey is ef.a.b";
        channel.basicPublish(Constant.TOPIC_EXCHANGE,"ef.a.b",null,msg.getBytes());//发送到队列1和队列2

        msg = "hello direct, my routingKey is c.ef.d";
        channel.basicPublish(Constant.TOPIC_EXCHANGE,"c.ef.d",null,msg.getBytes()); //发送到队列2
        System.out.println("发送消息成功");

        //7. 释放资源
        channel.close();
        connection.close();
    }
}

2)编写消费者代码 

public class Comsumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASS_WORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列
        channel.queueDeclare(Constant.TOPIC_QUEUE1,true,false,false,null);
        //4. 消费队列
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到的消息:" + new String(body));
            }
        };
        channel.basicConsume(Constant.TOPIC_QUEUE1,true,consumer);
    }
}
public class Comsumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASS_WORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列
        channel.queueDeclare(Constant.TOPIC_QUEUE2,true,false,false,null);
        //4. 消费队列
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到的消息:" + new String(body));
            }
        };
        channel.basicConsume(Constant.TOPIC_QUEUE2,true,consumer);
    }
}

 运行程序,观察结果

可以看到队列的消息数

消费者1

消费者2

2.6 RPC(RPC 模式)

通过两个队列实现一个可回调的过程

1)编写客户端代码

客户端代码流程:

声明两个队列,包含回调队列 replyQueueName ,声明本次请求的唯一标志 corrId

将 replyQueueName 和 corrId 配置到要发送的消息队列中

使用阻塞队列来阻塞当前进程,监听回调队列中的消息,把请求放到阻塞队列中

阻塞队列由消息后,主线程被唤醒,打印返回内容

public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";
public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";
/**
 * RPC 客户端
 *  1. 发送请求
 *  2. 接收响应
 */
public class RpcClient {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASS_WORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(Constant.RPC_REQUEST_QUEUE,true,false,false,null);
        channel.queueDeclare(Constant.RPC_RESPONSE_QUEUE,true,false,false,null);
        //3. 发送请求
        String msg = "hello rpc";
        //设置请求的唯一标识
        String correlationID = UUID.randomUUID().toString();
        //设置请求的相关属性
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .correlationId(correlationID)
                .replyTo(Constant.RPC_RESPONSE_QUEUE)
                .build();
        channel.basicPublish("",Constant.RPC_REQUEST_QUEUE,properties,msg.getBytes());

        //4. 接收消息
        //使用阻塞队列来存储响应信息
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String resMsg = new String(body);
                System.out.println("接收到回调消息:" + resMsg);
                if (correlationID.equals(properties.getCorrelationId())) {
                    //说明 correlationID 校验一致
                    response.offer(resMsg);
                }
            }
        };

        channel.basicConsume(Constant.RPC_RESPONSE_QUEUE,true,consumer);
        String result = response.take();
        System.out.println("RPC Client响应结果:" + result);
    }
}

2)编写服务端代码

/**
 * RPC server
 *  1. 接收请求
 *  2. 发送响应
 */
public class RpcServer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASS_WORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 接收请求
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String request = new String(body);
                System.out.println("接收到请求:" + request);
                String response = "针对 request:" + request + "响应成功";
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                channel.basicPublish("",Constant.RPC_RESPONSE_QUEUE,basicProperties,response.getBytes());
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(Constant.RPC_REQUEST_QUEUE,false,consumer);
    }
}

先运行客户端

可以看到客户端这边产生了阻塞,客户端是 request 的生产者,是 response 的消费者

运行服务端

 

可以看到服务器端接收到了请求之后,发送响应,客户端这边正确收到了响应

2.7 Publisher Confirms(发布确认)

作为消息中间件,都会面临消息丢失的问题,其中消息丢失分为三种情况:

1)生产者问题,因为程序故障,网络抖动等各种原因,生产者没有成功向 broker 发送消息

2)消息中间件自身问题,生产者成功发送给了 Broker,但是 Broker 没有把消息保存好导致丢失

3)消费者问题,Broker 发送消息到消费者,因为消费者没有处理好,导致 Broker 将消息从队列中删除了

情况1,可以采用发布确认机制实现

生产者将信道设置为 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后, RabbitMQ 就会发送一个确认给生产者(包含消息的唯一 ID),使生产者直到消息已经正确到达目的队列了

其中 deliveryTag 包含了确认消息的序号,此处 broker 也可以设置 channel.basicAck 方法中的 multiple参数,标识这个序号之前的所有消息都已经得到了处理

发送方确认机制最大的好处在于它是异步的,生产者可以同时发布消息和等待信道返回确认消息

1)当消息最终确认之后,生产者可以通过回调方法来处理该确认消息

2)如果 RabbitMQ 因为自身错误导致消息丢失们就会发送一条 nack(Basic.Nack)命令,生产者同样可以在回调方法中处理该 nack 命令

发布确认有 3 中策略 Publishing Messages Individually(单独确认)、Publishing Messages in Batches(批量确认)、Handling Publisher Confirms Asynchronously(异步确认),此处将 3 种代码写在一起

public static final String PUBLISHER_CONFIRMS_QUEUE1 = "publisher.confirm.queue1";
public static final String PUBLISHER_CONFIRMS_QUEUE2 = "publisher.confirm.queue2";
public static final String PUBLISHER_CONFIRMS_QUEUE3 = "publisher.confirm.queue3";
public class PublisherConfirm {
    private static final Integer MESSAGE_COUNT = 200;
    static Connection createConnection() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constant.HOST);
        connectionFactory.setPort(Constant.PORT);
        connectionFactory.setUsername(Constant.USER_NAME);
        connectionFactory.setPassword(Constant.PASS_WORD);
        connectionFactory.setVirtualHost(Constant.VIRTUAL_HOST);
        return connectionFactory.newConnection();
    }
    public static void main(String[] args) throws Exception {
        //单独确认
        publishingMessagesIndividually();
        //批量确认
        publishingMessagesInBatches();
        //异步确认
        handlingPublisherConfirmsAsynchronously();
    }

    /**
     * 异步确认
     */
    private static void handlingPublisherConfirmsAsynchronously() throws Exception {
        try (Connection connection = createConnection()) {
            //1. 开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为 confirm 模式
            channel.confirmSelect();
            //3. 声明队列
            channel.queueDeclare(Constant.PUBLISHER_CONFIRMS_QUEUE3,true,false,false,null);
            //4. 监听 confirm
            //集合中存储的是未确认的消息 ID
            long start = System.currentTimeMillis();
            //有序集合,元素按照自然顺序进行排序,存储未 confirm 消息序号
            SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());

            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    // multiple 批量
                    // confirmSeqNo.headSet(n),方法返回当前集合中小于 n 的集合
                    if (multiple) {
                        //批量确认:将集合中小于等于当前序号 deliveryTag 元素的集合删除,标识这批序号的消息被 ack 了
                        confirmSeqNo.headSet(deliveryTag + 1).clear();
                    }else {
                        //单挑确认:将当前的 deliveryTag 从集合中移除
                        confirmSeqNo.remove(deliveryTag);
                    }
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    if (multiple) {
                        confirmSeqNo.headSet(deliveryTag + 1).clear();
                    }else {
                        confirmSeqNo.headSet(deliveryTag);
                    }
                    //业务需要根据实际场景来处理,例如重发,整理省略
                }
            });
            //5. 发送消息
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms" + i;
                //拿到消息的 ID
                long seqNo = channel.getNextPublishSeqNo();
                //发送消息时,会带着序号
                channel.basicPublish("",Constant.PUBLISHER_CONFIRMS_QUEUE3,null,msg.getBytes());
                //将消息的序号添加到有序集合,表示这个消息发送过去了但还未确认
                confirmSeqNo.add(seqNo);
            }
            while (!confirmSeqNo.isEmpty()) {
                Thread.sleep(10);
            }
            long end = System.currentTimeMillis();
            System.out.printf("异步确认策略,消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT,end - start);
        }
    }

    /**
     * 批量确认
     */
    private static void publishingMessagesInBatches() throws Exception {
        try (Connection connection = createConnection()) {
            //1. 开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为 confirm 模式
            channel.confirmSelect();
            //3. 声明队列
            channel.queueDeclare(Constant.PUBLISHER_CONFIRMS_QUEUE2,true,false,false,null);
            //4. 发送消息
            long start = System.currentTimeMillis();
            int batchSize = 100;
            int outstandingMessageCount = 0;
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms" + i;
                channel.basicPublish("",Constant.PUBLISHER_CONFIRMS_QUEUE2,null,msg.getBytes());
                outstandingMessageCount++;
                if (outstandingMessageCount == batchSize) {
                    //当 outstandingMessageCount 和 batchSize 相等,就等待 5000 ms 之后在批量确认
                    channel.waitForConfirmsOrDie(5000);
                    outstandingMessageCount = 0;
                }
            }
            if (outstandingMessageCount > 0) {
                channel.waitForConfirmsOrDie(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("批量确认策略,消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT,end - start);
        }
    }

    /**
     * 单独确认
     */
    private static void publishingMessagesIndividually() throws Exception {
        try (Connection connection = createConnection()) {
            //1. 开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为 confirm 模式
            channel.confirmSelect();
            //3. 声明队列
            channel.queueDeclare(Constant.PUBLISHER_CONFIRMS_QUEUE1,true,false,false,null);
            //4. 发送消息,并等待确认
            long start = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms" + i;
                channel.basicPublish("",Constant.PUBLISHER_CONFIRMS_QUEUE1,null,msg.getBytes());
                //等待确认消息,只要消息被确认,这个方法就会被返回,如果超时过期,则抛出 TimeoutException,如果消息被
                //nack(丢失),waitForConfirmsOrDie将抛出IOException
                channel.waitForConfirmsOrDie(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("单独确认策略,消息条数: %d,耗时: %d ms \n",MESSAGE_COUNT,end - start);
        }
    }
}

运行程序,观察结果

从上述程序可以看出,单独确认效率最低,而异步确认消息最高

单独确认:

这种策略时每发送一条消息后就调用 channel.waitForConfirmsOrDie 方法,之后等待服务端的确认,实际上时一种串行同步等待的方式,对于持久化的消息来说,需要等待消息确认存储在硬盘之后才会返回

批量确认:

每发送一条消息后,调用 channel.waitForConfirmsOrDie 方法,等待服务端的确认返回,相比单独确认,数据量越大,效率越高,缺点是出现 Basic.Nack 或者超时时,不清楚是那条消息出现了问题,客户端需要将者一批消息全部重发,当消息经常丢失时,批量确认的性能不升反降

异步确认:

异步 confirm 方法实现最为复杂,Channel 接口提供了一个方法 addConfirmListener 这个方法,可以添加 ConfirmListener 回调接口

ConfirmListener 接口中包含两个方法:handleAck(long deliveryTag,boolean multiple)和 handleNack(long deliveryTag,boolean multiple),分别对应处理 RabbitMQ 发送给生产者的 ack 和 nack

deliveryTag 表示发送消息的序号,multiple 表示是否批量确认

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1976705.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Yolov8在RK3588上进行自定义目标检测(四)

参考 Yolov8在RK3588上进行自定义目标检测(一) Yolov8在RK3588上进行自定义目标检测(二) Yolov8在RK3588上进行自定义目标检测(三) YOLOV8火灾检测模型的边缘端推理 验证rknn模型 1.将转换好的rknn模型上传到板子上&#xff0c;再在板子上安装rknn-toolkit-lite2,将上面的…

Nexus3 Repository代理pypi设置与应用

目录 1. 创建Blob库并指定路径 2. 创建pypi阿里镜像源 3. 创建pypi腾讯镜像源 4. 创建一个pypi组管理 5. 配置pip 6. 下载测试 扩展&#xff1a;配置好后无法下载解决思路。 Nexus 存储库中的 Blob 存储是指一种用于存储大量非结构化数据的技术。在 Nexus 存储库的上下文…

基于YOLOv8的垃圾检测系统

基于YOLOv8的垃圾检测系统 (价格85) 包含 [硬纸板&#xff0c;玻璃&#xff0c;金属&#xff0c;有机废物&#xff0c;纸&#xff0c;塑料] 6个类 通过PYQT构建UI界面&#xff0c;包含图片检测&#xff0c;视频检测&#xff0c;摄像头实时检测。 &#xff08;该系统可以…

马来西亚原生静态IP注册的账号稳定吗?

马来西亚作为东南亚重要的经济体之一&#xff0c;其网络基础设施和互联网服务水平在近年来有了显著提升。静态IP作为一种固定的互联网协议地址&#xff0c;对于某些特定的网络应用和需求非常重要。本文将围绕马来西亚原生静态IP注册的账号稳定性进行探讨&#xff0c;分析其在不…

JVM—虚拟机类加载器

参考资料&#xff1a;深入理解Java虚拟机&#xff1a;JVM高级特性与最佳实践&#xff08;第3版&#xff09;周志明 1. 类加载器 JVM设计团队有意把类加载阶段中的 “通过一个类的全限定名来获取该类的二进制字节流” 这个动作放到JVM外部实现&#xff0c;这个动作的代码称为类…

classical Chinese

classical Chinese 中型娃娃暑假作业背诵 文言文《伯牙鼓琴》 1&#xff09;拿到文言文&#xff0c;先看一遍 2&#xff09;用白话文&#xff08;现代文&#xff09;翻译一次 3&#xff09;用白话文对照回去文言文&#xff08;白话文中那些需要替换回文言文呢&#xff09; 虽…

电脑入门|如何设置默认程序打开文件的软件?弄它!

前言 最近发现一件很奇葩的事情&#xff1a;电脑文件使用不合适的软件打开&#xff0c;然后就以为打不开文件了。 千万不要笑&#xff0c;这个问题是电脑小白经常遇到的。 我曾经见过有小伙伴用Photoshop打开一个.rar的文件…… 奇奇怪怪的事情总会有很多&#xff0c;毕竟谁…

【算法设计题】合并两个非递减有序链表,第1题(C/C++)

目录 第1题 合并两个非递减有序链表 得分点&#xff08;必背&#xff09; 题解 函数声明与初始化变量&#xff1a; 初始化合并链表的头节点&#xff1a; 合并两个链表&#xff1a; 处理剩余节点&#xff1a; 返回合并后的链表&#xff1a; 完整测试代码 &#x1f308;…

如何将文件转换成PDF(四种PDF虚拟打印机介绍)

Microsoft Print To PDF 这是Windows 10及以上系统自带的转换成PDF的工具 运行输入 optionalfeatures 打开可选功能&#xff0c;钩上 [Microsoft Print To PDF] 安装完成后&#xff0c;打开一个支持打印的文件类型或者网页&#xff0c;选择打印&#xff0c;在打印机界面可以看…

4.Redis数据结构通用命令

Redis数据结构 Redis是一个键值对的数据库。 key&#xff1a;大多都是String value: 类型多种多样 Redis通用命令 keys :查看所有的key 不建议在生产环境上使用keys命令&#xff0c;因为redis是单线程的&#xff0c;keys命令会搜索很长一段时间&#xff0c;搜索的期间redi…

[数据集][目标检测]金属罐缺陷检测数据集VOC+YOLO格式8095张4类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;8095 标注数量(xml文件个数)&#xff1a;8095 标注数量(txt文件个数)&#xff1a;8095 标注…

Llama3.1技术报告简要解读--附PDF

以为前些天是在炒作llama3.1泄露&#xff0c;没想到Meta在24号凌晨直接开源了&#xff0c;包括三个不同参数规模的模型&#xff08;8B、70B、405B&#xff09;&#xff0c;三个模型上下文长度都是128K&#xff0c;其中超大杯拥有4050亿参数&#xff0c;从评测指标来看必是最强开…

基于MPC在线优化的有效集法位置控制器simulink建模与仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 5.完整工程文件 1.课题概述 模型预测控制是一种基于模型的优化控制方法&#xff0c;它利用过程模型预测未来行为&#xff0c;并通过求解一个优化问题来确定最优控制序列。MPC的特点在于它能够处理系统的动…

罗汉果糖基转移酶 SgUGT94-289-3--文献精读-37

Structural insights into the catalytic selectivity of glycosyltransferase SgUGT94-289-3 towards mogrosides 关于糖基转移酶 SgUGT94-289-3 对罗汉果苷催化选择性的结构洞察 摘要 罗汉果苷是一系列从罗汉果 (Siraitia grosvenorii) 中提取的天然甜味剂。这些罗汉果苷具…

PyQt5入门

Python中经常使用的GUI控件集有PyQt、Tkinter、wxPython、Kivy、PyGUI和Libavg。其中PyQt是Qt(c语言实现的)为Python专门提供的扩展 PyQt是一套Python的GUI开发框架,即图形用户界面开发框架.。而在Python中则使用PyQt这一工具包&#xff08;PyQt5、PyQt5-tools、PyQt5-stubs&am…

数学建模--支持向量机

目录 SVM的基本原理 SVM的应用场景 实现细节与案例分析 总结 支持向量机&#xff08;SVM&#xff09;在处理非线性数据时的核函数有哪些&#xff0c;以及它们各自的优缺点是什么&#xff1f; 如何选择支持向量机的惩罚参数CC以优化模型性能和计算效率&#xff1f; 在实际…

Day17_1--AJAX学习之GET/POST传参

AJAX 简介 AJAX 是一种在无需重新加载整个网页的情况下&#xff0c;能够更新部分网页的技术。其实AJAX就可以理解为就是JS。通过AJAX也就实现了前后端分离&#xff0c;前端只写页面&#xff0c;后端生成数据&#xff01; 现在开始通过实例学习&#xff1a; 1--GET传参 <!…

4.4 标准正交基和格拉姆-施密特正交化

本节的两个目标就是为什么和怎么做(why and how)。首先是知道为什么正交性很好&#xff1a;因为它们的点积为零&#xff1b; A T A A^TA ATA 是对角矩阵&#xff1b;在求 x ^ \boldsymbol{\hat x} x^ 和 p A x ^ \boldsymbol pA\boldsymbol{\hat x} pAx^ 时也会很简单。第二…

深入解析 kubectl describe pod:全面了解 Kubernetes Pod 的运行状态

引言 在 Kubernetes 集群中&#xff0c;kubectl describe pod 命令是运维人员和开发者常用的工具之一&#xff0c;它提供了有关 Pod 的详细信息&#xff0c;帮助我们了解 Pod 的状态、配置和运行状况。这篇博文将深入解析 kubectl describe pod 命令的输出内容&#xff0c;逐项…

Transformer网络的魔改结构与应用领域

Transformer网络的魔改结构与应用领域 Transformer的基础架构Transformer的变体Transformer的应用领域未来发展方向 参考文献 自从Transformer架构在2017年被提出以来&#xff0c;它已经成为深度学习领域的一项革命性技术。Transformer最初应用于自然语言处理&#xff08;NLP&a…