并发编程的故事——JUC

news2025/1/10 17:16:40

JUC

文章目录

  • JUC
  • 一、Semaphore
  • 二、CountDownLatch
  • 三、线程安全类


一、Semaphore

为什么需要用到Semaphore?
限流
Semaphore的场景?
秒杀商品的时候,不能够让那些没有秒杀成功的线程进入,只有占了坑位的才可以使用,这里可以用redis来记录这个Semaphore
Semaphore的原理?
AQS+state进行分析
定义
信号量,相当于就是停车位限制流量。
acquire:拿到位置
release:释放位置

@Slf4j(topic = "c.test")
public class MyTestSemaphore {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        for(int i=0;i<10;i++){

            new Thread(()->{
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("running");
                Sleeper.sleep(1);

                log.debug("end");
                semaphore.release();
            }).start();



        }
    }
}

原理
其实都是AQS的原理

acquire
acquire调用了sync(nonfairSync)的acquireSharedInterruptibly(1),尝试上锁tryAcquireShared(arg)
调用nonfairTryAcquireShared(int acquires),每次都是-1,直到信号量小于1的时候(相当于就是AQS的state==0的时候)直接返回这个remaining
如果得到返回值是<0,那么就把对应线程送进阻塞队列。基本和lock操作一样

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

    //尝试获取锁
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

//非公平获取锁
final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;//直接就是-1
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

在这里插入图片描述
release
release->调用releaseShare(包括了tryReleaseShared和doReleaseShared)
处理逻辑相似,但是tryReleaseShared是把state+1而不是-1,原因信号量的state的意思是有多少坑位可以使用,也就是只要有坑位,那么线程就能获取锁。

protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;//坑位+1《释放了一个
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

在这里插入图片描述
总结:acquire调用acquireSharedInterruptibly(可被中断)。然后就是模版tryAcquireShare和doAcquireSharedInterruptibly(和doAcquire差不多的逻辑,但是多了一个唤醒共享setHeadAndProgation)锁的方法(读锁)。

二、CountDownLatch

为什么需要用到CountDownLatch?
应用场景:一个线程需要等待多个线程结果的时候。或者需要等待其它线程运行完之后

定义
他就是一个倒计时锁,await之后需要等待countDown到0的时候才会解锁。

@Slf4j(topic = "c.test")
public class TestCountDownLock {
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(3);

        new Thread(()->{
            log.debug("begin1");
            Sleeper.sleep(1);
            log.debug("end2");
            countDownLatch.countDown();
        },"t1").start();

        new Thread(()->{
            log.debug("begin2");
            Sleeper.sleep(1);
            log.debug("end2");

            countDownLatch.countDown();
        },"t2").start();

        new Thread(()->{

            log.debug("begin3");
            Sleeper.sleep(2);
            log.debug("end3");
            countDownLatch.countDown();

        },"t3").start();

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        log.debug("结束");
    }


}

join同样可以完成功能,但是万一线程阻塞了,那么就会导致最后的join一直处于等待,需要进行特殊的处理。但是CountDownLatch能够进行倒计时,只要倒计时结束,那么就会结束主线程的阻塞
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(4);
CountDownLatch countDownLatch=new CountDownLatch(3);
service.submit(()->{
log.debug(“begin1”);
Sleeper.sleep(1);
log.debug(“end1”);
countDownLatch.countDown();
},“t1”);

    service.submit(()->{
        log.debug("begin2");
        Sleeper.sleep(1);
        log.debug("end2");
        countDownLatch.countDown();
    },"t2");
    service.submit(()->{
        log.debug("begin3");
        Sleeper.sleep(1);
        log.debug("end3");
        countDownLatch.countDown();
    },"t3");
    service.submit(()->{
        log.debug("await");
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("解锁");
    },"t4");
}

