使用Flink的各种技术实现WordCount逻辑

news2024/9/28 9:24:44

使用Flink的各种技术实现WordCount逻辑

在大数据程序中,WordCount程序实现了统计词频的作用,这个WordCount程序也往往在大数据分析处理中一直占着非常重要的地位。统计一天内某网站的访问次数,需要对网站排序后求其词频,统计一段时间内某个用户的登陆次数,也是对网站用户分组后的词频计算.....等等,很多的大数据应用示例都是在WordCount的基础之上进行改良发展,最终实现大数据分析的关键逻辑。

对于wordcount程序来说,基本思想在于输入文件中有每行的英文单词组成的句子,通过行处理的思想,将每个句子的英文单词分割出来,以(单词,1)这种key-value的形式来计数,再通过分组排序,将相同的单词放在一组,最后对每组的单词进行汇总统计。其思想流程图如下图所示。

对于wordcount程序的处理框架,Flink提供了不同级别的编程抽象,通过调用抽象的数据集调用算子构建DataFlow就可以实现对分布式的数据进行流式计算和离线计算,DataSet是批处理的抽象数据集,DataStream是流式计算的抽象数据集,他们的方法都分别为Source、Transformation、Sink

Source主要负责数据的读取
Transformation主要负责对数据的转换操作
Sink负责最终计算好的结果数据输出。

Source读取词频文件的数据,通过Transformation的转换操作将文件中的英文句子切分单词,并分组统计,最终可以Sink到控制台中输出结果。flink的思想结构如下图。

根据这种思想结构,结合不同的Transformation转换函数,常用算子如:

map算子对一个DataStream中的每个元素使用用户自定义的map函数进行处理,每个输入元素对应一个输出元素,最终整个数据流被转换成一个新的DataStream。输入每一个句子,就可以通过map后的函数split空格,然后返回形如(单词,1)的新数据流DataStream。

FlatMap只要处理处理一个输入元素,通过后面的函数可以实现输出一个或者多个输出元素的时候,尤其表现在输出一个元素,如wordcount中输出形如(单词,1)的这种元组类型的元素。就可以用到flatMap()。

reduce方法可以对分组后的元素进行统计处理。

当然, wordcount 也可以结合到不同的情况中。如滑动窗口内的wordcount,就需要结合SlidingWindow。

下面就结合不同情况下使用Flink实现wordCount的词频计算。

一、第一种情况:读取words.txt文件通过flink进行流式分析。

这里需要用到DataStream的Source源DataStreamSource.步骤如下。

1、打开Intellij IDEA,然后点击FIle-->New-->project。

2、打开Project对话框后,左边点击maven,右边不需要点击,只要确认jdk的版本,然后Next进入下一步。

3、在弹出的对话框中,输入groupId和artifactId,然后继续点击Next进入下一步。

4、最后Finish结束配置。

5、在pom.xml中设置dependency的依赖包。

6、建立flink的streaming流式wordcount程序。

注意在敲程序时,设定lambda表达式,方式如下

File -----> Project Structure...

在弹出的对话框中,左边选择Modules,右边选择Lambda表达式8,如下图所示。

最终代码如下。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.api.common.typeinfo.Types;
public class MyWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lineds=env.readTextFile("f://data//words.txt");
        SingleOutputStreamOperator<Tuple2<String,Long>> wordstream=lineds.flatMap
                ((String line, Collector<Tuple2<String,Long>> out)->{
            String[] words=line.split(" ");
            for(String word:words){
                out.collect(Tuple2.of(word,1L));
            }
        }).returns(Types.TUPLE(Types.STRING,Types.LONG));
        KeyedStream<Tuple2<String,Long>,String> groupds=wordstream.keyBy(ds->ds.f0);
        groupds.sum(1).print();
        env.execute();
    }
}

程序中读取硬盘中的words.txt文件,运行后的输出结果如下图。

前面就是线程号,后面进行wordcount词频的统计。

二、第二种情况,通过flink对words.txt完成批处理的分析。

这里需要使用Data的数据源Source。其它步骤与前面的DataStreamSource数据源的项目创建步骤一致,这里直接上代码。

