flink命令行提交jar包任务

news2024/10/6 2:26:26

1. 环境准备

1.1 flink环境准备

关于如何安装flink,这个写的非常详细,https://blog.csdn.net/qq_43699958/article/details/132826440
在flink的bin目录启动flink cluster

[root@localhost bin]# ./start-cluster.sh

1.2 Linux环境准备

1.2.1 关闭linux防火墙

会用到的命令如下,
a. 查看防火墙状态:firewall-cmd --state
如果是not running状态,说明没有启动防火墙
b. 关闭防火墙:systemctl stop firewalld.service
c. 设置开机禁启:systemctl disable firewalld.service

[root@localhost bin]# firewall-cmd --state
not running
[root@localhost bin]# systemctl stop firewalld.service
[root@localhost bin]# systemctl disable firewalld.service
Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.
Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.
[root@localhost bin]#

1.2.2 hosts文件准备

因为下文的java代码中,代码段socketTextStream(“hadoop01”, 8888)涉及到了host名称,所以需要将linux环境中的/ect/hosts修改一下
文件中添加上这么一行

192.168.126.223 hadoop01

1.3 nc准备

启动一个数据源,用到命令为

[root@localhost ~]# nc -lk 8888

上面命令的意思是:
-k, --keep-open Accept multiple connections in listen mode
-l, --listen Bind and listen for incoming connections

1.4 jar包准备

代码如下,打包成一个jar包即可

package com.atguigu.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author Amos
 * @date 2023/9/11
 */

public class WordCountStreamUnboundedDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketDS = env.socketTextStream("hadoop01", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
                .flatMap(
                        (String value, Collector<Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        }
                )
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1);

        sum.print();
        env.execute();
    }
}

2. jar包提交

2.1 首先打开nc

[root@localhost ~]# nc -lk 8888

2.2 在flink环境提交jar包

为了方便,直接将jar包放到了bin目录中,名称为上面java代码的工程名称,
MyFlinkTutorial-117-1.0-SNAPSHOT.jar

[root@localhost bin]# pwd
/root/flink-1.17.1/bin
[root@localhost bin]# ll
total 2364
-rw-r--r--. 1  501 games 2290658 May 19 07:13 bash-java-utils.jar
-rwxr-xr-x. 1  501 games   22811 May 19 04:43 config.sh
-rwxr-xr-x. 1  501 games    1318 May 19 04:43 find-flink-home.sh
-rwxr-xr-x. 1  501 games    2381 May 19 04:43 flink
-rwxr-xr-x. 1  501 games    4446 May 19 04:43 flink-console.sh
-rwxr-xr-x. 1  501 games    6783 May 19 04:43 flink-daemon.sh
-rwxr-xr-x. 1  501 games    1564 May 19 04:43 historyserver.sh
-rwxr-xr-x. 1  501 games    2498 May 19 04:43 jobmanager.sh
-rwxr-xr-x. 1  501 games    1650 May 19 04:43 kubernetes-jobmanager.sh
-rwxr-xr-x. 1  501 games    1717 May 19 04:43 kubernetes-session.sh
-rwxr-xr-x. 1  501 games    1770 May 19 04:43 kubernetes-taskmanager.sh
-rw-r--r--. 1 root root     9055 Sep 12 03:41 MyFlinkTutorial-117-1.0-SNAPSHOT.jar
-rwxr-xr-x. 1  501 games    2994 May 19 04:43 pyflink-shell.sh
-rwxr-xr-x. 1  501 games    4051 May 19 04:44 sql-client.sh
-rwxr-xr-x. 1  501 games    3299 May 19 04:44 sql-gateway.sh
-rwxr-xr-x. 1  501 games    2006 May 19 04:43 standalone-job.sh
-rwxr-xr-x. 1  501 games    1837 May 19 04:43 start-cluster.sh
-rwxr-xr-x. 1  501 games    1854 May 19 04:43 start-zookeeper-quorum.sh
-rwxr-xr-x. 1  501 games    1617 May 19 04:43 stop-cluster.sh
-rwxr-xr-x. 1  501 games    1845 May 19 04:43 stop-zookeeper-quorum.sh
-rwxr-xr-x. 1  501 games    2960 May 19 04:43 taskmanager.sh
-rwxr-xr-x. 1  501 games    1725 May 19 04:43 yarn-session.sh
-rwxr-xr-x. 1  501 games    2405 May 19 04:43 zookeeper.sh
[root@localhost bin]#

