0基础学习PyFlink——使用DataStream进行字数统计

news2025/2/25 12:27:45

大纲

  • source
  • Map
    • Splitting
    • Mapping
  • Reduce
    • Keying
    • Reducing
  • 完整代码
  • 结构
  • 参考资料

在《0基础学习PyFlink——模拟Hadoop流程》一文中,我们看到Hadoop在处理大数据时的MapReduce过程。
在这里插入图片描述
本节介绍的DataStream API,则使用了类似的结构。

source

为了方便,我们依然使用from_collection从内存中读取数据。
和使用Table API类似,我们给from_collection传递的第二参数是每行数据类型。本例中是String,即“A C B”的类型。

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode

word_count_data = ["A C B",
                   "A E B",
                   "E C D"]

def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    # write all the data to one file
    env.set_parallelism(1)

    source_type_info = Types.STRING()
    # define the source
    source = env.from_collection(word_count_data, source_type_info)

可以使用下面指令输出source内容

    source.print()
A C B
A E B
E C D

Map

和上图一样,Map由Splitting和Mapping组成。它们分别将数据切割成做小运算单元,和生成map结构。

Splitting

    def split(line):
        for s in line.split():
            yield s
            
    splitted = source.flat_map(split) 

上述splitted的结构输出是

A
C
B
A
E
B
E
C
D

Mapping

Mapping的操作就是将之前的数组结构转换成map结构

mapped=splitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))

mapped的输出值如下,可以看到它还是按我们输入数据的顺序排列的。

(A,1)
(C,1)
(B,1)
(A,1)
(E,1)
(B,1)
(E,1)
(C,1)
(D,1)

Reduce

Keying

这一步对应于上图中的Shuffling&Sorting,它会将相同key的数据进行分区,以供后面reducing操作使用。

    keyed=mapped.key_by(lambda i: i[0]) 

可以看到keyed数据已经经过排序和聚合了。

(A,1)
(A,1)
(B,1)
(B,1)
(C,1)
(C,1)
(D,1)

Reducing

 reduced=keyed.reduce(lambda i, j: (i[0], i[1] + j[1]))

reduce的方法有如下注释

Applies a reduce transformation on the grouped data stream grouped on by the given
key position. The ReduceFunction will receive input values based on the key value.
Only input values with the same key will go to the same reducer.

特别是最后一句非常有用“Only input values with the same key will go to the same reducer”(只有相同Key的输入数据才会进入相同的Reducer中)。这句话意味着上述Keyed的数据会被分组执行,于是就不会出现计算错乱。

(A,2)
(B,2)
(C,2)
(D,1)
(E,2)

完整代码

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode

word_count_data = ["A C B",
                   "A E B",
                   "E C D"]

def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    # write all the data to one file
    env.set_parallelism(1)

    source_type_info = Types.STRING()
    # define the source
    source = env.from_collection(word_count_data, source_type_info)
    # source.print()

    def split(line):
        for s in line.split():
            yield s
            
    splitted = source.flat_map(split) 
    # splitted.print()
    mapped=splitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))
    # mapped.print()
    keyed=mapped.key_by(lambda i: i[0]) 
    # keyed.print()
    reduced=keyed.reduce(lambda i, j: (i[0], i[1] + j[1]))

    # define the sink
    reduced.print()

    # submit for execution
    env.execute()

if __name__ == '__main__':
    word_count()

结构

在这里插入图片描述

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/python/datastream_tutorial/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1158883.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OSPF高级特性

OSPF高级特性(1) 一、OSPF不规则区域类型 产生原因:区域划分不合理,导致的问题 1、非骨干区域无法和骨干区域保持连通 2、骨干区域被分割 造成后果:非骨干区域没和骨干区域相连,导致ABR将不会帮忙转发区域间的路由信息。非骨干区…

MS3142电机驱动器可兼容LV8548M

MS3142/MS3142S 是一个双全桥电机驱动。可兼容LV8548M(功能基本一致,管脚不兼容)。电源电压供电范围 4V 到 18V,平均电流 1.1A,电流峰值 1.54A。如果需要更高的电流能力,可以将双全桥并联使用。 四个输入脚…

数据结构预算法--顺序表

1.顺序表 1.1概念及结构 顺序表是用一段物理地址连续的存储单元依次存储数据元素的线性结构,一般情况下采用数组存储。在数组上完成数据的增删查改。 顺序表一般可以分为: 1. 静态顺序表:使用定长数组存储元素。 2. 动态顺序表:使…

Shiro 身份验证绕过 (CVE-2020-13933)

一、漏洞描述 Apache Shiro是一个强大且易用的Java安全框架,执行身份验证、授权、密码和会话管理。 Apache Shiro身份验证绕过漏洞CVE-2020-11989的修复补丁存在缺陷,在1.5.3及其之前的版本,由于shiro在处理url时与spring仍然存在差异,依然存…

Day42 力扣动态规划 :123.买卖股票的最佳时机III |188.买卖股票的最佳时机IV

Day42 力扣动态规划 :123.买卖股票的最佳时机III |188.买卖股票的最佳时机IV 123.买卖股票的最佳时机III第一印象看完题解的思路dp数组:递推公式:初始化遍历顺序 实现中的困难感悟代码 188.买卖股票的最佳时机IV第一印象初始化递推公式看完题…

黄金矿工小游戏

