ConcurrentHashMap一生之敌
ConcurrentHashMap一生之敌
已经是不得不掌握的一个容器了
1.7
- Segment数组,Segment里面包括HashEntry
- HashEntry组成
- 是数组加链表
final Segment<K,V>[] segments;
transient Set<K> keySet;
transient Set<Map.Entry<K,V>> entrySet;
Segment组成
static final class Segment<K,V> extends ReentrantLock implements Serializable {private static final long serialVersionUID = 2249069246763182397L;// 和 HashMap 中的 Node数组 作用一样,真正存放数据的桶//volatile修饰transient volatile HashEntry<K,V>[] table;transient int count;transient int modCount;transient int threshold;final float loadFactor;}
static final calss HashEntry<K,V>{final int hash,final K key;//保证可见性,主要是为了让读操作不用加锁,但是写操作还是要加锁的volatile V value;volatile HashEntry<K,V> next;
}
ConcurrentHashMap 采用了**「分段锁」**技术,其中 Segment 继承于 ReentrantLock。
不会像HashTable那样不管是put还是get操作都需要做同步处理,理论上 ConcurrentHashMap 支持 CurrencyLevel (Segment 数组数量)的线程并发。
「每当一个线程占用锁访问一个 Segment 时,不会影响到其他的 Segment」。
put
public V put(K key, V value) {Segment<K,V> s;if (value == null)throw new NullPointerException();int hash = hash(key);int j = (hash >>> segmentShift) & segmentMask;if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegments = ensureSegment(j); //初始化数组//这个函数中检查了三次segments[j]是否==null,但也不能完全保证,所以最后用CAS操作保证写回操作只执行一次 return s.put(key, hash, value, false);
}
上面的代码都没有加锁,通过key定位到Segment,之后在对应的Segment中进行具体的put
final V put(K key, int hash, V value, boolean onlyIfAbsent) {//尝试加锁HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value); // 1. 加锁处理 V oldValue;try {HashEntry<K,V>[] tab = table;int index = (tab.length - 1) & hash;HashEntry<K,V> first = entryAt(tab, index);for (HashEntry<K,V> e = first;;) {if (e != null) {K k;if ((k = e.key) == key || // 2. 遍历该HashEntry,如果不为空则判断传入的key和当前遍历的key是否相等,相等则覆盖旧的value(e.hash == hash && key.equals(k))) {oldValue = e.value;if (!onlyIfAbsent) {e.value = value;++modCount;}break;}e = e.next;}else { // 3. 不为空则需要新建一个HashEntry并加入到Segment中,同时会先判断是否需要扩容if (node != null)node.setNext(first);elsenode = new HashEntry<K,V>(hash, key, value, first);int c = count + 1;if (c > threshold && tab.length < MAXIMUM_CAPACITY)rehash(node);elsesetEntryAt(tab, index, node);++modCount;count = c;oldValue = null;break;}}} finally {unlock(); // 4. 解锁}return oldValue;
}
- 「虽然HashEntry中的value是用volatile关键字修饰的,但是并不能保证并发的原子性,所以put操作仍然需要加锁处理」。
- 「首先第一步的时候会尝试获取锁,如果获取失败肯定就是其他线程存在竞争,则利用
scanAndLockForPut()自旋获取锁」。- 尝试获取自旋锁
- 如果重试的次数达到了
MAX_SCAN_RETRIES则改为**「阻塞锁获取」**,保证能获取成功。 - 在自旋的过程中遍历链表,若发现没有重复的节点,则提前新建一个节点,为后面再插入节省时间。
总的来说:
- 将当前的Segment中的table通过key的hashcode定位到HashEntry
- 遍历该HashEntry,如果不为空则判断传入的key和当前遍历的key是否相等,相等则覆盖旧的value
- 不为空则需要新建一个HashEntry并加入到Segment中,同时会先判断是否需要扩容
- 最后会解除在1中所获取当前Segment的锁。
get
public V get(Object key) {Segment<K,V> s; // manually integrate access methods to reduce overheadHashEntry<K,V>[] tab;int h = hash(key);long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&(tab = s.table) != null) {for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);e != null; e = e.next) {K k;if ((k = e.key) == key || (e.hash == h && key.equals(k)))return e.value;}}return null;
}
- 只需要将 Key 通过 Hash 之后定位到具体的 Segment ,再通过一次 Hash 定位到具体的元素上。
- 由于 HashEntry 中的 value 属性是用 volatile 关键词修饰的,保证了内存可见性,所以每次获取时都是最新值。
- ConcurrentHashMap 的 get 方法是非常高效的,「因为整个过程都不需要加锁」。
扩容
每个Segment内部在超过一定阈值后,内部会进行扩容。
- 首先我们要知道,由于整个数组的长度是2的整数次方,每次按2倍扩容,而hash函数就是对数组长度取模,当元素之前处于第i个位置,再次hash时,必然处于第i个或者第i+oldCapacity个位置。
- 在第一个条件的基础上我们就知道一个数组里只有两种位置了,所以这里扩容先从后往前记录了一直连续相同位置的节点位置,遍历到位置不同为止,然后一把把lastRun复制过去,只能说是能省一点省一点吧。
1.8
我们考虑一下1.7版本中分断锁有哪些好处
- 两次hash,减少Hash冲突,避免一个槽中有太多元素,加快速度
- 提高线程并发数,segment与segment互相独立
- 提供扩容的并发度。每次segment独立扩容
那升级后,相对应的怎么处理这些问题呢?
- 使用红黑树
- 加锁的粒度改为对Node数组的长度
- 并发扩容
1.8即降低了Hash冲突,也提升了并发度。
初始化
private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spinelse if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //将sizectl设置为-1try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;sc = n - (n >>> 2); //表示下一次扩容的阈值}} finally {sizeCtl = sc;}break;}}return tab;}
多个线程只有一个能初始化,是通过CAS操作的。
Put
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;if (tab == null || (n = tab.length) == 0)tab = initTable(); //整个数组初始话else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f); //并发扩容else {V oldVal = null;synchronized (f) { //加锁if (tabAt(tab, i) == f) {if (fh >= 0) {binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i); //超过8尝试树化if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount); return null;}
扩容
private final void treeifyBin(Node<K,V>[] tab, int index) {Node<K,V> b; int n, sc;if (tab != null) {if ((n = tab.length) < MIN_TREEIFY_CAPACITY)//还没到64,所以先去扩容tryPresize(n << 1);else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {synchronized (b) {if (tabAt(tab, index) == b) {TreeNode<K,V> hd = null, tl = null;for (Node<K,V> e = b; e != null; e = e.next) {TreeNode<K,V> p =new TreeNode<K,V>(e.hash, e.key, e.val,null, null);if ((p.prev = tl) == null)hd = p;elsetl.next = p;tl = p;}setTabAt(tab, index, new TreeBin<K,V>(hd));}}}}}
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide rangeif (nextTab == null) { // initiatingtry {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //反正是扩容两倍nextTab = nt;} catch (Throwable ex) { // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}nextTable = nextTab;transferIndex = n; //用于标记进度,初始值为旧的HashMap的数组长度}int nextn = nextTab.length;ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);boolean advance = true;boolean finishing = false; // to ensure sweep before committing nextTabfor (int i = 0, bound = 0;;) {Node<K,V> f; int fh;while (advance) {int nextIndex, nextBound;if (--i >= bound || finishing)advance = false;else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;i = nextIndex - 1;advance = false;}}if (i < 0 || i >= n || i + n >= nextn) {int sc;if (finishing) {nextTable = null;table = nextTab;sizeCtl = (n << 1) - (n >>> 1);return;}if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;finishing = advance = true;i = n; // recheck before commit}}else if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);else if ((fh = f.hash) == MOVED)advance = true; // already processedelse {synchronized (f) {if (tabAt(tab, i) == f) {Node<K,V> ln, hn;if (fh >= 0) {int runBit = fh & n;Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}}}}}}
俺不会了,先不写了。
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
