【数据挖掘】bytewax 与 ydata工具可实时了解您的数据

news2025/1/18 1:56:43

一、说明

         在这篇博文中,我们将介绍如何将开源流式处理解决方案 bytewax 与 ydata 分析相结合并加以利用,以提高流式处理流的质量。

        STream 处理支持在传输中和存储之前对数据进行实时分析,并且可以是有状态的,也可以是无状态的。

        有状态流处理用于实时建议、模式检测或复杂事件处理,其中处理需要已发生事件的历史记录(窗口、按键连接等)。

        无状态流处理用于内联转换,不需要了解流中的其他数据点,例如屏蔽电子邮件或转换类型。        

        总体而言,数据流在工业中被广泛使用,并且可以应用于欺诈检测患者监控事件预测维护等用例。

二、 数据流必须考虑关键是数据的质量

        与通常在创建数据仓库或仪表板解决方案期间评估数据质量的传统模型不同,流数据需要持续监视

        在从收集到馈送下游应用程序的整个过程中保持数据质量至关重要。毕竟,对于组织来说,糟糕的数据质量的成本可能很高。

        在本文中,我们将向您展示如何结合以分析和提高流媒体流的质量!bytewaxydata-profiling

三、使用 Bytewax 为数据专业人员提供流处理

          是专门为Python开发人员设计的OSS流处理框架。

        它允许用户构建具有类似于Flink,Spark和Kafka Streams功能的流数据管道和实时应用程序,同时提供友好和熟悉的界面以及与Python生态系统的100%兼容性。

        使用内置连接器或现有的 Python 库,您可以连接到实时和流数据源(Kafka、RedPanda、WebSocket 等),并将转换后的数据写入各种下游系统(Kafka、拼花地板文件、数据湖等)。

        对于转换,Bytewax 通过映射窗口聚合方法促进有状态和无状态转换,并具有恢复和可伸缩性等熟悉的功能。

        Bytewax 促进了 Python 优先和以数据为中心的数据流体验,并且是为数据工程师和数据科学家构建的。它允许用户构建流数据管道和实时应用程序,并创建满足其需求所需的自定义项,而无需学习和维护基于 JVM 的流平台,如 Spark 或 Flink。

        Bytewax 非常适合许多用例,即为生成 AI 嵌入管道、处理数据流中的缺失值、在流上下文中使用语言模型来理解金融市场等等。有关用例灵感和更多信息,如文档、教程和指南,请随时查看字节蜡网站。

四、为什么要对数据流进行数据剖析?

        数据剖析是成功启动任何机器学习任务的关键,指的是彻底了解数据的步骤:其结构、行为和质量。

        简而言之,数据分析涉及分析与数据格式和基本描述符相关的方面(例如,样本数量、特征的数量/类型、重复值)、其内在特征(例如存在缺失数据或不平衡的特征)以及在数据收集或处理过程中可能出现的其他复杂因素(例如,错误值或不一致的特征)。

        确保高数据质量标准对所有领域和组织都至关重要,但对于使用输出连续数据的域运营的领域尤其重要,其中情况可能会快速变化,可能需要立即采取行动(例如,医疗保健监测、股票价值、空气质量政策)。

        对于许多领域,从探索性数据分析的角度使用数据分析,考虑存储在数据库中的历史数据。相反,对于数据流,数据分析对于沿流持续验证和质量控制变得至关重要,需要在流程的不同时间范围或阶段检查数据。

        通过将自动分析嵌入到我们的数据流中,我们可以立即获得有关数据当前状态的反馈,并收到任何潜在关键问题的警报 - 无论是与数据一致性和完整性有关(例如,损坏的值或更改格式),还是与短时间内发生的事件(例如,数据漂移, 偏离业务规则和结果)。

        在现实世界的领域——你只知道墨菲定律一定会发生,“一切都可能出错”——自动分析可能会让我们免于多个大脑难题和需要停止生产的系统!

        在涉及数据剖析方面,无论是表格数据还是时间序列数据,它一直是大众的最爱。难怪为什么 - 它是一组广泛的分析和见解的一行代码。ydata-profiling

        复杂且耗时的操作是在后台完成的:ydata 分析会自动检测数据中包含的特征类型,并根据特征类型(数字或分类)调整分析报告中显示的汇总统计数据和可视化效果。

        该软件包促进了以数据为中心的分析,还突出了特征之间的现有关系,重点关注它们的成对交互相关性,并提供了对数据质量警报的全面评估,从重复常量值到偏斜不平衡的特征。

        它实际上是我们数据质量的 360º 视图 - 只需最少的努力。

        分析报告:突出显示潜在的数据质量问题。图片由作者提供。

