- 新建类
package test01;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
public class TestConnection {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
executionEnvironment.setParallelism(1);
//Connect可以将不同数据类型的流进行合并,但形成的是ConnectedStream,并不是DataStream,也就是说对外是一个整体的合并后的流,但其实内部是各自处理各自的数据。
//创建流1:数字流,但是由于我们输入时是字符串,所以这里需要将字符串进行类型转换,转换为数值类型的.
SingleOutputStreamOperator<Integer> dataSource = executionEnvironment
.socketTextStream("localhost", 7777)
.map(value -> Integer.parseInt(value));
//创建流2:字符串流
DataStreamSource<String> stringSource = executionEnvironment.socketTextStream("localhost", 8888);
//合并流,与union不同的是,union可以在一个source的后面多次调用union()合并多个stream,但是在connect中只能单次调用connect()进行合并
ConnectedStreams<Integer, String> connect = dataSource.connect(stringSource);
/**
* 注意ConnectedStreams中没有print(),有map()、process()等方法用来对合并后的流中得到不同类型流进行分别处理.
* 这里使用map(),CoMapFunction的参数一指的是调用connect()方法的数据流类型,参数二指的是被调用的数据流类型,
* 也就是connect()括号中的数据流类型,参数三是最终合并后的数据流类型,可以看到参数一和参数二已经根据前面我们调用connect时的两个数据流类型
* 自动帮我们获取到了数据类型,参数三初始是Object类型,这里我们想要使合并后的数据流类型变成String类型,所以参数三设置为String。
*/
SingleOutputStreamOperator<String> outputStream = connect.map(new CoMapFunction<Integer, String, String>() {
//重写map1()和map2(),map1()指的就是参数一对应的数据流,map2()指的是参数二对应的数据流
@Override
public String map1(Integer integer) throws Exception {
//在map1()方法中对数据进行处理,使之返回值为String
return "原始的数值流:" + integer.toString();
}
@Override
public String map2(String s) throws Exception {
return "原始的字符串流:" + s;
}
});
outputStream.print();
executionEnvironment.execute();
}
}
- 启动两个窗口
- 启动程序
此时在窗口中输入数据,注意在7777端要输入数字,8888端输入字符串,然后观察控制台输出。