游戏案例
为什么加载的时候需要使用到countDownLock?
原因就是多方面,多文件需要加载,需要全部文件和配置初始化之后才能够进行开始,所以可以使用CountDown来记录最终需要等待的文件以及线程数。在谷粒商城的获取商品信息、快递信息的时候都会用到这种方式来提高访问的速度,并发执行,并且通过countDown来记录要执行完任务的个数才能够继续往下面执行。也可以使用join或者是future的getAll来进行阻塞。public static void test6(){
String[] a=new String[10];
Random random = new Random();
ExecutorService service = Executors.newFixedThreadPool(10);
CountDownLatch countDownLatch=new CountDownLatch(10);

    for(int j=0;j<10;j++){
        int k=j;
        service.submit(()->{
            for(int i=0;i<=100;i++){
                try {
                    Thread.sleep(random.nextInt(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                a[k]=i+"%";
                System.out.print("\r"+Arrays.toString(a));
            }
            countDownLatch.countDown();
        },"t"+j);
    }

    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    log.debug("游戏开始");

}

商品问题如果并发执行完且获取结果再继续执行?
还有需要future的get处理。没有结果通常使用countdownlatch

那么countDownLatch有什么问题?
问题就是他不能重置countdown的数量,也就是多次循环的话每次都要new一个,而不能重用对象。
解决方法就是CyclicBarrier,能够重用,而且可以执行最终的方法。
线程数有什么要求?
必须和循环任务数相同,不然就会多个任务被线程开启。假设3线程,3次循环,两个任务,那么就会取出第一次,第二次任务执行,还会取出循环的下一次任务执行。因为线程多。
@Slf4j(topic = “c.test”)
public class TestCycleBarrier1 {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(2,()->{
log.debug(“结束”);
});
ExecutorService service = Executors.newFixedThreadPool(2);
for(int i=0;i<3;i++){
service.submit(()->{
log.debug(“开始…”);
Sleeper.sleep(1);
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
},“t1”);

        service.submit(()->{
            log.debug("开始....");
            Sleeper.sleep(2);
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        },"t1");
    }
}

}

三、线程安全类

分类
有什么分类?
blocking:阻塞+锁
copyAndWrite:拷贝方式保证安全,但是代价太大
Concurrent:并发CAS来进行修改,fast-fail一旦修改立刻停止,fast-still修改仍然遍历,有弱一致性(数据是脏的)。
HashTable和Vector是以前的安全类,方法上加锁,还有一个修饰synchronizedMap这个是把map放进去之后加锁,调用的仍然是map的方法

ConcurrentMap就能保证原子性了吗?
很明显是不能的,原因是map内部安全的源码实际上就是拿出了map的一行出来锁住,但是问题就是仅仅只是一个方法的时候是可以保证线程安全,两个的话那么就是完全不行。比如两个线程computeIfAbsent同时computeIfAbsent,都是获取到同一个值,最后put以最后修改的为准。
那么怎么把getvalue修改value变成一个原子操作?
可以通过computeIfAbsent,相当于就是直接取出那一行锁住一行,并且如果没有这个key就会新创建一个,然后再通过累加器LongAdder来完成原子累加。那么为什么不能put?很简单同一个map两个方法交错就会出现线程安全的问题。但是LongAdder和map已经分开,那么就算交错了,也需要获取到锁的时候才能够对value进行修改。而且LongAdder是get和put基本上是一个原子操作CAS

(map, words) -> {
                    for (String word : words) {

                        // 如果缺少一个 key,则计算生成一个 value , 然后将  key value 放入 map
                        //                  a      0
                        LongAdder value = map.computeIfAbsent(word, (key) -> new LongAdder());
                        // 执行累加
                        value.increment(); // 2

                        /*// 检查 key 有没有
                        Integer counter = map.get(word);
                        int newValue = counter == null ? 1 : counter + 1;
                        // 没有 则 put
                        map.put(word, newValue);*/
                    }
                }


//computeIfAbsent部分
Node<K,V> r = new ReservationNode<K,V>();//取出一行
                synchronized (r) {//锁上
                    if (casTabAt(tab, i, null, r)) {
                        binCount = 1;
                        Node<K,V> node = null;
                        try {
                            if ((val = mappingFunction.apply(key)) != null)//null就创建
                                node = new Node<K,V>(h, key, val, null);
                        } finally {
                            setTabAt(tab, i, node);
                        }
                    }
                }

Hashmap并发死锁
为什么会造成并发死锁?
(只有jdk7下面才会出现,为什么?)
因为每次数据数量达到容量的3/4的时候,那么这个时候出现拉链情况的次数就会越来越多,导致比较次数增加,查找速度减慢,就会进行扩容,分散数据到各个位置。桶下标会重新进行计算。
案例的意思其实就是扩容的时候16去了别的地方,而且1和35的位置发生了变化。导致第二个同时进来扩容的线程无法正确转移。
第一个线程和第二个线程都是,现在第一个线程执行扩容,e是1->35->16->null,e.next是35->16->null。扩容之后16走了,而1先进新的数组,35后进,因为e是一个引用,e一直指向的都是1节点,但是节点1变成了1->null,而不是1->35->16。所以e现在指向的是1->null,e.next很自然就是35->1->null,切换到线程2就会死链,为什么会这样?
原因就是把1送到新链之后,接着下一个e赋值35,35节点后面的next还是1,相当于就是1和35反复交换位置。
在这里插入图片描述
JDK8版本ConcurrentHashMap重要源码
属性分析
sizeCtl下一次扩容的阈值,初始是-1,扩容为-(1+线程数)
Node:链表节点
TreeNode:红黑树节点
TreeBin:红黑树节点头,防止hash攻击
ForwardingNode:表示节点已经被迁移到新的数组
table:数组
nextTable:迁移的新数组

重要方法
tabAt:获取某个节点
casTabAt:CAS方式修改节点的值
setTableAt:直接修改
在这里插入图片描述
构造方法
懒惰式初始化,就算是计算了size,但是不用到那么也还是不会进行对数组的初始化
而且size的计算是initialCapacity(初始化容量)/loadFactor(负载因子)。但是最后还是会通过tableSizeFor(size)来把size改成2^n。并且不能超过最大的容量
容量必须>=并发度
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) //容量必须大于等于并发度 ,不然就需要修改
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);//计算实际容量大小,因为初始化容量必须只能占3/4或者是loadFactor
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);//计算最后的容量
this.sizeCtl = cap;
}

