Flink入门实战详解

news2024/11/24 5:09:06

Flink入门实战

Flink项目构建

1)基于Maven+Idea创建项目:

使用maven进行项目构建,如图1所示。

图-34 构建maven项目

输入项目中的maven的坐标和存储坐标,如图2所示。

图2 maven坐标和存储位置

2)Maven依赖:

    <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.12</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <hadoop.version>2.6.0</hadoop.version>
    <flink.version>1.9.1</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.23</version>
    </dependency>
    <!-- flink-2-hadoop-->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2</artifactId>
        <version>2.7.5-9.0</version>
    </dependency>
    <!-- lombok -->
    <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table</artifactId>
        <type>pom</type>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.58</version>
    </dependency>
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.4</version>
        <exclusions>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.5.1</version>
            <configuration>
                <source>${maven.compiler.source}</source>
                <target>${maven.compiler.target}</target>
                <!--<encoding>${project.build.sourceEncoding}</encoding>-->
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <!--<arg>-make:transitive</arg>-->
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <!--
                                    zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF
                                    -->
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>chapter1.BatchWordCount</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Flink基础API概念

Flink编程是在分布式集合的基础的规律的编程模型(比如,执行filtering,mapping,updating,state,joining,grouping,defining,windows,aggregating)。这些集合可以通过外部数据源(比如从文件,kafka的topics、本地或者内存的集合)。通过下沉算子返回结果,比如将数据写入到一个分布式的文件中,或者控制台。Flink程序可以基于各种context、stanalone或者嵌入其他程序进行运行。可以在本地的jvm或者在多台机器间分布式运行。

基于外部的数据源,比如有界或者无界的数据源,我们可能会选择使用批处理的DataSet API或者流处理的DataStream API来处理。

需要注意的是,在DataStream和DataSet中的绝大多数的API是一致的,只需要替换对应的ExecutionEnvironment或者StreamExecutionEnvironment即可。

Flink在编程的过程中使用特定类——DataSet和DataStream来体现数据,类似Spark中的RDD。可以将其认为是一个可以拥有重复的不可变的集合。其中DataSet表示的是一个有界的数据集,DataStream则表示的是无界的集合。

这些集合在一些关键的地方和Java中的普通集合不同。首先,DataSet和DataStream是不可变的,这就意味着一旦被创建,便不能进行add或者remove的操作。同样也不能简单的查看集合内部的元素。

Flink可以通过外部的数据源来创建DataSet或者DataStream,也可以通过在一个已知的集合上面执行一系列的Transformation操作来转换产生新的集合。

Flink程序看起来就是一个普通的程序,进行数据的转换,每一个程序包含如下相同的集合基础概念,通用编程步骤如下:

1)创建一个执行环境ExecutionEnvironment。

2)加载或者创建初始化数据——DataSet或者DataStream。

3)在此数据基础之上进行特定的转化操作。

4)将计算的结果输出到特定的目的地。

5)触发作业的执行。

Flink编程的入口,便是ExecutionEnvironment,不同之处在于,DataSet和DataStream使用的ExecutionEnvironment不同。DataSet使用ExecutionEnviroment,而DataStream使用StreamExectionEnvironment。

获得ExecutionEnvironment可以通过ExecutionEnvironment的如下方法:

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(String host, int port, String... jarFiles)

通常情况下,我们只需要使用getExecutionEnvironment()即可,因为这种方式会自动选择正确的context。如果我们在IDE中执行,则会创建一个Local的Context,如果打包到集群中执行,会返回一个Cluster的Context。

加载数据源的方式有多种。可以一行一个的读入,比如CSV文件,或者自定义格式。如果只是从一个文本文件中按顺序读取行数据。只需要如下操作即可。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path")

创建了一个DataStream或者DataSet,接下来便可以执行各种transformation转换操作。比如执行一个map操作。

创建一个新的DataStream,类型为Integer的集合。

