flink读写案例合集

news2024/12/25 9:02:04

文章目录

  • 前言
  • 一、flink 写kafka
    • 1.第一种使用FlinkKafkaProducer API
    • 2.第二种使用自定义序列化器
    • 3.第三种使用FlinkKafkaProducer011 API
    • 4.使用Kafka的Avro序列化 (没有使用过,感觉比较复杂)
    • 5.第五种使用 (强烈推荐使用)
  • 二、Flink读kafka
  • 三、Flink写其他外部系统


前言

提示:这里主要总结在工作中使用到的和遇到到的问题:Java flink版本1.15+

一、flink 写kafka

1.第一种使用FlinkKafkaProducer API

// 假设有一个DataStream<String> named text  
        DataStream<String> text = env.fromElements("Hello", "World", "Flink", "Kafka");  
  
        Properties props = new Properties();  
        props.put("bootstrap.servers", "localhost:9092");  
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
  
        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(  
            "my-topic",                // 目标Kafka topic  
            new SimpleStringSchema(),  // 序列化schema  
            props,                     // 生产者配置  
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // 语义保证  
  
        // 添加sink  
        text.addSink(myProducer);

此方式flink1.14以上版本已经废弃,不建议使用
在这里插入图片描述

2.第二种使用自定义序列化器

import org.apache.flink.api.common.serialization.SerializationSchema;  
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;  
  
public class CustomKafkaSerializationSchema implements KafkaSerializationSchema<JSONObject> {
   
	private static final long serialVersionUID = 8497940668660042203L;
	private String topic;
	public CustomKafkaSerializationSchema(final String topic) {
   
		this.topic = topic;
	}

	@Override
	public ProducerRecord<byte[], byte[]> serialize(final JSONObject element, final Long timestamp) {
   
		return new ProducerRecord<byte[], byte[]>(topic, element.toJSONString().getBytes());
	}
}

在低版本的flink-connector-kafka中,不支持KafkaSerializationSchema

3.第三种使用FlinkKafkaProducer011 API

       // 假设有一个DataStream<String>  
        DataStream<String> text = env.fromElements("Hello", "World", "Flink", "Kafka");

        // Kafka 生产者配置  
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 使用 FlinkKafkaProducer011 写入 Kafka  
        FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(
                "my-topic",          // 目标 Kafka topic  
                new SimpleStringSchema(),  // 序列化 schema  
                props,                     // 生产者配置  
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE); // 语义保证  

        // 添加 sink  
        text.addSink(myProducer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2059146.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Kettle实战】组件讲解(战前磨刀)

目录 【 CSV 文件输入 】组件【过滤记录】组件【字段选择】组件【排序记录】组件【分组】组件【Excel输出】组件【 CSV 文件输入 】组件 基础参数解释: 字段参数解释: 【过滤记录】组件 在数据处理时,往往要对数据所属类别、区域和时间等进行限制,将限制范围外的数据过…

上千条备孕至育儿指南速查ACCESS\EXCEL数据库

虽然今天这个数据库的记录数才不过区区上千条&#xff0c;但是每条记录里的内容都包含四五个子标题&#xff0c;可以将相关的知识完整且整齐的展现&#xff0c;是个属于简而精的数据库。并且它包含2级分类。 【备孕】大类包含&#xff1a;备孕百科(19)、不孕不育(23)、精子卵子…

QStorageInfo 出现C2228报错

这里使用 QStorageInfo info(QDir(path));创建就会报错 改为 // 获取给定路径所在的磁盘信息 QDir d(path); QStorageInfo info(d);就不会报错&#xff0c;怪噻&#xff01;

【区块链+商贸零售】消费券 2.0 应用方案 | FISCO BCOS应用案例

方案基于FISCO BCOS区块链技术与中间件平台WeBASE&#xff0c;实现新一代消费券安全精准高效发放&#xff0c;实现消费激励&#xff0c; 促进消费循环。同时&#xff0c;方案将用户消费数据上链&#xff0c;实现账本记录与管理&#xff0c;同时加密机制保证了数据安全性。

基于python的坦克游戏的设计与实现

获取源码联系方式请查看文章结尾&#x1f345; 摘 要 随着互联网的日益普及、python语言在Internet上的实现&#xff0c;python应用程序产生的 互联网增值服务逐渐体现出其影响力&#xff0c;对丰富人们的生活内容、提供快捷的资讯起着不可忽视的作用。本论文中介绍了python的相…

梯度、偏导数、导数

梯度 对于一个多变量函数 f(x1,x2,…,xn)&#xff0c;其梯度 ∇f 是一个 n 维向量&#xff0c;定义为&#xff1a; ​ 是函数 f在 方向上的偏导数。 偏导数 偏导数是多元函数在某一个方向上的导数&#xff0c;它描述了函数在该方向上的局部变化率。偏导数的计算过程涉及对函…

ARR 竟然超过 150 万美元!斯坦福都在使用的 AI 学术搜索引擎 Consensus获 USV 领投的 1100 万美元。

惊爆&#xff01;就在当下&#xff0c;AI 学术搜索引擎 Consensus 传来令人震撼的消息&#xff0c;其已成功完成 1100 万美元融资。此轮 A 轮融资由 Union Square Ventures 领衔主导&#xff0c;其他参与的投资者有 Nat Friedman、Daniel Gross 以及 Draper Associates 等等。 …

springboot大学生时间管理分析系统---附源码130930

摘 要 时间是一种无形资源,但可以对其进行有效的使用与管理。时间管理倾向是个体在运用时间方式上所表现出来的心理和行为特征&#xff0c;具有多维度、多层次的心理结构&#xff0c;由时间价值感、时间监控观和时间效能感构成。时间是一种重要的资源,作为当代大学生,在进行生…

【UltraVNC】私有远程工具VNC机器部署方式

旨在解决监控端非固定IP的计算机A,远程连接受控的端非固定IP的计算机B。 一、UltraVNC下载和安装 官网:Home - UltraVNC VNC OFFICIAL SITE, Remote Desktop Free Opensource 二、部署私有的远程维护VNC机器-方式一 UltraVNC中继模式原理: UltraVNC中继模式部署: 1.1 中…

在ubuntu16.04下使用词典工具GoldenDict

前言 本来要装有道词典&#xff0c;结果发现各种问题&#xff0c;放弃。 网上看大家对GoldenDict评价比较高&#xff0c;决定安装GoldenDict 。 安装 启动 添加词库 GoldenDict本身并不带词库&#xff0c;需要查词的话&#xff0c;必须先下载离线词库或者配置在线翻译网址才…

安泰电压放大器的设计要求是什么样的

电压放大器的设计要求是一个广泛而复杂的领域&#xff0c;它在电子工程中扮演着至关重要的角色。电压放大器是一种电子电路&#xff0c;用于将输入信号的电压增大&#xff0c;而不改变其波形&#xff0c;通常用于放大微弱的信号以便进行后续处理或传输。下面将详细介绍电压放大…

【Mybatis-plus】Mybatis-plus的踩坑日记之速查版

【Mybatis-plus】Mybatis-plus踩坑日记之速查版 开篇词&#xff1a;干货篇&#xff1a;1.TableField(fill FieldFill.INSERT_UPDATE)的错误使用2.采用MybatisPlus自带update方法&#xff0c;但无法更新null的问题3.表字段为json类型的入库问题4.字段忽略未生效5.自带id生成策略…

RabbitMQ中消息的分发策略

我的后端学习大纲 RabbitMQ学习大纲 1.不公平分发&#xff1a; 1.1.什么是不公平分发&#xff1a; 1.在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮训分发&#xff0c;但在某种场景下这种策略并不是很好&#xff0c;比方说有两个消费者在处理任务&#xff0c;其中有个…

基于vue全家桶的pc端仿淘宝系统_kebgy基于vue全家桶的pc端仿淘宝系统_kebgy--论文

TOC springboot478基于vue全家桶的pc端仿淘宝系统_kebgy基于vue全家桶的pc端仿淘宝系统_kebgy--论文 绪 论 1.1开发背景 改革开放以来&#xff0c;中国社会经济体系复苏&#xff0c;人们生活水平稳步提升&#xff0c;中国社会已全面步入小康社会。同时也在逐渐转型&#xf…

【中项第三版】系统集成项目管理工程师 | 第 15 章 组织保障

前言 本章的知识点预计上午会考1-2分&#xff0c;下午可能会考&#xff0c;一般与其他管理领域进行结合考查。学习要以教材为主。 目录 15.1 信息和文档管理 15.1.1 信息和文档 15.1.2 信息&#xff08;文档&#xff09;管理规则和方法 15.2 配置管理 15.2.1 基本概念 …

web渗透测试 学习导图

web渗透学习路线 前言 一、web渗透测试是什么&#xff1f; Web渗透测试分为白盒测试和黑盒测试&#xff0c;白盒测试是指目标网站的源码等信息的情况下对其渗透&#xff0c;相当于代码分析审计。而黑盒测试则是在对该网站系统信息不知情的情况下渗透&#xff0c;以下所说的Web…

测绘程序设计|初识C#编程语言|C#源码结构|面向对象|MFC、WinFrom与WPF

由于微信公众号改变了推送规则&#xff0c;为了每次新的推送可以在第一时间出现在您的订阅列表中&#xff0c;记得将本公众号设为星标或置顶喔~ 根据笔者经验&#xff0c;分享了C#编程语言、面向对象以及MFC、WinForm与WPF界面框架相关知识~ &#x1f33f;前言 c#作为测绘程序…

微信小程序SSL证书申请重点和方法

微信小程序运行模式主要在手机微信内&#xff0c;这一套程序可以解决了用户注册账户及支付相关问题&#xff0c;另外使用很方便&#xff0c;用户不用特意的去安装小程序&#xff0c;只要在微信里面就可以开发&#xff0c;只因为这样微信小程序很受欢迎。 对于开发者来说&#…

车企数据治理实践:业务场景为抓手势在必行

在这个信息爆炸的时代&#xff0c;数据已经成为推动企业发展的核心动力&#xff0c;而数据治理则是确保数据价值得以最大化发挥的关键。在整车制造的研发、生产及供应链业务中&#xff0c;数据治理扮演着举足轻重的角色。 数据治理对于提升数据质量至关重要。高质量的数据是企…

elemeUI中table的列内容宽度不够时的省略号如何去掉

在外层套个div来解决 <div><el-input-number class"no-ellipsis" style"width: 88px;" size"small" controls-position"right" v-model{scope.row.supplied_area}></el-input-number> </div>