Flink官方例子解析:WordCount

news2025/1/10 2:26:42

1. 简介

今天介绍的是官方子项目flink-examples-streaming里面的WordCount例子。

在这里插入图片描述

WordCount ,中文:单词统计,是大数据计算常用的例子。

2. WordCount需要实现的功能

  • 监听指定目录下的文件,读取文件的文本内容;如果未指定监听路径,则读取静态的字符串变量
  • 分词
  • 统计每个单词的出现次数
  • 把单词统计的结果输出到指定的文件中;如果未指定输出路径,则把结果打印输出

参数说明:
--input 指定监听目录, 非必填
--output 指定结果输出的文件路径, 非必填
--discovery-interval 指定监听的间隔时间, 非必填
--execution-mode 指定Flink的执行模式,非必填,默认为STREAMING模式

3. 代码实现

3.1 指定监听目录

//使用工具类CLI的fromArgs方法解析参数
final CLI params = CLI.fromArgs(args);

//setGlobalJobParameters(params),可以在Flink Web UI 中查看到传入的参数
env.getConfig().setGlobalJobParameters(params);

if (params.getInputs().isPresent()) {  
    // 如果指定了--input参数,则创建file source, 从指定的路径读取文件
   
        FileSource.FileSourceBuilder<String> builder =  
            FileSource.forRecordStreamFormat(  
                    new TextLineInputFormat(), params.getInputs().get());  

    // 如果指定了--discovery-interval参数,file source 会持续监听指定的目录的新文件      
    params.getDiscoveryInterval().ifPresent(builder::monitorContinuously);  
  
    // 指定算子名称为"file-input"
    text = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input");  
} else {
    // 没有指定--input参数, 从静态变量WordCountData.WORDS读取数据
    // 指定算子名称为"in-memory-input"
    text = env.fromElements(WordCountData.WORDS).name("in-memory-input");  
}

3.2 分词

要统计一段的文章每个单词的词频,分词是重要的一步,例如,这一段:
"To be, or not to be,--that is the question:--"
我们需要忽略掉空格,以及逗号等特殊字符,只保留单词

public static final class Tokenizer  
        implements FlatMapFunction<String, Tuple2<String, Integer>> {  
  
    @Override  
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {  
        // 把字符串转为小写后按单词分隔,存入到数组
        String[] tokens = value.toLowerCase().split("\\W+");  
  
        // 输出每个单词
        for (String token : tokens) {  
            if (token.length() > 0) {  
                out.collect(new Tuple2<>(token, 1));  
            }  
        }  
    }  
}

3.3 统计每个单词的出现次数

DataStream<Tuple2<String, Integer>> counts =  
      text
        // 分词处理,等到二元组 (word, 1) , 
        .flatMap(new Tokenizer()).name("tokenizer")   
        // 按单词分组,f0指的是二元组中的第一个字段        
        .keyBy(value -> value.f0)  
        //单词每出现一次累加1,并输出最新的结果                       
        .sum(1)  
        .name("counter");

3.4 指定输出路径

if (params.getOutput().isPresent()) {  
    // 如果指定了输出的目录,则创建一个FileSink, 并把算子命名为file-sink  
    // 设置输出到文件的策略,1.内存的数据大于1M; 2.每隔10秒输出 。满足其中一个条件即输出
        counts.sinkTo(  
                    FileSink.<Tuple2<String, Integer>>forRowFormat(  
                                    params.getOutput().get(), new SimpleStringEncoder<>())  
                            .withRollingPolicy(  
                                    DefaultRollingPolicy.builder()  
                                        .withMaxPartSize(MemorySize.ofMebiBytes(1))  
                                        .withRolloverInterval(Duration.ofSeconds(10))  
                                        .build())  
                            .build())  
            .name("file-sink");  
} else { 
    // 没有指定输出的目录则打印, 算子名字为print-sink
    counts.print().name("print-sink");  
}

获取WordCount完整代码请参考文章: Flink官方例子解析:Flink源码子项目flink-examples
代码可在IDEA IntelliJ运行。

4. 执行效果

4.1 在IDEA IntelliJ中配置程序的参数

在这里插入图片描述

--input D:\project\source_code\flink\flink-examples\flink-examples-streaming\src\main\java\org\apache\flink\streaming\examples\wordcount\input --discovery-interval 20 --output D:\project\source_code\flink\flink-examples\flink-examples-streaming\src\main\java\org\apache\flink\streaming\examples\wordcount\output

