0基础学习PyFlink——使用PyFlink的SQL进行字数统计

news2025/1/9 1:45:41

在《0基础学习PyFlink——Map和Reduce函数处理单词统计》和《0基础学习PyFlink——模拟Hadoop流程》这两篇文章中,我们使用了Python基础函数实现了字(符)统计的功能。这篇我们将切入PyFlink,使用这个框架实现字数统计功能。

PyFlink安装

安装Python

sudo apt install python3.10
sudo ln -s /usr/bin/python3.10 /usr/bin/python

安装虚拟环境

sudo apt install python3.10-venv

创建工程所在文件夹,并创建虚拟环境

mkdir pyflink-test
cd pyflink-test
python -m venv .env

进入虚拟环境,并安装PyFlink

source .env/bin/activate
pip3.10 install apache-flink

统计代码

Flink为开发者提供了如下不同层级的抽象。本篇我们将尽量使用SQL来实现功能。
在这里插入图片描述

创建环境

执行环境用于设置任务的属性(batch还是stream),以及一些运行时参数(parallelism.default等)。
和Hadoop不同的是,Flink是流批一体(既可以处理流,也可以处理批处理)的引擎,而前者是批处理引擎。
批处理很好理解,即给一批数据,我们一次性、成批处理完成。
而流处理则是指,数据源源不断进入引擎,没有尽头。
本文不对此做过多展开,只要记得本例使用的是批处理模式(in_batch_mode)即可。

import argparse
import logging
import sys

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment)

def word_count(input_path):
    config = Configuration()
    # write all the data to one file
    config.set_string('parallelism.default', '1')
    env_settings = EnvironmentSettings \
        .new_instance() \
        .in_batch_mode() \
        .with_configuration(config) \
        .build()
    
    t_env = TableEnvironment.create(env_settings)

Source

在前两篇文章中,我们使用内存中的常规结构体,如dict等来保存Map过后的数据。而本文介绍的SQL方式,则是通过Table(表)的形式来存储,即输入的数据会Map到一张表中

    # define the source
    my_source_ddl = """
            create table source (
                word STRING
            ) with (
                'connector' = 'filesystem',
                'format' = 'csv',
                'path' = '{}'
            )
        """.format(input_path)
    t_env.execute_sql(my_source_ddl).print()
    tab = t_env.from_path('source')

这张表只有一个字段——String类型的word。它用于记录被切分后的一个个字符串。
这儿有个关键字with。它可以用于描述数据读写相关信息,即完成数据读写相关的设置。
connector用于指定连接方式,比如filesystem是指文件系统,即数据读写目标是一个文件;jdbc则是指一个数据库,比如mysql;kafka则是指一个Kafka服务。
format用于指定如何把二进制数据映射到表的列上。比如CSV,则是用“,”进行列的切割。

Execute

    # execute insert
    my_select_ddl = """
        select word, count(1) as `count`
        from source
        group by word
    """
    t_env.execute_sql(my_select_ddl).wait()

上述SQL我们按source表中的word字段聚类,统计每个字符出现的个数。
完整输出如下

Using Any for unsupported type: typing.Sequence[~T]
No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
OK
+--------------------------------+----------------------+
|                           word |                count |
+--------------------------------+----------------------+
|                              A |                    3 |
|                              B |                    1 |
|                              C |                    2 |
|                              D |                    2 |
|                              E |                    1 |
+--------------------------------+----------------------+
5 rows in set

完整代码

# sql_print.py
import argparse
import logging
import sys

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment)

def word_count(input_path):
    config = Configuration()
    # write all the data to one file
    config.set_string('parallelism.default', '1')
    env_settings = EnvironmentSettings \
        .new_instance() \
        .in_batch_mode() \
        .with_configuration(config) \
        .build()
    
    t_env = TableEnvironment.create(env_settings)

    # define the source
    my_source_ddl = """
            create table source (
                word STRING
            ) with (
                'connector' = 'filesystem',
                'format' = 'csv',
                'path' = '{}'
            )
        """.format(input_path)
    t_env.execute_sql(my_source_ddl).print()
    tab = t_env.from_path('source')
    
    my_select_ddl = """
        select word, count(1) as `count`
        from source
        group by word
    """
    t_env.execute_sql(my_select_ddl).print()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to process.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    word_count(known_args.input)

