背景
网上大多数minio大文件上传都是采用后台返回前端预上传链接,然后由前端去put请求直接和minio通信上传分片文件,然后调用后台合并分片逻辑来达到快申诉上传的目的,详情可以参考我的上两篇文章
最近有个项目域名是https的,但是上传大文件走https太慢,而且服务器配置很拉跨,https里走http预上传不知道为啥老是报错。所以研究下直接从后台分片,然后逐个上传,然后合并,删除分片。
springboot+elementui
集成minio
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>8.3.1</version>
</dependency>
yml配置
minio:
url: http://127.0.0.1:9000 //用于后台内部
domain: http://127.0.0.1:9000//用于返回给前端,这个可以改成线上域名
accessKey: minioadmin
secretKey: minioadmin
bucketName: minioBackName
#默认文件存放路径
filePath: common/
MinioConfig 配置
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import io.minio.MinioClient;
/**
* Minio 配置信息
*
* @author
*/
@Configuration
@ConfigurationProperties(prefix = "minio")
public class MinioConfig
{
/**
* 服务地址
*/
private String url;
private String domain;
public String getDomain() {
return domain;
}
public void setDomain(String domain) {
this.domain = domain;
}
/**
* 用户名
*/
private String accessKey;
/**
* 密码
*/
private String secretKey;
/**
* 存储桶名称
*/
private String bucketName;
/**
* 文件存储指定位置路径
*/
private String filePath;
public String getUrl()
{
return url;
}
public void setUrl(String url)
{
this.url = url;
}
public String getAccessKey()
{
return accessKey;
}
public void setAccessKey(String accessKey)
{
this.accessKey = accessKey;
}
public String getSecretKey()
{
return secretKey;
}
public void setSecretKey(String secretKey)
{
this.secretKey = secretKey;
}
public String getBucketName()
{
return bucketName;
}
public void setBucketName(String bucketName)
{
this.bucketName = bucketName;
}
public String getFilePath() {
return filePath;
}
public void setFilePath(String filePath) {
this.filePath = filePath;
}
@Bean
public MinioClient getMinioClient()
{
return MinioClient.builder().endpoint(url).credentials(accessKey, secretKey).build();
}
}
ISysFileService 文件上传接口
import org.springframework.web.multipart.MultipartFile;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* 文件上传接口
*
* @author
*/
public interface ISysFileService {
/**
* 文件上传接口
*
* @param file 上传的文件
* @return 访问地址
* @throws Exception
*/
public String uploadFile(MultipartFile file) throws Exception;
/**
* 异步文件上传接口
*
* @param file 上传的文件
* @return 访问地址
* @throws Exception
*/
public String uploadFileAsync(MultipartFile file) throws Exception;
}
MinioSysFileServiceImpl 实现类
/**
* Minio 文件存储
*
* @author
*/
@Primary
@Service
@Slf4j
public class MinioSysFileServiceImpl implements ISysFileService {
//minio每个分片不能低于5MB,最后一个分片可以不管 13MB文件可分成3个分片 5MB 5MB 3MB
private static final int PART_SIZE = 5 * 1024 * 1024; // 5MB parts
/**
* minio基础参数 配置类
*/
@Autowired
private MinioConfig minioConfig;
/**
* minio客户端连接 连接minio工具
*/
@Autowired
private MinioClient client;
/**
* 本地文件上传接口
*
* @param file 上传的文件
* @return 访问地址
* @throws Exception
*/
@Override
public String uploadFile(MultipartFile file) throws Exception {
String fileName = file.getOriginalFilename();//获取文件名称
fileName = minioConfig.getFilePath()+ DateUtils.getDate()+"/"+fileName.substring(fileName.lastIndexOf("/") + 1, fileName.length());
long startTime = System.currentTimeMillis()/1000;
//获取文件流
InputStream inputStream = file.getInputStream();
//获取文件大小
long fileSize = file.getSize();
//计算分片数量
int partCount = (int) (fileSize / PART_SIZE);
if (fileSize % PART_SIZE > 0) {
partCount++;
}
long partTime = System.currentTimeMillis()/1000;
System.out.println("分片耗时"+(partTime-startTime));
//存放分片流
List<InputStream> parts = new ArrayList<>();
//存放分片minio地址
List<String> fileList = new ArrayList<>();
//分配分片流
for (int i = 0; i < partCount; i++) {
// 每次只需要从原始文件InputStream中读取指定大小的数据即可
byte[] partData = new byte[PART_SIZE];
int read = inputStream.read(partData);
if (read == -1) {
break; // 文件已经读完了
}
// 将读取的数据作为一个新的InputStream添加到parts列表中
parts.add(new ByteArrayInputStream(partData, 0, read));
}
long readTime = System.currentTimeMillis()/1000;
System.out.println("读取文件耗时"+(readTime-partTime));
//上传分片流到minio
for (int i = 0; i < parts.size(); i++) {
// 构建每个part的object name
String partObjectName = fileName + ".part" + i;
fileList.add(partObjectName);
InputStream partStream = parts.get(i);
PutObjectArgs args = PutObjectArgs.builder()
.bucket(minioConfig.getBucketName())
.object(partObjectName)
.stream(partStream, partStream.available(), -1)
.contentType(file.getContentType())
.build();
ObjectWriteResponse objectWriteResponse = client.putObject(args);
//System.out.println("分片上传结果======++++++"+objectWriteResponse);
}
long upLoadTime = System.currentTimeMillis()/1000;
System.out.println("上传分片耗时"+(upLoadTime-readTime));
//关闭主文件输入流和分片输入流
inputStream.close();
for (InputStream part : parts) {
part.close();
}
//获取需要合并的分片组装成ComposeSource
List<ComposeSource> sourceObjectList = new ArrayList<>(fileList.size());
for (String chunk : fileList){
sourceObjectList.add(
ComposeSource.builder()
.bucket(minioConfig.getBucketName())
.object(chunk)
.build()
);
}
//合并分片
ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder()
.bucket(minioConfig.getBucketName())
//合并后的文件的objectname
.object(fileName)
//指定源文件
.sources(sourceObjectList)
.build();
client.composeObject(composeObjectArgs);
long mergeTime = System.currentTimeMillis()/1000;
System.out.println("合并分片耗时"+(mergeTime-upLoadTime));
//删除已经上传的分片,组装成DeleteObject
List<DeleteObject> collect = fileList.stream().map(DeleteObject::new).collect(Collectors.toList());
//执行删除
RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder()
.bucket(minioConfig.getBucketName())
.objects(collect)
.build();
Iterable<Result<DeleteError>> results = client.removeObjects(removeObjectsArgs);
//如果没有下面try的代码,文件史删除不了的,加上下面的代码就可以删除了
try{
for (Result<DeleteError> result : results){
DeleteError deleteError = result.get();
System.out.println("error in deleteing object"+deleteError.objectName()+";"+deleteError.message());
}
}catch (Exception e){
System.out.println("minio删除文件失败");
e.printStackTrace();
}
long deleteTime = System.currentTimeMillis()/1000;
System.out.println("删除分片耗时"+(deleteTime-mergeTime));
return fileName;
}
/**
* 异步上传大文件采用链式
*
* @param file
* @return
* @throws Exception
*/
@Override
public String uploadFileAsync(MultipartFile file) throws Exception {
long startTimes = System.currentTimeMillis() / 1000;
assertAllowed(file, MimeTypeUtils.DEFAULT_ALLOWED_EXTENSION);
String fileName = FileUploadUtils.extractFilename(file);
fileName = minioConfig.getFilePath() + DateUtils.getDate() + "/" + fileName.substring(fileName.lastIndexOf("/") + 1, fileName.length());
InputStream inputStream = file.getInputStream();
ForkJoinPool pool = new ForkJoinPool();
// 创建分片流异步执行任务:读取大文件分成N个流
CompletableFuture<List<InputStream>> createPartNumTask = CompletableFuture.supplyAsync(() -> {
List<InputStream> parts = new ArrayList<>();
long fileSize = file.getSize();
int partCount = (int) (fileSize / PART_SIZE);
if (fileSize % PART_SIZE > 0) {
partCount++;
}
long startTime = System.currentTimeMillis() / 1000;
for (int i = 0; i < partCount; i++) {
// 每次只需要从原始文件InputStream中读取指定大小的数据即可
byte[] partData = new byte[PART_SIZE];
int read = 0;
try {
read = inputStream.read(partData);
} catch (IOException e) {
e.printStackTrace();
}
if (read == -1) {
break; // 文件已经读完了
}
// 将读取的数据作为一个新的InputStream添加到parts列表中
parts.add(new ByteArrayInputStream(partData, 0, read));
}
long endTime = System.currentTimeMillis() / 1000;
System.out.println(Thread.currentThread() + "执行创建分片流任务耗时->" + (endTime - startTime) + "秒");
return parts;
}, pool);
//createPartNum关联的异步任务的返回值作为方法入参,传入到thenApply的方法中
//thenApply这里实际创建了一个新的CompletableFuture实例
String finalFileName = fileName;
CompletableFuture<List<String>> createUploadTask = createPartNumTask.thenApply((parts) -> {
long startTime = System.currentTimeMillis() / 1000;
List<String> fileList = new ArrayList<>();
for (int i = 0; i < parts.size(); i++) {
// 构建每个part的object name
String partObjectName = finalFileName + ".part" + i;
fileList.add(partObjectName);
InputStream partStream = parts.get(i);
PutObjectArgs args = null;
try {
args = PutObjectArgs.builder()
.bucket(minioConfig.getBucketName())
.object(partObjectName)
.stream(partStream, partStream.available(), -1)
.contentType(file.getContentType())
.build();
} catch (IOException e) {
e.printStackTrace();
}
try {
ObjectWriteResponse objectWriteResponse = client.putObject(args);
} catch (ErrorResponseException e) {
e.printStackTrace();
} catch (InsufficientDataException e) {
e.printStackTrace();
} catch (InternalException e) {
e.printStackTrace();
} catch (InvalidKeyException e) {
e.printStackTrace();
} catch (InvalidResponseException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (ServerException e) {
e.printStackTrace();
} catch (XmlParserException e) {
e.printStackTrace();
}
}
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
for (InputStream part : parts) {
try {
part.close();
} catch (IOException e) {
e.printStackTrace();
}
}
long endTime = System.currentTimeMillis() / 1000;
System.out.println(Thread.currentThread() + "执行上传分片流任务->" + (endTime - startTime) + "秒");
return fileList;
});
String finalFileName1 = fileName;
CompletableFuture<List<String>> megreTask = createUploadTask.thenApply((fileList) -> {
long startTime = System.currentTimeMillis() / 1000;
List<ComposeSource> sourceObjectList = new ArrayList<>(fileList.size());
for (String chunk : fileList) {
sourceObjectList.add(
ComposeSource.builder()
.bucket(minioConfig.getBucketName())
.object(chunk)
.build()
);
}
ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder()
.bucket(minioConfig.getBucketName())
//合并后的文件的objectname
.object(finalFileName1)
//指定源文件
.sources(sourceObjectList)
.build();
try {
client.composeObject(composeObjectArgs);
} catch (ErrorResponseException e) {
e.printStackTrace();
} catch (InsufficientDataException e) {
e.printStackTrace();
} catch (InternalException e) {
e.printStackTrace();
} catch (InvalidKeyException e) {
e.printStackTrace();
} catch (InvalidResponseException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (ServerException e) {
e.printStackTrace();
} catch (XmlParserException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis() / 1000;
System.out.println(Thread.currentThread() + "执行合并分片任务->" + (endTime - startTime) + "秒");
return fileList;
});
CompletableFuture<Boolean> deleteTask = megreTask.thenApply((fileList) -> {
long startTime = System.currentTimeMillis() / 1000;
List<DeleteObject> collect = fileList.stream().map(DeleteObject::new).collect(Collectors.toList());
RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder()
.bucket(minioConfig.getBucketName())
.objects(collect)
.build();
Iterable<Result<DeleteError>> results = client.removeObjects(removeObjectsArgs);
try {
for (Result<DeleteError> result : results) {
DeleteError deleteError = result.get();
System.out.println("error in deleteing object" + deleteError.objectName() + ";" + deleteError.message());
}
} catch (Exception e) {
System.out.println("minio删除文件失败");
e.printStackTrace();
}
long endTime = System.currentTimeMillis() / 1000;
System.out.println(Thread.currentThread() + "执行删除分片任务->" + (endTime - startTime) + "秒");
return true;
});
long endTimes = System.currentTimeMillis() / 1000;
System.out.println("主线程执行耗时" + (endTimes - startTimes) + "秒");
return fileName;
}
}
controller 访问接口
@RestController
public class CommonController
{
@Value("${minio.domain}")//线上的域名
private String minioUrl;
@Value("${minio.bucketName}")//桶名
private String minioBucketName;
/**
* 文件上传请求
*/
@PostMapping("/common/upload/minio")
public AjaxResult upload(MultipartFile file)
{
try {
String url = sysFileService.uploadFile(file);
String filePath = minioUrl.+ "/"+minioBucketName+"/"+ url;
AjaxResult ajax = AjaxResult.success();
ajax.put("fileName", file.getOriginalFilename());//我这里返回的是视频原来的文件名
ajax.put("url", filePath);
return ajax;
}catch (Exception e)
{
log.error("上传文件失败", e);
return AjaxResult.error(e.getMessage());
}
}
/**
* 异步文件上传请求
*/
@PostMapping("/common/upload/minioAsync")
public AjaxResult upload(MultipartFile file)
{
try {
long startTime = System.currentTimeMillis()/1000;
String url = sysFileService.uploadFileAsync(file);
System.out.println("文件返回时间_END耗时"+(System.currentTimeMillis()/1000-startTime)+"秒");
String filePath = minioUrl.+ "/"+minioBucketName+"/"+ url;
AjaxResult ajax = AjaxResult.success();
ajax.put("fileName", file.getOriginalFilename());//我这里返回的是视频原来的文件名
ajax.put("url", filePath);
return ajax;
}catch (Exception e)
{
log.error("上传文件失败", e);
return AjaxResult.error(e.getMessage());
}
}
}
优化空间
MinioSysFileServiceImpl 实现类可以采用异步多线程的方式去执行,前端调用直接返回文件路径,不用管文件是否上传完,异步执行完成以后 调用mino 判断是否存在该文件,来判断该文件是否上传完成,另外想要实现,秒传,断点续传,只要加上MD5编码和redis就可以实现。后面我会补充
优化一 异步上传
代码我我已经再上面补充过了,主要讲下异步多线程
CompletableFuture
是java.util.concurrent
库在java 8
中新增的主要工具,同传统的Future
相比,其支持流式计算、函数式编程、完成通知、自定义异常处理等很多新的特性
CompletableFuture<Integer> future1
= CompletableFuture.supplyAsync(()->{
System.out.println("compute 1");
return 1;
});
CompletableFuture<Integer> future2
= future1.thenApply((p)->{
System.out.println("compute 2");
return p+10;
});
System.out.println("result: " + future2.join());
这个例子中展示了任务链
在上面的示例中,future1
通过调用thenApply
将后置任务连接起来,并形成future2
。该示例的最终打印结果为11,可见程序在运行中,future1
的结果计算出来后,会传递给通过thenApply
连接的任务,从而产生future2
的最终结果为1+10=11。当然,在实际使用中,我们理论上可以无限连接后续计算任务,从而实现链条更长的流式计算。
需要注意的是,通过thenApply
连接的任务,当且仅当前置任务计算完成时,才会开始后置任务的计算
。因此,这组函数主要用于连接前后有依赖的任务链。
利用异步多线程执行流程截图
看一下minio 里有没有文件