Following system colour scheme - Python 增强提案 Selected dark colour scheme - Python 增强提案 Selected light colour scheme - Python 增强提案

Python 增强提案

PEP 525 – 异步生成器

作者:
Yury Selivanov <yury at edgedb.com>
讨论列表:
Python-Dev 列表
状态:
最终
类型:
标准跟踪
创建:
2016年7月28日
Python 版本:
3.6
历史记录:
2016年8月2日,2016年8月23日,2016年9月1日,2016年9月6日

目录

摘要

PEP 492 引入了对原生协程和 async/await 语法的支持,并将其应用于 Python 3.5。本文提议扩展 Python 的异步功能,添加对异步生成器的支持。

基本原理和目标

常规生成器(在 PEP 255 中引入)提供了一种编写复杂数据生产者并使其表现得像迭代器的优雅方式。

然而,目前还没有等效的概念用于异步迭代协议async for)。这使得编写异步数据生产者变得不必要地复杂,因为必须定义一个实现 __aiter____anext__ 的类,才能在 async for 语句中使用它。

从本质上讲,PEP 255 的目标和基本原理应用于异步执行案例,也适用于此提案。

性能是本提案的另一个亮点:在我们对参考实现的测试中,异步生成器比等效的异步迭代器实现快 2 倍

为了说明代码质量的提升,请考虑以下打印数字并在迭代时延迟给定时间的类

class Ticker:
    """Yield numbers from 0 to `to` every `delay` seconds."""

    def __init__(self, delay, to):
        self.delay = delay
        self.i = 0
        self.to = to

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        if i >= self.to:
            raise StopAsyncIteration
        self.i += 1
        if i:
            await asyncio.sleep(self.delay)
        return i

同样的功能可以用更简单的异步生成器来实现

async def ticker(delay, to):
    """Yield numbers from 0 to `to` every `delay` seconds."""
    for i in range(to):
        yield i
        await asyncio.sleep(delay)

规范

本提案引入了 Python 中异步生成器的概念。

本规范假设读者了解 Python 中生成器和协程的实现(PEP 342PEP 380PEP 492)。

异步生成器

Python 中的生成器是指包含一个或多个 yield 表达式的任何函数

def func():            # a function
    return

def genfunc():         # a generator function
    yield

我们建议使用相同的方法来定义异步生成器

async def coro():      # a coroutine function
    await smth()

async def asyncgen():  # an asynchronous generator function
    await smth()
    yield 42

调用异步生成器函数的结果是一个异步生成器对象,它实现了 PEP 492 中定义的异步迭代协议。

在异步生成器中使用非空的 return 语句会导致 SyntaxError

异步迭代协议的支持

该协议要求实现两种特殊方法

  1. 一个 __aiter__ 方法,返回一个异步迭代器
  2. 一个 __anext__ 方法,返回一个可等待对象,该对象使用 StopIteration 异常来“yield”值,并使用 StopAsyncIteration 异常来表示迭代结束。

异步生成器定义了这两种方法。让我们手动迭代一个简单的异步生成器

async def genfunc():
    yield 1
    yield 2

gen = genfunc()

assert gen.__aiter__() is gen

assert await gen.__anext__() == 1
assert await gen.__anext__() == 2

await gen.__anext__()  # This line will raise StopAsyncIteration.

终结

PEP 492 要求使用事件循环或调度程序来运行协程。由于异步生成器旨在从协程中使用,因此它们也需要事件循环来运行和终结它们。

异步生成器可以包含 try..finally 块,以及 async with。重要的是要保证,即使在部分迭代后,然后被垃圾回收,生成器也能被安全地终结。例如

async def square_series(con, to):
    async with con.transaction():
        cursor = con.cursor(
            'SELECT generate_series(0, $1) AS i', to)
        async for row in cursor:
            yield row['i'] ** 2

async for i in square_series(con, 1000):
    if i == 100:
        break

上面的代码定义了一个异步生成器,它使用 async with 在事务中迭代数据库游标。然后,该生成器使用 async for 进行迭代,并在某个时刻中断迭代。

