Flink使用

news2025/1/7 15:16:12

Window下启动支持

下载或复制老版本的放在bin目录下即可;

flink.bat

@echo off
setlocal
 
SET bin=%~dp0
SET FLINK_HOME=%bin%..
SET FLINK_LIB_DIR=%FLINK_HOME%\lib
SET FLINK_PLUGINS_DIR=%FLINK_HOME%\plugins
 
SET JVM_ARGS=-Xmx512m
 
SET FLINK_JM_CLASSPATH=%FLINK_LIB_DIR%\*
 
java %JVM_ARGS% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.client.cli.CliFrontend %*
 
endlocal

start-cluster.bat

@echo off
setlocal EnableDelayedExpansion
 
SET bin=%~dp0
SET FLINK_HOME=%bin%..
SET FLINK_LIB_DIR=%FLINK_HOME%\lib
SET FLINK_PLUGINS_DIR=%FLINK_HOME%\plugins
SET FLINK_CONF_DIR=%FLINK_HOME%\conf
SET FLINK_LOG_DIR=%FLINK_HOME%\log
 
SET JVM_ARGS=-Xms1024m -Xmx1024m
 
SET FLINK_CLASSPATH=%FLINK_LIB_DIR%\*
 
SET logname_jm=flink-%username%-jobmanager.log
SET logname_tm=flink-%username%-taskmanager.log
SET log_jm=%FLINK_LOG_DIR%\%logname_jm%
SET log_tm=%FLINK_LOG_DIR%\%logname_tm%
SET outname_jm=flink-%username%-jobmanager.out
SET outname_tm=flink-%username%-taskmanager.out
SET out_jm=%FLINK_LOG_DIR%\%outname_jm%
SET out_tm=%FLINK_LOG_DIR%\%outname_tm%
 
SET log_setting_jm=-Dlog.file="%log_jm%" -Dlogback.configurationFile=file:"%FLINK_CONF_DIR%/logback.xml" -Dlog4j.configuration=file:"%FLINK_CONF_DIR%/log4j.properties"
SET log_setting_tm=-Dlog.file="%log_tm%" -Dlogback.configurationFile=file:"%FLINK_CONF_DIR%/logback.xml" -Dlog4j.configuration=file:"%FLINK_CONF_DIR%/log4j.properties"
 
:: Log rotation (quick and dirty)
CD "%FLINK_LOG_DIR%"
for /l %%x in (5, -1, 1) do ( 
SET /A y = %%x+1 
RENAME "%logname_jm%.%%x" "%logname_jm%.!y!" 2> nul
RENAME "%logname_tm%.%%x" "%logname_tm%.!y!" 2> nul
RENAME "%outname_jm%.%%x" "%outname_jm%.!y!"  2> nul
RENAME "%outname_tm%.%%x" "%outname_tm%.!y!"  2> nul
)
RENAME "%logname_jm%" "%logname_jm%.0"  2> nul
RENAME "%logname_tm%" "%logname_tm%.0"  2> nul
RENAME "%outname_jm%" "%outname_jm%.0"  2> nul
RENAME "%outname_tm%" "%outname_tm%.0"  2> nul
DEL "%logname_jm%.6"  2> nul
DEL "%logname_tm%.6"  2> nul
DEL "%outname_jm%.6"  2> nul
DEL "%outname_tm%.6"  2> nul
 
for %%X in (java.exe) do (set FOUND=%%~$PATH:X)
if not defined FOUND (
    echo java.exe was not found in PATH variable
    goto :eof
)
 
echo Starting a local cluster with one JobManager process and one TaskManager process.
 
echo You can terminate the processes via CTRL-C in the spawned shell windows.
 
echo Web interface by default on http://localhost:8081/.
 
