Flink---1、概述、快速上手

news2024/12/24 17:23:09

1、Flink概述

1.1 Flink是什么

Flink的官网主页地址:https://flink.apache.org/
Flink的核心目标是“数据流上有状态的计算”(Stateful Computations over Data Streams)。
具体说明:Apache Flink是一个“框架和分布式处理引擎”,用于对无界有界数据流进行有状态计算。
在这里插入图片描述

1.1.1 无界数据流

  • 有定义流的开始,但是没有定义流的结束
  • 它们会无休止的产生数据
  • 无界流的数据必须持续处理,即数据被摄取后需要立即处理。我们不能等到所有数据都到达再处理,因为输入时无限的。

1.1.2 有界数据流

  • 有定义流的开始,也有定义流的结束
  • 有界流可以在摄取所有数据后再进行计算
  • 有界流所有的数据可以被排序,所有并不需要有序摄取
  • 有界流处理通常被称为批处理

1.1.3 有状态流处理

把流处理需要的额外数据保存成一个“状态”,然后针对这条数据进行处理,并且更新状态,这就是所谓的“有状态的流处理”。
在这里插入图片描述

  • 状态在内存中:优点:速度快;缺点:可靠性差
  • 状态在分布式系统中:优点:可靠性高;缺点:速度慢

1.1.4 Flink发展历史

在这里插入图片描述

1.2 Flink特点

我们处理数据的目标是:低延迟、高吞吐、结果的准确性和良好的容错性。
Flink主要特点如下:

  • 高吞吐和低延迟:每秒处理数百万个事件,毫秒级延迟
  • 结果的准确性:Flink提供了事件时间(event-time)和处理时间(processing-time)语义。对于乱序事件流,事件时间语义仍然能提供一致且准确的结果。
  • 精确一次(exactly-once)的状态一致性保证
  • 可以连接到最常用的外部系统,如kafka、Hive、JDBC、HDFS、Redis等
  • 高可用:本身高可用的设置,加上K8S,Yarn和Mesos的紧密集成,再加上从故障中快速恢复和动态扩展任务的能力,Flink能做到以极少的停机时间7x24全天候运行。

1.3 Flink和SparkStreaming(说实话没有比较的必要)

1、Spark是以批处理为根本。
2、Flink是以流处理为根本。

1.4 Flink的应用场景

1、电商和市场营销
2、物联网(IOT)
3、物流配送和服务业
4、银行和金融业

1.5 Flink分层API

在这里插入图片描述

  • 有状态流处理:通过底层API(处理函数),对原始数据加工处理。底层API和DataStreamAPI相集成,可以处理复杂的计算。
  • DataStreamAPI(流处理)和DataSetAPI(批处理)封装了底层处理函数,提供了通用的模块,比如转换(transformations,包括map,flatMap等),连接(joins),聚合(aggregations),窗口(Windows)操作等。注意:Flink1.12后,DataStreamAPI已经实现真正的流批一体,所以DataSetAPI已经过时。
  • TableAPI是以表为中心的声明式编程,其中表可能会动态变化。TableAPI遵循关系模型;表有二维数据结构,类似于关系数据库中的表,同时API提供可比较的操作,例如select、project、join、group by、aggregate等。我们可以在表与DataStream/DataSet之间无缝切换,以允许程序将TableAPI与DataStream以及DataSet混用。
  • SQL这一层在语法与表达能力上与TableAPI类似,但是是以SQL查询表达式的形式表现程序。SQL抽象与TableAPI交互密切,同时SQL查询可以直接在TableAPI定义的表上执行。

2、Flink快速上手

2.1 创建项目

在准备好所有的开发环境之后,我们就可以开始开发自己的第一个Flink程序了。首先我们要做的,就是在IDEA中搭建一个Flink项目的骨架。我们会使用Java项目中常见的Maven来进行依赖管理。
1、创建工程
(1)打开IntelliJ IDEA,创建一个Maven工程。
在这里插入图片描述
2、添加项目依赖

<properties>
        <flink.version>1.17.0</flink.version>
</properties>


    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

     <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
     </dependency>
</dependencies>

2.2 WordCount代码编写(大数据常用的例子)

需求:统计一段文字中,每个单词出现的频次
环境准备:创建一个com.zhm.wordcount包

2.2.1 批处理

批处理的基本思路:先逐行读入文件数据,然后将每一行文子拆分成单词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次。
1、数据准备
(1)在工程根目录下新建一个data文件夹,并在下面创建文本文件words.txt
(2)在文件中输入一些单词

hello hello hello
world world
hello world

2、代码编写
(1)在com.zhm.wordcount包下新建一个Demo01_BatchProcess类


/**
 * @ClassName Batch
 * @Description 利用Flink批处理单词统计
 * @Author Zouhuiming
 * @Date 2023/9/3 9:58
 * @Version 1.0
 */

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
    计算的套路:
        (1) 计算的环境
            Spark:SparkContext
            MR:Driver
            Flink:ExecutionEnvironment
        (2) 把要计算的数据封装为计算模型
            Spark:RDD(Spark Core)
                    DateFrame|DataSet(SparkSQL)
                    DStream(SparkStream)
            MR:k-V
            Flink:DataSource
        (3)调用计算API
            RDD.转换算子()
            MR:自己去编写Mapper、Reducer
            Flink:DataSource.算子()


 */
public class Demo01_BatchProcess {
    public static void main(String[] args) throws Exception {
        //创建支持Flink计算的环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //使用环境去读取数据,封装为计算模型
        DataSource<String> dataSource = env.readTextFile("data/words.txt");
        //调用计算API
        dataSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {

            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] split = s.split(" ");
                for (String s1 : split) {
                    collector.collect(new Tuple2<String,Integer>(s1,1));
                }
            }
        }).groupBy(0)
                .sum(1)
                .print();
    }
}

运行结果:
在这里插入图片描述
注意:这种实现是基于DataSetAPI的,也就是我们对数据的处理转换,是看作数据集来进行操作的。事实上Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。所以从Flink1.12开始,官方推荐的做法是直接使用DataStreamAPI,在提交任务时通过将执行模式设为BATCH来进行批处理;

bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

这样,DataSetAPI就没有用了,在实际应用中我们只要维护一套DataStreamAPI就可以。这里只是为了方便大家理解,我们依然用DataSetAPI做了批处理的实现。

2.2.2 流处理

对于Flink而言,流才是整个处理逻辑的底层核心,所以流批一体之后的DataStreamAPI更加强大,可以直接处理批处理和流处理的所有场景。
下面我们就针对不同类型的的输入数据源,用具体的代码来实现流处理。
1、读取文件(有界流)
我们同样试图读取文档words.txt中的数据,并统计每个单词出现的频次。整体思路与之前的批处理非常类似,代码模式也基本一致。
在com.zhm.wordcount包下新建一个Demo02_BoundedStreamProcess类

package com.zhm.wordcount;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @ClassName Demo02_BoundedStreamProcess
 * @Description 有界流
 * @Author Zouhuiming
 * @Date 2023/9/3 10:26
 * @Version 1.0
 */


public class Demo02_BoundedStreamProcess {
    public static void main(String[] args) throws Exception {
        //1、创建支持Flink计算的环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.1 设置一个线程处理这个流(默认是根据你的cpu数和单词种类个数,取最小值)
//        env.setParallelism(1);

        //2、获取数据源
        FileSource<String> source = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("data/words.txt")).build();
        //3、利用环境将数据源的数据封装为计算模型
        DataStreamSource<String> streamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "myfile");
        //4、调用API对数据进行计算

        //4.1 将每行数据按照给定的分割符拆分为Tuple2类型的数据模型(word,1)
        streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] split = s.split(" ");
                for (String s1 : split) {
                    collector.collect(new Tuple2<>(s1,1));
                }
            }
            //4.2 根据word分组
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.f0;
            }
            //4.3 根据分组之后,按照元组中的第二列聚相加
        }).sum(1)
                // 4.4 打印结果
                .print();

        //5、提交job
        env.execute();
    }
}


