系列文章目录
Flink第一章:环境搭建
Flink第二章:基本操作
Flink第三章:基本操作(二)
Flink第四章:水位线和窗口
Flink第五章:处理函数
文章目录
- 系列文章目录
- 前言
- 一、基本处理函数(ProcessFunction)
- 二、按键分区处理函数(KeyedProcessFunction)
- 1.处理时间定时服务
- 2.事件时间定时服务
- 三、TopN案例
- 1.ProcessAllWindowFunction
- 2.KeyedProcessFunction
- 总结
前言
处理函数
简单来说就是比DataStream API更加底层的函数,能够处理更加复杂的问题
创建scala文件
一、基本处理函数(ProcessFunction)
我们用它来实现一个简单的Map操作,如果点击用户是Mary就输出用户名,是Alice就输出用户名+url
ProcessFunction.scala
package com.atguigu.chapter04
import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/** Demo job: uses a basic Flink `ProcessFunction` as a combined filter/map over click events.
  *
  * Events whose user is "Mary" are emitted as the user name, events whose user is "Alice" are
  * emitted as user name followed by the url, and every other event is dropped.
  */
object ProcessFunction {

  /** Builds the pipeline (source -> process -> print) and runs it.
    *
    * @param args command-line arguments; not used
    */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val clicks: DataStream[Event] =
      env
        .addSource(new ClickSource)
        .assignAscendingTimestamps(event => event.timestamp)

    val selected: DataStream[String] = clicks.process(new ProcessFunction[Event, String] {
      override def processElement(
          value: Event,
          ctx: ProcessFunction[Event, String]#Context,
          out: Collector[String]
      ): Unit = {
        val user = value.user
        val isMary = user.equals("Mary")
        if (isMary) {
          out.collect(user)
        } else if (user.equals("Alice")) {
          // Alice's records carry the visited url appended to the name.
          out.collect(user + value.url)
        }
        // Any other user produces no output.
      }
    })

    selected.print()
    env.execute()
  }
}
二、按键分区处理函数(KeyedProcessFunction)
在 Flink 程序中,为了实现数据的聚合统计,或者开窗计算之类的功能,我们一般都要先用 keyBy()算子对数据流进行“按键分区”,得到一个 KeyedStream。而只有在 KeyedStream 中,才支持使用 TimerService 设置定时器的操作。所以一般情况下,我们都是先做了 keyBy()分区之后,再去定义处理操作;代码中更加常见的处理函数是 KeyedProcessFunction。
1.处理时间定时服务
主要是在数据到达一段时间后进行数据操作
ProcessingTimeTimerTest.scala
package com.atguigu.chapter04
import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/** Demo job: registers a processing-time timer five seconds after each click event arrives. */
object ProcessingTimeTimerTest {

  /** Builds the pipeline (source -> keyBy -> process -> print) and runs it.
    *
    * @param args command-line arguments; not used
    */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val clicks: DataStream[Event] =
      env
        .addSource(new ClickSource)
        .assignAscendingTimestamps(event => event.timestamp)

    // Every record gets the same constant key, so the whole stream forms one keyed partition.
    val keyed = clicks.keyBy(_ => true)

    val messages: DataStream[String] =
      keyed.process(new KeyedProcessFunction[Boolean, Event, String] {

        // Reports the arrival time and schedules a timer five seconds later.
        override def processElement(
            value: Event,
            ctx: KeyedProcessFunction[Boolean, Event, String]#Context,
            out: Collector[String]
        ): Unit = {
          val timers = ctx.timerService()
          val now: Long = timers.currentProcessingTime()
          out.collect("数据到达,当前时间是:" + now)
          timers.registerProcessingTimeTimer(now + 5000L)
        }

        // Callback run when a registered timer fires; reports the timer's timestamp.
        override def onTimer(
            timestamp: Long,
            ctx: KeyedProcessFunction[Boolean, Event, String]#OnTimerContext,
            out: Collector[String]
        ): Unit = {
          out.collect("定时器触发,触发时间为:" + timestamp)
        }
      })

