0基础学习PyFlink——Map和Reduce函数处理单词统计

news2025/2/28 2:49:08

在很多讲解大数据的案例中,往往都会以一个单词统计例子来抛砖引玉。本文也不免俗,例子来源于PyFlink的《Table API Tutorial》,我们会通过几种方式统计不同的单词出现的个数,从而达到循序渐进的学习效果。

常规方法

# input.py
word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "Or to take arms against a sea of troubles,",
                   "And by opposing end them?--To die,--to sleep,--",
                   "No more; and by a sleep to say we end",
                   "The heartache, and the thousand natural shocks",
                   "That flesh is heir to,--'tis a consummation",
                   "Devoutly to be wish'd. To die,--to sleep;--",
                   "To sleep! perchance to dream:--ay, there's the rub;",
                   "For in that sleep of death what dreams may come,",
                   "When we have shuffled off this mortal coil,",
                   "Must give us pause: there's the respect",
                   "That makes calamity of so long life;",
                   "For who would bear the whips and scorns of time,",
                   "The oppressor's wrong, the proud man's contumely,",
                   "The pangs of despis'd love, the law's delay,",
                   "The insolence of office, and the spurns",
                   "That patient merit of the unworthy takes,",
                   "When he himself might his quietus make",
                   "With a bare bodkin? who would these fardels bear,",
                   "To grunt and sweat under a weary life,",
                   "But that the dread of something after death,--",
                   "The undiscover'd country, from whose bourn",
                   "No traveller returns,--puzzles the will,",
                   "And makes us rather bear those ills we have",
                   "Than fly to others that we know not of?",
                   "Thus conscience does make cowards of us all;",
                   "And thus the native hue of resolution",
                   "Is sicklied o'er with the pale cast of thought;",
                   "And enterprises of great pith and moment,",
                   "With this regard, their currents turn awry,",
                   "And lose the name of action.--Soft you now!",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]

一般的思路我们是:

  1. 遍历这个list将每行用空格切割成独立单词,存储到一个新的list中
  2. 遍历步骤1产生的新的list,使用map记录统计结果,key是单词,value是次数
# common.py
from input import word_count_data

wordCount = dict()
for line in word_count_data:
    wordsOneline = line.split()
    for word in wordsOneline:
        wordCount.update({word:wordCount.get(word,0)+1})

print(wordCount)

