大数据-Storm流式框架(五)---DRPC

news2024/11/16 13:26:18

DRPC

概念

分布式RPC(DRPC)背后的想法是使用Storm在运行中并行计算真正强大的函数。 Storm拓扑接收函数参数流作为输入,并为每个函数调用发送结果的输出流。

DRPC并不是Storm的一个特征,因为它基于Storm的spouts,bolts和拓扑的高级抽象。DRPC本可以打包成Storm独立的库,但是跟storm绑定在一起很有用。

顶层视角

分布式RPC由“DRPC服务器”协调(Storm随附实现)。 DRPC服务器协调接收RPC请求,将请求发送到Storm拓扑,从Storm拓扑接收结果,并将结果发送回等待的客户端。 从客户端的角度来看,分布式RPC调用看起来就像常规的RPC调用。 例如,以下是客户端如何使用参数“http://twitter.com”计算“到达”函数的结果:

DRPCClient client = new DRPCClient("drpc-host", 3772);
String result = client.execute("reach", "http://twitter.com");

分布式RPC工作流程:

客户端向DRPC服务器发送要执行的函数名称以及该函数的参数。实现该功能的拓扑使用DRPCSpout从DRPC服务器接收函数调用流。 每个函数调用都由DRPC服务器标记唯一ID。 然后拓扑计算结果,在拓扑结束时,一个名为ReturnResults的bolt连接到DRPC服务器,并为其提供函数调用id的结果。 然后,DRPC服务器使用id来匹配客户端正在等待的结果,取消阻塞等待的客户端,并将结果发送给它。

LinearDRPCTopologyBuilder

Storm附带了一个名为LinearDRPCTopologyBuilder的拓扑构建器,它可以自动执行几乎所有涉及DRPC的步骤。 这些包括:

     1、设置spout

     2、将结果返回给DRPC服务器

     3、为bolt提供功能,以便在tuple(元组)组上进行有限聚合

我们来看一个简单的例子。 这是DRPC拓扑的实现,它返回带有“!”的输入参数。附:

public static class ExclaimBolt extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String input = tuple.getString(1);
        collector.emit(new Values(tuple.getValue(0), input + "!"));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "result"));
    }
}

public static void main(String[] args) throws Exception {
    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
    builder.addBolt(new ExclaimBolt(), 3);
    // ...
}

正如你所看到的,没有几行代码。 创建LinearDRPCTopologyBuilder时,可以告诉它拓扑的DRPC函数的名称。 单个DRPC服务器可以协调许多功能,函数名称可以区分各个函数。 声明的第一个bolt将2元组作为输入,其中第一个字段是请求ID,第二个字段是该请求的参数。 LinearDRPCTopologyBuilder期望最后一个bolt发出一个输出流,其中包含[id,result]形式的2元组。 最后,所有中间元组都必须包含请求ID作为第一个字段。

在这个例子中,ExclaimBolt只是附加一个“!” 到元组的第二个字段。 LinearDRPCTopologyBuilder处理连接到DRPC服务器并返回结果的其余协调。

本地模式的DRPC

DRPC可以在本地模式运行。下面的例子说明了如何运行本地模式的DRPC:

LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();

cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));

System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));

cluster.shutdown();
drpc.shutdown();

首先,创建一个LocalDRPC对象。 此对象模拟正在进行的DRPC服务器,就像LocalCluster在进程中模拟Storm集群一样。 然后创建LocalCluster以在本地模式下运行拓扑。 LinearDRPCTopologyBuilder具有用于创建本地拓扑和远程拓扑的单独方法。 在本地模式下,LocalDRPC对象不会绑定到任何端口,因此拓扑需要知道要与之通信的对象。 这就是createLocalTopology将LocalDRPC对象作为输入接收的原因。

启动拓扑后,您可以使用LocalDRPC上的execute方法执行DRPC调用。

远程模式的DRPC

在实际集群上使用DRPC也很简单。 有三个步骤:

     1、启动DRPC服务器

     2、配置DRPC服务器的位置

     3、将DRPC拓扑提交给Storm集群

启动DRPC服务器可以使用storm脚本完成,就像启动Nimbus或UI一样:

bin/storm drpc

接下来,您需要配置Storm群集以了解DRPC服务器的位置。 这就是DRPCSpout如何知道从何处读取函数调用。 这可以通过storm.yaml文件或拓扑配置来完成。 通过storm.yaml配置这个看起来像这样:

drpc.servers:
  - "drpc1.foo.com"
  - "drpc2.foo.com"

最后,像启动任何一个其他的拓扑一样,使用StormSubmitter启动DRPC拓扑。要在远程模式运行上述的示例,操作如下:

StormSubmitter
.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());

createRemoteTopology用于为storm集群创建合适的拓扑。

稍微复杂的示例