测试的输入文件

“A”,
“B”,
“C”,
“D”,
“A”,
“E”,
“C”,
“D”,
“A”,

运行的指令是

python sql_print.py --input input1.csv

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/zh/docs/concepts/overview/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1125682.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深入解析i++和++i的区别及性能影响

在我们编写代码时,经常需要对变量进行自增操作。这种情况下,我们通常会用到两种常见的操作符:i和i。最近在阅读博客时,我偶然看到了有关i和i性能的讨论。之前我一直在使用它们,但从未从性能的角度考虑过,这…

微信小程序——后台交互

目录 后台准备 pom.xml 配置数据源 整合mtbatis 前后端交互 method1 method2 后台准备 pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org…

第二章 16位微处理器

考点1&#xff1a;功能结构类型、寄存器结构的类型 8088/86寄存器的总结 有8个8位、8个16位通用寄存器 有6个状态标志和3个控制标志 将1MB存储空间分段管理,有4个段寄存器【CPU中】&#xff0c;对应程序中4种逻辑段【段首地址】 默认【DS】的情况允许改变,需要使用段超越前…

nginx安装详细步骤和使用说明

下载地址&#xff1a; https://download.csdn.net/download/jinhuding/88463932 详细说明和使用参考&#xff1a; 地址&#xff1a;http://www.gxcode.top/code 一 nginx安装步骤&#xff1a; 1.nginx安装与运行 官网 http://nginx.org/1.1安装gcc环境 # yum install gcc-c…

k8s-----9、pod 影响调度的因素

资源限制和节点选择器 1、pod资源限制2、节点选择器&#xff08;pod属性&#xff09;3、 节点亲和性(nodeAffinity)&#xff08;pod属性&#xff09;3.1 硬和软亲和性3.2 反亲和性 4、污点&#xff08;taint&#xff09;和污点容忍&#xff08;节点属性&#xff09;4.1 定义4.2…

Janus: Data-Centric MoE 通讯成本分析(2)

文章链接&#xff1a;Janus: A Unified Distributed Training Framework for Sparse Mixture-of-Experts Models 发表会议: ACM SIGCOMM 2023 (计算机网络顶会) 系统学习&#xff1a;Janus: 逆向思维&#xff0c;以数据为中心的MoE训练范式&#xff08;1&#xff09; 目录 前…

【Javascrpt】比较,逻辑运算符

