当前位置:首页 > 技术 > 正文内容

动态定时任务的实现方案

访客 技术 2026年6月23日 1

在处理动态定时任务时,传统的Scheduled注解无法满足动态配置需求。通过研究发现,可以采用时间轮算法来实现这一功能。时间轮算法通过模拟表盘的方式进行任务调度。

package com.scheduler.task;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BufferScheduler {
    private Logger logger = LoggerFactory.getLogger(BufferScheduler.class);

    private static final int DEFAULT_BUFFER_SIZE = 60;
    private Object[] bufferRing;
    private int bufferSize;
    private ExecutorService executorService;
    private volatile int taskCount = 0;
    private volatile boolean isStopped = false;
    private volatile AtomicBoolean isRunning = new AtomicBoolean(false);
    private AtomicInteger tickCount = new AtomicInteger();
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private AtomicInteger taskId = new AtomicInteger();
    private Map<Integer, Task> taskMap = new ConcurrentHashMap<>(16);

    public BufferScheduler(ExecutorService executorService) {
        this(executorService, DEFAULT_BUFFER_SIZE);
    }

    public BufferScheduler(ExecutorService executorService, int bufferSize) {
        this.executorService = executorService;
        this.bufferSize = bufferSize;
        this.bufferRing = new Object[bufferSize];
    }

    public int registerTask(Task task) {
        int key = task.getKey();
        int id;

        try {
            lock.lock();
            int index = calculateIndex(key, bufferSize);
            task.setIndex(index);
            Set<Task> tasks = getTasks(index);

            int cycleNumber = calculateCycleNumber(key, bufferSize);
            if (tasks != null) {
                task.setCycleNumber(cycleNumber);
                tasks.add(task);
            } else {
                task.setIndex(index);
                task.setCycleNumber(cycleNumber);
                Set<Task> newSet = new HashSet<>();
                newSet.add(task);
                setTasks(key, newSet);
            }
            id = taskId.incrementAndGet();
            task.setTaskId(id);
            taskMap.put(id, task);
            taskCount++;
        } finally {
            lock.unlock();
        }

        startScheduler();
        return id;
    }

    public boolean cancelTask(int id) {
        boolean result = false;
        Set<Task> tempTasks = new HashSet<>();

        try {
            lock.lock();
            Task task = taskMap.get(id);
            if (task == null) {
                return false;
            }

            Set<Task> tasks = getTasks(task.getIndex());
            for (Task t : tasks) {
                if (t.getKey() == task.getKey() && t.getCycleNumber() == task.getCycleNumber()) {
                    taskCount--;
                    result = true;
                    taskMap.remove(id);
                } else {
                    tempTasks.add(t);
                }
            }
            bufferRing[task.getIndex()] = tempTasks;
        } finally {
            lock.unlock();
        }

        return result;
    }

    public int getTaskCount() {
        return taskCount;
    }

    private Set<Task> getTasks(int index) {
        return (Set<Task>) bufferRing[index];
    }

    private void setTasks(int key, Set<Task> tasks) {
        int index = calculateIndex(key, bufferSize);
        bufferRing[index] = tasks;
    }

    private int calculateIndex(int target, int mod) {
        target += tickCount.get();
        return target & (mod - 1);
    }

    private int calculateCycleNumber(int target, int mod) {
        return target >> Integer.bitCount(mod - 1);
    }

    private void decreaseTaskCycle(int key) {
        Set<Task> tasks = (Set<Task>) bufferRing[key];
        Set<Task> remainingTasks = new HashSet<>();
        Set<Task> completedTasks = new HashSet<>();

        if (tasks != null) {
            for (Task task : tasks) {
                if (task.getCycleNumber() == 0) {
                    completedTasks.add(task);
                } else {
                    task.setCycleNumber(task.getCycleNumber() - 1);
                    remainingTasks.add(task);
                }
                taskMap.remove(task.getTaskId());
            }
            bufferRing[key] = remainingTasks;
        }

        for (Task task : completedTasks) {
            try {
                executorService.submit(task);
            } catch (Exception e) {
                logger.error("Task execution failed", e);
            }
        }
    }

    private void startScheduler() {
        if (!isRunning.get()) {
            if (isRunning.compareAndSet(false, true)) {
                logger.info("Scheduler has started");
                Thread schedulerThread = new Thread(new TaskTrigger());
                schedulerThread.setName("Scheduler Task Consumer");
                schedulerThread.start();
            }
        }
    }

    private void stopScheduler(boolean force) {
        if (force) {
            logger.info("Scheduler is forced to stop");
            isStopped = true;
            executorService.shutdownNow();
        } else {
            logger.info("Scheduler is stopping");
            if (getTaskCount() > 0) {
                try {
                    lock.lock();
                    condition.await();
                    isStopped = true;
                } catch (InterruptedException e) {
                    logger.error("Interrupted while waiting", e);
                } finally {
                    lock.unlock();
                }
            }
            executorService.shutdown();
        }
    }

    private class TaskTrigger implements Runnable {
        @Override
        public void run() {
            int index = 0;
            while (!isStopped) {
                try {
                    Set<Task> tasks = (Set<Task>) bufferRing[index];
                    if (tasks != null) {
                        for (Task task : tasks) {
                            executorService.submit(task);
                        }
                    }

                    index = (index + 1) % bufferSize;
                    tickCount.incrementAndGet();
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception e) {
                    logger.error("Scheduler error", e);
                }
            }
            logger.info("Scheduler has stopped");
        }
    }
}
package com.scheduler.task;

