stream并行流快(文件11g)
try (Stream<String> lines = Files.lines(filePath)) {
lines.parallel().forEach(str -> operatePartData(str, allDataList));
} catch (IOException e) {
throw new RuntimeException(e);
}
线程池慢(文件11g)
lines.skip(1).limit(37230)
limit() 范围是在skip的偏移量上增加行数范围
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 3, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
for (int i = 1; i <= THREAD_TIME; i++) {
int finalI = i;
int startIndex = (i - 1) * avgLines + 1;
int endIndex = i * avgLines;
poolExecutor.execute(() -> {
List<String[]> partData = new ArrayList<>();
try (Stream<String> lines = Files.lines(filePath)) {
if (1 == finalI) {
lines.skip(1).limit(endIndex).forEach(str -> {
operatePartData(str, partData);
});
System.out.printf("------------------ %s 起始位置:%s - %s, 数据量:%s%n", finalI, 1, endIndex, partData.size());
} else if (THREAD_TIME == finalI) {
lines.skip(startIndex).limit(totalLines.get() - startIndex).forEach(str -> {
operatePartData(str, partData);
});
System.out.printf("------------------ %s 起始位置:%s - %s, 数据量:%s%n", finalI, startIndex, totalLines.get(), partData.size());
} else {
lines.skip(startIndex).limit(endIndex - startIndex).forEach(str -> {
operatePartData(str, partData);
});
System.out.printf("------------------ %s 起始位置:%s - %s, 数据量:%s%n", finalI, startIndex, endIndex, partData.size());
}
if (!partData.isEmpty()) {
allDataList.addAll(partData);
}
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
countDownLatch.countDown();
}
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
poolExecutor.shutdown();
writeToFile(allDataList, outFileName);