数据流风格

news2024/10/22 0:57:47

数据流风格概述

数据流风格是一种软件架构风格,强调数据从一个计算单元流向下一个计算单元。这种风格特别适用于处理大数据量的任务或具有复杂数据处理链的系统。数据流架构解耦了数据生成者和处理者,使得系统具有较高的内聚性低耦合,非常适合数据密集型应用。

数据流风格可以细分为两种主要子风格:

  1. 批处理序列 (Batch Sequential)
  2. 管道过滤器 (Pipeline-Filter)

子风格一:批处理序列

批处理序列 是数据流风格中的一个具体形式,指的是一组数据在多阶段的处理流程中被按顺序处理。每个阶段的处理任务完成后,才会进入下一个阶段。

特点:
  1. 强时间顺序:每个阶段必须在前一阶段完全处理完后才能启动,存在严格的顺序性。
  2. 强数据完整性:数据在不同处理单元之间通过明确的接口或数据格式传递,确保每一阶段的数据处理独立且完整。
  3. 计划性:批处理通常通过时间调度或任务计划来触发处理,并按阶段执行,常用于日志分析、数据仓库加载等需要全量数据处理的场景。
适用场景:

批处理序列非常适合处理那些需要完整的数据集并按阶段处理的任务。例如:

  • 日志文件处理:将每天的日志文件分批加载到数据库中,首先解析文件,然后提取信息,最后存储到数据库。
示例:
// 批处理阶段一:解析日志文件
class ParseLogs {
    public List<String> processLogs(String logFile) {
        // 假设解析日志并返回日志条目列表
        return List.of("INFO: Server started", "ERROR: Disk full", "INFO: User logged in");
    }
}

// 批处理阶段二:筛选错误信息
class FilterErrors {
    public List<String> filter(List<String> logs) {
        List<String> errors = new ArrayList<>();
        for (String log : logs) {
            if (log.startsWith("ERROR")) {
                errors.add(log);
            }
        }
        return errors;
    }
}

// 批处理阶段三:存储错误信息
class StoreErrors {
    public void store(List<String> errors) {
        for (String error : errors) {
            System.out.println("Storing error: " + error);
        }
    }
}

public class BatchProcessing {
    public static void main(String[] args) {
        ParseLogs parser = new ParseLogs();
        FilterErrors filter = new FilterErrors();
        StoreErrors store = new StoreErrors();
        
        // 批处理步骤:逐步处理日志文件
        List<String> logs = parser.processLogs("system.log");
        List<String> errors = filter.filter(logs);
        store.store(errors);
    }
}
执行流程:
  1. ParseLogs 解析日志文件。
  2. FilterErrors 从解析的日志中筛选出错误信息。
  3. StoreErrors 将错误信息存储到目标存储系统中。

该流程严格按照步骤执行,适合批量处理日志等场景。


子风格二:管道过滤器

管道过滤器是一种通过多个独立的过滤器处理数据的架构风格,强调数据从一个过滤器流向另一个过滤器,形成一个数据流管道。

特点:
  1. 增量处理:每个过滤器只处理数据的一部分,然后将处理结果传递给下一个过滤器。数据处理是递增式的,没有整体性处理的概念。
  2. 过滤器独立性:过滤器是完全独立的实体,它们只关心自己的输入和输出,不需要知道其他过滤器的状态。
  3. 解耦与复用:每个过滤器是模块化的,容易被替换、重用,且可以灵活组合形成新的数据流。
  4. 高响应性:与批处理不同,管道过滤器架构可以在数据流经每个过滤器时快速响应,支持流式处理(如秒级或分钟级的处理)。
适用场景:

适合实时处理和流式数据的场景,例如:

  • 日志流处理:可以实时监控日志流中的错误信息,触发告警系统。
  • 实时数据清洗:过滤和转换传感器数据或网络流量数据等。
示例:
import java.util.ArrayList;
import java.util.List;

// 定义过滤器接口
interface Filter {
    List<String> process(List<String> data);
}

// 输入过滤器,生成输入数据
class InputFilter implements Filter {
    @Override
    public List<String> process(List<String> data) {
        return List.of("INFO: Server started", "ERROR: Disk full", "INFO: User logged in");
    }
}

// 错误过滤器,筛选出错误信息
class ErrorFilter implements Filter {
    @Override
    public List<String> process(List<String> data) {
        List<String> errors = new ArrayList<>();
        for (String log : data) {
            if (log.startsWith("ERROR")) {
                errors.add(log);
            }
        }
        return errors;
    }
}

// 警报过滤器,处理错误信息并触发警报
class AlertFilter implements Filter {
    @Override
    public List<String> process(List<String> data) {
        for (String log : data) {
            System.out.println("ALERT: " + log);
        }
        return data; // 返回原数据以便于后续处理
    }
}

// 管道类,用于将过滤器组合
class Pipeline {
    private final List<Filter> filters = new ArrayList<>();

