PEP 789 – 通过限制异步生成器中的 yield 来防止任务取消错误
- 作者:
- Zac Hatfield-Dodds <zac at zhd.dev>, Nathaniel J. Smith <njs at pobox.com>
- PEP 代表:
- 讨论邮件列表:
- Discourse 讨论主题
- 状态:
- 草稿
- 类型:
- 标准跟踪
- 创建日期:
- 2024年5月14日
- Python 版本:
- 3.14
摘要
结构化并发在 Python 中越来越受欢迎。诸如 asyncio.TaskGroup
和 asyncio.timeout
上下文管理器之类的接口支持组合推理,并允许开发人员清晰地限定并发任务的生命周期。但是,在这样的上下文中使用 yield
来挂起框架会导致错误的任务被取消、超时被忽略以及异常被错误处理的情况。更根本的是,在 TaskGroup
内挂起框架违反了结构化并发设计原则,即子任务封装在其父框架中。
为了解决这些问题,本 PEP 提出了一种新的 sys.prevent_yields()
上下文管理器。当语法上位于此上下文中时,尝试 yield
将引发 RuntimeError,从而阻止任务产生值。此外,将为诸如 @contextmanager
之类的装饰器提供一种机制,以允许在装饰函数内部使用 yield。 sys.prevent_yields()
将由 asyncio 和下游库用于实现任务组、超时和取消;而 contextlib
等将使用相关机制将生成器转换为允许安全 yield 的上下文管理器。
背景
结构化并发在 Python 中越来越受欢迎,以更新的 asyncio
接口和第三方库(如 Trio 和 anyio)的形式出现。这些接口支持组合推理,*只要*用户不编写在取消作用域内挂起框架的 yield
。
取消作用域是一种上下文管理器,它可以……取消……该上下文中发生的所有工作(……作用域)。在 asyncio 中,这在 with asyncio.timeout():
或 async with asyncio.TaskGroup() as tg:
的设计中是隐式的,它们分别在指定持续时间后取消包含的工作,或者当其中一个任务引发异常时取消同级任务。取消作用域的核心功能是同步的,但面向用户的上下文管理器可以是同步的或异步的。[1] [2]
这种结构化方法运行良好,除非您遇到一个特定的尖锐边缘:通过在取消作用域内 yield
来破坏嵌套结构。这与添加一些跨函数的 goto
对结构化控制流的影响非常相似,其影响确实非常糟糕。
- 错误的任务可能会被取消,无论是由于超时、同级任务中的错误还是显式请求取消某些其他任务。
- 异常(包括
CancelledError
)可能会传递到错误的任务。 - 异常可能完全丢失,被丢弃而不是添加到
ExceptionGroup
中。
问题陈述
这是根本问题:yield 会挂起调用帧。只有在叶子帧中 yield 才有意义 - 即,如果您的调用栈类似于 A -> B -> C,那么您可以挂起 C,但您不能在 C 仍在运行时挂起 B。
但是,TaskGroup 是一种“并发调用”原语,其中单个帧可以有多个并发运行的子帧。这意味着如果我们允许人们混合使用 yield 和 TaskGroup,那么我们最终可能会处于这种情况,即 B 被挂起但 C 仍在积极运行。这是没有意义的,并且会导致严重的实际问题(例如,如果 C 引发异常并且 A 已返回,我们无法传播它)。
这是生成器控制流和结构化并发控制流之间根本的不兼容性,而不是我们可以通过调整 API 来解决的问题。唯一看起来可行的解决方案是在 TaskGroup 内禁止使用 yield。
尽管超时不会让子任务继续运行,但其密切的类比和相关问题使我们得出结论,yield 应该在所有取消作用域内都被禁止,而不仅仅是 TaskGroup。请参阅 我们不能只将异常传递到正确的位置吗? 以获取讨论。
激励示例
让我们考虑三个示例,看看这在实践中可能是什么样子。
将超时泄漏到外部作用域
假设我们想要迭代一个异步迭代器,但每个元素最多等待 max_time
秒。我们可能会自然地将执行此操作的逻辑封装在异步生成器中,以便调用站点可以继续使用简单的 async for
循环。
async def iter_with_timeout(ait, max_time):
try:
while True:
with timeout(max_time):
yield await anext(ait)
except StopAsyncIteration:
return
async def fn():
async for elem in iter_with_timeout(ait, max_time=1.0):
await do_something_with(elem)
不幸的是,此版本中存在一个错误:超时可能会在生成器产生值后但恢复之前到期!在这种情况下,我们会看到在外部任务中引发了 CancelledError
,而该错误无法被 with timeout(max_time):
语句捕获。
修复方法非常简单:在超时上下文中获取下一个元素,然后在该上下文之外产生值。
async def correct_iter_with_timeout(ait, max_time):
try:
while True:
with timeout(max_time):
tmp = await anext(ait)
yield tmp
except StopAsyncIteration:
return
泄漏后台任务(破坏取消和异常处理)
超时不是唯一包装取消作用域的接口 - 并且如果您需要一些后台工作任务,则不能在产生值之前简单地关闭 TaskGroup
。
例如,让我们看一下扇入生成器,我们将使用它来合并来自多个“传感器”的馈送。我们还将使用一个小缓冲区设置模拟传感器,以便在控制流位于 combined_iterators
生成器之外时,在后台任务中引发错误。
import asyncio, itertools
async def mock_sensor(name):
for n in itertools.count():
await asyncio.sleep(0.1)
if n == 1 and name == "b": # 'presence detection'
yield "PRESENT"
elif n == 3 and name == "a": # inject a simple bug
print("oops, raising RuntimeError")
raise RuntimeError
else:
yield f"{name}-{n}" # non-presence sensor data
async def move_elements_to_queue(ait, queue):
async for obj in ait:
await queue.put(obj)
async def combined_iterators(*aits):
"""Combine async iterators by starting N tasks, each of
which move elements from one iterable to a shared queue."""
q = asyncio.Queue(maxsize=2)
async with asyncio.TaskGroup() as tg:
for ait in aits:
tg.create_task(move_elements_to_queue(ait, q))
while True:
yield await q.get()
async def turn_on_lights_when_someone_gets_home():
combined = combined_iterators(mock_sensor("a"), mock_sensor("b"))
async for event in combined:
print(event)
if event == "PRESENT":
break
print("main task sleeping for a bit")
await asyncio.sleep(1) # do some other operation
asyncio.run(turn_on_lights_when_someone_gets_home())
当我们运行此代码时,我们会看到预期的观察序列,然后是“检测”,然后当主任务处于睡眠状态时,我们在后台触发该 RuntimeError
。但是……我们实际上并没有观察到 RuntimeError
,甚至没有作为另一个异常的 __context__
!
>> python3.11 demo.py
a-0
b-0
a-1
PRESENT
main task sleeping for a bit
oops, raising RuntimeError
Traceback (most recent call last):
File "demo.py", line 39, in <module>
asyncio.run(turn_on_lights_when_someone_gets_home())
...
File "demo.py", line 37, in turn_on_lights_when_someone_gets_home
await asyncio.sleep(1) # do some other operation
File ".../python3.11/asyncio/tasks.py", line 649, in sleep
return await future
asyncio.exceptions.CancelledError
这里,问题同样在于我们在取消作用域内 yield
了;这次是 TaskGroup
用于在其中一个子任务引发异常时取消同级任务的作用域。但是,原本打算用于同级任务的 CancelledError
却被注入到外部任务中,因此我们从未有机会创建和引发 ExceptionGroup(..., [RuntimeError()])
。
为了解决这个问题,我们需要将我们的异步生成器转换为异步上下文管理器,它会产生一个异步可迭代对象 - 在这种情况下,是一个包装队列的生成器;将来 也许是队列本身
async def queue_as_aiterable(queue):
# async generators that don't `yield` inside a cancel scope are fine!
while True:
try:
yield await queue.get()
except asyncio.QueueShutDown:
return
@asynccontextmanager # yield-in-cancel-scope is OK in a context manager
async def combined_iterators(*aits):
q = asyncio.Queue(maxsize=2)
async with asyncio.TaskGroup() as tg:
for ait in aits:
tg.create_task(move_elements_to_queue(ait, q))
yield queue_as_aiterable(q)
async def turn_on_lights_when_someone_gets_home():
...
async with combined_iterators(...) as ait:
async for event in ait:
...
在用户定义的上下文管理器中
在取消作用域内使用 yield 可以是安全的,当且仅当您使用生成器来实现上下文管理器 [3] - 在这种情况下,任何传播的异常都将重定向到预期的任务。
我们还在 flake8-async 中实现了 ASYNC101
lint 规则,该规则会警告在已知的取消作用域中使用 yield。用户教育是否足以避免这些问题?不幸的是,不是:用户定义的上下文管理器也可以包装取消作用域,并且识别或 lint 所有此类情况是不可行的。
这在实践中经常出现,因为“在此上下文的持续时间内运行一些后台任务”是在结构化并发中非常常见的模式。我们在上面的 combined_iterators()
中看到了这一点;并且在 Websocket 协议的多个实现中也看到了此错误。
async def get_messages(websocket_url):
# The websocket protocol requires background tasks to manage the socket heartbeat
async with open_websocket(websocket_url) as ws: # contains a TaskGroup!
while True:
yield await ws.get_message()
async with open_websocket(websocket_url) as ws:
async for message in get_messages(ws):
...
规范
为了防止这些问题,我们建议
- 一个新的上下文管理器,
with sys.prevent_yields(reason): ...
,如果在其中尝试使用 yield,它将引发 RuntimeError。[4] asyncio 和下游代码中的类似取消作用域的上下文管理器可以包装它以防止在其with
块内使用 yield。 - 一种机制,通过该机制,生成器到上下文管理器的装饰器可以在一次调用中允许使用 yield。我们还不确定这应该是什么样子;领先的候选者是
- 代码对象属性,
fn.__code__.co_allow_yields = True
,或者 - 某种调用标志,例如
fn.__invoke_with_yields__
,以避免修改可能在装饰和未装饰函数之间共享的代码对象。
- 代码对象属性,
实现 - 跟踪帧
新的 sys.prevent_yields
上下文管理器需要解释器支持。对于每个帧,我们都跟踪此上下文管理器的进入和退出。
我们并没有特别执着于确切的表示方式;我们将把它讨论成一个栈(这将支持清晰的错误消息),但更紧凑的表示方式(例如一对整数)也可以。
- 当进入新创建或恢复的帧时,初始化空的入口和出口栈。
- 从帧返回时,将这些栈合并到父帧的栈中。
- 当产生值时
- 如果
entries != [] and not frame.allow_yield_flag
,则引发RuntimeError
而不是产生值(此 PEP 提出的新行为) - 否则,将栈合并到父帧中,就像返回一样。
- 如果
由于这是关于在任务**内部**产出帧,而不是在任务之间切换,因此语法上的yield
和yield from
应该受到影响,但await
表达式不应该受到影响。
我们可以通过为所有不是生成器的栈帧,在每个线程中存储此元数据到一个栈中来减少开销。
工作示例
无 yield 示例
在这个例子中,我们看到了在从sys.prevent_yields
展开,经过用户定义的上下文管理器,回到原始帧的过程中,栈合并的多个轮次。为简洁起见,阻止产出的原因没有显示;它是“1 enter”状态的一部分。

