PEP 3148 – futures - 异步执行计算
- 作者:
- Brian Quinlan <brian at sweetapp.com>
- 状态:
- 最终版
- 类型:
- 标准规范
- 创建日期:
- 2009年10月16日
- Python 版本:
- 3.2
- 历史记录:
摘要
本 PEP 提出了一种用于构建包的设计,该包可以方便地使用线程和进程来评估可调用对象。
动机
Python 目前拥有强大的基本功能来构建多线程和多进程应用程序,但并行化简单的操作需要大量的工作,例如显式启动进程/线程,构建工作/结果队列,以及等待完成或其他终止条件(例如失败、超时)。当每个组件都发明自己的并行执行策略时,设计一个具有全局进程/线程限制的应用程序也很困难。
规范
命名
建议的包将称为“futures”,并将位于新的“concurrent”顶级包中。将 futures 库推入“concurrent”命名空间背后的基本原理有多个方面。首先,也是最简单的,是为了避免与现有的“from __future__ import x”习惯用法产生任何混淆,该习惯用法在 Python 中已经使用了很长时间。此外,人们认为,在名称前加上“concurrent”前缀可以完全表示该库与什么相关——即并发——这应该可以消除任何额外的歧义,因为有人注意到,并非社区中的每个人都熟悉 Java Futures,或者除了与美国股市相关之外的 Futures 这个术语。
最后,我们正在为标准库开辟一个新的命名空间——显然名为“concurrent”。我们希望将来能够在此基础上添加或移动现有的与并发相关的库。一个典型的例子是 multiprocessing.Pool 的工作,以及该模块中包含的其他“附加组件”,这些组件跨线程和进程边界工作。
接口
建议的包提供了两个核心类:Executor
和 Future
。一个 Executor
接收异步工作请求(以可调用对象及其参数的形式),并返回一个 Future
来表示该工作请求的执行。
执行器
Executor
是一个抽象类,提供异步执行调用的方法。
submit(fn, *args, **kwargs)
调度可调用对象以fn(*args, **kwargs)
的形式执行,并返回一个Future
实例,表示该可调用对象的执行。这是一个抽象方法,必须由 Executor 子类实现。
map(func, *iterables, timeout=None)
等效于map(func, *iterables)
,但 func 是异步执行的,并且可以同时进行多次对 func 的调用。如果调用__next__()
并且结果在从最初调用map()
开始的 timeout 秒后仍不可用,则返回的迭代器将引发TimeoutError
。如果未指定 timeout 或为None
,则等待时间没有限制。如果调用引发异常,则在从迭代器中检索其值时将引发该异常。
shutdown(wait=True)
向执行器发出信号,指示当当前挂起的 future 执行完毕时,它应该释放正在使用的任何资源。在 shutdown 后进行的Executor.submit
和Executor.map
调用将引发RuntimeError
。如果 wait 为
True
,则此方法将不会返回,直到所有挂起的 future 都执行完毕并且与执行器关联的资源都已释放。如果 wait 为False
,则此方法将立即返回,并且当所有挂起的 future 都执行完毕时,与执行器关联的资源将被释放。无论 wait 的值为多少,整个 Python 程序都不会退出,直到所有挂起的 future 都执行完毕。
__enter__()
__exit__(exc_type, exc_val, exc_tb)
当使用执行器作为上下文管理器时,__exit__
将调用Executor.shutdown(wait=True)
。
进程池执行器
ProcessPoolExecutor
类是 Executor
的一个子类,它使用一个进程池来异步执行调用。传递给 ProcessPoolExecutor.submit
的可调用对象和参数必须是可 pickle 的,其限制与 multiprocessing 模块相同。
从提交给 ProcessPoolExecutor
的可调用对象内部调用 Executor
或 Future
方法会导致死锁。
__init__(max_workers)
使用最多 max_workers 个进程的进程池异步执行调用。如果 max_workers 为None
或未给出,则将创建与机器处理器数量相同的 worker 进程。
线程池执行器
ThreadPoolExecutor
类是 Executor
的一个子类,它使用一个线程池来异步执行调用。
当与 Future
关联的可调用对象等待另一个 Future
的结果时,可能会发生死锁。例如
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
以及
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
__init__(max_workers)
使用最多 max_workers 个线程的线程池异步执行调用。
Future 对象
Future
类封装了可调用对象的异步执行。Future
实例由 Executor.submit
返回。
cancel()
尝试取消调用。如果当前正在执行调用,则无法取消,并且该方法将返回False
,否则将取消调用,并且该方法将返回True
。
cancelled()
如果成功取消了调用,则返回True
。
running()
如果当前正在执行调用并且无法取消,则返回True
。
done()
如果成功取消了调用或调用已完成运行,则返回True
。
result(timeout=None)
返回调用返回的值。如果调用尚未完成,则此方法将等待最多 timeout 秒。如果在 timeout 秒内调用未完成,则将引发TimeoutError
。如果未指定 timeout 或为None
,则等待时间没有限制。如果 future 在完成之前被取消,则将引发
CancelledError
。如果调用引发异常,则此方法将引发相同的异常。
exception(timeout=None)
返回调用引发的异常。如果调用尚未完成,则此方法将等待最多 timeout 秒。如果在 timeout 秒内调用未完成,则将引发TimeoutError
。如果未指定 timeout 或为None
,则等待时间没有限制。如果 future 在完成之前被取消,则将引发
CancelledError
。如果调用完成而未引发异常,则返回
None
。
add_done_callback(fn)
将可调用对象 fn 附加到 future,当 future 被取消或完成运行时,将调用该可调用对象。fn 将以 future 作为其唯一参数被调用。添加的可调用对象按添加顺序调用,并且始终在添加它们的进程所属的线程中调用。如果可调用对象引发
Exception
,则将其记录并忽略。如果可调用对象引发其他BaseException
,则行为未定义。如果 future 已经完成或被取消,则将立即调用 fn。
内部 Future 方法
以下 Future
方法旨在用于单元测试和 Executor
实现。
set_running_or_notify_cancel()
应由Executor
实现调用,以在执行与Future
关联的工作之前调用。如果该方法返回
False
,则Future
已被取消,即调用了Future.cancel
并返回了True
。任何等待Future
完成的线程(例如通过as_completed()
或wait()
)都将被唤醒。如果该方法返回
True
,则Future
未被取消,并且已处于运行状态,即对Future.running()
的调用将返回True
。此方法只能调用一次,并且不能在调用
Future.set_result()
或Future.set_exception()
之后调用。
set_result(result)
设置与Future
关联的工作的结果。
set_exception(exception)
将与Future
关联的工作的结果设置为给定的Exception
。
模块函数
wait(fs, timeout=None, return_when=ALL_COMPLETED)
等待由 *fs* 给出的(可能由不同的Executor
实例创建的)Future
实例完成。返回一个命名的包含两个集合的 2 元组。第一个集合,名为“done”,包含在等待完成之前完成(结束或被取消)的 Future。第二个集合,名为“not_done”,包含未完成的 Future。timeout 可用于控制在返回之前等待的最大秒数。如果未指定 timeout 或为 None,则等待时间没有限制。
return_when 指示方法何时应返回。它必须是以下常量之一
常量 描述 FIRST_COMPLETED
当任何 Future 完成或被取消时,方法将返回。 FIRST_EXCEPTION
当任何 Future 通过引发异常完成时,方法将返回。如果将来没有引发异常,则等效于 ALL_COMPLETED。 ALL_COMPLETED
当所有调用完成时,方法将返回。
as_completed(fs, timeout=None)
返回由 *fs* 给出的Future
实例上的迭代器,该迭代器在它们完成(结束或被取消)时生成 Future。在调用as_completed()
之前完成的任何 Future 将首先被生成。如果调用__next__()
且结果在从最初调用as_completed()
开始的 *timeout* 秒后不可用,则返回的迭代器会引发TimeoutError
。如果未指定 *timeout* 或为None
,则等待时间没有限制。
Future
实例可以由不同的Executor
实例创建。
检查素数示例
from concurrent import futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime,
PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
网页爬取示例
from concurrent import futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
def load_url(url, timeout):
return urllib.request.urlopen(url, timeout=timeout).read()
def main():
with futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_url = dict(
(executor.submit(load_url, url, 60), url)
for url in URLS)
for future in futures.as_completed(future_to_url):
url = future_to_url[future]
try:
print('%r page is %d bytes' % (
url, len(future.result())))
except Exception as e:
print('%r generated an exception: %s' % (
url, e))
if __name__ == '__main__':
main()
基本原理
此模块的建议设计受到 Java java.util.concurrent 包 [1] 的强烈影响。与 Java 一样,该模块的概念基础是 Future 类,它表示异步计算的进度和结果。Future 类对所使用的计算模式几乎没有承诺,例如,它可以用于表示延迟或急切计算,用于使用线程、进程或远程过程调用进行计算。
Future 由 Executor 类的具体实现(在 Java 中称为 ExecutorService)创建。参考实现提供了使用进程或线程池来急切地计算计算的类。
Future 已经在 Python 中作为流行的 Python 食谱配方的一部分出现 [2],并在 Python-3000 邮件列表中进行了讨论 [3]。
建议的设计是显式的,即它要求客户端意识到他们正在使用 Future。可以设计一个返回代理对象(类似于 weakref
)的模块,这些对象可以透明地使用。可以在建议的显式机制之上构建代理实现。
建议的设计没有对 Python 语言语法或语义进行任何更改。可以引入特殊语法 [4] 以将函数和方法调用标记为异步。当操作急切地异步评估时,将返回代理结果,并且只有在操作完成之前使用代理对象时,执行才会阻塞。
Anh Hai Trinh 提出了一个更简单但更有限的 API 概念 [5],并且该 API 已在 stdlib-sig 上进行了详细讨论 [6]。
建议的设计在 Python-Dev 邮件列表中进行了讨论 [7]。根据这些讨论,进行了以下更改
Executor
类已成为抽象基类- 由于缺乏令人信服的用例,因此删除了
Future.remove_done_callback
方法 Future.add_done_callback
方法已修改为允许多次添加相同的可调用对象Future
类的变异方法得到了更好的文档记录,以表明它们对创建它们的Executor
是私有的
参考实现
参考实现 [8] 包含建议设计的完整实现。它已在 Linux 和 Mac OS X 上进行了测试。
参考文献
版权声明
本文档已置于公共领域。
来源:https://github.com/python/peps/blob/main/peps/pep-3148.rst