如何深入理解 Node.js 中的流(Streams)

news2024/10/1 12:19:31

1bd582bee462f9ee83e9025b272a19a1.jpeg

Node.js是一个强大的允许开发人员构建可扩展和高效的应用程序。Node.js的一个关键特性是其内置对流的支持。流是Node.js中的一个基本概念,它能够实现高效的数据处理,特别是在处理大量信息或实时处理数据时。

在本文中,我们将探讨Node.js中的流概念,了解可用的不同类型的流(可读流、可写流、双工流和转换流),并讨论有效处理流的最佳实践。

什么是Node.js流?

流是Node.js应用程序中的一个基本概念,通过按顺序读取或写入输入和输出,实现高效的数据处理。它们非常适用于文件操作、网络通信和其他形式的端到端数据交换。

流的独特之处在于它以小的、连续的块来处理数据,而不是一次性将整个数据集加载到内存中。这种方法在处理大量数据时非常有益,因为文件大小可能超过可用内存。流使得以较小的片段处理数据成为可能,从而可以处理更大的文件。

6535dfd4c44df671756405a1a77f33ae.png

如上图所示,数据通常以块或连续流的形式从流中读取。从流中读取的数据块可以存储在缓冲区中。缓冲区提供临时存储空间,用于保存数据块,直到进一步处理。

为了进一步说明这个概念,考虑一个实时股票市场数据源的情景。在金融应用中,实时更新股票价格和市场数据对于做出明智的决策至关重要。流式处理使应用程序能够以较小的连续块处理数据,而不是获取和存储整个数据源,这可能是相当庞大和不切实际的。数据通过流动,允许应用程序在更新到达时执行实时分析、计算和通知。这种流式处理方法节省了内存资源,并确保应用程序能够迅速响应市场波动,并向交易员和投资者提供最新信息。它消除了在采取行动之前等待整个数据源可用的需要。

为什么要使用流?

流提供了与其他数据处理方法相比的两个关键优势。

内存效率

使用流,处理前不需要将大量数据加载到内存中。相反,数据以较小的可管理块进行处理,减少了内存需求并有效利用了系统资源。

时间效率

流使得数据一旦可用就能立即进行处理,而不需要等待整个有效负载的传输。这样可以实现更快的响应时间和改善整体性能。

理解并有效地利用流能够帮助开发人员实现最佳的内存使用、更快的数据处理和增强的代码模块化,使其成为Node.js应用程序中强大的功能。然而,不同类型的Node.js流可以用于特定的目的,并在数据处理方面提供灵活性。为了在您的Node.js应用程序中有效地使用流,有必要清楚地了解每种流类型。因此,让我们深入研究一下Node.js中可用的不同流类型。

Node.js流的类型

Node.js 提供了四种主要类型的流,每种流都有特定的用途:

Readable Streams 可读流

可读流允许从源(如文件或网络套接字)读取数据。它们按顺序发出数据块,并可以通过附加监听器到“data”事件来消费。可读流可以处于流动或暂停状态,取决于数据的消费方式。

const fs = require('fs');

// Create a Readable stream from a file
const readStream = fs.createReadStream('the_princess_bride_input.txt', 'utf8');

// Readable stream 'data' event handler
readStream.on('data', (chunk) => {
  console.log(`Received chunk: ${chunk}`);
});

// Readable stream 'end' event handler
readStream.on('end', () => {
  console.log('Data reading complete.');
});

// Readable stream 'error' event handler
readStream.on('error', (err) => {
  console.error(`Error occurred: ${err}`);
});

如上所示的代码片段中,我们使用fs模块使用createReadStream()方法创建一个可读流。我们将文件路径
the_princess_bride_input.txt 和编码 utf8 作为参数传递。可读流以小块方式从文件中读取数据。

我们将事件处理程序附加到可读流上以处理不同的事件。当数据块可供读取时,会触发 data 事件。当可读流完成从文件中读取所有数据时,会触发 end 事件。如果在读取过程中发生错误,则会触发 error 事件。

