6.2、Flink数据写入到Kafka

news2025/1/15 21:05:18

目录

1、添加POM依赖

2、API使用说明

3、序列化器

3.1 使用预定义的序列化器

3.2 使用自定义的序列化器

4、容错保证级别

4.1 至少一次 的配置

4.2 精确一次 的配置

5、这是一个完整的入门案例


1、添加POM依赖

Apache Flink 集成了通用的 Kafka 连接器,使用时需要根据生产环境的版本引入相应的依赖

<!-- 引入 kafka连接器依赖-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.17.1</version>
</dependency>

2、API使用说明

KafkaSink 可将数据流写入一个或多个 Kafka topic。

官网链接:官网链接

DataStream<String> stream = ...;
        
KafkaSink<String> sink = KafkaSink.<String>builder()  // 泛型为 输入输入的类型
        // TODO 必填项:配置 kafka 的地址和端口
        .setBootstrapServers(brokers)
        // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("topic-name")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
        )
        // TODO 必填项:配置容错保证级别 精准一次、至少一次、不做任何保证
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
        
stream.sinkTo(sink);

3、序列化器

序列化器的作用是将flink数据转换成 kafka的ProducerRecord

3.1 使用预定义的序列化器

功能:将 DataStream 数据转换为 Kafka消息中的value,key为默认值null,timestamp为默认值

// 初始化 KafkaSink 实例
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        // TODO 必填项:配置 kafka 的地址和端口
        .setBootstrapServers("worker01:9092")
        // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
        .setRecordSerializer(
                KafkaRecordSerializationSchema.<String>builder()
                        .setTopic("20230912")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
        )
        .build();

3.2 使用自定义的序列化器

功能:可以对 kafka消息的key、value、partition、timestamp进行赋值

/**
 * 如果要指定写入kafka的key,可以自定义序列化器:
 * 		1、实现 一个接口,重写 序列化 方法
 * 		2、指定key,转成 字节数组
 * 		3、指定value,转成 字节数组
 * 		4、返回一个 ProducerRecord对象,把key、value放进去
 */
// 初始化 KafkaSink 实例 (自定义 KafkaRecordSerializationSchema 实例)
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        // TODO 必填项:配置 kafka 的地址和端口
        .setBootstrapServers("worker01:9092")
        // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
        .setRecordSerializer(
                new KafkaRecordSerializationSchema<String>() {

                    @Nullable
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                        String[] datas = element.split(",");
                        byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                        byte[] value = element.getBytes(StandardCharsets.UTF_8);
                        Long currTimestamp = System.currentTimeMillis();
                        Integer partition = 0;
                        return new ProducerRecord<>("20230913", partition, currTimestamp, key, value);
                    }
                }
        )
        .build();

4、容错保证级别

KafkaSink 总共支持三种不同的语义保证(DeliveryGuarantee

  • DeliveryGuarantee.NONE   不提供任何保证
    • 消息有可能会因 Kafka broker 的原因发生丢失或因 Flink 的故障发生重复
  • DeliveryGuarantee.AT_LEAST_ONCE  至少一次
    • sink 在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。
    • 消息不会因 Kafka broker 端发生的事件而丢失,但可能会在 Flink 重启时重复,因为 Flink 会重新处理旧数据。
  • DeliveryGuarantee.EXACTLY_ONCE 精确一次
    • 该模式下,Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。
    • 因此,如果 consumer 只读取已提交的数据(参见 Kafka consumer 配置 isolation.level),在 Flink 发生重启时不会发生数据重复。
    • 然而这会使数据在 checkpoint 完成时才会可见,因此请按需调整 checkpoint 的间隔。
    • 请确认事务 ID 的前缀(transactionIdPrefix)对不同的应用是唯一的,以保证不同作业的事务 不会互相影响!此外,强烈建议将 Kafka 的事务超时时间调整至远大于 checkpoint 最大间隔 + 最大重启时间,否则 Kafka 对未提交事务的过期处理会导致数据丢失。

4.1 至少一次 的配置

DataStream<String> stream = ...;

// 初始化 KafkaSink 实例
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        // TODO 必填项:配置 kafka 的地址和端口
        .setBootstrapServers("worker01:9092")
        // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
        .setRecordSerializer(
                KafkaRecordSerializationSchema.<String>builder()
                        .setTopic("20230912")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
        )
        // TODO 必填项:配置容灾保证级别设置为 至少一次
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

stream.sinkTo(sink);

4.2 精确一次 的配置

// 如果是精准一次,必须开启checkpoint
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

DataStream<String> stream = ...;
        
KafkaSink<String> sink = KafkaSink.<String>builder()  // 泛型为 输入输入的类型
        // TODO 必填项:配置 kafka 的地址和端口
        .setBootstrapServers(brokers)
        // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("topic-name")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
        )
        // TODO 必填项:配置容灾保证级别设置为 精准一次
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // 如果是精准一次,必须设置 事务的前缀
        .setTransactionalIdPrefix("flink-")
        // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
        .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "6000")
        .build();
        
