RocketMQ底层通信机制

news2025/1/12 1:58:02

        分布式系统各个角色间的通信效率很关键,通信效率的高低直接影响系统性能,基于Socket实现一个高效的TCP通信协议是很有挑战的,本节介绍RocketMQ是如何解决这个问题的。

1.Remoting模块

RocketMQ的通信相关代码在Remoting模块里,先来看看主要类结构,如图4-1所示。


 

图4-1 Remoting模块的类继承关系

RemotingService为最上层接口,定义了三个方法:

·void start();

·void shutdown();

·void registerRPCHook(RPCHook rpcHook);

RemotingClient和RemotingServer继承RemotingService接口,并增加了自己特有的方法。RemotingClient的主要函数定义如代码清单4-6所示。

代码清单4-6 RemotingClient主要函数定义

void registerProcessor(final int requestCode, final NettyRequestProcessor processor,final ExecutorService executor);
RemotingCommand invokeSync(final String addr, final RemotingCommand request, final long timeoutMillis);
void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,final InvokeCallback invokeCallback);
void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis);
void updateNameServerAddressList(final List<String> addrs);

 

然后看看具体的实现类,NettyRemotingClient和NettyRemotingServer分别实现了RemotingClient和RemotingServer,而且都继承了NettyRemotingAbstract类。通过上面的封装,RocketMQ各个模块间的通信,可以通过发送统一格式的自定义消息(RemotingCommand)来完成,各个模块间的通信实现简洁明了。比如NameServer模块中,NameServerController有一个remotingServer变量,NameServer在启动时初始化各个变量,然后启动remotingServer即可,剩下NameServer要做的是专心实现处理RemotingCommand的逻辑,如代码清单4-7所示。

代码清单4-7 NameServer处理主流程代码

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    if (log.isDebugEnabled()) {
        log.debug("receive request, {} {} {}",
            request.getCode(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            request);
    }
    switch (request.getCode()) {
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINTO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        case RequestCode.UPDATE_NAMESRV_CONFIG:
            return this.updateConfig(ctx, request);
        case RequestCode.GET_NAMESRV_CONFIG:
            return this.getConfig(ctx, request);
        default:
            break;
    }
    return null;
}

 

在Consumer的源码中,获取消息的底层通信部分同样发送一个RemotingCommand请求,返回的response也是个RemotingCommand类型,如代码清单4-8所示。

代码清单4-8 Consumer请求消息底层实现代码

private PullResult pullMessageSync(//
    final String addr, // 1
    final RemotingCommand request, // 2
    final long timeoutMillis// 3
) throws RemotingException, InterruptedException, MQBrokerException {
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    return this.processPullResponse(response);
}

 

从源码中可以看出,RocketMQ中复杂的通信过程,被RemotingCommand统一起来,大部分的逻辑都是通过发送、接受并处理Command来完成的。

2.协议设计和编解码

        RocketMQ自己定义了一个通信协议,使得模块间传输的二进制消息和有意义的内容之间互相转换。协议格式如图4-2所示。

图4-2 RocketMQ的通信协议

1)第一部分是大端4个字节整数,值等于第二、三、四部分长度的总和;

2)第二部分是大端4个字节整数,值等于第三部分的长度;

3)第三部分是通过Json序列化的数据;

4)第四部分是通过应用自定义二进制序列化的数据。

消息的解码过程在RemotingCommand的decode函数里,如代码清单4-9所示。

代码清单4-9 消息解码函数

ublic static RemotingCommand decode(final ByteBuffer byteBuffer) {
    int length = byteBuffer.limit();
    int oriHeaderLen = byteBuffer.getInt();
    int headerLength = getHeaderLength(oriHeaderLen);
    byte[] headerData = new byte[headerLength];
    byteBuffer.get(headerData);
    RemotingCommand cmd = headerDecode(headerData, getProtocolType (oriHeaderLen));
    int bodyLength = length - 4 - headerLength;
    byte[] bodyData = null;
    if (bodyLength > 0) {
        bodyData = new byte[bodyLength];
        byteBuffer.get(bodyData);
    }
    cmd.body = bodyData;
    return cmd;
}

 

对应的消息编码过程在RemotingCommand的encode函数中,如代码清单4-10所示。

代码清单4-10 消息编码函数

public ByteBuffer encode() {
    // 1> header length size
    int length = 4;
    // 2> header data length
    byte[] headerData = this.headerEncode();
    length += headerData.length;
    // 3> body data length
    if (this.body != null) {
        length += body.length;
    }
    ByteBuffer result = ByteBuffer.allocate(4 + length);
    // length
    result.putInt(length);
    // header length
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
    // header data
    result.put(headerData);
    // body data;
    if (this.body != null) {
        result.put(this.body);
    }
    result.flip();
    return result;
}

 

3.Netty库

        RocketMQ是基于Netty库来完成RemotingServer和RemotingClient具体的通信实现的,Netty是个事件驱动的网络编程框架,它屏蔽了Java Socket、NIO等复杂细节,用户只需用好Netty,就可以实现一个“网络编程专家+并发编程专家”水平的Server、Client网络程序。应用Netty有一定的门槛,需要了解它的EventLoopGroup、Channel、Handler模型以及各种具体的配置。RocketMQ利用Netty实现的通信类是NettyRemotingServer和NettyRemotingClient,用户也可以参考这两个类的实现来学习使用Netty。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1206206.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linkage Mapper 报错

1 . 错误提示&#xff1a;“No module named lm_config” 错误原因&#xff1a;**** 2.错误提示&#xff1a;“Cannot find an installation of Circuitscape in your Program Files directory.” 错误原因&#xff1a;***** 3. 错误提示&#xff1a;UnicodeEncodeError: ‘asc…

Windows 微PE WePE_64_V2.3 PE模式下启用账号和修改密码

启动PE后&#xff0c;进入桌面打开运行dism程序 选择带有系统的盘符&#xff08;默认选的是PE盘&#xff09;&#xff0c;然后打开会话 选择左侧工具箱&#xff0c;然后右侧找到账户管理 然后就可以对已有账号进行管理了 结束。

Java SE 封装、包、static关键字和代码块

1.封装 1.1封装的概念 面向对象程序三大特性&#xff1a;封装、继承、多态。而类和对象阶段&#xff0c;主要研究的就是封装特性。何为封装呢&#xff1f;简单来说 就是套壳屏蔽细节。 封装&#xff1a;将数据和操作数据的方法进行有机结合&#xff0c;隐藏对象的属性和实现细…

springboot-error

Invalid bound statement (not found): com.example.demo.mapper.UserMapper.findAll 一直报错&#xff0c;找不到相应的mapper文件。 排除以下原因之后&#xff0c;还是不对&#xff1a; https://blog.csdn.net/xxpxxpoo8/article/details/127548543 最后发现是因为我的mapp…

云原生实战课大纲

1. 云原生是什么 原生应用&#xff08;java,pyrhon&#xff09; 上云的过程应用上云遇到的问题1.微服务的拆分 微服务的访问关系应用的架构云原生适合什么样的人去学具备什么样的前提条件云原生要学习什么docker k8s devlops server mesh jks k8s监控吧自己的微服务上云另…

聊聊leetcode可包含重复数字的序列的《47. 全排列 II》中的vis标记函数

1 题目描述&#xff08;字节二面题目&#xff09; 2 代码 class Solution {List<List<Integer>>res;List<Integer>list;boolean[]used;public List<List<Integer>> permuteUnique(int[] nums) {resnew ArrayList<>();listnew ArrayList&l…

PHP中$_SERVER全局变量

在PHP中&#xff0c;$_SERVER 是一个全局数组变量&#xff0c;它包含了有关服务器和当前脚本的信息。$_SERVER 数组中的每个元素都是服务器环境的一个参数&#xff0c;如请求的方法、请求的 URI、客户端 IP 地址等。 PATH 系统环境变量的值&#xff0c;包含了多个目录的路径…

SaaS 电商设计 (三) 如何做大促压测

一.背景&目标 1.1 常见的压测场景 电商大促:一众各大厂的促销活动场景,如:淘宝率先推出的天猫双11,而后京东拉出的京东 618 .还是后续陆陆续续的一些年货节, 3.8 女神节等等.都属于一些常规的电商大促 票务抢购:常见的如承载咱们 80,90 青春回忆的 Jay 的演唱会,还有普罗…

Docker - 容器数据卷

Docker - 容器数据卷 什么是容器数据卷 等同于挂载&#xff0c;将容器内的目录地址指向于宿主机文件系统中 直接使用命令来挂载 -v docker run -it -v 主机目录:容器内目录# 测试 docker run -it -v /root:/home centos /bin/bash [rootiZ2zeg7mctvft5renx1qvbZ ~]# docker …

