Node.js Stream.pipeline() Method

news2025/1/23 4:54:18

Why Stream.pipeline

通过流我们可以将一大块数据拆分为一小部分一点一点的流动起来,而无需一次性全部读入,在 Linux 下我们可以通过 | 符号实现,类似的在 Nodejs 的 Stream 模块中同样也为我们提供了 pipe() 方法来实现。

未使用 Stream pipe 情况

在 Nodejs 中 I/O 操作都是异步的,先用 util 模块的 promisify 方法将 fs.readFile 的 callback 形式转为 Promise 形式,这块代码看似没问题,但是它的体验不是很好,因为它是将数据一次性读入内存再进行的返回,当数据文件很大的时候也是对内存的一种消耗,因此不推荐它。

const Koa = require('koa');
const fs = require('fs');
const app = new Koa();
const { promisify } = require('util');
const { resolve } = require('path');
const readFile = promisify(fs.readFile);

app.use(async ctx => {
  try {
    ctx.body = await readFile(resolve(__dirname, 'test.json'));
  } catch(err) { ctx.body = err };
});

app.listen(3000);

使用 Stream pipe 情况

下面,再看看怎么通过 Stream 的方式在 Koa 框架中响应数据

...
app.use(async ctx => {
  try {
    const readable = fs.createReadStream(resolve(__dirname, 'test.json'));
    ctx.body = readable;
  } catch(err) { ctx.body = err };
});

以上在 Koa 中直接创建一个可读流赋值给 ctx.body 就可以了,你可能疑惑了为什么没有 pipe 方法,因为框架给你封装好了,不要被表象所迷惑了,看下相关源码:

// https://github.com/koajs/koa/blob/master/lib/application.js#L256
function respond(ctx) {
  ...
  let body = ctx.body;
  if (body instanceof Stream) return body.pipe(res);
  ...
}

没有神奇之处,框架在返回的时候做了层判断,因为 res 是一个可写流对象,如果 body 也是一个 Stream 对象(此时的 Body 是一个可读流),则使用 body.pipe(res) 以流的方式进行响应。

使用 Stream VS 不使用 Stream

动图封面

动图封面

What Stream.pipeline function

The stream.pipeline() method is a module method that is used to the pipe by linking the streams passing on errors and accurately cleaning up and providing a callback function when the pipeline is done. 

Stream.pipeline() 方法是一种模块方法,用于通过链接传递错误的流并在管道完成时准确地清理并提供回调函数来用于管道。

Syntax:

stream.pipeline(...streams, callback)

Parameters: 

This method accepts two parameters as mentioned above and described below.该方法接受如上所述和如下所述的两个参数。

  • …streams: These are two or more streams that are to be piped together.这些是要通过管道连接在一起的两个或多个流。
  • callback: This function is called when the pipeline is fully done and it shows an ‘error’ if the pipeline is not accomplished.当管道完全完成时调用此函数,如果管道未完成,则会显示“错误”。

Return Value:

 It returns a cleanup function. 返回值:它返回一个清理函数。

The below examples illustrate the use of the stream.pipeline() method in Node.js: 

以下示例说明了 Node.js 中的

stream.pipeline() 方法的用法

Example 1: 

// Node.js program to demonstrate the   

// stream.pipeline() method

// Including fs and zlib module

const fs = require('fs');

const zlib = require('zlib');

// Constructing finished from stream

const { pipeline } = require('stream');

// Constructing promisify from

// util

const { promisify } = require('util');

// Defining pipelineAsync method

const pipelineAsync = promisify(pipeline);

// Constructing readable stream

const readable = fs.createReadStream("input.text");

// Constructing writable stream

const writable = fs.createWriteStream("output.text");

// Creating transform stream

const transform = zlib.createGzip();

// Async function

(async function run() {

    try {

        // pipelining three streams

        await pipelineAsync(

            readable,

            transform,

            writable

        );

        console.log("pipeline accomplished.");

    }

    // Shows error

    catch (err) {

        console.error('pipeline failed with error:', err);

    }

})();

