上一篇文章中实现了短轮询,不过短轮询的弊端也很明显,如果请求的频率较高,那么就会导致服务端压力大(并发高);如果请求的频率放低,那么客户端感知变更的及时性就会降低。所以我们来看另一种轮询方式,长轮询。
长轮询就是客户端发起请求,如果服务端的数据没有发生变更,那么就hold住请求,直到服务端的数据发生了变更,或者达到了一定的时间才会返回。这样就减少了客户端和服务端频繁建立连接和传递数据的过程,并且不会消耗服务端太多资源,而且客户端感知变更的及时性也会大大提高。
代码在https://gitee.com/summer-cat001/config-center
原理
要实现服务端长时间hold请求,就要利用到servlet异步的特性。因为web服务器会有一个线程池,每一个请求来了之后会提交给这个线程池去处理,如果一个任务很长时间都没完成的话就会一直占有这个线程,那么其他请求来了会发现线程池里没有可用的线程就会一直等,直到有空闲的线程,这样就会导致并发能力大大降低。所以需要采用异步响应的方式去实现,而比较方便实现异步http的方式就是Servlet3.0提供的AsyncContext机制。AsyncContext的作用是把主线程归还给web服务器的线程池,不影响服务端处理其他客户端的请求。会有线程专门处理这个长轮询,但并不是说每一个长轮询的http请求都要用一个线程阻塞在那,而是把长轮询的request的引用在一个集合中存起来,用一个或几个线程专门处理一批客户端的长轮询请求,这样就不需要为每一个长轮询单独分配线程阻塞在那了,从而大大降低了资源的消耗。注意,异步不是非阻塞,响应数据时还是要阻塞的。
服务端
服务端增加一个长轮询的接口
/**
 * Long-polling endpoint: parks the request until one of the requested configs changes
 * or the server-side deadline passes.
 *
 * @param configIdMap config id mapped to the version the client currently holds
 * @param request     servlet request, switched to async mode so the container thread is released
 * @param response    servlet response; its content type is set before going async
 * @return a failure result when {@code configIdMap} is null or empty; otherwise {@code null},
 *         because the body is written later through the {@link AsyncContext}
 */
@PostMapping("/change/get/long")
public Result<Void> getLongChangeConfig(@RequestBody Map<Long, Integer> configIdMap, HttpServletRequest request, HttpServletResponse response) {
    boolean nothingRequested = configIdMap == null || configIdMap.isEmpty();
    if (nothingRequested) {
        return Result.fail("配置参数错误");
    }
    response.setContentType("application/json;charset=UTF-8");

    AsyncContext parkedRequest = request.startAsync();
    // 0 disables the container's own async timeout; expiry is driven by the task's endTime instead.
    parkedRequest.setTimeout(0);

    // The deadline is 28 seconds from now.
    long deadlineMillis = System.currentTimeMillis() + 28 * 1000;

    ConfigPolingTask pollingTask = new ConfigPolingTask();
    pollingTask.setEndTime(deadlineMillis);
    pollingTask.setConfigPolingDataMap(configIdMap);
    pollingTask.setAsyncContext(parkedRequest);

    configService.configListener(pollingTask);
    return null;
}
主要就是把请求的配置id和版本的map、超时时间、asyncContext对象组装成一个任务,添加到任务池里。如果有配置更新,会去任务池里找是否有该配置id的任务,如果新版本号大于任务里的版本号,就将新配置返回给客户端。与此同时会有1个定时线程每1秒访问一下任务池,找到过期的任务,返回给客户端。客户端的请求过期时间是30秒,服务端过期时间定的是28秒,也就是配置没有改变的情况下,会hold请求28秒才返回,提前2秒返回是为了防止返回传输时间导致超过30秒,客户端断开连接。
/**
 * Config CRUD service that also owns the server side of long polling.
 *
 * <p>Parked long-poll requests are kept in a {@link ConfigPolingTasksHolder}. A task leaves the
 * holder in exactly one of three ways: the initial check in {@link #configListener}, a change
 * notification in {@link #onChangeConfigEvent}, or the once-per-second timeout sweep. Whoever
 * removes a task from the holder is responsible for answering it, because the async request has
 * no other path to completion.
 */
@Slf4j
@Service
public class ConfigServiceImpl implements ConfigService {

    private ConfigDAO configDAO;

    private ConfigSyncService configSyncService;

    @Autowired
    private LocalConfigDAO localConfigDAO;

    @Autowired
    private LocalConfigSyncServiceImpl localConfigSyncService;

    @Value("${config.center.mode:0}")
    private int configCenterMode;

    // Sequence used only for naming response threads. Atomic because the pool may invoke the
    // thread factory from several submitting threads at once. Fully qualified so this block does
    // not depend on an import that the file may not have.
    private final java.util.concurrent.atomic.AtomicInteger respThreadNum =
            new java.util.concurrent.atomic.AtomicInteger();

    private final ExecutorService respExecutor;

    private final ConfigPolingTasksHolder configPolingTasksHolder;

    /**
     * Creates the task holder, the pool that writes long-poll responses, and the scheduler that
     * sweeps expired tasks once per second.
     */
    public ConfigServiceImpl() {
        configPolingTasksHolder = new ConfigPolingTasksHolder();
        // Pool used to write long-poll responses: 100 core / 5000 max threads, bounded queue,
        // and caller-runs when saturated so a response is never dropped.
        respExecutor = new ThreadPoolExecutor(100, 5000,
                0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(102400),
                this::newRespThread,
                new ThreadPoolExecutor.CallerRunsPolicy());
        // Check for expired tasks once per second.
        ScheduledExecutorService timeoutCheckExecutor = new ScheduledThreadPoolExecutor(1, this::newCheckThread);
        timeoutCheckExecutor.scheduleAtFixedRate(this::responseTimeoutTask, 0, 1, TimeUnit.SECONDS);
    }

    /**
     * Selects the DAO and sync implementation for the configured mode.
     *
     * @throws IllegalArgumentException if {@code config.center.mode} maps to no known mode
     */
    @PostConstruct
    public void init() {
        ConfigCenterModeEnum configCenterModeEnum = ConfigCenterModeEnum.getEnum(configCenterMode);
        if (configCenterModeEnum == null) {
            throw new IllegalArgumentException("配置config.center.mode错误");
        }
        if (configCenterModeEnum == ConfigCenterModeEnum.STANDALONE) {
            this.configDAO = localConfigDAO;
            this.configSyncService = localConfigSyncService;
        }
    }

    /**
     * Inserts a new config.
     *
     * @param configBO config to insert
     * @return a failure result when a valid config with the same name already exists
     */
    @Override
    public Result<Void> insertConfig(ConfigBO configBO) {
        List<ConfigDO> configList = configDAO.getAllValidConfig();
        if (configList.stream().anyMatch(c -> c.getName().equals(configBO.getName()))) {
            return Result.fail("配置名重复");
        }
        ConfigDO configDO = new ConfigDO();
        configDO.setName(configBO.getName());
        configDO.setConfigData(configBO.getConfigData().toJSONString());
        configDAO.insertConfigDO(configDO);
        return Result.success(null);
    }

    /**
     * Updates a config and then publishes its id through the sync service.
     *
     * @param configBO config carrying the id, name and new data
     * @return a success result
     */
    @Override
    public Result<Void> updateConfig(ConfigBO configBO) {
        ConfigDO configDO = new ConfigDO();
        configDO.setId(configBO.getId());
        configDO.setName(configBO.getName());
        configDO.setConfigData(configBO.getConfigData().toJSONString());
        configDAO.updateConfig(configDO);
        configSyncService.publish(configBO.getId());
        return Result.success(null);
    }

    /**
     * Deletes a config through the DAO.
     *
     * @param id        config id
     * @param updateUid id of the user performing the deletion
     * @return a success result
     */
    @Override
    public Result<Void> delConfig(long id, long updateUid) {
        configDAO.delConfig(id, updateUid);
        return Result.success(null);
    }

    /**
     * Returns every valid config converted to its BO form.
     */
    @Override
    public Result<List<ConfigBO>> getAllValidConfig() {
        List<ConfigDO> configList = configDAO.getAllValidConfig();
        return Result.success(configList.stream().map(this::configDO2BO).collect(Collectors.toList()));
    }

