SpringBoot 集成 Netty
文章目录
SpringBoot 集成 Netty 背景描述 Netty与SpringBoot整合关注点 Netty组件 Bootstrap、ServerBootstrap Channel EventLoop、EventLoopGroup ChannelHandler ChannelPipeline ByteBuf
Pom依赖 Yml 配置 整合Netty步骤
背景描述
如果需要在SpringBoot
开发的app
中,提供Socket
服务,那么Netty
是不错的选择。
Netty与SpringBoot整合关注点
Netty
跟Springboot
生命周期保持一致,同生共死Netty
能用上ioc
中的Bean
Netty
能读取到全局的配置
Netty组件
Bootstrap、ServerBootstrap
帮助 Netty
使用者更加方便地组装和配置 Netty
,也可以更方便地启动 Netty
应用程序 Bootstrap
用于启动一个 Netty TCP
客户端,或者 UDP
的一端。ServerBootstrap
往往是用于启动一个 Netty
服务端。
Channel
Channel
是 Netty
网络操作抽象类,它除了包括基本的 I/O
操作,如 bind、connect、read、write
之外,还包括了 Netty
框架相关的一些功能,如获取该 Channel
的 EventLoop
。其实就是我们平常网络编程中经常使用的socket
套接字对象
EventLoop、EventLoopGroup
EventLoop
定义了Netty
的核心对象,用于处理IO
事件,多线程模型、并发一个EventLoopGroup
包含一个或者多个EventLoop
一个EventLoop
在它的生命周期内只和一个Thread
绑定 所有有EventLoop
处理的I/O
事件都将在它专有的Thread
上被处理 一个Channel
在它的生命周期内只注册于一个EventLoop
一个EventLoop
可能会被分配给一个或者多个Channel
ChannelHandler
ChannelHandler
其实就是用于负责处理接收和发送数据的的业务逻辑,Netty
中可以注册多个handler
,以链式的方式进行处理,根据继承接口的不同,实现的顺序也不同。ChannelHandler
主要用于对出站和入站数据进行处理,它有两个重要的子接口:
ChannelInboundHandler
——处理入站数据ChannelOutboundHandler
——处理出站数据
ChannelPipeline
ChannelPipeline
是ChannelHandler
的容器,通过ChannelPipeline
可以将ChannelHandler
组织成一个逻辑链,该逻辑链可以用来拦截流经Channel
的入站和出站事件,当 Channel
被创建时,它会被自动地分配到它的专属的 ChannelPipeline
。
ByteBuf
ByteBuf
就是字节缓冲区,用于高效处理输入输出。
Pom依赖
引入springboot starter web
和 netty
< ! -- SpringBoot 初始化依赖 -- >
< dependency>
< groupId> org. springframework. boot< / groupId>
< artifactId> spring- boot- starter- web< / artifactId>
< version> 2.3 .5 . RELEASE < / version>
< / dependency>
< ! -- https: / / mvnrepository. com/ artifact/ io. netty/ netty- all -- >
< dependency>
< groupId> io. netty< / groupId>
< artifactId> netty- all< / artifactId>
< version> 4.1 .85 . Final< / version>
< / dependency>
Yml 配置
server :
port : 2345
netty :
websocket :
port : 1024
ip : 0.0.0.0
max-frame-size : 10240
path : /channel
整合Netty步骤
服务端
使用 SpringBoot Runner
机制启动 Netty
服务。
@Component
@Order
public class NettyStartListener implements ApplicationRunner {
@Resource
private SocketServer socketServer;
@Override
public void run ( ApplicationArguments args) {
this . socketServer. start ( ) ;
}
}
@Component
public class SocketServer {
private static final Logger logger = LoggerFactory . getLogger ( SocketServer . class ) ;
private ServerBootstrap serverBootstrap;
@Autowired
private SocketInitializer socketInitializer;
@Value ( "${netty.websocket.port}" )
private int port;
public void start ( ) {
this . init ( ) ;
this . serverBootstrap. bind ( this . port) ;
logger. info ( "Netty started on port: {} (TCP) with boss thread {}" , this . port, 2 ) ;
}
private void init ( ) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup ( 2 ) ;
NioEventLoopGroup workerGroup = new NioEventLoopGroup ( ) ;
this . serverBootstrap = new ServerBootstrap ( ) ;
this . serverBootstrap. group ( bossGroup, workerGroup) . channel ( NioServerSocketChannel . class ) . childHandler ( this . socketInitializer) ;
}
}
public class SocketHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory . getLogger ( SocketHandler . class ) ;
public static final ChannelGroup clients = new DefaultChannelGroup ( GlobalEventExecutor . INSTANCE ) ;
@Override
public void channelRead ( ChannelHandlerContext ctx, Object msg) {
byte [ ] data = ( byte [ ] ) msg;
log. info ( "收到消息: " + new String ( data) ) ;
for ( Channel client : clients) {
if ( ! client. equals ( ctx. channel ( ) ) ) {
client. writeAndFlush ( data) ;
}
}
}
@Override
public void handlerAdded ( ChannelHandlerContext ctx) {
log. info ( "新的客户端链接:" + ctx. channel ( ) . id ( ) . asShortText ( ) ) ;
clients. add ( ctx. channel ( ) ) ;
}
@Override
public void handlerRemoved ( ChannelHandlerContext ctx) {
clients. remove ( ctx. channel ( ) ) ;
}
@Override
public void exceptionCaught ( ChannelHandlerContext ctx, Throwable cause) {
cause. printStackTrace ( ) ;
ctx. channel ( ) . close ( ) ;
clients. remove ( ctx. channel ( ) ) ;
}
}
@Component
public class SocketInitializer extends ChannelInitializer < SocketChannel > {
@Override
protected void initChannel ( SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel. pipeline ( ) ;
pipeline. addLast ( new ByteArrayDecoder ( ) ) ;
pipeline. addLast ( new ByteArrayEncoder ( ) ) ;
pipeline. addLast ( new SocketHandler ( ) ) ;
}
}
客户端
public class ChatClient {
public void start ( String name) throws IOException {
SocketChannel socketChannel = SocketChannel . open ( new InetSocketAddress ( "127.0.0.1" , 1024 ) ) ;
socketChannel. configureBlocking ( false ) ;
Selector selector = Selector . open ( ) ;
socketChannel. register ( selector, SelectionKey . OP_READ ) ;
new Thread ( new ClientThread ( selector) ) . start ( ) ;
Scanner scanner = new Scanner ( System . in) ;
while ( scanner. hasNextLine ( ) ) {
String message = scanner. next ( ) ;
if ( StringUtils . hasText ( message) ) {
socketChannel. write ( StandardCharsets . UTF_8 . encode ( name + ": " + message) ) ;
}
}
}
private class ClientThread implements Runnable {
private final Logger logger = LoggerFactory . getLogger ( ClientThread . class ) ;
private final Selector selector;
public ClientThread ( Selector selector) {
this . selector = selector;
}
@Override
public void run ( ) {
try {
while ( true ) {
int channels = selector. select ( ) ;
if ( channels == 0 ) {
continue ;
}
Set < SelectionKey > selectionKeySet = selector. selectedKeys ( ) ;
Iterator < SelectionKey > keyIterator = selectionKeySet. iterator ( ) ;
while ( keyIterator. hasNext ( ) ) {
SelectionKey selectionKey = keyIterator. next ( ) ;
keyIterator. remove ( ) ;
if ( selectionKey. isReadable ( ) ) {
handleRead ( selector, selectionKey) ;
}
}
}
} catch ( IOException e) {
logger. error ( e. getMessage ( ) , e) ;
}
}
}
private void handleRead ( Selector selector, SelectionKey selectionKey) throws IOException {
SocketChannel channel = ( SocketChannel ) selectionKey. channel ( ) ;
ByteBuffer byteBuffer = ByteBuffer . allocate ( 1024 ) ;
StringBuilder message = new StringBuilder ( ) ;
if ( channel. read ( byteBuffer) > 0 ) {
byteBuffer. flip ( ) ;
message. append ( StandardCharsets . UTF_8 . decode ( byteBuffer) ) ;
}
channel. register ( selector, SelectionKey . OP_READ ) ;
System . out. println ( message) ;
}
}
public static void main ( String [ ] args) throws IOException {
new ChatClient ( ) . start ( "张三" ) ;
}