Flink第一章:环境搭建

news2024/11/26 8:58:54

系列文章目录

Flink第一章:环境搭建


文章目录

  • 系列文章目录
  • 前言
  • 一、Idea项目
    • 1.创建项目
    • 2.pom.依赖
    • 3.DataSet
    • 4.DataStreaming
  • 二、环境搭建
    • 1.Standalone
    • 2.Flink on Yarn
  • 总结


前言

Flink也是现在现在大数据技术中火爆的一门,反正大数据的热门技术学的也差不多了,啃完Flink基本的大数据技术就差不多哦学完了.


一、Idea项目

1.创建项目

2.pom.依赖

这里说明一下我选择的环境.
java8
scala2.12
flink采用最新的1.17
请大家根据自己的环境更换版本

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>groupId</groupId>
    <artifactId>FlinkTutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>

        <flink.version>1.17.0</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>


    <dependencies>
        <!-- 引入 Flink 相关依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <!-- 声明绑定到 maven 的 compile 阶段 -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    
</project>

3.DataSet

:这里使用DataSet对数据进行批处理,但是在新版本flink中DataStreaming已经做到了流批一体,未来会慢慢移除DataSet接口,所以这里只是做个示例.
在这里插入图片描述
BatchWC.scala

package com.atguigu.chapter01

import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment, GroupedDataSet, createTypeInformation}


object BatchWC {
  def main(args: Array[String]): Unit = {
    //1.创建执行环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    //2.读取文本数据
    val lineData: DataSet[String] = env.readTextFile("input/word.txt")

    //3.对数据进行处理
    val wordAneOne: DataSet[(String, Int)] = lineData.flatMap(_.split(" ")).map(word => (word, 1))

    val wordAndOneGroup: GroupedDataSet[(String, Int)] = wordAneOne.groupBy(0)

    val sum: AggregateDataSet[(String, Int)] = wordAndOneGroup.sum(1)

    sum.print()
  }
}

在这里插入图片描述

4.DataStreaming

DataStreaming进行批处理

BoundedStreamingWordCount.scala

package com.atguigu.chapter01

import org.apache.flink.streaming.api.scala._


object BoundedStreamingWordCount{
  def main(args: Array[String]): Unit = {
    //1.创建一个流式执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //2.读取文本文件
    val lineDataStreaming: DataStream[String] = env.readTextFile("input/word.txt")

    //3.对数据进行处理
    val wordAneOne: DataStream[(String, Int)] = lineDataStreaming.flatMap(_.split(" ")).map(word => (word, 1))

    val wordAndOneGroup: KeyedStream[(String, Int), String] = wordAneOne.keyBy(_._1)

    val sum: DataStream[(String, Int)] = wordAndOneGroup.sum(1)

    sum.print()

    //4.执行方法
    env.execute()
  }
}

在这里插入图片描述
DataStreaming进行流处理
StreamingWC.scala

package com.atguigu.chapter01

import org.apache.flink.streaming.api.scala._


object StreamingWC{
  def main(args: Array[String]): Unit = {
    //1.创建一个流式执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //2.读取文本文件
    val lineDataStreaming: DataStream[String] = env.socketTextStream("hadoop102",7777)


    //3.对数据进行处理
    val wordAneOne: DataStream[(String, Int)] = lineDataStreaming.flatMap(_.split(" ")).map(word => (word, 1))

    val wordAndOneGroup: KeyedStream[(String, Int), String] = wordAneOne.keyBy(_._1)

    val sum: DataStream[(String, Int)] = wordAndOneGroup.sum(1)

    sum.print()

    //4.执行方法
    env.execute()
  }
}

这里我们选择对hadoop102的7777端口进行监听,所以要提前打开虚拟机.
在这里插入图片描述
输入数据查看结果
在这里插入图片描述

二、环境搭建

在这里插入图片描述
我们直接使用官方推荐最新版.
官方下载连接

1.Standalone

但节点模式,一般用于数据测试,我们在hadoop102上进行.
上传并解压文件