stream.sinkTo(sink);

5、这是一个完整的入门案例

需求:Flink实时读取 socket数据源,将读取到的数据写入到Kafka (要保证不丢失,不重复)

开发语言:java1.8

flink版本:flink1.17.0

package com.baidu.datastream.sink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;

// TODO flink 数据输出到kafka
public class SinkKafka {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // 如果是精准一次,必须开启checkpoint
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        // 2.指定数据源
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);

        // 3.初始化 KafkaSink 实例
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                // TODO 必填项:配置 kafka 的地址和端口
                .setBootstrapServers("worker01:9092")
                // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("20230912")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // TODO 必填项:配置容灾保证级别设置为 精准一次
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // 如果是精准一次,必须设置 事务的前缀
                .setTransactionalIdPrefix("flink-")
                // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "6000")
                .build();

        streamSource.sinkTo(kafkaSink);

        // 3.触发程序执行
        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1016531.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python:函数调用的实参

相关阅读 Python专栏https://blog.csdn.net/weixin_45791458/category_12403403.html 调用就是附带可能为空的一系列参数来执行一个可调用对象 &#xff08;例如函数&#xff09;&#xff0c;它的语法的BNF范式如下所示&#xff0c;有关BNF范式的规则&#xff0c;可以参考之前…

无涯教程-JavaScript - CEILING.MATH函数

描述 CEILING.MATH函数将数字四舍五入到最接近的整数或最接近的有效倍数。 Excel CEILING.MATH函数是Excel中的十五个舍入函数之一。 语法 CEILING.MATH (number, [significance], [mode])争论 Argument描述Required/OptionalNumberNumber must be less than 9.99E307 and …

运维自动化:提高效率的秘诀

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

从一到无穷大 #16 ByteSeries,思考内存时序数据库的必要性

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。 本作品 (李兆龙 博文, 由 李兆龙 创作)&#xff0c;由 李兆龙 确认&#xff0c;转载请注明版权。 引言 在[3]中我基于Gorilla讨论了时序数据库设置cache的可行性&#xff0c;最后得出结论&…

许可分析 license分析 第十章

许可分析是指对软件许可证进行详细的分析和评估&#xff0c;以了解组织内部对软件许可的需求和使用情况。通过许可分析&#xff0c;可以帮助组织更好地管理和优化软件许可证的使用。以下是一些可能的许可分析方法和步骤&#xff1a; 软件许可证供应商管理&#xff1a;评估和管理…

CSS 浮动布局

浮动的设计初衷 float: left/right/both;浮动是网页布局最古老的方式。 浮动一开始并不是为了网页布局而设计&#xff0c;它的初衷是将一个元素拉到一侧&#xff0c;这样文档流就能够包围它。 常见的用途是文本环绕图片&#xff1a; 浮动元素会被移出正常文档流&#xff0c;…

Redis集群总结

