docker 搭建 flink 并上传任务

news2024/9/27 7:19:14

文章目录

        • 一、docker 搭建 flink
          • 1、选择合适的 flink 版本
          • 2、重新创建 JobManager、TaskManager 容器并挂载配置文件
        • 二、flink 简单示例
          • 1、创建项目架构
          • 2、批处理简单示例
          • 3、流处理简单示例
          • 4、上传 flink 集群
            • ①、UI 界面提交任务
            • ②、命令提交任务
          • 5、web-ui 提交查看撤销任务
        • 三、待解决

一、docker 搭建 flink
1、选择合适的 flink 版本

docker 安装就不介绍了,去 dockerHub 搜索 flink 镜像,选择合适的版本安装 https://hub.docker.com/_/flink/tags

使用 docker 命令 docker pull flink: 1.16.0-scala_2.12-java8拉去镜像
在这里插入图片描述

1.16.0-scala_2.12-java8 镜像版本说明,flink 1.16.0,flink 内置 scala 版本 2.12,Java 版本 8

建议先简单启动 flink 容器 JobManager、TaskManager 两个容器将配置文件复制出来方便挂载

# 创建 docker 网络,方便 JobManager 和 TaskManager 内部访问
 docker network create flink-network

# 创建 JobManager 
 docker run \
  -itd \
  --name=jobmanager \
  --publish 8081:8081 \
  --network flink-network \
  --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
  flink:1.16.0-scala_2.12-java8 jobmanager 
  
# 创建 TaskManager 
 docker run \
  -itd \
  --name=taskmanager \
  --network flink-network \
  --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
  flink:1.16.0-scala_2.12-java8 taskmanager 

启动成功
在这里插入图片描述
访问 8081 端口如下
在这里插入图片描述
copy 配置文件

# jobmanager 容器
 docker cp jobmanager:/opt/flink/conf ./JobManager/
# taskmanager 容器
docker cp taskmanager:/opt/flink/conf ./TaskManager/
2、重新创建 JobManager、TaskManager 容器并挂载配置文件

修改 JobManager/conf/flink-conf.yaml web 端口号为 18081
在这里插入图片描述

修改 TaskManager/conf/flink-conf.yaml 容器任务槽为 5
在这里插入图片描述
启动容器挂载配置文件

# 启动 jobmanager   
docker run -itd  -v /root/docker/flink/JobManager/conf/:/opt/flink/conf/ --name=jobmanager --publish 18081:18081 --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" --network flink-network flink:1.16.0-scala_2.12-java8 jobmanager
# 启动 taskmanager   
docker run -itd  -v /root/docker/flink/TaskManager/conf/:/opt/flink/conf/ --name=taskmanager --network flink-network --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"  flink:1.16.0-scala_2.12-java8 taskmanager

参数解释

  • FLINK_PROPERTIES=“jobmanager.rpc.address: jobmanager” rpc 地址,必须设置,否则 jobmanager 和 taskmanager 的 rpc 地址都是随机生成,会连接不上,当然你也可以在直接修改配置文件 flink-conf.yaml

如下两个容器启动成功,可以看到 web 端口为 18081,taskmanager 启动一个,包含 5 个任务槽
在这里插入图片描述

二、flink 简单示例

官网参考地址:https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/configuration/overview/#getting-started

1、创建项目架构

使用 maven 命令指定原型 Flink Maven Archetype 快速创建一个包含了必要依赖的 Flink 程序骨架,自定义项目 groupId、artifactId、package 等信息

mvn archetype:generate ^
  -DarchetypeGroupId=org.apache.flink ^
  -DarchetypeArtifactId=flink-quickstart-java ^
  -DarchetypeVersion=1.16.0	^
  -DgroupId=com.ye ^
  -DartifactId=flink-study ^
  -Dversion=0.1 ^
  -Dpackage=com.ye ^
  -DinteractiveMode=false

下载成功打开项目目录

在这里插入图片描述
如下:注意运行需要设置启动参数,否则启动会找不到类,因为 pom.xml 文件 flink 相关包都添加了 <scope>provided</scope> 表示只用于生产环境,另一种方法就是将 <scope>provided</scope> 修改为<scope>runtime</scope>
在这里插入图片描述