{‘To’: 4, ‘be,’: 1, ‘or’: 1, ‘not’: 2, ‘to’: 7, ‘be,–that’: 1, ‘is’: 2, ‘the’: 15, ‘question:–’: 1, ‘Whether’: 1, “'tis”: 1, ‘nobler’: 1, ‘in’: 3, ‘mind’: 1, ‘suffer’: 1, ‘The’: 7, ‘slings’: 1, ‘and’: 7, ‘arrows’: 1, ‘of’: 14, ‘outrageous’: 1, ‘fortune’: 1, ‘Or’: 1, ‘take’: 1, ‘arms’: 1, ‘against’: 1, ‘a’: 5, ‘sea’: 1, ‘troubles,’: 1, ‘And’: 5, ‘by’: 2, ‘opposing’: 1, ‘end’: 2, ‘them?–To’: 1, ‘die,–to’: 2, ‘sleep,–’: 1, ‘No’: 2, ‘more;’: 1, ‘sleep’: 2, ‘say’: 1, ‘we’: 4, ‘heartache,’: 1, ‘thousand’: 1, ‘natural’: 1, ‘shocks’: 1, ‘That’: 3, ‘flesh’: 1, ‘heir’: 1, “to,–'tis”: 1, ‘consummation’: 1, ‘Devoutly’: 1, ‘be’: 1, “wish’d.”: 1, ‘sleep;–’: 1, ‘sleep!’: 1, ‘perchance’: 1, ‘dream:–ay,’: 1, “there’s”: 2, ‘rub;’: 1, ‘For’: 2, ‘that’: 3, ‘death’: 1, ‘what’: 1, ‘dreams’: 1, ‘may’: 1, ‘come,’: 1, ‘When’: 2, ‘have’: 2, ‘shuffled’: 1, ‘off’: 1, ‘this’: 2, ‘mortal’: 1, ‘coil,’: 1, ‘Must’: 1, ‘give’: 1, ‘us’: 3, ‘pause:’: 1, ‘respect’: 1, ‘makes’: 2, ‘calamity’: 1, ‘so’: 1, ‘long’: 1, ‘life;’: 1, ‘who’: 2, ‘would’: 2, ‘bear’: 2, ‘whips’: 1, ‘scorns’: 1, ‘time,’: 1, “oppressor’s”: 1, ‘wrong,’: 1, ‘proud’: 1, “man’s”: 1, ‘contumely,’: 1, ‘pangs’: 1, “despis’d”: 1, ‘love,’: 1, “law’s”: 1, ‘delay,’: 1, ‘insolence’: 1, ‘office,’: 1, ‘spurns’: 1, ‘patient’: 1, ‘merit’: 1, ‘unworthy’: 1, ‘takes,’: 1, ‘he’: 1, ‘himself’: 1, ‘might’: 1, ‘his’: 1, ‘quietus’: 1, ‘make’: 2, ‘With’: 2, ‘bare’: 1, ‘bodkin?’: 1, ‘these’: 1, ‘fardels’: 1, ‘bear,’: 1, ‘grunt’: 1, ‘sweat’: 1, ‘under’: 1, ‘weary’: 1, ‘life,’: 1, ‘But’: 1, ‘dread’: 1, ‘something’: 1, ‘after’: 1, ‘death,–’: 1, “undiscover’d”: 1, ‘country,’: 1, ‘from’: 1, ‘whose’: 1, ‘bourn’: 1, ‘traveller’: 1, ‘returns,–puzzles’: 1, ‘will,’: 1, ‘rather’: 1, ‘those’: 1, ‘ills’: 1, ‘Than’: 1, ‘fly’: 1, ‘others’: 1, ‘know’: 1, ‘of?’: 1, ‘Thus’: 1, ‘conscience’: 1, ‘does’: 1, ‘cowards’: 1, ‘all;’: 1, ‘thus’: 1, ‘native’: 1, ‘hue’: 1, ‘resolution’: 1, ‘Is’: 1, ‘sicklied’: 1, “o’er”: 1, ‘with’: 1, ‘pale’: 1, ‘cast’: 1, ‘thought;’: 1, ‘enterprises’: 1, ‘great’: 1, ‘pith’: 1, ‘moment,’: 1, ‘regard,’: 1, ‘their’: 1, ‘currents’: 1, ‘turn’: 1, ‘awry,’: 1, ‘lose’: 1, ‘name’: 1, ‘action.–Soft’: 1, ‘you’: 1, ‘now!’: 1, ‘fair’: 1, ‘Ophelia!–Nymph,’: 1, ‘thy’: 1, ‘orisons’: 1, ‘Be’: 1, ‘all’: 1, ‘my’: 1, ‘sins’: 1, “remember’d.”: 1}

上述的代码在一个双层for循环中简单粗暴的解决了问题。如果不给用双层for循环,则需要将其改成两个单层for循环

# common_1.py
from input import word_count_data

words = []
for line in word_count_data:
    words.extend(line.split())
        
wordCount = {}
for word in words:
    wordCount.update({word:wordCount.get(word,0)+1})
        
print(wordCount)       

如果不给显示的使用for循环,有什么办法呢?这儿我们就引入map和reduce。

Map

map(func, *iterables) --> map object
Make an iterator that computes the function using arguments from each of the iterables. Stops when the shortest iterable is exhausted.

简单来说,map会对传入的迭代器(第二个参数)执行处理方法(第一个参数),并将该方法的返回结果放入一个结构中,最后我们可以使用map返回的迭代器逐个访问计算结果。
举个例子:

import sys
source=[1,2,3,4,5,6]
iter=map(lambda x: x+1, source)
while True:
    try:
        print(next(iter))
    except StopIteration:
        sys.exit()

2
3
4
5
6
7

上例中我们给map的处理函数设置为一个匿名函数,它会返回每个遍历数字的自增1的值。
对应到我们单词统计的例子,我们可以使用下面代码,遍历word_count_data每行,然后将其用空格切分出list并返回。这样wordsLists就是“一个元素是一行单词list”的list的迭代器。

from input import word_count_data
wordsLists=map(lambda line: line.split(), word_count_data)

[
[‘To’, ‘be,’, ‘or’, ‘not’, ‘to’, ‘be,–that’, ‘is’, ‘the’, ‘question:–’],
[‘Whether’, “'tis”, ‘nobler’, ‘in’, ‘the’, ‘mind’, ‘to’, ‘suffer’],
……
]

Reduce

functools.reduce(function, iterable[, initializer])
Apply function of two arguments cumulatively to the items of iterable, from left to right, so as to reduce the iterable to a single value. For example, reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) calculates ((((1+2)+3)+4)+5). The left argument, x, is the accumulated value and the right argument, y, is the update value from the iterable. If the optional initializer is present, it is placed before the items of the iterable in the calculation, and serves as a default when the iterable is empty. If initializer is not given and iterable contains only one item, the first item is returned.