因为1.1已经启动了flink环境,并且2.1中启动了nc,所以这里直接提交jar包
命令为:

./flink run -c com.atguigu.wc.WordCountStreamUnboundedDemo MyFlinkTutorial-117-1.0-SNAPSHOT.jar

命令中,-c 指定了包的入口类为com.atguigu.wc.WordCountStreamUnboundedDemo, 后面接上jar名称即可
然后会生成一个jobid

[root@localhost bin]# ./flink run -c com.atguigu.wc.WordCountStreamUnboundedDemo MyFlinkTutorial-117-1.0-SNAPSHOT.jar
Job has been submitted with JobID 28526936241a9f2486c15c9ddb7faa64


在这里插入图片描述
job正常提交和运行,具体如何配置webui,可以看这个文章,也是我写的,所以很有连贯性,https://blog.csdn.net/qq_43699958/article/details/132826440

nc中输入单词测试

[root@localhost ~]# nc -lk 8888
google flink
haerbin
tangshan
hangzhou
kunming
留下青春的都
jiayou nikeyi

在这里插入图片描述
搞定!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1005332.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

flutter国内镜像配置

在windows中添加镜像 打开环境变量 PUB_HOSTED_URLhttps://pub.flutter-io.cn FLUTTER_STORAGE_BASE_URLhttps://storage.flutter-io.cn 为 Flutter 设定镜像配置 如果你在国内使用 Flutter&#xff0c;那么你可能需要找一个与官方同步的可信的镜像站点&#xff0c;帮助你的…

MySQL如何进行增量备份与恢复?

目录 一、MySQL 介绍 二、增量备份 三、备份恢复 一、MySQL 介绍 MySQL是一款开源的关系型数据库管理系统&#xff08;RDBMS&#xff09;&#xff0c;它以其可靠性、灵活性和易于使用而备受赞誉。以下是关于MySQL数据库的介绍&#xff1a; MySQL是由瑞典公司MySQL AB开发&…

MyBatisPlus(三)基础Service接口:增删改查

MyBatisPlus&#xff1a;基础Service接口&#xff1a;增删改查 使用 MyBatisPlus 的 Service 接口&#xff0c;实现基础的增删改查功能。 创建Service 创建Service&#xff0c;继承自MyBatisPlus提供的Service接口。 代码 package com.example.web.service;import com.bao…

(matplotlib)如何让各个子图ax大小(宽度和高度)相等

文章目录 不相等相等 import matplotlib.pyplot as plt import numpy as np plt.rc(font,familyTimes New Roman) import matplotlib.gridspec as gridspec不相等 我用如下subplots代码画一行四个子图&#xff0c; fig,(ax1,ax2,ax3,ax4)plt.subplots(1,4,figsize(20,10),dpi…

程序员接单都在用这六大平台,你呢?

你还在一边熬夜敲代码&#xff0c;一边为自己的健康担忧吗&#xff1f; 你有被工位束缚&#xff0c;为缺乏自由闲暇的时间苦恼吗&#xff1f; 你有因工作交接不顺&#xff0c;给自己的“码农”生活雪上加霜吗&#xff1f; 你是否也在为自己这份“青春饭”&#xff0c;还能吃多久…

iwebsec靶场 文件包含漏洞通关笔记3-session文件包含

目录 1.打开靶场 2.源码分析 &#xff08;1&#xff09;session文件包含漏洞的的工作原理 &#xff08;2&#xff09;sessionstart()做了哪些初始化工作 3.获取session文件位置 4.向session写入webshell 5.访问webshell 1.打开靶场 iwebsec 靶场漏洞库iwebsechttp://iw…

敏捷团队如何领任务?有哪些误区?

敏捷开发团队&#xff08;Scrum团队&#xff09;在每天开每日站会的时候会领取当天的任务&#xff0c;这个实践在敏捷开发中叫做sign-up-for-tasks即领任务。这个实践源自极限编程&#xff0c;在1998年&#xff0c;极限编程最早期的介绍中提到了&#xff0c;“指派任务”和“领…

【1++的C++进阶】之C++11(一)

&#x1f44d;作者主页&#xff1a;进击的1 &#x1f929; 专栏链接&#xff1a;【1的C进阶】 文章目录 一&#xff0c;前言二&#xff0c;列表初始化三&#xff0c;声明自动类型推断&#xff08;auto&#xff09;decltypenullptr 四&#xff0c;右值引用五&#xff0c;完美转发…

