Flink的处理函数——processFunction

news2025/1/20 18:19:05

目录

一、处理函数概述       

二、Process函数分类——8个

(1)ProcessFunction

(2)KeyedProcessFunction

(3)ProcessWindowFunction

(4)ProcessAllWindowFunction

(5)CoProcessFunction

(6)ProcessJoinFunction

(7)BroadcastProcessFunction

(8)KeyedBroadcastProcessFunction

三、KeyedProcessFunction案例

1.运行processElement方法中的事件时间

(1)输入数据

2.运行processElement方法中的处理时间

(1)先输入一条数据

(2)再快速输入两条数据

3.运行processElement方法中的水位线

(1)输入数据

4.总结:

四、应用案例——求TopN

(一)思路一:hashmap

(二)思路二:keyby

(三)思路三:使用侧输出流——推荐


一、处理函数概述       

        在Flink更底层,我们可以不定义任何具体的算子(比如map,filter,或者window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)

 

二、Process函数分类——8个

Flink提供了8个不同的处理函数:

(1)ProcessFunction

最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。

(2)KeyedProcessFunction

对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,比如基于KeyedStream。

(3)ProcessWindowFunction

开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。

(4)ProcessAllWindowFunction

同样是开窗之后的处理函数,基于AllWindowedStream调用.process()时作为参数传入。

(5)CoProcessFunction

合并(connect)两条流之后的处理函数,基于ConnectedStreams调用.process()时作为参数传入。关于流的连接合并操作,我们会在后续章节详细介绍。

(6)ProcessJoinFunction

间隔连接(interval join)两条流之后的处理函数,基于IntervalJoined调用.process()时作为参数传入。

(7)BroadcastProcessFunction

广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做连接(conncet)之后的产物。关于广播流的相关操作,我们会在后续章节详细介绍。

(8)KeyedBroadcastProcessFunction

按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与BroadcastProcessFunction不同的是,这时的广播连接流,是一个KeyedStream与广播流(BroadcastStream)做连接之后的产物。

三、KeyedProcessFunction案例

        基于keyBy之后的KeyedStream,直接调用.process()方法,这时需要传入的参数就是KeyedProcessFunction的实现类。

stream.keyBy( t -> t.f0 )
       .process(new MyKeyedProcessFunction())

        类似地,KeyedProcessFunction也是继承自AbstractRichFunction的一个抽象类,与ProcessFunction的定义几乎完全一样,区别只是在于类型参数多了一个K,这是当前按键分区的key的类型。同样地,我们必须实现一个.processElement()抽象方法,用来处理流中的每一个数据;另外还有一个非抽象方法.onTimer(),用来定义定时器触发时的回调操作。

代码示例:

import com.atguigu.bean.WaterSensor;
import com.atguigu.split.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class KeyedProcessTimerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node141", 9999)
                .map(new WaterSensorMapFunction());

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 乱序
                // 提取watermark的时间戳
                .withTimestampAssigner((element, recordTimestamp) ->
                        element.getTs() * 1000L
                );

        SingleOutputStreamOperator<WaterSensor> sensorDSWithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);

        KeyedStream<WaterSensor, String> sensorKS = sensorDSWithWatermark.keyBy(WaterSensor::getId);
        // key    输入的类型     输出的类型
        SingleOutputStreamOperator<String> process = sensorKS.process(
                new KeyedProcessFunction<String, WaterSensor, String>() {
                    /**
                     * 来一条数据调用一次
                     * @param value
                     * @param ctx
                     * @param out
                     * @throws Exception
                     */
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        //获取当前数据的key
                        String currentKey = ctx.getCurrentKey();

                        // TODO 1.定时器注册
                        TimerService timerService = ctx.timerService();

                        // 1、事件时间的案例
//                        Long currentEventTime = ctx.timestamp(); // 数据中提取出来的事件时间
//                        timerService.registerEventTimeTimer(5000L);
//                        System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器");

                        // 2、处理时间的案例
                        long currentTs = timerService.currentProcessingTime();
                        timerService.registerProcessingTimeTimer(currentTs + 5000L);
                        System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器");

                        // 3、获取 process的 当前watermark
//                        long currentWatermark = timerService.currentWatermark();
//                        System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark);


                        // 注册定时器: 处理时间、事件时间
//                        timerService.registerProcessingTimeTimer();
//                        timerService.registerEventTimeTimer();

                        // 删除定时器: 处理时间、事件时间
//                        timerService.deleteEventTimeTimer();
//                        timerService.deleteProcessingTimeTimer();

                        // 获取当前时间进展: 处理时间-当前系统时间,  事件时间-当前watermark
//                        long currentTs = timerService.currentProcessingTime();
//                        long wm = timerService.currentWatermark();
                    }

                    /**
                     * TODO 2.时间进展到定时器注册的时间,调用该方法
                     * @param timestamp 当前时间进展,就是定时器被触发时的时间
                     * @param ctx       上下文
                     * @param out       采集器
                     * @throws Exception
                     */
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        super.onTimer(timestamp, ctx, out);
                        String currentKey = ctx.getCurrentKey();

                        System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发");
                    }
                }
        );

        process.print();

        env.execute();
    }
}

1.运行processElement方法中的事件时间

// 1、事件时间的案例
Long currentEventTime = ctx.timestamp(); // 数据中提取出来的事件时间
timerService.registerEventTimeTimer(5000L);
System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器");
(1)输入数据
[root@node141 ~]# nc -lk 9999
s1,1,1
s1,2,2
s1,3,3
s2,4,4
s2,5,5
s3,9,9

运行结果:注册定时器后,5s后触发

 

2.运行processElement方法中的处理时间

// 2、处理时间的案例
long currentTs = timerService.currentProcessingTime();
timerService.registerProcessingTimeTimer(currentTs + 5000L);
System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器");
(1)先输入一条数据
[root@node141 ~]# nc -lk 9999
s1,1,1

运行结果:相差5s

(2)再快速输入两条数据
[root@node141 ~]# nc -lk 9999
s1,1,1
s1,2,2
s2,2,2

运行结果:相差5s,5s后定时器触发

3.运行processElement方法中的水位线

// 3、获取 process的 当前watermark
long currentWatermark = timerService.currentWatermark();
System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark);
(1)输入数据
[root@node141 ~]# nc -lk 9999
s1,1,1
s1,5,5
s1,9,9

运行结果:

数据经过map算子,然后再到process方法中。

        当第一条数据输入时,进入map时的watermark是1s-3s-1ms=-2001ms,数据随后进入process方法,调用了processElement方法,在processElement方法中获取当前的watermark,此时-2001ms这一watermark还没有进入process中,所以当前process的watermark是long的最小值;

        第一条数据处理完成后,第二条继续输入到map中,此时process中的watermark变为-2001ms,而map中的watermark是5s-3s-1ms=1999ms,同样,数据再进入process中,调用processElement方法,此时1999ms这一watermark仍然还没有进入process中,所以当前process中的watermark是之前的-2001ms;

       第二条数据处理完成后,第三条数据继续输入到map中,此时process中的watermark变为1999ms,而map中的watermark是9s-3s-1ms=5999ms,同样,数据再进入process中,调用processElement方法,此时5999ms这一watermark也没有来得及进入process中,所以当前process中的watermark是之前的1999ms。

4.总结:

TODO 定时器
1、keyed才有
2、事件时间定时器,通过watermark来触发的
      watermark >= 注册的时间
      注意: watermark = 当前最大事件时间 - 等待时间 -1ms, 因为 -1ms,所以会推迟一条数据
       比如, 5s的定时器,
       如果 等待=3s, watermark = 8s - 3s -1ms = 4999ms,不会触发5s的定时器
       需要 watermark = 9s -3s -1ms = 5999ms ,才能去触发 5s的定时器
3、在process中获取当前watermark,显示的是上一次的watermark
      ==> 因为process还没接收到这条数据对应生成的新watermark

四、应用案例——求TopN

(一)思路一:hashmap

使用所有数据到一起,用hashmap来存储,key=vc,value=count值

import com.atguigu.bean.WaterSensor;
import com.atguigu.split.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.*;

public class TopNDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node141", 9999)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 乱序
                                .withTimestampAssigner((element, recordTimestamp) ->
                                        element.getTs() * 1000L
                                ));

        // 最近10s=窗口长度  每5s输出=滑动步长
        // TODO 思路一:使用所有数据到一起,用hashmap来存储,key=vc,value=count值

        sensorDS.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .process(new MyTopNPawf())
                .print();

        env.execute();
    }

    public static class MyTopNPawf extends ProcessAllWindowFunction<WaterSensor, String, TimeWindow> {

        @Override
        public void process(
                ProcessAllWindowFunction<WaterSensor, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
            // 定义一个hashmap用来存,key=vc,value=count值
            Map<Integer, Integer> vcCountMap = new HashMap<>();
            // 1.遍历数据,统计各个vc出现的次数
            for (WaterSensor element : elements) {
                Integer vc = element.getVc();
                if (vcCountMap.containsKey(vc)) {
                    // 1.1 key存在,不是这个key的第一条数据,直接累加
                    vcCountMap.put(vc, vcCountMap.get(vc) + 1);
                } else {
                    // 1.2 如果key不存在,则初始化
                    vcCountMap.put(vc, 1);
                }
            }

            // 2.对count值进行排序,利用list来实现排序
            List<Tuple2<Integer, Integer>> datas = new ArrayList<>();
            // 2.1 对list进行排序,根据count值进行降序
            for (Integer vc : vcCountMap.keySet()) {
                datas.add(Tuple2.of(vc, vcCountMap.get(vc)));
            }
            // count值相减
            datas.sort((o1, o2) -> o2.f1 - o1.f1);

            // 3.取出count最大的2个vc
            StringBuffer outStr = new StringBuffer();

            outStr.append("\n=======================\n");
            for (int i = 0; i < Math.min(2, datas.size()); i++) {
                Tuple2<Integer, Integer> vcCount = datas.get(i);
                outStr.append("窗口开始时间:" + DateFormatUtils.format(context.window().getStart(), "yyyy-MM-dd HH:mm:ss.SSS"));
                outStr.append("\n");
                outStr.append("top:" + (i + 1));
                outStr.append("\n");
                outStr.append("vc=" + vcCount.f0);
                outStr.append("\n");
                outStr.append("count=" + vcCount.f1);
                outStr.append("\n");
                outStr.append("窗口结束时间:" + DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd HH:mm:ss.SSS\n"));
            }
            outStr.append("\n=======================\n");

            // 输出
            out.collect(outStr.toString());
        }
    }
}

输入数据:

[root@node141 ~]# nc -lk 9999
s1,1,1
s1,2,1
s1,3,3
s1,6,1
s1,8,2
s1,9,3
s1,10,2
s1,11,1
s1,13,2

运行结果:

 

思路一不推荐,因为要将数据攒到一起才会计算,效果不好。

(二)思路二:keyby

代码思路:

 

实现步骤:
1、按照vc做keyby,开窗,分别count
   ==》 增量聚合,计算 count
   ==》 全窗口,对计算结果 count值封装 ,  带上 窗口结束时间的 标签
         ==》 为了让同一个窗口时间范围的计算结果到一起去
2、对同一个窗口范围的count值进行处理: 排序、取前N个
   =》 按照 windowEnd做keyby
   =》 使用process, 来一条调用一次,需要先存,分开存,用HashMap进行存储,key=windowEnd,value=List
     =》 使用定时器,对 存起来的结果 进行 排序、取前N个

