一、Semaphore
Semaphore 通常又被称为信号量,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理地使用资源。
1.简单的使用
1-1.控制线程的并发数
/**
 * Demonstrates limiting concurrency with a {@link Semaphore}: five threads are started,
 * but at most two of them hold a permit (and run the guarded section) at the same time.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // Two permits: only two threads may run the guarded section, the others wait.
    Semaphore semaphore = new Semaphore(2);
    // Five threads are created, but only two can execute at any moment.
    for (int i = 0; i < 5; i++) {
        new Thread(() -> {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                // No permit was obtained, so there is nothing to release.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return;
            }
            try {
                System.out.println(Thread.currentThread().getName()+ ": acquire() time: " + System.currentTimeMillis());
                Thread.sleep(1000); // pretend the business logic takes 1s
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                // Always hand the permit back, even if the work was interrupted;
                // otherwise the permit is lost and waiting threads may never run.
                semaphore.release();
            }
        }).start();
    }
}
1-2.实现接口限流
/**
 * Marks an endpoint whose concurrent invocations should be limited by
 * {@code InterfaceLimitAspect}.
 *
 * NOTE(review): the aspect's pointcut is {@code @annotation(...)}, which presumably only
 * matches method-level usage - confirm whether type-level placement is meant to work.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE,ElementType.METHOD})
public @interface InterfaceLimitAnno {
/** Prefix of the key under which the aspect stores the semaphore for the annotated method. */
String key();
/** Number of permits of the semaphore the aspect creates for this key; defaults to 1. */
int limit() default 1;
}
/**
 * Aspect that limits how many invocations of a method annotated with
 * {@link InterfaceLimitAnno} may run concurrently.
 *
 * <p>Each distinct key (annotation key + "/" + method name + parameter names) owns one
 * {@link Semaphore} with {@code limit} permits. A call that cannot get a permit
 * immediately is rejected with an exception instead of waiting.
 */
@Aspect
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
@Slf4j
public class InterfaceLimitAspect {

    /** One semaphore per limiter key, shared by every invocation of the advised methods. */
    private static final Map<String, Semaphore> map = new ConcurrentHashMap<>();

    /** Matches methods annotated with {@link InterfaceLimitAnno}. */
    @Pointcut("@annotation(com.example.practice.AOP.InterfaceLimitAnno)")
    public void validInterfaceLimit(){}

    /**
     * Runs the advised method only if a permit for its key is available right now.
     *
     * @param joinPoint the intercepted invocation
     * @return whatever the advised method returns (previously discarded, which made every
     *         advised non-void method appear to return nothing)
     * @throws Exception if the concurrency limit for the key is currently exhausted
     * @throws Throwable anything thrown by the advised method
     */
    @Around(value = "validInterfaceLimit()")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        InterfaceLimitAnno annotation = signature.getMethod().getAnnotation(InterfaceLimitAnno.class);
        int limit = annotation.limit();

        StringBuilder sb = new StringBuilder(annotation.key() + "/" + signature.getMethod().getName());
        // Parameter names may be unavailable (null), in which case they are simply left out.
        String[] parameterNames = signature.getParameterNames();
        if (parameterNames != null) {
            for (String parmName : parameterNames) {
                sb.append(parmName);
            }
        }

        // Atomic create-if-missing: a separate get/put could let two threads install
        // different semaphores for the same key and exceed the limit.
        Semaphore semaphore = map.computeIfAbsent(sb.toString(), k -> new Semaphore(limit));

        // Reject before entering the try block so that a failed acquire never reaches the
        // release below; releasing without holding a permit would raise the limit.
        if (!semaphore.tryAcquire()) {
            throw new Exception("访问超出上限,请稍后重试");
        }
        try {
            return joinPoint.proceed();
        } finally {
            semaphore.release();
        }
    }
}
2.底层实现
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package java.util.concurrent;
import java.io.Serializable;
import java.util.Collection;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
 * Counting semaphore (decompiled JDK class). The number of available permits is kept in
 * the state of an {@link AbstractQueuedSynchronizer}; acquiring subtracts from it and
 * releasing adds to it.
 */
public class Semaphore implements Serializable {
    private static final long serialVersionUID = -3222578661600680210L;

