1.原子累加器
示例代码:
public class TestAtomicAdder {
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
demo(
() -> new AtomicLong(0),
(adder) -> adder.getAndIncrement()
);
}
for (int i = 0; i < 5; i++) {
demo(
() -> new LongAdder(),
(adder) -> adder.increment()
);
}
}
/**
* @param adderSupplier 提供者 无中生有 ()->结果
* @param action 消费者 一个参数没结果
* @param <T>
*/
private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
T adder = adderSupplier.get();
long start = System.nanoTime();
List<Thread> ts = new ArrayList<>();
//4个线程,每个累加5万次,最终结果是200000
for (int i = 0; i < 4; i++) {
ts.add(new Thread(() -> {
for (int j = 0; j < 50000; j++) {
action.accept(adder);
}
}));
}
ts.forEach(t -> t.start());
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(adder + " cost(ns):" + (end - start) / 1000_000);
}
}
比较 AtomicLong 与 LongAdder:
LongAdder累加器性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0累加Cell[0],而Thread-1累加Cell[1]… 最后将结果汇总.这样它们在累加时操作不同的Cell变量,因此减少了CAS重试失败,从而提高性能;
1.1.LongAdder源码分析
1>.LongAdder是并发大师Doug Lea(大哥李)的作品,设计的非常精巧;
2>.LongAdder类有几个关键域:
// 累加单元数组,懒惰初始化
// transient关键字作用在是序列化的时候保证这些域不会被序列化
transient volatile Cell[] cells;
// 基础值,如果没有竞争,则用cas累加这个域
transient volatile long base;
// 在cells创建或扩容时,置为1,表示加锁
transient volatile int cellsBusy;
3>.CAS锁
// 不要用于实践!!!
public class LockCas {
private AtomicInteger state = new AtomicInteger(0);
public void lock() {
while (true) {
if (state.compareAndSet(0, 1)) {
break;
}
}
}
public void unlock() {
log.debug("unlock...");
state.set(0);
}
}
//测试代码
LockCas lock = new LockCas();
new Thread(() -> {
log.debug("begin...");
lock.lock();
try {
log.debug("lock...");
sleep(1);
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
log.debug("begin...");
lock.lock();
try {
log.debug("lock...");
} finally {
lock.unlock();
}
}).start();
4>.缓存行(hang)伪共享(一个缓存行加入了多个内存cell对象被称为伪共享)
其中Cell即为累加单元
// 防止缓存行伪共享,防止一个缓存行容纳多个内存cell对象
@sun.misc.Contended
static final class Cell {
volatile long value;
Cell(long x) { value = x; }
// 最重要的方法, 用cas方式进行累加, prev表示旧值, next表示新值
final boolean cas(long prev, long next) {
return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
}
// 省略不重要代码
}
1.1.1.LongAdder#add()源码
public void add(long x) {
// as为累加单元数组
// b为基础值
// x为累加值
Cell[] as; long b, v; int m; Cell a;
// 进入 if 的两个条件
// 1.as有值,表示已经发生过竞争, 进入if
// 2.cas给base累加时失败了,表示base发生了竞争,进入if
if ((as = cells) != null || !casBase(b = base, b + x)) {
// uncontended表示cell没有竞争
boolean uncontended = true;
// as还没有创建
// 当前线程对应的cell还没有
// cas给当前线程的cell累加失败uncontended=false(a为当前线程的cell)
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
// 进入cell数组创建、cell创建的流程
longAccumulate(x, null, uncontended);
}
}
1.1.2.Striped64#longAccumulate()源码
final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {
int h;
// 当前线程还没有对应的cell,需要随机生成一个h值用来将当前线程绑定到cell
if ((h = getProbe()) == 0) {
// 初始化probe
ThreadLocalRandom.current(); // force initialization
// h对应新的probe值,用来对应(/占用)cell
h = getProbe();
wasUncontended = true;
}
// collide为true表示需要扩容
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
// 已经有了cells
if ((as = cells) != null && (n = as.length) > 0) {
// 还没有cell
if ((a = as[(n - 1) & h]) == null) {
// 为cellsBusy加锁,创建cell,cell的初始累加值为x
// 成功则 break,否则继续continue循环
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
// 有竞争,改变线程对应的cell来重试cas
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// cas尝试累加,fn配合LongAccumulator不为null,配合LongAdder为null
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 如果cells长度已经超过了最大长度,或者已经扩容,改变线程对应的cell来重试cas
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// 确保collide为false进入此分支,就不会进入下面的else if进行扩容了
else if (!collide)
collide = true;
// 加锁
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
// 加锁成功,扩容
continue; // Retry with expanded table
}
// 改变线程对应的cell对象
h = advanceProbe(h);
}
// 还没有cells,尝试给cellsBusy加锁
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// 加锁成功,初始化cells,最开始长度为2,并填充一个cell
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
// 成功则break;
break;
}
// 上两种情况失败,尝试给base累加
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
每个线程刚进入longAccumulate()时,会尝试对应(/占用)一个cell对象(找到一个位置)
1.1.3.LongAdder#sum()源码
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
//将Cells数组中的每个元素进行累加
sum += a.value;
}
}
return sum;
}
扩展: 缓存与内存的速度比较
1>.CPU内存结构:
2>.CPU访问不同的缓存和内存耗费的时钟周期
①.CPU与内存的速度差异很大,需要靠预读数据至缓存来提升效率;
②.缓存以缓存行为单位,每个缓存行对应着一块内存,一般是64byte(8个long);
③.缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中;
④.CPU要保证数据的一致性,如果某个CPU核心更改了数据,其它CPU核心对应的整个缓存行(64byte)必须全部失效;
⑤.由于内存Cell是数组形式,在内存中是连续存储的,一个内存Cell为24字节(16字节的对象头和8字节的value),因此缓存行可以存下2个内存Cell对象.这样问题来了:
- Core-0要修改Cell[0]
- Core-1要修改Cell[1]
无论谁修改成功,都会导致对方Core的缓存行失效,比如Core-0中Cell[0]=6000,Cell[1]=8000要累加Cell[0]=6001,Cell[1]=8000,这时会让Core-1的缓存行失效;
⑥."@sun.misc.Contended"就是用来解决这个问题的,它的原理是在使用此注解的对象或字段的前后各增加128字节大小的padding(填充内存Cell),从而让CPU将(不同的)对象预读至缓存时占用不同的缓存行,这样不会造成对方缓存行的失效;
2.Unsafe对象
2.1.概述
1> Unsafe对象提供了非常底层的,操作内存、线程的方法,Unsafe对象不能直接调用,只能通过反射获得;
2>.示例代码:
public class TestUnsafeDemo1 {
public static void main(String[] args) throws Exception {
//通过反射获取到unsafe对象,这里的参数"theUnsafe"是固定的写法,对应着unsafe对象中的(类类型)成员变量
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
//由于unsafe对象中"theUnsafe"成员变量是"private"私有的,因此要设置访问权限
theUnsafe.setAccessible(true);
//通过成员变量获取到所属的unsafe对象
//由于unsafe对象中"theUnsafe"成员变量是静态的,因此这里原本的对象参数就是null
Unsafe unsafe = (Unsafe) theUnsafe.get(null);
System.out.println(unsafe); //sun.misc.Unsafe@7ea987ac
}
}
3>.封装成工具类:
public class UnsafeAccessor {
static Unsafe unsafe;
static {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new Error(e);
}
}
static Unsafe getUnsafe() {
return unsafe;
}
}
2.2.Unsafe CAS操作
1>.使用Unsafe对象通过CAS机制线程安全的修改对象的成员变量:
@Slf4j
public class TestUnsafeCasDemo1 {
public static void main(String[] args) throws Exception {
//获取unsafe对象
Unsafe unsafe = UnsafeAccessor.getUnsafe();
//获取对象中某个成员变量的偏移地址
long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id"));
long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name"));
//执行CAS操作
Teacher teacher = new Teacher();
boolean b = unsafe.compareAndSwapInt(teacher, idOffset, 0, 1);
boolean b1 = unsafe.compareAndSwapObject(teacher, nameOffset, null, "张三");
//打印结果
log.info(teacher.toString());
}
}
@Data
@NoArgsConstructor
class Teacher {
volatile int id;
volatile String name;
}
//unsafe对象封装
class UnsafeAccessor {
static Unsafe unsafe;
static {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new Error(e);
}
}
static Unsafe getUnsafe() {
return unsafe;
}
}
2.3.模拟实现原子整数
1>.以转账为例:
public class TestCustomAtomicIntegerDemo1 {
public static void main(String[] args) {
Account1.demo(new MyAtomicInteger(200));
}
}
//自定义的原子整数类
class MyAtomicInteger implements Account1 {
private volatile int value;
private static final long valueOffset;
private static final Unsafe UNSAFE;
static {
UNSAFE = UnsafeAccessorUtil.getUnsafe();
try {
valueOffset = UNSAFE.objectFieldOffset(MyAtomicInteger.class.getDeclaredField("value"));
} catch (NoSuchFieldException e) {
e.printStackTrace();
throw new RuntimeException();
}
}
public MyAtomicInteger(int value) {
this.value = value;
}
public int getValue() {
return this.value;
}
public void decrement(int amount) {
while (true) {
int prev = this.value;
int next = prev - amount;
if (UNSAFE.compareAndSwapInt(this, valueOffset, prev, next)) {
break;
}
}
}
@Override
public Integer getBalance() {
return this.getValue();
}
@Override
public void withDraw(int amount) {
this.decrement(amount);
}
}
//unsafe工具类
class UnsafeAccessorUtil {
static Unsafe unsafe;
static {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new Error(e);
}
}
static Unsafe getUnsafe() {
return unsafe;
}
}
//封装转账相关的方法
interface Account1 {
//获取余额
Integer getBalance();
//取款
void withDraw(int amount);
/**
* 方法内会启动20个线程,每个线程做(-10元)的操作
* 如果初始余额为200那么正确的结果应当是 0
*/
static void demo(Account1 account) {
List<Thread> ts = new ArrayList<>();
long start = System.nanoTime();
for (int i = 0; i < 20; i++) {
ts.add(new Thread(() -> {
account.withDraw(10);
}));
}
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(account.getBalance()
+ " cost: " + (end - start) / 1000_000 + " ms");
}
}