当前位置:首页 > 技术 > 正文内容

Java并发工具包核心组件详解

访客 技术 2026年6月14日 1

1. 原子操作组件

在多线程环境中执行数值计算时,若缺乏适当的同步机制,往往会导致数据不一致问题。Java提供了原子类来解决此类线程安全问题。

基础原子类型

  • AtomicInteger: 提供线程安全的整型操作
  • AtomicLong: 提供线程安全的长整型操作
  • AtomicBoolean: 提供线程安全的布尔值操作
public class AtomicDemo {
    public static void main(String[] args) {
        int previousVal = 0;
        AtomicInteger counter = new AtomicInteger(0);
        
        previousVal = counter.getAndSet(12);
        System.out.println("previous:" + previousVal + "; current:" + counter);
        
        previousVal = counter.getAndIncrement();
        System.out.println("previous:" + previousVal + "; current:" + counter);
        
        previousVal = counter.getAndAdd(-10);
        System.out.println("previous:" + previousVal + "; current:" + counter);
    }
}

引用类型原子操作

AtomicReference 支持对象引用的原子性更新:

public class BankAccount {
    private final String holder;
    private final int balance;

    public BankAccount(String holder, int balance) {
        this.holder = holder;
        this.balance = balance;
    }

    // getter方法和toString省略
}

AtomicReference<BankAccount> accountRef = new AtomicReference<>(new BankAccount("张三", 100));

// 原子性更新余额
accountRef.updateAndGet(acc -> new BankAccount(acc.getHolder(), acc.getBalance() + 50));

数组类型原子操作

  • AtomicIntegerArray: 整型数组原子操作
  • AtomicLongArray: 长整型数组原子操作
  • AtomicReferenceArray: 引用数组原子操作
public class AtomicArrayDemo {
    public static void main(String[] args) {
        int oldValue = 0;
        int[] data = {-2, -1, 1, 2, 3, 4};
        AtomicIntegerArray array = new AtomicIntegerArray(data);
        
        for (int i = 0; i < data.length; i++) {
            System.out.println(array.get(i));
        }
        
        oldValue = array.getAndSet(0, 2);
        System.out.println("old:" + oldValue + "; array:" + array);
        
        oldValue = array.getAndIncrement(0);
        System.out.println("old:" + oldValue + "; array:" + array);
        
        oldValue = array.getAndAdd(0, 5);
        System.out.println("old:" + oldValue + "; array:" + array);
    }
}

2. 线程同步工具

倒计时门闩(CountDownLatch)

允许一个或多个线程等待其他线程完成特定操作:

import java.util.concurrent.CountDownLatch;

public class LatchDemo {
    public static void main(String[] args) {
        CountDownLatch gate = new CountDownLatch(3);

        for (int i = 0; i < 3; i++) {
            new Worker(gate, "Worker-" + (i + 1)).start();
        }

        try {
            System.out.println("主线程等待所有任务完成...");
            gate.await();
            System.out.println("所有任务已完成,主线程继续执行");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static class Worker extends Thread {
        private final CountDownLatch latch;

        public Worker(CountDownLatch latch, String name) {
            super(name);
            this.latch = latch;
        }

        @Override
        public void run() {
            System.out.println(getName() + " 开始执行任务...");
            try {
                Thread.sleep((long) (Math.random() * 1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(getName() + " 任务完成");
            latch.countDown();
        }
    }
}

循环屏障(CyclicBarrier)

使一组线程相互等待至某个公共屏障点:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class BarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(3, () -> {
            System.out.println("所有线程已到达屏障点,开始下一步操作");
        });

        for (int i = 0; i < 3; i++) {
            new TaskWorker(barrier, "Worker-" + (i + 1)).start();
        }
    }

    static class TaskWorker extends Thread {
        private final CyclicBarrier barrier;

        public TaskWorker(CyclicBarrier barrier, String name) {
            super(name);
            this.barrier = barrier;
        }

        @Override
        public void run() {
            System.out.println(getName() + " 开始执行任务...");
            try {
                Thread.sleep((long) (Math.random() * 1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(getName() + " 任务完成,等待其他线程");

            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }

            System.out.println(getName() + " 继续执行后续任务");
        }
    }
}

信号量(Semaphore)

控制同时访问特定资源的线程数量:

import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3, true);

        for (int i = 0; i < 5; i++) {
            new ServiceWorker(semaphore, "Worker-" + (i + 1)).start();
        }
    }

    static class ServiceWorker extends Thread {
        private final Semaphore semaphore;

        public ServiceWorker(Semaphore semaphore, String name) {
            super(name);
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println(getName() + " 获得许可,开始执行任务...");
                Thread.sleep((long) (Math.random() * 1000));
                System.out.println(getName() + " 任务完成,释放许可");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        }
    }
}

阶段器(Phaser)

提供灵活的多阶段同步机制:

import java.util.concurrent.Phaser;

public class PhaserDemo {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3);

        for (int i = 0; i < 3; i++) {
            new PhaseWorker(phaser, "Worker-" + (i + 1)).start();
        }
    }

