开发者社区 > 博文 > 利用DUCC配置平台实现一个动态化线程池
分享
  • 打开微信扫码分享

  • 点击前往QQ分享

  • 点击前往微博分享

  • 点击复制链接

利用DUCC配置平台实现一个动态化线程池

  • 24****
  • 2023-02-01
  • IP归属:北京
  • 17720浏览

    1.背景

    在后台开发中,会经常用到线程池技术,对于线程池核心参数的配置很大程度上依靠经验。然而,由于系统运行过程中存在的不确定性,我们很难一劳永逸地规划一个合理的线程池参数。在对线程池配置参数进行调整时,一般需要对服务进行重启,这样修改的成本就会偏高。一种解决办法就是,将线程池的配置放到配置平台侧,系统运行期间开发人员根据系统运行情况对核心参数进行动态配置。

    本文以公司DUCC配置平台作为服务配置中心,以修改线程池核心线程数、最大线程数为例,实现一个简单的动态化线程池。

    2.代码实现

    当前项目中使用的是Spring 框架提供的线程池类ThreadPoolTaskExecutor,而ThreadPoolTaskExecutor底层又使用里了JDK中线程池类ThreadPoolExecutor,线程池类ThreadPoolExecutor有两个成员方法setCorePoolSize、setMaximumPoolSize可以在运行时设置核心线程数和最大线程数。

    setCorePoolSize方法执行流程是:首先会覆盖之前构造函数设置的corePoolSize,然后,如果新的值比原始值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁,如果新的值比原来的值要大且工作队列不为空,则会创建新的工作线程。流程图如下:


    setMaximumPoolSize方法:首先会覆盖之前构造函数设置的maximumPoolSize,然后,如果新的值比原来的值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁

    Spring 框架提供的线程池类ThreadPoolTaskExecutor,此类封装了对ThreadPoolExecutor有两个成员方法setCorePoolSize、setMaximumPoolSize的调用。


    基于以上源代码分析,要实现一个简单的动态线程池需要以下几步:

    (1)定义一个动态线程池类,继承ThreadPoolTaskExecutor,目的跟非动态配置的线程池类ThreadPoolTaskExecutor区分开;

    (2)定义和实现一个动态线程池配置定时刷的类,目的定时对比ducc配置的线程池数和本地应用中线程数是否一致,若不一致,则更新本地动态线程池线程池数;

    (3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key;

    (4)定义和实现一个应用启动后根据动态线程池Bean和从ducc配置平台拉取配置刷新应用中的线程数配置;

    接下来代码一一实现:

    (1)动态线程池类

    /**
     * 动态线程池
     *
     */
    public class DynamicThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
    }
    

    (2)动态线程池配置定时刷新类

    @Slf4j
    public class DynamicThreadPoolRefresh implements InitializingBean {
        /**
         * Maintain all automatically registered and manually registered DynamicThreadPoolTaskExecutor.
         */
        private static final ConcurrentMap<String, DynamicThreadPoolTaskExecutor> DTP_REGISTRY = new ConcurrentHashMap<>();
    
        /**
         * @param threadPoolBeanName
         * @param threadPoolTaskExecutor
         */
        public static void registerDynamicThreadPool(String threadPoolBeanName, DynamicThreadPoolTaskExecutor threadPoolTaskExecutor) {
            log.info("DynamicThreadPool register ThreadPoolTaskExecutor, threadPoolBeanName: {}, executor: {}", threadPoolBeanName, ExecutorConverter.convert(threadPoolBeanName, threadPoolTaskExecutor.getThreadPoolExecutor()));
            DTP_REGISTRY.putIfAbsent(threadPoolBeanName, threadPoolTaskExecutor);
        }
    
        @Override
        public void afterPropertiesSet() throws Exception {
            this.refresh();
            //创建定时任务线程池
            ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, (new BasicThreadFactory.Builder()).namingPattern("DynamicThreadPoolRefresh-%d").daemon(true).build());
            //延迟1秒执行,每个1分钟check一次
            executorService.scheduleAtFixedRate(new RefreshThreadPoolConfig(), 1000L, 60000L, TimeUnit.MILLISECONDS);
        }
    
        private void refresh() {
            String dynamicThreadPool = "";
            try {
                if (DTP_REGISTRY.isEmpty()) {
                    log.debug("DynamicThreadPool refresh DTP_REGISTRY is empty");
                    return;
                }
                dynamicThreadPool = DuccConfigUtil.getValue(DuccConfigConstants.DYNAMIC_THREAD_POOL);
                if (StringUtils.isBlank(dynamicThreadPool)) {
                    log.debug("DynamicThreadPool refresh dynamicThreadPool not config");
                    return;
                }
                log.debug("DynamicThreadPool refresh dynamicThreadPool:{}", dynamicThreadPool);
                List<ThreadPoolProperties> threadPoolPropertiesList = JsonUtil.json2Object(dynamicThreadPool, new TypeReference<List<ThreadPoolProperties>>() {
                });
                if (CollectionUtils.isEmpty(threadPoolPropertiesList)) {
                    log.error("DynamicThreadPool refresh dynamicThreadPool json2Object error!{}", dynamicThreadPool);
                    return;
                }
                for (ThreadPoolProperties properties : threadPoolPropertiesList) {
                    doRefresh(properties);
                }
            } catch (Exception e) {
                log.error("DynamicThreadPool refresh exception!dynamicThreadPool:{}", dynamicThreadPool, e);
            }
        }
    
        /**
         * @param properties
         */
        private void doRefresh(ThreadPoolProperties properties) {
            if (StringUtils.isBlank(properties.getThreadPoolBeanName())
                    || properties.getCorePoolSize() < 1
                    || properties.getMaxPoolSize() < 1
                    || properties.getMaxPoolSize() < properties.getCorePoolSize()) {
                log.error("DynamicThreadPool refresh, invalid parameters exist, properties: {}", properties);
                return;
            }
            DynamicThreadPoolTaskExecutor threadPoolTaskExecutor = DTP_REGISTRY.get(properties.getThreadPoolBeanName());
            if (Objects.isNull(threadPoolTaskExecutor)) {
                log.warn("DynamicThreadPool refresh, DTP_REGISTRY not found {}", properties.getThreadPoolBeanName());
                return;
            }
            ThreadPoolProperties oldProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
            if (Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())
                    && Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
                log.warn("DynamicThreadPool refresh, properties of [{}] have not changed.", properties.getThreadPoolBeanName());
                return;
            }
            if (!Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())) {
                threadPoolTaskExecutor.setCorePoolSize(properties.getCorePoolSize());
                log.info("DynamicThreadPool refresh, corePoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getCorePoolSize());
            }
            if (!Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
                threadPoolTaskExecutor.setMaxPoolSize(properties.getMaxPoolSize());
                log.info("DynamicThreadPool refresh, maxPoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getMaxPoolSize());
            }
           
            ThreadPoolProperties newProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
            log.info("DynamicThreadPool refresh result!{} oldProp:{},newProp:{}", properties.getThreadPoolBeanName(), oldProp, newProp);
        }
    
        private class RefreshThreadPoolConfig extends TimerTask {
            private RefreshThreadPoolConfig() {
            }
    
            @Override
            public void run() {
                DynamicThreadPoolRefresh.this.refresh();
            }
        }
    
    }
    

    线程池配置类

    @Data
    public class ThreadPoolProperties {
        /**
         * 线程池名称
         */
        private String threadPoolBeanName;
        /**
         * 线程池核心线程数量
         */
        private int corePoolSize;
        /**
         * 线程池最大线程池数量
         */
        private int maxPoolSize;
    }
    

    (3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key

    ducc配置平台使用见:https://cf.jd.com/pages/viewpage.action?pageId=403477057

    动态线程池配置key:dynamic.thread.pool

    配置value:

    [
      {
        "threadPoolBeanName": "submitOrderThreadPoolTaskExecutor",
        "corePoolSize": 32,
        "maxPoolSize": 128
      }
    ]
    

    (4) 应用启动刷新应用本地动态线程池配置

    @Slf4j
    public class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
    
        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            if (bean instanceof DynamicThreadPoolTaskExecutor) {
                DynamicThreadPoolRefresh.registerDynamicThreadPool(beanName, (DynamicThreadPoolTaskExecutor) bean);
            }
            return bean;
        }
    }
    

    3.动态线程池应用

    动态线程池Bean声明

        <!-- 普通线程池 -->
        <bean id="threadPoolTaskExecutor" class="com.jd.concurrent.ThreadPoolTaskExecutorWrapper">
            <!-- 核心线程数,默认为 -->
            <property name="corePoolSize" value="128"/>
            <!-- 最大线程数,默认为Integer.MAX_VALUE -->
            <property name="maxPoolSize" value="512"/>
            <!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE -->
            <property name="queueCapacity" value="500"/>
            <!-- 线程池维护线程所允许的空闲时间,默认为60s -->
            <property name="keepAliveSeconds" value="60"/>
            <!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 -->
            <property name="rejectedExecutionHandler">
                <!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->
                <!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->
                <!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
                <!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
                <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
            </property>
        </bean>
        <!-- 动态线程池 -->
        <bean id="submitOrderThreadPoolTaskExecutor" class="com.jd.concurrent.DynamicThreadPoolTaskExecutor">
            <!-- 核心线程数,默认为 -->
            <property name="corePoolSize" value="32"/>
            <!-- 最大线程数,默认为Integer.MAX_VALUE -->
            <property name="maxPoolSize" value="128"/>
            <!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE -->
            <property name="queueCapacity" value="500"/>
            <!-- 线程池维护线程所允许的空闲时间,默认为60s -->
            <property name="keepAliveSeconds" value="60"/>
            <!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 -->
            <property name="rejectedExecutionHandler">
                <!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->
                <!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->
                <!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
                <!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
                <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
            </property>
        </bean>
        <!-- 动态线程池刷新配置 -->
        <bean class="com.jd.concurrent.DynamicThreadPoolPostProcessor"/>
        <bean class="com.jd.concurrent.DynamicThreadPoolRefresh"/>
    

    业务类注入Spring Bean后,直接使用即可

     @Resource
     private ThreadPoolTaskExecutor submitOrderThreadPoolTaskExecutor;
    
     
     Runnable asyncTask = ()->{...};
     CompletableFuture.runAsync(asyncTask, this.submitOrderThreadPoolTaskExecutor);
    

    4.小结

    本文从实际项目的业务痛点场景出发,并基于公司已有的ducc配置平台简单实现了线程池线程数量可配置。


    文章数
    2
    阅读量
    840

    作者其他文章

    01 Orika JavaBean映射工具使用