欢迎来到程序小院 黄金矿工 玩法:点击开始游戏,黄金和钩子,钩子会左右摆动,对准黄金位置点击鼠标左键钓起黄金加对应时间,钓起黑色四块减去响应时间,快去挖矿吧^^。开始游戏https://www.ormcc.com/play/ga…

【错误解决方案】Error: module ‘cv2‘ has no attribute ‘SURF‘

1. 错误提示 python-opencv高版本中,AttributeError: module cv2 has no attribute SURF问题; 错误提示:Error: module ‘cv2‘ has no attribute ‘SURF‘ 2. 解决方案 解决:将sift cv2.SIFT()替换为:sift cv2.x…

windows内存取证-中等难度-下篇

上文我们对第一台Target机器进行内存取证,今天我们继续往下学习,内存镜像请从上篇获取,这里不再进行赘述​ Gideon 攻击者访问了“Gideon”,他们向AllSafeCyberSec域控制器窃取文件,他们使用的密码是什么? 攻击者执…

day57--动态规划15

392.判断子序列 115.不同的子序列 第一题:判断子序列 给定字符串 s 和 t ,判断 s 是否为 t 的子序列。 字符串的一个子序列是原始字符串删除一些(也可以不删除)字符而不改变剩余字符相对位置形成的新字符串。(例如&…

大模型冷思考:企业“可控”价值创造空间还有多少?

文 | 智能相对论 作者 | 叶远风 毫无疑问,大模型热潮正一浪高过一浪。 在发展进程上,从最开始的技术比拼到现在已开始全面强调商业价值变现,百度、科大讯飞等厂商都喊出类似“不能落地的大模型没有意义”等口号。 在模型类型上&#xff0…

2023年【高压电工】考试及高压电工找解析

题库来源:安全生产模拟考试一点通公众号小程序 高压电工考试参考答案及高压电工考试试题解析是安全生产模拟考试一点通题库老师及高压电工操作证已考过的学员汇总,相对有效帮助高压电工找解析学员顺利通过考试。 1、【单选题】 额定容量是指:在变压器铭…

Spring源码分析篇:@Autowired 是怎样完成注入的?究竟是byType还是byName亦两者皆有

1. 五种不同场景下 Autowired 的使用 第一种情况 上下文中只有一个同类型的bean 配置类 package org.example.bean; ​ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; ​ Configuration public class…

docker-compose 简单部署MySQL Database

docker-compose 简单部署MySQL Database 本博文部署MySQL 并与上篇部署的 Flask进行关联 主博客目录:《从零开始学习搭建量化平台笔记》 文章目录 docker-compose 简单部署MySQL Database部署 MySQLMySQL 开放端口与权限 主项目计划需要搭建一个MySQL 数据库为其他部…

“零基础”PHP代码审计入门

目录 一、代码审计目的 二、代码审计基础 三、 代码审计思路 四、PHP核心配置 五、 代码审计环境 六、手动调试代码 七、PHP的弱类型 八、学习漏洞函数 九、审计入门总结 推荐一些demo: 一、代码审计目的 代码审计指的是对源代码进行检查,寻找…

智慧校园地下管线三维可视化管控平台减少人力和物力资源的浪费

随着科技的不断发展,三维可视化管理平台在各个领域得到了广泛的应用。三维可视化管理平台通过将数据以三维形式呈现,使得用户能够更直观地理解和分析数据,从而提高工作效率和决策质量。 VR数字孪生园区系统是通过将实际园区的各种数据和信息进…

开关电源老化试验和性能检测系统软件

开关电源自动测试系统由PC(工控机)、测试工装、可编程直流电源、数字示波器、可编程直流电子负载、继电器模块等部分组成,并通过RS232/LAN通讯总线、测试夹具以及其它线缆等进行连接,为系统组成结构。PC与可编程直流电源、数字示波器、可编程直流电子负载…

c++装饰器模式

前言 装饰器模式,就是可以对一个对象无限装饰一些东西,而且可以没有顺序。比如一个人可能只会说出他的名字,但是可以让他再说哈哈,可以说完哈哈之后再说哇哇。如何后面又不想装饰了,不需要改类原来的代码,…

什么是数字展览馆,数字展览馆有什么应用前景

引言: 数字展览馆作为一种新兴的文化艺术展示形式,以数字化技术和虚拟现实为基础,正在逐渐改变传统展览的方式。 一、什么是数字展览馆? 1.定义 数字展览馆是利用数字技术和虚拟现实技术打造的一种线上文化艺术展示平台。通过虚…

基于springboot实现疫情防控期间外出务工人员信息管理系统项目【项目源码+论文说明】计算机毕业设计

基于springboot疫情防控期间外出务工人员信息管理系统 摘要 网络的广泛应用给生活带来了十分的便利。所以把疫情防控期间某村外出务工人员信息管理与现在网络相结合,利用java技术建设疫情防控期间某村外出务工人员信息管理系统,实现疫情防控期间某村外出…

《完蛋!我被美女包围了》突然火了!世界首个开源贡献榜出炉丨 RTE 开发者日报 Vol.75

开发者朋友们大家好: 这里是 「RTE 开发者日报」 ,每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享 RTE (Real Time Engagement) 领域内「有话题的 新闻 」、「有态度的 观点 」、「有意思的 数据 」、「有思考的 文…