tar -xvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/
cd ../module/
mv flink-1.17.0/ flink

修改配置
本来单节点是不需要修改配置的,但是咱们虚拟机没有桌面,需要从外部访问,所以还是需要修改一下.
在这里插入图片描述
在203行修改,或者用vim的搜索功能.
在这里插入图片描述
启动Flink

./bin/start-cluster.sh

在这里插入图片描述
在Web UI界面查看一下
hadoop102:8081
在这里插入图片描述
现在我们跑一下官方的测试案例进行测试.

 ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

在这里插入图片描述
在这里插入图片描述
然后直接停掉集群Flink就行了,命令行操作,咱们后边再说.

./bin/stop-cluster.sh

在这里插入图片描述

2.Flink on Yarn

修改环境变量

vim /etc/profile.d/my_env.sh 

新增这一行是Flink文档中要求,我也不知道啥意思
在这里插入图片描述
然后source环境.
修改配置文件

vim ./conf/masters

在这里插入图片描述

vim ./conf/workers

在这里插入图片描述
修改完之后,启动集群.
因为我们在Yarn上完成任务,所以我们要启动Hadoop集群.
在这里插入图片描述
向Yar提交任务,有三证模式,其中包括.
会话模式,应用模式,单作业模式.

应用程序模式将在 YARN 上启动一个 Flink 集群,其中应用程序 jar 的 main() 方法在 YARN 中的 JobManager 上执行。 应用程序完成后,群集将立即关闭。您可以使用或通过取消 Flink 作业手动停止集群。

会话模式有两种操作模式:
附加模式(默认):客户端将 Flink 集群提交到 YARN,但客户端继续运行,跟踪集群的状态。如果群集失败,客户端将显示错误。如果客户端被终止,它也会向群集发出关闭信号。yarn-session.sh
分离模式(或):客户端将 Flink 集群提交到 YARN,然后客户端返回。需要再次调用客户端或 YARN 工具来停止 Flink 集群。-d–detachedyarn-session.sh

单作业模式
单作业集群模式将在 YARN 上启动一个 Flink 集群,然后在本地运行提供的应用程序 jar,最后将 JobGraph 提交给 YARN 上的 JobManager。如果传递参数,则客户端将在接受提交后停止。–detached

官方建议使用应用模式,并且单作业模式已经从1.15之后就被移除了,所以咱们只演示前两种.
如果日后工作有需要,自己看看文档就行了

应用模式
这里直接跑官方给的案例了.

./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar

在这里插入图片描述
记住这里给的项目appID 后边要用

查看项目

./bin/flink list -t yarn-application -Dyarn.application.id=application_1682757957558_0001

在这里插入图片描述
停止项目

./bin/flink cancel -t yarn-application -Dyarn.application.id=application_1682757957558_0001 ff18e3a66c94a581f8da0c027bbe4bc3

在这里插入图片描述
当集群上没有项目时,项目就会停止,这时在查看项目,就会报错.
在这里插入图片描述
由于当项目全部停止后,集群就会停止,所以当你的集群经常只跑单个项目时,就总会重启集群,所以生产中也不是最常用的.

生产中最常用的还是会话模式,它可以在没有项目运行的时候也使Flink集群处于启动状态.

会话模式
创建会话

 bin/yarn-session.sh -nm test -d

-nm 指定会话名称
-d 将当前会话挂载到后台
在这里插入图片描述
启动成功后,有两条日志需要注意一下,一个是Web UI的网址,一个是关掉会话的方法.
我们先去Web查看一下
在这里插入图片描述
因为他是动态分配,所以显示的可用资源永远都是0,当任务提交时,他会向Yarn申请资源,然后执行任务.
我们将之前写的代码打包然后将其提交到hadoop102Flink文件
在这里插入图片描述
提交任务
记得要开启nc
在这里插入图片描述

./bin/flink run -c com.atguigu.chapter01.StreamingWC ./FlinkTutorial-1.0-SNAPSHOT.jar 

