背景
作为后端开发相信大家或多或少都接触过Nettty,说起Netty真实又爱又恨,因为基于它可以很简单的开发高性能的Java网络通信服务,但同时要是不小心就会出现各种奇奇怪怪的问题,特别是由于特殊的内存管理机制很容易出现内存泄漏问题即OOM问题。这些天就遇到了类似的问题,排查和解决起来确实废了不小功夫,特别这里记录一下。
其实Netty内存泄漏问题一般都比较好解决,典型的就是各种ByteBuf没有被Release,但如果遇到非典型的问题就比较考验技术人员功力的时候了。
现象
监控系统显示有一个服务在下班前突然自己挂掉了。我看了一下这个服务今天确实调整了一下日志配置,配置有错误,没办法与logstash通讯,但这个应该不会导致OOM问题才对。
于是查了一下日志显示最后是OOM了才,很明显是由于内存泄漏造成的。
日志如下:
为了确认问题的可重复性,又重启了服务,第二天上班又用htop看了一下,确实内存占用增加了一块,证实并非偶发原因,应该是代码中存在问题导致。于是开启了定位的过程。
定位过程
获取Dump文件
由于在生产系统中,直接去定位会有影响,还是研究一下dump文件
jps找到问题服务的pid,jmap导出内存问题件,zip压缩一下弄到开发机器。
分析dump文件
既然已经有了dump文件,就选择一个工具来分析就好了,我选择的是比较好使的HeroDump,至少界面看着比较好看还有自动提示。
明显这407个NioEventLoop不正常,一个1M都能有400M+了,占了90%+的内存,就他没跑了。
过了一会第二次的快照分析也完成了,结果相似,而且NioEventLoop更是暴涨到1339了。
可以看到NioEventLoop持续的在积累。
继续查看实际占用内存的数据类型
这些byte[]应该是NioEventLoop下属的数据。
到这里可以看出主要问题是NioEventLoop的垃圾回收机制没有发挥作用或是回收的速度没有新增的速度快,导致这个NioEventLoop持续增长。
这个结果反映了是不是一个好定位的问题,因为问题往往都是出现在业务代码中,而现在定位的结果却显示Netty本身的 核心NioEventLoop有问题。而Netty本身存在这么重大缺陷几乎是不可能的,这条线是追查不下去了。
代码分析
于是只能分析负责客户端连接的代码
@Service
@Slf4j
public class ClientModeConnectionManager {
/**
* 定时刷新连接情况
*/
@Scheduled(fixedRate = 30 * 1000L)
public void refreshConnection() {
//没有初始化完成就跳出
if (!flag) {
return;
}
//查询配置文件中全部的设备信息
Map<String, String> all = configService.getAllByType("server");
//断开不再启用的设备
connectionCacheService.getAll().forEach((puid, channel) -> {
if (configService.findByPuid(puid) == null) {
disconnect(puid);
}
});
all.forEach((address, puid) -> {
if (StringUtils.isEmpty(puid) || StringUtils.isEmpty(address)) {
log.error("配置错误{}-{}", puid, address);
return;
}
//查询在配置文件中未连接的设备
if (!isConnected(puid)) {
executorService.submit(() -> {
try {
connectToFacility(address, puid);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
});
}
private boolean isConnected(String puid) {
if (connectionCacheService.isConnected(puid)) {
Channel channel = connectionCacheService.getChannel(puid);
if (channel.isActive()) {
return true;
} else {
disconnect(puid);
return false;
}
} else {
disconnect(puid);
return false;
}
}
private void disconnect(String puid) {
Channel channel = connectionCacheService.getChannel(puid);
if (channel != null) {
try {
channel.close().sync();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
connectionCacheService.clearChannel(puid);
}
EventLoopGroup eventExecutors = clients.get(puid);
if (eventExecutors != null) {
eventExecutors.shutdownGracefully();
}
}
@PreDestroy
public void stop() throws InterruptedException {
Map<String, String> all = configService.getAllByType("server");
for (String puid : all.values()) {
if (StringUtils.isEmpty(puid)) {
log.error("配置错误{}", puid);
continue;
}
//查询在配置文件中未连接的设备
if (connectionCacheService.isConnected(puid)) {
Channel channel = connectionCacheService.getChannel(puid);
channel.closeFuture().sync();
}
EventLoopGroup eventExecutors = clients.get(puid);
if (eventExecutors != null) {
eventExecutors.shutdownGracefully();
}
}
}
private synchronized void connectToFacility(String address, String puid) throws InterruptedException {
log.info("开始尝试客户端模式连接{},PUID:{}", address, puid);
EventLoopGroup bossGroup = new NioEventLoopGroup();
String[] array = address.split(":");
String ip = array[0];
int port = Integer.parseInt(array[1]);
Bootstrap bs = new Bootstrap();
bs.group(bossGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
// 处理来自服务端的响应信息
socketChannel.pipeline().addLast(channelHandlerAdapter);
}
});
// 客户端开启
ChannelFuture cf = bs.connect(ip, port).sync();
log.info("主机{}连接成功,客户端模式,PUID:{}", ip, puid);
clients.put(puid, bossGroup);
Channel oldChannel = connectionCacheService.getChannel(puid);
if (oldChannel != null && (!cf.channel().equals(oldChannel))) {
try {
oldChannel.closeFuture().sync();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
connectionCacheService.setChannel(puid, cf.channel());
}
}
从代码中可以看出来,这个服务跟大部分Netty使用场景不太一样,大部分netty都是用作服务端了,但这里是用作客户端。这就可能是问题所在,Netty客户端开发人员没啥经验就很可能出现比较大的设计问题。
从上面的分析可以得到的结论应该是客户端管理服务,也就是ClientModeConnectionManager中出现的问题。
通过对ClientModeConnectionManager的进一步分析,同时对比各种示例程序,发现有两个大的不同点。
- EventLoopGroup一般不作为局部变量使用
- 最后要释放EventLoopGroup,也就是执行 eventLoopGroup.shutdownGracefully(),并不是要释放Bootstrap。
当然如果不是神仙或是对Netty极为熟悉的大佬是不可能一次性找到问题的,这个发现是通过逐项的修改对比和实验才确定的。
最终为了更好的管理EventLoopGroup生命周期,对代码进行一定的调整,将连接停止逻辑独立为ConnectThread(其实叫ClientThread更准确)。具体请见下面示例和模板。
逻辑反向闭环
这个是定位问题非常重要的一步就是用你得到的结论,再反向按照逻辑推出出现的问题,如果逻辑不通那必定还存在其他问题。
逻辑推演过程:
- EventLoopGroup虽然是局部变量但其实NioEventLoop是在独立的线程运行无法被垃圾回收
- 客户端管理代码没有在服务端连接异常的时候释放NioEventLoop也就是没有执行eventLoopGroup.shutdownGracefully(),导致eventLoopGroup以及NioEventLoop相关的内存没有得到释放
- ClientModeConnectionManager每30s会进行一次刷新,不断的尝试重新连接服务端,但由于特殊原因服务端一直不在线,因此会一直创建eventLoopGroup和对应的NioEventLoop
- NioEventLoop及其内部数据的内存不断积累最终导致OOM
修复确认
就算是逻辑反向闭环了,也需要实验来最终验证,最后还是要测试说了算的。
修复前
8小时内必OOM
修复后
持续测试
在生产系统上6.6%至少持续了5小时。
28小时后维持在6.7
45小时候依然在6.7%
改善措施
启动方式改进
在启动命令行中添加-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=D:\tmp
参数,保证异常挂掉时能够生成dump文件,好第一时间定位问题。
建立Netty代码模板
形成标准的Netty客户端和服务端模板避免此类问题再次产生
客户端模板
ClientManager.java
/**
* 服务启动监听器
*
* @author ZEW
*/
@Service
@Slf4j
public class ClientModeConnectionManager {
/**
* 定时刷新连接情况
*/
@Scheduled(fixedRate = 30 * 1000L)
public void refreshConnection() {
//没有初始化完成就跳出
if (!flag) {
return;
}
//查询配置文件中全部的设备信息
Map<String, String> all = configService.getAllByType("server");
//断开不再启用的设备
connectionCacheService.getAll().forEach((puid, channel) -> {
if (configService.findByPuid(puid) == null) {
disconnect(puid);
}
});
all.forEach((address, puid) -> {
if (StringUtils.isEmpty(puid) || StringUtils.isEmpty(address)) {
log.error("配置错误{}-{}", puid, address);
return;
}
//查询在配置文件中未连接的设备
if (!isConnected(puid)) {
disconnect(puid);
ConnectThread task = new ConnectThread(puid, address, connectionCacheService, rabbitTemplate, configService);
threads.put(puid, task);
executorService.submit(task);
}
});
}
private boolean isConnected(String puid) {
ConnectThread connectThread = threads.get(puid);
if (connectThread == null) {
return false;
}
return connectThread.isConnect();
}
private void disconnect(String puid) {
ConnectThread connectThread = threads.get(puid);
if (connectThread != null) {
connectThread.disconnect();
}
threads.remove(puid);
}
@PreDestroy
public void stop() throws InterruptedException {
Map<String, String> all = configService.getAllByType("server");
for (String puid : all.values()) {
if (StringUtils.isEmpty(puid)) {
log.error("配置错误{}", puid);
continue;
}
//查询在配置文件中未连接的设备
if (connectionCacheService.isConnected(puid)) {
Channel channel = connectionCacheService.getChannel(puid);
channel.closeFuture().sync();
}
disconnect(puid);
}
}
}
ConnectThread.java
/**
* @author zew
*/
@Slf4j
public class ConnectThread implements Runnable {
@Override
public void run() {
try {
//避免重复连接导致NioEventLoopGroup和NioEventLoop重复创建,内存溢出
disconnect();
connectToFacility(address, puid);
if (currentChannel != null) {
currentChannel.closeFuture().sync();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("{} CONNECT FAILED", puid, e);
return;
} finally {
disconnect();
}
log.info("{} TRANSPORT FINISH", puid);
}
private void connectToFacility(String address, String puid) throws InterruptedException {
log.info("START TO CONNECT TO {},PUID:{}", address, puid);
String[] array = address.split(":");
String ip = array[0];
int port = Integer.parseInt(array[1]);
bootstrap = new Bootstrap();
eventLoopGroup = new NioEventLoopGroup();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, TIME_OUT)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
// 处理来自服务端的响应信息
socketChannel.pipeline().addLast(
new ServerChannelHandlerAdapter(configService, connectionCacheService, rabbitTemplate));
}
});
try {
// 客户端开启
ChannelFuture cf = bootstrap.connect(ip, port).sync();
if (cf.awaitUninterruptibly(TIME_OUT, TimeUnit.MILLISECONDS) && cf.isSuccess()) {
log.info("HOST {} connect successful,CLIENT,PUID:{}", ip, puid);
currentChannel = cf.channel();
connectionCacheService.setChannel(puid, currentChannel);
} else {
log.warn("HOST {} connect failed,CLIENT,PUID:{}", ip, puid);
}
} catch (InterruptedException e) {
disconnect();
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("HOST {} connect failed,CLIENT,PUID:{}", ip, puid, e);
}
}
public void disconnect() {
if (currentChannel != null) {
try {
currentChannel.close().sync();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (eventLoopGroup != null) {
try {
eventLoopGroup.shutdownGracefully().sync();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
eventLoopGroup = null;
}
bootstrap = null;
}
public boolean isConnect() {
if (currentChannel == null) {
return false;
}
if (currentChannel.isActive()) {
return true;
} else {
log.warn("Channel:{} is not active,then close it", puid);
return false;
}
}
}
这里的客户端代码还有改进空间,比如重连机制可以作为listener单独处理。
服务端模板
/**
* 服务启动监听器
*
* @author ZEW
*/
@Component
@Slf4j
public class ServerModeConnectionListener {
@Value("${facility.connection.port:10030}")
private Integer port;
/**
* 创建bootstrap
*/
private final ServerBootstrap serverBootstrap = new ServerBootstrap();
/**
* BOSS
*/
private final EventLoopGroup boss = new NioEventLoopGroup();
/**
* Worker
*/
private final EventLoopGroup work = new NioEventLoopGroup();
/**
* 通道适配器
*/
@Resource
private ServerChannelHandlerAdapter channelHandlerAdapter;
/**
* 关闭服务器方法
*/
@PreDestroy
public void close() {
log.info("关闭服务器....");
//优雅退出
work.shutdownGracefully();
}
/**
* 开启及服务线程
*/
@PostConstruct
public void start() {
ThreadPoolExecutor executorService = new ThreadPoolExecutor(4, 4, 2,
TimeUnit.SECONDS, new LinkedBlockingDeque<>(), new DefaultThreadFactory("直连服务端模式-%d"));
executorService.submit(() -> {
// 从配置文件中(application.yml)获取服务端监听端口号
serverBootstrap.group(boss, work)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO));
try {
//设置事件处理
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(channelHandlerAdapter);
}
});
log.info("netty服务器在[{}]端口启动监听", port);
ChannelFuture f = serverBootstrap.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.info("[出现异常] 释放资源");
Thread.currentThread().interrupt();
} finally {
work.shutdownGracefully();
}
});
}
}
增加理论深度并结合实践
深入理解Netty原理和GC理论加速问题定位原因
- 了解NioEventLoop和NioEventLoop关系
- 了解客户端和服务端代码范式
- 了解哪些地方需要同步处理,哪些地方可以是异步的
- 哪些地方需要使用Sync等待
总结
OOM问题往往最令人烦躁的就是难以定位和确定是否修复,这个问题反反复复解决了一周。当然生产系统在人肉运维的情况下没有出现问题。
- 实践出真知,定位问题的能力更能够反映技术水平
- 多看看官方文档比较靠谱,而且往往都有例子来。
- 单因素对比实验是在没有足够线索和理论支撑的情况下定位问题原因的有效方法。
参考资源
- https://netty.io/wiki/
- https://netty.io/4.1/xref/io/netty/example/udt/echo/bytes/ByteEchoClient.html#ByteEchoClient
- https://juejin.cn/post/7053793963680989198