流处理和批处理在 flink 低版本(貌似1.12)需要区分,目前都使用流处理写法

2、批处理简单示例

下面代码用来统计单词出现的的次数

public class DataBatchJob {
    /* 下面示例统计单词出现的次数 */
    public static void main(String[] args) throws Exception {
        // 获取 flink 环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 添加数据源
        DataStreamSource<String> streamSource = env.fromElements("hello world", "hello flink", "flink", "hello", "world");
        // 对传入的流数据分组
        SingleOutputStreamOperator<Tuple2<String, Integer>> streamOperator = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            // value 传入的数据,out
            // Tuple2 二元组
            // out 传出的值
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] split = value.split(" ");
                for (String s : split) {
                    out.collect(Tuple2.of(s, 1));
                }
            }
        });
        // 按二元组的第 0 个位置分组
        KeyedStream<Tuple2<String, Integer>, Tuple> keyBy = streamOperator.keyBy(0);
        // 按二元组的第 1 个位置求和
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyBy.sum(1);
        sum.print();
        env.execute("统计单词出现的次数");
    }
}

执行结果如下
在这里插入图片描述
上传 flink 集群

3、流处理简单示例

下面示例通过 socket 文本源,对输入的大于 500 和小于 500 的分别求和

public class DataStreamJob {

    private static final Logger logger = LoggerFactory.getLogger(DataStreamJob.class);

    /* 下面示例对大于 500 和小于 500 的分别求和 */
    public static void main(String[] args) throws Exception {
        
        // 获取 flink 环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 添加 socket 文本流数据源
        //DataStreamSource<String> streamSource = env.fromElements("200", "100", "6000", "500", "2000", "300", "1500", "900");
        DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1", 7777);

        // 对大于 500 和小于 500 进行分组
        KeyedStream<String, String> stringKeyedStream = streamSource.keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String s) throws Exception {
                int i = Integer.parseInt(s);
                return i > 500 ? "ge" : "lt";
            }
        });
        // 开 10 秒滚动窗口,每 10 秒为一批数据 【00:00:00 ~ 00:00:10)、【00:00:10 ~ 00:00:20)左闭右开区间
        WindowedStream<String, String, TimeWindow> windowedStream = stringKeyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        
        // 窗口处理函数,泛型 String, Integer, String, TimeWindow 依次对应 输入类型、输出类型、 KEY类型(即keyBy 返回的类型), 窗口
        SingleOutputStreamOperator<Integer> outputStreamOperator = windowedStream.process(new ProcessWindowFunction<String, Integer, String, TimeWindow>() {
            /*
            * key: 分组的 key
            * context: 上下文信息
            * elements: 传过来的一批数据
            * out: 数据输出
            * */
            @Override
            public void process(String key, ProcessWindowFunction<String, Integer, String, TimeWindow>.Context context, Iterable<String> elements, Collector<Integer> out) throws Exception {
                System.out.println(key);
                AtomicInteger sum = new AtomicInteger();
                elements.forEach(item -> sum.addAndGet(Integer.parseInt(item)));
                out.collect(sum.get());
            }
        });
        // 输出
        outputStreamOperator.print();
        env.execute("分组求和");
    }
}

在 window 或 Linux 开启 Socket 文本流测试
在这里插入图片描述

4、上传 flink 集群

打包项目:可以在 pom.xml 修改启动类,也可以在命令启动或者 ui 界面上传设置启动类参数
在这里插入图片描述

①、UI 界面提交任务

使用 ui 界面上传 jar 到 flink 集群,点击 submit 运行

在这里插入图片描述

②、命令提交任务
# 如果集群( 即JobManager) 在当前服务器可以使用如下命令
	$ bin/flink run -Dexecution.runtime-mode=BATCH <jarFile>
# 如果集群( 即JobManager) 不在当前服务器,在 TaskManager 服务器提交作业可以使用如下命令
	# -m 指定 JobManager 服务器地址
	# -c 指定作业入口程序
	# -p 指定并行度
	$ bin/flink run -m 192.168.1.1:8081 -c com.ye.StreamWordCount -p 2 <jarFile>
# 撤销任务	
	$ bin/flink cancle <jobId>
5、web-ui 提交查看撤销任务

批处理运行完成在这里插入图片描述
流处理正在运行
在这里插入图片描述

三、待解决

使用 docker 启动的 flink 集群发现 UI 界面的 stdout 没有 print 输出
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1130710.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

区块链技术与应用 【全国职业院校技能大赛国赛题目解析】第五套区块链系统部署与运维

第五套区块链系统部署与运维题目 环境 : ubuntu20 fisco : 2.8.0 子任务1-2-1: 登陆Linux服务器,安装并部署下图所示的单机、四机构、三群组、八节点的星形组网拓扑区块链系统,具体工作内容如下 此题在官网有例子如图: 每个机构拥有两个节点,机构A属于中心,属于群组1,…

Linux文件系统 struct dentry 结构体解析

文章目录 前言一、目录项简介二、struct dentry2.1 简介2.2 dentry和inode关联2.3 目录项状态2.4 目录项特点 三、dentry cache3.1 简介3.2 dentry cache 初始化3.3 dentry cache 查看 四、dentry与mount、file的关联五、其他参考资料 前言 这两篇文章介绍了: VFS 之 struct f…

10月26日,起立LG新品首发第五代OLED透明显示屏

“聚力蓝海显示屏&#xff0c;合作共赢”&#xff0c;10月26号14:30分&#xff0c;起立携手LG于德金会展国际酒店&#xff0c;对起立自主研发新品第五代OLED透明显示屏首发&#xff0c;欢迎各大商业人士莅临参与&#xff0c;一起挖掘显示屏在当下内卷泛滥的时代新商机。 一、央…

微信小程序设计之主体文件app-wxss/less

一、新建一个项目 首先&#xff0c;下载微信小程序开发工具&#xff0c;具体下载方式可以参考文章《微信小程序开发者工具下载》。 然后&#xff0c;注册小程序账号&#xff0c;具体注册方法&#xff0c;可以参考文章《微信小程序个人账号申请和配置详细教程》。 在得到了测…

JDK JVM JRE和Java API的关系

Java SE 英文全称是Java Standared Edition&#xff0c;它是Java的标准版。 Java SE由四部分组成&#xff1a;JDK JVM JRE和Java语言。 1.JDK Java Development Kit Java开发工具包。包含了所有编译&#xff0c;运行Java程序所需要的工具&#xff0c;还包含了Java运行环境&a…

不开源项目aspose.cells最新版23.10的一些科普

1.基本介绍 日常工作中我们常常会使用到Excel来做一些事情&#xff0c;也常常需要使用代码程序来解析Excel文件&#xff0c;目前来说对于poi、easypoi、easyexcel、jxls的使用已经非常多了&#xff0c;它们都在一些特定情况下很好的去处理Excel文件&#xff0c;但有些时候我们…

ElasticSearch:实现高效数据搜索与分析的利器!项目中如何应用落地,让我带你实操指南。

1.难点解答 收集到几个问题&#xff1a; elasticsearch是单独建一个项目&#xff0c;作为全文搜索使用&#xff0c;还是直接在项目中直接用&#xff1f; ES 服务器是要单独部署的&#xff0c;你可以把 ES 理解为 Redis。 新增数据时&#xff0c;插入到mysql中&#xff0c;需不…

基于SSM的幼儿园管理系统

基于SSM的幼儿园管理系统的设计与实现~ 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringSpringMVCMyBatis工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 登录界面 管理员界面 摘要 基于SSM&#xff08;Spring、Spring MVC、MyBatis&#…

python 之输入、输出的简单介绍

文章目录 输入输出 输入 在Python中&#xff0c;您可以使用input()函数来接收用户的输入。input()函数会等待用户输入&#xff0c;并将输入的内容以字符串的形式返回给您。以下是一个简单的示例&#xff1a; user_input input("请输入您的姓名: ") # 提示用户输入…

asp.net网上商城系统VS开发sqlserver数据库web结构c#编程Microsoft Visual Studio协同过滤设计

