Flink standalone deployment
Java version: 1.8.0_45
Download Flink: https://archive.apache.org/dist/flink/flink-1.7.2/flink-1.7.2-bin-scala_2.11.tgz
Extract the package:
[root@vm-9f-mysteel-dc-ebc-test03 opt]# tar -zxvf flink-1.7.2-bin-scala_2.11.tgz
flink-1.7.2/
flink-1.7.2/opt/
flink-1.7.2/opt/flink-table_2.11-1.7.2.jar
flink-1.7.2/opt/flink-metrics-graphite-1.7.2.jar
flink-1.7.2/opt/flink-queryable-state-runtime_2.11-1.7.2.jar
flink-1.7.2/opt/flink-swift-fs-hadoop-1.7.2.jar
flink-1.7.2/opt/flink-metrics-statsd-1.7.2.jar
flink-1.7.2/opt/flink-ml_2.11-1.7.2.jar
flink-1.7.2/opt/flink-s3-fs-hadoop-1.7.2.jar
Go to the bin directory and start Flink:
[root@vm-9f-mysteel-dc-ebc-test03 bin]# ./start-cluster.sh
Open the web UI: http://192.168.201.143:8081
Processing socket stream data in real time
Run the following command to generate a Flink project from the quickstart archetype:
D:\apache-maven-3.6.0\bin\mvn.cmd archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.2
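Once the project is generated, import it into IntelliJ IDEA. The project structure is as follows:
Write the real-time processing class: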
package com.fen;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        // Check arguments: the job expects <hostname> <port> of the socket server
        if (args.length != 2) {
            System.err.println("USAGE:\nStreamingJob <hostname> <port>");
            return;
        }
        String hostname = args[0];
        int port = Integer.parseInt(args[1]);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Source: read newline-delimited text from the socket
        DataStreamSource<String> stream = env.socketTextStream(hostname, port);
        // Word count: split lines into (word, 1), key by the word (field 0), sum the counts (field 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);
        // Sink: print each updated (word, count) to the TaskManager's stdout
        sum.print();
        env.execute("Java WordCount from SocketTextStream Example");
    }

    // Lowercases a line, splits it on runs of non-word characters and emits (word, 1) per word
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            String[] tokens = s.toLowerCase().split("\\W+");
            for (String token : tokens) {
                // Skip empty tokens (e.g. when a line starts with punctuation)
                if (token.length() > 0) {
                    collector.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
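The tokenizing rule in LineSplitter can be checked without a cluster. The sketch below is a hypothetical helper (the class TokenizeDemo is not part of the generated project) that copies the same toLowerCase().split("\\W+") logic into plain Java. It shows why the token.length() > 0 check is there: a line that starts with punctuation, or an empty line, yields an empty token.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class: reproduces LineSplitter's tokenizing rule without Flink.
final class TokenizeDemo {

    // Same rule as LineSplitter.flatMap: lowercase, split on runs of non-word
    // characters, and drop empty tokens.
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                words.add(token);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // The raw split keeps an empty leading token when the line starts with a separator
        System.out.println(Arrays.toString(", Hello".toLowerCase().split("\\W+"))); // [, hello]
        System.out.println(tokenize("Hello, hello world!")); // [hello, hello, world]
        System.out.println(tokenize(", Hello"));             // [hello]
        // \W treats non-ASCII letters as separators, so Chinese text is dropped
        System.out.println(tokenize("\u4f60\u597d flink"));  // [flink]
    }
}
```

Because \W matches everything outside [a-zA-Z_0-9], this splitter discards non-ASCII words such as Chinese text; the last call in main shows this.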
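The source of this job is a socket and the sink is print(); it counts how many times each word appears in the data received in real time.
Package the project into a jar (for example with mvn clean package):
Upload the jar to the server where Flink is installed:
Install nc on Linux: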
[root@vm-9f-mysteel-dc-ebc-test03 bin]# yum install nc
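Use nc to start a socket server. Start it before submitting the job, because the Flink socket source connects to this server when the job starts: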
[root@vm-9f-mysteel-dc-ebc-test03 ~]# nc -l 127.0.0.1 8888
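If nc cannot be installed, a small Java program can stand in for nc -l 127.0.0.1 8888. This is a minimal sketch under that assumption (the class LineServer and its serve method are hypothetical, not part of Flink): it listens on 127.0.0.1, accepts a single client (the Flink socket source) and forwards each line typed on stdin, terminated with '\n', which is the default delimiter of socketTextStream(hostname, port).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for `nc -l 127.0.0.1 8888` (not part of Flink).
final class LineServer {

    // Accepts one client and forwards every line from `input` to it,
    // each terminated with '\n'.
    static void serve(ServerSocket server, BufferedReader input) throws IOException {
        try (Socket client = server.accept();
             Writer out = new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8)) {
            String line;
            while ((line = input.readLine()) != null) {
                out.write(line);
                out.write('\n');
                out.flush(); // push each line to the job immediately
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 8888;
        // Bind to 127.0.0.1 only, like the nc command; backlog of one pending client
        try (ServerSocket server = new ServerSocket(port, 1, InetAddress.getByName("127.0.0.1"));
             BufferedReader stdin = new BufferedReader(
                     new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            System.out.println("Listening on 127.0.0.1:" + port);
            serve(server, stdin);
        }
    }
}
```

Run it with java LineServer 8888 and then submit the job with the same host and port; lines typed into this terminal reach the job. Closing stdin (Ctrl+D) closes the connection.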
Submit the Flink job, passing the socket host and port as arguments:
[root@vm-9f-mysteel-dc-ebc-test03 bin]# ./flink run -c com.fen.StreamingJob ./com.fen-1.0-SNAPSHOT.jar 127.0.0.1 8888
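The streaming job then stays in the RUNNING state.
Next, let's see how the incoming data is counted: each word typed into the nc session is counted, and the updated totals are printed to the TaskManager's stdout (the .out file in Flink's log directory).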
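Note that keyBy(0).sum(1) does not wait for the stream to end: for every incoming (word, 1) record it emits the updated total for that word, so the printed counts for a word keep growing. The plain-Java sketch below illustrates this for two input lines; the class RunningWordCount is hypothetical and only imitates the operator's output, it is not Flink code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation (not Flink code) of what keyBy(0).sum(1) emits per record.
final class RunningWordCount {
    // Per-word running totals; Flink keeps the equivalent in keyed state.
    private final Map<String, Integer> totals = new HashMap<>();

    // Handles one (word, 1) record and returns the updated pair in Tuple2's
    // "(f0,f1)" print format; one result is emitted for every input record.
    String process(String word) {
        int count = totals.merge(word, 1, Integer::sum);
        return "(" + word + "," + count + ")";
    }

    public static void main(String[] args) {
        RunningWordCount counter = new RunningWordCount();
        List<String> emitted = new ArrayList<>();
        for (String line : new String[] {"flink spark", "flink"}) {
            for (String word : line.split("\\W+")) {
                emitted.add(counter.process(word));
            }
        }
        System.out.println(emitted); // [(flink,1), (spark,1), (flink,2)]
    }
}
```

In the real job each printed line may also carry a subtask prefix such as 1> when the sink runs with parallelism greater than 1.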
Stopping the Flink job
1. Cancel it from the web UI:
2. Cancel it from the command line:
[root@vm-9f-mysteel-dc-ebc-test03 bin]# ./flink list
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-1.7.2/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/apply/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Waiting for response...
------------------ Running/Restarting Jobs -------------------
13.12.2022 19:17:49 : c99f75881ce909ef625c8311e0f5e575 : Java WordCount from SocketTextStream Example (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
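To cancel from a script, the job ID has to be taken from the flink list output. A minimal sketch, assuming the "<date> <time> : <job ID> : <job name> (<STATE>)" line layout shown above (other Flink versions may print it differently); the class FlinkListParser is hypothetical. It only prints the matching cancel command rather than running it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts job IDs from `flink list` output lines such as
// "13.12.2022 19:17:49 : <32 hex chars> : <job name> (RUNNING)".
final class FlinkListParser {
    // Group 1 = job ID (32 lowercase hex chars), group 2 = job name, group 3 = state
    private static final Pattern JOB_LINE =
            Pattern.compile("^.+ : ([0-9a-f]{32}) : (.*) \\(([A-Z]+)\\)$");

    // Returns the IDs of all listed jobs in the given state; other lines are ignored.
    static List<String> jobIdsInState(List<String> lines, String state) {
        List<String> ids = new ArrayList<>();
        for (String line : lines) {
            Matcher m = JOB_LINE.matcher(line.trim());
            if (m.matches() && m.group(3).equals(state)) {
                ids.add(m.group(1));
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        List<String> output = Arrays.asList(
                "------------------ Running/Restarting Jobs -------------------",
                "13.12.2022 19:17:49 : c99f75881ce909ef625c8311e0f5e575 : Java WordCount from SocketTextStream Example (RUNNING)",
                "No scheduled jobs.");
        for (String id : jobIdsInState(output, "RUNNING")) {
            System.out.println("./flink cancel -m 127.0.0.1:8081 " + id);
        }
    }
}
```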
[root@vm-9f-mysteel-dc-ebc-test03 bin]# ./flink cancel -m 127.0.0.1:8081 c99f75881ce909ef625c8311e0f5e575
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-1.7.2/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/apply/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Cancelling job c99f75881ce909ef625c8311e0f5e575.
Cancelled job c99f75881ce909ef625c8311e0f5e575.