Flink---5、聚合算子、用户自定义函数、物理分区算子、分流、合流

news2025/1/22 12:30:08

在这里插入图片描述
                       星光下的赶路人star的个人主页

                      欲买桂花同载酒,终不似,少年游

文章目录

  • 1、 聚合算子
    • 1.1 按键分区(KeyBy)
    • 1.2 简单聚合(Sum/Min/MinBy/MaxBy)
    • 1.3 归约聚合(Reduce)
  • 2、用户自定义函数(UDF)
    • 2.1 函数类(Function Classes)
    • 2.2 富函数类(Rich Function Classes)
  • 3、物理分区算子(Physical Partitioning)
    • 3.1 随机分区(Shuffle)
    • 3.2 轮询分区(Round-Robin)
    • 3.3 重缩放分区(rescale)
    • 3.4 广播(BroadCast)
    • 3.5 全局分区(Global)
    • 3.6 自定义分区(Custom)
  • 4、分流
    • 4.1 简单实现
    • 4.2 使用测输出流
  • 5、基本合流操作
    • 5.1 联合(union)
    • 5.2 连接(Connect)

1、 聚合算子

计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并—这就是所谓的“聚合”(Aggregation),类似于MapReduce中的Reduce操作。

1.1 按键分区(KeyBy)

对于Flink而言,DataStream是没有直接进行聚合的API的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在Flink中,要做聚合,需要先进行分区;这个操作就是通过KeyBy来完成的。

KeyBy是聚合前必须用到的一个算子。KeyBy通过指定键(key),可以将一条流从逻辑上划分为不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。

基于不同的key,流中的数据将被分配到不同的分区中;这样一来,所有具有相同的key的数据,都将被发往同一个分区。
在这里插入图片描述
在内部,是通过计算key的哈希值(hash code),对分区进行取模运算来实现的,所以这里key如果是POJO的话,必须重写hashCode方法。
keyBy()方法需要传入一个参数,这个参数指定了一个或一组key。有很多不同的方法来指定key;比如Tuple数据类型,可以指定字段的位置或多个位置的组合;对于POJO类型,可以指定字段的名称(String);另外,还可以传入Lambda表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取key的逻辑。

public class Demo01_CommonAgg {
    public static void main(String[] args) throws Exception {
        //获取Flink环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //获取数据源
        DataStreamSource<String> ds = env.socketTextStream("hadoop102", 9999);

        /**
         * SingleOutputStreamOperator:这个算子是不具有聚合功能的
         */
        //一对一映射为WaterSensor对象
        SingleOutputStreamOperator<WaterSensor> map = ds.map(new WaterSensorFunction());

        //KeyedStream:有聚合功能
        KeyedStream<WaterSensor, String> keyBy = map.keyBy(WaterSensor::getId);

        /**
         * 统计每一种传感器的vc之和
         *
         * sum|min|max:只求和指定列,除了聚合的列和分组的列,其他列取的是当前分组第一条数据的属性
         *
         * minBy|maxBy(列):取最大或最小一条数据的所有属性
         * minBy|maxBy(列,false):取最大或最小一条,最新数据的所以属性
         */

//        keyBy.sum("vc")
//                .print();
        keyBy.minBy("vc",false)
                .print();

        env.execute();
    }
}

运行截图
在这里插入图片描述

1.2 简单聚合(Sum/Min/MinBy/MaxBy)

有了按键分区的数据流KeyedStream,我们可以就可以给予它进行聚合操作了。Flink为我们内置实现了一些最基本、最简单的聚合API,主要有以下几种:

  • sum():在输入流上,对指定的字段做叠加求和的操作。
  • min():在输入流上,对指定的字段求最小值
  • max():在输入流上,对指定的字段求最大值
  • minBy():与min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而minBy()则会返回包含字段最小值的整条数据
  • maxBy():与max()类似,在输入流上针对指定字段求最大值。两者区别与min()和MinBy()的区别一样。

简单聚合算子使用很方便,语义也非常明确。这些聚合方法调用时,也需要传入参数;但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定字段就可以了。指定字段的方式有两种:指定位置和指定名称

对于元组类型的数据,可以使用这两种方式来指定字段。需要注意的是,元组中字段的名称是以f0、f1、f2、…来命名的。

如果数据流的类型是POJO类,那么就只能通过指定字段名称来指定,不能通过位置来指定了。

