Flink WordCount实践

news2024/11/28 12:43:00

目录

前提条件

基本准备

批处理API实现WordCount

流处理API实现WordCount

数据源是文件

数据源是socket文本流

打包

提交到集群运行

命令行提交作业

Web UI提交作业

上传代码到gitee


前提条件

Windows安装好jdk8、Maven3、IDEA

Linux安装好Flink集群,可参考:CentOS7安装flink1.17完全分布式
 

基本准备

创建项目

使用IDEA创建一个新的Maven项目,项目名称,例如:flinkdemo

添加依赖

在项目的pom.xml文件中添加Flink的依赖。

	<properties>
        <flink.version>1.17.1</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

刷新依赖

刷新依赖后,能看到相关依赖如下

刷新依赖过程需要等待一些时间来下载相关依赖。

如果依赖下载慢,可以设置阿里云仓库镜像:

 1.设置maven的settings.xml

</mirrors>上面一行添加阿里云仓库镜像

	<mirror>
      <id>alimaven</id>
      <name>aliyun maven</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
      <mirrorOf>central</mirrorOf>        
    </mirror>

2.IDEA设置maven

数据准备

在工程的根目录下,新建一个data文件夹

并在data文件夹下创建文本文件words.txt

内容如下

hello world
hello java
hello flink

新建包

右键src/main下的java,新建Package

填写包名org.example,包名与groupId的内容一致。

批处理API实现WordCount

org.exmaple下新建wc包及BatchWordCount

填写wc.BatchWordCount

效果如下

BatchWordCount.java代码如下:

package org.example.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. 从文件读取数据 按行读取
        DataSource<String> lineDS = env.readTextFile("data/words.txt");

        // 3. 转换数据格式
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {

            @Override
            public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {

                String[] words = line.split(" ");

                for (String word : words) {
                    out.collect(Tuple2.of(word,1L));
                }
            }
        });

        // 4. 按照 word 进行分组
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);

        // 5. 分组内聚合统计
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);

        // 6. 打印结果
        sum.print();
    }
}

运行程序,查看结果

注意,以上代码的实现方式是基于DataSet API的,是批处理API。而Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。从Flink 1.12开始,官方推荐直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

$ flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

流处理API实现WordCount

数据源是文件

org.example.wc包下新建Java类StreamWordCount,代码如下:

package org.example.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 读取文件
        DataStreamSource<String> lineStream = env.readTextFile("input/words.txt");

        // 3. 转换、分组、求和,得到统计结果
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {

                        String[] words = line.split(" ");

                        for (String word : words) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                }).keyBy(data -> data.f0)
                .sum(1);

        // 4. 打印
        sum.print();

        // 5. 执行
        env.execute();
    }
}

运行结果

与批处理程序BatchWordCount的区别:

  • 创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment。

  • 转换处理之后,得到的数据对象类型不同。

  • 分组操作调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。

  • 代码末尾需要调用env的execute方法,开始执行任务。

数据源是socket文本流

流处理的输入数据通常是流数据,将StreamWordCount代码中读取文件数据的readTextFile方法,替换成读取socket文本流的方法socketTextStream。

org.example.wc包下新建Java类SocketStreamWordCount,代码如下:

package org.example.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SocketStreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 读取文本流:node2表示发送端主机名(根据实际情况修改)、7777表示端口号
        DataStreamSource<String> lineStream = env.socketTextStream("node2", 7777);

        // 3. 转换、分组、求和,得到统计结果
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    String[] words = line.split(" ");

                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                }).returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(data -> data.f0)
                .sum(1);

        // 4. 打印
        sum.print();

        // 5. 执行
        env.execute();
    }
}

进入node2终端,如果没有nc命令,需要先安装nc命令,安装nc命令如下:

[hadoop@node2 ~]$ sudo yum install nc -y

开启nc监听

[hadoop@node2 ~]$ nc -lk 7777

IDEA中,运行SocketStreamWordCount程序。

往7777端口发送数据,例如发送hello world

控制台输出