start java %JVM_ARGS% %log_setting_jm% -cp "%FLINK_CLASSPATH%"; org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint --configDir "%FLINK_CONF_DIR%" > "%out_jm%" 2>&1
start java %JVM_ARGS% %log_setting_tm% -cp "%FLINK_CLASSPATH%"; org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir "%FLINK_CONF_DIR%" > "%out_tm%" 2>&1
 
endlocal

Flink实战(Java/MongoDB/Mysql)

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.6.13</spring-boot.version>
        <flink-version>1.14.6</flink-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
            <optional>true</optional>
        </dependency>
        <!-- mongodb -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-mongodb</artifactId>
            <version>1.1.0-1.18</version>
        </dependency>
 
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.50</version>
        </dependency>
 
        <!-- Spring Boot Starter MyBatis -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.6</version>
        </dependency>
 
        <!-- MySQL Driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
    </dependencies>

Flink mongodb 

Flink CDC在MongoDB的副本集(replica set)上使用了$changeStream操作,而该操作仅支持副本集。如果你的MongoDB是单个实例(standalone),则不支持$changeStream操作。

MongoDB开启副本集(Replica Set),否则代码运行会报错。

Flink报错记录

[20210910 10:16:40.107] [DEBUG] [main] [StreamGraph.java:391][addOperator] Vertex: 2
java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
        at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:88)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1947)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)
        at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
        at com.example.Fink_Kafka_Demo.main(Fink_Kafka_Demo.java:27)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49)
        at org.springframework.boot.loader.Launcher.launch(Launcher.java:108)
        at org.springframework.boot.loader.Launcher.launch(Launcher.java:58)
        at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88)

添加

            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>

[20210910 10:20:02.172] [INFO] [main] [RestServerEndpoint.java:139][start] Starting rest endpoint.
[20210910 10:20:02.203] [DEBUG] [main] [DispatcherRestEndpoint.java:124][initializeWebSubmissionHandlers] Failed to load web based job submission extension.
org.apache.flink.util.FlinkException: The module flink-runtime-web could not be found in the class path. Please add this jar in order to enable web based job submission.
        at org.apache.flink.runtime.webmonitor.WebMonitorUtils.loadWebSubmissionExtension(WebMonitorUtils.java:197)
        at org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint.initializeWebSubmissionHandlers(DispatcherRestEndpoint.java:112)
        at org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.initializeHandlers(WebMonitorEndpoint.java:268)
        at org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint.initializeHandlers(DispatcherRestEndpoint.java:89)
        at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:144)
        at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172)
        at org.apache.flink.runtime.minicluster.MiniCluster.createDispatcherResourceManagerComponents(MiniCluster.java:463)
        at org.apache.flink.runtime.minicluster.MiniCluster.setupDispatcherResourceManagerComponents(MiniCluster.java:422)
        at org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:366)
        at org.apache.flink.client.program.PerJobMiniClusterFactory.submitJob(PerJobMiniClusterFactory.java:75)
        at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:85)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1957)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)
        at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
        at com.example.Fink_Kafka_Demo.main(Fink_Kafka_Demo.java:27)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49)
        at org.springframework.boot.loader.Launcher.launch(Launcher.java:108)
        at org.springframework.boot.loader.Launcher.launch(Launcher.java:58)
        at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88)
[20210910 10:20:02.535] [DEBUG] [main] [InternalLoggerFactory.java:45][newDefaultFactory] Using SLF4J as the default logging framework
[20210910 10:20:02.536] [DEBUG] [main] [InternalThreadLocalMap.java:56][<clinit>] -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024

 添加

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime-web -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_2.12</artifactId>
    <version>${flink-version}</version>
</dependency>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2271840.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

TI毫米波雷达原始数据解析之Lane数据交换

TI毫米波雷达原始数据解析之Lane数据交换 背景Lane 定义Lane 确认确认LVDS Lane 数量的Matlab 代码数据格式参考 背景 解析使用mmWave Studio 抓取的ADC Data Lane 定义 芯片与DCA100之间的数据使用LVDS接口传输&#xff0c;使用mmWave Studio 配置过程中有一个选项是LVDS L…