在这里插入图片描述
Web查看一下
在这里插入图片描述
可用资源还是0,但是这个任务已经跑一起来了,现在查看一下效果.
在这里插入图片描述
在这里插入图片描述
关闭项目.
在这里插入图片描述
在这里插入图片描述
关闭会话

echo "stop" | ./bin/yarn-session.sh -id application_1682757957558_0002

在这里插入图片描述
至此Flink环境搭建完成,建议保留快照


总结

Flink是做数据实时分析必不可少的技术,也要学习.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/475581.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Packet Tracer - 研究直连路由

Packet Tracer - 研究直连路由 目标 第 1 部分&#xff1a;研究 IPv4 直连路由 第 2 部分&#xff1a;研究 IPv6 直连路由 拓扑图 背景信息 本活动中的网络已配置。 您将登录路由器并使用 show 命令发现并回答以下有关直连路由的问题。 注&#xff1a;用户 EXEC 密码是 c…

A2B汽车音响系统开发设计与改装

hezkz17进数字音频系统研究开发答疑群 1 前装与后装

安装了Volar插件vue文件没有显示Volar的图标

vue3官网 推荐使用Volar来替换Vetur 一、安装Volar 安装Volar前&#xff1a; 安装Volar后&#xff1a; 二、安装Volar插件后&#xff0c;无法显示高亮 之前我安装Volar插件后&#xff0c;vue文件的<script>、<template>、<style>标签仍然是白色的&#xff0c…

Doris(17):动态分区

动态分区是在 Doris 0.12 版本中引入的新功能。旨在对表级别的分区实现生命周期管理(TTL)&#xff0c;减少用户的使用负担。 目前实现了动态添加分区及动态删除分区的功能。 1 原理 在某些使用场景下&#xff0c;用户会将表按照天进行分区划分&#xff0c;每天定时执行例行任…

【网课平台】Day14.集成RabbitMQ:消息队列实现异步通知

文章目录 一、需求&#xff1a;支付通知1、需求分析2、技术方案3、集成RabbitMQ4、生产端发送消息5、消费方发送消息 二、需求&#xff1a;在线学习1、需求分析2、表设计与实体类3、接口定义--查询课程4、接口定义获取视频5、Service层开发6、FeignClient定义7、代码完善 三、需…

数字设计小思 - D触发器与死缠烂打的亚稳态

前言 本系列整理数字系统设计的相关知识体系架构&#xff0c;为了方便后续自己查阅与求职准备。在FPGA和ASIC设计中&#xff0c;D触发器是最常用的器件&#xff0c;也可以说是时序逻辑的核心&#xff0c;本文根据个人的思考历程结合相关书籍内容和网上文章&#xff0c;聊一聊D…

Hudi数据湖技术之数据中心案例实战

目录 1 案例架构2 业务数据2.1 客户信息表2.2 客户意向表2.3 客户线索表2.4 线索申诉表2.5 客户访问咨询记录表 3 Flink CDC 实时数据采集3.1 开启MySQL binlog3.2 环境准备3.3 实时采集数据3.3.1 客户信息表3.3.2 客户意向表3.3.3 客户线索表3.3.4 客户申诉表3.3.5 客户访问咨…

微信小程序 WebSocket 通信 —— 在线聊天

在Node栏目就讲到了Socket通信的内容&#xff0c;使用Node实现Socke通信&#xff0c;还使用两个流行的WebSocket 库&#xff0c;ws 和 socket.io&#xff0c;在小程序中的WebSocket接口和HTML5的WebSocket基本相同&#xff0c;可以实现浏览器与服务器之间的全双工通信。那么本篇…

SSH 服务器、NFS 服务器、TFTP 服务器详解及测试

文章目录 前言一、SSH 服务器1、SSH 能做什么&#xff1f;2、安装 SSH 服务器3、测试 SSH 服务4、用 SecureCRT 测试 二、NFS 服务器1、NFS 能做什么&#xff1f;2、安装 NFS 软件包3、添加 NFS 共享目录4、启动 NFS 服务5、测试 NFS 服务器 三、TFTP 服务器1、TFTP 能做什么&a…

