Flink Kafka-Source

news2025/1/15 17:23:21

文章目录

    • Kafka Source
      • 1. 使用方法
      • 2. Topic / Partition 订阅
      • 3. 消息解析
      • 4. 起始消费位点
      • 5. 有界 / 无界模式
      • 6. 其他属性
      • 7. 动态分区检查
      • 8. 事件时间和水印
      • 9. 空闲
      • 10. 消费位点提交
      • 11. 监控
      • 12. 安全

  • Apache Kafka 连接器
    Flink 提供了 Apache Kafka 连接器使用精确一次(Exactly-once)的语义在 Kafka topic 中读取和写入数据。
  • 依赖
<!-- flink kakfa -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

Kafka Source

1. 使用方法

Kafka Source 提供了构建类来创建 KafkaSource 的实例。以下代码片段展示了如何构建 KafkaSource 来消费 “input-topic” 最早位点的数据, 使用消费组 “my-group”,并且将 Kafka 消息体反序列化为字符串:

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

2. Topic / Partition 订阅

以下属性在构建 KafkaSource 时是必须指定的:

  • Bootstrap server,通过 setBootstrapServers(String) 方法配置
  • 消费者组 ID,通过 setGroupId(String) 配置
  • 要订阅的 Topic / Partition,请参阅 Topic / Partition 订阅一节
  • 用于解析 Kafka 消息的反序列化器(Deserializer),请参阅消息解析一节
    Kafka Source 提供了 3 种 Topic / Partition 的订阅方式:
  1. Topic 列表,订阅 Topic 列表中所有 Partition 的消息:
KafkaSource.builder().setTopics("topic-a", "topic-b");
  1. 正则表达式匹配,订阅与正则表达式所匹配的 Topic 下的所有 Partition:
KafkaSource.builder().setTopicPattern("topic.*");
  1. Partition 列表,订阅指定的 Partition:
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
        new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
        new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
KafkaSource.builder().setPartitions(partitionSet);

3. 消息解析

代码中需要提供一个反序列化器(Deserializer)来对 Kafka 的消息进行解析。 反序列化器通过 setDeserializer(KafkaRecordDeserializationSchema) 来指定,其中 KafkaRecordDeserializationSchema 定义了如何解析 Kafka 的 ConsumerRecord。

如果只需要 Kafka 消息中的消息体(value)部分的数据,可以使用 KafkaSource 构建类中的 setValueOnlyDeserializer(DeserializationSchema) 方法,其中 DeserializationSchema 定义了如何解析 Kafka 消息体中的二进制数据。

也可使用 Kafka 提供的解析器 来解析 Kafka 消息体。例如使用 StringDeserializer 来将 Kafka 消息体解析成字符串:

import org.apache.kafka.common.serialization.StringDeserializer;

KafkaSource.<String>builder()
        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));

4. 起始消费位点

Kafka source 能够通过位点初始化器(OffsetsInitializer)来指定从不同的偏移量开始消费 。内置的位点初始化器包括:

KafkaSource.builder()
    // 从消费组提交的位点开始消费,不指定位点重置策略
    .setStartingOffsets(OffsetsInitializer.committedOffsets())
    // 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    // 从时间戳大于等于指定时间戳(毫秒)的数据开始消费
    .setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
    // 从最早位点开始消费
    .setStartingOffsets(OffsetsInitializer.earliest())
    // 从最末尾位点开始消费
    .setStartingOffsets(OffsetsInitializer.latest());

如果内置的初始化器不能满足需求,也可以实现自定义的位点初始化器(OffsetsInitializer),如果未指定位点初始化器,将默认使用 OffsetsInitializer.earliest();

5. 有界 / 无界模式

Kafka Source 支持流式和批式两种运行模式。默认情况下,KafkaSource 设置为以流模式运行,因此作业永远不会停止,直到 Flink 作业失败或被取消。 可以使用 setBounded(OffsetsInitializer) 指定停止偏移量使 Kafka Source 以批处理模式运行。当所有分区都达到其停止偏移量时,Kafka Source 会退出运行。

流模式下运行通过使用 setUnbounded(OffsetsInitializer) 也可以指定停止消费位点,当所有分区达到其指定的停止偏移量时,Kafka Source 会退出运行。