import com.atguigu.bean.WaterSensor;
import com.atguigu.split.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedProcessFunctionTopNDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node141", 9999)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 乱序
                                .withTimestampAssigner((element, recordTimestamp) ->
                                        element.getTs() * 1000L
                                ));

        // 最近10s=窗口长度  每5s输出=滑动步长
        // TODO 使用keyedProcessFunction实现

        // 1.按照vc分组、开窗、聚合(增量计算+全量打标签)
        // 开窗聚合后,就是普通的流,没有了窗口信息,需要自己打上窗口的标记 windowEnd
        SingleOutputStreamOperator<Tuple3<Integer, Integer, Long>> windowAgg = sensorDS
                .keyBy(WaterSensor::getVc)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(
                        new VcCountAgg(),
                        new WindowResult()
                );

        // 2.按照窗口标签(窗口结束时间)keyby,保证同一个窗口时间范围的结果,到一起去,排序、取TopN
        windowAgg
                .keyBy(r -> r.f2)
                .process(new TopN(2))
                .print();

        env.execute();
    }

    public static class VcCountAgg implements AggregateFunction<WaterSensor, Integer, Integer> {

        @Override
        public Integer createAccumulator() {
            return 0;
        }

        @Override
        public Integer add(WaterSensor value, Integer accumulator) {
            return accumulator + 1;
        }

        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            return null;
        }
    }

    /**
     * 泛型如下:
     * 第一个:输入类型=增量函数的输出 count值,Integer
     * 第二个:输出类型=Tuple3<vc,count,windowEnd>,带上 窗口结束时间的标签
     * 第三个:key的类型:vc,Integer
     * 第四个:窗口类型
     */
    public static class WindowResult extends ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow> {
        @Override
        public void process(Integer key, Context context, Iterable<Integer> elements, Collector<Tuple3<Integer, Integer, Long>> out) throws Exception {
            // 迭代器里面只有一条数据,next一次即可
            Integer count = elements.iterator().next();
            long windowEnd = context.window().getEnd();
            out.collect(Tuple3.of(key, count, windowEnd));
        }
    }

    /**
     * 泛型如下:
     * 第一个:key的类型,是windowEnd
     * 第二个:输入的类型,三元组Tuple3<vc,count,windowEnd>,带上 窗口结束时间的标签
     * 第三个:打印输出结果
     */
    public static class TopN extends KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String> {
        private Map<Long, List<Tuple3<Integer, Integer, Long>>> dataListMap;

        // 要取的top数量
        private int threshold;

        public TopN(int threshold) {
            this.threshold = threshold;
            dataListMap = new HashMap<>();
        }

        @Override
        public void processElement(Tuple3<Integer, Integer, Long> value, KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String>.Context ctx, Collector<String> out) throws Exception {
            // 进入这个方法,只是一条数据,要排序,必须等数据到齐才行,将不同窗口的数据用hashmap分开存起来,
            // todo  1.存到hashmap中  注意此时的key是窗口的结束时间
            Long windowEnd = value.f2;
            if (dataListMap.containsKey(windowEnd)) {
                // 1.1 包含vc,不是该vc的第一条,直接添加到list中
                List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.get(windowEnd);
                dataList.add(value);// 也就是说只需要把value的值添加到dataListMap中即可
            } else {
                // 1.2 不包含,是该vc的第一条,需要初始化list
                List<Tuple3<Integer, Integer, Long>> dataList = new ArrayList<>();
                dataList.add(value);
                dataListMap.put(windowEnd, dataList);
            }

            // todo 2.注册一个定时器,windowEnd+1ms即可
            // 同一个窗口范围,应该同时输出
            ctx.timerService().registerEventTimeTimer(windowEnd + 1);
        }

        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            // 定时器触发,同一个窗口范围的计算结果攒齐了,开始  排序、取TopN
            Long windowEnd = ctx.getCurrentKey();
            // 1.排序
            List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.get(windowEnd);
            dataList.sort((o1, o2) -> o2.f1 - o1.f1);

            // 2.取TopN
            StringBuffer outStr = new StringBuffer();

            outStr.append("\n=======================\n");
            for (int i = 0; i < Math.min(threshold, dataList.size()); i++) {
                Tuple3<Integer, Integer, Long> vcCount = dataList.get(i);
                outStr.append("top:" + (i + 1));
                outStr.append("\n");
                outStr.append("vc=" + vcCount.f0);
                outStr.append("\n");
                outStr.append("count=" + vcCount.f1);
                outStr.append("\n");
                outStr.append("窗口结束时间:" + DateFormatUtils.format(vcCount.f2, "yyyy-MM-dd HH:mm:ss.SSS" + "\n"));
                outStr.append("=======================\n");
            }

            // 用完的List,及时清理
            dataList.clear();

            // 输出
            out.collect(outStr.toString());
        }
    }
}

输入数据:

[root@node141 ~]# nc -lk 9999
s1,1,1
s1,2,2
s1,8,1
s1,10,1

运行结果:

(三)思路三:使用侧输出流——推荐

import com.atguigu.bean.WaterSensor;
import com.atguigu.split.WaterSensorMapFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

