Java网络编程(二)NIO和Netty实现多人聊天功能

news2025/1/16 14:09:33

NIO实现

服务端

package com.bierce.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
//服务器端
public class NIOChatServer {
public static void main(String[] args) {
	try {
		new NIOChatServer().startServer(); //启动服务器
	} catch (IOException e) {
		throw new RuntimeException(e);
	}
}
//服务端启动方法
public void startServer() throws IOException {
	//1. 创建Selector选择器
	Selector selector = Selector.open();
	//2.创建ServerSocketChannel通道
	ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
	//3.channel通道绑定监听端口,并设置为非阻塞模式
	serverSocketChannel.bind(new InetSocketAddress(8000));
	serverSocketChannel.configureBlocking(false);
	//4.channel注册到selector上,设置为就绪接收状态
	serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
	System.out.println("Server starts Successfully!");
	//5.自旋,实时监听客户端状态
	for (;;) {
		//获取所有就绪的Channel
		int readChannels = selector.select();
		if (readChannels == 0){ //没有接入的客户端
			continue;
		}
		Set<SelectionKey> selectionKeys = selector.selectedKeys();
		//遍历可用的Channel
		Iterator<SelectionKey> iterator = selectionKeys.iterator();
		while (iterator.hasNext()){
			SelectionKey selectionKey = iterator.next();
			//6.根据就绪状态,调用对应方法实现具体业务操作
			//6.1 accept状态
			if (selectionKey.isAcceptable()){
				acceptOperator(serverSocketChannel,selector);
			}
			//6.2 readable状态
			if (selectionKey.isReadable()){
				readOperator(selector,selectionKey);
			}
			iterator.remove(); //获取成功后没有必要保留需要移除
		}
	}
}
//处理可读状态的方法实现
private void readOperator(Selector selector, SelectionKey selectionKey) throws IOException {
	//从SelectionKey获取到已经就绪的通道
	SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
	ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
	//自旋读取客户端信息
	int readLength = socketChannel.read(byteBuffer);
	String message = "";
	if (readLength > 0){
		byteBuffer.flip(); //切换读模式
		message += Charset.forName("UTF-8").decode(byteBuffer);
	}
	//注册channel到selector,监听可读状态
	socketChannel.register(selector, SelectionKey.OP_READ);
	if (message.length() > 0){
		//把客户端发送过来的信息广播到其他客户端
		System.out.println(message);
		castOtherClients(message,selector,socketChannel);
	}
}
//把客户端发送的消息广播给其他客户端
private void castOtherClients(String message, Selector selector, SocketChannel socketChannel) throws IOException {
	//获取所有已经接入的Channel
	Set<SelectionKey> selectionKeys = selector.keys();
	//对除自己以外的channel进行广播
	for (SelectionKey selectionKey:selectionKeys) {
		//获取每个Channel
	   Channel targetChannel = selectionKey.channel();
	   if (targetChannel instanceof SocketChannel && targetChannel != socketChannel){ //排除服务器以及发送方客户端自己
		   ((SocketChannel) targetChannel).write(Charset.forName("UTF-8").encode(message));
	   }
	}
}
//处理可接收状态的方法实现
private void acceptOperator(ServerSocketChannel serverSocketChannel, Selector selector) throws IOException {
	//接入状态:创建socketChannel
	SocketChannel socketChannel = serverSocketChannel.accept();
	//socketChannel设置为非阻塞并注册到selector上
	socketChannel.configureBlocking(false);
	socketChannel.register(selector,SelectionKey.OP_READ);
	//回复给客户端信息
	socketChannel.write(Charset.forName("UTF-8")
			.encode("Welcome to MyChatRoom, Please notice your Info!")); //UTF-8编码
}
}

客户端