五、把所有东西放在一起:字节蜡和ydata-profile。

        在开始项目之前,我们需要先设置 python 依赖项并配置数据源。

        首先,让我们安装 和 软件包(您可能希望为此使用虚拟环境 - 如果您需要一些额外的指导,请查看这些说明!bytewaxydata-profiling

pip install bytewax==0.16.2 ydata-profiling==4.3.1

        然后,我们将上传环境传感器遥测数据集(许可证 — CC0:公共域),其中包含来自不同 IoT 设备的温度、湿度、一氧化碳液化石油气、烟雾、光线和运动的多个测量值:        在生产环境中,这些测量将由每个设备连续生成,输入看起来像我们在 Kafka 等流媒体平台中期望的。在本文中,为了模拟我们在流数据中找到的上下文,我们将一次一行地从 CSV 文件中读取数据,并使用字节蜡创建数据流。

        (作为快速旁注,数据流本质上是一个数据管道,可以描述为有向无环图 — DAG)

        首先,让我们进行一些必要的导入

from datetime import datetime, timedelta, timezone

from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput
from bytewax.connectors.files import CSVInput
from bytewax.testing import run_main
        然后,我们定义数据流对象。之后,我们将使用无状态映射方法,在其中传入一个函数将字符串转换为 datetime 对象并将数据重组为格式(device_id,数据)。

        map 方法将以无状态的方式对每个数据点进行更改。我们修改数据形状的原因是,我们可以在接下来的步骤中轻松地对数据进行分组,以单独分析每个设备的数据,而不是同时分析所有设备的数据。

flow = Dataflow()
flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000"))

# parse timestamp
def parse_time(reading_data):
    reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc)
    return reading_data

flow.map(parse_time)


# remap format to tuple (device_id, reading_data)
flow.map(lambda reading_data: (reading_data['device'], reading_data))

        现在,我们将利用有状态功能,在我们定义的时间段内收集每个设备的数据。 需要一段时间内的数据快照,这使得窗口运算符成为执行此操作的完美方法。

        在bytewaxydata-profiling 中,我们能够为为特定上下文指定的数据帧生成汇总统计信息。例如,在我们的示例中,我们可以生成引用每个物联网设备或特定时间框架的数据快照:ydata-profiling

from bytewax.window import EventClockConfig, TumblingWindow

# This is the accumulator function, and outputs a list of readings
def acc_values(acc, reading):
    acc.append(reading)
    return acc


# This function instructs the event clock on how to retrieve the
# event's datetime from the input.
def get_time(reading):
    return reading["ts"]

  
# Configure the `fold_window` operator to use the event time.
cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30))

# And a tumbling window
align_to = datetime(2020, 1, 1, tzinfo=timezone.utc)
wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1))

flow.fold_window("running_average", cc, wc, list, acc_values)

flow.inspect(print)

        定义快照后,利用就像为我们要分析的每个数据帧调用 一样简单:ydata-profilingPorfileReport

import pandas as pd
from ydata_profiling import ProfileReport

def profile(device_id__readings):
    print(device_id__readings)
    device_id, readings = device_id__readings
    start_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S')
    df = pd.DataFrame(readings)
    profile = ProfileReport(
        df,
        tsmode=True,
        sortby="ts",
        title=f"Sensor Readings - device: {device_id}"
    )

    profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html")
    return f"device {device_id} profiled at hour {start_time}"


flow.map(profile)

        在此示例中,我们将图像作为 map 方法中函数的一部分写入本地文件。这些可以通过消息传递工具报告出来,或者我们可以在将来将它们保存到一些远程存储中。配置文件完成后,数据流需要一些输出,因此我们可以使用内置设备打印已分析的设备,以及从映射步骤中的配置文件函数传递的配置文件时间:StdOutput

flow.output("out", StdOutput())

        有多种方法可以执行字节蜡数据流。在这个例子中,我们使用相同的本地机器,但 Bytewax 也可以在多个 Python 进程上运行,跨多个主机,在 Docker 容器中运行,使用 Kubernetes 集群等等。

        在本文中,我们将继续使用本地设置,但我们鼓励你查看我们的帮助程序工具 waxctl,该工具在管道准备好过渡到生产环境后管理 Kubernetes 数据流部署。

假设我们与具有数据流定义的文件位于同一目录中,则可以使用以下方法运行它:

python -m bytewax.run ydata-profiling-streaming:flow

        然后,我们可以使用分析报告来验证数据质量,检查模式或数据格式的更改,并比较不同设备或时间窗口之间的数据特征

        事实上,我们可以利用比较报告功能,以直接的方式突出显示两个数据配置文件之间的差异,从而更容易检测需要调查的重要模式或必须解决的问题:

snapshot_a_report = ProfileReport(df_a, title="Snapshot A")
snapshot_b_report = ProfileReport(df_b, title="Snapshot B")

comparison_report =snapshot_a_report(snapshot_b_report)
comparison_report.to_file("comparison_report.html")

六、准备好探索您自己的数据流了吗?

        验证数据流对于连续识别数据质量问题并比较不同时间段的数据状态至关重要。

        对于医疗保健能源制造娱乐领域的组织(所有组织都在处理连续的数据流),分析是建立从质量评估到数据隐私的数据治理最佳实践的关键

        这需要对数据快照进行分析,如本文所示,可以通过组合 和 无缝实现数据快照。bytewaxydata-profiling

        Bytewax负责处理数据流并将其构建为快照所需的所有过程,然后可以通过数据特征的综合报告对其进行汇总并与ydata分析进行比较。

        能够适当地处理和分析传入的数据开启了跨不同领域的大量用例,从纠正数据架构和格式中的错误到突出显示和缓解实际活动产生的其他问题,例如异常检测(例如,欺诈或入侵/威胁检测)、设备故障以及其他偏离预期的事件(例如,数据偏移或与业务规则不一致)。

        现在,您就可以开始探索数据流了!让我们知道您发现了哪些其他用例,并一如既往地在评论中给我们留言,或在以数据为中心的 AI 社区中找到我们以获取进一步的问题和建议!再见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/780039.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[STL]vector使用介绍

[STL]vector使用介绍 注:文内代码均在Visual Studio 2013下进行测试,不同的编译器下在扩容大小等方面可能有所不同,但不影响各接口函数的使用。 文章目录 [STL]vector使用介绍1. vector介绍2. 构造函数3. 迭代器相关函数begin函数和end函数的…

实现点击复制到剪切板功能

该功能使用VueUse实现 什么是 VueUse VueUse不是Vue.use,它是为Vue 2和3服务的一套Vue Composition API的常用工具集,是目前世界上Star最高的同类型库之一。它的初衷就是将一切原本并不支持响应式的JS API变得支持响应式,省去程序员自己写相…

jmeter常用的提取器(正则表达式和JSON提取器)

jmeter常用的后置处理器有两种提取数据: 1、JSON提取器 获取后可以将变量token引用到其他所需要的地方 (正则表达式和JSON提取器):2023接口自动化测试框架必会两大神器:正则提取器和Jsonpath提取器_哔哩哔哩_bilibilihttps://www.bilibili.…

uniapp实战

上面是tab栏,下面是swiper,,tab和swiper和 红色滑块 动态变化,, 遇到的问题: 往下滚动 tab栏 吸顶: position:sticky; z-index:99; top:0;swiper切换触发 change 事件, :current …

SOMEIP协议--第四节[ SOME/IP](someip概述与行为)

SOMEIP协议–第四节[ SOME/IP](someip概述与行为) 文章目录 SOMEIP协议--第四节[ SOME/IP](someip概述与行为)1、概述2、someip的行为2.1 基础传输2.2 SOME/IP-TP传输:2.3 someip参数(client)2.4 someip参数(server)1、概述 Method | Event | Field是上层设计的三个概念…

【C++】优先级队列和反向迭代器 模拟笔记

文章目录 优先级队列仿函数适配器模式堆知识储备 反向迭代器代码(反向迭代器)代码(优先级队列) 优先级队列 仿函数 仿函数,它不是函数(其实是个类),但用法和函数一样。既然是个类&a…

子类化QThread来实现多线程,moveToThread函数的作用

子类化QThread来实现多线程, QThread只有run函数是在新线程里的,其他所有函数都在QThread生成的线程里。正确启动线程的方法是调用QThread::start()来启动。 一、步骤 子类化 QThread;重写run,将耗时的事件放到此函数执行&#…

轻量级Web报表工具ActiveReportsJS全新发布v4.0,支持集成更多前端框架!

ActiveReportsJS 是一款基于 JavaScript 和 HTML5 的轻量级Web报表工具,采用拖拽式设计模式,不需任何服务器和组件支持,即可在 Mac、Linux 和 Windows 操作系统中,设计多种类型的报表。ActiveReportsJS 同时提供跨平台报表设计、纯…

18.背景轮播

背景轮播 html部分 <div class"container"><div class"slide active" style"background-image: url(./static/20180529205331_yhGyf.jpeg);"></div><div class"slide " style"background-image: url(./s…

