Java 网络编程之TCP(五):分析服务端注册OP_WRITE写数据的各种场景(二)

news2024/11/19 5:26:29

接上文

二、注册OP_WRITE写数据

服务端代码:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * 基于NIO实现服务端,通过Selector基于事件驱动客户端的读取
 * 服务端接收到数据后,缓存,注册OP_WRITE事件,收到状态转发数据
 *
 */
class NIOSelectorOpWriteServer1 {
    Selector selector;

    public static void main(String[] args) throws IOException {
        NIOSelectorOpWriteServer1 server = new NIOSelectorOpWriteServer1();
        server.start(); // 开启监听和事件处理
    }

    public void start() {
        initServer();
        // selector非阻塞轮询有哪些感兴趣的事件到了
        doService();
    }

    private void doService() {
        if (selector == null) {
            System.out.println("server init failed, without doing read/write");
            return;
        }
        try {
            while (true) {
                while (selector.select() > 0) {
                    Set<SelectionKey> keys = selector.selectedKeys(); // 感兴趣且准备好的事件
                    Iterator<SelectionKey> iterator = keys.iterator(); // 迭代器遍历处理,后面要删除集合元素
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove(); // 删除当前元素,防止重复处理
                        // 下面根据事件进行分别处理
                        if (key.isAcceptable()) {
                            // 客户端连接事件
                            acceptHandler(key);
                        } else if (key.isReadable()) {
                            // 读取客户端数据
                            readHandler(key);
                        } else if (key.isWritable()) {
                            // 为了避免重复写,需要先去除OP_WRITE注册状态
                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                            writeHandler(key);
                        }
                    }
                }
            }
        } catch (IOException exception) {
            exception.printStackTrace();
        }
    }

    private void initServer() {
        try {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(9090));

            // 此时在selector上注册感兴趣的事件
            // 这里先注册OP_ACCEPT: 客户端连接事件
            selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("server init success");
        } catch (IOException exception) {
            exception.printStackTrace();
            System.out.println("server init failied");
        }
    }

    public void acceptHandler(SelectionKey key) {
        ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 获取客户端的channel
        try {
            SocketChannel client = server.accept();
            client.configureBlocking(false); // 设置client非阻塞
            System.out.println("server receive a client :" + client);
            // 注册OP_READ事件,用于从客户端读取数据
            // 给Client分配一个buffer,用于读取数据,注意buffer的线程安全
            ByteBuffer buffer = ByteBuffer.allocate(1024); // buffer这个参数注册的时候也可以不用
            client.register(key.selector(), SelectionKey.OP_READ, buffer);
        } catch (IOException exception) {
            exception.printStackTrace();
        }
    }

    public void readHandler(SelectionKey key) {
        System.out.println("read handler");
        SocketChannel client = (SocketChannel) key.channel(); // 获取客户端的channel
        ByteBuffer buffer = (ByteBuffer) key.attachment(); // 获取Client channel关联的buffer
        buffer.clear(); // 使用前clear

        // 防止数据分包,需要while循环读取
        try {
            while (true) {
                int readLen = client.read(buffer);
                if (readLen > 0) {
                    // 读取到数据了
                    buffer.flip();
                    byte[] data = new byte[buffer.limit()];
                    buffer.get(data);
                    System.out.println("server read data from " + client + ", data is :" + new String(data));
                    // 给其他客户端注册OP_WRITE;
                    registerWrite(client, data);
                } else if (readLen == 0) {
                    // 没读到数据
                    System.out.println(client + " : no data");
                    break;
                } else if (readLen < 0) {
                    // client 关闭连接
                    // 当客户端主动连接断开时,为了让服务器知道断开了连接,会产生OP_READ事件。所以需要判断读取长度,当读到-1时,关闭channel。
                    System.out.println(client + " close");
                    client.close();
                    break;
                }
            }
        } catch (IOException exception) {
            exception.printStackTrace();
            // client 关闭连接
            System.out.println(client + " disconnect");
            // todo:disconnect 导致一直有read事件,怎么办?
            try {
                client.close();
            } catch (IOException ex) {
                System.out.println("close ex");
            }
        }
    }

    public void writeHandler(SelectionKey key) {
        System.out.println("write handler");
        SocketChannel client = (SocketChannel) key.channel(); // 获取客户端的channel
        ByteBuffer buffer = (ByteBuffer) key.attachment(); // 获取Client channel关联的buffer,此时处于读模式

        try {
            while (buffer.hasRemaining()) {
                client.write(buffer);
            }
        } catch (IOException exception) {
            System.out.println("write error");
            exception.printStackTrace();
        }
    }

    private void registerWrite(SocketChannel myself, byte[] data) throws IOException {
        Set<SelectionKey> keys = selector.keys();
        // read/write 对应同一个key,同一个client不会发送两遍
        for (SelectionKey key : keys) {
            SelectableChannel channel = key.channel();
            if (channel instanceof SocketChannel && channel != myself) {
                key.interestOps(key.interestOps() + SelectionKey.OP_WRITE);
                ByteBuffer attachment = (ByteBuffer) key.attachment();
                attachment.clear();
                attachment.put(data);
                attachment.flip();
            }
        }
    }
}