package com.bierce.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner;
//客户端实现
public class NIOChatClient {
public void startClient(String name) throws IOException {
	//客户端连接服务器端
	SocketChannel socketChannel =
			SocketChannel.open(new InetSocketAddress("127.0.0.1",8000));
	//接收服务器端响应数据
	Selector selector = Selector.open();
	socketChannel.configureBlocking(false);
	socketChannel.register(selector, SelectionKey.OP_READ);
	//创建客户端线程
	new Thread(new ClientThread(selector)).start();
	//模拟向服务器发送消息
	Scanner sc = new Scanner(System.in);
	while (sc.hasNextLine()){
		String msg = sc.nextLine();
		if (msg.length() >0){
			socketChannel.write(Charset.forName("UTF-8").encode(name + ":" + msg));
		}
	}
}
}
class ClientThread implements Runnable{
    private Selector selector;
    public ClientThread(Selector selector) {
        this.selector = selector;
    }
    @Override
    public void run() {
        try {
            for (;;) {
                //获取所有就绪的Channel
                int readChannels = selector.select();
                if (readChannels == 0){ //没有接入的客户端
                    continue;
                }
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                //遍历可用的Channel
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()){
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove(); //获取成功后没有必要保留需要移除
                    //readable状态
                    if (selectionKey.isReadable()){
                        readOperator(selector,selectionKey);
                    }
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    //处理可读状态的方法实现
    private void readOperator(Selector selector, SelectionKey selectionKey) throws IOException {
        //从SelectionKey获取到已经就绪的通道
        SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        //自旋读取客户端信息
        int readLength = socketChannel.read(byteBuffer);
        String message = "";
        if (readLength > 0){
            byteBuffer.flip(); //切换读模式
            message += Charset.forName("UTF-8").decode(byteBuffer);
        }
        //注册channel到selector,监听可读状态
        socketChannel.register(selector, SelectionKey.OP_READ);
        if (message.length() > 0){
            System.out.println(message); //该客户端控制台输出服务端发送过来的信息
        }
    }
}
//客户端A
package com.bierce.io;
import java.io.IOException;
public class TestAClient {
    public static void main(String[] args) {
        try {
            new NIOChatClient().startClient("A");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
//客户端B
package com.bierce.io;
import java.io.IOException;
public class TestBClient {
    public static void main(String[] args) {
        try {
            new NIOChatClient().startClient("B");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

效果图

Netty实现

服务端

package com.bierce.io.netty.chatGroup;
/**
 * 简易群聊功能Netty服务端实现
 */
public class ChatGroupServer {
    private int port;
    private String nowTime = "北京时间-" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss")) + ": ";
    public ChatGroupServer(int port) {
        this.port = port;
    }
    public void run(){
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(); //默认初始8个线程

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) //设置为服务端通道
                    .option(ChannelOption.SO_BACKLOG,128)
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder",new StringDecoder());
                            pipeline.addLast("encoder",new StringEncoder());
                            pipeline.addLast(new ChatGroupServerHandler(nowTime)); //自定义业务处理handler
                        }
                    });
            System.out.println(nowTime + "NettyServer start Successful !!!");
            ChannelFuture cf = serverBootstrap.bind(port).sync();
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) {
        new ChatGroupServer(9999).run();
    }
}
class ChatGroupServerHandler extends SimpleChannelInboundHandler<String> {
    //定义Channel组,管理所有的Channel,其中GlobalEventExecutor.INSTANCE为全局事件执行器
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private String nowTime; //北京时间
    public ChatGroupServerHandler(String nowTime) {
        this.nowTime = nowTime;
    }
    //channelActive方法表示活动状态,如提示xxx上线
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(nowTime + "[客户端]" + ctx.channel().remoteAddress() + " 上线了~\n");
    }
    //channelActive方法表示非活动状态,如提示xxx离线
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(nowTime + "[客户端]" + ctx.channel().remoteAddress() + " 已离线~\n");
    }
    //handlerAdded方法表示连接建立后立刻执行
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();;
        //将该客户端发送的消息进行广播,writeAndFlush方法会遍历所有Channel并发送消息
        channelGroup.writeAndFlush(nowTime + "[客户端]" + channel.remoteAddress() + " 进入聊天室\n");
        channelGroup.add(channel);
        System.out.println(nowTime + "有新进来的客户端,当前在线客户端数量=" + channelGroup.size());
    }
    //handlerRemoved方法表示断开连接,如将该客户端离开信息发送给在线的其他客户端
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush(nowTime + "[客户端]" + ctx.channel().remoteAddress() + " 已离开了~\n");
        System.out.println(nowTime + "有刚离开的客户端,当前在线客户端数量=" + channelGroup.size());
    }
    //读取客户端信息并作出响应
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.forEach(ch -> {
            if (channel != ch){ //给非自己的其他客户端广播信息
                ch.writeAndFlush(nowTime + "[客户端]" + ctx.channel().remoteAddress() + "发送了消息:" + msg + "\n");
            }else {
                ch.writeAndFlush(nowTime + "[客户端自己]" + ctx.channel().remoteAddress() + "发送了消息:" + msg + "\n");
            }
        });
    }
    //发生异常时关闭该通道Channel
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端

package com.bierce.io.netty.chatGroup;
/**
 * 简易群聊功能Netty客户端实现
 */
public class ChatGroupClient {
    private String host;
    private int port;
    private String nowTime = "北京时间-" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss")) + ": ";
    public ChatGroupClient(String host, int port){
        this.host = host;
        this.port = port;
    }
    public void run(){
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder",new StringDecoder());
                            pipeline.addLast("encoder",new StringEncoder());
                            pipeline.addLast(new ChatGroupClientHandler(nowTime)); //自定义业务处理handler
                        }
                    });
            ChannelFuture cf = bootstrap.connect(host, port).sync();
            Channel channel = cf.channel();
            System.out.println(nowTime + "Client"+ channel.localAddress()+ " start Successful!!!");
            Scanner sc = new Scanner(System.in);
            while (sc.hasNextLine()){
                String msg = sc.nextLine();
                channel.writeAndFlush(msg + "\r\n");
            }
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) {
        new ChatGroupClient("127.0.0.1",9999).run();
    }
}
class ChatGroupClientHandler extends SimpleChannelInboundHandler<String> {
    private String nowTime;
    public ChatGroupClientHandler(String nowTime) {
        this.nowTime = nowTime;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(nowTime + msg.trim() + "{,前面消息都是服务端广播的信息~~~~}");
    }
}

