PEP 789 – 通过限制异步生成器中的 yield 来防止任务取消错误
- 作者:
- Zac Hatfield-Dodds <zac at zhd.dev>, Nathaniel J. Smith <njs at pobox.com>
- PEP 代理人:
- 讨论至:
- Discourse 帖子
- 状态:
- 草案
- 类型:
- 标准跟踪
- 创建日期:
- 2024年5月14日
- Python 版本:
- 3.14
摘要
结构化并发在 Python 中越来越流行。诸如 asyncio.TaskGroup 和 asyncio.timeout 上下文管理器之类的接口支持组合推理,并允许开发人员清晰地界定并发任务的生命周期。然而,使用 yield 在此类上下文中暂停帧会导致任务被错误取消、超时被忽略以及异常处理不当的情况。更根本的是,在 TaskGroup 中暂停帧违反了结构化并发的设计原则,即子任务封装在其父帧中。
为了解决这些问题,本 PEP 提出了一个新的 sys.prevent_yields() 上下文管理器。当在语法上处于此上下文中时,尝试 yield 将引发 RuntimeError,从而阻止任务 yield。此外,将提供一种机制,使得像 @contextmanager 这样的装饰器能够允许在被装饰的函数内部进行 yield。sys.prevent_yields() 将被 asyncio 和下游库用于实现任务组、超时和取消;contextlib 等将使用相关机制将生成器转换为允许安全 yield 的上下文管理器。
背景
结构化并发在 Python 中越来越流行,其形式是更新的 asyncio 接口以及 Trio 和 anyio 等第三方库。只要用户从不在取消作用域内编写暂停帧的 yield,这些接口就支持组合推理。
取消作用域是一个上下文管理器,它可以…取消…在该上下文(…作用域)内发生的任何工作。在 asyncio 中,这隐含在 with asyncio.timeout(): 或 async with asyncio.TaskGroup() as tg: 的设计中,它们分别在指定持续时间后取消包含的工作,或在一个子任务引发异常时取消兄弟任务。取消作用域的核心功能是同步的,但面向用户的上下文管理器可以是同步或异步的。[1] [2]
这种结构化方法运作良好,除非你遇到一个特定的尖锐边缘:在取消作用域内使用 yield 打破嵌套结构。这对结构化控制流的影响与添加几个跨函数 goto 语句的效果大致相同,而且后果确实非常严重。
- 任务可能被错误取消,无论是由于超时、兄弟任务中的错误,还是取消其他任务的显式请求
- 异常,包括
CancelledError,可能会传递给错误的任务 - 异常可能会完全丢失,被丢弃而不是添加到
ExceptionGroup中
问题陈述
这里是根本问题:yield 会暂停调用帧。只有在叶子帧中 yield 才合理——也就是说,如果你的调用栈是 A -> B -> C,那么你可以暂停 C,但你不能在 C 仍在运行时暂停 B。
但是,TaskGroup 是一种“并发调用”原语,其中单个帧可以有多个并发运行的子帧。这意味着,如果我们允许人们混合使用 yield 和 TaskGroup,那么我们最终会遇到这种情况,即 B 被暂停而 C 正在活动运行。这是没有意义的,并会导致严重的实际问题(例如,如果 C 引发异常而 A 已经返回,我们无法传播它)。
这是生成器控制流和结构化并发控制流之间根本不兼容的问题,我们无法通过调整 API 来解决。唯一的解决方案似乎是禁止在 TaskGroup 内部进行 yield。
尽管超时不会让子任务继续运行,但密切的类比和相关问题使我们得出结论,yield 应该在所有取消作用域内被禁止,而不仅仅是 TaskGroups。有关讨论,请参阅 我们不能直接将异常传递到正确的位置吗?。
激励示例
让我们考虑三个示例,看看这在实践中可能是什么样子。
将超时泄漏到外部作用域
假设我们想要迭代一个异步迭代器,但每个元素最多等待 max_time 秒。我们很自然地会将实现逻辑封装在异步生成器中,这样调用站点就可以继续使用简单的 async for 循环
async def iter_with_timeout(ait, max_time):
try:
while True:
with timeout(max_time):
yield await anext(ait)
except StopAsyncIteration:
return
async def fn():
async for elem in iter_with_timeout(ait, max_time=1.0):
await do_something_with(elem)
不幸的是,这个版本有一个 bug:超时可能在生成器 yield 之后但在恢复之前过期!在这种情况下,我们会在外部任务中看到一个 CancelledError 被引发,而 with timeout(max_time): 语句无法捕获它。
修复方法很简单:在超时上下文中获取下一个元素,然后在该上下文之外 yield。
async def correct_iter_with_timeout(ait, max_time):
try:
while True:
with timeout(max_time):
tmp = await anext(ait)
yield tmp
except StopAsyncIteration:
return
泄漏后台任务(破坏取消和异常处理)
超时并不是唯一封装取消作用域的接口——如果你需要一些后台工作任务,你不能简单地在 yield 之前关闭 TaskGroup。
举个例子,让我们看看一个扇入生成器,我们将用它来合并来自多个“传感器”的数据流。我们还将为模拟传感器设置一个小缓冲区,以便在控制流位于 combined_iterators 生成器之外时,在后台任务中引发错误。
import asyncio, itertools
async def mock_sensor(name):
for n in itertools.count():
await asyncio.sleep(0.1)
if n == 1 and name == "b": # 'presence detection'
yield "PRESENT"
elif n == 3 and name == "a": # inject a simple bug
print("oops, raising RuntimeError")
raise RuntimeError
else:
yield f"{name}-{n}" # non-presence sensor data
async def move_elements_to_queue(ait, queue):
async for obj in ait:
await queue.put(obj)
async def combined_iterators(*aits):
"""Combine async iterators by starting N tasks, each of
which move elements from one iterable to a shared queue."""
q = asyncio.Queue(maxsize=2)
async with asyncio.TaskGroup() as tg:
for ait in aits:
tg.create_task(move_elements_to_queue(ait, q))
while True:
yield await q.get()
async def turn_on_lights_when_someone_gets_home():
combined = combined_iterators(mock_sensor("a"), mock_sensor("b"))
async for event in combined:
print(event)
if event == "PRESENT":
break
print("main task sleeping for a bit")
await asyncio.sleep(1) # do some other operation
asyncio.run(turn_on_lights_when_someone_gets_home())
当我们运行这段代码时,我们看到了预期的观测序列,然后是“检测”,然后在主任务休眠时,我们在后台触发了 RuntimeError。但是……我们实际上没有观察到 RuntimeError,甚至不是作为另一个异常的 __context__!
>> python3.11 demo.py
a-0
b-0
a-1
PRESENT
main task sleeping for a bit
oops, raising RuntimeError
Traceback (most recent call last):
File "demo.py", line 39, in <module>
asyncio.run(turn_on_lights_when_someone_gets_home())
...
File "demo.py", line 37, in turn_on_lights_when_someone_gets_home
await asyncio.sleep(1) # do some other operation
File ".../python3.11/asyncio/tasks.py", line 649, in sleep
return await future
asyncio.exceptions.CancelledError
这里,问题再次在于我们在取消作用域内进行了 yield;这次的作用域是 TaskGroup 在其中一个子任务引发异常时用于取消兄弟任务的作用域。然而,原本打算用于兄弟任务的 CancelledError 却被注入到**外部**任务中,因此我们没有机会创建并引发 ExceptionGroup(..., [RuntimeError()])。
要解决这个问题,我们需要将异步生成器转换为异步上下文管理器,它会 yield 一个异步可迭代对象——在这种情况下是一个包装队列的生成器;将来也许是队列本身
async def queue_as_aiterable(queue):
# async generators that don't `yield` inside a cancel scope are fine!
while True:
try:
yield await queue.get()
except asyncio.QueueShutDown:
return
@asynccontextmanager # yield-in-cancel-scope is OK in a context manager
async def combined_iterators(*aits):
q = asyncio.Queue(maxsize=2)
async with asyncio.TaskGroup() as tg:
for ait in aits:
tg.create_task(move_elements_to_queue(ait, q))
yield queue_as_aiterable(q)
async def turn_on_lights_when_someone_gets_home():
...
async with combined_iterators(...) as ait:
async for event in ait:
...
在用户定义的上下文管理器中
在取消作用域内 yield 是安全的,当且仅当你使用生成器来实现上下文管理器时[3]——在这种情况下,任何传播的异常都将被重定向到预期的任务。
我们还在 flake8-async 中实现了 ASYNC101 linter 规则,该规则会警告在已知取消作用域中进行 yield。用户教育是否足以避免这些问题?不幸的是,不能:用户定义的上下文管理器也可以封装取消作用域,并且识别或 lint 所有此类情况是不可行的。
这在实践中经常出现,因为“在此上下文期间运行一些后台任务”是结构化并发中非常常见的模式。我们在上面的 combined_iterators() 中看到了这一点;并且在websocket协议的多个实现中都看到了这个错误
async def get_messages(websocket_url):
# The websocket protocol requires background tasks to manage the socket heartbeat
async with open_websocket(websocket_url) as ws: # contains a TaskGroup!
while True:
yield await ws.get_message()
async with open_websocket(websocket_url) as ws:
async for message in get_messages(ws):
...
规范
为了防止这些问题,我们建议
- 一个新的上下文管理器,
with sys.prevent_yields(reason): ...,如果你在其中尝试 yield,它将引发 RuntimeError。[4] asyncio 和下游代码中的类似取消作用域的上下文管理器可以包装它,以防止在**它们的** with 块内部进行 yield。 - 一种机制,通过该机制,生成器到上下文管理器的装饰器可以允许一次调用中的 yield。我们还不确定这应该是什么样子;主要的候选方案是
- 一个代码对象属性,
fn.__code__.co_allow_yields = True,或者 - 某种调用标志,例如
fn.__invoke_with_yields__,以避免修改可能在装饰函数和未装饰函数之间共享的代码对象
- 一个代码对象属性,
实现 - 跟踪帧
新的 sys.prevent_yields 上下文管理器将需要解释器支持。对于每个帧,我们跟踪此上下文管理器的进入和退出。
我们对确切的表示形式没有特别的偏好;我们将以堆栈的形式讨论它(这将支持清晰的错误消息),但更紧凑的表示形式(例如整数对)也适用。
- 当进入一个新创建或恢复的帧时,初始化空的进入和退出堆栈。
- 当从一个帧返回时,将这些堆栈合并到父帧的堆栈中。
- 当 yield 时
- 如果
entries != [] and not frame.allow_yield_flag,则引发RuntimeError而不是 yield(本 PEP 提出的新行为) - 否则,将堆栈合并到父帧中,如同返回一样。
- 如果
因为这涉及到任务_内_的帧的 yield,而不是任务之间的切换,所以语法上的 yield 和 yield from 应该受到影响,但 await 表达式不应该受到影响。
我们可以通过为所有非生成器堆栈帧在每个线程中存储一个堆栈来减少开销。
工作示例
无 yield 示例
在此示例中,我们看到多轮堆栈合并,因为我们从 sys.prevent_yields 解开,通过用户定义的 ContextManager,回到原始帧。为简洁起见,未显示阻止 yield 的原因;它是“1 enter”状态的一部分。
如果没有 yield,我们不会引发任何错误,并且因为进入和退出的数量平衡,帧像往常一样返回,无需进一步跟踪。
尝试 yield 示例
在此示例中,Frame 尝试在 sys.prevent_yields 上下文内部进行 yield。这被解释器检测到,解释器会引发 RuntimeError 而不是暂停帧。
允许 yield 示例
在这个例子中,一个装饰器将 Frame 标记为允许 yield。这可能是 @contextlib.contextmanager 或相关的装饰器。
当帧被允许 yield 时,进入/退出堆栈在暂停之前合并到父帧的堆栈中。当帧恢复时,其堆栈为空。最后,当帧退出时,退出合并到父帧的堆栈中,从而重新平衡它。
这确保了父帧正确继承任何剩余的 sys.prevent_yields 状态,同时允许帧安全地暂停和恢复。
允许上下文管理器 yield
TODO: 本节为占位符,待决定关于 `@contextmanager` 在包装函数中重新启用 yield 的机制。
- 解释并展示
@asynccontextmanager如何设置标志的代码示例
请注意,@pytest.fixture 等第三方装饰器表明我们不能仅仅让解释器特殊处理 contextlib。
如果 sys.prevent_yields 被滥用时的行为
尽管不建议这样做,但有可能以不对应任何有效嵌套的顺序调用 sys.prevent_yields.__enter__ 和 .__exit__,或者以其他方式获取无效的帧状态。
sys.prevent_yields.__exit__ 检测无效状态有两种方式。首先,如果未阻止 yield,我们可以简单地引发异常而不改变状态。其次,如果堆栈顶部存在意外的入口,我们建议弹出该入口并引发异常——这确保了无序调用仍会清除堆栈,同时仍然明确表示存在问题。
(如果我们选择例如基于整数而不是基于堆栈的表示,则此类状态可能根本无法与正确嵌套区分开来,在这种情况下问题就不会出现)
预期用途
在标准库中,sys.prevent_yields 可以被 asyncio.TaskGroup、asyncio.timeout 和 asyncio.timeout_at 使用。在下游,我们期望在 trio.CancelScope、异步 fixture(在 pytest-trio、anyio 等中)以及其他地方使用它。
我们认为与异步正确性无关的用例,例如防止 decimal.localcontext 从生成器中泄漏,不在本 PEP 的讨论范围之内。
生成器到上下文管理器的支持将由 @contextlib.(async)contextmanager 使用,并且在必要时由 (Async)ExitStack 使用。
向后兼容性
添加 sys.prevent_yields 上下文管理器、对 @contextlib.(async)contextmanager 的更改以及相应的解释器支持都完全向后兼容。
阻止在 asyncio.TaskGroup、asycio.timeout 和 asyncio.timeout_at 内部进行 yield 将是对野外至少一些代码的破坏性更改,这些代码(尽管不安全且容易出现上述激励问题)可能足够频繁地工作以投入生产。
我们将寻求社区对标准库代码的适当弃用途径的反馈,包括任何弃用期的建议长度。作为初步建议,我们可以在 3.14 中仅在 asyncio 调试模式下暂停标准库上下文时发出 DeprecationWarning;然后过渡到在 3.15 中默认警告并在调试模式下错误;最后在 3.16 中出现硬性错误。
无论标准库如何使用,下游框架都将立即采用此功能。
这个 bug 有多普遍?
我们没有确凿的数据,但相信许多项目在野外受到影响。自从在同一周内工作中遇到一个中等和关键的 bug,归因于暂停取消作用域后,我们已成功使用静态分析。Zac 在 PyCon 上与三个人交谈,他们都识别出症状并得出结论,他们可能受到了影响。
TODO: 在生态系统项目(例如 aio-libs 包)上运行 ASYNC101 lint 规则,并了解其在广泛使用的 PyPI 包中的频率?这将有助于为标准库代码的破坏/弃用途径提供信息。
如何教授此内容
异步生成器很少教授给初级程序员。
大多数中高级 Python 程序员只会作为 TaskGroup、timeout 和 @contextmanager 的用户与本 PEP 交互。对于这个群体,我们预计清晰的异常消息和文档就足够了。
- 将在使用 asyncio 进行开发页面中添加一个新部分,简要说明异步生成器在“取消作用域”上下文内(即
TaskGroup或timeout上下文管理器)不允许yield。我们预计问题重述和部分动机部分将为这些文档提供基础。 - 对于每个包装取消作用域(因此现在是
sys.prevent_yields)的上下文管理器的文档,应包含一个标准句子,例如“如果在异步生成器中使用,[在此上下文管理器内部yield是一个错误]”,并附带指向上述解释的超链接。
对于 asyncio、Trio、curio 或其他实现取消作用域语义的框架维护者,我们将确保 sys.prevent_yields 的文档提供从本 PEP 的解决方案和实现部分提炼出的完整解释。我们预计会咨询大多数此类维护者,以获取他们对 PEP 草案的反馈。
被拒绝的替代方案
PEP 533,迭代器的确定性清理
PEP 533 提议向迭代器协议添加 __[a]iterclose__,本质上是在每个(异步)for 循环周围包装一个 with [a]closing(ait)。虽然这对于确保迭代器所持资源的及时和确定性清理(它旨在解决的问题)非常有用,但它并未完全解决促成本 PEP 的问题。
即使有了 PEP 533,错误的取消仍然会传递给错误的任务,并在迭代器关闭之前造成严重破坏。此外,它没有解决 TaskGroup 的基本结构化并发问题,即暂停拥有 TaskGroup 的帧与子任务完全封装在其父帧中的模型不兼容。
完全弃用异步生成器
在2024年语言峰会上,一些与会者建议_完全_弃用异步生成器。不幸的是,虽然实际中常见的用例都使用异步生成器,但 Trio 代码可能会使用标准生成器触发相同的问题
# We use Trio for this example, because while `asyncio.timeout()` is async,
# Trio's CancelScope type and timeout context managers are synchronous.
import trio
def abandon_each_iteration_after(max_seconds):
# This is of course broken, but I can imagine someone trying it...
while True:
with trio.move_on_after(max_seconds):
yield
@trio.run
async def main():
for _ in abandon_each_iteration_after(max_seconds=1):
await trio.sleep(3)
如果不是因为这个 bug,这段代码看起来会非常地道——但在大约一秒钟后,它没有进入下一次迭代,而是引发了
Traceback (most recent call last):
File "demo.py", line 10, in <module>
async def main():
File "trio/_core/_run.py", line 2297, in run
raise runner.main_task_outcome.error
File "demo.py", line 12, in main
await trio.sleep(3)
File "trio/_timeouts.py", line 87, in sleep
await sleep_until(trio.current_time() + seconds)
...
File "trio/_core/_run.py", line 1450, in raise_cancel
raise Cancelled._create()
trio.Cancelled: Cancelled
此外,还有一些与取消作用域无关的同步上下文管理器也存在相关问题,例如前面提到的 decimal.localcontext。虽然修复下面的示例不是本 PEP 的目标,但它表明 yield-within-with 问题并非异步生成器所独有
import decimal
def why_would_you_do_this():
with decimal.localcontext(decimal.Context(prec=1)):
yield
one = decimal.Decimal(1)
print(one / 3) # 0.3333333333333333333333333333
next(gen := why_would_you_do_this())
print(one / 3) # 0.3
虽然我在没有异步生成器的异步 Python 中有很好的经验[5],但我更愿意解决问题而不是将它们从语言中移除。
我们不能直接将异常传递到正确的位置吗?
如果我们实现了 PEP 568(上下文变量的生成器敏感性;另见 PEP 550),那么就可以处理超时异常:事件循环可以避免触发 CancelledError,直到包含上下文管理器的生成器帧在堆栈上——无论是生成器恢复时,还是它最终完成时。
这可能需要任意长的时间;即使我们实现了 PEP 533 以确保在退出(异步)for 循环时及时清理,仍然可以通过 next/send 手动驱动生成器。
然而,这并没有解决 TaskGroup 的另一个问题。生成器的模型是你将一个栈帧置于暂停状态,然后可以将其视为一个惰性值,可以存储、移动,甚至在任意位置丢弃或复活。结构化并发的模型是你的栈变成一棵树,子任务封装在某个父帧中。它们以不同且不幸地不兼容的方向扩展了基本的结构化编程模型。
例如,假设暂停一个包含开放 TaskGroup 的帧也会暂停所有子任务。这将保留“向下”的结构化并发,因为子任务仍然被封装——尽管代价是使我们的两个激励示例以及许多实际代码陷入死锁。然而,仍然有可能在不同的任务中恢复生成器,从而违反结构化并发的“向上”不变性。
我们认为不值得添加如此多的机制来处理取消作用域,同时仍然让任务组处于损坏状态。
替代实现 - 检查字节码
Jelle Zijlstra 提出了另一种方案,其中 sys.prevent_yields 检查调用者的字节码,直到确信在调用指令指针和下一个上下文退出之间没有 yield。我们预计可以相当容易地添加对语法嵌套上下文管理器的支持。
然而,目前尚不清楚当用户定义的上下文管理器包装 sys.prevent_yields 时,这将如何工作。更糟糕的是,这种方法忽略了对 __enter__() 和 __exit__() 的显式调用,这意味着上下文管理协议会因是否使用 with 语句而异。
“只在使用时付费”的性能成本非常有吸引力。然而,检查帧对象对于核心控制流构造来说开销太大,并且会导致通过去优化而导致的整个程序减速。另一方面,为提高性能而添加解释器支持会导致与我们上面首选解决方案相同的“无论如何都付费”语义。
脚注
版权
本文档置于公共领域或 CC0-1.0-Universal 许可证下,以更宽松者为准。
来源:https://github.com/python/peps/blob/main/peps/pep-0789.rst
最后修改时间:2024-06-04 01:45:13 GMT