get
首先就是重新计算传过来的key的hash为正整数(因为要在数组上遍历)spread
接着就是如果table不是空,那么就取出对应h与n-1相与下标的节点(保证不超过n-1,二进制分析),接着对比节点hash是不是等于key计算出来的那个,如果是那么就再次比较key是不是和节点相等,如果是那么就返回val。(获取h(hash),取出节点,对比节点的hash(数组同一个位置的h相同,因为计算的h就是数组的下标,也叫做数组的hash),然后对比key是不是相同)
如果key也不相同那么查看是不是ForwardingNode(不能访问,只能去新数组nextTable中访问),或者是Treebin
最后如果都不是那么说明节点可能在链表中,再次遍历链表
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());//key的hash取正整数(数组下标)
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {//取出节点
if ((eh = e.hash) == h) {//判断取出节点的hash是不是和h相同
if ((ek = e.key) == key || (ek != null && key.equals(ek)))//对比key成功返回
return e.val;
}
else if (eh < 0)//如果是负数,可能就是fnode或者是treebin
return (p = e.find(h, key)) != null ? p.val : null;//去到对应位置查找
while ((e = e.next) != null) {//如果都不是那么就遍历链表看看在不在下面
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
put
onlyIfAbsent是表示要不要覆盖写新值,false就是要new一个新的
ConcurrentHashMap的key和value都不能为空
计算数组hash值,然后看看table存不存在,如果不存在那么就调用initTable。如果存在那么就取出数组的hash位置节点
取出的第一个节点是头节点,如果是空的,那么就new一个,并且赋值
如果头节点不是null,且节点是一个正在迁移的节点,那么线程就会调用helpTransfer来帮助正在迁移的线程迁移点
如果头节点不是空,那么就上锁(锁就是头节点本身),然后再次取出头节点,看看头节点有没有被其它线程修改
然后就是for循环遍历链表对比key和hash,如果有那么就CAS更新节点,如果没有那么就new一个节点放到链表尾部
如果头节点是红黑树节点,那么就去查询红黑树
如果节点数量大于8那么就构建红黑树
最后就是addCount与容量相关
public V put(K key, V value) {
return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();//key和value不能为空
    int hash = spread(key.hashCode());//计算正整数hash(数组下标)
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {//进入无限循环
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)//如果是null那么就初始化table
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//取出头结点,如果是null
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))//CAS创建头结点
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)//如果是forwardingNode,那么线程帮助其他线程去转移节点,帮助扩容
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) {//锁上当前行
                if (tabAt(tab, i) == f) {//再次取出数组的节点看看有没有被改变。
                    if (fh >= 0) {//如果fh是正数说明是链表里面的
                        binCount = 1;//记录节点个数
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {//如果key和hash相同那么赋值新值。
                                oldVal = e.val;
                                if (!onlyIfAbsent)//如果是onlyIfAbsent=false直接覆盖旧值
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {//如果找不到那么就接到后面去。
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {//如果是红黑树节点,那么就遍历红黑树
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {//如果binCount>=8那么就进行树化 
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

initTable初始化table
从第二个判断开始说,先把sizeCtl修改为-1,告诉其它线程正在创建表
再次判断表是不是空的,如果是空的,那么就创建表,并且设置好下次扩容的阈值交给sizeCtl
第一次判断的意思是如果还有其它线程想要进来,那么发现乐观锁(U.compareAndSwapInt(this, SIZECTL, sc, -1)已经被使用了,那么就进入下一次循环,发现sizeCtl是-1也就是有线程正在创建表,那么就yield放弃cpu使用权相当于就进入阻塞,下一次苏醒进入创建的时候发现这个时候table已经不是null了,再次回到循环,然后回到while循环的时候结束。
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//防止多个线程进来创建表。所以在第一个线程创建之后,其它线程都会判断table是不是空,如果不是那么就去到while结束循环
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;//如果sc旧值不是-1那么就直接使用原来的sizeCtl,如果是-1那么就使用默认的容量
@SuppressWarnings(“unchecked”)
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);//超过阈值扩容的阈值
}
} finally {
sizeCtl = sc;//阈值赋值给sizeCtl
}
break;
}
}
return tab;
}

addCount
作用是什么?
就是给hashMap的baseCount+1。并且检查是否需要扩容

为什么会有竞争?
因为put方法里面是以节点为单位的上锁,也就是每次锁住一行提高锁的粒度,并且增加并发度。

首先判断就是cells是否存在,cell是否存在,其中一个不存在都要调用fullAddCount来创建
如果存在,且发现有竞争,那么直接修改cellValue相当于就是通过cells数组来组合计算最后的cell值。因为这个时候可能会有很多线程同时想要修改对应的baseCount,那么就可以先计算到一个cell里最后再汇总。
如果没有竞争就可以直接修改baseCount的值
接着就是s=sumCount()计算节点有多少个,如果发现节点大于阈值那么就会进行扩容操作,并且把sizeCtl设置为负数,然后transger来创建新数组扩容。其它线程进入发现sizeCtl是负数那么就帮助扩容。
拓展
compareAndSwapLong(a,b,c,d)
a:对象
b:对象的偏移值,也就是属性的位置,比如Long里面的long x属性的位置
c:旧值
d:想要赋予的值
基本上可以在c中看出来偏移的位置的属性,比如U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)中第三个参数就是baseCount,那么我们修改的就是baseCount的值
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||//发现有竞争那么就要通过Cells来计算baseCount
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {//没有竞争直接加给当前对象的baseCount的偏移值,也就是对象的size的位置
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||//如果发现cells是空的那么就要创建
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||//如果发现节点是空那么就创建cell
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {//如果都有那么就直接赋值给cellvalue这里的cellvalue是a里面的偏移值,因为a是个Long类。相当于就是其中一个cell
fullAddCount(x, uncontended);//创建cells和cell
return;
}
if (check <= 1)
return;
s = sumCount();//计算节点个数
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&//如果大于阈值那么就要进行扩容
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {//如果线程发现其它线程正在扩容那么就帮助扩容
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,//设置sizeCtl为负数也就是扩容或者创建状态
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);//进行扩容转移。
s = sumCount();
}
}
}