通过使用可读流并监听相应的事件,您可以高效地从源(例如文件)中读取数据,并对接收到的数据块执行进一步操作。

Writable Streams 可写流

可写流处理将数据写入目标位置,例如文件或网络套接字。它们提供了像 write() 和 end() 这样的方法来向流发送数据。可写流可用于以分块方式写入大量数据,防止内存溢出。

const fs = require('fs');

// Create a Writable stream to a file
const writeStream = fs.createWriteStream('the_princess_bride_output.txt');

// Writable stream 'finish' event handler
writeStream.on('finish', () => {
  console.log('Data writing complete.');
});

// Writable stream 'error' event handler
writeStream.on('error', (err) => {
  console.error(`Error occurred: ${err}`);
});

// Write a quote from "The  to the Writable stream
writeStream.write('As ');
writeStream.write('You ');
writeStream.write('Wish');
writeStream.end();

在上面的代码示例中,我们使用fs模块使用 createWriteStream() 方法创建一个可写流。我们指定数据将被写入的文件路径(
the_princess_bride_output.txt )。

我们将事件处理程序附加到可写流上,以处理不同的事件。当可写流完成写入所有数据时,会触发 finish 事件。如果在写入过程中发生错误,则会触发 error 事件。使用 write() 方法将单个数据块写入可写流。在这个例子中,我们将字符串'As'、'You'和'Wish'写入流中。最后,我们调用 end() 来表示数据写入结束。

通过使用可写流并监听相应的事件,您可以高效地将数据写入目标位置,并在写入过程完成后执行任何必要的清理或后续操作。

Duplex Streams 双工流

双工流代表了可读和可写流的组合。它们允许同时从源读取和写入数据。双工流是双向的,并在同时进行读取和写入的情况下提供了灵活性。

const { Duplex } = require("stream");

class MyDuplex extends Duplex {
  constructor() {
    super();
    this.data = "";
    this.index = 0;
    this.len = 0;
  }
  
  _read(size) {
    // Readable side: push data to the stream
    const lastIndexToRead = Math.min(this.index + size, this.len);
    this.push(this.data.slice(this.index, lastIndexToRead));
    this.index = lastIndexToRead;
    if (size === 0) {
      // Signal the end of reading
      this.push(null);
    }
  }
  
  _write(chunk, encoding, next) {
    const stringVal = chunk.toString();
    console.log(`Writing chunk: ${stringVal}`);
    this.data += stringVal;
    this.len += stringVal.length;
    next();
  }
}

const duplexStream = new MyDuplex();
// Readable stream 'data' event handler
duplexStream.on("data", (chunk) => {
  console.log(`Received data:\n${chunk}`);
});

// Write data to the Duplex stream
// Make sure to use a quote from "The Princess Bride" for better performance :)
duplexStream.write("Hello.\n");
duplexStream.write("My name is Inigo Montoya.\n");
duplexStream.write("You killed my father.\n");
duplexStream.write("Prepare to die.\n");
// Signal writing ended
duplexStream.end();

在上面的例子中,我们从流模块扩展了Duplex类来创建一个双工流。双工流代表了既可读又可写的流(它们可以相互独立)。

我们定义了Duplex流的 _read() 和 _write() 方法来处理各自的操作。在这种情况下,我们将写入流和读取流绑定在一起,但这只是为了举例说明 - Duplex流支持独立的读取和写入流。

在 _read() 方法中,我们实现了双工流的可读端。我们使用 this.push() 将数据推送到流中,当大小变为0时,通过将null推送到流中来表示读取结束。

在 _write() 方法中,我们实现了Duplex流的可写端。我们处理接收到的数据块并将其添加到内部缓冲区。调用 next() 方法来指示写操作的完成。

事件处理程序附加到双工流的 data 事件,用于处理流的可读一侧。要向双工流写入数据,我们可以使用 write() 方法。最后,我们调用 end() 来表示写入结束。

