当前位置:首页 > 技术 > 正文内容

基于 .NET Core 的轻量级流控服务开源实现

访客 技术 2026年7月4日 2

此前我们讨论过流控服务的应用场景、常见方案和核心算法。经过一段时间研发和测试,该服务已在生产系统中运行超过半年,表现稳定。本文将重点分享其核心设计与实现思路,并开源至 GitHub。

GitHub 地址:https://github.com/zhouguoqing/FlowControl

一、令牌桶算法实现

令牌桶算法基础流程:系统以固定速率(如 QPS=100 时每 10ms)向桶中注入令牌。桶满则停止添加。请求抵达时,必须消费令牌;若无可用令牌,则请求被阻塞或拒绝。令牌注入速率可动态调整,从而实时控制处理速率。

关键属性包括桶容量时间间隔,核心操作则有消费令牌定期重置

以下为抽象类 TokenBucket

using System;

namespace CZ.FlowControl.Service
{
    using CZ.FlowControl.Spi;

    public abstract class TokenBucket : IThrottleStrategy
    {
        protected long bucketTokenCapacity;
        private static readonly object syncObj = new object();
        protected readonly long ticksRefillInterval;
        protected long nextRefillTime;
        protected long tokens;

        protected TokenBucket(long capacity, long interval, long intervalMs)
        {
            if (capacity <= 0) throw new ArgumentOutOfRangeException("capacity");
            if (interval < 0) throw new ArgumentOutOfRangeException("interval");
            if (intervalMs <= 0) throw new ArgumentOutOfRangeException("intervalMs");

            bucketTokenCapacity = capacity;
            ticksRefillInterval = TimeSpan.FromMilliseconds(interval * intervalMs).Ticks;
        }

        public bool ShouldThrottle(long n = 1)
        {
            TimeSpan wait;
            return ShouldThrottle(n, out wait);
        }

        public bool ShouldThrottle(long n, out TimeSpan waitTime)
        {
            if (n <= 0) throw new ArgumentOutOfRangeException("n");

            lock (syncObj)
            {
                RefillTokens();
                if (tokens < n)
                {
                    var timeToNext = nextRefillTime - SystemTime.UtcNow.Ticks;
                    if (timeToNext < 0) return ShouldThrottle(n, out waitTime);

                    waitTime = TimeSpan.FromTicks(timeToNext);
                    return true;
                }
                tokens -= n;
                waitTime = TimeSpan.Zero;
                return false;
            }
        }

        protected abstract void RefillTokens();

        public bool ShouldThrottle(out TimeSpan waitTime)
        {
            return ShouldThrottle(1, out waitTime);
        }

        public long CurrentTokenCount
        {
            get
            {
                lock (syncObj)
                {
                    RefillTokens();
                    return tokens;
                }
            }
        }
    }
}

抽象方法 RefillTokens 允许子类自定义令牌补充逻辑。以下是 固定令牌桶 的实现:

class FixedTokenBucket : TokenBucket
{
    public FixedTokenBucket(long maxTokens, long interval, long intervalMs)
        : base(maxTokens, interval, intervalMs) { }

    protected override void RefillTokens()
    {
        var now = SystemTime.UtcNow.Ticks;
        if (now < nextRefillTime) return;

        tokens = bucketTokenCapacity;
        nextRefillTime = now + ticksRefillInterval;
    }
}

ShouldThrottle 中,通过锁确保线程安全,同时检查是否需要重置桶。成功消费后令牌数递减;令牌不足时返回距下次重置的等待时间。

二、漏桶算法实现

漏桶算法将请求比作水,桶以恒定速率出水(响应速率)。若水流过快则溢出(即拒绝请求),从而强制限制传输速率。
关键参数包括 桶大小(允许突发流量)和 漏洞大小(恒定流出速率)。

漏桶抽象类 LeakyTokenBucket 继承自 TokenBucket,主要差异在于重置方式,新增两个属性:stepTokens(每步流出的令牌数)和 ticksStepInterval(步长间隔)。

abstract class LeakyTokenBucket : TokenBucket
{
    protected readonly long stepTokens;
    protected long ticksStepInterval;

    protected LeakyTokenBucket(long maxTokens, long interval, int intervalMs,
        long stepTokens, long stepInterval, int stepIntervalMs)
        : base(maxTokens, interval, intervalMs)
    {
        this.stepTokens = stepTokens;
        if (stepInterval < 0) throw new ArgumentOutOfRangeException("stepInterval");
        if (stepTokens < 0) throw new ArgumentOutOfRangeException("stepTokens");
        if (stepIntervalMs <= 0) throw new ArgumentOutOfRangeException("stepIntervalMs");

        ticksStepInterval = TimeSpan.FromMilliseconds(stepInterval * stepIntervalMs).Ticks;
    }
}

漏桶的具体实现分为 满桶(初始即装满)和 空桶(逐步填充)。

满桶实现