    messages.print()
    env.execute()
  }
}
2.事件时间定时服务
在数据产生一段时间后进行处理
EventTimeTimeTest.scala
package com.atguigu.chapter04
import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/** Demo job: registers an event-time timer five seconds (in event time) after each record's
  * timestamp, using a small hand-written source so the watermark progression is easy to follow.
  */
object EventTimeTimeTest {

  /** Builds the pipeline (custom source -> keyBy -> process -> print) and runs it.
    *
    * @param args command-line arguments; not used
    */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.addSource(new CustomSource)
      .assignAscendingTimestamps(_.timestamp)

    // Every record gets the same constant key, so the whole stream forms one keyed partition.
    stream.keyBy(_ => true)
      .process(new KeyedProcessFunction[Boolean, Event, String] {

        // Reports the current watermark and the record timestamp, then schedules an
        // event-time timer five seconds after the record's timestamp.
        override def processElement(
            value: Event,
            ctx: KeyedProcessFunction[Boolean, Event, String]#Context,
            out: Collector[String]
        ): Unit = {
          val currentWatermark: Long = ctx.timerService().currentWatermark()
          out.collect(s"数据到达,当前时间是: ${currentWatermark},当前数据时间戳是:${value.timestamp}")
          ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 5 * 1000)
        }

        // Callback run when a registered timer fires; reports the timer's timestamp.
        override def onTimer(
            timestamp: Long,
            ctx: KeyedProcessFunction[Boolean, Event, String]#OnTimerContext,
            out: Collector[String]
        ): Unit = {
          out.collect("定时器触发,触发时间为:" + timestamp)
        }
      }).print()

    env.execute()
  }

  /** Finite test source: emits four fixed events, pausing five seconds after each one.
    *
    * Cancellation is cooperative: `cancel()` clears a flag that `run` checks before emitting
    * the next event. The original `cancel()` was `???`, which throws `NotImplementedError`
    * whenever the job is cancelled.
    */
  class CustomSource extends SourceFunction[Event] {

    // Written by cancel() and read by run(), which may execute on different threads.
    @volatile private var running: Boolean = true

    override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
      val events = Seq(
        Event("Mary", "./home", 1000L),
        Event("Mary", "./home", 2000L),
        Event("Mary", "./home", 6000L),
        Event("Mary", "./home", 6001L)
      )
      val pending = events.iterator
      while (running && pending.hasNext) {
        ctx.collect(pending.next())
        // Wall-clock pause so the effect of each record on the watermark can be observed.
        Thread.sleep(5000)
      }
    }

    /** Requests the emit loop in `run` to stop before the next event. */
    override def cancel(): Unit = {
      running = false
    }
  }
}
三、TopN案例
1.ProcessAllWindowFunction
TopNProcessAllWindowExample.scala
package com.atguigu.chapter04
import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable
/** Demo job: computes the two most visited urls per sliding window using a single
  * non-keyed window (`windowAll`) and a `ProcessAllWindowFunction`.
  */
object TopNProcessAllWindowExample {