然后 square_series() 生成器将被垃圾回收,如果没有异步关闭生成器的机制,Python 解释器将无法执行任何操作。

为了解决这个问题,我们建议执行以下操作

  1. 在异步生成器上实现一个 aclose 方法,该方法返回一个特殊的可等待对象。当被等待时,它会将 GeneratorExit 抛入挂起的生成器,并对其进行迭代,直到发生 GeneratorExitStopAsyncIteration

    这与 close() 方法对常规 Python 生成器执行的操作非常相似,只是需要事件循环来执行 aclose()

  2. 当异步生成器在其 finally 块中执行 yield 表达式时,抛出 RuntimeError(尽管可以使用 await)。
    async def gen():
        try:
            yield
        finally:
            await asyncio.sleep(1)   # Can use 'await'.
    
            yield                    # Cannot use 'yield',
                                     # this line will trigger a
                                     # RuntimeError.
    
  3. sys 模块添加两个新方法:set_asyncgen_hooks()get_asyncgen_hooks()

sys.set_asyncgen_hooks() 背后的理念是允许事件循环拦截异步生成器的迭代和终结,这样最终用户就不需要关心终结问题,一切都能正常工作。

sys.set_asyncgen_hooks() 接受两个参数

  • firstiter:一个可调用对象,当异步生成器第一次迭代时将被调用。
  • finalizer:一个可调用对象,当异步生成器即将被 GC 时将被调用。

当异步生成器第一次迭代时,它会存储对当前终结器的引用。

当异步生成器即将被垃圾回收时,它会调用其缓存的终结器。假设终结器会使用迭代开始时活动的循环调度 aclose() 调用。

例如,以下是 asyncio 如何修改以允许安全终结异步生成器

# asyncio/base_events.py

class BaseEventLoop:

    def run_forever(self):
        ...
        old_hooks = sys.get_asyncgen_hooks()
        sys.set_asyncgen_hooks(finalizer=self._finalize_asyncgen)
        try:
            ...
        finally:
            sys.set_asyncgen_hooks(*old_hooks)
            ...

    def _finalize_asyncgen(self, gen):
        self.create_task(gen.aclose())

第二个参数 firstiter 允许事件循环维护一个在其控制下实例化的异步生成器的弱集。这使得可以实现“关闭”机制,以安全地终结所有打开的生成器并关闭事件循环。

sys.set_asyncgen_hooks() 是线程特定的,因此多个在并行线程中运行的事件循环可以安全地使用它。

sys.get_asyncgen_hooks() 返回一个类似于 namedtuple 的结构,其中包含 firstiterfinalizer 字段。

asyncio

asyncio 事件循环将使用 sys.set_asyncgen_hooks() API 维护所有已调度的异步生成器的弱集,并在生成器需要被 GC 时调度其 aclose() 协程方法。

为了确保 asyncio 程序能够可靠地终结所有已调度的异步生成器,我们建议添加一个新的事件循环协程方法 loop.shutdown_asyncgens()。该方法将调度所有当前打开的异步生成器,使用 aclose() 调用进行关闭。

调用 loop.shutdown_asyncgens() 方法后,每当新的异步生成器第一次迭代时,事件循环都会发出警告。这样做的目的是,在请求所有异步生成器关闭后,程序不应该执行迭代新异步生成器的代码。

关于如何使用 shutdown_asyncgens 协程的示例

try:
    loop.run_forever()
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()

异步生成器对象

该对象是根据标准 Python 生成器对象建模的。从本质上讲,异步生成器的行为旨在复制同步生成器的行为,唯一的区别在于 API 是异步的。

