Atomic原子操作类介绍
在并发编程中很容易出现并发安全的问题,有一个很简单的例子就是多线程更新变量i=1,比如多个线程执行i++操作,就有可能获取不到正确的值,而这个问题,最常用的方法是通过Synchronized进行控制来达到线程安全的目的。但是由于synchronized是采用的是悲观锁策略,并不是特别高效的一种解决方案。实际上,在JUC下的atomic包提供了一系列的操作简单,性能高效,并能保证线程安全的类去更新基本类型变量,数组元素,引用类型以及更新对象中的字段类型。atomic包下的这些类都是采用的是乐观锁策略去原子更新数据,在java中则是使用CAS操作具体实现。
在java.util.concurrent.atomic包里提供了一组原子操作类:
基本类型:AtomicInteger、AtomicLong、AtomicBoolean;
引用类型:AtomicReference、AtomicStampedRerence、AtomicMarkableReference;
数组类型:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
对象属性原子修改器:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater
原子类型累加器(jdk1.8增加的类):DoubleAccumulator、DoubleAdder、LongAccumulator、LongAdder、Striped64
原子更新基本类型
以AtomicInteger为例总结常用的方法
//以原子的方式将实例中的原值加1,返回的是自增前的旧值;
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
//getAndSet(int newValue):将实例中的值更新为新值,并返回旧值;
public final boolean getAndSet(boolean newValue) {
boolean prev;
do {
prev = get();
} while (!compareAndSet(prev, newValue));
return prev;
}
//incrementAndGet() :以原子的方式将实例中的原值进行加1操作,并返回最终相加后的结果;
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
//addAndGet(int delta) :以原子方式将输入的数值与实例中原本的值相加,并返回最后的结果;
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
代码示例:
public class AtomicIntegerTest {
private static int count;
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
for (int j = 0; j < 10000; j++) {
count++;
}
});
thread.start();
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(count);
}
}
统计总数,开启10个线程,每个线程1万次,次数给大点,太小不容易测出问题,总数应该是10万次,但是结果并不正确,因为count++不是原子操作。
使用原子操作类AtomicInteger
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicIntegerTest {
static AtomicInteger count = new AtomicInteger(0);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
for (int j = 0; j < 10000; j++) {
// 原子自增 CAS
count.incrementAndGet();
}
});
thread.start();
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(count.get());
}
}
执行结果正确,总数10万。
常用方法代码示例:
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicDemo {
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger(0);
// 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
System.out.println("获取并自增 = " + i.getAndIncrement());
// 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
System.out.println("自增并获取 = " + i.incrementAndGet());
// 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
System.out.println("自减并获取 = " + i.decrementAndGet());
// 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
System.out.println("获取并自减 = " + i.getAndDecrement());
// 获取并加值(i = 0, 结果 i = 5, 返回 0)
System.out.println("获取并加值 = " + i.getAndAdd(5));
// 加值并获取(i = 5, 结果 i = 0, 返回 0)
System.out.println("加值并获取 = " + i.addAndGet(-5));
// 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println("获取并更新 = " + i.getAndUpdate(p -> p - 2));
// 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println("更新并获取 = " + i.updateAndGet(p -> p + 2));
// 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
// getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
// getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
System.out.println("获取并计算 = " + i.getAndAccumulate(10, (p, x) -> p + x));
// 计算并获取(i = 10, p 为 i 的当前值, x 为参数1, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println("计算并获取 = " + i.accumulateAndGet(-10, (p, x) -> p + x));
}
}
执行结果:
获取并自增 = 0
自增并获取 = 2
自减并获取 = 1
获取并自减 = 1
获取并加值 = 0
加值并获取 = 0
获取并更新 = 0
更新并获取 = 0
获取并计算 = 0
计算并获取 = 0
原子更新数组类型
AtomicIntegerArray为例总结常用的方法
//addAndGet(int i, int delta):以原子更新的方式将数组中索引为i的元素与输入值相加;
public final int addAndGet(int i, int delta) {
return getAndAdd(i, delta) + delta;
}
//getAndIncrement(int i):以原子更新的方式将数组中索引为i的元素自增加1;
public final int getAndIncrement(int i) {
return getAndAdd(i, 1);
}
//compareAndSet(int i, int expect, int update):将数组中索引为i的位置的元素进行更新
public final boolean compareAndSet(int i, int expect, int update) {
return compareAndSetRaw(checkedByteOffset(i), expect, update);
}
代码示例:
import java.util.concurrent.atomic.AtomicIntegerArray;
public class AtomicIntegerArrayTest {
static int[] value = new int[]{1, 2, 3, 4, 5};
static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(value);
public static void main(String[] args) throws InterruptedException {
// 设置索引0的元素为100
atomicIntegerArray.set(0, 100);
System.out.println(atomicIntegerArray.get(0));
// 以原子更新的方式将数组中索引为1的元素与输入值相加
atomicIntegerArray.getAndAdd(1, 5);
System.out.println(atomicIntegerArray);
}
}
执行结果:
100
[100, 7, 3, 4, 5]
原子更新引用类型
AtomicReference作用是对普通对象的封装,它可以保证你在修改对象引用时的线程安全性。
代码示例:
import lombok.AllArgsConstructor;
import lombok.Data;
import java.util.concurrent.atomic.AtomicReference;
public class AtomicReferenceTest {
public static void main(String[] args) {
User user1 = new User("张三", 23);
User user2 = new User("李四", 25);
User user3 = new User("王五", 20);
// 初始化为 user1
AtomicReference<User> atomicReference = new AtomicReference<>();
atomicReference.set(user1);
//把 user2 赋给 atomicReference
atomicReference.compareAndSet(user1, user2);
System.out.println(atomicReference.get());
//把 user3 赋给 atomicReference
atomicReference.compareAndSet(user1, user3);
System.out.println(atomicReference.get());
}
}
@Data
@AllArgsConstructor
class User {
private String name;
private Integer age;
}
执行结果:
User(name=李四, age=25)
User(name=李四, age=25)
第二次对象比较失败,所以不会赋值。
对象属性原子修改器
AtomicIntegerFieldUpdater可以线程安全地更新对象中的整型变量。
代码示例:
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
public class AtomicIntegerFieldUpdaterTest {
public static class Candidate {
// 字段必须是volatile类型,多线程操作保证可见性
volatile int score = 0;
}
// 传入要修改的字段score
public static final AtomicIntegerFieldUpdater<Candidate> scoreUpdater =
AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");
public static void main(String[] args) throws InterruptedException {
final Candidate candidate = new Candidate();
// 启动1万个线程
Thread[] t = new Thread[10000];
for (int i = 0; i < 10000; i++) {
t[i] = new Thread(new Runnable() {
@Override
public void run() {
scoreUpdater.incrementAndGet(candidate);
}
});
t[i].start();
}
for (int i = 0; i < 10000; i++) {
// 调用join方法保证线程都能执行完成
t[i].join();
}
System.out.println("AtomicIntegerFieldUpdater Score=" + candidate.score);
}
}
执行结果:
AtomicIntegerFieldUpdater Score=10000
对于AtomicIntegerFieldUpdater 的使用稍微有一些限制和约束,约束如下:
(1)字段必须加volatile,在线程之间共享变量时保证立即可见。
(2)字段的描述类型(修饰符public/protected/default/private)与调用者与操作对象字段的关系一致。也就是说调用者能够直接操作对象字段,那么就可以反射进行原子操作。但是对于父类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。
(3)只能是实例变量,不能是类变量,也就是说不能加static关键字。
(4)只能是可修改变量,不能使final变量,因为final的语义就是不可修改。实际上final的语义和volatile是有冲突的,这两个关键字不能同时存在。
(5)对于AtomicIntegerFieldUpdater和AtomicLongFieldUpdater只能修改int/long类型的字段,不能修改其包装类型(Integer/Long)。如果要修改包装类型就需要使用AtomicReferenceFieldUpdater。
LongAdder/DoubleAdder详解
AtomicLong是利用了底层的CAS操作来提供并发性的,比如addAndGet方法:
上述方法调用了Unsafe类的getAndAddLong方法,该方法内部是个native方法,它的逻辑是采用自旋的方式不断更新目标值,直到更新成功。
在并发量较低的环境下,线程冲突的概率比较小,自旋的次数不会很多。但是,高并发环境下,N个线程同时竞争进行自旋操作,会出现大量失败并不断自旋的情况,此时AtomicLong的自旋会成为瓶颈。
这就是LongAdder引入的初衷——解决高并发环境下AtomicInteger,AtomicLong的自旋瓶颈问题。
代码示例:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
public class LongAdderTest {
public static void main(String[] args) {
testAtomicLongVSLongAdder(10, 10000);
System.out.println("==================");
testAtomicLongVSLongAdder(10, 200000);
System.out.println("==================");
testAtomicLongVSLongAdder(100, 200000);
}
static void testAtomicLongVSLongAdder(final int threadCount, final int times) {
try {
long start = System.currentTimeMillis();
testLongAdder(threadCount, times);
long end = System.currentTimeMillis() - start;
System.out.println("条件>>>>>>线程数:" + threadCount + ", 多线程操作计数" + times);
System.out.println("结果>>>>>>LongAdder方式增加计数" + (threadCount * times) + "次,共计耗时:" + end);
long start2 = System.currentTimeMillis();
testAtomicLong(threadCount, times);
long end2 = System.currentTimeMillis() - start2;
System.out.println("条件>>>>>>线程数:" + threadCount + ", 多线程操作计数" + times);
System.out.println("结果>>>>>>AtomicLong方式增加计数" + (threadCount * times) + "次,共计耗时:" + end2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static void testAtomicLong(final int threadCount, final int times) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
AtomicLong atomicLong = new AtomicLong();
for (int i = 0; i < threadCount; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < times; j++) {
atomicLong.incrementAndGet();
}
// 执行完一个线程就-1
countDownLatch.countDown();
}
}, "my-thread" + i).start();
}
// 阻塞等待所有线程执行完
countDownLatch.await();
}
static void testLongAdder(final int threadCount, final int times) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
LongAdder longAdder = new LongAdder();
for (int i = 0; i < threadCount; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < times; j++) {
// 每次都+1
longAdder.add(1);
}
countDownLatch.countDown();
}
}, "my-thread" + i).start();
}
countDownLatch.await();
}
}
执行结果:
条件>>>>>>线程数:10, 多线程操作计数10000
结果>>>>>>LongAdder方式增加计数100000次,共计耗时:22
条件>>>>>>线程数:10, 多线程操作计数10000
结果>>>>>>AtomicLong方式增加计数100000次,共计耗时:6
==================
条件>>>>>>线程数:10, 多线程操作计数200000
结果>>>>>>LongAdder方式增加计数2000000次,共计耗时:25
条件>>>>>>线程数:10, 多线程操作计数200000
结果>>>>>>AtomicLong方式增加计数2000000次,共计耗时:46
==================
条件>>>>>>线程数:100, 多线程操作计数200000
结果>>>>>>LongAdder方式增加计数20000000次,共计耗时:31
条件>>>>>>线程数:100, 多线程操作计数200000
结果>>>>>>AtomicLong方式增加计数20000000次,共计耗时:368
低并发、一般的业务场景下AtomicLong是足够了。如果并发量很多,存在大量写多读少的情况,那LongAdder可能更合适。
LongAdder原理
设计思路
AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
LongAdder的内部结构
LongAdder内部有一个base变量,一个Cell[]数组:
base变量:非竞态条件下,直接累加到该变量上
Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]中
/** Number of CPUS, to place bound on table size */
// CPU核数,用来决定槽数组的大小
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* Table of cells. When non-null, size is a power of 2.
*/
// 数组槽,大小为2的次幂
transient volatile Cell[] cells;
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
/**
* 基数,在两种情况下会使用:
* 1. 没有遇到并发竞争时,直接使用base累加数值
* 2. 初始化cells数组时,必须要保证cells数组只能被初始化一次(即只有一个线程能对cells初始化),
* 其他竞争失败的线程会将数值累加到base上
*/
transient volatile long base;
/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
*/
transient volatile int cellsBusy;
定义了一个内部Cell类,这就是我们之前所说的槽,每个Cell对象存有一个value值,可以通过Unsafe来CAS操作它的值:
LongAdder#add方法
LongAdder#add方法的逻辑如下图:
只有从未出现过并发冲突的时候,base基数才会使用到,一旦出现了并发冲突,之后所有的操作都只针对Cell[]数组中的单元Cell。
如果Cell[]数组未初始化,会调用父类的longAccumelate去初始化Cell[],如果Cell[]已经初始化但是冲突发生在Cell单元内,则也调用父类的longAccumelate,此时可能就需要对Cell[]扩容了。
这也是LongAdder设计的精妙之处:尽量减少热点冲突,不到最后万不得已,尽量将CAS操作延迟。
Striped64#longAccumulate方法
整个Striped64#longAccumulate的流程图如下: