一. 前言
在Presto中,Join的类型主要分成Partitioned Join和Broadcast Join,在Presto 之Hash Join的Partition_王飞活的博客-CSDN博客 中已经介绍了Presto的Partitioned Join的实现过程,本文主要介绍Broadcast Join的实现。
二. Presto中Broadcast Join的实现
Broadcast的实现的过程如下:
-
Presto首先在上游Stage扫描到的小表数据全部广播到各个Presto中的所有Worker。
-
然后Presto会将大表分拆成多个Split分发到所有的Worker并行执行。
-
各个Worker扫描到大表的数据后,仅需要将自己扫描到的大表数据与小表进行Join碰撞,产生各个Worker独立的Join结果。
-
各个Worker独立Join结果的汇总则为整个Join的结果,数据既不会重复,也不会丢失。
三. Presto中小表广播的实现
与Partitioned Join相比,Broadcast的实现区别主要是在小表广播部分,后边的数据碰撞过程是一样的。在Presto中,所谓的广播,其实是所有worker中通过Exchange到上游的Stage数据,但是上游的Stage给各个Worker中都返回相同的数据,从而实现数据的广播而已。
但是我们知道,上游Stage其实给每个Worker分配的Buffer地址是不一样的,比如worker1的buffer为buffers[0], worker1对应的buffer地址为buffers[1]......依次类推,在Presto中是怎么实现将所有worker的buffer数据完全的拷贝复制的呢?
其实这主要依赖于在上游的Stage中使用了BroadcastOutputBuffer来实现的。在BroadcastOutputBuffer中,所有worker对应的buffer组合成一个buffer的List,BroadcastOutputBuffer中有数据进来时,在List中所有的buffer都add一遍。其主要的核心代码如下所示:
1. 如果对应的worker的buffer还没初始化,那么add进来的数据先在initialPagesForNewBuffers中保存。
2. 等下游的worker第一次过来拉取数据的时候,先初始化对应worker的buffer,并将initialPagesForNewBuffers中的数据放到buffer中去并返回给下游的worker。
3. 下游的worker的buffer初始化完成后,在BroadcastOutputBuffer中,如果后续再有数据进来, BroadcastOutputBuffer会在各个worker的buffer中都add一份,实现数据的复制,主要代码如下所示:
4. 上述2和3使得下游不同的worker到上游的Stage拉取数据的时候,都是一样的数据,且都是完整的数据,因此实现了数据广播的功能。