    /** Synchronizer that holds the permit count and the queue of waiting threads. */
    private final Semaphore.Sync sync;

    /** Creates a non-fair semaphore with the given number of permits. */
    public Semaphore(int permits) {
        this.sync = new NonfairSync(permits);
    }

    /** Creates a semaphore with the given number of permits and fairness policy. */
    public Semaphore(int permits, boolean fair) {
        if (fair) {
            this.sync = new FairSync(permits);
        } else {
            this.sync = new NonfairSync(permits);
        }
    }

    /** Takes one permit (state = state - 1), blocking if needed; throws when interrupted. */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /** Takes one permit, blocking if needed; does not throw on interruption. */
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    /** Takes one permit only if one is available right now; never blocks. */
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    /**
     * Takes one permit, waiting at most the given time; after that the caller stops
     * waiting and can fall back to a degraded path.
     */
    public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /** Returns one permit. */
    public void release() {
        sync.releaseShared(1);
    }

    /** Takes the given number of permits (state = state - permits), blocking if needed. */
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        sync.acquireSharedInterruptibly(permits);
    }

    /** Takes the given number of permits, blocking if needed; ignores interruption. */
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        sync.acquireShared(permits);
    }

    /** Takes the given number of permits only if all are available right now. */
    public boolean tryAcquire(int permits) {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

    /** Takes the given number of permits, waiting at most the given time. */
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }

    /** Returns the given number of permits. */
    public void release(int permits) {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        sync.releaseShared(permits);
    }

    /** Returns the number of permits currently available. */
    public int availablePermits() {
        return sync.getPermits();
    }

    /** Takes every permit that is available right now and returns how many were taken. */
    public int drainPermits() {
        return sync.drainPermits();
    }

    /** Lowers the number of available permits by the given amount without blocking. */
    protected void reducePermits(int reduction) {
        if (reduction < 0) {
            throw new IllegalArgumentException();
        }
        sync.reducePermits(reduction);
    }

    /** Tells whether this semaphore was created with the fair policy. */
    public boolean isFair() {
        return sync instanceof FairSync;
    }

    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    public final int getQueueLength() {
        return sync.getQueueLength();
    }

    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

    @Override
    public String toString() {
        return super.toString() + "[Permits = " + sync.getPermits() + "]";
    }

    /** Fair variant: refuses to acquire while other threads are queued ahead. */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors()) {
                    return -1;
                }
                int available = getState();
                int remaining = available - acquires;
                // A negative result is returned without touching the state.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }
    }

    /** Non-fair variant: acquires as soon as enough permits are available. */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            // Stores the permit count as the state (e.g. 2 for new Semaphore(2)).
            super(permits);
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /** Shared base of both variants; the synchronizer state is the permit count. */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        /** Subtracts the requested permits from the state; negative result means failure. */
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // Only a non-negative remainder is written back (state = state - acquires).
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) {
                    throw new Error("Maximum permit count exceeded");
                }
                if (compareAndSetState(current, next)) {
                    return true;
                }
            }
        }

        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) {
                    throw new Error("Permit count underflow");
                }
                if (compareAndSetState(current, next)) {
                    return;
                }
            }
        }

        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0)) {
                    return current;
                }
            }
        }
    }
}
2-1.Semaphore.acquire()
/**
 * Non-fair synchronizer used by {@code Semaphore}; shown here for the
 * {@code acquire()} walkthrough.
 */
