利用DUCC配置平台实现一个动态化线程池
作者:京东零售 张宾
1.背景
在后台开发中,会经常用到线程池技术,对于线程池核心参数的配置很大程度上依靠经验。然而,由于系统运行过程中存在的不确定性,我们很难一劳永逸地规划一个合理的线程池参数。在对线程池配置参数进行调整时,一般需要对服务进行重启,这样修改的成本就会偏高。一种解决办法就是,将线程池的配置放到配置平台侧,系统运行期间开发人员根据系统运行情况对核心参数进行动态配置。
本文以公司DUCC配置平台作为服务配置中心,以修改线程池核心线程数、最大线程数为例,实现一个简单的动态化线程池。
2.代码实现
当前项目中使用的是Spring 框架提供的线程池类ThreadPoolTaskExecutor,而ThreadPoolTaskExecutor底层又使用里了JDK中线程池类ThreadPoolExecutor,线程池类ThreadPoolExecutor有两个成员方法setCorePoolSize、setMaximumPoolSize可以在运行时设置核心线程数和最大线程数。
setCorePoolSize方法执行流程是:首先会覆盖之前构造函数设置的corePoolSize,然后,如果新的值比原始值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁,如果新的值比原来的值要大且工作队列不为空,则会创建新的工作线程。流程图如下:
setMaximumPoolSize方法: 首先会覆盖之前构造函数设置的maximumPoolSize,然后,如果新的值比原来的值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁。
Spring 框架提供的线程池类ThreadPoolTaskExecutor,此类封装了对ThreadPoolExecutor有两个成员方法setCorePoolSize、setMaximumPoolSize的调用。
基于以上源代码分析,要实现一个简单的动态线程池需要以下几步:
(1)定义一个动态线程池类,继承ThreadPoolTaskExecutor,目的跟非动态配置的线程池类ThreadPoolTaskExecutor区分开;
(2)定义和实现一个动态线程池配置定时刷的类,目的定时对比ducc配置的线程池数和本地应用中线程数是否一致,若不一致,则更新本地动态线程池线程池数;
(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key;
(4)定义和实现一个应用启动后根据动态线程池Bean和从ducc配置平台拉取配置刷新应用中的线程数配置;
接下来代码一一实现:
(1)动态线程池类
/** * 动态线程池 * */ public class DynamicThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { }
(2)动态线程池配置定时刷新类
@Slf4j public class DynamicThreadPoolRefresh implements InitializingBean { /** * Maintain all automatically registered and manually registered DynamicThreadPoolTaskExecutor. */ private static final ConcurrentMap<String, DynamicThreadPoolTaskExecutor> DTP_REGISTRY = new ConcurrentHashMap<>(); /** * @param threadPoolBeanName * @param threadPoolTaskExecutor */ public static void registerDynamicThreadPool(String threadPoolBeanName, DynamicThreadPoolTaskExecutor threadPoolTaskExecutor) { log.info("DynamicThreadPool register ThreadPoolTaskExecutor, threadPoolBeanName: {}, executor: {}", threadPoolBeanName, ExecutorConverter.convert(threadPoolBeanName, threadPoolTaskExecutor.getThreadPoolExecutor())); DTP_REGISTRY.putIfAbsent(threadPoolBeanName, threadPoolTaskExecutor); } @Override public void afterPropertiesSet() throws Exception { this.refresh(); //创建定时任务线程池 ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, (new BasicThreadFactory.Builder()).namingPattern("DynamicThreadPoolRefresh-%d").daemon(true).build()); //延迟1秒执行,每个1分钟check一次 executorService.scheduleAtFixedRate(new RefreshThreadPoolConfig(), 1000L, 60000L, TimeUnit.MILLISECONDS); } private void refresh() { String dynamicThreadPool = ""; try { if (DTP_REGISTRY.isEmpty()) { log.debug("DynamicThreadPool refresh DTP_REGISTRY is empty"); return; } dynamicThreadPool = DuccConfigUtil.getValue(DuccConfigConstants.DYNAMIC_THREAD_POOL); if (StringUtils.isBlank(dynamicThreadPool)) { log.debug("DynamicThreadPool refresh dynamicThreadPool not config"); return; } log.debug("DynamicThreadPool refresh dynamicThreadPool:{}", dynamicThreadPool); List<ThreadPoolProperties> threadPoolPropertiesList = JsonUtil.json2Object(dynamicThreadPool, new TypeReference<List<ThreadPoolProperties>>() { }); if (CollectionUtils.isEmpty(threadPoolPropertiesList)) { log.error("DynamicThreadPool refresh dynamicThreadPool json2Object error!{}", dynamicThreadPool); return; } for (ThreadPoolProperties properties : threadPoolPropertiesList) { doRefresh(properties); } } catch (Exception e) { log.error("DynamicThreadPool refresh exception!dynamicThreadPool:{}", dynamicThreadPool, e); } } /** * @param properties */ private void doRefresh(ThreadPoolProperties properties) { if (StringUtils.isBlank(properties.getThreadPoolBeanName()) || properties.getCorePoolSize() < 1 || properties.getMaxPoolSize() < 1 || properties.getMaxPoolSize() < properties.getCorePoolSize()) { log.error("DynamicThreadPool refresh, invalid parameters exist, properties: {}", properties); return; } DynamicThreadPoolTaskExecutor threadPoolTaskExecutor = DTP_REGISTRY.get(properties.getThreadPoolBeanName()); if (Objects.isNull(threadPoolTaskExecutor)) { log.warn("DynamicThreadPool refresh, DTP_REGISTRY not found {}", properties.getThreadPoolBeanName()); return; } ThreadPoolProperties oldProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor()); if (Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize()) && Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) { log.warn("DynamicThreadPool refresh, properties of [{}] have not changed.", properties.getThreadPoolBeanName()); return; } if (!Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())) { threadPoolTaskExecutor.setCorePoolSize(properties.getCorePoolSize()); log.info("DynamicThreadPool refresh, corePoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getCorePoolSize()); } if (!Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) { threadPoolTaskExecutor.setMaxPoolSize(properties.getMaxPoolSize()); log.info("DynamicThreadPool refresh, maxPoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getMaxPoolSize()); } ThreadPoolProperties newProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor()); log.info("DynamicThreadPool refresh result!{} oldProp:{},newProp:{}", properties.getThreadPoolBeanName(), oldProp, newProp); } private class RefreshThreadPoolConfig extends TimerTask { private RefreshThreadPoolConfig() { } @Override public void run() { DynamicThreadPoolRefresh.this.refresh(); } } }
线程池配置类
@Data public class ThreadPoolProperties { /** * 线程池名称 */ private String threadPoolBeanName; /** * 线程池核心线程数量 */ private int corePoolSize; /** * 线程池最大线程池数量 */ private int maxPoolSize; }
(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key
ducc配置平台使用见:https://cf.jd.com/pages/viewpage.action?pageId=403477057
动态线程池配置key:dynamic.thread.pool
配置value:
[ { "threadPoolBeanName": "submitOrderThreadPoolTaskExecutor", "corePoolSize": 32, "maxPoolSize": 128 } ]
(4) 应用启动刷新应用本地动态线程池配置
@Slf4j public class DynamicThreadPoolPostProcessor implements BeanPostProcessor { @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (bean instanceof DynamicThreadPoolTaskExecutor) { DynamicThreadPoolRefresh.registerDynamicThreadPool(beanName, (DynamicThreadPoolTaskExecutor) bean); } return bean; } }
3.动态线程池应用
动态线程池Bean声明
<!-- 普通线程池 --> <bean id="threadPoolTaskExecutor" class="com.jd.concurrent.ThreadPoolTaskExecutorWrapper"> <!-- 核心线程数,默认为 --> <property name="corePoolSize" value="128"/> <!-- 最大线程数,默认为Integer.MAX_VALUE --> <property name="maxPoolSize" value="512"/> <!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE --> <property name="queueCapacity" value="500"/> <!-- 线程池维护线程所允许的空闲时间,默认为60s --> <property name="keepAliveSeconds" value="60"/> <!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 --> <property name="rejectedExecutionHandler"> <!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 --> <!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 --> <!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 --> <!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 --> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/> </property> </bean> <!-- 动态线程池 --> <bean id="submitOrderThreadPoolTaskExecutor" class="com.jd.concurrent.DynamicThreadPoolTaskExecutor"> <!-- 核心线程数,默认为 --> <property name="corePoolSize" value="32"/> <!-- 最大线程数,默认为Integer.MAX_VALUE --> <property name="maxPoolSize" value="128"/> <!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE --> <property name="queueCapacity" value="500"/> <!-- 线程池维护线程所允许的空闲时间,默认为60s --> <property name="keepAliveSeconds" value="60"/> <!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 --> <property name="rejectedExecutionHandler"> <!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 --> <!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 --> <!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 --> <!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 --> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/> </property> </bean> <!-- 动态线程池刷新配置 --> <bean class="com.jd.concurrent.DynamicThreadPoolPostProcessor"/> <bean class="com.jd.concurrent.DynamicThreadPoolRefresh"/>
业务类注入Spring Bean后,直接使用即可
@Resource private ThreadPoolTaskExecutor submitOrderThreadPoolTaskExecutor; Runnable asyncTask = ()->{...}; CompletableFuture.runAsync(asyncTask, this.submitOrderThreadPoolTaskExecutor);
4.小结
本文从实际项目的业务痛点场景出发,并基于公司已有的ducc配置平台简单实现了线程池线程数量可配置。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
关于 DataLeap 中的 Notebook,你想知道的都在这
更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群 DataLeap 是火山引擎数智平台 VeDI 旗下的大数据研发治理套件产品,帮助用户快速完成数据集成、开发、运维、治、资产、安全等全套数据中台建设,降低工作成本和数据维护成本、挖掘数据价值、为企业决策提供数据支撑。 本文主要详细讲述 DataLeap 中的 Notebook ,包括前期选型、技术路线、架构升级、调度方案、以及未来工作等五部分重点内容,带你详细了解 Notebook。 概述 Notebook 是一种支持 REPL 模式的开发环境。所谓「REPL」,即「读取-求值-输出」循环:输入一段代码,立刻得到相应的结果,并继续等待下一次输入。它通常使得探索性的开发和调试更加便捷。在 Notebook 环境,你可以交互式地在其中编写你的代码、运行代码、查看输出、可视化数据并查看结果,使用起来非常灵活。 在数据开发领域,Notebook 广泛应用于数据清理和转换、数值模拟、统计建模、数据可视化、构建和训练机器学习模型等方面。 但是显然,做数据开发,只有 Notebook 是不够的。在火山引擎 Data...
- 下一篇
如何从0开始搭建 Vue 组件库
作者:京东零售 陈艳春 前言: 组件设计是通过对功能及视觉表达中元素的拆解、归纳、重组,并基于可被复用的目的,形成规范化的组件,通过多维度组合来构建整个设计方案,將这些组件整理在一起,便形成组件库。本文我们主要讲述基于Vant CLI的自建组件库。Vant CLI 是一个基于 Vite 实现的 Vue 组件库构建工具,通过 Vant CLI 可以快速搭建一套功能完备的 Vue 组件库。 建立组件库的意义 首先组件库可以给我们降本提效,其次可以保持视觉风格统一以及交互一致,可以帮助我们快速构建使用场景,便于多个项目后续迭代升级 。 视觉风格统一以及交互的一致性,可以减少用户学习成本培养用户习惯,让产品拥有良好的用户体验。比如一个四级地址的选择组件,在整个产品中应该就一种交互方式,如果一会是滚动选择,一会是点击选择,会让用户操作起来比较烦躁,统一交互可以减少用户学习成本。 新产品上线后,还需要不断的去完善,在迭代过程中可能会新增其他功能,这时候我们就可以只修改组件库一套代码,所有不同项目相同组件就可以达到了迭代升级的效果。 如何创建组件库 一、梳理组件清单 首先梳理出项目中样式相同的模块,...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS关闭SELinux安全模块
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- Linux系统CentOS6、CentOS7手动修改IP地址
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- MySQL8.0.19开启GTID主从同步CentOS8
- Mario游戏-低调大师作品