Promise的并发控制 - 从普通并发池到动态并发池

news2024/11/27 22:25:27

一、场景

        给你一个有200个URL的数组,通过这些URL来发送请求,要求并发请求数不能超过五个。

        这是一道很常考的面试题,接下来让我们来学习一下Promise并发控制 

二、普通并发池的实现

        主要思路就是,判断当前队列是否满,满则等待,有空闲则补齐。

        利用 Promise.race 方法,可以判断一个Promise数组中 “谁最先完成” ,从而让等待中的函数开始运行。

/**Promise并发池,当有大量promise并发时,可以通过这个来限制并发数量
 * @param taskList 任务列表
 * @param max 最大并发数量
 * @param oneFinishCallback 每个完成的回调,参数是当前完成的个数和执行结果,可以用来制作进度条
 * @retrun 返回每个promise的结果,顺序和任务列表相同。 目前是成功和失败都会放入该结果
 */
export const promisePool = <T>(taskList: task<T>[], limit: number) => {
    return new Promise<T[]>(async (resolve, reject) => {
        try {
            const length = taskList.length
            /**当前并发池 */
            const pool: Promise<T>[] = []
            /**结果数组 */
            const res = new Array<T>(length)
            /**完成的数量 */
            let count = 0

            for (let i = 0; i < length; i++) {
                const task = taskList[i]();
                //promise结束的回调
                const handler = (info: T) => {
                    pool.splice(pool.indexOf(task), 1) //任务执行完就删除
                    res[i] = info //不能使用res.push,否则不能保证结果顺序
                    count++
                    if (count === length) {
                        resolve(res)
                    }
                }
                task.then((data) => {
                    handler(data)
                    console.log(`第${i}个任务完成,结果为`, data);
                }, (err) => {
                    handler(err)
                    console.log(`第${i}个任务失败,原因为`, err);
                })


                pool.push(task)

                //如果到达了并发限制,就等到池子中任意一个结束
                if (pool.length >= limit) {
                    await Promise.race(pool)
                }
            }
        } catch (error) {
            console.error('并发池出错', error);
            reject(error)
        }
    })
}

测试用例:


        /**创造一个1s后得到结果的Promise */
    const getTask = () => {
        return async () => {
            await new Promise((resolve) => setTimeout(resolve, 1000))
            return new Date()
        }
    }

//测试用例:
const testIt = async () => {
    const list = new Array(20).fill(0).map(() => getTask())
    const res = await promisePool(list, 5)
    console.log('res', res);
}
testIt()

打印结果:(观察控制台,可以发现是五个五个出现的)

三、让并发池可中断

        好,现在来了个新要求,用户点击了取消按钮后,你需要中断继续往并发池添加任务。 (常见场景:分片上传时,用户点击取消上传按钮)

        问题的关键核心就是,如何从外部 让内部的循环终止。 其实也很简单,设置一个变量,初始为false,当用户点击取消按钮时,变量变为true。在for循环中检测这个变量的值,为true就退出循环

        但是我们不能将这个变量设置为全局变量!否则如果有多处需要使用这个并发池,一处中断,全部遭殃。 在这里,我们就可以利用面向对象的思想,把这个变量作为对象内部的值,每个实例之间独立。“你终止你的,关我什么事? ” 

/**Promise并发池 - 可终止 - 每次都创建一个实例,避免另一个池子的取消导致这个池子的取消 */
export class PromisePoolStatic<T, Err>{
    /**是否取消。在循环中若发现这个变成了true,就会中断 */
    private isStop = false
    /**运行静态Promise并发池,当有大量promise并发时,可以通过这个来限制并发数量
     * @param taskList 任务列表
     * @param max 最大并发数量
     * @retrun 返回每个promise的结果,顺序和任务列表相同。 目前是成功和失败都会放入该结果
     */
    run = async (taskList: task<T>[], max: number) => {
        return new Promise<Array<T | Err>>(async (resolve, reject) => {
            type resType = T | Err
            try {
                this.isStop = false //开始的时候设为false
                const length = taskList.length
                const pool: Promise<resType>[] = []//并发池 
                let count = 0//当前结束了几个
                const res = new Array<resType>(length)
                for (let i = 0; i < length; i++) {
                    let task = taskList[i]();
                    if (this.isStop) return reject('并发池终止')
                    //成功和失败都要执行的函数
                    const handler = (_res: resType) => {
                        pool.splice(pool.indexOf(task), 1) //每当并发池跑完一个任务,从并发池删除个任务
                        res[i] = _res //放入结果数组
                        count++
                        if (count === length) {
                            return resolve(res)
                        }
                    }

                    task.then((data) => {
                        handler(data)
                        console.log(`第${i}个任务完成,结果为`, data);
                    }, (err) => {
                        handler(err)
                        console.log(`第${i}个任务失败,原因为`, err);
                    })

                    pool.push(task);

                    if (pool.length === max) {
                        //利用Promise.race方法来获得并发池中某任务完成的信号,当有任务完成才让程序继续执行,让循环把并发池塞满
                        await Promise.race(pool)
                    }
                }

            } catch (error) {
                console.error('promise并发池出错', error);
                reject(error)
            }
        })
    }
    /**停止并发池运行 */
    stop = () => {
        this.isStop = true
    }
}

