Flink(一)【WordCount 快速入门】

news2024/11/26 4:40:27

前言

        学完了 Hadoop、Spark,本想着先把 Kafka、Flume 这些工具先学完的,但想了想还是把核心的技术先学完最后再去把那些工具学学。

        最近心有点累哈哈哈,偷偷立个 flag,反正也没人看,明年的今天来这里还愿哈,愿望这种事情我是从来是不会说出来的,毕竟言以泄败,事以密成嘛。

那我隐晦低表达一下,摘录自《解忧杂货店》的一条句子:

        这是克朗对自己梦想的描述,其实他不是自不量力,而是假如放弃了这个梦想,他的生活就失去了光,他未来的几十年生活会枯燥无味,会活的没有一点激情。
        就像一个曾经自己深爱过的姑娘一样,明明无法在一起,却还是始终记挂着,因为心里眼里只有她,所以别人在你眼中,都会黯然失色的,没有色彩的东西,又怎么能投入激情去爱呢?

        我的愿望有两个,在上面中有所体现,但我希望结果不要是遗憾,第一个愿望明年这会大概知道结果了,第二个愿望应该会晚一点,也许在2025年的春天,也许会更早一点...

API 环境搭建

添加依赖

pom.xml

<properties>
 <flink.version>1.13.0</flink.version>
 <java.version>1.8</java.version>
 <scala.binary.version>2.12</scala.binary.version>
 <slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<!-- 引入 Flink 相关依赖-->
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-java</artifactId>
 <version>${flink.version}</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-clients_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>
<!-- 引入日志管理相关依赖-->
 <dependency>
 <groupId>org.slf4j</groupId>
 <artifactId>slf4j-api</artifactId>
 <version>${slf4j.version}</version>
 </dependency>
 <dependency>
 <groupId>org.slf4j</groupId>
 <artifactId>slf4j-log4j12</artifactId>
 <version>${slf4j.version}</version>
 </dependency>
 <dependency>
 <groupId>org.apache.logging.log4j</groupId>
 <artifactId>log4j-to-slf4j</artifactId>
 <version>2.14.0</version>
</dependency>
</dependencies>

log4j.properties 

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

 入门案例

0、数据准备

在 根目录下创建 words.txt

hello flink
hello java
hello spark
hello hadoop

1、批处理

批处理所用到的算子API 都继承自 DataSet,而新版的 Flink 已经做到了流批一体,这里只做演示,以后这类 API 应该是要被弃用了。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {

        // 1. 创建一个执行批式数据处理环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. 从文件中读取数据 String类型  批式数据处理环境得到的 DataSource 继承自 DataSet
        DataSource<String> lineDS = env.readTextFile("input/words.txt");

        // 3. 将每行数据转换成一个二元组类型
        // 输入类型: String 输出类型: Tuple2
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne =
                // String lines: 输入数据行  Collector<Tuple2<String,Long>> out: 输出类型
                lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));  //使用 Java 泛型的时候, 由于泛型擦除的存在, 需要显示信息返回返回值类型

        // 4. 根据 word 分组
        UnsortedGrouping<Tuple2<String, Long>> wordGroup = wordAndOne.groupBy(0);   // 0 是索引位置

        // 5. 分组内进行聚合
        AggregateOperator<Tuple2<String, Long>> res = wordGroup.sum(1); // 1 也是索引位置

        // 6. 打印结果
        res.print();

    }
}

运行结果:

(hadoop,1)
(flink,1)
(hello,4)
(java,1)
(spark,1)

Process finished with exit code 0

因为现在已经是流批一体的框架了,所以提交 Flink 批处理任务需要用下面的语句:

$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

2、流处理

2.1、有界数据流处理

这里我们用离线数据(提前创建好的文件)用流处理API DataStream 的算子来做处理。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BoundedStreamWordCount {

    public static void main(String[] args) throws Exception {
        // 1. 创建一个流式的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        // 2. 流式数据处理环境得到的 DataSource 继承自 DataStream
        DataStreamSource<String> lineDS = env.readTextFile("input/words.txt");

        // 3. flatMap 打散数据 返回元组
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. 根据 word 分组
        KeyedStream<Tuple2<String, Long>, String> wordGroupByKey = wordAndOne.keyBy(t -> t.f0);

        // 5. 根据键对索引为 1 处的值进行合并
        SingleOutputStreamOperator<Tuple2<String, Long>> res = wordGroupByKey.sum(1);

        // 6. 输出结果
        res.print();

        // 7. 执行
        env.execute();  // 这里我们的数据是有界的,但是真正开发环境是无界的,这里需要用execute方法等待新数据的到来
    }
}

运行结果:

