Flink DataSource介绍

news2025/1/11 6:19:20

介绍

Flink的Data Source(数据源、源算子)是Flink作业的起点,它定义了数据输入的来源。Flink可以从各种数据来源获取数据,例如文件系统、消息队列、数据库等。以下是对Flink Data Source的详细介绍:

概述:

  • Flink中的Data Source用于定义数据输入的来源。
  • 将数据源添加到Flink执行环境中,可以创建一个数据流。
  • Flink支持多种类型的数据源,包括内置数据源和自定义数据源。

内置数据源:

  • 基于集合构建:使用Flink的API(如fromCollection、fromElements等)将Java或Scala中的集合数据转化为数据流进行处理。
  • 基于文件构建:从文件系统中读取数据,支持多种文件格式,如CSV、JSON等。
  • 基于Socket构建:从Socket连接中读取数据,适用于实时流数据场景。

自定义数据源:

  • Flink允许用户通过实现SourceFunction接口或扩展RichParallelSourceFunction来自定义数据源。
  • 常见的自定义数据源包括从第三方系统连接器(如Kafka、RabbitMQ、MongoDB等)中读取数据。

添加数据源到Flink执行环境:

  • 使用StreamExecutionEnvironment.addSource(sourceFunction)方法将数据源添加到Flink执行环境中。
  • sourceFunction需要实现SourceFunction接口或扩展RichParallelSourceFunction。

数据流处理:

  • 一旦数据源被添加到Flink执行环境中,就可以创建一个数据流(DataStream)。
  • 接下来,可以使用Flink的各种算子(如map、filter、reduce等)对数据流进行转换处理。

输出结果:

  • 处理后的数据可以写入其他系统,如文件系统、数据库、消息队列等。
  • Flink支持多种输出方式,如使用DataStream的writeAsText、writeAsCsv等方法将数据写入文件,或使用Flink的连接器将数据写入Kafka、HBase等系统。

总之,Flink的Data Source是构建Flink数据流处理应用的重要组成部分。通过选择合适的数据源和输出方式,可以方便地构建高效、可靠的数据流处理应用。

样例

程序中添加数据源

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new SourceMySQL()).print();
        env.execute("Flink add mysql sourc");

Flink 已经提供了若干实现好了的SourceFunctions也可以通过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source,

stream sources

StreamExecutionEnvironment 加载Source

基于集合:

  1. fromCollection(Collection)
  2. fromCollection(Iterator, Class)
  3. fromElements(T …)
  4. fromParallelCollection(SplittableIterator, Class)
  5. generateSequence(from, to)
    例如:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input = env.fromElements(
	new Event(1, "ba", 4.0),
	new Event(2, "st", 5.0),
	new Event(3, "fo", 6.0),
	...
);

文件

  1. readTextFile(String filePath)
  2. readTextFile(String filePath, String charsetName)
  3. readFile(FileInputFormat inputFormat, String filePath)
    样例:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/file");

Socket

  1. socketTextStream(String hostname, int port)
  2. socketTextStream(String hostname, int port, String delimiter)
  3. socketTextStream(String hostname, int port, String delimiter, long maxRetry)
    样例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Integer>> dataStream = env
        .socketTextStream("localhost", 8888) // 监听 localhost 的 8888端口过来的数据
        .flatMap(new Splitter())
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

自定义资源

通过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source。
继承类

SourceFunction 非并行

SourceFunction 是一个用于定义数据源的函数接口。这个接口的实现通常负责从外部系统(如 Kafka、文件系统、数据库等)读取数据,并将这些数据作为 Flink 流处理或批处理作业的输入。

SourceFunction 通常与 Flink 的 DataStream API 一起使用,以定义和构建数据流处理任务。尽管 Flink 的内部实现和 API 可能会随着版本的更新而有所变化,但通常,一个 SourceFunction 会被实现为:

  1. 创建一个可以持续生成数据或等待外部数据到达的组件。
  2. 当有新数据到达时,将数据作为 Flink 的 DataStream 的一部分进行发射(emit)。

