【入门Flink】- 02Flink经典案例-WordCount

news2025/1/16 17:54:58

WordCount

需求:统计一段文字中,每个单词出现的频次

添加依赖

	<properties>
        <flink.version>1.17.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

1.批处理

基本思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数。

1.1.数据准备

resources目录下新建一个 input 文件夹,并在下面创建文本文件words.txt

words.txt

hello flink
hello world
hello java

1.2.代码编写

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. 从文件读取数据 按行读取(存储的元素就是每行的文本)
        String filePath = Objects.requireNonNull(
               BatchWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();
        DataSource<String> lineDS = env.readTextFile(filePath);

        // 3. 转换数据格式
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(
                new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Long>> out) {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                });

        // 4. 按照 word 进行分组
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);

        // 5. 分组内聚合统计
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);

        // 6. 打印结果
        sum.print();
    }
}

打印结果如下:(结果正确)

image-20231031193224024

上述代码是基于 DataSet API 的,也就是对数据的处理转换,是看作数据集来进行操作的。

事实上 Flink 本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的 API 来实现。从Flink 1.12 开始,官方推荐的做法是直接使用 DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

2.流处理

DataStreamAPI可以直接处理批处理和流处理的所有场景

2.1读取文件

还是上述words.txt文件

代码实现:

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.读取文件
        String filePath = Objects.requireNonNull(
                StreamWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();
        DataStreamSource<String> lineStream = env.readTextFile(filePath);

        // 3. 转换、分组、求和,得到统计结果
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                }).keyBy(data -> data.f0)
                .sum(1);

        // 4. 打印
        sum.print();
        // 5. 执行
        env.execute();
    }
}

与批处理程序BatchWordCount有几点不同:

  • 创建执行环境的不同,流处理程序使用的是 StreamExecutionEnvironment
  • 转换处理之后,得到的数据对象类型不同。
  • 分组操做调用的是 keyBy 方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key。
  • 最后执行execute方法,开始执行任务。

2.2读取Socket文件流

实际生产中,真正的数据多是无界的,需要持续地捕获数据。为了模拟这种场景,可以监听 socket 端口,然后向该端口不断的发送数据。

  1. 简单改动,只需将StreamWordCount 代码中读取文件数据的 readTextFile 方法,替换成读取socket文本流的方法socketTextStream
public class StreamSocketWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.读取文件
        DataStreamSource<String> lineStream = env.socketTextStream("124.222.253.33", 7777);

        // 3. 转换、分组、求和,得到统计结果
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                }).keyBy(data -> data.f0)
                .sum(1);

        // 4. 打印
        sum.print();
        // 5. 执行
        env.execute();
    }
}
  1. 在 Linux 环境的主机 124.222.253.33 上,执行下列命令,发送数据进行测试
nc -lk 7777

注意:要先启动端口,后启动 StreamSocketWordCount 程序,否则会报超时连接异常。

  1. 从Linux发送数据

1、输入“hello flink”,输出如下内容

image-20231031201232801

2、再输入“hello world”,输出如下内容

image-20231031201316467

Flink 还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于 Java 中泛型擦除的存在,在某些特殊情况下(比如 Lambda 表达式中),自动提取的信息是不够精细的,对于 flatMap 里传入的 Lambda 表达式,系统只能推断出返回的是Tuple2类型,而无法得到 Tuple2<String, Long>。需要显式地告诉系统当前的返回类型,才能正确地解析出完整数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1166362.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

enum和Collection.stream()你这样用过么

最近在做一个数据图表展示的功能&#xff0c;显示订单近七天或者近半月的数量和金额。可以理解成下图所示的样子&#xff1a; 我是用枚举和集合的stream方法实现的数据初始化和组装&#xff0c;枚举用来动态初始化时间范围&#xff0c;集合的stream方法来将初始化的数据转换成…

《自制编程语言基于c语言》读书笔记