轻松掌握mysql事务的四大特性ACID及实现原理

1、介绍 要实现这四大特性&#xff0c;我们先了解下mysql中的缓冲池和数据页 2、保证原子性和一致性 1、通过undo log保证数据的原子性和一致性 undo log保证了事务的原子性和一致性。 3、保证隔离性 1、并发事务产生时容易产生的隔离性问题 脏读 不可重复读 幻读…

【数据库复习】第四章数据库保护 1

数据库安全性&#xff1a; 数据库的一大特点是数据可以共享 数据共享必然带来数据库的安全性问题 数据库系统中的数据共享不能是无条件的共享 用户标识与鉴别 用户名和口令易被窃取&#xff0c;每个用户预先约定好一个计算过程或者函数 存取控制 常用存取控制方法 自主存…

电子电气架构——车辆E/E架构常识

我是穿拖鞋的汉子,魔都中坚持长期主义的工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 人只有在举棋不定,无从把握的时候才感到疲惫。只有去行动就能获得解放,哪怕做的不好也比无所作为强! 本文主要介绍车辆E/E架构常识,主要涉及E/E架构面临…

LNMP、Tomca

构建Nginx服务器 使用源码包安装nginx软件包 [rootproxy ~]# yum -y install gcc pcre-devel openssl-devel #安装依赖包 [rootproxy ~]# useradd -s /sbin/nologin nginx [rootproxy ~]# tar -xf nginx-1.17.6.tar.gz [rootproxy ~]# cd nginx-1.17.6 [rootproxy …

【Shell编程之循环语句与函数】

目录 一、for循环语句示例:示例1示例2 示例3 二、跳出循环举例 转义符三、while 语句的结构示例: 四、until语句的结构1、基本格式 五、seq命令 一、for循环语句 读取不同的变量值&#xff0c;用来逐个执行同一组命令 #!/bin/bash for(( i0;i<5;i ))i0 定义for循环i变量初…

【逻辑位移和算数位移】

<< 运算符 && >> 运算符 正数位移 当 x>>n 中 x 为正数时&#xff0c;会将x的所有位右移x位&#xff0c;同时左边高位补0 显而易见&#xff0c;运算结束后&#xff0c;值为1 。 可知右移n位&#xff0c;结果就是 x / 2^n&#xff1a;7 / 2 ^2 1;…

运行vue项目报DONE Build complete. The dist directory is ready to be deployed.解决办法

一、问题描述 今天在运行一个vue项目时发现运行途中报这样一个错误&#xff0c;经过查阅相关资料可知&#xff0c;这是dist文件夹下 二、解决办法 根据官方文档&#xff0c;目录需要启动一个 HTTP 服务器来访问 (除非你已经将 publicPath 配置为了一个相对的值)&#xff0c;所…

flex垂直方向布局与overflow结合使用

主要是需要留意 flex布局和overflow 之间的关系&#xff0c; 最外面的container 和 里面的main-box 之间分别使用了flex布局 和 overflow:hiddenmain-box 和 里面的main-body 之间分别使用了 flex布局 和 overflow:auto 有点类似于聊天的窗口布局 <!DOCTYPE html> <h…

zabbix监控远程主机

zabbix监控远程主机 在Zabbix服务器上安装Zabbix代理程序 在远程主机上安装Zabbix代理程序。安装方式取决于操作系统&#xff0c;可以从Zabbix官网上下载相应的安装包进行安装。 监控agent1 在agent1上安装agent yum install zabbix-agent另外在zabbix server上要关闭防火…

【前端知识】内存泄漏与垃圾回收机制 (上)

【前端知识相关分享】内存泄漏与垃圾回收机制 &#xff08;上&#xff09; 1. 内存的生命周期1.1 内存生命周期的一般流程1.2 C&#xff0c;JS和python内存分配和释放的区别 2. JS中的内存管理2.1 两种数据类型与两种内存一个思考2.2 两种内存空间的区别对比 3. 内存泄漏的定义…