【Flink-1.17-教程】-【一】Flink概述、Flink快速入门

news2024/11/17 16:54:00

【Flink-1.17-教程】-【一】Flink概述、Flink快速入门

  • 1)Flink 是什么
    • 1.1.有界流和无界流
    • 1.2.Flink 的发展史
  • 2)Flink 特点
  • 3)Flink vs SparkStreaming
  • 4)Flink 的应用场景
  • 5)Flink 分层 API
  • 6)Flink 快速入门
    • 6.1.创建项目
    • 6.2.WordCount 代码编写
      • 6.2.1.批处理(了解)
      • 6.2.2.流处理(主流)

1)Flink 是什么

在这里插入图片描述

1.1.有界流和无界流

在这里插入图片描述

在这里插入图片描述

1.2.Flink 的发展史

在这里插入图片描述

2)Flink 特点

在这里插入图片描述

3)Flink vs SparkStreaming

在这里插入图片描述

在这里插入图片描述

4)Flink 的应用场景

在这里插入图片描述

5)Flink 分层 API

在这里插入图片描述

6)Flink 快速入门

6.1.创建项目

在准备好所有的开发环境之后,我们就可以开始开发自己的第一个 Flink 程序了。首先我们要做的,就是在 IDEA 中搭建一个 Flink 项目的骨架。我们会使用 Java 项目中常见的 Maven 来进行依赖管理。

1、创建工程

(1)打开 IntelliJ IDEA,创建一个 Maven 工程。

在这里插入图片描述

(2)将这个 Maven 工程命名为 FlinkTutorial。

在这里插入图片描述

(3)选定这个 Maven 工程所在存储路径,并点击 Finish,Maven 工程即创建成功。

在这里插入图片描述

2、添加项目依赖

在项目的 pom 文件中,添加 Flink 的依赖,包括 flink-java、flink-streaming-java,以及 flink-clients(客户端,也可以省略)。

<properties>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>

6.2.WordCount 代码编写

需求:统计一段文字中,每个单词出现的频次。

环境准备:在 src/main/java 目录下,新建一个包,命名为 com.atguigu.wc。

6.2.1.批处理(了解)

批处理基本思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次

1、数据准备

(1)在工程根目录下新建一个input文件夹,并在下面创建文本文件words.txt

(2)在words.txt中输入一些文字,例如:

hello flink
hello world
hello java

2、代码编写

(1)在 com.atguigu.wc 包下新建 Java 类 BatchWordCount,在静态 main 方法中编写代码。具体代码实现如下:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {

    public static void main(String[] args) throws Exception {

        // 1. 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        // 2. 从文件读取数据  按行读取(存储的元素就是每行的文本)
        DataSource<String> lineDS = env.readTextFile("input/words.txt");
        
        // 3. 转换数据格式
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {

            @Override
            public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {

                String[] words = line.split(" ");

                for (String word : words) {
                    out.collect(Tuple2.of(word,1L));
                }
            }
        });

        // 4. 按照 word 进行分组
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);
        
        // 5. 分组内聚合统计
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);

        // 6. 打印结果
        sum.print();
    }
}

(2)输出

(flink,1)
(world,1)
(hello,3)
(java,1)

需要注意的是,这种代码的实现方式,是基于 DataSet API 的,也就是我们对数据的处理转换,是看作数据集来进行操作的。事实上 Flink 本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的 API 来实现。所以从 Flink 1.12 开始,官方推荐的做法是直接使用 DataStream API,在提交任务时通过将执行模式设为 BATCH 来进行批处理:

$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

这样,DataSet API 就没什么用了,在实际应用中我们只要维护一套 DataStream API 就可以。这里只是为了方便大家理解,我们依然用 DataSet API 做了批处理的实现。

6.2.2.流处理(主流)

对于Flink而言,流才是整个处理逻辑的底层核心,所以流批统一之后的 DataStream API 更加强大,可以直接处理批处理和流处理的所有场景。

下面我们就针对不同类型的输入数据源,用具体的代码来实现流处理。

1、读取文件

我们同样试图读取文档 words.txt 中的数据,并统计每个单词出现的频次。整体思路与之前的批处理非常类似,代码模式也基本一致。

在 com.atguigu.wc 包下新建 Java 类 StreamWordCount,在静态 main 方法中编写代码。具体代码实现如下:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class StreamWordCount {

    public static void main(String[] args) throws Exception {
    
        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 2. 读取文件
        DataStreamSource<String> lineStream = env.readTextFile("input/words.txt");
        
        // 3. 转换、分组、求和,得到统计结果
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {

                String[] words = line.split(" ");

                for (String word : words) {
                    out.collect(Tuple2.of(word, 1L));
                }
            }
        }).keyBy(data -> data.f0)
           .sum(1);

        // 4. 打印
        sum.print();
        
        // 5. 执行
        env.execute();
    }
}

输出:

3> (java,1)
5> (hello,1)
5> (hello,2)
5> (hello,3)
13> (flink,1)
9> (world,1)