感叹号DRPC示例是用于说明DRPC概念的玩具示例。让我们看一个更复杂的例子,它真正需要Storm集群为计算DRPC函数提供的并行性。我们将看到的示例是在Twitter上计算URL的范围。

URL的范围是在Twitter上暴露给URL的唯一人数。要计算覆盖面,您需要:

    1、获取推文网址的所有人

    2、获得所有这些人的所有粉丝

    3、独特的追随者

    4、统计一组独特的粉丝

在计算过程中,单个到达计算可能涉及数千个数据库调用和数千万个跟随者记录。这是一个非常非常密集的计算。正如您将要看到的那样,在Storm之上实现此功能非常简单。在一台计算机上,达到计算可能需要几分钟;在Storm集群中,您可以在几秒钟内计算最难的URL的覆盖率。

此处的storm-starter中定义了样本范围拓扑。以下是定义范围拓扑的方法:

LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
builder.addBolt(new GetTweeters(), 3);
builder.addBolt(new GetFollowers(), 12)
        .shuffleGrouping();
builder.addBolt(new PartialUniquer(), 6)
        .fieldsGrouping(new Fields("id", "follower"));
builder.addBolt(new CountAggregator(), 2)
        .fieldsGrouping(new Fields("id"));

拓扑执行为四个步骤:

    1、GetTweeters获取推文URL的用户。它将[id,url]的输入流转换为[id,tweeter]的输出流。每个url元组将映射到许多tweeter元组。

    2、GetFollowers获得推特的追随者。它将[id,tweeter]的输入流转换为[id,follower]的输出流。在所有任务中,当有人跟随多个发布相同URL的人时,可能会有重复的跟随元组。

    3、PartialUniquer通过关注者ID对关注者流进行分组。这具有相同的跟随者执行相同任务的效果。因此,PartialUniquer的每项任务都将获得相互独立的追随者。一旦PartialUniquer收到针对请求ID的所有针对它的关注元组,它就会发出其关注者子集的唯一计数。

4、最后,CountAggregator接收来自每个PartialUniquer任务的部分计数,并将它们相加以完成到达计算。

PartialUniquer代码:

public class PartialUniquer extends BaseBatchBolt {
    BatchOutputCollector _collector;
    Object _id;
    Set<String> _followers = new HashSet<String>();

    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
        _collector = collector;
        _id = id;
    }

    @Override
    public void execute(Tuple tuple) {
        _followers.add(tuple.getString(1));
    }

    @Override
    public void finishBatch() {
        _collector.emit(new Values(_id, _followers.size()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "partial-count"));
    }
}

PartialUniquer通过扩展BaseBatchBolt实现IBatchBolt。批处理bolt提供了一个第一类API来处理一批元组作为具体单元。为每个请求ID创建一个新的批处理bolt实例,Storm会在适当的时候负责清理实例。

当PartialUniquer在execute方法中收到一个跟随元组时,它会将它添加到内部HashSet中的请求ID的集合中。

批处理bolt提供finishBatch方法,该方法在处理了针对此任务的此批处理的所有元组之后调用。在回调中,PartialUniquer会发出一个元组,其中包含其跟随者id子集的唯一计数。

在底层,CoordinatedBolt用于检测给定的bolt何时收到任何给定请求ID的所有元组。 CoordinatedBolt利用直接流来管理这种协调。

拓扑的其余部分应该是不言自明的。如您所见,到达计算的每一步都是并行完成的,定义DRPC拓扑非常简单。

非线性DRPC拓扑

LinearDRPCTopologyBuilder仅处理“线性”DRPC拓扑,其中计算表示为一系列步骤(如覆盖范围)。 不难想象函数需要更复杂的拓扑结构,包括bolt的分支和合并。 现在,要做到这一点,你需要直接使用CoordinatedBolt。 请务必在邮件列表中讨论非线性DRPC拓扑的用例,以便为DRPC拓扑构建更一般的抽象。

LinearDRPCTopologyBuilder工作流程:

DRPCSpout发射[args, return-info]。return-info是DRPC服务器的主机名和端口号,以及DRPC服务器生成的id。

创建一个拓扑包括:

  1. DRPCSpout
  2. PrepareRequest(生成请求ID,为返回信息创建一个流,为参数创建一个流)
  3. CoordinatedBolt
  4. JoinResult(使用return info合并结果)
  5. ReturnResult(连接DRPC服务器以及返回结果)

LinearDRPCTopologyBuilder是在storm原语之上构建高级别抽象的一个很好的例子。

进阶

KeyedFairBolt用于编织多个同时请求的处理

如何直接使用CoordinatedBolt

DRPC (Distributed RPC)  remote procedure call

分布式远程过程调用

DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。

DRPC Server 负责接收 RPC 请求,并将该请求发送到 Storm中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。

