Following system colour scheme - Python 增强提案 Selected dark colour scheme - Python 增强提案 Selected light colour scheme - Python 增强提案

Python 增强提案

PEP 734 – 标准库中的多个解释器

作者:
Eric Snow <ericsnowcurrently at gmail.com>
讨论列表:
Discourse 主题
状态:
延期
类型:
标准跟踪
创建:
2023年11月6日
Python 版本:
3.13
历史记录:
2023年12月14日
替换:
554
决议:
Discourse 消息

目录

注意

此 PEP 本质上是 PEP 554 的延续。该文档在 7 年的讨论中积累了大量辅助信息。此 PEP 将其精简回核心信息。许多额外信息仍然有效且有用,只是不属于此处特定提案的直接范围。

摘要

此 PEP 提出添加一个新的模块 interpreters,以支持在当前进程中的多个解释器中检查、创建和运行代码。这包括表示底层解释器的 Interpreter 对象。该模块还将提供一个基本的 Queue 类用于解释器之间的通信。最后,我们将添加一个基于 interpreters 模块的新 concurrent.futures.InterpreterPoolExecutor

简介

从根本上说,“解释器”是 Python 线程必须共享的(基本上)所有运行时状态的集合。因此,让我们首先看看线程。然后我们将回到解释器。

线程和线程状态

一个 Python 进程将有一个或多个 OS 线程运行 Python 代码(或以其他方式与 C API 交互)。这些线程中的每一个都使用自己的线程状态 (PyThreadState) 与 CPython 运行时交互,该状态保存该线程独有的所有运行时状态。还有一些运行时状态在多个 OS 线程之间共享。

任何 OS 线程都可以切换它当前使用的线程状态,只要它不是另一个 OS 线程已经使用(或曾经使用)的状态即可。此“当前”线程状态由运行时存储在某个线程本地变量中,并且可以通过 PyThreadState_Get() 显式查找。它会自动为初始(“主”)OS 线程和 threading.Thread 对象设置。从 C API 中,它由 PyThreadState_Swap() 设置(和清除),并且可以通过 PyGILState_Ensure() 设置。大多数 C API 要求存在当前线程状态,要么隐式查找,要么作为参数传递。

OS 线程和线程状态之间的关系是一对多。每个线程状态最多与一个 OS 线程关联,并记录其线程 ID。线程状态永远不会用于多个 OS 线程。但是,在另一个方向上,一个 OS 线程可能有多个线程状态与其关联,尽管同样,只有一个可以是当前状态。

当 OS 线程有多个线程状态时,PyThreadState_Swap() 用于在该 OS 线程中切换它们,请求的线程状态成为当前状态。使用旧线程状态在线程中运行的任何内容都会有效地暂停,直到该线程状态被交换回来。

解释器状态

如前所述,某些运行时状态由多个 OS 线程共享。其中一些由 sys 模块公开,但许多是在内部使用且没有显式公开或仅通过 C API 公开。

此共享状态称为解释器状态 (PyInterpreterState)。我们有时会在这里将其简称为“解释器”,尽管这有时也用于指代 python 可执行文件、Python 实现以及字节码解释器(即 exec()/eval())。

CPython 从 1.5 版(1997 年)开始就在同一进程中支持多个解释器(也称为“子解释器”)。此功能可通过 C API 使用。

解释器和线程

线程状态与解释器状态的关系与 OS 线程和进程的关系非常相似(在高级别)。首先,关系是一对多。线程状态属于单个解释器(并存储指向它的指针)。该线程状态永远不会用于不同的解释器。但是,在另一个方向上,解释器可能与其关联零个或多个线程状态。只有在 OS 线程中其线程状态为当前状态时,才认为解释器处于活动状态。

解释器是通过使用 Py_NewInterpreterFromConfig()(或 Py_NewInterpreter(),它是 Py_NewInterpreterFromConfig() 的轻量级包装器)的 C API 创建的。该函数执行以下操作

  1. 创建一个新的解释器状态
  2. 创建一个新的线程状态
  3. 将线程状态设置为当前状态(解释器初始化需要当前 tstate)
  4. 使用该线程状态初始化解释器状态
  5. 返回线程状态(仍然是当前状态)