docker容器技术实战-2

03docker hub 首先注册上号&#xff1a; https://hub.docker.com/ 上传自己的镜像仓库 创建自己的仓库 webserver 拉取镜像 配置加速器 04搭建私有仓库 上传镜像 在主机1上 在主机2 上 激活内核选项 激活内核选项文件传输过去 配置使用非加密端口 05 docker私有仓库 仓库加…

机器学习笔记自最优化理论与方法(十一)无约束优化问题——关于共轭方向法重要特征的相关证明

机器学习笔记之最优化理论与方法——关于共轭方向法重要特征的相关证明 引言回顾&#xff1a;共轭方向法的思想与几何解释共轭方向法的重要特征(2023/9/12)共轭方向法重要特征的证明 引言 上一节介绍了共轭方向法的朴素思想与几何意义。本节将继续介绍共轭方向法的重要特征以及…

如何判断一篇论文有没有被SCI收录?

打开 Web of Science 网站设置 SCI 筛选条件&#xff08;因为 WoS 收录的不只是 SCI&#xff09; 3. 输入论文题目&#xff0c;点击搜索

如何有效地设置示波器衰减比

在电子电路实验时&#xff0c;往往需要搭配测量设备方可检验电路可行性、了解电路实时动态。示波器是常见的测量设备之一&#xff0c;而搭配示波器使用的测量工具却有很多&#xff0c;如常见标配的无源电压探头、测输出端电压的有源差分探头、测某点电流信号的电流钳等等。 由…

【玩儿】Win 11 安装安卓子系统

Win 11 安装安卓子系统 一、Android子系统的要求二、配置 Windows 虚拟化支持三、Win11 正式版安装安卓子系统方法教程 (离线包安装)下载离线包安装子系统 四、软件安装应用商店下载ADB 调试模式下安装打开调试模式&#xff08;开发人员模式&#xff09;下载 ADB 调试工具ADB 配…

类和对象续

目录 包 自定义包 包的访问权限控制 常见的包 Static成员 静态成员变量 静态成员方法 代码块 构造块 静态块 重写 继承 继承是啥&#xff1f; 父类成员访问 子类中访问父类成员变量 两者不同名 两者同名 子类中访问父类对的成员方法 super 子类构造方法 …

浮点型数据在内存中是如何存储的

文章目录 浮点型变量的存储规则&#xff1a;浮点数怎么转化为二进制浮点型数据在内存中的存储E不全为0或不全为1E为全0E为全1 浮点型变量的存储规则&#xff1a; 根据国际标准IEEE&#xff08;电气和电子工程协会&#xff09; 754&#xff0c;任意一个二进制浮点数V可以表示成下…

java判断两个时间是不是同一天的方法、Java中判断是否是当天时间

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 热爱技术的小郑 前言 开发中会遇到这样一…

简简单单教你如何用C语言实现获取当前所有可用网口!

一、获取本机所有可用网卡名 原理&#xff1a; 在 Linux 系统中&#xff0c;/proc 目录是一个位于内存中的伪文件系统。 /proc目录是内核提供给我们的查询中心&#xff0c;通过查询该目录下的文件内容&#xff0c;可以获取到有关系统硬件及当前运行进程的信息&#xff0c;如…

【C++基础】左值引用、右值引用、move、forward

本文参考&#xff1a;右值引用 | 爱编程的大丙 转移和完美转发 | 爱编程的大丙 左值、右值、左值引用、右值引用 左值 是指存储在内存中、有明确存储地址&#xff08;可取地址&#xff09;的数据&#xff1b; 右值 是指可以提供数据值的数据&#xff08;不可取地址&#x…

阶乘的素因数分解

一、题目 整除问题_牛客题霸_牛客网 (nowcoder.com) 二、普通数字的素因数分解 假如对n进行素因数分解。先利用素数筛法筛选出0~n范围内的全部素数。然后依次遍历这些素数&#xff0c;用n除以这些素数&#xff0c;直至无法整除。然后接着用下一个素数作为除数&#xff0c;直到n…

重庆OV证书和EV证书有什么区别

SSL数字证书按照保护的域名数量和类型可以分为单域名SSL证书、多域名SSL证书和通配符SSL证书三种&#xff0c;按照验证方式可以将SSL数字证书分为DV基础型SSL证书、OV企业型SSL证书和EV增强型SSL证书三种。今天就随SSL盾小编了解OV证书和EV证书的区别。 1.OV企业型SSL证书由CA…