线程池多线程在项目中的实际应用

news2024/12/25 22:31:24

一.发短信

发短信的场景有很多,比如手机号+验证码登录注册,电影票买完之后会发送取票码,发货之后会有物流信息,支付之后银行发的付款信息,电力系统的电费预警信息等等

在这些业务场景中,有一个特征,那就是主业务可以和短信业务割裂,比如手机号+验证码登陆,当我们点击获取验证码的时候,会连接短信业务平台发送短信,但是发短信这个业务受到短信平台的影响,可能会存在一定时间的延时,但是我们不一定非要等短信平台返回之后,再给用户返回,我们可以先返回获取验证码成功的提升样式,将发短信的业务放入到另外一个线程中执行,用户晚一会收到短信对整体的业务流程也不会受到影响,反而提升了用户体验

代码演示:
1.在springboot项目中导入依赖:

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
</dependency>

<dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
</dependency>

2.编写自定义线程池配置

package com.jeesite.modules.asysutils.juc.pool;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync
public class ThreadPoolConfig {
    @Bean("asyncServiceExecutor")
    public ThreadPoolTaskExecutor asyncServiceExecutor() {//TaskExecutor不带返回值  ThreadPoolTaskExecutor带返回值
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor ();
        // 设置核心线程数
        // 指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去
        executor.setCorePoolSize(10);
        // 设置最大线程数
        // 指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量
        executor.setMaxPoolSize(10);
        // 设置队列容量
        // new LinkedBlockingQueue<Runnable>();
        executor.setQueueCapacity(32);
        // 设置线程活跃时间(秒)
        // 当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁
        executor.setKeepAliveSeconds(300);
        // 设置默认线程名称
        executor.setThreadNamePrefix("async-thread-");
        // 设置拒绝策略rejection-policy:当pool已经达到max size的时候,如何处理新任务 CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        //new ThreadPoolExecutor.AbortPolicy());//银行满了,还有人进来,不处理这个人,抛出异常
        //new ThreadPoolExecutor.CallerRunsPolicy());//哪来的去哪里!
        //new ThreadPoolExecutor.DiscardPolicy());//银行满了,还有人进来,不处理这个人,不抛出异常
        //new ThreadPoolExecutor.DiscardOldestPolicy());//队列满了,尝试去和最早的竞争,也不会抛出异常!
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 等待所有任务结束后再关闭线程池
        // executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }
}

3.线程池要执行的任务

package com.jeesite.modules.asysutils.juc.pool.sms;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
@Slf4j
public class ThreadService {

//    @Autowired
//    private RedisTemplate<String,String> redisTemplate;

    @Async("asyncServiceExecutor")
    public void sendSMS(String phone , int code) {
//        boolean isSend = smsService.sendSms(phone,code);
//        if (isSend){ redisTemplate.opsForValue().set("LOGIN_"+phone,String.valueOf(code), Duration.ofMinutes(time));
//        }
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println(Thread.currentThread().getName() + "发送短信成功:" + code);
    }
}

4.服务层调用

package com.jeesite.modules.asysutils.juc.pool.sms;

import jline.internal.Log;
import org.apache.commons.lang3.RandomUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class SmsService {
    @Autowired
    private ThreadService threadService;

    public String sendSms(String phone) {
        /**
         * 1. 调用短信平台 发送短信  如果发送成功,将验证码存入redis,redis要有过期时间
         * 2. 发送成功,返回成功
         */
        //短信验证码 要生成
        int code = RandomUtils.nextInt(100000, 999999);
        Log.info("短信验证码:  ",code);
        //放入线程池执行,不影响当前的业务,立马返回
        threadService.sendSMS(phone,code);

        return "success";
    }
}

5.启动服务,控制层调用

package com.jeesite.modules.asysutils.juc.pool.sms;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
@RequestMapping(value = "t/send")
public class SmsController{
    @Autowired
    private SmsService smsService;