双工流允许您创建一个双向流,既可以进行读取操作,也可以进行写入操作,从而实现灵活的数据处理场景。

Transform Streams 转换流

转换流是一种特殊类型的双工流,它在数据通过流时修改或转换数据。它们通常用于数据操作任务,如压缩、加密或解析。转换流接收输入数据,进行处理,并发出修改后的输出数据。

const { Transform } = require('stream');

// Create a Transform stream
const uppercaseTransformStream = new Transform({
  transform(chunk, encoding, callback) {
    // Transform the received data
    const transformedData = chunk.toString().toUpperCase();

    // Push the transformed data to the stream
    this.push(transformedData);

    // Signal the completion of processing the chunk
    callback();
  }
});

// Readable stream 'data' event handler
uppercaseTransformStream.on('data', (chunk) => {
  console.log(`Received transformed data: ${chunk}`);
});

// Write a classic "Princess Bride" quote to the Transform stream
uppercaseTransformStream.write('Have fun storming the castle!');
uppercaseTransformStream.end();

如上代码片段所示,我们使用流模块中的 Transform 类创建一个Transform流。我们在Transform流选项对象中定义 transform() 方法来处理转换操作。在 transform() 方法中,我们实现转换逻辑。在本例中,我们使用 chunk.toString().toUpperCase() 将接收到的数据块转换为大写。我们使用 this.push() 将转换后的数据推送到流中。最后,我们调用 callback() 来指示处理数据块的完成。

我们将事件处理程序附加到Transform流的 data 事件上,以处理流的可读端。要向Transform流写入数据,我们使用 write() 方法。并且我们调用 end() 来表示写入结束。

一个转换流允许您在数据通过流时即时进行数据转换,从而实现对数据的灵活和可定制的处理。

了解这些不同类型的流,让开发人员能够根据自己的特定需求选择适当的流类型。

使用Node.js流

为了更好地掌握Node.js Streams的实际应用,让我们考虑一个例子,使用流来读取数据并在转换和压缩后将其写入另一个文件。

const fs = require('fs');
const zlib = require('zlib');
const { Readable, Transform } = require('stream');

// Readable stream - Read data from a file
const readableStream = fs.createReadStream('classic_tale_of_true_love_and_high_adventure.txt', 'utf8');

// Transform stream - Modify the data if needed
const transformStream = new Transform({
  transform(chunk, encoding, callback) {
    // Perform any necessary transformations
    const modifiedData = chunk.toString().toUpperCase(); // Placeholder for transformation logic
    this.push(modifiedData);
    callback();
  },
});

// Compress stream - Compress the transformed data
const compressStream = zlib.createGzip();

// Writable stream - Write compressed data to a file
const writableStream = fs.createWriteStream('compressed-tale.gz');

// Pipe streams together
readableStream.pipe(transformStream).pipe(compressStream).pipe(writableStream);

// Event handlers for completion and error
writableStream.on('finish', () => {
  console.log('Compression complete.');
});

writableStream.on('error', (err) => {
  console.error('An error occurred during compression:', err);
});

在这段代码片段中,我们使用可读流读取文件,将数据转换为大写,并使用两个转换流(一个是我们自己的,一个是内置的zlib转换流)进行压缩,最后使用可写流将数据写入文件。

我们使用 fs.createReadStream() 创建一个可读流,从输入文件中读取数据。使用 Transform 类创建一个转换流。在这里,您可以对数据进行任何必要的转换(对于这个例子,我们再次使用 toUpperCase() )。然后,我们使用 zlib.createGzip() 创建另一个转换流,使用Gzip压缩算法压缩转换后的数据。最后,我们使用 fs.createWriteStream() 创建一个可写流,将压缩后的数据写入 compressed-tale.gz 文件。

.pipe() 方法用于按顺序将流连接在一起。我们从可读流开始,将其导入转换流,然后将转换流导入压缩流,最后将压缩流导入可写流。它允许您建立从可读流通过转换和压缩流到可写流的流畅数据流。最后,事件处理程序被附加到可写流以处理 finish 和 error 事件。

