基于 .NET Core 的轻量级流控服务开源实现
此前我们讨论过流控服务的应用场景、常见方案和核心算法。经过一段时间研发和测试,该服务已在生产系统中运行超过半年,表现稳定。本文将重点分享其核心设计与实现思路,相关代码已开源至 GitHub。
GitHub 地址:https://github.com/zhouguoqing/FlowControl
一、令牌桶算法实现
令牌桶算法基础流程:系统以固定速率(如 QPS=100 时每 10ms)向桶中注入令牌。桶满则停止添加。请求抵达时,必须消费令牌;若无可用令牌,则请求被阻塞或拒绝。令牌注入速率可动态调整,从而实时控制处理速率。
关键属性包括桶容量和时间间隔,核心操作则有消费令牌和定期重置。
以下为抽象类 TokenBucket:
using System;
namespace CZ.FlowControl.Service
{
using CZ.FlowControl.Spi;
/// <summary>
/// Base class for token-bucket throttling strategies. It owns the token
/// counter and the refill deadline; subclasses decide how tokens are
/// replenished by overriding <see cref="RefillTokens"/>.
/// </summary>
public abstract class TokenBucket : IThrottleStrategy
{
    /// <summary>Maximum number of tokens the bucket can hold.</summary>
    protected long bucketTokenCapacity;

    // Per-instance lock: the state it guards (tokens, nextRefillTime) is
    // per-instance, so unrelated buckets do not need to contend on one monitor.
    private readonly object syncObj = new object();

    /// <summary>Length of one refill period, in ticks.</summary>
    protected readonly long ticksRefillInterval;

    /// <summary>Tick value at which the next refill is due.</summary>
    protected long nextRefillTime;

    /// <summary>Tokens currently available.</summary>
    protected long tokens;

    /// <summary>
    /// Creates a bucket whose refill period is <paramref name="interval"/> *
    /// <paramref name="intervalMs"/> milliseconds.
    /// </summary>
    /// <param name="capacity">Bucket capacity; must be greater than zero.</param>
    /// <param name="interval">Number of interval units; must not be negative.</param>
    /// <param name="intervalMs">Length of one interval unit in milliseconds; must be greater than zero.</param>
    /// <exception cref="ArgumentOutOfRangeException">An argument is outside its allowed range.</exception>
    protected TokenBucket(long capacity, long interval, long intervalMs)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        if (interval < 0) throw new ArgumentOutOfRangeException(nameof(interval));
        if (intervalMs <= 0) throw new ArgumentOutOfRangeException(nameof(intervalMs));

        bucketTokenCapacity = capacity;
        ticksRefillInterval = TimeSpan.FromMilliseconds(interval * intervalMs).Ticks;
    }

    /// <summary>
    /// Tries to consume <paramref name="n"/> tokens, discarding the wait time.
    /// </summary>
    /// <returns><c>true</c> if the caller should be throttled; <c>false</c> if the tokens were consumed.</returns>
    public bool ShouldThrottle(long n = 1)
    {
        TimeSpan wait;
        return ShouldThrottle(n, out wait);
    }

    /// <summary>
    /// Tries to consume <paramref name="n"/> tokens.
    /// </summary>
    /// <param name="n">Number of tokens requested; must be greater than zero.</param>
    /// <param name="waitTime">
    /// <see cref="TimeSpan.Zero"/> when the tokens were consumed; otherwise the
    /// time remaining until <see cref="nextRefillTime"/>.
    /// </param>
    /// <returns><c>true</c> if fewer than <paramref name="n"/> tokens are available; otherwise <c>false</c>.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="n"/> is zero or negative.</exception>
    public bool ShouldThrottle(long n, out TimeSpan waitTime)
    {
        if (n <= 0) throw new ArgumentOutOfRangeException(nameof(n));

        lock (syncObj)
        {
            while (true)
            {
                RefillTokens();

                if (tokens >= n)
                {
                    tokens -= n;
                    waitTime = TimeSpan.Zero;
                    return false;
                }

                var timeToNext = nextRefillTime - SystemTime.UtcNow.Ticks;
                if (timeToNext >= 0)
                {
                    waitTime = TimeSpan.FromTicks(timeToNext);
                    return true;
                }

                // The refill deadline passed between RefillTokens() and the
                // clock read above; evaluate again (a loop, so the stack does
                // not grow the way the former recursive call did).
            }
        }
    }

