IO
- 第一章 BIO、NIO、AIO课程介绍
- 1.1 说明
- 1.2 可以解决的问题
- 第二章 Java I/O的演进
- 2.1 I/O模型基本说明
- 2.2 I/O模型
- BIO
- NIO
- AIO
- 2.3 适用场景
- 第三章 Java BIO深入解剖
- 3.1 Java BIO基本介绍
- 3.2 传统的BIO编程实例
- 3.3 单个客户端下的多发多收
- 3.4 BIO 模式下接收多个客户端
- 概述
- 服务器端代码
- 客户端代码
- 小结
- 3.5 伪异步I/O编程
- 概述
- 服务端源码
- 客户端源码
- 小结
- 3.6 基于Java BIO的文件上传
- 目标
- 客户端实现
- 服务端实现
- 3.7 Java BIO 模式下的端口转发思想
- 服务端代码实现
- 客户端代码实现
- 3.8 基于 BIO 模式下的即时通信案例
- 案例说明
- 工具类
- 服务端代码
- 客户端代码
- 第四章 Java NIO
- 4.1 NIO 概述
- NIO 线程模型
- 4.2 NIO-Channel 详解
- Channel 概述
- FileChannel
- FileChannel 读取数据
- FileChannel 写数据
- FileChannel 拷贝文件
- FileChannel 方法总结
- Socket通道介绍
- SocketChannel 通道
- ServerSocketChannel
- SocketChannel
- DatagramSocketChannel
- 聚集和分散
- 4.3 NIO-Buffer
- Buffer概述
- Buffer的关键属性
- Buffer的相关方法
- Buffer实现类的相关方法
- Buffer缓冲区的分类
- 4.4 NIO-Selector
- Selector 概述
- SelectableChannel 可选择通道
- 事件注册与监听
- SelectionKey选择键
- Selector的使用
- 案例实现
- 4.5 NIO包中的其他类
- Pipe
- FileLock
- Path
- Files
- 4.6 NIO 案例:聊天室
- 第五章 Java AIO
- 5.1 AIO 概述
- 5.2 AsynchronousFileChannel
- 5.3 AsynchronousSocketChannel
第一章 BIO、NIO、AIO课程介绍
1.1 说明
在Java的软件设计开发中,通信架构是不可避免的,我们在进行不同系统或者不同进程之间的数据交互,或者在高并发下的通信场景下都需要用到网络通信相关的技术,对于一些经验丰富的程序员来说, Java早 、期的网络通信架构存在一些缺陷, 其中最令人恼火的是基于性能低下的同步阻塞式的I/O通信(BIO) ,随着互联网开发下通信性能的高要求,Java在2002年开始支持了非阻塞式的I/O通信技术(NIO)。大多数读者在学习网络通信相关技术的时候,都只是接触到零碎的通信技术点,没有完整的技术体系架构,以至于对于Java的通信场景总是没有清晰的解决方案。本次课程将通过大量清晰直接的案例从最基础的BIO式通信开始介绍到NIO , AIO,读者可以清晰的了解到阻塞、同步、异步的现象、概念和特征以及优缺点。本课程结合了大量的案例让读者可以快速了解每种通信架构的使用。
1.2 可以解决的问题
- 局域网内的通信要求。
- 多系统间的底层消息传递机制。
- 高并发下,大数据量的通信场景需要。
- 游戏行业。无论是手游服务端,还是大型的网络游戏,Java语言都得到越来越广泛的应用。
第二章 Java I/O的演进
2.1 I/O模型基本说明
I/O 模型:就是用什么样的通道或者说是通信模式和 架构进行数据的传输和接收,很大程度上决定了程序通信的 性能,Java 共支持 3 种网络编程的/IO 模型:BIO、NIO、AIO
实际通信需求下,要根据不同的业务场景和性能需求决 定选择不同的I/O模型
2.2 I/O模型
BIO
同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动 一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销
NIO
Java NIO : 同步非阻塞,服务器实现模式为一个线程处理 多个请求(连接),即客户端发送的连接请求都会注册到多路 复用器上,多路复用器轮询到连接有 I/O 请求就进行处理
AIO
Java AIO(NIO.2) : 异步 异步非阻塞,服务器实现模式为一 个有效请求一个线程,客户端的I/O请求都是由OS先完成了 再通知服务器应用去启动线程进行处理,一般适用于连接数 较多且连接时间较长的应用
2.3 适用场景
- BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序简单易理解。
- NIO 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。编程比较复杂,JDK1.4 开始支持。
- AIO 方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用 OS 参与并发操作,编程比较复杂,JDK7 开始支持。
第三章 Java BIO深入解剖
3.1 Java BIO基本介绍
- Java BIO 就是传统的 java io 编程,其相关的类和接口在 java.io 中
- BIO(blocking I/O) : 同步阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器).
3.2 传统的BIO编程实例
-
网络编程的基本模型是Client/Server模型,也就是两个进程之间进行相互通信,其中服务端提供位置信息(绑定IP地址和端口),客户端通过连接操作向服务端监听的端口地址发起连接请求,基于TCP协议下进行三次握手连接,连接成功后,双方通过网络套接字(Socket)进行通信。
-
传统的同步阻塞模型开发中,服务端ServerSocket负责绑定IP地址,启动监听端口;客户端Socket负责发起连接操作。连接成功后,双方通过输入和输出流进行同步阻塞式通信。 基于BIO模式下的通信,客户端 - 服务端是完全同步,完全耦合的。
服务端代码如下:
package d5_io.t7_bio.demo1;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
public class Server {
public static void main(String[] args) {
try {
System.out.println("======服务端启动=======");
// 1.创建一个ServerSocket对象进行服务端的端口注册
ServerSocket ss = new ServerSocket(9999);
// 2.监听客户端的Socket连接请求
Socket socket = ss.accept();
// 3.从客户端的Socket中获取一个字节输入流对象
InputStream is = socket.getInputStream();
// 4.把字节输入流包装成一个缓冲字符输入流 InputStreamReader:将一个字节输入流转换为一个字符输入流
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg = "";
// while ((msg = br.readLine()) != null) { // 如果使用while的话服务器会一直等待客户端发送消息,如果等不到客户端发送消息,那么将会抛出异常,SocektException: Connection reset
if ((msg = br.readLine()) != null) {
System.out.println("接收到消息:" + msg);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
客户端代码如下:
package d5_io.t7_bio.demo1;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.net.UnknownHostException;
public class Client {
public static void main(String[] args) {
try {
System.out.println("======客户端启动=======");
// 1.创建Socket对象 - 需要传入服务器的地址和端口
Socket socket = new Socket("127.0.0.1", 9999);
// 2.从Socekt中获取字节输出流
OutputStream os = socket.getOutputStream();
// 3.把字节输出流包装成打印流
PrintStream ps = new PrintStream(os);
ps.println("你好,Server");
ps.flush();
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
小结
-
在以上通信中,服务端会一致等待客户端的消息,如果客户 端没有进行消息的发送,服务端将一直进入阻塞状态。
-
同时服务端是按照行获取消息的,这意味着客户端也必须按 照行进行消息的发送,否则服务端将进入等待消息的阻塞状态!
3.3 单个客户端下的多发多收
客户端代码如下:
package d5_io.t7_bio.demo2;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Scanner;
public class Client {
public static void main(String[] args) {
try {
System.out.println("======客户端启动=======");
// 1.创建Socket对象 - 需要传入服务器的地址和端口
Socket socket = new Socket("127.0.0.1", 9999);
// 2.从Socekt中获取字节输出流
OutputStream os = socket.getOutputStream();
// 3.把字节输出流包装成打印流
PrintStream ps = new PrintStream(os);
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.print("输入:");
String msg = scanner.nextLine();
ps.println(msg);
ps.flush();
}
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
服务端代码如下:
package d5_io.t7_bio.demo2;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
/**
* Author: luxianghai
* Date: 2022年12月8日 上午8:48:10
* Version: V1.0
* Description: 服务端 需要实现:客户端发送信息,服务端接收消息
*/
public class Server {
public static void main(String[] args) {
try {
System.out.println("======服务端启动=======");
// 1.创建一个ServerSocket对象进行服务端的端口注册
ServerSocket ss = new ServerSocket(9999);
// 2.监听客户端的Socket连接请求
Socket socket = ss.accept();
// 3.从客户端的Socket中获取一个字节输入流对象
InputStream is = socket.getInputStream();
// 4.把字节输入流包装成一个缓冲字符输入流 InputStreamReader:将一个字节输入流转换为一个字符输入流
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg = "";
while ((msg = br.readLine()) != null) {
System.out.println("接收到消息:" + msg);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
总结:
本案例中确实可以实现客户端多发多收但是服务端只能处理一个客户端的请求,因为服务端是单线程的。一次只能与一个客户端进行消息通信。
3.4 BIO 模式下接收多个客户端
概述
在上述的案例中,一个服务端只能接收一个客户端的通信请求,那么如果服务端需要处理很多个客户端的消息通信请求应该如何处理呢,此时我们就需要在服务端引入线程了,也就是说客户端每发起一个请求,服务端就创建一个新的线程来处理这个客户端的请求,这样
就实现了一个客户端一个线程的模型,图解模式如下:
服务器端代码
Socket线程类
package d5_io.t7_bio.demo3;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;
/**
* Author: luxianghai
* Date: 2022年12月8日 上午10:25:17
* Version: V1.0
* Description: 服务端Socket线程类,用于为每个客户端的Socket连接请求创建一个线程来读取用户的消息
*/
public class ServerThreadReader implements Runnable {
private Socket socket;
public ServerThreadReader(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
// 1.根据当前Socket连接获取字节输入流
InputStream is = socket.getInputStream();
// 2.将字节输入流转为字符输入流
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg;
// 读取客户端发送的数据
while (null != (msg = br.readLine())) {
System.out.println("收到消息:" + msg);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
服务端
package d5_io.t7_bio.demo3;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* Author: luxianghai
* Date: 2022年12月8日 上午10:21:09
* Version: V1.0
* Description: 服务端,用于接收用户发送的消息
*/
public class Server {
public static void main(String[] args) {
try {
// 1.创建 ServerSocket 对象进行服务端的端口注册
ServerSocket ss = new ServerSocket(9999);
// 2.循环监听客户端的Socket连接请求
while (true) {
// 获取客户端的 Socket 连接对象
Socket socket = ss.accept();
// 为当前客户端创建一个线程进行I/O读取操作
new Thread(new ServerThreadReader(socket)).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
客户端代码
package d5_io.t7_bio.demo3;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Scanner;
/**
* Author: luxianghai
* Date: 2022年12月8日 上午8:48:20
* Version: V1.0
* Description: 客户端
*/
public class Client {
public static void main(String[] args) {
try {
System.out.println("======客户端启动=======");
// 1.创建Socket对象 - 需要传入服务器的地址和端口
Socket socket = new Socket("127.0.0.1", 9999);
// 2.从Socekt中获取字节输出流
OutputStream os = socket.getOutputStream();
// 3.把字节输出流包装成打印流
PrintStream ps = new PrintStream(os);
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.print("输入:");
String msg = scanner.nextLine();
ps.println(msg);
ps.flush();
}
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
小结
- 每个Socket接收到,都会创建一个线程,线程的竞争、切换上下文影响性能;
- 每个线程都会占用栈空间和CPU资源;
- 并不是每个socket都进行IO操作,无意义的线程处理;
- 客户端的并发访问增加时。服务端将呈现1:1的线程开销,访问量越大,系统将发生线程栈溢出,线程创建失败,最终导致进程宕机或者僵死,从而不能对外提供服务。
3.5 伪异步I/O编程
概述
在上述案例中:客户端的并发访问增加时。服务端将呈现1:1的线程开销,访问量越大,系统将发生线程栈溢出,线程创建失败,最终导致进程宕机或者僵死,从而不能对外提供服务。
接下来我们采用一个伪异步I/O的通信框架,采用线程池和任务队列实现,当客户端接入时,将客户端的Socket封装成一个Task(该任务实现java.lang.Runnable线程任务接口)交给后端的线程池中进行处理。JDK的线程池维护一个消息队列和N个活跃的线程,对消息队列中Socket任务进行处理,由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。
图示如下:
服务端源码
服务端
package d5_io.t7_bio.demo4;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* Author: luxianghai
* Date: 2022年12月9日 上午9:38:34
* Version: V1.0
* Description: 服务端:用于接收客户端发送的消息
*/
public class Server {
public static void main(String[] args) {
try {
// 1. 注册服务器端口
ServerSocket ss = new ServerSocket(9999);
// 2.创建一个用于处理接收客户端消息的线程池
ServerThreadPool pool = new ServerThreadPool(10, 6);
while (true) {
Socket socket = ss.accept();
// 3.每次收到一个客户端的连接请求,都需要为该连接分配一个线程池中的线程
// 用于完成业务
pool.execute(new ServerTaskObject(socket));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
ServerThreadPool
package d5_io.t7_bio.demo4;
/**
* Author: luxianghai
* Date: 2022年12月9日 上午9:41:42
* Version: V1.0
* Description: 服务端线程池类,用于为每个客户端分配一个线程池中的线程,以此来完成任务
*/
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ServerThreadPool {
// 1.创建线程池对象
private ExecutorService executorService;
/**
* 2.初始化线程池
*/
public ServerThreadPool(int maxPoolSize, int quenenSize) {
executorService = new ThreadPoolExecutor(
3, // 活动线程数量
maxPoolSize, // 最大线程数量
120, // 空闲时的最大存活时间
TimeUnit.SECONDS, // 时间单位
new ArrayBlockingQueue<Runnable>(quenenSize)); // 任务阻塞队列,需要传入支持的任务数
}
/**
* 3.执行任务
* @param task - 任务
*/
public void execute(Runnable task) {
this.executorService.execute(task);
}
}
ServerTaskObject
package d5_io.t7_bio.demo4;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;
/**
* Author: luxianghai
* Date: 2022年12月9日 上午9:53:24
* Version: V1.0
* Description: 用于将客户端的Socket包装为一个线程
*/
public class ServerTaskObject implements Runnable {
private Socket socket;
public ServerTaskObject(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
// 1.根据socket获取字节输入流
InputStream is = socket.getInputStream();
// 2.将字节输入流包装为字符输入流
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg;
// 3.读取消息
while ((msg = br.readLine()) != null) {
System.out.println("收到消息:" + msg);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
客户端源码
Client
package d5_io.t7_bio.demo4;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Scanner;
/**
* Author: luxianghai
* Date: 2022年12月9日 上午9:59:26
* Version: V1.0
* Description: TODO(describle the function of this class in one sentence)
*/
public class Client {
public static void main(String[] args) {
try {
// 1.创建一个与服务端通信的Socket对象
Socket socket = new Socket("127.0.0.1", 9999);
// 2.根据socket获取字节输出流
OutputStream os = socket.getOutputStream();
// 3.将字节输出流包装为
PrintStream pw = new PrintStream(os);
Scanner in = new Scanner(System.in);
// 4.循环发送消息到服务端
while (true) {
pw.println(in.nextLine());
pw.flush();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
小结
- 伪异步io采用了线程池实现,因此避免了为每个请求创建一个独立线程造成线程资源耗尽的问题,但由于底层依然是采用的同步阻塞模型,因此无法从根本上解决问题。
- 如果单个消息处理的缓慢,或者服务器线程池中的全部线程都被阻塞,那么后续socket的i/o消息都将在队列中排队。新的Socket请求将被拒绝,客户端会发生大量连接超时。
3.6 基于Java BIO的文件上传
目标
客户端可以上传任意类型的文件到服务器端,然后服务器端将其保存下来
客户端实现
package d5_io.t7_bio.demo5;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.Socket;
/**
* Author: luxianghai
* Date: 2022年12月11日 下午3:59:02
* Version: V1.0
* Description: 客户端 - 向服务端发送文件
*/
public class Client {
public static void main(String[] args) {
String file = "F:\\Java\\知识\\c.IO流与网络编程\\AIO-BIO-NIO.assets\\image-20221208083257547.png";
try {
// 1.与服务端建立连接
Socket socket = new Socket("127.0.0.1", 9999);
// 2.创建数据输出流
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
// 3.发送文件后缀
dos.writeUTF( file.substring(file.lastIndexOf("."), file.length()) );
// 4.读取文件并将其输出到服务端
InputStream is = new FileInputStream(file);
byte[] buffer = new byte[1024];
int len = 0;
while ( (len = is.read(buffer)) > 0 ) {
dos.write(buffer, 0, len);
}
dos.flush();
is.close();
dos.close();
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
服务端实现
Server
package d5_io.t7_bio.demo5;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* Author: luxianghai
* Date: 2022年12月11日 下午3:58:46
* Version: V1.0
* Description: 用于接收客户端发送的文件,并将其保存到服务端目录下
*/
public class Server {
public final static String FILE_PATH = "F:\\Java\\知识\\c.IO流与网络编程\\serverPath\\";
public static void main(String[] args) {
try {
// 1.创建服务端
ServerSocket ss = new ServerSocket(9999);
while (true) {
// 2.接收客户端的连接请求
Socket socket = ss.accept();
// 3.为每一个客户端的连接请求分配一个线程来完成文件保存操作
new ServerReaderThread(socket).start();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
ServerReaderThread
package d5_io.t7_bio.demo5;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.UUID;
/**
* Author: luxianghai
* Date: 2022年12月11日 下午4:03:28
* Version: V1.0
* Description: 用于将客户端的每个Socket连接指定一个线程来完成文件保存
*/
public class ServerReaderThread extends Thread {
private Socket socket;
public ServerReaderThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
// 1.获取数据输入流来读取客户端发送过来的数据
DataInputStream dis = new DataInputStream(socket.getInputStream());
// 2.读取后缀
String suffix = dis.readUTF();
// 3.创建字节输出流,用于将读取的文件输出到服务端的文件路径下
OutputStream os = new FileOutputStream( Server.FILE_PATH + UUID.randomUUID() + suffix );
System.out.println("接收到客户端发送的文件!");
byte[] buffer = new byte[1024];
int len = 0;
while ( (len = dis.read(buffer)) > 0 ) {
os.write(buffer, 0, len);
}
os.close();
dis.close();
System.out.println("服务器成功保存客户端发送的文件!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.7 Java BIO 模式下的端口转发思想
需求:实现一个客户端的消息可以发送给所有的客户端去接收。(群聊实现)
服务端代码实现
服务端线程类:
package d5_io.t7_bio.demo6;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
/**
* Author: luxianghai
* Date: 2022年12月11日 下午7:12:45
* Version: V1.0
* BIO模式下的端口转发思想-聊天室的实现
* 服务端实现的需求:
* 1. 注册端口
* 2. 把客户端的Socket连接交给一个独立的线程来进行处理
* 3. 把当前连接的客户端Socket存入到一个在线Socket集合中
* 4. 一个客户端发送消息时将信息发送給所有在线Socket
*/
public class Server {
public static List<Socket> onlineSocket = new ArrayList<>();
public static void main(String[] args) {
try {
// 1.注册端口
ServerSocket ss = new ServerSocket(9999);
//
while (true) {
// 2.接收用户连接请求
Socket socket = ss.accept();
// 3.将客户端的socket保存中在线socket集合中
onlineSocket.add(socket);
// 4. 把客户端socket连接交给一个独立的线程来进行处理
new ServerReaderThread(socket).start();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
服务端:
package d5_io.t7_bio.demo6;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
/**
* Author: luxianghai
* Date: 2022年12月11日 下午7:12:45
* Version: V1.0
* BIO模式下的端口转发思想-聊天室的实现
* 服务端实现的需求:
* 1. 注册端口
* 2. 把客户端的Socket连接交给一个独立的线程来进行处理
* 3. 把当前连接的客户端Socket存入到一个在线Socket集合中
* 4. 一个客户端发送消息时将信息发送給所有在线Socket
*/
public class Server {
public static List<Socket> onlineSocket = new ArrayList<>();
public static void main(String[] args) {
try {
// 1.注册端口
ServerSocket ss = new ServerSocket(9999);
//
while (true) {
// 2.接收用户连接请求
Socket socket = ss.accept();
// 3.将客户端的socket保存中在线socket集合中
onlineSocket.add(socket);
// 4. 把客户端socket连接交给一个独立的线程来进行处理
new ServerReaderThread(socket).start();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
客户端代码实现
客户端线程类:
package d5_io.t7_bio.demo6;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;
/**
* Author: luxianghai
* Date: 2022年12月12日 上午12:26:31
* Version: V1.0
* 用于接收服务端转发到客户端的消息处理线程
*/
public class ClientReceiveThread extends Thread {
private Socket socket;
public ClientReceiveThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
// 根据socket对象获取输入流对象,用于读取服务端转发的消息
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String msg;
while ( (msg = br.readLine()) != null ) {
System.out.println("接收到消息:" + msg);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
客户端:
package d5_io.t7_bio.demo6;
import java.io.BufferedWriter;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;
/**
* Author: luxianghai
* Date: 2022年12月11日 下午7:14:30
* Version: V1.0
* 客户端:
* 1.向服务端发送消息
* 2.接收服务端发送过来的消息
*/
public class Client {
public static void main(String[] args) {
try {
// 建立与服务端的连接
Socket socket = new Socket("127.0.0.1", 9999);
// 根据socket连接获取打印流
PrintStream ps = new PrintStream(socket.getOutputStream());
Scanner in = new Scanner(System.in);
// 用一个独立的线程处理服务端转发到客户端的消息
new ClientReceiveThread(socket).start();
// 客户端发送消息的线程处理
while (true) {
ps.println( in.nextLine() );
ps.flush();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
在Java中,如果你通过socket获取一个打印流,并且关闭了这个打印流,这会导致底层的socket连接也被关闭。这是因为打印流的close()方法会调用底层的socket的close()方法,从而导致socket连接被关闭。
要想避免这种情况,可以在使用完打印流后,不要直接关闭它,而是关闭它所包装的流。
3.8 基于 BIO 模式下的即时通信案例
基于BIO模式下的即时通信,我们需要解决客户端到客户端的通 信,也就是需要实现客户端与客户端的端口消息转发逻辑。
案例说明
本项目案例为即时通信的软件项目,适合基础加强的大案例,具备综合性。学
习本项目案例至少需要具备如下Java SE技术点:
- Java 面向对象设计,语法设计。
- 多线程技术。
- IO流技术。
- 网络通信相关技术。
- 集合框架。
- 项目开发思维。
- Java 常用 api 使用。
- …
功能清单简单说明:
-
客户端登陆功能
可以启动客户端进行登录,客户端登陆只需要输入用户名
和服务端ip地址即可。 -
在线人数实时更新。
客户端用户户登陆以后,需要同步更新所有客户端的联系人信息栏。 -
离线人数更新
检测到有客户端下线后,需要同步更新所有客户端的联系人信息栏。 -
群聊
任意一个客户端的消息,可以推送给当前所有客户端接收。 -
私聊
可以选择某个员工,点击私聊按钮,然后发出的消息可以被该客户端单独接收。 -
@消息
可以选择某个员工,然后发出的消息可以@该用户,但是其他所有人都能看到该消息 -
消息用户和消息时间点
服务端可以实时记录该用户的消息时间点,然后进行消息的多路转发或者选择。
在 Java 中,Socket 可能会在以下情况下被关闭:
- 当调用 Socket 的 close() 方法时,该 Socket 将被关闭。
- 如果 Socket 连接的远程主机关闭了连接,则该 Socket 也会被关闭。
- 如果在读取 Socket 数据时发生错误,则该 Socket 也会被关闭。
- 如果在写入 Socket 数据时发生错误,则该 Socket 也会被关闭。
注意:当 Socket 被关闭后,它将不能再被使用。如果需要重新打开该 Socket,必须创建一个新的 Socket 对象。
当Socket被关闭时,由Socket创建出来的流均会被关闭
工具类
package d5_io.t7_bio.demo8.utils;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* Author: luxianghai
* Date: 2022年12月12日 下午3:02:43
* Version: V1.0
* Description: TODO(describle the function of this class in one sentence)
*/
public class Constants {
public static final String ADDR = "127.0.0.1";
public static final int PORT = 8989;
public static final int LOGIN = 1;
public static final int SEND_TO_ONE = 2; // 私发
public static final int SEND_TO_ALL = 3; // 群发,@消息
public static final String USERS_SPLIT = "♠♣♧♡♥❤❥❣♂㊚㊛囍";
public static void sendMsg(int flag, String msg, Socket socket, String destUser) throws IOException {
// 1.根据socket连接获取输出流,并包装为数据流
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
// 2.写消息类型
dos.writeInt(flag);
// 3.写消息体
dos.writeUTF(msg);
if ( flag == Constants.SEND_TO_ONE ) {
dos.writeUTF(destUser);
}
dos.flush();
}
public static String getTime() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm");
return format.format(new Date());
}
}
服务端代码
服务端
package d5_io.t7_bio.demo8.server;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import d5_io.t7_bio.demo8.utils.Constants;
/**
* Author: luxianghai
* Date: 2022年12月12日 下午3:01:44
* Version: V1.0
* 服务端
* 任务:
* 1. 接收客户端发送的消息并对其做相应的处理
* 登陆消息,私发消息,群发消息,@消息
* 2. 监测客户端的上线和离线,并更新在线列表
* 3.
*/
public class ServerChat {
// 在线socket集合,键为socket,值为用户名
public static Map<Socket, String> onlineSocketList = new HashMap<>();
public static void main(String[] args) {
System.out.println("服务端启动成功:");
try {
// 1.注册端口
ServerSocket ss = new ServerSocket(Constants.PORT);
// 2.
while (true) {
// 获取客户端的连接请求
Socket socket = ss.accept();
// 将对消息的处理交给一个独立的线程来完成
new ServerThreadHandler(socket).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
服务端线程处理类
package d5_io.t7_bio.demo8.server;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.util.Set;
import d5_io.t7_bio.demo8.utils.Constants;
/**
* Author: luxianghai
* Date: 2022年12月12日 下午3:15:38
* Version: V1.0
* 处理消息
*/
public class ServerThreadHandler extends Thread {
private Socket socket;
public ServerThreadHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
DataOutputStream dos = null;
DataInputStream dis = null;
try {
// 1.根据socket获取输入流和输出流,并将其封装为数据流
dos = new DataOutputStream(socket.getOutputStream());
dis = new DataInputStream(socket.getInputStream());
while (true) {
// 2.获取数据类型
int flag = dis.readInt();
// 登陆消息
if (flag == Constants.LOGIN) {
String username = dis.readUTF(); // 读取用户名
// 将用户添加到在线socket集合中
ServerChat.onlineSocketList.put(socket, username);
}
// 发送消息
sendMsg(flag, dos);
}
} catch (SocketException e) {
try {
System.out.println("有人下线了");
ServerChat.onlineSocketList.remove(socket);
// 发送消息
sendMsg(Constants.LOGIN, dos);
} catch (IOException e1) {
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 发送消息
* @param flag
* @param dos
* @throws IOException
*/
private void sendMsg(int flag, DataOutputStream dos) throws IOException {
DataInputStream dis = new DataInputStream(socket.getInputStream());
// 如果是登陆,则将在线用户更新到每一个客户端
if (flag == Constants.LOGIN) {
StringBuilder sb = new StringBuilder();
Set<Socket> keys = ServerChat.onlineSocketList.keySet();
for (Socket s: keys) {
sb.append(ServerChat.onlineSocketList.get(s) + Constants.USERS_SPLIT);
}
sendToAll(flag, sb.toString());
} else if (flag == Constants.SEND_TO_ALL) { // 发送给所有人
String msg = ServerChat.onlineSocketList.get(socket) + " " + Constants.getTime() + "\r\n " + dis.readUTF() + "\r\n";
sendToAll( flag, msg);
} else if (flag == Constants.SEND_TO_ONE) {
sendToOne(flag, dis.readUTF(), dis.readUTF());
}
}
/**
* 私发消息
* @param flag
* @param msg
* @param destUserName - 要私发给谁
* @throws IOException
*/
private void sendToOne(int flag, String msg, String destUserName) throws IOException {
for (Socket s: ServerChat.onlineSocketList.keySet()) {
if (ServerChat.onlineSocketList.get(s).equals(destUserName)) {
msg = "【" + ServerChat.onlineSocketList.get(socket) + " -> 我】 " + Constants.getTime() + "\r\n " + msg + "\r\n";
Constants.sendMsg(flag, msg, s, destUserName);
break;
}
}
}
/**
* 将信息发送给所有人
* @param flag
* @param string
* @throws IOException
*/
private void sendToAll(int flag, String msg) throws IOException {
for (Socket s: ServerChat.onlineSocketList.keySet()) {
Constants.sendMsg(flag, msg, s, null);
}
}
}
客户端代码
客户端UI
package d5_io.t7_bio.demo8.client;
import java.awt.BorderLayout;
import java.awt.Color;
import java.awt.Dimension;
import java.awt.FlowLayout;
import java.awt.GridLayout;
import java.awt.Insets;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
import javax.swing.Box;
import javax.swing.BoxLayout;
import javax.swing.JButton;
import javax.swing.JCheckBox;
import javax.swing.JFrame;
import javax.swing.JLabel;
import javax.swing.JList;
import javax.swing.JOptionPane;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTextArea;
import javax.swing.JTextField;
import d5_io.t7_bio.demo8.utils.Constants;
/**
* Author: luxianghai
* Date: 2022年12月12日 下午3:02:19
* Version: V1.0
* Description: TODO(describle the function of this class in one sentence)
*/
public class ClientChat implements ActionListener {
private JFrame win = new JFrame();
private static int width = 650;
private static int height = 600;
/** 用于显示消息 **/
public static JTextArea smsContent = new JTextArea(23, 10);
/** 用于书写要发送的消息 **/
private JTextArea smsSend = new JTextArea(4, 10);
/** 在线人数区域 **/
public static JList<String> onlineUsers = new JList<String>();
/** 是否私聊按钮 **/
private JCheckBox isPrivateBtn = new JCheckBox("私聊");
/** 发送消息按钮 **/
private JButton sendBtn = new JButton("发送");
/** 取消选中按钮 **/
private JButton clearBtn = new JButton("取消");
/** 登陆界面 **/
private JFrame loginView;
private JTextField nameEt, pwdEt;
private Socket socket;
private void initView() {
smsSend.setMargin(new Insets(5, 5, 5, 5));
smsContent.setMargin(new Insets(5, 5, 5, 5) );
/** 展示登陆界面 **/
displayLoginView();
/** 展示聊天界面 **/
displayChatView();
}
/**
* 展示聊天界面
*/
private void displayChatView() {
win.setSize(width, height);
JPanel bottomPanle = new JPanel(new BorderLayout());
// -----------------------------------------------
// 添加发送按钮到底部 Panel 面板中
bottomPanle.add(smsSend); // 添加消息输入框
JPanel btns = new JPanel(new FlowLayout(FlowLayout.LEFT));
btns.add(sendBtn);
btns.add(isPrivateBtn);
btns.add(clearBtn);
bottomPanle.add(btns, BorderLayout.EAST); // 添加按钮组
// 将消息框和按钮添加到窗口底部
win.add(bottomPanle, BorderLayout.SOUTH);
smsContent.setBackground(new Color(0xdd, 0xdd, 0xdd));
// 添加消息显示框,并使用滚动组件包装
win.add(new JScrollPane(smsContent), BorderLayout.CENTER);
smsContent.setEditable(false);
// ------------------------------------------------
Box rightBox = new Box(BoxLayout.Y_AXIS); // 从上到下的布局方式
onlineUsers.setFixedCellWidth(120);
onlineUsers.setVisibleRowCount(13);
rightBox.add(new JScrollPane(onlineUsers));
win.add(rightBox, BorderLayout.EAST);
win.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
sendBtn.addActionListener(this);
clearBtn.addActionListener(this);
}
/**
* 展示登陆界面
*/
private void displayLoginView() {
loginView = new JFrame("登陆");
loginView.setLayout(new GridLayout(3, 1));
loginView.setSize(400, 230);
loginView.setResizable(false);
JPanel name = new JPanel();
JLabel label = new JLabel("昵称:");
name.add(label);
nameEt = new JTextField(20);
nameEt.setPreferredSize(new Dimension(300, 30));
name.add(nameEt);
loginView.add(name);
JPanel pwd = new JPanel();
JLabel label1 = new JLabel("密码:");
pwd.add(label1);
pwdEt = new JTextField(20);
pwdEt.setPreferredSize(new Dimension(300, 30));
pwd.add(pwdEt);
loginView.add(pwd);
JPanel btnView = new JPanel();
JButton login = new JButton("登陆");
btnView.add(login);
loginView.add(btnView);
login.addActionListener(this);
loginView.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
loginView.setVisible(true);
}
@Override
public void actionPerformed(ActionEvent event) {
// 1.获取事件源
JButton btn = (JButton) event.getSource();
switch (btn.getText()) {
case "登陆":
String uname = nameEt.getText().trim();
String pwd = pwdEt.getText().trim();
if (uname.equals("") || pwd.equals("")) {
JOptionPane.showMessageDialog(loginView, "用户名和密码不能为空!");
} else {
try {
// 设置窗口标题
win.setTitle(uname);
// 创建与服务端的连接
socket = new Socket(Constants.ADDR, Constants.PORT);
// 发送数据
Constants.sendMsg(Constants.LOGIN, uname, socket, null);
new ClientThreadHandler(socket).start();
// 关闭登陆窗口
loginView.dispose();
// 展示聊天窗口
displayChatView();
win.setVisible(true);
JOptionPane.showMessageDialog(win, "登陆成功!");
} catch (UnknownHostException e) {
System.out.println("服务器地址错误!");
} catch (IOException e) {
e.printStackTrace();
}
}
break;
case "发送":
try {
int flag = Constants.SEND_TO_ALL;
// 获取发送的消息
String msg = smsSend.getText().trim();
if (msg.equals("")) break;
// 获取选中的用户(决定是否私发的条件之一)
String destUser = onlineUsers.getSelectedValue();
if (destUser != null && !destUser.trim().equals("")) {
destUser = destUser.trim();
// 私发消息还是@消息
flag = isPrivateBtn.isSelected() ? Constants.SEND_TO_ONE : Constants.SEND_TO_ALL;
// 如果是@消息
if (flag == Constants.SEND_TO_ALL) {
msg = "@" + destUser + " " + msg;
} else {
smsContent.append( "【我 -> "+ destUser +"】 " + Constants.getTime() + "\r\n " + msg + "\r\n" );
}
}
// 发送数据
Constants.sendMsg(flag, msg, socket, destUser);
smsSend.setText(null);
onlineUsers.clearSelection();
} catch (IOException e) {
e.printStackTrace();
}
break;
case "取消":
onlineUsers.clearSelection();
break;
}
}
public static void main(String[] args) {
new ClientChat().initView();
}
}
客户端线程类
package d5_io.t7_bio.demo8.client;
import java.io.DataInputStream;
import java.net.Socket;
import java.net.SocketException;
import d5_io.t7_bio.demo8.utils.Constants;
/**
* Author: luxianghai
* Date: 2022年12月12日 下午3:37:06
* Version: V1.0
* Description: TODO(describle the function of this class in one sentence)
*/
public class ClientThreadHandler extends Thread {
private Socket socket;
public ClientThreadHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
// 1.获取输入流,并包装为数据流
DataInputStream dis = new DataInputStream(socket.getInputStream());
while ( true ) {
// 2.读取数据类型
int flag = dis.readInt();
// 3. 读取消息
String msg = dis.readUTF();
if (flag == Constants.LOGIN) {
String[] users = msg.split(Constants.USERS_SPLIT);
ClientChat.onlineUsers.setListData(users);
} else if (flag == Constants.SEND_TO_ALL || flag == Constants.SEND_TO_ONE) {
ClientChat.smsContent.append( msg );
ClientChat.smsContent.setCaretPosition(ClientChat.smsContent.getText().length());
}
}
} catch (SocketException e) {
System.out.println("socket连接异常");
} catch (Exception e) {
e.printStackTrace();
}
}
}
第四章 Java NIO
4.1 NIO 概述
Java NIO (New IO或Non Blocking IO)是从Java 1.4版本开始引入的一个新的IO API,可以替代标准的Java IO API。NIO 支持面向缓冲区的、基于通道的IO操作。NIO将以更加高效的方式进行文件的读写操作。
NIO是同步非阻塞的I0,服务器可以使用一个线程来处理多个客户端请求,客户端发送的请求会注册到多路复用器Selector
上,由多路复用器Selector轮询各客户端的请求
并进行处理。
NIO 线程模型
NIO包含三大组件:
- Channel通道:每个通道对应一个buffer缓冲区
- Buffer缓冲区: buffer底层是数组,类似于蓄水池,channel就是水管
- Selector选择器: selector对应一 个或多个线程。channel会注册到selector上,由selector根据channel读写时间的发生交给某个空闲线程来执行。
- Buffer和Channel都是既可读也可写。
4.2 NIO-Channel 详解
Channel 概述
Channel即通道,表示打开IO设备的连接,比如打开到文件、Socket套 接字的连接。在使用NIO时,必须要获取用于连接IO设备的通道以及用于容纳数据的缓冲区。通过操作缓冲区,实现对数据的处理。也就是说数据是保存在buffer缓冲区中的,需要通过Channel来操作缓冲区中的数据。
Channel相比IO流中的Stream更加高效,可以异步双向传输(全双工)。
Channel的主要实现类有以下几个:
- FileChannel: 读写文件的通道
- SocketChannel: 读写TCP网络数据的通道
- ServerSocketChannel: 像web服务器一样, 监听新进来的TCP连接,为连接创建SocketChannel
- DatagramChannel: 读写UDP网络数据的通道
FileChannel
用于读取、写入、映射和操作文件的通道。
文件通道是连接到文件的可搜索字节通道。它在其文件中有一个当前位置,可以查询和修改。文件本身包含可变长度的字节序列,可以读取和写入,并且可以查询其当前大小。当写入的字节超过其当前大小时,文件的大小增加;文件被截断时,其大小会减小。文件还可能具有一些相关联的元数据,如访问权限、内容类型和上次修改时间;此类不定义元数据访问的方法。
除了熟悉的字节通道读、写和关闭操作外,此类还定义了以下文件特定操作:
- 字节可以以不影响通道当前位置的方式在文件中的绝对位置读取或写入。
- 文件的区域可以直接映射到存储器中;对于大型文件,这通常比调用通常的读或写方法更有效。
- 对文件进行的更新可能会被强制输出到底层存储设备,以确保在系统崩溃时数据不会丢失。
- 字节可以从一个文件传输到另一个通道,反之亦然,许多操作系统都可以将其优化为直接从文件系统缓存进行非常快速的传输。
- 文件的一个区域可以被锁定以防止其他程序访问。
多个并发线程使用文件通道是安全的。根据通道接口的指定,可以随时调用close方法。在任何给定时间,只有一个涉及通道位置或可以改变其文件大小的操作正在进行;在第一个 操作仍在进行时尝试发起第二个这样的操作将被阻止,直到第一个操作完成。其他行动,特别是采取明确立场的行动,可以同时进行;它们是否真的这样做取决于底层实现,因此没有具体说明。
FileChannel 读取数据
package d5_io.t8_nio.channel.file;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* Author: luxianghai
* Date: 2022年12月13日 下午11:18:55
* Version: V1.0
* 使用FileChannel通道对读取文件
*/
public class T1_FileChannel_ReadData {
public static void main(String[] args) {
String file = "F:\\Java\\知识\\c.IO流与网络编程\\serverPath\\a.txt";
try (
// 1.创建随机访问流
RandomAccessFile raf = new RandomAccessFile(file, "rw");
){
// 2.根据随机访问流得到FielChannel对象
FileChannel fc = raf.getChannel();
// 3.创建Buffer并分配其1024个字节,用于存储数据
ByteBuffer bb = ByteBuffer.allocate(1024);
// 4.读取数据,
int len;
// 4.1 read()方法如果返回-1表示以经把数据读取完毕
while ( (len = fc.read(bb)) != -1 ) {
// 4.2 翻转模式
bb.flip();
StringBuilder sb = new StringBuilder();
// 4.3 循环判断是否还有剩余数据 hasRemaining()方法用于判断是否还有数据未读取
while ( bb.hasRemaining() ) {
// 4.4 获取buffer中的数据
byte b = bb.get();
sb.append( (char)b );
}
System.out.println( sb.toString() );
// 4.5 清空buffer中的数据
bb.clear();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
FileChannel 写数据
package d5_io.t8_nio.channel.file;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* Author: luxianghai
* Date: 2022年12月13日 下午11:33:18
* Version: V1.0
* 使用 FileChannel 写数据到指定文件中
*/
public class T2_FileChannel_WriteData {
public static void main(String[] args) {
String dest = "F:\\Java\\知识\\c.IO流与网络编程\\serverPath\\b.txt";;
try (
// 1.创建随机流访问文件
RandomAccessFile raf = new RandomAccessFile(dest, "rw");
){
// 2.根据随机流获取FileChannel
FileChannel fc = raf.getChannel();
// 3.创建Buffer
ByteBuffer bb = ByteBuffer.allocate(1024);
String data = "你好\r\n你好\r\n世界那么大\r\n我想去看看"; // 数据
// 4.将数据存入Buffer中
bb.put(data.getBytes());
// 5.翻转Buffer
bb.flip();
// 6.写入数据
fc.write(bb);
} catch (Exception e) {
e.printStackTrace();
}
}
}
FileChannel 拷贝文件
使用传统方法拷贝
package d5_io.t8_nio.channel.file;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* Author: luxianghai
* Date: 2022年12月13日 下午11:42:08
* Version: V1.0
* 使用 FileChannel 拷贝文件
*/
public class T3_FileChannel_CopyFile {
public static void main(String[] args) {
String src = "F:\\Java\\知识\\c.IO流与网络编程\\serverPath\\f.png";
String filePath = src.substring(0, src.lastIndexOf("\\") + 1);
String fullfileName = src.substring(src.lastIndexOf("\\") + 1);
String[] info = fullfileName.split("\\.");
String dest = filePath + "\\" + info[0] + "-copy." + info[1];
try (
// 1.创建随机访问流
RandomAccessFile rafRead = new RandomAccessFile(src, "rw");
RandomAccessFile rafWrite = new RandomAccessFile(dest, "rw");
) {
// 2.根据实际访问流获取FileChannel
FileChannel fcRead = rafRead.getChannel();
FileChannel fcWrite = rafWrite.getChannel();
// 3.创建Buffer
ByteBuffer bb = ByteBuffer.allocate(1024);
// 4. 读数据
while ( (fcRead.read(bb)) != -1 ) {
// 翻转模式
bb.flip();
// 5.写数据
fcWrite.write(bb);
// 6.清空Buffer中的数据
bb.clear();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
使用通道传输
package d5_io.t8_nio.channel.file;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.RandomAccess;
/**
* Author: luxianghai
* Date: 2022年12月14日 上午7:43:02
* Version: V1.0
* Description: TODO(describle the function of this class in one sentence)
*/
public class T4_FileChannel_TransferFrom {
public static void main(String[] args) {
String src = "F:\\Java\\知识\\c.IO流与网络编程\\serverPath\\f.png";
String dest = "F:\\Java\\知识\\c.IO流与网络编程\\serverPath\\f-copy.png";
try (
// 1.创建随机访问流并候取对应的FileChannel
FileChannel sourceFileChannel = new RandomAccessFile(src, "rw").getChannel();
FileChannel targetFileChannel = new RandomAccessFile(dest, "rw").getChannel();
){
// 2.调整sourceFileChannel的position,默认为0
// sourceFileChannel.position(10);
// 3.传输
//targetFileChannel.transferFrom(sourceFileChannel, 0, sourceFileChannel.size());
sourceFileChannel.transferTo(0, sourceFileChannel.size(), targetFileChannel);
} catch (Exception e) {
e.printStackTrace();
}
}
}
FileChannel 方法总结
-
long size():
返回当前通道文件的大小 -
long position():
Returns this channel’s file position. -
FileChannel position(long newPosion):
Sets this channel’s file position. -
long read(ByteBuffer[] dsts):
Reads a sequence of bytes from this channel into the given buffers. -
int write(ByteBuffer src):
Writes a sequence of bytes to this channel from the given buffer. -
void force(boolean metaData):
Forces any updates to this channel’s file to be written to the storage device that contains it. 类似于IO流中的flush()方法 -
long transferFrom(ReadableByteChannel src, long position, long count):
Transfers bytes into this channel’s file from the given readable byte channel. -
long transferTo(long position, long count, WritableByteChannel target):
Transfers bytes from this channel’s file to the given writable byte channel. -
truncate(long size):
Truncates this channel’s file to the given size.
Socket通道介绍
面向流的连接通道。Socket通道用于管理socket和socket之间的通道。Socket通道具有以下特点:
- 可以实现非阻塞,一个线程可以同时管理多个Socket连接, 提升系统的吞吐量。
- Socket通道的实现类(DatagramChannel、 SocketChannel和ServerSocketChannel) 在被实例化时会创建一个对等的Socket对象,也可以从Socket对象中通过getChannel()方法获得对应的Channel。
SocketChannel 通道
ServerSocketChannel
ServerSocketChannel是一个基 于通道的Socket监听器,能够实现非阻塞模式。
ServerSocketChannel的主要作用是用来监听端口的连接,来创建SocketChannel。 也就是说,可以调用ServerSocketChannel的accept方法来创建客户端的SocketChannel对象。
代码演示
package d5_io.t8_nio.channel.socket;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
/**
* Author: luxianghai
* Date: 2022年12月14日 上午11:02:13
* Version: V1.0
* ServerSocketChannel的使用
*/
public class T1_ServerSocketChannel {
public static void main(String[] args) {
try (
// 1.创建ServerSocketChannel对象
ServerSocketChannel ssc = ServerSocketChannel.open();
) {
// 2.绑定端口号
ssc.socket().bind(new InetSocketAddress(8888));
// 3.设置为非阻塞模式,默认为阻塞模式
ssc.configureBlocking(false);
while (true) {
System.out.println("等待客户端连接...");
// 当有客户端连接时,创建出SocketChannel对象
SocketChannel socketChannel = ssc.accept();
if (socketChannel != null) {
System.out.println("有客户端连接,客户端地址为:" + socketChannel.socket().getRemoteSocketAddress());
} else {
System.out.println("等待客户端连接...");
Thread.sleep(2000);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
SocketChannel
介绍
SocketChannel是连接到TCP网络套接字的通道,更多代表的是客户端的操作。SocketChannel具有 以下特点:
- SocketChannel连接的是Socket套接字,也就是说通道的两边是Socket套接字
- SocketChannel是用来处理网络IO的通道
- SocketChannel是可选择的,可以被多路复用
- SocketChannel基于TCP连接传输
SocketChannel使用细节
-
SocketChannel在使用上需要注意以下细节:
-
不能在已经存在的Socket上再创建SocketChannel
-
SocketChannel需要指明关联的服务器地址及端口后才能使用
-
未进行连接的SocketChannel进行IO操作时将抛出NotYetConnectedException异常
-
SocketChannel支 持阻塞和非阻塞两种模式
-
SocketChannel支持异步关闭。
-
SocketChannel支持设定参数
-
-
创建SocketChannel的两种方法
-
方式一:创建SocketChannel,并指定服务器地址和端口
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 8888));
-
方式二:创建SocketChannel后再指定服务器的地址和端口
SocketChannel socketChannel = SocketChannel.open(); socketChannel.socket().bind(new InetSocketAddress("localhost", 8888));
-
-
SocketChannel的连接状态
- socketChannel.isOpen():判断SocketChannel是否为open状态
- socketChannel.isConnected():判断SocketChannel是否已连接
- socketChannel.isConnectionPending():判断SocketChannel是否正在进行连接
- socketChannel.finishConnect():完成连接,如果此通道已连接,则此方法将不会阻塞,并将立即返回true。如果此通道处于非阻塞模式,则如果连接过程尚未完成,则此方法将返回false。如果此通道处于阻塞模式,则此方法将阻塞,直到连接完成或失败,并且将始终返回true或抛出一个描述失败的检查异常。
-
设置阻塞与非阻塞
// 设置非阻塞 socketChannel.configureBlocking(false);
-
设置参数
// 设置读写缓冲区大小 socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 2048); socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, 1024); // 获取参数 socketChannel.getOption(StandardSocketOptions.SO_SNDBUF);
代码演示
package d5_io.t8_nio.channel.socket;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* Author: luxianghai
* Date: 2022年12月14日 上午11:21:21
* Version: V1.0
* SocketChannel的使用
*/
public class T2_SocketChannel {
public static void main(String[] args) {
try (
// 1.创建SocketChannel,并指定服务器地址和端口
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("www.baidu.com", 80));
){
// 2.设置非阻塞模式
socketChannel.configureBlocking(false);
if (socketChannel.isConnectionPending()) {
System.out.println("正在连接...");
// 完成连接
socketChannel.finishConnect();
}
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = socketChannel.read(buffer);
if (len == -1) {
System.out.println("没有读到数据");
} else if (len == 0) {
System.out.println("读到空数据");
} else {
System.out.println("读到数据:" + new String(buffer.array(), 0, len));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
DatagramSocketChannel
DatagramChannel对象关联着一个DatagramSocket对象 。
DatagramChannel基于UDP无连接协议,每个数据报都是-个自包含的实体,拥有它自己的目的地址及数据负载。DatagramChannel可以发 送单独的数据报给不同的目的地,同样也可以接受来自于任意地址的数据报。
-
发送方实现
@Test public void sender() throws IOException { // 1. 创建DatagramChannel对象 DatagramChannel datagramChannel = DatagramChannel.open(); // 2. 创建地址对象 InetSocketAddress addr = new InetSocketAddress("localhost", 8888); // 3.创建Buffer ByteBuffer buffer = ByteBuffer.wrap("".getBytes()); // 4.发送数据到指定地址 datagramChannel.send(buffer, addr); }
-
接收方实现
@Test public void receiver() throws IOException { // 1.创建DatagramChannel对象 DatagramChannel datagramChannel = DatagramChannel.open(); // 2.绑定端口号 datagramChannel.bind(new InetSocketAddress(8888)); // 3.创建Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); // 4.接收消息并解析 while (true) { buffer.clear(); // 4.1 接收消息并得到消息是从哪里来的 SocketAddress socketAddress = datagramChannel.receive(buffer); // 4.2 翻转模式 buffer.flip(); // 4.3 解析数据 System.out.println(socketAddress + ": " + new String(buffer.array(), 0, buffer.limit())); } }
-
read 和 write
-
读入端:
@Test public void reader() throws IOException { System.out.println("运行..."); // 1.创建DatagramChannel对象 DatagramChannel datagramChannel = DatagramChannel.open(); // 2.绑定端口号 datagramChannel.bind(new InetSocketAddress("127.0.0.2", 8080)); // 为自己绑定IP和端口 datagramChannel.connect(new InetSocketAddress("127.0.0.1", 8081)); // 要与之通信的那个UPD的IP和端口 // 3.创建Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); // 4.接收消息并解析 while (true) { buffer.clear(); // 4.1 接收消息并得到消息是从哪里来的 datagramChannel.read(buffer); // 4.2 翻转模式 buffer.flip(); // 4.3 解析数据 System.out.println("收到消息:" + new String(buffer.array(), 0, buffer.limit())); } }
-
写出端:
@Test public void senderWithWrite() throws IOException { // 1. 创建DatagramChannel对象 DatagramChannel datagramChannel = DatagramChannel.open(); // 2. 创建地址对象 datagramChannel.bind(new InetSocketAddress("127.0.0.1",8081)); // 为自己绑定IP和端口 datagramChannel.connect(new InetSocketAddress("127.0.0.2", 8080)); // 要与之通信的那个UPD的IP和端口 // 3.创建Buffer ByteBuffer buffer = ByteBuffer.wrap("hello world".getBytes()); // 4.发送数据 datagramChannel.write(buffer); }
-
自己写自己读
@Test public void readAndWrite() throws IOException { // 1.创建DatagramChannel对象 DatagramChannel datagramChannel = DatagramChannel.open(); // 2.绑定端口号 datagramChannel.bind(new InetSocketAddress("localhost" ,8080)); // 为自己绑定IP和端口 datagramChannel.connect(new InetSocketAddress("localhost", 8080)); // 要与之通信的那个UPD的IP和端口 // write datagramChannel.write(ByteBuffer.wrap("又读又写".getBytes())); // 3.创建Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); // 4.接收消息并解析 while (true) { buffer.clear(); // 4.1 接收消息并得到消息是从哪里来的 datagramChannel.read(buffer); // 4.2 翻转模式 buffer.flip(); // 4.3 解析数据 System.out.println("收到消息:" + new String(buffer.array(), 0, buffer.limit())); } }
-
-
注意:由于UDP是无连接的,使用connect方法连接到特定地址并不会像TCP通道那样创建一个真正的连接。而是锁住DatagramChannel ,让其只能从特定地址收发数据。因此即使是连接的地址不存在,也不会报错。
聚集和分散
- 聚集(gather):在写操作时将多个buffer的数据写入同一个Channel。
- 分散(scatter):在读操作时将读取的数据写入多个buffer中。
scatter / gather经常用于需要将传输的数据分开处理的场合,例如传输一个由消息头和消息体组成的消息,我们可能需要将消息体和消息头分散到不同的buffer中,这样我们就可以方便的处理消息头和消息体
常用方法
long write(ByteBuffer[] srcs)
long read(ByteBuffer[] srcs)
代码示例:
-
分散代码示例
@Test public void scatter() { String file = "F:\\Java\\知识\\c.IO流与网络编程\\serverPath\\a.txt"; try ( // 1.创建随机访问流并获取 FileChannel FileChannel fc = new RandomAccessFile(file, "rw").getChannel(); ){ // 2.创建两个buffer ByteBuffer buffer1 = ByteBuffer.allocate(5); ByteBuffer buffer2 = ByteBuffer.allocate(1024); // 3. 创建buffer数组 ByteBuffer[] buffers = new ByteBuffer[] {buffer1, buffer2}; // 4. 读取数据 fc.read(buffers); for (ByteBuffer bb: buffers) { System.out.println(new String(bb.array(), 0, bb.array().length)); } } catch (Exception e) { e.printStackTrace(); } }
-
聚合代码示例
@Test public void gather() { String file = "F:\\Java\\知识\\c.IO流与网络编程\\serverPath\\a.txt"; try ( // 1.创建随机流并获取FileChannel FileChannel fc = new RandomAccessFile(file, "rw").getChannel(); ){ // 2.创建两个buffer ByteBuffer buffer1 = ByteBuffer.wrap("{Content-Type: application/json}".getBytes()); ByteBuffer buffer2 = ByteBuffer.wrap("{sno: 1001, sname: 张三}".getBytes()); // 3.将两个buffer放到数组中 ByteBuffer[] buffers = new ByteBuffer[] {buffer1, buffer2}; // 4.将buffer数组中所有buffer的数据存放到FileChannel中,完成聚集 fc.write(buffers); } catch (Exception e) { e.printStackTrace(); } }
4.3 NIO-Buffer
Buffer概述
Buffer缓冲区实际上是内存中开辟的一块数组空间,用于存放数据。Java NIO中的buffer类提供了对这块数组缓冲区的基本操作。
IO 流的操作面向的是流对象,而NIO操作的数据都是面向Buffer缓冲区的。也就是说,读取数据是通过通道将数据存入到buffer中。写数据是将buffer缓冲区中的数据通过通道写到文件中。
Java NIO提供了所有缓冲区的抽象基类Buffer。Buffer的具体实现类有很多,比如ByteBuffer、 IntBuffer、LongBuffer、CharBuffer、 DoubleBuffer、 FloatBuffer、 HeapByteBuffer、 MappedByteBuffer等等, 这些具体的实现类实际_上是依据Buffer数组中存放数据的数据类型来决定,比如ByteBuffer中存放的是字节数据、LongBuffer中存放的long类型的数据。
Buffer的关键属性
Buffer的三大关键属性: capactity、limit、position
。
- capacity(容量): 缓冲区的容量是它包含的元素数。缓冲区的容量永远不会为负,也永远不会改变。
- limit(限制): limit之后的的数据不可读写。 缓冲区的limit永远不会为负,也永远不会大于其容量。
- 写数据时: limit == capacity
- 读数据时: limit表示可读的数据位置,因此在上次写操作后需要通过flip方法,将position值 赋给limit。
- position(位置):缓冲区的位置是要读取或写入的下一个元素的索引。缓冲区的位置永远不会为负,也永远不会大于其极限。
Buffer的相关方法
修饰符和返回值 | 方法 | 描述 |
---|---|---|
abstract Object | array() | 返回缓冲区中的数组 |
abstract int | arrayOffset() | 返回缓冲区中数组的第一个元素的位置 |
int | capacity() | 返回缓冲区的容量 |
Buffer | clear() | 清空缓冲区中的内容,position置为0,limit置为capacity |
Buffer | flip() | 翻转缓冲区,limit置为position,position置为0。切换为读模式 |
boolean | hasRemaining() | buffer中是否还有可访问元素,即position和limit之间是否还存在元素 |
int | limit() | 返回buffer中limit属性值 |
Buffer | limit(int newLimit) | 设置buffer中limit的值 |
Buffer | mark() | 标记buffer中当前position的值,记录在mark属性中 |
Buffer | reset() | 将position的值为mark属性记录的值 |
int | position() | 返回buffer的position属性值 |
Buffer | position(int newPosition) | 设置buffer中position的值 |
int | remaining() | 返回buffer中的可访问元素个数,即position和limit之间的元素个数 |
Buffer | rewind() | 将position置为0,limit不变 |
Buffer实现类的相关方法
以ByteBuffer为例
-
创建Buffer的三种方式
-
方式一:allocate(容量)
ByteBuffer buffer = ByteBuffer.allocate(1024);
-
方式二:allocateDirect(容量)
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
-
方式三:wrap(数据)
ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());
-
-
向Buffer中写数据
-
put(数据)
buffer.put("hello".getBytes());
-
wrap(数据)
-
channel.read(buffer): 将通道中的数据写道buffer中
-
-
读取Buffer中的数据
- get相关方法:获取当前position或指定position的数据
- array(): 返回buffer中整个数组内容
- channel.write(buffer): 将buffer中的数据写到通道中
Buffer缓冲区的分类
-
子缓冲区
可以为Buffer创建子缓冲区,在现有缓冲区上分割出一块空间作为新的缓冲区。原缓冲区和子缓冲区共享同一片数据空间。
通过调用slice方法创建子缓冲区。
-
只读缓冲区
通过buffer的asReadOnlyBuffer()方法获得一个新的只读缓冲区, 所谓的只读缓冲区就是只能读不能写。只读缓冲
区与原缓冲区共享同一片数据空间,原缓冲区数据发生改变,只读缓冲区也能看到变化后的数据,因为它们共享
同一片存储空间。 -
直接缓冲区
直接缓冲区,Java虚拟机将尽最大努力直接对其执行本机I/O操作。也就是说,它将试图避免在每次调用底层操作系统的本机I/O操作之前(或之后) ,将缓中区的内容复制到中间缓冲区(或从中间缓冲区复制)。
可以通过调用此类的allocateDirect工厂方法来创建直接字节缓冲区。此方法返回的缓冲区通常比非直接缓冲区具有更高的分配和释放成本。直接缓冲区的内容可能位于正常垃圾收集堆之外,因此它们对应用程序内存占用的影响可能不明显。
-
MappedByteBuffer采用direct buffer的方 式读写文件内容,这种方式就是内存映射。这种方式直接调用系统底层的缓存,没有JVM和系统之间的复制操作,所以效率非常高,主要用于操作大文件。
通过FileChannel的map方法得到MappedByteBuffer
, MappedByteBuffer把磁 盘中文件的内容映射到计算机的虚拟内存中,操作MappedByteBuffer 直接操作内存中的数据,而无需每次通过IO来读取物理磁盘中的文件,效率极高。File file = new File("demo.txt"); FileChannel channel = new RandomAccessFile(file, "rw").getChannel(); MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, fle.length()); while (buffer.hasRemaining()) { System.out.print((char)buffer.get()); }
4.4 NIO-Selector
Selector 概述
Selector选择器,也可以称为多路复用器。它是Java NIO的核心组件之一,用于检查一个或多个Channel的状态是否处于可读、可写、可连接、可接收等。通过一个Selector选择器管理多个Channel,可以实现一个线程管理多个Channel对应的网络连接。使用单线程管理多个Channel可以避免多线程的线程上下文切换带来的额外开销。
SelectableChannel 可选择通道
只有SelectableChannel才能被Selector管理,比如所有的Socket通道。比如FileChannel并没有继承SelectableChannel,因此不能被Selector管理。
事件注册与监听
Channel通过注册的方式关联Selector。一个Channel可以注册到多个Selector上,但在某一个Selector上只能注册一次。注册时需要告知对通道的哪个操作感兴趣。
public final SelectionKey register(Selector sel, int ops)
throws ClosedChannelException
{
return register(sel, ops, null);
}
Channel注册到Selector后,由Selector来轮询监听所有被注册到该Selector的Channel所触发的事件,这个时候我们就需要关心Selector应该监听Channel触发的哪些事件。
Selector能监听的事件如下:
- SelectionKey.OP_READ(1):读事件,当被轮询的Channel读缓冲区有数据可读时触发;客户端和服务端的SocketChannel都可以监听Read事件,但ServerSocketChannel不可以监听读事件
- SelectionKey.OP_WRITE(4):可写事件,当被轮询的Channel写缓冲区有空闲空间时触发。一般情况下写缓冲区都有空闲空间,小块数据直接写入即可,没必要注册该操作类型,否则该条件不断就绪浪费 CPU;但如果是写密集型的任务,比如文件下载等,缓冲区很可能满,注册该操作类型就很有必要,同时注意写完后取消注册。(
一般不注册写事件
) - SelectionKey.OP_CONNECT(8):连接事件,当被轮询到Channel成功连接到其他服务器时触发;只有客户端的SocketChannel才可以监听Connect事件,因为只有客户端的SocketChanel才可以连接其他Channel(即连接服务端)。
- SelectionKey.OP_ACCEPT(16):接收事件,当被轮询到Channel接受到新的连接时触发;只有ServerSocketChannel才可以注册Accept事件,因为只有ServerSocketChannel才可以接收其他Channel的请求。
可以同时为Channel注册多个事件,多个事件之间使用|分隔即可,如:
channel.register(selection, SelectionKey.OP_READ | SelectionKey.OP_CONNECT);
选择器会查询每个一个channel的操作事件,如果是该channel注册的操作已就绪,则进行响应。注意,这里channel的操作指的是channel完成某个操作的条件,表示该channel对于 该操作已处于就绪状态。比如ServerSocketChannel已准备好接收新的连接,那么它注册的SelectionKey.OP_ACCEPT
操作就处于就绪状态。又比如SocketChannel已准备好去连接Server服务器,那么它注册的SelectionKey.OP_CONNECT
操作就处于就绪状态。于是Selector就可以触发之后的动作。
并不是所有的Channel都支持注册所有的事件,下表描述各种 Channel 允许注册的操作类型,Y 表示允许注册,其中服务器 SocketChannel 指由服务器 ServerSocketChannel.accept()返回的对象。
项目 | OP_READ | OP_WRITE | OP_CONNECT | OP_ACCEPT |
---|---|---|---|---|
服务端 ServerSocketChannel | Y | |||
服务端 SocketChannel | Y | Y | ||
客户端 SocketChannel | Y | Y | Y |
-
判断状态的方法:
-
public Set selectedKeys():获取Selector监听到的事件集合。一个SelectionKey对象代表一个监听事件;
SelectionKey监听具体事件的方法: -
public boolean isAcceptable():监听接收事件,当客户端连接到服务端时,服务端的接收事件将被触发;
-
public boolean isConnectable():监听连接事件,当客户端连接到服务端时,客户端的连接事件将被触发;
-
public boolean isReadable():监听读事件,当客户端向服务端写出数据时,服务端的SocketChannel将触发可读数据;
-
public boolean isWritable():监听写事件,当被轮询的Channel写缓冲区有空闲空间时触发(一般情况下都会触发)
-
public boolean isValid():判断当前这个通道是否是有效的;
-
SelectionKey选择键
SelectionKey封装了Channel和注册的操作。
当Selector调用select()方法时,会轮询所有注册在它身上的Channel,查看是否有处于某个操作(已注册到selector上)的就绪状态的Channel, 然后把这些Channel放入到SelectionKey的集合中。
在Java NIO中,每个Channel都有一个唯一的SelectionKey,用于标识该Channel。 SelectionKey是一个对象,它包含有关与该Channel相关的信息,例如通道的当前状态(如是否可读/写),以及与该Channel关联的Selector。当Channel和Selector一起使用时,SelectionKey用于在Selector中标识特定的Channel,并用于检索有关Channel的信息。
Selector的使用
-
创建Selector
Selector selector = Selector.open();
-
将Channel注册到Selector上
Channel必须处于非阻塞模式才能注册到Selector上
channel.register(selector, SelectionKey.OP_READ);
-
Selector轮询就绪状态的Channel
Selector通过调用select方法轮询已就绪的通道操作。select方法是阻塞的,直到至少有一个通道的注册操作已就绪。当完成select方法调用后,被选中的已就绪的所有channel通过Selector的selectedKeys()方法获得,该方法获得到的是一个SelectionKey集合, 其中每一个SelectionKey都表示一 个Channel。 于是可以根据SelectionKey的注册操作来做具体的业务处理。
// 阻塞等待某个操作就绪状态的channel selector.select(); // 获取一个集合,该集合包含了多个触发就绪状态的SelectionKey,而每个SelectionKey中都包含着一个Channel Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator();
select()方法会返回已经准备就绪的通道数量,该方法会阻塞到至少有一个通道在你注册的事件上就绪了
-
Set<SelectionKey> key()
: 获取注册到Selector上的所有Channel -
Set<SelectionKey> selectedKey()
: 获取Selector上处于就绪状态的Channel
案例实现
服务端的实现
package d5_io.t8_nio.t1_selector;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
/**
* @author: lxh
* @date: 2022/12/15 19:55
* @version: 1.0
*/
public class NIOServer {
public static void main(String[] args) {
try (
// 创建ServerSocketChannel对象
ServerSocketChannel ssc = ServerSocketChannel.open();
){
// 设置该Channel为非阻塞的
ssc.configureBlocking(false);
// 为服务端绑定端口
ssc.socket().bind(new InetSocketAddress(8080));
// 创建Selector多路复用器
Selector selector = Selector.open();
// 将服务端的Channel注册到Selector上,并告知Selector我们的Channel对客户端的连接事件感兴趣
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 阻塞等待事件的发生,一旦有事件发生就和结束阻塞状态
selector.select();
// 获取就绪事件的集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 遍历集合
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
// 获取SelectionKey,一个Channel对应着一个SelectionKey
SelectionKey selectionKey = iterator.next();
handle(selectionKey);
// 移除当前就绪事件,防止事件被重复处理
iterator.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private static void handle(SelectionKey selectionKey) throws IOException {
if (selectionKey.isAcceptable()) { // 如果是客户端的请求连接...
System.out.println("有客户端连接了...");
// 获取当前服务端的Channel
ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
// 接收客户端的连接请求,此时就会建立其一个服务端与客户端通信的Channel(accept方法是阻塞的)
SocketChannel socketChannel = ssc.accept();
// 设置非阻塞
socketChannel.configureBlocking(false);
// 把客户端的Channel注册到Selector上,关心读事件
socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) { // 如果是客户端的写事件发生
System.out.println("有客户端向服务端写数据...");
// 获取服务端和客户端通信的Channel
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
// 创建Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 通过SocketChannel将客户端写过来的数据读到buffer中
int len = 0;
StringBuilder msg = new StringBuilder();
// read本身就是非阻塞的,read方法执行的时候一定是客户端执行了写操作,在前面if语句中的configureBlocking方法中设置的
while ( (len = socketChannel.read(buffer)) > 0 ) {
msg.append( new String(buffer.array(), 0, len ) );
// 清空缓冲区,即position置为0,limit置为capacity
buffer.clear();
}
System.out.println("接收到客户端数据:" + msg.toString());
// 向客户端发送数据
socketChannel.write( ByteBuffer.wrap("你好,我是服务端".getBytes()) );
// 监听下一次事件
selectionKey.interestOps(SelectionKey.OP_READ);
} else if (selectionKey.isWritable()) {
System.out.println("服务端:写就绪...");
} else if (!selectionKey.isValid()) {
System.out.println("服务端:无效事件...");
}
}
}
客户端的实现
package d5_io.t8_nio.t1_selector;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* @author: lxh
* @date: 2022/12/15 21:51
* @version: 1.0
* 客户端
*/
public class NIOClient {
public static void main(String[] args) {
try (
// 创建SocketChannel
SocketChannel sc = SocketChannel.open();
) {
// 设置非阻塞
sc.configureBlocking(false);
// 连接到服务端
sc.connect(new InetSocketAddress("localhost", 8080));
// 创建Selector多路选择器
Selector selector = Selector.open();
// 将当前客户端的Channel注册到Selector上,并告知对连接事件感兴趣
sc.register(selector, SelectionKey.OP_CONNECT);
while (true) {
// 阻塞等待,当有注册过的事件发生时就会解除阻塞状态
selector.select();
// 获取所有的就绪事件,每一个SelectionKey都对应着一个SocketChannel
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 遍历就绪事件集合
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
handle(selectionKey);
iterator.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private static void handle(SelectionKey selectionKey) throws IOException {
// 获取当前和服务端通信的Channel
SocketChannel sc = (SocketChannel) selectionKey.channel();
if (selectionKey.isConnectable()) { // 连接就绪
if (sc.isConnectionPending()) { // 如果正在与服务端连接
// 完成与客户端的连接
sc.finishConnect();
// 设置成非阻塞模式
sc.configureBlocking(false);
// 把当前与服务端建立起来的SocketChannel注册到Selector上,并告知对Read事件感兴趣
sc.register(selectionKey.selector(), SelectionKey.OP_READ);
// 写数据到服务端
sc.write( ByteBuffer.wrap("hello server".getBytes()) );
}
} else if (selectionKey.isReadable()) { // 读就绪
// 创建Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 读取来自服务端的数据
int len = 0;
StringBuilder msg = new StringBuilder();
while ( (len = sc.read(buffer)) > 0 ) {
msg.append( new String(buffer.array(), 0, len) );
// 清空缓冲区,即position置为0,limit置为capacity
buffer.clear();
}
System.out.println("收到服务端消息:" + msg.toString());
}
}
}
注:Selector只会监听被注册过的事件
4.5 NIO包中的其他类
Pipe
Java NIO包中提供了Pipe类,用来实现两个线程之间的单向数据连接。Pipe类中有两个Channel,分别是SinkChannel和SourceChannel。
- SinkChannel:线程将数据写入到SinkChannel
- SourceChannel:线程从SourceChannel中获取数据
具体的两个线程通过Pipe管道实现数据传输的例子如下:
package d5_io.t8_nio.t3_other;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
/**
* @author: lxh
* @date: 2022/12/16 13:35
* @version: 1.0
* 测试Pipe类来进行线程间的数据传输
*/
public class PipeDemo {
public static void main(String[] args) throws IOException {
Pipe pipe = Pipe.open();
new Thread1(pipe).start();
new Thread2(pipe).start();
}
}
// 写线程
class Thread1 extends Thread {
private Pipe pipe;
public Thread1(Pipe pipe) {
this.pipe = pipe;
}
@Override
public void run() {
try {
System.out.println("写...");
// 获得SinkChannel
Pipe.SinkChannel sinkChannel = pipe.sink();
// 向SinkChannel中写数据
sinkChannel.write(ByteBuffer.wrap("hell, I'm Pipe".getBytes()));
sinkChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
// 读线程
class Thread2 extends Thread {
private Pipe pipe;
public Thread2(Pipe pipe) {
this.pipe = pipe;
}
@Override
public void run() {
System.out.println("读...");
try {
// 获得SinkChannel
Pipe.SourceChannel sourceChannel = pipe.source();
// 读取数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
StringBuilder msg = new StringBuilder();
int len = 0;
while ( (len = sourceChannel.read(buffer)) > 0 ) {
msg.append( new String(buffer.array(), 0, len) );
buffer.clear();
}
System.out.println(Thread.currentThread().getName() + ": " + msg.toString());
} catch (IOException e) {
e.printStackTrace();
}
}
}
FileLock
FileLock表示文件锁,通过JVM进程对要操作的文件上锁,在同一时间只允许一个进程访问上锁的文件。FileLock文件锁是进程级别,在同一个进程中的多个线程可以同时对文件进行操作。
FileLock文件锁又分成了排它锁和共享锁。
-
排它锁:只允许获得锁的进程对文件进行读写。获取排它锁有四种方式:
-
方式一:
// 阻塞式的,如果锁被其他进程持有,则阻塞等待其他进程释放锁,直到拿到锁 FileLock fileLock = fileChannnel.lock();
-
方式二:
// 和方式一一样,不过此种方式锁的是文件中指定区间的内容 FileLock fileLock = fileChannel.lock(0, file.length(), false);
-
方式三:
// 非阻塞式的,如果拿不到锁,则返回null FileLock fileLock = fileChannel.tryLock();
-
方式四:
// 非阻塞式的,如果拿不到锁,则返回null FileLock fileLock = fileChannel.tryLock(0, file.length(), false);
-
-
共享锁:所有获得共享锁的进程可以读文件,但不能写文件。获取共享锁的两种方式:
-
方式一:
// 阻塞式的 FileLock fileLock = fileChannel.lock(0, file.length(), true);
-
方式二:
// 非阻塞式的,如果拿不到锁,则返回null FileLock fileLock = fileChannel.tryLock(0, file.length(), true);
-
Path
ath是一个接口,它用来表示文件系统的路径,可以指向文件或文件夹。也有相对路径和绝对路径之分。Path是在Java 7中新添加进来的。Path接口在java.nio.file包下,所以全称是java.nio.file.Path;Path对象中存在很多与路径相关的功能方法,如获取根路径、获取父路径、获取文件名、拼接路径、返回绝对路径等操作;
-
如何创建Path:通过Paths类的get方法创建Path
static Path get(String first, String... more)
Path path = Paths.get("1.txt");
-
Path获取的相关方法:
Path getName(int index)
:返回此路径的名称元素作为 Path对象(文件所在盘符不算元素)。int getNameCount()
:返回路径中的名称元素的数量(文件所在盘符不算元素)。Path getFileName()
:将此路径表示的文件或目录的名称返回为Path对象。Path getParent()
:返回父路径,如果此路径没有父返回null,如:相对路径Path getRoot()
:返回此路径的根(盘符)作为Path对象,如果此路径没有根返回null,如:相对路径
-
路径包含的方法:
boolean endsWith(Path other)
:是否以给定的路径结束。boolean endsWith(String other)
:是否以给定的路径结束。boolean startsWith(Path other)
:是否以给定的路径开始。boolean startsWith(String other)
:是否以给定的路径开始。
-
将Path转为File
使用 java.io.file类的
File toPath()
方法即可 -
将File转为Path:调用java.nio.file.Path接口的以下方法:
File toFile()
:返回表示此路径的File对象。static Path of(String first, String... more)
:构建一个Path对象;
Files
NIO包中的Files类提供了操作文件及目录的一些常用方法。Files需要和Path一起使用。
-
创建目录的方法:
Files.createDirectory(Path path)
Files.createDirectory(Path.of("myDir"));
-
创建文件的方法:
Files.createFile(Path filePath)
-
拷贝文件:
Files.copy(Path srcPath, Path destPath)
Path srcPath = Path.get("1.txt"); Path destPath = Path.get("2.txt"); // 如果文件已存在,报错 //Files.copy(srcPath, destPath); // 如果文件已存在,则替换 Files.copy(srcPath, destPath, StandardCopyOption.REPLACE_EXISTING);
-
文件移动:
Files.move(Path src, Path target)
-
删除文件/目录
Files.delete(Path path)
Files.deleteIfExists(Path path)
-
读取相关方法
返回值 | 方法 | 描述 |
---|---|---|
List | Files.readAllLines(Path path) | 读取所有的行,以List形式返回,一行就是一个String类型的元素。只能是文本文件 |
List | Files.readAllLines(Path path, Charset charset) | 可指定解码字符集 |
byte[] | Files.readAllBytes(Path path) | 读取文件所有数据,以字节数组形式返回。文件可以是任意类型。 |
- 写入相关方法:
返回值 | 方法 | 描述 |
---|---|---|
Path | Files.write(Path path, byte[] bytes) | 写入一个byte[] |
-
遍历目录
通过
static Path walkFileTree(Path path, FileVisitor fv)
方法可以实现目录的遍历,通过提供FileVisitor接口的实现类对象来告知遍历文件的具体措施。其中通过返回的FileVisitResult
告知遍历每一个文件时的具体 下一步动作是什么,比如:继续(CONTINUE)、终止(TERMINATE) 、跳过同级(SKIP_SIBLING) 、跳过子级(SKIP_SUBTREE)。示例:
package d5_io.t8_nio.t3_other; import java.io.IOException; import java.nio.file.*; import java.nio.file.attribute.BasicFileAttributes; /** * @author: lxh * @date: 2022/12/16 18:47 * @version: 1.0 * Files类中的使用 */ public class FilesDemo { public static void main(String[] args) throws IOException { Path srcPath = Paths.get("resource/"); Files.walkFileTree(srcPath, new SimpleFileVisitor<Path>() { /** * 访问成功 * @param file 文件 * @param attrs * @return * @throws IOException */ @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { String fileName = file.getFileName().toString(); if ("1.txt".equals(fileName)) { // 找到 1.txt 这个文件,打印路径 System.out.println("find 1.txt: " + file.toAbsolutePath()); return FileVisitResult.TERMINATE; } return FileVisitResult.CONTINUE; } }); } }
4.6 NIO 案例:聊天室
源码地址:点击传送
-
工具类
package d5_io.t8_nio.t4_chat; import java.util.Scanner; /** * @author: lxh * @date: 2022/12/16 22:51 * @version: 1.0 */ public class ChatUtils { public static String input(String tip) { Scanner in = new Scanner(System.in); System.out.print(tip); while (in.hasNextLine()) { String msg = in.nextLine(); if ( msg.trim().length() > 0 ) { return msg; } } return null; } }
-
服务端实现
package d5_io.t8_nio.t4_chat.server; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.Set; /** * @author: lxh * @date: 2022/12/16 21:28 * @version: 1.0 * 服务端 */ public class ChatServer { public static void main(String[] args) { try ( // 创建ServerSocketChannel ServerSocketChannel ssc = ServerSocketChannel.open(); ) { // 绑定端口 ssc.socket().bind(new InetSocketAddress(8080)); // 设置非阻塞 ssc.configureBlocking(false); // 创建Selector多路选择器 Selector selector = Selector.open(); // 将服务端的Channel注册到Selector上,并告知对客户端的连接事件感兴趣 ssc.register(selector, SelectionKey.OP_ACCEPT); System.out.println("服务端启动成功..."); // 循环等待事件发生 while (true) { // 阻塞等待,有就绪事件是接收阻塞状态 int select = selector.select(); if (0 == select) continue; // 获取所有就绪状态的Channel Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 遍历 Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { // 获取当前SelectionKey SelectionKey selectionKey = iterator.next(); // 处理就绪事件 handle(selectionKey); // 移除当前就绪Channel,防止重复处理 iterator.remove(); } } } catch (Exception e) { e.printStackTrace(); } } /** * 处理的就绪就绪事件 * @param selectionKey */ private static void handle(SelectionKey selectionKey) throws IOException { if (selectionKey.isAcceptable()) { handleAccept(selectionKey); } if (selectionKey.isReadable()) { handleRead(selectionKey); } } /** * 处理读就绪事件 * @param selectionKey * @throws IOException */ private static void handleRead(SelectionKey selectionKey) throws IOException { // 获取Selector Selector selector = selectionKey.selector(); // 获取当前客户端和服务端通信的Channel SocketChannel sc = (SocketChannel) selectionKey.channel(); // 设置非阻塞 sc.configureBlocking(false); // 创建Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); // 读取数据 StringBuilder msg = new StringBuilder(); int len = 0; while ( (len = sc.read(buffer)) > 0 ) { msg.append( new String(buffer.array(), 0, len) ); // 清空缓冲区,即position置为0,limit置为capacity buffer.clear(); } if (msg.length() > 0) { System.out.println(sc.getRemoteAddress() + ": " + msg); // 将消息广播到自己除外的所有客户端 broadcastMessage(selectionKey, msg.toString()); } sc.register(selector, SelectionKey.OP_READ); } /** * 广播消息到自己除外的所有客户端 * @param key * @param msg 消息 * @throws IOException */ private static void broadcastMessage(SelectionKey key, String msg) throws IOException { // 获取Selector Selector selector = key.selector(); // 获取所有注册到Selector上的客户端Channel Set<SelectionKey> keys = selector.keys(); // 遍历 Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); // 获取当前Channel SelectableChannel channel = selectionKey.channel(); // 如果是SocketChannel并且当前SocketChannel不是发送消息过来的那个人 if (channel instanceof SocketChannel && channel != key.channel()) { // 广播消息 SocketChannel sc = (SocketChannel) channel; sc.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8))); } } } /** * 处理连接就绪事件 * @param selectionKey * @throws IOException */ private static void handleAccept(SelectionKey selectionKey) throws IOException { // 获取Selector Selector selector = selectionKey.selector(); // 获取服务端Channel ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel(); // 接受客户端的连接请求 SocketChannel sc = ssc.accept(); // 设置非阻塞 sc.configureBlocking(false); // 将客户端Channel注册到Selector上,并告知对读事件感兴趣 sc.register(selector, SelectionKey.OP_READ); // 写一个数据到客户端 sc.write(ByteBuffer.wrap("欢迎使用大海聊天室...".getBytes(StandardCharsets.UTF_8))); } }
-
客户端实现
package d5_io.t8_nio.t4_chat.client; import d5_io.t8_nio.t4_chat.ChatUtils; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; import java.util.Set; /** * @author: lxh * @date: 2022/12/16 21:29 * @version: 1.0 * 客户端 */ public class ChatClient { public static void main(String[] args) { try ( // 创建SocketChannel SocketChannel sc = SocketChannel.open(new InetSocketAddress("localhost", 8080)); ){ // 设置非阻塞 sc.configureBlocking(false); // 创建Selector Selector selector = Selector.open(); // 将当前Channel注册到selector上,并告知对读事件感兴趣 sc.register(selector, SelectionKey.OP_READ); Scanner in = new Scanner(System.in);; // 创建一个独立线程进行数据发送 new ClientSendThread(ChatUtils.input("输入用户名:"), sc).start(); // 循环等待事件发生 while (true) { // 阻塞等待,如果有就绪事件,则接收阻塞状态 int select = selector.select(); // 获取就绪状态的所有Channel Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 遍历 Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); // 处理就绪事件 handle(selectionKey); // 移除当前就绪事件,避免重复处理 iterator.remove(); } } } catch (Exception e) { e.printStackTrace(); } } /** * 处理所有就绪事件 * @param selectionKey */ private static void handle(SelectionKey selectionKey) throws IOException { if (selectionKey.isReadable()) { // 读就绪 handleRead(selectionKey); } } /** * 处理读就绪事件 * @param selectionKey */ private static void handleRead(SelectionKey selectionKey) throws IOException { // 获取Selector Selector selector = selectionKey.selector(); // 获取当前Channel SocketChannel sc = (SocketChannel) selectionKey.channel(); // 创建Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); // 读取 StringBuilder msg = new StringBuilder(); int len = 0; while ( (len = sc.read(buffer)) > 0 ) { msg.append( new String(buffer.array(), 0, len) ); // 清除缓冲区数据 buffer.clear(); } if (msg.length() > 0) { System.out.println("收到消息:" + msg.toString()); } } }
-
客户端发送数据线程
package d5_io.t8_nio.t4_chat.client; import d5_io.t8_nio.t4_chat.ChatUtils; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; /** * @author: lxh * @date: 2022/12/16 22:19 * @version: 1.0 */ public class ClientSendThread extends Thread { private String name; private SocketChannel socketChannel; public ClientSendThread(String name, SocketChannel socketChannel) { this.name = name; this.socketChannel = socketChannel; } /** * 处理发送消息的业务 */ @Override public void run() { try { // 创建Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); // 输入待发送消息 while (true) { String msg = ChatUtils.input(""); if (msg != null) { msg = name + ": " + msg; socketChannel.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8))); } } } catch (Exception e) { e.printStackTrace(); } } }
第五章 Java AIO
5.1 AIO 概述
Asynchronous I0也称为AlO,即异步非阻塞I0。Java7提供了改进版的NIO,引入了异步非阻塞的I0,由操作系统完成后回调通知服务端程序启动线程去处理。一 般适用于连接数较多且连接时间较长的应用。
异步,就是在对文件读写的过程中,程序不需要等待文件读写完毕,就可以直接执行之后的代码,可以设置监听程序执行读写完毕之后的操作(写在回调函数中),当程序完成读写后会回去执行这个回调函数,这样就完成了异步读写。
5.2 AsynchronousFileChannel
在Java7中加入了支持异步操作的文件通道AsynchronousFileChannel,首先要理解什么是异步操作。所谓的异步,就是在对文件读写的过程中,程序不需要等待文件读写完毕,就可以直接执行之后的代码。可以设置监听程序执行读写完毕之后的操作。
-
创建异步文件通道
Path path = Paths.get("resource/1.txt"); AsynchronousFileChannel sfc = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
参数一是要操作的文件,参数二是要执行的操作
-
读取数据的方式一:获得Future类来读
Path path = Paths.get("resource/1.txt"); AsynchronousFileChannel sfc = AsynchronousFileChannel.open(path, StandardOpenOption.READ); ByteBuffer buffer = ByteBuffer.allocate(1024); // 数据读到buffer中,从0位置处开始读 Future<Integer> future = sfc.read(buffer, 0); // 如果没有读完就阻塞等待 while (!future.isDone()); // 到了这里,程序读完 buffer.flip(); // 打印数据 System.out.println(new String(buffer.array(), 0, buffer.limit()));
-
读取方式二:使用异步监听方式
当读取完成后,会调用CompletionHandler对 象的completed方法,在方法中做读取数据完毕后的操作。
Path path = Paths.get("resource/1.txt"); AsynchronousFileChannel sfc = AsynchronousFileChannel.open(path, StandardOpenOption.READ); ByteBuffer buffer = ByteBuffer.allocate(1024); sfc.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() { /** * 成功读取所有数据后执行的方法 * @param result 成功读到了多少字节的数据 * @param attachment 读取到的数据 */ @Override public void completed(Integer result, ByteBuffer attachment) { System.out.println("result: " + result); attachment.flip(); System.out.println(new String(attachment.array())); } /** * 读取失败 * @param exc * @param attachment */ @Override public void failed(Throwable exc, ByteBuffer attachment) { System.out.println("failed..."); } }); // 为了看到结果,使线程休眠一会 Thread1.sleep(1000L);
-
写入数据方式一:使用Future类写入
通过异步的写数据,返回Future对象, 写数据过程中立即返回Future对象,即时写操作还未完成。可以通过Future对象中的isDone()方法得知写操作是否已完成。
Path path = Paths.get("resource/11.txt"); AsynchronousFileChannel sfc = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE); ByteBuffer buffer = ByteBuffer.wrap("hello write async channel".getBytes()); // 写数据 Future<Integer> future = sfc.write(buffer, 0); // 等待写完 while (!future.isDone()); // 写入完成 System.out.println("写入完成...");
-
写入数据方式二:监听方式
Path path = Paths.get("resource/11.txt"); AsynchronousFileChannel sfc = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE); ByteBuffer buffer = ByteBuffer.wrap("hello write async channel method 2".getBytes()); // 写数据 sfc.write(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() { /** * 数据写入成功后执行的回调函数 * @param result 写入数据的字节数 * @param attachment 写入的数据 */ @Override public void completed(Integer result, ByteBuffer attachment) { System.out.println("result: " + result); } /** * 数据写入失败后执行的回调函数 * @param exc * @param attachment */ @Override public void failed(Throwable exc, ByteBuffer attachment) { } }); // 为了看到结果,使线程休眠一会 Thread1.sleep(1000L);
5.3 AsynchronousSocketChannel
学了上面的AsynchronousFileChannel之后,AsynchronousSocketChannel就很好理解了
- 服务端实现
package d5_io.t9_aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
/**
* @author: lxh
* @date: 2022/12/17 8:11
* @version: 1.0
* AIO服务端实现
*/
public class AIOServer {
public static void main(String[] args) {
try (
// 创建Channel
AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
){
// 绑定端口
ssc.bind(new InetSocketAddress(8080));
// 异步接收客户端连接
ssc.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
/**
* 接收到客户端连接执行的方法
* @param socketChannel
* @param attachment
*/
@Override
public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
try {
// 接收客户端连接请求
ssc.accept(attachment, this);
System.out.println("客户端连接: " + socketChannel.getRemoteAddress());
// 创建Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 异步读取客户端的数据
socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
/**
* 服务端读就绪
* @param result 读取到数据的字节数
* @param attachment
*/
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
System.out.println("收到客户端消息: " + new String(attachment.array(), 0, result));
// 向客户端返回消息
socketChannel.write(ByteBuffer.wrap("hello client".getBytes()));
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
}
});
System.out.println("server main thread...");
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 客户端实现
package d5_io.t9_aio;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
/**
* @author: lxh
* @date: 2022/12/17 8:11
* @version: 1.0
* AIO 客户端实现
*/
public class AIOClient {
public static void main(String[] args) {
try (
// 创建客户端Channel
AsynchronousSocketChannel sc = AsynchronousSocketChannel.open();
){
// 连接到服务端, get()方法会阻塞等待,直到与服务端连接成功
sc.connect(new InetSocketAddress("localhost", 8080)).get();
// 发送数据
sc.write(ByteBuffer.wrap("hello aio server".getBytes()));
// 创建Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = 0;
while ((len = sc.read(buffer).get()) > 0) {
System.out.println("服务端返回的消息: " + new String(buffer.array(), 0, len));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
2022/12/17