效果图

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/914639.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于Python科研论文绘制学习 - task2

Matplotlib 1、subplot&#xff08;&#xff09; matplotlib.pyplot模块提供了一个 subplot() 函数&#xff0c;它可以均等地划分画布&#xff0c;该函数的参数格式如下&#xff1a; plt.subplot(nrows, ncols, index) nrows 与 ncols 表示要划分几行几列的子区域&#xff0…

Jmeter性能 —— 事务控制器

统计性能测试结果一定会关注TPS&#xff0c;TPS表示&#xff1a;每秒处理事务数&#xff0c;JMeter默认每个事务对应一个请求。我们可以用逻辑控制器中的事务控制器将多个请求统计为一个事务。 1、添加事务控制器 2、事务控制器参数说明 Generate parent sample&#xff1a;如…

Spring Framework

文章目录 一 Spring Framework简介 1.1 Spring Framework系统架构 1.2 对spring的理解 1.3 设计理念 二 核心 1. IoC 容器 1.1. Spring IoC容器和Bean简介 1.2. 容器概述 1.2.1. 配置元数据 1.2.2. 实例化一个容器 构建基于XML的配置元数据 Groovy Bean Definitio…

传递函数零极点对系统的影响

传递函数的零点和极点分别对系统的影响的详细介绍&#xff1a; 零点&#xff08;Zero&#xff09;的影响&#xff1a;传递函数的零点是使得传递函数的分子为零的点。零点对系统的频率响应和稳定性产生影响。具体而言&#xff1a; 频率响应&#xff1a;零点的位置会影响系统在不…

Linux常用配置(持续记录)

写在前面&#xff1a;学的东西太多&#xff0c;一个健忘的程序猿&#xff0c;只记得那啥那啥&#xff0c;这东西好像是这个&#xff0c;哎&#xff0c;又忘了。现在就记在这个小本本上&#xff0c;方便日后来回顾。 全局配置host 命令&#xff1a; vim /etc/hosts 作用&#…

使用EventLog Analyzer 进行路由器监控

路由器是任何计算机网络的构建块&#xff0c;引导网络中的流量&#xff0c;管理员需要确保路由器已配置并正常工作&#xff0c;以确保网络安全。 监控路由器中的用户活动 在网络安全方面&#xff0c;与路由器相关的风险是一个严重的问题。具有松散安全策略的网络使入侵者可以…

PCI9054入门1:硬件引脚定义、时序、FPGA端驱动源码

文章目录 1&#xff1a;PCI9054的FPGA侧&#xff08;local侧引脚定义&#xff09;2&#xff1a;PCI9054的C模式下的读写时序3&#xff1a;FPGA代码部分具体代码&#xff1a; 1&#xff1a;PCI9054的FPGA侧&#xff08;local侧引脚定义&#xff09; 而PCI9054的本地总线端的主要…

基于Googlenet深度学习网络的交通工具种类识别matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 ....................................................................................% 获…

PHP反序列化 字符串逃逸

前言 最近在打西电的新生赛&#xff0c;有道反序列化的题卡了很久&#xff0c;今天在NSS上刷题的时候突然想到做法&#xff0c;就是利用字符串逃逸去改变题目锁死的值&#xff0c;从而实现绕过 为了研究反序列化的字符串逃逸 我们先简单的测试下 原理 <?php class escape…

大数据——一文熟悉HBase

1、HBase是什么 HBase是基于HDFS的数据存储&#xff0c;它建立在HDFS文件系统上面&#xff0c;利用了HDFS的容错能力&#xff0c;内部还有哈希表并利用索引&#xff0c;可以快速对HDFS上的数据进行随时读写功能。 Hadoop在已经有一个HiveMapReduce结构的数据读写功能&#x…

Apollo自动驾驶:引领未来的智能出行

自动驾驶技术正日益成为当今科技领域的焦点&#xff0c;它代表着未来出行的一大趋势&#xff0c;而Baidu公司推出的Apollo自动驾驶平台则在这一领域中展现出强大的领导地位。本文将深入探讨Apollo自动驾驶技术的关键特点、挑战以及它对未来智能出行的影响。 Apollo自动驾驶平台…

QT基础教程之二 第一个Qt小程序

QT基础教程之二 第一个Qt小程序 按钮的创建 在Qt程序中&#xff0c;最常用的控件之一就是按钮了&#xff0c;首先我们来看下如何创建一个按钮 QPushButton * btn new QPushButton; 头文件 #include <QPushButton>//设置父亲btn->setParent(this);//设置文字btn-&g…

C++进阶 多线程相关

本篇博客介绍&#xff1a; 主要介绍C中的一些线程操作以及线程库 C进阶 多线程相关 为什么要有线程库线程库介绍线程库常见的接口构造线程对象获取线程idjoin和deteach mutex库原子操作相关条件变量库总结 为什么要有线程库 我们在Linux中写多线程的时候使用的是Linux下提供的…

Windows使用MobaXterm远程访问ubuntu20.04桌面

参考ubuntu 2020.4 安装vnc 一、脚本文件 remote_setup.sh脚本文件内容&#xff1a; #! /bin/bash #参考链接&#xff1a;https://blog.csdn.net/hailangdeyingzi/article/details/124507304 sudo apt update sudo apt install x11vnc -y sudo x11vnc -storepasswd telpo.12…

【论文阅读】POIROT:关联攻击行为与内核审计记录以寻找网络威胁(CCS-2019)

POIROT: Aligning Attack Behavior with Kernel Audit Records for Cyber Threat Hunting CCS-2019 伊利诺伊大学芝加哥分校、密歇根大学迪尔伯恩分校 Milajerdi S M, Eshete B, Gjomemo R, et al. Poirot: Aligning attack behavior with kernel audit records for cyber thre…

Apache Paimon 实时数据湖 Streaming Lakehouse 的存储底座

摘要&#xff1a;本文整理自阿里云开源大数据表存储团队负责人&#xff0c;阿里巴巴高级技术专家李劲松&#xff08;之信&#xff09;&#xff0c;在 Streaming Lakehouse Meetup 的分享。内容主要分为四个部分&#xff1a; 流计算邂逅数据湖 Paimon CDC 实时入湖 Paimon 不止…

关于打包多模块SpringBoot项目并通过宝塔上传服务器

打包 —— 如何打包多模块项目&#xff0c;参考b站up主&#xff1a;[喜欢编程的代先生] 的视频 总结&#xff1a;1. 对着视频里看一下父模块和各个子模块pom.xml文件关于打包工具的依赖是否正确。2. 从最底层开始打包&#xff0c;逐层向上&#xff0c;最后再合并打包。 部署 …

网络安全(大厂)面试题

以下为网络安全各个方向涉及的面试题&#xff0c;星数越多代表问题出现的几率越大&#xff0c;祝各位都能找到满意的工作。 注&#xff1a;本套面试题&#xff0c;已整理成pdf文档&#xff0c;但内容还在持续更新中&#xff0c;因为无论如何都不可能覆盖所有的面试问题&#xf…

采购合同有哪些类型?应注意哪些内容?

采购合同为建立和管理客户与供应商的关系奠定了基础。在合同中&#xff0c;卖方同意向买方提供符合特定规格的用品&#xff0c;或承担买方的项目&#xff0c;并提供一个确定的价格&#xff08;通常有批量折扣&#xff09;。 作为回报&#xff0c;买方同意接收并支付一定数量的…

在ubuntu上部署label-studio

1. 安装label-studio 由于服务器的默认python3版本太低&#xff0c;尝试了很多方法&#xff0c;没有升级。因此采用annaconda方式安装label-studio. a.安装anaconda: 参照如下链接&#xff0c;安装anaconda。 Ubuntu安装Anaconda详细步骤&#xff08;Ubuntu22.04.1&#xff…