1.自定义线程池
1>.在实际开发过程中建议不要使用JDK提供的方式创建线程池,因为底层不方便优化,在请求量非常大的情况下可能会出现OOM,我们需要手动实现一个线程池;
2>.代码实现:
@Slf4j
public class TestThreadPoolDemo1 {
public static void main(String[] args) {
//创建线程池对象
CustomThreadPool customThreadPool = new CustomThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task) -> {
//queue.put(task); //拒绝策略①:添加任务失败,一直等待;
//queue.offer(3000,TimeUnit.MILLISECONDS,task); //拒绝策略②:添加任务失败,等待指定的时间,返回结果;
//log.info("放弃任务{}",task); //拒绝策略③:任务添加失败,直接放弃任务,后面的其他任务也会放弃;
//throw new RuntimeException(); //拒绝策略④:任务添加失败,当前任务不执行,抛出异常,后面的其他任务都不执行;
//task.run();//拒绝策略④:任务添加失败,调用者线程自己去执行任务(当前是main线程);
});
//提交请求任务
for (int i = 0; i < 4; i++) {
int j = i;
customThreadPool.executor(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//请求任务,打印信息
log.info("{}", j);
});
}
}
}
//阻塞队列,存放请求任务
@Slf4j
class CustomBlockQueue<T> {
//(有界)队列
private Deque<T> queue = new ArrayDeque<T>();
//锁对象(多个线程同时操作同一个位置上的元素)
private ReentrantLock lock = new ReentrantLock();
//生产者条件变量,队列满了让生产者线程等待
private Condition fullWaitSetCondition = lock.newCondition();
//消费者条件变量,队列空了让消费者线程等待
private Condition emptyWaitSetCondition = lock.newCondition();
//队列容量
private int capcity;
public CustomBlockQueue(int capcity) {
this.capcity = capcity;
}
//带超时的等待获取
public T poll(long timeOut, TimeUnit timeUnit) {
lock.lock();
try {
//将超时时间统一转换为nanos
long nanos = timeUnit.toNanos(timeOut);
while (queue.isEmpty()) {
//队列为空,当前线程等待
try {
//当前线程只等待指定的时间,如果提前唤醒,则提前结束等待;如果超时未唤醒,则自动唤醒;
//该等待方法的返回值是剩余等待时间,即剩余等待时长=等待总时长-已经等待的时长;
//为了防止虚假唤醒,需要将这个方法返回的剩余等待时间重新赋值给nanos变量,
//下一轮循环就按照这个剩余时间进行等待,而不是再等待nanos时长
if (nanos <= 0) {
//等待时间结束仍然没有获取到元素,当前线程运行结束
return null;
}
nanos = emptyWaitSetCondition.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//正常获取元素(弹出元素)
T t = queue.removeFirst();
//取出元素,队列中有空位,唤醒等待中的生产线程
fullWaitSetCondition.signalAll();
return t;
} finally {
lock.unlock();
}
}
//从队列中获取元素
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
//队列为空,当前线程等待
try {
emptyWaitSetCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//正常获取元素(弹出元素)
T t = queue.removeFirst();
//取出元素,队列中有空位,唤醒等待中的生产线程
fullWaitSetCondition.signalAll();
return t;
} finally {
lock.unlock();
}
}
//带超时时间的等待添加
public boolean offer(long timeOut, TimeUnit timeUnit, T task) {
lock.lock();
try {
//将超时时间转换成Nanos
long nanos = timeUnit.toNanos(timeOut);
while (queue.size() >= capcity) {
//队列已满,当前线程等待(等待指定的时间)
try {
log.info("任务队列已满,等待加入任务队列...{}",nanos);
//如果等待时间用完了任务还是没有添加到任务队列中,结束等待
//返回一个false,表示本次添加任务失败!
if (nanos <= 0) {
log.info("等待结束,返回结果{}",false);
return false;
}
//线程提前被虚假唤醒,返回一个剩余等待时间,这个时间也是下一轮循环当前线程要等待的时间
//把它重新赋值给nanos变量,作为下一轮循环等待时间
nanos = fullWaitSetCondition.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//正常添加元素
log.info("添加任务到任务队列中:{}", task);
queue.addLast(task);
//队列中有元素,唤醒处于等待状态中的消费线程
emptyWaitSetCondition.signalAll();
//添加成功,返回true
return true;
} finally {
lock.unlock();
}
}
//添加元素到队列中
public void put(T task) {
lock.lock();
try {
while (queue.size() == capcity) {
//队列已满,当前线程等待(一直等待)
try {
log.info("任务队列已满,等待加入任务队列...");
fullWaitSetCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//正常添加元素
log.info("添加任务到任务队列中:{}", task);
queue.addLast(task);
//队列中有元素,唤醒处于等待状态中的消费线程
emptyWaitSetCondition.signalAll();
} finally {
lock.unlock();
}
}
//获取队列大小
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
//包含拒绝策略的队列添加元素添加方法
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
//判读队列是否已满
if (queue.size() == capcity) {
//队列已满,根据指定的拒绝策略处理当前这个请求任务
rejectPolicy.reject(this, task);
} else {
//正常添加元素
log.info("添加任务到任务队列中:{}", task);
queue.addLast(task);
//队列中有元素,唤醒处于等待状态中的消费线程
emptyWaitSetCondition.signalAll();
}
} finally {
lock.unlock();
}
}
}
//自定义线程池
@Slf4j
class CustomThreadPool {
//任务队列(线程队列)
private CustomBlockQueue<Runnable> taskQueue;
//工作线程集合
private HashSet<Worker> works = new HashSet<Worker>();
//核心线程数
private int coreSize;
//工作线程从任务队列中获取任务的最大等待时间
private long timeOut;
//拒绝策略
private RejectPolicy<Runnable> rejectPolicy;
private TimeUnit timeUnit;
public CustomThreadPool(int coreSize, long timeOut, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeOut = timeOut;
this.timeUnit = timeUnit;
this.taskQueue = new CustomBlockQueue<>(queueCapcity);
this.rejectPolicy = rejectPolicy;
}
//提交任务
public void executor(Runnable task) {
synchronized (works) {
//当任务数量(工作线程数量)没有超过coreSize时,直接交给worker执行;
if (works.size() < coreSize) {
//创建一个worker线程
Worker worker = new Worker(task);
log.info("新创建worker工作线程:{}", worker);
//将新创建的线程加入到线程集合中
works.add(worker);
//线程启动
worker.start();
} else {
//如果任务数量(工作线程数量)超过了coreSize,将任务加入到任务队列中暂存;
//带拒绝策略的添加
taskQueue.tryPut(rejectPolicy, task);
}
}
}
//工作线程,真正执行任务
class Worker extends Thread {
//要执行的任务
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
//(循环)任务执行
//当task不为空,直接执行任务
//当task执行完毕,再接着从任务队列中获取任务执行,如果超过指定的等待时间还是没有获取到新的任务,
//那么直接返回null,而不是一直等待!!!
//while (this.task != null || (this.task = taskQueue.take()) != null) {
while (this.task != null || (this.task = taskQueue.poll(timeOut, timeUnit)) != null) {
try {
log.info("正在执行的请求任务:{}", this.task);
this.task.run();
} catch (Exception ex) {
ex.printStackTrace();
} finally {
this.task = null;
}
}
//当前线程执行完毕,从线程集合works中移除
synchronized (works) {
log.info("即将被移除的worker工作线程:{}", this);
works.remove(this);
}
}
}
}
//创建线程池拒绝策略接口
@FunctionalInterface
interface RejectPolicy<T> {
void reject(CustomBlockQueue<T> queue, T task);
}
2.异步模式之工作线程模式
2.1.定义
1>.让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务.
也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式;
例如,海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那么成本就太高了(对比另一种多线程设计模式:Thread-Per-Message);
***注意:不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率;
例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成服务员(线程池A)与厨师(线程池B)更为合理,当然你能想到更细致的分工;
2.2.饥饿现象(线程池中的线程不足)
***注意:固定大小的线程池会有饥饿现象!
例如
①.两个工人是同一个线程池中的两个线程;
②.他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作;
- 1).客人点餐: 必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待;
- 2).后厨做菜: 没啥说的,做就是了;
③.比如工人A 处理了点餐任务,接下来它要等着工人B把菜做好,然后上菜,他俩也配合的蛮好;
④.但现在同时来了两个客人,这个时候工人A 和工人B都去处理点餐了,这时没人做饭了,饥饿;
代码实现:
@Slf4j
public class TestDeadLock {
static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
static Random RANDOM = new Random();
static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) {
//针对不同的任务类型创建不同的线程池
ExecutorService waiterPool = Executors.newFixedThreadPool(1);
ExecutorService cookingPool = Executors.newFixedThreadPool(1);
waiterPool.execute(() -> {
log.info("处理点餐...");
Future<String> f = cookingPool.submit(() -> {
log.info("做菜");
return cooking();
});
try {
log.info("上菜{}", f.get());
} catch (Exception e) {
e.printStackTrace();
}
});
waiterPool.execute(() -> {
log.info("处理点餐...");
Future<String> f = cookingPool.submit(() -> {
log.info("做菜");
return cooking();
});
try {
log.info("上菜{}",f.get());
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
3.线程池中合理的线程数
问题:
①.过小会导致程序不能充分地利用系统资源、容易导致饥饿;
②.过大会导致更多的线程上下文切换,占用更多内存;
1>.CPU密集型运算
通常采用
cpu 核数 + 1
能够实现最优的CPU利用率,+1是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证CPU时钟周期不被浪费;
2>.I/O密集型
CPU不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但当你执行I/O操作时、远程RPC 调用时,包括进行数据库操作时,这时候CPU就闲下来了,你可以利用多线程提高它的利用率;
经验公式如下:
线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间
例如4核CPU计算时间是50%,其它等待时间是50%,期望cpu被100%利用,套用公式: 4 * 100% * 100% / 50% = 8
4.Tomcat线程池
4.1.Tomcat中哪里用到了线程池?
连接器和container容器!
组件说明:
①.LimitLatch用来限流,可以控制最大连接个数,类似J.U.C中的Semaphore;
②.Acceptor只负责"接收新的socket连接";
③.Poller只负责监听socket channel是否有"可读的I/O事件"
④.一旦可读,封装一个任务对象(socketProcessor),提交给Executor线程池处理;
⑤.Executor线程池中的工作线程最终负责"处理请求";
Tomcat线程池扩展了ThreadPoolExecutor,行为稍有不同:
如果总线程数达到maximumPoolSize,这时不会立刻抛RejectedExecutionException异常,而是再次尝试将任务放入队列,如果还失败,才抛出RejectedExecutionException异常;
Tomcat-7.0.42源码:
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.");
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
Thread.interrupted();
throw new RejectedExecutionException(x);
}
}else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
TaskQueue.java
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if ( parent.isShutdown() )
throw new RejectedExecutionException(
"Executor not running, can't force a command into the queue"
);
return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
}
4.2.Tomcat线程池配置
***注意:在Tomcat的配置文件"server.xml"中配置!
1>.connetor标签
的线程相关配置
2>.Executor标签
相关的线程配置
3>.线程池执行流程: