Flink+Kafka消费

news2025/1/11 14:19:47

引入jar

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-java</artifactId>
	<version>1.8.0</version>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-streaming-java_2.11</artifactId>
	<version>1.8.0</version>
</dependency>
<!-- flink整合kafka_2.11 -->
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka_2.11</artifactId>
	<version>1.10.0</version>
</dependency>

二、处理逻辑

//2、定义环境 => Env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(9);
env.enableCheckpointing(1000);

FlinkKafkaConsumer<String> consumer = this.getConsumer();//调用下面的方法获取数据源
consumer.setStartFromLatest();//消费最新数据

//2、绑定数据源=> resource
DataStream<String> stream = env.addSource(consumer);


//3、批量读取的方法=>
stream.timeWindowAll(Time.milliseconds(500)) //timeWindowAll:时间滚动窗口,滑动窗口会有数据元素重叠可能,而滚动窗口不存在元素重叠
		.apply(new ReadKafkaFlinkWindowFunction())//使用自己定义的apply来收集
		.addSink(new KafkaBatchSink());//批量的sink方法
env.execute();

2、定义消费者,并且将消费者consumer转成FlinkKafkaConsumer

public FlinkKafkaConsumer<String> getConsumer(){
	//定义消费者信息
	Properties properties = new Properties();
	properties.put("bootstrap.servers", "192.168.131.147:9092");
	properties.put("group.id", "flink-consumer-kafka-group");
	properties.put("auto.offset.reset", "latest");
	properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

	FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("demo", new SimpleStringSchema(), properties);
	return consumer;
}

3、收集数据ReadKafkaFlinkWindowFunction的实现类

4、KafkaBatchSink的实现逻辑

总结

分布式处理引擎Flink使用至少一个【job】调度和至少一个【task】实现分布式处理

有界:就是指flink【消费指定范围内】的数据。例如我定义某个作业间隔时间为0.5秒,则flink已0.5秒为界,进行数据处理。有界数据用在离线数据的处理场景较多

无界:就是指flink始终【监听数据源】里的数据,获取到就处理。无界数据往往用在【实时数据】处理下的场景较多。

我这里结合我们项目的场景来给各位说一下该选那种处理。我们的场景为:

1:尽量支持最多的数据落地
2:数据必须要准确。所以我们最终了有界处理,将flink的界限设置为0.5秒,0.5秒内收集的所有数据整体使用一个算
子消费。保证数据的准确和消费高效性。

1、一定要有抛出异常的机制

我们都知道抛出异常会终止消费,但是为什么要抛出异常呢?这注意是因为如果用户不抛出异常的话,flink会认为当前的数据时正常消费的,这就造成了我们的kafka数据误消费

2、关于并行度parallelism

并行度的配置都是setParallelism,对于env和stream来说,stream的优先级比env高

3、关于checkpoint

我们如果定义程序运行在SPring Boot时,一定要配置检查点这个是flink实现容错的核心配置!

4、关于并行度

我们在设置并行度的时候,将里边的数字设置为多少,最终就会有多少个线程来执行任务。
所以大家一定要清楚对于数据准确性高的数据来说,宁愿牺牲多线程带来的效率提升也要只设置一个线程来执行消费。
可能大家没有注意,如果你不设置flink的并行度为1时。它是以的是系统的线程数来作为并行度!这样顺序是会乱的。

5、saveBatch很好