程序如下。

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class YouWordCount {
    public static void main(String[] args) throws Exception{
        ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> lineds=env.readTextFile("f://data//words.txt");
        FlatMapOperator<String,Tuple2<String,Long>> wordds=lineds.flatMap((String line,Collector<Tuple2<String,Long>> out)->{
            String[] words=line.split(" ");
            for(String word:words){
                out.collect(Tuple2.of(word,1L));
            }
        });
        UnsortedGrouping<Tuple2<String,Long>> groupds=wordds.groupBy(0);
        AggregateOperator<Tuple2<String,Long>> result=groupds.sum(1);
        result.print();
    }
}

三、第三种情况:Flink收集流数据的word来计算词频

Wordcount不但可以收集硬盘上的文件,还可以收集linux中的实时流。使用linux系统的 nc 指令提供实时流,命令如下:

nc -l 9000

后面的9000是流传输的端口号,用flink来监控linux中的实时流。

代码如下。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class HeWordCount {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lineds=env.socketTextStream("192.168.110.156",9000);
        lineds.flatMap((String line,Collector<Tuple2<String,Long>> out)->{
            String[] words=line.split(" ");
            for(String word:words){
                out.collect(Tuple2.of(word,1L));
            }
        }).keyBy(ds->ds.f0).sum(1).print();
        env.execute();
    }
}

四、Flink对实时流的Wordcount使用滚动窗口

这里使用Flink程序对实时流可以使用滚动窗口计算5秒内的英文词频。

谈到滚动窗口,不免需要说到Flink中的窗口。

Flink按照时间生成Window,可以根据窗口实现原理的不同分成三类。

1、滚动窗口(Tumbling Window)

滚动窗口是将数据依据固定的窗口⻓度对数据进行切片。

滚动窗口的特点是:时间对⻬、窗口⻓度固定,并且没有重叠。

滚动窗口中有分配器,分配器会将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。

其原理图如下。

2、滑动窗口(Sliding Window)

滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口⻓度和滑动间隔组成。

滑动窗口的特点是:时间对⻬、窗口⻓度固定,并且有重叠。

滑动窗口中也有分配器,这个分配器将元素分配到固定⻓度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率,因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。

其原理图如下图。

3、会话窗口(Session Window)

会话窗口是由一系列事件组合一个指定时间⻓度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。

会话窗口的特点是时间无对⻬。

会话也叫session,会话窗口中也有分配器,session窗口分配器通过session活动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。

一个session窗口通过一个session间隔来配置,这个session间隔定义了非活跃周期的⻓度。

当这个非活跃周期产生,那么当前的session将关闭并且后续的元素将被分配到新的session窗口中去。

其原理图如下所示。

图中看出,时间是不对齐的。

这个案例中使用时间对齐不重叠的滚动窗口。

代码如下。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class HeWordCount {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lineds=env.socketTextStream("192.168.110.156",9000);
        lineds.flatMap((String line,Collector<Tuple2<String,Long>> out)->{
            String[] words=line.split(" ");
            for(String word:words){
                out.collect(Tuple2.of(word,1L));
            }
        }).returns(Types.TUPLE(Types.STRING,Types.LONG))
                .keyBy(ds->ds.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1)
                .print();
        env.execute();
    }
}

启动linux虚拟机,在进行nc -l 9000,wordcount程序会统计实时的计算词频的数据流。

在linux中启动nc命令后输入内容如下图所示。

 

运行程序后的结果如下图所示。

 

五、使用flink SQL来实现5秒内的wordcount词频。

flink sql相当于使用了sql语句来实现5秒内的wordcount词频统计。

使用flink SQL需要使用flink-sql的依赖。

在程序实现上,使用flink-SQL实现5秒内的wordcount词频代码如下 。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.table.api.Table;
import static org.apache.flink.table.api.Expressions.$;
import org.apache.flink.api.common.typeinfo.Types;
public class SheWordCount {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings=EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv=StreamTableEnvironment.create(env,settings);
        DataStreamSource<String> lineds=env.readTextFile("f://data//words.txt");
        SingleOutputStreamOperator<Tuple2<String,Long>> wordStream=lineds.flatMap
                ((String line,Collector<Tuple2<String,Long>> out)->{
                    String[] words=line.split(" ");
                    for(String word:words){
                        out.collect(Tuple2.of(word,1L));
                    }
                }).returns(Types.TUPLE(Types.STRING,Types.LONG));
        Table table=tableEnv.fromDataStream(wordStream,$("word"),$("count"));
        Table result=table.groupBy($("word"))
                .select($("word"),$("count").sum());
        tableEnv.toRetractStream(result,Types.TUPLE(Types.STRING,Types.LONG)).print();
        env.execute();
    }
}

