Flink day01

news2024/11/15 18:32:00

Flink简介
Flink起源于Stratosphere项目,Stratosphere是在2010~2014年由3所地处柏林的大学和欧洲的一些其他的大学共同进行的研究项目,2014年4月Stratosphere的代码被复制并捐赠给了Apache软件基金会,参加这个孵化项目的初始成员是Stratosphere系统的核心开发人员,2014年12月,Flink一跃成为Apache软件基金会的顶级项目。
在德语中,Flink一词表示快速和灵巧,项目采用一只松鼠的彩色图案作为logo,这不仅是因为松鼠具有快速和灵巧的特点,还因为柏林的松鼠有一种迷人的红棕色,而Flink的松鼠logo拥有可爱的尾巴,尾巴的颜色与Apache软件基金会的logo颜色相呼应,也就是说,这是一只Apache风格的松鼠。

在这里插入图片描述

Flink项目的理念是:“Apache Flink是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架”。
Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。

在这里插入图片描述
Flink的重要特点
事件驱动型(Event-driven)
事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。比较典型的就是以kafka为代表的消息队列几乎都是事件驱动型应用。
与之不同的就是SparkStreaming微批次,如图:
在这里插入图片描述
事件驱动型:
在这里插入图片描述
流与批的世界观
批处理的特点是有界、持久、大量,非常适合需要访问全套记录才能完成的计算工作,一般用于离线统计。
流处理的特点是无界、实时, 无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计。

在spark的世界观中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无限的小批次组成的。
而在flink的世界观中,一切都是由流组成的,离线数据是有界限的流,实时数据是一个没有界限的流,这就是所谓的有界流和无界流。

无界数据流:无界数据流有一个开始但是没有结束,它们不会在生成时终止并提供数据,必须连续处理无界流,也就是说必须在获取后立即处理event。对于无界数据流我们无法等待所有数据都到达,因为输入是无界的,并且在任何时间点都不会完成。处理无界数据通常要求以特定顺序(例如事件发生的顺序)获取event,以便能够推断结果完整性。
有界数据流:有界数据流有明确定义的开始和结束,可以在执行任何计算之前通过获取所有数据来处理有界流,处理有界流不需要有序获取,因为可以始终对有界数据集进行排序,有界流的处理也称为批处理。

在这里插入图片描述
分层api

在这里插入图片描述
最底层级的抽象仅仅提供了有状态流,它将通过过程函数(Process Function)被嵌入到DataStream API中。底层过程函数(Process Function) 与 DataStream API 相集成,使其可以对某些特定的操作进行底层的抽象,它允许用户可以自由地处理来自一个或多个数据流的事件,并使用一致的容错的状态。除此之外,用户可以注册事件时间并处理时间回调,从而使程序可以处理复杂的计算。
实际上,大多数应用并不需要上述的底层抽象,而是针对核心API(Core APIs) 进行编程,比如DataStream API(有界或无界流数据)以及DataSet API(有界数据集)。这些API为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations),连接(joins),聚合(aggregations),窗口操作(windows)等等。DataSet API 为有界数据集提供了额外的支持,例如循环与迭代。这些API处理的数据类型以类(classes)的形式由各自的编程语言所表示。
Table API 是以表为中心的声明式编程,其中表可能会动态变化(在表达流数据时)。Table API遵循(扩展的)关系模型:表有二维数据结构(schema)(类似于关系数据库中的表),同时API提供可比较的操作,例如select、project、join、group-by、aggregate等。Table API程序声明式地定义了什么逻辑操作应该执行,而不是准确地确定这些操作代码的看上去如何。
尽管Table API可以通过多种类型的用户自定义函数(UDF)进行扩展,其仍不如核心API更具表达能力,但是使用起来却更加简洁(代码量更少)。除此之外,Table API程序在执行之前会经过内置优化器进行优化。
你可以在表与 DataStream/DataSet 之间无缝切换,以允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。
Flink提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 类似,但是是以SQL查询表达式的形式表现程序。SQL抽象与Table API交互密切,同时SQL查询可以直接在Table API定义的表上执行。

目前Flink作为批处理还不是主流,不如Spark成熟,所以DataSet使用的并不是很多。Flink Table API和Flink SQL也并不完善,大多都由各大厂商自己定制。所以我们主要学习DataStream API的使用。实际上Flink作为最接近Google DataFlow模型的实现,是流批统一的观点,所以基本上使用DataStream就可以了。

Flink几大模块
Flink Table & SQL(还没开发完)
Flink Gelly(图计算)
Flink CEP(复杂事件处理)

搭建maven工程

批处理代码

package com.cn.wordcount

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    val inputPath="D:\\IdeaProjects\\flinkday01\\src\\main\\resources\\data.txt"
    val inputDS = env.readTextFile(inputPath)
//    inputDS.print()
    // 分词之后,对单词进行groupby分组,然后用sum进行聚合
    val result= inputDS.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
    result.print()

  }

}

pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flinkday01</artifactId>
    <version>1.0-SNAPSHOT</version>


    <dependencies>
        
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <!-- 该插件用于将Scala代码编译成class文件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- 声明绑定到maven的compile阶段 -->
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

结果

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
(and,1)
(are,1)
(scala,1)
(thank,1)
(you,3)
(fine,1)
(flink,1)
(world,1)
(hello,3)
(how,1)

Process finished with exit code 0

报错回顾

Cannot find compatible factory for specified execution.target (=local)

解决
修改1.10.0 的版本为1.10.1 成功

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.10.1</version>

实时流处理代码

package com.atguigu.wc

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved 
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.wc
  * Version: 1.0
  *
  * Created by wushengran on 2020/5/23 11:47
  */

// 流处理 word count,DataStream API
object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // 创建流处理执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//    env.setParallelism(8)
//    env.disableOperatorChaining()

    // 从命令参数中读取hostname和port
    val paramTool: ParameterTool = ParameterTool.fromArgs(args)
//    val hostname: String = paramTool.get("host")
//    val port: Int = paramTool.getInt("port")
    val hostname= "192.168.253.129"
    val port= 9999

    // 从socket文本流读取数据
//    val inputDataStream: DataStream[String] = env.socketTextStream("192.168.253.129", 9999)
    val inputDataStream: DataStream[String] = env.socketTextStream(hostname, port)

    // 对DataStream进行转换操作,得到word count结果
    val resultDataStream: DataStream[(String, Int)] = inputDataStream
      .flatMap(_.split(" ")).startNewChain()
      .map( (_, 1))
      .keyBy(_._1)
      .sum(1)

    // 打印输出
    resultDataStream.print().setParallelism(1)

    // 启动job
    env.execute("stream word count job")
  }
}

先启动Linux,输入netcat命令[nc -lk 9999] 9999是端口,然后启动程序,Linux输入内容即可

[root@hadoop01 ~]# nc -lk  9999
55 99 00

输出结果

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
(00,1)
(99,1)
(55,1)

Flink部署
Standalone模式
安装

安装
解压缩 flink-1.10.0-bin-scala_2.11.tgz,进入conf目录中。
1)修改 flink/conf/flink-conf.yaml 文件:

jobmanager.rpc.address: hadoop01

修改 /conf/slaves文件:

hadoop02
hadoop03

将flnk目录发给另外两台节点

[root@hadoop01 software]# scp -r  flink root@hadoop02:/home/software/
[root@hadoop01 software]# scp -r  flink root@hadoop03:/home/software/

启动

[root@hadoop01 bin]# pwd
/home/software/flink/bin
[root@hadoop01 bin]# ./start-cluster.sh 

Starting cluster.
Starting standalonesession daemon on host hadoop01.
Starting taskexecutor daemon on host hadoop02.
Starting taskexecutor daemon on host hadoop03.

查看进程

[root@hadoop01 bin]# jps
2710 StandaloneSessionClusterEntrypoint
2781 Jps

[root@hadoop02 software]# jps
1921 Jps
1842 TaskManagerRunner

[root@hadoop03 software]# jps
2049 TaskManagerRunner
2100 Jps

访问http://hadoop01:8081可以对flink集群和任务进行监控管理。

在这里插入图片描述

