一、NIO特点介绍
NIO全称 Java non-blocking IO(官方名称为 New I/O)。从JDK 1.4开始,Java提供了一系列改进的输入/输出(I/O)的新特性,被称为NIO,是同步非阻塞的,NIO相关类都被放在java.nio包及其子包下。 NIO是面向缓冲区的,或者说面向块编程的:数据先读取到一个稍后处理的缓冲区,需要时可在缓冲区内前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞的高伸缩性网络。 Java NIO的非阻塞模式,使一个线程从某通道发送或者读取数据时,仅能得到目前可用的数据;如果目前没有可用的数据,就什么都不会获取,而不是保持线程阻塞,所以直至数据变得可读取之前,该线程可以继续做其他事情。非阻塞写也是如此:一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。 NIO三大核心部分:Channel(通道),Buffer(缓冲区),Selector(选择器)。
在线程处理过程中,如果涉及到IO操作,那么当前的线程不会被阻塞,而是会去处理其他业务代码,然后等过段时间再来查询 IO 交互是否完成。如下图:Buffer 是一个缓冲区,用来缓存读取和写入的数据;Channel 是一个通道,负责后台对接 IO 数据;而 Selector 实现的主要功能,是主动查询哪些通道处于就绪状态。Selector 复用一个线程来查询已就绪的通道,这样大大减少了 IO 交互引起的频繁切换线程的开销。
二、NIO代码实现
2.1、客户端代码
import java. io. IOException ;
import java. net. InetSocketAddress ;
import java. nio. ByteBuffer ;
import java. nio. channels. SelectionKey ;
import java. nio. channels. Selector ;
import java. nio. channels. SocketChannel ;
import java. util. Iterator ;
import java. util. Scanner ;
import java. util. Set ;
/**
 * Minimal non-blocking TCP client driven by a single {@link Selector} loop.
 *
 * <p>{@link #run()} connects to the configured host/port and then prints every
 * chunk of data received from the server. {@link #sendMsg(String)} may be
 * called from another thread to send UTF-8 encoded text, and {@link #stop()}
 * asks the selector loop to terminate.
 */
public class NioClient implements Runnable {

    /** Charset used for both outgoing and incoming text so the two directions agree. */
    private static final String CHARSET = "UTF-8";

    private final String host;
    private final int port;
    private volatile boolean started;
    private final Selector selector;
    private final SocketChannel socketChannel;

    /**
     * Opens the selector and a non-blocking socket channel. No connection is
     * attempted until {@link #run()} is invoked.
     *
     * @param ip   server host name or address
     * @param port server TCP port
     * @throws IllegalStateException if the selector or channel cannot be opened
     */
    public NioClient(String ip, int port) {
        this.host = ip;
        this.port = port;
        Selector openedSelector = null;
        SocketChannel openedChannel = null;
        try {
            openedSelector = Selector.open();
            openedChannel = SocketChannel.open();
            openedChannel.configureBlocking(false);
        } catch (IOException e) {
            // Fail fast: a half-initialised client would only fail later with an NPE in run().
            closeAndReport(openedChannel);
            closeAndReport(openedSelector);
            throw new IllegalStateException(
                    "Failed to initialise NIO client for " + ip + ":" + port, e);
        }
        this.selector = openedSelector;
        this.socketChannel = openedChannel;
        this.started = true;
    }

