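I. Environment Preparation
1. Prepare two servers, server115 and server116, with Hadoop installed. Configure the HDFS NameNode on server115 and the HDFS SecondaryNameNode on server116. Also configure the YARN ResourceManager on server116. Then start the Hadoop cluster.
2. Configure the Hadoop environment variable. Flink on YARN needs the Hadoop classpath, so add the export line below to /etc/profile and reload it with source /etc/profile: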
vim /etc/profile
export HADOOP_CLASSPATH=`hadoop classpath`
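Before starting Flink, it can help to confirm that the variable actually reaches a JVM launched from the same shell. The class below is a hypothetical helper, not part of Hadoop or Flink. It assumes Java 11+ and simply splits HADOOP_CLASSPATH on the platform path separator.

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: checks that HADOOP_CLASSPATH is visible to a JVM started from this shell.
final class HadoopClasspathCheck {

    // Splits a classpath string on the platform separator (':' on Linux) and drops empty entries.
    static List<String> entries(String classpath) {
        if (classpath == null || classpath.isBlank()) {
            return List.of();
        }
        return Arrays.stream(classpath.split(File.pathSeparator))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> cp = entries(System.getenv("HADOOP_CLASSPATH"));
        if (cp.isEmpty()) {
            System.err.println("HADOOP_CLASSPATH is empty or unset; was /etc/profile sourced?");
        } else {
            System.out.println("HADOOP_CLASSPATH has " + cp.size() + " entries, e.g. " + cp.get(0));
        }
    }
}
```

Run it (for example with java HadoopClasspathCheck.java) as the same user that will later launch yarn-session.sh.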
II. Configure the Flink Cluster Environment
[Figure: Flink architecture diagram]
Deployment layout:
Server | server115 | server116 |
---|---|---|
Flink components | TaskManager | JobManager, TaskManager |
(1) On server115, edit Flink's configuration file conf/flink-conf.yaml:
# Host on which the JobManager runs.
jobmanager.rpc.address: server116
jobmanager.bind-host: server116
taskmanager.bind-host: server115
taskmanager.host: server115
rest.port: 8081
rest.address: server116
(2) On server116, edit Flink's configuration file conf/flink-conf.yaml:
# Host on which the JobManager runs.
jobmanager.rpc.address: server116
jobmanager.bind-host: server116
taskmanager.bind-host: server116
taskmanager.host: server116
rest.port: 8081
rest.address: server116
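A missing space after the colon (for example taskmanager.host:server116) is an easy mistake in this file. As far as we know, the legacy flink-conf.yaml loader does not use a full YAML parser. It strips # comments and splits each line on ": " (colon plus space), logging a warning and skipping lines it cannot split. The hypothetical class below is a simplified model of that rule, not Flink's actual GlobalConfiguration code. It shows how such a line is silently dropped.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the legacy flink-conf.yaml line format (not Flink's own code).
final class FlinkConfLineCheck {

    static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String line : lines) {
            String content = line.split("#", 2)[0].trim();   // drop trailing comments
            if (content.isEmpty()) {
                continue;                                     // blank or comment-only line
            }
            String[] kv = content.split(": ", 2);             // key and value must be separated by ": "
            if (kv.length < 2 || kv[0].trim().isEmpty() || kv[1].trim().isEmpty()) {
                System.err.println("Skipped (missing space after ':'?): " + content);
                continue;
            }
            conf.put(kv[0].trim(), kv[1].trim());
        }
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> conf = parse(List.of(
                "# Host on which the JobManager runs.",
                "jobmanager.rpc.address: server116",
                "taskmanager.host:server116",   // no space after ':' -> skipped
                "rest.port: 8081"));
        System.out.println(conf);
    }
}
```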
(3) On both server115 and server116, list the TaskManager hosts in conf/workers, one per line:
server115
server116
III. Start Flink
Start a Flink session on YARN in detached mode (-d):
./flink/bin/yarn-session.sh -d
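Once the session is up, the log prints the JobManager web address. In this run it was on server116 with port 40617, not the rest.port 8081 configured above, so use the address from the log:
JobManager Web Interface: http://server116:40617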
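To check from another machine that the session is reachable, you can query Flink's REST endpoint GET /overview, which returns a JSON summary of the cluster (TaskManagers, slots, running jobs). The class name and the defaults server116/40617 are assumptions taken from the output above. The sketch uses the JDK's java.net.http client (Java 11+) and just prints the status code and body.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical health check against the YARN session's REST endpoint.
final class SessionHealthCheck {

    // Builds the URI of Flink's cluster overview endpoint.
    static URI overviewUri(String host, int port) {
        return URI.create("http://" + host + ":" + port + "/overview");
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // Host and port come from the "JobManager Web Interface" log line; override via args.
        String host = args.length > 0 ? args[0] : "server116";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 40617;

        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(5))
                .build();
        HttpRequest request = HttpRequest.newBuilder(overviewUri(host, port)).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());

        // A 200 status with a JSON body indicates the JobManager is answering.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```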
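IV. Testing a Job
1. Start a TCP server on server116 that listens on port 9999. The job below reads from server116:9999: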
nc -l -p 9999
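If nc is not available, or its flags differ (BSD netcat, for example, does not take -p together with -l), a tiny Java server can stand in for it. The sketch below is a hypothetical LineServer. It accepts a single client, such as the job's socketTextStream, and forwards each line read from stdin, terminated with "\n".

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for `nc -l -p 9999`: serves one client, forwarding lines from `in`.
final class LineServer {

    // Accepts one connection and writes every line from `in` to it, then closes the connection.
    static void serveOnce(ServerSocket server, BufferedReader in) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                out.print(line + "\n");   // explicit '\n', the default delimiter of socketTextStream
                out.flush();              // send each line immediately
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9999;
        try (ServerSocket server = new ServerSocket(port)) {
            System.err.println("Listening on port " + port + "; type lines to send.");
            serveOnce(server, new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8)));
        }
    }
}
```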
2. Write the Java job that reads from the socket
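import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketFlinkExecute {
    public static void main(String[] args) throws Exception {
        // Connect to the YARN session's JobManager (host/port from the "JobManager Web Interface" line).
        // The job's jar must be shipped so the cluster can load MySplitter and the keyBy lambda;
        // "target/flink-socket-demo.jar" is a hypothetical path, replace it with your built jar.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "server116", 40617, "target/flink-socket-demo.jar");
        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("server116", 9999)                         // lines from the nc server
                .flatMap(new MySplitter())                                   // line -> (word, 1) pairs
                .keyBy(value -> value.f0)                                    // group by word
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))  // 5-second tumbling windows
                .sum(1);                                                     // add up the counts
        // print() runs on the cluster, so results go to the TaskManagers' stdout
        dataStream.print();
        env.execute("Window WordCount Remote");
    }

    // Splits each line on single spaces and emits (word, 1) for every token
    public static class MySplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}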
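To see what a single 5-second window should produce, the MySplitter, keyBy and sum(1) logic can be reproduced in plain Java without a cluster. The hypothetical WindowCountSketch below ignores time entirely and just counts a given batch of lines, so it is only an illustration. It also shows a side effect of split(" "): two consecutive spaces produce an empty "word".

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java illustration of one window's word count (no Flink dependency).
final class WindowCountSketch {

    // Counts words in the lines of one window, splitting exactly like MySplitter.
    static Map<String, Integer> countWindow(List<String> linesInWindow) {
        Map<String, Integer> counts = new TreeMap<>();   // sorted for readable output
        for (String line : linesInWindow) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);     // same effect as keyBy(word) + sum(1)
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWindow(List.of("hello flink", "hello yarn")));
        // prints {flink=1, hello=2, yarn=1}
        System.out.println(countWindow(List.of("a  b")));
        // prints {=1, a=1, b=1}  (the double space yields an empty word)
    }
}
```

If empty words are unwanted, filtering out empty tokens in MySplitter before calling out.collect would be one option.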
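Run the program, then type text into the TCP server session. Every 5 seconds the job emits a (word, count) pair for each word seen in that window. Because print() executes on the cluster, the results appear in the TaskManager stdout (Flink web UI under Task Managers, or the YARN container logs) rather than in the local console.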