继续往7777端口发送数据,例如发送hello flink

控制台输出

停止SocketStreamWordCount程序。

按Ctrl+c停止nc命令。

打包

这里的打包是将写好的程序打成jar包。

点击IDEA右侧的Maven,按住Ctrl键同时选中clean和package(第一次打包可以只选中package),点击执行打包。

打包成功后,看到如下输出信息,生成的jar包在项目的target目录下

提交到集群运行

把jar包提交到flink集群运行有两种方式:

1.通过命令行提交作业   

2.通过Web UI提交作业

命令行提交作业

将jar包上传Linux

启动flink集群
[hadoop@node2 ~]$ start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host node2.
Starting taskexecutor daemon on host node2.
Starting taskexecutor daemon on host node3.
Starting taskexecutor daemon on host node4.
​
开启nc监听
[hadoop@node2 ~]$ nc -lk 7777
​
命令提交作业

开启另一个node2终端,使用flink run命令提交作业到flink集群

[hadoop@node2 ~]$ flink run -m node2:8081 -c org.example.wc.SocketStreamWordCount flinkdemo-1.0-SNAPSHOT.jar

-m指定提交到的JobManager,-c指定程序入口类。

发送测试数据

在nc监听终端,往7777端口发送数据

查看结果
Web UI查看结果

浏览器访问

node2:8081

看到正在运行的作业如下

查看结果

继续发送测试数据

在nc终端继续发送数据

Web UI刷新结果

命令行查看结果

打开新的node2终端,查看结果

[hadoop@node2 ~]$ cd $FLINK_HOME/log
[hadoop@node2 log]$ ls
flink-hadoop-client-node2.log                 flink-hadoop-standalonesession-0-node2.out
flink-hadoop-standalonesession-0-node2.log    flink-hadoop-taskexecutor-0-node2.log
flink-hadoop-standalonesession-0-node2.log.1  flink-hadoop-taskexecutor-0-node2.log.1
flink-hadoop-standalonesession-0-node2.log.2  flink-hadoop-taskexecutor-0-node2.log.2
flink-hadoop-standalonesession-0-node2.log.3  flink-hadoop-taskexecutor-0-node2.log.3
flink-hadoop-standalonesession-0-node2.log.4  flink-hadoop-taskexecutor-0-node2.log.4
flink-hadoop-standalonesession-0-node2.log.5  flink-hadoop-taskexecutor-0-node2.out
[hadoop@node2 log]$ cat flink-hadoop-taskexecutor-0-node2.out 
(hello,1)
(flink,1)
(hello,2)
(world,1)
​

取消flink作业

点击Cancel Job取消作业 

停止nc监听

按Ctrl+c停止nc命令

Web UI提交作业

开启nc监听

开启nc监听发送数据

[hadoop@node2 ~]$ nc -lk 7777

Web UI提交作业

浏览器访问

node2:8081

点击Submit New Job

点击Add New

选择flink作业jar包所在路径

点击jar包名称

填写相关内容,点击Submit提交作业

Entry Class填写运行的主类,例如:org.example.wc.SocketStreamWordCount

Parallesim填写作业的并行度,例如:1

提交后,在Running Jobs里看到运行的作业

发送测试数据

往7777端口发送数据

查看结果

继续发送测试数据

刷新结果

取消作业

停止nc监听

按住Ctrl+c停止nc命令

关闭flink集群
[hadoop@node2 ~]$ stop-cluster.sh 
Stopping taskexecutor daemon (pid: 2283) on host node2.
Stopping taskexecutor daemon (pid: 1827) on host node3.
Stopping taskexecutor daemon (pid: 1829) on host node4.
Stopping standalonesession daemon (pid: 1929) on host node2.

上传代码到gitee

登录gitee

https://gitee.com/

注意:如果还没有gitee账号,需要先注册;如果之前没有设置过SSH公钥,需要先设置SSH公钥。

创建仓库

提交代码

使用IDEA提交代码

提示有警告,忽略警告,继续提交

提交成功后,IDEA显示如下

