ConcurrentHashMap源码学习记录--put方法全过程大白话解析

ConcurrentHashMap源码学习记录–put方法全过程大白话解析

前排提醒:博主看这段源码时是线性的,也就是一条路走到底这样看过去的,所以描述的时候也是线性的。

调用new ConcurrentHashMap(31),会创建一个容量为64的map,这点可以直接点如构造方法查看,里面通过调用tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); 返回一个大于传入数值的最小的2的n次幂值。
默认容量为16
那么现在调用put(K,V)后,会调用一个putVal(K,V,false)
来看看putVal方法

putVal

final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();//根据key获取hash值int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {	//自旋操作Node<K,V> f; int n, i, fh;//如果是第一次putVal,table还没有初始化//这个table是一个Node数组,Node是CHM的静态内部类,包含K,V,hash,next指针//先看initTbale方法在接着往下看if (tab == null || (n = tab.length) == 0)tab = initTable();//初始化结束,来到这里,hash取模容量得到table的一个位置,如果为空//采用cas操作设置值,成功就直接返回了,失败说明存在竞争,自旋,继续往下看else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break;                   // no lock when adding to empty bin}//MOVED = -1,在进行扩容的时候会将Node的hash值设置为-1,这里先跳过else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);else {V oldVal = null;//锁住这个Node,也就是分段加锁了,不会阻塞table的其它Node操作synchronized (f) {if (tabAt(tab, i) == f) {//大于0说明这里是个链表if (fh >= 0) {binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;//判断hash值、key值是否与当前node的相同,相同则是修改值if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;//记得这里传入的是false吗if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;//来到这里说明这是一个新的key,那么加在链表的尾部if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}//如果是树节点else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}//设置值完成后来到这里,判断binCount是否大于等于TREEIFY_THRESHOLD阈值,这个值=8,如果是,我们进入treeifyBin看看if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}//执行结束,增加map的size//先说说CHM统计元素个数的方式,采用的是baseCount+CounterCells数组中所有元素的value值,那么具体怎么做,进入方法看看addCount(1L, binCount);return null;}

initTable

