1.服务端构建
接收客户端请求,并打印请求消息;消息使用Netty内置的String编解码器(StringDecoder/StringEncoder)进行编码与解码;开启控制台输入监听线程,将输入的消息发送至所有已连接的客户端。
1.1 服务端消息处理类
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
 * Inbound handler that processes messages received on a server-side channel.
 *
 * <p>It keeps a shared registry of the currently connected channels in
 * {@link #channelList}, prints every inbound message to standard output, and
 * closes the connection when an exception reaches it.
 *
 * @author : luobei
 * @date : 2024/10/23 11:16
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Channels that are currently connected.
     *
     * <p>Entries are added in {@link #channelActive} and removed in
     * {@link #channelInactive}, both of which run on Netty I/O threads, while
     * other threads may iterate the list to broadcast. A
     * {@link CopyOnWriteArrayList} makes that iteration safe without external
     * locking: iterators work on a snapshot and never throw
     * {@code ConcurrentModificationException}.
     */
    public static List<Channel> channelList = new CopyOnWriteArrayList<Channel>();

    /**
     * Registers the newly connected channel.
     *
     * @param ctx context of the channel that just became active
     * @throws Exception if propagating the event fails
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        channelList.add(ctx.channel());
    }

    /**
     * Unregisters a channel once it is no longer connected, so the registry
     * does not grow without bound or hand out dead channels.
     *
     * @param ctx context of the channel that just became inactive
     * @throws Exception if propagating the event fails
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        channelList.remove(ctx.channel());
        super.channelInactive(ctx);
    }

    /**
     * Prints the received message to standard output.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the inbound message, printed via string concatenation
     * @throws Exception declared by the overridden method; not thrown here
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服务端收到消息:" + msg);
    }

    /**
     * Reports the failure and closes the affected connection.
     *
     * @param ctx   context of the channel on which the error occurred
     * @param cause the error that was raised
     * @throws Exception declared by the overridden method; not thrown here
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("服务端读取数据异常:");
        cause.printStackTrace();
        // Closing the channel leads to channelInactive, which unregisters it.
        ctx.close();
    }
}
1.2 服务端启动类
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
 * Netty server bootstrap: binds to a port, prints whatever clients send, and
 * broadcasts every line typed on standard input to the connected clients.
 *
 * @author : luobei
 * @date : 2024/10/23 11:03
 */
public class NettyServerProvider {

    /** TCP port the server listens on. */
    private final int port;

    /**
     * @param port TCP port to bind when {@link #start()} is called
     */
    public NettyServerProvider(int port) {
        this.port = port;
    }

    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * <p>Both event loop groups are shut down when this method exits, whether
     * normally or because binding failed.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for the bind or for the channel to close
     */
    public void start() throws InterruptedException {
        // Accepts incoming connections.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // Handles the I/O of accepted connections; the boss group hands each
        // accepted connection over to this group.
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)           // NIO-based server channel
                    .option(ChannelOption.SO_BACKLOG, 128)           // pending-connection queue size
                    .childOption(ChannelOption.SO_KEEPALIVE, true)   // TCP keep-alive on accepted sockets
                    .handler(new LoggingHandler(LogLevel.INFO))      // log server-channel events at INFO
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socket) throws Exception {
                            ChannelPipeline pipeline = socket.pipeline();
                            // Inbound: decodes received bytes into String.
                            pipeline.addLast("decode", new StringDecoder());
                            // Outbound: encodes String messages into bytes.
                            pipeline.addLast("encode", new StringEncoder());
                            // Business handler for the decoded messages.
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("--------------服务端启动--------------");
            // Bind first: if binding fails, no console thread is left behind.
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            startConsoleBroadcaster();
            // Block until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Starts a daemon thread that reads lines from standard input and sends
     * each one to every active client channel.
     *
     * <p>The thread is a daemon so that it cannot keep the JVM alive after the
     * server has shut down, and it stops when standard input reaches
     * end-of-stream.
     */
    private void startConsoleBroadcaster() {
        Thread console = new Thread(new Runnable() {
            @Override
            public void run() {
                // One reader for the whole session: creating a new
                // BufferedReader per line can discard input that the previous
                // reader had already buffered.
                BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
                try {
                    String msg;
                    // readLine() returns null at end-of-stream; stop then
                    // instead of looping forever or writing a null message.
                    while ((msg = in.readLine()) != null) {
                        // NOTE(review): Netty I/O threads also mutate this list;
                        // confirm channelList is a concurrency-safe implementation.
                        for (Channel channel : NettyServerHandler.channelList) {
                            if (channel.isActive()) {
                                channel.writeAndFlush(msg);
                            }
                        }
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }, "server-console-broadcaster");
        console.setDaemon(true);
        console.start();
    }

    public static void main(String[] args) throws InterruptedException {
        new NettyServerProvider(8888).start();
    }
}
2.客户端构建
发起请求,与服务端建立连接;监听服务端下发的消息,并将消息打印出来;开启控制台输入监听线程,将消息发送至服务端。
2.1 客户端消息处理类
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Inbound handler for the client side: remembers the channel connected to the
 * server, prints every message received from it, and closes the connection
 * when an exception reaches it.
 *
 * @author : luobei
 * @date : 2024/10/23 13:15
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * The channel currently connected to the server, or {@code null} when
     * there is no active connection.
     *
     * <p>Declared {@code volatile} because it is written in channel callbacks
     * and exposed publicly for other threads to read.
     */
    public static volatile Channel serverChannel = null;

    /**
     * Records the channel once the connection is established.
     *
     * @param ctx context of the channel that just became active
     * @throws Exception if propagating the event fails
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        serverChannel = ctx.channel();
    }

    /**
     * Clears {@link #serverChannel} when the connection is lost, so callers do
     * not keep using a closed channel.
     *
     * @param ctx context of the channel that just became inactive
     * @throws Exception if propagating the event fails
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // Only clear the reference if it still points at this channel; a newer
        // connection may already have replaced it.
        if (serverChannel == ctx.channel()) {
            serverChannel = null;
        }
        super.channelInactive(ctx);
    }

    /**
     * Prints the received message to standard output.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the inbound message, printed via string concatenation
     * @throws Exception declared by the overridden method; not thrown here
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("客户端收到消息:" + msg);
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   context of the channel on which the error occurred
     * @param cause the error that was raised
     * @throws Exception declared by the overridden method; not thrown here
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("客户端读取数据异常:");
        cause.printStackTrace();
        ctx.close();
    }
}
2.2 客户端启动类
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
 * Netty client bootstrap: connects to the server, sends an initial greeting,
 * prints whatever the server sends, and forwards every line typed on standard
 * input to the server.
 *
 * @author : luobei
 * @date : 2024/10/23 13:20
 */
public class NettyClientServer {

    /** Address of the server to connect to. */
    private final String ip;

    /** TCP port of the server. */
    private final int port;

    /**
     * @param ip   address of the server to connect to
     * @param port TCP port of the server
     */
    public NettyClientServer(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    /**
     * Connects to the server and blocks until the connection is closed.
     *
     * <p>The event loop group is shut down when this method exits, whether
     * normally or because the connection attempt failed. Without that, the
     * group's threads would keep the JVM alive after a failed connect or a
     * disconnect.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *                              connecting or while waiting for the channel to close
     */
    private void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // Inbound: decodes received bytes into String.
                            pipeline.addLast("decode", new StringDecoder());
                            // Outbound: encodes String messages into bytes.
                            pipeline.addLast("encode", new StringEncoder());
                            // Business handler for the decoded messages.
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            System.out.println("-----------客户端启动-----------");
            ChannelFuture future = bootstrap.connect(ip, port).sync();
            final Channel channel = future.channel();
            channel.writeAndFlush("客户端发起连接请求");
            startConsoleSender(channel);
            // Block until the connection is closed, then release resources.
            channel.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    /**
     * Starts a daemon thread that reads lines from standard input and sends
     * each one to the server.
     *
     * <p>The thread stops when standard input reaches end-of-stream or when
     * the channel is found inactive after a line has been read. The connection
     * is left open at end-of-stream, so server messages can still be received.
     *
     * @param channel the connected channel to write to
     */
    private void startConsoleSender(final Channel channel) {
        Thread console = new Thread(new Runnable() {
            @Override
            public void run() {
                // One reader for the whole session: creating a new
                // BufferedReader per line can discard input that the previous
                // reader had already buffered.
                BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
                try {
                    String line;
                    // readLine() returns null at end-of-stream; a null message
                    // must never be passed to writeAndFlush.
                    while ((line = reader.readLine()) != null) {
                        if (!channel.isActive()) {
                            break;
                        }
                        channel.writeAndFlush(line);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }, "client-console-sender");
        // Daemon: must not keep the JVM alive once the connection is gone.
        console.setDaemon(true);
        console.start();
    }

    public static void main(String[] args) throws InterruptedException {
        new NettyClientServer("127.0.0.1", 8888).start();
    }
}