目录
一、基本转换算子
1.map
2.filter
3.flatMap
4.聚合算子Aggregation
(1)keyBy
(2)简单聚合:sum、min、minBy、max、maxBy
(3)归约聚合:reduce
二、UDF
三、富函数类
四、物理分区
1.随机分区(shuffle)
2. 轮询分区(Round-Robin)
3. 重缩放分区(rescale)
4.广播(broadcast)——不常用
5.全局分区(global)——不常用
6. 自定义分区(Custom)
一、基本转换算子
1.map
package com.atguigu.transform
import com.atguigu.chapter05.Event
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._
object MapTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream: DataStream[Event] = env.fromElements(
Event("Mary", "./home", 1000L),
Event("zhangsan", "./cart", 2000L))
// TODO 提取每次点击事件的用户名
// 1.使用匿名函数
stream.map(_.user).print("1:")
// 2.也可以实现MapFunction接口,需要定义一个类
stream.map(new UserExtractor).print("2:")
env.execute()
}
class UserExtractor extends MapFunction[Event,String] {
override def map(t: Event): String = {
t.user
}
}
}
// 结果:
1:> Mary
2:> Mary
1:> zhangsan
2:> zhangsan
2.filter
import com.atguigu.chapter05.Event
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala._
object FilterTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream: DataStream[Event] = env.fromElements(
Event("Mary", "./home", 1000L),
Event("zhangsan", "./cart", 2000L))
// TODO 过滤出指定用户的点击事件(匿名函数筛选 Mary,自定义类筛选 zhangsan)
// 1.使用匿名函数
stream.filter(_.user == "Mary").print("1:")
// 2.也可以实现FilterFunction接口,需要定义一个类
stream.filter(new UserFilter).print("2:")
env.execute()
}
class UserFilter extends FilterFunction[Event] {
override def filter(t: Event): Boolean = t.user == "zhangsan"
}
}
// 结果:
1:> Event(Mary,./home,1000)
2:> Event(zhangsan,./cart,2000)
3.flatMap
import com.atguigu.chapter05.Event
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object FlatMapTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream: DataStream[Event] = env.fromElements(
Event("Mary", "./home", 1000L),
Event("zhangsan", "./cart", 2000L),
Event("lisi", "./cart", 3000L)
)
// TODO 找出用户Mary的点击记录
val value: DataStream[Event] = stream.flatMap(new FlatMapFunction[Event, Event] {
override def flatMap(t: Event, collector: Collector[Event]): Unit = {
if (t.user.equals("Mary")) {
collector.collect(t)
} else if (t.user.equals("zhangsan")) {
collector.collect(t)
}
}
})
value.print("1:")
// 结果:
/*1:> Event(Mary,./home,1000)
1:> Event(zhangsan,./cart,2000)*/
// TODO 采用自定义类实现
stream.flatMap(new MyFlatMap).print("2:")
// 结果:
/*2:> Mary
2:> zhangsan
2:> ./cart*/
env.execute()
}
// 自定义实现FlatMapFunction
class MyFlatMap extends FlatMapFunction[Event, String] {
override def flatMap(t: Event, collector: Collector[String]): Unit = {
// TODO 如果当前数据是Mary的点击事件,那么就直接输出User
if (t.user == "Mary") {
collector.collect(t.user)
} else if (t.user == "zhangsan") {
collector.collect(t.user)
collector.collect(t.url)
}
}
}
}
4.聚合算子Aggregation
(1)keyBy
import com.atguigu.chapter05.Event
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object KeyByTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)
val stream: DataStream[Event] = env.fromElements(
Event("Mary", "./home", 1000L),
Event("wangwu", "./home", 1000L),
Event("zhaoliu", "./home", 1000L),
Event("xiaoming", "./home", 1000L),
Event("Mary", "./home", 1000L),
Event("zhangsan", "./cart", 2000L),
Event("zhangsan", "./home", 2000L),
Event("lisi", "./cart", 3000L)
)
// TODO 指定 Event 的 user 属性作为 key
// 方法一:
stream.keyBy(_.user).print()
// 方法二:
stream.keyBy(new MyKeySelector).print()
// 结果:相同的key会被分到同一个分区
/*3> Event(wangwu,./home,1000)
1> Event(xiaoming,./home,1000)
4> Event(lisi,./cart,3000)
2> Event(Mary,./home,1000)
3> Event(zhangsan,./cart,2000)
2> Event(zhaoliu,./home,1000)
3> Event(zhangsan,./home,2000)
2> Event(Mary,./home,1000)*/
env.execute()
}
class MyKeySelector extends KeySelector[Event, String] {
override def getKey(in: Event): String = in.user
}
}
(2)简单聚合:sum、min、minBy、max、maxBy
import com.atguigu.chapter05.Event
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object SimpleAggTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream: DataStream[Event] = env.fromElements(
Event("Mary", "./home", 9000L),
Event("wangwu", "./id", 8000L),
Event("zhaoliu", "./home", 6000L),
Event("xiaoming", "./home", 1000L),
Event("Mary", "./cart", 10000L),
Event("zhangsan", "./cart", 2000L),
Event("zhangsan", "./home", 8000L),
Event("zhangsan", "./home", 6000L),
Event("lisi", "./cart", 3000L)
)
// TODO 指定 Event 的 user 属性作为 key
// TODO 简单聚合
// TODO sum():对指定字段按key累加,其他字段保留最初第一个数据的值
stream.keyBy(_.user).sum("timestamp").print()
/*Event(Mary,./home,9000)
Event(wangwu,./id,8000)
Event(zhaoliu,./home,6000)
Event(xiaoming,./home,1000)
Event(Mary,./home,19000)
Event(zhangsan,./cart,2000)
Event(zhangsan,./cart,10000)
Event(zhangsan,./cart,16000)
Event(lisi,./cart,3000)*/
// TODO minBy()则会返回包含字段最小值的整条数据。
stream.keyBy(_.user).minBy("timestamp").print()
/*Event(Mary,./home,9000)
Event(wangwu,./id,8000)
Event(zhaoliu,./home,6000)
Event(xiaoming,./home,1000)
Event(Mary,./home,9000)
Event(zhangsan,./cart,2000)
Event(zhangsan,./cart,2000)
Event(zhangsan,./cart,2000)
Event(lisi,./cart,3000)*/
// TODO min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值
stream.keyBy(_.user).min("timestamp").print()
/*Event(Mary,./home,9000)
Event(wangwu,./id,8000)
Event(zhaoliu,./home,6000)
Event(xiaoming,./home,1000)
Event(Mary,./home,9000)
Event(zhangsan,./cart,2000)
Event(zhangsan,./cart,2000)
Event(zhangsan,./cart,2000)
Event(lisi,./cart,3000)*/
// TODO maxBy:则会返回包含字段最大值的整条数据。
stream.keyBy(_.user).maxBy("timestamp").print()
/* Event(Mary,./home,9000)
Event(wangwu,./id,8000)
Event(zhaoliu,./home,6000)
Event(xiaoming,./home,1000)
Event(Mary,./cart,10000)
Event(zhangsan,./cart,2000)
Event(zhangsan,./home,8000)
Event(zhangsan,./home,8000)
Event(lisi,./cart,3000)*/
// TODO max:只计算指定字段的最大值,其他字段会保留最初第一个数据的值
stream.keyBy(_.user).max("timestamp").print()
// 注意:Mary的第二条数据timestamp更大,所以timestamp被更新为10000,但url等其他字段仍保留第一条数据的值,
// 因此输出的是Event(Mary,./home,10000)而不是Event(Mary,./cart,10000)
/*Event(Mary,./home,9000)
Event(wangwu,./id,8000)
Event(zhaoliu,./home,6000)
Event(xiaoming,./home,1000)
Event(Mary,./home,10000)
Event(zhangsan,./cart,2000)
Event(zhangsan,./cart,8000)
Event(zhangsan,./cart,8000)
Event(lisi,./cart,3000)*/
env.execute()
}
class MyKeySelector extends KeySelector[Event, String] {
override def getKey(in: Event): String = in.user
}
}
(3)归约聚合:reduce
import com.atguigu.chapter05.Event
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala._
object ReduceTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream: DataStream[Event] = env.fromElements(
Event("Mary", "./home", 9000L),
Event("zhaoliu", "./home", 6000L),
Event("Mary", "./cart", 10000L),
Event("zhangsan", "./cart", 2000L),
Event("zhangsan", "./home", 8000L),
Event("zhangsan", "./home", 6000L),
)
// TODO reduce归约聚合,求最活跃的用户
stream.map(x=>(x.user, 1L))// 长整型统计
.keyBy(_._1)
.reduce((x, y) => (x._1, x._2 + y._2))
.keyBy(_ => true) // 将所有数据按照同样的key分到同一个组中
.reduce((x, y) => if (x._2 > y._2) x else y) // 选取当前最活跃的用户
.print()
/* (Mary,1)
(zhaoliu,1)
(Mary,2)
(Mary,2)
(zhangsan,2)
(zhangsan,3)*/
env.execute()
}
}
二、UDF
Flink 的大部分算子都需要传入一个用户自定义函数(UDF),实现相关操作的接口,来完成处理逻辑的定义。Flink 以接口或抽象类的形式暴露了所有 UDF 函数的签名,例如 MapFunction、FilterFunction、ReduceFunction 等。最简单直接的方式,就是自定义一个函数类,实现对应的接口;也可以像下面的方法二那样直接使用匿名类。
import com.atguigu.chapter05.Event
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala._
object UdfTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream: DataStream[Event] = env.fromElements(
Event("Mary", "./home", 9000L),
Event("zhaoliu", "./home", 6000L),
Event("Mary", "./cart", 10000L),
Event("zhangsan", "./cart", 2000L),
Event("zhangsan", "./home", 8000L),
Event("zhangsan", "./home", 6000L)
)
// TODO 测试UDF的用法,筛选url中包含某个关键字home的Event事件
// TODO 方法一:实现一个自定义的函数类
stream.filter(new MyFilterFunction).print()
// TODO 方法二:
stream.filter(new FilterFunction[Event] {
override def filter(t: Event): Boolean = t.url.contains("home")
}).print()
// TODO 方法三:
stream.filter(new MyFilterFunction2("home")).print()
env.execute()
}
class MyFilterFunction() extends FilterFunction[Event] {
override def filter(t: Event): Boolean = t.url.contains("home")
}
class MyFilterFunction2(url: String) extends FilterFunction[Event] {
override def filter(t: Event): Boolean = t.url.contains(url)
}
}
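上面演示的是 FilterFunction 的几种实现方式。前面提到的 ReduceFunction 同样可以用"函数类"的方式实现,下面是一个简单的示意(沿用前面 reduce 示例中 (user, count) 统计的场景,MyReduceFunction 为自行命名的示例类):
import org.apache.flink.api.common.functions.ReduceFunction
// 示意:实现ReduceFunction接口,对(user, count)二元组按count累加
class MyReduceFunction extends ReduceFunction[(String, Long)] {
  override def reduce(t1: (String, Long), t2: (String, Long)): (String, Long) =
    (t1._1, t1._2 + t2._2)
}
// 用法示意,与前面reduce示例中的匿名函数写法等价:
// stream.map(x => (x.user, 1L)).keyBy(_._1).reduce(new MyReduceFunction).print()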
三、富函数类
富函数类(Rich Function)是 DataStream API 提供的函数类的增强版本,所有常规函数类都有对应的 Rich 版本,例如 RichMapFunction、RichFilterFunction 等。与常规函数类相比,富函数类额外提供了 open() 和 close() 两个生命周期方法,并且可以通过 getRuntimeContext() 获取运行时上下文信息(例如当前子任务的索引号)。
import com.atguigu.chapter05.Event
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
object RichFunctionTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// env.setParallelism(1)
env.setParallelism(2)
val stream: DataStream[Event] = env.fromElements(
Event("Mary", "./home", 9000L),
Event("zhaoliu", "./home", 6000L),
Event("Mary", "./cart", 10000L),
Event("zhangsan", "./cart", 2000L),
Event("zhangsan", "./home", 8000L),
Event("zhangsan", "./home", 6000L),
)
// TODO 自定义一个RichMapFunction,测试富函数类的功能
stream.map(new MyRichMapFunction).print()
// 当并行度为1时:
/*索引号为:0的任务开始
9000
6000
10000
2000
8000
6000
索引号为:0的任务结束*/
// 当并行度为2时:
/*
索引号为:0的任务开始
索引号为:1的任务开始
2> 9000
2> 10000
1> 6000
2> 8000
1> 2000
1> 6000
索引号为:1的任务结束
索引号为:0的任务结束*/
env.execute()
}
class MyRichMapFunction extends RichMapFunction[Event, Long] {
override def open(parameters: Configuration): Unit = {
println("索引号为:" + getRuntimeContext.getIndexOfThisSubtask + "的任务开始")
}
override def close(): Unit = {
println("索引号为:" + getRuntimeContext.getIndexOfThisSubtask + "的任务结束")
}
override def map(in: Event): Long = in.timestamp
}
}
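富函数类的典型用法是在 open() 中做一次性的初始化工作(例如建立数据库连接、初始化计数器),在 close() 中做收尾清理。下面是一个简单的示意(MyCountMapFunction 为自行命名的示例类,仅用于演示生命周期方法的用途):
import com.atguigu.chapter05.Event
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
// 示意:open()中初始化计数器,map()中累加,close()中输出本子任务处理的数据条数
class MyCountMapFunction extends RichMapFunction[Event, Long] {
  var count: Long = 0L
  override def open(parameters: Configuration): Unit = {
    count = 0L
  }
  override def map(in: Event): Long = {
    count += 1
    in.timestamp
  }
  override def close(): Unit = {
    println("索引号为:" + getRuntimeContext.getIndexOfThisSubtask + "的子任务共处理了" + count + "条数据")
  }
}
// 用法示意:stream.map(new MyCountMapFunction).print()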
四、物理分区
keyBy()本质上是一种按照键的哈希值进行重分区的操作。这种分区方式只能保证把数据按key"分开",至于分得均不均匀、每个key的数据具体会分到哪一个分区,是完全无法控制的;当key分布不均时,就可能出现数据倾斜。为了避免这种现象,我们可以手动地对数据进行物理分区。
所谓物理分区,就是人为指定分区策略、精准地调配数据,告诉每一条数据应该去往哪个分区,从而重新实现负载均衡,把数据流较为均匀地发送到下游算子的并行子任务中去。
1.随机分区(shuffle)
最简单的重分区方式就是直接“洗牌”。通过调用 DataStream 的 shuffle()方法,将数据随机地分配到下游算子的并行任务中去。 随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区。
shuffle实现:
import com.atguigu.chapter05.{ClickSource, Event}
import org.apache.flink.streaming.api.scala._
object ShuffleTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// TODO 读取自定义的数据源
val stream: DataStream[Event] = env.addSource(new ClickSource)
// TODO 洗牌之后打印输出
stream.shuffle.print().setParallelism(4)
env.execute()
}
}
// 结果:
2> Event(Cary,./fav,1681456335725)
2> Event(Bob,./cart,1681456336737)
3> Event(Bob,./pord?id=1,1681456337746)
3> Event(Cary,./pord?id=2,1681456338754)
1> Event(Cary,./cart,1681456339760)
2> Event(Alice,./pord?id=2,1681456340767)
3> Event(Alice,./cart,1681456341773)
2> Event(Alice,./home,1681456342781)
4> Event(Marry,./home,1681456343789)
1> Event(Bob,./fav,1681456344796)
1> Event(Marry,./pord?id=1,1681456345805)
ClickSource类:
import org.apache.flink.streaming.api.functions.source.SourceFunction
import java.util.Calendar
import scala.util.Random
class ClickSource extends SourceFunction[Event] {
// 标志位
var running = true
override def run(sourceContext: SourceFunction.SourceContext[Event]): Unit = {
// 随机数生成器
val random = new Random()
val users: Array[String] = Array("Marry", "Alice", "Bob", "Cary")
val urls: Array[String] = Array("./home", "./cart", "./fav", "./pord?id=1", "./pord?id=2", "./pord?id=3")
// 用标志位作为循环判断条件,不停地发出数据
while (running) {
val event: Event = Event(users(random.nextInt(users.length)), urls(random.nextInt(urls.length)), Calendar.getInstance.getTimeInMillis)
// 调用SourceContext的collect方法向下游发送数据
sourceContext.collect(event)
// 每隔1s发送一条数据
Thread.sleep(1000)
}
}
override def cancel(): Unit = running = false
}
2. 轮询分区(Round-Robin)
轮询也是一种常见的重分区方式。简单来说就是"发牌",按照先后顺序将数据依次分发。通过调用DataStream的rebalance()方法,就可以实现轮询重分区。rebalance()使用的是Round-Robin负载均衡算法,可以将输入流数据平均分配到下游的所有并行任务中去。
import com.atguigu.chapter05.{ClickSource, Event}
import org.apache.flink.streaming.api.scala._
object RebalanceTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// TODO 读取自定义的数据源
val stream: DataStream[Event] = env.addSource(new ClickSource)
// TODO 下面两种写法都是轮询分区
stream.print().setParallelism(4)
stream.rebalance.print("reblance").setParallelism(4)
// 结果:
reblance:1> Event(Cary,./pord?id=1,1681456987398)
reblance:2> Event(Cary,./pord?id=1,1681456988408)
reblance:3> Event(Cary,./home,1681456989417)
reblance:4> Event(Alice,./pord?id=3,1681456990424)
reblance:1> Event(Bob,./cart,1681456991437)
reblance:2> Event(Cary,./pord?id=3,1681456992438)
reblance:3> Event(Bob,./cart,1681456993445)
reblance:4> Event(Cary,./cart,1681456994456)
reblance:1> Event(Marry,./home,1681456995468)
reblance:2> Event(Cary,./home,1681456996480)
reblance:3> Event(Marry,./pord?id=2,1681456997488)
env.execute()
}
}
3. 重缩放分区(rescale)
重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用 Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中。也就是说,“发牌人”如果有多个,那么 rebalance()的方式是每个发牌人都面向所有人发牌;而rescale()的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。
package com.atguigu.partition
import com.atguigu.chapter05.{ClickSource, Event}
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
object RescaleTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // TODO 自定义一个并行度为2的数据源,相当于2个"发牌人",数据最终分到下游的4个分区中
    val stream: DataStream[Int] = env.addSource(new RichParallelSourceFunction[Int] {
      override def run(sourceContext: SourceFunction.SourceContext[Int]): Unit = {
        for (i <- 0 to 7) {
          // 奇数由索引为1的子任务发出,偶数由索引为0的子任务发出
          if (getRuntimeContext.getIndexOfThisSubtask == (i + 1) % 2) {
            sourceContext.collect(i + 1)
          }
        }
      }
      // 数据源是有界的,cancel()无需做额外处理
      override def cancel(): Unit = {}
    }).setParallelism(2)
    // TODO 轮询分区:每个上游子任务把数据轮询发给下游的所有分区
    stream.rebalance.print("rebalance").setParallelism(4)
    /*rebalance:1> 4
    rebalance:2> 6
    rebalance:4> 2
    rebalance:3> 8
    rebalance:4> 5
    rebalance:1> 7
    rebalance:3> 3
    rebalance:2> 1*/
    // TODO 重缩放分区:奇数只分到3和4分区,偶数只分到1和2分区
    stream.rescale.print("rescale").setParallelism(4)
    /*rescale:4> 3
    rescale:3> 1
    rescale:1> 2
    rescale:2> 4
    rescale:1> 6
    rescale:3> 5
    rescale:4> 7
    rescale:2> 8*/
    env.execute()
  }
}
注意 rebalance 与 rescale 的区别:rebalance 中,每个"发牌人"(上游子任务)都会面向下游所有并行子任务轮询发牌,上下游之间会建立全量的数据传输通道;而 rescale 会把下游分成若干小团体,每个"发牌人"只对自己团体内的子任务轮询发牌,数据传输范围更小、开销更低。上面的示例中,并行度为2的数据源经 rescale 后,奇数只进入3、4分区,偶数只进入1、2分区,正体现了这一点。
4.广播(broadcast)——不常用
经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用 DataStream 的 broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。
// TODO 读取自定义的数据源
val stream: DataStream[Event] = env.addSource(new ClickSource)
// TODO 广播分区:每隔1秒一次性输出4条相同的数据,即每条数据都被复制发送到下游的4个并行子任务,特殊场合下才会使用
stream.broadcast.print("broadcast").setParallelism(4)
// 结果:
/*broadcast:3> Event(Cary,./pord?id=1,1681461381359)
broadcast:2> Event(Cary,./pord?id=1,1681461381359)
broadcast:1> Event(Cary,./pord?id=1,1681461381359)
broadcast:4> Event(Cary,./pord?id=1,1681461381359)
broadcast:1> Event(Alice,./fav,1681461382364)
broadcast:2> Event(Alice,./fav,1681461382364)
broadcast:3> Event(Alice,./fav,1681461382364)
broadcast:4> Event(Alice,./fav,1681461382364)*/
5.全局分区(global)——不常用
全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。
// TODO 读取自定义的数据源
val stream: DataStream[Event] = env.addSource(new ClickSource)
// TODO 全局分区:数据源每秒只生成一条数据,而且每一条数据都被发送到下游的第1个分区
stream.global.print().setParallelism(4)
// 结果:
/*1> Event(Cary,./home,1681461414206)
1> Event(Bob,./home,1681461415222)
1> Event(Alice,./fav,1681461416225)
1> Event(Alice,./home,1681461417236)
1> Event(Bob,./fav,1681461418242)*/
6. 自定义分区(Custom)
当Flink提供的所有分区策略都不能满足用户的需求时,我们可以通过使用partitionCustom()方法来自定义分区策略。
在调用时,方法需要传入两个参数:第一个是自定义分区器(Partitioner)对象,第二个是应用分区器的字段,它的指定方式与keyBy指定key基本一样:可以通过字段名称指定,也可以通过字段位置索引来指定,还可以实现一个KeySelector接口。本节末尾另外给出了一个把分区器单独定义成类、按 user 字段分区的示意。
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._
object CustomTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // TODO 从集合中读取数据
    val stream: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
    // TODO 自定义分区:按奇偶性把数据分到两个分区
    val ds: DataStream[Int] = stream.partitionCustom(new Partitioner[Int] {
      // k为提取出的key,i为下游分区总数
      override def partition(k: Int, i: Int): Int = {
        k % 2
      }
    }, x => x)
    ds.print("partitionCustom").setParallelism(4)
    // 结果:
    /*partitionCustom:1> 2
    partitionCustom:2> 1
    partitionCustom:1> 4
    partitionCustom:2> 3
    partitionCustom:1> 6
    partitionCustom:2> 5
    partitionCustom:1> 8
    partitionCustom:2> 7*/
    env.execute()
  }
}
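上面的示例用匿名内部类定义分区器、用匿名函数 x => x 提取key。下面再给出一个把分区器单独定义成类、并按 Event 的 user 字段进行分区的简单示意(CustomKeyPartitionTest、MyPartitioner 均为自行命名的示例名称,仅作演示):
import com.atguigu.chapter05.{ClickSource, Event}
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._
object CustomKeyPartitionTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val stream: DataStream[Event] = env.addSource(new ClickSource)
    // 第一个参数传入自定义分区器,第二个参数用匿名函数提取user字段作为key
    stream.partitionCustom(new MyPartitioner, _.user)
      .print("custom").setParallelism(4)
    env.execute()
  }
  // 示意:按key(用户名)的哈希值对下游分区数取模,决定数据去往哪个分区
  class MyPartitioner extends Partitioner[String] {
    override def partition(key: String, numPartitions: Int): Int =
      (key.hashCode & Int.MaxValue) % numPartitions
  }
}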