前段时间在群里面发现有个群友抛出一个实际需求:需要通过一个接口拉取数据,这个接口有每秒10QPS限制,请问如何实现数据拉去效率最大化且限制调用拉取接口每秒10PQPS?我觉得这个需求挺有意思的,跟某群友讨论,发现可以利用JUC包下的Semaphore实现,几行代码就能搞定。这里将实现方案做下整理,算是抛砖引玉吧~
利用Semaphore实现多线程调用接口
- 一、代码实现
- 1.自定义线程池ExecutorConfig
- 2.获取数据接口DataFetchService
- 3.拉取数据接口核心实现RateLimitedDataFetcher
- 4.接口实现类DataFetchServiceImpl
- 5.UserController控制层
- 二、项目结构及源码下载地址
一、代码实现
1.自定义线程池ExecutorConfig
/**
* 线程池配置
*/
@Component
public class ExecutorConfig {
private static int maxPoolSize = Runtime.getRuntime().availableProcessors();
private volatile static ExecutorService executorService;
public static ExecutorService getThreadPool() {
if (executorService == null){
synchronized (ExecutorConfig.class){
if (executorService == null){
executorService = newThreadPool();
}
}
}
return executorService;
}
private static ExecutorService newThreadPool(){
int queueSize = 1000;
int corePool = Math.min(10, maxPoolSize);
return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());
}
private ExecutorConfig(){}
}
2.获取数据接口DataFetchService
/**
* 获取数据
*/
public interface DataFetchService {
/**
* 获取数据
* @return List<User>
*/
List<User> dataFetchTask();
}
3.拉取数据接口核心实现RateLimitedDataFetcher
@Service
@Slf4j
public class RateLimitedDataFetcher {
@Autowired
private UserMapper userMapper;
private static final int MAX_REQUESTS_PER_SECOND = 10;
private Semaphore semaphore = new Semaphore(MAX_REQUESTS_PER_SECOND);
private ExecutorService executorService = ExecutorConfig.getThreadPool();
public Future<List<User>> fetchData(Integer id) {
return executorService.submit((Callable<List<User>>) () -> {
try {
// 获取许可
semaphore.acquire();
// 执行网络请求,这里简化为一个延迟操作
QueryWrapper qw = new QueryWrapper();
//lt是小于,id小于5
qw.lt("id", id);
return userMapper.selectList(qw);
} catch (InterruptedException e) {
e.printStackTrace();
return null;
} finally {
// 释放许可
semaphore.release();
}
});
}
}
4.接口实现类DataFetchServiceImpl
@Service
@Slf4j
public class DataFetchServiceImpl implements DataFetchService {
@Autowired
private UserMapper userMapper;
@Autowired
private RateLimitedDataFetcher rateLimitedDataFetcher;
@Override
public List<User> dataFetchTask() {
List<User> userList = null;
try {
userList = rateLimitedDataFetcher.fetchData(2).get();
} catch (InterruptedException | ExecutionException e) {
log.error("DataFetchServiceImpl dataFetchTask error:{}",e.getMessage());
}
return userList;
}
}
5.UserController控制层
/**
* 用户控制层
*
* @author hua
*/
@RestController
@RequestMapping(value = "/user")
public class UserController {
@Autowired
private DataFetchService dataFetchService;
@GetMapping(value = "/getBatchUser")
public ResponseEntity<List<User>> getBatchUser() {
List<User> users = dataFetchService.dataFetchTask();
HttpStatus status = users == null ? HttpStatus.NOT_FOUND: HttpStatus.OK;
return new ResponseEntity<>(users, status);
}
}
二、项目结构及源码下载地址
下载地址 springboot-cacheable 欢迎star哦~