API接口并发请求控制实现

news2024/11/25 6:40:34

文章目录

  • 一、问题概述
  • 二、解决思路
  • 1. AtomicInteger
  • 2. LongAdder
  • 3. Semaphore
  • 4. 实现区别
  • 三、API接口并发控制
  • 1. 核心源码
  • 2. 源码放送

一、问题概述

某API接口,承载某重要业务,希望控制任意时间点的并发访问数在5以内,该如何实现?

二、解决思路

解决这个问题主要有2个思路: 原子计数器和信号量。
原子计数器有2个实现:AtomicInteger、LongAdder
信号量:Semaphore

单元测试代码验证

1. AtomicInteger


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.Test;
import org.springframework.util.StopWatch;

import lombok.extern.slf4j.Slf4j;

/**
 * AomicInteger控制线程并发数
 * 
 */
@Slf4j
public class AtomicIntegerTest
{
    private AtomicInteger count = new AtomicInteger(0);
    
    /**
     * 最大并发数
     */
    private int max = 5;
    
    /**
     * 线程池方式测试
     * 
     * @throws InterruptedException
     * @see [类、类#方法、类#成员]
     */
    @Test
    public void test()
        throws InterruptedException
    {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        int id = 0;
        while (id++ < 100)
        {
            cachedThreadPool.execute(() -> runCall());
            TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(100, 1000));
        }
        cachedThreadPool.shutdown();
        while (!cachedThreadPool.isTerminated()) // 保证任务全部执行完
        {
        }
    }
    
    private void runCall()
    {
        try
        {
            log.info("++++ 计数器自增:{}", count.incrementAndGet());
            if (count.get() > max)
            {
                log.info("✈✈✈✈✈ 请求用户过多,请稍后再试! ✈✈✈✈✈");
                return;
            }
            
            // 模拟耗时业务操作
            log.info("★★★★★★★★ 报名或抢购处理中★★★★★★★★");
            StopWatch clock = new StopWatch();
            clock.start();
            TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(1000, 5000));
            clock.stop();
            log.info("运行 {} ms ---------------", clock.getLastTaskTimeMillis());
        }
        catch (InterruptedException e)
        {
            log.error(e.getMessage());
        }
        finally
        {
            log.info("---- 计数器自减:{}", count.decrementAndGet());
        }
    }
}

运行结果:
在这里插入图片描述

2. LongAdder


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.Test;
import org.springframework.util.StopWatch;

import lombok.extern.slf4j.Slf4j;

/**
 * LongAdder控制线程并发数
 *
 */
@Slf4j
public class LongAdderTest
{
    private LongAdder count = new LongAdder();
    
    /**
     * 最大并发数
     */
    private int max = 5;
    
    /**
     * 线程池方式测试
     * 
     * @throws InterruptedException
     * @see [类、类#方法、类#成员]
     */
    @Test
    public void test()
        throws InterruptedException
    {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        int id = 0;
        while (id++ < 100)
        {
            cachedThreadPool.execute(() -> runCall());
            TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(100, 1000));
        }
        cachedThreadPool.shutdown();
        while (!cachedThreadPool.isTerminated()) // 保证任务全部执行完
        {
        }
    }
    
    private void runCall()
    {
        try
        {
            count.increment();
            log.info("++++ 计数器自增:{}", count.sum());
            if (count.sum() > max)
            {
                log.info("✈✈✈✈✈ 请求用户过多,请稍后再试! ✈✈✈✈✈");
                return;
            }
            
            // 模拟耗时业务操作
            log.info("★★★★★★★★ 报名或抢购处理中★★★★★★★★");
            StopWatch clock = new StopWatch();
            clock.start();
            TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(1000, 5000));
            clock.stop();
            log.info("运行 {} ms ---------------", clock.getLastTaskTimeMillis());
        }
        catch (InterruptedException e)
        {
            log.error(e.getMessage());
        }
        finally
        {
            count.decrement();
            log.info("---- 计数器自减:{}", count.sum());
        }
    }
}

运行结果:
在这里插入图片描述