2-markdown转网页页面设计 --[制作网页模板]

页面设计 将Markdown转换为网页页面设计通常涉及以下几个步骤&#xff1a; 编写Markdown内容&#xff1a; 首先&#xff0c;你需要创建或已有以Markdown格式编写的文档。Markdown是一种轻量级的标记语言&#xff0c;它允许人们使用易读易写的纯文本格式编写文档&#xff0c;然…

Linux驱动开发(18):linux驱动并发与竞态

并发是指多个执行单元同时、并行执行&#xff0c;而并发的执行单元对共享资源(硬件资源和软件上的全局变量、静态变量等)的访问 则很容易导致竞态。对于多核系统&#xff0c;很容易理解&#xff0c;由于多个CPU同时执行&#xff0c;多个CPU同时读、写共享资源时很容易造成竞态。…

Elasticsearch: 高级搜索

这里写目录标题 一、match_all匹配所有文档1、介绍&#xff1a; 二、精确匹配1、term单字段精确匹配查询2、terms多字段精确匹配3、range范围查询4、exists是否存在查询5、ids根据一组id查询6、prefix前缀匹配7、wildcard通配符匹配8、fuzzy支持编辑距离的模糊查询9、regexp正则…

GitLab集成Runner详细版--及注意事项汇总【最佳实践】

一、背景 看到网上很多用户提出的runner问题其实实际都不是问题&#xff0c;不过是因为对runner的一些细节不清楚导致了误解。本文不系统性的介绍GitLab-Runner&#xff0c;因为这类文章写得好的特别多&#xff0c;本文只汇总一些常几的问题/注意事项。旨在让新手少弯路。 二、…

Spring Boot 中 RabbitMQ 的使用

目录 引入依赖 添加配置 Simple&#xff08;简单模式&#xff09; 生产者代码 消费者代码 ​编辑 Work Queue&#xff08;工作队列&#xff09; 生产者代码 消费者代码 Publish/Subscribe&#xff08;发布/订阅&#xff09; 生产者代码 消费者代码 Routing&#x…

【linux基础I/O(1)】文件描述符的本质重定向的本质

目录 前言1. 理解C语言的文件接口2. 操作文件的系统调用接口2.1 open函数详解2.2 close函数详解2.3 write函数详解2.4 read函数详解 3. 文件描述符fd详解4. 文件描述符的内核本质5. 怎样理解Linux下一切皆文件?6. 理解输出输入重定向7. 重定向的系统调用8. 总结 前言 “在Lin…

全面解析 Node-RED:功能、Docker 部署与实战示例

言简意赅的讲解Node-RED解决的痛点 Node-RED 是一个基于流的编程工具&#xff0c;专为物联网&#xff08;IoT&#xff09;应用而设计。它通过可视化的编程界面&#xff0c;使开发者能够轻松地连接各种硬件设备、API 以及在线服务&#xff0c;构建复杂的应用流程。本文将详细介…

2、pycharm常用快捷命令和配置【持续更新中】

1、常用快捷命令 Ctrl / 行注释/取消行注释 Ctrl Alt L 代码格式化 Ctrl Alt I 自动缩进 Tab / Shift Tab 缩进、不缩进当前行 Ctrl N 跳转到类 Ctrl 鼠标点击方法 可以跳转到方法所在的类 2、使用pip命令安装request库 命令&#xff1a;pip install requests 安装好了…

2025-01-04 Unity插件 YodaSheet1 —— 插件介绍

文章目录 1 介绍2 工作原理2.1 ScriptableObject -> YadeSheetData2.2 YadeDatabase 存储多个 YadeSheetData 3 用途4 缺点5 推荐 1 介绍 ​ Yade 提供类似于 Excel 或者 Google Sheets 的表格编辑器&#xff0c;可以轻松地在 Unity 编辑器中 编辑&#xff0c;搜索&#xf…

