一、说明
STream 处理支持在传输中和存储之前对数据进行实时分析,并且可以是有状态的,也可以是无状态的。
有状态流处理用于实时建议、模式检测或复杂事件处理,其中处理需要已发生事件的历史记录(窗口、按键连接等)。
无状态流处理用于内联转换,不需要了解流中的其他数据点,例如屏蔽电子邮件或转换类型。
总体而言,数据流在工业中被广泛使用,并且可以应用于欺诈检测、患者监控或事件预测维护等用例。
二、 数据流必须考虑关键是数据的质量
与通常在创建数据仓库或仪表板解决方案期间评估数据质量的传统模型不同,流数据需要持续监视。
在从收集到馈送下游应用程序的整个过程中保持数据质量至关重要。毕竟,对于组织来说,糟糕的数据质量的成本可能很高。
在本文中,我们将向您展示如何结合以分析和提高流媒体流的质量!bytewax
ydata-profiling
三、使用 Bytewax 为数据专业人员提供流处理
是专门为Python开发人员设计的OSS流处理框架。
它允许用户构建具有类似于Flink,Spark和Kafka Streams功能的流数据管道和实时应用程序,同时提供友好和熟悉的界面以及与Python生态系统的100%兼容性。
使用内置连接器或现有的 Python 库,您可以连接到实时和流数据源(Kafka、RedPanda、WebSocket 等),并将转换后的数据写入各种下游系统(Kafka、拼花地板文件、数据湖等)。
对于转换,Bytewax 通过映射、窗口和聚合方法促进有状态和无状态转换,并具有恢复和可伸缩性等熟悉的功能。
Bytewax 促进了 Python 优先和以数据为中心的数据流体验,并且是为数据工程师和数据科学家构建的。它允许用户构建流数据管道和实时应用程序,并创建满足其需求所需的自定义项,而无需学习和维护基于 JVM 的流平台,如 Spark 或 Flink。
Bytewax 非常适合许多用例,即为生成 AI 嵌入管道、处理数据流中的缺失值、在流上下文中使用语言模型来理解金融市场等等。有关用例灵感和更多信息,如文档、教程和指南,请随时查看字节蜡网站。
四、为什么要对数据流进行数据剖析?
数据剖析是成功启动任何机器学习任务的关键,指的是彻底了解数据的步骤:其结构、行为和质量。
简而言之,数据分析涉及分析与数据格式和基本描述符相关的方面(例如,样本数量、特征的数量/类型、重复值)、其内在特征(例如存在缺失数据或不平衡的特征)以及在数据收集或处理过程中可能出现的其他复杂因素(例如,错误值或不一致的特征)。
确保高数据质量标准对所有领域和组织都至关重要,但对于使用输出连续数据的域运营的领域尤其重要,其中情况可能会快速变化,可能需要立即采取行动(例如,医疗保健监测、股票价值、空气质量政策)。
对于许多领域,从探索性数据分析的角度使用数据分析,考虑存储在数据库中的历史数据。相反,对于数据流,数据分析对于沿流持续验证和质量控制变得至关重要,需要在流程的不同时间范围或阶段检查数据。
通过将自动分析嵌入到我们的数据流中,我们可以立即获得有关数据当前状态的反馈,并收到任何潜在关键问题的警报 - 无论是与数据一致性和完整性有关(例如,损坏的值或更改格式),还是与短时间内发生的事件(例如,数据漂移, 偏离业务规则和结果)。
在现实世界的领域——你只知道墨菲定律一定会发生,“一切都可能出错”——自动分析可能会让我们免于多个大脑难题和需要停止生产的系统!
在涉及数据剖析方面,无论是表格数据还是时间序列数据,它一直是大众的最爱。难怪为什么 - 它是一组广泛的分析和见解的一行代码。ydata-profiling
复杂且耗时的操作是在后台完成的:ydata 分析会自动检测数据中包含的特征类型,并根据特征类型(数字或分类)调整分析报告中显示的汇总统计数据和可视化效果。
该软件包促进了以数据为中心的分析,还突出了特征之间的现有关系,重点关注它们的成对交互和相关性,并提供了对数据质量警报的全面评估,从重复或常量值到偏斜和不平衡的特征。
它实际上是我们数据质量的 360º 视图 - 只需最少的努力。
分析报告:突出显示潜在的数据质量问题。图片由作者提供。
五、把所有东西放在一起:字节蜡和ydata-profile。
在开始项目之前,我们需要先设置 python 依赖项并配置数据源。
首先,让我们安装 和 软件包(您可能希望为此使用虚拟环境 - 如果您需要一些额外的指导,请查看这些说明!bytewax
ydata-profiling
pip install bytewax==0.16.2 ydata-profiling==4.3.1
然后,我们将上传环境传感器遥测数据集(许可证 — CC0:公共域),其中包含来自不同 IoT 设备的温度、湿度、一氧化碳液化石油气、烟雾、光线和运动的多个测量值: 在生产环境中,这些测量将由每个设备连续生成,输入看起来像我们在 Kafka 等流媒体平台中期望的。在本文中,为了模拟我们在流数据中找到的上下文,我们将一次一行地从 CSV 文件中读取数据,并使用字节蜡创建数据流。
(作为快速旁注,数据流本质上是一个数据管道,可以描述为有向无环图 — DAG)
首先,让我们进行一些必要的导入:
from datetime import datetime, timedelta, timezone
from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput
from bytewax.connectors.files import CSVInput
from bytewax.testing import run_main
map 方法将以无状态的方式对每个数据点进行更改。我们修改数据形状的原因是,我们可以在接下来的步骤中轻松地对数据进行分组,以单独分析每个设备的数据,而不是同时分析所有设备的数据。
flow = Dataflow()
flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000"))
# parse timestamp
def parse_time(reading_data):
reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc)
return reading_data
flow.map(parse_time)
# remap format to tuple (device_id, reading_data)
flow.map(lambda reading_data: (reading_data['device'], reading_data))
现在,我们将利用有状态功能,在我们定义的时间段内收集每个设备的数据。 需要一段时间内的数据快照,这使得窗口运算符成为执行此操作的完美方法。
在bytewax
ydata-profiling
中,我们能够为为特定上下文指定的数据帧生成汇总统计信息。例如,在我们的示例中,我们可以生成引用每个物联网设备或特定时间框架的数据快照:ydata-profiling
from bytewax.window import EventClockConfig, TumblingWindow
# This is the accumulator function, and outputs a list of readings
def acc_values(acc, reading):
acc.append(reading)
return acc
# This function instructs the event clock on how to retrieve the
# event's datetime from the input.
def get_time(reading):
return reading["ts"]
# Configure the `fold_window` operator to use the event time.
cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30))
# And a tumbling window
align_to = datetime(2020, 1, 1, tzinfo=timezone.utc)
wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1))
flow.fold_window("running_average", cc, wc, list, acc_values)
flow.inspect(print)
定义快照后,利用就像为我们要分析的每个数据帧调用 一样简单:ydata-profiling
PorfileReport
import pandas as pd
from ydata_profiling import ProfileReport
def profile(device_id__readings):
print(device_id__readings)
device_id, readings = device_id__readings
start_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S')
df = pd.DataFrame(readings)
profile = ProfileReport(
df,
tsmode=True,
sortby="ts",
title=f"Sensor Readings - device: {device_id}"
)
profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html")
return f"device {device_id} profiled at hour {start_time}"
flow.map(profile)
在此示例中,我们将图像作为 map 方法中函数的一部分写入本地文件。这些可以通过消息传递工具报告出来,或者我们可以在将来将它们保存到一些远程存储中。配置文件完成后,数据流需要一些输出,因此我们可以使用内置设备打印已分析的设备,以及从映射步骤中的配置文件函数传递的配置文件时间:StdOutput
flow.output("out", StdOutput())
有多种方法可以执行字节蜡数据流。在这个例子中,我们使用相同的本地机器,但 Bytewax 也可以在多个 Python 进程上运行,跨多个主机,在 Docker 容器中运行,使用 Kubernetes 集群等等。
在本文中,我们将继续使用本地设置,但我们鼓励你查看我们的帮助程序工具 waxctl,该工具在管道准备好过渡到生产环境后管理 Kubernetes 数据流部署。
假设我们与具有数据流定义的文件位于同一目录中,则可以使用以下方法运行它:
python -m bytewax.run ydata-profiling-streaming:flow
然后,我们可以使用分析报告来验证数据质量,检查模式或数据格式的更改,并比较不同设备或时间窗口之间的数据特征。
事实上,我们可以利用比较报告功能,以直接的方式突出显示两个数据配置文件之间的差异,从而更容易检测需要调查的重要模式或必须解决的问题:
snapshot_a_report = ProfileReport(df_a, title="Snapshot A")
snapshot_b_report = ProfileReport(df_b, title="Snapshot B")
comparison_report =snapshot_a_report(snapshot_b_report)
comparison_report.to_file("comparison_report.html")
六、准备好探索您自己的数据流了吗?
验证数据流对于连续识别数据质量问题并比较不同时间段的数据状态至关重要。
对于医疗保健、能源、制造和娱乐领域的组织(所有组织都在处理连续的数据流),分析是建立从质量评估到数据隐私的数据治理最佳实践的关键。
这需要对数据快照进行分析,如本文所示,可以通过组合 和 无缝实现数据快照。bytewax
ydata-profiling
Bytewax负责处理数据流并将其构建为快照所需的所有过程,然后可以通过数据特征的综合报告对其进行汇总并与ydata分析进行比较。
能够适当地处理和分析传入的数据开启了跨不同领域的大量用例,从纠正数据架构和格式中的错误到突出显示和缓解实际活动产生的其他问题,例如异常检测(例如,欺诈或入侵/威胁检测)、设备故障以及其他偏离预期的事件(例如,数据偏移或与业务规则不一致)。
现在,您就可以开始探索数据流了!让我们知道您发现了哪些其他用例,并一如既往地在评论中给我们留言,或在以数据为中心的 AI 社区中找到我们以获取进一步的问题和建议!再见!