- 新建类
package test01;
import jdk.nashorn.internal.runtime.regexp.joni.Config;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TestUnion {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
executionEnvironment.setParallelism(1);
//Union合并流的流数据类型必须一致,如果不一致,需要在合并前先转换类型,使其一致
//创建要合并的流
DataStreamSource<String> stream1 = executionEnvironment.fromElements("a", "b", "c");
DataStreamSource<String> stream2 = executionEnvironment.fromElements("aa", "bb", "cc");
DataStreamSource<String> stream3 = executionEnvironment.fromElements("aaa", "bbb", "ccc");
//方式一:多次调用union
//DataStream<String> union = stream1.union(stream2).union(stream3);
//方式二:一次调用,中间用逗号隔开
DataStream<String> union = stream1.union(stream2, stream3);
union.print();
executionEnvironment.execute();
}
}
- 运行程序
方式一:
方式二: