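What are TCP sticky packets and half packets?
Suppose the client sends two packets, D1 and D2, to the server one after the other. Because the number of bytes the server gets from a single read is not deterministic, four outcomes are possible.
(1) The server reads two separate packets in two reads, D1 and then D2; there is neither sticking nor splitting.
(2) The server receives both packets in a single read, with D1 and D2 glued together; this is called a TCP sticky packet.
(3) The server needs two reads: the first returns all of D1 plus part of D2, and the second returns the rest of D2; this is called TCP packet splitting (a half packet).
(4) The server needs two reads: the first returns only part of D1 (D1_1), and the second returns the rest of D1 (D1_2) together with all of D2.
If the server's TCP receive window is very small while D1 and D2 are relatively large, a fifth outcome becomes likely: the server needs many reads to receive D1 and D2 completely, and the packets are split several times along the way.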
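The four cases can be replayed without a network. The sketch below is only an illustration: `ReadSimulator` and `reads` are hypothetical names, and the strings "hello" and "world" stand in for D1 and D2. It cuts one ordered byte stream into reads of the given sizes, which is roughly what the receiver sees.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original demo: replays a byte stream
// as a sequence of reads with the given sizes.
final class ReadSimulator {

    static List<String> reads(String stream, int... sizes) {
        byte[] bytes = stream.getBytes(StandardCharsets.US_ASCII);
        List<String> result = new ArrayList<>();
        int pos = 0;
        for (int size : sizes) {
            // A read never returns more than what is left in the stream.
            int len = Math.min(size, bytes.length - pos);
            if (len <= 0) {
                break;
            }
            result.add(new String(bytes, pos, len, StandardCharsets.US_ASCII));
            pos += len;
        }
        return result;
    }

    public static void main(String[] args) {
        String d1 = "hello";          // stands in for D1
        String d2 = "world";          // stands in for D2
        String stream = d1 + d2;      // TCP only preserves byte order, not boundaries

        System.out.println("case 1: " + reads(stream, 5, 5));  // [hello, world]
        System.out.println("case 2: " + reads(stream, 10));    // [helloworld]
        System.out.println("case 3: " + reads(stream, 7, 3));  // [hellowo, rld]
        System.out.println("case 4: " + reads(stream, 3, 7));  // [hel, loworld]
    }
}
```

In every case the bytes arrive complete and in order; only the read boundaries differ, and the receiver has no way to recover the original boundaries from the bytes alone.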
Server
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Class description: a Netty-based server.
 */
public class EchoSvrStickyHalf {
    private static final Logger LOG = LoggerFactory.getLogger(EchoSvrStickyHalf.class);
    private final int port;

    public EchoSvrStickyHalf(int port) {
        this.port = port;
    }

    public static void main(String[] args) throws InterruptedException {
        int port = 9999;
        EchoSvrStickyHalf echoSvrStickyHalf = new EchoSvrStickyHalf(port);
        LOG.info("Server is about to start");
        echoSvrStickyHalf.start();
        LOG.info("Server shut down");
    }

    public void start() throws InterruptedException {
        /* Thread group */
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            /* Required to bootstrap the server */
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoSvrSHHandler());
                        }
                    });
            /* Bind asynchronously; sync() blocks until the bind completes */
            ChannelFuture f = b.bind().sync();
            LOG.info("Server started.");
            /* Block the current thread until the server's ServerChannel is closed */
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    public class EchoSvrSHHandler extends ChannelInboundHandlerAdapter {
        private final AtomicInteger counter = new AtomicInteger(0);

        /*** Called when the server has read data from the network */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf in = (ByteBuf) msg;
            try {
                String request = in.toString(CharsetUtil.UTF_8);
                System.out.println("Server Accept[" + request
                        + "] and the counter is:" + counter.incrementAndGet());
                String resp = "Hello," + request + ". Welcome to Netty World!"
                        + System.getProperty("line.separator");
                ctx.writeAndFlush(Unpooled.copiedBuffer(resp, CharsetUtil.UTF_8));
            } finally {
                /* This handler is the last one to see the buffer, so it must release it */
                in.release();
            }
        }

        // /*** Called when the server has finished reading the network data */
        // @Override
        // public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //     ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        // }

        /*** Called when an exception occurs */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
}
Client
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Class description: a Netty-based client.
 */
public class EchoCliStickyHalf {
    private final int port;
    private final String host;

    public EchoCliStickyHalf(int port, String host) {
        this.port = port;
        this.host = host;
    }

    public void start() throws InterruptedException {
        /* Thread group */
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            /* Required to bootstrap the client; differs from the server */
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)/* Use NIO for communication */
                    /* Server IP address and port; differs from the server */
                    .remoteAddress(new InetSocketAddress(host, port))
                    /* Differs from the server */
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoCliSHHandler());
                        }
                    });
            /* Connect asynchronously; sync() blocks until done; differs from the server */
            ChannelFuture f = b.connect().sync();
            f.channel().closeFuture().sync();/* Block until the client's Channel is closed */
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new EchoCliStickyHalf(9999, "127.0.0.1").start();
    }

    public class EchoCliSHHandler extends SimpleChannelInboundHandler<ByteBuf> {
        private final AtomicInteger counter = new AtomicInteger(0);

        /*** Called when the client has read data from the network */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            System.out.println("client Accept[" + msg.toString(CharsetUtil.UTF_8)
                    + "] and the counter is:" + counter.incrementAndGet());
        }

        /*** Called once the client is notified that the channel is active */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            String request = "a,b,c,d,e";
            final ByteBufAllocator alloc = ctx.alloc();
            /* We want the server to receive 100 of these messages */
            for (int i = 0; i < 100; i++) {
                /* Netty releases the buffer after it has been written */
                ByteBuf msg = alloc.buffer(request.length());
                msg.writeBytes(request.getBytes(CharsetUtil.UTF_8));
                ctx.writeAndFlush(msg);
            }
        }

        /*** Called when an exception occurs */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
}
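Why TCP sticky and half packets occur
Because of how TCP works (it is a connection-oriented, reliable protocol, set up with a three-way handshake), the client and server maintain a connection (a Channel), and as long as the connection stays open the client can keep sending packets to the server. If the packets being sent are too small, however, TCP applies the Nagle algorithm (which can be switched on or off) and merges the small packets before sending them, either when a timeout expires or when enough data has accumulated; this is one reason TCP's latency is somewhat higher than UDP's. When the server then receives the message (a byte stream), it cannot tell which pieces the client originally sent separately, and the result is a sticky packet. The receiving side can produce the same effect: the server places received packets in a buffer, and if the application does not drain that buffer promptly, the next read may return several packets at once.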
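As a small illustration of the "can be switched on or off" remark, the sketch below disables the Nagle algorithm on a plain `java.net.Socket` through the TCP_NODELAY option. The class and method names are hypothetical; in Netty the corresponding setting is `ChannelOption.TCP_NODELAY`. Note the assumption being made: this only affects sender-side coalescing, so it does not by itself restore message boundaries, because the receive buffer can still hand back several packets in one read.

```java
import java.io.IOException;
import java.net.Socket;

// Hypothetical helper, not part of the original demo.
final class NagleToggle {

    // Turns the Nagle algorithm off for this socket and reports the resulting setting.
    static boolean disableNagle(Socket socket) throws IOException {
        socket.setTcpNoDelay(true);      // TCP_NODELAY = true means "do not coalesce small writes"
        return socket.getTcpNoDelay();
    }

    public static void main(String[] args) throws IOException {
        // An unconnected socket is enough to set and read back the option.
        try (Socket socket = new Socket()) {
            System.out.println("TCP_NODELAY enabled: " + disableNagle(socket));
        }
    }
}
```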
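UDP, by contrast, is a connectionless, unreliable transport protocol (well suited to sending small packets frequently). It never merges packets before sending, so there is no Nagle algorithm to speak of: whatever one side sends goes out immediately. Since nothing is merged, every datagram is complete (the data plus the UDP header, the IP header and so on are encapsulated once per send), so sticky packets do not arise.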
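The cause of splitting is much simpler: a single packet is received in several reads.
More specifically, the causes include at least the following:
1. The application writes more bytes than the socket send buffer can hold.
2. TCP segments the data according to the MSS. MSS stands for maximum segment size; it is the maximum length of the data field in a TCP segment. The data field plus the TCP header make up the whole TCP segment, so the MSS is not the maximum length of the TCP segment itself, but rather:
MSS = TCP segment length - TCP header length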
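A short worked example of the MSS formula above. The numbers are assumptions chosen for illustration and are not taken from the original text: a 1500-byte Ethernet MTU, a 20-byte IPv4 header and a 20-byte TCP header without options. `MssMath` is a hypothetical helper.

```java
// Hypothetical helper, not part of the original demo.
final class MssMath {

    // The largest TCP segment that fits in one IP packet is mtu - ipHeader bytes;
    // subtracting the TCP header from that segment length gives the MSS.
    static int mss(int mtu, int ipHeader, int tcpHeader) {
        int maxSegmentLength = mtu - ipHeader;
        return maxSegmentLength - tcpHeader;
    }

    // Number of segments needed to carry payloadBytes (ceiling division).
    static int segmentsNeeded(int payloadBytes, int mss) {
        return (payloadBytes + mss - 1) / mss;
    }

    public static void main(String[] args) {
        int mss = mss(1500, 20, 20);
        System.out.println("MSS = " + mss);                                  // MSS = 1460
        System.out.println("segments for 4000 bytes = "
                + segmentsNeeded(4000, mss));                                // 3
    }
}
```

Under these assumptions a single 4000-byte application write travels as three segments, so the receiver may well see it in more than one read.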
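Solving sticky and half packets
Because the underlying TCP layer knows nothing about the business data above it, it cannot guarantee that packets are not split or merged. The problem can only be solved by the design of the upper-layer application protocol. The approaches used by mainstream protocols in the industry can be summarized as follows.
(1) Append a delimiter to the end of each packet, for example a carriage return and line feed; the FTP protocol works this way.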
String request = "a,b,c,d,e" + System.getProperty("line.separator");
Console output of the client (cn.tuling.nettybasic.splicing.demo.EchoCliStickyHalf):
client Accept[Hello,a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,] and the counter is:1
client Accept[d,e
a. Welcome to Netty World!
Hello,,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
a,b,c,d,e
. Welcome to Netty World!
] and the counter is:2
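The console output above shows requests glued together and a reply cut in the middle, even though every request ends with a line separator: a delimiter only helps if the receiver buffers incoming bytes and splits on it. The sketch below shows that idea in plain Java. `LineFramer` and `feed` are hypothetical names and this is not Netty code; Netty provides `LineBasedFrameDecoder` for the same purpose.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original demo: reassembles
// newline-terminated messages from arbitrarily sized chunks.
final class LineFramer {
    // Bytes of the message that has not yet been terminated by '\n'.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    // Consumes one chunk (one "read") and returns every message completed by it.
    List<String> feed(byte[] chunk) {
        List<String> frames = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n') {
                String line = new String(pending.toByteArray(), StandardCharsets.UTF_8);
                if (line.endsWith("\r")) {
                    // Also accept Windows-style "\r\n" separators.
                    line = line.substring(0, line.length() - 1);
                }
                frames.add(line);
                pending.reset();
            } else {
                pending.write(b);
            }
        }
        return frames;
    }

    public static void main(String[] args) {
        LineFramer framer = new LineFramer();
        // First read: one whole message plus the beginning of the next one.
        System.out.println(framer.feed("a,b,c,d,e\na,b,".getBytes(StandardCharsets.UTF_8)));
        // Second read: the rest of the second message.
        System.out.println(framer.feed("c,d,e\n".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Each call returns only complete messages; a trailing partial message stays in the buffer until a later chunk supplies its delimiter.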
(Custom delimiter)
Server
package cn.tuling.nettybasic.splicing.delimiter;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Class description: echo server that frames messages with a custom delimiter.
 */
public class DelimiterEchoServer {
    public static final String DELIMITER_SYMBOL = "@~";
    public static final int PORT = 9997;
    /* One instance shared by every channel, which is why the handler is marked @Sharable */
    private final DelimiterServerHandler serverHandler = new DelimiterServerHandler();

    public static void main(String[] args) throws InterruptedException {
        DelimiterEchoServer delimiterEchoServer = new DelimiterEchoServer();
        System.out.println("Server is about to start");
        delimiterEchoServer.start();
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();/* Thread group */
        try {
            ServerBootstrap b = new ServerBootstrap();/* Required to bootstrap the server */
            b.group(group)/* Pass in the thread group */
                    .channel(NioServerSocketChannel.class)/* Use NIO for network transport */
                    .localAddress(new InetSocketAddress(PORT))/* Port the server listens on */
                    /* For every accepted connection the server opens a new socket, i.e. a channel,
                       so the code below adds handlers to that child channel */
                    .childHandler(new ChannelInitializerImp());
            ChannelFuture f = b.bind().sync();/* Bind asynchronously; sync() blocks until done */
            System.out.println("Server started; waiting for client connections and data.....");
            f.channel().closeFuture().sync();/* Block until the server's channel is closed */
        } finally {
            group.shutdownGracefully().sync();/* Shut the thread group down gracefully */
        }
    }

    private class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER_SYMBOL, CharsetUtil.UTF_8);
            /* Splits the stream at the delimiter; frames may be at most 1024 bytes long */
            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
            ch.pipeline().addLast(serverHandler);
        }
    }

    @ChannelHandler.Sharable
    public class DelimiterServerHandler extends ChannelInboundHandlerAdapter {
        private final AtomicInteger counter = new AtomicInteger(0);

        /*** Called when the server has read data from the network */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf in = (ByteBuf) msg;
            try {
                String request = in.toString(CharsetUtil.UTF_8);
                System.out.println("Server Accept[" + request
                        + "] and the counter is:" + counter.incrementAndGet());
                String resp = "Hello," + request + ". Welcome to Netty World!"
                        + DelimiterEchoServer.DELIMITER_SYMBOL;
                ctx.writeAndFlush(Unpooled.copiedBuffer(resp, CharsetUtil.UTF_8));
            } finally {
                /* This handler is the last one to see the frame, so it must release it */
                in.release();
            }
        }

        /*** Called when the server has finished reading the network data */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelReadComplete------");
        }

        /*** Called when an exception occurs */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
}
Client
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Author: Mark
 * Class description: echo client that frames messages with a custom delimiter.
 */
public class DelimiterEchoClient {
    private final String host;

    public DelimiterEchoClient(String host) {
        this.host = host;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();/* Thread group */
        try {
            final Bootstrap b = new Bootstrap();/* Required to bootstrap the client */
            b.group(group)/* Pass in the thread group */
                    .channel(NioSocketChannel.class)/* Use NIO for network transport */
                    /* IP address and port of the server to connect to */
                    .remoteAddress(new InetSocketAddress(host, DelimiterEchoServer.PORT))
                    .handler(new ChannelInitializerImp());
            ChannelFuture f = b.connect().sync();
            System.out.println("Connected to the server.....");
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    private class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ByteBuf delimiter = Unpooled.copiedBuffer(
                    DelimiterEchoServer.DELIMITER_SYMBOL, CharsetUtil.UTF_8);
            /* Splits the stream at the delimiter; frames may be at most 1024 bytes long */
            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
            ch.pipeline().addLast(new DelimiterClientHandler());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new DelimiterEchoClient("127.0.0.1").start();
    }

    public class DelimiterClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
        private final AtomicInteger counter = new AtomicInteger(0);

        /*** Called when the client has read data from the network */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            System.out.println("client Accept[" + msg.toString(CharsetUtil.UTF_8)
                    + "] and the counter is:" + counter.incrementAndGet());
        }

        /*** Called once the client is notified that the channel is active */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            /* Every request ends with the delimiter so the server can find its boundary */
            String request = "a,b,c,d"
                    + DelimiterEchoServer.DELIMITER_SYMBOL;
            for (int i = 0; i < 10; i++) {
                ByteBuf msg = Unpooled.buffer(request.length());
                msg.writeBytes(request.getBytes(CharsetUtil.UTF_8));
                ctx.writeAndFlush(msg);
                System.out.println("Sent data to the server");
            }
        }

        /*** Called when an exception occurs */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
}
Console output of the client (cn.tuling.nettybasic.splicing.delimiter.DelimiterEchoClient):
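Connected to the server.....
Sent data to the server
Sent data to the server
Sent data to the server
Sent data to the server
Sent data to the server
Sent data to the server
Sent data to the server
Sent data to the server
Sent data to the server
Sent data to the server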
client Accept[Hello,a,b,c,d. Welcome to Netty World!] and the counter is:1
client Accept[Hello,a,b,c,d. Welcome to Netty World!] and the counter is:2
client Accept[Hello,a,b,c,d. Welcome to Netty World!] and the counter is:3
client Accept[Hello,a,b,c,d. Welcome to Netty World!] and the counter is:4
client Accept[Hello,a,b,c,d. Welcome to Netty World!] and the counter is:5
client Accept[Hello,a,b,c,d. Welcome to Netty World!] and the counter is:6
client Accept[Hello,a,b,c,d. Welcome to Netty World!] and the counter is:7
client Accept[Hello,a,b,c,d. Welcome to Netty World!] and the counter is:8
client Accept[Hello,a,b,c,d. Welcome to Netty World!] and the counter is:9
client Accept[Hello,a,b,c,d. Welcome to Netty World!] and the counter is:10
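Note: the @ChannelHandler.Sharable annotation
The @ChannelHandler.Sharable annotation marks a ChannelHandler instance as safe to share among multiple channels. A ChannelHandler without the annotation must be instantiated anew every time it is added to a channel's pipeline.
"Sharable" means that the same ChannelHandler instance may be added to the pipelines of several channels; Netty does not allow this kind of sharing by default.
In other words, if a handler is not annotated with @ChannelHandler.Sharable and the same instance is added to more than one ChannelPipeline, or added more than once to the same ChannelPipeline, Netty throws an exception.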
(2) Fixed-length messages: for example, every message has a fixed size of 200 bytes, and shorter messages are padded with spaces.
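The padding rule can be sketched in plain Java as follows. `FixedLengthFraming`, `pad` and `split` are hypothetical names, and the example uses an 8-byte frame instead of 200 bytes only to keep the output readable. It assumes ASCII content and that messages never end in spaces, since trailing spaces are treated as padding.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the original demo.
final class FixedLengthFraming {

    // Sender side: right-pad the message with spaces up to frameLength bytes.
    static byte[] pad(String message, int frameLength) {
        byte[] body = message.getBytes(StandardCharsets.US_ASCII);
        if (body.length > frameLength) {
            throw new IllegalArgumentException("message longer than frame");
        }
        byte[] frame = new byte[frameLength];
        Arrays.fill(frame, (byte) ' ');
        System.arraycopy(body, 0, frame, 0, body.length);
        return frame;
    }

    // Receiver side: cut the stream every frameLength bytes and strip the padding.
    // An incomplete trailing frame is ignored until more bytes arrive.
    static List<String> split(byte[] stream, int frameLength) {
        List<String> messages = new ArrayList<>();
        for (int pos = 0; pos + frameLength <= stream.length; pos += frameLength) {
            String frame = new String(stream, pos, frameLength, StandardCharsets.US_ASCII);
            messages.add(frame.replaceAll(" +$", ""));
        }
        return messages;
    }

    public static void main(String[] args) {
        int frameLength = 8;
        byte[] first = pad("a,b", frameLength);
        byte[] second = pad("c,d,e", frameLength);
        // Glue both frames together, as a sticky read would deliver them.
        byte[] stream = new byte[2 * frameLength];
        System.arraycopy(first, 0, stream, 0, frameLength);
        System.arraycopy(second, 0, stream, frameLength, frameLength);
        System.out.println(split(stream, frameLength));   // [a,b, c,d,e]
    }
}
```

The trade-off of this scheme is wasted bandwidth for short messages and a hard upper bound on message size.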
Server
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Class description: echo server that uses fixed-length messages.
 */
public class FixedLengthEchoServer {
    public static final String RESPONSE = "Welcome to Netty!";
    public static final int PORT = 9996;
    /* One instance shared by every channel, which is why the handler is marked @Sharable */
    private final FixedLengthServerHandler serverHandler = new FixedLengthServerHandler();

    public static void main(String[] args) throws InterruptedException {
        FixedLengthEchoServer fixedLengthEchoServer = new FixedLengthEchoServer();
        System.out.println("Server is about to start");
        fixedLengthEchoServer.start();
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();/* Thread group */
        try {
            ServerBootstrap b = new ServerBootstrap();/* Required to bootstrap the server */
            b.group(group)/* Pass in the thread group */
                    .channel(NioServerSocketChannel.class)/* Use NIO for network transport */
                    .localAddress(new InetSocketAddress(PORT))/* Port the server listens on */
                    /* For every accepted connection the server opens a new socket, i.e. a channel,
                       so the code below adds handlers to that child channel */
                    .childHandler(new ChannelInitializerImp());
            ChannelFuture f = b.bind().sync();/* Bind asynchronously; sync() blocks until done */
            System.out.println("Server started; waiting for client connections and data.....");
            f.channel().closeFuture().sync();/* Block until the server's channel is closed */
        } finally {
            group.shutdownGracefully().sync();/* Shut the thread group down gracefully */
        }
    }

    private class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            /* Every inbound frame is exactly as long as the client's request */
            ch.pipeline().addLast(
                    new FixedLengthFrameDecoder(
                            FixedLengthEchoClient.REQUEST.length()));
            ch.pipeline().addLast(serverHandler);
        }
    }

    @ChannelHandler.Sharable
    public class FixedLengthServerHandler extends ChannelInboundHandlerAdapter {
        private final AtomicInteger counter = new AtomicInteger(0);

        /*** Called when the server has read data from the network */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf in = (ByteBuf) msg;
            try {
                String request = in.toString(CharsetUtil.UTF_8);
                System.out.println("Server Accept[" + request
                        + "] and the counter is:" + counter.incrementAndGet());
                ctx.writeAndFlush(Unpooled.copiedBuffer(
                        FixedLengthEchoServer.RESPONSE, CharsetUtil.UTF_8));
            } finally {
                /* This handler is the last one to see the frame, so it must release it */
                in.release();
            }
        }

        /*** Called when an exception occurs */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
}
Client
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Class description: echo client that uses fixed-length messages.
 */
public class FixedLengthEchoClient {
    public final static String REQUEST = "a,b,c,d,e";
    private final String host;

    public FixedLengthEchoClient(String host) {
        this.host = host;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();/* Thread group */
        try {
            final Bootstrap b = new Bootstrap();/* Required to bootstrap the client */
            b.group(group)/* Pass in the thread group */
                    .channel(NioSocketChannel.class)/* Use NIO for network transport */
                    /* IP address and port of the server to connect to */
                    .remoteAddress(new InetSocketAddress(host, FixedLengthEchoServer.PORT))
                    .handler(new ChannelInitializerImp());
            ChannelFuture f = b.connect().sync();
            System.out.println("Connected to the server.....");
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    private class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            /* Every inbound frame is exactly as long as the server's response */
            ch.pipeline().addLast(
                    new FixedLengthFrameDecoder(
                            FixedLengthEchoServer.RESPONSE.length()));
            ch.pipeline().addLast(new FixedLengthClientHandler());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new FixedLengthEchoClient("127.0.0.1").start();
    }

    public class FixedLengthClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
        private final AtomicInteger counter = new AtomicInteger(0);

        /*** Called when the client has read data from the network */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            System.out.println("client Accept[" + msg.toString(CharsetUtil.UTF_8)
                    + "] and the counter is:" + counter.incrementAndGet());
        }

        /*** Called once the client is notified that the channel is active */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            for (int i = 0; i < 10; i++) {
                ByteBuf msg = Unpooled.buffer(FixedLengthEchoClient.REQUEST.length());
                msg.writeBytes(FixedLengthEchoClient.REQUEST.getBytes(CharsetUtil.UTF_8));
                ctx.writeAndFlush(msg);
            }
        }

        /*** Called when an exception occurs */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
}
Console output of the server (cn.tuling.nettybasic.splicing.fixed.FixedLengthEchoServer):
Server is about to start
Server started; waiting for client connections and data.....
Server Accept[a,b,c,d,e] and the counter is:1
Server Accept[a,b,c,d,e] and the counter is:2
Server Accept[a,b,c,d,e] and the counter is:3
Server Accept[a,b,c,d,e] and the counter is:4
Server Accept[a,b,c,d,e] and the counter is:5
Server Accept[a,b,c,d,e] and the counter is:6
Server Accept[a,b,c,d,e] and the counter is:7
Server Accept[a,b,c,d,e] and the counter is:8
Server Accept[a,b,c,d,e] and the counter is:9
Server Accept[a,b,c,d,e] and the counter is:10
Console output of the client (cn.tuling.nettybasic.splicing.fixed.FixedLengthEchoClient):
Connected to the server.....
client Accept[Welcome to Netty!] and the counter is:1
client Accept[Welcome to Netty!] and the counter is:2
client Accept[Welcome to Netty!] and the counter is:3
client Accept[Welcome to Netty!] and the counter is:4
client Accept[Welcome to Netty!] and the counter is:5
client Accept[Welcome to Netty!] and the counter is:6
client Accept[Welcome to Netty!] and the counter is:7
client Accept[Welcome to Netty!] and the counter is:8
client Accept[Welcome to Netty!] and the counter is:9
client Accept[Welcome to Netty!] and the counter is:10