3> (java,1)
13> (flink,1)
1> (spark,1)
5> (hello,1)
5> (hello,2)
5> (hello,3)
5> (hello,4)
15> (hadoop,1)

        我们可以发现,输出的单词的顺序是乱序的,因为集群模式下数据流不是在本地执行的,而是在多个节点中执行,所以也就无法保证先输入的单词最先输出。

        Idea下Flink API 会使用多线程来模拟集群下的多节点并行处理,而我们每行数据前面的 "编号>" 代表的就是线程的 id(对应 Flink 运行时占据的最小资源,也叫任务槽),默认使用当前电脑的所有 CPU 数。

        我们还可以发现,hello是同一个节点上处理的,这是因为我们在做分组的时候,把分组后的数据分到了同一个节点(子任务)上。

2.2、无界数据流处理

这里我们使用 netcat 来模拟产生数据流

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class UnBoundedStreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建一个流式的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        // 2. 流式数据处理环境得到的 DataSource 继承自 DataStream
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        Integer port = parameterTool.getInt("port");
        DataStreamSource<String> lineDS = env.socketTextStream(host,port);

        // 3. flatMap 打散数据 返回元组
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        
        // 4. 根据 word 分组
        KeyedStream<Tuple2<String, Long>, String> wordGroupByKey = wordAndOne.keyBy(t -> t.f0);

        // 5. 根据键对索引为 1 处的值进行合并
        SingleOutputStreamOperator<Tuple2<String, Long>> res = wordGroupByKey.sum(1);

        // 6. 输出结果
        res.print();

        // 7. 执行
        env.execute();  // 这里我们的数据是有界的,但是真正开发环境是无界的,这里需要用execute方法等待新数据的到来
    }
}

运行结果: 

        可以看到,处理是相当快的,毕竟数据量很小,但是会想到 SparkStreaming 的处理过程,我们之前用 SparkStreaming 的时候还需要设置 Reciver 的接收间隔,而我们的 Flink 则是真正的实时处理。

总结

        Flink 的学习终于开始了,还是一样的要求,不照搬视频课件内容,每行代码要有自己的思考,每行博客也要是自己思考的总结。

        还有,最近感觉愈发词穷,该多看书了,以后养成每次博客加一条书摘的习惯。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1177843.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C语言100~200中不能整除3的数

#define _CRT_SECURE_NO_WARNINGS 1#include <stdio.h> int main() {int n;for (n 100; n < 200; n){if (n%3 0){continue;}printf("%d\n",n);}}

RFC使用与WebService

RFC连接 CSDN RFC中引用类型组 http://t.csdnimg.cn/wQWAYhttp://t.csdnimg.cn/wQWAY 远程目标系统维护SM59 这里的类型指的是目标系统的系统类型(目标系统即rfc函数存在的系统). 类型2&#xff08;R/2连接&#xff09;&#xff0c;只需给出主机名&#xff0c;所有通信信息…

软件系统设计方法和工具介绍

软件系统设计方法和工具介绍 在构建系统时&#xff0c;尤其是一些大项目实施的过程中&#xff0c;可以接触和学习一些高阶层面分析问题和系统架构的方法论&#xff0c; 如麦肯锡的解决问题7步法&#xff1a;定义问题、分解问题、排定优先级、制定工作计划、分析问题、综合分析…

比SAM小60倍的分割一切模型:MobileSAM

1 MobileSAM SAM就是一类处理图像分割任务的通用模型。与以往只能处理某种特定类型图片的图像分割模型不同&#xff0c;SAM可以处理所有类型的图像。 在SAM出现前&#xff0c;基本上所有的图像分割模型都是专有模型。比如&#xff0c;在医学领域&#xff0c;有专门分割核磁图…

Python批量下载ERA5数据

1. ERA5数据简介 ERA5是第五代ECMWF大气再分析全球气候数据(ECMWF)&#xff0c;该数据集的第一部分现在可以公开使用(1979年到3个月内)。ERA5数据提供每小时的大气、陆地和海洋气候变量的估计值&#xff0c;地球数据精确到了30km网格&#xff0c;包括了137层的大气数据。 网址…

【教3妹学编程-算法题】最大单词长度乘积

3妹&#xff1a;哇&#xff0c;今天好冷啊&#xff0c; 不想上班。 2哥&#xff1a;今天气温比昨天低8度&#xff0c;3妹要空厚一点啊。 3妹 : 嗯&#xff0c; 赶紧把我的羽绒服找出来穿上&#xff01; 2哥&#xff1a;哈哈&#xff0c;那倒还不至于&#xff0c; 不过气温骤降&…

使用Anaconda安装TensorFlow环境以及没有搜到的报错的解决方法

1.在官网下载Anaconda 这一步几乎不会有人报错 下稳定的版本 或者最新的版本都可以 2.TensorFlow分两个版本 一个是用cpu跑 另一个是用gpu跑 显而易见 cpu的计算性能已经比不上现在主流的显卡了 所以有独显的电脑尽量安装gpu版本 CPU版本: 先给出cpu版本的安装方法: 打开A…

