ConcurrentHashMap那些事儿

ConcurrentHashMap那些事儿

薛定谔的汪

今天回顾总结下ConcurrentHashMap的一些东西,众所周知,HashMap 不是线程安全的,而ConcurrentHashMap 是线程安全的,所以前阵子总结完多线程的相关知识点后再分析 ConcurrentHashMap 就会轻松很多。

ConcurrentHashMap

ConcurrentHashMap在jdk7和jdk8中的实现是不同的。

在jdk7中 ConcurrentHashMap 是由一个个Segment 组成,默认是16个,构成一个Segment数组,Segment通过继承自ReentrantLock来进行加锁,这样每次加锁锁住的都是一个Segment,Segment 数在初始化ConcurrentHashMap时是可以指定的,但是一旦设置后不可更改。

每个 Segment 更像是一个 HashMap,这样利用锁分段的技术来实现线程安全。

jdk8对 ConcurrentHashMap 进行了优化,抛弃了 jdk7的“锁分段”,而是采用 CAS+synchronize 提高了性能,同时保证线程安全(jdk8的 HashMap 使用了红黑树,jdk8的 ConcurrentHashMap 也一样)。

jdk8之后未对 ConcurrentHashMap 改动,现在互联网公司很少在用 jdk7或以下版本了,9月25日 jdk11 都快要出了,目前公司所用 jdk 版本也是8,所以分析 ConcurrentHashMap 就以jdk8为例。

结构图

jdk8 中 ConcurrentHashMap 的结构与 HashMap 的结构大体一样,只不过其实现细节要复杂的多:

继承关系

1
2
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable

ConcurrentHashMap 继承自AbstractMap,实现ConcurrentMap接口,ConcurrentMap定义了一写方法,这不是关注的重点。

构造方法

ConcurrentHashMap 的构造方法很简单,有一个很特别:

1
2
3
4
5
6
7
8
9
10
11
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

这个构造方法传入的concurrencyLevel参数并不是真正的控制并发量,只是用来判断initialCapacity从而确定 sizeCtl 的值。

核心属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 默认并发量 jdk8版本已和 Segment 一起被弃用,保留这个是为了序列化和反序列化时版本兼容
*/
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/**
* table数组,大小是2的整数次幂
*/
transient volatile Node<K,V>[] table;

/**
* 一个过渡的 table,在扩容时生成并将原table拷贝至此
*/
private transient volatile Node<K,V>[] nextTable;

/**
* 实际上保存的是hashmap中的元素个数 利用CAS锁进行更新
*/
private transient volatile long baseCount;

/**
* 控制标识符
* 负数代表正在初始化或正在扩容 -1--正在初始化、-N--有N-1个线程正在扩容
* 0代表table 还没初始化。
* 正数有两种情况:
* 一,构造方法根据initialCapacity计算出的值;
* 二,table 初始化后始终等于 table 长度的0.75,也就是下次扩容的阈值,下面会有分析。
* 这个字段非常重要!!!!
*/
private transient volatile int sizeCtl;

静态内部类 Node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
//用 valatile 修饰保证线程可见
volatile V val;
volatile Node<K,V> next;

Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}

public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }

//setValue 方法不允许使用!!!而是改用下面的 find
public final V setValue(V value) {
throw new UnsupportedOperationException();
}

//重写 equals,当k 和 v 都相同时,认为是同一个 node
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}