    /**
     * Connects to the server and runs the selector loop until {@link #stop()}
     * is called. A connect failure or a selector failure terminates the JVM
     * with exit status 1. The channel and selector are closed when the loop ends.
     */
    @Override
    public void run() {
        try {
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        try {
            while (started) {
                try {
                    selector.select(1000);
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        // Selected keys must be removed by hand or they are reported again.
                        it.remove();
                        dispatch(key);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    System.exit(1);
                }
            }
        } finally {
            closeAndReport(socketChannel);
            closeAndReport(selector);
        }
    }

    /**
     * Routes one selected key to its handlers. If a handler fails, the error is
     * reported and the key's channel is closed.
     *
     * @throws IOException if closing the failed channel itself fails
     */
    private void dispatch(SelectionKey key) throws IOException {
        try {
            // Validity is re-checked before each test because a handler may cancel the key.
            if (key.isValid() && key.isConnectable()) {
                connectHandler(key);
            }
            if (key.isValid() && key.isReadable()) {
                readHandler(key);
            }
        } catch (Exception e) {
            e.printStackTrace();
            key.cancel();
            key.channel().close();
        }
    }

    /**
     * Completes a pending non-blocking connect and switches the key to read interest.
     *
     * @throws IOException if the connection attempt failed
     */
    private void connectHandler(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        if (sc.finishConnect()) {
            key.interestOps(SelectionKey.OP_READ);
        }
        // A false result only means the connect has not finished yet; OP_CONNECT
        // interest stays set so the next readiness event retries.
    }

    /**
     * Reads the currently available bytes and prints them as UTF-8 text.
     * On end-of-stream the key is cancelled and the channel closed.
     */
    private void readHandler(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int readBytes = sc.read(buffer);
        if (readBytes > 0) {
            buffer.flip();
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            String result = new String(bytes, CHARSET);
            System.out.println("客户端收到消息:" + result);
        } else if (readBytes < 0) {
            key.cancel();
            sc.close();
        }
    }

    /**
     * Encodes {@code request} as UTF-8 and writes all of it to {@code channel}.
     *
     * @throws IOException if the channel is closed or the write fails
     */
    private void doWrite(SocketChannel channel, String request)
            throws IOException {
        ByteBuffer writeBuffer = ByteBuffer.wrap(request.getBytes(CHARSET));
        // A non-blocking write may accept only part of the buffer, so keep going
        // until it is drained; yield when the socket send buffer is full.
        while (writeBuffer.hasRemaining()) {
            if (channel.write(writeBuffer) == 0) {
                Thread.yield();
            }
        }
    }

    /** Starts the connect; registers for OP_READ if it completed at once, else for OP_CONNECT. */
    private void doConnect() throws IOException {
        if (socketChannel.connect(new InetSocketAddress(host, port))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
        } else {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    /**
     * Sends {@code msg} to the server as UTF-8 text.
     *
     * @throws Exception if the channel is not connected or the write fails
     */
    public void sendMsg(String msg) throws Exception {
        doWrite(socketChannel, msg);
    }

    /** Requests termination of the selector loop and wakes it so it exits promptly. */
    public void stop() {
        started = false;
        if (selector.isOpen()) {
            selector.wakeup();
        }
    }

    /** Closes {@code resource} if non-null, printing any failure instead of propagating it. */
    private static void closeAndReport(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Connects to 127.0.0.1:9998 and sends every token read from standard input. */
    public static void main(String[] args) throws Exception {
        NioClient nioClient = new NioClient("127.0.0.1", 9998);
        new Thread(nioClient).start();
        System.out.println("请输入消息:");
        Scanner scanner = new Scanner(System.in);
        // Stop cleanly at end of input instead of failing with NoSuchElementException.
        while (scanner.hasNext()) {
            nioClient.sendMsg(scanner.next());
        }
        nioClient.stop();
    }
}
2.2、服务端代码
import java. io. IOException ;
import java. net. InetSocketAddress ;
import java. nio. ByteBuffer ;
import java. nio. channels. SelectionKey ;
import java. nio. channels. Selector ;
import java. nio. channels. ServerSocketChannel ;
import java. nio. channels. SocketChannel ;
import java. util. Iterator ;
import java. util. Set ;
/**
 * Minimal non-blocking echo-style TCP server driven by a single {@link Selector} loop.
 *
 * <p>Each chunk of bytes received from a client is decoded as UTF-8, printed,
 * and answered with an acknowledgement string. The reply is attached to the
 * client's selection key and flushed when the channel reports write readiness.
 */
public class NioServer {

    /** Charset used for both decoding requests and encoding replies. */
    private static final String CHARSET = "UTF-8";

    /** Size of the per-read buffer, large enough to keep short messages in one piece. */
    private static final int READ_BUFFER_SIZE = 1024;

    private Selector selector;
    private ServerSocketChannel serverChannel;
    private volatile boolean started;

    /**
     * Opens the selector, binds a non-blocking listening channel to {@code port}
     * and registers it for accept events. Terminates the JVM with exit status 1
     * if any of these steps fails.
     *
     * @param port TCP port to listen on
     */
    public NioServer(int port) {
        try {
            selector = Selector.open();
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            serverChannel.socket().bind(new InetSocketAddress(port), 1024);
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            started = true;
            System.out.println("服务器已启动,端口号:" + port);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Runs the selector loop on the calling thread until {@link #stop()} is
     * called, then closes every registered channel and the selector.
     */
    public void run() {
        try {
            // The isOpen() check stops the loop from spinning on a closed selector.
            while (started && selector.isOpen()) {
                try {
                    selector.select();
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        // Selected keys must be removed by hand or they are reported again.
                        it.remove();
                        dispatch(key);
                    }
                } catch (Exception e) {
                    // Report and keep serving; one failed iteration must not kill the server.
                    e.printStackTrace();
                }
            }
        } finally {
            shutdown();
        }
    }

    /**
     * Routes one selected key to its handlers. If a handler fails, the error is
     * reported and the key's channel is closed.
     *
     * @throws IOException if closing the failed channel itself fails
     */
    private void dispatch(SelectionKey key) throws IOException {
        try {
            if (!key.isValid()) {
                return;
            }
            System.out.println("当前通道的事件:" + key.interestOps());
            if (key.isAcceptable()) {
                acceptHandle(key);
            }
            // Validity is re-checked because an earlier handler may have cancelled the key.
            if (key.isValid() && key.isReadable()) {
                readHandler(key);
            }
            if (key.isValid() && key.isWritable()) {
                writeHandler(key);
            }
        } catch (Exception e) {
            e.printStackTrace();
            key.cancel();
            key.channel().close();
        }
    }

    /** Accepts a pending connection and registers it for read events. */
    private void acceptHandle(SelectionKey key) throws IOException {
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        SocketChannel sc = ssc.accept();
        if (sc == null) {
            // In non-blocking mode accept() returns null when nothing is pending.
            return;
        }
        System.out.println("======socket channel 建立连接=======");
        sc.configureBlocking(false);
        sc.register(selector, SelectionKey.OP_READ);
    }

    /**
     * Reads the currently available bytes, prints them as UTF-8 text and queues
     * an acknowledgement. On end-of-stream the key is cancelled and the channel closed.
     */
    private void readHandler(SelectionKey key) throws IOException {
        System.out.println("======socket channel 数据准备完成," +
                "可以去读==读取=======");
        SocketChannel sc = (SocketChannel) key.channel();
        // A tiny buffer would cut multi-byte UTF-8 characters in half and garble the text.
        ByteBuffer buffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
        int readBytes = sc.read(buffer);
        if (readBytes > 0) {
            buffer.flip();
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            String message = new String(bytes, CHARSET);
            System.out.println("服务器收到消息:" + message);
            String result = "服务器已经收到 message = " + message;
            doWrite(sc, result);
        } else if (readBytes < 0) {
            key.cancel();
            sc.close();
        }
    }

    /**
     * Flushes as much of the attached reply buffer as the channel accepts. Once
     * nothing is left, the attachment is cleared and write interest is dropped.
     */
    private void writeHandler(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        if (buffer != null && buffer.hasRemaining()) {
            int count = sc.write(buffer);
            System.out.println("write :" + count
                    + "byte, remaining:" + buffer.hasRemaining());
        }
        if (buffer == null || !buffer.hasRemaining()) {
            // Drop OP_WRITE right away; otherwise an idle socket keeps waking the selector.
            key.attach(null);
            key.interestOps(SelectionKey.OP_READ);
        }
    }

    /**
     * Queues {@code response} (UTF-8 encoded) for {@code channel} and enables
     * write interest. Bytes still pending from an earlier reply are kept in front
     * of the new ones instead of being discarded.
     */
    private void doWrite(SocketChannel channel, String response)
            throws IOException {
        byte[] bytes = response.getBytes(CHARSET);
        SelectionKey existing = channel.keyFor(selector);
        ByteBuffer pending = existing == null ? null : (ByteBuffer) existing.attachment();
        ByteBuffer writeBuffer;
        if (pending != null && pending.hasRemaining()) {
            writeBuffer = ByteBuffer.allocate(pending.remaining() + bytes.length);
            writeBuffer.put(pending);
        } else {
            writeBuffer = ByteBuffer.allocate(bytes.length);
        }
        writeBuffer.put(bytes);
        writeBuffer.flip();
        channel.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ,
                writeBuffer);
    }

    /** Closes every channel registered with the selector, the listening channel and the selector. */
    private void shutdown() {
        if (selector == null) {
            return;
        }
        if (selector.isOpen()) {
            // Snapshot the key set so closing channels cannot disturb the iteration.
            SelectionKey[] registered = selector.keys().toArray(new SelectionKey[0]);
            for (SelectionKey key : registered) {
                closeAndReport(key.channel());
            }
        }
        closeAndReport(serverChannel);
        closeAndReport(selector);
    }

    /** Closes {@code resource} if non-null, printing any failure instead of propagating it. */
    private static void closeAndReport(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Requests termination of the selector loop. The selector is woken up because
     * {@code select()} would otherwise block until the next network event.
     */
    public void stop() {
        started = false;
        if (selector != null && selector.isOpen()) {
            selector.wakeup();
        }
    }

    /** Starts the server on port 9998 and serves on the main thread. */
    public static void main(String[] args) {
        NioServer nioServer = new NioServer(9998);
        nioServer.run();
    }
}