1.引入netty依赖
<dependencies> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.92.Final</version> </dependency> </dependencies>
2.准备一个服务端ChatServer和一个服务端处理器ChatServerHandler和若干个客户端ChatClient(有几个用户设置几个)和一个客户端处理器ChatClientHandler
服务端 public class ChatServer { public static void main(String[] args) { // 新建两个事件循环组,bossGroup 用于监听客户端的连接请求,将连接请求发送给 workerGroup 用于处理客户端连接的数据读写 NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); ch.pipeline().addLast(new ChatServerHandler()); } }); // 服务端绑定9000端口 serverBootstrap.bind(9000); } public void test(){ NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workGroup); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); nioSocketChannel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); nioSocketChannel.pipeline().addLast(new ChatServerHandler()); } }); serverBootstrap.bind(9001); } }
服务端处理器
public class ChatServerHandler extends ChannelInboundHandlerAdapter { // 创建一个list集合存储连接上的所有客户端channel private static List<Channel> channels = new ArrayList<>(); // 当有客户端连接上服务端,底层会调用此方法,执行此方法的逻辑 // 这里大概的处理逻辑是:先添加新客户端channel到集合中,然后循环遍历list集合 // 然后根据不同的channel发送不同的系统消息 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channels.add(channel); channels.forEach(ch -> { if (ch == channel) { channel.writeAndFlush("恭喜您,上线成功"); } else { ch.writeAndFlush("系统消息:[" + ch.remoteAddress() + "]客户端已上线"); } }); System.out.println("客户端[" + channel.remoteAddress() + "]请求连接"); } // 当有客户端连断开连接,底层会调用此方法,执行此方法的逻辑 // 这里大概的处理逻辑是:然后循环遍历list集合,找到已断开连接的channel并删除 // 向集合内的channel发送系统消息 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); Iterator<Channel> iterator = channels.iterator(); while (iterator.hasNext()) { Channel ch = iterator.next(); if (ch == channel) { iterator.remove(); } ch.writeAndFlush("系统消息:[" + channel.remoteAddress() + "]客户端已下线"); } System.out.println("客户端[" + channel.remoteAddress() + "]断开连接"); } // 当有客户端发送消息到服务端,底层会调用此方法,执行此方法的逻辑 // 这里大概的处理逻辑是:循环遍历向集合中的channel(除发送消息的客户端)发送消息 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Channel channel = ctx.channel(); SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date = new Date(System.currentTimeMillis()); String now = formatter.format(date); System.out.println(now+"收到用户[" + channel.remoteAddress() + "]发来的消息:" + msg.toString()); channels.forEach(ch -> { if (ch != channel) { ch.writeAndFlush("消息时间:" + now + " 用户[" + channel.remoteAddress() + "]说:" + msg.toString()); } }); } //异常触发 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
客户端
public class ChatClient { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup workGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workGroup); bootstrap.channel(NioSocketChannel.class); // 设置处理器 bootstrap.handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { // 将字符串编解码器及客户端处理器添加到pipeline中 ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8)); ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); ch.pipeline().addLast(new ChatClientHandler()); } }); // 连接服务端 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000); channelFuture.sync(); // 获取客户端输入的内容,并发送至服务端(因为添加了字符串编解码器,所以此处可以直接发送字符串) Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { channelFuture.channel().writeAndFlush(scanner.nextLine()); } } }
客户端处理器
public class ChatClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg.toString()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
如果需要多个客户端用户,将客户端多复制几个就可以了.
启动时先启动服务端再启动客户端