如果没有yield
,我们不会引发任何错误,并且由于进入和退出次数平衡,帧像往常一样返回,无需进一步跟踪。
尝试 yield 示例
在这个例子中,帧试图在sys.prevent_yields
上下文中yield
。解释器检测到这一点,并引发RuntimeError
而不是挂起帧。

允许 yield 示例
在这个例子中,装饰器已将帧标记为允许产出。这可能是@contextlib.contextmanager
或相关的装饰器。

当帧被允许产出时,进入/退出栈在挂起之前合并到父帧的栈中。当帧恢复时,其栈为空。最后,当帧退出时,退出被合并到父帧的栈中,重新平衡它。
这确保了父帧正确地继承了任何剩余的sys.prevent_yields
状态,同时允许帧安全地挂起和恢复。
允许上下文管理器使用 yield
待办事项:此部分是一个占位符,等待关于``@contextmanager``在包装函数中重新启用产出机制的决定。
- 解释并展示一个关于
@asynccontextmanager
如何设置标志的代码示例。
请注意,诸如@pytest.fixture
之类的第三方装饰器表明我们不能仅仅让解释器对contextlib进行特殊处理。
如果 sys.prevent_yields
被误用,会发生什么
虽然不建议这样做,但可以按不对应于任何有效嵌套的顺序调用sys.prevent_yields.__enter__
和.__exit__
,或者以其他方式获得无效的帧状态。
sys.prevent_yields.__exit__
可以通过两种方式检测无效状态。首先,如果未阻止产出,我们可以简单地引发异常而无需更改状态。其次,如果栈顶有一个意外的进入,我们建议弹出该进入并引发异常——这确保了乱序调用仍然会清除栈,同时仍然清楚地表明某些地方出了问题。
(如果我们选择例如基于整数而不是基于栈的表示,则此类状态可能根本无法与正确的嵌套区分开来,在这种情况下,这个问题就不会出现)
预期用途
在标准库中,sys.prevent_yields
可以被asyncio.TaskGroup
、asyncio.timeout
和asyncio.timeout_at
使用。在下游,我们期望在trio.CancelScope
、异步fixture(在pytest-trio、anyio等中)以及可能的其他地方使用它。
我们认为与异步正确性无关的用例,例如阻止decimal.localcontext
从生成器中泄漏,超出了本PEP的范围。
生成器到上下文管理器的支持将由@contextlib.(async)contextmanager
使用,如果需要,将在(Async)ExitStack
中使用。
向后兼容性
添加sys.prevent_yields
上下文管理器、对@contextlib.(async)contextmanager
的更改以及相应的解释器支持都是完全向后兼容的。
阻止asyncio.TaskGroup
、asycio.timeout
和asyncio.timeout_at
内部的产出将是对至少一些实际代码的重大更改,这些代码(无论多么不安全且容易出现上述动机问题)可能经常有效,以至于被用于生产环境。
我们将寻求社区对标准库代码适当弃用路径的反馈,包括任何弃用期的建议长度。作为一个初步建议,我们可以在3.14中仅在asyncio调试模式下使挂起stdlib上下文发出DeprecationWarning;然后在3.15中过渡到默认警告并在调试模式下报错;最后在3.16中报错。
无论stdlib的使用情况如何,下游框架都将立即采用此功能。
此错误有多普遍?
我们在这里没有确切的数字,但相信许多项目都受到了影响。自从在工作中同一周遇到一个中等严重和一个严重错误,这些错误归因于挂起取消范围后,我们已经使用静态分析取得了一些成功。Zac在PyCon上与三位人士交谈,他们认识到了这些症状,并得出结论,他们可能也受到了影响。
待办事项:在生态系统项目(例如aio-libs软件包)中运行ASYNC101 lint规则,并了解广泛使用的PyPI软件包中的频率?这将有助于为stdlib代码提供中断/弃用路径。
如何教授这个
异步生成器很少被教授给新手程序员。
大多数中级和高级Python程序员只会作为TaskGroup
、timeout
和@contextmanager
的用户与本PEP交互。对于这部分人群,我们预计一个清晰的异常消息和文档就足够了。
- 将在使用asyncio进行开发页面中添加一个新部分,简要说明异步生成器在“取消范围”上下文中(即
TaskGroup
或timeout
上下文管理器)不允许yield
。我们预计问题重述和动机部分的一些内容将为这些文档提供依据。 - 在每个包装取消范围的上下文管理器的文档中,因此现在是
sys.prevent_yields
,包含一个标准句子,例如“如果在异步生成器中使用,[在此上下文管理器中yield
是错误的]。”,并带有指向上面解释的超链接。
对于实现取消范围语义的asyncio、Trio、curio或其他框架维护人员,我们将确保sys.prevent_yields
的文档提供了从本PEP的解决方案和实现部分提取的完整解释。我们预计会咨询大多数此类维护人员以获取他们对PEP草案的反馈。
被拒绝的替代方案
PEP 533,迭代器的确定性清理
PEP 533建议向迭代器协议添加__[a]iterclose__
,基本上是在每个(异步)for循环周围包装一个with [a]closing(ait)
。虽然这对于确保及时且确定地清理迭代器持有的资源很有用,但它旨在解决的问题并不能完全解决激发本PEP的问题。
即使有了PEP 533,错误触发的取消仍然会传递到错误的任务,并且可能在迭代器关闭之前造成严重破坏。此外,它没有解决TaskGroup
的基本结构化并发问题,其中挂起拥有TaskGroup的帧与子任务完全封装在其父帧中的模型不兼容。
完全弃用异步生成器
在2024年语言峰会上,一些与会者建议改为完全弃用异步生成器。不幸的是,虽然实践中的常见情况都使用异步生成器,但Trio代码可以使用标准生成器触发相同的问题。
# We use Trio for this example, because while `asyncio.timeout()` is async,
# Trio's CancelScope type and timeout context managers are synchronous.
import trio
def abandon_each_iteration_after(max_seconds):
# This is of course broken, but I can imagine someone trying it...
while True:
with trio.move_on_after(max_seconds):
yield
@trio.run
async def main():
for _ in abandon_each_iteration_after(max_seconds=1):
await trio.sleep(3)
如果没有出现问题中的错误,这段代码看起来会非常惯用——但在大约一秒钟后,它不会继续执行下一个迭代,而是引发了
Traceback (most recent call last):
File "demo.py", line 10, in <module>
async def main():
File "trio/_core/_run.py", line 2297, in run
raise runner.main_task_outcome.error
File "demo.py", line 12, in main
await trio.sleep(3)
File "trio/_timeouts.py", line 87, in sleep
await sleep_until(trio.current_time() + seconds)
...
File "trio/_core/_run.py", line 1450, in raise_cancel
raise Cancelled._create()
trio.Cancelled: Cancelled
此外,还有一些非取消范围的同步上下文管理器表现出相关问题,例如上面提到的decimal.localcontext
。虽然修复下面的示例不是本PEP的目标,但它表明在with内部产出问题并不仅限于异步生成器。
import decimal
def why_would_you_do_this():
with decimal.localcontext(decimal.Context(prec=1)):
yield
one = decimal.Decimal(1)
print(one / 3) # 0.3333333333333333333333333333
next(gen := why_would_you_do_this())
print(one / 3) # 0.3
虽然我在没有异步生成器的情况下在异步Python中获得了良好的体验[5],但我更愿意修复问题而不是将它们从语言中删除。
我们不能只将异常传递到正确的位置吗?
如果我们实现了PEP 568(上下文变量的生成器敏感性;另请参见PEP 550),则可以处理超时中的异常:事件循环可以避免触发CancelledError
,直到包含上下文管理器的生成器帧位于栈中——在恢复生成器时或在生成器完成时。
这可能需要任意长的时间;即使我们实现了PEP 533以确保在退出(异步)for循环时及时清理,仍然可以使用next/send手动驱动生成器。
但是,这并没有解决TaskGroup
的另一个问题。生成器的模型是,你将一个栈帧置于挂起状态,然后可以将其视为一个惰性值,可以存储、移动,并可能在某个任意位置丢弃或恢复。结构化并发的模型是你的栈变成了一个树,子任务封装在某个父帧中。它们正在以不同的、不幸的是不兼容的方向扩展基本的结构化编程模型。
例如,假设挂起包含打开的 TaskGroup
的框架也会挂起所有子任务。这将保留“向下”结构化并发,因为子任务仍然封装 - 尽管代价是使我们两个激励示例以及许多实际代码都死锁。但是,仍然可以在不同的任务中恢复生成器,从而违反结构化并发的“向上”不变性。
我们认为不值得添加太多机制来处理取消范围,同时仍然让任务组处于损坏状态。
替代实现 - 检查字节码
Jelle Zijlstra 已 概述了一种替代方案,其中 sys.prevent_yields
检查调用者的字节码,直到确认在调用指令指针和下一个上下文退出之间没有 yield。我们预计可以相当轻松地添加对语法嵌套上下文管理器的支持。
但是,目前尚不清楚当用户定义的上下文管理器包装 sys.prevent_yields
时,这将如何工作。更糟糕的是,这种方法忽略了对 __enter__()
和 __exit__()
的显式调用,这意味着上下文管理协议将取决于是否使用了 with
语句。
“仅在使用时付费”的性能成本非常有吸引力。但是,检查框架对象对于核心控制流构造来说代价高昂,并通过反优化导致整个程序变慢。另一方面,添加解释器支持以获得更好的性能会导致与我们上面首选解决方案相同的“无论如何都支付”语义。
脚注
版权
本文档置于公共领域或根据 CC0-1.0-Universal 许可证,以较宽松者为准。
来源: https://github.com/python/peps/blob/main/peps/pep-0789.rst
上次修改时间: 2024-06-04 01:45:13 GMT