Node.JS多线程PromisePool之promise-pool库实现

news2025/1/21 7:17:01

什么是Promise Pool

Map-like, concurrent promise processing for Node.js.

Promise-Pool是一个用于管理并发请求的JavaScript库,它可以限制同时进行的请求数量,以避免过多的请求导致服务器压力过大。使用Promise-Pool可以方便地实现对多个异步操作的并发控制。

Promise Pool “承诺池” 包允许您批量运行许多承诺。

承诺池确保并发处理任务的最大数量。

承诺池中的每个任务都是其他任务,这意味着一旦一个任务完成,池就开始处理下一个任务。

此处理可确保了为您的任务进行最佳的批处理。

 

Promise Pool - NPMJS

@supercharge/promise-pool - npm (npmjs.com)icon-default.png?t=N7T8https://www.npmjs.com/package/@supercharge/promise-pool

Promise Pool - Document

Promise Poolicon-default.png?t=N7T8https://superchargejs.com/docs/3.x/promise-pool

 

怎么使用PromisePool

Install 安装

so easy , just install it

npm i @supercharge/promise-pool

Usage用例

Using the promise pool is pretty straightforward. The package exposes a class and you can create a promise pool instance using the fluent interface.

使用promise pool承诺池非常简单。该包公开了一个类,您可以使用流畅的接口创建一个承诺池实例。

Here’s an example using a concurrency of 2:

import { PromisePool } from '@supercharge/promise-pool'

const users = [
  { name: 'Marcus' },
  { name: 'Norman' },
  { name: 'Christian' }
]

const { results, errors } = await PromisePool
  .withConcurrency(2)
  .for(users)
  .process(async (userData, index, pool) => {
    const user = await User.createIfNotExisting(userData)

    return user
  })

The promise pool uses a default concurrency of 10

默认是十个线程,请按照自己的实际情况(业务+架构)处理

 

在以下示例中,我们创建了一个包含5个worker的线程池。然后,我们向线程池添加了10个任务。线程池会并发执行这些任务,但最多只能有5个任务同时运行。当一个任务完成时,线程池会自动分配下一个任务给空闲的worker。

const PromisePool = require('promise-pool');

// 创建一个包含5个worker的线程池
const pool = new PromisePool(5, (task) => {
  return new Promise((resolve, reject) => {
    // 模拟一个耗时操作
    setTimeout(() => {
      console.log('Task completed:', task);
      resolve();
    }, 1000);
  });
});

// 添加任务到线程池
for (let i = 0; i < 10; i++) {
  pool.addTask(i).then(() => {
    console.log('Task finished:', i);
  }).catch((err) => {
    console.error('Error:', err);
  });
}

//zhengkai.blog.csdn.net

Manually Stop the Pool 手工停止

You can stop the processing of a promise pool using the pool instance provided to the .process() and .handleError() methods. Here’s an example how you can stop an active promise pool from within the .process() method:

await PromisePool
  .for(users)
  .process(async (user, index, pool) => {
    if (condition) {
      return pool.stop()
    }

    // processes the `user` data
  })

You may also stop the pool from within the .handleError() method in case you need to:

import { PromisePool } from '@supercharge/promise-pool'

await PromisePool
  .for(users)
  .handleError(async (error, user, pool) => {
    if (error instanceof SomethingBadHappenedError) {
      return pool.stop()
    }

    // handle the given `error`
  })
  .process(async (user, index, pool) => {
    // processes the `user` data
  })

Bring Your Own Error Handling

The promise pool allows for custom error handling. You can take over the error handling by implementing an error handler using the .handleError(handler).

If you provide an error handler, the promise pool doesn’t collect any errors. You must then collect errors yourself.

Providing a custom error handler allows you to exit the promise pool early by throwing inside the error handler function. Throwing errors is in line with Node.js error handling using async/await.

承诺池允许自定义错误处理。

您可以通过使用.手柄错误(处理程序)实现错误处理程序来接管错误处理。

如果您提供了一个错误处理程序,则承诺池不会收集任何错误。

然后,您必须自己收集错误。

提供了一个自定义的错误处理程序,允许您通过抛出错误处理程序函数来提前退出承诺池。

抛出错误与Node.js错误处理使用异步/等待相一致。

import { PromisePool } from '@supercharge/promise-pool'

try {
  const errors = []

  const { results } = await PromisePool
    .for(users)
    .withConcurrency(4)
    .handleError(async (error, user) => {
      if (error instanceof ValidationError) {
        errors.push(error) // you must collect errors yourself
        return
      }

      if (error instanceof ThrottleError) { // Execute error handling on specific errors
        await retryUser(user)
        return
      }

      throw error // Uncaught errors will immediately stop PromisePool
    })
    .process(async data => {
      // the harder you work for something,
      // the greater you’ll feel when you achieve it
    })

  await handleCollected(errors) // this may throw

  return { results }
} catch (error) {
  await handleThrown(error)
}

Callback for Started and Finished Tasks 开始和结束任务的回调

You can use the onTaskStarted and onTaskFinished methods to hook into the processing of tasks. The provided callback for each method will be called when a task started/finished processing:

您可以使用任务启动和任务完成的方法来连接到任务的处理中。

当任务启动/完成处理时,将调用为每个方法提供的回调:

import { PromisePool } from '@supercharge/promise-pool'

await PromisePool
  .for(users)
  .onTaskStarted((item, pool) => {
    console.log(`Progress: ${pool.processedPercentage()}%`)
    console.log(`Active tasks: ${pool.processedItems().length}`)
    console.log(`Active tasks: ${pool.activeTasksCount()}`)
    console.log(`Finished tasks: ${pool.processedItems().length}`)
    console.log(`Finished tasks: ${pool.processedCount()}`)
  })
  .onTaskFinished((item, pool) => {
    // update a progress bar or something else :)
  })
  .process(async (user, index, pool) => {
    // processes the `user` data
  })
You can also chain multiple onTaskStarted and onTaskFinished handling (in case you want to separate some functionality):

import { PromisePool } from '@supercharge/promise-pool'

await PromisePool
  .for(users)
  .onTaskStarted(() => {})
  .onTaskStarted(() => {})
  .onTaskFinished(() => {})
  .onTaskFinished(() => {})
  .process(async (user, index, pool) => {
    // processes the `user` data
  })

Task Timeouts 超时设置

有时,配置一个任务必须完成处理的超时时间是很有用的。

一个超时的任务被标记为失败。

您可以使用与任务超时(<毫秒>)方法来配置任务的超时:

Sometimes it’s useful to configure a timeout in which a task must finish processing. A task that times out is marked as failed. You may use the withTaskTimeout(<milliseconds>) method to configure a task’s timeout:

import { PromisePool } from '@supercharge/promise-pool'

await PromisePool
  .for(users)
  .withTaskTimeout(2000) // milliseconds
  .process(async (user, index, pool) => {
    // processes the `user` data
  })

Notice: a configured timeout is configured for each task, not for the whole pool. The example configures a 2-second timeout for each task in the pool.

注意:为每个任务配置了一个已配置的超时,而不是为整个池。

该示例为池中的每个任务配置一个2秒的超时。

Correspond Source Items and Their Results 正确响应每个请求

有时,您希望处理后的结果与源项保持一致。

结果项在结果数组中的位置应该与其相关的源项相同。

使用使用对应结果方法来应用此行为:

Sometimes you want the processed results to align with your source items. The resulting items should have the same position in the results array as their related source items. Use the useCorrespondingResults method to apply this behavior:

import { setTimeout } from 'node:timers/promises'
import { PromisePool } from '@supercharge/promise-pool'

const { results } = await PromisePool
  .for([1, 2, 3])
  .withConcurrency(5)
  .useCorrespondingResults()
  .process(async (number, index) => {
    const value = number * 2

    return await setTimeout(10 - index, value)
  })

/**
 * source array: [1, 2, 3]
 * result array: [2, 4 ,6]
 * --> result values match the position of their source items
 */

For example, you may have three items you want to process. Using corresponding results ensures that the processed result for the first item from the source array is located at the first position in the result array (=index 0). The result for the second item from the source array is placed at the second position in the result array, and so on …

例如,您可能有三个要处理的项目。

使用相应的结果可以确保从源数组中得到的第一个项的处理结果位于结果数组中的第一个位置(=索引0)。

来自源数组的第二个项的结果被放置在结果数组中的第二个位置,以此类推。

Return Values When Using Corresponding Results 在使用相应的结果时,请返回相应的值

The results array returned by the promise pool after processing has a mixed return type. Each returned item is one of this type:

  • the actual value type: for results that successfully finished processing
  • Symbol('notRun'): for tasks that didn’t run
  • Symbol('failed'): for tasks that failed processing

The PromisePool exposes both symbols and you may access them using

  • Symbol('notRun'): exposed as PromisePool.notRun
  • Symbol('failed'): exposed as PromisePool.failed

处理后由承诺池返回的结果数组具有混合返回类型。

每个返回的项目都是以下类型之一:

实际值类型:对于成功完成处理的结果

符号(“notRun”):用于未运行的任务

符号(“failed”):用于处理失败的任务

承诺池公开了这两个符号,您可以使用

符号(“notRun”):公开为PromisePool.notRun

符号(“failed”):公开为PromisePool.failed

您可以对所有未运行或失败的任务重复处理:

You may repeat processing for all tasks that didn’t run or failed:

import { PromisePool } from '@supercharge/promise-pool'

const { results, errors } = await PromisePool
  .for([1, 2, 3])
  .withConcurrency(5)
  .useCorrespondingResults()
  .process(async (number) => {
    // …
  })

const itemsNotRun = results.filter(result => {
  return result === PromisePool.notRun
})

const failedItems = results.filter(result => {
  return result === PromisePool.failed
})

When using corresponding results, you need to go through the errors array yourself. The default error handling (collect errors) stays the same and you can follow the described error handling section above.

当使用相应的结果时,您需要自己检查错误数组。

默认的错误处理(收集错误)保持不变,您可以按照上面描述的错误处理部分进行操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1571600.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于spring boot的漫画之家系统

基于spring boot的漫画之家系统设计与实现 开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09; 数据库工具&#xff1a;Navicat11 开发软件&…

云计算面临的威胁

目录 一、概述 二、威胁建模分析 2.1 威胁建模的概念 2.2 威胁建模起到的作用 2.3 威胁建模的流程 2.3.1 威胁建模流程图 2.3.2 威胁建模流程内容 2.3.2.1 绘制数据流图 2.3.2.2 威胁识别与分析 2.3.2.2.1 STRIDE威胁分析方法论 2.3.2.3 制定消减措施 2.3.2.3.1 消减…

注解,自定义注解和元注解

1.注解 1.1.注解概述、作用 注解&#xff08;Annotation&#xff09;&#xff0c;也叫元数据。一种代码级别的说明。它是JDK1.5及以后版本引入的一个特性&#xff0c;与类、接口、枚举是在同一个层次。它可以声明在包、类、字段、方法、局部变量、方法参数等的前面&#xff0…

后端灰度发布

在软件开发中&#xff0c;"灰度"通常指的是渐进式地将新功能、更新或改进引入到生产环境中&#xff0c;但只对一小部分用户或流量进行部署和测试的过程。这种方法允许开发团队在生产环境中逐步测试新功能&#xff0c;以确保其稳定性、可靠性和用户体验&#xff0c;同…

每日一题(leetcode287):寻找重复数--二分查找+思维

思路&#xff1a;看官方解答 class Solution { public:int findDuplicate(vector<int>& nums) {int nnums.size();int left1;int rightn-1;int ans-1;while(left<right){int mid(leftright)/2;int count0;for(int j0;j<n;j){if(nums[j]<mid){count;}}if(co…

观察者模式 C++

&#x1f442; Honey Honey - 孙燕姿 - 单曲 - 网易云音乐 目录 &#x1f33c;前言 &#x1f33c;描述 &#x1f382;问题 &#x1f4aa;解决方案 &#x1f232;现实场景 代码 场景1 -- 报纸发行 场景 解释 代码 场景2 -- 气象资料发布 场景3 -- 过红绿灯 &#x…

Restful Web Service

Restful 1.特点 RESTful是一种架构风格&#xff0c;强调简单、轻量级和对资源的状态less操作。RESTful是通过HTTP协议进行通信的。RESTful的应用程序可以调用运行在不同服务器上的服务或函数。RESTful的接口通常使用JSON&#xff0c;但实际上它们都支持多种数据格式。RESTful…

短信群发中链接缩短、点击量统计工具

此文中介绍内容需要用到C1N短网址&#xff08;c1n.cn)。 短信营销在很多行业依然是重要的客户关系和用户增长手段。但是直接在短信中使用原始链接有以下两个问题: 1. 链接过长&#xff0c;短信计费字数增加&#xff0c;成本上升 原始链接往往会包含很多参数&#xff0c;长度比…

kubernetes有ingress-controler以及没有外部loadbalancer 的情况下使用istio-gateway.

那就配置一个ingress-使用已有ingress-controler代理istio-gateway class创建的gateway svc来公开。

MySQL常见锁探究

MySQL常见锁探究 1. 各种锁类型1.1 全局锁1.2 表级锁1.2.1 表锁1.2.2 元数据锁&#xff08;MDL&#xff09;1.2.3 意向锁1.2.4 AUTO-INC 锁 1.3 行级锁1.3.1 Record Lock1.3.2 Gap Lock1.3.3 Next-Key Lock 2. MySQL是如何加锁的&#xff1f;2.1 什么 SQL 语句会加行级锁&#…

微电网优化:基于​海象优化算法(Walrus Optimization Algorithm,WOA)​的微电网优化(提供MATLAB代码)

一、微电网优化模型 微电网是一个相对独立的本地化电力单元&#xff0c;用户现场的分布式发电可以支持用电需求。为此&#xff0c;您的微电网将接入、监控、预测和控制您本地的分布式能源系统&#xff0c;同时强化供电系统的弹性&#xff0c;保障您的用电更经济。您可以在连接…

RisingWave 在品高股份 Bingo IAM 中的应用

背景介绍 公司背景 品高股份&#xff0c;是国内专业的云计算及行业信息化服务提供商。公司成立于 2003 年&#xff0c;总部位于广州&#xff0c;下设多家子公司和分公司&#xff0c;目前员工总数近 900 人&#xff0c;其中 80 %以上是专业技术人员。 品高股份在 2008 年便开…

第20次修改了可删除可持久保存的前端html备忘录:重新布局

第20次修改了可删除可持久保存的前端html备忘录&#xff1a;重新布局 <!DOCTYPE html> <html lang"zh"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"…

Tuxera2023 NTFS for Mac下载,安装和序列号激活

对于必须在Windows电脑和Mac电脑之间来回切换的Mac朋友来说&#xff0c;跨平台不兼容一直是一个巨大的障碍&#xff0c;尤其是当我们需要使用NTFS格式的硬盘在Windows和macOS之间共享文件时。因为Mac默认不支持写入NTFS磁盘。 为了解决这一问题&#xff0c;很多朋友会选择很便捷…

算法刷题应用知识补充--基础算法、数据结构篇

这里写目录标题 位运算&#xff08;均是拷贝运算&#xff0c;不会影响原数据&#xff0c;这点要注意&#xff09;&、|、^位运算特性细节知识补充对于n-1的理解异或来实现数字交换找到只出现一次的数据&#xff0c;其余数据出现偶数次 >> 、<<二进制中相邻的位的…

力扣---删除链表的倒数第 N 个结点

给你一个链表&#xff0c;删除链表的倒数第 n 个结点&#xff0c;并且返回链表的头结点。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5], n 2 输出&#xff1a;[1,2,3,5]示例 2&#xff1a; 输入&#xff1a;head [1], n 1 输出&#xff1a;[]示例 3&#xff1a…

idea 中 大于等于,不等于、小于等于等等这些符号发生了改变问题解决方法

1.问题描述 idea 中&#xff01;变为 ≠、 >变成了≥、<变成了 ≤ 等问题的解决办法 展示效果如下截图 解决方法

Vue3从入门到实战:掌握状态管理库pinia(下部分)

1.storeToRefs 在Count.vue文件中 显的冗余了&#xff0c;如何更加优雅简化代码。用storeToRefs 补充&#xff1a; 为什么不用ToRefs呢&#xff1f; 使用的话会将所有数据都用ref引用包裹&#xff0c;其实方法等是没必要包裹的&#xff0c;具有一定风险 2.getters的使用 …

SSM 项目学习(Vue3+ElementPlus+Axios+SSM)

文章目录 1 项目介绍1.1 项目功能/界面 2 项目基础环境搭建2.1 创建项目2.2 项目全局配置 web.xml2.3 SpringMVC 配置2.4 配置 Spring 和 MyBatis , 并完成整合2.5 创建表&#xff0c;使用逆向工程生成 Bean、XxxMapper 和 XxxMapper.xml2.6 注意事项和细节说明 3 实现功能 01-…

jvm基础三——类加载器

类加载器 在Java中&#xff0c;类加载器&#xff08;Class Loader&#xff09;是Java虚拟机&#xff08;JVM&#xff09;的一部分&#xff0c;负责将类文件&#xff08;.class文件&#xff09;加载到JVM中&#xff0c;使得程序能够使用这些类。类加载器在Java中具有重要的作用&…