Flink1.14新版KafkaSource和KafkaSink实践使用(自定义反序列化器、Topic选择器、序列化器、分区器)

news2024/7/6 17:53:57

前言

在官方文档的描述中,API FlinkKafkaConsumer和FlinkKafkaProducer将在后续版本陆续弃用、移除,所以在未来生产中有版本升级的情况下,新API KafkaSource和KafkaSink还是有必要学会使用的。下面介绍下基于新API的一些自定义类以及主程序的简单实践。

官方案例

官方文档地址:
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/datastream/kafka/

KafkaSource的自定义类

自定义反序列化器

自定义反序列化器可以以指定的格式取到来源Kafka消息中我们想要的元素。该类需要继承 KafkaDeserializationSchema ,这里简单将来源Kafka的topic、key、value以Tuple3[String, String, String]的格式取出来。

MyKafkaDeserializationSchemaTuple3.scala

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

import java.nio.charset.StandardCharsets

/**
 * @author hushhhh
 */
class MyKafkaDeserializationSchemaTuple3 extends KafkaDeserializationSchema[(String, String, String)] {
  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String, String) = {
    new Tuple3[String, String, String](
      record.topic(),
      new String(record.key(), StandardCharsets.UTF_8),
      new String(record.value(), StandardCharsets.UTF_8))
  }

  override def isEndOfStream(nextElement: (String, String, String)): Boolean = false

  override def getProducedType: TypeInformation[(String, String, String)] = {
    TypeInformation.of(classOf[(String, String, String)])
  }
}

KafkaSink的自定义类

自定义Topic选择器

自定义一个 TopicSelector 可以将流中多个topic里的数据根据一定逻辑分发到不同的目标topic里。该类需要继承 TopicSelector ,这里简单根据来源Kafka的topic名拼接下。

MyTopicSelector.scala

import org.apache.flink.connector.kafka.sink.TopicSelector

/**
 * @author hushhhh
 */
class MyTopicSelector extends TopicSelector[(String, String, String)] {
  override def apply(t: (String, String, String)): String = {
    // t: 来源kafka的topic、key、value
    "TOPIC_" + t._1.toUpperCase()
  }
}

自定义序列化器

自定义序列化器可以将数据根据自己的业务格式写到目标Kafka的key和value里,这里将来源Kafka里的key和value直接写出去,这两个类都需要继承 SerializationSchema 。

ProducerRecord Key的序列化器

MyKeySerializationSchema.scala

import org.apache.flink.api.common.serialization.SerializationSchema

/**
 * @author hushhhh
 */
class MyKeySerializationSchema extends SerializationSchema[(String, String, String)] {
  override def serialize(element: (String, String, String)): Array[Byte] = {
    // element: 来源kafka的topic、key、value
    element._2.getBytes()
  }
}

ProducerRecord Value的序列化器

MyValueSerializationSchema.scala

import org.apache.flink.api.common.serialization.SerializationSchema

/**
 * @author hushhhh
 */
class MyValueSerializationSchema extends SerializationSchema[(String, String, String)] {
  override def serialize(element: (String, String, String)): Array[Byte] = {
    // element: 来源kafka的topic、key、value
    element._3.getBytes()
  }
}

自定义分区器

自定义分区器可以根据具体逻辑对要写到目标Kafka 里的数据进行partition分配。该类需要继承 FlinkKafkaPartitioner ,这里根据key的hash分配到不同的partition里(如果目标topic有多个partition的话)。

MyPartitioner.scala

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner

/**
 * @author hushhhh
 */
class MyPartitioner extends FlinkKafkaPartitioner[(String, String, String)] {
  override def partition(record: (String, String, String), key: Array[Byte], value: Array[Byte], targetTopic: String, partitions: Array[Int]): Int = {
    // record: 来源kafka的topic、key、value
    Math.abs(new String(record._2).hashCode % partitions.length)
  }
}

主类

Main.scala

import format.{MyKafkaDeserializationSchemaTuple3, MyKeySerializationSchema, MyPartitioner, MyTopicSelector, MyValueSerializationSchema}
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.kafka.clients.consumer.OffsetResetStrategy

import java.util.Properties
import scala.collection.JavaConverters._

/**
 * @author hushhhh
 */
