文章目录
- 一、问题概述
- 二、解决思路
- 1. AtomicInteger
- 2. LongAdder
- 3. Semaphore
- 4. 实现区别
- 三、API接口并发控制
- 1. 核心源码
- 2. 源码放送
一、问题概述
某API接口,承载某重要业务,希望控制任意时间点的并发访问数在5以内,该如何实现?
二、解决思路
解决这个问题主要有2个思路: 原子计数器和信号量。
原子计数器有2个实现:AtomicInteger、LongAdder
信号量:Semaphore
单元测试代码验证
1. AtomicInteger
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.Test;
import org.springframework.util.StopWatch;
import lombok.extern.slf4j.Slf4j;
/**
* AomicInteger控制线程并发数
*
*/
@Slf4j
public class AtomicIntegerTest
{
private AtomicInteger count = new AtomicInteger(0);
/**
* 最大并发数
*/
private int max = 5;
/**
* 线程池方式测试
*
* @throws InterruptedException
* @see [类、类#方法、类#成员]
*/
@Test
public void test()
throws InterruptedException
{
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
int id = 0;
while (id++ < 100)
{
cachedThreadPool.execute(() -> runCall());
TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(100, 1000));
}
cachedThreadPool.shutdown();
while (!cachedThreadPool.isTerminated()) // 保证任务全部执行完
{
}
}
private void runCall()
{
try
{
log.info("++++ 计数器自增:{}", count.incrementAndGet());
if (count.get() > max)
{
log.info("✈✈✈✈✈ 请求用户过多,请稍后再试! ✈✈✈✈✈");
return;
}
// 模拟耗时业务操作
log.info("★★★★★★★★ 报名或抢购处理中★★★★★★★★");
StopWatch clock = new StopWatch();
clock.start();
TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(1000, 5000));
clock.stop();
log.info("运行 {} ms ---------------", clock.getLastTaskTimeMillis());
}
catch (InterruptedException e)
{
log.error(e.getMessage());
}
finally
{
log.info("---- 计数器自减:{}", count.decrementAndGet());
}
}
}
运行结果:
2. LongAdder
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.Test;
import org.springframework.util.StopWatch;
import lombok.extern.slf4j.Slf4j;
/**
* LongAdder控制线程并发数
*
*/
@Slf4j
public class LongAdderTest
{
private LongAdder count = new LongAdder();
/**
* 最大并发数
*/
private int max = 5;
/**
* 线程池方式测试
*
* @throws InterruptedException
* @see [类、类#方法、类#成员]
*/
@Test
public void test()
throws InterruptedException
{
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
int id = 0;
while (id++ < 100)
{
cachedThreadPool.execute(() -> runCall());
TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(100, 1000));
}
cachedThreadPool.shutdown();
while (!cachedThreadPool.isTerminated()) // 保证任务全部执行完
{
}
}
private void runCall()
{
try
{
count.increment();
log.info("++++ 计数器自增:{}", count.sum());
if (count.sum() > max)
{
log.info("✈✈✈✈✈ 请求用户过多,请稍后再试! ✈✈✈✈✈");
return;
}
// 模拟耗时业务操作
log.info("★★★★★★★★ 报名或抢购处理中★★★★★★★★");
StopWatch clock = new StopWatch();
clock.start();
TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(1000, 5000));
clock.stop();
log.info("运行 {} ms ---------------", clock.getLastTaskTimeMillis());
}
catch (InterruptedException e)
{
log.error(e.getMessage());
}
finally
{
count.decrement();
log.info("---- 计数器自减:{}", count.sum());
}
}
}
运行结果:
3. Semaphore
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.Test;
import org.springframework.util.StopWatch;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SemaphoreTest
{
private int max = 5;
private Semaphore semaphore = new Semaphore(max, true);
private AtomicInteger count = new AtomicInteger(0);
/**
* 线程池方式测试
*
* @throws InterruptedException
* @see [类、类#方法、类#成员]
*/
@Test
public void test()
throws InterruptedException
{
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
int id = 0;
while (id++ < 100)
{
cachedThreadPool.execute(() -> runCall());
TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(100, 1000));
}
cachedThreadPool.shutdown();
while (!cachedThreadPool.isTerminated()) // 保证任务全部执行完
{
}
}
private void runCall()
{
try
{
semaphore.acquire();
log.info("计数器自增:{}", count.incrementAndGet());
// 模拟耗时业务操作
log.info("★★★★★★★★ 报名或抢购处理中★★★★★★★★");
StopWatch clock = new StopWatch();
clock.start();
TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(1000, 5000));
clock.stop();
log.info("运行 {} ms ---------------", clock.getLastTaskTimeMillis());
}
catch (InterruptedException e)
{
log.error(e.getMessage());
}
finally
{
semaphore.release();
log.info("计数器自减:{}", count.decrementAndGet());
}
}
}
运行结果:
4. 实现区别
对比原子计数器和信号量的实现,我们可以发现信号量Semaphore一旦许可不够会导致线程阻塞
,原子计数器一旦达到最大线程并发数,可以快速失败
,立即返回友好的提示信息。
三、API接口并发控制
1. 核心源码
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.fly.demo.JsonResult;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Api(tags = "接口并发控制")
@RestController
@RequestMapping(value = "/both", produces = "application/json; charset=utf-8")
public class ConcurrentController
{
/**
* 最大并发数
*/
private int max = 5;
/***************** 注意:不可共用计数器!!! ******************/
AtomicInteger count1 = new AtomicInteger(0);
LongAdder count2 = new LongAdder();
LongAdder count3 = new LongAdder();
Semaphore semaphore = new Semaphore(max, true);
@ApiOperation("并发测试Atomic")
@GetMapping("/query/atomic")
public JsonResult<?> queryAtomic()
{
try
{
log.info("计数器自增:{}", count1.incrementAndGet());
if (count1.get() > max)
{
log.info("✈✈✈✈✈ 请求用户过多✈✈✈✈✈");
return JsonResult.error("请求用户过多,请稍后再试!");
}
log.info("业务处理开始......");
TimeUnit.SECONDS.sleep(10);
}
catch (InterruptedException e)
{
log.error(e.getMessage());
}
finally
{
log.info("计数器自减:{}", count1.decrementAndGet());
}
return JsonResult.success();
}
@ApiOperation("并发测试LongAdder")
@GetMapping("/query/longAdder")
public JsonResult<?> queryLongAdder()
{
try
{
count2.increment();
log.info("计数器自增:{}", count2.sum());
if (count2.sum() > max)
{
log.info("✈✈✈✈✈ 请求用户过多,计数:{} ✈✈✈✈✈", count2.sum());
return JsonResult.error("请求用户过多,请稍后再试!");
}
log.info("业务处理开始......");
TimeUnit.SECONDS.sleep(10);
}
catch (InterruptedException e)
{
log.error(e.getMessage());
}
finally
{
count2.decrement();
log.info("计数器自减:{}", count2.sum());
}
return JsonResult.success();
}
// 仅用于Semaphore中计数
private AtomicInteger count = new AtomicInteger(0);
@ApiOperation("并发测试Semaphore")
@GetMapping("/query/semaphore")
public JsonResult<?> querySemaphore()
{
try
{
// 一旦许可不够,线程阻塞
semaphore.acquire();
log.info("计数器自增:{}", count.incrementAndGet());
log.info("业务处理开始......");
TimeUnit.SECONDS.sleep(10);
}
catch (InterruptedException e)
{
log.error(e.getMessage());
}
finally
{
semaphore.release();
log.info("计数器自减:{}", count.decrementAndGet());
}
return JsonResult.success();
}
@ApiOperation("并发测试InCallable")
@GetMapping("/query/callable")
public Callable<JsonResult<?>> callable()
{
return () -> {
try
{
count3.increment();
log.info("计数器自增:{}", count3.sum());
if (count3.sum() > max)
{
log.info("✈✈✈✈✈ 请求用户过多,计数:{} ✈✈✈✈✈", count3.sum());
return JsonResult.error("请求用户过多,请稍后再试!");
}
log.info("业务处理开始......");
TimeUnit.SECONDS.sleep(10);
}
catch (InterruptedException e)
{
log.error(e.getMessage());
}
finally
{
count3.decrement();
log.info("计数器自减:{}", count3.sum());
}
return JsonResult.success();
};
}
}
2. 源码放送
git clone https://gitcode.com/00fly/springboot-demo.git
有任何问题和建议,都可以向我提问讨论,大家一起进步,谢谢!
-over-