写在前面
关于IO分类以及IO模型等理论知识,可以参考io之io分类和io模型这篇文章。本文主要来实现Java中相关IO模型实现程序。
1:BIO
blocking io,是Java io中对阻塞IO模型的具体实现。
因为不管是server端还是client端,都需要发送消息给对端,所以我们先来定义一个通道的处理器类负责完成消息发送的工作:
package com.dahuyou.io.model.bio;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.Charset;
public class ChannelHandler {
private Socket socket;
private Charset charset;
public ChannelHandler(Socket socket, Charset charset) {
this.socket = socket;
this.charset = charset;
}
public void writeAndFlush(Object msg) {
OutputStream out = null;
try {
out = socket.getOutputStream();
out.write((msg.toString()).getBytes(charset));
out.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
public Socket socket() {
return socket;
}
}
再有就是client和server又是有所不同的,哪里不同呢?一个是通道建立成功是行为,以及读取到消息时的行为可能是不同的,所以再来定义一个适配器类来适配这些不同,并且在该类中依赖ChannleHandler,从而拥有向通道中发送消息的能力,代码如下:
package com.dahuyou.io.model.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.Charset;
public abstract class ChannelAdapter extends Thread {
private Socket socket;
private ChannelHandler channelHandler;
private Charset charset;
public ChannelAdapter(Socket socket, Charset charset) {
this.socket = socket;
this.charset = charset;
while (!socket.isConnected()) {
break;
}
channelHandler = new ChannelHandler(this.socket, charset);
channelActive(channelHandler);
}
@Override
public void run() {
try {
BufferedReader input = new BufferedReader(new InputStreamReader(this.socket.getInputStream(), charset));
String str = null;
while ((str = input.readLine()) != null) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
channelRead(channelHandler, str);
}
} catch (IOException e) {
e.printStackTrace();
}
}
// 链接通知抽象类
public abstract void channelActive(ChannelHandler ctx);
// 读取消息抽象类
public abstract void channelRead(ChannelHandler ctx, Object msg);
}
其中channelActive
通道建立方法和channelRead
通道数据读取方法作为钩子方法供子类来提供各自场景下的具体实现。
接着,来定义具体的client类:
package com.dahuyou.io.model.bio.client;
import com.dahuyou.io.model.bio.ChannelAdapter;
import com.dahuyou.io.model.bio.ChannelHandler;
import java.net.Socket;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
public class BioClientHandler extends ChannelAdapter {
public BioClientHandler(Socket socket, Charset charset) {
super(socket, charset);
}
@Override
public void channelActive(ChannelHandler ctx) {
System.out.println("链接报告LocalAddress:" + ctx.socket().getLocalAddress());
ctx.writeAndFlush("hi! BioClient to msg for you \r\n");
}
@Override
public void channelRead(ChannelHandler ctx, Object msg) {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);
ctx.writeAndFlush("hi, 我是client,我已经收到你的消息Success!\r\n");
}
}
定义具体的server类:
package com.dahuyou.io.model.bio.server;
import com.dahuyou.io.model.bio.ChannelAdapter;
import com.dahuyou.io.model.bio.ChannelHandler;
import java.net.Socket;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
public class BioServerHandler extends ChannelAdapter {
public BioServerHandler(Socket socket, Charset charset) {
super(socket, charset);
}
@Override
public void channelActive(ChannelHandler ctx) {
System.out.println("链接报告LocalAddress:" + ctx.socket().getLocalAddress());
ctx.writeAndFlush("hi! BioServer to msg for you \r\n");
}
@Override
public void channelRead(ChannelHandler ctx, Object msg) {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);
ctx.writeAndFlush("hi 我是server, 我已经收到你的消息Success!\r\n");
}
}
接下来就可以定义server和client的服务启动类了,定义server启动类:
package com.dahuyou.io.model.bio.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;
//public class BioServer extends Thread {
public class BioServer {
private ServerSocket serverSocket = null;
public static void main(String[] args) {
BioServer bioServer = new BioServer();
// bioServer.start();
bioServer.startServer();
}
private void startServer() {
try {
serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(7397));
System.out.println("bio server start suc!");
while (true) {
Socket socket = serverSocket.accept();
// BioServerHandler handler = new BioServerHandler(socket, Charset.forName("GBK"));
BioServerHandler handler = new BioServerHandler(socket, Charset.forName("UTF-8"));
handler.start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/*@Override
public void run() {
try {
serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(7397));
System.out.println("bio server start suc!");
while (true) {
Socket socket = serverSocket.accept();
// BioServerHandler handler = new BioServerHandler(socket, Charset.forName("GBK"));
BioServerHandler handler = new BioServerHandler(socket, Charset.forName("UTF-8"));
handler.start();
}
} catch (IOException e) {
e.printStackTrace();
}
}*/
}
定义client启动类:
package com.dahuyou.io.model.bio.client;
import java.io.IOException;
import java.net.Socket;
import java.nio.charset.Charset;
public class BioClient {
public static void main(String[] args) {
try {
Socket socket = new Socket("192.168.10.91", 7397);
System.out.println("bio client start suc!");
BioClientHandler bioClientHandler = new BioClientHandler(socket, Charset.forName("utf-8"));
bioClientHandler.start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
运行server:
运行client:
运行过程中:
附UML图:
2:NIO
new io,是Java io中对多路复用IO模型的具体实现。
因为不管是server端还是client端,都需要发送消息给对端,所以我们先来定义一个通道的处理器类负责完成消息发送的工作:
package com.dahuyou.io.model.nio;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
public class ChannelHandler {
private SocketChannel channel;
private Charset charset;
public ChannelHandler(SocketChannel channel, Charset charset) {
this.channel = channel;
this.charset = charset;
}
public void writeAndFlush(Object msg) {
try {
byte[] bytes = msg.toString().getBytes(charset);
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
} catch (IOException e) {
e.printStackTrace();
}
}
public SocketChannel channel() {
return channel;
}
}
再有就是client和server又是有所不同的,哪里不同呢?一个是通道建立成功是行为,以及读取到消息时的行为可能是不同的,所以再来定义一个适配器类来适配这些不同,并且在该类中依赖ChannleHandler,从而拥有向通道中发送消息的能力,代码如下:
package com.dahuyou.io.model.nio;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public abstract class ChannelAdapter extends Thread {
private Selector selector;
private ChannelHandler channelHandler;
private Charset charset;
public ChannelAdapter(Selector selector, Charset charset) {
this.selector = selector;
this.charset = charset;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
selector.select(1000); //Selects a set of keys whose corresponding channels are ready for I/O
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
handleInput(key);
}
} catch (Exception ignore) {
}
}
}
private void handleInput(SelectionKey key) throws IOException {
if (!key.isValid()) return;
// 客户端SocketChannel
Class<?> superclass = key.channel().getClass().getSuperclass();
if (superclass == SocketChannel.class){
SocketChannel socketChannel = (SocketChannel) key.channel();
if (key.isConnectable()) {
if (socketChannel.finishConnect()) {
channelHandler = new ChannelHandler(socketChannel, charset);
channelActive(channelHandler);
socketChannel.register(selector, SelectionKey.OP_READ);
} else {
System.exit(1);
}
}
}
// 服务端ServerSocketChannel
if (superclass == ServerSocketChannel.class){
if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
channelHandler = new ChannelHandler(socketChannel, charset);
channelActive(channelHandler);
}
}
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
channelRead(channelHandler, new String(bytes, charset));
} else if (readBytes < 0) {
key.cancel();
socketChannel.close();
}
}
}
// 链接通知抽象类
public abstract void channelActive(ChannelHandler ctx);
// 读取消息抽象类
public abstract void channelRead(ChannelHandler ctx, Object msg);
}
其中channelActive
通道建立方法和channelRead
通道数据读取方法作为钩子方法供子类来提供各自场景下的具体实现。
接着,来定义具体的client处理器类,server处理器类:
package com.dahuyou.io.model.nio.client;
import com.dahuyou.io.model.nio.ChannelAdapter;
import com.dahuyou.io.model.nio.ChannelHandler;
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
public class NioClientHandler extends ChannelAdapter {
public NioClientHandler(Selector selector, Charset charset) {
super(selector, charset);
}
@Override
public void channelActive(ChannelHandler ctx) {
try {
System.out.println("链接报告LocalAddress:" + ctx.channel().getLocalAddress());
ctx.writeAndFlush("hi! NioClient to msg for you \r\n");
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void channelRead(ChannelHandler ctx, Object msg) {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);
ctx.writeAndFlush("hi i am client, already receive your message!\r\n");
}
}
package com.dahuyou.io.model.nio.server;
import com.dahuyou.io.model.nio.ChannelAdapter;
import com.dahuyou.io.model.nio.ChannelHandler;
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
public class NioServerHandler extends ChannelAdapter {
public NioServerHandler(Selector selector, Charset charset) {
super(selector, charset);
}
@Override
public void channelActive(ChannelHandler ctx) {
try {
System.out.println("链接报告LocalAddress:" + ctx.channel().getLocalAddress());
ctx.writeAndFlush("hi! NioServer to msg for you \r\n");
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void channelRead(ChannelHandler ctx, Object msg) {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);
ctx.writeAndFlush("hi i am nio server!\r\n");
}
}
server,client main类:
package com.dahuyou.io.model.nio.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
public class NioClient {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
boolean isConnect = socketChannel.connect(new InetSocketAddress("127.0.0.1", 7397));
System.out.println("nio client started suc!");
if (isConnect) {
socketChannel.register(selector, SelectionKey.OP_READ);
} else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
new NioClientHandler(selector, Charset.forName("GBK")).start();
}
}
package com.dahuyou.io.model.nio.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.charset.Charset;
public class NioServer {
private Selector selector;
private ServerSocketChannel socketChannel;
public static void main(String[] args) throws IOException {
new NioServer().bind(7397);
System.out.println("nio server started suc!");
}
public void bind(int port) {
try {
selector = Selector.open();
socketChannel = ServerSocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.socket().bind(new InetSocketAddress(port), 1024);
socketChannel.register(selector, SelectionKey.OP_ACCEPT);
new NioServerHandler(selector, Charset.forName("GBK")).start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
运行测试:
附UML图:
3:AIO
主要类:
AsynchronousServerSocketChannel:服务端的异步通道类
AsynchronousSocketChannel:客户端的异步通道类
CompletionHandler:事件回调规范接口,连接建立事件,消息可读事件等
所以,异步其实就是基于事件驱动的方式来进行编程了,到处都是回调这种,不是很符合人类的线性思维,需要习惯下。
整体代码结构和bio以及nio比较类似,不同之处在于server类,定义了初始化ChannelInitializer
,来做一下初始化的工作,其中数据读取的监听类初始化如下:
public class AioServerChannelInitializer extends ChannelInitializer {
@Override
protected void initChannel(AsynchronousSocketChannel channel) throws Exception {
// channel.read(ByteBuffer.allocate(1024), 10, TimeUnit.SECONDS, null, new AioServerHandler(channel, Charset.forName("GBK")));
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer, buffer, new AioServerHandler(channel, Charset.forName("GBK")));
}
}
这样当有数据可读时就会调用AioServerHandler,当然AioServerHandler也是CompletionHandler的子类了,也是一个事件回调类。
server启动类如下:
public class AioServer extends Thread {
private AsynchronousServerSocketChannel serverSocketChannel;
@Override
public void run() {
try {
serverSocketChannel = AsynchronousServerSocketChannel.open(AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 10));
serverSocketChannel.bind(new InetSocketAddress(7397));
System.out.println("dahuyou-study-netty aio server start done.");
// 等待
CountDownLatch latch = new CountDownLatch(1);
// 通过AioServerChannelInitializer,初始化数据读取的处理器(实现了接口CompletionHandler)
serverSocketChannel.accept(this, new AioServerChannelInitializer());
// 防止退出
latch.await();
} catch (Exception e) {
e.printStackTrace();
}
}
public AsynchronousServerSocketChannel serverSocketChannel() {
return serverSocketChannel;
}
public static void main(String[] args) {
new AioServer().start();
}
}
client启动类如下:
package com.dahuyou.io.model.aio.client;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.concurrent.Future;
public class AioClient {
public static void main(String[] args) throws Exception {
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
// Future<Void> future = socketChannel.connect(new InetSocketAddress("127.0.0.1", 7397));
// socketChannel.connect(new InetSocketAddress("127.0.0.1", 7397));
// aio的a体现:指定一个回调类,建立连接成功时回调
socketChannel.connect(new InetSocketAddress("127.0.0.1", 7397), null, new CompletionHandler<Void, Object>() {
@Override
public void completed(Void result, Object attachment) {
System.out.println("connect to nio server suc!");
ByteBuffer allocate = ByteBuffer.allocate(1024);
socketChannel.read(allocate, allocate, new AioClientHandler(socketChannel, Charset.forName("GBK")));
}
@Override
public void failed(Throwable exc, Object attachment) {
}
});
System.out.println("dahuyou-study-netty aio client start done.");
// future.get();
// 读数据
// clientChannel.read(readBuffer, readBuffer, new ReadHandler(clientChannel, latch));
// socketChannel.read(ByteBuffer.allocate(1024), null, new AioClientHandler(socketChannel, Charset.forName("GBK")));
Thread.sleep(10000000);
}
}
运行cient输出:
dahuyou-study-netty aio client start done.
connect to nio server suc!
链接报告信息:/127.0.0.1:7397
nio client receive: congratulations, connected to aio server suc!
server对应输出:
dahuyou-study-netty aio server start done.
链接报告信息:/127.0.0.1:63537
nio server receive:nio client send back msg!
程序有点问题,就是,server只能给client发一个消息,用netassit测试也有这个问题,对这块不太熟,哪位大哥大姐有空帮给看看,发现问题还请留言告知,感谢!!!,问题对应的代码位置如下:
写在后面
参考文章列表
io之io分类和io模型 。
UML一一 类图关系 (泛化、实现、依赖、关联、聚合、组合) 。