使用 pipe() 简化了连接流的过程,自动处理数据流,并确保从可读流到可写流的高效和无误传输。它负责管理底层流事件和错误传播。

另一方面,直接使用事件可以让开发人员对数据流具有更精细的控制。通过将事件监听器附加到可读流上,您可以在将数据写入目标之前对接收到的数据执行自定义操作或转换。

在决定是使用 pipe() 还是events时,以下是一些你应该考虑的因素。

  • 简洁性:如果您需要一个简单直接的数据传输,不需要任何额外的处理或转换, pipe() 提供了一个简单而简洁的解决方案。

  • 灵活性:如果您需要更多地控制数据流,例如在写入数据之前修改数据或在过程中执行特定操作,直接使用事件可以为您提供灵活性以定制行为。

  • 错误处理:无论是 pipe() 还是事件监听器都可以用于错误处理。然而,使用事件时,您对错误处理有更多的控制权,并且可以实现自定义的错误处理逻辑。

选择最适合您特定用例的方法非常重要。对于简单的数据传输,由于其简单性和自动错误处理, pipe() 通常是首选。然而,如果您需要更多的控制或在数据流中进行额外处理,直接使用事件提供了必要的灵活性。

使用Node.js流的最佳实践

在使用Node.js Streams时,遵循最佳实践以确保最佳性能和可维护的代码非常重要。

  • 错误处理:在读取、写入或转换过程中,流可能会遇到错误。通过监听 error 事件并采取适当的措施,如记录错误或优雅地终止进程,处理这些错误非常重要。

  • 使用适当的高水位标记:高水位标记是一个缓冲区大小限制,用于确定可读流何时应该暂停或恢复其数据流。根据可用内存和正在处理的数据的性质,选择适当的高水位标记非常重要。这可以防止内存溢出或数据流中不必要的暂停。

  • 优化内存使用:由于流以块的形式处理数据,因此避免不必要的内存消耗非常重要。当资源不再需要时,例如在数据传输完成后关闭文件句柄或网络连接,始终释放资源。

  • 利用流工具:Node.js提供了几个实用模块,例如 stream.pipeline() 和 stream.finished() ,简化了流处理并确保适当的清理。这些工具处理错误传播、承诺集成和自动流销毁,减少了手动样板代码(我们在Amplication中都致力于最小化样板代码;))。

  • 实施流量控制机制:当可写流无法跟上从可读流读取数据的速度时,当可读流完成读取时,缓冲区中可能会有大量数据剩余。在某些情况下,这甚至可能超过可用内存的数量。这被称为背压。为了有效处理背压,考虑实施流量控制机制,例如使用 pause() 和 resume() 方法或利用第三方模块,如pump或through2。

通过遵循这些最佳实践,开发人员可以确保高效的流处理,最小化资源使用,并构建强大且可扩展的应用程序。

结束

Node.js流是一种强大的功能,可以以非阻塞的方式高效处理数据流。通过利用流,开发人员可以处理大型数据集,处理实时数据,并以内存高效的方式执行操作。了解不同类型的流,如可读流、可写流、双工流和转换流,并遵循最佳实践,可以确保最佳的流处理、错误管理和资源利用。通过利用流的能力,开发人员可以使用Node.js构建高性能和可扩展的应用程序。

由于文章内容篇幅有限,今天的内容就分享到这里,文章结尾,我想提醒您,文章的创作不易,如果您喜欢我的分享,请别忘了点赞和转发,让更多有需要的人看到。同时,如果您想获取更多前端技术的知识,欢迎关注我,您的支持将是我分享最大的动力。我会持续输出更多内容,敬请期待。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/919609.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

es和数据库同步方案

5.5 课程信息索引同步 5.5.1 技术方案 通过向索引中添加课程信息最终实现了课程的搜索,我们发现课程信息是先保存在关系数据库中,而后再写入索引,这个过程是将关系数据中的数据同步到elasticsearch索引中的过程,可以简单成为索引…

