前言:Netty 作为主流的nio 通信模型应用相当广泛,本文在spring-boot 项目中集成Netty,并实现客户端以及服务器端消息的接收和发送;本文是 Spring架构篇–2.7 远程通信基础–使用Netty 的扩展;
1 spring-boot jar包引入:引入的jar 和解释如下:
<!-- springboot-web 用于发送http 请求 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- lombok 用于发送生成java 对象的get set 和构造方法 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<!-- lombok 依赖项目不被maven 传递 -->
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- netty jar :https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.51.Final</version>
</dependency>
<!-- Json 格式化 https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.31</version>
</dependency>
2 服务端和客户端:
2.1 服务端:
2.1.1 WebNettyServer 服务端:
import com.example.nettydemo.netty.handler.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@Slf4j
@Component
public class WebNettyServer {
private final static Logger logger = LoggerFactory.getLogger(WebNettyServer.class);
private final static int PORT = 8888;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private EventLoopGroup extGroup;
private ChannelFuture channelFuture;
@PostConstruct
public void start() throws InterruptedException {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
extGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加解码器
pipeline.addLast("解码器",new StringDecoder());
// 添加编码器
pipeline.addLast("编码器",new StringEncoder());
// 添加自己的处理器
pipeline.addLast("ext",new NettyServerHandler());
// 如果业务调用比较耗时,为了不影响netty 服务端处理器请求的性能,可以使用另外的NioEventLoopGroup 进行handler 处理
// pipeline.addLast(extGroup,"ext",new NettyServerHandler());
}
})
// 连接请求时的等待队列
.option(ChannelOption.SO_BACKLOG,128)
// 设置TCP连接是否开启长连接
.childOption(ChannelOption.SO_KEEPALIVE,true);
// 绑定端口,开启服务
channelFuture = bootstrap.bind(PORT).sync();
if (channelFuture.isSuccess()) {
logger.debug("Netty server started on port {}", PORT);
}
}
@PreDestroy
public void stop() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
extGroup.shutdownGracefully();
}
}
这里对code做下简要的说明:
- 通过NioEventLoopGroup 分别定义了负责接收客户端连接的的bossGroup,定义负责客户端数据的读和服务端写回数据到客户端的workerGroup,定义额外的事件处理extGroup用于负责业务handler 比较耗时的操作;
- NioServerSocketChannel.class 用于服务端使用NioServerSocketChannel 对象,完成对channel 管道事件的处理;
- StringDecoder 用于对客户端数据的解码,StringEncoder 用于对服务端发送数据进行编码,NettyServerHandler 为业务类处理解码后的数据;
- ChannelOption.SO_BACKLOG 设置 ServerSocketChannel 的可连接队列大小,用于处理连接请求时的等待队列,即 TCP 握手完成后等待被应用程序处理的连接队列,在系统内核建立连接之前,连接请求会先进入队列中等待被处理。当 ServerSocketChannel 与客户端连接成功后,会创建一个新的 SocketChannel 与该客户端进行通信,如果队列满了,新的连接请求就会被拒绝。 不同操作系统下 backlog 参数的默认值可能不同,通常建议显式设置它的值。在 Windows 操作系统下,它的默认值为 200,而在 Linux 操作系统下,它的默认值为 128;
- ChannelOption.SO_KEEPALIVE是一个TCP参数,它用于设置TCP连接是否开启长连接。当设置为true时,TCP连接会在一定时间内没有数据传输时发送心跳包,以保持连接状态;当设置为false时,TCP连接会在一定时间内没有数据传输时直接关闭连接。
- bootstrap.bind(PORT).sync() 绑定端口,启动监听,sync 同步阻塞直到服务端启动成功;
- @Component 告诉spring 需要扫描改类并进行装配;
- @PostConstruct 告诉spring 改类被springboot 装备之后需要执行定义好的 start() 方法;
- @PreDestroy 告诉spring 在容器关闭之前需要执行定义好的stop() 方法,这里stop() 方法里对定义的事件轮询处理组进行优雅的关闭(优雅是指,事件轮询处理组在完成所有任务之后在进行关闭操作);
2.1.1 服务端 NettyServerHandler 业务处理:
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
@Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {
private final static Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
logger.debug("Received message: {}", msg,ctx);
JSONObject json = JSONObject.parseObject(msg);
String type = json.getString("type");
Object content = json.get("content");
JSONObject resultJson = new JSONObject();
resultJson.put("id", json.getString("id"));
resultJson.put("type", type);
switch (type) {
case "user":
// 进入 userService 业务处理
JSONArray jsonArray;
Map<String, Object> map;
if (content instanceof JSONArray) {
jsonArray = (JSONArray) content;
// 处理 jsonArray
// ...
} else if (content instanceof JSONObject) {
map = (Map<String, Object>) content;
// 处理 map
// ...
}
resultJson.put("success", true);
break;
case "order":
// 进入 orderService 业务处理
resultJson.put("success", true);
break;
default:
resultJson.put("success", false);
resultJson.put("errorMsg", "unsupported type");
}
// 回写结果到客户端
log.debug("回写结果:{}到客户端",resultJson.toJSONString());
ctx.channel().writeAndFlush(resultJson.toJSONString());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
code 比较简单,在服务端的处理器中,首先将接收到的字符串解析成一个 JSONObject 对象。然后根据 type 字段进行业务分发,可以在不同的分支中获取 content 字段并根据内容进行业务处理。处理完成后,将结果封装成一个新的 JSONObject 对象,并回写到客户端。需要注意的是,Netty 自带的 StringDecoder 和 StringEncoder 可以直接处理字符串,因此不需要手动进行 Json 转换。同时,回写结果时需要使用 ctx.channel().writeAndFlush() 方法,而不是之前的 channelFuture.channel().writeAndFlush()。
2.2 客户端:
2.2.1 客户端WebNettyClient:
import com.example.nettydemo.netty.handler.NettyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@Slf4j
@Component
@DependsOn("webNettyServer")
public class WebNettyClient {
private final static Logger logger = LoggerFactory.getLogger(WebNettyClient.class);
private final String host = "127.0.0.1"; // 服务端地址
private final int port = 8888; // 服务端端口
private ChannelFuture channelFuture; // 连接通道
private EventLoopGroup eventLoopGroup;
@PostConstruct
public void start() throws InterruptedException {
eventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加解码器
pipeline.addLast(new StringDecoder());
// 添加编码器
pipeline.addLast(new StringEncoder());
// 添加自己的处理器
pipeline.addLast(new NettyClientHandler());
}
});
// 连接服务端
channelFuture = bootstrap.connect(host, port).sync();
if (channelFuture.isSuccess()) {
logger.debug("连接服务端成功");
}
}
public void sendMessage(String message) {
channelFuture.channel().writeAndFlush(message);
}
@PreDestroy
public void stop() {
eventLoopGroup.shutdownGracefully();
}
}
2.2.1 客户端业务处理:
package com.example.nettydemo.netty.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 接收到服务端消息时的处理
log.debug("Received from server:{} ", msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客户端的code 基本和服务端一致,客户端接收了服务端的回写数据并进行了打印,其中@DependsOn(“webNettyServer”) 告诉spring 在装载客户端的bean 时,先要去装载服务端的bean;
2.3 web controller 客户端发送数据测试:
controller:
import com.alibaba.fastjson2.JSONObject;
import com.example.nettydemo.modules.user.dto.UserDto;
import com.example.nettydemo.netty.client.WebNettyClient;
import com.example.nettydemo.netty.data.NettyData;
import com.example.nettydemo.netty.server.WebNettyServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("user/")
public class UserController {
@Autowired
private WebNettyClient webNettyClient;
/**
* 客户端信息发送
* @return
*/
@GetMapping("client/msg")
public boolean testClientUser(){
// 处理业务
NettyData data = dealAndGetClientData();
webNettyClient.sendMessage(JSONObject.toJSONString(data));
return true;
}
private NettyData dealAndGetClientData() {
NettyData data = new NettyData();
data.setId("1").setType("user-client").setContent(new UserDto().setUserId("001").setUserName(" haha"));
return data;
}
}
UserDto:
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
@Data
@Accessors(chain = true)
public class UserDto implements Serializable {
private String userId;
private String userName;
}
NettyData:
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
@Data
@Accessors(chain = true)
public class NettyData implements Serializable {
private String id;
private String type;
private Object content;
}
可以看到服务端和客户端的结果接收情况:
3 总结:
- Netty 的服务端,通过定义bossGroup对客户端的accept 事件进行处理,然后通过定义的workerGroup 进行具体channel 内读写事件发生的处理;
- Netty在read数据时,会依次从头到尾调用 ChannelInboundHandlerAdapter 的hadler 进行数据的处理;
- Netty在write 数据时,如果使用SocketChannel ch进行数据写入,会依次从尾到头调用ChannelOutboundHandlerAdapter 的handler 进行数据处理,如果通过ChannelHandlerContext ctx会从当前处理器,向前找ChannelOutboundHandlerAdapter 的handler 进行数据处理;
- Netty 对于事件的处理是通过SocketChannel channel 中的pipeline 中每个handler 进行的处理,而pipeline 中维护的是handler 的双向链表;
4 git 项目代码:
https://codeup.aliyun.com/61cd21816112fe9819da8d9c/netty-demo.git
5 扩展:
5.1 使用EmbeddedChannel 来测试chanel 中handler 的处理走向:
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
/**
* 测试出入handler
*/
@Slf4j
public class TestEmbeddedChannel {
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 =new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("h1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 =new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("h2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("h3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("h4");
super.write(ctx, msg, promise);
}
};
EmbeddedChannel channel = new EmbeddedChannel(h1,h2,h3,h4);
// 测试read
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello read".getBytes(StandardCharsets.UTF_8)));
// 测试write
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello write".getBytes(StandardCharsets.UTF_8)));
}
}