多线程下载
在日常开发中,我们不可避免的会接到类似这样的需求,下载一个比较大的素材文件或者安装包文件,以此实现APP的自动更新,APP内的素材替换等。由于一般此类文件都比较大,一般会在50M以上,如果我们不对下载的进度进行记录的话,那么对于用户的流量的损耗和体验,都是比较糟糕的。所以我们自然而然的就会想到断点续传,同时为了充分压榨用户的带宽,使一些文件能够尽快的下载完成,那么我们也可以使用多线程同时下载的技术加快文件的下载速度。
举个例子,我们要从一个水缸中用抽水机通过水管抽水,由于管子的直径等等的限制,我们单条管子无法完全利用我们的抽水机的抽水动力。因此我们就将这些抽水的任务分成了多份,分摊到多个管子上,这样就可以更充分的利用我们的抽水机动力,从而提高抽水的速度。
因此,我们使用多线程下载的主要意义就是——提高下载速度。
多线程下载的原理
简单来讲,多线程下载原理其实就是讲一个文件逻辑区分了几块,每个线程分别独立地下载自己负责的区块。
所以我们可以简单地讲一个文件的大小平均分成几份即可。
既然要分配文件的区块,那么我们就要知道文件的总大小,文件的总大小我们可以通过网络请求进行获取,在 Response Headers 中的 Content-Length 字段。也就是该文件的大小,单位是字节。
简单抽象出来
/**
* 获取需要下载链接的文件长度
* @param url 链接
*/
@WorkerThread
fun obtainTotalSize(url: String): Long
获取文件指定区域的内容
任务分配我们已经了解了,看起来很理想,但有一个问题,我们如何实现向服务器只请求这个文件的某一段而不是全部呢?
我们可以通过在请求头中加入 Range 字段来指定请求的范围,从而实现指定某一段的数据。如:RANGE bytes=10000-19999 就指定了 10000-19999 这段字节的数据所以我们的核心思想就是通过它拿到文件对应字节段的 InputStream,然后对它读取并写入文件。
抽象出来即:
/**
* 获取文件分段内的内容
* @param url 链接
* @param start 开始位置
* @param end 结束位置
* @return 输入流
*/
@WorkerThread
fun obtainStreamByRange(url: String, start: Long, end: Long): InputStream?
文件的指定位置的写入
获取到对应的内容,那么我们就要在文件的指定区域去写入,由于我们是多线程下载,因此文件并不是每次都是从前往后一个个字节写入的,随时可能在文件的任何一个地方写入数据。因此我们需要能够在文件的指定位置写入数据。这里我们用到了RandomAccessFile 来实现这个功能。
RandomAccessFile 是一个随机访问文件类,同时整合了 FileOutputStream 和 FileInputStream,支持从文件的任何字节处读写数据。通过它我们就可以在文件的任何字节处写入数据。
接下来简单讲讲我们这里是如何使用 RandomAccessFile 的。我们对于每个子任务来说都有一个开始和结束的位置。每个任务都可以通过 RandomAccessFile::seek 跳转到文件的对应字节位置,然后从该位置开始读取 InputStream 并写入。这样,就实现了不同线程对文件的随机写入。
断点续传
那么文件的写入搞定了,那么就剩下最后一个文件,如何实现断点续传。这里其实我们可以记录每一次写入文件的进度,当下载任务被暂停的时候,我们就将对应的任务记录下载,记录相应的url,存储文件,当前下载的进度等基本信息,当用户再次出发的时候我们就可以从这些信息恢复进度,继续下载。
简单来讲,只需要我们将对应的任务进行持久化即可。
代码实现
首先我们需要定义任务的下载的各个阶段的状态。
/**
* Author: huangtao
* Date: 2022/12/27
* Desc: 下载状态枚举
*/
enum class DownloadStatus {
/**
* 空闲,默认状态
*/
IDLE,
/**
* 完成
*/
COMPLETED,
/**
* 下载中
*/
DOWNLOADING,
/**
* 暂停
*/
PAUSE,
/**
* 出错
*/
ERROR
}
相应的监听回调
/**
* Author: huangtao
* Date: 2022/12/27
* Desc: 下载的事件监听
*/
interface DownloadListener {
/**
* 开始下载
*/
fun onStart() {}
/**
* 下载中
* @param progress 进度 字节
* @param total 总数 字节
*/
fun onDownloading(progress: Long, total: Long) {}
/**
* 暂停
*/
fun onPause() {}
/**
* 取消下载
*/
fun onCancel() {}
/**
* 下载完成
*/
fun onComplete() {}
/**
* 出错
* @param msg 错误信息
*/
fun onError(msg: String) {}
}
Http的抽象辅助类
/**
* Author: huangtao
* Date: 2022/12/27
* Desc: 下载的网络请求接口定义
*/
interface DownloadHttpHelper {
/**
* 获取需要下载链接的文件长度
* @param url 链接
*/
@WorkerThread
fun obtainTotalSize(url: String): Long
/**
* 获取文件分段内的内容
* @param url 链接
* @param start 开始位置
* @param end 结束位置
* @return 输入流
*/
@WorkerThread
fun obtainStreamByRange(url: String, start: Long, end: Long): InputStream?
}
持久化的辅助类
/**
* Author: huangtao
* Date: 2022/12/27
* Desc: 下载的db存储接口定义
*/
interface DownloadDbHelper {
/**
* 删除一个任务
* @param task 下载子任务
*/
fun delete(task : SubDownloadTask)
/**
* 添加一个子任务
* @param task 子任务
*/
fun insert(task : SubDownloadTask)
/**
* 更新一个任务
* @param task 子任务
*/
fun update(task : SubDownloadTask)
/**
* 根据url查询相关任务
* @param url 下载链接
* @return 相关子任务 无:返回空列表
*/
fun queryByTaskTag(url : String):List<SubDownloadTask>
}
一些通用的配置
/**
* Author: huangtao
* Date: 2022/12/27
* Desc: 下载配置接口
*/
object DownloadConfig {
const val TAG = "DownloadManager"
/**
* 上下文
*/
lateinit var context: Application
private set
/**
* db实现
*/
var dbHelper: DownloadDbHelper
private set
/**
* http下载实现
*/
var httpHelper: DownloadHttpHelper
private set
/**
* 线程数
*/
var threadNum: Int
private set
/**
* 线程池
*/
var mExecutorService: Executor
private set
init {
threadNum = 4
dbHelper = DownloadDbImpl()
httpHelper = DownloadHttpImpl()
mExecutorService = Dispatchers.IO.asExecutor()
}
/**
* 必须要设置
* 设置上下文
*/
fun setContext(app: Application): DownloadConfig {
context = app
return this
}
/**
* 设置自定义的DownloadDbHelper
* 默认使用sqlite
*/
fun setDbHelper(impl: DownloadDbHelper): DownloadConfig {
dbHelper = impl
return this
}
/**
* 设置自定义的DownloadHttpHelper
* 默认使用HttpURLConnection
*/
fun setHttpHelper(impl: DownloadHttpHelper): DownloadConfig {
httpHelper = impl
return this
}
/**
* 设置线程数
* 默认 4
*/
fun setThreadNum(num: Int): DownloadConfig {
threadNum = num
return this
}
/**
* 设置线程池
* 默认采用 协程IO线程池
*/
fun setExecutor(executor: Executor): DownloadConfig {
mExecutorService = executor
return this
}
}
子任务的实现
/**
* Author: huangtao
* Date: 2022/12/27
* Desc: 子任务下载类
*/
class SubDownloadTask(
//下载路径
val url: String,
//子任务的大小
val taskSize: Long,
//开始位置
val startPos: Long,
//结束位置
val endPos: Long,
//当前位置
var currentPos: Long = startPos,
//保存的文件
val saveFile: File,
//回调监听
var listener: DownloadListener? = null
) : Runnable {
companion object {
const val BUFFER_SIZE: Long = 1024 * 1024
}
/**
* 当前任务的状态
*/
@Volatile
var status: DownloadStatus = DownloadStatus.IDLE
/**
* 已经下载的大小
*/
@Volatile
var completeSize = 0L
/**
* 暂停任务
*/
fun pause() {
status = DownloadStatus.PAUSE
}
override fun run() {
try {
status = DownloadStatus.DOWNLOADING
listener?.onStart()
val input = DownloadConfig.httpHelper.obtainStreamByRange(url, currentPos, endPos)
?: throw java.lang.NullPointerException("obtainStreamByRange InputStream is null")
writeFile(input)
} catch (e: Exception) {
Log.e(DownloadConfig.TAG, e.message ?: "", e)
status = DownloadStatus.ERROR
listener?.onError(e.message ?: "")
}
}
@Throws(IOException::class)
private fun writeFile(input: InputStream) {
Log.i(
DownloadConfig.TAG,
"${DownloadConfig.TAG}{${hashCode()}},写入开始 线程名:${Thread.currentThread().name} " +
"文件路径:${saveFile.absolutePath}"
)
val file = RandomAccessFile(saveFile, "rwd")
val bufferSize = BUFFER_SIZE.coerceAtMost(taskSize).toInt()
val buffer = ByteArray(bufferSize)
file.seek(currentPos)
while (true) {
if (status != DownloadStatus.DOWNLOADING) {
break
}
val offset = input.read(buffer, 0, bufferSize)
if (offset == -1) {
break
}
file.write(buffer, 0, offset)
currentPos += offset
completeSize += offset
listener?.onDownloading(offset.toLong(), taskSize)
}
//更新状态
if (status == DownloadStatus.DOWNLOADING) {
status = DownloadStatus.COMPLETED
}
DownloadConfig.dbHelper.update(this)
//处理回调
if (status == DownloadStatus.COMPLETED) {
listener?.onComplete()
} else if (status == DownloadStatus.PAUSE) {
listener?.onPause()
}
//关闭资源
file.close()
input.close()
Log.i(
DownloadConfig.TAG, "${DownloadConfig.TAG}{${hashCode()}},\n 写入状态:${status.name} " +
"总大小=${taskSize} 开始位置${startPos} " +
"结束位置${endPos} 完成大小${completeSize}"
)
}
}
总任务的实现
/**
* Author: huangtao
* Date: 2022/12/27
* Desc:
*/
class DownloadTask(
//链接
val url: String,
//保存路径
private val savePath: String,
//回调监听
val listener: DownloadListener
) : DownloadListener {
/**
* 完成大小
*/
@Volatile
var completeSize: Long = 0
private set
/**
* 文件总大小
*/
private var totalSize: Long = 0
/**
* 状态
*/
var status: DownloadStatus = DownloadStatus.IDLE
private set
/**
* 线程数
*/
private val threadNum = DownloadConfig.threadNum
/**
* 子任务列表
*/
private val subTasks = mutableListOf<SubDownloadTask>()
/**
* 线程池
*/
private val mExecutorService: Executor by lazy {
DownloadConfig.mExecutorService
}
/**
* 开始下载
* 如果是暂停的则从上次的位置继续下载
*/
fun download() {
mExecutorService.execute {
if (status == DownloadStatus.DOWNLOADING) {
return@execute
}
status = DownloadStatus.DOWNLOADING
val list = DownloadConfig.dbHelper.queryByTaskTag(url)
subTasks.clear()
totalSize = 0
completeSize = 0
for (task in list) {
task.listener = this
totalSize += task.taskSize
completeSize += task.completeSize
}
subTasks.addAll(list)
if (subTasks.isEmpty()) {
downloadNewTask()
} else if (subTasks.size == threadNum) {
existDownloadTask()
} else {
resetDownloadTask()
}
}
}
/**
* 暂停下载任务
*/
fun pauseDownload() {
if (status != DownloadStatus.DOWNLOADING) {
return
}
for (task in subTasks) {
task.pause()
}
status = DownloadStatus.PAUSE
listener.onPause()
}
/**
*重置下载任务
*/
fun resetDownloadTask() {
mExecutorService.execute {
for (task in subTasks) {
DownloadConfig.dbHelper.delete(task)
}
subTasks.clear()
downloadNewTask()
}
}
private fun existDownloadTask() {
startAsyncDownload()
}
private fun downloadNewTask() {
mExecutorService.execute {
listener.onStart()
completeSize = 0
val targetFile = File(savePath)
val destinationFolder = File(targetFile.parent ?: "")
if (!destinationFolder.exists()) {
destinationFolder.mkdirs()
}
targetFile.createNewFile()
val size = DownloadConfig.httpHelper.obtainTotalSize(url)
totalSize = size
val averageSize = size / threadNum
for (i in 0 until threadNum) {
var taskSize = averageSize
if (i == (threadNum - 1)) {
taskSize += totalSize % threadNum
}
var start = 0L
var index = i
while (index > 0) {
start += subTasks[i - 1].taskSize
index--
}
val end = start + taskSize - 1
val subTask =
SubDownloadTask(
url, taskSize, start,
end, start, targetFile, this
)
subTasks.add(subTask)
DownloadConfig.dbHelper.insert(subTask)
}
val file = RandomAccessFile(targetFile.absolutePath, "rwd")
file.setLength(size)
file.close()
startAsyncDownload()
}
}
private fun startAsyncDownload() {
status = DownloadStatus.DOWNLOADING
for (task in subTasks) {
if (task.completeSize < task.taskSize) {
mExecutorService.execute(task)
}
}
}
/**
* 下载进度
*/
override fun onDownloading(progress: Long, total: Long) {
synchronized(this) {
completeSize += progress
listener.onDownloading(completeSize, totalSize)
}
}
/**
* 子任务完成回调
* 此方法被将被调用threadNum次
*/
override fun onComplete() {
for (task in subTasks) {
if (task.status != DownloadStatus.COMPLETED) {
return
}
}
Log.i(
DownloadConfig.TAG, "${DownloadConfig.TAG}{${hashCode()}},下载完成 当前的线程名:${Thread.currentThread().name} "
)
for (task in subTasks) {
DownloadConfig.dbHelper.delete(task)
}
status = DownloadStatus.COMPLETED
}
/**
* 子任务出现异常的回调
*/
override fun onError(msg: String) {
//出现异常 暂停,清除任务重新下载
pauseDownload()
for (task in subTasks) {
DownloadConfig.dbHelper.delete(task)
}
subTasks.clear()
listener.onError(msg)
listener.onCancel()
}
}
使用方法
//引入依赖 gradle 7.0以下 项目根目录 build.gradle 文件
allprojects {
repositories {
...
maven { url 'https://jitpack.io' }
}
}
//引入依赖 gradle 7.0以上 项目根目录 setting.gradle 文件
dependencyResolutionManagement {
...
repositories {
...
maven { url 'https://jitpack.io' }
}
}
//模块module build.gradle
dependencies {
...
implementation 'com.github.huangtaoOO.TaoComponent:lib-download:0.0.3'
}
//初始化,必须
DownloadConfig.setContext(application)
//构建下载任务
val downloadTask = DownloadTask(
"https://dldir1.qq.com/qqfile/qq/TIM3.4.3/TIM3.4.3.22064.exe",
file,
object : DownloadListener {
@SuppressLint("SetTextI18n")
override fun onDownloading(progress: Long, total: Long) {
runOnUiThread {
tvProgress.text = "下载进度:${progress}/${total}"
}
}
})
//下载任务
downloadTask.download()
//重新下载
downloadTask.resetDownloadTask()
//暂停下载
downloadTask.pauseDownload()
源码传送门:github
- 使用过程中如有BUG,请提issue
- 使用过程中如有疑问或者更好的想法,欢迎进群讨论Android 学习交流群