(其实,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。)

DRPC设计目的:

为了充分利用Storm的计算能力实现高密度的并行实时计算。

(Storm接收若干个数据流输入,数据在Topology当中运行完成,然后通过DRPC将结果进行输出。)

客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个DRPCSpout 从 DRPC 服务器中接收一个函数调用流。DRPC 服务器会为每个函数调用都标记了一个唯一的 id。随后拓扑会执行函数来计算结果,并在拓扑的最后使用一个名为 ReturnResults 的 bolt 连接到 DRPC 服务器,根据函数调用的 id 来将函数调用的结果返回。

定义DRPC拓扑

方法1:

通过LinearDRPCTopologyBuilder (该方法也过期,不建议使用)

该方法会自动为我们设定Spout、将结果返回给DRPC Server等,我们只需要将Topology实现

方法2:

直接通过普通的拓扑构造方法TopologyBuilder来创建DRPC拓扑

需要手动设定好开始的DRPCSpout以及结束的ReturnResults

运行模式:

1、本地模式

2、远程模式(集群模式)

修改配置文件conf/storm.yaml

drpc.servers:

    - "node1“

启动DRPC Server

bin/storm drpc &

通过StormSubmitter.submitTopology提交拓扑

案例:

Twitter 中某个URL的受众人数统计(这篇twitter到底有多少人看到过)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1141114.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

推荐一个高效测试用例工具:XMind2TestCase..

一、背景 软件测试的核心是什么&#xff1f;毫无疑问是测试分析和测试用例设计&#xff0c;也是日常测试投入最多时间的工作内容之一。 然而&#xff0c;传统的测试用例设计过程有很多痛点&#xff1a; 1、使用Excel表格进行测试用例设计&#xff0c;虽然成本低&#xff0c;但…

FL Studio音乐编曲软件好不好用?要不要购买

音乐编曲软件的出现使得音乐创作者能够克服时间和空间的限制&#xff0c;随时随地进行创作。随着信息时代的发展&#xff0c;使用编曲软件已成为音乐创作领域的主流。那么编曲软件哪个好用呢&#xff1f;我推荐这三款。 在业内&#xff0c;常用的音乐编曲软件包括Cubase、Logi…

增强常见问题解答搜索引擎:在 Elasticsearch 中利用 KNN 的力量

在快速准确的信息检索至关重要的时代&#xff0c;开发强大的搜索引擎至关重要。 随着大型语言模型和信息检索架构&#xff08;如 RAG&#xff09;的出现&#xff0c;在现代软件系统中利用文本表示&#xff08;向量/嵌入&#xff09;和向量数据库已变得越来越流行。 在本文中&am…

scratch接钻石 2023年9月中国电子学会图形化编程 少儿编程 scratch编程等级考试三级真题和答案解析

目录 scratch接钻石 一、题目要求 1、准备工作 2、功能实现 二、案例分析

postgis ST_ClipByBox2D用法

官方文档 概述 geometry ST_ClipByBox2D(geometry geom, box2d box); 描述 以快速且宽松但可能无效的方式通过 2D 框剪切几何体。 拓扑上无效的输入几何图形不会导致抛出异常。 不保证输出几何图形有效&#xff08;特别是&#xff0c;可能会引入多边形的自相交&#xff09;…

FL Studio21.2中文版多少钱?值得下载吗

水果&#xff0c;全称Fruity Loop Studio&#xff0c;简称FL Studio。是一款全能的音乐制作软件&#xff0c;经过二十多年的演化更迭&#xff0c;其各项功能非常的先进。其开创性的Pat\song模式&#xff0c;也为初学者的学习提供了便利。那么水果音乐制作软件需要多少钱呢&…

鸡尾酒学习——沧海桑田

1、材料&#xff1a;冰块&#xff08;或者雪莲&#xff09;、蓝橙力娇酒、伏特加、橙汁、柠檬、雪碧/气泡水&#xff1b; 2、口感&#xff1a;酸甜口味&#xff0c;下层感觉是再喝橙汁&#xff0c;上层在喝有些度数的雪碧&#xff0c;可能是昨天的长岛冰茶过于惊艳&#xff0c;…

机器学习(四十九):高斯混合模型

补充一个聚类算法:高斯混合模型 假设有一组需要根据它们的相似性分组到几个部分或簇中的数据点。在机器学习中,这被称为聚类。有几种可用的聚类方法: K均值聚类分层聚类高斯混合模型在这篇文章中,我们将讨论高斯混合模型。 文章目录 正态或高斯分布期望最大化(EM)算法期…

微信 macOS 版迎来 3.8.4.20 更新,新功能一览

