概述

业务上需要通过在 nacos 上改配置的方式实现修改某些定时任务的延迟周期,比如同步第三方数据的定时任务,服务启动时创建的是每 30 分钟同步一次数据的延迟任务,后面因为数据的更新频率变快,需要缩短延迟时间,需要改成 15 分钟同步一次,按照我们以往使用的 ScheduledThreadPoolExecutor,只能改代码后再打包部署。

当然,使用 xxl-job 这类的定时任务框架可以实现动态修改延迟周期,但如果不想引入这类第三方框架,而只需要满足业务需求即可的话,可以用本文的方法来实现。

原理

ScheduledThreadPoolExecutor 无法修改已经提交了的任务的延迟周期,想要动态调整任务的延迟周期,无非就是删掉原来的任务,重新向 ScheduledThreadPoolExecutor 提交一个任务。我们业务上需要针对某个或某些任务修改延迟周期,所以我们还得将任务分组,方便我们批量将符合条件的任务延迟周期进行修改。

基于这个逻辑,我们封装一个 DynamicScheduledThreadPoolExecutor 替代 ScheduledThreadPoolExecutor,里面要包含一个 ScheduledThreadPoolExecutor,要根据组名将任务分组;再封装一个 Task 类,里面要包含有所在分组、任务执行逻辑(Runnable)。

DynamicScheduledThreadPoolExecutor 对外暴露 scheduleWithFixedDelay、scheduleAtFixedRate 这两个方法,同时还要暴露 resetPeriodByGroup(根据分组重设延迟周期)、resetPeriod(针对某个 task 重设延迟周期)。

最关键的是 resetPeriodByGroup、resetPeriod 这两个方法,但里面的逻辑很简单,就是将原本的任务停止,再创建新的任务,通过封装的 Task 类,对调用者来说,重设前和重设后还是原来的任务,但其实内部已经不是同一个任务了。

实现

package com.kk.util;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;

/**
 * 可以动态调整延迟周期的定时任务线程池
 *
 * @author: ky
 */
@Slf4j
public class DynamicScheduledThreadPoolExecutor {

    private static final int TASK_TYPE_FIX_RATE = 1;
    private static final int TASK_TYPE_FIX_DELAY = 2;

    private final ScheduledThreadPoolExecutor executor;

    private final Set<Task> allTaskSet = ConcurrentHashMap.newKeySet();

    private final Map<String, Set<Task>> taskGroupMap = new ConcurrentHashMap<>();


    public DynamicScheduledThreadPoolExecutor(int corePoolSize) {
        executor = new ScheduledThreadPoolExecutor(corePoolSize);
    }

    public DynamicScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
        executor = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

