Following system colour scheme Selected dark colour scheme Selected light colour scheme

Python 增强提案

PEP 3148 – futures - 异步执行计算

作者:
Brian Quinlan <brian at sweetapp.com>
状态:
最终版
类型:
标准规范
创建日期:
2009年10月16日
Python 版本:
3.2
历史记录:


目录

摘要

本 PEP 提出了一种用于构建包的设计,该包可以方便地使用线程和进程来评估可调用对象。

动机

Python 目前拥有强大的基本功能来构建多线程和多进程应用程序,但并行化简单的操作需要大量的工作,例如显式启动进程/线程,构建工作/结果队列,以及等待完成或其他终止条件(例如失败、超时)。当每个组件都发明自己的并行执行策略时,设计一个具有全局进程/线程限制的应用程序也很困难。

规范

命名

建议的包将称为“futures”,并将位于新的“concurrent”顶级包中。将 futures 库推入“concurrent”命名空间背后的基本原理有多个方面。首先,也是最简单的,是为了避免与现有的“from __future__ import x”习惯用法产生任何混淆,该习惯用法在 Python 中已经使用了很长时间。此外,人们认为,在名称前加上“concurrent”前缀可以完全表示该库与什么相关——即并发——这应该可以消除任何额外的歧义,因为有人注意到,并非社区中的每个人都熟悉 Java Futures,或者除了与美国股市相关之外的 Futures 这个术语。

最后,我们正在为标准库开辟一个新的命名空间——显然名为“concurrent”。我们希望将来能够在此基础上添加或移动现有的与并发相关的库。一个典型的例子是 multiprocessing.Pool 的工作,以及该模块中包含的其他“附加组件”,这些组件跨线程和进程边界工作。

接口

建议的包提供了两个核心类:ExecutorFuture。一个 Executor 接收异步工作请求(以可调用对象及其参数的形式),并返回一个 Future 来表示该工作请求的执行。

执行器

Executor 是一个抽象类,提供异步执行调用的方法。

submit(fn, *args, **kwargs)

调度可调用对象以 fn(*args, **kwargs) 的形式执行,并返回一个 Future 实例,表示该可调用对象的执行。

这是一个抽象方法,必须由 Executor 子类实现。

map(func, *iterables, timeout=None)

等效于 map(func, *iterables),但 func 是异步执行的,并且可以同时进行多次对 func 的调用。如果调用 __next__() 并且结果在从最初调用 map() 开始的 timeout 秒后仍不可用,则返回的迭代器将引发 TimeoutError。如果未指定 timeout 或为 None,则等待时间没有限制。如果调用引发异常,则在从迭代器中检索其值时将引发该异常。

shutdown(wait=True)

向执行器发出信号,指示当当前挂起的 future 执行完毕时,它应该释放正在使用的任何资源。在 shutdown 后进行的 Executor.submitExecutor.map 调用将引发 RuntimeError

如果 wait 为 True,则此方法将不会返回,直到所有挂起的 future 都执行完毕并且与执行器关联的资源都已释放。如果 wait 为 False,则此方法将立即返回,并且当所有挂起的 future 都执行完毕时,与执行器关联的资源将被释放。无论 wait 的值为多少,整个 Python 程序都不会退出,直到所有挂起的 future 都执行完毕。

__enter__()
__exit__(exc_type, exc_val, exc_tb)
当使用执行器作为上下文管理器时,__exit__ 将调用 Executor.shutdown(wait=True)

进程池执行器

ProcessPoolExecutor 类是 Executor 的一个子类,它使用一个进程池来异步执行调用。传递给 ProcessPoolExecutor.submit 的可调用对象和参数必须是可 pickle 的,其限制与 multiprocessing 模块相同。

从提交给 ProcessPoolExecutor 的可调用对象内部调用 ExecutorFuture 方法会导致死锁。

__init__(max_workers)

使用最多 max_workers 个进程的进程池异步执行调用。如果 max_workersNone 或未给出,则将创建与机器处理器数量相同的 worker 进程。

线程池执行器

ThreadPoolExecutor 类是 Executor 的一个子类,它使用一个线程池来异步执行调用。

当与 Future 关联的可调用对象等待另一个 Future 的结果时,可能会发生死锁。例如

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

以及

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)

__init__(max_workers)

使用最多 max_workers 个线程的线程池异步执行调用。

Future 对象

Future 类封装了可调用对象的异步执行。Future 实例由 Executor.submit 返回。

cancel()

尝试取消调用。如果当前正在执行调用,则无法取消,并且该方法将返回 False,否则将取消调用,并且该方法将返回 True

cancelled()

如果成功取消了调用,则返回 True

running()

如果当前正在执行调用并且无法取消,则返回 True

done()

如果成功取消了调用或调用已完成运行,则返回 True

result(timeout=None)

返回调用返回的值。如果调用尚未完成,则此方法将等待最多 timeout 秒。如果在 timeout 秒内调用未完成,则将引发 TimeoutError。如果未指定 timeout 或为 None,则等待时间没有限制。

如果 future 在完成之前被取消,则将引发 CancelledError

如果调用引发异常,则此方法将引发相同的异常。

exception(timeout=None)

返回调用引发的异常。如果调用尚未完成,则此方法将等待最多 timeout 秒。如果在 timeout 秒内调用未完成,则将引发 TimeoutError。如果未指定 timeout 或为 None,则等待时间没有限制。