    /// <summary>
    /// Replenishes <see cref="tokens"/> according to the subclass policy.
    /// Called while the bucket's lock is held.
    /// </summary>
    protected abstract void RefillTokens();

    /// <summary>Tries to consume a single token.</summary>
    public bool ShouldThrottle(out TimeSpan waitTime)
    {
        return ShouldThrottle(1, out waitTime);
    }

    /// <summary>
    /// Number of tokens available after applying any refill that is due.
    /// </summary>
    public long CurrentTokenCount
    {
        get
        {
            lock (syncObj)
            {
                RefillTokens();
                return tokens;
            }
        }
    }
}
}
抽象方法 RefillTokens 允许子类自定义令牌补充逻辑。以下是 固定令牌桶 的实现:
/// <summary>
/// Token bucket that is topped up to full capacity once per refill period.
/// </summary>
class FixedTokenBucket : TokenBucket
{
    /// <summary>Forwards capacity and refill-period settings to the base bucket.</summary>
    public FixedTokenBucket(long maxTokens, long interval, long intervalMs)
        : base(maxTokens, interval, intervalMs)
    {
    }

    /// <summary>
    /// Once the refill deadline has been reached, restores the full capacity
    /// and schedules the next deadline one period from the current time.
    /// </summary>
    protected override void RefillTokens()
    {
        var currentTicks = SystemTime.UtcNow.Ticks;
        if (currentTicks >= nextRefillTime)
        {
            tokens = bucketTokenCapacity;
            nextRefillTime = currentTicks + ticksRefillInterval;
        }
    }
}
在 ShouldThrottle 中,通过锁确保线程安全,同时检查是否需要重置桶。成功消费后令牌数递减;令牌不足时返回距下次重置的等待时间。
二、漏桶算法实现
漏桶算法将请求比作水,桶以恒定速率出水(响应速率)。若水流过快则溢出(即拒绝请求),从而强制限制传输速率。
关键参数包括 桶大小(允许突发流量)和 漏孔大小(恒定流出速率)。
漏桶抽象类 LeakyTokenBucket 继承自 TokenBucket,主要差异在于重置方式,新增两个属性:stepTokens(每步流出的令牌数)和 ticksStepInterval(步长间隔)。
/// <summary>
/// Base class for the stepped ("leaky") buckets. Adds a step size and a step
/// interval on top of the capacity and refill period held by
/// <see cref="TokenBucket"/>.
/// </summary>
abstract class LeakyTokenBucket : TokenBucket
{
    /// <summary>Number of tokens that make up one step.</summary>
    protected readonly long stepTokens;

    /// <summary>Length of one step, in ticks. Always greater than zero.</summary>
    protected long ticksStepInterval;