运行结果:
在这里插入图片描述
和批处理程序BatchWordCount的不同:

  • 创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment。
  • 转换处理之后,得到的数据对象类型不同
  • 分组操作调用的方法是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。
  • 代码末尾需要调用env的execute方法,开始执行任务。

2、读取Socket文本流(无界流)
在实际的生产环境中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需要持续的处理捕获的数据。为了模拟这种场景,可以监听Socket端口,然后向该端口不断地发生数据。
(1)将StreamWordCount代码中读取文件数据的readTextFile方法,替换成读取Socket文本流的方法socketTextStream。具体代码实现如下:

package com.zhm.wordcount;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @ClassName Demo03_UnBoundedStreamProcess
 * @Description 无界流
 * @Author Zouhuiming
 * @Date 2023/9/3 10:39
 * @Version 1.0
 */
public class Demo03_UnBoundedStreamProcess {
    public static void main(String[] args) throws Exception {

        //1、创建支持Flink计算的环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.1 设置一个线程处理这个流
        env.setParallelism(1);

        //2、获取数据源
        DataStreamSource<String> streamSource = env.socketTextStream("hadoop102", 9999);

        //3.1 将每行数据按照给定的分割符拆分为Tuple2类型的数据模型(word,1)
        streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] split = s.split(" ");
                        for (String s1 : split) {
                            collector.collect(new Tuple2<>(s1,1));
                        }
                    }
                    //3.2 根据word分组
                }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                        return stringIntegerTuple2.f0;
                    }
                    //3.3 根据分组之后,按照元组中的第二列聚相加
                }).sum(1)
                // 3.4 打印结果
                .print();

        //4、提交job
        env.execute();
    }
}

(2)在Linux环境的主机hadoop102上,执行下列命令,发送数据进行测试(前提是要安装netcat)

nc -lk hadoop102 9999

(3)启动Demo03_UnBoundedStreamProcess程序
我们会发现程序启动之后没有任何输出、也不会退出。这是正常的,因为Flink的流处理是事件驱动的,当前程序会一直处于监听状态,只有接受数据才会执行任务、输出统计结果。
在这里插入图片描述

(4)从hadoop102发送数据
在这里插入图片描述
(5)观察idea控制台
在这里插入图片描述
说明:Flink还具有一个类型提前系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),自动提取的信息是不够精细的–只告诉Flink当前的元素由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显示地提供类型信息,才能使得应用程序正常工作或提高其性能。
因为对于flatMap里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple<String,Long>。只有显示地告诉系统当前的返回类型,才能正确的解析出完整数据。

2.2.3 执行模式

从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理。不建议使用DataSet API。

// 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamAPI执行模式包括:流执行模式、批执行模式和自动模式。

  • 流执行模式(Streaming)
    这是DataStreamAPI最经典的模式,一边用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是Streaming执行模式。
  • 批执行模式(Batch)
    专门用于批处理的执行模式
  • 自动模式
    在这种模式下,将由程序根据输入数据源是否有界来自动选择执行模式。
    批执行模式的使用:主要有两种方式:
    (1)通过命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH ...

在提交作业时,增加execution.runtime-mode参数,指定值为BATCH。

(2)通过代码设置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

在代码中,直接基于执行环境调用setRuntimeMode方法,传入BATCH模式。
实际应用中一般不会在代码中配置,而是使用命令行,这样更加灵活。

2.2.4 本地WebUI

在Idea本地运行程序时,可以通过添加本地WebUI依赖,使用WebUI界面查看Job的运行情况。

  <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

添加后,在代码中可以指定绑定的端口:

Configuration conf = new Configuration();
        conf.setInteger("rest.port", 3333);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