    static class PhaseWorker extends Thread {
        private final Phaser phaser;

        public PhaseWorker(Phaser phaser, String name) {
            super(name);
            this.phaser = phaser;
        }

        @Override
        public void run() {
            try {
                System.out.println(getName() + " 开始第一阶段任务...");
                Thread.sleep((long) (Math.random() * 1000));
                System.out.println(getName() + " 第一阶段完成,等待其他线程");
                phaser.arriveAndAwaitAdvance();

                System.out.println(getName() + " 开始第二阶段任务...");
                Thread.sleep((long) (Math.random() * 1000));
                System.out.println(getName() + " 第二阶段完成,等待其他线程");
                phaser.arriveAndAwaitAdvance();

                phaser.arriveAndDeregister();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

3. 阻塞队列实现

数组阻塞队列(ArrayBlockingQueue)

import java.util.concurrent.ArrayBlockingQueue;

public class ArrayQueueDemo {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

        Thread producer = new Thread(new DataProducer(queue), "Producer");
        Thread consumer = new Thread(new DataConsumer(queue), "Consumer");

        producer.start();
        consumer.start();
    }

    static class DataProducer implements Runnable {
        private final ArrayBlockingQueue<Integer> queue;

        public DataProducer(ArrayBlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 20; i++) {
                    System.out.println(Thread.currentThread().getName() + " 生产: " + i);
                    queue.put(i);
                    Thread.sleep((long) (Math.random() * 1000));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class DataConsumer implements Runnable {
        private final ArrayBlockingQueue<Integer> queue;

        public DataConsumer(ArrayBlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Integer item = queue.take();
                    System.out.println(Thread.currentThread().getName() + " 消费: " + item);
                    Thread.sleep((long) (Math.random() * 1000));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

链表阻塞队列(LinkedBlockingQueue)

import java.util.concurrent.LinkedBlockingQueue;

public class LinkedQueueDemo {
    public static void main(String[] args) {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);

        Thread producer = new Thread(new ItemProducer(queue), "Producer");
        Thread consumer = new Thread(new ItemConsumer(queue), "Consumer");

        producer.start();
        consumer.start();
    }

    static class ItemProducer implements Runnable {
        private final LinkedBlockingQueue<Integer> queue;

        public ItemProducer(LinkedBlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 20; i++) {
                    System.out.println(Thread.currentThread().getName() + " 生产: " + i);
                    queue.put(i);
                    Thread.sleep((long) (Math.random() * 1000));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class ItemConsumer implements Runnable {
        private final LinkedBlockingQueue<Integer> queue;

        public ItemConsumer(LinkedBlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Integer item = queue.take();
                    System.out.println(Thread.currentThread().getName() + " 消费: " + item);
                    Thread.sleep((long) (Math.random() * 1000));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

优先级阻塞队列(PriorityBlockingQueue)

import java.util.concurrent.PriorityBlockingQueue;

class WorkItem implements Comparable<WorkItem> {
    private int id;
    private int priority;

    public WorkItem(int id, int priority) {
        this.id = id;
        this.priority = priority;
    }

    public int getPriority() { return priority; }

    @Override
    public int compareTo(WorkItem other) {
        return Integer.compare(this.priority, other.priority);
    }

    @Override
    public String toString() {
        return "WorkItem{id=" + id + ", priority=" + priority + '}';
    }
}

public class PriorityQueueDemo {
    public static void main(String[] args) {
        PriorityBlockingQueue<WorkItem> queue = new PriorityBlockingQueue<>();

        Thread producer = new Thread(new WorkProducer(queue), "Producer");
        Thread consumer = new Thread(new WorkConsumer(queue), "Consumer");

        producer.start();
        consumer.start();
    }

    static class WorkProducer implements Runnable {
        private final PriorityBlockingQueue<WorkItem> queue;

        public WorkProducer(PriorityBlockingQueue<WorkItem> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 20; i++) {
                    int priority = (int) (Math.random() * 10);
                    WorkItem task = new WorkItem(i, priority);
                    System.out.println(Thread.currentThread().getName() + " 生产: " + task);
                    queue.put(task);
                    Thread.sleep((long) (Math.random() * 1000));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class WorkConsumer implements Runnable {
        private final PriorityBlockingQueue<WorkItem> queue;

        public WorkConsumer(PriorityBlockingQueue<WorkItem> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    WorkItem task = queue.take();
                    System.out.println(Thread.currentThread().getName() + " 消费: " + task);
                    Thread.sleep((long) (Math.random() * 1000));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

4. 线程池框架

基础线程池配置

ThreadPoolExecutor pool = new ThreadPoolExecutor(
    2,                           // 核心线程数
    4,                           // 最大线程数
    60,                          // 空闲时间
    TimeUnit.SECONDS,            // 时间单位
    new LinkedBlockingQueue<>(10), // 工作队列
    Executors.defaultThreadFactory(), // 线程工厂
    new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);

定时任务线程池

import java.util.concurrent.*;

public class ScheduledPoolDemo {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(2);

        scheduler.schedule(() -> {
            System.out.println("延迟执行任务");
        }, 3, TimeUnit.SECONDS);

        scheduler.scheduleAtFixedRate(() -> {
            System.out.println("固定频率执行任务");
        }, 1, 2, TimeUnit.SECONDS);

        scheduler.scheduleWithFixedDelay(() -> {
            System.out.println("固定延迟执行任务");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 1, 2, TimeUnit.SECONDS);

        scheduler.shutdown();
    }
}

5. 锁机制

可重入锁(ReentrantLock)

import java.util.concurrent.locks.ReentrantLock;

public class Counter {
    private int value = 0;
    private final ReentrantLock lock = new ReentrantLock();

    public void increase() {
        lock.lock();
        try {
            value++;
        } finally {
            lock.unlock();
        }
    }

    public void decrease() {
        lock.lock();
        try {
            value--;
        } finally {
            lock.unlock();
        }
    }

    public int getValue() {
        lock.lock();
        try {
            return value;
        } finally {
            lock.unlock();
        }
    }
}

读写锁(ReentrantReadWriteLock)

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class DataCache {
    private final Map storage = new HashMap<>();
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
    private final ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();

    public String fetch(String key) {
        readLock.lock();
        try {
            return storage.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public void store(String key, String value) {
        writeLock.lock();
        try {
            storage.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }
}

6. 并发集合

并发哈希映射(ConcurrentHashMap)

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConcurrentMapDemo {
    public static void main(String[] args) {
        ConcurrentHashMap map = new ConcurrentHashMap<>();
        map.put("A", 1);
        map.put("B", 2);

        ExecutorService executor = Executors.newFixedThreadPool(4);

        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                System.out.println("读取: " + map.get("A"));
            });
        }

        for (int i = 0; i < 10; i++) {
            final int index = i;
            executor.submit(() -> {
                map.put("A", map.get("A") + 1);
                System.out.println("更新: " + map.get("A"));
            });
        }

        executor.shutdown();
        while (!executor.isTerminated()) {}

        System.out.println("最终结果: " + map);
    }
}

写时复制列表(CopyOnWriteArrayList)

import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteDemo {
    public static void main(String[] args) {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        list.add("A");
        list.add("B");

        Thread reader = new Thread(() -> {
            for (String item : list) {
                System.out.println("读取: " + item);
            }
        });

        Thread writer = new Thread(() -> {
            list.add("C");
            list.remove("A");
            System.out.println("修改后: " + list);
        });

        reader.start();
        writer.start();

        try {
            reader.join();
            writer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("最终结果: " + list);
    }
}

相关文章

Linux crontab 详解

1) crontab 是什么cron 是 Linux 的定时任务守护进程;crontab 是用来编辑/查看“按时间周期执行命令”的表(cron table)。常见两类:用户 crontab:每个用户一份(crontab -e 编辑)系统级 crontab / cron.d:可指定执行用户(/etc/crontab、/etc/cron.d/*)2) crontab 时间...

富文本里可以允许的 HTML 属性

一、所有标签默认允许的安全属性(极少)class        (可选)id           (通常建议禁用)title️ 注意:id 容易被滥用做锚点注入,很多系统直接禁用class 允许的话最好只允许固定前缀(如 editor-*)二、a 标签允许属性<a href="" t...

Mac 安装 Node.js 指南

方法一:通过官网安装包(最简单,适合初学者)如果你只是想快速安装并开始使用,这是最直接的方法。访问 Node.js 官网。页面会显示两个版本:LTS (Recommended For Most Users):长期支持版,最稳定。建议选这个。Current:最新特性版,包含最新功能但可能不够稳定。下载 .pkg 安装包并运行。按照安装向导点击“下一步”即可完成。方法二:使用 Homebrew 安装(...

Dom\HTML_NO_DEFAULT_NS 的副作用:自动加闭合标签

在使用Dom\HTMLDocument时,Dom\HTML_NO_DEFAULT_NS 将禁止在解析过程中设置元素的命名空间, 此设置是为了与DOMDocument向后兼容而存在的。当使用它时,已知的一个副作用就是:自动加闭合标签例如 </img> 为什么会这样?当你使用:Dom\HTML_NO_DEFAULT_NS文档会变成 无命名空间模式,此时内部更接近 XML...

Laravel 事件和监听器创建

在 Laravel 中,使用 Artisan 命令创建 Events(事件) 和 Listeners(监听器) 是非常高效的。你可以通过以下几种方式来实现:1. 手动创建单个 Event如果你只想创建一个事件类,可以使用 make:event 命令:Bashphp artisan make:event UserRegistered执行后,文件将生成在 app/Even...

自定义域名解析神器 dnsmasq

什么是 dnsmasq?dnsmasq 是一个轻量级、功能强大的网络服务工具,专为小型和中等规模网络设计。它是一个综合的网络基础设施解决方案[1]。dnsmasq 能做什么?功能说明应用场景DNS 转发与缓存将 DNS 查询转发到上游服务器(ISP、Google DNS 等),并在本地缓存结果加快 DNS 查询速度,减少外部 DNS 流量本地 DNS解析本地网络设备的主机名,无需编辑&n...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。