    /// <summary>
    /// Creates a stepped bucket whose step length is
    /// <paramref name="stepInterval"/> * <paramref name="stepIntervalMs"/> milliseconds.
    /// </summary>
    /// <param name="maxTokens">Bucket capacity, forwarded to the base class.</param>
    /// <param name="interval">Refill interval units, forwarded to the base class.</param>
    /// <param name="intervalMs">Milliseconds per refill interval unit, forwarded to the base class.</param>
    /// <param name="stepTokens">Tokens per step; must not be negative.</param>
    /// <param name="stepInterval">Number of step interval units; must be greater than zero.</param>
    /// <param name="stepIntervalMs">Milliseconds per step interval unit; must be greater than zero.</param>
    /// <exception cref="ArgumentOutOfRangeException">An argument is outside its allowed range.</exception>
    protected LeakyTokenBucket(long maxTokens, long interval, int intervalMs,
        long stepTokens, long stepInterval, int stepIntervalMs)
        : base(maxTokens, interval, intervalMs)
    {
        // A zero step interval would make ticksStepInterval zero, and the
        // subclasses divide by it in RefillTokens(); reject it up front rather
        // than failing later with DivideByZeroException.
        if (stepInterval <= 0) throw new ArgumentOutOfRangeException(nameof(stepInterval));
        if (stepTokens < 0) throw new ArgumentOutOfRangeException(nameof(stepTokens));
        if (stepIntervalMs <= 0) throw new ArgumentOutOfRangeException(nameof(stepIntervalMs));

        this.stepTokens = stepTokens;
        ticksStepInterval = TimeSpan.FromMilliseconds(stepInterval * stepIntervalMs).Ticks;
    }
}
漏桶的具体实现分为 满桶(初始即装满)和 空桶(逐步填充)。
满桶实现
/// <summary>
/// Stepped bucket that starts each refill period full and lowers the ceiling
/// on available tokens as the period runs out.
/// </summary>
class StepDownTokenBucket : LeakyTokenBucket
{
    /// <summary>Forwards all settings to <see cref="LeakyTokenBucket"/>.</summary>
    public StepDownTokenBucket(long maxTokens, long interval, int intervalMs,
        long stepTokens, long stepInterval, int stepIntervalMs)
        : base(maxTokens, interval, intervalMs, stepTokens, stepInterval, stepIntervalMs)
    {
    }

    /// <summary>
    /// At or after the refill deadline: fills the bucket and schedules the next
    /// deadline. Before it: caps the token count at (number of steps left in the
    /// period, rounded up) * <c>stepTokens</c>.
    /// </summary>
    protected override void RefillTokens()
    {
        var currentTicks = SystemTime.UtcNow.Ticks;

        if (currentTicks < nextRefillTime)
        {
            var remaining = nextRefillTime - currentTicks;

            // Ceiling division: a partially elapsed step still counts as one.
            var stepsLeft = remaining / ticksStepInterval;
            if (remaining % ticksStepInterval > 0)
            {
                stepsLeft++;
            }

            var ceiling = stepsLeft * stepTokens;
            if (tokens > ceiling)
            {
                tokens = ceiling;
            }

            return;
        }

        tokens = bucketTokenCapacity;
        nextRefillTime = currentTicks + ticksRefillInterval;
    }
}
空桶实现
/// <summary>
/// Stepped bucket that starts each refill period with a single step's worth of
/// tokens and gains <c>stepTokens</c> more for every full step interval that
/// elapses, up to the bucket capacity.
/// </summary>
class StepUpLeakyTokenBucket : LeakyTokenBucket
{
    // Tick value up to which elapsed time has already been converted to tokens.
    private long lastActiveTime;

    /// <summary>Forwards all settings to <see cref="LeakyTokenBucket"/>.</summary>
    public StepUpLeakyTokenBucket(long maxTokens, long interval, int intervalMs,
        long stepTokens, long stepInterval, int stepIntervalMs)
        : base(maxTokens, interval, intervalMs, stepTokens, stepInterval, stepIntervalMs)
    {
    }