用 C++ 创建控制台计算器

本文内容 先决条件创建应用项目验证新应用是否生成并运行编辑代码 显示另外 5 个 C 程序员通常从在命令行上运行的“Hello, world!”应用程序开始。 你将以本文为起点&#xff0c;逐步进阶&#xff0c;加深学习难度&#xff1a;计算器应用。 先决条件 在 Visual Studio 中…

IDEA 撤销 merge 操作(详解)

作为一个开发者&#xff0c;我们都知道Git是一个非常重要的版本控制工具&#xff0c;尤其是在协作开发的过程中。然而&#xff0c;在使用Git的过程中难免会踩一些坑&#xff0c;今天我来给大家分享一个我曾经遇到的问题&#xff1a;在使用IDEA中进行merge操作后如何撤销错误的合…

限时特惠,香港服务器,低至53元/年

家人们谁懂啊&#xff01;香港服务器这价格简直逆天了&#xff0c;居然比内地的还便宜&#xff01;就拿阿里云来说&#xff0c;人家最低配置的服务器&#xff0c;价格都很难做到这么亲民。 最低配的就不说了&#xff0c;2 核 4G 的配置&#xff0c;应对日常业务稳稳当当&#x…

EF Core配置及使用

Entity Framework Core是微软官方的ORM框架。 ORM&#xff1a;Object Relational Mapping。让开发者用对象操作的形式操作关系数据库。 EF Core是对于底层ADO.NET Core的封装&#xff0c;因此ADO.NET Core支持的数据库不一定被EF Core支持。 代码创建数据库Code First 建实…

GPT分区 使用parted标准分区划分,以及相邻分区扩容

parted 是一个功能强大的命令行工具&#xff0c;用于创建和管理磁盘分区表和分区。它支持多种分区表类型&#xff0c;如 MBR&#xff08;msdos&#xff09;、GPT&#xff08;GUID Partition Table&#xff09;等&#xff0c;并且可以处理大容量磁盘。parted 提供了一个交互式界…

【mybatis-plus问题集锦系列】使用mybatis实现数据的基础增删改查

使用mybatis实现数据的基础增删改查,简单的增删改查操作方法步骤 代码实现 pom.xml <dependencies><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>3.0.…

tlias项目实战笔记

一个小项目写了一个多月&#xff0c;在考试周穿插&#xff0c;终于能有时间来写个小总结了&#xff0c;废话少说&#xff0c;我们直接来步入正题。 一、项目开发规范 1.开发风格Restful 案例是基于当前最为主流的前后端分离模式进行开发。 在前后端分离的开发模式中&#xff…

Arduino Uno简介与使用方法

目录 一、Arduino Uno概述 1. 硬件特性 2. 开发环境 二、Arduino Uno的基本使用方法 1. 硬件连接 2. 软件编程 三、Arduino Uno编程基础 1. 基本语法 2. 常用函数 四、Arduino Uno应用举例 1. LED闪烁 2. 温度检测 3. 超声波测距 五、Arduino Uno的扩展与应用 1…

go 模拟TCP粘包和拆包,及解决方法

1. 什么是 TCP 粘包与拆包&#xff1f; 粘包&#xff08;Sticky Packet&#xff09; 粘包是指在发送多个小的数据包时&#xff0c;接收端会将这些数据包合并成一个数据包接收。由于 TCP 是面向流的协议&#xff0c;它并不会在每次数据发送时附加边界信息。所以当多个数据包按顺…

新能源电动汽车动力电池技术

新能源电动汽车动力电池技术是新能源汽车发展的核心之一&#xff0c;以下是动力电池技术的一些关键方面&#xff1a; 技术进展 能量密度提升&#xff1a;近年来&#xff0c;动力电池的能量密度有了显著提升&#xff0c;从2010年的100Wh/kg提高到2024年的300Wh/kg。能量密度的…