DataStream中包含了最终的结果,我们可以将结果通过创建一个sink操作,写入外部系统中,比如:writeAsText(path)

一旦我们完成整个程序,我们需要通过调用StreamExecutionEnvironment的execute()方法来触发作业的执行。基于ExecutionEnvironment会在本地或者集群中执行。

execute()方法返回值为JobExecutionResult,包含本次执行时间或者累加器结果信息。

与Spark中的Transformation操作相同,Flink中的Transformation操作是Lazy懒加载的,需要execute()去触发。基于此,我们可以创建并添加程序的执行计划。进行任务调度和数据分离,执行更加高效。

目前Flink支持7种数据类型,分别为:

1)Java Tuples和Scala Case Classes。

2)Java POJOS(一种数据结构类型)。

3)Primitive Types(Java的基本数据类型)。

4)Regular Classes(普通类)。

5)Values。

6)Hadoop Writables。

7)SpecialTypes。

DataSet批处理API

Flink中的DataSet程序是实现数据集转换的常规程序(例如,Filter,映射,连接, 分组)。数据集最初是从某些来源创建的(例如,通过读取文件或从本地集合创建)。结果通过接收器返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如命令行终端)。Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

import org.apache.flink.api.scala._

object WordCountOps {

def main(args: Array[String]): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment

val text = env.fromElements("Who's there?",

"I think I hear them. Stand, ho! Who's there?"

)

val wordCounts:DataSet[(String, Int)] = text

.flatMap(line => line.split("\\s+")).map((_, 1))

.groupBy(0)

.sum(1)

wordCounts.print()

}

}

Streaming流式处理API

Flink中的DataStream程序是实现数据流转换的常规程序(例如 filtering, updating state, defining windows, aggregating)。最初从各种源(例如, message queues, socket streams, files)创建数据流。结果通过接收器返回,接收器可以例如将数据写入文件或标准输出(例如命令行终端)。Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
object StreamDemo {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val file = env.socketTextStream("localhost", 9999)
        val spliFile: DataStream[String] = file.flatMap(_.split(" "))
        val wordAndOne: DataStream[(String, Int)] = spliFile.map((_, 1))
        val keyed = wordAndOne.keyBy(data=>data._1)
        val wordAndCount: DataStream[(String, Int)] = keyed.sum(1)
        wordAndCount.print()
        env.execute()
    }
}

要运行示例程序,首先从终端使用netcat启动输入流:

nc -lk 9999

只需键入一些单词就可以返回一个新单词。这些将是字数统计程序的输入。

Flink程序提交到集群

1)Web提交方式:

图3 web提交方式

1)脚本方式:

#!/bin/sh

FLINK_HOME=/home/bigdata/apps/flink

$FLINK_HOME/bin/flink run \

-c BatchDemo \

/root/wc.jar \

hdfs://hadoop101:8020/wordcount/words.txt \

hdfs://hadoop101:8020/wordcount/output3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1846413.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

BEVM基于OP-Stack发布首个以WBTC为GAS连接以太坊和比特币生态的中继链

为了更好的连接以太坊和比特币生态&#xff0c;BEVM团队正在基于OPtimism的OP Stack来构建一个以WBTC为GAS兼容OP-Rollup的中继链&#xff0c;这条中继链将作为一种完全去中心化的中间层&#xff0c;把以太坊上的主流资产(WBTC/ ETH/USDC/USDT等)引入到BEVM网络。 不仅如此&am…

最新扣子(Coze)实战案例:扣子卡片的制作及使用,完全免费教程

&#x1f9d9;‍♂️ 大家好&#xff0c;我是斜杠君&#xff0c;手把手教你搭建扣子AI应用。 &#x1f4dc; 本教程是《AI应用开发系列教程之扣子(Coze)实战教程》&#xff0c;完全免费学习。 &#x1f440; 关注斜杠君&#xff0c;可获取完整版教程。&#x1f44d;&#x1f3f…

