Flink Flink数据写入Kafka

news2025/1/21 11:24:56

一、环境准备

flink 1.14写入Kafka,首先在pom.xml文件中导入相关依赖

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.14.6</flink.version>
        <spark.version>2.4.3</spark.version>
        <hadoop.version>2.8.5</hadoop.version>
        <hbase.version>1.4.9</hbase.version>
        <hive.version>2.3.5</hive.version>
        <java.version>1.8</java.version>
        <scala.version>2.11.8</scala.version>
        <mysql.version>8.0.22</mysql.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

二、Flink将Socket数据写入Kafka(精准一次)

注意:如果要使用 精准一次 写入 Kafka,需要满足以下条件,缺一不可
1、开启 checkpoint
2、设置事务前缀
3、设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max 的 15 分钟

package com.flink.DataStream.Sink;


import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class flinkSinkKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setParallelism(1);
        // 如果是精准一次,必须开启 checkpoint
        streamExecutionEnvironment.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        DataStreamSource<String> streamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);
        /**
         * TODO Kafka Sink
         * TODO 注意:如果要使用 精准一次 写入 Kafka,需要满足以下条件,缺一不可
         * 1、开启 checkpoint
         * 2、设置事务前缀
         * 3、设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max 的 15 分钟
         */
        Properties properties=new Properties();
        properties.put("transaction.timeout.ms",10 * 60 * 1000 + "");
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                // 指定 kafka 的地址和端口
                .setBootstrapServers("localhost:9092")
                // 指定序列化器:指定Topic名称、具体的序列化(产生方需要序列化,接收方需要反序列化)
                .setRecordSerializer(KafkaRecordSerializationSchema
                        .<String>builder()
                        .setTopic("testtopic01")
                        // 指定value的序列化器
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                // 写到 kafka 的一致性级别: 精准一次、至少一次
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // 如果是精准一次,必须设置 事务的前缀
                .setTransactionalIdPrefix("flinkkafkasink-")
                // 如果是精准一次,必须设置 事务超时时间: 大于 checkpoint间隔,小于 max 15 分钟
                .setKafkaProducerConfig(properties)
                //.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();
        streamSource.sinkTo(kafkaSink);
        streamExecutionEnvironment.execute();
    }
}

查看ProduceerConfig配置
在这里插入图片描述

三、启动Zookeeper、Kafka