    public DynamicScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
        executor = new ScheduledThreadPoolExecutor(corePoolSize, handler);
    }

    public DynamicScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        executor = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory, handler);
    }

    /**
     * 根据分组来重设延迟周期
     *
     * @param group  分组
     * @param period 周期
     * @param unit   时间单位
     */
    public synchronized void resetPeriodByGroup(String group, long period, TimeUnit unit) {
        Set<Task> tasks = taskGroupMap.get(group);
        if (tasks == null || tasks.isEmpty()) {
            return;
        }
        log.info("重设延迟周期,group={},taskCount={},period={},unit={}", group, tasks.size(), period, unit);
        for (Task task : tasks) {
            task.setPeriod(period);
            task.setUnit(unit);
            ScheduledFuture<?> scheduledFuture = task.getScheduledFuture();
            if (scheduledFuture != null) {
                scheduledFuture.cancel(false);
            }
            ScheduledFuture<?> newFuture;
            if (TASK_TYPE_FIX_DELAY == task.getTaskType()) {
                newFuture = executor.scheduleWithFixedDelay(task, task.getPeriod(), task.getPeriod(), task.getUnit());
            } else if (TASK_TYPE_FIX_RATE == task.getTaskType()) {
                newFuture = executor.scheduleAtFixedRate(task, task.getPeriod(), task.getPeriod(), task.getUnit());
            } else {
                throw new IllegalArgumentException("taskType error");
            }

            task.setScheduledFuture(newFuture);
        }
    }

    /**
     * 重设某个任务延迟周期
     *
     * @param task   任务
     * @param period 周期
     * @param unit   时间单位
     */
    public void resetPeriod(Task task, long period, TimeUnit unit) {
        if (task == null || !allTaskSet.contains(task)) {
            return;
        }
        task.setPeriod(period);
        task.setUnit(unit);
        ScheduledFuture<?> scheduledFuture = task.getScheduledFuture();
        if (scheduledFuture != null) {
            scheduledFuture.cancel(false);
        }
        ScheduledFuture<?> newFuture;
        if (TASK_TYPE_FIX_DELAY == task.getTaskType()) {
            newFuture = executor.scheduleWithFixedDelay(task, task.getPeriod(), task.getPeriod(), task.getUnit());
        } else if (TASK_TYPE_FIX_RATE == task.getTaskType()) {
            newFuture = executor.scheduleAtFixedRate(task, task.getPeriod(), task.getPeriod(), task.getUnit());
        } else {
            throw new IllegalArgumentException("taskType error");
        }
        task.setScheduledFuture(newFuture);
    }

    /**
     * 移除任务
     *
     * @param task 任务
     */
    public void remove(Task task) {
        if (task == null || !allTaskSet.contains(task)) {
            return;
        }
        allTaskSet.remove(task);
        String group = task.getGroup();
        Set<Task> taskSet = taskGroupMap.get(group);
        if (taskSet != null) {
            taskSet.remove(task);
        }
        executor.remove(task);
    }

    public void shutdown() {
        executor.shutdown();
    }

    public void shutdownNow() {
        executor.shutdownNow();
    }

    /**
     * 创建固定时间延迟执行的任务
     *
     * @param group        分组
     * @param command      命令
     * @param initialDelay 初始延迟
     * @param delay        延迟
     * @param unit         时间单位
     * @return 封装后的任务
     */
    public Task scheduleWithFixedDelay(String group,
                                       Runnable command,
                                       long initialDelay,
                                       long delay,
                                       TimeUnit unit) {
        Task task = new Task(command, group, initialDelay, delay, unit);
        scheduleWithFixedDelay(task);
        return task;
    }

    /**
     * 创建固定时间频率执行的任务
     *
     * @param group        分组
     * @param command      命令
     * @param initialDelay 初始延迟
     * @param period       执行周期
     * @param unit         时间单位
     * @return 封装后的任务
     */
    public Task scheduleAtFixedRate(String group,
                                    Runnable command,
                                    long initialDelay,
                                    long period,
                                    TimeUnit unit) {
        Task task = new Task(command, group, initialDelay, period, unit);
        scheduleAtFixedRate(task);
        return task;
    }

    public void scheduleWithFixedDelay(Task task) {
        Set<Task> taskSet = taskGroupMap.computeIfAbsent(task.getGroup(), k -> ConcurrentHashMap.newKeySet());
        if (taskSet.contains(task)) {
            return;
        }
        ScheduledFuture<?> scheduledFuture = executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getPeriod(), task.getUnit());
        task.setThreadPool(this);
        task.setTaskType(TASK_TYPE_FIX_DELAY);
        task.setScheduledFuture(scheduledFuture);

        taskSet.add(task);
        allTaskSet.add(task);
    }

    public void scheduleAtFixedRate(Task task) {
        Set<Task> taskSet = taskGroupMap.computeIfAbsent(task.getGroup(), k -> ConcurrentHashMap.newKeySet());
        if (taskSet.contains(task)) {
            return;
        }
        ScheduledFuture<?> scheduledFuture = executor.scheduleAtFixedRate(task, task.getInitialDelay(), task.getPeriod(), task.getUnit());
        task.setThreadPool(this);
        task.setTaskType(TASK_TYPE_FIX_RATE);
        task.setScheduledFuture(scheduledFuture);

        taskSet.add(task);
        allTaskSet.add(task);
    }

    @Getter
    public static class Task implements Runnable {

        private final Runnable command;

        private final String group;

        private final long initialDelay;

        private long period;

        private TimeUnit unit;

        private int taskType;

        private DynamicScheduledThreadPoolExecutor threadPool;

        private ScheduledFuture<?> scheduledFuture;

        public Task(Runnable command, String group, long initialDelay, long period, TimeUnit unit) {
            this.command = command;
            this.group = group;
            this.initialDelay = initialDelay;
            this.period = period;
            this.unit = unit;
        }

        @Override
        public void run() {
            if (command != null) {
                command.run();
            }
        }

        void setPeriod(long period) {
            this.period = period;
        }

        void setUnit(TimeUnit unit) {
            this.unit = unit;
        }

        void setTaskType(int taskType) {
            this.taskType = taskType;
        }

        void setScheduledFuture(ScheduledFuture<?> scheduledFuture) {
            this.scheduledFuture = scheduledFuture;
        }

        void setThreadPool(DynamicScheduledThreadPoolExecutor threadPool) {
            this.threadPool = threadPool;
        }

        String getGroup() {
            return group;
        }
    }
}

演示使用

这里用读取文件配置的方式演示修改任务延迟周期。

在 D:\tmp\ 目录下创建两个文件(group01-delay.txt、group02-delay.txt),用 hutool 的 WatchMonitor​ 文件监听工具监听文件的变化,有变化就读取文件内新的延迟时间

package com.kk.demo;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.watch.SimpleWatcher;
import cn.hutool.core.io.watch.WatchMonitor;
import cn.hutool.core.io.watch.watchers.DelayWatcher;