在 Flink 的内部,SourceFunction 可能会有多种不同的实现,具体取决于其要处理的数据源类型。例如,对于 Kafka 这样的消息队列,Flink 提供了专门的 Kafka 连接器和相应的 SourceFunction 实现,用于从 Kafka 主题中读取数据。

在实现自定义的 SourceFunction 时,你需要考虑以下几个方面:

  • 数据源的连接和断开连接逻辑。
  • 数据的读取和解析逻辑。
  • 如何在 Flink 运行时环境中优雅地处理可能的错误和失败。
  • 如何将读取的数据转换为 Flink 可以理解的格式(如 Tuple、POJO 或其他自定义类型)。

ParallelSourceFunction 并行

ParallelSourceFunction 是一个接口,用于定义并行数据源的行为。这个接口允许你创建自定义的数据源,这些数据源能够并行地读取数据并传递给 Flink 的数据处理管道。

ParallelSourceFunction 继承自 SourceFunction,但增加了并行处理的能力。当 Flink 任务需要并行处理多个数据流时,你可以通过实现 ParallelSourceFunction 来创建并行数据源。

Flink 还提供了一个 RichParallelSourceFunction 抽象类,它是 ParallelSourceFunction 的子类,并提供了更多的生命周期方法和上下文信息。使用 RichParallelSourceFunction 可以让你更容易地管理你的并行数据源,因为它提供了诸如 open()、close() 和 cancel() 等方法,这些方法可以在数据源的生命周期中的不同阶段被调用。

下面是一个简单的示例,演示了如何使用 RichParallelSourceFunction 创建一个并行数据源,该数据源生成递增的数字:

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;  
public class IncrementingNumberSource extends RichParallelSourceFunction<Long> {  
    private volatile boolean running = true;  
    private long count = 0L;  
    @Override  
    public void run(SourceContext<Long> ctx) throws Exception {  
        while (running) {  
            synchronized (ctx.getCheckpointLock()) {  
                ctx.collect(count++);  
                // 这里可以添加一些休眠或其他逻辑来控制数据的生成速度  
                Thread.sleep(100);  
            }  
        }  
    }  
    @Override  
    public void cancel() {  
        running = false;  
    }  
}

在这个示例中,IncrementingNumberSource 类继承了 RichParallelSourceFunction,并覆盖了 run() 和 cancel() 方法。在 run() 方法中,我们创建了一个无限循环来生成递增的数字,并使用 ctx.collect() 方法将每个数字发送到 Flink 的数据处理管道中。在 cancel() 方法中,我们设置了一个标志来停止 run() 方法中的循环,以便在需要时可以优雅地关闭数据源。

自定义资源DEMO

/**
 * Desc: 自定义 source mysql 数据
 */
public class SourceMySQL extends RichSourceFunction<Map<String, Object>> {

    PreparedStatement ps;
    private Connection connection;

    /**
     * open 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接。
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = MySQLUtil.getConnection("com.mysql.jdbc.Driver","jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8","root","123456");
        String sql = "select * from ST;";
        ps = this.connection.prepareStatement(sql);
    }

    /**
     * 关闭连接和释放资源的动作
     */
    @Override
    public void close() throws Exception {
        super.close();
        if (connection != null) {
            connection.close();
        }
        if (ps != null) {
            ps.close();
        }
    }

    /**
     * DataStream 从run方法用来获取数据
     */
    @Override
    public void run(SourceContext<Map<String, Object>> ctx) throws Exception {
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            Map<String, Object> rs = new HashMap<>();
            rs.put("id", resultSet.getInt("id"));
            rs.put("name", resultSet.getString("name").trim());
            ctx.collect(rs);
        }
    }
    @Override
    public void cancel() {
    }
}


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1654779.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

我独自升级崛起怎么玩 我独自升级崛起游玩教程分享

《我独自升级&#xff1a;ARISE》是一款预计在 Android、iOS 和 PC 平台推出的动作 RPG&#xff0c;故事内容基于网络漫画版本改编&#xff0c;讲述世界各地出现「次元传送门」&#xff0c;而少部分人类觉醒了可以对抗传送门中怪物的「猎人」能力&#xff0c;玩家可以在故事模式…

