Java并发编程:ConcurrentHashMap深度解析
Java并发编程:ConcurrentHashMap深度解析
ConcurrentHashMap是Java并发包中提供的一种高性能线程安全Map实现,属于java.util.concurrent包下的类,继承自Map接口。它采用数组+链表+红黑树的复合数据结构,能够在高并发环境下提供出色的性能表现。
JDK1.8中的ConcurrentHashMap通过CAS操作与synchronized锁的混合机制保证线程安全
**CAS机制:**当不存在哈希冲突时(节点直接放入数组位置) **synchronized锁:**当出现哈希冲突时(目标位置已有数据)
ConcurrentHashMap的核心优势
- 线程安全且高性能:与使用粗粒度synchronized的HashTable相比,在并发场景下效率显著提升
- 迭代器安全:使用Iterator遍历时不会抛出ConcurrentModificationException异常(避免了fail-fast机制)
数据结构解析
ConcurrentHashMap的底层存储结构采用数组+链表+红黑树的组合方式:
核心内部类实现
Node类:
static class Node<K,V> implements Map.Entry<K,V> {
final int hashVal; // 键的哈希值
final K dataKey; // 键
volatile V dataValue; // 值
volatile Node<K,V> nextRef; // 下一个节点的引用
Node(int hashVal, K dataKey, V dataValue, Node<K,V> nextRef) {
this.hashVal = hashVal;
this.dataKey = dataKey;
this.dataValue = dataValue;
this.nextRef = nextRef;
}
}
Node类实现了Entry接口,存储节点的哈希值、键、值以及指向下一个节点的引用。
TreeNode类:
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parentNode; // 父节点
TreeNode<K,V> leftNode; // 左子节点
TreeNode<K,V> rightNode; // 右子节点
TreeNode<K,V> prevNode; // 前驱节点
boolean isRed; // 节点颜色标识
TreeNode(int hashVal, K dataKey, V dataValue, Node<K,V> nextRef,
TreeNode<K,V> parentNode) {
super(hashVal, dataKey, dataValue, nextRef);
this.parentNode = parentNode;
}
}
TreeNode继承自Node,用于实现红黑树结构,维护父子节点关系和颜色标识。
TreeBin类: TreeBin用于包装TreeNode,当链表长度超过阈值时转换为红黑树。TreeBin不存储实际的键值对,而是作为红黑树的容器,并维护读写锁机制。
ForwardingNode类:
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K, V>[] nextTable;
ForwardingNode(Node<K, V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
}
ForwardingNode用于标记正在扩容迁移中的节点,其hash值为MOVED常量。
线程安全实现机制
1. CAS无锁操作
对数组中节点的修改操作采用CAS(Compare And Swap)机制实现无锁化更新,显著减少锁竞争带来的性能开销。
2. volatile关键字
ConcurrentHashMap中的关键变量使用volatile修饰,确保多线程环境下的可见性和指令有序性。例如节点操作相关的val、next变量和sizeCtl控制变量。
3. synchronized锁粒度优化
仅对数组中的非空节点使用synchronized加锁,空节点通过CAS直接操作,实现锁粒度的最小化。
4. Unsafe类与原子操作
基于Unsafe类封装了三个核心原子操作方法:
/**
* 计算元素在数组中的内存地址
*/
// 获取指定位置的节点
static final <K,V> Node<K,V> getNodeAt(Node<K,V>[] tab, int idx) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)idx << ASHIFT) + ABASE);
}
// 使用CAS算法设置指定位置的节点
static final <K,V> boolean compareAndSetNode(Node<K,V>[] tab, int idx,
Node<K,V> current, Node<K,V> newValue) {
return U.compareAndSwapObject(tab, ((long)idx << ASHIFT) + ABASE, current, newValue);
}
// 设置节点位置的值
static final <K,V> void setNodeAt(Node<K,V>[] tab, int idx, Node<K,V> value) {
U.putObjectVolatile(tab, ((long)idx << ASHIFT) + ABASE, value);
}
核心方法解析
构造函数
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
// 计算合适的初始容量
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
构造函数初始化sizeCtl变量,该变量用于控制数组的初始化和扩容操作,具有多种状态含义:
- -1:表示正在初始化
- 小于-1:低16位表示当前扩容线程数
- 0:表示数组未初始化
- 大于0:表示扩容阈值或初始化大小
putVal方法(数据添加实现)
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hashVal = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, idx, fh;
// 初始化表
if (tab == null || (n = tab.length) == 0)
tab = initializeTable();
// 无冲突,直接插入
else if ((f = getNodeAt(tab, idx = (n - 1) & hashVal)) == null) {
if (compareAndSetNode(tab, idx, null, new Node<K,V>(hashVal, key, value, null)))
break;
}
// 正在扩容,协助迁移
else if ((fh = f.hashVal) == MOVED)
tab = assistTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (getNodeAt(tab, idx) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hashVal == hashVal &&
((ek = e.dataKey) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.dataValue;
if (!onlyIfAbsent)
e.dataValue = value;
break;
}
Node<K,V> pred = e;
if ((e = e.nextRef) == null) {
pred.nextRef = new Node<K,V>(hashVal, key, value, null);
break;
}
}
}
// 处理树节点
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeNode(hashVal, key, value)) != null) {
oldVal = p.dataValue;
if (!onlyIfAbsent)
p.dataValue = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
convertToTree(tab, idx);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
initializeTable方法(数组初始化)
private final Node<K,V>[] initializeTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield();
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
该方法使用自旋锁机制,通过CAS操作确保线程安全的数组初始化。
convertToTree方法(链表转红黑树)
private final void convertToTree(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
// 数组长度不足64时优先扩容
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryResize(n << 1);
else if ((b = getNodeAt(tab, index)) != null && b.hashVal >= 0) {
synchronized (b) {
if (getNodeAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
// 遍历链表构建双向链表
for (Node<K,V> e = b; e != null; e = e.nextRef) {
TreeNode<K,V> p = new TreeNode<K,V>(e.hashVal, e.dataKey, e.dataValue,
null, null);
if ((p.prevNode = tl) == null)
hd = p;
else
tl.nextRef = p;
tl = p;
}
// 将链表转换为红黑树
setNodeAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
红黑树实现详解
红黑树特性
红黑树是一种自平衡二叉查找树,具有以下特性:
- 每个节点要么是红色,要么是黑色
- 根节点必须是黑色
- 红色节点的子节点必须是黑色
- 所有叶子节点都是黑色
- 从任意节点到其每个叶子节点的所有路径都包含相同数量的黑色节点
TreeBin构造函数(双向链表转红黑树)
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b;
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.nextRef;
x.leftNode = x.rightNode = null;
if (r == null) {
x.parentNode = null;
x.isRed = false;
r = x;
} else {
K k = x.dataKey;
int h = x.hashVal;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.dataKey;
if ((ph = p.hashVal) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.leftNode : p.rightNode) == null) {
x.parentNode = xp;
if (dir <= 0)
xp.leftNode = x;
else
xp.rightNode = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}
balanceInsertion方法(红黑树平衡调整)
static <K,V> TreeNode<K,V> balanceInsertion(TreeNode<K,V> root, TreeNode<K,V> x) {
x.isRed = true;
for (TreeNode<K,V> xp, xpp, xppl, xppr;;) {
if ((xp = x.parentNode) == null) {
x.isRed = false;
return x;
}
else if (!xp.isRed || (xpp = xp.parentNode) == null)
return root;
if (xp == (xppl = xpp.leftNode)) {
if ((xppr = xpp.rightNode) != null && xppr.isRed) {
xppr.isRed = false;
xp.isRed = false;
xpp.isRed = true;
x = xpp;
} else {
if (x == xp.rightNode) {
root = rotateLeft(root, x = xp);
xpp = (xp = x.parentNode) == null ? null : xp.parentNode;
}
if (xp != null) {
xp.isRed = false;
if (xpp != null) {
xpp.isRed = true;
root = rotateRight(root, xpp);
}
}
}
} else {
if (xppl != null && xppl.isRed) {
xppl.isRed = false;
xp.isRed = false;
xpp.isRed = true;
x = xpp;
} else {
if (x == xp.leftNode) {
root = rotateRight(root, x = xp);
xpp = (xp = x.parentNode) == null ? null : xp.parentNode;
}
if (xp != null) {
xp.isRed = false;
if (xpp != null) {
xpp.isRed = true;
root = rotateLeft(root, xpp);
}
}
}
}
}
}
putTreeNode方法(树节点添加)
final TreeNode<K,V> putTreeNode(int h, K k, V v) {
Class<?> kc = null;
boolean searched = false;
for (TreeNode<K,V> p = root;;) {
int dir, ph; K pk;
if (p == null) {
first = root = new TreeNode<K,V>(h, k, v, null, null);
break;
}
if ((ph = p.hashVal) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((pk = p.dataKey) == k || (pk != null && k.equals(pk)))
return p;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0) {
if (!searched) {
TreeNode<K,V> q, ch;
searched = true;
if (((ch = p.leftNode) != null &&
(q = ch.findTreeNode(h, k, kc)) != null) ||
((ch = p.rightNode) != null &&
(q = ch.findTreeNode(h, k, kc)) != null))
return q;
}
dir = tieBreakOrder(k, pk);
}
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.leftNode : p.rightNode) == null) {
TreeNode<K,V> x, f = first;
first = x = new TreeNode<K,V>(h, k, v, f, xp);
if (f != null)
f.prevNode = x;
if (dir <= 0)
xp.leftNode = x;
else
xp.rightNode = x;
if (!xp.isRed)
x.isRed = true;
else {
lockRoot();
try {
root = balanceInsertion(root, x);
} finally {
unlockRoot();
}
}
break;
}
}
assert checkInvariants(root);
return null;
}
为什么链表长度为8转换为红黑树?
选择8作为链表转红黑树的阈值是基于泊松分布的数学分析。当链表长度达到8时,在理想哈希分布下出现的概率已经非常小(约为0.000006),此时转换为红黑树能提供更好的查询性能。同时,选择较小的阈值(如64)时优先进行扩容而非树化,保证在小数据量时不会过早引入红黑树的复杂开销。
tableSizeFor方法(计算2的幂次方容量)
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
该方法通过位操作将任意整数转换为大于等于该值的最小2的幂次方数,确保数组长度始终是2的幂次方,优化哈希计算和扩容效率。
tryResize方法(扩容实现)
private final void tryResize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
// 省略部分代码...
}
}
总结
ConcurrentHashMap通过精细的设计实现了高效的并发访问:
- 使用CAS无锁操作处理无冲突场景
- 采用细粒度的synchronized锁仅锁定必要的节点
- 引入红黑树优化长链表的查询性能
- 通过volatile保证内存可见性
- 实现了高效的扩容机制和并发协助
这些设计使得ConcurrentHashMap在保证线程安全的同时,能够提供接近非线程安全容器的性能表现,成为Java并发编程中不可或缺的工具类。