Flink系列之Flink 流式编程模式总结

news2024/10/3 0:30:13

title: Flink系列


一、Flink 流式编程模式总结

1.1 基础总结

官网: https://flink.apache.org/

Apache Flink® — Stateful Computations over Data Streams

在这里插入图片描述

三个任意:

	任意的数据源 Source
	任意的计算类型 Transformation
	任务的数据目的地 Sink

其中关于 Flink 的编程:

在这里插入图片描述

所有的计算,都是这样的大流程:

  • ​ 从哪里读取数据
  • ​ 读取到了数据执行什么样的计算
  • ​ 计算得到结果之后,输出到哪里

Flume + DataX(从哪里收集?内部的处理?数据输出到哪里?)
总结一下:

在这里插入图片描述

通过 Flink WordCount 总结出来的编程模式:

01、获得一个执行环境:(Execution Environment)
02、加载/创建初始数据:(Source)
03、指定转换这些数据:(Transformation)
04、指定放置计算结果的位置:(Sink)
05、触发程序执行:(Action)

1.2 提交到集群上面运行流程

1.2.1 通过页面方式提交

1、测试程序如下

package com.aa.flinkscala

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

/**
 * @Author AA
 * @Project bigdatapre
 * @Package com.aa.flinkscala
 * 动态参数形式测试
 */
object WordCountScalaStreamWithParameter {
  def main(args: Array[String]): Unit = {
    //1、获取执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //修改成动态参数
    val parameterTool = ParameterTool.fromArgs(args)
    val hostname = parameterTool.get("hostname")
    val port = parameterTool.getInt("port")

    //2、获取数据源
    val textStream: DataStream[String] = env.socketTextStream(hostname, port)

    //3、数据计算处理逻辑
    val wordCountStreamRes: DataStream[(String, Int)] = textStream.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(line => line._1)
      //.keyBy(0) //过期的方法,将来可能不支持啦
      .sum(1)

    //4、打印输出结果
    wordCountStreamRes.print()

    //5、启动应用程序
    env.execute("WordCountScalaStreamWithParameter")
  }
}

2、打成jar

直接通过idea右侧的maven方式打包即可

在这里插入图片描述

打包完了之后如下:

在这里插入图片描述

3、启动集群进行测试即可

在hadoop10上面启动:

[root@hadoop10 bin]# cd /software/flink/bin/
[root@hadoop10 bin]# start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host hadoop10.
Starting taskexecutor daemon on host hadoop11.
Starting taskexecutor daemon on host hadoop12.
[root@hadoop10 bin]# 

页面如下:

在这里插入图片描述

4、在页面中提交

(1)点击Submit New Job

在这里插入图片描述

(2)点击Add New

在这里插入图片描述

(3)选择上传之后如下:

在这里插入图片描述

(4)单击jar包的名字,填写参数
在这里插入图片描述

(4)填写完参数如下:

在这里插入图片描述

(5)记得先启动 hadoop12 上面的 9999 端口 。 否则报错连接不上。

[root@hadoop12 software]# nc -lk 9999

(6)点击Submit提交运行

然后去运行中查看对应程序

在这里插入图片描述

(7)到hadoop12 的 9999 端口中输入数据进行测试

[root@hadoop12 software]# nc -lk 9999

hello world hello hadoop hello flink
hello

(8)去页面中查看

在这里插入图片描述

(9)取消程序

点击页面中的左上角的Cancel Job即可

在这里插入图片描述

或者使用命令取消:

[root@hadoop10 bin]# flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
23.02.2022 16:11:02 : 9aab4c22c946dc0bfeb1546741fca390 : WordCountScalaStreamWithParameter (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
[root@hadoop10 bin]# flink cancel 9aab4c22c946dc0bfeb1546741fca390
Cancelling job 9aab4c22c946dc0bfeb1546741fca390.
Cancelled job 9aab4c22c946dc0bfeb1546741fca390.
[root@hadoop10 bin]# flink list
Waiting for response...
No running jobs.
No scheduled jobs.
[root@hadoop10 bin]# 

截图如下:

在这里插入图片描述

4、通过命令显示所有历史任务

[root@hadoop10 bin]# flink list -a
Waiting for response...
No running jobs.
No scheduled jobs.
---------------------- Terminated Jobs -----------------------
23.02.2022 16:04:45 : 550fe2157ca70d796aeca4cf8b4dc622 : WordCountScalaStreamWithParameter (FAILED)
23.02.2022 16:11:02 : 9aab4c22c946dc0bfeb1546741fca390 : WordCountScalaStreamWithParameter (CANCELED)
23.02.2022 16:24:33 : b2fdf9e8b8cb122a235d2948a307b244 : WordCountScalaStreamWithParameter (CANCELED)
23.02.2022 16:30:26 : 67e18591ba85782bde8dd3a49e1488cb : WordCountScalaStreamWithParameter (CANCELED)
23.02.2022 16:31:24 : 52d672e2f628badf42a2fee396ffaf18 : WordCountScalaStreamWithParameter (FAILED)
--------------------------------------------------------------
[root@hadoop10 bin]# 

1.2.2 通过代码方式提交

1、基础理论

去到flink的bin目录下面:

flink run -c com.aa.flinkscala.WordCountScalaStreamWithParameter /home/data/flink-1.0-SNAPSHOT.jar --hostname hadoop12 --port 9999

或者:

flink run \
-c com.aa.flinkscala.WordCountScalaStreamWithParameter \
/home/data/flink-1.0-SNAPSHOT.jar \
--hostname hadoop12 --port 9999

2、实践

[root@hadoop10 bin]# flink run -c com.aa.flinkscala.WordCountScalaStreamWithParameter /home/data/flink-1.0-SNAPSHOT.jar --hostname hadoop12 --port 9999
Job has been submitted with JobID b2fdf9e8b8cb122a235d2948a307b244

去页面中查看

正在运行的如下:

在这里插入图片描述

同理输入测试就一样了。

3、小总结

提交 Flink 应用程序到 Flink Cluster 中运行:

​ 搭建 Flink 集群!
​ 编写 wordcount,打成 jar 包,提交到 flink 集群运行

flink run \
--c 全类路径名 \
jar包绝对路径 \
--hostname hadoop10 \
--port 6789

当你需要提交一个 jar 包到 flink standalone 集群或者 YARN 中运行的时候,其实是通过 flink run … 搞定的 ==>
这个 shell 命令的底层就是: java CliFrontend

提交完程序之后,在另外一个节目里面通过jps,可以看到这个CliFrontend 。

在这里插入图片描述

在源码中的全路径名为:

flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java

在源码中的位置如下:

在这里插入图片描述



声明:
        文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除。感谢。转载请注明出处,感谢。


By luoyepiaoxue2014

B站: https://space.bilibili.com/1523287361 点击打开链接
微博地址: http://weibo.com/luoyepiaoxue2014 点击打开链接

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/54245.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

trt多流、多batch、多context

(1)一个engine可以创建多个context,一个engine可以有多个执行上下文,允许一组权值用于多个重叠推理任务。例如,可以使用一个引擎和一个上下文在并行CUDA流中处理图像。每个上下文将在与引擎相同的GPU上创建。 &#xf…

跨境电商市场也“内卷”,出海卖家如何破圈?

近些年来,跨境电商从业者都在调侃本行业越来越“卷”,大家需要铆足了劲竞争更有利的资源,以往的流量红利期似乎一去不复返了。事实上,很多跨境电商卖家对这种局势并不陌生,一些人甚至经历了国内电商从流量红利期至流量…

复习计算机网络——第三章记录(1)

数据链路层 功能:通过一些数据链路层的协议,在不太可靠的物理链路上实现可靠的数据传输。 相关基本概念: 1、结点(node):网络中的主机(host)和路由器(router&#xff…

JS实现简易计算器(input表单)

实现效果: 代码&#xff1a; <!DOCTYPE html> <html><head><meta charset"utf-8"></head><body><p>整数1:<input type"text" id"num1"></p><p>整数2:<input type"text&q…

Kamiya丨Kamiya艾美捷抗BCMA单抗,克隆Vicky-2说明书

Kamiya艾美捷抗BCMA单抗化学性质&#xff1a; 同义词&#xff1a;B细胞成熟蛋白&#xff0c;TNFRSF 17&#xff0c;CD269。 特异性&#xff1a;识别鼠标BCMA。 物种反应性&#xff1a;老鼠不与人BCMA发生交叉反应。其他物种未经测试。 Ig同种型&#xff1a;大鼠IgG2a 免疫…

4×30m钢筋混凝土简支T梁桥结构设计与计算

目 录 1设计资料 1 1.1桥面净宽 1 1.2设计荷载 1 1.3主梁跨径和全长 1 1.4材料 1 1.5设计依据 1 1.6参考资料 1 2任务与要求 2 2.1结构尺寸拟定 2 2.2行车道板计算 2 2.3主梁计算 2 2.4横梁的计算 2 3结构尺寸拟定 3 4行车道板计算 4 4.1永久荷载及其效应 4 4.2截面设计、配筋与…

机器视觉计算(一)

本文主要记录像素尺寸映射到实际物理尺寸的计算&#xff0c;并给出一些参考文献。 分辨率 视野(Field of View)/像素(Pixel) 比如我要看的产品大小是30mm*10MM&#xff0c;使用200万像素&#xff08;1600pixel*1200pixel&#xff09;的相机。因为产品是长条形&#xff0c;为了…

[附源码]Python计算机毕业设计SSM酒店客户管理系统(程序+LW)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

read-after-write consistency 写后读一致性的解决方法

问题定义 写后读一致即写完数据之后马上读&#xff0c;直接能读到新的数据&#xff0c;而不是老的数据。 导致这个问题主要是数据库之间的同步延时。这里只讨论一主多从的情况。 如下图&#xff1a; 用户添加新评论用户刷新&#xff0c;读请求到从节点1&#xff0c;此时从节…

【Matplotlib绘制图像大全】(十三):甜甜圈饼图

前言 大家好,我是阿光。 本专栏整理了《Matplotlib绘制图像大全》,内包含了各种常见的绘图方法,以及Matplotlib各种内置函数的使用方法,帮助我们快速便捷的绘制出数据图像。 正在更新中~ ✨ 🚨 我的项目环境: 平台:Windows10语言环境:python3.7编译器:PyCharmMatp…

Vue2.0开发之——Vue基础用法-vue-cli(30)

一 概述 vue-cli—介绍并安装vue-clivue-cli—基于vue-cli创建vue项目vue-cli—项目预览效果vue-cli—项目目录结构vue-cli—vue项目运行过程vue-cli—组件的基本使用 二 vue-cli—介绍并安装vue-cli 2.1 什么是单页面应用程序 单页面应用程序&#xff08;英文名&#xff1a…

C语言刷题系列——6.(递归)实现顺序输出整数

递归实现顺序输出整数 ❄️一) 题目要求☃️1.函数接口定义&#xff1a;☃️2.裁判测试程序样例&#xff1a;❄️二) 非递归 解法☃️step1.统计位数☃️step2.循环&#xff0c;打印每一位☃️step3.实现❄️三) 递归 解法☃️step1.分析☃️step2.图解流程☃️step3.实现)❄️…

举个栗子~Tableau 技巧(245):用辅助标识快速查看标靶图

我们经常会使用 标靶图&#xff08;靶心图&#xff09;来参照目标评估指标的表现。例如&#xff1a;销售配额评估、实际花费与预算的比较情况、绩效优劣范围&#xff08; 优/良/差&#xff09;等等。 指标不多的情况&#xff0c;标靶图其实是较直观的。但如果指标很多&#xf…

面试官:请问如何提升TCP三次握手的性能?

本文主要分享在 Linux 操作系统下&#xff0c;如何优化 TCP 的三次握手流程&#xff0c;提升握手速度。 TCP 是一个可以双向传输的全双工协议&#xff0c;所以需要经过三次握手才能建立连接。三次握手在一个 HTTP 请求中的平均时间占比在 10% 以上&#xff0c;在网络状况不佳、…

请问财务管理的作用有哪些?

财务管理作为企业管理的一个重要组成部分&#xff0c;如何在当前企业经营方式转变过程中发挥出更大的作用&#xff0c;如何进一步促进财务管理理论和方法的改革与发展&#xff0c;从而使财务管理更具有适应性和有效性&#xff0c;是企业管理理论与实务工作者一个共同关心的话题…

常用数据结构 ——— 队列(环形队列和顺序队列)

目录 一、队列简介 二、顺序队列 三、环形队列 四、环形队列代码 1、队列结构体 2、队列初始化 3、判断队列是否为满 4、判断队列是否为空 5、将数据插入到队列中 6、读取队列中的数据 7、释放队列空间 8、功能测试 一、队列简介 队列只允许在队列头&#xff08;fr…

_Linux(共享内存)

文章目录0. 共享内存1. 共享内存示意图2. 共享内存函数2.1 shmget函数2.2 shmat函数2.3 shmdt函数2.4 shmctl函数2.5 查看共享内存指令2.6 删除共享方法2.6.1 指令删除2.6.2 代码删除3. 实例代码3.0 log.hpp打印日志信息3.1 comm.hpp(shmServer.cc和shmClicent.cc共有文件)3.2 …

Java 17的这些新特性,Java迈入新时代

前言 2021年9月14日Java 17发布&#xff0c;作为新时代的农民工&#xff0c;有必要了解一下都有哪些新东西。 Java 17是Java 11以来又一个LTS&#xff08;长期支持&#xff09;版本&#xff0c;Java 11 和Java 17之间发生了那些变化可以在OpenJDK官网找到JEP&#xff08;Java…

Unity中用Natrue Renderer做自己的地形Terrain.

效果图 一、下载与导入Nature Renderer Nature Renrderer是个强大的插件&#xff0c;它本身就可以作为地形编辑的工具取代Unity的地形细节和树木的渲染系统。 nature-renderer官网 1.下载链接 推荐&#xff08;已经购买的许可证&#xff0c;可直接使用&#xff09;&#xf…

设计原则和设计模式01

一&#xff1a;软件设计原则 1.单一职责原则&#xff1a; 有且只有一个原因引起类的变化(类或者接口的职责单一化) 2.里氏替换原则&#xff1a; 子类可以扩展父类的功能,但不能改变父类原有的功能 3.依赖倒置原则&#xff1a; 1.高层模块不应该依赖于底层模块&#xff0c…