【大模型认识】警惕AI幻觉,利用插件+微调来增强GPT模型

文章目录 一. 大模型的局限1. 大模型不会计算2. 甚至明目张胆的欺骗 二. 使用插件和微调来增强GPT模型1. 模型的局限性2. 插件来增强大模型的能力3. 微调技术-提高特定任务的准确性 一. 大模型的局限 1. 大模型不会计算 LLM根据给定的输入提示词逐个预测下一个词&#xff08;…

【STM32G474】利用Cpp编写STM32代码后,Cubemx修改配置后代码报错147个error,如何处理?

问题描述 打开Cubemx&#xff0c;添加TIM7用于定时器精准延时&#xff0c;生成代码后&#xff0c;Keil提示有147个error。 之前是Cubemx是没有问题的&#xff0c;是利用Cpp编写stm32&#xff08;将Keil改为Version6&#xff09;后才导致Cubemx配置失败&#xff1a; debug成功…

Jmeter用jdbc实现对数据库的操作

我们在用Jmeter进行数据库的操作时需要用到配置组件“JDBC Connection Configuration”&#xff0c;通过配置相应的驱动能够让我们通过Jmeter实现对数据库的增删改查&#xff0c;这里我用的mysql数据库一起来看下是怎么实现的吧。 1.驱动包安装 在安装驱动之前我们要先查看当前…

【面试干货】http请求报文的组成与作用?

【面试干货】http请求报文的组成与作用&#xff1f; 一、http 的请求报文组成二、请求行&#xff08;Request Line&#xff09;三、请求头部&#xff08;Request Headers&#xff09;四、请求体&#xff08;Request Body&#xff09;五、响应头部 &#xff08;Response Headers…

Java | Leetcode Java题解之第59题螺旋矩阵II

题目&#xff1a; 题解&#xff1a; class Solution {public int[][] generateMatrix(int n) {int num 1;int[][] matrix new int[n][n];int left 0, right n - 1, top 0, bottom n - 1;while (left < right && top < bottom) {for (int column left; co…

【Java】IO流:字节流 字符流 缓冲流

接续上文&#xff0c;在这篇文章将继续介绍在Java中关于文件操作的一些内容【Java】文件操作 文章目录 一、“流”的概念1.“流”的分类1.1输入流和输出流1.2字节流和字符流 字节和字符的区别&#xff1f;为什么要有字符流&#xff1f;1.3节点流和处理流 字符流自带缓冲区&…

基士得耶(GESTETNER ) CP 6303C 速印机简介

规格参数 产品名称: 基士得耶&#xff08;GESTETNER &#xff09; CP 6303C 速印机 品牌中文: 基士得耶/GESTETNER 型 号: CP-6303C 工作方式&#xff1a; 数码式 制版方式: 自动印刷 制版时间&#xff1a; 曝光玻璃: 31秒(A4长边…

使用SpringBoot+Redis做一个排行榜【推荐】

SpringBoot Redis实现排行榜 一、Zset有序集合介绍 Zset是一个没有重复元素的字符串集合。不同之处是有序集合的每个成员都关联了一个评分( score) ,这个评分( score)被用来按照从最低分到最高分的方式排序集合中的成员。集合的成员是唯一的&#xff0c;但是评分可以是重复了…

MATLAB 基于规则格网的点云抽稀方法(自定义实现)(65)

MATLAB 基于规则格网的点云抽稀方法(自定义实现)(65) 一、算法介绍二、算法实现1.代码2.结果一、算法介绍 海量点云的处理,需要提前进行抽稀预处理,相比MATLAB预先给出的抽稀方法,这里提供一种基于规则格网的自定义抽稀方法,步骤清晰,便于理解抽稀内涵, 主要涉及到使…

【深度学习】网络安全,SQL注入识别,SQL注入检测,基于深度学习的sql注入语句识别,数据集,代码