object Main {
  def main(args: Array[String]): Unit = {
    /**
     * env
     */
    // stream环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    /**
     * source
     */
    // 定义 KafkaSource
    lazy val kafkaSource: KafkaSource[(String, String, String)] = KafkaSource.builder()
      // Kafka消费者的各种配置文件,此处省略配置
      .setProperties(new Properties())
      // 配置消费的一个或多个topic
      .setTopics("sourceTopic1,sourceTopic2,...".split(",", -1).toList.asJava)
      // 开始消费位置,从已提交的offset开始消费,没有的话从最新的消息开始消费
      .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
      // 反序列化,使用之前我们自定义的反序列化器
      .setDeserializer(KafkaRecordDeserializationSchema.of(new MyKafkaDeserializationSchemaTuple3))
      .build()
    // 添加 kafka source
    val inputDS: DataStream[(String, String, String)] = env.fromSource(
      kafkaSource,
      WatermarkStrategy.noWatermarks(),
      "MyKafkaSource")
      .setParallelism(1)

    /**
     * transformation
     */
    // 数据加工处理,此处省略

    /**
     * sink
     */
    // 定义 KafkaSink
    lazy val kafkaSink: KafkaSink[(String, String, String)] =
      KafkaSink.builder[(String, String, String)]()
        // 目标集群地址
        .setBootstrapServers("bootstrap.servers")
        // Kafka生产者的各种配置文件,此处省略配置
        .setKafkaProducerConfig(new Properties())
        // 定义消息的序列化模式
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
          // Topic选择器,使用之前我们自定义的Topic选择器
          .setTopicSelector(new MyTopicSelector)
          // Key的序列化器,使用之前我们自定义的Key序列化器
          .setKeySerializationSchema(new MyKeySerializationSchema)
          // Value的序列化器,使用之前我们自定义的Value序列化器
          .setValueSerializationSchema(new MyValueSerializationSchema)
          // 自定义分区器,使用之前我们自定义的自定义分区器
          .setPartitioner(new MyPartitioner)
          .build()
        )
        // 语义保证,保证至少一次
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build()

    // 添加 kafka sink
    inputDS.sinkTo(kafkaSink)
      .name("MyKafkaSink")
      .setParallelism(1)

    /**
     * execute
     */
    env.execute("myJob")
  }

}

以上就是KafkaSource和KafkaSink API的简单使用。大佬们感觉有用的话点个赞吧~😉

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1432070.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【JavaEE进阶】 图书管理系统开发日记——伍

文章目录 🎋前言🌲需求分析🎄约定前后端交互接口🌳实现服务器代码🚩控制层🚩业务层🚩数据层 🍃修改前端代码⭕总结 🎋前言 这次我们来实现图书管理系统的增加图书模块。…

QT 应用中集成 Sentry

QT 应用中集成 Sentry QT应用中集成 SentrySentry SDK for C/C注册 Sentry 账号QT 应用中集成 Sentry触发 Crash 上报 QT应用中集成 Sentry Sentry 是一个开源的错误监控和日志记录平台,旨在帮助开发团队实时捕获、跟踪和解决软件应用程序中的错误和异常。它提供了…

qml中布局属性讲解

