RocketMQ-Request-Reply特性

news2024/11/27 12:05:21

源码版本号:版本号:4.9.4

使用场景

随着服务规模的扩大,单机服务无法满足性能和容量的要求,此时需要将服务拆分为更小粒度的服务或者部署多个服务实例构成集群来提供服务。在分布式场景下,RPC是最常用的联机调用的方式。

在构建分布式应用时,有些领域,例如金融服务领域,常常使用消息队列来构建服务总线,实现联机调用的目的。消息队列的主要场景是解耦、削峰填谷,在联机调用的场景下,需要将服务的调用抽象成基于消息的交互,并增强同步调用的这种交互逻辑。为了更好地支持消息队列在联机调用场景下的应用,rocketmq-4.6.0推出了“Request-Reply”特性来支持RPC调用。

设计思路

在rocketmq中,整个同步调用主要包括两个过程:

(1)请求方生成消息,发送给响应方,并等待响应方回包;

(2)响应方收到请求消息后,消费这条消息,并发出一条响应消息给请求方。

整个过程实质上是两个消息收发过程的组合。所以这里最关键的问题是如何将异步的消息收发过程构建成一个同步的过程。其中主要有两个问题需要解决:

请求方如何同步等待回包

这个问题的解决方案中,一个关键的数据结构是RequestResponseFuture。

public class RequestResponseFuture {
    private final String correlationId;
    private final RequestCallback requestCallback;
    private final long beginTimestamp = System.currentTimeMillis();
    private final Message requestMsg = null;
    private long timeoutMillis;
    private CountDownLatch countDownLatch = new CountDownLatch(1);
    private volatile Message responseMsg = null;
    private volatile boolean sendRequestOk = true;
    private volatile Throwable cause = null;

    // 通过CountDownLatch机制实现同步等待
    public Message waitResponseMessage(final long timeout) throws InterruptedException {
        this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
        return this.responseMsg;
    }
    
    public void putResponseMessage(final Message responseMsg) {
        this.responseMsg = responseMsg;
        this.countDownLatch.countDown();
    }
}

RequestResponseFuture中,利用correlationId来标识一个请求。如下图所示,Producer发送request时创建一个RequestResponseFuture,以correlationId为key,RequestResponseFuture为value存入map,同时请求中带上RequestResponseFuture中的correlationId,收到回包后根据correlationId拿到对应的RequestResponseFuture,并设置回包内容。

consumer消费消息后,如何准确回包

(1)producer在发送消息的时候,会给每条消息生成唯一的标识符,同时还带上了producer的clientId。当consumer收到并消费消息后,从消息中取出消息的标识符correlationId和producer的标识符clientId,放入响应消息,用来确定此响应消息是哪条请求消息的回包,以及此响应消息应该发给哪个producer。同时响应消息中设置了消息的类型以及响应消息的topic,然后consumer将消息发给broker,如下图所示。

(2)broker收到响应消息后,需要将消息发回给指定的producer。Broker如何知道发回给哪个producer?因为消息中包含了producer的标识符clientId,在ProducerManager中,维护了标识符和channel信息的对应关系,通过这个对应关系,就能把回包发给对应的producer。

响应消息发送和一般的消息发送流程区别在于,响应消息不需要producer拉取,而是由broker直接推给producer。同时选择broker的策略也有变化:请求消息从哪个broker发过来,响应消息也发到对应的broker上。

Producer收到响应消息后,根据消息中的唯一标识符,从RequestResponseFuture的map中找到对应的RequestResponseFuture结构,设置响应消息,同时计数器减一,解除等待状态,使请求方收到响应消息。

broker回调到发送者的代码如下

// client包
// org.apache.rocketmq.client.impl.ClientRemotingProcessor
public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    private void processReplyMessage(MessageExt replyMsg) {
        final String correlationId = replyMsg.getUserProperty(MessageConst.PROPERTY_CORRELATION_ID);
        final RequestResponseFuture requestResponseFuture = RequestFutureHolder.getInstance().getRequestFutureTable().get(correlationId);
        if (requestResponseFuture != null) {
            requestResponseFuture.putResponseMessage(replyMsg);

            RequestFutureHolder.getInstance().getRequestFutureTable().remove(correlationId);

            if (requestResponseFuture.getRequestCallback() != null) {
                requestResponseFuture.getRequestCallback().onSuccess(replyMsg);
            }
        } else {
            String bornHost = replyMsg.getBornHostString();
            log.warn(String.format("receive reply message, but not matched any request, CorrelationId: %s , reply from host: %s",
                    correlationId, bornHost));
        }
    }
}