    /// <summary>
    /// At or after the refill deadline: restarts the period with
    /// <c>stepTokens</c> tokens. Otherwise credits <c>stepTokens</c> for each
    /// whole step interval elapsed since the last credited step.
    /// </summary>
    protected override void RefillTokens()
    {
        var now = SystemTime.UtcNow.Ticks;
        if (now >= nextRefillTime)
        {
            tokens = stepTokens;
            lastActiveTime = now;
            nextRefillTime = now + ticksRefillInterval;
            return;
        }

        var elapsed = now - lastActiveTime;
        if (elapsed < 0)
        {
            // Clock moved backwards: re-anchor instead of crediting (or
            // debiting) a negative number of steps.
            lastActiveTime = now;
            return;
        }

        var steps = elapsed / ticksStepInterval;
        if (steps == 0)
        {
            // Less than one full step has elapsed. Leave lastActiveTime alone
            // so the partial step keeps accumulating; resetting it here would
            // starve the bucket whenever it is polled more often than the
            // step interval.
            return;
        }

        tokens += steps * stepTokens;
        if (tokens > bucketTokenCapacity) tokens = bucketTokenCapacity;

        // Advance only by the whole steps that were credited, carrying the
        // remainder over to the next call.
        lastActiveTime += steps * ticksStepInterval;
    }
}
三、流控服务封装
接口 IThrottleStrategy 定义了流控核心方法:
/// <summary>
/// Contract for a throttling strategy. In this file it is implemented by
/// TokenBucket and consumed by the flow controllers.
/// </summary>
public interface IThrottleStrategy
{
/// <summary>
/// Requests <paramref name="n"/> units (one by default) without reporting a wait time.
/// TokenBucket returns true when the request should be throttled.
/// </summary>
bool ShouldThrottle(long n = 1);
/// <summary>
/// Requests <paramref name="n"/> units and reports a wait time through
/// <paramref name="waitTime"/>. TokenBucket sets it to TimeSpan.Zero when the
/// request is admitted, otherwise to the time left until its next refill.
/// </summary>
bool ShouldThrottle(long n, out TimeSpan waitTime);
/// <summary>
/// Overload without a count; TokenBucket forwards it with a count of 1.
/// </summary>
bool ShouldThrottle(out TimeSpan waitTime);
/// <summary>
/// Number of units currently available. TokenBucket applies any due refill
/// before returning the value.
/// </summary>
long CurrentTokenCount { get; }
}
策略配置类 FlowControlStrategy 定义了流控维度、阈值等信息:
/// <summary>
/// Configuration for one flow-control rule: which kind of controller to use,
/// its threshold, and what to do when the limit is hit.
/// </summary>
public class FlowControlStrategy
{
/// <summary>Identifier of the strategy; FlowControllerFactory uses it as the cache key for controllers.</summary>
public string ID { get; set; }
/// <summary>Name of the strategy.</summary>
public string Name { get; set; }
/// <summary>Kind of strategy; FlowControllerFactory picks the controller class from this value.</summary>
public FlowControlStrategyType StrategyType { get; set; }
/// <summary>Integer threshold; the TPS and Sum controllers pass it as the token bucket capacity.</summary>
public long IntThreshold { get; set; }
/// <summary>Decimal threshold. NOTE(review): not read by any code shown here; presumably used by the Delay controller — confirm.</summary>
public decimal DoubleThreshold { get; set; }
/// <summary>Time window of the rule; SumFlowController maps it to the bucket refill interval in seconds.</summary>
public FlowControlTimespan TimeSpan { get; set; }
/// <summary>Additional key/value settings, initialised to an empty dictionary. NOTE(review): not read by any code shown here.</summary>
public Dictionary<string, string> FlowControlConfigs { get; set; } = new Dictionary<string, string>();
/// <summary>Free-text description of the strategy.</summary>
public string Descriptions { get; set; }
/// <summary>
/// When true, FlowControlService throws as soon as the request is throttled;
/// when false, it waits instead.
/// </summary>
public bool IsRefusedRequest { get; set; }
/// <summary>Creation timestamp.</summary>
public DateTime CreateTime { get; set; }
/// <summary>Creator of the strategy.</summary>
public string Creator { get; set; }
/// <summary>Timestamp of the last modification.</summary>
public DateTime LastModifyTime { get; set; }
/// <summary>Last modifier of the strategy.</summary>
public string LastModifier { get; set; }
}
策略类型枚举 FlowControlStrategyType 支持 TPS、Sum(限定时间内总次数)和 Delay:
/// <summary>
/// Kinds of flow-control strategy; FlowControllerFactory maps each value to a
/// controller class.
/// </summary>
public enum FlowControlStrategyType
{
    /// <summary>Handled by TPSFlowController.</summary>
    TPS = 0,

    /// <summary>Handled by SumFlowController (total count within a time window).</summary>
    Sum = 1,

