Flink学习——基本概述

news2025/3/12 1:24:49

目录

一、Flink概述

二、单机版安装配置

1.开启hadoop

2.解压Flink压缩包

3.修改文件名

4.开启客户端

5.访问webUI

三、集群配置

1.jobmanager配置

2.master配置

3.workers配置

4.分发配置

5.开启Flink集群

6.访问webUI

7.查看Job Manager

8.查看Task Managers

9.停止集群命令

10.webUI提交任务

11.命令行提交作业

12.命令行查看运行状态

13.查看所有正在运行的Flink作业

14.查看正在运行和已经取消的Flink作业

15.取消正在运行的Flink作业

16.再次查看已经取消的作业

四、Yarn模式配置

五、输出算子(sink)

1.获取Kafka生产者的消息,打印到控制台

2.开启kafka生产者

3.启动IDEA程序,然后在生产者输入内容,控制台就能够消费到


一、Flink概述

        Flink 是 Apache 基金会旗下的一个开源大数据处理框架,Flink是专门处理实时数据的,下面是一段wordcount代码:

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // 1.创建一个流式执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 2.读取socket文本流数据
    //      val lineDataStream: DataStream[String] = env.socketTextStream("lxm147",7777)
    val parameterTool: ParameterTool = ParameterTool.fromArgs(args)
    val hostname: String = parameterTool.get("host")
    val port: Int = parameterTool.getInt("port")
    val lineDataStream: DataStream[String] = env.socketTextStream(hostname, port)


    val sum1: DataStream[(String, Int)] = lineDataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1) // 二元组的第二个元素做统计

    sum1.print()
    env.execute()
  }
}

运行架构:程序中一系列操作就是Job,Job中每一步转换计算的操作就是Task,要把每一个Task运行在不同的节点或线程上。

        客户端将代码转换为真正可以执行的作业,将作业提交给JobManager(协调调度中心),然后JobManager根据当前业务的资源进行作业任务的分配,将作业分配给TaskManager(工作节点),就可以进行业务执行。

任务槽:taskmanager.numberOfTaskSlots:1  针对每一个具体的任务给它分配的具体的系统资源,代表当前的taskmanager能够并行执行多少任务,这里的1是指一个Task不能并行执行任务,如果配置数量大一些,就相当于当前一个Task可以并行执行任务,设置了任务槽的数量,运行时未必会占用。

并行度:parallelism.default: 1任务真正运行时一个任务到底如何去并行。

二、单机版安装配置

1.开启hadoop

start-all.sh

2.解压Flink压缩包

tar -zxf flink-1.13.0-bin-scala_2.12.tgz -C /opt/module/

3.修改文件名

mv /opt/softwore/flink-1.13.0/ flink

4.开启客户端

[lxm@linux102 flink]$ ./bin/start-cluster.sh 

5.访问webUI

linux102:8081

三、集群配置

1.jobmanager配置

vim /opt/module/flink/conf/flink-conf.yaml

jobmanager.rpc.address: linux102

2.master配置

vim /opt/module/flink/conf/masters 

linux102

3.workers配置

vim /opt/module/flink/conf/workers

linux103
linux104 

4.分发配置

xsync /opt/module/flink

5.开启Flink集群

[lxm@linux102 flink]$ ./bin/start-cluster.sh 

6.访问webUI

7.查看Job Manager

8.查看Task Managers

9.停止集群命令

[lxm@linux102 flink]$ ./bin/stop-cluster.sh

10.webUI提交任务

(1)添加打包插件

<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

(2)打包

(3)提交jar包

(4)开启服务器

(5)show plan

(6)submit

任务槽只占用了一个

(7)客户端输入内容

(8)查看结果

(9)关闭服务

任务槽恢复

11.命令行提交作业

开启客户端
[lxm@linux102 flink]$ nc -lk 7777


[lxm@linux102 flink]$ ./bin/flink run -m linux102:8081 -c com.atguigu.chapter02.StreamWordCount -p 2 ./FlinkTutorial-1.0-SNAPSHOT.jar --host linux102 --port 7777