主要观察与批处理程序BatchWordCount的不同:

  • 创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment。
  • 转换处理之后,得到的数据对象类型不同。
  • 分组操作调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。
  • 代码末尾需要调用env的execute方法,开始执行任务。

2、读取socket文本流

在实际的生产环境中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需要持续地处理捕获的数据。为了模拟这种场景,可以监听 socket 端口,然后向该端口不断的发送数据。

(1)将 StreamWordCount 代码中读取文件数据的 readTextFile 方法,替换成读取 socket 文本流的方法 socketTextStream。具体代码实现如下:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class SocketStreamWordCount {

    public static void main(String[] args) throws Exception {

        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 2. 读取文本流:hadoop102表示发送端主机名、7777表示端口号
        DataStreamSource<String> lineStream = env.socketTextStream("hadoop102", 7777);
        
        // 3. 转换、分组、求和,得到统计结果
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");

            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(data -> data.f0)
                .sum(1);

        // 4. 打印
        sum.print();
        
        // 5. 执行
        env.execute();
    }
}

(2)在 Linux 环境的主机 hadoop102 上,执行下列命令,发送数据进行测试

[atguigu@hadoop102 ~]$ nc -lk 7777

注意:要先启动端口,后启动 StreamWordCount 程序,否则会报超时连接异常。

(3)启动 StreamWordCount 程序

我们会发现程序启动之后没有任何输出、也不会退出。这是正常的,因为 Flink 的流处理是事件驱动的,当前程序会一直处于监听状态,只有接收到数据才会执行任务、输出统计结果。

(4)从 hadoop102 发送数据

①在 hadoop102 主机中,输入“hello flink”,输出如下内容

13> (flink,1)
5> (hello,1)

②再输入“hello world”,输出如下内容

2> (world,1)
5> (hello,2)

泛型擦除的说明:

Flink 还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于 Java 中泛型擦除的存在,在某些特殊情况下(比如 Lambda 表达式中),自动提取的信息是不够精细的——只告诉 Flink 当前的元素由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。

因为对于 flatMap 里传入的Lambda表达式,系统只能推断出返回的是 Tuple2 类型,而无法得到 Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地解析出完整数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1386485.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity寻路A星算法

文章目录 实现步骤概览&#xff1a; 计算移动成本1. **定义移动成本函数**&#xff1a;2. **考虑不同类型的格子**&#xff1a;3. **动态调整成本**&#xff1a;4. **实际应用**&#xff1a; 优先级队列1. **初始化**&#xff1a;2. **节点评估**&#xff1a;3. **更新节点状态…

电脑桌面便签在哪设置?备忘录软件哪个好?

好记性不如烂笔头&#xff01;相信很多打工族在电脑面前办公的时候&#xff0c;都需要随时记录工作中的事项&#xff0c;有的用TXT记录&#xff0c;有的手写笔记&#xff0c;还有一些用电脑桌面便签类软件。而当我们待办事项繁多的时候&#xff0c;手写或文本记录并不能有效帮我…

删除运行框中的文件打开历史记录

当我们使用everything、百度、迅雷等软件&#xff0c;在列表中右键选中打开文件夹时。 当使用 winR 快捷键等方式打开运行时&#xff0c;输入盘符会出现之前打开过的文件夹&#xff0c; 一方面展示的特别多会比较混乱&#xff0c;另一方面 记得在之前的window版本中&#xff08…

What is `@Repository` does?

Repository 是Spring注解&#xff0c;标识数据访问层组件&#xff08;DAO, Data Access Object&#xff09; 当一个类被标记为 Repository 时&#xff1a; 1、组件扫描与自动代理&#xff1a; Spring通过组件扫描&#xff08;Component Scan&#xff09;机制发现带有 Reposit…

KEIL MDK 工程(.uvprojx)目录更改后快速修改方法

概述 在项目过多时&#xff0c;为了方便管理&#xff0c; 需借用文件夹命名来&#xff0c;举例&#xff1a; 1、原工程是在“STM32_Test_Project\MDK-ARM”目录下&#xff0c;我现在创建新文件夹&#xff0c;“Test”&#xff0c;避免原来的受污染&#xff0c;或者好管理等等好…

Linux 多个php版本选择需要的php的版本(修改环境变量)

这两天遇到了个问题&#xff0c; 原本服务器的php版本是7.3.13&#xff0c;经过一些操作之后不知道怎么了变成了5.6 #php版本查看 php -v然后我就对 5.6版本进行了升级&#xff0c;升级到了7.3.33&#xff0c; 这个时候 php -v 是7.3.33&#xff0c; 神奇的一幕出现了&#xf…

鸿蒙开发之组合手势

当我们需要支持多个手势的时候&#xff0c;可以通过GestureGroup来实现&#xff0c;如下实现了同时支持Tap和Pan手势 import Prompt from system.prompt Entry Component struct OfficialGestureGroupPage {State message: string Hello Worldbuild() {Column() {Column() {T…

STM32F103标准外设库——认识STM32(一)

个人名片&#xff1a; &#x1f981;作者简介&#xff1a;一名喜欢分享和记录学习的在校大学生 &#x1f42f;个人主页&#xff1a;妄北y &#x1f427;个人QQ&#xff1a;2061314755 &#x1f43b;个人邮箱&#xff1a;2061314755qq.com &#x1f989;个人WeChat&#xff1a;V…

SSH远程访问与控制

目录 ssh优点 作用 SSH的 软件 公钥首次连接原理 ssh远程登录 shh命令 远程连接 直接连接先输入ssh IP 连接指定用户 在 /etc/ssh/sshd_config下面修改端口号 修改服务端配置文件 ​编辑 白名单&#xff0c;只能登录本机的mcb用户 SSH服务的最佳实践 openSSH 服…

Vue.js轻量级框架:快速搭建可扩展的管理系统

一、前言 在项目实战开发中&#xff0c;尤其是大平台系统的搭建&#xff0c;针对不同业务场景&#xff0c;需要为用户多次编写用于录入、修改、展示操作的相应表单页面。一旦表单需求过多&#xff0c;对于开发人员来说&#xff0c;算是一种重复开发&#xff0c;甚至是繁杂的工作…

NetCore部署微服务(三)

接上文&#xff0c;服务端部署完成之后&#xff0c;同样我们也需要修改一下客户端代码 Blocking Queries 1.1 服务发现 在客户端代码中使用Nuget安装consul包 修改配置文件&#xff0c;我们首先需要把consul的请求地址配置在配置文件中 修改control方法 using Consul; usin…

第七在线荣获百灵奖 Buylink Awards 2023零售圈年度卓越服务商品牌

1月11日&#xff0c;由零售圈主办、20零售连锁协会协办、30零售行业媒体支持的中国零售圈大会暨2024未来零售跨年盛典在西安落下帷幕&#xff0c;在这个零售行业盛典中&#xff0c;第七在线凭借其高精尖产品和卓越的服务质量成功入选&#xff0c;并荣获了“百灵奖 Buylink Awar…

旧路由重置新路由设置新路由设置教程|适用于PPPoE拨号

前言 前几天朋友说路由器想要重置&#xff0c;但不知道怎么弄。所以就想着只帮忙重置路由器的话&#xff0c;只能帮到一个人。但把整个过程写成图文&#xff0c;就可以帮助更多人。 本文章适合电脑小白&#xff0c;请注意每一步哦&#xff01; 注意事项 开始之前需要确认光猫…

深度学习环境常用命令(持续更新......)

深度学习涉及常用命令 在深度学习过程中常涉及的命令记录备查。 本文中涉及命令均在windows上&#xff0c;使用Anaconda管理环境的情况下。 显卡环境相关命令 1.pytorch下查看cuda版本&#xff0c;查看cudnn版本 import torch print(torch.version.cuda) print(torch.back…

Docker 如何安装 MySQL 并实现远程连接

Hello各位小伙伴们大家好&#xff01;我是咕噜铁蛋&#xff01;随着云计算和容器化技术的兴起&#xff0c;Docker 已经成为现代软件开发的核心工具之一。它提供了一种轻量级、可移植、自包含的部署方式&#xff0c;使得开发人员可以更加便捷地构建、测试和发布应用程序。而 MyS…

docker安装部署Elasticsearch(ES)以及相关配置

Elasticsearch简介 mysql用作持久化存储&#xff0c;ES用作检索 基本概念&#xff1a;index库>type表>document文档 index索引&#xff08;相当于MySQL的数据库&#xff09; 动词&#xff1a;相当于mysql的insert 名词&#xff1a;相当于mysql的db Type类型&#xff…

Rust-借用检查

Rust语言的核心特点是&#xff1a;在没有放弃对内存的直接控制力的情况下&#xff0c;实现了内存安全。 所谓对内存的直接控制能力&#xff0c;前文已经有所展示&#xff1a;可以自行决定内存布局&#xff0c;包括在栈上分配内存&#xff0c;还是在堆上分配内存&#xff1b;支…

【开发篇】三、并发下的OOM分析

文章目录 1、并发下的OOM分析2、Jmeter模拟并发 1、并发下的OOM分析 用户请求过来&#xff0c; 后端查询数据库后封装Vo对象返回给前端后&#xff0c;然后正常这个Vo就可以被GC清理掉了。 但并发时&#xff0c;如果数据处理时间很长&#xff0c;大量对象存于内存&#xff0c;或…

开源28181协议视频平台搭建流程

最近项目中用到流媒体平台&#xff0c;java平台负责信令部分&#xff0c;c平台负责流媒体处理&#xff0c;找了评分比较好的开源项目 https://gitee.com/pan648540858/wvp-GB28181-pro 流媒体服务基于 c写的 https://github.com/ZLMediaKit/ZLMediaKit 说明文档&#xff1a;h…

web前端(第二次作业)

1、计算用户指定的数值内的奇数和。例如用户输入的是 10&#xff0c;则计算 1 3 5 7 9 的和 <!DOCTYPE html> <html><head><meta charset"utf-8"><title></title><script>var nprompt("请输入数值&#xff1a;&…