static final class NonfairSync extends Semaphore.Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    /**
     * {@code new Semaphore(2)} creates this non-fair variant by default; the permit
     * count (2 in that example) is handed to the base class, which stores it as the state.
     */
    NonfairSync(int permits) {
        super(permits);
    }

    /** Acquire path behind {@code semaphore.acquire()}: delegates to the non-fair attempt. */
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}
/**
 * Excerpt of the semaphore's base synchronizer: the synchronizer state is used as the
 * permit count. Members not needed for the acquire() walkthrough are elided below.
 */
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// Stores the initial permit count (var1) as the synchronizer state.
Sync(int var1) {
this.setState(var1);
}
// Returns the current state, i.e. the number of permits still available.
final int getPermits() {
return this.getState();
}
// Tries to take var1 permits without blocking. Returns the remaining permit count;
// a negative value means there were not enough permits and the state was left unchanged.
final int nonfairTryAcquireShared(int var1) {
int var2;
int var3;
do {
// remaining = state - var1. If remaining >= 0 the permits exist and the new value is
// written with CAS (retrying on contention); if it is negative the loop exits at once
// and the caller has to enqueue the thread.
var2 = this.getState();
var3 = var2 - var1;
} while(var3 >= 0 && !this.compareAndSetState(var2, var3));
return var3;
}
......
}
/**
 * Slow path of a shared, interruptible acquire: enqueues the current thread and parks
 * it until the acquire succeeds or the thread is interrupted.
 *
 * @throws InterruptedException if the thread is interrupted while parked
 */
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final AbstractQueuedSynchronizer.Node node = this.addWaiter(AbstractQueuedSynchronizer.Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final AbstractQueuedSynchronizer.Node pred = node.predecessor();
            // The node is already queued; if its predecessor is the head, try once more
            // to acquire before parking.
            if (pred == this.head) {
                int result = this.tryAcquireShared(arg);
                if (result >= 0) {
                    // Acquired: this thread's node becomes the new head.
                    this.setHeadAndPropagate(node, result);
                    pred.next = null;
                    failed = false;
                    return;
                }
            }
            // shouldParkAfterFailedAcquire(pred, node): moves the predecessor's
            // waitStatus from 0 to -1; parkAndCheckInterrupt(): blocks the current thread.
            if (shouldParkAfterFailedAcquire(pred, node) && this.parkAndCheckInterrupt()) {
                throw new InterruptedException();
            }
        }
    } finally {
        if (failed) {
            this.cancelAcquire(node);
        }
    }
}
2-2.Semaphore.release()
/**
 * Adds {@code var1} permits back to the state, retrying the CAS until it succeeds.
 *
 * @return always {@code true}
 * @throws Error if adding the permits would overflow the counter
 */
protected final boolean tryReleaseShared(int var1) {
    for (;;) {
        int current = this.getState();
        int next = current + var1;
        if (next < current) {
            throw new Error("Maximum permit count exceeded");
        }
        if (this.compareAndSetState(current, next)) {
            return true;
        }
    }
}
/**
 * Release step for shared mode: wakes the head's successor when the head's waitStatus
 * is -1, otherwise marks a head with status 0 as -3. Repeats while the head changes
 * underneath it.
 */
private void doReleaseShared() {
    for (;;) {
        final AbstractQueuedSynchronizer.Node h = this.head;
        if (h != null && h != this.tail) {
            final int ws = h.waitStatus;
            if (ws == -1) {
                if (!compareAndSetWaitStatus(h, -1, 0)) {
                    continue; // lost the race, re-read the head and retry
                }
                this.unparkSuccessor(h);
            } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, -3)) {
                continue; // CAS failed, retry
            }
        }
        // Done only if the head did not change while we were working.
        if (h == this.head) {
            break;
        }
    }
}
/**
 * Wakes the thread of the node that follows {@code node}, skipping nodes whose
 * waitStatus is positive.
 */
private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) {
    final int ws = node.waitStatus;
    if (ws < 0) {
        compareAndSetWaitStatus(node, ws, 0);
    }

    AbstractQueuedSynchronizer.Node successor = node.next;
    if (successor == null || successor.waitStatus > 0) {
        // Walk backwards from the tail and keep the node with waitStatus <= 0 that is
        // closest to `node`.
        successor = null;
        AbstractQueuedSynchronizer.Node cursor = this.tail;
        while (cursor != null && cursor != node) {
            if (cursor.waitStatus <= 0) {
                successor = cursor;
            }
            cursor = cursor.prev;
        }
    }

    if (successor != null) {
        LockSupport.unpark(successor.thread);
    }
}
二、CountDownLatch
CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。
1.简单的使用
/**
 * Demonstrates {@link CountDownLatch}: the main thread waits until both worker threads
 * have counted down before printing the completion message.
 *
 * @param args unused
 * @throws InterruptedException if the main thread is interrupted while waiting
 */