public class SideOutputDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("node141", 9999)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 乱序
                                .withTimestampAssigner((element, recordTimestamp) ->
                                        element.getTs() * 1000L
                                ));

        OutputTag<String> warnTag = new OutputTag<>("warn", Types.STRING);
        SingleOutputStreamOperator<WaterSensor> process = sensorDS.keyBy(WaterSensor::getId)
                .process(new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {
                    @Override
                    public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, WaterSensor>.Context ctx, Collector<WaterSensor> out) throws Exception {
                        // 使用侧输出流告警
                        if (value.getVc() > 10) {
                            ctx.output(warnTag, "当前水位=" + value.getVc() + ",大于阈值10!!!");
                        }

                        // 主流正常发送数据
                        out.collect(value);
                    }
                });
        process.print("主流");

        process.getSideOutput(warnTag).printToErr("warn");

        env.execute();
    }
}

输入数据:

[root@node141 ~]# nc -lk 9999
s1,1,5
s1,10,1
s1,6,11
s1,7,20

运行结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1064162.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CSDN Markdown

这里写自定义目录标题 欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants 创建一个自定义列表如何创建一个…

数据中台实战(05)-如何统一管理纷繁杂乱的数据指标?

各种类型的元数据有什么用&#xff1f;跟数据中台啥关系&#xff1f; 元数据在指标管理、模型设计、数据质量和成本治理四个领域都发挥作用&#xff0c;这些领域构成数据中台OneData 数据体系。今天逐一了解元数据在上述领域的应用 1 指标管理 指标&#xff0c;一种特定类型…

【Java项目推荐之黑马头条】你的发布文章业务是怎么实现的?

前言 在学习Java的路上还是遇到了很多不错的好项目的&#xff0c;今天分享给大家&#xff0c;希望能对大家面试有所帮助&#xff01; 后续会继续推荐其他好的项目&#xff0c;这次推荐的是B站开源的视频黑马头条项目&#xff0c;来吧学会它一起去虐面试官&#xff01;&#x…

MySQL:增量备份和恢复(5)

介绍 增量备份的特点 MySQL数据库二进制日志对备份的意义 增量备份的优点是没有重复数据&#xff0c;备份量不大&#xff0c;时间短。缺点也很明显&#xff0c;需要上次完全备份及完全备份之后所有的增量备份才能恢复&#xff0c;反推恢复&#xff0c;操作较为繁琐。 Mysql没有…

buildroot添加package包

本文通过一个简单的例子介绍如何在 RK3568的buildroot/package 目录下添加一个自己的 package&#xff08;软件包&#xff09; 一、开发源码工程 首先进入/app 目录下&#xff0c;在该目录下创建一个名为“mypackage”的文件夹&#xff0c;如下所示&#xff1a;   在 mypac…

LVGL_基础控件进度条bar

LVGL_基础控件进度条bar 1、创建进度条控件 // 创建一个 bar 组件(对象)&#xff0c;他的父对象是活动屏幕对象lv_obj_t *bar lv_bar_create(lv_scr_act()); LV_LOG_USER("lv_bar_get_value(bar) %d", lv_bar_get_value(bar));/* 设置位置 */ lv_obj_center(bar);…

Linux 安装字体

1.进入/usr/share/fonts路径&#xff0c;为了方便区分新安装的字体&#xff0c;最好单独创建文件夹 2.把需要安装的字体解压到自己创建的文件夹里面&#xff0c;这里要安装Hack 字体 3.输入以下三条命令如下图所示 ####三条命令要在字体路径下执行 sudo mkfontscale sudo mkfo…

【产品设计】如何开展你的B端产品需求调研

对于B端产品来说&#xff0c;需求调研是经常做而且很重要的一件事&#xff0c;只有对需求足够了解&#xff0c;才能设计出大众所喜欢的、市场所需要的好的产品。那么&#xff0c;应该如何开始你的需求调研呢&#xff1f;需求调研的执行过程又是怎样的&#xff1f;如何对结果进行…

快排(三种单趟排序法,递归非递归算法)

快排发明者:霍尔 (Sir Charles Antony Richard Hoare) 是一位英国计算机科学家。 计算机领域的爵士——托尼霍尔(Tony Hoare)(1934年1月11日出生),英文全称Sir Charles Antony Richard Hoare,常被称为Tony Hoare或者C. A. R. Hoare,1959年博士毕业于

Java方法:重复使用的操作可以写成方法哦