前言&#xff1a; 很久之前&#xff0c;我在双十一的时候入手了一本《自制编程语言基于c语言》。这本书是写《操作系统真象还原》的作者。我当时看他的关于操作系统的这本书&#xff0c;非常不错&#xff0c;就连着这本书一起入了。但是后面&#xff0c;因为各种事情&#xff…

龙芯浏览器是哪家公司开发的?支持信创吗?

最近看到不少小伙伴在问&#xff0c;龙芯浏览器是哪家公司开发的&#xff1f;支持信创吗&#xff1f;这里我们小编就跟大家一起来看看&#xff0c;仅供参考哈&#xff01; 龙芯浏览器是哪家公司开发的&#xff1f; 龙芯浏览器是由龙芯中科牵头&#xff0c;基于主流的渲染引擎G…

学习笔记|多独立样本秩和检验|克鲁斯卡尔-沃利斯检验|多个组间的多重比较|规范表达|《小白爱上SPSS》课程:SPSS第十四讲 | 多独立样本秩和检验如何做?

目录 学习目的软件版本原始文档多独立样本秩和检验一、实战案例读数据&#xff1a; 二、统计策略三、SPSS操作1、正态性检验2、多个独立样本的秩和检验3、多个组间的多重比较 四、结果解读第一&#xff0c;描述性统计结果。 第二 &#xff0c;给出的是不同训练年限各自的样本量…

了解数据库设计,轻轻松松提高工作效率

每个应用程序&#xff0c;无论大小&#xff0c;最终都需要一个数据库来持久保存所有重要数据。对此没有任何争论&#xff01; 什么是数据库设计&#xff1f; 数据库设计是帮助创建、实施和维护企业数据管理系统的一系列步骤的集合。设计数据库的主要目的是为所建议的数据库系统…

拓世法宝 | 数字经济崛起,美业如何抓住流量风口?

爱美之心&#xff0c;人皆有之。无论男女&#xff0c;都会很自然地对美好事物燃起兴致&#xff0c;跟高颜值相关的事物总能聚集注意力。例如直播平台里的美女网红收割流量赚得盆满钵满&#xff0c;面庞俊俏的年轻偶像吸引万千粉丝&#xff0c;还有“央视最美记者”王冰冰、“最…

R语言爬虫代码模版:技术原理与实践应用

目录 一、爬虫技术原理 二、R语言爬虫代码模板 三、实践应用与拓展 四、注意事项 总结 随着互联网的发展&#xff0c;网络爬虫已经成为获取网络数据的重要手段。R语言作为一门强大的数据分析工具&#xff0c;结合爬虫技术&#xff0c;可以让我们轻松地获取并分析网络数据。…

驱动大健康行业数字化转型升级,开利网络与艾博盾达成战略合作

在当今这个数字化飞速发展的时代&#xff0c;各行各业都在寻求数字化转型的创新突破口&#xff0c;以适应日新月异的变化。近日&#xff0c;开利网络科技与广东艾博盾举行战略合作签约仪式&#xff0c;期望基于开利网络在数字化营销领域多年沉淀的经验&#xff0c;为企业数字化…

【hcie-cloud】【1】华为云Stack解决方案介绍、华为文档获取方式 【上】

文章目录 华为文档获取方式前言云计算发展背景国家政策、社会发展驱动数字经济开启新时代深化数字化转型提升效率&#xff0c;国家数字主权云进入落地阶段从Cloud-Based到Cloud-Native&#xff0c;两种模式长期并存适合政企智能升级的云华为云Stack&#xff0c;政企智能升级首选…

Spring Security使用总结一,简单的引用Spring Security,坐着火箭就入门了

最近因为一些不能言语的原因&#xff0c;研究了一下Spring Security。因为感觉在使用上来说难度不大&#xff0c;所以把我研究的使用过程总结一下&#xff0c;放上来供大家学习研究思考进步的。我的思路大致就是&#xff0c;一个东西拿过来&#xff0c;先研究怎么使用&#xff…

数据库进阶教学——事务