使用方法

消费者

public class RRConsumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQProducer replyProducer = new DefaultMQProducer("producerGroupTest");
        replyProducer.setNamesrvAddr("127.0.0.1:9876");
        // 启动Producer实例
        replyProducer.start();

        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupNameTest");
        // 设置NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");
        // 多个tag之间用||分隔,* 代表所有
        consumer.subscribe("TopicTest001", "*");

        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                for (MessageExt msg : msgs) {
                    try {
                        System.out.printf("handle message: %s", msg.toString());

                        // 消费者收到消息后 发送响应消息
                        String replyTo = MessageUtil.getReplyToClient(msg);
                        byte[] replyContent = "reply message contents.".getBytes();
                        // create reply message with given util, do not create reply message by yourself
                        Message replyMessage = MessageUtil.createReplyMessage(msg, replyContent);
                        // send reply message with producer
                        SendResult replyResult = replyProducer.send(replyMessage, 3000);
                        System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());
                    } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

生产者同步调用

public class RRProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producerGroupTest");
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 启动Producer实例
        producer.start();
        byte[] bytes = ("消息内容" + 666).getBytes(RemotingHelper.DEFAULT_CHARSET);
        Message msg = new Message("TopicTest001","TagA", UUID.randomUUID().toString(), bytes);
        // 第一个参数为发送的消息 第二个参数为同步调用的超时时间
        Message message = producer.request(msg, 6000);
        System.out.println("body=" + new String(message.getBody()) + " -> " + message);
        producer.shutdown();
    }
}

运行代码后发现,打印的内容并不是消费者返回的内容,而是发送的消息内容。

下面分析一下request方法看看原因

内部调用的是DefaultMQProducerImpl.request(Message, long)

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
public class DefaultMQProducerImpl implements MQProducerInner {
    // 找到1366行
    public Message request(final Message msg,
                           long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginTimestamp = System.currentTimeMillis();
        // 里面会生成 correlationId 和 requestClientId
        prepareSendRequest(msg, timeout);
        final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);

        try {
            // 构造RequestResponseFuture对象
            final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
            /**
             * 将RequestResponseFuture对象放入到Map中, key为correlationId
             * 后面broker回调的时候会通过correlationId找到RequestResponseFuture并将响应结果设置进去
             * 回调方法在:ClientRemotingProcessor#processReplyMessage
             */
            RequestFutureHolder.getInstance().getRequestFutureTable().put(correlationId, requestResponseFuture);

            long cost = System.currentTimeMillis() - beginTimestamp;
            // 发送消息并设置回调方法
            this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    /**
                     * 消息发送成功后调用回调方法
                     * 这里直接调用putResponseMessage方法并把当前发送的消息作为参数
                     */
                    requestResponseFuture.setSendRequestOk(true);
                    requestResponseFuture.putResponseMessage(msg);
                }

                @Override
                public void onException(Throwable e) {
                    requestResponseFuture.setSendRequestOk(false);
                    requestResponseFuture.putResponseMessage(null);
                    requestResponseFuture.setCause(e);
                }
            }, timeout - cost);
            /**
             * 因为消息发送成功后直接调用了RequestResponseFuture的putResponseMessage方法
             * 所以这里就直接返回了
             */
            return waitResponse(msg, timeout, requestResponseFuture, cost);
        } finally {
            /**
             * 移除RequestResponseFuture对象
             * 所以当broker回调ClientRemotingProcessor的processReplyMessage方法时也找不到对应的信息
             */
            RequestFutureHolder.getInstance().getRequestFutureTable().remove(correlationId);
        }
    }
}

查看源代码后可以发现,虽然RocketMQ提供了RPC调用的能力,但是好像并不能直接用它提供的方法来打到同步调用的效果。

生产者异步调用