刷新浏览器查看gitee界面,看到代码已上传成功

完成!enjoy it!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1581834.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

37-代码测试(下):Go语言其他测试类型及IAM测试介绍

。 Go中的两类测试&#xff1a;单元测试和性能测试。 我就来介绍下Go 语言中的其他测试类型&#xff1a;示例测试、TestMain函数、Mock测试、Fake测试等&#xff0c; 示例测试 示例测试以Example开头&#xff0c;没有输入和返回参数&#xff0c;通常保存在example_test.go…

Verilog实现手表计时

实现手表的计时功能&#xff1a; 1.具有start启动信号、pause暂停信号&#xff0c;可以自定义其触发机制。 2.具有时间更改接口&#xff0c;可以更改时、分、秒。 3.输出时、分、秒。 Verilog设计 模块端口定义&#xff1a; module watch1(input wire clk …

mp4转flv怎么转?电脑怎么把视频转成flv?

MP4&#xff08;MPEG-4 Part 14&#xff09;是一种多媒体容器格式&#xff0c;广泛用于包含视频、音频、字幕等多种数据流。MP4因其高度灵活性、压缩效率和兼容性成为视频领域的主流格式&#xff0c;支持范围涵盖从在线视频到移动设备的各类应用场景。 FLV文件格式的多个优点 …

Linux虚拟内存简介

Linux&#xff0c;像多数现代内核一样&#xff0c;采用了虚拟内存管理技术。该技术利用了大多数程序的一个典型特征&#xff0c;即访问局部性&#xff08;locality of reference&#xff09;&#xff0c;以求高效使用CPU和RAM&#xff08;物理内存&#xff09;资源。大多数程序…

使用MongoDB 构建AI:轻松应对从预测式AI到生成式AI

毫无疑问&#xff0c;如今从生成式AI (GenAI )中获益最大的&#xff0c;是那些早已运用预测式AI (Predictive AI )的组织。 2023年6月&#xff0c;麦肯锡在2023年6月发布的《生成式人工智能的经济潜力》研究中也得出了与此相同的结论。 原因主要有以下几点&#xff1a; 内部文…

SCI一区 | Matlab实现OOA-TCN-BiGRU-Attention鱼鹰算法优化时间卷积双向门控循环单元融合注意力机制多变量时间序列预测

SCI一区 | Matlab实现OOA-TCN-BiGRU-Attention鱼鹰算法优化时间卷积双向门控循环单元融合注意力机制多变量时间序列预测 目录 SCI一区 | Matlab实现OOA-TCN-BiGRU-Attention鱼鹰算法优化时间卷积双向门控循环单元融合注意力机制多变量时间序列预测预测效果基本介绍模型描述程序…

HarmonyOS 开发-MpChart运动健康场景实践案例

介绍 MpChart是一个包含各种类型图表的图表库&#xff0c;主要用于业务数据汇总&#xff0c;例如销售数据走势图&#xff0c;股价走势图等场景中使用&#xff0c;方便开发者快速实现图表UI&#xff0c;MpChart主要包括线形图、柱状图、饼状图、蜡烛图、气泡图、雷达图、瀑布图…

218基于matlab的有限差分法求解泊松方程

基于matlab的有限差分法求解泊松方程&#xff0c;采用SOR超松弛迭代法。模型采用方形区域&#xff0c;划分网格数为100*100&#xff0c;网格数可以很方便的更改。程序已调通&#xff0c;可直接运行。 218有限差分法 泊松方程 SOR超松弛迭代法 - 小红书 (xiaohongshu.com)

react17+antd4 动态渲染导航菜单中的icon

在路由信息对照表中的icon可以有两种形式&#xff1a;一种是组件形式&#xff0c;一种是字符串形式的。 在antd4的Menu.Item和SubMenu中的icon属性的格式为&#xff1a; 1.组件形式 这种方法在渲染时很方便&#xff0c;与antd中的Menu.Item中的icon属性的形式是一致的&#…

第9章 文件和内容管理