    @RequestMapping(value = "msg")
    @ResponseBody
    public String sendSms(String phone) {
        return smsService.sendSms(phone);
    }
}

二.推送

比如有一个业务场景:
有一个审核业务,当收到数据之后,需要将这些数据发送给第三方的监管系统进行审核,数据量有百万之多,一条数据按照一秒计算,那摩需要经过百万秒,200多个小时才能处理完

解决:
考虑引入多线程进行并发操作,降低数据推送时间,提供数据推送的实时性
要注意的问题:

防止重复推送
可以考虑将数据切分成不同的数据段,每一个线程负责一个
失败处理
推送失败后,进行失败推送的数据记录,用额外的程序处理失败数据(补偿措施)
代码演示:

1.同样这里使用自定义线程池

package com.jeesite.modules.asysutils.juc.pool;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync
public class ThreadPoolConfig {
    @Bean("asyncServiceExecutor")
    public ThreadPoolTaskExecutor asyncServiceExecutor() {//TaskExecutor不带返回值  ThreadPoolTaskExecutor带返回值
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor ();
        // 设置核心线程数
        // 指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去
        executor.setCorePoolSize(10);
        // 设置最大线程数
        // 指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量
        executor.setMaxPoolSize(10);
        // 设置队列容量
        // new LinkedBlockingQueue<Runnable>();
        executor.setQueueCapacity(32);
        // 设置线程活跃时间(秒)
        // 当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁
        executor.setKeepAliveSeconds(300);
        // 设置默认线程名称
        executor.setThreadNamePrefix("async-thread-");
        // 设置拒绝策略rejection-policy:当pool已经达到max size的时候,如何处理新任务 CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        //new ThreadPoolExecutor.AbortPolicy());//银行满了,还有人进来,不处理这个人,抛出异常
        //new ThreadPoolExecutor.CallerRunsPolicy());//哪来的去哪里!
        //new ThreadPoolExecutor.DiscardPolicy());//银行满了,还有人进来,不处理这个人,不抛出异常
        //new ThreadPoolExecutor.DiscardOldestPolicy());//队列满了,尝试去和最早的竞争,也不会抛出异常!
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 等待所有任务结束后再关闭线程池
        // executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }
}

2.测试类

  • 传统方式,不调用线程池
  • 调用线程池,无返回值(这里可以发现,虽然此方法可以达到异步执行的目的,但是我们并不知道线程执行的结果,有没有执行成功,因为这种方式有时候在企业中并不是最佳使用方式,下面介绍带有返回值的多线程)
  • 调用线程池,有返回值(使用多线程一次性执行多个不同任务并且获取任务执行结果)
package com.jeesite.modules.asysutils.juc.pool.push;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.TimeUnit;

@SpringBootTest
@RunWith(SpringRunner.class)
public class TestPushService {


    @Autowired
    private PushService pushService;

    @Test
    public void testOldPush(){
        //传统方式
        pushService.oldPush();
    }