public class RRProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producerGroupTest");
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 启动Producer实例
        producer.start();
        byte[] bytes = ("消息内容" + 666).getBytes(RemotingHelper.DEFAULT_CHARSET);
        Message msg = new Message("RRTopicTest010","TagA", UUID.randomUUID().toString(), bytes);
        // 第一个参数为发送的消息 第二个参数为同步调用的超时时间
        producer.request(msg, new RequestCallback() {
            @Override
            public void onSuccess(Message message) {
                if (message == null) {
                    System.out.println("message is null");
                } else {
                    System.out.println("body=" + new String(message.getBody()) + " -> " + message);
                }
            }
            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
            }
        }, 6000);
        Thread.sleep(60*1000);
        producer.shutdown();
    }
}

运行代码发现,回调方法的onSuccess会被调用两次,第一次回调时消息为null,第二次回调时才是消费者返回的响应信息。

查看源码可以发现,消息发送成功时会被调用一次,而这个时候broker还没有来得及将相应信息设置到中,所以返回null。

当broker回调时,会将消费者的响应信息设置进去,所以第二次是有值的。

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
public class DefaultMQProducerImpl implements MQProducerInner {
    // 找到1398行
    public void request(Message msg, final RequestCallback requestCallback, long timeout)
            throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
        long beginTimestamp = System.currentTimeMillis();
        prepareSendRequest(msg, timeout);
        final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);

        final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback);
        RequestFutureHolder.getInstance().getRequestFutureTable().put(correlationId, requestResponseFuture);

        long cost = System.currentTimeMillis() - beginTimestamp;
        this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                requestResponseFuture.setSendRequestOk(true);
                // 消息发送成功时直接调用回调方法
                requestResponseFuture.executeRequestCallback();
            }

            @Override
            public void onException(Throwable e) {
                requestResponseFuture.setCause(e);
                requestFail(correlationId);
            }
        }, timeout - cost);
    }
}

基于RocketMQ提供的方法自己实现

import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.utils.CorrelationIdUtil;

public class LzcProducer extends DefaultMQProducer {
    public LzcProducer(String producerGroup) {
        super(producerGroup);
    }
    /**
     * 异步调用
     * @param msg
     * @param callback
     * @param timeout
     */
    void rpcRequest(final Message msg, RequestCallback callback, long timeout) {
        long beginTimestamp = System.currentTimeMillis();
        prepareSendRequest(msg, timeout);
        final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
        try {
            final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
            RequestFutureHolder.getInstance().getRequestFutureTable().put(correlationId, requestResponseFuture);
            super.defaultMQProducerImpl.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    requestResponseFuture.setSendRequestOk(true);
                    // 消息发送成功
                    if (callback != null) {
                        try {
                            requestResponseFuture.waitResponseMessage(timeout - (System.currentTimeMillis() - beginTimestamp));
                            callback.onSuccess(requestResponseFuture.getResponseMsg());
                        } catch (InterruptedException e) {
                            callback.onException(e);
                        }
                    }
                }
                @Override
                public void onException(Throwable e) {
                    requestResponseFuture.setCause(e);
                }
            }, timeout);
        } catch (Exception e){
            throw  new RuntimeException(e);
        }
    }

    /**
     * 同步调用
     * @param msg
     * @param timeout
     * @return
     */
    Message rpcRequest(final Message msg, final long timeout) {
        prepareSendRequest(msg, timeout);
        final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
        try {
            final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
            RequestFutureHolder.getInstance().getRequestFutureTable().put(correlationId, requestResponseFuture);

//            SendResult sendResult = super.defaultMQProducerImpl.send(msg, timeout);
//            if (SendStatus.SEND_OK == sendResult.getSendStatus()) {
//                requestResponseFuture.setSendRequestOk(true);
//                Message message = requestResponseFuture.waitResponseMessage(requestResponseFuture.getTimeoutMillis());
//                if (requestResponseFuture.isSendRequestOk() && requestResponseFuture.isTimeout()) {
//                    // 消息发送成功但是超时......
//                }
//                return message;
//            }
//            // 发送失败
//            return null;

            super.defaultMQProducerImpl.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    requestResponseFuture.setSendRequestOk(true);
                }
                @Override
                public void onException(Throwable e) {
                    requestResponseFuture.setCause(e);
                }
            }, timeout);
            Message message = requestResponseFuture.waitResponseMessage(requestResponseFuture.getTimeoutMillis());
            if (requestResponseFuture.isSendRequestOk() && requestResponseFuture.isTimeout()) {
                // 消息发送成功但是超时......
            }
            return message;
        } catch (Exception e){
            throw  new RuntimeException(e);
        } finally {
            RequestFutureHolder.getInstance().getRequestFutureTable().remove(correlationId);
        }
    }

    private void prepareSendRequest(final Message msg, long timeout) {
        DefaultMQProducerImpl defaultMQProducerImpl = super.getDefaultMQProducerImpl();
        String correlationId = CorrelationIdUtil.createCorrelationId();
        String requestClientId = defaultMQProducerImpl.getMqClientFactory().getClientId();
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID, correlationId);
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, requestClientId);
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_TTL, String.valueOf(timeout));
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/620691.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

高考季,17岁VS人工智能,谁的作文更胜一筹?

又到一年高考日。想起十二年前我也曾和众多莘莘学子一样,在这场人生的史诗里挣扎奋斗。 那时的我满怀着期待和焦虑,站在人生的岔口,茫然纠结该循哪条道路。十二年光阴荏苒,岁月如梭, 如今我已不复当年学子的面容,更无法回首当时的迷茫与彷徨。 时过境迁,我如今以另一种身份再…

flask+scrapy

管道数据库 class SpiderBookPipeline:def __init__(self):host localhostuser rootpassword hdp020820db 警察大学信息检索self.conn pymysql.connect(hosthost, useruser, passwordpassword, dbdb)self.cursor self.conn.cursor()def process_item(self, item, spider…

【Python】Python系列教程-- Python3 元组(十三)

文章目录 前言访问元组修改元组删除元组元组运算符元组索引&#xff0c;截取元组内置函数关于元组是不可变的 前言 往期回顾&#xff1a; Python系列教程–Python3介绍&#xff08;一&#xff09;Python系列教程–Python3 环境搭建&#xff08;二&#xff09;Python系列教程–…

项目中的Echarts图表统计

数据可视化 一、Echarts二、前端&#xff08;VueEcharts&#xff09;HomeView.vue&#xff08;完整代码&#xff09; 三、后端&#xff08;SpringBootMyBatis&#xff09;BorrowController.javaIBorrowService.javaBorrowService.javadatetimeToDateStr()函数countList()函数 B…

同样是产品经理 段位差别大

同样是产品经理&#xff0c;段位差别大 趣讲大白话&#xff1a;做人的差距大 【趣讲信息科技189期】 **************************** 市场越内卷 对产品的要求越来越高 不管叫不叫产品经理这个头衔 产品开发的重要性不会降低 《人人都是产品经理》总结的段位 姑且一看&#xff…

java线程多线程并发

文章目录 对java线程的认识wait&#xff08;&#xff09;和sleep&#xff08;&#xff09;区别&#xff1f;wait&#xff0c;notify为什么要放在同步代码块中&#xff1f; 多线程**什么时候使用多线程**&#xff1a;**多线程的优缺点**&#xff1a;**线程安全问题**&#xff1a…

MATLAB应用

目录 网站 智能图像色彩缩减和量化 网站 https://yarpiz.com/ 智能图像色彩缩减和量化 使用智能聚类方法&#xff1a;&#xff08;a&#xff09;k均值算法&#xff0c;&#xff08;b&#xff09;模糊c均值聚类&#xff08;FCM&#xff09;和&#xff08;c&#xff09;自组织神…

Mysql—存储过程

简介 存储过程就是类似于MySQL触发器&#xff0c;经过事先编写好相应语句&#xff0c;通过编译后存储在数据库上。触发器不需要手动调用即可实现相应SQL功能。MySQL存储过程&#xff0c;需要自己去调用得到相应的结果。 语法 创建存储过程 CREATE DEFINER CURRENT_USER PR…

git---->团队开发神器,一篇文章直接掌握

git---->团队开发神器&#xff0c;一篇文章直接掌握 一 学习git的原因概念版本的概念1 版本控制软件的基础功能2 集中式版本控制软件3 分布式版本控制 二 git的安装三 GitHub Desktop的使用四 团队操作五 中央服务器--github从github上下载文件到本地仓库传输文件 六 国内中…

chatgpt赋能python:Python如何实现自增

Python如何实现自增 在Python编程中&#xff0c;自增是一种非常常用的操作&#xff0c;它可以让我们在循环、计数等场景中更加方便地进行操作。实际上&#xff0c;在Python中&#xff0c;实现自增非常简单&#xff0c;本文将介绍Python中常用的自增操作&#xff0c;并分享自增…

时间复杂度 空间复杂度

概览 时间复杂度与空间复杂度的作用是在衡量一个算法的优劣性&#xff0c;以及在二者之间进行权衡&#xff0c;寻找二者的平衡点。 时间复杂度是指执行算法所需时间的增长率&#xff0c;而空间复杂度则是指执行算法所需存储空间的增长率。 高时间复杂度的算法可能需要在短时间…

LayUI前框框架普及版

LayUI 一、课程目标 1. 【了解】LayUI框架 2. 【理解】LayUI基础使用 3. 【掌握】LayUI页面元素 4. 【掌握】LayUI内置模块二、LayUI基本使用 2.1 概念 layui&#xff08;谐音&#xff1a;类UI) 是一款采用自身模块规范编写的前端 UI 框架&#xff0…

Nginx+Tomcat 负载均衡、动静分离

目录 一、Nginx代理服务器概念 1.正向代理 2.反向代理 二、动静分离 三、负载均衡 四、Nginx七层代理实验 1.部署Nginx服务 2. 部署Tomcat服务 2.1在192.168.88.50 虚拟机上部署双实例 2.2在192.168.88.60 上部署Tomcat服务器3 3.动静分离配置 3.1Tomcat1 server 配…

RecyclerView的回收缓存均由内部类Recycler完成

1. RecyclerView的三级缓存 通常在RecyclerView中存在着四级缓存&#xff0c;从低到高分别为&#xff1a; 可直接重复使用的临时缓存&#xff08;mAttachedScrap/mChangedScrap&#xff09; mAttachedScrap中缓存的是屏幕中可见范围的ViewHoldermChangedScrap只能在预布局状态…

Material—— 常用材质节点

目录 Coordinates Absolute World Position Actor Position Object Position Utility SphereMask Coordinates 表示坐标类的节点&#xff1b; Absolute World Position 别名为WorldPosition&#xff0c;此节点输出当前像素在世界空间内的位置&#xff1b;常用于查找相机到…

作为过来人:有什么话想对当年高考前的自己说

目录 引言千人千面-有什么话想对当年高考前的自己说怀念高中&#xff0c;数学太难多考一分&#xff0c;人生就会不一样一定要勇敢&#xff0c;不止高考别把高考不当回事6的我没话说想到啥就去做别选**大学/专业强烈想出名的拖鞋哥英语全选C&#xff0c;理综要细心会的全做对当时…

Spring - 注解开发

文章目录 Spring的注解开发一、Bean 基本注解开发1.1 Component Bean的配置1.2 其他注解配置Bean1.3 Component 衍生注解 二、Bean依赖注入注解开发2.1 Value2.2 Autowired2.3 Qualifier2.4 Resource 三、非自定义注解开发3.1 无参非自定义注解开发3.2 有参非自定义注解开发 四…

Domino 14新内核

大家好&#xff0c;才是真的好。 还记得去年&#xff0c;我们不断跟进而放出的Notes/Domino产品路线图吗&#xff1f;是的&#xff0c;HCL正在按照产品路线图稳步推进&#xff0c;而很多人提出的idea&#xff0c;也逐步加入到产品中&#xff0c;等会我们也会聊到。 我最喜欢这…

MySQL安装-Linux版

MySQL-Linux版安装 1、准备一台Linux服务器 云服务器或者虚拟机都可以&#xff1b; Linux的版本为 CentOS7&#xff1b; 2、 下载Linux版MySQL安装包 下载地址 3、上传MySQL安装包 使用FinalShell软件上传即可&#xff01; 4、 创建目录,并解压 mkdir mysqltar -xvf my…

【Web服务器】Nginx网站服务

文章目录 一、Nginx 概述1.什么是 Nginx2.Nginx 的特点3.Nginx 应用场景 二、Nginx 服务基础1.编译安装 Nginx 服务1.1 布置环境1.2 安装依赖包1.3 创建运行用户、组1.4 编译安装 2.Nginx 的运行控制2.1 检查配置文件2.2 启动、停止 Nginx2.3 日志分割以及升级 Nginx 服务2.4 添…