为什么要引入线程池
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行
- 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统的资源,还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控
Java中几种默认的线程池
- 如何创建线程池
- JUC包下Executors提供了几种线程池
Executors.newSingleThreadExecutor();
Executors.newCachedThreadPool();
Executors.newFixedThreadPool(int corePoolSize);
Executors.newScheduledThreadPool(int corePoolSize);
- 线程池的重要方法
- 线程池状态流转
- 线程池的执行流程
大家可以参考下面这篇博客,笔者觉得写得很通俗易懂。
https://juejin.cn/post/6866054685044768782
封装一个高扩展性的线程池框架
- 支持任务优先级
- 支持线程池暂停、恢复、关闭
- 支持异步任务结果回调
package com.example.hilibrary.executor
import android.os.Handler
import android.os.Looper
import android.util.Log
import androidx.annotation.IntRange
import java.util.PriorityQueue
import java.util.concurrent.BlockingQueue
import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.ThreadFactory
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.Condition
import java.util.concurrent.locks.ReentrantLock
/**
 * Singleton priority-aware thread pool with pause/resume support and
 * main-thread result callbacks.
 *
 * Every submitted task is wrapped in a [PriorityRunnable] so that it can be
 * ordered by the pool's [PriorityBlockingQueue]; among queued tasks, a larger
 * priority value is dequeued first.
 */
object HCommonExecutor {
    private const val TAG = "HCommonExecutor"

    // Guards isPaused; pauseCondition is signalled when the pool is resumed.
    private val lock = ReentrantLock()
    private val pauseCondition: Condition = lock.newCondition()

    // Only read or written while holding `lock`.
    private var isPaused: Boolean = false

    private val hiExecutor: ThreadPoolExecutor

    // Always bound to the main looper, regardless of which thread first touches this
    // object. Looper.myLooper() is null on threads without a looper and is not the
    // main looper on background looper threads.
    private val mainHandler = Handler(Looper.getMainLooper())

    init {
        val cpuCount = Runtime.getRuntime().availableProcessors()
        val corePoolSize = cpuCount + 1
        // NOTE(review): PriorityBlockingQueue is unbounded, and per the ThreadPoolExecutor
        // documentation an unbounded queue means the pool never grows past corePoolSize,
        // so maxPoolSize has no practical effect here.
        val maxPoolSize = cpuCount * 2 + 1
        val keepAliveTime = 30L
        val unit = TimeUnit.SECONDS
        // Declared with the exact element type so no unchecked cast is needed.
        val blockingQueue = PriorityBlockingQueue<Runnable>()

        // Names worker threads "hi-executor-0", "hi-executor-1", ...
        val seq = AtomicLong()
        val threadFactory = ThreadFactory { task ->
            Thread(task).apply { name = "hi-executor-${seq.getAndIncrement()}" }
        }

        hiExecutor = object : ThreadPoolExecutor(
            corePoolSize,
            maxPoolSize,
            keepAliveTime,
            unit,
            blockingQueue,
            threadFactory
        ) {
            /**
             * Blocks the worker thread for as long as the pool is paused.
             *
             * The flag is re-checked in a loop under [lock] so that neither a spurious
             * wake-up nor a [resume] racing with this check can let a task slip
             * through or leave the worker waiting for a signal it already missed.
             */
            override fun beforeExecute(t: Thread?, r: Runnable?) {
                super.beforeExecute(t, r)
                lock.lock()
                try {
                    while (isPaused) {
                        pauseCondition.await()
                    }
                } catch (e: InterruptedException) {
                    // Restore the interrupt flag instead of letting the exception escape,
                    // which would terminate the worker and drop the task.
                    Thread.currentThread().interrupt()
                } finally {
                    lock.unlock()
                }
            }

            /** Logs the priority of the task that has just finished. */
            override fun afterExecute(r: Runnable?, t: Throwable?) {
                super.afterExecute(r, t)
                // Safe cast: an unexpected runnable type must not kill the worker thread.
                val finished = r as? PriorityRunnable ?: return
                Log.e(TAG, "已执行完的任务的优先级是: ${finished.priority}")
            }
        }
    }

    /**
     * Submits [runnable] to the pool.
     *
     * @param priority scheduling priority (annotated 0..10); among queued tasks a
     *   larger value is executed first.
     * @param runnable the work to run on a pool thread.
     */
    @JvmOverloads
    fun execute(@IntRange(from = 0, to = 10) priority: Int = 0, runnable: Runnable) {
        hiExecutor.execute(PriorityRunnable(priority, runnable))
    }

    /**
     * Submits a [Callable] to the pool; its callbacks are delivered as described
     * on [Callable.run].
     *
     * @param priority scheduling priority (annotated 0..10); among queued tasks a
     *   larger value is executed first.
     * @param runnable the task whose [Callable.onBackground] runs on a pool thread.
     */
    @JvmOverloads
    fun execute(@IntRange(from = 0, to = 10) priority: Int = 0, runnable: Callable<*>) {
        hiExecutor.execute(PriorityRunnable(priority, runnable))
    }

    /**
     * A background task that produces a result of type [T] and reports it through
     * the main-thread handler.
     */
    abstract class Callable<T> : Runnable {
        /**
         * Posts [onPrepare] to the main handler, runs [onBackground] on the calling
         * thread, then posts [onCompleted] with the result to the main handler.
         *
         * [onPrepare] is only posted, not awaited, so [onBackground] may start before
         * it has run. If [onBackground] throws, [onCompleted] is never posted.
         */
        override fun run() {
            mainHandler.post { onPrepare() }
            val result = onBackground()
            mainHandler.post { onCompleted(result) }
        }

        /** Optional hook posted to the main handler before the background work. */
        open fun onPrepare() {
        }

        /** The actual work; invoked on the thread that calls [run]. */
        abstract fun onBackground(): T

        /** Receives the value returned by [onBackground]; posted to the main handler. */
        abstract fun onCompleted(t: T)
    }

    /**
     * Wraps a [Runnable] with a [priority] so it can be ordered by the pool's queue.
     *
     * @property priority larger values sort earlier (see [compareTo]).
     * @property runnable the delegate that performs the work.
     */
    class PriorityRunnable(val priority: Int, val runnable: Runnable) :
        Runnable, Comparable<PriorityRunnable> {

        override fun run() {
            runnable.run()
        }

        /**
         * Reverse numeric order: a higher [priority] compares as "smaller", so it is
         * taken first from a queue whose head is the least element.
         */
        override fun compareTo(other: PriorityRunnable): Int =
            other.priority.compareTo(priority)
    }

    /**
     * Pauses the pool: workers block before starting their next task until
     * [resume] is called. Tasks that are already running are not affected.
     */
    fun pause() {
        lock.lock()
        try {
            isPaused = true
        } finally {
            lock.unlock()
        }
        Log.e(TAG, "hcexecutor is pause")
    }

    /** Resumes the pool and wakes every worker blocked by [pause]. */
    fun resume() {
        lock.lock()
        try {
            // Flag change and signal happen under the same lock the workers use to
            // check the flag, so no waiter can miss the wake-up.
            isPaused = false
            pauseCondition.signalAll()
        } finally {
            lock.unlock()
        }
        Log.e(TAG, "hcexecutor is resume")
    }
}