例子从上图看!!!

简单聚合算子返回的同样是一个SingleOutputStreamOperator,也就是从KeyedStream又转换成了常规的DataStream。所以可以这样理解:keyBy和聚合是成对出现的,先分区、后聚合,得到的依然是一个DataStream。而且经过简单聚合之后的数据流,元素的数据类型保持不变。

一个聚合算子,会为每一个key保存一个聚合的值,在Flink中我们把它叫作“状态”(state)。所以每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新聚合值的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的,所以我们使用聚合算子,一个只用在含有有限个key的数据流上。

1.3 归约聚合(Reduce)

Reduce可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值再做一个聚合计算。
Reduce操作也会将KeyedStream转换为DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。
调用KeyedStream的Reduce方法时,需要传入一个参数,实现ReduceFunction接口。接口在源码中的定义如下:

public interface ReduceFunction<T> extends Function, Serializable {
    T reduce(T value1, T value2) throws Exception;
}

ReduceFunction接口里需要实现Reduce()方法,这个方法接受两个输入事件,经过转换处理之后输出一个相同类型的事件。在流处理的底层实现过程中,实际上是将中间“合并的结果”作为任务的一个状态保存起来;之后每来一个新的数据,就和之前的聚合状态进一步做归约。
我们可以单独定义一个函数类实现ReduceFunction接口,也可以直接传入一个匿名类。当然,同样也可以通过传入Lambda表达式实现类似的功能。

定义一个WaterSensorMapFunction:

public class WaterSensorMapFunction implements MapFunction<String, WaterSensor> {

    @Override
    public WaterSensor map(String s) throws Exception {
        String[] fileds = s.split(",");
        return new WaterSensor(fileds[0],Long.valueOf(fileds[1]),Integer.valueOf(fileds[2]));
    }
}

案例:使用Reduce实现取最小值功能:

/**
 * 规律:无KeyBy(分组),无聚合
 *
 * reduce(ReduceFunction x):
 *          ReduceFunction的逻辑由用户自己编写,灵活点
 *              特点:
 *                  输入的类型和输出的类型必须是一致的
 *                  两两聚合。每两条数据,执行一次聚合
 */
public class Demo02_Reduce {
    public static void main(String[] args) throws Exception {
        //获取Flink环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //获取数据源
        DataStreamSource<String> ds = env.socketTextStream("hadoop102", 9999);
 //设置并行度是1
        env.setParallelism(1);
        /**
         * SingleOutputStreamOperator:这个算子是不具有聚合功能的
         */
        //一对一映射为WaterSensor对象
        SingleOutputStreamOperator<WaterSensor> map = ds.map(new WaterSensorMapFunction());

        //KeyedStream:有聚合功能
        KeyedStream<WaterSensor, String> keyBy = map.keyBy(WaterSensor::getId);

        /**
         * 如果是第一条数据的话是不走方法的,而是直接将数据储存在value1中
         *
         * 后面的每个数据都会走这个方法,然后返回值会重新写入到value1中
         */
        keyBy.reduce(new ReduceFunction<WaterSensor>() {
            /**
             * value1:上一次计算的结果
             * value2:当前最新到达的数据
             */
            @Override
            public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                //实现minBy(vc)的效果
                // if (value1.getVc() <= value2.getVc()){
                //实现minBy(vc,false)的效果
                if (value1.getVc()<value2.getVc()){
                    return value1;
                }else {
                    return value2;
                }

            }
        })
                .print();

        env.execute();
    }
}

运行截图:
在这里插入图片描述
Reduce和简单聚合算子一样,也要针对每一个key保存状态。因为状态不会清空,所以我们需要将Reduce算子作用在一个有限key的流上。

2、用户自定义函数(UDF)

用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。

用户自定义函数分为:函数类、匿名函数、富函数类。

2.1 函数类(Function Classes)

Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。

需求:用来从用户的点击数据中筛选包含“sensor_1”的内容:

方式一:实现FilterFunction接口

public class TransFunctionUDF {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> stream = env.fromElements(
                
new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3)
        );
       
        DataStream<String> filter = stream.filter(new UserFilter());
      
        filter.print();
        env.execute();
    }

    public static class UserFilter implements FilterFunction<WaterSensor> {
        @Override
        public boolean filter(WaterSensor e) throws Exception {
            return e.id.equals("sensor_1");
        }
    }
}

