PEP 525 – 异步生成器
- 作者:
- Yury Selivanov <yury at edgedb.com>
- 讨论至:
- Python-Dev 列表
- 状态:
- 最终版
- 类型:
- 标准跟踪
- 创建日期:
- 2016年7月28日
- Python 版本:
- 3.6
- 发布历史:
- 2016年8月2日,2016年8月23日,2016年9月1日,2016年9月6日
摘要
PEP 492 在 Python 3.5 中引入了对原生协程和 async/await 语法的支持。本文建议通过增加对*异步生成器*的支持来扩展 Python 的异步功能。
基本原理和目标
普通生成器(在 PEP 255 中引入)提供了一种优雅的方式来编写复杂的*数据生产者*,并使其表现得像迭代器。
然而,目前对于*异步迭代协议*(async for)没有等价的概念。这使得编写异步数据生产者变得不必要的复杂,因为必须定义一个实现 __aiter__ 和 __anext__ 的类才能在 async for 语句中使用它。
本质上,PEP 255 的目标和基本原理,应用于异步执行的情况,也同样适用于本提案。
性能是本提案的另一个重点:在我们的参考实现测试中,异步生成器比作为异步迭代器实现的等效代码**快2倍**。
作为代码质量改进的示例,请考虑以下类,它在迭代时以给定延迟打印数字
class Ticker:
"""Yield numbers from 0 to `to` every `delay` seconds."""
def __init__(self, delay, to):
self.delay = delay
self.i = 0
self.to = to
def __aiter__(self):
return self
async def __anext__(self):
i = self.i
if i >= self.to:
raise StopAsyncIteration
self.i += 1
if i:
await asyncio.sleep(self.delay)
return i
同样的功能可以用更简单的异步生成器实现
async def ticker(delay, to):
"""Yield numbers from 0 to `to` every `delay` seconds."""
for i in range(to):
yield i
await asyncio.sleep(delay)
规范
本提案将*异步生成器*的概念引入 Python。
本规范假定读者了解 Python 中生成器和协程的实现知识(PEP 342、PEP 380 和 PEP 492)。
异步生成器
Python *生成器*是任何包含一个或多个 yield 表达式的函数
def func(): # a function
return
def genfunc(): # a generator function
yield
我们建议使用相同的方法来定义*异步生成器*
async def coro(): # a coroutine function
await smth()
async def asyncgen(): # an asynchronous generator function
await smth()
yield 42
调用*异步生成器函数*的结果是一个*异步生成器对象*,它实现了 PEP 492 中定义的异步迭代协议。
在异步生成器中包含非空的 return 语句是 SyntaxError。
支持异步迭代协议
该协议要求实现两个特殊方法
- 一个
__aiter__方法返回一个*异步迭代器*。 - 一个
__anext__方法返回一个*可等待*对象,它使用StopIteration异常来“生成”值,并使用StopAsyncIteration异常来表示迭代结束。
异步生成器定义了这两个方法。让我们手动迭代一个简单的异步生成器
async def genfunc():
yield 1
yield 2
gen = genfunc()
assert gen.__aiter__() is gen
assert await gen.__anext__() == 1
assert await gen.__anext__() == 2
await gen.__anext__() # This line will raise StopAsyncIteration.
终结
PEP 492 要求事件循环或调度器来运行协程。由于异步生成器旨在从协程中使用,因此它们也需要事件循环来运行和终结它们。
异步生成器可以有 try..finally 块,以及 async with。重要的是要保证,即使是部分迭代后被垃圾回收,生成器也能安全地终结。例如
async def square_series(con, to):
async with con.transaction():
cursor = con.cursor(
'SELECT generate_series(0, $1) AS i', to)
async for row in cursor:
yield row['i'] ** 2
async for i in square_series(con, 1000):
if i == 100:
break
上述代码定义了一个异步生成器,它使用 async with 在事务中迭代数据库游标。然后使用 async for 迭代该生成器,它会在某个点中断迭代。
然后 square_series() 生成器将被垃圾回收,如果没有异步关闭生成器的机制,Python 解释器将无法执行任何操作。
为了解决这个问题,我们建议采取以下措施
- 在异步生成器上实现一个
aclose方法,该方法返回一个特殊的*可等待*对象。当被等待时,它会将GeneratorExit抛入挂起的生成器中,并迭代它直到出现GeneratorExit或StopAsyncIteration。这与
close()方法对普通 Python 生成器所做的事情非常相似,不同之处在于执行aclose()需要一个事件循环。 - 当异步生成器在其
finally块中执行yield表达式时(尽管使用await是可以的),抛出RuntimeErrorasync def gen(): try: yield finally: await asyncio.sleep(1) # Can use 'await'. yield # Cannot use 'yield', # this line will trigger a # RuntimeError.
- 向
sys模块添加两个新方法:set_asyncgen_hooks()和get_asyncgen_hooks()。
sys.set_asyncgen_hooks() 的目的是允许事件循环拦截异步生成器的迭代和终结,这样最终用户就不需要关心终结问题,一切都能正常工作。
sys.set_asyncgen_hooks() 接受两个参数
firstiter:一个可调用对象,在异步生成器首次迭代时被调用。finalizer:一个可调用对象,在异步生成器即将被垃圾回收时被调用。
当异步生成器首次迭代时,它会存储对当前*终结器*的引用。
当异步生成器即将被垃圾回收时,它会调用其缓存的*终结器*。假设终结器将使用在迭代开始时处于活动状态的循环调度 aclose() 调用。
例如,以下是 asyncio 如何修改以允许异步生成器安全终结
# asyncio/base_events.py
class BaseEventLoop:
def run_forever(self):
...
old_hooks = sys.get_asyncgen_hooks()
sys.set_asyncgen_hooks(finalizer=self._finalize_asyncgen)
try:
...
finally:
sys.set_asyncgen_hooks(*old_hooks)
...
def _finalize_asyncgen(self, gen):
self.create_task(gen.aclose())
第二个参数 firstiter 允许事件循环维护在其控制下实例化的异步生成器的弱集合。这使得实现“关闭”机制成为可能,以安全地终结所有打开的生成器并关闭事件循环。
sys.set_asyncgen_hooks() 是线程特有的,因此在并行线程中运行的多个事件循环可以安全地使用它。
sys.get_asyncgen_hooks() 返回一个类似命名元组的结构,包含 firstiter 和 finalizer 字段。
asyncio
asyncio 事件循环将使用 sys.set_asyncgen_hooks() API 来维护所有调度异步生成器的弱集合,并在生成器即将被垃圾回收时调度它们的 aclose() 协程方法。
为确保 asyncio 程序能够可靠地终结所有调度异步生成器,我们建议为事件循环添加一个新的协程方法 loop.shutdown_asyncgens()。该方法将调度所有当前打开的异步生成器以通过 aclose() 调用关闭。
调用 loop.shutdown_asyncgens() 方法后,每当首次迭代新的异步生成器时,事件循环将发出警告。其思想是,在请求所有异步生成器关闭后,程序不应执行迭代新的异步生成器的代码。
shutdown_asyncgens 协程的用法示例
try:
loop.run_forever()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
异步生成器对象
该对象是按照标准 Python 生成器对象建模的。本质上,异步生成器的行为旨在复制同步生成器的行为,唯一的区别是 API 是异步的。
定义了以下方法和属性
agen.__aiter__():返回agen。agen.__anext__():返回一个*可等待*对象,在等待时执行一次异步生成器迭代。agen.asend(val):返回一个*可等待*对象,将val对象推入agen生成器中。当agen尚未迭代时,val必须为None。示例
async def gen(): await asyncio.sleep(0.1) v = yield 42 print(v) await asyncio.sleep(0.2) g = gen() await g.asend(None) # Will return 42 after sleeping # for 0.1 seconds. await g.asend('hello') # Will print 'hello' and # raise StopAsyncIteration # (after sleeping for 0.2 seconds.)
agen.athrow(typ, [val, [tb]]):返回一个*可等待*对象,它将异常抛入agen生成器中。示例
async def gen(): try: await asyncio.sleep(0.1) yield 'hello' except ZeroDivisionError: await asyncio.sleep(0.2) yield 'world' g = gen() v = await g.asend(None) print(v) # Will print 'hello' after # sleeping for 0.1 seconds. v = await g.athrow(ZeroDivisionError) print(v) # Will print 'world' after # sleeping 0.2 seconds.
agen.aclose():返回一个*可等待*对象,它将GeneratorExit异常抛入生成器中。如果agen处理了异常,则*可等待*对象可以返回一个生成的值,否则agen将被关闭,并且异常将传播回调用者。agen.__name__和agen.__qualname__:可读写名称和限定名称属性。agen.ag_await:agen当前正在*等待*的对象,或者None。这类似于当前可用于生成器的gi_yieldfrom和用于协程的cr_await。agen.ag_frame、agen.ag_running和agen.ag_code:以与标准生成器类似属性相同的方式定义。
StopIteration 和 StopAsyncIteration 不会从异步生成器中传播出去,它们会被 RuntimeError 替换。
实施细节
异步生成器对象(PyAsyncGenObject)与 PyGenObject 共享结构布局。此外,参考实现引入了三个新对象
PyAsyncGenASend:实现__anext__和asend()方法的可等待对象。PyAsyncGenAThrow:实现athrow()和aclose()方法的可等待对象。_PyAsyncGenWrappedValue:从异步生成器直接生成的每个对象都隐式地包装到此结构中。这就是生成器实现如何将使用常规迭代协议生成的对象与使用异步迭代协议生成的对象分开的方式。
PyAsyncGenASend 和 PyAsyncGenAThrow 是可等待对象(它们有 __await__ 方法返回 self),并且是类协程对象(实现 __iter__、__next__、send() 和 throw() 方法)。本质上,它们控制异步生成器的迭代方式
PyAsyncGenASend 和 PyAsyncGenAThrow
PyAsyncGenASend 是一个类协程对象,驱动 __anext__ 和 asend() 方法,并实现异步迭代协议。
agen.asend(val) 和 agen.__anext__() 返回 PyAsyncGenASend 的实例(它们持有对父 agen 对象的引用)。
数据流定义如下
- 当首次调用
PyAsyncGenASend.send(val)时,val被推入父agen对象(使用PyGenObject的现有功能)。随后对
PyAsyncGenASend对象的迭代将None推入agen。当生成
_PyAsyncGenWrappedValue对象时,它会被解包,并以未包装的值作为参数抛出StopIteration异常。 - 当首次调用
PyAsyncGenASend.throw(*exc)时,*exc被抛入父agen对象中。随后对
PyAsyncGenASend对象的迭代将None推入agen。当生成
_PyAsyncGenWrappedValue对象时,它会被解包,并以未包装的值作为参数抛出StopIteration异常。 - 异步生成器中的
return语句会引发StopAsyncIteration异常,该异常通过PyAsyncGenASend.send()和PyAsyncGenASend.throw()方法传播。
PyAsyncGenAThrow 与 PyAsyncGenASend 非常相似。唯一的区别是 PyAsyncGenAThrow.send() 首次调用时,会将异常抛入父 agen 对象中(而不是将值推入其中)。
新的标准库函数和类型
types.AsyncGeneratorType– 异步生成器对象的类型。sys.set_asyncgen_hooks()和sys.get_asyncgen_hooks()方法用于在事件循环中设置异步生成器终结器和迭代拦截器。inspect.isasyncgen()和inspect.isasyncgenfunction()内省函数。- asyncio 事件循环的新方法:
loop.shutdown_asyncgens()。 - 新的
collections.abc.AsyncGenerator抽象基类。
向后兼容性
本提案完全向后兼容。
在 Python 3.5 中,在 async def 函数内部定义 yield 表达式是 SyntaxError,因此在 3.6 中引入异步生成器是安全的。
性能
普通生成器
普通生成器没有性能下降。以下微基准测试在 CPython 上有和没有异步生成器的情况下以相同的速度运行
def gen():
i = 0
while i < 100000000:
yield i
i += 1
list(gen())
对异步迭代器的改进
以下微基准测试显示,异步生成器比纯 Python 实现的异步迭代器**快约2.3倍**
N = 10 ** 7
async def agen():
for i in range(N):
yield i
class AIter:
def __init__(self):
self.i = 0
def __aiter__(self):
return self
async def __anext__(self):
i = self.i
if i >= N:
raise StopAsyncIteration
self.i += 1
return i
设计考虑
aiter() 和 anext() 内置函数
最初,PEP 492 将 __aiter__ 定义为一个应该返回*可等待*对象的方法,从而产生一个异步迭代器。
然而,在 CPython 3.5.2 中,__aiter__ 被重新定义为直接返回异步迭代器。为了避免破坏向后兼容性,决定 Python 3.6 将支持两种方式:__aiter__ 仍然可以返回一个*可等待*对象,但会发出 DeprecationWarning。
由于 __aiter__ 在 Python 3.6 中具有这种双重性质,我们无法添加 aiter() 内置函数的同步实现。因此,建议等到 Python 3.7。
异步列表/字典/集合推导式
异步推导式的语法与异步生成器机制无关,应在单独的 PEP 中考虑。
异步 yield from
虽然理论上可以为异步生成器实现 yield from 支持,但这需要对生成器实现进行重大重新设计。
yield from 对于异步生成器来说也相对不那么关键,因为不需要在协程之上提供实现另一个协程协议的机制。为了组合异步生成器,可以使用简单的 async for 循环
async def g1():
yield 1
yield 2
async def g2():
async for v in g1():
yield v
为什么需要 asend() 和 athrow() 方法
它们使得使用异步生成器实现类似于 contextlib.contextmanager 的概念成为可能。例如,通过提议的设计,可以实现以下模式
@async_context_manager
async def ctx():
await open()
try:
yield
finally:
await close()
async with ctx():
await ...
另一个原因是,可以使用 __anext__ 对象返回的对象将数据推送并抛出异常到异步生成器中,但要正确地做到这一点很困难。添加显式的 asend() 和 athrow() 将为实现此目标铺平一条安全的道路。
在实现方面,asend() 是 __anext__ 的一个稍微更通用的版本,而 athrow() 与 aclose() 非常相似。因此,为异步生成器定义这些方法不会增加任何额外的复杂性。
示例
当前参考实现的一个工作示例(将以一秒延迟打印从0到9的数字)
async def ticker(delay, to):
for i in range(to):
yield i
await asyncio.sleep(delay)
async def run():
async for i in ticker(1, 10):
print(i)
import asyncio
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(run())
finally:
loop.close()
接受
实施
参考资料
致谢
我感谢 Guido van Rossum、Victor Stinner、Elvis Pranskevichus、Nathaniel Smith、Łukasz Langa、Andrew Svetlov 和许多其他人对本 PEP 的反馈、代码审查和讨论。
版权
本文档已置于公共领域。
来源:https://github.com/python/peps/blob/main/peps/pep-0525.rst