思维导图 9.1 引言 文件和内容管理是指针对存储在关系型数据库之外的数据和信息的采集、存储、访问和使用过程的管理。它的重点在于保持文件和其他非结构化或半结构化信息的完整性&#xff0c;并使这些信息能够被访问。文件和非结构化内容也应是安全且高质量的。 确保文件和内容…

说说我理解的数据库中的Schema吧

一、SQL标准对schema如何定义&#xff1f; ISO/IEC 9075-1 SQL标准中将schema定义为描述符的持久命名集合&#xff08;a persistent, named collection of descriptors&#xff09;。 大部分的网上资料定义Schema如下&#xff1a; schema是用来组织和管理数据的一种方式。它…

【攻防世界】mfw(.git文件泄露)

首先进入题目环境&#xff0c;检查页面、页面源代码、以及URL&#xff1a; 发现页面无异常。 使用 dirsearch 扫描网站&#xff0c;检查是否存在可访问的文件或者文件泄露&#xff1a; 发现 可访问界面/templates/ 以及 .git文件泄露&#xff0c;故使用 GItHack 来查看泄露的 …

Visual Studio Code SSH 连接远程服务器

Visual Studio Code通过 SSH 连接远程服务器并实现免密登录&#xff0c;你可以按照以下步骤进行操作&#xff1a; 1. **安装插件**&#xff1a;首先&#xff0c;在 VS Code 中安装 "Remote - SSH" 插件。打开 VS Code&#xff0c;点击左侧的扩展图标&#xff0c;搜索…

springboot3整合consul实现服务注册和配置管理快速入门

服务注册&#xff1a; 配置管理&#xff1a; 注册中心的比较&#xff1a; 在微服务的世界中&#xff0c;服务注册是必不可少的。现在比较流行的也就是Consul和Nacos&#xff0c;Zookeeper没有管理界面&#xff0c;一般不建议使用&#xff0c;而Eureka已经处于停更&#xff0…

【VUE】Vue3+Element Plus动态间距处理

目录 1. 动态间距调整1.1 效果演示1.2 代码演示 2. 固定间距2.1 效果演示2.2 代码演示 其他情况 1. 动态间距调整 1.1 效果演示 并行效果 并列效果 1.2 代码演示 <template><div style"margin-bottom: 15px">direction:<el-radio v-model"d…

解析大语言模型训练三阶段

大语言模型的训练过程一般包括3个阶段&#xff1a;预训练&#xff08;Pre-training&#xff09;、SFT&#xff08;有监督的微调&#xff0c;Supervised-Finetuning&#xff09;以及RLHF&#xff08;基于人类反馈的强化学习&#xff0c;Reinforcement Learning from Human Feedb…

【Python系列】Jupyter Notebook 中执行 Shell 脚本的方法

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

PDF-XChange 10.0 下载地址及安装教程

PDF-XChange是一款功能强大的PDF阅读器和编辑工具。它提供了一系列丰富的功能&#xff0c;可用于查看、注释、编辑和转换PDF文件。 作为一个PDF阅读器&#xff0c;PDF-XChange提供了快速和高效的阅读体验。它支持多种视图模式、缩放选项和导航工具&#xff0c;使用户能够方便地…

python小游戏

这些游戏你玩过几个&#xff1f; 1.贪吃蛇2.吃豆人3.加农炮4.四子棋5. Fly Bird<font color #f3704ab>6.记忆&#xff1a;数字对拼图游戏&#xff08;欢迎挑战&#xff01;用时&#xff1a;2min&#xff09;7.乒乓球8.上课划水必备-井字游戏&#xff08;我敢说100%的人都…

人工智能——大语言模型

5. 大语言模型 5.1. 语言模型历史 20世纪90年代以前的语言模型都是基于语法分析这种方法&#xff0c;效果一直不佳。到了20世纪90年代&#xff0c;采用统计学方法分析语言&#xff0c;取得了重大进展。但是在庞大而复杂的语言信息上&#xff0c;基于传统统计的因为计算量巨大…