4.2 启动程序

程序启动后,可在Flink WebUI看到以下DAG图

在这里插入图片描述

5. 结语

本篇到此结束,欢迎订阅Flink专栏,学习更多Flink的相关知识。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/189641.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python继承机制及其使用

Python 类的封装、继承、多态 3 大特性&#xff0c;前面章节已经详细介绍了 Python 类的封装&#xff0c;本节继续讲解 Python 类的继承机制。继承机制经常用于创建和现有类功能类似的新类&#xff0c;又或是新类只需要在现有类基础上添加一些成员&#xff08;属性和方法&#…

RASP技术进阶系列(三):重大漏洞自动化热修复

在上篇文章《RASP技术进阶系列&#xff08;二&#xff09;&#xff1a;东西向Web流量智能检测防御》中提到&#xff0c;在企业日常安全运营以及HW场景下&#xff0c;应用漏洞攻击应急响应和恶意流量溯源分析是安全团队的重点工作。在恶意流量溯源方面&#xff0c;指向攻击来源的…

趁着你对象吃泡面的功夫,我修复了误删除的文件

文章目录前言一. linux下文件删除原理1.1 文件删除原理的简单介绍1.2 测试inode号是否容易被覆盖&#xff1f;二. 实验测试过程2.1 实验环境&#xff1a;2.2 新增一块硬盘测试2.3 对磁盘分区2.3.1 分区&#xff08;使用fdisk分区&#xff09;2.3.2 格式化&#xff0c;创建目录挂…

网络化多智能体系统的共识与合作

在所有参与者之间提供快速协议和团队合作的算法通过自组织网络系统实现有效的任务执行。By Reza Olfati-Saber, Member IEEE, J. Alex Fax, and Richard M. Murray, Fellow IEEE小于 翻译摘要&#xff1a;本文提供了一个理论框架&#xff0c;用于分析多智能体网络系统的共识算法…

Linux文件与目录的查看:ls

前言 ls作为我们在Linux系统中最常用的命令&#xff0c;因为我们常常需要去知道文件或是目录的相关信息&#xff0c;但我们Linux的文件所记录的信息实在是太多了&#xff0c;ls也没有需要全部都列出来&#xff0c;所以&#xff0c;当我们执行ls命令时&#xff0c;默认显示的只…

【数据结构】基础:二叉搜索树

【数据结构】基础&#xff1a;二叉搜索树 摘要&#xff1a;本文为二叉树的进阶&#xff0c;主要介绍其概念与基本实现&#xff08;递归与非递归&#xff09;&#xff0c;再介绍其应用&#xff0c;主要介绍内容为KV模型。最后为简单的性能分析。 文章目录【数据结构】基础&#…

【数据结构】1.1 数据结构的研究内容

文章目录数据结构的研究内容数据结构研究的内容小结数据结构的研究内容 早期&#xff0c;计算机主要用于数值计算: 首先&#xff0c;分析问题、提取操作对象&#xff0c;然后&#xff0c;找出操作对象之间的关系&#xff0c;用数学语言加以描述&#xff0c;建立相应数学方程。…

Java日志门面技术 SLF4J

文章目录背景SLF4J概述切换日志框架实际应用配合自身简单日志实现(slf4j-simple)配置logback日志实现配置Log4J日志实现(需适配器)配置JUL日志实现(需适配器)添加slf4j-nop依赖(日志开关)桥接旧的日志实现框架背景 随着系统开发的进行&#xff0c;可能会更新不同的日志框架&am…

TF数据流图图与TensorBoard

2.1 TF数据流图 学习目标 目标 说明TensorFlow的数据流图结构应用 无内容预览 2.1.1 案例&#xff1a;TensorFlow实现一个加法运算 1 代码2 TensorFlow结构分析2.1.2 数据流图介绍 2.1.1 案例&#xff1a;TensorFlow实现一个加法运算 2.1.1.1 代码 def tensorflow_demo():&…

CMMI对企业有什么价值,如何高效落地?

1、获得权威认证 CMMI是全球性软件与系统工程行业的唯一权威认证&#xff0c;是对企业软件研发与能力服务的认可。 CMMI企业价值 CoCode项目管理全面支持CMMI3-5级高效落地​ 2、降本增效&#xff0c;提高企业能力。 CMMI对软件开发过程进行规范化梳理&#xff0c;保证软…