体坛巨星商业价值完美呈现,B体育等超巨品牌堪称经典案例

近几年&#xff0c;伴随着互联网的发展&#xff0c;我们惊喜的发现体坛巨星的商业代言越来越多&#xff0c;他们代言的广告已经融入到我们的生活之中&#xff0c;陪伴很多人度过了美妙的时刻。越来越多的品牌也意识到&#xff0c;比起娱乐明星&#xff0c;体坛巨星的全球属性对…

EMS员工管理系统 python

python基础练习&#xff0c;简单的增删改查&#xff0c;涉及python基础语法&#xff0c;逻辑、分支结构以及一些基础数据格式的操作&#xff0c;文件操作&#xff0c;思路理解等等 部分代码如下 print(""*20,"欢迎使用员工管理系统",""*20)# em…

浏览器自动播放音视频-前端实现方案

目录 前言 浏览器自动播放策略 策略详情&#xff1a; 实现方案 方案1&#xff1a; 互动后播放 方案2&#xff1a; 互动后出声 总结 前言 在开发中可能有遇到这样的需求&#xff0c;当用户打开页面后&#xff0c;需要自动播放视频或音频&#xff0c;按理说那就打开页面…

Vue2和Vue3生命周期映射关系及异同

生命周期映射关系表 beforeCreate -> 使用 setup() created -> 使用 use setup() beforeMount ->onBeforeMount mounted -> onMounted beforeUpdate -> onBeforeUpdate updated -> onUpdated beforeDestroy-> onBeforeUnmount destroyed ->onUnmounted…

800*A. Domino piling(规律数学)

Problem - 50A - Codeforces #include<bits/stdc.h> using namespace std; const int N1e55; int n,m,res; signed main(){scanf("%d%d",&n,&m);if(n>1){resn/2*m;n%2; }if(n1) resm/2;cout<<res;return 0; }

WPS表格无法粘贴信息,原因是复制区域与粘贴区域形状不同

WPS表格无法粘贴信息&#xff0c;原因是复制区域与粘贴区域形状不同 问题描述 我是选中了一整列&#xff0c;复制&#xff0c;但是无法粘贴到另一个EXCEL表格中 原因 首先我的数据量很大&#xff0c;有20万行&#xff0c;然后需要复制的EXCEL是.xls格式的&#xff0c;.xls格…

缓解缓存击穿的大杀器之---singleflight深入浅出

singleflight简单介绍 singlefight直译“单飞”&#xff0c;那顾名思义就是有一堆鸟&#xff0c;但是咱只让一只鸟单飞。。。&#x1f604; singleflight 提供了重复函数调用抑制机制&#xff0c;使用它可以避免同时进行相同的函数调用。第一个调用未完成时后续的重复调用会等…

20231106_抽象类abstract

抽象类abstract 关键字 abstract运用抽象类抽象方法:修饰抽象类中的某个方法,强制子类重写该方法 归纳 关键字 abstract 对于子类必须要实现特定方法,当时父类无法明确时,可定义为抽象类及抽象方法 不合理: 动物吃东西是基础,在这里写吃的方法过于简单,信息没有实际意义; 怎…

FSDiffReg:心脏图像的特征和分数扩散引导无监督形变图像配准

论文标题&#xff1a; FSDiffReg: Feature-wise and Score-wise Diffusion-guided Unsupervised Deformable Image Registration for Cardiac Images 翻译&#xff1a; FSDiffReg&#xff1a;心脏图像的特征和分数扩散引导无监督形变图像配准 摘要 无监督可变形图像配准是医学…

人工智能:技术进步与未来趋势

人工智能&#xff1a;技术进步与未来趋势 随着科技的快速发展&#xff0c;人工智能(AI)已经深入影响到我们生活的方方面面。从智能手机、自动驾驶汽车&#xff0c;到医疗诊断、工业自动化&#xff0c;AI的应用越来越广泛。这篇文章将探讨人工智能的技术发展、现状以及未来趋势。…

Webpack 中 Plugin 的作用是什么?常用 plugin 有哪些?

说说webpack中常见的Plugin&#xff1f;解决了什么问题&#xff1f;- 题目详情 - 前端面试题宝典 1、plugin 的作用 Plugin 是一种计算机应用程序&#xff0c;它和主应用程序互相交互&#xff0c;以提供特定的功能。 是一种遵循一定规范的应用程序接口编写出来的程序&#…

制作甘特图

教程秒懂百科​​​​​​

一文学会Scala【Scala一站式学习笔记】

文章目录 为什么要学习Scala语言什么是Scala如何快速掌握Scala语言Scala环境安装配置Scala命令行 Scala的基本使用变量数据类型操作符if 表达式语句终结符循环高级for循环 Scala的集合体系集合SetListMapArrayArrayBuffer数组常见操作Tuple总结 Scala中函数的使用函数的定义函数…