高并发异步多线程处理例子

news2025/1/11 14:21:43

用户请求流程

在这里插入图片描述

问题点

  • tomcat 线程资源占满,由于tomcat线程资源有限,每个请求都会经由tomcat线程处理,阻塞至web层处理完才能回收再利用。
  • web层分发至后端服务可能会扩大几倍甚至数百倍的,譬如用户发起请求1w/s,到后端时可能就会10w~100w/s,这时后端压力剧增。

使用Servlet3.0异步请求优化tomcat线程使用

用户发起请求打到web层,tomcat从线程池拿出一个线程处理,这线程会调用web应用,web应用在处理请求的过程中,该线程会一直阻塞,web应用处理完毕才能再输出响应,最后才回收该线程。
Servlet 3.0中引入异步Servlet。原理是web应用启用一个子线程,而Tomcat线程立即返回,不再等待Web应用将请求处理完,这样Tomcat线程可以立即被回收到线程池,用来响应其他请求。

  • 直接上代码
    使用springboot做例子,pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.7.8</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.cy</groupId>
	<artifactId>thread-t</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>thread-t</name>
	<description>thread-t</description>
	<properties>
		<java.version>1.8</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
<!--			<exclusions>-->
<!--				<exclusion>-->
<!--					<groupId>org.springframework.boot</groupId>-->
<!--					<artifactId>spring-boot-starter-tomcat</artifactId>-->
<!--				</exclusion>-->
<!--			</exclusions>-->
<!--		</dependency>-->
<!--		<dependency>-->
<!--			<groupId>org.springframework.boot</groupId>-->
<!--			<artifactId>spring-boot-starter-jetty</artifactId>-->
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-configuration-processor</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<excludes>
						<exclude>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</exclude>
					</excludes>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

线程池配置

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.ThreadPoolExecutor;

@Slf4j
@EnableAsync
@Configuration
public class ThreadPoolConfig {

    @Bean
    public AsyncTaskExecutor asyncTaskExecutor() {
        log.info("init asyncTaskExecutor");
        int coreSize = Runtime.getRuntime().availableProcessors() * 2;
        int maximumPoolSize = coreSize * 2;
        int keepAliveTime = 10;
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(coreSize);
        executor.setMaxPoolSize(maximumPoolSize);
        executor.setQueueCapacity(100000);
        executor.setKeepAliveSeconds(keepAliveTime);
        executor.setThreadNamePrefix("async-");
        executor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        log.info("asyncServiceExecutor params----corePoolSize:{},maxPoolSize:{},keepAliveSeconds:{}" ,
                coreSize,maximumPoolSize,keepAliveTime);
        return executor;
    }
}

异步处理配置

import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

import javax.annotation.Resource;

@Configuration
public class WebMvcConfiguration implements WebMvcConfigurer {

    @Resource
    private AsyncTaskExecutor executor;

    /**
     * An Executor is required to handle java.util.concurrent.Callable return values.
     * Please, configure a TaskExecutor in the MVC config under "async support".
     * The SimpleAsyncTaskExecutor currently in use is not suitable under load.
     * <p>
     * 配置SpringMVC的支持
     */
    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        configurer.setTaskExecutor(executor);
    }
}

在web层的异步写法,开启带返回结果的子线程来处理,tomcat线程可以立马回收

import com.cy.threadt.entity.MyResponse;
import com.cy.threadt.service.MyService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.concurrent.Callable;

@RestController
@RequestMapping("/cy/")
public class MyController {
    @Resource
    private MyService myService;

    @GetMapping(value = "/get/{orderId}")
    public Callable<MyResponse> query(@PathVariable String orderId){
        Callable<MyResponse> callable = new Callable() {
            @Override
            public Object call() throws Exception {
                return myService.query(orderId);
            }
        };
        return callable;
    }
}

至此,压力给到后端服务,后端服务可能是各种第三方、远程调用、内部服务调用,那么后端服务该做什么处理?譬如根据id或编号查询,其实可以合并多个查询给到批查询。

