当前位置:首页 > 技术 > 正文内容

Java并发编程:ConcurrentHashMap深度解析

访客 技术 2026年6月26日 1

Java并发编程:ConcurrentHashMap深度解析

ConcurrentHashMap是Java并发包中提供的一种高性能线程安全Map实现,属于java.util.concurrent包下的类,继承自Map接口。它采用数组+链表+红黑树的复合数据结构,能够在高并发环境下提供出色的性能表现。

JDK1.8中的ConcurrentHashMap通过CAS操作与synchronized锁的混合机制保证线程安全

**CAS机制:**当不存在哈希冲突时(节点直接放入数组位置) **synchronized锁:**当出现哈希冲突时(目标位置已有数据)

ConcurrentHashMap的核心优势

  • 线程安全且高性能:与使用粗粒度synchronized的HashTable相比,在并发场景下效率显著提升
  • 迭代器安全:使用Iterator遍历时不会抛出ConcurrentModificationException异常(避免了fail-fast机制)

数据结构解析

ConcurrentHashMap的底层存储结构采用数组+链表+红黑树的组合方式:

核心内部类实现

Node类

static class Node<K,V> implements Map.Entry<K,V> {
    final int hashVal;        // 键的哈希值
    final K dataKey;          // 键
    volatile V dataValue;     // 值
    volatile Node<K,V> nextRef; // 下一个节点的引用

    Node(int hashVal, K dataKey, V dataValue, Node<K,V> nextRef) {
        this.hashVal = hashVal;
        this.dataKey = dataKey;
        this.dataValue = dataValue;
        this.nextRef = nextRef;
    }
}

Node类实现了Entry接口,存储节点的哈希值、键、值以及指向下一个节点的引用。

TreeNode类

static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parentNode;  // 父节点
    TreeNode<K,V> leftNode;   // 左子节点
    TreeNode<K,V> rightNode;  // 右子节点
    TreeNode<K,V> prevNode;   // 前驱节点
    boolean isRed;           // 节点颜色标识

    TreeNode(int hashVal, K dataKey, V dataValue, Node<K,V> nextRef,
             TreeNode<K,V> parentNode) {
        super(hashVal, dataKey, dataValue, nextRef);
        this.parentNode = parentNode;
    }
}

TreeNode继承自Node,用于实现红黑树结构,维护父子节点关系和颜色标识。

TreeBin类: TreeBin用于包装TreeNode,当链表长度超过阈值时转换为红黑树。TreeBin不存储实际的键值对,而是作为红黑树的容器,并维护读写锁机制。

ForwardingNode类

static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K, V>[] nextTable;

    ForwardingNode(Node<K, V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
}

ForwardingNode用于标记正在扩容迁移中的节点,其hash值为MOVED常量。

线程安全实现机制

1. CAS无锁操作

对数组中节点的修改操作采用CAS(Compare And Swap)机制实现无锁化更新,显著减少锁竞争带来的性能开销。

2. volatile关键字

ConcurrentHashMap中的关键变量使用volatile修饰,确保多线程环境下的可见性和指令有序性。例如节点操作相关的val、next变量和sizeCtl控制变量。

3. synchronized锁粒度优化

仅对数组中的非空节点使用synchronized加锁,空节点通过CAS直接操作,实现锁粒度的最小化。

4. Unsafe类与原子操作

基于Unsafe类封装了三个核心原子操作方法:

/**
 * 计算元素在数组中的内存地址
 */
// 获取指定位置的节点
static final <K,V> Node<K,V> getNodeAt(Node<K,V>[] tab, int idx) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)idx << ASHIFT) + ABASE);
}

// 使用CAS算法设置指定位置的节点
static final <K,V> boolean compareAndSetNode(Node<K,V>[] tab, int idx,
                                        Node<K,V> current, Node<K,V> newValue) {
    return U.compareAndSwapObject(tab, ((long)idx << ASHIFT) + ABASE, current, newValue);
}