Output:

Promise {  }
pipeline accomplished.

Example 2: 

// Node.js program to demonstrate the   

// stream.pipeline() method

// Including fs and zlib module

const fs = require('fs');

const zlib = require('zlib');

// Constructing finished from stream

const { pipeline } = require('stream');

// Constructing promisify from

// util

const { promisify } = require('util');

// Defining pipelineAsync method

const pipelineAsync = promisify(pipeline);

// Constructing readable stream

const readable = fs.createReadStream("input.text");

// Constructing writable stream

const writable = fs.createWriteStream("output.text");

// Creating transform stream

const transform = zlib.createGzip();

// Async function

(async function run() {

    try {

        // pipelining three streams

        await pipelineAsync(

            readable,

            writable,

            transform

        );

        console.log("pipeline accomplished.");

    }

    // Shows error

    catch (err) {

        console.error('pipeline failed with error:', err);

    }

})();

Output: Here, the order of streams is not proper while piping so an error occurs.

Promise {  }
pipeline failed with error: Error [ERR_STREAM_CANNOT_PIPE]: Cannot pipe, not readable
    at WriteStream.Writable.pipe (_stream_writable.js:243:24)
    at pipe (internal/streams/pipeline.js:57:15)
    at Array.reduce ()
    at pipeline (internal/streams/pipeline.js:88:18)
    at Promise (internal/util.js:274:30)
    at new Promise ()
    at pipeline (internal/util.js:273:12)
    at run (/home/runner/ThirstyTimelyKey/index.js:33:11)
    at /home/runner/ThirstyTimelyKey/index.js:45:5
    at Script.runInContext (vm.js:133:20)

解析Stream.PipeLine

在应用层我们调用了 fs.createReadStream() 这个方法,顺藤摸瓜找到这个方法创建的可读流对象的 pipe 方法实现,以下仅列举核心代码实现,基于 Nodejs v12.x 源码。

2.1.1 /lib/fs.js

导出一个 createReadStream 方法,在这个方法里面创建了一个 ReadStream 可读流对象,且 ReadStream 来自 internal/fs/streams 文件,继续向下找。

// https://github.com/nodejs/node/blob/v12.x/lib/fs.js
// 懒加载,主要在用到的时候用来实例化 ReadStream、WriteStream ... 等对象
function lazyLoadStreams() {
  if (!ReadStream) {
    ({ ReadStream, WriteStream } = require('internal/fs/streams'));
    [ FileReadStream, FileWriteStream ] = [ ReadStream, WriteStream ];
  }
}

function createReadStream(path, options) {
  lazyLoadStreams();
  return new ReadStream(path, options); // 创建一个可读流
}

module.exports = fs = {
  createReadStream, // 导出 createReadStream 方法
  ...
}

2.1.2 /lib/internal/fs/streams.js

这个方法里定义了构造函数 ReadStream,且在原型上定义了 open、_read、_destroy 等方法,并没有我们要找的 pipe 方法。

但是呢通过 ObjectSetPrototypeOf 方法实现了继承,ReadStream 继承了 Readable 在原型中定义的函数,接下来继续查找 Readable 的实现

// https://github.com/nodejs/node/blob/v12.x/lib/internal/fs/streams.js
const { Readable, Writable } = require('stream');

function ReadStream(path, options) {
  if (!(this instanceof ReadStream))
    return new ReadStream(path, options);

  ...
  Readable.call(this, options);
  ...
}
ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype);
ObjectSetPrototypeOf(ReadStream, Readable);

ReadStream.prototype.open = function() { ... };

ReadStream.prototype._read = function(n) { ... };;

ReadStream.prototype._destroy = function(err, cb) { ... };
...

module.exports = {
  ReadStream,
  WriteStream
};

2.1.3 /lib/stream.js

在 stream.js 的实现中,有条注释:在 Readable/Writable/Duplex/... 之前导入 Stream,原因是为了避免 cross-reference(require),为什么会这样?

第一步 stream.js 这里将 require('internal/streams/legacy') 导出复制给了 Stream。