    /**
     * Registers a long-poll task and answers it immediately if the client is already behind.
     *
     * @param configPolingTask the parked request together with the versions the client holds
     */
    @Override
    public void configListener(ConfigPolingTask configPolingTask) {
        // Register the task first and only then check whether the configs have changed.
        // If the check came first, a change landing between the check and the registration
        // would go unnoticed and the task would idle until it times out.
        configPolingTasksHolder.addConfigTask(configPolingTask);
        List<ConfigBO> allValidConfig = getAllValidConfig().getData();
        List<ConfigVO> changeConfigList = getChangeConfigList(configPolingTask, allValidConfig);
        if (!changeConfigList.isEmpty()) {
            // Respond only if the task is still in the holder; an empty list means a concurrent
            // change event or the timeout sweep has already claimed it.
            List<ConfigPolingTask> todoTask = configPolingTasksHolder.getExecuteTaskList(configPolingTask::equals);
            if (!todoTask.isEmpty()) {
                doResponseTask(configPolingTask, Result.success(changeConfigList));
            }
        }
    }

    /**
     * Answers every parked task that is watching {@code configId}.
     *
     * <p>The tasks are removed from the holder before the config is loaded, so from that point on
     * each of them must receive a response: the async requests carry no container timeout and
     * would otherwise never complete. Failures while loading or diffing therefore degrade to an
     * empty result instead of propagating.
     *
     * @param configId id of the config that changed
     */
    @Override
    public void onChangeConfigEvent(long configId) {
        List<ConfigPolingTask> todoTasks = configPolingTasksHolder.getExecuteTaskList(
                configPolingTask -> configPolingTask.getConfigPolingDataMap().containsKey(configId));
        if (todoTasks.isEmpty()) {
            return;
        }
        List<ConfigBO> configList = loadChangedConfig(configId);
        for (ConfigPolingTask todoTask : todoTasks) {
            List<ConfigVO> changeConfigList = changedOrEmpty(todoTask, configList);
            respExecutor.submit(() -> doResponseTask(todoTask, Result.success(changeConfigList)));
        }
    }

    // Loads the changed config as a one-element list, or an empty list if it cannot be loaded.
    private List<ConfigBO> loadChangedConfig(long configId) {
        try {
            return Collections.singletonList(configDO2BO(configDAO.getConfig(configId)));
        } catch (RuntimeException e) {
            log.error("onChangeConfigEvent load config error,configId:{}", configId, e);
            return Collections.emptyList();
        }
    }

    // Computes the changes for one task; a failure yields an empty list so the task is still answered.
    private List<ConfigVO> changedOrEmpty(ConfigPolingTask task, List<ConfigBO> configList) {
        try {
            return getChangeConfigList(task, configList);
        } catch (RuntimeException e) {
            log.error("onChangeConfigEvent diff error,task:{}", task, e);
            return new ArrayList<>();
        }
    }

    // Keeps the configs the task is watching whose version is greater than the version the client holds.
    private List<ConfigVO> getChangeConfigList(ConfigPolingTask configPolingTask, List<ConfigBO> configList) {
        Map<Long, Integer> configPolingDataMap = configPolingTask.getConfigPolingDataMap();
        return configList.stream()
                .filter(configBO -> configPolingDataMap.containsKey(configBO.getId()))
                .filter(configBO -> configBO.getVersion() > configPolingDataMap.get(configBO.getId()))
                .map(ConfigServiceImpl::configBO2ConfigVO).collect(Collectors.toList());
    }

    // Converts a persistence object to its BO form, parsing the stored JSON data.
    private ConfigBO configDO2BO(ConfigDO configDO) {
        ConfigBO configBO = new ConfigBO();
        configBO.setId(configDO.getId());
        configBO.setName(configDO.getName());
        configBO.setVersion(configDO.getVersion());
        configBO.setCreateTime(configDO.getCreateTime());
        configBO.setConfigData(JSON.parseObject(configDO.getConfigData()));
        return configBO;
    }

    // Answers tasks whose deadline has passed without a change, using an empty change list.
    private void responseTimeoutTask() {
        // scheduleAtFixedRate suppresses all later runs once a run throws, so nothing may escape.
        try {
            List<ConfigPolingTask> timeoutTasks = configPolingTasksHolder.getExecuteTaskList(
                    configPolingTask -> System.currentTimeMillis() >= configPolingTask.getEndTime());
            timeoutTasks.forEach(timeoutTask -> respExecutor.submit(() ->
                    doResponseTask(timeoutTask, Result.success(new ArrayList<>()))));
        } catch (RuntimeException e) {
            log.error("responseTimeoutTask error", e);
        }
    }

    // Writes the result as JSON to the parked response and always completes the async context.
    private void doResponseTask(ConfigPolingTask configPolingTask, Result<?> result) {
        AsyncContext asyncContext = configPolingTask.getAsyncContext();
        try (PrintWriter writer = asyncContext.getResponse().getWriter()) {
            writer.write(JSON.toJSONString(result));
            writer.flush();
        } catch (Exception e) {
            log.error("doResponseTimeoutTask error,task:{}", configPolingTask, e);
        } finally {
            asyncContext.complete();
        }
    }

    // Thread factory for the single timeout-check thread (daemon).
    private Thread newCheckThread(Runnable r) {
        Thread t = new Thread(r);
        t.setDaemon(true);
        t.setName("ConfigLongPollingTimeoutCheckExecutor");
        return t;
    }

    // Thread factory for response threads (daemon), numbered from 0.
    private Thread newRespThread(Runnable r) {
        Thread t = new Thread(r);
        t.setDaemon(true);
        t.setName("ConfigLongPollingTimeoutRespExecutor-" + respThreadNum.getAndIncrement());
        return t;
    }

    /**
     * Converts a BO to the VO sent to clients; the create time is formatted via {@code DateUtil.date2str1}.
     *
     * @param configBO source object
     * @return the populated VO
     */
    public static ConfigVO configBO2ConfigVO(ConfigBO configBO) {
        ConfigVO configVO = new ConfigVO();
        configVO.setId(configBO.getId());
        configVO.setName(configBO.getName());
        configVO.setVersion(configBO.getVersion());
        configVO.setConfigData(configBO.getConfigData());
        configVO.setCreateTime(DateUtil.date2str1(configBO.getCreateTime()));
        return configVO;
    }
}
/**
 * Synchronized store of long-poll tasks that are still waiting for a response.
 */
public class ConfigPolingTasksHolder {

    private final List<ConfigPolingTask> configPolingTasks;

    public ConfigPolingTasksHolder() {
        configPolingTasks = new ArrayList<>();
    }

    /**
     * Appends a task to the pending list.
     *
     * @param configPolingTask task to park
     */
    public synchronized void addConfigTask(ConfigPolingTask configPolingTask) {
        configPolingTasks.add(configPolingTask);
    }

    /**
     * Removes every pending task matching {@code predicate} and hands the removed tasks back,
     * so the caller can process them outside the lock and the lock is held only briefly.
     *
     * @param predicate selects the tasks to take out
     * @return the removed tasks in their original order; empty when nothing matched
     */
    public synchronized List<ConfigPolingTask> getExecuteTaskList(Predicate<ConfigPolingTask> predicate) {
        List<ConfigPolingTask> claimed = new ArrayList<>();
        List<ConfigPolingTask> stillPending = new ArrayList<>();
        // Partition first and mutate afterwards: if the predicate throws, the pending list is untouched.
        for (ConfigPolingTask candidate : configPolingTasks) {
            if (predicate.test(candidate)) {
                claimed.add(candidate);
            } else {
                stillPending.add(candidate);
            }
        }
        if (!claimed.isEmpty()) {
            configPolingTasks.clear();
            configPolingTasks.addAll(stillPending);
        }
        return claimed;
    }
}
/**
 * One parked long-poll request: what the client is watching, the async handle used to answer it,
 * and when it expires.
 */
@Data
public class ConfigPolingTask {
/**
 * Deadline as an epoch-millisecond timestamp (compared against System.currentTimeMillis()).
 */
private long endTime;
/**
 * Async handle of the parked request.
 */
private AsyncContext asyncContext;
/**
 * Polled configs: config id mapped to the version the client holds.
 */
private Map<Long, Integer> configPolingDataMap;
}
客户端
客户端就很简单了,只要循环发一个超时时间是30秒的http请求就行
/**
 * Starts long polling against the server's long-poll endpoint with a 30-second read timeout
 * and no per-iteration hook.
 */