class StepDownTokenBucket : LeakyTokenBucket
{
    public StepDownTokenBucket(long maxTokens, long interval, int intervalMs,
        long stepTokens, long stepInterval, int stepIntervalMs)
        : base(maxTokens, interval, intervalMs, stepTokens, stepInterval, stepIntervalMs) { }

    protected override void RefillTokens()
    {
        var now = SystemTime.UtcNow.Ticks;
        if (now >= nextRefillTime)
        {
            tokens = bucketTokenCapacity;
            nextRefillTime = now + ticksRefillInterval;
            return;
        }

        var timeToNext = nextRefillTime - now;
        var steps = timeToNext / ticksStepInterval;
        var maxPossible = steps * stepTokens;
        if ((timeToNext % ticksStepInterval) > 0) maxPossible += stepTokens;
        if (maxPossible < tokens) tokens = maxPossible;
    }
}

空桶实现

class StepUpLeakyTokenBucket : LeakyTokenBucket
{
    private long lastActiveTime;

    public StepUpLeakyTokenBucket(long maxTokens, long interval, int intervalMs,
        long stepTokens, long stepInterval, int stepIntervalMs)
        : base(maxTokens, interval, intervalMs, stepTokens, stepInterval, stepIntervalMs) { }

    protected override void RefillTokens()
    {
        var now = SystemTime.UtcNow.Ticks;
        if (now >= nextRefillTime)
        {
            tokens = stepTokens;
            lastActiveTime = now;
            nextRefillTime = now + ticksRefillInterval;
            return;
        }

        var elapsed = now - lastActiveTime;
        var steps = elapsed / ticksStepInterval;
        tokens += steps * stepTokens;
        if (tokens > bucketTokenCapacity) tokens = bucketTokenCapacity;
        lastActiveTime = now;
    }
}

三、流控服务封装

接口 IThrottleStrategy 定义了流控核心方法:

public interface IThrottleStrategy
{
    bool ShouldThrottle(long n = 1);
    bool ShouldThrottle(long n, out TimeSpan waitTime);
    bool ShouldThrottle(out TimeSpan waitTime);
    long CurrentTokenCount { get; }
}

策略配置类 FlowControlStrategy 定义了流控维度、阈值等信息:

public class FlowControlStrategy
{
    public string ID { get; set; }
    public string Name { get; set; }
    public FlowControlStrategyType StrategyType { get; set; }
    public long IntThreshold { get; set; }
    public decimal DoubleThreshold { get; set; }
    public FlowControlTimespan TimeSpan { get; set; }
    public Dictionary<string, string> FlowControlConfigs { get; set; } = new Dictionary<string, string>();
    public string Descriptions { get; set; }
    public bool IsRefusedRequest { get; set; }
    public DateTime CreateTime { get; set; }
    public string Creator { get; set; }
    public DateTime LastModifyTime { get; set; }
    public string LastModifier { get; set; }
}

策略类型枚举 FlowControlStrategyType 支持 TPS、Sum(限定时间内总次数)和 Delay:

public enum FlowControlStrategyType { TPS, Sum, Delay }

每种策略对应一个流控器。例如 TPS 控制器使用固定令牌桶:

class TPSFlowController : IFlowController
{
    public IThrottleStrategy InnerThrottleStrategy { get; private set; }
    public FlowControlStrategy FlowControlStrategy { get; private set; }

    public bool ShouldThrottle(long n, out TimeSpan waitTime)
        => InnerThrottleStrategy.ShouldThrottle(n, out waitTime);

    public TPSFlowController(FlowControlStrategy strategy)
    {
        FlowControlStrategy = strategy;
        InnerThrottleStrategy = new FixedTokenBucket(strategy.IntThreshold, 1, 1000);
    }
}

Sum 控制器支持按秒、分、时、日重置桶:

class SumFlowController : IFlowController
{
    public IThrottleStrategy InnerThrottleStrategy { get; private set; }
    public FlowControlStrategy FlowControlStrategy { get; private set; }

    public bool ShouldThrottle(long n, out TimeSpan waitTime)
        => InnerThrottleStrategy.ShouldThrottle(n, out waitTime);

    public SumFlowController(FlowControlStrategy strategy)
    {
        FlowControlStrategy = strategy;
        var interval = GetRefillInterval(strategy);
        InnerThrottleStrategy = new FixedTokenBucket(strategy.IntThreshold, interval, 1000);
    }

    private long GetRefillInterval(FlowControlStrategy strategy)
    {
        return strategy.TimeSpan switch
        {
            FlowControlTimespan.Second => 1,
            FlowControlTimespan.Minute => 60,
            FlowControlTimespan.Hour => 3600,
            FlowControlTimespan.Day => 86400,
            _ => 0
        };
    }
}

通过工厂类 FlowControllerFactory 创建并缓存流控器:

class FlowControllerFactory
{
    private static Dictionary<string, IFlowController> _controllers = new Dictionary<string, IFlowController>();
    private static readonly object _lock = new object();
    private static readonly FlowControllerFactory _instance = new FlowControllerFactory();