在之后的 _stream_readable、Writable、Duplex ... 模块也会反过来引用 stream.js 文件,具体实现下面会看到。

Stream 导入了 internal/streams/legacy

上面 /lib/internal/fs/streams.js 文件从 stream 模块获取了一个 Readable 对象,就是下面的 Stream.Readable 的定义。

// https://github.com/nodejs/node/blob/v12.x/lib/stream.js
// Note: export Stream before Readable/Writable/Duplex/...
// to avoid a cross-reference(require) issues
const Stream = module.exports = require('internal/streams/legacy');

Stream.Readable = require('_stream_readable');
Stream.Writable = require('_stream_writable');
Stream.Duplex = require('_stream_duplex');
Stream.Transform = require('_stream_transform');
Stream.PassThrough = require('_stream_passthrough');
...

2.1.4 /lib/internal/streams/legacy.js

上面的 Stream 等于 internal/streams/legacy,首先继承了 Events 模块,之后呢在原型上定义了 pipe 方法,刚开始看到这里的时候以为实现是在这里了,但后来看 _stream_readable 的实现之后,发现 _stream_readable 继承了 Stream 之后自己又重新实现了 pipe 方法,那么疑问来了这个模块的 pipe 方法是干嘛的?什么时候会被用?翻译文件名 “legacy=遗留”?有点没太理解,难道是遗留了?有清楚的大佬可以指点下,也欢迎在公众号 “Nodejs技术栈” 后台加我微信一块讨论下!

// https://github.com/nodejs/node/blob/v12.x/lib/internal/streams/legacy.js
const {
  ObjectSetPrototypeOf,
} = primordials;
const EE = require('events');
function Stream(opts) {
  EE.call(this, opts);
}
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
ObjectSetPrototypeOf(Stream, EE);

Stream.prototype.pipe = function(dest, options) {
  ...
};

module.exports = Stream;

2.1.5 /lib/_stream_readable.js

在 _stream_readable.js 的实现里面定义了 Readable 构造函数,且继承于 Stream,这个 Stream 正是我们上面提到的 /lib/stream.js 文件,而在 /lib/stream.js 文件里加载了 internal/streams/legacy 文件且重写了里面定义的 pipe 方法。

经过上面一系列的分析,终于找到可读流的 pipe 在哪里,同时也更进一步的认识到了在创建一个可读流时的执行调用过程,下面将重点来看这个方法的实现。

module.exports = Readable;
Readable.ReadableState = ReadableState;

const EE = require('events');
const Stream = require('stream');

ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);

function Readable(options) {
  if (!(this instanceof Readable))
    return new Readable(options);

  ...
  Stream.call(this, options); // 继承自 Stream 构造函数的定义
}
...

2.2 _stream_readable 实现分析

2.2.1 声明构造函数 Readable

声明构造函数 Readable 继承 Stream 的构造函数和原型。

Stream 是 /lib/stream.js 文件,上面分析了,这个文件继承了 events 事件,此时也就拥有了 events 在原型中定义的属性,例如 on、emit 等方法。

const Stream = require('stream');
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);

function Readable(options) {
  if (!(this instanceof Readable))
    return new Readable(options);

  ...

  Stream.call(this, options);
}

2.2.2 声明 pipe 方法,订阅 data 事件

在 Stream 的原型上声明 pipe 方法,订阅 data 事件,src 为可读流对象,dest 为可写流对象。

我们在使用 pipe 方法的时候也是监听的 data 事件,一边读取数据一边写入数据。

看下 ondata() 方法里的几个核心实现:

  • dest.write(chunk):接收 chunk 写入数据,如果内部的缓冲小于创建流时配置的 highWaterMark,则返回 true,否则返回 false 时应该停止向流写入数据,直到 'drain' 事件被触发
  • src.pause():可读流会停止 data 事件,意味着此时暂停数据写入了。