Element-UI实现el-dialog弹框拖拽功能

在实际开发中&#xff0c;会发现有些系统&#xff0c;弹框是可以在浏览器的可见区域自由拖拽的&#xff0c;这极大方便用户的操作。但在查看Element-UI中弹框&#xff08;el-dialog&#xff09;组件的文档时&#xff0c;发现并未实现这一功能。不过也无须担心&#xff0c;vue中…

GaussDB关键技术原理:高性能(一)

引言 对数据库性能进行优化是令人激动的&#xff0c;无论是对其进行性能需求分析、性能需求设计、性能问题定个位都是富于变化又充满挑战的工作&#xff0c;本章围绕“数据库性能”进行全面系统化的介绍&#xff0c;首先从数据库在现代软件栈中所处的位置出发&#xff0c;介绍…

车载模块负载基础认识

车载模块负载是指车辆上的各种电子设备和系统&#xff0c;如导航系统、音响系统、空调系统、安全气囊等。这些设备和系统在车辆运行过程中需要消耗一定的电能&#xff0c;以保证其正常工作。车载模块负载的基础认识主要包括以下几个方面&#xff1a; 1. 负载类型&#xff1a;车…

计算机毕业设计Python+Spark音乐推荐系统 音乐数据分析 音乐可视化 音乐爬虫 音乐大数据 大数据毕业设计 大数据毕设

2023届本科生毕业论文&#xff08;设计&#xff09;开题报告 知识图谱音乐推荐系统 学 院&#xff1a; XXX 专 业&#xff1a; XXX 年 级 班 级&#xff1a; XXX 学 生 姓 名&#xff1a; XXX 指 导 教 师&#xff1a; XXX 协助指导教师&#xff1a; …

收藏丨世界地形图(超高清)

原文链接https://mp.weixin.qq.com/s?__bizMzUyNzczMTI4Mg&mid2247668146&idx2&snbebd2f4921994ab05ed47efdebe8e706&chksmfa770e8fcd008799bf1d1cabd62edca7f60a5c7a1ef9c0bacf3bdc102bccf0f12d54d6c07c49&token1405091246&langzh_CN&scene21#we…

[信号与系统]模拟域中的一阶低通滤波器和二阶滤波器

前言 不是学电子出身的&#xff0c;这里很多东西是问了朋友… 模拟域中的一阶低通滤波器传递函数 模拟域中的一阶低通滤波器的传递函数可以表示为&#xff1a; H ( s ) 1 s ω c H(s) \frac{1}{s \omega_c} H(s)sωc​1​ 这是因为一阶低通滤波器的设计目标是允许低频信…

Java 中 String 类

目录 1 常用方法 1.1 字符串构造 1.2 字符串包含的成员 1.3 String 对象的比较 1.4 字符串查找 1.5 转化 1.5.1 数值和字符串转化 1.5.2 大小写转化 1.5.3 字符串转数组 1.5.4 格式化 1.6 字符串替换 1.7 字符串拆分 1.8 字符串截取 1.9 其他操作方法 1.10 字符…

智慧办公新篇章:可视化技术引领园区管理革命

随着科技的飞速发展&#xff0c;办公方式也在经历着前所未有的变革。在这个信息爆炸的时代&#xff0c;如何高效、智能地管理办公空间&#xff0c;成为了每个企业和园区管理者面临的重要课题。 智慧办公园区作为未来办公的新趋势&#xff0c;以其高效、便捷、智能的特点&#x…

面向对象修炼手册(一)(类与对象)(Java宝典)

&#x1f308; 个人主页&#xff1a;十二月的猫-CSDN博客 &#x1f525; 系列专栏&#xff1a; &#x1f3c0;面向对象修炼手册 &#x1f4aa;&#x1f3fb; 十二月的寒冬阻挡不了春天的脚步&#xff0c;十二点的黑夜遮蔽不住黎明的曙光 目录 思想 代理和团体 类 1 基本概…

