3 线程间通信
线程间通信的模型有两种:共享内存和消息传递,以下方式都是基本这两种模型来实现的。我们来基本一道面试常见的题目来分析
多线程编程步骤:
- 第一步:创建资源类,在资源类创建属性和操作方法
- 第二步:在资源类操作方法
- 判断
- 干活
- 通知
- 第三步:创建多个线程,调用资源类的操作方法
- 第四步:防止虚假唤醒问题
周阳四大口诀:
- 高内聚低耦合前提下,封装思想 -> 线程操作 -> 资源类
- 判断、干活、通知
- 防止虚假唤醒,wait方法要注意
- 注意标志位flag,可能是volatile的
题目:场景两个线程,一个线程对当前数值加 1,另一个线程对当前数值减 1,要求用线程间通信
// 资源类,
class Share2{
private int number = 0;
public synchronized void incr() throws InterruptedException {
if(number != 0){
wait();
}
number++;
System.out.println(Thread.currentThread().getName()+" : "+number);
notifyAll();
}
public synchronized void decr() throws InterruptedException {
if(number != 1){
wait();
}
number--;
System.out.println(Thread.currentThread().getName()+" : "+number);
notifyAll();
}
}
public class ThreadDemo2 {
public static void main(String[] args) {
Share2 s = new Share2();
new Thread(()->{
try {
while(true){
s.incr();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},"AA").start();
new Thread(()->{
try {
while(true){
s.decr();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},"BB").start();
}
}
执行:可以完成需求
存在问题:当我把线程增加到4个,2个加,2个减,那么就会出现结果变成0或1以外的值。
虚假唤醒问题:
本质:唤醒后依然需要判断条件
对于条件的判断需要一直在while循环里面,把判断放进if就很容易出现问题。
因为wait在那里睡,在哪里醒,醒了之后继续执行后面的代码,有可能造成判断失效
使用while包裹起来,wait醒来之后还是会进行判断条件,只有等待条件不满足,不会进行wait,才会继续执行。
使用lock完成刚才的场景,使用newCondition()方法获取condition对象,使用await方法和signal方法。
// Lock版本
class MyShare{
private int number = 0;
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void incr() {
lock.lock();
try {
while (number!=0){
condition.await();
}
number++;
System.out.println(Thread.currentThread().getName()+" : "+number);
condition.signalAll();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
public void decr() {
lock.lock();
try {
while (number!=1){
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName()+" : "+number);
condition.signalAll();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
public class MyThreadDemo2 {
public static void main(String[] args) {
MyShare myShare = new MyShare();
new Thread(()->{
for(int i=0;i<40;i++){
myShare.incr();
}
},"AA").start();
new Thread(()->{
for(int i=0;i<40;i++){
myShare.decr();
}
},"BB").start();
new Thread(()->{
for(int i=0;i<40;i++){
myShare.incr();
}
},"CC").start();
new Thread(()->{
for(int i=0;i<40;i++){
myShare.decr();
}
},"DD").start();
}
}
4 线程间定制化通信
主要内容:使用Lock接口里面的newCondition创建Condition对象,使用condition对象进行特定唤醒和睡眠达成效果
问题:A线程打印5次A,B线程打印 10 次 B,C线程打印15次C,按照此顺序循环10轮 主要在于按照顺序执行
class MyShare3{
// 1 AA 2 BB 3 CC
private int flag = 1;
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void print5(int loop){
lock.lock();
try {
while (flag!=1){
condition1.await();
}
for(int i=1;i<=5;i++){
System.out.println(loop+" 轮 :"+Thread.currentThread().getName()+" : "+i);
}
flag = 2;
condition2.signal();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
public void print10(int loop){
lock.lock();
try {
while (flag!=2){
condition2.await();
}
for(int i=1;i<=10;i++){
System.out.println(loop+" 轮 :"+Thread.currentThread().getName()+" : "+i);
}
flag = 3;
condition3.signal();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
public void print15(int loop){
lock.lock();
try {
while (flag!=3){
condition3.await();
}
for(int i=1;i<=15;i++){
System.out.println(loop+" 轮 :"+Thread.currentThread().getName()+" : "+i);
}
flag = 1;
condition1.signal();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
public class MyThreadDemo3 {
public static void main(String[] args) {
MyShare3 share3 = new MyShare3();
new Thread(()->{
for(int i=1;i<=10;i++){
share3.print5(i);
}
},"AA").start();
new Thread(()->{
for(int i=1;i<=10;i++){
share3.print10(i);
}
},"BB").start();
new Thread(()->{
for(int i=1;i<=10;i++){
share3.print15(i);
}
},"CC").start();
}
}
5 集合的线程安全
5.1 ArrayList线程不安全演示
多线程对集合修改,造成 java.util.ConcurrentModificationException 异常
public class NotSafeDemo {
/**
* 多个线程同时对集合进行修改 * @param args
*/
public static void main(String[] args) {
List list = new ArrayList();
for (int i = 0; i < 100; i++) { new Thread(() ->{
list.add(UUID.randomUUID().toString()); System.out.println(list);
}, "线程" + i).start(); }
}
}
异常内容
java.util.ConcurrentModificationException
问题:为什么会出现并发修改异常?
查看 ArrayList 的 add 方法源码 : add方法没有使用synchronized修饰
接下来展示几种解决方案:
5.1.2 Vector
使用Vector替换ArrayList:
List<String> list = new Vector<>();
实际使用的并不多,这个是JDK1.0出现的,并且Vector存在很多问题:
- 同步开销:由于
Vector
的每个方法都是同步的,即使在单线程环境下,这也会引入额外的同步开销。这使得Vector
在性能方面可能不如非线程安全的集合类,例如ArrayList
。 - 高并发性能:对于高并发环境,
Vector
的同步机制可能会导致性能瓶颈。在并发访问频繁的情况下,使用更加高效的并发集合类,如ConcurrentLinkedQueue
或ConcurrentHashMap
,可以更好地满足并发性能需求。 - 更灵活的同步控制:虽然
Vector
的每个方法都是同步的,但在某些情况下,我们可能需要更精细的同步控制。使用Collections.synchronizedList()
方法可以将非线程安全的集合类包装成线程安全的集合类,同时可以使用更细粒度的同步控制,从而获得更好的性能。
5.1.3 Collections
使用Collections里面的synchronizedList静态方法,传入new的集合对象就可以确保其线程安全:
List<String> list = Collections.synchronizedList(new ArrayList<>());
这个解决方案也比较古老
5.1.4 CopyOnWriteArrayList(重点)
通过JUC工具包里面的CopyOnWriteArrayList类解决:
也是一个用于替换ArrayList<>集合的类
List<String> list = new CopyOnWriteArrayList<>();
底层原理:写时复制技术
支持并发读,独立写:
即,需要写的时候,复制一份出来,在新内容里面进行写,写完之后再去合并两部分内容。
// 对写加锁
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
代码实现里面,仅对写操作进行加锁,对于读操作,类似indexOf的操作,并不进行加锁操作。
原因分析(重点):动态数组与线程安全
下面从“动态数组”和“线程安全”两个方面进一步对 CopyOnWriteArrayList 的原理进行说明。
-
“动态数组”机制
- 它内部有个“volatile 数组”(array)来保持数据。在“添加/修改/删除”数据时,都会新建一个数组,并将更新后的数据拷贝到新建的数组中,最后再将该数组赋值给“volatile 数组”, 这就是它叫做 CopyOnWriteArrayList 的原因
- 由于它在“添加/修改/删除”数据时,都会新建数组,所以涉及到修改数据的 操作,CopyOnWriteArrayList效率很低;但是单单只是进行遍历查找的话, 效率比较高。**
-
“线程安全”机制
- 通过 volatile 和互斥锁来实现的。
- 通过“volatile 数组”来保存数据的。一个线程读取 volatile 数组时,总能看到其它线程对该 volatile 变量最后的写入;就这样,通过 volatile 提供了“读取到的数据总是最新的”这个机制的保证。
- 通过互斥锁来保护数据。在“添加/修改/删除”数据时,会先“获取互斥锁”,再修改完毕之后,先将数据更新到“volatile 数组”中,然后再“释放互斥 锁”,就达到了保护数据的目的。
5.2 HashSet线程不安全
线程不安全演示:
Set<String> set = new HashSet<>();
for (int i = 0; i <30; i++) {
new Thread(()->{
//向集合添加内容
set.add(UUID.randomUUID().toString().substring(0,8));
//从集合获取内容
System.out.println(set);
},String.valueOf(i)).start();
}
HashSet的add方法没有加上synchronized关键字
解决方案:
5.2.1 CopyOnWriteArraySet
Set<String> set = new CopyOnWriteArraySet<>();
5.3 HashMap线程不安全
HashMap的put方法没有加上synchronized关键字
Map<String,String> map = new HashMap<>();
for (int i = 0; i <30; i++) {
String key = String.valueOf(i);
new Thread(()->{
//向集合添加内容
map.put(key,UUID.randomUUID().toString().substring(0,8));
//从集合获取内容
System.out.println(map);
},String.valueOf(i)).start();
}
}
5.3.1 ConcurrentHashMap
Map<String,String> map = new ConcurrentHashMap<>();
ConCurrentHashMap如何实现线程安全:
- 分段锁:
ConcurrentHashMap
内部使用了分段锁(Segment),将哈希表分成多个段(Segment),每个段上有一个独立的锁。不同的线程可以同时访问不同的段,从而实现更高的并发性。 - 原子操作:
ConcurrentHashMap
使用了一些原子操作(Atomic Operations),例如compareAndSet
和volatile
关键字,来确保在并发修改时的数据一致性。 - 安全发布机制:
ConcurrentHashMap
在创建时会进行一些安全发布机制的操作,确保其他线程在完全构造之前无法访问它。 - 无阻塞算法:
ConcurrentHashMap
在并发修改时使用了无阻塞算法(Lock-Free),这意味着即使在高并发情况下,线程不会被阻塞在锁上,从而提高了并发性能。
public V put(K key, V value) {return putVal(key, value, false);}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K, V>[] tab = table;;) {
Node<K, V> f;
int n, i, fh;
// 1. 分段锁
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 2. 原子操作
// 如果节点为空,在对应位置上使用 CAS 操作进行插入
if (casTabAt(tab, i, null,
new Node<K, V>(hash, key, value, null)))
break; // no lock when adding to empty bin
} else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 3. 同步控制
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K, V> e = f;; ++binCount) {
K ek;
// 遍历链表查找键是否已存在
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K, V> pred = e;
// 到达链表尾部,将新节点插入链表末尾
if ((e = e.next) == null) {
pred.next = new Node<K, V>(hash, key, value, null);
break;
}
}
} else if (f instanceof TreeBin) {
Node<K, V> p;
binCount = 2;
// 当前节点为树节点,调用树节点的插入操作
if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 4. 无阻塞算法
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
5.4 小结
1.线程安全与线程不安全集合
集合类型中存在线程安全与线程不安全的两种,常见例如:
ArrayList ----- Vector
HashMap -----HashTable
但是以上都是通过 synchronized 关键字实现,效率较低
2.Collections 构建的线程安全集合
3.java.util.concurrent 并发包下
CopyOnWriteArrayList
CopyOnWriteArraySet
ConCurrentHashMap
类型,通过动态数组与线程安全个方面保证线程安全