之所以调用 src.pause() 是为了防止读入数据过快来不及写入,什么时候知道来不及写入呢,要看 dest.write(chunk) 什么时候返回 false,是根据创建流时传的 highWaterMark 属性,默认为 16384 (16kb),对象模式的流默认为 16。

Readable.prototype.pipe = function(dest, options) {
  const src = this;
  src.on('data', ondata);
  function ondata(chunk) {
    const ret = dest.write(chunk);
    if (ret === false) {
      ...
      src.pause();
    }
  }
  ...
};

2.2.3 订阅 drain 事件,继续流动数据

上面提到在 data 事件里,如果调用 dest.write(chunk) 返回 false,就会调用 src.pause() 停止数据流动,什么时候再次开启呢?

如果说可以继续写入事件到流时会触发 drain 事件,也是在 dest.write(chunk) 等于 false 时,如果 ondrain 不存在则注册 drain 事件。

Readable.prototype.pipe = function(dest, options) {
  const src = this;
  src.on('data', ondata);
  function ondata(chunk) {
    const ret = dest.write(chunk);
    if (ret === false) {
      ...
      if (!ondrain) {
        // When the dest drains, it reduces the awaitDrain counter
        // on the source.  This would be more elegant with a .once()
        // handler in flow(), but adding and removing repeatedly is
        // too slow.
        ondrain = pipeOnDrain(src);
        dest.on('drain', ondrain);
      }
      src.pause();
    }
  }
  ...
};

// 当可写入流 dest 耗尽时,它将会在可读流对象 source 上减少 awaitDrain 计数器
// 为了确保所有需要缓冲的写入都完成,即 state.awaitDrain === 0 和 src 可读流上的 data 事件存在,切换流到流动模式
function pipeOnDrain(src) {
  return function pipeOnDrainFunctionResult() {
    const state = src._readableState;
    debug('pipeOnDrain', state.awaitDrain);
    if (state.awaitDrain)
      state.awaitDrain--;
    if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
      state.flowing = true;
      flow(src);
    }
  };
}

// stream.read() 从内部缓冲拉取并返回数据。如果没有可读的数据,则返回 null。在可读流上 src 还有一个 readable 属性,如果可以安全地调用 readable.read(),则为 true
function flow(stream) {
  const state = stream._readableState;
  debug('flow', state.flowing);
  while (state.flowing && stream.read() !== null);
}

2.2.4 触发 data 事件

调用 readable 的 resume() 方法,触发可读流的 'data' 事件,进入流动模式。

