深入理解Storm 之 TridentStrom

news2024/9/30 9:23:12
从Demo讲起:
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
               new Values("the cow jumped over the moon"),
               new Values("the man went to the store and bought some candy"),
               new Values("four score and seven years ago"),
               new Values("how many apples can you eat"));
spout.setCycle(true);
 
TridentTopology topology = new TridentTopology();        
TridentState wordCounts =
     topology.newStream("spout1", spout)
       .each(new Fields("sentence"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                
       .parallelismHint(6);

Trident keeps track of a small amount of state for each input source (metadata about what it has consumed) in Zookeeper, and the "spout1" string here specifies the node in Zookeeper where Trident should keep that metadata.  (Trident 在zookeeper中为每一个输入资源(metaData)建立一系列状态监控和追踪.   "spout1" 指定了在zookeer中,Trident应该存储metaData的位置.)

the spout emits a stream containing one field called "sentence". The next line of the topology definition applies the Split function to each tuple in the stream, taking the "sentence" field and splitting it into words. Each sentence tuple creates potentially many word tuples – for instance, the sentence "the cow jumped over the moon" creates six "word" tuples.  (就是说每一个处理输入就是一个tuple,输出就是一个单独的tuple.)

One of the cool things about Trident is that it has fully fault-tolerant, exactly-once processing semantics. This makes it easy to reason about your realtime processing. Trident persists state in a way so that if failures occur and retries are necessary, it won't perform multiple updates to the database for the same source data.

Trident实现了容错和一次执行的原子性操作, 通过事务管理数据的整个处理过程。 并且失败后,可以重新尝试, 同一组数据不会多次更新。

DRPC

 a low latency distributed query on the word counts

低延迟的分布式查询

DRPCClient client = new DRPCClient("drpc.server.location", 3772);
System.out.println(client.execute("words", "cat dog the man");
"words" 表示流名称.
topology.newDRPCStream("words")
       .each(new Fields("args"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
       .each(new Fields("count"), new FilterNull())
       .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
wordCounts: 另外一个数据流拓扑对象.

Each DRPC request is treated as its own little batch processing job that takes as input a single tuple representing the request. The tuple contains one field called "args" that contains the argument provided by the client.

一个单独的tuple 代表一个 ERPC request.  tuple 包含一个“args”字段,里面包含客户端请求参数. 比如:"cat dog the man"

Trident is intelligent about how it executes a topology to maximize performance. 

1.Operations that read from or write to state (like persistentAggregate and stateQuery) automatically batch operations to that state.  从state中读或者向state中写,自动转换为批量处理. 也可以可以设置cache,最大程度限制读写次数,提高性能和提供方便.

2.Trident aggregators are heavily optimized.   聚合最优化.

Rather than transfer all tuples for a group to the same machine and then run the aggregator, Trident will do partial aggregations when possible before sending tuples over the network. For example, the Count aggregator computes the count on each partition, sends the partial count over the network, and then sums together all the partial counts to get the total count. This technique is similar to the use of combiners in MapReduce.

aggregators  并不是转换所有的tuple给一个相同的机器,然后再去做聚合. Trident 会对tuple做部分聚合,然后把部分聚合发送到其他集群节点,一起汇总一个总的聚合.

另外一个例子:

TridentState urlToTweeters =
       topology.newStaticState(getUrlToTweetersState()); //曾经转发url的tweeter用户.
TridentState tweetersToFollowers =
       topology.newStaticState(getTweeterToFollowersState()); //tweeter用户的粉丝数量.

topology.newDRPCStream("reach")
       .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters")) // args,DRPCClient的请求参数,url. 根据url获取到转发该url的tweeter用户集合. 一个tuple.
       .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")) // 展开tweeters集合用户, tweeter, 很多个tuple,取决于用户集合数量.
       .shuffle()
       .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")) // 根据个体用户tweeter,查询他或她的粉丝群,followers集合,一个tuple.
       .parallelismHint(200) // 为粉丝群开启200个并行任务.
       .each(new Fields("followers"), new ExpandList(), new Fields("follower")) //展开粉丝群, followers -> follower. 多个tuple.
       .groupBy(new Fields("follower")) //对粉丝群进行分组.
       .aggregate(new One(), new Fields("one")) //合并相同的粉丝为一个.
       .parallelismHint(20) 
       .aggregate(new Count(), new Fields("reach")); // 计算人数.
 

Fields and tuples

The Trident data model is the TridentTuple which is a named list of values. 

过滤操操作:

Consider this example. Suppose you have a stream called "stream" that contains the fields "x", "y", and "z". To run a filter MyFilter that takes in "y" as input, you would say:

stream.each(new Fields("y"), new MyFilter())

Suppose the implementation of MyFilter is this:

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
       return tuple.getInteger(0) < 10;
   }
}

This will keep all tuples whose "y" field is less than 10.  会过滤掉所有y<10的tuple.

each 操作:

假设stream里面有x,y,z字段.

stream.each(new Fields("x", "y"), new AddAndMultiply(), new Fields("added", "multiplied")); // 运算完之后的字段是: x, y, z, added, multiplied

public class AddAndMultiply extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) { // 这个tuple里面仅仅有x,y两个字段.没有z字段.
       int i1 = tuple.getInteger(0);
       int i2 = tuple.getInteger(1);
       collector.emit(new Values(i1 + i2, i1 * i2));
   }
}

聚合操作:

With aggregators, on the other hand, the function fields replace the input tuples. So if you had a stream containing the fields "val1" and "val2", and you did this:

stream.aggregate(new Fields("val2"), new Sum(), new Fields("sum"))

The output stream would only contain a single tuple with a single field called "sum", representing the sum of all "val2" fields in that batch.

流输出结果仅包含一个tuple,tuple里面仅有一个字段sum. 代表在此轮批量处理中val2的总和.

分组聚合操作: 

With grouped streams, the output will contain the grouping fields followed by the fields emitted by the aggregator. For example:

stream.groupBy(new Fields("val1")) // 输出字段里包含正在分组的字段val1, 跟随着聚合字段sum.
     .aggregate(new Fields("val2"), new Sum(), new Fields("sum")) // sum 字段替换val2字段.
 

In this example, the output will contain the fields "val1" and "sum".

State

如何保证每个消息仅仅执行一次:

Trident solves this problem by doing two things:

  1. Each batch is given a unique id called the "transaction id". If a batch is retried it will have the exact same transaction id.   给每个批次赋予交易id, 失败重试的时候,交易id是一样的.
  2. State updates are ordered among batches. That is, the state updates for batch 3 won't be applied until the state updates for batch 2 have succeeded.  //交易id按序列运行, 如果2没有成功,3也不会执行.

    存储交易id和计数value到数据库里面,

Then, when updating the count, you can just compare the transaction id in the database with the transaction id for the current batch. If they're the same, you skip the update – because of the strong ordering, you know for sure that the value in the database incorporates the current batch. If they're different, you increment the count.

交易id也不是必须得存储起来,Trident有一套自己的"至少一次消息处理"机制,保证失败的情况.

Execution of Trident topologies  ,  Trident 拓扑的执行流程: 

tuple 只有在shuffle或group的时候,才会进行网络同步.

Tuples are only sent over the network when a repartitioning of the data is required, such as if you do a groupBy or a shuffle.

Trident topologies compile down into as efficient of a Storm topology as possible. Tuples are only sent over the network when a repartitioning of the data is required, such as if you do a groupBy or a shuffle.

Trident拓扑会最优化的编译成最高效的Storm拓扑.

So if you had this Trident topology:

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/381011.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

新库上线 | CnOpenData中国债券市场债券信息数据

中国债券市场债券信息数据 一、数据简介 债券是政府、企业、银行等债务人为筹集资金&#xff0c;按照法定程序发行并向债权人承诺于指定日期还本付息的有价证券。债券购买者或投资者与发行者之间是一种债权债务关系&#xff0c;债券发行人即债务人&#xff0c;投资者&#xff…

关于 python 的异常使用说明 (python 的文件和异常)

文章目录异常1. 处理异常 ZeroDivisionError 异常2. 使用 try-except 代码块3. 使用异常避免崩溃4. else 代码块5. 处理 FileNotFoundError 异常6. 分析文本7. 失败时一声不吭异常 pyhong 使用被异常成为异常的特殊对象来管理程序执行期间发生的错误。 每当发生让 python 不知所…

【计算机网络:自顶向下方法】Chapter5 网络层:控制平面

本系列文章为笔者在学习b站中科大郑烇老师的计算机网络课程时&#xff08;郑老师讲得很清晰&#xff01;&#xff01;&#xff09;&#xff0c;结合课程PPT与《计算机网络&#xff1a;自顶向下方法》&#xff08;第七版&#xff09;所作的学习笔记&#xff0c;部分图片源自课程…

gitee 奇安信代码卫士使用

注册 gitee 账号后&#xff0c;push 一个项目&#xff0c;或者 fork 一个别人的项目&#xff0c;这里 fork 了一个 java-sec-code 靶场&#xff0c;使用的是个人版&#xff0c;像是低配版的 fortify 在项目的 服务 项下&#xff0c;选择奇安信代码卫士 创建分析 新建分析&…

【Java|golang】2373. 矩阵中的局部最大值

给你一个大小为 n x n 的整数矩阵 grid 。 生成一个大小为 (n - 2) x (n - 2) 的整数矩阵 maxLocal &#xff0c;并满足&#xff1a; maxLocal[i][j] 等于 grid 中以 i 1 行和 j 1 列为中心的 3 x 3 矩阵中的 最大值 。 换句话说&#xff0c;我们希望找出 grid 中每个 3 x …

操作系统笔记、面试八股(一)—— 进程、线程、协程

文章目录1. 进程、线程、协程1.1 进程1.1.1 进程间的通信方式1.1.2 进程同步方式1.1.3 进程的调度算法1.1.4 优先级反转1.1.5 进程状态1.1.6 PCB进程控制块1.1.7 进程的创建和撤销过程1.1.8 为什么要有进程1.2 线程1.2.1 为什么要有线程1.2.2 线程间的同步方式1.3 协程1.3.1 什…

创建Firebase项目并接入Firebase推送: Firebase Cloud Messaging (FCM)

1.FCM简介&#xff1a;Firebase Cloud Messaging (FCM) 是一种跨平台消息传递解决方案&#xff0c;可供您可靠地传递消息&#xff0c;而且还是免费的服务。支持 Android&#xff0c;IOS,Web,Flutter,Unity.消息类型可以使用 FCM 向客户端发送两种类型的消息&#xff1a;通知消息…

CEC2017:鱼鹰优化算法(Osprey optimization algorithm,OOA)求解cec2017(提供MATLAB代码)

一、鱼鹰优化算法简介 鱼鹰优化算法&#xff08;Osprey optimization algorithm&#xff0c;OOA&#xff09;由Mohammad Dehghani 和 Pavel Trojovsk于2023年提出&#xff0c;其模拟鱼鹰的捕食行为。 鱼鹰是鹰形目、鹗科、鹗属的仅有的一种中型猛禽。雌雄相似。体长51-64厘米…

Allegro如何设置铜皮避让的优先级操作指导

Allegro如何设置铜皮避让的优先级操作指导 在用Allegro进行PCB设计的时候,时常需要使用动态铜皮进行设计,当两块动态铜皮存在交集的时候,避让就会存在一个优先级,如下图 上方的铜皮避让调了下方的铜皮,上方的铜皮被避让了 如何调整让下方的铜皮避让上方的铜皮,如下图 具…

入门JAVA第十六天 数据库

一 、数据库技术学习内容与方法 1.1学习内容 1 Oracle 数据库 目前最好的关系型数据库 基本的CRUD命令。 SQL语句。select(R),update(U),detele(D),insert(C) 2 MySQL数据库 中小型醒目非常好用的关系型数据库。 灵活&#xff0c;小巧。 3 扩展软件开发流程中数据库设计原则 …

严格模式和非严格模式下的this指向问题

一、全局环境 1.函数调用 非严格模式&#xff1a;this指向是Window // 普通函数 function fn () { console.log(this, this); } fn() // 自执行函数 (function fn () { console.log(this, this); })() 严格模式&#xff1a;this指向是undefined //…

866363-70-4,N3-C5-NHS ester,叠氮-C5-NHS 主要物理性质分享

●外观以及性质&#xff1a;Azido-Aca-NHS淡黄色或无色油状&#xff0c;叠氮化物可以与炔烃、DBCO和BCN进行铜催化的点击化学反应。NHS酯可以与胺基反应&#xff0c;形成稳定的酰胺键。●中文名&#xff1a;叠氮-C5-NHS ester&#xff0c;6-叠氮己酸活性酯●英文名&#xff1a;…

「TCG 规范解读」PC 平台相关规范(2)

可信计算组织&#xff08;Ttrusted Computing Group,TCG&#xff09;是一个非盈利的工业标准组织&#xff0c;它的宗旨是加强在相异计算机平台上的计算环境的安全性。TCG于2003年春成立&#xff0c;并采纳了由可信计算平台联盟&#xff08;the Trusted Computing Platform Alli…

CentOS 7安装N卡驱动和CUDA和cuDNN

前言系统一开始是CentOS 7.6&#xff0c;安装依赖时yum给的内核文件的版本号和uname -r的结果不一样&#xff0c;这时不能直接装依赖&#xff0c;装上后后面装驱动时会报错找不到内核头文件(最开始我直接装依赖了&#xff0c;以为高版本兼容低版本&#xff0c;然后装驱动时报错…

番外11:使用ADS对射频功率放大器进行非线性测试3(使用带宽5MHz的WCDMA信号进行ACLR测试)

番外11&#xff1a;使用ADS对射频功率放大器进行非线性测试3&#xff08;使用带宽5MHz的WCDMA信号进行ACLR测试&#xff09; 其他测试&#xff1a; 番外9&#xff1a;使用ADS对射频功率放大器进行非线性测试1&#xff08;以IMD3测试为例&#xff09; 番外10&#xff1a;使用AD…

前端工程构建问题汇总

1.less less-loader安装失败问题 npm install less-loader --save --legacy-peer-deps 加上–legacy-peer-deps就可以了 在NPM v7中&#xff0c;现在默认安装peerDependencies&#xff0c;这会导致版本冲突&#xff0c;从而中断安装过程。 –legacy-peer-deps标志是在v7中引…

MyBatis Plus Invalid bound statement 终极解决方案

MyBatis Plus Invalid bound statement 终极解决方案一、项目1.1 编码部分1.1.1 实体类1.1.2 dao层1.1.3 mapper.xml1.2 环境配置1.3 问题描述二、解决方案2.1 手动指定mapper.xml资源路径匹配规则2.2 使用mybatis自动配置2.3 测试效果三、附件一、项目 1.1 编码部分 1.1.1 实…

imx6ull_SPI

SPI简介 SPI 是Motorola 公司推出的一种同步串行接口技术&#xff0c;是一种高速、全双工的同步通信总线。SPI 以主从方式工作&#xff0c;通常是有一个主设备和一个或多个从设备&#xff0c;一般SPI 需要4 根线&#xff0c;但是也可以使用三根线(单向传输) 这四根线如下&…

torch函数合集

torch.tensor() 原型&#xff1a;torch.tensor(data, dtypeNone, deviceNone, requires_gradFalse) 功能&#xff1a;其中data可以是:list,tuple,NumPy,ndarray等其他类型,torch.tensor会从data中的数据部分做拷贝(而不是直接引用),根据原始数据类型生成相应类型的torch.Tenso…

RestTemplate Java调用restful接口

目录1. GET请求相关方法&#xff08;1&#xff09;getForObject&#xff08;2&#xff09;getForEntity2. POST请求相关设置请求头和请求体&#xff1a;HttpEntity&#xff08;1&#xff09;postForLocation&#xff08;2&#xff09;postForObject&#xff08;3&#xff09;po…