【极数系列】Flink集成DataSource读取集合数据(07)

news2025/1/12 13:16:45

文章目录

  • 01 引言
  • 02 简介概述
  • 03 基于集合读取数据
    • 3.1 集合创建数据流
    • 3.2 迭代器创建数据流
    • 3.3 给定对象创建数据流
    • 3.4 迭代并行器创建数据流
    • 3.5 基于时间间隔创建数据流
    • 3.6 自定义数据流
  • 04 源码实战demo
    • 4.1 pom.xml依赖
    • 4.2 创建集合数据流作业
    • 4.3 运行结果日志

01 引言

源码地址,一键下载可用:https://gitee.com/shawsongyue/aurora.git
模块:aurora_flink
主类:FlinkListSourceJob(集合)

02 简介概述

1.Source 是Flink程序从中读取其输入数据的地方。你可以用 StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。

2.Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source。

3.也可以通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。

03 基于集合读取数据

3.1 集合创建数据流

fromCollection(Collection)函数
从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型

3.2 迭代器创建数据流

fromCollection(Iterator, Class) 
从迭代器创建数据流。class 参数指定迭代器返回元素的数据类型。

3.3 给定对象创建数据流

fromElements(T ...)
从给定的对象序列中创建数据流。所有的对象必须属于同一类型。

3.4 迭代并行器创建数据流

注意!使用迭代器的时候对象必须是实现持久化的,否则报错,详情可以看我的另外一篇文章、

错误:org.apache.flink.api.common.InvalidProgramException: java.util.Arrays$ArrayItr@784c3487 is not serializable

fromParallelCollection(SplittableIterator, Class) 
从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型

3.5 基于时间间隔创建数据流

generateSequence 
基于给定间隔内的数字序列并行生成数据流。

3.6 自定义数据流

addSource - 关联一个新的 source function。例如,你可以使用 addSource(new FlinkKafkaConsumer<>(...)) 来从 Apache Kafka 获取数据。更多详细信息见连接器。

04 源码实战demo

4.1 pom.xml依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xsy</groupId>
    <artifactId>aurora_flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!--属性设置-->
    <properties>
        <!--java_JDK版本-->
        <java.version>11</java.version>
        <!--maven打包插件-->
        <maven.plugin.version>3.8.1</maven.plugin.version>
        <!--编译编码UTF-8-->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!--输出报告编码UTF-8-->
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!--json数据格式处理工具-->
        <fastjson.version>1.2.75</fastjson.version>
        <!--log4j版本-->
        <log4j.version>2.17.1</log4j.version>
        <!--flink版本-->
        <flink.version>1.18.0</flink.version>
        <!--scala版本-->
        <scala.binary.version>2.11</scala.binary.version>
        <!--log4j依赖-->
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <!--通用依赖-->
    <dependencies>

        <!-- json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <!--================================集成外部依赖==========================================-->
        <!--集成日志框架 start-->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <!--集成日志框架 end-->
    </dependencies>

    <!--编译打包-->
    <build>
        <finalName>${project.name}</finalName>
        <!--资源文件打包-->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
            </resource>
        </resources>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>org.google.code.flindbugs:jar305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <excluder>org.apache.logging.log4j:*</excluder>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.xsy.sevenhee.flink.TestStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <!--插件统一管理-->
        <pluginManagement>
            <plugins>
                <!--maven打包插件-->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                    <configuration>
                        <fork>true</fork>
                        <finalName>${project.build.finalName}</finalName>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>

                <!--编译打包插件-->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>${maven.plugin.version}</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                        <encoding>UTF-8</encoding>
                        <compilerArgs>
                            <arg>-parameters</arg>
                        </compilerArgs>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!--配置Maven项目中需要使用的远程仓库-->
    <repositories>
        <repository>
            <id>aliyun-repos</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <!--用来配置maven插件的远程仓库-->
    <pluginRepositories>
        <pluginRepository>
            <id>aliyun-plugin</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

</project>

4.2 创建集合数据流作业

注意:Flink根据集群撇嘴可能会启动多个并行度运行,可能导致数据重复处理,可以通过.setParallelism(1)设置为一个平行度运行即可

package com.aurora.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.NumberSequenceIterator;
import org.apache.flink.util.SplittableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;
import java.util.*;

/**
 * @description flink的list集合source应用
 * @author 浅夏的猫
 * @datetime 23:03 2024/1/28
*/
public class FlinkListSourceJob {

    private static final Logger logger = LoggerFactory.getLogger(FlinkListSourceJob.class);

