0%

Everything About ConcurrentHashMap

概述

ConcurrentHashMap常用于并发编程,这里就从源码上来分析一下ConcurrentHashMap数据结构和底层原理。

在开始之前先介绍一个算法, 这个算法和Concurrent的实现是分不开的。
CAS算法:

  • CAS是英文单词Compare And Swap的缩写,翻译过来就是比较并替换。
  • CAS机制当中使用了3个基本操作数:内存地址V,旧的预期值A,要修改的新值B。
  • 更新一个变量的时候,只有当变量的预期值A和内存地址V当中的实际值相同时,才会将内存地址V对应的值修改为B

从思想上来说,Synchronized属于悲观锁,悲观地认为程序中的并发情况严重,所以严防死守。CAS属于乐观锁,乐观地认为程序中的并发情况不那么严重,所以让线程不断去尝试更新。

ConcurrentHashMap是一个线程安全的Map集合,可以应对高并发的场景,保证线程安全。相比较HashTable,它的锁粒度更加的细化,因为HashTable的方法都是用Synchronized修饰的,效率灰常的底下。

1.8之前ConcurrentHashMap使用锁分段技术,将数据分成一段段的存储,每一个数据段配置一把锁,相互之间不影响,而1.8之后摒弃了Segment(锁段)的概念,启用了全新的实现,也就是利用CAS+Synchronized来保证并发更新的安全,底层采用的依然是数组+链表+红黑树。

本篇文章是基于JDK1.8 。

数据结构

继承关系

1
2
3
public class ConcurrentHashMap<K,V> 
extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable

ConcurrentHashMap 继承了AbstractMap ,并且实现了ConcurrentMap接口。

与HashMap比对:

  • 相同点:都集成了AbstractMap接口
  • 不同点:HashMap实现了Map接口,ConcurrentHashMap实现了ConcurrentMap接口,而ConcurrentMap继承了Map接口,使用default关键字定义了一些方法 。

从继承关系上看ConcurrentHashMap与HashMap并没有太大的区别。

基本属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static final int MAXIMUM_CAPACITY = 1 << 30; //最大容量2的30次方
private static final int DEFAULT_CAPACITY = 16; //默认容量 1<<4

private static final float LOAD_FACTOR = 0.75f; //负载因子
static final int TREEIFY_THRESHOLD = 8; //链表转为红黑树
static final int UNTREEIFY_THRESHOLD = 6; //树转列表
static final int MIN_TREEIFY_CAPACITY = 64; //
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
static final int MOVED = -1; // forwarding nodes 的hash值
static final int TREEBIN = -2; // roots of trees 的hash值
static final int RESERVED = -3; // transient reservations 的hash值
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
static final int NCPU = Runtime.getRuntime().availableProcessors(); //可用处理器数量

重点说一下 sizeCtrl 属性,这个属性在 ConcurrentHashMap 中扮演者重要的角色。

1
2
3
4
5
6
7
8
//表初始化或者扩容的一个控制标识位
//负数代表正在进行初始化或者扩容的操作
// -1 代表初始化
// -N 代表有n-1个线程在进行扩容操作
//正数或者0表示没有进行初始化操作,这个数值表示初始化或者下一次要扩容的大小。

//transient 修饰的属性不会被序列化,volatile保证可见性
private transient volatile int sizeCtl;

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
//无参构造方法,没有进行任何操作
public ConcurrentHashMap() {}
//指定初始化大小构造方法,判断参数的合法性,并创建了计算初始化的大小
public ConcurrentHashMap(int initialCapacity) {}
//将指定的集合转化为ConcurrentHashMap
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {}
//指定初始化大小和负载因子的构造方法
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
//指定初始化大小,负载因子和concurrentLevel并发更新线程的数量,也可以理解为segment的个数
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {}

ConcurrentHashMap的构造方法并没做太多的工作,主要是进行了参数的合法性校验,和初始值大小的转换。这个方法 tableSizeFor()说明一下, 主要的功能就是将指定的初始化参数转换为2的幂次方形式, 如果初始化参数为9 ,转换后初始大小为16 。

内部数据结构

Node

