初探高并发—ExecutorCompletionService

news2025/1/15 19:10:24

初探高并发—ExecutorCompletionService

为什么要引入高并发

众所周知,程序中的代码是从下往下顺序执行的,当我们需要在一个方法中同时执行多个耗时的任务时所消耗时间就会大于等于这些任务消耗的累加时间。那么有没有一种办法可以让这些耗时的任务同时执行呢?这时候就需要并发编程,让这些任务在不同的线程上分别执行,达到理论上的同步执行效果。


这里引入一下并发、并行、高并发的概念:

并发:指的是在同一时间间隔内,多个任务交替执行的能力。在计算机中,同时存在多个进程或线程,每个进程或线程都在执行各自的任务,这就是并发。在并发执行中,每个任务都会在一定时间内执行一部分,然后切换到下一个任务,如此往复,直到所有任务完成。

并行:指的是在同一时间间隔内,多个任务同时执行的能力。在计算机中,同时存在多个进程或线程,每个进程或线程都在执行各自的任务,这就是并行。在并行执行中,每个任务都会在不同的处理器或者核心上同时执行,因此可以同时完成多个任务。

高并发:指的是在同一时间内处理的请求或者任务数量非常大。在互联网应用中,高并发是一个非常重要的概念,因为当用户访问量非常大时,服务器需要同时处理大量的请求,如果服务器不能很好地处理高并发请求,就会导致系统崩溃或者响应变慢。


ExecutorCompletionService分析

ExecutorCompletionService实现了CompletionService接口,CompletionService的方法有以下:

  • Future submit(Callable task):提交一个Callable类型任务,并返回该任务执行结果关联的Future;
  • Future submit(Runnable task,V result):提交一个Runnable类型任务,并返回该任务执行结果关联的Future;
  • Future take():从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞,直到有任务完成;
  • Future poll():从内部阻塞队列中获取并移除第一个执行完成的任务,获取不到则返回null,不阻塞;
  • Future poll(long timeout, TimeUnit unit):从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞时间为timeout,获取不到则返回null;

结构如下:

请添加图片描述

点开ExecutorCompletionService的源码,结构如下:

请添加图片描述

其中包含了三个私有属性:executor、aes、completionQueue。

关于ExecutorCompletionService这两个构造方法,源码如下:

public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