import lombok.Getter;
import lombok.Setter;

public abstract class Task extends Thread {
    @Getter @Setter private int index;
    @Getter @Setter private int cycleNumber;
    @Getter @Setter private int key;
    @Getter @Setter private int taskId;

    @Override
    public void run() {}
}
public static class TaskJob extends Task {
    @Override
    public void run() {
        System.out.println("Executing task");
    }
}

// Example usage:
SpringApplication.run(TaskProviderApplication.class, args);
BufferScheduler scheduler = new BufferScheduler(Executors.newFixedThreadPool(2));
for (int count = 0; count < 2; count++) {
    Task job = new TaskJob();
    job.setKey(10);
    job.setCycleNumber(count);
    scheduler.registerTask(job);
}

经过一段时间的技术探索,了解到xxl-job这个开源项目()在任务调度领域功能非常完善,确实是一个非常不错的选择。

相关文章

Linux crontab 详解

1) crontab 是什么cron 是 Linux 的定时任务守护进程;crontab 是用来编辑/查看“按时间周期执行命令”的表(cron table)。常见两类:用户 crontab:每个用户一份(crontab -e 编辑)系统级 crontab / cron.d:可指定执行用户(/etc/crontab、/etc/cron.d/*)2) crontab 时间...

富文本里可以允许的 HTML 属性

一、所有标签默认允许的安全属性(极少)class        (可选)id           (通常建议禁用)title️ 注意:id 容易被滥用做锚点注入,很多系统直接禁用class 允许的话最好只允许固定前缀(如 editor-*)二、a 标签允许属性<a href="" t...

Mac 安装 Node.js 指南

方法一:通过官网安装包(最简单,适合初学者)如果你只是想快速安装并开始使用,这是最直接的方法。访问 Node.js 官网。页面会显示两个版本:LTS (Recommended For Most Users):长期支持版,最稳定。建议选这个。Current:最新特性版,包含最新功能但可能不够稳定。下载 .pkg 安装包并运行。按照安装向导点击“下一步”即可完成。方法二:使用 Homebrew 安装(...

Dom\HTML_NO_DEFAULT_NS 的副作用:自动加闭合标签

在使用Dom\HTMLDocument时,Dom\HTML_NO_DEFAULT_NS 将禁止在解析过程中设置元素的命名空间, 此设置是为了与DOMDocument向后兼容而存在的。当使用它时,已知的一个副作用就是:自动加闭合标签例如 </img> 为什么会这样?当你使用:Dom\HTML_NO_DEFAULT_NS文档会变成 无命名空间模式,此时内部更接近 XML...

Laravel 事件和监听器创建

在 Laravel 中,使用 Artisan 命令创建 Events(事件) 和 Listeners(监听器) 是非常高效的。你可以通过以下几种方式来实现:1. 手动创建单个 Event如果你只想创建一个事件类,可以使用 make:event 命令:Bashphp artisan make:event UserRegistered执行后,文件将生成在 app/Even...

自定义域名解析神器 dnsmasq

什么是 dnsmasq?dnsmasq 是一个轻量级、功能强大的网络服务工具,专为小型和中等规模网络设计。它是一个综合的网络基础设施解决方案[1]。dnsmasq 能做什么?功能说明应用场景DNS 转发与缓存将 DNS 查询转发到上游服务器(ISP、Google DNS 等),并在本地缓存结果加快 DNS 查询速度,减少外部 DNS 流量本地 DNS解析本地网络设备的主机名,无需编辑&n...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。