ConcurrentHashMap
概述JDK1.7
一个CHM由默认16个分段组成,每一个分段默认由16个HashEntry组成(类似于16个HashMap组成一个CHM)
- 利用分段锁的技术,实现线程安全的分段数级别的并发写,put、remove、clear的时候加锁,size、containsValue可能会加锁
- HashEntry里面的value和Next都用Volatile修饰了,保证了可见性,但是牺牲的是弱一致性:同时对某一个key键值对进行读写的时候,可能读不到正在写的值(1.6更明显,因为对count进行判断,JDK1.7做了改善,但是依然无法保证强一致性)
方法的一句话总结1.7
- put方法,Segment加锁,操作完之后再释放锁,可能会触发rehash(),保证了线程安全
- get方法,getObjectVolatile的方式尝试获取最新的值,其中table、HashEntry.value、HashEntry.next都是volatile修饰的
- rehash方法,触发的时机是达到size*loadfactor,和HashMap不一样。只独立的发生在一个Segment里面,table的大小翻倍,然后遍历获取首节点,挨个放到新数组中,是从前面开始移到链表的末位
- remove方法,加锁的方式,找到就删除
- size方法:只要在一小段连续的时间内CHM未发生改动,该sum(count)值就是size值,如果变动很频繁,直接锁住,求size
- containsValue方法遍历一次结果集,如果找到了就直接返回true,未找到则last=sum(modCount),再check遍历一次,如果2次sum一样,仍然未匹配到就返回false,超过次数后就加锁遍历。
- isEmpty方法:sum(modcount)为0肯定为空,check一次sum(modcount)一样且count都为0就是空。最多遍历2次。
- clear方法: 加锁把每个Segment的table的首位entry设置为null,把Segment的count归置为0,++modCount
版本差异JDK1.7 VS 1.8
- JDK1.7的 CHM采取的是分段锁技术,put的时候,trylock()的是Segment。JDK1.8的 CHM采取的是CAS + synchronized 来保证并发安全性,CAS操作初始化table的时候以及put时FirstNode为空的时候,synchronized加锁给FiestNode不为空的时候,而且是双重校验判断加锁的形式。
- JDK1.7追加新结点的方式是在tab[index]的首位追加,而JDK1.8是在末位追加
- JDK1.8的bucket当小于阈值(8和6)时是单向链表,大于阈值时是红黑树
- JDK1.7中新的 threshold = (int)(newCapacity * loadFactor);但是在JDK1.8中,就直接写死了, sizeCtl = (n << 1) - (n >>> 1); //调sizeCtl为新容量0.75倍。
- JDK1.7中其实也用到了CAS策略,ensureSegment()方法里面,判断当前非第一个Segment是否已经初始化的时候。只是JDK1.8中CAS用的更多了而已。
ConcurrentHashMap 的弱一致性
比如get()、size()、isEmpty()、containsValue()等等方法,都无法保证强一致性(读到的结果就是最最最新的结果),但是HashTable可以做到,牺牲了并发的性能,但是维护了强一致性
CHM的结构JDK1.7
- segments是由final修饰的,所以实在CHM初始化一旦设置了,就不会再变的,也就是CHM的并发读是恒定的,不会随着扩容而变化,每次扩容,都是单个Segment独立完成的操作final int segmentMask;final int segmentShift;final Segment<K,V>[] segments;transient Set<K> keySet;transient Set<Map.Entry<K,V>> entrySet;transient Collection<V> values;
HashEntry JDK1.7
HashMap 非常类似,唯一的区别就是其中的核心数据如 value ,以及链表都是 volatile 修饰的,保证了获取时的可见性。
JDK1.7 put源码
|
rehash(Node) 扩容 1.7
- table的大小翻倍,然后遍历老的table,获取首节点,挨个遍历放到新数组中,是从前面开始移到末位
- 因为是2倍扩容,所以index[i]下标里面的所有entry只可能是2种位置 100001的位置或者000001的位置,JDK做了一个优化,判断lastRun
|
JDK1.7 size()函数
参数解释:
- size:整个CHM的总数量
- sum:所有Segment中的modCount的总和
- modCount:该Segment中被修改的次数
- count:该Segment中节点的个数
逻辑
只要在一小段连续的时间内CHM未发生改动,该sum(count)值就是size值,如果变动很频繁,直接锁住,求size
- 循环遍历所有非空的Segments,只有连续2次的sum(modcount)一样的话,才把sum(count)作为总size。
- 超过尝试次数,第四次循环的时候开始对所有Segment加锁来求Sizepublic int size() {// Try a few times to get accurate count. On failure due to// continuous async changes in table, resort to locking.final Segment<K,V>[] segments = this.segments;int size;boolean overflow; // true if size overflows 32 bitslong sum; // sum of modCountslong last = 0L; // previous sumint retries = -1; // first iteration isn't retrytry {for (;;) {if (retries++ == RETRIES_BEFORE_LOCK) {for (int j = 0; j < segments.length; ++j)ensureSegment(j).lock(); // force creation}sum = 0L;size = 0;overflow = false;for (int j = 0; j < segments.length; ++j) {Segment<K,V> seg = segmentAt(segments, j);if (seg != null) {sum += seg.modCount;int c = seg.count;if (c < 0 || (size += c) < 0)overflow = true;}}if (sum == last)break;last = sum;}} finally {if (retries > RETRIES_BEFORE_LOCK) {for (int j = 0; j < segments.length; ++j)segmentAt(segments, j).unlock();}}return overflow ? Integer.MAX_VALUE : size;}
containsValue()函数JDK1.7
和size()函数的思想一样,遍历一次结果集,如果找到了就直接返回true,未找到则last=sum(modCount),再check遍历一次,如果2次sum一样,仍然未匹配到就返回false,超过次数后就加锁遍历。
isEmpty()函数JDK1.7
- CHM是一个空的且未被操作过的CHM,所有Segment的count为0,且modcount也全为0
- CHM是一个空的但之前有改动过,需要check一下,一小段连续的时间该CHM稳定(modcount未变,且count依旧为0),所有Segment的count为0,但modCount不为0,比如有的线程执行了put,但是又的线程又remove了,此时count为0,或者很多操作,最后clear()了public boolean isEmpty() {long sum = 0L;final Segment<K,V>[] segments = this.segments;for (int j = 0; j < segments.length; ++j) {Segment<K,V> seg = segmentAt(segments, j);if (seg != null) {if (seg.count != 0)return false;sum += seg.modCount;}}if (sum != 0L) { // recheck unless no modificationsfor (int j = 0; j < segments.length; ++j) {Segment<K,V> seg = segmentAt(segments, j);if (seg != null) {if (seg.count != 0)return false;sum -= seg.modCount;}}if (sum != 0L)return false;}return true;}
clear() 清空CHM函数JDK1.7
- 加锁的
- 把每个Segment的table的首位entry设置为null
- 把Segment的count归置为0,++modCountfinal void clear() {lock();try {HashEntry<K,V>[] tab = table;for (int i = 0; i < tab.length ; i++)setEntryAt(tab, i, null);++modCount;count = 0;} finally {unlock();}}
SegmentJDK1.7
|
Segment的modCount:
该Segment被修改的总次数(The total number of mutative operations in this segment)
- put且新增的时候会 ++modcount
- remove的时候会 ++modcount
- replace的时候会 ++modcount
- clear的时候会 ++modcount
Segment的初始化
- 初始化CHM的时候,会指定segments[]为一个16数组,且只初始化segments[0],这样提升了性能,不用CHM初始化的时候,初始所有的Sements[],而是用到的时候再去初始化
- 里面用到了CAS,防止多个线程同时初始化这个segmentprivate Segment<K,V> ensureSegment(int k) {final Segment<K,V>[] ss = this.segments;long u = (k << SSHIFT) + SBASE; // raw offsetSegment<K,V> seg;if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {Segment<K,V> proto = ss[0]; // use segment 0 as prototypeint cap = proto.table.length;float lf = proto.loadFactor;int threshold = (int)(cap * lf);HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) { // recheckSegment<K,V> s = new Segment<K,V>(lf, threshold, tab);while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) {if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))break;}}}return seg;}
JDK1.8 特性
JDK1.8 Put方法
- 引入了binCount参数,纪录该tab[index]下面的结点个数,>8触发链表变树、<6触发树变链表final V putVal(K key, V value, boolean onlyIfAbsent) {//如果key为null直接抛出空指针if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;//如果table为空,就初始化table,初始化的方法里面也有原子操作,保证不会同时被初始化if (tab == null || (n = tab.length) == 0)tab = initTable();else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);else {V oldVal = null;synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) {binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount);return null;}
JDK1.8时的initTable函数
注意sizeCtl是全局变量,初始化之后 sizeCtl=3/4 * capacity;
当p=tab[index]为空的时候,不加锁的CAS的方式设置entry(如果该位置为null就设置为当前的node),成功的话,就直接break;不成功的话,就继续无限循环到成功为止
JDK1.8的一些参数
|
JDK1.8中CAS的用法
初始化table的时候,如果sizeCtl为0表示未进行初始化, 为-1表示其它线程正在进行初始化,其它值表示已经初始化了,而且是valetile的全局变量。
如果未进行初始化则, CAS的方式尝试归置为 -1,成功的话,再进行一次校验,其实没有必要了,这和加锁不一样,加锁有重复执行的风险,但是CAS只会执行一次。private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {if ((sc = sizeCtl) < 0)Thread.yield(); // lost initialization race; just spinelse if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;sc = n - (n >>> 2);}} finally {sizeCtl = sc;}break;}}return tab;}当插入一个元素的时候 table[index]==null的时候,也会进行CAS操作,来设置该Node为当前put的node值
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}
addCount(1,bincount)
如果table[index]为null的时候 bincount=0;
触发扩容的条件:
- put后的size>=sizeCtl
- table不为空
- table的大小小于最大容量private final void addCount(long x, int check) {//b=baseCount 原大小, s=b+x 表示put后的大小CounterCell[] as; long b, s;if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {fullAddCount(x, uncontended);return;}if (check <= 1)return;s = sumCount();}if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n);if (sc < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);s = sumCount();}}}
核心函数 resizeStamp(n); n为table.length
|