求最大公约数math.gcd()

【小白从小学Python、C、Java】 【计算机等级考试500强双证书】 【Python-数据分析】 求最大公约数 math.gcd() [太阳]选择题 下列代码执行输出的结果是&#xff1f; import math print("【执行】print(math.gcd(6, 8))") print(math.gcd(6, 8)) print(&quo…

vue3项目 Element-Plus DatePicker日期选择器组件限制只能选择7天内

需求&#xff1a;时间选择器 只能选择7天及以内 <template><el-date-pickerv-model"valueDate"type"daterange"range-separator"⇀"start-placeholder"开始日期"end-placeholder"结束日期"format"YYYY-MM-DD&…

保姆级Decimal.js的使用(如何解决js精度问题)

精度问题控制台图样 如果银行的业务你这样做&#xff0c;不知道要损失多少钱&#xff0c;这样是不行的&#xff0c;计算的不准确是需要背锅的&#xff0c;我们给后端去做吧&#xff0c;其实我们前端也是可以做的&#xff0c;引入Decimal.js 01.引入Decimal.js decimal.js是使用…

用户日期格式不一致导致BDC报时间格式不一致问题

问题描述 在做销售开票的功能时用的BDC&#xff0c;业务在测试的时候总是报日期格式不一致的错误&#xff0c;而我自己测的时候却没啥问题&#xff0c;调试的时候发现是我和业务的时间格式不一致&#xff08;我是YYYYMMDD,他是MMDDYYYY&#xff09;。 解决方案 用函数CONVERT…

博捷芯BJCORE:国内划片机品牌优势

国内划片机品牌在半导体设备制造领域奋起直追&#xff0c;展现出以下几个优势&#xff1a; 1. 技术提升&#xff1a;国内划片机品牌在技术上持续取得突破&#xff0c;例如设备精准度和切割精度的提高&#xff0c;可以在短时间内完成大量加工&#xff0c;提高了生产效率。 2. 适…

Python 如何实现访问者设计模式?什么是访问者(Visitor)模式?实际案例中有什么作用?

什么是访问者设计模式&#xff1f;访问者&#xff08;Visitor&#xff09;设计模式介绍&#xff1a; 访问者&#xff08;Visitor&#xff09;设计模式是一种行为设计模式&#xff0c;用于在不修改被访问对象的前提下定义新的操作。它通过将操作封装到独立的访问者类中&#xf…

centos7安装pandora

因为需要python3.7以上的环境所以下载minicanda安装脚本 1.下载地址 https://mirrors.tuna.tsinghua.edu.cn/anaconda/miniconda/Miniconda3-py38_4.9.2-Linux-x86_64.sh把脚本上传到服务器 2&#xff0c;给.sh文件添加x执行权限 sudo chmod ux Miniconda3-py38_4.9.2-Linu…

【汇编】汇编语言的介绍

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、汇编是什么&#xff1f;二、为什么要学习汇编语言&#xff1f;三、学习汇编语言的好处四、安装汇编环境4.1 下载虚拟环境4.2 配置虚拟环境 总结 前言 计算…

瑞吉外卖01-实现管理端登录登出功能

开发前准备 准备数据表 结合页面原型创建数据库reggie,可以使用图形化界面或者MySQL命令运行SQL文件导入表结构(使用命令时sql文件不要放在中文目录中) 创建工程 创建一个SpringBoot的工程(勾选Spring Web&#xff0c;MySQL和MyBatis),配置pom.xml文件导入druid&#xff0c;…

本地化工具:Soluling Localization Crack

Soluling 是一个本地化工具&#xff0c;包含本地化项目所需的所有功能。Solling 使本地化变得非常容易。Soluling 是桌面应用程序和命令行工具的组合 。Solling支持100多种文件格式。通过 Soluling&#xff0c;您可以本地化桌面应用程序、移动应用程序、Web 应用程序、文档和在…

【赠书第4期】机器学习与人工智能实战:基于业务场景的工程应用

文章目录 前言 1 机器学习基础知识 2 人工智能基础知识 3 机器学习和人工智能的实战案例 4 总结 5 推荐图书 6 粉丝福利 前言 机器学习与人工智能是当前最热门的领域之一&#xff0c;也是未来发展的方向。随着科技的不断进步&#xff0c;越来越多的企业开始关注和投入机…