flink+kafka实现流数据处理学习

news2025/1/2 0:28:21

在应用系统的建设过程中,通常都会遇到需要实时处理数据的场景,处理实时数据的框架有很多,本文将以一个示例来介绍flink+kafka在流数据处理中的应用。

1、概念介绍

  • flink:是一个分布式、高可用、高可靠的大数据处理引擎,提供了一种高效、可靠、可扩展的方式来处理和分析实时数据。

  • kafka:是用于构建实时数据管道和流应用程序并具有横向扩展,容错,wicked fast(变态快)等优点的一种消息中间件。

  • flink-connector-kafka:是flink内置的kafka连接器,它允许Flink应用轻松地从Kafka中读取数据流(Source)或将数据流写入到Kafka(Sink)。

2、实现目标

本文主要从下面3个步骤完成流数据的处理:

  • flink作为kafka消费者,从kafka中消费数据并将消费到的数据转换为flink数据流;

  • flink对获取到的数据流进行计算、聚合等操作;

  • flink对处理之后的数据再次写入到kafka中,实现数据的流动。

3、实现步骤

  • 新建maven工程,将依赖添加到环境中

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.20.0</flink.version>
    <flink-kafka.version>3.3.0-1.20</flink-kafka.version>
  </properties>
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>21</java.version>
    <flink.version>1.20.0</flink.version>
    <flink-kafka.version>3.3.0-1.20</flink-kafka.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>${flink-kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-base</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--    json处理    -->
    <dependency>
      <groupId>com.alibaba.fastjson2</groupId>
      <artifactId>fastjson2</artifactId>
      <version>2.0.53</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <!-- Do not copy the signatures in the META-INF folder.
                  Otherwise, this might cause SecurityExceptions when using the JAR. -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <!-- Replace this with the main class of your job -->
                  <mainClass>org.example.App</mainClass>
                </transformer>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  • kafka生产者负责模拟数据流生成

System.out.println("kafka生产者启动....当前时间为:" + LocalDateTime.now());
KafkaProducerStudy kafkaProducerStudy = new KafkaProducerStudy();
KafkaProducer<String, Object> kafkaProducer = kafkaProducerStudy.createKfkaProducer();
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
for (int i = 0; i < 10; i++) {
ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, kafkaProducerStudy.setKafkaUserValue(i));
kafkaProducer.send(record);
Thread.sleep(1000);
}
kafkaProducer.commitTransaction();
kafkaProducer.close();
System.out.println("kafkaProducer关闭当前时间为:" + LocalDateTime.now());
  • flink从kafka中获取数据流

//构建kafkaSource数据源
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(kafka_server) //指定kafka服务
.setTopics(pub_topic)    //指定topic
.setGroupId(groupId)   //指定groupID
.setStartingOffsets(OffsetsInitializer.latest())    //指定消费数据起始的位置
.setValueOnlyDeserializer(new SimpleStringSchema())     //指定反序列化器
.build();
//kafkaSource能够通过指定不同策略的偏移量
//1、OffsetsInitializer.latest():一定从最早的位置开始消费
//2、OffsetsInitializer.latest():一定从最新的位置开始消费
//3、OffsetsInitializer.timestamp(long timestamp):从指定时间开始消费
//4、OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST):获取每个分区已提交的偏移量,从提交的偏移量开始消费,如果没有已提交的偏移量会从起始位置开始读取
//5、OffsetsInitializer.committedOffsets():获取每个分区已提交的偏移量,从提交的偏移量开始消费,如果没有已提交的偏移量会抛出异常
  • 基于flink基本算子对数据进行加工

map算子:对数据流一对一的加载计算,并返回一个新的对象

