Flink 定时加载数据源

news2024/11/26 0:27:55

一、简介

flink 自定义实时数据源使用流处理比较简单,比如 Kafka、MQ 等,如果使用 MySQL、redis 批处理也比较简单

如果需要定时加载数据作为 flink 数据源使用流处理,比如定时从 mysql 或者 redis 获取一批数据,传入 flink 做处理,如下简单实现

二、pom.xml 文件

注意 flink 好多包从 1.15.0 开始不需要指定 Scala 版本,内部自带
在这里插入图片描述

下面 pom 文件有 flink 两个版本 1.16.0 和 1.12.7(Scala:2.12)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ye</groupId>
    <artifactId>flink-study</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.16.0</flink.version>
        <!--<flink.version>1.12.7</flink.version>-->
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
<!--
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Add connector dependencies here. They must be in the default scope (compile). -->

        <!-- Example:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.ye.DataStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>

                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

三、自定义数据源

使用 Timer 定时任务(当然也可以使用线程池 Executors)自定义数据源,每过五秒随机生成一串字符串

public class TimerSinkRich extends RichSourceFunction<String> {

    private  ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

    private boolean flag = true;
    private Timer timer;
    private TimerTask timerTask;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        timerTask = new TimerTask() {
            @Override
            public void run() {
            // 可以在这块获取 MySQL、redis 等连接并查询数据
                Random random = new Random();
                StringBuilder str = new StringBuilder();
                for (int i = 0; i < 10; i++) {
                    char ranLowLetter = (char) ((random.nextInt(26) + 97));
                    str.append(ranLowLetter);
                }
                queue.add(str.toString());
            }
        };
        timer = new Timer();
        // 延时和执行周期参数可以通过构造方法传递
        timer.schedule(timerTask,1000,5000);
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (flag){
            if(queue.size()>0){
                ctx.collect(queue.remove());
            }
        }
    }


    @Override
    public void cancel() {
        if(null!=timer) timer.cancel();
        if(null!=timerTask) timerTask.cancel();
        // 撤销任务时,flink 默认 30 s(不同 flink 版本可能不同)尝试关闭数据源,关闭失败 TaskManager 不能释放 slot,最终导致失败
        if(queue.size()<=0) flag = false;
    }
}

四、flink 加载数据源并启动

public class TimerSinkStreamJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        DataStreamSource<String> streamSource = executionEnvironment.addSource(new TimerSinkRich());
        streamSource.print();
        executionEnvironment.execute("TimerSinkStreamJob 定时任务打印数据");
    }
}

本地测试成功

在这里插入图片描述

五、上传 flink 集群

1、flink 1.16.0

启动成功
在这里插入图片描述
撤销任务成功
在这里插入图片描述
solt 也成功释放
在这里插入图片描述

2、flink 1.12.7

启动成功
在这里插入图片描述
撤销任务当然也没问题,同样能正常释放 slot
在这里插入图片描述

当然你也可以不要 open() 方法

public class DiySinkRich extends RichSourceFunction<String> {

    private TimerTask timerTask;
    private Timer timer;
    private boolean flag = true;
    private ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

    @Override
    public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
        timerTask = new TimerTask() {
            @Override
            public void run() {
                Random random = new Random();
                StringBuilder str = new StringBuilder();
                for (int i = 0; i < 10; i++) {
                    char ranLowLetter = (char) ((random.nextInt(26) + 97));
                    str.append(ranLowLetter);
                }
                queue.add(str.toString());
            }
        };
        timer = new Timer();
        timer.schedule(timerTask, 1000, 5000);
        while (flag) {
            if (queue.size() > 0) {
                ctx.collect(queue.remove());
            }
        }
    }


    @Override
    public void cancel() {
        if (timer != null) timer.cancel();
        if (timerTask != null) timerTask.cancel();
        if (queue.size() == 0) flag = false;
    }
}

以上就是 flink 定时加载数据源的简单实例

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/403223.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

三、HTTP协议之三

