复杂IO密集接口优化
创始人
2024-06-02 06:33:01
0

背景

很多同学应该都会遇到复杂,涉及到N张表组装的数据接口,尤其是首页的某某列表(比如 美团外卖的商家列表,淘宝订单明细列表等).
如果按照通常的线性流程执行所有代码逻辑,那么接口的响应时间可以说直接爆炸(动辄几秒),产品不打死你,用户也要喷你.
为此,我结合一个实际栗子,来分享下优化的思路及成果.

业务描述

这是首页的赛程列表,该列表需要展示最近5日有比赛且关联游戏的列表数据.
数据如下:
在这里插入图片描述
这个列表接口需要2大业务域数据组成:

  1. (比赛域) 赛事,赛程,赛果,对阵关联队伍,队伍
  2. (游戏域) 赛程关联游戏,游戏内容(比如包含多少关卡)

共计7张表.

业务流程图

在这里插入图片描述

业务分析

该业务描述虽然只有一句话,但是实现很复杂.
最近5日有比赛且关联游戏的列表数据,拆解(不使用join的情况下):

  1. 查询有赛程的日期,并缓存.
  2. 查询有游戏的日期,并缓存.
  3. 按照查询条件的开始时间,循环直到查询出有交集的5天.
  4. 查询流程3计算得到的实际有赛程且有游戏的时间范围作为条件,查询出赛程
  5. 按日组装赛程,构造2维数组,并准备公用数据
  6. 按日遍历赛程列表
  7. 遍历赛程列表
  8. 查询并拼装7个表的数据,生成单条详情数据
  9. 输出比赛列表

流程3 是需求的难点,没法事先知道需要取多少数据才能得想要的结果.

基本实现

按照上述流程,不做其他优化,以线性流程去实现,在准备赛程1000条,游戏10000条的情况下,得到如下结果:
在这里插入图片描述
(笑哭)单次请求超过3秒,这个数据量上生产,分分钟给你爆掉.
当然,这个数据量其实超过正常情况,但怕就怕不正常的时候.

优化

源码包含业务敏感信息,就不黏贴了,形式参考下面的测试栗子.

优化思路

  1. 从取数逻辑入手,提高数据检索效率,介绍无用数据返回
  2. 优化算法,减少数据库访问
  3. 尽可能多的缓存少变化的数据
  4. 耗时任务以线程池方式执行
  5. 有缓存时,可能以循环而不是批量方式方式查询数据更高效

parallelStream并发流优化(有风险)

其实就是简单的以并发流的方式遍历赛程日.
在这里插入图片描述

单IO线程池优化

此处构造了专门的IO线程池处理赛程日的数据,以单独一天的赛程列表作为一个任务,并发处理.
在这里插入图片描述

主从IO线程池优化

此处构造了2个专门的IO线程池(主,从)分别处理赛程日的数据和对应日中的赛程列表数据,以单独一天的赛程列表作为主任务池的一个任务,以一天中的赛程作为子任务池的任务,并发处理.
在这里插入图片描述

终极杀招

如果数据变化不频繁,直接缓存比赛列表接口,然后通过赛程和游戏变动事件异步更新比赛列表.
这样接口响应时间直接能干到100ms以下. 这个就不测试了,因为该方案是最外层优化.
如果内层实现不够好,当赛程和游戏变化频繁时,单次构造列表花费3秒这样依然会导致服务down掉.
当然,此处我们还可以 同时检查时间和变化事件双条件满足才异步更新列表.

优化工具的源码