目录 一、事务简介 二、事务操作方式 1、设置事务提交方式 1.1、命令 1.2、示例 2、开启事务 2.1、命令 2.2、示例 三、事务四大特性 四、并发事务问题 五、事务隔离级别 5.1、命令 5.2、示例 5.2.1、脏读 5.2.2、不可重复读 5.2.3、幻读 一、事务简介 事务是…

亚马逊、美客多卖家测评:如何建立养号团队实现运营化式测评?

大家好&#xff0c;我是跨境电商测评养号7年从事经验的珑哥。养号环境软件开发&#xff0c;深度解决各跨境平台矩阵养号防关联、砍单、F号问题。关注珑哥解决更多跨境养号测评问题。 测评&#xff0c;相信这个词对于大部分跨境卖家来说&#xff0c;想必都不陌生&#xff0c;因…

voronoi diagram(泰森多边形) 应用 - Good Manners

欢迎关注更多精彩 关注我&#xff0c;学习常用算法与数据结构&#xff0c;一题多解&#xff0c;降维打击。 voronoi 图求解点击前往 题目链接&#xff1a;https://vjudge.net/problem/URAL-1504 题目大意 有一个桌子&#xff0c;形状是圆形。 桌上放着很多蛋糕&#xff0c…

Python模块psutil:系统进程管理与Selenium效率提升的完美结合

前言 在前面编写一个Selenium的自动化程序时候&#xff0c;发现一个问题。 因笔记本配置较为差&#xff0c;所以每次初始化Selenium的WebDriver都会非常慢&#xff0c;整个等待过程是不友好的。 所以我就想到&#xff1a; 在程序中初始化一个全局的WebDriver对象&#xff0c…

Git Rebase 优化项目历史

在软件开发过程中&#xff0c;版本控制是必不可少的一环。Git作为当前最流行的版本控制系统&#xff0c;为开发者提供了强大的工具来管理和维护代码历史。git rebase是其中一个高级特性&#xff0c;它可以用来重新整理提交历史&#xff0c;使之更加清晰和线性。本文将详细介绍g…

1060 爱丁顿数

一.问题&#xff1a; 英国天文学家爱丁顿很喜欢骑车。据说他为了炫耀自己的骑车功力&#xff0c;还定义了一个“爱丁顿数” E &#xff0c;即满足有 E 天骑车超过 E 英里的最大整数 E。据说爱丁顿自己的 E 等于87。 现给定某人 N 天的骑车距离&#xff0c;请你算出对应的爱丁…

【计算机网络笔记】传输层——TCP特点与段结构

系列文章目录 什么是计算机网络&#xff1f; 什么是网络协议&#xff1f; 计算机网络的结构 数据交换之电路交换 数据交换之报文交换和分组交换 分组交换 vs 电路交换 计算机网络性能&#xff08;1&#xff09;——速率、带宽、延迟 计算机网络性能&#xff08;2&#xff09;…

Springboot JSP项目如何以war、jar方式运行

文章目录 一&#xff0c;序二&#xff0c;样例代码1&#xff0c;代码结构2&#xff0c;完整代码备份 三&#xff0c;准备工作1. pom.xml 引入组件2. application.yml 指定jsp配置 四&#xff0c;war方式运行1. 修改pom.xml文件2. mvn执行打包 五&#xff0c;jar方式运行1. 修改…

深入了解汽车级功率MOSFET NVMFS2D3P04M8LT1G P沟道数据表

汽车级功率MOSFET是一种专门用于汽车电子领域的功率MOSFET。它具有高电压、高电流、高温、高可靠性等特点&#xff0c;能够满足汽车电子领域对功率器件的严格要求。汽车级功率MOSFET广泛应用于汽车电机驱动、泵电机控制、车身控制等方面&#xff0c;能够提高汽车电子系统的效率…

sqlserver字符串拼接

本文主要介绍了sqlserver字符串拼接的实现&#xff0c;文中通过示例代码介绍的非常详细&#xff0c;对大家的学习或者工作具有一定的参考学习价值。 1. 概 在SQL语句中经常需要进行字符串拼接&#xff0c;以sqlserver&#xff0c;oracle&#xff0c;mysql三种数据库为例&#…