6. 其他属性

除了上述属性之外,您还可以使用 setProperties(Properties) setProperty(String, String) 为 Kafka Source 和 Kafka Consumer 设置任意属性。KafkaSource 有以下配置项:

  • client.id.prefix,指定用于 Kafka Consumer 的客户端 ID 前缀
  • partition.discovery.interval.ms,定义 Kafka Source 检查新分区的时间间隔。
  • register.consumer.metrics 指定是否在 Flink 中注册 Kafka Consumer 的指标
  • commit.offsets.on.checkpoint 指定是否在进行 checkpoint 时将消费位点提交至 Kafka broker

请注意,即使指定了以下配置项,构建器也会将其覆盖:

  • key.deserializer 始终设置为 ByteArrayDeserializer
  • value.deserializer 始终设置为 ByteArrayDeserializer
  • auto.offset.reset.strategy 被 OffsetsInitializer#getAutoOffsetResetStrategy() 覆盖
  • partition.discovery.interval.ms 会在批模式下被覆盖为 -1

7. 动态分区检查

为了在不重启 Flink 作业的情况下处理 Topic 扩容或新建 Topic 等场景,可以将 Kafka Source 配置为在提供的 Topic / Partition 订阅模式下定期检查新分区。要启用动态分区检查,请将 partition.discovery.interval.ms 设置为非负值:

KafkaSource.builder()
    .setProperty("partition.discovery.interval.ms", "10000"); // 每 10 秒检查一次新分区

分区检查功能默认不开启。需要显式地设置分区检查间隔才能启用此功能。

8. 事件时间和水印

默认情况下,Kafka Source 使用 Kafka 消息中的时间戳作为事件时间。您可以定义自己的水印策略(Watermark Strategy) 以从消息中提取事件时间,并向下游发送水印:

env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy");

9. 空闲

如果并行度高于分区数,Kafka Source 不会自动进入空闲状态。您将需要降低并行度或向水印策略添加空闲超时。如果在这段时间内没有记录在流的分区中流动,则该分区被视为“空闲”并且不会阻止下游操作符中水印的进度。

10. 消费位点提交

Kafka source 在 checkpoint 完成时提交当前的消费位点 ,以保证 Flink 的 checkpoint 状态和 Kafka broker 上的提交位点一致。如果未开启 checkpoint,Kafka source 依赖于 Kafka consumer 内部的位点定时自动提交逻辑,自动提交功能由 enable.auto.commitauto.commit.interval.ms 两个 Kafka consumer 配置项进行配置。

注意:Kafka source 不依赖于 broker 上提交的位点来恢复失败的作业。提交位点只是为了上报 Kafka consumer 和消费组的消费进度,以在 broker 端进行监控。

11. 监控

Kafka source 会在不同的中汇报下列指标。
在这里插入图片描述
该指标反映了最后一条数据的瞬时值。之所以提供瞬时值是因为统计延迟直方图会消耗更多资源,瞬时值通常足以很好地反映延迟

指标监控参考

12. 安全

要启用加密和认证相关的安全配置,只需将安全配置作为其他属性配置在 Kafka source 上即可。下面的代码片段展示了如何配置 Kafka source 以使用 PLAIN 作为 SASL 机制并提供 JAAS 配置:

KafkaSource.builder()
    .setProperty("security.protocol", "SASL_PLAINTEXT")
    .setProperty("sasl.mechanism", "PLAIN")
    .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");

另一个更复杂的例子,使用 SASL_SSL 作为安全协议并使用 SCRAM-SHA-256 作为 SASL 机制:

KafkaSource.builder()
    .setProperty("security.protocol", "SASL_SSL")
    // SSL 配置
    // 配置服务端提供的 truststore (CA 证书) 的路径
    .setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
    .setProperty("ssl.truststore.password", "test1234")
    // 如果要求客户端认证,则需要配置 keystore (私钥) 的路径
    .setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
    .setProperty("ssl.keystore.password", "test1234")
    // SASL 配置
    // 将 SASL 机制配置为 as SCRAM-SHA-256
    .setProperty("sasl.mechanism", "SCRAM-SHA-256")
    // 配置 JAAS
    .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");