size
这里size调用的是sumCount
而且sumCount调用的是baseCount,然后进行累加的操作
但它并不是一个精确的值,在多线程下可能有线程减,有线程增加节点。
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}

final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

transfer
作用?
扩容转移节点
首先就是创建扩容数组*2
然后就是遍历数组,如果发现tab节点是null说明处理完,那么就设置forwardingNode也就是迁移节点标签
如果本来就是forwardingNode那么就跳过下面的锁定头节点f进行迁移
锁定

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings(“unchecked”)
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//扩容数组
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;😉 {//遍历数组,进行处理。
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (–i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)//如果头结点处理完,那么就把原来的节点设置为forWardingNode
advance = casTabAt(tab, i, null, fwd);//fwd就是forWardingNode
else if ((fh = f.hash) == MOVED)//如果是forwarding那么就跳过
advance = true; // already processed
else {
synchronized (f) {//锁定首节点进行转移处理
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {//如果是普通节点那么就扩容
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {//红黑树节点处理转移
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

JDK7版本

初始化

第一个就是Segment存储着HashEntry也就是一个键值对,而且每次都会先初始化Segment。占用很大的内存。
然后就是segmentShift和segmentMask的作用是什么?
其实就是shift用来记录要hash右移的位数,mask就像树子网掩码那样与hash右移之后的高位进行相与,得到的那个值就是数组segment上面的下标
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;//hash高位保留多少位
int ssize = 1;//数组大小
while (ssize < concurrencyLevel) {//初始化大小是16
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;//hash右移多少位
this.segmentMask = ssize - 1;//对应多少个hash的高位
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}

put
和JDK8有什么不同?
这里加锁使用的是segment,因为segment就是ReentrantLock。而且锁里面就是小哈希表,可以通过hash在数组中找到节点。问题是segment和hashEntry使用同一个Entry?很明显不是,segment使用的是hash右移segmentShift位然后与segmentMask进行相与得到的位置,但是小hash表使用的是直接hash。
调用的是segment内部的put操作放进小hash
然后segment调用tryLock尝试锁住并且修改小hash。并且只会尝试64次,失败就进行阻塞,期间会创建新节点
lock成功之后就获取table和对应的头节点,并且对比key和hash看看是不是对应的节点,如果是那么就设置新值
如果不是那么就继续循环下去访问下一个节点,并且对比设置
如果访问到最后发现没有,那么就看看tryLock期间有没有创建节点,如果有那么node.next=first相当于就是链接上链表头部,没有就自己创建连接到头部
然后检查节点个数是不是大于阈值如果是那么就进行扩容rehash
最后就是解锁
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;//求segment的位置
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);//调用segment的put方法设置key和value
}

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null 😕/尝试加锁
scanAndLockForPut(key, hash, value);//失败还可以尝试64次并且创建要加入的节点
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;//求小hash表对应的下标
HashEntry<K,V> first = entryAt(tab, index);//取出该节点
for (HashEntry<K,V> e = first;😉 {//遍历链表
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {//对比hash和key
oldValue = e.value;//赋值新值
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {//如果链表没有,那么就创建新节点加入
if (node != null)//在tryLock期间创建
node.setNext(first);//直接加入
else//没有创建那么就创建节点
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)//如果大于阈值那么就要扩容
rehash(node);
else
setEntryAt(tab, index, node);//修改头结点为新的节点
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
//解锁
unlock();
}
return oldValue;
}

rehash
这里扩容不是直接搬运节点,先遍历数组
然后得到头节点,如果没有下一个节点那么就直接搬运
那么就要遍历下面的节点,然后找到那些hash已经发生变化的最后一个节点,然后把这个节点搬运到lastIdx的位置上。其它节点根据hash创建新节点搬运到其它位置,把链表拆散。那么为什么计算出来的k是不同的呢?原因是扩容之后二进制+1,那么相当于市2->4从10到11那么hash&size也就发生了改变。
private void rehash(HashEntry<K,V> node) {

        HashEntry<K,V>[] oldTable = table;
        int oldCapacity = oldTable.length;
        int newCapacity = oldCapacity << 1;//扩容操作
        threshold = (int)(newCapacity * loadFactor);//重新计算阈值
        HashEntry<K,V>[] newTable =
            (HashEntry<K,V>[]) new HashEntry[newCapacity];
        int sizeMask = newCapacity - 1;
        for (int i = 0; i < oldCapacity ; i++) {//遍历旧table
            HashEntry<K,V> e = oldTable[i];
            if (e != null) {//如果节点不是null,那么进行迁移
                HashEntry<K,V> next = e.next;
                int idx = e.hash & sizeMask ;
                if (next == null)   //如果节点的下一个节点是空的,那么可以直接搬运
                    newTable[idx] = e;
                else { // 如果有多个节点,就要判断哪些节点的hash值发生了改变
                    HashEntry<K,V> lastRun = e;
                    int lastIdx = idx;
                    for (HashEntry<K,V> last = next;
                         last != null;
                         last = last.next) {
                        int k = last.hash & sizeMask;
                        if (k != lastIdx) {//如果下标发生改变,那么就记录下来
                            lastIdx = k;//记录最后改变的那个节点的值
                            lastRun = last;//记录最后一个节点
                        }
                    }
                    newTable[lastIdx] = lastRun;//把最后改变的节点赋值给新表的同一个位置,说明剩下来的这些节点的hash没有发生改变
                    // Clone remaining nodes
                    //创建数组新的位置来保存不同的节点
                    for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                        V v = p.value;
                        int h = p.hash;
                        int k = h & sizeMask;
                        HashEntry<K,V> n = newTable[k];
                        newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                    }
                }
            }
        }
        int nodeIndex = node.hash & sizeMask; //扩容之后加入新的节点
        node.setNext(newTable[nodeIndex]);
        newTable[nodeIndex] = node;
        table = newTable;
    }

那么什么时候加入新节点?
扩容之后

get
和jdk8的异同?
都是没有加锁,但是遍历放肆不同。先找到锁,然后再找到锁的table,最后计算出hash找到table中的节点,取出并且遍历链表找到对应节点。
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&//取出segment,使用unsafe方法保证可见性
(tab = s.table) != null) {//取出table
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);//取出节点并且进行遍历
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;//如果有对应的key那么就返回
}
}
return null;
}

