1、socket通信建立流程
1.1、创建服务端流程
-
使用 socket 函数来创建 socket服务。
-
使用 bind 函数绑定端口。
-
使用 listen 函数监听端口。
-
使用 accept 函数接收客户端请求。
1.2、创建客户端流程
-
使用 socket 函数来创建 socket 服务。
-
使用 connect 函数连接到 socket 服务端。
以下图表演示了客户端与服务端之间的通信流程:
1.3、创建服务端代码
package com.example.dyc.mysocket;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MySocketServer {
// 预定义字典
private static final Map<String, String> dictionary;
//字典初始化
static {
dictionary = new HashMap<>();
dictionary.put("apple", "苹果");
dictionary.put("pear", "梨");
}
//定义服务器端口,范围在[0, 65535]
public static final int PORT = 8888;
public static void main(String[] args) throws IOException {
System.out.println(new Date() + ":" + 1);
ServerSocket serverSocket = new ServerSocket(PORT);//从CLOSED到LISTEN状态
System.out.println(new Date() + ":" + 2);
ExecutorService executorService = Executors.newFixedThreadPool(10);
while (true) {
System.out.println(new Date() + ":" + 3);
Socket socket = serverSocket.accept(); // 阻塞,等待客户端发起连接,建立连接,到ESTABLISHED状态
System.out.println(new Date() + ":" + 4);
SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress();//得到客户端地址+端口
System.out.println(new Date() + ":" + remoteSocketAddress);
InputStream inputStream = socket.getInputStream();//得到输入流
Scanner scanner = new Scanner(inputStream, "UTF-8");//字符集编码
OutputStream outputStream = socket.getOutputStream();//得到输出流
Writer writer = new OutputStreamWriter(outputStream, "UTF-8");//字符集编码
PrintWriter printWriter = new PrintWriter(writer);
// TCP 是一种流式数据,没有明显分界的
// 隐含着我们的请求一定 XXXX\n
String request = scanner.nextLine();//由scanner得到客户端输入
String response = dictionary.getOrDefault(request, "没有找到");
// 响应的协议也是 XXX\n
printWriter.println(response);//把响应传入输出
printWriter.flush();//发送给客户端
socket.close(); // 关闭连接
}
}
}
1.4、创建客户端代码
package com.example.dyc.mysocket;
import java.io.*;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.Date;
import java.util.Scanner;
public class MySocketClient {
public static void main(String[] args) throws IOException {
System.out.println(new Date() + ":" + 1);
Socket socket = new Socket("127.0.0.1", 8888);//刚建立完连接,传入客户端IP地址和端口
SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress();//得到服务器地址+端口
System.out.println(new Date() + ":" + remoteSocketAddress);
InputStream inputStream = socket.getInputStream();//得到输入流
Scanner scanner = new Scanner(inputStream, "UTF-8");//字符集编码
OutputStream outputStream = socket.getOutputStream();//得到输出流
Writer writer = new OutputStreamWriter(outputStream, "UTF-8");//字符集编码
PrintWriter printWriter = new PrintWriter(writer);
printWriter.println("apple");//把apple传入输出
printWriter.flush();//输出发送给服务器
String response = scanner.nextLine();//由scanner得到输入的服务器响应
System.out.println(new Date() + ":" + response);
socket.close();//关闭连接
}
}
2、socket实现BIO
2.1、BIO
传统的网络通讯模型,就是BIO,同步阻塞IO, 其实就是服务端创建一个ServerSocket, 然后就是客户端用一个Socket去连接服务端的那个ServerSocket, ServerSocket接收到了一个的连接请求就创建一个Socket和一个线程去跟那个Socket进行通讯。接着客户端和服务端就进行阻塞式的通信,客户端发送一个请求,服务端Socket进行处理后返回响应,在响应返回前,客户端那边就阻塞等待,什么事情也做不了。 这种方式的缺点, 每次一个客户端接入,都需要在服务端创建一个线程来服务这个客户端,这样大量客户端来的时候,就会造成服务端的线程数量可能达到了几千甚至几万,这样就可能会造成服务端过载过高,最后崩溃死掉。
2.2、代码实现
Client
import java.net.InetSocketAddress;
import java.net.Socket;
import java.text.SimpleDateFormat;
import java.util.Date;
public class MyBIOClient {
public static void main(String[] args) throws Exception {
// 创建 Socket 客户端
Socket socket = new Socket();
// 与服务端建立连接
socket.connect(new InetSocketAddress("127.0.0.1", 8081));
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy年MM月dd日 HH时mm分ss秒");
int counter = 0;
while (counter < 5) {
String now = simpleDateFormat.format(new Date());
// 发送请求
socket.getOutputStream().write(now.getBytes("UTF-8"));
socket.getOutputStream().flush();
Thread.sleep(1000);
counter++;
}
// 若方法运行结束后,不调用 close 函数,服务端则会报错:java.net.SocketException: Connection reset
socket.close();
System.out.println("客户端关闭了 Socket 连接~!");
}
}
Serve
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MyBIOServe {
public static void main(String[] args) throws Exception {
// 创建 Socket 服务端,并设置监听的端口
ServerSocket serverSocket = new ServerSocket(8081);
// 创建线程池以执行客户端请求(防止因请求过多,而导致的阻塞)
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 10, 60,
TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
while (true) {
// 阻塞方法,监听客户端请求
Socket socket = serverSocket.accept();
System.out.println("\r\n" + socket);
// 创建自定义请求处理器
SocketHandler handler = new SocketHandler(socket);
// 处理客户端请求
poolExecutor.execute(handler);
}
}
}
SocketHandler
public class SocketHandler implements Runnable {
private Socket socket;
private static final byte[] BUFFER = new byte[1024];
@Override
public void run() {
try {
while (true){
System.out.println(Thread.currentThread().getName());
// 读取客户端 Socket 请求数据
int read = socket.getInputStream().read(BUFFER);
if (read != -1) {
System.out.println(new String(BUFFER, "UTF-8"));
}else{
socket.close();
System.out.println("服务端关闭了 Socket 连接~!");
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public SocketHandler(Socket socket) {
this.socket = socket;
}
}
3、NIO
3.1、NIO
NIO: NIO是一种同步非阻塞IO, 基于Reactor模型来实现的。其实相当于就是一个线程处理大量的客户端的请求,通过一个线程轮询大量的channel,每次就获取一批有事件的channel,然后对每个请求启动一个线程处理即可。这里的核心就是非阻塞,就那个selector一个线程就可以不停轮询channel,所有客户端请求都不会阻塞,直接就会进来,大不了就是等待一下排着队而已。这里面优化BIO的核心就是,一个客户端并不是时时刻刻都有数据进行交互,没有必要死耗着一个线程不放,所以客户端选择了让线程歇一歇,只有客户端有相应的操作的时候才发起通知,创建一个线程来处理请求。
3.2、代码实现
MyNIOClient2
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
public class MyNIOClient2 {
public static void main(String[] args) {
//创建远程地址
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8888);
SocketChannel channel = null;
//定义缓存
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
//开启通道
channel = SocketChannel.open();
//连接远程远程服务器
channel.connect(address);
Scanner sc = new Scanner(System.in);
while (true) {
System.out.println("客户端即将给 服务器发送数据..");
String line = "clinet2:" + sc.nextLine();
if (line.equals("exit")) {
break;
}
//控制台输入数据写到缓存
buffer.put(line.getBytes("UTF-8"));
//重置buffer 游标
buffer.flip();
//数据发送到数据
channel.write(buffer);
//清空缓存数据
buffer.clear();
//读取服务器返回的数据
int readLen = channel.read(buffer);
if (readLen == -1) {
break;
}
//重置buffer游标
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
//读取数据到字节数组
buffer.get(bytes);
System.out.println("收到了服务器发送的数据 : " + new String(bytes, "UTF-8"));
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != channel) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
MyNIOService
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Scanner;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MyNIOService extends Thread {
//1.声明多路复用器
private Selector selector;
//2.定义读写缓冲区
private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
private ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
//3.定义构造方法初始化端口
public MyNIOService(int port) {
init(port);
}
//4.main方法启动线程
public static void main(String[] args) {
new Thread(new MyNIOService(8888)).start();
}
//5.初始化
private void init(int port) {
try {
System.out.println("服务器正在启动......");
//1)开启多路复用器
this.selector = Selector.open();
//2) 开启服务通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//3)设置为非阻塞
serverSocketChannel.configureBlocking(false);
//4)绑定端口
serverSocketChannel.bind(new InetSocketAddress(port));
//5)注册,标记服务通标状态
serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器启动完毕");
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
while (true) {
try {
//1.当有至少一个通道被选中,执行此方法
this.selector.select();
//2.获取选中的通道编号集合
Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
//3.遍历keys
while (keys.hasNext()) {
SelectionKey key = keys.next();
//4.当前key需要从集合中移出,如果不移出,下次循环会执行对应的逻辑,造成业务错乱
keys.remove();
//5.判断通道是否有效
if (key.isValid()) {
try {
//6.判断是否可读
if (key.isAcceptable()) {
accept(key);
}
} catch (CancelledKeyException e) {
//出现异常断开连接
key.cancel();
}
try {
//7.判断是否可读
if (key.isReadable()) {
read(key);
}
} catch (CancelledKeyException e) {
//出现异常断开连接
key.cancel();
}
try {
//8.判断是否可写
if (key.isWritable()) {
write(key);
}
} catch (CancelledKeyException e) {
//出现异常断开连接
key.cancel();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void accept(SelectionKey key) {
try {
//1.当前通道在init方法中注册到了selector中的ServerSocketChannel
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
//2.阻塞方法, 客户端发起后请求返回.
SocketChannel channel = serverSocketChannel.accept();
//3.serverSocketChannel设置为非阻塞
channel.configureBlocking(false);
//4.设置对应客户端的通道标记,设置次通道为可读时使用
channel.register(this.selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
//使用通道读取数据
private void read(SelectionKey key) {
try {
//清空缓存
this.readBuffer.clear();
//获取当前通道对象
SocketChannel channel = (SocketChannel) key.channel();
//将通道的数据(客户发送的data)读到缓存中.
int readLen = channel.read(readBuffer);
//如果通道中没有数据
if (readLen == -1) {
//关闭通道
key.channel().close();
//关闭连接
key.cancel();
return;
}
//Buffer中有游标,游标不会重置,需要我们调用flip重置. 否则读取不一致
this.readBuffer.flip();
//创建有效字节长度数组
byte[] bytes = new byte[readBuffer.remaining()];
//读取buffer中数据保存在字节数组
readBuffer.get(bytes);
System.out.println("收到了从客户端 " + channel.getRemoteAddress() +
" : " + new String(bytes, "UTF-8"));
//注册通道,标记为写操作
channel.register(this.selector, SelectionKey.OP_WRITE);
} catch (Exception e) {
}
}
//给通道中写操作
private void write(SelectionKey key) {
//清空缓存
this.readBuffer.clear();
//获取当前通道对象
SocketChannel channel = (SocketChannel) key.channel();
//录入数据
Scanner scanner = new Scanner(System.in);
try {
System.out.println("即将发送数据到客户端..");
String line = scanner.nextLine();
//把录入的数据写到Buffer中
writeBuffer.put(line.getBytes("UTF-8"));
//重置缓存游标
writeBuffer.flip();
channel.write(writeBuffer);
//清空writeBuffer
writeBuffer.clear();
channel.register(this.selector, SelectionKey.OP_READ);
} catch (Exception e) {
e.printStackTrace();
}
}
}
4、Netty
ps:后面再补充
参考文献:
Socket 之 BIO、NIO、Netty 简单实现 - 知乎 (zhihu.com)
什么是NIO?NIO和BIO,AIO之间的区别是什么? - 知乎 (zhihu.com)
java.nio.Buffer 中的 flip()方法_wrap.flip()-CSDN博客