    private FlowControllerFactory() { }

    public static FlowControllerFactory Instance => _instance;

    public IFlowController GetOrCreate(FlowControlStrategy strategy)
    {
        if (strategy == null) throw new ArgumentNullException(nameof(strategy));

        if (!_controllers.ContainsKey(strategy.ID))
        {
            lock (_lock)
            {
                if (!_controllers.ContainsKey(strategy.ID))
                {
                    var controller = Create(strategy);
                    if (controller != null) _controllers[strategy.ID] = controller;
                }
            }
        }

        return _controllers.TryGetValue(strategy.ID, out var ctrl) ? ctrl : null;
    }

    private IFlowController Create(FlowControlStrategy strategy)
    {
        return strategy.StrategyType switch
        {
            FlowControlStrategyType.TPS => new TPSFlowController(strategy),
            FlowControlStrategyType.Delay => new DelayFlowController(strategy),
            FlowControlStrategyType.Sum => new SumFlowController(strategy),
            _ => null
        };
    }
}

最终通过 FlowControlService 对外提供统一入口,内部决定等待或拒绝:

public class FlowControlService
{
    public static void FlowControl(FlowControlStrategy strategy, int count = 1)
    {
        var controller = FlowControllerFactory.Instance.GetOrCreate(strategy);
        TimeSpan wait;
        if (controller.ShouldThrottle(count, out wait))
        {
            if (!strategy.IsRefusedRequest && wait != TimeSpan.Zero)
            {
                WaitUntilAvailable(strategy, controller, wait, count);
            }
            else if (strategy.IsRefusedRequest)
            {
                throw new Exception("触发流控!");
            }
        }
    }

    private static void WaitUntilAvailable(FlowControlStrategy strategy, IFlowController controller, TimeSpan wait, int count)
    {
        var delay = wait;
        if (strategy.StrategyType == FlowControlStrategyType.Delay)
        {
            Thread.Sleep(delay);
            return;
        }

        while (controller.ShouldThrottle(count, out delay))
        {
            Thread.Sleep(delay);
        }
    }
}

四、使用示例

通过 Nuget 安装包:

Install-Package CZ.FlowControl.Service -Version 1.0.0

之后即可在代码中定义策略并调用流控服务。详细代码及更多示例请参见 GitHub 仓库。

欢迎使用和贡献改进。

相关文章

Linux crontab 详解

1) crontab 是什么cron 是 Linux 的定时任务守护进程;crontab 是用来编辑/查看“按时间周期执行命令”的表(cron table)。常见两类:用户 crontab:每个用户一份(crontab -e 编辑)系统级 crontab / cron.d:可指定执行用户(/etc/crontab、/etc/cron.d/*)2) crontab 时间...

富文本里可以允许的 HTML 属性

一、所有标签默认允许的安全属性(极少)class        (可选)id           (通常建议禁用)title️ 注意:id 容易被滥用做锚点注入,很多系统直接禁用class 允许的话最好只允许固定前缀(如 editor-*)二、a 标签允许属性<a href="" t...

Mac 安装 Node.js 指南

方法一:通过官网安装包(最简单,适合初学者)如果你只是想快速安装并开始使用,这是最直接的方法。访问 Node.js 官网。页面会显示两个版本:LTS (Recommended For Most Users):长期支持版,最稳定。建议选这个。Current:最新特性版,包含最新功能但可能不够稳定。下载 .pkg 安装包并运行。按照安装向导点击“下一步”即可完成。方法二:使用 Homebrew 安装(...

Dom\HTML_NO_DEFAULT_NS 的副作用:自动加闭合标签

在使用Dom\HTMLDocument时,Dom\HTML_NO_DEFAULT_NS 将禁止在解析过程中设置元素的命名空间, 此设置是为了与DOMDocument向后兼容而存在的。当使用它时,已知的一个副作用就是:自动加闭合标签例如 </img> 为什么会这样?当你使用:Dom\HTML_NO_DEFAULT_NS文档会变成 无命名空间模式,此时内部更接近 XML...

Laravel 事件和监听器创建

在 Laravel 中,使用 Artisan 命令创建 Events(事件) 和 Listeners(监听器) 是非常高效的。你可以通过以下几种方式来实现:1. 手动创建单个 Event如果你只想创建一个事件类,可以使用 make:event 命令:Bashphp artisan make:event UserRegistered执行后,文件将生成在 app/Even...

自定义域名解析神器 dnsmasq

什么是 dnsmasq?dnsmasq 是一个轻量级、功能强大的网络服务工具,专为小型和中等规模网络设计。它是一个综合的网络基础设施解决方案[1]。dnsmasq 能做什么?功能说明应用场景DNS 转发与缓存将 DNS 查询转发到上游服务器(ISP、Google DNS 等),并在本地缓存结果加快 DNS 查询速度,减少外部 DNS 流量本地 DNS解析本地网络设备的主机名,无需编辑&n...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。