方式二:通过匿名类来实现FilterFunction接口:

DataStream<String> stream = stream.filter(new FilterFunction< WaterSensor>() {
    @Override
    public boolean filter(WaterSensor e) throws Exception {
        return e.id.equals("sensor_1");
    }
});

方法二的优化:为了类可以更加通用,我们还可以将用于过滤的关键字"sensor_1"抽象出来作为类的属性,调用构造方法时传进去。

DataStreamSource<WaterSensor> stream = env.fromElements(        
new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3)
);

DataStream<String> stream = stream.filter(new FilterFunctionImpl("sensor_1"));

public static class FilterFunctionImpl implements FilterFunction<WaterSensor> {
    private String id;

    FilterFunctionImpl(String id) { this.id=id; }

    @Override
    public boolean filter(WaterSensor value) throws Exception {
        return thid.id.equals(value.id);
    }
}

方式三:采用匿名函数(Lambda)

public class TransFunctionUDF {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> stream = env.fromElements(
                
new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3)
        );    

        //map函数使用Lambda表达式,不需要进行类型声明
        SingleOutputStreamOperator<String> filter = stream.filter(sensor -> "sensor_1".equals(sensor.id));

        filter.print();

        env.execute();
    }
}

2.2 富函数类(Rich Function Classes)

“富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。

与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

Rich Function有生命周期的概念。典型的生命周期方法有:

  • open()方法:是Rich Function的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。
  • close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。

需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map,在每条数据来到后都会触发一次调用。

来看一个例子说明:

public class RichFunctionExample {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env
                .fromElements(1,2,3,4)
                .map(new RichMapFunction<Integer, Integer>() {
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");
                    }

                    @Override
                    public Integer map(Integer integer) throws Exception {
                        return integer + 1;
                    }

                    @Override
                    public void close() throws Exception {
                        super.close();
                        System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");
                    }
                })
                .print();

        env.execute();
    }
}

运行截图:
在这里插入图片描述

3、物理分区算子(Physical Partitioning)

常见的物理分区策略有:随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播

3.1 随机分区(Shuffle)

最简单的重分区方式就是直接“洗牌”。通过调用DataStream的.shuffle()方法,将数据随机地分配到下游算子的并行任务中去。

随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区。因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同。

在这里插入图片描述
经过随机分区之后,得到的依然是一个DataStream。
我们可以做个简单测试:将数据读入之后直接打印到控制台,将输出的并行度设置为2,中间经历一次shuffle。执行多次,观察结果是否相同。

public class ShuffleExample {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2);

        DataStreamSource<String> stream = env.socketTextStream("hadoop102", 9999);;

        stream.shuffle().print();

        env.execute();
    }
}

第一次运行截图:
在这里插入图片描述
第二次运行截图:
在这里插入图片描述

3.2 轮询分区(Round-Robin)

轮询,简单来说就是“发牌”,按照先后顺序将数据做依次分发。通过调用DataStream的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是Round-Robin负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。

在这里插入图片描述

3.3 重缩放分区(rescale)

重缩放分区和轮询分区非常相似。当调用rescale()方法时,其实底层也是使用Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中。rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。
在这里插入图片描述

3.4 广播(BroadCast)

这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用DataStream的broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。

stream.broadcast()

3.5 全局分区(Global)

全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去(0号)。这就相当于强行让下游任务并行度变成了1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。

3.6 自定义分区(Custom)

当Flink提供的所有分区策略都不能满足用户的需求时,我们可以通过使用partitionCustom()方法来自定义分区策略。

1、自定义分区器

public class MyPartitioner implements Partitioner<String> {

    @Override
    public int partition(String key, int numPartitions) {
        return Integer.parseInt(key) % numPartitions;
    }
}

2、使用自定义分区

public class PartitionCustomDemo {
    public static void main(String[] args) throws Exception {
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        env.setParallelism(2);

        DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);

        DataStream<String> myDS = socketDS
                .partitionCustom(
                        new MyPartitioner(),
                        value -> value);
                

        myDS.print();

        env.execute();
    }
}

4、分流

所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

在这里插入图片描述

4.1 简单实现

其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了。

案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。

代码实现:

public class Demo01_FilterDivide {
    public static void main(String[] args) throws Exception {
        //创建Flink配置对象
        Configuration configuration = new Configuration();
        //修改Flink的webUI端口配置
        configuration.setInteger("rest.port",8888);
        //创建Flink环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

  //设置并行度是2
        env.setParallelism(2);
        //不能进行算子链的合并
        env.disableOperatorChaining();

        //获取网络传输数据源,并且映射为数字
        SingleOutputStreamOperator<Integer> map = env.socketTextStream("hadoop102", 9999)
                .name("s")
                .map(Integer::valueOf);

        //奇数一个流
        map.filter(s -> s % 2 == 1).print("奇数");
        //偶数一个流
        map.filter(s -> s % 2 == 0).print("偶数");

        //执行
        env.execute();
    }
}

运行截图:

这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流stream复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?
在这里插入图片描述

4.2 使用测输出流

简单来说,只需要调用上下文ctx的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),指定了侧输出流的id和类型。

代码实现:

public class SplitStreamByOutputTag {    
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<WaterSensor> ds = env.socketTextStream("hadoop102", 7777)
              .map(new WaterSensorMapFunction());


        OutputTag<WaterSensor> s1 = new OutputTag<>("s1", Types.POJO(WaterSensor.class)){};
        OutputTag<WaterSensor> s2 = new OutputTag<>("s2", Types.POJO(WaterSensor.class)){};
       //返回的都是主流
        SingleOutputStreamOperator<WaterSensor> ds1 = ds.process(new ProcessFunction<WaterSensor, WaterSensor>()
        {
            @Override
            public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {

                if ("s1".equals(value.getId())) {
                    ctx.output(s1, value);
                } else if ("s2".equals(value.getId())) {
                    ctx.output(s2, value);
                } else {
                    //主流
                    out.collect(value);
                }

            }
        });

        ds1.print("主流,非s1,s2的传感器");
        SideOutputDataStream<WaterSensor> s1DS = ds1.getSideOutput(s1);
        SideOutputDataStream<WaterSensor> s2DS = ds1.getSideOutput(s2);

        s1DS.printToErr("s1");
        s2DS.printToErr("s2");
        
        env.execute();
 
}
}

5、基本合流操作

在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以Flink中合流的操作会更加普遍,对应的API也更加丰富。

5.1 联合(union)

最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。
在这里插入图片描述
在代码中,我们只要基于DataStream直接调用.union()方法,传入其他DataStream作为参数,就可以实现流的联合了;得到的依然是一个DataStream:

stream1.union(stream2, stream3, ...)

注意:union()的参数可以是多个DataStream,所以联合操作可以实现多条流的合并。

代码实现:

public class Demo01_Union {
    public static void main(String[] args) throws Exception {
        //创建Flink配置对象
        Configuration configuration = new Configuration();
        //修改Flink的webUI端口配置
        configuration.setInteger("rest.port",8888);
        //创建Flink环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

	env.setParallelism(1);
        //创建四个流(前三个流的元素是一致类型)
        DataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<Integer> ds1 = env.fromElements(11, 12, 13, 14, 15);
        DataStreamSource<Integer> ds2 = env.fromElements(21, 22, 23, 24, 25);
        DataStreamSource<String> ds3 = env.fromElements("1","2","3");

        //合并为一个流 使用union算子的前提是流中的元素类型是一致的,要不然会报错导致用不了
        DataStream<Integer> unionDS = ds.union(ds1, ds2);
        unionDS.print();

        env.execute();
    }
}

运行截图:
在这里插入图片描述

5.2 连接(Connect)

流的联合虽然简单,不过受限于数据类型不能改变,灵活性大打折扣,所以实际应用较少出现。除了联合(union),Flink还提供了另外一种方便的合流操作——连接(connect)。

1、连接流(Connect)
在这里插入图片描述

代码实现:需要分为两步:首先基于一条DataStream调用.connect()方法,传入另外一条DataStream作为参数,将两条流连接起来,得到一个ConnectedStreams;然后再调用同处理方法得到DataStream。这里可以的调用的同处理方法有.map()/.flatMap(),以及.process()方法。

public class ConnectDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