Readable.prototype.pipe = function(dest, options) {
  const src = this;
  // Start the flow if it hasn't been started already.
  if (!state.flowing) {
    debug('pipe resume');
    src.resume();
  }
  ...

然后实例上的 resume(Readable 原型上定义的)会在调用 resume() 方法,在该方法内部又调用了 resume_(),最终执行了 stream.read(0) 读取了一次空数据(size 设置的为 0),将会触发实例上的 _read() 方法,之后会在触发 data 事件。

function resume(stream, state) {
  ...
  process.nextTick(resume_, stream, state);
}

function resume_(stream, state) {
  debug('resume', state.reading);
  if (!state.reading) {
    stream.read(0);
  }

  ...
}

2.2.5 订阅 end 事件

end 事件:当可读流中没有数据可供消费时触发,调用 onend 函数,执行 dest.end() 方法,表明已没有数据要被写入可写流,进行关闭(关闭可写流的 fd),之后再调用 stream.write() 会导致错误。

Readable.prototype.pipe = function(dest, options) {
  ...
  const doEnd = (!pipeOpts || pipeOpts.end !== false) &&
              dest !== process.stdout &&
              dest !== process.stderr;

  const endFn = doEnd ? onend : unpipe;
  if (state.endEmitted)
    process.nextTick(endFn);
  else
    src.once('end', endFn);

  dest.on('unpipe', onunpipe);
  ...

  function onend() {
    debug('onend');
    dest.end();
  }
}

2.2.6 触发 pipe 事件

在 pipe 方法里面最后还会触发一个 pipe 事件,传入可读流对象

Readable.prototype.pipe = function(dest, options) {
  ...
  const source = this;
  dest.emit('pipe', src);
  ...
};

在应用层使用的时候可以在可写流上订阅 pipe 事件,做一些判断,具体可参考官网给的这个示例 stream_event_pipe[1]

2.2.7 支持链式调用

最后返回 dest,支持类似 unix 的用法:A.pipe(B).pipe(C)

Readable.prototype.pipe = function(dest, options) {
  return dest;
};

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1406591.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

解决 ssh: connect to host github.com port 22: Connection timed out

问题 今天使用git克隆github上的代码时,一直报错 原以为是公钥过期了,就尝试修改配置公钥,但是尝试了几次都不行,最终在博客上找到了解决方案,在次记录一下,以备不时之需 解决ssh-connect-to-host-github…

springboot整合MongoDB实战

目录 环境准备 引入依赖 配置yml 注入mongoTemplate 集合操作 文档操作 创建实体 添加文档 查询文档 更新文档 删除文档 环境准备 引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-da…

Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

前言 今天一天争取搞完最后这一部分&#xff0c;学完赶紧把 Kafka 和 Flume 学完&#xff0c;就要开始做实时数仓了。据说是应届生得把实时数仓搞个 80%~90% 才能差不多找个工作&#xff0c;太牛马了。 1、常用 Connector 读写 之前我们已经用过了一些简单的内置连接器&#x…

机器学习预测全家桶之单变量输入多步预测,天气温度预测为例,MATLAB代码

截止到本期&#xff0c;一共发了8篇关于机器学习预测全家桶的文章。参考文章如下&#xff1a; 1.五花八门的机器学习预测&#xff1f;一篇搞定不行吗&#xff1f; 2.机器学习预测全家桶&#xff0c;多步预测之BiGRU、BiLSTM、GRU、LSTM&#xff0c;LSSVM、TCN、CNN&#xff0c;…

怎么快速发表一篇EI会议论文?有什么要注意的?

都说EI会议论文的发表相对简单一些&#xff0c;但因为EI会议论文的含金量也挺高&#xff0c;因此很多国内外作者都喜爱在EI上投稿论文&#xff0c;那么怎么在国际ei会议发表会议论文呢? 这和国内发表论文都是差不多的&#xff0c;要选择合适的会议&#xff0c;按照会议要求整…

web安全学习笔记【06】——http\https抓包

思维导图放最后 #知识点&#xff1a; 1、Web常规-系统&中间件&数据库&源码等 2、Web其他-前后端&软件&Docker&分配站等 3、Web拓展-CDN&WAF&OSS&反向&负载均衡等 ----------------------------------- 1、APP架构-封装&原生态&…

Textual Inversion、DreamBooth、LoRA、InstantID:从低成本进化到零成本实现IP专属的AI绘画模型

2023年7月份国内有一款定制写真AI工具爆火。一款名为妙鸭相机的AI写真小程序&#xff0c;成功在C端消费者群体中出圈&#xff0c;并在微信、微博和小红书等平台迅速走红&#xff0c;小红书上的话题Tag获得了330多万的浏览量&#xff0c;相关微信指数飙升到了1800万以上。 其他…

【RT-DETR有效改进】2023.12月份最新成果TransNeXt像素聚焦注意力主干(全网首发)

前言 大家好&#xff0c;我是Snu77&#xff0c;这里是RT-DETR有效涨点专栏。 本专栏的内容为根据ultralytics版本的RT-DETR进行改进&#xff0c;内容持续更新&#xff0c;每周更新文章数量3-10篇。 专栏以ResNet18、ResNet50为基础修改版本&#xff0c;同时修改内容也支持Re…

检查字符串数组中的每个字符串是否全为“不显示元素”(如空格、制表符、换行符等)numpy.char.isspace()

【小白从小学Python、C、Java】 【计算机等级考试500强双证书】 【Python-数据分析】 检查字符串数组中的每个字符串 是否全为“不显示元素” &#xff08;如空格、制表符、换行符等&#xff09; numpy.char.isspace() [太阳]选择题 请问以下代码最终输出结果是&#xff1f; i…

RabbitMQ中交换机的应用及原理,案例的实现

目录 一、介绍 1. 概述 2. 作用及优势 3. 工作原理 二、交换机Exchange 1. Direct 2. Topic 3. Fanout 三、代码案例 消费者代码 1. 直连direct 生产者代码 测试 2. 主题topic 生产者代码 测试 3. 扇形fanout 生产者代码 测试 每篇一获 一、介绍 1. …

【前端小点】Vue3中的IP输入框组件

本文章记录,如何在vue3项目开发中,使用ip输入框组件. 之前写过vue2版本的ip组件,为了更好的适应vue3,此次进行vue3代码重写 先上效果图: 禁用效果图: 主要是组件的开发,代码如下,可直接拷贝使用. 大概思路就是: 使用四个输入框拼接,然后给输入内容添加校验操作,添加光标移动,…

05 双向链表

目录 1.双向链表 2.实现 3.OJ题 4.链表和顺序表对比 1. 双向链表 前面写了单向链表&#xff0c;复习一下 无头单向非循环链表&#xff1a;结构简单&#xff0c;一般不会单独用来存数据。实际中更多作为其他数据结构的子结构&#xff0c;如哈希桶、图的邻接等。另外这种结构在…

你知道Mysql的架构吗?

msyql分为server曾和存储引擎层 server层包括了连接器(管理连接&#xff0c;权限验证)、查询缓存&#xff08;命中直接返回结果&#xff09;、分析器&#xff08;词法分析&#xff0c;语法分析&#xff09;、优化器&#xff08;执行计划生成&#xff0c;索引选择&#xff09;、…

浪花 - 查询队伍列表

一、接口设计 1. 请求参数&#xff1a;封装 TeamQuery package com.example.usercenter.model.dto;import com.example.usercenter.common.PageRequest; import lombok.Data;/*** author 乐小鑫* version 1.0* Date 2024-01-22-20:14*/ Data public class TeamQuery extends …

使用Unity创建VisionPro应用

1、下载特定Unity版本 Unity账号需要是Pro账号,普通账号不行,目前只支持这1个Unity版本,不要下载任何其它版本:unityhub://2022.3.11f1/d00248457e15) 其它条件:使用Mac电脑M系列芯片,XCode15 Beta2及以上 参考资料: 苹果官网:苹果官网 Unity官网:Unity官网 官方教程…

C#,生成图片的指定尺寸缩略图的源代码

编程的时候经常用到图像的缩略图。 本文发布一个用于生成指定尺寸的缩略图的简单方法。 1 文本格式 private void button1_Click(object sender, EventArgs e) { CreateThumbnail("demo.jpg", "demo_thumb.jpg", 128, 128); } private void CreateTh…

MySQL函数—日期函数

MySQL函数—日期函数 函数功能CURDATE()返回当前日期&#xff0c;只有年月日CURTIME()返回当前时间&#xff0c;只有时分秒NOW()返回当前日期和时间 年月日时分秒YEAR(date)获取指定date的年份MONTH(date)获取指定date的月份DAY(date)获取指定date的日期DATE_ADD(date,INTERVAL…

项目解决方案: 视频融合(实时监控视频和三维建模进行融合)设计方案

目 录 一、需求描述 1、视频接入和控制要求 2、视频播放需求 3、提供其他应用的调用 二、方案设计 &#xff08;一&#xff09;系统设计图 &#xff08;二&#xff09;产品实现方案 三、产品和功能描述 &#xff08;一&#xff09;总体描述 &#xf…

2024问题汇总

2024问题汇总 Linux1.df-h / df -i 命令2.为多网卡Linux云服务器配置策略路由 Windows1.快速进入控制面板 网络连接指令 Linux 1.df-h / df -i 命令 df -h / df -i 都表示查看磁盘空间使用信息 如果遇到磁盘快满的情况&#xff0c;用这两个命令区别如下 df -h 是去删除比较大 …