java自定义可动态修改任务延迟周期的任务线程池
概述
业务上需要通过在 nacos 上改配置的方式实现修改某些定时任务的延迟周期,比如同步第三方数据的定时任务,服务启动时创建的是每 30 分钟同步一次数据的延迟任务,后面因为数据的更新频率变快,需要缩短延迟时间,需要改成 15 分钟同步一次,按照我们以往使用的 ScheduledThreadPoolExecutor,只能改代码后再打包部署。
当然,使用 xxl-job 这类的定时任务框架可以实现动态修改延迟周期,但如果不想引入这类第三方框架,而只需要满足业务需求即可的话,可以用本文的方法来实现。
原理
ScheduledThreadPoolExecutor 无法修改已经提交了的任务的延迟周期,想要动态调整任务的延迟周期,无非就是删掉原来的任务,重新向 ScheduledThreadPoolExecutor 提交一个任务。我们业务上需要针对某个或某些任务修改延迟周期,所以我们还得将任务分组,方便我们批量将符合条件的任务延迟周期进行修改。
基于这个逻辑,我们封装一个 DynamicScheduledThreadPoolExecutor 替代 ScheduledThreadPoolExecutor,里面要包含一个 ScheduledThreadPoolExecutor,要根据组名将任务分组;再封装一个 Task 类,里面要包含有所在分组、任务执行逻辑(Runnable)。
DynamicScheduledThreadPoolExecutor 对外暴露 scheduleWithFixedDelay、scheduleAtFixedRate 这两个方法,同时还要暴露 resetPeriodByGroup(根据分组重设延迟周期)、resetPeriod(针对某个 task 重设延迟周期)。
最关键的是 resetPeriodByGroup、resetPeriod 这两个方法,但里面的逻辑很简单,就是将原本的任务停止,再创建新的任务,通过封装的 Task 类,对调用者来说,重设前和重设后还是原来的任务,但其实内部已经不是同一个任务了。
实现
package com.kk.util;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
/**
* 可以动态调整延迟周期的定时任务线程池
*
* @author: ky
*/
@Slf4j
public class DynamicScheduledThreadPoolExecutor {
private static final int TASK_TYPE_FIX_RATE = 1;
private static final int TASK_TYPE_FIX_DELAY = 2;
private final ScheduledThreadPoolExecutor executor;
private final Set<Task> allTaskSet = ConcurrentHashMap.newKeySet();
private final Map<String, Set<Task>> taskGroupMap = new ConcurrentHashMap<>();
public DynamicScheduledThreadPoolExecutor(int corePoolSize) {
executor = new ScheduledThreadPoolExecutor(corePoolSize);
}
public DynamicScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
executor = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public DynamicScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
executor = new ScheduledThreadPoolExecutor(corePoolSize, handler);
}
public DynamicScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
executor = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory, handler);
}
/**
* 根据分组来重设延迟周期
*
* @param group 分组
* @param period 周期
* @param unit 时间单位
*/
public synchronized void resetPeriodByGroup(String group, long period, TimeUnit unit) {
Set<Task> tasks = taskGroupMap.get(group);
if (tasks == null || tasks.isEmpty()) {
return;
}
log.info("重设延迟周期,group={},taskCount={},period={},unit={}", group, tasks.size(), period, unit);
for (Task task : tasks) {
task.setPeriod(period);
task.setUnit(unit);
ScheduledFuture<?> scheduledFuture = task.getScheduledFuture();
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
}
ScheduledFuture<?> newFuture;
if (TASK_TYPE_FIX_DELAY == task.getTaskType()) {
newFuture = executor.scheduleWithFixedDelay(task, task.getPeriod(), task.getPeriod(), task.getUnit());
} else if (TASK_TYPE_FIX_RATE == task.getTaskType()) {
newFuture = executor.scheduleAtFixedRate(task, task.getPeriod(), task.getPeriod(), task.getUnit());
} else {
throw new IllegalArgumentException("taskType error");
}
task.setScheduledFuture(newFuture);
}
}
/**
* 重设某个任务延迟周期
*
* @param task 任务
* @param period 周期
* @param unit 时间单位
*/
public void resetPeriod(Task task, long period, TimeUnit unit) {
if (task == null || !allTaskSet.contains(task)) {
return;
}
task.setPeriod(period);
task.setUnit(unit);
ScheduledFuture<?> scheduledFuture = task.getScheduledFuture();
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
}
ScheduledFuture<?> newFuture;
if (TASK_TYPE_FIX_DELAY == task.getTaskType()) {
newFuture = executor.scheduleWithFixedDelay(task, task.getPeriod(), task.getPeriod(), task.getUnit());
} else if (TASK_TYPE_FIX_RATE == task.getTaskType()) {
newFuture = executor.scheduleAtFixedRate(task, task.getPeriod(), task.getPeriod(), task.getUnit());
} else {
throw new IllegalArgumentException("taskType error");
}
task.setScheduledFuture(newFuture);
}
/**
* 移除任务
*
* @param task 任务
*/
public void remove(Task task) {
if (task == null || !allTaskSet.contains(task)) {
return;
}
allTaskSet.remove(task);
String group = task.getGroup();
Set<Task> taskSet = taskGroupMap.get(group);
if (taskSet != null) {
taskSet.remove(task);
}
executor.remove(task);
}
public void shutdown() {
executor.shutdown();
}
public void shutdownNow() {
executor.shutdownNow();
}
/**
* 创建固定时间延迟执行的任务
*
* @param group 分组
* @param command 命令
* @param initialDelay 初始延迟
* @param delay 延迟
* @param unit 时间单位
* @return 封装后的任务
*/
public Task scheduleWithFixedDelay(String group,
Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
Task task = new Task(command, group, initialDelay, delay, unit);
scheduleWithFixedDelay(task);
return task;
}
/**
* 创建固定时间频率执行的任务
*
* @param group 分组
* @param command 命令
* @param initialDelay 初始延迟
* @param period 执行周期
* @param unit 时间单位
* @return 封装后的任务
*/
public Task scheduleAtFixedRate(String group,
Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
Task task = new Task(command, group, initialDelay, period, unit);
scheduleAtFixedRate(task);
return task;
}
public void scheduleWithFixedDelay(Task task) {
Set<Task> taskSet = taskGroupMap.computeIfAbsent(task.getGroup(), k -> ConcurrentHashMap.newKeySet());
if (taskSet.contains(task)) {
return;
}
ScheduledFuture<?> scheduledFuture = executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getPeriod(), task.getUnit());
task.setThreadPool(this);
task.setTaskType(TASK_TYPE_FIX_DELAY);
task.setScheduledFuture(scheduledFuture);
taskSet.add(task);
allTaskSet.add(task);
}
public void scheduleAtFixedRate(Task task) {
Set<Task> taskSet = taskGroupMap.computeIfAbsent(task.getGroup(), k -> ConcurrentHashMap.newKeySet());
if (taskSet.contains(task)) {
return;
}
ScheduledFuture<?> scheduledFuture = executor.scheduleAtFixedRate(task, task.getInitialDelay(), task.getPeriod(), task.getUnit());
task.setThreadPool(this);
task.setTaskType(TASK_TYPE_FIX_RATE);
task.setScheduledFuture(scheduledFuture);
taskSet.add(task);
allTaskSet.add(task);
}
@Getter
public static class Task implements Runnable {
private final Runnable command;
private final String group;
private final long initialDelay;
private long period;
private TimeUnit unit;
private int taskType;
private DynamicScheduledThreadPoolExecutor threadPool;
private ScheduledFuture<?> scheduledFuture;
public Task(Runnable command, String group, long initialDelay, long period, TimeUnit unit) {
this.command = command;
this.group = group;
this.initialDelay = initialDelay;
this.period = period;
this.unit = unit;
}
@Override
public void run() {
if (command != null) {
command.run();
}
}
void setPeriod(long period) {
this.period = period;
}
void setUnit(TimeUnit unit) {
this.unit = unit;
}
void setTaskType(int taskType) {
this.taskType = taskType;
}
void setScheduledFuture(ScheduledFuture<?> scheduledFuture) {
this.scheduledFuture = scheduledFuture;
}
void setThreadPool(DynamicScheduledThreadPoolExecutor threadPool) {
this.threadPool = threadPool;
}
String getGroup() {
return group;
}
}
}
演示使用
这里用读取文件配置的方式演示修改任务延迟周期。
在 D:\tmp\ 目录下创建两个文件(group01-delay.txt、group02-delay.txt),用 hutool 的 WatchMonitor 文件监听工具监听文件的变化,有变化就读取文件内新的延迟时间
package com.kk.demo;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.watch.SimpleWatcher;
import cn.hutool.core.io.watch.WatchMonitor;
import cn.hutool.core.io.watch.watchers.DelayWatcher;
import com.kk.util.DynamicScheduledThreadPoolExecutor;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.WatchEvent;
import java.util.concurrent.TimeUnit;
public class App {
public static void main(String[] args) {
DynamicScheduledThreadPool executor = new DynamicScheduledThreadPool(5);
// task01,延迟2秒执行一次
executor.scheduleWithFixedDelay(new DynamicScheduledThreadPool.Task(() -> {
System.out.println("task01 -> " + DateUtil.now());
}, "group01", 0, 2, TimeUnit.SECONDS));
// task02,延迟5秒执行一次
executor.scheduleWithFixedDelay(new DynamicScheduledThreadPool.Task(() -> {
System.out.println("task02 -> " + DateUtil.now());
}, "group02", 0, 5, TimeUnit.SECONDS));
WatchMonitor watchMonitor = WatchMonitor.create(new File("d:\\tmp"), WatchMonitor.ENTRY_MODIFY);
watchMonitor.setWatcher(new DelayWatcher(new SimpleWatcher() {
@Override
public void onModify(WatchEvent<?> watchEvent, Path path) {
System.out.println("------> 文件修改了 " + path);
Object context = watchEvent.context();
if (context.toString().endsWith("group01-delay.txt")) {
String s = FileUtil.readUtf8String("d:\\tmp\\group02-delay.txt");
System.out.println("------> group01修改了时间间隔为 " + s);
executor.resetPeriodByGroup("group01", Long.parseLong(s), TimeUnit.SECONDS);
}
if (context.toString().endsWith("group02-delay.txt")) {
String s = FileUtil.readUtf8String("d:\\tmp\\group02-delay.txt");
System.out.println("------> group02修改了时间间隔为 " + s);
executor.resetPeriodByGroup("group02", Long.parseLong(s), TimeUnit.SECONDS);
}
}
}, 300));
watchMonitor.setMaxDepth(3);
watchMonitor.start();
}
}
配合 nacos 使用
对于 Spring Boot + Nacos 的微服务项目,可以通过监听配置变更事件(EnvironmentChangeEvent)来触发调用 DynamicScheduledThreadPoolExecutor 的 resetPeriodByGroup 方法。
配置类
/**
 * Holds the per-group delay periods, bound from configuration properties under the
 * {@code dynamic-scheduled} prefix. Both periods default to 1800 seconds (30 minutes).
 */