//        DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);
//        DataStreamSource<String> source2 = env.fromElements("a", "b", "c");

        SingleOutputStreamOperator<Integer> source1 = env
                .socketTextStream("hadoop102", 9999)
                .map(i -> Integer.parseInt(i));

        DataStreamSource<String> source2 = env.socketTextStream("hadoop102", 9998);

        /**
         * TODO 使用 connect 合流
         * 1、一次只能连接 2条流
         * 2、流的数据类型可以不一样
         * 3、 连接后可以调用 map、flatmap、process来处理,但是各处理各的
         */
        ConnectedStreams<Integer, String> connect = source1.connect(source2);

        SingleOutputStreamOperator<String> result = connect.map(new CoMapFunction<Integer, String, String>() {
            @Override
            public String map1(Integer value) throws Exception {
                return "来源于数字流:" + value.toString();
            }

            @Override
            public String map2(String value) throws Exception {
                return "来源于字母流:" + value;
            }
        });

        result.print();

        env.execute();    }
}


运行截图:
在这里插入图片描述

上面的代码中,ConnectedStreams有两个类型参数,分别表示内部包含的两条流各自的数据类型;由于需要“一国两制”,因此调用.map()方法时传入的不再是一个简单的MapFunction,而是一个CoMapFunction,表示分别对两条流中的数据执行map操作。这个接口有三个类型参数,依次表示第一条流、第二条流,以及合并后的流中的数据类型。需要实现的方法也非常直白:.map1()就是对第一条流中数据的map操作,.map2()则是针对第二条流。

2、CoProcessFunction

与CoMapFunction类似,如果是调用.map()就需要传入一个CoMapFunction,需要实现map1()、map2()两个方法;而调用.process()时,传入的则是一个CoProcessFunction。它也是“处理函数”家族中的一员,用法非常相似。它需要实现的就是processElement1()、processElement2()两个方法,在每个数据到来时,会根据来源的流调用其中的一个方法进行处理。
值得一提的是,ConnectedStreams也可以直接调用.keyBy()进行按键分区的操作,得到的还是一个ConnectedStreams:

connectedStreams.keyBy(keySelector1, keySelector2);

这里传入两个参数keySelector1和keySelector2,是两条流中各自的键选择器;当然也可以直接传入键的位置值(keyPosition),或者键的字段名(field),这与普通的keyBy用法完全一致。ConnectedStreams进行keyBy操作,其实就是把两条流中key相同的数据放到了一起,然后针对来源的流再做各自处理,这在一些场景下非常有用。

案例需求:连接两条流,输出能根据id匹配上的数据(类似inner join效果)

public class ConnectKeybyDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(
                Tuple2.of(1, "a1"),
                Tuple2.of(1, "a2"),
                Tuple2.of(2, "b"),
                Tuple2.of(3, "c")
        );
        DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(
                Tuple3.of(1, "aa1", 1),
                Tuple3.of(1, "aa2", 2),
                Tuple3.of(2, "bb", 1),
                Tuple3.of(3, "cc", 1)
        );

        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);

        // 多并行度下,需要根据 关联条件 进行keyby,才能保证key相同的数据到一起去,才能匹配上
        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connectKey = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);

        SingleOutputStreamOperator<String> result = connectKey.process(
                new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {
                    // 定义 HashMap,缓存来过的数据,key=id,value=list<数据>
                    Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();
                    Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();

                    @Override
                    public void processElement1(Tuple2<Integer, String> value, Context ctx, Collector<String> out) throws Exception {
                        Integer id = value.f0;
                        // TODO 1.来过的s1数据,都存起来
                        if (!s1Cache.containsKey(id)) {
                            // 1.1 第一条数据,初始化 value的list,放入 hashmap
                            List<Tuple2<Integer, String>> s1Values = new ArrayList<>();
                            s1Values.add(value);
                            s1Cache.put(id, s1Values);
                        } else {
                            // 1.2 不是第一条,直接添加到 list中
                            s1Cache.get(id).add(value);
                        }

                        //TODO 2.根据id,查找s2的数据,只输出 匹配上 的数据
                        if (s2Cache.containsKey(id)) {
                            for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {
                                out.collect("s1:" + value + "<--------->s2:" + s2Element);
                            }
                        }
                    }

                    @Override
                    public void processElement2(Tuple3<Integer, String, Integer> value, Context ctx, Collector<String> out) throws Exception {
                        Integer id = value.f0;
                        // TODO 1.来过的s2数据,都存起来
                        if (!s2Cache.containsKey(id)) {
                            // 1.1 第一条数据,初始化 value的list,放入 hashmap
                            List<Tuple3<Integer, String, Integer>> s2Values = new ArrayList<>();
                            s2Values.add(value);
                            s2Cache.put(id, s2Values);
                        } else {
                            // 1.2 不是第一条,直接添加到 list中
                            s2Cache.get(id).add(value);
                        }

                        //TODO 2.根据id,查找s1的数据,只输出 匹配上 的数据
                        if (s1Cache.containsKey(id)) {
                            for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {
                                out.collect("s1:" + s1Element + "<--------->s2:" + value);
                            }
                        }
                    }
                });

        result.print();

        env.execute();
    }
}

