Contents
  1. 1. 概述JDK1.7
  2. 2. 方法的一句话总结1.7
  3. 3. 版本差异JDK1.7 VS 1.8
  4. 4. ConcurrentHashMap 的弱一致性
  5. 5. CHM的结构JDK1.7
  6. 6. HashEntry JDK1.7
  7. 7. JDK1.7 put源码
  8. 8. rehash(Node) 扩容 1.7
  9. 9. JDK1.7 size()函数
    1. 9.1. 参数解释:
    2. 9.2. 逻辑
  10. 10. containsValue()函数JDK1.7
  11. 11. isEmpty()函数JDK1.7
  12. 12. clear() 清空CHM函数JDK1.7
  13. 13. SegmentJDK1.7
    1. 13.1. Segment的modCount:
    2. 13.2. Segment的初始化
  14. 14. JDK1.8 特性
    1. 14.1. JDK1.8 Put方法
    2. 14.2. JDK1.8时的initTable函数
    3. 14.3. JDK1.8的一些参数
    4. 14.4. JDK1.8中CAS的用法
    5. 14.5. addCount(1,bincount)
    6. 14.6. 核心函数 resizeStamp(n); n为table.length

概述JDK1.7

一个CHM由默认16个分段组成,每一个分段默认由16个HashEntry组成(类似于16个HashMap组成一个CHM)

  • 利用分段锁的技术,实现线程安全的分段数级别的并发写,put、remove、clear的时候加锁,size、containsValue可能会加锁
  • HashEntry里面的value和Next都用Volatile修饰了,保证了可见性,但是牺牲的是弱一致性:同时对某一个key键值对进行读写的时候,可能读不到正在写的值(1.6更明显,因为对count进行判断,JDK1.7做了改善,但是依然无法保证强一致性)

方法的一句话总结1.7

  1. put方法,Segment加锁,操作完之后再释放锁,可能会触发rehash(),保证了线程安全
  2. get方法,getObjectVolatile的方式尝试获取最新的值,其中table、HashEntry.value、HashEntry.next都是volatile修饰的
  3. rehash方法,触发的时机是达到size*loadfactor,和HashMap不一样。只独立的发生在一个Segment里面,table的大小翻倍,然后遍历获取首节点,挨个放到新数组中,是从前面开始移到链表的末位
  4. remove方法,加锁的方式,找到就删除
  5. size方法:只要在一小段连续的时间内CHM未发生改动,该sum(count)值就是size值,如果变动很频繁,直接锁住,求size
  6. containsValue方法遍历一次结果集,如果找到了就直接返回true,未找到则last=sum(modCount),再check遍历一次,如果2次sum一样,仍然未匹配到就返回false,超过次数后就加锁遍历。
  7. isEmpty方法:sum(modcount)为0肯定为空,check一次sum(modcount)一样且count都为0就是空。最多遍历2次。
  8. clear方法: 加锁把每个Segment的table的首位entry设置为null,把Segment的count归置为0,++modCount

版本差异JDK1.7 VS 1.8

  • JDK1.7的 CHM采取的是分段锁技术,put的时候,trylock()的是Segment。JDK1.8的 CHM采取的是CAS + synchronized 来保证并发安全性,CAS操作初始化table的时候以及put时FirstNode为空的时候,synchronized加锁给FiestNode不为空的时候,而且是双重校验判断加锁的形式。
  • JDK1.7追加新结点的方式是在tab[index]的首位追加,而JDK1.8是在末位追加
  • JDK1.8的bucket当小于阈值(8和6)时是单向链表,大于阈值时是红黑树
  • JDK1.7中新的 threshold = (int)(newCapacity * loadFactor);但是在JDK1.8中,就直接写死了, sizeCtl = (n << 1) - (n >>> 1); //调sizeCtl为新容量0.75倍。
  • JDK1.7中其实也用到了CAS策略,ensureSegment()方法里面,判断当前非第一个Segment是否已经初始化的时候。只是JDK1.8中CAS用的更多了而已。

ConcurrentHashMap 的弱一致性