设施布置之车间布局优化SLP分析

一 物流分析&#xff08;Flow Analysis&#xff09; 的基本方法 1、当物料移动是工艺过程的主要部分时&#xff0c;物流分析就是工厂布置设计的核心工作&#xff0c;也是物料搬运分析的开始。 2、零部件物流是该部件在工厂内移动时所走过的路线&#xff0c; 物流分析不仅要考虑…

鸿蒙开发通信与连接:【@ohos.wifi (WLAN)】

WLAN 说明&#xff1a; 本模块首批接口从API version 6开始支持。后续版本的新增接口&#xff0c;采用上角标单独标记接口的起始版本。 导入模块 import wifi from ohos.wifi;wifi.isWifiActive isWifiActive(): boolean 查询WLAN是否已激活。 需要权限&#xff1a; ohos.p…

java基于ssm+jsp KTV点歌系统

1管理员功能模块 管理员登录&#xff0c;通过填写注册时输入的用户名、密码进行登录&#xff0c;如图1所示。 图1管理员登录界面图 管理员登录进入KTV点歌系统可以查看个人中心、用户管理、歌曲库管理、歌曲类型管理、点歌信息管理等信息。 修改密码&#xff0c;在修改密码页…

【数据分享】《中国改革年鉴》1989-2022

最近老有同学过来询问《中国经济体制改革年鉴》、《中国改革年鉴》这两本数据的关系以及怎么获取这两本本数据。今天就在这里给大家分享一下这三本数据的具体情况。 《中国改革年鉴》由国家发展和改革委员会主管,中国经济体制改革研究会主办,中国经济体制改革杂志社编辑出版,是…

iOS开发工具-网络封包分析工具Charles

一、Charles简介 Charles 是在 Mac 下常用的网络封包截取工具&#xff0c;在做 移动开发时&#xff0c;我们为了调试与服务器端的网络通讯协议&#xff0c;常常需要截取网络封包来分析。 Charles 通过将自己设置成系统的网络访问代理服务器&#xff0c;使得所有的网络访问请求…

Lighthouse浮游菌采样器AC100H及操作使用介绍-中邦兴业

Lighthouse浮游菌采样器AC100H介绍 Lighthouse浮游菌采样器AC100是一款高性能的便携式浮游菌采样器&#xff0c;专为洁净室和无菌环境设计。它基于先进的光学技术和安德森撞击原理&#xff0c;实现了对微小浮游菌的高效采集。采样器内置了HEPA高效过滤器&#xff0c;能够过滤掉…

泽众云真机-平台即将升级支持华为机型HarmonyOS NEXT系统

具小编了解&#xff0c;泽众云真机即将升级支持华为机型HarmonyOS NEXT系统。有些人可能对HarmonyOS NEXT系统了解不多。 之前我们有个银行项目&#xff0c;客户要求测试华为HarmonyOS NEXT系统环境&#xff0c;当时我们云真机尚未有该系统的机型&#xff0c;然后技术人员向华为…

React的服务器端渲染(SSR)和客户端渲染(CSR)有什么区别?

React的服务器端渲染&#xff08;SSR&#xff09;和客户端渲染&#xff08;CSR&#xff09;是两种不同的页面渲染方式&#xff0c;它们各自有不同的特点和适用场景&#xff1a; 服务器端渲染&#xff08;SSR&#xff09; 页面渲染: 页面在服务器上生成&#xff0c;然后将完整的…

如何最简单的方式使用nodejs中的http-server发布轻量级的html网页

1、查看nodejs是否安装。 node 2、设置环境路径。 3、使用npm install http-server -g安装http-server >npm install http-server -g 5、启动http-server服务,查看是否正确安装。 http-server 6、查看是否能够正常运行。 5、创建文件夹&#xff0c;复制html、css、js、in…