运行截图:
在这里插入图片描述

在这里插入图片描述
                      您的支持是我创作的无限动力

在这里插入图片描述
                      希望我能为您的未来尽绵薄之力

在这里插入图片描述
                      如有错误,谢谢指正;若有收获,谢谢赞美

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1034373.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

redis实战-实现笔记点赞和点赞排行榜

发布探店笔记 探店笔记类似点评网站的评价&#xff0c;往往是图文结合。对应的表有两个&#xff1a; tb_blog&#xff1a;探店笔记表&#xff0c;包含笔记中的标题、文字、图片等 tb_blog_comments&#xff1a;其他用户对探店笔记的评价 保存笔记service层 Overridepublic Re…

实例讲解Spring boot动态切换数据源

前言 在公司的系统里&#xff0c;由于数据量较大&#xff0c;所以配置了多个数据源&#xff0c;它会根据用户所在的地区去查询那一个数据库&#xff0c;这样就产生了动态切换数据源的场景。 今天&#xff0c;就模拟一下在主库查询订单信息查询不到的时候&#xff0c;切换数据…

什么是UWB定位技术?UWB定位的应用场景及功能介绍

说到定位我们并不陌生&#xff0c;定位技术一直与我们的生活密不可分&#xff0c;比如最常见的车辆导航。 根据使用场景&#xff0c;定位技术分为室内定位和室外定位。 室外定位主要依靠GPS&#xff0c;北斗&#xff0c;GLONASS&#xff0c;伽利略等全球卫星定位导航系统。室内…

系统集成|第十一章(笔记)

目录 第十一章 项目人力资源管理11.1 项目人力资源管理的定义及有关概念11.2 主要过程11.2.1 编制项目人力资源管理计划11.2.2 组建项目团队11.2.3 建设项目团队11.2.4 管理项目团队 11.3 现代激励理论11.4 项目经理所需具备的影响力11.5 常见问题 上篇&#xff1a;第十章、质量…

有效保护敏感数据的最佳实践

在当今数据驱动的环境中&#xff0c;数据就是力量&#xff0c;组织仍然高度关注如何利用其数据进行 BI、分析和其他业务驱动计划。 事实上&#xff0c;最近的研究表明&#xff0c;数据领导者的主要动机是对高质量分析洞察的需求&#xff0c;而不是合规性。 然而&#xff0c;…

八大排序--------(五)堆排序

本专栏内容为&#xff1a;八大排序汇总 通过本专栏的深入学习&#xff0c;你可以了解并掌握八大排序以及相关的排序算法。 &#x1f493;博主csdn个人主页&#xff1a;小小unicorn ⏩专栏分类&#xff1a;八大排序汇总 &#x1f69a;代码仓库&#xff1a;小小unicorn的代码仓库…

如何防止商业秘密泄露(洞察眼MIT系统商业机密防泄密解决方案)

在当今的商业环境中&#xff0c;保护公司的商业秘密是至关重要的。商业秘密可能包括独特的业务流程、客户列表、研发成果、市场策略等&#xff0c;这些都是公司的核心竞争力。一旦这些信息被泄露&#xff0c;可能会对公司的生存和发展产生重大影响。本文将探讨如何通过使用洞察…

【数据结构--排序】堆排序

&#x1f490; &#x1f338; &#x1f337; &#x1f340; &#x1f339; &#x1f33b; &#x1f33a; &#x1f341; &#x1f343; &#x1f342; &#x1f33f; &#x1f344;&#x1f35d; &#x1f35b; &#x1f364; &#x1f4c3;个人主页 &#xff1a;阿然成长日记 …

Java 项目-基于 SpringBoot+Vue的疫情网课管理系统

