Socket网络编程(五)——TCP数据发送与接收并行

news2024/12/23 19:46:16

目录

  • 主要实现需求
  • TCP 服务端收发并行重构
    • 启动main方法重构
    • 重构分离收发消息的操作
    • 重构接收消息的操作
    • 重构发送消息
    • TCPServer调用发送消息的逻辑
    • 监听客户端链接逻辑重构
    • Socket、流的退出与关闭
  • TCP 客户端收发并行重构
    • 客户端 main函数重构
    • 客户端接收消息重构
    • 客户端发送消息重构
    • 客户端 linkWith 主方法重构
  • TCP 收发并行重构测试
    • 服务端重构后执行日志
    • 客户端重构后执行日志
  • 源码下载

主要实现需求

多线程收发并行
TCP多线程收发协作
TCP 服务端收发并行重构

TCP 服务端收发并行重构

启动main方法重构

原有的main逻辑如下:
20240229-034932-Jk.png

重构后如下:

public class Server {
 
    public static void main(String[] args) throws IOException {
 
        TCPServer tcpServer = new TCPServer(TCPConstants.PORT_SERVER);
        boolean isSucceed = tcpServer.start();
        if(!isSucceed){
            System.out.println("Start TCP server failed.");
        }
        UDPProvider.start(TCPConstants.PORT_SERVER);
 
        // 键盘输入:
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
        String str;
        do {
            str = bufferedReader.readLine();
            tcpServer.broadcast(str);
        } while (!"00bye00".equalsIgnoreCase(str));
 
        UDPProvider.stop();
        tcpServer.stop();
    }
}

重构后,从while循环不断读取键盘输入信息,当输入“00bye00” 时退出读取。此处只读取键盘输入数据,客户端发送的数据在会重新拆分出来新的线程单独处理。

重构分离收发消息的操作

创建 ClientHandler.java 重构收发消息操作:

public class ClientHandler {
    private final Socket socket;
    private final ClientReadHandler readHandler;
    private final ClientWriteHandler writeHandler;
    private final CloseNotiry closeNotiry;
 
    public ClientHandler(Socket socket, CloseNotiry closeNotiry ) throws IOException {
        this.socket = socket;
        this.readHandler = new ClientReadHandler(socket.getInputStream());
        this.writeHandler = new ClientWriteHandler(socket.getOutputStream());
        this.closeNotiry = closeNotiry;
        System.out.println("新客户链接: " + socket.getInetAddress() + "\tP:" + socket.getPort());
    } 
}

重构接收消息的操作

    /**
     * 接收数据
     */
    class ClientReadHandler extends Thread {
 
        private boolean done = false;
        private final InputStream inputStream;
 
        ClientReadHandler(InputStream inputStream){
            this.inputStream = inputStream;
        }
        @Override
        public void run(){
            super.run();
            try {
                // 得到输入流,用于接收数据
                BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));
                do {
                    // 客户端拿到一条数据
                    String str = socketInput.readLine();
                    if(str == null){
                        System.out.println("客户端已无法读取数据!");
                        // 退出当前客户端
                        ClientHandler.this.exitBySelf();
                        break;
                    }
                    // 打印到屏幕
                    System.out.println(str);
                }while (!done);
                socketInput.close();
            }catch (IOException e){
                if(!done){
                    System.out.println("连接异常断开");
                    ClientHandler.this.exitBySelf();
                }
            }finally {
                // 连接关闭
                CloseUtils.close(inputStream);
            }
        }
        void exit(){
            done = true;
            CloseUtils.close(inputStream);
        }
    }

创建一个单独的线程进行接收消息,该线程不需要关闭。