如果在作业 JAR 中 Kafka 客户端依赖的类路径被重置了(relocate class),登录模块(login module)的类路径可能会不同,因此请根据登录模块在 JAR 中实际的类路径来改写以上配置。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/528869.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

由浅入深理解java集合(四)——集合 Queue

Queue用于模拟队列这种数据结构&#xff0c;队列通常是指“先进先出”&#xff08;FIFO&#xff09;的容器。新元素插入&#xff08;offer&#xff09;到队列的尾部&#xff0c;访问元素&#xff08;poll&#xff09;操作会返回队列头部的元素。通常&#xff0c;队列不允许随机…

【C++初阶】类与对象(中)之构造函数与析构函数

&#x1f466;个人主页&#xff1a;Weraphael ✍&#x1f3fb;作者简介&#xff1a;目前学习C和算法 ✈️专栏&#xff1a;C航路 &#x1f40b; 希望大家多多支持&#xff0c;咱一起进步&#xff01;&#x1f601; 如果文章对你有帮助的话 欢迎 评论&#x1f4ac; 点赞&#x1…

C++继承详解——基类派生类对象赋值转换、菱形虚拟继承

hello&#xff0c;这里是bangbang&#xff0c;今天来讲下继承。 面向对象三大特性&#xff1a;封装、继承、多态。 目录 1. 继承的概念及定义 1.1 继承的概念 1.2 继承定义 1.2.1 定义格式 1.2.2 继承关系和访问限定符 1.2.3 继承基类成员访问方式的变化 2. 基类和派生类对…

Java面试知识点(全)-设计模式二

Java面试知识点(全) 导航&#xff1a; https://nanxiang.blog.csdn.net/article/details/130640392 注&#xff1a;随时更新 13.模板模式 定义一个操作中算法的骨架&#xff0c;而将一些步骤延迟到子类中&#xff0c;模板方法使得子类可以不改变算法的结构即可重定义该算法的…

7. 深入理解MVCC与BufferPool缓存机制

MySQL性能调优 1. MVCC多版本并发控制机制1.1 undo日志版本链与read view机制详解 2. Innodb引擎SQL执行的BufferPool缓存机制 本文是按照自己的理解进行笔记总结&#xff0c;如有不正确的地方&#xff0c;还望大佬多多指点纠正&#xff0c;勿喷。 本节课内容&#xff1a; 1、…

PowerShell install 一键部署VMware_Workstation

VMware Workstation 前言 VMware Workstation Pro 是业界标准的桌面 Hypervisor&#xff0c;用于在 Linux 或 Windows PC 上运行虚拟机 download VMware_Workstation VMware_Workstation WindowsVMware_Workstation linux文档downloaddownload参考 前提条件 开启wmi,配置…

2023接口自动化测试框架9项必备功能(建议收藏)

当你准备使用一个接口测试框架或者自造轮子的时候&#xff0c;或许你需要先了解下一个接口自动化测试框架必须具备什么功能。 一、校验 这个很好了解&#xff0c;如果没有校验&#xff0c;单纯的执行接口的话&#xff0c;那就谈不上测试了。所以支持对返回值校验是一个必须的…

【集合详解】——python基础——如桃花来

目录索引 集合的特点&#xff1a;创建集合&#xff1a;集合常见操作&#xff1a;增加数据&#xff1a;*add():**update():* 删除数据&#xff1a;*remove():**discard():**pop():* 查找数据&#xff1a;*in:**not in:* 集合的特点&#xff1a; 没有重复元素,即使重复&#xff0…

【ONE·C++ || set和map(三):基于红黑树的封装框架】

总言 主要介绍map、set的封装框架。 文章目录 总言1、基本框架说明2、map、set封装Ⅰ&#xff1a;用于比较的仿函数3、map、set封装Ⅱ&#xff1a;迭代器实现3.1、基本说明3.2、begin()、end()、operator*、operator&、operator、operator!3.2.1、begin()、end()3.2.2、op…

( 位运算 ) 693. 交替位二进制数 / 476. 数字的补数 ——【Leetcode每日一题】