// 设置节点位置的值
static final <K,V> void setNodeAt(Node<K,V>[] tab, int idx, Node<K,V> value) {
    U.putObjectVolatile(tab, ((long)idx << ASHIFT) + ABASE, value);
}

核心方法解析

构造函数

public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    // 计算合适的初始容量
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
            MAXIMUM_CAPACITY :
            tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

构造函数初始化sizeCtl变量,该变量用于控制数组的初始化和扩容操作,具有多种状态含义:

  • -1:表示正在初始化
  • 小于-1:低16位表示当前扩容线程数
  • 0:表示数组未初始化
  • 大于0:表示扩容阈值或初始化大小

putVal方法(数据添加实现)

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hashVal = spread(key.hashCode());
    int binCount = 0;
    
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, idx, fh;
        // 初始化表
        if (tab == null || (n = tab.length) == 0)
            tab = initializeTable();
        // 无冲突,直接插入
        else if ((f = getNodeAt(tab, idx = (n - 1) & hashVal)) == null) {
            if (compareAndSetNode(tab, idx, null, new Node<K,V>(hashVal, key, value, null)))
                break;
        }
        // 正在扩容,协助迁移
        else if ((fh = f.hashVal) == MOVED)
            tab = assistTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) {
                if (getNodeAt(tab, idx) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hashVal == hashVal &&
                                    ((ek = e.dataKey) == key ||
                                            (ek != null && key.equals(ek)))) {
                                oldVal = e.dataValue;
                                if (!onlyIfAbsent)
                                    e.dataValue = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.nextRef) == null) {
                                pred.nextRef = new Node<K,V>(hashVal, key, value, null);
                                break;
                            }
                        }
                    }
                    // 处理树节点
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeNode(hashVal, key, value)) != null) {
                            oldVal = p.dataValue;
                            if (!onlyIfAbsent)
                                p.dataValue = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    convertToTree(tab, idx);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

initializeTable方法(数组初始化)

private final Node<K,V>[] initializeTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            Thread.yield(); 
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if (table == tab) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

该方法使用自旋锁机制,通过CAS操作确保线程安全的数组初始化。

convertToTree方法(链表转红黑树)

private final void convertToTree(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        // 数组长度不足64时优先扩容
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryResize(n << 1);
        else if ((b = getNodeAt(tab, index)) != null && b.hashVal >= 0) {
            synchronized (b) {
                if (getNodeAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    // 遍历链表构建双向链表
                    for (Node<K,V> e = b; e != null; e = e.nextRef) {
                        TreeNode<K,V> p = new TreeNode<K,V>(e.hashVal, e.dataKey, e.dataValue,
                                                        null, null);
                        if ((p.prevNode = tl) == null)
                            hd = p;
                        else
                            tl.nextRef = p;
                        tl = p;
                    }
                    // 将链表转换为红黑树
                    setNodeAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

红黑树实现详解

红黑树特性

红黑树是一种自平衡二叉查找树,具有以下特性:

  1. 每个节点要么是红色,要么是黑色
  2. 根节点必须是黑色
  3. 红色节点的子节点必须是黑色
  4. 所有叶子节点都是黑色
  5. 从任意节点到其每个叶子节点的所有路径都包含相同数量的黑色节点

TreeBin构造函数(双向链表转红黑树)

TreeBin(TreeNode<K,V> b) {
    super(TREEBIN, null, null, null);
    this.first = b;
    TreeNode<K,V> r = null;
    
    for (TreeNode<K,V> x = b, next; x != null; x = next) {
        next = (TreeNode<K,V>)x.nextRef;
        x.leftNode = x.rightNode = null;
        
        if (r == null) {
            x.parentNode = null;
            x.isRed = false;
            r = x;
        } else {
            K k = x.dataKey;
            int h = x.hashVal;
            Class<?> kc = null;
            
            for (TreeNode<K,V> p = r;;) {
                int dir, ph;
                K pk = p.dataKey;
                
                if ((ph = p.hashVal) > h)
                    dir = -1;
                else if (ph < h)
                    dir = 1;
                else if ((kc == null &&
                         (kc = comparableClassFor(k)) == null) ||
                         (dir = compareComparables(kc, k, pk)) == 0)
                    dir = tieBreakOrder(k, pk);
                
                TreeNode<K,V> xp = p;
                if ((p = (dir <= 0) ? p.leftNode : p.rightNode) == null) {
                    x.parentNode = xp;
                    if (dir <= 0)
                        xp.leftNode = x;
                    else
                        xp.rightNode = x;
                    r = balanceInsertion(r, x);
                    break;
                }
            }
        }
    }
    this.root = r;
    assert checkInvariants(root);
}

balanceInsertion方法(红黑树平衡调整)

static <K,V> TreeNode<K,V> balanceInsertion(TreeNode<K,V> root, TreeNode<K,V> x) {
    x.isRed = true;
    
    for (TreeNode<K,V> xp, xpp, xppl, xppr;;) {
        if ((xp = x.parentNode) == null) {
            x.isRed = false;
            return x;
        }
        else if (!xp.isRed || (xpp = xp.parentNode) == null)
            return root;
            
        if (xp == (xppl = xpp.leftNode)) {
            if ((xppr = xpp.rightNode) != null && xppr.isRed) {
                xppr.isRed = false;
                xp.isRed = false;
                xpp.isRed = true;
                x = xpp;
            } else {
                if (x == xp.rightNode) {
                    root = rotateLeft(root, x = xp);
                    xpp = (xp = x.parentNode) == null ? null : xp.parentNode;
                }
                if (xp != null) {
                    xp.isRed = false;
                    if (xpp != null) {
                        xpp.isRed = true;
                        root = rotateRight(root, xpp);
                    }
                }
            }
        } else {
            if (xppl != null && xppl.isRed) {
                xppl.isRed = false;
                xp.isRed = false;
                xpp.isRed = true;
                x = xpp;
            } else {
                if (x == xp.leftNode) {
                    root = rotateRight(root, x = xp);
                    xpp = (xp = x.parentNode) == null ? null : xp.parentNode;
                }
                if (xp != null) {
                    xp.isRed = false;
                    if (xpp != null) {
                        xpp.isRed = true;
                        root = rotateLeft(root, xpp);
                    }
                }
            }
        }
    }
}

putTreeNode方法(树节点添加)

final TreeNode<K,V> putTreeNode(int h, K k, V v) {
    Class<?> kc = null;
    boolean searched = false;
    
    for (TreeNode<K,V> p = root;;) {
        int dir, ph; K pk;
        
        if (p == null) {
            first = root = new TreeNode<K,V>(h, k, v, null, null);
            break;
        }
        
        if ((ph = p.hashVal) > h)
            dir = -1;
        else if (ph < h)
            dir = 1;
        else if ((pk = p.dataKey) == k || (pk != null && k.equals(pk)))
            return p;
        else if ((kc == null &&
                 (kc = comparableClassFor(k)) == null) ||
                 (dir = compareComparables(kc, k, pk)) == 0) {
            if (!searched) {
                TreeNode<K,V> q, ch;
                searched = true;
                if (((ch = p.leftNode) != null &&
                     (q = ch.findTreeNode(h, k, kc)) != null) ||
                    ((ch = p.rightNode) != null &&
                     (q = ch.findTreeNode(h, k, kc)) != null))
                    return q;
            }
            dir = tieBreakOrder(k, pk);
        }
        
        TreeNode<K,V> xp = p;
        if ((p = (dir <= 0) ? p.leftNode : p.rightNode) == null) {
            TreeNode<K,V> x, f = first;
            first = x = new TreeNode<K,V>(h, k, v, f, xp);
            if (f != null)
                f.prevNode = x;
            if (dir <= 0)
                xp.leftNode = x;
            else
                xp.rightNode = x;
            if (!xp.isRed)
                x.isRed = true;
            else {
                lockRoot();
                try {
                    root = balanceInsertion(root, x);
                } finally {
                    unlockRoot();
                }
            }
            break;
        }
    }
    assert checkInvariants(root);
    return null;
}

为什么链表长度为8转换为红黑树?

选择8作为链表转红黑树的阈值是基于泊松分布的数学分析。当链表长度达到8时,在理想哈希分布下出现的概率已经非常小(约为0.000006),此时转换为红黑树能提供更好的查询性能。同时,选择较小的阈值(如64)时优先进行扩容而非树化,保证在小数据量时不会过早引入红黑树的复杂开销。

tableSizeFor方法(计算2的幂次方容量)

private static final int tableSizeFor(int c) {
    int n = c - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

该方法通过位操作将任意整数转换为大于等于该值的最小2的幂次方数,确保数组长度始终是2的幂次方,优化哈希计算和扩容效率。

tryResize方法(扩容实现)

private final void tryResize(int size) {
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        
        if (tab == null || (n = tab.length) == 0) {
            n = (sc > c) ? sc : c;
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        // 省略部分代码...
    }
}

总结

ConcurrentHashMap通过精细的设计实现了高效的并发访问:

  1. 使用CAS无锁操作处理无冲突场景
  2. 采用细粒度的synchronized锁仅锁定必要的节点
  3. 引入红黑树优化长链表的查询性能
  4. 通过volatile保证内存可见性
  5. 实现了高效的扩容机制和并发协助

这些设计使得ConcurrentHashMap在保证线程安全的同时,能够提供接近非线程安全容器的性能表现,成为Java并发编程中不可或缺的工具类。

相关文章

Linux crontab 详解

1) crontab 是什么cron 是 Linux 的定时任务守护进程;crontab 是用来编辑/查看“按时间周期执行命令”的表(cron table)。常见两类:用户 crontab:每个用户一份(crontab -e 编辑)系统级 crontab / cron.d:可指定执行用户(/etc/crontab、/etc/cron.d/*)2) crontab 时间...

富文本里可以允许的 HTML 属性

一、所有标签默认允许的安全属性(极少)class        (可选)id           (通常建议禁用)title️ 注意:id 容易被滥用做锚点注入,很多系统直接禁用class 允许的话最好只允许固定前缀(如 editor-*)二、a 标签允许属性<a href="" t...

Mac 安装 Node.js 指南

方法一:通过官网安装包(最简单,适合初学者)如果你只是想快速安装并开始使用,这是最直接的方法。访问 Node.js 官网。页面会显示两个版本:LTS (Recommended For Most Users):长期支持版,最稳定。建议选这个。Current:最新特性版,包含最新功能但可能不够稳定。下载 .pkg 安装包并运行。按照安装向导点击“下一步”即可完成。方法二:使用 Homebrew 安装(...

Dom\HTML_NO_DEFAULT_NS 的副作用:自动加闭合标签

在使用Dom\HTMLDocument时,Dom\HTML_NO_DEFAULT_NS 将禁止在解析过程中设置元素的命名空间, 此设置是为了与DOMDocument向后兼容而存在的。当使用它时,已知的一个副作用就是:自动加闭合标签例如 </img> 为什么会这样?当你使用:Dom\HTML_NO_DEFAULT_NS文档会变成 无命名空间模式,此时内部更接近 XML...

Laravel 事件和监听器创建

在 Laravel 中,使用 Artisan 命令创建 Events(事件) 和 Listeners(监听器) 是非常高效的。你可以通过以下几种方式来实现:1. 手动创建单个 Event如果你只想创建一个事件类,可以使用 make:event 命令:Bashphp artisan make:event UserRegistered执行后,文件将生成在 app/Even...

自定义域名解析神器 dnsmasq

什么是 dnsmasq?dnsmasq 是一个轻量级、功能强大的网络服务工具,专为小型和中等规模网络设计。它是一个综合的网络基础设施解决方案[1]。dnsmasq 能做什么?功能说明应用场景DNS 转发与缓存将 DNS 查询转发到上游服务器(ISP、Google DNS 等),并在本地缓存结果加快 DNS 查询速度,减少外部 DNS 流量本地 DNS解析本地网络设备的主机名,无需编辑&n...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。