socket文本流无法并行,所以并行度是1

12.命令行查看运行状态

Job has been submitted with JobID baed661e5b5011b4f799a53fd5999122


[lxm@linux102 flink]$ ./bin/flink list baed661e5b5011b4f799a53fd5999122
Waiting for response...
------------------ Running/Restarting Jobs -------------------
12.04.2023 15:43:50 : baed661e5b5011b4f799a53fd5999122 : Flink Streaming Job (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

13.查看所有正在运行的Flink作业

[lxm@linux102 flink]$ ./bin/flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
12.04.2023 15:43:50 : baed661e5b5011b4f799a53fd5999122 : Flink Streaming Job (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

14.查看正在运行和已经取消的Flink作业

[lxm@linux102 flink]$ ./bin/flink list -a
Waiting for response...
------------------ Running/Restarting Jobs -------------------
12.04.2023 15:43:50 : baed661e5b5011b4f799a53fd5999122 : Flink Streaming Job (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
---------------------- Terminated Jobs -----------------------
12.04.2023 15:31:49 : 91b22721aed9cac74adf13498529d6e7 : Flink Streaming Job (CANCELED)
12.04.2023 15:43:01 : 9326b8e938c7ef6d408bf359af99c956 : Flink Streaming Job (FAILED)
--------------------------------------------------------------

15.取消正在运行的Flink作业

[lxm@linux102 flink]$ ./bin/flink cancel baed661e5b5011b4f799a53fd5999122
Cancelling job baed661e5b5011b4f799a53fd5999122.
Cancelled job baed661e5b5011b4f799a53fd5999122.

16.再次查看已经取消的作业

[lxm@linux102 flink]$ ./bin/flink list -a
Waiting for response...
No running jobs.
No scheduled jobs.
---------------------- Terminated Jobs -----------------------
12.04.2023 15:31:49 : 91b22721aed9cac74adf13498529d6e7 : Flink Streaming Job (CANCELED)
12.04.2023 15:43:01 : 9326b8e938c7ef6d408bf359af99c956 : Flink Streaming Job (FAILED)
12.04.2023 15:43:50 : baed661e5b5011b4f799a53fd5999122 : Flink Streaming Job (CANCELED)
--------------------------------------------------------------

四、Yarn模式配置

        YARN 上部署的过程是:客户端把 Flink 应用提交给 Yarn 的 ResourceManager, Yarn 的 ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上,Flink 会部署 JobManager 和 TaskManager 的实例,从而启动集群。Flink 会根据运行在 JobManger 上的作业 所需要的 Slot 数量动态分配 TaskManager 资源。

$ sudo vim /etc/profile.d/my_env.sh
HADOOP_HOME=/opt/module/hadoop-2.7.5
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`

注意:记得分发集群

五、输出算子(sink)

1.获取Kafka生产者的消息,打印到控制台

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import java.util.Properties

object SourceKafkaTest {
  def main(args: Array[String]): Unit = {
    // 获取流执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // 用Properties保存Kafka连接的相关配置
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "linux102:9092")
    properties.setProperty("group.id", "consumer-group")

    val stream: DataStream[String] = env
      .addSource(new FlinkKafkaConsumer[String]("clicks", new SimpleStringSchema(), properties))

    stream.print()
    env.execute()
  }
}

2.开启kafka生产者

[lxm@linux102 flink]$ kafka-console-producer.sh -broker-list linux102:9092 --topic clicks

3.启动IDEA程序,然后在生产者输入内容,控制台就能够消费到

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/433303.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

不同批次板子采集到的传感器压力值不同

问题描述&#xff1a; M340B空压机主控板在接正常压力气源时&#xff0c;显示屏显示压力值过高并报警。 问题排查&#xff1a; 确认可能的故障点&#xff1a;压力传感器、硬件电路&#xff08;供电电路、分压电路、ADC采样电路等&#xff09;、单片机、软件&#xff1b; 排…

MySQL-----表的约束

文章目录 前言一、空属性二、默认值三、列描述四、zerofil五、主键六、自增长七、唯一键八、外键总结 前言 真正约束字段是数据类型,但是数据类型约束很单一,需要有一些额外的约束,更好的保证数据的合法性, 从业务逻辑角度保证数据的正确性.比如有一个字段是email,要求是唯一的…

十八、市场活动备注:修改

功能需求 用户在市场活动明细页面,点击"修改"市场活动备注的图标,弹出修改市场活动备注的模态窗口; 用户在修改市场活动备注的模态窗口,填写表单,点击"更新"按钮,完成修改市场活动备注的功能. *备注内容不能为空 *修改成功之后,关闭模态窗口,刷新备注列表…

VC++ | 编译、运行Debug版本报错-20230419-01

VC++ | 编译、运行Debug版本报错-20230419-01 1.LOG如下 1>------ 已启动生成: 项目: CamManager, 配置: Debug Win32 ------ 1>stdafx.cpp 1>UnicodeLib.cpp 1>MultiLanguage.cpp 1>d:\01_project\02_grain\pdv-tools

DSAI130D 3BSE003127R1在机器视觉系统中主要负责光束调制

DSAI130D 3BSE003127R1在机器视觉系统中主要负责光束调制 如今&#xff0c;随着工业4.0的到来&#xff0c;机器视觉技术在工业自动化中逐渐起着十分重要的地位&#xff0c;机器视觉技术的不断创新&#xff0c;推动了工业自动化、智慧安防以及人工智能等行业的进步&#xff0c;…

Maven打包跳过测试的5种方式

Maven打包跳过测试的5种方式 1、命令行方式跳过测试 我们可以通过使用命令将项目打包&#xff0c;添加跳过测试的命令就可以了&#xff0c;可以用两种命令来跳过测试&#xff1a; -DskipTeststrue mvn package -DskipTeststrue-DskipTeststrue&#xff0c;不执行测试用例&a…

SpringCloud网关——GateWay

GateWay 本专栏学习内容来自尚硅谷周阳老师的视频 有兴趣的小伙伴可以点击视频地址观看 概述 SpringCloud Gateway 是 Spring Cloud 的一个全新项目&#xff0c;基于 Spring 5.0Spring Boot 2.0 和 Project Reactor 等技术开发的网关&#xff0c;它旨在为微服务架构提供一种简…

c++11 标准模板(STL)(std::queue)(三)

定义于头文件 <queue> template< class T, class Container std::deque<T> > class queue; std::queue 类是容器适配器&#xff0c;它给予程序员队列的功能——尤其是 FIFO &#xff08;先进先出&#xff09;数据结构。 类模板表现为底层容器的包…

asp.net+sqlserver企业人力资源信息网站系统

下面分别论述本系统的各个功能模块的作用&#xff1a; &#xff08;1&#xff09;员工管理模块&#xff1a;该模块主要是查看自己/同事的资料&#xff0c;以更好促进公司员工之间的相互了解。同时也可以修改自己的部分信息&#xff0c;管理员可以实现对员工信息的添加&#xff…

Wi-Fi 6(802.11ax)解析14:非主动形式的BSR(Buffer Status)反馈

序言 该机制的基本思想就是通过AP竞争&#xff0c;获得TXOP传输时间后&#xff0c;根据各个终端的缓存情况&#xff0c;进行RU资源的分配&#xff0c;当分配完成后&#xff0c;进行上行OFDMA的传输。在这个过程中&#xff0c;缓存情况的反馈可以通过AP询问的方式主动完成&…

Visual Studio Code.app/vscode学习

vscode快速上手使用。 目录&#xff1a; 前言快捷键篇布局篇插件篇vscode问题&#xff08;调试step into无法跳入&#xff09; 建议阅读时间&#xff1a;7min x.1 前言 vscode是一款免费的轻量级编辑器&#xff0c;搭配vim可以实现带debug能力的vim自由。 vim自由&#xf…

OpenAI-ChatGPT最新官方接口《错误代码大全》全网最详细中英文实用指南和教程,助你零基础快速轻松掌握全新技术(九)(附源码)

Error codes 错误码 前言Introduction 导言API errors API 错误401 - Invalid Authentication 401 -验证无效401 - Incorrect API key provided 401 -提供的API密钥不正确401 - You must be a member of an organization to use the API 401 -您必须是组织的成员才能使用API429…

Apollo配置中心2.0版本详解

目的 部署一个单节点的Apollo配置中心,且包含dev和pro环境的配置。 需要部署的服务 Portal Service, Admin Service, Config Service(包含Meta service 和 Eureka) 架构图 部署图 部署过程: 数据库脚本: portaldb: https://github.com/apolloconfig/apollo/blob/mast…

HTML+CSS+JS 学习笔记(三)———Javascript(上)

&#x1f331;博客主页&#xff1a;大寄一场. &#x1f331;系列专栏&#xff1a;前端 &#x1f331;往期回顾&#xff1a;HTMLCSSJS 学习笔记&#xff08;一&#xff09;———HTML(上) HTMLCSSJS 学习笔记&#xff08;一&#xff09;———HTML(中) HTMLCSSJS 学习笔记&#…

2023年14届蓝桥杯省赛“日期统计”题解

问题描述 小蓝现在有一个长度为 100 的数组&#xff0c;数组中的每个元素的值都在 0 到 9 的范围之内。数组中的元素从左至右如下所示&#xff1a; 5 6 8 6 9 1 6 1 2 4 9 1 9 8 2 3 6 4 7 7 5 9 5 0 3 8 7 5 8 1 5 8 6 1 8 3 0 3 7 9 2 7 0 5 8 8 5 7 0 9 9 1 9 4 4 6 8 6 3 …

cubase elements12中文免费版 详细安装流程

cubase9免费版下载是由Steinberg公司开发的一款音乐制作软件&#xff0c;具有音频编辑处理、多轨录音缩混、视频配乐及环绕声处理等功能&#xff0c;对作曲家和混合工程师来说十分好用&#xff0c;可以大大提高编辑效率&#xff0c;需要的朋友赶快下载吧&#xff01; 软件地址&…

【Shell-HDFS】使用Shell脚本判断HDFS文件、目录是否存在

【Shell-HDFS】使用Shell脚本判断HDFS文件、目录是否存在 1&#xff09;文档编写目的2&#xff09;测试原理3&#xff09;Shell脚本测试3.1.测试路径是否存在3.2.测试目录是否存在3.3.测试文件是否存在3.4.测试路径大小是否大于03.5.测试路径大小是否等于0 4&#xff09;总结 1…

调节磁盘和CPU的矛盾-InnoDB的Buffer

一、缓存的重要性 所以 InnoDB 存储引擎在处理客户端的请求时&#xff0c;当需要访问某个页的数据时&#xff0c;就会把完整的页的数据全部加载到内存中&#xff0c;也就是说即使我们只需要访问一个页的一条记录&#xff0c;那也需要先把整个页的数据加载到内存中。将整个页加…

十大排序总结

十大排序 稳定性 有一个数组,3,2,3,4,1 我们把第一个3标记为黑色 第二个3标记为红色 如果排序之后,黑色的3仍在红色3前边,我们就认为这个排序是稳定的 如果红色3在黑色3前面,我们就认为这个排序是不稳定的 插入排序 直接插入排序 想象斗地主时咋摸牌的,保证有序 import…

【Linux系统】Linux文件系统与日志分析

文件系统与日志分析 一、inode与block1.1inode和block概述1.2inode包含文件的元信息1.3Linux系统文件的三个主要时间属性1.4用户通过文件名打开文件的过程1.5inode的大小1.6模拟磁盘ionde用完 二、日志文件2.1日志的功能2.2日志文件的分类2.2.1内核及系统日志2.2.2用户日志2.2.…