Following system colour scheme - Python 增强提案 Selected dark colour scheme - Python 增强提案 Selected light colour scheme - Python 增强提案

Python 增强提案

PEP 3148 – futures - 异步执行计算

作者:
Brian Quinlan <brian at sweetapp.com>
状态:
最终版
类型:
标准跟踪
创建日期:
2009年10月16日
Python 版本:
3.2
发布历史:


目录

摘要

本 PEP 提出了一个包的设计,用于使用线程和进程促进可调用对象的评估。

动机

Python 目前拥有强大的原语来构建多线程和多进程应用程序,但并行化简单的操作需要大量工作,例如:显式启动进程/线程,构建工作/结果队列,以及等待完成或其他终止条件(例如:失败,超时)。当每个组件都发明自己的并行执行策略时,设计一个具有全局进程/线程限制的应用程序也很困难。

规范

命名

提议的包将被称为“futures”,并将位于一个新的“concurrent”顶级包中。“futures”库被推入“concurrent”命名空间的理由有多个组成部分。第一个也是最简单的一个是防止与 Python 中长期使用的现有“from __future__ import x”习惯用法产生任何混淆。此外,人们认为在名称前添加“concurrent”完全表示该库与什么相关——即并发——这应该消除任何额外的歧义,因为已经有人指出社区中并非所有人都熟悉 Java Futures 或 Futures 术语,除非它与美国股市有关。

最后;我们正在为标准库开辟一个新的命名空间——显然命名为“concurrent”。我们希望将来能将现有或新增的并发相关库添加到其中。一个主要例子是 multiprocessing.Pool 工作,以及该模块中包含的其他“附加组件”,它们跨越线程和进程边界工作。

接口

提议的包提供了两个核心类:ExecutorFutureExecutor 接收异步工作请求(以可调用对象及其参数的形式)并返回一个 Future 来表示该工作请求的执行。

Executor

Executor 是一个抽象类,提供异步执行调用的方法。

submit(fn, *args, **kwargs)

安排可调用对象以 fn(*args, **kwargs) 形式执行,并返回一个表示可调用对象执行的 Future 实例。

这是一个抽象方法,必须由 Executor 子类实现。

map(func, *iterables, timeout=None)

等同于 map(func, *iterables),但 func 是异步执行的,并且可以并发地进行多次对 func 的调用。如果调用 __next__() 并且在从原始调用 map() 经过 *timeout* 秒后结果仍不可用,则返回的迭代器会引发 TimeoutError。如果未指定 *timeout* 或为 None,则等待时间没有限制。如果调用引发异常,则当从迭代器中检索其值时,将引发该异常。

shutdown(wait=True)

通知执行器,当当前待处理的 future 执行完成后,它应该释放正在使用的任何资源。在 shutdown 后对 Executor.submitExecutor.map 的调用将引发 RuntimeError

如果 wait 为 True,则此方法将一直等到所有待处理的 future 执行完成并且与执行器关联的资源已释放后才返回。如果 wait 为 False,则此方法将立即返回,并且当所有待处理的 future 执行完成后,与执行器关联的资源将被释放。无论 wait 的值如何,整个 Python 程序都不会退出,直到所有待处理的 future 执行完成。

__enter__()
__exit__(exc_type, exc_val, exc_tb)
当将执行器用作上下文管理器时,__exit__ 将调用 Executor.shutdown(wait=True)

ProcessPoolExecutor

ProcessPoolExecutor 类是 Executor 的子类,它使用进程池异步执行调用。传递给 ProcessPoolExecutor.submit 的可调用对象和参数必须根据与 multiprocessing 模块相同的限制进行可序列化。

从提交给 ProcessPoolExecutor 的可调用对象内部调用 ExecutorFuture 方法将导致死锁。

__init__(max_workers)

使用最多 *max_workers* 个进程池异步执行调用。如果 *max_workers* 为 None 或未给出,则将创建与机器处理器数量相同的 worker 进程。

ThreadPoolExecutor

ThreadPoolExecutor 类是 Executor 的子类,它使用线程池异步执行调用。

当与 Future 关联的可调用对象等待另一个 Future 的结果时,可能会发生死锁。例如

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)

__init__(max_workers)

使用最多 *max_workers* 个线程池异步执行调用。

Future 对象

Future 类封装了可调用对象的异步执行。Future 实例由 Executor.submit 返回。

cancel()

尝试取消调用。如果调用当前正在执行,则无法取消,方法将返回 False;否则,调用将被取消,方法将返回 True

cancelled()

如果调用成功取消,则返回 True

running()

如果调用当前正在执行且无法取消,则返回 True

done()

如果调用已成功取消或完成运行,则返回 True

result(timeout=None)

返回调用返回的值。如果调用尚未完成,则此方法将等待最多 *timeout* 秒。如果调用在 *timeout* 秒内未完成,则将引发 TimeoutError。如果未指定 *timeout* 或为 None,则等待时间没有限制。

如果 future 在完成之前被取消,则将引发 CancelledError

如果调用引发异常,则此方法将引发相同的异常。

exception(timeout=None)

返回调用引发的异常。如果调用尚未完成,则此方法将等待最多 *timeout* 秒。如果调用在 *timeout* 秒内未完成,则将引发 TimeoutError。如果未指定 *timeout* 或为 None,则等待时间没有限制。