    /// <summary>Handled by DelayFlowController.</summary>
    Delay = 2
}
每种策略对应一个流控器。例如 TPS 控制器使用固定令牌桶:
/// <summary>
/// Flow controller for the TPS strategy: a fixed token bucket sized by the
/// strategy's integer threshold and refilled every 1 * 1000 milliseconds.
/// </summary>
class TPSFlowController : IFlowController
{
    /// <summary>
    /// Stores the strategy and builds the underlying bucket from
    /// <c>strategy.IntThreshold</c>.
    /// </summary>
    public TPSFlowController(FlowControlStrategy strategy)
    {
        FlowControlStrategy = strategy;
        InnerThrottleStrategy = new FixedTokenBucket(strategy.IntThreshold, 1, 1000);
    }

    /// <summary>The bucket that makes the throttling decision.</summary>
    public IThrottleStrategy InnerThrottleStrategy { get; private set; }

    /// <summary>The strategy this controller was created for.</summary>
    public FlowControlStrategy FlowControlStrategy { get; private set; }

    /// <summary>Delegates the decision to <see cref="InnerThrottleStrategy"/>.</summary>
    public bool ShouldThrottle(long n, out TimeSpan waitTime)
    {
        return InnerThrottleStrategy.ShouldThrottle(n, out waitTime);
    }
}
Sum 控制器支持按秒、分、时、日重置桶:
/// <summary>
/// Flow controller for the Sum strategy: a fixed token bucket sized by the
/// strategy's integer threshold and refilled once per configured time window.
/// </summary>
class SumFlowController : IFlowController
{
    /// <summary>
    /// Stores the strategy and builds the underlying bucket, using the window
    /// length in seconds as the interval count and 1000 ms as the interval unit.
    /// </summary>
    public SumFlowController(FlowControlStrategy strategy)
    {
        FlowControlStrategy = strategy;
        long windowSeconds = GetRefillInterval(strategy);
        InnerThrottleStrategy = new FixedTokenBucket(strategy.IntThreshold, windowSeconds, 1000);
    }

    /// <summary>The bucket that makes the throttling decision.</summary>
    public IThrottleStrategy InnerThrottleStrategy { get; private set; }

    /// <summary>The strategy this controller was created for.</summary>
    public FlowControlStrategy FlowControlStrategy { get; private set; }

    /// <summary>Delegates the decision to <see cref="InnerThrottleStrategy"/>.</summary>
    public bool ShouldThrottle(long n, out TimeSpan waitTime)
    {
        return InnerThrottleStrategy.ShouldThrottle(n, out waitTime);
    }

    /// <summary>
    /// Maps the strategy's time window to a number of seconds.
    /// </summary>
    private long GetRefillInterval(FlowControlStrategy strategy)
    {
        switch (strategy.TimeSpan)
        {
            case FlowControlTimespan.Second:
                return 1;
            case FlowControlTimespan.Minute:
                return 60;
            case FlowControlTimespan.Hour:
                return 3600;
            case FlowControlTimespan.Day:
                return 86400;
            default:
                // NOTE(review): 0 yields a zero-length refill period, so the
                // FixedTokenBucket refills on every call and the limit is
                // effectively never enforced — confirm this is intended.
                return 0;
        }
    }
}
通过工厂类 FlowControllerFactory 创建并缓存流控器:
/// <summary>
/// Singleton factory that creates one flow controller per strategy ID and
/// caches it for later calls.
/// </summary>
class FlowControllerFactory
{
    // ConcurrentDictionary rather than Dictionary: lookups run outside the
    // creation lock, and a plain Dictionary does not support reads that race
    // with a write.
    private static readonly System.Collections.Concurrent.ConcurrentDictionary<string, IFlowController> _controllers =
        new System.Collections.Concurrent.ConcurrentDictionary<string, IFlowController>();

    // Serialises controller creation so each strategy ID gets exactly one controller.
    private static readonly object _lock = new object();

    private static readonly FlowControllerFactory _instance = new FlowControllerFactory();

    private FlowControllerFactory() { }

    /// <summary>The single shared factory instance.</summary>
    public static FlowControllerFactory Instance => _instance;

