Netty入门代码示例(基于TCP服务)
Server端
package com.bierce.io.netty.simple;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.CharsetUtil;
/**
 * Minimal Netty TCP server: binds port 6668 and installs a {@link NettyServerHandler}
 * on every accepted connection.
 */
public class NettyServer {
    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     *                              or for the server channel to close
     */
    public static void main(String[] args) throws InterruptedException {
        // bossGroup serves connection requests, workerGroup serves the accepted channels.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // Configure the server through the fluent bootstrap API.
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // server-side channel type
                    .option(ChannelOption.SO_BACKLOG, 128) // size of the pending-connection queue
                    .childOption(ChannelOption.SO_KEEPALIVE, true) // keep accepted connections alive
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // Each accepted channel gets its own handler instance.
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            // Bind first; report success only once the bind has actually completed.
            ChannelFuture cf = bootstrap.bind(6668).sync();
            System.out.println("Server is start Successful !!!");
            // Block until the server channel is closed.
            cf.channel().closeFuture().sync();
        } finally {
            // Release the event-loop threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
/**
 * Server-side handler: prints whatever the client sends and answers with a fixed greeting.
 */
class NettyServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * Prints the data sent by the client together with the client's address.
     *
     * @param ctx handler context; gives access to the channel (and through it the pipeline)
     * @param msg the inbound data, expected to be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        try {
            System.out.println("客户端发送过来的msg = " + buf.toString((CharsetUtil.UTF_8)));
            System.out.println("客户端地址 = " + ctx.channel().remoteAddress());
        } finally {
            // ChannelInboundHandlerAdapter does not release inbound messages, and this handler
            // does not pass the buffer on, so it must be released here to avoid a leak.
            buf.release();
        }
    }