@Data
@Configuration
@ConfigurationProperties("dynamic-scheduled")
public class DynamicPeriodConfig {
/**
* Delay period of group A, in seconds.
*/
private Integer periodForGroupA = 1800;
/**
* Delay period of group B, in seconds.
*/
private Integer periodForGroupB = 1800;
}
创建 DynamicScheduledThreadPoolExecutor 的 Bean
/**
 * Spring configuration that exposes the dynamic scheduled thread pool as a bean.
 */
@Configuration
public class ThreadPoolConfig {
    /** Processor count reported by the runtime when this object is created; used as the core pool size. */
    int cpuNum = Runtime.getRuntime().availableProcessors();

    /**
     * Builds the pool with {@link #cpuNum} core threads, threads named with the
     * {@code dynamic_scheduled_} prefix, and the caller-runs rejection policy.
     *
     * @return the dynamic scheduled thread pool bean
     */
    @Bean
    public DynamicScheduledThreadPoolExecutor dynamicScheduledExecutor() {
        // NOTE(review): the second argument is presumably the daemon flag - confirm against NamedThreadFactory.
        NamedThreadFactory workerFactory = new NamedThreadFactory("dynamic_scheduled_", true);
        ThreadPoolExecutor.CallerRunsPolicy rejectionPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
        return new DynamicScheduledThreadPoolExecutor(cpuNum, workerFactory, rejectionPolicy);
    }
}
监听 nacos 配置变化
/**
 * Listens for environment (configuration) changes and re-schedules the task group
 * that belongs to each changed period property.
 */
