Flink--Data Source 介绍

news2025/1/9 16:54:42

Data Source 简介

        Flink 做为一款流式计算框架,它可用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时的处理些实时数据流,实时的产生数据流结果,只要数据源源不断的过来,Flink 就能够一直计算下去,这个 Data Sources 就是数据的来源地。

        Flink 中你可以使用 StreamExecutionEnvironment.addSource(sourceFunction) 来为你的程序添加数据来源。

        Flink 已经提供了若干实现好了的 source functions,当然你也可以通过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source。

Flink Data Source分类

Flink的数据源可以根据数据的来源和特性进行分类。以下是常见的Flink数据源分类:

集合数据源

        集合数据源(Collection Data Source):集合数据源指的是将本地的集合或数组作为输入数据的数据源。在Flink中,可以使用fromCollection、fromElements等方法将Java或Scala中的集合数据转化为数据流进行处理。

1、fromCollection(Collection) - 从 Java 的 Java.util.Collection 创建数据流。集合中的所有元素类型必须相同。

2、fromCollection(Iterator, Class) - 从一个迭代器中创建数据流。Class 指定了该迭代器返回元素的类型。

3、fromElements(T …) - 从给定的对象序列中创建数据流。所有对象类型必须相同。

4、fromParallelCollection(SplittableIterator, Class) - 从一个迭代器中创建并行数据流。Class 指定了该迭代器返回元素的类型。

5、generateSequence(from, to) - 创建一个生成指定区间范围内的数字序列的并行数据流。

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import java.util.Arrays;
import java.util.List;

public class CollectionDataSourceExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 创建一个包含整数的集合
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);

        // 将集合转化为Flink的DataSet
        DataSet<Integer> dataset = env.fromCollection(data);

        // 打印数据集中的元素
        dataset.print();
    }
}

关于使用集合数据源的注意事项:

  1. 数据规模:集合数据源适用于小规模数据集。确保你的数据集在内存中能够合理存放,不至于导致内存溢出。

  2. 内存消耗:集合数据源会将所有数据存储在内存中,因此需要谨慎处理大型数据集,避免对内存资源造成过大压力。

  3. 并行度设置:在集群环境下,可以通过设置并行度来充分利用集群资源,提高作业的执行效率。

  4. 调试和测试:集合数据源非常适合用于本地调试和测试,可以快速验证处理逻辑并观察输出结果。

使用集合数据源时需要注意这些方面,以确保作业能够稳定运行并获得良好的性能表现。

文件数据源

        文件数据源(File Data Source):文件数据源用于从文件系统中读取数据,可以是本地文件系统或分布式文件系统(如HDFS)。Flink提供了readTextFile、readCsvFile等方法来支持常见文件格式的数据读取。

1、readTextFile(path) - 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回。

2、readFile(fileInputFormat, path) - 根据指定的文件输入格式读取文件(一次)。

3、readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是上面两个方法内部调用的方法。它根据给定的 fileInputFormat 和读取路径读取文件。根据提供的 watchType,这个 source 可以定期(每隔 interval 毫秒)监测给定路径的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次路径对应文件的数据并退出(FileProcessingMode.PROCESS_ONCE)。你可以通过 pathFilter 进一步排除掉需要处理的文件。

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;

public class FileDataSourceExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 从文件创建数据集
        String filePath = "path/to/your/file.txt";
        DataSet<String> text = env.readTextFile(filePath);

        // 打印文件中的内容
        text.print();
    }
}

关于使用文件数据源的注意事项:

  1. 文件路径:确保提供的文件路径是正确的,可以是本地文件系统路径,也可以是HDFS路径或其他支持的文件系统路径。

  2. 文件格式:Flink支持多种文件格式,包括文本文件、CSV文件、Parquet文件等。根据实际情况选择合适的文件格式进行读取。

  3. 并行度设置:在集群环境下,可以通过设置并行度来充分利用集群资源,提高文件读取的并行处理能力。

  4. 文件分区:对于大型文件,可以考虑文件分区和并行读取,以加速数据的加载和处理过程。

  5. 文件读取性能:尽量避免频繁的小文件读取操作,因为这会增加文件系统的负担并降低整体性能。

使用文件数据源时需要注意以上方面,以确保能够有效地读取文件数据,并且提高作业的执行效率。

Socket数据源

        Socket数据源(Socket Data Source):Socket数据源允许通过网络套接字接收数据,通常用于测试和演示目的。Flink可以使用socketTextStream方法从TCP socket接收数据流。

socketTextStream(String hostname, int port) - 从 socket 读取。元素可以用分隔符切分。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketDataSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从socket创建数据流
        String hostname = "localhost";
        int port = 9999;
        env.socketTextStream(hostname, port)
           .print();

        // 执行作业
        env.execute("Socket Data Source Example");
    }
}