比如get()、size()、isEmpty()、containsValue()等等方法,都无法保证强一致性(读到的结果就是最最最新的结果),但是HashTable可以做到,牺牲了并发的性能,但是维护了强一致性

CHM的结构JDK1.7

  • segments是由final修饰的,所以实在CHM初始化一旦设置了,就不会再变的,也就是CHM的并发读是恒定的,不会随着扩容而变化,每次扩容,都是单个Segment独立完成的操作
    final int segmentMask;
    final int segmentShift;
    final Segment<K,V>[] segments;
    transient Set<K> keySet;
    transient Set<Map.Entry<K,V>> entrySet;
    transient Collection<V> values;

HashEntry JDK1.7

HashMap 非常类似,唯一的区别就是其中的核心数据如 value ,以及链表都是 volatile 修饰的,保证了获取时的可见性。

static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}

JDK1.7 put源码

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}

rehash(Node) 扩容 1.7

  • table的大小翻倍,然后遍历老的table,获取首节点,挨个遍历放到新数组中,是从前面开始移到末位
  • 因为是2倍扩容,所以index[i]下标里面的所有entry只可能是2种位置 100001的位置或者000001的位置,JDK做了一个优化,判断lastRun
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
//这里的capacity指的是table的长度,不是size啊,别搞错了
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
//遍历老的table
for (int i = 0; i < oldCapacity ; i++) {
//然后一个个处理firstEntry,首节点为空就不处理
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
//先计算first的新数组中的下标,如果next为空,该oldTable[i]扩容完成
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
//如果该位置不止一个entry,lastRun为当前节点
HashEntry<K,V> lastRun = e;
//lastIndex第一次为首位节点在新数组中的位置 可能是A 可能是B
int lastIdx = idx;
for (HashEntry<K,V> last = next;last != null;last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
//把该链表中末位的一串新下标的entry,赋值到未初始化的新数组中的位置
newTable[lastIdx] = lastRun;
//然后遍历该firstEntry,从前到后,挨个归置到新桶当中
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}

JDK1.7 size()函数

参数解释:

  1. size:整个CHM的总数量
  2. sum:所有Segment中的modCount的总和
  3. modCount:该Segment中被修改的次数
  4. count:该Segment中节点的个数

逻辑

只要在一小段连续的时间内CHM未发生改动,该sum(count)值就是size值,如果变动很频繁,直接锁住,求size

  • 循环遍历所有非空的Segments,只有连续2次的sum(modcount)一样的话,才把sum(count)作为总size。
  • 超过尝试次数,第四次循环的时候开始对所有Segment加锁来求Size
    public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum; // sum of modCounts
    long last = 0L; // previous sum
    int retries = -1; // first iteration isn't retry
    try {
    for (;;) {
    if (retries++ == RETRIES_BEFORE_LOCK) {
    for (int j = 0; j < segments.length; ++j)
    ensureSegment(j).lock(); // force creation
    }
    sum = 0L;
    size = 0;
    overflow = false;
    for (int j = 0; j < segments.length; ++j) {
    Segment<K,V> seg = segmentAt(segments, j);
    if (seg != null) {
    sum += seg.modCount;
    int c = seg.count;
    if (c < 0 || (size += c) < 0)
    overflow = true;
    }
    }
    if (sum == last)
    break;
    last = sum;
    }
    } finally {
    if (retries > RETRIES_BEFORE_LOCK) {
    for (int j = 0; j < segments.length; ++j)
    segmentAt(segments, j).unlock();
    }
    }
    return overflow ? Integer.MAX_VALUE : size;
    }

containsValue()函数JDK1.7

和size()函数的思想一样,遍历一次结果集,如果找到了就直接返回true,未找到则last=sum(modCount),再check遍历一次,如果2次sum一样,仍然未匹配到就返回false,超过次数后就加锁遍历。