public static void main(String[] args) throws InterruptedException {
    final CountDownLatch countDownLatch = new CountDownLatch(2);

    // Worker 1: prints, sleeps 3s, and always counts down afterwards.
    Runnable slowTask = () -> {
        try {
            System.out.println("t1线程运行中");
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            countDownLatch.countDown();
        }
    };
    // Worker 2: prints and counts down immediately.
    Runnable quickTask = () -> {
        try {
            System.out.println("t2线程运行中");
        } finally {
            countDownLatch.countDown();
        }
    };

    Thread t1 = new Thread(slowTask);
    Thread t2 = new Thread(quickTask);
    t1.start();
    t2.start();

    // Blocks until the count reaches zero.
    countDownLatch.await();
    System.out.println("t1、t2线程运行完成");
}
2.底层实现
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package java.util.concurrent;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
 * Latch that lets threads wait until a counter reaches zero (decompiled JDK class).
 * The counter is kept in the state of an {@link AbstractQueuedSynchronizer}.
 */
public class CountDownLatch {
    /** Synchronizer whose state is the remaining count. */
    private final CountDownLatch.Sync sync;

    /**
     * @param count number of {@link #countDown()} calls needed to open the latch
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count < 0");
        }
        this.sync = new Sync(count);
    }

    /** Blocks until the count is zero, or throws if the thread is interrupted. */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /** Blocks until the count is zero or the timeout elapses; returns whether it reached zero. */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /** Decrements the count by one; does nothing when it is already zero. */
    public void countDown() {
        sync.releaseShared(1);
    }

    /** Returns the current count. */
    public long getCount() {
        return (long) sync.getCount();
    }

    @Override
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        /** Succeeds (1) only when the count is zero; the argument is not used. */
        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() == 0 ? 1 : -1;
        }

        /** Decrements the count; returns true only for the call that reaches zero. */
        @Override
        protected boolean tryReleaseShared(int ignored) {
            for (;;) {
                int current = getState();
                if (current == 0) {
                    return false;
                }
                int next = current - 1;
                if (compareAndSetState(current, next)) {
                    return next == 0;
                }
            }
        }
    }
}
2-1. countDownLatch.await():全部子线程执行完成后,执行汇总逻辑
/**
 * Waits for the latch to open by delegating to the synchronizer's interruptible shared
 * acquire.
 *
 * @throws InterruptedException if the waiting thread is interrupted
 */
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
/**
 * Shared acquire that aborts on interruption: fails fast if the thread is already
 * interrupted, tries once without blocking, and otherwise takes the blocking path.
 *
 * @throws InterruptedException if the thread is interrupted before or while waiting
 */
public final void acquireSharedInterruptibly(int var1) throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    // For CountDownLatch this is `state == 0 ? 1 : -1`, so var1 plays no role.
    if (this.tryAcquireShared(var1) >= 0) {
        return;
    }
    // The count is not zero yet: block the thread.
    this.doAcquireSharedInterruptibly(var1);
}
/**
 * Blocking part of the interruptible shared acquire: queues a shared node for the
 * current thread and loops until the acquire succeeds or the thread is interrupted.
 *
 * @throws InterruptedException if the thread is interrupted while parked
 */
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final AbstractQueuedSynchronizer.Node node = this.addWaiter(AbstractQueuedSynchronizer.Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final AbstractQueuedSynchronizer.Node pred = node.predecessor();
            if (pred == this.head) {
                // First in line: retry the acquire before parking.
                int result = this.tryAcquireShared(arg);
                if (result >= 0) {
                    this.setHeadAndPropagate(node, result);
                    pred.next = null;
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(pred, node) && this.parkAndCheckInterrupt()) {
                throw new InterruptedException();
            }
        }
    } finally {
        // Any exit other than a successful acquire cancels the queued node.
        if (failed) {
            this.cancelAcquire(node);
        }
    }
}
2-2. countDownLatch.countDown()
/** Counts the latch down by one by delegating to the synchronizer's shared release. */
public void countDown() {
    sync.releaseShared(1);
}
/**
 * Shared release: performs the state update and, if that reports the resource as fully
 * released, wakes the waiting threads.
 *
 * @return {@code true} if waiters were signalled, {@code false} otherwise
 */
public final boolean releaseShared(int var1) {
    // For CountDownLatch: state = state - 1; while the state is not 0 nothing is woken.
    if (!this.tryReleaseShared(var1)) {
        return false;
    }
    // Moves waitStatus from -1 to 0 and wakes the blocked thread(s).
    this.doReleaseShared();
    return true;
}
/**
 * Decrements the latch count by one with a CAS retry loop.
 *
 * @return {@code true} only for the call that moves the count to zero; {@code false}
 *         if the count was already zero or is still positive afterwards
 */