sou.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(s);
jsonObject.put("source", "flink");
return jsonObject.toString();
}
}).print();
//output
//{"id":0,"value":5,"ts":1734832964534,"source":"flink"}
//{"id":1,"value":3,"ts":1734832965640,"source":"flink"}
//{"id":2,"value":7,"ts":1734832966643,"source":"flink"}
//{"id":3,"value":10,"ts":1734832967645,"source":"flink"}
//{"id":4,"value":2,"ts":1734832968648,"source":"flink"}
//{"id":5,"value":2,"ts":1734832969653,"source":"flink"}
//{"id":6,"value":1,"ts":1734832970654,"source":"flink"}
//{"id":7,"value":6,"ts":1734832971657,"source":"flink"}
//{"id":8,"value":6,"ts":1734832972660,"source":"flink"}
//{"id":9,"value":6,"ts":1734832973662,"source":"flink"}

filter算子:对数据流进行过滤,只返回为true的数据

sou.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(s);
jsonObject.put("source", "flink");
return jsonObject.toString();
}
}).filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(value);
Integer id = jsonObject.getInteger("id");
return id % 2 == 0;
}
}).print();
//output
//{"id":0,"value":5,"ts":1734832964534,"source":"flink"}
//{"id":2,"value":7,"ts":1734832966643,"source":"flink"}
//{"id":4,"value":2,"ts":1734832968648,"source":"flink"}
//{"id":6,"value":1,"ts":1734832970654,"source":"flink"}
//{"id":8,"value":6,"ts":1734832972660,"source":"flink"}

flink将处理之后的数据再次写到kafka中,实现数据的流动

KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(kafka_server)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(sub_topic)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
processResult.sinkTo(sink);
  • kafka消费者订阅对应的topic

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "study02-ubuntu:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "iot1");
//        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
//        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String,Object> kafkaConsumer = new KafkaConsumer<>(properties);
TopicPartition p0 = new TopicPartition(topic, 0);
TopicPartition p1 = new TopicPartition(topic, 1);
kafkaConsumer.assign(Arrays.asList(p0,p1));
while (true) {
ConsumerRecords<String,Object> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, Object> record : records) {
//todo 处理消息
System.out.println(record.value());
}
}
//output
//{"id":0,"value":5,"ts":1734832964534,"source":"flink"}
//{"id":2,"value":7,"ts":1734832966643,"source":"flink"}
//{"id":4,"value":2,"ts":1734832968648,"source":"flink"}
//{"id":6,"value":1,"ts":1734832970654,"source":"flink"}
//{"id":8,"value":6,"ts":1734832972660,"source":"flink"}
flink接收kafka数据通过算子计算之后再次转发到kafka中完整代码示例:
package com.yanboot.flink.connector;

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaStreamDataProcess {
private final static String kafka_server = "study02-ubuntu:9092";
private final static String pub_topic = "sunlei";
private final static String sub_topic = "sub_sunlei";
private final static String groupId = "kafka-demo";

public static void main(String[] args) throws Exception {
//设置执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//指定并行度
env.setParallelism(1);
//构建kafkaSource
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(kafka_server) //指定kafka服务
.setTopics(pub_topic)    //指定topic
.setGroupId(groupId)   //指定groupID
//OffsetsInitializer.latest():一定从最早的位置开始消费
//OffsetsInitializer.latest():一定从最新的位置开始消费
//OffsetsInitializer.timestamp(long timestamp):从指定时间开始消费
//OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST):获取每个分区已提交的偏移量,从提交的偏移量开始消费,如果没有已提交的偏移量会从起始位置开始读取
//OffsetsInitializer.committedOffsets():获取每个分区已提交的偏移量,从提交的偏移量开始消费,如果没有已提交的偏移量会抛出异常
.setStartingOffsets(OffsetsInitializer.latest())    //指定offset的位置
.setValueOnlyDeserializer(new SimpleStringSchema())     //指定反序列化器
.build();
DataStreamSource<String> sou = env.fromSource(kafkaSource, //指定数据源
WatermarkStrategy.noWatermarks(), //指定水位线
"flink kafka source");
SingleOutputStreamOperator<String> processResult = sou.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(s);
jsonObject.put("source", "flink");
return jsonObject.toString();
}
}).filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(value);
Integer id = jsonObject.getInteger("id");
return id % 2 == 0;
}
});
processResult.print();

KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(kafka_server)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(sub_topic)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();