SD-WebUI和ComfyUI的局域网访问设置!

如何通过局域网访问AI绘画软件,这是星球成员提的一个问题,而且两个软件都问到了,我也回答过了。现在把内容整理一下发出来,大家可能用得着。 SD-WebUI和ComfyUI这两个AI绘画工具都是通过浏览器来使用,但是默认情况下并…

oops Framwork creator游戏开发框架

环境: Mac oops Framework 该框架是由gdflas编写,基于cocosCreator 3.x 而实现的开源游戏框架。特点: 框架通过插件方式提供,与项目相分离,方便不同版本平滑升级内置模块低耦合, 可根据需要进行删减&…

如何使用HTML5新增的标签来构建语义化的页面结构?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ <header>&#xff1a;⭐ <nav>&#xff1a;⭐ <main>&#xff1a;⭐ <section>&#xff1a;⭐ <article>&#xff1a;⭐ <aside>&#xff1a;⭐ <footer>&#xff1a;⭐ <figure> 和 &l…

开黑啦kook 机器人开发 PHP swoole Liunx 服务器(宝塔)

安装环境 PHP 拓展 直接使用 宝塔一键安装 &#xff08;Windows系统不支持&#xff09; 设置命令行的PHP版本避免执行脚本时 获取不到 swoole 检查swoole是否安装成功 获取官方SDK GitHub - kaiheila/php-bot: 开黑啦机器人的php版本https://github.com/kaiheila/php-bot 配…

同态排序算法

参考文献&#xff1a; [Batcher68] Batcher K E. Sorting networks and their applications[C]//Proceedings of the April 30–May 2, 1968, spring joint computer conference. 1968: 307-314. [SV11] Smart, N.P., Vercauteren, F.: Fully homomorphic SIMD operations. IA…

智能井盖传感器,物联网智能井盖系统

随着城市人口的不断增加和城市化进程的不断推进&#xff0c;城市基础设施的安全和可靠性变得愈发重要&#xff0c;城市窨井盖作为城市基础设施重要组成部分之一&#xff0c;其安全性事关城市安全有序运行和居民生产生活安全保障。 近年来&#xff0c;各地都在加强城市窨井盖治理…

多页面应用多次引入同一个资源优化方法

介绍 项目是多页面应用&#xff0c;每个界面都会引入一次layui框架源码&#xff0c;造成未优化之前界面加载十分缓慢 优化探索 想办法让多页面只加载一次 但是由于多页面使用iframe&#xff0c;主页面和子页面资源隔离&#xff0c;无法让资源只加载一次 利用浏览器缓存 在…

为Claude的分析内容做准备:提取PDF页面内容的简易应用程序

由于Claude虽然可以分析整个文件&#xff0c;但是对文件的大小以及字数是有限制的&#xff0c;为了将pdf文件分批传入Claude人工智能分析和总结文章内容&#xff0c;才有了这篇博客&#xff1a; 在本篇博客中&#xff0c;我们将介绍一个基于 wxPython 和 PyMuPDF 库编写的简易的…

计算机安全学习笔记(I):访问控制安全原理

访问控制原理 从广义上来讲&#xff0c;所有的计算机安全都与访问控制有关。 RFC 4949: Internet Security Glossary, Version 2 (rfc-editor.org) RFC 4949 定义的计算机安全&#xff1a;用来实现和保证计算机系统的安全服务的措施&#xff0c;特别是保证访问控制服务的措施…

【Flink】Flink提交流程

我们通常在学习的时候需要掌握大数据组件的原理以便更好的掌握这个大数据组件&#xff0c;Flink实际生产开发过程中最常见的就是提交到yarn上进行调度&#xff0c;模式使用的Per-Job模式&#xff0c;下面我们就给大家讲下Flink提交Per-Job任务到yarn上的流程&#xff0c;流程图…