protected boolean tryReleaseShared(int var1) {
    for (;;) {
        int current = this.getState();
        if (current == 0) {
            return false;
        }
        int next = current - 1;
        if (this.compareAndSetState(current, next)) {
            return next == 0;
        }
    }
}
/**
 * Wakes the successor of the queue head (head waitStatus -1 is reset to 0 first), or
 * marks a head with status 0 as -3; loops again whenever a CAS fails or the head changed.
 */
private void doReleaseShared() {
    for (;;) {
        final AbstractQueuedSynchronizer.Node h = this.head;
        if (h != null && h != this.tail) {
            final int ws = h.waitStatus;
            if (ws == -1) {
                if (!compareAndSetWaitStatus(h, -1, 0)) {
                    continue;
                }
                this.unparkSuccessor(h);
            } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, -3)) {
                continue;
            }
        }
        if (h == this.head) {
            break;
        }
    }
}
三、CyclicBarrier
栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
1.简单的使用
/**
 * Demonstrates {@link CyclicBarrier}: ten threads each do their own work, wait at the
 * barrier, and continue once all ten have arrived; the barrier action runs when the
 * last thread arrives.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final int parties = 10;
    CyclicBarrier cyclicBarrier = new CyclicBarrier(parties,
            () -> System.out.println("所有线程都到达,任务开始执行"));

    Runnable worker = () -> {
        try {
            System.out.println(Thread.currentThread().getName() +"到达,开始执行自己的业务逻辑");
            Thread.sleep(1000);
            // Wait here until every party has reached the barrier.
            cyclicBarrier.await();
            System.out.println(Thread.currentThread().getName() +"开始执行业务的整合逻辑");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    };

    for (int started = 0; started < parties; started++) {
        new Thread(worker).start();
    }
}
2.底层实现
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package java.util.concurrent;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Reusable barrier (decompiled JDK class): threads calling {@link #await()} block until
 * {@code parties} threads have arrived, then all continue and the barrier resets itself.
 */
public class CyclicBarrier {
    /** Lock guarding entry to the barrier. */
    private final ReentrantLock lock = new ReentrantLock();
    /** Threads that reached the barrier but may not pass yet wait on this condition. */
    private final Condition trip = lock.newCondition();
    /** Number of arriving threads required to open the barrier. */
    private final int parties;
    /** Callback run by the last arriving thread; may be null. */
    private final Runnable barrierCommand;
    /**
     * Current round. A fresh instance is created each time the barrier trips or is reset,
     * which is what makes the barrier reusable; its only field records whether the round
     * is broken.
     */
    private Generation generation = new Generation();
    /** Parties still awaited in the current round. */
    private int count;

    /**
     * @param parties       number of threads that must arrive
     * @param barrierAction action run when the barrier trips, or null
     * @throws IllegalArgumentException if {@code parties} is not positive
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) {
            throw new IllegalArgumentException();
        }
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    /** Wakes all waiters and starts a new round. Called with the lock held. */
    private void nextGeneration() {
        trip.signalAll();
        count = parties;
        generation = new Generation();
    }