3. Semaphore


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.Test;
import org.springframework.util.StopWatch;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SemaphoreTest
{
    private int max = 5;
    
    private Semaphore semaphore = new Semaphore(max, true);
    
    private AtomicInteger count = new AtomicInteger(0);
    
    /**
     * 线程池方式测试
     * 
     * @throws InterruptedException
     * @see [类、类#方法、类#成员]
     */
    @Test
    public void test()
        throws InterruptedException
    {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        int id = 0;
        while (id++ < 100)
        {
            cachedThreadPool.execute(() -> runCall());
            TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(100, 1000));
        }
        cachedThreadPool.shutdown();
        while (!cachedThreadPool.isTerminated()) // 保证任务全部执行完
        {
        }
    }
    
    private void runCall()
    {
        try
        {
            semaphore.acquire();
            log.info("计数器自增:{}", count.incrementAndGet());
            
            // 模拟耗时业务操作
            log.info("★★★★★★★★ 报名或抢购处理中★★★★★★★★");
            StopWatch clock = new StopWatch();
            clock.start();
            TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(1000, 5000));
            clock.stop();
            log.info("运行 {} ms ---------------", clock.getLastTaskTimeMillis());
        }
        catch (InterruptedException e)
        {
            log.error(e.getMessage());
        }
        finally
        {
            semaphore.release();
            log.info("计数器自减:{}", count.decrementAndGet());
        }
    }
}

运行结果:
在这里插入图片描述

4. 实现区别

对比原子计数器和信号量的实现,我们可以发现信号量Semaphore一旦许可不够会导致线程阻塞,原子计数器一旦达到最大线程并发数,可以快速失败,立即返回友好的提示信息。

三、API接口并发控制

1. 核心源码


import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.fly.demo.JsonResult;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Api(tags = "接口并发控制")
@RestController
@RequestMapping(value = "/both", produces = "application/json; charset=utf-8")
public class ConcurrentController
{
    /**
     * 最大并发数
     */
    private int max = 5;
    
    /***************** 注意:不可共用计数器!!! ******************/
    
    AtomicInteger count1 = new AtomicInteger(0);
    
    LongAdder count2 = new LongAdder();
    
    LongAdder count3 = new LongAdder();
    
    Semaphore semaphore = new Semaphore(max, true);
    
    @ApiOperation("并发测试Atomic")
    @GetMapping("/query/atomic")
    public JsonResult<?> queryAtomic()
    {
        try
        {
            log.info("计数器自增:{}", count1.incrementAndGet());
            if (count1.get() > max)
            {
                log.info("✈✈✈✈✈ 请求用户过多✈✈✈✈✈");
                return JsonResult.error("请求用户过多,请稍后再试!");
            }
            log.info("业务处理开始......");
            TimeUnit.SECONDS.sleep(10);
        }
        catch (InterruptedException e)
        {
            log.error(e.getMessage());
        }
        finally
        {
            log.info("计数器自减:{}", count1.decrementAndGet());
        }
        return JsonResult.success();
    }
    
    @ApiOperation("并发测试LongAdder")
    @GetMapping("/query/longAdder")
    public JsonResult<?> queryLongAdder()
    {
        try
        {
            count2.increment();
            log.info("计数器自增:{}", count2.sum());
            if (count2.sum() > max)
            {
                log.info("✈✈✈✈✈ 请求用户过多,计数:{} ✈✈✈✈✈", count2.sum());
                return JsonResult.error("请求用户过多,请稍后再试!");
            }
            log.info("业务处理开始......");
            TimeUnit.SECONDS.sleep(10);
        }
        catch (InterruptedException e)
        {
            log.error(e.getMessage());
        }
        finally
        {
            count2.decrement();
            log.info("计数器自减:{}", count2.sum());
        }
        return JsonResult.success();
    }
    
    // 仅用于Semaphore中计数
    private AtomicInteger count = new AtomicInteger(0);
    
    @ApiOperation("并发测试Semaphore")
    @GetMapping("/query/semaphore")
    public JsonResult<?> querySemaphore()
    {
        try
        {
            // 一旦许可不够,线程阻塞
            semaphore.acquire();
            log.info("计数器自增:{}", count.incrementAndGet());
            
            log.info("业务处理开始......");
            TimeUnit.SECONDS.sleep(10);
        }
        catch (InterruptedException e)
        {
            log.error(e.getMessage());
        }
        finally
        {
            semaphore.release();
            log.info("计数器自减:{}", count.decrementAndGet());
        }
        return JsonResult.success();
    }
    
    @ApiOperation("并发测试InCallable")
    @GetMapping("/query/callable")
    public Callable<JsonResult<?>> callable()
    {
        return () -> {
            try
            {
                count3.increment();
                log.info("计数器自增:{}", count3.sum());
                if (count3.sum() > max)
                {
                    log.info("✈✈✈✈✈ 请求用户过多,计数:{} ✈✈✈✈✈", count3.sum());
                    return JsonResult.error("请求用户过多,请稍后再试!");
                }
                log.info("业务处理开始......");
                TimeUnit.SECONDS.sleep(10);
            }
            catch (InterruptedException e)
            {
                log.error(e.getMessage());
            }
            finally
            {
                count3.decrement();
                log.info("计数器自减:{}", count3.sum());
            }
            return JsonResult.success();
        };
    }
}

2. 源码放送

git clone https://gitcode.com/00fly/springboot-demo.git

有任何问题和建议,都可以向我提问讨论,大家一起进步,谢谢!

-over-

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2208848.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构-5.5.二叉树的存储结构

一.二叉树的顺序存储&#xff1a; a.完全二叉树&#xff1a; 1.顺序存储中利用了静态数组&#xff0c;空间大小有限&#xff1a; 2.基本操作&#xff1a; (i是结点编号) 1.上述图片中i所在的层次后面的公式应该把n换成i(图片里写错了)&#xff1b; 2.上述图片判断i是否有左…

ThingsBoard规则链节点:JSON Path节点详解

引言 JSON Path节点简介 用法 含义 应用场景 实际项目运用示例 智能农业监控系统 工业自动化生产线 车联网平台 结论 引言 ThingsBoard是一个功能强大的物联网平台&#xff0c;它提供了设备管理、数据收集与处理以及实时监控等核心功能。其规则引擎允许用户通过创建复…

Java-学生管理系统[初阶]

这次我们来尝试使用Java实现一下"学生管理系统"&#xff0c;顾名思义&#xff0c;就是实现一个能用来管理学生各种数据的系统。在后续学习中我们将对"学生管理系统"进行两种实现&#xff1a; &#x1f4da; 学生管理系统[初阶](不带模拟登录系统) &#…

衡石分析平台系统管理手册-智能运维之系统日志

系统日志​ 点击系统设置->系统日志 在这个页面&#xff0c;从时间&#xff0c;操作者, IP&#xff0c;行为&#xff0c;结果&#xff0c;类别&#xff0c;对象&#xff0c;描述等方面记录了用户行为&#xff0c;系统管理员可以从此页面针对整个系统的用户行为进行审计工作…

【C++】set/map 与 multiset/multimap

✨✨欢迎大家来到Celia的博客✨✨ &#x1f389;&#x1f389;创作不易&#xff0c;请点赞关注&#xff0c;多多支持哦&#x1f389;&#x1f389; 所属专栏&#xff1a;C 个人主页&#xff1a;Celias blog~ 目录 ​编辑 序列式容器和关联式容器 一、set 1.1 set介绍 1.2 …

大健康零售电商的智囊团:知识中台的应用与影响

在数字化浪潮席卷各行各业的今天&#xff0c;大健康零售电商行业也在积极探索转型升级的新路径。知识中台&#xff0c;作为一种集知识管理、数据挖掘与智能化应用于一体的新型技术架构&#xff0c;正逐渐成为推动这一转型的关键力量。本文将深入探讨知识中台在大健康零售电商中…

Light灯光组件+组件的相关操作+游戏资源的加载

Light灯光组件 Type: Directional:平行光&#xff0c;模仿的是太阳光 Spot:聚光灯 Area:区域光 Color&#xff1a; 颜色值 Mode: RealTime:实时 Mix:混合 Baked:烘焙 Intersity: 光照强度 Indirect Multiplier:光照强度乘数 Shadow Type:影子设置&#xff1a;…

CV方法再学习

轻量化模型 Mobile系列(V1~V3) MobileNetV1 MobileNetV1之所以轻量&#xff0c;与深度可分离卷积的关系密不可分 深度可分离卷积 主要是两种卷积变体组合使用&#xff0c;分别为逐通道卷积&#xff08;Depthwise Convolution&#xff09;和逐点卷积&#xff08;Pointwise C…

Nginx UI 一个可以管理Nginx的图形化界面工具

Nginx UI 是一个基于 Web 的图形界面管理工具&#xff0c;支持对 Nginx 的各项配置和状态进行直观的操作和监控。 Nginx UI 的功能非常丰富&#xff1a; 在线查看服务器 CPU、内存、系统负载、磁盘使用率等指标 在线 ChatGPT 助理 一键申请和自动续签 Let’s encrypt 证书 在…

八、Python基础语法(判断语句-下)

