基本类型原子类
AtomicInteger
、AtomicBoolean
、AtomicLong
。
常用API:
public final int get();// 获取当前的值
public final int getAndSet(int newValue);// 获取当前值,并设置新值
public final int getAndIncrement();// 获取当前的值,并自增
public final int getAndDecremennt();// 获取当前的值,并自减
public final int getAndAdd(int delta);// 获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update);// 如果输入的数值等于预期值(expect),则以原子方式将该值设置为输入值(update)
一个例子:
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicDemo {
public static void main(String[] args) {
MyNumber myNumber = new MyNumber();
for (int i = 0; i < 50; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
myNumber.add();
}
}).start();
}
System.out.println("最终结果:" + myNumber.atomicInteger.get());
}
}
class MyNumber {
AtomicInteger atomicInteger = new AtomicInteger();
public void add() {
atomicInteger.getAndIncrement();
}
}
多次执行,会发现每次的结果都不一样,这是因为main线程执行的太快了,子线程还没有运行完成,main线程就获取了原子对象的值。尝试在main线程获取结果之前,添加一个线程等待,可以看到main线程就可以获取到正确的结果了。
那么,就有一个问题了,这里线程等待多久呢?由此就引出countDownLatch
。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicDemo {
public static void main(String[] args) throws InterruptedException {
int size = 50;
MyNumber myNumber = new MyNumber();
CountDownLatch countDownLatch = new CountDownLatch(size);
for (int i = 0; i < size; i++) {
new Thread(() -> {
try {
for (int j = 0; j < 1000; j++) {
myNumber.add();
}
} finally {
countDownLatch.countDown();
}
}).start();
}
countDownLatch.await();
System.out.println("最终结果:" + myNumber.atomicInteger.get());
}
}
class MyNumber {
AtomicInteger atomicInteger = new AtomicInteger();
public void add() {
atomicInteger.getAndIncrement();
}
}
数组类型原子类
了解了基本类型原子类,再理解数组类型原子类就很容易了:AtomicIntegerArray
、AtomicLongArray
、AtomicReferenceArray
。
public class AtomicDemo {
public static void main(String[] args) {
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]);
// AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5);
// AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[]{1,2,3,4,5});
for (int i = 0; i < atomicIntegerArray.length(); i++) {
System.out.println(atomicIntegerArray.get(i));
}
System.out.println();
int value = 0;
value = atomicIntegerArray.getAndSet(0, 12345);
System.out.println(value + "\t" + atomicIntegerArray.get(0));
value = atomicIntegerArray.getAndIncrement(0);
System.out.println(value + "\t" + atomicIntegerArray.get(0));
}
}
引用类型原子类
AtomicReference
、AtomicStampedReference
、AtomicMarkableReference
。
AtomicReference
在自旋锁SpinLock里介绍过。
AtomicStampedReference
携带版本号的引用类型原子类,可以解决ABA问题,解决修改过几次问题,修改过一次后,版本号加一。
AtomicMarkableReference
带有标记位的引用类型原子类,标记位的值有两个:true/false,如果修改过,标记位由false变为true,解决一次性问题。
import java.util.concurrent.atomic.AtomicMarkableReference;
public class AtomicDemo {
static AtomicMarkableReference atomicMarkableReference = new AtomicMarkableReference(100, false);
public static void main(String[] args) {
new Thread(() -> {
boolean marked = atomicMarkableReference.isMarked();
System.out.println(Thread.currentThread().getName() + "标识:" + marked);
try {
Thread.sleep(1000);// 为了线程2可以拿到和线程1同样的marked,都是false
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean b = atomicMarkableReference.compareAndSet(100, 1000, marked, !marked);
System.out.println(Thread.currentThread().getName() + "CAS结果:" + b);
System.out.println("结果:" + atomicMarkableReference.getReference());
}, "thread1").start();
new Thread(() -> {
boolean marked = atomicMarkableReference.isMarked();
System.out.println(Thread.currentThread().getName() + "标识:" + marked);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean b = atomicMarkableReference.compareAndSet(100, 2000, marked, !marked);
System.out.println(Thread.currentThread().getName() + "CAS结果:" + b);
System.out.println("结果:" + atomicMarkableReference.getReference());
}, "thread2").start();
}
}
对象的属性修改原子类
AtomicIntegerFieldUpdater
、AtomicLongFieldUpdater
、AtomicReferenceFieldUpdater
。
使用目的:以一种线程安全的方式,操作非线程安全对象内的某个字段。
使用要求:
- 更新对象属性必须使用public volatile修饰
- 因为对象属性修改类型原子类是抽象类,所以每次使用都必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性
AtomicIntegerFieldUpdater的例子
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
public class AtomicDemo {
public static void main(String[] args) throws InterruptedException {
int size = 10;
Account account = new Account();
CountDownLatch countDownLatch = new CountDownLatch(size);
for (int i = 0; i < size; i++) {
new Thread(() -> {
try {
for (int j = 0; j < 1000; j++) {
// account.synchronizedAdd();// 使用synchronized方式保证程序正确性
account.atomicAdd(account);// 不使用synchronized,保证程序的正确性
}
} finally {
countDownLatch.countDown();
}
}).start();
}
countDownLatch.await();
System.out.println("money=" + account.money);
}
}
class Account {
String name = "王劭阳";
public volatile int money = 0;
/**
* 使用synchronized方式保证程序正确性
*/
public synchronized void synchronizedAdd() {
money++;
}
AtomicIntegerFieldUpdater<Account> accountAtomicIntegerFieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Account.class, "money");
/**
* 不使用synchronized,保证程序的正确性
*/
public void atomicAdd(Account account) {
accountAtomicIntegerFieldUpdater.getAndIncrement(account);
}
}
AtomicReferenceFieldUpdater的例子
AtomicIntegerFieldUpdater也有局限性,它只能处理Integer类型的字段,对于其他类型的字段,可以使用AtomicReferenceFieldUpdater。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
public class AtomicDemo {
public static void main(String[] args) throws InterruptedException {
int size = 10;
CountDownLatch countDownLatch = new CountDownLatch(size);
Account account = new Account();
for (int i = 0; i < size; i++) {
new Thread(() -> {
try {
account.init(account);
} finally {
countDownLatch.countDown();
}
}).start();
}
countDownLatch.await();
}
}
class Account {
public volatile Boolean init = Boolean.FALSE;
AtomicReferenceFieldUpdater<Account, Boolean> accountBooleanAtomicReferenceFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(Account.class, Boolean.class, "init");
public void init(Account account) {
if (accountBooleanAtomicReferenceFieldUpdater.compareAndSet(account, Boolean.FALSE, Boolean.TRUE)) {
System.out.println(Thread.currentThread().getName() + " start init");
System.out.println(Thread.currentThread().getName() + " end init");
} else {
System.out.println(Thread.currentThread().getName() + " init fail,because there has been inited");
}
}
}
原子操作增强类原理深度解析
DoubleAccumulator
、DoubleAdder
、LongAccumulator
、LongAdder
:这几个类是从Java8出现的。
在阿里巴巴Java开发手册上,推荐使用LongAdder,因为它比AtomicLong性能更好(减少乐观锁的重试次数)。
这里以LongAccumulator
和LongAdder
为例进行比较。
区别:LongAdder
只能用来计算加法,且从零开始计算,LongAccumulator
提供了自定义函数操作。
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
public class AtomicDemo {
public static void main(String[] args) {
LongAdder longAdder = new LongAdder();
longAdder.increment();
longAdder.increment();
System.out.println(longAdder.sum());
LongAccumulator longAccumulator = new LongAccumulator(Long::sum, 0);
longAccumulator.accumulate(1);
longAccumulator.accumulate(3);
System.out.println(longAccumulator.get());
}
}
设计计数器,比较synchronized、AtomicLong、LongAdder、LongAccumulator
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
public class AtomicDemo {
public static void main(String[] args) throws InterruptedException {
long startTime, endTime;
int size = 100;
ClickNumber clickNumber = new ClickNumber();
CountDownLatch countDownLatch1 = new CountDownLatch(size);
CountDownLatch countDownLatch2 = new CountDownLatch(size);
CountDownLatch countDownLatch3 = new CountDownLatch(size);
CountDownLatch countDownLatch4 = new CountDownLatch(size);
startTime = System.currentTimeMillis();
for (int i = 0; i < size; i++) {
new Thread(() -> {
try {
for (int j = 0; j < 100 * 10000; j++) {
clickNumber.bySynchronized();
}
} finally {
countDownLatch1.countDown();
}
}).start();
}
countDownLatch1.await();
endTime = System.currentTimeMillis();
System.out.println("bySynchronized:" + (endTime - startTime) + "\tsum=" + clickNumber.number);
startTime = System.currentTimeMillis();
for (int i = 0; i < size; i++) {
new Thread(() -> {
try {
for (int j = 0; j < 100 * 10000; j++) {
clickNumber.byAtomicLong();
}
} finally {
countDownLatch2.countDown();
}
}).start();
}
countDownLatch2.await();
endTime = System.currentTimeMillis();
System.out.println("byAtomicLong:" + (endTime - startTime) + "\tsum=" + clickNumber.atomicLong.longValue());
startTime = System.currentTimeMillis();
for (int i = 0; i < size; i++) {
new Thread(() -> {
try {
for (int j = 0; j < 100 * 10000; j++) {
clickNumber.byLongAdder();
}
} finally {
countDownLatch3.countDown();
}
}).start();
}
countDownLatch3.await();
endTime = System.currentTimeMillis();
System.out.println("byLongAdder:" + (endTime - startTime) + "\tsum=" + clickNumber.longAdder.sum());
startTime = System.currentTimeMillis();
for (int i = 0; i < size; i++) {
new Thread(() -> {
try {
for (int j = 0; j < 100 * 10000; j++) {
clickNumber.byLongAccumulator();
}
} finally {
countDownLatch4.countDown();
}
}).start();
}
countDownLatch4.await();
endTime = System.currentTimeMillis();
System.out.println("byLongAccumulator:" + (endTime - startTime) + "\tsum=" + clickNumber.longAccumulator.longValue());
}
}
class ClickNumber {
long number;
public synchronized void bySynchronized() {
number++;
}
AtomicLong atomicLong = new AtomicLong();
public void byAtomicLong() {
atomicLong.getAndDecrement();
}
LongAdder longAdder = new LongAdder();
public void byLongAdder() {
longAdder.increment();
}
LongAccumulator longAccumulator = new LongAccumulator(Long::sum, 0);
public void byLongAccumulator() {
longAccumulator.accumulate(1);
}
}
通过运行结果,可以看出来:LongAdder
和LongAccumulator
的效率比AtomicLong
要快。
源码、原理分析
架构
原理
当多个线程更新用于收集统计信息但不用于细粒度同步控制的目的的公共和时,LongAdder
通常优于AtomicLong
。在高争用的情况下,LongAdder
的预期吞吐量明显更高,但代价是空间消耗更高。
为了分析其原理,需要先了解Striped64.java
和Cell.java
类。
Cell.java
是java.util.concurrent.atomic下Striped64.java
的一个内部类。
/** Number of CPUS, to place bound on table size:CPU数量,即cells数组的最大长度 */
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* Table of cells. When non-null, size is a power of 2.
* cells数组,为2的幂,方便位运算
*/
transient volatile Cell[] cells;
/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
* 基础value值,当并发较低时,只累加该值,主要用于没有竞争的情况,通过CAS更新
*/
transient volatile long base;
/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
* 创建或扩容Cells数组时,使用自旋锁变量调整单元格大小(扩容),创建单元格时使用的锁
*/
transient volatile int cellsBusy;
Striped64.java
中一些变量或方法定义:
- base:类似于AtomicLong中全局value值。在没有竞争的情况下,数据直接累加到base上,或者cells扩容时,也需要将数据写入到base上
- collide:表示扩容意向,false:一定不会扩容,true:可能会扩容
- cellsBusy:初始化cells或者扩容cells需要获取锁,0:表示无锁状态,1:表示其他线程已经持有锁
- casCellsBusy():通过CAS操作修改cellsBusy的值,CAS成功代表获取锁,返回true
- NCPU:当前计算机CPU数量,Cell数组扩容时会使用到
- getProbe():获取当前线程hash值
- advanceProbe():重置当前线程hash值
LongAdder
的基本思路就是分散热点,将value的值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
sum()方法会将Cell数组中的value和base累加并作为返回值,核心思想就是将之前的AtomicLong的一个value的更新压力分散到多个value中去,从而降级更新热点。
低并发的时候,就是对base变量进行CAS操作。高并发的时候,就是对Cell数组进行CAS操作,最后调用sum()。
v
a
l
u
e
=
b
a
s
e
+
∑
i
=
0
n
C
e
l
l
[
i
]
value=base+\sum_{i=0}^{n}Cell[i]
value=base+i=0∑nCell[i]
LongAdder在无竞争的情况下,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时,则采用化整为零分散热点的做法,用空间换时间,用一个数组Cell,将一个value拆分进这个数组Cell。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash映射到这个数组Cell的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组Cell的所有值和base加起来作为最终结果。
源码解读深度分析
LongAdder的increment()
方法:
increment()
方法首先调用add(1L)
方法,add()
方法里,真正起作用的方法是longAccumulate()
方法。
public void add(long x) {
// as:Cells对象的引用
// b:获取的base值
// v:期望值
// m:cells数组的长度
// a:当前线程命中的cell单元格
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
观察if判断条件,第一个条件(as = cells) != null
的结果,第一次进来是false,我们继续看,会对base做一个cas操作,当线程少的时候,cas是足够的,因此casBase()
的结果就是true,再取非,就是false了。这也是低并发时候,只需要操作base即可。
当线程增多,casBase()
的返回值可能和想象中的不大一样了,因此,cas失败,!casBase()
的值是false,所以会进第一层if。再继续往里看,第一个判断as == null
即可判断出结果为true,再次进入if,接下来执行longAccumulate()
方法。
当第一次来到longAccumulate()
方法的时候,会执行Initialize table
这块代码,初始化cells。
再次回到add
方法,此时(as = cells) != null
的结果为true,继续判断里面的if条件。as == null
为false,继续判断(m = as.length - 1) < 0
的值为false,继续判断(a = as[getProbe() & m]) == null
,其中getProbe() & m
可以类比HashMap的put()方法,在找数组下标时候的思想,通过位运算快速定位数组下标,判断as[i]
这个地方有没有填充值,如果没有填充,那么运算结果为true,进入下面的longAccumulate()
方法,如果运算结果为false,继续判断!(uncontended = a.cas(v = a.value, v + x))
的值,也就是对非空的a值进行cas操作,操作成功后,uncontended是一个非零值,那么!uncontended
就是一个零值,代表false,下面的longAccumulate()
方法不会执行。继续观察!(uncontended = a.cas(v = a.value, v + x))
,当cas失败的时候,!uncontended
就为true,此时,也会进入longAccumulate()
方法,这时候代表线程竞争更加激烈,现有的base+cell已经计算不过来了,需要对cell数组进行扩容了,扩容方法也在longAccumulate()
里实现。
接下来查看longAccumulate()
方法。
这个方法有3个参数:
- long x:需要增加的值,一般是1
- LongBinaryOperator fn:默认为null
- boolean wasUncontentded:竞争标识,false表示存在竞争,只有cells初始化后,并且当前线程CAS修改失败,
wasUncontentded
的值才为false
查看getProbe()
方法,大致可以理解成当前线程的hash值,用于确定进入哪个cells里。
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;// 存储线程probe值
if ((h = getProbe()) == 0) {// 说明probe未初始化
// 使用ThreadLocalRandom为当前线程重新计算一个hash值,强制初始化
ThreadLocalRandom.current(); // force initialization
h = getProbe();// 重新获取probe值,probe值被重置好比一个全新的线程一样
wasUncontended = true;// 标记wasUncontended为true
}
boolean collide = false; // True if last slot nonempty
for (;;) {// 自旋,看代码的时候,先看case2,case3,再看case1
Cell[] as; Cell a; int n; long v;
// case1:cells已经被初始化了,可能存在扩容
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {// 当前线程hash后映射的cells[i]为null,说明这个cells[i]可以使用
// 数组没有在扩容
if (cellsBusy == 0) { // Try to attach new Cell
// 新建一个Cell,填充value
Cell r = new Cell(x); // Optimistically create
// 尝试加锁,成功后cellsBusy=1
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
// 双端加锁,将刚才带值的cell放到cells数组中
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
// wasUncontended表示初始化cells后,当前线程竞争失败,wasUncontended=false,重新设置wasUncontended为true,接着执行advanceProbe(h)重置当前hash值,重新循环
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 说明当前线程对应数组有数据,也重置过hash值,通过CAS操作尝试对当前cells[i]累加x,如果cas成功了,直接跳出循环
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 如果n≥CPU最大数量,就不能扩容,并通过下面的advanceProbe(h)再次计算hash值
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// 如果collide为false,则修改它为true(可以扩容),重新计算hash值,如果当前数组已经≥CPU最大数量,还会把collide置为false(不能扩容)
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 当前数组和最先赋值的数组是同一个,代表没有被其他线程扩容过,当前线程对数组进行扩容
if (cells == as) { // Expand table unless stale
// 容量按位左移1位进行扩容
Cell[] rs = new Cell[n << 1];
// 扩容后的拷贝工作
for (int i = 0; i < n; ++i)
rs[i] = as[i];
// 将扩容后的数组指向cells
cells = rs;
}
} finally {
// 释放锁
cellsBusy = 0;
}
// 设置扩容状态=不能扩容,继续循环
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
// case2:cells没有加锁且没有初始化,尝试对它加锁并初始化cells数组
// cellsBusy=0表示无锁,并通过casCellsBusy()获取锁,也就是修改cellsBusy的值
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
// double-check的目的:避免再次new一个cells数组,避免上一个线程中的数据被篡改
if (cells == as) {
// 新建一个容量为2的数组
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);// 填充数据
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// case3:cells正在初始化,尝试直接在基数base上进行累加操作
// 兜底方法,上面的所有cas操作都失败了,那么操作数据就会更新到base上
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
接下来是sum()
方法求和,会将Cell数组中value和base累加作为返回值,核心思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点。
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
sum()
方法在执行的时候,并没有限制base和cells的更新,所以LongAdder不是强一致性的,它是最终一致性的。最终返回的sum是局部变量,初始化的时候sum=base,在累加cells[i]的时候,base可能被更新了,但是sum并不会重新读取base的值,所以会出现数据不准确的情况。
使用总结
AtomicLong:线程安全,可允许一些性能损耗,要求高精度时使用,保证精度,性能代价,AtomicLong是多个线程对单个热点值value进行原子操作。
LongAdder:需要在高并发下有较好的性能表现,对值精确度要求不高时候,可以使用,保证性能,精度代价,LongAdder是每个线程拥有自己的槽位,各个线程一般只对自己槽中的那个值进行CAS操作。
小总结
AtomicLong:
原理:CAS+自旋:incrementAndGet()
场景:低并发下全局计算,AtomicLong能保证并发下计数的准确性,内部通过CAS来解决并发安全性问题
缺陷:高并发后性能急剧下降,AtomicLong的自旋会成为瓶颈,N个线程进行CAS操作,每次只有一个线程成功,其他N-1个线程失败,失败后就不停的自旋直到成功,这样大量失败自旋的情况,CPU占用率就高了
LongAdder:
原理:CAS+Base+Cell数组,空间换时间分散热点数据
场景:高并发下全局计算
缺陷:如果在sum求和过程中,还有计算线程修改结果的话,会造成结果不准确