首当其冲,因为它是ConcurrentHashMap的核心,它包装了key-value的键值对,所有插入的数据都包装在这里面,与HashMap很相似,但是有一些差别:

1
2
3
4
5
6
7
8
9
10
11
12
13
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;

Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
}

value 和 next使用了volatile修饰,保证了线程之间的可见性。也不允许调用setValue()方法直接改变Node的值。并增加了find()方法辅助map.get()方法。

TreeNode

树节点类,另外一个核心的数据结构。当链表长度过长的时候,会转换为TreeNode。但是与HashMap不相同的是,它并不是直接转换为红黑树,而是把这些结点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装。而且TreeNode在ConcurrentHashMap集成自Node类,而并非HashMap中的集成自LinkedHashMap.Entry类,也就是说TreeNode带有next指针,这样做的目的是方便基于TreeBin的访问。

TreeBin

这个类并不负责包装用户的key、value信息,而是包装的很多TreeNode节点。它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象,这是与HashMap的区别。另外这个类还带有了读写锁。

ForwardingNode

一个用于连接两个table的节点类。它包含一个nextTable指针,用于指向下一张表。而且这个节点的key value next指针全部为null,它的hash值为-1. 这里面定义的find的方法是从nextTable里进行查询节点,而不是以自身为头节点进行查找

ConcurrentHashMap常用方法

initTable 初始化方法

初始化方法是很重要的一个方法,因为在ConcurrentHashMap的构造方法中只是简单的进行了一些参数校验和参数转换的操作。整个Map的初始化是在插入元素的时候触发的。这一点在下面的put方法中会进行说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//执行初始化操作,单线程操作
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
//sizeCtl < 0 表示有线程正在进行初始化操作,从运行状态变为就绪状态。
Thread.yield(); // lost initialization race; just spin

//设置SIZECTL的值为-1,阻塞其他线程的操作
//该方法有四个参数
//第一个参数:需要改变的对象
//第二个参数:偏移量
//第三个参数:期待的值
//第四个参数:更新后的值
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//再次检查是否有线程进行了初始化操作
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//初始化Node对象数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//sc的值设置为n的0.75倍
sc = n - (n >>> 2); //相当于n*0.75
}
} finally {
sizeCtl = sc; //更改sizeCtl的值
}
break; //中断循坏返回
}
}
return tab; //返回初始化的值
}

扩容方法

当ConcurrentHashMap 容量不足的时候,需要对table进行扩容,这个方法是支持多个线程并发扩容的,我们所说的扩容,从本质上来说,无非是从一个数组到另外一个数组的拷贝。

扩容方法分为两个部分:

  • 创建扩容后的新数组,容量变为原来的两倍 ,新数组的创建时单线程完成
  • 将原来的数组元素复制到新的数组中,这个是多线程操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
//帮助扩容
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}


//tab = table ,nextTab 一个Node<Key,Value>[]类型的变量
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//n 是tab的长度 , stride 初始值为0
int n = tab.length, stride;
//判断cpu处理多线程的能力,如果小于16就直接赋值为16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
//构造一个容量是原来两倍的Node<K ,V> 类型数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt; //赋值
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab; //赋值
transferIndex = n; //将数组长度赋值给transferIndex
}
int nextn = nextTab.length; //获取新数组的长度

ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); //创建fwd节点

boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab

//使用for循环来处理每个槽位中的链表元素,CAS设置transferIndex属性值,并初始化i和bound值
// i 指当前的槽位序号,bound值需要处理的边界,先处理槽位为15的节点
for (int i = 0, bound = 0;;) {

//创建两个变量,一个为Node<K,V> 类型,一个为int类型
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;

//将transferIndex的值赋值给 nextIndex ,并判断nextIndex的值是否小于等于0
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}

//更新nextIndex的值
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//如果table已经复制结束
if (finishing) {
nextTable = null; //清空nextTable
table = nextTab; //把nextTab 赋值给 table
sizeCtl = (n << 1) - (n >>> 1); //阈值设置为容量的1.5倍
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//CAS算法获取某个数组节点,为空就设置为fwd
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);