使用批量调用方式

在这里插入图片描述

请求包装对象,根据orderId查询、将结果赋给CompletableFuture,通过CompletableFuture.get()获取到结果

import lombok.Data;

import java.util.concurrent.CompletableFuture;

@Data
public class MyRequest {
    private String id;
    private String orderId;
    private CompletableFuture<MyResponse> future;
}
import lombok.Data;

import java.math.BigDecimal;

@Data
public class MyResponse {
    private String id;
    private String orderId;
    private BigDecimal money;
}
import com.cy.threadt.entity.MyRequest;
import com.cy.threadt.entity.MyResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@Slf4j
@Service
public class MyService {
    private final LinkedBlockingDeque<MyRequest> linkedBlockingDeque = new LinkedBlockingDeque<>(100000);

    @PostConstruct
    public void doBiz () {
        ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1);
        threadPool.scheduleAtFixedRate(() -> {
            if (linkedBlockingDeque.size() == 0) {
                return;
            }
            List<MyRequest> requests = new ArrayList<>();
            for (int i = 0; i < linkedBlockingDeque.size(); i++) {
                requests.add(linkedBlockingDeque.poll());
            }
            batchQuery(requests);
            log.info("批查询处理数量{}", requests.size());
        }, 100, 50, TimeUnit.MILLISECONDS);
    }
    public MyResponse query(String orderId) throws ExecutionException, InterruptedException {
        MyRequest request = new MyRequest();
        request.setOrderId(orderId);
        request.setId(UUID.randomUUID().toString());
        CompletableFuture<MyResponse> objectCompletableFuture = new CompletableFuture<>();
        request.setFuture(objectCompletableFuture);
        linkedBlockingDeque.add(request);
        return objectCompletableFuture.get();
    }

    public void batchQuery(List<MyRequest> list){
        Map<String, List<MyRequest>> mapRequest = list.stream().collect(Collectors.groupingBy(MyRequest::getOrderId));
        List<String> orderIds = list.stream().map(MyRequest::getOrderId).distinct().collect(Collectors.toList());
        for (String orderId : orderIds) {
            List<MyRequest> myRequests = mapRequest.get(orderId);
            BigDecimal money = BigDecimal.valueOf(Math.random());
            for (MyRequest myRequest : myRequests) {
                MyResponse response = new MyResponse();
                response.setOrderId(orderId);
                response.setMoney(money);
                myRequest.getFuture().complete(response);
            }
        }
    }
}

单元测试,模拟10000并发查询

import com.cy.threadt.entity.MyResponse;
import com.cy.threadt.service.MyService;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

@Slf4j
@SpringBootTest
class ThreadTApplicationTests {

	@Resource
	private MyService myService;

	private final int count = 10000;
	private final CountDownLatch countDownLatch = new CountDownLatch(count);
	@Test
	void contextLoads() throws InterruptedException {
		for (int i = 1; i <= count; i++) {
			int finalI = i;
			new Thread( () -> {
				try {
					countDownLatch.countDown();
					countDownLatch.await();
					MyResponse query = myService.query(String.valueOf(finalI));
//					log.info("查询{},结果{}", finalI , query);
				} catch (InterruptedException | ExecutionException e) {
					throw new RuntimeException(e);
				}
			}).start();
		}

		TimeUnit.SECONDS.sleep(10);
	}

}

10000次查询最终可以缩减至数十次查询
在这里插入图片描述

代码地址

代码地址

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/180712.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JavaEE10-Spring Boot配置文件

目录 1.配置文件作用 2.配置文件的格式 为配置文件安装提示插件 2.1. .properties&#xff08;旧版&#xff0c;默认的&#xff09; 2.1.1.基本语法 PS:配置文件中使用"#"来添加注释信息&#xff0c;2种添加方式&#xff1a; 2.1.2.缺点分析 2.2. .yml&#…

阿里“云开发“小程序(uniCloud)