size
和jdk8的不同?
jdk8使用的是cells方式来进行的累加保证弱一致性。jdk7使用的是多次循环,2次之内如果结果相同那么就返回,否则继续遍历。超过遍历三次之后加锁之后再进行元素大小的计算。
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn’t retry
try {
for (;😉 {
//如果超过3次加锁
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
//遍历segment获取count和modCount修改次数
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
//如果sum等于前一个值那么就结束相当于两次结果相同
if (sum == last)
break;
last = sum;//记录当前sum
}
} finally {
//解锁
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}

LinkedBlockingQueue

入队
直接就是last=last.next=node,意思就是连接新节点,然后last指向新节点

出队
先是h临时指向head,然后first=h.next指向下一个节点
然后就是h.next=h指向自己方便垃圾回收
然后head=first相当于重新设置头节点
然后head的item设置为空,因为head是一个占位点

好处在哪?
使用了两把锁一个占位节点
生产者一把,消费者一把保证两个不同操作的并发度高
而且只有两个节点的时候或者是大于两个节点的时候,putLock保护last节点,takeLock保护head节点,主要就是防止put和take这些操作的并发性问题
只有一个节点的时候take会被阻塞

阻塞队列会出现什么并发性问题?
如果队列只剩下一个位置的时候刚好两个线程通过判断,那么就会导致溢出或者是取不出的问题。

put
不能加入空节点
如果满了就阻塞
如果消费品小于容量的最大值的时候就唤醒一个生产者,如果还有空位可以生产,那么就唤醒生产者
如果是只有一个消费品的时候那么就是唤醒消费者线程
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();

    int c = -1;
    Node<E> node = new Node(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

take
当没有消费品的时候阻塞
当消费品大于1的时候继续唤醒消费者,也就是如果还有消费品那么就唤醒消费者
当消费品达到容量的时候唤醒生产者

public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

总结:消费者叫醒自己的队列,生产者也是。每次只叫醒一个,防止竞争的消耗。而且只要生产满了,那么就会阻塞。用完了阻塞消费者。但是总有一边会有一个生产者或者是消费者

Linked和Array的比较
链表 数组
有界限 强制有界限
懒惰 初始化好了
每次需要new节点 需要提前初始化节点
2把锁 1把锁

dummy的好处?
可以让两把锁锁住不同对象防止竞争

CopyOnWriteArrayList
原理
其实就是每次写的时候都copy一份新的在新的上面改变,然后赋值给当前。

put的问题是什么?
如果这个时候有读线程get,但是切换到写线程把旧的给换成新的数据,那么get就会获取到错误的数据
get并不会加锁。但是写的时候就会加锁。而且写能和读一起进行。实现了弱一致性。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/972165.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Myvatis关联关系映射与表对象之间的关系

目录 一、关联关系映射 1.1 一对一 1.2 一对多 1.3 多对多 二、处理关联关系的方式 2.1 嵌套查询 2.2 嵌套结果 三、一对一关联映射 3.1 建表 ​编辑 3.2 配置文件 3.3 代码生成 3.4 编写测试 四、一对多关联映射 五、多对多关联映射 六、小结 一、关联关系映射 …

现货黄金代理好吗?

做黄金代理这个职业好吗&#xff1f;从目前的市场现状来看&#xff0c;其实做黄金代理很不错的。在股票市场中&#xff0c;投资者只能通过买涨进盈利&#xff0c;所以当市场行情不好的时候&#xff0c;股票经纪人的业务将很难展开&#xff0c;而现货黄金投资者不一样&#xff0…

mac 安装java1.8

1、下载jdk1.8 https://www.oracle.com/java/technologies/downloads/#java8-mac 2、 安装jdk1.8 一路默认&#xff0c;安装后的位置在这儿。 /Library/Java/JavaVirtualMachines/jdk-1.8.jdk 3、配置环境 打开终端&#xff0c;输入命令 sudo vim /etc/profile 添加以下配…

经管博士科研基础【19】齐次线性方程组

1. 线性方程组 2. 非线性方程组 非线性方程,就是因变量与自变量之间的关系不是线性的关系,这类方程很多,例如平方关系、对数关系、指数关系、三角函数关系等等。求解此类方程往往很难得到精确解,经常需要求近似解问题。相应的求近似解的方法也逐渐得到大家的重视。 3. 线…

Groovy 下载安装

Groovy 简介 在某种程度上&#xff0c;Groovy 可以被视为Java 的一种脚本化改良版,Groovy 也是运行在 JVM 上&#xff0c;它可以很好地与 Java 代码及其相关库进行交互操作。它是一种成熟的面向对象编程语言&#xff0c;既可以面向对象编程&#xff0c;又可以用作纯粹的脚本语…

Mac下安装Jmeter及其配置

一、安装JDK环境 安装方式&#xff1a;mac下配置JDK环境_只看不学的博客-CSDN博客 如果已安装JDK环境即可忽略该步骤&#xff0c;检查方式&#xff0c;在终端输入java -version,如果出现了java版本&#xff0c;即代表已经配置过JDK环境了&#xff0c;如下图所示&#xff1a; …

MyBatis关系映射

文章目录 前言一、一对一映射1.1 创建实体1.2 xml配置 二、一对多映射2.1 创建实体2.2 resultMap配置2.3 测试 三、 多对多映射3.1 创建实体3.2 resultMap配置3.3 测试 前言 MyBatis是一个Java持久化框架&#xff0c;它提供了一种将数据库表和Java对象之间进行关系映射的方式。…

安全基础 --- js的闭包和this属性

js闭包 简介 一个函数和对其周围状态&#xff08;lexical exviroment&#xff0c;词法环境&#xff09;的引用捆绑在一起&#xff08;或者说函数被引用包围&#xff09;&#xff0c;这样的组合就是闭包&#xff08;closure&#xff09; 在js中&#xff0c;通俗来讲&#xff0c…

计算机毕设 大数据商城人流数据分析与可视化 - python 大数据分析

文章目录 0 前言课题背景分析方法与过程初步分析&#xff1a;总体流程&#xff1a;1.数据探索分析2.数据预处理3.构建模型 总结 最后 0 前言 &#x1f525; 这两年开始毕业设计和毕业答辩的要求和难度不断提升&#xff0c;传统的毕设题目缺少创新和亮点&#xff0c;往往达不到…

【9月比赛合集】9场可报名的「创新应用」、「数据分析」和「程序设计」大奖赛,任君挑选!

CompHub[1] 实时聚合多平台的数据类(Kaggle、天池…)和OJ类(Leetcode、牛客…&#xff09;比赛。本账号会推送最新的比赛消息&#xff0c;欢迎关注&#xff01; 以下信息仅供参考&#xff0c;以比赛官网为准 目录 创新应用赛&#xff08;2场比赛&#xff09;程序设计赛&#…

openGauss学习笔记-60 openGauss 数据库管理-逻辑存储结构

文章目录 openGauss学习笔记-60 openGauss 数据库管理-逻辑存储结构 openGauss学习笔记-60 openGauss 数据库管理-逻辑存储结构 openGauss的数据库节点负责存储数据&#xff0c;其存储介质也是磁盘&#xff0c;本节主要从逻辑视角介绍数据库节点都有哪些对象&#xff0c;以及这…

无锡布里渊——厘米级分布式光纤-锅炉安全监测解决方案

无锡布里渊——厘米级分布式光纤-锅炉安全监测解决方案 厘米级分布式光纤-锅炉安全监测解决方案 1、方案背景与产品简介&#xff1a; 1.1&#xff1a;背景简介&#xff1a; 锅炉作为一种把煤、石油或天燃气等化石燃料所储藏的化学能转换成水或水蒸气的热能的重要设备&#xff…

音频基础知识

文章目录 前言一、音频基本概念1、音频的基本概念①、声音的三要素②、音量与音调③、几个基本概念④、奈奎斯特采样定律 2、数字音频①、采样②、量化③、编码④、其他相关概念<1>、采样位数<2>、通道数<3>、音频帧<4>、比特率&#xff08;码率&#…

uniapp从零到一的学习商城实战

涵盖的功能&#xff1a; 安装开发工具HBuilder&#xff1a;HBuilderX-高效极客技巧 创建项目步骤&#xff1a; 1.右键-项目&#xff1a; 2.选择vue2和默认模板&#xff1a; 3.完整的项目目录&#xff1a; 微信开发者工具调试&#xff1a; 1.安装微信开发者工具 2.打开…

改变金融贷款市场营销方式 ---- 运营商大数据精准获客

与传统的企业网络营销相比&#xff0c;最常见的是网络推广和硬广告推广。一些企业无法找到可靠准确的数据来源&#xff0c;也无法找到一些未知的总数据。这些数据大多存在持续时间长、准确性差的缺点&#xff0c;企业在将这些数据信息应用于商品在线营销时往往会遇到不足。 在…

什么是RPA机器人?RPA机器人能做什么?RPA机器人的应用场景

什么是RPA机器人&#xff1f; RPA机器人是一种使用软件机器人来模拟和执行人类操作的技术。RPA代表Robotic Process Automation&#xff08;机器人流程自动化&#xff09;。它是一种自动化技术&#xff0c;可以使用预定规则和预定流程来执行重复性、繁琐或规定任务的工作。 RP…

进程的状态与组织

进程的5种状态分别为创建态、就绪态、运行态、阻塞态、终止态。在PCB中&#xff0c;会有一个state变量来记录进程此时所处的状态。 操作系统会根据各种进程的不同状态来组织管理进程&#xff0c;有链接组织方式和索引组织方式。

使用 Linux 相关知识部署博客系统

目录 ​编辑一、认识 Linux 二、如何拥有 Linux 环境 三、常见的 Linux 命令 1、目录相关命令 &#xff08;1&#xff09;ls &#xff08;2&#xff09;pwd &#xff08;3&#xff09;cd 2、文件操作相关命令 &#xff08;1&#xff09;touch &#xff08;2&#xf…

48、springboot 的国际化之让用户在程序界面上弄个下拉框,进行动态选择语言

上一篇是直接改浏览器的支持语言。 在浏览器上面直接改国际化语言 这次要实现的功能是直接在程序界面动态选择语言。 Locale 代表语言、国家。 ★ 在界面上动态改变语言 应用之所以能动态呈现不同的语言界面&#xff0c;其实关键在于如何确定客户端的Locale&#xff08;代…

springboot实战(五)之sql业务日志输出,重要

目录 环境&#xff1a; 一、mybatis-plus之sql分析日志输出 1.配置 2.验证 3.高级输出方式 二、业务日志输出到文件 1.添加log4j2依赖 2.排除logback依赖 3.新增log4j2的配置文件 4.添加配置 5.启动测试 6.给日志请求加个id 6.1、过滤器filter实现 6.2、测试 6.3、…