这里有几个注意项:

1.在注册OP_WRITE时,需要给所有其他客户端注册;

2.注册OP_WRITE时:是使用key.interestOps(key.interestOps() + SelectionKey.OP_WRITE);避免对原来的OP_READ事件进行覆盖;在OP_WRITE事件来的时候,要把先把OP_WRITE事件去掉,key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); 防止重复写事件状态发生;

3.注册OP_WRITE时,要写的数据,直接给到了原来channel对应的attachment里了;在OP_WRITE事件来的时候,可以直接用;

到此,我们一定有个疑问:既然服务端关不关注OP_WRITE事件,都可以给客户端发送数据,意义何在?

那我们就要看下OP_WRITE事件的具备条件:send-queue是否有空间

而服务端要写数据要关注:服务端数据是否准备好了 + send-queue是否有空间

服务端一般都是在自己数据准备好了后,再注册对客户端的OP_WRITE事件。

但是上面的代码中,在给客户端写数据时,是一直写,直到数据写完,但是 send-queue空间有限,当 send-queue写满后,写操作就会阻塞,导致单线程下业务阻塞。。。

此时,OP_WRITE的优势就体现出来了

我们可以对OP_WRITE的使用方式稍微调整,就可以解决上面的问题:

当我们收到OP_WRITE事件,开始给客户端写数据后,当我们发现该客户端对应的send-queue写满,SocketChannel.write(buffer)会返回已经写出去的字节数,此时为0;我们根据此标志知道,此时send-queue满,不能再写了,此时我们记录下没有写的数据,再次给该客户端注册一个OP_WRITE事件,结束本次写过程;让业务线程继续处理其他事件,等到该客户端对应的send-queue有空闲的时候,会再次收到收到OP_WRITE事件,我们就可以继续写数据了;这样就是解决了写数据满导致业务阻塞的问题了。

关于上面的观点可以参考:

java nio selectedKey,关于 NIO 你不得不知道的一些“地雷”-CSDN博客文章浏览阅读302次。本文是笔者在学习NIO过程中发现的一些比较容易让人忽略的知识的一个总结,而这些让人忽略的小细节恰恰是NIO网络编程中必不可少。虽然现在我们不会直接编写NIO来完成我们的网络层通讯,而是使用成熟的基于NIO的网络框架来实现我们的网络层。如,netty、mina。但对NIO网络编程过程的了解,非常有助于我们更深入的理解netty、mina等网络框架,以至于能更好的使用它们。因此,本文并不对NIO的一些..._java nio selectionkey中的事件多次变化都能每监听到吗https://blog.csdn.net/weixin_39850920/article/details/115994629?spm=1001.2014.3001.5506

Java网络编程——NIO处理写事件(SelectionKey.OP_WRITE)-CSDN博客文章浏览阅读2.1k次,点赞5次,收藏23次。selectionKey.interestOps()就是已经注册的事件,SelectionKey中可以只用1个整形数字来表示多个注册的事件(interestOps变量),SelectionKey.OP_READ=1(二进制为 00000001),SelectionKey.OP_WRITE=4(二进制为 00000100),SelectionKey.OP_CONNECT=8(二进制为 00001000),SelectionKey.OP_ACCEPT=16(二进制为 00010000)。..._selectionkey.op_writehttps://blog.csdn.net/huyuyang6688/article/details/126106949?spm=1001.2014.3001.5506

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1629528.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