如果 future 在完成之前被取消,则将引发 CancelledError

如果调用完成而未引发异常,则返回 None

add_done_callback(fn)

将可调用对象 fn 附加到 future,当 future 被取消或完成运行时,将调用该可调用对象。fn 将以 future 作为其唯一参数被调用。

添加的可调用对象按添加顺序调用,并且始终在添加它们的进程所属的线程中调用。如果可调用对象引发 Exception,则将其记录并忽略。如果可调用对象引发其他 BaseException,则行为未定义。

如果 future 已经完成或被取消,则将立即调用 fn

内部 Future 方法

以下 Future 方法旨在用于单元测试和 Executor 实现。

set_running_or_notify_cancel()

应由 Executor 实现调用,以在执行与 Future 关联的工作之前调用。

如果该方法返回 False,则 Future 已被取消,即调用了 Future.cancel 并返回了 True。任何等待 Future 完成的线程(例如通过 as_completed()wait())都将被唤醒。

如果该方法返回 True,则 Future 未被取消,并且已处于运行状态,即对 Future.running() 的调用将返回 True

此方法只能调用一次,并且不能在调用 Future.set_result()Future.set_exception() 之后调用。

set_result(result)

设置与 Future 关联的工作的结果。

set_exception(exception)

将与 Future 关联的工作的结果设置为给定的 Exception

模块函数

wait(fs, timeout=None, return_when=ALL_COMPLETED)

等待由 *fs* 给出的(可能由不同的 Executor 实例创建的)Future 实例完成。返回一个命名的包含两个集合的 2 元组。第一个集合,名为“done”,包含在等待完成之前完成(结束或被取消)的 Future。第二个集合,名为“not_done”,包含未完成的 Future。

timeout 可用于控制在返回之前等待的最大秒数。如果未指定 timeout 或为 None,则等待时间没有限制。

return_when 指示方法何时应返回。它必须是以下常量之一

常量 描述
FIRST_COMPLETED 当任何 Future 完成或被取消时,方法将返回。
FIRST_EXCEPTION 当任何 Future 通过引发异常完成时,方法将返回。如果将来没有引发异常,则等效于 ALL_COMPLETED。
ALL_COMPLETED 当所有调用完成时,方法将返回。

as_completed(fs, timeout=None)

返回由 *fs* 给出的 Future 实例上的迭代器,该迭代器在它们完成(结束或被取消)时生成 Future。在调用 as_completed() 之前完成的任何 Future 将首先被生成。如果调用 __next__() 且结果在从最初调用 as_completed() 开始的 *timeout* 秒后不可用,则返回的迭代器会引发 TimeoutError。如果未指定 *timeout* 或为 None,则等待时间没有限制。

Future 实例可以由不同的 Executor 实例创建。

检查素数示例

from concurrent import futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime,
                                                      PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

网页爬取示例

from concurrent import futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    return urllib.request.urlopen(url, timeout=timeout).read()

def main():
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = dict(
            (executor.submit(load_url, url, 60), url)
             for url in URLS)

        for future in futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                print('%r page is %d bytes' % (
                          url, len(future.result())))
            except Exception as e:
                print('%r generated an exception: %s' % (
                          url, e))

if __name__ == '__main__':
    main()

基本原理

此模块的建议设计受到 Java java.util.concurrent 包 [1] 的强烈影响。与 Java 一样,该模块的概念基础是 Future 类,它表示异步计算的进度和结果。Future 类对所使用的计算模式几乎没有承诺,例如,它可以用于表示延迟或急切计算,用于使用线程、进程或远程过程调用进行计算。

Future 由 Executor 类的具体实现(在 Java 中称为 ExecutorService)创建。参考实现提供了使用进程或线程池来急切地计算计算的类。

Future 已经在 Python 中作为流行的 Python 食谱配方的一部分出现 [2],并在 Python-3000 邮件列表中进行了讨论 [3]

建议的设计是显式的,即它要求客户端意识到他们正在使用 Future。可以设计一个返回代理对象(类似于 weakref)的模块,这些对象可以透明地使用。可以在建议的显式机制之上构建代理实现。

建议的设计没有对 Python 语言语法或语义进行任何更改。可以引入特殊语法 [4] 以将函数和方法调用标记为异步。当操作急切地异步评估时,将返回代理结果,并且只有在操作完成之前使用代理对象时,执行才会阻塞。

Anh Hai Trinh 提出了一个更简单但更有限的 API 概念 [5],并且该 API 已在 stdlib-sig 上进行了详细讨论 [6]

建议的设计在 Python-Dev 邮件列表中进行了讨论 [7]。根据这些讨论,进行了以下更改

  • Executor 类已成为抽象基类
  • 由于缺乏令人信服的用例,因此删除了 Future.remove_done_callback 方法
  • Future.add_done_callback 方法已修改为允许多次添加相同的可调用对象
  • Future 类的变异方法得到了更好的文档记录,以表明它们对创建它们的 Executor 是私有的

参考实现

参考实现 [8] 包含建议设计的完整实现。它已在 Linux 和 Mac OS X 上进行了测试。

参考文献


来源:https://github.com/python/peps/blob/main/peps/pep-3148.rst

上次修改:2023-09-09 17:39:29 GMT