页面查询多项数据组合的线程池设计 | 京东云技术团队
背景
我们应对并发场景时一般会采用下面方式去预估线程池的线程数量,比如QPS需求是1000,平均每个任务需要执行的时间是t秒,那么我们需要的线程数是t * 1000。
但是在一些情况下,这个t是不好估算的,即便是估算出来了,在实际的线程环境上也需要进行验证和微调。比如在本文所阐述分页查询的数据项组合场景中。
1、数据组合依赖不同的上游接接口, 它们的响应时间参差不齐,甚至差距还非常大。有些接口支持批量查询而另一些则不支持批量查询。有些接口因为性能问题还需要考虑降级和平滑方案。
2、为了提升用户体验,这里的查询设计了动态列,因此每一次访问所需要组合的数据项和数量也是不同的。
因此这里如果需要估算出一个合理的t是不太现实的。
方案
一种可动态调节的策略,根据监控的反馈对线程池进行微调。整体设计分为装配逻辑和线程池封装设计。
1、装配逻辑
查询结果,拆分分片(水平拆分),并行装配(垂直拆分),获得装配项列表(动态列), 并行装配每一项。
2、线程池封装
可调节的核心线程数、最大线程数、线程保持时间,队列大小,提交任务重试等待时间,提交任务重试次数。 固定异常拒绝策略。
调节参数:
字段 | 名称 | 说明 |
---|---|---|
corePoolSize | 核心线程数 | 参考线程池定义 |
maximumPoolSize | 最大线程数 | 参考线程池定义 |
keepAliveTime | 线程存活时间 | 参考线程池定义 |
queueSize | 队列长度 | 参考线程池定义 |
resubmitSleepMillis | 提交任务重试等待时间 | 添加任务被拒绝后重试时的等待时间 |
resubmitTimes | 提交任务重试次数 | 添加任务被拒绝后重试添加的最大次数 |
@Data private static class PoolPolicy { /** 核心线程数 */ private Integer corePoolSize; /** 最大线程数 */ private Integer maximumPoolSize; /** 线程存活时间 */ private Integer keepAliveTime; /** 队列容量 */ private Integer queueSize; /** 重试等待时间 */ private Long resubmitSleepMillis; /** 重试次数 */ private Integer resubmitTimes; }
创建线程池:
线程池的创建考虑了动态的需求,满足根据压测结果进行微调的要求。首先缓存旧的线程池后再创建新的线程,当新的线程池创建成功后再去关闭旧的线程池。保证在这个替换过程中不影响正在执行的业务。线程池使用了中断策略,用户可以及时感知到系统繁忙并保证了系统资源占用的安全。
public void reloadThreadPool(PoolPolicy poolPolicy) { if (poolPolicy == null) { throw new RuntimeException("The thread pool policy cannot be empty."); } if (poolPolicy.getCorePoolSize() == null) { poolPolicy.setCorePoolSize(0); } if (poolPolicy.getMaximumPoolSize() == null) { poolPolicy.setMaximumPoolSize(Runtime.getRuntime().availableProcessors() + 1); } if (poolPolicy.getKeepAliveTime() == null) { poolPolicy.setKeepAliveTime(60); } if (poolPolicy.getQueueSize() == null) { poolPolicy.setQueueSize(Runtime.getRuntime().availableProcessors() + 1); } if (poolPolicy.getResubmitSleepMillis() == null) { poolPolicy.setResubmitSleepMillis(200L); } if (poolPolicy.getResubmitTimes() == null) { poolPolicy.setResubmitTimes(5); } // - 线程池策略没有变化直接返回已有线程池。 ExecutorService original = this.executorService; this.executorService = new ThreadPoolExecutor( poolPolicy.getCorePoolSize(), poolPolicy.getMaximumPoolSize(), poolPolicy.getKeepAliveTime(), TimeUnit.SECONDS, new ArrayBlockingQueue<>(poolPolicy.getQueueSize()), new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-%d").setDaemon(true).build(), new ThreadPoolExecutor.AbortPolicy()); this.poolPolicy = poolPolicy; if (original != null) { original.shutdownNow(); } }
任务提交:
线程池封装对象中使用的线程池拒绝策略是AbortPolicy,因此在线程数和阻塞队列到达上限后会触发异常。另外在这里为了保证提交的成功率利用重试策略实现了一定程度的延迟处理,具体场景中可以结合业务特点进行适当的调节和配置。
public <T> Future<T> submit(Callable<T> task) { RejectedExecutionException exception = null; Future<T> future = null; for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) { try { // - 添加任务 future = this.executorService.submit(task); exception = null; break; } catch (RejectedExecutionException e) { exception = e; this.theadSleep(this.poolPolicy.getResubmitSleepMillis()); } } if (exception != null) { throw exception; } return future; }
监控:
1、submit提交的监控
见代码中的「监控点①」,在submit方法中添加监控点,监控key的需要添线程池封装对象的线程名称前缀,用于区分具体的线程池对象。
「监控点①」用于监控添加任务的动作是否正常,以便对线程池对象及策略参数进行微调。
public <T> Future<T> submit(Callable<T> task) { // - 监控点① CallerInfo callerInfo = Profiler.registerInfo(UmpConstant.THREAD_POOL_WAP + threadNamePrefix, UmpConstant.APP_NAME, UmpConstant.UMP_DISABLE_HEART, UmpConstant.UMP_ENABLE_TP); RejectedExecutionException exception = null; Future<T> future = null; for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) { try { // - 添加任务 future = this.executorService.submit(task); exception = null; break; } catch (RejectedExecutionException e) { exception = e; this.theadSleep(this.poolPolicy.getResubmitSleepMillis()); } } if (exception != null) { // - 监控点① Profiler.functionError(callerInfo); throw exception; } // - 监控点① Profiler.registerInfoEnd(callerInfo); return future; }
2、线程池并行任务
见代码的「监控点②」,分别在添加任务和任务完成后。
「监控点②」实时统计在线程中执行的总任务数量,用于评估线程池的任务的数量的满载水平。
/** 任务并行数量统计 */ private AtomicInteger parallelTaskCount = new AtomicInteger(0); public <T> Future<T> submit(Callable<T> task) { RejectedExecutionException exception = null; Future<T> future = null; for (int i = 0; i < this.poolPolicy.getResubmitTimes(); i++) { try { // - 添加任务 future = this.executorService.submit(()-> { T rst = task.call(); // - 监控点② log.info("{} - Parallel task count {}", this.threadNamePrefix, this.parallelTaskCount.decrementAndGet()); return rst; }); // - 监控点② log.info("{} + Parallel task count {}", this.threadNamePrefix, this.parallelTaskCount.incrementAndGet()); exception = null; break; } catch (RejectedExecutionException e) { exception = e; this.theadSleep(this.poolPolicy.getResubmitSleepMillis()); } } if (exception != null) { throw exception; } return future; }
3、调节
线程池封装对象策略的调节时机
1)上线前基于流量预估的压测阶段;
2)上线后跟进监控数据和线程池中任务的满载水平进行人工微调,也可以通过JOB在指定的时间自动调整;
3)大促前依据往期大促峰值来调高相关参数。
线程池封装对象策略的调节经验
1)访问时长要求较低时,我们可以考虑调小线程数和阻塞队列,适当调大提交任务重试等待时间和次数,以便降低资源占用。
2)访问时长要求较高时,就需要调大线程数并保证相对较小的阻塞队列,调小提交任务的重试等待时间和次数甚至分别调成0和1(即关闭重试提交逻辑)。
作者:京东零售 王文明
来源:京东云开发者社区 转载请注明来源

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
多模态GPT-V出世!36种场景分析ChatGPT Vision能力,LMM将全面替代大语言模型? | 京东云技术团队
LMM将会全面替代大语言模型?人工智能新里程碑GPT-V美国预先公测,医疗领域/OCR实践+166页GPT-V试用报告首发解读 ChatGPT Vision,亦被广泛称为GPT-V或GPT-4V,代表了人工智能技术的新里程碑。作为LMM (Large Multimodal Model) 的代表,它不仅继承了LLM (Large Language Model) 的文本处理能力,还加入了图像处理的功能,实现了文本与图像的多模态交互。与传统的LLM相比,GPT-V更加强大和灵活,能够更深入地理解和生成与图像相关的内容。这种进化打开了无数新的应用可能性,从图像描述、创意设计到复杂的图文结合任务,GPT-4V都展现出了卓越的性能和广泛的潜力。 使用方法:GPT-V目前对于美国区ChatGPT Plus账户开放。 相关链接:ChatGPT can now see, hear, and speak 相关介绍:GPTV_System_Card.pdf 166页GPT-V试用报告:Dawn of LMMs: Preliminary Explorations with GPT-4V(ision)...
- 下一篇
云起开发平台 1.1 版本发布,支持后端渲染 Vue 页面
谁告诉你的,使用Vue就一定要前后端分离,云起开发平台1.1版本通过重写thinkphp-template模板引擎,使它能支持在后端渲染vue页面,从此不用打包,不用繁琐的代理处理跨域,不用vue-router,不用前后端都要重复控制权限问题,不用npm引入一大堆看不懂的js代码,你就能愉快的使用vue来编程。 如下所示,这是一段回收站功能的代码,不用打包,不用nodejs,不用ssr,所见即所得,响应快捷迅速。 <template> <el-card shadow="never" style="border:0;"> <yun-table :columns='recyclebin_("init")' search="{$search}" ref="yuntable" :common-search="false" :extend="extend" toolbar="refresh,restore,destroy,restoreall,clear"> <template #toolbar="{tool,selections}...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS8安装Docker,最新的服务器搭配容器使用
- Linux系统CentOS6、CentOS7手动修改IP地址
- 2048小游戏-低调大师作品
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS8编译安装MySQL8.0.19
- CentOS6,CentOS7官方镜像安装Oracle11G
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2整合Thymeleaf,官方推荐html解决方案