Java并发工具包核心组件详解
1. 原子操作组件
在多线程环境中执行数值计算时,若缺乏适当的同步机制,往往会导致数据不一致问题。Java提供了原子类来解决此类线程安全问题。
基础原子类型
- AtomicInteger: 提供线程安全的整型操作
- AtomicLong: 提供线程安全的长整型操作
- AtomicBoolean: 提供线程安全的布尔值操作
/**
 * Demonstrates the "return the old value, then modify" operations of
 * {@code AtomicInteger}: {@code getAndSet}, {@code getAndIncrement} and
 * {@code getAndAdd}.
 */
public class AtomicDemo {

    /**
     * Applies three get-and-modify operations to one counter and prints the
     * value returned by each call next to the counter's value afterwards.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(0);

        report(counter.getAndSet(12), counter);
        report(counter.getAndIncrement(), counter);
        report(counter.getAndAdd(-10), counter);
    }

    /** Prints the value an operation returned together with the counter's current value. */
    private static void report(int returned, AtomicInteger counter) {
        System.out.println("previous:" + returned + "; current:" + counter);
    }
}
引用类型原子操作
AtomicReference 支持对象引用的原子性更新:
/**
 * Immutable snapshot of an account: a holder name and a balance.
 *
 * <p>Both fields are final, so a "change" is expressed by creating a new
 * instance from the old one.
 */
public class BankAccount {
    private final String holder;
    private final int balance;

    /**
     * @param holder  name of the account holder
     * @param balance balance stored in this snapshot
     */
    public BankAccount(String holder, int balance) {
        this.holder = holder;
        this.balance = balance;
    }

    /** @return the account holder's name */
    public String getHolder() {
        return holder;
    }

    /** @return the balance stored in this snapshot */
    public int getBalance() {
        return balance;
    }

    @Override
    public String toString() {
        return "BankAccount{holder='" + holder + "', balance=" + balance + '}';
    }
}
// Atomically updatable reference holding a BankAccount whose fields are all final.
AtomicReference<BankAccount> accountRef = new AtomicReference<>(new BankAccount("张三", 100));
// Atomically update the balance: updateAndGet swaps in a new BankAccount built
// from the current one, with the same holder and the balance increased by 50.
accountRef.updateAndGet(acc -> new BankAccount(acc.getHolder(), acc.getBalance() + 50));
数组类型原子操作
- AtomicIntegerArray: 整型数组原子操作
- AtomicLongArray: 长整型数组原子操作
- AtomicReferenceArray: 引用数组原子操作
/**
 * Demonstrates element-wise atomic operations on an {@code AtomicIntegerArray}.
 */
public class AtomicArrayDemo {

    /**
     * Prints every element of the atomic array, then applies
     * {@code getAndSet}, {@code getAndIncrement} and {@code getAndAdd} to
     * index 0, printing the returned value and the array after each call.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        int[] data = {-2, -1, 1, 2, 3, 4};
        AtomicIntegerArray array = new AtomicIntegerArray(data);

        int position = 0;
        while (position < data.length) {
            System.out.println(array.get(position));
            position++;
        }

        show(array.getAndSet(0, 2), array);
        show(array.getAndIncrement(0), array);
        show(array.getAndAdd(0, 5), array);
    }

    /** Prints the value an operation returned together with the array's current contents. */
    private static void show(int returned, AtomicIntegerArray array) {
        System.out.println("old:" + returned + "; array:" + array);
    }
}
2. 线程同步工具
倒计时门闩(CountDownLatch)
允许一个或多个线程等待其他线程完成特定操作:
import java.util.concurrent.CountDownLatch;
/**
 * Demonstrates {@code CountDownLatch}: the main thread blocks on
 * {@code await()} until every worker has called {@code countDown()}.
 */
public class LatchDemo {
    private static final int WORKER_COUNT = 3;

    /**
     * Starts the workers and waits for all of them on the latch.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        CountDownLatch gate = new CountDownLatch(WORKER_COUNT);
        for (int i = 0; i < WORKER_COUNT; i++) {
            new Worker(gate, "Worker-" + (i + 1)).start();
        }
        try {
            System.out.println("主线程等待所有任务完成...");
            gate.await();
            System.out.println("所有任务已完成,主线程继续执行");
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still see the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /** Worker that simulates a task of random length and then counts the latch down once. */
    static class Worker extends Thread {
        private final CountDownLatch latch;