//如果某个节点的hash为-1,跳过
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//对头节点加锁,禁止其他线程进入
synchronized (f) {
if (tabAt(tab, i) == f) {

//构造两个链表 ,将该节点的列表拆分为两个部分,一个是原链表的排列顺序,一个是反序
Node<K,V> ln, hn;
if (fh >= 0) { // fh 当前节点的hash值 若 >= 0
int runBit = fh & n;
Node<K,V> lastRun = f; //将当前节点赋值给 lastRun 节点
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
//差分列表操作
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//在nextTab 的i 位置上放置ln节点
setTabAt(nextTab, i, ln);
//在nextTab 的 i+n 位置上放置 hn节点
setTabAt(nextTab, i + n, hn);
//在tab节点i位置上插入插入forwardNode节点,表示该节点已经处理
setTabAt(tab, i, fwd);
advance = true;
}
//对TreeBin对象进行处理,过程与上面有些类似
//也把节点分类,分别插入到lo和hi为头节点的链表中
//
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果扩容后 不在需要tree结构,反向转换成链表结构
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

put方法

put操作是最长用的方法,接下来看一下put()方法的具体实现:

  • put()要求键值都不能为空
  • 需要经过两次散列, 是数据均匀分散,减少碰撞的次数
  • 判断tab是否进行了初始化,没有则调用initTable进行初始化操作(单线程)
  • 数组i的位置没有元素存在,直接放入
  • 如果i的位置在进行MOVE操作,也就是在进行扩容操作,则多线程帮助扩容
  • 如果i的位置有元素存在,则在该节点加锁Synchronized,判断是链表还是红黑树,按照相应的插入规则插入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
final V putVal(K key, V value, boolean onlyIfAbsent) {
//key|value == null 抛出异常
//ConcurrentHashMap不允许键或者值为null的这种情况发生
//这一点和HashMap有区别
if (key == null || value == null) throw new NullPointerException();

//散列在散列, 让数据均匀分布,减少碰撞次数
int hash = spread(key.hashCode()); -->static final int spread(int h) {return (h ^ (h >>> 16)) & HASH_BITS;}
int binCount = 0;

//死循环 相当于while(true) ,将table赋值给 tab
for (Node<K,V>[] tab = table;;) {

//创建一个Node类型的变量f , int 类型的变量 n i fh
Node<K,V> f; int n, i, fh;

//判断tab是否为null ,是否进行了初始化操作,如果没有执行初始化,执行初始化操作
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//tabAt 获取值
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {

//添加到table中
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; //退出循环 // no lock when adding to empty bin
}

//node的hash值为 -1
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;

//key 相等,使用新值替换旧值
if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//放在链表的尾部
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,value, null);
break;
}
}
}
//红黑树替换
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

Get方法

Get方法也是最长用的方法,元素放入了,总要取出来

  • 根据传入的key,获取相应的hash值
  • 然后判断当前的table数组是否为空
  • 计算指定的key在table中存储的位置
  • 链表或者红黑树转换相依的方法处理
  • 不存在则返回null
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//eh< 0 表示红黑树节点
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//链表遍历
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

总结

JDK6,7中的ConcurrentHashmap主要使用Segment来实现减小锁粒度,把HashMap分割成若干个Segment,在put的时候需要锁住Segment,get时候不加锁,使用volatile来保证可见性,当要统计全局时(比如size),首先会尝试多次计算modcount来确定,这几次尝试中,是否有其他线程进行了修改操作,如果没有,则直接返回size。如果有,则需要依次锁住所有的Segment来计算。

jdk7中ConcurrentHashmap中,当长度过长,碰撞会很频繁,链表的增改删查操作都会消耗很长的时间,影响性能,所以jdk8 中完全重写了concurrentHashmap,代码量从原来的1000多行变成了 6000多 行,实现上也和原来的分段式存储有很大的区别。

主要设计上的变化有以下几点:

  1. 不采用segment而采用node,锁住node来实现减小锁粒度。
  2. 设计了MOVED状态 当resize的中过程中 线程2还在put数据,线程2会帮助resize。
  3. 使用3个CAS操作来确保node的一些操作的原子性,这种方式代替了锁。
  4. sizeCtl的不同值来代表不同含义,起到了控制的作用。

参考:

http://blog.csdn.net/u010723709/article/details/48007881