Versions: opensearch-rest-high-level-client-2.3.0.jar, httpcore-nio-4.4.11.jar, httpasyncclient-4.1.4.jar
Background
The ES index-initialization job listens for messages from the big-data team and writes them to ES asynchronously (org.opensearch.client.RestHighLevelClient#bulkAsync). Even at a low QPS the service CPU was nearly maxed out. Investigation showed that messages were consumed quickly while the ES writes were the bottleneck; because the writes are asynchronous, requests piled up inside the service until memory ran short, GC became frequent, and CPU usage spiked. This raises a few questions: how is the ES client initialized? What kind of queue backs its asynchronous machinery, and why does it accumulate instead of blocking? Studying the client with these questions in mind pays off twice over.
ES client initialization flow
- Create the asynchronous HTTP client
- customizeRequestConfig callback
- customizeHttpClient callback
- Create the IOReactor: ConnectingIOReactor
- Create the connection manager
- Create the HTTP async client: InternalHttpAsyncClient
Creating the IOReactor
Default implementation: DefaultConnectingIOReactor
- requestQueue: queue of SessionRequests, i.e. client requests to establish a session with the server (a SessionRequest represents a request to establish a new connection, or session, to a remote host)
- threadFactory: factory for the dispatcher threads; fallback naming pattern: I/O dispatcher
- dispatchers: the dispatcher processors (BaseIOReactor)
- eventDispatch (IOEventDispatch): dispatches I/O events; actions: connected/inputReady/outputReady/timeout/disconnected
- exceptionHandler: I/O exception handler; decides whether the I/O reactor keeps running after an exception; exception types: IOException/RuntimeException
- workers: worker threads bridging BaseIOReactor and IOEventDispatch; each spins in a loop processing select results
  - this.selector.select
  - if the state is SHUT_DOWN, exit the loop
  - if the state is SHUTTING_DOWN, closeSessions and closeNewChannels
  - processEvents: handle events; BaseIOReactor handles only read/write events, not acceptable/connectable events, which a client has no use for
  - validate: validate active channels
  - processClosedSessions: handle closed sessions
  - if the state is ACTIVE: org.apache.http.impl.nio.reactor.AbstractIOReactor#processNewChannels handles any new channels: each new channel is registered with the current selector (SelectionKey.OP_READ, to listen for channel events), the resulting SelectionKey is wrapped as an IOSessionImpl and added to org.apache.http.impl.nio.reactor.AbstractIOReactor#sessions, and the IOSessionImpl is attached to the freshly registered SelectionKey
  - if the state is ACTIVE and sessions is empty, shut down gracefully: exit the select loop once graceful shutdown has completed
  - if interestOpsQueueing is enabled: processPendingInterestOps (disabled by default)
- threads: the Worker threads, named via threadFactory's naming pattern
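The worker loop above (select, check state, process the ready keys) can be sketched with plain JDK NIO. This is a simplified single-pass model for intuition only, not the httpcore-nio implementation:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class SelectorLoopSketch {

    // One pass of a BaseIOReactor-style worker loop: register a channel,
    // block on select(), then consume the ready key. Returns the number of
    // ready keys reported by select().
    public static int demo() throws IOException {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            // Equivalent of processNewChannels(): register the channel for reads
            SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ);
            // Make the channel readable so select() wakes up
            pipe.sink().write(ByteBuffer.wrap(new byte[] { 1 }));
            int ready = selector.select(1000); // the worker's this.selector.select
            if (key.isReadable()) {
                // In the real reactor this is where eventDispatch.inputReady fires
                pipe.source().read(ByteBuffer.allocate(16));
            }
            pipe.sink().close();
            pipe.source().close();
            return ready;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("ready keys: " + demo());
    }
}
```

In the real BaseIOReactor the "process" step does not read inline; it dispatches readable/writable keys to the IOEventDispatch callbacks listed above.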
Creating the connection manager (connmgr)
Implementation: PoolingNHttpClientConnectionManager
- ioreactor: DefaultConnectingIOReactor
- configData
- pool: the connection pool, implemented by CPool
- connFactory: InternalConnectionFactory
- addressResolver
- sessionRequestCallback: InternalSessionRequestCallback; when a session request (org.apache.http.nio.reactor.ConnectingIOReactor#connect) completes, it processes the requests waiting in the pending queue
- routeToPool: mapping from route to pool
- leasingRequests: new LinkedList<LeaseRequest<T, C, E>>()
- pending: new HashSet()
- leased: new HashSet()
- available: new LinkedList()
- completedRequests: new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>()
- maxPerRoute: new HashMap<T, Integer>()
- defaultMaxPerRoute: 2
- maxTotal: 20
- iosessionFactoryRegistry: RegistryBuilder.create()…build()
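Note the defaults above: 2 connections per route and 20 in total. Against a single ES endpoint (one route) that means at most 2 requests on the wire at once, which is the throughput ceiling behind the backlog analyzed later. A trivial illustration:

```java
public class RouteCapacitySketch {

    // The pool grants a route at most min(maxPerRoute, maxTotal) connections;
    // every request beyond that has to wait for a lease.
    public static int effectiveConcurrency(int maxPerRoute, int maxTotal) {
        return Math.min(maxPerRoute, maxTotal);
    }

    public static void main(String[] args) {
        // With the defaults above and a single-endpoint cluster:
        System.out.println(effectiveConcurrency(2, 20)); // prints 2
    }
}
```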
Creating the HTTP async client
Implementation: InternalHttpAsyncClient
- connmgr: the connection manager
- connManagerShared: false by default; defines whether the connection manager is shared by multiple client instances
- threadFactory
  - when connManagerShared is false (the default), the configured thread factory is used, falling back to Executors.defaultThreadFactory()
  - otherwise null
- eventHandler
  - when connManagerShared is false (the default), the configured event handler is used, falling back to HttpAsyncRequestExecutor
  - otherwise null
- reactorThread
  - if threadFactory and eventHandler are both non-null, an anonymous thread is created with threadFactory; the thread runs the internal I/O event dispatch on connmgr (new InternalIODispatch(eventHandler))
  - otherwise null
- status: initialized to INACTIVE
- exec: MainClientExec (the route is decided in the prepare phase, org.apache.http.impl.nio.client.MainClientExec#prepare: routePlanner = new DefaultRoutePlanner(schemePortResolver = DefaultSchemePortResolver.INSTANCE))
Creating the Rest client
- client: the HTTP async client
- nodeSelector: defaults to org.opensearch.client.NodeSelector#ANY
- chunkedEnabled: empty by default
- nodes: the cluster nodes, HttpHost.create(serverCluster), e.g. https://my-es0-cluster.com
- nodeTuple: mapping of nodes to authCache
- compressionEnabled: if chunkedEnabled is non-empty, takes its value; otherwise false
Starting the HTTP async client
Implementation: InternalHttpAsyncClient
Startup sequence
- Move the client state from INACTIVE to ACTIVE
- If reactorThread is non-null, start it
  - the ioreactor (DefaultConnectingIOReactor) runs the internal event (InternalIODispatch) dispatch: DefaultConnectingIOReactor.AbstractMultiworkerIOReactor.execute(InternalIODispatch)
  - by default one BaseIOReactor is created per CPU core (n of them), and the n BaseIOReactors are paired with InternalIODispatch to form n Workers
  - the Workers are iterated and started
    - each spins waiting for select events: selector.select (on macOS the default implementation is KQueueSelectorImpl)
  - DefaultConnectingIOReactor processes the select events (this flow is covered below): org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor#processEvents
  - the workers are iterated, and any exception raised by them is handled
- At this point the client is created and initialized, and the RestClient is returned
How DefaultConnectingIOReactor handles select events
org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor#processEvents
- Process session requests: org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor#processSessionRequests
- Iterate selectedKeys and process the data: org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor#processEvent
  - if the key isConnectable, establish the connection
  - if the connection has not completed (and has not been interrupted, e.g. by timeout/IO exception/explicit release/explicit cancellation), the session request (SessionRequestImpl) inside the session handle (SessionRequestHandle) is wrapped as a ChannelEntry and added to the BaseIOReactor's newChannels
  - wake the selector so the channel data gets processed, i.e. the processNewChannels path of the startup flow described above
- Process timeouts: org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor#processTimeouts
How BaseIOReactor handles select events
- Runs the parent-class event processing: org.apache.http.impl.nio.reactor.AbstractIOReactor#processEvents
- Iterates the SelectionKeys and handles read/write (and other) events
- Read/write handling is delegated to eventDispatch (i.e. InternalIODispatch, with handler = HttpAsyncRequestExecutor); for example org.apache.http.impl.nio.reactor.BaseIOReactor#readable calls back eventDispatch.inputReady
ES client thread design
The asynchronous write flow: bulkAsync
RouteSpecificPool
- available: linked list of reusable pool entries
- leased: set of leased entries, moved over from available
- pending: map of pending session requests to their callbacks (Map<SessionRequest, BasicFuture>); its keys match CPool.pending
Methods
- getFree: returns a free entry if one exists, marking the LeaseRequest completed (isDone = true) and moving the entry from CPool's available to leased
CPool
- maxTotal: global cap on pooled connections
- available: linked list of reusable pool entries
- leased: set of leased entries, moved over from available; includes the per-route leased sets of each RouteSpecificPool
- pending: set of session requests whose connection is being established; one is created when a RouteSpecificPool's allocated count is below maxPerRoute and there is still room under maxTotal
- leasingRequests: linked list of in-flight lease requests; a LeaseRequest lands here when it has not completed and processPendingRequest could not satisfy it
- completedRequests: queue of completed requests; a LeaseRequest moves here once it completes
- totalUsed = pending + leased; free capacity = maxTotal - totalUsed
Methods
- fireCallbacks: drains the completedRequests queue
- release: returns a leased resource to the pool
LeaseRequest
- future: callbacks fire as the state machine advances; e.g. on completion, PoolingNHttpClientConnectionManager.FutureCallback#completed -> DefaultClientExchangeHandlerImpl#requestConnection -> connectionAllocated -> the LeaseRequest's data is written to the ES server and the response is awaited
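The completion hand-off described above can be imitated with a CompletableFuture (an analogy only; the client actually uses org.apache.http.concurrent.BasicFuture and its FutureCallback, and "conn-1"/the stage string are illustrative names):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class LeaseCallbackSketch {

    // Models LeaseRequest.future: completing the lease triggers the callback,
    // which hands the "connection" to the next stage (in the real client:
    // FutureCallback#completed -> requestConnection -> connectionAllocated).
    public static String demo() {
        AtomicReference<String> stage = new AtomicReference<>("waiting");
        CompletableFuture<String> leaseFuture = new CompletableFuture<>();
        // Register what happens once a connection is granted
        leaseFuture.thenAccept(conn -> stage.set("allocated:" + conn));
        // The pool completes the future when a pooled connection frees up
        leaseFuture.complete("conn-1");
        return stage.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```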
Summary
- leased: connections currently on loan. They are released by callback when the request/response completes, e.g. org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl#responseCompleted / org.apache.http.impl.nio.client.MainClientExec#responseCompleted. Certain shutdown and error paths also release connections, e.g. org.apache.http.impl.nio.client.AbstractClientExchangeHandler#discardConnection
- available: connections ready to be leased. If one exists it is taken directly and moved to leased
- leasingRequests: requests still trying to lease a connection; they wait until one becomes available, then move to leased or pending
- completedRequests: requests whose lease completed and whose server connection is established; once the client request has been written and the server has responded, the connection is leased out again or released
- pending: requests whose lease completed but whose server connection is still being established; on completion org.apache.http.nio.pool.AbstractNIOConnPool#requestCompleted is called back to lease the connection and write the request to the server; on success the entry moves to leased, otherwise the resource is released into available
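The state transitions above can be condensed into a toy pool model. This is a deliberately simplified sketch, not AbstractNIOConnPool's actual code; the field and connection names are illustrative:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

public class PoolModelSketch {

    final Deque<String> available = new ArrayDeque<>();      // idle, reusable connections
    final Set<String> leased = new HashSet<>();              // connections on loan
    final List<String> leasingRequests = new LinkedList<>(); // waiting requests: unbounded!
    final int maxTotal;

    PoolModelSketch(int maxTotal) {
        this.maxTotal = maxTotal;
    }

    // Lease a connection, or park the request when the pool is exhausted.
    String lease(String requestId) {
        if (!available.isEmpty()) {          // reuse an idle connection
            String conn = available.poll();
            leased.add(conn);
            return conn;
        }
        if (leased.size() < maxTotal) {      // room to open a new connection
            String conn = "conn-" + leased.size();
            leased.add(conn);
            return conn;
        }
        leasingRequests.add(requestId);      // no capacity: queue without any bound
        return null;
    }

    // Return a connection so waiting requests could be served next.
    void release(String conn) {
        leased.remove(conn);
        available.add(conn);
    }

    public static void main(String[] args) {
        PoolModelSketch pool = new PoolModelSketch(2);
        pool.lease("r1");
        pool.lease("r2");
        pool.lease("r3"); // pool exhausted: r3 parks in leasingRequests
        System.out.println("backlog=" + pool.leasingRequests.size());
    }
}
```

With maxTotal = 2 and three lease calls, the third caller ends up parked in leasingRequests, which is exactly where the backlog in this incident accumulated.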
Inferring the cause
Given the background, some readers will already suspect the cause: if messages keep arriving and are produced faster than the asynchronous ES writes can drain them, the backlog of requests accumulates in the unbounded leasingRequests linked list. GC becomes frequent yet cannot reclaim the entries, CPU spikes, and the service's concurrency and throughput collapse.
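The arithmetic of the backlog is worth spelling out; this toy calculation assumes steady arrival and service rates purely for illustration:

```java
public class BacklogSketch {

    // With an unbounded queue, the backlog grows by (arrival rate - service rate)
    // for every second the imbalance lasts; nothing ever pushes back.
    public static long backlogAfter(long seconds, long arrivalPerSec, long servicePerSec) {
        long growthPerSec = Math.max(0, arrivalPerSec - servicePerSec);
        return growthPerSec * seconds;
    }

    public static void main(String[] args) {
        // e.g. consuming 500 msg/s while ES absorbs only 100 writes/s
        System.out.println(backlogAfter(60, 500, 100)); // 24000 queued LeaseRequests after one minute
    }
}
```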
Reproducing the problem
The code is simple: 100 threads write to ES concurrently. Because the writes are asynchronous nothing blocks, so submission completes quickly; once 10,000 writes have been submitted, take a heap dump manually.
int size = 100;
ExecutorService es = Executors.newFixedThreadPool(size);
Thread.sleep(10 * 1000);
for (int i = 0; i < 10000; i++) {
    int finalI = i;
    es.submit(() -> {
        try {
            indexWriterService.indexDelete(docMessage);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        System.out.println(System.currentTimeMillis() + ":" + finalI);
    });
}
Thread.sleep(Integer.MAX_VALUE);
Analyzing the problem
Dump command
jmap -dump:live,format=b,file=dump.hprof 30652
The dump was analyzed with IBM's heap-analysis tool; the tool jar can be downloaded from IBM's site. The command is below (JDK 17 forces a pile of extra flags; on earlier JDKs you can run the tool directly):
java
# flags that JDK 17 forces us to add
--add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/sun.net.util=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang.reflect=ALL-UNNAMED --add-opens java.base/java.text=ALL-UNNAMED --add-opens java.desktop/java.awt.font=ALL-UNNAMED --add-opens java.desktop/sun.swing=ALL-UNNAMED
# on JDK 8, just run the tool jar with the line below
-Xmx4G -jar ha457.jar
In the analysis results, 72% of the heap is flagged as suspect, and 42% of that matches our inference exactly; the highlighted entry path is the object chain PoolingNHttpClientConnectionManager -> CPool -> LinkedList -> Node (LeaseRequest). Case closed.
As for the remaining 30%: the tool traces 23% of it to environment configuration data loaded by the Spring framework.
A heap dump taken with arthas against production during load testing confirmed the inference as well.
Solutions
- Put a bound on the unbounded queue. There are many ways, e.g. a request counter: check the in-flight count when a request arrives, increment when it is admitted, and decrement on completion or failure
- Shrink the lease timeout (org.apache.http.client.config.RequestConfig.Builder#connectionRequestTimeout, which defaults to Long.MAX_VALUE). Not recommended: timed-out requests stop occupying leasingRequests and go straight to completedRequests and fireCallbacks, but the resulting flood of timeout exceptions still consumes system resources, which is a poor trade. The exception also showed up while reproducing this issue; see the stack in Note 1
- Limit requests based on connection-pool statistics (org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager#getStats). The ES client exposes no pool-related methods, so this requires implementing the callback org.opensearch.client.RestClientBuilder.HttpClientConfigCallback#customizeHttpClient and passing in your own connection manager, as in Note 2; if you take this route, the official fallback construction (org.apache.http.impl.nio.client.HttpAsyncClientBuilder#build) is a good reference. For example, we added a simple metric (see Note 3) that reports the per-route backlog: pool_status=[leased: 0; pending: 6561 (includes the leasingRequests count); available: 0; max: 2]
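The counter idea in the first bullet could look like the following hypothetical helper (the class name and structure are ours, not part of the client):

```java
import java.util.concurrent.Semaphore;

public class BoundedAsyncWriter {

    // Caps the number of in-flight async writes: acquire a permit before
    // submitting, release it in the completion callback (success or failure).
    private final Semaphore inFlight;

    public BoundedAsyncWriter(int maxInFlight) {
        this.inFlight = new Semaphore(maxInFlight);
    }

    // Returns false when the in-flight limit is reached, so the caller can
    // reject the message or fall back to a blocking write.
    public boolean trySubmit(Runnable asyncWrite) {
        if (!inFlight.tryAcquire()) {
            return false;
        }
        try {
            asyncWrite.run(); // in real code: client.bulkAsync(request, options, listener)
        } catch (RuntimeException e) {
            inFlight.release(); // the submission itself failed
            throw e;
        }
        return true;
    }

    // Call from the async listener on both the success and the failure path.
    public void onComplete() {
        inFlight.release();
    }

    public static void main(String[] args) {
        BoundedAsyncWriter writer = new BoundedAsyncWriter(1);
        System.out.println(writer.trySubmit(() -> {})); // true: permit acquired
        System.out.println(writer.trySubmit(() -> {})); // false: limit reached
    }
}
```

Both onResponse and onFailure of the bulkAsync listener would call onComplete(), so the permit is always returned and the bound cannot leak.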
Note 1:
at org.opensearch.client.RestHighLevelClient$1.onFailure(RestHighLevelClient.java:1966) ~[opensearch-rest-high-level-client-2.3.0.jar:2.3.0]
at org.opensearch.client.RestClient$FailureTrackingResponseListener.onDefinitiveFailure(RestClient.java:707) ~[opensearch-rest-client-2.3.0.jar:2.3.0]
at org.opensearch.client.RestClient$1.failed(RestClient.java:450) ~[opensearch-rest-client-2.3.0.jar:2.3.0]
at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137) ~[httpcore-4.4.11.jar:4.4.11]
at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.executionFailed(DefaultClientExchangeHandlerImpl.java:101) ~[httpasyncclient-4.1.4.jar:4.1.4]
at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:426) ~[httpasyncclient-4.1.4.jar:4.1.4]
at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.connectionRequestFailed(AbstractClientExchangeHandler.java:348) ~[httpasyncclient-4.1.4.jar:4.1.4]
at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.access$100(AbstractClientExchangeHandler.java:62) ~[httpasyncclient-4.1.4.jar:4.1.4]
at org.apache.http.impl.nio.client.AbstractClientExchangeHandler$1.failed(AbstractClientExchangeHandler.java:392) ~[httpasyncclient-4.1.4.jar:4.1.4]
at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137) ~[httpcore-4.4.11.jar:4.4.11]
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager$1.failed(PoolingNHttpClientConnectionManager.java:316) ~[httpasyncclient-4.1.4.jar:4.1.4]
at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137) ~[httpcore-4.4.11.jar:4.4.11]
at org.apache.http.nio.pool.AbstractNIOConnPool.fireCallbacks(AbstractNIOConnPool.java:503) ~[httpcore-nio-4.4.11.jar:4.4.11]
at org.apache.http.nio.pool.AbstractNIOConnPool.requestTimeout(AbstractNIOConnPool.java:633) ~[httpcore-nio-4.4.11.jar:4.4.11]
at org.apache.http.nio.pool.AbstractNIOConnPool$InternalSessionRequestCallback.timeout(AbstractNIOConnPool.java:894) ~[httpcore-nio-4.4.11.jar:4.4.11]
at org.apache.http.impl.nio.reactor.SessionRequestImpl.timeout(SessionRequestImpl.java:183) ~[httpcore-nio-4.4.11.jar:4.4.11]
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processTimeouts(DefaultConnectingIOReactor.java:210) ~[httpcore-nio-4.4.11.jar:4.4.11]
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:155) ~[httpcore-nio-4.4.11.jar:4.4.11]
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351) ~[httpcore-nio-4.4.11.jar:4.4.11]
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221) ~[httpasyncclient-4.1.4.jar:4.1.4]
at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64) ~[httpasyncclient-4.1.4.jar:4.1.4]
at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: java.util.concurrent.TimeoutException: Connection lease request time out
at org.apache.http.nio.pool.AbstractNIOConnPool.processPendingRequest(AbstractNIOConnPool.java:411) ~[httpcore-nio-4.4.11.jar:4.4.11]
at org.apache.http.nio.pool.AbstractNIOConnPool.processNextPendingRequest(AbstractNIOConnPool.java:391) ~[httpcore-nio-4.4.11.jar:4.4.11]
at org.apache.http.nio.pool.AbstractNIOConnPool.requestTimeout(AbstractNIOConnPool.java:629) ~[httpcore-nio-4.4.11.jar:4.4.11]
... 8 more
Note 2:
RestClient.builder(HttpHost.create("myServerCluster")).setHttpClientConfigCallback(httpClientBuilder -> {
    httpClientBuilder.setMaxConnTotal(apolloConfig.getEsMaxConnectTotal());
    httpClientBuilder.setMaxConnPerRoute(apolloConfig.getEsMaxConnectPerRoute());
    ConnectingIOReactor ioreactor = IOReactorUtils.create(
            defaultIOReactorConfig != null ? defaultIOReactorConfig : IOReactorConfig.DEFAULT, threadFactory);
    PoolingNHttpClientConnectionManager poolingmgr = new PoolingNHttpClientConnectionManager(
            ioreactor,
            RegistryBuilder.<SchemeIOSessionStrategy>create()
                    .register("http", NoopIOSessionStrategy.INSTANCE)
                    .register("https", null)
                    .build());
    httpClientBuilder.setConnectionManager(poolingmgr);
    return httpClientBuilder;
});
Note 3:
PoolingNHttpClientConnectionManager poolingMgr = esClientFactory.getPoolingmgr();
if (poolingMgr != null) {
    for (HttpRoute route : poolingMgr.getRoutes()) {
        System.out.println("pool_status=" + poolingMgr.getStats(route));
    }
}