PEP 525 – 异步生成器
- 作者:
- Yury Selivanov <yury at edgedb.com>
- 讨论列表:
- Python-Dev 列表
- 状态:
- 最终
- 类型:
- 标准跟踪
- 创建:
- 2016年7月28日
- Python 版本:
- 3.6
- 历史记录:
- 2016年8月2日,2016年8月23日,2016年9月1日,2016年9月6日
摘要
PEP 492 引入了对原生协程和 async
/await
语法的支持,并将其应用于 Python 3.5。本文提议扩展 Python 的异步功能,添加对异步生成器的支持。
基本原理和目标
常规生成器(在 PEP 255 中引入)提供了一种编写复杂数据生产者并使其表现得像迭代器的优雅方式。
然而,目前还没有等效的概念用于异步迭代协议(async for
)。这使得编写异步数据生产者变得不必要地复杂,因为必须定义一个实现 __aiter__
和 __anext__
的类,才能在 async for
语句中使用它。
从本质上讲,PEP 255 的目标和基本原理应用于异步执行案例,也适用于此提案。
性能是本提案的另一个亮点:在我们对参考实现的测试中,异步生成器比等效的异步迭代器实现快 2 倍。
为了说明代码质量的提升,请考虑以下打印数字并在迭代时延迟给定时间的类
class Ticker:
"""Yield numbers from 0 to `to` every `delay` seconds."""
def __init__(self, delay, to):
self.delay = delay
self.i = 0
self.to = to
def __aiter__(self):
return self
async def __anext__(self):
i = self.i
if i >= self.to:
raise StopAsyncIteration
self.i += 1
if i:
await asyncio.sleep(self.delay)
return i
同样的功能可以用更简单的异步生成器来实现
async def ticker(delay, to):
"""Yield numbers from 0 to `to` every `delay` seconds."""
for i in range(to):
yield i
await asyncio.sleep(delay)
规范
本提案引入了 Python 中异步生成器的概念。
本规范假设读者了解 Python 中生成器和协程的实现(PEP 342、PEP 380 和 PEP 492)。
异步生成器
Python 中的生成器是指包含一个或多个 yield
表达式的任何函数
def func(): # a function
return
def genfunc(): # a generator function
yield
我们建议使用相同的方法来定义异步生成器
async def coro(): # a coroutine function
await smth()
async def asyncgen(): # an asynchronous generator function
await smth()
yield 42
调用异步生成器函数的结果是一个异步生成器对象,它实现了 PEP 492 中定义的异步迭代协议。
在异步生成器中使用非空的 return
语句会导致 SyntaxError
。
异步迭代协议的支持
该协议要求实现两种特殊方法
- 一个
__aiter__
方法,返回一个异步迭代器。 - 一个
__anext__
方法,返回一个可等待对象,该对象使用StopIteration
异常来“yield”值,并使用StopAsyncIteration
异常来表示迭代结束。
异步生成器定义了这两种方法。让我们手动迭代一个简单的异步生成器
async def genfunc():
yield 1
yield 2
gen = genfunc()
assert gen.__aiter__() is gen
assert await gen.__anext__() == 1
assert await gen.__anext__() == 2
await gen.__anext__() # This line will raise StopAsyncIteration.
终结
PEP 492 要求使用事件循环或调度程序来运行协程。由于异步生成器旨在从协程中使用,因此它们也需要事件循环来运行和终结它们。
异步生成器可以包含 try..finally
块,以及 async with
。重要的是要保证,即使在部分迭代后,然后被垃圾回收,生成器也能被安全地终结。例如
async def square_series(con, to):
async with con.transaction():
cursor = con.cursor(
'SELECT generate_series(0, $1) AS i', to)
async for row in cursor:
yield row['i'] ** 2
async for i in square_series(con, 1000):
if i == 100:
break
上面的代码定义了一个异步生成器,它使用 async with
在事务中迭代数据库游标。然后,该生成器使用 async for
进行迭代,并在某个时刻中断迭代。
然后 square_series()
生成器将被垃圾回收,如果没有异步关闭生成器的机制,Python 解释器将无法执行任何操作。
为了解决这个问题,我们建议执行以下操作
- 在异步生成器上实现一个
aclose
方法,该方法返回一个特殊的可等待对象。当被等待时,它会将GeneratorExit
抛入挂起的生成器,并对其进行迭代,直到发生GeneratorExit
或StopAsyncIteration
。这与
close()
方法对常规 Python 生成器执行的操作非常相似,只是需要事件循环来执行aclose()
。 - 当异步生成器在其
finally
块中执行yield
表达式时,抛出RuntimeError
(尽管可以使用await
)。async def gen(): try: yield finally: await asyncio.sleep(1) # Can use 'await'. yield # Cannot use 'yield', # this line will trigger a # RuntimeError.
- 向
sys
模块添加两个新方法:set_asyncgen_hooks()
和get_asyncgen_hooks()
。
sys.set_asyncgen_hooks()
背后的理念是允许事件循环拦截异步生成器的迭代和终结,这样最终用户就不需要关心终结问题,一切都能正常工作。
sys.set_asyncgen_hooks()
接受两个参数
firstiter
:一个可调用对象,当异步生成器第一次迭代时将被调用。finalizer
:一个可调用对象,当异步生成器即将被 GC 时将被调用。
当异步生成器第一次迭代时,它会存储对当前终结器的引用。
当异步生成器即将被垃圾回收时,它会调用其缓存的终结器。假设终结器会使用迭代开始时活动的循环调度 aclose()
调用。
例如,以下是 asyncio 如何修改以允许安全终结异步生成器
# asyncio/base_events.py
class BaseEventLoop:
def run_forever(self):
...
old_hooks = sys.get_asyncgen_hooks()
sys.set_asyncgen_hooks(finalizer=self._finalize_asyncgen)
try:
...
finally:
sys.set_asyncgen_hooks(*old_hooks)
...
def _finalize_asyncgen(self, gen):
self.create_task(gen.aclose())
第二个参数 firstiter
允许事件循环维护一个在其控制下实例化的异步生成器的弱集。这使得可以实现“关闭”机制,以安全地终结所有打开的生成器并关闭事件循环。
sys.set_asyncgen_hooks()
是线程特定的,因此多个在并行线程中运行的事件循环可以安全地使用它。
sys.get_asyncgen_hooks()
返回一个类似于 namedtuple 的结构,其中包含 firstiter
和 finalizer
字段。
asyncio
asyncio 事件循环将使用 sys.set_asyncgen_hooks()
API 维护所有已调度的异步生成器的弱集,并在生成器需要被 GC 时调度其 aclose()
协程方法。
为了确保 asyncio 程序能够可靠地终结所有已调度的异步生成器,我们建议添加一个新的事件循环协程方法 loop.shutdown_asyncgens()
。该方法将调度所有当前打开的异步生成器,使用 aclose()
调用进行关闭。
调用 loop.shutdown_asyncgens()
方法后,每当新的异步生成器第一次迭代时,事件循环都会发出警告。这样做的目的是,在请求所有异步生成器关闭后,程序不应该执行迭代新异步生成器的代码。
关于如何使用 shutdown_asyncgens
协程的示例
try:
loop.run_forever()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
异步生成器对象
该对象是根据标准 Python 生成器对象建模的。从本质上讲,异步生成器的行为旨在复制同步生成器的行为,唯一的区别在于 API 是异步的。
定义了以下方法和属性
agen.__aiter__()
:返回agen
。agen.__anext__()
:返回一个可等待对象,当被等待时执行一次异步生成器迭代。agen.asend(val)
:返回一个可等待对象,该对象将val
对象推入agen
生成器。当agen
尚未被迭代时,val
必须为None
。示例
async def gen(): await asyncio.sleep(0.1) v = yield 42 print(v) await asyncio.sleep(0.2) g = gen() await g.asend(None) # Will return 42 after sleeping # for 0.1 seconds. await g.asend('hello') # Will print 'hello' and # raise StopAsyncIteration # (after sleeping for 0.2 seconds.)
agen.athrow(typ, [val, [tb]])
:返回一个可等待对象,该对象将异常抛入agen
生成器。示例
async def gen(): try: await asyncio.sleep(0.1) yield 'hello' except ZeroDivisionError: await asyncio.sleep(0.2) yield 'world' g = gen() v = await g.asend(None) print(v) # Will print 'hello' after # sleeping for 0.1 seconds. v = await g.athrow(ZeroDivisionError) print(v) # Will print 'world' after $ sleeping 0.2 seconds.
agen.aclose()
:返回一个可等待对象(awaitable),该对象会向生成器抛出一个GeneratorExit
异常。该可等待对象可以返回一个yield的值,如果agen
处理了异常,或者agen
会被关闭并且异常会传播回调用方。agen.__name__
和agen.__qualname__
:可读写名称和限定名称属性。agen.ag_await
:agen
当前正在等待的对象,或者为None
。这类似于生成器当前可用的gi_yieldfrom
和协程的cr_await
。agen.ag_frame
、agen.ag_running
和agen.ag_code
:定义方式与标准生成器的类似属性相同。
StopIteration
和StopAsyncIteration
不会从异步生成器中传播出去,并被替换为RuntimeError
。
实现细节
异步生成器对象(PyAsyncGenObject
)与PyGenObject
共享结构布局。除此之外,参考实现引入了三个新对象
PyAsyncGenASend
:实现__anext__
和asend()
方法的可等待对象。PyAsyncGenAThrow
:实现athrow()
和aclose()
方法的可等待对象。_PyAsyncGenWrappedValue
:异步生成器直接yield的每个对象都隐式地封装到此结构中。这就是生成器实现如何将使用常规迭代协议yield的对象与使用异步迭代协议yield的对象区分开来。
PyAsyncGenASend
和PyAsyncGenAThrow
是可等待对象(它们具有返回self
的__await__
方法)并且是类似协程的对象(实现了__iter__
、__next__
、send()
和throw()
方法)。从本质上讲,它们控制着异步生成器的迭代方式。

