Apache Pulsar源码解析之Lookup机制

news2025/1/19 20:18:35

引言

在学习Pulsar一段时间后,相信大家也或多或少听说Lookup这个词,今天就一起来深入剖析下Pulsar是怎么设计的它吧

Lookup是什么

在客户端跟服务端建立TCP连接前有些信息需要提前获取,这个获取方式就是Lookup机制。所获取的信息有以下几种

  • 应该跟哪台Broker建立连接
  • Topic的Schema信息
  • Topic的分区信息

其中第一个是最重要的,因此今天就针对第一点进行深入剖析,大致流程如下图
在这里插入图片描述

  1. 在创建生产者/消费者时会触发Lookup,一般是通过HTTP请求Broker来获取目标Topic所归属的Broker节点信息,这样才知道跟哪台机器建立TCP连接进行数据交互
  2. Broker接收到Lookup命令,此时会进行限流检查、身份/权限认证、校验集群等检测动作后,根据请求中携带的Namespace信息获取对应的Namespace对象进行处理,这里Namespace会对Topic进行哈希运算并判断它落在数组的哪一个节点,算出来后就根据数组的信息来从Bundle数组中获得对应的Bundle,这个过程其实就是一致性哈希算法寻址过程。
  3. 在获得Bundle后会尝试从本机Cache中查询该Bundle所归属的Broker信息。
  4. 如果在Cache中没有命中,则会去Zookeeper中进行读取,如果发现该Bundle还未归属Broker则触发归属Broker的流程
  5. 获取到该Topic所归属的Broker信息后返回给客户端,客户端解析结果并跟所归属的Broker建立TCP连接,用于后续生产者往Broker节点进行消息写入

补充说明确定Bundle的归属,如果Broker的loadManager使用的是中心化策略,则需要Broker Leader来当裁判决定,否则当前Broker就可当作裁判。虽然Broker是无状态的,但会通过Zookeeper选举出一个Leader用于监控负载、为Bundle分配Broker等事情,裁判Broker通过loadManager查找负载最低的Broker并把Bundle分配给它。

客户端实现原理

Lookup机制是由客户端发起的,在创建生产者/消费者对象时会初始化网络连接,以生产者代码为例进行跟踪看看。无论是创建分区还是非分区生产者,最终都会走到ProducerImpl的构造函数,就从这里开始看吧

   public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
                        CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema,
                        ProducerInterceptors interceptors, Optional<String> overrideProducerName) {
				....
        //这里进去就是创建跟Broker的网络连接
        grabCnx();
    }

    void grabCnx() {
      	//实际上是调用ConnectionHandler进行的
        this.connectionHandler.grabCnx();
    }

		protected void grabCnx(Optional<URI> hostURI) {
  		....
      //这里是核心,相当于最终又调用回PulsarClientImpl类的getConnection方法
      cnxFuture = state.client.getConnection(state.topic, (state.redirectedClusterURI.toString()));
  		....
    }


    public CompletableFuture<ClientCnx> getConnection(final String topic, final String url) {
        TopicName topicName = TopicName.get(topic);
      	//看到方法名就知道到了Lookup的时候了,所以说好的命名远胜于注释
        return getLookup(url).getBroker(topicName)
                .thenCompose(lookupResult -> getConnection(lookupResult.getLogicalAddress(),
                        lookupResult.getPhysicalAddress(), cnxPool.genRandomKeyToSelectCon()));
    }

    public LookupService getLookup(String serviceUrl) {
        return urlLookupMap.computeIfAbsent(serviceUrl, url -> {
            try {
              	//忽略其他的,直接跟这里进去
                return createLookup(serviceUrl);
            } catch (PulsarClientException e) {
                log.warn("Failed to update url to lookup service {}, {}", url, e.getMessage());
                throw new IllegalStateException("Failed to update url " + url);
            }
        });
    }

    public LookupService createLookup(String url) throws PulsarClientException {
      	//这里可以看到如果咱们在配置客户端的地址是http开头就会通过http方式进行Loopup,否则走二进制协议进行查询
        if (url.startsWith("http")) {
            return new HttpLookupService(conf, eventLoopGroup);
        } else {
            return new BinaryProtoLookupService(this, url, conf.getListenerName(), conf.isUseTls(),
                    externalExecutorProvider.getExecutor());
        }
    }

		
    public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
            throws PulsarClientException {
      	//进到可能会误会Pulsar是通过HttpClient工具包进行的HTTP通信,继续看HttpClient构造函数
        this.httpClient = new HttpClient(conf, eventLoopGroup);
        this.useTls = conf.isUseTls();
        this.listenerName = conf.getListenerName();
    }

    protected HttpClient(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
			....
      //可以看到实际上最终是调用的AsyncHttpClient进行HTTP通信,这是一个封装Netty的async-http-client-2.12.1.jar的外部包
      httpClient = new DefaultAsyncHttpClient(config);
			....
    }