如果 future 在完成之前被取消,则将引发 CancelledError

如果调用完成而未引发异常,则返回 None

add_done_callback(fn)

将一个可调用对象 *fn* 附加到 future,当 future 被取消或完成运行时将调用 *fn*。*fn* 将以 future 作为其唯一参数被调用。

添加的可调用对象按照添加的顺序被调用,并且总是在添加它们的进程所属的线程中被调用。如果可调用对象引发 Exception,则它将被记录并忽略。如果可调用对象引发另一个 BaseException,则行为未定义。

如果 future 已经完成或被取消,则 *fn* 将立即被调用。

内部 Future 方法

以下 Future 方法旨在用于单元测试和 Executor 实现。

set_running_or_notify_cancel()

应由 Executor 实现者在执行与 Future 关联的工作之前调用。

如果方法返回 False,则 Future 已被取消,即 Future.cancel 已被调用并返回 True。所有等待 Future 完成的线程(例如通过 as_completed()wait())将被唤醒。

如果方法返回 True,则 Future 未被取消并已进入运行状态,即对 Future.running() 的调用将返回 True

此方法只能调用一次,并且不能在 Future.set_result()Future.set_exception() 被调用之后调用。

set_result(result)

设置与 Future 关联的工作结果。

set_exception(exception)

将与 Future 关联的工作结果设置为给定的 Exception

模块函数

wait(fs, timeout=None, return_when=ALL_COMPLETED)

等待由 *fs* 给出的 Future 实例(可能由不同的 Executor 实例创建)完成。返回一个由两个集合组成的命名 2 元组。第一个集合名为“done”,包含在等待完成之前已完成(完成或被取消)的 future。第二个集合名为“not_done”,包含未完成的 future。

*timeout* 可用于控制返回前等待的最大秒数。如果未指定 *timeout* 或为 None,则等待时间没有限制。

*return_when* 指示方法何时返回。它必须是以下常量之一

常量 描述
FIRST_COMPLETED 当任何 future 完成或被取消时,方法将返回。
FIRST_EXCEPTION 当任何 future 通过引发异常完成时,方法将返回。如果没有 future 引发异常,则等同于 ALL_COMPLETED。
ALL_COMPLETED 当所有调用完成时,方法将返回。

as_completed(fs, timeout=None)

返回一个迭代器,遍历由 *fs* 给定的 Future 实例,这些实例在完成时(完成或被取消)被 yield。在调用 as_completed() 之前完成的任何 future 将首先被 yield。如果调用 __next__() 并且在从原始调用 as_completed() 经过 *timeout* 秒后结果仍不可用,则返回的迭代器会引发 TimeoutError。如果未指定 *timeout* 或为 None,则等待时间没有限制。

Future 实例可以由不同的 Executor 实例创建。

检查素数示例

from concurrent import futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime,
                                                      PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

网页抓取示例

from concurrent import futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    return urllib.request.urlopen(url, timeout=timeout).read()

def main():
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = dict(
            (executor.submit(load_url, url, 60), url)
             for url in URLS)

        for future in futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                print('%r page is %d bytes' % (
                          url, len(future.result())))
            except Exception as e:
                print('%r generated an exception: %s' % (
                          url, e))

if __name__ == '__main__':
    main()

基本原理

本模块的提议设计深受 Java java.util.concurrent 包 [1] 的影响。该模块的概念基础,与 Java 中一样,是 Future 类,它表示异步计算的进度和结果。Future 类对所使用的评估模式没有做出太多承诺,例如,它可用于表示惰性或急切评估,用于使用线程、进程或远程过程调用的评估。

Futures 由 Executor 类的具体实现(在 Java 中称为 ExecutorService)创建。参考实现提供了使用进程池或线程池来急切评估计算的类。

Futures 已经在 Python 中作为流行的 Python 食谱配方 [2] 的一部分出现,并在 Python-3000 邮件列表 [3] 上进行了讨论。

提议的设计是明确的,即它要求客户端知道他们正在使用 Futures。可以设计一个返回代理对象(weakref 风格)的模块,这些代理对象可以透明地使用。可以在提议的明确机制之上构建代理实现。

提议的设计不会对 Python 语言的语法或语义引入任何更改。可以引入特殊语法 [4] 来将函数和方法调用标记为异步。在操作异步急切评估时将返回一个代理结果,并且只有在操作完成之前使用代理对象时才会阻塞执行。

Anh Hai Trinh 提出了一个更简单但更有限的 API 概念 [5],并且该 API 已在 stdlib-sig 上进行了详细讨论 [6]

提议的设计在 Python-Dev 邮件列表 [7] 上进行了讨论。在这些讨论之后,进行了以下更改

  • Executor 类被制作成一个抽象基类
  • 由于缺乏令人信服的用例,Future.remove_done_callback 方法被删除
  • Future.add_done_callback 方法被修改为允许多次添加相同的可调用对象
  • Future 类的修改方法被更好地记录,以表明它们是创建它们的 Executor 的私有方法

参考实现

参考实现 [8] 包含提议设计的完整实现。它已在 Linux 和 Mac OS X 上进行了测试。

参考资料


来源: https://github.com/python/peps/blob/main/peps/pep-3148.rst

最后修改: 2025-02-01 08:59:27 GMT