#启动zookeeper
${ZK_HOME}/bin/zkServer.sh start
#查看zookeeper状态
${ZK_HOME}/bin/zkServer.sh status
#启动kafka
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties
#查看topic
${KAFKA_HOME}/bin/kafka-topics.sh --list --zookeeper localhost:2181
#创建topic
${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic testtopic02 --partitions 2 --replication-factor 1
#删除topic
${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic testtopic02
#生产消息
${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testtopic01
#消费消息
${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtopic01 --from-beginning

通过socket模拟数据写入Flink之后,Flink将数据写入Kafka
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1290302.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL联合查询、最左匹配、范围查询导致失效

服务器版本 客户端&#xff1a;navicat premium16.0.11 联合索引 假设有如下表 联合索引就是同时把多列设成索引&#xff0c;如(empno&#xff0c;ename)在查询的时候就会先按照empno进行查询&#xff0c;再按照ename进行查询其中empno是全局有序&#xff0c;ename是局部有…

java数字千分位格式转换

java数字千分位格式转换 public static void main(String[] args) {System.out.println(thousandsSeparator("123123131"));}public static String thousandsSeparator(String value) {if (isNotNull(value)) {String[] arr value.split("");for (int i …

Kotlin Flow 操作符

前言 Kotlin 拥有函数式编程的能力&#xff0c;使用Kotlin开发&#xff0c;可以简化开发代码&#xff0c;层次清晰&#xff0c;利于阅读。 然而Kotlin拥有操作符很多&#xff0c;其中就包括了flow。Kotlin Flow 如此受欢迎大部分归功于其丰富、简洁的操作符&#xff0c;巧妙使…

top K问题(C语言)

目录 前言 top K问题 模拟数据 建堆 验证&#xff08;简单了解即可&#xff09; 最终代码 调试部分 前言 在大小堆的实现&#xff08;C语言&#xff09;中我们讨论了堆的实际意义&#xff0c;在看了就会的堆排序&#xff08;C语言&#xff09;中我们完成了堆排序&#…

MYSQL全语法速查(含示例)

文章目录 1.从简单的查询开始查找所有记录(SELECT *)查找记录中的所有登录名(SELECT)查找登录名为admin的密码(WHERE)查找电话号码非空的记录(IS NOT NULL)查找所在城市为北京或者用户名字是李四的记录(OR)查找所在城市为北京并且用户名字是张三的记录(AND)查找用户名字是李四或…

6个实用又好用的交互原型工具!

在 UI/UX 设计中&#xff0c;原型设计是至关重要的一步。正如用户体验中的其它环节一样&#xff0c;有无数的交互原型工具可以帮助你完成原型设计。市场上有太多的交互原型工具&#xff0c;如果你不知道选择哪一种&#xff0c;那么我们将为你介绍 6 个实用又好用的交互原型工具…

C++ day56 两个字符串的删除操作 编辑距离

题目1&#xff1a;583 两个字符串的删除操作 题目链接&#xff1a;两个字符串的删除操作 对题目的理解 返回使两个单词word1和word2相同的最少删除多少个元素&#xff0c;两个单词至少包含一个字母&#xff0c;且仅包含小写字母 思路1&#xff1a;这道题与昨天的不同子序列…

Dagger2使用

在android引入Dagger2库 //引入Dagger2implementation("com.google.dagger:dagger:2.48.1")annotationProcessor ("com.google.dagger:dagger-compiler:2.48.1") 构造器注入 创建一个类 public class Car {//在构造器上面添加dagger的Inject即可Injectp…

现代雷达车载应用——第2章 汽车雷达系统原理 2.1节

经典著作&#xff0c;值得一读&#xff0c;英文原版下载链接【免费】ModernRadarforAutomotiveApplications资源-CSDN文库。 2.1 基本雷达功能 雷达系统通过天线或天线阵列向空间辐射电磁能量。辐射的电磁能量“照亮”周围的目标。“被照亮”的目标拦截一些辐射能量&#xff0…

二十章总结

20.1线程介绍 世间有很多工作都是可以同时完成的。例如&#xff0c;人体可以同时进行呼吸、血液循环、思考问题等活动&#xff1b;用户既可以使用计算机听歌&#xff0c;也可以使用它打印文件。同样&#xff0c;计算机完全可以将多种活动同时进行&#xff0c;这种思想放在 Jav…

配置主机与外网时间服务器同步时间

目录 NTP服务简介 时间管理命令 第一步&#xff1a;更改当前主机所在地的时间 方法一&#xff1a;使用tzselect命令查询需要的时区 1、使用tzselect命令查询需要的时区 2、添加变量到 ~/.bash_profile 文件中&#xff0c;即追加类似的内容&#xff1a; 3、重新连接一个…

Flink(九)【时间语义与水位线】

前言 2023-12-02-20:05&#xff0c;终于写完啦&#xff0c;最近状态不错。刚写完又收到了她的消息哈哈哈哈&#xff0c;开心。 再去全力打拼一次&#xff0c;奋战一场&#xff0c;就算最后打了败仗也无所谓&#xff0c;至少你留下了足迹。 《解忧杂货店》 1、时间语义 …

团队git操作流程

项目的开发要求&#xff1a;&#xff08;1&#xff09;项目组厉员代码提交不少于20次 &#xff08;2&#xff09;项目组厉员每天提交不少于20次 &#xff08;3&#xff09;企业项目开发代码的每天的提交一般提交3-5次 &#xff08;4&#xff09;代码仓库的管理 git的基础操作流…

【CSP】202305-1_重复局面Python实现

文章目录 [toc]试题编号试题名称时间限制内存限制题目背景问题描述输入格式输出格式样例输入样例输出样例说明子任务提示Python实现 试题编号 202305-1 试题名称 重复局面 时间限制 1.0s 内存限制 512.0MB 题目背景 国际象棋在对局时&#xff0c;同一局面连续或间断出现3次或3…

会声会影2024软件还包含了视频教学以及模板素材

会声会影2024中文版是一款加拿大公司Corel发布的视频编软件。会声会影2024官方版支持视频合并、剪辑、屏幕录制、光盘制作、添加特效、字幕和配音等功能&#xff0c;用户可以快速上手。会声会影2024软件还包含了视频教学以及模板素材&#xff0c;让用户剪辑视频更加的轻松。 会…

【Linux 进度条小程序】缓冲区+回车换行

文章目录 回车与换行缓冲区举个栗子fflush函数倒计时小程序进度条小程序 回车与换行 回车和换行是不同的两个概念 回车&#xff1a;\r 使光标回到本行行首。 换行&#xff1a;\n使光标下移一格。 一般我们的键盘上的Enter键是回加换行键 在c语言中 \n 表示回车换行 效果和Ent…

iNet Network Scanner for Mac:简洁高效的WiFi网络扫描软件

随着无线网络的普及&#xff0c;WiFi网络已经成为我们日常生活中必不可少的一部分。无线网络的稳定性和速度对我们的工作和娱乐体验至关重要。因此&#xff0c;一款功能强大、简洁高效的WiFi网络扫描软件非常重要。今天&#xff0c;我们向大家推荐一款优秀的Mac平台WiFi网络扫描…

AI生成工具助力业界,黎万强投资。/微信支付和支付宝境外银行卡全面支持 | 魔法半周报

我有魔法✨为你劈开信息大海❗ 高效获取AIGC的热门事件&#x1f525;&#xff0c;更新AIGC的最新动态&#xff0c;生成相应的魔法简报&#xff0c;节省阅读时间&#x1f47b; &#x1f525;资讯预览 AI应用一年半&#xff0c;IBM HR部省12000小时 华为沉默中&#xff0c;何时…

提高工厂能源效率的关键:工厂能耗监测平台

工业做为能源消耗的重要场所&#xff0c;所以节能减排对工业来讲是一个亟需解决的问题。除了对设备进行更新换代外&#xff0c;还需要能源管理消耗监测平台&#xff0c;帮助企业实现节能减排的目标。 工厂能源消费量非常庞大&#xff0c;能源比较难以监测与控制。传统能源的管…

《微信小程序开发从入门到实战》学习四十二

4.3 云开发文件存储 文件存储功能支持将任意数量和格式的文件&#xff08;如图片和视频&#xff09;保存在云端&#xff0c;支持 以文件夹的形式将文件归类。 在云开发控制台中&#xff0c;可以对云端保存的文件进行管理。 也可以通过文件存储API对文件进行上传、删除、移动…