通过上面可以看到Lookup服务已经完成初始化,接下来就来看看客户端如何发起Lookup请求,回到PulsarClientImpl的getConnection方法,可以看到这里是链式调用,上面是从getLookup看到了其实是对Lookup进行初始化的过程,那么接下来就跟踪getBroker方法看看是怎么获取的服务端信息

    public CompletableFuture<ClientCnx> getConnection(final String topic, final String url) {
        TopicName topicName = TopicName.get(topic);
        return getLookup(url).getBroker(topicName)
                .thenCompose(lookupResult -> getConnection(lookupResult.getLogicalAddress(),
                        lookupResult.getPhysicalAddress(), cnxPool.genRandomKeyToSelectCon()));
    }

   public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {
        //判断访问哪个版本的接口
        String basePath = topicName.isV2() ? BasePathV2 : BasePathV1;
        String path = basePath + topicName.getLookupName();
        path = StringUtils.isBlank(listenerName) ? path : path + "?listenerName=" + Codec.encode(listenerName);
        //获取要访问的Broker地址
        return httpClient.get(path, LookupData.class)
                .thenCompose(lookupData -> {
            URI uri = null;
            try {
              	//解析服务端返回的数据,本质上就是返回的就是Topic所在Broker的节点IP+端口
                InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
                //HTTP通过Lookup方式访问服务端绝对不会走代理
                return CompletableFuture.completedFuture(new LookupTopicResult(brokerAddress, brokerAddress,
                        false /* HTTP lookups never use the proxy */));
            } catch (Exception e) {
							....
            }
        });
    }

public class LookupTopicResult {
  	//LookupTopicResult是查询Topic归属Broker的结果后包装的一层结果,可以看到这里其实就是Socket信息也就是IP+端口
    private final InetSocketAddress logicalAddress;
    private final InetSocketAddress physicalAddress;
    private final boolean isUseProxy;
}

客户端的流程走到这里基本就结束了,是否有些意犹未尽迫不及待的想知道服务端又是怎么处理的?那么就看看下一节

服务端实现原理