它等价于下面的代码

def reduce(function, iterable, initializer=None):
    it = iter(iterable)
    if initializer is None:
        value = next(it)
    else:
        value = initializer
    for element in it:
        value = function(value, element)
    return value

它和map的相同点是:

  • 都需要提供一个处理函数(第一个参数)
  • 处理函数都有一个返回值

不同点是:

  • 处理函数接受两个参数
  • 接受第三个参数作为初始返回数据

直接看一个例子。下面这个例子中匿名函数中y参数是source的某个遍历值;x最开始是初始值100,后来是匿名函数上次执行的返回值。这样下面的结果就相当于100+1+2+3+4+5+6。

from functools import reduce
source=[1,2,3,4,5,6]
r=reduce(lambda x,y: x+y, source, 100)
print(r)

121

对应到单词统计的例子。reduce方法可以将上面list中套list的结构“简化”为一层list。

words=reduce(lambda wordsAll,wordsOneline: wordsAll+wordsOneline, wordsLists, [])

words的值是

[‘To’, ‘be,’, ‘or’, ‘not’, ‘to’, ‘be,–that’, ‘is’, ‘the’, ‘question:–’, ‘Whether’, ……]

然后对这层list做计算,统计每个单词出现的次数,也“缩小”了words说表达的单词所占的“空间”。

wordCount=reduce(lambda wordCount,word: wordCount.update({word:wordCount.get(word,0)+1}) or wordCount, words, {})

{‘To’: 4, ‘be,’: 1, ‘or’: 1, ‘not’: 2, ‘to’: 7, ‘be,–that’: 1, ‘is’: 2, ‘the’: 15,……]

总体来说,map让输入数据被拆解(映射)到最小数据单元;reduce减少数据规模,并最终产出结果。
在这里插入图片描述

参考资料

  • https://docs.python.org/3.10/library/functools.html?highlight=reduce

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1107630.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《Python入门核心技术》专栏总目录

❤️ 专栏名称:《Python入门核心技术》 🌸 内容介绍:基础篇、进阶篇、Web篇、网络爬虫、数据分析、数据可视化、自动化等,适合零基础和进阶的同学。 🚀 订阅专栏:订阅后可阅读专栏内所有内容,专…

易点易动设备管理系统:提升生产企业设备保养效率的利器

在现代生产企业中,设备保养是确保生产线稳定运行和产品质量的关键环节。然而,传统的设备保养方式往往面临效率低下、数据不准确等问题,影响了生产效率和竞争力。随着科技的进步,易点易动设备管理系统应运而生,以其智能…

【Netty专题】【网络编程】从OSI、TCP/IP网络模型开始到BIO、NIO(Netty前置知识)

目录 前言前置知识一、计算机网络体系结构二、TCP/IP协议族2.1 简介*2.2 TCP/IP网络传输中的数据2.3 地址和端口号2.4 小总结 三、TCP/UDP特性3.1 TCP特性TCP 3次握手TCP 4次挥手TCP头部结构体 3.2 UDP特性 四、总结 课程内容一、网络通信编程基础知识1.1 什么是Socket1.2 长连…

kkFileView源码编译并发布详细教程

文章目录 概述为啥要自己进行源码编译我不懂Java代码,可以编译吗为什么写这篇教程 废话不多说,下面是详细操作教程安装JDK安装Git安装Maven编译kkFileView源码 kkFileView安装和使用编译后获得安装包,进行解压修改配置文件执行在线安装&#…

开源贡献难吗?

本文整理自字节跳动 Flink SQL 技术负责人李本超在 CommunityOverCode Asia 2023 上的 Keynote 演讲,李本超根据自己在开源社区的贡献经历,基于他在贡献开源社区过程中的一些小故事和思考,如何克服困难,在开源社区取得突破&#x…

DNS压测工具-dnsperf的安装和使用(centos)

系统调优 系统调优脚本,保存为sh文件,chmod提权后执行即可 #!/bin/sh #系统全局允许分配的最大文件句柄数: sysctl -w fs.file-max2097152 sysctl -w fs.nr_open2097152 echo 2097152 > /proc/sys/fs/nr_open #允许当前会话 / 进程打开文…

1.JDK的安装方法以及环境变量的配置

学习Java的第一步应该从配置环境开始,这篇博文介绍了在哪下载安装包以及如何在windows电脑中配置,希望大家看完后可以独立安装 ~ 文章目录 一、下载安装包二、 安装路径配置三、 环境变量配置四、 验证是否配置成功 一、下载安装包 安装包可以从官网下载…

