并发工具类的组合使用技巧
一、并发工具类组合使用概述
1.1 为什么需要组合使用并发工具类
Java提供了丰富的并发工具类,如CountDownLatch、CyclicBarrier、Semaphore、Phaser、CompletableFuture等。这些工具类各自解决特定的并发问题,但在实际开发中,单一工具类往往无法满足复杂的并发需求。通过组合使用这些工具类,我们可以构建更加强大、灵活的并发解决方案。
1.2 组合使用的优势
- 解决复杂问题:单一工具类只能解决特定问题,组合使用可以处理更复杂的并发场景
- 提高代码可读性:使用现成的工具类组合,比手动实现复杂的同步逻辑更清晰
- 增强可维护性:工具类经过充分测试,组合使用可以减少bug
- 优化性能:合理的组合可以减少锁的争用,提高并发性能
- 实现复杂的业务流程:如多阶段任务、条件触发、异步处理等
1.3 常见的工具类组合场景
- 多阶段任务:使用
Phaser或CountDownLatch+CyclicBarrier - 资源池管理:使用
Semaphore+ConcurrentHashMap - 异步任务编排:使用
CompletableFuture+CountDownLatch - 限流与负载均衡:使用
Semaphore+AtomicInteger - 复杂的业务流程:多种工具类的组合使用
二、核心并发工具类回顾
2.1 CountDownLatch
CountDownLatch是一个同步辅助类,允许一个或多个线程等待其他线程完成操作。
核心特性:
- 计数器只能递减,不能重置
- 当计数器减为0时,所有等待的线程被唤醒
- 常用于等待多个任务完成
示例:
CountDownLatch latch = new CountDownLatch(3);
// 线程1
new Thread(() -> {
// 执行任务
latch.countDown();
}).start();
// 主线程等待所有任务完成
latch.await();2.2 CyclicBarrier
CyclicBarrier允许一组线程互相等待,直到到达某个公共屏障点。
核心特性:
- 计数器可以重置,可重复使用
- 当所有线程到达屏障点时,触发屏障操作
- 常用于多个线程需要同步执行的场景
示例:
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
// 屏障操作
System.out.println("所有线程到达屏障点");
});
// 线程1
new Thread(() -> {
// 执行任务
barrier.await();
}).start();2.3 Semaphore
Semaphore是一个计数信号量,用于控制同时访问特定资源的线程数量。
核心特性:
- 可以控制资源的访问权限
- 支持公平和非公平模式
- 常用于限流、资源池等场景
示例:
Semaphore semaphore = new Semaphore(5); // 允许5个线程同时访问
// 线程1
try {
semaphore.acquire(); // 获取许可
// 访问资源
} finally {
semaphore.release(); // 释放许可
}2.4 Phaser
Phaser是一个灵活的同步屏障,支持动态调整参与线程数。
核心特性:
- 支持动态添加/移除参与线程
- 支持多阶段任务
- 支持中断和超时
示例:
Phaser phaser = new Phaser(3);
// 线程1
new Thread(() -> {
// 阶段1
phaser.arriveAndAwaitAdvance();
// 阶段2
phaser.arriveAndAwaitAdvance();
// 退出
phaser.arriveAndDeregister();
}).start();2.5 CompletableFuture
CompletableFuture是一个异步编程工具,支持链式调用和组合操作。
核心特性:
- 支持异步执行任务
- 支持链式调用和组合
- 支持异常处理
- 支持多任务协调
示例:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
// 执行任务
return "result1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
// 执行任务
return "result2";
});
// 组合两个任务
CompletableFuture<String> combined = future1.thenCombine(future2,
(result1, result2) -> result1 + " " + result2);三、并发工具类组合使用模式
3.1 CountDownLatch + CyclicBarrier:多阶段任务协调
场景:需要将任务分为多个阶段,每个阶段需要等待所有线程完成后才能进入下一个阶段。
实现思路:
- 使用
CyclicBarrier协调每个阶段内的线程 - 使用
CountDownLatch等待整个任务完成
示例:
public class MultiStageTask {
private static final int THREAD_COUNT = 5;
private static final int STAGE_COUNT = 3;
private final CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> {
System.out.println("阶段完成");
});
private final CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
public void execute() {
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
// 提交任务
for (int i = 0; i < THREAD_COUNT; i++) {
final int threadId = i;
executor.submit(() -> {
try {
for (int stage = 0; stage < STAGE_COUNT; stage++) {
System.out.println("线程 " + threadId + " 执行阶段 " + stage);
// 模拟任务执行
Thread.sleep(1000);
// 等待所有线程完成当前阶段
barrier.await();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
});
}
try {
// 等待所有线程完成所有阶段
latch.await();
System.out.println("所有任务完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
public static void main(String[] args) {
new MultiStageTask().execute();
}
}3.2 Semaphore + ConcurrentHashMap:资源池管理
场景:需要管理一组可重用的资源,如数据库连接池、线程池等。
实现思路:
- 使用
Semaphore控制资源的访问权限 - 使用
ConcurrentHashMap存储资源的状态 - 实现资源的获取、使用和释放
示例:
public class ResourcePool<T> {
private final Semaphore semaphore;
private final ConcurrentHashMap<T, Boolean> resourceMap;
/**
* 构造函数
* @param resources 资源列表
*/
public ResourcePool(List<T> resources) {
this.semaphore = new Semaphore(resources.size());
this.resourceMap = new ConcurrentHashMap<>();
// 初始化资源池
for (T resource : resources) {
resourceMap.put(resource, true); // true表示资源可用
}
}
/**
* 获取资源
* @return 资源
* @throws InterruptedException 中断异常
*/
public T acquire() throws InterruptedException {
semaphore.acquire();
return getAvailableResource();
}
/**
* 释放资源
* @param resource 资源
*/
public void release(T resource) {
if (resourceMap.containsKey(resource)) {
resourceMap.put(resource, true);
semaphore.release();
}
}
/**
* 获取可用资源
* @return 可用资源
*/
private T getAvailableResource() {
while (true) {
for (Map.Entry<T, Boolean> entry : resourceMap.entrySet()) {
if (entry.getValue() && resourceMap.replace(entry.getKey(), true, false)) {
return entry.getKey();
}
}
// 短暂休眠,避免CPU空转
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
}
/**
* 资源池大小
* @return 大小
*/
public int size() {
return resourceMap.size();
}
/**
* 可用资源数量
* @return 数量
*/
public int availableCount() {
return (int) resourceMap.values().stream().filter(available -> available).count();
}
public static void main(String[] args) {
// 创建资源池
List<String> resources = Arrays.asList("resource1", "resource2", "resource3");
ResourcePool<String> pool = new ResourcePool<>(resources);
// 测试资源池
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
try {
String resource = pool.acquire();
System.out.println("任务 " + taskId + " 获取资源: " + resource);
// 模拟使用资源
Thread.sleep(500);
pool.release(resource);
System.out.println("任务 " + taskId + " 释放资源: " + resource);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}3.3 CompletableFuture + CountDownLatch:异步任务编排
场景:需要执行多个异步任务,并等待所有任务完成后进行汇总处理。
实现思路:
- 使用
CompletableFuture执行异步任务 - 使用
CountDownLatch等待所有异步任务完成 - 对任务结果进行汇总处理
示例:
public class AsyncTaskOrchestrator {
private final ExecutorService executor = Executors.newFixedThreadPool(10);
public void executeTasks() {
List<CompletableFuture<String>> futures = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(5);
// 提交5个异步任务
for (int i = 0; i < 5; i++) {
final int taskId = i;
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("执行任务: " + taskId);
Thread.sleep(1000 + (int) (Math.random() * 1000));
return "结果 " + taskId;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "错误 " + taskId;
} finally {
latch.countDown();
}
}, executor);
futures.add(future);
}
// 等待所有任务完成
try {
latch.await();
System.out.println("所有异步任务完成");
// 处理结果
List<String> results = new ArrayList<>();
for (CompletableFuture<String> future : futures) {
results.add(future.get());
}
System.out.println("任务结果: " + results);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
public static void main(String[] args) {
new AsyncTaskOrchestrator().executeTasks();
}
}3.4 Semaphore + AtomicInteger:限流与计数器
场景:需要限制系统的并发请求数量,并统计请求的处理情况。
实现思路:
- 使用
Semaphore限制并发请求数 - 使用
AtomicInteger统计请求数、成功数和失败数 - 实现请求的限流和监控
示例:
public class RateLimiter {
private final Semaphore semaphore;
private final AtomicInteger totalRequests = new AtomicInteger(0);
private final AtomicInteger successfulRequests = new AtomicInteger(0);
private final AtomicInteger failedRequests = new AtomicInteger(0);
/**
* 构造函数
* @param maxConcurrentRequests 最大并发请求数
*/
public RateLimiter(int maxConcurrentRequests) {
this.semaphore = new Semaphore(maxConcurrentRequests);
}
/**
* 处理请求
* @param request 请求
* @return 响应
* @throws InterruptedException 中断异常
*/
public String processRequest(String request) throws InterruptedException {
totalRequests.incrementAndGet();
if (semaphore.tryAcquire(1, TimeUnit.SECONDS)) {
try {
// 模拟处理请求
System.out.println("处理请求: " + request);
Thread.sleep((int) (Math.random() * 1000));
// 随机失败率
if (Math.random() < 0.1) {
failedRequests.incrementAndGet();
throw new RuntimeException("请求处理失败");
}
successfulRequests.incrementAndGet();
return "响应: " + request;
} finally {
semaphore.release();
}
} else {
failedRequests.incrementAndGet();
return "请求被限流";
}
}
/**
* 获取统计信息
* @return 统计信息
*/
public String getStatistics() {
return String.format("总请求数: %d, 成功数: %d, 失败数: %d",
totalRequests.get(), successfulRequests.get(), failedRequests.get());
}
public static void main(String[] args) {
RateLimiter limiter = new RateLimiter(3);
ExecutorService executor = Executors.newFixedThreadPool(10);
// 模拟20个请求
for (int i = 0; i < 20; i++) {
final int requestId = i;
executor.submit(() -> {
try {
String response = limiter.processRequest("请求 " + requestId);
System.out.println(response);
} catch (Exception e) {
System.out.println("请求 " + requestId + " 失败: " + e.getMessage());
}
});
}
// 等待一段时间后打印统计信息
executor.submit(() -> {
try {
Thread.sleep(5000);
System.out.println(limiter.getStatistics());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executor.shutdown();
}
}3.5 Phaser + CompletableFuture:复杂流程编排
场景:需要执行复杂的业务流程,包括多个阶段,每个阶段可能包含异步任务。
实现思路:
- 使用
Phaser管理多阶段任务 - 使用
CompletableFuture执行每个阶段的异步任务 - 实现复杂的业务流程编排
示例:
public class ComplexProcess {
private final Phaser phaser = new Phaser(1); // 主线程作为初始参与者
private final ExecutorService executor = Executors.newFixedThreadPool(10);
public void execute() {
try {
// 阶段1: 数据准备
System.out.println("阶段1: 数据准备");
phase1();
phaser.arriveAndAwaitAdvance();
// 阶段2: 并行处理
System.out.println("阶段2: 并行处理");
phase2();
phaser.arriveAndAwaitAdvance();
// 阶段3: 结果汇总
System.out.println("阶段3: 结果汇总");
phase3();
phaser.arriveAndAwaitAdvance();
System.out.println("所有阶段完成");
} finally {
phaser.arriveAndDeregister(); // 主线程退出
executor.shutdown();
}
}
private void phase1() {
// 阶段1的任务
for (int i = 0; i < 3; i++) {
final int taskId = i;
phaser.register(); // 注册新的参与者
CompletableFuture.runAsync(() -> {
try {
System.out.println("阶段1 - 任务 " + taskId);
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
phaser.arriveAndDeregister(); // 任务完成,退出
}
}, executor);
}
}
private void phase2() {
// 阶段2的任务
for (int i = 0; i < 5; i++) {
final int taskId = i;
phaser.register();
CompletableFuture.runAsync(() -> {
try {
System.out.println("阶段2 - 任务 " + taskId);
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
phaser.arriveAndDeregister();
}
}, executor);
}
}
private void phase3() {
// 阶段3的任务
phaser.register();
CompletableFuture.runAsync(() -> {
try {
System.out.println("阶段3 - 汇总任务");
Thread.sleep(800);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
phaser.arriveAndDeregister();
}
}, executor);
}
public static void main(String[] args) {
new ComplexProcess().execute();
}
}四、最佳实践与注意事项
4.1 组合使用的最佳实践
- 选择合适的工具类:根据问题的性质选择合适的工具类组合
- 最小化同步范围:只在必要的地方使用同步工具,避免过度同步
- 设置合理的超时时间:避免线程无限期等待
- 正确处理异常:确保在异常情况下工具类的状态正确
- 使用线程池:配合线程池使用,避免频繁创建线程
- 考虑性能影响:不同的工具类组合有不同的性能特性,选择适合的组合
4.2 常见陷阱与避免方法
死锁风险:
- 原因:工具类使用不当可能导致死锁
- 避免方法:避免循环等待,设置超时时间,正确释放资源
资源泄漏:
- 原因:忘记释放资源(如Semaphore的许可)
- 避免方法:使用try-finally块确保资源释放
线程饥饿:
- 原因:某些线程长时间无法获得资源
- 避免方法:使用公平模式,合理设置资源数量
过度同步:
- 原因:使用过多的同步工具或同步范围过大
- 避免方法:减少同步点,使用无锁数据结构
工具类状态不一致:
- 原因:在异常情况下工具类的状态没有正确更新
- 避免方法:正确处理异常,确保工具类状态一致性
4.3 性能优化建议
- 选择高性能的工具类:如使用
CompletableFuture替代手动线程管理 - 减少上下文切换:合理设置线程池大小,避免过多线程
- 使用无锁数据结构:如
ConcurrentHashMap、Atomic类等 - 批量处理:将多个小任务合并为大任务,减少同步开销
- 异步处理:使用
CompletableFuture等异步工具,提高系统吞吐量
五、实战案例分析
5.1 案例一:分布式任务调度系统
场景:需要开发一个分布式任务调度系统,支持任务的提交、执行、监控和结果汇总。
需求分析:
- 支持多节点分布式部署
- 支持任务的优先级和依赖关系
- 支持任务的分片和并行执行
- 支持任务的监控和结果查询
工具类组合方案:
- 使用
CountDownLatch等待任务的所有分片完成 - 使用
CyclicBarrier协调任务的不同阶段 - 使用
CompletableFuture处理异步任务 - 使用
Semaphore限制节点的并发任务数 - 使用
ConcurrentHashMap存储任务状态
核心代码:
public class DistributedTaskScheduler {
private final ExecutorService executor = Executors.newFixedThreadPool(20);
private final ConcurrentHashMap<String, TaskStatus> taskStatusMap = new ConcurrentHashMap<>();
private final Semaphore nodeSemaphore = new Semaphore(10); // 每个节点最多执行10个任务
/**
* 提交分布式任务
* @param task 任务
* @param shardCount 分片数
* @return 任务ID
*/
public String submitTask(Task task, int shardCount) {
String taskId = UUID.randomUUID().toString();
TaskStatus status = new TaskStatus(taskId, shardCount);
taskStatusMap.put(taskId, status);
// 启动任务的所有分片
CountDownLatch latch = new CountDownLatch(shardCount);
for (int i = 0; i < shardCount; i++) {
final int shardId = i;
executor.submit(() -> {
if (nodeSemaphore.tryAcquire()) {
try {
executeTaskShard(task, taskId, shardId);
status.incrementCompletedShards();
} catch (Exception e) {
status.incrementFailedShards();
e.printStackTrace();
} finally {
latch.countDown();
nodeSemaphore.release();
}
} else {
status.incrementFailedShards();
latch.countDown();
}
});
}
// 等待所有分片完成并更新任务状态
executor.submit(() -> {
try {
latch.await();
status.setCompleted(true);
System.out.println("任务 " + taskId + " 完成,成功分片数: " + status.getCompletedShards() + ", 失败分片数: " + status.getFailedShards());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
return taskId;
}
private void executeTaskShard(Task task, String taskId, int shardId) {
System.out.println("执行任务分片: " + taskId + "-" + shardId);
// 执行任务分片逻辑
try {
Thread.sleep(1000 + (int) (Math.random() * 2000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/**
* 获取任务状态
* @param taskId 任务ID
* @return 任务状态
*/
public TaskStatus getTaskStatus(String taskId) {
return taskStatusMap.get(taskId);
}
/**
* 任务状态类
*/
public static class TaskStatus {
private final String taskId;
private final int totalShards;
private final AtomicInteger completedShards = new AtomicInteger(0);
private final AtomicInteger failedShards = new AtomicInteger(0);
private volatile boolean completed = false;
public TaskStatus(String taskId, int totalShards) {
this.taskId = taskId;
this.totalShards = totalShards;
}
// getters and setters
public void incrementCompletedShards() {
completedShards.incrementAndGet();
}
public void incrementFailedShards() {
failedShards.incrementAndGet();
}
public void setCompleted(boolean completed) {
this.completed = completed;
}
}
/**
* 任务接口
*/
public interface Task {
void execute();
}
public static void main(String[] args) {
DistributedTaskScheduler scheduler = new DistributedTaskScheduler();
// 提交任务
String taskId = scheduler.submitTask(() -> {}, 5);
System.out.println("提交任务: " + taskId);
// 监控任务状态
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
TaskStatus status = scheduler.getTaskStatus(taskId);
if (status != null) {
System.out.println("任务状态: " + status.getCompletedShards() + "/" + status.totalShards + ", 完成: " + status.completed);
if (status.completed) {
System.out.println("任务执行完成");
scheduler.executor.shutdown();
}
}
}, 0, 1, TimeUnit.SECONDS);
}
}5.2 案例二:高并发订单处理系统
场景:需要开发一个高并发的订单处理系统,支持订单的创建、支付、发货等流程。
需求分析:
- 支持每秒1000+的订单创建请求
- 订单处理包含多个阶段:创建、支付、发货、完成
- 需要保证订单数据的一致性
- 需要支持订单的查询和监控
工具类组合方案:
- 使用
Semaphore限制系统的并发订单处理数 - 使用
CompletableFuture处理订单的异步流程 - 使用
CountDownLatch等待订单的所有阶段完成 - 使用
Atomic类处理订单状态的原子更新 - 使用
ConcurrentHashMap存储订单数据
核心代码:
public class OrderProcessingSystem {
private final ExecutorService executor = Executors.newFixedThreadPool(50);
private final ConcurrentHashMap<String, Order> orderMap = new ConcurrentHashMap<>();
private final Semaphore processingSemaphore = new Semaphore(100); // 最多同时处理100个订单
/**
* 创建订单
* @param userId 用户ID
* @param amount 订单金额
* @return 订单
*/
public CompletableFuture<Order> createOrder(String userId, double amount) {
return CompletableFuture.supplyAsync(() -> {
try {
if (processingSemaphore.tryAcquire(1, TimeUnit.SECONDS)) {
try {
String orderId = UUID.randomUUID().toString();
Order order = new Order(orderId, userId, amount, OrderStatus.CREATED);
orderMap.put(orderId, order);
System.out.println("创建订单: " + orderId);
return order;
} finally {
processingSemaphore.release();
}
} else {
throw new RuntimeException("系统繁忙,请稍后重试");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("创建订单失败");
}
}, executor);
}
/**
* 支付订单
* @param orderId 订单ID
* @return 订单
*/
public CompletableFuture<Order> payOrder(String orderId) {
return CompletableFuture.supplyAsync(() -> {
Order order = orderMap.get(orderId);
if (order == null) {
throw new RuntimeException("订单不存在");
}
if (order.getStatus() == OrderStatus.CREATED &&
order.setStatus(OrderStatus.CREATED, OrderStatus.PAID)) {
System.out.println("支付订单: " + orderId);
return order;
} else {
throw new RuntimeException("订单状态错误");
}
}, executor);
}
/**
* 发货订单
* @param orderId 订单ID
* @return 订单
*/
public CompletableFuture<Order> shipOrder(String orderId) {
return CompletableFuture.supplyAsync(() -> {
Order order = orderMap.get(orderId);
if (order == null) {
throw new RuntimeException("订单不存在");
}
if (order.getStatus() == OrderStatus.PAID &&
order.setStatus(OrderStatus.PAID, OrderStatus.SHIPPED)) {
System.out.println("发货订单: " + orderId);
return order;
} else {
throw new RuntimeException("订单状态错误");
}
}, executor);
}
/**
* 完成订单
* @param orderId 订单ID
* @return 订单
*/
public CompletableFuture<Order> completeOrder(String orderId) {
return CompletableFuture.supplyAsync(() -> {
Order order = orderMap.get(orderId);
if (order == null) {
throw new RuntimeException("订单不存在");
}
if (order.getStatus() == OrderStatus.SHIPPED &&
order.setStatus(OrderStatus.SHIPPED, OrderStatus.COMPLETED)) {
System.out.println("完成订单: " + orderId);
return order;
} else {
throw new RuntimeException("订单状态错误");
}
}, executor);
}
/**
* 处理完整订单流程
* @param userId 用户ID
* @param amount 订单金额
* @return 订单
*/
public CompletableFuture<Order> processOrder(String userId, double amount) {
CountDownLatch latch = new CountDownLatch(3); // 支付、发货、完成三个阶段
return createOrder(userId, amount)
.thenCompose(this::payOrder)
.thenAccept(order -> latch.countDown())
.thenCompose(v -> shipOrder(orderMap.get(userId)))
.thenAccept(order -> latch.countDown())
.thenCompose(v -> completeOrder(orderMap.get(userId)))
.thenAccept(order -> latch.countDown())
.thenApply(v -> orderMap.get(userId));
}
/**
* 订单状态枚举
*/
public enum OrderStatus {
CREATED, PAID, SHIPPED, COMPLETED, CANCELLED
}
/**
* 订单类
*/
public static class Order {
private final String orderId;
private final String userId;
private final double amount;
private final AtomicReference<OrderStatus> status;
private final Instant createTime;
private Instant payTime;
private Instant shipTime;
private Instant completeTime;
public Order(String orderId, String userId, double amount, OrderStatus status) {
this.orderId = orderId;
this.userId = userId;
this.amount = amount;
this.status = new AtomicReference<>(status);
this.createTime = Instant.now();
}
/**
* 原子更新订单状态
* @param expected 期望状态
* @param newStatus 新状态
* @return 是否更新成功
*/
public boolean setStatus(OrderStatus expected, OrderStatus newStatus) {
boolean updated = status.compareAndSet(expected, newStatus);
if (updated) {
updateTimeFields(newStatus);
}
return updated;
}
private void updateTimeFields(OrderStatus newStatus) {
Instant now = Instant.now();
switch (newStatus) {
case PAID:
this.payTime = now;
break;
case SHIPPED:
this.shipTime = now;
break;
case COMPLETED:
this.completeTime = now;
break;
default:
break;
}
}
// getters and setters
}
public static void main(String[] args) {
OrderProcessingSystem system = new OrderProcessingSystem();
// 模拟高并发订单处理
for (int i = 0; i < 100; i++) {
final int userId = i;
system.createOrder("user-" + userId, 100.0)
.thenCompose(system::payOrder)
.thenCompose(system::shipOrder)
.thenCompose(system::completeOrder)
.thenAccept(order -> System.out.println("订单处理完成: " + order.getOrderId()))
.exceptionally(e -> {
System.err.println("订单处理失败: " + e.getMessage());
return null;
});
}
// 等待一段时间后关闭系统
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
system.executor.shutdown();
System.out.println("系统关闭");
}, 10, TimeUnit.SECONDS);
}
}5.3 案例三:实时数据处理管道
场景:需要开发一个实时数据处理管道,支持数据的采集、清洗、转换、分析和存储。
需求分析:
- 支持每秒10000+的数据处理能力
- 数据处理包含多个阶段:采集、清洗、转换、分析、存储
- 每个阶段可以并行处理
- 需要支持数据的监控和错误处理
工具类组合方案:
- 使用
Phaser管理数据处理的多个阶段 - 使用
CompletableFuture处理数据的异步流 - 使用
Semaphore限制每个阶段的并发处理数 - 使用
ConcurrentLinkedQueue处理数据的缓冲 - 使用
Atomic类处理数据统计信息
核心代码:
public class DataProcessingPipeline {
private final ExecutorService executor = Executors.newFixedThreadPool(50);
private final Phaser phaser = new Phaser(1); // 主线程作为初始参与者
private final ConcurrentLinkedQueue<Data> rawDataQueue = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<Data> cleanedDataQueue = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<Data> transformedDataQueue = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<Data> analyzedDataQueue = new ConcurrentLinkedQueue<>();
private final Semaphore collectionSemaphore = new Semaphore(20);
private final Semaphore cleaningSemaphore = new Semaphore(20);
private final Semaphore transformationSemaphore = new Semaphore(20);
private final Semaphore analysisSemaphore = new Semaphore(20);
private final Semaphore storageSemaphore = new Semaphore(20);
private final AtomicLong totalProcessed = new AtomicLong(0);
private final AtomicLong failedProcessed = new AtomicLong(0);
public void start() {
try {
// 启动数据采集
startDataCollection();
// 等待所有阶段准备就绪
phaser.arriveAndAwaitAdvance();
System.out.println("数据处理管道启动完成");
// 监控数据处理情况
monitorDataProcessing();
// 运行一段时间后停止
Thread.sleep(30000);
stop();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void startDataCollection() {
// 注册数据采集阶段
phaser.register();
executor.submit(() -> {
try {
System.out.println("数据采集阶段启动");
// 模拟数据采集
for (int i = 0; i < 10000; i++) {
if (collectionSemaphore.tryAcquire(1, TimeUnit.MILLISECONDS)) {
try {
Data data = new Data("data-" + i, System.currentTimeMillis(), "raw-data");
rawDataQueue.offer(data);
// 启动数据清洗
startDataCleaning(data);
} finally {
collectionSemaphore.release();
}
}
// 控制数据采集速度
Thread.sleep(1);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
phaser.arriveAndDeregister();
}
});
}
private void startDataCleaning(Data rawData) {
// 注册数据清洗阶段
phaser.register();
executor.submit(() -> {
try {
if (cleaningSemaphore.tryAcquire(1, TimeUnit.MILLISECONDS)) {
try {
// 模拟数据清洗
Thread.sleep(10);
Data cleanedData = new Data(rawData.getId(), rawData.getTimestamp(), "cleaned-" + rawData.getContent());
cleanedDataQueue.offer(cleanedData);
// 启动数据转换
startDataTransformation(cleanedData);
} finally {
cleaningSemaphore.release();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
phaser.arriveAndDeregister();
}
});
}
private void startDataTransformation(Data cleanedData) {
// 注册数据转换阶段
phaser.register();
executor.submit(() -> {
try {
if (transformationSemaphore.tryAcquire(1, TimeUnit.MILLISECONDS)) {
try {
// 模拟数据转换
Thread.sleep(15);
Data transformedData = new Data(cleanedData.getId(), cleanedData.getTimestamp(), "transformed-" + cleanedData.getContent());
transformedDataQueue.offer(transformedData);
// 启动数据分析
startDataAnalysis(transformedData);
} finally {
transformationSemaphore.release();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
phaser.arriveAndDeregister();
}
});
}
private void startDataAnalysis(Data transformedData) {
// 注册数据分析阶段
phaser.register();
executor.submit(() -> {
try {
if (analysisSemaphore.tryAcquire(1, TimeUnit.MILLISECONDS)) {
try {
// 模拟数据分析
Thread.sleep(20);
Data analyzedData = new Data(transformedData.getId(), transformedData.getTimestamp(), "analyzed-" + transformedData.getContent());
analyzedDataQueue.offer(analyzedData);
// 启动数据存储
startDataStorage(analyzedData);
} finally {
analysisSemaphore.release();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
phaser.arriveAndDeregister();
}
});
}
private void startDataStorage(Data analyzedData) {
// 注册数据存储阶段
phaser.register();
executor.submit(() -> {
try {
if (storageSemaphore.tryAcquire(1, TimeUnit.MILLISECONDS)) {
try {
// 模拟数据存储
Thread.sleep(25);
// 数据存储完成
totalProcessed.incrementAndGet();
} finally {
storageSemaphore.release();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
failedProcessed.incrementAndGet();
} finally {
phaser.arriveAndDeregister();
}
});
}
private void monitorDataProcessing() {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
System.out.println("数据处理统计: ");
System.out.println(" 总处理数: " + totalProcessed.get());
System.out.println(" 失败数: " + failedProcessed.get());
System.out.println(" 各队列大小: ");
System.out.println(" 原始数据: " + rawDataQueue.size());
System.out.println(" 清洗后: " + cleanedDataQueue.size());
System.out.println(" 转换后: " + transformedDataQueue.size());
System.out.println(" 分析后: " + analyzedDataQueue.size());
}, 0, 5, TimeUnit.SECONDS);
}
private void stop() {
System.out.println("停止数据处理管道");
phaser.arriveAndDeregister(); // 主线程退出
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("数据处理管道停止完成");
System.out.println("最终统计: 总处理数: " + totalProcessed.get() + ", 失败数: " + failedProcessed.get());
}
/**
* 数据类
*/
public static class Data {
private final String id;
private final long timestamp;
private final String content;
public Data(String id, long timestamp, String content) {
this.id = id;
this.timestamp = timestamp;
this.content = content;
}
// getters
}
public static void main(String[] args) {
new DataProcessingPipeline().start();
}
}六、总结与建议
6.1 核心要点总结
工具类组合的优势:组合使用并发工具类可以解决更复杂的并发问题,提高代码的可读性和可维护性
常见组合模式:
CountDownLatch + CyclicBarrier:多阶段任务协调Semaphore + ConcurrentHashMap:资源池管理CompletableFuture + CountDownLatch:异步任务编排Semaphore + AtomicInteger:限流与计数器Phaser + CompletableFuture:复杂流程编排
最佳实践:
- 选择合适的工具类组合
- 最小化同步范围
- 正确处理异常
- 使用线程池
- 考虑性能影响
避免陷阱:
- 死锁风险
- 资源泄漏
- 线程饥饿
- 过度同步
- 状态不一致
6.2 进阶建议
深入学习Java并发API:了解每个工具类的实现原理和性能特性
学习函数式编程:函数式编程的思想有助于更好地理解和使用
CompletableFuture等异步工具研究并发设计模式:如生产者-消费者、读写锁、线程池等设计模式
关注Java并发的新特性:如Project Loom的虚拟线程、结构化并发等
实践是关键:通过实际项目练习并发工具类的组合使用,积累经验
6.3 最后的思考
并发工具类的组合使用是Java并发编程的高级技巧,需要深入理解每个工具类的特性和适用场景。通过合理的组合,可以构建出高效、可靠、可维护的并发系统。
在实际开发中,我们应该根据具体的业务需求和性能要求,选择合适的工具类组合,并遵循最佳实践,避免常见的陷阱。同时,不断学习和实践,提高自己的并发编程能力。
记住,并发编程的目标是在保证数据一致性和系统可靠性的前提下,提高系统的性能和吞吐量。合理使用并发工具类的组合,可以帮助我们更好地实现这个目标。