文章目录 第一章 简介第二章 技术栈第三章 系统分析3.4.2学生用例 第四章 系统设计第五章 系统实现5.1学生功能模块5.2管理员功能模块5.3教师功能模块 六 源码咨询 第一章 简介 疫情网课也都将通过计算机进行整体智能化操作&#xff0c;实现的功能如下。 例如 管理员&#x…

如何监控公司电脑上网记录(员工上网行为监控软件有哪些?)

在当今数字化的世界中&#xff0c;互联网已经成为企业运营的重要组成部分。然而&#xff0c;随着这一转变&#xff0c;企业也面临着新的挑战&#xff0c;尤其是关于员工上网行为监控的问题。本文旨在解释公司上网行为监控的含义&#xff0c;重要性&#xff0c;实施方法以及最佳…

msvcp100.dll丢失原因,电脑出现msvcp100.dll丢失错误的解决方法

msvcp100.dll 是一个动态链接库文件&#xff0c;它包含了 C 运行时库的一些函数和类&#xff0c;例如全局对象、异常处理、内存管理、文件操作等。它是 Visual Studio 2010 及以上版本中的一部分&#xff0c;用于支持 C 应用程序的运行。如果 msvcp100.dll 丢失或损坏&#xff…

记一次linux下pip安装包时出错及奇怪的解决过程

一、问题说明 如图&#xff0c;在使用pip安装测速工具speedtest-cli时&#xff0c;终端提示“Externally managed environment &#xff08;从外部管理的环境&#xff09;”&#xff0c;导致无法安装该库。 二、问题解决 1 尝试提示的解决方案&#xff0c;改用命令apt inst…

C++【个人笔记1】

1.C的初识 1.1 简单入门 #include<iostream> using namespace std; int main() {cout << "hello world" << endl;return 0; } #include<iostream>; 预编译指令&#xff0c;引入头文件iostream.using namespace std; 使用标准命名空间cout …

qt 打印当前路径

//当前根目录qDebug()<< QDir::currentPath();//当前exe目录qDebug()<< QCoreApplication::applicationDirPath();//当前exe路径qDebug()<< QCoreApplication::applicationFilePath();分别输出&#xff1a;

文件储存平方根

任务描述 本关任务&#xff1a;给定程序中&#xff0c;函数fun的功能是将自然数1&#xff5e;10以及它们的平方根写到名为myfile3.txt的文本文件中&#xff0c;然后再顺序读出显示在屏幕上。请不要增行或删行&#xff0c;或更改程序的结构。 相关知识 相关知识略 编程要求 …

【Android Framework系列】第17章 Android Q沙箱模式(Scoped Storage)

1 背景 上一章节【Android Framework系列】第16章 存储访问框架 (SAF) 主要分析了Android4.4引入的存储访问框架&#xff08;SAF&#xff09;&#xff0c;本章节我们对Android10&#xff08;Q&#xff09;的存储相关进行分析&#xff0c;了解下其限制存储方式。 Google为了让…

oracle19c 集群部署的问题汇总

1、互信报错 处理过程 01、发现/etc/sysctl.conf中有net.ipv4.icmp_echo_ignore_all1配置&#xff0c;注释后发现还是无法通过 02、# cat /proc/sys/net/ipv4/icmp_echo_ignore_all发现返回1&#xff0c;说明还是禁ping&#xff0c;两个节点执行# echo 0 > /proc/sys/net…

dosbox调试模式下0000:0000地址中内容被修改的原因

跟着王爽老师学习汇编&#xff0c;执行以下指令时&#xff0c;发现自己手动算出来的和dosbox验证的不一致 dosbox用的是debug模式&#xff0c;确保了内存数据和指令都完全一致的情况下&#xff0c;逐步执行&#xff0c;发现写在0000:0000位置的内存数据在执行add命令的时候被修…

改写软件-怎么选择改写软件

什么是改写软件&#xff1f;改写软件是基于自然语言处理技术的工具&#xff0c;它们可以分析一段文字&#xff0c;并将其重新表达&#xff0c;以保持原始意义&#xff0c;但使用不同的词汇和结构。这种技术可用于减少内容的重复&#xff0c;增加多样性&#xff0c;或者简化复杂…

小说推文授权和短剧推广渠道的介绍怎么申请

新赛道小说推文和短剧推广怎么申请授权的&#xff1f; 可以通过”巨量推文“进行授权 只需要通过平台申请对应的推广信息&#xff08;短剧或小说&#xff09;通过后即可展开推广 具体的推广方式也都有资料