但是我建议你先封装一下或者改为批量的保存。可能大家都知道或者说都用过mybatis plus的saveBatch,它能将一个列表的inseert封装为一条sql(insert into a values(a1),(a2),(a3),但是我们一条sql的长度过长的话会存在性能问题。建议在批量处理的时候每隔1000条记录saveBatch一次

为什么flink消费kafka比官方的listener都要快

1、并行度和分区处理: Flink 具有高度的并行度支持

可以为每个 Kafka 分区创建独立的消费者实例,以便并行地处理多个分区。这使得 Flink能够更有效地利用资源,并提高整体的消费速度。相比之下,一些官方 Kafka Consumer 实现可能没有明确的并行度配置或并行处理策略。

2、事件时间处理

Flink 强调【事件时间】处理,支持按照事件的实际发生时间进行【有序处理】。这对于一些需要处理时间相关业务逻辑的应用来说很重要。Flink 可以轻松处理乱序事件,并确保事件按照正确的顺序进行处理。官方 Kafka Consumer 也提供了类似的功能,但 Flink在这方面的设计更加深入和专注。

状态管理

Flink 提供了强大的状态管理机制,对于需要在处理过程中保持状态的任务,这一点非常重要。在消费 Kafka消息时,可能需要追踪某些状态,例如记录已处理的偏移量。Flink 的状态管理可以更好地支持这种场景,而官方 Kafka Consumer可能没有提供类似的状态管理机制。

异步处理模型:

Flink 使用异步处理模型,这使得在处理一条消息时,可以同时进行其他处理而无需等待。这有助于提高整体的处理效率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1314133.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

酸菜行业分析:占整体市场规模比重的1.2%

酸菜的加工不仅有巨大的市场需求&#xff0c;且对分布于湖南、四川、广西、贵州等地的芥菜等植物资源和农产品价值&#xff0c;予以有效提升&#xff0c;是贫困地区农民增收的重要方式。单杨建议&#xff0c;以酸菜为原料的食品加工企业应引以为戒&#xff0c;将食品安全重点防…

Gartner发布2024年网络安全预测 :IAM 和数据安全相结合,解决长期存在的挑战

安全和风险管理领导者需要采用可组合的数据安全视图。这项研究预测&#xff0c;将数据安全创新应用于痛点和高级用例将有助于组织将其数据用于几乎任何用例。 主要发现 在所有云服务模型中&#xff0c;数据安全以及身份和访问管理 (IAM) 的责任均由最终客户承担。 由于这两个学…

大数据技术14:FlinkCDC数据变更捕获

前言&#xff1a;Flink CDC是Flink社区开发的flink-cdc-connectors 组件&#xff0c;这是⼀个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。 https://github.com/ververica/flink-cdc-connectors 一、CDC 概述 CDC 的全称是 Change …

【倒计时征稿中,ACM独立出版,有确定的ISBN号,ei检索稳定且快】2023 人工智能、系统与网络安全国际学术会议 (AISNS 2023)

2023 人工智能、系统与网络安全国际学术会议 (AISNS 2023&#xff09; 2023 International Conference on Artificial Intelligence, Systems and Network Security 一、大会简介 由西南科技大学计算机科学与技术学院主办的2023人工智能、系统与网络安全国际学术会议 (AISNS …

【Hive】——DDL(PARTITION)

1 增加分区 1.1 添加一个分区 ALTER TABLE t_user_province ADD PARTITION (provinceBJ) location/user/hive/warehouse/test.db/t_user_province/provinceBJ;必须自己把数据加载到增加的分区中 hive不会帮你添加 1.2 一次添加多个分区 ALTER TABLE table_name ADD PARTITION…

Spring Boot整合Sharding-JDBC实现数据脱敏

目录 背景ShardingSphere脱敏规则sharding-jdbc数据脱敏数据脱敏配置数据分片 数据脱敏配置 背景 对互联网公司、传统行业来说&#xff0c;数据安全一直是极为重视和敏感的话题。数据脱敏是指对某些敏感信息通过脱敏规则进行数据的变形&#xff0c;实现敏感隐私数据的可靠保护…

HTTP 410错误:资源已永久删除,了解与处理

在Web开发中&#xff0c;HTTP状态码是用于表示Web服务器响应的各种状态。其中&#xff0c;HTTP 410错误表示资源已永久删除。这意味着请求的资源已经不再存在&#xff0c;无法通过HTTP请求再次获取。 当HTTP 410错误出现时&#xff0c;客户端可能会收到一个“410 Gone”响应&a…

机器学习支持向量机(SVM)

svm与logstic异同 svm支持向量机&#xff0c;因其英文名为support vector machine&#xff0c;故一般简称SVM&#xff0c;通俗来讲&#xff0c;它是一种二类分类模型&#xff0c;其基本模型定义为特征空间上的间隔最大的线性分类器&#xff0c;其学习策略便是间隔最大化&#x…

Logistic 回归算法

Logistic 回归 Logistic 回归算法Logistic 回归简述Sigmoid 函数Logistic 回归模型表达式求解参数 $\theta $梯度上升优化算法 Logistic 回归简单实现使用 sklearn 构建 Logistic 回归分类器Logistic 回归算法的优缺点 Logistic 回归算法 Logistic 回归简述 Logistic 回归是一…

Nginx七层代理,四层代理 + Tomcat多实例部署

目录 1.tomcat多实例部署 准备两台虚拟机 进入pc1 pc2同时安装jdk 进入pc1 pc2安装tomcat PC1配置&#xff08;192.168.88.50&#xff09; 安装tomcat多实例 tomcat2中修改端口 启动tomcat1 tomcat2 分别在三个tomcat服务上部署jsp的动态页面 2.nginx的七层代理&…

多分类预测 | MATLAB实现CNN-LSTM-Attention多输入分类预测

分类预测 | MATLAB实现CNN-LSTM-Attention多输入分类预测 分类效果 需要源码和数据的私信&#xff08;微微有偿取哦&#xff09;

微软microsoft推出了最新的小型但强大的开源语言AI模型Phi-2

微软推出了最新的小型开源语言模型 Phi-2。该模型只有 27 亿个参数&#xff0c;却能超过比它大 25 倍的模型的性能。Phi-2 是微软 Phi 项目的一部分&#xff0c;旨在制作小而强大的语言模型。该项目包括 13 亿参数的 Phi-1&#xff0c;据称在 Python 编码方面实现了最先进的性能…

HTML5 Canvas画布讲解

一、canvas 简介 ​<canvas> 是 HTML5 新增的&#xff0c;一个可以使用脚本(通常为 JavaScript) 在其中绘制图像的 HTML 元素。它可以用来制作照片集或者制作简单(也不是那么简单)的动画&#xff0c;甚至可以进行实时视频处理和渲染。 ​它最初由苹果内部使用自己 MacO…

linux高级管理——LAMP平台部署及应用

一、认识LAMP&#xff1a; 在LAMP平台的四个构成组件中&#xff0c;每个组件都承担着一部分关键应用。经过十几年的发展&#xff0c;各组件间的兼容性得到了不断的完善&#xff0e;协作能力和稳定性也不断增强&#xff0c;可以构建出非常优秀的Web应用系统。各组件的主要作用如…

持续集成交付CICD:Jenkins使用基于SaltStack的CD流水线部署前后端应用

目录 一、实验 1.Jenkins使用基于SaltStack的CD流水线部署后端应用 2.Jenkins使用基于SaltStack的CD流水线部署前端应用 一、实验 1.Jenkins使用基于SaltStack的CD流水线部署后端应用 &#xff08;1&#xff09;GitLab添加Token (2)Jenkins添加凭据 &#xff08;3&#xf…

数据分析的基本步骤

了解过数据分析的概念之后&#xff0c;我们再来说下数据分析的常规步骤。 明确目标 首先我们要确定一个目标&#xff0c;即我们要从数据中得到什么。比如我们要看某个指标A随时间的变化趋势&#xff0c;以期进行简单的预测。 数据收集 当确定了目标之后&#xff0c;就有了取…

RT-DETR 图片目标计数 | 特定目标进行计数

全类别计数特定类别计数如何使用 RT-DETR 进行对象计数 有很多同学留言说想学 RT-DETR 目标计数。那么今天这篇博客,我将教大家如何使用 RT-DETR 进行对象计数。RT-DETR 是一种非常强大的对象检测模型,它可以识别图像中的各种对象。我们将学习如何利用这个模型对特定对象进行…

四十六、Redis哨兵

目录 一、哨兵的作用及原理 1、哨兵的结构和作用如下&#xff1a; 2、服务状态监控 3、选举新的master 4、小结 二、RedisTemplate的哨兵模式 一、哨兵的作用及原理 Redis提供了哨兵&#xff08;Sentinel&#xff09;机制来实现主从集群的自动故障恢复。 1、哨兵的结构和作…

11.jvm第三方工具使用实践

目录 概述GCEasy官网jvm内存占用情况关键性能指标堆内存与元空间优化 MAT安装MAT相关概念说明内存泄漏与内存溢出shallow heap及retained heapoutgoing references与incoming referencesDominator Tree GCViewerArthas下载安装与启动jdk8jdk 11jdk11自定义boot jarjdk17 常用命…

FIFO的Verilog设计(三)——最小深度计算

文章目录 前言一、FIFO的最小深度写速度快于读速度写速度等于或慢于读速度 二、 举例说明1. FIFO写时钟为100MHz&#xff0c;读时钟为80Mhz情况一&#xff1a;一共需要传输2000个数据&#xff0c;求FIFO的最小深度情况二&#xff1a;100个时钟写入80个数据&#xff0c;1个时钟读…