文章目录一、HTTPS协议概述二、 HTTPS使用成本三、从HTTP到HTTPS四、HTTP协议的瓶颈五、双工通信的websocket六、HTTP2.0一、HTTPS协议概述 二、 HTTPS使用成本 HTTPS对性能的影响 https之所有安全是因为使用TLS(SSL)来加密传输. 三、从HTTP到HTTPS 了解个大概 第一步&#x…

react:二、jsx语法规则

目录 1.传输数据的xml和json格式举例 2.jsx语法规则 3.js语句跟js表达式的区别 4.jsx的小练习 1.传输数据的xml和json格式举例 2.jsx语法规则 1.定义虚拟DOM时&#xff0c;不要写引号。 2.标签中混入JS表达式时要用{}。 3.样式的类名指定不要用class&#xff0c;要用cla…

第十一章 寡头垄断市场中的企业决策

寡头垄断市场的定义、条件 寡头垄断市场&#xff1a;少数几家企业控制了某一行业的市场&#xff0c;供给该行业生产的大部分产品 寡头垄断市场应具备的条件&#xff1a; 一个行业或市场中&#xff0c;只有少数几家企业企业之间存在着相互制约、相互依存的关系新企业进入行业比…

golang大杀器GMP模型

golang 大杀器——GMP模型 文章目录golang 大杀器——GMP模型1. 发展过程2. GMP模型设计思想2.1 GMP模型2.2 调度器的设计策略2.2.1 复用线程2.2.2 利用并行2.2.3 抢占策略2.2.4 全局G队列2.3 go func()经历了那些过程2.4 调度器的生命周期2.5 可视化的CMP编程2.5.1 trace方式2…

【LeetCode】33. 搜索旋转排序数组、1290. 二进制链表转整数

作者&#xff1a;小卢 专栏&#xff1a;《Leetcode》 喜欢的话&#xff1a;世间因为少年的挺身而出&#xff0c;而更加瑰丽。 ——《人民日报》 目录 33. 搜索旋转排序数组 1290. 二进制链表转整数 33. 搜索旋转排序数组 33. 搜索旋转排序…

JavaEE简单示例——Bean的实例化

简单介绍&#xff1a; 在我们之前使用某个对象&#xff0c;那么就要创建这个类的对象&#xff0c;创建对象的过程就叫做实例化。对于Spring来说&#xff0c;实例化Bean的方式有三种&#xff0c;分别是构造方法实例化&#xff0c;静态方法实例化&#xff0c;实例工厂实例化。我…

哪款手推式洗地机好用?2023洗地机推荐

虽然现在市面上的洗地机层出不穷&#xff0c;但是无论洗地机怎么变&#xff0c;关于洗地机的选择看准吸力、除菌、续航、清洁力这几点就够了。因此&#xff0c;一款好用的洗地机必须要拥有良好的清洁力和续航时间&#xff0c;最好还拥有除菌等细节功能。那么下面就让我们一起来…

【Linux】文件系统详解

&#x1f60a;&#x1f60a;作者简介&#x1f60a;&#x1f60a; &#xff1a; 大家好&#xff0c;我是南瓜籽&#xff0c;一个在校大二学生&#xff0c;我将会持续分享C/C相关知识。 &#x1f389;&#x1f389;个人主页&#x1f389;&#x1f389; &#xff1a; 南瓜籽的主页…

Unity脚本复习

1.在Project面板中显示和创建的每一个脚本其实都是一个类&#xff0c;当我们把脚本挂载到Hierarchy层级中的游戏物体时&#xff0c;其实我们就实现了将脚本类实例化为一个脚本组件&#xff08;对象&#xff09;的过程 2.在游戏运行时&#xff0c;场景加载&#xff0c;游戏对象…

云边端协同时序数据库的挑战与解决方案

现今&#xff0c;时序数据库在经济金融、环境监控、医疗生物等多个领域有着极为广泛的需求。其中&#xff0c;在环境监控等领域&#xff0c;时序数据库主要部署在云边端架构中。但如何实现云边端协同是目前TSDB所面临的巨大挑战。由于云、边和端的计算、存储资源状况和对数据管…

【LeetCode】剑指 Offer(21)

目录 题目&#xff1a;剑指 Offer 39. 数组中出现次数超过一半的数字 - 力扣&#xff08;Leetcode&#xff09; 题目的接口&#xff1a; 解题思路&#xff1a; 代码&#xff1a; 过啦&#xff01;&#xff01;&#xff01; 题目&#xff1a;剑指 Offer 40. 最小的k个数 -…

论文阅读和分析:A Tree-Structured Decoder for Image-to-Markup Generation

目录1.主要内容&#xff1a;2.树解码器3、损失函数4、结论&#xff1a;参考&#xff1a;1.主要内容&#xff1a; &#xff08;1、提出创新的树结构解码器来表示树、输出树、优化基于注意力的编解码框架&#xff1b; &#xff08;2、设计一个问题说明特别是在复杂结构时字符解…

AidLux AI 应用案例悬赏征集活动正式启动!

ChatGPT爆火之后&#xff0c;AI领域的人才需求迎来了疯狂增长&#xff0c;AI学习也一跃成为业界大热门。 但AI囊括知识广、学习周期长&#xff0c;要克服理论、实战等多重阻碍并不容易。 而持续降低AI学习门槛是我们一直在做的事情。 为此&#xff0c;我们举办了多期AidLux …

R语言基础(五):流程控制语句

R语言基础(一)&#xff1a;注释、变量 R语言基础(二)&#xff1a;常用函数 R语言基础(三)&#xff1a;运算 R语言基础(四)&#xff1a;数据类型 6.流程控制语句 和大多数编程语言一样&#xff0c;R语言支持选择结构和循环结构。 6.1 选择语句 选择语句是当条件满足的时候才执行…

【麒麟服务器操作系统忘记开机密码怎么办?---银河麒麟服务器操作系统更改用户密码】

银河麒麟服务器操作系统更改用户密码 1.启动主机进入 grub 菜单&#xff0c;如图 1.1 以最新版本 Kylin-Server-10-SP2-x86-Release-Build09-20210524 为例。 图 1.1 grub 菜单 2 编辑 kernel 2.1按下”e”输入&#xff0c;输入用户名和密码&#xff08;root/Kylin123123&…

【数据结构初阶】由浅入深学习链表

目录 前言 链表的概念及结构 链表的分类 单链表的实现 接口实现 1.结构体 2.创建一个新结点 3.打印链表数据 4.尾插数据 5.尾删数据 6.头插数据 7.头删数据 8.任意位置删除 9.查找位置 10.pos之前插入 11.pos之后插入 12.释放内存 完整源码 总结 前言 在我们…

Java Web 实战 07 - 多线程基础之单例模式

大家好 , 这篇文章给大家带来的是单例模式 , 单例模式中分为懒汉模式和饿汉模式 , 懒汉模式是需要用的到的时候才去创建实例 , 而饿汉模式是程序一启动就立刻创建实例 , 在这其中还有很多其他问题需要我们去研究 推荐大家跳转到这里 , 观看效果更加 上一篇文章的链接我也贴在这…

1641_strchr函数的功能分析以及peek功能实现分析

全部学习汇总&#xff1a; GreyZhang/g_unix: some basic learning about unix operating system. (github.com) 继续分析shell例程代码&#xff0c;再次遇到了一个陌生的库函数strchr。 1. 从这里看&#xff0c;这个是一个库函数无疑了。 2. 这个函数&#xff0c;或者说这三个…

2个步骤就能批量给视频添加滚动字幕

现在很多小伙伴在剪辑视频的时候都会给自己的视频添加适配的字幕&#xff0c;但是有很多的视频想要添加一样的滚动字幕时&#xff0c;有一个能批量添加剪辑的工具非常重要&#xff0c;今天小编就给大家分享一个可以批量剪辑大量视频的工具&#xff0c;下面一起看看具体的操作步…

超导百年:物理学“圣杯”是如何诞生的?

最近科技圈流传的大新闻&#xff0c;大家都知道了吧&#xff1f;简单来说&#xff0c;美国物理学会的三月会议上&#xff0c;来自罗彻斯特大学的Ranga Dias宣布&#xff0c;他们团队在近环境压强下实现了室温超导。这个消息在中文互联网流传之后&#xff0c;很快就有了详细的解…