broadcast使用如下图,可以看到创建broadcast是val barr1 = sc.broadcast(arr1),使用broadcast是barr1.value
创建broadcast是使用的broadcastManager。
BroadcastManager
cachedValues变量是map结构,key是broadcastId,强引用,GC的时候不会回收,value是broadcast值,弱引用,GC的时候会进行回收。
创建新的broadcast值是生成新的broadcastId,再调用broadcastFactory的方法。
TorrentBroadcastFactory
创建新的broadcast值,就是生成了TorrentBroadcast对象。
TorrentBroadcast
_value是一个缓存,但是是软引用,会被GC回收。
numBlocks是broadcast写入block对应的数量,调用了writeBlocks将obj写入block。
writeBlocks
这个方法是在driver端运行的。首先将value缓存到driver的本地(MEMORY_AND_DISK),在driver运行的task可以直接读取这个值。再将value序列化、压缩后分块(默认4M),将分好的块也缓存下来(MEMORY_AND_DISK_SER),但是tellMaster=true,意思是将缓存结果通知给blockManagerMaster。
value
在task中获取broadcast值,使用value方法。
value方法是调用的getValue方法
getValue
优先从TorrentBroadcast对象的缓存_value中获取。如果取不到就调用readBroadcastBlock方法获取,得到的结果缓存到_value中。
readBroadcastBlock
先根据broadcastId从broadcastManager的cache中获取,如果获取不到就先读取本地block来获取值,返回的值缓存到broadcastManager的cache。
本地没获取到,就调用readBlocks从远端获取(driver或者其它executor)。获取到的结果缓存到本地block,同时缓存到broadcastManager的cache中
readBlocks
broadcast是分块进行存储的,所以也是分块进行获取。
根据pieceId优先从本地进行读取,如果没有值的话,就调用getRemoteBytes从远端进行读取,得到的piece值缓存到本地,并通知给blockManagerMaster,方便别的executor来获取。最后返回放有所有piece结果的blocks。
broadcast存储
分成两部分进行存储。整体存储是提供给运行在本地的task来直接获取,分piece存储是为了提供给远端task获取。
如果只有整体存储没有分块存储的话,一开始broadcast值只在driver端存储。task启动后,对应的executor都会从driver读取,这样就会有driver的io瓶颈。加上分块存储后,一开始broadcast值只在driver端存储,executor向driver请求piece(请求piece的顺序是打乱的),请求数据量不大,而且executor获取到piece后也会缓存到本地,提供给别的executor来请求,这样就将driver的压力分散到各个executor。
如果只有分块存储没有整体存储的话,本地读取也要按照piece读取再合并,效率比较低,因为一个executor上会运行很多task任务。其实我觉得去掉整体存储影响不大,因为广播变量值在内存还有两层缓存。