PyAsyncGenASend 和 PyAsyncGenAThrow
PyAsyncGenASend
是一个类似协程的对象,它驱动__anext__
和asend()
方法并实现异步迭代协议。
agen.asend(val)
和agen.__anext__()
返回PyAsyncGenASend
的实例(它们持有对父agen
对象的引用)。
数据流定义如下
- 当第一次调用
PyAsyncGenASend.send(val)
时,val
会被推送到父agen
对象(使用PyGenObject
现有的设施)。对
PyAsyncGenASend
对象的后续迭代会将None
推送到agen
。当yield一个
_PyAsyncGenWrappedValue
对象时,它会被解包,并且会抛出一个带有解包值的StopIteration
异常。 - 当第一次调用
PyAsyncGenASend.throw(*exc)
时,*exc
会被抛到父agen
对象中。对
PyAsyncGenASend
对象的后续迭代会将None
推送到agen
。当yield一个
_PyAsyncGenWrappedValue
对象时,它会被解包,并且会抛出一个带有解包值的StopIteration
异常。 - 异步生成器中的
return
语句会抛出StopAsyncIteration
异常,该异常会通过PyAsyncGenASend.send()
和PyAsyncGenASend.throw()
方法传播。
PyAsyncGenAThrow
与PyAsyncGenASend
非常相似。唯一的区别是,当第一次调用PyAsyncGenAThrow.send()
时,它会将异常抛到父agen
对象中(而不是将值推送到其中)。
新的标准库函数和类型
types.AsyncGeneratorType
– 异步生成器对象的类型。sys.set_asyncgen_hooks()
和sys.get_asyncgen_hooks()
方法,用于在事件循环中设置异步生成器的终结器和迭代拦截器。inspect.isasyncgen()
和inspect.isasyncgenfunction()
内省函数。- asyncio 事件循环的新方法:
loop.shutdown_asyncgens()
。 - 新的
collections.abc.AsyncGenerator
抽象基类。
向后兼容性
该提案完全向后兼容。
在 Python 3.5 中,如果在async def
函数内部使用yield
表达式,则会产生SyntaxError
,因此在 3.6 中引入异步生成器是安全的。
性能
常规生成器
常规生成器的性能没有下降。以下微基准测试在带有和不带有异步生成器的 CPython 上运行速度相同。
def gen():
i = 0
while i < 100000000:
yield i
i += 1
list(gen())
相较于异步迭代器的改进
以下微基准测试表明,异步生成器比用纯 Python 实现的异步迭代器快约2.3 倍。
N = 10 ** 7
async def agen():
for i in range(N):
yield i
class AIter:
def __init__(self):
self.i = 0
def __aiter__(self):
return self
async def __anext__(self):
i = self.i
if i >= N:
raise StopAsyncIteration
self.i += 1
return i
设计考量
aiter()
和 anext()
内置函数
最初,PEP 492 将__aiter__
定义为应该返回一个可等待对象的方法,从而产生一个异步迭代器。
但是,在 CPython 3.5.2 中,__aiter__
被重新定义为直接返回异步迭代器。为了避免破坏向后兼容性,决定 Python 3.6 将支持这两种方式:__aiter__
仍然可以返回一个可等待对象,同时发出DeprecationWarning
警告。
由于 Python 3.6 中__aiter__
的这种双重特性,我们无法添加aiter()
内置函数的同步实现。因此,建议等到 Python 3.7。
异步列表/字典/集合推导式
异步推导式的语法与异步生成器机制无关,应该在单独的 PEP 中进行考虑。
异步 yield from
虽然从理论上讲,可以为异步生成器实现yield from
支持,但这需要对生成器实现进行重大重新设计。
yield from
对于异步生成器来说也并不那么重要,因为不需要提供在协程之上实现另一个协程协议的机制。并且,要组合异步生成器,可以使用简单的async for
循环。
async def g1():
yield 1
yield 2
async def g2():
async for v in g1():
yield v
为什么 asend()
和 athrow()
方法是必要的
它们使得可以使用异步生成器实现类似于contextlib.contextmanager
的概念。例如,使用建议的设计,可以实现以下模式。
@async_context_manager
async def ctx():
await open()
try:
yield
finally:
await close()
async with ctx():
await ...
另一个原因是,可以使用__anext__
对象返回的对象将数据和异常推送到异步生成器中,但这很难正确地做到。添加显式的asend()
和athrow()
将为安全地实现这一点铺平道路。
在实现方面,asend()
是__anext__
的稍微更通用的版本,而athrow()
与aclose()
非常相似。因此,为异步生成器定义这些方法不会增加任何额外的复杂性。
示例
使用当前参考实现的工作示例(将以一秒的延迟打印 0 到 9 的数字)。
async def ticker(delay, to):
for i in range(to):
yield i
await asyncio.sleep(delay)
async def run():
async for i in ticker(1, 10):
print(i)
import asyncio
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(run())
finally:
loop.close()
接受
实现
参考文献
致谢
感谢 Guido van Rossum、Victor Stinner、Elvis Pranskevichus、Nathaniel Smith、Łukasz Langa、Andrew Svetlov 和许多其他人提供的反馈、代码审查和围绕此 PEP 的讨论。
版权
本文档已置于公共领域。
来源:https://github.com/python/peps/blob/main/peps/pep-0525.rst