拼多多2面,还是模拟拼团,要求用户拼团成功后,提交订单支付金额。
之前我们在系列(8)《CountDownLatch核心原理》,实现过拼团场景。但是CountDownLatch里调用countDown()方法后,线程还是可以继续执行后面的代码,没有真正的阻塞。
1、面试真题:完善模拟拼团
这里我们应用循环屏障CyclicBarrier,可以控制一组线程到达屏障点后,再全部继续执行,而且这个屏障可以重复利用的特性来实现这个场景。
现在我们模拟2人拼团成功的场景,每满2人就允许提交订单支付,且后台发送消息给仓库发货。
package lading.java.mutithread;
import cn.hutool.core.date.DateTime;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
/**
* 模拟拼团,并通知仓库发货
*/
public class Demo010CyclicBarrier {
public static int count = 2;//要求达到屏障数量
public static volatile ConcurrentHashMap<String, String> customerNames = new ConcurrentHashMap<>();//记录已到达屏障客户线程名
public static CyclicBarrier barrier = new CyclicBarrier(count, new sendMsg());//屏障数量2,目标线程数量达到后,执行sendMsg
public static void main(String[] args) {
//模拟5个人拼团
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
Thread.sleep((new Random().nextInt(10 - 1 + 1)) * 1000);//客户浏览商品信息Ns
System.out.println(DateTime.now().toString("YYYY-MM-dd hh:mm:ss SSS") +" 【"+ Thread.currentThread().getName() + "】,到达屏障");
customerNames.put(Thread.currentThread().getName(), Thread.currentThread().getName());//本批次拼团名单
barrier.await();//到达屏障,进入阻塞
System.out.println(DateTime.now().toString("YYYY-MM-dd hh:mm:ss SSS") +" 【"+ Thread.currentThread().getName() + "】,完成支付。");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
}, "客户00" + (i + 1)).start();
}
}
static class sendMsg extends Thread {
@Override
public void run() {
System.out.println(DateTime.now().toString("YYYY-MM-dd hh:mm:ss") + customerNames.keySet() + "达到屏障拼团人数,允许提交订单!");
customerNames.clear();//本批次拼团名单清空
}
}
}
客户2,3到达后,任务线程就发送信息给参考发货,同时客户2、3可以支持订单。当客户1到达后,就阻塞等待客户4拼团才继续执行。最后客户5,因为没有成团,一直阻塞等待。
2、说说CyclicBarrier的核心原理
CyclicBarrier,顾名思义就是循环屏障。支持一组多个线程相互等待,线程调用了屏障的await()方法后,原地阻塞等待。当本组线程最后一个到达屏障后,本组其他线程全部被唤醒,继续执行屏障await()方法后面代码。
由于这个屏障在释放完本组等待线程后,可以重复使用,等待下一组线程过来阻塞排队,因此称为:循环屏障。
3、具体说说CyclicBarrier怎么使用
循环屏障也是很简单,核心方法就几个。
1.CyclicBarrier(int parties, Runnable barrierAction)
就是实例化一个循环屏障,parties就是本组线程目标数量。barrierAction就有意思了,这个是可选参数。如果本组线程都到达屏障后,就先执行这个Runnable barrierAction,阻塞等待的线程才能继续执行。可以从模拟拼团实例运行结果看到:线程2、3到达屏障后,先执行sendMsg的方法,线程2、3才可以开始支付。
2.await()
线程调用这个方法后,表示已经到达屏障,该线程阻塞进入休眠状态,等本组其他线程都到达屏障点,才会被唤醒继续执行后面的代码。还有两个入参可选,await(long timeout, TimeUnit unit) 如果超出指定的等待时间,则抛出TimeoutException异常。
核心常用就这2个方法了,没别的!
4、CyclicBarrier源码分析
首先,看一下CyclicBarrier的成员变量,里面int 有parties、和count。这两个变量成功支持了屏障变成循环屏障。其中parties表示屏障阈值,count表示当前还差多少个线程到达屏障,每来一个线程调用await(),本组线程的屏障count就减1.count为0时候,就唤醒本组线程继续执行。
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final int parties;
private final Runnable barrierCommand;
private Generation generation = new Generation();
private int count;
其次,看一下实例化循环屏障对象代码,重点将本组循环等待线程数量parties赋值给parties、count,以及设置屏障任务。
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
//不指定屏障任务
public CyclicBarrier(int parties) {
this(parties, null);
}
然后,重点看一下await()阻塞等待的方法,里面调用了dowait()方法。
源码很长,简单总结:dowait方法就是更新count-1,表示本组又报道了一个线程,还没到的名额少了一个。如果count为0,那说明自己是本组最后来屏障集合的线程,负责唤醒大家,以及执行屏障的barrierCommand任务(如果有的话)。
源码细的说:除了count-1判断是否全部到齐,如果是0,包括如何唤醒其他线程。不是0,如何陷入阻塞等待。
具体就是:
1、如果count不是0,就把自己加入到AQS的条件队列里,等待信号唤醒。
2、如果count是0,说明本线程是最后一个到达的,咱不用进入阻塞,先执行屏障的barrierCommand,然后去唤醒本组的其他线程兄弟继续执行。并重置count值为parties阈值,方便下一组线程使用,达成屏障可循环使用的目的。
其他就是在AQS里如何阻塞等待、以及唤醒其他线程具体逻辑,源码有点复杂,等后续我们出源码分析专栏,就画图分析讲解。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
//加锁,去更新count,这里就不是CAS了
lock.lock();
try {
final Generation g = generation;
//判断屏障是否被中断
if (g.broken)
throw new BrokenBarrierException();
//判断本线程是否已中断
if (Thread.interrupted()) {
//本组线程的屏障中断,并重置屏障
breakBarrier();
throw new InterruptedException();
}
// 对count减1
int index = --count;
// 本组屏障全部线程都到达屏障,接下来执行屏障任务、以及唤醒本组其他阻塞兄弟
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//实例CyclicBarrier(),如果有指定任务,本线程就代劳去执行
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 如果不为0,说明约定到本屏障的其他兄弟们还没到齐,那就自旋等待,直到被打断或者超时
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
}
........
}
今天就分享这么多,明天我们继续分享并发编程里的Condition条件接口。