接上文
二、注册OP_WRITE写数据
服务端代码:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* 基于NIO实现服务端,通过Selector基于事件驱动客户端的读取
* 服务端接收到数据后,缓存,注册OP_WRITE事件,收到状态转发数据
*
*/
class NIOSelectorOpWriteServer1 {
Selector selector;
public static void main(String[] args) throws IOException {
NIOSelectorOpWriteServer1 server = new NIOSelectorOpWriteServer1();
server.start(); // 开启监听和事件处理
}
public void start() {
initServer();
// selector非阻塞轮询有哪些感兴趣的事件到了
doService();
}
private void doService() {
if (selector == null) {
System.out.println("server init failed, without doing read/write");
return;
}
try {
while (true) {
while (selector.select() > 0) {
Set<SelectionKey> keys = selector.selectedKeys(); // 感兴趣且准备好的事件
Iterator<SelectionKey> iterator = keys.iterator(); // 迭代器遍历处理,后面要删除集合元素
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove(); // 删除当前元素,防止重复处理
// 下面根据事件进行分别处理
if (key.isAcceptable()) {
// 客户端连接事件
acceptHandler(key);
} else if (key.isReadable()) {
// 读取客户端数据
readHandler(key);
} else if (key.isWritable()) {
// 为了避免重复写,需要先去除OP_WRITE注册状态
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
writeHandler(key);
}
}
}
}
} catch (IOException exception) {
exception.printStackTrace();
}
}
private void initServer() {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(9090));
// 此时在selector上注册感兴趣的事件
// 这里先注册OP_ACCEPT: 客户端连接事件
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("server init success");
} catch (IOException exception) {
exception.printStackTrace();
System.out.println("server init failied");
}
}
public void acceptHandler(SelectionKey key) {
ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 获取客户端的channel
try {
SocketChannel client = server.accept();
client.configureBlocking(false); // 设置client非阻塞
System.out.println("server receive a client :" + client);
// 注册OP_READ事件,用于从客户端读取数据
// 给Client分配一个buffer,用于读取数据,注意buffer的线程安全
ByteBuffer buffer = ByteBuffer.allocate(1024); // buffer这个参数注册的时候也可以不用
client.register(key.selector(), SelectionKey.OP_READ, buffer);
} catch (IOException exception) {
exception.printStackTrace();
}
}
public void readHandler(SelectionKey key) {
System.out.println("read handler");
SocketChannel client = (SocketChannel) key.channel(); // 获取客户端的channel
ByteBuffer buffer = (ByteBuffer) key.attachment(); // 获取Client channel关联的buffer
buffer.clear(); // 使用前clear
// 防止数据分包,需要while循环读取
try {
while (true) {
int readLen = client.read(buffer);
if (readLen > 0) {
// 读取到数据了
buffer.flip();
byte[] data = new byte[buffer.limit()];
buffer.get(data);
System.out.println("server read data from " + client + ", data is :" + new String(data));
// 给其他客户端注册OP_WRITE;
registerWrite(client, data);
} else if (readLen == 0) {
// 没读到数据
System.out.println(client + " : no data");
break;
} else if (readLen < 0) {
// client 关闭连接
// 当客户端主动连接断开时,为了让服务器知道断开了连接,会产生OP_READ事件。所以需要判断读取长度,当读到-1时,关闭channel。
System.out.println(client + " close");
client.close();
break;
}
}
} catch (IOException exception) {
exception.printStackTrace();
// client 关闭连接
System.out.println(client + " disconnect");
// todo:disconnect 导致一直有read事件,怎么办?
try {
client.close();
} catch (IOException ex) {
System.out.println("close ex");
}
}
}
public void writeHandler(SelectionKey key) {
System.out.println("write handler");
SocketChannel client = (SocketChannel) key.channel(); // 获取客户端的channel
ByteBuffer buffer = (ByteBuffer) key.attachment(); // 获取Client channel关联的buffer,此时处于读模式
try {
while (buffer.hasRemaining()) {
client.write(buffer);
}
} catch (IOException exception) {
System.out.println("write error");
exception.printStackTrace();
}
}
private void registerWrite(SocketChannel myself, byte[] data) throws IOException {
Set<SelectionKey> keys = selector.keys();
// read/write 对应同一个key,同一个client不会发送两遍
for (SelectionKey key : keys) {
SelectableChannel channel = key.channel();
if (channel instanceof SocketChannel && channel != myself) {
key.interestOps(key.interestOps() + SelectionKey.OP_WRITE);
ByteBuffer attachment = (ByteBuffer) key.attachment();
attachment.clear();
attachment.put(data);
attachment.flip();
}
}
}
}
这里有几个注意项:
1.在注册OP_WRITE时,需要给所有其他客户端注册;
2.注册OP_WRITE时:是使用key.interestOps(key.interestOps() + SelectionKey.OP_WRITE);避免对原来的OP_READ事件进行覆盖;在OP_WRITE事件来的时候,要把先把OP_WRITE事件去掉,key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); 防止重复写事件状态发生;
3.注册OP_WRITE时,要写的数据,直接给到了原来channel对应的attachment里了;在OP_WRITE事件来的时候,可以直接用;
到此,我们一定有个疑问:既然服务端关不关注OP_WRITE事件,都可以给客户端发送数据,意义何在?
那我们就要看下OP_WRITE事件的具备条件:send-queue是否有空间
而服务端要写数据要关注:服务端数据是否准备好了 + send-queue是否有空间
服务端一般都是在自己数据准备好了后,再注册对客户端的OP_WRITE事件。
但是上面的代码中,在给客户端写数据时,是一直写,直到数据写完,但是 send-queue空间有限,当 send-queue写满后,写操作就会阻塞,导致单线程下业务阻塞。。。
此时,OP_WRITE的优势就体现出来了
我们可以对OP_WRITE的使用方式稍微调整,就可以解决上面的问题:
当我们收到OP_WRITE事件,开始给客户端写数据后,当我们发现该客户端对应的send-queue写满,SocketChannel.write(buffer)会返回已经写出去的字节数,此时为0;我们根据此标志知道,此时send-queue满,不能再写了,此时我们记录下没有写的数据,再次给该客户端注册一个OP_WRITE事件,结束本次写过程;让业务线程继续处理其他事件,等到该客户端对应的send-queue有空闲的时候,会再次收到收到OP_WRITE事件,我们就可以继续写数据了;这样就是解决了写数据满导致业务阻塞的问题了。
关于上面的观点可以参考:
java nio selectedKey,关于 NIO 你不得不知道的一些“地雷”-CSDN博客文章浏览阅读302次。本文是笔者在学习NIO过程中发现的一些比较容易让人忽略的知识的一个总结,而这些让人忽略的小细节恰恰是NIO网络编程中必不可少。虽然现在我们不会直接编写NIO来完成我们的网络层通讯,而是使用成熟的基于NIO的网络框架来实现我们的网络层。如,netty、mina。但对NIO网络编程过程的了解,非常有助于我们更深入的理解netty、mina等网络框架,以至于能更好的使用它们。因此,本文并不对NIO的一些..._java nio selectionkey中的事件多次变化都能每监听到吗https://blog.csdn.net/weixin_39850920/article/details/115994629?spm=1001.2014.3001.5506
Java网络编程——NIO处理写事件(SelectionKey.OP_WRITE)-CSDN博客文章浏览阅读2.1k次,点赞5次,收藏23次。selectionKey.interestOps()就是已经注册的事件,SelectionKey中可以只用1个整形数字来表示多个注册的事件(interestOps变量),SelectionKey.OP_READ=1(二进制为 00000001),SelectionKey.OP_WRITE=4(二进制为 00000100),SelectionKey.OP_CONNECT=8(二进制为 00001000),SelectionKey.OP_ACCEPT=16(二进制为 00010000)。..._selectionkey.op_writehttps://blog.csdn.net/huyuyang6688/article/details/126106949?spm=1001.2014.3001.5506