文章目录
- 一:newScheduledThreadPool(周期性线程池)
- 1.1 特点
- 1.2 核心线程数
- 1.3 创建实例
- 1.4 常用方法
- 1.4.1 schedule方法
- 1.4.2 scheduleAtFixedRate方法
- 1.4.3 scheduleWithFixedDelay方法
- 二:多线程下载展示文件总大小、剩余时间、下载速率
- 2.1 文件总大小
- 2.2 累计下载大小
- 2.2.1 Volatile
- 2.2.2 LongAdder
- 2.3 前一次下载的大小
- 2.4 重写run方法
- 三:编写下载器
- 3.1 编写http工具类
- 3.2 核心下载功能
- 3.3 编写分片下载的线程DownloaderTask
- 3.4 合并文件
- 3.5 清理临时文件
- 3.6 编写main方法
一:newScheduledThreadPool(周期性线程池)
1.1 特点
延时启动 、定时启动 、可以自定义最大线程池数量
1.2 核心线程数
int cpuNubmer = Runtime.getRuntime().availableProcessors();
1.3 创建实例
public ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(cpuNubmer);
1.4 常用方法
1.4.1 schedule方法
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* @description 测试周期性线程池ScheduledThread-schedule方法
*/
public class ScheduledThreadTest {
public static void main(String[] args) throws Exception {
ScheduledExecutorService executorService = ScheduledThread.getInstance();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
// 延迟2秒执行下一个任务
System.out.println("当前时间=" + sdf.format(new Date()));
for (int i = 0; i < 9; i++) {
int finalI = i;
ScheduledFuture future = executorService.schedule(new Callable<String>() {
@Override
public String call() throws Exception {
int value = finalI;
System.out.println("时间=" + sdf.format(new Date()) + ",线程=" + Thread.currentThread().getName() + ",任务=" + value);
// Thread.sleep(3);
return "call";
}
}, 2, TimeUnit.SECONDS);
System.out.println(future.get());
}
executorService.shutdown();
}
}
1.4.2 scheduleAtFixedRate方法
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
- scheduleAtFixedRate :是以上一个任务开始的时间计时,period时间过去后,检测上一个任务是否执行完毕,如果上一个任务执行完毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行。
- initialDelay:表示首次延迟2秒执行
- period :表示周期执行的时间为6秒,即表示会重复执行,重复执行的间隔时间为6秒
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 测试周期性线程池ScheduledThread-scheduleAtFixedRate方法
* initialDelay 表示首次延迟2秒执行
* period 表示周期执行的时间为6秒,即表示会重复执行,重复执行的间隔时间为6秒
* scheduleAtFixedRate ,是以上一个任务开始的时间计时,period时间过去后,检测上一个任务是否执行完毕,
* 如果上一个任务执行完毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行。
*/
public class ScheduledThreadTest2 {
public static void main(String[] args) throws Exception {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
System.out.println("当前时间="+sdf.format(new Date()));
// 延迟2秒执行下一个任务
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("begin=" + sdf.format(new Date()));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end=" + sdf.format(new Date()));
}
}, 2, 4, TimeUnit.SECONDS);
}
// 周期性执行
// 当执行时间小于period时,下次执行时间= 当前执行开始时间+period
// 当前时间=2021-09-16 05:21:59
// begin=2021-09-16 05:22:01
// end=2021-09-16 05:22:05
// begin=2021-09-16 05:22:07
// end=2021-09-16 05:22:11
// 当执行时间大于period时,下次执行时间 = 当前执行结束时间
// 当前时间=2021-09-16 05:23:13
// begin=2021-09-16 05:23:15
// end=2021-09-16 05:23:20
// begin=2021-09-16 05:23:20
// end=2021-09-16 05:23:25
}
1.4.3 scheduleWithFixedDelay方法
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @description 测试周期性线程池ScheduledThread-scheduleWithFixedDelay方法
* scheduleWithFixedDelay,是从上一个任务结束时开始计时,period时间过去后,再次执行下一次任务。
*/
public class ScheduleWithFixedDelayTest {
public static void main(String[] args) throws Exception {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
System.out.println("当前时间="+sdf.format(new Date()));
// 延迟2秒执行下一个任务
executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println("begin=" + sdf.format(new Date()));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end=" + sdf.format(new Date()));
}
}, 2, 2, TimeUnit.SECONDS);
}
// 周期性执行查询
// 执行结果如下:当前时间延迟2秒时间后开始执行线程,下次执行的时间= 当前执行结束时间+period
// 当前时间=2021-09-16 05:13:28
// begin=2021-09-16 05:13:31
// end=2021-09-16 05:13:36
// begin=2021-09-16 05:13:38
// end=2021-09-16 05:13:43
// begin=2021-09-16 05:13:45
// end=2021-09-16 05:13:50
}
二:多线程下载展示文件总大小、剩余时间、下载速率
2.1 文件总大小
/**
* 文件总大小
*/
public long httpFileContentSize;
2.2 累计下载大小
public static volatile LongAdder downSize = new LongAdder();
2.2.1 Volatile
Volatile关键字的作用主要有如下两个:
1.线程的可见性:当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值。
2. 顺序一致性:禁止指令重排序。
2.2.2 LongAdder
LongAdder可以是计数器的增强版,高并发下性能会更好,适合频繁的更新,但是不太频繁读取,汇总统计信息时使用分成多个操作单元,不同线程更新不同的单元,只有需要汇总的时候才计算所有单元的操作。其实就是将内存中操作的变量拆分出来,让它变成多个变量(这里和ConcurrentHashMap的原理就很相似了),然后让线程去竞争这些变量,将这些变量处理完后,然后在进行求和,这样降低了变量的并发度,减少了CAS失败次数。
2.3 前一次下载的大小
/**
* 前一次下载的大小
* currentDownSize - preDownSize就是一秒内下载了多少
*/
public double preDownSize;
2.4 重写run方法
package com.sysg.file.core;
import com.sysg.file.constant.FileConstant;
import java.util.concurrent.atomic.LongAdder;
/**
* 展示下载信息的线程
*/
public class DownloadInfoThread implements Runnable {
/**
* 文件总大小
*/
public long httpFileContentSize;
/**
* 累计下载大小
* Volatile关键字的作用主要有如下两个:
* 1.线程的可见性:当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值。
* 2. 顺序一致性:禁止指令重排序。
*
* LongAdder可以是计数器的增强版,高并发下性能会更好,适合频繁的更新,但是不太频繁读取,
* 汇总统计信息时使用分成多个操作单元,不同线程更新不同的单元,只有需要汇总的时候才计算所有单元的操作。
*
* 其实就是将内存中操作的变量拆分出来,让它变成多个变量( 这里和ConcurrentHashMap的原理就很相似了),
* 然后让线程去竞争这些变量,将这些变量处理完后,然后在进行求和,这样降低了变量的并发度,减少了CAS失败次数
*/
public static volatile LongAdder downSize = new LongAdder();
/**
* 前一次下载的大小
* currentDownSize - preDownSize就是一秒内下载了多少
*/
public double preDownSize;
public DownloadInfoThread(long httpFileContentSize){
this.httpFileContentSize = httpFileContentSize;
}
/**
* 通过定时任务线程池,让其每一秒执行一次
*/
@Override
public void run() {
//计算文件总大小,单位MB
String fileSize = String.format("%.2f", httpFileContentSize/FileConstant.MB);
//计算每秒下载速度,单位MB
double speed = Double.parseDouble(String.format("%.2f", (downSize.doubleValue() - preDownSize)/FileConstant.MB));
//计算后,将当前下载大小作为上一次的,currentDownSize在不断变化
preDownSize = downSize.doubleValue();
//剩余文件大小,文件总大小 - 本地已下载文件的大小 - 当前下载的大小
double remainSize = httpFileContentSize - downSize.doubleValue();
//估算剩余时间
String remainTime = String.format("%.1f", (remainSize / FileConstant.MB / speed));
//判断剩余时间是否为无限大
if("Infinity".equals(remainTime)){
remainTime = "-";
}
//计算已下载大小
String alreadyDownSize = String.format("%.2f", downSize.doubleValue() / FileConstant.MB);
String downInfo = String.format("已下载%sMB,文件总大小%sMB,下载速度%smb/s,剩余时间%ss",
alreadyDownSize,
fileSize,
speed,
remainTime);
System.out.println("\r");
System.out.println(downInfo);
}
}
三:编写下载器
以下载QQ客户端的url为例:https://dldir1.qq.com/qqfile/qq/PCQQ9.7.9/QQ9.7.9.29059.exe
3.1 编写http工具类
package com.sysg.file.utils;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
/**
* http工具类
*/
public class HttpUtil {
/**
* 得到下载文件长度
*
* @param url url
* @return long
*/
public static long getHttpFileContentLength(String url){
HttpURLConnection httpURLConnection = null;
try {
httpURLConnection = getHttpURLConnection(url);
return httpURLConnection.getContentLength();
} catch (Exception e) {
e.printStackTrace();
return 0;
} finally {
if(httpURLConnection != null){
httpURLConnection.disconnect();
}
}
}
/**
* 分片下载
* @param url 下载地址
* @param startPos 下载起始位置
* @param endPos 下载结束位置
* @return
*/
public static HttpURLConnection getHttpURLConnection(String url,long startPos,long endPos){
HttpURLConnection httpURLConnection = getHttpURLConnection(url);
LogUtil.info("下载的区间是:{}-{}",startPos,endPos);
if(httpURLConnection != null){
if (endPos != 0){
//bytes=100-200
httpURLConnection.setRequestProperty("RANGE","BYTES="+ startPos + "-" + endPos);
} else {
//下载最后一部分时endPos会赋值为0,此时如果为0,就表示会下载最后一部分
httpURLConnection.setRequestProperty("RANGE","BYTES="+ startPos + "-");
}
return httpURLConnection;
}
return null;
}
/**
* 获取HttpURLConnection连接对象
* @param url 文件的地址
* @return
*/
public static HttpURLConnection getHttpURLConnection(String url){
try {
//建立连接
URL httpUrl = new URL(url);
HttpURLConnection urlConnection =(HttpURLConnection)httpUrl.openConnection();
//向文件所在服务器发送标识信息,模拟浏览器
urlConnection.setRequestProperty("User-Agent","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36");
return urlConnection;
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
/**
* 获取下载文件的文件名
* @param url
* @return
*/
public static String getHttpFileName(String url){
//获取最后一次“/”出现的下标
int index = url.lastIndexOf("/");
String fileName = url.substring(index + 1);
if(!fileName.endsWith(".exe")){
fileName = fileName + ".exe";
}
return fileName;
}
}
3.2 核心下载功能
/**
* 下载器
*
* @param url url
*/
public void downloader(String url){
//获取下载的文件名
String fileName = HttpUtil.getHttpFileName(url);
File file = new File(FileConstant.DOWNLOAD_PATH);
if(!file.exists()){
file.mkdirs();
}
//拼接文件下载路径
String downPath = FileConstant.DOWNLOAD_PATH + fileName;
//获取本地文件大小
long localFileLength = FileUtil.getFileContentLength(downPath);
//获取连接对象
HttpURLConnection httpURLConnection = null;
//获取需要下载下载的文件大小
int contentLength = 0;
try {
httpURLConnection = HttpUtil.getHttpURLConnection(url);
contentLength = httpURLConnection.getContentLength();
//判断文件是否已经下载过
if(localFileLength >= contentLength){
LogUtil.info("无须重新下载:{}",fileName);
return;
}
//创建获取下载信息的任务对象
DownloadInfoThread downloadInfoThread = new DownloadInfoThread(contentLength);
/**
* 将任务交给线程执行,每隔一秒执行一次
* initialDelay:延迟一秒执行
* period:每隔一秒执行一次
*/
scheduledExecutorService.scheduleAtFixedRate(downloadInfoThread,1,1, TimeUnit.SECONDS);
//调用切分任务的方法
splitFile(url);
countDownLatch.await();
//合并文件
if(mergeFile(fileName)){
clearTemp(fileName);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("\r");
System.out.println("下载完成");
if(httpURLConnection != null){
//关掉连接对象
httpURLConnection.disconnect();
}
//关闭线程
scheduledExecutorService.shutdownNow();
//关闭线程池
fixedThreadPool.shutdown();
}
}
3.3 编写分片下载的线程DownloaderTask
package com.sysg.file.core;
import com.sysg.file.constant.FileConstant;
import com.sysg.file.utils.HttpUtil;
import com.sysg.file.utils.LogUtil;
import java.io.BufferedInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.util.concurrent.CountDownLatch;
/**
* 分块下载任务
*/
public class DownloaderTask implements Runnable {
/**
* url
*/
private final String url;
/**
* 下载的起始位置
*/
private final long startPos;
/**
* 下载的结束位置
*/
private final long endPos;
/**
* 标识当前下载块
*/
private final int part;
/**
* 程序计数器
*/
private CountDownLatch countDownLatch;
public DownloaderTask(String url, long startPos, long endPos, int part , CountDownLatch countDownLatch) {
this.url = url;
this.startPos = startPos;
this.endPos = endPos;
this.part = part;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
//获取文件名
String httpFileName = HttpUtil.getHttpFileName(url);
//下载路径 + 分块文件名
httpFileName = FileConstant.DOWNLOAD_PATH + httpFileName + ".temp" + part;
//获取分块下载的链接
HttpURLConnection httpURLConnection = HttpUtil.getHttpURLConnection(url, startPos, endPos);
try(
InputStream inputStream = httpURLConnection.getInputStream();
BufferedInputStream bis = new BufferedInputStream(inputStream);
RandomAccessFile randomAccessFile = new RandomAccessFile(httpFileName,"rw");
){
byte[] buffer = new byte[FileConstant.BYTE_SIZE];
int len = -1;
//循环读取数据
while((len = bis.read(buffer)) != -1){
//一秒内下载数据之和,通过原子类进行操作
DownloadInfoThread.downSize.add(len);
randomAccessFile.write(buffer,0,len);
}
} catch (FileNotFoundException e) {
LogUtil.error("下载文件不存在:{}",url);
} catch (Exception e) {
LogUtil.error("url出现异常,此url有问题,请换一个试试");
} finally {
countDownLatch.countDown();
//关闭链接
httpURLConnection.disconnect();
}
}
}
3.4 合并文件
/**
* 合并文件
*
* @param fileName 文件名称
* @return boolean 文件合并是否成功
*/
public boolean mergeFile(String fileName){
LogUtil.info("开始合并文件:{}",fileName);
byte[] buffer = new byte[FileConstant.BYTE_SIZE];
int len = -1;
try (RandomAccessFile accessFile = new RandomAccessFile(FileConstant.DOWNLOAD_PATH + fileName, "rw")){
for (int i = 0; i < FileConstant.THREAD_NUM; i++) {
try (
FileInputStream fileInputStream = new FileInputStream(FileConstant.DOWNLOAD_PATH + fileName + ".temp" + i);
BufferedInputStream bis = new BufferedInputStream(fileInputStream);
) {
while ((len = bis.read(buffer)) != -1){
accessFile.write(buffer,0,len);
}
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
LogUtil.info("结束合并文件:{}",fileName);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
3.5 清理临时文件
/**
* 清理临时文件
*
* @param fileName 文件名称
*/
public void clearTemp(String fileName){
for (int i = 0; i < FileConstant.THREAD_NUM; i++) {
//删除临时文件
File file = new File(FileConstant.DOWNLOAD_PATH,fileName + ".temp" + i);
if(file.exists()){
file.delete();
}
}
}
3.6 编写main方法
public static void main(String[] args) {
String url = "https://dldir1.qq.com/qqfile/qq/PCQQ9.7.9/QQ9.7.9.29059.exe";
new Downloader().downloader(url);
}