目录
一、Flink中的状态
二、状态编程
(一)ValueState案例——判断传感器的数据
1.代码实现
2.端口进行传输数据
3.运行结果
(二)ListState
(三)MapState案例——比较学生每次考试成绩
1.代码实现
2.端口传输学生成绩
3.运行结果
(四)ReducingState
一、Flink中的状态
在流处理中,数据是连续不断到来和处理的。每个任务进行计算处理时,可以基于当前数据直接转换得到输出结果;也可以依赖一些其他数据。这些由一个任务维护,并且用来计算输出结果的所有数据,就叫作这个任务的状态。
例如,监测温度的变化趋势的时候,如果现在的温度与上一秒的温度不一样,就说明处于不同的状态。
二、状态编程
(一)ValueState案例——判断传感器的数据
1.代码实现
import source.SensorReading
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
object TransformTest {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val inputStream: DataStream[String] = env.socketTextStream("ant168", 7777) //加载集合数据源*/
val dataStream: DataStream[SensorReading] = inputStream.map(data => {
val arr: Array[String] = data.split(",")
SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
})
val alterStream: DataStream[(String, Double, Double)] = dataStream.keyBy(_.id)
.flatMap(new ChangeAlert)
alterStream.print()
env.execute()
}
}
class ChangeAlert extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {
// TODO 定义状态对象,保存上一次的温度值
// TODO "last-temp"是指给当前状态在运行程序的上下文中起名 后面的classOf[Double]是要指明last-temp的类型
// TODO 关键字要改为lazy,等到使用的时候才创建对象
lazy val lastState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-temp", classOf[Double]))
// TODO 判断机器是否为第一次开启
lazy val firstState: ValueState[Int] = getRuntimeContext.getState(new ValueStateDescriptor[Int]("first-start", classOf[Int]))
// lastState中有value()——取数和update()——更新 两个方法
override def flatMap(value: SensorReading, out: Collector[
(String, Double, Double)]): Unit = {
// 首先,机器开启,判断是否为第一次开启,如果不是第一次开启,就进行温度判断,如果是第一次开启,就默认温度为0.0
val firstValue: Int = firstState.value()
val lastValue: Double = lastState.value()
val dif: Double = (lastValue - value.temperature).abs
if (firstValue != 0) {
// 取到上一次状态中的值
// TODO 对比这一次的值与上一次的温度的差
if (dif > 10)
// 如果差值>10,就输出
out.collect((value.id, lastValue, value.temperature))
} else {
// 如果机器第一次开启,就更新机器状态
firstState.update(1)
if (dif > 10)
out.collect((value.id, lastValue, value.temperature))
}
// 每次新来一个温度值,原有的温度状态就要更新
lastState.update(value.temperature)
}
}
2.端口进行传输数据
3.运行结果
(二)ListState
class MyRichFunction extends RichFlatMapFunction[SensorReading, String] {
lazy val listState: ListState[String] = getRuntimeContext.getListState(new ListStateDescriptor[String]("liststate", classOf[String]))
override def flatMap(value: SensorReading, out: Collector[String]): Unit = {
val strings: lang.Iterable[String] = listState.get()
listState.add("hello")
val ls = new util.ArrayList[String]()
ls.add("html")
ls.add("flink")
listState.addAll(ls)
listState.update(ls)
}
}
(三)MapState案例——比较学生每次考试成绩
1.代码实现
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import java.util.Map
import java.util
/**
* 状态编程判断学生成绩
*/
object StateTest2 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val inputStream: DataStream[String] = env.socketTextStream("ant168", 7777)
// TODO 对成绩进行清洗
val dataStream: DataStream[Test] = inputStream.map(data => {
val arr: Array[String] = data.split(",")
Test(arr(0).trim, arr(1).trim, arr(2).trim.toDouble)
})
val scoreStream: DataStream[(String, String, Double, Double)] = dataStream
.keyBy(data => (data.id, data.subject))
.flatMap(new MyTest)
scoreStream.print()
env.execute("state test")
}
}
// 定义一个样例类,表示测试结果
case class Test(id: String, subject: String, score: Double)
class MyTest extends RichFlatMapFunction[Test, (String, String, Double, Double)] {
lazy val mapState: MapState[(String, String), Double] = getRuntimeContext
.getMapState(new MapStateDescriptor[(String, String), Double]("map-temp", classOf[(String, String)], classOf[Double]))
override def flatMap(value: Test,
out: Collector[(String, String, Double, Double)]): Unit = {
// 放入第一次的成绩
val previousScore: Double = Option(mapState.get(value.id, value.subject)).getOrElse(0.0)
mapState.put((value.id, value.subject), previousScore)
// 获取第二次的成绩
val iter: util.Iterator[Map.Entry[(String, String), Double]] = mapState.iterator()
while (iter.hasNext) {
val unit: Map.Entry[(String, String), Double] = iter.next()
val key: (String, String) = unit.getKey// 第一次考试的id,subject
val value1: Double = unit.getValue// 第一次开始的score
val dif: Double = (previousScore - value.score).abs
if (dif >= 10) {
out.collect((key._1, key._2, previousScore, value.score))
}
mapState.put((key._1, key._2), value.score)
}
}
}
2.端口传输学生成绩
3.运行结果
(四)ReducingState
class MyRichFunction extends RichFlatMapFunction[SensorReading, String] {
lazy val reducingState: ReducingState[SensorReading] = getRuntimeContext
.getReducingState(new ReducingStateDescriptor[SensorReading]("reducestate",
new MyReduceFunction2, classOf[SensorReading]))
override def flatMap(value: SensorReading, out: Collector[String]): Unit = {
val reading: SensorReading = reducingState.get()
reducingState.add(reading)
}
}