processResult.sinkTo(sink);

//启动作业
env.execute();

}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2267727.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

前后端分离(前后端交互步骤)

1.设计数据库 /*Navicat Premium Data Transfer ​Source Server : localhost_3306Source Server Type : MySQLSource Server Version : 80037 (8.0.37)Source Host : localhost:3306Source Schema : studymysql ​Target Server Type : MySQL…

从零开始学AI,完成AI 企业知识库的AI问答搭建

1&#xff1a;本地安装一个ollama玩下&#xff0c;ollama下载模型默认路径为C盘&#xff0c;但该盘空间不足。 解决方案&#xff1a;添加系统环境变量OLLAMA_MODELS&#xff0c;设置其值为新的路径。 2&#xff1a;安装完成后&#xff0c;访问http://127.0.0.1:11434/ 查看服务…

Redis6为什么引入了多线程?

大家好&#xff0c;我是锋哥。今天分享关于【Redis6为什么引入了多线程&#xff1f;】面试题。希望对大家有帮助&#xff1b; Redis6为什么引入了多线程&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 Redis 6 引入了多线程的主要目的是提高性能&#…

C# OpenCV机器视觉:产品计数

在一个阳光灿烂得有点 “嚣张” 的早晨&#xff0c;阿强正在实验室里和他那些宝贝仪器们 “眉来眼去”&#xff0c;捣鼓他的最新宝贝项目。突然&#xff0c;实验室的门被 “砰” 地一声撞开&#xff0c;他的好朋友小王像个没头苍蝇似的冲了进来&#xff0c;脸上的焦虑都快溢出来…

若依定时任务

表结构 目录 quartz框架 SysJobServiceImpl实现类 使用切点,在构造器执行的时候执行定时任务的构建(这个类是交给IOC容器的,所以这个时间点就是项目启动的时候)SysJobServiceImpl实现类的init方法创建任务 /*** 创建定时任务*/public static void createScheduleJob(Scheduler …

LeetCode - Google 校招100题 第5天 双指针(Two Pointers) (11题)

欢迎关注我的CSDN:https://spike.blog.csdn.net/ 本文地址:https://spike.blog.csdn.net/article/details/144742777 LeetCode 合计最常见的 112 题: 校招100题 第1天 链表(List) (19题)校招100题 第2天 树(Tree) (21题)校招100题 第3天 动态规划(DP) (20题)

(icml2024)SLAattention,基于原文时序模型进行改进

#代码&#xff1a; https://github.com/xinghaochen/SLAB #论文&#xff1a;https://arxiv.org/pdf/2405.11582 相关工作 1. 高效Transformer架构 背景&#xff1a; Transformer从最初的自然语言处理扩展到计算机视觉领域&#xff08;例如ViT&#xff09;&#xff0c;但由于…

每日小题打卡

目录 幂次方 手机键盘 简单排序 校庆 性感素数 幂次方 题目描述 对任意正整数 N&#xff0c;计算 X^Nmod233333 的值。 输入格式 共一行&#xff0c;两个整数 X 和 N。 输出格式 共一行&#xff0c;一个整数&#xff0c;表示 X^Nmod233333 的值。 数据范围 1≤…

【Spring】 Bean 注入 HttpServletRequest 能保证线程安全的原理

文章目录 前言1. 图示2. 源码坐标后记 前言 今天看了一段老业务代码&#xff0c;HttpServletRequest 被注入后直接用于业务逻辑。 好奇Spring是如何解决线程安全问题。 Controller public class TestController {ResourceHttpServletRequest request;ResponseBodyGetMapping(…

iOS Masonry对包体积的影响

01 Masonry介绍 Masonry是iOS在控件布局中经常使用的一个轻量级框架&#xff0c;Masonry让NSLayoutConstraint使用起来更为简洁。Masonry简化了NSLayoutConstraint的使用方式&#xff0c;让我们可以以链式的方式为我们的控件指定约束。 常用接口声明与实现&#xff1a; 使用方式…

C 实现植物大战僵尸(二)

C 实现植物大战僵尸&#xff08;二&#xff09; 前文链接&#xff0c;C 实现植物大战僵尸&#xff08;一&#xff09; 五 制作启动菜单 启动菜单函数 void startUI() {IMAGE imageBg, imgMenu1, imgMenu2;loadimage(&imageBg, "res/menu.png");loadimage(&am…

sqlserver镜像设置

本案例是双机热备&#xff0c;只设置主体服务器&#xff08;主&#xff09;和镜像服务器&#xff08;从&#xff09;&#xff0c;不设置见证服务器 设置镜像前先检查是否启用了 主从服务器数据库的 TCP/IP协议 和 RemoteDAC &#xff08;1&#xff09;打开SQL Server配置管理器…

springboot503基于Sringboot+Vue个人驾校预约管理系统(论文+源码)_kaic

摘 要 传统办法管理信息首先需要花费的时间比较多&#xff0c;其次数据出错率比较高&#xff0c;而且对错误的数据进行更改也比较困难&#xff0c;最后&#xff0c;检索数据费事费力。因此&#xff0c;在计算机上安装个人驾校预约管理系统软件来发挥其高效地信息处理的作用&am…

游戏引擎学习第61天

回顾并计划接下来的事情 我们现在的目标是通过创建一个占位符版本的游戏来展示我们所做的工作。这个版本的游戏包含了许多基本要素&#xff0c;目的是快速构建一些东西&#xff0c;进行测试&#xff0c;并观察代码结构的形成。这些代码的实施是为了理解系统如何工作&#xff0…

探索PyTorch:从入门到实践的demo全解析

探索PyTorch:从入门到实践的demo全解析 一、环境搭建:PyTorch的基石(一)选择你的“利器”:安装方式解析(二)步步为营:详细安装步骤指南二、基础入门demo:点亮第一盏灯(一)张量操作:深度学习的“积木”(二)自动求导:模型学习的“幕后英雄”三、数据处理demo:喂饱…

hiprint结合vue2项目实现静默打印详细使用步骤

代码地址是&#xff1a;vue-plugin-hiprint: hiprint for Vue2/Vue3 ⚡打印、打印设计、可视化设计器、报表设计、元素编辑、可视化打印编辑 本地安装包地址&#xff1a;electron-hiprint 发行版 - Gitee.com 1、先安装hipint安装包在本地 2、项目运行npm&#xff08;socket.…

Docker Container 可观测性最佳实践

Docker Container 介绍 Docker Container&#xff08; Docker 容器&#xff09;是一种轻量级、可移植的、自给自足的软件运行环境&#xff0c;它在 Docker 引擎的宿主机上运行。容器在许多方面类似于虚拟机&#xff0c;但它们更轻量&#xff0c;因为它们不需要模拟整个操作系统…

GXUOJ-算法-第二次作业

1.矩阵连&#xff08;链&#xff09;乘 问题描述 GXUOJ | 矩阵连乘 代码解答 #include<bits/stdc.h> using namespace std;const int N50; int m[N][N]; int p[N]; int n;int main(){cin>>n;//m[i][j] 存储的是从第 i 个矩阵到第 j 个矩阵这一段矩阵链相乘的最小…

OpenCV计算机视觉 02 图片修改 图像运算 边缘填充 阈值处理

目录 图片修改&#xff08;打码、组合、缩放&#xff09; 图像运算 边缘填充 ​阈值处理 上一篇文章&#xff1a; OpenCV计算机视觉 01 图像与视频的读取操作&颜色通道 图片修改&#xff08;打码、组合、缩放&#xff09; # 图片打码 import numpy as np a cv2.imre…

不修改内核镜像的情况下,使用内核模块实现“及时”的调度时间片超时事件上报

一、背景 之前的博客 不修改内核镜像的情况下&#xff0c;使用内核模块实现高效监控调度时延-CSDN博客 里&#xff0c;我们讲了不修改内核镜像高效监控每次的调度时延的方法。这篇博客里&#xff0c;我们对于调度时间片也做这么一个不修改内核镜像的改进。关于调度时间片过长的…