当前位置:首页 > 技术 > 正文内容

C# 异步流 IAsyncEnumerable 完全指南

访客 技术 2026年7月1日 1

IAsyncEnumerable<T> 的本质

这是 C# 8.0 引入的异步迭代模式,核心在于延迟拉取——消费者请求时才异步生成下一个元素,而非预先构建完整集合。

与传统模式的差异

模式内存占用启动延迟适用场景
List<T>全量驻留等待全部就绪小数据集
Task<List<T>>全量驻留异步等待全部中等数据,可接受等待
IAsyncEnumerable<T>单元素缓冲首元素就绪即处理大数据流、实时流

核心语法结构

// 生产者:异步生成器
async IAsyncEnumerable<TResult> StreamDataAsync()
{
    while (await HasMoreAsync())
    {
        await Task.Delay(50); // 模拟 IO
        yield return await FetchNextAsync(); // 惰式产出
    }
}

// 消费者:异步迭代
await foreach (var element in StreamDataAsync())
{
    Process(element); // 立即处理,无需等待后续元素
}

实战场景:日志流处理

async IAsyncEnumerable<LogEntry> TailLogStreamAsync(
    string logPath,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    await using var fs = new FileStream(
        logPath, 
        FileMode.Open, 
        FileAccess.Read, 
        FileShare.ReadWrite | FileShare.Delete,
        bufferSize: 4096,
        useAsync: true);

    using var sr = new StreamReader(fs, Encoding.UTF8);
    
    while (!ct.IsCancellationRequested)
    {
        var line = await sr.ReadLineAsync(ct);
        
        if (line is null)
        {
            await Task.Delay(100, ct); // 等待新内容写入
            continue;
        }

        yield return LogEntry.Parse(line);
    }
}

分页 API 的流式封装

public record DeviceReading(
    Guid SensorId, 
    double Temperature, 
    DateTimeOffset CapturedAt);

async IAsyncEnumerable<DeviceReading> PollSensorDataAsync(
    string endpoint,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    var http = new HttpClient();
    int cursor = 0;
    const int pageSize = 50;

    while (true)
    {
        ct.ThrowIfCancellationRequested();
        
        var page = await http.GetFromJsonAsync<List<DeviceReading>>(
            $"{endpoint}?from={cursor}&limit={pageSize}", 
            ct);

        if (page?.Count is null or 0)
            break;

        foreach (var reading in page)
        {
            yield return reading;
        }

        cursor += page.Count;
    }
}

取消传播的正确方式

[EnumeratorCancellation] 特性确保 await foreach 传入的令牌能穿透到生成器内部:

// 定义时标注
async IAsyncEnumerable<int> GenerateWithCancelAsync(
    [EnumeratorCancellation] CancellationToken outerToken = default)
{
    for (int i = 0; i < 1000; i++)
    {
        // 无需手动传递,outerToken 自动接收 await foreach 的令牌
        await Task.Delay(100, outerToken);
        yield return i;
    }
}

// 消费时传入
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));

await foreach (var num in GenerateWithCancelAsync(cts.Token))
{
    Console.WriteLine(num);
}

若省略特性,则需显式调用 WithCancellation

// 不推荐:语法冗余
await foreach (var num in GenerateWithCancelAsync().WithCancellation(cts.Token))
{
    // ...
}

自定义类型示例:信号采样

public readonly struct SignalSample
{
    public long SequenceNumber { get; init; }
    public float Amplitude { get; init; }
    public TimeSpan Offset { get; init; }
}

async IAsyncEnumerable<SignalSample> AcquireSignalAsync(
    int channel,
    int totalSamples,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    var start = TimeSpan.Zero;
    
    for (long seq = 0; seq < totalSamples; seq++)
    {
        ct.ThrowIfCancellationRequested();
        
        // 模拟硬件采集延迟
        await Task.Delay(16, ct); // ~60Hz 采样率
        
        yield return new SignalSample
        {
            SequenceNumber = seq,
            Amplitude = MathF.Sin(seq * 0.1f) * channel,
            Offset = start + TimeSpan.FromMilliseconds(seq * 16)
        };
    }
}

并行消费模式

如需并行处理流元素,可借助 Parallel.ForEachAsync

await Parallel.ForEachAsync(
    AcquireSignalAsync(channel: 1, totalSamples: 1000, cts.Token),
    new ParallelOptions 
    { 
        MaxDegreeOfParallelism = 4,
        CancellationToken = cts.Token 
    },
    async (sample, ct) =>
    {
        await ProcessSampleAsync(sample, ct);
    });

关键配置速查

async IAsyncEnumerable<T>声明异步生成方法
yield return惰式产出单个元素
await foreach异步迭代消费
[EnumeratorCancellation]令牌自动穿透
ConfigureAwait(false)库代码中避免上下文捕获
标签: C#

相关文章

Linux crontab 详解

1) crontab 是什么cron 是 Linux 的定时任务守护进程;crontab 是用来编辑/查看“按时间周期执行命令”的表(cron table)。常见两类:用户 crontab:每个用户一份(crontab -e 编辑)系统级 crontab / cron.d:可指定执行用户(/etc/crontab、/etc/cron.d/*)2) crontab 时间...

富文本里可以允许的 HTML 属性

一、所有标签默认允许的安全属性(极少)class        (可选)id           (通常建议禁用)title️ 注意:id 容易被滥用做锚点注入,很多系统直接禁用class 允许的话最好只允许固定前缀(如 editor-*)二、a 标签允许属性<a href="" t...

Mac 安装 Node.js 指南

方法一:通过官网安装包(最简单,适合初学者)如果你只是想快速安装并开始使用,这是最直接的方法。访问 Node.js 官网。页面会显示两个版本:LTS (Recommended For Most Users):长期支持版,最稳定。建议选这个。Current:最新特性版,包含最新功能但可能不够稳定。下载 .pkg 安装包并运行。按照安装向导点击“下一步”即可完成。方法二:使用 Homebrew 安装(...

Dom\HTML_NO_DEFAULT_NS 的副作用:自动加闭合标签

在使用Dom\HTMLDocument时,Dom\HTML_NO_DEFAULT_NS 将禁止在解析过程中设置元素的命名空间, 此设置是为了与DOMDocument向后兼容而存在的。当使用它时,已知的一个副作用就是:自动加闭合标签例如 </img> 为什么会这样?当你使用:Dom\HTML_NO_DEFAULT_NS文档会变成 无命名空间模式,此时内部更接近 XML...

Laravel 事件和监听器创建

在 Laravel 中,使用 Artisan 命令创建 Events(事件) 和 Listeners(监听器) 是非常高效的。你可以通过以下几种方式来实现:1. 手动创建单个 Event如果你只想创建一个事件类,可以使用 make:event 命令:Bashphp artisan make:event UserRegistered执行后,文件将生成在 app/Even...

自定义域名解析神器 dnsmasq

什么是 dnsmasq?dnsmasq 是一个轻量级、功能强大的网络服务工具,专为小型和中等规模网络设计。它是一个综合的网络基础设施解决方案[1]。dnsmasq 能做什么?功能说明应用场景DNS 转发与缓存将 DNS 查询转发到上游服务器(ISP、Google DNS 等),并在本地缓存结果加快 DNS 查询速度,减少外部 DNS 流量本地 DNS解析本地网络设备的主机名,无需编辑&n...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。