❓ 题目一 693. 交替位二进制数 难度&#xff1a;简单 给定一个正整数&#xff0c;检查它的二进制表示是否总是 0、1 交替出现&#xff1a;换句话说&#xff0c;就是二进制表示中相邻两位的数字永不相同。 示例 1&#xff1a; 输入&#xff1a;n 5 输出&#xff1a;true 解…

使用inbuilder完成UBML低代码设计

文章目录 一、活动介绍二、我所认识的低代码平台三、使用inbuilder开发工具拖拉跩实现 低代码产品开发四、环境搭建五、5分钟完成低代码实验六、财务报销报表实验活动成果截图七、总结 一、活动介绍 开放原子训练营开启inBuilder低代码实验室活动。无论您是计算机行业相关从业…

(css)el-select多选以tag展示时,超过显示长度以...省略号显示

(css)el-select多选以tag展示时&#xff0c;超过显示长度以…省略号显示 效果 代码&#xff1a; <span>系统词典维度&#xff1a;</span><el-selectv-model"dNum"placeholder"请选择"multiplecollapse-tags //设置collapse-tags属性将它…

Windows批处理指令

前言 批处理文件&#xff08;batch file&#xff09;包含一系列 DOS 命令&#xff0c;通常用于自动执行重复性任务。用户只需双击批处理文件便可执行任务&#xff0c;而无需重复输入相同指令。编写批处理文件非常简单&#xff0c;但难点在于确保一切按顺序执行。编写严谨的批处…

Java进阶-面向对象入门

1. 面向过程与面向对象 1.1 何谓“面向对象”的编程思想&#xff1f; 首先解释一下“思想”。 先问你个问题&#xff1a;你想做个怎样的人&#xff1f; 可能你会回答&#xff1a;我想做个好人&#xff0c;孝敬父母&#xff0c;尊重长辈&#xff0c;关爱亲朋…… 你看&#…

2023年Android开发者路线-第2部分

2023年Android开发者路线-第1部分 2023年Android开发者路线-第2部分 2023年Android开发者路线-第3部分 2023年Android开发者路线-第4部分 2023Android开发者路线-第2部分 在上一篇文章中&#xff0c;我们讨论了 Android 架构的重要元素&#xff0c;包括主要的 Android 语言…

探索iOS之AVFoundation框架

AVFoundation框架的业务层主要是AVKit和UIKit&#xff0c;内核层包括CoreVideo、CoreAudio、CoreMedia、VideoToolBox等。AVFoundation作为iOS的音视频框架&#xff0c;提供音视频播放、录制、编辑、编解码、音效设置等。接下来&#xff0c;让我们看一下整体的框架图。 一、AVK…

ANR基础篇 - Trace.txt文件分析

系列文章目录 提示&#xff1a;这里可以添加系列文章的所有文章的目录&#xff0c;目录需要自己手动添加 例如&#xff1a;第一章 Python 机器学习入门之pandas的使用 文章目录 系列文章目录前言一、trace.txt文件示例二、日志分析2.1 CPU 负载2.2 内存信息2.3 堆栈信息schedst…

Mybatis 案例

文章目录 Mybatis 案例一、 准备工作1.1 数据库表1.2 Restfull规范1.3 封装结果类1.4 实体类 二、部门管理2.1 查询全部部门信息2.2 删除部门2.3 新增部门 三、员工管理3.1 分页查询3.2 分页查询 - PageHelper插件3.3 分页查询 - 条件查询3.4 批量删除员工3.5 新增员工3.6 修改…

蓝桥杯模块学习3——蜂鸣器与继电器

第一章 硬件部分 1.1 电路的组成部分 1.1.1 译码器和锁存器 具体可回顾之前LED灯的文章&#xff1a; https://blog.csdn.net/weixin_63568691/article/details/130660096 1.1.2 ULN2003达林顿管 原理图&#xff1a; 功能&#xff1a; &#xff08;1&#xff09;改变电路特性…

使用Spring初始化器创建Spring Boot项目

注&#xff1a;初始化向导需要联网创建Spring Boot项目 new project 项目创建完成 resources 文件夹中目录结构&#xff1a; static &#xff1a;保存所有的静态资文件&#xff0c; js css images templates &#xff1a;保存所有的模板页面&#xff08;Spring Boot默认j…