C# 复合取消令牌源的原理与实践
C# 复合取消令牌源的原理与实践
在 C# 异步编程中,CancellationTokenSource.CreateLinkedTokenSource 方法提供了一种强大的机制,允许开发者将多个取消令牌(CancellationToken)有效地组合成一个单一的取消信号。本文将深入探讨这一"复合取消令牌源"的功能、工作原理及其在各种复杂场景下的应用。
一个复合取消令牌源,顾名思义,能够将多个独立的取消令牌聚合起来。其核心特性是,只要这些被链接的令牌中任意一个发出取消请求,这个复合令牌源也会被取消。这在需要同时响应多种取消条件(例如用户操作、系统超时或资源压力)的异步操作中尤为实用。
基本概念
复合令牌源是什么?
复合取消令牌源通过 CancellationTokenSource.CreateLinkedTokenSource 静态方法创建。以下是一个基本示例:
// 声明两个独立的任务取消信号源
var sourceOne = new CancellationTokenSource();
var sourceTwo = new CancellationTokenSource();
// 创建一个复合取消令牌源。当 sourceOne 或 sourceTwo 触发取消时,compositeSource 也会取消。
var compositeSource = CancellationTokenSource.CreateLinkedTokenSource(
sourceOne.Token,
sourceTwo.Token
);
// 获取由多个信号源组合而成的最终取消令牌
CancellationToken unifiedToken = compositeSource.Token;
主要特点
- "逻辑或"关系:复合令牌会在其任一上游源令牌发出取消请求时被激活。
- 生命周期管理:复合令牌源实现了
IDisposable接口,因此需要显式地进行资源释放。推荐使用using语句来确保其正确释放。 - 非级联取消:取消复合令牌源本身并不会反向影响或取消任何其原始的独立令牌源。
- 状态实时同步:复合令牌的状态会即时反映其所链接的源令牌的取消状态。
运作机制
内部工作流程
当您创建复合取消令牌源时,其内部实现通常会执行以下步骤:
graph LR
A[Cancellation Source 1] --> C[Composite Source]
B[Cancellation Source 2] --> C
C --> D[Target Operation]
- 复合令牌源会为其所链接的每一个源令牌注册一个回调函数。
- 一旦任何一个源令牌被取消,其对应的回调函数会被触发。
- 在回调函数中,复合令牌源会调用自身的
Cancel()方法来发出取消信号。
状态传播时序
sequenceDiagram
participant SourceA
participant SourceB
participant CompositeCTS
participant Operation
SourceA->>CompositeCTS: Trigger Cancel()
CompositeCTS->>Operation: Propagate Cancellation Request
Operation->>Operation: Execute Cancellation Logic
SourceB->>CompositeCTS: Trigger Cancel() (No further effect if already cancelled)
典型应用场景
组合超时与用户取消
在一个网络请求中,我们可能需要同时响应用户的主动取消和预设的系统超时限制。
public async Task<HttpResponseMessage> FetchDataWithExpirationAsync(string targetUrl, CancellationToken clientRequestToken)
{
// 创建一个5秒的超时取消信号源
using var expirationCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
// 将用户请求令牌与超时令牌组合
using var combinedCts = CancellationTokenSource.CreateLinkedTokenSource(
clientRequestToken,
expirationCts.Token
);
try
{
using var httpClient = new HttpClient();
return await httpClient.GetAsync(targetUrl, combinedCts.Token);
}
catch (OperationCanceledException ex)
{
// 根据哪个令牌被取消来判断具体原因
if (expirationCts.IsCancellationRequested)
throw new TimeoutException("数据获取操作超时。", ex);
throw; // 可能是用户取消或其他原因
}
}
构建分层取消机制
在复杂的应用中,可以设计一个分层的取消系统,例如一个全局的取消信号可以级联到所有子任务,同时每个子任务也可以有其特定的取消条件。
public class HierarchicalTaskProcessor : IDisposable
{
private readonly CancellationTokenSource _masterCancellationTokenSource = new();
private readonly List<CancellationTokenSource> _subTaskCancellationSources = new();
public CancellationToken GenerateSubTaskToken(params CancellationToken[] additionalConditions)
{
// 组合全局令牌和任务特有令牌
var allTokens = new List<CancellationToken> { _masterCancellationTokenSource.Token };
allTokens.AddRange(additionalConditions);
var subTaskCts = CancellationTokenSource.CreateLinkedTokenSource(allTokens.ToArray());
_subTaskCancellationSources.Add(subTaskCts);
return subTaskCts.Token;
}
// 允许根据特定令牌取消对应层级的任务
public void TriggerSubTaskCancellation(CancellationToken tokenToCancel)
{
var matchedCts = _subTaskCancellationSources.FirstOrDefault(c => c.Token == tokenToCancel);
matchedCts?.Cancel();
}
// 释放所有资源
public void Dispose()
{
foreach (var cts in _subTaskCancellationSources) cts.Dispose();
_masterCancellationTokenSource.Dispose();
// 清理列表以防止重复释放或引用无效对象
_subTaskCancellationSources.Clear();
}
}
注册资源清理回调
当一个操作被取消时,可能需要清理一些临时资源。复合令牌可以确保无论何种原因导致操作取消,清理逻辑都能被执行。
public void HandleTemporaryFile(string sourcePath, CancellationToken processingToken)
{
string tempFilePath = Path.GetTempFileName(); // 假设创建一个临时文件
// 注册当令牌取消时执行的清理操作
processingToken.Register(() => {
if (File.Exists(tempFilePath))
{
File.Delete(tempFilePath); // 取消时删除临时文件
Console.WriteLine($"处理取消: 已删除临时文件 {tempFilePath}");
}
});
// ... 实际文件处理逻辑 ...
Console.WriteLine($"开始处理文件: {sourcePath},临时文件: {tempFilePath}");
// 模拟耗时操作
Thread.Sleep(2000);
processingToken.ThrowIfCancellationRequested();
Console.WriteLine($"文件处理完成: {sourcePath}");
// 如果处理完成,则可以主动注销回调或在 disposing 时清理
}
// 使用示例:
var primaryCts = new CancellationTokenSource();
// 假设 CreateMemoryThresholdToken() 返回一个在内存压力过大时触发的令牌
var operationCts = CancellationTokenSource.CreateLinkedTokenSource(
primaryCts.Token,
CreateMemoryThresholdToken()
);
// 启动文件处理
HandleTemporaryFile("sample.txt", operationCts.Token);
// 模拟主任务取消
// primaryCts.Cancel();
// 模拟内存压力过大,如果 CreateMemoryThresholdToken() 实现了该逻辑
// ...
operationCts.Dispose(); // 确保复合令牌源被释放
// 辅助方法,模拟一个基于内存阈值的取消令牌
private static CancellationToken CreateMemoryThresholdToken()
{
// 实际场景中,这里会有一个后台任务监控内存并触发取消
var cts = new CancellationTokenSource();
// 示例中我们不真正监控,只是返回一个令牌
// cts.CancelAfter(5000); // 假设5秒后因内存问题取消
return cts.Token;
}
高级技巧与用法
动态添加取消条件
在某些运行时场景中,我们可能需要动态地向一个已有的复合令牌源添加新的取消条件。
public class DynamicCancellationManager : IDisposable
{
private CancellationTokenSource _initialCts = new();
private CancellationTokenSource _currentLinkedCts;
public DynamicCancellationManager()
{
// 初始时只链接一个基本令牌
_currentLinkedCts = CancellationTokenSource.CreateLinkedTokenSource(_initialCts.Token);
}
public CancellationToken GetCurrentCombinedToken() => _currentLinkedCts.Token;
// 运行时添加新的取消条件
public void AddDynamicCancellationCondition(CancellationToken additionalConditionToken)
{
// 创建一个新的复合源,包含现有令牌和新令牌
var updatedLinkedCts = CancellationTokenSource.CreateLinkedTokenSource(
_currentLinkedCts.Token,
additionalConditionToken
);
// 替换并释放旧的复合源,以防止资源泄漏
_currentLinkedCts.Dispose();
_currentLinkedCts = updatedLinkedCts;
Console.WriteLine("动态添加了新的取消条件。");
}
public void TriggerInitialCancellation()
{
_initialCts.Cancel();
Console.WriteLine("初始令牌源已取消。");
}
public void Dispose()
{
_currentLinkedCts.Dispose();
_initialCts.Dispose();
}
}
诊断取消的来源
当一个操作被取消时,了解是哪个具体的源令牌导致取消对于调试和错误处理至关重要。
public async Task AnalyzeCancellationOrigin(CancellationToken sourceTokenA, CancellationToken sourceTokenB)
{
using var compositeTokenSource = CancellationTokenSource.CreateLinkedTokenSource(sourceTokenA, sourceTokenB);
try
{
await PerformComplexComputationAsync(compositeTokenSource.Token);
}
catch (OperationCanceledException)
{
// 检查原始令牌的状态以诊断取消原因
if (sourceTokenA.IsCancellationRequested && !sourceTokenB.IsCancellationRequested)
Console.WriteLine("取消来源: 主要由 sourceTokenA 触发");
else if (!sourceTokenA.IsCancellationRequested && sourceTokenB.IsCancellationRequested)
Console.WriteLine("取消来源: 主要由 sourceTokenB 触发");
else if (sourceTokenA.IsCancellationRequested && sourceTokenB.IsCancellationRequested)
Console.WriteLine("取消来源: 多个源(sourceTokenA 和 sourceTokenB)同时触发");
else
Console.WriteLine("取消来源: 未知(可能是在复合令牌源创建前已取消的令牌)");
}
}
// 模拟一个耗时操作
private async Task PerformComplexComputationAsync(CancellationToken token)
{
Console.WriteLine("复杂计算开始...");
await Task.Delay(5000, token); // 模拟5秒操作,可被取消
Console.WriteLine("复杂计算完成。");
}
与并行操作的集成
复合令牌源可以很好地与 TPL (Task Parallel Library) 中的并行操作结合,以响应多种取消条件。
public class WorkItem { public int Id { get; set; } } // 示例数据项
public void ProcessCollectionInParallel(IEnumerable<WorkItem> items, CancellationToken overallCancellation)
{
// 创建一个模拟的内存压力监控令牌
// ResourceConstraintCancellationTokenSource 是一个假设的自定义类,用于在达到内存阈值时触发取消
using var memoryMonitorCts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); // 示例:10秒后模拟内存不足
// 组合外部提供的取消令牌和内部的内存监控令牌
using var parallelExecutionCts = CancellationTokenSource.CreateLinkedTokenSource(
overallCancellation,
memoryMonitorCts.Token
);
var options = new ParallelOptions {
CancellationToken = parallelExecutionCts.Token,
MaxDegreeOfParallelism = Environment.ProcessorCount // 根据处理器核心数设置并行度
};
try
{
Parallel.ForEach(items, options, item => {
// 在这里处理每个数据项
Console.WriteLine($"处理 WorkItem {item.Id}...");
Thread.Sleep(new Random().Next(100, 500)); // 模拟工作
options.CancellationToken.ThrowIfCancellationRequested(); // 在循环内部检查取消
});
Console.WriteLine("所有 WorkItems 已并行处理完成。");
}
catch (OperationCanceledException)
{
Console.WriteLine("并行处理操作被取消。");
}
catch (Exception ex)
{
Console.WriteLine($"并行处理发生错误: {ex.Message}");
}
}
资源管理与最佳实践
正确的释放模式
由于 CancellationTokenSource 实现了 IDisposable,务必正确释放它所持有的资源,特别是在创建复合令牌源时。
// 正确的做法:使用 using 语句确保资源被释放
using (var ctsAlpha = new CancellationTokenSource())
using (var ctsBeta = new CancellationTokenSource())
using (var compositeCts = CancellationTokenSource.CreateLinkedTokenSource(ctsAlpha.Token, ctsBeta.Token))
{
// 使用 compositeCts.Token 进行操作...
Console.WriteLine("正确地管理了 CancellationTokenSource 资源。");
}
// 错误的做法:忘记释放复合令牌源可能导致内存泄漏
var ctsX = new CancellationTokenSource();
var ctsY = new CancellationTokenSource();
var unmanagedCompositeCts = CancellationTokenSource.CreateLinkedTokenSource(ctsX.Token, ctsY.Token);
// 如果没有调用 unmanagedCompositeCts.Dispose(),相关的回调注册可能不会被清理,导致潜在的内存泄漏。
Console.WriteLine("错误地管理了 CancellationTokenSource 资源(未调用 Dispose)。");
ctsX.Dispose();
ctsY.Dispose();
性能优化建议
在创建复合令牌源时,避免不必要的嵌套可以提高效率。
// 不推荐:多层嵌套链接
var source1 = new CancellationTokenSource();
var source2 = new CancellationTokenSource();
using var linkedIntermediate = CancellationTokenSource.CreateLinkedTokenSource(source1.Token, source2.Token);
using var finalLinked = CancellationTokenSource.CreateLinkedTokenSource(linkedIntermediate.Token, anotherExternalToken);
// 推荐:扁平化令牌组合,一次性链接所有源
using var simplifiedLinked = CancellationTokenSource.CreateLinkedTokenSource(
source1.Token,
source2.Token,
anotherExternalToken
);
生命周期管理策略
在组件或服务中管理取消令牌的生命周期,确保在组件销毁时所有相关的取消源都被正确处理。
public class SystemTaskCoordinator : IDisposable
{
private readonly List<CancellationTokenSource> _managedCancellationSources = new();
private readonly CancellationTokenSource _systemWideCts = new();
public CancellationToken RegisterManagedTask(CancellationToken specificTaskToken)
{
var newCts = CancellationTokenSource.CreateLinkedTokenSource(
_systemWideCts.Token,
specificTaskToken
);
_managedCancellationSources.Add(newCts);
return newCts.Token;
}
public void Dispose()
{
// 先触发全局取消,通知所有相关任务停止
_systemWideCts.Cancel();
// 然后释放所有注册的复合取消源和全局源
foreach (var cts in _managedCancellationSources)
{
cts.Dispose();
}
_managedCancellationSources.Clear(); // 清理列表
_systemWideCts.Dispose();
Console.WriteLine("SystemTaskCoordinator 及其所有相关取消源已释放。");
}
}
常见问题与解决方案
复合令牌源未被取消?
可能的原因包括:
- 链接的任一源令牌从未被实际取消。
- 在创建复合令牌源时,某个源令牌已经处于取消状态,但你期望它在未来被取消(创建时状态被固定)。
- 回调注册可能因某些异常而失败。
诊断方法:
// 检查所有原始源令牌的取消状态
System.Diagnostics.Debug.Assert(mySource1.IsCancellationRequested || mySource2.IsCancellationRequested,
"任一源令牌应该已发出取消请求。");
// 检查复合令牌源自身的取消状态
System.Diagnostics.Debug.Assert(myCompositeCts.IsCancellationRequested,
"复合令牌源应该已处于取消状态。");
内存泄漏问题
症状:取消操作完成后,应用程序的内存占用并未如预期般下降。
解决方案:
public void PerformCancellationAndResourceRelease()
{
// 1. 首先触发全局取消信号
_masterCts.Cancel();
// 2. 迭代并释放所有主动链接的令牌源
foreach (var cts in _activeLinkedSources)
{
cts.Dispose(); // 关键步骤:释放内部资源,注销回调
}
_activeLinkedSources.Clear(); // 清空列表,解除引用
_masterCts.Dispose(); // 释放主取消源
Console.WriteLine("所有取消源和相关资源均已清理。");
}
取消响应延迟
为了提高取消的响应速度,尤其是在长循环或计算密集型任务中,可以增加检查点。
// 在耗时循环中混合使用两种检查方式
for (int i = 0; i < itemsToProcess.Length; i++)
{
// 周期性快速检查(例如,每100次迭代)
if (i % 100 == 0) cancellationCheckToken.ThrowIfCancellationRequested();
// ... 执行当前数据项的耗时处理 ...
// 在关键操作之前或之后添加额外的检查点
if (IsEnteringCriticalSection(i))
cancellationCheckToken.ThrowIfCancellationRequested();
}
实战案例:Web 服务请求处理
在一个处理客户端 Web 请求的服务中,我们可能需要组合客户端的断开连接信号、服务内部的全局超时和数据库查询的特定超时。
public enum ServiceResponseStatus { Success, ClientCanceled, ServiceTimeout, DatabaseTimeout, UnknownCancel }
public class DatabaseAccess
{
public async Task<string> ExecuteQueryAsync(string query, CancellationToken token)
{
Console.WriteLine($"数据库查询开始: {query}");
await Task.Delay(new Random().Next(2000, 8000), token); // 模拟耗时查询
token.ThrowIfCancellationRequested();
Console.WriteLine($"数据库查询完成: {query}");
return $"Result for {query}";
}
}
public class RequestProcessor
{
private readonly DatabaseAccess _dataRepository = new();
public async Task<ServiceResponseStatus> HandleServiceRequestAsync(string requestPayload, CancellationToken consumerToken)
{
// 创建服务级别全局超时(30秒)
using var serviceTimeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
// 创建数据库查询的独立超时(10秒)
using var dataAccessTimeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
// 组合核心令牌:客户端取消 + 服务全局超时
using var overallRequestCts = CancellationTokenSource.CreateLinkedTokenSource(
consumerToken,
serviceTimeoutCts.Token
);
try
{
// 步骤1:验证请求(快速操作,不需频繁检查取消)
Console.WriteLine("验证请求...");
// SimulateValidation(requestPayload);
// 步骤2:数据库查询(引入数据库特定超时,并与整体请求令牌组合)
using var dbQueryCts = CancellationTokenSource.CreateLinkedTokenSource(
overallRequestCts.Token,
dataAccessTimeoutCts.Token
);
var dbQueryResult = await _dataRepository.ExecuteQueryAsync(requestPayload, dbQueryCts.Token);
// 步骤3:CPU密集型处理(使用并行选项,并定期检查整体取消令牌)
var processedData = await Task.Run(() =>
{
var parallelOptions = new ParallelOptions {
CancellationToken = overallRequestCts.Token,
MaxDegreeOfParallelism = Environment.ProcessorCount
};
Console.WriteLine("开始并行处理数据...");
return ProcessDataConcurrently(dbQueryResult, parallelOptions);
}, overallRequestCts.Token); // Task.Run 本身也接受取消令牌
Console.WriteLine($"请求处理成功,结果: {processedData}");
return ServiceResponseStatus.Success;
}
catch (OperationCanceledException)
{
// 精确诊断取消的原因
if (consumerToken.IsCancellationRequested)
return ServiceResponseStatus.ClientCanceled;
if (serviceTimeoutCts.IsCancellationRequested)
return ServiceResponseStatus.ServiceTimeout;
if (dataAccessTimeoutCts.IsCancellationRequested)
return ServiceResponseStatus.DatabaseTimeout;
return ServiceResponseStatus.UnknownCancel;
}
catch (Exception ex)
{
Console.WriteLine($"处理请求时发生未知错误: {ex.Message}");
return ServiceResponseStatus.UnknownCancel; // 或其他适当的错误码
}
}
private string ProcessDataConcurrently(string inputData, ParallelOptions options)
{
// 模拟并行处理
var processed = new System.Text.StringBuilder();
for (int i = 0; i < 5; i++)
{
options.CancellationToken.ThrowIfCancellationRequested();
Console.WriteLine($" 并行子任务 {i} 处理中...");
Thread.Sleep(new Random().Next(1000, 3000));
processed.Append($"[ProcessedChunk{i}]");
}
return processed.ToString();
}
}
// 示例调用
// var processor = new RequestProcessor();
// var clientRequestCts = new CancellationTokenSource();
// var result = await processor.HandleServiceRequestAsync("Sample Query", clientRequestCts.Token);
// Console.WriteLine($"最终服务响应状态: {result}");
// clientRequestCts.Dispose();
性能考量
创建开销
- 每个
CancellationTokenSource对象的创建开销约为 100 纳秒。 - 每个令牌的注册(即
Register方法)开销约为 50 纳秒。 - 取消信号的传播开销(每链接一个令牌)约为 20 纳秒。
优化建议
在条件不变的情况下,应尽可能重用已有的复合令牌源,避免频繁创建和销毁。
public class ReusableTokenManager : IDisposable
{
private readonly CancellationToken _baseActivityToken;
private CancellationTokenSource _reusableCompositeCts;
public ReusableTokenManager(CancellationToken baseToken)
{
_baseActivityToken = baseToken;
}
public CancellationToken GetCombinedToken(CancellationToken dynamicConditionToken)
{
// 如果缓存的复合令牌源有效且未被取消,则直接返回其令牌
if (_reusableCompositeCts != null && !_reusableCompositeCts.IsCancellationRequested)
{
return _reusableCompositeCts.Token;
}
// 否则,释放旧的(如果存在且已取消或无效),然后创建新的
_reusableCompositeCts?.Dispose();
_reusableCompositeCts = CancellationTokenSource.CreateLinkedTokenSource(
_baseActivityToken,
dynamicConditionToken
);
Console.WriteLine("创建新的复合令牌源或重用了现有令牌源。");
return _reusableCompositeCts.Token;
}
public void Dispose()
{
_reusableCompositeCts?.Dispose();
}
}
内存占用
- 每个
CancellationTokenSource对象大约占用 40 字节。 - 每个链接操作会额外增加约 24 字节。
- 每个回调注册会增加约 32 字节的开销。
提示:在需要频繁创建取消令牌源的场景(例如,短生命周期的并行任务),可以考虑使用对象池技术来减少内存分配和垃圾回收的压力。