import java.io.File;
import java.nio.file.Path;
import java.nio.file.WatchEvent;
import java.util.concurrent.TimeUnit;

public class App {

    public static void main(String[] args) {
        DynamicScheduledThreadPool executor = new DynamicScheduledThreadPool(5);
        // task01,延迟2秒执行一次
        executor.scheduleWithFixedDelay(new DynamicScheduledThreadPool.Task(() -> {
            System.out.println("task01 -> " + DateUtil.now());
        }, "group01", 0, 2, TimeUnit.SECONDS));

        // task02,延迟5秒执行一次
        executor.scheduleWithFixedDelay(new DynamicScheduledThreadPool.Task(() -> {
            System.out.println("task02 -> " + DateUtil.now());
        }, "group02", 0, 5, TimeUnit.SECONDS));

        WatchMonitor watchMonitor = WatchMonitor.create(new File("d:\\tmp"), WatchMonitor.ENTRY_MODIFY);
        watchMonitor.setWatcher(new DelayWatcher(new SimpleWatcher() {
            @Override
            public void onModify(WatchEvent<?> watchEvent, Path path) {
                System.out.println("------> 文件修改了 " + path);
                Object context = watchEvent.context();
                if (context.toString().endsWith("group01-delay.txt")) {
                    String s = FileUtil.readUtf8String("d:\\tmp\\group02-delay.txt");
                    System.out.println("------> group01修改了时间间隔为 " + s);
                    executor.resetPeriodByGroup("group01", Long.parseLong(s), TimeUnit.SECONDS);
                }
                if (context.toString().endsWith("group02-delay.txt")) {
                    String s = FileUtil.readUtf8String("d:\\tmp\\group02-delay.txt");
                    System.out.println("------> group02修改了时间间隔为 " + s);
                    executor.resetPeriodByGroup("group02", Long.parseLong(s), TimeUnit.SECONDS);
                }
            }
        }, 300));
        watchMonitor.setMaxDepth(3);
        watchMonitor.start();
    }
}

配合 nacos 使用

对于 springboot + nacos 的微服务项目,可以通过监听配置改变的事件来触发调用 DynamicScheduledThreadPool 的 resetPeriodByGroup 方法。

配置类

@Data
@Configuration
@ConfigurationProperties("dynamic-scheduled")
public class DynamicPeriodConfig {

    /**
     * 分组A的延迟周期,单位:s
     */
    private Integer periodForGroupA = 1800;

    /**
     * 分组B的延迟周期,单位:s
     */
    private Integer periodForGroupB = 1800;

}

创建 DynamicScheduledThreadPool 的 Bean

@Configuration
public class ThreadPoolConfig {

    int cpuNum = Runtime.getRuntime().availableProcessors();

    @Bean
    public DynamicScheduledThreadPoolExecutor dynamicScheduledExecutor() {
        return new DynamicScheduledThreadPoolExecutor(
                cpuNum,
                new NamedThreadFactory("dynamic_scheduled_", true),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }

}

监听 nacos 配置变化

@Slf4j
@Configuration
public class SyyThreadExecutorRefreshConfig {

    // 记录配置项对应的任务分组名 {配置项 -> 任务组名}
    private static final Map<String, String> configKeyToGroupMap = new HashMap<>();

    @Autowired
    private DynamicScheduledThreadPoolExecutor dynamicScheduledExecutor;

    @Autowired
    private DynamicPeriodConfig dynamicPeriodConfig;

    static {
        configKeyToGroupMap.put("dynamic-scheduled.periodForGroupA", "periodForGroupA");
        configKeyToGroupMap.put("dynamic-scheduled.periodForGroupB", "periodForGroupB");
    }

    @EventListener(EnvironmentChangeEvent.class)
    public void setEnvironmentChangeEvent(EnvironmentChangeEvent environmentChangeEvent) {
        for (String key : environmentChangeEvent.getKeys()) {
            if (configKeyToGroupMap.containsKey(key)) {
                String group = configKeyToGroupMap.get(key);

                int pos = key.lastIndexOf('.');
                String delayFieldName = key.substring(pos + 1);
                String methodName = "get" + delayFieldName.substring(0, 1).toUpperCase() + delayFieldName.substring(1);
                Integer delay;
                try {
                    delay = (Integer) (dynamicPeriodConfig.getClass().getMethod(methodName).invoke(dynamicPeriodConfig));
                } catch (Exception e) {
                    log.error("反射调用dynamicPeriodConfig.{}()失败,{}", methodName, e.getMessage(), e);
                    continue;
                }

                if (delay != null && delay > 0) {
                    // 重设延迟时间
                    dynamicScheduledExecutor.resetPeriodByGroup(group, delay, TimeUnit.SECONDS);
                }
            }
        }
    }
}