C# 异步流 IAsyncEnumerable 完全指南
IAsyncEnumerable<T> 的本质
这是 C# 8.0 引入的异步迭代模式,核心在于延迟拉取——消费者请求时才异步生成下一个元素,而非预先构建完整集合。
与传统模式的差异
| 模式 | 内存占用 | 启动延迟 | 适用场景 |
|---|---|---|---|
| List<T> | 全量驻留 | 等待全部就绪 | 小数据集 |
| Task<List<T>> | 全量驻留 | 异步等待全部 | 中等数据,可接受等待 |
| IAsyncEnumerable<T> | 单元素缓冲 | 首元素就绪即处理 | 大数据流、实时流 |
核心语法结构
// 生产者:异步生成器
async IAsyncEnumerable<TResult> StreamDataAsync()
{
while (await HasMoreAsync())
{
await Task.Delay(50); // 模拟 IO
yield return await FetchNextAsync(); // 惰式产出
}
}
// 消费者:异步迭代
await foreach (var element in StreamDataAsync())
{
Process(element); // 立即处理,无需等待后续元素
}
实战场景:日志流处理
async IAsyncEnumerable<LogEntry> TailLogStreamAsync(
string logPath,
[EnumeratorCancellation] CancellationToken ct = default)
{
await using var fs = new FileStream(
logPath,
FileMode.Open,
FileAccess.Read,
FileShare.ReadWrite | FileShare.Delete,
bufferSize: 4096,
useAsync: true);
using var sr = new StreamReader(fs, Encoding.UTF8);
while (!ct.IsCancellationRequested)
{
var line = await sr.ReadLineAsync(ct);
if (line is null)
{
await Task.Delay(100, ct); // 等待新内容写入
continue;
}
yield return LogEntry.Parse(line);
}
}
分页 API 的流式封装
public record DeviceReading(
Guid SensorId,
double Temperature,
DateTimeOffset CapturedAt);
async IAsyncEnumerable<DeviceReading> PollSensorDataAsync(
string endpoint,
[EnumeratorCancellation] CancellationToken ct = default)
{
var http = new HttpClient();
int cursor = 0;
const int pageSize = 50;
while (true)
{
ct.ThrowIfCancellationRequested();
var page = await http.GetFromJsonAsync<List<DeviceReading>>(
$"{endpoint}?from={cursor}&limit={pageSize}",
ct);
if (page?.Count is null or 0)
break;
foreach (var reading in page)
{
yield return reading;
}
cursor += page.Count;
}
}
取消传播的正确方式
[EnumeratorCancellation] 特性确保 await foreach 传入的令牌能穿透到生成器内部:
// 定义时标注
async IAsyncEnumerable<int> GenerateWithCancelAsync(
[EnumeratorCancellation] CancellationToken outerToken = default)
{
for (int i = 0; i < 1000; i++)
{
// 无需手动传递,outerToken 自动接收 await foreach 的令牌
await Task.Delay(100, outerToken);
yield return i;
}
}
// 消费时传入
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
await foreach (var num in GenerateWithCancelAsync(cts.Token))
{
Console.WriteLine(num);
}
若省略特性,则需显式调用 WithCancellation:
// 不推荐:语法冗余
await foreach (var num in GenerateWithCancelAsync().WithCancellation(cts.Token))
{
// ...
}
自定义类型示例:信号采样
public readonly struct SignalSample
{
public long SequenceNumber { get; init; }
public float Amplitude { get; init; }
public TimeSpan Offset { get; init; }
}
async IAsyncEnumerable<SignalSample> AcquireSignalAsync(
int channel,
int totalSamples,
[EnumeratorCancellation] CancellationToken ct = default)
{
var start = TimeSpan.Zero;
for (long seq = 0; seq < totalSamples; seq++)
{
ct.ThrowIfCancellationRequested();
// 模拟硬件采集延迟
await Task.Delay(16, ct); // ~60Hz 采样率
yield return new SignalSample
{
SequenceNumber = seq,
Amplitude = MathF.Sin(seq * 0.1f) * channel,
Offset = start + TimeSpan.FromMilliseconds(seq * 16)
};
}
}
并行消费模式
如需并行处理流元素,可借助 Parallel.ForEachAsync:
await Parallel.ForEachAsync(
AcquireSignalAsync(channel: 1, totalSamples: 1000, cts.Token),
new ParallelOptions
{
MaxDegreeOfParallelism = 4,
CancellationToken = cts.Token
},
async (sample, ct) =>
{
await ProcessSampleAsync(sample, ct);
});
关键配置速查
async IAsyncEnumerable<T> | 声明异步生成方法 |
yield return | 惰式产出单个元素 |
await foreach | 异步迭代消费 |
[EnumeratorCancellation] | 令牌自动穿透 |
ConfigureAwait(false) | 库代码中避免上下文捕获 |