【Flink入门修炼】1-3 Flink WordCount 入门实现

news2024/9/24 12:28:56

本篇文章将带大家运行 Flink 最简单的程序 WordCount。先实践后理论,对其基本输入输出、编程代码有初步了解,后续篇章再对 Flink 的各种概念和架构进行介绍。
下面将从创建项目开始,介绍如何创建出一个 Flink 项目;然后从 DataStream 流处理和 FlinkSQL 执行两种方式来带大家学习 WordCount 程序的开发。
Flink 各版本之间变化较多,之前版本的函数在后续版本可能不再支持。跟随学习时,请尽量选择和笔者同版本的 Flink。本文使用的 Flink 版本是 1.13.2。

一、创建项目

在很多其他教程中,会看到如下来创建 Flink 程序的方式。虽然简单方便,但对初学者来说,不知道初始化项目的时候做了什么,如果报错了也不知道该如何排查。

mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.13.2
通过指定 Maven 工程的三要素,即 GroupId、ArtifactId、Version 来创建一个新的工程。同时 Flink 给我提供了更为方便的创建 Flink 工程的方法:
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.13.2

因此,我们手动来创建一个 Maven 项目,看看到底如何创建出一个 Flink 项目。
1、通过 IDEA 创建一个 Maven 项目
image.png

2、pom.xml 添加:
这里我们选择的是 Flink 1.13.2 版本(Flink 1.14 之后部分类和函数有变化,可自行探索)。

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.2</flink.version> <!-- 1.14 之后部分类和函数有变化,可自行探索 -->
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

二、DataStream WordCount

一)编写程序

基础项目环境已经搞好了,接下来我们模仿一个流式环境,监听本地的 Socket 端口,使用 Flink 统计流入的不同单词个数。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class SocketTextStreamWordCount {
    public static void main(String[] args) throws Exception {
        //参数检查
        if (args.length != 2) {
            // System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            // return;
            args = new String[]{"127.0.0.1", "9000"};
        }

        String hostname = args[0];
        Integer port = Integer.parseInt(args[1]);


        // 创建 streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 获取数据
        DataStreamSource<String> stream = env.socketTextStream(hostname, port);

        // 计数
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);

        sum.print();

        env.execute("Java WordCount from SocketTextStream Example");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            String[] tokens = s.toLowerCase().split("\\W+");

            for (String token: tokens) {
                if (token.length() > 0) {
                    collector.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}

二)测试

接下来我们进行程序测试。
我们在本地使用 netcat 命令启动一个端口:

nc -l 9000

然后启动程序,能看到控制台一些输出:
image.png

接下来,在 nc 中输入:

$ nc -l 9000
hello world
flink flink flink

回到我们的程序,能看到统计的输出:

3> (hello,1)
6> (world,1)
8> (flink,1)
8> (flink,2)
8> (flink,3)

image.png

三)如果有报错

如果出现执行报错:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/java/io/TextInputFormat
	at com.shuofxz.SocketTextStreamWordCount.main(SocketTextStreamWordCount.java:25)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.java.io.TextInputFormat
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
	... 1 more

在 IDE 中把 「Add dependencies with “Provided” scope to classpath」勾选上:
image.png

三、Flink Table & SQL WordCount

一)介绍 FlinkSQL

Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。
上面单词统计的逻辑可以转化为下面的 SQL。
直接来看这个 SQL:

select word as word, sum(frequency) as frequency from WordCount group by word
  • WordCount 是要进行单词统计的表,我们会先做一些处理,将输入的单词都存放到这个表中
  • 表我们定义为两列(word, frequency),初始转化输入每个单词占一行,frequency 都是 1
  • 然后,就可以按照 SQL 的逻辑来进行统计聚合了。

其中,WordCount 表数据如下:

wordfrequency
hello1
world1
flink1
flink1
flink1

那么接下来我们看,如何写一个 FlinkSQL 的程序。

二)环境和程序

首先,添加 FlinkSQL 需要的依赖:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

程序如下:

public class SQLWordCount {
    public static void main(String[] args) throws Exception {
        // 创建上下文环境
        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

        // 读取一行模拟数据作为输入
        String words = "hello world flink flink flink";
        String[] split = words.split("\\W+");

        ArrayList<WC> list = new ArrayList<>();

        for (String word : split) {
            WC wc = new WC(word, 1);
            list.add(wc);
        }

        DataSource<WC> input = fbEnv.fromCollection(list);

        // DataSet 转 SQL,指定字段名
        Table table = fbTableEnv.fromDataSet(input, "word,frequency");
        table.printSchema();

        // 注册为一个表
        fbTableEnv.createTemporaryView("WordCount", table);

        Table table1 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount group by word");

        DataSet<WC> ds1 = fbTableEnv.toDataSet(table1, WC.class);
        ds1.printToErr();
    }

    public static class WC {
        public String word;
        public long frequency;

        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return  word + ", " + frequency;
        }
    }
}

执行,结果输出:

(
  `word` STRING,
  `frequency` BIGINT
)
flink, 3
world, 1
hello, 1

image.png

四、小结

本篇手把手的带大家搭建起 Flink Maven 项目,然后使用 DataStream 和 FlinkSQL 两种方式来学习 WordCount 单词计数这一最简单最经典的 Flink 程序开发。跟着步骤一步步执行下来,大家应该对 Flink 程序基本执行流程有个初步的了解,为后续的学习打下了基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1440240.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

pwn学习笔记(1)前置基础

pwn学习笔记&#xff08;1&#xff09; &#xff08;1&#xff09;pwn简介&#xff1a; ​ 以下来自于百度百科&#xff1a;”Pwn”是一个黑客语法的俚语词&#xff0c;是指攻破设备或者系统发音类似“砰”&#xff0c;对黑客而言&#xff0c;这就是成功实施黑客攻击的声音—…

【开源】SpringBoot框架开发医院门诊预约挂号系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 功能性需求2.1.1 数据中心模块2.1.2 科室医生档案模块2.1.3 预约挂号模块2.1.4 医院时政模块 2.2 可行性分析2.2.1 可靠性2.2.2 易用性2.2.3 维护性 三、数据库设计3.1 用户表3.2 科室档案表3.3 医生档案表3.4 医生放号…

【前端素材】bootstrap4实现绿色植物Lukani平台

一、需求分析 绿色植物商城是一个专门销售绿色植物的零售商店或在线平台。它提供各种类型和品种的室内植物、室外植物和盆栽等。绿色植物商城的作用可以从以下几个方面来分析&#xff1a; 1. 提供多样化的选择&#xff1a;绿色植物商城通常会提供各种各样的绿色植物选项&…

.NET Core 实现 JWT 认证

写在前面 JWT&#xff08;JSON Web Token&#xff09;是一种开放标准, 由三部分组成&#xff0c;分别是Header、Payload和Signature&#xff0c;它以 JSON 对象的方式在各方之间安全地传输信息。通俗的说&#xff0c;就是通过数字签名算法生产一个字符串&#xff0c;然后在网络…

『运维备忘录』之 Kubernetes(K8S) 常用命令速查

一、简介 kubernetes&#xff0c;简称K8s&#xff0c;是用8代替名字中间的8个字符“ubernete”而成的缩写&#xff0c;是一个开源的&#xff0c;用于管理云平台中多个主机上的容器化的应用。kubernetes是基于容器技术的分布式架构解决方案&#xff0c;具有完备的集群管理能力&a…

UDP端口探活的那些细节

一 背景 商业客户反馈用categraf的net_response插件配置了udp探测, 遇到报错了&#xff0c;如图 udp是无连接的&#xff0c;无法用建立连接的形式判断端口。 插件最初的设计是需要配置udp的发送字符&#xff0c;并且配置期望返回的字符串&#xff0c; [[instances]] targets…

【PyTorch][chapter 14][李宏毅深度学习][Word Embedding]

前言&#xff1a; 这是用于自然语言处理中数据降维的一种方案。 我们希望用一个向量来表示每一个单词. 有不同的方案 目录&#xff1a; one-hot Encoding word-class 词的上下文表示 count-based perdition-based CBOW Skip-Gram word Embedding 词向量相似…

数据结构(C语言)代码实现(八)——顺序栈实现数值转换行编辑程序汉诺塔

目录 参考资料 顺序栈的实现 头文件SqStack.h&#xff08;顺序栈函数声明&#xff09; 源文件SqStack.cpp&#xff08;顺序栈函数实现&#xff09; 顺序栈的三个应用 数值转换 行编辑程序 顺序栈的实现测试 栈与递归的实现&#xff08;以汉诺塔为例&#xff09; 参考资…