        /**
         * @param latch latch to count down when this worker is done
         * @param name  thread name used in the log output
         */
        public Worker(CountDownLatch latch, String name) {
            super(name);
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                System.out.println(getName() + " 开始执行任务...");
                simulateWork();
                System.out.println(getName() + " 任务完成");
            } finally {
                // Count down on every exit path; otherwise the thread blocked in
                // await() would never be released if this worker failed.
                latch.countDown();
            }
        }

        /** Sleeps for up to one second, keeping the interrupt flag set if interrupted. */
        private void simulateWork() {
            try {
                Thread.sleep((long) (Math.random() * 1000));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }
}
循环屏障(CyclicBarrier)
使一组线程相互等待至某个公共屏障点:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
 * Demonstrates {@code CyclicBarrier}: each worker finishes its own task and
 * then waits at the barrier until all parties have arrived.
 */
public class BarrierDemo {
    private static final int PARTIES = 3;

    /**
     * Creates a barrier with a barrier action and starts one worker per party.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(PARTIES, () -> {
            System.out.println("所有线程已到达屏障点,开始下一步操作");
        });
        for (int i = 0; i < PARTIES; i++) {
            new TaskWorker(barrier, "Worker-" + (i + 1)).start();
        }
    }

    /** Worker that simulates a task, waits at the barrier, and continues only if the barrier tripped. */
    static class TaskWorker extends Thread {
        private final CyclicBarrier barrier;

        /**
         * @param barrier barrier shared by all workers
         * @param name    thread name used in the log output
         */
        public TaskWorker(CyclicBarrier barrier, String name) {
            super(name);
            this.barrier = barrier;
        }

        @Override
        public void run() {
            System.out.println(getName() + " 开始执行任务...");
            try {
                Thread.sleep((long) (Math.random() * 1000));
            } catch (InterruptedException e) {
                // Keep the flag set: the await() below then fails immediately and
                // breaks the barrier, so the other parties are released instead of
                // waiting forever for this thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
            System.out.println(getName() + " 任务完成,等待其他线程");
            try {
                barrier.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return; // the barrier did not trip; do not run the follow-up step
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
                return; // another party failed; do not run the follow-up step
            }
            System.out.println(getName() + " 继续执行后续任务");
        }
    }
}
信号量(Semaphore)
控制同时访问特定资源的线程数量:
import java.util.concurrent.Semaphore;
/**
 * Demonstrates {@code Semaphore}: five workers compete for three permits of a
 * fair semaphore, so at most three of them hold a permit at any time.
 */
public class SemaphoreDemo {

    /**
     * Creates a fair semaphore with three permits and starts five workers.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3, true);
        for (int i = 0; i < 5; i++) {
            new ServiceWorker(semaphore, "Worker-" + (i + 1)).start();
        }
    }

    /** Worker that acquires one permit, simulates a task, and returns the permit. */
    static class ServiceWorker extends Thread {
        private final Semaphore semaphore;

        /**
         * @param semaphore semaphore guarding the simulated resource
         * @param name      thread name used in the log output
         */
        public ServiceWorker(Semaphore semaphore, String name) {
            super(name);
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            // acquire() stays outside the try/finally that releases: if it is
            // interrupted, no permit is held and releasing one would add a permit
            // that was never taken.
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return;
            }
            try {
                System.out.println(getName() + " 获得许可,开始执行任务...");
                Thread.sleep((long) (Math.random() * 1000));
                System.out.println(getName() + " 任务完成,释放许可");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        }
    }
}
阶段器(Phaser)
提供灵活的多阶段同步机制:
import java.util.concurrent.Phaser;
/**
 * Demonstrates {@code Phaser}: three registered workers run two phases and
 * synchronize with each other at the end of each phase.
 */
public class PhaserDemo {
    private static final int PARTIES = 3;

    /**
     * Creates a phaser with three registered parties and starts one worker per party.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Phaser phaser = new Phaser(PARTIES);
        for (int i = 0; i < PARTIES; i++) {
            new PhaseWorker(phaser, "Worker-" + (i + 1)).start();
        }
    }

    /** Worker that runs two simulated phases and deregisters from the phaser when done. */
    static class PhaseWorker extends Thread {
        private final Phaser phaser;

        /**
         * @param phaser phaser shared by all workers
         * @param name   thread name used in the log output
         */
        public PhaseWorker(Phaser phaser, String name) {
            super(name);
            this.phaser = phaser;
        }

