一、分区规则(DataStream Broadcast)和广播变量(Flink Broadcast)
1.1 DataStream Broadcast(分区规则)
分区规则是把元素广播给所有的分区,数据会被重复处理。
DataStream.broadcast()
1.2 Flink Broadcast(广播变量)
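类似于 Spark 的广播变量:被广播的数据是 DataSet,接收广播的也是 DataSet。被广播的 DataSet 通过 withBroadcastSet(dataSet, name) 注册,在 RichFunction 的 open() 中通过 getRuntimeContext.getBroadcastVariable(name) 读取。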
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
/**
 * Example of a Flink DataSet broadcast variable.
 *
 * A small data set of single-entry `name -> age` maps is registered under the
 * broadcast name "bd" and read inside a [[RichMapFunction]] to look up the age
 * of every name in a second data set.
 */
object BroadCastTest1 {

  /**
   * Builds the batch job and prints one "name, age" line per input name.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Batch execution environment.
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Data set to broadcast: every (name, age) tuple becomes a single-entry map.
    val broadcastData = List(("zs", 18), ("ls", 28), ("ww", 38))
    val tupleData = env.fromCollection(broadcastData)
    val toBroadcastData = tupleData.map(tup => Map(tup._1 -> tup._2))

    // Data set that receives the broadcast.
    val text: DataSet[String] = env.fromElements("zs", "ls", "ww")

    val result = text.map(new RichMapFunction[String, String] {
      // Merged view of all broadcast maps; rebuilt in open().
      private var allMap = Map.empty[String, Int]

      /** Reads the broadcast variable "bd" and merges its maps into one lookup table. */
      override def open(parameters: Configuration): Unit = {
        import scala.collection.JavaConverters._
        val broadcastMaps = getRuntimeContext.getBroadcastVariable[Map[String, Int]]("bd")
        // Later entries win on duplicate keys, same as the original sequential `++`.
        allMap = broadcastMaps.asScala.foldLeft(Map.empty[String, Int])(_ ++ _)
      }

      /** Returns "name, age"; names missing from the broadcast data fall back to 1. */
      override def map(value: String): String = {
        val age = allMap.getOrElse(value, 1)
        s"$value, $age"
      }
    }).withBroadcastSet(toBroadcastData, "bd")

    // In the DataSet API print() is itself an eager sink that executes the plan.
    // A subsequent env.execute() would fail with "No new data sinks have been
    // defined since the last execution", so it is intentionally not called.
    result.print()
  }
}