Python 异步编程中的异步迭代器详解
异步迭代器:在 asyncio 中实现自定义异步遍历
在 Python 的异步编程中,除了协程和任务之外,还有一类强大的工具——异步迭代器(asynchronous iterators)。它们允许我们在 async for 循环中逐步获取数据,尤其适用于需要按需生成或等待结果的场景。
理解异步迭代器的核心机制
一个对象要成为异步迭代器,必须实现两个特殊方法:
- __aiter__():返回异步迭代器实例本身,通常直接返回
self。 - __anext__():返回一个可等待对象(awaitable),该对象在完成时产生下一个值。当没有更多元素时,必须抛出
StopAsyncIteration异常以终止循环。
与传统的同步迭代器不同,异步迭代器的每一步都可以包含异步操作,例如网络请求、文件读取或延时处理。
如何定义一个异步迭代器
下面是一个自定义异步迭代器的示例,它每次返回递增的数字,并在每次迭代之间模拟耗时操作:
import asyncio
class NumberStream:
def __init__(self, start=1, end=5):
self.current = start
self.end = end
def __aiter__(self):
return self
async def __anext__(self):
if self.current > self.end:
raise StopAsyncIteration
# 模拟异步工作,如 I/O 操作
await asyncio.sleep(0.5)
value = self.current
self.current += 1
return value
在这个例子中,__anext__ 是一个协程函数(使用 async def 定义),因此它可以 await 其他协程,比如 asyncio.sleep()。
使用 async for 遍历异步迭代器
最常见的方式是使用 async for 语句来消费异步迭代器。这个语法只能在协程内部使用。
async def main():
async for number in NumberStream(1, 3):
print(f"Received: {number}")
# 启动事件循环
asyncio.run(main())
输出将会是:
Received: 1
Received: 2
Received: 3
每次循环都会自动调用 __anext__() 并等待其结果,整个过程是非阻塞的,但顺序执行。
手动控制迭代:使用 anext()
如果需要更细粒度地控制迭代过程,可以使用内置的 anext() 函数:
async def manual_traversal():
stream = NumberStream(1, 2)
try:
while True:
value = await anext(stream)
print(f"Got: {value}")
except StopAsyncIteration:
print("No more items")
anext() 接收一个异步迭代器并返回一个 awaitable,通过 await 获取实际值。当迭代结束时会触发 StopAsyncIteration,应妥善捕获该异常。
结合异步列表推导式收集结果
还可以利用异步列表推导式从异步迭代器中构建结果列表:
async def collect_results():
results = [x async for x in NumberStream(1, 4)]
print(results) # 输出: [1, 2, 3, 4]
这种写法简洁且符合 Python 风格,在需要将流式数据转为集合时非常实用。
异步生成器:更简单的替代方案
对于许多情况,无需手动实现 __aiter__ 和 __anext__。Python 提供了异步生成器函数,只需在生成器前加上 async 即可:
async def number_generator(start, stop):
for i in range(start, stop + 1):
await asyncio.sleep(0.5)
yield i
async def use_generator():
async for num in number_generator(1, 3):
print(num)
这种方式更加简洁,功能上等价于手动定义的异步迭代器。