        @Override
        public void run() {
            try {
                System.out.println(getName() + " 开始第一阶段任务...");
                Thread.sleep((long) (Math.random() * 1000));
                System.out.println(getName() + " 第一阶段完成,等待其他线程");
                phaser.arriveAndAwaitAdvance();
                System.out.println(getName() + " 开始第二阶段任务...");
                Thread.sleep((long) (Math.random() * 1000));
                System.out.println(getName() + " 第二阶段完成,等待其他线程");
                phaser.arriveAndAwaitAdvance();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                // Deregister on every exit path: a worker that leaves early while
                // still registered would keep the remaining parties waiting forever.
                phaser.arriveAndDeregister();
            }
        }
    }
}
3. 阻塞队列实现
数组阻塞队列(ArrayBlockingQueue)
import java.util.concurrent.ArrayBlockingQueue;
/**
 * Producer/consumer demo on a bounded {@code ArrayBlockingQueue} of capacity 10.
 *
 * <p>The consumer loops until its thread is interrupted. {@code main} never
 * interrupts it, so the program keeps running after the producer has
 * finished its 20 items.
 */
public class ArrayQueueDemo {

    /**
     * Starts one producer thread and one consumer thread sharing the same queue.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        Thread producer = new Thread(new DataProducer(queue), "Producer");
        Thread consumer = new Thread(new DataConsumer(queue), "Consumer");
        producer.start();
        consumer.start();
    }

    /** Puts the integers 0..19 into the queue, pausing up to one second after each. */
    static class DataProducer implements Runnable {
        private final ArrayBlockingQueue<Integer> queue;

