文章目录
- 介绍
- 代码设计
- 代码参考
- 客户端代码
- 服务器端代码
- 测试实例
介绍
使用HTTP协议进行数据流式传输是一种常见的方法。对于大文件数据传输可以使用HTTP的chunked编码或使用多部分响应来实现数据流式传输。
【HTTP的chunked编码】在发送数据的服务中,可以将数据切分为较小的块,并使用HTTP的chunked编码将这些块发送给接收数据的服务。接收数据的服务在接收到每个块时可以进行相应的处理。
【使用多部分响应】发送数据的服务可以将数据分割为多个部分,并使用多部分响应将这些部分作为独立的消息发送给接收数据的服务。接收数据的服务可以逐个处理这些部分。
使用流式传输时,您需要确保发送数据的服务和接收数据的服务都能处理流式数据,并且网络环境和服务器配置允许流式传输。
代码设计
【1】本次实现的是大文件的接口传输,大约2G,但是在项目中为了避免占用内存过大影响其他接口,所以测试代码限制了总体的项目的JVM参数为-Xms100m -Xmx100m
【2】客户端读取本地大文件,因为读取的文件过大,如果一次性读取到内存会OOM异常,所以使用了BufferedInputStream读取,当从输入流中读取数据时,BufferedInputStream 会一次性读取更多的数据,并将其存储在内部缓冲区中。然后,当从 BufferedInputStream 中读取数据时,它将从缓冲区中返回数据,而不是每次都直接从底层输入流中读取数据。通过使用缓冲区,BufferedInputStream 可以减少对底层输入流的直接读取次数,从而提高读取性能。更大的缓冲区大小通常意味着更少的 I/O 操作,但也会占用更多的内存。bufferSize 参数用于指定 BufferedInputStream 内部缓冲区的大小,影响读取操作的性能和内存占用。
【3】本次测试程序中读取缓存大小和发送数据块的大小设置都是5M左右,具体设置可以根据项目情况决定。
【4】客户端每读取完一次缓冲区数据后就发送给服务端接口。
【5】设置ContentType为application/octet-stream。application/octet-stream 是一种通用的 MIME 类型,用于表示未知的二进制数据流。它是一个字节流的二进制数据类型,没有指定具体的数据格式或文件类型。使用 application/octet-stream MIME 类型时,接收方可能需要根据其他信息(例如文件扩展名、附加的元数据等)来推断数据的实际类型和处理方式。
【6】因为需要传输多个数据块,客户端需要传递一个文件的唯一标识,用户服务端接口将不同的客户端发来的数据进行匹配和追加写入。
【7】服务端接口接收文件,根据传入的唯一标识,当前代码通过文件名标识的,决定是否创建不存在的文件。
【8】如果插入的文件已存在,则表示当前传入的是该文件的后续数据块,则进行追加写入,此处使用了RandomAccessFile,然后将文件指针移到末尾写入数据块。
【9】依次执行完上述操作后,完成写入,上面的主要存在的问题是对于大文件存在网络异常或者其他原因导致某一个块文件传输失败,对于这个文件客户端和服务端可以进行数据块索引的匹配验证,在传输后进行验证,如果传输没有问题则成功,如果失败可以选择全部重新传输,或者仅传输失败的那一个编号的数据块。
代码参考
客户端代码
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.*;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import java.io.*;
import java.util.UUID;
/**
* 后台服务接口间大文件的流式发送和读取
*
* @author zhangyu
* @date 2023/6/11
*/
@Slf4j
@RequestMapping("/stream")
@RestController
public class DataStreamClientController {
public static final String SEND_FILE="/Users/zhangyu/code/test_data/demo2.mp4";
/**
* 每个块的大小
*/
private static final int CHUNK_SIZE = 5242880;
@GetMapping("/split/send")
public String sendFileBySplit() throws IOException {
sendDataToService();
return "ok";
}
public void sendDataToService() throws IOException {
// 设置文件名称
String fileName = UUID.randomUUID() +"."+ getFileExtension(SEND_FILE);
log.info("文件:{} 开始发送",fileName);
// 读取文件,设置缓冲区读取,避免一次读取OOM
try (BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream(SEND_FILE),CHUNK_SIZE)) {
byte[] buffer = new byte[CHUNK_SIZE];
int bytesRead;
int chunkIndex=0;
while ((bytesRead = inputStream.read(buffer, 0, CHUNK_SIZE)) != -1) {
chunkIndex++;
processChunk(fileName, buffer, bytesRead, chunkIndex);
}
} catch (IOException e) {
e.printStackTrace();
}
log.info("文件:{} 发送完成",fileName);
}
private static void processChunk(String fileName, byte[] buffer, int bytesRead, int chunkIndex) {
// 创建RestTemplate
RestTemplate restTemplate = new RestTemplate();
// 设置请求头application/octet-stream
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_OCTET_STREAM);
// 分块传输数据给服务
byte[] chunk = new byte[bytesRead];
System.arraycopy(buffer, 0, chunk, 0, bytesRead);
// 构建请求实体
HttpEntity<byte[]> requestEntity = new HttpEntity<>(chunk, headers);
// 发送数据块给服务A
ResponseEntity<String> responseEntity = restTemplate.postForEntity(
"http://127.0.0.1:9999/stream/upload?fileName=" + fileName,
requestEntity,
String.class
);
// 处理响应
if (responseEntity.getStatusCode().is2xxSuccessful()) {
// 响应成功
log.info("数据分块 {} 发送成功", (chunkIndex + 1) );
} else {
// 响应失败
log.error("数据分块 {} 发送失败", (chunkIndex + 1) );
}
}
public static String getFileExtension(String fileName) {
int dotIndex = fileName.lastIndexOf(".");
if (dotIndex == -1 || dotIndex == fileName.length() - 1) {
return "";
} else {
return fileName.substring(dotIndex + 1);
}
}
}
服务器端代码
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
/**
* 后台服务接口间大文件的流式发送和读取
*
* @author zhangyu
* @date 2023/6/11
*/
@RequestMapping("/stream")
@Slf4j
@RestController
public class DataStreamServerController {
private static final String UPLOAD_DIR = "demo";
@PostMapping("/upload")
public ResponseEntity<String> uploadDataChunk(@RequestBody byte[] dataChunk, @RequestParam("fileName") String fileName) {
try {
log.info("接收到上传文件:{}",fileName);
// 处理数据块,例如将数据写入文件
writeDataChunkToFile(dataChunk, fileName);
// 返回成功响应
return ResponseEntity.ok("数据上传成功");
} catch (Exception e) {
// 处理上传失败情况
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("数据块处理异常: " + e.getMessage());
}
}
private void writeDataChunkToFile(byte[] dataChunk,String curFileName) throws IOException {
// 判断文件是否已创建,如果未创建,则进行初始化
if (!Files.exists(getFilePath(curFileName))) {
initializeFile(curFileName);
}
File file = new File(UPLOAD_DIR, curFileName);
try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
// 将文件指针移到末尾
raf.seek(raf.length());
// 写入数据块
raf.write(dataChunk);
}
log.info("完成文件读取写入:{}",curFileName);
}
/**
* 初始化文件
*/
private void initializeFile(String curFileName) throws IOException {
Path filePath = getFilePath(curFileName);
Files.createDirectories(filePath.getParent());
Files.createFile(filePath);
}
/**
* 获取文件路径
*/
private Path getFilePath(String curFileName) {
Path uploadDir = Paths.get(UPLOAD_DIR);
return uploadDir.resolve(curFileName);
}
}
测试实例
【1】分别设置客户端和服务器端项目的JVM参数: -Xms100m -Xmx100m
【2】客户端发送文件如下2G大小,总体的传输时间5S左右,因为设置的分块很小,所以传输交互很多次,可以根据应用的内存大小进行调整优化。
【3】完整代码:
https://github.com/zwzhangyu/ZyCodeHub/tree/main/spring-code-hub/mvc
【4】代码运行截图