一、源码特点 asp.net网上商城系统是一套完善的web设计管理系统系统采用协同过滤算法进行商品推荐&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为vs2010&#xff0c;数据库 为sqlserver2008&#xff0c;使用c#语言开发 ASP…

echart案例之横向多数据柱状图(含属性详解)

一、此案例基于Vue3ts&#xff0c;效果展示&#xff1a; 二、单个属性的值&#xff1a; 1、grid 整个图表的位置 grid.containLabel 是否包含标签 1.简单来说如果是false,到底部的距离是从坐标轴线开始计算的 2.如果是true,到底部的距离就是从坐标文字底部开始计算 2、legen…

职能篇—自动驾驶产品经理

自动驾驶产品开发流程 在讲自动驾驶产品经理之前&#xff0c;先简单了解一下自动驾驶的开发体系。如上图所示&#xff0c;从产品需求开始&#xff0c;经由系统需求、系统架构、软件需求、软件架构&#xff0c;最终分解到软件代码实现模块&#xff0c;再经由MIL、SIL、HIL、VIL完…

学习笔记二十二:K8s控制器Replicaset

K8s控制器Replicaset Replicaset控制器&#xff1a;概念、原理解读Replicaset概述Replicaset工作原理&#xff1a;如何管理PodReplicaset控制器三个组成部分 Replicaset资源清单文件编写技巧Replicaset使用案例&#xff1a;部署Guestbook留言板编写一个ReplicaSet资源清单资源清…

Windows 安装 jmeter

注&#xff1a;在安装Jmeter之前&#xff0c;请先检查下电脑有没有装JDK&#xff1a;开始->运行->然后输入cmd->进入命令行界面&#xff0c;输入java -version &#xff0c; 出现以下信息就是此电脑已安装了JDK&#xff1a; 下载地址 http://jmeter.apache.org/downlo…

Hive安装配置笔记

版本说明 hadoop-3.3.6&#xff08;已安装&#xff09; mysql-8&#xff08;已安装&#xff09; hive-3.1.3 将hive解压到对应目录后做如下配置&#xff1a; 基本配置与操作 1、hive-site <configuration><!-- jdbc连接的URL --><property><name>ja…

软考高级之系统架构师之数据流图和流程图

数据流图 概述 数据流图&#xff0c;DFD&#xff0c;用于表示业务信息系统中的数据流&#xff0c;它表达系统中的据传从输入到存储间所涉及的程序。采用图形方式来表达系统的逻辑功能、数据在系统内部的逻辑流向和逻辑变换过程&#xff0c;是结构化系统分析方法的主要表达工具…

[蓝桥双周赛]铺地砖

题目描述 小蓝家要装修了&#xff0c;小蓝爸爸买来了很多块&#xff08;你可以理解为数量无限)23规格的地砖&#xff0c;小蓝家的地板是n m规格的&#xff0c;小蓝想问你&#xff0c;能否用这些23的地砖铺满地板。 铺满地板:对于地板的每个区域&#xff0c;都有且只有一块地…

监控与升级

文章目录 主要内容一.部署Metrics1.部署代码如下&#xff08;示例&#xff09;: 2.解释 二.升级控制平面1.先确定要升级的版本代码如下&#xff08;示例&#xff09;: 2.禁止master节点接受新调度代码如下&#xff08;示例&#xff09;: 3.驱逐master节点上的现有任务代码如下&…

IP地址和子网掩码

1.域名 计算机主机名.本地名.组名.最高层域名 http://www.baidu.com 2.IP地址 每个IP地址都由4个小于256的数字组成&#xff0c;数字之间用“.”分开。Internet的IP地址共有32位&#xff0c;4个字节。它有两种表示格式&#xff1a;二进制格式和十进制格式。二进制格式是计算…

Matlab进阶绘图第32期—小提琴图(Violin Chart)

​小提琴图结合了箱线图与核密度图的特征&#xff0c;可用于展示多组数据的分布状态及概率密度。 由于Matlab中未收录小提琴图的绘制函数&#xff0c;因此需要大家自行解决。 本文在violin工具&#xff08;Hoffmann H, MathWork, 2015&#xff09;的基础上&#xff0c;对一些…