STM32H750外设之ADC开关控制功能介绍

目录 概述 1 ADC开关控制介绍 2 开关控制功能流程 2.1 软件使能ADC 的流程 2.2 软件禁止 ADC 的流程 3 相关寄存器 3.1 ADCx_ISR 3.2 ADCx_CR 4 使能/禁止ADC流程图 ​5 写入 ADC 控制位时的限制 概述 本文介绍STM32H750外设之ADC开关控制功能&#xff0c;该功能是…

禅道项目管理系统身份认证绕过漏洞

禅道项目管理系统身份认证绕过漏洞 1.漏洞描述 禅道项目管理软件是国产的开源项目管理软件&#xff0c;专注研发项目管理&#xff0c;内置需求管理、任务管理、bug管理、缺陷管理、用例管理、计划发布等功能&#xff0c;完整覆盖了研发项目管理的核心流程。 禅道项目管理系统…

每日OJ题_DFS回溯剪枝⑧_力扣494. 目标和

目录 力扣494. 目标和 解析代码&#xff08;path设置成全局&#xff09; 解析代码&#xff08;path设置全局&#xff09; 力扣494. 目标和 494. 目标和 难度 中等 给你一个非负整数数组 nums 和一个整数 target 。 向数组中的每个整数前添加 或 - &#xff0c;然后串联…

“无媒体,不活动”,这句话怎么理解?

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 “无媒体&#xff0c;不活动”通常指的是在现代社会中&#xff0c;媒体对于各种活动&#xff0c;尤其是公共活动和事件的推广、宣传和影响力是至关重要的。它强调了媒体在塑造公众意识、…

【Redis 开发】(Feed流的模式,GEO数据结构,BitMap,HyperLogLog)

Redis FeedTimeline GEOBitMapHyperLogLog Feed Feed流产品有两种常见模式: Timeline:不做内容筛选&#xff0c;简单的按照内容发布时间排序&#xff0c;常用于好友或关注。例如朋友圈 优点:信息全面&#xff0c;不会有缺失。并且实现也相对简单 缺点:信息噪音较多&#xff0c…

池化整合多元数据库,zData X 一体机助力证券公司IT基础架构革新

引言 近期&#xff0c;云和恩墨 zData X 多元数据库一体机&#xff08;以下简称 zData X&#xff09;在某证券公司的OA、短信和CRM业务系统中成功上线&#xff0c;标志着其IT基础架构完成从集中式存储向池化高性能分布式存储的转变。zData X 成功整合了该证券公司使用的达梦、O…

【VBA】获取指定目录下的Excel文件,并合并所有excel中的内容。

1.新建一个excel表格。并创建两个Sheet&#xff0c;名字分别命名为FileList 和 All information。 2.按ALTF11进入 VBA编程模块&#xff0c;插入模块。 3.将如下 第五部分代码复制到模块中。 点击运行即可&#xff0c;然后就能提取指定目录下的所有excel文件信息并合并到一起…

2021东北四省赛补题/个人题解

Dashboard - The 15th Chinese Northeast Collegiate Programming Contest - Codeforces I 模拟 #include <bits/stdc.h> using i64 long long; using namespace std; #define int long long int mp[8] {0, 7, 27, 41, 49, 63, 78, 108}; void solve() {int n; cin …

如何有效的将丢失的mfc140u.dll修复,几种mfc140u.dll丢失的解决方法

当你在运行某个程序或应用程序时&#xff0c;突然遭遇到mfc140u.dll丢失的错误提示&#xff0c;这可能会对你的电脑运行产生一些不利影响。但是&#xff0c;不要担心&#xff0c;以下是一套详细的mfc140u.dll丢失的解决方法。 mfc140u.dll缺失问题的详细解决步骤 步骤1&#x…

Atcoder Beginner Contest351 A-E Solution题解