至此,使用Flink在各种情况下计算wordcount词频统计基本介绍完毕,

github地址:

https://github.com/wawacode/flink_wordcount

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/55870.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【智能电网随机调度】智能电网的双层模型时间尺度随机优化调度(Matlab代码实现)

目录 1 概述 2 数学模型 3 运行结果 4 结论 5 参考文献 6 Matlab代码实现 1 概述 随着可再生能源发电量的增加&#xff0c;配电网的能源管理正成为一项计算上具有挑战性的任务。来自光伏&#xff08;PV&#xff09;装置的太阳能可以在一分钟内发生显着变化。可以命令光伏…

MongoDB安装Mac M1

1、下载安装包&#xff1a; axInstall MongoDB Community Edition on macOS — MongoDB Manualhttps://www.mongodb.com/docs/v6.0/tutorial/install-mongodb-on-os-x/下载解压&#xff0c;重命名为mongodb 放到 /usr/local 目录下 2、配置文件打开配置文件 open -e .bash_p…

java计算机毕业设计ssm某大学校园竞赛管理系统07494(附源码、数据库)

java计算机毕业设计ssm某大学校园竞赛管理系统07494&#xff08;附源码、数据库&#xff09; 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&…

阿里云企业用户12月活动:c7/g7/r7系列产品报价出炉

阿里云企业用户12月优惠活动开始了&#xff0c;阿里云超企业用户热销产品&#xff0c;c7/g7/r7系列&#xff0c;4折起&#xff0c;性能优异&#xff0c;助您数智化转型。通过芯片快速路径加速&#xff0c;实现存储、网络性能以及计算稳定性的大幅提升。下方卡片显示价格为企业新…

一文读懂 HTTP/2 特性

HTTP/2 是 HTTP 协议自 1999 年 HTTP 1.1 发布后的首个更新&#xff0c;主要基于 SPDY 协议。由互联网工程任务组&#xff08;IETF&#xff09;的 Hypertext Transfer Protocol Bis&#xff08;httpbis&#xff09;工作小组进行开发。该组织于2014年12月将HTTP/2标准提议递交至…

Redis专题(六):Redis主从复制、哨兵搭建以及原理

安装redis&#xff1a;Rdis专题目录 搭建步骤 复制 redis.conf文件 > redis-6380.conf 。将上述复制的文件修改如下配置&#xff1a; a. 端口号&#xff1a;port 6380 b. pidfile /var/run/redis_6380.pid #吧pid进程号写入pidfile配置的文件 c. logfile “6380.log” d. d…

JUC系列(七) ForkJion任务拆分与异步回调

&#x1f4e3; &#x1f4e3; &#x1f4e3; &#x1f4e2;&#x1f4e2;&#x1f4e2; ☀️☀️你好啊&#xff01;小伙伴&#xff0c;我是小冷。是一个兴趣驱动自学练习两年半的的Java工程师。 &#x1f4d2; 一位十分喜欢将知识分享出来的Java博主⭐️⭐️⭐️&#xff0c;擅…

【OPENCV_系列电子PDF图书连载】计算机视觉从入门到精通完整学习路线专栏

OPENCV_PDF图书连载之— 图像的几何变换 一、图像几何变换_a:图像坐标仿射 仿射自定义代码展示&#xff1a; warpAffine.pointsAffine【自定义包】 from img_pakage.ocv import warpAffineimg_path f../img/three_angle.png warpAffine.pointsAffine(img_path,0,0,24,217,…

python数据容器——元组、字符串

目录 一.思考 二.元组 元组定义 元组不可修改 注意事项 三.元组的操作 1.嵌套使用 2. .index方法 3. .count方法 4. len&#xff08;元组&#xff09;方法 四.字符串 1.字符串的下标 2.字符串的常用操作 字符串.index(字符串) 查找特定字符串的下标索引值 ​编辑…

1.1 大数据简介-hadoop-最全最完整的保姆级的java大数据学习资料

文章目录1 hadoop-最全最完整的保姆级的java大数据学习资料1.1 大数据简介1.1.1 大数据的定义1.1.2 大数据的特点1.1.3 大数据的应用场景1.1.4 大数据的发展趋势及职业路线1.4.4.1 大数据发展趋势1.4.4.2 大数据职业发展路线1 hadoop-最全最完整的保姆级的java大数据学习资料 大…