    /** Marks the current round broken and wakes all waiters. Called with the lock held. */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    /**
     * Core wait logic shared by both {@code await} variants.
     *
     * @param timed whether a timeout applies
     * @param nanos remaining timeout in nanoseconds when {@code timed}
     * @return arrival index: 0 for the last thread to arrive
     */
    private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException, TimeoutException {
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            final Generation current = generation;
            if (current.broken) {
                throw new BrokenBarrierException();
            }
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            final int index = --count;
            if (index == 0) {
                // Last arrival: run the action and open the barrier.
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null) {
                        command.run();
                    }
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction) {
                        breakBarrier();
                    }
                }
            }

            // Wait until the barrier trips, breaks, is interrupted, or times out.
            for (;;) {
                try {
                    if (!timed) {
                        trip.await();
                    } else if (nanos > 0L) {
                        nanos = trip.awaitNanos(nanos);
                    }
                } catch (InterruptedException interrupted) {
                    if (current == generation && !current.broken) {
                        breakBarrier();
                        throw interrupted;
                    }
                    // The round already ended; just keep the interrupt flag set.
                    Thread.currentThread().interrupt();
                }

                if (current.broken) {
                    throw new BrokenBarrierException();
                }
                if (current != generation) {
                    return index;
                }
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            guard.unlock();
        }
    }

    public int getParties() {
        return parties;
    }

    /** Waits until all parties have arrived. */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException unexpected) {
            throw new Error(unexpected);
        }
    }

    /** Waits until all parties have arrived or the timeout elapses. */
    public int await(long timeout, TimeUnit unit)
            throws InterruptedException, BrokenBarrierException, TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    /** Tells whether the current round is broken. */
    public boolean isBroken() {
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            return generation.broken;
        } finally {
            guard.unlock();
        }
    }

    /** Breaks the current round and starts a new one. */
    public void reset() {
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            breakBarrier();
            nextGeneration();
        } finally {
            guard.unlock();
        }
    }

    /** Returns how many parties are currently waiting at the barrier. */
    public int getNumberWaiting() {
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            return parties - count;
        } finally {
            guard.unlock();
        }
    }

    /** One round of the barrier; {@code broken} starts out false. */
    private static class Generation {
        boolean broken;
    }
}
2-1.public CyclicBarrier(int var1, Runnable var2)
/**
 * Creates a barrier that opens once {@code var1} threads are waiting on it and runs
 * {@code var2} (may be null) when it opens.
 *
 * @throws IllegalArgumentException if {@code var1} is not positive
 */
public CyclicBarrier(int var1, Runnable var2) {
    if (var1 <= 0) {
        throw new IllegalArgumentException();
    }
    this.lock = new ReentrantLock();
    this.trip = this.lock.newCondition();
    this.generation = new CyclicBarrier.Generation();
    // parties is the fixed target; count is the number still awaited in this round.
    this.parties = var1;
    this.count = var1;
    this.barrierCommand = var2;
}
2-2.cyclicBarrier.await()
/**
 * Waits at the barrier without a timeout.
 *
 * @return the value returned by {@code dowait(false, 0L)}
 * @throws Error wrapping a {@link TimeoutException} should the untimed wait report one
 */
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException unexpected) {
        throw new Error(unexpected);
    }
}
/**
 * Core barrier logic: the last arriving thread runs the barrier action and opens the
 * barrier; every other thread waits on the condition until the round ends.
 *
 * @param timed whether a timeout applies
 * @param nanos remaining timeout in nanoseconds when {@code timed}
 * @return arrival index; 0 for the thread that trips the barrier
 */
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock guard = this.lock;
    guard.lock();
    try {
        final CyclicBarrier.Generation current = this.generation;
        if (current.broken) {
            throw new BrokenBarrierException();
        }
        if (Thread.interrupted()) {
            this.breakBarrier();
            throw new InterruptedException();
        }

        final int index = --this.count;
        if (index == 0) {
            boolean ranAction = false;
            try {
                final Runnable command = this.barrierCommand;
                if (command != null) {
                    command.run();
                }
                ranAction = true;
                // count reached 0: wake every waiting thread and start a new round.
                this.nextGeneration();
                return 0;
            } finally {
                if (!ranAction) {
                    this.breakBarrier();
                }
            }
        }

        for (;;) {
            try {
                if (!timed) {
                    // Block the current thread until it is signalled.
                    this.trip.await();
                } else if (nanos > 0L) {
                    nanos = this.trip.awaitNanos(nanos);
                }
            } catch (InterruptedException interrupted) {
                if (current == this.generation && !current.broken) {
                    this.breakBarrier();
                    throw interrupted;
                }
                Thread.currentThread().interrupt();
            }

            if (current.broken) {
                throw new BrokenBarrierException();
            }
            if (current != this.generation) {
                return index;
            }
            if (timed && nanos <= 0L) {
                this.breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        guard.unlock();
    }
}
/**
 * Opens the barrier: signals every thread waiting on the condition, restores the
 * awaited count to {@code parties}, and installs a fresh generation for the next round.
 */
private void nextGeneration() {
    trip.signalAll();
    count = parties;
    generation = new CyclicBarrier.Generation();
}