提交任务

  1. 准备数据文件
    发送data.txt 到Linux服务器
    发送jar包到Linux服务器
    在bin目录下编写启动命令

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/193118.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Flutter项目实战

1.Androidstudio 获取dart支持才会出现 下图&#xff0c;才可以单独运行 2.需要 flutter pub get 3.Android Studio 中使用 FlutterJsonBeanFactory 插件 注意点&#xff1a;需要保证该 Android Studio 窗口下是一个完整的Flutter项目&#xff08;窗口下有且仅有一个Flutter项目…

录音软件哪个好用?推荐3款亲测好用的录音软件

很多小伙伴不知道电脑如何录制声音&#xff1f;其实电脑录制声音很简单&#xff0c;借助一款好用录音软件&#xff0c;就可以轻松录制。那你知道录音软件哪个好用吗&#xff1f;小编平常的工作经常需要使用到录音软件。今天就给大家推荐3款亲测好用的录音软件&#xff0c;感兴趣…

k8s核心资源Service

1、介绍为一组pod&#xff08;一次部署&#xff09;统一暴露一个ip和端口&#xff0c;供外界访问。2、集群内对外统一暴露kubectl expose deploy <部署名> --port<对外暴露端口> --target-port<容器内部端口>kubectl expose deploy mynginx --port8000 --tar…

torch_geometric--Convolutional Layers

torch_geometric–Convolutional Layers 卷积层 MessagePassing 式中口口口表示可微的排列不变函数&#xff0c;例如: sum, mean, min, max 或者 mul 和 γΘ\gamma_{\Theta}γΘ​和ϕΘ\phi_{\Theta}ϕΘ​表示可微函数&#xff0c;例如 MLPs. 请参见这里的附带教程。 参数…

JDBC-BasicDAO

引入 之前的sql语句是固定的只能通过参数传入&#xff0c;要写那么多方法肯定不好弄 这就引出了DAO 这里是整个流程示意图 最下面开始说 就是mysql库的每一张表对应一个JavaBean 右边是我们获取和关闭连接的工具类 上面XXXDAO的意思是 每一个表&#xff0c;都有一个与之对应的D…

高压放大器在镓基液态金属微型马达驱动实验研究中的应用

实验名称&#xff1a;高压放大器在镓基液态金属微型马达驱动实验研究中的应用 研究方向&#xff1a;新型材料 测试目的&#xff1a; 微/纳马达虽然是一种以实际应用为基础的动力装置&#xff0c;但其在科学研究方面的价值也尤为重要。在微/纳米尺度下&#xff0c;它可以接受能量…

【测试如何学代码?】学习代码的最佳实践经验分享

为什么要写这篇&#xff1f; 经常在群里看到大家问&#xff1a;该选择哪门语言&#xff1f;哪门语言有钱途&#xff1f; 其实&#xff0c;不管哪门语言&#xff0c;只要深入学好了都不会差&#xff0c;当然&#xff0c;我们选择语言最好还是要和自己的技术方向及职业发展相匹配…

基于php的高校社团信息管理系统

摘 要社团是由高校用户依据兴趣爱好自愿组成&#xff0c;按照章程自主开展活动的用户组织。高校社团是实施素质教育的重要途径和有效方式&#xff0c;在加强校园文化建设、提高用户综合素质、引导用户适应社会、促进用户交流等方面发挥着重要作用&#xff0c;是新形势下有效凝聚…

Android studio:Could not find method compile() for arguments 问题解决及两种解决方法探讨延伸

Could not find method compile() for arguments 问题全称 Could not find method compile() for arguments [org.tensorflow:tensorflow-lite:] on object of type org.gradle.api.internal.artifacts.dsl.dependencies.DefaultDependencyHandler. 如图 解决方法1(简单) …

[数据结构基础]排序算法第四弹 -- 归并排序和计数排序

目录 一. 归并排序 1.1 归并排序的实现思想 1.2 归并排序的递归实现 1.2.1 归并排序递归实现的思想 1.2.2 归并排序递归实现的代码 1.3 归并排序的非递归实现 1.3.1 归并排序非递归实现的思想 1.3.2 归并排序非递归实现的代码 1.4 归并排序的时间复杂度分析 二. 计数排…

