一、Netty简介
Netty 是一个异步事件驱动的网络通信应用框架,用于快速开发可维护的高性能服务器和客户端。简单地说Netty封装了JDK的NIO,不用再写一大堆复杂的代码,从NIO各种繁复的细节中脱离出来,让开发者重点关心业务逻辑。
二、Netty重要API
说明:大部分是模板代码,重点应该关注的是处理业务逻辑的Handler, 其按照角色不同分为ServerHandler和ClientHandler,按照数据处理流向不同,分为InboundHandler(对应接口是ChannelInboundHandlerAdapter)-处理输入数据,比如从客户端的角度,是处理从服务端发送过来的数据, 和OutboundHandler(对应接口是ChannelOutboundHandlerAdapter)-处理输出数据,比如从客户端的角度,是处理其发送给服务端的数据。
1、服务端
//用于接收客户端的连接请求的线程池
val bossGroup = new NioEventLoopGroup()
//用与处理客户端SocketChannel的网络读写的线程池
val workerGroup = new NioEventLoopGroup()
//是Netty用户启动NIO服务端的辅助启动类,降低服务端的开发复杂度
val bootstrap = new ServerBootstrap()
//将两个NIO线程组作为参数传入到ServerBootstrap
bootstrap.group(bossGroup, workerGroup)
//创建NioServerSocketChannel
.channel(classOf[NioServerSocketChannel])
//绑定事件处理类
.childHandler(new ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel): Unit = {
// 日常主要要写的是ServerHandler逻辑
// 并且通常会添加多个ServerHandler
ch.pipeline().addLast(new ServerHandler1)
}
})
//绑定端口地址端口
bootstrap.bind(host, port)
2、客户端
//创建客户端NIO线程组
val eventGroup = new NioEventLoopGroup
//创建客户端辅助启动类
val bootstrap = new Bootstrap
//将NIO线程组传入到Bootstrap
bootstrap.group(eventGroup)
//创建NioSocketChannel
.channel(classOf[NioSocketChannel])
//绑定事件处理类
.handler(new ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel): Unit = {
// 日常主要要写的是ClientHandler逻辑
// 并且通常会添加多个ClientHandler
ch.pipeline().addLast(new ClientHandler1)
}
})
//发送连接操作
bootstrap.connect(host, port)
三、Handler执行顺序
如第二部分所述,日常开发主要是写Handler,用于处理不同的业务逻辑,并且通常需要添加多个Handler,那么多个Handler的执行顺序如何呢?示例如下
比如一个客户端添加了5个Handler, 其中1和2是inBound,3和4是OutBound,5是InboundOutBound,
该客户端向外发送的消息会依次经过如下Handle的处理:5->4->3,而其接收到的外部信息会依次经过如下Handle的处理:1->2->5。
四、应用举例
1、案例1
描述:客户端与服务端建立连接,会各自执行Handler中的channelActive方法
客户端:ClientHandler1d的channelActive方法被调用[已跟服务器建立连接]
服务端: ServerHandler1的channelActive方法被调用[一个客户端连接上]
(1) 服务端代码
① NettyServer
package top.doe.netty.demo1
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
class NettyServer1 {
def bind(host: String, port: Int): Unit = {
//用于接收客户端的连接请求的线程池
val bossGroup = new NioEventLoopGroup()
//用与处理客户端SocketChannel的网络读写的线程池
val workerGroup = new NioEventLoopGroup()
//是Netty用户启动NIO服务端的辅助启动类,降低服务端的开发复杂度
val bootstrap = new ServerBootstrap()
//将两个NIO线程组作为参数传入到ServerBootstrap
bootstrap.group(bossGroup, workerGroup)
//创建NioServerSocketChannel
.channel(classOf[NioServerSocketChannel])
//绑定事件处理类
.childHandler(new ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel): Unit = {
ch.pipeline().addLast(new ServerHandler1)
}
})
//绑定端口地址端口
bootstrap.bind(host, port)
}
}
object NettyServer1 {
def main(args: Array[String]) {
val host = "localhost"
val port = 8888
val server = new NettyServer1
server.bind(host, port)
}
}
②ServerHandler
package top.doe.netty.demo1
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}
class ServerHandler1 extends ChannelInboundHandlerAdapter {
/**
* 有客户端与服务端建立连接后调用
*/
override def channelActive(ctx: ChannelHandlerContext): Unit = {
println("ServerHandler的channelActive方法被调用【一个客户端连接上】")
}
/**
* 有客户端与服务端断开连接后调用
*/
override def channelInactive(ctx: ChannelHandlerContext): Unit = {
println("ServerHandler的channelInactive方法被调用【一个客户端与服务端断开连接了】")
}
/**
* 接受客户端发送来的消息
*/
override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
println("ServerHandler的channelRead方法被调用【收到客户端发送的消息了】")
}
}
(2) 客户端代码
① NettyClient
package top.doe.netty.demo1
import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
class NettyClient1 {
def connect(host: String, port: Int): Unit = {
//创建客户端NIO线程组
val eventGroup = new NioEventLoopGroup
//创建客户端辅助启动类
val bootstrap = new Bootstrap
//将NIO线程组传入到Bootstrap
bootstrap.group(eventGroup)
//创建NioSocketChannel
.channel(classOf[NioSocketChannel])
//绑定事件处理类
.handler(new ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel): Unit = {
ch.pipeline().addLast(new ClientHandler1)
}
})
//发送连接操作
bootstrap.connect(host, port)
}
}
object NettyClient1 {
def main(args: Array[String]) {
val host = "localhost"
val port = 8888
val client = new NettyClient1
client.connect(host, port)
}
}
②ClientHandler
package top.doe.netty.demo1
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}
class ClientHandler1 extends ChannelInboundHandlerAdapter {
/**
* 一旦跟服务端建立上连接,channelActive方法将被调用
*/
override def channelActive(ctx: ChannelHandlerContext): Unit = {
println("ClientHandler的channelActive方法被调用!【已经跟服务端连接上了】")
}
/**
* 服务端返回消息后,channelRead方法被调用,该方法用于接送服务端返回的消息
*/
override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
println("ClientHandler的channelRead方法被调用!")
}
}
(3) 执行结果
>>客户端
>>服务端
2、案例2
在案例1的基础上,服务端和客户端给彼此发送一条消息。
(1) 基本流程
(2) 服务端代码
① NettyServer
package com.wakedata.demo2
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
class NettyServer2 {
def bind(host:String,port:Int) = {
val bossGroup = new NioEventLoopGroup()
val workerGroup = new NioEventLoopGroup()
val bootstrap = new ServerBootstrap()
bootstrap.group(bossGroup,workerGroup)
.channel(classOf[NioServerSocketChannel])
.childHandler(new ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel): Unit = {
ch.pipeline().addLast(new ServerHandler2)
}
})
bootstrap.bind(host,port)
}
}
object NettyServer2{
def main (args: Array[String] ): Unit = {
val host = "localhost"
val port = 8888
val server2 = new NettyServer2
server2.bind(host,port)
}
}
②ServerHandler
package com.wakedata.demo2
import io.netty.buffer.{ByteBuf, Unpooled}
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}
class ServerHandler2 extends ChannelInboundHandlerAdapter {
override def channelActive(ctx: ChannelHandlerContext): Unit = {
println("ServerHandler1的channelActive方法被调用[一个客户端连接上]")
}
override def channelInactive(ctx: ChannelHandlerContext): Unit = {
println("ServerHandler1的channelInactive方法被调用[一个客户端断开连接]")
}
override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
//接收客户端发送过来的消息
val byteBuf = msg.asInstanceOf[ByteBuf]
val bytes = new Array[Byte](byteBuf.readableBytes())
byteBuf.readBytes(bytes)
val message = new String(bytes, "UTF-8")
println("ServerHandler2的channelRead方法被调用[收到客户端发送的消息了]" + message)
//将数据发送到客户端
val back = "你好"
val resp = Unpooled.copiedBuffer(back.getBytes("UTF-8"))
ctx.writeAndFlush(resp)
}
}
(3) 客户端代码
① NettyClient
package com.wakedata.demo2
import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
class NettyClient2 {
def connect(host:String,port:Int):Unit = {
//创建客户端的NIO线程组
val eventGroup = new NioEventLoopGroup()
//创建客户端辅助启动类
val bootstrap = new Bootstrap()
//将NIO线程组传入到Bootstrap
bootstrap.group(eventGroup)
.channel(classOf[NioSocketChannel])
//绑定事件处理类
.handler(new ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel): Unit = {
ch.pipeline().addLast(new ClientHandler2)
}
})
//发送连接操作
bootstrap.connect(host,port)
}
}
object NettyClient2{
def main(args: Array[String]): Unit = {
val host = "localhost"
val port = 8888
val client = new NettyClient2()
client.connect(host,port)
}
}
②ClientHandler
package com.wakedata.demo2
import io.netty.buffer.{ByteBuf, Unpooled}
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}
class ClientHandler2 extends ChannelInboundHandlerAdapter{
override def channelActive(ctx: ChannelHandlerContext): Unit = {
println("ClientHandler2的channelActive方法被调用[已跟服务器建立连接]")
//向服务端发送消息
val msg = "hello"
ctx.writeAndFlush(Unpooled.copiedBuffer(msg.getBytes("UTF-8")))
}
override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {
//读取服务端返回的消息
val byteBuf = msg.asInstanceOf[ByteBuf]
val bytes = new Array[Byte](byteBuf.readableBytes())
byteBuf.readBytes(bytes)
val message = new String(bytes, "UTF-8")
print("ClientHandler2的channelRead方法被调用,接收到服务器端发送过来的消息:" + message)
}
}
(4) 执行结果
>> 客户端
>> 服务端
3、案例3
在案例2的基础上,添加多个Handler处理器(共计3个)。因为发送的case class对象,所以消息发出之前,会经过一个OurBoundHandler进行序列化,然后接受消息时,首先经过一个InBoundHandler进行解码,然后再经过另外一个 InBoundHandler对解码后的数据进行读取。
(1) 服务端代码
① NettyServer
package com.wakedata.demo3
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.serialization.{ClassResolvers, ObjectDecoder, ObjectEncoder}
class NettyServer3 {
def bind(host: String, port: Int): Unit = {
//配置服务端线程池组
//用于服务器接收客户端的连接
val bossGroup = new NioEventLoopGroup()
//用户进行SocketChannel的网络读写
val workerGroup = new NioEventLoopGroup()
//是Netty用户启动NIO服务端的辅助启动类,降低服务端的开发复杂度
val bootstrap = new ServerBootstrap()
//将两个NIO线程组作为参数传入到ServerBootstrap
bootstrap.group(bossGroup, workerGroup)
//创建NioServerSocketChannel
.channel(classOf[NioServerSocketChannel])
//绑定I/O事件处理类
.childHandler(new ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel): Unit = {
//处理输入的数据执行顺序 decoder -> handler
//处理返回的数据执行顺序 encoder
ch.pipeline().addLast("encoder", new ObjectEncoder) //实现了ChannelOutboundHandler
ch.pipeline().addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader))) //实现了ChannelInboundHandler
ch.pipeline().addLast("handler", new ServerHandler3) //实现了ChannelInboundHandler
}
})
val channelFuture = bootstrap.bind(host, port)
channelFuture.syncUninterruptibly
}
}
object NettyServer3 {
def main(args: Array[String]) {
val host = "localhost"
val port = 8888
val server = new NettyServer3
server.bind(host, port)
}
}
② ServerHandler
package com.wakedata.demo3
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}
class ServerHandler3 extends ChannelInboundHandlerAdapter {
/**
* 有客户端建立连接后调用
*/
override def channelActive(ctx: ChannelHandlerContext): Unit = {
println("一个客户端连接上了...")
}
/**
* 接受客户端发送来的消息
*/
override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
//进行模式匹配
msg match {
case RequestMsg(msg) => {
println("收到客户端发送的消息:" + msg)
//将数据发送到客户端
ctx.writeAndFlush(ResponseMsg("haha"))
}
}
}
}
③ ResponseMsg
package com.wakedata.demo3
case class ResponseMsg(msg: String)
(2) 服务端代码
① NettyClient
package com.wakedata.demo3
import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.serialization.{ClassResolvers, ObjectDecoder, ObjectEncoder}
class NettyClient3 {
def connect(host: String, port: Int): Unit = {
//创建客户端NIO线程组
val eventGroup = new NioEventLoopGroup
//创建客户端辅助启动类
val bootstrap = new Bootstrap
//将NIO线程组传入到Bootstrap
bootstrap.group(eventGroup)
//创建NioSocketChannel
.channel(classOf[NioSocketChannel])
//绑定I/O事件处理类
.handler(new ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel): Unit = {
ch.pipeline().addLast("encoder", new ObjectEncoder)
ch.pipeline().addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)))
ch.pipeline().addLast("handler", new ClientHandler3)
}
})
//发起异步连接操作
val channelFuture = bootstrap.connect(host, port)
channelFuture.syncUninterruptibly
}
}
object NettyClient3 {
def main(args: Array[String]) {
val host = "localhost"
val port = 8888
val client = new NettyClient3
client.connect(host, port)
}
}
② ClientHandler
package com.wakedata.demo3
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}
class ClientHandler3 extends ChannelInboundHandlerAdapter {
//一旦跟服务端建立上连接channelActive将被调用
override def channelActive(ctx: ChannelHandlerContext): Unit = {
println("已经跟服务端连接上了")
//向服务端发送case class实例
ctx.writeAndFlush(RequestMsg("hello"))
}
override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
msg match {
case ResponseMsg(msg) => {
println("收到服务端返回的消息:" + msg)
}
}
}
}
③ RequestMsg
package com.wakedata.demo3
case class RequestMsg(content: String)
(3) 执行结果
>>客户端
>>服务端