CountDownLatch最基本的原理,就是用来阻塞线程的,java本身也有CountDownLatch,用多线程处理分批处理多数据的时候很有用
基本的逻辑就是,同时开多个子线程,然后主线程进入等待,只有当其他子线程全都结束之后,主线程才会继续往下走。
而redis的CountDownLatch,可以设置指定的客户端(线程)数量,只有这些客户端,全都完成之后执行了countDown()方法之后,才会继续往下走。
没有达到指定线程数量的话,所有客户端就一直阻塞
就是java里面CountDownLatch的分布式版
不过一般来说,用CountDownLatch都是在本地使用就够了,redis的用不到。
实现
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(3);
System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]设置了必须有3个线程执行countDown,进入等待中。。。");
for(int i = 0; i < 3; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]在做一些操作,请耐心等待。。。。。。");
Thread.sleep(3000);
RCountDownLatch localLatch = redisson.getCountDownLatch("anyCountDownLatch");
localLatch.countDown();
System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]执行countDown操作");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
//这里会卡住,等待上面3个线程都执行countDown()方法后才往下执行
latch.await();
System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]收到通知,有3个线程都执行了countDown操作,可以继续往下走");
这两行代码是设置锁
RCountDownLatch latch = redisson.getCountDownLatch(“anyCountDownLatch”);
latch.trySetCount(3);
如果key不存在的话,设置这个key的制定值,并且发布一条消息
其他线程参与
每一个来参与的线程,都会给这个值-1,如果减到小于0了,就直接把这个数据删掉。
如果刚好=0 ,那么就发布一条消息广播。
客户端等待
这里的意思就是
await()方法,其实就是陷入一个while true死循环,不断的get anyCountDownLatch的值,如果这个值还是大于0那么就继续死循环,否则的话呢,就退出这个死循环
countDown(),decr anyCountDownLatch,就是每次一个客户端执行countDown操作,其实就是将这个cocuntDownLatch的值递减1就可以了。如果这个值已经小于等于0,del anyCcoutnDownLatch,删除掉他就可以ile;
如果是这个值为0的时候,还会去发布一个消息,publish redisson_countdownlatch__channel__{anyCountDownLatch} 0