import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import javax.annotation.Nonnull;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/*** IO密集任务工具* Warning: {@link #asyncTaskQueue}, {@link #getAsyncTaskQueueResult} 依赖 ThreadLocal, 注意多线程下的问题* Warning: 警惕嵌套使用该工具,可能会导致死锁. 如果真需要嵌套,请使用master和worker模式.** @Description* @Author LionZhou* @Date 2023/2/21* @Version 1.0*/
@Slf4j
public class IOUtil {static final ThreadPoolTaskExecutor masterThreadPoolExecutor;static final ThreadLocal>> masterTaskQueue;static final ThreadPoolTaskExecutor workerThreadPoolExecutor;static final ThreadLocal>> workerTaskQueue;public static  Future async(Callable func) {return workerThreadPoolExecutor.submit(func);}public static  List> async(Collection> funcs) {ArrayList> list = new ArrayList<>(funcs.size());for (Callable func : funcs) {list.add(workerThreadPoolExecutor.submit(func));}return list;}public static void asyncTaskQueue(Callable func) {asyncTaskQueue(func, false);}/*** 提交异步任务** @param func     待执行任务* @param isMaster 是否使用master*/public static void asyncTaskQueue(Callable func, boolean isMaster) {ThreadPoolTaskExecutor threadPoolTaskExecutor = isMaster ? masterThreadPoolExecutor : workerThreadPoolExecutor;ThreadLocal>> taskQueue = isMaster ? masterTaskQueue : workerTaskQueue;Future future = threadPoolTaskExecutor.submit(func);List> futures = taskQueue.get();if (Objects.isNull(futures)) {futures = new ArrayList<>(32);taskQueue.set(futures);}futures.add(future);}public static  List getAsyncTaskQueueResult(Class clz) {return getAsyncTaskQueueResult(clz, false);}/*** 获取异步任务结果** @param clz      返回结果需要转换的泛型类型* @param isMaster 是否使用master*/public static  List getAsyncTaskQueueResult(Class clz, boolean isMaster) {ThreadLocal>> taskQueue = isMaster ? masterTaskQueue : workerTaskQueue;List> futures = taskQueue.get();if (Objects.isNull(futures)) {return Collections.emptyList();}ArrayList rs = new ArrayList<>(futures.size());try {for (Future future : futures) {rs.add((R) future.get());}} catch (InterruptedException | ExecutionException e) {log.error(e.getMessage(), e);throw new RuntimeException(e);} finally {taskQueue.remove();}return rs;}public static  Optional syncWaitDone(Callable func) {try {return Optional.of(async(func).get());} catch (InterruptedException | ExecutionException e) {log.error(e.getMessage(), e);}return Optional.empty();}public static  List> syncWaitDone(Collection> funcs) {List> list = async(funcs);ArrayList> rsList = new ArrayList<>(funcs.size());for (Future future : list) {try {rsList.add(Optional.of(future.get()));} catch (InterruptedException | ExecutionException e) {log.error(e.getMessage(), e);rsList.add(Optional.empty());}}return rsList;}/*** 自定义线程池,并注入MDC上下文,方便链路追踪** @param coreSize  核心线程数* @param queueSize 队列长度* @return 线程池*/public static ThreadPoolTaskExecutor taskExecutor(int coreSize, int queueSize) {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() {@Overridepublic void execute(@Nonnull Runnable task) {Map copyOfContextMap = MDC.getCopyOfContextMap();super.execute(() -> {MDC.setContextMap(Objects.nonNull(copyOfContextMap) ? copyOfContextMap : Collections.emptyMap());try {task.run();} catch (Exception e) {log.error(e.getMessage());} finally {MDC.clear();}});}@Overridepublic Future submit(@Nonnull Runnable task) {Map copyOfContextMap = MDC.getCopyOfContextMap();return super.submit(() -> {MDC.setContextMap(Objects.nonNull(copyOfContextMap) ? copyOfContextMap : Collections.emptyMap());try {task.run();} catch (Exception e) {log.error(e.getMessage());} finally {MDC.clear();}});}};executor.setCorePoolSize(coreSize);// 因为测试,暂时设置为core相同executor.setMaxPoolSize(coreSize);executor.setKeepAliveSeconds(120);executor.setQueueCapacity(queueSize);executor.initialize();return executor;}static {masterThreadPoolExecutor = taskExecutor(1, 50);workerThreadPoolExecutor = taskExecutor(10, 300);masterTaskQueue = new ThreadLocal<>();workerTaskQueue = new ThreadLocal<>();}public static void main(String[] args) {final Callable callB = () -> {try {Thread thread = Thread.currentThread();log.info("worker {},{}", thread.getThreadGroup().getName(), thread.getId());Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}return true;};Callable runnable = () -> {try {Thread thread = Thread.currentThread();for (int i = 0; i < 10; i++) {// 提交到worker线程池asyncTaskQueue(callB);}List async = getAsyncTaskQueueResult(Boolean.class);log.info("master {},{}", thread.getThreadGroup().getName(), thread.getId());} catch (Exception e) {throw new RuntimeException(e);}return true;};for (int i = 0; i < 8; i++) {// 提交到master线程池调用asyncTaskQueue(runnable, true);}long st = System.currentTimeMillis();List async = getAsyncTaskQueueResult(Boolean.class, true);long et = System.currentTimeMillis();log.info("{} ms", et - st);masterThreadPoolExecutor.shutdown();workerThreadPoolExecutor.shutdown();}
} 

测试结果

仅worker模式, master 0, worker 3

在这里插入图片描述