之后,在程序启动后,打开本地浏览器,访问localhost:3333即可查看job的运行情况。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/971900.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023年轨道交通行业研究报告

第一章 行业概况 1.1 定义和分类 在全球行业分类标准&#xff08;GICS&#xff09;的框架下&#xff0c;轨道交通行业被精准地划定为交通运输行业的一个重要子集&#xff0c;其主要职责是专注于沿着预设轨道路线进行乘客和货物运输的系统的设计、实施、维护以及管理。这个行业…

C# 采用3DES-MAC进行签名 base64解码与编码

** 3DES-MAC ** 3DES-MAC&#xff08;Triple Data Encryption Standard Message Authentication Code&#xff09;是一种消息认证码&#xff08;MAC&#xff09;算法&#xff0c;用于验证消息的完整性和真实性。3DES-MAC使用了3DES&#xff08;Triple Data Encryption Standa…

java八股文面试[JVM]——JVM性能优化

JVM性能优化指南 JVM常用命令 jps 查看java进程 The jps command lists the instrumented Java HotSpot VMs on the target system. The command is limited to reporting information on JVMs for which it has the access permissions. jinfo &#xff08;1&#xff09;实时…

第三章微服务配置中心

文章目录 Nacos配置中心统一配置管理在nacos中添加配置文件从微服务拉取配置 配置热更新多环境共享配置 搭建Nacos集群搭建集群初始化数据库配置Nacos启动nginx反向代理 Nacos配置中心 Nacos配置管理 Nacos除了可以做注册中心&#xff0c;同样可以做配置管理来使用。 统一配置…

Vue生成多文件pdf准考证

这是渲染的数据 这是生成的pdf文件&#xff0c;直接可以打印 需要安装和npm依赖和引入封装的pdf.js文件 npm install --save html2canvas // 页面转图片 npm install jspdf --save // 图片转pdfpdf.js文件 import html2canvas from "html2canvas"; import jsPDF …

容器编排工具的比较:Kubernetes、Docker Swarm、Nomad

随着容器化技术的普及&#xff0c;容器编排工具成为了现代应用部署和管理的重要组成部分。容器编排工具能够自动化容器的部署、扩展和管理&#xff0c;从而提高应用的可靠性和可伸缩性。在众多的容器编排工具中&#xff0c;Kubernetes、Docker Swarm和Nomad是三个备受关注的主要…

三维跨孔电磁波CT数据可视化框架搭建

三维跨孔电磁波CT数据可视化框架搭建 文章目录 三维跨孔电磁波CT数据可视化框架搭建1、三维CT可视化结果2、matlab代码2.1、CT数据格式整理并保存2.2、三维可视化 利用matlab实现对跨孔电磁波CT实测数据反演&#xff0c;并搭建了三维CT数据可视化框架&#xff0c;可装填实测CT反…

2023-09-04 LeetCode每日一题(序列化和反序列化二叉搜索树)

2023-09-04每日一题 一、题目编号 449. 序列化和反序列化二叉搜索树二、题目链接 点击跳转到题目位置 三、题目描述 序列化是将数据结构或对象转换为一系列位的过程&#xff0c;以便它可以存储在文件或内存缓冲区中&#xff0c;或通过网络连接链路传输&#xff0c;以便稍后…

Qt--自定义搜索控件,QLineEdit带前缀图标

写在前面 这里自定义一个搜索控件&#xff0c;通过自定义LineEdit的textChange信号&#xff0c;搜索指定内容&#xff0c;并以QCheckBox的方式显示在QListWidget中。 开发版本 Qt: 5.15.2 Qt: Creator10.0.2 编译环境&#xff1a;msvc2019_64bit release 效果 代码 自定义…

使用 Sealos 在离线环境中光速安装 K8s 集群

作者&#xff1a;尹珉。Sealos 开源社区 Ambassador&#xff0c;云原生爱好者。 当容器化交付遇上离线环境 在当今快节奏的软件交付环境中&#xff0c;容器化交付已经成为许多企业选择的首选技术手段。在可以访问公网的环境下&#xff0c;容器化交付不仅能够提高软件开发和交付…