    public void addFilter(Filter filter) {
        filters.add(filter);
    }

    public List<String> execute(List<String> input) {
        List<String> data = input;
        for (Filter filter : filters) {
            data = filter.process(data); // 每个过滤器独立处理
        }
        return data;
    }
}

public class PipelineProcessing {
    public static void main(String[] args) {
        // 创建管道并添加过滤器
        Pipeline pipeline = new Pipeline();
        pipeline.addFilter(new InputFilter());
        pipeline.addFilter(new ErrorFilter());
        pipeline.addFilter(new AlertFilter());

        // 执行管道
        List<String> logs = pipeline.execute(null); // 由第一个过滤器生成数据
    }
}
执行流程:
  1. InputFilter 生成日志数据。
  2. ErrorFilter 筛选出其中的错误日志。
  3. AlertFilter 处理错误日志并触发警报。

过滤器是模块化的,顺序可以自由组合,且每个过滤器之间没有强耦合。


总结:批处理序列 vs 管道过滤器

特点批处理序列管道过滤器
处理方式阶段性处理,每一步必须依次执行流式处理,每个过滤器独立工作
数据完整性每步处理后保持数据完整性,直到所有步骤完成增量处理,数据逐步传递和变换
响应时间较长,适合全量数据处理较短,适合实时流式处理
耦合度依赖严格的顺序每个过滤器独立工作,易于扩展
适用场景日志分析、大规模数据处理实时监控、流式数据处理

这两种子风格分别适用于不同的场景。批处理序列适合需要全量数据和阶段性处理的应用,而管道过滤器适合实时处理和流式数据处理。这两者共同的优点是解耦和复用,保证了数据流风格架构在面对复杂数据处理时的高效性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2220411.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

cefsharp79.1.360(Chromium 79.0.3945.130)支持H264视频播放-PDF预览 老版本回顾系列体验

一、关于此版本 版本:Cef 79.1.36/CefSharp 79.1.360/Chromium 79.0.3945.130/支持H264/支持PDF预览 支持PDF预览和H264推荐版本 63/79/84/88/100/111/125 运行环境需要 visual c++ 2015不支持xp/vista/2003/2008默认不支持h264(版权问题)支持打印预览 print preview已知问题…

Kafka之原理解析

定义 Kafka 是一个分布式流媒体平台&#xff0c;kafka官网&#xff1a;http://kafka.apache.org/ Kafka 是一种高吞吐量、分布式、基于发布/订阅的消息系统&#xff0c;最初由 LinkedIn 公司开发&#xff0c;使用Scala 语言编写&#xff0c;目前是Apache 的开源项目。 流媒体…

深入解析Golang GMP

文章目录 1. 引言2. GMP 模型概述与核心结构体2.1. G&#xff08;Goroutine&#xff09;2.2. M&#xff08;Machine/Thread&#xff09;2.3. P&#xff08;Processor&#xff09;2.4. 全局调度器schedt&#xff08;Scheduler&#xff09; 3. Goroutine 的生命周期与状态管理3.1…

子比主题美化-用户中心隐私功能

前言 子比主题用户中心的文章、评论、粉丝等默认全部人可见&#xff0c;但是有时不想让全部人可见就可以开启此功能 图片展示 教程开始 把以下代码添加到子比主题下&#xff0c;按顺序找到该文件/inc/functions/zib-author.php&#xff0c;在zib-author.php第374行把原代码删…

面试官:`interrupted()` 和 `isInterrupted()` 你真的用懂了吗?

感谢Java面试教程的 Java面试题&#xff1a;interrupted和isInterrupted方法的区别 在Java中&#xff0c;interrupted() 和 isInterrupted() 是用于检查线程中断状态的方法&#xff0c;但它们之间有一些关键的区别。 方法类型&#xff1a; interrupted() 是一个静态方法&…

每月洞察:App Store 和 Google Play 的主要更新

Google Play 和 App Store 的算法不断发展&#xff0c;定期更新和变化会显着影响其功能。对于开发人员和营销人员来说&#xff0c;跟上这些变化至关重要&#xff0c;因为它们会直接影响应用发现和排名。 本文将深入探讨 Google Play 和 App Store 的最新更新&#xff0c;解释它…

基于微信小程序二手物品调剂系统设计与实现

文章目录 前言项目介绍技术介绍功能介绍核心代码数据库参考 系统效果图文章目录 前言 文章底部名片&#xff0c;获取项目的完整演示视频&#xff0c;免费解答技术疑问 项目介绍 二手物品调剂系统是一种在线平台&#xff0c;旨在促进用户之间的二手物品交易。该系统提供了一个…

【Pycharm】显示内存不足the IDE is running low on memory解决方法

Pycharm提示显示内存不足the IDE is running low on memory解决方法 在右上角找到Help&#xff0c;点击&#xff0c;找到change memory settings 修改数值如1024&#xff0c;2048 等&#xff0c;增大容量即可。最后点击save and Restart

