Semaphore和Exchanger的使用
Semaphore
功能介绍
Semaphore 主要作用就是限制线程并发数量。
Semaphore 的构造函数中permits 可以控制最大并发数。每个线程可以acquire指定数量的 permit 。但是acquire n个 则需要释放n个。防止被阻塞
public class MySemaphore {
// permits 控制最大许可数
private Semaphore mySemaphore = new Semaphore(2);
public void test(int permits){
try {
//获取锁 permits 表示该线程需要的许可数量 如果 当前许可数 > semaphore 的许可数量将会被阻塞
mySemaphore.acquire(permits);
System.out.println("acquire " + Thread.currentThread().getName() + " " + System.currentTimeMillis());
Thread.sleep(3000);
System.out.println("release " + Thread.currentThread().getName() + " " + System.currentTimeMillis());
//释放锁 获取了n个许可 则需要释放n个许可
mySemaphore.release(permits);
}catch (Exception e){
e.printStackTrace();
}
}
public static void main(String[] args) {
MySemaphore mySemaphore = new MySemaphore();
MyThread myThread1 = new MyThread(mySemaphore);
MyThread myThread2 = new MyThread(mySemaphore);
MyThread myThread3 = new MyThread(mySemaphore);
myThread1.start();
myThread2.start();
myThread3.start();
}
@AllArgsConstructor
@NoArgsConstructor
public static class MyThread extends Thread{
private MySemaphore mySemaphore;
@Override
public void run(){
mySemaphore.test(2);
}
}
}
使用acquireUninterruptibly() 可以防止 thread被 println
public class MySemaphore {
// permits 控制最大许可数
private Semaphore mySemaphore = new Semaphore(1);
public void test(){
try {
//获取锁 permits 表示该线程需要的许可数量 如果 当前许可数 > semaphore 的许可数量将会被阻塞
mySemaphore.acquireUninterruptibly();
System.out.println("acquire " + Thread.currentThread().getName() + " " + System.currentTimeMillis());
// Thread.sleep(3000);
for (int i = 0; i < Integer.MAX_VALUE/50; i++){
String str = new String();
Math.random();
}
System.out.println("release " + Thread.currentThread().getName() + " " + System.currentTimeMillis());
//释放锁 获取了n个许可 则需要释放n个许可
mySemaphore.release();
}catch (Exception e){
System.out.println(Thread.currentThread().getName() + " interrupt");
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
MySemaphore mySemaphore = new MySemaphore();
MyThread myThread1 = new MyThread(mySemaphore);
MyThread myThread2 = new MyThread(mySemaphore);
MyThread myThread3 = new MyThread(mySemaphore);
myThread1.setName("myThread1");
myThread1.start();
myThread2.setName("myThread2");
myThread2.start();
myThread3.setName("myThread3");
myThread3.start();
Thread.sleep(1000);
System.out.println("start interrupt a");
myThread2.interrupt();
}
@AllArgsConstructor
@NoArgsConstructor
public static class MyThread extends Thread{
private MySemaphore mySemaphore;
@Override
public void run(){
mySemaphore.test();
}
}
}
availablePermits() 获取当前可用的许可数
drainPermits() 立即返回可用的所有许可数,并且可讲许可数设置为0
hasQueuedThreads() 获取等待许可的线程数
hasQueuedThreads() 判断是都有线程在等待许可
可以 使用 tryAcquire 尝试获取许可 该方法可以保证线程不被阻塞。
应用场景
- 多进路-多处理-多出路(多线程并发)
- 多进路-单处理-多出路(多线程同时触发,串行运行)
- pool池 (限制线程数量)
- 多生产者/多消费者
Exchanger
功能介绍
Exchanger 可以使2个线程之间传输数据,比生产者/消费者更加方便。
public class MyExchanger {
@AllArgsConstructor
@NoArgsConstructor
public static class MyThreadA extends Thread{
private Exchanger<String> exchanger;
@Override
public void run(){
try {
System.out.println("A: " + exchanger.exchange("aaa"));
System.out.println(Thread.currentThread().getName() + " end!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@AllArgsConstructor
@NoArgsConstructor
public static class MyThreadB extends Thread{
private Exchanger<String> exchanger;
@Override
public void run(){
try {
System.out.println("B: " + exchanger.exchange("bbb"));
System.out.println(Thread.currentThread().getName() + " end!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
MyThreadA myThread = new MyThreadA(exchanger);
myThread.start();
MyThreadB myThreadB = new MyThreadB(exchanger);
myThreadB.start();
System.out.println("main end!");
}
}