文章目录 [A - The bottom of the ninth](https://atcoder.jp/contests/abc351/tasks/abc351_a)[B - Spot the Difference ](https://atcoder.jp/contests/abc351/tasks/abc351_b)[D - Grid and Magnet](https://atcoder.jp/contests/abc351/tasks/abc351_d)E Note&#xff1a;…

盲人地图使用的革新体验:助力视障人士独立、安全出行

在我们日常生活中&#xff0c;地图导航已经成为不可或缺的出行工具。而对于盲人群体来说&#xff0c;盲人地图使用这一课题的重要性不言而喻&#xff0c;它不仅关乎他们的出行便利性&#xff0c;更是他们追求生活独立与品质的重要一环。 近年来&#xff0c;一款名为蝙蝠…

《HCIP-openEuler实验指导手册》1.7 Apache虚拟主机配置

知识点 配置步骤 需求 域名访问目录test1.com/home/source/test1test2.com/home/source/test2test3.com/home/source/test3 创建配置文件 touch /etc/httpd/conf.d/vhost.conf vim /etc/httpd/conf.d/vhost.conf文件内容如下 <VirtualHost *.81> ServerName test1.c…

python中如何用matplotlib写雷达图

#代码 import numpy as np # import matplotlib as plt # from matplotlib import pyplot as plt import matplotlib.pyplot as pltplt.rcParams[font.sans-serif].insert(0, SimHei) plt.rcParams[axes.unicode_minus] Falselabels np.array([速度, 力量, 经验, 防守, 发球…

AtCoder Beginner Contest 351 G. Hash on Tree(树剖维护动态dp 口胡题解)

题目 n(n<2e5)个点&#xff0c;给定一个长为a的初始权值数组&#xff0c; 以1为根有根树&#xff0c; 树哈希值f计算如下&#xff1a; &#xff08;1&#xff09;如果一个点u是叶子节点&#xff0c;则f[u]a[u] &#xff08;2&#xff09;否则&#xff0c; q(q<2e5)次…

【C++】从零开始认识继承

送给大家一句话&#xff1a; 其实我们每个人的生活都是一个世界&#xff0c;即使最平凡的人也要为他生活的那个世界而奋斗。 – 路遥 《平凡的世界》 ✩◝(◍⌣̎◍)◜✩✩◝(◍⌣̎◍)◜✩✩◝(◍⌣̎◍)◜✩ ✩◝(◍⌣̎◍)◜✩✩◝(◍⌣̎◍)◜✩✩◝(◍⌣̎◍)◜✩ ✩◝(◍…

详解如何品味品深茶的精髓

在众多的茶品牌中&#xff0c;品深茶以其独特的韵味和深厚的文化底蕴&#xff0c;赢得了众多茶友的喜爱。今天&#xff0c;让我们一同探寻品深茶的精髓&#xff0c;品味其独特的魅力。 品深茶&#xff0c;源自中国传统茶文化的精髓&#xff0c;承载着世代茶人的智慧与匠心。这…

linux kernel内存泄漏检测工具之slub debug

一、背景 slub debug 是一个debug集&#xff0c;聚焦于kmem_cache 分配机制的slub内存&#xff08;比如kmalloc&#xff09;&#xff0c;这部分内存在内核中使用最频繁&#xff0c;slub debug其中有相当部分是用来处理内存踩踏&#xff0c;内存use after free 等异常的&#x…

【项目】仿muduo库One Thread One Loop式主从Reactor模型实现高并发服务器(Http板块)

【项目】仿muduo库One Thread One Loop式主从Reactor模型实现高并发服务器&#xff08;Http板块&#xff09; 一、思路图二、Util板块1、Splite板块&#xff08;分词&#xff09;&#xff08;1&#xff09;代码&#xff08;2&#xff09;测试及测试结果i、第一种测试ii、第二种…

PotatoPie 4.0 实验教程(29) —— FPGA实现摄像头图像均值滤波处理

图像的均值滤波简介 图像均值滤波处理是一种常见的图像处理技术&#xff0c;用于降低图像中噪声的影响并平滑图像。该方法通过在图像中滑动一个固定大小的窗口&#xff08;通常是一个正方形或矩形&#xff09;&#xff0c;将窗口中所有像素的值取平均来计算窗口中心像素的新值…

GateWay具体的使用之全链路跟踪TraceId日志

1.创建全局过滤器&#xff0c;在请求头上带入traceId参数&#xff0c;穿透到下游服务. package com.by.filter;import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.jwt.JWTValidator;…