配置中心是如何实现推送的?
背景
传统的静态配置方式想要修改某个配置时,必须重新启动一次应用,如果是数据库连接串的变更,那可能还容易接受一些,但如果变更的是一些运行时实时感知的配置,如某个功能项的开关,重启应用就显得有点大动干戈了。配置中心正是为了解决此类问题应运而生的,特别是在微服务架构体系中,更倾向于使用配置中心来统一管理配置。
配置中心最核心的能力就是配置的动态推送,常见的配置中心如 Nacos、Apollo 等都实现了这样的能力。目前比较流行的配置中心恰恰没有使用长连接,而是使用长轮询。
数据交互模式
数据交互有两种模式:
- Push(推模式)
- Pull(拉模式)
推模式指的是客户端和服务端建立好网络长连接,服务方有相关数据,直接通过长连接通道推动到客户端。其优点是及时,一旦数据变更,客户端立马能感知到;另外对客户端来说逻辑简单,不需要关心有无数据这些逻辑处理。缺点是不知道客户端的数据消费能力,可能导致数据积压在客户端,来不及处理。
拉模式是客户端主动向服务端发出请求,拉取相关数据。其优点是此过程由客户端发起请求,故不存在推模式中数据积压的问题。缺点是可能不及时,对客户端来说需要考虑数据拉取相关逻辑,何时去拉,拉的频率怎么样等等。
长轮询与轮询
长轮询和轮询都是拉模式的实现。
“轮询”是指不管服务端数据有无更新,客户端每隔定长时间请求拉取一次数据,可能有更新数据返回,也可能什么都没有。配置中心如果使用「轮询」实现动态推送,会有以下问题:
- 推送延迟。客户端每隔 5s 拉取一次配置,若配置变更发生在第 6s,则配置推送的延迟会达到 4s。
- 服务端压力。配置一般不会发生变化,频繁的轮询会给服务端造成很大的压力。
- 推送延迟和服务端压力无法中和。降低轮询的间隔,延迟降低,压力增加;增加轮询的间隔,压力降低,延迟增高。
“长轮询”则不存在上述的问题。客户端发起长轮询,如果服务端的数据没有发生变更,会 hold 住请求,直到服务端的数据发生变化,或者等待一定时间超时才会返回。返回后,客户端又会立即再次发起下一次长轮询。配置中心使用「长轮询」如何解决「轮询」遇到的问题也就显而易见了:
- 推送延迟。服务端数据发生变更后,长轮询结束,立刻返回响应给客户端。
- 服务端压力。长轮询的间隔期一般很长,例如 30s、60s,并且服务端 hold 住连接不会消耗太多服务端资源。
可能有人会有疑问,为什么一次长轮询需要等待一定时间超时,超时后又发起长轮询,为什么不让服务端一直 hold 住?主要有两个层面的考虑,一是连接稳定性的考虑,长轮询在传输层本质上还是走的 TCP 协议,如果服务端假死、fullgc 等异常问题,或者是重启等常规操作,长轮询没有应用层的心跳机制,仅仅依靠 TCP 层的心跳保活很难确保可用性,所以一次长轮询设置一定的超时时间也是在确保可用性。除此之外,在配置中心场景,还有一定的业务需求需要这么设计。在配置中心的使用过程中,用户可能随时新增配置监听,而在此之前,长轮询可能已经发出,新增的配置监听无法包含在旧的长轮询中,所以在配置中心的设计中,一般会在一次长轮询结束后,将新增的配置监听给捎带上,而如果长轮询没有超时时间,只要配置一直不发生变化,响应就无法返回,新增的配置也就没法设置监听了。
配置中心长轮询设计
接下来来介绍实现细节
客户端发起长轮询
客户端发起一个HTTP请求,请求信息包含配置中心的地址,以及监听的dataId(本文处于简化说明的考虑,认为dataId是定位配置的唯一键)。若配置没有发生变化,客户端与服务端之间一直处于连接状态。
服务端监听数据变化
服务端会维护dataId和长轮询的映射关系,如果配置发生变化,服务端会找到对应的连接,为响应写入更新后的配置内容。如果超时内配置没有发生变化,服务端找到对应的超时长轮询连接,写入304响应。
304 在 HTTP 响应码中代表“未改变”,并不代表错误。比较契合长轮询时,配置未发生变更的场景。
客户端接受长轮询响应
首先查看响应码是 200 还是 304,以判断配置是否变更,做出相应的回调。之后再次发起下一次长轮询。
使用Java语言来实现一个简易的长轮询机制
在这里插入图片描述
ConfigServer.java
@RestController
@Slf4j
@SpringBootApplication
public class ConfigServer {
@Data
private static class AsyncTask {
// 长轮询请求的上下文,包含请求和响应体
private AsyncContext asyncContext;
// 超时标记
private boolean timeout;
public AsyncTask(AsyncContext asyncContext, boolean timeout) {
this.asyncContext = asyncContext;
this.timeout = timeout;
}
}
// guava提供的多指Map,一个key可以对应多个value
private volatile Multimap<String, AsyncTask> dataIdContext = Multimaps.synchronizedSetMultimap(HashMultimap
.create());
private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d")
.build();
private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(1, threadFactory);
@RequestMapping("/listener")
public void addListener(HttpServletRequest request, HttpServletResponse response) {
String dataId = request.getParameter("dataId");
// 开启异步
AsyncContext asyncContext = request.startAsync(request, response);
AsyncTask asyncTask = new AsyncTask(asyncContext, true);
dataIdContext.put(dataId, asyncTask);
// 启动定时器,30秒后写入304响应
timeoutChecker.schedule(() -> {
if (asyncTask.isTimeout()) {
dataIdContext.remove(dataId, asyncTask);
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
asyncContext.complete();
}
}, 30000, TimeUnit.MILLISECONDS);
}
// ④ 配置发布接入点
@RequestMapping("/publishConfig")
@SneakyThrows
public String publishConfig(String dataId, String configInfo) {
log.info("publish configInfo dataId: [{}], configInfo: {}", dataId, configInfo);
Collection<AsyncTask> asyncTasks = dataIdContext.removeAll(dataId);
for (AsyncTask asyncTask : asyncTasks) {
asyncTask.setTimeout(false);
HttpServletResponse response = (HttpServletResponse)asyncTask.getAsyncContext().getResponse();
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().println(configInfo);
asyncTask.getAsyncContext().complete();
}
return "success";
}
public static void main(String[] args) {
SpringApplication.run(ConfigServer.class, args);
}
}
ConfigClient.java:
@Slf4j
public class ConfigClient {
private CloseableHttpClient httpClient;
private RequestConfig requestConfig;
public ConfigClient() {
this.httpClient = HttpClientBuilder.create().build();
// httpClient客户端超时时间要大于长轮询约定的超时时间
this.requestConfig = RequestConfig.custom().setSocketTimeout(40000).build();
}
@SneakyThrows
public void longPolling(String url, String dataId) {
String endpoint = url + "?dataId=" + dataId;
HttpGet request = new HttpGet(endpoint);
CloseableHttpResponse response = httpClient.execute(request);
switch (response.getStatusLine().getStatusCode()) {
case 200: {
BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity().getContent()));
StringBuilder result = new StringBuilder();
String line;
while ((line = rd.readLine()) != null) {
result.append(line);
}
response.close();
String configInfo = result.toString();
log.info("dataId: [{}] changed, receive configInfo: {}", dataId, configInfo);
// 开启下一个长连接
longPolling(url, dataId);
break;
}
// 响应标记配置没有更变
case 304: {
log.info("longPolling dataId: [{}] once finished, configInfo is unchanged, longPolling again", dataId);
longPolling(url, dataId);
break;
}
default: {
throw new RuntimeException("unExcepted HTTP status code");
}
}
}
public static void main(String[] args) {
// httpClient 会打印很多debug日志,关掉
Logger logger = (Logger) LoggerFactory.getLogger("org.apache.http");
logger.setLevel(Level.INFO);
ConfigClient configClient = new ConfigClient();
// 对dataId:user进行配置监听
configClient.longPolling("http://127.0.0.1:8080/listener", "user");
}
}