        /** @param queue queue to put the produced items into */
        public DataProducer(ArrayBlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 20; i++) {
                    System.out.println(Thread.currentThread().getName() + " 生产: " + i);
                    queue.put(i); // blocks while the queue is full
                    Thread.sleep((long) (Math.random() * 1000));
                }
            } catch (InterruptedException e) {
                // Stop producing, but keep the interrupt visible to the thread's owner.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /** Takes items from the queue until the thread is interrupted. */
    static class DataConsumer implements Runnable {
        private final ArrayBlockingQueue<Integer> queue;

        /** @param queue queue to take items from */
        public DataConsumer(ArrayBlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Integer item = queue.take(); // blocks while the queue is empty
                    System.out.println(Thread.currentThread().getName() + " 消费: " + item);
                    Thread.sleep((long) (Math.random() * 1000));
                }
            } catch (InterruptedException e) {
                // Interruption is the only way out of the loop; restore the flag on exit.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }
}
链表阻塞队列(LinkedBlockingQueue)
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Producer/consumer demo on a {@code LinkedBlockingQueue} bounded to 10 elements.
 *
 * <p>The consumer loops until its thread is interrupted. {@code main} never
 * interrupts it, so the program keeps running after the producer has
 * finished its 20 items.
 */
public class LinkedQueueDemo {

    /**
     * Starts one producer thread and one consumer thread sharing the same queue.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
        Thread producer = new Thread(new ItemProducer(queue), "Producer");
        Thread consumer = new Thread(new ItemConsumer(queue), "Consumer");
        producer.start();
        consumer.start();
    }

    /** Puts the integers 0..19 into the queue, pausing up to one second after each. */
    static class ItemProducer implements Runnable {
        private final LinkedBlockingQueue<Integer> queue;

        /** @param queue queue to put the produced items into */
        public ItemProducer(LinkedBlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 20; i++) {
                    System.out.println(Thread.currentThread().getName() + " 生产: " + i);
                    queue.put(i); // blocks while the queue is at capacity
                    Thread.sleep((long) (Math.random() * 1000));
                }
            } catch (InterruptedException e) {
                // Stop producing, but keep the interrupt visible to the thread's owner.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /** Takes items from the queue until the thread is interrupted. */
    static class ItemConsumer implements Runnable {
        private final LinkedBlockingQueue<Integer> queue;

        /** @param queue queue to take items from */
        public ItemConsumer(LinkedBlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Integer item = queue.take(); // blocks while the queue is empty
                    System.out.println(Thread.currentThread().getName() + " 消费: " + item);
                    Thread.sleep((long) (Math.random() * 1000));
                }
            } catch (InterruptedException e) {
                // Interruption is the only way out of the loop; restore the flag on exit.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }
}
优先级阻塞队列(PriorityBlockingQueue)
import java.util.concurrent.PriorityBlockingQueue;
/**
 * Unit of work identified by an id and ordered by its priority value.
 *
 * <p>The natural ordering compares {@code priority} only, in ascending order.
 */
class WorkItem implements Comparable<WorkItem> {
    private int id;
    private int priority;

    /**
     * @param id       identifier shown in {@link #toString()}
     * @param priority value used for ordering
     */
    public WorkItem(int id, int priority) {
        this.priority = priority;
        this.id = id;
    }

    /** @return the priority value used for ordering */
    public int getPriority() {
        return this.priority;
    }

    /** Orders items by ascending priority value; the id does not take part. */
    @Override
    public int compareTo(WorkItem other) {
        int mine = this.priority;
        int theirs = other.priority;
        return Integer.compare(mine, theirs);
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("WorkItem{id=");
        text.append(id);
        text.append(", priority=");
        text.append(priority);
        text.append('}');
        return text.toString();
    }
}
/**
 * Producer/consumer demo on an unbounded {@code PriorityBlockingQueue} of
 * {@code WorkItem}s; {@code take()} hands out items according to the
 * element type's natural ordering.
 *
 * <p>The consumer loops until its thread is interrupted. {@code main} never
 * interrupts it, so the program keeps running after the producer has
 * finished its 20 items.
 */
public class PriorityQueueDemo {

    /**
     * Starts one producer thread and one consumer thread sharing the same queue.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        PriorityBlockingQueue<WorkItem> queue = new PriorityBlockingQueue<>();
        Thread producer = new Thread(new WorkProducer(queue), "Producer");
        Thread consumer = new Thread(new WorkConsumer(queue), "Consumer");
        producer.start();
        consumer.start();
    }

    /** Produces 20 work items with a random priority in 0..9, pausing up to one second after each. */
    static class WorkProducer implements Runnable {
        private final PriorityBlockingQueue<WorkItem> queue;

        /** @param queue queue to put the produced items into */
        public WorkProducer(PriorityBlockingQueue<WorkItem> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 20; i++) {
                    int priority = (int) (Math.random() * 10);
                    WorkItem task = new WorkItem(i, priority);
                    System.out.println(Thread.currentThread().getName() + " 生产: " + task);
                    queue.put(task);
                    Thread.sleep((long) (Math.random() * 1000));
                }
            } catch (InterruptedException e) {
                // Stop producing, but keep the interrupt visible to the thread's owner.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /** Takes work items from the queue until the thread is interrupted. */
    static class WorkConsumer implements Runnable {
        private final PriorityBlockingQueue<WorkItem> queue;

        /** @param queue queue to take items from */
        public WorkConsumer(PriorityBlockingQueue<WorkItem> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    WorkItem task = queue.take(); // blocks while the queue is empty
                    System.out.println(Thread.currentThread().getName() + " 消费: " + task);
                    Thread.sleep((long) (Math.random() * 1000));
                }
            } catch (InterruptedException e) {
                // Interruption is the only way out of the loop; restore the flag on exit.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }
}
4. 线程池框架
基础线程池配置
// Thread pool created with every constructor argument spelled out explicitly.
ThreadPoolExecutor pool = new ThreadPoolExecutor(
2, // core pool size
4, // maximum pool size
60, // keep-alive time for idle threads
TimeUnit.SECONDS, // unit of the keep-alive time
new LinkedBlockingQueue<>(10), // work queue, bounded to 10 pending tasks
Executors.defaultThreadFactory(), // thread factory
new ThreadPoolExecutor.CallerRunsPolicy() // rejection policy: the submitting thread runs the task
);
定时任务线程池
import java.util.concurrent.*;
/**
 * Demonstrates the three scheduling modes of {@code ScheduledThreadPoolExecutor}:
 * a one-shot delayed task, a fixed-rate task and a fixed-delay task.
 */
public class ScheduledPoolDemo {
    /** How long the periodic tasks are left running before the scheduler is stopped. */
    private static final long DEMO_DURATION_SECONDS = 10;

    /**
     * Schedules the three tasks, lets them run for {@link #DEMO_DURATION_SECONDS}
     * seconds, then shuts the scheduler down.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(2);
        scheduler.schedule(() -> {
            System.out.println("延迟执行任务");
        }, 3, TimeUnit.SECONDS);
        scheduler.scheduleAtFixedRate(() -> {
            System.out.println("固定频率执行任务");
        }, 1, 2, TimeUnit.SECONDS);
        scheduler.scheduleWithFixedDelay(() -> {
            System.out.println("固定延迟执行任务");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 1, 2, TimeUnit.SECONDS);

        // By default shutdown() cancels periodic tasks, so calling it right after
        // scheduling means the fixed-rate and fixed-delay tasks never run at all.
        // Give them time to fire before stopping the scheduler.
        try {
            TimeUnit.SECONDS.sleep(DEMO_DURATION_SECONDS);
            scheduler.shutdown();
            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
5. 锁机制
可重入锁(ReentrantLock)
import java.util.concurrent.locks.ReentrantLock;
/**
 * Integer counter whose reads and writes are all performed while holding a
 * single {@code ReentrantLock}.
 */
public class Counter {
    private int value = 0;
    private final ReentrantLock lock = new ReentrantLock();

    /** Adds one to the counter. */
    public void increase() {
        shift(1);
    }

    /** Subtracts one from the counter. */
    public void decrease() {
        shift(-1);
    }

    /** @return the current counter value, read under the lock */
    public int getValue() {
        lock.lock();
        try {
            int snapshot = value;
            return snapshot;
        } finally {
            lock.unlock();
        }
    }

    /** Adds {@code delta} to the counter while holding the lock. */
    private void shift(int delta) {
        lock.lock();
        try {
            value += delta;
        } finally {
            lock.unlock();
        }
    }
}
读写锁(ReentrantReadWriteLock)
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * String-to-string cache backed by a plain {@code HashMap} and guarded by a
 * {@code ReentrantReadWriteLock}: lookups take the read lock, updates take
 * the write lock.
 */
public class DataCache {
    // Typed map: with a raw Map, get() returns Object and fetch() cannot compile.
    private final Map<String, String> storage = new HashMap<>();
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
    private final ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();

    /**
     * Looks up a value while holding the read lock.
     *
     * @param key key to look up
     * @return the stored value, or {@code null} if the key has no mapping
     */
    public String fetch(String key) {
        readLock.lock();
        try {
            return storage.get(key);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Stores or replaces a value while holding the write lock.
     *
     * @param key   key to store under
     * @param value value to associate with the key
     */
    public void store(String key, String value) {
        writeLock.lock();
        try {
            storage.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }
}
6. 并发集合
并发哈希映射(ConcurrentHashMap)
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demonstrates concurrent reads and atomic per-key updates on a
 * {@code ConcurrentHashMap} shared by the tasks of a fixed thread pool.
 */
public class ConcurrentMapDemo {

    /**
     * Submits ten read tasks and ten increment tasks for key {@code "A"},
     * waits for the pool to finish, and prints the final map.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Typed map: with a raw ConcurrentHashMap, get() returns Object and
        // "get + 1" does not compile.
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("A", 1);
        map.put("B", 2);
        ExecutorService executor = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                System.out.println("读取: " + map.get("A"));
            });
        }
        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                // merge() is a single atomic read-modify-write. A separate get()
                // followed by put() can lose increments when tasks interleave.
                Integer updated = map.merge("A", 1, Integer::sum);
                System.out.println("更新: " + updated);
            });
        }
        executor.shutdown();
        try {
            // Block until the tasks finish instead of spinning on isTerminated().
            if (!executor.awaitTermination(1, java.util.concurrent.TimeUnit.MINUTES)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        System.out.println("最终结果: " + map);
    }
}
写时复制列表(CopyOnWriteArrayList)
import java.util.concurrent.CopyOnWriteArrayList;
/**
 * Demonstrates {@code CopyOnWriteArrayList}: one thread iterates over the
 * list while another thread adds and removes elements.
 */
public class CopyOnWriteDemo {

    /**
     * Runs a reader thread and a writer thread against the same list, waits
     * for both to finish, and prints the final contents.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        list.add("A");
        list.add("B");

        // Reader: prints every element it sees while iterating.
        Runnable readTask = () -> {
            for (String element : list) {
                System.out.println("读取: " + element);
            }
        };
        // Writer: appends "C", removes "A", then prints the list.
        Runnable writeTask = () -> {
            list.add("C");
            list.remove("A");
            System.out.println("修改后: " + list);
        };

        Thread reader = new Thread(readTask);
        Thread writer = new Thread(writeTask);
        reader.start();
        writer.start();

        try {
            reader.join();
            writer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("最终结果: " + list);
    }
}