c++之模板【进阶版】

前言 对于泛型编程&#xff0c;学好模板这节内容是非常有必要的。在前面学习的STL中&#xff0c;由于模板的可重性和扩展性&#xff0c;几乎所有的代码都采用了模板类和模板函数的方式&#xff0c;这相比于传统的由函数和类组成的库来说提供了更好的代码重用机会。 模板初阶 …

Hugging face教程-使用速查表-快速入门

Hugging face笔记 course url&#xff1a;https://huggingface.co/course/chapter5/8?fwpt 函数详细情况&#xff1a;https://huggingface.co/docs/transformers/main_classes/pipelines#transformers.TokenClassificationPipeline 基础掌握transformers和datasets&#xf…

软件测试 利器 | AppCrawler 自动遍历测试工具实践(一)

本文为霍格沃兹测试学院学院学员课程学习笔记&#xff0c;系统学习交流文末加群。 AppCrawler 是由霍格沃兹测试学院校长思寒开源的一个项目,通过名字我们大概也能猜出个方向&#xff0c;Crawler 是爬虫的意思&#xff0c;App 的爬虫&#xff0c;遍历 App &#xff1a; 官方 G…

linux性能优化-中断

一、概念 中断其实是一种异步的事件处理机制&#xff0c;可以提高系统的并发处理能力。Linux将中断处理过程分成了两个阶段&#xff1a;上半部和下半部 &#xff08;1&#xff09;上半部用来快速处理中断&#xff0c;它在中断禁止模式下运行&#xff0c;主要处理跟硬件紧密相关…

云计算是什么

&#x1f4d2;博客主页&#xff1a; 微笑的段嘉许博客主页 &#x1f389;欢迎关注&#x1f50e;点赞&#x1f44d;收藏⭐留言&#x1f4dd; &#x1f4cc;本文由微笑的段嘉许原创&#xff01; &#x1f4c6;51CTO首发时间&#xff1a;&#x1f334;2023年2月1日&#x1f334; ✉…

gcc 简介

一、gcc简介gcc与g&#xff0c;当程序中出现using namespace std等带有c特性的语句时&#xff0c;如果用gcc编译时&#xff0c;必须显式地指明这个程序要用c编译库编译&#xff0c;而g可以直接编译。二、gcc支持的文件.c&#xff0c;c语言的源程序.C, c的源程序.cc&#xff0c;…

数据结构——堆的介绍以及应用

前言&#xff1a;对于数据结构而言&#xff0c;大多存在着对应的物理结构和逻辑结构&#xff0c;而我们一开始介绍的顺序表&#xff0c;链表&#xff0c;栈&#xff0c;队列等的物理结构和逻辑结构还是比较类似的。今天要介绍的堆则有所不同&#xff0c;其物理结构是数组&#…

JS前端基于canvas给图片添加水印,并下载带有水印的图片

基于canvas给图片添加水印实现效果图图片添加水印的步骤1.获取图片路径&#xff0c;将图片转换为canvas2.canvas画布上绘制文字水印3.水印绘制完成后&#xff0c;将canvas转换为图片格式4.水印绘制完成后&#xff0c;将canvas下载为图片完整代码总结1、在utils.js 封装添加水印…

POE交换机全方位解读(中)

POE供电距离到底怎么算 只针对符合IEEE802.3af/at 标准PoE设备 ① 网线对供电距离的影响 首先我们先来看下表IEEE802.af和IEEE802.3at标准中对Cat5e网线要求&#xff1a; 说明&#xff1a;Type 1 value和Type 2 value 分别指IEEE802.3af和IEEE802.3at的要求。 从表中可以看出&a…

PCB电路板单面板和双面板的区别和共同点

PCB电路板可以分为单面板、双面板和多面板&#xff0c;我们常用的主要是单面板和双面板&#xff0c;那么单面板和双面板有哪些区别呢&#xff1f;在了解二者区别前&#xff0c;沐渥小编先给大家介绍一下什么是单面板和双面板。 单面板是指单面的线路板&#xff0c;元器件在一面…