背景
在 Flink 中，如果你想访问记录的处理时间或事件时间、注册定时器，或者将记录输出到多个输出流，都需要借助处理函数（ProcessFunction）。本文通过一个例子来讲解副输出（side output）。
副输出
本文还是基于 streaming-with-flink 这本书的例子作为演示：对温度低于 32 度（华氏度）的记录，向副输出发送一条告警信息；所有记录（包括低温记录）仍然从主输出中输出。代码如下：
package wikiedits.processfunc.job;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.OutputTag;
import wikiedits.processfunc.pojo.SensorReading;
import wikiedits.processfunc.process.FreezingMonitor;
import wikiedits.processfunc.source.SensorSource;
/**
 * Entry point of the side-output demo job.
 *
 * <p>Reads sensor readings from {@link SensorSource}, passes them through
 * {@link FreezingMonitor}, and prints both the "freezing-alarms" side output and the main output.
 */
public class SideOutPutJob {

    /**
     * Assembles the pipeline and submits it for execution.
     *
     * @param args command-line arguments; not used
     * @throws Exception propagated from building or executing the job
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source -> process function; the result exposes the main output plus any side outputs.
        final SingleOutputStreamOperator<SensorReading> monitored =
                env.addSource(new SensorSource()).process(new FreezingMonitor());

        // Print the side output. The tag uses the same id and element type as the tag
        // FreezingMonitor emits its alarms to.
        final OutputTag<String> freezingAlarms = new OutputTag<String>("freezing-alarms") {};
        final DataStream<String> alarms = monitored.getSideOutput(freezingAlarms);
        alarms.print();

        // Print the main output.
        monitored.print();

        env.execute();
    }
}
package wikiedits.processfunc.process;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import wikiedits.processfunc.pojo.SensorReading;
/**
 * Process function that forwards every {@link SensorReading} unchanged to the main output and,
 * for readings whose temperature is below {@link #FREEZING_THRESHOLD}, additionally emits a
 * textual alarm to the "freezing-alarms" side output.
 */
public class FreezingMonitor extends ProcessFunction<SensorReading, SensorReading> {

    /**
     * Tag identifying the alarm side output.
     *
     * <p>Declared {@code static final}: the tag is never reassigned, and creating the anonymous
     * subclass in a static context keeps it from holding a reference to the enclosing
     * function instance.
     */
    private static final OutputTag<String> FREEZING_ALARM_OUTPUT =
            new OutputTag<String>("freezing-alarms") {};

    /** Readings with a temperature strictly below this value trigger an alarm. */
    private static final double FREEZING_THRESHOLD = 32.0;

    /**
     * Emits an alarm to the side output for a freezing reading and always forwards the reading.
     *
     * @param value the incoming sensor reading
     * @param ctx   context used to write to the side output
     * @param out   collector for the main output
     * @throws Exception declared by the overridden method
     */
    @Override
    public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
        if (value.temperature < FREEZING_THRESHOLD) {
            ctx.output(FREEZING_ALARM_OUTPUT, "freezing alarm for " + value.id + " :" + value.temperature);
        }
        // The reading goes to the main output regardless of whether an alarm was raised.
        out.collect(value);
    }
}
package wikiedits.processfunc.source;
/*
* Copyright 2015 Fabian Hueske / Vasia Kalavri
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import wikiedits.processfunc.pojo.SensorReading;
import java.util.Calendar;
import java.util.Random;
/**
 * Flink SourceFunction to generate SensorReadings with random temperature values.
 *
 * <p>Each parallel instance of the source simulates {@value #SENSORS_PER_TASK} sensors and emits
 * one reading per sensor every {@value #EMIT_INTERVAL_MS} ms.
 *
 * <p>Note: This is a simple data-generating source function that does not checkpoint its state.
 * In case of a failure, the source does not replay any data.
 */
public class SensorSource extends RichParallelSourceFunction<SensorReading> {

    /** Number of sensors simulated by each parallel instance. */
    private static final int SENSORS_PER_TASK = 10;

    /** Pause between two rounds of emitted readings, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 3000L;

    /**
     * Flag indicating whether the source is still running.
     *
     * <p>{@code volatile} so that a {@link #cancel()} call made from another thread becomes
     * visible to the loop in {@link #run}.
     */
    private volatile boolean running = true;

    /**
     * Continuously emits SensorReadings through the SourceContext until the source is cancelled.
     *
     * @param srcCtx context used to emit the generated readings
     * @throws Exception if the emitting thread is interrupted while sleeping
     */
    @Override
    public void run(SourceContext<SensorReading> srcCtx) throws Exception {
        // initialize random number generator
        Random rand = new Random();
        // look up index of this parallel task; used to give each instance distinct sensor ids
        int taskIdx = this.getRuntimeContext().getIndexOfThisSubtask();

        // initialize sensor ids and starting temperatures (Gaussian around 65 with std-dev 20)
        String[] sensorIds = new String[SENSORS_PER_TASK];
        double[] curFTemp = new double[SENSORS_PER_TASK];
        for (int i = 0; i < SENSORS_PER_TASK; i++) {
            sensorIds[i] = "sensor_" + (taskIdx * SENSORS_PER_TASK + i);
            curFTemp[i] = 65 + (rand.nextGaussian() * 20);
        }

        while (running) {
            // all readings of one round share the same timestamp
            long curTime = System.currentTimeMillis();

            for (int i = 0; i < SENSORS_PER_TASK; i++) {
                // random-walk the current temperature by a small Gaussian step
                curFTemp[i] += rand.nextGaussian() * 0.5;
                srcCtx.collect(new SensorReading(sensorIds[i], curTime, curFTemp[i]));
            }

            // wait before emitting the next round
            Thread.sleep(EMIT_INTERVAL_MS);
        }
    }

    /** Cancels this SourceFunction by asking the emit loop in {@link #run} to stop. */
    @Override
    public void cancel() {
        this.running = false;
    }
}
程序运行结果: