Java 异步任务工厂(二)生产实战案例与性能调优
1. 引言
上一篇搭建了异步任务工厂的基础框架,相当于盖房子打好了地基。现在该装修了——看看这套框架在真实业务中怎么用,以及如何让它跑得又快又稳。
接下来通过电商订单处理和社交消息推送两个实际案例,看看异步任务工厂在生产环境中到底能解决什么问题,顺便聊聊线程管理的一些优化技巧。
2. 电商订单处理:让用户秒看到"下单成功"
2.1 异步库存检查:别让用户盯着转圈圈
想象一下双11的场景:用户疯狂下单,如果每次都要等库存检查完才能提示"下单成功",用户早就不耐烦跑了。聪明的做法是先告诉用户"订单已提交",让用户安心,然后悄悄在后台检查库存。
这就是异步库存检查的核心思想:用户体验和系统处理分离。
public class InventoryService {
// 模拟库存数据存储
private Map<Long, Integer> inventoryMap = new HashMap<>();
public InventoryService() {
// 初始化库存数据
inventoryMap.put(1L, 100);
inventoryMap.put(2L, 50);
}
/**
* 检查商品库存是否充足
* @param productIds 商品ID列表
* @return 库存是否充足
*/
public boolean checkInventory(List<Long> productIds) {
for (Long productId : productIds) {
Integer stock = inventoryMap.get(productId);
if (stock == null || stock <= 0) {
return false;
}
}
return true;
}
}
public class OrderService {
/**
* 异步检查库存
* @param orderId 订单ID
* @param productIds 商品ID列表
* @return 库存检查结果的Future对象
*/
public static Future<Boolean> asyncCheckInventory(Long orderId, List<Long> productIds) {
return AsyncManager.me().asyncExecute(() -> {
InventoryService inventoryService = SpringUtils.getBean(InventoryService.class);
return inventoryService.checkInventory(productIds);
});
}
/**
* 处理订单:先响应用户,再检查库存
*/
public void processOrder(Long orderId, List<Long> productIds) {
// 启动异步库存检查任务
Future<Boolean> inventoryCheckFuture = asyncCheckInventory(orderId, productIds);
// 立即响应用户,不让用户等待
System.out.println("订单已提交,请稍候查询订单状态");
try {
// 获取库存检查结果
boolean hasStock = inventoryCheckFuture.get();
if (hasStock) {
System.out.println("库存充足,订单处理中...");
} else {
System.out.println("库存不足,订单处理失败");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
代码解析:
- InventoryService:库存管家,专门负责检查商品库存
- asyncCheckInventory:异步任务创建器,把库存检查丢给后台线程
- processOrder:订单处理流程,先安抚用户,再处理业务
这么做的好处一目了然:用户点完下单立马看到"订单已提交",心里踏实;系统在后台悄悄检查库存,用户该干嘛干嘛,不用干等。
2.2 异步支付验证:别让用户怀疑人生
支付是电商的命根子,但调用支付宝、微信接口经常要等几秒钟。用户点完支付看着转圈圈,心里就开始嘀咕:“到底成功了没有?要不要再点一次?”
异步支付验证就是为了解决这个问题:用户一点支付,立马显示"支付处理中",让用户知道系统在工作,后台再慢慢验证结果。
public class PaymentService {
/**
* 验证第三方支付结果
* @param paymentId 支付ID
* @return 支付是否成功
*/
public boolean verifyPayment(String paymentId) {
// 实际开发中这里会调用支付宝、微信等支付接口
// 模拟验证逻辑
return "validPaymentId".equals(paymentId);
}
}
public class OrderService {
/**
* 异步验证支付结果
* @param orderId 订单ID
* @param paymentId 支付ID
* @return 支付验证结果的Future对象
*/
public static Future<Boolean> asyncVerifyPayment(Long orderId, String paymentId) {
return AsyncManager.me().asyncExecute(() -> {
PaymentService paymentService = SpringUtils.getBean(PaymentService.class);
return paymentService.verifyPayment(paymentId);
});
}
/**
* 处理支付:先安抚用户,再验证结果
*/
public void processPayment(Long orderId, String paymentId) {
// 启动异步支付验证任务
Future<Boolean> paymentVerifyFuture = asyncVerifyPayment(orderId, paymentId);
// 立即给用户反馈,减少等待焦虑
System.out.println("支付信息已提交,正在验证...");
try {
// 获取支付验证结果
boolean paymentValid = paymentVerifyFuture.get();
if (paymentValid) {
System.out.println("支付成功,订单已确认");
} else {
System.out.println("支付失败,请重新支付");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
关键设计思路:
- PaymentService:支付验证专家,负责与第三方支付平台打交道
- asyncVerifyPayment:异步任务包装器,把耗时的支付验证丢给后台
- processPayment:支付流程控制器,先给用户信心,再处理结果
这样用户体验就好多了:不用盯着屏幕发呆,知道系统在处理就行;系统也能稳稳当当地在后台验证支付结果。
3. 社交系统消息推送:让千万用户同时收到通知
想想微信朋友圈的点赞通知,或者微博热搜推送。一条内容火了,可能要同时给几十万、几百万人发通知。要是用老办法同步处理,用户A点个赞,系统得等所有推送发完才能处理用户B的操作,这不是要疯吗?
异步推送就像雇了一群快递小哥:用户一点赞,系统马上记下来,然后把推送任务扔给"快递小哥"们,让他们慢慢送。
3.1 异步消息推送:快递员模式
在社交系统中,消息及时推送对用户体验至关重要。当用户发布动态或收到新消息时,系统需将这些信息推送给相关用户。通过异步任务实现消息推送,可避免因推送任务耗时导致系统卡顿。
public class MessagePushService {
/**
* 单个用户消息推送
* @param userId 用户ID
* @param message 推送内容
*/
public boolean pushMessage(Long userId, String message) {
// 实际开发中这里会调用推送平台API(极光推送、友盟等)
System.out.println("向用户 " + userId + " 推送消息: " + message);
return true;
}
}
public class SocialEventService {
/**
* 异步消息推送:把推送任务丢给后台处理
* @param userId 目标用户ID
* @param message 推送内容
* @return 推送任务的Future对象
*/
public static Future<Boolean> asyncPushMessage(Long userId, String message) {
return AsyncManager.me().asyncExecute(() -> {
MessagePushService pushService = SpringUtils.getBean(MessagePushService.class);
return pushService.pushMessage(userId, message);
});
}
/**
* 处理新消息:先告诉发送者成功,再慢慢推送给接收者
* @param senderId 发送者ID
* @param receiverId 接收者ID
* @param message 消息内容
*/
public void handleNewMessage(Long senderId, Long receiverId, String message) {
// 启动异步推送任务
Future<Boolean> pushMessageFuture = asyncPushMessage(receiverId, message);
// 立即告诉发送者消息已发送,不让用户等待
System.out.println("消息已发送");
try {
boolean pushSuccess = pushMessageFuture.get();
if (!pushSuccess) {
// 推送失败处理:记录日志、重试或降级处理
System.out.println("消息推送失败");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
这个设计的核心思想:
- MessagePushService:专业推送员,只管发消息
- asyncPushMessage:任务分发器,把推送任务交给后台
- handleNewMessage:消息处理器,先安抚发送者,再处理推送
这样做的好处显而易见:用户发完消息立马看到"已发送",推送在后台慢慢进行。就算推送出了问题,也不会影响用户正常聊天。
3.2 批量消息推送:群发通知的艺术
想象一下微信群里有人发红包,系统要同时给群里所有人发推送。如果一个一个发,1000人的群就要发1000次,太慢了!聪明的做法是批量处理:把所有用户打包,一次性推送。
public class MessagePushService {
/**
* 批量推送:一次性给多个用户发消息
* @param userIds 用户ID列表
* @param message 推送内容
* @return 推送是否成功
*/
public boolean pushMessages(List<Long> userIds, String message) {
// 实际开发中这里会调用批量推送API,比一个一个发效率高很多
for (Long userId : userIds) {
System.out.println("向用户 " + userId + " 推送消息: " + message);
}
return true;
}
}
public class SocialEventService {
/**
* 异步批量推送:把群发任务丢给后台处理
* @param userIds 目标用户列表
* @param message 推送内容
* @return 批量推送任务的Future对象
*/
public static Future<Boolean> asyncPushMessages(List<Long> userIds, String message) {
return AsyncManager.me().asyncExecute(() -> {
MessagePushService pushService = SpringUtils.getBean(MessagePushService.class);
return pushService.pushMessages(userIds, message);
});
}
/**
* 发送系统公告:先告诉管理员已发布,再慢慢推送给用户
* @param message 公告内容
*/
public void sendSystemNotice(String message) {
// 获取所有需要接收公告的用户(实际开发中从数据库查询)
List<Long> allUserIds = Arrays.asList(1L, 2L, 3L, 4L, 5L);
// 启动异步批量推送任务
Future<Boolean> pushMessagesFuture = asyncPushMessages(allUserIds, message);
// 立即告诉管理员公告已发布,不让管理员等待
System.out.println("系统公告已发布");
try {
boolean pushSuccess = pushMessagesFuture.get();
if (!pushSuccess) {
// 批量推送失败处理
System.out.println("系统公告推送失败");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
批量推送的优势:
- MessagePushService:批量推送专家,一次处理多个用户
- asyncPushMessages:批量任务分发器,把群发任务交给后台
- sendSystemNotice:公告发布器,先确认发布,再批量推送
这招特别适合发系统公告、活动通知:管理员点完发布立马看到"已发布",系统在后台一口气推送给所有用户,比一个一个发效率高太多了。
4. 线程管理优化:让工人们更高效地干活
前面用异步任务解决了不少问题,但还有个关键问题没解决:线程池就像工厂里的工人团队,人手不够活干不完,人太多又浪费工资。怎么找到最合适的人数?怎么让大家配合得更默契?
这就是线程管理要解决的核心问题。
4.1 从临时工到正式工:告别 SimpleAsyncTaskExecutor
上一篇用的 SimpleAsyncTaskExecutor,就像是来一个活就临时找一个人,干完就散伙。这样有个大问题:活多的时候临时工满天飞,把工厂都挤爆了!
聪明的老板会怎么做?建个固定的工人团队,让他们轮流干活,这就是 ThreadPoolExecutor 的思路。
import java.util.concurrent.*;
public class AsyncManager {
private static AsyncManager me = new AsyncManager();
// 核心工人数:平时保持5个工人在岗
private static final int CORE_POOL_SIZE = 5;
// 最大工人数:忙的时候最多可以有10个工人
private static final int MAX_POOL_SIZE = 10;
// 临时工闲置时间:临时工闲置60秒后就让他们下班
private static final long KEEP_ALIVE_TIME = 60L;
// 任务队列:工人都忙的时候,任务先排队等待(最多排100个)
private static final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(100);
private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, // 核心工人数
MAX_POOL_SIZE, // 最大工人数
KEEP_ALIVE_TIME, // 临时工空闲时间
TimeUnit.SECONDS, // 时间单位
workQueue // 任务排队区
);
private AsyncManager() {
}
public static AsyncManager me() {
return me;
}
public <T> Future<T> asyncExecute(Callable<T> task) {
return threadPoolExecutor.submit(task);
}
}
这样设计的好处:
- 固定班底:平时5个人就够了,不用天天招人
- 忙时加人:真忙起来可以临时加到10个人
- 排队机制:人手不够时新任务排队,最多排100个
- 自动下班:临时工闲着超过60秒就让他们回家
这样既不用频繁招人浪费成本,又能应对突然的忙碌期,还不会因为人太多而浪费资源。
4.2 线程池体检:看看工人们干活怎么样
就像工厂要定期检查机器一样,线程池也得时不时看看情况。工人们累不累?活是不是堆成山了?有没有人在偷懒?
4.2.1 关键指标:工厂运行情况一目了然
- 人手够不够:现在有几个人在干活?忙得过来吗?
- 排队长不长:有多少活在排队?用户会不会等急了?
- 效率高不高:每个活要干多久?有没有特别磨蹭的?
4.2.2 智能调节:看情况加人减人
就像餐厅饭点多安排服务员,深夜减少人手一样,线程池也可以看情况调整人数。
/**
* 智能调整工人数量
*/
public class SmartThreadPoolManager {
/**
* 根据当前负载调整线程池大小
*/
public void adjustThreadPool(ThreadPoolExecutor executor) {
// 获取当前排队任务数
int queueSize = executor.getQueue().size();
// 获取当前活跃工人数
int activeCount = executor.getActiveCount();
if (queueSize > 50) {
// 任务排队太多,增加工人
int newCoreSize = Math.min(executor.getCorePoolSize() + 2, 20);
executor.setCorePoolSize(newCoreSize);
System.out.println("任务太多了,增加工人到: " + newCoreSize);
} else if (queueSize < 10 && activeCount < executor.getCorePoolSize() / 2) {
// 任务很少,减少工人
int newCoreSize = Math.max(executor.getCorePoolSize() - 1, 3);
executor.setCorePoolSize(newCoreSize);
System.out.println("任务不多,减少工人到: " + newCoreSize);
}
}
}
调整的基本思路:
- 活太多 → 加人:排队超过50个活时,多叫几个人来
- 活太少 → 减人:排队不到10个活,而且大半人闲着时,让一些人回家
- 设个底线:最少留3个人,最多20个人,别搞极端
4.3 任务优先级:重要的事情先做
现实中有些事比较急(比如处理支付),有些事可以缓缓(比如发邮件通知)。就像银行VIP客户可以插队一样,异步任务也可以分个轻重缓急。
4.3.1 给任务贴标签
需要给每个任务贴个标签,写上"紧急"、“普通”、“不急”,这样系统就知道先做哪个了。
import java.util.concurrent.Callable;
/**
* 带优先级的任务包装器
* 就像给每个任务贴上紧急程度的标签
*/
public class PriorityTask<T> implements Callable<T> {
private final Callable<T> task; // 实际要执行的任务
private final int priority; // 优先级:数字越大越紧急
private final String taskName; // 任务名称,方便调试
public PriorityTask(Callable<T> task, int priority, String taskName) {
this.task = task;
this.priority = priority;
this.taskName = taskName;
}
@Override
public T call() throws Exception {
System.out.println("开始执行优先级 " + priority + " 的任务: " + taskName);
return task.call();
}
public int getPriority() {
return priority;
}
public String getTaskName() {
return taskName;
}
}
设计思路:
- 数字越大越急:支付任务给10分,发邮件给1分
- 起个名字:方便调试时知道在干什么
- 包装一下:不改原来的逻辑,就是套个优先级的壳
4.3.2 VIP通道:让重要任务插队
普通排队是先来先服务,优先级队列就像银行VIP通道,重要的可以插队。用 PriorityBlockingQueue 就能实现这个功能。
import java.util.concurrent.*;
/**
* 支持优先级的异步任务管理器
* 就像是一个智能的任务调度员
*/
public class PriorityAsyncManager {
private static PriorityAsyncManager me = new PriorityAsyncManager();
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final long KEEP_ALIVE_TIME = 60L;
// 优先级队列:VIP任务可以插队
private static final PriorityBlockingQueue<Runnable> workQueue = new PriorityBlockingQueue<>(100,
(r1, r2) -> {
// 比较两个任务的优先级
if (r1 instanceof PriorityTask && r2 instanceof PriorityTask) {
PriorityTask<?> task1 = (PriorityTask<?>) r1;
PriorityTask<?> task2 = (PriorityTask<?>) r2;
// 优先级高的排在前面(数字大的优先级高)
return Integer.compare(task2.getPriority(), task1.getPriority());
}
return 0; // 非优先级任务按原顺序
});
private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
workQueue
);
private PriorityAsyncManager() {
}
public static PriorityAsyncManager me() {
return me;
}
/**
* 提交带优先级的异步任务
* @param task 要执行的任务
* @param priority 优先级(数字越大越紧急)
* @param taskName 任务名称
*/
public <T> Future<T> asyncExecute(Callable<T> task, int priority, String taskName) {
PriorityTask<T> priorityTask = new PriorityTask<>(task, priority, taskName);
return threadPoolExecutor.submit(priorityTask);
}
/**
* 提交普通异步任务(默认优先级为1)
*/
public <T> Future<T> asyncExecute(Callable<T> task, String taskName) {
return asyncExecute(task, 1, taskName);
}
}
工作原理:
- 自动排序:新任务来了,自动按优先级插到合适位置
- 重要的先做:支付任务(10分)肯定排在发邮件(1分)前面
- 兼容老任务:没设优先级的任务还是按原来的顺序
实际使用示例:让重要任务插队
public class OrderService {
public void processUrgentOrder() {
// 紧急支付任务:优先级10(最高)
Future<Boolean> paymentFuture = PriorityAsyncManager.me().asyncExecute(() -> {
// 处理支付逻辑
System.out.println("处理紧急支付...");
return true;
}, 10, "紧急支付处理");
// 普通邮件通知:优先级1(最低)
Future<Boolean> emailFuture = PriorityAsyncManager.me().asyncExecute(() -> {
// 发送邮件通知
System.out.println("发送邮件通知...");
return true;
}, 1, "邮件通知");
// 库存更新:优先级5(中等)
Future<Boolean> inventoryFuture = PriorityAsyncManager.me().asyncExecute(() -> {
// 更新库存
System.out.println("更新库存...");
return true;
}, 5, "库存更新");
// 执行顺序:支付处理 → 库存更新 → 邮件通知
}
}
优先级怎么设:
- 支付相关:8-10分(最急)
- 库存操作:5-7分(比较急)
- 通知推送:1-3分(不急)
- 日志记录:1分(最不急)
5. 总结
这篇文章带大家从简单的异步任务用法,一路走到了线程管理优化。咱们来回顾一下都学了什么:
-
实战案例的收获:
- 电商下单流程:学会了用异步让下单、查库存、验支付同时进行,用户体验直接起飞
- 社交消息推送:掌握了批量异步处理,千万用户同时收消息不再是天方夜谭
-
线程管理的进步:
- 告别临时工:用
ThreadPoolExecutor替换SimpleAsyncTaskExecutor,不用每次都临时找人干活了 - 智能调节:学会了看线程池的脸色,忙的时候多叫人,闲的时候让人回家
- VIP插队:重要的事情先做,支付任务不用排在发邮件后面了
- 告别临时工:用
-
设计思路的转变:
- 用户体验优先:先安抚用户,再慢慢处理业务
- 资源合理利用:既不浪费人力也不让系统累趴下
- 系统稳定可靠:有监控、能降级、不怕出错
- 点赞
- 收藏
- 关注作者
评论(0)