1.行布局&列布局:RowLayout&ColumnLayout RowLayout {id: layoutanchors.fill: parentspacing: 6Rectangle {color: tealLayout.fillWidth: trueLayout.minimumWidth: 50Layout.preferredWidth: 100Layout.maximumWidth: 300Layout.minimumHeight: 150Text {anchors.c…

基于nginx的虚拟主机配置

目录 一.基于不同ip的虚拟主机 二.基于不同端口的虚拟主机 三.基于不同域名的虚拟主机 一.基于不同ip的虚拟主机 1.关闭 SELinux和防火墙 2.在/data目录中创建三个目录,分别为nginx1、nginx2 和nginx3,具体名为: 3.分别在三个目录中创建index.html,并输入内容“…

C++程序在开机自启和定时器执行时遇到的问题和解决方法

遇到的错误如下: Camera is created.load vfvlog.[dll/so] failed for dll[/vfvlog.so] unexistedLoadDbgConfig, LoadFile fail, err:-3, errno: No such file or directoryqt.qpa.xcb: could not connect to displayqt.qpa.plugin: Could not load the Qt platfo…

XSS haozi靶场通关笔记

XSS靶场地址:alert(1) 靶场的要求是输出一个内容为1的弹窗;这个靶场限制了输入位置只能是input code;而且浏览器发送内容时会自动进行url编码;所以重点考察的是代码的分析和基础payload构造;一切完成在当前页面&#…

毫米波雷达在汽车领域的原理、优势和未来趋势

1 毫米波雷达的原理 汽车引入毫米波雷达最初主要是为了实现盲点监测和定距巡航。毫米波实质上是电磁波,其频段位于无线电和可见光、红外线之间,频率范围为10GHz-200GHz。工作原理类似一般雷达,通过发射无线电波并接收回波,利用障…

Dynamo根据几何相交对墙体进行分组——群问题整理002

你好,这里是 BIM 的乐趣,我是九哥~ 近期给大家分享一些短平快的小教程,基本都是来自群里面常问的问题,不做过多的介绍了,直接上截图和代码。 问题:002 - 根据几何相交对墙体进行分组 今天分享的&#xff0…

C++后端开发之Sylar学习二:配置VSCode远程连接Ubuntu开发

C后端开发之Sylar学习二:配置VSCode远程连接Ubuntu开发 没错,我不能像大佬那样直接在Ubuntu上面用Vim手搓代码,只能在本地配置一下VSCode远程连接Ubuntu进行开发咯! 本篇主要是讲解了VSCode如何配置ssh连接Ubuntu,还有…

【文件增量备份系统】前端项目构建

文章目录 创建项目安装项目依赖引入element plus组件下载组件在main.js中使用组件测试 整合路由router下载组件创建路由管理器index.js使用路由App.vue上面使用 <router-view />测试 整合axios下载组件工具类axiosRequest.js工具类使用 创建项目 damwangrunqindeMBP dev…

PyTorch 2.2 中文官方教程(十四)

参数化教程 原文&#xff1a; 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 作者&#xff1a;Mario Lezcano 注意 点击这里下载完整示例代码 在本教程中&#xff0c;您将学习如何实现并使用此模式来对模型进行约束。这样做就像编写自己的nn.Module一样容易。 对深…

Mybatis基础教程及使用细节

本篇主要对Mybatis基础使用进行总结&#xff0c;包括Mybatis的基础操作&#xff0c;使用注解进行增删改查的练习&#xff1b;详细介绍xml映射文件配置过程并且使用xml映射文件进行动态sql语句进行条件查询&#xff1b;为了简化java开发提高效率&#xff0c;介绍一下依赖&#x…

树莓派5一键安装C++版本OpenCV

安装环境 本人当前的安装环境&#xff1a; 树莓派5Raspberry Pi os (64-bit) Debian12 Bookworm 镜像下载地址 我这里是将镜像安装好后直接安装opencv&#xff0c;如果不是刚安装好的镜像需要注意是否有openCV的python之类的安装过&#xff0c;不然可能出现编译错误 一、扩展内…

vue3+threejs+koa可视化项目——模型文件上传(第四步)

文章目录 ⭐前言&#x1f496;往期node系列文章&#x1f496;threejs系列相关文章&#x1f496;vue3threejs系列 ⭐koa后端文件上传(koa-body)&#x1f496;自动创建目录&#x1f496;自定义目录上传&#x1f496;apifox自测上传接口 ⭐vue3前端上传模型文件&#x1f496; axio…

echarts使用之饼图(四)

1 基本使用 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><meta http-equiv"X-UA-Compatible" cont…

latex multirow学习

今天搞了一晚上的这个multirow&#xff0c;总算弄出来了几个比较好的例子&#xff0c;主要是这个multirow的语法我没看懂&#xff0c;这个逻辑我是没理解&#xff0c;就很尴尬&#xff0c;一改就报错&#xff0c;只能先弄几个例子&#xff0c;自己慢慢试 \documentclass{artic…

k8s学习-Kubernetes的网络

Kubernetes作为编排引擎管理着分布在不同节点上的容器和Pod。Pod、Service、外部组件之间需要⼀种可靠的方找到彼此并进行通信&#xff0c;Kubernetes网络则负责提供这个保障。1.1 Kubernetes网络模型 Container-to-Container的网络 当Pod被调度到某个节点&#xff0c;Pod中的…

【Python】【完整代码】解析Excel 文件中的内容并检查是否包含某字符串,并返回判断结果

示例&#xff1a; 开发需求&#xff1a;解析Excel 文件中的内容并检查是否包含 "Fail" 字符&#xff0c;若没有则返回True&#xff0c;若有则返回False 实现代码&#xff1a; #!/usr/bin/env python3 # -*- encoding: utf-8 -*-File : check_excel_for_fail.py Ti…

后端程序员入门react笔记——react的生命周期(二)

React常用的钩子函数 constructor 这个函数我们太常见了&#xff0c;在初始化类的state的时候&#xff0c;或者初始化类的props的时候都会用到&#xff0c;就是一个类的构造函数。对后端人员来说很熟悉 constructor() {super();this.state {age: 18}}getDerivedStateFromPro…

【C++入门学习指南】:函数重载提升代码清晰度与灵活性

&#x1f3a5; 屿小夏 &#xff1a; 个人主页 &#x1f525;个人专栏 &#xff1a; C入门到进阶 &#x1f304; 莫道桑榆晚&#xff0c;为霞尚满天&#xff01; 文章目录 &#x1f4d1;前言一、函数重载1.1 函数重载的概念1.2 函数重载的作用1.3 C支持函数重载的原理1.4 扩展 &…