    private static void onlyWorker() {Callable runnable = () -> {try {for (int i = 0; i < 10; i++) {// 提交到worker线程池asyncTaskQueue(callB);}List async = getAsyncTaskQueueResult(Boolean.class);Thread thread = Thread.currentThread();log.info("master {},{}", thread.getThreadGroup().getName(), thread.getId());} catch (Exception e) {throw new RuntimeException(e);}return true;};for (int i = 0; i < 8; i++) {try {runnable.call();} catch (Exception e) {throw new RuntimeException(e);}}}public static void main(String[] args) {long st = System.currentTimeMillis();onlyWorker();long et = System.currentTimeMillis();log.info("{} ms", et - st);masterThreadPoolExecutor.shutdown();workerThreadPoolExecutor.shutdown();}
 

主从模式,master 1,worker 2

    private static void mixed() {Callable runnable = () -> {try {for (int i = 0; i < 10; i++) {// 提交到worker线程池asyncTaskQueue(callB);}List async = getAsyncTaskQueueResult(Boolean.class);Thread thread = Thread.currentThread();log.info("master {},{}", thread.getThreadGroup().getName(), thread.getId());} catch (Exception e) {throw new RuntimeException(e);}return true;};for (int i = 0; i < 8; i++) {// 提交到master线程池调用asyncTaskQueue(runnable, true);}List async = getAsyncTaskQueueResult(Boolean.class, true);}public static void main(String[] args) {long st = System.currentTimeMillis();mixed();long et = System.currentTimeMillis();log.info("{} ms", et - st);masterThreadPoolExecutor.shutdown();workerThreadPoolExecutor.shutdown();}
 

在这里插入图片描述

仅master,master 3,worker 0

    private static void onlyMaster() {Callable runnable = () -> {try {for (int i = 0; i < 10; i++) {// 提交到worker线程池callB.call();}Thread thread = Thread.currentThread();log.info("master {},{}", thread.getThreadGroup().getName(), thread.getId());} catch (Exception e) {throw new RuntimeException(e);}return true;};for (int i = 0; i < 8; i++) {// 提交到master线程池调用asyncTaskQueue(runnable, true);}List async = getAsyncTaskQueueResult(Boolean.class, true);}public static void main(String[] args) {long st = System.currentTimeMillis();onlyMaster();long et = System.currentTimeMillis();log.info("{} ms", et - st);masterThreadPoolExecutor.shutdown();workerThreadPoolExecutor.shutdown();}
 

在这里插入图片描述

结论

  1. 对于IO密集型业务,使用线程池和不使用响应时间差距巨大.
  2. 主从模式不一定就好于同等数量的worker模式,尤其是线程数较少且master任务不耗时的情况.

正确使用线程池,对于服务的响应有极大的提升.

思考: 为啥我要分master和worker分别处理2种任务? 放一起不行吗?

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...