第三方软件测评单位可为企业带来哪些收益?

随着信息科技的发展,软件市场竞争也越来越大,软件企业为了更好的专注于产品开发,以及保障软件质量,会将软件测试交由第三方软件测评单位进行。 第三方软件测评,顾名思义,是由独立的、与软件开发商无关的专业…

Unity中Shader的XRay透视效果

文章目录 前言一、模拟菲涅尔效果1、获取 V 向量2、获取 N 向量3、点积输出效果4、模拟出菲涅尔效果(中间暗,周围亮) 二、实现 XRay 效果1、使用半透明排序、修改混合模式、加点颜色2、增加分层效果(使用 frac 函数,只取小数部分&…

CAPL如何实现27服务解锁

在文章《CANoe-如何实现27服务解锁》里,我们介绍了诊断控制台中如何实现27解锁,如果我想在CANoe中使用CAPL程序解锁的话,又要如何实现呢? CAPL脚本也是通过模拟手动操作来实现27解锁,所以步骤为: 发送10 03发送27 01接收67 01,获取seed值根据seed值和算法,计算出key值…

自5月以来,俄罗斯Sandworm黑客侵入了11家乌克兰电信公司

导语:据乌克兰计算机应急响应团队(CERT-UA)的最新报告称,自2023年5月至9月,俄罗斯政府支持的黑客组织Sandworm成功侵入了乌克兰的11家电信服务提供商。这一组织被认为与俄罗斯武装部队的GRU有关。 简介 根据乌克兰计算…

提高研发效率还得看Apipost

随着数字化转型的加速,API(应用程序接口)已经成为企业间沟通和数据交换的关键。而在API开发和管理过程中,API文档、调试、Mock和测试的协作显得尤为重要。Apipost正是这样一款一体化协作平台,旨在解决这些问题&#xf…

阿里内推强推的并发编程学习笔记,原理+实战+面试题,面面俱到!

并发编程 谈到并发编程,可能很多人都有过经验,甚至比我了解的更多。 那么并发与并行的区别又是什么? 并发编程是编程中的核心问题,实践中,当人们希望利用计算机处理一些现实世界问题,以及希望同时处理多…

使用DelayQueue的实现延时任务

1、背景 项目中经常会用到类似一些需要延迟执行的功能,比如缓存。java提供了DelayQueue来很轻松的实现这种功能。Delayed接口中的getDelay方法返回值小于等于0的时候,表示时间到达,可以从DelayQueue中通过take()方法取的到期的对象。到期对象…

电液比例负载控制变量泵PQ放大器

对驱动执行元件,仅供应所需最小限度的压力、流量的节能型泵控制系统。与专用功率放大器配合使用。流量和全截流压力按功率放大器的输入电流成比例地进行控制。在实际应用中,该控制系统与专用功率放大器配合使用,可以根据实际需求,…

Python学习基础笔记七十五——Python调用其他程序

Python经常被用来开发自动化程序。自动化程序往往需要调用其他的程序。 例如,我们可以代码中调用wget程序,而不是自己开发下载的代码。 这就是我们经常做的,在我们的Python程序中调佣其它程序,帮我们实现功能。 Python中调用外部…

python项目之网上商城的设计与实现(vue+django)

项目简介 网上商城的设计与实现实现了以下功能: 网上商城系统的开发设计按照系统应用功能划分分为了前端用户和后台用户。 通过网站平台实现首页商品信息的查看,同时可以结合需求进行在线的商品信息搜索,也可以按照系统首页提供的商品类别按…

什么是CSGO大行动,2023年CSGO大行动时间预测

什么是CSGO大行动,2023年CSGO大行动时间预测 什么是CSGO大行动,2023年CSGO大行动时间预测 那天群里在提大行动,不明所以的新同学在问,什么是大行动,是不是官方红锁大行动要来了?当然不是,别自己…

Windows/Linux系统ftp服务器搭建

文章目录 一、Windows系统ftp服务器搭建二、Linux系统ftp服务器搭建二、安装完成测试 一、Windows系统ftp服务器搭建 系统是2008r2,全图,按照图一步一步点就行了 找个有telnet的电脑测试一下端口,windows的ftp默认端口是21 返回…

常用的自动化测试框架

无论是在自动化测试实践,还是日常交流中,经常听到一个词:框架。之前学习自动化测试的过程中,一直对“框架”这个词知其然不知其所以然。 最近看了很多自动化相关的资料,加上自己的一些实践,算是对“框架”…