同事老是吐槽我的接口性能差,原来真凶就在这里!

V-xin&#xff1a;ruyuanhadeng获得600页原创精品文章汇总PDF 一、前情回顾 上篇文章&#xff1a;《为什么每个程序员都必须坚持写博客&#xff1f;这篇文章教你怎么写&#xff01;》聊了一下系统架构中&#xff0c;百亿流量级别高并发写入场景下&#xff0c;如何承载这种高并…

Kotlin高仿微信-第37篇-拍照

Kotlin高仿微信-项目实践58篇详细讲解了各个功能点&#xff0c;包括&#xff1a;注册、登录、主页、单聊(文本、表情、语音、图片、小视频、视频通话、语音通话、红包、转账)、群聊、个人信息、朋友圈、支付服务、扫一扫、搜索好友、添加好友、开通VIP等众多功能。 Kotlin高仿…

ZPL II 语言编程基础

ZPL II 语言概述 ZPL语言是一种script语言&#xff0c;分为ZPL语言和ZPL II 语言Zebra打印机支持最广泛的一种语言 ZPL II语言支持复杂标签格式&#xff0c;如文字&#xff0c;图片&#xff0c;条形码&#xff0c;序列号打印等等 ZPL II文件可以通过以下两种方式实现 纯文本编…

java字符编码总结

一、字符集(Charcater Set)与字符编码(Encoding) 字符集(Charcater Set 或 Charset)&#xff1a;是一个系统支持的所有抽象字符的集合&#xff0c;也就是一系列字符的集合。字符是各种文字和符号的总称&#xff0c;包括各国家文字、标点符号、图形符号、数字等。常见的字符集有…

记录--从AI到美颜全流程讲解

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 美颜和短视频 美颜相关APP可以说是现在手机上的必备的软件&#xff0c;例如抖音&#xff0c;快手&#xff0c;拍出的“照骗”和视频不加美颜效果&#xff0c;估计没有人敢传到网上。很多人一直好奇美颜…

力扣hot100——第3天:11盛最多水的容器、15三数之和、17电话号码的字母组合

文章目录1.11盛最多水的容器1.1.题目1.2.解答1.2.1.题解1.2.2.自己对参考题解的进一步解释2.15三数之和【代码随想录已刷】3.17电话号码的字母组合【代码随想录已刷】1.11盛最多水的容器 参考&#xff1a;力扣题目链接&#xff1b;题解 1.1.题目 1.2.解答 1.2.1.题解 这道题…

Mybatis-多表联查

多表联查一、步骤一:创建pojo实体类二、步骤二&#xff1a;明确两个实体类之间的关系三、步骤三:修改pojo实体类四、步骤四&#xff1a;编写Mapper接口五、步骤五&#xff1a;编写Mapper映射文件题目1&#xff1a;通过订单id查询订单详情以及所属用户题目2&#xff1a;通过用户…

OpenCV入门(C++/Python)- 使用OpenCV读取、显示和写入图像(一)

使用OpenCV读取、显示和写入图像1.imread()读取图像imread()函数2.imshow()在窗口中显示图像waitKey()destoryAllWindows()3.imwrite()将图像写入文件目录读取、显示和写入图像是图像处理和计算机视觉的基础。即使裁剪、调整大小、旋转或应用不同的过滤器来处理图像&#xff0c…

C. Carrying Conundrum(思维 + 奇偶数位)

Problem - 1567C - Codeforces 爱丽丝刚刚学会了加法。但是&#xff0c;她还没有完全学会 "携带 "的概念--她不是携带到下一列&#xff0c;而是携带到左边两列的列。 例如&#xff0c;评估20392976这个和的常规方法是如图所示。 然而&#xff0c;爱丽丝是按照图中的…

【在SpringBoot项目中使用Validation框架检查数据格式-常用的检查注解】

常用的检查注解 使用Validation框架检查数据格式时&#xff0c;常用的检查注解有&#xff1a; NotNull&#xff1a;不允许为null值 可用于任何类型的参数NotEmpty&#xff1a;不允许为空字符串&#xff0c;即长度为0的字符串 仅用于检查字符串类型的参数NotBlank&#xff1a;不…