Netty
- 引入依赖
- 服务端
- 服务端处理读写业务的Handler
- 客户端
- 实现客户端handler
Netty具备设计优雅、使⽤⽅便、性能强劲等优点,
引入依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.35.Final</version>
</dependency>
服务端
public class NettyServer {
public static void main(String[] args) {
// 创建只处理客户端连接请求的线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(10);
// 创建只处理客户端读写业务的线程组
EventLoopGroup workGroup = new NioEventLoopGroup(10);
try {
// 创建服务端启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
// 配置参数
bootstrap.group(bossGroup, workGroup) // 配置group
// 配置通道类型(服务器的通道实现)
.channel(NioServerSocketChannel.class)
// 配置用于存放因没有空闲线程导致连接请求被暂存放到队列中的最大长度
.option(ChannelOption.SO_BACKLOG, 1024)
// 创建通道初始化的对象并配置该对象,向该对象中添加处理器来实现具体的业务
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 添加处理器: 处理器里面是真正处理业务的
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("netty的服务端启动了");
// sync同步阻塞的启动服务端
ChannelFuture channelFuture = bootstrap.bind(9090).sync();
// 只要服务没关闭,该方法会一直阻塞
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 端口所有连接并清理内存
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
服务端处理读写业务的Handler
/**
* 自定义的handler, 实现Netty处理器的规范
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 当有客户端发生数据来时,该方法就会被调用
* @param ctx 当前上下文对象:含有Channel和pipeline的上下文对象
* @param msg 客户端发送来的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送的数据:" + buf.toString(StandardCharsets.UTF_8));
}
/**
* 读完数据后调用的方法,发送数据给客户端
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 创建携带的ByteByf对象
ByteBuf buf = Unpooled.copiedBuffer("hello client".getBytes(StandardCharsets.UTF_8));
// 把数据写入通道中
ctx.writeAndFlush(buf);
}
/**
* 异常捕获
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 打印异常
System.out.println(cause.getMessage());
// 关闭通道
ctx.close();
}
}
客户端
/**
* Netty客户端
*/
public class NettyClient {
public static void main(String[] args) {
// 创建一个线程组用于事件循环
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
// 创建客户端启动对象
Bootstrap bootstrap = new Bootstrap();
try {
// 设置相关参数
bootstrap.group(eventLoopGroup)
.channel(NioServerSocketChannel.class)
// 创建通道初始化对象并设置handler业务处理器
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 添加处理器,处理器是实现具体业务的
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("Netty客户端启动了");
// 同步阻塞:告知客户端连接的服务器的地址,并启动客户端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
// 阻塞等待完成操作后关闭通道
channelFuture.channel().close().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
实现客户端handler
/**
* 客户端Handler
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当客户端连接服务器后调用该方法:向服务端写数据
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf byteBuf = Unpooled.copiedBuffer("hello server".getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(byteBuf);
}
/**
* 当通道有事件发生时调用的方法:读取服务器返回的数据
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("来自服务器" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(StandardCharsets.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}