private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {//这个sizeCtl在new的时候把容量赋值给它了//小于0说明有其它线程在初始化或者扩容,让出时间片if ((sc = sizeCtl) < 0)Thread.yield(); // lost initialization race; just spin//采用cas操作尝试修改sizeCtl的值为-1,修改成功则进入下面的代码else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;//这里相当于n*0.75,得到一个扩容阈值,当map的size大于这个值时就要进行扩容了sc = n - (n >>> 2);}} finally {sizeCtl = sc;}break;}}return tab;}

treeifyBin

private final void treeifyBin(Node<K,V>[] tab, int index) {Node<K,V> b; int n, sc;if (tab != null) {//如果目前容量还小于64,进行扩容,如果不是,那么就把链表转为树了,扩容操作我们之后再看,先回到putVal方法if ((n = tab.length) < MIN_TREEIFY_CAPACITY)tryPresize(n << 1);else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {synchronized (b) {if (tabAt(tab, index) == b) {TreeNode<K,V> hd = null, tl = null;for (Node<K,V> e = b; e != null; e = e.next) {TreeNode<K,V> p =new TreeNode<K,V>(e.hash, e.key, e.val,null, null);if ((p.prev = tl) == null)hd = p;elsetl.next = p;tl = p;}setTabAt(tab, index, new TreeBin<K,V>(hd));}}}}}

addCount

private final void addCount(long x, int check) {//这个CounterCell是CHM用来统计size的静态内部类,只有一个value属性,就是相当于元素个数CounterCell[] as; long b, s;//如果这个counterCells为空,那么直接cas操作,增加baseCount的值//cas失败说明存在竞争,那么进入这个if,去修改counterCells数组中一个counterCell的值if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;//冲突标志,默认为true代表没有冲突boolean uncontended = true;//如果as为空或者当前线程通过getProbe()获得的随机数定位的as数组的位置为空//或者cas修改这个位置的值失败,进入fullAddCount,这个修改就是元素个数+1,注意这里会修改uncontended标志位,也就是如果cas失败,这个值为falseif (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {fullAddCount(x, uncontended);//修改成功就直接返回了return;}//如果上面cas成功了,来到这里判断一下check,这个check就是putVal中的binCount//如果<=1,直接返回if (check <= 1)return;//统计元素个数,进入sumCount()看看在干嘛s = sumCount();}//上面如果没有return,进入这里,检查扩容if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;//首先判断s是否大于等于扩容阈值了while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {//计算得到扩容戳,这个方法返回一个Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1))//重点看看后面的,1左移15位,与前面的做|运算,这样得到的int数的第16位一定是1int rs = resizeStamp(n);//如果sc<0说明已经有线程在扩容了,看看是否可以协助扩容//第一次到这里sc是扩容阈值,是大于0的,先去看看elseif (sc < 0) {//这里五个条件,只要一个为true就break,无法协助扩容//1.sc右移16位,也就是比较高16位//2.sc == rs+1 代表扩容结束了//3.sc = rs + MAX_RESIZERS 表示协助线程已经达到最大值了//4.nt = nextTable 表示扩容结束了//5.transferIndex <= 0 表示transfer任务被别人领取光了,其实也就是不需要自己了if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;//如果条件满足,cas让sc+1,加入扩容if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}//cas修改sc的值为rs左移16位+2,rs的第16位一定为1,那么这里修改sc得到的是一个负数,+2代表有一个线程在扩容else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))//进入看看transfer(tab, null);s = sumCount();}}}

fullAddCount

private final void fullAddCount(long x, boolean wasUncontended) {int h;//获取当前线程的probe值,如果为0进行初始化,设置冲突标志值if ((h = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit();      // force initializationh = ThreadLocalRandom.getProbe();wasUncontended = true;}boolean collide = false;                // True if last slot nonempty//进入自旋操作for (;;) {CounterCell[] as; CounterCell a; int n; long v;//第一次进来肯定没初始化了,看下面的//如果初始化完成了,进入if ((as = counterCells) != null && (n = as.length) > 0) {			//先定位到一个元素,如果为空,初始化当前元素if ((a = as[(n - 1) & h]) == null) {if (cellsBusy == 0) {            // Try to attach new CellCounterCell r = new CounterCell(x); // Optimistic create//cas尝试修改cellsBusyif (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {boolean created = false;try {               // Recheck under lockCounterCell[] rs; int m, j;//看最后一个条件,如果为空才进入。这里又进行了一次判断//因为存在一种情况就是上面第一次判断cellsBusy为0时,可能有一个线程在cas修改cellsBusy并且初始化完成了,然后复位cellsBusy//此时另一个线程进入,已经是初始化过了,所以需要再次判断if ((rs = counterCells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {rs[j] = r;created = true;}} finally {cellsBusy = 0;}//如果创建成功了if (created)break;//如果没成功,自旋continue;           // Slot is now non-empty}}collide = false;}//来到这里说明在adCount修改的时候失败了else if (!wasUncontended)       // CAS already known to failwasUncontended = true;      // Continue after rehash//定位的元素不为空,尝试cas修改,成功则退出,否则继续往下走else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))break;//如果其它线程扩容了counterCells,或者当前as容量已经大于cpu核心数了,不进行扩容,修改collide为false,自旋else if (counterCells != as || n >= NCPU)collide = false;            // At max size or stale//恢复collide标志,继续自旋else if (!collide)collide = true;//到了这里,说明竞争激烈,数组容量不够大,需要扩容,先cas修改cellsBusyelse if (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {try {if (counterCells == as) {// Expand table unless stale//扩容一倍CounterCell[] rs = new CounterCell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];counterCells = rs;}} finally {cellsBusy = 0;}collide = false;continue;                   // Retry with expanded table}//更新随机数值h = ThreadLocalRandom.advanceProbe(h);}//未初始化,cellsBusy用以标志是否存在线程在初始化counterCells数组,为0代表没有//那么cas设置cellsBusy=1,成功就进入进行初始化,初始化容量为2else if (cellsBusy == 0 && counterCells == as &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {boolean init = false;try {                           // Initialize tableif (counterCells == as) {CounterCell[] rs = new CounterCell[2];//根据之前得到probe定位,设置值rs[h & 1] = new CounterCell(x);counterCells = rs;init = true;}} finally {cellsBusy = 0;}if (init)break;}//如果初始化cas操作失败,来到这里尝试修改baseCount,成功则退出else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))break;                          // Fall back on using base}}

sumCount-----size()方法统计元素的实现

final long sumCount() {CounterCell[] as = counterCells; CounterCell a;long sum = baseCount;//这就是之前说的了,统计元素个数的时候是baseCount+counterCells数组的每一个元素的value值if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;}

transfer

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;//设置任务,n>>>3 /NCPU(cpu个数)如果小于16,那么设置16//目的是让每个cpu处理的数量一样多,避免任务分配不均匀if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide range//没初始化的话if (nextTab == null) {            // initiatingtry {//扩容为两倍@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) {      // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}nextTable = nextTab;//这个是转移下标,用以table的数组下标的transferIndex = n;}int nextn = nextTab.length;//这是一个占位节点,当table的一个位置节点完成迁移后,设置这个代表完成了,这个节点的hash值为-1ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//推进标识boolean advance = true;//完成标识boolean finishing = false; // to ensure sweep before committing nextTabfor (int i = 0, bound = 0;;) {Node<K,V> f; int fh;while (advance) {int nextIndex, nextBound;if (--i >= bound || finishing)advance = false;//复制nextIndex,注意这里修改了nextIndex 下面使用else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}//首次进入这里,cas修改转移下标,为当前线程分配任务//也就是负责转移的区间 为nextBound到nextIndex,修改完成修改advance,表示需要转移这一段区间,不用往前推进else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;i = nextIndex - 1;advance = false;}}if (i < 0 || i >= n || i + n >= nextn) {int sc;//如果是完成了,修改nextTable为null,修改阈值if (finishing) {nextTable = null;table = nextTab;sizeCtl = (n << 1) - (n >>> 1);return;}//来到这里,cas修改sizeCtl-1if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {//如果sc-2 == resizeStamp(n) << RESIZE_STAMP_SHIFT),说明这是最后一个在进行扩容的线程了,别的都完成了,修改finishing//如果不等,那直接结束就行了if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;finishing = advance = true;i = n; // recheck before commit}}//如果当前位置为null,cas设置这个位置为fwdelse if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);else if ((fh = f.hash) == MOVED)advance = true; // already processedelse {synchronized (f) {if (tabAt(tab, i) == f) {//ln代表低位,hn代表高位,这个高低位是这么个说法//因为容量一直都是2的整数次幂,那么扩容为两倍后,原本hash取模容量落在这个位置 可能 会落在这个位置 +n的位置,那么就把这个的所有node分为两部分,一部分是留在这个位置的,一部分就在新位置Node<K,V> ln, hn;//同样,如果是链表if (fh >= 0) {//&操作是都为1才为1,n是数组长度,为2的整数次幂,也就是说,第x位为1,如10 = 2,100 = 4分别第二位//为1,第三位为1,这里就是可以得到一个第x位为1或为0,对应低位与高位int runBit = fh & n;//之后的节点都是跟在这个节点后面的,也就是runBit与这个一样Node<K,V> lastRun = f;//这个循环就是为了获得最后一位node与它的runBitfor (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}//低位if (runBit == 0) {ln = lastRun;hn = null;}//高位else {hn = lastRun;ln = null;}//构造一个ln链与hn链for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}//分别设置在新数组的原位跟原位+nsetTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);//标记为处理完成setTabAt(tab, i, fwd);advance = true;}else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}}}}}}

helpTransfer(putVal中调用)

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;//判断是否仍在进行扩容,nextTab == null说明扩容结束了,transfer方法中扩容完成设置了nextTab = nullif (tab != null && (f instanceof ForwardingNode) &&(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {//这下面跟addCount后面的逻辑基本一样了int rs = resizeStamp(tab.length);while (nextTab == nextTable && table == tab &&(sc = sizeCtl) < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || transferIndex <= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {transfer(tab, nextTab);break;}}return nextTab;}return table;
}

tryPresize(treeifyBin中调用)

 private final void tryPresize(int size) {//防止传入的值不是2的整数次幂int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :tableSizeFor(size + (size >>> 1) + 1);int sc;while ((sc = sizeCtl) >= 0) {Node<K,V>[] tab = table; int n;//这段与initTable代码一致if (tab == null || (n = tab.length) == 0) {n = (sc > c) ? sc : c;if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if (table == tab) {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = nt;sc = n - (n >>> 2);}} finally {sizeCtl = sc;}}}//如果别人扩容完了,就退出else if (c <= sc || n >= MAXIMUM_CAPACITY)break;//这段与addCount后面代码一样。else if (tab == table) {int rs = resizeStamp(n);if (sc < 0) {Node<K,V>[] nt;if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);}}}

总结

put方法会调用putVal,putVal方法中,会执行initTable,协助扩容helpTransfer,设置值。
设置值之后判断链表长度,如果大于8并且table容量大于64,会转化为红黑树,如果小于64会调用tryPresize方法进行扩容。
putVal最后会执行addCount方法,表示添加了一个元素

addCount方法中如果counterCells数组为null或者cas修改baseCount失败或者cas修改counterCells数组中一个元素的值失败,就会调用fullAddCount方法;sumCount就是统计元素个数,采用baseCount+counterCells数组所有元素的value值来统计,最后会判断是否需要扩容,需要会调用transfer方法。

fullAddCount包含初始化counterCells数组与扩容与设置baseCount,总之就是添加元素了。

transfer方法采用多线程协助扩容


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部