public void startLongPolling() {
    final String longPollUri = "/config/change/get/long";
    final int readTimeout = 30000;
    polling(longPollUri, null, readTimeout);
}
/**
 * Starts a daemon thread that repeatedly posts the locally held config versions to {@code uri}
 * and applies whatever changed configs the server returns.
 *
 * <p>The thread ends when it is interrupted or when there is nothing to poll. A failed iteration
 * is logged and followed by a short pause, so an unreachable server does not turn the loop into
 * a busy spin when no {@code runnable} throttles it.
 *
 * @param uri         path appended to the client's base url
 * @param runnable    optional hook run at the start of every iteration; may be {@code null}
 * @param readTimeout read timeout handed to the HTTP call
 */
public void polling(String uri, Runnable runnable, int readTimeout) {
    final long failureBackoffMillis = 1000L;
    Thread thread = new Thread(() -> {
        while (!Thread.interrupted()) {
            try {
                if (runnable != null) {
                    runnable.run();
                }
                Map<Long, List<ConfigDataBO>> refreshConfigMap = collectPollingTargets();
                if (refreshConfigMap.isEmpty()) {
                    // Nothing to poll: leave the loop and let the thread finish.
                    return;
                }
                Map<String, Integer> configIdMap = refreshConfigMap.keySet().stream()
                        .collect(Collectors.toMap(String::valueOf, configId -> configMap.get(configId).getVersion()));
                HttpRespBO httpRespBO = HttpUtil.httpPostJson(url + uri, JSON.toJSONString(configIdMap), readTimeout);
                List<ConfigVO> configList = httpResp2ConfigVOList(httpRespBO);
                configList.forEach(this::applyPolledConfig);
            } catch (Exception e) {
                log.error("startShortPolling error", e);
                try {
                    Thread.sleep(failureBackoffMillis);
                } catch (InterruptedException interrupted) {
                    // Restore the flag so the loop condition sees it and the thread exits.
                    Thread.currentThread().interrupt();
                }
            }
        }
    });
    thread.setName("startShortPolling");
    thread.setDaemon(true);
    thread.start();
}

// Builds the map of config id -> refreshable data items whose keys decide which configs are polled.
private Map<Long, List<ConfigDataBO>> collectPollingTargets() {
    Map<Long, List<ConfigDataBO>> refreshConfigMap = new HashMap<>();
    for (ConfigBO configBO : configMap.values()) {
        List<ConfigDataBO> cdList = configBO.getConfigDataList();
        if (cdList == null) {
            continue;
        }
        // NOTE(review): the entry is created for every config with a non-null data list, even if
        // none of its items has refresh fields. This mirrors the previous
        // `computeIfAbsent(...)::add` method reference, whose receiver was evaluated before the
        // filtered stream ran. Confirm whether polling such configs is intended.
        List<ConfigDataBO> refreshable = refreshConfigMap.computeIfAbsent(configBO.getId(), k1 -> new ArrayList<>());
        for (ConfigDataBO cd : cdList) {
            if (cd.getRefreshFieldList() != null && !cd.getRefreshFieldList().isEmpty()) {
                refreshable.add(cd);
            }
        }
    }
    return refreshConfigMap;
}