/**
* 根据hash 和 key 查找对应的 node
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}

对比 HashMap 的 Node来理解。

TreeNode

红黑树节点,Node的子类,链表转红黑树时 Node 转 TreeNode。

TreeBin

不负责封装 key 和 value,只是代替作为红黑树的根节点,也就是说实table 数组中存放的是 TreeBin,TreeBin 下是TreeNode,TreeBin 使用了 ReentranReadWriteLock。

TreeBin的 hash 值固定为TREEBIN=-2

ForwardingNode

一个用于连接两个table的节点类。它包含一个nextTable引用,而且这个节点的key value next指针全部为null,它的hash值为MOVED=-1。

Unsafe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long ABASE;
private static final int ASHIFT;

static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentHashMap.class;
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
BASECOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class<?> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class<?> ak = Node[].class;
ABASE = U.arrayBaseOffset(ak);
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}

看得出 Unsafe 结合反射控制了一些属性了修改工作,如 sizeCtl,在学习 Atomic*原子类时知道 Unsafe 还提供了 自旋+ CAS实现原子操作,在 ConcurrentHashMap中很多地方都有体现。

原子操作

利用 Unsafe 实现的三个原子操作保证了线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//获得 tab 数组中索引 i 位置上的节点 Node
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

//CAS 更新索引 i 位置的节点,当预期值与内存值一样时更新
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

//利用volatile方法设置节点位置的值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

核心方法

put(…)

添加元素

1
2
3
public V put(K key, V value) {
return putVal(key, value, false);
}

putVal(…)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
//onlyIfAbsent 当key已存在,true-不改变value的值  false-更新value
final V putVal(K key, V value, boolean onlyIfAbsent) {
//ConcurrentHashMap 的 key 和 value 都不能为空
if (key == null || value == null) throw new NullPointerException();
//计算 hash
int hash = spread(key.hashCode());
int binCount = 0;
//自旋
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//当数组为空时初始化table
tab = initTable();
//根据hash 计算 key 的索引i,CAS 设置索引i上的值,如果i 位置处没有元素,则直接添加不用加锁
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//如果插入时,索引i上的 Node 正好是ForwardingNode,那ForwardingNode就给人家让位置,自己再找新的位置
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
//到这里就表示原来索引i 处有节点,hash冲突了,hash 冲突先转链表,当链表长度超过8时转红黑树
V oldVal = null;
//这里使用synchronized保证同步,锁是索引i处的 Node
synchronized (f) {
//再校验一次,防止期间索引i处的node被修改过
if (tabAt(tab, i) == f) {
//排除ForwardingNode和 TreeBin
if (fh >= 0) {
binCount = 1;
//自旋
for (Node<K,V> e = f;; ++binCount) {
K ek;
//key 相同,覆盖value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
//key 不相同,构造新 Node 插入链表
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果索引i处已经是红黑树了,则构建新 Node 插入红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
//单向链表转红黑树,期间涉及到扩容
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

思考:为什么HashMap 中可以存null,但是ConcurrentHashMap 不可以呢?

我的理解是ConcurrentHashMap是并发的,在 get 的过程中可能有其他线程把目标元素给移除了,我们调用 get 得到的 value是 null,但是我们不知道这个null是具体存 value 的时候存的就是 null 还是没获取到 value 返回的null,再就是在并发情况下调用contains(key)和 get(key) 后的结果不是我们想要的了,为了避免歧义所以ConcurrentHashMap 不允许存 null。

参考stackoverflow ,回答的更详细哦。

initTable()初始化 table 数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//如果 sizeCtl<0 表示其他线程正在初始化或者扩容操作,那自己就把cpu让出来
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//自己扩容时 CAS 方式将SIZECTL设置为-1标识正在扩容
try {
//双重校验,再次判断table 是否为空,这一步是很有必要的!!防止并发条件下重复初始化。
if ((tab = table) == null || tab.length == 0) {
//如果使用带初始容量的构造方法,根据初始容量计算sc的值是2的整数次幂
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//初始化 Node
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//重新设置sc的值是容量的0.75倍
sc = n - (n >>> 2);
}
} finally {
//最终设置 sizeCtl
sizeCtl = sc;
}
break;
}
}
return tab;
}

get(Object key)

获取元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//计算 key 的 hash
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) { //根据hash 定位到table 上元素e
if ((eh = e.hash) == h) {
//匹配完 hash 后匹配 equals
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)//eh=-1是 ForwardNode,-2是Treebin,-3预留
return (p = e.find(h, key)) != null ? p.val : null;
//到这里还没找到就得去链表其他节点或者红黑树里遍历找了
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
//都没找到返回null
return null;
}

注意点

计算 ConcurrentHashMap元素个数的方法有两个,一个是 size(),一个是 mappingCount()

size()返回 int,但是Map 的长度是有可能超过 int 的最大值的,所以jdk8增加了mappingCount(),返回 long

size():

1
2
3
4
5
6
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}

mappingCount():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Returns the number of mappings. This method should be used
* instead of {@link #size} because a ConcurrentHashMap may
* contain more mappings than can be represented as an int. The
* value returned is an estimate; the actual count may differ if
* there are concurrent insertions or removals.
*
* @return the number of mappings
* @since 1.8
*/
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}

看注释作者推荐用此方法代替 size()因为返回 long,但是当并发插入和移除元素时,得到的结果都不一定准确。

总结

ConcurrentHashMap jdk8版本相较于 jdk7抛弃了 Segment分段锁而是采用自旋+CAS+synchronized 来保证线程安全。

ConcurrentHashMap 博大精深啊!内部类就有好几十个,看的时候太难受了,所以这个就只分析了几个面试会问的重点,其他的会在以后工作学习中继续深入研究。

  • Title: ConcurrentHashMap那些事儿
  • Author: 薛定谔的汪
  • Created at : 2018-08-24 18:01:54
  • Updated at : 2023-11-17 19:37:37
  • Link: https://www.zhengyk.cn/2018/08/24/java/ConcurrentHashMap/
  • License: This work is licensed under CC BY-NC-SA 4.0.