【ccf-csp题解】第0次csp认证-第四题-有趣的数-题解

题目描述 思路说明 本题涉及组合数学的知识 目的是在n个空位上放置0、1、2、3&#xff0c;问符合题意的放法有多少种 首先注意到一个重要的事实&#xff1a; 只要0和1的位置已经确定&#xff0c;那么2和3的摆放就十分容易了 那么把所有情况分为n-2种&#xff1a; 第一种…

uniapp开发 众筹平台源码 高仿水滴筹平台源码 大病救助平台源码 可二开

uniapp开发 众筹平台源码 高仿水滴筹平台源码 大病救助平台源码 可二开

Calico IP In IP模拟组网

Calico IP In IP模拟组网 网络架构 模拟组网 先在k8s-master-1节点执行如下命令&#xff1a; # 创建veth-pair设备对ip link add veth1 type veth peer name eth0# 创建ns1网络命名空间ip netns add ns1# 将eth0网卡插入ns1网络命名空间ip link set eth0 netns ns1# 为ns1网…

GUIslice Builder 安装及使用

GUIslice Builder是一个可视化UI设计工具&#xff0c;可以简化GUIslice的UI设计流程。下面是GUIslice Builder的安装和使用步骤&#xff1a; 首先&#xff0c;下载GUIslice Builder并解压缩文件。 然后&#xff0c;进入解压后的文件夹&#xff0c;并运行GUIsliceBuilder.exe。…

PCL入门(一):ubuntu20使用apt安装pcl

目录 0. 背景1. apt安装的版本2. 更新apt源3. apt安装命令4. 测试 0. 背景 使用源码安装pcl较为麻烦&#xff0c;因为存在依赖库vtk&#xff0c;flann&#xff0c;boost&#xff0c;eigen等&#xff0c;都不太好安装&#xff0c;因此采用apt方式安装。 下面内容主要参考博客《…

深入浅出Android同步屏障机制

原文链接 Android Sync Barrier机制 诡异的假死问题 前段时间&#xff0c;项目上遇到了一个假死问题&#xff0c;随机出现&#xff0c;无固定复现规律&#xff0c;大量频繁随机操作后&#xff0c;便会出现假死&#xff0c;整个应用无法操作&#xff0c;不会响应事件&#xff…

第一章初识微服务

文章目录 认识微服务单体架构分布式架构需要考虑的问题 微服务微服务的具体架构微服务技术对比企业中的技术需求 总结 服务拆分注意事项 认识微服务 随着互联网行业的发展&#xff0c;对服务的要求也越来越高&#xff0c;服务架构也从单体架构逐渐演变为现在流行的微服务架构。…

优化Docker权限管理:配置Docker用户组

Docker 利用 Linux 的用户和组权限来管理对 Docker 守护进程的访问权限。一般情况下&#xff0c;只有 root 用户和属于 docker 用户组的用户才被允许访问 Docker 守护进程。在 Linux 系统上使用 Docker 时&#xff0c;如果您尚未配置 docker 用户组&#xff0c;那么作为非 root…

Apache Linkis 与 OceanBase 集成:实现数据分析速度提升

导语&#xff1a;恭喜 OceanBase 生态全景图中又添一员&#xff0c;Apache Linkis 构建了一个计算中间件层&#xff0c;以促进上层应用程序和底层数据引擎之间的连接、治理和编排。 近日&#xff0c;计算中间件 Apache Linkis 在其新版本中通过数据源功能&#xff0c;支持用户通…

vue2 vue3 组件传值的方式

文章目录 组件间传值的方法总结什么是单向数据流父组件给子组件传值方式1: propsoptions API写法default默认值 composition API | defineProps编译宏props类型声明的默认值 | widthDefaults编译宏 方法2&#xff1a;组件身上的属性与事件vue2 $attrs $listenersvue3 useAttrs…