@Slf4j
@Configuration
public class SyyThreadExecutorRefreshConfig {
    // Maps each watched configuration key to its task group name: {config key -> group name}
    private static final Map<String, String> configKeyToGroupMap = new HashMap<>();

    static {
        configKeyToGroupMap.put("dynamic-scheduled.periodForGroupA", "periodForGroupA");
        configKeyToGroupMap.put("dynamic-scheduled.periodForGroupB", "periodForGroupB");
    }

    @Autowired
    private DynamicScheduledThreadPoolExecutor dynamicScheduledExecutor;
    @Autowired
    private DynamicPeriodConfig dynamicPeriodConfig;

    /**
     * For every changed key that is registered in {@code configKeyToGroupMap}, reads the
     * current value through the matching getter of {@code dynamicPeriodConfig} and, when
     * it is a positive number, applies it (in seconds) to the mapped task group.
     *
     * <p>NOTE(review): the value is read from the {@code dynamicPeriodConfig} bean, not
     * from the event; confirm the bean has already been rebound with the new value by
     * the time this listener runs.
     *
     * @param environmentChangeEvent event carrying the changed configuration keys
     */
    @EventListener(EnvironmentChangeEvent.class)
    public void setEnvironmentChangeEvent(EnvironmentChangeEvent environmentChangeEvent) {
        for (String changedKey : environmentChangeEvent.getKeys()) {
            String group = configKeyToGroupMap.get(changedKey);
            if (group == null) {
                // Not one of the period keys we manage.
                continue;
            }

            // Derive the getter name from the last segment of the key, e.g. periodForGroupA -> getPeriodForGroupA.
            String delayFieldName = changedKey.substring(changedKey.lastIndexOf('.') + 1);
            String firstLetter = delayFieldName.substring(0, 1).toUpperCase();
            String methodName = "get" + firstLetter + delayFieldName.substring(1);

            Integer delay;
            try {
                Object value = dynamicPeriodConfig.getClass().getMethod(methodName).invoke(dynamicPeriodConfig);
                delay = (Integer) value;
            } catch (Exception e) {
                log.error("反射调用dynamicPeriodConfig.{}()失败,{}", methodName, e.getMessage(), e);
                continue;
            }

            if (delay == null || delay <= 0) {
                continue;
            }
            // Apply the new delay to the whole group.
            dynamicScheduledExecutor.resetPeriodByGroup(group, delay, TimeUnit.SECONDS);
        }
    }
}