数据处理脚手架PyODPS入门体验

news2024/12/28 19:13:39

ace9ad1ab3e01fe687fe7e6ec1e8ad4c.gif

本文分享了初次使用PyODPS(Python版的Open Data Processing Service)的心路历程。作者通过实际案例,深入浅出地探讨了PyODPS相较于传统ODPS SQL在数据处理上的灵活性与便捷性,特别是在处理复杂JSON字段统计与多条件筛选方面展现出的独特优势。同时,文章诚实地指出了PyODPS学习曲线陡峭、运行效率较低及文档细节需完善等不足。借助一系列代码示例,作者不仅揭示了PyODPS中DataFrame操作的精髓,还贴心地总结了调试技巧与最佳实践,为读者搭建起一套实用的数据处理脚手架。

96b5e86dd87c8e89b2a83a8c86e1ecaa.png

背景介绍

刚开始接触ODPS时,最初有一个需求比较简单,通过ODPS SQL的方式很快得到了解决。

不过最近收到了一个稍微棘手一点的数据处理需求:

  • 统计某些展厅的uv,展厅商品的uv,计算一个比例

  • 统计展厅中某一个json字段内,满足某些条件的数量统计

这里先总结一下PyODPS的优势

  • 灵活的row handle,能灵活地进行数据处理。事实上,需求中也需要对一个json对象进行统计分析,这点上用SQL会非常痛苦。

  • 可以全量加载内容比较少的表、文件资源,降低表处理逻辑上的复杂性。而SQL在这点上没有优势,只能疯狂的join。

  • 优秀的可配置能力,比如说在我这个需求中出现了需要hardCode配置的多关键字过滤

  • 复用SQL处理逻辑,在我的场景里,我需要统计总的比例,与最近15天的比例。但统计逻辑是一样的,不同的是数据的范围~

劣势也很明显:

  • 基本就是写SQL的思路写python。DataFrame基本就是以SQL的表达对数据处理的封装。

  • 运行贼慢,每次调试时间很久。

  • 不能说文档不全面,但是很多语法编译都能过,实际运行没效果。

针对pyodps与python的区别, 我用一段条件判断代码来做个解释:

# 这段是生效的,最后的sql,包含where key in (?) and source == "A"
uv_table = visit_table[
    visit_table.key.isin(target_key_list) \
    & (visit_table.source == "A")
].groupby(visit_table.target_id)
# 这段是有问题的。最后的sql,只有where source == "A"
uv_table = visit_table[
    visit_table.key.isin(target_key_list)
    and (visit_table.source == "A")
].groupby(visit_table.target_id)
# 这段也是ok的,看起来是官方文档中推荐的写法
uv_table = visit_table[
    visit_table.key.isin(target_key_list) & (visit_table.source == "A")
].groupby(visit_table.target_id)
# 这段也是ok的,这里换行没有任何问题,也就是说\可加可不加。
uv_table = visit_table[
    visit_table.key.isin(target_key_list)
    & (visit_table.source == "A")
].groupby(visit_table.target_id)
# 这段就是不行的,会丢失in。对应到SQL就是 where true and source == "A"
uv_table = visit_table[
    visit_table.key in target_key_list & (visit_table.source == "A")
].groupby(visit_table.target_id)

上面的代码示例,全部都可以正常编译且执行,但是从结果上来说却大有不同:

  • 在PyOdps对象中,使用了python语言特性的判断条件,如a in a_list、a is None这类逻辑均不会生效,会被忽略。取而代之,应该使用a.isin(a_list)、a.isnull()这样的pyodps方法。

所以先解释下为啥拿判断条件开头:已经被坑了n次了,编译全过,运行完成,但结果却经常没生效某一些条件,导致来来回回全文检查。甚至我感觉这个是目前来说最容易踩坑的点。

最后推荐的判断条件写法如下:

uv_table = visit_table[
    (visit_table.key.isin(target_key_list))
    & (visit_table.source == "A")
].groupby(visit_table.target_id)

每个判断条件均用()包裹,并换行or不换行&与、|或、~非,分割条件。

从这个点延伸开,我们已经发现了,PyODPS中,有两种思路。一种是面向DataFrame而另一种则是面向纯Python。

正常来说通篇均为面向DataFrame,除了以下情况:

  • 通过TableReader、table.head(10)等方法将表数据读取为python的list对象数据。后续的处理逻辑均需要用python去解决。

  • @output代码处理逻辑,全部为python的能力去解决。

  • class Agg,这种自定义聚合代码,均为Python的逻辑进行处理。

而所有与DataFrame相关的逻辑,都必须查文档来处理,比如说对json的处理,我们就需要使用df.func.get_json_obj(table_name.field),而不能使用python的json.loads()。

数据的空判断则需要用a.isnull()或者a.notnull()等方法。

pyodps文档:https://pyodps.readthedocs.io/zh-cn/stable/api-df.html

写完了脚本回来一看就有种理所当然的感觉~不得不说设计上还是巧妙的。

但是这里不得不提一个点:

  • PyODPS如果用了错误的方式调用,则也不会错误,必须仔细检查我们的SQL。是否达到我们预期的想法。

所以调试我们的PyODPS,就是重中之重

同时,对于去重来说,官方文档的方法好像是有问题的。

# 这段只会提示no field in XXXXGroupBySequence(具体是啥记不住了)
show_room_uv = show_room_uv.agg(show_room_uv=show_room_uv.visitor_id.unique())


# 反复验证后,正确的去重计数是nunique()

吐槽结束,接下来开始本期的重点。

PyODPS开发的基本脚手架

咱们的这个数据处理的功能非常适合以一个基础的脚手架起步~
这里我根据自己的开发经验总结了一个:

from odps.df import DataFrame, Scalar, func, output


# args也是一个内置对象,就是我们在调度配置中的参数
bizdate = args["bizdate"]


output_table = "xxxx"
# 加载我们的数据表。o是一个内置对象。也有o.get_table("xxx").to_df()的写法。
data_process_table = DataFrame(o.get_table("xxxx"))


# 加载我们的数据
import json
filters_words = []
# filters_words.txt就是我们放在MaxCompute -> 资源下的文件。
with o.get_resource('filters_words.txt').open('r', encoding='utf-8') as f:
    filters_words = json.loads(f.read())
# 这里就是odps语法了,这里等同于 where content in (?, ?)。包括说content is null,就是content.isnull()。
# 在DataFrame的范围内,需要遵从官方的API。
data_process_table[
    data_process_table.content.isin(filters_words)
]
# 如果要like怎么处理呢?
data_process_table = data_process_table.query(
    " or ".join([f"content.contains('{x}')" for x in filters_words])
)


@output(["content_len"], ["int64"])
def handle(row):
    # 这里是按行处理数据。如果要做reduce之类的多行处理,要通过agg自定义聚合的逻辑。
    # python的数据处理
    yield len(row.content)


# 很有意思的列处理,这个操作相当于handle处理完多了一列content_len。
# 另外我们可以理解每次[]处理完之后,是拿到了一个新的DataFrame对象。
res_t = data_process_table[
    data_process_table,
    data_process_table.apply(handle, axis = 1)
]
# 这一部分是后补的,纯手撕代码。
class Agg(object):
    def buffer(self):
        # 定义你心仪的聚合结果对象。自定义聚合的本质就是将结果加到这个buffer对象里
        return {
            "merge_length": 0
        }
    def __call__(self, buffer, content_len):
        if content_len is not None:
            # 当前聚合对象数据合并。数据被分成了无数个小片,这是其中一片的n条数据聚合
            buffer["merge_length"] += content_len


    def merge(self, buffer, pbuffer):
        # 和其他的聚合对象进行合并~
        buffer["merge_length"] += pbuffer["merge_length"]


    def getvalue(self, buffer):
        return buffer["merge_length"]
# 调用聚合方法
to_agg = agg(
        [
            # output输出的新字段,我们作为聚合的value去处理
            res_t.content_len
        ],
        Agg,
        rtype="int64",
    )
# 用id去做聚合,对content_len的值进行运算,最后输出一个新字段value
res_t = res_t.groupby("id").agg(value=to_agg)
# 此时res_t的列有 id、value,两个字段。
# 调试用,看看数据,最后换成persist持久化到output表里。
res_t.head(10)
# 最后要写数据库了,直接用下面的方法.
# res_t.persist(output_table, partition=f"ds='{bizdate}'", drop_partition=True, create_partition=True)

在总结脚手架的时候,不得不说PyODPS是一个精妙的设计,估计是再也回不去写SQL的日子了。

PyODPS核心思想就两点:

  • 在DataFrame中做列处理和聚合。删除列,按条件过滤,整列计算。

  • 在handle中做行处理,同时定义按行处理后的输出列。难以分析的学习,直接用代码分析~

核心文档,写的过程中还是需要不断借鉴:

  • 列运算

  • 聚合操作,里面的unique应该是过期了,用nunique。一旦聚合后就是一个GroupBy对象,需要调用agg对聚合结果处理后,回到DataFrame

另外还得吐槽一句,确实很难写。

# 看着是不是没问题。直接报错.agg Syntax Error
closely_count_table = data_process_table.groupby('content_len')
.agg(closely_count = data_process_table.content_len)

这个写法里有两个问题:

  • 第一个是我怎么都摸不清的换行问题,即使不是这个情况,有的时候换行就会解析不了,包括条件判断。

  • 第二个呢,就是对象问题,agg函数的入参应该是一个GroupBy对象,而不是DataFrame。

但是,自定义聚合连着写又没啥问题,只能说最终解释权都在PyOdps。所以这里这么写是最保险的。

closely_count_table = data_process_table.groupby('content_len')
closely_count_table = closely_count_table.agg(closely_count = closely_count_table.content_len)

即使同为DataFrame也有一样的问题,不要妄想用多个[][]来完成多次处理。第一个[]内可以用当前的DataFrame,但第二个[]就不一样了,它需要的是第一个[]返回的DataFrame对象。举个例子:

# 过滤了content_len小于等于10的数据,并输出content.
# 但这个大概率是错的。虽然我没试过。
data_process_table = data_process_table[
    data_process_table.content_len > 10
][
    data_process_table.content
]
# 你可以这么写,因为过滤content_len的DataFrame仍然有content字段,通过字符是可以取出来的。
data_process_table = data_process_table[
    data_process_table.content_len > 10
]["content"]
# 保守写法
data_process_table = data_process_table[data_process_table.content_len > 10]
data_process_table = data_process_table[data_process_table.content]

关于list type:

@output(
    [
        "list_value"
    ],
    ["list<string>"]
)
def handle_list_type(row):
    yield [["test1", "test2"]]

试了很多次才得到这个结果。看到结果的瞬间一下次就想明白了。

用这个例子做个解释:

@output(
    [
        "int_value",
        "string_value"
    ],
    ["int64", "string"]
)
def handle_list_type(row):
    yield 10, "test"

这里的10, "test"是一个元组,恐怕用了list()之类的方法对返回进行了包装。

我最初直接返回["test1", "test2"]的情况下,等同于返回2个string。

所以必须再包一层。想明白了这个原理,那么下面的写法会更加优雅:

@output(
    [
        "list_value"
    ],
    ["list<string>"]
)
def handle_list_type(row):
    res = ["test1", "test2"]
    yield res, #这里有一个逗号
结语

PyODPS的列处理与聚合功能、行处理自定义逻辑,为大数据处理提供了新的视角和工具,让作者乃至更多开发者在告别纯SQL编写的同时,开启了数据处理的新篇章。总之,拥抱变化,勇于实践,PyODPS的潜力等待着每一位数据工程师去挖掘。

bf7e13a66fa46d5299256e1b292d965f.png

团队介绍

我们是淘天集团的场景智能技术团队,作为一支专注于通过AI和3D技术驱动商业创新的技术团队, 依托大淘宝丰富的业务形态和海量的用户、数据, 致力于为消费者提供创新的场景化导购体验, 为商家提供高效的场景化内容创作工具, 为淘宝打造围绕家的场景的第一消费入口。我们不断探索并实践新的技术, 通过持续的技术创新和突破,创新用户导购体验, 提升商家内容生产力, 让用户享受更好的消费体验, 让商家更高效、低成本地经营。

¤ 拓展阅读 ¤

3DXR技术 | 终端技术 | 音视频技术

服务端技术 | 技术质量 | 数据算法

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2130479.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于verilog的流水线CPU(无中断异常)

突然发现这个还没传&#xff0c;懒得写了&#xff0c;直接把实验报告传上来吧。 流水线CPU实验报告 ​ 本实验最终实现了11条指令&#xff0c;完成了2位控制的分支预测&#xff0c;以及load-use的阻塞&#xff0c;jump的阻塞&#xff0c;beq预测失败的阻塞&#xff0c;还有RA…

《2024中国数据要素产业图谱2.0版》重磅发布

数据猿出品 本次“数据猿2024年度三大媒体策划活动——《2024中国数据要素产业图谱2.0版》”的发布&#xff0c;下一次版本迭代将于2024年12月底发布2024年3.0版&#xff0c;敬请期待&#xff0c;欢迎报名。 大数据产业创新服务媒体 ——聚焦数据 改变商业 随着技术不断革新&a…

C++函数在库中的地址

本文讲述C如何直接调用动态库dll或者so中的函数。 首先我们准备一个被调用库&#xff0c;这个库里面有两个函数&#xff0c;分别是C98 与 C11 下的&#xff0c;名称是run2和run1。 被调用库 相关介绍请看之前的文章《函数指针与库之间的通信讲解》。 //dll_ex_im.h #ifndef…

【通俗理解】二项分布的均值与方差——从成功与失败的概率看分布

【通俗理解】二项分布的均值与方差——从成功与失败的概率看分布 关键词提炼 #二项分布#均值#方差#成功概率#失败概率#伯努利试验 公式解释与案例 二项分布的基本公式 二项分布描述的是在n次独立重复的伯努利试验中&#xff0c;成功次数的概率分布。每次试验的成功概率为p&…

【Android安全】Ubuntu 16.04安装GDB和GEF

1. 安装GDB sudo apt install gdb-multiarch 2. 安装GEF(GDB Enhanced Features) 官网地址&#xff1a;https://github.com/hugsy/gef 2.1 安装2021.10版本 但是在Ubuntu 16.04上&#xff0c;bash -c "$(curl -fsSL https://gef.blah.cat/sh)"等命令不好使&…

如何用 OBProxy 实现 OceanBase 的最佳路由策略

引言 OBProxy&#xff0c;即OceanBase Database Proxy&#xff0c;也简称为ODP&#xff0c;是 OceanBase数据库的专属服务代理。通过应用OBProxy&#xff0c;由后端OceanBase集群的分布式特性所带来的复杂性得以屏蔽&#xff0c;从而使得访问分布式数据库的体验如同访问单机数…

linux上用yolov8训练自己的数据集(pycharm远程连接服务器)

pycharm如何远程连接服务器&#xff0c;看之前的文章 首先去GitHub上下载项目地址&#xff0c;然后下载预训练模型放到项目主目录下 然后下载数据集&#xff0c;我这有个推荐的数据集下载网站&#xff0c;可以直接下载yolov8格式的数据集&#xff08;还支持其他格式的数据集&a…

进程间通信-命名管道

目录 原理 代码 简单通信 回归概念 原理 mkfifo 是 Linux 系统中的一个命令&#xff0c;用于创建命名管道&#xff08;named pipe&#xff09;&#xff0c;也称为 FIFO&#xff08;First In, First Out&#xff09;。命名管道是一种特殊类型的文件&#xff0c;用于进程间通…

从0到1!本地部署一个大语言模型!完整方法!

要想从零开始部署一个**大语言模型&#xff08;LLM&#xff09;**到本地&#xff0c;不仅仅是硬件上安装软件包&#xff0c;还需要对模型选择、优化和应用搭建有一定的理解。下面是一份完整教程&#xff0c;手把手带你走过如何在本地环境中部署LLM。 1. 了解部署需求与硬件准备…

交换机链路聚合

一、概述 通过链路聚合&#xff0c;可以提高链路的可靠性&#xff0c;提升链路带宽。链路具有一般有手工模式和LACP模式。 二、拓扑图 三、实操演练 1、手工模式 手工模式一般用于老旧、低端设备。 缺点&#xff1a; &#xff08;1&#xff09;为了使链路聚合接口正常工作…

brew install node提示:Error: No such keg: /usr/local/Cellar/node

打开本地文件发现Cellar目录下无法生成 node文件&#xff0c;应该是下载时出现问题&#xff0c;重复下载无法解决问题&#xff0c;只能重新安装brew。 步骤1&#xff08;安装 brew&#xff09;&#xff1a; /bin/zsh -c “$(curl -fsSL https://gitee.com/cunkai/HomebrewCN/ra…

打造高效实时数仓,从Hive到OceanBase的经验分享

本文作者&#xff1a;Coolmoon1202&#xff0c;大数据高级工程师&#xff0c;专注于高性能软件架构设计 我们的业务主要围绕出行领域&#xff0c;鉴于初期采用的数据仓库方案面临高延迟、低效率等挑战&#xff0c;我们踏上了探索新数仓解决方案的征途。本文分享了我们在方案筛选…

uniapp离线(本地)打包

安卓离线打包 注意&#xff1a;jdk建议选择1.8 下载Android Studio配置gradle仓库地址 第一步&#xff1a;先下载对应的版本&#xff0c;进行压缩包解压 第二步&#xff1a;在电脑磁盘&#xff08;D盘&#xff09;&#xff0c;创建文件夹存放压缩包并进行解压&#xff0c;并创…

8.8canny算子检测

目录 实验原理 示例代码 运行结果 实验原理 在OpenCV中&#xff0c;Canny边缘检测是一种广泛使用的边缘检测算法。它是由John F. Canny在1986年提出的&#xff0c;并且因其性能优良而被广泛应用。在OpenCV中&#xff0c;Canny边缘检测是通过Canny函数实现的。 函数原型 v…

【爬虫软件】小红书按关键词批量采集笔记,含笔记正文、转评赞藏等!

一、背景介绍 1.1 爬取目标 熟悉我的小伙伴都了解&#xff0c;我之前开发过2款软件&#xff1a; 【GUI软件】小红书搜索结果批量采集&#xff0c;支持多个关键词同时抓取&#xff01; 【GUI软件】小红书详情数据批量采集&#xff0c;含笔记内容、转评赞藏等&#xff01; 现在…

HuggingFists算子能力扩展-PythonScript

HuggingFists作为一个低代码平台&#xff0c;很多朋友会关心如何扩展平台算子能力。扩展平台尚不支持的算子功能。本文就介绍一种通过脚本算子扩展算子能力的解决方案。 HuggingFists支持Python和Javascript两种脚语言的算子。两种语言的使用方式相同&#xff0c;使用者可以任选…

C++速通LeetCode简单第3题-相交链表

简单解&#xff1a; /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode(int x) : val(x), next(NULL) {}* };*/ class Solution { public:ListNode *getIntersectionNode(ListNode *headA, ListNode *headB) {Li…

ACL-latex模板中参考文献出现下划线---由于宏包的冲突

% \usepackage{ulem} %加入后造成参考文献有下划线&#xff0c;正常情况是没有的。 别的包也可能造成此情况&#xff0c;可以仔细检查。 如下图所示&#xff1a; \usepackage{ulem}在LaTeX中的作用主要是提供了一系列用于文本装饰和强调的命令。ulem宏包由Donald Arseneau…

移动订货小程序哪个好 批发订货系统源码哪个好

订货小程序就是依托微信小程序的订货系统&#xff0c;微信小程序订货系统相较于其他终端的订货方式&#xff0c;能够更快进入商城&#xff0c;对经销商而言更为方便。今天&#xff0c;我们一起盘点三个主流的移动订货小程序&#xff0c;看看哪个移动订货小程序好。 第一、核货宝…

Redis搭建集群

功能概述 Redis Cluster是Redis的自带的官方分布式解决方案&#xff0c;提供数据分片、高可用功能&#xff0c;在3.0版本正式推出。 使用Redis Cluster能解决负载均衡的问题&#xff0c;内部采用哈希分片规则&#xff1a; 基础架构图如下所示&#xff1a; 图中最大的虚线部分…