目录
一、执行环境
1.1 创建执行环境
1.2 执行模式
二、源算子
2.1 从集合中读取数据
2.2 从文件读取数据
2.3 从socket读取数据
2.4 从kafka读取数据
三、转换算子
3.1 基本转换算子
(1)映射(map)
(2)过滤(filter)
(3)扁平映射(flatMap)
3.2 聚合转换算子(Aggregation)
(1) 按键分区(keyBy)
(2) 简单聚合(sum/min/max/minBy/maxBy)
(3) 归约聚合(reduce)
3.3 用户自定义函数
3.4 物理分区算子
1.自定义分区:
2.随机分区:
四、输出算子
4.1 连接到外部系统
4.2 传输到文件
4.3 传输到kafka
4.4 传输到MySQL
4.5 自定义Sink输出
一、执行环境
1.1 创建执行环境
我们要获取的执行环境,是StreamExecutionEnvironment类的对象,这是所有Flink程序的基础。在代码中创建执行环境的方式,就是调用这个类的静态方法,具体有以下三种。
(1)getExecutionEnvironment
最简单的方式,就是直接调用getExecutionEnvironment方法。它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。
(2)createLocalEnvironment
这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数。
(3)createRemoteEnvironment
这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定要在集群中运行的Jar包。
1.2 执行模式
1)流执行模式(Streaming)
默认情况下,程序使用的就是Streaming执行模式。
2)批执行模式(Batch)
专门用于批处理的执行模式。
3) 自动模式(AutoMatic)
在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。
二、源算子
一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。
2.1 从集合中读取数据
fromCollection方法进行读取
java:
public class FlinkFromCollectionExample {
public static void main(String[] args) throws Exception {
// 创建流处理环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从集合中读取数据
List<String> data = Arrays.asList("element1", "element2", "element3");
DataStream<String> stream = env.fromCollection(data);
// 使用 MapFunction 对每个元素进行处理,这里我们简单地将其转换为大写
DataStream<String> upperCaseStream = stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) {
return value.toUpperCase();
}
});
// 打印结果到控制台
upperCaseStream.print();
// 执行作业
env.execute("Flink from collection example");
}
}
/**这个示例中,我们首先创建了一个流处理环境,然后使用 fromCollection 方法从 Java 的 List 中读取数据。接下来,我们使用 map 操作对读取的数据进行处理(这里简单地将每个元素转换为大写)。最后,我们打印结果到控制台并执行作业。*/
scala:
object SimpleFlinkApp {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建数据源,这里我们使用集合作为数据源
val data = env.fromElements(1, 2, 3, 4, 5)
// 定义一个简单的转换操作
val transformedData = data.map(x => x * 2)
// 定义一个简单的打印操作
transformedData.print()
// 开始执行作业
env.execute("Flink Scala WordCount Example")
}
}
/**这个示例展示了如何使用 Flink 的 DataStream API 从一个集合中读取数据,然后通过 map 转换操作将每个元素乘以2,最后使用 print 操作将结果打印到控制台。
注意:在实际应用中,你可能需要从外部系统(如数据库、消息队列等)读取数据,而不是从一个集合中读取。*/
2.2 从文件读取数据
readTextFile方法进行读取
java:
public class FileReadingExample {
public static void main(String[] args) throws Exception {
// 创建执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从文件中读取数据
DataStream<String> text = env.readTextFile(new Path("file:///path/to/your/file"));
// 定义一个简单的转换操作,将每行文本转换为单词元组
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
.keyBy(0)
.sum(1);
// 打印结果到控制台
counts.print();
// 开始执行作业
env.execute("Flink Java File Reading Example");
}
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// 按空格拆分每行文本,并将每个单词元组发送到下游操作符
String[] words = value.toLowerCase().split("\\W+");
for (String word : words) {
if (word.length() > 0) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
}
/**这个示例展示了如何使用 Flink 的 DataStream API 从文件中读取数据,并使用自定义的 Tokenizer 类将每行文本转换为单词元组。然后,使用 keyBy 和 sum 操作符对单词进行计数,并将结果打印到控制台。*/
scala:
object FileReadingExample {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 使用 TextInputFormat 从文件中读取数据
val text = env.readTextFile("path/to/your/file")
// 定义一个简单的转换操作
val transformedData = text.map(line => line.split(" ") match {
case Array(word, _*) => (word, 1)
})
// 定义一个简单的计数操作
val counts = transformedData.keyBy(_._1).sum(1)
// 定义一个简单的打印操作
counts.print()
// 开始执行作业
env.execute("Flink Scala File Reading Example")
}
}
/**在这个示例中,我们首先创建了一个 StreamExecutionEnvironment 对象,然后使用 readTextFile 方法从指定的文件路径读取数据。读取的数据是一个 DataStream[String],然后我们通过 map 转换操作将每一行数据拆分成单词并计数,最后通过 keyBy 和 sum 操作计算每个单词的出现次数。最后,我们使用 print 操作将结果打印到控制台。*/
2.3 从socket读取数据
socketTextStream方法进行读取 一般是用于测试
java:
DataStream<String> stream = env.socketTextStream("localhost", 7777);
scala:
val strem=env.socketTextStream("localhost",777)
2.4 从kafka读取数据
fromSource方法进行读取
java:
public class SourceKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("hadoop102:9092")
.setTopics("topic_1")
.setGroupId("iii")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");
stream.print("Kafka");
env.execute();
}
}
scala:
object SourceKafka {
def main(args: Array[String]): Unit = {
val env=StreamExecutionEnvironment.getExecutionEnvironment
val sourceKafka=KafkaSource.builder()
.setTopics("topic_1")
.setBootstrapServers("bigdata1:9092")
.setGroupId("iii")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build()
env.setParallelism(1)
val stream=env.fromSource(sourceKafka,WatermarkStrategy.noWatermarks(),"kafka_source")
stream.print("kafka")
env.execute()
}
}
三、转换算子
数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。
3.1 基本转换算子
(1)映射(map)
map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。
java:
public class TransMap {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<WaterSensor> stream = env.fromElements(
new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_2", 2, 2)
);
// 方式一:传入匿名类,实现MapFunction
stream.map(new MapFunction<WaterSensor, String>() {
@Override
public String map(WaterSensor e) throws Exception {
return e.id;
}
}).print();
// 方式二:传入MapFunction的实现类
// stream.map(new UserMap()).print();
env.execute();
}
public static class UserMap implements MapFunction<WaterSensor, String> {
@Override
public String map(WaterSensor e) throws Exception {
return e.id;
}
}
}
scala:
object SimpleMapExample {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建数据源
val text = env.fromElements("Hello, Flink", "Flink is powerful", "Stream processing at scale")
// 使用 map 操作转换数据
val mapped = text.map(word => word.toUpperCase)
// 打印结果到控制台
mapped.print()
// 执行任务
env.execute("Flink Scala Map Example")
}
}
(2)过滤(filter)
filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。
java:
public class TransFilter {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<WaterSensor> stream = env.fromElements(
new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3)
);
// 方式一:传入匿名类实现FilterFunction
stream.filter(new FilterFunction<WaterSensor>() {
@Override
public boolean filter(WaterSensor e) throws Exception {
return e.id.equals("sensor_1");
}
}).print();
// 方式二:传入FilterFunction实现类
// stream.filter(new UserFilter()).print();
env.execute();
}
public static class UserFilter implements FilterFunction<WaterSensor> {
@Override
public boolean filter(WaterSensor e) throws Exception {
return e.id.equals("sensor_1");
}
}
}
scala:
object FilterExample {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建数据源
val text = env.fromElements("Alice", "Bob", "Charlie", "David", "Eve")
// 定义过滤条件
def filterNameLength(name: String): Boolean = name.length > 3
// 使用 filter 函数进行过滤
val filtered = text.filter(filterNameLength)
// 打印过滤后的结果
filtered.print()
// 执行任务
env.execute("Flink Filter Example")
}
}
(3)扁平映射(flatMap)
将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生0到多个元素。flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。
java:
public class TransFlatmap {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<WaterSensor> stream = env.fromElements(
new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3)
);
stream.flatMap(new MyFlatMap()).print();
env.execute();
}
public static class MyFlatMap implements FlatMapFunction<WaterSensor, String> {
@Override
public void flatMap(WaterSensor value, Collector<String> out) throws Exception {
if (value.id.equals("sensor_1")) {
out.collect(String.valueOf(value.vc));
} else if (value.id.equals("sensor_2")) {
out.collect(String.valueOf(value.ts));
out.collect(String.valueOf(value.vc));
}
}
}
}
scala:
object FlatMapExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements("Hello, world!", "Flink flatMap example")
val flatMapped = text.flatMap { line =>
line.split("\\s+") // 将每行文本按空格拆分成单词
}
flatMapped.print() // 打印结果到控制台
env.execute("Flink flatMap example")
}
}
3.2 聚合转换算子(Aggregation)
计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),类似于MapReduce中的reduce操作。
(1) 按键分区(keyBy)
keyBy是聚合前必须要用到的一个算子。keyBy通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。
基于不同的key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key的数据,都将被发往同一个分区。
java:
public class TransKeyBy {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<WaterSensor> stream = env.fromElements(
new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3)
);
// 方式一:使用Lambda表达式
KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(e -> e.id);
// 方式二:使用匿名类实现KeySelector
KeyedStream<WaterSensor, String> keyedStream1 = stream.keyBy(new KeySelector<WaterSensor, String>() {
@Override
public String getKey(WaterSensor e) throws Exception {
return e.id;
}
});
env.execute();
}
}
scala:
object KeyByExample {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建数据流
val dataStream = env.fromElements(
("apple", 3),
("banana", 2),
("orange", 5),
("apple", 1),
("banana", 3),
("orange", 2)
)
// 使用 keyBy 对数据进行分组,这里按照水果名称进行分组
val keyedStream = dataStream.keyBy(0)
// 打印结果
keyedStream.print()
// 执行作业
env.execute("KeyBy Example")
}
}
(2) 简单聚合(sum/min/max/minBy/maxBy)
- sum():在输入流上,对指定的字段做叠加求和的操作。
- min():在输入流上,对指定的字段求最小值。
- max():在输入流上,对指定的字段求最大值。
- minBy():与min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而minBy()则会返回包含字段最小值的整条数据。
- maxBy():与max()类似,在输入流上针对指定字段求最大值。两者区别与min()/minBy()完全一致。
java:
public class SumMinMaxExample {
public static void main(String[] args) throws Exception {
// 创建流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<Integer, String>> stream = env.fromElements(
new Tuple2<>(1, "a"), new Tuple2<>(2, "b"), new Tuple2<>(3, "c"), new Tuple2<>(4, "d"), new Tuple2<>(5, "e")
); // 输入数据流
// 求和
DataStream<Tuple2<Integer, String>> sum = stream.map(new MapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
@Override
public Tuple2<Integer, String> map(Tuple2<Integer, String> value) throws Exception {
return new Tuple2<>(value.f0, value.f1, value.f0); // 将每个元素转换为三元组 (value, _, sum)
}
}); // 计算每个元素的和
sum.print(); // 打印结果
// 求最小值
DataStream<Tuple2<Integer, String>> min = stream.map(new MapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
@Override
public Tuple2<Integer, String> map(Tuple2<Integer, String> value) throws Exception {
return new Tuple2<>(value.f0, value.f1, value.f0); // 将每个元素转换为三元组 (value, _, min)
}
}); // 计算每个元素的最小值
min.print(); // 打印结果
// 求最大值
DataStream<Tuple2<Integer, String>> max = stream.map(new MapFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
@Override
public Tuple2<Integer, String> map(Tuple2<Integer, String> value) throws Exception {
return new Tuple2<>(value.f0, value.f1, value.f0); // 将每个元素转换为三元组 (value, _, max)
}
}); // 计算每个元素的最大值
max.print(); // 打印结果
// 按某个字段求最小值(例如按第二个字段)
DataStream<Tuple2<String, Integer>> minBySecondField = stream.map(new MapFunction<Tuple2<Integer, String>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(Tuple2<Integer, String> value) throws Exception {
return new Tuple2<>(value.f1, value.f0); // 将每个元素转换为二元组 (_, value)
}
}); // 按第二个字段求最小值(由于是二元组,此处不需进一步处理)
minBySecondField.print(); // 打印结果
// 按某个字段求最大值(例如按第二个字段)
DataStream<Tuple2<String, Integer>> maxBySecondField = stream.map(new MapFunction<Tuple2<Integer, String>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(Tuple2<Integer, String> value) throws Exception {
return new Tuple2<>(value.f1, value.f0); // 将每个元素转换为二元组 (_, value)
}
}); // 按第二个字段求最大值(由于是二元组,此处不需
scala:
object SumMinMaxExample {
def main(args: Array[String]): Unit = {
// 创建流处理环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements(
(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")
) // 输入数据流
// 求和
val sum = stream.map(x => (x._1, x._2, x._1)) // 将每个元素转换为三元组 (value, _, sum)
sum.print() // 打印结果
// 求最小值
val min = stream.map(x => (x._1, x._2, x._1)) // 将每个元素转换为三元组 (value, _, min)
min.print() // 打印结果
// 求最大值
val max = stream.map(x => (x._1, x._2, x._1)) // 将每个元素转换为三元组 (value, _, max)
max.print() // 打印结果
// 按某个字段求最小值(例如按第二个字段)
val minBySecondField = stream.map(x => (x._2, x._1)) // 将每个元素转换为二元组 (_, value)
minBySecondField.keyBy(0).minBy(1).print() // 按第二个字段求最小值并打印结果
// 按某个字段求最大值(例如按第二个字段)
val maxBySecondField = stream.map(x => (x._2, x._1)) // 将每个元素转换为二元组 (_, value)
maxBySecondField.keyBy(0).maxBy(1).print() // 按第二个字段求最大值并打印结果
// 启动流处理作业
env.execute("Sum, Min, Max and MinBy/MaxBy example")
}
}
(3) 归约聚合(reduce)
reduce可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。
reduce操作也会将KeyedStream转换为DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。
java:
public class ReduceExample {
public static void main(String[] args) throws Exception {
// 创建流处理环境
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 创建一个整数数据集
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
// 使用reduce()函数对数据集进行求和
DataSet<Integer> sum = numbers.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2) throws Exception {
return value1 + value2;
}
});
// 打印结果
sum.print();
// 执行流处理作业
env.execute("Reduce example");
}
}
scala:
object ReduceExample {
def main(args: Array[String]): Unit = {
// 创建流处理环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements(1, 2, 3, 4, 5) // 输入数据流
// 求和
val sum = stream.reduce(_ + _) // 使用 reduce 操作进行求和
sum.print() // 打印结果
// 启动流处理作业
env.execute("Reduce example")
}
}
3.3 用户自定义函数
3.4 物理分区算子
1.自定义分区:
使用用户定义的 Partitioner 为每个元素选择目标任务。
Java:
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
Scala
dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)
2.随机分区:
将元素随机地均匀划分到分区。
Java:
dataStream.shuffle();
Scala:
dataStream.shuffle()
四、输出算子
4.1 连接到外部系统
Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的。
stream.addSink(new SinkFunction(…));
addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。
Flink1.12开始,同样重构了Sink架构,
stream.sinkTo(…)
当然,Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。
4.2 传输到文件
FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。
FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:
- 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
- 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。
Java:
public class FileSinkExample {
public static void main(String[] args) throws Exception {
// 创建流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建数据源数据流,这里简单起见,我们直接创建一个包含字符串的流
DataStream<String> text = env.fromElements("Hello", "World", "Flink", "Streaming");
// 使用RichSinkFunction创建一个将数据写入文件的Sink
text.addSink(new RichSinkFunction<String>() {
private FileWriter fileWriter;
private BufferedWriter bufferedWriter;
@Override
public void open(Configuration parameters) throws IOException {
super.open(parameters);
fileWriter = new FileWriter("output.txt", true); // 第二个参数表示是否追加到文件末尾
bufferedWriter = new BufferedWriter(fileWriter);
}
@Override
public void invoke(String value, RuntimeContext runtimeContext) throws IOException {
bufferedWriter.write(value); // 将元素写入文件
bufferedWriter.newLine(); // 换行
}
@Override
public void close() throws IOException {
super.close();
bufferedWriter.close(); // 关闭写入器
fileWriter.close(); // 关闭文件写入器
}
});
// 执行任务
env.execute("File Sink Example");
}
}
scala:
object FileSinkExample {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = ExecutionEnvironment.getExecutionEnvironment
// 创建数据源
val data = env.fromElements("Hello, Flink!", "Goodbye, Flink!")
// 创建文件输出流
val output = data.writeAsText("/path/to/output/file")
// 执行任务
env.execute("File Sink Example")
}
}
4.3 传输到kafka
addSink 方法实现
java:
public class ProducerKafkaFlink {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//从kafka读取数据
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test-consumer-group");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("tuzisir", new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(myConsumer);
stream.print();
//将结果写到kafka
stream.addSink(new FlinkKafkaProducer<>(
"localhost:9092",
"student-write",
new SimpleStringSchema()
)).name("flink-connectors-kafka");
env.execute("write to kafka");
}
}
scala:
object KafkaExample {
def main(args: Array[String]): Unit = {
// 创建流处理环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建数据源
val input = env.fromElements("Hello", "World")
// 定义Kafka生产者配置
val kafkaProps = new Properties()
kafkaProps.setProperty("bootstrap.servers", "localhost:9092")
kafkaProps.setProperty("group.id", "test")
// 创建Kafka序列化器
val schema = new KafkaSerializationSchema[String](new SimpleStringSchema()) {
override def serialize(element: String, partitioner: Int): Array[Byte] = {
element.getBytes(StandardCharsets.UTF_8)
}
}
// 创建Kafka生产者并输出数据到Kafka主题
val kafkaProducer = new FlinkKafkaProducer[String]("my-topic", schema, kafkaProps)
input.addSink(kafkaProducer)
// 执行流处理任务
env.execute("Flink Kafka Example")
}
}
4.4 传输到MySQL
toAppendStream 方法实现
java:
public class FlinkToMySQLExample {
public static void main(String[] args) throws Exception {
// 设置执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.enableCatalogs(); // 启用 catalogs 支持
tableEnv.useCatalog("kafka"); // 使用名为 "kafka" 的 catalog
tableEnv.getCatalog("kafka").get().open(); // 打开 catalog 连接
tableEnv.executeSql("CREATE TABLE kafka_table (name STRING, age INT) WITH (...)"); // 创建 Kafka 表并指定连接参数(这里需要指定 Kafka 的连接参数)
tableEnv.executeSql("CREATE TABLE mysql_table (name STRING, age INT) WITH ('connector' = 'mysql', 'hostname' = 'localhost', 'database-name' = 'mydb', 'username' = 'root', 'password' = 'password')"); // 创建 MySQL 表并指定连接参数(这里需要指定 MySQL 的连接参数)
// 读取 Kafka 中的数据并插入到 MySQL 中
Table kafkaTable = tableEnv.sqlQuery("SELECT * FROM kafka_table"); // 从 Kafka 表中选择数据
tableEnv.toAppendStream(kafkaTable, Row.class).map(row -> row).addSink(Sinks.jdbc("INSERT INTO mysql_table VALUES (?, ?)", "name, age", new JdbcAppendStreamSinkFunction<>(new JdbcConnectionOptions("jdbc:mysql://localhost:3306/mydb", "root", "password")))); // 将数据插入到 MySQL 表中
env.execute(); // 执行 Flink 作业
}
}
scala:
object WriteToMySQL {
def main(args: Array[String]): Unit = {
// 设置执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
// 创建输入数据流
val inputStream = env.fromElements("John", "Anna", "Peter", "Linda")
// 使用简单的字符串格式化器
.map(new MapFunction[String, Row]() {
override def map(value: String): Row = {
val row = new Row(1)
row.setField(0, value)
row
}
})
// 注册为表进行查询操作
val table = tEnv.fromDataStream(inputStream, $"name")
tEnv.toAppendStream[Row](table, $"name") // 将表转换为流并输出名字字段,流中的每条记录都是一个名字。
// 写入 MySQL 数据库,此处以 localhost:3306/dbname 为例,请根据实际情况修改。
// 注意:MySQL JDBC URL 的格式为 jdbc:mysql://hostname:port/databaseName?characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&allowMultiQueries=true&useUnicode=true&autoReconnect=true&serverTimezone=UTC&allowMultiQueries=true&useUnicode=true&autoReconnect=true&allowPublicKeyRetrieval=true&allowMultiQueries=true&useUnicode=true&autoReconnect=true。其中hostname:port/databaseName为你实际的MySQL地址和数据库名。此处的例子只是为了演示。
// 注意:在生产环境中,需要配置好合适的异常处理和重试机制。本示例中未包含。
// 注意:此处的代码示例是简化的,只包含基本的写入操作,并未包含所有可能的错误处理和优化。在生产环境中,需要更全面的错误处理和优化策略。此处的代码仅供参考。
// 注意:在生产环境中,需要配置好合适的序列化和反序列化机制。本示例中未包含。
// 注意:在使用 JDBC 连接器时,需要考虑连接池的使用和资源的管理。本示例中未包含。
// 注意:在使用 JDBC 连接器时,需要考虑 SQL 注入攻击的风险。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含。本示例中未包含
4.5 自定义Sink输出
如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。
stream.addSink(new MySinkFunction<String>());
在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。