  /** Builds the pipeline (source -> map to url -> windowAll -> process -> print) and runs it.
    *
    * @param args command-line arguments; not used
    */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)

    // Window the url stream directly: 10 s windows sliding every 5 s (processing time).
    stream.map(_.url)
      .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
      .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
        override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
          // 1. Count the visits per url within this window.
          val urlCountMap: mutable.Map[String, Long] = mutable.Map[String, Long]()
          elements.foreach { url =>
            urlCountMap.put(url, urlCountMap.getOrElse(url, 0L) + 1L)
          }

          // 2. Sort by count, descending, and keep the top two.
          val urlCountList: List[(String, Long)] = urlCountMap.toList.sortBy(-_._2).take(2)

          // 3. Format the report for this window.
          val result = new mutable.StringBuilder()
          result.append(s"=========窗口: ${context.window.getStart} - ${context.window.getEnd}=======\n")
          urlCountList.zipWithIndex.foreach { case ((url, count), i) =>
            // The trailing space after the rank keeps it from fusing with "url:".
            result.append(s"浏览量TopN ${i + 1} ")
              .append(s"url: ${url} ")
              .append(s"浏览量是: ${count} \n")
          }
          out.collect(result.toString())
        }
      }).print()

    env.execute()
  }
}
2.KeyedProcessFunction
TopNkeyProcessFunctionExample.scala
package com.atguigu.chapter04
import com.atguigu.chapter02.Source.{ClickSource, Event}
import com.atguigu.chapter03.UrlViewCount
import com.atguigu.chapter03.UrlViewCountExample.{UrlViewCountAgg, UrlViewCountResult}
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable`
import scala.collection.mutable
/** Demo job: computes the top-N most visited urls per sliding event-time window.
  *
  * Stage 1 counts visits per url and window. Stage 2 regroups those counts by window end
  * and, once the window's results have arrived, sorts them and emits a report.
  */
object TopNkeyProcessFunctionExample {

  /** Builds the two-stage pipeline and runs it.
    *
    * @param args command-line arguments; not used
    */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)

    // 1. Incremental aggregation plus a window function: visit count per url and window.
    val urlCountStream: DataStream[UrlViewCount] = stream.keyBy(_.url)
      .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
      .aggregate(new UrlViewCountAgg, new UrlViewCountResult)

    // 2. Group the per-url counts by window end, then rank and print them.
    val resultStream: DataStream[String] = urlCountStream.keyBy(_.windowEnd)
      .process(new TopN(2))

    resultStream.print()
    env.execute()
  }

  /** Collects the `UrlViewCount` records of one window (the key is the window end) and emits
    * a formatted report of the `n` highest counts when the window's timer fires.
    *
    * @param n number of top entries to report
    */
  class TopN(n: Int) extends KeyedProcessFunction[Long, UrlViewCount, String] {

    // Keyed list state holding the counts received for the current key (window end).
    var urlViewCountListState: ListState[UrlViewCount] = _

    override def open(parameters: Configuration): Unit = {
      urlViewCountListState = getRuntimeContext.getListState(
        new ListStateDescriptor[UrlViewCount]("list-state", classOf[UrlViewCount]))
    }

    override def processElement(
        value: UrlViewCount,
        ctx: KeyedProcessFunction[Long, UrlViewCount, String]#Context,
        out: Collector[String]
    ): Unit = {
      // Buffer every incoming count for this window.
      urlViewCountListState.add(value)
      // Timer 1 ms after the window end; repeated registrations for the same key and
      // timestamp collapse into a single timer.
      ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
    }

    override def onTimer(
        timestamp: Long,
        ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext,
        out: Collector[String]
    ): Unit = {
      // Materialize the buffered counts and keep the n largest.
      val urlViewCountList: List[UrlViewCount] = urlViewCountListState.get().toList
      val topnList: List[UrlViewCount] = urlViewCountList.sortBy(-_.count).take(n)

      // Format the report. The timer was set to windowEnd + 1, so windowEnd = timestamp - 1;
      // 10000 is the window size in ms used by the window assigner in main.
      val result = new mutable.StringBuilder()
      result.append(s"=========窗口: ${timestamp - 1 - 10000} - ${timestamp - 1}=======\n")
      topnList.zipWithIndex.foreach { case (urlViewCount, i) =>
        result.append(s"浏览量Top ${i + 1} ")
          .append(s"url: ${urlViewCount.url} ")
          .append(s"浏览量是: ${urlViewCount.count} \n")
      }
      out.collect(result.toString())

      // Release this window's buffered counts. Each window end is a distinct key, so
      // without this the keyed state would keep growing for the lifetime of the job.
      urlViewCountListState.clear()
    }
  }
}
总结
有关Flink底层处理函数的Api就到这里.