博主ps&#xff1a; 网上资料少的可怜&#xff0c;哎&#xff0c;腾讯云涨价了&#xff0c;论服务器&#xff0c;我肯定选的阿里&#xff0c;再着你们对比下uniCloud的报价就知道了&#xff0c;如果有钱就另当别论了。 所以这片博文&#xff0c;博主试过之后&#xff0c;先抛出…

Git速成指南

文章目录版本管理工具概念版本管理工具介绍版本管理发展简史SVN(SubVersion)GitGit工作流程图Git安装基本配置为常用指令配置别名&#xff08;可选&#xff09;解决GitBash乱码问题Git常用命令获取本地仓库基础操作指令查看修改的状态&#xff08;status&#xff09;添加工作区…

[翻译]PostgreSQL中的WAL压缩以及版本15中的改进

[翻译]PostgreSQL中的WAL压缩以及版本15中的改进从以开始就一直在尝试对WAL进行不同级别的压缩。自2016年以来内置功能&#xff08;wal_compression&#xff09;就一直存在&#xff0c;几乎所有备份工具都会在传递到备机前对WAL进行压缩。但现在是时候再看看内置的wal_compress…

呦~,这不 SVG 映射反爬么,这你都会?厉害厉害 | 案例 25

在正式学习本篇博客前&#xff0c;先要了解一下什么是 SVG&#xff08;Scalable Vector Graphics&#xff09;&#xff0c;它是一种矢量图形格式&#xff0c;可以用来在网页上创建可伸缩的图形。 使用 SVG 技术实现反爬虫的方法有以下几种&#xff1a; 验证码&#xff1a;使用…

imx6ull Linux使用设备树配置LED

我们基于寄存器的方式已经编写了LED驱动&#xff0c;实现点亮/熄灭LED&#xff0c;但是你有没有发现一个问题&#xff0c;就是假设LED修改了一个GPIO&#xff0c;那么需要对应的修改寄存器代码&#xff0c;非常繁琐&#xff0c;而且随着改板次数增加&#xff0c;那么会带来一个…

从零开始的数模(五)插值与拟合

目录 一、概念 二、 插值 2.1方法 2.2MATLAB实现 例题1 ​编辑例题2 2.3python实现 2.3.1例题一的python解法 2.3.2二维网格节点插值 例题四 三、拟合篇&#xff1a; 3.1MATLAB实现 3.2python实现 一、概念 二、 插值 2.1方法 2.2MATLAB实现 在MATLAB中提供了一些…

带滤波器的PID控制仿真-2(M语言)

被控对象为三阶传递函数&#xff1a;低通滤波器为&#xff1a;采样时间为1ms&#xff0c;噪声信号加在对象的输出端。分三种情况进行:M1 时&#xff0c;为未加噪声信号;M2时&#xff0c;为加噪声信号未加滤波;M3时&#xff0c;为加噪声信号加滤波。阶跃响应结果如图1&#xff5…

【论文精读】KD-MVS

今天读的是发表在ECCV2022上的自监督MVS文章&#xff0c;作者来自于旷视科技和清华大学。 文章链接&#xff1a;arxiv 代码链接&#xff1a;https://github.com/megvii-research/KD-MVS 目录Abstract1. Introduction2. Related work3. Methodology3.1 Self-supervised Teacher …

51单片机七人多数表决器仿真设计( proteus仿真+程序+报告+讲解视频)

