文章目录
- 前言
- 一、flink 写kafka
-
- 1.第一种使用FlinkKafkaProducer API
- 2.第二种使用自定义序列化器
- 3.第三种使用FlinkKafkaProducer011 API
- 4.使用Kafka的Avro序列化 (没有使用过,感觉比较复杂)
- 5.第五种使用 (强烈推荐使用)
- 二、Flink读kafka
- 三、Flink写其他外部系统
前言
提示:这里主要总结在工作中使用到的和遇到到的问题:Java flink版本1.15+
一、flink 写kafka
1.第一种使用FlinkKafkaProducer API
// 假设有一个DataStream<String> named text
DataStream<String> text = env.fromElements("Hello", "World", "Flink", "Kafka");
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
"my-topic", // 目标Kafka topic
new SimpleStringSchema(), // 序列化schema
props, // 生产者配置
FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // 语义保证
// 添加sink
text.addSink(myProducer);
此方式flink1.14以上版本已经废弃,不建议使用
2.第二种使用自定义序列化器
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
public class CustomKafkaSerializationSchema implements KafkaSerializationSchema<JSONObject> {
private static final long serialVersionUID = 8497940668660042203L;
private String topic;
public CustomKafkaSerializationSchema(final String topic) {
this.topic = topic;
}
@Override
public ProducerRecord<byte[], byte[]> serialize(final JSONObject element, final Long timestamp) {
return new ProducerRecord<byte[], byte[]>(topic, element.toJSONString().getBytes());
}
}
在低版本的flink-connector-kafka中,不支持KafkaSerializationSchema
3.第三种使用FlinkKafkaProducer011 API
// 假设有一个DataStream<String>
DataStream<String> text = env.fromElements("Hello", "World", "Flink", "Kafka");
// Kafka 生产者配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 使用 FlinkKafkaProducer011 写入 Kafka
FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(
"my-topic", // 目标 Kafka topic
new SimpleStringSchema(), // 序列化 schema
props, // 生产者配置
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE); // 语义保证
// 添加 sink
text.addSink(myProducer