【图论】缩点的综合应用(一)

一.缩点的概念 缩点&#xff0c;也称为点缩法&#xff08;Vertex Contraction&#xff09;&#xff0c;是图论中的一种操作&#xff0c;通常用于缩小图的规模&#xff0c;同时保持了图的某些性质。这个操作的目标是将图中的一些节点合并为一个超级节点&#xff0c;同时调整相关…

springboot小知识:配置feign服务超时时间

背景&#xff1a;当前项目通过feign服务调用了其他两个项目的接口&#xff0c;但是由于特殊需求&#xff0c;需要调整某一个项目的feign服务的默认超时时间&#xff1a; 默认连接超时10秒&#xff0c;默认读取超时时间 60秒 1.找到定义的FeignClient 2.根据FeignClient定义的名…

基于YOLOV8模型的课堂场景下人脸目标检测系统(PyTorch+Pyside6+YOLOv8模型)

摘要&#xff1a;基于YOLOV8模型的课堂场景下人脸目标检测系统可用于日常生活中检测与定位课堂场景下人脸&#xff0c;利用深度学习算法可实现图片、视频、摄像头等方式的目标检测&#xff0c;另外本系统还支持图片、视频等格式的结果可视化与结果导出。本系统采用YOLOv8目标检…

Unity实现UI图片面板滚动播放效果第二弹

效果&#xff1a; 场景结构&#xff1a; 特殊物体&#xff1a;panel下面用排列组件horizent layout group放置多个需要显示的面板&#xff0c;用mask遮罩好。 主要思路&#xff1a; 这次是要在最后一个toggle的地方&#xff0c;依然向左滚动回1&#xff0c;这是难点。因此实际…

【接口优化方案解决】

文章目录 前言1、批量插入或者查询数据库2、异步思想 耗时操作&#xff0c;考虑放到异步3、空间换时间思想&#xff1a;恰当使用缓存。4. 预取思想&#xff1a;提前初始化到缓存5、借用线程池6. 事件回调思想&#xff1a;拒绝阻塞等待。7、锁粒度避免过粗8、切换存储方式&#…

考研数据结构:第八章 排序

文章目录 一、排序的基本概念二、插入排序2.1插入排序2.1.1算法思想2.1.2算法实现2.1.3算法效率分析2.1.4算法优化——折半插入排序 2.2希尔排序2.2.1算法思想2.2.2代码实现2.2.3算法性能分析 三、交换排序3.1冒泡排序3.1.1算法思想3.1.2代码实现3.1.3算法性能分析 3.2快速排序…

flutter对数组中某个数据二次加工成单独的数组

如何将数据[2,1,2,2,2,1,2,2,3,2,2,2,2,3,2,2,2,2,2,3,2,4,2,2,1,2,3,2,4,2]加工成 [[2], 1, [2, 2, 2], 1, [2, 2], 3, [2, 2, 2, 2], 3, [2, 2, 2, 2, 2], 3, [2], 4, [2, 2], 1, [2], 3, [2], 4, [2]]。这是实际工作中遇到的问题&#xff0c;UI要求将某一类型数据&#xff…

【无标题】idea 中 SpringBoot 点击运行没反应,按钮成灰色

问题描述 在使用 Spring Boot 开发项目时&#xff0c;可能会遇到一个问题&#xff1a;点击运行按钮后&#xff0c;控制台没有任何输出&#xff0c;项目界面也没有显示。这种情况可能是由多种原因导致的&#xff0c;本文将介绍一些常见的解决方法。 解决方法 首先看下Groovy插…

Nexus 如何配置 Python 的私有仓库

Nexus 可作为一个代理来使用。 针对一些网络环境不好的公司&#xff0c;可以通过配置 Nexus 来作为远程的代理。 Group 概念 Nexus 有一个 Group 的概念&#xff0c;我们可以认为一个 Nexus 仓库的 Group 就是很多不同的仓库的集合。 从下面的配置中我们可以看到&#xff0…