定义了以下方法和属性

  1. agen.__aiter__():返回 agen
  2. agen.__anext__():返回一个可等待对象,当被等待时执行一次异步生成器迭代。
  3. agen.asend(val):返回一个可等待对象,该对象将 val 对象推入 agen 生成器。当 agen 尚未被迭代时,val 必须为 None

    示例

    async def gen():
        await asyncio.sleep(0.1)
        v = yield 42
        print(v)
        await asyncio.sleep(0.2)
    
    g = gen()
    
    await g.asend(None)      # Will return 42 after sleeping
                             # for 0.1 seconds.
    
    await g.asend('hello')   # Will print 'hello' and
                             # raise StopAsyncIteration
                             # (after sleeping for 0.2 seconds.)
    
  4. agen.athrow(typ, [val, [tb]]):返回一个可等待对象,该对象将异常抛入 agen 生成器。

    示例

    async def gen():
        try:
            await asyncio.sleep(0.1)
            yield 'hello'
        except ZeroDivisionError:
            await asyncio.sleep(0.2)
            yield 'world'
    
    g = gen()
    v = await g.asend(None)
    print(v)                # Will print 'hello' after
                            # sleeping for 0.1 seconds.
    
    v = await g.athrow(ZeroDivisionError)
    print(v)                # Will print 'world' after
                            $ sleeping 0.2 seconds.
    
  5. agen.aclose():返回一个可等待对象awaitable),该对象会向生成器抛出一个GeneratorExit异常。该可等待对象可以返回一个yield的值,如果agen处理了异常,或者agen会被关闭并且异常会传播回调用方。
  6. agen.__name__agen.__qualname__:可读写名称和限定名称属性。
  7. agen.ag_awaitagen当前正在等待的对象,或者为None。这类似于生成器当前可用的gi_yieldfrom和协程的cr_await
  8. agen.ag_frameagen.ag_runningagen.ag_code:定义方式与标准生成器的类似属性相同。

StopIterationStopAsyncIteration不会从异步生成器中传播出去,并被替换为RuntimeError

实现细节

异步生成器对象(PyAsyncGenObject)与PyGenObject共享结构布局。除此之外,参考实现引入了三个新对象

  1. PyAsyncGenASend:实现__anext__asend()方法的可等待对象。
  2. PyAsyncGenAThrow:实现athrow()aclose()方法的可等待对象。
  3. _PyAsyncGenWrappedValue:异步生成器直接yield的每个对象都隐式地封装到此结构中。这就是生成器实现如何将使用常规迭代协议yield的对象与使用异步迭代协议yield的对象区分开来。

PyAsyncGenASendPyAsyncGenAThrow是可等待对象(它们具有返回self__await__方法)并且是类似协程的对象(实现了__iter____next__send()throw()方法)。从本质上讲,它们控制着异步生成器的迭代方式。

../_images/pep-0525-1.png

PyAsyncGenASend 和 PyAsyncGenAThrow

PyAsyncGenASend是一个类似协程的对象,它驱动__anext__asend()方法并实现异步迭代协议。

agen.asend(val)agen.__anext__()返回PyAsyncGenASend的实例(它们持有对父agen对象的引用)。

数据流定义如下

  1. 当第一次调用PyAsyncGenASend.send(val)时,val会被推送到父agen对象(使用PyGenObject现有的设施)。

    PyAsyncGenASend对象的后续迭代会将None推送到agen

    当yield一个_PyAsyncGenWrappedValue对象时,它会被解包,并且会抛出一个带有解包值的StopIteration异常。

  2. 当第一次调用PyAsyncGenASend.throw(*exc)时,*exc会被抛到父agen对象中。

    PyAsyncGenASend对象的后续迭代会将None推送到agen

    当yield一个_PyAsyncGenWrappedValue对象时,它会被解包,并且会抛出一个带有解包值的StopIteration异常。

  3. 异步生成器中的return语句会抛出StopAsyncIteration异常,该异常会通过PyAsyncGenASend.send()PyAsyncGenASend.throw()方法传播。

PyAsyncGenAThrowPyAsyncGenASend非常相似。唯一的区别是,当第一次调用PyAsyncGenAThrow.send()时,它会将异常抛到父agen对象中(而不是将值推送到其中)。