关于使用Socket数据源的注意事项:

  1. 主机和端口:确保指定的主机和端口是正确的,并且能够与数据源通信。

  2. 网络延迟:由于Socket数据源涉及网络通信,因此可能受到网络延迟的影响。需要考虑网络性能对作业整体性能的影响。

  3. 并行度设置:可以通过设置并行度来充分利用集群资源,提高数据流处理的并行能力。

  4. 数据格式:需要确保从Socket接收到的数据能够被正确解析和处理,例如按行读取文本数据等。

  5. 容错机制:在使用Socket数据源时,需要考虑作业的容错机制,以确保在发生故障或数据丢失时能够正确处理和恢复。

使用Socket数据源时需要注意以上方面,以确保能够有效地接收数据并提高作业的执行效率。

自定义数据源

        自定义数据源(Custom Data Source):除了上述内置的数据源外,Flink还支持自定义数据源。用户可以实现自己的SourceFunction接口来定义特定的数据生成逻辑,例如从消息队列、数据库、传感器等实时数据源中读取数据。

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CustomDataSource extends RichParallelSourceFunction<String> {
    private boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            // 生成数据
            String data = generateData();
            // 发射数据
            ctx.collect(data);
            // 控制数据生成频率
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String generateData() {
        // 实现自定义的数据生成逻辑
        return "some data";
    }
}

        在这个示例中,我们创建了一个名为CustomDataSource的类,它继承自RichParallelSourceFunction并指定了数据类型为String。在run方法中,我们使用一个循环来生成数据并通过collect方法将数据发射出去。在cancel方法中,我们设置了一个标志位来控制数据源的运行状态。

关于使用自定义数据源的注意事项:

  1. 并行度设置:根据数据源的性质和数据量合理地设置并行度,以充分利用集群资源。

  2. 数据生成频率:确保数据生成的频率和速度能够适应作业的处理能力,避免数据源产生过快导致作业无法及时处理。

  3. 容错机制:在自定义数据源中,需要考虑作业的容错机制,例如在发生故障时如何正确处理和恢复。

  4. 数据格式:确保从自定义数据源产生的数据能够被正确解析和处理,符合作业的输入要求。

  5. 资源管理:需要确保自定义数据源的资源占用和生命周期管理,避免资源泄露或过度占用资源。

使用自定义数据源时需要考虑以上方面,并确保能够有效地产生数据并提高作业的执行效率。

Apache Kafka数据源

        Apache Kafka数据源(Kafka Data Source):作为流数据处理框架,Flink对Kafka提供了良好的集成支持。可以使用addSource方法结合Flink的Kafka Connector来从Kafka主题中读取数据。

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaDataSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka配置
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-consumer-group");

        // 创建Kafka数据流
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties);
        DataStream<String> kafkaDataStream = env.addSource(kafkaConsumer);

        kafkaDataStream.print();

        // 执行作业
        env.execute("Kafka Data Source Example");
    }
}

在这个示例中,我们首先创建了一个StreamExecutionEnvironment对象,然后设置Kafka的连接配置,包括bootstrap servers和consumer group id等。接下来,我们创建了一个FlinkKafkaConsumer对象,指定了要消费的topic以及数据的序列化方式,并将其添加到流处理环境中。最后,我们通过调用print方法来打印数据流中的内容,并通过execute方法启动作业并执行。

关于使用Kafka数据源的注意事项:

  1. Kafka配置:确保指定的Kafka配置正确,并能够与Kafka集群进行通信。

  2. 序列化方式:根据实际情况选择合适的数据序列化方式,例如SimpleStringSchema、JSON、Avro等。

  3. 并行度设置:可以通过设置并行度来充分利用集群资源,提高数据流处理的并行能力。

  4. 数据消费策略:需要考虑消费数据的策略,如是否从最新/最旧的数据开始消费,以及如何处理消费过程中的偏移量。

  5. 容错机制:在使用Kafka数据源时,需要考虑作业的容错机制,以确保在发生故障或数据丢失时能够正确处理和恢复。

使用Kafka数据源时需要注意以上方面,以确保能够有效地消费Kafka中的数据并提高作业的执行效率。

Apache Pulsar数据源

        Apache Pulsar数据源(Pulsar Data Source):类似于Kafka,Flink也集成了对Pulsar的支持,可以直接从Pulsar主题中读取数据。

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.PulsarClientException;

public class PulsarDataSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String serviceUrl = "pulsar://localhost:6650";
        String topic = "my-topic";

        FlinkPulsarSource<String> pulsarSource = new FlinkPulsarSource<>(
                serviceUrl,
                topic,
                Schema.STRING
        );

        DataStream<String> pulsarDataStream = env.addSource(pulsarSource);

        pulsarDataStream.print();

        env.execute("Pulsar Data Source Example");
    }
}

        在这个示例中,我们首先创建了一个StreamExecutionEnvironment对象,然后指定了Pulsar的连接信息和要消费的topic。接下来,我们创建了一个FlinkPulsarSource对象,并指定了Pulsar的serviceUrl、topic以及数据的Schema,并将其添加到流处理环境中。最后,我们通过调用print方法来打印数据流中的内容,并通过execute方法启动作业并执行。

关于使用Pulsar数据源的注意事项:

  1. Pulsar连接配置:确保指定的Pulsar连接信息正确,并能够与Pulsar集群进行通信。

  2. Schema设置:根据实际情况选择合适的数据Schema,例如STRING、JSON、AVRO等。

  3. 并行度设置:可以通过设置并行度来充分利用集群资源,提高数据流处理的并行能力。

  4. 数据消费策略:需要考虑消费数据的策略,如是否从最新/最旧的数据开始消费,以及如何处理消费过程中的偏移量。

  5. 容错机制:在使用Pulsar数据源时,需要考虑作业的容错机制,以确保在发生故障或数据丢失时能够正确处理和恢复。

        使用Pulsar数据源时需要注意以上方面,以确保能够有效地消费Pulsar中的数据并提高作业的执行效率。

        这些不同类型的数据源为Flink应用程序提供了灵活的数据接入方式,使得Flink可以轻松地处理不同来源和格式的数据。根据具体的业务需求和场景特点,可以选择合适的数据源类型来构建流处理和批处理应用程序。

