概述
以流式方式处理一个 zip 文件,该 zip 中包含多个 JSON 文件。
流式处理无需先把体积很大的 zip 解压到磁盘;再配合多线程并行解析各个条目,可以显著提升处理效率。
代码
package 多线程.demo05多jsonCountDownLatch;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
@Slf4j
public class ZipProcessor {
private Path path;
private static int numThreads = Runtime.getRuntime().availableProcessors();
private static ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
private static ObjectMapper objectMapper = new ObjectMapper();
@SneakyThrows
public ZipProcessor(String filePath){
path = Paths.get(filePath);
if (!Files.exists(path)) {
throw new FileNotFoundException("The specified ZIP file does not exist: " + filePath);
}
}
public void streamProcess(){
StopWatch stopWatch = new StopWatch();
stopWatch.start();
// 使用 try-with-resources 保证资源关闭
try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(path))) {
ZipEntry entry;
while ((entry = zis.getNextEntry()) != null) {
// 将当前条目的数据读取到字节数组中
byte[] byteArray = getByteArray(zis);
process(byteArray, entry);
// 关闭当前条目的输入流
zis.closeEntry();
}
stopWatch.stop();
log.info("zip处理耗时:{}秒", stopWatch.getTotalTimeSeconds());
} catch (IOException e) {
stopWatch.stop();
log.error("zip处理异常,耗时:{}秒", stopWatch.getTotalTimeSeconds(), e);
}
}
public void streamParallelProcess() {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(path))) {
ZipEntry entry;
List<Future<?>> futures = new ArrayList<>();
while ((entry = zis.getNextEntry()) != null) {
// 为了lambda表达式捕获局部变量
final ZipEntry currentEntry = entry;
// 将当前条目的数据读取到字节数组中
byte[] byteArray = getByteArray(zis);
Future<?> future = executorService.submit(() -> process(byteArray, currentEntry));
futures.add(future);
}
// 等待所有任务完成
for (Future<?> future : futures) {
try {
future.get();
} catch (Exception e) {
log.error("任务执行失败", e);
}
}
} catch (IOException e) {
log.error("读取ZIP文件异常", e);
} finally {
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException ex) {
executorService.shutdownNow();
}
stopWatch.stop();
log.info("zip处理耗时:{}秒", stopWatch.getTotalTimeSeconds());
}
}
private void process(byte[] entryData, ZipEntry entry) {
try {
// 在这里处理每个条目的数据
ObjectMapper objectMapper = new ObjectMapper();
OriginalObject originalObject = objectMapper.readValue(entryData, OriginalObject.class);
log.info("完成处理:{},sourceFileId:{}", entry.getName(), originalObject.getSourceFileId());
} catch (IOException e) {
log.error("处理条目 {} 异常", entry.getName(), e);
}
}
private byte[] getByteArray(ZipInputStream zis) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int length;
while ((length = zis.read(buffer)) > 0) {
baos.write(buffer, 0, length);
}
return baos.toByteArray();
}
}