请注意,返回的线程状态可能会立即被丢弃。除了当解释器真正要使用时,没有要求解释器有任何线程状态。此时,它必须在当前 OS 线程中处于活动状态。

要在当前 OS 线程中激活现有解释器,C API 用户首先确保该解释器具有相应的线程状态。然后像平常一样调用 PyThreadState_Swap(),使用该线程状态。如果另一个解释器的线程状态已经是当前状态,则它会像平常一样被交换出去,因此在 OS 线程中对该解释器的执行将有效地暂停,直到它被交换回来。

一旦解释器像这样在当前 OS 线程中处于活动状态,线程就可以调用任何 C API,例如 PyEval_EvalCode()(即 exec())。这是通过使用当前线程状态作为运行时上下文来实现的。

“主”解释器

当 Python 进程启动时,它会为当前 OS 线程创建一个单一的解释器状态(“主”解释器)和一个单一的线程状态。然后使用它们初始化 Python 运行时。

初始化后,脚本或模块或 REPL 会使用它们执行。该执行发生在解释器的 __main__ 模块中。

当进程在主 OS 线程中完成运行请求的 Python 代码或 REPL 时,Python 运行时会在该线程中使用主解释器完成。

运行时完成对仍在运行的 Python 线程(无论是在主解释器中还是在子解释器中)只有轻微的、间接的影响。这是因为,它会立即无限期地等待所有非守护进程 Python 线程完成。

虽然可以查询 C API,但没有机制可以使任何 Python 线程直接知道完成已开始,除了可能使用 threading._register_atexit() 注册的“atexit”函数。

任何剩余的子解释器本身会在稍后完成,但此时它们在任何 OS 线程中都不是当前状态。

解释器隔离

CPython 的解释器旨在彼此严格隔离。这意味着解释器永远不会共享对象(除了在使用永生、不可变的内置对象的一些非常特殊的情况下)。每个解释器都有自己的模块 (sys.modules)、类、函数和变量。即使两个解释器定义了相同的类,每个解释器也将拥有自己的副本。这同样适用于 C 中的状态,包括扩展模块中的状态。CPython C API 文档 解释了更多内容

值得注意的是,某些进程全局状态将始终由解释器共享,有些是可变的,有些是不可变的。共享不可变状态几乎没有问题,同时提供了一些好处(主要是性能)。但是,所有共享的可变状态都需要特殊的管理,特别是对于线程安全,其中一些由 OS 为我们处理。

可变

  • 文件描述符
  • 底层环境变量
  • 进程内存(尽管分配器是隔离的)
  • 解释器列表