vue3+taro+Nutui 开发小程序(二)

上一篇我们初始化了小程序项目&#xff0c;这一篇我们来整理一下框架 首先可以看到我的项目整理框架是这样的&#xff1a; components:这里存放封装的组件 custom-tab-bar:这里存放自己封装的自定义tabbar interface&#xff1a;这里放置了Ts的一些基本泛型&#xff0c;不用…

AtcoderABC238场

A - Exponential or QuadraticA - Exponential or Quadratic 题目大意 给定一个整数 n&#xff0c;判断是否满足 2n >n 2。 思路分析 根据数学知识可知n 的取值在 2 到 4 之间&#xff08;包括 2 和 4&#xff09;&#xff0c;不满足条件 。 时间复杂度 O(1) AC代码 …

MyBatis学习笔记——4

MyBatis学习笔记——4 一、MyBatis的高级映射及延迟加载1.1、多对一1.1.1、第一种方式&#xff1a;级联属性映射1.1.2、第二种方式&#xff1a;association1.1.3、第三种方式&#xff1a;分步查询 1.2、一对多1.2.1、第一种方式&#xff1a;collection1.2.1、第二种方式&#x…

Linux Ubuntu crontab 添加错误 提示:no crontab for root - using an empty one 888

资料 错误提示&#xff1a; no crontab for root - using an empty one 888 原因剖析&#xff1a; 第一次使用crontab -e 命令时会让我们选择编辑器&#xff0c;很多人会不小心选择默认的nano&#xff08;不好用&#xff09;&#xff0c;或则提示no crontab for root - usin…

一文了解Python中的while循环语句

目录 &#x1f969;循环语句是什么 &#x1f969;while循环 &#x1f969;遍历猜数字 &#x1f969;while循环嵌套 &#x1f969;while循环嵌套案例 &#x1f990;博客主页&#xff1a;大虾好吃吗的博客 &#x1f990;专栏地址&#xff1a;Python从入门到精通专栏 循环语句是什…

【N32L40X】学习笔记11-ADC规则通道采集+dma数据传输

ADC规则通道转换 概述 支持 1 个 ADC&#xff0c;支持单端输入和差分输入&#xff0c;最多可测量 16 个外部和 3 个内部源。支持 12 位、10 位、8 位、6 位分辨率。ADC 时钟源分为工作时钟源、采样时钟源和计时时钟源 仅可配置 AHB_CLK 作为工作时钟源。可配置 PLL 作为采样时…

【安全】Sqllabs(1-4) 多种情况浅析

目录 环境⭐ 先要 ⭐⭐⭐⭐⭐ Less - 1 (information_shcema) Less - 2 (假设没有information_schema) Less - 3 (无列名注入) Less - 4 环境⭐ MySQL8.0.12 Nginx1.15.11 先要 ⭐⭐⭐⭐⭐ MySQL5.0以上有这几个数据库mysql, sys&#xff0c;information_schema informa…

前端性能优化——图片优化

一、图片优化措施 优化图片是 Web 前端优化的重要一环&#xff0c;因为图片是 Web 页面中最耗费带宽和加载时间的资源之一。以下是一些通过优化图片来优化 Web 前端的方法&#xff1a; 压缩图片&#xff1a;压缩图片可以减少图片的文件大小&#xff0c;从而减少加载时间。 使…

【数学建模】相关是一个距离指标吗?

一、说明 本文探讨最平凡的数学模型--距离模型。我们知道&#xff0c;任何数学模型如果是个距离模型&#xff0c;那么它是&#xff1a;放心的、自动的、不加任意条件的指标项目。然而另一些度量参数不是距离空间&#xff0c;因此&#xff0c;使用起来必须外加若干条件&#xff…

苹果iOS 16.6 RC发布:或为iPhone X/8系列养老版本

今天苹果向iPhone用户推送了iOS 16.6 RC更新(内部版本号&#xff1a;20G75)&#xff0c;这是时隔两个月的首次更新。 按照惯例RC版基本不会有什么问题&#xff0c;会在最近一段时间内直接变成正式版&#xff0c;向所有用户推送。 需要注意的是&#xff0c;鉴于iOS 17正式版即将…

【CN-Docker】window11下Docker下开启kubernetes

1. 安装Docker 安装docker步骤如下&#xff1a; 下载Docker启用hyper-v 2.1.powershell&#xff0c;管理员运行Enable-WindowsOptionalFeature -Online -FeatureName Microsoft-Hyper-V -All安装wsl配置Docker镜像加速地址(阿里云) 4.1. "registry-mirrors": [&quo…