Java 并发编程之ConcurrentHashMap原理详解
文章目录
- Java 并发编程之ConcurrentHashMap原理详解
- 原理剖析
- 源码剖析
- 一、构造方法分析
- 二、初始化
- 三、put()实现分析
- 四、扩容
原理剖析
HashMap通常的实现方式是“数组+链表”,这种方式被称为“拉链法”。 ConcurrentHashMap在这个基本原理之上进行了各种优化。
⾸先是所有数据都放在⼀个⼤的HashMap中,其次是引入了红黑树。
其原理如下图所示:
如果头节点是Node类型,则尾随它的就是⼀个普通的链表。如果头节点是TreeNode类型,它的后面就是⼀颗红黑树, TreeNode是Node的子类。
链表和红黑树之间可以相互转换:初始的时候是链表,当链表中的元素超过某个阈值时,把链表转换成红黑树。反之,当红黑树中的元素个数小于某个阈值时,再转换为链表。
那为什么要做这种设计呢?
- 使⽤红黑树,当⼀个槽里有很多元素时,其查询和更新速度会比链表快很多, Hash冲突的问题由此得到较好的解决。
- 加锁的粒度,并非整个ConcurrentHashMap,而是对每个头节点分别加锁,即并发度,就是Node数组的⻓度,初始长度为16。
- 并发扩容,这是难度最大的。当⼀个线程要扩容Node数组的时候,其他线程还要读写,因此处理过程很复杂,后面会详细分析。
总结
由上述对比可以总结出来:这种设计⼀方面降低了Hash冲突,另⼀方面也提升了并发度。
下面从构造方法开始,一步步深入分析其实现过程
源码剖析
一、构造方法分析
/**
* Creates a new, empty map with an initial table size based on
* the given number of elements ({@code initialCapacity}), table
* density ({@code loadFactor}), and number of concurrently
* updating threads ({@code concurrencyLevel}).
*
* @param initialCapacity the initial capacity. The implementation
* performs internal sizing to accommodate this many elements,
* given the specified load factor.
* @param loadFactor the load factor (table density) for
* establishing the initial table size
* @param concurrencyLevel the estimated number of concurrently
* updating threads. The implementation may use this value as
* a sizing hint.
* @throws IllegalArgumentException if the initial capacity is
* negative or the load factor or concurrencyLevel are
* nonpositive
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
// 如果设置的初始化容量小于预估的并发更新线程数,则初始容量设为该预估的并发线程数
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
// 表初始化和调整大小控件
// 在表初始化方法中,用于CAS操作控制并发线程的初始化过程:如果为-1,则表正在初始化或调整大小
this.sizeCtl = cap;
}
在上面的代码中,变量cap就是Node数组的长度,保持为2的整数次方。 tableSizeFor()
方法是根据传入的初始容量,计算出⼀个合适的数组长度。具体而言: 1.5倍的初始容量+1,再往上取最接近的2的整数次方,作为数组长度cap的初始值。
这里的 sizeCtl,其含义是用于控制在初始化或者并发扩容时候的线程数,只不过其初始值设置成cap。
二、初始化
在上面的构造方法里只计算了数组的初始⼤小,并没有对数组进行初始化。当多个线程都往里面放入元素的时候,再进行初始化。这就存在⼀个问题:多个线程重复初始化。下面看⼀下是如何处理的。
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 跟下面else中的cas操作对应
// 如果sizeCtl被修改为-1,说明此时有线程在初始化操作,因此当前线程自悬等待
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 利用CAS操作来解决多线程下的初始化操作
// CAS操作成功的线程会把sizeCtl的值置为-1,从而使其他线程yield等待
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// Node数组初始化,初始容量为n
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 初始化成功后,sizeCtl=n-(n>>>2)=0.75n
// 表示下一次扩容的阈值:n-n/4
sc = n - (n >>> 2);
}
} finally {
// sizeCtl赋新值,为上面计算出的下一次扩容的阈值
sizeCtl = sc;
}
break;
}
}
return tab;
}
通过上面的代码可以看到,多个线程的竞争是通过对sizeCtl进行CAS操作实现的。如果某个线程成功地把 sizeCtl设置为-1,它就拥有了初始化的权利,进⼊初始化的代码模块,等到初始化完成,再把sizeCtl设置回去。其他线程则⼀直执行while循环,自旋等待,直到数组不为null,即当初始化结束时,退出整个方法。
因为初始化的⼯作量很小,所以此处选择的策略是让其他线程⼀直等待,而没有帮助其初始化。
三、put()实现分析
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 分支1,整个数组初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 分支2:第i个元素初始化
// 这里调用tabAt方法获取到Node数组第i个位置下的值,如果为空,则初始化当前插入元素插入到数组第i个索引位置
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 利用CAS方法尝试插入,如果CAS操作成功,说明当前元素已经插入到第i个索引位置,则break退出。
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 分支3:尝试扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 分支4:插入元素到链尾,或红黑树中
else {
V oldVal = null;
// 重点,利用synchronized关键字加锁。
// 同时,可以看到,锁的粒度是Node数组中节点的粒度,也就是只锁数组中每个Node节点,其他Node节点仍然可以访问
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 这里是更新的旧值的功能
// 在插入的过程中,如果发现插入元素与已有元素相同,则用新的value值替换旧的value值
// 这里的相同指 当且仅当两个元素的hashcode相同,且key值相同
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
// 将插入的新value替换已有元素的旧value
e.val = value;
break;
}
Node<K,V> pred = e;
// 这里e=e.next来遍历某一Node节点下的链表/红黑树
if ((e = e.next) == null) {
// 如果走到链尾,则将新元素插入到链尾
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 当fh<0时,表示当前节点下时红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 将元素插入红黑树中,同样的,如果树中已有当前元素,则返回对应TreeNode元素对象,否则,返回null
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
// 如果已有元素,则更新旧值
p.val = value;
}
}
}
}
// binCount保存了节点是链表时的元素个数
if (binCount != 0) {
// 如果链表的元素个数大于设置的阈值=8,则会转成红黑树的操作
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
// 如果put过程是更新操作,会返回旧值
return oldVal;
break;
}
}
}
// 容器中总元素个数+1(sizeCtl+1)
addCount(1L, binCount);
return null;
}
其中调用的几个方法为:
// 其中h是key的hashCode()得到的
// h ^ (h >>> 16)就是让高16位与低16位进行异或,目的是使得计算出来的哈希值更加散列。
// & HASH_BITS(二进制是31个1)就是为了让最终得到的哈希值始终是一个正数。
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
关键步骤分析:
-
synchronized (f)
:通过源码分析知道,ConcurrentHashMap并发的同步锁是加在Node数组中每个索引位置的节点上的,意味着每个数组元素有一把锁,并发度等于数组的长度。 -
f = tabAt(tab, i = (n - 1) & hash
。这里为了获取插入元素在Node数组中的索引位置,不是用的hash%n
,而是采用了(n-1) & hash
的方式。上面在对原理进行剖析时分析过,hashmap类的容器大小都是2的整数倍,因此(n-1)相当于除了高位为0外,其余位都为1。由于n=2^m,hashcode / n
相当于右移m位,而根据二进制求余的方法可知,这右移的m位就是余数。那么问题就在于如何获取这m位。由于n是2的整数幂,则n-1用二进制表示的位数就等于m,比如:n=8=2^3, 则n-1=7=0111
、n=16=2^4,则n-1=15=0 1111
。可以发现1的位数即等于m的幂数。这里的原因就在n是2的整数幂,所以n的二进制表示的1的位置,在m+1的位置,因此用n-1=m就可以获取到该取余所需的位数。 -
put()
方法过程中,如果插入的元素是ConcurrentHashMap中已有的,则会更新现在的值。这里判断相同的方法是:当且仅当两个元素的hashcode相同 且 key相同。 -
binCount
字段保存了节点是链表时的元素个数。在插入的过程中,binCount字段会对链表的长度计数,如果链表的元素个数大于设置的阈值=8,则会转成红黑树的操作。
四、扩容
扩容的实现是最复杂的,下面从treeifyBin()的源码方法讲起
/**
* Replaces all linked nodes in bin at given index unless table is
* too small, in which case resizes instead.
*/
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
// 数组长度小于阈值64,不做红黑树转换,直接扩容
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
// 链表转换为红黑树
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
// 遍历链表,初始化红黑树
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
在上面的代码中, MIN_TREEIFY_CAPACITY=64,意味着当数组的长度没有超过64的时候,数组的每个节点里都是链表,只会扩容,不会转换成红⿊树。只有当数组长度大于或等于64时(static final int MIN_TREEIFY_CAPACITY = 64;
),才考虑把链表转换成红黑树。
上面方法在扩容时,调用了tryPresize
方法,来进行扩容,接下来看一下这个方法内部实现:
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
tryPresize
方法中最重要的一个方法就是transfer
方法,也是扩容的主要方法:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
// 计算步长
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 初始化新的HashMap
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
// 扩容两倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
// 初始的transferIndex为旧的HashMap的数组长度
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
// 此处,i为遍历下标,bound为边界。
// 如果成功获取一个任务,则i=nextIndex-1
// bound = nextIndex-stride
// 如果获取不到,则i=0, bound=0
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// advance表示在从i=transferIndex-1遍历到bound位置的过程中,是否一直继续
while (advance) {
int nextIndex, nextBound;
// 以下是哪个分支中的advance都是false,表示如果三个分支都不执行,才可以一直while循环
// 目的在于当对transferIndex执行CAS操作不成功的时候,需要自旋,以期获取一个stride的迁移任务
if (--i >= bound || finishing)
// 对数组遍历,通过这里的--i进行。如果成功执行了--i,就不需要继续while循环了,因为advance只能进一步。
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
// transferIndex <= 0,整个HashMap完成
i = -1;
advance = false;
}
// 对transferIndex执行CAS操作,即为当前线程分配1个stride
// CAS操作成功,线程成功获取到一个stride的迁移任务
// CAS操作不成功,线程没有抢到任务,会继续执行while循环,自旋。
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// i越界,整个HashMap遍历完成
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// finishing表示整个hashMap扩容完成
if (finishing) {
nextTable = null;
// 将nextTab赋值给当前table
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// tab[i]迁移完毕,赋值一个ForwardingNode
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// tab[i]的位置已经在迁移过程中
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 对tab[i]进⾏迁移操作, tab[i]可能是⼀个链表或者红⿊树
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 链表
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
// 表示lastRun之后的所有元素, hash值都是⼀样的
// 记录下这个最后的位置
lastRun = p;
}
}
if (runBit == 0) {
// 链表迁移的优化做法
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
// 红⿊树,迁移做法和链表类似
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
该⽅法非常复杂,下面⼀步步分析:
- 扩容的基本原理如下图,⾸先建⼀个新的HashMap,其数组长度是旧数组长度的2倍,然后把旧的元素逐个迁移过来。所以,上面的方法参数有2个,第1个参数tab是扩容之前的HashMap,第2个参数nextTab是扩容之后的HashMap。当nextTab=null的时候,⽅法最初会对nextTab进行初始化。这里有⼀个关键点要说明:该⽅法会被多个线程调⽤,所以每个线程只是扩容旧的HashMap部分,这就涉及如何划分任务的问题
-
上图为多个线程并行扩容-任务划分示意图。旧数组的⻓度是N,每个线程扩容⼀段,⼀段的长度用变量stride(步长)来表示, transferIndex表示了整个数组扩容的进度。
stride的计算公式如上面的代码所示,即:在单核模式下直接等于n,因为在单核模式下没有办法多个线程并⾏扩容,只需要1个线程来扩容整个数组;在多核模式下为 (n>>> 3) /NCPU,并且保证步⻓的最⼩值是16。显然,需要的线程个数约为n/stride。
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range
transferIndex是ConcurrentHashMap的⼀个成员变量,记录了扩容的进度。初始值为n,从大到小扩容,每次减stride个位置,最终减至n< =0,表示整个扩容完成。因此,从[0,transferIndex-1]的位置表示还没有分配到线程扩容的部分,从[transfexIndex, n-1]的位置表示已经分配给某个线程进行扩容,当前正在扩容中,或者已经扩容成功。因为transferIndex会被多个线程并发修改,每次减stride,所以需要通过CAS进⾏操作,如下面代码所示 :
else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
-
在扩容未完成之前,有的数组下标对应的槽已经迁移到了新的HashMap里面,有的还在旧的 HashMap里面。这个时候,所有调用get(k, v)的线程还是会访问旧 HashMap,怎么处理呢?
下图为扩容过程中的转发示意图:当Node[0]已经迁移成功,而其他Node还在迁移过程中时,如果有线程要读取Node[0]的数据,就会访问失败。为此,新建⼀个ForwardingNode,即转发节点,在这个节点⾥⾯记录的是新的 ConcurrentHashMap 的引⽤。这样,当线程访问到ForwardingNode之后,会去查询新的ConcurrentHashMap。
-
因为数组的⻓度 tab.length 是2的整数次方,每次扩容又是2倍。而Hash 函数是hashCode%tab.length,等价于hashCode&(tab.length-1)。这意味着:处于第i个位置的元素,在新的Hash表的数组中⼀定处于
第i个或者第i+n个位置,如下图所示。举个简单的例⼦:假设数组长度是8,扩容之后是16:
若hashCode=5, 5%8=0,扩容后, 5%16=0,位置保持不变;
若hashCode=24, 24%8=0,扩容后, 24%16=8,后移8个位置;
若hashCode=25, 25%8=1,扩容后, 25%16=9,后移8个位置;
若hashCode=39, 39%8=7,扩容后, 39%8=7,位置保持不变;…
正因为有这样的规律,所以如下有代码:
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
也就是把tab[i]位置的链表或红黑树重新组装成两部分,⼀部分链接到nextTab[i]的位置,⼀部分链接到nextTab[i+n]的位置,如上图所示。然后把tab[i]的位置指向⼀个ForwardingNode节点。
同时,当tab[i]后面是链表时,使⽤类似于JDK 8中在扩容时的优化方法,从lastRun往后的所有节点,不需依次拷贝,而是直接链接到新的链表头部。从lastRun往前的所有节点,需要依次拷贝。
了解了核心的迁移函数transfer(tab, nextTab),再回头看tryPresize(int size)函数。这个函数的输入是整个Hash表的元素个数,在函数里面,根据需要对整个Hash表进行扩容。想要看明白这个函数,需要透彻地理解sizeCtl变量,下面这段注释摘自源码。
/**
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
*/
private transient volatile int sizeCtl;
当sizeCtl=-1时,表示整个HashMap正在初始化;
当sizeCtl=某个其他负数时,表示多个线程在对HashMap做并发扩容;
当sizeCtl=cap时, tab=null,表示未初始之前的初始容量(如上面的构造函数所示);
扩容成功之后, sizeCtl存储的是下⼀次要扩容的阈值,即上面初始化代码中的n-(n>>> 2) =0.75n。
所以, sizeCtl变量在Hash表处于不同状态时,表达不同的含义。明白了这个道理,再来看上面的tryPresize(int size)
函数
tryPresize(int size)
是根据期望的元素个数对整个Hash表进行扩容,核心是调用transfer函数。在第⼀次扩容的时候, sizeCtl会被设置成⼀个很大的负数U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) +2)
。之后每⼀个线程扩容的时候, sizeCtl 就加 1, U.compareAndSwapInt(this, SIZECTL, sc, sc+1)
,待扩容完成之后, sizeCtl减1