isEmpty()函数JDK1.7

  • CHM是一个空的且未被操作过的CHM,所有Segment的count为0,且modcount也全为0
  • CHM是一个空的但之前有改动过,需要check一下,一小段连续的时间该CHM稳定(modcount未变,且count依旧为0),所有Segment的count为0,但modCount不为0,比如有的线程执行了put,但是又的线程又remove了,此时count为0,或者很多操作,最后clear()了
    public boolean isEmpty() {
    long sum = 0L;
    final Segment<K,V>[] segments = this.segments;
    for (int j = 0; j < segments.length; ++j) {
    Segment<K,V> seg = segmentAt(segments, j);
    if (seg != null) {
    if (seg.count != 0)
    return false;
    sum += seg.modCount;
    }
    }
    if (sum != 0L) { // recheck unless no modifications
    for (int j = 0; j < segments.length; ++j) {
    Segment<K,V> seg = segmentAt(segments, j);
    if (seg != null) {
    if (seg.count != 0)
    return false;
    sum -= seg.modCount;
    }
    }
    if (sum != 0L)
    return false;
    }
    return true;
    }

clear() 清空CHM函数JDK1.7

  • 加锁的
  • 把每个Segment的table的首位entry设置为null
  • 把Segment的count归置为0,++modCount
    final void clear() {
    lock();
    try {
    HashEntry<K,V>[] tab = table;
    for (int i = 0; i < tab.length ; i++)
    setEntryAt(tab, i, null);
    ++modCount;
    count = 0;
    } finally {
    unlock();
    }
    }

SegmentJDK1.7

transient volatile HashEntry<K,V>[] table;
transient int count;
transient int modCount;
transient int threshold;

Segment的modCount:

该Segment被修改的总次数(The total number of mutative operations in this segment)

  • put且新增的时候会 ++modcount
  • remove的时候会 ++modcount
  • replace的时候会 ++modcount
  • clear的时候会 ++modcount

Segment的初始化

  • 初始化CHM的时候,会指定segments[]为一个16数组,且只初始化segments[0],这样提升了性能,不用CHM初始化的时候,初始所有的Sements[],而是用到的时候再去初始化
  • 里面用到了CAS,防止多个线程同时初始化这个segment
    private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
    Segment<K,V> proto = ss[0]; // use segment 0 as prototype
    int cap = proto.table.length;
    float lf = proto.loadFactor;
    int threshold = (int)(cap * lf);
    HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
    == null) { // recheck
    Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
    while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
    == null) {
    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
    break;
    }
    }
    }
    return seg;
    }

JDK1.8 特性

JDK1.8 Put方法

  • 引入了binCount参数,纪录该tab[index]下面的结点个数,>8触发链表变树、<6触发树变链表
    final V putVal(K key, V value, boolean onlyIfAbsent) {
    //如果key为null直接抛出空指针
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
    Node<K,V> f; int n, i, fh;
    //如果table为空,就初始化table,初始化的方法里面也有原子操作,保证不会同时被初始化
    if (tab == null || (n = tab.length) == 0)
    tab = initTable();
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null,
    new Node<K,V>(hash, key, value, null)))
    break; // no lock when adding to empty bin
    }
    else if ((fh = f.hash) == MOVED)
    tab = helpTransfer(tab, f);
    else {
    V oldVal = null;
    synchronized (f) {
    if (tabAt(tab, i) == f) {
    if (fh >= 0) {
    binCount = 1;
    for (Node<K,V> e = f;; ++binCount) {
    K ek;
    if (e.hash == hash &&
    ((ek = e.key) == key ||
    (ek != null && key.equals(ek)))) {
    oldVal = e.val;
    if (!onlyIfAbsent)
    e.val = value;
    break;
    }
    Node<K,V> pred = e;
    if ((e = e.next) == null) {
    pred.next = new Node<K,V>(hash, key,
    value, null);
    break;
    }
    }
    }
    else if (f instanceof TreeBin) {
    Node<K,V> p;
    binCount = 2;
    if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
    value)) != null) {
    oldVal = p.val;
    if (!onlyIfAbsent)
    p.val = value;
    }
    }
    }
    }
    if (binCount != 0) {
    if (binCount >= TREEIFY_THRESHOLD)
    treeifyBin(tab, i);
    if (oldVal != null)
    return oldVal;
    break;
    }
    }
    }
    addCount(1L, binCount);
    return null;
    }