更多消息资讯,请访问昂焱数据(https://www.ayshuju.com)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1184665.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

前端面试题之Javascript篇

一、JavaScript基础 1、数组有哪些方法 添加/删除元素 push() 向尾部添加元素pop() 从尾部提取一个元素shift() 从首端提取元素unshift() 从首端添加元素splice(start, deleteCount, item1...itemN) start表示开始计算的索引&#xff0c;deleteCount表示从start开始计算的元…

PTA_乙级_1006

思路&#xff1a;把数字的每一位都取出来&#xff0c;然后for循环把字符输入 #include <iostream> #include <string> using namespace std;int main() {int n;cin >> n;int b n / 100; // 计算百位数int s (n % 100) / 10; // 计算十位数int g n…

大学智能电表改造解决方案

随着科技的不断发展&#xff0c;我国高等教育院校在基础设施方面也在不断进行智能化升级。电力系统作为大学校园的重要组成部分&#xff0c;对其进行智能化改造已成为当前高校建设的热门话题。本文将详细介绍大学智能电表改造解决方案&#xff0c;以期为我国高校的电力系统智能…

Unity Input System最简单使用

开始学的是 Input Manager 比较好理解&#xff0c;Input System却不好理解&#xff0c;教程也找了很多&#xff0c;感觉都讲的不清楚&#xff0c;我这里做一个最简单的用 Input System 添加鼠标左键和右键的效果。 1. 安装 Input System 包 首先这个功能不是内置的&#xff0…

Linux内核有什么之内存管理子系统有什么第二回 —— 单刀直入

接前一篇文章&#xff1a;Linux内核有什么之内存管理子系统有什么第一回 —— 引言 一、单刀直入 —— 一切从malloc开始 想必大家都使用过malloc()&#xff0c;malloc的全称是memory allocation&#xff0c;中文叫做动态内存分配。其用于申请一块连续的指定大小的内存块区域以…

公众号生成链接

登录公众号 后点击左侧“设置与开发” 下 公众号设置&#xff0c;点击账号详情&#xff0c;然后查看源代码&#xff0c;搜索uin_base64 找到参数&#xff0c;然后设置到地址_biz上。在微信中打开就可以链接到微信公众号。 如下&#xff1a;

CSS 网页布局

网页布局有很多种方式&#xff0c;一般分为以下几个部分&#xff1a;头部区域、菜单导航区域、内容区域、底部区域&#xff1a; 1&#xff09;、头部区域位于整个网页的顶部&#xff0c;一般用于设置网页的标题或者网页的logo。 <style> body { margin: 0; } /* 头部样…

香港优才计划diy申请失败怎么回事?怎么办?那些后悔莫及的事儿!

香港优才计划diy申请失败怎么回事&#xff1f;怎么办&#xff1f;那些后悔莫及的事儿&#xff01; 今年香港优才计划申请由于政策放开&#xff0c;申请人数确实暴涨&#xff01;不过近期收获到不少客户申请被拒的倾诉&#xff0c;我能感受到他们对申请失败的失落心情&#xff0…

PODsys:大模型AI算力平台部署的开源“神器”

大模型是通用人工智能的底座&#xff0c;但大模型训练对算力平台的依赖非常大。大模型算力平台是指支撑大模型训练和推理部署的算力基础设施&#xff0c;包括业界最新的加速卡、高速互联网络、高性能分布式存储系统、液冷系统和高效易用的大模型研发工具和框架。在算力平台的部…

【k8s】pod控制器

一、pod控制器及其功用 Pod是kubernetes的最小管理单元&#xff0c;在kubernetes中&#xff0c;按照Pod的创建方式可以将其分为两类 自主式Pod&#xff1a; kubernetes直接创建出来的Pod&#xff0c;这种Pod删除后就没有了&#xff0c;也不会重建 控制器创建的Pod&#xff1a…

物联网水表有什么弊端吗?

物联网水表作为新一代智能水表&#xff0c;虽然在很大程度上提高了水资源的管理效率&#xff0c;但也存在一定的弊端。在这篇文章中&#xff0c;我们将详细讨论物联网水表的弊端&#xff0c;以帮助大家更全面地了解这一技术。 一、安全隐患 1.数据泄露&#xff1a;物联网水表通…

vue+java实现语音转文字思路

思路&#xff1a; 前端录音生成wav文件后端去解析 技术&#xff1a; 后端&#xff1a; Vosk是一个离线开源语音识别工具。它可以识别16种语言&#xff0c;包括中文。 API接口&#xff0c;让您可以只用几行代码&#xff0c;即可迅速免费调用、体验功能。 目前支持 WAV声音文件…

MobaXterm配置SSHTunnel

本地与远程服务器之间存在防火墙&#xff0c;防火墙只允许SSH端口通过&#xff0c;为访问远程服务器&#xff0c;我们可以借助MobaXterm来与SSH服务器建立隧道&#xff0c;使得防火墙外的用户能够访问远程服务器 配置 打开SSHTunnel 新建SSH tunnel 点击开启就生效了&…

数据链路相关技术

文章目录 数据链路相关技术MAC地址半双工通信与全双工通信共享介质型网络非共享介质网络根据MAC地址转发以太网无线通信 数据链路相关技术 MAC地址 MAC地址用于识别数据链路中互连的节点&#xff0c;以太网或FDDI中&#xff0c;根据IEEE802.3的规范使用MAC地址。其中IEEE指的…

详解交叉验证中【KFold】【Stratified-KFold】【StratifiedShuffleSplit】的区别

交叉验证是一种统计分析方法&#xff0c;它的目的是通过在同一数据集上重复并分割训练和测试数据&#xff0c;来评估机器学习模型的性能。以下是​这三种交叉验证方法的区别&#xff1a; KFold&#xff08;K-折叠&#xff09; 在KFold交叉验证中&#xff0c;原始数据集被分为K个…

思杰Citrix将全面退出中国市场,是真的吗?

引言&#xff1a;国内虚拟化市场依然有潜力&#xff0c;转换思路继续开发&#xff0c;这个可能性最大。 【科技明说 &#xff5c; 热点关注】 业内讨论说&#xff0c;虚拟化大佬思杰Citrix将全面退出国内市场&#xff0c;不知道消息是否属实&#xff1f; 另外假如消息属实的话…

串口调试助手和网络调试助手使用总结

串口调试助手和网络调试助手是用的比较多的两款工具。 先来看看串口调试助手。 本人用的比较多的串口助手是正点原子的XCOM以及大虾丁丁的SSCOM 首先&#xff0c;解决下串口收发时的统计问题。 注意&#xff1a;这里统计的单位是字节。 串口工具发送时&#xff0c;就只统计你…

05-MySQL-进阶-存储引擎索引SQL优化

一、存储引擎 涉及资料 链接&#xff1a;https://pan.baidu.com/s/1M1oXN_pH3RGADx90ZFbfLQ?pwdCoke 提取码&#xff1a;Coke ①&#xff1a;MySQL体系结构 1.连接层 最上层是一些客户端和链接服务&#xff0c;包含本地sock 通信和大多数基于客户端/服务端工具实现的类似于 T…

掌握未来:PureBasic for Mac引领BASIC语言编辑器的新潮流

PureBasic for Mac是一种创新的BASIC语言编辑器&#xff0c;它赋予了编程更多的可能性。在这个充满机遇的时代&#xff0c;掌握编程就等于掌握了一种强大的工具&#xff0c;能够更好地理解和塑造世界。而PureBasic for Mac&#xff0c;正是这样一个让你轻松上手&#xff0c;高效…

超图Web许可无法访问

1. 报错 docker 容器(7f6f88XXXXX)找不到许可&#xff0c;查看日志&#xff0c;发现报错日志 2. 原因&#xff1a; 查看管理页面&#xff0c;发现许可被172.17.0.8占用 根据容器id寻找容器&#xff0c;找不到&#xff0c;猜测可能是以前删除过的容器&#xff0c;占用了名额 解决…