    public static void main(String[] args) throws Exception {

        //1.创建Flink运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.设置Flink运行模式:
        //STREAMING-流模式,BATCH-批模式,AUTOMATIC-自动模式(根据数据源的边界性来决定使用哪种模式)
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        List<String> list = Arrays.asList("测试", "开发", "运维");

        // 01 从集合创建数据流
        DataStreamSource<String> dataStreamSource_01 = env.fromCollection(list);

        // 02 从迭代器创建数据流,这里直接使用list的迭代器会报错,因为没有ArrayList没有进行持久化,需要深入了解的,可以看我的另外一篇文章
//        DataStreamSource<String> dataStreamSource_02 = env.fromCollection(list.iterator(),String.class);

        // 03 从给定的对象序列中创建数据流
        DataStreamSource<String> dataStreamSource_03 = env.fromElements("测试", "开发", "运维");

        // 04 从迭代器并行创建数据流
        NumberSequenceIterator splittableIterator = new NumberSequenceIterator(1,10);
        DataStreamSource dataStreamSource_04=env.fromParallelCollection(splittableIterator,Long.TYPE);

        // 05 基于给定间隔内的数字序列并行生成数据流
        DataStreamSource<Long> dataStreamSource_05 = env.generateSequence(1, 10);

        //自定义数据流
        DataStreamSource<String> dataStreamSource_06 = env.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> sourceContext) throws Exception {
                //自定义你自己的数据来源
                for (int i = 0; i < 10; i++) {
                    sourceContext.collect("测试数据" + i);
                }
            }

            @Override
            public void cancel() {

            }
        });

        //5.输出打印
        dataStreamSource_01.print();
//        dataStreamSource_02.print();
        dataStreamSource_03.print();
        dataStreamSource_04.print();
        dataStreamSource_05.print();
        dataStreamSource_06.print();

        //6.启动运行
        env.execute();
    }

}


4.3 运行结果日志

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1417498.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

用ASM HEMT模型提取GaN器件的参数

标题&#xff1a;Physics-Based Multi-Bias RF Large-Signal GaNHEMT Modeling and Parameter Extraction Flow (JEDS 17年) 模型描述 该模型的核心是对表面势&#xff08;ψ&#xff09;及其随施加的栅极电压&#xff08;Vg&#xff09;和漏极电压&#xff08;Vd&#xff09…

【数据结构1-3】集合

有时候&#xff0c;我们并不关心数据之间的前后关系&#xff0c;也不关心数据的层次关系。一些确定元素只是单纯的聚集在一起&#xff0c;这样的元素聚集体被称为集合。 当希望知道某个数据是否存在一个集合中&#xff0c;或者两个元素是否在同一个集合中时&#xff0c;就需要使…

JVM系列——对象管理

JVM对象分布 对象头 第一类是用于存储对象自身的运行时数据&#xff0c;如哈希码&#xff08;HashCode&#xff09;、GC 分代年龄、锁状态标志、线程持有的锁、偏向线程 ID、偏向时间戳等 另外一部分是类型指针&#xff0c;即对象指向它的类型元数据的指针&#xff0c;Java 虚…

敲黑板啦!CSGO游戏搬砖项目操作注意事项

CSGO游戏搬砖项目怎么赚钱的&#xff0c;利润在哪&#xff1f; 1.两个平台之间币种不一样&#xff0c;就存在一个汇率差&#xff0c;两平台装备价格也不一样&#xff0c;汇率差-价格差利润。 CSGO游戏搬砖项目具体有哪些操作步骤&#xff1f; 1、准备一台电脑&#xff0c;配置…

Git学习,基础,安装,配置,笔记总结

Git安装与常用命令 本教程里的git命令例子都是在Git Bash中演示的,会用到一些基本的linux命令,在此为大家提前列举: ls/ll 查看当前目录 cat 查看文件内容 touch 创建文件 vi vi编辑器(使用vi编辑器是为了方便展示效果,学员可以记事本、editPlus、notPad++等其它编 辑…

C#使用OpenCvSharp4库读取电脑摄像头数据并实时显示

一、OpenCvSharp4库 OpenCvSharp4库是一个基于.Net封装的OpenCV库&#xff0c;Github源代码地址为&#xff1a;https://github.com/shimat/opencvsharp&#xff0c;里面有关于Windows下安装OpenCvSharp4库的描述&#xff0c;如下图所示&#xff1a; 二、C#使用OpenCvSharp4库…

React 组件生命周期-概述、生命周期钩子函数 - 挂载时、生命周期钩子函数 - 更新时、生命周期钩子函数 - 卸载时

React 组件生命周期-概述 学习目标&#xff1a; 能够说出组件的生命周期一共几个阶段 组件的生命周期是指组件从被创建到挂在到页面中运行&#xff0c;在到组件不用时卸载组件 注意&#xff1a;只有类组件才有生命周期&#xff0c;函数组件没有生命周期(类组件需要实例化&…

监听项目中指定属性数据,点击或模块显示时

当项目中&#xff0c;需要获取某个页面上、某个标签上、有指定自定义属性时&#xff0c;需要在点击该元素时进行公共逻辑处理&#xff0c;或该元素在显示的时候进行逻辑处理&#xff0c;这时可以定义一个公共的方法&#xff0c;在每个页面引用&#xff0c;并写入数据即可 &…

Win11系统连接带HDMI接口的显示器后,电脑没有声音如何调试

解决这个问题的方法很简单&#xff0c;没有那么复杂。之所以使用HDMI接口连接了显示器后没声音&#xff0c;原因就是HDMI接口是包含音频视频两种信号的接口。当电脑的HDMI接口被使用时&#xff0c;系统就会默认从HDMI设备输出声音信号了&#xff0c;而此时如果HDMI设备没有声音…

基本控件(二)QMainWindow主窗口相关 以及 手写控件的方法 (按F2)

探究过程 先创建个QMainWindow项目。 鼠标点击选中QMainWindow之后按F2&#xff0c;就会进入其最一开始的定义的头文件中。 来到qmainwindow.h头文件中&#xff0c;可以清楚看到这些继承关系&#xff1a; 同样的操作&#xff0c;来到QWidget的定义之处&#xff1a; UI设计界面…

操作符详解(下)

目录 下标访问[ ]、函数调用() [ ] 下标引用操作符 函数调用操作符 结构成员访问操作符 结构体成员的直接访问 操作符的属性&#xff1a;优先级、结合性 优先级 结合性 整型提升 算术转换 下标访问[ ]、函数调用() [ ] 下标引用操作符 操作数&#xff1a;一个数组…

Matlab处理excel数据

我们新建个excel文档&#xff0c;用Matlab读取里面的内容&#xff0c;计算和判断里面的计算结果是否正确&#xff0c;并打印到另一个文档当中。 新建文档 新建输入文档&#xff0c;文件名TestExcel 编写脚本 [num,txt] xlsread(TestExcel.xlsx); SNcode num(:,1);%从序号中…

github连不上

github连不上 错误提示解决方案steam 采用Hosts加速 错误提示 fatal: unable to access ‘https://github.com/Ada-design/qianduan.git/’: Failed to connect to github.com port 443 after 21073 ms: Couldn’t connect to server 解决方案 下载steam https://steampp.ne…

爬虫基础-前端基础

Html是骨骼、css是皮肤、js是肌肉&#xff0c;三者之间的关系可以简单理解为m(html)-v(css)-c(js) 浏览器的加载过程 构建dom树 子资源加载-加载外部的css、图片、js等外部资源 样式渲染-css执行 DOM树 ajax、json、xml AJAX 是一种在无需重新加载整个网页的情况下&#xf…

【CANoe使用大全】——Logging窗口

&#x1f64b;‍♂️【CANoe使用大全】系列&#x1f481;‍♂️点击跳转 文章目录 1.概述2.Logging窗口打开方式3.创建Logging4.配置4.1. 命名4.2.格式选择4.3. 路径选择与命名4.3.1.Logging文件命名_自定义4.3.2.Logging文件命名_系统内选择 5.Logging触发方式5.1 Logging模块…

MATLAB - 仿真单摆的周期性摆动

系列文章目录 前言 本例演示如何使用 Symbolic Math Toolbox™ 模拟单摆的运动。推导摆的运动方程&#xff0c;然后对小角度进行分析求解&#xff0c;对任意角度进行数值求解。 一、步骤 1&#xff1a;推导运动方程 摆是一个遵循微分方程的简单机械系统。摆最初静止在垂直位置…

怎样做好Code Review

Code Review方案 定义 Code Review代码评审是指在软件开发过程中&#xff0c;通过对源代码进行系统性检查的过程。通常的目的是查找各种缺陷&#xff0c;包括代码缺陷、功能实现问题、编码合理性、性能优化等&#xff1b;保证软件总体质量和提高开发者自身水平 code review …

【C语言刷题系列】计算整数的二进制位中1的个数 (三种方式)

文章目录 一、文章简介 1.取模 配合 整除 的方式 2.按位与 配合 右移 的方式 3.按位与 的方式 一、文章简介 本文所属专栏C语言刷题_倔强的石头106的博客-CSDN博客 注&#xff1a;如果没有特别说明&#xff0c;本文所提及的整数为有符号整型&#xff0c;即 int 类型 本文…

452. 用最少数量的箭引爆气球 - 力扣(LeetCode)

题目描述 有一些球形气球贴在一堵用 XY 平面表示的墙面上。墙面上的气球记录在整数数组 points &#xff0c;其中points[i] [xstart, xend] 表示水平直径在 xstart 和 xend之间的气球。你不知道气球的确切 y 坐标。 一支弓箭可以沿着 x 轴从不同点 完全垂直 地射出。在坐标 …

JVM 笔记

JVM HotSpot Java二进制字节码的运行环境 好处&#xff1a; 一次编写&#xff0c;到处运行自动内存管理&#xff0c;具有垃圾回收功能数组下标越界检查多态&#xff08;虚方法表&#xff09; JVM组成 类加载子系统&#xff08;Java代码转换为字节码&#xff09;运行时数据…