一、if elif else 结构 应用场景&#xff1a;多个判断条件下&#xff0c;并且这些判断条件存在一定的关联。 语法&#xff1a; elif也是python中关键字&#xff0c;后面跟一个判断条件&#xff0c;判断条件后面跟冒号 存在冒号&#xff0c;需要换行缩进&#xff0c;处于elif…

金九银十软件测试面试题(800道)

今年你的目标是拿下大厂offer&#xff1f;还是多少万年薪&#xff1f;其实这些都离不开日积月累的过程。 为此我特意整理出一份&#xff08;超详细笔记/面试题&#xff09;它几乎涵盖了所有的测试开发技术栈&#xff0c;非常珍贵&#xff0c;人手一份 肝完进大厂 妥妥的&#…

QD1-P5 HTML 段落标签(p)换行标签(br)

本节视频 www.bilibili.com/video/BV1n64y1U7oj?p5 ‍ 本节学习 HTML 标签&#xff1a; p标签 段落br标签 换行 ‍ 一、p 标签-段落 1.1 使用 p 标签划分段落 <p>段落文本</p>示例 <!DOCTYPE html> <html><head><meta charset"…

算法剖析:滑动窗口

文章目录 前言一、长度最小的子数组二、无重复字符的最长子串三、最大连续1的个数 III四、将 x 减到 0 的最小操作数五、水果成篮六、找到字符串中所有字母异位词七、串联所有单词的子串八、最小覆盖子串总结 前言 滑动窗口可以看作为一种特殊的通向双指针&#xff0c;这两个指…

轻松翻译:顶尖翻译器评测!

在工作生活中如果遇到翻译需求&#xff0c;就少不了一些好用的翻译器&#xff0c;接下来是我们就来为大家推荐几款市面上口碑极佳的翻译软件&#xff01; 福昕在线翻译 直达链接&#xff1a; fanyi.pdf365.cn/ 操作教程&#xff1a;立即获取 福昕在线翻译是一款基于云端技…

关于部分股市买卖的演示和总结

本文是对上一文的补充&#xff1a;一个普通人的投资认知-CSDN博客 一、简介 假设公司A 向某交易所发行100股股票&#xff0c;每股5元&#xff0c;预计将融资500元。 股民a买了10股&#xff0c;付出50元。 股民b买了20股&#xff0c;付出100元。 股民c买了30股&#xff0c;付出…

【3维视觉】超级好用的3D模型交互式可视化工具viser

项目地址 https://github.com/nerfstudio-project/viser 功能 SMPL模型可视化编辑 点云可视化 3DGS实时渲染播放 安装和使用 安装viser git clone https://github.com/nerfstudio-project/viser.git使用 官方说明文档 1. SMPL模型可视化编辑 先下载SMPLX人体模型 下载解…

290. 单词规律【哈希表】

文章目录 290. 单词规律解题思路Go代码 290. 单词规律 290. 单词规律 给定一种规律 pattern 和一个字符串 s &#xff0c;判断 s 是否遵循相同的规律。 这里的 遵循 指完全匹配&#xff0c;例如&#xff0c; pattern 里的每个字母和字符串 s 中的每个非空单词之间存在着双向…

小程序知识付费的优势 知识付费服务 知识付费平台 知识付费方法

在信息爆炸的时代&#xff0c;知识如同繁星点点&#xff0c;璀璨而散落。如何在这片知识的海洋中精准捕捞&#xff0c;成为现代人追求自我提升的迫切需求。小程序知识付费&#xff0c;正是这样一座桥梁&#xff0c;它以独特的优势&#xff0c;让智慧触手可及&#xff0c;轻触未…

metahuman如何导入UE5

1.启动 通过EPIC启动UE5(UE5内置有Bridge, 但是UE4是需要单独下在Bridge软件) 2.打开Quixel Bridge 在window(窗口)中打开Quixel Bridge 3.Bridge界面 在弹出的Bridge界面选择模型 需要先下载&#xff0c;然后再导入 4.下载模型 点击需要的模型右上方的绿色箭头下载 5.下…

TensorRT-LLM七日谈 Day1

Flag 利用7天时间熟悉tensort-llm的代码架构&#xff0c;cublas的使用方式以及flash attention的调优。 项目链接 https://github.com/NVIDIA/TensorRT-LLM 安装 https://nvidia.github.io/TensorRT-LLM/installation/linux.html 它的安装主要是需要下载相应的docker镜像&am…