动态定时任务的实现方案
在处理动态定时任务时,Spring 的 @Scheduled 注解的调度规则在应用启动时就已固定,无法满足运行期动态配置的需求。经过调研,可以采用时间轮算法来实现这一功能:时间轮用一个环形数组模拟表盘,指针每隔固定时间前进一格,走到某一格时就触发该格中已到期的任务。
package com.scheduler.task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * A single-level time wheel that runs {@link Task}s after a delay.
 *
 * <p>The wheel is a ring of {@code bufferSize} slots. A dedicated trigger thread visits one slot
 * per second. A task whose {@code key} is {@code d} is placed {@code d % bufferSize} slots ahead
 * of the current position and must additionally wait {@code d / bufferSize} full revolutions
 * (its "cycle number") before it is submitted to the executor. Each task fires once and is then
 * forgotten by the scheduler.
 *
 * <p>All access to the ring and to the cursor happens while holding {@link #lock}, so
 * {@link #registerTask(Task)} and {@link #cancelTask(int)} may be called from any thread.
 *
 * <p>NOTE: the scheduler stores its bookkeeping (slot index, cycle number, task id) on the
 * {@code Task} object itself, so the same {@code Task} instance should be registered at most once.
 */
public class BufferScheduler {
    private static final Logger LOGGER = LoggerFactory.getLogger(BufferScheduler.class);
    private static final int DEFAULT_BUFFER_SIZE = 60;

    /** Ring of slots; every non-null element is a {@code Set<Task>}. Guarded by {@link #lock}. */
    private final Object[] bufferRing;
    private final int bufferSize;
    private final ExecutorService executorService;

    /** Slot the trigger thread will visit on its next tick, in [0, bufferSize). Guarded by lock. */
    private int cursor = 0;

    private volatile boolean isStopped = false;
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    private final Lock lock = new ReentrantLock();
    /** Signalled (under {@link #lock}) whenever the last pending task fires or is cancelled. */
    private final Condition condition = lock.newCondition();
    private final AtomicInteger taskId = new AtomicInteger();
    /** Pending tasks by id; an entry exists exactly as long as the task sits in the ring. */
    private final Map<Integer, Task> taskMap = new ConcurrentHashMap<>(16);

    /**
     * Creates a scheduler with the default ring size.
     *
     * @param executorService executor that runs tasks once they are due
     */
    public BufferScheduler(ExecutorService executorService) {
        this(executorService, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Creates a scheduler with a ring of {@code bufferSize} slots.
     *
     * @param executorService executor that runs tasks once they are due
     * @param bufferSize number of slots in the ring; any positive value is accepted
     * @throws IllegalArgumentException if {@code bufferSize} is not positive
     */
    public BufferScheduler(ExecutorService executorService, int bufferSize) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive: " + bufferSize);
        }
        this.executorService = executorService;
        this.bufferSize = bufferSize;
        this.bufferRing = new Object[bufferSize];
    }

    /**
     * Registers a task to fire {@code task.getKey()} ticks from now and starts the trigger
     * thread if it is not running yet.
     *
     * <p>A negative key is treated as zero, i.e. the task fires on the next tick.
     *
     * @param task the task to schedule; its index, cycle number and task id are overwritten
     * @return the id assigned to the task, usable with {@link #cancelTask(int)}
     */
    public int registerTask(Task task) {
        int delay = Math.max(task.getKey(), 0);
        int id;
        lock.lock();
        try {
            // Plain modulo/division instead of bit masks: the ring size need not be a power of two.
            int index = (cursor + delay % bufferSize) % bufferSize;
            id = taskId.incrementAndGet();
            task.setIndex(index);
            task.setCycleNumber(delay / bufferSize);
            task.setTaskId(id);

            Set<Task> bucket = getTasks(index);
            if (bucket == null) {
                bucket = new HashSet<>();
                // Store under the index recorded on the task; do not recompute it.
                bufferRing[index] = bucket;
            }
            bucket.add(task);
            taskMap.put(id, task);
        } finally {
            lock.unlock();
        }
        startScheduler();
        return id;
    }

    /**
     * Cancels a pending task.
     *
     * @param id the id returned by {@link #registerTask(Task)}
     * @return {@code true} if the task was still pending and has been removed; {@code false} if
     *     the id is unknown, already fired, or already cancelled
     */
    public boolean cancelTask(int id) {
        lock.lock();
        try {
            Task task = taskMap.remove(id);
            if (task == null) {
                return false;
            }
            int index = task.getIndex();
            Set<Task> bucket = getTasks(index);
            if (bucket != null) {
                // Rebuild the slot without this exact task; other tasks that merely share the
                // same key/cycle number must stay scheduled.
                Set<Task> kept = new HashSet<>();
                for (Task candidate : bucket) {
                    if (candidate != task) {
                        kept.add(candidate);
                    }
                }
                bufferRing[index] = kept.isEmpty() ? null : kept;
            }
            signalIfIdle();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of tasks that are registered and have neither fired nor been cancelled.
     *
     * @return the number of pending tasks
     */
    public int getTaskCount() {
        return taskMap.size();
    }

    /** Returns the task set stored in the given slot, or {@code null} if the slot is empty. */
    @SuppressWarnings("unchecked") // bufferRing only ever holds Set<Task> instances or null
    private Set<Task> getTasks(int index) {
        return (Set<Task>) bufferRing[index];
    }

    /** Wakes up a graceful stop once no task is pending. Caller must hold {@link #lock}. */
    private void signalIfIdle() {
        if (taskMap.isEmpty()) {
            condition.signalAll();
        }
    }

    /**
     * Processes the slot under the cursor and advances the cursor by one.
     *
     * <p>Tasks whose cycle number has reached zero are removed from the ring and from
     * {@link #taskMap} and returned; all other tasks have their cycle number decremented and stay
     * in the slot (and stay cancellable).
     *
     * @return the tasks that are due on this tick; never {@code null}
     */
    private Set<Task> pollDueTasks() {
        Set<Task> dueTasks = new HashSet<>();
        lock.lock();
        try {
            Set<Task> bucket = getTasks(cursor);
            if (bucket != null) {
                Set<Task> remainingTasks = new HashSet<>();
                for (Task task : bucket) {
                    if (task.getCycleNumber() <= 0) {
                        dueTasks.add(task);
                        taskMap.remove(task.getTaskId());
                    } else {
                        task.setCycleNumber(task.getCycleNumber() - 1);
                        remainingTasks.add(task);
                    }
                }
                bufferRing[cursor] = remainingTasks.isEmpty() ? null : remainingTasks;
            }
            cursor = (cursor + 1) % bufferSize;
            signalIfIdle();
        } finally {
            lock.unlock();
        }
        return dueTasks;
    }

    /** Starts the trigger thread exactly once, no matter how many threads race to call this. */
    private void startScheduler() {
        if (isRunning.compareAndSet(false, true)) {
            LOGGER.info("Scheduler has started");
            Thread schedulerThread = new Thread(new TaskTrigger());
            schedulerThread.setName("Scheduler Task Consumer");
            schedulerThread.start();
        }
    }

    /**
     * Stops the trigger loop and shuts the executor down.
     *
     * <p>NOTE(review): nothing in this class calls this method; it is kept private to leave the
     * class's public surface unchanged.
     *
     * @param force if {@code true}, stop immediately and call {@code shutdownNow()}; otherwise
     *     block until no task is pending (or the calling thread is interrupted), then call
     *     {@code shutdown()}
     */
    private void stopScheduler(boolean force) {
        if (force) {
            LOGGER.info("Scheduler is forced to stop");
            isStopped = true;
            executorService.shutdownNow();
            return;
        }
        LOGGER.info("Scheduler is stopping");
        lock.lock();
        try {
            // Loop guards against spurious wakeups and against tasks registered while waiting.
            while (!taskMap.isEmpty()) {
                condition.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.error("Interrupted while waiting", e);
        } finally {
            // Always stop the trigger loop, even when nothing was pending or the wait was
            // interrupted; otherwise it would keep submitting to a shut-down executor.
            isStopped = true;
            lock.unlock();
        }
        executorService.shutdown();
    }

    /** Trigger loop: once per second, submits the tasks that became due on this tick. */
    private class TaskTrigger implements Runnable {
        @Override
        public void run() {
            while (!isStopped) {
                try {
                    for (Task task : pollDueTasks()) {
                        try {
                            executorService.submit(task);
                        } catch (RuntimeException e) {
                            // One rejected task must not prevent the others from being submitted.
                            LOGGER.error("Task execution failed", e);
                        }
                    }
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    // Interruption is a request to stop: restore the flag and leave the loop.
                    Thread.currentThread().interrupt();
                    LOGGER.error("Scheduler error", e);
                    break;
                } catch (RuntimeException e) {
                    LOGGER.error("Scheduler error", e);
                }
            }
            LOGGER.info("Scheduler has stopped");
        }
    }
}
package com.scheduler.task;
import lombok.Getter;
import lombok.Setter;
/**
 * Base class for units of work handled by {@code BufferScheduler}.
 *
 * <p>Lombok generates a public getter and setter for every field below. {@code BufferScheduler}
 * reads {@code key} and writes {@code index}, {@code cycleNumber} and {@code taskId} when a task
 * is registered.
 */
@Getter
@Setter
public abstract class Task extends Thread {
    /** Ring slot assigned to this task by the scheduler. */
    private int index;

    /** Cycle number assigned to this task by the scheduler. */
    private int cycleNumber;

    /** Caller-supplied value from which the scheduler derives the slot and cycle number. */
    private int key;

    /** Identifier assigned by the scheduler at registration. */
    private int taskId;

    /** Does nothing by default; concrete tasks override this with their actual work. */
    @Override
    public void run() {
    }
}
/** Demo task that prints a fixed message to standard output when it runs. */
public static class TaskJob extends Task {
    @Override
    public void run() {
        final String message = "Executing task";
        System.out.println(message);
    }
}
// Example usage: start the Spring application, then register two TaskJob instances
// on a scheduler backed by a two-thread pool.
SpringApplication.run(TaskProviderApplication.class, args);
BufferScheduler scheduler = new BufferScheduler(Executors.newFixedThreadPool(2));
for (int i = 0; i < 2; i++) {
    Task demoTask = new TaskJob();
    demoTask.setKey(10);
    // registerTask assigns its own cycle number, derived from the key.
    demoTask.setCycleNumber(i);
    scheduler.registerTask(demoTask);
}
经过一段时间的技术探索,了解到 xxl-job 这个开源项目在任务调度领域功能非常完善,确实是一个非常不错的选择。