服务端的入口在TopicLookup类的lookupTopicAsync方法,服务端大致步骤是这样的:1. 获取Topic所归属的Bundle 2. 查询Bundle所归属的Broker 3. 返回该Broker的url

    public void lookupTopicAsync(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("topic-domain") String topicDomain, @PathParam("tenant") String tenant,
            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
            @QueryParam("listenerName") String listenerName,
            @HeaderParam(LISTENERNAME_HEADER) String listenerNameHeader) {
        TopicName topicName = getTopicName(topicDomain, tenant, namespace, encodedTopic);
        if (StringUtils.isEmpty(listenerName) && StringUtils.isNotEmpty(listenerNameHeader)) {
            listenerName = listenerNameHeader;
        }
        //可以看得到这里是获取Lookup的,跟踪进去看看
        internalLookupTopicAsync(topicName, authoritative, listenerName)
                .thenAccept(lookupData -> asyncResponse.resume(lookupData))
                .exceptionally(ex -> {
                    ....
                });
    }


 			protected CompletableFuture<LookupData> internalLookupTopicAsync(final TopicName topicName, boolean authoritative, String listenerName) {
CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()
                            //获得目标Broker地址, 继续从这里进去
                            .getBrokerServiceUrlAsync(topicName,
                                    LookupOptions.builder()
                                            .advertisedListenerName(listenerName)
                                            .authoritative(authoritative)
                                            .loadTopicsInBundle(false)
                                            .build());
    }

    public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) {
        long startTime = System.nanoTime();

        // 获取这个Topic所归属的Bundle
        CompletableFuture<Optional<LookupResult>> future = getBundleAsync(topic)
                .thenCompose(bundle -> {
                    //根据获得的bundle信息查询归属的Broker
                    return findRedirectLookupResultAsync(bundle).thenCompose(optResult -> {
                      			//如果findRedirectLookupResultAsync方式没查到则走这里进行查询
                            return findBrokerServiceUrl(bundle, options); 
                    });
                });

        future.thenAccept(optResult -> {
            ....
        }).exceptionally(ex -> {
						....
        });

        return future;
    }

先看看是怎么获取Topic所归属的Bundle的吧,就从getBundleAsync方法跟踪进去

    public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) {
        return bundleFactory.getBundlesAsync(topic.getNamespaceObject())
          			//直接看findBundle,命名意思已经很清晰了
                .thenApply(bundles -> bundles.findBundle(topic));
    }

    public NamespaceBundle findBundle(TopicName topicName) {
        checkArgument(nsname.equals(topicName.getNamespaceObject()));
      	//同理,继续跟踪进去
        return factory.getTopicBundleAssignmentStrategy().findBundle(topicName, this);
    }

    public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) {
        //计算Topic名称的哈希值
        long hashCode = Hashing.crc32().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong();
        //根据哈希值来获取所归属的bundle,一致性哈希的设计。跟进去看看是怎么计算的
        NamespaceBundle bundle = namespaceBundles.getBundle(hashCode);
        if (topicName.getDomain().equals(TopicDomain.non_persistent)) {
            bundle.setHasNonPersistentTopic(true);
        }
        return bundle;
    }

    protected NamespaceBundle getBundle(long hash) {
        //通过数组的二分查找进行计算,数组的元素个数跟存储Bundle的bundles的集合大小是一样的,能获取对应的Bundle
      	//思路其实就是一致性哈希的查找方式,计算出哈希值处于哈希环所处的位置并查找其下一个节点的信息
        int idx = Arrays.binarySearch(partitions, hash);
        int lowerIdx = idx < 0 ? -(idx + 2) : idx;
        return bundles.get(lowerIdx);
    }

知道Bundle之后,下一步就是根据这个Bundle来查询其所归属的Broker节点,也就是上面的NamespaceService类的findRedirectLookupResultAsync方法,这里一路跟下去就是查询缓存中获取映射信息的地方了,感兴趣的伙伴可以继续跟下去

    private CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync(ServiceUnitId bundle) {
        if (isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {
            return CompletableFuture.completedFuture(Optional.empty());
        }
        return redirectManager.findRedirectLookupResultAsync();
    }

总结

以上就是Pulsar的Lookup机制的实现流程,在寻址的过程中,需要阅读的伙伴具备一致性哈希的知识,因为Pulsar的Topic归属就是引入了一致性哈希算法来实现的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1571945.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

机器学习实战18-机器学习中XGBClassifier分类器模型的应用实战,以及XGBClassifier分类器的调优策略

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下机器学习实战18-机器学习中XGBClassifier分类器模型的应用实战&#xff0c;以及XGBClassifier分类器的调优策略。XGBClassifier是基于eXtreme Gradient Boosting (XGBoost)算法的分类器模型&#xff0c;在机器学习领…

成为图像SoC大牛有多难?

IPC&#xff08;Internet Protocol Camera&#xff0c;即网络摄像机&#xff09;芯片架构师需要具备一系列跨学科的知识和技能。IPC芯片架构师的工作涉及到先进工艺、低功耗、SoC架构、处理器架构、图像处理、视频压缩、网络通信以及嵌入式系统设计等多个领域。以下是一些关键的…

Ubuntu22.04中基于Qt开发Android App

文章目录 前言在Ubuntu22.04中配置开发环境案例测试参考 前言 使用Qt开发手机应用程序是一种高效且灵活的选择。Qt作为一个跨平台的开发框架&#xff0c;为开发者提供了统一的开发体验和丰富的功能库。首先&#xff0c;Qt的跨平台性让开发者可以使用相同的代码库在不同的操作系…

收藏|深入浅出分析光刻机

光刻技术是指在光照作用下&#xff0c;借助光致抗蚀剂&#xff08;又名光刻胶&#xff09;将掩膜版上的图形转移到基片上的技术。 光刻机是半导体生产制造的主要生产设备之一&#xff0c;也是决定整个半导体生产工艺水平高低的核心技术机台。半导体技术发展都是以光刻机的光刻线…

【测试开发学习历程】python流程控制

前言&#xff1a;写到这里也许自己真的有些疲惫了&#xff0c;但是人生不就是像西西弗斯推石上山一样的枯燥乏味吗&#xff1f; 在python当中的流程控制主要包含以下两部分的内容&#xff1a; 条件判断 循环 1 条件判断 条件判断用if语句实现&#xff0c;if语句的几种格式…

微软detours代码借鉴点备注

comeasy 借鉴点1 Loadlibray的时间选择 注入库wrotei.dll&#xff0c;为了获取istream的接口&#xff0c;需要loadlibrary&#xff0c;但是在dllmain中是不建议这样做的。因此&#xff0c;动态库在dllmain的时候直接挂载了comeasy.exe的入口 //获取入口 TrueEntryPoint (i…

太阳能光伏储能系统:全周期一站式解决方案

随着全球能源结构的不断变革&#xff0c;清洁能源的重要性日益凸显。太阳能光伏储能系统作为一种高效、环保的能源解决方案&#xff0c;正逐渐成为推动能源转型的关键力量。本文将详细介绍太阳能光伏储能系统的全周期一站式解决方案&#xff0c;以期为读者提供全面、深入的了解…

动态多目标优化:动态约束多目标优化测试集DCP1-DCP9的TruePF(提供MATLAB代码)

一、进化动态约束多目标优化测试集DCP1-DCP9 参考文献&#xff1a; [1]G. Chen, Y. Guo, Y. Wang, J. Liang, D. Gong and S. Yang, “Evolutionary Dynamic Constrained Multiobjective Optimization: Test Suite and Algorithm,” in IEEE Transactions on Evolutionary Com…

聚能共创下一代智能终端操作系统 软通动力荣膺“OpenHarmony优秀贡献单位”

近日&#xff0c;由开放原子开源基金会指导&#xff0c;以“开源共享未来”为主题的OpenHarmony社区年会在北京成功举办。本次活动汇集OpenHarmony项目群共建单位及生态伙伴等多方力量&#xff0c;旨在对2023年度OpenHarmony年度开源事业全面总结的同时&#xff0c;吸引更多伙伴…

VSCode如何调试C#代码?

1、启动VSCode&#xff1b; 一、创建项目 1、创建一个文件夹(workspace)&#xff1a; 2、进入这个文件夹 cd tt1 3、创建解决方案 dotnet new sln -o MyApp 4、进入解决方案 cd .\MyApp\ 5、创建项目&#xff08;在此假定为一个命令行的项目&#xff09; dotnet new …

PCIe 7.0|不要太卷,劝你先躺平

PCIe 6.0都已经发布了2-3年了&#xff0c;目前业内生态还没完全建立。甚至很多人都还没用上PCIe 5.0呢&#xff01; 近日&#xff0c;PCIe 7.0 ver0.5版本已经开放&#xff0c;同时宣布马不停蹄准备在2025年完成正式SPEC规范发布。 回顾PCIe 7.0变更&#xff0c;PCI-SIG在2022年…

力扣1448---统计二叉树中好节点的数量(Java、DFS、中等题)

题目描述&#xff1a; 给你一棵根为 root 的二叉树&#xff0c;请你返回二叉树中好节点的数目。 「好节点」X 定义为&#xff1a;从根到该节点 X 所经过的节点中&#xff0c;没有任何节点的值大于 X 的值。 示例 1&#xff1a; 输入&#xff1a;root [3,1,4,3,null,1,5] 输出…

SSM项目实战——哈哈音乐(四)前台模块开发

1、项目准备 ①导入依赖和前端资源 <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.x…

Educational Codeforces Round 162 (Rated for Div. 2) ----- E. Count Paths --- 题解

E. Count Paths&#xff1a; 题目大意&#xff1a; 思路解析&#xff1a; 根据题目中定义的美丽路径&#xff0c;我们可以发现路径只有两种情况&#xff1a; 当前结点作为起始结点&#xff0c;那我们只需要知道它的子树下有多少个相同颜色的结点&#xff0c;并且相同颜色的结…

攻防世界:mfw[WriteUP]

根据题目提示考虑是git库泄露 这里在地址栏后加.git也可以验证是git库泄露 使用GitHack工具对git库进行恢复重建 在templates目录下存在flag.php文件&#xff0c;但里面并没有flag 有内容的只有主目录下的index.php index.php源码&#xff1a; <?phpif (isset($_GET[page…

2024年最新版FL Studio21.2.3 Build 4004 for Mac 版激活下载和图文激活教程

FL studio21中文别名水果编曲软件&#xff0c;是一款全能的音乐制作软件&#xff0c;包括编曲、录音、剪辑和混音等诸多功能&#xff0c;让你的电脑编程一个全能的录音室&#xff0c;它为您提供了一个集成的开发环境&#xff0c;使用起来非常简单有效&#xff0c;您的工作会变得…

重读Java设计模式: 桥接模式详解

引言 在软件开发中&#xff0c;经常会遇到需要在抽象与实现之间建立连接的情况。当系统需要支持多个维度的变化时&#xff0c;使用传统的继承方式往往会导致类爆炸和耦合度增加的问题。为了解决这一问题&#xff0c;我们可以使用桥接模式。桥接模式是一种结构型设计模式&#…

机器学习中的GBDT模型及其优缺点(包含Python代码样例)

目录 一、简介 二、优缺点介绍 三、Python代码示例 四、总结 一、简介 GBDT&#xff08;Gradient Boosting Decision Tree&#xff09;是一种集成学习算法&#xff0c;被广泛应用于机器学习中的回归和分类问题。它由多个决策树组成&#xff0c;每个决策树都通过迭代逐渐提升…

巧用 STM32CubeIDE 之编译警告

1. 前言 编译警告对于工程师们来说&#xff0c;是再常见不过的了。对于严谨的工程师们来说&#xff0c;任何warning 都是不可忽视的。 2. 巧妙使用 warning 在 STM32CubeIDE 中&#xff0c;我们可以通过主动 warning&#xff08;甚至 error&#xff09;的方式来通知工程师&a…

计算机网络-TCP基础、三次挥手、四次握手过程

TCP基础 定义&#xff1a;TCP是面向连接的、可靠的、基于字节流的传输层通信协议。这意味着在发送数据之前&#xff0c;TCP需要建立连接&#xff0c;并且它能确保数据的可靠传输。此外&#xff0c;TCP将数据视为无结构的连续字节流。面向连接&#xff1a;TCP只能一对一进行连接…