Vision Pro新机测评!“这才是MR硬件该有的模样!”

期盼很久的Vision Pro终于到了&#xff0c;小编迫不及待地体验了一把&#xff0c;效果相当非常震撼&#xff0c;操作非常丝滑&#xff0c;画面非常清晰…来不急解释了&#xff0c;快和小编一起来看一下吧~ 新机一到公司&#xff0c;为解大家对Vision Pro 的“相思之苦”&#x…

(43)找出中枢整数

文章目录 每日一言题目解题思路法一&#xff1a;法二&#xff1a; 代码法一&#xff1a;法二&#xff1a; 结语 每日一言 即使慢&#xff0c;驰而不息&#xff0c;纵令落后&#xff0c;纵令失败&#xff0c;但一定可以达到他所向往的目标。——鲁迅 题目 题目链接&#xff1a…

SQL注入(SQL Injection)从注入到拖库 —— 简单的手工注入实战指南精讲

基本SQL注入步骤&#xff1a; 识别目标&#xff1a;确定目标网站或应用程序存在潜在的SQL注入漏洞。收集信息&#xff1a;通过查看页面源代码、URL参数和可能的错误信息等&#xff0c;搜集与注入有关的信息。判断注入点&#xff1a;确定可以注入的位置&#xff0c;比如输入框、…

网络套件字(理论知识)

一、源IP地址和目的IP地址 上次说到IP地址是为了是为了让信息正确的从原主机传送到目的主机&#xff0c;而原IP地址和目的IP地址就是用于标识两个主机的&#xff0c;既然叫做地址必然有着路径规划的作用&#xff0c;而路径规划最重要的就是&#xff0c;从哪来到哪去&#xff0…

tab 切换类交互功能实现

tab切换类交互&#xff1a; 记录激活项&#xff08;整个对象/id/index)动态类型控制 下面以一个地址 tab 切换业务功能为例&#xff1a; <div class"text item" :class"{active : activeAddress.id item.id}" click"switchAddress(item)"…

2023年出版的新书中提到的《人月神话》(202402更新)(2)共8本

DDD领域驱动设计批评文集 做强化自测题获得“软件方法建模师”称号 《软件方法》各章合集 《人月神话》于1975年出版&#xff0c;1995年出二十周年版。自出版以来&#xff0c;该书被大量的书籍和文章引用&#xff0c;直到现在热潮不退。 2023年&#xff0c;清华大学出版社推…

MySQL 升级脚本制作

当数据库更新字段后或添加一些基础信息&#xff0c;要对生产环境进行升级&#xff0c;之前都是手动编写sql&#xff0c;容易出错还容易缺失。 通过 Navcat 工具的数据库结构同步功能和数据同步功能完成数据库脚本的制作。 一、结构同步功能 1、选择 工具–结构同步&#xff1…

WifiConfigStore初始化读取-Android13

WifiConfigStore初始化读取 1、StoreData创建并注册2、WifiConfigStore读取2.1 文件读取流程2.2 时序图2.3 日志 1、StoreData创建并注册 packages/modules/Wifi/service/java/com/android/server/wifi/WifiConfigManager.java mWifiConfigStore.registerStoreData(mNetworkL…

机器学习 - 梯度下降

场景 上一章学习了代价函数&#xff0c;在机器学习中&#xff0c;代价模型是用于衡量模型预测值与真实值之间的差异的函数。它是优化算法的核心&#xff0c;目标是通过调整模型的参数来最小化代价模型的值&#xff0c;从而使模型的预测结果更接近真实值。常见的代价模型是均方…

猫头虎分享已解决Bug ‍ || TypeError: Object of type ‘int64‘ is not JSON serializable

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通鸿蒙》 …

fghbbbbbbbbbb

欢迎关注博主 Mindtechnist 或加入【Linux C/C/Python社区】一起探讨和分享Linux C/C/Python/Shell编程、机器人技术、机器学习、机器视觉、嵌入式AI相关领域的知识和技术。 磁盘满的本质分析 专栏&#xff1a;《Linux从小白到大神》 | 系统学习Linux开发、VIM/GCC/GDB/Make工具…

【精选】java继承进阶——构造方法的访问特点 this、super使用

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【python】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收藏…