    @Test
    public void testNewPush(){
        //线程无返回值
        pushService.pushNew();
        try {
            TimeUnit.HOURS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void testNewPushCall(){
        //线程有返回值
        pushService.pushNewCall();
        try {
            TimeUnit.HOURS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

3.推送消息

package com.jeesite.modules.asysutils.juc.pool.push;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

@Service
public class PushService {

    @Autowired
    private ThreadPushService threadService;

    @Autowired
    private ThreadPoolTaskExecutor asyncServiceExecutor;

    public void oldPush(){
        int dataNum = 10000;
        int[] array = new int[dataNum];
        for (int i = 0;i<dataNum;i++){
            array[i] = i;
        }
        long start = System.currentTimeMillis();
        //推送的数据数量

        for (int i = 0 ; i < array.length;i++){
            //推送到第三方审核平台
            pushSend(array[i]);
        }

        long end = System.currentTimeMillis();

        System.out.println("需要时间:"+(end - start) + "ms");
    }

    public void pushNew(){
        int dataNum = 10000;
        int[] array = new int[dataNum];
        for (int i = 0;i<dataNum;i++){
            array[i] = i;
        }
        long start = System.currentTimeMillis();
        //推送的数据数量
        //假设线程池数量为10,也就是说 每个线程处理 1000条数据 共需要10次循环
        for (int i = 0 ; i < 10;i++){
            int s = i * 1000;
            int e = i * 1000 + 1000 - 1;
            //推送到第三方审核平台
            //这个是假设 有10000条数据,那么每次推送处理1000条数据
            threadService.push(array,s,e);
        }

        long end = System.currentTimeMillis();

        System.out.println("需要时间:"+(end - start) + "ms");
    }

    public void pushNewCall(){
        int dataNum = 10000;
        int[] array = new int[dataNum];
        for (int i = 0;i<dataNum;i++){
            array[i] = i;
        }
        long start = System.currentTimeMillis();
        //推送的数据数量
        //假设线程池数量为10,也就是说 每个线程处理 1000条数据 共需要10次循环
        List<Future> futureList = new ArrayList<>();
        for (int i = 0 ; i < 10;i++){
            int s = i * 1000;
            int e = i * 1000 + 1000 - 1;
            //推送到第三方审核平台
            //这个是假设 有10000条数据,那么每次推送处理1000条数据
			//无法使用配置的线程池,没有返回值,使用这种方式重写
            Future<Integer> submit = asyncServiceExecutor.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    return threadService.push1(array, s, e);
                }
            });
            //不能在这 直接得到返回值,因为会阻塞
//            System.out.println("本轮线程执行数量:" +submit.get());
            futureList.add(submit);
        }

        for (Future future : futureList) {
            try {
                System.out.println("本轮线程执行数量:" +future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        long end = System.currentTimeMillis();

        System.out.println("需要时间:"+(end - start) + "ms");
    }

    private void pushSend(int data) {
        try {
            TimeUnit.MILLISECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

4.使用线程池来执行该任务

package com.jeesite.modules.asysutils.juc.pool.push;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class ThreadPushService {


    @Async("asyncServiceExecutor")
    public void push(int[] array, int start, int end){
        long s = System.currentTimeMillis();
        for (int i = start;i<=end;i++){
            pushSend(array[i]);
            //推送失败 可以记录日志
        }
        long e = System.currentTimeMillis();
        System.out.println((e-s)+"ms");
    }

    public int push1(int[] array, int start, int end){
        int count = 0;
        long s = System.currentTimeMillis();
        for (int i = start;i<=end;i++){
            count++;
            pushSend(array[i]);
            //推送失败 可以记录日志
        }
        long e = System.currentTimeMillis();
        System.out.println((e-s)+"ms");
        return count;
    }

    public void pushSend(int dataNum){
        try {
            TimeUnit.MILLISECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

5、启动测试,可以看到三种方式的效果对比!!!

  • 传统方式需要时长:
    在这里插入图片描述

  • 线程无返回值:
    在这里插入图片描述

  • 线程有返回值:
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1615982.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux 网络编程项目--简易ftp

主要代码 config.h #define LS 0 #define GET 1 #define PWD 2#define IFGO 3#define LCD 4 #define LLS 5 #define CD 6 #define PUT 7#define QUIT 8 #define DOFILE 9struct Msg {int type;char data[1024];char secondBuf[128]; }; 服务器: #i…

231 基于matlab的北斗信号数据解析

基于matlab的北斗信号数据解析&#xff0c;多通道和单通道接收到的北斗信号数据&#xff0c;利用接收到的北斗数据&#xff08;.dat .txt文件&#xff09;&#xff0c;进行解析&#xff0c;得到初始伪距&#xff0c;平滑伪距&#xff0c;载波相位&#xff0c;并计算其标准差&am…

洛谷 -P1007 独木桥(模拟,思维)

独木桥 题目背景 战争已经进入到紧要时间。你是运输小队长&#xff0c;正在率领运输部队向前线运送物资。运输任务像做题一样的无聊。你希望找些刺激&#xff0c;于是命令你的士兵们到前方的一座独木桥上欣赏风景&#xff0c;而你留在桥下欣赏士兵们。士兵们十分愤怒&#xf…

双线性插值计算手动实现以及原理

双线性插值计算手动实现以及原理 代码原理 代码 先贴代码吧&#xff0c;原理其实也比较简单&#xff0c;看代码基本也就理解了&#xff0c;时间太晚了&#xff0c;原理后续再补吧。 import torch from torch.nn import functional as F import numpy as np from itertools im…

苍穹外卖开发笔记(6.缓存商品,购物车)

目录 一、缓存商品2、缓存菜品&#xff08;redis&#xff09;1.问题说明2.实现思路3.代码开发 2、缓存套餐&#xff08;spring cache&#xff09;1.实现思路2.代码实现 3、测试 二、购物车功能1、添加购物车1.需求分析设计2.代码开发3.测试 2、查看购物车1.需求分析设计2.代码开…

基于TSM模块的打架斗殴识别技术

目 录 1 引言.... 4 1.1 研究背景与意义.... 4 1.2 研究现状综述.... 5 1.3 研究内容.... 6 1.3.1 图像预处理的优化.... 6 1.3.2 TSM模块的应用.... 6 1.3.3 视频分类的设计与实现.... 6 2 关键技术与方法.... 8 2.1 TSM算法与模型选择.... 8 2.1.1 TSM算法原理.... 8 2.1.2 …

用python做傅里叶变换和系统辨识

一、原始信号 1、理想数据 &#xff08;1&#xff09;系统参数 参数类型数值J0.5 k g ∗ m 2 kg*m^2 kg∗m2K0.2b5 &#xff08;2&#xff09;激励曲线 import matplotlib.pyplot as plt import numpy as np# 生成数据 x np.linspace(0, 10, 1000) # 生成0到10之间的100…

下列程序定义了NxN的二维数组,并在主函数中自动赋值。请编写函数fun(int a[][N],int n),该函数的功能是:使数组右上半三角元素中的值乘以m。

本文收录于专栏:算法之翼 https://blog.csdn.net/weixin_52908342/category_10943144.html 订阅后本专栏全部文章可见。 本文含有题目的题干、解题思路、解题思路、解题代码、代码解析。本文分别包含C语言、C++、Java、Python四种语言的解法完整代码和详细的解析。 题干 下列…

从0到1:社区论坛小程序开发笔记

背景 论坛小程序&#xff1a;为用户提供了一个社交互动的平台&#xff0c;使用户可以分享经验、交流观点、解决问题&#xff0c;促进社区成员之间的互动和交流。 用户可以在论坛小程序上发布有关各种话题的帖子&#xff0c;分享自己的知识、经验和见解&#xff0c;帮助其他用户…

mysql基础14——视图

视图 视图是一种虚拟表 可以把一段查询语句作为视图存储在数据库中 需要的时候把视图看作一个表&#xff0c;对里面的数据进行查询 视图并没有真正存储数据 避免了数据存储过程中可能产生的冗余 提高了存储的效率 子查询 嵌套在另一个查询中的查询 派生表 如果在查询中…

【MySQL 数据宝典】【内存结构】- 003 Change Buffer 详解

一、 Change Buffer基本概念 Change Buffer&#xff1a;写缓冲区,是针对二级索引(辅助索引) 页的更新优化措施。 作用: 在进行DML操作时&#xff0c;如果请求的是 辅助索引&#xff08;非唯一键索引&#xff09;没有在缓冲池 中时&#xff0c;并不会立刻将磁盘页加载到缓冲池…

游戏AI智能体模仿学习技术方案揭秘(二)(附方案详情),沉浸式玩家体验秘诀,看《梦三国2》游戏AI智能体!

接上篇内容&#xff0c;小智发现内容非常受游戏开发者们的欢迎&#xff0c;今天给大家带来方案(二&#xff09;内容&#xff0c;没看过第一篇的伙伴可以戳以下链接查看~~码住&#xff01; 游戏AI智能体模仿学习技术方案&#xff08;附方案详情&#xff09;&#xff0c;沉浸式玩…

AQS(AbstractQueuedSynchronizer)队列同步器源码解读

&#x1f3f7;️个人主页&#xff1a;牵着猫散步的鼠鼠 &#x1f3f7;️系列专栏&#xff1a;Java全栈-专栏 &#x1f3f7;️个人学习笔记&#xff0c;若有缺误&#xff0c;欢迎评论区指正 目录 1. 前言 2. AOS、AQS、AQLS的区别 3. AQS的底层原理 3.1. 核心思想 3.2. 数…

PyQt介绍——动画使用详解之QPropertyAnimation

一、继承关系 PyQt5的动画框架是QAbstractAnimation&#xff0c;它是一个抽象类&#xff0c;不能直接使用&#xff0c;需要使用它的子类。它的类结构如下&#xff1a; QAbstractAnimation&#xff1a;抽象动画&#xff0c;是所有动画的基类&#xff0c;不能直接使用。 QVariant…

基于postCSS手写postcss-px-to-vewiport插件实现移动端适配

&#x1f31f;前言 目前前端实现移动端适配方案千千万&#xff0c;眼花缭乱各有有缺&#xff0c;但目前来说postcss-px-to-vewiport是一种非常合适的实现方案&#xff0c;postcss-px-to-vewiport是一个基于postCss开发的插件&#xff0c;其原理就是将项目中的px单位转换为vw(视…

【极速前进】20240422:预训练RHO-1、合成数据CodecLM、网页到HTML数据集、MLLM消融实验MM1、Branch-Train-Mix

一、RHO-1&#xff1a;不是所有的token都是必须的 论文地址&#xff1a;https://arxiv.org/pdf/2404.07965.pdf 1. 不是所有token均相等&#xff1a;token损失值的训练动态。 ​ 使用来自OpenWebMath的15B token来持续预训练Tinyllama-1B&#xff0c;每1B token保存一个che…

配置nodejs的俩小脚本

介绍&#xff1a;共两个脚本。 脚本1&#xff0c;用来配置环境变量&#xff0c;生成环境变量所需的配置信息&#xff0c;然后自己添加到系统环境变量里去 特别注意&#xff1a;该脚本需要放到nodejs目录下面&#xff0c;如果不是&#xff0c;则无法生成环境变量配置文本内容 另…

【STL概念】

STL STL&#xff08;Standard Template Library),即标准模板库从根本上说,STL是一些“容器”的集合,这些“容器”有list,vector,set,map等,STL也是算法和其他一些组件的集合。这里的“容器”和算法的集合指的是世界上很多聪明人很多年的杰作。STL的目的是标准化组件&#xff0…

找不到msvcp140dll,无法继续执行代码的详细解决方法

在我们日常使用计算机进行各类工作任务的过程中&#xff0c;时常会遭遇一些突发的技术问题。比如&#xff0c;有时在运行某个重要程序或应用软件时&#xff0c;系统会突然弹出一个令人困扰的错误提示&#xff1a;“电脑提示找不到msvcp140.dll文件&#xff0c;因此无法继续执行…

Linux CPU 占用率 100% 排查

其他层面要考虑到的地方 mysql&#xff0c;有执行时间特别长的sql、死锁redis雪崩等相关问题并发导出数据量大Java定时器服务业务复杂&#xff0c;比如像每天要更新电商的统计表&#xff0c;每天发送优惠券等业务需要提前计算才能保证业务使用时的流畅性&#xff0c;我这个原因…