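Overview
Business scenario
The application connects to a third-party interface through a WSS client. During high-concurrency stress testing, writes to the server occasionally failed. The failure was sporadic, occurring in fewer than one in a thousand requests; the exception is shown in the figure below.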
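Problem summary
Note:
- The handshake is carried out over HTTP, so the HTTP codec must be mounted first.
- Once the handshake succeeds, the HTTP codec must be removed from the pipeline and the WebSocket codec mounted, i.e. the connection is upgraded from HTTP to WebSocket.
When sending a request to the third-party interface (channel.writeAndFlush()), an "unsupported message type" exception was thrown.
The exception is caused by an incorrect message type: as its message indicates, the message must be a ByteBuf or a FileRegion.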
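To make the exception message concrete, here is a small JDK-only model of the kind of type check that rejects the frame. It is a sketch, not Netty code: the `ByteBuf` and `FileRegion` interfaces, `TextFrame`, `EncodedBytes` and `filterOutbound` are hypothetical stand-ins that only imitate, in simplified form, what happens when an unencoded object reaches the socket write.

```java
public class OutboundTypeCheckSketch {
    // Hypothetical stand-ins for Netty's types; these are NOT the real classes.
    interface ByteBuf {}
    interface FileRegion {}

    // Plays the role of an already encoded message.
    static final class EncodedBytes implements ByteBuf {}

    // Plays the role of a TextWebSocketFrame that no encoder has converted.
    static final class TextFrame {
        final String text;
        TextFrame(String text) { this.text = text; }
    }

    // Simplified imitation of the last type check before bytes reach the socket.
    static Object filterOutbound(Object msg) {
        if (msg instanceof ByteBuf || msg instanceof FileRegion) {
            return msg;
        }
        throw new UnsupportedOperationException(
                "unsupported message type: " + msg.getClass().getSimpleName());
    }

    // Returns "ok" when the write is accepted, otherwise the exception message.
    static String tryWrite(Object msg) {
        try {
            filterOutbound(msg);
            return "ok";
        } catch (UnsupportedOperationException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryWrite(new EncodedBytes()));  // ok
        System.out.println(tryWrite(new TextFrame("hi"))); // unsupported message type: TextFrame
    }
}
```

In other words, a frame object can only be written if some encoder earlier in the pipeline has already turned it into bytes.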
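Because the bug occurs so rarely, it could not be reproduced in the running service; the cause had to be worked out from the source code and the logs.
The whole handshake proceeds as follows:
- The application establishes a connection with the third party.
- The application sends the handshake request and, once the request has been written successfully, mounts the WebSocket encoder. (There is roughly a 90% chance that the exception comes from this step.)
- The third-party interface sends the handshake response.
- The application performs the handshake completion processing.
- The handshake between the application and the third-party interface is complete, and messages can be sent and received normally.
When the handshake completes, the main operations are: unmount the HTTP codec and mount the WebSocket decoder.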
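The pipeline changes during these steps can be pictured with a plain list of handler names. This is a simplified model rather than Netty code: the names follow the client code in this article, "ws-aggregator" is an assumed label for the unnamed WebSocketFrameAggregator, and the two methods are hypothetical helpers that only mimic where the encoder and decoder end up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PipelineStagesSketch {
    // Pipeline as configured by the client initializer (names partly assumed).
    static List<String> initialPipeline() {
        return new ArrayList<>(Arrays.asList(
                "http-codec", "http-aggregator", "ws-aggregator", "client-handler"));
    }

    // After the handshake request is written, the WebSocket encoder is
    // mounted right after the HTTP codec.
    static void onHandshakeRequestWritten(List<String> pipeline) {
        pipeline.add(pipeline.indexOf("http-codec") + 1, "ws-encoder");
    }

    // Finishing the handshake unmounts the HTTP handlers and mounts the
    // WebSocket decoder where the HTTP codec used to be.
    static void onHandshakeFinished(List<String> pipeline) {
        pipeline.remove("http-aggregator");
        pipeline.set(pipeline.indexOf("http-codec"), "ws-decoder");
    }

    public static void main(String[] args) {
        List<String> normal = initialPipeline();
        onHandshakeRequestWritten(normal);
        onHandshakeFinished(normal);
        System.out.println("normal order:  " + normal);

        // Problematic order: the response is handled before the encoder is mounted.
        List<String> raced = initialPipeline();
        onHandshakeFinished(raced);
        System.out.println("listener late: " + raced);
    }
}
```

In the second case the list contains a decoder but no encoder, which is the state in which an outgoing frame cannot be converted to bytes.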
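Final conclusion: during the handshake, the client's request did reach the third-party interface successfully, but for some unknown reason the FutureListener attached to the handshake request ran late, so the WebSocket encoder was not mounted in time.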
Environment
JDK 1.8.
Netty dependency:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.90.Final</version>
</dependency>
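Typical WSS communication code
Server code
The server code is fairly simple and uses only the codecs that Netty provides.
Custom handler: on receiving a message, it sends a response back to the client.
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
public class WssServer {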
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup(10);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
// HTTP codec
pipeline.addLast("http-codec", new HttpServerCodec());
// HTTP message aggregation
pipeline.addLast("http-aggregator", new HttpObjectAggregator(65536));
// WebSocket protocol handler; it contains the handshake logic
pipeline.addLast(new WebSocketServerProtocolHandler("/", null, false, 65536));
pipeline.addLast(new SimpleChannelInboundHandler<Object>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof TextWebSocketFrame) {
String text = ((TextWebSocketFrame) msg).text();
System.out.println("server received text: " + text);
ctx.writeAndFlush(new TextWebSocketFrame("I received your msg: " + text));
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
});
}
});
Channel channel = bootstrap.bind(8000).sync().channel();
System.out.println("server started ... port: " + 8000);
channel.closeFuture().sync();
}
}
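Client code
WSS client class WsSslClient
After the client has established a connection with the server, it sends the handshake request. Only once the handshake succeeds is the connection truly established; this is the logic of WsSslClient.connect().
Handshake logic:
- The handshake takes place after the link-level connection is established.
- handshaker.handshake(channel) is called to send the handshake request.
- When ClientBizHandler receives a message, it handles the handshake first and sets the asynchronous handshake result (handshakeFinishPromise).
- handshakeFinishPromise tells whether the handshake succeeded, and therefore whether the connection is really established.
import java.net.URI;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
public class WsSslClient {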
private static final String URL = "wss://localhost:8000";
private URI server;
private Bootstrap bootstrap = new Bootstrap();
/** WebSocket handshaker: performs the handshake */
private WebSocketClientHandshaker handshaker;
public WsSslClient() throws Exception {
server = new URI(URL);
// Handshaker
handshaker = WebSocketClientHandshakerFactory
.newHandshaker(server, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
init();
}
public void init() {
// Client event loop group with 10 threads
EventLoopGroup group = new NioEventLoopGroup(10);
bootstrap.option(ChannelOption.TCP_NODELAY, true)
.group(group)
.channel(NioSocketChannel.class)
// Set up the WebSocket-related handlers
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline();
// HTTP codec
pipeline.addLast("http-codec", new HttpClientCodec());
// HTTP message aggregation
pipeline.addLast("http-aggregator", new HttpObjectAggregator(65536));
// WebSocket frame aggregation
pipeline.addLast(new WebSocketFrameAggregator(65536));
// WebSocket business handler
pipeline.addLast("client-handler", new ClientBizHandler(handshaker));
}
});
System.out.println("client init success");
}
public Channel connect() throws InterruptedException {
System.out.printf("begin connect to %s\n", URL);
Channel channel = bootstrap.connect(server.getHost(), server.getPort()).sync().channel();
System.out.printf("connected to %s\n", URL);
// Send the handshake request
System.out.printf("request handshake %s\n", URL);
handshaker.handshake(channel);
// Get the handler that holds the asynchronous handshake result
ClientBizHandler clientBizHandler = (ClientBizHandler)channel.pipeline().get("client-handler");
ChannelPromise handshakeFinishPromise = clientBizHandler.getHandshakeFinishPromise();
// Wait on the promise (up to 2000 ms) for the handshake to finish
if (!handshakeFinishPromise.awaitUninterruptibly(2000)) {
close(channel);
throw new RuntimeException("handshake timeout");
}
if (!handshakeFinishPromise.isSuccess()) {
// Close the channel on failure too, and keep the cause for diagnosis
close(channel);
throw new RuntimeException("handshake error", handshakeFinishPromise.cause());
}
System.out.printf("%s handshake finish, you can send msg now!\n", URL);
return channel;
}
public void request(Channel channel, String msg) {
System.out.println("request server, msg: " + msg);
channel.writeAndFlush(new TextWebSocketFrame(msg)).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
System.err.println("writeAndFlush fail: "+ future.cause().getMessage());
}
}
});
}
public void close(Channel channel) {
if (channel != null && channel.isActive()) {
System.out.println("close");
channel.close();
}
}
}
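Business handler class
When a response arrives from the third-party interface, the handshake is handled first; only afterwards is actual business data processed.
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
public class ClientBizHandler extends SimpleChannelInboundHandler<Object> {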
private final WebSocketClientHandshaker handshaker;
private ChannelPromise handshakeFinishPromise;
public ClientBizHandler(WebSocketClientHandshaker handshaker) {
this.handshaker = handshaker;
}
public ChannelPromise getHandshakeFinishPromise() {
return handshakeFinishPromise;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded");
// When the handler is added to its context, create the promise that WsSslClient.connect() waits on
handshakeFinishPromise = ctx.newPromise();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel channel = ctx.channel();
// If the handshake is not finished yet, handle the handshake
if (!handshaker.isHandshakeComplete()) {
try {
handshaker.finishHandshake(channel, (FullHttpResponse) msg);
System.out.println("handshake finished");
// Report the handshake result
handshakeFinishPromise.setSuccess();
} catch (Exception e) {
// Failures must be reported as well
System.err.println("handshake error: " + e.getMessage());
handshakeFinishPromise.setFailure(e);
}
return;
}
if (msg instanceof TextWebSocketFrame) {
TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) msg;
System.out.println("received server response: " + textWebSocketFrame.text());
// Actual business handling...
}
}
}
Demo that opens the WSS connection and sends a request
import io.netty.channel.Channel;
public class ClientTest {
public static void main(String[] args) throws Exception {
WsSslClient wsSslClient = new WsSslClient();
Channel channel = wsSslClient.connect();
wsSslClient.request(channel, "hello server, I'm client");
}
}
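Normal test run
1. Start the server first; it prints the following log.
server started ... port: 8000
2. Run ClientTest to send a request to the server; the log output is as follows.
Client log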
client init success
begin connect to wss://localhost:8000
handlerAdded
connected to wss://localhost:8000
request handshake wss://localhost:8000
handshake finished
wss://localhost:8000 handshake finish, you can send msg now!
request server, msg: hello server, I'm client
Server log
server received text: hello server, I'm client
Client log
received server response: I received your msg: hello server, I'm client
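Exception analysis
Reading the source of handshaker.handshake(channel) shows that it sends the handshake message and attaches an asynchronous listener.
What the listener does: after the handshake message has been sent successfully, it adds the WebSocket encoder, WebSocketFrameEncoder. This step is critical and is the main culprit behind the exception.
Because the listener runs asynchronously, its execution may be delayed.
The job of the WebSocketFrameEncoder is precisely to convert WebSocketFrame messages into ByteBuf.
We can reason it through: suppose that, for some unknown reason (such as high concurrency or blocked thread switching), the handshake message is sent successfully but the listener runs late.
- That is, WebSocketFrameEncoder has not yet been mounted on the channel's pipeline.
- The application has already received the third party's handshake response, finished handling it, and set the handshakeFinishPromise result to success.
- The blocking wait on handshakeFinishPromise in WsSslClient.connect() is released, i.e. the connect function returns successfully.
- WsSslClient.request() is executed and sends a real request (at this point there is no WebSocketFrameEncoder on the pipeline).
- The unsupported message type exception is thrown.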
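The ordering described in this list can be replayed with a tiny JDK-only model. It is only a sketch of the reasoning, not Netty code: a `CompletableFuture` stands in for handshakeFinishPromise, an `AtomicBoolean` stands in for "the encoder is on the pipeline", and `write` is a hypothetical substitute for channel.writeAndFlush().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class HandshakeRaceSketch {
    // Stand-in for channel.writeAndFlush(frame): fails when no encoder is mounted.
    static String write(AtomicBoolean encoderMounted) {
        return encoderMounted.get() ? "written" : "unsupported message type";
    }

    // Replays the events in the problematic order and returns the write result.
    static String simulate() {
        AtomicBoolean encoderMounted = new AtomicBoolean(false);
        CompletableFuture<Void> handshakeFinishPromise = new CompletableFuture<>();

        // 1. The handshake request is sent, but its listener has not run yet,
        //    so the encoder is still missing.
        // 2. The handshake response arrives and the handler marks the handshake finished.
        handshakeFinishPromise.complete(null);
        // 3. connect() sees the finished promise and the caller sends a frame at once.
        String result = handshakeFinishPromise.isDone()
                ? write(encoderMounted)
                : "not connected";
        // 4. Only now does the delayed listener run and mount the encoder: too late.
        encoderMounted.set(true);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // unsupported message type
    }
}
```

The model makes the key point visible: nothing ties "handshake finished" to "encoder mounted", so the first write can slip in between the two.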
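Reproducing the exception
To simulate the unsupported message type exception, I defined a CustomWebSocketClientHandshaker13 to replace the handshaker in the original client code.
I use version V13; use whichever version fits your situation.
CustomWebSocketClientHandshaker13 overrides handshake(), the method that sends the handshake request, and adds a delay to the handshake request listener.
import java.net.URI;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker13;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
public class CustomWebSocketClientHandshaker13 extends WebSocketClientHandshaker13 {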
public CustomWebSocketClientHandshaker13(URI webSocketURL, WebSocketVersion version, String subprotocol,
boolean allowExtensions, HttpHeaders customHeaders,
int maxFramePayloadLength) {
super(webSocketURL, version, subprotocol, allowExtensions, customHeaders, maxFramePayloadLength);
}
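/**
* Overridden mainly to reproduce the problem.
* @param channel the channel on which the handshake request is sent
* @return a future that completes once the (delayed) listener has run
*/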
@Override
public ChannelFuture handshake(Channel channel) {
ChannelPromise promise = channel.newPromise();
FullHttpRequest request = this.newHandshakeRequest();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
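// Simulate the failure: the handshake request is written successfully, but for some
// unknown reason the listener runs late, so the encoder is not mounted in time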
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (future.isSuccess()) {
ChannelPipeline p = future.channel().pipeline();
ChannelHandlerContext ctx = p.context(HttpRequestEncoder.class);
if (ctx == null) {
ctx = p.context(HttpClientCodec.class);
}
if (ctx == null) {
promise.setFailure(new IllegalStateException("ChannelPipeline does not contain an HttpRequestEncoder or HttpClientCodec"));
return;
}
p.addAfter(ctx.name(), "ws-encoder", CustomWebSocketClientHandshaker13.this.newWebSocketEncoder());
promise.setSuccess();
} else {
promise.setFailure(future.cause());
}
}
}).start();
}
});
return promise;
}
}
Don't forget the handshaker in WsSslClient: it has to be replaced with our custom handshaker class, CustomWebSocketClientHandshaker13, as shown below.
handshaker = new CustomWebSocketClientHandshaker13(server, WebSocketVersion.V13, null, true,
new DefaultHttpHeaders(), 65536);
Running ClientTest now reproduces the exception.
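Fixing the exception
First change: WsSslClient
In connect(), handshaker.handshake(channel) returns a ChannelFuture that reports the outcome of the handshake request, that is, the outcome of the handshake request listener actually running.
We take this future and pass it to the business handler, clientBizHandler.
WsSslClient source after the change.
import java.net.URI;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
public class WsSslClient {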
private static final String URL = "wss://localhost:8000";
private URI server;
private Bootstrap bootstrap = new Bootstrap();
/** WebSocket handshaker: performs the handshake */
private WebSocketClientHandshaker handshaker;
public WsSslClient() throws Exception {
server = new URI(URL);
// Handshaker
// handshaker = WebSocketClientHandshakerFactory
// .newHandshaker(server, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
// Custom handshaker, defined to reproduce the problem
handshaker = new CustomWebSocketClientHandshaker13(server, WebSocketVersion.V13, null, true,
new DefaultHttpHeaders(), 65536);
init();
}
public void init() {
// Client event loop group with 10 threads
EventLoopGroup group = new NioEventLoopGroup(10);
bootstrap.option(ChannelOption.TCP_NODELAY, true)
.group(group)
.channel(NioSocketChannel.class)
// Set up the WebSocket-related handlers
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline();
// HTTP codec
pipeline.addLast("http-codec", new HttpClientCodec());
// HTTP message aggregation
pipeline.addLast("http-aggregator", new HttpObjectAggregator(65536));
// WebSocket frame aggregation
pipeline.addLast(new WebSocketFrameAggregator(65536));
// WebSocket business handler
pipeline.addLast("client-handler", new ClientBizHandler(handshaker));
}
});
System.out.println("client init success");
}
public Channel connect() throws InterruptedException {
System.out.printf("begin connect to %s\n", URL);
Channel channel = bootstrap.connect(server.getHost(), server.getPort()).sync().channel();
System.out.printf("connected to %s\n", URL);
// Send the handshake request
System.out.printf("request handshake %s\n", URL);
ChannelFuture handshakeRequestFuture = handshaker.handshake(channel);
// Get the business handler
ClientBizHandler clientBizHandler = (ClientBizHandler)channel.pipeline().get("client-handler");
// Hand the handshake request future to clientBizHandler
clientBizHandler.setHandshakeRequestFuture(handshakeRequestFuture);
ChannelPromise handshakeFinishPromise = clientBizHandler.getHandshakeFinishPromise();
// Wait on the promise (up to 2000 ms) for the handshake to finish
if (!handshakeFinishPromise.awaitUninterruptibly(2000, TimeUnit.MILLISECONDS)) {
close(channel);
throw new RuntimeException("handshake timeout");
}
if (!handshakeFinishPromise.isSuccess()) {
// Close the channel on failure too, and keep the cause for diagnosis
close(channel);
throw new RuntimeException("handshake error", handshakeFinishPromise.cause());
}
System.out.printf("%s handshake finish, you can send msg now!\n", URL);
return channel;
}
public void request(Channel channel, String msg) {
System.out.println("request server, msg: " + msg);
channel.writeAndFlush(new TextWebSocketFrame(msg)).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
System.err.println("writeAndFlush fail: "+ future.cause().getMessage());
}
}
});
}
public void close(Channel channel) {
if (channel != null && channel.isActive()) {
System.out.println("close");
channel.close();
}
}
}
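Second change: ClientBizHandler
After receiving the handshake response, wait until the handshake request has completed before doing the handshake finish processing (handshaker.finishHandshake(channel, (FullHttpResponse) msg)).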
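The idea behind this change can be shown with the same kind of JDK-only model. Again this is a sketch under assumptions, not the Netty implementation: two `CompletableFuture` objects stand in for handshakeRequestFuture and handshakeFinishPromise, and `write` is a hypothetical substitute for channel.writeAndFlush().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class HandshakeOrderingFixSketch {
    // Stand-in for channel.writeAndFlush(frame): fails when no encoder is mounted.
    static String write(AtomicBoolean encoderMounted) {
        return encoderMounted.get() ? "written" : "unsupported message type";
    }

    static String simulate() {
        AtomicBoolean encoderMounted = new AtomicBoolean(false);
        CompletableFuture<Void> handshakeRequestFuture = new CompletableFuture<>();
        CompletableFuture<Void> handshakeFinishPromise = new CompletableFuture<>();

        // The handshake response arrives first. Instead of finishing immediately,
        // the handler chains the finish step onto the request future.
        handshakeRequestFuture.whenComplete((ignored, error) -> {
            if (error != null) {
                handshakeFinishPromise.completeExceptionally(error);
            } else {
                handshakeFinishPromise.complete(null);
            }
        });
        boolean finishedBeforeEncoder = handshakeFinishPromise.isDone();

        // The delayed listener finally runs: mount the encoder, then complete the future.
        encoderMounted.set(true);
        handshakeRequestFuture.complete(null);

        // connect() is released only now, so the first write finds the encoder.
        String result = handshakeFinishPromise.isDone()
                ? write(encoderMounted)
                : "not connected";
        return "finishedBeforeEncoder=" + finishedBeforeEncoder + ", write=" + result;
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // finishedBeforeEncoder=false, write=written
    }
}
```

Chaining the finish step onto the request future means the handshake can never be reported as finished before the encoder is in place, whichever event happens first.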
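Business handler source after the change.
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.util.ReferenceCountUtil;
public class ClientBizHandler extends SimpleChannelInboundHandler<Object> {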
private final WebSocketClientHandshaker handshaker;
private ChannelPromise handshakeFinishPromise;
private ChannelFuture handshakeRequestFuture;
public ClientBizHandler(WebSocketClientHandshaker handshaker) {
this.handshaker = handshaker;
}
public ChannelPromise getHandshakeFinishPromise() {
return handshakeFinishPromise;
}
public void setHandshakeRequestFuture(ChannelFuture handshakeRequestFuture) {
this.handshakeRequestFuture = handshakeRequestFuture;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded");
// When the handler is added to its context, create the promise that WsSslClient.connect() waits on
handshakeFinishPromise = ctx.newPromise();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel channel = ctx.channel();
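// If the handshake is not finished yet, handle the handshake
if (!handshaker.isHandshakeComplete()) {
// SimpleChannelInboundHandler releases msg when channelRead0 returns, but the
// listener below may run later, so retain the response until the listener is done
ReferenceCountUtil.retain(msg);
handshakeRequestFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
try {
if (!future.isSuccess()) {
// Fail the promise so that connect() does not have to wait for the timeout
handshakeFinishPromise.setFailure(future.cause());
return;
}
handshaker.finishHandshake(channel, (FullHttpResponse) msg);
System.out.println("handshake finished");
// Report the handshake result
handshakeFinishPromise.setSuccess();
} catch (Exception e) {
// Failures must be reported as well
System.err.println("handshake error: " + e.getMessage());
handshakeFinishPromise.setFailure(e);
} finally {
ReferenceCountUtil.release(msg);
}
}
});
return;
}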
if (msg instanceof TextWebSocketFrame) {
TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) msg;
System.out.println("received server response: " + textWebSocketFrame.text());
// Actual business handling...
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
}
}
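Verifying that the bug is fixed
After this, requests go through normally!
The log output is as follows. It shows that the finish processing really does happen only after the handshake request has been handled successfully, and that messages are processed normally.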