51单片机七人多数表决器仿真设计( proteus仿真程序报告讲解视频&#xff09; 仿真图proteus 7.8及以上 程序编译器&#xff1a;keil 4/keil 5 编程语言&#xff1a;C语言 设计编号&#xff1a;S0033 51单片机七人多数表决器仿真设计视频讲解1.主要功能&#xff1a;2.仿真3.…

Java:Mybatis的使用

一、Mybatis的概述 MyBatis 是一款优秀的持久层框架&#xff0c;用于简化 JDBC 开发。 MyBatis中文官网&#xff1a;https://mybatis.org/mybatis-3/zh/getting-started.html 二、Mybatis快速入门 1、创建user表&#xff0c;添加数据 create database mybatis; use mybati…

英语语法大全

文章目录一、主语1、名词、代词和动词做主语2、主语从句做主语&#xff0c;谓语动词用单数3、主语从句练习二、谓语动词1、谓语动词种类2、主谓一致三、宾语1、单宾语2、双宾语3、复合宾语4、宾语从句四、定语1、定语从句2、定语从句的翻译五、状语1、分词做状语2、独立主格结构…

42.Isaac教程--超像素

超像素 ISAAC教程合集地址: https://blog.csdn.net/kunhe0512/category_12163211.html 超像素是一组外观相似的相连像素。 超像素分割将图像分成数百个不重叠的超像素&#xff08;而不是数千或数百万个单独的像素&#xff09;。通过使用超像素&#xff0c;您可以在更有意义的区…

ESP-C3入门2. Clion+IDF 开发环境搭建

ESP-C3入门2. ClionIDF 开发环境搭建一、准备工作二、Clion配置过程1. 使用idf.py命令创建一个范例工程2. 使用Clion打开项目&#xff0c;修改CMakeLists.txt3. 设置交叉编译工具链4. 设置CMake5. 对cmake一些路径进行修改。三、编译及烧录1. 编译2. 烧录3. 查看输出一、准备工…

审核中台业务数据进审升级之路

本文字数&#xff1a;3850字预计阅读时间&#xff1a;15 分钟目录1.背景1.1. 相关名词介绍1.2. 审核中台介绍1.3. 业务痛点介绍2. 规范化改造2.1 规范通讯协议2.2 规范处理流程3. 自动化改造3.1 业务接入检测器3.2 数据自动化流转3.3 源码示例4. 总结1.背景1.1 相关名词介绍1.1…

计算机图形学基础教程(Visual C++版)习题解答与编程实践(第2版)孔令德1-到第3章的直线扫描转换

1-到第3章的直线扫描转换&#xff08;没更新完&#xff09;习题1知识积累习题2知识点映射模式使用GDI对象习题3知识积累直线的中点Bresenham算法习题1 1.计算机图形学的定义是什么?说明计算机图形学、图像处理和模式识别之间的关系。 答&#xff1a; CG是计算机图形学的缩写。…

实验一、旅馆客户服务呼叫显示系统

实验一 旅馆客户服务呼叫显示系统 实验目的 综合应用数字电子技术知识&#xff0c;按照要求设计并完成一个小规模的数字电路系统。进行硬件线路的设计、仿真、焊接、调试与实现。使系统实现一种用于旅馆客户服务呼叫显示系统的实用电路。在呼叫过程中&#xff0c;当8位旅客有…

Spark Core ---- RDD持久化

RDD的数据是过程数据 RDD之间进行相互迭代计算&#xff08;Transformation的转换&#xff09;&#xff0c;当执行开启后&#xff0c;新RDD的生成&#xff0c;代表老RDD的消失 RDD的数据是过程数据&#xff0c;只在处理的过程中存在&#xff0c;一旦处理完成&#xff0c;就不见…

【数据结构和算法】实现带头双向循环链表(最复杂的链表)

前文&#xff0c;我们实现了认识了链表这一结构&#xff0c;并实现了无头单向非循环链表&#xff0c;接下来我们实现另一种常用的链表结构&#xff0c;带头双向循环链表。如有仍不了解单向链表的&#xff0c;请看这一篇文章(7条消息) 【数据结构和算法】认识线性表中的链表&…

Spring Boot之SpringSecurity学习

文章目录一 SpringSecurity简介二 实战演示0. 环境 介绍1. 新建一个初始的springboot项目2. 导入thymeleaf依赖3. 导入静态资源4. 编写controller跳转5. 认证和授权6. 权限控制和注销7. 记住登录8. 定制登录页面三 完整代码3.1 pom配置文件3.2 RouterController.java3.3 Securi…