文章目录 一、 什么是sql注入二、 sql注入的例子三、 深度学习模型3.1. SQL注入识别任务3.2. 使用全连接神经网络来做分类3.3. 使用bert来做sql语句分类 四、 深度学习模型的算法推理和部署五、代码获取 一、 什么是sql注入 SQL注入是一种常见的网络安全漏洞&#xff0c;它允许…

CSS-盒子模型元素溢出

作用&#xff1a;控制溢出的元素的内容的显示方式 属性&#xff1a;overflow 属性值 属性值效果hidden溢出隐藏scroll溢出滚动&#xff08;无论是否溢出&#xff0c;都显示滚动条位置&#xff09;auto溢出滚动&#xff08;溢出才显示滚动条位置&#xff09; <!DOCTYPE html&…

字体设计_西文字体设计(英文字体设计)

一 西文字体设计基础知识 设计目标和历史成因 设计目标&#xff1a;让眼睛看着舒服的字体 那什么样的字体让眼睛看着舒服呢&#xff1f; 让眼睛看着舒服的字体造型其实是我们记忆里的手写体、自然造型。 所以就能理解西文字体为什么同一笔画&#xff0c;有的地方粗有的地方…

国科大深度学习期末历年试卷

本文借鉴 国科大深度学习复习 深度学习期末 深度学习2020 一&#xff0e;名词解释&#xff08;每个2分&#xff0c;共10分&#xff09; 深度学习&#xff0c;稀疏自编码器&#xff0c;正则化&#xff0c;集成学习&#xff0c;Dropout 二&#xff0e;简答题&#xff08;每题…

【汇总】虚拟机网络不通(Xshell无法连接虚拟机)排查方法

搜索关键字关键字关键字&#xff1a;虚拟机虚拟机虚拟机连接失败、虚拟机无法连接、Xshell连接失败、ping baidu.com失败、静态IP设置 Kali、CentOS、远程连接 描述&#xff1a;物理机无法连接虚拟机&#xff1b;虚拟机无法访问百度&#xff0c;虚拟机无法访问baidu.com 虚拟机…

五月加仓比特币

作者&#xff1a;Arthur Hayes Co-Founder of 100x. 编译&#xff1a;Liam 编者注&#xff1a;本文略有删减 (以下内容仅代表作者个人观点&#xff0c;不应作为投资决策的依据&#xff0c;也不应被视为参与投资交易的建议或意见&#xff09;。 从四月中旬到现在&#xff0c;当你…

5月8日学习记录

_[FBCTF2019]RCEService&#xff08;preg_match函数的绕过&#xff09; 涉及知识点&#xff1a;preg_match函数绕过&#xff0c;json的格式&#xff0c;正则回溯 打开环境&#xff0c;要求用json的格式输入 搜索学习一下json的语法规则 数组&#xff08;Array&#xff09;用方括…

抓取Google时被屏蔽怎么办?如何避免?

在当今数字化时代&#xff0c;数据采集和网络爬取已成为许多企业和个人必不可少的业务活动。对于爬取搜索引擎数据&#xff0c;特别是Google&#xff0c;使用代理IP是常见的手段。然而&#xff0c;使用代理抓取Google并不是一件轻松的事情&#xff0c;有许多常见的误区可能会导…

VisualGLM-6B微调(V100)

Visualglm-6b-CSDN博客文章浏览阅读1.3k次。【官方教程】XrayGLM微调实践&#xff0c;&#xff08;加强后的GPT-3.5&#xff09;能力媲美4.0&#xff0c;无次数限制。_visualglm-6bhttps://blog.csdn.net/u012193416/article/details/131074962?ops_request_misc%257B%2522req…

一键自动化博客发布工具,用过的人都说好(阿里云篇)

阿里云有个开发者社区&#xff0c;入驻过的朋友可能想要把自己的博客发布到阿里云社区上。 今天我来介绍一下blog-auto-publishing-tools自动发布博客到阿里云的实现原理。 阿里云的博客发布界面比较简单&#xff0c;只有标题&#xff0c;正文&#xff0c;摘要&#xff0c;关…