目录标题
- 1、前期知识科普
- 1.1 NIO 基本概念
- 1.2 java BIO与NIO对比
- 1.3 Reactor 模型
- 2、Netty 基础概念
- 2.1 Netty 简介
- 2.2 Netty 执行流程
- 2.3 Netty 核心组件
- 3、Netty Demo编写
- 3.1 总体框架
- 3.2 具体代码
- 4、交流群
1、前期知识科普
1.1 NIO 基本概念
阻塞(Block)与非阻塞(Non-Block)
阻塞和非阻塞是进程在访问数据的时候,数据是否准备就绪的一种处理方式,当数据没有准备的时候。
-
阻塞:往往需要等待缓冲区中的数据准备好过后才处理其他的事情,否则一直等待在那里。
-
非阻塞:当我们的进程访问我们的数据缓冲区的时候,如果数据没有准备好则直接返回,不会等待。如果数据已经准备好,也直接返回。
阻塞 IO :
非阻塞 IO :
同步(Synchronous)与异步(Asynchronous)
同步和异步都是基于应用程序和操作系统处理 IO 事件所采用的方式。比如
同步:是应用程序要直接参与 IO 读写的操作。
异步:所有的 IO 读写交给操作系统去处理,应用程序只需要等待通知。
同步方式在处理 IO 事件的时候,必须阻塞在某个方法上面等待我们的 IO 事件完成(阻塞 IO 事件或者通过轮询 IO事件的方式),对于异步来说,所有的 IO 读写都交给了操作系统。这个时候,我们可以去做其他的事情,并不需要去完成真正的 IO 操作,当操作完成 IO 后,会给我们的应用程序一个通知。
所以异步相比较于同步带来的直接好处就是在我们处理IO数据的时候,异步的方式我们可以把这部分等待所消耗的资源用于处理其他事务,提升我们服务自身的性能。
同步 IO :
异步 IO :
1.2 java BIO与NIO对比
BIO(传统IO):
BIO是一个同步并阻塞的IO模式,传统的 java.io 包,它基于流模型实现,提供了我们最熟知的一些 IO 功能,比如File抽象、输入输出流等。交互方式是同步、阻塞的方式,也就是说,在读取输入流或者写入输出流时,在读、写动作完成之前,线程会一直阻塞在那里,它们之间的调用是可靠的线性顺序。
NIO(Non-blocking/New I/O)
NIO 是一种同步非阻塞的 I/O 模型,于 Java 1.4 中引入,对应 java.nio 包,提供了 Channel , Selector,Buffer 等抽象。NIO 中的 N 可以理解为 Non-blocking,不单纯是 New。它支持面向缓冲的,基于通道的 I/O 操作方法。 NIO 提供了与传统 BIO 模型中的 Socket 和 ServerSocket 相对应的 SocketChannel 和 ServerSocketChannel 两种不同的套接字通道实现,两种通道都支持阻塞和非阻塞两种模式。对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发
BIO与NIO的对比
NIO 的 Server 通信的简单模型:
BIO 的 Server 通信的简单模型:
NIO的特点:
一个线程可以处理多个通道,减少线程创建数量;
读写非阻塞,节约资源:没有可读/可写数据时,不会发生阻塞导致线程资源的浪费
1.3 Reactor 模型
单线程的 Reactor 模型
多线程的 Reactor 模型
多线程主从 Reactor 模型
2、Netty 基础概念
2.1 Netty 简介
Netty 是一个 NIO 客户端服务器框架,可快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化和简化了网络编程,例如 TCP 和 UDP 套接字服务器。
“快速简便”并不意味着最终的应用程序将遭受可维护性或性能问题的困扰。Netty 经过精心设计,结合了许多协议(例如FTP,SMTP,HTTP 以及各种基于二进制和文本的旧式协议)的实施经验。结果,Netty 成功地找到了一种无需妥协即可轻松实现开发,性能,稳定性和灵活性的方法。
2.2 Netty 执行流程
2.3 Netty 核心组件
Channel
Channel是 Java NIO 的一个基本构造。可以看作是传入或传出数据的载体。因此,它可以被打开或关闭,连接或者断开连接。
EventLoop 与 EventLoopGroup
EventLoop 定义了Netty的核心抽象,用来处理连接的生命周期中所发生的事件,在内部,将会为每个Channel分配一个EventLoop。
EventLoopGroup 是一个 EventLoop 池,包含很多的 EventLoop。
Netty 为每个 Channel 分配了一个 EventLoop,用于处理用户连接请求、对用户请求的处理等所有事件。EventLoop 本身只是一个线程驱动,在其生命周期内只会绑定一个线程,让该线程处理一个 Channel 的所有 IO 事件。
一个 Channel 一旦与一个 EventLoop 相绑定,那么在 Channel 的整个生命周期内是不能改变的。一个 EventLoop 可以与多个 Channel 绑定。即 Channel 与 EventLoop 的关系是 n:1,而 EventLoop 与线程的关系是 1:1。
ServerBootstrap 与 Bootstrap
Bootstarp 和 ServerBootstrap 被称为引导类,指对应用程序进行配置,并使他运行起来的过程。Netty处理引导的方式是使你的应用程序和网络层相隔离。
Bootstrap 是客户端的引导类,Bootstrap 在调用 bind()(连接UDP)和 connect()(连接TCP)方法时,会新创建一个 Channel,仅创建一个单独的、没有父 Channel 的 Channel 来实现所有的网络交换。
ServerBootstrap 是服务端的引导类,ServerBootstarp 在调用 bind() 方法时会创建一个 ServerChannel 来接受来自客户端的连接,并且该 ServerChannel 管理了多个子 Channel 用于同客户端之间的通信。
ChannelHandler 与 ChannelPipeline
ChannelHandler 是对 Channel 中数据的处理器,这些处理器可以是系统本身定义好的编解码器,也可以是用户自定义的。这些处理器会被统一添加到一个 ChannelPipeline 的对象中,然后按照添加的顺序对 Channel 中的数据进行依次处理。
ChannelFuture
Netty 中所有的 I/O 操作都是异步的,即操作不会立即得到返回结果,所以 Netty 中定义了一个 ChannelFuture 对象作为这个异步操作的“代言人”,表示异步操作本身。如果想获取到该异步操作的返回值,可以通过该异步操作对象的addListener() 方法为该异步操作添加监 NIO 网络编程框架 Netty 听器,为其注册回调:当结果出来后马上调用执行。
Netty 的异步编程模型都是建立在 Future 与回调概念之上的。
3、Netty Demo编写
3.1 总体框架
3.2 具体代码
Maven 依赖
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.4</version>
</dependency>
<dependency>
<groupId>com.spotify</groupId>
<artifactId>docker-client</artifactId>
<version>5.0.1</version>
</dependency>
</dependencies>
Netty客户端
- NettyClient类
public class NettyClient implements RpcClient {
private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
private String host;
private int port;
private static final Bootstrap bootstrap;
public NettyClient(String host, int port) {
this.host = host;
this.port = port;
}
static {
EventLoopGroup group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline
.addLast(new CommonDecoder())
.addLast(new CommonEncoder(new JsonSerializer()))
.addLast(new NettyClientHandler());
}
});
}
@Override
public Object sendRequest(Request request) {
try {
ChannelFuture future = bootstrap.connect(host, port).sync();
logger.info("客户端连接到服务器 {}:{}", host, port);
Channel channel = future.channel();
if (channel != null) {
channel.writeAndFlush(request).addListener(future1 -> {
if (future1.isSuccess()) {
logger.info(String.format("客户端发送消息: %s", request.toString()));
} else {
logger.error("发送消息时有错误发生: ", future1.cause());
}
});
channel.closeFuture().sync();
AttributeKey<Response> key = AttributeKey.valueOf("Response");//注意这里的发送是非阻塞的,所以发送后会立刻返回,而无法得到结果。这里通过 AttributeKey 的方式阻塞获得返回结果:
logger.info("key=====" + key);
Response response = channel.attr(key).get();
logger.info("rpcResponse.getData()========" + response.getData());
return response.getData();
}
} catch (InterruptedException e) {
logger.error("发送消息时有错误发生: ", e);
}
return null;
}
}
- NettyClientHandler类
public class NettyClientHandler extends SimpleChannelInboundHandler<Response> {
private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, Response msg) throws Exception {
try {
logger.info(String.format("客户端接收到消息: %s", msg));
AttributeKey<Response> key = AttributeKey.valueOf("Response");//接收返回的值
ctx.channel().attr(key).set(msg);
ctx.channel().close();
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("过程调用时有错误发生:");
cause.printStackTrace();
ctx.close();
}
}
Netty服务
- NettyServer类
public class NettyServer implements RpcServer {
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
@Override
public void start(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
logger.info("开始启动服务器,端口为:"+port);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.SO_BACKLOG, 256)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new CommonEncoder(new JsonSerializer()));
pipeline.addLast(new CommonDecoder());
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture future = serverBootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
logger.error("启动服务器时有错误发生: ", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- NettyServerHandler类
public class NettyServerHandler extends SimpleChannelInboundHandler<Request> {
private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
private static ServiceRegistry serviceRegistry;
static {
serviceRegistry = new ServiceRegistryImpl();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Request msg) throws Exception {
try {
logger.info("服务器接收到请求: {}", msg);
String methodName = msg.getMethodName();
String result = serviceRegistry.getService(methodName);
ChannelFuture future = ctx.writeAndFlush(Response.success(result));
future.addListener(ChannelFutureListener.CLOSE);
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("处理过程调用时有错误发生:");
cause.printStackTrace();
ctx.close();
}
}
service
- NettyClient
public interface NettyClient {
Object sendRequest(Request request);
}
- NettyServer
public interface NettyServer {
void start(int port);
}
- ServiceRegistry
public interface ServiceRegistry {
<T> void register(String methodName);
String getService(String serviceName);
}
Impl
- ServiceRegistry Impl类 服务注册
public class ServiceRegistryImpl implements ServiceRegistry {
private static final Logger logger = LoggerFactory.getLogger(ServiceRegistry.class);
private static final Map<String, String> serviceMap = new ConcurrentHashMap<>();
private static final Set<String> registeredService = ConcurrentHashMap.newKeySet();
@Override
public synchronized <T> void register(String methodName) {
logger.info("service1==="+methodName);
if(registeredService.contains(methodName)) return;
registeredService.add(methodName);
serviceMap.put(methodName,methodName+"执行结果返回");
logger.info("方法: {} 执行结果: {}", methodName, methodName+"执行结果返回");
}
@Override
public synchronized String getService(String methodName) {
String result = serviceMap.get(methodName);
if(result == null) {
throw new NettyException(NettyError.SERVICE_NOT_FOUND);
}
return result;
}
}
endecode包序列化
- CommonDecoder类
public class CommonDecoder extends ReplayingDecoder {
private static final Logger logger = LoggerFactory.getLogger(CommonDecoder.class);
private static final int MAGIC_NUMBER = 0xCAFEBABE;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magic = in.readInt();
if(magic != MAGIC_NUMBER) {
logger.error("不识别的协议包: {}", magic);
throw new NettyException(NettyError.UNKNOWN_PROTOCOL);
}
int packageCode = in.readInt();
Class<?> packageClass;
if(packageCode == PackageType.REQUEST_PACK.getCode()) {
packageClass = Request.class;
} else if(packageCode == PackageType.RESPONSE_PACK.getCode()) {
packageClass = Response.class;
} else {
logger.error("不识别的数据包: {}", packageCode);
throw new NettyException(NettyError.UNKNOWN_PACKAGE_TYPE);
}
int serializerCode = in.readInt();
CommonSerializer serializer = CommonSerializer.getByCode(serializerCode);
if(serializer == null) {
logger.error("不识别的反序列化器: {}", serializerCode);
throw new NettyException(NettyError.UNKNOWN_SERIALIZER);
}
logger.info("序列化器为:"+serializer);
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes);
Object obj = serializer.deserialize(bytes, packageClass);
out.add(obj);
}
}
- CommonEncoder类
public class CommonEncoder extends MessageToByteEncoder {
private static final int MAGIC_NUMBER = 0xCAFEBABE;
private final CommonSerializer serializer;
public CommonEncoder(CommonSerializer serializer) {
this.serializer = serializer;
}
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
out.writeInt(MAGIC_NUMBER);
if(msg instanceof Request) {
out.writeInt(PackageType.REQUEST_PACK.getCode());
} else {
out.writeInt(PackageType.RESPONSE_PACK.getCode());
}
out.writeInt(serializer.getCode());
byte[] bytes = serializer.serialize(msg);
out.writeInt(bytes.length);
out.writeBytes(bytes);
}
}
- CommonSerializer
public interface CommonSerializer {
byte[] serialize(Object obj);
Object deserialize(byte[] bytes, Class<?> clazz);
int getCode();
static CommonSerializer getByCode(int code) {
switch (code) {
case 1:
return new JsonSerializer();
default:
return null;
}
}
}
- JsonSerializer
public class JsonSerializer implements CommonSerializer {
private static final Logger logger = LoggerFactory.getLogger(JsonSerializer.class);
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public byte[] serialize(Object obj) {
try {
return objectMapper.writeValueAsBytes(obj);
} catch (JsonProcessingException e) {
logger.error("serialize序列化时有错误发生:", e);
throw new SerializeException("序列化时有错误发生");
}
}
@Override
public Object deserialize(byte[] bytes, Class<?> clazz) {
try {
Object obj = objectMapper.readValue(bytes, clazz);
// if (obj instanceof Request) {
// obj = handleRequest(obj);
// }
return obj;
} catch (IOException e) {
logger.error("deserialize序列化时有错误发生:", e);
throw new SerializeException("序列化时有错误发生");
}
}
/*
这里由于使用JSON序列化和反序列化Object数组,无法保证反序列化后仍然为原实例类型
需要重新判断处理
*/
// private Object handleRequest(Object obj) throws IOException {
// Request request = (Request) obj;
// for (int i = 0; i < request.getParamTypes().length; i++) {
// Class<?> clazz = request.getParamTypes()[i];
// if (!clazz.isAssignableFrom(request.getParameters()[i].getClass())) {
// byte[] bytes = objectMapper.writeValueAsBytes(request.getParameters()[i]);
// request.getParameters()[i] = objectMapper.readValue(bytes, clazz);
// }
// }
// return request;
// }
@Override
public int getCode() {
return SerializerCode.valueOf("JSON").getCode();
}
}
entity包请求返回类
- Request类
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Request implements Serializable {
/**
* 待调用方法名称
*/
private String methodName;
}
- Response类
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Response<T> implements Serializable {
/**
* 响应对应的请求号
*/
private String requestId;
/**
* 响应状态码
*/
private Integer statusCode;
/**
* 响应状态补充信息
*/
private String message;
/**
* 响应数据
*/
private T data;
public static <T> Response<T> success(T data, String requestId) {
Response<T> response = new Response<>();
response.setRequestId(requestId);
response.setStatusCode(ResponseCode.SUCCESS.getCode());
response.setData(data);
return response;
}
public static <T> Response<T> success(T data) {
Response<T> response = new Response<>();
// response.setRequestId(requestId);
response.setStatusCode(ResponseCode.SUCCESS.getCode());
response.setData(data);
return response;
}
public static <T> Response<T> fail(ResponseCode code, String requestId) {
Response<T> response = new Response<>();
response.setRequestId(requestId);
response.setStatusCode(code.getCode());
response.setMessage(code.getMessage());
return response;
}
public static <T> Response<T> fail(ResponseCode code) {
Response<T> response = new Response<>();
// response.setRequestId(requestId);
response.setStatusCode(code.getCode());
response.setMessage(code.getMessage());
return response;
}
}
enumeration包枚举类
- NettyError
@AllArgsConstructor
@Getter
public enum NettyError {
UNKNOWN_ERROR("出现未知错误"),
SERVICE_SCAN_PACKAGE_NOT_FOUND("启动类ServiceScan注解缺失"),
CLIENT_CONNECT_SERVER_FAILURE("客户端连接服务端失败"),
SERVICE_INVOCATION_FAILURE("服务调用出现失败"),
SERVICE_NOT_FOUND("找不到对应的服务"),
SERVICE_NOT_IMPLEMENT_ANY_INTERFACE("注册的服务未实现接口"),
UNKNOWN_PROTOCOL("不识别的协议包"),
UNKNOWN_SERIALIZER("不识别的(反)序列化器"),
UNKNOWN_PACKAGE_TYPE("不识别的数据包类型"),
SERIALIZER_NOT_FOUND("找不到序列化器"),
RESPONSE_NOT_MATCH("响应与请求号不匹配"),
FAILED_TO_CONNECT_TO_SERVICE_REGISTRY("连接注册中心失败"),
REGISTER_SERVICE_FAILED("注册服务失败");
private final String message;
}
- PackageType
@AllArgsConstructor
@Getter
public enum PackageType {
REQUEST_PACK(0),
RESPONSE_PACK(1);
private final int code;
}
- ResponseCode
@AllArgsConstructor
@Getter
public enum ResponseCode {
SUCCESS(200, "调用方法成功"),
FAIL(500, "调用方法失败"),
METHOD_NOT_FOUND(500, "未找到指定方法"),
CLASS_NOT_FOUND(500, "未找到指定类");
private final int code;
private final String message;
}
- SerializerCode 字节流中标识序列化和反序列化器
@AllArgsConstructor
@Getter
public enum SerializerCode {
KRYO(0),
JSON(1),
HESSIAN(2),
PROTOBUF(3);
private final int code;
}
exception 包 异常类
- NettyException
public class NettyException extends RuntimeException {
public NettyException(NettyError error, String detail) {
super(error.getMessage() + ": " + detail);
}
public NettyException(String message, Throwable cause) {
super(message, cause);
}
public NettyException(NettyError error) {
super(error.getMessage());
}
}
- SerializeException
public class SerializeException extends RuntimeException {
public SerializeException(String msg) {
super(msg);
}
}
Test 包测试类
- NettyTestServer
public class NettyTestServer {
public static void main(String[] args) {
ServiceRegistry registry = new ServiceRegistryImpl();
registry.register("方法A");
NettyServer server = new NettyServer();
server.start(9999);
}
}
- NettyTestClient
public class NettyTestClient {
public static void main(String[] args) {
Request request = new Request();
request.setMethodName("方法A");
NettyClient nettyClient = new NettyClient("127.0.0.1", 9999);
System.out.println(nettyClient.sendRequest(request));
}
}
测试结果,先启动服务端再启动客户端
服务端:
客户端:
4、交流群