重构发送消息

    /**
     * 发送数据
     */
    class ClientWriteHandler {
        private boolean done = false;
        private final PrintStream printStream;
        private final ExecutorService executorService;
 
        ClientWriteHandler(OutputStream outputStream) {
            this.printStream = new PrintStream(outputStream);
            // 发送消息使用线程池来实现
            this.executorService = Executors.newSingleThreadExecutor();
        }
 
        void exit(){
            done = true;
            CloseUtils.close(printStream);
            executorService.shutdown();
        }
 
        void send(String str) {
            executorService.execute(new WriteRunnable(str));
        }
 
        class WriteRunnable implements  Runnable{
            private final String msg;
 
            WriteRunnable(String msg){
                this.msg = msg;
            }
            @Override
            public void run(){
                if(ClientWriteHandler.this.done){
                    return;
                }
                try {
                    ClientWriteHandler.this.printStream.println(msg);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }

TCPServer调用发送消息的逻辑

    public void broadcast(String str) {
        for (ClientHandler client : clientHandlerList){
            // 发送消息
            client.send(str);
        }
    }

监听客户端链接逻辑重构

    private List<ClientHandler> clientHandlerList = new ArrayList<>();
 
    /**
     * 监听客户端链接
     */
    private class ClientListener extends Thread {
        private ServerSocket server;
        private boolean done = false;
 
        private ClientListener(int port) throws IOException {
            server = new ServerSocket(port);
            System.out.println("服务器信息: " + server.getInetAddress() + "\tP:" + server.getLocalPort());
        }
 
        @Override
        public void run(){
            super.run();
 
            System.out.println("服务器准备就绪~");
            // 等待客户端连接
            do{
                // 得到客户端
                Socket client;
                try {
                    client = server.accept();
                }catch (Exception e){
                    continue;
                }
                try {
                    // 客户端构建异步线程
                    ClientHandler  clientHandler = new ClientHandler(client, handler -> clientHandlerList.remove(handler));
                    // 启动线程
                    clientHandler.readToPrint();
                    clientHandlerList.add(clientHandler);
                } catch (IOException e) {
                    e.printStackTrace();
                    System.out.println("客户端连接异常: " + e.getMessage());
                }
 
            }while (!done);
            System.out.println("服务器已关闭!");
        }
        void exit(){
            done = true;
            try {
                server.close();
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    }

clientHandlerList作为已经建立了连接的客户端的集合,用于管理当前用户的信息。接收与发送都使用该集合。

Socket、流的退出与关闭

    /**
     * 退出、关闭流
     */
    public void exit(){
        readHandler.exit();
        writeHandler.exit();
        CloseUtils.close(socket);
        System.out.println("客户端已退出:" + socket.getInetAddress() + "\tP:" + socket.getPort());
    }
 
    /**
     * 发送消息
     * @param str
     */
    public void send(String str){
        writeHandler.send(str);
    }
 
    /**
     * 接收消息
     */
    public void readToPrint() {
        readHandler.exit();
    }
 
    /**
     *  接收、发送消息异常,自动关闭
     */
    private void exitBySelf() {
        exit();
        closeNotiry.onSelfClosed(this);
    }
    /**
     *  关闭流
     */
    public interface CloseNotiry{
        void onSelfClosed(ClientHandler handler);
    }

TCP 客户端收发并行重构

客户端 main函数重构

    public static void main(String[] args) {
        // 定义10秒的搜索时间,如果超过10秒未搜索到,就认为服务器端没有开机
        ServerInfo info = UDPSearcher.searchServer(10000);
        System.out.println("Server:" + info);
 
        if( info != null){
            try {
                TCPClient.linkWith(info);
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    }

客户端接收消息重构

    static class ReadHandler extends Thread{
        private boolean done = false;
        private final InputStream inputStream;
 
        ReadHandler(InputStream inputStream){
            this.inputStream = inputStream;
        }
        @Override
        public void run(){
            try {
                // 得到输入流,用于接收数据
                BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));
                do {
                    // 客户端拿到一条数据
                    String str = null;
                    try {
                        str = socketInput.readLine();
                    }catch (SocketTimeoutException e){
 
                    }
                    if(str == null){
                        System.out.println("连接已关闭,无法读取数据!");
                        break;
                    }
                    // 打印到屏幕
                    System.out.println(str);
                }while (!done);
                socketInput.close();
            }catch (IOException e){
                if(!done){
                    System.out.println("连接异常断开:" + e.getMessage());
                }
            }finally {
                // 连接关闭
                CloseUtils.close(inputStream);
            }
        }
        void exit(){
            done = true;
            CloseUtils.close(inputStream);
        }
    }

创建ReadHandler用单独的线程去接收服务端的消息。连接关闭则exit() 关闭客户端。

客户端发送消息重构

    private static void write(Socket client) throws IOException {
        // 构建键盘输入流
        InputStream in = System.in;
        BufferedReader input = new BufferedReader(new InputStreamReader(in));
 
        // 得到Socket输出流,并转换为打印流
        OutputStream outputStream = client.getOutputStream();
        PrintStream socketPrintStream = new PrintStream(outputStream);
 
        boolean flag = true;
        do {
            // 键盘读取一行
            String str = input.readLine();
            // 发送到服务器
            socketPrintStream.println(str);
 
            // 从服务器读取一行
            if("00bye00".equalsIgnoreCase(str)){
                break;
            }
        }while(flag);
        // 资源释放
        socketPrintStream.close();
    }

在linkWith() 中调用write() 发送方法,由 do-while 循环读取本地键盘输入信息进行发送操作。当满足 “00bye00” 时,关闭循环,关闭socket连接,结束该线程。

客户端 linkWith 主方法重构

     public static void linkWith(ServerInfo info) throws IOException {
        Socket socket = new Socket();
        // 超时时间
        socket.setSoTimeout(3000);
        // 端口2000;超时时间300ms
        socket.connect(new InetSocketAddress(Inet4Address.getByName(info.getAddress()),info.getPort()));//
 
        System.out.println("已发起服务器连接,并进入后续流程~");
        System.out.println("客户端信息: " + socket.getLocalAddress() + "\tP:" + socket.getLocalPort());
        System.out.println("服务器信息:" + socket.getInetAddress() + "\tP:" + socket.getPort());
 
        try {
            ReadHandler readHandler = new ReadHandler(socket.getInputStream());
            readHandler.start();
            // 发送接收数据
            write(socket);
        }catch (Exception e){
            System.out.println("异常关闭");
        }
 
        // 释放资源
        socket.close();
        System.out.println("客户端已退出~");
    }

原有的逻辑里,是调用 todo() 方法,在todo() 方法里同时进行收发操作。现在是进行读写分离。

TCP 收发并行重构测试

服务端重构后执行日志

20240229-053719-hC.png

客户端重构后执行日志

20240229-053740-Qt.png

源码下载

下载地址:https://gitee.com/qkongtao/socket_study/tree/master/src/main/java/cn/kt/socket/SocketDemo_L5_TCP_Channel

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1482707.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

日常科研中经常使用的命令

Linux目录树状结构 1. Windows是磁盘分区&#xff0c;Linux不区分盘符&#xff0c;所有文件都在根目录斜线下面&#xff1b; 2. 根目录显示不同&#xff0c;Linux是一个斜线&#xff0c;而windows是盘符&#xff0c;然后冒号&#xff1b; 3. 分割目录Linux用斜线&#xff0c…

Odoo迈入开源第一低代码开发平台的重要里程碑

Odoo17的正式发布已经过去好几个月了&#xff0c;通过一段时间的运用&#xff0c;最大的感触就是&#xff0c;Odoo会成为企业管理软件低代码开发平台的重要一员&#xff0c;而V17则会成为这个过程中具有里程碑意义的版本。 时隔四个月&#xff0c;让我们回头来看看Odoo17带来的…

国防科大、中大0706大气科学及南信大大物院初试复习宝典——简答题知识点汇总

Attention&#xff01;有思维导图版 有Xmind导图格式的&#xff0c;更适合复习&#xff0c;下面是xmind导出的markdown格式&#xff0c;丑死了&#xff0c;可以移步去网盘查看 链接&#xff1a;https://pan.baidu.com/s/1ZcqfPWoyMd6eXf3_jvSVzQ?pwd7ij7 提取码&#xff1a;…

Node.js基础---Express中间件

1. 概念 1.什么是中间件 中间件(Middleware)&#xff0c;特指业务流程的中间处理环节 2. Express 中间件的调用流程 当一个请求到达 Express 的服务器后&#xff0c;可以连续调用多个中间件&#xff0c;从而对这次请求进行预处理 3. Express 中间件格式 Express 的中间件&…

Docker技术概论(1):Docker与虚拟化技术比较

Docker技术概论&#xff08;1&#xff09; Docker与虚拟化技术比较 - 文章信息 - Author: 李俊才 (jcLee95) Visit me at: https://jclee95.blog.csdn.netMy WebSite&#xff1a;http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddress of this article:https:…

鸿蒙学习day1基础语法 基础变量类型

在这里插入图片描述 什么是变量&#xff1a;变量就是一些数据 如125&#xff0c;‘字符串数据’ 通过一个符号来表示 变量的定义 方法 let 变量名&#xff1a;变量类型 ’ 各种数据’ ,let是关键字&#xff0c;系统给的用来定义变量的 let name: string 张亚洲; let age: …

MATLAB中function_handle函数用法

目录 说明 创建对象 示例 命名函数求积分 匿名函数求积分 function_handle函数所表示的是函数的句柄。 说明 函数句柄是一种表示函数的 MATLAB 数据类型。函数句柄的典型用法是将函数传递给另一个函数。例如&#xff0c;可以将函数句柄用作基于某个值范围计算数学表达式的…

Mybatis 主从表有名字相同,只能查询出一条数据

Mybatis 主从表有名字相同&#xff0c;只能查询出一条数据 重新命名后&#xff0c;可以正常查询

网络爬虫部分应掌握的重要知识点

目录 一、预备知识1、Web基本工作原理2、网络爬虫的Robots协议 二、爬取网页1、请求服务器并获取网页2、查看服务器端响应的状态码3、输出网页内容 三、使用BeautifulSoup定位网页元素1、首先需要导入BeautifulSoup库2、使用find/find_all函数查找所需的标签元素 四、获取元素的…

AI-数学-高中-29-样本的数字特征(标准差、方差)

原作者视频&#xff1a;【统计】【一数辞典】3样本的数字特征_哔哩哔哩_bilibili 标准差(s)、方差(S^2)公式&#xff1a;判断数据的稳定性。

137.乐理基础-协和音程、不协和音程

内容参考于&#xff1a; 三分钟音乐社 上一个内容&#xff1a;136.旋律音程、和声音程、自然音程、变化音程 上一个内容里练习的答案&#xff1a; 所有音程都可以分成协和音程与不协和音程两大类 协和音程又分三个小类&#xff1a; 第一个小类叫极完全协和音程&#xff0c;就…

特殊设计模式

▶实现一个类&#xff0c;不能被拷贝 ▶实现一个类&#xff0c;只能在堆上创建 ❗实现一个类&#xff0c;只能创建在栈上 ❗设计一个不能继承的类 ❗单例模式——一个类只能生成一个对象   ❔饿汉模式——在每次程序启动都会自动生成一个对象   ❓懒汉模式——在第一次需要…

202435读书笔记|《半小时漫画中国史》——读点经济学与历史,生活更美好,趣味烧脑土地制度、商鞅变法、华丽丽的丝绸之路这里都有

202435读书笔记|《半小时漫画中国史》——读点经济学与历史&#xff0c;生活更美好&#xff0c;趣味烧脑土地制度、商鞅变法、华丽丽的丝绸之路这里都有 1. 土地政策、度量衡及税收2. 商鞅变法3. 西汉经济4. 西汉盐铁大辩论5. 西汉丝绸之路 《半小时漫画中国史&#xff1a;经济…

​MPV,汽车产品里一个特殊品类的进化过程

「汽车」可能是整个工业革命以来&#xff0c;所诞生出的最有趣的工业产品。 它不仅能产生工业的机械美&#xff0c;还诞生了一个独立的文化体系&#xff0c;在汽车的发展过程中&#xff0c;我们也能看到一些本来应功能而诞生的产品&#xff0c;最终走向了千家万户。 MPV 就是…

Qt5.9.9交叉编译(带sqlite3、OpenSSL)

1、交叉编译工具链 这里ARM平台是ARM CortexA9的&#xff0c;一般交叉编译工具链demo板厂商都会提供&#xff0c;若未提供或想更换新版本的交叉编译工具链可参考以下方式获取。 1.1 下载适用于ARM CortexA9的交叉编译工具链 Linaro Releases下载gcc4的最新版xxxx-i686_arm-li…

使用 Docker 部署 Fiora 在线聊天室平台

一、Fiora 介绍 Fiora 简介 Fiora 是一款开源免费的在线聊天系统。 GitHub&#xff1a;https://github.com/yinxin630/fiora Fiora 功能 注册账号并登录&#xff0c;可以长久保存你的数据加入现有群组或者创建自己的群组&#xff0c;来和大家交流和任意人私聊&#xff0c;并添…

Cocos Creator 3.8.x 后效处理(前向渲染)

关于怎么开启后效效果我这里不再赘述&#xff0c;可以前往Cocos官方文档查看具体细节&#xff1a;后效处理官网 下面讲一下怎么自己定义一个后处理效果&#xff0c;想添加自己的后效处理的话只需要在postProcess节点下添加一个BlitScreen 组件即可&#xff0c;然后自己去添加自…

ElasticSearch搜索引擎使用指南

一、ES数据基础类型 1、数据类型 字符串 主要包括: text和keyword两种类型&#xff0c;keyword代表精确值不会参与分词&#xff0c;text类型的字符串会参与分词处理 数值 包括: long, integer, short, byte, double, float 布尔值 boolean 时间 date 数组 数组类型不…

div在vue的组件之中如何设置这个字体的颜色和样式大小

在Vue组件中设置<div>的字体颜色和样式大小可以通过两种主要方式实现&#xff1a;通过内联样式&#xff08;inline styles&#xff09;或者通过CSS类&#xff08;CSS classes&#xff09;。 使用内联样式 在Vue模板中直接在元素上使用style属性来设置样式。这种方法适用…

2024年春招小红书前端实习面试题分享

文章目录 导文面试重点一、方便介绍一下&#xff0c;你之前实习都做了什么嘛&#xff1f;二、 可以讲一下封装组件相关逻辑嘛&#xff1f;1. 为什么要封装组件&#xff1f;2. 封装组件的步骤3. 封装组件的原则4. 组件的复用和扩展5. 组件的维护和文档 三、项目的性能优化你有什…