// Stores the new version and values of one changed config locally and pushes values into bound fields.
private void applyPolledConfig(ConfigVO configVO) {
    Map<String, Object> result = new HashMap<>();
    DataTransUtil.buildFlattenedMap(result, configVO.getConfigData(), "");
    ConfigBO configBO = this.configMap.get(configVO.getId());
    configBO.setVersion(configVO.getVersion());
    List<ConfigDataBO> configDataList = configBO.getConfigDataList();
    Map<String, ConfigDataBO> configDataMap = configDataList.stream()
            .collect(Collectors.toMap(ConfigDataBO::getKey, Function.identity()));
    result.forEach((key, value) -> {
        ConfigDataBO configDataBO = configDataMap.get(key);
        if (configDataBO == null) {
            // Key not seen before: record it locally; there are no bound fields to refresh yet.
            configDataList.add(new ConfigDataBO(key, value.toString()));
            return;
        }
        configDataBO.setValue(value.toString());
        List<RefreshFieldBO> refreshFieldList = configDataBO.getRefreshFieldList();
        if (refreshFieldList == null) {
            refreshFieldList = new ArrayList<>();
            configDataBO.setRefreshFieldList(refreshFieldList);
        }
        for (RefreshFieldBO refreshFieldBO : refreshFieldList) {
            try {
                Field field = refreshFieldBO.getField();
                field.setAccessible(true);
                field.set(refreshFieldBO.getBean(), value.toString());
            } catch (Exception e) {
                // One field failing must not stop the remaining fields from being refreshed.
                log.error("startShortPolling set Field error", e);
            }
        }
    });
}
/**
 * Validates an HTTP response from the config server and extracts the returned configs.
 *
 * @param httpRespBO raw HTTP response
 * @return the configs carried in the result's data; never {@code null}, empty when the server
 *         sent no data
 * @throws IllegalArgumentException if the HTTP call was unsuccessful, the body is missing or
 *                                  does not parse to a result, or the result reports failure
 */
private List<ConfigVO> httpResp2ConfigVOList(HttpRespBO httpRespBO) {
    if (!httpRespBO.success()) {
        throw new IllegalArgumentException("获取配置失败:code:" + httpRespBO.getCode() + ",msg:" + httpRespBO.getMessage());
    }
    if (httpRespBO.getBody() == null) {
        throw new IllegalArgumentException("获取配置失败 body is null:code:" + httpRespBO.getCode() + ",msg:" + httpRespBO.getMessage());
    }
    Result<?> result = JSON.parseObject(new String(httpRespBO.getBody(), StandardCharsets.UTF_8), Result.class);
    // An empty or literal-null body may parse to null; report it explicitly instead of hitting an NPE below.
    if (result == null) {
        throw new IllegalArgumentException("获取配置失败 result is null:code:" + httpRespBO.getCode() + ",msg:" + httpRespBO.getMessage());
    }
    if (result.failed()) {
        throw new IllegalArgumentException("获取配置失败 result:" + result);
    }
    List<ConfigVO> configList = JSON.parseArray(JSON.toJSONString(result.getData()), ConfigVO.class);
    // Null data may come back as a null list; callers call isEmpty() on the return value.
    return configList != null ? configList : new ArrayList<>();
}
/**
 * Demo client: reads a few user properties from the config center, binds {@code userName} for
 * refresh, starts long polling, and prints its state once per second.
 */
public class ClientTest {

    private String userName;

    private String userAge;

    private List<Object> education;

    /**
     * Loads the initial property values and registers {@code userName} as a refreshable field.
     *
     * @throws NoSuchFieldException if the {@code userName} field cannot be looked up reflectively
     */
    public ClientTest() throws NoSuchFieldException {
        ConfigCenterClient client = new ConfigCenterClient("http://localhost:8088");
        Map<String, String> properties = client.getConfigProperty();
        this.userName = properties.get("user.name");
        this.userAge = properties.get("user.age");
        this.education = new ArrayList<>();
        // Indexed keys user.education[0], user.education[1], ... are read until the first gap.
        for (int idx = 0; properties.containsKey("user.education[" + idx + "]"); idx++) {
            education.add(properties.get("user.education[" + idx + "]"));
        }
        Field userNameField = ClientTest.class.getDeclaredField("userName");
        client.addRefreshField("user.name", new RefreshFieldBO(this, userNameField));
        client.startLongPolling();
    }

    @Override
    public String toString() {
        return "姓名:" + userName + ",年龄:" + userAge + ",教育经历:" + education;
    }

    /**
     * Prints the client state every second until the main thread is interrupted.
     */
    public static void main(String[] args) throws NoSuchFieldException, InterruptedException {
        ClientTest demo = new ClientTest();
        for (;;) {
            if (Thread.interrupted()) {
                break;
            }
            System.out.println(demo);
            Thread.sleep(1000);
        }
    }
}