JDK1.8时的initTable函数

注意sizeCtl是全局变量,初始化之后 sizeCtl=3/4 * capacity;
当p=tab[index]为空的时候,不加锁的CAS的方式设置entry(如果该位置为null就设置为当前的node),成功的话,就直接break;不成功的话,就继续无限循环到成功为止

if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

JDK1.8的一些参数

//用来计数,volatile
private transient volatile long baseCount;
//判断resize的参数
private transient volatile int sizeCtl;

JDK1.8中CAS的用法

  1. 初始化table的时候,如果sizeCtl为0表示未进行初始化, 为-1表示其它线程正在进行初始化,其它值表示已经初始化了,而且是valetile的全局变量。
    如果未进行初始化则, CAS的方式尝试归置为 -1,成功的话,再进行一次校验,其实没有必要了,这和加锁不一样,加锁有重复执行的风险,但是CAS只会执行一次。

    private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
    if ((sc = sizeCtl) < 0)
    Thread.yield(); // lost initialization race; just spin
    else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
    try {
    if ((tab = table) == null || tab.length == 0) {
    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
    @SuppressWarnings("unchecked")
    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
    table = tab = nt;
    sc = n - (n >>> 2);
    }
    } finally {
    sizeCtl = sc;
    }
    break;
    }
    }
    return tab;
    }
  2. 当插入一个元素的时候 table[index]==null的时候,也会进行CAS操作,来设置该Node为当前put的node值

    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
    break; // no lock when adding to empty bin
    }

addCount(1,bincount)

如果table[index]为null的时候 bincount=0;
触发扩容的条件:

  • put后的size>=sizeCtl
  • table不为空
  • table的大小小于最大容量
    private final void addCount(long x, int check) {
    //b=baseCount 原大小, s=b+x 表示put后的大小
    CounterCell[] as; long b, s;
    if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
    CounterCell a; long v; int m;
    boolean uncontended = true;
    if (as == null || (m = as.length - 1) < 0 ||
    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
    !(uncontended =
    U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
    fullAddCount(x, uncontended);
    return;
    }
    if (check <= 1)
    return;
    s = sumCount();
    }
    if (check >= 0) {
    Node<K,V>[] tab, nt; int n, sc;
    while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
    (n = tab.length) < MAXIMUM_CAPACITY) {
    int rs = resizeStamp(n);
    if (sc < 0) {
    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
    transferIndex <= 0)
    break;
    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
    transfer(tab, nt);
    }
    else if (U.compareAndSwapInt(this, SIZECTL, sc,
    (rs << RESIZE_STAMP_SHIFT) + 2))
    transfer(tab, null);
    s = sumCount();
    }
    }
    }

核心函数 resizeStamp(n); n为table.length

static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
Contents
  1. 1. 概述JDK1.7
  2. 2. 方法的一句话总结1.7
  3. 3. 版本差异JDK1.7 VS 1.8
  4. 4. ConcurrentHashMap 的弱一致性
  5. 5. CHM的结构JDK1.7
  6. 6. HashEntry JDK1.7
  7. 7. JDK1.7 put源码
  8. 8. rehash(Node) 扩容 1.7
  9. 9. JDK1.7 size()函数
    1. 9.1. 参数解释:
    2. 9.2. 逻辑
  10. 10. containsValue()函数JDK1.7
  11. 11. isEmpty()函数JDK1.7
  12. 12. clear() 清空CHM函数JDK1.7
  13. 13. SegmentJDK1.7
    1. 13.1. Segment的modCount:
    2. 13.2. Segment的初始化
  14. 14. JDK1.8 特性
    1. 14.1. JDK1.8 Put方法
    2. 14.2. JDK1.8时的initTable函数
    3. 14.3. JDK1.8的一些参数
    4. 14.4. JDK1.8中CAS的用法
    5. 14.5. addCount(1,bincount)
    6. 14.6. 核心函数 resizeStamp(n); n为table.length