    /**
     * Replies to the client once the current read batch has been fully consumed.
     *
     * @param ctx handler context used to write the reply
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // Write the reply and flush it immediately.
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Client!",CharsetUtil.UTF_8));
    }

    /**
     * Prints the failure and closes the channel.
     *
     * @param ctx   handler context of the failing channel
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
Client端
package com.bierce.io.netty.simple;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
/**
 * Minimal Netty TCP client: connects to 127.0.0.1:6668 and installs a
 * {@link NettyClientHandler} on the connection.
 */
public class NettyClient {
    /**
     * Connects to the server and blocks until the connection is closed.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting for the connect
     *                              or for the channel to close
     */
    public static void main(String[] args) throws InterruptedException {
        // Event loop group that drives the client channel.
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class) // client-side channel type
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            // Connect first; report success only once the connection is established.
            ChannelFuture sync = bootstrap.connect("127.0.0.1", 6668).sync();
            System.out.println("Client Start Successful!!!");
            // Block until the channel is closed.
            sync.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}
/**
 * Client-side handler: greets the server once the channel is active and prints every reply.
 */
class NettyClientHandler extends ChannelInboundHandlerAdapter {
    /**
     * Sends the greeting as soon as the channel becomes active.
     *
     * @param ctx handler context used to write the greeting
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Write the greeting and flush it immediately.
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Server!", CharsetUtil.UTF_8));
    }

    /**
     * Prints the data returned by the server together with the server's address.
     *
     * @param ctx handler context; gives access to the channel
     * @param msg the inbound data, expected to be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        try {
            System.out.println("服务端发送过来的msg = " + buf.toString((CharsetUtil.UTF_8)));
            System.out.println("服务端地址 = " + ctx.channel().remoteAddress());
        } finally {
            // ChannelInboundHandlerAdapter does not release inbound messages, and this handler
            // does not pass the buffer on, so it must be released here to avoid a leak.
            buf.release();
        }
    }

    /**
     * Prints the failure and closes the channel.
     *
     * @param ctx   handler context of the failing channel
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
运行结果
Netty入门代码示例(基于HTTP服务)
package com.bierce.io.netty.http;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import java.net.URI;
/**
 * Minimal Netty HTTP server: binds port 9999 and delegates per-connection pipeline setup
 * to {@link TestServerInitializer}.
 */
public class TestServer {
    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     *                              or for the server channel to close
     */
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // Configure the server through the fluent bootstrap API.
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // server-side channel type
                    .childHandler(new TestServerInitializer()); // custom pipeline initializer
            // Bind first; report success only once the bind has actually completed.
            ChannelFuture cf = bootstrap.bind(9999).sync();
            System.out.println("Server is start Successful !!!");
            // Block until the server channel is closed.
            cf.channel().closeFuture().sync();
        } finally {
            // Release the event-loop threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
/**
 * Builds the pipeline of every accepted connection: an HTTP codec followed by the
 * application handler.
 */
class TestServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline()
                // HttpServerCodec is Netty's combined HTTP encoder/decoder
                // (the original author used io.netty:netty-all:4.1.20.Final).
                .addLast("MyHttpServerCodec", new HttpServerCodec())
                // Application handler that answers the decoded requests.
                .addLast("MyTestHttpServerHandler", new TestHttpServerHandler());
    }
}
/**
 * Answers every HTTP request with a fixed plain-text body, except requests for
 * {@code /favicon.ico}, which are deliberately left unanswered.
 */
class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    /**
     * Handles one decoded HTTP object. Only {@link HttpRequest} objects are acted upon;
     * any other {@link HttpObject} is ignored.
     *
     * @param channelHandlerContext handler context used to write the response
     * @param httpObject            the decoded HTTP object
     * @throws Exception if the request URI cannot be parsed
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
        if (!(httpObject instanceof HttpRequest)) {
            return;
        }
        System.out.println("httpObject Type = " + httpObject.getClass());
        System.out.println("Client Address = " + channelHandlerContext.channel().remoteAddress());

        // Filter out a specific resource: favicon requests get no response.
        HttpRequest httpRequest = (HttpRequest) httpObject;
        // uri() replaces the deprecated getUri().
        URI uri = new URI(httpRequest.uri());
        if ("/favicon.ico".equals(uri.getPath())) {
            System.out.println("favicon.ico资源不做响应");
            return;
        }

        // Build and send the HTTP response.
        ByteBuf content = Unpooled.copiedBuffer("Hello, I'm Server", CharsetUtil.UTF_8);
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
        channelHandlerContext.writeAndFlush(response);
    }
}
运行结果
Netty心跳检测机制
package com.bierce.io.netty.heartbeat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/**
 * Heartbeat (idle detection) demo server: binds port 9999 and installs an
 * {@link IdleStateHandler} followed by {@link MyServerHandler} on every accepted connection.
 */
public class MyServer {
    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * @param args unused
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
     *                          interrupted while waiting
     */
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // IdleStateHandler is Netty's idle-detection handler. It sends nothing
                            // itself; it fires an IdleStateEvent when the channel has been idle:
                            //   readerIdleTime (3s): no read for that long
                            //   writerIdleTime (5s): no write for that long
                            //   allIdleTime    (7s): neither read nor write for that long
                            pipeline.addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
                            // The event reaches the next handler's userEventTriggered method,
                            // where the custom handler decides what to do about the idle channel.
                            pipeline.addLast(new MyServerHandler());
                        }
                    });
            // Bind, then block until the server channel is closed.
            bootstrap.bind(9999).sync().channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before converting to an unchecked exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
/**
 * Reacts to idle events: logs which kind of idleness occurred and closes the channel.
 */
class MyServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * Handles a user event. {@link IdleStateEvent}s are logged and cause the channel to be
     * closed; every other event is forwarded to the next handler in the pipeline.
     *
     * @param ctx handler context of the channel the event belongs to
     * @param evt the user event
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            // Not an idle event: keep the default behaviour and pass it along.
            super.userEventTriggered(ctx, evt);
            return;
        }
        IdleStateEvent idleEvent = (IdleStateEvent) evt;
        String eventType = null;
        switch (idleEvent.state()) {
            case READER_IDLE:
                eventType = "读空闲";
                break;
            case WRITER_IDLE:
                eventType = "写空闲";
                break;
            case ALL_IDLE:
                eventType = "读写空闲";
                break;
        }
        System.out.println(ctx.channel().remoteAddress() + "-已超时,超时类型为: " + eventType );
        System.out.println("Server will deal with it instantly...");
        // Close the channel as soon as any idle event occurs.
        ctx.close();
    }
}
/**
 * Client used with the heartbeat demo: connects to 127.0.0.1:9999 and installs a
 * {@code NettyClientHandler} on the connection.
 */