Newstar_week1_week2_wp

week1 wp crypto 一眼秒了 n费马分解再rsa flag&#xff1a; import libnum import gmpy2 from Crypto.Util.number import * p 9648423029010515676590551740010426534945737639235739800643989352039852507298491399561035009163427050370107570733633350911691280297…

大数据之hive(分布式SQL计算工具)加安装部署

1.分布式SQL计算: 对数据进行统计分析&#xff0c; SQL是目前最为方便的编程工具. 2.hive:主要功能: 将 SQL语句翻译成MapReduce程序运行,提供用户分布式SQL计算能力 3.构建分布式SQL计算:(hive核心组件) 需要有: 一:元数据管理功能, 即&#xff1a;数据位置,数据结构,等对数…

每日OJ题_牛客_[NOIP2001]装箱问题_01背包_C++_Java

目录 牛客_[NOIP2001]装箱问题_01背包 题目解析 C代码 Java代码 牛客_[NOIP2001]装箱问题_01背包 [NOIP2001]装箱问题 (nowcoder.com) 描述&#xff1a; 有一个箱子容量为V&#xff08;正整数&#xff0c;0 ≤ V ≤ 20000&#xff09;&#xff0c;同时有n个物品&…

Vue3中ref和reactive的对比

1. ref 定义 用途: 用于创建基本数据类型或单一值的响应式引用。语法: const myRef ref(initialValue); 特性 返回一个包含 .value 属性的 Proxy 对象。适用于基本数据类型&#xff08;如数字、字符串、布尔值等&#xff09;和单一值。 import { ref } from vue;const co…

售后管理系统 解锁服务效率与质量双重提升

售后管理系统通过提升响应速度、确保服务一致性、数据分析优化流程&#xff0c;提高企业售后服务质量。ZohoDesk等解决方案可自动化分配工单、多渠道支持、管理追踪工单等&#xff0c;增强客户满意度和忠诚度。 一、什么是售后管理系统 首先&#xff0c;我们需要了解什么是售后…

SSM网上鲜花商城—计算机毕业设计源码41992

目 录 摘要 1 绪论 1.1研究背景 1.2研究内容 1.3系统开发技术的特色 1.4 ssm框架介绍 1.5论文结构与章节安排 2 网上鲜花商城系统分析 2.1 可行性分析 2.2 系统流程分析 2.2.1数据增加流程 2.2.2数据修改流程 2.2.3数据删除流程 2.3 系统功能分析 2.3.1 功能性…

吴恩达深度学习笔记(7)

误差分析&#xff1a; 你运行一个算法代替人类计算&#xff0c;但是没有达到人类的效果&#xff0c;需要手动检查算法中的错误&#xff0c;对模型的一些部分做相应调整&#xff0c;才能更好地提升分类的精度。如果不加分析去做&#xff0c;可能几个月的努力对于提升精度并没有…

opencv学习:基于计算机视觉的表情识别系统

简介 基于计算机视觉的表情识别系统&#xff0c;该系统能够从视频流中实时检测人脸&#xff0c;并识别出两种基本表情&#xff1a;大笑和微笑。实验通过分析人脸关键点来计算表情特征指标&#xff0c;从而判断表情类型。 原理 基于以下原理进行&#xff1a; 人脸检测&#x…

缓存常见问题:缓存穿透、雪崩、击穿及解决方案分析

1. 什么是缓存穿透&#xff0c;怎么解决&#xff1f; 缓存穿透是指用户请求的数据在缓存中不存在即没有命中&#xff0c;同时在数据库中也不存在&#xff0c;导致用户每次请求该数据都要去数据库中查询一遍。如果有恶意攻击者不断请求系统中不存在的数据&#xff0c;会导致短时…

使用RabbitMQ实现延迟消息的完整指南

在分布式系统中&#xff0c;消息队列通常用于解耦服务&#xff0c;RabbitMQ是一个广泛使用的消息队列服务。延迟消息&#xff08;也称为延时队列或TTL消息&#xff09;是一种常见的场景应用&#xff0c;特别适合处理某些任务在一段时间后执行的需求&#xff0c;如订单超时处理、…

CISP/NISP二级练习题-第一卷

目录 另外免费为大家准备了刷题小程序和docx文档&#xff0c;有需要的可以私信获取 1&#xff0e;不同的信息安全风险评估方法可能得到不同的风险评估结果&#xff0c;所以组织 机构应当根据各自的实际情况选择适当的风险评估方法。下面的描述中错误的是 &#xff08;&#…

Cesium 实战 - 自定义纹理材质 - 立体墙(旋转材质)

Cesium 实战 - 自定义纹理材质 - 立体墙(旋转材质) 核心代码完整代码在线示例Cesium 给实体对象(Entity)提供了很多实用的样式,基本满足普通项目需求; 但是作为 WebGL 引擎,肯定不够丰富,尤其是动态效果样式。 对于实体对象(Entity),可以通过自定义材质,实现各种…