    /// <summary>
    /// Returns the cached controller for <c>strategy.ID</c>, creating and
    /// caching it on first use.
    /// </summary>
    /// <param name="strategy">Strategy to get a controller for.</param>
    /// <returns>
    /// The controller, or <c>null</c> when the strategy type has no controller
    /// (nothing is cached in that case).
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="strategy"/> is null.</exception>
    public IFlowController GetOrCreate(FlowControlStrategy strategy)
    {
        if (strategy == null) throw new ArgumentNullException(nameof(strategy));

        // Fast path: no lock once the controller exists.
        if (_controllers.TryGetValue(strategy.ID, out var existing))
        {
            return existing;
        }

        lock (_lock)
        {
            // Another thread may have created it while this one waited for the lock.
            if (_controllers.TryGetValue(strategy.ID, out existing))
            {
                return existing;
            }

            var controller = Create(strategy);
            if (controller != null)
            {
                _controllers[strategy.ID] = controller;
            }

            return controller;
        }
    }

    /// <summary>
    /// Instantiates the controller class that matches the strategy type;
    /// returns <c>null</c> for a type with no matching controller.
    /// </summary>
    private IFlowController Create(FlowControlStrategy strategy)
    {
        return strategy.StrategyType switch
        {
            FlowControlStrategyType.TPS => new TPSFlowController(strategy),
            FlowControlStrategyType.Delay => new DelayFlowController(strategy),
            FlowControlStrategyType.Sum => new SumFlowController(strategy),
            _ => null
        };
    }
}
最终通过 FlowControlService 对外提供统一入口,内部决定等待或拒绝:
/// <summary>
/// Public entry point of the flow-control service. Applies a strategy to the
/// calling thread by either blocking it until capacity is available or
/// rejecting the request with an exception.
/// </summary>
public class FlowControlService
{
    /// <summary>
    /// Applies <paramref name="strategy"/> to a request of <paramref name="count"/> units.
    /// Returns normally once the request is admitted.
    /// </summary>
    /// <param name="strategy">Strategy to enforce.</param>
    /// <param name="count">Number of units requested; defaults to 1.</param>
    /// <exception cref="NotSupportedException">No controller exists for the strategy's type.</exception>
    /// <exception cref="Exception">The request was throttled and the strategy is configured to refuse requests.</exception>
    public static void FlowControl(FlowControlStrategy strategy, int count = 1)
    {
        var controller = FlowControllerFactory.Instance.GetOrCreate(strategy);
        if (controller == null)
        {
            // The factory returns null for strategy types it cannot map; fail
            // with a clear message instead of a NullReferenceException below.
            throw new NotSupportedException(
                $"No flow controller is available for strategy type {strategy.StrategyType}.");
        }

        TimeSpan wait;
        if (!controller.ShouldThrottle(count, out wait))
        {
            return;
        }

        if (strategy.IsRefusedRequest)
        {
            throw new Exception("触发流控!");
        }

        // Always wait when throttled, even if the reported wait is zero: the
        // throttled call consumed nothing, so returning here would admit the
        // request without it being counted.
        WaitUntilAvailable(strategy, controller, wait, count);
    }

    /// <summary>
    /// Blocks the calling thread until the request may proceed. For the Delay
    /// strategy it sleeps once for <paramref name="wait"/>; for other
    /// strategies it re-checks the controller and sleeps for the reported time
    /// until the controller admits the request.
    /// </summary>
    private static void WaitUntilAvailable(FlowControlStrategy strategy, IFlowController controller, TimeSpan wait, int count)
    {
        var delay = wait;
        if (strategy.StrategyType == FlowControlStrategyType.Delay)
        {
            Thread.Sleep(delay);
            return;
        }

        // NOTE(review): if count exceeds what the controller can ever grant at
        // once, this loop does not terminate — confirm callers bound count.
        while (controller.ShouldThrottle(count, out delay))
        {
            Thread.Sleep(delay);
        }
    }
}
四、使用示例
通过 NuGet 安装包:
Install-Package CZ.FlowControl.Service -Version 1.0.0
之后即可在代码中定义策略并调用流控服务。详细代码及更多示例请参见 GitHub 仓库。
欢迎使用和贡献改进。