目录 比较运算符 逻辑运算符 &&(与&#xff09; ||&#xff08;或&#xff09; 两真&#xff08;||左侧为真&#xff0c;||右侧为真&#xff09; 两假&#xff08;||左侧为假&#xff0c;右侧为假&#xff09; 一真一假&#xff08;||一侧为假&#xff0c;另一侧为…

接口自动化必学的20个难点,学完至少涨5k

难点1&#xff1a;接口文档的不完整性 当我开始设计接口自动化测试用例时&#xff0c;我发现接口文档非常不完整。有些必要的字段没有说明&#xff0c;有些接口没有文档&#xff0c;这给我带来了很大的困难。 解决方案&#xff1a;与开发人员进行沟通&#xff0c;尽可能补充接口…

【Java集合类面试十】、HashMap中的循环链表是如何产生的?

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 面试官&#xff1a;HashMap中的循环链表是如…

单片机矩阵键盘

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、什么是矩阵键盘&#xff1f;1.独立键盘2.矩阵键盘变化1变化2变化3 3. 通过变型&#xff0c;举一反三&#xff0c;就可以实现4*4的矩阵键盘扫描 二、使用步骤…

一文教你学会使用Cron表达式定时备份MySQL数据库

各位小伙伴大家好&#xff0c;今天我就来讲述一下作为一个运维&#xff0c;如何解放自己的双手去让服务器定时备份数据库数据&#xff0c;防止程序操作数据库出现数据丢失。 mysql_dump_script.sh脚本文件 #!/bin/bash#保存备份个数&#xff0c;备份7天数据 number7 #备份保存…

使用Kali进行实验---主机发现

主机发现 【实训目的】 掌握主机扫描的工作原理&#xff0c;学会使用ping等扫描工具&#xff0c;发现网络当中活跃的主机。 【场景描述】 在虚拟机环境下配置4个虚拟系统“Win XP1” “Win XP2” “Kali Linux”和“Metasploitable2”&#xff0c;使得4个系统之间能够相互通…

数据结构与算法第一课

文章目录 数据结构什么是数据结构数据结构相关单位 物理结构与逻辑结构物理结构逻辑结构 算法一个案例认识算法的重要性时间复杂度与空间复杂度时间复杂度事前计算时间复杂度与事后判断时间复杂度函数的渐进式变化常见函数的时间复杂度最坏时间与平均时间 空间复杂度 先看下目录…

使用vite搭建前端项目

1、在vscode 终端那里执行创建前端工程项目&#xff0c;其中shop-admin为项目名称&#xff1a; npm init vite-app shop-admin 提示如需安装其他依赖执行npm install ....,否则忽略(第三步再讲)。 2、执行npm run dev 命令直接运行创建好的项目&#xff0c;在浏览器打开链接…

变量的引用

1、变量 和 数据 都是保存在 内存 中的 2、在 Python 中 函数的参数传递以及返回值都是靠引用传递的 引用的概念 在 Python 中变量 和 数据 是分开存储的&#xff1b;数据 保存在内存中的一个位置&#xff1b;变量 中保存着数据在内存中的地址&#xff1b;变量 中 记录数据的…

初步认识Java

文章目录 一、什么是Java1、Java语言的重要性2、Java语言的发展史3、Java语言的特性 二、初识Java的main方法1、main方法示例2、运行Java程序3、JDK、JRE、JVM之间的关系 三、注释四、标识符 一、什么是Java Java是一种优秀的程序设计语言&#xff0c;它具有令人赏心悦目的语法…

依靠继承与聚合,实现maven搭建分布式项目

简介聚合 对于复杂的Maven项目&#xff0c;一般建议采用多模块的方式来设计开发&#xff0c;便于后期维护管理。但是构建项目时&#xff0c;如果每次都需要按模块一个一个进行构建会十分麻烦&#xff0c;而Maven的聚合功能就可以很好的解决这个问题&#xff0c;当用户对聚合模…

软件企业知识库应用场景?如何搭建软件企业知识库?

想要减少人工干预、减少不必要的时间和人力成本、快速获取准确信息……这些应用场景对于我们企业来说是非常渴望在短期内实现的。 软件企业知识库 因为传统知识库仅仅是存储&#xff1a;知识只是“存储”&#xff0c;根本用不起来&#xff0c;缺乏有效的管理方式和储存载体&am…

【项目设计】网络对战五子棋(下)

我不再装模作样地拥有很多朋友&#xff0c;而是回到了孤单之中&#xff0c;以真正的我开始了独自的生活。有时我也会因为寂寞而难以忍受空虚的折磨&#xff0c;但我宁愿以这样的方式来维护自己的自尊&#xff0c;也不愿以耻辱为代价去换取那种表面的朋友。 文章目录 一、项目设…

理解Hash表

注&#xff1a;本文翻译自 https://www.baeldung.com/cs/hash-tables 1 介绍 有效管理数据的技术是计算机科学的传统热点。除了存储数据之外&#xff0c;从存储中高效地恢复数据是另一个相关问题。 即使使用最好的算法处理某些特定的数据&#xff0c;如果没有优化数据管理&a…