class NettyClient {
    /**
     * Connects to the server and blocks until the connection is closed.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting for the connect
     *                              or for the channel to close
     */
    public static void main(String[] args) throws InterruptedException {
        // Event loop group that drives the client channel.
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class) // client-side channel type
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            // Connect first; report success only once the connection is established.
            ChannelFuture sync = bootstrap.connect("127.0.0.1", 9999).sync();
            System.out.println("Client Start Successful!!!");
            // Block until the channel is closed.
            sync.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}
注意: 服务端在第一次空闲事件触发后就会关闭通道,因此只有超时时间最短的那一类事件会被打印(上例中为3秒的读空闲);需要调整readerIdleTime、writerIdleTime、allIdleTime三个参数的大小关系,才会显示对应类型的超时信息
Netty入门代码示例(基于WebSocket协议)
服务端
package com.bierce.websocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import java.time.LocalDateTime;
/**
 * WebSocket demo server: binds port 9999 and serves the WebSocket endpoint {@code /hello}.
 */
public class MyWebsocketServer {
    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * @param args unused
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
     *                          interrupted while waiting
     */
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // WebSocket starts as HTTP, so the HTTP codec is required.
                            pipeline.addLast(new HttpServerCodec());
                            // Supports writing data in chunks.
                            pipeline.addLast(new ChunkedWriteHandler());
                            // Large HTTP messages arrive in several parts;
                            // HttpObjectAggregator merges them into one message.
                            pipeline.addLast(new HttpObjectAggregator(8192));
                            // WebSocket transfers data as frames. WebSocketServerProtocolHandler
                            // upgrades HTTP to the ws protocol and keeps the connection open.
                            pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
                            pipeline.addLast(new MyWebsocketServerHandler());
                        }
                    });
            // Bind, then block until the server channel is closed.
            bootstrap.bind(9999).sync().channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before converting to an unchecked exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
/**
 * Echo-style WebSocket handler: logs each text frame and sends it back prefixed with the
 * server's current time. Also logs when it is added to or removed from a pipeline.
 */
class MyWebsocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    /**
     * Logs the received text and writes the timestamped echo through the channel.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        String received = msg.text();
        System.out.println("Server receive the Info: " + received);
        String reply = "Server time " + LocalDateTime.now() + " --- " + received;
        TextWebSocketFrame echo = new TextWebSocketFrame(reply);
        ctx.channel().writeAndFlush(echo);
    }

    /**
     * Logs the long text form of the channel id when this handler is added.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        String channelId = ctx.channel().id().asLongText();
        System.out.println("有客户端连接成功 --" + channelId);
    }

    /**
     * Logs the long text form of the channel id when this handler is removed.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        String channelId = ctx.channel().id().asLongText();
        System.out.println("有客户端已经离开 --" + channelId);
    }

    /**
     * Logs the exception message and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        String reason = cause.getMessage();
        System.out.println("异常Info:" + reason);
        ctx.close();
    }
}
客户端(浏览器)
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Websocket</title>
</head>
<body>
<script>
//Browser-side WebSocket client: connects to ws://localhost:9999/hello and
//appends every message from the server to the "responseText" textarea.
var socket;
if (window.WebSocket) {
socket = new WebSocket("ws://localhost:9999/hello");
//Counterpart of channelRead0 on the server: receives messages sent by the server
socket.onmessage = function(ev){
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + ev.data;
}
//Connection opened
socket.onopen = function(ev){
var rt = document.getElementById("responseText");
rt.value = "开启连接成功!";
}
//Connection closed
socket.onclose = function(ev){
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + "连接关闭成功!";
}
}
//Send a message to the server
function send(msg){
if(!window.socket){ //do nothing if the socket was never created
return;
}
if(socket.readyState == WebSocket.OPEN){
socket.send(msg);
}else{
alert("socket未连接");
}
}
</script>
<form onsubmit="return false">
<textarea name="message" style="height:300px;width:300px"></textarea>
<input type="button" value="Send" onclick="send(this.form.message.value)">
<textarea id="responseText" style="height:300px;width:300px"></textarea>
<input type="button" value="Clear" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>
效果图