【Exchanger】:
package Ten_Class.t04.no139;
import java.util.concurrent.Exchanger;
public class T12_TestExchanger {
static Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) {
new Thread(() -> {
String s = "T1";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t1").start();
new Thread(() -> {
String s = "T2";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t2").start();
}
}
//交换后两个线程继续往前跑。( 先执行exchange()方法的线程会进入阻塞状态 )
【Exchanger注意】:
只能是两个线程之间,三个以及以上不清楚如何交换。
两个线程一个执行了exchange()方法 , 另一个不执行,那么执行的那个线程会进入阻塞状态。
【 复习总结 】:
- synchronized
- volatile
- AtomicXXX
- 各种JUC同步锁
ReentrantLock
CountDownLatch
CyclicBarrier
Phaser
ReadWriteLock - StampedLock
Semaphore
Exchanger
LockSupport
//JUC的锁中除了最后的LockSupport之外,其它的都是利用AQS来实现的。
【synchronized 和 ReentrantLock的不同之处 】:
sync系统自动加锁,自动解锁,RL需要我们手动加锁,手动解锁,RL可以出现各种各样的Condition , Condition代表的是不同的等待队列,RL底层是CAS实现,sync是四种状态的升级。
【淘宝面试题】:
实现一个容器,提供两个方法,add , size ;写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束。
【面试题】:
写一个固定容量同步容器,拥有put和get方法,以及getCount方法,能够支持2个生产者线程以及10个消费者线程的阻塞调用。
【 LockSupport 】:
package Ten_Class.t04.no141;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class T13_TestLockSupport00 {
public static void main(String[] args) {
Thread t = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(i);
if (i == 5) {
LockSupport.park(); //park是停车的意思;————当前线程停止。
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
try {
TimeUnit.SECONDS.sleep(8);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after 8 senconds!");
LockSupport.unpark(t); //叫醒线程。
}
}
【实验】:
package Ten_Class.t04.no141;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class T13_TestLockSupport01 {
public static void main(String[] args) {
Thread t = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(i);
if (i == 5) {
LockSupport.park(); //park是停车的意思;————当前线程停止。
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
LockSupport.unpark(t);
}
}
LockSupport.unpark(t); //这一句话是同时和t线程启动的。
//但是一上来t线程没有进行park , 这说明unpark可以先于park进行调用。先调用的话之后就没办法停车了,之后就没办法再调park()方法了。
先调用unpark()方法的话,之后的park()方法就相当于失效了。这种方法比wait、notify要更加灵活。
【 Volatile 】:
package Ten_Class.t04.no142;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class T01_WithoutVolatile {
List lists = new ArrayList();
public void add(Object o) {
lists.add(o);
}
public int size() {
return lists.size();
}
public static void main(String[] args) {
T01_WithoutVolatile c = new T01_WithoutVolatile();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
c.add(new Object());
System.out.println("add " + i);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1").start();
new Thread(() -> {
while (true) {
if (c.size() == 5) {
break;
}
}
System.out.println(" t2 结束 ");
}, "t2").start();
}
}
//发现不行。
【错误点】:
1)没有加同步。
2)停止不了是因为永远没有检测到——线程之间不可见的问题。第一个线程变化了之后,第二个线程不会马上就能看到。
【 改 正 之 后 】:
package Ten_Class.t04.no142;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class T02_WithVolatile {
//添加volatile , 使t2能够得到通知֪ͨ
volatile List lists = Collections.synchronizedList(new LinkedList<>());
public void add(Object o) {
lists.add(o);
}
public int size() {
return lists.size();
}
public static void main(String[] args) {
T02_WithVolatile c = new T02_WithVolatile();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
c.add(new Object());
System.out.println("add " + i);
}
}, "t1").start();
new Thread(() -> {
while (true) {
if (c.size() == 5) {
break;
}
}
System.out.println("t2 结束");
}, "t2").start();
}
}
【注意】:
volatile List lists = Collections.synchronizedList(new LinkedList<>());
//同步容器。
【 wait、notify 】:
【错误示例】:
/**
* 曾经的面试题:(淘宝?)
* 实现一个容器,提供两个方法,add,size
* 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束
* <p>
* 给lists添加volatile之后,t2能够接到通知,但是,t2线程的死循环很浪费cpu,如果不用死循环,该怎么做呢?
* <p>
* 这里使用wait和notify做到,wait会释放锁,而notify不会释放锁
* 需要注意的是,运用这种方法,必须要保证t2先执行,也就是首先让t2监听才可以
* <p>
* 阅读下面的程序,并分析输出结果
* 可以读到输出结果并不是size=5时t2退出,而是t1结束时t2才接收到通知而退出
* 想想这是为什么?
*/
package Ten_Class.t04.no143;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class T03_NotifyHoldingLock { //wait notify
//添加volatile,使t2能够得到通知
volatile List lists = new ArrayList();
public void add(Object o) {
lists.add(o);
}
public int size() {
return lists.size();
}
public static void main(String[] args) {
T03_NotifyHoldingLock c = new T03_NotifyHoldingLock();
final Object lock = new Object();
new Thread(() -> {
synchronized (lock) {
System.out.println("t2 start");
if (c.size() != 5) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("t2 end");
}
}, "t2").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
new Thread(() -> {
System.out.println("t1 start");
synchronized (lock) {
for (int i = 0; i < 10; i++) {
c.add(new Object());
System.out.println("add " + i);
if (c.size() == 5) {
lock.notify(); //不释放锁————虽然叫醒了,但是没有拿到锁。
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "t1").start();
}
}
【修改】:
package Ten_Class.t04.no143;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class T04_NotifyFreeLock {
//添加volatile ,使t2能够得到通知。
volatile List lists = new ArrayList();
public void add(Object o) {
lists.add(o);
}
public int size() {
return lists.size();
}
public static void main(String[] args) {
T04_NotifyFreeLock c = new T04_NotifyFreeLock();
final Object lock = new Object();
new Thread(() -> {
synchronized (lock) {
System.out.println("t2 启动");
if (c.size() != 5) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("t2 结束");
//ͨ通知֪t继续执行1
lock.notify();
}
}, "t2").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
new Thread(() -> {
System.out.println("t1~~~启动");
synchronized (lock) {
for (int i = 0; i < 10; i++) {
c.add(new Object());
System.out.println("add " + i);
if (c.size() == 5) {
lock.notify();
// 释放锁,让t2得以执行。
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "t1").start();
}
}
【最核心的点】:
notify不会释放锁 , 而wait是会释放锁的。wait要回来继续执行的时候还需要拿到锁才能继续执行。
【 CountDownLatch 、Semaphore 】:
package Ten_Class.t04.no145;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.LockSupport;
public class T08_Semaphore {
//添加volatile , 使t2能够得到通知。
volatile List lists = new ArrayList();
public void add(Object o) {
lists.add(o);
}
public int size() {
return lists.size();
}
static Thread t1 = null, t2 = null;
public static void main(String[] args) {
T08_Semaphore c = new T08_Semaphore();
Semaphore s = new Semaphore(1);
t1 = new Thread(() -> {
try {
s.acquire();
for (int i = 0; i < 5; i++) {
c.add(new Object());
System.out.println("add " + i);
}
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
t2.start();
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
s.acquire();
for (int i = 5; i < 10; i++) {
System.out.println(i);
}
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1");
t2 = new Thread(() -> {
try {
s.acquire();
System.out.println("t2 结束");
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2");
//t2.start();
t1.start();
}
}
【 最终输出 】:
【 交叉打印案例 】:
【 synchronized 】:
【面试题】:
- 面试题:写一个固定容量同步容器,拥有put和get方法,以及getCount方法,
- 能够支持2个生产者线程以及10个消费者线程的阻塞调用。
【解释】:
生产者线程生产的东西装不下了,生产线程就会进入阻塞状态 , 同样拿的线程拿不到东西了也会进入阻塞状态。
/**
* 面试题:写一个固定容量同步容器,拥有put和get方法,以及getCount方法,
* 能够支持2个生产者线程以及10个消费者线程的阻塞调用
* <p>
* 使用wait和notify/notifyAll来实现
* <p>
* 使用Lock和Condition来实现
* 对比两种方式,Condition的方式可以更加精确的指定哪些线程被唤醒
*/
package Ten_Class.t04.no146;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
public class MyContainer1<T> {
final private LinkedList<T> lists = new LinkedList<>();
final private int MAX = 10; //最多10个元素
private int count = 0;
public synchronized void put(T t) {
while (lists.size() == MAX) { //想想为什么用while而不是用if?
try {
//this.notifyAll()
this.wait(); //effective java
} catch (InterruptedException e) {
e.printStackTrace();
}
}
lists.add(t);
++count;
this.notifyAll(); //通知消费者线程进行消费
}
public synchronized T get() {
T t = null;
while (lists.size() == 0) {
try {
//this.notifyAll()
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
t = lists.removeFirst();
count--;
this.notifyAll(); //通知生产者进行生产
return t;
}
public static void main(String[] args) {
MyContainer1<String> c = new MyContainer1<>();
//启动消费者线程
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 5; j++) System.out.println(c.get());
}, "c" + i).start();
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//启动生产者线程
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for (int j = 0; j < 25; j++) c.put(Thread.currentThread().getName() + " " + j);
}, "p" + i).start();
}
}
}
【 背过这种写法 】:
public synchronized void put(T t) {
while (lists.size() == MAX) { //想想为什么用while而不是用if?
//while后面的判断说明已经扔满了。
try {
//this.notifyAll()
this.wait(); //effective java //当前线程停住。
} catch (InterruptedException e) {
e.printStackTrace();
}
}
lists.add(t);
++count;
this.notifyAll(); //通知消费者线程进行消费
}
//加synchronized是因为有 ++count 这一句。
【 CAS 】:
【能否?】:
生产者线程只负责叫醒消费者;
消费者线程只负责叫醒生产者。
//下面的写法可以满足这个要求。
【 满足要求的写法 】:
/**
* 面试题:写一个固定容量同步容器,拥有put和get方法,以及getCount方法,
* 能够支持2个生产者线程以及10个消费者线程的阻塞调用
* <p>
* 使用wait和notify/notifyAll来实现
* <p>
* 使用Lock和Condition来实现
* 对比两种方式,Condition的方式可以更加精确的指定哪些线程被唤醒
*
* @author mashibing
*/
package Ten_Class.t04.no146;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class MyContainer2<T> {
final private LinkedList<T> lists = new LinkedList<>();
final private int MAX = 10; //最多10个元素
private int count = 0;
private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();
public void put(T t) {
lock.lock();
try {
while (lists.size() == MAX) { //想想为什么用while而不是用if?
producer.await();
}
lists.add(t);
++count;
consumer.signalAll(); //通知消费者线程进行消费
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public T get() {
T t = null;
lock.lock();
try {
while (lists.size() == 0) {
consumer.await();
}
t = lists.removeFirst();
count--;
producer.signalAll(); //通知生产者进行生产
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return t;
}
public static void main(String[] args) {
MyContainer2<String> c = new MyContainer2<>();
//启动消费者线程
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 5; j++) System.out.println(c.get());
}, "c" + i).start();
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//启动生产者线程
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for (int j = 0; j < 25; j++) c.put(Thread.currentThread().getName() + " " + j);
}, "p" + i).start();
}
}
}
【本质】:
private Condition producer = lock.newCondition(); //第一个条件;
private Condition consumer = lock.newCondition(); //第二个条件;
Condition的本质就是等待队列 ;
当producer.awit( )的时候 , 指的是这个线程进入P队列等待 , 当consumer.awit( )的时候 , 指的是这个线程进入C队列等待。
这样叫醒的时候,可以当读叫醒某个队列里的全部线程。
所以Condition的本质就是不同的等待队列。
【 探索源码 】:
【读源码的基础】:
1)数据结构基础
2)设计模式
【读源码的思路】:
更重要的是借鉴别人的思路 , 具体的边界判定条件其实可以忽略。
【 AQS核心 】:
Template Method
Callback Function
父类默认实现
子类具体实现
【 源码阅读原则 】:
//有时候静态读会读不下去 , 但是动态DEBUG就会发现问题所在。(有时会因为多态机制)
第一遍读的时候一些边界条件判断可以略过。
【 ReentrantLock 】:
public void lock() {
sync.lock();
}
-->
private final Sync sync; //sync原来是一个内部类;
-->
abstract static class Sync extends AbstractQueuedSynchronizer { ... }