不可变

  • 内置类型(例如 dictbytes
  • 单例(例如 None
  • 内置/扩展/冻结模块的底层静态模块数据(例如函数)

现有的执行组件

Python 的一些现有部分可能有助于理解如何在子解释器中运行代码。

在 CPython 中,每个组件都围绕以下 C API 函数(或变体)之一构建

  • PyEval_EvalCode():使用给定的代码对象运行字节码解释器
  • PyRun_String():编译 + PyEval_EvalCode()
  • PyRun_File():读取 + 编译 + PyEval_EvalCode()
  • PyRun_InteractiveOneObject():编译 + PyEval_EvalCode()
  • PyObject_Call():调用 PyEval_EvalCode()

builtins.exec()

内置的 exec() 可以用来执行 Python 代码。它本质上是对 C API 函数 PyRun_String()PyEval_EvalCode() 的封装。

以下是内置 exec() 的一些相关特性

  • 它在当前的操作系统线程中运行,并暂停那里正在运行的任何内容,当 exec() 完成时恢复。其他操作系统线程不受影响。(为了避免暂停当前的 Python 线程,可以在 threading.Thread 中运行 exec()。)
  • 它可以启动其他线程,这些线程不会中断它。
  • 它针对“全局”命名空间(以及“局部”命名空间)执行。在模块级别,exec() 默认使用当前模块的 __dict__(即 globals())。exec() 原样使用该命名空间,并且不会在之前或之后清除它。
  • 它会传播其运行的代码中任何未捕获的异常。异常是从最初调用 exec() 的 Python 线程中的 exec() 调用处引发的。

命令行

python CLI 提供了几种运行 Python 代码的方式。在每种情况下,它都映射到相应的 C API 调用

  • <no args>-i - 运行 REPL(PyRun_InteractiveOneObject()
  • <filename> - 运行脚本(PyRun_File()
  • -c <code> - 运行给定的 Python 代码(PyRun_String()
  • -m module - 将模块作为脚本运行(通过 runpy._run_module_as_main() 使用 PyEval_EvalCode()

在每种情况下,它本质上是运行主解释器的 __main__ 模块顶层的 exec() 的变体。

threading.Thread

当启动 Python 线程时,它使用新的线程状态通过 PyObject_Call() 运行“目标”函数。全局命名空间来自 func.__globals__,并且任何未捕获的异常都会被丢弃。

动机

interpreters 模块将为多解释器功能提供高级接口。目标是使 CPython 的现有多解释器功能更容易被 Python 代码访问。这在 CPython 具有每个解释器的 GIL(PEP 684)并且人们越来越有兴趣使用多个解释器时尤其相关。

如果没有标准库模块,用户将仅限于 C API,这限制了他们可以尝试和利用多解释器的程度。

该模块将包括一个在解释器之间通信的基本机制。如果没有它,多个解释器将是一个用处不大的功能。

规范

该模块将

  • 公开现有的多解释器支持
  • 引入一个在解释器之间通信的基本机制

该模块将包装一个新的低级 _interpreters 模块(与 threading 模块的方式相同)。但是,该低级 API 不打算供公众使用,因此不属于本提案的一部分。

使用解释器

该模块定义了以下函数

  • get_current() -> Interpreter
    返回当前正在执行的解释器的 Interpreter 对象。
  • list_all() -> list[Interpreter]
    返回每个现有解释器的 Interpreter 对象,无论它当前是否在任何操作系统线程中运行。
  • create() -> Interpreter
    创建一个新的解释器并返回它的 Interpreter 对象。解释器本身不执行任何操作,并且天生不与任何操作系统线程绑定。这只有在解释器中实际运行某些内容(例如 Interpreter.exec())时以及运行时才会发生。解释器可能拥有也可能没有准备好使用的线程状态,但这严格来说是内部实现细节。

解释器对象

一个 interpreters.Interpreter 对象,它表示具有相应唯一 ID 的解释器(PyInterpreterState)。对于任何给定的解释器,只有一个对象。

如果解释器是用 interpreters.create() 创建的,那么一旦所有具有其 ID 的 Interpreter 对象(跨所有解释器)都被删除,它就会被销毁。

Interpreter 对象可能表示除 interpreters.create() 创建的解释器之外的其他解释器。例如,主解释器(由 Python 的运行时初始化创建)以及通过 C-API 使用 Py_NewInterpreter() 创建的解释器。此类 Interpreter 对象将无法与其对应的解释器交互,例如通过 Interpreter.exec()(尽管我们将来可能会放宽此限制)。

属性和方法

  • id
    (只读)一个非负的 int,用于标识此 Interpreter 实例表示的解释器。从概念上讲,这类似于进程 ID。
  • __hash__()
    返回解释器 id 的哈希值。这与 ID 的整数值的哈希值相同。
  • is_running() -> bool
    如果解释器当前正在其 __main__ 模块中执行代码,则返回 True。这排除了子线程。

    它仅指是否有操作系统线程在解释器的 __main__ 模块中运行脚本(代码)。这基本上意味着 Interpreter.exec() 是否在某个操作系统线程中运行。子线程中运行的代码将被忽略。

  • prepare_main(**kwargs)
    在解释器的 __main__ 模块中绑定一个或多个对象。

    关键字参数名称将用作属性名称。这些值将绑定为新的对象,尽管与原始对象完全相同。仅允许在解释器之间传递的特定支持的对象。请参阅共享对象

    prepare_main() 有助于在解释器中运行代码之前初始化其全局变量。

  • exec(code, /)
    在解释器(在当前操作系统线程中)中执行给定的源代码,使用其 __main__ 模块。它不返回任何内容。

    这本质上等效于切换到当前操作系统线程中的此解释器,然后使用此解释器的 __main__ 模块的 __dict__ 作为全局变量和局部变量调用内置的 exec()

    当前操作系统线程(不同的解释器)中运行的代码将有效地暂停,直到 Interpreter.exec() 完成。为了避免暂停它,创建一个新的 threading.Thread 并在其中调用 Interpreter.exec()(就像 Interpreter.call_in_thread() 所做的那样)。

    Interpreter.exec() 既不会在之前也不会在之后重置解释器的状态或 __main__ 模块,因此每次连续调用都会从上次调用结束的地方继续。这对于在以后执行某些重复任务之前运行一些代码来初始化解释器(例如使用导入)很有用。

    如果存在未捕获的异常,它将作为 ExecutionFailed 传播到调用解释器。原始异常的完整错误显示(相对于被调用解释器生成)保存在传播的 ExecutionFailed 上。这包括完整的回溯,以及所有额外的信息,例如语法错误详细信息和链接异常。如果未捕获 ExecutionFailed,则将显示该完整错误显示,就像在主解释器中引发并未捕获传播的异常一样。在调试时,拥有完整的回溯尤其有用。

    如果不希望传播异常,则应在传递给 Interpreter.exec()code 周围使用显式的 try-except。同样,任何依赖于异常中特定信息的错误处理都必须在给定的 code 周围使用显式的 try-except,因为 ExecutionFailed 不会保留这些信息。

  • call(callable, /)
    在解释器中调用可调用对象。返回值将被丢弃。如果可调用对象引发异常,则它将与 Interpreter.exec() 一样,作为 ExecutionFailed 异常传播。

    目前仅支持普通函数,并且仅支持不带参数且没有单元格变量的函数。自由全局变量相对于目标解释器的 __main__ 模块解析。

    将来,我们可以添加对参数、闭包和更广泛的可调用对象的支持,至少部分地可以通过 pickle 实现。我们还可以考虑不丢弃返回值。最初的限制是为了让我们能够尽快将模块的基本功能提供给用户。

  • call_in_thread(callable, /) -> threading.Thread
    本质上,在一个新线程中应用 Interpreter.call()。返回值将被丢弃,异常不会被传播。

    call_in_thread() 约等于

    def task():
        interp.call(func)
    t = threading.Thread(target=task)
    t.start()
    
  • close()
    销毁底层解释器。

解释器之间通信

该模块通过特殊的队列引入了基本的通信机制。

存在 interpreters.Queue 对象,但它们仅代理实际的数据结构:一个存在于任何解释器之外的无界 FIFO 队列。这些队列具有特殊的适应性,可以安全地在解释器之间传递对象数据,而不会违反解释器隔离。这包括线程安全。

与 Python 中的其他队列一样,对于每个“put”,对象都添加到队列的尾部,每个“get”都从队列的头部弹出下一个对象。每个添加的对象都将按照其入队的顺序弹出。

只有专门支持在解释器之间传递的对象才能通过 interpreters.Queue 发送。请注意,实际的对象不会被发送,而是其底层数据。但是,弹出的对象仍将严格等同于原始对象。请参阅 可共享对象

该模块定义了以下函数

  • create_queue(maxsize=0, *, syncobj=False) -> Queue
    创建一个新的队列。如果 maxsize 为零或负数,则队列是无界的。

    “syncobj”用作 put()put_nowait() 的默认值。

队列对象

interpreters.Queue 对象充当 interpreters 模块公开的底层跨解释器安全队列的代理。每个 Queue 对象都表示具有相应唯一 ID 的队列。对于任何给定的队列,都只有一个对象。

Queue 实现 queue.Queue 的所有方法,除了 task_done()join(),因此它类似于 asyncio.Queuemultiprocessing.Queue

属性和方法

  • id
    (只读) 一个非负的 int,用于标识相应的跨解释器队列。从概念上讲,这类似于用于管道的文件描述符。
  • maxsize
    (只读) 队列中允许的项目数。零表示“无界”。
  • __hash__()
    返回队列的 id 的哈希值。这与 ID 的整数值的哈希值相同。
  • empty()
    如果队列为空,则返回 True,否则返回 False

    这只是调用时状态的快照。其他线程或解释器可能会导致此状态发生变化。

  • full()
    如果队列中有 maxsize 个项目,则返回 True

    如果队列初始化时 maxsize=0(默认值),则 full() 永远不会返回 True

    这只是调用时状态的快照。其他线程或解释器可能会导致此状态发生变化。

  • qsize()
    返回队列中项目的数量。

    这只是调用时状态的快照。其他线程或解释器可能会导致此状态发生变化。

  • put(obj, timeout=None, *, syncobj=None)
    将对象添加到队列中。

    如果 maxsize > 0 且队列已满,则阻塞直到有空闲插槽可用。如果 timeout 是一个正数,则它只会阻塞至少这么多秒,然后引发 interpreters.QueueFull。否则将永远阻塞。

    如果“syncobj”为真,则对象必须是 可共享的,这意味着对象的数据是通过传递而不是对象本身。如果“syncobj”为假,则所有对象都受支持。但是,存在一些性能损失,并且所有对象都是副本(例如,通过 pickle)。因此,可变对象永远不会在解释器之间自动同步。如果“syncobj”为 None(默认值),则使用队列的默认值。

    如果一个对象仍在队列中,并且将它放入队列的解释器(即它所属的解释器)被销毁,则该对象将立即从队列中移除。(我们以后可能会添加一个选项,用哨兵替换队列中移除的对象,或为相应的 get() 调用引发异常。)

  • put_nowait(obj, *, syncobj=None)
    类似于 put(),但实际上具有立即超时。因此,如果队列已满,它会立即引发 interpreters.QueueFull
  • get(timeout=None) -> object
    从队列中弹出下一个对象并返回它。在队列为空时阻塞。如果提供了正数 timeout 并且在这么多秒内没有对象添加到队列中,则引发 interpreters.QueueEmpty
  • get_nowait() -> object
    类似于 get(),但不阻塞。如果队列不为空,则返回下一个项目。否则,引发 interpreters.QueueEmpty

可共享对象

Interpreter.prepare_main() 仅适用于“可共享”对象。 interpreters.Queue 也一样(可选)。

“可共享”对象是可以从一个解释器传递到另一个解释器的对象。该对象不一定实际上被解释器直接共享。但是,即使它没有被共享,也应该将共享对象视为直接共享。这是所有可共享对象的强大等效性保证。(见下文。)

对于某些类型(内置单例),实际对象是共享的。对于某些类型,对象的底层数据实际上是共享的,但每个解释器都有一个包装该数据的不同对象。对于所有其他可共享类型,都会创建一个严格的副本或代理,以便相应的对象继续完全匹配。在底层数据复杂但必须复制(例如 tuple)的情况下,数据将尽可能高效地序列化。

可共享对象必须由 Python 运行时在内部专门支持。但是,对于以后添加对更多类型的支持没有任何限制。

以下是支持对象的初始列表

  • str
  • bytes
  • int
  • float
  • bool (True/False)
  • None
  • tuple(仅包含可共享的项目)
  • interpreters.Queue
  • memoryview(底层缓冲区实际上是共享的)

请注意,列表中的最后两个,队列和 memoryview,在技术上是可变数据类型,而其余的则不是。当任何解释器共享可变数据时,总存在数据竞争的风险。跨解释器安全,包括线程安全,是队列的基本功能。

但是,memoryview 没有任何本地适应性。用户负责管理线程安全,无论是通过队列传递来回传递令牌以指示安全(请参阅 同步),还是通过为各个解释器分配子范围排他性。

大多数对象将通过队列 (interpreters.Queue) 共享,因为解释器彼此之间通信信息。不太频繁地,对象将通过 prepare_main() 共享,以在运行代码之前设置解释器。但是,prepare_main() 是共享队列的主要方式,以便为另一个解释器提供进一步通信的方式。

最后,提醒一下:对于某些类型,实际对象是共享的,而对于其余类型,只有底层数据是共享的,无论是作为副本还是通过代理。无论如何,它始终保留“可共享”对象的强大等效性保证。

保证是指一个解释器中的共享对象严格等同于另一个解释器中的相应对象。换句话说,这两个对象将彼此无法区分。共享对象应视为原始对象已直接共享,无论它是否实际上已共享。这与仅仅相等相比,是一个略有不同且更强的承诺。

对于可变对象(如 Interpreters.Queuememoryview),该保证尤其重要。在一个解释器中修改对象将始终立即反映在共享该对象的每个其他解释器中。

同步

在某些情况下,两个解释器应该同步。这可能涉及共享资源、工作器管理或保持顺序一致性。

在多线程编程中,典型的同步原语是互斥体等类型。 threading 模块公开了几个。但是,解释器无法共享对象,这意味着它们无法共享 threading.Lock 对象。

interpreters 模块不提供任何此类专用的同步原语。相反,interpreters.Queue 对象提供了人们可能需要的一切。

例如,如果存在需要管理访问的共享资源,则可以使用队列来管理它,其中解释器传递一个对象以指示谁可以使用该资源

import interpreters
from mymodule import load_big_data, check_data

numworkers = 10
control = interpreters.create_queue()
data = memoryview(load_big_data())

def worker():
    interp = interpreters.create()
    interp.prepare_main(control=control, data=data)
    interp.exec("""if True:
        from mymodule import edit_data
        while True:
            token = control.get()
            edit_data(data)
            control.put(token)
        """)
threads = [threading.Thread(target=worker) for _ in range(numworkers)]
for t in threads:
    t.start()

token = 'football'
control.put(token)
while True:
    control.get()
    if not check_data(data):
        break
    control.put(token)

异常

  • InterpreterError
    指示发生了一些与解释器相关的故障。

    此异常是 Exception 的子类。

  • InterpreterNotFoundError
    在底层解释器被销毁(例如,通过 C-API)后,从 Interpreter 方法引发。

    此异常是 InterpreterError 的子类。

  • ExecutionFailed
    当存在未捕获的异常时,从 Interpreter.exec()Interpreter.call() 引发。此异常的错误显示包括未捕获异常的回溯,该回溯在正常错误显示后显示,非常类似于 ExceptionGroup 的情况。

    属性

    • type - 表示原始异常类,包含 __name____module____qualname__ 属性。
    • msg - 原始异常的 str(exc)
    • snapshot - 原始异常的 traceback.TracebackException 对象

    此异常是 InterpreterError 的子类。

  • QueueError
    指示某些与队列相关的错误发生。

    此异常是 Exception 的子类。

  • QueueNotFoundError
    在底层队列被销毁后,从 interpreters.Queue 方法中引发。

    此异常是 QueueError 的子类。

  • QueueEmpty
    当队列为空时,从 Queue.get()(或没有默认值的 get_nowait())中引发。

    此异常同时是 QueueError 和标准库 queue.Empty 的子类。

  • QueueFull
    当队列已达到最大尺寸时,从 Queue.put()(带超时)或 put_nowait() 中引发。

    此异常同时是 QueueError 和标准库 queue.Empty 的子类。

InterpreterPoolExecutor

除了新的 interpreters 模块外,还将提供一个新的 concurrent.futures.InterpreterPoolExecutor。它将派生自 ThreadPoolExecutor,其中每个工作线程都在其自己的线程中执行,但每个线程都有自己的子解释器。

与其他执行器一样,InterpreterPoolExecutor 将支持用于任务和初始化器的可调用对象。同样,在两种情况下,参数都将基本不受限制。可调用对象和参数通常会在发送到工作线程的解释器时进行序列化,例如使用 pickle,就像 ProcessPoolExecutor 的工作方式一样。这与 Interpreter.call() 形成对比,后者(至少在最初)将受到更多限制。

工作线程之间,或执行器(或通常是其解释器)与工作线程之间的通信,仍然可以通过 interpreters.Queue 对象进行,这些对象由初始化器设置。

sys.implementation.supports_isolated_interpreters

Python 实现不需要支持子解释器,尽管大多数主要实现都支持。如果某个实现支持它们,则 sys.implementation.supports_isolated_interpreters 将设置为 True。否则将设置为 False。如果该功能不受支持,则导入 interpreters 模块将引发 ImportError

示例

以下示例演示了多个解释器可能很有用的实际案例。

示例 1

有一系列传入的请求,将通过子线程中的工作线程进行处理。

  • 每个工作线程都有自己的解释器
  • 有一个队列用于向工作线程发送任务,另一个队列用于返回结果
  • 结果在专用线程中处理
  • 每个工作线程持续运行,直到收到“停止”哨兵(None
  • 结果处理程序持续运行,直到所有工作线程都停止
import interpreters
from mymodule import iter_requests, handle_result

tasks = interpreters.create_queue()
results = interpreters.create_queue()

numworkers = 20
threads = []

def results_handler():
    running = numworkers
    while running:
        try:
            res = results.get(timeout=0.1)
        except interpreters.QueueEmpty:
            # No workers have finished a request since last time.
            pass
        else:
            if res is None:
                # A worker has stopped.
                running -= 1
            else:
                handle_result(res)
    empty = object()
    assert results.get_nowait(empty) is empty
threads.append(threading.Thread(target=results_handler))

def worker():
    interp = interpreters.create()
    interp.prepare_main(tasks=tasks, results=results)
    interp.exec("""if True:
        from mymodule import handle_request, capture_exception

        while True:
            req = tasks.get()
            if req is None:
                # Stop!
                break
            try:
                res = handle_request(req)
            except Exception as exc:
                res = capture_exception(exc)
            results.put(res)
        # Notify the results handler.
        results.put(None)
        """)
threads.extend(threading.Thread(target=worker) for _ in range(numworkers))

for t in threads:
    t.start()

for req in iter_requests():
    tasks.put(req)
# Send the "stop" signal.
for _ in range(numworkers):
    tasks.put(None)

for t in threads:
    t.join()

示例 2

此案例类似于上一个案例,因为有一堆子线程中的工作线程。但是,这次代码正在将一个大型数据数组分割成块,其中每个工作线程一次处理一个块。将该数据复制到每个解释器将极其低效,因此代码利用直接共享 memoryview 缓冲区。

  • 所有解释器共享源数组的缓冲区
  • 每个解释器将其结果写入第二个共享缓冲区
  • 使用队列向工作线程发送任务
  • 只有一个工作线程会读取源数组中的任何给定索引
  • 只有一个工作线程会写入结果中的任何给定索引(这就是它确保线程安全的方式)
import interpreters
import queue
from mymodule import read_large_data_set, use_results

numworkers = 3
data, chunksize = read_large_data_set()
buf = memoryview(data)
numchunks = (len(buf) + 1) / chunksize
results = memoryview(b'\0' * numchunks)

tasks = interpreters.create_queue()

def worker(id):
    interp = interpreters.create()
    interp.prepare_main(data=buf, results=results, tasks=tasks)
    interp.exec("""if True:
        from mymodule import reduce_chunk

        while True:
            req = tasks.get()
            if res is None:
                # Stop!
                break
            resindex, start, end = req
            chunk = data[start: end]
            res = reduce_chunk(chunk)
            results[resindex] = res
        """)
threads = [threading.Thread(target=worker) for _ in range(numworkers)]
for t in threads:
    t.start()

for i in range(numchunks):
    # Assume there's at least one worker running still.
    start = i * chunksize
    end = start + chunksize
    if end > len(buf):
        end = len(buf)
    tasks.put((start, end, i))
# Send the "stop" signal.
for _ in range(numworkers):
    tasks.put(None)

for t in threads:
    t.join()

use_results(results)

基本原理

最小 API

由于核心开发团队没有真正体验用户如何在 Python 代码中使用多个解释器,因此本提案有意将初始 API 保持尽可能精简和最少。目标是提供一个经过深思熟虑的基础,以便以后根据需要添加更多(更高级)的功能。

也就是说,拟议的设计结合了从社区中现有子解释器使用、现有标准库模块以及其他编程语言中吸取的经验教训。它还考虑了在 CPython 测试套件中使用子解释器以及在 并发基准测试 中使用它们的经验。

create(),create_queue()

通常,用户调用类型以创建该类型的实例,此时对象的资源将被供应。 interpreters 模块采用了一种不同的方法,用户必须调用 create() 以获取新的解释器或 create_queue() 以获取新的队列。直接调用 interpreters.Interpreter() 仅返回现有解释器周围的包装器(interpreters.Queue() 也是如此)。

这是因为解释器(和队列)是特殊的资源。它们在进程中全局存在,并且不受当前解释器管理/拥有。因此,interpreters 模块使创建解释器(或队列)成为与创建 interpreters.Interpreter(或 interpreters.Queue)实例明显不同的操作。

Interpreter.prepare_main() 设置多个变量

prepare_main() 可以被视为一种设置函数。它支持一次设置多个名称,例如 interp.prepare_main(spam=1, eggs=2),而大多数设置函数一次设置一个项目。主要原因是为了效率。

要在解释器的 __main__.__dict__ 中设置值,实现必须首先将操作系统线程切换到已识别的解释器,这涉及一些不可忽略的开销。设置值后,它必须切换回来。此外,在解释器之间传递对象的机制还有一些额外的开销,如果一次设置多个值,则可以总体上减少这些开销。

因此,prepare_main() 支持一次设置多个值。

传播异常

来自子解释器的未捕获异常(通过 Interpreter.exec())可以(有效地)被忽略,就像 threading.Thread() 所做的那样,或者被传播,就像内置的 exec() 所做的那样。由于 Interpreter.exec() 是一个同步操作,就像内置的 exec() 一样,未捕获的异常会被传播。

但是,此类异常不会直接引发。这是因为解释器彼此隔离,并且不得共享对象,包括异常。这可以通过引发异常的代理来解决,无论是摘要、副本还是包装它的代理。任何这些都可以保留跟踪堆栈,这对于调试很有用。引发的 ExecutionFailed 就是这样的代理。

还需要考虑另一个问题。如果传播的异常没有立即被捕获,它将沿着调用栈向上冒泡,直到被捕获(或未被捕获)。如果其他地方的代码可能会捕获它,则有助于识别该异常来自子解释器(即“远程”源),而不是来自当前解释器。这就是 Interpreter.exec() 引发 ExecutionFailed 的原因,以及它是一个普通 Exception 而不是副本或具有与原始异常匹配的类的代理的原因。例如,来自子解释器的未捕获 ValueError 永远不会在以后的 try: ... except ValueError: ... 中被捕获。相反,必须直接处理 ExecutionFailed

相反,从 Interpreter.call() 传播的异常不涉及 ExecutionFailed,而是直接引发,就像起源于调用解释器一样。这是因为 Interpreter.call() 是一种更高级别的方法,它使用 pickle 来支持通常无法在解释器之间传递的对象。

有限的对象共享

解释器隔离 中所述,只有少量内置对象才能真正地在解释器之间共享。在所有其他情况下,对象只能通过副本或代理间接共享。

可以通过队列(以及 Interpreter.prepare_main())作为副本共享的对象集为了效率而受到限制。

支持共享所有对象(通过 pickle)是可能的,但不在本提案范围内。一方面,在这些情况下知道仅使用了高效的实现很有帮助。此外,在这些情况下,通过 pickle 支持可变对象将违反“共享”对象等效(并保持这种状态)的保证。

对象与 ID 代理

对于解释器和队列,低级模块都使用代理对象,这些对象通过其对应的进程全局 ID 公开底层状态。在这两种情况下,状态也都是进程全局的,并且将被多个解释器使用。因此,它们不适合作为 PyObject 实现,这实际上只是解释器特定数据的一种选择。这就是 interpreters 模块提供通过 ID 弱关联的对象的原因。

被拒绝的想法

请参阅 PEP 554


来源:https://github.com/python/peps/blob/main/peps/pep-0734.rst

上次修改时间:2024-04-10 21:49:06 GMT