public ExecutorCompletionService(Executor executor,
                                 BlockingQueue<Future<V>> completionQueue) {
    if (executor == null || completionQueue == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = completionQueue;
}

也就是说新建ExecutorCompletionService实例对象的时候,可以自行指定阻塞队列的类型


阻塞队列:在Java多线程编程中,阻塞队列是一种特殊的队列,它可以在队列为空时阻塞获取元素的线程,也可以在队列已满时阻塞插入元素的线程。这种队列通常用于实现生产者-消费者模式,其中生产者线程向队列中插入任务,消费者线程从队列中取出任务并执行。Java中提供了多种类型的阻塞队列,包括:

  1. ArrayBlockingQueue:基于数组实现的有界阻塞队列,按照先进先出的原则进行元素插入和移除。

  2. LinkedBlockingQueue:基于链表实现的可选有界阻塞队列,按照先进先出的原则进行元素插入和移除。

  3. PriorityBlockingQueue:基于优先级堆实现的无界阻塞队列,元素按照优先级顺序进行插入和移除。

  4. SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等待一个相应的删除操作,反之亦然。

  5. DelayQueue:一个基于优先级堆实现的延迟阻塞队列,其中的元素只有在其指定的延迟时间到达后才能被取出。

这些阻塞队列都是线程安全的,可以在多线程环境下使用。不同的阻塞队列适用于不同的场景,可以根据自己的需求选择合适的队列。


如果新建实例对象时不指定阻塞队列类型,默认使用的是LinkedBlockingQueue。

ExecutorCompletionService优势

为什么要使用ExecutorCompletionService而不是直接使用线程池进行任务提交?

原因是如果我们将任务直接提交到线程池中,通过Futrue类的get()方法,会造成堵塞,需要先等执行任务1的线程结束返回结果,才会进行获取下一个任务的执行的结果。而使用ExecutorCompletionService则不会有这样的情况,在ExecutorCompletionService内部维护了一个阻塞队列,提交的任务,先执行完的先进入队列,所以你通过 poll 或 take 获得的肯定是最先执行完的任务结果。

但是在实际生产中,如果说我们不需要返回结果的时候,可以自行选择,毕竟适合自己的才是最好的。

ExecutorCompletionService实操

这里直接上代码:

/**
 * 获取用户信息sleep时间:1000ms
 * 获取家庭信息sleep时间:3000ms
 * 获取学校信息sleep时间:5000ms
 */
@SpringBootTest
class SpringbootParallelApplicationTests {

    @Autowired()
    private ClassService classService;

    @Autowired()
    private FamilyService familyService;

    @Autowired()
    private UserService userService;

    void normal() {
        StopWatch watch = new StopWatch();
        watch.start();
        UserInfoDTO userInfo = userService.getUserInfo("user01");
        ClassDTO classInfo = classService.getClassDTO("class01");
        FamilyDTO familyInfo = familyService.getFamilyInfo("family01");
        watch.stop();
        long millis = watch.getTotalTimeMillis();
        System.out.println("=====================普通调用===========================");
        System.out.println("用户信息:" + userInfo.toString());
        System.out.println("班级信息:" + classInfo.toString());
        System.out.println("家庭信息:" + familyInfo.toString());
        System.out.println("程序耗时:" + millis + "毫秒");
    }

    void executor() throws ExecutionException, InterruptedException {
        // 计时器
        StopWatch watch = new StopWatch();
        watch.start();
        // 新建线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        // 提交线程查询用户信息
        Future<BaseRspDTO<Object>> submit1 = executor.submit(() -> {
            UserInfoDTO userInfo = userService.getUserInfo("user01");
            BaseRspDTO<Object> dto = new BaseRspDTO<>();
            dto.setKey("userInfo");
            dto.setData(userInfo);
            return dto;
        });
        BaseRspDTO<Object> rspDTO1 = submit1.get();
        UserInfoDTO user = (UserInfoDTO) rspDTO1.getData();
        // 提交线程查询班级信息
        Future<BaseRspDTO<Object>> submit2 = executor.submit(() -> {
            ClassDTO classInfo = classService.getClassDTO("class01");
            BaseRspDTO<Object> dto = new BaseRspDTO<>();
            dto.setKey("classInfo");
            dto.setData(classInfo);
            return dto;
        });
        BaseRspDTO<Object> rspDTO2 = submit2.get();
        ClassDTO classDto = (ClassDTO) rspDTO2.getData();
        // 提交线程查询学校信息
        Future<BaseRspDTO<Object>> submit3 = executor.submit(() -> {
            FamilyDTO familyInfo = familyService.getFamilyInfo("family01");
            BaseRspDTO<Object> dto = new BaseRspDTO<>();
            dto.setKey("familyInfo");
            dto.setData(familyInfo);
            return dto;
        });
        BaseRspDTO<Object> rspDTO3 = submit3.get();
        FamilyDTO family = (FamilyDTO) rspDTO3.getData();
        watch.stop();
        // 获取耗时
        long millis = watch.getTotalTimeMillis();
        System.out.println("=====================线程池调用===========================");
        System.out.println("用户信息:" + user.toString());
        System.out.println("班级信息:" + classDto.toString());
        System.out.println("家庭信息:" + family.toString());
        System.out.println("程序耗时:" + millis + "毫秒");
    }

    void executorWithOnly() {
        // 计时器
        StopWatch watch = new StopWatch();
        watch.start();
        // 新建线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        // 提交线程查询用户信息
        executor.submit(() -> {
            UserInfoDTO userInfo = userService.getUserInfo("user01");
            BaseRspDTO<Object> dto = new BaseRspDTO<>();
            dto.setKey("userInfo");
            dto.setData(userInfo);
            return dto;
        });
        // 提交线程查询班级信息
        executor.submit(() -> {
            ClassDTO classInfo = classService.getClassDTO("class01");
            BaseRspDTO<Object> dto = new BaseRspDTO<>();
            dto.setKey("classInfo");
            dto.setData(classInfo);
            return dto;
        });
        // 提交线程查询学校信息
        Future<BaseRspDTO<Object>> submit = executor.submit(() -> {
            FamilyDTO familyInfo = familyService.getFamilyInfo("family01");
            BaseRspDTO<Object> dto = new BaseRspDTO<>();
            dto.setKey("familyInfo");
            dto.setData(familyInfo);
            return dto;
        });
        System.out.println("=====================单个结果的线程池调用===========================");
        try {
            BaseRspDTO<Object> baseRspDTO = submit.get();
            FamilyDTO data = (FamilyDTO)baseRspDTO.getData();
            System.out.println("家庭信息:" + data.toString());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        watch.stop();
        // 获取耗时
        long millis = watch.getTotalTimeMillis();
        System.out.println("程序耗时:" + millis + "毫秒");
    }

    void special() {
        // 计时器
        StopWatch watch = new StopWatch();
        watch.start();
        // 新建线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        // 新建ExecutorCompletionService实例
        ExecutorCompletionService<BaseRspDTO> service = new ExecutorCompletionService<>(executor);
        // 提交线程查询用户信息
        service.submit(() -> {
            UserInfoDTO userInfo = userService.getUserInfo("user01");
            BaseRspDTO<Object> dto = new BaseRspDTO<>();
            dto.setKey("userInfo");
            dto.setData(userInfo);
            return dto;
        });
        // 提交线程查询班级信息
        service.submit(() -> {
            ClassDTO classInfo = classService.getClassDTO("class01");
            BaseRspDTO<Object> dto = new BaseRspDTO<>();
            dto.setKey("classInfo");
            dto.setData(classInfo);
            return dto;
        });
        // 提交线程查询学校信息
        service.submit(() -> {
            FamilyDTO familyInfo = familyService.getFamilyInfo("family01");
            BaseRspDTO<Object> dto = new BaseRspDTO<>();
            dto.setKey("familyInfo");
            dto.setData(familyInfo);
            return dto;
        });
        UserInfoDTO userInfo = new UserInfoDTO();
        ClassDTO classInfo = new ClassDTO();
        FamilyDTO familyInfo = new FamilyDTO();
        try {
            for (int i = 0; i < 3; i++) {
                // 获取ExecutorCompletionService内部中阻塞队列的已完成的Future
                Future<BaseRspDTO> poll = service.poll(1, TimeUnit.MINUTES);
                // 获取结果
                BaseRspDTO dto = poll.get();
                String key = dto.getKey();
                if ("userInfo".equals(key)) {
                    userInfo = (UserInfoDTO) dto.getData();
                } else if ("classInfo".equals(key)) {
                    classInfo = (ClassDTO) dto.getData();
                } else if ("familyInfo".equals(key)) {
                    familyInfo = (FamilyDTO) dto.getData();
                }
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        watch.stop();
        // 获取耗时
        long millis = watch.getTotalTimeMillis();
        System.out.println("=====================通过ExecutorCompletionService调用===========================");
        System.out.println("用户信息:" + userInfo.toString());
        System.out.println("班级信息:" + classInfo.toString());
        System.out.println("家庭信息:" + familyInfo.toString());
        System.out.println("程序耗时:" + millis + "毫秒");
    }

    @Test
    void contextLoads() throws ExecutionException, InterruptedException {
        normal();
        executor();
        executorWithoutResult();
        special();
    }

}

结果

请添加图片描述

通过executor();executorWithOnly();这两个方法的耗时对比可以验证线程池get()会阻塞这个理论。

需要注意的问题

1、调用poll方法产生的空指针

调用限制时长的poll()方法时,需要合理的设定时间,否则会返回null,容易引发空指针问题

2、需要注意OOM

调用ExecutorCompletionService实例对象后,需要及时的进行take()或者poll()操作,否则执行的结果会不停的堆积在队列中,占用堆内存,最终导致oom

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/533170.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微信小程序入门04-后端脚手架搭建

我们上一篇已经介绍了权限系统的库表搭建&#xff0c;光有表还是不够的&#xff0c;我们还需要有一个后台系统和数据库进行交互。搭建后台的时候既需要选择使用什么语言&#xff0c;也需要选择框架。 框架分为前端框架和后端框架。在第一篇微信开发者工具搭建的时候我们其实前…

面试官:什么是防抖和节流?如何实现?应用场景?

防抖 与 节流 大厂面试题分享 面试题库 前后端面试题库 &#xff08;面试必备&#xff09; 推荐&#xff1a;★★★★★ 地址&#xff1a;前端面试题库 web前端面试题库 VS java后端面试题库大全 前言 防抖和节流作为很多大厂的经典面试题&#xff0c;问倒了许多小伙伴&a…

【Python-ESL】python-esl安装

pip install python-esl 时会报错&#xff1a; “error: command ‘swig’ failed with exit status 1” 报错原因是 因为 swig 软件未正确安装&#xff0c;当然对swig的版本也是有要求的&#xff0c;目前测试以下版本没有问题&#xff1a; swig3.0.63 python-ESL1.4.18(app-…

域名年龄查询工具-域名历史查询工具

批量域名历史查询工具 在近几年的网络营销中&#xff0c;老域名已经成为获取网站排名和SEO优化的重要途径。而对于购买这些老域名&#xff0c;了解域名的过往经历&#xff0c;可以帮助我们更好地评估域名的价值&#xff0c;并避免购买不良的域名。因此&#xff0c;今天我们将向…

微信小程序入门03-搭建权限系统,建库建表

我们准备零基础搭建一个小程序&#xff0c;小程序分为两部分&#xff0c;一个是用户访问的程序&#xff0c;可以是小程序也可以是H5。另外一个就是管理员使用的管理后台&#xff0c;后台第一个要实现的就是搭建权限系统。为了搭建权限系统&#xff0c;我们先需要梳理概念 1 RB…

Oracle自增序列探秘:一篇文章教你读懂

目录 1&#xff1a;什么是Oracle 自增长序列 2 &#xff1a;创建数据-->实现自增长序列 2.1 创建序列 2.2 使用序列 3 &#xff1a;查询数据-->实现自增长序列 1&#xff1a;什么是Oracle 自增长序列 Oracle自增长序列是一种生成唯一数字的方法&#xff0c;可以用于创…

NXP MCUXPresso - .h: No such file or directory

文章目录 NXP MCUXPresso - .h: No such file or directory概述备注END NXP MCUXPresso - .h: No such file or directory 概述 在尝试迁移 openpnp - Smoothieware project 从gcc命令行 MRI调试方式 到NXP MCUXpresso工程. 快摸进门了. 按照C工程编译的. 头文件路径都加好…

Wijmo 2023 Crack添加的一些改进

Wijmo 2023 Crack添加的一些改进 改进了对React 18的支持-增加了对Reack 18严格模式的支持&#xff0c;这有助于开发人员在开发过程中发现常见的错误。 可访问性改进-以下是本版本中添加的一些改进&#xff1a; 改进了FlexGridFilter弹出窗口&#xff0c;用于按条件和值进行筛选…

文本三剑客之——sed编辑器

sed编辑器 sed编辑器sed基础语法sed查询sed删除sed 替换sed 插入 sed编辑器 sed是文本处理工具&#xff0c;依赖于正则表达式&#xff0c;可以读取文本内容&#xff0c;工具指定条件对数据进行添加、删除、替换等操作&#xff0c;被广泛应用于shell脚本&#xff0c;以完成自动…

【交直流保护用HJZ-Y910静态中间继电器 性能稳定功耗小 JOSEF约瑟】

品牌&#xff1a;JOSEF约瑟&#xff0c;型号&#xff1a;HJZ-Y910&#xff0c;名称&#xff1a;静态中间继电器&#xff0c;额定电压&#xff1a;48220VDC&#xff1b;48415VAC&#xff0c;触点容量&#xff1a;250V/5A&#xff0c;功率消耗&#xff1a;≤5W&#xff0c;动作时…

Linux的常见指令(下)

常见指令以及权限理解&#xff08;下&#xff09; 基础指令的继续学习&#xff0c;本篇博客是对于Linux的大部分常见指令的学习和使用&#xff0c;指令的选项都是比较常用的&#xff0c;基本的复制移动&#xff0c;删除文件or目录&#xff0c;查看文件的三种方式cat、more、le…

区块链技术方向的就业前景

区块链技术是一个快速发展的领域&#xff0c;目前正在被越来越多的企业和组织广泛应用。区块链技术在金融、物流、医疗、社交媒体等众多领域都有着广泛的应用。因此&#xff0c;区块链技术方向的就业前景非常乐观。 区块链技术是新一代信息技术的重要组成部分&#xff0c;区块…

SpringBoot 发送邮件(四十三)

从头开始&#xff0c;并不意味着失败&#xff0c;相反&#xff0c;正是拥抱成功的第一步&#xff0c;即使还会继续失败 上一章简单介绍了 SpringBoot 整合 ES (四十二), 如果没有看过,请观看上一章 一. 发送邮件 关于发送邮件的功能和基础知识&#xff0c;老蝴蝶这儿就不重点…

NodeJs在Linux下使用的各种问题

环境:ubuntu16.04 ubuntu中安装NodeJs 通过apt-get命令安装后发现只能使用nodejs&#xff0c;而没有node命令 如果想避免这种情况请看下面连接的这种安装方式&#xff1a; 拓展见:Linux下Nodejs安装&#xff08;完整详细&#xff09; 如果想解决问题的话,输入下面的命令即可…

Sentinel-1的GRD和SLC数据的区别和联系

目录 01 前言 02 Sentinel1下载时的基本参数解释 2.1 卫星平台 2.2 产品类型 03 SLC数据和GRD数据的区别 3.1 处理过程的区别 3.2 处理操作的一点解释 3.2.1 为什么要做地形辐射校正&#xff1f; 3.2.2 多普勒地形校正和地形辐射校正的区别&#xff1f; 01 前言 最近…

【软考数据库】第十二章 事务管理

目录 12.1 事务的基本概念 12.2 数据库的并发控制 12.3 数据库的故障与恢复 12.3.1 事务故障 12.3.2 系统故障 12.3.3 介质故障 12.3.4 数据库备份 12.4 数据库的安全性与完整性 前言&#xff1a; 笔记来自《文老师软考数据库》教材精讲&#xff0c;精讲视频在b站&am…

B2B2C商城系统怎么挑选好?

B2B2C商城它不仅提供B2B模式下的批量交易&#xff0c;还为消费者提供了B2C模式的优质购物体验&#xff0c;因此&#xff0c;越来越多的企业或商家开始重视B2B2C商城系统的搭建&#xff0c;如目前的SHOP、Magento等商城系统。那么&#xff0c;如何挑选合适的B2B2C商城系统呢&…

接口自动化测试—如何实现多套环境的自动化测试?

在敏捷迭代的项目中&#xff0c;通常会将后台服务部署到多套测试环境。那么在进行接口自动化测试时&#xff0c;则需要将服务器的域名进行配置。使用一套接口测试脚本&#xff0c;通过切换域名地址配置&#xff0c;实现多套环境的自动化测试。 实战练习 分别准备两套测试环境…

ffmpeg合并多张图片为视频,加转场

需求是合并多个图片为视频&#xff0c;并在每个图片衔接处加入转场特效&#xff0c;第一种方式是用fade做转场&#xff0c;第二种方式是xfade做转场&#xff0c;xfade的转场特效更多&#xff0c;建议用这个。如果对你有帮助&#xff0c;点赞收藏。 第一种&#xff1a;直接用fa…

实验十 超市订单管理系统综合实验

实验十 超市订单管理系统综合实验 应粉丝要求&#xff0c;本博主帮助实现基本效果&#xff01; 未避免产生版权问题&#xff0c;本项目博主不公开源码&#xff0c;如果您遇到相关问题可私聊博主&#xff01; 一、实验目的及任务 通过该实验&#xff0c;掌握利用SSM框架进行系…