共享模式之无锁
文章目录
- 共享模式之无锁
- 一、提出问题
- 二、CAS和volatile
- 三、原子整数
- 四、原子引用
- 五、原子数组
- 六、原子更新器
- 七、原子累加器
- 八、unsafe
一、提出问题
关于对共享变量修改的多线程问题其实就是指令交错问题导致取值的时机相同,最后修改之后以最后一个修改的线程为标准赋值给最新的变量
二、CAS和volatile
CAS
定义其实就是一个操作系统的指令。它是一个原子方法,能够保证比较和赋值同时完成
CAS的锁机制
其实就是无锁,通过不断的旧值和新值的比较如果成功那么就赋值和交换。所谓的旧值其实就是刚传入进来的时候的共享变量(赋值给局部变量定下来),然后在执行compareAndSet的时候对比局部变量和最新的共享变量。(其实就是这里共享变量可能会被其它线程先进行修改)如果对比不行那么就再次循环重试
public class TestAccount {
public static void main(String[] args) {
Account account = new AccountCas(10000);
Account.demo(account);
// Account account=new AccountUnsafe(10000);
// Account.demo(account);
}
}
class AccountCas implements Account {
private AtomicInteger balance;
// private Integer balance;
public AccountCas(int balance) {
// this.balance=balance;
this.balance = new AtomicInteger(balance);
}
@Override
public Integer getBalance() {
return balance.get();
// synchronized (this){
// return balance;
// }
}
@Override
public void withdraw(Integer amount) {
// synchronized (this){
// this.balance-=amount;
// }
while(true) {
// 获取余额的最新值
int prev = balance.get();
// 要修改的余额
int next = prev - amount;
// 真正修改
if(balance.compareAndSet(prev, next)) {
break;
}
}
// balance.getAndAdd(-1 * amount);
}
}
class AccountUnsafe implements Account {
private Integer balance;
public AccountUnsafe(Integer balance) {
this.balance = balance;
}
@Override
public Integer getBalance() {
synchronized (this) {
return this.balance;
}
}
@Override
public void withdraw(Integer amount) {
synchronized (this) {
this.balance -= amount;
}
}
}
interface Account {
// 获取余额
Integer getBalance();
// 取款
void withdraw(Integer amount);
/**
* 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
* 如果初始余额为 10000 那么正确的结果应当是 0
*/
static void demo(Account account) {
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(() -> {
account.withdraw(10);
}));
}
long start = System.nanoTime();
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(account.getBalance()
+ " cost: " + (end-start)/1000_000 + " ms");
}
}
CAS-volatile
CAS依赖volatile原因就是要通过volatile保证取到的值是最新的值。防止对比出现问题
为什么无锁的效率更高
无锁不需要线程上下文切换,但是synchronize需要上下文切换消耗资源
无锁的情况需要额外cpu运行,cpu就像是跑道,如果没有跑道,线程这样的赛车是无法继续运行。需要上下文切换,本质就是在循环等待对比的旧值和新值,一旦成功那么就立刻修改
CAS的特点
无锁并发,无阻塞
无阻塞就是CAS效率比synchronize更高的原因
线程数少的时候使用,防止对比频率太高导致慢
多核cpu,为了while循环继续运行
三、原子整数
AtomicInteger
api
public class MyTest30 {
public static void main(String[] args) {
AtomicInteger i=new AtomicInteger(0);
System.out.println(i.getAndAdd(1));//i++
System.out.println(i.addAndGet(1));//++i
System.out.println(i.get());
System.out.println(i.getAndAdd(5));
System.out.println(i.addAndGet(5));
}
}
模仿updateAndGet
本质其实就是compareAndSet,也就是乐观锁来保证并发安全,然后加上接口来实现乘法
public class MyTest30 {
public static void main(String[] args) {
//乘法和编程式函数
AtomicInteger i=new AtomicInteger(12);
System.out.println(i.updateAndGet(x -> x * 10));
//模仿updateAndGet
int i1 = updateAndGet(i, x -> x / 10);
System.out.println(i1);
}
public static int updateAndGet(AtomicInteger x, IntUnaryOperator operator){
int pre = x.get();//以前的值,用于对比新值
int next=operator.applyAsInt(pre);//接口实现,乘法得到结果
//把x设置为next
while(true){
if(x.compareAndSet(pre,next)){
break;
}
}
return x.get();
}
}
updateAndGet源码
public final int updateAndGet(IntUnaryOperator updateFunction) {
int prev, next;
do {
prev = get();
next = updateFunction.applyAsInt(prev);
} while (!compareAndSet(prev, next));
return next;
}
四、原子引用
AtomicReference
用于保护其它类型的对象。比如decimal,或者其它类等。
class DecimalAccountCas implements DecimalAccount {
private AtomicReference<BigDecimal> balance;
public DecimalAccountCas(BigDecimal balance) {
// this.balance = balance;
this.balance = new AtomicReference<>(balance);
}
@Override
public BigDecimal getBalance() {
return balance.get();
}
@Override
public void withdraw(BigDecimal amount) {
//
while(true){
BigDecimal pre=balance.get();
BigDecimal next=pre.subtract(amount);
if(balance.compareAndSet(pre,next)){
break;
}
}
}
}
ABA问题
线程2先启动,到对比变量的时候切换线程,如果线程1修改变量A->B之后又把它修改为B->A,轮到线程2修改的时候是无法发现这个变量被改变了
@Slf4j(topic = "c.Test36")
public class Test36 {
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
public static void main(String[] args) throws InterruptedException {
log.debug("main start...");
// 获取值 A
String prev = ref.getReference();
// 获取版本号
int stamp = ref.getStamp();
log.debug("版本 {}", stamp);
// 如果中间有其它线程干扰,发生了 ABA 现象
other();
sleep(1);
// 尝试改为 C
log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
}
private static void other() {
new Thread(() -> {
log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B", ref.getStamp(), ref.getStamp() + 1));
log.debug("更新版本为 {}", ref.getStamp());
}, "t1").start();
sleep(0.5);
new Thread(() -> {
log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A", ref.getStamp(), ref.getStamp() + 1));
log.debug("更新版本为 {}", ref.getStamp());
}, "t2").start();
}
}
解决方案
AtomicStampedReference
可以通过AtomicStampedReference来进行处理,实际上就是加上了判断版本号,也就是每次修改不仅仅要对比旧值和新值,还需要对比修改的版本号。每次引用被修改版本号就会被改变。
@Slf4j(topic = "c.test36")
public class MyTest36 {
static AtomicStampedReference<String> ref=new AtomicStampedReference<>("A",0);
public static void main(String[] args) {
String prev = ref.getReference();
int stamp = ref.getStamp();
log.debug("版本号{}",stamp);
other();
Sleeper.sleep(1);
log.debug("A->B{}",ref.compareAndSet(prev,"B",stamp,stamp+1));
}
public static void other(){
new Thread(()->{
log.debug("stamp:{}",ref.getStamp());
log.debug("A->B {}",ref.compareAndSet(ref.getReference(),"B",ref.getStamp(),ref.getStamp()+1));
},"t1").start();
Sleeper.sleep(0.5);
new Thread(()->{
log.debug("stamp:{}",ref.getStamp());
log.debug("B->A {}",ref.compareAndSet(ref.getReference(),"A",ref.getStamp(),ref.getStamp()+1));
},"t2").start();
}
}
AtomicMarkableReference
这个相当于就是把版本号修改成了boolean,如果发生了修改那么boolean也会发生修改,因为你只需要知道到底有没有修改。这里的mark标记垃圾袋满了就是true,如果发生修改为空那么就是true。两个线程,如果保洁阿姨已经把垃圾袋改为空,那么主线程就不需要把垃圾袋的状态进行修改。主要就是标记垃圾袋的状态。而且修改内容的时候也能够感知到。
Slf4j(topic = "c.Test38")
public class Test38 {
public static void main(String[] args) throws InterruptedException {
GarbageBag bag = new GarbageBag("装满了垃圾");
// 参数2 mark 可以看作一个标记,表示垃圾袋满了
AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);
log.debug("start...");
GarbageBag prev = ref.getReference();
log.debug(prev.toString());
new Thread(() -> {
log.debug("start...");
bag.setDesc("空垃圾袋");
ref.compareAndSet(bag, bag, true, false);
log.debug(bag.toString());
},"保洁阿姨").start();
sleep(1);
log.debug("想换一只新垃圾袋?");
boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
log.debug("换了么?" + success);
log.debug(ref.getReference().toString());
}
}
class GarbageBag {
String desc;
public GarbageBag(String desc) {
this.desc = desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return super.toString() + " " + desc;
}
}
五、原子数组
AtomicIntegerArray
保证线程的安全性,能够保证每次自增的时候都是只有一个线程在处理。相当于就是给数组的每个位置都加上cas操作,每次操作的时候都需要进行CAS。
public class Test39 {
public static void main(String[] args) {
demo(
()->new int[10],
(array)->array.length,
(array,index)->array[index]++,
array-> System.out.println(Arrays.toString(array))
);
demo(
()->new AtomicIntegerArray(10),
(array)->array.length(),
(array,index)->array.getAndIncrement(index),
array-> System.out.println(array)
);
}
/**
参数1,提供数组、可以是线程不安全数组或线程安全数组
参数2,获取数组长度的方法
参数3,自增方法,回传 array, index
参数4,打印数组的方法
*/
// supplier 提供者 无中生有 ()->结果
// function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果
// consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)->
private static <T> void demo(
Supplier<T> arraySupplier,
Function<T, Integer> lengthFun,
BiConsumer<T, Integer> putConsumer,
Consumer<T> printConsumer ) {
List<Thread> ts = new ArrayList<>();
T array = arraySupplier.get();
int length = lengthFun.apply(array);
for (int i = 0; i < length; i++) {
// 每个线程对数组作 10000 次操作
ts.add(new Thread(() -> {
for (int j = 0; j < 10000; j++) {
putConsumer.accept(array, j%length);
}
}));
}
ts.forEach(t -> t.start()); // 启动所有线程
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}); // 等所有线程结束
printConsumer.accept(array);
}
}
六、原子更新器
AtomicReferenceUpdater
主要就是处理对象里面的变量的原子性,本质还是CAS进行的处理
@Slf4j(topic = "c.Test40")
public class Test40 {
public static void main(String[] args) {
Student student = new Student();
AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");
System.out.println(updater.compareAndSet(student, null, "张三"));
System.out.println(student);
}
}
class Student {
volatile String name;
@Override
public String toString() {
return "Student{" +
"name='" + name + '\'' +
'}';
}
}
七、原子累加器
LongAdder
原子累加器处理速度更快的原因就是使用了多个cell,相当于就是i要进行多线程的控制自增,然后分开两部分来相加CAS,然后最后汇总起来。
public class Test41 {
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
demo(
() -> new AtomicLong(0),
(adder) -> adder.getAndIncrement()
);
}
System.out.println();
for (int i = 0; i < 5; i++) {
demo(
() -> new LongAdder(),
adder -> adder.increment()
);
}
}
/*
() -> 结果 提供累加器对象
(参数) -> 执行累加操作
*/
private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
T adder = adderSupplier.get();
List<Thread> ts = new ArrayList<>();
// 4 个线程,每人累加 50 万
for (int i = 0; i < 4; i++) {
ts.add(new Thread(() -> {
for (int j = 0; j < 500000; j++) {
action.accept(adder);
}
}));
}
long start = System.nanoTime();
ts.forEach(t -> t.start());
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(adder + " cost:" + (end - start) / 1000_000);
}
}
CAS实现锁的原理
实际上就是lock的时候改变state变量为1,那么其它线程进来的时候发现state不是0那么就进入到while。直到state被解锁为0,那么其它线程就能够再次进入。模仿加锁和解锁,只不过阻塞是改变成while处理。
缓存伪共享
其实就是CPU的缓存都是以缓存行进行的存储,cpu1和cpu2读取了内存块1和2进入自己的缓存行,导致的问题就是一方的修改导致对方的缓存失效,那么就要去修改内存再通知其它缓存块。这里就会造成缓存失效的问题。解决办法就是通过Contended注解,把内存块分成两行相当于就是增加padding空块,然后让Cell数据存到内存块的下一行,让cpu读取的时候存入不同的缓存行,那么就不会出现在修改的时候还需要去修改另一个cpu的缓存
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
LongAdder源码
add部分
其实就是在base的CAS累加失败的时候(其实就是因为有线程在竞争),那么会创建cells通过longAccumulate,然后就是重新进行判断。如果cells不为空,那么就要看看当前线程的cell是否创建,如果没有创建那么就longAccumulate创建,如果创建那么就通过cell来完成累加的机制。
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {//判断是否创建cells和判断是否能够通过base无竞争直接完成累加
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||//判断cells是不是空的
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))//如果不是空的那么就执行cell的CAS操作
longAccumulate(x, null, uncontended);//如果是空的那么创建sell
}
}
longAccumulate
这个地方有三个判断,第一个判断是当这个cells不为空的时候。第二个判断是casBasy是0的时候,而且cells没有被修改,把casBusy改为1,相当于就是上锁。最后一个判断就是执行给base进行cas的累加操作,如果失败那么就返回循环。
这里主要讲第二个判断之后的逻辑,cells不存在,cell也不存在
创建Cells数组,并且创建累加x的cell
然后给cells赋值为rs也就是刚才创建的Cells数组
并且给casBusy进行赋值为0相当于就是解锁,可以让其它线程进来。
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {//如果未加锁、而且cells没有被其他线程创建或者是修改、那么就给casBusy赋值1加锁
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];//创建cells数组
rs[h & 1] = new Cell(x);//给x创建空间
cells = rs;//赋值
init = true;//结束循环
}
} finally {
cellsBusy = 0;//解锁
}
if (init)
break;
}
第一段的逻辑,主要是cells在,但是cell没有存在也就是没有了创建累加值的cell
如果发现没有创建槽位cell,那么就创建并且赋值累加位x给它
判断是否加锁,没有上锁,那么就自己加上锁并且进入修改。但是问题是这个地方可能会在进入在之前槽位被其它线程修改,因为第一个判断if((a=as[(n-1)&h])==null)的时候可能同时进来多个线程,那么在锁上之后仍然需要判断槽位是不是被占坑了。如果没有那么就创建,并且赋值create为true。否则就重新进入循环
if ((as = cells) != null && (n = as.length) > 0) {//如果cells不为空
if ((a = as[(n - 1) & h]) == null) {//判断槽位是不是空
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // 创建新的累加值x的槽位
if (cellsBusy == 0 && casCellsBusy()) {//判断是否没有加锁,加锁后进入
boolean created = false;//判断是否创建成功
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {//判断槽位是否被修改
rs[j] = r;//赋值
created = true;//创建成功
}
} finally {
cellsBusy = 0;//解锁
}
if (created)//重新进入循环
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // 为了跳过扩容操作
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { //如果cells没有修改进行扩容操作
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
第三个阶段的逻辑,还是上面的代码,主要是处理else if(a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))失败的情况,这里实际上就是对cell做了一次cas的自增,但是由于线程竞争导致失败
然后就是累加失败之后判断是不是超过CPU上线,NCPU,如果是那么就把collide设置为false,然后就可以跳过下面的else if(cellsBusy==0&&casCellsBusy())扩容操作,接着就是h=advanceProbe(h);这个就是为了换一个cell进行累加,因为不论是哪个cell都可以最后汇总到一起
如果累加成功那么就直接结束了
如果不是cpu的问题,那么就去扩容,扩容之后还失败那么就换一个cell进行累加。如果太多线程的情况下,可能多个线程围绕着cell来进行处理。
for (;😉 {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {//如果cells不为空
if ((a = as[(n - 1) & h]) == null) {//判断槽位是不是空
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // 创建新的累加值x的槽位
if (cellsBusy == 0 && casCellsBusy()) {//判断是否没有加锁,加锁后进入
boolean created = false;//判断是否创建成功
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {//判断槽位是否被修改
rs[j] = r;//赋值
created = true;//创建成功
}
} finally {
cellsBusy = 0;//解锁
}
if (created)//重新进入循环
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {//如果未加锁、而且cells没有被其他线程创建或者是修改、那么就给casBusy赋值1加锁
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];//创建cells数组
rs[h & 1] = new Cell(x);//给x创建空间
cells = rs;//赋值
init = true;//结束循环
}
} finally {
cellsBusy = 0;//解锁
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
最后的操作就是累加的操作,把之前累加的x全部加到目标base上面
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
八、unsafe
作用
主要是用来处理底层的os和多线程的操作
使用案例
它只能通过反射来获取私有对象,并且需要对象的属性偏移值才能够使线程安全地修改变量
public class MyTestUnsafe {
public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
Field theUnsafe = Unsafe.class.getDeclaredField(“theUnsafe”);
theUnsafe.setAccessible(true);
Unsafe unsafe = (Unsafe) theUnsafe.get(null);
System.out.println(unsafe);
Teacher1 teacher1 = new Teacher1();
long id = unsafe.objectFieldOffset(Teacher1.class.getDeclaredField("id"));
long name = unsafe.objectFieldOffset(Teacher1.class.getDeclaredField("name"));
unsafe.compareAndSwapObject(teacher1,id,null,1);
unsafe.compareAndSwapObject(teacher1,name,null,"好人");
System.out.println(teacher1);
}
}
@Data
class Teacher1{
Integer id;
String name;
}
自己写一个通过unsafe处理的AtomicInteger,其实大部分操作就是unsafe+cas的while机制
public class MyUnsafeAccessor {
public static void main(String[] args) {
Account.demo(new MyAtomicInteger1(10000));
}
}
class MyAtomicInteger1 implements Account{
private volatile int value;
private static final long valueOffset;
private static final Unsafe unsafe;
static {
unsafe= UnsafeAccessor.getUnsafe();
try {
valueOffset=unsafe.objectFieldOffset(MyAtomicInteger1.class.getDeclaredField("value"));
} catch (NoSuchFieldException e) {
e.printStackTrace();
throw new RuntimeException();
}
}
public int getValue() {
return value;
}
public MyAtomicInteger1(int value) {
this.value = value;
}
public void decrease(int amount){
while (true){
int prev=this.value;
int next=prev-amount;
if( unsafe.compareAndSwapInt(this,valueOffset,prev,next)){
//修改成功
break;
}
}
}
@Override
public Integer getBalance() {
return getValue();
}
@Override
public void withdraw(Integer amount) {
decrease(amount);
}
}