微信 macOS 版迎来 3.8.4.20 更新&#xff0c;增加了多个新功能&#xff0c;包括可将某个聊天在独立窗口中显示、可在聊天中搜索表情等。 附更新信息如下&#xff1a; 可将某个聊天在独立窗口中显示&#xff1b; ・可在聊天中搜索表情&#xff1b; ・新增 「看一看」&#…

Umijs创建一个antd+Ts项目环境

上文搭建Umijs环境并创建一个项目 介绍基本操作中 我们构建了一个Umijs环境的环境 但也只创建了一个页面 真正开发来讲 也不可能只创建几个界面这么简单 这里面的创建 还是非常完整的 这里 我创建一个文件夹 主要是做我们的项目目录 然后 我们在终端输入命令 然后 打开目录终…

VScode 自定义主题各参数解析

参考链接&#xff1a; vscode自定义颜色时各个参数的作用(史上最全)vscode编辑器&#xff0c;自己喜欢的颜色 由于 VScode 搜索高亮是在是太不起眼了&#xff0c;根本看不到此时选中到哪个搜索匹配了&#xff0c;所以对此进行了配置&#xff0c;具体想增加更多可配置项可参考…

第三篇:实践篇 《使用Assembler 实现图片任意切割功能》

实现原理&#xff1a; 共用一个texture、material、渲染状态等。紧通过修改vertex、uvs、indexes数据即可实现任意切割功能。 一、线段分割多边形&#xff0c;并分散多边形 线段分割多边形 已知多边形points&#xff0c;线段sp、ep。线段分割多边形得到两个多边形。 publi…

双十一期间VBA钜惠

大家好&#xff0c;本年度双11即将到来&#xff0c;为了答谢大家多年来的支持及更广泛的推广VBA的应用&#xff0c;“VBA语言専功”在此期间推出巨大优惠&#xff1a;此期间打包购买VBA技术资料实行半价优惠。 1 &#xff1a;面向对象&#xff1a;学员及非学员 2 &#xff1a…

AR眼镜安卓主板,智能眼镜光机方案定制

AR智能眼镜是一项涉及广泛技术的创新产品&#xff0c;它需要考虑到光学、显示、功耗、散热、延迟、重量以及佩戴人体工学等多个方面的因素&#xff0c;每一个项目都是技术进步所需攻克的难题。 在本文中&#xff0c;我们将重点讨论AR眼镜的主板和光学方案。 首先是AR智能眼镜的…

OpenCV学习(三)——响应鼠标事件(获取点击点坐标和颜色,利用鼠标进行绘图)

响应鼠标事件 3. 响应鼠标事件3.1 获取鼠标点击的坐标3.2 获取鼠标点击像素点的颜色3.3 在鼠标点击的位置生成圆3.4 通过拖动鼠标来绘制填充矩形3.5 通过拖动鼠标绘制未填充矩形3.6 使用鼠标选点绘制多边形3.7 按住鼠标左键进行绘图 3. 响应鼠标事件 使用OpenCV读取图像&#…

2023年第四届MathorCup大数据挑战赛(B题)|电商零售商家需求预测及库存优化问题|数学建模完整代码+建模过程全解全析

当大家面临着复杂的数学建模问题时&#xff0c;你是否曾经感到茫然无措&#xff1f;作为2021年美国大学生数学建模比赛的O奖得主&#xff0c;我为大家提供了一套优秀的解题思路&#xff0c;让你轻松应对各种难题。 希望这些想法对大家的做题有一定的启发和借鉴意义。 让我们来…

python接口自动化测试

写在前面的话&#xff1a; 这个是我实际工作中写的项目&#xff0c;主要用来备注和后期查看~~大家可以参考学习&#xff0c;但是请不要用于其他不好的途径~~ 准备工作&#xff1a; 先下载HTMLTestRunner.py 下载地址&#xff1a;HTMLTestRunner - tungwaiyips software 参考&a…

框架安全-CVE 复现SpringStrutsLaravelThinkPHP漏洞复现

目录 服务攻防-框架安全&CVE 复现&Spring&Struts&Laravel&ThinkPHP概述PHP-开发框架安全-Thinkphp&Laravel漏洞复现Thinkphp-3.X RCEThinkphp-5.X RCELaravel框架安全问题- CVE-2021-3129 RCE JAVAWEB-开发框架安全-Spring&Struts2Struts2框架安全…

【MATLAB源码-第59期】基于matlab的QPSK,16QAM164QAM等调制方式误码率对比,调制解调函数均是手动实现未调用内置函数。

操作环境&#xff1a; MATLAB 2022a 1、算法描述 正交幅度调制&#xff08;QAM&#xff0c;Quadrature Amplitude Modulation&#xff09;是一种在两个正交载波上进行幅度调制的调制方式。这两个载波通常是相位差为90度&#xff08;π/2&#xff09;的正弦波&#xff0c;因此…