Semaphore 是一种基于计数的信号量,在定义信号量对象的时候可以设置一个阈值,然后基于这个阈值,多线程可以竞争访问信号量,线程竞争到许可的信号之后,开始执行具体的业务逻辑,业务逻辑在执行完成之后释放这个许可信号。
如果许可信号竞争队列超过阈值,新加入的申请信号许可的线程就会被阻塞,知道有其他许可信号被释放。使用Semaphore可以控制同时访问资源的线程个数,例如,实现一个文件允许的并发访问数。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
//创建Semaphore信号量,初始化许可大小为3
final Semaphore sp = new Semaphore(3);
for(int i=0;i<10;i++){
try {
Thread.sleep(100);
} catch (InterruptedException e2) {
e2.printStackTrace();
}
Runnable runnable = new Runnable(){
public void run(){
try {
//请求获得许可,如果有可获得的许可则继续往下执行,
//许可数减1。否则进入阻塞状态
sp.acquire();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("线程" + Thread.currentThread().getName() +
"进入,当前已有" + (3-sp.availablePermits()) + "个并发");
try {
Thread.sleep((long)(Math.random()*10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程" + Thread.currentThread().getName() +
"即将离开");
sp.release();//释放许可,许可数加1
//下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元
System.out.println("线程" + Thread.currentThread().getName() +
"已离开,当前已有" + (3-sp.availablePermits()) + "个并发");
}
};
service.execute(runnable);
}
}
}
Semaphore 对于锁的申请与释放和ReentrantLock是类似的,通过acquire()方法和release()方法来获取和释放许可信号资源。
Semaphore.acquire()方法与
ReentrantLock.lockInterruptibly()方法的效果是一样的,为可响应中断的锁。也就是说在等待获取许可信号的过程中可以被Thread.interrupt()方法中断而取消对许可信号的申请操作。
除此之外,Semaphore也实现了可轮询的锁请求、定时锁的等功能,以及公平与非公平锁的定义在构造函数中设定
Semaphore锁的释放操作也需要手动进行释放,为此为了避免线程因为异常没有正常释放锁,释放锁的操作必须在finally代码块中完成。
Semaphore也可以用于实现一些对象池、资源池的构建,例如静态全局对象池、数据库连接池等等。
单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockTest {
public static void main(String[] args) {
final Business business = new Business();
ExecutorService executor = Executors.newFixedThreadPool(3);
for(int i=0;i<3;i++)
{
executor.execute(
new Runnable()
{
public void run()
{
business.service();
}
}
);
}
executor.shutdown();
}
private static class Business
{
private int count;
Lock lock = new ReentrantLock();
Semaphore sp = new Semaphore(1);
public void service()
{
//lock.lock();
try {
//当前线程使用count变量的时候将其锁住,不允许其他线程访问
sp.acquire();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
count++;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(count);
} catch (RuntimeException e) {
e.printStackTrace();
}
finally
{
//lock.unlock();
sp.release(); //释放锁
}
}
}
}