新的标准库函数和类型

  1. types.AsyncGeneratorType – 异步生成器对象的类型。
  2. sys.set_asyncgen_hooks()sys.get_asyncgen_hooks()方法,用于在事件循环中设置异步生成器的终结器和迭代拦截器。
  3. inspect.isasyncgen()inspect.isasyncgenfunction()内省函数。
  4. asyncio 事件循环的新方法:loop.shutdown_asyncgens()
  5. 新的collections.abc.AsyncGenerator抽象基类。

向后兼容性

该提案完全向后兼容。

在 Python 3.5 中,如果在async def函数内部使用yield表达式,则会产生SyntaxError,因此在 3.6 中引入异步生成器是安全的。

性能

常规生成器

常规生成器的性能没有下降。以下微基准测试在带有和不带有异步生成器的 CPython 上运行速度相同。

def gen():
    i = 0
    while i < 100000000:
        yield i
        i += 1

list(gen())

相较于异步迭代器的改进

以下微基准测试表明,异步生成器比用纯 Python 实现的异步迭代器快约2.3 倍

N = 10 ** 7

async def agen():
    for i in range(N):
        yield i

class AIter:
    def __init__(self):
        self.i = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        if i >= N:
            raise StopAsyncIteration
        self.i += 1
        return i

设计考量

aiter()anext() 内置函数

最初,PEP 492__aiter__定义为应该返回一个可等待对象的方法,从而产生一个异步迭代器。

但是,在 CPython 3.5.2 中,__aiter__被重新定义为直接返回异步迭代器。为了避免破坏向后兼容性,决定 Python 3.6 将支持这两种方式:__aiter__仍然可以返回一个可等待对象,同时发出DeprecationWarning警告。

由于 Python 3.6 中__aiter__的这种双重特性,我们无法添加aiter()内置函数的同步实现。因此,建议等到 Python 3.7。

异步列表/字典/集合推导式

异步推导式的语法与异步生成器机制无关,应该在单独的 PEP 中进行考虑。

异步 yield from

虽然从理论上讲,可以为异步生成器实现yield from支持,但这需要对生成器实现进行重大重新设计。

yield from对于异步生成器来说也并不那么重要,因为不需要提供在协程之上实现另一个协程协议的机制。并且,要组合异步生成器,可以使用简单的async for循环。

async def g1():
    yield 1
    yield 2

async def g2():
    async for v in g1():
        yield v

为什么 asend()athrow() 方法是必要的

它们使得可以使用异步生成器实现类似于contextlib.contextmanager的概念。例如,使用建议的设计,可以实现以下模式。

@async_context_manager
async def ctx():
    await open()
    try:
        yield
    finally:
        await close()

async with ctx():
    await ...

另一个原因是,可以使用__anext__对象返回的对象将数据和异常推送到异步生成器中,但这很难正确地做到。添加显式的asend()athrow()将为安全地实现这一点铺平道路。

在实现方面,asend()__anext__的稍微更通用的版本,而athrow()aclose()非常相似。因此,为异步生成器定义这些方法不会增加任何额外的复杂性。

示例

使用当前参考实现的工作示例(将以一秒的延迟打印 0 到 9 的数字)。

async def ticker(delay, to):
    for i in range(to):
        yield i
        await asyncio.sleep(delay)


async def run():
    async for i in ticker(1, 10):
        print(i)


import asyncio
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(run())
finally:
    loop.close()

接受

PEP 525 于 2016 年 9 月 6 日被 Guido 接受 [2]

实现

该实现在问题 28003 中跟踪 [3]。参考实现的 git 存储库可在 [1] 中获取。

参考文献

致谢

感谢 Guido van Rossum、Victor Stinner、Elvis Pranskevichus、Nathaniel Smith、Łukasz Langa、Andrew Svetlov 和许多其他人提供的反馈、代码审查和围绕此 PEP 的讨论。


来源:https://github.com/python/peps/blob/main/peps/pep-0525.rst

上次修改:2023-09-09 17:39:29 GMT