&#x1f451;专栏内容&#xff1a;Java⛪个人主页&#xff1a;子夜的星的主页&#x1f495;座右铭&#xff1a;前路未远&#xff0c;步履不停 目录 一、方法的概念1、什么是方法&#xff1f;2、方法的定义3、方法调用的过程 二、方法重载1、重载的概念2、方法签名 在日常生活中…

elementui修改message消息提示颜色

/* el弹出框样式 */ .el-message {top: 80px !important;border: 0; }.el-message * {color: var(--white) !important;font-weight: 600; }.el-message--success {background: var(--themeBackground); }.el-message--warning {background: var(--gradientBG); }.el-message--…

按键精灵调用大漠插件源码例子

源码名称&#xff1a;按键精灵调用大漠插件例子源码完整备注 源码名称&#xff1a;按键精灵调用大漠插件例子源码完整备注 蓝奏下载&#xff1a;https://wwi.lanzoup.com/iuffr0riiowf 飞书网盘&#xff1a;Docs

国庆作业 day 2

select实现服务器并发 #include<myhead.h> #define ERR_MSG(msg) do{\fprintf(stderr, "__%d__:", __LINE__); \perror(msg);\ }while(0)#define PORT 8888 //端口号&#xff0c;范围1024~49151 #define IP "192.168.0.103" //本…

VUE3照本宣科——package.json与vite.config.js

VUE3照本宣科——package.json与vite.config.js VUE3照本宣科系列导航 前言一、package.json1.name2.version3.private4.scripts5.dependencies6.devDependencies 二、vite.config.js1.plugins2.resolve.alias3.base4.mode 三、VUE3照本宣科系列总结 VUE3照本宣科系列导航 1.VU…

ZRTP交叉编译与移植

1 ZRTP源码下载 这里采用的是libzrtp来自于freeswitch&#xff1a;libs/libzrtp。 2 ZRTP交叉编译 zrtp编译比较简单&#xff0c;采用configure进行编译在根目录心中zrtp编译脚本&#xff0c;只需要指定交叉编译工具链和安装地址即可。脚本如下所示&#xff1a; unset CC C…

文心一言 VS 讯飞星火 VS chatgpt (107)-- 算法导论10.1 5题

五、用go语言&#xff0c;栈插入和删除元素只能在同一端进行&#xff0c;队列的插入操作和删除操作分别在两端进行&#xff0c;与它们不同的&#xff0c;有一种双端队列(deque)&#xff0c;其插入和删除操作都可以在两端进行。写出4个时间均为 O(1)的过程&#xff0c;分别实现在…

Python之字符串分割替换移除

Python之字符串分割替换移除 分割 split(sepNone, maxsplit-1) -> list of strings 从左至右sep 指定分割字符串&#xff0c;缺省的情况下空白字符串作为分隔符maxsplit 指定分割的次数&#xff0c;-1 表示遍历整个字符串立即返回列表 rsplit(sepNone, maxsplit-1) -> …

【熬夜爆肝版】JAVA基础入门专栏——1.JAVA开发入门

JAVA开发入门 1、Java概述1&#xff09;起源2&#xff09;特点3&#xff09;应用领域 2、JDK1&#xff09;定义2&#xff09;作用3&#xff09;组成4&#xff09;JDK版本与兼容性5&#xff09;JDK的安装与配置6&#xff09;JDK的发行版 3、系统环境变量1&#xff09;定义2&…

【Java项目推荐之黑马头条】你的登录鉴权业务是怎么实现的?

前言 在学习Java的路上还是遇到了很多不错的好项目的&#xff0c;今天分享给大家&#xff0c;希望能对大家面试有所帮助&#xff01; 后续会继续推荐其他好的项目&#xff0c;这次推荐的是B站开源的视频黑马头条项目&#xff0c;来吧学会它一起去虐面试官&#xff01;&#x…

【C语言初阶】初识C语言

目录 一、什么是C语言 二、第一个C语言程序 三、数据类型 类型的使用&#xff1a; 四、变量、常量 4.1 定义变量的方法 4.2 变量的命名 4.3 变量的分类 4.4 变量的使用 4.5 变量的作用域和生命周期 4.5.1 作用域 4.5.2 生命周期 4.6 常量 五、字符串转义字符注释 …