Redis&#xff0c;作为一款开源的、内存中的数据结构存储系统&#xff0c;以其出色的性能和丰富的数据结构在业界赢得了广泛的认可。然而&#xff0c;当我们面临大量数据和高并发请求时&#xff0c;单个 Redis 实例可能无法满足我们的需求。这时&#xff0c;我们就需要使用到 R…

Kubernetes实践:从入门到实践

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

认识Git的工作区、暂存区与版本库

使用 git init 命令在 gitcode 文件夹下创建如下图所示的Git仓库。现在思考这样一个问题&#xff1a;gitcode目录下创建的README文件可以直接被git管理和追踪吗&#xff1f; 答案是否定的&#xff0c;因为只有 Git 本地仓库中的文件才可以被版本控制。什么&#xff1f;难道当前…

内网隧道代理技术(二十七)之 DNS隧道介绍

DNS隧道介绍 DNS协议介绍 域名系统(Domain Name System,缩写:DNS)是互联网的一项服务。它作为将域名和IP地址相互映射的一个分布式数据库,能够使人更方便地访问互联网。DNS使用TCP和UDP端口53。当前,对于每一级域名长度的限制是63个字符,域名总长度则不能超过253个字符…

第二篇------Virtual I/O Device (VIRTIO) Version 1.1

上篇文章&#xff1a;https://blog.csdn.net/Phoenix_zxk/article/details/132917657 篇幅太大&#xff0c;所以分开写&#xff0c;接下来续上 4.3.3.2.1 设备要求&#xff1a;Guest->Host 通知 设备必须忽略 GPR2 的位 0-31&#xff08;从左边数&#xff09;。这样可以使…

Eclipse开源代码下载

当前插件开发&#xff0c;需要修改eclipse源码&#xff0c;如需要修改remote相关的代码&#xff0c;所以需要下载相关源码。网上大多资料都说的不清不楚的&#xff0c;也可能我太小白&#xff0c;不明白&#xff0c;反正就是折腾了一两天才感觉有点思路&#xff0c;改如何找源码…

Linux安全加固:保护你的服务器

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

基于springboot+vue的问卷调查系统

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容&#xff1a;毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 项目介绍…

聊天机器人

收集窗帘相关的数据 可以用gpt生成&#xff0c;也可以用爬虫 图形化界面 gradio 向量数据库 faiss python代码 import gradio as gr import random import timefrom typing import Listfrom langchain.embeddings.openai import OpenAIEmbeddings from langchain.vectorstor…

央媒发稿不能改?媒体发布新闻稿有哪些注意点

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 “央媒发稿不能改”是媒体行业和新闻传播领域的普遍理解。央媒&#xff0c;即中央主要媒体&#xff0c;是权威性的新闻源&#xff0c;当这些媒体发布新闻稿或报道时&#xff0c;其他省、…

服务器监控工具:选择与应用

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

积木报表 JimuReport v1.6.2-GA5版本发布—高危SQL漏洞安全加固版本

项目介绍 一款免费的数据可视化报表&#xff0c;含报表和大屏设计&#xff0c;像搭建积木一样在线设计报表&#xff01;功能涵盖&#xff0c;数据报表、打印设计、图表报表、大屏设计等&#xff01; Web 版报表设计器&#xff0c;类似于excel操作风格&#xff0c;通过拖拽完成报…

活锁 死锁

一、活锁&#xff08;liveLock&#xff09; 活锁是指线程间资源冲突激烈&#xff0c;引起线程不断的尝试获取资源&#xff0c;不断的失败。活锁有点类似于线程饥饿&#xff0c;虽然资源并没有被别人持有&#xff0c;但由于各种原因而无法得到。最常见的原因是进程组的执行顺序…

gRpc入门和springboot整合

gRpc入门和springboot整合 一、简介 1、gprc概念 gRpc是有google开源的一个高性能的pc框架&#xff0c;Stubby google内部的rpc,2015年正式开源&#xff0c;云原生时代一个RPC标准。 tips:异构系统&#xff0c;就是不同编程语言的系统。 2、grpc核心设计思路 grpc核心设计…