测试用例:

/**可终止的并发池测试用例 */
const promisePoolStaticTest = () => {
    const list = new Array(18).fill(0).map(() => getTask())
    const pool = new PromisePoolStatic()
    pool.run(list, 3).catch((err) => {
        console.log('可终止的并发池测试用例出错 -- ', err)
    })
    //18个任务,每个花费1s完成,并发数量为3,共需要6s完成
    //我们在第三秒的时候中断
    setTimeout(() => pool.stop(), 3000)
}
promisePoolStaticTest()

结果如下:

        可以看到第九个任务结束之后,并发池没有进入新的任务了。 但是为什么已经终止了,还有Promise完成的回调打印出来? 因为执行终止函数时,并发池内仍有三个函数在运行,而正在运行的Promise无法终止,所以只能阻止新任务进入并发池  (虽然无法终止Promise,但是可以终止Promise完成后的操作,这里不阐述)

四、动态并发池

        现在前面完成的操作,都是已经确定好了任务列表,才进行并发控制。如果我们需要动态添加任务的效果,如果队列没满就运行,队满则挂起等待,应该怎么做呢? (常见场景:全局axios请求并发控制

        主要思路: 队未满则直接运行,队满则加入等待队列。任务完成后,检查等待队列是否有任务


type resolve<T> = (value?: T | PromiseLike<T>) => void
type reject = (reason?: any) => void
/**装着任务和它的resolve与reject函数 */
type taskWithCallbacks<T> = { task: task<T>; resolve: resolve<T>; reject: reject }


/**动态并发池 */
export class PromisePoolDynamic<T> {
    /**最大并发数量 */
    private limit: number;
    /**当前正在跑的数量 */
    private runningCount: number;
    /**等待队列 */
    private queue: Array<taskWithCallbacks<T>>;

    /**动态并发池 - 构造函数
     * @param maxConcurrency 最大并发数量
     */
    constructor(maxConcurrency: number) {
        this.limit = maxConcurrency;
        this.runningCount = 0;
        this.queue = [];
    }

    /**添加任务
     * @param task 任务,() => Promise<T>
     * @returns 结果
     */
    addTask(task: task<T>) {
        //返回一个新的Promise实例,在任务完成前,会一直是pending状态
        return new Promise<T>((resolve, reject) => {
            const taskWithCallbacks = { task, resolve, reject } as taskWithCallbacks<T>;
            if (this.runningCount < this.limit) {//并发数量没满则运行
                console.log('任务添加:当前并发数', this.runningCount, '并发数量未满,直接运行');
                this.runTask(taskWithCallbacks);
            } else {//并发数量满则加入等待队列
                console.log('任务添加:当前并发数', this.runningCount, '并发数量满,挂起等待');
                this.queue.push(taskWithCallbacks);
            }
        });
    }
    /**运行任务
     * @param taskWithCallback 带有resolve和reject的任务
     */
    private runTask(taskWithCallback: taskWithCallbacks<T>) {
        this.runningCount++;//当前并发数++
        taskWithCallback.task()//从对象中取出任务执行
            .then(result => {
                this.runningCount--;
                taskWithCallback.resolve(result);
                console.log('任务完成', result, '当前并发数', this.runningCount);
                this.checkQueue();
            })
            .catch(error => {
                this.runningCount--;
                taskWithCallback.reject(error);
                this.checkQueue();
            });
    }
    /**运行完成后,检查队列,看看是否有在等待的,有就取出第一个来运行 */
    private checkQueue() {
        if (this.queue.length > 0 && this.runningCount < this.limit) {
            const nextTask = this.queue.shift()!;
            console.log('并发池出现空位,取出等待队列的任务', nextTask);
            this.runTask(nextTask);
        }
    }
}

测试用例:

/**动态并发池的测试用例 */
const promisePoolDynamicTest = () => {
    const promisePoolDynamic = new PromisePoolDynamic(3) //一个最大并发3的动态并发池
    //最大并发3,我一次性添加7个任务
    promisePoolDynamic.addTask(getTask())
    promisePoolDynamic.addTask(getTask())
    promisePoolDynamic.addTask(getTask())
    promisePoolDynamic.addTask(getTask())
    promisePoolDynamic.addTask(getTask())
    promisePoolDynamic.addTask(getTask())
    promisePoolDynamic.addTask(getTask())
}
promisePoolDynamicTest()

测试结果:

五、结语

        关于并发池就到这里了。除了利用Promise.race,其实还可以递归等方式,不过Promise.race是最简单也是最容易理解的。

        如果代码中有哪里出现的不对,欢迎指出

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1168928.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【IDEA】在工具栏设置快速创建包和类的图表

页面效果&#xff1a; 操作步骤&#xff1a; 设置 --> 外观与行为 --> 菜单与工具栏 --> 点击 主工具栏 --> 点击 ---- --> 点击 号 --> 添加操作 主菜单 --> 文件 --> 文件打开操作 --> 打开项目操作 --> 新建 --> 往下找 找到 clas…

单行自动横向滚动——css实现

效果 封装组件 <template><div ref"container" class"scroll-area"><divref"content":class"[isScroll ? scroll : no-scroll]":style"{ color: fontColor }">{{ content }}</div></div> &…

c++值deque容器

1.deque容器介绍 deque 是 double-ended queue 的缩写&#xff0c;又称双端队列容器。deque容器支持从头部和尾部双端插入、删除数据。与vector容器不同的是&#xff0c;vector容器是一段连续的空间&#xff0c;而deque没有所谓容量的概念&#xff0c;因为它是动态的以分段连续…

Spring Boot 常见面试题

目录 1.Spring Boot 快速入门什么是 Spring Boot&#xff1f;有什么优点&#xff1f;Spring Boot 与 Spring MVC 有什么区别&#xff1f;Spring 与 Spring Boot 有什么关系&#xff1f;✨什么是 Spring Boot Starters?Spring Boot 支持哪些内嵌 Servlet 容器&#xff1f;如何设…

掌握RESTful API:规范与设计详解

前言 RAML (RESTful API Modeling Language) 和 OAS (OpenAPI Specification) 都是用于描述和定义 RESTful API 的规范。它们分别提供了不同的功能和优势。 RAML&#xff08;RESTful API Modeling Language&#xff09;&#xff1a; RAML简介 RAML&#xff08;RESTful API M…

CSC公派研究生项目|北语北外2024年寒假英语培训班正在招生

北京语言大学出国部、北京外国语大学出国部近期发布了2024年寒假“国家建设高水平大学公派研究生项目”英语培训的通知&#xff0c;知识人网小编特归纳整理&#xff0c;供有需求的同学参考。 北京语言大学 我部将于2024年寒假举办“国家建设高水平大学公派研究生项目”英语培训…

银行账单转换beancount

用了beancount来记账后&#xff0c;发现每月的账单手动记是一件极其麻烦的事情。 然后再github搜索一通后&#xff0c;有double-entry-generator&#xff08;https://github.com/deb-sig/double-entry-generator&#xff09;能转换支付宝/微信的账单&#xff0c;但是没有自己用…

基于STM32F412RET6的智能桶硬件设计

一、智能桶功能需求&#xff1a; 智能桶是一直采用Cortex-M4 嵌入式平台&#xff0c;搭载NB-IotTO通讯模组、智能称重采集、智能门锁监控以及温度监测等装置。主要功能如下&#xff1a; ▲ 具有GPS定位功能&#xff0c;可以通过后台APP实时定位智能桶的位置。 ▲ 具有温度监测功…

大厂面试题-Netty中Reactor模式的理解

Reactor其实是在NIO多路复用的基础上提出的一个高性能IO设计模式。 它的核心思想是把响应IO事件和业务处理进行分离&#xff0c;通过一个或者多个线程来处理IO事件。 然后把就绪的事件分发给业务线程进行异步处理。 Reactor模型有三个重要的组件&#xff1a; Reactor&#…

基于单片机的可穿戴个人健康监测仪-智能手环

收藏和点赞&#xff0c;您的关注是我创作的动力 文章目录 概要 一、方案的设计与论证2.1设计任务及要求2.2 模块技术和方法综述2.3 设计可能遇到的困难 二、 系统总体框架3.1 硬件设计 三 软件部分4.1 主程序流程框 四、 结论五、 文章目录 概要 近几年智能化的不断发展&#…

GEE——提取制定多波段影像的属性值(按照制定属性名称和属性值)输出格式为矢量格式

简介: 这里我们很多时候,需要提取制定影像,或者多波段影像制定区域的值,这里有一个问题是我们一般输出的结果仅仅是一个字典类型的对象,而我们不知道如何按照一个矢量输入,这里我们首先要做的就是进行多波段值在制定区域的提取,随后就是分别对其新的字典的键、值的设定…

Leetcode76最小覆盖子串

思路&#xff1a;滑动窗口思想 1. 滑动窗口是什么&#xff1a;用一个滑动窗口为覆盖目标子串的字符串 2.怎么移动窗口&#xff1a;当不满足覆盖时右指针移动扩大范围&#xff0c;当覆盖了就移动左指针缩减范围直到再次不覆盖 3. 怎么判断是否覆盖&#xff1a;这里使用两个哈…

Qt封装的Halcon显示控件,支持ROI绘制

前言 目前机器视觉ROI交互控件在C#上做的比较多&#xff0c;而Qt上做的比较少&#xff0c;根据作者 VSQtHalcon——显示图片&#xff0c;实现鼠标缩放、移动图片的文章&#xff0c;我在显示和移动控件的基础上&#xff0c;增加了ROI设置功能&#xff0c;并封装成了一个独立的Q…

记录一次normal diskgroup添加磁盘组操作

客户的一个磁盘组空间快满&#xff0c;需要添加一下磁盘&#xff0c;磁盘组的冗余模式为normal&#xff0c;本来觉得是一件不难的事情&#xff0c;在添加过程中还是遇到了一些问题。 本来为2个500G的磁盘组成的normal模式磁盘组&#xff0c;目前可用空间只剩下170G左右的空间&…

【多线程】线程池总结带你详细了解线程池

文章目录 线程池标准库中的线程池Executors 创建线程池的几种方式ThreadPoolExecutor创建线程池 模拟实现线程池 线程池 线程池是一种线程使用模式。线程过多会带来调度开销&#xff0c;进而影响缓存局部性和整体性能。而线程池维护着多个线程&#xff0c;等待着监督管理者分配…

基于单片机的自动感应门设计

博主主页&#xff1a;单片机辅导设计 博主简介&#xff1a;专注单片机技术领域和毕业设计项目。 主要内容&#xff1a;毕业设计、简历模板、学习资料、技术咨询。 文章目录 主要介绍一、自动感应门设计的功能概述二、系统总体方案2.1系统的总体计划2.2元器件的介绍2.2.1单片机的…

全局安装 vue-cli 报错 Error: EPERM: operation not permitted, open

原因&#xff1a;没有权限 解决方法&#xff1a;CMD 点击右键 以管理员身份运行。

IDEA创建Springboot多模块项目

一、创建父模块 File --> New --> Project &#xff0c;选择 “ Spring Initalizr ” &#xff0c;点击 Next Next Next --> Finish 二、创建子模块 右键根目录&#xff0c;New --> Module 选择 “ Spring Initializr ”&#xff0c;点击Next 此处注意T…

ubuntu20.04配置解压版mysql5.7

目录 1.创建mysql 用户组和用户2.下载 MySQL 5.7 解压版3.解压 MySQL 文件4.将 MySQL 移动到适当的目录5.更改mysql目录所属的用户组和用户&#xff0c;以及权限6.进入mysql/bin/目录&#xff0c;安装初始化7.编辑/etc/mysql/my.cnf配置文件8.启动 MySQL 服务&#xff1a;9.建立…