虚拟机ubuntu系统内存满,无法进入桌面,扩展内存

1、 关闭虚拟机&#xff0c;在虚拟机设置中将原先20GB扩展到30GB 注意&#xff1a;有快照需要删除快照后才能扩展 2、命令行进入ubuntu 内存满了&#xff0c;无法进入Ubuntu图形界面 按下ctrlaltf2~f6组合键 输入用户名和密码进入命令行模式 3、删除一些东西 删除回收站…

vuex的modules和辅助函数

一、回顾&#xff1a;vuex状态管理器1、版本问题&#xff1a;vue2对应的是vuex3&#xff1b;vue3对应的vuex42、vuex作用&#xff1a;每个vuex中都有一个Store(仓库)&#xff0c;用于管理vue项目中用到的状态变量&#xff08;属性&#xff09;。vuex维护的是一个单一的状态树vu…

工作常用cron总结

一、cron表达式详解 corn从左到右&#xff08;用空格隔开&#xff09;&#xff1a; 秒 分 小时 日 月 周 (星期中的日期&#xff0c;1代表周日&#xff0c;7代表周六) 年 定时任务统计 数据同步 0 0 10 * * &#xff1f; 每天上午10点触发…

Spring 整合Mybatis。

目录 一、环境准备 1、Mybatis 环境 2、整合思路分析 二、Spring整合Mybatis 三、Spring整合Junit 一、环境准备 1、Mybatis 环境 ▶ 步骤1 : 准备数据库表 Mybatis是来操作数据库表&#xff0c;所以先创建一个数据库及表 create database spring_db character set utf8; …

LeetCode刷题系列 -- 1008. 前序遍历构造二叉搜索树

给定一个整数数组&#xff0c;它表示BST(即 二叉搜索树 )的 先序遍历 &#xff0c;构造树并返回其根。保证 对于给定的测试用例&#xff0c;总是有可能找到具有给定需求的二叉搜索树。二叉搜索树 是一棵二叉树&#xff0c;其中每个节点&#xff0c; Node.left 的任何后代的值 严…

JVM的理解(垃圾回收算法和类加载过程)

文章目录1、JVM的位置2、JVM的体系结构3、JVM组件3.1、类加载器&#xff08;加载class文件&#xff09;3.1.1、类加载器的执行步骤3.2、PC寄存器3.3、方法区3.4、栈3.5、堆4、GC算法4.1、引用计数法4.2、复制算法1、模型2、原理图4.3、标记清除4.4、标记压缩总结&#xff1a;1、…

2023年了学Java还能找到工作么?

Java人才需求缺口巨大 为何还有人找不到工作&#xff1f; 近两年&#xff0c;传统企业开始数字化转型&#xff0c;各企业对互联网IT技术人才呈现井喷趋势。对于进可攻前端、后可守后端的Java程序员而言&#xff0c;市场对他们青睐有加&#xff0c;薪资更是水涨船高。然而在…

Cesium 本地化部署和新增sandcastle案例

源码下载git clone https://gitee.com/mirrors-gis/cesium.gitcd cesium npm install // or yarn install构建 因为下载的源码,还没有构建出cesium的api,以及api对应的文档 ,如果此时直接运行 npm start ,会启动一个8080端口的一个服务,通过 http://localhost:8080 可以看…

SpringCloud Alibaba_Nacos服务注册和配置中心

目录一、Nacos简介1.为什么叫Nacos2.是什么3.能干嘛4.去哪下5.各种注册中心比较二、安装并运行Nacos三、Nacos作为服务注册中心演示1.官网文档2.基于Nacos的服务提供者3.基于Nacos的服务消费者4.服务注册中心对比4.1 Nacos和CAP4.2 Nacos支持AP和CP模式的切换四、Nacos作为服务…

sonar的安装以及使用

sonar的安装以及使用简介1. sonar是什么2. SonarQube与Sonar安装1.下载sonarqubexxx.zip并且解压即可:2.配置数据库3.重启sonarQube会自动建表。使用1.下载sonar-scanner:(这个工具是对源码进行扫描&#xff0c;并将结果保存到数据库以便用上面的sonarqube进行分析)2.配置mysql…